diff --git a/jetstream/src/jsmsg.ts b/jetstream/src/jsmsg.ts index c7f7813d..0ab5abfa 100644 --- a/jetstream/src/jsmsg.ts +++ b/jetstream/src/jsmsg.ts @@ -20,19 +20,21 @@ import type { ProtocolHandler, } from "@nats-io/nats-core/internal"; import { + DataBuffer, deferred, millis, nanos, RequestOne, + RequestOptions, } from "@nats-io/nats-core/internal"; -import type { DeliveryInfo } from "./jsapi_types.ts"; +import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts"; export const ACK = Uint8Array.of(43, 65, 67, 75); const NAK = Uint8Array.of(45, 78, 65, 75); const WPI = Uint8Array.of(43, 87, 80, 73); -const _NXT = Uint8Array.of(43, 78, 88, 84); +const NXT = Uint8Array.of(43, 78, 88, 84); const TERM = Uint8Array.of(43, 84, 69, 82, 77); -const _SPACE = Uint8Array.of(32); +const SPACE = Uint8Array.of(32); /** * Represents a message stored in JetStream @@ -98,19 +100,19 @@ export type JsMsg = { */ working(): void; - // /** - // * !! this is an experimental feature - and could be removed - // * - // * next() combines ack() and pull(), requires the subject for a - // * subscription processing to process a message is provided - // * (can be the same) however, because the ability to specify - // * how long to keep the request open can be specified, this - // * functionality doesn't work well with iterators, as an error - // * (408s) are expected and needed to re-trigger a pull in case - // * there was a timeout. In an iterator, the error will close - // * the iterator, requiring a subscription to be reset. - // */ - // next(subj: string, ro?: Partial): void; + /** + * !! this is an experimental feature - and could be removed + * + * next() combines ack() and pull(), requires the subject for a + * subscription processing to process a message is provided + * (can be the same) however, because the ability to specify + * how long to keep the request open can be specified, this + * functionality doesn't work well with iterators, as an error + * (408s) are expected and needed to re-trigger a pull in case + * there was a timeout. In an iterator, the error will close + * the iterator, requiring a subscription to be reset. + */ + next(subj: string, ro?: Partial): void; /** * Indicate to the JetStream server that processing of the message @@ -308,18 +310,18 @@ export class JsMsgImpl implements JsMsg { this.doAck(WPI); } - // next(subj: string, opts: Partial = { batch: 1 }) { - // const args: Partial = {}; - // args.batch = opts.batch || 1; - // args.no_wait = opts.no_wait || false; - // if (opts.expires && opts.expires > 0) { - // args.expires = nanos(opts.expires); - // } - // const data = new TextEncoder().encode(JSON.stringify(args)); - // const payload = DataBuffer.concat(NXT, SPACE, data); - // const reqOpts = subj ? { reply: subj } as RequestOptions : undefined; - // this.msg.respond(payload, reqOpts); - // } + next(subj: string, opts: Partial = { batch: 1 }) { + const args: Partial = {}; + args.batch = opts.batch || 1; + args.no_wait = opts.no_wait || false; + if (opts.expires && opts.expires > 0) { + args.expires = nanos(opts.expires); + } + const data = new TextEncoder().encode(JSON.stringify(args)); + const payload = DataBuffer.concat(NXT, SPACE, data); + const reqOpts = subj ? { reply: subj } as RequestOptions : undefined; + this.msg.respond(payload, reqOpts); + } term(reason = "") { let term = TERM; diff --git a/migration.md b/migration.md index b5a8a7d1..a92b7c4b 100644 --- a/migration.md +++ b/migration.md @@ -144,8 +144,6 @@ To use JetStream, you must install and import `@nats/jetstream`. - The `ConsumerEvents` and `ConsumerDebugEvents` enum has been removed and replaced with `ConsumerNotification` which have a discriminating field `type`. The status objects provide a more specific API for querying those events. -- The JsMsg.next() API has been retracted as the simplified consumer `next()`, - and `consume()` provide the necessary functionality. ## Changes to KV