rdkafka, waterdrop examples (#14)
* rdkafka example

* remove shale

* more docs

* more docs

* rubocop

* rubocop

* module scope

* Update examples/rdkafka/README.md

* Update examples/rdkafka/README.md

* Update examples/rdkafka/README.md

* sentence order

* waterdrop example

* rubocop

* set token failure on failure

* karafka

* karafka

* karafka

* exclude examples from codeclimate
bruce-szalwinski-he authored May 6, 2024
1 parent 1b0bb66 commit 6d45f2b
Showing 14 changed files with 394 additions and 65 deletions.
1 change: 1 addition & 0 deletions .codeclimate.yml
@@ -1,2 +1,3 @@
exclude_patterns:
- "**/thor_ext.rb"
- "examples/"
67 changes: 2 additions & 65 deletions README.md
@@ -34,72 +34,10 @@ This is the preferred method to install aws-msk-iam-sasl-signer-ruby, as it will
gem install aws-msk-iam-sasl-signer
```

## Usage
## Examples

```ruby

# frozen_string_literal: true
require "aws-msk-iam-sasl-signer"
require "json"
require "rdkafka"

KAFKA_TOPIC = ENV['KAFKA_TOPIC']
KAFKA_BOOTSTRAP_SERVERS = ENV['KAFKA_BOOTSTRAP_SERVERS']

kafka_config = {
  "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
  "security.protocol": 'sasl_ssl',
  "sasl.mechanisms": 'OAUTHBEARER',
  "client.id": 'ruby-producer'
}

def refresh_token(client, config)
  signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: 'us-east-1')
  auth_token = signer.generate_auth_token

  error_buffer = FFI::MemoryPointer.from_string(' ' * 256)
  response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
    client, auth_token.token, auth_token.expiration_time_ms, 'kafka-cluster', nil, 0, error_buffer, 256
  )
  return if response.zero?

  Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(
    client,
    "Failed to set token: #{error_buffer.read_string}"
  )
end

# set the token refresh callback
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
producer = Rdkafka::Config.new(kafka_config).producer

# seed the token:
# events_poll invokes all registered callbacks, of which
# oauthbearer_token_refresh_callback is one
consumer = Rdkafka::Config.new(kafka_config).consumer
consumer.events_poll

# produce some messages
Payload = Data.define(:device_id, :creation_timestamp, :temperature)

loop do
  payload = Payload.new(
    device_id: '1234',
    creation_timestamp: Time.now.to_i,
    temperature: rand(0..100)
  )

  handle = producer.produce(
    topic: KAFKA_TOPIC,
    payload: payload.to_h.to_json,
    key: "ruby-kafka-#{rand(0..999)}"
  )
  handle.wait(max_wait_timeout: 10)

  sleep(10)
end
```

- [rdkafka](examples/rdkafka/README.md) (using the rdkafka gem)

In order to use a named profile to generate the token, replace the `generate_auth_token` call with the profile variant (sketched below under an assumed `generate_auth_token_from_profile` signature):

@@ -110,7 +48,6 @@
```ruby
# assumed API and example profile name
auth_token = signer.generate_auth_token_from_profile(
  aws_profile: 'my-profile'
)
```


In order to use a role arn to generate the token, replace the `generate_auth_token` call with the role variant (sketched below under an assumed `generate_auth_token_from_role_arn` signature):

```ruby
# assumed API and example role arn
auth_token = signer.generate_auth_token_from_role_arn(
  role_arn: 'arn:aws:iam::123456789012:role/my-role'
)
```
6 changes: 6 additions & 0 deletions examples/rdkafka/Gemfile
@@ -0,0 +1,6 @@
source "https://rubygems.org"

gem "aws-msk-iam-sasl-signer"
gem "base64"
gem "karafka-rdkafka"
gem "rexml", "~> 3.2"
46 changes: 46 additions & 0 deletions examples/rdkafka/Gemfile.lock
@@ -0,0 +1,46 @@
GEM
  remote: https://rubygems.org/
  specs:
    aws-eventstream (1.3.0)
    aws-msk-iam-sasl-signer (0.1.0)
      aws-sdk-kafka
      thor
    aws-partitions (1.924.0)
    aws-sdk-core (3.194.1)
      aws-eventstream (~> 1, >= 1.3.0)
      aws-partitions (~> 1, >= 1.651.0)
      aws-sigv4 (~> 1.8)
      jmespath (~> 1, >= 1.6.1)
    aws-sdk-kafka (1.70.0)
      aws-sdk-core (~> 3, >= 3.193.0)
      aws-sigv4 (~> 1.1)
    aws-sigv4 (1.8.0)
      aws-eventstream (~> 1, >= 1.0.2)
    base64 (0.2.0)
    bigdecimal (3.1.7)
    ffi (1.16.3)
    jmespath (1.6.2)
    karafka-rdkafka (0.15.0)
      ffi (~> 1.15)
      mini_portile2 (~> 2.6)
      rake (> 12)
    mini_portile2 (2.8.6)
    rake (13.2.1)
    rexml (3.2.6)
    shale (1.1.0)
      bigdecimal
    thor (1.3.1)

PLATFORMS
  arm64-darwin-22
  ruby

DEPENDENCIES
  aws-msk-iam-sasl-signer
  base64
  karafka-rdkafka
  rexml (~> 3.2)
  shale

BUNDLED WITH
   2.5.7
58 changes: 58 additions & 0 deletions examples/rdkafka/README.md
@@ -0,0 +1,58 @@
# aws-msk-iam-sasl-signer with rdkafka

This example demonstrates how to use [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) with the [karafka-rdkafka](https://rubygems.org/gems/karafka-rdkafka) gem.


## Usage

```bash
export AWS_REGION=us-west-2
export AWS_ACCOUNT_ID=123456789012
export CLUSTER_NAME=my-cluster
export CLUSTER_UUID=abc
export KAFKA_TOPIC=my-topic

# Create AWS MSK Cluster with IAM authentication enabled
# https://docs.aws.amazon.com/msk/latest/developerguide/msk-iam.html

export CLUSTER_ARN="arn:aws:kafka:${AWS_REGION}:${AWS_ACCOUNT_ID}:cluster/${CLUSTER_NAME}/${CLUSTER_UUID}"
export KAFKA_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn ${CLUSTER_ARN} | jq -r ".BootstrapBrokerStringSaslIam")
bundle install
bundle exec ruby example.rb
```

## Key Things to Know

Configure the OAuth bearer token refresh callback, then create a producer / consumer / admin client using the delayed start feature (`native_kafka_auto_start: false`).
Delaying the start allows the client to be registered in the `@clients` hash before the refresh callback first fires, so the callback can look the client up by name.
Finally, start the client with the `start` method.

```ruby
def self.start!(kafka_config)
  Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
  @producer = Rdkafka::Config.new(kafka_config).producer(native_kafka_auto_start: false)
  @clients[@producer.name] = @producer
  @producer.start
end
```

At startup, and every time the token needs to be refreshed, the `refresh_token` callback is invoked with the name of the client.
Look the client up by name in the `@clients` hash, then use `AwsMskIamSaslSigner::MSKTokenProvider` to generate a new token and set it on the client with the `oauthbearer_set_token` method.

```ruby
def refresh_token(client_name)
  print "refreshing token\n"
  signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: ENV['AWS_REGION'])
  token = signer.generate_auth_token

  client = Producer.from_name(client_name)
  client.oauthbearer_set_token(
    token: token.token,
    lifetime_ms: token.expiration_time_ms,
    principal_name: 'kafka-cluster'
  )
end
```
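The name-based lookup that the callback relies on can be sketched without any Kafka dependency. `FakeClient` and `ClientRegistry` below are hypothetical stand-ins for the rdkafka producer and the example's `Producer` module, not part of the gem:

```ruby
# FakeClient stands in for an rdkafka producer; only its name matters here.
FakeClient = Struct.new(:name)

module ClientRegistry
  @clients = {}

  # Register a client under its name so callbacks can find it later.
  def self.register(client)
    @clients[client.name] = client
  end

  # Raise loudly for unknown names instead of refreshing a token on nil.
  def self.from_name(client_name)
    raise "Client not found: #{client_name}" if @clients[client_name].nil?

    @clients[client_name]
  end
end

client = ClientRegistry.register(FakeClient.new("ruby-producer"))
```

Raising in `from_name` surfaces a mis-registered client at the first token refresh, rather than failing silently later.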
33 changes: 33 additions & 0 deletions examples/rdkafka/example.rb
@@ -0,0 +1,33 @@
# frozen_string_literal: true

require "json"
require_relative "producer"

Payload = Data.define(:device_id, :creation_timestamp, :temperature)

KAFKA_TOPIC = ENV.fetch("KAFKA_TOPIC", nil)
KAFKA_BOOTSTRAP_SERVERS = ENV.fetch("KAFKA_BOOTSTRAP_SERVERS", nil)

kafka_config = {
"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "OAUTHBEARER",
"client.id": "ruby-producer"
}

print "starting\n"
Producer.start!(kafka_config)
loop do
  print "producing\n"
  payload = Payload.new(
    device_id: "1234",
    creation_timestamp: Time.now.to_i,
    temperature: rand(0..100)
  )

  Producer.produce(
    topic: KAFKA_TOPIC,
    payload: payload.to_h.to_json # serialize the record's fields as a JSON object
  )
  sleep(10)
end
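The `to_h` step matters when serializing a `Data` value: `to_h.to_json` emits the record's fields as a JSON object, whereas calling `to_json` directly on the bare object (absent a JSON-aware definition) may just serialize its `to_s` string. A quick, dependency-free check of the `to_h.to_json` path:

```ruby
require "json"

# Same shape as the example's payload record (Data requires Ruby >= 3.2).
Payload = Data.define(:device_id, :creation_timestamp, :temperature)

payload = Payload.new(device_id: "1234", creation_timestamp: 1_700_000_000, temperature: 42)

json = payload.to_h.to_json  # a JSON object with the three fields
parsed = JSON.parse(json)    # round-trips back to field names and values

puts json
```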
29 changes: 29 additions & 0 deletions examples/rdkafka/oauth_token_refresher.rb
@@ -0,0 +1,29 @@
# frozen_string_literal: true

require "aws_msk_iam_sasl_signer"
require "rdkafka"

class OAuthTokenRefresher
  def refresh_token(client_name)
    print "refreshing token\n"
    client = Producer.from_name(client_name)
    signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: ENV.fetch("AWS_REGION", nil))
    token = signer.generate_auth_token

    if token
      client.oauthbearer_set_token(
        token: token.token,
        lifetime_ms: token.expiration_time_ms,
        principal_name: "kafka-cluster"
      )
    else
      client.oauthbearer_set_token_failure(
        "Failed to generate token."
      )
    end
  end
end

# rdkafka fires the callback with (config, client_name); the config is unused here
def refresh_token(_config, client_name)
  OAuthTokenRefresher.new.refresh_token(client_name)
end
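The top-level `refresh_token` wrapper matches the two-argument shape this example's callback receives (a config and the client name) and discards the first. A Kafka-free sketch of that dispatch; `FakeRdkafkaConfig` is a hypothetical stand-in for `Rdkafka::Config`, not the real class:

```ruby
# Hypothetical stand-in: a config holder with a settable refresh callback.
class FakeRdkafkaConfig
  class << self
    attr_accessor :oauthbearer_token_refresh_callback
  end

  # Simulate the library firing the callback with (config, client_name).
  def self.fire(client_name)
    oauthbearer_token_refresh_callback.call(nil, client_name)
  end
end

REFRESHED = []

# Same two-argument shape as the wrapper above; real code would set a token.
def refresh_token(_config, client_name)
  REFRESHED << client_name
end

FakeRdkafkaConfig.oauthbearer_token_refresh_callback = method(:refresh_token)
FakeRdkafkaConfig.fire("ruby-producer")
```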
26 changes: 26 additions & 0 deletions examples/rdkafka/producer.rb
@@ -0,0 +1,26 @@
# frozen_string_literal: true

require "rdkafka"
require_relative "oauth_token_refresher"

module Producer
  @clients = {}

  def self.from_name(client_name)
    raise "Client not found: #{client_name}\n" if @clients[client_name].nil?

    @clients[client_name]
  end

  def self.start!(kafka_config)
    Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
    @producer = Rdkafka::Config.new(kafka_config).producer(native_kafka_auto_start: false)
    @clients[@producer.name] = @producer
    @producer.start
  end

  def self.produce(**args)
    handle = @producer.produce(**args)
    handle.wait(max_wait_timeout: 10)
  end
end
6 changes: 6 additions & 0 deletions examples/waterdrop/Gemfile
@@ -0,0 +1,6 @@
source "https://rubygems.org"

gem "aws-msk-iam-sasl-signer"
gem "base64"
gem "rexml", "~> 3.2"
gem "waterdrop"
48 changes: 48 additions & 0 deletions examples/waterdrop/Gemfile.lock
@@ -0,0 +1,48 @@
GEM
  remote: https://rubygems.org/
  specs:
    aws-eventstream (1.3.0)
    aws-msk-iam-sasl-signer (0.1.0)
      aws-sdk-kafka
      thor
    aws-partitions (1.924.0)
    aws-sdk-core (3.194.1)
      aws-eventstream (~> 1, >= 1.3.0)
      aws-partitions (~> 1, >= 1.651.0)
      aws-sigv4 (~> 1.8)
      jmespath (~> 1, >= 1.6.1)
    aws-sdk-kafka (1.70.0)
      aws-sdk-core (~> 3, >= 3.193.0)
      aws-sigv4 (~> 1.1)
    aws-sigv4 (1.8.0)
      aws-eventstream (~> 1, >= 1.0.2)
    base64 (0.2.0)
    ffi (1.16.3)
    jmespath (1.6.2)
    karafka-core (2.4.0)
      karafka-rdkafka (>= 0.15.0, < 0.16.0)
    karafka-rdkafka (0.15.0)
      ffi (~> 1.15)
      mini_portile2 (~> 2.6)
      rake (> 12)
    mini_portile2 (2.8.6)
    rake (13.2.1)
    rexml (3.2.6)
    thor (1.3.1)
    waterdrop (2.7.0)
      karafka-core (>= 2.4.0, < 3.0.0)
      zeitwerk (~> 2.3)
    zeitwerk (2.6.13)

PLATFORMS
  arm64-darwin-22
  ruby

DEPENDENCIES
  aws-msk-iam-sasl-signer
  base64
  rexml (~> 3.2)
  waterdrop

BUNDLED WITH
   2.5.7