Commit 3b9cac93 authored by toaster's avatar toaster
Browse files

swap multi-threading and checkm manifests

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@24 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent 6e6b8573
......@@ -32,35 +32,21 @@ package edu.umiacs.ace.driver.swap;
import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.DriverStateBean.State;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.StorageDriver;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.Collection;
import edu.umiacs.ace.driver.QueryThrottle;
import edu.umiacs.ace.util.HashValue;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.util.ThrottledInputStream;
import edu.umiacs.io.IO;
import edu.umiacs.srb.util.StringUtil;
import edu.umiacs.util.Strings;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
......@@ -87,8 +73,6 @@ public class SwapFileAccess extends StorageDriver {
private static final String PARAM_PREFIX = "prefix";
private static final String PARAM_PORT = "port";
private static final Logger LOG = Logger.getLogger(SwapFileAccess.class);
private boolean cancel = false;
private double lastDelay = 0;
private SwapSettings settings;
private EntityManager em;
......@@ -256,7 +240,7 @@ public class SwapFileAccess extends StorageDriver {
return settings;
}
private FileGroup groupByName( SwapClient client, String name ) {
static FileGroup groupByName( SwapClient client, String name ) {
try {
......@@ -264,9 +248,8 @@ public class SwapFileAccess extends StorageDriver {
LOG.trace("Path parses to uuid, testing " + id);
return client.getFileGroupList().get(id);
} catch ( IllegalArgumentException e ) {
LOG.trace("Path does not parse to uuis, checking namespaces " + name);
LOG.trace("Path does not parse to a uuid, checking namespaces for '" + name +"'");
for ( FileGroup fg : client.getFileGroupList() ) {
System.out.println("testing " + fg.getCombinedNameSpace());
if ( fg.getCombinedNameSpace().equals(name) ) {
return fg;
}
......@@ -280,24 +263,50 @@ public class SwapFileAccess extends StorageDriver {
final PathFilter filter, final MonitoredItem[] startPathList ) {
return new AuditIterable<FileBean>() {
private DriverStateBean statebean = new DriverStateBean();
private MyIterator it;
private SwapIterator it;
@Override
public Iterator<FileBean> iterator() {
it = new MyIterator(startPathList, filter, digestAlgorithm, statebean);
SwapFile rootFile;
SwapClient client = createClient();
try {
Thread.sleep(2000);
FileGroup group = groupByName(client, settings.getCollection().getDirectory());
if ( group == null ) {
LOG.error("Could not extract swap filegroup '"
+ settings.getCollection().getDirectory() + "' ");
throw new RuntimeException("Could not find swap file group");
}
rootFile = group.getFileDetails(settings.getPrefix());
} catch ( InterruptedException e ) {
LOG.error("Could not lookup swap file '" + settings.getPrefix() + "' ");
throw new RuntimeException("Could not open " + settings.getPrefix());
}
if (rootFile == null)
{
LOG.error("Could not lookup swap file '" + settings.getPrefix() + "' ");
throw new RuntimeException("Could not open " + settings.getPrefix());
}
it =
new SwapIterator(startPathList, filter, digestAlgorithm, settings.getCollection().getName(), rootFile, client);
// it = new MyIterator(startPathList, filter, digestAlgorithm, statebean);
return it;
}
@Override
public void cancel() {
LOG.debug("Cancel called on localfile iterator");
cancel = true;
LOG.debug("Cancel called on swapfile iterator");
it.cancel();
}
@Override
public DriverStateBean[] getState() {
return new DriverStateBean[]{statebean};
return it.getStateBeans();
}
};
......@@ -315,218 +324,29 @@ public class SwapFileAccess extends StorageDriver {
return client;
}
class MyIterator implements Iterator<FileBean> {
private FileBean next;
private Queue<SwapFile> dirsToProcess = new LinkedList<SwapFile>();
private Queue<SwapFile> filesToProcess = new LinkedList<SwapFile>();
private MessageDigest digest;
private byte[] buffer = new byte[32768];
private SwapFile rootFile;
private PathFilter filter;
private DriverStateBean statebean;
private SwapClient client;
public MyIterator( MonitoredItem[] startPath, PathFilter filter,
String digestAlgorithm, DriverStateBean statebean ) {
this.statebean = statebean;
this.filter = filter;
try {
client = createClient();
Thread.sleep(2000);
digest = MessageDigest.getInstance(digestAlgorithm);
FileGroup group = groupByName(client, getCollection().getDirectory());
if ( Strings.isEmpty(settings.getPrefix()) ) {
rootFile = group.getFileDetails("");
} else {
rootFile = group.getFileDetails(settings.getPrefix());
}
if (rootFile == null)
throw new NullPointerException("Root file is null!");
statebean.setRunningThread(Thread.currentThread());
statebean.setStateAndReset(State.LISTING);
if ( startPath != null ) {
for ( MonitoredItem mi : startPath ) {
SwapFile startFile;
String subpath = settings.getPrefix() + mi.getPath();
startFile = group.getFileDetails(subpath);
if ( startFile.isDirectory() ) {
dirsToProcess.add(startFile);
} else if ( startFile.isFile() ) {
filesToProcess.add(startFile);
}
}
} else {
dirsToProcess.add(rootFile);
}
loadNext();
} catch ( InterruptedException ex ) {
throw new RuntimeException(ex);
} catch ( NoSuchAlgorithmException ex ) {
throw new RuntimeException(ex);
}
statebean.setStateAndReset(State.IDLE);
}
@Override
public boolean hasNext() {
return !cancel && next != null;
}
@Override
public FileBean next() {
FileBean retValue = next;
try {
loadNext();
} catch ( InterruptedException e ) {
throw new RuntimeException(e);
}
return retValue;
}
@Override
public void remove() {
}
private void loadNext() throws InterruptedException {
statebean.setStateAndReset(State.LISTING);
// see if wee need to process a directory or if there are files in queue
while ( filesToProcess.isEmpty() && !dirsToProcess.isEmpty() ) {
SwapFile directory = dirsToProcess.poll();
statebean.setFile(directory.getFullPath());
LOG.trace("Popping directory: " + directory);
SwapFile[] fileList = directory.listFiles();
if ( fileList == null ) {
LOG.info("Could not read directory, skipping: " + directory);
} else {
for ( SwapFile f : fileList ) {
LOG.trace("Found item " + f);
if ( f.isDirectory() && filter.process(
extractPathList(f), true) ) {
LOG.trace("Adding matching directory: " + f);
dirsToProcess.add(f);
} else if ( f.isFile() && filter.process(
extractPathList(f), false) ) {
LOG.trace("Adding matching file: " + f);
filesToProcess.add(f);
}
}
}
}
// now see why we ended loop
// we have files
if ( !filesToProcess.isEmpty() && !cancel ) {
next = processFile(filesToProcess.poll());
} else {
next = null;
}
statebean.setStateAndReset(State.IDLE);
@Override
public InputStream getItemInputStream( String itemPath ) throws IOException {
SwapClient client = createClient();
FileGroup group = groupByName(client, settings.getCollection().getDirectory());
if ( group == null ) {
throw new IOException("Could not file swap file group");
}
private String[] extractPathList( SwapFile file ) throws InterruptedException {
int substrLength = rootFile.getFullPath().length();
// build directory path
List<String> dirPathList = new ArrayList<String>();
SwapFile currFile = file;
while ( currFile != null && !currFile.equals(rootFile) ) {
// LOG.trace("Adding dir to path: " + currFile.getPath().substring(
// substrLength));
String pathToAdd = currFile.getFullPath().substring(substrLength);
pathToAdd = pathToAdd.replace(File.separatorChar, '/');
dirPathList.add(pathToAdd);
currFile = currFile.getParentFile();
}
return dirPathList.toArray(new String[dirPathList.size()]);
SwapFile file = null;
try {
file = group.getFileDetails(settings.getPrefix() + itemPath);
} catch ( InterruptedException e ) {
throw new IOException("Could not open " + itemPath);
}
@SuppressWarnings("empty-statement")
private FileBean processFile( SwapFile file ) throws InterruptedException {
DigestInputStream dis = null;
FileBean fb = new FileBean();
fb.setPathList(extractPathList(file));
LOG.trace("Processing file: " + file);
statebean.setStateAndReset(State.OPENING_FILE);
statebean.setRead(0);
statebean.setTotalSize(file.getSize());
statebean.setFile(file.getFullPath());
digest.reset();
try {
statebean.setState(State.THROTTLE_WAIT);
QueryThrottle.waitToRun();
statebean.setState(State.READING);
statebean.updateLastChange();
long fileSize = 0;
URL outputURL = file.getURL(null);
HttpURLConnection connection;
connection = (HttpURLConnection) outputURL.openConnection();
// connection.setChunkedStreamingMode(buffer.length);
HttpURLConnection.setFollowRedirects(true);
// connection.setRequestMethod("GET");
connection.setDoInput(true);
// connection.setDoOutput(true);
ThrottledInputStream tis =
new ThrottledInputStream(connection.getInputStream(), QueryThrottle.getMaxBps(), lastDelay);
dis = new DigestInputStream(tis, digest);
int read = 0;
while ( (read = dis.read(buffer)) >= 0 && !cancel ) {
fileSize += read;
statebean.updateLastChange();
statebean.setRead(fileSize);
}
lastDelay = tis.getSleepTime();
byte[] hashValue = digest.digest();
dis.close();
fb.setHash(HashValue.asHexString(hashValue));
fb.setFileSize(fileSize);
} catch ( IOException ie ) {
LOG.error("Error reading file: " + file, ie);
fb.setError(true);
fb.setErrorMessage(Strings.exceptionAsString(ie));
} finally {
IO.release(dis);
statebean.setStateAndReset(State.IDLE);
if ( cancel ) {
return null;
} else {
return fb;
}
}
if ( file == null ) {
throw new FileNotFoundException(itemPath);
}
}
@Override
public InputStream getItemInputStream( String itemPath ) throws IOException {
URL outputURL = file.getURL(null);
HttpURLConnection connection;
connection = (HttpURLConnection) outputURL.openConnection();
HttpURLConnection.setFollowRedirects(true);
connection.setDoInput(true);
return connection.getInputStream();
File f = new File(getCollection().getDirectory() + itemPath);
return new FileInputStream(f);
}
}
/*
* Copyright (c) 2007-2010, University of Maryland
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided
* that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions
* and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions
* and the following disclaimer in the documentation and/or other materials provided with the
* distribution.
*
* Neither the name of the University of Maryland nor the names of its contributors may be used to
* endorse or promote products derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* ACE Components were written in the ADAPT Project at the University of
* Maryland Institute for Advanced Computer Study.
*/
// $Id: LocalFileAccess.java 19 2010-10-22 18:45:19Z toaster $
package edu.umiacs.ace.driver.swap;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.DriverStateBean.State;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.QueryThrottle;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.HashValue;
import edu.umiacs.ace.util.ThrottledInputStream;
import edu.umiacs.io.IO;
import edu.umiacs.util.Strings;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import org.swap.client.SwapClient;
import org.swap.client.SwapFile;
/**
*
* @author toaster
*/
public class SwapIterator implements Iterator<FileBean> {
private Queue<SwapFile> dirsToProcess = new LinkedList<SwapFile>();
private LinkedBlockingQueue<SwapFile> filesToProcess = new LinkedBlockingQueue<SwapFile>();
private boolean finished = false;
private SwapFile rootFile;
private PathFilter filter;
private DriverStateBean[] statebeans;
private static final int MAX_THREADS = 5;
private double lastDelay = 0;
private final List<FileWorker> threads = new LinkedList<FileWorker>();
private Thread takingThread = null;
private LinkedBlockingQueue<FileBean> resultQueue = new LinkedBlockingQueue<FileBean>();
private static final Logger LOG = Logger.getLogger(SwapIterator.class);
private SwapClient client;
private Lock loadLoack = new ReentrantLock();
public SwapIterator( MonitoredItem[] basePath, PathFilter filter,
String digestAlgorithm, String collectionName,
SwapFile rootFile, SwapClient client ) {
this.filter = filter;
this.rootFile = rootFile;
this.client = client;
try {
if ( basePath != null ) {
for ( MonitoredItem mi : basePath ) {
String startFile;
startFile = rootFile.getFullPath() + mi.getPath();
SwapFile sf = rootFile.getFileGroup().getFileDetails(startFile);
if ( mi.isDirectory() ) {
dirsToProcess.add(sf);
} else {
filesToProcess.add(sf);
}
}
} else {
dirsToProcess.add(rootFile);
}
statebeans = new DriverStateBean[MAX_THREADS];
for ( int i = 0; i < MAX_THREADS; i++ ) {
DriverStateBean sb = new DriverStateBean();
MessageDigest digest = MessageDigest.getInstance(digestAlgorithm);
Thread t = new Thread(new FileWorker(sb, i, digest));
statebeans[i] = sb;
t.setName("SWAP Reader " + i + " " + collectionName);
t.start();
}
// ensure this doesn't return before thread has started
// race condition in hasNext otherwise
if ( threads.isEmpty() ) {
synchronized ( threads ) {
threads.wait();
}
}
} catch ( NoSuchAlgorithmException ex ) {
throw new RuntimeException(ex);
} catch ( Throwable ioe ) {
LOG.error("Could not connect to the srb", ioe);
throw new RuntimeException("Could not connect to the srb", ioe);
}
}
public void cancel() {
this.finished = true;
}
public DriverStateBean[] getStateBeans() {
return statebeans;
}
@Override
public boolean hasNext() {
return (!threads.isEmpty() || !resultQueue.isEmpty());
}
@Override
public FileBean next() {
takingThread = Thread.currentThread();
try {
return resultQueue.take();
} catch ( InterruptedException ie ) {
LOG.error("Interrupted Exception", ie);
if ( resultQueue.isEmpty() ) {
return null;
}
throw new RuntimeException(ie);
} finally {
takingThread = null;
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
private String[] extractPathList( SwapFile file ) throws InterruptedException {
int substrLength = rootFile.getFullPath().length();
// build directory path
List<String> dirPathList = new ArrayList<String>();
String currFile = file.getFullPath();
while ( !currFile.equals(rootFile.getFullPath()) ) {
// LOG.trace("Adding dir to path: " + currFile.substring(substrLength));
dirPathList.add(currFile.substring(substrLength));
currFile = currFile.substring(0, currFile.lastIndexOf('/'));
}
return dirPathList.toArray(new String[dirPathList.size()]);
}
private class FileWorker implements Runnable {
private int id;
private DriverStateBean statebean;
private byte[] buffer = new byte[2097152];
private MessageDigest digest;
public FileWorker( DriverStateBean statebean, int id, MessageDigest digest ) {
this.statebean = statebean;
this.id = id;
this.digest = digest;
}
@Override
@SuppressWarnings("empty-statement")
public void run() {
NDC.push("[SWAP-" + id + "] ");
statebean.setRunningThread(Thread.currentThread());
synchronized ( threads ) {
threads.add(this);
threads.notify();
}
try {
while ( !finished ) {
loadLoack.lockInterruptibly();
SwapFile file;
try {
file = loadNext();
} finally {
loadLoack.unlock();
}
FileBean fb = null;
if ( file != null ) {
fb = processFile(file);
}
if ( fb != null ) {
resultQueue.offer(fb);