Skip to content

Commit

Permalink
feat: watch function
Browse files Browse the repository at this point in the history
  • Loading branch information
divdavem committed Feb 4, 2025
1 parent 37ec893 commit 3cd8a05
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 15 deletions.
37 changes: 36 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { RawStoreWithOnUse } from './internal/storeWithOnUse';
import { RawStoreWritable } from './internal/storeWritable';
import { noop } from './internal/subscribeConsumer';
import { untrack } from './internal/untrack';
import { WatcherConsumer } from './internal/watch';
import type {
AsyncDeriveFn,
AsyncDeriveOptions,
Expand All @@ -41,15 +42,17 @@ import type {
UnsubscribeObject,
Unsubscriber,
Updater,
Watcher,
Writable,
WritableSignal,
} from './types';

export { batch } from './internal/batch';
import { batch } from './internal/batch';
export { equal } from './internal/equal';
export { symbolObservable } from './internal/exposeRawStores';
export { untrack } from './internal/untrack';
export type * from './types';
export { batch };

/**
* Returns a wrapper (for the given store) which only exposes the {@link ReadableSignal} interface.
Expand Down Expand Up @@ -509,3 +512,35 @@ export function computed<T>(
): ReadableSignal<T> {
return exposeRawStore(applyStoreOptions(new RawStoreComputed(fn), options));
}

/**
* Creates a watcher on a store and returns it.
*
* @remarks
*
* A watcher calls synchronously its notify function when any store in the set of transitive dependencies of the watched store is changing.
*
* Note that the notify function must not read or write any store. It should not do any heavy task, it usually only schedules some work to be done later.
* It is called even inside {@link batch}.
*
* A watcher is initially created in the dirty state.
*
* When a watcher is in the dirty state, the notify function is not called until the {@link Watcher.update|update} method is called.
*
* The {@link Watcher.update|update} method clears the dirty state and updates the watched store, allowing the notify function to be called the next
* time any store in the set of transitive dependencies of the watched store changes.
*
* When a watcher is no longer needed, it should be destroyed by calling its {@link Watcher.destroy|destroy} method.
*
* @param store - store to watch
* @param notify - function that will be called synchronously when any store in the set of transitive dependencies of the watched store is changing
* @returns watcher object
*/
export function watch(store: StoreInput<any>, notify: () => void): Watcher {
const watcherConsumer = new WatcherConsumer(getRawStore(store), notify);
return {
isDirty: watcherConsumer.isDirty.bind(watcherConsumer),
update: watcherConsumer.update.bind(watcherConsumer),
destroy: watcherConsumer.destroy.bind(watcherConsumer),
};
}

Check warning on line 546 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L540-L546

Added lines #L540 - L546 were not covered by tests
6 changes: 2 additions & 4 deletions src/internal/batch.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import type { SubscribeConsumer } from './subscribeConsumer';

export const subscribersQueue: SubscribeConsumer<any, any>[] = [];
export const subscribersQueue: { process(): void }[] = [];
let willProcessQueue = false;

/**
Expand Down Expand Up @@ -56,7 +54,7 @@ export const batch = <T>(fn: () => T): T => {
while (subscribersQueue.length > 0) {
const consumer = subscribersQueue.shift()!;
try {
consumer.notify();
consumer.process();
} catch (e) {
// an error in one consumer should not impact others
if (success) {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ export interface RawStore<T, Link extends BaseLink<T> = BaseLink<T>>
unregisterConsumer(link: Link): void;
updateValue(): void;
isLinkUpToDate(link: Link): boolean;
updateLink(link: Link): T;
updateLink(link: Link): void;
readValue(): T;
}

export const updateLinkProducerValue = <T>(link: BaseLink<T>): void => {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/storeComputed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ export class RawStoreComputed<T>
if (producer.flags & RawStoreFlags.HAS_VISIBLE_ONUSE) {
this.flags |= RawStoreFlags.HAS_VISIBLE_ONUSE;
}
return producer.updateLink(link);
producer.updateLink(link);
return producer.readValue();
}

override startUse(): void {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/storeConst.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export class RawStoreConst<T> implements RawStore<T, BaseLink<T>> {
isLinkUpToDate(_link: BaseLink<T>): boolean {
return true;
}
updateLink(_link: BaseLink<T>): T {
updateLink(_link: BaseLink<T>): void {}
readValue(): T {
return this.value;
}
get(): T {
Expand Down
6 changes: 5 additions & 1 deletion src/internal/storeDerived.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ abstract class RawStoreDerived<T, S extends StoresInput>
override recompute(): void {
try {
this.callCleanUpFn();
const values = this.producerLinks!.map((link) => link.producer.updateLink(link));
const values = this.producerLinks!.map((link) => {
const producer = link.producer;
producer.updateLink(link);
return producer.readValue();
});
this.cleanUpFn = normalizeUnsubscribe(this.derive(this.arrayMode ? values : values[0]));
} catch (error) {
this.error = error;
Expand Down
3 changes: 1 addition & 2 deletions src/internal/storeWritable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ export class RawStoreWritable<T> implements RawStore<T, ProducerConsumerLink<T>>
return res;
}

updateLink(link: ProducerConsumerLink<T>): T {
updateLink(link: ProducerConsumerLink<T>): void {
link.value = this.value;
link.version = this.version;
return this.readValue();
}

registerConsumer(link: ProducerConsumerLink<T>): ProducerConsumerLink<T> {
Expand Down
8 changes: 4 additions & 4 deletions src/internal/subscribeConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class SubscribeConsumer<T, Link extends BaseLink<T>> implements Consumer
constructor(producer: RawStore<T, Link>, subscriber: Subscriber<T>) {
this.subscriber = toSubscriberObject(subscriber);
this.link = producer.registerConsumer(producer.newLink(this));
this.notify(true);
this.process(true);
}

unsubscribe(): void {
Expand All @@ -46,7 +46,7 @@ export class SubscribeConsumer<T, Link extends BaseLink<T>> implements Consumer
}
}

notify(first = false): void {
process(first = false): void {
this.dirtyCount--;
if (this.dirtyCount === 0 && this.subscriber !== noopSubscriber) {
const link = this.link;
Expand All @@ -55,9 +55,9 @@ export class SubscribeConsumer<T, Link extends BaseLink<T>> implements Consumer
if (producer.isLinkUpToDate(link) && !first) {
this.subscriber.resume();
} else {
producer.updateLink(link);
// note that the following line can throw
const value = producer.updateLink(link);
this.subscriber.next(value);
this.subscriber.next(producer.readValue());
}
}
}
Expand Down
51 changes: 51 additions & 0 deletions src/internal/watch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import type { Watcher } from '../types';
import { updateLinkProducerValue, type BaseLink, type Consumer, type RawStore } from './store';
import { noop } from './subscribeConsumer';

export class WatcherConsumer<T, Link extends BaseLink<T>> implements Consumer, Watcher {
dirty = false;
link: Link | undefined;
constructor(
producer: RawStore<T, Link>,
private notifyFn: () => void
) {
this.link = producer.registerConsumer(producer.newLink(this));
}

Check warning on line 13 in src/internal/watch.ts

View check run for this annotation

Codecov / codecov/patch

src/internal/watch.ts#L9-L13

Added lines #L9 - L13 were not covered by tests

isDirty(): boolean {
return this.dirty;
}

Check warning on line 17 in src/internal/watch.ts

View check run for this annotation

Codecov / codecov/patch

src/internal/watch.ts#L16-L17

Added lines #L16 - L17 were not covered by tests

markDirty(): void {
if (!this.dirty) {
this.dirty = true;
const notifyFn = this.notifyFn;
notifyFn();
}
}

Check warning on line 25 in src/internal/watch.ts

View check run for this annotation

Codecov / codecov/patch

src/internal/watch.ts#L20-L25

Added lines #L20 - L25 were not covered by tests

update(): boolean {
if (this.dirty) {
this.dirty = false;
const link = this.link!;
const producer = link.producer;
updateLinkProducerValue(link);
if (producer.isLinkUpToDate(link)) {
return false;
}
producer.updateLink(link);
return true;
}
return false;
}

Check warning on line 40 in src/internal/watch.ts

View check run for this annotation

Codecov / codecov/patch

src/internal/watch.ts#L28-L40

Added lines #L28 - L40 were not covered by tests

destroy(): void {
const link = this.link;
if (link) {
this.link = undefined;
this.notifyFn = noop;
this.dirty = false;
link.producer.unregisterConsumer(link);
}
}

Check warning on line 50 in src/internal/watch.ts

View check run for this annotation

Codecov / codecov/patch

src/internal/watch.ts#L43-L50

Added lines #L43 - L50 were not covered by tests
}
24 changes: 24 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,27 @@ export type AsyncDeriveFn<T, S> = (
export interface AsyncDeriveOptions<T, S> extends Omit<StoreOptions<T>, 'onUse'> {
derive: AsyncDeriveFn<T, S>;
}

/**
* Watcher interface.
*/
export interface Watcher {
/**
* Whether the watcher is dirty (i.e. the {@link Watcher.update|update} method needs to be called before notify can be called).
*/
isDirty(): boolean;

/**
* Updates the watched store and returns true if its value changed (since the last call of update) or false if it stayed the same.
*/
update(): boolean;

/**
* Destroys the watcher object.
*
* @remarks
*
* After a watcher is destroyed, its notify function will no longer be called, and it should no longer be used.
*/
destroy(): void;
}

0 comments on commit 3cd8a05

Please sign in to comment.