Skip to content

Commit

Permalink
Restore and implement the JsMsg.next() API.
Browse files Browse the repository at this point in the history
The `JsMsg.next()` method has been reintroduced and fully implemented, replacing its previous stubbed and commented-out state. This method combines `ack()` and `pull()` functionality, allowing for experimental subscription processing. The migration guide was updated to reflect the restoration of this API.

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart committed Dec 16, 2024
1 parent 8228dfe commit 8638082
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
58 changes: 30 additions & 28 deletions jetstream/src/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ import type {
ProtocolHandler,
} from "@nats-io/nats-core/internal";
import {
DataBuffer,
deferred,
millis,
nanos,
RequestOne,
RequestOptions,
} from "@nats-io/nats-core/internal";
import type { DeliveryInfo } from "./jsapi_types.ts";
import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts";

export const ACK = Uint8Array.of(43, 65, 67, 75);
const NAK = Uint8Array.of(45, 78, 65, 75);
const WPI = Uint8Array.of(43, 87, 80, 73);
const _NXT = Uint8Array.of(43, 78, 88, 84);
const NXT = Uint8Array.of(43, 78, 88, 84);
const TERM = Uint8Array.of(43, 84, 69, 82, 77);
const _SPACE = Uint8Array.of(32);
const SPACE = Uint8Array.of(32);

/**
* Represents a message stored in JetStream
Expand Down Expand Up @@ -98,19 +100,19 @@ export type JsMsg = {
*/
working(): void;

// /**
// * !! this is an experimental feature - and could be removed
// *
// * next() combines ack() and pull(), requires the subject for a
// * subscription processing to process a message is provided
// * (can be the same) however, because the ability to specify
// * how long to keep the request open can be specified, this
// * functionality doesn't work well with iterators, as an error
// * (408s) are expected and needed to re-trigger a pull in case
// * there was a timeout. In an iterator, the error will close
// * the iterator, requiring a subscription to be reset.
// */
// next(subj: string, ro?: Partial<PullOptions>): void;
/**
* !! this is an experimental feature - and could be removed
*
* next() combines ack() and pull(), requires the subject for a
* subscription processing to process a message is provided
* (can be the same) however, because the ability to specify
* how long to keep the request open can be specified, this
* functionality doesn't work well with iterators, as an error
* (408s) are expected and needed to re-trigger a pull in case
* there was a timeout. In an iterator, the error will close
* the iterator, requiring a subscription to be reset.
*/
next(subj: string, ro?: Partial<PullOptions>): void;

/**
* Indicate to the JetStream server that processing of the message
Expand Down Expand Up @@ -308,18 +310,18 @@ export class JsMsgImpl implements JsMsg {
this.doAck(WPI);
}

// next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
// const args: Partial<PullOptions> = {};
// args.batch = opts.batch || 1;
// args.no_wait = opts.no_wait || false;
// if (opts.expires && opts.expires > 0) {
// args.expires = nanos(opts.expires);
// }
// const data = new TextEncoder().encode(JSON.stringify(args));
// const payload = DataBuffer.concat(NXT, SPACE, data);
// const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
// this.msg.respond(payload, reqOpts);
// }
next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
const args: Partial<PullOptions> = {};
args.batch = opts.batch || 1;
args.no_wait = opts.no_wait || false;
if (opts.expires && opts.expires > 0) {
args.expires = nanos(opts.expires);
}
const data = new TextEncoder().encode(JSON.stringify(args));
const payload = DataBuffer.concat(NXT, SPACE, data);
const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
this.msg.respond(payload, reqOpts);
}

term(reason = "") {
let term = TERM;
Expand Down
2 changes: 0 additions & 2 deletions migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ To use JetStream, you must install and import `@nats/jetstream`.
- The `ConsumerEvents` and `ConsumerDebugEvents` enum has been removed and
replaced with `ConsumerNotification` which have a discriminating field `type`.
The status objects provide a more specific API for querying those events.
- The JsMsg.next() API has been retracted as the simplified consumer `next()`,
and `consume()` provide the necessary functionality.

## Changes to KV

Expand Down

0 comments on commit 8638082

Please sign in to comment.