Skip to main content

@actyx/pond#

Table of contents#

Enumerations#

Interfaces#

Type aliases#

AddEmission#

Ƭ AddEmission<EWrite>: <E>(tags: Tags<E>, event: E) => void

Queue emission of an event whose type is covered by EWrite.

Type parameters:#

Name
EWrite

Type declaration:#

▸ <E>(tags: Tags<E>, event: E): void

Type parameters:#

NameType
EEWrite

Parameters:#

NameType
tagsTags<E>
eventE

Returns: void


Caching#

Ƭ Caching: NoCaching | InProcessCaching

Caching indicator for pond.observeAll(). @beta


CancelSubscription#

Ƭ CancelSubscription: () => void

Cancel an ongoing aggregation (the provided callback will stop being called).

Type declaration:#

▸ (): void

Returns: void


ConnectivityStatus#

Ƭ ConnectivityStatus: t.TypeOf<typeof ConnectivityStatus>

Current connectivity of the underlying ActyxOS node.


Counters#

Ƭ Counters: Readonly<CountersMut>

Immutable swarm counters information.


CountersMut#

Ƭ CountersMut: object

Mutable swarm counters information.

Type declaration:#

NameTypeDescription
bothnumberBoth the pond and the swarm have it
ownnumberThe pond has it, the swarm doesn't
swarmnumberThe swarm has it, the pond does not

Fish#

Ƭ Fish<S, E>: Readonly<{ deserializeState?: (jsonState: unknown) => S ; fishId: FishId ; initialState: S ; isReset?: IsReset<E> ; onEvent: Reduce<S, E> ; where: Where<E> }>

A Fish<S, E> describes an ongoing aggregration (fold) of events of type E into state of type S. A Fish always sees events in the correct order, even though event delivery on ActyxOS is only eventually consistent: To this effect, arrival of an hitherto unknown event "from the past" will cause a replay of the aggregation from an earlier state, instead of passing that event to the Fish out of order.

Type parameters:#

Name
S
E

FishErrorContext#

Ƭ FishErrorContext: { event: unknown ; metadata: Metadata ; occuredIn: onEvent ; state: unknown } | { event: unknown ; metadata: Metadata ; occuredIn: isReset } | { jsonState: unknown ; occuredIn: deserializeState }

Context for an error thrown by a Fish’s functions. @public


FishErrorReporter#

Ƭ FishErrorReporter: (err: unknown, fishId: FishId, detail: FishErrorContext) => void

Error reporter for when Fish functions throw exceptions. @public

Type declaration:#

▸ (err: unknown, fishId: FishId, detail: FishErrorContext): void

Parameters:#

NameType
errunknown
fishIdFishId
detailFishErrorContext

Returns: void


FishId#

Ƭ FishId: object

Unique identifier for a fish.

Type declaration:#

NameType
entityTypestring
namestring
versionnumber

FishProcessInfo#

Ƭ FishProcessInfo: object

How many Fish are active, and of which entityTypes?

Type declaration:#

NameType
fishobject
numBeingProcessednumber

FullWaitForSwarmConfig#

Ƭ FullWaitForSwarmConfig: Readonly<{ allowSkip: boolean ; enabled: boolean ; minSources: number ; waitForSwarmMs: number ; waitForSyncMs?: number }>

Configure how to wait for swarm. @public


GetNodeConnectivityParams#

Ƭ GetNodeConnectivityParams: Readonly<{ callback: (newState: ConnectivityStatus) => void ; specialSources?: ReadonlyArray<SourceId> }>

Parameter object for the Pond.getNodeConnectivity call. @public


InProcessCaching#

Ƭ InProcessCaching: Readonly<{ key: string ; type: in-process }>

Indicate in-process (nonpersistent) Caching. @beta


IsReset#

Ƭ IsReset<E>: (event: E, metadata: Metadata) => boolean

A function indicating events which completely determine the state. Any event for which isReset returns true will be applied to the initial state, all earlier events discarded.

Type parameters:#

Name
E

Type declaration:#

▸ (event: E, metadata: Metadata): boolean

Parameters:#

