Commit ba92b7b1 authored by Michael Ritter's avatar Michael Ritter
Browse files

Merge branch 'feature/collection-thread-pool' into 'develop'

Feature/collection-thread-pool

This adds a new model for how we control threads when operating on collections. So any file audit/token audit/token registration will be controlled by the same source.

See merge request !1
parents cefe3653 bb39ed5d
......@@ -43,6 +43,7 @@
</configuration>
</plugin>
<!-- debian package made easy -->
<!--
<plugin>
<artifactId>jdeb</artifactId>
<groupId>org.vafer</groupId>
......@@ -78,6 +79,7 @@
</execution>
</executions>
</plugin>
-->
</plugins>
</build>
......@@ -89,7 +91,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>catalina</artifactId>
......@@ -116,11 +117,6 @@
</dependency>
<!-- for json -->
<!--<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.0.1</version>
</dependency>-->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
......@@ -162,22 +158,32 @@
<version>0.4.1</version>
</dependency>
<!-- Various libs for helping with stuff -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>quartz</groupId>
<artifactId>quartz</artifactId>
......
......@@ -183,11 +183,21 @@ public final class AuditThread extends Thread implements CancelCallback {
@Override
public void cancel() {
LOG.info("Received cancel for audit " + coll.getName());
cancel = true;
if (iterableItems != null) {
iterableItems.cancel();
}
if ( AuditThreadFactory.isRunning(coll) || AuditThreadFactory.isQueued(coll)) {
if (batch != null) {
batch.close();
}
if (validator != null) {
validator.close();
}
if (AuditThreadFactory.isRunning(coll) || AuditThreadFactory.isQueued(coll)) {
AuditThreadFactory.finished(coll);
}
this.interrupt();
......@@ -207,10 +217,12 @@ public final class AuditThread extends Thread implements CancelCallback {
session = System.currentTimeMillis();
logManager = new LogEventManager(session, coll);
logAuditStart();
/*
while (IngestThreadPool.isIngesting(coll)) {
LOG.debug("Waiting for ingest to finish");
Thread.sleep(500);
}
*/
callback = new FileAuditCallback(coll, session, this);
boolean auditTokens = SettingsUtil.getBoolean(coll,
......
......@@ -31,22 +31,24 @@
package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.KSFuture;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.Submittable;
import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.security.SecureRandom;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import static edu.umiacs.ace.util.Submittable.RunState.QUEUED;
import static edu.umiacs.ace.util.Submittable.RunState.RUNNING;
/**w
*
......@@ -55,13 +57,9 @@ import org.apache.log4j.NDC;
public class AuditThreadFactory {
private static final Logger LOG = Logger.getLogger(AuditThreadFactory.class);
private static ConcurrentHashMap<Collection, KSFuture<AuditThread>> audits
= new ConcurrentHashMap<>();
private static int max_audits = 3;
private static final ConcurrentHashMap<Collection, AuditThread> runningAudits =
new ConcurrentHashMap<Collection, AuditThread>();
private static final LinkedBlockingQueue<Runnable> blockingQueue =
new LinkedBlockingQueue<Runnable>();
private static final ThreadPoolExecutor executor =
new ThreadPoolExecutor(3, 3, 2, TimeUnit.MINUTES, blockingQueue);
private static String imsHost = null;
private static int imsPort = 8080;
private static String tokenClass = "SHA-256";
......@@ -131,8 +129,7 @@ public class AuditThreadFactory {
}
public static boolean isAuditing() {
return executor.getActiveCount() != 0;
//return !runningThreads.isEmpty();
return !audits.isEmpty();
}
/**
......@@ -147,56 +144,60 @@ public class AuditThreadFactory {
boolean verbose,
MonitoredItem... startItem) {
// Note: Because we don't put the collection/thread in the map atomically, we need to lock
AuditThread newThread = null;
synchronized ( runningAudits ) {
boolean contains = runningAudits.contains(c);
if (!contains) {
LOG.trace("Creating new thread for " + c.getName());
newThread = new AuditThread(c, tri, auditOnly, verbose, startItem);
newThread.setImsHost(imsHost);
newThread.setImsPort(imsPort);
newThread.setTokenClassName(tokenClass);
runningAudits.put(c, newThread);
executor.execute(newThread);
}
return newThread;
CollectionThreadPoolExecutor executor = getExecutor();
LOG.trace("Creating new thread for " + c.getName());
AuditThread newThread;
newThread = new AuditThread(c, tri, auditOnly, verbose, startItem);
newThread.setImsHost(imsHost);
newThread.setImsPort(imsPort);
newThread.setTokenClassName(tokenClass);
KSFuture f = executor.submitFileAudit(c, newThread);
if (f != null) {
audits.put(c, f);
}
return null;
}
public static AuditThread getThread(Collection c) {
AuditThread thread;
thread = runningAudits.get(c);
return thread;
KSFuture<AuditThread> future = audits.get(c);
if (future != null) {
return future.getKnownResult().getThread();
}
return null;
}
public static final boolean isQueued(Collection c) {
return blockingQueue.contains(getThread(c));
return checkState(c, QUEUED);
}
public static final boolean isRunning(Collection c) {
AuditThread thread = getThread(c);
return thread != null && !blockingQueue.contains(thread);
return checkState(c, RUNNING);
}
private static boolean checkState(Collection c, Submittable.RunState state) {
KSFuture<AuditThread> future = audits.get(c);
if (future != null) {
// purge
if (future.isDone()) {
audits.remove(c);
}
Submittable s = future.getKnownResult();
if (s != null) {
return s.getState() == state;
}
}
return false;
}
static void cancellAll() {
LOG.info("Shutting down audits");
/*
for ( AuditThread thread : runningAudits.values() ) {
LOG.info("Canceling threads");
thread.cancel();
for (KSFuture submittable : audits.values()) {
submittable.cancel(true);
}
*/
executor.shutdown();
if ( !executor.isTerminated() ) {
LOG.info("Shutting down Executor");
executor.shutdownNow();
}
// Hm... this seems to work... still need more testing
for ( Runnable r : blockingQueue ) {
r = null;
}
runningAudits.clear();
blockingQueue.clear();
}
public static int getMaxAudits() {
......@@ -209,8 +210,6 @@ public class AuditThreadFactory {
return;
}
AuditThreadFactory.max_audits = max_audits;
executor.setCorePoolSize(max_audits);
executor.setMaximumPoolSize(max_audits*2);
}
public static void setSSL(Boolean ssl) {
......@@ -243,20 +242,25 @@ public class AuditThreadFactory {
// Clean up everything which may contain a reference to the thread
// Thread will only ever be removed once, so no need to worry about
// race conditions
AuditThread thread = runningAudits.remove(c);
if ( thread != null ) {
LOG.debug("Removing old audit thread from thread pool executor");
executor.remove(thread);
blockingQueue.remove(thread);
runningAudits.remove(c);
thread = null;
}
// AuditThread thread = runningAudits.remove(c);
// if ( thread != null ) {
// LOG.debug("Removing old audit thread from thread pool executor");
// executor.remove(thread);
// blockingQueue.remove(thread);
// runningAudits.remove(c);
// thread = null;
// }
KSFuture<AuditThread> future = audits.remove(c);
}
public static boolean useSSL() {
return AuditThreadFactory.ssl;
}
private static CollectionThreadPoolExecutor getExecutor() {
return CollectionThreadPoolExecutor.getExecutor();
}
private static MonitoredItem[] getSampledList(Collection c) {
EntityManager em = PersistUtil.getEntityManager();
Query q = em.createNamedQuery("MonitoredItem.listIds");
......@@ -265,7 +269,7 @@ public class AuditThreadFactory {
int size = (int) Math.ceil(Math.sqrt(itemIds.size()));
SecureRandom rand = new SecureRandom();
List<MonitoredItem> items = new LinkedList<MonitoredItem>();
List<MonitoredItem> items = new LinkedList<>();
for ( int i=0;i < size; i++) {
int idxToGet = rand.nextInt(itemIds.size());
MonitoredItem item = em.find(MonitoredItem.class, itemIds.remove(idxToGet));
......@@ -275,4 +279,16 @@ public class AuditThreadFactory {
return items.toArray(new MonitoredItem[items.size()]);
}
public static void cancel(Collection collection) {
LOG.info("Cancelling audit on " + collection.getName());
KSFuture<AuditThread> future = audits.get(collection);
if (future != null) {
Submittable<AuditThread> result = future.getKnownResult();
if (result != null) {
AuditThread thread = result.getThread();
thread.cancel();
}
future.cancel(true);
}
}
}
......@@ -32,23 +32,26 @@ package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.ims.api.IMSService;
import edu.umiacs.ace.ims.api.TokenValidator;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.MonitoredItemManager;
import edu.umiacs.ace.monitor.log.LogEventManager;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.log.LogEnum;
import edu.umiacs.ace.monitor.log.LogEventManager;
import edu.umiacs.ace.token.AceToken;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.KSFuture;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.Submittable;
import edu.umiacs.ace.util.TokenUtil;
import edu.umiacs.util.Strings;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import org.apache.log4j.Logger;
import java.security.MessageDigest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
......@@ -56,8 +59,8 @@ import org.apache.log4j.Logger;
*/
public final class AuditTokens extends Thread implements CancelCallback {
private static final Map<Collection, AuditTokens> runningThreads =
new HashMap<Collection, AuditTokens>();
private static final Map<Collection, KSFuture<AuditTokens>> runningThreads =
new ConcurrentHashMap<>();
// private Map<TokenResponse, Token> tokenMap = new ConcurrentHashMap<TokenResponse, Token>();
private Map<AceToken, MonitoredItem> itemMap =
new ConcurrentHashMap<AceToken, MonitoredItem>();
......@@ -78,7 +81,7 @@ public final class AuditTokens extends Thread implements CancelCallback {
this.collection = collection;
session = System.currentTimeMillis();
logManager = new LogEventManager(session, collection);
this.start();
// this.start();
}
/**
......@@ -91,7 +94,8 @@ public final class AuditTokens extends Thread implements CancelCallback {
public static AuditTokens getThread( Collection c ) {
synchronized ( runningThreads ) {
if ( isRunning(c) ) {
return runningThreads.get(c);
KSFuture<AuditTokens> future = runningThreads.get(c);
return getAuditTokens(future);
}
}
return null;
......@@ -101,19 +105,25 @@ public final class AuditTokens extends Thread implements CancelCallback {
* Return a new or existing thread. New threads will start replication
*
* @param c
* @param tri
* @return
*/
public static AuditTokens createThread( Collection c ) {
synchronized ( runningThreads ) {
if ( !runningThreads.containsKey(c) ) {
AuditTokens at = new AuditTokens(c);
at.imsHost = AuditThreadFactory.getIMS();
at.imsPort = AuditThreadFactory.getImsPort();
runningThreads.put(c, at);
}
return runningThreads.get(c);
CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor();
AuditTokens at = new AuditTokens(c);
at.imsHost = AuditThreadFactory.getIMS();
at.imsPort = AuditThreadFactory.getImsPort();
KSFuture<AuditTokens> future = executor.submitTokenAudit(c, at);
// If we already submitted, return the existing thread
// Else add the current thread to the map
if (future == null) {
KSFuture<AuditTokens> previous = runningThreads.get(c);
at = getAuditTokens(previous);
} else {
runningThreads.put(c, future);
}
return at;
}
public static final boolean isRunning( Collection c ) {
......@@ -129,11 +139,18 @@ public final class AuditTokens extends Thread implements CancelCallback {
}
static void cancellAll() {
for ( AuditTokens at : runningThreads.values() ) {
at.cancel();
for ( KSFuture<AuditTokens> at : runningThreads.values() ) {
AuditTokens thread = getAuditTokens(at);
thread.cancel();
at.cancel(true);
}
}
public static AuditTokens getAuditTokens(KSFuture<AuditTokens> future) {
Submittable<AuditTokens> result = future.getKnownResult();
return result.getThread();
}
public long getTotalErrors() {
if ( callback == null ) {
return 0;
......
......@@ -30,14 +30,15 @@
// $Id$
package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.util.EntityManagerServlet;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.util.EntityManagerServlet;
import edu.umiacs.util.Strings;
import java.io.IOException;
import javax.persistence.EntityManager;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
*
......@@ -71,10 +72,7 @@ public final class StopSyncServlet extends EntityManagerServlet {
AuditTokens.getThread(collection).cancel();
}
} else {
if ( AuditThreadFactory.isRunning(collection) ||
AuditThreadFactory.isQueued(collection) ) {
AuditThreadFactory.getThread(collection).cancel();
}
AuditThreadFactory.cancel(collection);
}
response.sendRedirect("Status?collectionid=" + collectionId);
......
......@@ -148,23 +148,18 @@ public class Collection implements Serializable {
@Override
public int hashCode() {
int hash = 0;
hash += (id != null ? id.hashCode() : 0);
return hash;
return id != null ? id.hashCode() : 0;
}
@Override
public boolean equals( Object object ) {
if ( !(object instanceof Collection) ) {
return false;
}
Collection other = (Collection) object;
if ( (this.id == null && other.id != null) || (this.id != null && !this.id.equals(
other.id)) ) {
return false;
}
return true;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Collection that = (Collection) o;
// auto gen loves nots apparently
return !(id != null ? !id.equals(that.id) : that.id != null);
}
@Override
......
package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.Token;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
/**
* A private class to supervise token ingestion. We use it to keep track of
* what collections we have seen
*/
public class IngestSupervisor implements Runnable {
private static final Logger LOG = Logger.getLogger(IngestSupervisor.class);
private static Map<Collection, Set<String>> hasSeen = new HashMap<>();
private final Map<String, Token> tokens;
private final Collection coll;
private final ForkJoinPool pool;
public IngestSupervisor(final Map<String, Token> tokens, final Collection coll) {
this.tokens = tokens;
this.coll = coll;
this.pool = new ForkJoinPool();
}
public void run() {
LOG.info("Starting Supervisor");
ForkJoinTask dirTask = pool.submit(new IngestDirectory(tokens.keySet(), coll));
// Remove any tokens we've already seen and can possibly be in progress
// Possibly release tokens after the thread has finished merging them
/*
Set<String> tokensSeen = hasSeen.get(coll);
if (tokensSeen == null) {
tokensSeen = new HashSet<>();
tokensSeen.addAll(tokens.keySet());
} else {
tokens.keySet().removeAll(hasSeen.get(coll));
tokensSeen.addAll(tokens.keySet());
}
hasSeen.put(coll, tokensSeen);
*/
// Split the token store we're given up equally among our threads
// and submit jobs to the thread pool
List<String> keyList = new ArrayList<>(tokens.keySet());
ForkJoinTask fileTask = pool.submit(new IngestThread(tokens, coll, keyList));
dirTask.quietlyJoin();
fileTask.quietlyJoin();
pool.shutdown();
LOG.info("Leaving Supervisor");
}
}
......@@ -30,27 +30,18 @@
package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.audit.AuditThreadFactory;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.Token;
import edu.umiacs.ace.monitor.settings.SettingsConstants;
import edu.umiacs.ace.token.TokenStoreEntry;
import edu.umiacs.ace.token.TokenStoreReader;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.TokenUtil;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;