Event Service HTTP API

This is the reference documentation for the ActyxOS Event API.

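The Event API provides local access to the Event Service, allowing you to publish events, get information about known offsets, query event streams, and subscribe to event streams.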

It is reachable at the following base URI: http://localhost:4454/api/v1/events.

Pretty printed JSON

JSON used in the examples below is pretty-printed. This is only to make it more readable here. In reality, the Event Service API does not return pretty-printed JSON.

Publish events

You can publish new events using the publish endpoint.

Request

  • Endpoint: http://localhost:4454/api/v1/events/publish
  • HTTP method: POST
  • HTTP headers:
    • Content-Type, must be application/json, default: application/json

The request body must contain a JSON object with the following structure:

{
  "data": [
    {
      "semantics": "<string: semantics>",
      "name": "<string: name>",
      "payload": "<object>"
    },
    {
      "semantics": "<string: semantics>",
      "name": "<string: name>",
      "payload": "<object>"
    }
  ]
}

You use the request body to provide the Event Service with the stream semantics, stream name and payload of the events to be published.

Response

The response will provide feedback using HTTP status codes, with 201 signifying that the request was successfully processed and the events published.

Example

See the following example using cURL:

API Request
echo '{
  "data": [
    {
      "semantics": "com.actyx.examples.temperature",
      "name": "temperatureSensor1",
      "payload": {
        "foo": [1, 3, 4],
        "bar": { "a": 1, "b": 103 }
      }
    },
    {
      "semantics": "com.actyx.examples.temperature",
      "name": "temperatureSensor2",
      "payload": {
        "foo": [3, 1, 1],
        "bar": { "a": 13, "b": 48 }
      }
    }
  ]
}
' \
  | curl \
    -X "POST" \
    -d @- \
    -H "Content-Type: application/json" \
    http://localhost:4454/api/v1/events/publish
API Response
HTTP 201 on success, 400 if the request body is invalid, 500 if a server error occurs
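
The same request could also be sent from Node.js. The following is a minimal sketch rather than an official client: the helper names buildPublishBody and publish are hypothetical, and it assumes the Event Service is reachable at the base URI given above.

```javascript
const http = require('http')

// Hypothetical helper: wraps events in the {"data": [...]} envelope described above
// and keeps only the three documented properties of each event.
function buildPublishBody(events) {
  return JSON.stringify({
    data: events.map(({ semantics, name, payload }) => ({ semantics, name, payload })),
  })
}

// Hypothetical helper: POSTs the events and resolves with the HTTP status code.
function publish(events) {
  const body = buildPublishBody(events)
  const options = {
    hostname: 'localhost',
    port: 4454,
    path: '/api/v1/events/publish',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': Buffer.byteLength(body),
    },
  }
  return new Promise((resolve, reject) => {
    const req = http.request(options, (res) => {
      res.resume() // drain the response so the socket is released
      resolve(res.statusCode)
    })
    req.on('error', reject)
    req.end(body)
  })
}
```

A caller would then check the resolved status code against 201, for example with publish(events).then((status) => console.log(status === 201 ? 'published' : 'failed')).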

Get information about known offsets

You can get information from the Event Service about known offsets, i.e. what the Event Service believes to be the last offset for each stream.

Request

  • Endpoint: http://localhost:4454/api/v1/events/offsets
  • HTTP method: GET
  • HTTP headers:
    • Content-Type, must be application/json, default: application/json
    • (optional) Accept, must be application/json, default: application/json

There is no request body.

Response

  • HTTP headers:
    • Content-Type is application/json
    • Cache-Control is no-store (to get fresh data and not use cache slots)

The response body will contain a JSON object of the following structure:

{
  "<string: sourceID>": "<integer: last-known-offset>",
  "<string: sourceID>": "<integer: last-known-offset>"
}

Example

See the following example using cURL:

API Request
curl \
  -s -X "GET" \
  -H "Accept: application/json" \
  http://localhost:4454/api/v1/events/offsets | jq .
API Response
{
  "JAggoXUT7Bw": 57,
  "a263bad7cMr": 60
}
Source ID

Note that in order to follow the examples below, you need to exchange the source ID from the example with one of the IDs you got as a response from the offsets query.
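
To pick up source IDs programmatically, the offsets could be fetched from Node.js. This is a sketch under assumptions: parseOffsets and getOffsets are hypothetical helper names, and the check that every value is an integer is an extra safeguard rather than documented behaviour.

```javascript
const http = require('http')

// Hypothetical helper: parses the offsets response body and verifies
// that every source ID maps to an integer offset.
function parseOffsets(text) {
  const offsets = JSON.parse(text)
  for (const [source, offset] of Object.entries(offsets)) {
    if (!Number.isInteger(offset)) {
      throw new Error(`unexpected offset for source ${source}: ${offset}`)
    }
  }
  return offsets
}

// Hypothetical helper: GETs the offsets endpoint and resolves with the parsed object.
function getOffsets() {
  const url = 'http://localhost:4454/api/v1/events/offsets'
  const options = { headers: { Accept: 'application/json' } }
  return new Promise((resolve, reject) => {
    http
      .get(url, options, (res) => {
        let text = ''
        res.setEncoding('utf8')
        res.on('data', (chunk) => {
          text += chunk
        })
        res.on('end', () => {
          try {
            resolve(parseOffsets(text))
          } catch (err) {
            reject(err)
          }
        })
      })
      .on('error', reject)
  })
}
```

The keys of the resolved object are the source IDs, so Object.keys(offsets) yields the values to substitute into the examples that follow.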

Query event streams

You can query the Event Service for bounded sets of events in one or more event streams.

Request

  • Endpoint: http://localhost:4454/api/v1/events/query
  • HTTP method: POST
  • HTTP headers:
    • Content-Type, must be application/json, default: application/json
    • (optional) Accept, must be application/x-ndjson, default: application/x-ndjson

The request body must contain a JSON object with the following structure:

{
  "lowerBound": {
    "<string: sourceID>": "<integer: exclusive-lower-bound, e.g. 34>",
    "<string: sourceID>": "<integer: exclusive-lower-bound, e.g. -1>"
  },
  "upperBound": {
    "<string: sourceID>": "<integer: inclusive-upper-bound, e.g. 49>",
    "<string: sourceID>": "<integer: inclusive-upper-bound, e.g. 101>"
  },
  "subscriptions": [
    {
      "semantics": "<string: semantics | undefined>",
      "name": "<string: name | undefined>",
      "source": "<string: sourceID | undefined>"
    },
    {
      "semantics": "<string: semantics | undefined>",
      "name": "<string: name | undefined>",
      "source": "<string: sourceID | undefined>"
    },
    {
      "semantics": "<string: semantics | undefined>",
      "name": "<string: name | undefined>",
      "source": "<string: sourceID | undefined>"
    }
  ],
  "order": "<string: 'lamport' | 'lamport-reverse' | 'source-ordered'>"
}

You use the request body to specify the details of your request, as documented below.

Optional: Lower bound for offsets (lowerBound)

The lowerBound object specifies the lower bound offset for each source ID, with the numbers being exclusive, i.e. a lowerBound specification of 34 means the Event Service will return events with offsets > 34.

The lowerBound is optional. If none is set for one, multiple or all subscribed sources, the Event Store will assume a lower bound offset of -1, i.e. the beginning.

Required: Upper bounds for offsets (upperBound)

The upperBound object specifies the upper bound offset for each source ID, with the numbers being inclusive, i.e. an upperBound specification of 34 means the Event Service will return events with offsets <= 34.

The upperBound is required. For every subscribed source where no upper bound offset is set, the result will be empty.
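
The bound rules can be expressed as a small predicate. This is only an illustration of the semantics described above, not code taken from the Event Service; isWithinBounds is a hypothetical name.

```javascript
// Hypothetical helper mirroring the documented bound semantics:
// lower bounds are exclusive and default to -1, upper bounds are inclusive,
// and a source without an upper bound yields no events.
function isWithinBounds(source, offset, lowerBound = {}, upperBound = {}) {
  const lower = lowerBound[source] !== undefined ? lowerBound[source] : -1
  const upper = upperBound[source]
  if (upper === undefined) {
    return false
  }
  return offset > lower && offset <= upper
}

const lowerBound = { JAggoXUT7Bw: 34 }
const upperBound = { JAggoXUT7Bw: 47 }

console.log(isWithinBounds('JAggoXUT7Bw', 34, lowerBound, upperBound)) // false (lower bound is exclusive)
console.log(isWithinBounds('JAggoXUT7Bw', 35, lowerBound, upperBound)) // true
console.log(isWithinBounds('JAggoXUT7Bw', 47, lowerBound, upperBound)) // true (upper bound is inclusive)
console.log(isWithinBounds('a263bad7cMr', 10, lowerBound, upperBound)) // false (no upper bound set)
```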

Required: Subscriptions (subscriptions)

The subscriptions array specifies which event streams should be queried, with each stream being specified by the 3-tuple of source, semantics and name. You may omit some or all of these properties to specify a wildcard.

Not specifying the source of a stream does not make sense in this context since no events will be returned for sources without a defined upper bound.
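
To illustrate how omitted properties act as wildcards, the matching can be sketched as follows. The filtering itself is done by the Event Service; matchesSubscription and matchesAny are hypothetical names used only to show the intended semantics.

```javascript
// Hypothetical helper: a stream matches a subscription if every property
// the subscription specifies is equal; omitted properties match anything.
function matchesSubscription(stream, subscription) {
  return ['semantics', 'name', 'source'].every(
    (key) => subscription[key] === undefined || subscription[key] === stream[key]
  )
}

// Hypothetical helper: a stream is selected if at least one subscription matches.
function matchesAny(stream, subscriptions) {
  return subscriptions.some((subscription) => matchesSubscription(stream, subscription))
}

const stream = {
  semantics: 'com.actyx.examples.temperature',
  name: 'temperatureSensor2',
  source: 'JAggoXUT7Bw',
}

console.log(matchesSubscription(stream, { name: 'temperatureSensor2', source: 'JAggoXUT7Bw' })) // true
console.log(matchesSubscription(stream, { name: 'temperatureSensor1' })) // false
console.log(matchesSubscription(stream, {})) // true (everything is a wildcard)
```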

Required: Ordering (order)

The order property specifies the order in which the events should be returned to the caller. There are three options, one of which must be specified:

  1. lamport: ascending order according to the events' Lamport timestamps
  2. lamport-reverse: descending order according to the events' Lamport timestamps
  3. source-ordered: ascending order according to the events' Lamport timestamps per source, with no inter-source ordering guarantees
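
The difference between the three options can be made concrete with a check over a list of received events. This is an illustrative sketch; satisfiesOrder is a hypothetical helper, and treating equal Lamport timestamps as correctly ordered is an assumption, since no tie-breaker is specified here.

```javascript
// Hypothetical helper: checks whether a list of events satisfies the given order.
function satisfiesOrder(events, order) {
  // true if Lamport timestamps never decrease along the list
  const ascending = (list) =>
    list.every((event, i) => i === 0 || list[i - 1].lamport <= event.lamport)

  switch (order) {
    case 'lamport':
      return ascending(events)
    case 'lamport-reverse':
      return ascending([...events].reverse())
    case 'source-ordered': {
      // group by source; only the order within each source matters
      const bySource = new Map()
      for (const event of events) {
        const list = bySource.get(event.stream.source) || []
        list.push(event)
        bySource.set(event.stream.source, list)
      }
      return [...bySource.values()].every((list) => ascending(list))
    }
    default:
      throw new Error(`unknown order: ${order}`)
  }
}

const events = [
  { stream: { source: 'JAggoXUT7Bw' }, lamport: 1 },
  { stream: { source: 'a263bad7cMr' }, lamport: 5 },
  { stream: { source: 'JAggoXUT7Bw' }, lamport: 3 },
]

console.log(satisfiesOrder(events, 'lamport')) // false (5 comes before 3)
console.log(satisfiesOrder(events, 'source-ordered')) // true (each source is ascending)
```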

Response

  • HTTP headers:
    • Content-Type is application/x-ndjson
    • Transfer-Encoding is chunked

The response will be a stream of <CR><LF>-delimited event payloads of the following structure:

{
  "stream": {
    "semantics": "<string: semantics>",
    "name": "<string: name>",
    "source": "<string: sourceID>"
  },
  "timestamp": "<integer>", // Unix epoch in microseconds
  "lamport": "<integer>",
  "offset": "<integer>",
  "payload": "<object>"
}

If an error is encountered while processing the stream of events, the stream will terminate with a final error JSON object with the following structure:

{
  "error": "message",
  "errorCode": 500
}
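
A client therefore has to distinguish event lines from a terminating error object. The following sketch shows one way to do so for a fully received response body; parseResponseLine and parseResponseBody are hypothetical names, and skipping empty lines assumes that the service may send them as keep-alives.

```javascript
// Hypothetical helper: classifies a single line of the response.
function parseResponseLine(line) {
  const trimmed = line.trim()
  if (trimmed.length === 0) {
    return { kind: 'empty' } // assumed keep-alive line, carries no data
  }
  const message = JSON.parse(trimmed)
  if (message.error !== undefined) {
    return { kind: 'error', error: message.error, errorCode: message.errorCode }
  }
  return { kind: 'event', event: message }
}

// Hypothetical helper: collects events until the body ends or an error object appears.
function parseResponseBody(text) {
  const events = []
  for (const line of text.split(/\r?\n/)) {
    const parsed = parseResponseLine(line)
    if (parsed.kind === 'error') {
      return { events, error: parsed }
    }
    if (parsed.kind === 'event') {
      events.push(parsed.event)
    }
  }
  return { events, error: null }
}
```

Events received before the error object are still returned, so a caller can decide whether to use the partial result or discard it.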

Example

See the following example using cURL:

API Request
echo '
{
  "lowerBound": {
    "JAggoXUT7Bw": 14
  },
  "upperBound": {
    "JAggoXUT7Bw": 47
  },
  "subscriptions": [
    {
      "semantics": "com.actyx.examples.temperature",
      "name": "temperatureSensor1",
      "source": "JAggoXUT7Bw"
    },
    {
      "name": "temperatureSensor2",
      "source": "JAggoXUT7Bw"
    }
  ],
  "order": "lamport-reverse"
}
' \
  | curl \
    -s -X "POST" \
    -d @- \
    -H "Content-Type: application/json" \
    -H "Accept: application/x-ndjson" \
    http://localhost:4454/api/v1/events/query | jq
API Response
{
  "lamport": 541,
  "stream": {
    "semantics": "com.actyx.examples.temperature",
    "name": "temperatureSensor2",
    "source": "JAggoXUT7Bw"
  },
  "timestamp": 1618924129607691,
  "offset": 47,
  "payload": {
    "foo": [3, 1, 1],
    "bar": {
      "a": 13,
      "b": 48
    }
  }
}
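
Since an upper bound is required for every source, one possible approach is to reuse the response of the offsets endpoint as the upperBound. The sketch below assumes that approach; buildQueryBody is a hypothetical helper, not part of the API.

```javascript
// Hypothetical helper: builds a query request body, using the last known
// offsets (as returned by the offsets endpoint) as inclusive upper bounds.
function buildQueryBody(offsets, subscriptions, order = 'lamport', lowerBound = {}) {
  const allowedOrders = ['lamport', 'lamport-reverse', 'source-ordered']
  if (!allowedOrders.includes(order)) {
    throw new Error(`unknown order: ${order}`)
  }
  return {
    lowerBound,
    upperBound: { ...offsets },
    subscriptions,
    order,
  }
}

const body = buildQueryBody(
  { JAggoXUT7Bw: 57, a263bad7cMr: 60 },
  [{ semantics: 'com.actyx.examples.temperature' }],
  'lamport-reverse'
)
console.log(JSON.stringify(body))
```

The serialized object can be sent as the POST body to the query endpoint in the same way as in the cURL example.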

Subscribe to event streams

You can use the Event Service API to subscribe to event streams. The Event Service may return past events and will return new events as they are received.

Request

  • Endpoint: http://localhost:4454/api/v1/events/subscribe
  • HTTP method: POST
  • HTTP headers:
    • Content-Type, must be application/json, default: application/json
    • (optional) Accept, must be application/x-ndjson, default: application/x-ndjson

The request body must contain a JSON object with the following structure:

{
  "lowerBound": {
    "<string: sourceID>": "<integer: exclusive-lower-bound, e.g. 34>",
    "<string: sourceID>": "<integer: exclusive-lower-bound, e.g. -1>"
  },
  "subscriptions": [
    {
      "semantics": "<string: semantics | undefined>",
      "name": "<string: name | undefined>",
      "source": "<string: sourceID | undefined>"
    },
    {
      "semantics": "<string: semantics | undefined>",
      "name": "<string: name | undefined>",
      "source": "<string: sourceID | undefined>"
    },
    {
      "semantics": "<string: semantics | undefined>",
      "name": "<string: name | undefined>",
      "source": "<string: sourceID | undefined>"
    }
  ]
}

You use the request body to specify the details of your request, as documented below.

Optional: Lower bound for offsets (lowerBound)

The lowerBound object specifies the lower bound offset for each source ID, with the numbers being exclusive, i.e. a lowerBound specification of 34 means the Event Service will return events with offsets > 34.

The lowerBound is optional. If none is set for one, multiple or all subscribed sources, the Event Store will assume a lower bound offset of -1, i.e. the beginning.

Required: Subscriptions (subscriptions)

The subscriptions array specifies which event streams should be subscribed to, with each stream being specified by the 3-tuple of source, semantics and name. You may omit some or all of these properties to specify a wildcard.

Response

  • HTTP headers:
    • Content-Type is application/x-ndjson
    • Transfer-Encoding is chunked

The response will be a stream of <CR><LF>-delimited event payloads of the following structure:

{
  "stream": {
    "semantics": "<string: semantics>",
    "name": "<string: name>",
    "source": "<string: sourceID>"
  },
  "timestamp": "<integer>",
  "lamport": "<integer>",
  "offset": "<integer>",
  "payload": "<object>"
}

If an error is encountered while processing the stream of events, the stream will terminate with a final error JSON object with the following structure:

{
  "error": "message",
  "errorCode": 500
}

Example

See the following example using cURL:

API Request
echo '
{
  "lowerBound": {
    "JAggoXUT7Bw": 34,
    "a263bad7cMr": -1
  },
  "subscriptions": [
    {
      "semantics": "com.actyx.examples.temperature",
      "name": "temperatureSensor1",
      "source": "JAggoXUT7Bw"
    },
    {
      "semantics": "com.actyx.examples.temperature",
      "name": "temperatureSensor1",
      "source": "a263bad7cMr"
    },
    {
      "name": "temperatureSensor2",
      "source": "a263bad7cMr"
    }
  ]
}
' \
  | curl \
    -s -X "POST" \
    -d @- \
    -H "Content-Type: application/json" \
    -H "Accept: application/x-ndjson" \
    http://localhost:4454/api/v1/events/subscribe | jq
API Response
{
  "stream": {
    "semantics": "com.actyx.examples.temperature",
    "name": "temperatureSensor1",
    "source": "JAggoXUT7Bw"
  },
  "timestamp": 21323,
  "lamport": 323,
  "offset": 35,
  "payload": {
    "foo": "bar",
    "fooArr": ["bar1", "bar2"]
  }
}
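
Because the lower bound is exclusive, the offset of the last event processed for a source can be passed back as that source's lowerBound in order to continue after it, for example when re-subscribing after a dropped connection. The following is a sketch of that bookkeeping; trackOffsets and buildSubscribeBody are hypothetical helper names.

```javascript
// Hypothetical helper: returns a new lowerBound object that records the
// highest offset seen so far for the event's source.
function trackOffsets(lowerBound, event) {
  const source = event.stream.source
  const previous = lowerBound[source] !== undefined ? lowerBound[source] : -1
  return { ...lowerBound, [source]: Math.max(previous, event.offset) }
}

// Hypothetical helper: serializes a subscribe request body.
function buildSubscribeBody(subscriptions, lowerBound = {}) {
  return JSON.stringify({ lowerBound, subscriptions })
}

let seen = {}
seen = trackOffsets(seen, { stream: { source: 'JAggoXUT7Bw' }, offset: 35 })
seen = trackOffsets(seen, { stream: { source: 'a263bad7cMr' }, offset: 0 })

// Re-subscribing with this body continues after offsets 35 and 0 respectively.
console.log(buildSubscribeBody([{ semantics: 'com.actyx.examples.temperature' }], seen))
```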

Usage examples in different languages

The following examples show how you could interact with the Event Service from different languages and environments.

const { Transform } = require('stream')
const http = require('http')
const { StringDecoder } = require('string_decoder')

const decoder = new StringDecoder('utf8')

// Splits the chunked response into lines and emits one parsed event per line.
const actyxDecoder = new Transform({
  readableObjectMode: true,
  transform(chunk, _, cb) {
    try {
      if (this._last === undefined) {
        this._last = ''
      }
      this._last += decoder.write(chunk)
      const list = this._last.split(/\r?\n/)
      // the last element may be an incomplete line; keep it for the next chunk
      this._last = list.pop()
      for (let i = 0; i < list.length; i++) {
        if (list[i].length !== 0) {
          // ignore keep-alive empty lines
          const message = JSON.parse(list[i])
          if (message.error !== undefined) {
            // a final error object terminates the stream
            return cb(message)
          }
          this.push(message)
        }
      }
      cb()
    } catch (err) {
      cb(err)
    }
  },
})

const options = {
  hostname: 'localhost',
  port: 4454,
  path: '/api/v1/events/subscribe',
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
}

// Subscribe to all streams with the given semantics, from any source and name.
const body = JSON.stringify({ subscriptions: [{ semantics: 'com.actyx.examples.temperature' }] })

const req = http.request(options, (res) => {
  if (res.statusCode === 200) {
    res.pipe(actyxDecoder).on('data', console.log).on('error', console.log)
  } else {
    console.log(`error, status code: ${res.statusCode}`)
  }
})
req.write(body)
req.end()