Commit 90ae78a1 authored by Michael Ritter's avatar Michael Ritter
Browse files

Execute threads through the CollectionThreadPoolExecutor

parent ab7fbbe0
......@@ -32,23 +32,26 @@ package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.ims.api.IMSService;
import edu.umiacs.ace.ims.api.TokenValidator;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.MonitoredItemManager;
import edu.umiacs.ace.monitor.log.LogEventManager;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.log.LogEnum;
import edu.umiacs.ace.monitor.log.LogEventManager;
import edu.umiacs.ace.token.AceToken;
import edu.umiacs.ace.util.CollectionThreadPoolExecutor;
import edu.umiacs.ace.util.KSFuture;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.Submittable;
import edu.umiacs.ace.util.TokenUtil;
import edu.umiacs.util.Strings;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
import org.apache.log4j.Logger;
import java.security.MessageDigest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
......@@ -56,8 +59,8 @@ import org.apache.log4j.Logger;
*/
public final class AuditTokens extends Thread implements CancelCallback {
private static final Map<Collection, AuditTokens> runningThreads =
new HashMap<Collection, AuditTokens>();
private static final Map<Collection, KSFuture<AuditTokens>> runningThreads =
new ConcurrentHashMap<>();
// private Map<TokenResponse, Token> tokenMap = new ConcurrentHashMap<TokenResponse, Token>();
private Map<AceToken, MonitoredItem> itemMap =
new ConcurrentHashMap<AceToken, MonitoredItem>();
......@@ -78,7 +81,7 @@ public final class AuditTokens extends Thread implements CancelCallback {
this.collection = collection;
session = System.currentTimeMillis();
logManager = new LogEventManager(session, collection);
this.start();
// this.start();
}
/**
......@@ -91,7 +94,8 @@ public final class AuditTokens extends Thread implements CancelCallback {
public static AuditTokens getThread( Collection c ) {
synchronized ( runningThreads ) {
if ( isRunning(c) ) {
return runningThreads.get(c);
KSFuture<AuditTokens> future = runningThreads.get(c);
return getAuditTokens(future);
}
}
return null;
......@@ -101,19 +105,25 @@ public final class AuditTokens extends Thread implements CancelCallback {
* Return a new or existing thread. New threads will start replication
*
* @param c
* @param tri
* @return
*/
public static AuditTokens createThread( Collection c ) {
synchronized ( runningThreads ) {
if ( !runningThreads.containsKey(c) ) {
AuditTokens at = new AuditTokens(c);
at.imsHost = AuditThreadFactory.getIMS();
at.imsPort = AuditThreadFactory.getImsPort();
runningThreads.put(c, at);
}
return runningThreads.get(c);
CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor();
AuditTokens at = new AuditTokens(c);
at.imsHost = AuditThreadFactory.getIMS();
at.imsPort = AuditThreadFactory.getImsPort();
KSFuture<AuditTokens> future = executor.submitTokenAudit(c, at);
// If we already submitted, return the existing thread
// Else add the current thread to the map
if (future == null) {
KSFuture<AuditTokens> previous = runningThreads.get(c);
at = getAuditTokens(previous);
} else {
runningThreads.put(c, future);
}
return at;
}
public static final boolean isRunning( Collection c ) {
......@@ -129,11 +139,18 @@ public final class AuditTokens extends Thread implements CancelCallback {
}
static void cancellAll() {
for ( AuditTokens at : runningThreads.values() ) {
at.cancel();
for ( KSFuture<AuditTokens> at : runningThreads.values() ) {
AuditTokens thread = getAuditTokens(at);
thread.cancel();
at.cancel(true);
}
}
public static AuditTokens getAuditTokens(KSFuture<AuditTokens> future) {
Submittable<AuditTokens> result = future.getKnownResult();
return result.getThread();
}
public long getTotalErrors() {
if ( callback == null ) {
return 0;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment