Commit e6c10f3e authored by Michael Ritter's avatar Michael Ritter
Browse files

#30 Log TokenAudit result after validation has been completed

parent 5d6453a0
......@@ -55,46 +55,40 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author toaster
*/
public final class AuditTokens extends Thread implements CancelCallback {
private static final Map<Collection, KSFuture<AuditTokens>> runningThreads =
new ConcurrentHashMap<>();
// private Map<TokenResponse, Token> tokenMap = new ConcurrentHashMap<TokenResponse, Token>();
private Map<AceToken, MonitoredItem> itemMap =
new ConcurrentHashMap<>();
new ConcurrentHashMap<>();
private static final Logger LOG = Logger.getLogger(AuditTokens.class);
private Collection collection;
private boolean cancel = false;
private boolean hasRun = false;
private long session;
// private long totalErrors = 0;
private long tokensSeen = 0;
// private long validTokens = 0;
private TokenAuditCallback callback;
private String imsHost;
private int imsPort;
LogEventManager logManager;
private LogEventManager logManager;
private AuditTokens( Collection collection ) {
private AuditTokens(Collection collection) {
this.collection = collection;
session = System.currentTimeMillis();
logManager = new LogEventManager(session, collection);
// this.start();
}
/**
* Return the current thread for a collection.
*
* @param c collection to fetch
*
* @return current running thread or null if nothing is running
*
*/
public static AuditTokens getThread( Collection c ) {
synchronized ( runningThreads ) {
if ( isRunning(c) ) {
public static AuditTokens getThread(Collection c) {
synchronized (runningThreads) {
if (isRunning(c)) {
KSFuture<AuditTokens> future = runningThreads.get(c);
return getAuditTokens(future);
}
......@@ -104,11 +98,11 @@ public final class AuditTokens extends Thread implements CancelCallback {
/**
* Return a new or existing thread. New threads will start replication
*
* @param c
* @return
*
* @param c the collection to create a thread for
* @return The AuditTokens thread
*/
public static AuditTokens createThread( Collection c ) {
public static AuditTokens createThread(Collection c) {
CollectionThreadPoolExecutor executor = CollectionThreadPoolExecutor.getExecutor();
AuditTokens at = new AuditTokens(c);
at.imsHost = AuditThreadFactory.getIMS();
......@@ -127,8 +121,8 @@ public final class AuditTokens extends Thread implements CancelCallback {
return at;
}
public static final boolean isRunning( Collection c ) {
synchronized ( runningThreads ) {
public static boolean isRunning(Collection c) {
synchronized (runningThreads) {
return runningThreads.containsKey(c);
}
}
......@@ -140,7 +134,7 @@ public final class AuditTokens extends Thread implements CancelCallback {
}
static void cancellAll() {
for ( KSFuture<AuditTokens> at : runningThreads.values() ) {
for (KSFuture<AuditTokens> at : runningThreads.values()) {
AuditTokens thread = getAuditTokens(at);
thread.cancel();
at.cancel(true);
......@@ -153,7 +147,7 @@ public final class AuditTokens extends Thread implements CancelCallback {
}
public long getTotalErrors() {
if ( callback == null ) {
if (callback == null) {
return 0;
}
return callback.getTotalErrors();
......@@ -164,7 +158,7 @@ public final class AuditTokens extends Thread implements CancelCallback {
}
public long getValidTokens() {
if ( callback == null ) {
if (callback == null) {
return 0;
}
return callback.getValidTokens();
......@@ -173,7 +167,7 @@ public final class AuditTokens extends Thread implements CancelCallback {
@Override
public void run() {
if ( hasRun ) {
if (hasRun) {
LOG.fatal("Cannot run thread, already executed");
throw new IllegalStateException("Thread has already run");
}
......@@ -184,12 +178,11 @@ public final class AuditTokens extends Thread implements CancelCallback {
doWork();
} catch ( Throwable e ) {
} catch (Throwable e) {
LOG.fatal("UNcaught exception in doWork()", e);
} finally {
itemMap.clear(); // free memory in case this gets stuck hanging around
}
}
private void markMissingTokensOffline() {
......@@ -207,20 +200,20 @@ public final class AuditTokens extends Thread implements CancelCallback {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
IMSService ims;
ims = IMSService.connect(imsHost,
imsPort,
AuditThreadFactory.useSSL(),
AuditThreadFactory.isBlocking(),
AuditThreadFactory.getMaxBlockTime());
imsPort,
AuditThreadFactory.useSSL(),
AuditThreadFactory.isBlocking(),
AuditThreadFactory.getMaxBlockTime());
callback = new TokenAuditCallback(itemMap, this, collection, session);
validator = ims.createTokenValidator(callback, 1000, 5000,
digest);
} catch ( Throwable e ) {
} catch (Throwable e) {
EntityManager em;
LOG.error("Cannot connect to IMS ", e);
LogEventManager lem;
em = PersistUtil.getEntityManager();
lem = new LogEventManager(session,collection);
lem = new LogEventManager(session, collection);
EntityTransaction trans = em.getTransaction();
trans.begin();
em.persist(lem.createCollectionEvent(LogEnum.UNKNOWN_IMS_COMMUNICATION_ERROR, e));
......@@ -231,17 +224,18 @@ public final class AuditTokens extends Thread implements CancelCallback {
}
private void doWork() {
EntityManager em;
Throwable thrown = null;
boolean interrupted = false;
TokenValidator validator = openIms();
if ( validator == null ) {
if (validator == null) {
runningThreads.remove(collection);
return;
}
em = PersistUtil.getEntityManager();
logManager.persistCollectionEvent(LogEnum.TOKEN_AUDIT_START, null,em);
// new LogEventManager(em, session).tokenAuditStart(collection);
logManager.persistCollectionEvent(LogEnum.TOKEN_AUDIT_START, null, em);
em.close();
em = null;
......@@ -254,15 +248,15 @@ public final class AuditTokens extends Thread implements CancelCallback {
q.setParameter("coll", collection);
for ( Object o : q.getResultList() ) {
for (Object o : q.getResultList()) {
MonitoredItem item = (MonitoredItem) o;
AceToken t = TokenUtil.convertToAceToken(item.getToken());
if ( t != null ) {
if (t != null) {
itemMap.put(t, item);
tokensSeen++;
validator.add(item.getFileDigest(), t);
}
if ( cancel ) {
if (cancel) {
break;
}
}
......@@ -273,40 +267,43 @@ public final class AuditTokens extends Thread implements CancelCallback {
// mark unavailable tokens offline
markMissingTokensOffline();
em = PersistUtil.getEntityManager();
// finished
logManager.persistCollectionEvent(LogEnum.TOKEN_AUDIT_FINISH, "successful audit", em);
em.close();
em = null;
setCollectionState();
LOG.trace(
"Token Audit ending successfully for " + collection.getName());
} catch ( Throwable t ) {
} catch (Throwable t) {
LOG.error("Unexpected error auditing tokens", t);
logManager.persistCollectionEvent(LogEnum.SYSTEM_ERROR, Strings.exceptionAsString(t),em);
thrown = t;
interrupted = true;
} finally {
validator.close();
if ( em != null ) {
if (em != null) {
em.close();
}
LOG.trace("Validator closed");
synchronized ( runningThreads ) {
synchronized (runningThreads) {
runningThreads.remove(collection);
}
}
setCollectionState();
if (interrupted) {
em = PersistUtil.getEntityManager();
logManager.persistCollectionEvent(
LogEnum.SYSTEM_ERROR, Strings.exceptionAsString(thrown), em);
em.close();
} else {
// finished successfully
em = PersistUtil.getEntityManager();
logManager.persistCollectionEvent(
LogEnum.TOKEN_AUDIT_FINISH, "successful audit", em);
em.close();
LOG.trace("Token Audit ending successfully for " + collection.getName());
}
}
}
private void setCollectionState() {
EntityManager em = PersistUtil.getEntityManager();
MonitoredItemManager mim = new MonitoredItemManager(em);
if ( mim.countErrorsInCollection(collection) == 0 ) {
if (mim.countErrorsInCollection(collection) == 0) {
collection.setState(CollectionState.ACTIVE);
} else {
collection.setState(CollectionState.ERROR);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment