File tree 2 files changed +8
-16
lines changed
2 files changed +8
-16
lines changed Original file line number Diff line number Diff line change @@ -32,15 +32,14 @@ async def process(message):
32
32
data = orjson .loads (message )
33
33
34
34
# as `path` may contain mixed data types Array(UInt32 | Array(Uint32)) (see https://ris-live.ripe.net/manual/)
35
- # check `path`` for AS_SET (the array in the array) and pop it into it's own key
36
- if "path" in data ["data" ]:
37
- for idx , i in enumerate (data ["data" ]["path" ]):
38
- if type (i ) == type (list ()):
39
- data ["data" ]["path" ].pop (int (idx ))
40
- data ["data" ]["as_set" ] = i
35
+ # check `path` for AS_SET (the array in the array) and pop it into it's own key
36
+ if "path" in data ["data" ] and data ["data" ]["path" ]:
37
+ if type (data ["data" ]["path" ][- 1 ]) == type (list ()):
38
+ data ["data" ]["as_set" ] = data ["data" ]["path" ][- 1 ].copy ()
39
+ data ["data" ]["path" ].pop ()
41
40
42
41
# print and pipe into whatever
43
- print (orjson .dumps (data ).decode ("utf-8" ))
42
+ print (orjson .dumps (data [ "data" ] ).decode ("utf-8" ))
44
43
45
44
46
45
if __name__ == "__main__" :
Original file line number Diff line number Diff line change @@ -5,18 +5,11 @@ decoding.codec = "json"
5
5
[transforms.update_only]
6
6
type = "filter"
7
7
inputs = ["stdin"]
8
- condition = '.data.type == "UPDATE"'
9
-
10
- [transforms.move_up]
11
- type = "remap"
12
- inputs = ["update_only"]
13
- source = """
14
- . = .data
15
- """
8
+ condition = '.type == "UPDATE"'
16
9
17
10
[sinks.clickhouse_http]
18
11
type = "http"
19
- inputs = ["move_up "]
12
+ inputs = ["update_only "]
20
13
encoding.codec = "json"
21
14
uri = "http://localhost:8123/?query=INSERT%20INTO%20bgp.ris_live_queue%20%28data%29%20FORMAT%20JSONAsString"
22
15
auth.strategy = "basic"
You can’t perform that action at this time.
0 commit comments