NameType
eventE
metadataMetadata

Returns: boolean


Lamport#

Ƭ Lamport: number

Lamport timestamp, cf. https://en.wikipedia.org/wiki/Lamport_timestamp


LogFunction#

Ƭ LogFunction: (first: any, ...rest: any[]) => void

Generic logging function signature.

Type declaration:#

▸ (first: any, ...rest: any[]): void

Parameters:#

NameType
firstany
...restany[]

Returns: void


Loggers#

Ƭ Loggers: object

A collection of loggers of different severity for a fixed topic.

Type declaration:#

NameType
debugLogger
errorLogger
infoLogger
warnLogger

Metadata#

Ƭ Metadata: Readonly<{ eventId: string ; isLocalEvent: boolean ; lamport: Lamport ; tags: ReadonlyArray<string> ; timestampAsDate: () => Date ; timestampMicros: Timestamp }>

Generic Metadata attached to every event. @public


Milliseconds#

Ƭ Milliseconds: number

Some number of milliseconds. @public


NoCaching#

Ƭ NoCaching: object

Indicator for disabled caching of pond.observeAll(). @beta

Type declaration:#

NameType
typenone

NodeInfoEntry#

Ƭ NodeInfoEntry: Readonly<{ own?: number ; swarm?: number }>

All the info we got for a single node

This might grow in the future to include things like timestamps


ObserveAllOpts#

Ƭ ObserveAllOpts: Partial<{ caching: Caching ; expireAfterFirst: Milliseconds ; expireAfterSeed: Milliseconds }>

Optional parameters to pond.observeAll @beta


PendingEmission#

Ƭ PendingEmission: object

Allows you to register actions for when event emission has completed.

Type declaration:#

NameType
subscribe(whenEmitted: () => void) => void
toPromise() => Promise<void>

Pond#

Ƭ Pond: object

Main interface for interaction with the ActyxOS event system. New instances are created via Pond.default() or Pond.of(options). Acquire a Pond for testing (which uses a simulated clean Event Store) via Pond.test().

Type declaration:#

NameType
dispose() => void
emit<E>(tags: Tags<E>, event: E) => PendingEmission
getNodeConnectivity(params: Readonly<{ callback: (newState: ConnectivityStatus) => void ; specialSources?: ReadonlyArray<SourceId> }>) => CancelSubscription
getPondState(callback: (newState: PondState) => void) => CancelSubscription
info() => PondInfo
keepRunning<S, EWrite>(fish: Readonly<{ deserializeState?: (jsonState: unknown) => S ; fishId: FishId ; initialState: S ; isReset?: IsReset<E> ; onEvent: Reduce<S, E> ; where: Where<E> }>, fn: StateEffect<S, EWrite>, autoCancel?: (state: S) => boolean) => CancelSubscription
observe<S, E>(fish: Readonly<{ deserializeState?: (jsonState: unknown) => S ; fishId: FishId ; initialState: S ; isReset?: IsReset<E> ; onEvent: Reduce<S, E> ; where: Where<E> }>, callback: (newState: S) => void, stoppedByError?: (err: unknown) => void) => CancelSubscription
observeAll<ESeed, S>(seedEventsSelector: Where<ESeed>, makeFish: (seedEvent: ESeed) => undefined | Readonly<{ deserializeState?: (jsonState: unknown) => S ; fishId: FishId ; initialState: S ; isReset?: IsReset<E> ; onEvent: Reduce<S, E> ; where: Where<E> }>, opts: Partial<{ caching: Caching ; expireAfterFirst: Milliseconds ; expireAfterSeed: Milliseconds }>, callback: (states: S[]) => void) => CancelSubscription
observeOne<ESeed, S>(seedEventSelector: Where<ESeed>, makeFish: (seedEvent: ESeed) => Readonly<{ deserializeState?: (jsonState: unknown) => S ; fishId: FishId ; initialState: S ; isReset?: IsReset<E> ; onEvent: Reduce<S, E> ; where: Where<E> }>, callback: (newState: S) => void, stoppedByError?: (err: unknown) => void) => CancelSubscription
run<S, EWrite>(fish: Readonly<{ deserializeState?: (jsonState: unknown) => S ; fishId: FishId ; initialState: S ; isReset?: IsReset<E> ; onEvent: Reduce<S, E> ; where: Where<E> }>, fn: StateEffect<S, EWrite>) => PendingEmission
waitForSwarmSync(params: WaitForSwarmSyncParams) => void

