18
18
import requests
19
19
from concurrent .futures import ProcessPoolExecutor
20
20
import aiohttp
21
- from aiokafka import AIOKafkaProducer
21
+ from aiokafka import AIOKafkaProducer , AIOKafkaConsumer
22
22
from aioch import Client
23
23
24
24
@@ -102,6 +102,7 @@ async def publish(self, event):
102
102
columns = "," .join (columns ))
103
103
# (fragment of ClickHouseSink.publish — the surrounding method starts
# outside this hunk, where `sql` and `columns` are built from the event)
try:
    # Insert the event payload as a single row using the pre-built SQL.
    await self.client.execute(sql, [event.data])
    logger.success("{sql} insert clickhouse".format(sql=sql))
except:
    # NOTE(review): bare `except:` also swallows KeyboardInterrupt and
    # SystemExit; narrow to `except Exception:` once the method's full
    # context is available — left unchanged here (incomplete fragment).
    logger.error(event.data)
    logger.exception("what?")
@@ -118,13 +119,13 @@ def delivery_report(err, msg):
118
119
class KafkaSink(Sink):
    # Sink that forwards pipeline events to a Kafka topic.
    # (Class continues beyond this hunk — its publish() is elsewhere.)

    def __init__(self, database_url):
        # database_url: kafka connection URL, e.g. kafka://host:port/topic;
        # parse_url() (project helper) splits it into host/port/name parts.
        Sink.__init__(self)
        settings = parse_url(database_url)
        loop = asyncio.get_event_loop()
        # Kafka bootstrap address assembled from the URL's host and port.
        bootstrap = '{host}:{port}'.format(host=settings['host'], port=settings['port'])
        self.producer = AIOKafkaProducer(
            loop=loop,
            bootstrap_servers=bootstrap
        )
        # NOTE(review): AIOKafkaProducer.start() is a coroutine — calling it
        # here without awaiting only creates an un-awaited coroutine object
        # and never actually starts the producer. Presumably an async
        # start() defined elsewhere in this class is what sync() awaits via
        # `await self.sink.start()` — confirm; otherwise this line is a bug.
        self.producer.start()
        # The URL path component doubles as the destination topic name.
        self.topic = settings['name']
@@ -142,6 +143,31 @@ async def publish(self, event):
142
143
143
144
#logger.success("publish {event} success".format(event=str(event)))
144
145
146
class KafkaSource(Source):
    """Event source that consumes messages from a Kafka topic.

    The connection URL is expected in the form
    ``kafka://[user@]host:port/topic``: the path is the topic to consume
    and the user component (if present) is used as the consumer group id.
    """

    def __init__(self, database_url):
        Source.__init__(self)
        settings = parse_url(database_url)
        name = settings['name']  # topic to subscribe to
        user = settings['user']
        if user is None:
            # No user in the URL — fall back to a conventional group id.
            user = "default"
        loop = asyncio.get_event_loop()
        bootstrap = '{host}:{port}'.format(host=settings['host'], port=settings['port'])
        self.consumer = AIOKafkaConsumer(
            name, loop=loop,
            bootstrap_servers=bootstrap,
            group_id=user
        )

    async def start(self):
        """Connect the consumer; must be awaited before subscribe()."""
        await self.consumer.start()

    async def subscribe(self):
        """Yield consumed messages as an async generator (runs until the
        consumer is stopped).

        Fix: removed a leftover ``print(msg)`` debug statement that wrote
        every consumed message to stdout.
        """
        async for msg in self.consumer:
            yield msg
145
171
146
172
147
173
class MetaServer ():
@@ -186,6 +212,8 @@ def __init__(self,source:str, sink:str, meta:str):
186
212
# (fragment of the pipeline __init__(source, sink, meta) — only this
# dispatch run is visible in the hunk)
# Choose the concrete Source implementation from the URL scheme.
sconf = parse_url(source)
if sconf['scheme'] == 'mysql':
    self.source = MysqlSource(source)
elif sconf['scheme'] == 'kafka':
    self.source = KafkaSource(source)
else:
    # NOTE(review): the message reads "unregister source" — probably meant
    # "unregistered source"; left unchanged because it is runtime text.
    raise Exception("unregister source")
191
219
@@ -223,5 +251,7 @@ def rebuild_table(self, table):
223
251
async def sync(self):
    """Pump events from the source into the sink.

    Starts the sink, then for every event produced by the source's
    async-generator ``subscribe()``: registers the event with the meta
    server, then publishes it to the sink.

    Fix: removed the leftover ``debug(event); continue`` pair — the
    ``continue`` made ``metaserver.reg`` and ``sink.publish`` unreachable,
    so the pipeline consumed events but silently dropped all of them
    (and ``debug`` is not defined in this module).
    """
    await self.sink.start()
    async for event in self.source.subscribe():
        await self.metaserver.reg(event)
        await self.sink.publish(event)
0 commit comments