Skip to content

Commit 8fd15cb

Browse files
committed
rebuild table
1 parent 88b8d14 commit 8fd15cb

File tree

5 files changed

+20
-11
lines changed

5 files changed

+20
-11
lines changed

.env

-6
This file was deleted.

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
*.log
2+
.env
23
*.pyc
34
.cache/
45
.coverage

unimeta/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.0.12'
1+
__version__ = '0.0.14'

unimeta/mysql2ch.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ def sync() -> None:
1212
source = MysqlSource(database_url=mysql_url)
1313
sink = KafkaSink(database_url = kafka_url)
1414
pipe = Pipeline(source, sink)
15-
pipe.sync_tables()
16-
pipe.sync()
15+
pipe.rebuild_table('employees')
16+
#pipe.sync()
1717

1818
if __name__ == "__main__":
1919
sync()

unimeta/pipeline.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import asyncio
1515
import json
16+
import random
1617
from confluent_kafka import Producer
1718
from loguru import logger
1819

@@ -34,7 +35,9 @@ class MysqlSource(Source):
3435
metatable:Dict[str,Table]
3536
stream: BinLogStreamReader
3637

37-
def __init__(self,database_url):
38+
def __init__(self,database_url,server_id:int=None):
39+
if server_id is None:
40+
server_id = random.randint(1,1000)
3841
Source.__init__(self)
3942
settings = parse_url(database_url)
4043
settings['db'] = settings['name']
@@ -128,10 +131,21 @@ def __init__(self,source, sink):
128131
def sync_tables(self):
129132
tables = self.source.metatable.values()
130133
for table in tables:
134+
print(table.name)
131135
ddl = table.get_ch_ddl()
132136
if ddl is not None:
133137
self.sink.execute(ddl)
134-
138+
139+
def rebuild_table(self, table):
140+
source_tables = self.source.metatable.values()
141+
for item in source_tables:
142+
if table == item.name:
143+
delete_stmt = "drop table {db}.{table}".format(db=item.db_name,table=table)
144+
create_stmt = item.get_ch_ddl()
145+
self.sink.execute(delete_stmt)
146+
self.sink.execute(create_stmt)
147+
148+
135149
def sync(self):
136150
for event in self.source.subscribe():
137151
self.sink.publish(event)

0 commit comments

Comments
 (0)