Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1988,5 +1988,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSAtomicPublishContainsDuplicateMessageErr",
"code": 400,
"error_code": 10201,
"description": "atomic publish batch contains duplicate message id",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
4 changes: 2 additions & 2 deletions server/jetstream_batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ func checkMsgHeadersPreClusteredProposal(
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
// Dedupe if staged.
if _, ok = diff.msgIds[msgId]; ok {
return hdr, msg, 0, nil, errMsgIdDuplicate
return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
}
mset.ddMu.Lock()
if dde := mset.checkMsgId(msgId); dde != nil {
seq := dde.seq
mset.ddMu.Unlock()
// Should not return an invalid sequence, in that case error.
if seq > 0 {
return hdr, msg, seq, nil, errMsgIdDuplicate
return hdr, msg, seq, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
} else {
return hdr, msg, 0, NewJSStreamDuplicateMessageConflictError(), errMsgIdDuplicate
}
Expand Down
31 changes: 22 additions & 9 deletions server/jetstream_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,24 +397,38 @@ func TestJetStreamAtomicBatchPublishDedupeNotAllowed(t *testing.T) {
_, err := jsStreamCreate(t, nc, cfg)
require_NoError(t, err)

_, err = js.Publish("foo", nil, nats.MsgId("pre-existing"))
require_NoError(t, err)

var pubAck JSPubAckResponse
m := nats.NewMsg("foo")
m.Header.Set("Nats-Msg-Id", "pre-existing")
m.Header.Set("Nats-Batch-Id", "uuid")
m.Header.Set("Nats-Batch-Sequence", "1")
m.Header.Set("Nats-Batch-Commit", "1")
rmsg, err := nc.RequestMsg(m, time.Second)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck))
require_NotNil(t, pubAck.Error)
require_Error(t, pubAck.Error, NewJSAtomicPublishContainsDuplicateMessageError())

m = nats.NewMsg("foo")
m.Header.Set("Nats-Batch-Id", "uuid")
m.Header.Set("Nats-Batch-Sequence", "1")
m.Header.Set("Nats-Msg-Id", "msgId1")
require_NoError(t, nc.PublishMsg(m))
m.Header.Set("Nats-Batch-Sequence", "2")
m.Header.Set("Nats-Msg-Id", "pre-existing")
require_NoError(t, nc.PublishMsg(m))

var pubAck JSPubAckResponse
m.Header.Set("Nats-Batch-Sequence", "3")
pubAck = JSPubAckResponse{}
m.Header.Set("Nats-Batch-Sequence", "2")
m.Header.Set("Nats-Msg-Id", "msgId2")
m.Header.Set("Nats-Batch-Commit", "1")
rmsg, err := nc.RequestMsg(m, time.Second)
rmsg, err = nc.RequestMsg(m, time.Second)
require_NoError(t, err)
require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck))
require_NotNil(t, pubAck.Error)
require_Error(t, pubAck.Error, NewJSAtomicPublishUnsupportedHeaderBatchError("Nats-Msg-Id"))
require_True(t, pubAck.Error == nil)
require_Equal(t, pubAck.BatchSize, 2)
require_Equal(t, pubAck.Sequence, 3)
require_Equal(t, pubAck.BatchId, "uuid")
}

for _, storage := range []StorageType{FileStorage, MemoryStorage} {
Expand Down Expand Up @@ -727,7 +741,6 @@ func TestJetStreamAtomicBatchPublishDenyHeaders(t *testing.T) {

// We might support these headers later on, but for now error.
for key, value := range map[string]string{
"Nats-Msg-Id": "msgId",
"Nats-Expected-Last-Msg-Id": "msgId",
} {
t.Run(key, func(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const (
// JSAccountResourcesExceededErr resource limits exceeded for account
JSAccountResourcesExceededErr ErrorIdentifier = 10002

// JSAtomicPublishContainsDuplicateMessageErr atomic publish batch contains duplicate message id
JSAtomicPublishContainsDuplicateMessageErr ErrorIdentifier = 10201

// JSAtomicPublishDisabledErr atomic publish is disabled
JSAtomicPublishDisabledErr ErrorIdentifier = 10174

Expand Down Expand Up @@ -606,6 +609,7 @@ const (
var (
ApiErrors = map[ErrorIdentifier]*ApiError{
JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"},
JSAtomicPublishContainsDuplicateMessageErr: {Code: 400, ErrCode: 10201, Description: "atomic publish batch contains duplicate message id"},
JSAtomicPublishDisabledErr: {Code: 400, ErrCode: 10174, Description: "atomic publish is disabled"},
JSAtomicPublishIncompleteBatchErr: {Code: 400, ErrCode: 10176, Description: "atomic publish batch is incomplete"},
JSAtomicPublishInvalidBatchCommitErr: {Code: 400, ErrCode: 10200, Description: "atomic publish batch commit is invalid"},
Expand Down Expand Up @@ -839,6 +843,16 @@ func NewJSAccountResourcesExceededError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSAccountResourcesExceededErr]
}

// NewJSAtomicPublishContainsDuplicateMessageError creates a new JSAtomicPublishContainsDuplicateMessageErr error: "atomic publish batch contains duplicate message id"
func NewJSAtomicPublishContainsDuplicateMessageError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSAtomicPublishContainsDuplicateMessageErr]
}

// NewJSAtomicPublishDisabledError creates a new JSAtomicPublishDisabledErr error: "atomic publish is disabled"
func NewJSAtomicPublishDisabledError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
3 changes: 0 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6505,9 +6505,6 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
}

// Reject unsupported headers.
if msgId := getMsgId(bhdr); msgId != _EMPTY_ {
return errorOnUnsupported(seq, JSMsgId)
}
if getExpectedLastMsgId(hdr) != _EMPTY_ {
return errorOnUnsupported(seq, JSExpectedLastMsgId)
}
Expand Down