Skip to content

Commit 7874835

Browse files
committed
FIX #20
1 parent a51339c commit 7874835

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

README.adoc

+32
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,38 @@ Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275
364364
Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users_clicks - Partition: 1 - Offset: 0 - Key: frank - Value: 1
365365
----
366366
367+
=== Python Consumer
368+
369+
Install python lib _confluent-kafka_:
370+
371+
[source,bash]
372+
----
373+
pip install confluent-kafka
374+
----
375+
376+
Create topic:
377+
378+
[source,bash]
379+
----
380+
kafka-topics --bootstrap-server localhost:9092 --create --topic kafka-topic --replication-factor 1 --partitions 1
381+
----
382+
383+
Run producer:
384+
385+
[source,bash]
386+
----
387+
cd kafka-python-producer
388+
python producer.py
389+
----
390+
391+
Run consumer:
392+
393+
[source,bash]
394+
----
395+
cd kafka-python-consumer
396+
python consumer.py
397+
----
398+
367399
== Admin & Management
368400
369401
=== Kafka CLI Tools

kafka-python-consumer/consumer.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from confluent_kafka import Consumer
2+
3+
conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'pythongroup', 'auto.offset.reset': 'earliest'}
4+
topic = 'test-python'
5+
6+
consumer = Consumer(conf)
7+
8+
consumer.subscribe([topic])
9+
10+
while True:
11+
msg = consumer.poll(1.0)
12+
13+
if msg is None:
14+
continue
15+
if msg.error():
16+
print("Error: {}".format(msg.error()))
17+
continue
18+
19+
print('Message: {}'.format(msg.value().decode('utf-8')))
20+
21+
consumer.close()

kafka-python-producer/producer.py

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ def ack(error, message):
1111
print("offset={}".format(message.offset()))
1212

1313

14-
1514
conf = {'bootstrap.servers': "localhost:9092"}
1615

1716
producer = Producer(conf)

0 commit comments

Comments
 (0)