PondInfo#

Ƭ PondInfo: object

Information concerning the running Pond. @public

Type declaration:#

NameType
sourceIdSourceId

PondOptions#

Ƭ PondOptions: object

Advanced configuration options for the Pond. @public

Type declaration:#

NameType
currentPsnHistoryDelay?number
fishErrorReporter?FishErrorReporter
hbHistDelay?number
stateEffectDebounce?number
updateConnectivityEvery?Milliseconds

PondState#

Ƭ PondState: object

What sort of activity is currently going on in the Pond?

Type declaration:#

NameType
commandsFishProcessInfo
eventsFromOtherSourcesFishProcessInfo
hydrationFishProcessInfo

Progress#

Ƭ Progress: Readonly<{ current: number ; max: number ; min: number }>

Sync progress in terms of event numbers. @public


Reduce#

Ƭ Reduce<S, E>: (state: S, event: E, metadata: Metadata) => S

Combine the existing ("old") state and next event into a new state. The returned value may be something completely new, or a mutated version of the input state.

Type parameters:#

Name
S
E

Type declaration:#

▸ (state: S, event: E, metadata: Metadata): S

Parameters:#

NameType
stateS
eventE
metadataMetadata

Returns: S


SourceId#

Ƭ SourceId: string

An ActyxOS source id.


SplashState#

Ƭ SplashState: SplashStateDiscovery | SplashStateSync

Current state of swarm synchronization procedure.


SplashStateDiscovery#

Ƭ SplashStateDiscovery: Readonly<{ current: SwarmSummary ; mode: discovery ; skip?: () => void }>

Discovering swarm state.


SplashStateSync#

Ƭ SplashStateSync: Readonly<{ current: SwarmSummary ; mode: sync ; progress: SyncProgress ; reference: SwarmSummary ; skip?: () => void }>

Synchronizing up to the discovered swarm state.


StateEffect#

Ƭ StateEffect<S, EWrite>: (state: S, enqueue: AddEmission<EWrite>) => void | Promise<void>

Enqueue event emissions based on currently known local state.

Type parameters:#

Name
S
EWrite

Type declaration:#

▸ (state: S, enqueue: AddEmission<EWrite>): void | Promise<void>

Parameters:#

NameType
stateS
enqueueAddEmission<EWrite>

Returns: void | Promise<void>


StoreConfig#

Ƭ StoreConfig: Readonly<{ metaMs: number ; monitoringMeta?: object ; runStatsPeriodMs: number }>

Actyx store communication configuration. @public


StoreConnectionClosedHook#

Ƭ StoreConnectionClosedHook: () => void

Hook to run on store connection being closed. @public

Type declaration:#

▸ (): void

Returns: void


SwarmInfo#

Ƭ SwarmInfo: Readonly<{ nodes: immutable.Map<string, NodeInfoEntry> }>

All the info we got for our device in relation to the swarm


SwarmSummary#

Ƭ SwarmSummary: Readonly<{ events: Counters ; info: SwarmInfo ; sources: Counters }>

Summary of swarm info


SyncProgress#

Ƭ SyncProgress: Readonly<{ events: Progress ; sources: Progress }>

Sync progress per source, and overall. @public


TestEvent#

Ƭ TestEvent: object

A raw Actyx event to be emitted by the TestEventStore, as if it really arrived from the outside.

Type declaration:#

NameType
lamportLamport
payloadunknown
psnnumber
sourceIdstring
tagsReadonlyArray<string>
timestampTimestamp

TestPond#

Ƭ TestPond: Pond & { directlyPushEvents: (events: TestEvent[]) => void }

A Pond with extensions for testing. @public


Timestamp#

Ƭ Timestamp: number

