Commit b8c7ef9c authored by Michael Ritter's avatar Michael Ritter
Browse files

Merge branch '28-oom-in-auditthread' into 'develop'

Merge OOM in AuditThread

Merge request for #28

See merge request !2
parents 0fed3842 acf49f1a
...@@ -32,48 +32,54 @@ package edu.umiacs.ace.monitor.audit; ...@@ -32,48 +32,54 @@ package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.driver.AuditIterable; import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.DriverStateBean; import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.FileBean; import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.StorageDriver; import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.filter.SimpleFilter;
import edu.umiacs.ace.ims.api.IMSException; import edu.umiacs.ace.ims.api.IMSException;
import edu.umiacs.ace.ims.api.IMSService; import edu.umiacs.ace.ims.api.IMSService;
import edu.umiacs.ace.ims.api.TokenRequestBatch; import edu.umiacs.ace.ims.api.TokenRequestBatch;
import edu.umiacs.ace.ims.api.TokenValidator; import edu.umiacs.ace.ims.api.TokenValidator;
import edu.umiacs.ace.ims.ws.TokenRequest; import edu.umiacs.ace.ims.ws.TokenRequest;
import edu.umiacs.ace.monitor.access.CollectionCountContext; import edu.umiacs.ace.monitor.access.CollectionCountContext;
import edu.umiacs.ace.monitor.register.IngestThreadPool;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.driver.filter.SimpleFilter;
import edu.umiacs.ace.monitor.compare.CollectionCompare2; import edu.umiacs.ace.monitor.compare.CollectionCompare2;
import edu.umiacs.ace.monitor.compare.CompareResults; import edu.umiacs.ace.monitor.compare.CompareResults;
import edu.umiacs.ace.remote.JsonGateway; import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.ConfigConstants;
import edu.umiacs.ace.monitor.core.MonitoredItem; import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.MonitoredItemManager; import edu.umiacs.ace.monitor.core.MonitoredItemManager;
import edu.umiacs.ace.monitor.log.LogEnum;
import edu.umiacs.ace.monitor.log.LogEvent;
import edu.umiacs.ace.monitor.log.LogEventManager; import edu.umiacs.ace.monitor.log.LogEventManager;
import edu.umiacs.ace.monitor.peers.PeerCollection;
import edu.umiacs.ace.monitor.reporting.ReportSummary; import edu.umiacs.ace.monitor.reporting.ReportSummary;
import edu.umiacs.ace.monitor.reporting.SchedulerContextListener; import edu.umiacs.ace.monitor.reporting.SchedulerContextListener;
import edu.umiacs.ace.monitor.reporting.SummaryGenerator; import edu.umiacs.ace.monitor.reporting.SummaryGenerator;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.ConfigConstants;
import edu.umiacs.ace.monitor.settings.SettingsUtil; import edu.umiacs.ace.monitor.settings.SettingsUtil;
import edu.umiacs.ace.monitor.log.LogEnum; import edu.umiacs.ace.remote.JsonGateway;
import edu.umiacs.ace.monitor.log.LogEvent;
import edu.umiacs.ace.monitor.peers.PeerCollection;
import edu.umiacs.ace.token.AceToken; import edu.umiacs.ace.token.AceToken;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.TokenUtil; import edu.umiacs.ace.util.TokenUtil;
import edu.umiacs.util.Strings; import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import javax.mail.MessagingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import javax.sql.DataSource;
import java.io.InputStream; import java.io.InputStream;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.mail.MessagingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
/** /**
* *
...@@ -191,16 +197,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -191,16 +197,6 @@ public final class AuditThread extends Thread implements CancelCallback {
iterableItems.cancel(); iterableItems.cancel();
} }
/*
if (batch != null) {
batch.close();
}
if (validator != null) {
validator.close();
}
*/
if (AuditThreadFactory.isRunning(coll) || AuditThreadFactory.isQueued(coll)) { if (AuditThreadFactory.isRunning(coll) || AuditThreadFactory.isQueued(coll)) {
AuditThreadFactory.finished(coll); AuditThreadFactory.finished(coll);
} }
...@@ -222,12 +218,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -222,12 +218,6 @@ public final class AuditThread extends Thread implements CancelCallback {
session = System.currentTimeMillis(); session = System.currentTimeMillis();
logManager = new LogEventManager(session, coll); logManager = new LogEventManager(session, coll);
logAuditStart(); logAuditStart();
/*
while (IngestThreadPool.isIngesting(coll)) {
LOG.debug("Waiting for ingest to finish");
Thread.sleep(500);
}
*/
callback = new FileAuditCallback(coll, session, this); callback = new FileAuditCallback(coll, session, this);
boolean auditTokens = SettingsUtil.getBoolean(coll, boolean auditTokens = SettingsUtil.getBoolean(coll,
...@@ -254,7 +244,7 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -254,7 +244,7 @@ public final class AuditThread extends Thread implements CancelCallback {
// Let outstanding tokens finish, TODO, de-hackify this. // Let outstanding tokens finish, TODO, de-hackify this.
sleep(2000); sleep(2000);
} catch (Throwable e) { } catch (Throwable e) {
LOG.fatal("UNcaught exception in performAudit()", e); LOG.fatal("Uncaught exception in performAudit()", e);
if (abortException != null) { if (abortException != null) {
abortException = e; abortException = e;
} }
...@@ -595,7 +585,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -595,7 +585,6 @@ public final class AuditThread extends Thread implements CancelCallback {
} }
TokenRequest request = new TokenRequest(); TokenRequest request = new TokenRequest();
//request.setName(item.getPath());
request.setName(item.getId().toString()); request.setName(item.getId().toString());
request.setHashValue(currentFile.getHash()); request.setHashValue(currentFile.getHash());
if (!Strings.isEmpty(item.getPath()) && if (!Strings.isEmpty(item.getPath()) &&
...@@ -616,7 +605,6 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -616,7 +605,6 @@ public final class AuditThread extends Thread implements CancelCallback {
LogEvent event; LogEvent event;
// If we have a registered file, set the digested value // If we have a registered file, set the digested value
LOG.trace(null == item.getFileDigest());
if (null == item.getFileDigest()) { if (null == item.getFileDigest()) {
LOG.trace("Setting digest for registered file " + item.getPath()); LOG.trace("Setting digest for registered file " + item.getPath());
item.setFileDigest(currentFile.getHash()); item.setFileDigest(currentFile.getHash());
...@@ -713,8 +701,8 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -713,8 +701,8 @@ public final class AuditThread extends Thread implements CancelCallback {
if (parentName != null) { if (parentName != null) {
parentName = Strings.cleanStringForXml(parentName, '_'); parentName = Strings.cleanStringForXml(parentName, '_');
for (int i = 1; i < currentFile.getPathList().length; i++) { for (int i = 1; i < currentFile.getPathList().length; i++) {
String parent = (currentFile.getPathList().length > i + 1 String parent = currentFile.getPathList().length > i + 1 ?
? currentFile.getPathList()[i + 1] : null); currentFile.getPathList()[i + 1] : null;
parent = Strings.cleanStringForXml(parent, '_'); parent = Strings.cleanStringForXml(parent, '_');
mim.createDirectory(currentFile.getPathList()[i], parent, coll); mim.createDirectory(currentFile.getPathList()[i], parent, coll);
} }
...@@ -725,8 +713,7 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -725,8 +713,7 @@ public final class AuditThread extends Thread implements CancelCallback {
private String[] createMailList() { private String[] createMailList() {
String addrs = SettingsUtil.getString(coll, ConfigConstants.ATTR_EMAIL_RECIPIENTS); String addrs = SettingsUtil.getString(coll, ConfigConstants.ATTR_EMAIL_RECIPIENTS);
String[] maillist = (addrs == null ? null : addrs.split("\\s*,\\s*")); return addrs == null ? null : addrs.split("\\s*,\\s*");
return maillist;
} }
private void setCollectionState() { private void setCollectionState() {
...@@ -760,30 +747,41 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -760,30 +747,41 @@ public final class AuditThread extends Thread implements CancelCallback {
return; return;
} }
DataSource dataSource = PersistUtil.getDataSource();
EntityManager em = PersistUtil.getEntityManager(); EntityManager em = PersistUtil.getEntityManager();
MonitoredItemManager mim = new MonitoredItemManager(em);
EntityTransaction trans = em.getTransaction(); EntityTransaction trans = em.getTransaction();
trans.begin(); trans.begin();
for (MonitoredItem mi : mim.listItemsBefore(coll, d)) { try {
LOG.trace("Updating missing item: " + mi.getPath()); // Update the monitored item table
LOG.trace("Item information: LS= " + mi.getLastSeen() Query query = em.createNamedQuery("MonitoredItem.updateMissing")
+ ", LV = " + mi.getLastVisited() .setParameter("coll", coll)
+ ", SC = " + mi.getStateChange() .setParameter("date", d);
+ "Audit started on: " + d);
if (mi.getState() != 'M' int i = query.executeUpdate();
&& (mi.getStateChange() == null if (i > 0) {
|| d.after(mi.getStateChange()))) { LOG.info("Set " + i + " new missing items");
mi.setState('M');
mi.setStateChange(new Date()); // Add log entries for the new missing items
em.persist(logManager.createItemEvent(LogEnum.FILE_MISSING, Connection connection = dataSource.getConnection();
mi.getPath())); PreparedStatement ps = connection.prepareStatement("INSERT INTO logevent(session, path, date, logtype, collection_id) " +
"SELECT ?, path, NOW(), ?, parentcollection_id FROM monitored_item m WHERE m.parentcollection_id = ? AND m.state = ? AND m.statechange = ?");
ps.setLong(1, session);
ps.setInt(2, LogEnum.FILE_MISSING.getType());
ps.setLong(3, coll.getId());
ps.setString(4, String.valueOf('M'));
ps.setTimestamp(5, new Timestamp(d.getTime()));
ps.executeUpdate();
ps.close();
connection.close();
} }
mi.setLastVisited(new Date()); trans.commit();
em.merge(mi); } catch (SQLException e) {
trans.rollback();
LOG.error("SQL error, rolling back missing items and log entries", e);
} finally {
em.close();
} }
trans.commit();
em.close();
} }
private void compareToPeers() { private void compareToPeers() {
......
...@@ -101,35 +101,48 @@ import javax.persistence.TemporalType; ...@@ -101,35 +101,48 @@ import javax.persistence.TemporalType;
@NamedQuery(name = "MonitoredItem.listRemoteErrors", query = @NamedQuery(name = "MonitoredItem.listRemoteErrors", query =
"SELECT m FROM MonitoredItem m WHERE (m.state = 'P' OR m.state = 'D') AND m.directory = false AND m.parentCollection = :coll"), "SELECT m FROM MonitoredItem m WHERE (m.state = 'P' OR m.state = 'D') AND m.directory = false AND m.parentCollection = :coll"),
@NamedQuery(name = "MonitoredItem.listLocalErrors", query = @NamedQuery(name = "MonitoredItem.listLocalErrors", query =
"SELECT m FROM MonitoredItem m WHERE (m.state = 'C' OR m.state = 'M' OR m.state = 'T' OR m.state = 'I') AND m.directory = false AND m.parentCollection = :coll") "SELECT m FROM MonitoredItem m WHERE (m.state = 'C' OR m.state = 'M' OR m.state = 'T' OR m.state = 'I') AND m.directory = false AND m.parentCollection = :coll"),
@NamedQuery(name = "MonitoredItem.updateMissing", query =
"UPDATE MonitoredItem SET state = 'M', stateChange = :date, lastVisited = :date WHERE parentCollection = :coll AND lastVisited < :date AND state != 'M'")
}) })
public class MonitoredItem implements Serializable, Comparable { public class MonitoredItem implements Serializable, Comparable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@Id @Id
@GeneratedValue(strategy = GenerationType.IDENTITY) @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id; private Long id;
@Column(nullable = false, columnDefinition = "VARCHAR(255) COLLATE utf8") @Column(nullable = false, columnDefinition = "VARCHAR(255) COLLATE utf8")
private String path; // path relative to base directory private String path; // path relative to base directory
@Column(columnDefinition = "VARCHAR(255) COLLATE utf8") @Column(columnDefinition = "VARCHAR(255) COLLATE utf8")
private String parentPath; private String parentPath;
@Column(nullable = false) @Column(nullable = false)
private boolean directory; // true if directory private boolean directory; // true if directory
@Temporal(TemporalType.TIMESTAMP) @Temporal(TemporalType.TIMESTAMP)
private Date lastSeen; private Date lastSeen;
@Temporal(TemporalType.TIMESTAMP) @Temporal(TemporalType.TIMESTAMP)
private Date stateChange; private Date stateChange;
@Temporal(TemporalType.TIMESTAMP) @Temporal(TemporalType.TIMESTAMP)
private Date lastVisited; private Date lastVisited;
@ManyToOne @ManyToOne
@JoinColumn(nullable = false) @JoinColumn(nullable = false)
private Collection parentCollection; private Collection parentCollection;
private String fileDigest;
@Column(nullable = false) @Column(nullable = false)
private char state; private char state;
@ManyToOne(cascade = CascadeType.ALL) @ManyToOne(cascade = CascadeType.ALL)
private Token token; private Token token;
private long size; private long size;
private String fileDigest;
public void setId( Long id ) { public void setId( Long id ) {
this.id = id; this.id = id;
......
...@@ -62,7 +62,7 @@ import java.util.concurrent.RecursiveAction; ...@@ -62,7 +62,7 @@ import java.util.concurrent.RecursiveAction;
public class IngestThread extends RecursiveAction { public class IngestThread extends RecursiveAction {
private static final Logger LOG = Logger.getLogger(IngestThread.class); private static final Logger LOG = Logger.getLogger(IngestThread.class);
// These only gets read from, never written to // These only get read from, never written to
private Map<String, Token> tokens; private Map<String, Token> tokens;
private Collection coll; private Collection coll;
private List<String> identifiers; private List<String> identifiers;
...@@ -293,4 +293,4 @@ public class IngestThread extends RecursiveAction { ...@@ -293,4 +293,4 @@ public class IngestThread extends RecursiveAction {
} }
} }
\ No newline at end of file
...@@ -133,56 +133,7 @@ ...@@ -133,56 +133,7 @@
background-color: #e8e8e8; background-color: #e8e8e8;
} }
#searchtable {
margin-top: 10px;
margin-bottom: -10px;
margin-left: auto;
margin-right: auto;
width: 650px;
}
.input {
padding: 2px;
display: flex;
width: 650px;
}
.input-group-addon {
border: 1px solid #ccc;
font-size: 12px;
background-color: #e8e8e8;
text-align: center;
width: 75px;
height: 20px;
line-height: 20px;
padding: 3px 10px;
}
.form-input {
border: 1px solid #ccc;
width: 100%;
height: 20px;
padding: 3px 8px;
margin-left: -1px;
margin-right: 5px;
}
.form-select {
border: 1px solid #ccc;
width: 100%;
padding: 3px 8px;
margin-left: -3px;
margin-right: 5px;
}
.btn {
padding: 2px;
width: 75px;
height: 25px;
border: 1px solid #e8e8e8;
margin-left: 2px;
margin-top: 2px;
}
</style> </style>
</head> </head>
...@@ -344,8 +295,6 @@ ...@@ -344,8 +295,6 @@
<table id="linktable"> <table id="linktable">
<tr> <tr>
<td align="left"> <td align="left">
<%-- <a href="Status?count=${count}">|&lt;</a>&nbsp;&nbsp;&nbsp;
<a href="Status?page=${page - 1}&count=${count}">&lt;&lt;</a> --%>
<a href="${page.first}">|&lt;</a>&nbsp;&nbsp;&nbsp; <a href="${page.first}">|&lt;</a>&nbsp;&nbsp;&nbsp;
<a href="${page.previous}">&lt;&lt;</a> <a href="${page.previous}">&lt;&lt;</a>
</td> </td>
...@@ -356,9 +305,6 @@ ...@@ -356,9 +305,6 @@
<a href="${page.getCount(1000)}">1000</a> <a href="${page.getCount(1000)}">1000</a>
</td> </td>
<td align="right"> <td align="right">
<%--
<a href="Status?page=${page + 1}&count=${count}">&gt;&gt;</a>&nbsp;&nbsp;&nbsp;
<a href="Status?count=${count}&start=0&top=0">&gt;|</a> --%>
<a href="${page.next}">&gt;&gt;</a>&nbsp;&nbsp;&nbsp; <a href="${page.next}">&gt;&gt;</a>&nbsp;&nbsp;&nbsp;
<a href="${page.end}">&gt;|</a> <a href="${page.end}">&gt;|</a>
</td> </td>
......
body { body {
font-family: Arial,Helvetica,sans-serif; font-family: Arial, Helvetica, sans-serif;
font-size: 10px; font-size: 10px;
width: 750px; width: 750px;
border: 1px solid #000000; border: 1px solid #000000;
...@@ -13,10 +13,19 @@ h1 { ...@@ -13,10 +13,19 @@ h1 {
margin-left: 10px; margin-left: 10px;
} }
a:link, a:visited {
font-size: 12px;
color: #003388;
text-decoration: none;
}
a:hover {
color: #0066CC;
}
a:link, a:visited {font-size: 12px; color: #003388; text-decoration: none;} a img {
a:hover {color: #0066CC;} border: none;
a img {border:none;} }
input { input {
font-size: 12px; font-size: 12px;
...@@ -29,13 +38,16 @@ input { ...@@ -29,13 +38,16 @@ input {
margin-left: 50px; margin-left: 50px;
margin-right: 50px; margin-right: 50px;
} }
.menuheader { .menuheader {
float: right; float: right;
} }
.menuheader table { .menuheader table {
} }
/** /**
.menubar { .menubar {
padding: 0px; padding: 0px;
...@@ -79,7 +91,6 @@ width: 100%; ...@@ -79,7 +91,6 @@ width: 100%;
text-align: center; text-align: center;
} }
.footer { .footer {
/*padding-left: 50px;*/ /*padding-left: 50px;*/
text-align: center; text-align: center;
...@@ -101,7 +112,7 @@ vertical-align: top; ...@@ -101,7 +112,7 @@ vertical-align: top;
.submitLink { .submitLink {
color: #4444cc; color: #4444cc;
font-family: Arial,Helvetica,sans-serif; font-family: Arial, Helvetica, sans-serif;
background-color: transparent; background-color: transparent;
text-decoration: underline; text-decoration: underline;
border: none; border: none;
...@@ -120,8 +131,6 @@ vertical-align: top; ...@@ -120,8 +131,6 @@ vertical-align: top;
font-size: small; font-size: small;
} }
#statustable thead { #statustable thead {
background-color: #e8e8e8; background-color: #e8e8e8;
} }
...@@ -130,7 +139,7 @@ vertical-align: top; ...@@ -130,7 +139,7 @@ vertical-align: top;
border-bottom: 1px dotted #555; border-bottom: 1px dotted #555;
} }
#statustable tr td{ #statustable tr td {
padding-left: 5px; padding-left: 5px;
} }
...@@ -163,10 +172,10 @@ vertical-align: top; ...@@ -163,10 +172,10 @@ vertical-align: top;
} }
#settingsTable { #settingsTable {
font-size: small; font-size: small;
border: 1px solid #000000; border: 1px solid #000000;
margin-left: 5%; margin-left: 5%;
margin-right: 5%; margin-right: 5%;
} }
.settingsRow { .settingsRow {
...@@ -178,9 +187,9 @@ vertical-align: top; ...@@ -178,9 +187,9 @@ vertical-align: top;
} }
.setti