Commit 45e53fd0 authored by Michael Ritter

#50 Support for introspection into Token Ingestion

parent cf47ae01
......@@ -5,18 +5,11 @@
package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.settings.SettingsConstants;
import edu.umiacs.ace.monitor.settings.SettingsParameter;
import edu.umiacs.ace.monitor.settings.SettingsUtil;
import edu.umiacs.ace.util.PersistUtil;
import org.apache.log4j.NDC;
import javax.persistence.EntityManager;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import static edu.umiacs.ace.monitor.settings.SettingsConstants.PARAM_INGEST;
/**
*
* @author shake
......@@ -25,10 +18,6 @@ public class IngestContextListener implements ServletContextListener {
public void contextInitialized(ServletContextEvent sce) {
NDC.push("[Ingest startup]");
EntityManager em = PersistUtil.getEntityManager();
SettingsParameter ingestSettings = SettingsUtil.getOrDefault(PARAM_INGEST,
SettingsConstants.maxIngestThreads, em);
IngestThreadPool.setMaxThreads(Integer.parseInt(ingestSettings.getValue()));
NDC.pop();
}
......
......@@ -34,18 +34,17 @@ import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.util.Strings;
import org.apache.log4j.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.NoResultException;
import javax.persistence.Query;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.RecursiveAction;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.NoResultException;
import javax.persistence.Query;
/**
* Class to register directories from a token store
......@@ -53,14 +52,13 @@ import javax.persistence.Query;
* @author shake
*/
public class IngestDirectory extends RecursiveAction {
private static final Logger LOG = Logger.getLogger(IngestDirectory.class);
private Collection coll;
private Set<String> identifiers;
private Set<String> existingParents = new HashSet<String>();
private Set<String> existingParents = new HashSet<>();
private EntityManager em = PersistUtil.getEntityManager();
private int numTransactions = 0;
public IngestDirectory(Set<String> identifiers , Collection coll){
public IngestDirectory(Set<String> identifiers, Collection coll) {
this.identifiers = identifiers;
this.coll = coll;
}
......@@ -68,13 +66,13 @@ public class IngestDirectory extends RecursiveAction {
@Override
protected void compute() {
// We want this to remain single-threaded, so we just leave it be
if ( identifiers == null || coll == null ) {
if (identifiers == null || coll == null) {
return;
}
EntityTransaction trans = em.getTransaction();
trans.begin();
for ( String identifier : identifiers ) {
for (String identifier : identifiers) {
extractAndRegisterParentDirs(identifier);
}
trans.commit();
......@@ -82,31 +80,30 @@ public class IngestDirectory extends RecursiveAction {
private void extractAndRegisterParentDirs(String path) {
// We don't have a FileBean, so build the pathList ourselves
int index;
List<String> pathList = new LinkedList<>();
StringBuilder fullPath = new StringBuilder(path);
List <String> pathList = new LinkedList<String>();
if ( fullPath.charAt(0) != '/' ) {
if (fullPath.charAt(0) != '/') {
fullPath.insert(0, "/");
}
int index = 0;
while( (index = fullPath.lastIndexOf("/")) != 0 ) {
while ((index = fullPath.lastIndexOf("/")) != 0) {
pathList.add(fullPath.toString());
fullPath.delete(index, fullPath.length());
}
pathList.add(fullPath.toString());
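// Illustrative example: for "data/foo/file.txt" the code above builds
// pathList = ["/data/foo/file.txt", "/data/foo", "/data"], i.e. the item itself
// followed by each of its parent directories up to the top level.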
// Same as AuditThread, but with our pathList
String parentName = (pathList.size() > 1
? pathList.get(1) : null);
String parentName = pathList.size() > 1 ? pathList.get(1) : null;
// 1. make sure directory path is registered
if (parentName != null) {
//parentName = Strings.cleanStringForXml(parentName, '_');
for ( int i = 1; i < pathList.size(); i++) {
String parent = (pathList.size() > i + 1 ? pathList.get(i+1) : null);
for (int i = 1; i < pathList.size(); i++) {
String parent = (pathList.size() > i + 1 ? pathList.get(i + 1) : null);
parent = Strings.cleanStringForXml(parent, '_');
createDirectory(pathList.get(i), parent);
if ( numTransactions % 30 == 0 ) {
if (numTransactions % 30 == 0) {
em.flush();
em.clear();
}
......@@ -116,10 +113,10 @@ public class IngestDirectory extends RecursiveAction {
private void createDirectory(String directory, String root) {
MonitoredItem mi;
if ( existingParents.contains(directory) || directory == null ) {
if (existingParents.contains(directory) || directory == null) {
return;
}
if ( (mi = getItemByPath(directory)) != null ) {
if ((mi = getItemByPath(directory)) != null) {
Date d = new Date();
mi.setLastSeen(d);
mi.setLastVisited(d);
......@@ -133,13 +130,13 @@ public class IngestDirectory extends RecursiveAction {
}
public MonitoredItem getItemByPath( String path ) {
public MonitoredItem getItemByPath(String path) {
Query q = em.createNamedQuery("MonitoredItem.getItemByPath");
q.setParameter("path", path);
q.setParameter("coll", coll);
try {
return (MonitoredItem) q.getSingleResult();
} catch ( NoResultException ex ) {
} catch (NoResultException ex) {
return null;
}
......@@ -161,9 +158,9 @@ public class IngestDirectory extends RecursiveAction {
mi.setPath(path);
mi.setState(initialState);
mi.setSize(size);
em.persist(mi);
numTransactions++;
return mi;
}
......
......@@ -5,57 +5,97 @@ import edu.umiacs.ace.monitor.core.Token;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
/**
* A class to supervise token ingestion. We use it to keep track of which
* collections we have seen.
*
* @author shake
*/
public class IngestSupervisor implements Runnable {
private static final Logger LOG = Logger.getLogger(IngestSupervisor.class);
private static Map<Collection, Set<String>> hasSeen = new HashMap<>();
private final Map<String, Token> tokens;
private final Collection coll;
private final ForkJoinPool pool;
// TODO: consider combining these into a single work unit that encapsulates
// Token and IngestState
private final Map<String, Token> tokens;
private ConcurrentMap<IngestState, ConcurrentSkipListSet<String>> states;
public IngestSupervisor(final Map<String, Token> tokens, final Collection coll) {
this.tokens = tokens;
this.coll = coll;
this.pool = new ForkJoinPool();
this.states = new ConcurrentHashMap<>();
// Pre-populate every state up front so callers never get back a null set
states.put(IngestState.NEW, new ConcurrentSkipListSet<>());
states.put(IngestState.MATCH, new ConcurrentSkipListSet<>());
states.put(IngestState.QUEUED, new ConcurrentSkipListSet<>());
states.put(IngestState.UPDATED, new ConcurrentSkipListSet<>());
}
public void run() {
LOG.info("Starting Supervisor");
ForkJoinTask dirTask = pool.submit(new IngestDirectory(tokens.keySet(), coll));
// Remove any tokens we've already seen, as they may still be in progress.
// Possibly release tokens after the thread has finished merging them.
/*
Set<String> tokensSeen = hasSeen.get(coll);
if (tokensSeen == null) {
tokensSeen = new HashSet<>();
tokensSeen.addAll(tokens.keySet());
} else {
tokens.keySet().removeAll(hasSeen.get(coll));
tokensSeen.addAll(tokens.keySet());
}
hasSeen.put(coll, tokensSeen);
*/
ConcurrentSkipListSet<String> queued = states.get(IngestState.QUEUED);
queued.addAll(tokens.keySet());
ForkJoinTask dirTask = pool.submit(new IngestDirectory(tokens.keySet(), coll));
// Split up the token store we're given equally among our threads
// and submit jobs to the thread pool
List<String> keyList = new ArrayList<>(tokens.keySet());
ForkJoinTask fileTask = pool.submit(new IngestThread(tokens, coll, keyList));
ForkJoinTask fileTask = pool.submit(new IngestThread(tokens, coll, keyList, states));
dirTask.quietlyJoin();
fileTask.quietlyJoin();
pool.shutdown();
LOG.info("Leaving Supervisor");
}
public ConcurrentMap<IngestState, ConcurrentSkipListSet<String>> getState() {
return states;
}
// JSP helpers
public int getQueuedSize() {
return states.get(IngestState.QUEUED).size();
}
public Set<String> getQueued() {
return states.get(IngestState.QUEUED);
}
public int getNewSize() {
return states.get(IngestState.NEW).size();
}
public Set<String> getNewItems() {
return states.get(IngestState.NEW);
}
public int getUpdatedSize() {
return states.get(IngestState.UPDATED).size();
}
public Set<String> getUpdated() {
return states.get(IngestState.UPDATED);
}
public int getMatchSize() {
return states.get(IngestState.MATCH).size();
}
public Set<String> getMatched() {
return states.get(IngestState.MATCH);
}
}
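A minimal usage sketch (not part of this commit) of how the new introspection accessors might be polled while an ingest runs; the probe class, its method, and the way the token map and target Collection are obtained are all illustrative assumptions:

package edu.umiacs.ace.monitor.register; // assumed package; matches the other ingest classes

import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.Token;
import java.util.Map;

public class IngestStatusProbe {
    // 'tokens' and 'coll' are assumed to be loaded elsewhere (e.g. from a token store reader)
    public static void report(Map<String, Token> tokens, Collection coll) throws InterruptedException {
        IngestSupervisor supervisor = new IngestSupervisor(tokens, coll);
        Thread worker = new Thread(supervisor);
        worker.start();
        while (worker.isAlive()) {
            // Poll the state map exposed by this commit's getters
            System.out.printf("queued=%d new=%d updated=%d matched=%d%n",
                    supervisor.getQueuedSize(), supervisor.getNewSize(),
                    supervisor.getUpdatedSize(), supervisor.getMatchSize());
            Thread.sleep(1000);
        }
    }
}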
......@@ -46,11 +46,11 @@ import org.apache.log4j.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.RecursiveAction;
/**
......@@ -67,58 +67,29 @@ public class IngestThread extends RecursiveAction {
private Collection coll;
private List<String> identifiers;
// Unique to each IngestThread
private boolean running = true;
// Writable map for updating the state of items
private ConcurrentMap<IngestState, ConcurrentSkipListSet<String>> states;
private long session;
private LogEventManager logManager;
private Set<String> updatedTokens;
private Set<String> newTokens;
private Set<String> unchangedTokens;
private int numTransactions = 0;
private LogEventManager logManager;
// May cause problems
private EntityManager em;
public IngestThread(Map<String,
Token> tokens,
public IngestThread(Map<String, Token> tokens,
Collection coll,
List<String> subList) {
this.tokens = tokens;
List<String> subList,
ConcurrentMap<IngestState, ConcurrentSkipListSet<String>> states) {
this.coll = coll;
this.tokens = tokens;
this.identifiers = subList;
}
private void finished() {
running = false;
}
public boolean isRunning() {
return running;
}
public Set<String> getUpdatedTokens() {
return updatedTokens;
}
public Set<String> getNewTokens() {
return newTokens;
}
public int getUpdatedTokensSize() {
return updatedTokens.size();
}
public int getNewTokensSize() {
return newTokens.size();
}
public int getUnchangedSize() {
return unchangedTokens.size();
this.states = states;
}
@Override
protected void compute() {
if ( identifiers == null || coll == null ) {
if (identifiers == null || coll == null) {
return;
}
......@@ -135,127 +106,123 @@ public class IngestThread extends RecursiveAction {
}
} else {
int mid = identifiers.size() >>> 1;
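// Divide and conquer: recursively halve the identifier list and hand each half
// to a new subtask; both subtasks share the same states map so progress stays
// visible across every thread.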
invokeAll(new IngestThread(tokens, coll, identifiers.subList(0, mid)),
new IngestThread(tokens, coll, identifiers.subList(mid, identifiers.size())));
invokeAll(new IngestThread(tokens, coll, identifiers.subList(0, mid), states),
new IngestThread(tokens, coll, identifiers.subList(mid, identifiers.size()), states));
}
}
public void run() {
updatedTokens = new HashSet<>();
newTokens = new HashSet<>();
unchangedTokens = new HashSet<>();
MonitoredItemManager mim = new MonitoredItemManager(em);
MonitoredItem item = null;
MonitoredItem item;
session = System.currentTimeMillis();
logManager = new LogEventManager(session, coll);
MonitoredItemManager mim = new MonitoredItemManager(em);
// Cycle through all items read in and add/update tokens
// Commit only if there are no errors in all transactions
try{
for(String identifier: identifiers) {
Token token = tokens.get(identifier);
item = mim.getItemByPath(identifier, coll);
if ( item == null ) {
LOG.debug("[Ingest Thread " + Thread.currentThread().getId()
+ "] Adding new item " + identifier);
LogEvent[] event = new LogEvent[2];
// LOG.trace does not exist
event[0] = logManager.createItemEvent(LogEnum.FILE_REGISTER,
identifier, coll.getDirectory() + identifier);
event[1] = logManager.createItemEvent(LogEnum.ADD_TOKEN,
identifier, coll.getDirectory() + identifier);
String parent = null;
parent = extractParent(mim, identifier, coll);
item = addItem(identifier, parent, false, coll, 'R', 0);
token.setParentCollection(coll);
// Token
// em.persist(token);
item.setToken(token);
//Finish adding the item
em.persist(event[0]);
em.persist(event[1]);
em.persist(item);
numTransactions += 3;
newTokens.add(identifier);
}else{
LOG.debug("[Ingest Thread " + Thread.currentThread().getId()
+ "] Updating existing item " + identifier);
updateToken(em, token, item, coll, identifier);
}
ConcurrentSkipListSet<String> queued = states.get(IngestState.QUEUED);
for (String identifier : identifiers) {
queued.remove(identifier);
Token token = tokens.get(identifier);
item = mim.getItemByPath(identifier, coll);
if (item == null) {
LOG.debug("[Ingest Thread " + Thread.currentThread().getId()
+ "] Adding new item " + identifier);
LogEvent[] event = new LogEvent[2];
// LOG.trace does not exist
event[0] = logManager.createItemEvent(LogEnum.FILE_REGISTER,
identifier, coll.getDirectory() + identifier);
event[1] = logManager.createItemEvent(LogEnum.ADD_TOKEN,
identifier, coll.getDirectory() + identifier);
String parent;
parent = extractParent(identifier);
item = addItem(identifier, parent, false, coll, 'R', 0);
token.setParentCollection(coll);
// Token
item.setToken(token);
//Finish adding the item
em.persist(event[0]);
em.persist(event[1]);
em.persist(item);
numTransactions += 3;
// stateMap.put(identifier, IngestState.NEW);
states.get(IngestState.NEW).add(identifier);
} else {
LOG.debug("[Ingest Thread " + Thread.currentThread().getId()
+ "] Updating existing item " + identifier);
updateToken(em, token, item, coll, identifier);
}
// With large Token Stores, we get a large number of transactions
// Flushing and Clearing the EM helps to clear some memory
// TODO: W/ fork join this isn't needed anymore, unless we want to flush at a lower number
if ( numTransactions > 30 ) {
em.flush();
em.clear();
numTransactions = 0;
}
// With large Token Stores, we get a large number of transactions
// Flushing and Clearing the EM helps to clear some memory
if (numTransactions > 30) {
em.flush();
em.clear();
numTransactions = 0;
}
}finally{
finished();
}
}
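The periodic flush/clear above keeps the persistence context from growing without bound when a large token store produces many pending writes. A standalone sketch of the same JPA batching pattern (the wrapper class, method, and input list are illustrative; the batch size of 30 mirrors the ingest code):

package edu.umiacs.ace.monitor.register; // assumed package for this sketch

import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.PersistUtil;

import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import java.util.List;

public class BatchPersistSketch {
    public static void persistAll(List<MonitoredItem> itemsToPersist) {
        EntityManager em = PersistUtil.getEntityManager();
        EntityTransaction tx = em.getTransaction();
        tx.begin();
        int writes = 0;
        for (MonitoredItem item : itemsToPersist) {
            em.persist(item);
            if (++writes % 30 == 0) {
                em.flush();  // push pending inserts to the database
                em.clear();  // detach managed entities so the heap stays bounded
            }
        }
        tx.commit();
        em.close();
    }
}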
// If we have a monitored item already in the database, check against the
// new token and update if necessary
private void updateToken(EntityManager em, Token token, MonitoredItem item,
Collection coll, String identifier) {
private void updateToken(EntityManager em,
Token token,
MonitoredItem item,
Collection coll,
String identifier) {
boolean update = false;
Token registeredToken = item.getToken();
if ( registeredToken != null ) {
if (registeredToken != null) {
token.setParentCollection(coll);
// TODO: Find a way to compare tokens w/o converting to AceTokens
// Opted not to use token.equals because we want to compare the
// proof text
// Opted not to use token.equals because we want to compare the proof text
AceToken registeredAceToken = TokenUtil.convertToAceToken(registeredToken);
AceToken aceToken = TokenUtil.convertToAceToken(token);
if ( !registeredAceToken.getProof().equals(aceToken.getProof()) ) {
if (!registeredAceToken.getProof().equals(aceToken.getProof())) {
update = true;
}
}else{
} else {
update = true;
}
if ( update ) {
if (update) {
LogEvent event = logManager.createItemEvent(LogEnum.TOKEN_INGEST_UPDATE,
identifier, coll.getDirectory() + identifier);
// em.persist(token);
item.setToken(token);
// TODO: Why set 'I'? It's not necessarily invalid, maybe 'R' would be better
// or even better yet 'UpdatedToken'!
item.setState('I');
em.merge(item);
em.persist(event);
numTransactions += 2;
updatedTokens.add(identifier);
}else{
unchangedTokens.add(identifier);
states.get(IngestState.UPDATED).add(identifier);
// stateMap.put(identifier, IngestState.UPDATED);
} else {
states.get(IngestState.MATCH).add(identifier);
// stateMap.put(identifier, IngestState.MATCH);
}
}
// From MonitoredItemManager, but without any registration
// Can probably be trimmed down
private String extractParent(MonitoredItemManager mim,
String path, Collection coll) {
private String extractParent(String path) {
// We don't have a FileBean, so build the pathList ourselves
StringBuilder fullPath = new StringBuilder(path);
List <String> pathList = new LinkedList<>();
int index = 0;
List<String> pathList = new LinkedList<>();
int index;
if (fullPath.charAt(0) != '/') {
fullPath.insert(0, "/");
}
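// As in IngestDirectory.extractAndRegisterParentDirs, the loop below walks the
// path upward one '/' at a time, e.g. "/data/foo/file.txt" yields
// ["/data/foo/file.txt", "/data/foo", ...].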
while( (index = fullPath.lastIndexOf("/")) != 0 ) {
//System.out.println(fullPath);
while ((index = fullPath.lastIndexOf("/")) != 0) {
pathList.add(fullPath.toString());
fullPath.delete(index, fullPath.length());
}
......@@ -274,8 +241,12 @@ public class IngestThread extends RecursiveAction {
}
// MIM method without transaction
public MonitoredItem addItem( String path, String parentDir,boolean directory,
Collection parentCollection, char initialState, long size ) {
private MonitoredItem addItem(String path,
String parentDir,
boolean directory,
Collection parentCollection,
char initialState,
long size) {
MonitoredItem mi = new MonitoredItem();
mi.setDirectory(directory);
mi.setLastSeen(new Date());
......@@ -286,11 +257,6 @@ public class IngestThread extends RecursiveAction {
mi.setPath(path);
mi.setState(initialState);
mi.setSize(size);
// em.persist(mi);