-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[refactor][function][PIP-166] Function add MANUAL delivery semantics #16279
Conversation
8e23f7e
to
3f4dcbb
Compare
@asafm @nlu90 @freeznet @eolivelli @codelipenghui Please help to review this, I am coding on |
Modifications of the |
...ctions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
Outdated
Show resolved
Hide resolved
...ons/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
Outdated
Show resolved
Hide resolved
...unctions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarFunctionRecord.java
Outdated
Show resolved
Hide resolved
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
Outdated
Show resolved
Hide resolved
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
Outdated
Show resolved
Hide resolved
...ions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarFunctionRecordTest.java
Outdated
Show resolved
Hide resolved
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
Outdated
Show resolved
Hide resolved
...functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
Outdated
Show resolved
Hide resolved
pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
Outdated
Show resolved
Hide resolved
/pulsarbot run-failure-checks |
assert.Panics(t, func() { | ||
newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 0, AutoACK: false}) | ||
}, "Should have a panic") | ||
assert.Panics(t, func() { | ||
newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 1, AutoACK: false}) | ||
}, "Should have a panic") | ||
assert.Panics(t, func() { | ||
newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 2}) | ||
}, "Should have a panic") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this will introduce break change? If users upgrade to the new version but with the old configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's more like a bug fix, In go
instance, it does not support exactly-once
. It was not checked at startup before, which will cause the function to not ack any message. python
instance has the same problem.
@nahguam Thanks for your review, all suggestions are fixed, please look again. Thanks. |
8c75395
to
5d6ce24
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, @shibd, LGTM.
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
Show resolved
Hide resolved
…tions/windowing/WindowFunctionExecutor.java Co-authored-by: Dave Maughan <[email protected]>
Co-authored-by: Dave Maughan <[email protected]>
…tions/source/PulsarFunctionRecord.java Co-authored-by: Dave Maughan <[email protected]>
…ns/utils/FunctionConfigUtils.java Co-authored-by: Dave Maughan <[email protected]>
Co-authored-by: Dave Maughan <[email protected]>
Co-authored-by: Dave Maughan <[email protected]>
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
Motivation
#15560
Modifications
autoAck
will be deprecated.MANUAL
processing guarantees.autoAck == false
andprocessingGuarantees == ATMOST_ONCE
verification.processingGuarantees
not support EFFECTIVELY_ONCE validate.Verifying this change
This change added tests and can be verified as follows:
FunctionConfigUtils
coverautoAck == fase
andATMOST_ONCE
scenarios.SinkConfigUtils
coverautoAck == fase
andATMOST_ONCE
scenarios.FunctionConfigUtils
cover WindowsFunctionConfig will set processingGuarantees.ContextImplTest
andPulsarFunctionRecordTest
cover whenprocessingGuarantees != MANUAL
, call record.ack() ignore.JavaInstanceRunnableTest
cover whenfunction result == null
, scenarios in which ack messages are required under various configurations.WindowsFunctionExecutorTest
cover windows function support processing guarantees scenarios.Other unit tests
have not changed, compatible with the current version implementation.Documentation
doc
(Your PR contains doc changes)