Commit 50abd3f8 authored by toaster's avatar toaster
Browse files

throttled is changes, swap schema update patch

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@106 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent 5dd091a8
......@@ -63,7 +63,7 @@ import org.apache.log4j.Logger;
*
* @author toaster
*/
public class BenchmarkAccess extends StorageDriver {
public final class BenchmarkAccess extends StorageDriver {
private static final String PAGE = "benchmark.jsp";
private BenchmarkSettings settings;
......@@ -350,6 +350,7 @@ public class BenchmarkAccess extends StorageDriver {
}
}
long totalTime = System.currentTimeMillis() - start;
if (LOG.isTraceEnabled())
LOG.trace(
"Time: " + (totalTime / 1000) + "s " + ((settings.getFileLength() / 1024)
/ totalTime) / 1000 + "K/s");
......
......@@ -10,4 +10,16 @@ CREATE TABLE `acetoken` (
`PARENTCOLLECTION_ID` bigint(20) NOT NULL,
PRIMARY KEY (`ID`),
KEY `FK_acetoken_PARENTCOLLECTION_ID` (`PARENTCOLLECTION_ID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 MAX_ROWS=2431504384;
\ No newline at end of file
) ENGINE=MyISAM DEFAULT CHARSET=latin1 MAX_ROWS=2431504384;
CREATE TABLE `swapsettings` (
`ID` bigint(20) NOT NULL auto_increment,
`USERNAME` varchar(255) NOT NULL,
`SERVERS` varchar(255) NOT NULL,
`PREFIX` varchar(255) NOT NULL,
`PORT` int(11) NOT NULL,
`PASSWORD` varchar(255) NOT NULL,
`COLLECTION_ID` bigint(20) NOT NULL,
PRIMARY KEY (`ID`),
KEY `FK_srbsettings_COLLECTION_ID` (`COLLECTION_ID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1;
......@@ -10,11 +10,17 @@ import java.security.MessageDigest;
import java.util.concurrent.LinkedBlockingQueue;
/**
* This class parallelizes file reading and digesting.
* This class parallelizes file reading and digesting. It maintains a set of
* byte buffers which are transfered between free and fille queues. A dedicated
* digest thread will read all filled blocks, update a digest, then return the
* block to the free queue.
*
* All users of this class must call shutdown or abort when no more calls to
* readStream are expected or the digest thread will not terminate
*
* @author toaster
*/
public class ThreadedDigestStream {
public final class ThreadedDigestStream {
private LinkedBlockingQueue<ByteWrapper> filledBlocks = new LinkedBlockingQueue<ByteWrapper>();
private LinkedBlockingQueue<ByteWrapper> emptyBlocks = new LinkedBlockingQueue<ByteWrapper>();
......@@ -26,6 +32,13 @@ public class ThreadedDigestStream {
private Thread digestThread;
private boolean digestDone = false;
/**
* Create a new threaded digest stream. The queues will be initialized
* with 5 buffers of the supplied block size.
*
* @param md digest to update
* @param blockSize block size to use in queues
*/
public ThreadedDigestStream(MessageDigest md, int blockSize) {
this.md = md;
......@@ -37,6 +50,14 @@ public class ThreadedDigestStream {
}
}
/**
* stop this queue. Any open files will be given a chance to finish. If the
* this stream is currently idle, the digest thread will exit, otherwise it
* will finish the current open file.
*
* After a shutdown call, future calls to readstream will error.
*
*/
public void shutdown() {
running = false;
// if a digest is not running, toss a shutdown message into queue
......@@ -47,25 +68,35 @@ public class ThreadedDigestStream {
}
}
/**
* Set a listener to receive notifications of bit processing.
*
* @param listener
*/
public void setListener(ThreadedDigestStreamListener listener) {
this.listener = listener;
}
private void checkThread() {
if (digestThread == null) {
DigestRunnable dr = new DigestRunnable();
digestThread = new Thread(dr);
digestThread.setName("Digest Thread");
digestThread.start();
}
}
/**
* Read the given inputstream and update the stored digest. If a listener
* has been set, it will be notified when bytes are read. Also, if an
* exception is thrown, the digest thread will not be shutdown and additional
* files can still be processed.
*
* @param is inputstream to read
* @return total bytes read
* @throws IOException if there was an issue reading the file.
*/
public long readStream(InputStream is) throws IOException {
checkThread();
if (!filledBlocks.isEmpty()) {
throw new IllegalStateException("Read in progress");
}
if (!running) {
throw new IllegalStateException("Stream shutdown");
}
digestDone = false;
int read = 0;
delayedException = null;
......@@ -74,24 +105,26 @@ public class ThreadedDigestStream {
try {
ByteWrapper bw = emptyBlocks.take();
while (running && (read = is.read(bw.block)) != -1) {
if (listener != null) {
listener.bytesRead(read);
try {
while (running && (read = is.read(bw.block)) != -1) {
if (listener != null) {
listener.bytesRead(read);
}
bw.len = read;
filledBlocks.put(bw);
totalBytes += read;
bw = emptyBlocks.take();
}
bw.len = read;
// System.out.println("len " + bw.len + bw.block.length);
filledBlocks.put(bw);
totalBytes += read;
bw = emptyBlocks.take();
} finally {
emptyBlocks.put(bw);
}
emptyBlocks.put(bw);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
ByteWrapper closeBlock = new ByteWrapper(0);
closeBlock.len = -1;
if (!filledBlocks.offer(closeBlock)) {
// digestThread.stop(); // bad, but there is no other way this is stopping if we're here
digestThread = null;
throw new RuntimeException("Could not queue close block");
}
......@@ -115,6 +148,12 @@ public class ThreadedDigestStream {
}
/**
* Abort processing the current file.
*
* @return if abort was successful
*
*/
public boolean abort() {
ByteWrapper closeBlock = new ByteWrapper(0);
closeBlock.len = -1;
......@@ -126,6 +165,15 @@ public class ThreadedDigestStream {
return false;
}
private void checkThread() {
if (digestThread == null) {
DigestRunnable dr = new DigestRunnable();
digestThread = new Thread(dr);
digestThread.setName("Digest Thread");
digestThread.start();
}
}
private class DigestRunnable implements Runnable {
public void run() {
......@@ -139,10 +187,9 @@ public class ThreadedDigestStream {
ThreadedDigestStream.this.notifyAll();
}
} else {
md.update(bw.block, 0, bw.len);
emptyBlocks.put(bw);
// System.out.println("updating block, resetting" + emptyBlocks.size());
}
}
} catch (InterruptedException e) {
......@@ -152,7 +199,6 @@ public class ThreadedDigestStream {
} finally {
synchronized (ThreadedDigestStream.this) {
running = false;
// System.out.println("Digest thread cleaning");
filledBlocks.clear();
emptyBlocks.clear();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment