diff --git a/server/errors.json b/server/errors.json index 7f04b23469a..410544bdaa2 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1988,5 +1988,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSAtomicPublishContainsDuplicateMessageErr", + "code": 400, + "error_code": 10201, + "description": "atomic publish batch contains duplicate message id", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/jetstream_batching.go b/server/jetstream_batching.go index 16daf30648a..fc97f1f5453 100644 --- a/server/jetstream_batching.go +++ b/server/jetstream_batching.go @@ -303,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal( if msgId := getMsgId(hdr); msgId != _EMPTY_ { // Dedupe if staged. if _, ok = diff.msgIds[msgId]; ok { - return hdr, msg, 0, nil, errMsgIdDuplicate + return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate } mset.ddMu.Lock() if dde := mset.checkMsgId(msgId); dde != nil { @@ -311,7 +311,7 @@ func checkMsgHeadersPreClusteredProposal( mset.ddMu.Unlock() // Should not return an invalid sequence, in that case error. if seq > 0 { - return hdr, msg, seq, nil, errMsgIdDuplicate + return hdr, msg, seq, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate } else { return hdr, msg, 0, NewJSStreamDuplicateMessageConflictError(), errMsgIdDuplicate } diff --git a/server/jetstream_batching_test.go b/server/jetstream_batching_test.go index c00d3c5fdf6..e64e958fc68 100644 --- a/server/jetstream_batching_test.go +++ b/server/jetstream_batching_test.go @@ -397,24 +397,38 @@ func TestJetStreamAtomicBatchPublishDedupeNotAllowed(t *testing.T) { _, err := jsStreamCreate(t, nc, cfg) require_NoError(t, err) + _, err = js.Publish("foo", nil, nats.MsgId("pre-existing")) + require_NoError(t, err) + + var pubAck JSPubAckResponse m := nats.NewMsg("foo") + m.Header.Set("Nats-Msg-Id", "pre-existing") + m.Header.Set("Nats-Batch-Id", "uuid") + m.Header.Set("Nats-Batch-Sequence", "1") + m.Header.Set("Nats-Batch-Commit", "1") + rmsg, err := nc.RequestMsg(m, time.Second) + require_NoError(t, err) + require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) + require_NotNil(t, pubAck.Error) + require_Error(t, pubAck.Error, NewJSAtomicPublishContainsDuplicateMessageError()) + + m = nats.NewMsg("foo") m.Header.Set("Nats-Batch-Id", "uuid") m.Header.Set("Nats-Batch-Sequence", "1") m.Header.Set("Nats-Msg-Id", "msgId1") require_NoError(t, nc.PublishMsg(m)) - m.Header.Set("Nats-Batch-Sequence", "2") - m.Header.Set("Nats-Msg-Id", "pre-existing") - require_NoError(t, nc.PublishMsg(m)) - var pubAck JSPubAckResponse - m.Header.Set("Nats-Batch-Sequence", "3") + pubAck = JSPubAckResponse{} + m.Header.Set("Nats-Batch-Sequence", "2") m.Header.Set("Nats-Msg-Id", "msgId2") m.Header.Set("Nats-Batch-Commit", "1") - rmsg, err := nc.RequestMsg(m, time.Second) + rmsg, err = nc.RequestMsg(m, time.Second) require_NoError(t, err) require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck)) - require_NotNil(t, pubAck.Error) - require_Error(t, pubAck.Error, NewJSAtomicPublishUnsupportedHeaderBatchError("Nats-Msg-Id")) + require_True(t, pubAck.Error == nil) + require_Equal(t, pubAck.BatchSize, 2) + require_Equal(t, pubAck.Sequence, 3) + require_Equal(t, pubAck.BatchId, "uuid") } for _, storage := range []StorageType{FileStorage, MemoryStorage} { @@ -727,7 +741,6 @@ func TestJetStreamAtomicBatchPublishDenyHeaders(t *testing.T) { // We might support these headers later on, but for now error. for key, value := range map[string]string{ - "Nats-Msg-Id": "msgId", "Nats-Expected-Last-Msg-Id": "msgId", } { t.Run(key, func(t *testing.T) { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index dbc7f0499d7..d244ebd7ac2 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -8,6 +8,9 @@ const ( // JSAccountResourcesExceededErr resource limits exceeded for account JSAccountResourcesExceededErr ErrorIdentifier = 10002 + // JSAtomicPublishContainsDuplicateMessageErr atomic publish batch contains duplicate message id + JSAtomicPublishContainsDuplicateMessageErr ErrorIdentifier = 10201 + // JSAtomicPublishDisabledErr atomic publish is disabled JSAtomicPublishDisabledErr ErrorIdentifier = 10174 @@ -606,6 +609,7 @@ const ( var ( ApiErrors = map[ErrorIdentifier]*ApiError{ JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"}, + JSAtomicPublishContainsDuplicateMessageErr: {Code: 400, ErrCode: 10201, Description: "atomic publish batch contains duplicate message id"}, JSAtomicPublishDisabledErr: {Code: 400, ErrCode: 10174, Description: "atomic publish is disabled"}, JSAtomicPublishIncompleteBatchErr: {Code: 400, ErrCode: 10176, Description: "atomic publish batch is incomplete"}, JSAtomicPublishInvalidBatchCommitErr: {Code: 400, ErrCode: 10200, Description: "atomic publish batch commit is invalid"}, @@ -839,6 +843,16 @@ func NewJSAccountResourcesExceededError(opts ...ErrorOption) *ApiError { return ApiErrors[JSAccountResourcesExceededErr] } +// NewJSAtomicPublishContainsDuplicateMessageError creates a new JSAtomicPublishContainsDuplicateMessageErr error: "atomic publish batch contains duplicate message id" +func NewJSAtomicPublishContainsDuplicateMessageError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSAtomicPublishContainsDuplicateMessageErr] +} + // NewJSAtomicPublishDisabledError creates a new JSAtomicPublishDisabledErr error: "atomic publish is disabled" func NewJSAtomicPublishDisabledError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/stream.go b/server/stream.go index 1bf64daba42..50c4a1b0778 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6505,9 +6505,6 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr } // Reject unsupported headers. - if msgId := getMsgId(bhdr); msgId != _EMPTY_ { - return errorOnUnsupported(seq, JSMsgId) - } if getExpectedLastMsgId(hdr) != _EMPTY_ { return errorOnUnsupported(seq, JSExpectedLastMsgId) }