Skip to main content

Tracking state and history

In this section we focus on the task of mirroring the state of something that lives outside Actyx by emitting suitable events inside Actyx. Once we have ensured that all relevant state changes of that outside thing (e.g. robot, thermostat, … ) are represented as events, we can build interesting apps on top of that. For example, we could build an app that observes the window and the thermostat in my living room and tells the friendly house robot to open or close the window when appropriate.

The important notion here is that the changes happen independent of Actyx, by themselves; the part of the app we’re building merely tracks what is happening elsewhere. This is often true of all kinds of data connectors feeding information into an Actyx system.

Solution strategy

Since the Actyx app has no power over the external thing, we cannot enforce conditions on the sequence of events that will happen. If for example the specification says that the robot will return to its base when idle, but we observe the robot to walk around instead, then we need to accept reality. Therefore, all incoming information is valid by definition — but we may still choose to ingest only that subset which our app needs and understands.

Observing the external thing involves some code that emits a stream of notifications; here we assume that we have some EventEmitter for both the window and the thermostat. The strategy we follow is to subscribe to the interesting subset of notifications and emit them as Actyx events with suitable tags. Then we can obtain the latest state by using some simple AQL queries, and we also have access to the full history as well.

Designing the event model

The event model for the window is rather straightforward:

type WindowId = string
type Opened = { type: 'opened'; id: WindowId }
type Closed = { type: 'closed'; id: WindowId }
type WindowEvent = Opened | Closed

The window can only be opened and closed, and when it is, we emit the appropriate event. Since we can have multiple windows, we need to record the identifier within each event while Actyx takes care of other metadata like timestamp and device ID for when and where the event was written.

In the case of the thermostat the event model is bigger but not complicated — following some external entity tends to be this way.

type ThermostatId = string
type TemperaturReading = { type: 'temperatureReading'; id: ThermostatId; inside: number; outside: number }
type SetPointChanged = { type: 'setPointChanged'; id: ThermostatId; setPoint: number }
type ModeChanged = { type: 'modeChanged'; id: ThermostatId; mode: 'heating' | 'neutral' | 'cooling' }
type ThermostatEvent = TemperaturReading | SetPointChanged | ModeChanged

Now that we know the shape of the data we want to write, the only missing design piece is the event tagging. It is always a good idea to tag each event with the identifier of the thing it pertains to, in our case the WindowId or ThermostatId. This is done by using tags like window:bathroom or window:7326 which can conveniently be constructed with syntax like

const bathroomWindow = Tag<WindowEvent>('window').withId('bathroom')
// OR
const windowTag = Tag<WindowEvent>('window')
const bathroomWindow = windowTag.withId('bathroom')

in Typescript (the type parameter ensures that this tag can only be applied to the two event types declared above). In most cases, it is also useful to include the event type in the tags, like Tag('setPointChanged') for the thermostat. This is especially helpful for event types which do not occur regularly for some type of thing, as it makes it very efficient to find these events among all the more abundant ones. In case of the window, the event model sketched would not benefit from even type tagging, as both kinds of events should occur at the same frequency, ideally in alternating fashion; adding the type to the tags does not hurt, though, in case you would like to include it anyway, as compression typically reduces the overhead a lot.

Emitting the events

Turning EventEmitter notifications into Actyx events is basically done like this:

const actyx = Actyx.of(/* app manifest */)
const emitter = getWindowSensorNotifications(windowId)

// usually a good idea to make a type-safe factory function for the designed tags
const tags = (eventType: WindowEvent['type']) => windowTag.withId(windowId).and(Tag(eventType))

emitter.on('closed', () => {
actyx.publish(tags('closed').apply({ type: 'closed', id: windowId }))
})
// and analogously for 'opened'

While this is the core of the process, there is one complication: event publication is asychronous in Actyx (as it involves storing the bits on disk) while the event emitter can fire at any rate. If the emitter is too fast, the Promises returned from actyx.publish will pile up inside the node.js process and the system will eventually run out of memory. This conflict naturally arises because we are connecting two things where neither has power over the other — we’ll need to act as a mediator.

One solution is to limit the number of outstanding publish requests:

const queue: TaggedEvent[] = []
const publish = (t: TaggedEvent) => {
const start = queue.length === 0
queue.length < QUEUE_CAPACITY && queue.push(t)
start &&
(async () => {
while (queue.length > 0) {
await actyx.publish(queue.shift())
}
})()
}

