Commit f76bebe1 authored by Michael Ritter's avatar Michael Ritter
Browse files

Migrate to KSFuture

parent 90ae78a1
......@@ -34,7 +34,7 @@ import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.KnownFuture;
import edu.umiacs.ace.util.KSFuture;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.Submittable;
import edu.umiacs.util.Strings;
......@@ -57,7 +57,7 @@ import static edu.umiacs.ace.util.Submittable.RunState.RUNNING;
public class AuditThreadFactory {
private static final Logger LOG = Logger.getLogger(AuditThreadFactory.class);
private static ConcurrentHashMap<Collection, KnownFuture<Submittable>> audits
private static ConcurrentHashMap<Collection, KSFuture<AuditThread>> audits
= new ConcurrentHashMap<>();
private static int max_audits = 3;
private static String imsHost = null;
......@@ -146,12 +146,12 @@ public class AuditThreadFactory {
// Note: Because we don't put the collection/thread in the map atomically, we need to lock
CollectionThreadPoolExecutor executor = getExecutor();
LOG.trace("Creating new thread for " + c.getName());
AuditThread newThread = null;
AuditThread newThread;
newThread = new AuditThread(c, tri, auditOnly, verbose, startItem);
newThread.setImsHost(imsHost);
newThread.setImsPort(imsPort);
newThread.setTokenClassName(tokenClass);
KnownFuture<Submittable> f = executor.submitFileAudit(c, newThread);
KSFuture f = executor.submitFileAudit(c, newThread);
if (f != null) {
audits.put(c, f);
}
......@@ -160,9 +160,9 @@ public class AuditThreadFactory {
}
public static AuditThread getThread(Collection c) {
KnownFuture<Submittable> future = audits.get(c);
KSFuture<AuditThread> future = audits.get(c);
if (future != null) {
return (AuditThread) future.getKnownResult().getThread();
return future.getKnownResult().getThread();
}
return null;
......@@ -177,7 +177,7 @@ public class AuditThreadFactory {
}
private static boolean checkState(Collection c, Submittable.RunState state) {
KnownFuture<Submittable> future = audits.get(c);
KSFuture<AuditThread> future = audits.get(c);
if (future != null) {
// purge
if (future.isDone()) {
......@@ -195,7 +195,7 @@ public class AuditThreadFactory {
static void cancellAll() {
LOG.info("Shutting down audits");
for (KnownFuture<Submittable> submittable : audits.values()) {
for (KSFuture submittable : audits.values()) {
submittable.cancel(true);
}
}
......@@ -268,7 +268,7 @@ public class AuditThreadFactory {
int size = (int) Math.ceil(Math.sqrt(itemIds.size()));
SecureRandom rand = new SecureRandom();
List<MonitoredItem> items = new LinkedList<MonitoredItem>();
List<MonitoredItem> items = new LinkedList<>();
for ( int i=0;i < size; i++) {
int idxToGet = rand.nextInt(itemIds.size());
MonitoredItem item = em.find(MonitoredItem.class, itemIds.remove(idxToGet));
......@@ -280,11 +280,11 @@ public class AuditThreadFactory {
public static void cancel(Collection collection) {
LOG.info("Cancelling audit on " + collection.getName());
KnownFuture<Submittable> future = audits.get(collection);
KSFuture<AuditThread> future = audits.get(collection);
if (future != null) {
Submittable result = future.getKnownResult();
Submittable<AuditThread> result = future.getKnownResult();
if (result != null) {
AuditThread thread = (AuditThread) result.getThread();
AuditThread thread = result.getThread();
thread.cancel();
}
future.cancel(true);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment