Skip to content

Adding internal classes to support buffered producer.#30078

Merged
conniey merged 29 commits intoAzure:conniey/feature/buffered-producerfrom
conniey:buffered-producer-work
Jul 24, 2022
Merged

Adding internal classes to support buffered producer.#30078
conniey merged 29 commits intoAzure:conniey/feature/buffered-producerfrom
conniey:buffered-producer-work

Conversation

@conniey
Copy link
Member

@conniey conniey commented Jul 21, 2022

Description

Adds internal classes to support buffered producer feature.

  • EventDataAggregator to aggregate EventData into EventDataBatches and pushes them downstream when either:
    • max wait time has elapsed or, events cannot fit into the batch.
  • EventHubBufferedPartitionProducer to manage a single Event Hub partition when buffering events.
  • Adds tests.
  • Adds some basic javadocs to the public classes.

All SDK Contribution checklist:

  • The pull request does not introduce [breaking changes]
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

@conniey
Copy link
Member Author

conniey commented Jul 21, 2022

/azp run java - eventhubs

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@conniey
Copy link
Member Author

conniey commented Jul 21, 2022

/azp run java - eventhubs

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@azure-sdk
Copy link
Collaborator

API change check

APIView has identified API level changes in this PR and created following API reviews.

azure-messaging-eventhubs

@conniey
Copy link
Member Author

conniey commented Jul 21, 2022

/azp run java - eventhubs

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@@ -44,8 +44,10 @@ steps:
Write-Host "git sparse-checkout init"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for that breaking change in eng so it builds.

.order(ByteOrder.LITTLE_ENDIAN);
.put(data);

buffer.flip();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java8, flip() method returns a class of type Buffer. But in Java 9+, it returns a type of ByteBuffer. So we split this up to avoid the compilation errors.

b += buffer.getInt(index + 4);
c += buffer.getInt(index + 8);
break;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fallthroughs are intentional.

@conniey
Copy link
Member Author

conniey commented Jul 22, 2022

/azp run java - eventhubs

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@conniey
Copy link
Member Author

conniey commented Jul 22, 2022

/azp run java - eventhubs

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

Copy link
Contributor

@liukun-msft liukun-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good to me, just add few comments for details. 😃


