
Streams Swiftlet

SwiftMQ Streams is a platform to run microservices - called Streams - within a SwiftMQ Router.

Advantages of SwiftMQ Streams

The big advantage over other libraries that run microservices on a JVM is this:

  • The JVM owner SwiftMQ is a full-blown Enterprise Messaging System.

  • Streams are fully integrated with the internal messaging infrastructure of SwiftMQ.

  • Applications can communicate out of the box with Streams through normal JMS/AMQP messages.

  • SwiftMQ's integrated Federated Router Network lets Streams communicate with Streams on other nodes transparently.

  • Streams can intercept, analyze and visualize any message flow sent between applications without changing them.

  • Everything is provided by a SwiftMQ Router. No external libraries, no wiring and deployment hassle.

Features

By definition, microservices are:

  • Small in size.

  • Loosely coupled.

  • Self-contained.

  • Easily replaceable.

  • Autonomously developed.

  • Independently deployable.

  • Communicate over Messages.

A Stream completely conforms to these definitions and provides the following additional features:

Streams are implemented in JavaScript

Users implement Streams in JavaScript (the latest ECMAScript standard is supported by GraalVM). These scripts use an elegant and powerful Java DSL, the Stream Interface, to interact with the stream.

Here is a JavaScript example:

CODE
stream.memory("itemstats").group("ITEMNO").sum("QTY").sort("QTY").reverse();

This statement

  • uses a Memory called itemstats where order items are stored in a sliding window over the last 30 seconds,

  • groups it by property ITEMNO,

  • computes the sum over property QTY (quantity),

  • sorts the result,

  • and reverses the order.

The result is a statistic of all order items in descending order over the last 30 seconds.
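
To make the semantics of the chained calls concrete, the following plain-JavaScript sketch reproduces the group, sum, sort, and reverse steps on an ordinary array. It does not use the Stream Interface; the function name `itemStatistics` and the sample data are hypothetical and only illustrate the idea.

```javascript
// Plain-JavaScript sketch; not SwiftMQ code. Each object stands in for the
// properties of one message held in the Memory.
function itemStatistics(messages) {
  const sums = new Map();
  for (const m of messages) {
    // Group by ITEMNO and sum QTY within each group
    sums.set(m.ITEMNO, (sums.get(m.ITEMNO) || 0) + m.QTY);
  }
  return Array.from(sums, ([ITEMNO, QTY]) => ({ ITEMNO, QTY }))
    .sort((a, b) => a.QTY - b.QTY) // ascending by QTY
    .reverse();                    // descending by QTY
}

const stats = itemStatistics([
  { ITEMNO: 1, QTY: 5 },
  { ITEMNO: 2, QTY: 7 },
  { ITEMNO: 1, QTY: 4 },
]);
console.log(stats);
```

With this sample data, item 1 (total quantity 9) is listed before item 2 (total quantity 7).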

Single-threaded

The processing of a Stream takes place single-threaded. No side effects, no synchronization overhead, no locks.

Event-driven

A Stream is driven by the following events:

  • A Message arrives on a queue or topic.

  • A management event occurs.

  • A Timer event occurs.

  • A Stream's start or stop event occurs.

Data is private

All data that Streams store or create are local to the Stream. Data is not shared. The only way to share it is to communicate it via Messages to other Streams. This makes a Stream very simple and replaceable.

Stateful / stateless

A Stream can store data in so-called Memories and compute it. Memories can be stateful and stateless. A stateful Memory stores its content in a persistent queue which survives restarts of a Router and failover of a HA Router. A stateless Memory stores its content on the heap or, for larger content, in a temporary queue.

Transactional

A transaction is automatically started whenever an event on the Stream occurs. All actions done within the Stream such as sending to Outputs, and adding to / removing from Memories are transactionally consistent. It is not possible to perform these actions outside of a transaction.

When the event has been processed, all changes that have been performed are fully committed. If an error/exception occurs during the processing of the event, the transaction will be aborted and all changes will be rolled back. For a message event, this means that the Message is being redelivered.
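
The commit/rollback behavior described above can be sketched in plain JavaScript. This is only an illustration of the idea, not how SwiftMQ implements transactions; `processEvent` and the array used as a stand-in for a Memory are hypothetical.

```javascript
// Plain-JavaScript sketch of commit/rollback around one event; not SwiftMQ code.
function processEvent(memory, event, handler) {
  const staged = memory.slice(); // work on a copy of the committed state
  try {
    handler(staged, event);      // all changes go to the staged copy
    return { memory: staged, redeliver: false }; // commit
  } catch (e) {
    return { memory: memory, redeliver: true };  // rollback: keep the old state
  }
}

const ok = processEvent([], "m1", (mem, ev) => mem.push(ev));
const failed = processEvent(ok.memory, "m2", (mem, ev) => {
  mem.push(ev);
  throw new Error("processing failed");
});
console.log(ok.memory, failed.memory, failed.redeliver);
```

The second event fails after it has already modified the staged copy, so the committed state still contains only `m1` and the message is flagged for redelivery.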

Fault-tolerant

Streams that run on a SwiftMQ High Availability Router are automatically fault-tolerant. The Stream scripts and configuration as well as any data changes are automatically replicated to the STANDBY instance. After a failover, Streams continue where they stopped, without duplicates or missed messages.

Streams have a restart policy that makes them fault-tolerant to internal exceptions such as the unavailability of resources.

Stream dependencies

Streams can be configured to depend on other Streams to ensure consistent start/stop semantics. A dependency graph is created automatically. When a Stream is started, all Streams it depends on are started first; when a Stream is stopped, all Streams that depend on it are stopped first. This is done recursively through the whole dependency graph.

XML
<stream name="orderprobe" enabled="true" script-file="repository:/dashboard/propertyprobe.js">
  <dependencies>
    <dependency name="dashboard.warehouse.store"/>
    <dependency name="swiftmq.system.streamregistry"/>
  </dependencies>
  <parameters>
    <parameter name="destination-name" value="stream_router1_dashboard_warehouse_accounting"/>
    <parameter name="destination-type" value="topic"/>
    <parameter name="props" value="total nitems"/>
  </parameters>
</stream>

Dependency configuration makes it possible to build large Apps as a single deployable unit that may contain hundreds of individual Streams that are automatically started and stopped in a consistent manner.
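
The recursive start order can be sketched in plain JavaScript as a depth-first walk over the dependency graph. This is not SwiftMQ code; `startOrder` is a hypothetical helper, the graph is assumed to be free of cycles, and the dependency of `dashboard.warehouse.store` on `swiftmq.system.streamregistry` is invented for the example.

```javascript
// Plain-JavaScript sketch; not SwiftMQ code. deps maps a Stream name to the
// names of the Streams it depends on (assumed to be acyclic).
function startOrder(name, deps, started = []) {
  for (const dep of deps[name] || []) {
    startOrder(dep, deps, started); // start dependencies first, recursively
  }
  if (!started.includes(name)) started.push(name);
  return started;
}

const deps = {
  orderprobe: ["dashboard.warehouse.store", "swiftmq.system.streamregistry"],
  // Hypothetical dependency, added only to show the recursion
  "dashboard.warehouse.store": ["swiftmq.system.streamregistry"],
};
const order = startOrder("orderprobe", deps);
console.log(order);
```

Stopping would walk the same graph in the opposite direction, so that Streams depending on a Stream are stopped before it.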

Remotely deployable

Each SwiftMQ and High Availability Router provides a local Stream Repository which stores Stream scripts (the sources) in named repositories. These repositories are backed by a persistent queue so in case of a failover of a HA Router these Stream scripts are automatically available at the other HA instance.

DevOps use a simple repo client to upload Stream scripts to any Router in a Router Network. When a Stream is started, the source is fetched out of the local repository. So new Streams or changes are immediately available at remote Routers without the need to log in to a remote host.

CODE
./repojms smqp://localhost:4001 ConnectionFactory admin secret router1 \
          add warehouse /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams js

accounting.js added to repository warehouse
customer.js added to repository warehouse
ordermonitor.js added to repository warehouse
producer.js added to repository warehouse
reordermonitor.js added to repository warehouse
shipper.js added to repository warehouse
shippingmonitor.js added to repository warehouse
store.js added to repository warehouse

Scalable over Router Networks

Streams communicate via Messages over queues and/or topics. They can be deployed on a single Router or on many different nodes of a Router Network. SwiftMQ's integrated Federated Router Network provides transparent Message routing so that Streams on one node can communicate with Streams on other nodes transparently.

With this, it is possible to split time-consuming tasks to many different nodes or to process these tasks at the data source (edge) and send results to other nodes for further processing.

Realtime Streaming Analytics

A Stream can intercept any queue by a wiretap and any topic as a subscriber as well as any event from the Router's Management Tree. It is possible to intercept the Message flow of any JMS/AMQP application that communicates over SwiftMQ. Neither the applications nor the Messages they exchange have to be modified.

JS
// Create a wiretap input on the orderpos queue.
stream.create().input(orderPosQueue).wiretap("w1").onInput(function (input) {
  // We need ITEMNO and QTY as a property
  input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
  input.current().property("QTY").set(input.current().body().get("QTY").toInteger());

  stream.memory("itemstats").add(input.current());
});

Intercepted Messages are stored in Memories that use Message Properties to store, query, and index them. Therefore, a Message can be seen and processed like a row in a database table.

The data can be analyzed by using the real-time streaming analytics capabilities of the Stream Interface, which are on par with other popular real-time streaming analytics libraries. It provides sliding, tumbling, and session windows, event time processing, and late arrival handling, to name a few features.

JS
// Callback function for onRetire to print the statistics
function printStatistic(retired){
  // Generate statistics in the stream's log file
  stream.log().info("Item Statistics:");

  // Get and log the statistic data (items with summarized quantity in descending order)
  retired.group("ITEMNO").sum("QTY").sort("QTY").reverse().forEach(function(message){
    stream.log().info(message.property("ITEMNO").value().toString() + " = " +
              message.property("QTY").value().toString());
  });
}
// Create itemstats Memory
stream.create().memory("itemstats")
                .heap()
                .limit()
                .time()
                .tumbling()
                .seconds(10)
                .onRetire(printStatistic);
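
The idea behind a time-based tumbling window with a retire callback can be sketched in plain JavaScript. This is an illustration under simplifying assumptions, not the SwiftMQ implementation: `createTumblingWindow` is hypothetical, the window starts with the first message, and time is passed in explicitly as milliseconds.

```javascript
// Plain-JavaScript sketch of a time-based tumbling window; not SwiftMQ code.
function createTumblingWindow(windowMs, onRetire) {
  let windowStart = null;
  let content = [];
  return {
    add(message, now) {
      if (windowStart === null) windowStart = now;
      this.checkLimit(now);
      content.push(message);
    },
    checkLimit(now) {
      if (windowStart !== null && now - windowStart >= windowMs) {
        onRetire(content); // hand over the whole window at once
        content = [];      // the next window starts empty
        windowStart = now;
      }
    },
  };
}

const retiredWindows = [];
const win = createTumblingWindow(10000, (w) => retiredWindows.push(w));
win.add("a", 0);
win.add("b", 4000);
win.add("c", 12000); // the 10 s window has elapsed: "a" and "b" retire together
console.log(retiredWindows);
```

In contrast to a sliding window, which retires messages one by one as they age out, a tumbling window hands all of its messages to the callback in a single batch.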

Stream Interface

Code Completion

Since: 10.2.0

We provide TypeScript stubs that can be installed in IDEs like IntelliJ IDEA so that the IDE supports JavaScript code completion for the Stream Interface.

First, download it here:

File: swiftmq_streams_12_00_00.ts (TypeScript stubs for release 12.0.0 and later)

Next, install it in your IDE, for example IntelliJ IDEA. Once installed, the IDE offers code completion.

Parameters

Since: 10.1.0

A SwiftMQ Streams script can have an unlimited number of parameters. These are defined in the script definition.

When a script is started, a script-global variable parameters is passed that is only visible to this particular script (engine scope).

Get

To get the value of a parameter, use the get method. It returns the value or null if the parameter is not defined:

JS
  var inputQueue = parameters.get("input-queue");

Require

This method defines a required (mandatory) parameter. It throws an exception that leads to a stop of this script if the parameter is not defined:

JS
  var inputQueue = parameters.require("input-queue");

Optional

This method defines an optional parameter and provides a default value which is returned if the parameter is not defined:

JS
  var mailHost = parameters.optional("mail-host", "localhost");
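
The difference between the three accessors can be illustrated with a small plain-JavaScript stand-in. `createParameters` is hypothetical and only mimics the behavior described above; the real `parameters` variable is provided by the Router.

```javascript
// Hypothetical stand-in for the parameters variable; not SwiftMQ code.
function createParameters(defined) {
  return {
    // Returns the value or null if the parameter is not defined
    get: (name) => (name in defined ? defined[name] : null),
    // Throws if the parameter is not defined
    require(name) {
      if (!(name in defined)) throw new Error("Missing parameter: " + name);
      return defined[name];
    },
    // Returns the default if the parameter is not defined
    optional: (name, def) => (name in defined ? defined[name] : def),
  };
}

const params = createParameters({ "input-queue": "orders" });
console.log(params.get("mail-host"));                   // null
console.log(params.optional("mail-host", "localhost")); // localhost
console.log(params.require("input-queue"));             // orders
```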

Stream

Variable 'stream'

This is the entry point for scripts to interface with the stream processor. It is provided as a global script variable stream that is only visible to this particular script (engine scope). Everything can be declared out of this stream variable.

JS
  stream.create().input(inputTopic).topic();

Log

A Stream provides a Log where scripts can log informational, warning, and error messages. A separate log file is maintained by the Log Swiftlet (see Log Swiftlet, log sink directory) for each Stream. If there is an exception during stream processing, the stream processor will log the exception with the stack trace to the log file.

JS
  stream.log().info("this is an information");
  stream.log().warning("this is a warning");
  stream.log().error("this is an error");

CLI

Since: 10.1.0

A Stream provides a CLI interface to the local Router's management tree to execute any CLI command, e.g. changing contexts, creating queues, rebooting the Router. Before a command can be executed, a cc must be executed to change to the respective CLI context. The root context (top of the management tree) is / and is the default when a CLI command is executed the first time.

JS
  stream.cli().execute("cc /sys$queuemanager/queues")
      .execute("new myqueue cache-size 1000")
      .execute("save");

Whenever a CLI command fails, an exception will be thrown which will either lead to a stop of the Stream or enter the Stream's restart policy (see chapter below). But sometimes it is adequate to tolerate and thus ignore exceptions for particular commands, e.g. if a queue should be deleted but the queue is not defined. In that case, just switch exceptions off for a particular command:

JS
  stream.cli().execute("cc /sys$queuemanager/queues")
      .exceptionOff()
      .execute("delete myqueue")
      .exceptionOn()
      .execute("new myqueue cache-size 1000")
      .execute("save");

Creating Temporary Queues

Since: 10.1.0

If a Stream acts as a service to other clients and receives request Messages on an Input, it is not always necessary to use a regular queue if the Messages are non-persistent. In this case a Stream can create a temporary queue and register it in JNDI so that JMS clients can perform a JNDI lookup and send Messages to this queue:

JS
  stream.create().tempQueue("requestQueue").registerJNDI();

A temporary queue can be explicitly deleted when it is not needed anymore:

CODE
  stream.tempQueue("requestQueue").delete();

Otherwise, it will be automatically deleted when the Stream stops.

Lookup Temporary Queues from JNDI

Since: 10.2.0

When multiple Streams communicate, one Stream uses a TempQueue as input and registers it in JNDI under a name. To send to this queue, other Streams perform a JNDI lookup and create an Output for the returned address.

JS
  // One Stream registers its Input queue in JNDI
  stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();

  // The other stream performs a lookup and creates an Output
  stream.create().output("requestQueue").forAddress(stream.lookupJNDI("requestQueue"));

onMessage Callback

Messages flow from Inputs into the Stream. To process these Messages, an optional onMessage callback can be registered which is called from the Stream Processor for each Message. Registering an onMessage callback is not always necessary; Messages can also be processed in the respective onInput callbacks.

The onMessage callback is a function without parameters:

JS
  stream.onMessage(function() {
    // Do something
  });

During the onMessage callback "stream.current()" returns the current message that is being processed.

JS
  stream.onMessage(function() {
    stream.memory("all").add(stream.current());
  });

If a Message is handled in onInput and must not flow through the onMessage callback, disable it during onInput:

JS
  input.current().onMessageEnabled(false);

onException Callback

Since: 10.1.0

To get informed about exceptions that may occur during Stream processing an onException callback can be registered at the Stream. It is a function with 2 String parameters. The first is the exception, the second is the formatted stack trace:

JS
  stream.onException(function(lastException, lastStackTrace){
      print("Last Exception: "+lastException);
      print("Last Stacktrace: "+lastStackTrace);
  });

It is not necessary to log these exceptions to the Stream's log file as this is done by the Stream Processor anyway.

The onException callback can be used to inform an administrator by eMail:

JS
  stream.onException(function(lastException, lastStackTrace){
    stream.mailserver("localhost")
          .email()
          .from(stream.name()+"@company.com")
          .to("admin@company.com")
          .subject("Stream "+stream.name()+" got an Exception: "+lastException)
          .body(lastStackTrace)
          .send();
  });

Automatic Stream Restart

Since: 10.1.0

When an exception occurs anywhere during the Stream processing, the onException callback is called (if registered), the current transaction is rolled back, the exception is logged into the Stream's log file and the Stream is stopped.

There are 2 properties for each Stream to configure an automatic restart, e.g. if a database connection was temporarily down. These are Restart Delay and Max Restarts. Restart Delay specifies the time in milliseconds between restarts; Max Restarts specifies the maximum number of restart attempts.

The current restart count can be acquired from the Stream with:

JS
  stream.restartCount();
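
How Restart Delay and Max Restarts interact can be sketched as a bounded retry loop in plain JavaScript. This is not SwiftMQ code; `runWithRestarts` is hypothetical, and the delay is only accumulated rather than actually waited so that the sketch runs instantly.

```javascript
// Plain-JavaScript sketch of a bounded restart loop; not SwiftMQ code.
function runWithRestarts(start, restartDelayMs, maxRestarts) {
  let restartCount = 0;
  let waitedMs = 0;
  for (;;) {
    try {
      return { result: start(restartCount), restartCount, waitedMs };
    } catch (e) {
      if (restartCount >= maxRestarts) throw e; // give up: the Stream stays stopped
      waitedMs += restartDelayMs; // a real implementation would wait here
      restartCount++;
    }
  }
}

// A start function that fails twice before it succeeds
const flaky = (count) => {
  if (count < 2) throw new Error("database unavailable");
  return "started";
};
const outcome = runWithRestarts(flaky, 1000, 5);
console.log(outcome);
```

In this run the Stream comes up on the third attempt, after two restarts and a total delay of 2000 milliseconds.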

onStart Callback

Since: 11.0.0

A Stream can set an onStart callback which is called when the Stream has been started. The intention of this callback is to let others know that the Stream is now available. This can be accomplished by sending a message to a registry to register or sending an eMail to someone.

The onStart callback is a function without parameters:

JS
  stream.onStart(function() {
    stream.output("registry")
          .send(stream.create()
                  .message().textMessage()
                        .body(stream.name()+": Available")));
  });

onStop Callback

Since: 10.2.0

A Stream can set an onStop callback which is called just before all Stream resources are closed. The intention of this callback is to let others know that the Stream is now unavailable. This can be accomplished by sending a message to a registry to unregister or sending an eMail to someone.

The onStop callback is a function without parameters:

JS
  stream.onStop(function() {
    stream.output("registry")
          .send(stream.create()
                  .message().textMessage()
                        .body(stream.name()+": Unavailable")));
  });

Function Callback

Since: 11.2.0

If a Stream uses external libraries and calls an asynchronous method where a result is passed through a callback, the Stream must enqueue the processing into the Stream's event queue. To do so, the Stream registers a Function Callback. The function is then called when the event arrives at the Stream.

To register a Function Callback, the function itself is registered and an optional context object (which can be any Object) is passed. The context is then passed as a parameter to the registered function:

JS
  someExternalLib.someAsyncMethod(function(result){
    // Enqueue it into the Stream's event queue
    stream.executeCallback(function(context){
      // context parameter is the result parameter passed to executeCallback
      // process it
    }, result);
  });
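
Why the result is enqueued rather than processed directly can be illustrated with a plain-JavaScript sketch of a single-threaded event queue. This is not SwiftMQ code; `createEventQueue` and `drain` are hypothetical, and only the name `executeCallback` is borrowed from the Stream Interface.

```javascript
// Plain-JavaScript sketch of a single-threaded event queue; not SwiftMQ code.
function createEventQueue() {
  const queue = [];
  return {
    // May be called from any external callback; it only records the work
    executeCallback(fn, context) {
      queue.push({ fn, context });
    },
    // Runs the queued callbacks one at a time, in arrival order
    drain() {
      while (queue.length > 0) {
        const { fn, context } = queue.shift();
        fn(context);
      }
    },
  };
}

const events = createEventQueue();
const seen = [];
events.executeCallback((ctx) => seen.push("first:" + ctx), 1);
events.executeCallback((ctx) => seen.push("second:" + ctx), 2);
events.drain();
console.log(seen);
```

Because every callback runs from the same queue, the work triggered by an external library is serialized with all other Stream events.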

Messages

SwiftMQ Streams works exclusively with JMS messages. However, they are not very convenient to use and access so we have created a facade that wraps JMS messages and provides a more elegant way to access all parts of it.

When a message arrives on an Input, it is already wrapped with the facade. To create a new message (e.g. to send it via Output), use stream.create().message() which returns a MessageBuilder that has various methods to create the required message type.

This creates a persistent bytes message and puts a byte array into the body (since 11.2.0):

JS
stream.create()
      .message()
      .bytesMessage()
      .persistent()
      .body([1, 2, 3]);

This creates a persistent text message and puts a text into the body:

JS
stream.create()
      .message()
      .textMessage()
      .persistent()
      .body(msg);

This creates a nonpersistent stream message and writes 2 values to the body:

JS
  stream.create()
        .message()
        .streamMessage()
        .nonpersistent()
        .body()
        .writeInt(100)
        .writeString("Hello");

This creates a persistent map message, sets 2 properties, the correlation id and sets 2 values in the body:

JS
  stream.create()
        .message()
        .mapMessage()
        .persistent()
        .correlationId(otherMsg.messageId())
        .property("ACCOUNTID").set(1234)
        .property("ORDERID").set(4331)
        .body()
        .set("QTY", 100)
        .set("COUNTRY", "DE");

Accessing a message property and calling .value() returns a PropertyValue, which has several conversion methods (e.g. .toDouble()). The method toObject() returns the value as the object type in which it is stored (e.g. java.lang.Double):

JS
  var val = stream.current().property("temp").value().toInteger();

Property Set

Since: 10.1.0

To get all Properties of a Message, use properties() which returns a PropertySet. This has various methods to select Properties by their names like startsWith(), endsWith(), select(regex). Each of these methods returns a new PropertySet. Once you are happy with the selected set, use forEach to iterate over the result:

JS
    // Returns all Properties not starting with an underscore and prints it
    input.current().properties().notStartsWith("_").forEach(function(property){
        print(property.name()+"="+property.value().toString());
    });

Determining the Message Type

Since: 11.2.0

Sometimes it is required to check the message type of an arriving Message to process it. Each Message facade has a type() method that returns the Message type: "message", "bytes", "text", "stream" or "map", respectively.

JS
  var msg = input.current();
  switch (msg.type()){
      case "bytes":
        // BytesMessage
        break;
      case "text":
        // TextMessage
        break;
      case "stream":
        // StreamMessage
        break;
      case "map":
        // MapMessage
        break;
      case "message":
        // Message
        break;
  }

Inputs

Inputs are created out of the stream variable and can consume on queues and/or topics, including durable subscribers, and from the Router's Management Tree. The number of Inputs created for a stream is not limited.

Each Input may have an optional onInput callback which is called from the stream processor when a message is received on this Input. It can be used to store the message in a Memory, or to convert or enrich a message. The callback is a function with a single parameter which is the Input itself. input.current() returns the current message of this Input.

JS
stream.create().input(orderHeadQueue).queue().onInput(function (input) {
  stream.memory("orderhead").add(input.current());
});

stream.create().input(orderPosQueue).queue().onInput(function (input) {
  stream.memory("orderpos").add(input.current());

  // We need the QTY as a property
  input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
  stream.memory("itemstats").add(input.current());
});

Queue Input (Regular Queue)

A Queue Input consumes messages from a queue.

JS
  stream.create().input(orderHeadQueue).queue().onInput(function (input) {
    stream.memory("orderhead").add(input.current());
  });

Queue Input (Temporary Queue)

Since: 10.1.0

If a Stream acts as a service to other clients and receives request Messages on an Input, it is not always necessary to use a regular queue if the Messages are non-persistent. In this case a Stream can create a temporary queue, register it in JNDI, and create a Queue Input so that JMS clients can perform a JNDI lookup and send Messages to this queue.

JS
  stream.create().input(stream.create().tempQueue("requestQueue").registerJNDI()).queue();

Queue Input (Wire Tap)

Since: 10.2.0

You can wiretap any queue by adding a QueueWireTapInput to it. This is fully dynamic and doesn't require any configuration changes. Once a WireTap has been added to a queue, it sends a copy of each message sent to this queue also to the WireTap.

JS
  // Create a wiretap input on the orderpos queue.
  stream.create().input(orderPosQueue).wiretap("w1").onInput(function (input) {
      // We need ITEMNO and QTY as a property
      input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
      input.current().property("QTY").set(input.current().body().get("QTY").toInteger());

      stream.memory("itemstats").add(input.current());
  });

A WireTap is identified by its name and has subscribers. If a Stream registers a WireTap under a name and a second Stream registers it under the same name, a single WireTap with 2 subscribers is created and Message copies are distributed round-robin to the subscribers. If a Stream registers it under a different name, another WireTap with a single subscriber is created and will receive its distinct Message copy.
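
The distribution rule (one copy per WireTap name, round-robin among the subscribers of the same name) can be sketched in plain JavaScript. This is not SwiftMQ code; `createWireTaps`, `subscribe`, and `publish` are hypothetical names used only for the illustration.

```javascript
// Plain-JavaScript sketch of WireTap distribution; not SwiftMQ code.
function createWireTaps() {
  const taps = new Map(); // WireTap name -> { subscribers, next }
  return {
    subscribe(name, subscriber) {
      if (!taps.has(name)) taps.set(name, { subscribers: [], next: 0 });
      taps.get(name).subscribers.push(subscriber);
    },
    // Every WireTap gets its own copy; within one WireTap the copies
    // are handed to the subscribers round-robin.
    publish(message) {
      for (const tap of taps.values()) {
        tap.subscribers[tap.next]({ ...message });
        tap.next = (tap.next + 1) % tap.subscribers.length;
      }
    },
  };
}

const received = { a: [], b: [], c: [] };
const wiretaps = createWireTaps();
wiretaps.subscribe("w1", (m) => received.a.push(m.id));
wiretaps.subscribe("w1", (m) => received.b.push(m.id));
wiretaps.subscribe("w2", (m) => received.c.push(m.id));
[1, 2, 3, 4].forEach((id) => wiretaps.publish({ id }));
console.log(received);
```

The two subscribers of `w1` each see every second message, while the single subscriber of `w2` sees all four.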

A WireTap processes messages outside a transaction via memory buffering. If a subscriber is too slow and its memory buffer is full, messages may not be delivered to that subscriber. A WireTap is therefore only suitable if no transactional consistency is required and message loss can be tolerated, e.g. in statistics scenarios. For stricter requirements use a regular Queue Input.

Memory buffer behavior is specified by the buffer size of the internal memory of the QueueWireTapInput and the maximum blocking time in case the buffer is full. The default buffer size is 10 Messages; the default maximum block time is 500 milliseconds.

JS
  // Create a wiretap input with 100 messages buffer and no block time
  stream.create().input(orderPosQueue).wiretap("w1").bufferSize(100).maxBlockTime(0).onInput(function (input) {
      // We need ITEMNO and QTY as a property
      input.current().property("ITEMNO").set(input.current().body().get("ITEMNO").toInteger());
      input.current().property("QTY").set(input.current().body().get("QTY").toInteger());

      stream.memory("itemstats").add(input.current());
  });

Non-durable Topic Input

A non-durable Topic Input consumes messages from a topic. It creates a non-durable subscriber on it.

JS
  stream.create().input(inputTopic).topic().onInput(function (input) {
    stream.memory("mymemory").add(input.current());
  });

Durable Topic Input

A durable Topic Input consumes messages from a topic. It creates a durable subscriber on it or reuses an existing one. A client id and a durable name must be provided to identify the durable subscriber.

JS
  stream.create().input(inputTopic).topic().durable()
        .clientId("mycid")
        .durableName("myname")
        .onInput(function (input) {
          stream.memory("mymemory").add(input.current());
        });

Message Selectors

To limit the number of messages that arrive on an Input, a JMS message selector can be supplied on each Input type.

JS
  stream.create().input(orderHeadQueue).queue()
        .selector("ORDERHEADID between 1 and 10000")
        .onInput(function (input) {
          stream.memory("orderhead").add(input.current());
        });

Creating multiple Inputs on the same Destination

Since: 10.2.0

Sometimes it might be necessary to create multiple Inputs on the same destination, e.g. to have multiple subscribers on the same topic, each with a different message selector. To achieve this, create the Inputs under different names and use destinationName(name) to set the destination name.

JS
  // Input 1
  stream.create().input("input1").topic().destinationName(inputTopic)
        .selector("ORDERHEADID between 1 and 10000")
        .onInput(function (input) {
          // Do something
        });

  // Input 2
  stream.create().input("input2").topic().destinationName(inputTopic)
        .selector("ORDERHEADID between 10001 and 20000")
        .onInput(function (input) {
          // Do something
        });

Management Input

Since: 10.1.0

A Management Input consumes events from the local Router's management tree and converts them into Messages. The name of the Input is the CLI context and can refer to an EntityList, an Entity, or a Property.

Starting an Input dynamically

Since: 10.1.0

Inputs are usually created outside of callbacks. The Stream starts these Inputs automatically. If an Input is created inside a callback, e.g. as a result of an event from another Input or on a Timer event, it must be started explicitly:

JS
  stream.create().input("sys$queuemanager/usage/testqueue/messagecount").management().onChange(function(input){
      if (input.current().property("messagecount").value().toInteger() > 0 && stream.input("testqueue") == null) {
          stream.create().input("testqueue").queue().onInput(function(input){
              print("onInput (testqueue): "+input.current().body());
          }).start();
      } else if (stream.input("testqueue") != null) {
          stream.input("testqueue").close();
      }
  });

Disabling the onMessage call

Since: 10.1.0

If a Message is handled in onInput and must not flow through the onMessage callback, disable it during onInput:

JS
  input.current().onMessageEnabled(false);

Closing an Input dynamically

Since: 10.1.0

An Input, like every other component, is automatically closed when the Stream is stopped. An Input can also be closed explicitly anytime with:

JS
  input.close();

Timer

Timers can be created for any purpose (mostly it would be to call checkLimit() on Memories or Memory Groups). A Timer must have a callback that is executed from the stream processor when this Timer event occurs. The callback is a function with a single parameter which is the Timer itself.

JS
stream.create().timer("monitor").interval().minutes(2).seconds(10).onTimer(function (timer) {
  stream.memory("all").checkLimit();
});

Interval Timer

An Interval Timer executes the onTimer callback in an interval that can be specified in days, hours, minutes, seconds, and milliseconds. Each call of these methods adds to the interval. For example, a Timer that should be executed every 90 minutes can be specified as:

JS
  stream.create().timer("monitor").interval().hours(1).minutes(30).onTimer(function (timer) {
    // Do something
  });
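
That each call adds to the interval can be shown with a small plain-JavaScript builder. This is not SwiftMQ code; `interval` here is a hypothetical stand-in, and `value()` is an invented accessor used only to read the accumulated milliseconds.

```javascript
// Plain-JavaScript sketch of an additive interval builder; not SwiftMQ code.
function interval() {
  let ms = 0;
  const builder = {
    days: (n) => ((ms += n * 86400000), builder),
    hours: (n) => ((ms += n * 3600000), builder),
    minutes: (n) => ((ms += n * 60000), builder),
    seconds: (n) => ((ms += n * 1000), builder),
    milliseconds: (n) => ((ms += n), builder),
    value: () => ms, // hypothetical accessor for the sketch
  };
  return builder;
}

const ninetyMinutes = interval().hours(1).minutes(30).value();
console.log(ninetyMinutes); // 5400000
```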

At Timer

Since: 10.2.0

An At Timer executes the onTimer callback at specific absolute times. Multiple times can be specified. The Timer is recurring. That is, when all times have been executed, it continues the next day. The time format is either HH:MM or HH:MM:SS.

The following Timer fires at 10:00, 11:35:45, and 17:30 each day:

JS
  stream.create().timer("trigger").at().time("10:00").time("11:35:45").time("17:30").onTimer(function (timer) {
    // Do something
  });
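
The recurring behavior can be sketched in plain JavaScript by computing how long to wait until the next configured time, wrapping to the next day once all times have passed. This is not SwiftMQ code; `toSeconds` and `secondsUntilNext` are hypothetical helpers that work on seconds since midnight.

```javascript
// Plain-JavaScript sketch; not SwiftMQ code. Times are seconds since midnight.
function toSeconds(time) {
  const [h, m, s = "0"] = time.split(":"); // accepts HH:MM and HH:MM:SS
  return Number(h) * 3600 + Number(m) * 60 + Number(s);
}

// Returns the seconds to wait from nowTime until the next configured time,
// wrapping to the first time of the next day when all times have passed.
function secondsUntilNext(times, nowTime) {
  const now = toSeconds(nowTime);
  const sorted = times.map(toSeconds).sort((a, b) => a - b);
  const next = sorted.find((t) => t > now);
  return next !== undefined ? next - now : 86400 - now + sorted[0];
}

const times = ["10:00", "11:35:45", "17:30"];
console.log(secondsUntilNext(times, "11:00")); // 2145
console.log(secondsUntilNext(times, "18:00")); // 57600
```

At 11:00 the next firing is 11:35:45, which is 2145 seconds away; at 18:00 all times of the day have passed, so the wait runs until 10:00 on the next day.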

Next Timer

Since: 10.2.0

A Next Timer is a one-shot Timer that executes the onTimer callback at the beginning of a specific time: begin of the next second, minute, hour, or day.

CODE
  stream.create().timer("daystarter").next().beginOfDay().onTimer(function (timer) {
    // Do something
  });

Starting a Timer dynamically

Since: 10.1.0

Timers are usually created outside of callbacks. The Stream starts these Timers automatically. If a Timer is created inside a callback, e.g. as a result of an event from an Input or on a Timer event, it must be started explicitly:

JS
  // Starts a Timer each time a new JMS client connects
  var x = 0;
  stream.create().input("sys$jms/usage").management().onAdd(function(input){
      if (stream.timer("counter") == null)
      {
          stream.create().timer("counter").interval().seconds(1).onTimer(function(timer){
              print("onTimer, x="+(++x));
              if (x == 10)
              {
                  x = 0;
                  timer.close();
              }
          }).start();
      }
  });

Resetting/Reconfiguring a Timer

Since: 10.1.0

Should a Timer be reconfigured with a new interval without recreating it, it must be reset, the new value must be set, and then reconfigure must be called:

JS
  // Create a Management Input that receives changes on my own statistic-interval-sec parameter
  // and reconfigures the Timer
  stream.create().input("sys$streams/streams/"+stream.name()+"/parameters/statistic-interval-sec").management().onChange(function (input){

    // Get the new value
    var secs = input.current().property("value").value().toInteger();

    // Reset and reconfigure the Timer with the new value
    stream.timer("stats").reset().seconds(secs).reconfigure();

    // Recreate the itemstats Memory
    stream.memory("itemstats").close();
    stream.create().memory("itemstats").heap().limit().time().sliding().seconds(secs);

    // Log it into the Logfile
    stream.log().info("Statistic interval reconfigured to "+secs+" seconds");

    // Avoid that this Management Message is passed to onMessage
    input.current().onMessageEnabled(false);

  });

Closing a Timer dynamically

Since: 10.1.0

A Timer, like every other component, is automatically closed when the Stream is stopped. A Timer can also be closed explicitly anytime with:

JS
  timer.close();

Memories

The heart of SwiftMQ Streams is Memories. They store messages, adjust automatically, and provide many functions to analyze the data.

All messages in a Memory must have the same message properties if these properties are used in queries or to create an index. That way a message can be seen and processed like a row in a database table.

A Memory works exclusively over message properties and analyzing data works only if the data is part of the message properties. So in case some data are in the message body but not in the properties, use onInput to correct that:

JS
stream.create().input(orderPosQueue).queue().onInput(function (input) {
  stream.memory("orderpos").add(input.current());

  // We need the QTY as a property
  input.current().property("QTY").set(input.current().body().get("QTY").toInteger());
  stream.memory("itemstats").add(input.current());
});

Create

Memories are created with stream.create().memory(name) which returns a MemoryBuilder. This builder can create a HeapMemory, a QueueMemory or a TempQueueMemory.

Add

Adding a message to one or more Memories takes place either in onInput or in onMessage, from input.current() or stream.current(), respectively.

Adding in onInput:

JS
  stream.memory("orderpos").add(input.current());

Adding in onMessage:

JS
  stream.memory("orderpos").add(stream.current());

Access/Query

A Memory maintains the insertion order of its messages. The first message is the oldest. Messages can be accessed with at(index) or with the convenience methods first() and last(). size() returns the number of messages held in the Memory. forEach(callback) is a convenience method that iterates over the whole Memory.

JS
  //Equivalent
  var msg1 = stream.memory("critical").at(0);
  var msg2 = stream.memory("critical").first();

Messages can also be selected by a JMS message selector. This is a sequential operation that compares all messages against the selector. Therefore it is only recommended for small Memories. It returns a new Memory that contains the result set. This memory can then be used in further queries and so on.

JS
  if (stream.memory("warning").select("temp > 100").size() > 2) {
    // do something
  }

All Messages of a Memory can also be accessed by the forEach method:

JS
  grouped.forEach(function (message) {
    stream.log().info(message.property("ITEMNO").value().toString() + " = " +
                  message.property("QTY").value().toString());
  });

Large Memories should be indexed. To access a Memory through an index, use .index(name).get(propvalue), which returns a new Memory with the result.

JS
  // Selects all order positions of a particular orderhead id
  var orderPosMem = stream.memory("orderpos").index("ORDERHEADID").get(orderHeadId);

Note: All Memories returned from a query are of type HeapMemory.

Remove

Removal of messages from a Memory takes place automatically if the Memory has a limit declared. Manual removal can take place with remove(index) or remove(selector). A Memory stores the insertion time of a message. To remove messages older than a specific time, use removeOlderThan(time).

Messages can also be removed via any associated index.

JS
  // Remove Message at index 10
  stream.memory("mymem").remove(10);
  // Remove all Messages with temp > 100
  stream.memory("warning").remove("temp > 100");
  // Remove all Messages older than 30 secs
  stream.memory("mymem").removeOlderThan(java.lang.System.currentTimeMillis()-30000);

  // Remove the messages with this orderhead id from orderhead Memory
  stream.memory("orderhead").index("ORDERHEADID").remove(orderHeadId);

Note: The returned Memory of this method is the Memory where the messages are removed from (this).

Values

To get all values of a particular property of all messages of a Memory (which may be a result of a query), use values(propName). It returns a java.util.List of values.

JS
  // Logs all sorted temps
  stream.log().info(stream.memory("all").sort("temp").values("temp"));

Aggregate

A Memory provides the aggregate functions min, max, sum, and average. They all work on a specific property. min and max return a message, while sum and average require a numeric property and return a double value.

JS
  stream.log().info("AVERAGE TEMP LAST 10 SECS=" +
                mem.average("temp") +
                ", MIN=" +
                mem.min("temp").property("temp").value().toObject() +
                ", MAX=" +
                mem.max("temp").property("temp").value().toObject());

Series

A Memory provides functions to detect a series of values of a property. .ascendingSeries() returns true if all values of this particular property are in ascending order, .descendingSeries() returns true if they are in descending order.

JS
  if (stream.memory("critical").ascendingSeries("temp")) {
    // Do something
  }
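
The check can be pictured with a few lines of plain JavaScript. This is only a sketch and does not use the Streams API: messages are modelled as plain objects, and isAscendingSeries is a hypothetical helper. It assumes that "ascending" means each value is strictly greater than the one before, and that an empty or single-message list counts as ascending; the real method may treat these edge cases differently.

```javascript
// Plain JavaScript model, not the Streams API: messages are plain objects.
// isAscendingSeries is a hypothetical stand-in for ascendingSeries(propName).
function isAscendingSeries(messages, propName) {
  for (var i = 1; i < messages.length; i++) {
    // Assumption: strictly ascending, so equal neighbours break the series
    if (messages[i][propName] <= messages[i - 1][propName]) {
      return false;
    }
  }
  return true;
}

var rising = [{ temp: 410 }, { temp: 500 }, { temp: 620 }, { temp: 700 }];
var dipping = [{ temp: 410 }, { temp: 500 }, { temp: 480 }];

console.log(isAscendingSeries(rising, "temp"));  // true
console.log(isAscendingSeries(dipping, "temp")); // false
```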

Group

To arrange identical values of a property into a group, use .group(propName). It returns a GroupResult which contains a set of Memories, each containing the messages for a unique property value. This set can be acquired with .result() which returns an array of Memory (Memory[]). GroupResult provides the aggregate functions min, max, average and sum that can be applied on the GroupResult and returns a new Memory.

JS
  var mem = stream.memory("itemstats").group("ITEMNO").sum("QTY");

Sort

To sort a Memory in ascending order over a specific property, use .sort(propName). It returns a new Memory with the sorted messages. To get the result in descending order, apply .reverse() on the result which returns another new Memory in descending order.

JS
  var mem = stream.memory("itemstats").group("ITEMNO").sum("QTY").sort("QTY").reverse();
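
To make the chain group, sum, sort, reverse more tangible, the following plain JavaScript sketch models what each step computes. It does not use the Streams API: messages are plain objects, arrays stand in for Memories, and groupSum is a hypothetical helper. The order in which groups appear before sorting is an assumption of this sketch.

```javascript
// Plain JavaScript model, not the Streams API: messages are plain objects.
// groupSum is a hypothetical stand-in for group(groupProp).sum(sumProp).
function groupSum(messages, groupProp, sumProp) {
  var totals = Object.create(null); // group value -> running sum
  var keys = [];                    // group values in order of first appearance
  messages.forEach(function (message) {
    var key = message[groupProp];
    if (!(key in totals)) {
      totals[key] = 0;
      keys.push(key);
    }
    totals[key] += message[sumProp];
  });
  // One result message per distinct group value
  return keys.map(function (key) {
    var result = {};
    result[groupProp] = key;
    result[sumProp] = totals[key];
    return result;
  });
}

var items = [
  { ITEMNO: "A1", QTY: 2 },
  { ITEMNO: "B7", QTY: 7 },
  { ITEMNO: "A1", QTY: 3 },
  { ITEMNO: "C3", QTY: 1 }
];

var grouped = groupSum(items, "ITEMNO", "QTY");
// Models sort("QTY"): a new list in ascending order
var ascending = grouped.slice().sort(function (a, b) { return a.QTY - b.QTY; });
// Models reverse(): another new list in descending order
var descending = ascending.slice().reverse();

descending.forEach(function (message) {
  console.log(message.ITEMNO + " = " + message.QTY);
});
```

Each step returns a new list and leaves its input untouched, mirroring the way sort and reverse each return a new Memory.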

Join

Since: 10.1.0

Performs an inner join with the right Memory over the named join Property name which must exist in the Messages of both Memories. The result is a Memory that contains Messages where each Message on the left side (this Memory) matches with a Message on the right side. The result Message will contain the left result Message enriched with all Properties of the right result Message.

If the join Property of the right Memory is not indexed, an Index will be created automatically before performing the join operation.

This example joins an order head memory with an order position memory over join property ORDERHEADID. The result is a Memory that contains all order positions of that ORDERHEADID.

JS
  // Join orderhead with the orderpos memory
  var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID");

If the join Property has a different name in each of the two Memories, use this method:

JS
  // Join orderhead with the orderpos memory
  var orderPosMem = orderHeadMem.join(stream.memory("orderpos"), "ORDERHEADID", "orderheadid");
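
The inner-join semantics described above can be modelled in plain JavaScript. This sketch does not use the Streams API: messages are plain objects, arrays stand in for Memories, and innerJoin is a hypothetical helper. How the real join resolves a property that exists on both sides is not stated here, so the sketch simply assumes that the right value wins.

```javascript
// Plain JavaScript model, not the Streams API: messages are plain objects.
// innerJoin is a hypothetical stand-in for left.join(right, leftProp, rightProp).
function innerJoin(left, right, leftProp, rightProp) {
  rightProp = rightProp || leftProp;
  // Index the right side by its join property before joining
  var index = new Map();
  right.forEach(function (message) {
    var key = message[rightProp];
    if (!index.has(key)) {
      index.set(key, []);
    }
    index.get(key).push(message);
  });
  var result = [];
  left.forEach(function (leftMessage) {
    var matches = index.get(leftMessage[leftProp]) || [];
    matches.forEach(function (rightMessage) {
      // Left message enriched with the properties of the right message.
      // Assumption: on a name clash the right value wins.
      result.push(Object.assign({}, leftMessage, rightMessage));
    });
  });
  return result;
}

var orderHead = [{ ORDERHEADID: 1, CUSTOMER: "ACME" }];
var orderPos = [
  { ORDERHEADID: 1, ITEMNO: "A1", QTY: 2 },
  { ORDERHEADID: 2, ITEMNO: "B7", QTY: 5 },
  { ORDERHEADID: 1, ITEMNO: "C3", QTY: 1 }
];

var joined = innerJoin(orderHead, orderPos, "ORDERHEADID");
console.log(JSON.stringify(joined));
```

Left messages without a match on the right side produce no result, which is what makes this an inner join.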

Closing a Memory dynamically

Since: 10.1.0

A Memory, like every other component, is automatically closed when the Stream is stopped. A Memory can also be closed explicitly anytime with:

JS
  memory.close();

Example:

JS
  // Recreate the itemstats Memory
  stream.memory("itemstats").close();
  stream.create().memory("itemstats").heap().limit().time().sliding().seconds(secs);

Note that all non-permanent Memories (Heap/TempQueue) will lose their content on close while a permanent Memory (Queue) keeps its Messages in the queue.

Windowing

Since: 10.1.0

Windowing is a technique to divide a potentially infinite Message stream into finite slices to process it. Message streams are stored in Memories and this is where windows are defined.

SwiftMQ Streams supports 2 types of windows: tumbling and sliding. Both can be limited by count or by time. Session windows are supported through Memory Groups.

When a window reaches its limit, Messages will retire and can be processed by an onRetire callback that can be registered at the particular Memory.

Tumbling Windows

A tumbling window retires all Messages of that window at once when the limit is reached. For a count-based window, this is when the count of the limit is reached; for a time-based window, this is when the time of the oldest Message is less than or equal to the limit's time.

This is useful to process Messages in batches, e.g. every 1000 Messages (count-based) or every 10 seconds (time-based).

Tumbling windows are usually processed in the onRetire callback (see the section below on this page).

Sliding Windows

A sliding window retires only those Messages that drop out of the window when the limit is reached. It does not create a new window; instead, the window slides along the Message stream:

If the window is count-based with a limit of 1000 Messages, the oldest Message retires, so the Memory contains exactly the last 1000 Messages added to it. If the window is time-based with a limit of 10 seconds and a Message is added, all Messages older than 10 seconds retire, so the Memory contains exactly the Messages added within the last 10 seconds.

With sliding windows, the (usually single) Message that retires is not of interest; what matters are the Messages that remain in the Memory (the current window). So with a sliding window, processing usually takes place in onMessage rather than in the onRetire callback.
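
The difference between the two window types can be sketched in plain JavaScript for the count-based case. This is a model, not the Streams API: createCountWindow is a hypothetical helper, and the sketch assumes that a tumbling window retires as soon as the limit is reached.

```javascript
// Plain JavaScript model of count-based windows, not the Streams API.
// createCountWindow is a hypothetical helper; onRetire receives the retired messages.
function createCountWindow(limit, tumbling, onRetire) {
  var messages = [];
  return {
    add: function (message) {
      messages.push(message);
      if (tumbling && messages.length >= limit) {
        // Tumbling: the whole window retires at once
        // (assumption: as soon as the limit is reached)
        onRetire(messages.splice(0, messages.length));
      } else if (!tumbling && messages.length > limit) {
        // Sliding: only the messages that drop out of the window retire
        onRetire(messages.splice(0, messages.length - limit));
      }
    },
    content: function () {
      return messages.slice();
    }
  };
}

var tumblingRetired = [];
var tumblingWindow = createCountWindow(3, true, function (retired) {
  tumblingRetired.push(retired);
});
var slidingRetired = [];
var slidingWindow = createCountWindow(3, false, function (retired) {
  slidingRetired.push(retired);
});

for (var i = 1; i <= 7; i++) { tumblingWindow.add(i); }
for (var j = 1; j <= 5; j++) { slidingWindow.add(j); }

console.log(JSON.stringify(tumblingRetired), JSON.stringify(tumblingWindow.content()));
console.log(JSON.stringify(slidingRetired), JSON.stringify(slidingWindow.content()));
```

The tumbling window hands complete batches to the callback, whereas the sliding window retires one message at a time and keeps the most recent ones available for queries.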

Limits

Windows are defined by attaching Limits to a Memory.

onRetire Callback

When a Limit is reached, Messages will retire. To process these Messages, an onRetire callback needs to be registered at the Memory where the processing takes place:

JS
  // Callback function for onRetire to print the statistics
  function printStatistic(retired){
      // Generate statistics in the streams's log file
      stream.log().info("Item Statistics:");

      // Get and log the statistic data (items with summarized quantity in descending order)
      retired.group("ITEMNO").sum("QTY").sort("QTY").reverse().forEach(function(message){
        stream.log().info(message.property("ITEMNO").value().toString() + " = " +
                  message.property("QTY").value().toString());
      });
  }
  // Create itemstats Memory
  stream.create().memory("itemstats")
             .heap()
             .limit()
             .time()
             .tumbling()
             .seconds(10)
             .onRetire(printStatistic);

The callback can be registered either at the Memory itself or on one of the attached Limits. If the callback is registered at the Limit, it will be automatically registered at the Memory. So it is always a single callback, no matter where it will be registered.

The callback has one parameter - retired - where a new HeapMemory is passed that contains all retired Messages.

Event Time Processing

When a Message is added to a Memory, the Memory uses the current time as the store time and all time-based limits work on this time. This is called processing time.

However, if Messages should be processed according to the time at which the event occurred, this is called event time processing. In that case, the Message must contain a Property of type Long with the actual event time. If this Property is not set (e.g. it is part of the Message body), use onInput to extract and set it accordingly.

The event time Property name needs to be set at the Memory with:

JS
  stream.create().memory("mem").orderBy("evttime");

The Memory then orders the Messages by the values of the orderBy Property. This is also the case if the Messages arrive out of order but only within the current window. The first Message in the Memory is the oldest, the last Message is the youngest, based on event time.
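
The effect of ordering by event time can be illustrated with a plain JavaScript sketch. It does not use the Streams API: an array stands in for the Memory, and insertOrdered is a hypothetical helper. The sketch assumes that Messages with equal event times keep their arrival order.

```javascript
// Plain JavaScript model, not the Streams API: an array stands in for a Memory.
// insertOrdered is a hypothetical helper that keeps messages ordered by event time.
function insertOrdered(messages, message, orderByProp) {
  var pos = messages.length;
  // Walk back from the youngest message until the insert position is found
  while (pos > 0 && messages[pos - 1][orderByProp] > message[orderByProp]) {
    pos--;
  }
  messages.splice(pos, 0, message);
  return messages;
}

var mem = [];
var arrivals = [
  { id: "a", evttime: 1000 },
  { id: "c", evttime: 3000 },
  { id: "b", evttime: 2000 } // arrives late, but happened before "c"
];
arrivals.forEach(function (message) {
  insertOrdered(mem, message, "evttime");
});

console.log(mem.map(function (message) { return message.id; }).join(","));
```

Although "b" arrives last, it ends up between "a" and "c", so the first entry is the oldest and the last entry the youngest by event time.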

Session Windows (Memory Groups)

A session window groups the Messages of a stream that belong to a specific session.

The Message stream may consist of activities of different users, customers, hosts, etc. A Property of the Messages in the stream contains the value by which we want to group these Messages. The result is one Memory per distinct value, e.g. one Memory per distinct user if we group over a user id.

SwiftMQ Streams provides Memory Groups for this purpose. A Memory Group defines a group Property. Once a Message is added to a Memory Group, it takes care to create a new Memory for each distinct value of this Property and adds the Message to this Memory. So in the end, the stream of Messages is split into session windows (Memories). Further windows (tumbling, sliding) and onRetire callbacks can be defined for these Memories to process these Messages.

A Memory Group has one mandatory callback (MemoryCreateCallback) which is responsible to return a new Memory once it is called:

JS
  stream.create().memoryGroup("users", "userid").onCreate(function (key) {
      // A new value of the group is received and we create a Memory for it
      // with a 1 min tumbling time window
      stream.create().memory(key).heap().limit().time().tumbling().minutes(1).onRetire(function (retired) {
        // Do something with this window for this user id
      });

      // Return the new memory
      return stream.memory(key);
  });

New Messages are simply added to the Memory Group in onInput and the Memory Group takes care to create Memories for each distinct user id:

JS
  stream.create().input("useractivities").queue().onInput(function (input) {
    stream.memoryGroup("users").add(input.current());
  });
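
How a Memory Group splits the stream can be modelled in plain JavaScript. The sketch below does not use the Streams API: arrays stand in for Memories, and createMemoryGroup is a hypothetical helper. It only shows that the create callback runs once per distinct value of the group Property.

```javascript
// Plain JavaScript model, not the Streams API: arrays stand in for Memories.
// createMemoryGroup is a hypothetical helper; onCreate must return a new "Memory".
function createMemoryGroup(groupProp, onCreate) {
  var memories = new Map(); // group value -> Memory
  return {
    add: function (message) {
      var key = message[groupProp];
      if (!memories.has(key)) {
        // First message for this value: ask the callback for a new Memory
        memories.set(key, onCreate(key));
      }
      memories.get(key).push(message);
    },
    keys: function () {
      return Array.from(memories.keys());
    },
    memory: function (key) {
      return memories.get(key);
    }
  };
}

var created = [];
var users = createMemoryGroup("userid", function (key) {
  created.push(key);
  return [];
});

[
  { userid: "alice", action: "login" },
  { userid: "bob", action: "login" },
  { userid: "alice", action: "logout" }
].forEach(function (message) {
  users.add(message);
});

console.log(created.join(","), users.memory("alice").length);
```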

Timer and Check Limit

Limits of a Memory are automatically checked when a new Message is added to that Memory. So if you have a Count Limit defined, you don't need to do anything.

However, a Time Limit and a Time Unit Change Limit require that a Timer calls Memory.checkLimit() at an interval shorter than that of the Time Limit. For example, for the above itemstats Memory a Timer needs to be declared as follows:

JS
  // Create a timer that checks the limit of the itemstats memory every second
  stream.create().timer("stats").interval().seconds(1).onTimer(function (timer) {
      stream.memory("itemstats").checkLimit();
  });

Memory.checkLimit() also checks for the Inactivity Timeout.
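
Why the Timer matters for a time-based limit can be seen in a plain JavaScript model. It does not use the Streams API: createTimeTumblingWindow is a hypothetical helper, time is passed in explicitly as milliseconds, and, as a simplification, adding a message does not check the limit itself.

```javascript
// Plain JavaScript model of a time-based tumbling window, not the Streams API.
// createTimeTumblingWindow is a hypothetical helper; "now" is passed in explicitly.
function createTimeTumblingWindow(limitMillis, onRetire) {
  var entries = []; // { storeTime, message } in insertion order
  return {
    add: function (message, now) {
      entries.push({ storeTime: now, message: message });
    },
    checkLimit: function (now) {
      // Retire the whole window once the oldest message has reached the limit
      if (entries.length > 0 && entries[0].storeTime <= now - limitMillis) {
        var retired = entries.splice(0, entries.length).map(function (entry) {
          return entry.message;
        });
        onRetire(retired);
      }
    },
    size: function () {
      return entries.length;
    }
  };
}

var batches = [];
var itemstats = createTimeTumblingWindow(10000, function (retired) {
  batches.push(retired);
});

itemstats.add("m1", 0);
itemstats.add("m2", 4000);
itemstats.checkLimit(9000);  // oldest message is 9 s old: nothing retires
itemstats.add("m3", 9500);
itemstats.checkLimit(10000); // oldest message is 10 s old: the window retires

console.log(JSON.stringify(batches), itemstats.size());
```

In this model nothing retires unless checkLimit runs, so the interval of the Timer bounds how late a window can be processed.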

Memory Groups require a Timer to call MemoryGroup.checkLimit() to remove expired Memories from the Memory Group if a Group Inactivity Timeout is defined. MemoryGroup.checkLimit() also calls Memory.checkLimit() on all Memories of this Memory Group automatically:

JS
  // Create a timer that calls checkLimit on the memory group each minute
  stream.create().timer("queues").interval().minutes(1).onTimer(function (timer) {
      stream.memoryGroup("queues").checkLimit();
  });

Outputs

Output messages can be sent from the onTimer and/or onMessage callback to queues or topics.

An Output is created from the stream:

JS
// Queue Output
stream.create().output(outputQueue).queue();

// Topic Output
stream.create().output(outputTopic).topic();

Queue Output

Sends a message to a queue.

JS
  stream.output(outputQueue)
        .send(stream.create()
                .message()
                .textMessage()
                .persistent()
                .body(msg));

Topic Output

Sends a message to a topic.

JS
  stream.output(outputTopic)
        .send(stream.create()
                .message()
                .textMessage()
                .persistent()
                .body(msg));

Short living Outputs, Request/Reply

In a request/reply scenario where a script works as a service that sends replies, the received requests usually contain a replyTo address with a temporary queue name. In this case, the Outputs cannot be created in advance. Instead, they must be created in the onMessage callback, with null specified as the name of the Output. This marks the Output as short-living and prevents it from being registered in the Management Tree for the live Usage data.

The Output must be closed after the send has been performed.

To create the Output for a replyTo address, use the forAddress(..) method.

JS
  // A simple reply service
  stream.onMessage(function () {
    stream.create()
          .output(null)
          .forAddress(stream.current().replyTo())
          .send(stream.create()
                  .message()
                  .textMessage()
                  .correlationId(stream.current().messageId())
                  .body("RE: "+stream.current().body()))
          .close();
  });

Exceptions are suppressed when sending to a temporary queue so that the Stream is not stopped if the temporary queue no longer exists (since 10.2.0).

Purging unused Outputs

Since: 10.2.0

If a Stream sends to many different Outputs, these accumulate until they are closed. Outputs that have not been used for some time can be purged. This must be done timer-driven: all Outputs that were not used between 2 timer intervals are closed.

JS
  // Create the Timer to purge unused Outputs
  stream.create().timer("purger").interval().minutes(purgeInterval).onTimer(function (timer) {
      stream.purgeOutputs();
  });
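
One way to picture "not used between 2 timer intervals" is the following plain JavaScript model. It is not the Streams API and not necessarily how purgeOutputs is implemented: createOutputRegistry is a hypothetical helper that marks an Output on every send and, on each purge, closes the Outputs that carry no mark.

```javascript
// Plain JavaScript model, not the Streams API and not the actual implementation.
// createOutputRegistry is a hypothetical helper that tracks usage per Output name.
function createOutputRegistry() {
  var outputs = new Map(); // output name -> { used: boolean }
  return {
    send: function (name) {
      if (!outputs.has(name)) {
        outputs.set(name, { used: false });
      }
      outputs.get(name).used = true;
    },
    purge: function () {
      var closed = [];
      outputs.forEach(function (output, name) {
        if (output.used) {
          output.used = false; // used since the last purge: keep it, reset the mark
        } else {
          closed.push(name);   // idle for a full interval: close it
        }
      });
      closed.forEach(function (name) {
        outputs.delete(name);
      });
      return closed;
    },
    names: function () {
      return Array.from(outputs.keys());
    }
  };
}

var registry = createOutputRegistry();
registry.send("orders");
registry.send("invoices");
var firstPurge = registry.purge();  // both were used: nothing closes
registry.send("orders");
var secondPurge = registry.purge(); // "invoices" was idle for a full interval

console.log(JSON.stringify(firstPurge), JSON.stringify(secondPurge));
```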

Closing an Output dynamically

Since: 10.1.0

An Output, like every other component, is automatically closed when the Stream is stopped. An Output can also be closed explicitly anytime with:

JS
  stream.output(outputTopic).close();

JDBC

To enrich messages with data from existing databases or to validate messages, the JDBCLookup facade can be used to query any JDBC database.

Note: Before a JDBCLookup can be used, install the JDBC driver classes. This is done by copying them into the directory kernel/sys_streams. Then restart the router.

A JDBCLookup is created from the stream:

JS
stream.create().jdbcLookup("accounts")
      .driver("com.mysql.jdbc.Driver")
      .url("jdbc:mysql://localhost:3306/test")
      .username("root")
      .password("changeme")
      .connect();

Any SQL Select can be used to query the database:

JS
var accounts = stream.jdbcLookup("accounts")
                 .query("select * from accounts");

It returns a HeapMemory which can be used to access the data in the onMessage callback. The returned Memory contains entries of the type Message for each selected row. For each row-column, a message property is created with the same name and the column value in the corresponding Java types.

If the query result is not large, it can be saved in a HeapMemory, indexed, and used in the onMessage callback:

JS
stream.create().jdbcLookup("accounts")
               .driver("com.mysql.jdbc.Driver")
               .url("jdbc:mysql://localhost:3306/test")
               .username("root")
               .password("changeme")
               .connect();
var accounts = stream.jdbcLookup("accounts")
                     .query("select * from accounts")
                     .createIndex("accountid");
// We don't need a JDBC connection anymore so close it to save resources
stream.jdbcLookup("accounts").close();

// Define onMessage
stream.onMessage(function () {
  // Do something
  var discount = accounts.index("accountid").get(ACCOUNTID)
                         .first()
                         .property("discount")
                         .value().toDouble();
  if (discount > 0)
  {
    // Do something else
  }
});

A JDBCLookup can also be queried directly in onMessage:

JS
// Create the JDBCLookup
stream.create().jdbcLookup("accounts")
               .driver("com.mysql.jdbc.Driver")
               .url("jdbc:mysql://localhost:3306/test")
               .username("root")
               .password("changeme")
               .connect();

// Define onMessage
stream.onMessage(function () {
  // Do something
  var discount = stream.jdbcLookup("accounts")
                   .query("select discount from accounts where accountid = "+ACCOUNTID)
                   .first()
                   .property("discount")
                   .value().toDouble();
  if (discount > 0)
  {
    // Do something else
  }
});

eMail

To send eMails (e.g. as an alert to someone), a Mail Server must be created. This takes place only once for each different Mail Server. This Mail Server is then used to send eMails.

JS
stream.create()
      .mailserver("localhost")
      .username("admin@company.com")
      .password("secret")
      .connect();

To send an eMail, use a predefined MailServer, configure the eMail properties and send it.

JS
stream.mailserver("localhost")
      .email()
      .from("tempmonitor@swiftmq.com")
      .to("admin@company.com")
      .bcc("boss@company.com")
      .subject("Nuclear Powerplant Monitor - CRITICAL!")
      .body("Temp spike detected, last temps=" + stream.memory("critical").values("temp"))
      .send();

Closing a Mail Server dynamically

Since: 10.1.0

A Mail Server, like every other component, is automatically closed when the Stream is stopped. A Mail Server can also be closed explicitly anytime with:

JS
  stream.mailserver("localhost").close();

Time Support

Since: 10.2.0

The Streams Interface provides a convenience class TimeSupport with various time methods, e.g. getting the current time, formatting and parsing a time, and getting the second, minute, hour, day, week, month, or year from a time. This class is accessible from Stream scripts via the variable time.

Examples:

JS
print("Second: "+time.second());
print("Minute: "+time.minute());
print("Minute-10: "+time.minute(time.currentTime(),-10));
print("Minute+10: "+time.minute(time.currentTime(),10));
print("Hour: "+time.hour());
print("Hour-5: "+time.hour(time.currentTime(),-5));
print("Hour+5: "+time.hour(time.currentTime(),5));
print("Day: "+time.day());
print("Day-3: "+time.day(time.currentTime(),-3));
print("Day+10: "+time.day(time.currentTime(),10));
print("Day Display: "+time.dayDisplayName());
print("Day Display-3: "+time.dayDisplayName(time.currentTime(),-3));
print("Day Display+10: "+time.dayDisplayName(time.currentTime(),10));
print("Week: "+time.week());
print("Week-10: "+time.week(time.currentTime(),-10));
print("Week+45: "+time.week(time.currentTime(),45));
print("Month: "+time.month());
print("Month-10: "+time.month(time.currentTime(),-10));
print("Month+16: "+time.month(time.currentTime(),16));
print("Month Display: "+time.monthDisplayName());
print("Month Display-10: "+time.monthDisplayName(time.currentTime(),-10));
print("Month Display+16: "+time.monthDisplayName(time.currentTime(),16));
print("Year: "+time.year());
print("Format: "+time.format(time.currentTime(), "dd/MM/yyyy"));
print("Parse: "+time.parse(time.format(time.currentTime(), "dd/MM/yyyy"), "dd/MM/yyyy"));

Output:

CODE
Second: 29
Minute: 10
Minute-10: 0
Minute+10: 20
Hour: 11
Hour-5: 6
Hour+5: 16
Day: 4
Day-3: 1
Day+10: 14
Day Display: Tuesday
Day Display-3: Saturday
Day Display+10: Friday
Week: 14
Week-10: 4
Week+45: 7
Month: 3
Month-10: 5
Month+16: 7
Month Display: April
Month Display-10: June
Month Display+16: August
Year: 2017
Format: 04/04/2017
Parse: 1491256800000

Stream Scripts

General Structure of a Script

The general structure of a script is:

  • Parse parameters.

  • Create Memories and define onRetire callbacks.

  • Create a Mail Server if you want to send eMails.

  • Create JDBC Lookups if you want to select data from databases.

  • Create Inputs and define onInput callbacks.

  • Create Timers and define onTimer callbacks.

  • Define onMessage callback.

Note: The following code snippets are from the sample "Nuclear Powerplant Monitor".

Parse Parameters

Any number of parameters can be configured in the Stream configuration. They are passed to the script as a java.util.Map in the script-global variable parameters.

JS
  var inputQueue = parameters.require("input-queue");
  var mailHost = parameters.optional("mail-host", "localhost");
  var mailUser = parameters.require("mail-user");
  var mailPassword = parameters.require("mail-password");
  var mailFrom = parameters.require("mail-from");
  var mailTo = parameters.require("mail-to");
  var mailBcc = parameters.require("mail-bcc");

Create Memories

Create all Memories you want to use and define onRetire callbacks, if necessary.

JS
  // Create 3 Memories, one for monitoring (all), one to detect warning, one to detect critical conditions
  stream.create().memory("all").heap().limit().time().tumbling().seconds(10).onRetire(function (retired) {
      stream.log().info("AVERAGE TEMP LAST 10 SECS=" + retired.average("temp") +
          ", MIN=" + retired.min("temp").property("temp").value().toObject() +
          ", MAX=" + retired.max("temp").property("temp").value().toObject());
      stream.log().info(retired.values("temp"));
  });
  stream.create().memory("warning").heap().limit().count(2).sliding();
  stream.create().memory("critical").heap().limit().count(4).sliding();

Create Mail Server

If you want to send eMails in your script, you need to create a mail server.

JS
  // Create a mail server to send critical mails
  stream.create().mailserver(mailHost).username(mailUser).password(mailPassword).connect();

Create Inputs and define onInput Callbacks

Create all Inputs and define onInput callbacks, if needed.

JS
  // Create the Inputs
  stream.create().input(inputQueue).queue().onInput(function (input) {
      // We need a property "temp" instead of "TEMP"
      input.current().property("temp").set(input.current().property("TEMP").value().toInteger());
  });

Create Timers and define onTimer Callbacks

Create all Timers and define their onTimer callbacks.

JS
  // Create a timer to trigger the retirement of the "all" memory every 10 secs
  stream.create().timer("monitor").interval().seconds(10).onTimer(function (timer) {
      stream.memory("all").checkLimit();
  });

Define onMessage Callback (deprecated)

Note: An onMessage callback can be defined to have the whole processing in this callback. However, the best practice to process messages is to use onInput to add them to Memories, define the appropriate Limits (Windows) on the Memory, and process the Messages in the Memory's onRetire callback.

Define the onMessage callback that is invoked when a message is processed.

JS
  stream.onMessage(function () {

    // Store the message in all 3 memories
    stream.memory("all").add(stream.current());
    stream.memory("warning").add(stream.current());
    stream.memory("critical").add(stream.current());

    // Log a warning if the last 2 temps were higher than 100 degrees
    if (stream.memory("warning").select("temp > 100").size() == 2)
      stream.log().warning("LAST 2 TEMPS > 100!");

    // Spike condition is:
    // At least 4 messages in memory,
    // the first is more than 400 degrees,
    // each temp is greater than the temp before (ascending series),
    // the last temp is more than 1.5 times higher than the first.
    if (stream.memory("critical").size() == 4 &&
        stream.memory("critical").first().property("temp").value().toDouble() > 400 &&
        stream.memory("critical").ascendingSeries("temp") &&
        stream.memory("critical").last().property("temp").value().toDouble() >
        stream.memory("critical").first().property("temp").value().toDouble() * 1.5) {

      // Log an error
      stream.log().error("WE HAVE A SPIKE!!!");

      // Send an alert mail
      stream.mailserver(mailHost)
        .email()
          .from(mailFrom)
          .to(mailTo)
          .bcc(mailBcc)
          .subject("Nuclear Powerplant Monitor - CRITICAL!")
          .body("Temp spike detected, last temps=" + stream.memory("critical").values("temp"))
        .send();
    }

  });

Dependencies

Since: 11.0.0

If a Stream depends on other Streams to work properly, e.g. a service Stream must be started before requests from other Streams can be sent, dependencies can be configured:

XML
<stream name="partitionprobe" enabled="true" script-file="/opt/swiftmq_1_0_0_dashboard/streams/partitionprobe.js">
  <dependencies>
    <dependency name="swiftmq.system.streamregistry"/>
  </dependencies>
  <parameters>
    <parameter name="unit" value="GB"/>
  </parameters>
</stream>

In the above example, Stream swiftmq.system.streamregistry will be automatically started (if not running) before Stream partitionprobe is started. When Stream swiftmq.system.streamregistry is stopped, Stream partitionprobe is stopped first.

Multiple dependencies can be configured, each fully qualified as domain.package.stream. The Streams Swiftlet automatically builds a dependency graph internally and detects circular references, so only the direct dependencies need to be specified.
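
The idea of deriving a start order from direct dependencies, including the detection of circular references, can be sketched in plain JavaScript. This is not the Swiftlet's implementation: startOrder is a hypothetical helper based on a depth-first traversal, and the Stream names are made up for illustration.

```javascript
// Plain JavaScript sketch, not the Streams Swiftlet's implementation.
// startOrder is a hypothetical helper: it lists the Streams in the order in which
// they would have to be started, and throws if it finds a circular reference.
function startOrder(dependencies, name) {
  var order = [];
  var state = Object.create(null); // stream name -> "visiting" | "done"
  function visit(current, path) {
    if (state[current] === "done") {
      return;
    }
    if (state[current] === "visiting") {
      throw new Error("Circular dependency: " + path.concat(current).join(" -> "));
    }
    state[current] = "visiting";
    (dependencies[current] || []).forEach(function (dependency) {
      visit(dependency, path.concat(current));
    });
    state[current] = "done";
    order.push(current); // all dependencies are already in the list
  }
  visit(name, []);
  return order;
}

// Hypothetical Streams; only direct dependencies are listed
var dependencies = {
  "demo.shop.report": ["demo.shop.orders", "demo.shop.registry"],
  "demo.shop.orders": ["demo.shop.registry"]
};
var order = startOrder(dependencies, "demo.shop.report");
console.log(order.join(", "));

var circular = {
  "demo.a.x": ["demo.a.y"],
  "demo.a.y": ["demo.a.x"]
};
try {
  startOrder(circular, "demo.a.x");
} catch (e) {
  console.log(e.message);
}
```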

Best Practice

Here are some best practices to organize your Streams. We use the warehouse sample.

1. Keep your Streams in a separate Directory outside the SwiftMQ Distribution

Store all Streams that belong together as a package or application in a separate directory.

2. Upload your Stream Scripts into the Stream Repository

Each SwiftMQ Router provides a Stream repository where you can upload your Stream scripts to a named repository. This is much easier than copying the scripts to each Router's host.

Go to the SwiftMQ Client's script directory and use the repojms command to upload the Stream scripts of your application:

CODE
  ./repojms smqp://localhost:4001 ConnectionFactory admin secret router1 \
        add warehouse /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams js
  accounting.js added to repository warehouse
  customer.js added to repository warehouse
  ordermonitor.js added to repository warehouse
  producer.js added to repository warehouse
  reordermonitor.js added to repository warehouse
  shipper.js added to repository warehouse
  shippingmonitor.js added to repository warehouse
  store.js added to repository warehouse

Also, upload all Stream scripts you depend on (in this example the base Streams of SwiftMQ Dashboard):

CODE
  ./repojms smqp://localhost:4001 ConnectionFactory admin secret router1 \
        add dashboard /opt/swiftmq_1_0_0_dashboard/streams js
  aggregator.js added to repository dashboard
  correlator.js added to repository dashboard
  entitylistcountprobe.js added to repository dashboard
  history.js added to repository dashboard
  inspector.js added to repository dashboard
  join.js added to repository dashboard
  mgmtinspector.js added to repository dashboard
  monitorpanel.js added to repository dashboard
  propertyprobe.js added to repository dashboard
  propertysumprobe.js added to repository dashboard
  thresholdmonitor.js added to repository dashboard
  timeline.js added to repository dashboard
  top.js added to repository dashboard

Repeat this for all Routers where you want to use your application (just change the router name in the above commands).

3. Provide an install.cli Script

For the fresh installation of your package, provide an install.cli script that defines all your Streams.

You can use a variable ${routername} instead of a static router name in your script. The variable is then automatically substituted with the router name from the last sr CLI command. So you can install your package on every router you want without touching the CLI script.
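
The effect of the substitution can be illustrated with a plain JavaScript sketch. CLI performs the substitution itself; substituteRouterName is only a hypothetical helper that shows what a script line looks like after the variable has been replaced.

```javascript
// Plain JavaScript illustration; CLI performs this substitution on its own.
// substituteRouterName is a hypothetical helper that replaces every ${routername}.
function substituteRouterName(line, routerName) {
  return line.split("${routername}").join(routerName);
}

var line = "new accounting-topic value stream_${routername}_dashboard_warehouse_accounting";
var resolved = substituteRouterName(line, "router1");
console.log(resolved);
```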

Set the script-file property to the URL of the repository in the form:

CODE
  repository:/<reponame>/<filename>

Excerpt from the install.cli:

CODE
  cc /sys$streams/domains
  new dashboard
  cc /sys$streams/domains/dashboard/packages
  new warehouse

  cc /sys$streams/domains/dashboard/packages/warehouse/streams
  new accounting script-file "repository:/warehouse/accounting.js"
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting/parameters
  new accounting-topic value stream_${routername}_dashboard_warehouse_accounting
  new customer-invoice-topic value stream_${routername}_dashboard_warehouse_customer_invoice
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting/dependencies
  new dashboard.warehouse.store
  ...

Resulting configuration in the routerconfig.xml:

XML
  <stream name="accounting" enabled="true" script-file="repository:/warehouse/accounting.js">
    <dependencies>
      <dependency name="dashboard.warehouse.store"/>
    </dependencies>
    <parameters>
      <parameter name="accounting-topic" value="stream_router1_dashboard_warehouse_accounting"/>
      <parameter name="customer-invoice-topic" value="stream_router1_dashboard_warehouse_customer_invoice"/>
    </parameters>
  </stream>

To invoke install.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:

CODE
  bigmac$ ./cli

  Welcome to SwiftMQ!

  Username:
  Password:
  Trying to connect ... connected
  Router 'router1' is available for administration.
  Router 'router2' is available for administration.
  Type 'help' to get a list of available commands.
  > sr router1
  router1> execute /opt/swiftmq_1_0_0_dashboard/app/warehouse/streams/install.cli
  ...
  router1> sr router2
  router2> execute /opt/swiftmq_1_0_0_dashboard/app/warehouse/streams/install.cli
  ...

The above example installs the warehouse dashboard app on router1 and router2.

4. Provide an enable.cli Script

To enable (start) your Streams, provide an enable.cli script:

CODE
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/store
  set enabled true
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting
  set enabled true
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/trackingpanel
  set enabled true
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/ordermonitor
  set enabled true
  ...
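
Because enable.cli and disable.cli differ only in the value passed to set enabled, both can be generated from one list of stream names. The following is a minimal sketch in plain JavaScript, meant to be run outside the Router (e.g. with Node.js). The function name buildToggleScript is hypothetical; the contexts it emits follow the pattern of the script above.

```javascript
// Hypothetical helper: builds the text of an enable.cli or disable.cli script.
function buildToggleScript(domain, pkg, streams, enabled) {
  const lines = [];
  for (const name of streams) {
    // Change into the context of the stream, then set its enabled property.
    lines.push(`cc /sys$streams/domains/${domain}/packages/${pkg}/streams/${name}`);
    lines.push(`set enabled ${enabled ? "true" : "false"}`);
  }
  return lines.join("\n") + "\n";
}

// The order of the list is kept, so streams are listed in the order they should start.
const streams = ["store", "accounting", "trackingpanel", "ordermonitor"];
console.log(buildToggleScript("dashboard", "warehouse", streams, true));
```

Passing false as the last argument produces the matching disable.cli content.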

To invoke enable.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:

CODE
  bigmac$ ./cli

  Welcome to SwiftMQ!

  Username:
  Password:
  Trying to connect ... connected
  Router 'router1' is available for administration.
  Router 'router2' is available for administration.
  Type 'help' to get a list of available commands.
  > sr router1
  router1> execute /opt/swiftmq_1_0_0_dashboard/app/warehouse/streams/enable.cli
  ...
  router1> sr router2
  router2> execute /opt/swiftmq_1_0_0_dashboard/app/warehouse/streams/enable.cli
  ...

Your Streams are now running.

5. Provide a disable.cli Script

To disable (stop) your Streams, provide a disable.cli script:

CODE
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/store
  set enabled false
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/accounting
  set enabled false
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/trackingpanel
  set enabled false
  cc /sys$streams/domains/dashboard/packages/warehouse/streams/ordermonitor
  set enabled false
  ...

To invoke disable.cli, go to the SwiftMQ Client's script directory and use CLI's execute command:

CODE
  bigmac$ ./cli

  Welcome to SwiftMQ!

  Username:
  Password:
  Trying to connect ... connected
  Router 'router1' is available for administration.
  Router 'router2' is available for administration.
  Type 'help' to get a list of available commands.
  > sr router1
  router1> execute /opt/swiftmq_1_0_0_dashboard/app/warehouse/streams/disable.cli
  ...
  router1> sr router2
  router2> execute /opt/swiftmq_1_0_0_dashboard/app/warehouse/streams/disable.cli
  ...

Your Streams are now stopped.

6. Provide a remove.cli Script

To completely remove your Streams from a Router, provide a remove.cli script. This script should remove all Streams and all resources that your Streams may have created such as queues (e.g. as a persistent store for Memories).

CODE
  cc /sys$streams/domains/dashboard/packages
  delete warehouse

  cc /sys$queuemanager/queues
  delete state_dashboard_warehouse_custstat
  delete state_dashboard_warehouse_shipstat
  delete state_dashboard_warehouse_productstat
  delete state_dashboard_warehouse_ordertimeline
  delete state_dashboard_warehouse_shipmenttimeline
  delete state_dashboard_warehouse_producertimeline
  delete state_dashboard_warehouse_trackingpanel
  delete streams_dashboard_warehouse_custstat
  delete streams_dashboard_warehouse_shipstat
  delete streams_dashboard_warehouse_productstat
  delete streams_dashboard_warehouse_ordertimeline
  delete streams_dashboard_warehouse_ordertimeline_active
  delete streams_dashboard_warehouse_shipmenttimeline
  delete streams_dashboard_warehouse_shipmenttimeline_active
  delete streams_dashboard_warehouse_producertimeline
  delete streams_dashboard_warehouse_producertimeline_active
  delete streams_dashboard_warehouse_orderprobehistory
  delete streams_dashboard_warehouse_orderstatechainhistory
  delete streams_dashboard_warehouse_trackingpanel_input
  delete streams_dashboard_warehouse_trackingpanel_store
  save

To invoke remove.cli, go to the SwiftMQ Router's script directory and use CLI's execute command:

CODE
  bigmac$ ./cli

  Welcome to SwiftMQ!

  Username:
  Password:
  Trying to connect ... connected
  Router 'router1' is available for administration.
  Router 'router2' is available for administration.
  Type 'help' to get a list of available commands.
  > sr router1
  router1> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/remove.cli
  ...
  router1> sr router2
  router2> execute /opt/swiftmq_1_0_0_dashboard/apps/warehouse/streams/remove.cli
  ...

The above example removes the warehouse dashboard application including all resources from router1 and router2.

7. Optionally Provide an upgrade.cli Script

If you want to upgrade your package automatically, e.g. because Streams have been added, changed, or removed, provide an upgrade.cli script that performs the upgrade but preserves your data. Execute it in the same manner as above.

Advanced Configuration

Adding Streams to existing Queues and Topics

Wire Tapping

Since 10.2.0

You can wiretap any queue by adding a QueueWireTapInput to it. This is fully dynamic and doesn't require any configuration changes. Once a WireTap has been added to a queue, a copy of each message sent to this queue is also sent to the WireTap. A queue can have multiple WireTaps, and each receives its own copy of the message, so any Stream can register its own WireTap with an optional message selector.

A WireTap can have multiple subscribers (Streams), which makes it possible to scale up the processing of messages sent to the WireTap. Messages are distributed to the subscribers of the WireTap in a round-robin fashion.

Note: A WireTap processes messages outside of a transaction via memory buffering. If a subscriber is too slow and the memory buffer is full, messages may not be delivered to it. A WireTap is therefore only suitable if no transactional consistency is required and message loss can be tolerated, e.g. in statistics scenarios. For stricter requirements, see the next section.
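
The delivery behavior of a WireTap can be pictured with a small simulation. This is only a conceptual sketch in plain JavaScript, not SwiftMQ's implementation: the class name WireTapModel, the use of one buffer per subscriber, and the buffer capacity are assumptions made for illustration.

```javascript
// Conceptual model only: round-robin distribution with bounded, lossy buffers.
class WireTapModel {
  constructor(subscriberCount, bufferCapacity) {
    // One in-memory buffer per subscriber (an assumption of this sketch).
    this.buffers = Array.from({ length: subscriberCount }, () => []);
    this.capacity = bufferCapacity;
    this.next = 0;     // index of the subscriber that gets the next copy
    this.dropped = 0;  // copies discarded because a buffer was full
  }

  // Called for every message copy sent to the WireTap.
  offer(message) {
    const buffer = this.buffers[this.next];
    this.next = (this.next + 1) % this.buffers.length;
    if (buffer.length >= this.capacity) {
      this.dropped++;  // no transaction is involved, so the copy is simply lost
      return false;
    }
    buffer.push(message);
    return true;
  }
}

// Two slow subscribers that never drain their buffers of capacity 2.
const tap = new WireTapModel(2, 2);
for (let i = 1; i <= 6; i++) tap.offer("msg-" + i);
console.log(tap.buffers, "dropped:", tap.dropped);
```

In this run the first four copies are buffered alternately and the last two are dropped, which is the kind of loss a statistics scenario has to tolerate.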

Composite Queue, Topic Subscriber

If your data streams are based on topics, simply add a stream as another subscriber to that topic. If they are queue-based, convert the queue to a composite queue and bind 2 new queues to it: one as the original destination queue and one as the Input for your stream.

Connecting Streams together

Streams can be connected very easily via queues or topics. This is common when a large task is split into several smaller tasks that are each implemented as a stream. One stream outputs its results into a queue, where another stream picks them up and writes its own result into another queue. This repeats until the final result is produced.
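
The chaining idea can be sketched without a Router. In this conceptual model in plain JavaScript, arrays stand in for queues and functions stand in for streams; the helper runStage and the order fields are hypothetical and only illustrate how each stage consumes from one queue and produces into the next.

```javascript
// Conceptual model: each "queue" is an array, each "stream" a function that
// drains its input queue and appends its results to its output queue.
function runStage(inputQueue, outputQueue, transform) {
  while (inputQueue.length > 0) {
    outputQueue.push(transform(inputQueue.shift()));
  }
}

const orders = [
  { item: "A", qty: 2, price: 5 },
  { item: "B", qty: 1, price: 8 },
];
const priced = [];
const invoices = [];

// Stage 1 computes a total per order, stage 2 formats the final result.
runStage(orders, priced, (o) => ({ ...o, total: o.qty * o.price }));
runStage(priced, invoices, (o) => `${o.item}: ${o.total}`);
console.log(invoices);
```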

Scaling Streams locally and in Router Networks

Streams are single-threaded. Therefore the throughput of a single stream is somewhat limited and depends highly on the amount of analysis a stream has to perform. However, the same stream can run in multiple instances in parallel, consuming from the same queues/topics. This results in more threads and more throughput.

To run a stream in multiple instances, all queue names that are used as stores for permanent Memories must be passed as parameters, so that each instance can be given its own store queues.

Streams can be horizontally scaled on the same Router like this:

And, of course, in a Router network:

Using Streams with SwiftMQ HA Router

If your stream consumes persistent messages, stores them in permanent Memories and sends persistent messages to Outputs, you certainly need transactional consistency in case of a failover.

To guarantee this, you need to enable Multi-Queue Transaction Global Lock in the Queue Manager Swiftlet:

It ensures that consumption, storing, and production in a stream are logged as one single transaction, so that data is neither lost nor processed twice.

Managing Streams

To have a clear logical separation between Streams, we have introduced a new structure in SwiftMQ 10.1.0. It consists of domains, packages, and inside packages the Stream declaration itself.

Create a Domain

A domain is the top level of the Stream structure. It is usually a company name. The domain name swiftmq is reserved.

Go to Streams Swiftlet, select Domains, and click Create new Entity ...:

Create a Package

A domain can have an unlimited number of packages. A package is a logical unit to define Streams that belong together.

Go to the newly created domain, expand it and select Packages, and click Create new Entity ...:

Create a Stream

Go to the newly created package, expand it and select Streams, and click Create new Entity ...:

| Property Name | Purpose |
| --- | --- |
| Name | A unique name for this Stream. |
| Enabled | Check it to start the Stream; uncheck it to stop it. |
| Restart Delay | Specifies the delay in milliseconds between Stream restarts. -1 disables it. |
| Maximum Restarts | Specifies the maximum number of restarts if a Stream throws an exception. -1 disables it. |
| Script File | Absolute path and name of the script file. |
| Script Language | The name of the language. See the info.log of the Streams Swiftlet for the available engines and their names. For example, if you use Jython, use "python" as the name. |

If the stream has parameters, expand the new stream, select Parameters, click Create new Entity ... and add the parameters.

Start/Stop a Stream

A stream is started if the Enabled property is checked.

In case there is an exception, please have a look at the stream's log file in the Log Sink Directory (see Log Swiftlet).

Note: All queues used by the stream (e.g. input queues, queues for permanent Memories) must be created separately before starting the script.

A stream is stopped if the Enabled property is unchecked.

If you have changed the script code, just disable and enable the script to get the new version.

Delete a Stream

To delete a stream, go to the stream entity, select it, and click Delete Entity.

Stream Monitoring

After a Stream has been enabled, a new entity is created under the Usage node of the Streams Swiftlet. This entity contains the live usage data of the stream, such as the time in microseconds that the last onMessage callback took, the stream processing rate in messages per second, and the total number of messages processed by the stream.

System Streams

Mail

Purpose

The Mail Stream receives TextMessages from an input queue, converts each one to an eMail, sets the default from, to, cc, bcc, and subject, and sends the eMail to the configured mail server as plain text mail.

All Monitor Streams have a parameter mailout-queue. If set, they send their states as eMail to the corresponding recipients.

The intention behind the Mail Stream is to concentrate the mail configuration in one place instead of repeating it in each Monitor Stream. There can be multiple instances of the Mail Stream, each with a different configuration, so Monitor Streams just send their state to a queue. Other applications can of course use the Mail Streams too.

Location

| Domain | Package | Stream |
| --- | --- | --- |
| swiftmq | mail | mailout.js |

Parameters

| Name | Mandatory | Default | Description |
| --- | --- | --- | --- |
| input-queue | No | streams_mailout | Input queue to receive mail messages. The queue is automatically created if it does not exist. |
| servername | No | localhost | The hostname of the mail server. |
| username | Yes | None | The username of the mail account. |
| password | Yes | None | The password of the mail account. |
| default-from | No | None | Default "From" address. |
| default-to | No | None | Default "To" address. |
| default-cc | No | None | Default "CC" address. |
| default-bcc | No | None | Default "BCC" address. |
| default-subject | No | None | Default subject. |

Message Scheduler

Purpose

The Message Scheduler Stream handles message schedules sent from JMS or AMQP clients when a message should be delivered to a destination (queue or topic) with a delay. The Stream understands the message schedule properties of the Scheduler Swiftlet's Message Scheduler and can be used as a drop-in replacement for it. It can handle many more schedules and is more scalable.

Location

| Domain | Package | Stream |
| --- | --- | --- |
| swiftmq | scheduler | messagescheduler.js |

Parameters

| Name | Mandatory | Default | Description |
| --- | --- | --- | --- |
| input-queue | No | streams_scheduler_input | Input queue to receive message schedule requests. The queue is automatically created if it does not exist. |
| store-queue | No | streams_scheduler_store | Store queue to store message schedule requests. The queue is automatically created if it does not exist. |
| delay-format | No | dd.MM.yyyy HH:mm:ss | Date/Time format to parse the delay out of the schedule requests. |
| interval | No | 10 | Timer interval in seconds in which the schedules are checked for expiration. |
| purge-interval | No | 2 | Timer interval in minutes after which unused Outputs are purged. |
| max-batch-size | No | 10000 | Maximum number of expired schedules to be sent to their destinations in one batch. |

Scheduling Messages from Clients

When a client (JMS or AMQP) receives a message but cannot process it yet, it usually wants to redeliver the message sometime later. For this, it creates a message schedule by adding properties to the message specifying the delivery time and sends it to the Message Scheduler Stream's input queue. The Stream stores the schedule and delivers the very same message to the destination at the time specified in the schedule properties.

The Stream understands 2 flavors of scheduling properties: New flavor and Scheduler Swiftlet flavor.

New Flavor

| Property | Meaning |
| --- | --- |
| streams_scheduler_delay | The date and time when the message should be delivered. The format is specified in the Stream's parameter "delay-format". |
| streams_scheduler_destination | The name of the destination where the message should be delivered to. |
| streams_scheduler_destination_type | The type of the destination (queue or topic). |
| streams_scheduler_expiration | Optional. The final message expiration in milliseconds. This value is set on each message sent as the JMSExpiration. |
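
As an illustration of the New flavor, the following minimal sketch in plain JavaScript computes the property names and values for one schedule. It assumes the default delay-format dd.MM.yyyy HH:mm:ss and that the client and the Router interpret the timestamp in the same time zone. The helpers formatDelay and newFlavorSchedule are hypothetical; a real client would set the returned entries as message properties through its JMS or AMQP API before sending the message to the input queue.

```javascript
// Formats a Date as dd.MM.yyyy HH:mm:ss (the default "delay-format"),
// using the local time zone of the machine running the code.
function formatDelay(date) {
  const p = (n) => String(n).padStart(2, "0");
  return `${p(date.getDate())}.${p(date.getMonth() + 1)}.${date.getFullYear()} ` +
         `${p(date.getHours())}:${p(date.getMinutes())}:${p(date.getSeconds())}`;
}

// Hypothetical helper: returns the schedule properties as a plain object.
function newFlavorSchedule(deliverAt, destination, destinationType, expiration) {
  const props = {
    streams_scheduler_delay: formatDelay(deliverAt),
    streams_scheduler_destination: destination,
    streams_scheduler_destination_type: destinationType,
  };
  if (expiration !== undefined) {
    // Optional property, passed through unchanged in this sketch.
    props.streams_scheduler_expiration = expiration;
  }
  return props;
}

console.log(newFlavorSchedule(new Date(2030, 0, 15, 9, 5, 0), "orders", "queue"));
```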

Scheduler Swiftlet Flavor

Scheduler Swiftlet flavor is supported for the most relevant parts:

| Property | Meaning |
| --- | --- |
| JMS_SWIFTMQ_SCHEDULER_DATE_FROM | The date when the message should be delivered. The format is yyyy-MM-dd or 'now'. |
| JMS_SWIFTMQ_SCHEDULER_TIME_EXPRESSION | The delivery time. Either "at HH:mm" or "at HH:mm:ss". |
| JMS_SWIFTMQ_SCHEDULER_DESTINATION | The name of the destination where the message should be delivered to. |
| JMS_SWIFTMQ_SCHEDULER_DESTINATION_TYPE | The type of the destination (queue or topic). |
| JMS_SWIFTMQ_SCHEDULER_EXPIRATION | Optional. The final message expiration in milliseconds. This value is set on each message sent as the JMSExpiration. |
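
The Scheduler Swiftlet flavor splits the delivery time into a date and a time expression. The following minimal sketch in plain JavaScript shows how the two values relate to one point in time; the helper name swiftletFlavorSchedule is hypothetical, and the sketch always uses the "at HH:mm:ss" form in the local time zone.

```javascript
// Hypothetical helper: builds the Scheduler Swiftlet flavor properties
// for a single delivery time.
function swiftletFlavorSchedule(deliverAt, destination, destinationType) {
  const p = (n) => String(n).padStart(2, "0");
  const date = `${deliverAt.getFullYear()}-${p(deliverAt.getMonth() + 1)}-${p(deliverAt.getDate())}`;
  const time = `${p(deliverAt.getHours())}:${p(deliverAt.getMinutes())}:${p(deliverAt.getSeconds())}`;
  return {
    JMS_SWIFTMQ_SCHEDULER_DATE_FROM: date,                // yyyy-MM-dd
    JMS_SWIFTMQ_SCHEDULER_TIME_EXPRESSION: `at ${time}`,  // at HH:mm:ss
    JMS_SWIFTMQ_SCHEDULER_DESTINATION: destination,
    JMS_SWIFTMQ_SCHEDULER_DESTINATION_TYPE: destinationType,
  };
}

console.log(swiftletFlavorSchedule(new Date(2030, 0, 15, 9, 5, 0), "orders", "queue"));
```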

How to scale this Stream

One instance of the Stream is able to handle thousands of schedules with ease. If that is not enough, the Stream can be scaled as follows:

Multiple Instances on the same Input Queue

With this option, you create multiple instances of the Stream and specify the same "input-queue" parameter but for each instance a different "store-queue". The Streams act as concurrent consumers on the input queue but then work independently of each other in separate threads. This gives more throughput.
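
The parameter sets for such instances can be derived mechanically. The following is a minimal sketch in plain JavaScript; the helper instanceParameters and the numeric suffix used for the store queue names are assumptions, and any naming scheme works as long as every instance gets its own "store-queue".

```javascript
// Hypothetical helper: parameter sets for n instances that share one input
// queue but each use their own store queue.
function instanceParameters(n, inputQueue, storeQueuePrefix) {
  return Array.from({ length: n }, (_, i) => ({
    "input-queue": inputQueue,
    "store-queue": `${storeQueuePrefix}_${i + 1}`,
  }));
}

console.log(instanceParameters(3, "streams_scheduler_input", "streams_scheduler_store"));
```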

Multiple Instances on different Input Queues

With this option, you create multiple instances of the Stream and specify a different "input-queue" parameter and a different "store-queue" for each instance. This may be useful if a dedicated Stream should handle schedules for dedicated applications. In this case, each application needs to send its schedules to the input queue of its dedicated Stream.

Multiple Instances on the same and different Input Queues

This is a mix of the above options with dedicated Streams where some of them might have multiple instances listening on the same input queue.

Route Announcer

Stream Name

CODE
  stream_{routername}_routeannouncer

Purpose

The Stream compares all static routes (sys$routing/static-routes) with all active routes (sys$routing/usage/routing-table). When a static route has no corresponding active route in the routing table, the remote Router is considered unreachable. When a remote Router reconnects and provides an active route, it becomes reachable again. In both cases, the Stream sends an event to the Stream's topic.

If a remote Router should not be observed, the corresponding static route needs to be deleted. And, vice versa, if it should be observed, a static route needs to be created.
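
The comparison of static and active routes amounts to a set difference. The following conceptual sketch in plain JavaScript illustrates it; it is not the code of routeannouncer.js, and the function name detectRouteChanges and the names of the returned fields are hypothetical.

```javascript
// Conceptual sketch: compares configured static routes with the active
// routing table and reports routers whose reachability changed.
function detectRouteChanges(staticRoutes, activeRoutes, previouslyUnreachable) {
  const active = new Set(activeRoutes);
  const before = new Set(previouslyUnreachable);
  // A static route without an active route means the router is unreachable.
  const unreachable = staticRoutes.filter((router) => !active.has(router));
  return {
    unreachable,
    becameUnreachable: unreachable.filter((router) => !before.has(router)),
    becameReachable: previouslyUnreachable.filter((router) => active.has(router)),
  };
}

// First check: router3 has a static route but no active route.
let state = detectRouteChanges(["router2", "router3"], ["router2"], []);
console.log(state.becameUnreachable);

// Second check: router3 has reconnected and provides an active route.
state = detectRouteChanges(["router2", "router3"], ["router2", "router3"], state.unreachable);
console.log(state.becameReachable);
```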

Location

| Domain | Package | Stream |
| --- | --- | --- |
| swiftmq | system | routeannouncer.js |

Parameters

| Name | Mandatory | Default | Description |
| --- | --- | --- | --- |
| topic | No | Stream Name | Topic where the Stream sends its events. |

Events

All events are sent as TextMessage.

Stream Registry

Stream Name

CODE
  stream_{routername}_streamregistry

Purpose

This Stream provides a registry for other Streams to register their availability/unavailability.

Location

| Domain | Package | Stream |
| --- | --- | --- |
| swiftmq | system | streamregistry.js |

Parameters

| Name | Mandatory | Default | Description |
| --- | --- | --- | --- |
| topic | No | Stream Name | Topic where the Stream sends its events. |

Events

All events are sent as TextMessage.

Registry Requests

Streams that want to be enlisted in the Stream registry send registry requests to the Stream registry topic.

Stream Monitor

Purpose

This Stream connects to the Stream registry and, for each registered Stream, sends an init request and subscribes to subsequent updates. The content is logged into the Stream monitor's log file.

Caution! This Stream is for debugging purposes only and therefore disabled by default. Enabling it when running SwiftMQ Dashboard, for example, will create a large amount of log output.

Location

| Domain | Package | Stream |
| --- | --- | --- |
| swiftmq | system | streammonitor.js |

Parameters

| Name | Mandatory | Default | Description |
| --- | --- | --- | --- |
| registry-topic | No | stream_{routername}_streamregistry | Name of the Stream registry topic. |

Stream Repository

Purpose

This Stream provides a Stream script repository to a SwiftMQ Router. Stream scripts can be uploaded into it and then be loaded and executed out of the repository. Upload and management of the repositories take place remotely through the client programs repojms and repoamqp, which are part of the SwiftMQ distribution.

Repositories are named and independent of each other.

In contrast to storing Stream scripts locally on a Router's host, uploading them remotely into a Router repository makes it easy to distribute Stream scripts in a Router Network through the above clients.

Repositories are stored in a persistent queue inside the Router's persistent store. For SwiftMQ HA Routers the repositories are available after a failover so they only need to be uploaded to the ACTIVE instance.

Location

| Domain | Package | Stream |
| --- | --- | --- |
| swiftmq | system | streamrepository.js |

Parameters

| Name | Mandatory | Default | Description |
| --- | --- | --- | --- |
| input-queue | No | streamrepo | Queue where the Stream receives requests. |
| store-queue | No | streamrepo_store | Queue where the Stream stores its repositories. |

Clients

The repository clients are located under:

XML
  <swiftmq-dir>/scripts/<platform>

A JMS-based repository client called repojms and an AMQP-based client called repoamqp are available. They differ only in their connection parameters. The commands are the same.

Load and Execute a Stream Script from a Repository

This is done by defining a repository URL in the script-file property of a Stream configuration. The URL has the following format:

CODE
  repository:/<reponame>/<filename>

Example configuration:

XML
  <stream name="wh_accounting" enabled="true" script-file="repository:/warehouse/accounting.js">
    <dependencies>
      <dependency name="swiftmq.samples.wh_store"/>
    </dependencies>
    <parameters>
      <parameter name="accounting-topic" value="stream_router1_sample_wh_accounting"/>
      <parameter name="customer-invoice-topic" value="stream_router1_sample_wh_customer_invoice"/>
    </parameters>
  </stream>
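
A script-file value in this format can be split into its repository name and file name. The following is a minimal sketch in plain JavaScript; the helper parseRepositoryUrl is hypothetical, and it assumes that neither the repository name nor the file name contains a slash.

```javascript
// Hypothetical helper: splits "repository:/<reponame>/<filename>" into parts.
// Returns null when the value does not match this format (e.g. a file path).
function parseRepositoryUrl(scriptFile) {
  const match = /^repository:\/([^/]+)\/([^/]+)$/.exec(scriptFile);
  return match ? { repository: match[1], file: match[2] } : null;
}

console.log(parseRepositoryUrl("repository:/warehouse/accounting.js"));
```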

Jobs

Overview

The Streams Swiftlet registers a job in job group Streams at the Scheduler Swiftlet:

This job can be scheduled via the Scheduler Swiftlet to run at specific times or in intervals, based on calendars and so on.

Stream Activator

The Stream Activator job activates a stream on job start and deactivates it on job stop. The name of the stream must be specified as a parameter. The job requires setting a maximum runtime to deactivate the stream. Otherwise, it would be active forever.

The implementation of the Stream Activator job is quite simple. It sets the enabled attribute to true on job start and to false on job stop. Therefore, a stream that should run as a job should be initially disabled.

| Parameter | Mandatory | Description |
| --- | --- | --- |
| Domain Name | Yes | Name of the domain. |
| Package Name | Yes | Name of the package. |
| Stream Name | Yes | Name of the stream. |

Examples

Note: All SwiftMQ Streams samples install in domain swiftmq, package samples.

Nuclear Power Plant Monitor

Location of this example: samples/streams/nuclear

This example monitors temperatures generated by sensors of a nuclear power plant and generates monitor, warning, and critical messages. It implements the scenario of a DZone article that uses Esper, a CEP engine.

The example contains the following scripts:

| Script Name | Purpose | Parameters |
| --- | --- | --- |
| tempmonitor.js | Monitors temperatures and generates monitor, warning, critical messages. | input-queue, a set of mail properties |
| tempproducer.js | Sends temperature messages in an interval. | output-queue |

To install the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
        sr <routername>
        execute ../samples/streams/nuclear/install.cli

The example is then installed but the scripts are disabled.

To remove the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
        sr <routername>
        execute ../samples/streams/nuclear/remove.cli

The example is then removed.

Order Collector

Location of this example: samples/streams/ordercollector

This example receives order head messages from one queue and order position messages from another queue. Both come in serial order. Each order head message contains the number of order positions. When all order positions of an order head are received, it generates a text message with an XML body that contains the complete order.

The Order Collector is an implementation of the Enterprise Integration Pattern Aggregator.
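
The core of the Aggregator pattern fits in a few lines. The following conceptual sketch in plain JavaScript is not the code of ordercollector.js: the class OrderAggregator, the message fields (orderId, positionCount, item, qty), and the shape of the XML are assumptions, and attribute values are not XML-escaped.

```javascript
// Conceptual sketch of the Aggregator pattern for order heads and positions.
class OrderAggregator {
  constructor(onComplete) {
    this.pending = new Map(); // orderId -> { head, positions }
    this.onComplete = onComplete;
  }

  // An order head announces how many positions belong to the order.
  addHead(head) {
    this.pending.set(head.orderId, { head, positions: [] });
    this.emitIfComplete(head.orderId);
  }

  // Positions arrive after their head, since both come in serial order.
  addPosition(position) {
    const entry = this.pending.get(position.orderId);
    if (!entry) return; // no head seen yet; ignored in this sketch
    entry.positions.push(position);
    this.emitIfComplete(position.orderId);
  }

  // Emits the complete order once all announced positions have arrived.
  emitIfComplete(orderId) {
    const entry = this.pending.get(orderId);
    if (entry.positions.length < entry.head.positionCount) return;
    this.pending.delete(orderId);
    const items = entry.positions
      .map((p) => `<position item="${p.item}" qty="${p.qty}"/>`)
      .join("");
    this.onComplete(`<order id="${orderId}">${items}</order>`);
  }
}

const aggregator = new OrderAggregator((xml) => console.log(xml));
aggregator.addHead({ orderId: "4711", positionCount: 2 });
aggregator.addPosition({ orderId: "4711", item: "A", qty: 3 });
aggregator.addPosition({ orderId: "4711", item: "B", qty: 1 });
```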

The example contains the following scripts:

| Script Name | Purpose | Parameters |
| --- | --- | --- |
| ordercollector.js | Collects orders. | orderhead-queue, orderpos-queue, output-queue |
| orderheadproducer.js | Sends order head messages at a 5-second interval. | output-queue |
| orderposproducer.js | Sends order position messages at a 1-second interval. | output-queue |

To install the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
      sr <routername>
      execute ../samples/streams/ordercollector/install.cli

The example is then installed but the scripts are disabled.

To remove the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
      sr <routername>
      execute ../samples/streams/ordercollector/remove.cli

The example is then removed.

Echo Service

Location of this example: samples/streams/replier

This example is a simple echo service that sends a reply to each request it receives. It creates a temporary queue that acts as Input and registers it in JNDI under the name requestQueue. JMS clients can look it up and can send TextMessages as requests and will receive a reply. Please use samples/router_network/P2PRequestor to test it.

The example contains the following script:

| Script Name | Purpose | Parameters |
| --- | --- | --- |
| replier.js | Echo Service. | none |

To install the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
      sr <routername>
      execute ../samples/streams/replier/install.cli

The example is then installed but the scripts are disabled.

To remove the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
      sr <routername>
      execute ../samples/streams/replier/remove.cli

The example is then removed.

Management: Queue Status

Location of this example: samples/streams/management

This example creates a ManagementInput on sys$queuemanager/usage and receives add/change/remove notifications. It stores these events in a MemoryGroup and prints the average message count per minute for each queue in use. The example demonstrates the use of ManagementInputs and how to use MemoryGroups for session windowing.
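
The grouping and averaging step can be illustrated independently of the Router. The following conceptual sketch in plain JavaScript is not the code of queuestats.js: the function averageMessageCount and the event fields queue and messageCount are assumptions, and the events are taken to be the notifications collected within one window.

```javascript
// Conceptual sketch: groups usage events by queue name and averages the
// reported message counts per queue.
function averageMessageCount(events) {
  const groups = new Map(); // queue name -> { sum, n }
  for (const e of events) {
    const g = groups.get(e.queue) || { sum: 0, n: 0 };
    g.sum += e.messageCount;
    g.n += 1;
    groups.set(e.queue, g);
  }
  const result = {};
  for (const [queue, g] of groups) result[queue] = g.sum / g.n;
  return result;
}

const events = [
  { queue: "orders", messageCount: 10 },
  { queue: "orders", messageCount: 20 },
  { queue: "invoices", messageCount: 4 },
];
console.log(averageMessageCount(events));
```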

The example contains the following script:

| Script Name | Purpose | Parameters |
| --- | --- | --- |
| queuestats.js | Queue Statistics. | none |

To install the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
      sr <routername>
      execute ../samples/streams/management/install.cli

The example is then installed but the scripts are disabled.

To remove the example, go to the Router's working directory scripts, invoke CLI, and run:

CODE
      sr <routername>
      execute ../samples/streams/management/remove.cli

The example is then removed.

Configuration

The configuration of the Streams Swiftlet is defined within the element

XML
  <swiftlet name="sys$streams" .../>

of the router's configuration file.

Attributes of Element "swiftlet"

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| collect-interval | java.lang.Long | No | Interval for collecting Stream usage information |
| stream-grant-predicates | java.lang.String | No | SQL-Like Predicates to grant Stream Topic access |

Values

| Attribute | Values |
| --- | --- |
| collect-interval | Default: 1000 |
| stream-grant-predicates | Default: stream\_% |

Element List "domains", Parent Element: "swiftlet"

Stream Domain Names. This element list contains zero or more "domain" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Domain |

Element List "packages", Parent Element: "domain"

Stream Packages. This element list contains zero or more "package" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Package |

Element List "streams", Parent Element: "package"

Complex Event Streams. This element list contains zero or more "stream" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Stream |
| restart-delay | java.lang.Long | No | Delay in ms to restart the Stream on an exception |
| restart-max | java.lang.Integer | No | Maximum number of restart attempts |
| script-language | java.lang.String | No | Name of the Scripting Language |
| script-file | java.lang.String | Yes | Name of the Script File |
| enabled | java.lang.Boolean | No | Enables/Disables this Stream |

Values

| Attribute | Values |
| --- | --- |
| restart-delay | Default: -1 |
| restart-max | Default: -1 |
| script-language | Default: JavaScript |
| script-file | |
| enabled | Default: false |

Element List "parameters", Parent Element: "stream"

Parameters. This element list contains zero or more "parameter" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Parameter |
| value | java.lang.String | Yes | Value |

Values

| Attribute | Values |
| --- | --- |
| value | |

Element List "dependencies", Parent Element: "stream"

Dependencies from other Streams. This element list contains zero or more "dependency" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Dependency |

Element List "usage", Parent Element: "swiftlet"

Active Streams. This element list contains zero or more "stream" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Active Stream |
| starttime | java.lang.String | No | Start Time of this Stream |
| stream-last-onmessage-time | java.lang.Integer | No | Last Time in µs it took in onMessage |
| stream-processing-rate | java.lang.Integer | No | Messages/Sec processed by this Stream |
| stream-total-processed | java.lang.Integer | No | Total Number of Messages processed by this Stream |

Values

| Attribute | Values |
| --- | --- |
| starttime | |
| stream-last-onmessage-time | Default: 0 |
| stream-processing-rate | Default: 0 |
| stream-total-processed | Default: 0 |

Element List "inputs", Parent Element: "stream"

Inputs. This element list contains zero or more "input" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Input |
| 01-type | java.lang.String | No | Type of this Input |
| destinationname | java.lang.String | No | Destination Name |
| input-processing-rate | java.lang.Integer | No | Messages/Sec processed by this Input |
| input-total-processed | java.lang.Integer | No | Total Number of Messages processed by this Input |

Values

| Attribute | Values |
| --- | --- |
| 01-type | |
| destinationname | |
| input-processing-rate | Default: 0 |
| input-total-processed | Default: 0 |

Element List "timers", Parent Element: "stream"

Timers. This element list contains zero or more "timer" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Timer |
| timer-last-ontimer-time | java.lang.Integer | No | Last Time in µs it took in onTimer |
| timer-type | java.lang.String | No | Type of this Timer |
| started | java.lang.Boolean | No | Timer Started? |

Values

| Attribute | Values |
| --- | --- |
| timer-last-ontimer-time | Default: 0 |
| timer-type | |
| started | Default: false |

Element List "memories", Parent Element: "stream"

Memories. This element list contains zero or more "memory" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Memory |
| memory-type | java.lang.String | No | Type of this Memory |
| size | java.lang.Integer | No | Size of this Memory in Messages |

Values

| Attribute | Values |
| --- | --- |
| memory-type | |
| size | Default: 0 |

Element List "jdbclookups", Parent Element: "stream"

JDBCLookups. This element list contains zero or more "jdbclookup" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this JDBCLookup |
| last-query-time | java.lang.Integer | No | Last Time in µs it took in query |

Values

| Attribute | Values |
| --- | --- |
| last-query-time | Default: 0 |

Element List "mailservers", Parent Element: "stream"

Mail Servers. This element list contains zero or more "mailserver" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Mail Server |
| emails-sent | java.lang.Integer | No | Number of eMails sent |

Values

| Attribute | Values |
| --- | --- |
| emails-sent | Default: 0 |

Element List "tempqueues", Parent Element: "stream"

Temp Queues. This element list contains zero or more "tempqueue" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Temp Queue |
| queuename | java.lang.String | No | Name of the temp Queue |

Values

| Attribute | Values |
| --- | --- |
| queuename | |

Element List "outputs", Parent Element: "stream"

Outputs. This element list contains zero or more "output" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Output |
| 01-type | java.lang.String | No | Type of this Output |
| messages-sent | java.lang.Integer | No | Number of Messages sent to this Output |

Values

| Attribute | Values |
| --- | --- |
| 01-type | |
| messages-sent | Default: 0 |

Element List "dashboard", Parent Element: "stream"

Dashboard. This element list contains zero or more "widget" elements with this template definition:

Definition

| Attribute | Type | Mandatory | Description |
| --- | --- | --- | --- |
| name | java.lang.String | Yes | Name of this Widget |
| description | java.lang.String | No | Widget Description |
| type | java.lang.String | No | Widget Type |

Values

| Attribute | Values |
| --- | --- |
| description | |
| type | Choice: statistic monitor |
