Event Streams

ActyxOS was designed to facilitate building multi-node applications. As such it offers a powerful method for communication between nodes. These are so called event streams. In this guide we will go through the basics of event streams, how event streams are modelled in ActyxOS and how you can publish to, query and subscribe to event streams.

Background

Events are facts from the past that can represent anything you would like them to: a robot arriving at a location, a person signing off from work or a button click. Event streams are simply ordered chains of events. Event streams are very valuable because they allow you to keep track of what is happening and make decisions accordingly–especially in other parts of the system.

Consider a robot moving from place to place. This robot may publish an event stream about itself so that other machines or humans can make decisions accordingly:

[
{
source: 'robot1',
timestamp: 1568807572258000,
payload: { locationChangedTo: 'loading-bay-1' }
},
{
source: 'robot1',
timestamp: 1568807936074000,
payload: { locationChangedTo: 'charging-station-39' }
}
]

Technically, event streaming is a message-driven, publish and subscribe mechanism that enables asynchronous communication between several components, as upstream components transfer processing to downstream components by publishing domain events that are then consumed downstream. Event streaming enables event dissemination throughout the entire system.

With most event streaming technologies events are ephemeral, meaning that the events are only available for a short period of time. After this period, interested parties will no longer be able to find or read this event. Persistent event streams expand on this by automatically persisting published events (to disk, for example). This means interested consumers can access any past event at any time.

ActyxOS's Event Services offers your APIs for creating and leveraging persistent event streams.

Overview

The ActyxOS Event Service is designed as a decentralized transaction log and has been engineered for availability, partition tolerance, and eventual consistency. It offers low latency and high throughput with a small footprint allowing for use on mobile and embedded devices.

It is is a core component of ActyxOS as it allows events published by apps to be streamed in real-time to other subscribed apps. Every subscribed app on any device will eventually receive published events (at least once delivery).

Communication between the client and the Event Service is done with a simple, high-performance, language agnostic HTTP API or dedicated, language-specific SDKs.

Basics

Events and streams

The following image shows a simple event stream generated by a device running ActyxOS.

As can be seen, the device is uniquely identified by its SourceId. Each event generated by that device will contain that specific SourceID. Each event also has a specific and unique offset. The offset denotes at which point in the stream the specific event is located.

In ActyxOS, event streams are always defined by a three-tuple containing the following properties:

  • SourceId, i.e. the device on which this stream is generated
  • Semantics, i.e. the namespaced meaning of the stream
  • Name, i.e. the name of the stream or stream generator

This information is encoded in an event envelope that wraps your user-defined payload. Here is an example:

{
"source": "b24486fe-c68b-11e9-aa8c-2a2ae2dbcce4",
"semantics": "com.actyx.os.documentation.example",
"name": "Streaming Silvia",
"offset": 42,
"payload": {
"foo": "bar",
"foos": ["bar1", "bar2"],
"bars": {
"foo": "bar",
"bar": "foo"
}
}
}

In the above example, the event

  • was generated on a node with source id b24486fe-c68b-11e9-aa8c-2a2ae2dbcce4,
  • has the semantincs com.actyx.os.documentation.example,
  • is part of a stream called Streaming Silvia; and,
  • finds itself at offset 42 of that stream.

Offsets and partitions

Offsets play a very important role in ActyxOS. As it is designed for high-availability, ActyxOS continues to work, even in the face of network partitions. This means, however, that events generated by a device in a different network partition, will arrive only once the partition has been healed.

Offsets allow you - as a client - to keep track of your progress in reading a stream as a subscriber. In order to know if new information may be available, you can simply compare the offset you know with the largest one of the stream.

Event stream persistence

As opposed to some streaming technologies, events published through the ActyxOS Event Service are not ephemeral. Quite the opposite in fact: published events are automatically persisted locally and in your entire ActyxOS swarm.

This means you can - at any time - access events you or others have published in the past. Combining this ability with some functionality to remember offsets, allows you to ensure you get all events ever emitted on a stream, even long after they were originally emitted.

note

The Event Service API is designed with this very notion in mind. As you will see below, queries can be parameterized using from- and to-offsets.

Usage

The Event Service is one of the tools ActyxOS offers you to build distributed, multi-device apps. Using the Event Service means publishing to and subscribing to event streams. This is done by interacting with the Event Service's HTTP API accessible at http://localhost:4454/api/v1.

info

The API can only be accessed locally, i.e. at localhost. It is not meant for being accessed from other nodes. The dissemination of events to other nodes happens automatically in the background. To access events from other nodes, simply add respective subscriptions.

