diff --git a/server/filestore.go b/server/filestore.go index 734b6d58b0b..aa1c3facee1 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5652,7 +5652,7 @@ func (fs *fileStore) expireMsgs() { // if it was the last message of that particular subject that we just deleted. if sdmEnabled { if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0 + sdm := last && isSubjectDeleteMarker(sm.hdr) fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -5685,7 +5685,7 @@ func (fs *fileStore) expireMsgs() { if ttlSdm == nil { ttlSdm = make(map[string][]SDMBySubj, 1) } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0}) + ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) } else { // Collect sequences to remove. Don't remove messages inline here, // as that releases the lock and THW is not thread-safe. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 0652953c31e..ada1dfa4610 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -44,6 +44,7 @@ import ( "github.com/nats-io/jwt/v2" "github.com/nats-io/nats-server/v2/server/sysmem" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/nats-io/nkeys" "github.com/nats-io/nuid" ) @@ -21122,3 +21123,63 @@ func TestJetStreamGetNoHeaders(t *testing.T) { require_Equal(t, headers.Get("Nats-Time-Stamp"), _EMPTY_) }) } + +func TestJetStreamKVNoSubjectDeleteMarkerOnPurgeMarker(t *testing.T) { + for _, storage := range []jetstream.StorageType{jetstream.FileStorage, jetstream.MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnectNewAPI(t, s) + defer nc.Close() + + ctx := context.Background() + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "bucket", + History: 1, + Storage: storage, + TTL: 2 * time.Second, + LimitMarkerTTL: time.Minute, + }) + require_NoError(t, err) + + stream, err := js.Stream(ctx, "KV_bucket") + require_NoError(t, err) + + // Purge such that the bucket TTL expires this message. + require_NoError(t, kv.Purge(ctx, "key")) + rsm, err := stream.GetMsg(ctx, 1) + require_NoError(t, err) + require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE") + + // The bucket TTL should have removed the message by now. + time.Sleep(2500 * time.Millisecond) + + // Confirm the purge marker is gone. + _, err = stream.GetMsg(ctx, 1) + require_Error(t, err, jetstream.ErrMsgNotFound) + require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE") + + // Confirm we don't get a redundant subject delete marker. + _, err = stream.GetMsg(ctx, 2) + require_Error(t, err, jetstream.ErrMsgNotFound) + + // Purge with a TTL so it expires this message. + require_NoError(t, kv.Purge(ctx, "key", jetstream.PurgeTTL(time.Second))) + rsm, err = stream.GetMsg(ctx, 2) + require_NoError(t, err) + require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE") + + // The purge TTL should have removed the message by now. + time.Sleep(1500 * time.Millisecond) + + // Confirm the purge marker is gone. + _, err = stream.GetMsg(ctx, 2) + require_Error(t, err, jetstream.ErrMsgNotFound) + + // Confirm we don't get a redundant subject delete marker. + _, err = stream.GetMsg(ctx, 3) + require_Error(t, err, jetstream.ErrMsgNotFound) + }) + } +} diff --git a/server/memstore.go b/server/memstore.go index ad3a7aa5b2e..05f431be4be 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1075,7 +1075,7 @@ func (ms *memStore) expireMsgs() { } if sdmEnabled { if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok { - sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0 + sdm := last && isSubjectDeleteMarker(sm.hdr) ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL) } } else { @@ -1105,7 +1105,7 @@ func (ms *memStore) expireMsgs() { if ttlSdm == nil { ttlSdm = make(map[string][]SDMBySubj, 1) } - ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0}) + ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)}) return false } } else { diff --git a/server/sdm.go b/server/sdm.go index 7431479580b..88a1be4e494 100644 --- a/server/sdm.go +++ b/server/sdm.go @@ -13,7 +13,10 @@ package server -import "time" +import ( + "bytes" + "time" +) // SDMMeta holds pending/proposed data for subject delete markers or message removals. type SDMMeta struct { @@ -40,6 +43,12 @@ func newSDMMeta() *SDMMeta { } } +// isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker. +// Either it's a usual marker with JSMarkerReason, or it's a KV Purge marker as the KVOperation. +func isSubjectDeleteMarker(hdr []byte) bool { + return len(sliceHeader(JSMarkerReason, hdr)) == 0 && !bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge) +} + // empty clears all data. func (sdm *SDMMeta) empty() { if sdm == nil { diff --git a/server/stream.go b/server/stream.go index 2759f544603..f0ca46f5517 100644 --- a/server/stream.go +++ b/server/stream.go @@ -468,6 +468,12 @@ const ( JSBatchCommit = "Nats-Batch-Commit" ) +// Headers for published KV messages. +var ( + KVOperation = "KV-Operation" + KVOperationValuePurge = []byte("PURGE") +) + // Headers for republished messages and direct gets. const ( JSStream = "Nats-Stream"