diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index 5bf0a2a21db..82e16ba4ff8 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -2567,3 +2567,51 @@ func TestJetStreamAtomicBatchPublishPersistModeAsync(t *testing.T) { _, err := jsStreamCreate(t, nc, cfg) require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("async persist mode is not supported with atomic batch publish"))) } + +func TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + cfg := &StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Storage: FileStorage, + Replicas: 1, + AllowAtomicPublish: true, + } + _, err := jsStreamCreate(t, nc, cfg) + require_NoError(t, err) + + _, err = js.Publish("foo.A", nil) + require_NoError(t, err) + _, err = js.Publish("foo.B", nil) + require_NoError(t, err) + + m := nats.NewMsg("foo.A") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + m.Header.Set("Nats-Expected-Last-Sequence", "2") + m.Header.Set("Nats-Expected-Last-Subject-Sequence", "1") + m.Header.Set("Nats-Expected-Last-Subject-Sequence-Subject", "foo.A") + msg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + require_Len(t, len(msg.Data), 0) // Empty ack. + + m = nats.NewMsg("foo.B") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "2") + m.Header.Set("Nats-Batch-Commit", "1") + m.Header.Set("Nats-Expected-Last-Subject-Sequence", "2") + m.Header.Set("Nats-Expected-Last-Subject-Sequence-Subject", "foo.B") + msg, err = nc.RequestMsg(m, time.Second) + require_NoError(t, err) + var resp JSPubAckResponse + require_NoError(t, json.Unmarshal(msg.Data, &resp)) + require_True(t, resp.Error == nil) + require_Equal(t, resp.PubAck.Sequence, 4) + require_Equal(t, resp.PubAck.BatchId, "uuid") + require_Equal(t, resp.PubAck.BatchSize, 2) +} diff --git a/server/stream.go b/server/stream.go index 7dbc7d2b4e3..1a080e6cd44 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6482,7 +6482,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr return errorOnUnsupported(seq, JSExpectedLastMsgId) } - if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { + if bhdr, bmsg, _, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, bsubj, bhdr, bmsg, false, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil { rollback(seq) b.cleanupLocked(batchId, batches) batches.mu.Unlock()