Commit 0fab49e4 authored by shake's avatar shake
Browse files

Fixed UTF-8 bug in digests header.

Updated the ingest store to limit thread creation based on the maximum number of threads allowed.

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@162 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent 51550ce0
...@@ -105,7 +105,8 @@ public class ListContentsServlet extends EntityManagerServlet { ...@@ -105,7 +105,8 @@ public class ListContentsServlet extends EntityManagerServlet {
header.append(":"); header.append(":");
header.append(mi.getPath()); header.append(mi.getPath());
} }
os.println(header.toString()); os.write(header.toString().getBytes("UTF-8"));
os.println();
} else if ( TYPE_CHECKM.equals(output) ) { } else if ( TYPE_CHECKM.equals(output) ) {
os.println("#%checkm_0.7"); os.println("#%checkm_0.7");
......
...@@ -183,6 +183,7 @@ public class IngestThread extends Thread { ...@@ -183,6 +183,7 @@ public class IngestThread extends Thread {
em.close(); em.close();
trans = null; trans = null;
em = null; em = null;
IngestThreadFactory.release();
finished(); finished();
} }
} }
......
...@@ -39,12 +39,17 @@ import java.util.HashSet; ...@@ -39,12 +39,17 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction; import javax.persistence.EntityTransaction;
/** /**
* TODO - Possibly make a LinkedBlockingQueue that threads can poll from
* to get items to ingest
* - What happens when maxThreads is changed in the middle of ingestion?
* - May need to lock it
* *
* @author shake * @author shake
*/ */
...@@ -53,6 +58,8 @@ public class IngestThreadFactory extends Thread{ ...@@ -53,6 +58,8 @@ public class IngestThreadFactory extends Thread{
private List<IngestThread> threads; private List<IngestThread> threads;
private IngestDirectory idThread; private IngestDirectory idThread;
private Collection coll; private Collection coll;
private static int maxThreads = 4;
private static AtomicInteger semaphore = new AtomicInteger();
// private EntityTransaction trans; // private EntityTransaction trans;
// private EntityManager em = PersistUtil.getEntityManager(); // private EntityManager em = PersistUtil.getEntityManager();
...@@ -62,31 +69,38 @@ public class IngestThreadFactory extends Thread{ ...@@ -62,31 +69,38 @@ public class IngestThreadFactory extends Thread{
this.coll = coll; this.coll = coll;
//this.trans = em.getTransaction(); //this.trans = em.getTransaction();
threads = new ArrayList<IngestThread>(); threads = new ArrayList<IngestThread>();
}
@Override
public void run() {
// Separate single thread for registering directories to avoid any race // Separate single thread for registering directories to avoid any race
// conditions // conditions
idThread = new IngestDirectory(tokens.keySet(), coll); idThread = new IngestDirectory(tokens.keySet(), coll);
// Ingesting can be _really_ slow with large stores, so split the // Ingesting can be _really_ slow with large stores, so split the
// process up between 1-4 threads // process up between 1-max threads
double numThreads = (tokens.size()/4.0 > 4) ? 4.0 : Math.ceil(tokens.size()/4.0); double numThreads = (tokens.size()/maxThreads > maxThreads) ? maxThreads
: Math.ceil(tokens.size()/maxThreads);
int begin = 0; int begin = 0;
List<String> keyList = new ArrayList<String>(tokens.keySet()); List<String> keyList = new ArrayList<String>(tokens.keySet());
for( int i = 0;i<numThreads;i++ ) { for( int i = 0;i<numThreads;i++ ) {
int end = (int) (tokens.size() * ((i + 1) / numThreads)); // Create a new thread if we are under the threshold, else
IngestThread thread = new IngestThread(tokens, coll, // wait for i's thread to become available by decrementing it
keyList.subList(begin, end)); if( semaphore.get() <= maxThreads ) {
threads.add(thread); semaphore.getAndIncrement();
begin = end; int end = (int) (tokens.size() * ((i + 1) / numThreads));
IngestThread thread = new IngestThread(tokens, coll,
keyList.subList(begin, end));
threads.add(thread);
begin = end;
}else{
i--;
}
} }
}
@Override
public void run() {
executeThreads(); executeThreads();
} }
...@@ -167,6 +181,11 @@ public class IngestThreadFactory extends Thread{ ...@@ -167,6 +181,11 @@ public class IngestThreadFactory extends Thread{
if ( tokens.isEmpty() ) { if ( tokens.isEmpty() ) {
return "There are no tokens to process"; return "There are no tokens to process";
} }
if ( semaphore.get() > maxThreads ) {
return "Waiting for other ingestion threads to finish before starting";
}
int totalFinished = 0; int totalFinished = 0;
for (IngestThread it: threads) { for (IngestThread it: threads) {
// This is only an estimate, no need to lock {new,updated}Tokens // This is only an estimate, no need to lock {new,updated}Tokens
...@@ -179,4 +198,12 @@ public class IngestThreadFactory extends Thread{ ...@@ -179,4 +198,12 @@ public class IngestThreadFactory extends Thread{
return "Ingested " + format.format(percent) + "% of tokens"; return "Ingested " + format.format(percent) + "% of tokens";
} }
public static void setMaxIngestThreads(int ingestThreads){
maxThreads = ingestThreads;
}
protected static void release(){
semaphore.getAndDecrement();
}
} }
...@@ -28,6 +28,7 @@ public class SettingsConstants { ...@@ -28,6 +28,7 @@ public class SettingsConstants {
"log4j.appender.A1.layout.ConversionPattern"; "log4j.appender.A1.layout.ConversionPattern";
public static final String PARAM_4J_IRODS = "log4j.logger.edu.umiacs.irods"; public static final String PARAM_4J_IRODS = "log4j.logger.edu.umiacs.irods";
public static final String PARAM_4J_CLASS ="log4j.logger.edu.umiacs"; public static final String PARAM_4J_CLASS ="log4j.logger.edu.umiacs";
public static final String PARAM_INGEST = "ingest.maxthreads";
// Default Values // Default Values
public static final String mailServer = "localhost.localdomain"; public static final String mailServer = "localhost.localdomain";
......
...@@ -131,6 +131,8 @@ public class SettingsUtil { ...@@ -131,6 +131,8 @@ public class SettingsUtil {
SettingsConstants.log4JA1Layout,false)); SettingsConstants.log4JA1Layout,false));
defaults.add(new SettingsParameter(SettingsConstants.PARAM_4J_ROOT_LOGGER, defaults.add(new SettingsParameter(SettingsConstants.PARAM_4J_ROOT_LOGGER,
SettingsConstants.log4JRootLogger,false)); SettingsConstants.log4JRootLogger,false));
defaults.add(new SettingsParameter(SettingsConstants.PARAM_INGEST,
SettingsConstants.maxIngestThreads, false));
return defaults; return defaults;
} }
......
...@@ -62,6 +62,11 @@ ...@@ -62,6 +62,11 @@
<div class="settingsVal"><input type=text name="auth.management" value="${currSettings['auth.management']}"/></div> <div class="settingsVal"><input type=text name="auth.management" value="${currSettings['auth.management']}"/></div>
<div class="settingsHelp"><img src="images/help.png" title="Set this to true to disable internal user management. This should only be used in conjunction with changes to the Authentication realm listed above."></div> <div class="settingsHelp"><img src="images/help.png" title="Set this to true to disable internal user management. This should only be used in conjunction with changes to the Authentication realm listed above."></div>
</div> </div>
<div class="settingsRow">
<div class="settingsName">Max Ingest Threads:</div>
<div class="settingsVal"><input type=text name="ingest.maxthreads" value="${currSettings['ingest.maxthreads']}"/></div>
<div class="settingsHelp"><img src="images/help.png" title="The maximum number of threads allowed for ingesting token stores."></div>
</div>
<div class="settingsRow"> <div class="settingsRow">
<div class="settingsName">Log Location:</div> <div class="settingsName">Log Location:</div>
<div class="settingsVal"><input type=text name="log4j.appender.A1.File" value="${currSettings['log4j.appender.A1.File']}"/></div> <div class="settingsVal"><input type=text name="log4j.appender.A1.File" value="${currSettings['log4j.appender.A1.File']}"/></div>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment