Committing changes to an external database
While Actyx and AQL cover a wide range of use-cases, no IT system exists in isolation. We have covered how to get data into Actyx; the missing piece is how to update an external system with the events recorded in Actyx. We might, for example, need to export all material movements carried out by AGVs into a warehouse management system (WMS) as material bookings.
Solution strategy
Since everything in Actyx is recorded as events, the most straightforward solution starts by ensuring that every material movement is recorded with an event like this:
type MaterialMoved = {
  type: 'materialMoved'
  from: Location
  to: Location
  by: AgentId
  article: ArticleId
  quantity: number
  // plus whatever else is needed: serial/batch/lot numbers, etc.
}
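The domain types referenced above (`Location`, `AgentId`, `ArticleId`) are not defined in this article; for the snippets to type-check, simple placeholder aliases like the following will do, although a real project would most likely use richer types:

// Placeholder aliases, only so that the examples compile; adapt to your domain.
type Location = string
type AgentId = string
type ArticleId = string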
We can then record up to which point in the event stream these movements have already been booked into the WMS and what the result was:
type BookingSuccess = { type: 'bookingSuccess'; stream: StreamId; offset: Offset }
type BookingError = { type: 'bookingError'; stream: StreamId; offset: Offset; error: string }
// periodically emit one of these for efficient start-up
type BookingOffsets = { type: 'bookingOffsets'; offsets: OffsetMap }
type BookingEvent = BookingSuccess | BookingError | BookingOffsets
The idea here is to use the event identity, consisting of the stream identifier and the event's offset within that stream: for each booking we record the success or error as a new event. In theory this would already be a complete solution, because the set of already booked movement events can be obtained by reading all these booking events. In practice, it pays to write a full offset map every once in a while, since that serves as a compact description of all event stream positions covered so far.
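The booking payloads use `StreamId`, `Offset`, and `OffsetMap`, which mirror the identifiers Actyx attaches to every event (the SDK also ships a compatible `OffsetMap` type). For illustration, simple aliases suffice, and an offset map is nothing more than a record from stream id to the highest offset covered so far; the stream ids below are made up:

// Aliases matching the identifiers Actyx attaches to every event.
type StreamId = string
type Offset = number
type OffsetMap = Record<StreamId, Offset>

// Hypothetical snapshot: everything up to offset 1729 of the first stream and
// up to offset 87 of the second stream has already been booked into the WMS.
const exampleOffsets: OffsetMap = {
  'stream-a': 1729,
  'stream-b': 87,
}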
Completing the event model
With the event structure already outlined above we concentrate on the tags:
- we assume that `MaterialMoved` events are tagged with `materialMoved`
- we tag all `BookingEvent`s with `booking`
- in order to find our starting point quickly and efficiently, we tag `BookingOffsets` with `bookingOffsets`
import { AqlEventMessage, Tag } from '@actyx/sdk'

const BookingTag = Tag<BookingEvent>('booking')
const BookingOffsetsTag = Tag<BookingOffsets>('bookingOffsets')
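Purely to illustrate the first assumption, the app that records the movements (e.g. the AGV controller) might publish them roughly as follows; `recordMovement` is a hypothetical helper, and `actyx` is an already connected SDK instance, just as in the snippets below:

const MaterialMovedTag = Tag<MaterialMoved>('materialMoved')

// Hypothetical helper on the producing side: tag and publish one movement.
const recordMovement = (movement: Omit<MaterialMoved, 'type'>) =>
  actyx.publish(MaterialMovedTag.apply({ type: 'materialMoved', ...movement }))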
Implementation of the app logic
When the exporter app starts, it first needs to figure out where it left off.
This begins by obtaining the latest `BookingOffsets` event and then proceeds by adding all later successes or errors into the contained offset map.
let eventsSinceOffsets = 0

const results = await actyx.queryAql(
  `PRAGMA features := aggregate
   FROM 'booking' & 'bookingOffsets' AGGREGATE LAST(_)`,
)
const lowerBound = results.filter((r) => r.type === 'event')[0]?.payload?.offsets || {}

await new Promise<void>((resolve, reject) => {
  actyx.queryAqlChunked(
    'FROM "booking"',
    1, // chunk size of one
    ([resp]) => {
      // bookingOffsets snapshots are also tagged with 'booking' but carry no
      // stream/offset pair, so only success and error events are applied here;
      // offsets only ever grow, so re-applying older bookings is harmless
      if (resp.type === 'event' && resp.payload.type !== 'bookingOffsets') {
        eventsSinceOffsets += 1
        lowerBound[resp.payload.stream] = resp.payload.offset
      }
    },
    (err) => (err ? reject(err) : resolve()),
  )
})
actyx.subscribeAql(
  'FROM "materialMoved"',
  (resp) => resp.type === 'event' && processEvent(resp),
  (err) => console.error('Actyx subscription error:', err),
  lowerBound,
)
When updating `lowerBound` we rely on the fact that Actyx always returns events from the same stream in strictly increasing offset order, so we can simply overwrite the corresponding offset map entry.
We then use the computed `OffsetMap` as the starting point for the subscription to material movement events, which will cover both historic and live events.
Upon first start there will be no `BookingOffsets` event, so `lowerBound` will stay the empty object, also throughout the `queryAqlChunked` call (since no booking events will be found), and we therefore start processing from the beginning of time.
If this is not desired, we can always add a time range restricting how far back in history we are willing or obliged to look; one possible approach is sketched below.
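One simple way to do that, assuming the event timestamp is available as `resp.meta.timestampMicros` (microseconds since the Unix epoch), is to skip movements older than some cutoff directly in the subscription callback; the 30-day window below is an arbitrary example and would replace the subscription shown earlier:

// Arbitrary example: ignore material movements older than 30 days.
const cutoffMicros = (Date.now() - 30 * 24 * 60 * 60 * 1000) * 1000

actyx.subscribeAql(
  'FROM "materialMoved"',
  (resp) =>
    resp.type === 'event' &&
    resp.meta.timestampMicros >= cutoffMicros &&
    processEvent(resp),
  (err) => console.error('Actyx subscription error:', err),
  lowerBound,
)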
Now all that is missing is the function for performing the actual bookings:
async function processEvent(resp: AqlEventMessage): Promise<void> {
  const { stream, offset } = resp.meta
  try {
    await callWmsApiForBooking(resp.payload)
    await actyx.publish(BookingTag.apply({ type: 'bookingSuccess', stream, offset }))
  } catch (err) {
    await actyx.publish(
      BookingTag.apply({ type: 'bookingError', stream, offset, error: String(err) }),
    )
  }
  // record progress and periodically persist a full snapshot of the offset map
  lowerBound[stream] = offset
  eventsSinceOffsets += 1
  if (eventsSinceOffsets > 1000) {
    await actyx.publish(
      BookingTag.and(BookingOffsetsTag).apply({ type: 'bookingOffsets', offsets: lowerBound }),
    )
    eventsSinceOffsets = 0
  }
}
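For illustration only, `callWmsApiForBooking` could be as simple as the following sketch using the built-in `fetch`: the WMS endpoint URL is made up, and any HTTP client will do as long as failures end up as a rejected Promise.

// Minimal sketch: POST the movement to a hypothetical WMS REST endpoint and
// treat any non-2xx response as a failure.
async function callWmsApiForBooking(movement: unknown): Promise<void> {
  const response = await fetch('https://wms.example.com/api/bookings', {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(movement),
  })
  if (!response.ok) {
    throw new Error(`WMS booking failed with HTTP status ${response.status}`)
  }
}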
The precise details of `callWmsApiForBooking` are not important for the Actyx treatment; it only matters that this function is asynchronous and may fail.
The latter gives rise to the `bookingError` result, while the former requires the event subscription to be inhibited while the WMS is being called.
Luckily this is part of the Actyx SDK: `actyx.subscribeAql` will await a Promise returned from the `onResponse` callback before invoking the callback again.
Idiomatic JavaScript APIs are built around callbacks that are invoked whenever new information is ready to be processed — there is no established concept of “hold your horses while I think”.
Therefore, when Actyx delivers the subscription results, they will accumulate in memory while the bookings into the WMS are ongoing.
If this exporter app is started for the first time with a really large volume of `materialMoved` events already stored in Actyx, this may lead to an out-of-memory situation.
The solution sketched above should deal with this if you ensure that it is restarted after a crash.
All bookings made during the first attempt will be skipped during the second, and so on.
Or you could catch up from `lowerBound` to the present offsets (from `await actyx.offsets()`) using AQL queries with a `LIMIT` clause.
Alternative solution if the database can store the offset map
If your external system offers the possibility to store auxiliary data with your bookings, then you can keep track of what has already been exported within that external system instead of using `BookingEvent`s.
The common pattern in this case is to write the `stream` and `offset` together with the business data in a single transaction.
This ensures that every event will lead to exactly one booking.
The exporter given above may in theory crash between performing the booking and persisting the result as an Actyx event, in which case that booking will be performed a second time upon restart of the app.
In addition to the change to `processEvent`, the startup process will use a SQL query (or whatever is offered by the external system) to obtain the initial `lowerBound` offset map, which is then used to start the subscription as usual; a rough sketch of this variant follows below.
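As a sketch, assume the bookings end up in a PostgreSQL database accessed through the `pg` client; the table and column names (`material_bookings`, `export_progress`, …) are invented for illustration, and `db.connect()` is assumed to have happened during startup:

import { AqlEventMessage } from '@actyx/sdk'
import { Client } from 'pg'

const db = new Client({ connectionString: process.env.WMS_DB_URL })

// Variant of processEvent: booking and export progress are written in one
// transaction, so either both are persisted or neither is.
async function processEventViaDb(resp: AqlEventMessage): Promise<void> {
  const { stream, offset } = resp.meta
  const m = resp.payload as MaterialMoved
  await db.query('BEGIN')
  try {
    await db.query(
      'INSERT INTO material_bookings (article, quantity, from_loc, to_loc) VALUES ($1, $2, $3, $4)',
      [m.article, m.quantity, m.from, m.to],
    )
    await db.query(
      `INSERT INTO export_progress (stream, last_offset) VALUES ($1, $2)
       ON CONFLICT (stream) DO UPDATE SET last_offset = EXCLUDED.last_offset`,
      [stream, offset],
    )
    await db.query('COMMIT')
  } catch (err) {
    await db.query('ROLLBACK')
    throw err // let the caller decide about retries and alerting
  }
}

// At startup, the stored progress directly yields the lowerBound offset map.
async function readLowerBound(): Promise<Record<string, number>> {
  const { rows } = await db.query('SELECT stream, last_offset FROM export_progress')
  return Object.fromEntries(rows.map((r) => [r.stream, Number(r.last_offset)]))
}

Because the progress row is updated in the same transaction as the booking itself, a crash can never leave the two out of sync, which is exactly the property discussed above.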