-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Io jms fix ack message checkpoint #22932
Io jms fix ack message checkpoint #22932
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @lukecwik for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Reminder, please take a look at this pr: @lukecwik @chamikaramj |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @apilloud for label java. Available commands:
|
Reminder, please take a look at this pr: @apilloud @johnjcasey |
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @kileys for label java. Available commands:
|
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
- Upgrade equals and hashcode method in JmsCheckpointMark - Add a serciveExecutor.schedule method in order to close the JMS session after a tiemout and discard all the related checkpointd
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
- Get back to the initial implementation of JmsCheckpointMark - Add the discard attribute and discard() method to JmsCheckpointMark
Sorry for the long wait, lots of other issues blocked me from reviewing this. Taking a follow-up look now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All my comments are minor other then improving how we author the testCloseWithTimeout
so it isn't reliant on Thread.sleep
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
JmsIO.UnboundedJmsReader reader = source.createReader(null, null); | ||
|
||
reader.start(); | ||
reader.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest using a mock ScheduledExecutorService that you set on the PipelineOptions object when creating the reader. This way you can inject here in the test and capture the runnable/callable directly without needing to have a test reliant on Thread.sleep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we get the executor service from ExecutorOptions? As far as I can see, PipelineOptions should be then a reader field, but how do we get the ExecutorService from PipelineOptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService()
, the key part being the as(...) method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I get it. I don't know exactly how to mock it.
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
…nalizeCheckpoint method Co-authored-by: Lukasz Cwik <[email protected]>
#23234 was merged allowing you to use the ScheduledExecutorService from PipelineOptions |
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
consumer = null; | ||
closeAutoscaler(); | ||
closeConsumer(); | ||
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService()
, the key part being the as(...) method
Note that the next release cut is this Wednesday so if your able to clean-up this PR we could merge it and it would make its way into the 2.43 release otherwise it will be another 6 weeks before the next release cut. |
Co-authored-by: Lukasz Cwik <[email protected]>
Co-authored-by: Lukasz Cwik <[email protected]>
Co-authored-by: Lukasz Cwik <[email protected]>
… (don't know if it is better)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try updating the test to use a mock.
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
Updated to use a mock, now waiting for results of test run before merging. |
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
Outdated
Show resolved
Hide resolved
Run Java PreCommit |
Thanks @lukecwik ! |
Hi @lukecwik would you kindly review this PR related to the following issue:
#20814
Please find the following design documentation:
https://docs.google.com/document/d/19HiNPoJeIlzCFyWGdlw7WEFutceL2AmN_5Z0Vmi1s-I/edit?usp=sharing
assign to next reviewer:
R: @lukecwik
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.