Commit 5d19210e authored by Michael Ritter's avatar Michael Ritter
Browse files

Update the AuditThreadFactory to use the CollectionThreadPool

parent 709ee0ae
......@@ -31,22 +31,24 @@
package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.KnownFuture;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.Submittable;
import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.security.SecureRandom;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import static edu.umiacs.ace.util.Submittable.RunState.QUEUED;
import static edu.umiacs.ace.util.Submittable.RunState.RUNNING;
/**w
*
......@@ -55,13 +57,9 @@ import org.apache.log4j.NDC;
public class AuditThreadFactory {
private static final Logger LOG = Logger.getLogger(AuditThreadFactory.class);
private static ConcurrentHashMap<Collection, KnownFuture<Submittable>> audits
= new ConcurrentHashMap<>();
private static int max_audits = 3;
private static final ConcurrentHashMap<Collection, AuditThread> runningAudits =
new ConcurrentHashMap<Collection, AuditThread>();
private static final LinkedBlockingQueue<Runnable> blockingQueue =
new LinkedBlockingQueue<Runnable>();
private static final ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 3, 2, TimeUnit.MINUTES, blockingQueue);
private static String imsHost = null;
private static int imsPort = 8080;
private static String tokenClass = "SHA-256";
......@@ -131,8 +129,7 @@ public class AuditThreadFactory {
}
public static boolean isAuditing() {
return executor.getActiveCount() != 0;
//return !runningThreads.isEmpty();
return !audits.isEmpty();
}
/**
......@@ -147,56 +144,60 @@ public class AuditThreadFactory {
boolean verbose,
MonitoredItem... startItem) {
// Note: Because we don't put the collection/thread in the map atomically, we need to lock
CollectionThreadPoolExecutor executor = getExecutor();
LOG.trace("Creating new thread for " + c.getName());
AuditThread newThread = null;
synchronized ( runningAudits ) {
boolean contains = runningAudits.contains(c);
if (!contains) {
LOG.trace("Creating new thread for " + c.getName());
newThread = new AuditThread(c, tri, auditOnly, verbose, startItem);
newThread.setImsHost(imsHost);
newThread.setImsPort(imsPort);
newThread.setTokenClassName(tokenClass);
runningAudits.put(c, newThread);
executor.execute(newThread);
}
return newThread;
newThread = new AuditThread(c, tri, auditOnly, verbose, startItem);
newThread.setImsHost(imsHost);
newThread.setImsPort(imsPort);
newThread.setTokenClassName(tokenClass);
KnownFuture<Submittable> f = executor.submitFileAudit(c, newThread);
if (f != null) {
audits.put(c, f);
}
return null;
}
public static AuditThread getThread(Collection c) {
AuditThread thread;
thread = runningAudits.get(c);
return thread;
KnownFuture<Submittable> future = audits.get(c);
if (future != null) {
return (AuditThread) future.getKnownResult().getThread();
}
return null;
}
public static final boolean isQueued(Collection c) {
return blockingQueue.contains(getThread(c));
return checkState(c, QUEUED);
}
public static final boolean isRunning(Collection c) {
AuditThread thread = getThread(c);
return thread != null && !blockingQueue.contains(thread);
return checkState(c, RUNNING);
}
private static boolean checkState(Collection c, Submittable.RunState state) {
KnownFuture<Submittable> future = audits.get(c);
if (future != null) {
// purge
if (future.isDone()) {
audits.remove(c);
}
Submittable s = future.getKnownResult();
if (s != null) {
return s.getState() == state;
}
}
return false;
}
static void cancellAll() {
LOG.info("Shutting down audits");
/*
for ( AuditThread thread : runningAudits.values() ) {
LOG.info("Canceling threads");
thread.cancel();
for (KnownFuture<Submittable> submittable : audits.values()) {
submittable.cancel(true);
}
*/
executor.shutdown();
if ( !executor.isTerminated() ) {
LOG.info("Shutting down Executor");
executor.shutdownNow();
}
// Hm... this seems to work... still need more testing
for ( Runnable r : blockingQueue ) {
r = null;
}
runningAudits.clear();
blockingQueue.clear();
}
public static int getMaxAudits() {
......@@ -209,8 +210,6 @@ public class AuditThreadFactory {
return;
}
AuditThreadFactory.max_audits = max_audits;
executor.setCorePoolSize(max_audits);
executor.setMaximumPoolSize(max_audits*2);
}
public static void setSSL(Boolean ssl) {
......@@ -243,20 +242,24 @@ public class AuditThreadFactory {
// Clean up everything which may contain a reference to the thread
// Thread will only ever be removed once, so no need to worry about
// race conditions
AuditThread thread = runningAudits.remove(c);
if ( thread != null ) {
LOG.debug("Removing old audit thread from thread pool executor");
executor.remove(thread);
blockingQueue.remove(thread);
runningAudits.remove(c);
thread = null;
}
// AuditThread thread = runningAudits.remove(c);
// if ( thread != null ) {
// LOG.debug("Removing old audit thread from thread pool executor");
// executor.remove(thread);
// blockingQueue.remove(thread);
// runningAudits.remove(c);
// thread = null;
// }
}
public static boolean useSSL() {
return AuditThreadFactory.ssl;
}
private static CollectionThreadPoolExecutor getExecutor() {
return CollectionThreadPoolExecutor.getExecutor();
}
private static MonitoredItem[] getSampledList(Collection c) {
EntityManager em = PersistUtil.getEntityManager();
Query q = em.createNamedQuery("MonitoredItem.listIds");
......@@ -275,4 +278,16 @@ public class AuditThreadFactory {
return items.toArray(new MonitoredItem[items.size()]);
}
public static void cancel(Collection collection) {
LOG.info("Cancelling audit on " + collection.getName());
KnownFuture<Submittable> future = audits.get(collection);
if (future != null) {
Submittable result = future.getKnownResult();
if (result != null) {
AuditThread thread = (AuditThread) result.getThread();
thread.cancel();
}
future.cancel(true);
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment