Bridges
This library contains components to create bridges from and to other message brokers. The bridges are grouped by protocol or broker: AMQP, JMS, Kafka, MQTT, and Pulsar.
The general way of creating a bridge is to add a Connection component to the flow and configure it. The Connection provides a reference used by the Inbound and/or Outbound bridge components that do the actual bridging. A single Connection component can be connected to multiple Inbound and/or Outbound bridge components.
AMQP
AMQP stands for Advanced Message Queuing Protocol and has been an ISO standard since version 1.0. There is a prior version, 0.9.1, which is completely incompatible with version 1.0 but much easier to handle. RabbitMQ was the first message broker to implement AMQP 0.9.1 and still uses it as its base protocol. Due to the popularity of RabbitMQ, AMQP 0.9.1 currently has wider adoption than AMQP 1.0, although in practice it is effectively a private protocol of RabbitMQ.
Flow Director supports both versions of AMQP. Because flows work exclusively with JMS messages internally, inbound and outbound AMQP bridges need to be connected to SwiftMQ's AMQP Swiftlet for the conversion from AMQP messages into JMS messages and vice versa.
Inbound Bridges
The example defines a RabbitMQ inbound bridge on queue testqueue that is connected to an outbound bridge to queue fromrabbitmq on the local SwiftMQ router. AMQP 0.9.1 messages from the inbound bridge are passed to the outbound bridge and are automatically converted into JMS messages. You can then use a normal Queue Input component to receive the JMS messages from queue fromrabbitmq and process them.
Outbound Bridges
Here, messages are sent within the flow as JMS messages through a Queue Output component to queue torabbitmq. A SwiftMQ inbound bridge receives them from that queue as converted AMQP 0.9.1 messages and passes them to a RabbitMQ outbound bridge on queue testqueue.
AMQP 1.0
Connection, Inbound, and Outbound components are provided. The components use the SwiftMQ AMQP 1.0 Java Client internally, which is in the classpath, so no additional jar files are required. The AMQP ports are 5672 for plain and 5671 for TLS connections. The latter is not supported (see the last section). A SwiftMQ Connection has to be created to localhost on port 5672.
AMQP 0.9.1 (RabbitMQ)
Connection, Inbound, and Outbound components are provided. The components use the RabbitMQ Java Client internally, which is not in the classpath and has to be attached to the flow.
The AMQP ports are 5672 for plain and 5671 for TLS connections. The latter is not supported (see the last section). A SwiftMQ Connection has to be created to localhost on port 5672.
Note: SwiftMQ inbound and outbound bridges can be created on queues only, because SwiftMQ has only limited support for AMQP 0.9.1. It supports the exchange type direct only.
If the AMQP header's content-type property starts with text/ (like text/plain), the resulting JMS message will be a JMS TextMessage whose body is a String decoded as UTF-8 from the AMQP bytes message body. Otherwise, the resulting JMS message will be a JMS BytesMessage with the body set from the AMQP bytes message body.
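The content-type rule above can be sketched in plain Java. This is only an illustration of the decision, not the Swiftlet's code: the class and method names are hypothetical, and treating a missing content-type as binary and matching the prefix case-sensitively are assumptions the text does not confirm.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that mirrors the described rule; not part of SwiftMQ. */
final class AmqpBodyMapping {

    /** True when the converted message would be a TextMessage. */
    static boolean mapsToTextMessage(String contentType) {
        // Assumption: a missing content-type is treated as binary.
        return contentType != null && contentType.startsWith("text/");
    }

    /** Returns a String for text content types, otherwise the raw bytes. */
    static Object convertBody(String contentType, byte[] amqpBody) {
        if (mapsToTextMessage(contentType)) {
            return new String(amqpBody, StandardCharsets.UTF_8);
        }
        return amqpBody;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(convertBody("text/plain", body) instanceof String);
        System.out.println(convertBody("application/octet-stream", body) instanceof byte[]);
    }
}
```

A sender that wants a TextMessage on the JMS side would therefore set a content type such as text/plain and send UTF-8 encoded bytes.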
JMS
Connection components exist for the following message brokers:
ActiveMQ Classic (ActiveMQ 5)
ActiveMQ Artemis
IBM MQ (formerly WebSphere MQ, MQSeries)
Solace
SwiftMQ, to connect remote routers that are not connected to the router network via a routing connection
Inbound and Outbound bridges exist for queues and topics. The bridges provide and accept JMS messages and can be directly connected with flow components without an intermediate bridge to SwiftMQ.
The components use the respective client jar files of the specific message broker internally, which are not in the classpath and have to be attached to the flow.
So if you use ActiveMQ Artemis, you need to attach the ActiveMQ Artemis client jar. You don't need to attach a jar file if you use a SwiftMQ bridge.
Avoid mixing bridges of different message brokers within the same flow as you need to upload all client jar files to the same flow. This might create clashes in the class loader if they use the same dependencies but different versions. Instead, create separate flows.
There is no problem with having a SwiftMQ bridge and another message broker bridge within the same flow.
Kafka
The bridge to Apache Kafka has a different design. Kafka has no notion of connections, only producers and consumers, so the bridge consists of a Producer and a Consumer.
The components use the Apache Kafka Client jar file, which is not in the classpath and has to be attached to the flow.
Producer
The Producer component connects to a topic at a Kafka cluster. It accepts BytesMessage or TextMessage as input. The input message requires a message property named key of type string that contains the key. The body of the message contains the value. StringSerializer is used for the key, and ByteArraySerializer for the value.
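For readers who want to inspect or reproduce the same serialization with a plain Kafka client, the serializers named above correspond to the standard Kafka producer settings shown in this sketch. How the Producer component configures its client internally is not stated here, so treat this as an assumption; the class name, method name, and broker address are placeholders.

```java
import java.util.Properties;

/** Hypothetical settings holder; not part of Flow Director. */
final class KafkaProducerSettings {

    /** Builds producer properties with a String key and a byte[] value. */
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The key is serialized as a String, matching the "key" message property.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The value is serialized as raw bytes, matching the message body.
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        producerProperties("localhost:9092").list(System.out);
    }
}
```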
Consumer
The Consumer component can connect to multiple topics at a Kafka cluster. It produces BytesMessage as output, where the key is set as a String property and the value as the body. StringDeserializer is used for the key, and ByteArrayDeserializer for the value.
Kafka's consumer API is poll-based. Therefore, a Timer component must be connected that drives the poll. A single poll can return multiple messages.
The Consumer component explicitly commits the offsets from the last poll after the messages have been processed. After a restart, it therefore resumes at the first offset that has not been committed yet.
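The effect of committing only after processing can be illustrated with a toy in-memory model. This sketch does not use the Kafka client API; the class, its fields, and its methods are hypothetical stand-ins for a topic partition, the consumer position, and the committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of poll-then-commit semantics; not the Kafka API. */
final class PollCommitSketch {
    private final List<String> log;   // stands in for one topic partition
    private int position = 0;         // next offset to be returned by poll
    private int committed = 0;        // next offset to read after a restart

    PollCommitSketch(List<String> log) {
        this.log = log;
    }

    /** Returns up to max records, starting at the current position. */
    List<String> poll(int max) {
        int to = Math.min(log.size(), position + max);
        List<String> batch = new ArrayList<>(log.subList(position, to));
        position = to;
        return batch;
    }

    /** Called after the batch from the last poll has been processed. */
    void commit() {
        committed = position;
    }

    /** Simulates a restart: reading resumes at the committed offset. */
    void restart() {
        position = committed;
    }
}
```

In this model, a batch that was polled but not committed before a restart is returned again by the next poll, so downstream processing should tolerate duplicates.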
KSQL
KSQL is a SQL-based streaming analytics engine for Apache Kafka. The integration into Flow Director consists of 2 components:
A Run component that is used to issue DDL commands, such as to create streams/tables.
A Query component that executes a continuous query on the base of a stream/table and returns a continuous flow of results.
MQTT
MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, straightforward, and lightweight messaging protocol designed for constrained devices and low-bandwidth, high-latency, or unreliable networks.
The most popular version is 3.1.1, which is supported. A newer version 5 is available but not widely adopted and not yet supported.
MQTT 3.1.1
Connection, Inbound, and Outbound components are provided. The components use the Eclipse Paho Java Client internally, which is not in the classpath and has to be attached to the flow.
As with AMQP, the MQTT bridges require creating a counterpart bridge to SwiftMQ's MQTT Swiftlet for the message transformation. See the AMQP section.
The resulting messages are JMS BytesMessage.
The MQTT ports are 1883 for plain and 8883 for TLS connections. The latter is not supported (see the last section). A SwiftMQ Connection has to be created to localhost on port 1883.
Pulsar
Connection, Inbound, and Outbound components are provided to connect to Pulsar and to receive from and send to Pulsar topics. All data is sent as byte[] payload. Received Pulsar messages are returned as BytesMessage.
The components use the Apache Pulsar Java Client internally, which is not in the classpath, so all required jar files have to be attached to the flow.
TLS (Secure Connections)
TLS should work with other message brokers out of the box if they use a certificate from a Certificate Authority (CA). If a broker uses a self-signed certificate, convert it into a PEM file, if necessary, and attach it to your flow.
The certificate is then automatically sent to the router during deployment and stored in the router's trust store.
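If the self-signed certificate is only available in binary DER form, the conversion to PEM can be sketched with the Java standard library as follows. This assumes the input bytes are a DER-encoded X.509 certificate; the class and method names are hypothetical, and any requirements of Flow Director beyond "a PEM file" are not covered here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper that wraps DER bytes in a PEM envelope. */
final class PemSketch {

    /** Base64-encodes the certificate in 64-character lines between PEM markers. */
    static String toPem(byte[] derCertificate) {
        Base64.Encoder encoder =
                Base64.getMimeEncoder(64, "\n".getBytes(StandardCharsets.US_ASCII));
        return "-----BEGIN CERTIFICATE-----\n"
                + encoder.encodeToString(derCertificate)
                + "\n-----END CERTIFICATE-----\n";
    }
}
```

The resulting text can be written to a file and attached to the flow as described above.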