Timestamp (UNIX epoch), MICROseconds resolution. @public


WaitForSwarmConfig#

Ƭ WaitForSwarmConfig: Partial<FullWaitForSwarmConfig>

Partially configure waiting for the swarm. @public


WaitForSwarmSyncParams#

Ƭ WaitForSwarmSyncParams: WaitForSwarmConfig & Readonly<{ onProgress?: (newState: SplashState) => void ; onSyncComplete: () => void }>

Parameter object for the Pond.waitForSwarmSync call. @public


WsStoreConfig#

Ƭ WsStoreConfig: Readonly<{ onStoreConnectionClosed?: StoreConnectionClosedHook ; protocol?: string ; reconnectTimeout?: number ; url: string }>

Configuration for the WebSocket store connection. @public

Variables#

Caching#

Caching: object

Caching related functions @beta

Type declaration:#

NameType
inProcess(key: string) => Caching
isEnabled(c: undefined | Caching) => c is Readonly<object>
noneobject
none.typenone

ConnectivityStatus#

Const ConnectivityStatus: UnionC<[ReadonlyC<TypeC<{ inCurrentStatusForMs: NumberC ; status: LiteralC<FullyConnected> }>>, ReadonlyC<TypeC<{ eventsToRead: NumberC ; eventsToSend: NumberC ; inCurrentStatusForMs: NumberC ; specialsDisconnected: ReadonlyArrayC<StringC> ; status: LiteralC<PartiallyConnected> ; swarmConnectivityLevel: NumberC }>>, ReadonlyC<TypeC<{ eventsToRead: NumberC ; eventsToSend: NumberC ; inCurrentStatusForMs: NumberC ; status: LiteralC<NotConnected> }>>]>

The IO-TS type parser for ConnectivityStatus.


Fish#

Fish: object

Fish generic generator methods.

Type declaration:#

NameType
eventsAscending<E>(where: Where<E>, capacity: number) => Readonly<{ deserializeState?: (jsonState: unknown) => E[] ; fishId: FishId ; initialState: E[] ; isReset?: IsReset<E> ; onEvent: Reduce<E[], E> ; where: Where<E> }>
eventsDescending<E>(where: Where<E>, capacity: number) => Readonly<{ deserializeState?: (jsonState: unknown) => E[] ; fishId: FishId ; initialState: E[] ; isReset?: IsReset<E> ; onEvent: Reduce<E[], E> ; where: Where<E> }>
latestEvent<E>(where: Where<E>) => Readonly<{ deserializeState?: (jsonState: unknown) => undefined | E ; fishId: FishId ; initialState: undefined | E ; isReset?: IsReset<E> ; onEvent: Reduce<undefined | E, E> ; where: Where<E> }>

FishId#

FishId: object

FishId associated functions.

Type declaration:#

NameType
canonical(v: FishId) => string
of(entityType: string, name: string, version: number) => { entityType: string ; name: string ; version: number }

Lamport#

Lamport: object

Type declaration:#

NameType
FromNumberType<number, number, unknown>
of(value: number) => number
zeronumber

Loggers#

Loggers: object

Loggers associated methods. @public

Type declaration:#

NameType
of(topic: string) => Loggers

Milliseconds#

Milliseconds: object

Helper functions for making sense of and converting Milliseconds. @public

Type declaration:#

NameType
FromNumberType<number, number, unknown>
fromAny(value: number) => number
fromDate(date: Date) => number
fromMinutes(value: number) => number
fromSeconds(value: number) => number
now(now?: number) => number
of(time: number) => number
toSeconds(value: number) => number
toTimestamp(value: number) => number
zeronumber

Pond#

Pond: object

Static methods for constructing Pond instances. @public

Type declaration:#

NameType
default() => Promise<Pond>
mock(opts?: PondOptions) => Pond
of(connectionOpts: Partial<Readonly<{ onStoreConnectionClosed?: StoreConnectionClosedHook ; protocol?: string ; reconnectTimeout?: number ; url: string }>>, opts: PondOptions) => Promise<Pond>
test(opts?: PondOptions) => TestPond

PondState#

PondState: object

