Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,23 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
if !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", streamName, err)
}
// get the retention policy from the stream config
retention := nats.RetentionPolicy(v.GetInt("stream.retention"))
// default discard policy is set to DiscardOld
discard := nats.DiscardOld

// Based on the retention policy we use the following discard policy
// 1) Limits/Interest Policy -> DiscardOld
// 2) WorkQueuePolicy -> DiscardNew
if retention == nats.WorkQueuePolicy {
discard = nats.DiscardNew
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about Interest? Why should it be DiscardOld? please document

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead as it was default, but I realised it didn't seem to be correct.
We should use it with DiscardNew instead
I have added few lines to document, please take a look @vigith


if _, err := jss.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName}, // Use the stream name as the only subject
Retention: nats.RetentionPolicy(v.GetInt("stream.retention")),
Discard: nats.DiscardOld,
Retention: retention,
Discard: discard,
MaxMsgs: v.GetInt64("stream.maxMsgs"),
MaxAge: v.GetDuration("stream.maxAge"),
MaxBytes: v.GetInt64("stream.maxBytes"),
Expand Down