Commit fe6e2be6 authored by Michael Ritter's avatar Michael Ritter
Browse files

Resolve "NPE in IngestThreadPool submitTokens"

ace-am/
  - check null when caching IngestSupervisor future
  - create enum for show duplicate task submission
  - return 409 if a collection is already ingesting tokens
parent 05cda902
...@@ -136,28 +136,27 @@ public class AuditThreadFactory { ...@@ -136,28 +136,27 @@ public class AuditThreadFactory {
/** /**
* Return a new or existing thread if any room is available New threads will start replication * Return a new or existing thread if any room is available New threads will start replication
* *
* @param c * @param collection the {@link Collection} to run a file audit on
* @param tri * @param driver the {@link StorageDriver} for retrieving files
* @return * @param verbose flag for setting verbose output of the {@link AuditThread}
* @param startItem the first {@link MonitoredItem} to audit from or null to audit all items
*/ */
public static AuditThread createThread(Collection c, public static void createThread(Collection collection,
StorageDriver tri, StorageDriver driver,
boolean verbose, boolean verbose,
MonitoredItem... startItem) { MonitoredItem... startItem) {
// Note: Because we don't put the collection/thread in the map atomically, we need to lock // Note: Because we don't put the collection/thread in the map atomically, we need to lock
CollectionThreadPoolExecutor executor = getExecutor(); CollectionThreadPoolExecutor executor = getExecutor();
LOG.trace("Creating new thread for " + c.getName()); LOG.trace("Creating new thread for " + collection.getName());
AuditThread newThread; AuditThread newThread;
newThread = new AuditThread(c, tri, auditOnly, verbose, startItem); newThread = new AuditThread(collection, driver, auditOnly, verbose, startItem);
newThread.setImsHost(imsHost); newThread.setImsHost(imsHost);
newThread.setImsPort(imsPort); newThread.setImsPort(imsPort);
newThread.setTokenClassName(tokenClass); newThread.setTokenClassName(tokenClass);
KSFuture<AuditThread> f = executor.submitFileAudit(c, newThread); KSFuture<AuditThread> f = executor.submitFileAudit(collection, newThread);
if (f != null) { if (f != null) {
audits.put(c, f); audits.put(collection, f);
} }
return null;
} }
public static AuditThread getThread(Collection c) { public static AuditThread getThread(Collection c) {
......
...@@ -99,10 +99,9 @@ public final class AuditTokens extends Thread implements CancelCallback { ...@@ -99,10 +99,9 @@ public final class AuditTokens extends Thread implements CancelCallback {
/** /**
* Return a new or existing thread. New threads will start replication * Return a new or existing thread. New threads will start replication
* *
* @param c the collection to create a thread for * @param c the {@link Collection} to create an {@link AuditTokens} thread for
* @return The AuditTokens thread
*/ */
public static AuditTokens createThread(Collection c) { public static void createThread(Collection c) {
CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor(); CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor();
AuditTokens at = new AuditTokens(c); AuditTokens at = new AuditTokens(c);
at.imsHost = AuditThreadFactory.getIMS(); at.imsHost = AuditThreadFactory.getIMS();
...@@ -111,14 +110,9 @@ public final class AuditTokens extends Thread implements CancelCallback { ...@@ -111,14 +110,9 @@ public final class AuditTokens extends Thread implements CancelCallback {
// If we already submitted, return the existing thread // If we already submitted, return the existing thread
// Else add the current thread to the map // Else add the current thread to the map
if (future == null) { if (future != null) {
KSFuture<AuditTokens> previous = runningThreads.get(c);
at = getAuditTokens(previous);
} else {
runningThreads.put(c, future); runningThreads.put(c, future);
} }
return at;
} }
public static boolean isRunning(Collection c) { public static boolean isRunning(Collection c) {
......
...@@ -34,6 +34,7 @@ import com.github.benmanes.caffeine.cache.Cache; ...@@ -34,6 +34,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import edu.umiacs.ace.monitor.core.Collection; import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.Token; import edu.umiacs.ace.monitor.core.Token;
import edu.umiacs.ace.monitor.support.SubmissionResult;
import edu.umiacs.ace.token.TokenStoreEntry; import edu.umiacs.ace.token.TokenStoreEntry;
import edu.umiacs.ace.token.TokenStoreReader; import edu.umiacs.ace.token.TokenStoreReader;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor; import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
...@@ -84,7 +85,7 @@ public class IngestThreadPool { ...@@ -84,7 +85,7 @@ public class IngestThreadPool {
* @param tokenReader The token store * @param tokenReader The token store
* @param coll The collection to ingest to * @param coll The collection to ingest to
*/ */
public static void submitTokenStore(TokenStoreReader tokenReader, Collection coll) { public static SubmissionResult submitTokenStore(TokenStoreReader tokenReader, Collection coll) {
if (tokenReader == null) { if (tokenReader == null) {
throw new RuntimeException("Token file is corrupt"); throw new RuntimeException("Token file is corrupt");
} }
...@@ -101,7 +102,8 @@ public class IngestThreadPool { ...@@ -101,7 +102,8 @@ public class IngestThreadPool {
} }
batchTokens.put(tokenEntry.getIdentifiers().get(0), token); batchTokens.put(tokenEntry.getIdentifiers().get(0), token);
} }
thePool.submitTokens(batchTokens, coll);
return thePool.submitTokens(batchTokens, coll);
} }
/** /**
...@@ -112,14 +114,21 @@ public class IngestThreadPool { ...@@ -112,14 +114,21 @@ public class IngestThreadPool {
* @param tokens The tokens to ingest (mapping path to the token) * @param tokens The tokens to ingest (mapping path to the token)
* @param coll The collection to ingest to * @param coll The collection to ingest to
*/ */
public void submitTokens(Map<String, Token> tokens, Collection coll) { public SubmissionResult submitTokens(Map<String, Token> tokens, Collection coll) {
SubmissionResult result = SubmissionResult.CONFLICT;
CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor(); CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor();
// We may want to save the Future so that we can display information // We may want to save the Future so that we can display information
// about the current ingestion // about the current ingestion
KSFuture<IngestSupervisor> future = KSFuture<IngestSupervisor> future =
executor.submitIngestThread(coll, new IngestSupervisor(tokens, coll)); executor.submitIngestThread(coll, new IngestSupervisor(tokens, coll));
cache.put(coll, future);
if (future != null) {
cache.put(coll, future);
result = SubmissionResult.SUCCESS;
}
return result;
} }
public ConcurrentMap<Collection, KSFuture<IngestSupervisor>> getCache() { public ConcurrentMap<Collection, KSFuture<IngestSupervisor>> getCache() {
......
package edu.umiacs.ace.monitor.support;
/**
* Encapsulate the result when trying to submit a collection for processing. Separate from the
* {@link edu.umiacs.ace.util.CollectionThreadPoolExecutor} because that will return the future.
* This is more for the API to signal what response code we should return.
*
* @author shake
*/
public enum SubmissionResult {
SUCCESS, CONFLICT
}
...@@ -7,10 +7,16 @@ package edu.umiacs.ace.rest; ...@@ -7,10 +7,16 @@ package edu.umiacs.ace.rest;
import edu.umiacs.ace.monitor.core.Collection; import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.register.IngestThreadPool; import edu.umiacs.ace.monitor.register.IngestThreadPool;
import edu.umiacs.ace.monitor.support.SubmissionResult;
import edu.umiacs.ace.token.TokenStoreReader; import edu.umiacs.ace.token.TokenStoreReader;
import edu.umiacs.ace.util.PersistUtil; import edu.umiacs.ace.util.PersistUtil;
import java.io.IOException; import org.apache.commons.fileupload.FileItem;
import java.util.List; import org.apache.commons.fileupload.FileItemFactory;
import org.apache.commons.fileupload.FileUploadException;
import org.apache.commons.fileupload.disk.DiskFileItemFactory;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.log4j.Logger;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.POST; import javax.ws.rs.POST;
...@@ -19,12 +25,8 @@ import javax.ws.rs.PathParam; ...@@ -19,12 +25,8 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import org.apache.commons.fileupload.FileItem; import java.io.IOException;
import org.apache.commons.fileupload.FileItemFactory; import java.util.List;
import org.apache.commons.fileupload.FileUploadException;
import org.apache.commons.fileupload.disk.DiskFileItemFactory;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.log4j.Logger;
/** /**
* Service for manipulating whole token stores * Service for manipulating whole token stores
...@@ -46,7 +48,9 @@ public class TokenStoreUpload { ...@@ -46,7 +48,9 @@ public class TokenStoreUpload {
FileItemFactory factory = new DiskFileItemFactory(); FileItemFactory factory = new DiskFileItemFactory();
ServletFileUpload upload = new ServletFileUpload(factory); ServletFileUpload upload = new ServletFileUpload(factory);
List<FileItem> items = null; List<FileItem> items;
SubmissionResult subResult = null;
Response response = Response.ok().build();
try { try {
items = upload.parseRequest(request); items = upload.parseRequest(request);
LOG.info("item size: " + items.size()); LOG.info("item size: " + items.size());
...@@ -54,9 +58,16 @@ public class TokenStoreUpload { ...@@ -54,9 +58,16 @@ public class TokenStoreUpload {
LOG.info("item " + item.getSize() + " " + item.getName() + " " + item.getContentType()); LOG.info("item " + item.getSize() + " " + item.getName() + " " + item.getContentType());
Collection coll = em.find(Collection.class, collectionId); Collection coll = em.find(Collection.class, collectionId);
TokenStoreReader reader = new TokenStoreReader(item.getInputStream()); TokenStoreReader reader = new TokenStoreReader(item.getInputStream());
IngestThreadPool.submitTokenStore(reader, coll); subResult = IngestThreadPool.submitTokenStore(reader, coll);
} }
return Response.ok().build();
if (subResult == null) {
response = Response.status(Status.BAD_REQUEST).build();
} else if (subResult == SubmissionResult.CONFLICT) {
response = Response.status(Status.CONFLICT).build();
}
return response;
} }
catch (FileUploadException e) { catch (FileUploadException e) {
LOG.error("Error parsing token store upload",e); LOG.error("Error parsing token store upload",e);
......
...@@ -12,6 +12,9 @@ import static edu.umiacs.ace.util.Submittable.RunState.RUNNING; ...@@ -12,6 +12,9 @@ import static edu.umiacs.ace.util.Submittable.RunState.RUNNING;
/** /**
* KnownSubmittableFuture. Just so we have less generics going around. * KnownSubmittableFuture. Just so we have less generics going around.
* *
* Note: We could extend FutureTask here instead of in the KnownFuture - one less class to have
* floating around.
*
* Created by shake on 9/14/15. * Created by shake on 9/14/15.
*/ */
public class KSFuture<V extends Runnable> extends KnownFuture<Submittable<V>> { public class KSFuture<V extends Runnable> extends KnownFuture<Submittable<V>> {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment