diff --git a/snuba/cleanup.py b/snuba/cleanup.py index e983e2a4ad..5399312032 100644 --- a/snuba/cleanup.py +++ b/snuba/cleanup.py @@ -44,7 +44,7 @@ def get_active_partitions( assert isinstance(schema, TableSchema) part_format = schema.get_part_format() assert part_format is not None - return [util.decode_part_str(part, part_format) for part, in response] + return [util.decode_part_str(part, part_format) for part, in response.results] def filter_stale_partitions( diff --git a/snuba/clickhouse/native.py b/snuba/clickhouse/native.py index 5d14965591..0ef5c49530 100644 --- a/snuba/clickhouse/native.py +++ b/snuba/clickhouse/native.py @@ -1,7 +1,10 @@ +from __future__ import annotations + import logging import queue import re import time +from dataclasses import dataclass, field from datetime import date, datetime from typing import Any, Mapping, Optional, Sequence, Union from uuid import UUID @@ -23,6 +26,12 @@ metrics = MetricsWrapper(environment.metrics, "clickhouse.native") +@dataclass(frozen=True) +class ClickhouseResult: + results: Sequence[Any] = field(default_factory=list) + meta: Sequence[Any] | None = None + + class ClickhousePool(object): def __init__( self, @@ -63,7 +72,7 @@ def execute( settings: Optional[Mapping[str, Any]] = None, types_check: bool = False, columnar: bool = False, - ) -> Sequence[Any]: + ) -> ClickhouseResult: """ Execute a clickhouse query with a single quick retry in case of connection failure. @@ -84,7 +93,7 @@ def execute( conn = self._create_conn() try: - result: Sequence[Any] = conn.execute( + result_data: Sequence[Any] = conn.execute( query, params=params, with_column_types=with_column_types, @@ -93,6 +102,15 @@ def execute( types_check=types_check, columnar=columnar, ) + if with_column_types: + result = ClickhouseResult( + results=result_data[0], meta=result_data[1], + ) + else: + if not isinstance(result_data, (list, tuple)): + result_data = [result_data] + result = ClickhouseResult(results=result_data) + return result except (errors.NetworkError, errors.SocketTimeoutError, EOFError) as e: metrics.increment("connection_error") @@ -113,7 +131,7 @@ def execute( finally: self.pool.put(conn, block=False) - return [] + return ClickhouseResult() def execute_robust( self, @@ -124,7 +142,7 @@ def execute_robust( settings: Optional[Mapping[str, Any]] = None, types_check: bool = False, columnar: bool = False, - ) -> Sequence[Any]: + ) -> ClickhouseResult: """ Execute a clickhouse query with a bit more tenacity. Make more retry attempts, (infinite in the case of too many simultaneous queries @@ -244,13 +262,13 @@ class NativeDriverReader(Reader): def __init__(self, client: ClickhousePool) -> None: self.__client = client - def __transform_result(self, result: Sequence[Any], with_totals: bool) -> Result: + def __transform_result(self, result: ClickhouseResult, with_totals: bool) -> Result: """ Transform a native driver response into a response that is structurally similar to a ClickHouse-flavored JSON response. """ - data, meta = result - + meta = result.meta if result.meta is not None else [] + data = result.results # XXX: Rows are represented as mappings that are keyed by column or # alias, which is problematic when the result set contains duplicate # names. To ensure that the column headers and row data are consistent @@ -269,7 +287,11 @@ def __transform_result(self, result: Sequence[Any], with_totals: bool) -> Result if with_totals: assert len(data) > 0 totals = data.pop(-1) - new_result = {"data": data, "meta": meta, "totals": totals} + new_result = { + "data": data, + "meta": meta, + "totals": totals, + } else: new_result = {"data": data, "meta": meta} diff --git a/snuba/clusters/cluster.py b/snuba/clusters/cluster.py index 6a45e25069..6189654c74 100644 --- a/snuba/clusters/cluster.py +++ b/snuba/clusters/cluster.py @@ -295,7 +295,7 @@ def __get_cluster_nodes(self, cluster_name: str) -> Sequence[ClickhouseNode]: ClickhouseClientSettings.QUERY ).execute( f"select host_name, port, shard_num, replica_num from system.clusters where cluster={escape_string(cluster_name)}" - ) + ).results ] diff --git a/snuba/migrations/connect.py b/snuba/migrations/connect.py index 80de48f40b..b06e90d2f4 100644 --- a/snuba/migrations/connect.py +++ b/snuba/migrations/connect.py @@ -46,7 +46,7 @@ def check_clickhouse_connections() -> None: def check_clickhouse(clickhouse: ClickhousePool) -> None: - ver = clickhouse.execute("SELECT version()")[0][0] + ver = clickhouse.execute("SELECT version()").results[0][0] if version.parse(ver) < version.parse(CLICKHOUSE_SERVER_MIN_VERSION): raise InvalidClickhouseVersion( f"Snuba requires Clickhouse version {CLICKHOUSE_SERVER_MIN_VERSION}" diff --git a/snuba/migrations/parse_schema.py b/snuba/migrations/parse_schema.py index 51a8266883..88a8e40263 100644 --- a/snuba/migrations/parse_schema.py +++ b/snuba/migrations/parse_schema.py @@ -220,6 +220,6 @@ def get_local_schema( return { column_name: _get_column(column_type, default_type, default_expr, codec_expr) for column_name, column_type, default_type, default_expr, _comment, codec_expr in [ - cols[:6] for cols in conn.execute("DESCRIBE TABLE %s" % table_name) + cols[:6] for cols in conn.execute("DESCRIBE TABLE %s" % table_name).results ] } diff --git a/snuba/migrations/runner.py b/snuba/migrations/runner.py index b3358e5df4..c4b23b784c 100644 --- a/snuba/migrations/runner.py +++ b/snuba/migrations/runner.py @@ -90,7 +90,7 @@ def get_status( "group": migration_key.group.value, "migration_id": migration_key.migration_id, }, - ) + ).results if data: status, timestamp = data[0] @@ -331,7 +331,7 @@ def _get_next_version(self, migration_key: MigrationKey) -> int: "group": migration_key.group.value, "migration_id": migration_key.migration_id, }, - ) + ).results if result: (version,) = result[0] return int(version) + 1 @@ -356,7 +356,7 @@ def _get_migration_status(self) -> Mapping[MigrationKey, Status]: try: for row in self.__connection.execute( f"SELECT group, migration_id, status FROM {self.__table_name} FINAL WHERE group IN {migration_groups}" - ): + ).results: group_name, migration_id, status_name = row data[MigrationKey(MigrationGroup(group_name), migration_id)] = Status( status_name diff --git a/snuba/migrations/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py b/snuba/migrations/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py index 119d781940..f843715beb 100644 --- a/snuba/migrations/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py +++ b/snuba/migrations/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py @@ -25,7 +25,7 @@ def fix_order_by(_logger: logging.Logger) -> None: ((curr_primary_key,),) = clickhouse.execute( f"SELECT primary_key FROM system.tables WHERE name = '{TABLE_NAME}' AND database = '{database}'" - ) + ).results assert curr_primary_key in [ new_primary_key, @@ -47,7 +47,7 @@ def fix_order_by(_logger: logging.Logger) -> None: # There shouldn't be any data in the table yet assert ( - clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME} FINAL;")[0][0] == 0 + clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME} FINAL;").results[0][0] == 0 ), f"{TABLE_NAME} is not empty" new_order_by = f"ORDER BY ({new_primary_key})" @@ -55,7 +55,7 @@ def fix_order_by(_logger: logging.Logger) -> None: ((curr_create_table_statement,),) = clickhouse.execute( f"SHOW CREATE TABLE {database}.{TABLE_NAME}" - ) + ).results new_create_table_statement = curr_create_table_statement.replace( TABLE_NAME, TABLE_NAME_NEW diff --git a/snuba/migrations/snuba_migrations/events/0014_backfill_errors.py b/snuba/migrations/snuba_migrations/events/0014_backfill_errors.py index 0ad1e9fbc2..316b5a17a7 100644 --- a/snuba/migrations/snuba_migrations/events/0014_backfill_errors.py +++ b/snuba/migrations/snuba_migrations/events/0014_backfill_errors.py @@ -147,7 +147,7 @@ def backfill_errors(logger: logging.Logger) -> None: ORDER BY timestamp ASC, project_id ASC LIMIT 1 """ - )[0] + ).results[0] logger.info("Error data was found") except IndexError: @@ -199,7 +199,7 @@ def backfill_errors(logger: logging.Logger) -> None: ORDER BY c DESC, partition """, {"database": database_name, "table": errors_table_name}, - ) + ).results for partition, _count in partitions: clickhouse.execute( diff --git a/snuba/migrations/snuba_migrations/transactions/0002_transactions_onpremise_fix_orderby_and_partitionby.py b/snuba/migrations/snuba_migrations/transactions/0002_transactions_onpremise_fix_orderby_and_partitionby.py index e606c0e430..a7d3076f3d 100644 --- a/snuba/migrations/snuba_migrations/transactions/0002_transactions_onpremise_fix_orderby_and_partitionby.py +++ b/snuba/migrations/snuba_migrations/transactions/0002_transactions_onpremise_fix_orderby_and_partitionby.py @@ -35,7 +35,7 @@ def forwards(logger: logging.Logger) -> None: ((curr_sampling_key, curr_partition_key, curr_primary_key),) = clickhouse.execute( f"SELECT sampling_key, partition_key, primary_key FROM system.tables WHERE name = '{TABLE_NAME}' AND database = '{database}'" - ) + ).results sampling_key_needs_update = curr_sampling_key != new_sampling_key partition_key_needs_update = curr_partition_key != new_partition_key @@ -54,7 +54,7 @@ def forwards(logger: logging.Logger) -> None: # Create transactions_local_new and insert data ((curr_create_table_statement,),) = clickhouse.execute( f"SHOW CREATE TABLE {database}.{TABLE_NAME}" - ) + ).results new_create_table_statement = curr_create_table_statement.replace( TABLE_NAME, TABLE_NAME_NEW @@ -93,7 +93,7 @@ def forwards(logger: logging.Logger) -> None: clickhouse.execute(new_create_table_statement) # Copy over data in batches of 100,000 - [(row_count,)] = clickhouse.execute(f"SELECT count() FROM {TABLE_NAME}") + [(row_count,)] = clickhouse.execute(f"SELECT count() FROM {TABLE_NAME}").results batch_size = 100000 batch_count = math.ceil(row_count / batch_size) @@ -116,9 +116,10 @@ def forwards(logger: logging.Logger) -> None: clickhouse.execute(f"RENAME TABLE {TABLE_NAME_NEW} TO {TABLE_NAME};") # Ensure each table has the same number of rows before deleting the old one - assert clickhouse.execute( - f"SELECT COUNT() FROM {TABLE_NAME} FINAL;" - ) == clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME_OLD} FINAL;") + assert ( + clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME} FINAL;").results + == clickhouse.execute(f"SELECT COUNT() FROM {TABLE_NAME_OLD} FINAL;").results + ) clickhouse.execute(f"DROP TABLE {TABLE_NAME_OLD};") @@ -137,7 +138,7 @@ def backwards(logger: logging.Logger) -> None: clickhouse = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE) def table_exists(table_name: str) -> bool: - return clickhouse.execute(f"EXISTS TABLE {table_name};") == [(1,)] + return clickhouse.execute(f"EXISTS TABLE {table_name};").results == [(1,)] if not table_exists(TABLE_NAME): raise Exception(f"Table {TABLE_NAME} is missing") diff --git a/snuba/optimize.py b/snuba/optimize.py index 5ea1e63969..fd057cfe2f 100644 --- a/snuba/optimize.py +++ b/snuba/optimize.py @@ -33,7 +33,7 @@ def get_partitions_to_optimize( table: str, before: Optional[datetime] = None, ) -> Sequence[util.Part]: - engine = clickhouse.execute( + response = clickhouse.execute( """ SELECT engine FROM system.tables @@ -42,14 +42,14 @@ def get_partitions_to_optimize( {"database": database, "table": table}, ) - if not engine: + if not response.results: logger.warning( "Table %s.%s doesn't exist on %s:%s" % (database, table, clickhouse.host, clickhouse.port) ) return [] - if engine[0][0].startswith("Replicated"): + if response.results[0][0].startswith("Replicated"): is_leader = clickhouse.execute( """ SELECT is_leader @@ -58,9 +58,10 @@ def get_partitions_to_optimize( """, {"database": database, "table": table}, ) - + if not is_leader: + return [] # response: [(0,)] for non-leader or [(1,)] for leader - if not (len(is_leader) == 1 and is_leader[0][0]): + if not (len(is_leader.results) == 1 and is_leader.results[0][0]): return [] active_parts = clickhouse.execute( @@ -78,13 +79,12 @@ def get_partitions_to_optimize( """, {"database": database, "table": table}, ) - schema = storage.get_schema() assert isinstance(schema, TableSchema) part_format = schema.get_part_format() assert part_format is not None - parts = [util.decode_part_str(part, part_format) for part, count in active_parts] + parts = [util.decode_part_str(part, part_format) for part, count in active_parts.results] if before: parts = [ diff --git a/snuba/replacer.py b/snuba/replacer.py index 064acae065..d2275a5ff4 100644 --- a/snuba/replacer.py +++ b/snuba/replacer.py @@ -353,7 +353,7 @@ def flush_batch(self, batch: Sequence[Replacement]) -> None: count_query = replacement.get_count_query(table_name) if count_query is not None: - count = clickhouse_read.execute_robust(count_query)[0][0] + count = clickhouse_read.execute_robust(count_query).results[0][0] if count == 0: continue else: diff --git a/snuba/snapshots/loaders/single_table.py b/snuba/snapshots/loaders/single_table.py index a2eeda958c..b9a7119732 100644 --- a/snuba/snapshots/loaders/single_table.py +++ b/snuba/snapshots/loaders/single_table.py @@ -32,14 +32,14 @@ def __init__( self.__clickhouse = clickhouse def __validate_table(self, ignore_existing_data: bool) -> None: - clickhouse_tables = self.__clickhouse.execute("show tables") + clickhouse_tables = self.__clickhouse.execute("show tables").results if (self.__dest_table,) not in clickhouse_tables: raise ValueError("Destination table %s does not exists" % self.__dest_table) if not ignore_existing_data: table_content = self.__clickhouse.execute( "select count(*) from %s" % self.__dest_table - ) + ).results if table_content != [(0,)]: raise ValueError("Destination Table is not empty") diff --git a/snuba/web/views.py b/snuba/web/views.py index 27f0607e25..f294ce80da 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -124,7 +124,7 @@ def check_clickhouse() -> bool: for (cluster_key, cluster) in unique_clusters.items(): clickhouse = cluster.get_query_connection(ClickhouseClientSettings.QUERY) - clickhouse_tables = clickhouse.execute("show tables") + clickhouse_tables = clickhouse.execute("show tables").results known_table_names = connection_grouped_table_names[cluster_key] logger.debug(f"checking for {known_table_names} on {cluster_key}") for table in known_table_names: diff --git a/tests/clusters/fake_cluster.py b/tests/clusters/fake_cluster.py index 6043092fb6..612995ebd0 100644 --- a/tests/clusters/fake_cluster.py +++ b/tests/clusters/fake_cluster.py @@ -1,6 +1,6 @@ from typing import Any, List, Mapping, MutableMapping, Optional, Sequence, Set, Tuple -from snuba.clickhouse.native import ClickhousePool, Params +from snuba.clickhouse.native import ClickhousePool, ClickhouseResult, Params from snuba.clusters.cluster import ( ClickhouseClientSettings, ClickhouseCluster, @@ -27,9 +27,9 @@ def execute( settings: Optional[Mapping[str, Any]] = None, types_check: bool = False, columnar: bool = False, - ) -> Sequence[Any]: + ) -> ClickhouseResult: self.__queries.append(query) - return [[1]] + return ClickhouseResult([[1]]) def get_queries(self) -> Sequence[str]: return self.__queries @@ -49,7 +49,7 @@ def execute( settings: Optional[Mapping[str, Any]] = None, types_check: bool = False, columnar: bool = False, - ) -> Sequence[Any]: + ) -> ClickhouseResult: raise ServerExplodedException("The server exploded") def get_queries(self) -> Sequence[str]: diff --git a/tests/clusters/test_cluster.py b/tests/clusters/test_cluster.py index 6e6b6da552..b0572a894d 100644 --- a/tests/clusters/test_cluster.py +++ b/tests/clusters/test_cluster.py @@ -4,7 +4,7 @@ import pytest from snuba import settings -from snuba.clickhouse.native import ClickhousePool +from snuba.clickhouse.native import ClickhousePool, ClickhouseResult from snuba.clusters import cluster from snuba.clusters.storage_sets import StorageSetKey from snuba.datasets.storages import StorageKey @@ -119,10 +119,9 @@ def test_disabled_cluster() -> None: def test_get_local_nodes() -> None: importlib.reload(cluster) with patch.object(ClickhousePool, "execute") as execute: - execute.return_value = [ - ("host_1", 9000, 1, 1), - ("host_2", 9000, 2, 1), - ] + execute.return_value = ClickhouseResult( + [("host_1", 9000, 1, 1), ("host_2", 9000, 2, 1)] + ) local_cluster = get_storage(StorageKey("events")).get_cluster() assert len(local_cluster.get_local_nodes()) == 1 diff --git a/tests/datasets/cdc/test_groupassignee.py b/tests/datasets/cdc/test_groupassignee.py index c37acf4c54..b9438fcc38 100644 --- a/tests/datasets/cdc/test_groupassignee.py +++ b/tests/datasets/cdc/test_groupassignee.py @@ -171,7 +171,7 @@ def test_messages(self) -> None: ret = ( self.storage.get_cluster() .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupassignee_local;") + .execute("SELECT * FROM groupassignee_local;").results ) assert ret[0] == ( 42, # offset @@ -217,7 +217,7 @@ def test_bulk_load(self) -> None: ret = ( self.storage.get_cluster() .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupassignee_local;") + .execute("SELECT * FROM groupassignee_local;").results ) assert ret[0] == ( 0, # offset diff --git a/tests/datasets/cdc/test_groupedmessage.py b/tests/datasets/cdc/test_groupedmessage.py index e53e039d72..416e97e13a 100644 --- a/tests/datasets/cdc/test_groupedmessage.py +++ b/tests/datasets/cdc/test_groupedmessage.py @@ -237,7 +237,7 @@ def test_messages(self) -> None: ret = ( get_cluster(StorageSetKey.EVENTS) .get_query_connection(ClickhouseClientSettings.INSERT) - .execute("SELECT * FROM groupedmessage_local;") + .execute("SELECT * FROM groupedmessage_local;").results ) assert ret[0] == ( 42, # offset @@ -279,7 +279,7 @@ def test_bulk_load(self) -> None: ret = ( get_cluster(StorageSetKey.EVENTS) .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupedmessage_local;") + .execute("SELECT * FROM groupedmessage_local;").results ) assert ret[0] == ( 0, # offset diff --git a/tests/datasets/test_events.py b/tests/datasets/test_events.py index ccf3f4bd97..4fac6d547a 100644 --- a/tests/datasets/test_events.py +++ b/tests/datasets/test_events.py @@ -37,7 +37,7 @@ def test_tags_hash_map(self) -> None: hashed = clickhouse.execute( "SELECT cityHash64('test_tag1=value1'), cityHash64('test_tag\\\\=2=value2')" - ) + ).results tag1, tag2 = hashed[0] event = clickhouse.execute( @@ -45,7 +45,7 @@ def test_tags_hash_map(self) -> None: f"SELECT replaceAll(toString(event_id), '-', '') FROM {table_name} WHERE has(_tags_hash_map, {tag1}) " f"AND has(_tags_hash_map, {tag2})" ) - ) + ).results assert len(event) == 1 assert event[0][0] == self.event["data"]["id"] diff --git a/tests/migrations/test_runner.py b/tests/migrations/test_runner.py index bbcb381d32..9646348324 100644 --- a/tests/migrations/test_runner.py +++ b/tests/migrations/test_runner.py @@ -24,7 +24,7 @@ def _drop_all_tables() -> None: data = connection.execute( f"SELECT name FROM system.tables WHERE database = '{database}'" - ) + ).results for (table,) in data: connection.execute(f"DROP TABLE IF EXISTS {table}") @@ -82,7 +82,7 @@ def test_run_migration() -> None: ) assert connection.execute( "SELECT group, migration_id, status, version FROM migrations_local;" - ) == [("system", "0001_migrations", "completed", 1)] + ).results == [("system", "0001_migrations", "completed", 1)] # Invalid migration ID with pytest.raises(MigrationError): @@ -96,7 +96,7 @@ def test_run_migration() -> None: runner.run_migration( MigrationKey(MigrationGroup.EVENTS, "0001_events_initial"), fake=True ) - assert connection.execute("SHOW TABLES LIKE 'sentry_local'") == [] + assert connection.execute("SHOW TABLES LIKE 'sentry_local'").results == [] def test_reverse_migration() -> None: @@ -122,7 +122,7 @@ def test_reverse_migration() -> None: MigrationKey(MigrationGroup.EVENTS, migration_id), fake=True ) assert ( - len(connection.execute("SHOW TABLES LIKE 'sentry_local'")) == 1 + len(connection.execute("SHOW TABLES LIKE 'sentry_local'").results) == 1 ), "Table still exists" @@ -155,7 +155,7 @@ def test_reverse_all() -> None: connection = get_cluster(StorageSetKey.MIGRATIONS).get_query_connection( ClickhouseClientSettings.MIGRATE ) - assert connection.execute("SHOW TABLES") == [], "All tables should be deleted" + assert connection.execute("SHOW TABLES").results == [], "All tables should be deleted" def get_total_migration_count() -> int: @@ -213,4 +213,4 @@ def test_settings_skipped_group() -> None: connection = get_cluster(StorageSetKey.MIGRATIONS).get_query_connection( ClickhouseClientSettings.MIGRATE ) - assert connection.execute("SHOW TABLES LIKE 'querylog_local'") == [] + assert connection.execute("SHOW TABLES LIKE 'querylog_local'").results == [] diff --git a/tests/migrations/test_runner_add_node.py b/tests/migrations/test_runner_add_node.py index 6a6e3e5fdf..9af2d98480 100644 --- a/tests/migrations/test_runner_add_node.py +++ b/tests/migrations/test_runner_add_node.py @@ -32,7 +32,7 @@ def setup_function() -> None: data = connection.execute( f"SELECT name FROM system.tables WHERE database = '{database}'" - ) + ).results for (table,) in data: connection.execute(f"DROP TABLE IF EXISTS {table}") @@ -53,7 +53,7 @@ def test_add_node() -> None: client = ClickhousePool(host_name, port, user, password, database,) - assert set(client.execute("SHOW TABLES")) == set() + assert set(client.execute("SHOW TABLES").results) == set() runner.Runner.add_node( node_type=cluster.ClickhouseNodeType.LOCAL, @@ -65,7 +65,7 @@ def test_add_node() -> None: database=database, ) - assert set(client.execute("SHOW TABLES")) == { + assert set(client.execute("SHOW TABLES").results) == { ("outcomes_raw_local",), ("outcomes_hourly_local",), ("outcomes_mv_hourly_local",), diff --git a/tests/migrations/test_runner_individual.py b/tests/migrations/test_runner_individual.py index f429e4d602..b539d242d1 100644 --- a/tests/migrations/test_runner_individual.py +++ b/tests/migrations/test_runner_individual.py @@ -246,7 +246,7 @@ def perform_select_query( limit_clause = (" LIMIT " + limit) if limit else "" full_query = select_clause + from_clause + where_clause + limit_clause - return connection.execute(full_query) + return connection.execute(full_query).results def get_count_from_storage(table_name: str, connection: ClickhousePool) -> int: diff --git a/tests/subscriptions/test_executor_consumer.py b/tests/subscriptions/test_executor_consumer.py index 370dd72926..6b9f1a3137 100644 --- a/tests/subscriptions/test_executor_consumer.py +++ b/tests/subscriptions/test_executor_consumer.py @@ -161,10 +161,10 @@ def test_execute_query_strategy() -> None: assert next_step.submit.call_args[0][0].timestamp == message.timestamp assert next_step.submit.call_args[0][0].offset == message.offset - assert next_step.submit.call_args[0][0].payload.result[1] == { - "data": [{"count()": 0}], - "meta": [{"name": "count()", "type": "UInt64"}], - } + + result = next_step.submit.call_args[0][0].payload.result + assert result[1]["data"] == [{"count()": 0}] + assert result[1]["meta"] == [{"name": "count()", "type": "UInt64"}] strategy.close() strategy.join() diff --git a/tests/test_api.py b/tests/test_api.py index 090bd178bb..a2342de5ad 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -179,7 +179,7 @@ def test_count(self) -> None: .get_cluster() .get_query_connection(ClickhouseClientSettings.QUERY) ) - res = clickhouse.execute("SELECT count() FROM %s" % self.table) + res = clickhouse.execute("SELECT count() FROM %s" % self.table).results assert res[0][0] == 330 rollup_mins = 60 @@ -2063,17 +2063,17 @@ def test_test_endpoints(self) -> None: ) # There is data in the events table - assert len(clickhouse.execute(f"SELECT * FROM {self.table}")) > 0 + assert len(clickhouse.execute(f"SELECT * FROM {self.table}").results) > 0 assert self.app.post("/tests/events/drop").status_code == 200 writer = storage.get_table_writer() table = writer.get_schema().get_table_name() - assert table not in clickhouse.execute("SHOW TABLES") + assert table not in clickhouse.execute("SHOW TABLES").results assert self.redis_db_size() == 0 # No data in events table - assert len(clickhouse.execute(f"SELECT * FROM {self.table}")) == 0 + assert len(clickhouse.execute(f"SELECT * FROM {self.table}").results) == 0 def test_max_limit(self) -> None: with pytest.raises(Exception): diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 95aa4527c2..7d6b473eff 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -131,7 +131,7 @@ def get_row_count(storage: Storage) -> int: return int( storage.get_cluster() .get_query_connection(ClickhouseClientSettings.INSERT) - .execute(f"SELECT count() FROM {schema.get_local_table_name()}")[0][0] + .execute(f"SELECT count() FROM {schema.get_local_table_name()}").results[0][0] ) diff --git a/tests/test_replacer.py b/tests/test_replacer.py index f13c5384d7..e43cbe99d3 100644 --- a/tests/test_replacer.py +++ b/tests/test_replacer.py @@ -73,7 +73,7 @@ def _issue_count(self, project_id: int): AND project_id = {project_id} GROUP BY group_id """ - ) + ).results return [{"group_id": row[0], "count": row[1]} for row in data] @@ -485,7 +485,7 @@ def _issue_count(total=False): {total_cond} GROUP BY group_id """ - ) + ).results return [{"group_id": row[0], "count": row[1]} for row in data]