The Pond
We have seen in the previous sections that Actyx provides durable event streams, indexed by a tagging system. Another way to look at this is: by choosing some tags, you get a durable pub–sub topic for the communication between apps on different nodes. The event ordering guarantees only that events from the same source node are delivered in the correct order — there is no ordering guarantee between events coming from different nodes.
So how do we write applications that come to the same conclusions on all nodes?
The answer is the Actyx Pond, a TypeScript library that builds upon the Actyx event service to offer you a higher-level programming model. In the following, we discuss the main concepts to give you an intuition of what the Pond can be used for.
The naming refers to the view that an Actyx swarm is a distributed storage system for events, like a data lake. Each node holds some part of the events, so it is smaller than the lake, hence the name pond. Within this allegory, the active entities that consume the data are consequently called fishes, even though in the context of digital factories they tend to be called digital twins (or, as we prefer, local twins).
The problem setting
Picture an order workflow: a customer calls and places an order, which is created in the system. Tom starts working on the order, recording this as an event, not knowing that Jerry canceled the order shortly before — the customer had called again and asked for some changes that will require more thought. In a peer-to-peer system such things can happen; in fact they frequently do if interactions occur on a timescale of seconds, as network messages can sometimes be delayed. The above might for example happen if Tom just woke his computer from sleep and the network synchronization between Actyx nodes was still ongoing when he pressed the “start work” button on the order workflow.
The Actyx Pond solution
There are several approaches to solving the above issue. The most firmly established one is to use a central database to avoid the synchronization issues (at least on the app level; a replicated database needs to solve these issues internally). Since we’re not doing that — we want autonomous network peers to work together without requiring additional infrastructure — we could use consensus protocols to achieve what a replicated database does. This brings the CAP theorem into play, which basically says that in the face of network issues we cannot offer strong consistency and full availability at the same time. A more modern solution than preventing conflicts is to model the problem such that conflicts are excluded by design; while this is possible in surprisingly many cases, it won’t work here.
The approach offered by Actyx Pond is to allow and embrace conflicts. Instead of preventing them, we make it easy to recognize them once they have occurred, giving you the tools to react appropriately.
In the above example, we’d track the sequence of events with a state machine, like so:
- the order is created, so we’re now in the created state
- Jerry registers the cancellation, so now we’re in the canceled state
- Tom (unaware of the above) starts working, which is not foreseen in the current state ⇒ we need to perform a compensating action (i.e. Tom needs to stop working)
The anatomy of a Fish
The main concept of the Actyx Pond is that of a state machine, which together with its event subscription and some configuration settings is called a Fish.
A state machine is a piece of business logic that tracks a current state and reacts to incoming events based on that state: events may be processed only in certain states, like the “work started” event in the example above that only has its intended effect in the created state.
Another way to picture this is that the Fish consumes incoming events, making sense of that input stream using its state machine logic. The events are fed into the Fish by the Pond, always strictly in the right order, with the caveat that the event stream may have holes in it, namely where events from other nodes have not yet been replicated. Once all events are in the local disk store, the sequence is completely known.
But what happens in the meantime? As new events can be emitted on other Actyx nodes at all times, the Pond cannot assume that it knows when it has all relevant events at its disposal, so it cannot delay feeding the Fish until all events are available. This means that it will feed the Fish with whatever events are already present, and as soon as new events are received over the network they are sorted into their respective positions in the event stream and the Fish is fed again.
This last point may sound nonsensical: assuming the example above is modeled as a Fish, and assuming that Fish has seen the “created” and “work started” events, how can it now be fed the full sequence of “created”, “canceled”, “work started”? Simply feeding the whole sequence again won’t work, because the Fish would rightfully reject the “created” event, having seen it earlier.
The trick applied by the Pond is called the “time warp algorithm”, which is a fancy name for the fact that the Fish is reset to an earlier state before the affected part of the log, now including the newly inserted events, is fed to it again. This is where the allegory breaks down a bit, since Actyx Pond takes regular snapshots of what each Fish knows so that it can reset their brains to earlier states when needed.
What this gives you is that you program against a linear sequence of events, as if it were always complete, and the Pond runs your logic so that you always get the correct state for the events that are locally available.
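To make the principle concrete, here is a drastically simplified sketch of the idea in TypeScript; it is an illustration only, not the Pond’s actual implementation, and the names are purely hypothetical:

// Illustration only: recompute a state by folding the locally known events
// in their canonical order over a starting snapshot.
type LogEvent = { type: string; sortKey: number }

const recompute = <S>(
  snapshot: S,
  onEvent: (state: S, event: LogEvent) => S,
  log: LogEvent[],
): S => [...log].sort((a, b) => a.sortKey - b.sortKey).reduce(onEvent, snapshot)

// When an event from another node arrives late, it is sorted into `log`; the
// Pond then resets the Fish to a snapshot taken before that position and
// replays the remainder of the log, instead of appending to the latest state.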
Concrete solution example
The process from our example might be modeled as shown in the diagram above, consisting of five states and four distinct event types that drive the Fish’s state machine forward.
Not all combinations are foreseen, e.g. it makes little sense for workDone to occur in the initial or created states.
As discussed above, workStarted and canceled may happen concurrently if event synchronization is not fast enough for some reason, in which case we might see workStarted in the canceled state.
Such invalid events still represent truth: it is important to remember that Tom actually did start working! So the above diagram only describes the intended order workflow; it does not tell the whole story. We might for example acknowledge this case by adding another state named “cleanup” where we track that Tom is still working. Tom’s computer might alert him to this state so that he can stop what he’s doing and record that fact with a workCanceled event, bringing everything into its proper final state.
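As a sketch of how such a cleanup state might be expressed, with hypothetical type shapes that are not part of the example below:

// Hypothetical sketch: an extra state remembering who still has to stop
// working on the canceled order, so that observers can trigger the alert.
type CanceledState = { type: 'canceled'; order: string }
type CleanupState = { type: 'cleanup'; order: string; stillWorking: string[] }
type WorkStartedEvent = { type: 'workStarted'; who: string }

// a workStarted event arriving in the canceled state moves us to cleanup;
// a matching workCanceled event would move us back to canceled
const onLateWorkStarted = (state: CanceledState, event: WorkStartedEvent): CleanupState => ({
  type: 'cleanup',
  order: state.order,
  stillWorking: [event.who],
})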
In terms of TypeScript code, a Fish looks like this:
// states and events are usually keyed with a `type` property
type Created = { type: 'created'; order: string }
...
type Event = Created | WorkStarted | WorkDone | OrderCanceled

type Initial = { type: 'initial' }
type Open = { type: 'open'; order: string }
...
type State = (Initial | Open | Working | Done | Canceled) & { toStop: string[] }

const onEvent = (state: State, event: Event): State => {
  switch (state.type) {
    case 'initial': {
      switch (event.type) {
        case 'created': {
          return { ...state, type: 'open', order: event.order }
        }
      }
      // ignore all other events and defer to generic handling below
      break
    }
    // handle other states
  }
  // we only get here for unhandled events, so handle those which require compensation
  if (event.type === 'workStarted') {
    state.toStop.push(event.who)
    return state
  }
  // otherwise return state unchanged
  console.log('completely ignoring event', event.type)
  return state
}
The above code shows how to write a state machine in TypeScript: one outer switch statement for the state, with an inner switch statement per state that declares how to handle the expected events in this state.
Unhandled events may result from programming bugs (so it is a good idea to log them), but they may also arise due to apps running on multiple nodes at the same time.
Whereas we proposed above to expand the state diagram, this code demonstrates another approach: we add a piece of data that is attached to all states, toStop in this example. This is made convenient by the TypeScript spread operator { ...state, /* overwrite some fields */ }, which helps carry along the common properties.
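For illustration, here is what the spread does with the common toStop property, using hypothetical values:

// the spread copies every property of the old state and then overwrites the
// ones listed after it; toStop travels along unchanged
const before = { type: 'open', order: 'o1', toStop: ['Tom'] }
const after = { ...before, type: 'done' }
// after is { type: 'done', order: 'o1', toStop: ['Tom'] }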
Both approaches are valid; sometimes a combination of the two is the best solution.
The important outcome is that those who observe the Fish’s state from the outside can see all necessary compensating actions and are thus in a position to perform them.
The purpose of the onEvent state machine is purely to compute the current state. Recall that the Pond may rerun the event stream through this function from an earlier state as new events are received from the network, so onEvent may see the same event many times! Hence it is a really bad idea to perform any actions (like HTTP requests, DB writes, sending emails, etc.) in this function.
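Because onEvent is free of side effects, computing the current state is nothing more than a fold over the ordered events, which also makes the state machine easy to exercise in isolation. A small sketch, reusing State, Event and onEvent from the example above and assuming a workStarted payload with a who property as used there:

// fold a (hypothetical) local event history through the state machine
const history: Event[] = [
  { type: 'created', order: 'o1' },
  { type: 'workStarted', who: 'Tom' },
]
const start: State = { type: 'initial', toStop: [] }
const current = history.reduce(onEvent, start)
// the Pond does essentially this for you, restarting from an earlier snapshot
// whenever late events have to be sorted in before the current position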
In order to run the Fish, we need to tell the Pond a few more things:
- we need to define the initial state to start with
- the event stream needs to be selected using a combination of tags
- we need to give this Fish a name so that the Pond can recognize whether it is already running — running the same Fish twice would be purely a waste of resources
import { Fish, FishId, Pond, Tag } from '@actyx/pond'

const orderTag = Tag<Event>('order')

const orderFish = (orderId: string): Fish<State, Event> => ({
  fishId: FishId.of('orderFish', orderId, 0),
  where: orderTag.withId(orderId),
  initialState: { type: 'initial', toStop: [] },
  onEvent,
})
How to use a Fish
With the above preparations we can spawn the Fish using a Pond:
const pond = await Pond.default(/* app manifest */)
pond.observe(orderFish('o12346fx'), (state) => { /* render UI */ })
pond.keepRunning(orderFish('o12346fx'), (state, enqueue) => { /* make automatic decisions */ })
The observe operation wakes up the Fish if it is not yet running and attaches the given callback so that it is called whenever a new state has been computed (which might be the same state as the previous one). This is useful for feeding state information into other parts of the app, which might be the UI or some algorithm.
In contrast to that, keepRunning (and its single-shot brother run) invokes its callback with the latest state and a function to emit new events. This can be used to automatically make decisions, like alerting Tom that he should abandon this work order and emitting an event to record this action.
The last part is a common pattern: you want to avoid alerting twice just because the Pond computes the same state again, which may happen at any time when some event from the past is received. keepRunning and run specifically ensure that the callback is only invoked after all locally known events have been processed, so any state change you effected by using enqueue will be visible upon the next invocation.
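As a sketch of this pattern, the following callback alerts about pending compensations and records them as events. It assumes that the Event union has been extended with a workCanceled variant of the shape { type: 'workCanceled'; who: string }, and that onEvent removes the matching entry from toStop when it processes such an event, so each alert fires only once:

// sketch only: workCanceled and the toStop bookkeeping in onEvent are assumptions
pond.keepRunning(orderFish('o12346fx'), (state, enqueue) => {
  for (const who of state.toStop) {
    // alerting or other side effects are fine here, outside of onEvent
    console.log(`${who} should stop working on this canceled order`)
    enqueue(orderTag.withId('o12346fx'), { type: 'workCanceled', who })
  }
})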
Summary
When the order of events matters for processing an event stream, Actyx Pond is the local-first solution for you. The Pond is a container for running your pieces of business logic, encoded as Fishes. Due to the time warp algorithm your Fishes enjoy eventual consistency even in an unreliable peer-to-peer network situation, meaning that once the needed events have been transmitted, the same Fish will compute the same state on all Actyx nodes.
The Pond offers a few more functions as well as access to the underlying event stream API. You should prefer the latter when only emitting events or when the interpretation of your events does not depend on a current state.