Commit f68e8cfb authored by Michael Ritter's avatar Michael Ritter
Browse files

Use the CollectionThreadPool for Token Ingest threads

parent 7ee66e80
......@@ -30,27 +30,18 @@
package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.audit.AuditThreadFactory;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.Token;
import edu.umiacs.ace.monitor.settings.SettingsConstants;
import edu.umiacs.ace.token.TokenStoreEntry;
import edu.umiacs.ace.token.TokenStoreReader;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.TokenUtil;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* TODO - Possibly make a LinkedBlockingQueue that threads can poll from
......@@ -65,14 +56,9 @@ public class IngestThreadPool {
// The single shared instance of this pool.
private static final IngestThreadPool instance = new IngestThreadPool();
private static final Logger LOG = Logger.getLogger(IngestThreadPool.class);
// Collections that currently have an IngestSupervisor running.
private static CopyOnWriteArraySet<Collection> collections;
// Token keys already handed to a supervisor, grouped by collection.
private static Map<Collection, Set<String>> hasSeen;
// Pool on which supervisors run their IngestDirectory and IngestThread tasks.
private static ForkJoinPool forkJoinPool;
// Thread count for the supervisor executor, parsed from the settings value.
private static int maxThreads =
        Integer.parseInt(SettingsConstants.maxIngestThreads);
// Executor that runs the IngestSupervisor tasks.
private static ThreadPoolExecutor executor;
// Work queue backing the supervisor executor. Typed (instead of raw) so the
// compiler checks that only Runnables are queued, as ThreadPoolExecutor expects.
private static LinkedBlockingQueue<Runnable> supervisorQueue =
        new LinkedBlockingQueue<>();

// Private so that no other class can construct a pool.
private IngestThreadPool() {
}
......@@ -81,17 +67,8 @@ public class IngestThreadPool {
// We instantiate here to ensure 2 things:
// 1 - We use the correct number for max threads from the DB
// 2 - Java will throw an exception otherwise
if (executor == null) {
executor = new ThreadPoolExecutor(maxThreads, maxThreads, 5, TimeUnit.MINUTES, supervisorQueue);
}
if (hasSeen == null) {
hasSeen = new HashMap<Collection, Set<String>>();
}
if (forkJoinPool == null) {
forkJoinPool = new ForkJoinPool();
}
if (collections == null) {
collections = new CopyOnWriteArraySet<Collection>();
hasSeen = new HashMap<>();
}
return instance;
......@@ -123,27 +100,6 @@ public class IngestThreadPool {
thePool.submitTokens(batchTokens, coll);
}
/**
 * Reports whether the given collection is in the set of collections that
 * are currently being ingested.
 *
 * @param collection the collection to look up
 * @return true when the collection is present in the active set
 */
public static boolean isIngesting(Collection collection) {
    final boolean ingesting = collections.contains(collection);
    return ingesting;
}
/**
 * Blocks the calling thread until the given collection is no longer in the
 * set of collections being ingested, re-checking every 200 milliseconds.
 * If the thread is interrupted while waiting, the interrupt status is
 * restored and the method returns early, possibly before ingestion has
 * finished.
 *
 * @param collection the collection whose ingestion to wait for
 */
public static void awaitIngestionComplete(Collection collection) {
    while (collections.contains(collection)) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            LOG.error("Sleep Interrupted, returning");
            // Catching InterruptedException clears the interrupt flag; set it
            // again so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
/**
* Submit a batch of tokens to be ingested
* TODO: This can be memory intensive for large amounts of tokens,
......@@ -153,20 +109,19 @@ public class IngestThreadPool {
* @param coll The collection to ingest to
*/
public void submitTokens(Map<String, Token> tokens, Collection coll) {
// Only log an error when the collection is queued for, or running, an audit.
if (AuditThreadFactory.isQueued(coll) || AuditThreadFactory.isRunning(coll)) {
LOG.error("Cannot ingest tokens for a collection which is auditing");
// Only log an error when the static executor's active thread count equals its maximum pool size.
} else if (executor.getActiveCount() == executor.getMaximumPoolSize()) {
LOG.error("Executor has reached it's maximum limit");
} else {
// Hand a new supervisor for this batch to the static executor.
executor.execute(new IngestSupervisor(tokens, coll));
}
// NOTE(review): the statements below run unconditionally after the if/else
// chain above, so one batch can be handed to both executors. This looks like
// the pre-change and post-change bodies shown together - confirm which one is
// intended. From here on the local variable shadows the static executor field.
CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor();
// We may want to save the Future so that we can display information
// about the current ingestion
executor.submitIngestThread(coll, new IngestSupervisor(tokens, coll));
}
// TODO: Figure out how to display this... probably will move to the status display like audits
public String getStatus() {
// NOTE(review): the next line is an unterminated statement followed directly
// by a second return; it looks like the pre-change line left next to its
// replacement - confirm and drop one of them.
return String.format("[Thread Pool] Active: %d, Completed: %d, Total: %d",
// This return yields the fixed text only; the counter arguments that follow
// sit inside the block comment opened on the same line.
return String.format("[Thread Pool] Active"); /*: %d, Completed: %d, Total: %d",
executor.getActiveCount(),
executor.getCompletedTaskCount(),
executor.getTaskCount());
executor.getTaskCount());*/
}
// Not entirely accurate for the jsp, but it'll show what collections are
......@@ -181,54 +136,12 @@ public class IngestThreadPool {
/**
 * Called to stop the ingest pools. As written it only emits a debug log
 * line; the executor shutdown calls below are commented out.
 */
protected static void shutdownPools() {
    final String message = "[Ingest] Shutting down thread pools.";
    LOG.debug(message);
    /*
    executor.shutdown();
    if (!executor.isTerminated()) {
    executor.shutdownNow();
    }
    */
}
/**
 * Supervises the ingestion of one batch of tokens for one collection.
 * While it runs, the collection is listed in the set of ingesting
 * collections; token keys already seen for the collection are dropped from
 * the batch before the file ingest task is submitted.
 */
public class IngestSupervisor implements Runnable {
    private final Map<String, Token> tokens;
    private final Collection coll;

    /**
     * @param tokens the batch to ingest, keyed by token key; keys that were
     *               already seen for the collection are removed from this map
     *               when the supervisor runs
     * @param coll   the collection the tokens belong to
     */
    public IngestSupervisor(final Map<String, Token> tokens, final Collection coll) {
        this.tokens = tokens;
        this.coll = coll;
    }

    /**
     * Submits a directory task and a file task to the fork/join pool and
     * waits for both. The collection is removed from the ingesting set when
     * the method exits, whether normally or through an exception.
     */
    @Override
    public void run() {
        collections.add(coll);
        try {
            // Give the directory task its own copy of the keys: keySet() is a
            // live view of the map, and the map is filtered below while that
            // task may already be running on another thread.
            Set<String> allKeys = new HashSet<String>(tokens.keySet());
            ForkJoinTask<?> dirTask = forkJoinPool.submit(new IngestDirectory(allKeys, coll));

            // Remove any tokens we've already seen and can possibly be in progress
            // Possibly release tokens after the thread has finished merging them
            // hasSeen is a plain HashMap shared by every supervisor thread, so
            // the lookup, filter and update are done under one lock.
            synchronized (hasSeen) {
                Set<String> tokensSeen = hasSeen.get(coll);
                if (tokensSeen == null) {
                    tokensSeen = new HashSet<String>();
                } else {
                    tokens.keySet().removeAll(tokensSeen);
                }
                tokensSeen.addAll(tokens.keySet());
                hasSeen.put(coll, tokensSeen);
            }

            // Hand the remaining tokens, with a list of their keys, to the
            // file ingest task.
            List<String> keyList = new ArrayList<String>(tokens.keySet());
            ForkJoinTask<?> fileTask = forkJoinPool.submit(new IngestThread(tokens, coll, keyList));

            dirTask.quietlyJoin();
            fileTask.quietlyJoin();
        } finally {
            collections.remove(coll);
        }
    }
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment