Commit 7ce41f56 authored by toaster's avatar toaster
Browse files

per-thread popup, csv export

git-svn-id: https://subversion.umiacs.umd.edu/ace/trunk@19 f1b3a171-7291-4a19-a512-95ad0ad9394a
parent ff06a6bf
......@@ -28,7 +28,7 @@
* Maryland Institute for Advanced Computer Study.
*/
// $Id$
package edu.umiacs.ace.monitor.audit;
package edu.umiacs.ace.driver;
/**
*
......@@ -36,5 +36,6 @@ package edu.umiacs.ace.monitor.audit;
*/
public interface AuditIterable<T> extends Iterable<T> {
public DriverStateBean[] getState();
public void cancel();
}
/*
* Copyright (c) 2007-@year@, University of Maryland
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided
* that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice, this list of conditions
* and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions
* and the following disclaimer in the documentation and/or other materials provided with the
* distribution.
*
* Neither the name of the University of Maryland nor the names of its contributors may be used to
* endorse or promote products derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* ACE Components were written in the ADAPT Project at the University of
* Maryland Institute for Advanced Computer Study.
*/
package edu.umiacs.ace.driver;
/**
*
* @author toaster
*/
public class DriverStateBean {
private Thread runningThread;
private State state;
private String file;
private long read;
private long totalSize;
private long lastChange;
public DriverStateBean() {
state = State.UNKNOWN;
}
public void clear() {
//state = State.UNKNOWN;
file = null;
read = -1;
totalSize = -1;
lastChange = System.currentTimeMillis();
}
public void setRunningThread( Thread runningThread ) {
this.runningThread = runningThread;
}
public Thread getRunningThread() {
return runningThread;
}
public long getIdle() {
return System.currentTimeMillis() - lastChange;
}
/**
* @return the state
*/
public State getState() {
return state;
}
/**
* @param state the state to set, this will also clear all other values
*/
public void setStateAndReset( State state ) {
this.state = state;
clear();
}
public void setState( State state ) {
this.state = state;
}
/**
* @return the file
*/
public String getFile() {
return file;
}
/**
* @param file the file to set
*/
public void setFile( String file ) {
this.file = file;
}
/**
* @return the read
*/
public long getRead() {
return read;
}
/**
* @param read the read to set
*/
public void setRead( long read ) {
this.read = read;
}
/**
* @return the totalSize
*/
public long getTotalSize() {
return totalSize;
}
/**
* @param totalSize the totalSize to set
*/
public void setTotalSize( long totalSize ) {
this.totalSize = totalSize;
}
/**
* @return the lastChange
*/
public long getLastChange() {
return lastChange;
}
public void updateLastChange() {
lastChange = System.currentTimeMillis();
}
/**
* @param lastChange the lastChange to set
*/
public void setLastChange( long lastChange ) {
this.lastChange = lastChange;
}
public enum State {
IDLE,
READING,
LISTING,
THROTTLE_WAIT,
UNKNOWN,
WAITING_ON_FILE,
OPENING_FILE
}
}
......@@ -30,7 +30,6 @@
// $Id$
package edu.umiacs.ace.driver;
import edu.umiacs.ace.monitor.audit.AuditIterable;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.monitor.core.MonitoredItem;
import edu.umiacs.ace.monitor.core.Collection;
......
......@@ -31,8 +31,9 @@
package edu.umiacs.ace.driver.benchmark;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.monitor.audit.AuditIterable;
import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.StorageDriver;
......@@ -201,6 +202,12 @@ public class BenchmarkAccess extends StorageDriver {
}
}
@Override
public DriverStateBean[] getState() {
return new DriverStateBean[0];
}
};
}
......
......@@ -31,7 +31,7 @@
package edu.umiacs.ace.driver.cifs;
import edu.umiacs.ace.monitor.audit.AuditIterable;
import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.StorageDriver;
......
......@@ -31,8 +31,9 @@
package edu.umiacs.ace.driver.irods;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.monitor.audit.AuditIterable;
import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.StorageDriver;
......@@ -118,6 +119,12 @@ public class IrodsAccess extends StorageDriver {
public void cancel() {
it.cancel();
}
@Override
public DriverStateBean[] getState() {
return new DriverStateBean[] {it.getStateBean()};
}
};
}
......
......@@ -28,9 +28,10 @@
* Maryland Institute for Advanced Computer Study.
*/
// $Id$
package edu.umiacs.ace.driver.irods;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.DriverStateBean.State;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.monitor.core.MonitoredItem;
......@@ -51,6 +52,7 @@ import org.apache.log4j.Logger;
*/
public class IrodsIterator implements Iterator<FileBean> {
private DriverStateBean stateBean;
private IrodsHandler handler = null;
private LinkedBlockingQueue<FileBean> readyList = new LinkedBlockingQueue<FileBean>();
private Thread takingThread;
......@@ -65,6 +67,7 @@ public class IrodsIterator implements Iterator<FileBean> {
String digestAlgorithm ) {
this.co = new ConnectOperation(ic.getServer(), ic.getPort(),
ic.getUsername(), ic.getPassword(), ic.getZone());
this.stateBean = new DriverStateBean();
try {
co.getConnection().closeConnection();
......@@ -100,6 +103,10 @@ public class IrodsIterator implements Iterator<FileBean> {
bfs.cancel();
}
public DriverStateBean getStateBean() {
return stateBean;
}
@Override
public boolean hasNext() {
return !readyList.isEmpty() || handler != null;
......@@ -131,24 +138,38 @@ public class IrodsIterator implements Iterator<FileBean> {
private class MyListener implements BulkTransferListener {
private long bytes;
@Override
public void startTransfer() {
stateBean.setStateAndReset(State.LISTING);
stateBean.setRunningThread(Thread.currentThread());
}
@Override
public void startFile( String fullPath ) {
bytes = 0;
stateBean.setStateAndReset(State.READING);
stateBean.setFile(fullPath);
}
@Override
public void bytesWritten( int bytesWritten ) {
bytes += bytesWritten;
stateBean.setRead(bytes);
stateBean.updateLastChange();
}
@Override
public void endFile( String fullPath ) {
stateBean.setStateAndReset(State.LISTING);
}
@Override
public void endTransfer() {
stateBean.setRunningThread(null);
LOG.debug("endTransfer called on irods iterator");
if ( !saverList.isEmpty() ) {
saverList.poll().execute(true);
......
......@@ -28,10 +28,11 @@
* Maryland Institute for Advanced Computer Study.
*/
// $Id$
package edu.umiacs.ace.driver.localfile;
import edu.umiacs.ace.monitor.audit.AuditIterable;
import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.DriverStateBean.State;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.StorageDriver;
......@@ -106,15 +107,25 @@ public class LocalFileAccess extends StorageDriver {
final PathFilter filter, final MonitoredItem[] startPathList ) {
return new AuditIterable<FileBean>() {
private DriverStateBean statebean = new DriverStateBean();
private MyIterator it;
@Override
public Iterator<FileBean> iterator() {
return new MyIterator(startPathList, filter, digestAlgorithm);
it = new MyIterator(startPathList, filter, digestAlgorithm, statebean);
return it;
}
@Override
public void cancel() {
LOG.debug("Cancel called on localfile iterator");
cancel = true;
}
@Override
public DriverStateBean[] getState() {
return new DriverStateBean[]{statebean};
}
};
}
......@@ -125,18 +136,22 @@ public class LocalFileAccess extends StorageDriver {
private Queue<File> dirsToProcess = new LinkedList<File>();
private Queue<File> filesToProcess = new LinkedList<File>();
private MessageDigest digest;
private byte[] buffer = new byte[4096];
private byte[] buffer = new byte[32768];
private File rootFile;
private PathFilter filter;
private DriverStateBean statebean;
public MyIterator( MonitoredItem[] startPath, PathFilter filter,
String digestAlgorithm ) {
String digestAlgorithm, DriverStateBean statebean ) {
this.statebean = statebean;
this.filter = filter;
try {
digest = MessageDigest.getInstance(digestAlgorithm);
rootFile = new File(getCollection().getDirectory());
statebean.setRunningThread(Thread.currentThread());
statebean.setStateAndReset(State.LISTING);
if ( startPath != null ) {
for ( MonitoredItem mi : startPath ) {
File startFile;
......@@ -178,6 +193,7 @@ public class LocalFileAccess extends StorageDriver {
} catch ( NoSuchAlgorithmException ex ) {
throw new RuntimeException(ex);
}
statebean.setStateAndReset(State.IDLE);
}
@Override
......@@ -200,9 +216,12 @@ public class LocalFileAccess extends StorageDriver {
}
private void loadNext() {
statebean.setStateAndReset(State.LISTING);
// see if wee need to process a directory or if there are files in queue
while ( filesToProcess.isEmpty() && !dirsToProcess.isEmpty() ) {
File directory = dirsToProcess.poll();
statebean.setFile(directory.getPath());
LOG.trace("Popping directory: " + directory);
File[] fileList = directory.listFiles();
if ( fileList == null ) {
......@@ -230,6 +249,7 @@ public class LocalFileAccess extends StorageDriver {
} else {
next = null;
}
statebean.setStateAndReset(State.IDLE);
}
......@@ -252,17 +272,27 @@ public class LocalFileAccess extends StorageDriver {
@SuppressWarnings("empty-statement")
private FileBean processFile( File file ) {
DigestInputStream dis = null;
FileBean fb = new FileBean();
fb.setPathList(extractPathList(file));
LOG.trace("Processing file: " + file);
statebean.setStateAndReset(State.OPENING_FILE);
statebean.setRead(0);
statebean.setTotalSize(file.length());
statebean.setFile(file.getPath());
digest.reset();
try {
statebean.setState(State.THROTTLE_WAIT);
QueryThrottle.waitToRun();
statebean.setState(State.READING);
statebean.updateLastChange();
long fileSize = 0;
ThrottledInputStream tis =
new ThrottledInputStream(new FileInputStream(file), QueryThrottle.getMaxBps(), lastDelay);
......@@ -270,6 +300,8 @@ public class LocalFileAccess extends StorageDriver {
int read = 0;
while ( (read = dis.read(buffer)) >= 0 && !cancel ) {
fileSize += read;
statebean.updateLastChange();
statebean.setRead(fileSize);
}
lastDelay = tis.getSleepTime();
......@@ -284,6 +316,7 @@ public class LocalFileAccess extends StorageDriver {
fb.setErrorMessage(Strings.exceptionAsString(ie));
} finally {
IO.release(dis);
statebean.setStateAndReset(State.IDLE);
if ( cancel ) {
return null;
} else {
......@@ -295,6 +328,7 @@ public class LocalFileAccess extends StorageDriver {
@Override
public InputStream getItemInputStream( String itemPath ) throws IOException {
File f = new File(getCollection().getDirectory() + itemPath);
return new FileInputStream(f);
}
......
......@@ -35,8 +35,9 @@ import edu.sdsc.grid.io.srb.SRBAccount;
import edu.sdsc.grid.io.srb.SRBFile;
import edu.sdsc.grid.io.srb.SRBFileInputStream;
import edu.sdsc.grid.io.srb.SRBFileSystem;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.util.PersistUtil;
import edu.umiacs.ace.monitor.audit.AuditIterable;
import edu.umiacs.ace.driver.AuditIterable;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.driver.StorageDriver;
......@@ -223,6 +224,12 @@ public class SrbAccess extends StorageDriver {
srbIterator.cancel(true);
}
}
@Override
public DriverStateBean[] getState() {
return srbIterator.getStatebeans();
}
};
}
......
......@@ -28,12 +28,13 @@
* Maryland Institute for Advanced Computer Study.
*/
// $Id$
package edu.umiacs.ace.driver.srb;
import edu.umiacs.ace.driver.QueryThrottle;
import edu.sdsc.grid.io.srb.SRBFileInputStream;
import edu.sdsc.grid.io.srb.SRBFileSystem;
import edu.umiacs.ace.driver.DriverStateBean;
import edu.umiacs.ace.driver.DriverStateBean.State;
import edu.umiacs.ace.driver.FileBean;
import edu.umiacs.ace.driver.filter.PathFilter;
import edu.umiacs.ace.monitor.core.MonitoredItem;
......@@ -54,6 +55,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
......@@ -63,6 +66,7 @@ import org.apache.log4j.NDC;
*/
public class SrbDirectoryIterator implements Iterator<FileBean> {
private DriverStateBean[] statebeans;
private static final int MAX_THREADS = 5;
private static final int RETRY = 5;
private static final Logger LOG = Logger.getLogger(
......@@ -78,6 +82,7 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
private PathFilter filter;
private Thread takingThread = null;
private double lastDelay = 0;
private Lock loadLoack = new ReentrantLock();
public SrbDirectoryIterator( Collection c, SrbSettings settings,
MonitoredItem[] basePath, PathFilter filter, String digestAlgorithm ) {
......@@ -88,7 +93,6 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
SRBFileSystem sfs = null;
try {
pool.freeConnection(sfs = pool.getConnection());
// String startPath = root;
if ( basePath != null ) {
......@@ -108,9 +112,12 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
dirsToProcess.add(root);
}
statebeans = new DriverStateBean[MAX_THREADS];
for ( int i = 0; i < MAX_THREADS; i++ ) {
DriverStateBean sb = new DriverStateBean();
MessageDigest digest = MessageDigest.getInstance(digestAlgorithm);
Thread t = new Thread(new ProcessFileThread(digest, i));
Thread t = new Thread(new ProcessFileThread(digest, i, sb));
statebeans[i] = sb;
t.setName("SRB Reader " + i + " " + c.getName());
t.start();
......@@ -135,6 +142,10 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
}
public DriverStateBean[] getStatebeans() {
return statebeans;
}
/**
* Has next if it's not finished or queue isn't empty
* @return
......@@ -179,7 +190,7 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
* Load next file
* @return
*/
private synchronized String loadNext() {
private String loadNext() {
while ( filesToProcess.isEmpty() && !dirsToProcess.isEmpty() && !pool.isShutdown() ) {
String directory = dirsToProcess.poll();
LOG.trace("Popping directory dirsToProcess: " + directory);
......@@ -296,18 +307,21 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
class ProcessFileThread implements Runnable {
private DriverStateBean stateBean;
private byte[] buffer = new byte[2097152];
private MessageDigest digest;
private int id;
public ProcessFileThread( MessageDigest digest, int id ) {
public ProcessFileThread( MessageDigest digest, int id, DriverStateBean bean ) {
this.id = id;
this.digest = digest;
this.stateBean = bean;
}
@Override
public void run() {
NDC.push("[SRB" + id + "] ");
stateBean.setRunningThread(Thread.currentThread());
LOG.debug("SRB Thread starting: " + Thread.currentThread().getName());
synchronized ( threads ) {
threads.add(this);
......@@ -318,12 +332,18 @@ public class SrbDirectoryIterator implements Iterator<FileBean> {
String nextFile = null;
int retry = 0;
while ( retry < RETRY ) {
stateBean.setStateAndReset(State.WAITING_ON_FILE);
loadLoack.lockInterruptibly();
try {
stateBean.setStateAndReset(State.LISTING);
nextFile = loadNext();
retry = RETRY;