Querying event streams

The HTTP API allows you to query event streams by sending HTTP POST requests to the http://localhost:4454/api/v1/events/query endpoint. The request body must contain a JSON object specifying your subscription request.

Please check out the relevant API reference for more information and examples.

Subscribing to event streams

Instead of querying an existing set of events, you can also subscribe to event streams. This is done with HTTP POST requests to the http://localhost:4454/api/v1/events/subscribe endpoint and allows you to, potentially, retrieve events from the past and, especially, automatically receive new events as they are published.

Check out the API reference for more information and some common examples.

Publishing to event streams

Publishing is how new events are generated in your ActyxOS swarm. You can publish events by sending HTTP POST requests to the http://localhost:4454/api/v1/events/publish endpoint.

Please refer to the API reference for more information and examples.

Managing internal state with offsets

All event streams emitted from one edge device are merged into a single per-device stream that is managed by the Event Service. Each event thus has its unique position within that stream, also called its offset, that is part of the event metadata. When consuming event streams, these are always delivered in contiguously ascending offset order (strictly monotonic and gapless) for each device — the ordering of streams coming from different devices is governed by the ordering parameter given to the event query.

Therefore, the highest event offset seen from each device serves as a cursor for identifying the replay position within that device’s stream of events. When an app is restarted, it may want to resume consuming event streams from the last known position in the same vein as for example resuming a Kafka consumer.

In contrast to a Kafka stream, an ActyxOS event stream consists of multiple per-device streams, hence the single cursor needs to be replaced with one offset per device, the so-called offset map.

This map is passed into the event query and subscription endpoints, either as a starting point, declaring which parts of the stream we have already seen, or as a termination condition, saying up to which point we want the stream to be delivered.

note

When persisting computed state in an event consumer, it is good practice to store the offset map in the same place, for example in the same database transaction, so that processing may later be resumed from exactly the right place. This yields effectively exact-once delivery of events.

Example

Let's run through an example using the hypothetical robot described above. This robot sends events about its location changes. The app running on the robot would have a piece of code as follows for generating relevant event objects (more information about the exact API check out the API reference).

function mkPositionChangedEvent(newPosition: string): Event {
return {
semantics: "com.robot-maker-ltd.positionChange",
name: "robot1",
payload: { locationChangedTo: newPosition }
}
}

Whenever the robot has changed its position it would then publish the relevant events.

function publishEvent(event: Event): void {
return fetch("http://localhost:4454/api/v1/events/publish", {
method: "POST",
body: JSON.stringify({ data: [event] }),
headers: { "Content-Type": "application/json" }
});
}
// This function would be provided to call to a higher-level controller
function onChangedPosition(newPosition: string): void {
const event = mkChangedPositionEvent(newPosition);
publishEvent(event);
}
note

We did not have to specify the source for this event publication. That is because the local Event Service (at http://localhost:4454), to which we are publishing the event, will automatically add the nodes's source ID.

Because ActyxOS automatically disseminates events, on a second node, at any time, you could subscribe to that specific event stream, and receive all events published by the robot.

fetch("http://localhost:4454/api/v1/events/subscribe", {
method: "POST",
body: JSON.stringify({
subscriptions: [{ name: "robot1" }]
}),
headers: { "Content-Type": "application/json" }
})
.then(r => r.body.getReader())
.then(reader => {
const dec = new TextDecoder();
const loop = () => {
reader.read().then(
chunk => {
if (!chunk.done) {
console.log("Received event from robot:", JSON.parse(dec.decode(chunk.value)));
// Result:
// {
// "stream": {
// "semantics": "com.robot-maker-ltd.positionChange",
// "name": "robot1",
// "source": "db66a77f"
// },
// "timestamp": 21323,
// "lamport": 323,
// "offset": 34,
// "payload": {
// "locationChangedTo": "loading-bay-5"
// }
// }
//
loop();
}
}
);
};
loop();
});
note

Did you notice how in the last example we specified the event stream's name in our subscription? This is important because we are accessing the stream from another node and need to tell the Event Service what we are interested in. The ActyxOS Event Service provides powerful subscription mechanisms based on the stream source, semantics and name. Check out the API reference for more information.

As noted above, the ActyxOS Event Service also persists the events upon publication. This means that we can not only access current and future events, but also events from the past. This is an important property for event sourcing; one of the most popular methods for building apps on Actyx. Check out Event Sourcing for an introduction to event sourcing.