From 954ef1d27b17f1eb88228d782efd332b980d8759 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Sat, 23 Nov 2024 12:24:56 -0600 Subject: [PATCH] fixed types Signed-off-by: Alberto Ricart --- jetstream/src/jsapi_types.ts | 1 - jetstream/src/jsm_direct.ts | 62 +++++++++++++++++++++++++----------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 28387ab3..830c9b36 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -528,7 +528,6 @@ export type DirectBatchLimits = { export type DirectBatchStartSeq = StartSeq & DirectBatchLimits; export type DirectBatchStartTime = StartTime & DirectBatchLimits; export type DirectBatchOptions = DirectBatchStartSeq & DirectBatchStartTime; -export type DirectFetchOptions = DirectBatch & DirectMaxBytes; export type DirectLastFor = { multi_last: string[]; diff --git a/jetstream/src/jsm_direct.ts b/jetstream/src/jsm_direct.ts index 22975c9c..a43c87b6 100644 --- a/jetstream/src/jsm_direct.ts +++ b/jetstream/src/jsm_direct.ts @@ -19,12 +19,12 @@ import type { DirectMsg, DirectStreamAPI, JetStreamOptions, + MaxBytes, StoredMsg, } from "./types.ts"; import { DirectMsgHeaders } from "./types.ts"; import type { CallbackFn, - Codec, Deferred, Delay, Msg, @@ -44,14 +44,15 @@ import { } from "@nats-io/nats-core/internal"; import type { CompletionResult, + DirectBatch, DirectBatchOptions, - DirectFetchOptions, + DirectBatchStartSeq, + DirectBatchStartTime, DirectLastFor, + DirectMaxBytes, DirectMsgRequest, LastForMsgRequest, PullOptions, - StartSeq, - StartTime, } from "./jsapi_types.ts"; import { validateStreamName } from "./jsutil.ts"; import { JetStreamStatus, JetStreamStatusError } from "./jserrors.ts"; @@ -229,7 +230,6 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl export class DirectMsgImpl implements DirectMsg { data: Uint8Array; header: MsgHdrs; - static jc?: Codec; constructor(m: Msg) { if (!m.headers) { @@ -283,17 +283,44 @@ export class DirectMsgImpl implements DirectMsg { } } +/** + * Options for directly starting a direct consumer. The options can either specify + * a sequence start or a time start. + * @property {DirectBatchStartSeq} DirectBatchStartSeq - Specifies a sequence start for the consumer. + * @property {DirectBatchStartTime} DirectBatchStartTime - Specifies a time start for the consumer. + */ +export type DirectStartOptions = DirectBatchStartSeq | DirectBatchStartTime; + +/** + * Represents the limits for the operation. For fetch requests it represents the maximum to be retrieved. + * For consume operations it represents the buffering for the consumer. + * + * This type is used to define constraints or configurations for batching processes that + * operate under specific limits, either in terms of quantity (DirectBatch) or size in bytes (DirectMaxBytes). + */ +export type DirectBatchLimits = DirectBatch | DirectMaxBytes; + +function isDirectBatchStartTime( + t: DirectStartOptions, +): t is DirectBatchStartTime { + return typeof t === "object" && "start_time" in t; +} + +function isMaxBytes(t: DirectBatchLimits): t is MaxBytes { + return typeof t === "object" && "max_bytes" in t; +} + export class DirectConsumer { stream: string; api: DirectStreamAPIImpl; cursor: { last: number; pending?: number }; listeners: QueuedIteratorImpl[]; - start: StartSeq & StartTime; + start: DirectStartOptions; constructor( stream: string, api: DirectStreamAPIImpl, - start: StartSeq & StartTime, + start: DirectStartOptions, ) { this.stream = stream; this.api = api; @@ -303,29 +330,26 @@ export class DirectConsumer { } getOptions( - opts: Partial = {}, - ): Partial { + opts?: DirectBatchLimits, + ): DirectBatchOptions { + opts = opts || {} as DirectBatchLimits; const dbo: Partial = {}; if (this.cursor.last === 0) { // we have never pulled, honor initial request options - if (this.start.seq) { - dbo.seq = this.start.seq; - } else if (this.start.start_time) { + if (isDirectBatchStartTime(this.start)) { dbo.start_time = this.start.start_time; } else { - dbo.seq = 1; + dbo.seq = this.start.seq || 1; } } else { dbo.seq = this.cursor.last + 1; } - if (opts.batch) { - dbo.batch = opts.batch; - } else if (opts.max_bytes) { + if (isMaxBytes(opts)) { dbo.max_bytes = opts.max_bytes; } else { - dbo.batch = 100; + dbo.batch = opts.batch ?? 100; } return dbo; @@ -358,7 +382,7 @@ export class DirectConsumer { console.log(this.cursor); } - consume(opts?: DirectBatchOptions): Promise> { + consume(opts: DirectBatchLimits): Promise> { let pending: Delay; let requestDone: Deferred; const qi = new QueuedIteratorImpl(); @@ -445,7 +469,7 @@ export class DirectConsumer { return Promise.resolve(qi); } - async fetch(opts?: DirectBatchOptions): Promise> { + async fetch(opts?: DirectBatchLimits): Promise> { const dbo = this.getOptions(opts); const qi = new QueuedIteratorImpl(); const src = await this.api.get(