-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraphql.py
140 lines (121 loc) · 3.89 KB
/
graphql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
import asyncio
import json
import time

from aiokafka import AIOKafkaConsumer
from tartiflette import Resolver
from tartiflette import Scalar
from tartiflette import Subscription
from tartiflette.language.ast import StringValueNode
from tartiflette_starlette import GraphiQL
from tartiflette_starlette import Subscriptions
from tartiflette_starlette import TartifletteApp

import config
from util import debezium_deserializer
from util import utf8_deserializer
# define the graphql sdl
sdl = """
scalar JSON
type Query {
_: Boolean
}
type Subscription {
kafka(topics: [String], from: Int): JSON
sample(topic: String!, rate: Float): String
topics: JSON
}
"""
# Custom GraphQL scalar carrying arbitrary JSON values.
@Scalar("JSON")
class ScalarJSON:
    def coerce_output(self, value):
        """Return the resolved value unchanged; the HTTP layer serializes it."""
        return value

    def coerce_input(self, value):
        """Parse a JSON-encoded variable value into Python objects."""
        return json.loads(value)

    def parse_literal(self, ast):
        """Parse an inline literal from the query document.

        Only string literals are accepted; their content must be JSON text.
        """
        if isinstance(ast, StringValueNode):
            # BUG FIX: json.loads needs the node's string content
            # (ast.value), not the AST node object itself.
            return json.loads(ast.value)
        return "UNDEFINED_VALUE"
# Subscription allowing a client to stream messages from any Kafka topics.
@Subscription("Subscription.kafka")
async def on_kafka(parent, args, context, info):
    """Yield deserialized messages from the requested topics.

    GraphQL args:
        topics — list of topic names (nullable; treated as empty when null).
        from   — optional Unix timestamp in seconds to start reading from;
                 defaults to "now".

    Any failure is reported to the subscriber as a repr() string rather
    than silently closing the stream.
    """
    consumer = AIOKafkaConsumer(
        # Nullable list arg may arrive as None — treat as no topics.
        *(args.get("topics") or []),
        bootstrap_servers=config.BOOTSTRAP_SERVERS,
        loop=asyncio.get_running_loop(),
        value_deserializer=debezium_deserializer,
    )
    try:
        await consumer.start()
        # GraphQL passes an omitted nullable "from" as None, which
        # args.get(..., default) does NOT replace — `or` covers both
        # "missing" and "null". Broker timestamps are milliseconds.
        start_from = (args.get("from") or int(time.time())) * 1000
        partitions = consumer.assignment()
        partition_times = {partition: start_from for partition in partitions}
        partition_offsets = await consumer.offsets_for_times(partition_times)
        for partition in partitions:
            # offsets_for_times returns None for partitions with no
            # message at/after the timestamp — skip those.
            offset_meta = partition_offsets.get(partition)
            if offset_meta:
                consumer.seek(partition, offset_meta.offset)
        async for msg in consumer:
            yield msg.value
    except Exception as e:
        # Surface the error to the client instead of dropping the stream.
        yield repr(e)
    finally:
        await consumer.stop()
# Subscription that pushes the broker's topic list whenever it changes.
@Subscription("Subscription.topics")
async def on_topics(parent, args, context, info):
    """Poll the broker every 10 seconds; yield the topic list on change.

    Errors are reported to the subscriber as a repr() string.
    """
    consumer = AIOKafkaConsumer(
        bootstrap_servers=config.BOOTSTRAP_SERVERS,
        loop=asyncio.get_running_loop(),
        value_deserializer=debezium_deserializer,
    )
    topics = []
    try:
        # start() inside try so a failed connect still reaches stop().
        await consumer.start()
        while True:
            # topics() returns a set with no stable order; sort so the
            # change check below doesn't fire on ordering alone.
            new_topics = sorted(await consumer.topics())
            if topics != new_topics:
                topics = new_topics
                yield topics
            await asyncio.sleep(10)
    except Exception as e:
        yield repr(e)
    finally:
        await consumer.stop()
# Subscription that periodically samples the newest message of one topic.
@Subscription("Subscription.sample")
async def on_sample(parent, args, context, info):
    """Yield the latest message of `topic` roughly `rate` times per second.

    GraphQL args:
        topic — topic name (required).
        rate  — samples per second (nullable; defaults to 1, capped at 1000).

    Errors are reported to the subscriber as a repr() string.
    """
    from secrets import token_urlsafe

    topic = args["topic"]
    consumer = AIOKafkaConsumer(
        topic,
        # Random group id so concurrent subscribers don't share offsets.
        group_id="kafka-graphql-bridge" + token_urlsafe(16),
        bootstrap_servers=config.BOOTSTRAP_SERVERS,
        loop=asyncio.get_running_loop(),
        value_deserializer=utf8_deserializer,
    )
    # Nullable "rate" may arrive as None; clamp away 0/negative values
    # that would otherwise raise ZeroDivisionError below.
    rate = args.get("rate") or 1.0
    delay = 1 / min(max(rate, 0.001), 1000)
    # Track the last-seen end offset PER PARTITION — a single shared
    # value mis-compares when the topic has more than one partition.
    previous_offsets = {}
    try:
        # start() inside try so a failed connect still reaches stop().
        await consumer.start()
        while True:
            partitions = consumer.assignment()
            end_offsets = await consumer.end_offsets(partitions)
            for partition, offset in end_offsets.items():
                if offset and previous_offsets.get(partition) != offset:
                    # Seek to the last message and read just that one.
                    consumer.seek(partition, max(0, offset - 1))
                    sample = await consumer.getone()
                    previous_offsets[partition] = offset
                    yield sample.value
            await asyncio.sleep(delay)
    except Exception as e:
        yield repr(e)
    finally:
        await consumer.stop()
# ASGI application serving the schema; the GraphiQL playground (with a
# canned example subscription) is mounted only in debug mode.
if config.DEBUG:
    _graphiql = GraphiQL(
        default_query="""
subscription {
  kafka(topics: ["mysql1.inventory.customers"]),
}
"""
    )
else:
    _graphiql = None

graphql_app = TartifletteApp(
    sdl=sdl,
    subscriptions=True,
    graphiql=_graphiql,
)