Collaboration between autonomous machines
Now that we have covered passively following an external thing’s state changes and actively managing a process within Actyx, we widen our gaze to shaping the collaboration of external things, like robots — we use AGVs and machining centers as currently available examples. Steering such a process requires knowing what another robot is doing and communicating intent between these machines. Consider the following scenarios:
- an AGV approaches a machining center to unload fresh work pieces or load processed ones
- a machining center asks the AGV fleet to deliver fresh work pieces
The first one involves 1:1 communication between two machines while the second one uses broadcast communication to a group.
Solution strategy
We use Actyx event streams as pubsub topics by employing corresponding event tags.
The machining center identified by the number 381 will listen to events tagged with machine:381, the AGV numbered 17 listens to the tag agv:17, and all AGVs in the fleet observe events tagged with agv:fleet.
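For instance, the machining center could watch its own topic with a plain AQL subscription; the following is a minimal sketch, assuming a running Pond instance named pond:
// listen to everything addressed to machining center 381
pond.events().subscribeAql(
  `FROM 'machine:381'`,
  (response) => {
    if (response.type === 'event') {
      // react to the incoming request or notification
      console.log('machine:381 received', response.payload)
    }
  },
  console.error,
)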
Opening a conversation means starting a new workflow, each instance of which will have its own tag, like dockingProcess:6abb4e89-142a-4a37-ba1d-dd53e4b61c95. In order to keep an overview of the started conversations, each initial event is also tagged with started. And we tag all events with their originating machine (e.g. agv:17) so that we can see at a glance what that robot has been doing.
The (un)loading workflow
Designing the event model
For the (un)loading task we need to describe the complete conversation between the AGV and the machining center (the machine in the following). It begins with the AGV announcing its approach and requesting permission to enter the docking area. The machine then either says “no, go away!” or eventually gives permission to enter — this may imply that an access door is now open, etc. In the latter case the AGV will enter the docking area and eventually confirm being in the assigned spot, upon which the machine may close the access door. After the machine has unloaded the fresh work pieces or loaded the processed ones, it will open the access door and inform the AGV that it may now depart. Once the AGV is outside the restricted area, it confirms this by emitting the final event of this exchange.
If something goes wrong, the process can be aborted by either side. As easy as this sounds, it may be some work to handle the fallout of such an occurrence: if for example the AGV aborts while being locked inside the restricted area, then a real factory may want this situation to be escalated to a human operator who will then resolve it, making use of the local control panels affixed to the AGV and the machine.
In summary, the event model for (un)loading work pieces is:
// assuming some WorkPieceId and Position type to describe placement on the AGV,
// and Coordinates for physical locations
type UnloadingRequested = { type: 'unloadingRequested'; workPieces: [Position, WorkPieceId][] }
type LoadingRequested = { type: 'loadingRequested'; free: Position[] }
type MayEnter = { type: 'mayEnter'; coordinates: Coordinates }
type Positioned = { type: 'positioned'; coordinates: Coordinates }
type MayLeave = { type: 'mayLeave'; taken: Position[]; placed: [Position, WorkPieceId][] }
type Outside = { type: 'outside' }
type Aborted = { type: 'aborted' }
type DockingProcessEvent =
  UnloadingRequested | LoadingRequested | MayEnter | Positioned | MayLeave | Outside | Aborted
Implementing the state machine
The implementation proceeds quite like for the dinner example. We use the Actyx Pond’s fish abstraction again, identifying each of the short-lived workflow instances by tagging with
const DockingProcessTag = Tag<DockingProcessEvent>('dockingProcess')
const dockingTags = (id: string) => DockingProcessTag.withId(id)
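The code below refers to a DockingProcessFish that aggregates one workflow instance; it is not spelled out here, so the following is only a sketch, with state names and transitions chosen as assumptions to line up with the commands that follow:
import { Fish, FishId } from '@actyx/pond'

// Sketch only: one possible state type and fish for a single docking workflow instance
type DockingProcessState =
  | { type: 'initial' }
  | { type: 'requested' }
  | { type: 'mayEnter'; coordinates: Coordinates }
  | { type: 'positioned' }
  | { type: 'mayLeave' }
  | { type: 'outside' }
  | { type: 'aborted' }

const DockingProcessFish = (id: string): Fish<DockingProcessState, DockingProcessEvent> => ({
  fishId: FishId.of('dockingProcess', id, 0),
  initialState: { type: 'initial' },
  where: dockingTags(id),
  onEvent: (state, event) => {
    switch (event.type) {
      case 'unloadingRequested':
      case 'loadingRequested':
        return state.type === 'initial' ? { type: 'requested' } : state
      case 'mayEnter':
        return state.type === 'requested'
          ? { type: 'mayEnter', coordinates: event.coordinates }
          : state
      case 'positioned':
        return state.type === 'mayEnter' ? { type: 'positioned' } : state
      case 'mayLeave':
        return state.type === 'positioned' ? { type: 'mayLeave' } : state
      case 'outside':
        return state.type === 'mayLeave' ? { type: 'outside' } : state
      case 'aborted':
        return { type: 'aborted' }
    }
  },
})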
This time we split the available commands according to the role of the participant (the same technique can also be applied to the dinner example).
// could also split into one function per role instead
// (DockingProcessFish is the fish sketched above, aggregating one workflow instance)
const dockingCommands =
  (role: 'agv' | 'machine', id: string, pond: Pond) => (state: DockingProcessState) => ({
    requestUnloading:
      role === 'agv' && state.type === 'initial'
        ? (workPieces: [Position, WorkPieceId][]) =>
            pond
              .run(DockingProcessFish(id), (state2, enqueue) => {
                if (state2.type === 'initial') {
                  enqueue(dockingTags(id).and(Tag('started')), {
                    type: 'unloadingRequested',
                    workPieces,
                  })
                }
              })
              .toPromise()
        : undefined,
    requestLoading:
      role === 'agv' && state.type === 'initial'
        ? (free: Position[]) =>
            pond
              .run(DockingProcessFish(id), (state2, enqueue) => {
                if (state2.type === 'initial') {
                  enqueue(dockingTags(id).and(Tag('started')), { type: 'loadingRequested', free })
                }
              })
              .toPromise()
        : undefined,
    // assuming the above events take the state from `initial` to `requested`
    mayEnter:
      role === 'machine' && state.type === 'requested'
        ? (coordinates: Coordinates) =>
            pond.run(DockingProcessFish(id), (state2, enqueue) => {
              if (state2.type === 'requested') {
                enqueue(dockingTags(id), { type: 'mayEnter', coordinates })
              }
            })
        : undefined,
    positioned:
      role === 'agv' && state.type === 'mayEnter'
        ? (coordinates: Coordinates) =>
            pond.run(DockingProcessFish(id), (state2, enqueue) => {
              if (state2.type === 'mayEnter') {
                enqueue(dockingTags(id), { type: 'positioned', coordinates })
              }
            })
        : undefined,
    // and so on for the other commands
  })
In this case we expect only a single participant to play each of the two roles, and we can make the code more robust by fixing the association of each command to one role in this central definition.
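To illustrate how the machine side could drive its part of the conversation, here is a rough wiring sketch; machineController, dockingSpot, and dockingProcessId are assumed to be available, and a real application would guard against reacting to the same state twice:
// Sketch: the machining center reacts to the observed workflow state
const cancelObservation = pond.observe(DockingProcessFish(dockingProcessId), async (state) => {
  const commands = dockingCommands('machine', dockingProcessId, pond)(state)
  if (commands.mayEnter) {
    await machineController.openAccessDoor() // hypothetical controller call
    commands.mayEnter(dockingSpot) // grant entry and tell the AGV where to go
  }
  // ... similarly react to 'positioned' (close the door, (un)load) and finally emit 'mayLeave'
})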
The above state machine only models the collaboration between the AGV and the machining center; it does not describe the current state of either of the two participants. This is especially important when considering how to handle faults or aborts: the AGV should continue with other missions unless it is trapped inside the restricted area. In order to achieve this, we add a state tracker for each AGV and machine. The former may use events like:
// tagged also with 'positionUpdate'
type PositionUpdate = { type: 'positionUpdate'; position: Coordinates }
// the following three also tagged with 'modeUpdate'
type Idle = { type: 'idle' }
type Transporting = { type: 'transporting'; from: Coordinates; to: Coordinates }
type Docking = { type: 'docking'; dockingProcessId: string }
type AgvEvent = PositionUpdate | Idle | Transporting | Docking
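As a rough sketch of how the tracking state pattern could feed these events, the AGV app might publish them from its controller callbacks; the callback names 'positionChanged' and 'missionStarted' are assumptions, while the tags follow the comments above:
// Sketch: publish tracking events from the AGV controller's callbacks (callback names assumed)
agvController.on('positionChanged', (position: Coordinates) =>
  pond.emit(Tag<AgvEvent>('agv').withId(agvId).and(Tag('positionUpdate')), {
    type: 'positionUpdate',
    position,
  }),
)
agvController.on('missionStarted', (from: Coordinates, to: Coordinates) =>
  pond.emit(Tag<AgvEvent>('agv').withId(agvId).and(Tag('modeUpdate')), {
    type: 'transporting',
    from,
    to,
  }),
)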
The AGV will be transporting when it initiates the docking process. The transition is triggered by the external notification that the target position has been reached. At this point we call an API that sets the real-time controller of the AGV into docking mode and publish the corresponding event; we also initiate the docking process:
import * as uuid from 'uuid'

const AgvTag = Tag<AgvEvent>('agv')
const agvTags = (id: string) => AgvTag.withId(id) // tags 'agv' & 'agv:<id>'
const withAgv = (id: string) => AgvTag.id(id) // only tag 'agv:<id>'

agvController.on(
  'unloadingPositionReached',
  (position: Coordinates, workPieces: [Position, WorkPieceId][]) => {
    const dockingProcessId = uuid.v4()
    agvController.setMode('docking')
    pond.publish([
      agvTags(agvId).apply({ type: 'docking', dockingProcessId }),
      dockingTags(dockingProcessId)
        .and(withAgv(agvId)) // leave out 'agv' tag because this is not an AgvEvent
        .and(Tag('started'))
        .apply({ type: 'unloadingRequested', workPieces }),
    ])
  },
)
In order to implement the docking process, we will need to keep track of two states: the current state of the AGV and the state of the docking process.
While the latter is easily done with Pond.observe(), the former requires a little more ceremony as long as aggregations are not yet supported on the AQL subscribe endpoint:
const getValueAndOffsets = async (query: string) => {
  const results = await pond.events().queryAql(query)
  const values = results.filter((r) => r.type === 'event').map((r) => r.payload)
  const offsets = results.filter((r) => r.type === 'offsets').map((r) => r.offsets)
  return [values[0], offsets[0]]
}

const [position, positionOffsets] = await getValueAndOffsets(
  `PRAGMA features := aggregate
   FROM 'agv:${agvId}' AGGREGATE LAST(CASE _.type = 'positionUpdate' => _.position ENDCASE)`,
)
const [mode, modeOffsets] = await getValueAndOffsets(
  `PRAGMA features := aggregate
   FROM 'agv:${agvId}' & 'modeUpdate' AGGREGATE LAST(_)`,
)

// store initial values
const agvState = {
  position: position as Coordinates,
  mode: mode as Idle | Transporting | Docking,
}

// update with live values
pond.events().subscribeAql(
  `FROM 'agv:${agvId}' & 'positionUpdate'`,
  (response) => response.type === 'event' && (agvState.position = response.payload),
  console.error, // error handling needs to restart the subscriptions in this case
  positionOffsets, // start the subscription at the point where queryAql() ended
)
pond.events().subscribeAql(
  `FROM 'agv:${agvId}' & 'modeUpdate'`,
  // newState() is discussed below
  (response) => response.type === 'event' && (agvState.mode = response.payload) && newState(),
  console.error,
  modeOffsets,
)
While the AGV state is meaningful as long as the AGV is running, the docking process is ephemeral. We therefore only monitor it for the current workflow instance, if one is actually running.
let dockingState: { state: DockingProcessState | null; cancel: () => void } | null = null

// utility to run some asynchronous code and inhibit notifications while doing so
let ignore = false
let ignored = false
const dispatchBlocking = (func: () => Promise<void>) => {
  ignore = true
  ignored = false
  func().finally(() => {
    ignore = false
    ignored && newState() // ensure that newState() is called if state may have changed
  })
}

// this callback is invoked whenever the AGV mode changes or the active docking process advances
const newState = async () => {
  if (ignore) {
    ignored = true
    return
  }
  const { position, mode } = agvState
  switch (mode.type) {
    case 'docking': {
      if (dockingState === null) {
        // start monitoring the docking process
        dockingState = { state: null, cancel: () => {} }
        dockingState.cancel = pond.observe(
          DockingProcessFish(mode.dockingProcessId),
          (state) => (dockingState!.state = state) && newState(),
        )
      } else if (dockingState.state !== null) {
        // then perform the docking process once we have the first state update
        const { state, cancel } = dockingState
        // first check if finished
        if (state.type === 'outside' || state.type === 'aborted') {
          cancel() // we’re finished, so clean up
          dockingState = null
          // TODO: tell AGV to do the next thing
          return
        }
        // otherwise make progress
        const commands = dockingCommands('agv', mode.dockingProcessId, pond)(state)
        if (commands.positioned && state.type === 'mayEnter') {
          // it is our turn to enter the docking area, so do it and ignore concurrent updates
          dispatchBlocking(async () => {
            const coordinates = await agvController.moveIntoPosition(state.coordinates)
            // tell the machine of our progress, which will update state again and call newState()
            commands.positioned(coordinates)
          })
        }
        // ... plus the other states in which the AGV should act
      }
    }
  }
}
Some of the code above may be simplified by using RxJS: with that library’s combineLatest combinator you can more easily join together multiple dynamically updating states and ensure that a callback is invoked whenever any part of them changes.
See rx-pond for how to adapt Actyx Pond streams into RxJS Observables.
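As a rough illustration of that idea, the two state sources could be fed into RxJS subjects and joined; this is a sketch using plain RxJS rather than any particular rx-pond API:
import { BehaviorSubject, combineLatest } from 'rxjs'

// Sketch: feed the AGV mode and the docking workflow state into subjects,
// e.g. mode$.next(response.payload) in the 'modeUpdate' subscription
// and docking$.next(state) in pond.observe() for the active workflow
const mode$ = new BehaviorSubject<Idle | Transporting | Docking>({ type: 'idle' })
const docking$ = new BehaviorSubject<DockingProcessState | null>(null)

// react whenever either part changes (the equivalent of newState() above)
combineLatest([mode$, docking$]).subscribe(([mode, docking]) => {
  // drive the docking process from the combined state
})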
Summary
What we have done here is to
- make the physical participants observable by using the tracking state pattern
- model the short-lived interaction just like a business process
- join together the respective states of physical thing and business process to publish events that drive the latter forward
The work piece request workflow
This case is a variation of the above that uses the same principles; only the business process is different. Therefore, we will focus mostly on that part and assume that the AGVs and the machining center are already appropriately observable.
Designing the event model
The issue we need to tackle is that the machining center knows that it will soon need fresh work pieces, but it does not know which AGV will deliver them. What we need to implement is a workflow for deciding who will perform the delivery — the following engagement boils down to the case discussed above.
We start with the request, since that is rather clear:
type WorkPiecesRequested = { type: 'workPiecesRequested'; articleId: string; quantity: number }
This event refers to a class of work pieces since the machine doesn’t care which precise pieces shall be processed next.
We tag this event with agv:fleet so that all AGVs will see it. For the sake of simplicity we assume that the work piece scheduler has already been written: this app emits events that the AGV fleet uses to know which individual work pieces are next in line for each articleId.
With this knowledge, each AGV can assess its own position relative to warehouse and machining center, its battery level, and its already accepted missions to answer the question whether it should offer to handle the incoming request. If it decides positively, it will emit an event to say so:
type WorkPiecesOffered = { type: 'workPiecesOffered'; estimatedDuration: number }
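To make the decision concrete, the AGV-side reaction could look roughly like this; shouldOffer, estimateDuration, and the way the request id is obtained are assumptions for illustration:
// Sketch: AGV-side handling of a broadcast request (helper names are assumed)
const onWorkPiecesRequested = async (requestId: string, request: WorkPiecesRequested) => {
  // assess position, battery level, and already accepted missions (placeholder logic)
  if (!shouldOffer(agvState, request)) return
  await pond
    .emit(Tag<WorkPiecesOffered>('workPieceRequest').withId(requestId).and(withAgv(agvId)), {
      type: 'workPiecesOffered',
      estimatedDuration: estimateDuration(agvState, request),
    })
    .toPromise()
}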
Since there is no central coordination among Actyx nodes, the controlling apps of multiple AGVs may make such offers. The requesting machine is a natural candidate for serving as a referee: it will wait a few seconds and then pick the best offer by publishing another event.
type WorkPiecesAccepted = { type: 'workPiecesAccepted'; agvId: string }
With this, the named AGV has been assigned the mission of delivering the requested work pieces to the machine.
Just like in the dinner preparation case some offers may not be received by the machine due to network delays. This means that the machine may pick a suboptimal offer, but it also means that the machine will get its delivery even though some AGVs were momentarily not able to communicate.
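On the machine side, the referee role could be sketched like this; machineId, the grace period, and pickBestOffer (a hypothetical helper that queries the workPieceRequest:<uuid> stream and compares the received estimatedDurations) are assumptions:
// Sketch: the requesting machine picks the best offer after a short grace period
const decideAfterGracePeriod = async (requestId: string) => {
  await new Promise((resolve) => setTimeout(resolve, 5000)) // wait a few seconds for offers
  const best = await pickBestOffer(pond, requestId)
  await pond
    .emit(
      Tag<WorkPiecesAccepted>('workPieceRequest')
        .withId(requestId)
        .and(Tag('machine').withId(machineId)),
      { type: 'workPiecesAccepted', agvId: best.agvId },
    )
    .toPromise()
}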
All three events defined above will be tagged with workPieceRequest:<uuid>, the first one also with started and agv:fleet. And as usual, every event is tagged with the identifier of the participant that emits it, like agv:17.
Implementing the state machine
This is very similar to the (un)loading case discussed above.
There is a condition in the AGV state handling code that recognises a new work piece request. In this case, the current state is used to decide whether to offer fulfilling this request; if so, an offer is made and the request workflow is monitored using Pond.observe(). Once the workflow ends with workPiecesAccepted, each offering AGV checks whether its own identifier is named in that event and acts accordingly.
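In code, that final check might look roughly like this; WorkPieceRequestFish and the shape of its state are assumptions for illustration:
// Sketch: the offering AGV reacts once the request workflow reaches a decision
pond.observe(WorkPieceRequestFish(requestId), (state) => {
  if (state.type === 'accepted') {
    if (state.agvId === agvId) {
      // our offer won: schedule the transport mission, then run the (un)loading workflow on arrival
    } else {
      // another AGV was chosen: drop this request and stay available for other missions
    }
  }
})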