Commit c317a263 authored by shake's avatar shake
Browse files

Better thread safety for ingestion (ghetto semaphore -> thread pools)

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@177 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent 7d23cee8
Package: ace-am
Version: 1.7.1-SNAPSHOT
Version: 1.7.1
Section: misc
Priority: extra
Architecture: all
......
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.settings.SettingsParameter;
import edu.umiacs.ace.util.PersistUtil;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.apache.log4j.NDC;
/**
*
* @author shake
*/
public class IngestContextListener implements ServletContextListener {
public static final String PARAM_INGEST = "ingest.maxthreads";
public void contextInitialized(ServletContextEvent sce) {
NDC.push("[Ingest startup]");
System.out.println("INGEST STARTOOP");
EntityManager em = PersistUtil.getEntityManager();
Query threadQuery = em.createNamedQuery("SettingsParameter.getAttr");
threadQuery.setParameter("attr", PARAM_INGEST);
SettingsParameter s = (SettingsParameter) threadQuery.getSingleResult();
IngestThreadPool.setMaxThreads(Integer.parseInt(s.getValue()));
IngestThreadPool pool = IngestThreadPool.getInstance();
//pool.start();
//IngestThreadPool factory = new IngestThreadPool();
//factory.start();
}
public void contextDestroyed(ServletContextEvent sce) {
IngestThreadPool.shutdownPools();
IngestThreadPool pool = IngestThreadPool.getInstance();
//pool.interrupt();
}
}
......@@ -49,7 +49,7 @@ import javax.persistence.Query;
*
* @author shake
*/
public class IngestDirectory extends Thread{
public class IngestDirectory implements Runnable {
private Collection coll;
private Set<String> identifiers;
private Set<String> existingParents = new HashSet<String>();
......
......@@ -137,12 +137,12 @@ public class IngestStore extends EntityManagerServlet {
throw new ServletException("Bad upload parameters");
}
IngestThreadFactory threads = new IngestThreadFactory(batchTokens, coll);
IngestThreadPool tPool = IngestThreadPool.getInstance();
tPool.submitTokens(batchTokens, coll);
HttpSession session = request.getSession();
session.setAttribute(PAGE_RESULTS, threads);
threads.start();
session.setAttribute(PAGE_RESULTS, tPool);
dispatcher = request.getRequestDispatcher("ingestfinish.jsp");
dispatcher = request.getRequestDispatcher("ingeststatus.jsp");
dispatcher.forward(request, response);
}
......
......@@ -57,7 +57,7 @@ import javax.persistence.EntityTransaction;
*
* @author shake
*/
public class IngestThread extends Thread {
public class IngestThread implements Runnable {
// These only gets read from, never written to
private Map<String, Token> tokens;
private Collection coll;
......@@ -75,7 +75,8 @@ public class IngestThread extends Thread {
// May cause problems
private EntityManager em = PersistUtil.getEntityManager();
IngestThread(Map<String, Token> tokens, Collection coll, List<String> subList) {
public IngestThread(Map<String, Token> tokens, Collection coll,
List<String> subList) {
this.tokens = tokens;
this.coll = coll;
this.identifiers = subList;
......@@ -145,8 +146,8 @@ public class IngestThread extends Thread {
// LOG.trace does not exist
event[0] = logManager.createItemEvent(LogEnum.FILE_REGISTER,
identifier, coll.getDirectory() + identifier);
event[1] = logManager.createItemEvent(LogEnum.ADD_TOKEN, identifier,
coll.getDirectory() + identifier);
event[1] = logManager.createItemEvent(LogEnum.ADD_TOKEN,
identifier, coll.getDirectory() + identifier);
String parent = null;
parent = extractParent(mim, identifier, coll);
......@@ -183,7 +184,6 @@ public class IngestThread extends Thread {
em.close();
trans = null;
em = null;
IngestThreadFactory.release();
finished();
}
}
......@@ -210,7 +210,6 @@ public class IngestThread extends Thread {
update = true;
}
if ( update ) {
LogEvent event = logManager.createItemEvent(LogEnum.TOKEN_INGEST_UPDATE,
identifier, coll.getDirectory() + identifier);
......
......@@ -32,18 +32,17 @@ package edu.umiacs.ace.monitor.register;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.monitor.core.Token;
import edu.umiacs.ace.util.PersistUtil;
import java.text.DecimalFormat;
import edu.umiacs.ace.monitor.settings.SettingsConstants;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
/**
* TODO - Possibly make a LinkedBlockingQueue that threads can poll from
......@@ -53,157 +52,88 @@ import javax.persistence.EntityTransaction;
*
* @author shake
*/
public class IngestThreadFactory extends Thread{
private Map<String, Token> tokens;
private List<IngestThread> threads;
private IngestDirectory idThread;
private Collection coll;
private static int maxThreads = 4;
private static AtomicInteger semaphore = new AtomicInteger();
// private EntityTransaction trans;
// private EntityManager em = PersistUtil.getEntityManager();
public class IngestThreadPool {
private static final IngestThreadPool instance = new IngestThreadPool();
private static final Logger LOG = Logger.getLogger(IngestThreadPool.class);
public IngestThreadFactory(Map<String, Token> tokens, Collection coll){
this.tokens = tokens;
this.coll = coll;
//this.trans = em.getTransaction();
threads = new ArrayList<IngestThread>();
}
@Override
public void run() {
// Separate single thread for registering directories to avoid any race
// conditions
idThread = new IngestDirectory(tokens.keySet(), coll);
// Ingesting can be _really_ slow with large stores, so split the
// process up between 1-max threads
double numThreads = (tokens.size()/maxThreads > maxThreads) ? maxThreads
: Math.ceil(tokens.size()/maxThreads);
int begin = 0;
List<String> keyList = new ArrayList<String>(tokens.keySet());
for( int i = 0;i<numThreads;i++ ) {
// Create a new thread if we are under the threshold, else
// wait for i's thread to become available by decrementing it
if( semaphore.get() <= maxThreads ) {
semaphore.getAndIncrement();
int end = (int) (tokens.size() * ((i + 1) / numThreads));
IngestThread thread = new IngestThread(tokens, coll,
keyList.subList(begin, end));
threads.add(thread);
begin = end;
}else{
i--;
}
}
private static Map<Collection, Set<String>> hasSeen;
private static ThreadPoolExecutor threads;
private static ThreadPoolExecutor dirThread;
private static int maxThreads =
Integer.parseInt(SettingsConstants.maxIngestThreads);
private static LinkedBlockingQueue ingestQueue = new LinkedBlockingQueue();
private static LinkedBlockingQueue dirQueue = new LinkedBlockingQueue();
executeThreads();
private IngestThreadPool() {
}
public void joinThreads() {
try{
idThread.join();
for (IngestThread it: threads ) {
it.join();
}
}catch (InterruptedException ex) {
Logger.getLogger(
IngestThreadFactory.class.getName()).log(Level.SEVERE, null, ex);
}finally{
if ( !getNewTokens().isEmpty() ) {
coll.setState('E');
EntityManager em = PersistUtil.getEntityManager();
EntityTransaction trans = em.getTransaction();
trans.begin();
em.merge(coll);
trans.commit();
trans = null;
}
public static IngestThreadPool getInstance() {
// We instantiate here to ensure 2 things:
// 1 - We use the correct number for max threads from the DB
// 2 - Java will throw an exception otherwise
if ( threads == null ) {
threads = new ThreadPoolExecutor(maxThreads,
maxThreads, 1, TimeUnit.MINUTES, ingestQueue);
}
}
// For jsp
public boolean isRunning() {
boolean running = false;
for ( IngestThread it: threads) {
if (it.isRunning()){
running = true;
}
if ( dirThread == null ) {
dirThread = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, dirQueue);
}
return running;
}
public void executeThreads() {
idThread.start();
for (Thread t: threads) {
t.start();
if ( hasSeen == null ) {
hasSeen = new HashMap<Collection, Set<String>>();
}
joinThreads();
return instance;
}
public Set<String> getUpdatedTokens() {
Set<String> updated = new HashSet<String>();
for (IngestThread it: threads) {
updated.addAll(it.getUpdatedTokens());
}
return updated;
}
public void submitTokens(Map<String, Token> tokens, Collection coll) {
dirThread.execute(new IngestDirectory(tokens.keySet(), coll));
public Set<String> getNewTokens() {
Set<String> newTokens = new HashSet<String>();
for (IngestThread it: threads) {
newTokens.addAll(it.getNewTokens());
}
return newTokens;
}
double numThreads = (tokens.size()/maxThreads > maxThreads) ? maxThreads
: Math.ceil(tokens.size()/maxThreads);
public int getUpdatedTokensSize() {
int size = 0;
for (IngestThread it: threads) {
size += it.getUpdatedTokensSize();
// Remove any tokens we've already seen and can possibly be in progress
// Possibly release tokens after the thread has finished merging them
Set<String> tokensSeen = hasSeen.get(coll);
if ( tokensSeen == null ) {
tokensSeen = new HashSet<String>();
tokensSeen.addAll(tokens.keySet());
}else {
tokens.keySet().removeAll(hasSeen.get(coll));
tokensSeen.addAll(tokens.keySet());
}
return size;
}
hasSeen.put(coll, tokensSeen);
public int getNewTokensSize() {
int size = 0;
for (IngestThread it: threads) {
size += it.getNewTokensSize();
// Split the token store we're givin up equally among our threads
// and submit jobs to the thread pool
int begin = 0;
List<String> keyList = new ArrayList<String>(tokens.keySet());
for ( int idx=0; idx < numThreads; idx++) {
int end = (int) (tokens.size() * ((idx + 1) / numThreads));
threads.execute( new IngestThread(tokens, coll, keyList.subList(begin, end)));
begin = end;
}
return size; }
}
// For jsp, display approximate % of ingested tokens
public String getStatus() {
if ( tokens.isEmpty() ) {
return "There are no tokens to process";
}
if ( semaphore.get() > maxThreads ) {
return "Waiting for other ingestion threads to finish before starting";
}
int totalFinished = 0;
for (IngestThread it: threads) {
// This is only an estimate, no need to lock {new,updated}Tokens
totalFinished += it.getNewTokensSize() + it.getUpdatedTokensSize()
+ it.getUnchangedSize();
}
DecimalFormat format = new DecimalFormat("#.##");
double percent = 100*(totalFinished/(double)tokens.size());
return "Ingested " + format.format(percent) + "% of tokens";
return String.format("[Thread Pool] Active: %d, Completed: %d, Total: %d",
threads.getActiveCount(),
threads.getCompletedTaskCount(),
threads.getTaskCount());
}
public static void setMaxIngestThreads(int ingestThreads){
maxThreads = ingestThreads;
// Not entirely accurate for the jsp, but it'll show what collections are
// ingesting what
public Map<Collection, Set<String>> getIngestedItems() {
return hasSeen;
}
protected static void release(){
semaphore.getAndDecrement();
public static void setMaxThreads(int maxThreads) {
IngestThreadPool.maxThreads = maxThreads;
}
protected static void shutdownPools() {
LOG.debug("[Ingest]: Shutting down thread pools.");
threads.shutdown();
dirThread.shutdown();
}
}
......@@ -34,7 +34,6 @@ import edu.umiacs.ace.monitor.settings.SettingsParameter;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.util.Strings;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.MessagingException;
......
......@@ -85,6 +85,7 @@ public class SettingsUtil {
for ( String name : settings ) {
// TODO: Find a better way to do this
// Could possibly wrap the ID into the settings list
SettingsParameter setting = getItemByAttr(name);
SettingsParameter managedSetting =
em.find(SettingsParameter.class, setting.getId());
......
......@@ -73,6 +73,10 @@
<description>srb file opening throttle</description>
<listener-class>edu.umiacs.ace.driver.QueryThrottle</listener-class>
</listener>
<listener>
<description>Initial ingestion configuration </description>
<listener-class>edu.umiacs.ace.monitor.register.IngestContextListener</listener-class>
</listener>
<servlet>
<servlet-name>ServletAdaptor</servlet-name>
<servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
......
<%--
Document : ingestfinish
Created on : Jun 11, 2012, 4:20:58 PM
Author : shake
--%>
<%@page pageEncoding="UTF-8"%>
<%@taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
<%@ taglib prefix="fn" uri="http://java.sun.com/jsp/jstl/functions" %>
<!DOCTYPE html
PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<title>Ingest Store</title>
<link rel="stylesheet" type="text/css" href="style.css" />
<script type="text/javascript" src="jquery-1.7.1.min.js">
</script>
<script type="text/javascript">
jQuery(document).ready(function() {
jQuery(".content").hide();
//toggle the componenet with class msg_body
jQuery(".heading").click(function()
{
jQuery(this).next(".content").slideToggle(100);
});
});
</script>
</head>
<body>
<jsp:include page="header.jsp"/>
<div class="standardBody">
<h2>Ingestion Status</h2>
<h3>${results.status}</h3>
<div class="headingContainer">
<c:forEach items="${results.ingestedItems}" var="entry">
<p class="heading">Collection: ${entry.key.name}</p>
<div class="content">
<c:forEach items="${entry.value}" var="what">
<li>${what}</li>
</c:forEach>
</div>
</c:forEach>
</div>
<!--
<div class="headingContainer">
<p class="heading">Header-1 </p>
<div class="content">Lorem ipsum dolor sit amet, consectetuer adipiscing elit orem ipsum dolor sit amet, consectetuer adipiscing elit</div>
<p class="heading">Header-2</p>
<div class="content">Lorem ipsum dolor sit amet, consectetuer adipiscing elit orem ipsum dolor sit amet, consectetuer adipiscing elitdddd
</div>
-->
<jsp:include page="footer.jsp"/>
</body>
</html>
......@@ -192,3 +192,23 @@ vertical-align: top;
.settingsHelp {
float: left;
}
.headingContainer {
margin: 0;
padding: 0;
width: 100%;
}
.heading {
margin: 1px;
color: #000;
padding: 3px 10px;
cursor: pointer;
position: relative;
background-color:#eee;
}
.content {
padding: 5px 10px;
background-color:#fff;
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment