-
Notifications
You must be signed in to change notification settings - Fork 2.2k
CosmosDb: Remove blocking calls from CFP #4602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…er various fixes.
|
/azp run java-cosmos-tests |
|
No pipelines are associated with this pull request. |
|
/azp run java - cosmos - tests |
|
Azure Pipelines successfully started running 1 pipeline(s). |
moderakh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- there are still some blocking call in the code (please check my comments).
- please add unit tests. There are a lot of code here which is not tested.
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/AutoCheckpointer.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
Outdated
Show resolved
Hide resolved
...n/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionControllerImpl.java
Outdated
Show resolved
Hide resolved
...n/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionControllerImpl.java
Outdated
Show resolved
Hide resolved
...n/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionControllerImpl.java
Outdated
Show resolved
Hide resolved
...java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionLoadBalancerImpl.java
Outdated
Show resolved
Hide resolved
...java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionLoadBalancerImpl.java
Outdated
Show resolved
Hide resolved
kushagraThapar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few questions and minor comments on coding standards.
One thing I noticed throughout the code is, one liner if conditions statements are not surrounded by braces. I believe that reduces the code readability by a big factor. One liner if condition statements should be surrounded by braces. :)
...os/src/main/java/com/azure/data/cosmos/internal/changefeed/exceptions/ObserverException.java
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/AutoCheckpointer.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/AutoCheckpointer.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/LeaseRenewerImpl.java
Outdated
Show resolved
Hide resolved
...va/com/azure/data/cosmos/internal/changefeed/implementation/ChangeFeedContextClientImpl.java
Show resolved
Hide resolved
...internal/changefeed/implementation/ObserverExceptionWrappingChangeFeedObserverDecorator.java
Show resolved
Hide resolved
|
in response to your question:
Take a look at |
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/AutoCheckpointer.java
Outdated
Show resolved
Hide resolved
...c/main/java/com/azure/data/cosmos/internal/changefeed/implementation/TraceHealthMonitor.java
Outdated
Show resolved
Hide resolved
remove Thread.sleep() call from some of the loops
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/AutoCheckpointer.java
Show resolved
Hide resolved
…where earlier feeds can be processed slower than newer feeds and wrong checkpoint is saved in the lease document
...src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/BootstrapperImpl.java
Show resolved
Hide resolved
...com/azure/data/cosmos/internal/changefeed/implementation/ChangeFeedProcessorBuilderImpl.java
Show resolved
Hide resolved
.../src/main/java/com/azure/data/cosmos/internal/changefeed/implementation/DefaultObserver.java
Outdated
Show resolved
Hide resolved
...java/com/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseStore.java
Show resolved
Hide resolved
...om/azure/data/cosmos/internal/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.java
Outdated
Show resolved
Hide resolved
...n/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionSupervisorImpl.java
Outdated
Show resolved
Hide resolved
moderakh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
two general comments:
-
in some classes we have class fields which are not atomic/synchronized/volatile, those class field may get updated on one thread and accessed on another thread, as the access is not atomic/synchronized/volatile I am not sure if the recent most changes will be visible by the second thread. Please note even for a single flux when you do multiple transformations based on your operators the actions will take place on different thread so we should consider thread safety.
-
we should use reactor scheduler to have the same async pattern in our sdk. I see in some places you decided to use ExecutorService instead of Reactor pattern. If we decide to go with ExecutorService at least we should let ExecutorService handle threads and not create any threads manually.
milismsft
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- The oder of the operations ensures that the non-atomic class members are not accessed from threads that are executing in parallel.
- The ExecutorService will handle the threads.
…ation with Chris). Code style fixes in anticipation of liner rules that will be eventually enabled for the SDK.
...n/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionSupervisorImpl.java
Show resolved
Hide resolved
moderakh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as discussed offline,
there is a potential bug:
I see this pattern that you have two threads
- thread
1is keep looping till either cancellation is requested or a failure is observed by a PartitionProcessor instance - thread
2is setting the failure
as exception setting/getting are not done synchronized/atomically the most recent value set by thread 2 will not be visible by thread 1. and thread 1. may keep looping anware of the failure.
this applies to all implementors of the interface PartitionProcessor.
we should fix this bug.
for long term we should consider merging the exception returned by getResultException to the Mono returned by run(.) if that makes sense. Millis please create work items for this.
...n/java/com/azure/data/cosmos/internal/changefeed/implementation/PartitionSupervisorImpl.java
Show resolved
Hide resolved
…d as volatile since it can be accessed via parallel running threads.
moderakh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please create work items for any work/comment you plan to address outside of this PR; so we can track them.
I see three work items:
- the remaining blocking calls (Thread.sleep) need to change to non-blocking pattern
- more mocking unit tests (see bellow) this is very important IMO
- please consider merging exception to Mono terminal result as discussed.
We need more unit test and integration tests. Ideally every class should be unit tested, or at least the main classes. You can mock the input to the class and test the behaviour given the mock behaviour.
doing just end to end testing is hard to simulate different scenarios.
We should consider mock unit tests for:
- Normal Scenario
- Partition Split (I assume we have some special logic? we should test that)
- network failure, etc
Please see DocumentProducerTest which does just unit test using mocking against different scenarios (partition split, etc)
https://github.com/Azure/azure-cosmosdb-java/blob/cc5d007aa7e5984ceb4919b910fe2e691bd91647/sdk/src/test/java/com/microsoft/azure/cosmosdb/rx/internal/query/DocumentProducerTest.java
|
Work items created for two remaining issues: There are two E2E unit tests in practice will ensure coverage for most of the functionality in CFP. There's one work item (in progress) which adds the coverage for extensive usage and handles potential splits. Additionally I'm looking at adding tests similar to what we have in .NET for CFP on opportunistic bases. |
Remove blocking calls from ChangeFeedProcessor implementation and other various fixes.