PondState associated functions.

Type declaration:#

NameType
isBusy(state: PondState) => boolean
isHydrating(state: PondState) => boolean
isProcessingCommands(state: PondState) => boolean
isProcessingEventsFromOtherSources(state: PondState) => boolean

SourceId#

SourceId: object

SourceId associated functions.

Type declaration:#

NameType
FromStringType<string, string, unknown>
of(text: string) => string
random(digits?: number) => string

SwarmSummary#

SwarmSummary: object

SwarmSummary associated functions. @public

Type declaration:#

NameType
emptyReadonly<{ events: Readonly<CountersMut> ; info: Readonly<{ nodes: Map<string, Readonly<{ own?: number ; swarm?: number }>> }> ; sources: Readonly<CountersMut> }>
fromSwarmInfo(info: Readonly<{ nodes: Map<string, Readonly<{ own?: number ; swarm?: number }>> }>) => Readonly<{ events: Readonly<CountersMut> ; info: Readonly<{ nodes: Map<string, Readonly<{ own?: number ; swarm?: number }>> }> ; sources: Readonly<CountersMut> }>

Timestamp#

Timestamp: object

Helper functions for making sense of and converting Timestamps. @public

Type declaration:#

NameType
FromNumberType<number, number, unknown>
format(timestamp: number) => string
fromDate(date: Date) => number
fromDays(value: number) => number
fromMilliseconds(value: number) => number
fromSeconds(value: number) => number
max(values: number[]) => number
maxSafenumber
min(...values: number[]) => number
now(now?: number) => number
of(time: number) => number
toDate(value: number) => Date
toMilliseconds(value: number) => number
toSeconds(value: number) => number
zeronumber

WaitForSwarmConfig#

WaitForSwarmConfig: object

WaitForSwarmConfig associated functions. @public

Type declaration:#

NameType
defaultsReadonly<{ allowSkip: boolean ; enabled: boolean ; minSources: number ; waitForSwarmMs: number ; waitForSyncMs?: number }>

allEvents#

Const allEvents: Tags<unknown>

A Where expression that selects all events.


noEvents#

Const noEvents: Where<never>

A Where expression that selects no events.

Functions#

Tag#

ConstTag<E>(rawTag: string): Tag<E>

Create a new tag from the given string. (Tag factory function. Call WITHOUT new, e.g. const myTag = Tag<MyType>('my-tag'))

Type parameters:#

Name
E

Parameters:#

NameType
rawTagstring

Returns: Tag<E>


Tags#

ConstTags<E>(...requiredTags: string[]): Tags<E>

Declare a set of tags. This is a generator function to be called WITHOUT new, e.g. const required = Tags('a', 'b', 'c')

Type parameters:#

Name
E

Parameters:#

NameType
...requiredTagsstring[]

Returns: Tags<E>


enableAllLoggersExcept#

ConstenableAllLoggersExcept(excludeModules: string[]): void

Utility function to enable all logging with exception for passed in logger namespaces. For excluded logger namespaces errors will still be logged!

Parameters:#

NameType
excludeModulesstring[]

Returns: void


isBoolean#

ConstisBoolean(x: any): x is boolean

Refinement that checks whether typeof x === 'boolean'

Parameters:#

NameType
xany

Returns: x is boolean


isNumber#

ConstisNumber(x: any): x is number

Refinement that checks whether typeof x === 'number'

Parameters:#

NameType
xany

Returns: x is number


isString#

ConstisString(x: any): x is string

Refinement that checks whether typeof x === 'string'

Parameters:#

NameType
xany

Returns: x is string


unreachable#

Constunreachable(x?: undefined): never

Assert that from the type information, a piece of code can never be reached. If it’s still reached at runtime, this throws an Error.

Parameters:#

NameType
x?undefined

Returns: never


unreachableOrElse#

unreachableOrElse<T>(_: never, t: T): T

Assert that from the type information, a certain statement can never be reached, while installing a default value to return in case the type information was wrong and the statement was in fact reached.

Type parameters:#

Name
T

Parameters:#

NameType
_never
tT

Returns: T