Events API
This is the reference documentation for the Events API. If you run into questions while going through the document, we recommend reading the conceptual guide on event streams to get a more holistic understanding of how the Events API works.
The Events API allows you to
- get information about known offsets,
- publish events,
- query event streams,
- subscribe to event streams, and
- subscribe to event streams monotonically.
It is reachable at the following base URI: http://localhost:4454/api/v2/events.
JSON used in the examples below is pretty-printed with jq. This is only to make it more readable here. In reality, the Events API does not return pretty-printed JSON but the usual compact JSON you know from any other service.
Prerequisites
Communication with the Events API needs to be authenticated.
Therefore, an auth token associated with the requesting app needs to be retrieved from the Auth API.
This token then needs to be passed in the Authorization header with every request to the Events API.
In the following examples we will use the $AUTH_TOKEN environment variable, which can be initialized with:
export AUTH_TOKEN="$(curl -s localhost:4454/api/v2/auth -d'{"appId": "com.example.app","displayName": "Example App","version": "1.0"}' -H "Content-Type: application/json" | jq -r '.token')"
While the following examples use cURL, other command-line or graphical tools (e.g. Postman) would work as well.
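For readers who prefer a script over the shell, the token request can be sketched in Python using only the standard library. The endpoint and payload are taken from the cURL command above; the helper names (`build_auth_request`, `fetch_token`, `auth_headers`) are hypothetical and chosen for illustration only.

```python
import json
import urllib.request

# Same Auth API endpoint as in the cURL command above.
AUTH_URL = "http://localhost:4454/api/v2/auth"


def build_auth_request(app_id, display_name, version):
    """Build (but do not send) the POST request for the Auth API."""
    body = json.dumps(
        {"appId": app_id, "displayName": display_name, "version": version}
    ).encode("utf-8")
    return urllib.request.Request(
        AUTH_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_token(request):
    """Send the request and return the `token` field of the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["token"]


def auth_headers(token):
    """Header to attach to every Events API request."""
    return {"Authorization": f"Bearer {token}"}
```

With a node running locally, `auth_headers(fetch_token(build_auth_request("com.example.app", "Example App", "1.0")))` would yield the header used in the examples that follow.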
The sections below are split into reference documentation and usage examples. The examples are made in such a way that you can simply copy and paste the commands into your terminal. Please note that to follow along, you need to replace the node and stream IDs from the examples with yours.
Get information about known offsets
You can get information from the Events API about known offsets, i.e. what the API believes to be the latest offset for each stream.
This request returns the currently validated present as observed by this node, meaning that all events described by this offset map are available for querying.
It further returns a map in toReplicate, which describes the number of events that are available in the swarm and should eventually become available on this local node.
Fully replicated streams are omitted from this map.
Take a look at the event streams guide to learn more about the role of offsets.
Request
- Endpoint: http://localhost:4454/api/v2/events/offsets
- HTTP method: GET
- HTTP headers:
  - Authorization, see Prerequisites
  - (optional) Accept, must be application/json, default: application/json
There is no request body.
Response
- HTTP headers:
  - Content-Type is application/json
  - Cache-Control is no-store (to get fresh data and not use cache slots)
The response body will contain a JSON object of the following structure:
{
"present": {
"<string: stream ID>": "<integer: last-known-offset>",
"<string: stream ID>": "<integer: last-known-offset>"
},
"toReplicate": {
"<string: stream ID>": "<unsigned integer > 0: number of events pending replication to this node>",
"<string: stream ID>": "<unsigned integer > 0: number of events pending replication to this node>"
}
}
To learn more about stream IDs and event streams in general see our conceptual guide on event streams.
See the following example using cURL:
curl \
-s -X "GET" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-H "Accept: application/json" \
http://localhost:4454/api/v2/events/offsets | jq .
{
"present": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 56
},
"toReplicate": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 1
}
}
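As a rough illustration of how a client might interpret this response, the following Python sketch combines the two maps into a per-stream summary. It assumes the response has already been decoded into a dictionary; the function names are hypothetical.

```python
def replication_status(offsets_response):
    """Map each known stream ID to its validated offset and pending event count."""
    present = offsets_response.get("present", {})
    to_replicate = offsets_response.get("toReplicate", {})
    status = {}
    for stream in set(present) | set(to_replicate):
        status[stream] = {
            # None means this node has no validated offset for the stream yet.
            "present": present.get(stream),
            # Fully replicated streams are omitted from toReplicate, hence 0.
            "pending": to_replicate.get(stream, 0),
        }
    return status


def is_fully_replicated(offsets_response):
    """True if the node reports no events pending replication."""
    return not offsets_response.get("toReplicate")
```

For the example response above, the single stream would be reported with a validated offset of 56 and one event still pending replication.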
Publish events
You can publish new events using the Events API.
Request
- Endpoint: http://localhost:4454/api/v2/events/publish
- HTTP method: POST
- HTTP headers:
  - Authorization, see Prerequisites
  - (optional) Content-Type, must be application/json, default: application/json
The request body must contain a JSON object with the following structure:
{
"data": [
{
"tags": ["<string: tag, e.g. tag-01>", "<string: tag, e.g. tag-02>"],
"payload": "<object>"
},
{
"tags": ["<string: tag, e.g. tag-01>", "<string: tag, e.g. tag-02>"],
"payload": "<object>"
}
]
}
Response
- HTTP headers:
  - Content-Type is application/json
The response body will contain a JSON object of the following structure:
{
"data": [
{
"lamport": "<integer>",
"stream": "<string: stream id>",
"offset": "<integer>",
"timestamp": "<integer>"
},
{
"lamport": "<integer>",
"stream": "<string: stream id>",
"offset": "<integer>",
"timestamp": "<integer>"
}
]
}
See the following example using cURL:
echo '
{
"data": [
{
"tags": ["tag-01", "tag-02"],
"payload": {
"foo": { "a": 1, "b": 2 }
}
},
{
"tags": ["tag-02", "tag-03"],
"payload": {
"value": 42
}
}
]
}
' \
| curl \
-s -X "POST" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d @- \
-H "Content-Type: application/json" \
http://localhost:4454/api/v2/events/publish | jq .
{
"data": [
{
"lamport": 84,
"stream": "1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2",
"offset": 20,
"timestamp": 1622110001582587
},
{
"lamport": 85,
"stream": "1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2",
"offset": 21,
"timestamp": 1622110001582587
}
]
}
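The request and response structures above can also be handled programmatically. The following Python sketch builds a publish body from (tags, payload) pairs and extracts the highest assigned offset per stream from a decoded response. Both function names are hypothetical; the field names are those documented above.

```python
import json


def publish_body(events):
    """Build the JSON body for the publish endpoint from (tags, payload) pairs."""
    data = []
    for tags, payload in events:
        if not all(isinstance(tag, str) for tag in tags):
            raise TypeError("tags must be strings")
        data.append({"tags": list(tags), "payload": payload})
    return json.dumps({"data": data})


def published_offsets(response):
    """Return the highest offset per stream from a decoded publish response."""
    latest = {}
    for meta in response["data"]:
        stream = meta["stream"]
        latest[stream] = max(latest.get(stream, -1), meta["offset"])
    return latest
```

Applied to the example response above, `published_offsets` would return offset 21 for the single stream.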
Query event streams
You can query the Events API for bounded sets of events in one or more event streams.
Request
- Endpoint: http://localhost:4454/api/v2/events/query
- HTTP method: POST
- HTTP headers:
  - Authorization, see Prerequisites
  - (optional) Content-Type, must be application/json, default: application/json
  - (optional) Accept, must be application/x-ndjson, default: application/x-ndjson
The request body must contain a JSON object with the following structure:
{
"query": "<string: tag query, e.g. «FROM 'tag-01' & 'tag-02'»>",
"order": "<string: 'asc' | 'desc' | 'stream-asc'>",
"upperBound": {
"<string: stream ID>": "<integer: inclusive-upper-bound, e.g. 49>",
"<string: stream ID>": "<integer: inclusive-upper-bound, e.g. 101>"
},
"lowerBound": {
"<string: stream ID>": "<integer: exclusive-lower-bound, e.g. 34>",
"<string: stream ID>": "<integer: exclusive-lower-bound, e.g. -1>"
}
}
You use the request body to specify the details of your request as documented in the following.
Optional: Lower bound for offsets (lowerBound)
The lowerBound object specifies the lower bound offset for each stream with the numbers being exclusive, i.e. a lowerBound specification of 34 means the Events API will return events with offsets > 34.
The lowerBound is optional.
If none is set for one, multiple or all subscribed streams, the Events API will assume no lower bound.
Optional: Upper bounds for offsets (upperBound)
The upperBound object specifies the upper bound offset for each stream with the numbers being inclusive, i.e. an upperBound specification of 34 means the Events API will return events with offsets <= 34.
If none is set, the response stream will contain all events that were known to the system at the time of query.
Otherwise the Events API will only return events from streams that are specified.
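To make the bound semantics concrete, here is a small client-side model in Python of the selection rule described above. It is a sketch of the documented behaviour, not the service's implementation, and the function name `in_bounds` is hypothetical.

```python
def in_bounds(stream, offset, lower_bound=None, upper_bound=None):
    """Decide whether an event at (stream, offset) falls within the given bounds."""
    # lowerBound is exclusive; a stream without an entry has no lower bound.
    if lower_bound is not None and stream in lower_bound:
        if offset <= lower_bound[stream]:
            return False
    # upperBound is inclusive; when it is given, streams without an entry
    # are not returned at all.
    if upper_bound is not None:
        if stream not in upper_bound:
            return False
        if offset > upper_bound[stream]:
            return False
    return True
```

With a lowerBound of 34 and an upperBound of 57 for a stream, offsets 35 through 57 are selected, while 34 and 58 are not.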
Required: Query (query)
The query field specifies an AQL query that defines how events should be filtered and/or transformed.
Required: Ordering (order)
The order field specifies in which order the events should be returned to the caller. There are three options, one of which must be specified:
- asc: ascending order according to events' Lamport timestamp
- desc: descending order according to events' Lamport timestamp
- stream-asc: ascending order according to events' Lamport timestamp per stream, with no inter-stream ordering guarantees.
Please note that for identical Lamport timestamps the stream ID is taken into account as a secondary sort criterion for event ordering.
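The ordering rule can be illustrated with a short Python sketch that sorts already-received events by Lamport timestamp with the stream ID as tie-breaker. Two points are assumptions rather than documented behaviour: stream IDs are compared as plain strings, and desc is treated as the exact reverse of asc. The stream-asc option is not modelled, since it makes no inter-stream guarantees. The function name is hypothetical.

```python
def sort_events(events, order):
    """Sort decoded events by (lamport, stream) in ascending or descending order."""

    def key(event):
        # Secondary criterion: stream ID, compared as a string (assumption).
        return (event["lamport"], event["stream"])

    if order == "asc":
        return sorted(events, key=key)
    if order == "desc":
        # Assumption: descending order reverses the full (lamport, stream) key.
        return sorted(events, key=key, reverse=True)
    raise ValueError(f"unsupported order in this sketch: {order!r}")
```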
Response
- HTTP headers:
  - Content-Type is application/x-ndjson
  - Transfer-Encoding is chunked
The response is in Newline Delimited JSON format; each line is a JSON object of one of the following types:
Response type event
{
"type": "event",
"lamport": "<integer>",
"stream": "<string: stream ID>",
"offset": "<integer>",
"timestamp": "<integer: unix epoch in microseconds>",
"tags": "<string[]: tags>",
"appId": "<string>",
"payload": "<object>"
}
Response type offsets
{
"type": "offsets",
"offsets": {
"<string: stream ID>": "<integer: event offset, e.g. 49>",
"<string: stream ID>": "<integer: event offset, e.g. 101>"
}
}
Response type diagnostics
{
"type": "diagnostic",
"severity": "<string: 'warning' or 'error'>",
"message": "<string>"
}
This is just a subset of possible response types. Clients should be prepared to handle (or ignore) responses with a value of type not specified above.
If an error is encountered while processing the stream of events, the stream will terminate with a final error JSON object with the following structure:
{
"code": "<string: error code>",
"message": "<string: error message>"
}
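A client therefore has to read the response line by line and dispatch on the type field. The Python sketch below shows one way to do this for lines that have already been received as strings. It assumes that the final error object can be recognised by having a code field and no type field, which matches the structure shown above but is not stated explicitly; the function names are hypothetical.

```python
import json


def parse_ndjson(lines):
    """Yield one decoded JSON object per non-empty line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def collect(lines):
    """Sort messages into events, offsets, diagnostics and a final error."""
    result = {"events": [], "offsets": None, "diagnostics": [], "error": None}
    for message in parse_ndjson(lines):
        kind = message.get("type")
        if kind == "event":
            result["events"].append(message)
        elif kind == "offsets":
            result["offsets"] = message["offsets"]
        elif kind == "diagnostic":
            result["diagnostics"].append(message)
        elif kind is None and "code" in message:
            # Assumed shape of the final error object: `code` and `message`.
            result["error"] = message
            break
        # Any other type is ignored, as recommended above.
    return result
```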
See the following example using cURL:
echo '
{
"query": "FROM \"tag-01\" & (\"tag-02\" | \"tag-03\")",
"order": "desc",
"lowerBound": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 34
},
"upperBound": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 57
}
}
' \
| curl \
-s -X "POST" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d @- \
-H "Content-Type: application/json" \
-H "Accept: application/x-ndjson" \
http://localhost:4454/api/v2/events/query | jq .
{
"type": "event",
"lamport": 28,
"stream": "1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2",
"offset": 4,
"timestamp": 1622108806233884,
"tags": ["tag-01", "tag-02"],
"appId": "com.example.app",
"payload": {
"value": 2
}
}
Note that in order for the JSON to be parsed correctly, we need to use a backslash \ before the double quotes in the tag expression.
Alternatively, you could save the JSON payload to a file called query.json:
{
"query": "FROM 'tag-01' & ('tag-02' | 'tag-03')",
"order": "desc",
"lowerBound": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 34
},
"upperBound": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 57
}
}
and use it in the HTTP request like so:
curl \
-s -X "POST" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d @query.json \
-H "Content-Type: application/json" \
-H "Accept: application/x-ndjson" \
http://localhost:4454/api/v2/events/query | jq .
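The same request can be issued from Python with the standard library. The sketch below mirrors the cURL invocation above (endpoint, headers and body fields are those from the reference); the function names `build_query_request` and `run_query` are hypothetical, and `run_query` requires a running node.

```python
import json
import urllib.request

QUERY_URL = "http://localhost:4454/api/v2/events/query"


def build_query_request(token, query, order, lower_bound=None, upper_bound=None):
    """Build (but do not send) a POST request for the query endpoint."""
    if order not in ("asc", "desc", "stream-asc"):
        raise ValueError(f"invalid order: {order!r}")
    body = {"query": query, "order": order}
    # Both bounds are optional and are left out of the body when not given.
    if lower_bound is not None:
        body["lowerBound"] = lower_bound
    if upper_bound is not None:
        body["upperBound"] = upper_bound
    return urllib.request.Request(
        QUERY_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "Accept": "application/x-ndjson",
        },
        method="POST",
    )


def run_query(request):
    """Send the request and yield one decoded message per response line."""
    with urllib.request.urlopen(request) as response:
        for raw in response:
            if raw.strip():
                yield json.loads(raw)
```

Building the JSON body in code also avoids the manual quote escaping needed in the shell example.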
Subscribe to event streams
You can use the Events API to subscribe to event streams. The Events API may return past events and continue returning new "live" events as they are received.
Request
- Endpoint: http://localhost:4454/api/v2/events/subscribe
- HTTP method: POST
- HTTP headers:
  - Authorization, see Prerequisites
  - (optional) Content-Type, must be application/json, default: application/json
  - (optional) Accept, must be application/x-ndjson, default: application/x-ndjson
The request body must contain a JSON object with the following structure:
{
"query": "<string: tag query, e.g. «FROM 'tag1' & 'tag2'»>",
"lowerBound": {
"<string: stream ID>": "<integer: exclusive-lower-bound, e.g. 34>",
"<string: stream ID>": "<integer: exclusive-lower-bound, e.g. -1>"
}
}
You use the request body to specify the details of your request as documented in the following.
Required: Query (query)
The query field specifies an AQL query that defines how events should be filtered and/or transformed.
Optional: Lower bound for offsets (lowerBound)
The lowerBound object specifies the lower bound offset for each stream with the numbers being exclusive, i.e. a lowerBound specification of 34 means the Events API will return events with offsets > 34.
The lowerBound field is optional. If none is set for any of the subscribed streams, the Events API will assume a lower bound offset of -1, i.e. the beginning.
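Because the lower bound is exclusive, a client that remembers the highest offset it has seen per stream can pass that map as lowerBound when it subscribes again and receive only later events. The Python sketch below illustrates this idea; it is one possible client-side pattern, not something the API prescribes, and the function names are hypothetical.

```python
def advance_lower_bound(lower_bound, message):
    """Return a new lowerBound map that accounts for one received message."""
    updated = dict(lower_bound)
    if message.get("type") == "event":
        stream, offset = message["stream"], message["offset"]
        # -1 is the documented default lower bound, i.e. the beginning.
        updated[stream] = max(updated.get(stream, -1), offset)
    return updated


def resubscribe_body(query, lower_bound):
    """Build a subscribe request body that continues after the seen offsets."""
    return {"query": query, "lowerBound": dict(lower_bound)}
```

After receiving an event with offset 4 on a stream, the resulting body asks for offsets greater than 4 on that stream and for everything on streams not yet seen.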
Response
- HTTP headers:
  - Content-Type is application/x-ndjson
  - Transfer-Encoding is chunked
The response is in Newline Delimited JSON format; each line is a JSON object of one of the following types:
Response type event
{
"type": "event",
"lamport": "<integer>",
"stream": "<string: stream ID>",
"offset": "<integer>",
"timestamp": "<integer>",
"tags": "<string[]: tags>",
"appId": "<string>",
"payload": "<object>"
}
Response type offsets
{
"type": "offsets",
"offsets": {
"<string: stream ID>": "<integer: event offset, e.g. 49>",
"<string: stream ID>": "<integer: event offset, e.g. 101>"
}
}
Response type diagnostics
{
"type": "diagnostic",
"severity": "<string: 'warning' or 'error'>",
"message": "<string>"
}
This is just a subset of possible response types. Clients should be prepared to handle (or ignore) responses with a value of type not specified above.
If an error is encountered while processing the stream of events, the stream will terminate with a final error JSON object with the following structure:
{
"code": "<string: error code>",
"message": "<string: error message>"
}
See the following example using cURL:
echo '
{
"query": "FROM \"tag-01\" & (\"tag-02\" | \"tag-03\")",
"lowerBound": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 34
}
}
' \
| curl -N \
-s -X "POST" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d @- \
-H "Content-Type: application/json" \
-H "Accept: application/x-ndjson" \
http://localhost:4454/api/v2/events/subscribe | jq .
{
"type": "event",
"lamport": 28,
"stream": "1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2",
"offset": 4,
"timestamp": 1622108806233884,
"tags": ["tag-01", "tag-02"],
"appId": "com.example.app",
"payload": {
"value": 2
}
}
Subscribe to event streams monotonically
You can use the Events API to subscribe to event streams with the following guarantee: whenever the service learns about events that need to be sorted earlier than an event that has already been delivered, the stream ends with a time travel message.
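On the client side, this guarantee suggests a simple consumption loop: fold events into a state, and when a message of type timeTravel arrives, discard that state and start over. The Python sketch below illustrates the pattern on already-decoded messages, using the message shapes documented in the Response section of this endpoint. Resetting to the initial state is an assumption about how a client may want to react, and the function name is hypothetical.

```python
def consume_monotonic(messages, apply, initial_state):
    """Fold events into a state until the stream ends or a timeTravel arrives.

    Returns (state, new_start). new_start is None unless a timeTravel message
    ended the stream; in that case the accumulated state is discarded.
    """
    state = initial_state
    for message in messages:
        kind = message.get("type")
        if kind == "event":
            state = apply(state, message)
        elif kind == "timeTravel":
            # Assumption: the state built so far is stale and must be rebuilt
            # by subscribing again.
            return initial_state, message["newStart"]
        # Other message types (offsets, diagnostic, unknown) are skipped here.
    return state, None
```

A caller would typically wrap this in a loop that subscribes again whenever `new_start` is not None.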
Request
- Endpoint: http://localhost:4454/api/v2/events/subscribe_monotonic
- HTTP method: POST
- HTTP headers:
  - Authorization, see Prerequisites
  - (optional) Content-Type, must be application/json, default: application/json
  - (optional) Accept, must be application/x-ndjson, default: application/x-ndjson
The request body must contain a JSON object with the following structure:
{
"session": "<string: user supplied session ID>",
"query": "<string: tag query, e.g. «FROM 'tag1' & 'tag2'»>",
"lowerBound": {
"<string: stream ID>": "<integer: exclusive-lower-bound, e.g. 34>",
"<string: stream ID>": "<integer: exclusive-lower-bound, e.g. -1>"
}
}
You use the request body to specify additional details of your request as documented in the following:
Required: Session ID (session)
The session identifier is chosen by the client and must be used consistently by the client to resume an earlier session.
If the query changes, a new session will be created regardless of the existence of a session with the same ID.
Required: Query (query)
The query field specifies an AQL query that defines how events should be filtered and/or transformed.
Optional: Lower bound for offsets (lowerBound)
The lowerBound object specifies the lower bound offset for each stream with the numbers being exclusive, i.e. a lowerBound specification of 34 means the Events API will return events with offsets > 34.
The lowerBound field is optional. If none is set for any of the subscribed streams, the Events API will assume a lower bound offset of -1, i.e. the beginning.
Response
- HTTP headers:
  - Content-Type is application/x-ndjson
  - Transfer-Encoding is chunked
The response is in Newline Delimited JSON format; each line is a JSON object of one of the following types:
Response type event
{
"type": "event",
"lamport": "<integer>",
"stream": "<string: stream ID>",
"offset": "<integer>",
"timestamp": "<integer: unix epoch in microseconds>",
"tags": "<string[]>",
"appId": "<string>",
"payload": "<object>",
"caughtUp": "<boolean: known events delivery exhausted?>"
}
Response type offsets
{
"type": "offsets",
"offsets": {
"<string: stream ID>": "<integer: event offset, e.g. 49>",
"<string: stream ID>": "<integer: event offset, e.g. 101>"
}
}
Response type timeTravel
In case the service learns about events that need to be sorted earlier than an event that has already been delivered, an event of this type is emitted and the stream is closed.
{
"type": "timeTravel",
"newStart": {
"stream": "<string: stream ID>",
"lamport": "<integer>",
"offset": "<integer>"
}
}
Response type diagnostics
{
"type": "diagnostic",
"severity": "<string: 'warning' or 'error'>",
"message": "<string>"
}
This is just a subset of possible response types. Clients should be prepared to handle (or ignore) responses with a value of type not specified above.
If an error is encountered while processing the stream of events, the stream will terminate with a final error JSON object with the following structure:
{
"code": "<string: error code>",
"message": "<string: error message>"
}
See the following example using cURL:
echo '
{
"session": "my_session_id",
"query": "FROM \"tag-01\" & (\"tag-02\" | \"tag-03\")",
"lowerBound": {
"1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2": 34
}
}
' \
| curl -N \
-s -X "POST" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d @- \
-H "Content-Type: application/json" \
-H "Accept: application/x-ndjson" \
http://localhost:4454/api/v2/events/subscribe_monotonic | jq .
{
"type": "event",
"lamport": 323,
"stream": "1g1UOqdpvBB1KHsGWGZiK3Vi8MYGDZZ1oylpOajUk.s-2",
"offset": 34,
"timestamp": 1599224884528020,
"tags": ["tag-01", "tag-02"],
"appId": "com.example.app",
"payload": {
"foo": "bar",
"fooArr": ["bar1", "bar2"]
},
"caughtUp": true
}