// and use this like so:
emitter.on('closed', () => {
publish(tags('closed').apply({ type: 'closed', id: windowId }))
})

Upon receiving the first event to publish, an asynchronous loop is started that feeds events into actyx.publish one by one. When the queue runs empty, this process becomes dormant, to be awoken again when the next event shall be written. And when the queue reaches QUEUE_CAPACITY, new publish requests will be dropped.

This pattern should be tailored to your use-case by

  • considering whether to drop the oldest, newest, or all outstanding publish requests upon queue overrun,
  • possibly raising alerts when dropping events
  • deciding how to handle errors in publishing (which may be inherent to the data as in “event too large” or due to Actyx being temporarily unreachable)

which is why we have not published the above code as a package on the npm registry.

Obtaining the latest state

With events written into Actyx as shown above, we now have the means to “see” the external thing from within apps. All we have to do is to run some AQL queries. In case of the window this is almost too easy:

PRAGMA features := aggregate
FROM 'window:living room' AGGREGATE LAST(_.type)

If the connector for the living room window has never been running then this will result in an empty response. Otherwise, it will say either "closed" or "openend".

We can retrieve the current mode of the thermostat in the same fashion, or we can also include the latest temperature measurements:

PRAGMA features := aggregate
FROM 'thermostat:living room'
AGGREGATE {
temperature: LAST(CASE _.type = 'temperatureReading' => _.inside ENDCASE)
mode: LAST(CASE _.type = 'modeChanged' => _.mode ENDCASE)
}

This query will cause Actyx to search all events tagged for the living room thermostat in chronologically decreasing order until both a temperature reading and a mode change event have been found. If we wanted to get the last set point then it would be more efficient to search for that using more tags, since we can assume that the set point is changed much less frequently than the occurrence of the other events.

PRAGMA features := aggregate
FROM 'window:living room' & 'setPointChanged' AGGREGATE LAST(_.setPoint)

If we encapsulate these queries inside helper functions, our little example app’s core loop could look like this:

const getWindowState = async (actyx: EventFns) => {
const results = await actyx.queryAql({
// the PRAGMA needs its own line
query: `PRAGMA features := aggregate
FROM 'window:living room' AGGREGATE LAST(_.type)`,
})
const values = results.filter((r) => r.type === 'event')
return values.length > 0 ? (values[0].payload as WindowEvent['type']) : 'unknown'
}
// and similar for the thermostat

const checkLoop = async () => {
const window = await getWindowState(actyx)
const thermostat = await getThermostatMode(actyx)
if (window === 'opened' && thermostat === 'heating') {
const robotState = robot.getState()
if (robotState.type !== 'mission' || robotState.mission !== 'close living room window') {
robot.setNewMission('close living room window')
}
}
// run check again every 10sec
setTimeout(checkLoop, 10_000)
}
checkLoop()
timeout vs interval

We’re using setTimeout instead of setInterval because the loop action is asynchronous and we don’t want to trigger the next check before the previous one is complete.

Accessing historic states

The above can also be done by updating database cells, so what is the advantage of using Actyx? Since we emit changes as events, we have access to the whole history! (modulo limiting the history to avoid filling up the disk by using ephemeral event streams) This means we can find out whether the window was open at some specific point in time:

PRAGMA features := aggregate timeRange
FROM 'window:living room' & to(2022-09-23T12:15:00+02:00) AGGREGATE LAST(_.type)

This retrieves the youngest event before 12:15 CEST on Sep 23, 2022. We could also check the temperatur readings of the last hour to compute a trend:

import { linearRegression } from 'simple-statistics'

const temperatureTrend = async (actyx: EventFns) => {
const fromTime = new Date(Date.now() - 60 * 60 * 1000) // one hour ago
const results = await actyx.queryAql({
query: `PRAGMA features := timeRange
FROM 'thermostat:living room' & 'temperatureReading' & from(${fromTime})
SELECT _.outside`,
})
const values = results
.filter((r) => r.type === 'event')
.map((r) => [r.meta.timestampMicros, r.payload])
const { m } = linearRegression(values) // yields degrees per microsecond
Math.abs(m) < 1e-10 ? 'flat' : m > 0 ? 'rising' : 'falling'
}