refactor: impl ack and migrate to durable consumer for Nats #18873
Conversation
Signed-off-by: tabVersion <[email protected]>
After browsing the doc about the model, I don't get why we need to ack for NATS JetStream.
https://docs.nats.io/using-nats/developer/develop_jetstream/consumers#delivery-reliability
The "AckPolicy" is defined by the Consumer, not the Stream, and it means "application level acknowledgements", e.g. a failure in business logic.
We can just always use `none`, and there is no need to ack.
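The point that AckPolicy belongs to the consumer rather than the stream can be modeled roughly as below. This is a std-only illustrative sketch with made-up names, not the actual async_nats types:

```rust
// Illustrative model: a stream holds messages; each consumer attached to
// the stream carries its own AckPolicy (the stream itself has none).
#[derive(Clone, Copy, PartialEq, Debug)]
enum AckPolicy {
    None,     // broker never expects acks from this consumer
    Explicit, // every message must be acked individually
    All,      // acking seq N cumulatively acks everything up to N
}

struct Stream {
    name: String,
    // Note: no ack policy field here.
}

struct Consumer {
    durable_name: String,
    ack_policy: AckPolicy,
}

fn main() {
    let stream = Stream { name: "test_stream_1".into() };
    // Two consumers of the same stream may use different policies.
    let c1 = Consumer { durable_name: "job_a".into(), ack_policy: AckPolicy::None };
    let c2 = Consumer { durable_name: "job_b".into(), ack_policy: AckPolicy::All };
    assert_ne!(c1.ack_policy, c2.ack_policy);
    println!("stream {} has consumers with independent ack policies", stream.name);
}
```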
Thanks for your fast review. The PR is part of the Nats JetStream refactor; let me address your concern in a tracking issue.
```
@@ -30,9 +36,12 @@ impl From<NatsMessage> for SourceMessage {
    key: None,
    payload: Some(message.payload),
    // For nats jetstream, use sequence id as offset
    // DEPRECATED: no longer use sequence id as offset, let nats broker handle failover
```
👍
```rust
match ack_policy {
    JetStreamAckPolicy::None => (),
    JetStreamAckPolicy::Explicit => {
        for reply_subject in reply_subjects {
            ack(context, reply_subject).await;
        }
    }
    JetStreamAckPolicy::All => {
        if let Some(reply_subject) = reply_subjects.last() {
            ack(context, reply_subject.clone()).await;
        }
    }
}
```
Just curious: the subjects like "$JS.ACK.test_stream_1.l2vxD20k.1.3.4.1728547619594368340.0" already contain the offset information, right?
I want to make sure the logic is correct in this situation:
Nats sends messages m1, m2, m3, m4.
RW gets messages m1, m2, (checkpoint 1), m3, m4.
When we do checkpoint 1 and send a batch ack, that means we ack m2's subject. On the Nats side, it will only ack m1 and m2, while m3 and m4 stay un-acked, right?
> Just curious, the subjects like "$JS.ACK.test_stream_1.l2vxD20k.1.3.4.1728547619594368340.0" already contain the offset information, right?

yes

> Want to make the logic correct in this situation:
> Nats send message: m1,m2,m3,m4
> RW get message m1, m2, (checkpoint 1), m3, m4
> When we do checkpoint 1 and send batch ack, which means ack m2 subjects. In nats part, it will only ack the m1, m2, and m3,m4 not ack, right?
If ack_policy is ack_all (batch ack), then at ckpt_1 we just ack msg2, which acks both msg1 and msg2. Msg3 and msg4 are not ack-ed.
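The scenario above can be checked with a small simulation of cumulative (AckAll) semantics. This is an illustrative model of the broker's pending-ack bookkeeping, not its actual implementation:

```rust
use std::collections::BTreeSet;

// Simulate a broker tracking pending (un-acked) sequence numbers.
struct PendingAcks {
    pending: BTreeSet<u64>,
}

impl PendingAcks {
    fn new(seqs: impl IntoIterator<Item = u64>) -> Self {
        Self { pending: seqs.into_iter().collect() }
    }

    // AckAll: acking sequence `n` cumulatively acks every message <= n.
    fn ack_all(&mut self, n: u64) {
        self.pending.retain(|&s| s > n);
    }
}

fn main() {
    // Nats delivered m1..m4; RW checkpoints after m2 and acks m2's subject.
    let mut broker = PendingAcks::new(1..=4);
    broker.ack_all(2);
    // m1 and m2 are acked; m3 and m4 remain pending, as described above.
    assert_eq!(broker.pending.iter().copied().collect::<Vec<_>>(), vec![3, 4]);
    println!("pending after ckpt_1: {:?}", broker.pending);
}
```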
lgtm
| GitGuardian id | GitGuardian status | Secret | Commit | Filename |
|---|---|---|---|---|
| 9425213 | Triggered | Generic Password | a6ffcd6 | e2e_test/source/tvf/postgres_query.slt |
| 9425213 | Triggered | Generic Password | 02f28b8 | ci/scripts/e2e-source-test.sh |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act: you might completely break other contributing developers' workflows, and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following best practices for managing and storing secrets, including API keys and other credentials;
- installing secret detection on pre-commit to catch secrets before they leave your machine and ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
```rust
//
// DEPRECATED: no longer use sequence id as offset, let nats broker handle failover
// use reply_subject as offset for ack use, we just check the persisted state for whether this is the first run
offset: message.reply_subject.unwrap_or_default(),
```
@yufansong Unfortunately, the `offset` column should be kept as an additional column, and I think it is not worth making an exception for Nats. We use `offset` here to store the reply subject.
Get it. SGTM
```rust
// We have record on this Nats Split, contains the last seen offset (seq id) or reply subject
// We do not use the seq id as start position anymore,
// but just let the reader load from durable consumer on broker.
```
I understand these comments, but you didn't make any change to the implementation code (L58 ~ L72). Why and how does it work?
We use the `get_or_create_consumer` API when building the stream consumer (api ref). If we utilize an existing durable consumer, the provided config should be the same as when the consumer was created. So the config here should never change; it always aligns with the one derived from the WITH clause.
Following changes in #18895.
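The "config must never change" contract can be sketched as a simulation. This is not the real async_nats API, just a std-only model of the get-or-create semantics described above; all names are illustrative:

```rust
use std::collections::HashMap;

#[derive(Clone, PartialEq, Debug)]
struct ConsumerConfig {
    durable_name: String,
    ack_policy: String, // simplified: "none" | "explicit" | "all"
}

struct Broker {
    consumers: HashMap<String, ConsumerConfig>,
}

impl Broker {
    // Mirrors the contract: create the durable consumer if absent; otherwise
    // the provided config must match the one it was created with.
    fn get_or_create(&mut self, cfg: ConsumerConfig) -> Result<ConsumerConfig, String> {
        if let Some(existing) = self.consumers.get(&cfg.durable_name) {
            if *existing == cfg {
                Ok(cfg) // recovery: same config, reuse durable consumer
            } else {
                Err(format!("config mismatch for durable consumer {}", cfg.durable_name))
            }
        } else {
            self.consumers.insert(cfg.durable_name.clone(), cfg.clone());
            Ok(cfg) // first run: created
        }
    }
}

fn main() {
    let mut broker = Broker { consumers: HashMap::new() };
    let cfg = ConsumerConfig { durable_name: "rw_nats_src".into(), ack_policy: "all".into() };
    assert!(broker.get_or_create(cfg.clone()).is_ok()); // first run
    assert!(broker.get_or_create(cfg.clone()).is_ok()); // recovery with same config
    // Changing the WITH clause (hence the config) violates the contract.
    let changed = ConsumerConfig { ack_policy: "none".into(), ..cfg };
    assert!(broker.get_or_create(changed).is_err());
}
```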
```
@@ -61,6 +62,8 @@ def_anyhow_newtype! {
    async_nats::jetstream::consumer::pull::MessagesError => "Nats error",
    async_nats::jetstream::context::CreateStreamError => "Nats error",
    async_nats::jetstream::stream::ConsumerError => "Nats error",
    NatsJetStreamError => "Nats error",
```
All these 5 errors are "Nats error". Consider including more context?
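One way to make the contexts distinct, sketched with a plain std `Display` impl (the actual code uses the `def_anyhow_newtype!` macro; the variant names and context strings here are illustrative):

```rust
use std::fmt;

// Illustrative: give each Nats error source a distinct context string
// instead of the generic "Nats error".
#[derive(Debug)]
enum NatsErrorKind {
    PullMessages,
    CreateStream,
    Consumer,
    JetStream,
}

impl fmt::Display for NatsErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ctx = match self {
            NatsErrorKind::PullMessages => "Nats error: failed to pull messages",
            NatsErrorKind::CreateStream => "Nats error: failed to create stream",
            NatsErrorKind::Consumer => "Nats error: consumer error",
            NatsErrorKind::JetStream => "Nats error: JetStream error",
        };
        f.write_str(ctx)
    }
}

fn main() {
    // Each kind now renders with its own context.
    assert_ne!(
        NatsErrorKind::PullMessages.to_string(),
        NatsErrorKind::Consumer.to_string()
    );
    println!("{}", NatsErrorKind::JetStream);
}
```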
LGTM
Since the main motivation is performance, should we first do a quick demo & benchmark to confirm this direction is correct?
Another thing: I still don't understand why we need to make the ack policy customizable. It should be decided by the application (i.e., us).
Don't get me wrong, the motivation for the refactor is making the Nats JetStream consumer parallelizable. Ack is unavoidable when syncing consumption progress among workers.
I mean exactly that: we can test whether "consumer group"-like usage brings perf improvements. Otherwise, perhaps something like manually sharding subjects into sub-subjects is inevitable, e.g., Chrisss93/flink-connector-nats says:
(I do not mean what they are doing is correct either.) Google Pub/Sub mentions explicitly that they are "per-message parallelism". But I feel lost when browsing NATS's docs about what the best practice is. Therefore, IMO testing is better.
On the other hand, cumulative ack (AckAll)'s overhead should be much smaller than individual acks.
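The overhead claim can be made concrete with a back-of-the-envelope model (not a benchmark): with Explicit acks, confirming a checkpoint window costs one ack round trip per message, while AckAll needs a single cumulative ack.

```rust
#[derive(Clone, Copy)]
enum AckPolicy {
    Explicit,
    All,
}

// Number of ack round trips needed to confirm `n` messages at one checkpoint.
fn acks_per_checkpoint(policy: AckPolicy, n: u64) -> u64 {
    match policy {
        AckPolicy::Explicit => n,   // one ack per message
        AckPolicy::All => n.min(1), // one cumulative ack (zero if nothing to ack)
    }
}

fn main() {
    assert_eq!(acks_per_checkpoint(AckPolicy::Explicit, 10_000), 10_000);
    assert_eq!(acks_per_checkpoint(AckPolicy::All, 10_000), 1);
    assert_eq!(acks_per_checkpoint(AckPolicy::All, 0), 0);
}
```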
(risingwave/src/connector/src/source/nats/mod.rs, lines 137 to 139, at b6c4ca8)
we are offering the flexibility to users.
I asked this exact question above:
I don't get what benefits this flexibility brings to users.
Agree. IIUC, changing "ack all" + "sequence number as offset" to per-message ack is a regression in terms of the "exactly-once" semantics. It's reasonable to prove there'll be a performance benefit. And if we make it customizable, when the user specifies …
Previously we were not doing "ack all", but no ack at all (regardless of ack policy). And regarding exactly-once and performance, #18876 has more discussion.
Previously, we used no ack to keep the exactly-once semantics, but that has performance problems.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
As title, do ack on messages when a barrier completes.
Checklist
- `./risedev check` (or alias, `./risedev c`)

Documentation

Release note
Add a required param for the Nats connector, `consumer.durable_name`; we will either build a new durable consumer or continue reading from an existing one. Note: we don't encourage reusing the `consumer.durable_name` across streaming jobs, as it will cause data loss.

After this PR, RW no longer keeps the offset of Nats; the broker is responsible for offset management. And we have to accept a regression in semantics, from exactly-once to at-least-once. If users care about data loss, they should set `consumer.ack_policy` to `all` or `explicit`.

`get_or_create_consumer` requires that the provided config remain the same as when the consumer was created. It does not encourage reading from a specific offset; if we really want to do that, we should build a new consumer.
requires the config provided should remain the same as it was created. It does not encourage reading from a spec offset. If we really want to do it, we should build a new one.