Commit ce321f55 authored by Michael Ritter's avatar Michael Ritter
Browse files

Merge branch 'develop'

parents fd2b092f 58ae94c5
......@@ -24,7 +24,8 @@ git checkout --track origin/develop
Todo:
Lots of stuff
* Add property for setting the from field for smtp (may want to copy ace and do from@localhost)
* Move from Quartz to Spring Batch.
[1]: https://chron-git.umiacs.umd.edu/chron-core/chronopolis-doc
\ No newline at end of file
[1]: https://gitlab.umiacs.umd.edu/chronopolis/chronopolis-core/wikis/home
package org.chronopolis.amqp;
import java.io.IOException;
import org.chronopolis.messaging.MessageType;
import org.chronopolis.messaging.base.ChronBody;
......@@ -17,44 +18,50 @@ import org.springframework.amqp.core.MessageProperties;
/**
* Listener which receives notifications from AMQP when chronopolis messages are
* received by various services
*
*
* @author toaster
*/
public abstract class ChronMessageListener implements MessageListener {
private Logger log = LoggerFactory.getLogger(ChronMessageListener.class);
@Override
@Override
public void onMessage(Message msg) {
MessageProperties props = msg.getMessageProperties();
byte[] body = msg.getBody();
ChronMessage message = null;
if ( null == props ) {
if (null == props) {
throw new IllegalArgumentException("Message properties are null!");
}
if ( null == props.getHeaders() || props.getHeaders().isEmpty()) {
if (null == props.getHeaders() || props.getHeaders().isEmpty()) {
throw new IllegalArgumentException("Message headers are empty!");
}
ChronBody chronBody = getChronBody(body);
message = ChronMessage.getMessage(chronBody.getType());
message.setHeader(props.getHeaders());
message.setBody(chronBody);
try {
ChronBody chronBody = getChronBody(body);
message = ChronMessage.getMessage(chronBody.getType());
message.setHeader(props.getHeaders());
message.setBody(chronBody);
} catch (Exception e) {
log.error("Error reading message", e);
}
// Sanity Check
if ( null != message ) {
ChronProcessor processor = getProcessor(message.getType());
try {
if (null != message) {
log.debug("Received {}", message);
try {
ChronProcessor processor = getProcessor(message.getType());
log.info("Processing {}", message.getType());
processor.process(message);
} catch (Exception e){
log.error("Unexpected processing error '{}' ", e);
} catch (Exception e) {
log.error("Unexpected processing error", e);
}
}
}
private ChronBody getChronBody(byte[] body) {
if ( body == null ) {
if (body == null) {
throw new IllegalArgumentException("Message body is null.");
}
......@@ -71,5 +78,5 @@ public abstract class ChronMessageListener implements MessageListener {
}
}
public abstract ChronProcessor getProcessor(MessageType type);
public abstract ChronProcessor getProcessor(MessageType type);
}
......@@ -12,11 +12,12 @@ import org.chronopolis.messaging.base.ChronMessage;
*/
public interface ChronProducer {
/*
*
/**
* Send a {@link org.chronopolis.messaging.base.ChronMessage} through RabbitMQ
*
* @param message message to be sent
* @param routingKey key for the route
* @param routingKey key to route to
*/
public void send(ChronMessage message, String routingKey);
void send(ChronMessage message, String routingKey);
}
......@@ -16,12 +16,12 @@ import org.springframework.amqp.rabbit.connection.ConnectionListener;
public class ConnectionListenerImpl implements ConnectionListener {
private final Logger log = LoggerFactory.getLogger(ConnectionListenerImpl.class);
public void onCreate(Connection cnctn) {
log.info("Connection created " + cnctn.toString());
public void onCreate(final Connection cnctn) {
log.info("Connection created {}", cnctn.toString());
}
public void onClose(Connection cnctn) {
log.info("Connection closed" + cnctn.toString());
public void onClose(final Connection cnctn) {
log.info("Connection closed {}", cnctn.toString());
}
}
......@@ -5,10 +5,22 @@
package org.chronopolis.amqp;
/**
* Could use this for broadcast routing keys
*
* @author toaster
*/
public enum RoutingKey {
INGEST_BROADCAST("ingest.broadcast"),
REPLICATE_BROADCAST("replicate.broadcast");
private final String route;
private RoutingKey(String route) {
this.route = route;
}
public String asRoute() {
return route;
}
}
......@@ -4,7 +4,6 @@
*/
package org.chronopolis.amqp;
import java.io.IOException;
import java.util.Map;
import org.chronopolis.messaging.base.ChronMessage;
import org.slf4j.Logger;
......@@ -23,7 +22,7 @@ public class TopicProducer implements ChronProducer {
private final Logger log = LoggerFactory.getLogger(TopicProducer.class);
private RabbitTemplate template;
private final RabbitTemplate template;
private String defaultRoutingKey;
public TopicProducer(RabbitTemplate template) {
......@@ -31,7 +30,7 @@ public class TopicProducer implements ChronProducer {
}
@Override
public void send(ChronMessage message, String routingKey) {
public void send(final ChronMessage message, String routingKey) {
boolean done = false;
int numTries = 0;
log.debug("Preparing message {}", message.toString());
......@@ -39,21 +38,21 @@ public class TopicProducer implements ChronProducer {
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
props.setContentType("application/json");
if ( null == routingKey ) {
if (null == routingKey) {
routingKey = defaultRoutingKey;
}
Map<String, Object> headers = message.getHeader();
if ( headers != null && !headers.isEmpty()) {
for ( String key : headers.keySet()) {
if (headers != null && !headers.isEmpty()) {
for (String key : headers.keySet()) {
props.setHeader(key, headers.get(key));
}
}else {
} else {
log.error("Message headers not valid!");
throw new RuntimeException("Invalid headers");
}
while ( !done && numTries < 3 ) {
while (!done && numTries < 3) {
try {
Message msg = new Message(message.toJson().getBytes(), props);
log.info("Sending {} to {} ", message.getType(), routingKey);
......@@ -61,15 +60,12 @@ public class TopicProducer implements ChronProducer {
template.send(routingKey, msg);
done = true;
} catch (AmqpException e) {
log.error("Error publishing '{}', retrying", message, e);
log.error("Error publishing {}, retrying", message, e);
numTries++;
}
}
}
}
......@@ -17,7 +17,7 @@ public class ErrorHandlerImpl implements ErrorHandler {
private final Logger log = LoggerFactory.getLogger(ErrorHandlerImpl.class);
public void handleError(Throwable thrwbl) {
log.error("Caught error: {}", thrwbl);
log.error("Caught error: ", thrwbl);
}
}
package org.chronopolis.amqp;
import junit.framework.TestCase;
import org.chronopolis.common.properties.GenericProperties;
import org.chronopolis.common.settings.ChronopolisSettings;
import org.chronopolis.messaging.MessageType;
import org.chronopolis.messaging.base.ChronProcessor;
import org.chronopolis.messaging.collection.CollectionInitMessage;
......@@ -34,12 +34,12 @@ public class ChronMessageListenerTest extends TestCase {
public void setUp() throws Exception {
super.setUp();
GenericProperties properties = new GenericProperties("one", "two", "three", "four", "five");
messageFactory = new MessageFactory(properties);
ChronopolisSettings settings = new ChronopolisSettings();
messageFactory = new MessageFactory(settings);
}
public void testOnMessage() throws Exception {
CollectionInitMessage chronMessage = messageFactory.DefaultCollectionInitMessage();
CollectionInitMessage chronMessage = messageFactory.defaultCollectionInitMessage();
processor.setExpected(chronMessage);
String json = chronMessage.toJson();
......
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.2//EN"
"http://www.puppycrawl.com/dtds/configuration_1_2.dtd">
<!--
Checkstyle configuration that checks the sun coding conventions from:
- the Java Language Specification at
http://java.sun.com/docs/books/jls/second_edition/html/index.html
- the Sun Code Conventions at http://java.sun.com/docs/codeconv/
- the Javadoc guidelines at
http://java.sun.com/j2se/javadoc/writingdoccomments/index.html
- the JDK Api documentation http://java.sun.com/j2se/docs/api/index.html
- some best practices
Checkstyle is very configurable. Be sure to read the documentation at
http://checkstyle.sf.net (or in your downloaded distribution).
Most Checks are configurable, be sure to consult the documentation.
To completely disable a check, just comment it out or delete it from the file.
Finally, it is worth reading the documentation.
-->
<module name="Checker">
<!--
If you set the basedir property below, then all reported file
names will be relative to the specified directory. See
http://checkstyle.sourceforge.net/5.x/config.html#Checker
<property name="basedir" value="${basedir}"/>
-->
<!-- Checks that each Java package has a Javadoc file used for commenting. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html#JavadocPackage -->
<module name="JavadocPackage">
<property name="allowLegacy" value="true"/>
<property name="severity" value="warning"/>
</module>
<!-- Checks whether files end with a new line. -->
<!-- See http://checkstyle.sf.net/config_misc.html#NewlineAtEndOfFile -->
<module name="NewlineAtEndOfFile"/>
<!-- Checks that property files contain the same keys. -->
<!-- See http://checkstyle.sf.net/config_misc.html#Translation -->
<module name="Translation"/>
<module name="FileLength"/>
<!-- Following interprets the header file as regular expressions. -->
<!-- <module name="RegexpHeader"/> -->
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="RegexpSingleline">
<!-- \s matches whitespace character, $ matches end of line. -->
<property name="format" value="\s+$"/>
<property name="message" value="Line has trailing spaces."/>
</module>
<module name="TreeWalker">
<property name="cacheFile" value="${checkstyle.cache.file}"/>
<!-- Checks for Javadoc comments. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html
<module name="JavadocMethod">
<property name="severity" value="warning"/>
</module>
<module name="JavadocType">
<property name="severity" value="warning"/>
</module>
<module name="JavadocVariable"/>
<property name="severity" value="warning"/>
</module>
<module name="JavadocStyle"/> -->
<!-- Checks for Naming Conventions. -->
<!-- See http://checkstyle.sf.net/config_naming.html -->
<module name="ConstantName"/>
<module name="LocalFinalVariableName"/>
<module name="LocalVariableName"/>
<module name="MemberName">
<property name="severity" value="warning"/>
</module>
<module name="MethodName"/>
<module name="PackageName"/>
<module name="ParameterName"/>
<module name="StaticVariableName"/>
<module name="TypeName"/>
<!-- Checks for Headers -->
<!-- See http://checkstyle.sf.net/config_header.html -->
<!-- <module name="Header"> -->
<!-- The follow property value demonstrates the ability -->
<!-- to have access to ANT properties. In this case it uses -->
<!-- the ${basedir} property to allow Checkstyle to be run -->
<!-- from any directory within a project. See property -->
<!-- expansion, -->
<!-- http://checkstyle.sf.net/config.html#properties -->
<!-- <property -->
<!-- name="headerFile" -->
<!-- value="${basedir}/java.header"/> -->
<!-- </module> -->
<!-- Checks for imports -->
<!-- See http://checkstyle.sf.net/config_import.html -->
<module name="AvoidStarImport"/>
<module name="IllegalImport"/> <!-- defaults to sun.* packages -->
<module name="RedundantImport"/>
<module name="UnusedImports">
<property name="severity" value="warning"/>
</module>
<!-- Checks for Size Violations. -->
<!-- See http://checkstyle.sf.net/config_sizes.html -->
<module name="LineLength">
<property name="max" value="100"/>
<property name="ignorePattern" value="^\s*public static final"/>
</module>
<module name="MethodLength"/>
<module name="ParameterNumber">
<property name="max" value="15"/>
</module>
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
<module name="EmptyForIteratorPad"/>
<module name="MethodParamPad"/>
<module name="NoWhitespaceAfter"/>
<module name="NoWhitespaceBefore"/>
<module name="OperatorWrap"/>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
<!-- Modifier Checks -->
<!-- See http://checkstyle.sf.net/config_modifiers.html -->
<module name="ModifierOrder"/>
<module name="RedundantModifier"/>
<!-- Checks for blocks. You know, those {}'s -->
<!-- See http://checkstyle.sf.net/config_blocks.html -->
<module name="AvoidNestedBlocks"/>
<module name="EmptyBlock">
<property name="severity" value="info"/>
</module>
<module name="LeftCurly"/>
<module name="NeedBraces"/>
<module name="RightCurly"/>
<!-- Checks for common coding problems -->
<!-- See http://checkstyle.sf.net/config_coding.html -->
<module name="AvoidInlineConditionals">
<property name="severity" value="warning"/>
</module>
<module name="EmptyStatement">
<property name="severity" value="info"/>
</module>
<module name="EqualsHashCode"/>
<module name="HiddenField">
<property name="ignoreConstructorParameter" value="true"/>
<property name="ignoreSetter" value="true"/>
</module>
<module name="IllegalInstantiation"/>
<module name="InnerAssignment"/>
<module name="MagicNumber">
<property name="severity" value="warning"/>
</module>
<module name="MissingSwitchDefault"/>
<module name="RedundantThrows"/>
<module name="SimplifyBooleanExpression"/>
<module name="SimplifyBooleanReturn"/>
<!-- Checks for class design -->
<!-- See http://checkstyle.sf.net/config_design.html -->
<!-- <module name="DesignForExtension"/> -->
<module name="FinalClass"/>
<module name="HideUtilityClassConstructor"/>
<module name="InterfaceIsType"/>
<module name="VisibilityModifier"/>
<!-- Miscellaneous other checks. -->
<!-- See http://checkstyle.sf.net/config_misc.html -->
<module name="ArrayTypeStyle"/>
<module name="FinalParameters">
<property name="severity" value="warning"/>
</module>
<module name="TodoComment">
<property name="severity" value="warning"/>
</module>
<module name="UpperEll"/>
</module>
</module>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.chronopolis</groupId>
<artifactId>chronopolis</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<groupId>org.chronopolis</groupId>
<artifactId>chron-db</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>1.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>1.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.chronopolis.db;
import org.chronopolis.db.ingest.IngestDB;
import org.chronopolis.db.ingest.ReplicationFlowTable;
import org.springframework.beans.factory.annotation.Autowired;
/**
* Created by shake on 4/9/14.
*/
public class DatabaseManager {
@Autowired
private IngestDB ingestDatabase;
@Autowired
private ReplicationFlowTable replicationFlowTable;
public IngestDB getIngestDatabase() {
return ingestDatabase;
}
public void setIngestDatabase(final IngestDB ingestDatabase) {
this.ingestDatabase = ingestDatabase;
}
public ReplicationFlowTable getReplicationFlowTable() {
return replicationFlowTable;
}
}
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package org.chronopolis.db;
import org.chronopolis.db.ingest.IngestDB;
import org.chronopolis.db.ingest.ReplicationFlowTable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.orm.hibernate4.HibernateExceptionTranslator;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
import java.sql.SQLException;
/**
*
* @author shake
*/
@Configuration
@EnableJpaRepositories(basePackages = "org.chronopolis.db",
includeFilters = @ComponentScan.Filter(value = {IngestDB.class,
ReplicationFlowTable.class},
type = FilterType.ASSIGNABLE_TYPE))
@EnableTransactionManagement
public class JPAConfiguration {
@Bean
public DataSource dataSource() throws SQLException {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl("test-h2");
dataSource.setUsername("h2");
dataSource.setPassword("h2");
return dataSource;
}
@Bean
public DatabaseManager databaseManager() {