Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
Name: streamName,
Subjects: []string{streamName}, // Use the stream name as the only subject
Retention: nats.RetentionPolicy(v.GetInt("stream.retention")),
Discard: nats.DiscardOld,
Discard: nats.DiscardNew,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make it overridable by the user?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can set it when users specify "drop on full" ? It's pretty much the exact same behaviour.
Though according to @yhl25 DiscardNew can't be used, but I'm still trying to understand why in the other issue.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to figure out why it won't work, ideally it should.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to make it configurable to the user, but let's set it to DiscardOld by default since we use the Limits Policy by default. Also, please add a comment saying DiscardNew can only be used with WorkQueue Policy.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we cannot use DiscardNew with Limits policy. we should not let the pipeline even start and validation should fail.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I will try to implement that, and try to reproduce the issue @yhl25 was mentioning with stuck messages.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should also document the risk of data loss on surge when using DiscardOld on high throughput and be really transparent with why it happens. Right now, users like me who fiddle with the config might be in big trouble if some UDF/Sink create a silent data loss scenario on production.

MaxMsgs: v.GetInt64("stream.maxMsgs"),
MaxAge: v.GetDuration("stream.maxAge"),
MaxBytes: v.GetInt64("stream.maxBytes"),
Expand Down