Monday, November 16, 2015

Using Qpid JMS with Mule

We were using Qpid client 0.32 to consume messages from Azure AMQP queues:

  <spring:beans>
    <util:map id="providerProperties" map-class="java.util.Hashtable">
      <spring:entry key="connectionfactory.SBCF"
        value="amqps://POLICY_NAME:SHARED_ACCESS_KEY@NAMESPACE.servicebus.windows.net" />
    </util:map>
  </spring:beans>

  <jms:connector name="amqpJmsBridge" specification="1.1" validateConnections="false"
      numberOfConsumers="2" maxRedelivery="-1" doc:name="AQMP JMS Bridge">
    <spring:property name="connectionFactoryJndiName" value="SBCF" />
    <reconnect-forever/>
    <jms:custom-jndi-name-resolver class="org.mule.transport.jms.jndi.SimpleJndiNameResolver">
      <spring:property name="jndiInitialFactory"
        value="org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory" />
      <spring:property name="jndiProviderProperties" ref="providerProperties" />
    </jms:custom-jndi-name-resolver>
  </jms:connector>

Occasionally the message consumer would hang. A thread dump showed a deadlock:

Found one Java-level deadlock:
=============================
"Thread-74163":
  waiting to lock monitor 0x00007f1e58011e18 (object 0x00000000dc109838, a org.mule.transport.jms.JmsConnector),
  which is held by "Thread-74162"
"Thread-74162":
  waiting to lock monitor 0x00007f1e54004c18 (object 0x00000000dc4810c8, a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint),
  which is held by "Thread-61"
"Thread-61":
  waiting to lock monitor 0x00007f1e58011e18 (object 0x00000000dc109838, a org.mule.transport.jms.JmsConnector),
  which is held by "Thread-74162"

We decided to migrate to the amqp-1.0 branch of Qpid
(qpid-client 0.32 is an amqp-0.X branch of Qpid)

We hit two exceptions in the process of migrating:

(1) javax.jms.JMSException: Idle timeout value specified in connection OPEN ('30000 ms') is not supported. Minimum idle timeout is '60000' ms. ... [condition = amqp:internal-error]

(2) javax.jms.JMSSecurityException: Unauthorized access. 'Listen' claim(s) are required to perform this operation. ... [condition = amqp:unauthorized-access]

The following configuration resolved these exceptions:

  <spring:beans>
    <util:map id="providerProperties" map-class="java.util.Hashtable">
      <spring:entry key="connectionfactory.SBCF"
        value="amqps://NAMESPACE.servicebus.windows.net?amqp.idleTimeout=120000" />
      <spring:entry key="property.connectionfactory.SBCF.username" value="POLICY_NAME" />
      <spring:entry key="property.connectionfactory.SBCF.password" value="SHARED_ACCESS_KEY" />
    </util:map>
  </spring:beans>

  <jms:connector name="amqpJmsBridge" specification="1.1" validateConnections="false"
      numberOfConsumers="2" maxRedelivery="-1" doc:name="AQMP JMS Bridge">
    <spring:property name="connectionFactoryJndiName" value="SBCF" />
    <reconnect-forever/>
    <jms:custom-jndi-name-resolver class="org.mule.transport.jms.jndi.SimpleJndiNameResolver">
      <spring:property name="jndiInitialFactory" value="org.apache.qpid.jms.jndi.JmsInitialContextFactory" />
      <spring:property name="jndiProviderProperties" ref="providerProperties" />
    </jms:custom-jndi-name-resolver>
  </jms:connector>

[(1) was resolved by passing parameter amqp.idleTimeout. Note that the actual timeout seems to be this divided by 2. See https://qpid.apache.org/releases/qpid-jms-0.6.0/docs/index.html]
[(2) was resolved by explicitly setting the credentials on the connection factory using 'property.connectionfactory.' prefix, rather than passing the credentials in the uri. See method getConnectionFactoryProperties of https://github.com/apache/qpid-jms/blob/0.6.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JmsInitialContextFactory.java]