Commit c1564eab authored by Michael Ritter's avatar Michael Ritter
Browse files

Simplify the CollectionThreadPool

parent 93322b1e
......@@ -3,7 +3,7 @@ package edu.umiacs.ace.util;
import edu.umiacs.ace.monitor.audit.AuditThread;
import edu.umiacs.ace.monitor.audit.AuditTokens;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.register.IngestThreadPool;
import edu.umiacs.ace.monitor.register.IngestSupervisor;
import edu.umiacs.ace.monitor.settings.SettingsConstants;
import edu.umiacs.ace.monitor.settings.SettingsParameter;
import edu.umiacs.ace.monitor.settings.SettingsUtil;
......@@ -17,12 +17,13 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static edu.umiacs.ace.monitor.settings.SettingsConstants.PARAM_THROTTLE_MAXAUDIT;
import static edu.umiacs.ace.util.Submittable.RunType;
/**
* ThreadPoolExecutor for managing the various types of threads which can be run on collections.
* Each thread can mutate the collection, so we use this as a way to control which types of threads
* get run and when.
*
* <p/>
* Created by shake on 9/8/15.
*/
public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
......@@ -60,48 +61,50 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
// Public interface to interact with the TPE
/**
* Submit a File Audit to run for a collection
*
* @param c the collection to audit
* @param at the audit thread to use
* @return {@link KSFuture} containing the submitted request
*/
public KSFuture submitFileAudit(Collection c, AuditThread at) {
LOG.info("Submitting [FILE_AUDIT]" + " for " + c.getName());
Submittable submittable = new Submittable(c, Submittable.RunType.FILE_AUDIT, at);
Submittable submittable = new Submittable(c, RunType.FILE_AUDIT, at);
return submitThread(at, submittable, c);
}
public KSFuture submitIngestThread(Collection c, IngestThreadPool.IngestSupervisor is) {
Submittable submittable = new Submittable(c, Submittable.RunType.TOKEN_INGEST, is);
/**
* Submit a request to Ingest Tokens for a collection
*
* @param c the collection to use
* @param is the IngestSupervisor to run
* @return
* @return {@link KSFuture} containing the submitted request
*/
public KSFuture submitIngestThread(Collection c, IngestSupervisor is) {
LOG.info("Submitting [INGEST_THREAD]" + " for " + c.getName());
Submittable submittable = new Submittable(c, RunType.TOKEN_INGEST, is);
return submitThread(is, submittable, c);
}
/**
* Submit a TokenAudit to run for a collection
*
* @param c the collection to audit
* @param at the audit thread to use
* @return {@link KSFuture} containing the submitted request
*/
public KSFuture submitTokenAudit(Collection c, AuditTokens at) {
LOG.info("Submitting [TOKEN_AUDIT]" + " for " + c.getName());
Submittable submittable = new Submittable(c, Submittable.RunType.TOKEN_AUDIT, at);
Submittable submittable = new Submittable(c, RunType.TOKEN_AUDIT, at);
return submitThread(at, submittable, c);
}
public Runnable getRunnableForSubmittable(Collection c, Submittable.RunType t) {
// hm...
Submittable s = new Submittable(c, t, null);
for (Submittable submittable : set) {
if (submittable.equals(s)) {
return submittable.getThread();
}
}
return null;
}
public Submittable getSubmission(Collection c, Submittable.RunType t) {
Submittable s = new Submittable(c, t, null);
for (Submittable submittable : set) {
if (submittable.equals(s)) {
return submittable;
}
}
return null;
}
/**
* Method for submitting threads. We wrap it in a KnownFuture so that we
* can extract the Submittable without blocking on the thread.
* can extract the Submittable without blocking on the thread and to allow
* locking on each collection.
*
* @param r The runnable to submit
* @param s The submittable to use
......@@ -114,40 +117,34 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
return f;
}
Submittable.RunType t = s.getType();
RunType t = s.getType();
LOG.info("Already submitted RunType [" + t.name() + "]" + " for " + c.getName());
return null;
}
// Overrides to allow us to display information about running threads in ACE
protected void beforeExecute(Thread t, Runnable r) {
// Find our Submittable and update the state
// Type erasure :(
if (r instanceof KnownFuture) {
KnownFuture future = (KnownFuture) r;
Object known = future.getKnownResult();
if (known instanceof Submittable) {
Submittable s = (Submittable) known;
s.setState(Submittable.RunState.RUNNING);
}
}
super.beforeExecute(t, r);
}
/**
* Remove any {@link Submittable} from the working set
*
* @param r
* @param t
*/
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (r instanceof KnownFuture) {
KnownFuture future = (KnownFuture) r;
if (r instanceof KSFuture) {
KSFuture future = (KSFuture) r;
Object known = future.getKnownResult();
if (known instanceof Submittable) {
set.remove(known);
Submittable s = (Submittable) known;
set.remove(s);
}
}
if (t != null) {
LOG.error("semaphore error", t);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment