Commit eb883307 authored by Michael Ritter's avatar Michael Ritter
Browse files

Add type parameters to methods

parent e6c10f3e
......@@ -32,22 +32,21 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
private static final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
private final Set<Submittable> set = new ConcurrentSkipListSet<>();
public CollectionThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
private CollectionThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// Initialization
static class ExecutorHolder {
static final CollectionThreadPoolExecutor INSTANCE =
createThreadPool();
static final CollectionThreadPoolExecutor INSTANCE = createThreadPool();
private static CollectionThreadPoolExecutor createThreadPool() {
SettingsParameter attr = SettingsUtil.getItemByAttr(PARAM_THROTTLE_MAXAUDIT);
int max = Integer.parseInt(attr.getValue());
int max = Integer.parseInt(attr != null ? attr.getValue() : SettingsConstants.maxAudit);
if (max <= 0) {
max = Integer.parseInt(SettingsConstants.maxAudit);
}
......@@ -64,40 +63,39 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Submit a File Audit to run for a collection
*
* @param c the collection to audit
* @param c the collection to audit
* @param at the audit thread to use
* @return {@link KSFuture} containing the submitted request
*/
public KSFuture submitFileAudit(Collection c, AuditThread at) {
public KSFuture<AuditThread> submitFileAudit(Collection c, AuditThread at) {
LOG.info("Submitting [FILE_AUDIT]" + " for " + c.getName());
Submittable submittable = new Submittable(c, RunType.FILE_AUDIT, at);
Submittable<AuditThread> submittable = new Submittable<>(c, RunType.FILE_AUDIT, at);
return submitThread(at, submittable, c);
}
/**
* Submit a request to Ingest Tokens for a collection
*
* @param c the collection to use
* @param c the collection to use
* @param is the IngestSupervisor to run
* @return
* @return {@link KSFuture} containing the submitted request
*/
public KSFuture submitIngestThread(Collection c, IngestSupervisor is) {
public KSFuture<IngestSupervisor> submitIngestThread(Collection c, IngestSupervisor is) {
LOG.info("Submitting [INGEST_THREAD]" + " for " + c.getName());
Submittable submittable = new Submittable(c, RunType.TOKEN_INGEST, is);
Submittable<IngestSupervisor> submittable = new Submittable<>(c, RunType.TOKEN_INGEST, is);
return submitThread(is, submittable, c);
}
/**
* Submit a TokenAudit to run for a collection
*
* @param c the collection to audit
* @param c the collection to audit
* @param at the audit thread to use
* @return {@link KSFuture} containing the submitted request
*/
public KSFuture submitTokenAudit(Collection c, AuditTokens at) {
public KSFuture<AuditTokens> submitTokenAudit(Collection c, AuditTokens at) {
LOG.info("Submitting [TOKEN_AUDIT]" + " for " + c.getName());
Submittable submittable = new Submittable(c, RunType.TOKEN_AUDIT, at);
Submittable<AuditTokens> submittable = new Submittable<>(c, RunType.TOKEN_AUDIT, at);
return submitThread(at, submittable, c);
}
......@@ -106,19 +104,22 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
* can extract the Submittable without blocking on the thread and to allow
* locking on each collection.
*
* @param r The runnable to submit
* @param s The submittable to use
* @param r The runnable to submit
* @param s The submittable to use
* @param c The collection being operated on
* @param <T> The generic type of the Submittable
* @return true if a new thread was created, false otherwise
*/
private KSFuture submitThread(Runnable r, Submittable s, Collection c) {
private <T extends Runnable> KSFuture<T> submitThread(Runnable r,
Submittable<T> s,
Collection c) {
if (set.add(s)) {
KSFuture f = new KSFuture(r, s);
KSFuture<T> f = new KSFuture<>(r, s);
execute(f);
return f;
}
RunType t = s.getType();
LOG.info("Already submitted RunType [" + t.name() + "]" + " for " + c.getName());
return null;
}
......@@ -127,8 +128,8 @@ public class CollectionThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Remove any {@link Submittable} from the working set
*
* @param r
* @param t
* @param r the runnable to remove
* @param t an exception, possibly thrown by the runnable process
*/
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment