RabbitMQ Inbound

Introduction

AMQP is a wire-level messaging protocol that describes the format of the data that is sent across the network. If a system or application can read and write AMQP, it can exchange messages with any other system or application that understands AMQP regardless of the implementation language.

Syntax

<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="RabbitMQConsumer" sequence="amqpSeq" onError="amqpErrorSeq" protocol="rabbitmq" suspend="false">
   <parameters>
      <parameter name="sequential">true</parameter>
      <parameter name="coordination">true</parameter>
      <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
      <parameter name="rabbitmq.server.host.name">localhost</parameter>
      <parameter name="rabbitmq.server.port">5672</parameter>
      <parameter name="rabbitmq.server.user.name">guest</parameter>
      <parameter name="rabbitmq.server.password">guest</parameter>
      <parameter name="rabbitmq.queue.name">queue</parameter>
      <parameter name="rabbitmq.exchange.name">exchange</parameter>
      <parameter name="rabbitmq.connection.ssl.enabled">false</parameter>
   </parameters>
</inboundEndpoint>

Properties

Listed below are the properties used for creating a RabbitMQ inbound endpoint.

Required Properties

The following properties are required when creating a RabbitMQ inbound endpoint.

Property Description
sequential The behavior when executing the given sequence.
When set as true , mediation will happen within the same thread. When set as false , the mediation engine will use the inbound thread pool. The default thread pool values can be found in the MI_HOME/conf/deployment.toml file, under the `[mediation]` section. The default setting is true.
rabbitmq.connection.factory Name of the connection factory.
rabbitmq.server.host.name Host name of the server.
rabbitmq.server.port The port number of the server.
rabbitmq.server.user.name The user name to connect to the server.
rabbitmq.server.password The password to connect to the server.
rabbitmq.queue.name The queue name to send or consume messages.
coordination This parameter is only applicable in a clustered environment.
In a cluster environment an inbound endpoint will only be executed in worker nodes. If this parameter is set to true in a clustered environment, the inbound will only be executed in a single worker node. Once the running worker node is down, the inbound will start on another available worker node in the cluster. By default, this setting is true.

Optional Properties

The following optional properties can be configured when creating an RabbitMQ inbound endpoint. Note that the optional properties related to defining a queue should contain the rabbitmq.queue.optional. prefix, and the optional properties related to defining an exchange should contain the rabbitmq.exchange.optional. prefix.

Tip

Note that keystore information is not required for an SSL connection if the fail_if_no_peer_cert parameter is set to 'false' in the RabbitMQ broker. You only need to enable SSL in the Micro Integrator (using the rabbitmq.connection.ssl.enabled parameter).

However, if the fail_if_no_peer_cert parameter is set to 'true' in RabbitMQ, the keystore configurations (given below) are also required for the Micro Integrator.

Shown below is an example of the config file where fail_if_no_peer_cert is set to false:

ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile   = /path/to/server_certificate.pem
ssl_options.keyfile    = /path/to/server_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = false

Property Name Description
rabbitmq.server.virtual.host The virtual host name of the server (if available).
rabbitmq.connection.ssl.enabled Whether to use TCP connection or SSL connection.
rabbitmq.connection.ssl.keystore.location The keystore location.
rabbitmq.connection.ssl.keystore.type The keystore type.
rabbitmq.connection.ssl.keystore.password The keystore password.
rabbitmq.connection.ssl.truststore.location The truststore location.
rabbitmq.connection.ssl.truststore.type The truststore type.
rabbitmq.connection.ssl.truststore.password The truststore password.
rabbitmq.connection.ssl.version The SSL version.
rabbitmq.channel.consumer.qos The prefetch message count. This many messaged will be prefetched before the application sees it. If a value is not set, 0 will be used as the default value.
rabbitmq.consumer.tag RabbitMQ consumer identifier.
rabbitmq.connection.factory.network.recovery.interval Interval at which the server will retry connecting to the RabbitMQ server in the case of a failure in the established connection.
rabbitmq.factory.connection.timeout Timeout for the connection initialization.
rabbitmq.queue.durable Whether the queue should remain declared even if the broker restarts.
rabbitmq.queue.exclusive Whether the queue should be exclusive or should be consumable by other connections.
rabbitmq.queue.auto.delete Whether to keep the queue even if it is not being consumed anymore.
rabbitmq.queue.auto.ack Whether to send back an acknowledgment.
rabbitmq.exchange.name The name of the RabbitMQ exchange to which the queue is bound.
rabbitmq.queue.routing.key The routing key of the queue.
rabbitmq.queue.autodeclare Whether or not to declare the queue. If set to true, the Micro Integrator creates queues if they are not already present. If set to false, the Micro Integrator will assume that a queue is already available. However, you should set this parameter to true only if queues are not already declared in the RabbitMQ server. Setting this parameter to false in the publish URL improves RabbitMQ transport performance.
rabbitmq.exchange.type The type of the exchange.
rabbitmq.exchange.durable Whether the exchange should remain declared even if the broker restarts.
rabbitmq.exchange.auto.delete Whether to keep the queue even if it is not used anymore.
rabbitmq.exchange.autodeclare Whether or not to declare the exchange. If set to true, the Micro Integrator creates exchanges. If set to false, the Micro Integrator will assume that an exchange is already available. However, you should set this parameter to true only if exchanges are not already declared in the RabbitMQ server. Setting this parameter to false in the publish URL improves RabbitMQ transport performance.
rabbitmq.factory.heartbeat

The period of time after which the connection should be considered dead.

rabbitmq.message.max.dead.lettered.count The maximum number of attempts a message is allowed to be dead lettered. Once the count exceeds, the message will be discarded or published to the given 'rabbitmq.message.error.queue.routing.key'.
rabbitmq.message.requeue.delay Delay for the message to be requeued in the case of immediate requeing.
rabbitmq.message.error.queue.routing.key The routing key to publish the message after the 'max.dead.lettered' count has been reached.
rabbitmq.message.error.exchange.name The exchange to which messages are published after the `max.dead.lettered` count has been reached.

Note

The property rabbitmq.exchange.name becomes mandatory if you are trying to connect to a new queue, so that it can be bound to an exchange for messages to flow.

Connection Recovery Properties

In case of a network failure or broker shutdown, the Micro Integrator can try to recreate the connection.

If you want to enable connection recovery, you should configure the following parameters in the inbound endpoint:

<parameter name="rabbitmq.connection.retry.interval" locked="false">10000</parameter>
<parameter name="rabbitmq.connection.retry.count" locked="false">5</parameter>   

If the parameters are configured with sample values as given above, the server makes 5 retry attempts with a time interval of 10000 milliseconds between each retry attempt to reconnect when the connection is lost. If reconnecting fails after 5 retry attempts, the Micro Integrator terminates the connection.

Mediator Properties

In addition to the parameters described above, you can define RabbitMQ properties using the Property mediator and the Property Group mediator.

Top