Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,35 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
if !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", streamName, err)
}
// get the retention policy from the stream config
retention := nats.RetentionPolicy(v.GetInt("stream.retention"))
discard := nats.DiscardNew

// Based on the retention policy we use the following discard policy
// 1) Limits Policy -> DiscardOld
// 2) WorkQueuePolicy/Interest -> DiscardNew

// In WorkQueuePolicy the messages will be removed as soon as the Consumer received an Acknowledgement.
// In InterestPolicy messages will be removed as soon as all Consumers of the stream for that subject have
// received an Acknowledgement for the message.
// For Numaflow, workqueue and interest is the same, because we only have one consumer
// Old messages should be deleted once, they are acknowledged, hence we use DiscardNew with these two
// policies in which during a buffer full we will not write more message to the stream and wait
// for the older messages to get cleared

// When operating with DiscardNew and Limits, on reaching the maxMsgs limit, it will result in the stream
// returning an error when attempting to write new messages and old messages will not be deleted from the stream
// so the pipeline will get stuck. Hence, we cannot use Limits with DiscardNew.
//
if retention == nats.LimitsPolicy {
discard = nats.DiscardOld
}

if _, err := jss.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName}, // Use the stream name as the only subject
Retention: nats.RetentionPolicy(v.GetInt("stream.retention")),
Discard: nats.DiscardOld,
Retention: retention,
Discard: discard,
MaxMsgs: v.GetInt64("stream.maxMsgs"),
MaxAge: v.GetDuration("stream.maxAge"),
MaxBytes: v.GetInt64("stream.maxBytes"),
Expand Down