-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kinesis enhanced fanout #23540
Kinesis enhanced fanout #23540
Conversation
f87e895
to
7b6d993
Compare
Thanks @psolomin! Sorry, I didn't get to look into this today but I'll have a look tomorrow. |
66a379e
to
b5dd494
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@psolomin Thanks so much for working on this. Enhanced fanout is a long missed feature and it was frequently asked for on the mailing list. 🎉 💪
I have a few comments below.
Overall I'm honestly a bit torn and I'm wondering about your experience working on the IO?
One the one hand I feel this is pretty close and mostly misses the resharding stuff (which you're still working on). That's so awesome 🎉.
On the other hand the complexity of the IO has grown even further and I found it surprisingly tough to understand how things play together (or not). That part concerns me a bit, also thinking about maintenance.
Is it worth it to step back from the current implementation and think through how a pure subscriber based implementation would look like without the existing luggage? I suspect there's some potential to simplify things (a lot?). E.g. we're actually notified about child shards on the SubscribeToShardEvent
, so no need for separate requests to figure these out. But I could be wrong ...
What do you think and how do you feel about the complexity aspect having worked on this for a bit now?
...azon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/AWSClientsProvider.java
Outdated
Show resolved
Hide resolved
kinesisAsyncSupplier = spec.getAWSClientsProvider()::getKinesisAsyncClient; | ||
cloudWatchSupplier = spec.getAWSClientsProvider()::getCloudWatchClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can forbid using the deprecated configuration path (AwsClientsProvider) when enabling enhanced fanout. Similar checks are in KinesisIO.Read.expand(...)
...amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java
Outdated
Show resolved
Hide resolved
private void subscribeLoop(ShardRecordsIterator shardRecordsIterator) { | ||
while (poolOpened.get()) { | ||
try { | ||
shardRecordsIterator.subscribeToShard(this::putRecord); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Somehow we have to get / process childShards
from the SubscribeToShardEvent
to handle any re-sharding event.
Also, shouldn't the next subscribe event use the continuationSequenceNumber
?
Use this as
SequenceNumber
in the next call to SubscribeToShard, with
StartingPosition
set toAT_SEQUENCE_NUMBER
orAFTER_SEQUENCE_NUMBER
. Use
ContinuationSequenceNumber
for checkpointing because it captures your shard progress even when no
data is written to the shard.
Honestly, I'm not sure if that has any implications for checkpointing... I think the current approach should still be fine.
.../amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardCheckpoint.java
Outdated
Show resolved
Hide resolved
@mosche many thanks for the detailed feedback 👍 This was my first attempt. I basically tried to pull some pieces from #9899 to get myself more familiar with this codebase, and then after basic tests with real Kinesis stream found out this approach doesn't seem to work when I consider resharding.
First time I am putting my hands on into the guts of a Beam IO, but not the first time dealing with Kinesis consumers overall.
My impression was that I needed to re-think this approach - namely, avoid overloading
I felt the same and that's actually the thing I'm busy now with. Once I have something tangible which covers re-sharding cleanly, may I ping you again? |
Sure, absolutely, feel free to reach out any time! |
Also sorry @psolomin , you got me wrong there ... my fault. I wasn't asking for your Beam / Kinesis experience at all, but wanted to understand how you experienced it getting started on the KinesisIO. I personally think there's a lot that can be improved and it's unnecessarily complex (already). Glad to hear you got to the same conclusions and very keen to see your next version 🙂 |
@psolomin Great to see so much activity here :) Let me know when this is ready for another review, happy to have a look! |
How's things going @psolomin ? Please let me know if there's any way I can support you on this. |
Hello @mosche Thanks for reaching out. I went through implementing 1 design attempt two weeks ago, but was not happy with it in the end, and did not push that into this draft. I plan to finish and push the commits of another attempt by next Mon. |
Sounds great @psolomin, happy to hear :) And no rush, I just wanted to reach out and see if I can support you. |
74a60ab
to
c6dd506
Compare
...n-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/Config.java
Outdated
Show resolved
Hide resolved
...web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/Checkers.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/KinesisEnhancedFanOutSource.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/KinesisShardEventsSubscriber.java
Outdated
Show resolved
Hide resolved
...s2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/ShardSubscriberImpl.java
Outdated
Show resolved
Hide resolved
c6dd506
to
6a35a8e
Compare
...c/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/ShardSubscribersPoolImpl.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/ShardSubscribersPoolImpl.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/ShardSubscribersPoolImpl.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/ShardSubscribersPoolImpl.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/helpers/KinesisClientProxyStub.java
Outdated
Show resolved
Hide resolved
530d92a
to
7a6ab3a
Compare
63577d1
to
746eb72
Compare
f30544b
to
f3250e2
Compare
7b48fd5
to
de679cd
Compare
...azon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOShardSubscriber.java
Outdated
Show resolved
Hide resolved
...ava/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
Show resolved
Hide resolved
ShardState shardState = Preconditions.checkStateNotNull(state.get(shardId)); | ||
if (current.hasNext()) { | ||
KinesisClientRecord r = current.next(); | ||
KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, at this point, this is the only doubt I actually have. Other things are already addressed, right @mosche ?
@psolomin yes, this is the only open issue I'm aware of. Before returning, shardState
must be adequately updated if this was the last record of current
to make sure checkpoint marks are correctly reflecting progress.
KinesisClientRecord r = current.next();
// Make sure to update shard state accordingly if `current` does not contain any more
// events. This is necessary to account for any re-sharding, so we could correctly resume
// from a checkpoint if taken once we advanced to the record returned by getNextRecord().
if (!current.hasNext()) {
onEventDone(shardState, current);
current = null;
}
KinesisRecord kinesisRecord = new KinesisRecord(r, read.getStreamName(), shardId);
if (shardState.isAfterInitialCheckpoint(kinesisRecord)) {
shardState.update(kinesisRecord);
return kinesisRecord;
}
Looks like we're missing a test case for this as follows:
- stub
subscribeToShard
with a reshard event that also contains 1 record. - wait for exactly that 1 record
- verify the checkpoint mark now contains the new shards, but not the old one anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reshard event that also contains 1 record
I am not sure such a thing exists. I was convinced that re-shard events always come with continuationSequenceNumber = null
, and they never carry records with non-null sequence numbers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was convinced that re-shard events always come with continuationSequenceNumber = null
continuationSequenceNumber
is unrelated to this. If it is null
it simply means the shard doesn't contain any next records. But that doesn't mean the current event did not contain records.
Have you seen any documentation that explicitly states that events containing child shards do never contain records? If not we have to assume that this could be the case. Even if that is something we've never observed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you seen any documentation that explicitly states that events containing child shards do never contain records?
Not really, but I think I implicitly assumed that after I checked Flink implementation of EFO.
Even if that is something we've never observed.
Ok, got it. I am fine to go defensive here and assume that SubscribeToShardEvent
can carry records AND child shards together. Let me try to put this out in the code, there might be other places it will be a trouble - I implicitly assumed this never happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we're missing a test case
Added it.
…m/sdk/io/aws2/kinesis/EFOShardSubscriber.java Co-authored-by: Moritz Mack <[email protected]>
252c1a1
to
24aa4f1
Compare
Run Java PreCommit |
Run Java_GCP_IO_Direct PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🥇 Awesome work @psolomin. I'm just waiting for the tests to pass before merging.
addresses #19967
fixes #19967
This should be reviewed after#26117 ✅Design decisions
AFTER_SEQUENCE_NUMBER
in checkpoints for both non- and aggregated records, and, in fact, startsAT
all the time, compensating withRecordFilter
. EFO consumer implements the same semantic for executing first subscription request. This is to be re-visited in [Task]: Adjust KinesisIO ShardCheckpoint semantics such that is more consistent #26073As per #26117:
KinesisReaderCheckpoint
with Beam 2.46.0KinesisIO.Read
andKinesisSource
- this is confirmed for Flink runner, but may affect other runners as well, if they serialise these objects as parts of their checkpoint / savepoint. Unfortunately,KinesisIO.Read
and its auto-generated counterpart had different defaultserialVersionUID
, and it's impossible to keep them when we addwithConsumerArn()
option.How this change was tested
With Flink runner - Flink 1.15.2, Java 11:
Playground repo: https://github.com/psolomin/beam-playground/tree/master/kinesis-io-with-enhanced-fan-out
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.