feat(tracing) Change the return type of the native execute #2279

Merged
merged 6 commits on Dec 16, 2021
2 changes: 1 addition & 1 deletion snuba/cleanup.py
@@ -44,7 +44,7 @@ def get_active_partitions(
assert isinstance(schema, TableSchema)
part_format = schema.get_part_format()
assert part_format is not None
return [util.decode_part_str(part, part_format) for part, in response]
return [util.decode_part_str(part, part_format) for part, in response.results]


def filter_stale_partitions(
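The change above is the call-site pattern repeated throughout this PR: `execute()` now returns the `ClickhouseResult` wrapper introduced in `snuba/clickhouse/native.py` below, so callers read row tuples from its `results` attribute instead of iterating the return value directly. A minimal sketch of the before/after pattern (illustrative values, not taken from the diff):

```python
from snuba.clickhouse.native import ClickhouseResult

# Stand-in for what ClickhousePool.execute() returns after this PR;
# the partition strings here are hypothetical.
response = ClickhouseResult(results=[("(90,'2021-12-01')",), ("(90,'2021-12-08')",)])

# Before: `for part, in response: ...` iterated the raw driver sequence.
# After: iterate the wrapped rows via `.results`.
for (part,) in response.results:
    print(part)
```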
38 changes: 30 additions & 8 deletions snuba/clickhouse/native.py
@@ -1,7 +1,10 @@
from __future__ import annotations

import logging
import queue
import re
import time
from dataclasses import dataclass, field
from datetime import date, datetime
from typing import Any, Mapping, Optional, Sequence, Union
from uuid import UUID
@@ -23,6 +26,12 @@
metrics = MetricsWrapper(environment.metrics, "clickhouse.native")


@dataclass(frozen=True)
class ClickhouseResult:
results: Sequence[Any] = field(default_factory=list)
meta: Sequence[Any] | None = None


class ClickhousePool(object):
def __init__(
self,
@@ -63,7 +72,7 @@ def execute(
settings: Optional[Mapping[str, Any]] = None,
types_check: bool = False,
columnar: bool = False,
) -> Sequence[Any]:
) -> ClickhouseResult:
"""
Execute a clickhouse query with a single quick retry in case of
connection failure.
@@ -84,7 +93,7 @@ def execute(
conn = self._create_conn()

try:
result: Sequence[Any] = conn.execute(
result_data: Sequence[Any] = conn.execute(
query,
params=params,
with_column_types=with_column_types,
@@ -93,6 +102,15 @@
types_check=types_check,
columnar=columnar,
)
if with_column_types:
result = ClickhouseResult(
results=result_data[0], meta=result_data[1],
)
else:
if not isinstance(result_data, (list, tuple)):
result_data = [result_data]
result = ClickhouseResult(results=result_data)

return result
except (errors.NetworkError, errors.SocketTimeoutError, EOFError) as e:
metrics.increment("connection_error")
@@ -113,7 +131,7 @@ def execute(
finally:
self.pool.put(conn, block=False)

return []
return ClickhouseResult()

def execute_robust(
self,
@@ -124,7 +142,7 @@ def execute_robust(
settings: Optional[Mapping[str, Any]] = None,
types_check: bool = False,
columnar: bool = False,
) -> Sequence[Any]:
) -> ClickhouseResult:
"""
Execute a clickhouse query with a bit more tenacity. Make more retry
attempts, (infinite in the case of too many simultaneous queries
@@ -244,13 +262,13 @@ class NativeDriverReader(Reader):
def __init__(self, client: ClickhousePool) -> None:
self.__client = client

def __transform_result(self, result: Sequence[Any], with_totals: bool) -> Result:
def __transform_result(self, result: ClickhouseResult, with_totals: bool) -> Result:
"""
Transform a native driver response into a response that is
structurally similar to a ClickHouse-flavored JSON response.
"""
data, meta = result

meta = result.meta if result.meta is not None else []
data = result.results
# XXX: Rows are represented as mappings that are keyed by column or
# alias, which is problematic when the result set contains duplicate
# names. To ensure that the column headers and row data are consistent
@@ -269,7 +287,11 @@ def __transform_result(self, result: Sequence[Any], with_totals: bool) -> Result
if with_totals:
assert len(data) > 0
totals = data.pop(-1)
new_result = {"data": data, "meta": meta, "totals": totals}
new_result = {
"data": data,
"meta": meta,
"totals": totals,
}
else:
new_result = {"data": data, "meta": meta}

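A short sketch of the wrapping behaviour added to `execute()` above, for the two shapes the native driver can hand back (example values are assumed; only the `ClickhouseResult` fields come from the diff):

```python
from snuba.clickhouse.native import ClickhouseResult

# With with_column_types=True the driver returns a (rows, column_meta) pair,
# which execute() now splits across the two fields of the wrapper.
rows = [(1, "a"), (2, "b")]
meta = [("id", "UInt64"), ("name", "String")]  # hypothetical column metadata
with_types = ClickhouseResult(results=rows, meta=meta)
assert with_types.results == rows and with_types.meta == meta

# Without column types the rows are wrapped directly and meta stays None,
# which __transform_result() later normalizes to an empty list.
plain = ClickhouseResult(results=[(1,)])
assert plain.meta is None
```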
2 changes: 1 addition & 1 deletion snuba/clusters/cluster.py
@@ -295,7 +295,7 @@ def __get_cluster_nodes(self, cluster_name: str) -> Sequence[ClickhouseNode]:
ClickhouseClientSettings.QUERY
).execute(
f"select host_name, port, shard_num, replica_num from system.clusters where cluster={escape_string(cluster_name)}"
)
).results
]


2 changes: 1 addition & 1 deletion snuba/migrations/connect.py
@@ -46,7 +46,7 @@ def check_clickhouse_connections() -> None:


def check_clickhouse(clickhouse: ClickhousePool) -> None:
ver = clickhouse.execute("SELECT version()")[0][0]
ver = clickhouse.execute("SELECT version()").results[0][0]
if version.parse(ver) < version.parse(CLICKHOUSE_SERVER_MIN_VERSION):
raise InvalidClickhouseVersion(
f"Snuba requires Clickhouse version {CLICKHOUSE_SERVER_MIN_VERSION}"
2 changes: 1 addition & 1 deletion snuba/migrations/parse_schema.py
@@ -220,6 +220,6 @@ def get_local_schema(
return {
column_name: _get_column(column_type, default_type, default_expr, codec_expr)
for column_name, column_type, default_type, default_expr, _comment, codec_expr in [
cols[:6] for cols in conn.execute("DESCRIBE TABLE %s" % table_name)
cols[:6] for cols in conn.execute("DESCRIBE TABLE %s" % table_name).results
]
}
6 changes: 3 additions & 3 deletions snuba/migrations/runner.py
@@ -90,7 +90,7 @@ def get_status(
"group": migration_key.group.value,
"migration_id": migration_key.migration_id,
},
)
).results

if data:
status, timestamp = data[0]
@@ -331,7 +331,7 @@ def _get_next_version(self, migration_key: MigrationKey) -> int:
"group": migration_key.group.value,
"migration_id": migration_key.migration_id,
},
)
).results
if result:
(version,) = result[0]
return int(version) + 1
@@ -356,7 +356,7 @@ def _get_migration_status(self) -> Mapping[MigrationKey, Status]:
try:
for row in self.__connection.execute(
f"SELECT group, migration_id, status FROM {self.__table_name} FINAL WHERE group IN {migration_groups}"
):
).results:
group_name, migration_id, status_name = row
data[MigrationKey(MigrationGroup(group_name), migration_id)] = Status(
status_name
@@ -25,7 +25,7 @@ def fix_order_by(_logger: logging.Logger) -> None:

((curr_primary_key,),) = clickhouse.execute(
f"SELECT primary_key FROM system.tables WHERE name = '{TABLE_NAME}' AND database = '{database}'"
)
).results

assert curr_primary_key in [
new_primary_key,
@@ -47,15 +47,15 @@ def fix_order_by(_logger: logging.Logger) -> None:

# There shouldn't be any data in the table yet
assert (
clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME} FINAL;")[0][0] == 0
clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME} FINAL;").results[0][0] == 0
), f"{TABLE_NAME} is not empty"

new_order_by = f"ORDER BY ({new_primary_key})"
old_order_by = f"ORDER BY {old_primary_key}"

((curr_create_table_statement,),) = clickhouse.execute(
f"SHOW CREATE TABLE {database}.{TABLE_NAME}"
)
).results

new_create_table_statement = curr_create_table_statement.replace(
TABLE_NAME, TABLE_NAME_NEW
@@ -147,7 +147,7 @@ def backfill_errors(logger: logging.Logger) -> None:
ORDER BY timestamp ASC, project_id ASC
LIMIT 1
"""
)[0]
).results[0]

logger.info("Error data was found")
except IndexError:
Expand Down Expand Up @@ -199,7 +199,7 @@ def backfill_errors(logger: logging.Logger) -> None:
ORDER BY c DESC, partition
""",
{"database": database_name, "table": errors_table_name},
)
).results

for partition, _count in partitions:
clickhouse.execute(
@@ -35,7 +35,7 @@ def forwards(logger: logging.Logger) -> None:

((curr_sampling_key, curr_partition_key, curr_primary_key),) = clickhouse.execute(
f"SELECT sampling_key, partition_key, primary_key FROM system.tables WHERE name = '{TABLE_NAME}' AND database = '{database}'"
)
).results

sampling_key_needs_update = curr_sampling_key != new_sampling_key
partition_key_needs_update = curr_partition_key != new_partition_key
@@ -54,7 +54,7 @@ def forwards(logger: logging.Logger) -> None:
# Create transactions_local_new and insert data
((curr_create_table_statement,),) = clickhouse.execute(
f"SHOW CREATE TABLE {database}.{TABLE_NAME}"
)
).results

new_create_table_statement = curr_create_table_statement.replace(
TABLE_NAME, TABLE_NAME_NEW
@@ -93,7 +93,7 @@ def forwards(logger: logging.Logger) -> None:
clickhouse.execute(new_create_table_statement)

# Copy over data in batches of 100,000
[(row_count,)] = clickhouse.execute(f"SELECT count() FROM {TABLE_NAME}")
[(row_count,)] = clickhouse.execute(f"SELECT count() FROM {TABLE_NAME}").results
batch_size = 100000
batch_count = math.ceil(row_count / batch_size)

@@ -116,9 +116,10 @@ def forwards(logger: logging.Logger) -> None:
clickhouse.execute(f"RENAME TABLE {TABLE_NAME_NEW} TO {TABLE_NAME};")

# Ensure each table has the same number of rows before deleting the old one
assert clickhouse.execute(
f"SELECT COUNT() FROM {TABLE_NAME} FINAL;"
) == clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME_OLD} FINAL;")
assert (
clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME} FINAL;").results
== clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME_OLD} FINAL;").results
)

clickhouse.execute(f"DROP TABLE {TABLE_NAME_OLD};")

@@ -137,7 +138,7 @@ def backwards(logger: logging.Logger) -> None:
clickhouse = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE)

def table_exists(table_name: str) -> bool:
return clickhouse.execute(f"EXISTS TABLE {table_name};") == [(1,)]
return clickhouse.execute(f"EXISTS TABLE {table_name};").results == [(1,)]

if not table_exists(TABLE_NAME):
raise Exception(f"Table {TABLE_NAME} is missing")
14 changes: 7 additions & 7 deletions snuba/optimize.py
@@ -33,7 +33,7 @@ def get_partitions_to_optimize(
table: str,
before: Optional[datetime] = None,
) -> Sequence[util.Part]:
engine = clickhouse.execute(
response = clickhouse.execute(
"""
SELECT engine
FROM system.tables
@@ -42,14 +42,14 @@
{"database": database, "table": table},
)

if not engine:
if not response.results:
logger.warning(
"Table %s.%s doesn't exist on %s:%s"
% (database, table, clickhouse.host, clickhouse.port)
)
return []

if engine[0][0].startswith("Replicated"):
if response.results[0][0].startswith("Replicated"):
is_leader = clickhouse.execute(
"""
SELECT is_leader
@@ -58,9 +58,10 @@
""",
{"database": database, "table": table},
)

if not is_leader:
return []
# response: [(0,)] for non-leader or [(1,)] for leader
if not (len(is_leader) == 1 and is_leader[0][0]):
if not (len(is_leader.results) == 1 and is_leader.results[0][0]):
return []

active_parts = clickhouse.execute(
@@ -78,13 +79,12 @@
""",
{"database": database, "table": table},
)

schema = storage.get_schema()
assert isinstance(schema, TableSchema)
part_format = schema.get_part_format()
assert part_format is not None

parts = [util.decode_part_str(part, part_format) for part, count in active_parts]
parts = [util.decode_part_str(part, part_format) for part, count in active_parts.results]

if before:
parts = [
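A small sketch of the leader check rewritten above: the `SELECT is_leader` query is expected to yield a single `(1,)` row on the leader replica, and anything else makes `get_partitions_to_optimize` return early (the response below is hypothetical):

```python
from snuba.clickhouse.native import ClickhouseResult

is_leader = ClickhouseResult(results=[(0,)])  # hypothetical non-leader response
if not (len(is_leader.results) == 1 and is_leader.results[0][0]):
    print("not the leader for this table; skipping optimize")
```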
2 changes: 1 addition & 1 deletion snuba/replacer.py
@@ -353,7 +353,7 @@ def flush_batch(self, batch: Sequence[Replacement]) -> None:
count_query = replacement.get_count_query(table_name)

if count_query is not None:
count = clickhouse_read.execute_robust(count_query)[0][0]
count = clickhouse_read.execute_robust(count_query).results[0][0]
if count == 0:
continue
else:
4 changes: 2 additions & 2 deletions snuba/snapshots/loaders/single_table.py
@@ -32,14 +32,14 @@ def __init__(
self.__clickhouse = clickhouse

def __validate_table(self, ignore_existing_data: bool) -> None:
clickhouse_tables = self.__clickhouse.execute("show tables")
clickhouse_tables = self.__clickhouse.execute("show tables").results
if (self.__dest_table,) not in clickhouse_tables:
raise ValueError("Destination table %s does not exists" % self.__dest_table)

if not ignore_existing_data:
table_content = self.__clickhouse.execute(
"select count(*) from %s" % self.__dest_table
)
).results
if table_content != [(0,)]:
raise ValueError("Destination Table is not empty")

2 changes: 1 addition & 1 deletion snuba/web/views.py
@@ -124,7 +124,7 @@ def check_clickhouse() -> bool:

for (cluster_key, cluster) in unique_clusters.items():
clickhouse = cluster.get_query_connection(ClickhouseClientSettings.QUERY)
clickhouse_tables = clickhouse.execute("show tables")
clickhouse_tables = clickhouse.execute("show tables").results
known_table_names = connection_grouped_table_names[cluster_key]
logger.debug(f"checking for {known_table_names} on {cluster_key}")
for table in known_table_names:
8 changes: 4 additions & 4 deletions tests/clusters/fake_cluster.py
@@ -1,6 +1,6 @@
from typing import Any, List, Mapping, MutableMapping, Optional, Sequence, Set, Tuple

from snuba.clickhouse.native import ClickhousePool, Params
from snuba.clickhouse.native import ClickhousePool, ClickhouseResult, Params
from snuba.clusters.cluster import (
ClickhouseClientSettings,
ClickhouseCluster,
@@ -27,9 +27,9 @@ def execute(
settings: Optional[Mapping[str, Any]] = None,
types_check: bool = False,
columnar: bool = False,
) -> Sequence[Any]:
) -> ClickhouseResult:
self.__queries.append(query)
return [[1]]
return ClickhouseResult([[1]])

def get_queries(self) -> Sequence[str]:
return self.__queries
@@ -49,7 +49,7 @@ def execute(
settings: Optional[Mapping[str, Any]] = None,
types_check: bool = False,
columnar: bool = False,
) -> Sequence[Any]:
) -> ClickhouseResult:
raise ServerExplodedException("The server exploded")

def get_queries(self) -> Sequence[str]:
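The test fakes have to return the wrapper as well. A minimal sketch (hypothetical test code, not part of the PR) of how a recording pool like the fake shown above behaves under the new return type:

```python
from typing import List

from snuba.clickhouse.native import ClickhouseResult


class RecordingPool:
    """Hypothetical stand-in for the fake pool shown in the diff above."""

    def __init__(self) -> None:
        self.queries: List[str] = []

    def execute(self, query: str) -> ClickhouseResult:
        # Record the query and return a canned single-row result, as the
        # fake in tests/clusters/fake_cluster.py now does.
        self.queries.append(query)
        return ClickhouseResult(results=[[1]])


pool = RecordingPool()
assert pool.execute("SELECT 1").results == [[1]]
assert pool.queries == ["SELECT 1"]
```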