Streams Swiftlet
SwiftMQ Streams is a platform to run microservices - called Streams - within a SwiftMQ Router.
Advantages of SwiftMQ Streams
The big advantage over other libraries that run microservices on a JVM is this:
The JVM host, SwiftMQ, is a full-blown Enterprise Messaging System.
Streams are fully integrated with the internal messaging infrastructure of SwiftMQ.
Applications can communicate out of the box with Streams through normal JMS/AMQP messages.
SwiftMQ's integrated Federated Router Network lets Streams communicate with Streams on other nodes transparently.
Streams can intercept, analyze and visualize any message flow sent between applications without changing them.
Everything is provided by a SwiftMQ Router. No external libraries, no wiring and deployment hassle.
Features
By definition, microservices are:
Small in size.
Loosely coupled.
Self-contained.
Easily replaceable.
Autonomously developed.
Independently deployable.
Communicating over Messages.
A Stream completely conforms to these definitions and provides the following additional features:
Streams are implemented in JavaScript
Users implement Streams in JavaScript (the latest ECMAScript standard is supported by GraalVM). These scripts use an elegant and powerful Java DSL, the Stream Interface, to interact with the stream.
Here is a nice JavaScript example:
stream.memory("itemstats").group("ITEMNO").sum("QTY").sort("QTY").reverse();
This statement
uses a Memory called itemstats, where order items are stored in a sliding window over the last 30 seconds,
groups it by property ITEMNO,
computes the sum over property QTY (quantity),
sorts the result,
and reverses the order.
The result is a statistic of all order items in descending order over the last 30 seconds.
Single-threaded
A Stream is processed on a single thread: no side effects, no synchronization overhead, no locks.
Event-driven
A Stream is driven by the following events:
A Message arrives on a queue or topic.
A management event occurs.
A Timer event occurs.
A Stream's start or stop event occurs.
Data is private
All data that a Stream stores or creates is local to that Stream. Data is not shared. The only way to share it is to communicate it via Messages to other Streams. This makes a Stream very simple and replaceable.
Stateful / stateless
A Stream can store data in so-called Memories and compute it. Memories can be stateful or stateless. A stateful Memory stores its content in a persistent queue which survives restarts of a Router and failover of a HA Router. A stateless Memory stores its content on the heap or, for larger content, in a temporary queue.
Transactional
A transaction is automatically started whenever an event on the Stream occurs. All actions performed within the Stream, such as sending to Outputs and adding to or removing from Memories, are transactionally consistent. It is not possible to perform these actions outside of a transaction.
When the event has been processed, all changes that have been performed are fully committed. If an error/exception occurs during the processing of the event, the transaction is aborted and all changes are rolled back. For a message event, this means that the Message is redelivered.
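A minimal sketch of how this plays out in a callback; the queue name, the validation rule, and the "orders" Memory are placeholders for this illustration, not part of the product:
stream.create().input("orders").queue().onInput(function (input) {
    // Any exception thrown here aborts the transaction:
    // nothing is committed and the Message is redelivered.
    if (input.current().property("QTY").value().toInteger() < 0)
        throw "Invalid quantity - rolling back";
    // Assumes a Memory named "orders" has been created earlier
    stream.memory("orders").add(input.current());
});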
Fault-tolerant
Streams that run on a SwiftMQ High Availability Router are automatically fault-tolerant. The Stream scripts and configuration as well as any data changes are automatically replicated to the STANDBY instance and, in case of a failover, Streams continue where they left off, without duplicates or missed messages.
Streams have a restart policy that makes them fault-tolerant to internal exceptions such as the unavailability of resources.
Stream dependencies
Streams can be configured to depend on other Streams to ensure consistent start/stop semantics. A dependency graph is automatically created. When a Stream is started, all Streams it depends on are started first; when a Stream is stopped, all Streams that depend on it are stopped first. This is done recursively through the whole dependency graph.
<stream name="orderprobe" enabled="true" script-file="repository:/dashboard/propertyprobe.js">
<dependencies>
<dependency name="dashboard.warehouse.store"/>
<dependency name="swiftmq.system.streamregistry"/>
</dependencies>
<parameters>
<parameter name="destination-name" value="stream_router1_dashboard_warehouse_accounting"/>
<parameter name="destination-type" value="topic"/>
<parameter name="props" value="total nitems"/>
</parameters>
</stream>
Dependency configuration allows building large Apps as a single deployable unit that may contain hundreds of individual Streams, all automatically started and stopped in a consistent manner.
Remotely deployable
Each SwiftMQ and High Availability Router provides a local Stream Repository which stores Stream scripts (the sources) in named repositories. These repositories are backed by a persistent queue so in case of a failover of a HA Router these Stream scripts are automatically available at the other HA instance.
DevOps use a simple repo client to upload Stream scripts to any Router in a Router Network. When a Stream is started, the source is fetched from the local repository, so new Streams or changes are immediately available at remote Routers without the need to log in to a remote host.
./repojms smqp://localhost:4001 ConnectionFactory admin secret router1 \
add warehouse /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams js
accounting.js added to repository warehouse
customer.js added to repository warehouse
ordermonitor.js added to repository warehouse
producer.js added to repository warehouse
reordermonitor.js added to repository warehouse
shipper.js added to repository warehouse
shippingmonitor.js added to repository warehouse
store.js added to repository warehouse
Scalable over Router Networks
Streams communicate via Messages over queues and/or topics. They can be deployed on a single Router or on many different nodes of a Router Network. SwiftMQ's integrated Federated Router Network provides transparent Message routing so that Streams on one node can communicate with Streams on other nodes transparently.
With this, it is possible to split time-consuming tasks to many different nodes or to process these tasks at the data source (edge) and send results to other nodes for further processing.
Realtime Streaming Analytics
A Stream can intercept any queue by a wiretap and any topic as a subscriber as well as any event from the Router's Management Tree. It is possible to intercept the Message flow of any JMS/AMQP application that communicates over SwiftMQ. Neither the application nor the Messages they exchange have to be modified.
// Create a wiretap input on the orderpos queue.
stream.create().input(orderPosQueue).wiretap("w1").onInput(function (input) {
// We need ITEMNO and QTY as a property
input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("itemstats").add(input.current());
});
Intercepted Messages are stored in Memories that use Message Properties to store, query, and index them. Therefore, a Message can be seen and processed like a row in a database table:
The data can be analyzed by using the real-time streaming analytics capabilities of the Streaming Interface which are on par with other popular real-time streaming analytics libraries. It provides sliding, tumbling, and session windows, event time processing, and late arrival handling, to name a few features.
// Callback function for onRetire to print the statistics
function printStatistic(retired){
// Generate statistics in the streams's log file
stream.log().info("Item Statistics:");
// Get and log the statistic data (items with summarized quantity in descending order)
retired.group("ITEMNO").sum("QTY").sort("QTY").reverse().forEach(function(message){
stream.log().info(message.property("ITEMNO").value().toString() + " = " +
message.property("QTY").value().toString());
});
}
// Create itemstats Memory
stream.create().memory("itemstats")
.heap()
.limit()
.time()
.tumbling()
.seconds(10)
.onRetire(printStatistic);
Stream Interface
Code Completion
Since: 10.2.0
We provide TypeScript stubs that can be installed in IDEs like IntelliJ IDEA so that the IDE supports JavaScript code completion for the Streams Interface.
First, download it here:
TypeScript stubs for release 12.0.0 and later.
Next, install it in your IDE (for example, IntelliJ IDEA), and the IDE provides code completion for the Stream Interface.
Parameters
Since: 10.1.0
A SwiftMQ Streams script can have an unlimited number of parameters. These are defined in the script definition:
When a script is started, a script-global variable parameters
is passed that is only visible to this particular script (engine scope).
Get
To get the value of a parameter, use the get method. It returns the value or null if the parameter is not defined:
var inputQueue = parameters.get("input-queue");
Require
This method defines a required (mandatory) parameter. It throws an exception that leads to a stop of this script if the parameter is not defined:
var inputQueue = parameters.require("input-queue");
Optional
This method defines an optional parameter and provides a default value which is returned if the parameter is not defined:
var mailHost = parameters.optional("mail-host", "localhost");
Stream
Variable 'stream'
This is the entry point for scripts to interface with the stream processor. It is provided as a global script variable stream
that is only visible to this particular script (engine scope). Everything can be declared out of this stream
variable.
stream.create().input(inputTopic).topic();
Log
A Stream provides a Log where scripts can log informational, warning, and error messages. A separate log file is maintained by the Log Swiftlet (see Log Swiftlet, log sink directory) for each Stream. If there is an exception during stream processing, the stream processor will log the exception with the stack trace to the log file.
stream.log().info("this is an information");
stream.log().warning("this is a warning");
stream.log().error("this is an error");
CLI
Since: 10.1.0
A Stream provides a CLI interface to the local Router's management tree to execute any CLI command, e.g. changing contexts, creating queues, rebooting the Router. Before a command can be executed, a cc
must be executed to change to the respective CLI context. The root context (top of the management tree) is /
and is the default when a CLI command is executed the first time.
stream.cli().execute("cc /sys$queuemanager/queues")
.execute("new myqueue cache-size 1000")
.execute("save");
Whenever a CLI command fails, an exception will be thrown which will either lead to a stop of the Stream or enter the Stream's restart policy (see chapter below). But sometimes it is adequate to tolerate and thus ignore exceptions for particular commands, e.g. if a queue should be deleted but the queue is not defined. In that case, just switch exceptions off for a particular command:
stream.cli().execute("cc /sys$queuemanager/queues")
.exceptionOff()
.execute("delete myqueue")
.exceptionOn()
.execute("new myqueue cache-size 1000")
.execute("save");
Creating Temporary Queues
Since: 10.1.0
If a Stream acts as a service to other clients and receives request Messages on an Input, it is not always necessary to use a regular queue if the Messages are non-persistent. In this case, a Stream can create a temporary queue and register it in JNDI so that JMS clients can perform a JNDI lookup and send Messages to this queue:
stream.create().tempQueue("requestQueue").registerJNDI();
A temporary queue can be explicitly deleted when it is not needed anymore:
stream.tempQueue("requestQueue").delete();
Otherwise, it will be automatically deleted when the Stream stops.
Lookup Temporary Queues from JNDI
Since: 10.2.0
When multiple Streams communicate, one Stream uses a TempQueue as its Input and registers it in JNDI under a name. So that other Streams can send to this queue, they perform a JNDI lookup and create an Output:
// One Stream registers its Input queue in JNDI
stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();
// The other stream performs a lookup and creates an Output
stream.create().output("requestQueue").forAddress(stream.lookupJNDI("requestQueue"));
onMessage Callback
Messages flow from Inputs into the Stream. To process these Messages, an optional onMessage
callback can be registered which is called from the Stream Processor for each Message. Registering an onMessage
callback is not always necessary; Messages can also be processed in the respective onInput
callbacks.
The onMessage callback is a function without parameters:
stream.onMessage(function() {
// Do something
});
During the onMessage callback "stream.current()" returns the current message that is being processed.
stream.onMessage(function() {
stream.memory("all").add(stream.current());
});
If a Message is handled in onInput and must not flow through the onMessage callback, disable it during onInput:
input.current().onMessageEnabled(false);
onException Callback
Since: 10.1.0
To get informed about exceptions that may occur during Stream processing, an onException
callback can be registered at the Stream. It is a function with 2 String parameters. The first is the exception, the second is the formatted stack trace:
stream.onException(function(lastException, lastStackTrace){
print("Last Exception: "+lastException);
print("Last Stacktrace: "+lastStackTrace);
});
It is not necessary to log these exceptions to the Stream's log file as this is done by the Stream Processor anyway.
The onException
callback can be used to inform an administrator by eMail:
stream.onException(function(lastException, lastStackTrace){
stream.mailserver("localhost")
.email()
.from(stream.name()+"@company.com")
.to("admin@company.com")
.subject("Stream "+stream.name()+" got an Exception: "+lastException)
.body(lastStackTrace)
.send();
});
Automatic Stream Restart
Since: 10.1.0
When an exception occurs anywhere during Stream processing, the onException
callback is called (if registered), the current transaction is rolled back, the exception is logged into the Stream's log file and the Stream is stopped.
There are 2 properties for each Stream to configure an automatic restart, e.g. if a database connection was temporarily down. These are Restart Delay and Max Restarts. Restart Delay specifies the time in milliseconds between restarts; Max Restarts specifies the maximum number of restart attempts.
The current restart count can be acquired from the Stream with:
stream.restartCount();
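As a sketch, the restart count can be combined with the onException callback described above, e.g. to alert an administrator only on the first failure. This assumes a Mail Server localhost has been created (see the Mail Server section) and that restartCount() is still 0 when the first exception occurs; the addresses are placeholders:
stream.onException(function (lastException, lastStackTrace) {
    // Alert only on the first failure (assumes restartCount() is still 0 here)
    if (stream.restartCount() == 0) {
        stream.mailserver("localhost")
            .email()
            .from(stream.name() + "@company.com")
            .to("admin@company.com")
            .subject("Stream " + stream.name() + " failed, restart pending: " + lastException)
            .body(lastStackTrace)
            .send();
    }
});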
onStart Callback
Since 11.0.0
A Stream can set an onStart
callback which is called when the Stream has been started. The intention of this callback is to let others know that the Stream is now available. This can be accomplished by sending a message to a registry to register or sending an eMail to someone.
The onStart
callback is a function without parameters:
stream.onStart(function() {
stream.output("registry")
.send(stream.create()
.message().textMessage()
.body(stream.name()+": Available"));
});
onStop Callback
Since 10.2.0
A Stream can set an onStop
callback which is called just before all Stream resources are closed. The intention of this callback is to let others know that the Stream is now unavailable. This can be accomplished by sending a message to a registry to unregister or sending an eMail to someone.
The onStop
callback is a function without parameters:
stream.onStop(function() {
stream.output("registry")
.send(stream.create()
.message().textMessage()
.body(stream.name()+": Unavailable"));
});
Function Callback
Since 11.2.0
If a Stream uses external libraries and calls an asynchronous method where a result is passed through a callback, the Stream must enqueue the processing into the Stream's event queue. For this, the Stream registers a Function Callback. The function is then called when the event arrives at the Stream.
To register a Function Callback, the function itself is registered and an optional context object (which can be any Object) is passed. The context is then passed as a parameter to the registered function:
someExternalLib.someAsyncMethod(function(result){
// Enqueue it into the Stream's event queue
stream.executeCallback(function(context){
// context parameter is the result parameter passed to executeCallback
// process it
}, result);
});
Messages
SwiftMQ Streams works exclusively with JMS messages. However, they are not very convenient to use and access, so we have created a facade that wraps JMS messages and provides a more elegant way to access all their parts.
When a message arrives on an Input, it is already wrapped with the facade. To create a new message (e.g. to send it via Output), use stream.create().message()
which returns a MessageBuilder that has various methods to create the required message type.
This creates a persistent bytes message and puts a byte array into the body (since 11.2.0):
stream.create()
.message()
.bytesMessage()
.persistent()
.body([1, 2, 3]);
This creates a persistent text message and puts a text into the body:
stream.create()
.message()
.textMessage()
.persistent()
.body(msg);
This creates a nonpersistent stream message and writes 2 values to the body:
stream.create()
.message()
.streamMessage()
.nonpersistent()
.body()
.writeInt(100)
.writeString("Hello");
This creates a persistent map message, sets 2 properties, the correlation id and sets 2 values in the body:
stream.create()
.message()
.mapMessage()
.persistent()
.correlationId(otherMsg.messageId())
.property("ACCOUNTID").set(1234)
.property("ORDERID").set(4331)
.body()
.set("QTY", 100)
.set("COUNTRY", "DE");
Accessing a message property returns a PropertyValue (.value()) which has several conversion methods (e.g. .toDouble()); the method toObject() just returns the value in the Java type in which it is stored (e.g. java.lang.Double):
var val = stream.current().property("temp").value().toInteger();
Property Set
Since: 10.1.0
To get all Properties of a Message, use properties()
which returns a PropertySet. This has various methods to select Properties by their names like startsWith()
, endsWith()
, select(regex)
. Each of these methods returns a new PropertySet. Once you are happy with the selected set, use forEach to iterate over the result:
// Returns all Properties not starting with an underscore and prints it
input.current().properties().notStartsWith("_").forEach(function(property){
print(property.name()+"="+property.value().toString());
});
Determining the Message Type
Since: 11.2.0
Sometimes it is required to check the message type of an arriving Message to process it. Each Message facade has a type()
method that returns the Message type: "message", "bytes", "text", "stream" or "map", respectively.
var msg = input.current();
switch (msg.type()){
case "bytes":
// BytesMessage
break;
case "text":
// TextMessage
break;
case "stream":
// StreamMessage
break;
case "map":
// MapMessage
break;
case "message":
// Message
break;
}
Inputs
Inputs are created out of the stream
variable and can consume from queues and/or topics, including durable subscribers, and from the Router's Management Tree. The number of Inputs created for a Stream is not limited.
Each Input may have an optional onInput callback which is called from the stream processor when a message is received on this Input. It can be used to store the message in a Memory or to convert or enrich it. The callback is a function with a single parameter which is the Input itself. input.current()
returns the current message of this Input.
stream.create().input(orderHeadQueue).queue().onInput(function (input) {
stream.memory("orderhead").add(input.current());
});
stream.create().input(orderPosQueue).queue().onInput(function (input) {
stream.memory("orderpos").add(input.current());
// We need the QTY as a property
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("itemstats").add(input.current());
});
Queue Input (Regular Queue)
A Queue Input consumes messages from a queue.
stream.create().input(orderHeadQueue).queue().onInput(function (input) {
stream.memory("orderhead").add(input.current());
});
Queue Input (Temporary Queue)
Since: 10.1.0
If a Stream acts as a service to other clients and receives request Messages on an Input, it is not always necessary to use a regular queue if the Messages are non-persistent. In this case, a Stream can create a temporary queue, register it in JNDI, and create a Queue Input so that JMS clients can perform a JNDI lookup and send Messages to this queue.
stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();
Queue Input (Wire Tap)
Since 10.2.0
You can wiretap any queue by adding a QueueWireTapInput to it. This is fully dynamic and doesn't require any configuration changes. Once a WireTap has been added to a queue, a copy of each message sent to this queue is also delivered to the WireTap.
// Create a wiretap input on the orderpos queue.
stream.create().input(orderPosQueue).wiretap("w1").onInput(function (input) {
// We need ITEMNO and QTY as a property
input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("itemstats").add(input.current());
});
A WireTap is identified by its name and has subscribers. If a Stream registers a WireTap under a name and a second Stream registers it under the same name, a single WireTap with 2 subscribers is created and Message copies are distributed round-robin to the subscribers. If a Stream registers it under a different name, another WireTap with a single subscriber is created and will receive its distinct Message copy.
A WireTap processes messages outside a transaction via memory buffering. If a subscriber is too slow and the memory buffer is full, messages may not be delivered to that subscriber. So a WireTap is only useful if no transactional consistency is required and message loss can be tolerated, e.g. in statistics scenarios. For higher requirements, use a regular Queue Input.
Memory buffer behavior is specified by the buffer size of the internal memory of the QueueWireTapInput and the maximum blocking time in case the memory is full. The default buffer size is 10 Messages; the default maximum block time is 500 milliseconds.
// Create a wiretap input with 100 messages buffer and no block time
stream.create().input(orderPosQueue).wiretap("w1").bufferSize(100).maxBlockTime(0).onInput(function (input) {
// We need ITEMNO and QTY as a property
input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("itemstats").add(input.current());
});
Non-durable Topic Input
A non-durable Topic Input consumes messages from a topic. It creates a non-durable subscriber on it.
stream.create().input(inputTopic).topic().onInput(function (input) {
stream.memory("mymemory").add(input.current());
});
Durable Topic Input
A durable Topic Input consumes messages from a topic. It creates or reuses an existing durable subscriber on it. A client id and a durable name must be provided to identify the durable subscriber.
stream.create().input(inputTopic).topic().durable()
.clientId("mycid")
.durableName("myname")
.onInput(function (input) {
stream.memory("mymemory").add(input.current());
});
Message Selectors
To limit the number of messages that arrive on an Input, a JMS message selector can be supplied on each Input type.
stream.create().input(orderHeadQueue).queue()
.selector("ORDERHEADID between 1 and 10000")
.onInput(function (input) {
stream.memory("orderhead").add(input.current());
});
Creating multiple Inputs on the same Destination
Since 10.2.0
Sometimes it might be necessary to create multiple Inputs on the same destination, e.g. to have multiple subscribers on the same topic, each with a different message selector. To achieve this, create the Inputs under different names and use destinationName(name)
to set the destination name.
// Input 1
stream.create().input("input1").topic().destinationName(inputTopic)
.selector("ORDERHEADID between 1 and 10000")
.onInput(function (input) {
// Do something
});
// Input 2
stream.create().input("input2").topic().destinationName(inputTopic)
.selector("ORDERHEADID between 100001 and 20000")
.onInput(function (input) {
// Do something
});
Management Input
Since: 10.1.0
A Management Input consumes events from the local Router's management tree and converts them into Messages. The name of the Input is the CLI context and can refer to an EntityList, an Entity, or a Property.
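A minimal sketch that observes the JMS connection usage list; the CLI context and the onAdd callback also appear in the Timer example further below:
// Log whenever a new entity appears in the JMS usage list, i.e. a client connects
stream.create().input("sys$jms/usage").management().onAdd(function (input) {
    stream.log().info("New JMS connection detected");
});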
Starting an Input dynamically
Since: 10.1.0
Inputs are usually created outside of callbacks. The Stream starts these Inputs automatically. If an Input is created inside a callback, e.g. as a result of an event from another Input or on a Timer event, it must be started explicitly:
stream.create().input("sys$queuemanager/usage/testqueue/messagecount").management().onChange(function(input){
if (input.current().property("messagecount").value().toInteger() > 0 && stream.input("testqueue") == null)
stream.create().input("testqueue").queue().onInput(function(input){
print("onInput (testqueue): "+input.current().body());
}).start();
else if (stream.input("testqueue") != null)
stream.input("testqueue").close();
});
Disabling the onMessage Callback
Since: 10.1.0
If a Message is handled in onInput and must not flow through the onMessage
callback, disable it during onInput
:
input.current().onMessageEnabled(false);
Closing an Input dynamically
Since: 10.1.0
An Input, like every other component, is automatically closed when the Stream is stopped. An Input can also be closed explicitly anytime with:
input.close();
Timer
Timers can be created for any purpose (mostly it would be to call checkLimit()
on Memories or Memory Groups). A Timer must have a callback that is executed from the stream processor when this Timer event occurs. The callback is a function with a single parameter which is the Timer itself.
stream.create().timer("monitor").interval().minutes(2).seconds(10).onTimer(function (timer) {
stream.memory("all").checkLimit();
});
Interval Timer
An Interval Timer executes the onTimer
callback in an interval that can be specified in days, hours, minutes, seconds, and milliseconds. Each call of these methods adds to the interval. For example, a Timer that should be executed every 90 minutes can be specified as:
stream.create().timer("monitor").interval().hours(1).minutes(30).onTimer(function (timer) {
// Do something
});
At Timer
Since 10.2.0
An At Timer executes the onTimer
callback at specific absolute times. Multiple times can be specified. The Timer is recurring. That is, when all times have been executed, it continues the next day. The time format is either HH:MM
or HH:MM:SS
.
The following Timer fires at 10:00, 11:35:45, and 17:30 each day:
stream.create().timer("trigger").at().time("10:00").time("11:35:45").time("17:30").onTimer(function (timer) {
// Do something
});
Next Timer
Since 10.2.0
A Next Timer is a one-shot Timer that executes the onTimer
callback at the beginning of a specific time: begin of the next second, minute, hour, or day.
stream.create().timer("daystarter").next().beginOfDay().onTimer(function (timer) {
// Do something
});
Starting a Timer dynamically
Since: 10.1.0
Timers are usually created outside of callbacks. The Stream starts these Timers automatically. If a Timer is created inside a callback, e.g. as a result of an event from an Input or on a Timer event, it must be started explicitly:
// Starts a Timer each time a new JMS client connects
var x = 0;
stream.create().input("sys$jms/usage").management().onAdd(function(input){
if (stream.timer("counter") == null)
{
stream.create().timer("counter").interval().seconds(1).onTimer(function(timer){
print("onTimer, x="+(++x));
if (x == 10)
{
x = 0;
timer.close();
}
}).start();
}
});
Resetting/Reconfiguring a Timer
Since: 10.1.0
To reconfigure a Timer with a new interval without recreating it, reset it, set the new value, and then call reconfigure:
// Create a Management Input that receives changes on my own statistic-interval-sec parameter
// and reconfigures the Timer
stream.create().input("sys$streams/streams/"+stream.name()+"/parameters/statistic-interval-sec").management().onChange(function (input){
// Get the new value
var secs = input.current().property("value").value().toInteger();
// Reset and reconfigure the Timer with the new value
stream.timer("stats").reset().seconds(secs).reconfigure();
// Recreate the itemstats Memory
stream.memory("itemstats").close();
stream.create().memory("itemstats").heap().limit().time().sliding().seconds(secs);
// Log it into the Logfile
stream.log().info("Statistic interval reconfigured to "+secs+" seconds");
// Avoid that this Management Message is passed to onMessage
input.current().onMessageEnabled(false);
});
Closing a Timer dynamically
Since: 10.1.0
A Timer, like every other component, is automatically closed when the Stream is stopped. A Timer can also be closed explicitly anytime with:
timer.close();
Memories
Memories are the heart of SwiftMQ Streams. They store messages, adjust automatically, and provide many functions to analyze the data.
All messages in a Memory must have the same message properties if these properties are used in queries or to create an index. That way a message can be seen and processed like a row in a database table.
A Memory works exclusively on message properties, so data can only be analyzed if it is part of the message properties. If some data is in the message body but not in the properties, use onInput
to correct that:
stream.create().input(orderPosQueue).queue().onInput(function (input) {
stream.memory("orderpos").add(input.current());
// We need the QTY as a property
input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
stream.memory("itemstats").add(input.current());
});
Create
Memories are created with stream.create().memory(name)
which returns a MemoryBuilder. This builder can create a HeapMemory, a QueueMemory or a TempQueueMemory.
Add
Adding a message to one or more Memories takes either place in onInput
or onMessage
from input.current()
or stream.current()
, respectively.
Adding in onInput
:
stream.memory("orderpos").add(input.current());
Adding in onMessage
:
stream.memory("orderpos").add(stream.current());
Access/Query
A Memory maintains the insertion order of the messages. The first message is the oldest. Messages can be accessed with at(index) or the convenience methods first() and last(). size() returns the number of messages held in the Memory. forEach(callback) is a convenience method that iterates over the whole Memory.
//Equivalent
var msg1 = stream.memory("critical").at(0);
var msg2 = stream.memory("critical").first();
Messages can also be selected by a JMS message selector. This is a sequential operation that compares all messages against the selector and is therefore only recommended for small Memories. It returns a new Memory that contains the result set, which can then be used in further queries, and so on.
if (stream.memory("warning").select("temp > 100").size() > 2) {
// do something
}
All Messages of a Memory can also be accessed by the forEach
method:
grouped.forEach(function (message) {
stream.log().info(message.property("ITEMNO").value().toString() + " = " +
message.property("QTY").value().toString());
});
Large Memories should be indexed. To access a Memory through an index, use .index(name).get(propvalue)
which returns a new Memory with the result.
// Selects all order positions of a particular orderhead id
var orderPosMem = stream.memory("orderpos").index("ORDERHEADID").get(orderHeadId);
Note: All Memories returned from a query are of type HeapMemory.
Remove
Removal of messages from a Memory takes place automatically if the Memory has a limit declared. Manual removal can take place with remove(index)
or remove(selector)
. A Memory stores the insertion time of a message. To remove messages older than a specific time, use removeOlderThan(time)
.
Messages can also be removed via any associated index.
// Remove Message at index 10
stream.memory("mymem").remove(10);
// Remove all Messages with temp > 100
stream.memory("warning").remove("temp > 100")
// Remove all Messages older than 30 secs
stream.memory("mymem").removeOlderThan(java.lang.System.currentTimeMillis()-30000);
// Remove the messages with this orderhead id from orderhead Memory
stream.memory("orderhead").index("ORDERHEADID").remove(orderHeadId);
Note: This method returns the Memory from which the messages were removed (this).
Values
To get all values of a particular property of all messages of a Memory (which may be a result of a query), use values(propName)
. It returns a java.util.List
of values.
// Logs all sorted temps
stream.log().info(stream.memory("all").sort("temp").values("temp"));
Aggregate
A Memory provides the aggregate functions min, max, sum, and average. They all work on a specific property. min and max return a message, while sum and average require the property to be of a numeric type and return a double value.
stream.log().info("AVERAGE TEMP LAST 10 SECS=" +
mem.average("temp") +
", MIN=" +
mem.min("temp").property("temp").value().toObject() +
", MAX=" +
mem.max("temp").property("temp").value().toObject());
Series
A Memory provides functions to detect a series of values of a property. .ascendingSeries()
returns true if all values of this particular property are in ascending order, .descendingSeries()
returns true if they are in descending order.
if (stream.memory("critical").ascendingSeries("temp")) {
// Do something
}
Group
To arrange identical values of a property into a group, use .group(propName)
. It returns a GroupResult which contains a set of Memories, each containing the messages for a unique property value. This set can be acquired with .result()
which returns an array of Memory (Memory[]). GroupResult provides the aggregate functions min, max, average, and sum, which can be applied to the GroupResult and return a new Memory.
var mem = stream.memory("itemstats").group("ITEMNO").sum("QTY");
Sort
To sort a Memory in ascending order over a specific property, use .sort(propName)
. It returns a new Memory with the sorted messages. To get the result in descending order, apply .reverse()
on the result which returns another new Memory in descending order.
var mem = stream.memory("itemstats").group("ITEMNO").sum("QTY").sort("QTY").reverse();
Join
Since: 10.1.0
Performs an inner join with the right Memory over the named join Property, which must exist in the Messages of both Memories. The result is a Memory that contains a Message for each Message on the left side (this Memory) that matches a Message on the right side. Each result Message contains the left Message enriched with all Properties of the matching right Message.
If the join Property of the right Memory is not indexed, an Index will be created automatically before performing the join operation.
This example joins an order head memory with an order position memory over join property ORDERHEADID
. The result is a Memory that contains all order positions of that ORDERHEADID
.
// Join orderhead with the orderpos memory
var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID");
If the join Property names differ between the two Memories, use this method:
// Join orderhead with the orderpos memory
var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID", "orderheadid");
Closing a Memory dynamically
Since: 10.1.0
A Memory, like every other component, is automatically closed when the Stream is stopped. A Memory can also be closed explicitly anytime with:
memory.close();
Example:
// Recreate the itemstats Memory
stream.memory("itemstats").close();
stream.create().memory("itemstats").heap().limit().time().sliding().seconds(secs);
Note that all non-permanent Memories (Heap/TempQueue) will lose their content on close while a permanent Memory (Queue) keeps its Messages in the queue.
Windowing
Since: 10.1.0
Windowing is a technique to divide a potentially infinite Message stream into finite slices to process it. Message streams are stored in Memories and this is where windows are defined.
SwiftMQ Streams supports 2 types of windows, tumbling and sliding; both can be limited by count or by time. It further supports session windows via Memory Groups.
When a window reaches its limit, Messages will retire and can be processed by an onRetire
callback that can be registered at the particular Memory.
Tumbling Windows
A tumbling window retires all Messages of that window at once when the limit is reached. For a count-based window, this is when the limit's count is reached; for a time-based window, this is when the oldest Message is at least as old as the limit's time.
This is useful to process Messages in batches, e.g. every 1000 Messages (count-based) or every 10 seconds (time-based).
Tumbling windows are usually processed in the onRetire callback (see the section below on this page).
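A minimal sketch of a count-based tumbling window; the chain .count(1000).tumbling() is inferred from the count and time limit examples elsewhere in this guide and is not confirmed by the product documentation:
// Retire and process Messages in batches of 1000
stream.create().memory("batch")
    .heap()
    .limit()
    .count(1000)
    .tumbling()
    .onRetire(function (retired) {
        stream.log().info("Batch of " + retired.size() + " Messages retired");
    });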
Sliding Windows
A sliding window retires only those Messages that drop out of the window when the limit is reached. It does not create a new window; instead, the window slides along the Message stream.
If the window is count-based with a limit of 1000 Messages, the oldest Message will retire so the Memory contains exactly the last 1000 Messages added to it. If the window is time-based with a limit of 10 seconds, whenever a Message is added all Messages older than 10 seconds will retire, so the Memory contains exactly the Messages added within the last 10 seconds.
With sliding windows, the (usually single) Message that retires is not of interest, but the Messages that remain in the Memory (the current window) are. So with a sliding window, the processing is usually done in onMessage rather than in the onRetire callback.
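A minimal sketch of a time-based sliding window processed in onMessage, using only calls shown elsewhere in this guide; the queue name and the numeric temp property are placeholders:
// Keep exactly the Messages of the last 30 seconds
stream.create().memory("last30s").heap().limit().time().sliding().seconds(30);
stream.create().input("temperatures").queue().onInput(function (input) {
    stream.memory("last30s").add(input.current());
});
// Process the current window whenever a Message arrives
stream.onMessage(function () {
    stream.log().info("Average temp last 30 secs: " + stream.memory("last30s").average("temp"));
});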
Limits
Windows are defined by attaching Limits to a Memory.
onRetire Callback
When a Limit is reached, Messages will retire. To process these Messages, an onRetire
callback needs to be registered at the Memory where the processing takes place:
// Callback function for onRetire to print the statistics
function printStatistic(retired){
// Generate statistics in the streams's log file
stream.log().info("Item Statistics:");
// Get and log the statistic data (items with summarized quantity in descending order)
retired.group("ITEMNO").sum("QTY").sort("QTY").reverse().forEach(function(message){
stream.log().info(message.property("ITEMNO").value().toString() + " = " +
message.property("QTY").value().toString());
});
}
// Create itemstats Memory
stream.create().memory("itemstats")
.heap()
.limit()
.time()
.tumbling()
.seconds(10)
.onRetire(printStatistic);
The callback can be registered either at the Memory itself or on one of the attached Limits. If the callback is registered at the Limit, it will be automatically registered at the Memory. So it is always a single callback, no matter where it will be registered.
The callback has one parameter - retired - where a new HeapMemory is passed that contains all retired Messages.
Event Time Processing
When a Message is added to a Memory, the Memory uses the current time as the store time and all time-based limits work on this time. This is called processing time.
However, if Messages should be processed at the time at which the event has occurred, it is called event time processing. In that case, the Message must contain a Property of type Long
with the actual event time. If this Property is not set (e.g. it is part of the Message body), use onInput
to extract and set it accordingly.
The event time Property name needs to be set at the Memory with:
stream.create().memory("mem").orderBy("evttime");
The Memory then orders the Messages by the values of the orderBy
Property. This also applies if Messages arrive out of order, but only within the current window. The first Message in the Memory is the oldest, the last Message is the youngest, based on event time.
Session Windows (Memory Groups)
A session window is a window that groups Messages from the stream by session.
The Message stream may consist of activities of different users, customers, hosts, etc. A Property of the Messages in the stream contains the value by which we want to group these Messages. The result is one Memory per distinct value, e.g. one Memory per distinct user if we group over a user id.
SwiftMQ Streams provides Memory Groups for this purpose. A Memory Group defines a group Property. Once a Message is added to a Memory Group, it creates a new Memory for each distinct value of this Property and adds the Message to that Memory. So in the end, the stream of Messages is split into session windows (Memories). Further windows (tumbling, sliding) and onRetire
callbacks can be defined on these Memories to process the Messages.
A Memory Group has one mandatory callback (MemoryCreateCallback
) which is responsible for returning a new Memory when it is called:
stream.create().memoryGroup("users", "userid").onCreate(function (key) {
// A new value of the group is received and we create a Memory for it
// with a 1 min tumbling time window
stream.create().memory(key).heap().limit().time().tumbling().minutes(1).onRetire(function (retired) {
// Do something with this window for this user id
});
// Return the new memory
return stream.memory(key);
});
New Messages are simply added to the Memory Group in onInput, and the Memory Group takes care of creating a Memory for each distinct user id:
stream.create().input("useractivities").queue().onInput(function (input) {
stream.memoryGroup("users").add(input.current());
});
Timer and Check Limit
Limits of a Memory are automatically checked when a new Message is added to that Memory. So if you have a Count Limit defined, you don't need to do anything.
However, a Time Limit and a Time Unit Change Limit require that a Timer calls Memory.checkLimit()
in an interval that is shorter than that of the Time Limit. For example, for the above itemstats
Memory a Timer needs to be declared as follows:
// Create a timer that checks the limit of the itemstats memory every second
stream.create().timer("stats").interval().seconds(1).onTimer(function (timer) {
stream.memory("itemstats").checkLimit();
});
Memory.checkLimit()
also checks for the Inactivity Timeout.
Memory Groups require a Timer to call MemoryGroup.checkLimit()
to remove expired Memories from the Memory Group if a Group Inactivity Timeout is defined. MemoryGroup.checkLimit()
also calls Memory.checkLimit()
on all Memories of this Memory Group automatically:
// Create a timer that calls checkLimit on the memory group each minute
stream.create().timer("queues").interval().minutes(1).onTimer(function (timer) {
stream.memoryGroup("queues").checkLimit();
});
Outputs
Output messages can be sent from the onTimer
and/or onMessage
callback to queues or topics.
An Output is created from the stream:
// Queue Output
stream.create().output(outputQueue).queue();
// Topic Output
stream.create().output(outputTopic).topic();
Queue Output
Sends a message to a queue.
stream.output(outputQueue)
.send(stream.create()
.message()
.textMessage()
.persistent()
.body(msg));
Topic Output
Sends a message to a topic.
stream.output(outputTopic)
.send(stream.create()
.message()
.textMessage()
.persistent()
.body(msg));
Short-lived Outputs, Request/Reply
In a request/reply scenario where a Stream works as a service that sends replies, the received requests usually contain a replyTo
address with a temporary queue name. In this case, it is not possible to pre-create the Outputs. Instead, this must be done in the onMessage callback by specifying null
as the name of the Output. This marks the Output as short-lived and avoids registering it in the Management Tree's live Usage data.
The Output must be closed after the send has been performed.
To create the Output for a replyTo
address, use the forAddress(..)
method.
// A simple reply service
stream.onMessage(function () {
stream.create()
.output(null)
.forAddress(stream.current().replyTo())
.send(stream.create()
.message()
.textMessage()
.correlationId(stream.current().messageId())
.body("RE: "+stream.current().body()))
.close();
});
Exceptions are suppressed when sending to a temporary queue to avoid stopping the Stream in case the temporary queue does not exist anymore (since 10.2.0).
Purging unused Outputs
Since: 10.2.0
If a Stream sends to many different Outputs, they accumulate until they are closed. It is possible to purge Outputs that have not been used for some time. This is done timer-driven: all Outputs that were not used between 2 timer intervals are closed.
// Create the Timer to purge unused Outputs
stream.create().timer("purger").interval().minutes(purgeInterval).onTimer(function (timer) {
stream.purgeOutputs();
});
Closing an Output dynamically
Since: 10.1.0
An Output, like every other component, is automatically closed when the Stream is stopped. An Output can also be closed explicitly anytime with:
stream.output(outputTopic).close();
JDBC
To enrich messages with data from existing databases or to validate messages, the JDBCLookup facade can be used to query any JDBC database.
Note: Before a JDBCLookup can be used, install the JDBC driver classes. This is done by copying them into the directory kernel/sys_streams and restarting the Router.
A JDBCLookup is created from the stream:
stream.create().jdbcLookup("accounts")
.driver("com.mysql.jdbc.Driver")
.url("jdbc:mysql://localhost:3306/test")
.username("root")
.password("changeme")
.connect();
Any SQL Select can be used to query the database:
var accounts = stream.jdbcLookup("accounts")
.query("select * from accounts");
It returns a HeapMemory which can be used to access the data in the onMessage
callback. The returned Memory contains a Message entry for each selected row. For each column of a row, a message property is created with the same name and the column value converted to the corresponding Java type.
If the query result is not large, it can be saved in a HeapMemory, indexed, and used in the onMessage
callback:
stream.create().jdbcLookup("accounts")
.driver("com.mysql.jdbc.Driver")
.url("jdbc:mysql://localhost:3306/test")
.username("root")
.password("changeme")
.connect();
var accounts = stream.jdbcLookup("accounts")
.query("select * from accounts")
.createIndex("accountid");
// We don't need a JDBC connection anymore so close it to save resources
stream.jdbcLookup("accounts").close();
// Define onMessage
stream.onMessage(function () {
// Do something
var discount = accounts.index("accountid").get(ACCOUNTID)
.first()
.property("discount")
.value().toDouble();
if (discount > 0)
{
// Do something else
}
});
Of course, a JDBCLookup can also be used to run queries directly in onMessage:
// Create the JDBCLookup
stream.create().jdbcLookup("accounts")
.driver("com.mysql.jdbc.Driver")
.url("jdbc:mysql://localhost:3306/test")
.username("root")
.password("changeme")
.connect();
// Define onMessage
stream.onMessage(function () {
// Do something
var discount = stream.jdbcLookup("accounts")
.query("select discount from accounts where accountid = "+ACCOUNTID)
.first()
.property("discount")
.value().toDouble();
if (discount > 0)
{
// Do something else
}
});
Mail Server
To send eMails (e.g. as an alert to someone), a Mail Server must be created. This takes place only once for each different Mail Server. This Mail Server is then used to send eMails.
stream.create()
.mailserver("localhost")
.username("admin@company.com")
.password("secret")
.connect();
To send an eMail, use a predefined MailServer, configure the eMail properties and send it.
stream.mailserver("localhost")
.email()
.from("tempmonitor@swiftmq.com")
.to("admin@company.com")
.bcc("boss@company.com")
.subject("Nuclear Powerplant Monitor - CRITICAL!")
.body("Temp spike detected, last temps=" + stream.memory("critical").values("temp"))
.send();
Closing a Mail Server dynamically
Since: 10.1.0
A Mail Server, like every other component, is automatically closed when the Stream is stopped. A Mail Server can also be closed explicitly anytime with:
stream.mailserver("localhost").close();
Time Support
Since: 10.2.0
The Streams Interface provides a convenience class TimeSupport
with various time methods, e.g. getting the current time, format/parse time, getting seconds, minute, day, week, month, year from a time. This class is accessible from Streams scripts via variable time
.
Examples:
print("Second: "+time.second());
print("Minute: "+time.minute());
print("Minute-10: "+time.minute(time.currentTime(),-10));
print("Minute+10: "+time.minute(time.currentTime(),10));
print("Hour: "+time.hour());
print("Hour-5: "+time.hour(time.currentTime(),-5));
print("Hour+5: "+time.hour(time.currentTime(),5));
print("Day: "+time.day());
print("Day-3: "+time.day(time.currentTime(),-3));
print("Day+10: "+time.day(time.currentTime(),10));
print("Day Display: "+time.dayDisplayName());
print("Day Display-3: "+time.dayDisplayName(time.currentTime(),-3));
print("Day Display+10: "+time.dayDisplayName(time.currentTime(),10));
print("Week: "+time.week());
print("Week-10: "+time.week(time.currentTime(),-10));
print("Week+45: "+time.week(time.currentTime(),45));
print("Month: "+time.month());
print("Month-10: "+time.month(time.currentTime(),-10));
print("Month+16: "+time.month(time.currentTime(),16));
print("Month Display: "+time.monthDisplayName());
print("Month Display-10: "+time.monthDisplayName(time.currentTime(),-10));
print("Month Display+16: "+time.monthDisplayName(time.currentTime(),16));
print("Year: "+time.year());
print("Format: "+time.format(time.currentTime(), "dd/MM/yyyy"));
print("Parse: "+time.parse(time.format(time.currentTime(), "dd/MM/yyyy"), "dd/MM/yyyy"));
Output:
Second: 29
Minute: 10
Minute-10: 0
Minute+10: 20
Hour: 11
Hour-5: 6
Hour+5: 16
Day: 4
Day-3: 1
Day+10: 14
Day Display: Tuesday
Day Display-3: Saturday
Day Display+10: Friday
Week: 14
Week-10: 4
Week+45: 7
Month: 3
Month-10: 5
Month+16: 7
Month Display: April
Month Display-10: June
Month Display+16: August
Year: 2017
Format: 04/04/2017
Parse: 1491256800000
Stream Scripts
General Structure of a Script
The general structure of a script is:
Parse parameters.
Create Memories and define onRetire callbacks.
Create a Mail Server if you want to send eMails.
Create JDBC Lookups if you want to select data from databases.
Create Inputs and define onInput callbacks.
Create Timers and define onTimer callbacks.
Define the onMessage callback.
Note: The following code snippets are from the sample "Nuclear Powerplant Monitor".
Parse Parameters
Any number of parameters can be configured in the Stream configuration. These parameters are passed to the script as the script-global variable parameters (a java.util.Map).
var inputQueue = parameters.require("input-queue");
var mailHost = parameters.optional("mail-host", "localhost");
var mailUser = parameters.require("mail-user");
var mailPassword = parameters.require("mail-password");
var mailFrom = parameters.require("mail-from");
var mailTo = parameters.require("mail-to");
var mailBcc = parameters.require("mail-bcc");
Create Memories
Create all Memories you want to use and define onRetire
callbacks, if necessary.
// Create 3 Memories, one for monitoring (all), one to detect warning, one to detect critical conditions
stream.create().memory("all").heap().limit().time().tumbling().seconds(10).onRetire(function (retired) {
stream.log().info("AVERAGE TEMP LAST 10 SECS=" + retired.average("temp") +
", MIN=" + retired.min("temp").property("temp").value().toObject() +
", MAX=" + retired.max("temp").property("temp").value().toObject());
stream.log().info(retired.values("temp"));
});
stream.create().memory("warning").heap().limit().count(2).sliding();
stream.create().memory("critical").heap().limit().count(4).sliding();
Create Mail Server
If you want to send eMails in your script, you need to create a mail server.
// Create a mail server to send critical mails
stream.create().mailserver(mailHost).username(mailUser).password(mailPassword).connect();
Create Inputs and define onInput Callbacks
Create all Inputs and define onInput
callbacks, if needed.
// Create the Inputs
stream.create().input(inputQueue).queue().onInput(function (input) {
// We need a property "temp" instead of "TEMP"
input.current().property("temp").set(input.current().property("TEMP").value().toInteger());
});
Create Timers and define onTimer Callbacks
Create all Timers and define their onTimer
callbacks.
// Create a timer to trigger the retirement of the "all" memory every 10 secs
stream.create().timer("monitor").interval().seconds(10).onTimer(function (timer) {
stream.memory("all").checkLimit();
});
Define onMessage Callback (deprecated)
Note: An onMessage
callback can be defined to have the whole processing in this callback. However, the best practice to process messages is to use onInput
to add them to Memories, define the appropriate Limits (Windows) on the Memory, and process the Messages in the Memory's onRetire
callback.
Define the onMessage
callback that is invoked when a message is processed.
stream.onMessage(function () {
// Store the message in all 3 memories
stream.memory("all").add(stream.current());
stream.memory("warning").add(stream.current());
stream.memory("critical").add(stream.current());
// Log a warning if the last 2 temps were higher than 100 degrees
if (stream.memory("warning").select("temp > 100").size() == 2)
stream.log().warning("LAST 2 TEMPS > 100!");
// Spike condition is:
// At least 4 messages in memory,
// the first is more than 400 degrees,
// each temp is greater than the temp before (ascending series),
// the last temp is more than 1.5 times higher than the first.
if (stream.memory("critical").size() == 4 &&
stream.memory("critical").first().property("temp").value().toDouble() > 400 &&
stream.memory("critical").ascendingSeries("temp") &&
stream.memory("critical").last().property("temp").value().toDouble() >
stream.memory("critical").first().property("temp").value().toDouble() * 1.5) {
// Log an error
stream.log().error("WE HAVE A SPIKE!!!");
// Send an alert mail
stream.mailserver(mailHost)
.email()
.from(mailFrom)
.to(mailTo)
.bcc(mailBcc)
.subject("Nuclear Powerplant Monitor - CRITICAL!")
.body("Temp spike detected, last temps=" + stream.memory("critical").values("temp"))
.send();
}
});
Dependencies
Since: 11.0.0
If a Stream depends on other Streams to work properly, e.g. a service Stream must be started before requests from other Streams can be sent, dependencies can be configured:
<stream name="partitionprobe" enabled="true" script-file="/opt/swiftmq_1_0_0_dashboard/streams/partitionprobe.js">
<dependencies>
<dependency name="swiftmq.system.streamregistry"/>
</dependencies>
<parameters>
<parameter name="unit" value="GB"/>
</parameters>
</stream>
In the above example, Stream swiftmq.system.streamregistry
will be automatically started (if not running) before Stream partitionprobe
is started. When Stream swiftmq.system.streamregistry
is stopped, Stream partitionprobe
is stopped first.
Multiple dependencies can be configured, each as a fully qualified name domain.package.stream. The Streams Swiftlet automatically builds a dependency graph internally and detects circular references, so only the direct dependencies need to be specified.
Best Practice
Here are some best practices to organize your Streams. We use the warehouse
sample.
1. Keep your Streams in a separate Directory outside the SwiftMQ Distribution
Store all Streams that belong together as a package or application in a separate directory.
2. Upload your Stream Scripts into the Stream Repository
Each SwiftMQ Router provides a Stream repository where you can upload your Stream scripts to a named repository. This is much easier than copying the scripts to each Router's host.
Go to the SwiftMQ Client's script directory and use the repojms
command:
Upload the Stream scripts of your application:
./repojms smqp://localhost:4001 ConnectionFactory admin secret router1 \
add warehouse /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams js
accounting.js added to repository warehouse
customer.js added to repository warehouse
ordermonitor.js added to repository warehouse
producer.js added to repository warehouse
reordermonitor.js added to repository warehouse
shipper.js added to repository warehouse
shippingmonitor.js added to repository warehouse
store.js added to repository warehouse
Also, upload all Stream scripts you depend on (in this example the base Streams of SwiftMQ Dashboard):
./repojms smqp://localhost:4001 ConnectionFactory admin secret router1 \
add dashboard /opt/swiftmq_1_0_0_dashboard/streams js
aggregator.js added to repository dashboard
correlator.js added to repository dashboard
entitylistcountprobe.js added to repository dashboard
history.js added to repository dashboard
inspector.js added to repository dashboard
join.js added to repository dashboard
mgmtinspector.js added to repository dashboard
monitorpanel.js added to repository dashboard
propertyprobe.js added to repository dashboard
propertysumprobe.js added to repository dashboard
thresholdmonitor.js added to repository dashboard
timeline.js added to repository dashboard
top.js added to repository dashboard
Repeat this for all Routers on which you want to use your application (just change the router name in the above commands).
3. Provide an install.cli Script
For a fresh installation of your package, provide an install.cli script that defines all your Streams.
You can use the variable ${routername} instead of a static router name in your script. The variable is automatically substituted with the router name from the last sr CLI command, so you can install your package on any router without touching the CLI script.
Set the script-file property to a repository URL of the form:
repository:/<reponame>/<filename>
Excerpt from the install.cli:
cc /sys$streams/domains
new dashboard
cc /sys$streams/domains/dashboard/packages
new warehouse
cc /sys$streams/domains/dashboard/packages/warehouse/streams
new accounting script-file "repository:/warehouse/accounting.js"
cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting/parameters
new accounting-topic value stream_${routername}_dashboard_warehouse_accounting
new customer-invoice-topic value stream_${routername}_dashboard_warehouse_customer_invoice
cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting/dependencies
new dashboard.warehouse.store
...
Resulting configuration in the routerconfig.xml:
<stream name="accounting" enabled="true" script-file="repository:/warehouse/accounting.js">
<dependencies>
<dependency name="dashboard.warehouse.store"/>
</dependencies>
<parameters>
<parameter name="accounting-topic" value="stream_router1_dashboard_warehouse_accounting"/>
<parameter name="customer-invoice-topic" value="stream_router1_dashboard_warehouse_customer_invoice"/>
</parameters>
</stream>
To invoke install.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:
bigmac$ ./cli
Welcome to SwiftMQ!
Username:
Password:
Trying to connect ... connected
Router 'router1' is available for administration.
Router 'router2' is available for administration.
Type 'help' to get a list of available commands.
> sr router1
router1> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/install.cli
...
router1> sr router2
router2> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/install.cli
...
The above example installs the warehouse dashboard app on router1 and router2.
4. Provide an enable.cli Script
To enable (start) your Streams, provide an enable.cli script:
cc /sys$streams/domains/dashboard/packages/warehouse/streams/store
set enabled true
cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting
set enabled true
cc /sys$streams/domains/dashboard/packages/warehouse/streams/trackingpanel
set enabled true
cc /sys$streams/domains/dashboard/packages/warehouse/streams/ordermonitor
set enabled true
...
To invoke enable.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:
bigmac$ ./cli
Welcome to SwiftMQ!
Username:
Password:
Trying to connect ... connected
Router 'router1' is available for administration.
Router 'router2' is available for administration.
Type 'help' to get a list of available commands.
> sr router1
router1> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/enable.cli
...
router1> sr router2
router2> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/enable.cli
...
Your Streams are now running.
5. Provide a disable.cli Script
To disable (stop) your Streams, provide a disable.cli script:
cc /sys$streams/domains/dashboard/packages/warehouse/streams/store
set enabled false
cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting
set enabled false
cc /sys$streams/domains/dashboard/packages/warehouse/streams/trackingpanel
set enabled false
cc /sys$streams/domains/dashboard/packages/warehouse/streams/ordermonitor
set enabled false
...
To invoke disable.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:
bigmac$ ./cli
Welcome to SwiftMQ!
Username:
Password:
Trying to connect ... connected
Router 'router1' is available for administration.
Router 'router2' is available for administration.
Type 'help' to get a list of available commands.
> sr router1
router1> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/disable.cli
...
router1> sr router2
router2> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/disable.cli
...
Your Streams are now stopped.
6. Provide a remove.cli Script
To completely remove your Streams from a Router, provide a remove.cli script. This script should remove all Streams and all resources that your Streams may have created, such as queues (e.g. persistent stores for Memories).
cc /sys$streams/domains/dashboard/packages
delete warehouse
cc /sys$queuemanager/queues
delete state_dashboard_warehouse_custstat
delete state_dashboard_warehouse_shipstat
delete state_dashboard_warehouse_productstat
delete state_dashboard_warehouse_ordertimeline
delete state_dashboard_warehouse_shipmenttimeline
delete state_dashboard_warehouse_producertimeline
delete state_dashboard_warehouse_trackingpanel
delete streams_dashboard_warehouse_custstat
delete streams_dashboard_warehouse_shipstat
delete streams_dashboard_warehouse_productstat
delete streams_dashboard_warehouse_ordertimeline
delete streams_dashboard_warehouse_ordertimeline_active
delete streams_dashboard_warehouse_shipmenttimeline
delete streams_dashboard_warehouse_shipmenttimeline_active
delete streams_dashboard_warehouse_producertimeline
delete streams_dashboard_warehouse_producertimeline_active
delete streams_dashboard_warehouse_orderprobehistory
delete streams_dashboard_warehouse_orderstatechainhistory
delete streams_dashboard_warehouse_trackingpanel_input
delete streams_dashboard_warehouse_trackingpanel_store
save
To invoke remove.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:
bigmac$ ./cli
Welcome to SwiftMQ!
Username:
Password:
Trying to connect ... connected
Router 'router1' is available for administration.
Router 'router2' is available for administration.
Type 'help' to get a list of available commands.
> sr router1
router1> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/remove.cli
...
router1> sr router2
router2> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/remove.cli
...
The above example removes the warehouse dashboard application including all resources from router1 and router2.
7. Optionally Provide an upgrade.cli Script
If you want to automatically upgrade your package, e.g. when Streams have been added, changed, or removed, provide an upgrade.cli script that does this job but preserves your data. Execute it in the same manner as above.
Advanced Configuration
Adding Streams to existing Queues and Topics
Wire Tapping
Since 10.2.0
You can wiretap any queue by adding a QueueWireTapInput to it. This is fully dynamic and doesn't require any configuration changes. Once a WireTap has been added to a queue, a copy of each message sent to that queue is also sent to the WireTap. There can be multiple WireTaps on a queue; each receives a message copy. So any Stream can register its own WireTap with an optional message selector.
A WireTap can have multiple subscribers (Streams). With this, it is possible to scale up the processing of messages sent to the WireTap. Messages are distributed in a round-robin fashion to the subscribers of the WireTap.
Note: A WireTap processes messages outside a transaction via memory buffering. If a subscriber is too slow and the memory buffer is full, messages may not be delivered to that subscriber. So a WireTap is only useful if no transactional consistency is required and message loss can be tolerated, e.g. in statistics scenarios. For higher requirements, see the next chapter.
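The following is a minimal, hedged sketch of how a Stream script might register such a WireTap via the Stream Interface. The queue name, WireTap name, selector, and message property are made up, and the wiretap(...) and selector(...) builder methods are assumptions about the DSL; please check the Stream Interface reference for the exact calls:
// Register a WireTap named "ordertap" on queue "orderqueue" (names are illustrative).
// Assumption: input(...).wiretap(...) creates a QueueWireTapInput for this Stream.
stream.create()
      .input("orderqueue")
      .wiretap("ordertap")
      .selector("amount > 1000"); // optional message selector

// Copies delivered via the WireTap are then processed like any other input,
// e.g. in the Stream's onMessage callback:
stream.onMessage(function () {
  stream.log().info("WireTap copy, amount=" + stream.current().property("amount").value().toDouble());
});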
Composite Queue, Topic Subscriber
If your data streams are based on topics, it is no problem to add a Stream as another subscriber to that topic. If they are queue-based, convert the queue to a composite queue and bind two new queues to it: one as the original destination queue and one as the Input for your Stream.
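A hedged CLI sketch of such a conversion is shown below. The entity names under the Queue Manager Swiftlet (composite-queues, queue-bindings) are assumptions about its configuration, all queue names are illustrative, and the original physical queue orders is assumed to have been drained and removed before the composite queue of the same name is created:
cc /sys$queuemanager/queues
new orders_app
new orders_stream
cc /sys$queuemanager/composite-queues
new orders
cc /sys$queuemanager/composite-queues/orders/queue-bindings
new orders_app
new orders_stream
save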
Connecting Streams together
Streams can be connected very easily via queues or topics. This is very common when large tasks are split into several smaller tasks, each implemented as a Stream. One Stream outputs its results into a queue where another Stream picks them up and writes its result into another queue, and so on until the final result is produced.
Scaling Streams locally and in Router Networks
Streams are single-threaded. Therefore the throughput of a single Stream is limited and depends highly on the amount of analysis the Stream has to perform. However, the same Stream can run in multiple instances in parallel, consuming from the same queues/topics. This results in more threads and higher throughput.
To run a Stream in multiple instances, all queue names that are used as stores for permanent Memories must be passed as parameters.
Streams can be horizontally scaled on the same Router and, of course, across a Router network.
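As a sketch of the local case, two instances of the same (hypothetical) statistics Stream could be declared side by side, both consuming from the same input queue, with the queue used as store for the permanent Memory passed as a parameter so that each instance gets its own store queue. All names are illustrative:
<stream name="custstat1" enabled="true" script-file="repository:/warehouse/custstat.js">
    <parameters>
        <parameter name="input-queue" value="warehouse_orders"/>
        <parameter name="store-queue" value="streams_warehouse_custstat1"/>
    </parameters>
</stream>
<stream name="custstat2" enabled="true" script-file="repository:/warehouse/custstat.js">
    <parameters>
        <parameter name="input-queue" value="warehouse_orders"/>
        <parameter name="store-queue" value="streams_warehouse_custstat2"/>
    </parameters>
</stream>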
Using Streams with SwiftMQ HA Router
If your Stream consumes persistent messages, stores them in permanent Memories, and sends persistent messages to Outputs, you certainly need transactional consistency in case of a failover.
To guarantee this, enable the Multi-Queue Transaction Global Lock in the Queue Manager Swiftlet. It ensures that consumption, storing, and production in a Stream are logged as one single transaction, so no data is lost or processed twice.
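A hedged CLI sketch follows; the attribute name multi-queue-transaction-global-lock is an assumption about the Queue Manager Swiftlet's configuration and should be verified against its documentation:
cc /sys$queuemanager
set multi-queue-transaction-global-lock true
save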
Managing Streams
To have a clear logical separation between Streams, we have introduced a new structure in SwiftMQ 10.1.0. It consists of domains, packages, and, inside packages, the Stream definitions themselves.
Create a Domain
A domain is the top-level hierarchy of the Stream structure. It is usually a company name. The domain name swiftmq is reserved.
Go to the Streams Swiftlet, select Domains, and click Create new Entity ...
Create a Package
A domain can have an unlimited number of packages. A package is a logical unit to define Streams that belong together.
Go to the newly created domain, expand it, select Packages, and click Create new Entity ...
Create a Stream
Go to the newly created package, expand it, select Streams, and click Create new Entity ...
Property Name | Purpose |
---|---|
Name | A unique name for this script. |
Enabled | Check it to start the script; uncheck it to stop it. |
Restart Delay | Specifies the delay in milliseconds between Stream restarts. -1 disables it. |
Maximum Restarts | Specifies the number of maximum restarts if a Stream throws an exception. -1 disables it. |
Script File | Absolute path and name of the script file. |
Script Language | The name of the language. See the info.log of the Streams Swiftlet for which script engines are available and under which names. For example, if you use Jython, use "python" as the name. |
If the stream has parameters, expand the new stream, select Parameters, click Create new Entity ... and add the parameters.
Start/Stop a Stream
A stream is started when the Enabled property is checked.
In case there is an exception, please have a look at the stream's log file in the Log Sink Directory (see Log Swiftlet).
Note: All queues used by the stream (e.g. input queues, queues for permanent Memories) must be created separately before starting the script.
A stream is stopped when the Enabled property is unchecked.
If you have changed the script code, just disable and enable the script to get the new version.
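For example, the warehouse sample's store Stream could be reloaded via CLI like this:
cc /sys$streams/domains/dashboard/packages/warehouse/streams/store
set enabled false
set enabled true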
Delete a Stream
Go to the stream entity, select it, and click Delete Entity to delete the stream.
Stream Monitoring
After a Stream has been enabled, a new entity is created under the Usage node of the Streams Swiftlet. This entity contains the live usage data of the Stream, such as the last time in microseconds it took in the onMessage callback, the Stream's processing rate in messages per second, and the total number of messages processed by the Stream.
System Streams
Mail
Purpose
The Mail Stream receives TextMessages from an input queue, converts each into an eMail, sets default from, to, cc, bcc, and subject, and sends the eMail to the configured mail server as a plain text mail.
All Monitor Streams have a parameter mailout-queue. If set, they send their states as eMail to the corresponding recipients.
The intention behind the Mail Stream is to concentrate the mail configuration in the Mail Streams instead of repeating it in each Monitor Stream. There can be multiple instances of Mail Streams, each with a different configuration; Monitor Streams just send their state to a queue. Other applications can of course use the Mail Streams too.
Location
Domain | Package | Stream |
---|---|---|
swiftmq | | mailout.js |
Parameters
Name | Mandatory | Default | Description |
---|---|---|---|
input-queue | No | streams_mailout | Input queue to receive mail messages. The queue is automatically created if it does not exist. |
servername | No | localhost | The hostname of the mail server. |
username | Yes | None | The username of the mail account. |
password | Yes | None | The password of the mail account. |
default-from | No | None | Default "From" address. |
default-to | No | None | Default "To" address. |
default-cc | No | None | Default "CC" address. |
default-bcc | No | None | Default "BCC" address. |
default-subject | No | None | Default subject. |
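As a sketch, an instance of the Mail Stream could be declared in the Streams configuration like this; the script-file URL, mail server, and addresses are illustrative:
<stream name="mailout" enabled="true" script-file="repository:/system/mailout.js">
    <parameters>
        <parameter name="servername" value="mail.example.com"/>
        <parameter name="username" value="monitor"/>
        <parameter name="password" value="secret"/>
        <parameter name="default-from" value="router1@example.com"/>
        <parameter name="default-to" value="ops@example.com"/>
        <parameter name="default-subject" value="Router1 Monitor"/>
    </parameters>
</stream>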
Message Scheduler
Purpose
The Message Scheduler Stream handles message schedules sent from JMS or AMQP clients when a message should be delivered with a delay to a destination (queue or topic). The Stream understands the message schedule properties of the Scheduler Swiftlet's Message Scheduler and can be used as a drop-in replacement for it. It is able to handle far more schedules and is more scalable.
Location
Domain | Package | Stream |
---|---|---|
swiftmq | scheduler | messagescheduler.js |
Parameters
Name | Mandatory | Default | Description |
---|---|---|---|
input-queue | No | streams_scheduler_input | Input queue to receive message schedule requests. The queue is automatically created if it does not exist. |
store-queue | No | streams_scheduler_store | Store queue to store message schedule requests. The queue is automatically created if it does not exist. |
delay-format | No | dd.MM.yyyy HH:mm:ss | Date/Time format to parse the delay out of the schedule requests. |
interval | No | 10 | Timer interval in seconds in which the schedules are checked for expiration. |
purge-interval | No | 2 | Timer interval in minutes after which unused Outputs are purged. |
max-batch-size | No | 10000 | Maximum number of expired schedules to be sent to their destinations in one batch. |
Scheduling Messages from Clients
When a client (JMS or AMQP) receives a message but cannot process it yet, it usually wants to redeliver the message sometime later. For this, it creates a message schedule by adding properties to the message specifying the delivery time and sends it to the Message Scheduler Stream's input queue. The Stream stores the schedule and delivers the very same message to the destination at the time specified in the schedule properties.
The Stream understands 2 flavors of scheduling properties: New flavor and Scheduler Swiftlet flavor.
New Flavor
Property | Meaning |
---|---|
streams_scheduler_delay | The date and time when the message should be delivered. The format is specified in the Stream's parameter "delay-format". |
streams_scheduler_destination | The name of the destination where the message should be delivered to. |
streams_scheduler_destination_type | The type of the destination (queue or topic). |
streams_scheduler_expiration | Optional. The final message expiration in milliseconds. This value is set on each message sent as the JMSExpiration. |
Scheduler Swiftlet Flavor
The Scheduler Swiftlet flavor is supported for its most relevant parts:
Property | Meaning |
---|---|
JMS_SWIFTMQ_SCHEDULER_DATE_FROM | The date when the message should be delivered. The format is yyyy-MM-dd or 'now'. |
JMS_SWIFTMQ_SCHEDULER_TIME_EXPRESSION | The delivery time. Either "at HH:mm" or "at HH:mm:ss". |
JMS_SWIFTMQ_SCHEDULER_DESTINATION | The name of the destination where the message should be delivered to. |
JMS_SWIFTMQ_SCHEDULER_DESTINATION_TYPE | The type of the destination (queue or topic). |
JMS_SWIFTMQ_SCHEDULER_EXPIRATION | Optional. The final message expiration in milliseconds. This value is set on each message sent as the JMSExpiration. |
How to scale this Stream
One instance of the Stream is able to handle thousands of schedules with ease. If that is not enough, the Stream can be scaled as follows:
Multiple Instances on the same Input Queue
With this option, you create multiple instances of the Stream and specify the same "input-queue" parameter but a different "store-queue" for each instance. The Streams act as concurrent consumers on the input queue but then work independently of each other in separate threads. This gives more throughput.
Multiple Instances on different Input Queues
With this option, you create multiple instances of the Stream and specify a different "input-queue" and a different "store-queue" parameter for each instance. This may be useful if a dedicated Stream should handle the schedules of dedicated applications. In this case, the applications need to send their schedules to the input queue of their dedicated Stream.
Multiple Instances on the same and different Input Queues
This is a mix of the above options with dedicated Streams where some of them might have multiple instances listening on the same input queue.
Route Announcer
Stream Name
stream_{routername}_routeannouncer
Purpose
The Stream compares all static routes (sys$routing/static-routes) with all active routes (sys$routing/usage/routing-table). When it detects that a static route has no corresponding active route in the routing table, the remote Router is unreachable. If a remote Router reconnects and provides an active route, it becomes reachable again. In both cases, the Stream sends an event to the Stream's topic.
If a remote Router should not be observed, the corresponding static route needs to be deleted. Vice versa, if it should be observed, a static route needs to be created.
Location
Domain | Package | Stream |
---|---|---|
swiftmq | system | routeannouncer.js |
Parameters
Name | Mandatory | Default | Description |
---|---|---|---|
topic | No | Stream Name | Topic to which the Stream sends its events. |
Events
All events are sent as TextMessage.
Stream Registry
Stream Name
stream_{routername}_streamregistry
Purpose
This Stream provides a registry for other Streams to register their availability/unavailability.
Location
Domain | Package | Stream |
---|---|---|
swiftmq | system | streamregistry.js |
Parameters
Name | Mandatory | Default | Description |
---|---|---|---|
topic | No | Stream Name | Topic to which the Stream sends its events. |
Events
All events are sent as TextMessage.
Registry Requests
Streams that want to be enlisted in the Stream registry send registry requests to the Stream registry topic.
Stream Monitor
Purpose
This Stream connects to the Stream registry, sends an init request for each Stream, and subscribes to subsequent updates. The content is logged to the Stream Monitor's log file.
Caution! This Stream is for debugging purposes only and therefore disabled by default. Enabling it when running SwiftMQ Dashboard, for example, will create a large amount of log output.
Location
Domain | Package | Stream |
---|---|---|
swiftmq | system | streammonitor.js |
Parameters
Name | Mandatory | Default | Description |
---|---|---|---|
registry-topic | No | stream_{routername}_streamregistry | Name of the Stream registry topic. |
Stream Repository
Purpose
This Stream provides a Stream script repository on a SwiftMQ Router. Stream scripts can be uploaded into it and then loaded and executed out of the repository. Upload and management of the repositories take place remotely via the client programs repojms and repoamqp, which are part of the SwiftMQ distribution.
Repositories are named and independent of each other.
In contrast to storing Stream scripts locally on a Router's host, uploading them remotely into a Router repository makes it easy to distribute Stream scripts across a Router Network through the above clients.
Repositories are stored in a persistent queue inside the Router's persistent store. For SwiftMQ HA Routers the repositories are available after a failover, so they only need to be uploaded to the ACTIVE instance.
Location
Domain | Package | Stream |
---|---|---|
swiftmq | system | streamrepository.js |
Parameters
Name | Mandatory | Default | Description |
---|---|---|---|
input-queue | No | streamrepo | Queue where the Stream receives requests. |
store-queue | No | streamrepo_store | Queue where the Stream stores its repositories. |
Clients
The repository clients are located under:
<swiftmq-dir>/scripts/<platform>
A JMS-based repository client called repojms and an AMQP-based client called repoamqp are available. They differ only in their connection parameters; the commands are the same.
Load and Execute a Stream Script from a Repository
This is done by defining a repository URL in the script-file property of a Stream configuration. The URL has the following format:
repository:/<reponame>/<filename>
Example configuration:
<stream name="wh_accounting" enabled="true" script-file="repository:/warehouse/accounting.js">
<dependencies>
<dependency name="swiftmq.samples.wh_store"/>
</dependencies>
<parameters>
<parameter name="accounting-topic" value="stream_router1_sample_wh_accounting"/>
<parameter name="customer-invoice-topic" value="stream_router1_sample_wh_customer_invoice"/>
</parameters>
</stream>
Jobs
Overview
The Streams Swiftlet registers a job in the job group Streams at the Scheduler Swiftlet. This job can be scheduled via the Scheduler Swiftlet to run at specific times or in intervals, based on calendars, and so on.
Stream Activator
The Stream Activator job activates a Stream on job start and deactivates it on job stop. The name of the Stream must be specified as a parameter. The job requires a maximum runtime to be set so that the Stream is deactivated again; otherwise it would stay active forever.
The implementation of the Stream Activator job is quite simple: it sets the enabled attribute to true on job start and to false on job stop. Therefore, a Stream that should run as a job should initially be disabled.
Parameter | Mandatory | Description |
---|---|---|
Domain Name | Yes | Name of the domain. |
Package Name | Yes | Name of the package. |
Stream Name | Yes | Name of the stream. |
Examples
Note: All SwiftMQ Streams samples install into domain swiftmq, package samples.
Nuclear Power Plant Monitor
Location of this example: samples/streams/nuclear
This example monitors temperatures generated by sensors of a nuclear power plant and generates monitor, warning, and critical messages. It is an implementation of an article on DZone that uses Esper, a CEP engine.
The example contains the following scripts:
Script Name | Purpose | Parameters |
---|---|---|
tempmonitor.js | Monitors temperatures and generates monitor, warning, critical messages. | input-queue, a set of mail properties |
tempproducer.js | Sends temperature messages in an interval. | output-queue |
To install the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/nuclear/install.cli
The example is then installed but the scripts are disabled.
To remove the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/nuclear/remove.cli
The example is then removed.
Order Collector
Location of this example: samples/streams/ordercollector
This example receives order head messages from one queue and order position messages from another queue. Both come in serial order. Each order head message contains the number of order positions. When all order positions of an order head are received, it generates a text message with an XML body that contains the complete order.
The Order Collector is an implementation of the Enterprise Integration Pattern Aggregator.
The example contains the following scripts:
Script Name | Purpose | Parameters |
---|---|---|
ordercollector.js | Collects orders. | orderhead-queue, orderpos-queue, output-queue |
orderheadproducer.js | Sends order head messages in a 5-second interval. | output-queue |
orderposproducer.js | Sends order position messages in a 1-second interval. | output-queue |
To install the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/ordercollector/install.cli
The example is then installed but the scripts are disabled.
To remove the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/ordercollector/remove.cli
The example is then removed.
Echo Service
Location of this example: samples/streams/replier
This example is a simple echo service that sends a reply to each request it receives. It creates a temporary queue that acts as Input and registers it in JNDI under the name requestQueue. JMS clients can look it up, send TextMessages as requests, and will receive a reply. Please use samples/router_network/P2PRequestor to test it.
The example contains the following script:
Script Name | Purpose | Parameters |
---|---|---|
replier.js | Echo Service. | none |
To install the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/replier/install.cli
The example is then installed but the scripts are disabled.
To remove the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/replier/remove.cli
The example is then removed.
Management: Queue Status
Location of this example: samples/streams/management
This example creates a ManagementInput on sys$queuemanager/usage and receives add/change/remove notifications. It stores these events in a MemoryGroup and prints the average message count per minute for each queue in use. The example demonstrates the use of ManagementInputs and how to use MemoryGroups for session windowing.
The example contains the following script:
Script Name | Purpose | Parameters |
---|---|---|
queuestats.js | Queue Statistics. | none |
To install the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/management/install.cli
The example is then installed but the scripts are disabled.
To remove the example, go to the scripts directory in the Router's working directory, invoke CLI, and perform:
sr <routername>
execute ../samples/streams/management/remove.cli
The example is then removed.
Configuration
The configuration of the Streams Swiftlet is defined within the element
<swiftlet name="sys$streams" .../>
of the router's configuration file.
Attributes of Element "swiftlet"
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
collect-interval | java.lang.Long | No | Interval for collecting Stream usage information |
stream-grant-predicates | java.lang.String | No | SQL-Like Predicates to grant Stream Topic access |
Values
Attribute | Values |
---|---|
collect-interval | Default: 1000 |
stream-grant-predicates | Default: stream_% |
Element List "domains", Parent Element: "swiftlet"
Stream Domain Names. This element list contains zero or more "domain" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Domain |
Element List "packages", Parent Element: "domain"
Stream Packages. This element list contains zero or more "package" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Package |
Element List "streams", Parent Element: "package"
Complex Event Streams. This element list contains zero or more "stream" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Stream |
restart-delay | java.lang.Long | No | Delay in ms to restart the Stream on an exception |
restart-max | java.lang.Integer | No | Maximum number of restart attempts |
script-language | java.lang.String | No | Name of the Scripting Language |
script-file | java.lang.String | Yes | Name of the Script File |
enabled | java.lang.Boolean | No | Enables/Disables this Stream |
Values
Attribute | Values |
---|---|
restart-delay | Default: -1 |
restart-max | Default: -1 |
script-language | Default: JavaScript |
script-file | |
enabled | Default: false |
Element List "parameters", Parent Element: "stream"
Parameters. This element list contains zero or more "parameter" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Parameter |
value | java.lang.String | Yes | Value |
Values
Attribute | Values |
---|---|
value |
Element List "dependencies", Parent Element: "stream"
Dependencies from other Streams. This element list contains zero or more "dependency" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Dependency |
Element List "usage", Parent Element: "swiftlet"
Active Streams. This element list contains zero or more "stream" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Active Stream |
starttime | java.lang.String | No | Start Time of this Stream |
stream-last-onmessage-time | java.lang.Integer | No | Last Time in µs it took in onMessage |
stream-processing-rate | java.lang.Integer | No | Messages/Sec processed by this Stream |
stream-total-processed | java.lang.Integer | No | Total Number of Messages processed by this Stream |
Values
Attribute | Values |
---|---|
starttime | |
stream-last-onmessage-time | Default: 0 |
stream-processing-rate | Default: 0 |
stream-total-processed | Default: 0 |
Element List "inputs", Parent Element: "stream"
Inputs. This element list contains zero or more "input" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Input |
01-type | java.lang.String | No | Type of this Input |
destinationname | java.lang.String | No | Destination Name |
input-processing-rate | java.lang.Integer | No | Messages/Sec processed by this Input |
input-total-processed | java.lang.Integer | No | Total Number of Messages processed by this Input |
Values
Attribute | Values |
---|---|
01-type | |
destinationname | |
input-processing-rate | Default: 0 |
input-total-processed | Default: 0 |
Element List "timers", Parent Element: "stream"
Timers. This element list contains zero or more "timer" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Timer |
timer-last-ontimer-time | java.lang.Integer | No | Last Time in µs it took in onTimer |
timer-type | java.lang.String | No | Type of this Timer |
started | java.lang.Boolean | No | Timer Started? |
Values
Attribute | Values |
---|---|
timer-last-ontimer-time | Default: 0 |
timer-type | |
started | Default: false |
Element List "memories", Parent Element: "stream"
Memories. This element list contains zero or more "memory" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Memory |
memory-type | java.lang.String | No | Type of this Memory |
size | java.lang.Integer | No | Size of this Memory in Messages |
Values
Attribute | Values |
---|---|
memory-type | |
size | Default: 0 |
Element List "jdbclookups", Parent Element: "stream"
JDBCLookups. This element list contains zero or more "jdbclookup" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this JDBCLookup |
last-query-time | java.lang.Integer | No | Last Time in µs it took in query |
Values
Attribute | Values |
---|---|
last-query-time | Default: 0 |
Element List "mailservers", Parent Element: "stream"
Mail Servers. This element list contains zero or more "mailserver" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Mail Server |
emails-sent | java.lang.Integer | No | Number of eMails sent |
Values
Attribute | Values |
---|---|
emails-sent | Default: 0 |
Element List "tempqueues", Parent Element: "stream"
Temp Queues. This element list contains zero or more "tempqueue" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Temp Queue |
queuename | java.lang.String | No | Name of the temp Queue |
Values
Attribute | Values |
---|---|
queuename |
Element List "outputs", Parent Element: "stream"
Outputs. This element list contains zero or more "output" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Output |
01-type | java.lang.String | No | Type of this Output |
messages-sent | java.lang.Integer | No | Number of Messages sent to this Output |
Values
Attribute | Values |
---|---|
01-type | |
messages-sent | Default: 0 |
Element List "dashboard", Parent Element: "stream"
Dashboard. This element list contains zero or more "widget" elements with this template definition:
Definition
Attribute | Type | Mandatory | Description |
---|---|---|---|
name | java.lang.String | Yes | Name of this Widget |
description | java.lang.String | No | Widget Description |
type | java.lang.String | No | Widget Type |
Values
Attribute | Values |
---|---|
description | |
type | Choice: statistic, monitor |