Wednesday, July 1, 2015

ActiveMQ does not store more than 1 message when the message size is big

When performing some tests on a new proxy service, I observed that in some cases a message store was not accepting more than one message at a time.

Here's a brief description of what I was trying to do and what I observed:

I have a proxy service that takes messages from a queue, performs an XSLT transformation on each one, and then puts it in a message store. A message processor picks up the message from the store and calls a remote web service. Once the web service returns, I enrich the main message with the result and send it to its final destination.
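
A minimal sketch of the inbound half of that flow might look like the following. The proxy name InputQueueProxy and the registry key transform.xslt are hypothetical placeholders; the store name MyTest matches the sample config further down.

```xml
<!-- Sketch only: proxy name and XSLT key are assumed placeholders -->
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="InputQueueProxy"
       transports="jms"
       startOnLoad="true">
   <target>
      <inSequence>
         <!-- One-way flow: no response goes back to the JMS sender -->
         <property name="OUT_ONLY" value="true"/>
         <!-- Transform the payload with a stylesheet looked up by key -->
         <xslt key="transform.xslt"/>
         <!-- Hand the transformed message to the message store -->
         <store messageStore="MyTest"/>
      </inSequence>
   </target>
</proxy>
```

With the jms transport, a proxy listens on a queue named after the service unless transport.jms.Destination is set as a proxy parameter.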




When messages reached a certain size (about 4MB), the web service call took about 40 seconds to return. While the message processor was waiting for the response, nothing else was getting into the message store (see the example below).

Example: I have 3 messages in the input queue that need to be processed (a proxy service listens on this queue). Here's what happens:

Step 1: The proxy picks up Message 1, translates it and puts it in the message store to be processed by the remote web service call.
Step 2: The message processor picks up Message 1 from the store and sends it to the remote web service for processing. At the same time, the proxy picks up Message 2, translates it and tries to put it in the message store.
Step 2.5: The message processor is still waiting for the remote service to return. Message 1 seems to be still in the store, Message 2 is in the ESB memory, and Message 3 is not being picked up by the proxy because the proxy still has Message 2.
Step 3: The remote call returns. The message processor hands control of Message 1 to the next step on the ESB (enrich message with response and send to final destination). Message 1 is removed from the message store. Message 2 finally gets into the message store, and Message 3 is picked up by the proxy.
Step 4: Repeat until no more messages exist in the input queue.


In theory, the message store acts like a queue, and the message processor consumes messages from it. So Message 2 should have been placed in the message store and waited there while Message 1 was being processed. But for some reason it wasn't.

Here's the sample synapse-config I used, with WSO2 ESB 4.8.1 and ActiveMQ 5.4.3 as the queue.

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <proxy name="FrontQueue"
          transports="https http jms"
          startOnLoad="true"
          trace="disable">
      <description/>
      <target>
         <inSequence>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            <property name="OUT_ONLY" value="true"/>
            <store messageStore="MyTest"/>
         </inSequence>
      </target>
   </proxy>
   <proxy name="FrontProxy" transports="http" startOnLoad="true">
      <target>
         <endpoint>
            <address uri="jms:/FrontQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616"/>
         </endpoint>
         <inSequence>
            <property name="OUT_ONLY" value="true"/>
         </inSequence>
         <outSequence>
            <send/>
         </outSequence>
      </target>
   </proxy>
   <endpoint name="SimpleStockQuoteService">
      <address uri="http://127.0.0.1:9000/services/SimpleStockQuoteService"/>
   </endpoint>
   <messageStore class="org.apache.synapse.message.store.impl.jms.JmsStore"
                 name="MyTest">
      <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
      <parameter name="store.jms.password">admin</parameter>
      <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
      <parameter name="store.jms.username">admin</parameter>
      <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
   </messageStore>
   <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                     name="Processor1"
                     targetEndpoint="SimpleStockQuoteService"
                     messageStore="MyTest">
      <parameter name="client.retry.interval">1000</parameter>
      <parameter name="interval">1000</parameter>
      <parameter name="is.active">true</parameter>
   </messageProcessor>
</definitions> 


My only guess was message size, because I could reproduce this only with big messages (around 4MB).

1. When I debugged the MessageStoreMediator, messages were passed to the queue without waiting, but ActiveMQ showed only 1 message in the queue at any given time. (Once the first message finished processing, the second message arrived in the queue.)

2. When I set up the same scenario with WSO2 MB 2.0.0, all messages, small and big, were placed in the queue correctly regardless of size.

3. When I sent big messages at different frequencies, I could reproduce the same behavior with a plain JMS client too.
Here are the results when I sent messages of different sizes with different delays via the JMS client:

1MB 10 messages
with 5000ms delay - all 10 messages queued
with 10000ms delay - all 10 messages queued
with 30000ms delay - all 10 messages queued


2MB 10 messages
with 5000ms delay - only 6 messages queued
with 10000ms delay - only 3 messages queued
with 30000ms delay - only 1 message queued


4MB 10 messages
with 5000ms delay - only 6 messages queued
with 10000ms delay - only 3 messages queued
with 30000ms delay - only 1 message queued


So it seems there's no issue with the WSO2 ESB message store; this is a limitation of ActiveMQ, which admits messages to the queue one at a time when the messages are big.

Investigating further, I noticed the following message in the ActiveMQ log at the time a message was not placed in the queue.

2014-12-08 09:25:05,922 | INFO | Usage Manager Memory Limit (1048576) reached on queue://SAMPLEQUEUE4000. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///127.0.0.1:37613

"Producer Flow Control occurs when you hit one of the size limits (memory, disk, etc.) for either the entire broker or a single destination. Once you hit it, producers on that destination (or the whole broker, depending on which limit you hit) are unable to send more messages until enough space frees up to hold the next one. So your one-out-one-in mental model is the right one (though if the messages are of different sizes, then it might not be truly one-for-one). This will continue for as long as the limit continues to be hit because producers are faster than consumers; it's not time-based, and it's not forever, just until consumers start catching up and you're not running into any limits." 

So when I either set producerFlowControl to false or raised memoryLimit to a higher value, I could send all the big messages to the queue without waiting for the previous messages to finish processing.


activemq.xml
-------------------

<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">   
  <pendingQueuePolicy>
    <vmQueueCursor/>
  </pendingQueuePolicy>
</policyEntry>
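
The entry above carries the 1mb per-destination limit that matches the 1048576 bytes reported in the log. As a sketch of the two changes described earlier, the policy entry could be adjusted in one of the following ways. The 64mb figure is only an assumed example value, not a tested recommendation; size it to your messages and available heap.

```xml
<!-- Option A (assumed example value): keep flow control, raise the per-queue limit -->
<policyEntry queue=">" producerFlowControl="true" memoryLimit="64mb">
  <pendingQueuePolicy>
    <vmQueueCursor/>
  </pendingQueuePolicy>
</policyEntry>

<!-- Option B: keep the 1mb limit, but stop throttling producers.
     Use only one of the two entries, since both match every queue. -->
<policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb">
  <pendingQueuePolicy>
    <vmQueueCursor/>
  </pendingQueuePolicy>
</policyEntry>
```

Two things are worth checking before relying on either option. A per-destination memoryLimit is still bounded by the broker-wide memoryUsage limit in the systemUsage section of activemq.xml, so that value may need raising as well. Also, with vmQueueCursor pending messages are held in memory, so turning flow control off trades throttling for higher broker memory use.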


WSO2 ESB http://wso2.com/products/enterprise-service-bus/
WSO2 MB http://wso2.com/products/message-broker/