MSK IAM: add support for AssumeRole auto refresh #944

Closed

hnaoto opened this issue Jun 6, 2022 · 12 comments

Comments

@hnaoto

hnaoto commented Jun 6, 2022

Hi, MSK IAM auth support was added recently in #937, but it doesn't support temporary credentials / AssumeRole refreshes. This feature was initially mentioned by @garrett528 in a thread: #907 (comment)

I am interested in adding support for auto refreshing AssumeRole.

Ideally, the client would retrieve temporary credentials from STS and refresh those credentials in the background:

    sts = Aws::STS::Client.new
    role_credentials = Aws::AssumeRoleCredentials.new(
      client: sts,
      role_arn: "arn:aws:iam::......",
      role_session_name: "example_session"
    )

    # the access key ID can then be read via role_credentials.access_key_id

The sasl_authenticator is instantiated when the Kafka client is created (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl_authenticator.rb#L42). I am wondering what the appropriate way is to refresh the credentials used by the sasl_authenticator in the background. Theoretically, I could recreate the sasl_authenticator after the credentials get updated, but that feels a little bit hacky to me.

Any suggestions are much appreciated.

@garrett528

@hnaoto the way that i've seen this done, specifically in librdkafka (the standard C client for Kafka), is that the credentials are stored in an object. that object is refreshed on a certain cadence (about 80% of the way through the credential lifetime set by duration_sec). however, this requires that the refresh mechanism be (1) scheduled and (2) run on a background thread. the credential object also has to be locked and unlocked on update, since multiple threads may be trying to access it concurrently.
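
For reference, a minimal Ruby sketch of that pattern, with a hypothetical MskCredentialHolder class; the class name, duration constant, and sleep-based refresh loop are illustrative, not ruby-kafka or librdkafka API:

    require "aws-sdk-core" # provides Aws::STS::Client

    # Hypothetical holder: a mutex-guarded credential object refreshed by a
    # background thread at roughly 80% of the credential lifetime.
    class MskCredentialHolder
      DURATION_SEC = 900 # minimum STS session duration (15 minutes)

      def initialize(role_arn:, session_name:)
        @role_arn = role_arn
        @session_name = session_name
        @sts = Aws::STS::Client.new
        @mutex = Mutex.new
        refresh!
        start_refresh_thread
      end

      # Readers take the lock so they never see a half-updated credential set.
      def credentials
        @mutex.synchronize { @credentials }
      end

      private

      def refresh!
        resp = @sts.assume_role(
          role_arn: @role_arn,
          role_session_name: @session_name,
          duration_seconds: DURATION_SEC
        )
        @mutex.synchronize { @credentials = resp.credentials }
      end

      def start_refresh_thread
        Thread.new do
          loop do
            sleep(DURATION_SEC * 0.8) # refresh ~80% into the lifetime
            refresh!
          end
        end
      end
    end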

here's what i got to in late 2021. https://github.com/garrett528/ruby-kafka/pull/2/files. it's not complete and i don't think i ever got the thread to be scheduled properly (i'm no rubyist so i may be heading in the wrong direction trying to port C to Ruby).

here's the librdkafka C implementation that i wrote to do this if it helps. https://github.com/UrbanCompass/librdkafka/blob/master/src/rdkafka_sasl_aws_msk_iam.c

@hnaoto

hnaoto commented Jun 8, 2022

@garrett528 Thank you so much for sharing all the details. The solution that you tried sounds promising. Let me see whether I can figure out the scheduling part.

@hnaoto

hnaoto commented Jun 8, 2022

Hi @garrett528, I went through the branch that you shared. May I ask some questions? You mentioned that the thread "was not scheduled properly". What kind of errors did you get? (For example, did the credentials get updated after expiration?)

@garrett528

trying to remember where i stopped... right now, that code doesn't actually call the sts endpoint so that definitely needs to be added. the refresh thread works but it is fixed to sleep(60) instead of using the refresh duration. additionally, it's using an infinite loop + sleep instead of an actual scheduling mechanism. that may not be the best way to handle this and i don't think i ever tested whether the creds pick up new values after the refresh is executed.

@hnaoto

hnaoto commented Jun 10, 2022

Thanks for sharing all the information @garrett528 😃

I did some digging into the Ruby AWS SDK, and it looks like the temporary credentials (Aws::AssumeRoleCredentials) will be refreshed automatically: the SDK checks expiry on each access and refreshes shortly before expiration (source code: https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-core/lib/aws-sdk-core/refreshing_credentials.rb)

If I pass the role credentials object to the Kafka client, the credentials object used by the Ruby Kafka client will get updated as well, since both hold a reference to the same object. (From some people's perspective, Ruby is "pass by object reference".)
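
A small illustration of that reference-sharing point, reusing the snippet from the issue description (the expiry check lives in Aws::RefreshingCredentials, which Aws::AssumeRoleCredentials mixes in; the variable names are illustrative):

    require "aws-sdk-core"

    role_credentials = Aws::AssumeRoleCredentials.new(
      client: Aws::STS::Client.new,
      role_arn: "arn:aws:iam::......",
      role_session_name: "example_session"
    )

    # The Kafka client would store a reference to the same object, not a copy,
    # so a refresh performed by the SDK is visible to the client as well.
    held_by_kafka_client = role_credentials
    held_by_kafka_client.equal?(role_credentials) # => true

    # Each call to #credentials checks expiry and refreshes shortly before
    # expiration, so every read returns usable keys.
    held_by_kafka_client.credentials.access_key_id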

Do you think it is still necessary to refresh the credentials in the Ruby Kafka client if the credentials object gets updated automatically?

As far as I know, once a connection is established, the client can keep talking to Kafka; Kafka won't proactively kick away a client whose credentials have expired. The code change in this PR #951 is working, but I haven't figured out a way to verify the behavior of the client when it needs to re-establish a connection.

@garrett528

garrett528 commented Jun 12, 2022

oh that's a good find! i'm not sure what mechanism the ruby-kafka client itself uses once the connection requires a reset. the way i tested this on the C client was to set up an IAM-enabled test MSK cluster and write a script that instantiates a producer and consumer on a fixed interval (say 1 minute), and have it run for longer than the refresh duration. that helped me pinpoint the fact that the C client actually requires a background thread to refresh the credential, and then see what happens when the broker thread disconnects from MSK due to the credential expiring.

the minimum credential duration you can set is 15 minutes, so it's a bit annoying to test.
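
For anyone else testing this, a rough sketch of that soak test in ruby-kafka terms; the broker address, topic, and client options are placeholders, and the MSK IAM SASL options your client version expects would need to be added:

    require "kafka"

    # Placeholder broker/client setup (MSK IAM brokers listen on port 9098).
    kafka = Kafka.new(["broker1:9098"], client_id: "iam-refresh-soak-test")
    producer = kafka.producer

    # Produce once a minute for well past the 15-minute credential lifetime,
    # so at least one refresh (and any reconnect) must happen mid-run.
    60.times do |i|
      producer.produce("ping #{i}", topic: "iam-test")
      producer.deliver_messages
      sleep(60)
    end
    producer.shutdown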

@hnaoto

hnaoto commented Jun 14, 2022

Thanks so much for sharing the testing method @garrett528! I tried that with an MSK cluster and let the program run for around 8 hours. It worked fine.

I also took a further look at the source code of the Ruby Kafka client. At a high level, the authentication process appears to work as follows:

  1. If there is no connection error, the client keeps working even if the credentials have expired.
  2. If there is a connection error, then depending on the scenario, the Kafka client might just raise the connection error and crash. The application code is responsible for re-creating the client at that point; since the AssumeRole credentials have been refreshed automatically all along, the newly created client will use valid credentials.
  3. If the Kafka client attempts to recreate the connection itself, it will eventually invoke build_connection and authenticate! (https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl/awsmskiam.rb#L30). The credentials used by awsmskiam.rb are encapsulated in an object that is updated automatically, so the newly created connection should work as expected (see the sketch below).
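
A hypothetical, heavily stubbed condensation of that reconnect path (not the actual ruby-kafka source); the point is only that the authenticator reads the shared credentials object at authenticate! time:

    # FakeAuthenticator is illustrative: on every (re)connect it re-reads the
    # shared credentials object, so a refresh that happened in the background
    # is picked up automatically.
    class FakeAuthenticator
      def initialize(credentials) # e.g. an Aws::AssumeRoleCredentials instance
        @credentials = credentials
      end

      def authenticate!
        creds = @credentials.credentials # fresh keys at call time
        puts "signing SASL payload with #{creds.access_key_id}"
      end
    end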

I apologize for the long text, and I might have missed some nuances of how connection errors are handled by the Ruby Kafka client. Still, the authentication process should be similar across clients written in different languages, and I think the current implementation should be sufficient. Does the analysis sound reasonable to you?

@garrett528

@hnaoto that makes sense to me. do you know which scenarios cause a crash vs a retry? if those scenarios are well-defined, it will help devs understand the circumstances that cause failure and allow them to make an educated decision on whether to catch and restart the client or to let it fail and investigate.

outside of that, this is great work!

@hnaoto

hnaoto commented Jun 16, 2022

I checked how ConnectionError exceptions are handled. ConnectionError can be thrown in a number of situations: https://github.com/zendesk/ruby-kafka/search?q=ConnectionError

From the producer's perspective, the pattern looks like this (a catch-and-restart sketch follows the list):

  1. The ConnectionError is caught upstream: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L405
  2. The upstream code retries until the counter hits @max_retries: https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L422 (max_retries is a client/producer config)
  3. A connection can be re-established during a retry. Otherwise, the client crashes after all the retries are exhausted.
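
In application code, the "catch and restart" option then looks roughly like this; build_kafka_client is a hypothetical helper that re-creates the client from the shared role_credentials object:

    # Once max_retries is exhausted, deliver_messages raises
    # Kafka::DeliveryFailed; rebuilding the client lets the next attempt
    # authenticate with the already-refreshed credentials.
    begin
      producer.deliver_messages
    rescue Kafka::DeliveryFailed => e
      warn("delivery failed after retries: #{e.message}")
      producer.shutdown
      kafka = build_kafka_client # hypothetical: re-runs Kafka.new(...)
      producer = kafka.producer
    end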

@garrett528

ok so it does have a retry mechanism that it hits. sounds like it's less of an issue with this client anyways since the ruby aws sdk handles the credential refresh automatically (the C client does not!).

@hnaoto

hnaoto commented Jun 16, 2022

Yes, there is a retry mechanism, but once the client crashes, the messages inside the local buffer are lost as well. (The behavior of the buffer is briefly described in this section: https://github.com/zendesk/ruby-kafka#buffering-and-error-handling.) Ideally, the client should be able to re-establish the connection before it reaches the retry limit if there is an intermittent network issue.

😄 It would be much more complicated to implement the credential refresh manually, and I think the people who worked on the Ruby AWS SDK had to overcome some hurdles as well. (Related threads: aws/aws-sdk-ruby#2641 and aws/aws-sdk-ruby#2642)

@github-actions

Issue has been marked as stale due to a lack of activity.
