Skip to content

Commit 09c685a

Browse files
committed
fixed types
Signed-off-by: Alberto Ricart <[email protected]>
1 parent afc3fa1 commit 09c685a

File tree

2 files changed

+43
-20
lines changed

2 files changed

+43
-20
lines changed

jetstream/src/jsapi_types.ts

-1
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,6 @@ export type DirectBatchLimits = {
528528
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
529529
export type DirectBatchStartTime = StartTime & DirectBatchLimits;
530530
export type DirectBatchOptions = DirectBatchStartSeq & DirectBatchStartTime;
531-
export type DirectFetchOptions = DirectBatch & DirectMaxBytes;
532531

533532
export type DirectLastFor = {
534533
multi_last: string[];

jetstream/src/jsm_direct.ts

+43-19
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ import type {
1919
DirectMsg,
2020
DirectStreamAPI,
2121
JetStreamOptions,
22+
MaxBytes,
2223
StoredMsg,
2324
} from "./types.ts";
2425
import { DirectMsgHeaders } from "./types.ts";
2526
import type {
2627
CallbackFn,
27-
Codec,
2828
Deferred,
2929
Delay,
3030
Msg,
@@ -44,14 +44,15 @@ import {
4444
} from "@nats-io/nats-core/internal";
4545
import type {
4646
CompletionResult,
47+
DirectBatch,
4748
DirectBatchOptions,
48-
DirectFetchOptions,
49+
DirectBatchStartSeq,
50+
DirectBatchStartTime,
4951
DirectLastFor,
52+
DirectMaxBytes,
5053
DirectMsgRequest,
5154
LastForMsgRequest,
5255
PullOptions,
53-
StartSeq,
54-
StartTime,
5556
} from "./jsapi_types.ts";
5657
import { validateStreamName } from "./jsutil.ts";
5758
import { JetStreamStatus, JetStreamStatusError } from "./jserrors.ts";
@@ -229,7 +230,6 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
229230
export class DirectMsgImpl implements DirectMsg {
230231
data: Uint8Array;
231232
header: MsgHdrs;
232-
static jc?: Codec<unknown>;
233233

234234
constructor(m: Msg) {
235235
if (!m.headers) {
@@ -283,17 +283,44 @@ export class DirectMsgImpl implements DirectMsg {
283283
}
284284
}
285285

286+
/**
287+
* Options for directly starting a direct consumer. The options can either specify
288+
* a sequence start or a time start.
289+
* @property {DirectBatchStartSeq} DirectBatchStartSeq - Specifies a sequence start for the consumer.
290+
* @property {DirectBatchStartTime} DirectBatchStartTime - Specifies a time start for the consumer.
291+
*/
292+
export type DirectStartOptions = DirectBatchStartSeq | DirectBatchStartTime;
293+
294+
/**
295+
* Represents the limits for the operation. For fetch requests it represents the maximum to be retrieved.
296+
* For consume operations it represents the buffering for the consumer.
297+
*
298+
* This type is used to define constraints or configurations for batching processes that
299+
* operate under specific limits, either in terms of quantity (DirectBatch) or size in bytes (DirectMaxBytes).
300+
*/
301+
export type DirectBatchLimits = DirectBatch | DirectMaxBytes;
302+
303+
function isDirectBatchStartTime(
304+
t: DirectStartOptions,
305+
): t is DirectBatchStartTime {
306+
return typeof t === "object" && "start_time" in t;
307+
}
308+
309+
function isMaxBytes(t: DirectBatchLimits): t is MaxBytes {
310+
return typeof t === "object" && "max_bytes" in t;
311+
}
312+
286313
export class DirectConsumer {
287314
stream: string;
288315
api: DirectStreamAPIImpl;
289316
cursor: { last: number; pending?: number };
290317
listeners: QueuedIteratorImpl<ConsumerNotification>[];
291-
start: StartSeq & StartTime;
318+
start: DirectStartOptions;
292319

293320
constructor(
294321
stream: string,
295322
api: DirectStreamAPIImpl,
296-
start: StartSeq & StartTime,
323+
start: DirectStartOptions,
297324
) {
298325
this.stream = stream;
299326
this.api = api;
@@ -303,29 +330,26 @@ export class DirectConsumer {
303330
}
304331

305332
getOptions(
306-
opts: Partial<DirectFetchOptions> = {},
307-
): Partial<DirectBatchOptions> {
333+
opts?: DirectBatchLimits,
334+
): DirectBatchOptions {
335+
opts = opts || {} as DirectBatchLimits;
308336
const dbo: Partial<DirectBatchOptions> = {};
309337

310338
if (this.cursor.last === 0) {
311339
// we have never pulled, honor initial request options
312-
if (this.start.seq) {
313-
dbo.seq = this.start.seq;
314-
} else if (this.start.start_time) {
340+
if (isDirectBatchStartTime(this.start)) {
315341
dbo.start_time = this.start.start_time;
316342
} else {
317-
dbo.seq = 1;
343+
dbo.seq = this.start.seq || 1;
318344
}
319345
} else {
320346
dbo.seq = this.cursor.last + 1;
321347
}
322348

323-
if (opts.batch) {
324-
dbo.batch = opts.batch;
325-
} else if (opts.max_bytes) {
349+
if (isMaxBytes(opts)) {
326350
dbo.max_bytes = opts.max_bytes;
327351
} else {
328-
dbo.batch = 100;
352+
dbo.batch = opts.batch ?? 100;
329353
}
330354

331355
return dbo;
@@ -358,7 +382,7 @@ export class DirectConsumer {
358382
console.log(this.cursor);
359383
}
360384

361-
consume(opts?: DirectBatchOptions): Promise<QueuedIterator<StoredMsg>> {
385+
consume(opts: DirectBatchLimits): Promise<QueuedIterator<StoredMsg>> {
362386
let pending: Delay;
363387
let requestDone: Deferred<void>;
364388
const qi = new QueuedIteratorImpl<StoredMsg>();
@@ -445,7 +469,7 @@ export class DirectConsumer {
445469
return Promise.resolve(qi);
446470
}
447471

448-
async fetch(opts?: DirectBatchOptions): Promise<QueuedIterator<StoredMsg>> {
472+
async fetch(opts?: DirectBatchLimits): Promise<QueuedIterator<StoredMsg>> {
449473
const dbo = this.getOptions(opts);
450474
const qi = new QueuedIteratorImpl<StoredMsg>();
451475
const src = await this.api.get(

0 commit comments

Comments
 (0)