-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AWS Sqs] Add SqsIO.writeBatches for improved performance #26946
Conversation
R: @aromanenko-dev No rush on this one, I'll be off for quite a while mid next week. |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello!
I ran basic QA checks and did not find issues. Will go through the code deeper this week.
Found one inconvenience - when I try to run SqsIOIT
against real AWS resources, I get:
> Task :sdks:java:io:amazon-web-services2:integrationTest
org.apache.beam.sdk.io.aws2.sqs.testing.SqsIOIT > testWriteThenRead FAILED
software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException: You must wait 60 seconds after deleting a queue before you can create another with the same name. (Service: Sqs, Status Code: 400, Request ID: 37bcbb71-8938-5711-afa0-e7860fc9d2d7)
This is thrown too if I run the suite two times without waiting 60 sec.
What do you think of making the queue name vary in different tests and in different test runs? Not sure about the latter, but I think making it different in two tests is good - allows running full suite without errors.
Thanks @psolomin, good catch. That shouldn't happen, I've fixed it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for inviting me to review this, I've learnt some new things.
When I tried this with Flink runner, performance-wise, new writeBatches()
managed to submit 100k records in a couple of minutes, while write()
was at 1 - 2k / minute. Big difference 👏 Dynamic writes to 2 destinations also worked just fine.
The code looks good to me too, I've left only a couple of nitpicks.
I wonder though, should we keep the existing write()
then, long term? Or make it as deprecated and eventually drop?
...n-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/AsyncBatchWriteHandler.java
Outdated
Show resolved
Hide resolved
@@ -1328,7 +1332,7 @@ void addClientRecord(int recordBytes) { | |||
} | |||
|
|||
@Override | |||
public void addPutRecordsRequest(long latencyMillis, boolean isPartialRetry) { | |||
public void addBatchWriteRequest(long latencyMillis, boolean isPartialRetry) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[side note] While reading this PR, I've noticed:
... Concurrency settings above the default have caused a bug in the AWS SDK v2. ...
public Write<T> withConcurrentRequests(int concurrentRequests)
Does it still hold after the recent bump of AWS SDK version? Should we create an issue for Beam to expose this setting once the AWS SDK is fixed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually not sure about this at the moment. I'll make a note and will have look into this later when back from my pto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM
I added a couple of minor notes, ptal.
Also, I was a bit confused to see the changes of KinesisIO as well, despite the fact that this PR titles about only SqsIO. Never mind but it would be more clear to split such PR into two ones or properly name it in the future.
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java
Show resolved
Hide resolved
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java
Outdated
Show resolved
Hide resolved
@psolomin I'm still wondering / unsure how to proceed. I was also considering to re-implement write using write batches internally... |
The changes on the Kinesis side are just to extract some common base functionality, but nothing functional. |
0f8c48c
to
59c247f
Compare
This adds an additional
writeBatches
to SqsIO to improve throughput using the batch API with an async client.The previous KinesisIo
AsyncPutRecordsHandler
became a general handler, so it can also be used for SQS (and others).(closes #21429)
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.