Commit 32b972e5 authored by Michael Ritter's avatar Michael Ritter
Browse files

Fixes for 1.14 regressions

ace-am/
  - update jersey dependencies
  - fix redirect for TokenImportStatus
  - catch possible NPE when shutting down the IngestThreadPool
  - start to incorporate the ForkJoin common pool for tasks which need to be run
  - in the AuditThread, if a MonitoredItem has no digest set the storedDigest to be the hash generated
  - in the IngestThread, use a non-locking method when querying for MonitoredItems. The MonitoredItemManager has the potential to lock threads out which have open database transactions, which can cause deadlocks.
parent f9f692f1
<?xml version="1.0"?> <?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<artifactId>ace</artifactId> <artifactId>ace</artifactId>
...@@ -72,10 +74,12 @@ ...@@ -72,10 +74,12 @@
</dependency> </dependency>
<!-- for json --> <!-- for json -->
<!--
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId> <artifactId>jackson-core</artifactId>
</dependency> </dependency>
-->
<dependency> <dependency>
<groupId>net.java.dev.rome</groupId> <groupId>net.java.dev.rome</groupId>
...@@ -185,12 +189,14 @@ ...@@ -185,12 +189,14 @@
<dependency> <dependency>
<groupId>org.glassfish.jersey.containers</groupId> <groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId> <artifactId>jersey-container-servlet</artifactId>
<version>2.24</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.glassfish.jersey.media</groupId> <groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId> <artifactId>jersey-media-json-jackson</artifactId>
<version>2.24</version> </dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
......
...@@ -593,9 +593,12 @@ public final class AuditThread extends Thread implements CancelCallback { ...@@ -593,9 +593,12 @@ public final class AuditThread extends Thread implements CancelCallback {
LOG.trace("Generated checksum: " + currentFileHash + " expected checksum: " + storedDigest); LOG.trace("Generated checksum: " + currentFileHash + " expected checksum: " + storedDigest);
LogEvent event; LogEvent event;
// If we have a registered file, set the digested value // If we have a registered file:
// set the stored digest to be the generated hash
// set the file digest of the current monitored_item to the generated hash
if (null == storedDigest) { if (null == storedDigest) {
LOG.trace("Setting digest for registered file " + item.getPath()); LOG.trace("Setting digest for registered file " + item.getPath());
storedDigest = currentFileHash;
item.setFileDigest(currentFileHash); item.setFileDigest(currentFileHash);
item.setLastSeen(new Date()); item.setLastSeen(new Date());
} }
......
...@@ -37,6 +37,8 @@ import edu.umiacs.ace.util.EntityManagerServlet; ...@@ -37,6 +37,8 @@ import edu.umiacs.ace.util.EntityManagerServlet;
import edu.umiacs.ace.util.PersistUtil; import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.util.Strings; import edu.umiacs.util.Strings;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import org.eclipse.persistence.exceptions.DatabaseException;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction; import javax.persistence.EntityTransaction;
...@@ -50,9 +52,11 @@ import java.io.IOException; ...@@ -50,9 +52,11 @@ import java.io.IOException;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ForkJoinPool;
/** /**
* add,modify,remove the settings of a collection * add,modify,remove the settings of a collection
*
* @author toaster * @author toaster
*/ */
public class ManageCollectionServlet extends EntityManagerServlet { public class ManageCollectionServlet extends EntityManagerServlet {
...@@ -77,7 +81,6 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -77,7 +81,6 @@ public class ManageCollectionServlet extends EntityManagerServlet {
// private static final String PARAM_PEER_COLLECTION_PREFIX = "peer_col_id"; // private static final String PARAM_PEER_COLLECTION_PREFIX = "peer_col_id";
/** /**
*
* @param request * @param request
* @param response * @param response
* @param em * @param em
...@@ -86,7 +89,7 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -86,7 +89,7 @@ public class ManageCollectionServlet extends EntityManagerServlet {
*/ */
@Override @Override
protected void processRequest(HttpServletRequest request, protected void processRequest(HttpServletRequest request,
HttpServletResponse response, EntityManager em) HttpServletResponse response, EntityManager em)
throws ServletException, IOException { throws ServletException, IOException {
EntityTransaction trans; EntityTransaction trans;
...@@ -97,7 +100,7 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -97,7 +100,7 @@ public class ManageCollectionServlet extends EntityManagerServlet {
/* /*
* Modification, view, or removal of an existing collection * Modification, view, or removal of an existing collection
* if we have an int, and its > 0, and it is the key for a resource * if we have an int, and its > 0, and it is the key for a resource
*/ */
if ((collection = getCollection(request, em)) != null) { if ((collection = getCollection(request, em)) != null) {
...@@ -105,23 +108,20 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -105,23 +108,20 @@ public class ManageCollectionServlet extends EntityManagerServlet {
if (collection.getStorage() != null) { if (collection.getStorage() != null) {
storage = StorageDriverFactory.createStorageAccess(collection, em); storage = StorageDriverFactory.createStorageAccess(collection, em);
} }
/* /*
* Tst to see if we're removing a collection * Tst to see if we're removing a collection
*/ */
if (!Strings.isEmpty(request.getParameter(PARAM_REMOVE)) if (!Strings.isEmpty(request.getParameter(PARAM_REMOVE))
&& request.getParameter(PARAM_REMOVE).toLowerCase().equals("yes")) { && request.getParameter(PARAM_REMOVE).toLowerCase().equals("yes")) {
LOG.debug("removing collection " + collection.getName()); LOG.debug("removing collection " + collection.getName());
removeCollection(em, collection, storage); ForkJoinPool.commonPool().submit(new RemoveThread(collection, storage));
response.sendRedirect("Status?collectionid=-1"); response.sendRedirect("Status?collectionid=-1");
return; return;
} /* } /*
* otherwise, are we updating? * otherwise, are we updating?
*/ */ else if (checkParameters(request)
else if (checkParameters(request)
&& ((paramCheckResponse = checkStorage(storage, request, collection)) == null)) { && ((paramCheckResponse = checkStorage(storage, request, collection)) == null)) {
LOG.debug("updating collection: " + collection.getName()); LOG.debug("updating collection: " + collection.getName());
trans = em.getTransaction(); trans = em.getTransaction();
...@@ -135,17 +135,15 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -135,17 +135,15 @@ public class ManageCollectionServlet extends EntityManagerServlet {
} /* } /*
* ok, we're just opening an existing collection for modification * ok, we're just opening an existing collection for modification
* - this should display the storage page since storage will be set * - this should display the storage page since storage will be set
*/ */ else {
else {
LOG.debug("loading existing collection: " + collection.getName()); LOG.debug("loading existing collection: " + collection.getName());
dispatcher = request.getRequestDispatcher("collectionmodify.jsp"); dispatcher = request.getRequestDispatcher("collectionmodify.jsp");
} }
} /* } /*
* its either a new submission, or requesting a blank page. * its either a new submission, or requesting a blank page.
* We shouldn't set storage information here yet, just create the * We shouldn't set storage information here yet, just create the
* new collection and create a blank storage * new collection and create a blank storage
*/ */ else {
else {
collection = new Collection(); collection = new Collection();
collection.setState(CollectionState.NEVER); collection.setState(CollectionState.NEVER);
populateCollection(request, collection); populateCollection(request, collection);
...@@ -154,8 +152,7 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -154,8 +152,7 @@ public class ManageCollectionServlet extends EntityManagerServlet {
if (checkParameters(request) && hasDigest(request)) { if (checkParameters(request) && hasDigest(request)) {
LOG.debug("creating collection, empty driver: " + collection.getName()); LOG.debug("creating collection, empty driver: " + collection.getName());
PersistUtil.persist(collection); PersistUtil.persist(collection);
storage = StorageDriverFactory.createStorageAccess(collection, storage = StorageDriverFactory.createStorageAccess(collection, em);
em);
CollectionCountContext.incrementTotalCollections(); CollectionCountContext.incrementTotalCollections();
} }
...@@ -182,37 +179,6 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -182,37 +179,6 @@ public class ManageCollectionServlet extends EntityManagerServlet {
return result; return result;
} }
private void removeCollection(EntityManager em, Collection collection, StorageDriver storage) {
EntityTransaction trans;
Query q;
trans = em.getTransaction();
trans.begin();
q = em.createNamedQuery("LogEvent.deleteByCollection");
q.setParameter("coll", collection);
q.executeUpdate();
q = em.createNamedQuery("Token.deleteByCollection");
q.setParameter("coll", collection);
q.executeUpdate();
q = em.createNamedQuery("MonitoredItem.deleteByCollection");
q.setParameter("coll", collection);
q.executeUpdate();
q = em.createNamedQuery("FilterEntry.dropByCollection");
q.setParameter("coll", collection);
q.executeUpdate();
q = em.createNamedQuery("ReportSummary.deleteByCollection");
q.setParameter("coll", collection);
q.executeUpdate();
q = em.createNamedQuery("PeerCollection.deleteByCollection");
q.setParameter("coll", collection);
q.executeUpdate();
if (storage != null) {
storage.remove(em);
}
em.remove(collection);
CollectionCountContext.decrementTotalCollections(collection);
trans.commit();
}
private boolean hasDigest(HttpServletRequest req) { private boolean hasDigest(HttpServletRequest req) {
String digest = req.getParameter(PARAM_DIGEST); String digest = req.getParameter(PARAM_DIGEST);
if (!Strings.isEmpty(digest)) { if (!Strings.isEmpty(digest)) {
...@@ -246,29 +212,23 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -246,29 +212,23 @@ public class ManageCollectionServlet extends EntityManagerServlet {
} }
if (col.getSettings() == null) if (col.getSettings() == null)
col.setSettings(new HashMap<String, String>()); col.setSettings(new HashMap<>());
if (!Strings.isEmpty(req.getParameter(PARAM_EMAILLIST))) { if (!Strings.isEmpty(req.getParameter(PARAM_EMAILLIST))) {
// col.setEmailList(req.getParameter(PARAM_EMAILLIST));
col.getSettings().put(ConfigConstants.ATTR_EMAIL_RECIPIENTS, req.getParameter(PARAM_EMAILLIST)); col.getSettings().put(ConfigConstants.ATTR_EMAIL_RECIPIENTS, req.getParameter(PARAM_EMAILLIST));
} }
//
if (Strings.isValidInt(req.getParameter(PARAM_CHECKPERIOD))) { if (Strings.isValidInt(req.getParameter(PARAM_CHECKPERIOD))) {
col.getSettings().put(ConfigConstants.ATTR_AUDIT_PERIOD, req.getParameter(PARAM_CHECKPERIOD)); col.getSettings().put(ConfigConstants.ATTR_AUDIT_PERIOD, req.getParameter(PARAM_CHECKPERIOD));
// col.setCheckPeriod(Integer.parseInt(req.getParameter(PARAM_CHECKPERIOD)));
} }
//
if (req.getParameter(PARAM_PROXY_DATA) != null) { if (req.getParameter(PARAM_PROXY_DATA) != null) {
col.getSettings().put(ConfigConstants.ATTR_PROXY_DATA, req.getParameter(PARAM_PROXY_DATA)); col.getSettings().put(ConfigConstants.ATTR_PROXY_DATA, req.getParameter(PARAM_PROXY_DATA));
// col.setProxyData("true".equals(req.getParameter(PARAM_PROXY_DATA).toLowerCase()));
} }
//
if (req.getParameter(PARAM_AUDIT_TOKENS) != null) { if (req.getParameter(PARAM_AUDIT_TOKENS) != null) {
col.getSettings().put(ConfigConstants.ATTR_AUDIT_TOKENS, req.getParameter(PARAM_AUDIT_TOKENS)); col.getSettings().put(ConfigConstants.ATTR_AUDIT_TOKENS, req.getParameter(PARAM_AUDIT_TOKENS));
// col.setAuditTokens("true".equals(req.getParameter(PARAM_AUDIT_TOKENS).toLowerCase()));
} }
} }
public boolean checkParameters(HttpServletRequest req) { public boolean checkParameters(HttpServletRequest req) {
...@@ -276,4 +236,93 @@ public class ManageCollectionServlet extends EntityManagerServlet { ...@@ -276,4 +236,93 @@ public class ManageCollectionServlet extends EntityManagerServlet {
&& !Strings.isEmpty(req.getParameter(PARAM_DIR)) && !Strings.isEmpty(req.getParameter(PARAM_DIR))
&& StorageDriverFactory.listResources().contains(req.getParameter(PARAM_DRIVER))); && StorageDriverFactory.listResources().contains(req.getParameter(PARAM_DRIVER)));
} }
private class RemoveThread implements Runnable {
private static final String COLL_ID = "collection_id";
private static final String PARENT_ID = "parent_id";
private static final String PARENT_COLL_ID = "parentcollection_id";
private static final String LOG_EVENT = "logevent";
private static final String LOG_EVENT_ID = COLL_ID;
private static final String ACE_TOKEN = "acetoken";
private static final String ACE_TOKEN_ID = PARENT_COLL_ID;
private static final String MONITORED_ITEM = "monitored_item";
private static final String MONITORED_ITEM_ID = PARENT_COLL_ID;
private static final String FILTER_ENTRY = "filter_entry";
private static final String FILTER_ENTRY_ID = COLL_ID;
private static final String REPORT_SUMMARY = "report_summary";
private static final String REPORT_SUMMARY_ID = COLL_ID;
private static final String PEER_COLLECTION = "peer_collection";
private static final String PEER_COLLECTION_ID = PARENT_ID;
private Boolean abort = false;
private Collection collection;
private StorageDriver storage;
private RemoveThread(Collection collection, StorageDriver storage) {
this.collection = collection;
this.storage = storage;
}
@Override
public void run() {
// this is on a separate thread so re-acquire the entity manager
NDC.push("[Remove " + collection.getName() + "] ");
EntityManager em = PersistUtil.getEntityManager();
// batching to reduce contention on tables when deleting many rows
// todo: native queries aren't the best for this, but deleting with jpql is... weird
// could possibly try to use a CriteriaBuilder and issue a subquery
LOG.info("Starting remove");
batchRm(em, collection, LOG_EVENT, LOG_EVENT_ID);
batchRm(em, collection, ACE_TOKEN, ACE_TOKEN_ID);
batchRm(em, collection, MONITORED_ITEM, MONITORED_ITEM_ID);
batchRm(em, collection, FILTER_ENTRY, FILTER_ENTRY_ID);
batchRm(em, collection, REPORT_SUMMARY, REPORT_SUMMARY_ID);
batchRm(em, collection, PEER_COLLECTION, PEER_COLLECTION_ID);
if (!abort) {
// The collection and storage driver are detached at this point so they need to be
// re-acquired
collection = em.find(Collection.class, collection.getId());
storage = StorageDriverFactory.createStorageAccess(collection, em);
if (storage != null) {
storage.remove(em);
}
LOG.info("Finishing remove");
EntityTransaction transaction = em.getTransaction();
transaction.begin();
em.remove(collection);
transaction.commit();
CollectionCountContext.decrementTotalCollections(collection);
}
NDC.pop();
NDC.clear();
}
private void batchRm(EntityManager em, Collection coll, String table, String row) {
final String queryString = "DELETE FROM %s WHERE %s = %s ORDER BY id LIMIT 1000";
boolean run = true;
LOG.debug("Removing entries for " + table);
Query q = em.createNativeQuery(String.format(queryString, table, row, coll.getId()));
while (run && !abort) {
EntityTransaction trans = em.getTransaction();
trans.begin();
try {
int affected = q.executeUpdate();
trans.commit();
run = (affected != 0);
// LOG.info("Removed " + affected + " rows setting run to " + run);
} catch (DatabaseException e) {
LOG.warn("Caught exception when removing collection", e);
trans.rollback();
run = false;
abort = true;
}
}
}
}
} }
...@@ -35,6 +35,12 @@ import edu.umiacs.ace.monitor.audit.AuditConfigurationContext; ...@@ -35,6 +35,12 @@ import edu.umiacs.ace.monitor.audit.AuditConfigurationContext;
import edu.umiacs.ace.monitor.audit.AuditConfigurationContext.PauseBean; import edu.umiacs.ace.monitor.audit.AuditConfigurationContext.PauseBean;
import edu.umiacs.ace.util.PersistUtil; import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.sql.SQL; import edu.umiacs.sql.SQL;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.sql.DataSource;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.sql.Blob; import java.sql.Blob;
...@@ -43,11 +49,6 @@ import java.sql.DatabaseMetaData; ...@@ -43,11 +49,6 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import javax.sql.DataSource;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
/** /**
* Token migration listner to migrate tokens from the older pre-1.6 version to the current 1.6+ * Token migration listner to migrate tokens from the older pre-1.6 version to the current 1.6+
...@@ -225,7 +226,7 @@ public class TokenMigrationContextListener implements ServletContextListener { ...@@ -225,7 +226,7 @@ public class TokenMigrationContextListener implements ServletContextListener {
+ writeStmt.executeBatch().length); + writeStmt.executeBatch().length);
writeConn.commit(); writeConn.commit();
LOG.info("Entried processed: " + i); LOG.info("Entries processed: " + i);
} catch ( SQLException e ) { } catch ( SQLException e ) {
writeConn.rollback(); writeConn.rollback();
LOG.error("SQL Exception while moving tokens"); LOG.error("SQL Exception while moving tokens");
......
...@@ -139,7 +139,7 @@ public class IngestStore extends EntityManagerServlet { ...@@ -139,7 +139,7 @@ public class IngestStore extends EntityManagerServlet {
IngestThreadPool tPool = IngestThreadPool.getInstance(); IngestThreadPool tPool = IngestThreadPool.getInstance();
tPool.submitTokens(batchTokens, coll); tPool.submitTokens(batchTokens, coll);
response.sendRedirect("/TokenImportStatus?active=" + coll.getId()); response.sendRedirect("TokenImportStatus?active=" + coll.getId());
} }
} }
...@@ -33,7 +33,7 @@ public class IngestSupervisor implements Runnable { ...@@ -33,7 +33,7 @@ public class IngestSupervisor implements Runnable {
public IngestSupervisor(final Map<String, Token> tokens, final Collection coll) { public IngestSupervisor(final Map<String, Token> tokens, final Collection coll) {
this.tokens = tokens; this.tokens = tokens;
this.coll = coll; this.coll = coll;
this.pool = new ForkJoinPool(); this.pool = ForkJoinPool.commonPool();
this.states = new ConcurrentHashMap<>(); this.states = new ConcurrentHashMap<>();
// so we don't have to worry about npes // so we don't have to worry about npes
...@@ -58,7 +58,7 @@ public class IngestSupervisor implements Runnable { ...@@ -58,7 +58,7 @@ public class IngestSupervisor implements Runnable {
dirTask.quietlyJoin(); dirTask.quietlyJoin();
fileTask.quietlyJoin(); fileTask.quietlyJoin();
pool.shutdown(); // pool.shutdown();
LOG.info("Leaving Supervisor"); LOG.info("Leaving Supervisor");
} }
......
...@@ -32,7 +32,6 @@ package edu.umiacs.ace.monitor.register; ...@@ -32,7 +32,6 @@ package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.core.Collection; import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem; import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.MonitoredItemManager;
import edu.umiacs.ace.monitor.core.Token; import edu.umiacs.ace.monitor.core.Token;
import edu.umiacs.ace.monitor.log.LogEnum; import edu.umiacs.ace.monitor.log.LogEnum;
import edu.umiacs.ace.monitor.log.LogEvent; import edu.umiacs.ace.monitor.log.LogEvent;
...@@ -45,6 +44,9 @@ import org.apache.log4j.Logger; ...@@ -45,6 +44,9 @@ import org.apache.log4j.Logger;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction; import javax.persistence.EntityTransaction;
import javax.persistence.TypedQuery;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -93,7 +95,7 @@ public class IngestThread extends RecursiveAction { ...@@ -93,7 +95,7 @@ public class IngestThread extends RecursiveAction {
return; return;
} }
if (identifiers.size() < 7000) { if (identifiers.size() < 2500) {
// TODO: I still want to play around with rollbacks in case of failure // TODO: I still want to play around with rollbacks in case of failure
em = PersistUtil.getEntityManager(); em = PersistUtil.getEntityManager();
EntityTransaction trans = em.getTransaction(); EntityTransaction trans = em.getTransaction();
...@@ -115,7 +117,6 @@ public class IngestThread extends RecursiveAction { ...@@ -115,7 +117,6 @@ public class IngestThread extends RecursiveAction {
MonitoredItem item; MonitoredItem item;
session = System.currentTimeMillis(); session = System.currentTimeMillis();
logManager = new LogEventManager(session, coll); logManager = new LogEventManager(session, coll);
MonitoredItemManager mim = new MonitoredItemManager(em);
// Cycle through all items read in and add/update tokens // Cycle through all items read in and add/update tokens
// Commit only if there are no errors in all transactions // Commit only if there are no errors in all transactions
...@@ -123,7 +124,7 @@ public class IngestThread extends RecursiveAction { ...@@ -123,7 +124,7 @@ public class IngestThread extends RecursiveAction {
for (String identifier : identifiers) { for (String identifier : identifiers) {
queued.remove(identifier); queued.remove(identifier);
Token token = tokens.get(identifier); Token token = tokens.get(identifier);
item = mim.getItemByPath(identifier, coll); item = getItemByPath(identifier, coll);
if (item == null) { if (item == null) {
LOG.debug("[Ingest Thread " + Thread.currentThread().getId() LOG.debug("[Ingest Thread " + Thread.currentThread().getId()
+ "] Adding new item " + identifier); + "] Adding new item " + identifier);
...@@ -256,4 +257,28 @@ public class IngestThread extends RecursiveAction { ...@@ -256,4 +257,28 @@ public class IngestThread extends RecursiveAction {
mi.setSize(size); mi.setSize(size);
return mi; return mi;