AMQP 1.0 Client
Introduction
The SwiftMQ AMQP 1.0 Java Client is an implementation of the AMQP 1.0 specification in Java and can be used to connect to any AMQP 1.0 capable endpoints such as a SwiftMQ Router.
The client API is similar to JMS but easier to handle, thread-safe if not specified otherwise and consists only of a few classes:
Class | Package | Description |
---|---|---|
| com.swiftmq.amqp.v100.messaging | Represents an AMQP message. |
| com.swiftmq.amqp.v100.client | Constants representing the different quality of service modes. |
| com.swiftmq.amqp | Represents the context of the client. |
| com.swiftmq.amqp.v100.client | Connection to an AMQP endpoint. |
| com.swiftmq.amqp.v100.client | Can be registered at a |
| com.swiftmq.amqp.v100.client | Grouping context for |
| com.swiftmq.amqp.v100.client | Outgoing (sending) |
| com.swiftmq.amqp.v100.client | Incoming (receiving) |
| com.swiftmq.amqp.v100.client | Incoming (receiving) |
| com.swiftmq.amqp.v100.client | Used for non-blocking receives. |
| com.swiftmq.amqp.v100.client | Used for settlement and link recovery. |
| com.swiftmq.amqp.v100.client | Can control multiple transactions per Session. |
Specification and Samples
Other classes referenced from the above are part of SwiftMQ's AMQP library (amqp.jar) which is almost directly generated from the specification's XML files.
There are also several example programs provided under the SwiftMQ distribution's directory "samples/amqp". Please have a look!
Debugging
The following system properties can be set at the client to enable debugging. All output goes to System.out.
System Property | Description |
---|---|
| Shows all sent and received AMQP frames. |
| Shows quite verbose debug output of the SwiftMQ AMQP library. |
Example:
-Dswiftmq.amqp.frame.debug=true -Dswiftmq.amqp.debug=true
AMQPMessage
The AMQP specification does not define a message type. It only specified sections that can make a message. AMQPMessage
is SwiftMQ's custom type of message that handles access to the sections. With the exception of accept()
and reject()
an AMQPMessage
is not thread-safe.
The following fields are overwritten from the Producer
during send and therefore MUST not be set from the application:
Section | Field |
---|---|
Header | Durable |
Header | Priority |
Header | Ttl |
Properties | MessageId |
Properties | To |
Properties | UserId |
QoS
For convenience the client API pre-defines 3 modes of quality of service (QoS):
Mode | Description |
---|---|
| Messages are pre-settled at the sender endpoint. Messages may be lost. |
| Messages are received and settled at the receiver without waiting for the sender to settle. Duplicates may occur. |
| Messages are (1) received, (2) the sender settles and then (3) the receiver settles. Messages are delivered once and only once. |
AMQPContext
The client requires thread pool and tracing facilities from its environment. This is provided from the AMQPContext
class which has 2 modes, CLIENT
and ROUTER
. Mode ROUTER
is for internal use only so the context always needs to be created with mode CLIENT
:
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
Connection
Without SASL
A Connection without SASL authentication is created by setting the doAuth
parameter to false
:
Connection connection = new Connection(ctx, host, port, false);
With SASL
A Connection with SASL authentication is created by setting the doAuth
parameter to true. If username
and password
is not specified, connect takes place via the SASL mechanism ANONYMOUS
:
Connection connection = new Connection(ctx, host, port, true);
Otherwise as the specified user with SASL mechanism PLAIN
by default:
Connection connection = new Connection(ctx, host, port, true, username, password);
With SASL EXTERNAL
Here is a nice writeup by Jakub Scholz about using SSL and SASL EXTERNAL
authentication with the Apache Qpid C++ Broker.
Connection Configuration and actual connect
The Connection
is in an unconnected state after construction and can now be configured. For example, the maximum frame size and an exception listener can be set or the SASL mechanism to use can be specified. The SASL mechanism depends on the provision of the remote endpoint. SwiftMQ provides PLAIN
, ANONYMOUS
and the platform mechanisms CRAM-MD5
and Digest-MD5
. For use of ANONYMOUS
mechanism please use the resp. constructor of the connection.
connection.setMechanism("CRAM-MD5");
connection.setMaxFrameSize(1024);
connection.setIdleTimeout(120000); // Sends and expects heart beat frames
connection.setExceptionListener(this);
There are 2 socket factories that can be used to create the TCP connections: com.swiftmq.net.PlainSocketFactory
(default) and com.swiftmq.net.JSSESocketFactory
. The latter must be specified to connect via SSL/TLS:
connection.setSocketFactory(new JSSESocketFactory());
Finally, do the connect:
connection.connect();
Session
A Session
is created from a Connection
. It multiplexes traffic between link endpoints and can handle non-transacted and transacted transfers with multiple active transactions simultaneously. It maintains incoming and outgoing windows of unsettled transfers (depending on the maximum frame size, a message can be split over multiple transfers). Both need to be specified when creating the Session
:
Session session = connection.createSession(100, 100);
Producer
A Producer
is created from a Session
on a target (queue or topic) with a specific QoS. The whole settlement interaction will be done internally so there is nothing more to do than to send messages. The transfer itself is asynchronous as well as the settlement. Only the close()
of the Producer
is done synchronously and ensures that all settlement has been done for the previously sent messages when the close()
-method returns.
If the connection is dropped before close()
can be called, unsettled deliveries can be recovered. See the last section Link Recovery at the end of this page.
Producer p = session.createProducer("orderqueue@router4", QoS.AT_LEAST_ONCE);
...
p.send(msg); // Always asynchronous
...
p.close(); // Settlement completed after this call returns
Consumer
A Consumer
is created from a Session
on a source (queue or topic) with a specific QoS. Each Consumer
has a client-side message cache which is dimensioned by the link credit, a value that is passed to the source.
On a non-temporary Source
A Consumer
on a non-temporary source (regular queue or topic) is created by specifying the name of the source, the link credit (consumer cache size), the QoS, a NoLocal
flag, and an optional message selector. If the NoLocal
flag is set to true (default) then messages sent from the same connection on a topic are not received. This is the case when a client sends and receives on the same topic but doesn't want to receive its own messages. The message selector is a string that the connected message broker must understand. For SwiftMQ this is a JMS message selector.
Consumer c = session.createConsumer("orderqueue", 200, QoS.EXACTLY_ONCE,
true, "orderid between 0 and 100");
On a temporary Sources
A Consumer
on a temporary source (temporary queue, e.g. for request/reply) is created like above but without specifying a source parameter. This automatically creates the temporary source with a lifetime of that of this link and attaches the Consumer
to it. The address of the temporary source can be obtained by calling getRemoteAddress()
:
Consumer c = session.createConsumer(200, QoS.AT_MOST_ONCE);
AddressIF remoteAddress = c.getRemoteAddress(); // Use it for request/reply
Non-blocking Receive
To consume messages, the various receive()
methods of a consumer can be called. There is no message listener like in JMS but a way to perform non-blocking receives. This is done by calling receiveNoWait(listener)
and passing a MessageAvailabilityListener as a parameter. If there is no message available at the time of the receiveNoWait(listener)
call, the MessageAvailabilityListener
is stored and called later when a message is available:
// Implementation of MessageAvailabilityListener
// Notifies a Poller thread to call poll()
public void messageAvailable(Consumer consumer)
{
poller.enqueue(this);
}
// Called from the Poller thread
public void poll()
{
AMQPMessage msg = c.receiveNoWait(this);
if (msg != null)
{
// process message
}
}
Settlement
Settlement on the Consumer
side needs to be done by the application and can be done in 2 ways. If a message is unsettled, it can be accepted or rejected. The latter will lead to the redelivery of the message. Note that AMQP settles every single message (and not streams of messages like in JMS). Settlement can be done by different threads so it's completely legal to call "receive()" by different threads and process them in parallel, e.g. by using worker thread pools. The order of settlement is not relevant, that is, message #3 can be settled before message #1.
Settlement is done asynchronously. Only the close()
of the Consumer
is done synchronously and ensures that all settlement has been done for the previously consumed messages when the close()
-method returns. If the connection is dropped before close()
can be called, unsettled deliveries can be recovered. See the last section Link Recovery at the end of this page.
AMQPMessage msg = c.receive();
if (msg != null) // null is returned if the consumer was closed
{
if (!msg.isSettled())
{
if (processed(msg))
msg.accept();
else
msg.reject();
}
}
DurableConsumer
A DurableConsumer
is created from a Session
on a topic source with a specific QoS. The durable link is identified by a link name. If the link does not exists, it will be created with a terminus expiry policy of NEVER
and terminus durability of CONFIGURATION
. If it exists, it will be attached to the DurableConsumer
. The durable link will remain in place after detaching the DurableConsumer
so messages are received while the DurableConsumer
is disconnected and delivered to it once the DurableConsumer
reconnects. A call to unsubscribe()
will set the terminus expiry policy to LINK_DETACH
and the durable link will be deleted.
If connected to a SwiftMQ Router, a DurableConsumer
is used to create a durable subscription on a topic like in JMS. The durable subscription is identified by the container id and the link name. Therefore, the container id and the link name need to be the same to connect to the same durable subscription.
// Set up the connection
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
Connection connection = new Connection(ctx, host, port, true);
// Set the container id
connection.setContainerId("orderprocessor");
connection.connect();
Session session = connection.createSession(Integer.MAX_VALUE, Integer.MAX_VALUE);
// Create the durable on topic hierarchy "orders.%"
DurableConsumer c = session.createDurableConsumer("ordermailbox", "orders.%", 20,
QoS.EXACTLY_ONCE, true, null);
// Do processing
// Close the link
c.close();
// Delete the durable if not needed anymore
if (!neededAnymore)
c.unsubscribe();
session.close();
connection.close();
TransactionController
A TransactionController
can be obtained from a Session
to do transactional work on it. A Session
does not have to be created as a transacted Session as in JMS. Rather a Session can handle non-transacted and transacted traffic at the same time.
The Session
's TransactionController
is obtained by use of the corresponding getter method and the transaction capabilities can be retrieved:
TransactionController txc = session.getTransactionController();
boolean multiTxnsPerSession = txc.isSupportMultiTxnsPerSsn();
The SwiftMQ Router can handle local transactions (LOCAL_TRANSACTIONS
) and multiple transactions per session (MULTI_TXNS_PER_SSN
).
To do transactional work, a transaction id is required. This is created by the TransactionController
(which obtains it from the transactional resource):
TxnIdIF txnid = txc.createTxnId();
To finish a transaction, either commit()
:
txc.commit(txnid);
or rollback()
needs to be called:
txc.rollback(txnid);
Outbound (Sending) Transactions
To send a message in a transaction, the message has to be associated with the transaction id.
// Send messages in transactions of size
int currentTxSize = 0;
TxnIdIF txnId = txc.createTxnId();
for (int i = 0; i < nMsgs; i++)
{
AMQPMessage msg = new AMQPMessage();
String s = "Message #" + (i + 1);
System.out.println("Sending " + s);
msg.setAmqpValue(new AmqpValue(new AMQPString(s)));
msg.setTxnIdIF(txnId); // Transaction association
p.send(msg);
currentTxSize++;
if ((i + 1) % txSize == 0)
{
txc.commit(txnId);
txnId = txc.createTxnId();
currentTxSize = 0;
}
}
if (currentTxSize > 0)
txc.commit(txnId);
Inbound (Receiving) Transactions as Transactional Retirement
Transactional retirement means that messages are delivered to the Consumer
in a non-transacted fashion. After the client has received it, the outcome in form of accepting or rejecting the message can be associated with a transaction and will then be applied to the messages on commit or discarded on rollback. So NOT the messages are associated with a transaction BUT the outcomes accept and reject.
The big advantage here is that the messages are still at the client (and don't need to be redelivered) if a transaction is rolled back and so the messages can be associated with another transaction or can be settled in a non-transacted fashion. This provides enormous flexibility.
Transactional retirement is not conforming with QoS mode AT_MOST_ONCE
(pre-settlement).
TxnIdIF txnid = txc.createTxnId();
AMQPMessage msg = c.receive();
msg.setTxnIdIF(txnid); // Transaction association
try {
String s = "RE: "+((AMQPString)msg.getAmqpValue().getValue()).getValue();
AMQPMessage msg1 = new AMQPMessage();
msg1.setAmqpValue(new AmqpValue(new AMQPString(s)));
msg1.setTxnIdIF(txnid); // Transaction association
producer.send(msg1);
msg.accept();
} catch (Exception e)
{
// error, reject
msg.reject();
}
txc.commit(txnid);
Inbound (Receiving) Transactions as Transactional Acquisition
AMQP provides a second way to handle inbound traffic as transactions. It is called transactional acquisition. Here the client acquires a number of messages under a transaction id so the delivery is already associated with that transaction id. This is the only difference to transactional retirement.
The transactional acquisition is not conforming with QoS mode AT_MOST_ONCE
(pre-settlement).
The Consumer
must be created without a link credit parameter because the link credit is dimensioned by the number of acquired messages:
// Important to create the consumer without a link credit
// because the link credit is set by the acquisition
Consumer c = session.createConsumer(source, OoS.EXACTLY_ONCE, true, null);
// Get the transaction controller
TransactionController txc = session.getTransactionController();
// Receive messages in transactions in size
int currentTxSize = 0;
TxnIdIF txnId = txc.createTxnId();
// Acquire messages under this txnid
c.acquire(txSize, txnId);
for (int i = 0; i < nMsgs; i++)
{
AMQPMessage msg = c.receive();
if (msg == null)
break;
AmqpValue value = msg.getAmqpValue();
System.out.println("Received: " + ((AMQPString) value.getValue()).getValue());
msg.accept();
currentTxSize++;
if ((i + 1) % txSize == 0)
{
// Commit and acquire the next messages under a new txnid
txc.commit(txnId);
txnId = txc.createTxnId();
c.acquire(txSize, txnId);
currentTxSize = 0;
}
}
if (currentTxSize > 0)
txc.commit(txnId);
Link Recovery
If a Link
(a Producer
, Consumer
or DurableConsumer
) cannot be properly closed by its close()
-method, e.g. because the corresponding connection disconnects, there can be unsettled deliveries. In order to finish the settlement, a link can be recovered.
Each Link has a so-called DeliveryMemory
where unsettled deliveries are stored and removed after settlement. This DeliveryMemory
is specified by this interface:
package com.swiftmq.amqp.v100.client;
import com.swiftmq.amqp.v100.generated.transport.definitions.DeliveryTag;
import java.util.Collection;
/**
* Specifies a memory to store unsettled deliveries of a link (producer and consumer).
*
* @author IIT Software GmbH, Bremen/Germany, (c) 2012, All Rights Reserved
*/
public interface DeliveryMemory
{
/**
* Will be called from the link to set its link name. This is only done if the name
* has not been set before and ensures that new created links that use this delivery
* memory use the same link name as before.
*
* @param name
*/
public void setLinkName(String name);
/**
* Returns the link name,
*
* @return link name
*/
public String getLinkName();
/**
* Adds an unsettled delivery which consists of a delivery tag, the delivery state
* and the AMQP message.
*
* @param unsettledDelivery unsettled delivery
*/
public void addUnsettledDelivery(UnsettledDelivery unsettledDelivery);
/**
* Removes an unsettled delivery from the memory.
*
* @param deliveryTag delivery tag
*/
public void deliverySettled(DeliveryTag deliveryTag);
/**
* Returns the number of unsettled deliveries contained in this memory.
*
* @return number unsettled deliveries
*/
public int getNumberUnsettled();
/**
* Returns a collection of all unsettled deliveries. The delivery memory remains untouched
* so the returned Collection is a copy (or better a clone) of the content.
*
* @return unsettled deliveries
*/
public Collection getUnsettled();
}
One of the hidden beauties of AMQP 1.0 is the ability to delegate the management of deliveries and their state to the application which can reconstruct this state out of application data. So even if the client dies and there are unsettled deliveries, it can be recovered.
A DeliveryMemory
can be optionally specified as a parameter to the various create
methods of a Session
, e.g.:
Producer p = session.createProducer("orderqueue@router4",
QoS.EXACTLY_ONCE,
new OrderDatabaseDeliveryMemory());
To identify a delivery, an application can tag a message with its own DeliveryTag
which will then be used for the settlement via the DeliveryMemory
. If no delivery tag is specified, an internal sequence number will be used instead:
AMQPMessage msg = new AMQPMessage();
...
msg.setDeliveryTag(getOrderId());
p.send(msg);
This works similarly for consumers.
The actual recovery of a Link
will be done during Link
creation, see above. Before the Link
is attached, it retrieves the unsettled deliveries from the DeliveryMemory
and performs the settlement.
If no DeliveryMemory
is specified when the Link
is created, a DefaultDeliveryMemory
is used which stores unsettled deliveries in a Map
. If the client dies, this state is lost, of course.
So the most basic recovery without any custom implementation of DeliveryMemory
or DeliveryTag
can look like this:
Producer p = session.createProducer("orderqueue@router4",
QoS.AT_LEAST_ONCE);
...
p.send(msg); // Always asynchronously
...
< CONNECTION DROPS >
// Retrieve the old DeliveryMemory from the former Producer, do the
// recovery and continue
Producer p = session.createProducer("orderqueue@router4",
QoS.AT_LEAST_ONCE,
p.getDeliveryMemory());
p.close(); // All settlement done (recovered)
Filters
AMQP 1.0 filters are extension points that are listed in a public registry on www.amqp.org
To ensure minimal interoperability between AMQP brokers and clients concerning JMS selector filters, the SwiftMQ AMQP 1.0 Java Client uses the filter declarations APACHE.ORG:SELECTOR
and APACHE.ORG:NO_LOCAL
of Apache Qpid's AMQP 1.0 implementation.