Skip to content

Commit

Permalink
Remove cleaning step from indexer pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Jan 8, 2025
1 parent b982bcf commit 4995e8a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 76 deletions.
64 changes: 0 additions & 64 deletions indexers/scripts/clean_parquet.py

This file was deleted.

21 changes: 12 additions & 9 deletions indexers/scripts/import_parquet.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import argparse
import os
import time
from pathlib import Path
import clickhouse_connect
from utils import create_table_from_schema, insert_data_from_path
from utils import create_table_from_schema, insert_data_from_path, get_event_list_from_file_names

# Path at which ClickHouse's file() table function sees the raw parquet data
# (server-side view of the same tree that is mounted at DATA_PATH).
CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/raw"
# Host-side mount of the raw indexer output that this script scans.
DATA_PATH = "/parquet-data/indexers/raw"
# Table schema files; presumably consumed by init_tables_from_schemas — confirm.
SCHEMAS_PATH = "/parquet-data/indexers/schemas"


Expand All @@ -24,17 +25,19 @@ def init_tables_from_schemas(client, network_name: str, protocol_name: str):

def import_parquet_files(client, network_name: str, protocol_name: str):
    """Insert raw parquet files for one network/protocol into ClickHouse.

    Event names are discovered from the parquet file stems under the raw
    data directory; each event gets inserted into its own table.

    Args:
        client: clickhouse_connect client used to run the insert queries.
        network_name: network identifier; selects the ``raw_<network>`` database.
        protocol_name: protocol identifier; prefixes every table name.
    """
    print(f"Inserting {network_name} {protocol_name} data into tables")
    raw_path = Path(f"{DATA_PATH}/{network_name}/{protocol_name}")
    db_name = f"raw_{network_name}"

    # NOTE(review): the helper ignores its first (`root`) argument, so
    # raw_path is passed for both `root` and `data_path` — confirm the
    # intended signature; also note it receives a Path, not a str.
    event_list = get_event_list_from_file_names(raw_path, network_name, raw_path)

    time_start = time.time()
    for event_name in event_list:
        table_name = f"{protocol_name}_{event_name}"
        # Raw layout appears to be .../<protocol>/<subdir>/<event>.parquet,
        # hence the wildcard on the directory level — TODO confirm.
        file_path = (
            f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/"
            f"{protocol_name}/*/{event_name}.parquet"
        )
        insert_data_from_path(client, db_name, table_name, file_path)
    time_end = time.time()
    print(f"Time taken: {time_end - time_start} seconds")


if __name__ == "__main__":
Expand Down
25 changes: 22 additions & 3 deletions indexers/scripts/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import re
from pathlib import Path
from clickhouse_connect.driver.client import Client


def get_event_list_from_file_names(root: str, network: str, data_path: str) -> set[str]:
    """Collect the distinct event names found under *data_path*.

    Every ``*.parquet`` file discovered recursively contributes its stem
    (file name without the suffix) as one event name.

    Args:
        root: unused; kept only for signature compatibility with callers.
        network: network label, used only in the progress message.
        data_path: directory tree to scan for parquet files.

    Returns:
        The set of unique event names.
    """
    events = {parquet_file.stem for parquet_file in Path(data_path).rglob("*.parquet")}
    print(f"Found {len(events)} events for {network}")
    return events


def create_table_from_schema(client: Client, path: str):
try:
with open(path, "r") as file:
Expand All @@ -16,10 +27,18 @@ def create_table_from_schema(client: Client, path: str):


def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):
    """Insert parquet data at *path* into ``db_name.table_name``.

    The parquet column names are first read via ``describe file(...)`` and
    each one is aliased through ``convert_case`` in the select list, so the
    insert matches the table's column naming convention.

    Args:
        client: ClickHouse client used for both the describe and the insert.
        db_name: target database name.
        table_name: target table name.
        path: ``file()`` path or glob, relative to ClickHouse's user_files dir.
    """
    columns_query = f"describe file('{path}', 'Parquet')"
    try:
        # Discover the parquet column names so each can be renamed;
        # convert_case presumably normalizes naming (e.g. to snake_case)
        # — confirm against its definition.
        columns = client.query(columns_query).named_results()
        column_mappings = [
            f"{col['name']} as {convert_case(col['name'])}" for col in columns
        ]
        select_expr = ", ".join(column_mappings)
        query = (
            f"insert into {db_name}.{table_name} "
            f"select {select_expr} from file('{path}', 'Parquet')"
        )
        client.command(query)
    except Exception as e:
        # Best-effort: log and continue so one bad file doesn't abort the run.
        print(f"Error inserting data into {db_name}.{table_name}: {e}")
Expand Down

0 comments on commit 4995e8a

Please sign in to comment.