Spring JMS Tutorial with ActiveMQ

In this post I’ll look at Spring’s messaging support and how it can be used to integrate with Message Oriented Middleware (MOM) offerings such as Apache ActiveMQ. Although the sample application in this post will use ActiveMQ as its message broker, the application itself is vendor agnostic and can integrate with any JMS compliant messaging platform. I’ve kept the application as loosely coupled from ActiveMQ as possible and will highlight the bits that would need to change if you were to choose another messaging platform such as IBM MQSeries.

Tech Stack

The sample code in this post will be built and deployed as a simple web application. This doesn’t have to be the case, but I’ve chosen to build a web app because these types of enterprise integration components tend to be deployed as web applications in the real world. The sample app will be built using the following stack.
  • Apache ActiveMQ – JMS Message Broker (not part of the actual application but used to test our application’s JMS functionality)
  • Spring 3.1
  • Maven
  • Tomcat

Setting up ActiveMQ

If you don’t already have ActiveMQ you’ll need to download it from http://activemq.apache.org/download.html. The latest version at the time of writing is 5.6.0. To set up ActiveMQ follow the steps below.

  1. Copy the downloaded zip to C:\Program Files and unzip.
  2. Open a command window and cd to C:\Program Files\apache-activemq-5.6.0-bin\apache-activemq-5.6.0\bin.
  3. Start ActiveMQ by calling activemq.bat

ActiveMQ should start up as shown in figure 1.0 below.

Figure 1.0 ActiveMQ Startup
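
If you’d rather confirm the startup from code than from the console output, the following is a minimal sketch that simply checks whether something is accepting TCP connections on the two ports used in this post: 61616 (ActiveMQ’s default broker port) and 8161 (the admin console). The class and method names are my own and are not part of the sample application, and an open port only tells you that something is listening, not that it is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of the sample application. */
final class BrokerPortCheck
{
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis)
    {
        try (Socket socket = new Socket())
        {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        }
        catch (IOException connectFailed)
        {
            // Connection refused, timed out or host not reachable
            return false;
        }
    }

    public static void main(String[] args)
    {
        // 61616 is the default broker port, 8161 serves the admin console
        System.out.println("broker 61616 listening: " + isListening("localhost", 61616, 1000));
        System.out.println("console 8161 listening: " + isListening("localhost", 8161, 1000));
    }
}
```

If either check reports false, the broker window from step 3 is the first place to look for errors.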

Now that ActiveMQ has started we can open the admin console by navigating to http://localhost:8161/admin/index.jsp. On the home page we’ll see some information about the broker and some administration menu options.

Figure 2.0 ActiveMQ Admin Console

Next we need to create 2 new Queues, one that our sample application will consume messages from and another Queue that we’ll write messages to. Click on the Queues link and create a new Queue by entering a Queue name and clicking submit. For our sample application we’ll create 2 Queues, TestQueueOne and TestQueueTwo as shown in figure 3.0 below.

Figure 3.0 Create New Queues

Now that we have our message broker set up, let’s start building an application to use it.

Creating the Project

We’ll start off by creating a simple Spring web project like the one shown in figure 4.0 below.

Figure 4.0 Project Structure

Spring Configuration

We’ll start by defining the Spring configuration for our application. I’ve commented the configuration below to help explain the main components. The most important parts are the DefaultMessageListenerContainer (used to consume messages) and the JmsTemplate (used to put messages onto a queue).
<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
               xmlns:context="http://www.springframework.org/schema/context"  
               xmlns:jee="http://www.springframework.org/schema/jee"  
               xsi:schemaLocation="http://www.springframework.org/schema/beans
                                     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                                     http://www.springframework.org/schema/context
                                     http://www.springframework.org/schema/context/spring-context-3.0.xsd
                                     http://www.springframework.org/schema/jee
                                     http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
   
   
      <!-- Use Spring's JNDI support to look up JMS Connection Factory and Queue definitions from the
           container. This means that specific connection details are not embedded in the application
       -->
      <jee:jndi-lookup id="mqConnectionFactory" jndi-name="java:comp/env/jms/mqConnectionFactory" />  
      <jee:jndi-lookup id="testQueueOne" jndi-name="java:comp/env/jms/testQueueOne" />  
      <jee:jndi-lookup id="testQueueTwo" jndi-name="java:comp/env/jms/testQueueTwo" />  
   
      <!-- Our message listener implementation that implements the JMS MessageListener interface and implements the  
            onMessage method to process incoming messages  
       -->  
      <bean id="testMessageListener" class="com.blog.spring.jms.TestMessageListener">  
         <property name="testMessageSender" ref ="testMessageSender" />  
      </bean>  
   
      <!-- DefaultMessageListenerContainer is the Spring equivalent to an EJB Message Driven Bean.
         It polls and consumes messages from a JMS queue. The configuration below is as follows

         1. connectionFactory - the connection factory definition used to connect to the Message Broker
            which in our case is ActiveMQ
         2. destination - the Queue which the MessageListener container is listening on for incoming messages
         3. messageListener - the implementation class that will actually handle the incoming messages. The
            DefaultMessageListenerContainer takes messages from the queue and passes them to the message
            listener for processing. We've defined our message listener above (testMessageListener)
         4. concurrentConsumers - this is the number of threads that the DefaultMessageListenerContainer will
            spawn to handle incoming messages. The default is 1 but in our application we'll have 2 separate
            threads processing incoming messages.
       -->
      <bean id="poiMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
         <property name="connectionFactory" ref ="mqConnectionFactory" />  
         <property name="destination" ref ="testQueueOne"/>  
         <property name="messageListener" ref ="testMessageListener"/>  
         <property name="concurrentConsumers" value="2" />  
      </bean>  
   
      <!-- MessageSender is a simple POJO that we supply with a JMSTemplate and  
           the Queue that we want to send messages to  
       -->  
      <bean id="testMessageSender" class="com.blog.spring.jms.TestMessageSender">  
         <property name="jmsTemplate" ref="jmsTemplate"/>  
         <property name="testQueue" ref="testQueueTwo"/>  
      </bean>  
   
      <!-- JmsTemplate is a Spring template that allows us to communicate with
           a message broker via JMS. JmsTemplate takes care of boilerplate code such as exception handling
           and resource management (obtaining and releasing connections and sessions). This allows us to
           concentrate on solving the 'business' problem. We supply the JmsTemplate with the connection
           factory mentioned above
       -->
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
         <property name="connectionFactory" ref="mqConnectionFactory" />  
      </bean>  
   
 </beans>  
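
To make the concurrentConsumers setting a little more concrete, here is a plain-JDK analogy of two consumer threads draining one queue and forwarding everything to a second queue. This is only a sketch: the in-memory BlockingQueue instances stand in for TestQueueOne and TestQueueTwo, the class name and the 5 second timeout are my own choices, and it is not how Spring implements the container internally.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical analogy only, no JMS or Spring involved. */
final class ConcurrentConsumersSketch
{
    /**
     * Starts the given number of consumer threads, each polling 'in' and
     * forwarding what it receives to 'out'. Returns true once 'expected'
     * messages have been forwarded, or false after a 5 second timeout.
     */
    static boolean forward(final BlockingQueue<String> in, final BlockingQueue<String> out,
                           int consumers, int expected)
    {
        final CountDownLatch forwarded = new CountDownLatch(expected);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++)
        {
            pool.execute(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        while (!Thread.currentThread().isInterrupted())
                        {
                            // Wait briefly for the next message, like a polling consumer
                            String message = in.poll(100, TimeUnit.MILLISECONDS);
                            if (message != null)
                            {
                                out.put(message);
                                forwarded.countDown();
                            }
                        }
                    }
                    catch (InterruptedException stop)
                    {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        try
        {
            return forwarded.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException interrupted)
        {
            Thread.currentThread().interrupt();
            return false;
        }
        finally
        {
            // Interrupts the consumer threads so they stop polling
            pool.shutdownNow();
        }
    }

    public static void main(String[] args)
    {
        BlockingQueue<String> queueOne = new LinkedBlockingQueue<String>();
        BlockingQueue<String> queueTwo = new LinkedBlockingQueue<String>();
        for (int i = 0; i < 100; i++)
        {
            queueOne.add("Test Message!!");
        }
        boolean done = forward(queueOne, queueTwo, 2, 100);
        System.out.println("all forwarded: " + done + ", queueTwo size: " + queueTwo.size());
    }
}
```

With two consumers the order in which messages arrive on the second queue is not guaranteed, which is worth bearing in mind before raising concurrentConsumers for messages that must be processed in sequence.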

JMS Resource Configuration

Next we’ll configure the ActiveMQ Connection Factory and Message Queues which we referenced in the three jee:jndi-lookup elements above. These resources are defined in context.xml, a configuration file that Tomcat uses to look up runtime resources. This approach allows us to define connection details on the Servlet Container and outside of the actual application, which ensures that our application is not tightly coupled to the JMS implementation. The resource definitions in context.xml are the only place where we reference ActiveMQ specifics. The application itself does not contain any reference to ActiveMQ, which means that we could easily switch to another JMS broker without changing any code in our application. To integrate with a different JMS broker all we’d need to do is update context.xml to look up Connection Factory and Queue definitions specific to our new JMS implementation, IBM MQSeries for example. Our context.xml is defined below.

<?xml version="1.0" encoding="UTF-8"?>  
<Context>  
  
  <!--
        The ActiveMQ Connection Factory creates connections
        to the ActiveMQ broker. Tomcat will connect with the
        broker using a TCP connection on port 61616 - this is the
        default port for ActiveMQ
  -->
   <Resource name="jms/mqConnectionFactory"  
             auth="Container"  
             type="org.apache.activemq.ActiveMQConnectionFactory"  
             description="JMS Connection Factory"  
             factory="org.apache.activemq.jndi.JNDIReferenceFactory"  
             brokerURL="tcp://localhost:61616" />  
   
   <!--  
         This is a reference to the first Queue we defined  
         earlier in the ActiveMQ admin console  
   -->  
   <Resource name="jms/testQueueOne"  
             auth="Container"  
             type="org.apache.activemq.command.ActiveMQQueue"  
             factory="org.apache.activemq.jndi.JNDIReferenceFactory"  
             physicalName="TestQueueOne"/>  
   
   <!--  
         This is a reference to the second Queue we defined  
         earlier in the ActiveMQ admin console  
   -->  
   <Resource name="jms/testQueueTwo"  
             auth="Container"  
             type="org.apache.activemq.command.ActiveMQQueue"  
             factory="org.apache.activemq.jndi.JNDIReferenceFactory"  
             physicalName="TestQueueTwo"/>  
   
 </Context>  
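
It may help to see how the Resource names above line up with the jndi-name values in the Spring configuration. The sketch below does by hand roughly what the jee:jndi-lookup elements do for us: it prefixes each Resource name with java:comp/env/ and asks JNDI for the object. The class and method names are my own. Inside Tomcat I’d expect the lookups to return the ActiveMQ objects defined above, whereas run as a standalone program there is normally no naming provider, so the lookups are reported as not bound.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** Hypothetical helper, not part of the sample application. */
final class JndiLookupSketch
{
    /** Prefix of the per-application environment namespace. */
    static final String ENV_PREFIX = "java:comp/env/";

    /** Maps a Resource name from context.xml to the JNDI name used in the Spring config. */
    static String jndiName(String resourceName)
    {
        return ENV_PREFIX + resourceName;
    }

    /** Looks the resource up, returning null when JNDI cannot resolve the name. */
    static Object lookupOrNull(String resourceName)
    {
        try
        {
            return new InitialContext().lookup(jndiName(resourceName));
        }
        catch (NamingException notResolved)
        {
            return null;
        }
    }

    public static void main(String[] args)
    {
        String[] resourceNames = {"jms/mqConnectionFactory", "jms/testQueueOne", "jms/testQueueTwo"};
        for (String name : resourceNames)
        {
            Object resource = lookupOrNull(name);
            System.out.println(jndiName(name) + " -> "
                + (resource == null ? "not bound" : resource.getClass().getName()));
        }
    }
}
```

Because the application only ever sees these JNDI names, swapping brokers comes down to changing what is bound behind them.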

The final part of our application configuration is web.xml. Here we simply define the context loader listener and point it at the Spring configuration file we defined earlier.

<?xml version="1.0" encoding="UTF-8"?>  
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
           xmlns="http://java.sun.com/xml/ns/javaee"  
           xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"  
           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee  
                               http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"  
           id="WebApp_ID"  
           version="2.5">  
  
      <!--  
           Main configuration file for this Spring web application.  
      -->  
      <context-param>  
           <param-name>contextConfigLocation</param-name>  
           <param-value>  
                /WEB-INF/config/spring-config.xml  
           </param-value>  
      </context-param>  
   
      <!--  
           Loads the Spring web application context using the config file defined above.  
      -->  
      <listener>  
           <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>  
      </listener>  
   
 </web-app>  

Message Listener

Now we’re ready to start writing some code. You may be glad to know that you don’t have to write very much code to consume messages from a JMS broker. In fact all we need is a class that implements the MessageListener interface – we implement the onMessage method, which is then invoked by the DefaultMessageListenerContainer (defined earlier) when a message is read from the queue. Our implementation of onMessage is very straightforward – we simply log the incoming message and then pass it to the message sender (defined later). The Message Listener class is defined below.

package com.blog.spring.jms;  
  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.TextMessage;  
import org.apache.log4j.Logger;  
  
 /**  
  * Class handles incoming messages  
  *  
  * @see TestMessageSender
  */  
 public class TestMessageListener implements MessageListener  
 {  
   
      private TestMessageSender messageSender_i;  
      private static final Logger logger_c = Logger.getLogger(TestMessageListener.class);
   
      /**  
       * Method implements JMS onMessage and acts as the entry  
       * point for messages consumed by Spring's DefaultMessageListenerContainer.
       * When DefaultMessageListenerContainer picks a message from the queue it  
       * invokes this method with the message payload.  
       */  
      public void onMessage(Message message)  
      {  
           logger_c.debug("Received message from queue [" + message +"]");  
   
           /* The message must be of type TextMessage */  
           if (message instanceof TextMessage)  
           {  
                try  
                {  
                     String msgText = ((TextMessage) message).getText();  
                     logger_c.debug("About to process message: " + msgText);  
   
                     /* call message sender to put message onto second queue */  
                     messageSender_i.sendMessage(msgText);  
   
                }  
                catch (JMSException jmsEx_p)  
                {  
                     String errMsg = "An error occurred extracting message";  
                     logger_c.error(errMsg, jmsEx_p);  
                }  
           }  
           else  
           {  
                String errMsg = "Message is not of expected type TextMessage";  
                logger_c.error(errMsg);  
                throw new RuntimeException(errMsg);  
           }  
      }  
   
      /**  
       * Sets the message sender.  
       *  
       * @param messageSender_p the new message sender  
       */  
      public void setTestMessageSender(TestMessageSender messageSender_p)  
      {  
           this.messageSender_i = messageSender_p;  
      }  
 }  
   

Message Sender

Now that our application can consume messages, let’s take a look at how we push messages onto a queue. We defined a JmsTemplate in the Spring configuration earlier – this class provides a convenient way of pushing messages onto a queue and saves us from having to write boilerplate code for opening and closing connections, handling JMS exceptions etc. This allows us to simply specify a queue and the message payload that we want to push onto that queue.

package com.blog.spring.jms;  
  
import javax.jms.JMSException;  
import javax.jms.Queue;  
import org.apache.log4j.Logger;  
import org.springframework.jms.core.JmsTemplate;  
import org.springframework.stereotype.Service;  
  
/**  
  * The TestMessageSender class uses the injected JMSTemplate to send a message  
  * to a specified Queue. In our case we're sending messages to 'TestQueueTwo'  
  */  
 @Service  
 public class TestMessageSender  
 {  
      private JmsTemplate jmsTemplate_i;  
      private Queue testQueue_i;  
      private static final Logger logger_c = Logger.getLogger(TestMessageSender.class);
   
      /**  
       * Sends message using JMS Template.  
       *  
       * @param message_p the message text to send
       * @throws JMSException the JMS exception
       */  
      public void sendMessage(String message_p) throws JMSException  
      {  
           logger_c.debug("About to put message on queue. Queue[" + testQueue_i.toString() + "] Message[" + message_p + "]");  
           jmsTemplate_i.convertAndSend(testQueue_i, message_p);  
      }  
   
      /**  
       * Sets the jms template.  
       *  
       * @param tmpl the jms template
       */  
      public void setJmsTemplate(JmsTemplate tmpl)  
      {  
           this.jmsTemplate_i = tmpl;  
      }  
   
      /**  
       * Sets the test queue.  
       *  
       * @param queue the new test queue  
       */  
      public void setTestQueue(Queue queue)  
      {  
           this.testQueue_i = queue;  
      }  
 }  

Testing the Application

Now that the application is complete, let’s test it. Using the ActiveMQ admin console we’ll put a message onto TestQueueOne. Conveniently you can tell the broker to submit the message as many times as you like. We’ll ask the broker to submit our message 100 times so that we have time to see it get picked up and processed in the Eclipse console. When we put a message onto TestQueueOne we should see the following:

  1. DefaultMessageListenerContainer will pick up the next message from TestQueueOne and invoke the onMessage method in our Message Listener class with the message payload.
  2. onMessage will log the message payload and then call sendMessage on the TestMessageSender with the message we’ve just received.
  3. TestMessageSender will use the JmsTemplate to put the message onto TestQueueTwo.

To test the application follow the steps below.

  • In the ActiveMQ admin console go to the TestQueueOne definition and click the SendTo link. Add a message and set the ‘Number of Messages to Send’ to 100. This will ensure that the broker puts the message onto the queue 100 times.

Figure 5.0 Send JMS Message

  • Click Send. 100 messages will be put onto the queue and you should see the application log the consumption of each message as shown in the log extract below.
DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 96, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:92, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@855da06, marshalledProperties = org.apache.activemq.util.ByteSequence@3c5cc430, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:29,947] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 97, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:93, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6e544a45, marshalledProperties = org.apache.activemq.util.ByteSequence@5fd83099, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:29,947] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:29,948] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 98, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:94, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6a5ebdf7, marshalledProperties = org.apache.activemq.util.ByteSequence@7209d9af, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 99, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:95, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725511, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@64b2aaa6, marshalledProperties = org.apache.activemq.util.ByteSequence@de1abf0, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:30,052] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 100, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:96, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725484, arrival = 0, brokerInTime = 1347832725511, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5adf20ae, marshalledProperties = org.apache.activemq.util.ByteSequence@6edaae1d, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:30,053] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:30,053] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]
  • Now take a look at the number of messages on TestQueueTwo – it should now have 100 pending messages. These are the 100 messages that were consumed from TestQueueOne and pushed onto TestQueueTwo by our application. See screenshot below.

Figure 6.0 Messages pushed to TestQueueTwo

Wrapping Up

The sample code in this post should be enough to familiarise you with the fundamentals of JMS messaging with Spring. Although the code and sample configurations are very simple, they should act as a basic guide and get you moving in the right direction. If you’re interested in learning more I’d recommend you look at the Spring documentation so that you can read up on some of the more advanced messaging features available. If you want to run this tutorial locally you can grab the full source code here: https://docs.google.com/folder/d/0B_SZOyniHfc1YXE0M3BER242X28/edit Enjoy!