Commit f8919c92 authored by Michael Ritter's avatar Michael Ritter
Browse files

Merge branch 'develop'

parents 0499813b 14a4e29d
......@@ -144,7 +144,7 @@ public class BagTokenizer {
files.append(p.toString());
files.append("\n");
}
log.error("Error validating collection, ({}) bad files found:\n{}",
log.error("Error validating collection, {} bad files found:\n{}",
badFiles.size(), files.toString());
return null;
}
......
......@@ -27,4 +27,11 @@ public interface FileTransfer {
* @throws FileTransferException
*/
void put(Path localFile, String uri) throws FileTransferException;
/**
* Return the statistics for the given transfer (ie: transfer speed, amount, etc)
*
* @return
*/
String getStats();
}
......@@ -67,4 +67,9 @@ public class HttpsTransfer implements FileTransfer {
public void put(final Path localFile, final String uri) throws FileTransferException {
// TBD
}
@Override
public String getStats() {
return "";
}
}
package org.chronopolis.common.transfer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
......@@ -26,31 +28,43 @@ public class RSyncTransfer implements FileTransfer {
private final Logger log = LoggerFactory.getLogger(RSyncTransfer.class);
private final ExecutorService threadPool = Executors.newSingleThreadExecutor();
private final String user;
private String stats;
public RSyncTransfer(String user) {
this.user = user;
this.user = user;
this.stats = "";
}
@Override
public Path getFile(final String uri, final Path local) throws FileTransferException {
// Taken from http://stackoverflow.com/questions/1246255/any-good-rsync-library-for-java
// Need to test/modify command
// Currently uses passwordless SSH keys to login to sword
// Currently uses passwordless SSH keys to login
log.debug(local.toString());
Callable<Path> download = new Callable<Path>() {
@Override
public Path call() throws Exception {
String[] cmd = new String[]{"rsync", "-a", user + "@" + uri, local.toString()};
String[] cmd = new String[]{"rsync", "-a", "--stats", user + "@" + uri, local.toString()};
String[] parts = uri.split(":", 2);
String[] pathList = parts[1].split("/");
ProcessBuilder pb = new ProcessBuilder(cmd);
Process p = null;
try {
log.info("Executing {} {} {} {}", cmd);
log.info("Executing {} {} {} {} {}", cmd);
p = pb.start();
p.waitFor();
} catch (IOException e) {
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
StringBuilder out = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
out.append(line).append("\n");
}
stats = out.toString();
log.info("rsync exit stats:\n {}", stats);
} catch (IOException e) {
log.error("IO Exception in rsync ", e);
throw new FileTransferException("IOException in rsync", e);
} catch (InterruptedException e) {
......@@ -121,4 +135,9 @@ public class RSyncTransfer implements FileTransfer {
}
@Override
public String getStats() {
return stats;
}
}
......@@ -13,12 +13,14 @@
<groupId>org.chronopolis</groupId>
<artifactId>duracloud-intake</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<!--
<version>1.0.2.RELEASE</version>
<configuration>
<mainClass>org.chronopolis.intake.Application</mainClass>
......@@ -30,6 +32,7 @@
</goals>
</execution>
</executions>
-->
</plugin>
</plugins>
</build>
......@@ -71,4 +74,4 @@
</dependencies>
</project>
\ No newline at end of file
</project>
......@@ -2,6 +2,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.1.4.RELEASE</version>
</parent>
<groupId>org.chronopolis</groupId>
<artifactId>chronopolis</artifactId>
<version>1.0-SNAPSHOT</version>
......
......@@ -3,6 +3,7 @@ package org.chronopolis.replicate.batch;
import org.chronopolis.common.ace.AceService;
import org.chronopolis.common.ace.GsonCollection;
import org.chronopolis.messaging.collection.CollectionInitMessage;
import org.chronopolis.replicate.ReplicationNotifier;
import org.chronopolis.replicate.config.ReplicationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -30,11 +31,16 @@ public class AceRegisterStep implements Tasklet {
private AceService aceService;
private ReplicationSettings settings;
private CollectionInitMessage message;
private ReplicationNotifier notifier;
public AceRegisterStep(AceService aceService, ReplicationSettings settings, CollectionInitMessage message) {
public AceRegisterStep(AceService aceService,
ReplicationSettings settings,
CollectionInitMessage message,
ReplicationNotifier notifier) {
this.aceService = aceService;
this.settings = settings;
this.message = message;
this.notifier = notifier;
}
@Override
......@@ -74,10 +80,13 @@ public class AceRegisterStep implements Tasklet {
} catch (RetrofitError error) {
log.error("Error registering ACE collection. Response code {} with reason {}",
error.getResponse().getStatus(), error.getResponse().getReason());
notifier.setSuccess(false);
notifier.setAceStep(error.getResponse().getReason());
throw new RuntimeException(error);
}
long id = idMap.get("id");
final String[] statusMessage = {"success"};
Callback tsCallback = new Callback() {
@Override
......@@ -91,6 +100,8 @@ public class AceRegisterStep implements Tasklet {
log.error("Error posting token store {} {}",
retrofitError.getResponse().getStatus(),
retrofitError.getBody());
notifier.setSuccess(false);
statusMessage[0] = retrofitError.getResponse().getReason();
callbackComplete.getAndSet(true);
}
};
......@@ -114,13 +125,15 @@ public class AceRegisterStep implements Tasklet {
log.info("Could not start audit. {} {}",
error.getResponse().getStatus(),
error.getResponse().getReason());
notifier.setSuccess(false);
statusMessage[0] = error.getResponse().getReason();
callbackComplete.set(true);
}
});
log.trace("Waiting for audit start to complete");
waitForCallback(callbackComplete);
notifier.setAceStep(statusMessage[0]);
return RepeatStatus.FINISHED;
}
......
......@@ -9,6 +9,7 @@ import org.chronopolis.common.transfer.FileTransfer;
import org.chronopolis.common.transfer.HttpsTransfer;
import org.chronopolis.common.transfer.RSyncTransfer;
import org.chronopolis.messaging.collection.CollectionInitMessage;
import org.chronopolis.replicate.ReplicationNotifier;
import org.chronopolis.replicate.config.ReplicationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,15 +30,21 @@ public class BagDownloadStep implements Tasklet {
private ReplicationSettings settings;
private CollectionInitMessage message;
private ReplicationNotifier notifier;
public BagDownloadStep(final ReplicationSettings settings,
final CollectionInitMessage message) {
final CollectionInitMessage message,
final ReplicationNotifier notifier) {
this.settings = settings;
this.message = message;
this.notifier = notifier;
}
@Override
public RepeatStatus execute(final StepContribution stepContribution, final ChunkContext chunkContext) throws Exception {
// For our notifier
String statusMessage = "success";
// Set up our download parameters
String collection = message.getCollection();
String depositor = message.getDepositor();
......@@ -63,8 +70,11 @@ public class BagDownloadStep implements Tasklet {
try {
transfer.getFile(uri, bagPath);
notifier.setRsyncStats(transfer.getStats());
} catch (FileTransferException e) {
log.error("File transfer exception", e);
notifier.setSuccess(false);
statusMessage = e.getMessage();
}
......@@ -78,6 +88,7 @@ public class BagDownloadStep implements Tasklet {
hashCode = Files.hash(tagmanifest.toFile(), hashFunction);
} catch (IOException e) {
log.error("Error hashing tagmanifest", e);
statusMessage = e.getMessage();
}
String calculatedDigest = hashCode.toString();
......@@ -88,11 +99,12 @@ public class BagDownloadStep implements Tasklet {
"\nFound {}\nExpected {}",
calculatedDigest,
tagDigest);
statusMessage = "Downloaded tag manifest does not match expected digest";
} else {
log.info("Successfully validated tagmanifest");
}
notifier.setBagStep(statusMessage);
return RepeatStatus.FINISHED;
}
}
......@@ -5,6 +5,7 @@ import org.chronopolis.common.mail.MailUtil;
import org.chronopolis.messaging.base.ChronMessage;
import org.chronopolis.messaging.collection.CollectionInitMessage;
import org.chronopolis.messaging.factory.MessageFactory;
import org.chronopolis.replicate.ReplicationNotifier;
import org.chronopolis.replicate.config.ReplicationSettings;
import org.chronopolis.replicate.util.MailFunctions;
import org.springframework.batch.core.StepContribution;
......@@ -24,33 +25,39 @@ public class ReplicationSuccessStep implements Tasklet {
private ChronProducer producer;
private MessageFactory messageFactory;
private ReplicationSettings settings;
private CollectionInitMessage message;
private ReplicationNotifier notifier;
public ReplicationSuccessStep(ChronProducer producer,
CollectionInitMessage message,
MessageFactory messageFactory,
MailUtil mailUtil,
ReplicationSettings replicationSettings) {
ReplicationSettings replicationSettings,
ReplicationNotifier notifier) {
this.producer = producer;
this.message = message;
this.messageFactory = messageFactory;
this.mailUtil = mailUtil;
this.notifier = notifier;
settings = replicationSettings;
}
@Override
public RepeatStatus execute(final StepContribution stepContribution, final ChunkContext chunkContext) throws Exception {
CollectionInitMessage message = notifier.getMessage();
String correlationId = message.getCorrelationId();
String returnKey = message.getReturnKey();
String nodeName = settings.getNode();
String subject = "Successful replication of " + message.getCollection();
String subject = notifier.isSuccess()
? "Successful replication of " + message.getCollection()
: "Failure in replication of " + message.getCollection();
ChronMessage response = messageFactory.collectionInitCompleteMessage(correlationId);
producer.send(response, returnKey);
String text = MailFunctions.createText(message, new HashMap<String, String>(), null);
SimpleMailMessage mailMessage = mailUtil.createMessage(nodeName, subject, text);
SimpleMailMessage mailMessage = mailUtil.createMessage(nodeName,
subject,
notifier.getNotificationBody());
mailUtil.send(mailMessage);
return RepeatStatus.FINISHED;
......
......@@ -7,6 +7,7 @@ import com.google.common.io.Files;
import org.chronopolis.common.exception.FileTransferException;
import org.chronopolis.common.exception.FixityException;
import org.chronopolis.messaging.collection.CollectionInitMessage;
import org.chronopolis.replicate.ReplicationNotifier;
import org.chronopolis.replicate.ReplicationQueue;
import org.chronopolis.replicate.config.ReplicationSettings;
import org.slf4j.Logger;
......@@ -27,17 +28,22 @@ public class TokenDownloadStep implements Tasklet {
private final Logger log = LoggerFactory.getLogger(TokenDownloadStep.class);
private ReplicationSettings settings;
private ReplicationNotifier notifier;
private CollectionInitMessage message;
public TokenDownloadStep(final ReplicationSettings settings,
final CollectionInitMessage message) {
final CollectionInitMessage message,
final ReplicationNotifier notifier) {
this.settings = settings;
this.message = message;
this.notifier = notifier;
}
@Override
public RepeatStatus execute(final StepContribution stepContribution, final ChunkContext chunkContext) throws Exception {
String statusMessage = "success";
String location = message.getTokenStore();
String protocol = message.getProtocol();
String digest = message.getTokenStoreDigest();
......@@ -50,9 +56,13 @@ public class TokenDownloadStep implements Tasklet {
protocol);
} catch (IOException e) {
log.error("Error downloading token store", e);
notifier.setSuccess(false);
notifier.setTokenStep(e.getMessage());
throw e;
} catch (FileTransferException e) {
log.error("File transfer exception", e);
notifier.setSuccess(false);
notifier.setTokenStep(e.getMessage());
throw e;
}
......@@ -61,12 +71,16 @@ public class TokenDownloadStep implements Tasklet {
try {
// Check to make sure the download was successful
if (!tokenStore.toFile().exists()) {
throw new IOException("TokenStore does does not exist");
throw new IOException("TokenStore "
+ tokenStore.toString()
+ " does does not exist");
}
hashCode = Files.hash(tokenStore.toFile(), hashFunction);
} catch (IOException e) {
log.error("Error hashing token store", e);
notifier.setSuccess(false);
notifier.setTokenStep(e.getMessage());
throw new FixityException("Could not validate the fixity of the token store", e);
}
......@@ -79,11 +93,15 @@ public class TokenDownloadStep implements Tasklet {
"\nFound {}\nExpected {}",
calculatedDigest,
digest);
notifier.setSuccess(false);
notifier.setTokenStep("Downloaded token store does not match expected digest");
throw new FixityException("Could not validate the fixity of the token store");
} else {
log.info("Successfully validated token store");
}
notifier.setTokenStep(statusMessage);
return RepeatStatus.FINISHED;
}
}
......@@ -14,6 +14,7 @@ import org.chronopolis.messaging.base.ChronProcessor;
import org.chronopolis.messaging.collection.CollectionInitCompleteMessage;
import org.chronopolis.messaging.collection.CollectionInitMessage;
import org.chronopolis.messaging.factory.MessageFactory;
import org.chronopolis.replicate.ReplicationNotifier;
import org.chronopolis.replicate.batch.AceRegisterStep;
import org.chronopolis.replicate.batch.BagDownloadStep;
import org.chronopolis.replicate.batch.ReplicationStepListener;
......@@ -100,21 +101,22 @@ public class CollectionInitProcessor implements ChronProcessor {
// if we do, just sent an init complete message
GsonCollection gsonCollection = aceService.getCollectionByName(collection, depositor);
if (gsonCollection == null) {
ReplicationNotifier notifier = new ReplicationNotifier(msg);
Job job = jobBuilderFactory.get("collection-replicate")
.start(stepBuilderFactory.get("token-replicate")
.tasklet(new TokenDownloadStep(settings, msg))
.tasklet(new TokenDownloadStep(settings, msg, notifier))
.listener(replicationStepListener)
.build())
.next(stepBuilderFactory.get("bag-replicate")
.tasklet(new BagDownloadStep(settings, msg))
.tasklet(new BagDownloadStep(settings, msg, notifier))
.listener(replicationStepListener)
.build())
.next(stepBuilderFactory.get("ace-register")
.tasklet(new AceRegisterStep(aceService, settings, msg))
.tasklet(new AceRegisterStep(aceService, settings, msg, notifier))
.listener(replicationStepListener)
.build())
.next(stepBuilderFactory.get("replication-success")
.tasklet(new ReplicationSuccessStep(producer, msg, messageFactory, mailUtil, settings))
.tasklet(new ReplicationSuccessStep(producer, messageFactory, mailUtil, settings, notifier))
.listener(replicationStepListener)
.build())
.build();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment