-
Is it possible to manually acknowledge delivered message stream sequence in a separate request (independent on the request receiving the message) to the nats server? Usually you would receive messages from a consumer, process them and ack them in a single defined context: const messages = await consumer.consume({ max_messages: maxMessages });
for await (const m of messages) {
//some process logic
await m.ack();
} In my use case I want to bridge http API with a nats stream of messages. The issue is I can not //inside implementation of POST /events/ack endpoint`
const s : Stream | undefined = await jetstream.streams.get('mystream');
const message = await s.getMessage({seq: 10});
//message has no `ack` method |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
Sure you can! - and for that reason, any of the other operations as well (have a look at Line 24 in fe45514 In a nutshell, you'll need to "grab" the embedded nats message that is wrapped by JsMsg, get the reply subject from it, and then publish a message to the reply subject with the payload const name = nuid.next();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name,
subjects: [name],
storage: StorageType.Memory,
retention: RetentionPolicy.Workqueue,
});
const js = nc.jetstream();
await js.publish(name);
let si = await jsm.streams.info(name);
assertEquals(si.state.messages, 1);
await jsm.consumers.add(name, {
durable_name: "c",
ack_policy: AckPolicy.Explicit,
});
const c = await js.consumers.get(name, "c");
const m = await c.next();
const mi = m as JsMsgImpl;
const subj = mi.msg.reply!
nc.publish(subj, "+ACK");
await nc.flush();
si = await jsm.streams.info(name);
assertEquals(si.state.messages, 0); |
Beta Was this translation helpful? Give feedback.
-
When reading a message from the stream directly (as you were doing above) all you are doing is reading from the stream (there's no consumer behind it) so you cannot ack or anything. The acks, etc are for maintaining the consumer's state, which in the case of the direct from the stream doesn't exist. |
Beta Was this translation helpful? Give feedback.
Sure you can! - and for that reason, any of the other operations as well (have a look at
nats.deno/jetstream/jsmsg.ts
Line 24 in fe45514
In a nutshell, you'll need to "grab" the embedded nats message that is wrapped by JsMsg, get the reply subject from it, and then publish a message to the reply subject with the payload
+ACK
: