Commit ff061d54 authored by shake's avatar shake
Browse files

Added support for sampling collections using sqrt(n) samples.

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@202 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent 0119b679
......@@ -63,13 +63,13 @@ import org.apache.log4j.NDC;
*/
public final class AuditConfigurationContext implements ServletContextListener {
private static final String PARAM_IMS = "ims";
private static final String PARAM_IMS_PORT = "ims.port";
private static final String PARAM_IMS_TOKEN_CLASS = "ims.tokenclass";
private static final String PARAM_IMS_SSL = "ims.ssl";
private static final String PARAM_DISABLE_AUDIT = "auto.audit.disable";
private static final String PARAM_THROTTLE_MAXAUDIT = "throttle.maxaudit";
private static final String PARAM_AUDIT_ONLY = "audit.only";
//private static final String PARAM_IMS = "ims";
//private static final String PARAM_IMS_PORT = "ims.port";
//private static final String PARAM_IMS_TOKEN_CLASS = "ims.tokenclass";
//private static final String PARAM_IMS_SSL = "ims.ssl";
//private static final String PARAM_DISABLE_AUDIT = "auto.audit.disable";
//private static final String PARAM_THROTTLE_MAXAUDIT = "throttle.maxaudit";
//private static final String PARAM_AUDIT_ONLY = "audit.only";
public static final String ATTRIBUTE_PAUSE = "pause";
private static final long HOUR = 1000 * 60 * 60;
private Timer checkTimer;
......@@ -83,14 +83,14 @@ public final class AuditConfigurationContext implements ServletContextListener {
ServletContext ctx = arg0.getServletContext();
// set IMS for audit Thread from server parameter
q.setParameter("attr", PARAM_IMS);
q.setParameter("attr", SettingsConstants.PARAM_IMS);
s = (SettingsParameter) q.getSingleResult();
AuditThreadFactory.setIMS(s.getValue());
if ( Strings.isEmpty(AuditThreadFactory.getIMS()) ) {
throw new RuntimeException("IMS is empty");
}
q.setParameter("attr", PARAM_IMS_TOKEN_CLASS);
q.setParameter("attr", SettingsConstants.PARAM_IMS_TOKEN_CLASS);
s = (SettingsParameter) q.getSingleResult();
if ( !Strings.isEmpty(s.getValue()) ) {
String tokenClass = s.getValue();
......@@ -98,7 +98,7 @@ public final class AuditConfigurationContext implements ServletContextListener {
}
q.setParameter("attr", PARAM_IMS_PORT);
q.setParameter("attr", SettingsConstants.PARAM_IMS_PORT);
s = (SettingsParameter) q.getSingleResult();
if ( Strings.isValidInt(s.getValue()) ) {
int port = Integer.parseInt(s.getValue());
......@@ -109,7 +109,7 @@ public final class AuditConfigurationContext implements ServletContextListener {
}
}
q.setParameter("attr", PARAM_AUDIT_ONLY);
q.setParameter("attr", SettingsConstants.PARAM_AUDIT_ONLY);
try {
s = (SettingsParameter) q.getSingleResult();
AuditThreadFactory.setAuditOnly(Boolean.valueOf(s.getValue()));
......@@ -122,7 +122,15 @@ public final class AuditConfigurationContext implements ServletContextListener {
AuditThreadFactory.setAuditOnly(false);
}
q.setParameter("attr", PARAM_IMS_SSL);
q.setParameter("attr", SettingsConstants.PARAM_AUDIT_SAMPLE);
try {
s = (SettingsParameter) q.getSingleResult();
AuditThreadFactory.setAuditSampling(Boolean.valueOf(s.getValue()));
} catch (NoResultException ex) {
LOG.error(ex.getStackTrace());
}
q.setParameter("attr", SettingsConstants.PARAM_IMS_SSL);
try {
s = (SettingsParameter) q.getSingleResult();
AuditThreadFactory.setSSL(Boolean.valueOf(s.getValue()));
......@@ -138,12 +146,12 @@ public final class AuditConfigurationContext implements ServletContextListener {
PauseBean pb = new PauseBean();
ctx.setAttribute(ATTRIBUTE_PAUSE, pb);
q.setParameter("attr", PARAM_DISABLE_AUDIT);
q.setParameter("attr", SettingsConstants.PARAM_DISABLE_AUDIT);
s = (SettingsParameter) q.getSingleResult();
String startPaused = s.getValue();
pb.setPaused(Boolean.valueOf(startPaused));
q.setParameter("attr", PARAM_THROTTLE_MAXAUDIT);
q.setParameter("attr", SettingsConstants.PARAM_THROTTLE_MAXAUDIT);
s = (SettingsParameter) q.getSingleResult();
String maxRun = s.getValue();
if ( Strings.isValidInt(maxRun) ) {
......
......@@ -33,8 +33,14 @@ package edu.umiacs.ace.monitor.audit;
import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.util.PersistUtil;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.persistence.EntityManager;
import javax.persistence.Query;
/**
*
......@@ -49,6 +55,7 @@ public class AuditThreadFactory {
private static int imsPort = 8080;
private static String tokenClass = "SHA-256";
private static boolean auditOnly = false;
private static boolean auditSample = false;
private static boolean ssl = false;
public static void setIMS( String ims ) {
......@@ -79,6 +86,10 @@ public class AuditThreadFactory {
AuditThreadFactory.auditOnly = auditOnlyMode;
}
public static void setAuditSampling(boolean auditSampling ) {
AuditThreadFactory.auditSample = auditSampling;
}
public static boolean isAuditing() {
return !runningThreads.isEmpty();
}
......@@ -94,13 +105,12 @@ public class AuditThreadFactory {
MonitoredItem... startItem ) {
synchronized ( runningThreads ) {
if ( !runningThreads.containsKey(c) && runningThreads.size() < max_audits ) {
// String[] pathList = null;
// if (startItem != null)
// {
// pathList = new String[startItem.size()];
// }
AuditThread newThread = new AuditThread(c, tri, auditOnly,
verbose, startItem);
AuditThread newThread = null;
if ( auditSample ) {
startItem = getSampledList(c);
}
newThread = new AuditThread(c, tri, auditOnly,
verbose, startItem);
newThread.setImsHost(imsHost);
newThread.setImsport(imsPort);
newThread.setTokenClassName(tokenClass);
......@@ -111,23 +121,6 @@ public class AuditThreadFactory {
}
}
// public static AuditThread createThread(Collection c, StorageAccess tri,
// MonitoredItem startItem)
// {
// synchronized ( runningThreads )
// {
// if ( !runningThreads.containsKey(c) && runningThreads.size() < max_audits)
// {
// String path = null;
// if (startItem != null)
// {
// path = startItem.getPath();
// }
// runningThreads.put(c, new AuditThread(c, tri, path));
// }
// return runningThreads.get(c);
// }
// }
public static final boolean isRunning( Collection c ) {
synchronized ( runningThreads ) {
return runningThreads.containsKey(c);
......@@ -182,4 +175,22 @@ public class AuditThreadFactory {
return AuditThreadFactory.ssl;
}
private static MonitoredItem[] getSampledList(Collection c) {
EntityManager em = PersistUtil.getEntityManager();
Query q = em.createNamedQuery("MonitoredItem.listIds");
q.setParameter("coll", c);
List<Long> itemIds = q.getResultList();
int size = (int) Math.ceil(Math.sqrt(itemIds.size()));
SecureRandom rand = new SecureRandom();
List<MonitoredItem> items = new LinkedList<MonitoredItem>();
for ( int i=0;i < size; i++) {
int idxToGet = rand.nextInt(itemIds.size());
MonitoredItem item = em.find(MonitoredItem.class, itemIds.remove(idxToGet));
items.add(item);
}
return items.toArray(new MonitoredItem[items.size()]);
}
}
......@@ -57,6 +57,7 @@ public final class StartSyncServlet extends EntityManagerServlet {
throws ServletException, IOException {
boolean isTokenAudit = false;
boolean verbose = true;
Collection collection;
StorageDriver driver;
MonitoredItem[] item = null;
......@@ -71,8 +72,6 @@ public final class StartSyncServlet extends EntityManagerServlet {
if ( (collection = getCollection(request, em)) != null ) {
driver = StorageDriverFactory.createStorageAccess(collection, em);
if ( "corrupt".equals(request.getParameter(PARAM_TYPE)) ) {
List<MonitoredItem> itemlist;
Query query = em.createNamedQuery("MonitoredItem.listLocalErrors");
query.setParameter("coll", collection);
List<MonitoredItem> resList = query.getResultList();
......@@ -92,8 +91,7 @@ public final class StartSyncServlet extends EntityManagerServlet {
if ( isTokenAudit ) {
AuditTokens.createThread(collection);
} else {
// Move 'true' to separate variable for clarity
AuditThreadFactory.createThread(collection, driver, true, item);
AuditThreadFactory.createThread(collection, driver, verbose, item);
}
response.sendRedirect("Status?collectionid=" + collection.getId());
......
......@@ -74,10 +74,10 @@ public class IngestThreadPool {
// 2 - Java will throw an exception otherwise
if ( threads == null ) {
threads = new ThreadPoolExecutor(maxThreads,
maxThreads, 1, TimeUnit.MINUTES, ingestQueue);
maxThreads, 5, TimeUnit.MINUTES, ingestQueue);
}
if ( dirThread == null ) {
dirThread = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, dirQueue);
dirThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.MINUTES, dirQueue);
}
if ( hasSeen == null ) {
hasSeen = new HashMap<Collection, Set<String>>();
......@@ -134,6 +134,13 @@ public class IngestThreadPool {
protected static void shutdownPools() {
LOG.debug("[Ingest]: Shutting down thread pools.");
threads.shutdown();
if (!threads.isTerminated() ) {
threads.shutdownNow();
}
dirThread.shutdown();
if ( !dirThread.isShutdown()) {
dirThread.shutdownNow();
}
}
}
......@@ -7,6 +7,7 @@ package edu.umiacs.ace.monitor.settings;
public class SettingsConstants {
// Attributes
public static final String PARAM_AUDIT_ONLY="audit.only";
public static final String PARAM_AUDIT_SAMPLE = "audit.sample";
public static final String PARAM_IMS = "ims";
public static final String PARAM_IMS_PORT = "ims.port";
public static final String PARAM_IMS_TOKEN_CLASS = "ims.tokenclass";
......@@ -34,6 +35,7 @@ public class SettingsConstants {
// Default Values
public static final String auditOnly = "false";
public static final String auditSample = "false";
public static final String mailServer = "localhost.localdomain";
public static final String mailFrom = "acemail@localhost";
public static final String maxAudit = "3";
......
......@@ -33,7 +33,8 @@ public class SettingsServlet extends EntityManagerServlet {
@Override
protected void processRequest(HttpServletRequest request, HttpServletResponse response,
EntityManager em) throws ServletException, IOException {
Set<String> paramSet = SettingsUtil.getParamNames();
// Set<String> paramSet = SettingsUtil.getParamNames();
List<SettingsParameter> settings = new ArrayList<SettingsParameter>();
boolean update = false;
......@@ -49,12 +50,10 @@ public class SettingsServlet extends EntityManagerServlet {
if ( item.isFormField() ) {
String name = item.getFieldName();
String value = Streams.asString(stream);
if ( paramSet.contains(name) && !value.isEmpty() ) {
settings.add(new SettingsParameter(name, value));
}
if ( name.equals("update") ) {
update = true;
}else if (!value.isEmpty()) {
settings.add(new SettingsParameter(name, value));
}
}
}
......
......@@ -104,7 +104,6 @@ public class SettingsUtil {
// Get the names of all curent settings
public static Set<String> getParamNames() {
List<SettingsParameter> settings = getCurrentSettings();
Set<String> paramSet = new HashSet<String>();
for ( SettingsParameter s : settings ) {
paramSet.add(s.getName());
......@@ -158,6 +157,8 @@ public class SettingsUtil {
SettingsConstants.maxIngestThreads, false));
defaults.add(new SettingsParameter(SettingsConstants.PARAM_AUDIT_ONLY,
SettingsConstants.auditOnly, false));
defaults.add(new SettingsParameter(SettingsConstants.PARAM_AUDIT_SAMPLE,
SettingsConstants.auditSample, false));
return defaults;
}
......
......@@ -37,6 +37,11 @@
<div class="settingsVal"><input type=text name="throttle.maxaudit" value="${currSettings['throttle.maxaudit']}"/></div>
<div class="settingsHelp"><img src="images/help.png" title="Max number of running audits"></div>
</div>
<div class="settingsRow">
<div class="settingsName">Audit Sampling:</div>
<div class="settingsVal"><input type=text name="audit.sample" value="${currSettings['audit.sample']}"/></div>
<div class="settingsHelp"><img src="images/help.png" title="Enable or disable statistical sampling while auditing (samples sqrt n files)"></div>
</div>
<div class="settingsRow">
<div class="settingsName">Audit Wait Time:</div>
<div class="settingsVal"><input type=text name="throttle.wait" value="${currSettings['throttle.wait']}"/></div>
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment