I changed the ActiveMQ configuration and used the ActiveMQ 'Mirrored Queues' feature to forward a copy of every message to a mirror destination prefixed with qmirror (ActiveMQ creates the mirrors as topics). Here is the ActiveMQ configuration:
<destinationInterceptors>
    <mirroredQueue copyMessage="true" postfix="" prefix="qmirror."/>
</destinationInterceptors>
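To make the naming rule concrete: with this configuration, a message sent to a queue is also copied to a mirror destination whose name is the prefix, the original queue name, and the postfix joined together. The sketch below only illustrates that naming scheme in plain Java. The class `MirrorNames` and the queue name `payments.in` are made up for illustration and are not part of ActiveMQ or of this project.

```java
// Hypothetical helper (not an ActiveMQ class): shows how a mirror
// destination name is derived from the prefix/postfix settings above.
final class MirrorNames {

    private MirrorNames() {
    }

    // Mirror name = prefix + original queue name + postfix.
    static String mirrorName(String prefix, String queueName, String postfix) {
        return prefix + queueName + postfix;
    }

    public static void main(String[] args) {
        // "payments.in" is an example queue name, not one from the project.
        System.out.println(mirrorName("qmirror.", "payments.in", ""));
    }
}
```

With the settings used here (prefix "qmirror.", empty postfix), the example queue would be mirrored to `qmirror.payments.in`.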
Now that copies of the messages arrive on destinations named 'qmirror.*', it is time to log them with Apache Camel. I changed the '$ACTIVEMQ_HOME/conf/camel.xml' config file in the following way:
<beans...>
    <context:component-scan base-package="info.sargis.dbloggger"/>
    <context:annotation-config/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <package>info.sargis.dbloggger</package>
    </camelContext>

    <bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>

    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
        <property name="url" value="jdbc:oracle:thin:@localhost:1521:TWMDB"/>
        <property name="username" value="activemq"/>
        <property name="password" value="ee0thaXu"/>
        <property name="maxActive" value="5"/>
        <property name="maxIdle" value="2"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"
                          value="vm://localhost?create=false&amp;waitForStart=10000&amp;broker.populateJMSXUserID=true"/>
                <property name="userName" value="${activemq.username}"/>
                <property name="password" value="${activemq.password}"/>
            </bean>
        </property>
    </bean>
</beans>
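One detail in the `brokerURL` value is easy to get wrong: `camel.xml` is an XML file, so each `&` separating the URL options has to be written as `&amp;` inside the attribute. The following is a small check that uses only the JDK's XML parser. The class name `BrokerUrlEscaping` is hypothetical and the snippet is a sketch for illustration, not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

// Hypothetical demo class: reads the "value" attribute from an XML snippet.
final class BrokerUrlEscaping {

    private BrokerUrlEscaping() {
    }

    // Parses a single element and returns its "value" attribute.
    // Throws IllegalArgumentException if the XML cannot be parsed.
    static String readValue(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getDocumentElement().getAttribute("value");
        } catch (Exception e) {
            throw new IllegalArgumentException("XML could not be parsed", e);
        }
    }

    public static void main(String[] args) {
        String escaped = "<property name=\"brokerURL\" "
                + "value=\"vm://localhost?create=false&amp;waitForStart=10000\"/>";
        // The parser turns &amp; back into a plain & before the value is used.
        System.out.println(readValue(escaped));
    }
}
```

Passing the same snippet with a bare `&` instead of `&amp;` makes `readValue` fail, because the parser rejects the attribute as malformed.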
I also created a project with the following structure:
|   `-- create_db.sql
|-- payment-dblogger.iml
|-- pom.xml
|-- README.txt
`-- src
    |-- data
    `-- main
        |-- java
        |   `-- info
        |       `-- sargis
        |           `-- dbloggger
        |               |-- DBLoggerRouteBuilder.java
        |               `-- logger
        |                   |-- DBLogger.java
        |                   |-- DBLoggerProcessor.java
        |                   `-- Logger.java
        `-- resources
            |-- log4j.properties
            `-- META-INF
                `-- spring
                    `-- camel-context.xml
Note that the project artifact, payment-dblogger-*.jar, should be deployed to '$ACTIVEMQ_HOME/webapps/camel/WEB-INF/lib'.
Now it's time for the router :) Here everything is simple as well:
package info.sargis.dbloggger;

import org.apache.camel.builder.RouteBuilder;

public class DBLoggerRouteBuilder extends RouteBuilder {

    @Override
    public void configure() {
        from("activemq:topic:qmirror.>").threads().processRef("dbLoggerProcessor");
    }
}
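The `>` at the end of `qmirror.>` is an ActiveMQ destination wildcard: names are split on `.`, `*` stands for exactly one segment, and `>` stands for everything from that position onward, so the route subscribes to every mirror destination at once. The plain-Java matcher below is a simplified sketch of those rules, for illustration only. `WildcardSketch` is a hypothetical class, the destination names are examples, and edge cases (such as the bare name `qmirror`) are not guaranteed to match what the broker really does.

```java
// Hypothetical, simplified matcher for ActiveMQ-style destination wildcards.
// It is NOT the broker's implementation, only an illustration of the rules.
final class WildcardSketch {

    private WildcardSketch() {
    }

    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" swallows the rest; this sketch requires at least one more segment.
                return d.length > i;
            }
            if (i >= d.length) {
                return false; // destination ran out of segments
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal segment differs
            }
        }
        // No ">" seen: pattern and destination must have the same number of segments.
        return p.length == d.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("qmirror.>", "qmirror.payments.in")); // true
        System.out.println(matches("qmirror.>", "payments.in"));         // false
    }
}
```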
As you can see, I consume all messages from the mirror destinations and pass them to the 'dbLoggerProcessor' bean. The target beans, with simplified definitions, are shown below:
package info.sargis.dbloggger.logger;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("dbLoggerProcessor")
public class DBLoggerProcessor implements Processor {

    private static final Log LOGGER = LogFactory.getLog(DBLoggerProcessor.class);

    @Autowired
    private Logger logger;

    @Override
    public void process(Exchange exchange) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing exchange: " + exchange);
        }
        try {
            logger.logExchange(exchange);
        } catch (Throwable e) {
            // A logging failure should never stop routing
            LOGGER.error("Failed to log exchange", e);
        }
    }

    public void setLogger(Logger logger) {
        this.logger = logger;
    }
}
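The `catch (Throwable e)` in `process` is the important design choice here: a failure while writing the log record is logged and swallowed, so the route keeps consuming messages. The same idea can be sketched without Camel as a wrapper around any handler. `SafeHandler` and its use of `java.util.function.Consumer` are assumptions made for this illustration, not code from the project.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper: runs a delegate and never lets its failure escape.
final class SafeHandler<T> implements Consumer<T> {

    private static final Logger LOG = Logger.getLogger(SafeHandler.class.getName());

    private final Consumer<T> delegate;
    private int failures; // number of swallowed failures (not thread-safe; sketch only)

    SafeHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T item) {
        try {
            delegate.accept(item);
        } catch (RuntimeException e) {
            // Log and carry on, so one bad item does not stop the caller.
            failures++;
            LOG.log(Level.SEVERE, "Handler failed for item: " + item, e);
        }
    }

    int failures() {
        return failures;
    }

    public static void main(String[] args) {
        SafeHandler<String> handler = new SafeHandler<>(s -> {
            throw new IllegalStateException("database is down");
        });
        handler.accept("message-1"); // logged, not rethrown
        System.out.println("still running, failures=" + handler.failures());
    }
}
```

This sketch catches `RuntimeException` rather than `Throwable`, which lets JVM errors such as `OutOfMemoryError` propagate. Whether that is preferable for a logger like this one is a judgment call.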
The rest is plain Spring:
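package info.sargis.dbloggger.logger;

import javax.sql.DataSource;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
import org.springframework.jdbc.support.lob.LobHandler;
import org.springframework.stereotype.Repository;

@Repository
public class DBLogger implements Logger {

    private static final Log LOGGER = LogFactory.getLog(DBLogger.class);

    private JdbcTemplate jdbcTemplate;
    private LobHandler lobHandler;

    @Override
    public void logExchange(Exchange exchange) {
        Message message = exchange.getIn();
        // The LOB-aware callback binds the statement parameters from the message
        jdbcTemplate.execute(
                "INSERT INTO DB_LOG (MESSAGE_ID, MESSAGE_DATE, MESSAGE_PAYLOAD) VALUES (?, ?, ?)",
                new LobCreatingPreparedStatementCallback(lobHandler, message)
        );
    }

    @Autowired
    private void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Autowired
    public void setLobHandler(LobHandler lobHandler) {
        this.lobHandler = lobHandler;
    }

    private static class LobCreatingPreparedStatementCallback extends AbstractLobCreatingPreparedStatementCallback {
        .....................................
    }
}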
That's all. Please note that none of this message processing is transactional; for transactions, check the Apache Camel documentation. You can find the full project here: source