Skip to content

Commit

Permalink
feat: replay option for RxReq
Browse files Browse the repository at this point in the history
  • Loading branch information
penpenpng committed Jan 14, 2025
1 parent f324ce9 commit 74679ed
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 11 deletions.
73 changes: 73 additions & 0 deletions packages/core/src/__test__/rx-req-replay.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { afterEach, beforeEach, expect, test } from "vitest";
import { createMockRelay, type MockRelay } from "vitest-nostr";

import {
createRxBackwardReq,
createRxNostr,
noopVerifier,
RxNostr,
} from "../index.js";
import { disposeMockRelay, stateWillBe } from "./helper.js";

const DEFAULT_RELAY = "ws://localhost:1234";
let rxNostr: RxNostr;
let defaultRelay: MockRelay;

beforeEach(async () => {
defaultRelay = createMockRelay(DEFAULT_RELAY);

rxNostr = createRxNostr({
verifier: noopVerifier,
skipFetchNip11: true,
skipVerify: true,
});
await rxNostr.setDefaultRelays([DEFAULT_RELAY]);
});

afterEach(() => {
rxNostr.dispose();
disposeMockRelay(defaultRelay);
});

test("`replay` option works well.", async () => {
const rxReq = createRxBackwardReq({ rxReqId: "sub", replay: true });

rxReq.emit({ limit: 1 });
rxReq.emit({ limit: 2 });
rxReq.emit({ limit: 3 });

rxNostr.use(rxReq).subscribe();

await defaultRelay.connected;
await expect(defaultRelay).toReceiveREQ(["sub:0", { limit: 1 }]);
await expect(defaultRelay).toReceiveREQ(["sub:1", { limit: 2 }]);
await expect(defaultRelay).toReceiveREQ(["sub:2", { limit: 3 }]);
});

test("`replay` option works well even if over() is called.", async () => {
const rxReq = createRxBackwardReq({ rxReqId: "sub", replay: true });

rxReq.emit({ limit: 1 });
rxReq.emit({ limit: 2 });
rxReq.emit({ limit: 3 });
rxReq.over();

rxNostr.use(rxReq).subscribe();

await defaultRelay.connected;
await expect(defaultRelay).toReceiveREQ(["sub:0", { limit: 1 }]);
await expect(defaultRelay).toReceiveREQ(["sub:1", { limit: 2 }]);
await expect(defaultRelay).toReceiveREQ(["sub:2", { limit: 3 }]);

await expect(stateWillBe(rxNostr, DEFAULT_RELAY, "connected")).resolves.toBe(
true,
);

defaultRelay.emitEOSE("sub:0");
defaultRelay.emitEOSE("sub:1");
defaultRelay.emitEOSE("sub:2");

await expect(stateWillBe(rxNostr, DEFAULT_RELAY, "dormant")).resolves.toBe(
true,
);
});
62 changes: 51 additions & 11 deletions packages/core/src/rx-nostr/rx-req.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import * as Nostr from "nostr-typedef";
import { Observable, of, type OperatorFunction, Subject } from "rxjs";
import {
Observable,
of,
type OperatorFunction,
ReplaySubject,
Subject,
} from "rxjs";

import { LazyFilter, ReqPacket } from "../packet.js";

Expand Down Expand Up @@ -150,6 +156,12 @@ const createRxReq = <S extends RxReqStrategy>(params: {
};
};

export interface RxBackwardReqOptions {
/** @deprecated */
rxReqId?: string;
replay?: boolean;
}

/**
* Create a RxReq instance based on the backward strategy.
* It is useful if you want to retrieve past events that have already been published.
Expand All @@ -162,15 +174,32 @@ const createRxReq = <S extends RxReqStrategy>(params: {
* For more information, see [document](https://penpenpng.github.io/rx-nostr/v1/req-strategy.html#backward-strategy).
*/
export function createRxBackwardReq(
rxReqId?: string,
// TODO (v4): remove string format options
options?: string /* for backward compatibility. */ | RxBackwardReqOptions,
): RxReq<"backward"> &
RxReqEmittable<{ relays: string[] }> &
RxReqOverable &
RxReqPipeable {
return createRxReq({
strategy: "backward",
rxReqId,
});
if (typeof options === "string") {
const rxReqId = options;
return createRxReq({
strategy: "backward",
rxReqId,
});
} else {
const { rxReqId, replay } = options ?? {};
return createRxReq({
strategy: "backward",
rxReqId,
subject: replay ? new ReplaySubject() : undefined,
});
}
}

export interface RxForwardReqOptions {
/** @deprecated */
rxReqId?: string;
replay?: boolean;
}

/**
Expand All @@ -186,12 +215,23 @@ export function createRxBackwardReq(
* For more information, see [document](https://penpenpng.github.io/rx-nostr/v1/req-strategy.html#forward-strategy).
*/
export function createRxForwardReq(
rxReqId?: string,
// TODO (v4): remove string format options
options?: string /* for backward compatibility. */ | RxForwardReqOptions,
): RxReq<"forward"> & RxReqEmittable & RxReqPipeable {
return createRxReq({
strategy: "forward",
rxReqId,
});
if (typeof options === "string") {
const rxReqId = options;
return createRxReq({
strategy: "forward",
rxReqId,
});
} else {
const { rxReqId, replay } = options ?? {};
return createRxReq({
strategy: "forward",
rxReqId,
subject: replay ? new ReplaySubject() : undefined,
});
}
}

/**
Expand Down

0 comments on commit 74679ed

Please sign in to comment.