Skip to content

Commit 2affe05

Browse files
committed
Initial commit
0 parents  commit 2affe05

10 files changed

+450
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.venv/
2+
vector/vector.toml

.vscode/ris_live.code-workspace

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"folders": [
3+
{
4+
"path": ".."
5+
}
6+
],
7+
"settings": {
8+
"python.formatting.provider": "black"
9+
}
10+
}

LICENSE

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2022 uplol
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Ingestor
2+
3+
Example pipeline to ingest [RIS Live](https://ris-live.ripe.net/) BGP messages into [ClickHouse](https://clickhouse.com/) using [Vector](https://vector.dev/).
4+
5+
> **Warning**
6+
> Intended as a simple POC to prove out ingesting RIS Live messages
7+
8+
## Example Pipeline
9+
10+
Set up the tables and materialized view:
11+
12+
`clickhouse-client --queries-file ./sql/clickhouse.sql`
13+
14+
Ingest and process messages:
15+
16+
`python ris_live.py | vector --config-toml ./vector/vector.toml`

poetry.lock

+253
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

poetry.toml

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[virtualenvs]
2+
in-project = true

pyproject.toml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Poetry project metadata for the RIS Live ingestor.
[tool.poetry]
2+
name = "ris-live"
3+
version = "0.1.0"
4+
description = "Ingest RIS Live BGP messages"
5+
authors = []
6+
license = "MIT"
7+
readme = "README.md"
8+
packages = [{include = "ris_live"}]
9+
10+
# Runtime dependencies.
# NOTE(review): asyncio ships with the Python standard library for the declared
# python = "^3.10"; confirm whether the separate `asyncio` dependency is needed.
[tool.poetry.dependencies]
11+
python = "^3.10"
12+
asyncio = "^3.4.3"
13+
websockets = "^10.3"
14+
orjson = "^3.8.0"
15+
16+
# Development-only dependencies (code formatter).
[tool.poetry.group.dev.dependencies]
17+
black = "^22.10.0"
18+
19+
# Build backend used to build the project.
[build-system]
20+
requires = ["poetry-core"]
21+
build-backend = "poetry.core.masonry.api"

ris_live.py

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import asyncio
2+
import websockets
3+
import orjson
4+
5+
6+
async def main():
    """Stream RIS Live messages and hand each one to ``process``.

    Opens a websocket to the RIS Live endpoint, sends a ``ris_subscribe``
    request, and awaits ``process`` for every message received.

    ``websockets.connect`` is iterated with ``async for``, so each pass of the
    outer loop works with a fresh connection. When the connection is closed,
    whether while subscribing or while receiving, the loop moves on to the
    next connection and subscribes again.
    """
    # The subscription request never changes, so serialize it once.
    subscription = orjson.dumps(
        {
            "type": "ris_subscribe",
            "data": {
                "moreSpecific": True,
                "socketOptions": {"includeRaw": False},
            },
        }
    )

    async for websocket in websockets.connect("wss://ris-live.ripe.net/v1/ws"):
        try:
            # Subscribe to everything. This sits inside the try block so that
            # a connection dropped during the subscribe also leads to a
            # reconnect instead of ending the program.
            await websocket.send(subscription)

            # Blindly process forever!
            async for message in websocket:
                await process(message)
        except websockets.ConnectionClosed:
            continue
28+
29+
30+
async def process(message):
    """Normalize one RIS Live message and print it to stdout as JSON.

    ``data.path`` may contain mixed element types, Array(UInt32 | Array(UInt32))
    (see https://ris-live.ripe.net/manual/). Every nested array (an AS_SET) is
    removed from ``data.path`` and its members are collected, in order, into
    ``data.as_set`` so that both fields are flat lists. ``data.as_set`` is only
    added when ``data.path`` contained at least one nested array.

    Args:
        message: A JSON document (``str`` or ``bytes``) whose top-level object
            has a ``data`` member.

    Raises:
        orjson.JSONDecodeError: If ``message`` is not valid JSON.
        KeyError: If the decoded message has no ``data`` member.
    """
    data = orjson.loads(message)
    payload = data["data"]

    if "path" in payload:
        # Build new lists rather than popping from the list being iterated:
        # removing an element mid-iteration shifts the remaining ones and the
        # element that follows a removed AS_SET would be skipped.
        flat_path = []
        as_set = []
        for hop in payload["path"]:
            if isinstance(hop, list):
                as_set.extend(hop)
            else:
                flat_path.append(hop)

        # A shorter flat path means at least one AS_SET was pulled out.
        if len(flat_path) != len(payload["path"]):
            payload["path"] = flat_path
            payload["as_set"] = as_set

    # print to stdout to pipe into whatever
    print(orjson.dumps(data).decode("utf-8"))
44+
45+
46+
if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs main() and closes the loop
    # when main() finishes or raises, so no manual loop management is needed.
    asyncio.run(main())

sql/clickhouse.sql

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
-- Database holding the queue table, the destination table and the
-- materialized view defined in this script.
create database if not exists bgp;
2+
3+
-- Ingest table with a single String column `data`. The materialized view
-- bgp.ris_live_mv defined in this script selects from this table.
-- NOTE(review): with ENGINE = Null the rows themselves are presumably not
-- stored here; confirm against the ClickHouse Null engine documentation.
create table if not exists bgp.ris_live_queue
4+
(
5+
data String
6+
)
7+
ENGINE = Null;
8+
9+
-- Destination table for parsed RIS Live messages; it is the target of the
-- bgp.ris_live_mv materialized view.
create table if not exists bgp.ris_live
(
    aggregator Nullable(String),
    announcements Nested(
        next_hop String,
        prefixes Array(String)
    ),
    -- Unsigned to match `path` and the Array(UInt32) cast applied by the
    -- materialized view; a signed Int32 cannot hold values above 2147483647.
    as_set Array(UInt32),
    community Array(Array(Nullable(UInt32))),
    host String,
    id String,
    med Nullable(UInt32),
    origin Nullable(String),
    path Array(Nullable(UInt32)),
    peer String,
    peer_asn String,
    timestamp DateTime64,
    type LowCardinality(String),
    withdrawals Array(Nullable(String))
)
ENGINE = MergeTree()
ORDER BY (timestamp, id);
31+
32+
-- Materialized view that reads the JSON string in `data` from
-- bgp.ris_live_queue, extracts the individual fields with the JSONExtract*
-- functions and writes them to bgp.ris_live.
-- `announcements_` is an intermediate alias holding the raw announcement
-- objects; the two arrayMap expressions after it derive the nested
-- `announcements.next_hop` and `announcements.prefixes` columns from it.
-- NOTE(review): `timestamp` is extracted with JSONExtractString while the
-- target column is DateTime64; confirm the incoming value is a JSON string and
-- converts as intended.
create materialized view if not exists bgp.ris_live_mv to bgp.ris_live
33+
as select
34+
JSONExtractArrayRaw(data, 'announcements') as announcements_,
35+
arrayMap(c -> (JSONExtractString(c, 'next_hop')), announcements_) as `announcements.next_hop`,
36+
arrayMap(c -> (JSONExtractArrayRaw(c, 'prefixes')), announcements_) as `announcements.prefixes`,
37+
JSONExtractString(data, 'aggregator') as aggregator,
38+
JSONExtractArrayRaw(data, 'as_set')::Array(UInt32) as as_set,
39+
arrayMap(c -> (c::Array(UInt32)), JSONExtractArrayRaw(data, 'community')) as community,
40+
JSONExtractString(data, 'host') as host,
41+
JSONExtractString(data, 'id') as id,
42+
JSONExtractUInt(data, 'med') as med,
43+
JSONExtractString(data, 'origin') as origin,
44+
JSONExtractArrayRaw(data, 'path')::Array(UInt32) as path,
45+
JSONExtractString(data, 'peer') as peer,
46+
JSONExtractString(data, 'peer_asn') as peer_asn,
47+
JSONExtractString(data, 'timestamp') as timestamp,
48+
JSONExtractString(data, 'type') as type,
49+
JSONExtractArrayRaw(data, 'withdrawals') as withdrawals
50+
from bgp.ris_live_queue;

vector/vector.toml.example

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Source: read events from standard input and decode them as JSON.
[sources.stdin]
2+
type = "stdin"
3+
decoding.codec = "json"
4+
5+
# Filter: keep only events whose .data.type equals "UPDATE".
[transforms.update_only]
6+
type = "filter"
7+
inputs = ["stdin"]
8+
condition = '.data.type == "UPDATE"'
9+
10+
# Remap: replace the whole event with the contents of its .data field.
[transforms.move_up]
11+
type = "remap"
12+
inputs = ["update_only"]
13+
source = """
14+
. = .data
15+
"""
16+
17+
# Sink: send the events as JSON over HTTP. The query string in `uri` is the
# URL-encoded statement
#   INSERT INTO bgp.ris_live_queue (data) FORMAT JSONAsString
# auth.user / auth.password look like placeholders to replace in a local copy
# of this file (verify before use). request.retry_attempts = 0 presumably
# means failed requests are not retried; confirm against the Vector docs.
[sinks.clickhouse_http]
18+
type = "http"
19+
inputs = ["move_up"]
20+
encoding.codec = "json"
21+
uri = "http://localhost:8123/?query=INSERT%20INTO%20bgp.ris_live_queue%20%28data%29%20FORMAT%20JSONAsString"
22+
auth.strategy = "basic"
23+
auth.user = "USERNAME"
24+
auth.password = "PASSWORD"
25+
request.concurrency = "adaptive"
26+
request.retry_attempts = 0

0 commit comments

Comments
 (0)