Commit 709ee0ae authored by Michael Ritter's avatar Michael Ritter
Browse files

Return Futures when submitting

parent cd5e4ece
......@@ -4,13 +4,20 @@ import edu.umiacs.ace.monitor.audit.AuditThread;
import edu.umiacs.ace.monitor.audit.AuditTokens;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.register.IngestThreadPool;
import edu.umiacs.ace.monitor.settings.SettingsConstants;
import edu.umiacs.ace.monitor.settings.SettingsParameter;
import edu.umiacs.ace.monitor.settings.SettingsUtil;
import org.apache.log4j.Logger;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static edu.umiacs.ace.monitor.settings.SettingsConstants.PARAM_THROTTLE_MAXAUDIT;
/**
* ThreadPoolExecutor for managing the various types of threads which can be run on collections.
* Each thread can mutate the collection, so we use this as a way to control which types of threads
......@@ -19,8 +26,10 @@ import java.util.concurrent.TimeUnit;
* Created by shake on 9/8/15.
*/
public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger LOG = Logger.getLogger(CollectionThreadPoolExecutor.class);
private final Set<Submittable> set = new ConcurrentSkipListSet<Submittable>();
private static final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
private final Set<Submittable> set = new ConcurrentSkipListSet<>();
public CollectionThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
......@@ -30,30 +39,80 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// Initialization
static class ExecutorHolder {
static final CollectionThreadPoolExecutor INSTANCE =
createThreadPool();
private static CollectionThreadPoolExecutor createThreadPool() {
SettingsParameter attr = SettingsUtil.getItemByAttr(PARAM_THROTTLE_MAXAUDIT);
int max = Integer.parseInt(attr.getValue());
if (max <= 0) {
max = Integer.parseInt(SettingsConstants.maxAudit);
}
return new CollectionThreadPoolExecutor(max, max, 1, TimeUnit.MINUTES, queue);
}
}
public static CollectionThreadPoolExecutor getExecutor() {
return ExecutorHolder.INSTANCE;
}
// Public interface to interact with the TPE
public boolean submitFileAudit(Collection c, AuditThread at) {
Submittable submittable = new Submittable(c, RunType.FILE_AUDIT);
public KnownFuture<Submittable> submitFileAudit(Collection c, AuditThread at) {
Submittable submittable = new Submittable(c, Submittable.RunType.FILE_AUDIT, at);
return submitThread(at, submittable);
}
public boolean submitIngestThread(Collection c, IngestThreadPool.IngestSupervisor is) {
Submittable submittable = new Submittable(c, RunType.TOKEN_INGEST);
public KnownFuture<Submittable> submitIngestThread(Collection c, IngestThreadPool.IngestSupervisor is) {
Submittable submittable = new Submittable(c, Submittable.RunType.TOKEN_INGEST, is);
return submitThread(is, submittable);
}
public boolean submitTokenAduit(Collection c, AuditTokens at) {
Submittable submittable = new Submittable(c, RunType.TOKEN_AUDIT);
public KnownFuture<Submittable> submitTokenAduit(Collection c, AuditTokens at) {
Submittable submittable = new Submittable(c, Submittable.RunType.TOKEN_AUDIT, at);
return submitThread(at, submittable);
}
private boolean submitThread(Runnable r, Submittable s) {
public Runnable getRunnableForSubmittable(Collection c, Submittable.RunType t) {
// hm...
Submittable s = new Submittable(c, t, null);
for (Submittable submittable : set) {
if (submittable.equals(s)) {
return submittable.getThread();
}
}
return null;
}
public Submittable getSubmission(Collection c, Submittable.RunType t) {
Submittable s = new Submittable(c, t, null);
for (Submittable submittable : set) {
if (submittable.equals(s)) {
return submittable;
}
}
return null;
}
/**
* Method for submitting threads. We wrap it in a KnownFuture so that we
* can extract the Submittable without blocking on the thread.
*
* @param r The runnable to submit
* @param s The submittable to use
* @return true if a new thread was created, false otherwise
*/
private KnownFuture<Submittable> submitThread(Runnable r, Submittable s) {
if (set.add(s)) {
this.execute(new KnownFuture<Submittable>(r, s));
return true;
KnownFuture<Submittable> f = new KnownFuture<>(r, s);
execute(f);
return f;
}
return false;
return null;
}
// Overrides to allow us to display information about running threads in ACE
......@@ -67,7 +126,7 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
if (known instanceof Submittable) {
Submittable s = (Submittable) known;
s.setState(RunState.RUNNING);
s.setState(Submittable.RunState.RUNNING);
}
}
......@@ -86,46 +145,4 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
}
}
private enum RunType {
FILE_AUDIT, TOKEN_AUDIT, TOKEN_INGEST
}
public enum RunState {
QUEUED, RUNNING
}
private class Submittable {
final Collection collection;
final RunType type;
RunState state;
private Submittable(Collection collection, RunType type) {
this.collection = collection;
this.type = type;
this.state = RunState.QUEUED;
}
public void setState(RunState state) {
this.state = state;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Submittable that = (Submittable) o;
if (!collection.equals(that.collection)) return false;
return type == that.type;
}
@Override
public int hashCode() {
int result = collection.hashCode();
result = 31 * result + type.hashCode();
return result;
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment