Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5652,7 +5652,7 @@ func (fs *fileStore) expireMsgs() {
// if it was the last message of that particular subject that we just deleted.
if sdmEnabled {
if last, ok := fs.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
sdm := last && isSubjectDeleteMarker(sm.hdr)
fs.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
Expand Down Expand Up @@ -5685,7 +5685,7 @@ func (fs *fileStore) expireMsgs() {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
} else {
// Collect sequences to remove. Don't remove messages inline here,
// as that releases the lock and THW is not thread-safe.
Expand Down
61 changes: 61 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats-server/v2/server/sysmem"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
)
Expand Down Expand Up @@ -21122,3 +21123,63 @@ func TestJetStreamGetNoHeaders(t *testing.T) {
require_Equal(t, headers.Get("Nats-Time-Stamp"), _EMPTY_)
})
}

func TestJetStreamKVNoSubjectDeleteMarkerOnPurgeMarker(t *testing.T) {
for _, storage := range []jetstream.StorageType{jetstream.FileStorage, jetstream.MemoryStorage} {
t.Run(storage.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnectNewAPI(t, s)
defer nc.Close()

ctx := context.Background()
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "bucket",
History: 1,
Storage: storage,
TTL: 2 * time.Second,
LimitMarkerTTL: time.Minute,
})
require_NoError(t, err)

stream, err := js.Stream(ctx, "KV_bucket")
require_NoError(t, err)

// Purge such that the bucket TTL expires this message.
require_NoError(t, kv.Purge(ctx, "key"))
rsm, err := stream.GetMsg(ctx, 1)
require_NoError(t, err)
require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE")

// The bucket TTL should have removed the message by now.
time.Sleep(2500 * time.Millisecond)

// Confirm the purge marker is gone.
_, err = stream.GetMsg(ctx, 1)
require_Error(t, err, jetstream.ErrMsgNotFound)
require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE")

// Confirm we don't get a redundant subject delete marker.
_, err = stream.GetMsg(ctx, 2)
require_Error(t, err, jetstream.ErrMsgNotFound)

// Purge with a TTL so it expires this message.
require_NoError(t, kv.Purge(ctx, "key", jetstream.PurgeTTL(time.Second)))
rsm, err = stream.GetMsg(ctx, 2)
require_NoError(t, err)
require_Equal(t, rsm.Header.Get("KV-Operation"), "PURGE")

// The purge TTL should have removed the message by now.
time.Sleep(1500 * time.Millisecond)

// Confirm the purge marker is gone.
_, err = stream.GetMsg(ctx, 2)
require_Error(t, err, jetstream.ErrMsgNotFound)

// Confirm we don't get a redundant subject delete marker.
_, err = stream.GetMsg(ctx, 3)
require_Error(t, err, jetstream.ErrMsgNotFound)
})
}
}
4 changes: 2 additions & 2 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ func (ms *memStore) expireMsgs() {
}
if sdmEnabled {
if last, ok := ms.shouldProcessSdm(seq, sm.subj); ok {
sdm := last && len(getHeader(JSMarkerReason, sm.hdr)) == 0
sdm := last && isSubjectDeleteMarker(sm.hdr)
ms.handleRemovalOrSdm(seq, sm.subj, sdm, sdmTTL)
}
} else {
Expand Down Expand Up @@ -1105,7 +1105,7 @@ func (ms *memStore) expireMsgs() {
if ttlSdm == nil {
ttlSdm = make(map[string][]SDMBySubj, 1)
}
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, len(getHeader(JSMarkerReason, sm.hdr)) != 0})
ttlSdm[sm.subj] = append(ttlSdm[sm.subj], SDMBySubj{seq, !isSubjectDeleteMarker(sm.hdr)})
return false
}
} else {
Expand Down
11 changes: 10 additions & 1 deletion server/sdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package server

import "time"
import (
"bytes"
"time"
)

// SDMMeta holds pending/proposed data for subject delete markers or message removals.
type SDMMeta struct {
Expand All @@ -40,6 +43,12 @@ func newSDMMeta() *SDMMeta {
}
}

// isSubjectDeleteMarker returns whether the headers indicate this message is a subject delete marker.
// Either it's a usual marker with JSMarkerReason, or it's a KV Purge marker as the KVOperation.
func isSubjectDeleteMarker(hdr []byte) bool {
return len(sliceHeader(JSMarkerReason, hdr)) == 0 && !bytes.Equal(sliceHeader(KVOperation, hdr), KVOperationValuePurge)
}

// empty clears all data.
func (sdm *SDMMeta) empty() {
if sdm == nil {
Expand Down
6 changes: 6 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ const (
JSBatchCommit = "Nats-Batch-Commit"
)

// Headers for published KV messages.
var (
KVOperation = "KV-Operation"
KVOperationValuePurge = []byte("PURGE")
)

// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
Expand Down