Commit 5dd091a8 authored by toaster's avatar toaster
Browse files

threaded digesting, new irods iterator

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@105 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent 9e325456
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>ace</artifactId>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>ace</artifactId>
<groupId>edu.umiacs.ace</groupId>
<version>1.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>edu.umiacs.ace</groupId>
<version>1.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>edu.umiacs.ace</groupId>
<artifactId>ace-am</artifactId>
<name>ace-am</name>
<packaging>war</packaging>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netbeans.hint.deploy.server>Tomcat60</netbeans.hint.deploy.server>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<configuration>
<warSourceExcludes>**/footer.jsp</warSourceExcludes>
<webResources>
<resource>
<directory>src/main/webapp</directory>
<filtering>true</filtering>
<includes>
<include>**/footer.jsp</include>
</includes>
</resource>
<!--<resource>
<artifactId>ace-am</artifactId>
<name>ace-am</name>
<packaging>war</packaging>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netbeans.hint.deploy.server>Tomcat60</netbeans.hint.deploy.server>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<configuration>
<warSourceExcludes>**/footer.jsp</warSourceExcludes>
<webResources>
<resource>
<directory>src/main/webapp</directory>
<filtering>true</filtering>
<includes>
<include>**/footer.jsp</include>
</includes>
</resource>
<!--<resource>
<directory>src/main/webapp</directory>
<filtering>false</filtering>
<excludes>
<exclude>**/footer.jsp</exclude>
</excludes>
</resource>-->
</webResources>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- local ace deps -->
<dependency>
<groupId>edu.umiacs.ace</groupId>
<artifactId>ace-ims-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- for persistence -->
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>eclipselink</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>javax.persistence</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.1</version>
</dependency>
<!-- for json -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.0.1</version>
</dependency>
<!-- for Jstl / webapp stuff-->
<dependency>
<groupId>taglibs</groupId>
<artifactId>standard</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>json-taglib</groupId>
<artifactId>json-taglib</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>quartz</groupId>
<artifactId>quartz</artifactId>
<version>1.5.2</version>
</dependency>
<!-- local ones -->
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>jargon</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>swap</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>adapt-srb</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>irods-api</artifactId>
<version>1.1</version>
</dependency>
</dependencies>
</webResources>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- local ace deps -->
<dependency>
<groupId>edu.umiacs.ace</groupId>
<artifactId>ace-ims-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- for persistence -->
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>eclipselink</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>javax.persistence</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.1</version>
</dependency>
<!-- for json -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.0.1</version>
</dependency>
<!-- for Jstl / webapp stuff-->
<dependency>
<groupId>taglibs</groupId>
<artifactId>standard</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>json-taglib</groupId>
<artifactId>json-taglib</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>quartz</groupId>
<artifactId>quartz</artifactId>
<version>1.5.2</version>
</dependency>
<!-- local ones -->
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>jargon</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>adapt-srb</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>swap</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>edu.umiacs</groupId>
<artifactId>irods-api</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
</project>
......
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package edu.umiacs.ace.driver;
import edu.umiacs.ace.util.ThreadedDigestStreamListener;
/**
*
* @author toaster
*/
public class StateBeanDigestListener implements ThreadedDigestStreamListener{
private DriverStateBean bean;
public StateBeanDigestListener(DriverStateBean bean) {
this.bean = bean;
}
public void bytesRead(int read) {
bean.setRead(bean.getRead() + read);
bean.updateLastChange();
}
}
......@@ -30,6 +30,7 @@
// $Id$
package edu.umiacs.ace.driver;
import edu.umiacs.ace.driver.benchmark.BenchmarkAccess;
import edu.umiacs.ace.driver.irods.IrodsAccess;
import edu.umiacs.ace.driver.localfile.LocalFileAccess;
import edu.umiacs.ace.monitor.core.Collection;
......@@ -65,7 +66,7 @@ public final class StorageDriverFactory {
implementationMap.put("local", LocalFileAccess.class);
implementationMap.put("srb", SrbAccess.class);
implementationMap.put("swap", SwapFileAccess.class);
// implementationMap.put("benchmark", BenchmarkAccess.class);
implementationMap.put("benchmark", BenchmarkAccess.class);
// implementationMap.put("smb", SmbAccess.class);
}
......
......@@ -281,6 +281,7 @@ public class BenchmarkAccess extends StorageDriver {
fb.setError(false);
fb.setHash(getHash());
fb.setPathList(createPathList());
fb.setFileSize(settings.getFileLength());
incrementFile();
return fb;
......
......@@ -105,9 +105,14 @@ public class IrodsAccess extends StorageDriver {
@Override
public AuditIterable<FileBean> getWorkList( final String digestAlgorithm,
final PathFilter filter, final MonitoredItem[] startPathList ) {
final IrodsIterator it = new IrodsIterator(getCollection(),
ic, startPathList, filter, digestAlgorithm);
final DriverStateBean statlist = new DriverStateBean();
final ConnectOperation co = new ConnectOperation(ic.getServer(), ic.getPort(),
ic.getUsername(), ic.getPassword(), ic.getZone());
final IrodsIterator2 it = new IrodsIterator2(startPathList, filter,
digestAlgorithm, statlist, ic.getCollection().getDirectory(), co);
return new AuditIterable<FileBean>() {
@Override
......@@ -122,7 +127,7 @@ public class IrodsAccess extends StorageDriver {
@Override
public DriverStateBean[] getState() {
return new DriverStateBean[] {it.getStateBean()};
return new DriverStateBean[] {statlist};
}
};
......
......@@ -131,7 +131,7 @@ public class IrodsHandler implements BulkFileHandler {
*/
@Override
public void writeBytes( byte[] bytes, long offset, int length ) {
// LOG.trace("digesting chunk, offset: " + offset + " length " + length);
LOG.trace("digesting chunk, offset: " + offset + " length " + length);
digest.update(bytes, 0, length);
fileSize += length;
}
......
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package edu.umiacs.ace.driver.irods;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.DriverStateBean.State;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.QueryThrottle;
import edu.umiacs.ace.driver.StateBeanDigestListener;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.util.HashValue;
import edu.umiacs.ace.util.ThreadedDigestStream;
import edu.umiacs.ace.util.ThrottledInputStream;
import edu.umiacs.io.IO;
import edu.umiacs.irods.api.IRodsRequestException;
import edu.umiacs.irods.api.pi.ErrorEnum;
import edu.umiacs.irods.api.pi.GenQueryEnum;
import edu.umiacs.irods.api.pi.RodsObjStat_PI;
import edu.umiacs.irods.operation.ConnectOperation;
import edu.umiacs.irods.operation.IrodsOperations;
import edu.umiacs.irods.operation.IrodsProxyInputStream;
import edu.umiacs.irods.operation.QueryBuilder;
import edu.umiacs.irods.operation.QueryResult;
import edu.umiacs.util.Strings;
import java.io.IOException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.log4j.Logger;
/**
*
* @author toaster
*/
public class IrodsIterator2 implements Iterator<FileBean> {
private static final Logger LOG = Logger.getLogger(IrodsIterator2.class);
private FileBean next;
private Queue<String> dirsToProcess = new LinkedList<String>();
private Queue<String> filesToProcess = new LinkedList<String>();
private MessageDigest digest;
private static final int BLOCK_SIZE = 1048576;
// private byte[] buffer = new byte[BLOCK_SIZE];
private String rootFile;
private PathFilter filter;
private DriverStateBean statebean;
private boolean cancel = false;
private ConnectOperation co;
private double lastDelay = 0;
ThreadedDigestStream reader;
public IrodsIterator2(MonitoredItem[] startPath, PathFilter filter,
String digestAlgorithm, DriverStateBean statebean,
String collectionDirectory, ConnectOperation co) {
this.statebean = statebean;
this.filter = filter;
this.co = co;
try {
digest = MessageDigest.getInstance(digestAlgorithm);
reader = new ThreadedDigestStream(digest, BLOCK_SIZE);
rootFile = collectionDirectory;
statebean.setRunningThread(Thread.currentThread());
statebean.setStateAndReset(State.LISTING);
if (startPath != null) {
for (MonitoredItem mi : startPath) {
String startFile;
startFile = collectionDirectory + startPath[0].getPath();
if (mi.isDirectory()) {
dirsToProcess.add(startFile);
} else {
filesToProcess.add(startFile);
}
}
} else {
dirsToProcess.add(rootFile);
}
loadNext();
} catch (NoSuchAlgorithmException ex) {
throw new RuntimeException(ex);
}
statebean.setStateAndReset(State.IDLE);
}
public void cancel() {
this.cancel = true;
reader.abort();
}
@Override
public boolean hasNext() {
boolean hasNext = (!cancel && next != null);
if (!hasNext) {
reader.shutdown();
}
return hasNext;
// return !cancel && next != null;
}
@Override
public FileBean next() {
FileBean retValue = next;
loadNext();
return retValue;
}
@Override
public void remove() {
}
private void loadNext() {
statebean.setStateAndReset(State.LISTING);
// see if wee need to process a directory or if there are files in queue
while (filesToProcess.isEmpty() && !dirsToProcess.isEmpty()) {
String directory = dirsToProcess.poll();
statebean.setFile(directory);
LOG.trace("Popping directory: " + directory);
scanForDirectories(directory);
scanForFiles(directory);
}
// now see why we ended loop
// we have files
if (!filesToProcess.isEmpty() && !cancel) {
next = processFile(filesToProcess.poll());
} else {
next = null;
}
statebean.setStateAndReset(State.IDLE);
}
//private void testConnection() {
// if (opCount > MAXOPCOUNT) {
// opCount = 0;
// try {
// co.reconnect();
// } catch (IOException ex) {
// LOG.error("Error reconnecting", ex);
// }
// }
// opCount++;
// }
private void scanForFiles(String parent) {
QueryBuilder qb;
QueryResult qr;
try {
qb =
new QueryBuilder(GenQueryEnum.COL_DATA_NAME);
qb.eq(GenQueryEnum.COL_COLL_NAME, parent);
// testConnection();
qr = qb.execute(co.getConnection());
while (qr.next()) {
String file = parent + "/" + qr.getValue(GenQueryEnum.COL_DATA_NAME);
if (filter.process(extractPathList(file), false)) {
filesToProcess.add(file);
}
}
} catch (IOException ex) {
LOG.error("Exception", ex);
}
}