Saturday, June 25, 2011

ActiveMQ messages database logging with Apache Camel

Hi, we had an e-payment system based on Fuse ActiveMQ, and we decided to log every message passing through ActiveMQ to a database. I want to share my first experience with Apache Camel :)

I changed the ActiveMQ configuration and used the 'Mirrored Queues' feature to forward a copy of every message to a mirror destination prefixed with 'qmirror.' (ActiveMQ creates the mirrors as topics). Here is the ActiveMQ configuration:

<destinationInterceptors>
    <mirroredQueue copyMessage="true" postfix="" prefix="qmirror."/>
</destinationInterceptors>
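
To make the naming rule concrete: with these settings the name of the mirror destination is just the prefix, the original queue name, and the postfix joined together. Here is a minimal sketch in plain Java. The class, the method, and the queue name used in the note below are made up for illustration; this is not ActiveMQ code.

```java
public class MirrorNames {

    /** Builds the mirror destination name: prefix + original queue name + postfix. */
    public static String mirrorName(String prefix, String queueName, String postfix) {
        return prefix + queueName + postfix;
    }
}
```

With the values from the configuration (prefix "qmirror.", empty postfix), a hypothetical queue called payment.requests would be mirrored to qmirror.payment.requests.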

Now that the copied messages are available on destinations named 'qmirror.*', it is time to log them with Apache Camel. I changed the '$ACTIVEMQ_HOME/conf/camel.xml' config file in the following way:

<beans...>

    <context:component-scan base-package="info.sargis.dbloggger"/>

    <context:annotation-config/>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <package>info.sargis.dbloggger</package>
    </camelContext>

    <bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>

    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
        <property name="url" value="jdbc:oracle:thin:@localhost:1521:TWMDB"/>
        <property name="username" value="activemq"/>
        <property name="password" value="ee0thaXu"/>
        <property name="maxActive" value="5"/>
        <property name="maxIdle" value="2"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"
                          value="vm://localhost?create=false&amp;waitForStart=10000&amp;broker.populateJMSXUserID=true"/>
                <property name="userName" value="${activemq.username}"/>
                <property name="password" value="${activemq.password}"/>
            </bean>
        </property>
    </bean>

</beans>

and created a project with the following structure:

|-- docs
|   `-- create_db.sql
|-- payment-dblogger.iml
|-- pom.xml
|-- README.txt
`-- src
    |-- data
    `-- main
        |-- java
        |   `-- info
        |       `-- sargis
        |           `-- dbloggger
        |               |-- DBLoggerRouteBuilder.java
        |               `-- logger
        |                   |-- DBLogger.java
        |                   |-- DBLoggerProcessor.java
        |                   `-- Logger.java
        `-- resources
            |-- log4j.properties
            `-- META-INF
                `-- spring
                    `-- camel-context.xml

Note that the project artifact, payment-dblogger-*.jar, should be deployed to '$ACTIVEMQ_HOME/webapps/camel/WEB-INF/lib'.

Now it's time for the route :) Everything here is simple as well:

package info.sargis.dbloggger;

import org.apache.camel.builder.RouteBuilder;

public class DBLoggerRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("activemq:topic:qmirror.>").threads().processRef("dbLoggerProcessor");
    }
}
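
The '>' at the end of 'qmirror.>' is an ActiveMQ destination wildcard: names are split on '.', '*' stands for exactly one segment, and '>' stands for everything that follows. The sketch below is a simplified illustration of that rule in plain Java, not ActiveMQ's own matcher. The class name and the destination names in the note are invented for the example.

```java
public class WildcardSketch {

    /**
     * Simplified ActiveMQ-style matching: names are split on '.',
     * '*' matches exactly one segment, '>' matches all remaining segments.
     */
    public static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // '>' swallows the rest of the name
            }
            if (i >= d.length) {
                return false; // the pattern is longer than the destination name
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // a literal segment differs
            }
        }
        return p.length == d.length; // no '>' seen: segment counts must agree
    }
}
```

Under this rule 'qmirror.>' matches qmirror.payment.requests but not payment.requests, so the route only sees the mirrored copies.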

As you can see, I consume all messages from the mirror destinations and hand them to the 'dbLoggerProcessor' bean. Below are the target beans, with simplified definitions:

package info.sargis.dbloggger.logger;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("dbLoggerProcessor")
public class DBLoggerProcessor implements Processor {

    private static final Log LOGGER = LogFactory.getLog(DBLoggerProcessor.class);

    @Autowired
    private Logger logger;

    @Override
    public void process(Exchange exchange) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing exchange: " + exchange);
        }
        try {
            logger.logExchange(exchange);
        } catch (Throwable e) {
            // Swallow the error on purpose: a logging failure must never stop routing
            LOGGER.error("Failed to log exchange to the database", e);
        }
    }

    public void setLogger(Logger logger) {
        this.logger = logger;
    }
}
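
The try/catch in the processor is what keeps a database failure from affecting message flow: the error is recorded and processing carries on. Here is the same idea in isolation, as a small plain-Java sketch. The names SafeStep and neverFail are hypothetical, and the sketch catches Exception rather than Throwable, a deliberate narrowing for the example.

```java
import java.util.List;
import java.util.function.Consumer;

public class SafeStep {

    /**
     * Wraps a step so that a failure is recorded in 'failures'
     * instead of being thrown to the caller.
     */
    public static <T> Consumer<T> neverFail(Consumer<T> step, List<Exception> failures) {
        return item -> {
            try {
                step.accept(item);
            } catch (Exception e) {
                // Record and carry on, like LOGGER.error(...) in the processor
                failures.add(e);
            }
        };
    }
}
```

A caller that feeds several items through the wrapped step gets every item attempted, with the failed ones collected in the list rather than interrupting the rest.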

The rest is pure Spring stuff:

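package info.sargis.dbloggger.logger;

import javax.sql.DataSource;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
import org.springframework.jdbc.support.lob.LobHandler;
import org.springframework.stereotype.Repository;

@Repository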
public class DBLogger implements Logger {

    private static final Log LOGGER = LogFactory.getLog(DBLogger.class);

    private JdbcTemplate jdbcTemplate;

    private LobHandler lobHandler;

    @Override
    public void logExchange(Exchange exchange) {
        Message message = exchange.getIn();
        jdbcTemplate.execute(
                "INSERT INTO DB_LOG (MESSAGE_ID, MESSAGE_DATE, MESSAGE_PAYLOAD) VALUES (?, ?, ?)",
                new LobCreatingPreparedStatementCallback(lobHandler, message)
        );
    }

    @Autowired
    private void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Autowired
    public void setLobHandler(LobHandler lobHandler) {
        this.lobHandler = lobHandler;
    }

    private static class LobCreatingPreparedStatementCallback extends AbstractLobCreatingPreparedStatementCallback {
.....................................
    }
}
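
The body of the callback is elided above, so the following is not the original code. It is only a sketch of what binding the three INSERT parameters can look like, written against plain JDBC instead of Spring's LobCreator so that it compiles with the JDK alone. The class and method names are hypothetical, and which value goes into each column (a message id, a timestamp, the payload as text) is an assumption based on the column names.

```java
import java.io.StringReader;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class DbLogBinder {

    /**
     * Binds the three parameters of
     * INSERT INTO DB_LOG (MESSAGE_ID, MESSAGE_DATE, MESSAGE_PAYLOAD) VALUES (?, ?, ?).
     * Assumption: the payload is stored as character data.
     */
    public static void bind(PreparedStatement ps, String messageId,
                            long timestampMillis, String payload) throws SQLException {
        ps.setString(1, messageId);                          // MESSAGE_ID
        ps.setTimestamp(2, new Timestamp(timestampMillis));  // MESSAGE_DATE
        // MESSAGE_PAYLOAD: streamed, so large bodies are not bound as one string literal
        ps.setCharacterStream(3, new StringReader(payload), payload.length());
    }
}
```

In the Spring version the same three values would be set inside the callback, with the payload going through the LobCreator that the callback receives.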

That's all. Please note that none of this message processing is transactional; for transactions, check the Apache Camel documentation. You can find the full project here: source
