Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Synchronizers #5071

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
18 changes: 18 additions & 0 deletions .changeset/silly-needles-applaud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
'xstate': minor
---

Added support for synchronizers in XState, allowing state persistence and synchronization across different storage mechanisms.

- Introduced `Synchronizer` interface for implementing custom synchronization logic
- Added `sync` option to `createActor` for attaching synchronizers to actors

```ts
import { createActor } from 'xstate';
import { someMachine } from './someMachine';
import { createLocalStorageSync } from './localStorageSynchronizer';

const actor = createActor(someMachine, {
sync: createLocalStorageSync('someKey')
});
```
14 changes: 8 additions & 6 deletions packages/core/src/StateMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,15 @@ export class StateMachine<
TConfig
>
): void {
Object.values(snapshot.children as Record<string, AnyActorRef>).forEach(
(child: any) => {
if (child.getSnapshot().status === 'active') {
child.start();
if (snapshot.children) {
Object.values(snapshot.children as Record<string, AnyActorRef>).forEach(
(child: any) => {
if (child.getSnapshot().status === 'active') {
child.start();
}
}
}
);
);
}
}

public getStateNodeById(stateId: string): StateNode<TContext, TEvent> {
Expand Down
26 changes: 24 additions & 2 deletions packages/core/src/createActor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import type {
InputFrom,
IsNotNever,
Snapshot,
SnapshotFrom
SnapshotFrom,
Synchronizer
} from './types.ts';
import {
ActorOptions,
Expand Down Expand Up @@ -119,6 +120,9 @@ export class Actor<TLogic extends AnyActorLogic>

public src: string | AnyActorLogic;

private _synchronizer?: Synchronizer<any>;
private _synchronizerSubscription?: Subscription;

/**
* Creates a new actor instance for the given logic with the provided options,
* if any.
Expand Down Expand Up @@ -207,7 +211,23 @@ export class Actor<TLogic extends AnyActorLogic>
this.system._set(systemId, this);
}

this._initState(options?.snapshot ?? options?.state);
this._synchronizer = options?.sync;

const initialSnapshot =
this._synchronizer?.getSnapshot() ?? options?.snapshot ?? options?.state;

this._initState(initialSnapshot);

if (this._synchronizer) {
this._synchronizerSubscription = this._synchronizer.subscribe(
(rawSnapshot) => {
const restoredSnapshot =
this.logic.restoreSnapshot?.(rawSnapshot, this._actorScope) ??
rawSnapshot;
this.update(restoredSnapshot, { type: '@xstate.sync' });
}
);
}

if (systemId && (this._snapshot as any).status !== 'active') {
this.system._unregister(this);
Expand Down Expand Up @@ -263,6 +283,7 @@ export class Actor<TLogic extends AnyActorLogic>

switch ((this._snapshot as any).status) {
case 'active':
this._synchronizer?.setSnapshot(snapshot);
for (const observer of this.observers) {
try {
observer.next?.(snapshot);
Expand Down Expand Up @@ -567,6 +588,7 @@ export class Actor<TLogic extends AnyActorLogic>
return this;
}
this.mailbox.clear();
this._synchronizerSubscription?.unsubscribe();
if (this._processingStatus === ProcessingStatus.NotStarted) {
this._processingStatus = ProcessingStatus.Stopped;
return this;
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,19 @@ export interface ActorOptions<TLogic extends AnyActorLogic> {
inspect?:
| Observer<InspectionEvent>
| ((inspectionEvent: InspectionEvent) => void);

sync?: Synchronizer<SnapshotFrom<TLogic>>;
}

export interface Synchronizer<T> extends Subscribable<T> {
/**
* Gets the snapshot or undefined
*
* An undefined snapshot means the synchronizer does not intend to override
* the initial or provided snapshot of the actor
*/
getSnapshot(): Snapshot<T> | undefined;
setSnapshot(snapshot: T): void;
}

export type AnyActor = Actor<any>;
Expand Down
176 changes: 176 additions & 0 deletions packages/core/test/sync.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import {
createActor,
createMachine,
Observer,
Synchronizer,
toObserver,
waitFor
} from '../src';

describe('synchronizers', () => {
it('work with a synchronous synchronizer', () => {
const snapshotRef = {
current: JSON.stringify({ value: 'b', children: {}, status: 'active' })
};
const pseudoStorage = {
getItem: (key: string) => {
return JSON.parse(snapshotRef.current);
},
setItem: (key: string, value: string) => {
snapshotRef.current = value;
}
};
const createStorageSync = (key: string): Synchronizer<any> => {
const observers = new Set();
return {
getSnapshot: () => pseudoStorage.getItem(key),
setSnapshot: (snapshot) => {
pseudoStorage.setItem(key, JSON.stringify(snapshot));
},
subscribe: (o) => {
const observer = toObserver(o);

const state = pseudoStorage.getItem(key);

observer.next?.(state);

observers.add(observer);

return {
unsubscribe: () => {
observers.delete(observer);
}
};
}
};
};

const machine = createMachine({
initial: 'a',
states: {
a: {},
b: {
on: {
next: 'c'
}
},
c: {}
}
});

const actor = createActor(machine, {
sync: createStorageSync('test')
}).start();

expect(actor.getSnapshot().value).toBe('b');

actor.send({ type: 'next' });

expect(actor.getSnapshot().value).toBe('c');

expect(pseudoStorage.getItem('test').value).toBe('c');
});

it('work with an asynchronous synchronizer', async () => {
let snapshotRef = {
current: undefined as any
};
let onChangeRef = {
current: (() => {}) as (value: any) => void
};
const pseudoStorage = {
getItem: async (key: string) => {
if (!snapshotRef.current) {
return undefined;
}
return JSON.parse(snapshotRef.current);
},
setItem: (key: string, value: string, source?: 'sync') => {
snapshotRef.current = value;

if (source !== 'sync') {
onChangeRef.current(JSON.parse(value));
}
},
subscribe: (fn: (value: any) => void) => {
onChangeRef.current = fn;
}
};

const createStorageSync = (key: string): Synchronizer<any> => {
const observers = new Set<Observer<any>>();

pseudoStorage.subscribe((value) => {
observers.forEach((observer) => {
observer.next?.(value);
});
});

const getSnapshot = () => {
if (!snapshotRef.current) {
return undefined;
}
return JSON.parse(snapshotRef.current);
};

const storageSync = {
getSnapshot,
setSnapshot: (snapshot) => {
const s = JSON.stringify(snapshot);
pseudoStorage.setItem(key, s, 'sync');
},
subscribe: (o) => {
const observer = toObserver(o);

const state = getSnapshot();

if (state) {
observer.next?.(state);
}

observers.add(observer);

return {
unsubscribe: () => {
observers.delete(observer);
}
};
}
} satisfies Synchronizer<any>;

setTimeout(() => {
pseudoStorage.setItem(
'key',
JSON.stringify({ value: 'b', children: {}, status: 'active' })
);
}, 100);

return storageSync;
};

const machine = createMachine({
initial: 'a',
states: {
a: {},
b: {
on: {
next: 'c'
}
},
c: {}
}
});

const actor = createActor(machine, {
sync: createStorageSync('test')
}).start();

expect(actor.getSnapshot().value).toBe('a');

await waitFor(actor, () => actor.getSnapshot().value === 'b');

actor.send({ type: 'next' });

expect(actor.getSnapshot().value).toBe('c');
});
});