Closed
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
@@ -18651,7 +18651,7 @@
"type": "boolean"
},
"bufferConfig": {
"description": "Optional configuration for the streams, consumers and buckets to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config. It accepts a YAML format configuration, it may include 4 sections, \"stream\", \"consumer\", \"otBucket\" and \"procBucket\". Available fields under \"stream\" include \"retention\" (e.g. interest, limits, workerQueue), \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m). Available fields under \"consumer\" include \"ackWait\" (e.g. 60s) Available fields under \"otBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5). Available fields under \"procBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5).",
"description": "Optional configuration for the streams, consumers and buckets to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config. It accepts a YAML format configuration, it may include 4 sections, \"stream\", \"consumer\", \"otBucket\" and \"procBucket\". Available fields under \"stream\" include \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m). Available fields under \"consumer\" include \"ackWait\" (e.g. 60s) Available fields under \"otBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5). Available fields under \"procBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5).",
"type": "string"
},
"containerTemplate": {
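Per the updated description, a user-supplied `bufferConfig` no longer carries a stream-level `retention` field. A sketch of what an override might look like after this change, using only the fields the description lists (values are illustrative, not recommendations):

```yaml
bufferConfig: |
  stream:
    maxMsgs: 100000
    maxAge: 72h
    replicas: 3
    duplicates: 5m
  consumer:
    ackWait: 60s
  otBucket:
    ttl: 72h
    replicas: 3
  procBucket:
    ttl: 72h
    replicas: 3
```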
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
@@ -18647,7 +18647,7 @@
"type": "boolean"
},
"bufferConfig": {
"description": "Optional configuration for the streams, consumers and buckets to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config. It accepts a YAML format configuration, it may include 4 sections, \"stream\", \"consumer\", \"otBucket\" and \"procBucket\". Available fields under \"stream\" include \"retention\" (e.g. interest, limits, workerQueue), \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m). Available fields under \"consumer\" include \"ackWait\" (e.g. 60s) Available fields under \"otBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5). Available fields under \"procBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5).",
"description": "Optional configuration for the streams, consumers and buckets to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config. It accepts a YAML format configuration, it may include 4 sections, \"stream\", \"consumer\", \"otBucket\" and \"procBucket\". Available fields under \"stream\" include \"maxMsgs\", \"maxAge\" (e.g. 72h), \"replicas\" (1, 3, 5), \"duplicates\" (e.g. 5m). Available fields under \"consumer\" include \"ackWait\" (e.g. 60s) Available fields under \"otBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5). Available fields under \"procBucket\" include \"maxValueSize\", \"history\", \"ttl\" (e.g. 72h), \"maxBytes\", \"replicas\" (1, 3, 5).",
"type": "string"
},
"containerTemplate": {
2 changes: 0 additions & 2 deletions config/advanced-install/namespaced-controller-wo-crds.yaml
@@ -189,8 +189,6 @@ data:
bufferConfig: |
# The default properties of the buffers (streams) to be created in this JetStream service
stream:
- # 0: Limits, 1: Interest, 2: WorkQueue
- retention: 0
maxMsgs: 100000
maxAge: 72h
maxBytes: -1
@@ -60,8 +60,6 @@ data:
bufferConfig: |
# The default properties of the buffers (streams) to be created in this JetStream service
stream:
- # 0: Limits, 1: Interest, 2: WorkQueue
- retention: 0
maxMsgs: 100000
maxAge: 72h
maxBytes: -1
2 changes: 0 additions & 2 deletions config/install.yaml
@@ -18320,8 +18320,6 @@ data:
bufferConfig: |
# The default properties of the buffers (streams) to be created in this JetStream service
stream:
- # 0: Limits, 1: Interest, 2: WorkQueue
- retention: 0
maxMsgs: 100000
maxAge: 72h
maxBytes: -1
2 changes: 0 additions & 2 deletions config/namespace-install.yaml
@@ -18223,8 +18223,6 @@ data:
bufferConfig: |
# The default properties of the buffers (streams) to be created in this JetStream service
stream:
- # 0: Limits, 1: Interest, 2: WorkQueue
- retention: 0
maxMsgs: 100000
maxAge: 72h
maxBytes: -1
13 changes: 6 additions & 7 deletions docs/APIs.md
@@ -4337,13 +4337,12 @@
created in this JetStream service, if specified, it will be merged with
the default configuration in numaflow-controller-config. It accepts a
YAML format configuration, it may include 4 sections, “stream”,
“consumer”, “otBucket” and “procBucket”. Available fields under “stream”
- include “retention” (e.g. interest, limits, workerQueue), “maxMsgs”,
- “maxAge” (e.g. 72h), “replicas” (1, 3, 5), “duplicates” (e.g. 5m).
- Available fields under “consumer” include “ackWait” (e.g. 60s) Available
- fields under “otBucket” include “maxValueSize”, “history”, “ttl”
- (e.g. 72h), “maxBytes”, “replicas” (1, 3, 5). Available fields under
- “procBucket” include “maxValueSize”, “history”, “ttl” (e.g. 72h),
- “maxBytes”, “replicas” (1, 3, 5).
+ include “maxMsgs”, “maxAge” (e.g. 72h), “replicas” (1, 3, 5),
+ “duplicates” (e.g. 5m). Available fields under “consumer” include
+ “ackWait” (e.g. 60s) Available fields under “otBucket” include
+ “maxValueSize”, “history”, “ttl” (e.g. 72h), “maxBytes”, “replicas” (1,
+ 3, 5). Available fields under “procBucket” include “maxValueSize”,
+ “history”, “ttl” (e.g. 72h), “maxBytes”, “replicas” (1, 3, 5).
</p>

</td>
2 changes: 0 additions & 2 deletions docs/core-concepts/inter-step-buffer-service.md
@@ -124,8 +124,6 @@ Both these 2 places expect a YAML format configuration like below:
bufferConfig: |
# The properties of the buffers (streams) to be created in this JetStream service
stream:
- # 0: Limits, 1: Interest, 2: WorkQueue
- retention: 1
maxMsgs: 30000
maxAge: 168h
maxBytes: -1
2 changes: 0 additions & 2 deletions docs/operations/numaflow-controller-config.yaml
@@ -56,8 +56,6 @@ data:
bufferConfig: |
# The default properties of the buffers (streams) to be created in this JetStream service
stream:
- # 0: Limits, 1: Interest, 2: WorkQueue
- retention: 0
maxMsgs: 2000000
maxAge: 168h
maxBytes: -1
@@ -22,4 +22,4 @@ spec:

It's not recommended to use values over `8388608` (8MB) but `max_payload` can be set up to `67108864` (64MB).

- Please be aware that if you increase the max message size of the `InterStepBufferService`, you probably will also need to change some other limits. For example, if the size of each messages is as large as 8MB, then 100 messages flowing in the pipeline will make each of the Inter-Step Buffer need at least 800MB of disk space to store the messages, and the memory consumption will also be high, that will probably cause the Inter-Step Buffer Service to crash. In that case, you might need to update the retention policy in the Inter-Step Buffer Service to make sure the messages are not stored for too long. Check out the [Inter-Step Buffer Service](../../../core-concepts/inter-step-buffer-service.md#buffer-configuration) for more details.
+ Please be aware that if you increase the max message size of the `InterStepBufferService`, you will probably also need to adjust some other limits. For example, if each message is as large as 8MB, then 100 messages flowing through the pipeline will require at least 800MB of storage in each Inter-Step Buffer, and the memory overhead will also be high, which could cause the Inter-Step Buffer Service to crash. Check out the [Inter-Step Buffer Service](../../../core-concepts/inter-step-buffer-service.md#buffer-configuration) for more details.
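The sizing caution above is straightforward arithmetic. A minimal sketch of the estimate (function name and numbers are illustrative, not part of numaflow):

```go
package main

import "fmt"

// minBufferBytes gives a rough lower bound on the storage an Inter-Step
// Buffer needs: the number of in-flight messages times the max message size.
func minBufferBytes(msgCount, maxMsgBytes int64) int64 {
	return msgCount * maxMsgBytes
}

func main() {
	const maxPayload = 8388608 // 8MB, the recommended ceiling mentioned above
	fmt.Printf("%d MB\n", minBufferBytes(100, maxPayload)/(1024*1024))
}
```

With 100 messages of 8MB this prints `800 MB`, matching the estimate in the text; real usage will be higher once replicas and metadata overhead are counted.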
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto
(generated file, diff not rendered)

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
@@ -58,7 +58,7 @@ type JetStreamBufferService struct {
StartArgs []string `json:"startArgs,omitempty" protobuf:"bytes,9,rep,name=startArgs"`
// Optional configuration for the streams, consumers and buckets to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config.
// It accepts a YAML format configuration, it may include 4 sections, "stream", "consumer", "otBucket" and "procBucket".
- // Available fields under "stream" include "retention" (e.g. interest, limits, workerQueue), "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g. 5m).
+ // Available fields under "stream" include "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g. 5m).
// Available fields under "consumer" include "ackWait" (e.g. 60s)
// Available fields under "otBucket" include "maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes", "replicas" (1, 3, 5).
// Available fields under "procBucket" include "maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes", "replicas" (1, 3, 5).
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go
(generated file, diff not rendered)

2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/metrics.go
@@ -56,7 +56,7 @@ var isbSoftUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Help: "Percentage of buffer soft usage",
}, []string{"buffer"})

- // isbSolidUsage is used to indicate of buffer that is used up, it is calculated based on the messages remain in the stream (if it's not Limits retention policy)
+ // isbSolidUsage is used to indicate buffer usage, it is calculated based on the messages remaining in the stream.
var isbSolidUsage = promauto.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "isb_jetstream",
Name: "buffer_solid_usage",
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/reader_test.go
@@ -187,7 +187,7 @@ func addStream(t *testing.T, js nats.JetStreamContext, streamName string) {
Name: streamName,
Subjects: []string{streamName},
Retention: nats.WorkQueuePolicy,
- Discard: nats.DiscardOld,
+ Discard: nats.DiscardNew,
MaxMsgs: 100, //
Storage: nats.FileStorage,
Duplicates: 2 * 60 * time.Second,
6 changes: 1 addition & 5 deletions pkg/isb/stores/jetstream/writer.go
@@ -101,11 +101,7 @@ func (jw *jetStreamWriter) runStatusChecker(ctx context.Context) {
}
var solidUsage, softUsage float64
softUsage = (float64(c.NumPending) + float64(c.NumAckPending)) / float64(jw.opts.maxLength)
- if s.Config.Retention == nats.LimitsPolicy {
- solidUsage = softUsage
- } else {
- solidUsage = float64(s.State.Msgs) / float64(jw.opts.maxLength)
- }
+ solidUsage = float64(s.State.Msgs) / float64(jw.opts.maxLength)
// TODO: solid usage calculation might be incorrect due to incorrect JetStream metadata issue caused by pod migration, need to revisit this later.
// We should set up alerts with sort of rules to detect the metadata issue. For example, solidUsage is too much greater than softUsage for a while.
if solidUsage >= jw.opts.bufferUsageLimit && softUsage >= jw.opts.bufferUsageLimit {
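The writer.go change above removes the retention-policy branch: solid usage is now always derived from the messages still in the stream. A minimal sketch of the arithmetic (names are illustrative, not the actual numaflow types):

```go
package main

import "fmt"

// bufferUsage mirrors the simplified runStatusChecker calculation: soft usage
// counts the consumer's pending plus ack-pending messages, while solid usage
// counts the messages still held in the stream; both are fractions of maxLength.
func bufferUsage(numPending, numAckPending, streamMsgs uint64, maxLength int64) (solid, soft float64) {
	soft = (float64(numPending) + float64(numAckPending)) / float64(maxLength)
	solid = float64(streamMsgs) / float64(maxLength)
	return solid, soft
}

func main() {
	solid, soft := bufferUsage(20000, 5000, 30000, 100000)
	fmt.Println(solid, soft)
}
```

With WorkQueue retention, acked messages leave the stream, so solid and soft usage should normally track each other; a persistent gap between them hints at the stale-metadata issue the TODO in the diff mentions.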
7 changes: 2 additions & 5 deletions pkg/isbsvc/jetstream_service.go
@@ -123,8 +123,8 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
if _, err := jss.js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName}, // Use the stream name as the only subject
- Retention: nats.RetentionPolicy(v.GetInt("stream.retention")),
- Discard: nats.DiscardOld,
+ Retention: nats.WorkQueuePolicy,
+ Discard: nats.DiscardNew,
Review discussion on the Retention/Discard change:

Member: can we make it overridable by the user?

Contributor (author): Maybe we can set it when users specify "drop on full"? It's pretty much the exact same behaviour. Though according to @yhl25, DiscardNew can't be used, but I'm still trying to understand why in the other issue.

Member: we need to figure out why it won't work; ideally it should.

Contributor: +1 to making it configurable by the user, but let's set it to DiscardOld by default, since we use the Limits policy by default. Also, please add a comment saying DiscardNew can only be used with the WorkQueue policy.

Member: since we cannot use DiscardNew with the Limits policy, we should not let the pipeline even start; validation should fail.

Contributor (author): Indeed, I will try to implement that, and try to reproduce the issue @yhl25 was mentioning with stuck messages.

Contributor (author): Maybe we should also document the risk of data loss on surge when using DiscardOld at high throughput, and be really transparent about why it happens. Right now, users like me who fiddle with the config might be in big trouble if some UDF/Sink creates a silent data loss scenario in production.

MaxMsgs: v.GetInt64("stream.maxMsgs"),
MaxAge: v.GetDuration("stream.maxAge"),
MaxBytes: v.GetInt64("stream.maxBytes"),
@@ -285,9 +285,6 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf
return nil, fmt.Errorf("failed to get consumer information of stream %q", streamName)
}
totalMessages := int64(stream.State.Msgs)
- if stream.Config.Retention == nats.LimitsPolicy {
- totalMessages = int64(consumer.NumPending) + int64(consumer.NumAckPending)
- }
bufferInfo := &BufferInfo{
Name: buffer,
PendingCount: int64(consumer.NumPending),
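The GetBufferInfo change above drops the LimitsPolicy special case: with WorkQueue retention, acked messages are removed from the stream, so the stream's message count is already the total. A sketch of the simplified accounting (struct and function names are illustrative, not the real numaflow code):

```go
package main

import "fmt"

// bufferInfo holds the counters GetBufferInfo reports; fields are illustrative.
type bufferInfo struct {
	Name          string
	PendingCount  int64
	TotalMessages int64
}

// makeBufferInfo mirrors the simplified accounting: under WorkQueue retention
// the stream only holds unconsumed messages, so its message count is used
// directly; the old LimitsPolicy branch (pending + ackPending) is gone.
func makeBufferInfo(name string, streamMsgs, numPending uint64) bufferInfo {
	return bufferInfo{
		Name:          name,
		PendingCount:  int64(numPending),
		TotalMessages: int64(streamMsgs),
	}
}

func main() {
	fmt.Printf("%+v\n", makeBufferInfo("my-buffer", 1200, 1000))
}
```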
2 changes: 1 addition & 1 deletion ui/src/components/pages/Namespace/index.test.tsx
@@ -197,7 +197,7 @@ const mockISBRawData = {
},
},
streamConfig:
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n retention: 0\n storage: 0\n",
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n storage: 0\n",
},
},
type: "jetstream",
@@ -159,7 +159,7 @@ const mockData = {
},
},
streamConfig:
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n retention: 0\n storage: 0\n",
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n storage: 0\n",
},
},
type: "jetstream",
@@ -213,7 +213,7 @@ const mockISBRawData = {
},
},
streamConfig:
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n retention: 0\n storage: 0\n",
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n storage: 0\n",
},
},
type: "jetstream",
@@ -516,7 +516,7 @@ const mockISBData = {
},
},
streamConfig:
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n retention: 0\n storage: 0\n",
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n storage: 0\n",
},
},
type: "jetstream",
@@ -151,7 +151,7 @@ const mockData = {
},
},
streamConfig:
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n retention: 0\n storage: 0\n",
"consumer:\n ackwait: 60s\n maxackpending: 25000\notbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 3h\nprocbucket:\n history: 1\n maxbytes: 0\n maxvaluesize: 0\n replicas: 3\n storage: 0\n ttl: 72h\nstream:\n duplicates: 60s\n maxage: 72h\n maxbytes: -1\n maxmsgs: 100000\n replicas: 3\n storage: 0\n",
},
},
type: "jetstream",