Commit 2c421359 authored by Michael Ritter's avatar Michael Ritter
Browse files

Update monitored items and add log events through the db

parent 16162d1b
...@@ -32,48 +32,54 @@ package edu.umiacs.ace.monitor.audit; ...@@ -32,48 +32,54 @@ package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.driver.AuditIterable; import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.DriverStateBean; import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.FileBean; import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.StorageDriver; import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.filter.SimpleFilter;
import edu.umiacs.ace.ims.api.IMSException; import edu.umiacs.ace.ims.api.IMSException;
import edu.umiacs.ace.ims.api.IMSService; import edu.umiacs.ace.ims.api.IMSService;
import edu.umiacs.ace.ims.api.TokenRequestBatch; import edu.umiacs.ace.ims.api.TokenRequestBatch;
import edu.umiacs.ace.ims.api.TokenValidator; import edu.umiacs.ace.ims.api.TokenValidator;
import edu.umiacs.ace.ims.ws.TokenRequest; import edu.umiacs.ace.ims.ws.TokenRequest;
import edu.umiacs.ace.monitor.access.CollectionCountContext; import edu.umiacs.ace.monitor.access.CollectionCountContext;
import edu.umiacs.ace.monitor.register.IngestThreadPool;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.driver.filter.SimpleFilter;
import edu.umiacs.ace.monitor.compare.CollectionCompare2; import edu.umiacs.ace.monitor.compare.CollectionCompare2;
import edu.umiacs.ace.monitor.compare.CompareResults; import edu.umiacs.ace.monitor.compare.CompareResults;
import edu.umiacs.ace.remote.JsonGateway; import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.ConfigConstants;
import edu.umiacs.ace.monitor.core.MonitoredItem; import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.MonitoredItemManager; import edu.umiacs.ace.monitor.core.MonitoredItemManager;
import edu.umiacs.ace.monitor.log.LogEnum;
import edu.umiacs.ace.monitor.log.LogEvent;
import edu.umiacs.ace.monitor.log.LogEventManager; import edu.umiacs.ace.monitor.log.LogEventManager;
import edu.umiacs.ace.monitor.peers.PeerCollection;
import edu.umiacs.ace.monitor.reporting.ReportSummary; import edu.umiacs.ace.monitor.reporting.ReportSummary;
import edu.umiacs.ace.monitor.reporting.SchedulerContextListener; import edu.umiacs.ace.monitor.reporting.SchedulerContextListener;
import edu.umiacs.ace.monitor.reporting.SummaryGenerator; import edu.umiacs.ace.monitor.reporting.SummaryGenerator;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.ConfigConstants;
import edu.umiacs.ace.monitor.settings.SettingsUtil; import edu.umiacs.ace.monitor.settings.SettingsUtil;
import edu.umiacs.ace.monitor.log.LogEnum; import edu.umiacs.ace.remote.JsonGateway;
import edu.umiacs.ace.monitor.log.LogEvent;
import edu.umiacs.ace.monitor.peers.PeerCollection;
import edu.umiacs.ace.token.AceToken; import edu.umiacs.ace.token.AceToken;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.TokenUtil; import edu.umiacs.ace.util.TokenUtil;
import edu.umiacs.util.Strings; import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import javax.mail.MessagingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.sql.DataSource;
import java.io.InputStream; import java.io.InputStream;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.mail.MessagingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
/** /**
* *
...@@ -222,12 +228,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -222,12 +228,6 @@ public final class AuditThread extends Thread implements CancelCallback {
session = System.currentTimeMillis(); session = System.currentTimeMillis();
logManager = new LogEventManager(session, coll); logManager = new LogEventManager(session, coll);
logAuditStart(); logAuditStart();
/*
while (IngestThreadPool.isIngesting(coll)) {
LOG.debug("Waiting for ingest to finish");
Thread.sleep(500);
}
*/
callback = new FileAuditCallback(coll, session, this); callback = new FileAuditCallback(coll, session, this);
boolean auditTokens = SettingsUtil.getBoolean(coll, boolean auditTokens = SettingsUtil.getBoolean(coll,
...@@ -254,7 +254,7 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -254,7 +254,7 @@ public final class AuditThread extends Thread implements CancelCallback {
// Let outstanding tokens finish, TODO, de-hackify this. // Let outstanding tokens finish, TODO, de-hackify this.
sleep(2000); sleep(2000);
} catch (Throwable e) { } catch (Throwable e) {
LOG.fatal("UNcaught exception in performAudit()", e); LOG.fatal("Uncaught exception in performAudit()", e);
if (abortException != null) { if (abortException != null) {
abortException = e; abortException = e;
} }
...@@ -595,7 +595,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -595,7 +595,6 @@ public final class AuditThread extends Thread implements CancelCallback {
} }
TokenRequest request = new TokenRequest(); TokenRequest request = new TokenRequest();
//request.setName(item.getPath());
request.setName(item.getId().toString()); request.setName(item.getId().toString());
request.setHashValue(currentFile.getHash()); request.setHashValue(currentFile.getHash());
if (!Strings.isEmpty(item.getPath()) && if (!Strings.isEmpty(item.getPath()) &&
...@@ -616,7 +615,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -616,7 +615,6 @@ public final class AuditThread extends Thread implements CancelCallback {
LogEvent event; LogEvent event;
// If we have a registered file, set the digested value // If we have a registered file, set the digested value
LOG.trace(null == item.getFileDigest());
if (null == item.getFileDigest()) { if (null == item.getFileDigest()) {
LOG.trace("Setting digest for registered file " + item.getPath()); LOG.trace("Setting digest for registered file " + item.getPath());
item.setFileDigest(currentFile.getHash()); item.setFileDigest(currentFile.getHash());
...@@ -760,11 +758,43 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -760,11 +758,43 @@ public final class AuditThread extends Thread implements CancelCallback {
return; return;
} }
DataSource dataSource = PersistUtil.getDataSource();
EntityManager em = PersistUtil.getEntityManager(); EntityManager em = PersistUtil.getEntityManager();
MonitoredItemManager mim = new MonitoredItemManager(em); MonitoredItemManager mim = new MonitoredItemManager(em);
EntityTransaction trans = em.getTransaction(); EntityTransaction trans = em.getTransaction();
trans.begin(); trans.begin();
try {
Query query = em.createNamedQuery("MonitoredItem.updateMissing")
.setParameter("coll", coll)
.setParameter("date", d);
int i = query.executeUpdate();
if (i > 0) {
LOG.info("Set " + i + " new missing items");
Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement("INSERT INTO logevent(session, path, date, logtype, collection_id) " +
"SELECT ?, path, NOW(), ?, parentcollection_id FROM monitored_item m WHERE m.parentcollection_id = ? AND m.state = ? AND m.statechange = ?");
ps.setLong(1, session);
ps.setInt(2, LogEnum.FILE_MISSING.getType());
ps.setLong(3, coll.getId());
ps.setString(4, String.valueOf('M'));
ps.setTimestamp(5, new Timestamp(d.getTime()));
ps.executeUpdate();
ps.close();
connection.close();
}
trans.commit();
} catch (SQLException e) {
trans.rollback();
LOG.error(e);
} finally {
em.close();
}
/*
int idx = 0;
for (MonitoredItem mi : mim.listItemsBefore(coll, d)) { for (MonitoredItem mi : mim.listItemsBefore(coll, d)) {
LOG.trace("Updating missing item: " + mi.getPath()); LOG.trace("Updating missing item: " + mi.getPath());
LOG.trace("Item information: LS= " + mi.getLastSeen() LOG.trace("Item information: LS= " + mi.getLastSeen()
...@@ -781,9 +811,13 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -781,9 +811,13 @@ public final class AuditThread extends Thread implements CancelCallback {
} }
mi.setLastVisited(new Date()); mi.setLastVisited(new Date());
em.merge(mi); em.merge(mi);
idx++;
if (idx % 30 == 0) {
em.flush();
em.clear();
} }
trans.commit(); }
em.close(); */
} }
private void compareToPeers() { private void compareToPeers() {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment