-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19405][STREAMING] Support for cross-account Kinesis reads via STS #16744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #72163 has finished for PR 16744 at commit
|
2401eff to
95ebd9c
Compare
|
Test build #72164 has finished for PR 16744 at commit
|
|
Test build #72165 has finished for PR 16744 at commit
|
|
Missed the code in python/streaming that this touches. Will update PR. |
|
The JIRA I opended for this issue contains further details and background. Linking to it here for good measure: |
|
Also, on another note, the |
|
Test build #72175 has finished for PR 16744 at commit
|
|
Pinging @tdas on this-- looks like you're the committer who has contributed the most to kinesis-asl. |
srowen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't feel qualified to review the substance of this but the form looks reasonable.
What are the drawbacks, if any? any behavior change or compatibility issues to note?
|
There shouldn't be any change to behavior or compatibility when using the existing implementations of |
|
Pinging @brkyvz as well, who also appears to have reviewed kinesis-asl changes in the past |
srowen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's probably OK if it's not changing APIs and adding useful support, without complicating things too much. My only real question is about the new dependency and its license and whether it already existed or not
pom.xml
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably an ignorant question but is this the first and only time something depends on the AWS SDK here? I know we had discussions about the Kinesis client already because its license was problematic. Didn't it already depend on the AWS SDK and is the license OK? Worth re-checking this situation, if you would, as a second set of eyes would be useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe there was previously a direct dependency on the AWS SDK but it is currently getting pulled in as a transitive dependency of the Kinesis Client Library. The KCL dependencies don't include the aws-java-sdk-sts Maven artifact so we must add it as an explicit dependency in the pom.xml for kinesis-asl.
The AWS SDK is licensed under Apache 2.0
|
Amending this PR to upgrade the KCL/AWS SDK dependencies to more-current versions (1.7.3 and 1.11.76, respectively). The |
|
Test build #72465 has finished for PR 16744 at commit
|
|
Hi @budde, taking a look at this now. Sorry for the wait |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is too brittle. I would rather use something more like:
RegionUtils.getRegionsForService("kinesis").find(_.getAvailableEndpoints.contains(endpoint)).getOrElse(
throw new IllegalArgumentException(s"Couldn't find region for endpoint: $endpoint"))There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. Went for a quick fix but this is much nicer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simplify this piece of code? There are too many options. Might I suggest something like:
trait SerializableAWSAuthProvider extends Serializable {
def getProvider: AWSCredentialsProvider
}
case class BasicCredentialProvider(
accessKeyId: String,
secretKey: String) extends SerializableAWSAuthProvider {
def getProvider: ...
}
case class STSCredentialsProvider(
roleArn: String,
sessionName: String,
externalId: Option[String],
credentials: Option[BasicCredentialProvider]) ...
case object DefaultProvider extends DefaultAWSCredentialsProviderChainThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was hoping for some feedback here. I think making this an interface with split basic/STS implementations should work well. I'll give it a shot.
|
PR has been amended to reflect feedback. Thanks for taking a look, @brkyvz. |
|
Test build #72480 has finished for PR 16744 at commit
|
|
Test build #72481 has finished for PR 16744 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move these cred providers to its own file and make this a sealed trait
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do
|
@budde I'm just concerned by the exponential blowoff of APIs. Here's my proposal. |
brkyvz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to push back, but let's just keep the most comprehensive API options. We should deprecate them once we add the Builder interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's keep this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's keep this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this
|
@brkyvz I share your concerns around expanding this API further than necessary. I think I'm okay with this as long as we're fairly confident the builder pattern work can be merged in the same Spark release. My reluctance here is that forcing users who want to use STS to provide static credentials for the long-lived auth to STS itself will be a security regression for folks using EC2 where IAM instance profiles provide a secure way of avoiding potential problems with static credential management. As you've pointed out though, this is less of a concern if we can deprecate the brittle I'll give another look over the |
|
Can't they still use |
|
So, if these values are |
|
@budde The scaladocs mention if that's not the case, we should make it so! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add a null check here and add logging that we're falling back to DefaultProviderChain
|
@brkyvz I actually think that Scaladoc may be outdated– I double checked the current master branch and it looks like Regardless, the check you've suggested would restore this behavior. I'll go that route. |
|
@brkyvz I've updated the PR per your feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add the exception to the log message if you think it's appropriate
|
Test build #73240 has finished for PR 16744 at commit
|
|
Missed updating a test, my mistake. Fixing now. |
python/pyspark/streaming/kinesis.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
External
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh. Fixed. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you like to add it? Does the AWS exception include what was missing, e.g. access key was null or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does specify that access key/secret key is null. I'll just add it.
|
Two final comments. Then I'll merge it pending tests |
|
Updated the PR. Thanks for the work you've done on this! Hopefully I can have a PR for the builder interface up later this week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use "falling back to DefaultAWSCredentialsProviderChain.", e) instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can do it in a separate PR if you like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went ahead and updated it.
…t Kinesis reads via STS - Add dependency on aws-java-sdk-sts - Replace SerializableAWSCredentials with new SerializableCredentialsProvider interface - Make KinesisReceiver take SerializableCredentialsProvider as argument and pass credential provider to KCL - Add new implementations of KinesisUtils.createStream() that take STS arguments - Make JavaKinesisStreamSuite test the entire KinesisUtils Java API - Update KCL/AWS SDK dependencies to 1.7.x/1.11.x - Make SerializableCredentialsProvider a sealed trait and classes to their own file
|
Test build #73245 has finished for PR 16744 at commit
|
|
Test build #73244 has finished for PR 16744 at commit
|
|
Test build #73248 has finished for PR 16744 at commit
|
|
Merging to master. Thanks for your patience! |
- Add dependency on aws-java-sdk-sts - Replace SerializableAWSCredentials with new SerializableCredentialsProvider interface - Make KinesisReceiver take SerializableCredentialsProvider as argument and pass credential provider to KCL - Add new implementations of KinesisUtils.createStream() that take STS arguments - Make JavaKinesisStreamSuite test the entire KinesisUtils Java API - Update KCL/AWS SDK dependencies to 1.7.x/1.11.x ## What changes were proposed in this pull request? [JIRA link with detailed description.](https://issues.apache.org/jira/browse/SPARK-19405) * Replace SerializableAWSCredentials with new SerializableKCLAuthProvider class that takes 5 optional config params for configuring AWS auth and returns the appropriate credential provider object * Add new public createStream() APIs for specifying these parameters in KinesisUtils ## How was this patch tested? * Manually tested using explicit keypair and instance profile to read data from Kinesis stream in separate account (difficult to write a test orchestrating creation and assumption of IAM roles across separate accounts) * Expanded JavaKinesisStreamSuite to test the entire Java API in KinesisUtils ## License acknowledgement This contribution is my original work and that I license the work to the project under the project’s open source license. Author: Budde <[email protected]> Closes apache#16744 from budde/master.
pass credential provider to KCL
arguments
What changes were proposed in this pull request?
JIRA link with detailed description.
How was this patch tested?
License acknowledgement
This contribution is my original work and that I license the work to the project under the project’s open source license.