
AMQP 1.0 Client

Introduction

The SwiftMQ AMQP 1.0 Java Client is an implementation of the AMQP 1.0 specification in Java and can be used to connect to any AMQP 1.0 capable endpoint, such as a SwiftMQ Router.

The client API is similar to JMS but easier to handle. It is thread-safe unless stated otherwise and consists of only a few classes:

| Class | Package | Description |
|---|---|---|
| AMQPMessage | com.swiftmq.amqp.v100.messaging | Represents an AMQP message. |
| QoS | com.swiftmq.amqp.v100.client | Constants representing the different quality of service modes. |
| AMQPContext | com.swiftmq.amqp | Represents the context of the client. |
| Connection | com.swiftmq.amqp.v100.client | Connection to an AMQP endpoint. |
| ExceptionListener | com.swiftmq.amqp.v100.client | Can be registered at a Connection to listen for disconnects. |
| Session | com.swiftmq.amqp.v100.client | Grouping context for Links. |
| Producer | com.swiftmq.amqp.v100.client | Outgoing (sending) Link. |
| Consumer | com.swiftmq.amqp.v100.client | Incoming (receiving) Link. |
| DurableConsumer | com.swiftmq.amqp.v100.client | Incoming (receiving) Durable Link. |
| MessageAvailabilityListener | com.swiftmq.amqp.v100.client | Used for non-blocking receives. |
| DeliveryMemory | com.swiftmq.amqp.v100.client | Used for settlement and link recovery. |
| TransactionController | com.swiftmq.amqp.v100.client | Can control multiple transactions per Session. |

Specification and Samples

Other classes referenced from the above are part of SwiftMQ's AMQP library (amqp.jar) which is almost directly generated from the specification's XML files.

There are also several example programs provided under the SwiftMQ distribution's directory "samples/amqp". Please have a look!

Debugging

The following system properties can be set at the client to enable debugging. All output goes to System.out.

| System Property | Description |
|---|---|
| swiftmq.amqp.frame.debug | Shows all sent and received AMQP frames. |
| swiftmq.amqp.debug | Shows quite verbose debug output of the SwiftMQ AMQP library. |

Example:

-Dswiftmq.amqp.frame.debug=true -Dswiftmq.amqp.debug=true
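The same two properties can in principle also be set from code. The following is a minimal sketch using only the JDK; the class name `AmqpDebugFlags` is hypothetical, and it assumes the library reads the properties after they have been set, which this page does not state. When in doubt, prefer the `-D` flags shown above.

```java
// Sketch: set the two SwiftMQ AMQP debug properties programmatically.
// Assumption: this runs before any SwiftMQ AMQP class is used.
final class AmqpDebugFlags {
    static final String FRAME_DEBUG = "swiftmq.amqp.frame.debug";
    static final String LIB_DEBUG = "swiftmq.amqp.debug";

    /** Sets both debug properties to "true" or "false". */
    static void enable(boolean on) {
        System.setProperty(FRAME_DEBUG, Boolean.toString(on));
        System.setProperty(LIB_DEBUG, Boolean.toString(on));
    }

    public static void main(String[] args) {
        enable(true);
        // Boolean.getBoolean reads a system property and parses it as a boolean
        System.out.println(FRAME_DEBUG + "=" + Boolean.getBoolean(FRAME_DEBUG));
        System.out.println(LIB_DEBUG + "=" + Boolean.getBoolean(LIB_DEBUG));
    }
}
```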

AMQPMessage

The AMQP specification does not define a message type. It only specifies the sections that can make up a message. AMQPMessage is SwiftMQ's custom message type that handles access to these sections. With the exception of accept() and reject(), an AMQPMessage is not thread-safe.

The following fields are overwritten by the Producer during send and therefore must not be set by the application:

| Section | Field |
|---|---|
| Header | Durable |
| Header | Priority |
| Header | Ttl |
| Properties | MessageId |
| Properties | To |
| Properties | UserId |

QoS

For convenience the client API pre-defines 3 modes of quality of service (QoS):

| Mode | Description |
|---|---|
| AT_MOST_ONCE | Messages are pre-settled at the sender endpoint. Messages may be lost. |
| AT_LEAST_ONCE | Messages are received and settled at the receiver without waiting for the sender to settle. Duplicates may occur. |
| EXACTLY_ONCE | Messages are (1) received, (2) the sender settles and then (3) the receiver settles. Messages are delivered once and only once. |
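As a reading aid, the guarantees of the three modes can be written down as data. The enum below is a hypothetical stand-in (`QosGuide` is not part of the client API and does not replace the `QoS` constants); it only encodes what the descriptions above state and picks the mode with the fewest settlement steps that still satisfies an application's tolerance for loss and duplicates.

```java
// Hypothetical helper, not part of the SwiftMQ client: models the QoS guarantees listed above.
enum QosGuide {
    AT_MOST_ONCE(true, false),   // pre-settled: messages may be lost
    AT_LEAST_ONCE(false, true),  // receiver settles first: duplicates may occur
    EXACTLY_ONCE(false, false);  // receive, sender settles, receiver settles

    final boolean mayLose;
    final boolean mayDuplicate;

    QosGuide(boolean mayLose, boolean mayDuplicate) {
        this.mayLose = mayLose;
        this.mayDuplicate = mayDuplicate;
    }

    /** Returns the mode with the fewest settlement steps that meets the given tolerances. */
    static QosGuide weakestSufficient(boolean lossTolerated, boolean duplicatesTolerated) {
        if (lossTolerated) return AT_MOST_ONCE;
        if (duplicatesTolerated) return AT_LEAST_ONCE;
        return EXACTLY_ONCE;
    }
}
```

For example, an order queue that must not lose messages but whose consumer can detect duplicates would map to `AT_LEAST_ONCE`.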

AMQPContext

The client requires thread pool and tracing facilities from its environment. These are provided by the AMQPContext class, which has 2 modes, CLIENT and ROUTER. Mode ROUTER is for internal use only, so the context always needs to be created with mode CLIENT:

JAVA
            AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);

Connection

Without SASL

A Connection without SASL authentication is created by setting the doAuth parameter to false:

JAVA
            Connection connection = new Connection(ctx, host, port, false);

With SASL

A Connection with SASL authentication is created by setting the doAuth parameter to true. If username and password are not specified, the connection is established via the SASL mechanism ANONYMOUS:

JAVA
            Connection connection = new Connection(ctx, host, port, true);

Otherwise as the specified user with SASL mechanism PLAIN by default:

JAVA
            Connection connection = new Connection(ctx, host, port, true, username, password);

With SASL EXTERNAL

Here is a nice writeup by Jakub Scholz about using SSL and SASL EXTERNAL authentication with the Apache Qpid C++ Broker.

Connection Configuration and actual connect

The Connection is in an unconnected state after construction and can now be configured. For example, the maximum frame size and an exception listener can be set, or the SASL mechanism to use can be specified. Which SASL mechanisms are available depends on what the remote endpoint provides. SwiftMQ provides PLAIN, ANONYMOUS and the platform mechanisms CRAM-MD5 and Digest-MD5. To use the ANONYMOUS mechanism, use the corresponding constructor of the Connection (doAuth set to true, without username and password).

JAVA
            connection.setMechanism("CRAM-MD5");
            connection.setMaxFrameSize(1024);
            connection.setIdleTimeout(120000); // Sends and expects heart beat frames
            connection.setExceptionListener(this);

There are 2 socket factories that can be used to create the TCP connections: com.swiftmq.net.PlainSocketFactory (default) and com.swiftmq.net.JSSESocketFactory. The latter must be specified to connect via SSL/TLS:

JAVA
            connection.setSocketFactory(new JSSESocketFactory());

Finally, do the connect:

JAVA
            connection.connect();

Session

A Session is created from a Connection. It multiplexes traffic between link endpoints and can handle non-transacted and transacted transfers with multiple active transactions simultaneously. It maintains incoming and outgoing windows of unsettled transfers (depending on the maximum frame size, a message can be split over multiple transfers). Both need to be specified when creating the Session:

JAVA
            Session session = connection.createSession(100, 100);
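To get a feeling for how the maximum frame size relates to the number of transfers per message, the arithmetic can be sketched as follows. This is a rough lower bound only: the helper `TransferMath` is hypothetical, it ignores the frame and performative overhead that reduces the usable payload per frame, and it assumes an empty message still needs one transfer.

```java
// Hypothetical helper: lower bound on the number of transfers needed for one message.
final class TransferMath {
    /**
     * Ceiling division of the message size by the maximum frame size.
     * Real frames carry header overhead, so the actual count can be higher.
     */
    static long minTransfers(long messageBytes, long maxFrameSize) {
        if (messageBytes < 0 || maxFrameSize <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative / positive");
        }
        long transfers = (messageBytes + maxFrameSize - 1) / maxFrameSize;
        return Math.max(1, transfers); // assumption: an empty message still takes one transfer
    }

    public static void main(String[] args) {
        // A 2500 byte message with a maximum frame size of 1024 needs at least 3 transfers
        System.out.println(minTransfers(2500, 1024));
    }
}
```

With small frame sizes such as the 1024 bytes used in the configuration example, large messages occupy several slots of the session windows at once, which is worth keeping in mind when choosing the window sizes.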

Producer

A Producer is created from a Session on a target (queue or topic) with a specific QoS. The whole settlement interaction will be done internally so there is nothing more to do than to send messages. The transfer itself is asynchronous as well as the settlement. Only the close() of the Producer is done synchronously and ensures that all settlement has been done for the previously sent messages when the close()-method returns.

If the connection is dropped before close() can be called, unsettled deliveries can be recovered. See the section Link Recovery further down this page.

JAVA
            Producer p = session.createProducer("orderqueue@router4", QoS.AT_LEAST_ONCE);
            ...
            p.send(msg); // Always asynchronous
            ...
            p.close(); // Settlement completed after this call returns

Consumer

A Consumer is created from a Session on a source (queue or topic) with a specific QoS. Each Consumer has a client-side message cache whose size is determined by the link credit, a value that is passed to the source.

On a non-temporary Source

A Consumer on a non-temporary source (regular queue or topic) is created by specifying the name of the source, the link credit (consumer cache size), the QoS, a NoLocal flag, and an optional message selector. If the NoLocal flag is set to true (default), messages sent from the same connection on a topic are not received. This is useful when a client sends and receives on the same topic but does not want to receive its own messages. The message selector is a string that the connected message broker must understand. For SwiftMQ this is a JMS message selector.

JAVA
            Consumer c = session.createConsumer("orderqueue", 200, QoS.EXACTLY_ONCE,
                                                true, "orderid between 0 and 100");

On a temporary Source

A Consumer on a temporary source (temporary queue, e.g. for request/reply) is created as above but without specifying a source parameter. This automatically creates a temporary source whose lifetime is bound to that of the link and attaches the Consumer to it. The address of the temporary source can be obtained by calling getRemoteAddress():

JAVA
            Consumer c = session.createConsumer(200, QoS.AT_MOST_ONCE);
            AddressIF remoteAddress = c.getRemoteAddress(); // Use it for request/reply

Non-blocking Receive

To consume messages, the various receive() methods of a consumer can be called. There is no message listener as in JMS, but there is a way to perform non-blocking receives. This is done by calling receiveNoWait(listener) and passing a MessageAvailabilityListener as a parameter. If no message is available at the time of the receiveNoWait(listener) call, the MessageAvailabilityListener is stored and called later when a message is available:

JAVA
          // Implementation of MessageAvailabilityListener
          // Notifies a Poller thread to call poll()
          public void messageAvailable(Consumer consumer)
          {
            poller.enqueue(this);
          }
    
          // Called from the Poller thread
          public void poll()
          {
            AMQPMessage msg = c.receiveNoWait(this);
            if (msg != null)
            {
              // process message
            }
          }
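The snippet above refers to a `poller` object without showing it. One way such a Poller could look is sketched below with JDK classes only. Both `Pollable` and `Poller` are hypothetical names that are not part of the client API; the idea is simply that messageAvailable() hands the work to a dedicated thread instead of calling receiveNoWait() on the thread that delivered the notification.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the object that exposes poll() in the snippet above. */
interface Pollable {
    void poll();
}

/** Hypothetical Poller: one thread that takes enqueued entries and calls poll() on each. */
final class Poller implements Runnable {
    private final BlockingQueue<Pollable> queue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this, "amqp-poller");
    private volatile boolean running = true;

    void start() {
        thread.start();
    }

    /** Called from messageAvailable(); never blocks because the queue is unbounded. */
    void enqueue(Pollable p) {
        queue.add(p);
    }

    /** Stops the thread; entries still waiting in the queue are dropped. */
    void stop() {
        running = false;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        while (running) {
            try {
                queue.take().poll(); // blocks until something was enqueued
            } catch (InterruptedException e) {
                // stop() interrupts the blocking take(); the loop condition decides whether to exit
            }
        }
    }
}
```

A single poller thread keeps message processing for all registered consumers sequential; replacing it with a thread pool is possible but then the processing code must be thread-safe.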

Settlement

Settlement on the Consumer side needs to be done by the application and can be done in 2 ways: an unsettled message can be either accepted or rejected. Rejecting leads to redelivery of the message. Note that AMQP settles every single message (and not streams of messages as in JMS). Settlement can be done by different threads, so it is completely legal to call receive() from different threads and process the messages in parallel, e.g. by using worker thread pools. The order of settlement is not relevant, that is, message #3 can be settled before message #1.

Settlement is done asynchronously. Only the close() of the Consumer is done synchronously and ensures that all settlement has been done for the previously consumed messages when the close()-method returns. If the connection is dropped before close() can be called, unsettled deliveries can be recovered. See the section Link Recovery further down this page.

JAVA
            AMQPMessage msg = c.receive();
            if (msg != null) // null is returned if the consumer was closed
            {
              if (!msg.isSettled())
              {
                if (processed(msg))
                  msg.accept();
                else
                  msg.reject();
              }
            }
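Settling from a worker thread pool, as mentioned above, can be sketched like this. The example uses JDK classes only: `Settleable` is a hypothetical stand-in for the three AMQPMessage methods used in the snippet above, and `DemoMessage` exists purely so the sketch can run without a broker. Messages are settled in whatever order the workers finish.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical stand-in for the settlement methods of AMQPMessage. */
interface Settleable {
    boolean isSettled();
    void accept();
    void reject();
}

/** Demo-only message that records its outcome. */
final class DemoMessage implements Settleable {
    final int id;
    private volatile String outcome; // null while unsettled

    DemoMessage(int id) { this.id = id; }

    public boolean isSettled() { return outcome != null; }
    public void accept() { outcome = "accepted"; }
    public void reject() { outcome = "rejected"; }
    String outcome() { return outcome; }
}

final class ParallelSettlement {
    /** Processes every message on a worker pool and settles it with accept() or reject(). */
    static <M extends Settleable> void settleAll(List<M> messages, Predicate<M> processed, int workers)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (M msg : messages) {
            pool.execute(() -> {
                if (msg.isSettled()) return;   // pre-settled, nothing to do
                if (processed.test(msg)) msg.accept();
                else msg.reject();
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish in time");
        }
    }
}
```

This relies on accept() and reject() being the thread-safe methods of an AMQPMessage; any other access to a message should stay on a single thread.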

DurableConsumer

A DurableConsumer is created from a Session on a topic source with a specific QoS. The durable link is identified by a link name. If the link does not exist, it will be created with a terminus expiry policy of NEVER and terminus durability of CONFIGURATION. If it exists, it will be attached to the DurableConsumer. The durable link will remain in place after detaching the DurableConsumer, so messages are retained while the DurableConsumer is disconnected and delivered to it once the DurableConsumer reconnects. A call to unsubscribe() will set the terminus expiry policy to LINK_DETACH and the durable link will be deleted.

If connected to a SwiftMQ Router, a DurableConsumer is used to create a durable subscription on a topic like in JMS. The durable subscription is identified by the container id and the link name. Therefore, the container id and the link name need to be the same to connect to the same durable subscription.

JAVA
            // Set up the connection
            AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
            Connection connection = new Connection(ctx, host, port, true);
            // Set the container id
            connection.setContainerId("orderprocessor");
            connection.connect();
            Session session = connection.createSession(Integer.MAX_VALUE, Integer.MAX_VALUE);
            // Create the durable on topic hierarchy "orders.%"
            DurableConsumer c = session.createDurableConsumer("ordermailbox", "orders.%", 20,
                                                              QoS.EXACTLY_ONCE, true, null);
    
            // Do processing
    
            // Close the link
            c.close();
            // Delete the durable if not needed anymore
            if (!neededAnymore)
              c.unsubscribe();
            session.close();
            connection.close();

TransactionController

A TransactionController can be obtained from a Session to do transactional work on it. A Session does not have to be created as a transacted Session as in JMS. Rather a Session can handle non-transacted and transacted traffic at the same time.

The Session's TransactionController is obtained by use of the corresponding getter method and the transaction capabilities can be retrieved:

JAVA
            TransactionController txc = session.getTransactionController();
            boolean multiTxnsPerSession = txc.isSupportMultiTxnsPerSsn();

The SwiftMQ Router can handle local transactions (LOCAL_TRANSACTIONS) and multiple transactions per session (MULTI_TXNS_PER_SSN).

To do transactional work, a transaction id is required. This is created by the TransactionController (which obtains it from the transactional resource):

JAVA
            TxnIdIF txnid = txc.createTxnId();

To finish a transaction, either commit():

JAVA
            txc.commit(txnid);

or rollback() needs to be called:

JAVA
            txc.rollback(txnid);

Outbound (Sending) Transactions

To send a message in a transaction, the message has to be associated with the transaction id.

JAVA
            // Send messages in transactions of size txSize
            int currentTxSize = 0;
            TxnIdIF txnId = txc.createTxnId();
            for (int i = 0; i < nMsgs; i++)
            {
              AMQPMessage msg = new AMQPMessage();
              String s = "Message #" + (i + 1);
              System.out.println("Sending " + s);
              msg.setAmqpValue(new AmqpValue(new AMQPString(s)));
              msg.setTxnIdIF(txnId); // Transaction association
              p.send(msg);
              currentTxSize++;
              if ((i + 1) % txSize == 0)
              {
                txc.commit(txnId);
                txnId = txc.createTxnId();
                currentTxSize = 0;
              }
            }
            if (currentTxSize > 0)
              txc.commit(txnId);
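The commit pattern of the loop above can be checked in isolation. The following sketch replays only the counting logic with plain integers (no client classes involved) and returns the size of every committed batch; `TxBatching` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that replays the batching logic of the send loop above.
final class TxBatching {
    /** Returns the size of each committed batch, in commit order. */
    static List<Integer> commitSizes(int nMsgs, int txSize) {
        if (txSize <= 0) throw new IllegalArgumentException("txSize must be positive");
        List<Integer> commits = new ArrayList<>();
        int currentTxSize = 0;
        for (int i = 0; i < nMsgs; i++) {
            currentTxSize++;                  // stands in for p.send(msg)
            if ((i + 1) % txSize == 0) {      // batch is full: commit, start a new transaction
                commits.add(currentTxSize);
                currentTxSize = 0;
            }
        }
        if (currentTxSize > 0) {
            commits.add(currentTxSize);       // commit the partial last batch
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitSizes(10, 3)); // prints [3, 3, 3, 1]
    }
}
```

Note that when nMsgs is an exact multiple of txSize, the loop above creates one more transaction id after the final commit that is never committed or rolled back. Whether that matters depends on the endpoint, which this page does not specify.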

Inbound (Receiving) Transactions as Transactional Retirement

Transactional retirement means that messages are delivered to the Consumer in a non-transacted fashion. After the client has received a message, the outcome, that is, accepting or rejecting the message, can be associated with a transaction. The outcome is then applied to the message on commit or discarded on rollback. So it is not the messages that are associated with a transaction but the outcomes accept and reject.

The big advantage here is that the messages are still at the client (and don't need to be redelivered) if a transaction is rolled back and so the messages can be associated with another transaction or can be settled in a non-transacted fashion. This provides enormous flexibility.

Transactional retirement is not compatible with QoS mode AT_MOST_ONCE (pre-settlement).

JAVA
            TxnIdIF txnid = txc.createTxnId();
            AMQPMessage msg = c.receive();
            msg.setTxnIdIF(txnid);  // Transaction association
            try {
              String s = "RE: "+((AMQPString)msg.getAmqpValue().getValue()).getValue();
              AMQPMessage msg1 = new AMQPMessage();
              msg1.setAmqpValue(new AmqpValue(new AMQPString(s)));
              msg1.setTxnIdIF(txnid);  // Transaction association
              producer.send(msg1);
              msg.accept();
            } catch (Exception e)
            {
              // error, reject
              msg.reject();
            }
            txc.commit(txnid);

Inbound (Receiving) Transactions as Transactional Acquisition

AMQP provides a second way to handle inbound traffic as transactions. It is called transactional acquisition. Here the client acquires a number of messages under a transaction id so the delivery is already associated with that transaction id. This is the only difference to transactional retirement.

Transactional acquisition is not compatible with QoS mode AT_MOST_ONCE (pre-settlement).

The Consumer must be created without a link credit parameter because the link credit is dimensioned by the number of acquired messages:

JAVA
            // Important to create the consumer without a link credit
            // because the link credit is set by the acquisition
            Consumer c = session.createConsumer(source, QoS.EXACTLY_ONCE, true, null);
    
            // Get the transaction controller
            TransactionController txc = session.getTransactionController();
    
            // Receive messages in transactions of size txSize
            int currentTxSize = 0;
            TxnIdIF txnId = txc.createTxnId();
            // Acquire txSize messages under this txnid
            c.acquire(txSize, txnId);
            for (int i = 0; i < nMsgs; i++)
            {
              AMQPMessage msg = c.receive();
              if (msg == null)
                break;
              AmqpValue value = msg.getAmqpValue();
              System.out.println("Received: " + ((AMQPString) value.getValue()).getValue());
              msg.accept();
              currentTxSize++;
              if ((i + 1) % txSize == 0)
              {
                // Commit and acquire the next txSize messages under a new txnid
                txc.commit(txnId);
                txnId = txc.createTxnId();
                c.acquire(txSize, txnId);
                currentTxSize = 0;
              }
            }
            if (currentTxSize > 0)
              txc.commit(txnId);

Link Recovery

If a Link (a Producer, Consumer or DurableConsumer) cannot be properly closed by its close()-method, e.g. because the corresponding connection disconnects, there can be unsettled deliveries. In order to finish the settlement, a link can be recovered.

Each Link has a so-called DeliveryMemory where unsettled deliveries are stored and removed after settlement. This DeliveryMemory is specified by this interface:

JAVA
          package com.swiftmq.amqp.v100.client;
    
          import com.swiftmq.amqp.v100.generated.transport.definitions.DeliveryTag;
    
          import java.util.Collection;
    
          /**
           * Specifies a memory to store unsettled deliveries of a link (producer and consumer).
           *
           * @author IIT Software GmbH, Bremen/Germany, (c) 2012, All Rights Reserved
           */
          public interface DeliveryMemory
          {
            /**
             * Will be called from the link to set its link name. This is only done if the name
             * has not been set before and ensures that new created links that use this delivery
             * memory use the same link name as before.
             *
             * @param name
             */
            public void setLinkName(String name);
    
            /**
             * Returns the link name,
             *
             * @return link name
             */
            public String getLinkName();
    
            /**
             * Adds an unsettled delivery which consists of a delivery tag, the delivery state
             * and the AMQP message.
             *
             * @param unsettledDelivery unsettled delivery
             */
            public void addUnsettledDelivery(UnsettledDelivery unsettledDelivery);
    
            /**
             * Removes an unsettled delivery from the memory.
             *
             * @param deliveryTag delivery tag
             */
            public void deliverySettled(DeliveryTag deliveryTag);
    
            /**
             * Returns the number of unsettled deliveries contained in this memory.
             *
             * @return number unsettled deliveries
             */
            public int getNumberUnsettled();
    
            /**
             * Returns a collection of all unsettled deliveries. The delivery memory remains untouched
             * so the returned Collection is a copy (or better a clone) of the content.
             *
             * @return unsettled deliveries
             */
            public Collection getUnsettled();
          }

One of the hidden beauties of AMQP 1.0 is the ability to delegate the management of deliveries and their state to the application, which can reconstruct this state out of application data. So even if the client dies while there are unsettled deliveries, they can be recovered.

A DeliveryMemory can be optionally specified as a parameter to the various create methods of a Session, e.g.:

JAVA
          Producer p = session.createProducer("orderqueue@router4",
                                              QoS.EXACTLY_ONCE,
                                              new OrderDatabaseDeliveryMemory());

To identify a delivery, an application can tag a message with its own DeliveryTag which will then be used for the settlement via the DeliveryMemory. If no delivery tag is specified, an internal sequence number will be used instead:

JAVA
          AMQPMessage msg = new AMQPMessage();
          ...
          msg.setDeliveryTag(getOrderId());
          p.send(msg);

This works similarly for consumers.

The actual recovery of a Link will be done during Link creation, see above. Before the Link is attached, it retrieves the unsettled deliveries from the DeliveryMemory and performs the settlement.

If no DeliveryMemory is specified when the Link is created, a DefaultDeliveryMemory is used which stores unsettled deliveries in a Map. If the client dies, this state is lost, of course.

So the most basic recovery without any custom implementation of DeliveryMemory or DeliveryTag can look like this:

JAVA
          Producer p = session.createProducer("orderqueue@router4",
                                              QoS.AT_LEAST_ONCE);
          ...
          p.send(msg); // Always asynchronous
          ...
          // < CONNECTION DROPS >
          // Retrieve the old DeliveryMemory from the former Producer, do the
          // recovery and continue
          Producer recovered = session.createProducer("orderqueue@router4",
                                                      QoS.AT_LEAST_ONCE,
                                                      p.getDeliveryMemory());
          recovered.close(); // All settlement done (recovered)
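To illustrate the bookkeeping a DeliveryMemory has to do, here is a simplified Map-based sketch. It deliberately does not implement the real interface: `PendingDelivery` and `MapDeliveryMemory` are hypothetical, a plain String stands in for DeliveryTag and an Object for the AMQP message, so the code runs with the JDK alone. A persistent variant would write the same information to a database instead of a Map so that it survives a client crash.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified stand-in for an unsettled delivery (tag plus message). */
final class PendingDelivery {
    final String tag;      // stand-in for DeliveryTag
    final Object message;  // stand-in for the AMQP message

    PendingDelivery(String tag, Object message) {
        this.tag = tag;
        this.message = message;
    }
}

/** Map-based memory mirroring the methods of the DeliveryMemory interface shown above. */
final class MapDeliveryMemory {
    private final Map<String, PendingDelivery> unsettled = new LinkedHashMap<>();
    private String linkName;

    synchronized void setLinkName(String name) { this.linkName = name; }

    synchronized String getLinkName() { return linkName; }

    /** Remembers a delivery until it is settled. */
    synchronized void addUnsettledDelivery(PendingDelivery delivery) {
        unsettled.put(delivery.tag, delivery);
    }

    /** Forgets a delivery once the link reports it as settled. */
    synchronized void deliverySettled(String tag) {
        unsettled.remove(tag);
    }

    synchronized int getNumberUnsettled() { return unsettled.size(); }

    /** Returns a copy so callers cannot modify the memory itself. */
    synchronized Collection<PendingDelivery> getUnsettled() {
        return new ArrayList<>(unsettled.values());
    }
}
```

Whatever is still returned by getUnsettled() when a link is created again is what the link has to settle during recovery.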

Filters

AMQP 1.0 filters are extension points that are listed in a public registry on www.amqp.org.

To ensure minimal interoperability between AMQP brokers and clients concerning JMS selector filters, the SwiftMQ AMQP 1.0 Java Client uses the filter declarations APACHE.ORG:SELECTOR and APACHE.ORG:NO_LOCAL of Apache Qpid's AMQP 1.0 implementation.
