diff --git a/pkg/isbsvc/jetstream_service.go b/pkg/isbsvc/jetstream_service.go index c1f1bd489e..867d8a655e 100644 --- a/pkg/isbsvc/jetstream_service.go +++ b/pkg/isbsvc/jetstream_service.go @@ -120,11 +120,35 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b if !errors.Is(err, nats.ErrStreamNotFound) { return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", streamName, err) } + // get the retention policy from the stream config + retention := nats.RetentionPolicy(v.GetInt("stream.retention")) + discard := nats.DiscardNew + + // Based on the retention policy we use the following discard policy + // 1) Limits Policy -> DiscardOld + // 2) WorkQueuePolicy/Interest -> DiscardNew + + // In WorkQueuePolicy the messages will be removed as soon as the Consumer received an Acknowledgement. + // In InterestPolicy messages will be removed as soon as all Consumers of the stream for that subject have + // received an Acknowledgement for the message. + // For Numaflow, workqueue and interest is the same, because we only have one consumer + // Old messages should be deleted once, they are acknowledged, hence we use DiscardNew with these two + // policies in which during a buffer full we will not write more message to the stream and wait + // for the older messages to get cleared + + // When operating with DiscardNew and Limits, on reaching the maxMsgs limit, it will result in the stream + // returning an error when attempting to write new messages and old messages will not be deleted from the stream + // so the pipeline will get stuck. Hence, we cannot use Limits with DiscardNew. + // + if retention == nats.LimitsPolicy { + discard = nats.DiscardOld + } + if _, err := jss.js.AddStream(&nats.StreamConfig{ Name: streamName, Subjects: []string{streamName}, // Use the stream name as the only subject - Retention: nats.RetentionPolicy(v.GetInt("stream.retention")), - Discard: nats.DiscardOld, + Retention: retention, + Discard: discard, MaxMsgs: v.GetInt64("stream.maxMsgs"), MaxAge: v.GetDuration("stream.maxAge"), MaxBytes: v.GetInt64("stream.maxBytes"),