-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathproducer.py
128 lines (102 loc) · 3.64 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright 2025 Google LLC All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import datetime
import json
import random
import time
import urllib3
import google.auth
import google.auth.transport.urllib3
import confluent_kafka
# Token Provider class
# This class handles the OAuth token retrieval and formatting
class TokenProvider(object):
def __init__(self):
self.credentials = google.auth.default()
self.http_client = urllib3.PoolManager()
self.HEADER = json.dumps(dict(typ="JWT", alg="GOOG_OAUTH2_TOKEN"))
def valid_credentials(self):
if not self.credentials.valid:
self.credentials.refresh(
google.auth.transport.urllib3.Request(self.http_client))
return self.credentials
def get_jwt(self, creds):
return json.dumps(
dict(
exp=creds.expiry.timestamp(),
iss="Google",
iat=datetime.datetime.now(datetime.timezone.utc).timestamp(),
scope="kafka",
sub=creds.service_account_email,
))
def b64_encode(self, source):
return (base64.urlsafe_b64encode(
source.encode("utf-8")).decode("utf-8").rstrip("="))
def get_kafka_access_token(self, creds):
return ".".join([
self.b64_encode(self.HEADER),
self.b64_encode(self.get_jwt(creds)),
self.b64_encode(creds.token),
])
def token(self):
creds = self.valid_credentials()
return self.get_kafka_access_token(creds)
def confluent_token(self):
creds = self.valid_credentials()
utc_expiry = creds.expiry.replace(tzinfo=datetime.timezone.utc)
expiry_seconds = (
utc_expiry -
datetime.datetime.now(datetime.timezone.utc)).total_seconds()
return self.get_kafka_access_token(creds), time.time() + expiry_seconds
# Confluent does not use a TokenProvider object
# It calls a method
def make_token(args):
"""Method to get the Token"""
t = TokenProvider()
token = t.confluent_token()
return token
# TODO (for the user): Update Following Variables
kafka_cluster_name = "CLUSTER_ID"
region = "us-central1"
project_id = "PROJECT_ID"
port = "9092"
kafka_topic_name = "example-topic"
# Kafka Producer configuration with OAUTHBEARER authentication
config = {
"bootstrap.servers":
f"bootstrap.{kafka_cluster_name}.{region}.managedkafka.{project_id}.cloud.goog:{port}",
"security.protocol":
"SASL_SSL",
"sasl.mechanisms":
"OAUTHBEARER",
"oauth_cb":
make_token,
}
producer = confluent_kafka.Producer(config)
# Produce and submit 10 messages
for i in range(10):
# Generate a random message
now = datetime.datetime.now()
datetime_string = now.strftime("%Y-%m-%d %H:%M:%S")
message_data = {
"random_id": random.randint(1, 100),
"date_time": datetime_string
}
# Serialize data to bytes
serialized_data = json.dumps(message_data).encode("utf-8")
# Produce the message
producer.produce(kafka_topic_name, serialized_data)
print(f"Produced {i+1} messages")
producer.flush()