final long left = REQUESTED.get(this);
if (left > 0) {
subscription.request(1L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have subscription.request(n) when downstream request, do we still need to request(1L) here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I removed it from this method.

if (alwaysPublish) {
publishDownstream();
return;
} else if (eventData == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some log when eventData is null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment since it is a normal path after maxWaitTime has elapsed.

return;
}

downstream.onNext(previous);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need double check if downstream have other locks or may take a long time to avoid deadlock.
I check the producer send method, so far looks good.


publishDownstream();

synchronized (lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why we have two tryAdd() here. But if we separate the lock block for two tryAdd(), between publishDownstream() and the second tryAdd(), I think it is possible that other threads add eventData into batch which cause the second tryAdd() fails. Maybe we need lock from the first tryAdd() until the second tryAdd()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. :)

@check-enforcer
Copy link

This pull request is protected by Check Enforcer.

What is Check Enforcer?

Check Enforcer helps ensure all pull requests are covered by at least one check-run (typically an Azure Pipeline). When all check-runs associated with this pull request pass then Check Enforcer itself will pass.

Why am I getting this message?

You are getting this message because Check Enforcer did not detect any check-runs being associated with this pull request within five minutes. This may indicate that your pull request is not covered by any pipelines and so Check Enforcer is correctly blocking the pull request being merged.

What should I do now?

If the check-enforcer check-run is not passing and all other check-runs associated with this PR are passing (excluding license-cla) then you could try telling Check Enforcer to evaluate your pull request again. You can do this by adding a comment to this pull request as follows:
/check-enforcer evaluate
Typically evaulation only takes a few seconds. If you know that your pull request is not covered by a pipeline and this is expected you can override Check Enforcer using the following command:
/check-enforcer override
Note that using the override command triggers alerts so that follow-up investigations can occur (PRs still need to be approved as normal).

What if I am onboarding a new service?

Often, new services do not have validation pipelines associated with them, in order to bootstrap pipelines for a new service, you can issue the following command as a pull request comment:
/azp run prepare-pipelines
This will run a pipeline that analyzes the source tree and creates the pipelines necessary to build and validate your pull request. Once the pipeline has been created you can trigger the pipeline using the following comment:
/azp run java - [service] - ci

@conniey conniey force-pushed the buffered-producer-work branch from 3e6001c to 54df3ec Compare July 24, 2022 07:55
@conniey conniey merged commit 669d733 into Azure:conniey/feature/buffered-producer Jul 24, 2022
@conniey conniey deleted the buffered-producer-work branch July 24, 2022 08:18
conniey added a commit that referenced this pull request Aug 1, 2022
* Updating property name to getMaxEventBufferLengthPerPartition

* Adds an aggregator that publishes events based on full batches and time.

* Adding implementation for publishing events for a partition.

* Adding license info.

* Adding inline mock maker for final classes.

* Fixing recursive subscription.

* Do not get additional batches if the current instance is completed.

* Rename to EventHubBufferedPartitionProducer

* Applying retryWhen policy to Aggregator.

* Remove unused EventData field

* Adding documentation to mock helper.

* Hack around the blocking call.

* Adding tests for EventHubBufferedPartitionProducerTest

* Populating PartitionProcessors for each partition id.

* Creating private static class to hold PublishResults from buffered producer.

* Adding tests to ensure upstream requests are respected.

* Add suppression for fallthrough

* Use --no-cone in pipeline sparse checkout script (#29905)

Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Adding comments for cases meant to fallthrough

* Wrap synchronous lock.

* Adding documentation to EventDataAggregatora

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>
conniey added a commit that referenced this pull request Aug 1, 2022
* Adding initial Buffered Producer APIs with documentation. (#29525)

* Adding skeleton classes.

* Adding documentation to buffered producer models.

* Adding minor implementation for Event HubBufferedProducerClientBuilder.

* Adding some documentation to Event Hub Buffered Producer Client Builder.

* Adding suppression for PagedFlux

* Adds hashing algorithm for buffered producer (#30012)

* Adding implementation of PartitionResolver

* Adding tests for PartitionResolver.

* Using unsigned right shift operator.

* Adding concurrent test cases.

* Add CHANGELOG

* Adding internal classes to support buffered producer. (#30078)

* Updating property name to getMaxEventBufferLengthPerPartition

* Adds an aggregator that publishes events based on full batches and time.

* Adding implementation for publishing events for a partition.

* Adding license info.

* Adding inline mock maker for final classes.

* Fixing recursive subscription.

* Do not get additional batches if the current instance is completed.

* Rename to EventHubBufferedPartitionProducer

* Applying retryWhen policy to Aggregator.

* Remove unused EventData field

* Adding documentation to mock helper.

* Hack around the blocking call.

* Adding tests for EventHubBufferedPartitionProducerTest

* Populating PartitionProcessors for each partition id.

* Creating private static class to hold PublishResults from buffered producer.

* Adding tests to ensure upstream requests are respected.

* Add suppression for fallthrough

* Use --no-cone in pipeline sparse checkout script (#29905)

Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Adding comments for cases meant to fallthrough

* Wrap synchronous lock.

* Adding documentation to EventDataAggregatora

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Connecting EventHubBufferedProducerClient to internal classes. (#30194)

* Adds flush to PartitionProcessor.

* Adds implementation to methods for async client.

* Fixes build breaks.

* Remove idempotent retries for next release.

* Connect asynchronous client.

* Closing producer client after test case.

* Make default retry options package-private.

* Adding integration test for BufferedProducerAsyncClient.

* Adding partition id to EventDataAggregator

* Adding defaults to builder and documentation.

* Remove localizedBy usages.

* Fixing switchMap logic.

* Have PartitionProducer throw an UncheckedInterruptedException when a batch creation occurs while the thread is interrrupted.

* Update PublishResultSubscriber to clear remaining queue items when closed.

* Delay computation of EventDataAggregator to prevent multiple instances being created.

* Adding support for flush().

* Add emitResult constant.

* Complete EventDataAggregator when it is cancelled.

* Implement flush and clean up logging.

* Add implementation for sync-client.

* Making non-implemented methods package-private.

* Add changelog entry.

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>
conniey added a commit to conniey/azure-sdk-for-java that referenced this pull request Sep 29, 2022
…#30224)

* Adding initial Buffered Producer APIs with documentation. (Azure#29525)

* Adding skeleton classes.

* Adding documentation to buffered producer models.

* Adding minor implementation for Event HubBufferedProducerClientBuilder.

* Adding some documentation to Event Hub Buffered Producer Client Builder.

* Adding suppression for PagedFlux

* Adds hashing algorithm for buffered producer (Azure#30012)

* Adding implementation of PartitionResolver

* Adding tests for PartitionResolver.

* Using unsigned right shift operator.

* Adding concurrent test cases.

* Add CHANGELOG

* Adding internal classes to support buffered producer. (Azure#30078)

* Updating property name to getMaxEventBufferLengthPerPartition

* Adds an aggregator that publishes events based on full batches and time.

* Adding implementation for publishing events for a partition.

* Adding license info.

* Adding inline mock maker for final classes.

* Fixing recursive subscription.

* Do not get additional batches if the current instance is completed.

* Rename to EventHubBufferedPartitionProducer

* Applying retryWhen policy to Aggregator.

* Remove unused EventData field

* Adding documentation to mock helper.

* Hack around the blocking call.

* Adding tests for EventHubBufferedPartitionProducerTest

* Populating PartitionProcessors for each partition id.

* Creating private static class to hold PublishResults from buffered producer.

* Adding tests to ensure upstream requests are respected.

* Add suppression for fallthrough

* Use --no-cone in pipeline sparse checkout script (Azure#29905)

Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Adding comments for cases meant to fallthrough

* Wrap synchronous lock.

* Adding documentation to EventDataAggregatora

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Connecting EventHubBufferedProducerClient to internal classes. (Azure#30194)

* Adds flush to PartitionProcessor.

* Adds implementation to methods for async client.

* Fixes build breaks.

* Remove idempotent retries for next release.

* Connect asynchronous client.

* Closing producer client after test case.

* Make default retry options package-private.

* Adding integration test for BufferedProducerAsyncClient.

* Adding partition id to EventDataAggregator

* Adding defaults to builder and documentation.

* Remove localizedBy usages.

* Fixing switchMap logic.

* Have PartitionProducer throw an UncheckedInterruptedException when a batch creation occurs while the thread is interrrupted.

* Update PublishResultSubscriber to clear remaining queue items when closed.

* Delay computation of EventDataAggregator to prevent multiple instances being created.

* Adding support for flush().

* Add emitResult constant.

* Complete EventDataAggregator when it is cancelled.

* Implement flush and clean up logging.

* Add implementation for sync-client.

* Making non-implemented methods package-private.

* Add changelog entry.

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>
conniey added a commit that referenced this pull request Sep 30, 2022
* Remerge "Using new windowTimeout operator with backpressure support" (#30111)

* Using new windowTimeout operator with backpressure support

* Using Flux.class directly for windowTimeout lookup, using azure core libs defined to use v4.3.19 reactor-core

* Improving reflection logic and moving it to ReactorShim

* Replace inline reflection with ReactorShim

* Adding ReactorShim test for backpressure aware windowTimeout.

* Changelog update for ReactorShim

* Using ConcurrentLinkedQueue to avoid ConcurrentModificationException while iterating array.

Co-authored-by: Anu Thomas Chandy <anuamd@hotmail.com>

* Merge buffered producer into release/azure-messaging-eventhubs (#30224)

* Adding initial Buffered Producer APIs with documentation. (#29525)

* Adding skeleton classes.

* Adding documentation to buffered producer models.

* Adding minor implementation for Event HubBufferedProducerClientBuilder.

* Adding some documentation to Event Hub Buffered Producer Client Builder.

* Adding suppression for PagedFlux

* Adds hashing algorithm for buffered producer (#30012)

* Adding implementation of PartitionResolver

* Adding tests for PartitionResolver.

* Using unsigned right shift operator.

* Adding concurrent test cases.

* Add CHANGELOG

* Adding internal classes to support buffered producer. (#30078)

* Updating property name to getMaxEventBufferLengthPerPartition

* Adds an aggregator that publishes events based on full batches and time.

* Adding implementation for publishing events for a partition.

* Adding license info.

* Adding inline mock maker for final classes.

* Fixing recursive subscription.

* Do not get additional batches if the current instance is completed.

* Rename to EventHubBufferedPartitionProducer

* Applying retryWhen policy to Aggregator.

* Remove unused EventData field

* Adding documentation to mock helper.

* Hack around the blocking call.

* Adding tests for EventHubBufferedPartitionProducerTest

* Populating PartitionProcessors for each partition id.

* Creating private static class to hold PublishResults from buffered producer.

* Adding tests to ensure upstream requests are respected.

* Add suppression for fallthrough

* Use --no-cone in pipeline sparse checkout script (#29905)

Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Adding comments for cases meant to fallthrough

* Wrap synchronous lock.

* Adding documentation to EventDataAggregatora

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Connecting EventHubBufferedProducerClient to internal classes. (#30194)

* Adds flush to PartitionProcessor.

* Adds implementation to methods for async client.

* Fixes build breaks.

* Remove idempotent retries for next release.

* Connect asynchronous client.

* Closing producer client after test case.

* Make default retry options package-private.

* Adding integration test for BufferedProducerAsyncClient.

* Adding partition id to EventDataAggregator

* Adding defaults to builder and documentation.

* Remove localizedBy usages.

* Fixing switchMap logic.

* Have PartitionProducer throw an UncheckedInterruptedException when a batch creation occurs while the thread is interrrupted.

* Update PublishResultSubscriber to clear remaining queue items when closed.

* Delay computation of EventDataAggregator to prevent multiple instances being created.

* Adding support for flush().

* Add emitResult constant.

* Complete EventDataAggregator when it is cancelled.

* Implement flush and clean up logging.

* Add implementation for sync-client.

* Making non-implemented methods package-private.

* Add changelog entry.

Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>

* Update CHANGELOG.md

* Fix CHANGELOG.

Co-authored-by: Anu Thomas Chandy <anuamd@hotmail.com>
Co-authored-by: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com>
Co-authored-by: Ben Broderick Phillips <bebroder@microsoft.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants