Commit 9c2ae7eb authored by Michael Ritter's avatar Michael Ritter
Browse files

Merge branch 'release-1.3.0'

parents 2378b25d abe1da0a
......@@ -5,11 +5,11 @@
<parent>
<groupId>org.chronopolis</groupId>
<artifactId>chronopolis</artifactId>
<version>1.2.0-RELEASE</version>
<version>1.3.0-RELEASE</version>
</parent>
<artifactId>chron-common</artifactId>
<version>1.2.0-RELEASE</version>
<version>1.3.0-RELEASE</version>
<name>Common</name>
<url>http://maven.apache.org</url>
<properties>
......
......@@ -25,7 +25,7 @@ public interface AceService {
Call<Void> modifyCollection(@Path("id") long id, @Body GsonCollection collection);
@POST("rest/collection/")
Call<Map<String, Integer>> addCollection(@Body GsonCollection collection);
Call<Map<String, Long>> addCollection(@Body GsonCollection collection);
@GET("rest/collection/settings/by-id/{id}")
Call<GsonCollection> getCollectionById(@Path("id") long id);
......
......@@ -38,7 +38,6 @@ public class GsonCollection {
return digestAlgorithm;
}
public String getDirectory() {
return directory;
}
......
......@@ -36,10 +36,10 @@ public class Tokenizer {
*
*/
public static class IMSFactory {
public TokenRequestBatch createIMSConnection(RequestBatchCallback callback) {
public TokenRequestBatch createIMSConnection(String imsHostName, RequestBatchCallback callback) {
IMSService ims;
// TODO: Use the AceSettings to get the ims host name
ims = IMSService.connect("ims.umiacs.umd.edu", SSL_PORT, true);
ims = IMSService.connect(imsHostName, SSL_PORT, true);
return ims.createImmediateTokenRequestBatch("SHA-256",
callback,
MAX_QUEUE_LEN,
......@@ -53,6 +53,7 @@ public class Tokenizer {
private final Logger log = LoggerFactory.getLogger(Tokenizer.class);
private final Path bag;
private final String imsHostName;
private final Digest fixityAlgorithm;
......@@ -71,6 +72,7 @@ public class Tokenizer {
public Tokenizer(final Path bag,
final String fixityAlgorithm,
final String imsHostName,
final RequestBatchCallback callback) {
this.bag = bag;
this.fixityAlgorithm = Digest.fromString(fixityAlgorithm);
......@@ -78,6 +80,7 @@ public class Tokenizer {
this.callback = callback;
this.tagDigest = null;
this.factory = new Tokenizer.IMSFactory();
this.imsHostName = imsHostName;
addManifests();
}
......@@ -133,7 +136,7 @@ public class Tokenizer {
* @throws InterruptedException
*/
public void tokenize(Filter<Path> filter) throws IOException, InterruptedException {
batch = factory.createIMSConnection(callback);
batch = factory.createIMSConnection(imsHostName, callback);
try {
// Digest the manifest
......
package org.chronopolis.common.settings;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Created by shake on 8/13/14.
*/
@Component
public class AMQPSettings {
@Value("${amqp.virtual.host:chronopolis}")
String virtualHost;
@Value("${amqp.exchange:chronopolis-control}")
String exchange;
// TODO: rename to addresses; make List<String>
@Value("${amqp.addresses:adapt-mq.umiacs.umd.edu}")
String addresses;
public String getVirtualHost() {
return virtualHost;
}
public void setVirtualHost(final String virtualHost) {
this.virtualHost = virtualHost;
}
public String getExchange() {
return exchange;
}
public void setExchange(final String exchange) {
this.exchange = exchange;
}
public String getAddresses() {
return addresses;
}
public void setAddresses(final String addresses) {
this.addresses = addresses;
}
}
......@@ -13,6 +13,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
/**
*
* Created by shake on 8/14/14.
*/
@Component
......@@ -37,7 +38,7 @@ public class AceSettings {
@Value("${ace.am.password:admin}")
String amPassword;
@Value("${ace.am.validate:true}")
@Value("${ace.am.validate:false}")
Boolean amValidate;
......
......@@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -38,12 +38,19 @@ public class DPNSettings {
}
@Value("${dpn.endpoints:http://localhost}")
public void setIngestEndpoints(String ingestEndpoints) {
public void setDpnEndpoints(String args) {
log.debug("Splitting dpn endpoints");
String[] endpoints = ingestEndpoints.split(",");
String[] endpoints = args.split(",");
log.debug("Found {} endpoints: {}", endpoints.length, endpoints);
if (endpoints.length > 0) {
this.dpnEndpoints = Arrays.asList(endpoints);
this.dpnEndpoints = new ArrayList<>();
// TODO: Replace string with HttpUrl?
for (String endpoint : endpoints) {
if (!endpoint.endsWith("/")) {
endpoint += "/";
}
dpnEndpoints.add(endpoint);
}
}
......
......@@ -5,7 +5,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -81,12 +81,19 @@ public class IngestAPISettings {
}
@Value("${ingest.api.endpoints:http://localhost}")
public void setIngestEndpoints(String ingestEndpoints) {
public void setIngestEndpoints(String endpointArgs) {
log.debug("Splitting endpoints");
String[] endpoints = ingestEndpoints.split(",");
log.debug("Found {} endpoints: {}", endpoints.length, endpoints);
if (endpoints.length > 0) {
this.ingestEndpoints = Arrays.asList(endpoints);
String[] args = endpointArgs.split(",");
log.debug("Found {} endpoints: {}", args.length, args);
// TODO: Replace string with HttpUrl?
ingestEndpoints = new ArrayList<>();
for (String endpoint : args) {
if (!endpoint.endsWith("/")) {
endpoint += "/";
}
ingestEndpoints.add(endpoint);
}
}
}
......@@ -14,8 +14,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* TODO: If the rsync cannot connect it will simply hang.
......@@ -75,8 +73,7 @@ public class RSyncTransfer implements FileTransfer {
throw new FileTransferException("rsync was interrupted", e);
}
Path dir = local.resolve(pathList[pathList.length - 1]);
return dir;
return local.resolve(pathList[pathList.length - 1]);
}
};
......@@ -107,31 +104,28 @@ public class RSyncTransfer implements FileTransfer {
@Override
public void put(final Path localFile, final String uri) throws FileTransferException {
Callable<Boolean> upload = new Callable() {
@Override
public Boolean call() {
// Ensure that we don't include the directory
String local = localFile.toString();
if (!local.endsWith("/")) {
local += "/";
}
String[] cmd = new String[]{"rsync", "-az", local, uri};
ProcessBuilder pb = new ProcessBuilder(cmd);
Process p = null;
try {
log.info("Executing {} {} {} {}", cmd);
p = pb.start();
p.waitFor();
log.info("rsync exit value: " + p.exitValue());
} catch (IOException e) {
log.error("Error starting rsync", e);
return false;
} catch (InterruptedException e) {
log.error("rsync interrupted", e);
return false;
}
return true;
Callable<Boolean> upload = () -> {
// Ensure that we don't include the directory
String local = localFile.toString();
if (!local.endsWith("/")) {
local += "/";
}
String[] cmd = new String[]{"rsync", "-az", local, uri};
ProcessBuilder pb = new ProcessBuilder(cmd);
Process p = null;
try {
log.info("Executing {} {} {} {}", cmd);
p = pb.start();
p.waitFor();
log.info("rsync exit value: " + p.exitValue());
} catch (IOException e) {
log.error("Error starting rsync", e);
return false;
} catch (InterruptedException e) {
log.error("rsync interrupted", e);
return false;
}
return true;
};
FutureTask<Boolean> timedTask = new FutureTask<>(upload);
......
......@@ -7,11 +7,11 @@
<parent>
<groupId>org.chronopolis</groupId>
<artifactId>chronopolis</artifactId>
<version>1.2.0-RELEASE</version>
<version>1.3.0-RELEASE</version>
</parent>
<artifactId>duracloud-backend</artifactId>
<version>1.2.0-RELEASE</version>
<version>1.3.0-RELEASE</version>
<build>
<plugins>
......
package org.chronopolis.intake.duracloud;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import okhttp3.OkHttpClient;
import org.chronopolis.common.ace.OkBasicInterceptor;
import org.chronopolis.common.mail.MailUtil;
......@@ -17,6 +19,8 @@ import org.chronopolis.intake.duracloud.scheduled.Bridge;
import org.chronopolis.intake.duracloud.service.ChronService;
import org.chronopolis.rest.api.ErrorLogger;
import org.chronopolis.rest.api.IngestAPI;
import org.chronopolis.rest.support.ZonedDateTimeDeserializer;
import org.chronopolis.rest.support.ZonedDateTimeSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
......@@ -34,6 +38,7 @@ import org.springframework.context.annotation.ComponentScan;
import retrofit2.GsonConverterFactory;
import retrofit2.Retrofit;
import java.time.ZonedDateTime;
import java.util.concurrent.TimeUnit;
// import org.chronopolis.earth.api.LocalAPI;
......@@ -74,6 +79,12 @@ public class Application implements CommandLineRunner {
@Bean
IngestAPI ingestAPI(IngestAPISettings settings) {
String endpoint = settings.getIngestEndpoints().get(0);
Gson gson = new GsonBuilder()
.registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeSerializer())
.registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserializer())
.create();
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new OkBasicInterceptor(
settings.getIngestAPIUsername(),
......@@ -82,7 +93,7 @@ public class Application implements CommandLineRunner {
.build();
Retrofit adapter = new Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create(gson))
.baseUrl(endpoint)
.client(client)
.build();
......
......@@ -49,7 +49,7 @@ public class DpnInfoReader {
}
// Empty dpninfo?
return new DpnInfoReader(ImmutableMultimap.<Tag, String>of());
return new DpnInfoReader(ImmutableMultimap.of());
}
private static DpnInfoReader read(BufferedReader r) throws IOException {
......
......@@ -41,6 +41,7 @@ public class PropertiesDataCollector implements DataCollector {
try {
InputStream is = Files.newInputStream(propertiesPath);
properties.load(is);
is.close();
data.setSnapshotId(snapshotId);
data.setDepositor(properties.getProperty(PROPERTY_OWNER_ID, "DEPOSITOR_PLACEHOLDER"));
......
......@@ -11,6 +11,7 @@ import org.chronopolis.bag.writer.TarPackager;
import org.chronopolis.bag.writer.UUIDNamingSchema;
import org.chronopolis.bag.writer.Writer;
import org.chronopolis.intake.duracloud.batch.support.DpnWriter;
import org.chronopolis.intake.duracloud.batch.support.DuracloudMD5;
import org.chronopolis.intake.duracloud.config.IntakeSettings;
import org.chronopolis.intake.duracloud.model.BaggingHistory;
import org.chronopolis.intake.duracloud.remote.BridgeAPI;
......@@ -82,6 +83,7 @@ public class BaggingTasklet implements Tasklet {
// TODO: fill out with what...?
// TODO: EXTERNAL-IDENTIFIER: snapshot.description
BagInfo info = new BagInfo()
.includeMissingTags(true)
.withInfo(BagInfo.Tag.INFO_SOURCE_ORGANIZATION, depositor);
Writer writer = new DpnWriter()
......@@ -90,12 +92,12 @@ public class BaggingTasklet implements Tasklet {
.withBagIt(new BagIt())
.withDigest(Digest.SHA_256)
.withPayloadManifest(manifest)
.withMaxSize(250, Unit.GIGABYTE)
.withMaxSize(245, Unit.GIGABYTE)
.withPackager(new TarPackager(out))
.withNamingSchema(new UUIDNamingSchema())
.withTagFile(new OnDiskTagFile(snapshotBase.resolve(SNAPSHOT_COLLECTION_PROPERTIES)))
.withTagFile(new OnDiskTagFile(snapshotBase.resolve(SNAPSHOT_CONTENT_PROPERTIES)))
.withTagFile(new OnDiskTagFile(snapshotBase.resolve(SNAPSHOT_MD5)));
.withTagFile(new DuracloudMD5(snapshotBase.resolve(SNAPSHOT_MD5)));
List<Bag> bags = writer.write();
boolean valid = true;
......
......@@ -312,9 +312,9 @@ public class ReplicationTasklet implements Runnable {
chronRequest.setReplicatingNodes(
ImmutableList.of(settings.getChronReplicationNodes()));
Call<org.chronopolis.rest.models.Bag> stageCall = chronAPI.stageBag(chronRequest);
Call<org.chronopolis.rest.entities.Bag> stageCall = chronAPI.stageBag(chronRequest);
try {
retrofit2.Response<org.chronopolis.rest.models.Bag> response = stageCall.execute();
retrofit2.Response<org.chronopolis.rest.entities.Bag> response = stageCall.execute();
} catch (IOException e) {
log.error("Unable to stage bag with chronopolis", e);
......@@ -355,7 +355,7 @@ public class ReplicationTasklet implements Runnable {
.setInterpretive(reader.getInterpretiveIds())
.setFixities(ImmutableMap.of("sha256", receipt)) // sha256 digest
.setFirstVersionUuid(reader.getFirstVersionUUID()) // uuid
.setReplicatingNodes(ImmutableList.<String>of("chron")); // chron
.setReplicatingNodes(ImmutableList.of("chron")); // chron
Call<Bag> call = dpn.getBagAPI().createBag(bag);/*, new retrofit.Callback<Bag>() {
@Override
......
......@@ -97,4 +97,20 @@ public class DpnInfo implements TagFile {
return is;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DpnInfo dpnInfo = (DpnInfo) o;
return path != null ? path.equals(dpnInfo.path) : dpnInfo.path == null;
}
@Override
public int hashCode() {
return path != null ? path.hashCode() : 0;
}
}
......@@ -4,7 +4,10 @@ import org.chronopolis.bag.core.Bag;
import org.chronopolis.bag.writer.MultipartWriter;
import org.chronopolis.bag.writer.Writer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
/**
* Extension of the MultipartWriter that adds a DpnInfo file into Bags
......@@ -33,7 +36,10 @@ public class DpnWriter extends MultipartWriter {
String name = namingSchema.getName(idx);
bag.setName(name);
// Create the dpn info + update the bags md5 manifest
addDpnInfo(bag);
updateMd5s(bag);
writeBag(bag);
idx++;
}
......@@ -41,6 +47,35 @@ public class DpnWriter extends MultipartWriter {
return bags;
}
private void updateMd5s(Bag b) {
Path md5 = Paths.get("manifest-md5.txt");
b.getTags().values().stream()
.filter(t -> t.getPath().equals(md5))
.map(t -> {
// Create a List<Optional<DuracloudMD5>>
Optional<DuracloudMD5> optional;
if (t instanceof DuracloudMD5) {
optional = Optional.of((DuracloudMD5) t);
} else {
optional = Optional.empty();
}
return optional;
})
.forEach(o -> o.ifPresent(m -> ifPresent(b, m)));
}
private void ifPresent(Bag b, DuracloudMD5 md5) {
md5.setPredicate(s -> {
String[] split = s.split("\\s+", 2);
if (split.length != 2) {
return false;
}
String path = split[1];
return b.getFiles().containsKey(Paths.get(path));
});
}
private void addDpnInfo(Bag b) {
DpnInfo dpnInfo = new DpnInfo();
......
package org.chronopolis.intake.duracloud.batch.support;
import org.chronopolis.bag.core.OnDiskTagFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
*
* Created by shake on 5/12/16.
*/
public class DuracloudMD5 extends OnDiskTagFile {
private final Logger log = LoggerFactory.getLogger(DuracloudMD5.class);
private Predicate<String> predicate;
private List<String> collection;
private Long size;
private final Path path;
public DuracloudMD5(Path tag) {
super(tag);
this.path = tag;
}
// TODO: Can probably combine this + update stream and discard the predicate when we're done
public void setPredicate(Predicate<String> predicate) {
this.predicate = predicate;
updateStream();
}
private void updateStream() {
try {
collection = Files.lines(path)
.filter(predicate)
.collect(Collectors.toList());
size = collection.stream().reduce(0L, (l, s) -> l + (s + "\n").length(), (l, r) -> l + r);
} catch (IOException e) {
log.error("Error reading duracloud md5 manifest");
// TODO: RuntimeError?
throw new RuntimeException("");
}
}
@Override
public long getSize() {
if (size != null) {
return size;
}
return super.getSize();
}
@Override
public InputStream getInputStream() {
if (collection != null) {
return new IteratorInputStream(collection.iterator());
}
return super.getInputStream();
}
class IteratorInputStream extends InputStream {
Iterator<String> iterator;
ByteBuffer current;
public IteratorInputStream(Iterator<String> iterator) {
this.iterator = iterator;
}
@Override