Skip to content

Commit

Permalink
internal(jetstream): direct consumer (#154)
Browse files Browse the repository at this point in the history
* experimental consumer based on direct

Signed-off-by: Alberto Ricart <[email protected]>

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart authored Dec 11, 2024
1 parent 8a3d5b7 commit 1dc4fe9
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 6 deletions.
24 changes: 22 additions & 2 deletions jetstream/src/jsapi_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
*/

import type { Nanos } from "@nats-io/nats-core";
import type { StoredMsg } from "./types.ts";
import { nanos } from "@nats-io/nats-core";
import type { MaxBytes, StoredMsg } from "./types.ts";

export type ApiPaged = {
total: number;
Expand Down Expand Up @@ -513,14 +514,20 @@ export type CompletionResult = { err?: Error };
export type BatchCallback<T> = (done: CompletionResult | null, d: T) => void;
export type StartSeq = { seq?: number };
export type StartTime = { start_time?: Date | string };

export type DirectBatch = {
batch: number;
};
export type DirectMaxBytes = MaxBytes;

export type DirectBatchLimits = {
batch?: number;
max_bytes?: number;
callback?: BatchCallback<StoredMsg>;
};
export type DirectBatchStartSeq = StartSeq & DirectBatchLimits;
export type DirectBatchStartTime = StartTime & DirectBatchLimits;
export type DirectBatchOptions = DirectBatchStartSeq | DirectBatchStartTime;
export type DirectBatchOptions = DirectBatchStartSeq & DirectBatchStartTime;

export type DirectLastFor = {
multi_last: string[];
Expand Down Expand Up @@ -1052,6 +1059,19 @@ export enum PriorityPolicy {
Overflow = "overflow",
}

export function defaultConsumer(
name: string,
opts: Partial<ConsumerConfig> = {},
): ConsumerConfig {
return Object.assign({
name: name,
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(30 * 1000),
replay_policy: ReplayPolicy.Instant,
}, opts);
}

export type OverflowMinPending = {
/**
* The name of the priority_group
Expand Down
261 changes: 257 additions & 4 deletions jetstream/src/jsm_direct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@

import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
import type {
ConsumerNotification,
DirectMsg,
DirectStreamAPI,
JetStreamOptions,
MaxBytes,
StoredMsg,
} from "./types.ts";
import { DirectMsgHeaders } from "./types.ts";
import type {
CallbackFn,
Codec,
Deferred,
Delay,
Msg,
MsgHdrs,
NatsConnection,
Expand All @@ -32,20 +35,27 @@ import type {
} from "@nats-io/nats-core/internal";
import {
createInbox,
deferred,
delay,
Empty,
Feature,
QueuedIteratorImpl,
TD,
} from "@nats-io/nats-core/internal";
import type {
CompletionResult,
DirectBatch,
DirectBatchOptions,
DirectBatchStartSeq,
DirectBatchStartTime,
DirectLastFor,
DirectMaxBytes,
DirectMsgRequest,
LastForMsgRequest,
PullOptions,
} from "./jsapi_types.ts";
import { validateStreamName } from "./jsutil.ts";
import { JetStreamStatus } from "./jserrors.ts";
import { JetStreamStatus, JetStreamStatusError } from "./jserrors.ts";

export class DirectStreamAPIImpl extends BaseApiClientImpl
implements DirectStreamAPI {
Expand Down Expand Up @@ -120,7 +130,7 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
): Promise<QueuedIterator<StoredMsg>> {
const { min, ok } = this.nc.features.get(Feature.JS_BATCH_DIRECT_GET);
if (!ok) {
throw new Error(`batch direct require server ${min}`);
return Promise.reject(new Error(`batch direct require server ${min}`));
}
validateStreamName(stream);
const callback = typeof opts.callback === "function" ? opts.callback : null;
Expand Down Expand Up @@ -220,7 +230,6 @@ export class DirectStreamAPIImpl extends BaseApiClientImpl
export class DirectMsgImpl implements DirectMsg {
data: Uint8Array;
header: MsgHdrs;
static jc?: Codec<unknown>;

constructor(m: Msg) {
if (!m.headers) {
Expand Down Expand Up @@ -256,6 +265,15 @@ export class DirectMsgImpl implements DirectMsg {
return typeof v === "string" ? parseInt(v) : 0;
}

get pending(): number {
const v = this.header.last(DirectMsgHeaders.NumPending);
// if we have a pending - this pending will include the number of messages
// in the stream + the end of batch signal - better to remove the eob signal
// from the count so the client can estimate how many messages are
// in the stream. If a batch is 1 message, the pending is not included.
return typeof v === "string" ? parseInt(v) - 1 : -1;
}

json<T = unknown>(reviver?: ReviverFn): T {
return JSON.parse(new TextDecoder().decode(this.data), reviver);
}
Expand All @@ -264,3 +282,238 @@ export class DirectMsgImpl implements DirectMsg {
return TD.decode(this.data);
}
}

/**
* Options for directly starting a direct consumer. The options can either specify
* a sequence start or a time start.
* @property {DirectBatchStartSeq} DirectBatchStartSeq - Specifies a sequence start for the consumer.
* @property {DirectBatchStartTime} DirectBatchStartTime - Specifies a time start for the consumer.
*/
export type DirectStartOptions = DirectBatchStartSeq | DirectBatchStartTime;

/**
* Represents the limits for the operation. For fetch requests it represents the maximum to be retrieved.
* For consume operations it represents the buffering for the consumer.
*
* This type is used to define constraints or configurations for batching processes that
* operate under specific limits, either in terms of quantity (DirectBatch) or size in bytes (DirectMaxBytes).
*/
export type DirectBatchLimits = DirectBatch | DirectMaxBytes;

function isDirectBatchStartTime(
t: DirectStartOptions,
): t is DirectBatchStartTime {
return typeof t === "object" && "start_time" in t;
}

function isMaxBytes(t: DirectBatchLimits): t is MaxBytes {
return typeof t === "object" && "max_bytes" in t;
}

export class DirectConsumer {
stream: string;
api: DirectStreamAPIImpl;
cursor: { last: number; pending?: number };
listeners: QueuedIteratorImpl<ConsumerNotification>[];
start: DirectStartOptions;

constructor(
stream: string,
api: DirectStreamAPIImpl,
start: DirectStartOptions,
) {
this.stream = stream;
this.api = api;
this.cursor = { last: 0 };
this.listeners = [];
this.start = start;
}

getOptions(
opts?: DirectBatchLimits,
): DirectBatchOptions {
opts = opts || {} as DirectBatchLimits;
const dbo: Partial<DirectBatchOptions> = {};

if (this.cursor.last === 0) {
// we have never pulled, honor initial request options
if (isDirectBatchStartTime(this.start)) {
dbo.start_time = this.start.start_time;
} else {
dbo.seq = this.start.seq || 1;
}
} else {
dbo.seq = this.cursor.last + 1;
}

if (isMaxBytes(opts)) {
dbo.max_bytes = opts.max_bytes;
} else {
dbo.batch = opts.batch ?? 100;
}

return dbo;
}

status(): AsyncIterable<ConsumerNotification> {
const iter = new QueuedIteratorImpl<ConsumerNotification>();
this.listeners.push(iter);
return iter;
}

notify(n: ConsumerNotification): void {
if (this.listeners.length > 0) {
(() => {
const remove: QueuedIteratorImpl<ConsumerNotification>[] = [];
this.listeners.forEach((l) => {
const qi = l as QueuedIteratorImpl<ConsumerNotification>;
if (!qi.done) {
qi.push(n);
} else {
remove.push(qi);
}
});
this.listeners = this.listeners.filter((l) => !remove.includes(l));
})();
}
}

debug() {
console.log(this.cursor);
}

consume(opts: DirectBatchLimits): Promise<QueuedIterator<StoredMsg>> {
let pending: Delay;
let requestDone: Deferred<void>;
const qi = new QueuedIteratorImpl<StoredMsg>();

(async () => {
while (true) {
// if we have nothing pending, slow it down
// on the first pull pending doesn't exist so no delay
if (this.cursor.pending === 0) {
this.notify({
type: "debug",
code: 0,
description: "sleeping for 2500",
});
pending = delay(2500);
await pending;
}
// check that we are still supposed to be running
// pending could have released if the iter closed
if (qi.done) {
break;
}
requestDone = deferred<void>();
const dbo = this.getOptions(opts);
this.notify({
type: "next",
options: Object.assign({}, opts) as PullOptions,
});

dbo.callback = (r: CompletionResult | null, sm: StoredMsg): void => {
if (r) {
// if the current fetch is done, ready to schedule the next
if (r.err) {
if (r.err instanceof JetStreamStatusError) {
this.notify({
type: "debug",
code: r.err.code,
description: r.err.message,
});
} else {
this.notify({
type: "debug",
code: 0,
description: r.err.message,
});
}
}
requestDone.resolve();
} else if (
sm.lastSequence > 0 && sm.lastSequence !== this.cursor.last
) {
// need to reset
src.stop();
requestDone.resolve();
this.notify({
type: "reset",
name: "direct",
});
} else {
qi.push(sm);
qi.received++;
this.cursor.last = sm.seq;
this.cursor.pending = sm.pending;
}
};

const src = await this.api.getBatch(
this.stream,
dbo,
) as QueuedIteratorImpl<StoredMsg>;

qi.iterClosed.then(() => {
src.stop();
pending?.cancel();
requestDone?.resolve();
});

await requestDone;
}
})().catch((err) => {
qi.stop(err);
});

return Promise.resolve(qi);
}

async fetch(opts?: DirectBatchLimits): Promise<QueuedIterator<StoredMsg>> {
const dbo = this.getOptions(opts);
const qi = new QueuedIteratorImpl<StoredMsg>();
const src = await this.api.get(
this.stream,
Object.assign({
callback: (done: CompletionResult | null, sm: StoredMsg) => {
if (done) {
// the server sent error or is done, we are done
qi.push(() => {
done.err ? qi.stop(done.err) : qi.stop();
});
} else if (
sm.lastSequence > 0 && sm.lastSequence !== this.cursor.last
) {
// we are done early because the sequence jumped unexpectedly
qi.push(() => {
qi.stop();
});
src.stop();
} else {
// pass-through to client, and record
qi.push(sm);
qi.received++;
this.cursor.last = sm.seq;
this.cursor.pending = sm.pending;
}
},
}, dbo),
);
qi.iterClosed.then(() => {
src.stop();
});

return qi;
}

async next(): Promise<StoredMsg | null> {
const sm = await this.api.getMessage(this.stream, {
seq: this.cursor.last + 1,
});
const seq = sm?.seq;
if (seq) {
this.cursor.last = seq;
}
return sm;
}
}
7 changes: 7 additions & 0 deletions jetstream/src/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,13 @@ export class StoredMsgImpl implements StoredMsg {
constructor(smr: StreamMsgResponse) {
this.smr = smr;
}
get pending(): number {
return 0;
}

get lastSequence(): number {
return 0;
}

get subject(): string {
return this.smr.message.subject;
Expand Down
Loading

0 comments on commit 1dc4fe9

Please sign in to comment.