From 8d2ac70cd16218d086c871ced02e3f736ad6bd1e Mon Sep 17 00:00:00 2001 From: Konstantin Volkov Date: Wed, 18 Sep 2024 19:11:31 +0300 Subject: [PATCH 1/3] Add profile events support --- clickhouse_driver/client.py | 8 ++++++-- clickhouse_driver/result.py | 12 ++++++++++++ tests/test_query_info.py | 27 +++++++++++++++++++++++++-- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/clickhouse_driver/client.py b/clickhouse_driver/client.py index 0e573384..b2caf802 100644 --- a/clickhouse_driver/client.py +++ b/clickhouse_driver/client.py @@ -264,6 +264,10 @@ def receive_packet(self): self.last_query.store_profile(packet.profile_info) return True + elif packet.type == ServerPacketTypes.PROFILE_EVENTS: + self.last_query.store_profile_events(packet) + return True + else: return True @@ -691,7 +695,7 @@ def receive_end_of_query(self): pass elif packet.type == ServerPacketTypes.PROFILE_EVENTS: - self.last_query.store_profile(packet.profile_info) + self.last_query.store_profile_events(packet) else: message = self.connection.unexpected_packet_message( @@ -734,7 +738,7 @@ def receive_profile_events(self): packet = self.connection.receive_packet() if packet.type == ServerPacketTypes.PROFILE_EVENTS: - self.last_query.store_profile(packet.profile_info) + self.last_query.store_profile_events(packet) break elif packet.type == ServerPacketTypes.PROGRESS: diff --git a/clickhouse_driver/result.py b/clickhouse_driver/result.py index 0ed639c1..e0bb15aa 100644 --- a/clickhouse_driver/result.py +++ b/clickhouse_driver/result.py @@ -130,6 +130,7 @@ def __init__(self): self.profile_info = BlockStreamProfileInfo() self.progress = Progress() self.elapsed = 0 + self.stats = {} def store_profile(self, profile_info): self.profile_info = profile_info @@ -142,3 +143,14 @@ def store_progress(self, progress): def store_elapsed(self, elapsed): self.elapsed = elapsed + + def store_profile_events(self, packet): + data = QueryResult([packet]).get_result() + column_names = [i[0] for i in packet.block.columns_with_types] + for row in data: + item = dict(zip(column_names, row)) + name = item.get('name', '') + if item['type'] == 'increment': + self.stats[name] = self.stats.get(name, 0) + item['value'] + elif item['type'] == 'gauge': + self.stats[name] = item['value'] diff --git a/tests/test_query_info.py b/tests/test_query_info.py index 0b74344b..6bcdc78b 100644 --- a/tests/test_query_info.py +++ b/tests/test_query_info.py @@ -39,6 +39,9 @@ def test_store_last_query_after_execute(self): self.assertGreater(last_query.elapsed, 0) + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + def test_last_query_after_execute_iter(self): with self.sample_table(): list(self.client.execute_iter(self.sample_query)) @@ -57,6 +60,9 @@ def test_last_query_after_execute_iter(self): self.assertEqual(last_query.elapsed, 0) + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + def test_last_query_after_execute_with_progress(self): with self.sample_table(): progress = self.client.execute_with_progress(self.sample_query) @@ -77,6 +83,9 @@ def test_last_query_after_execute_with_progress(self): self.assertEqual(last_query.elapsed, 0) + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + def test_last_query_progress_total_rows(self): self.client.execute('SELECT number FROM numbers(10) LIMIT 10') @@ -96,6 +105,9 @@ def test_last_query_progress_total_rows(self): self.assertGreater(last_query.elapsed, 0) + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 10) + def test_last_query_after_execute_insert(self): with self.sample_table(): self.client.execute('INSERT INTO test (foo) VALUES', @@ -111,14 +123,19 @@ def test_last_query_after_execute_insert(self): self.assertGreater(last_query.elapsed, 0) + self.assertEqual(last_query.stats['InsertQuery'], 1) + self.assertEqual(last_query.stats['InsertedRows'], 42) + def test_override_after_subsequent_queries(self): query = 'SELECT * FROM test WHERE foo < %(i)s ORDER BY foo LIMIT 5' with self.sample_table(): for i in range(1, 10): self.client.execute(query, {'i': i}) - profile_info = self.client.last_query.profile_info - self.assertEqual(profile_info.rows_before_limit, i) + last_query = self.client.last_query + self.assertEqual(last_query.profile_info.rows_before_limit, i) + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) def test_reset_last_query(self): with self.sample_table(): @@ -150,6 +167,11 @@ def test_progress_info_increment(self): total_rows = 100000000 if self.server_version > (19, 4) else 0 self.assertEqual(last_query.progress.total_rows, total_rows) + last_query = self.client.last_query + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 100000000) + self.assertEqual(last_query.stats['SelectedBytes'], 800000000) + def test_progress_info_ddl(self): self.client.execute('DROP TABLE IF EXISTS foo') @@ -162,3 +184,4 @@ def test_progress_info_ddl(self): self.assertEqual(last_query.progress.elapsed_ns, 0) self.assertGreater(last_query.elapsed, 0) + self.assertDictEqual(last_query.stats, {}) From 4ac7a81c1ce1989434767f8c8c7d636736b90bc9 Mon Sep 17 00:00:00 2001 From: Konstantin Volkov Date: Fri, 20 Sep 2024 16:49:29 +0300 Subject: [PATCH 2/3] Fix tests for old clickhouse versions --- tests/test_query_info.py | 52 ++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/tests/test_query_info.py b/tests/test_query_info.py index 6bcdc78b..99d2e9a2 100644 --- a/tests/test_query_info.py +++ b/tests/test_query_info.py @@ -39,8 +39,11 @@ def test_store_last_query_after_execute(self): self.assertGreater(last_query.elapsed, 0) - self.assertEqual(last_query.stats['SelectQuery'], 1) - self.assertEqual(last_query.stats['SelectedRows'], 42) + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + else: + self.assertDictEqual(last_query.stats, {}) def test_last_query_after_execute_iter(self): with self.sample_table(): @@ -60,8 +63,11 @@ def test_last_query_after_execute_iter(self): self.assertEqual(last_query.elapsed, 0) - self.assertEqual(last_query.stats['SelectQuery'], 1) - self.assertEqual(last_query.stats['SelectedRows'], 42) + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + else: + self.assertDictEqual(last_query.stats, {}) def test_last_query_after_execute_with_progress(self): with self.sample_table(): @@ -83,8 +89,11 @@ def test_last_query_after_execute_with_progress(self): self.assertEqual(last_query.elapsed, 0) - self.assertEqual(last_query.stats['SelectQuery'], 1) - self.assertEqual(last_query.stats['SelectedRows'], 42) + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + else: + self.assertDictEqual(last_query.stats, {}) def test_last_query_progress_total_rows(self): self.client.execute('SELECT number FROM numbers(10) LIMIT 10') @@ -105,8 +114,11 @@ def test_last_query_progress_total_rows(self): self.assertGreater(last_query.elapsed, 0) - self.assertEqual(last_query.stats['SelectQuery'], 1) - self.assertEqual(last_query.stats['SelectedRows'], 10) + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 10) + else: + self.assertDictEqual(last_query.stats, {}) def test_last_query_after_execute_insert(self): with self.sample_table(): @@ -123,8 +135,11 @@ def test_last_query_after_execute_insert(self): self.assertGreater(last_query.elapsed, 0) - self.assertEqual(last_query.stats['InsertQuery'], 1) - self.assertEqual(last_query.stats['InsertedRows'], 42) + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['InsertQuery'], 1) + self.assertEqual(last_query.stats['InsertedRows'], 42) + else: + self.assertDictEqual(last_query.stats, {}) def test_override_after_subsequent_queries(self): query = 'SELECT * FROM test WHERE foo < %(i)s ORDER BY foo LIMIT 5' @@ -134,8 +149,12 @@ def test_override_after_subsequent_queries(self): last_query = self.client.last_query self.assertEqual(last_query.profile_info.rows_before_limit, i) - self.assertEqual(last_query.stats['SelectQuery'], 1) - self.assertEqual(last_query.stats['SelectedRows'], 42) + + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 42) + else: + self.assertDictEqual(last_query.stats, {}) def test_reset_last_query(self): with self.sample_table(): @@ -168,9 +187,12 @@ def test_progress_info_increment(self): self.assertEqual(last_query.progress.total_rows, total_rows) last_query = self.client.last_query - self.assertEqual(last_query.stats['SelectQuery'], 1) - self.assertEqual(last_query.stats['SelectedRows'], 100000000) - self.assertEqual(last_query.stats['SelectedBytes'], 800000000) + if self.server_version > (22, 8): + self.assertEqual(last_query.stats['SelectQuery'], 1) + self.assertEqual(last_query.stats['SelectedRows'], 100000000) + self.assertEqual(last_query.stats['SelectedBytes'], 800000000) + else: + self.assertDictEqual(last_query.stats, {}) def test_progress_info_ddl(self): self.client.execute('DROP TABLE IF EXISTS foo') From 03f0995909342b354cbdfa4bf1fa9bef15ec48d9 Mon Sep 17 00:00:00 2001 From: Konstantin Volkov Date: Mon, 23 Sep 2024 19:14:17 +0300 Subject: [PATCH 3/3] Fix clickhouse statistics min version in tests --- tests/test_query_info.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_query_info.py b/tests/test_query_info.py index 99d2e9a2..51feff5e 100644 --- a/tests/test_query_info.py +++ b/tests/test_query_info.py @@ -39,7 +39,7 @@ def test_store_last_query_after_execute(self): self.assertGreater(last_query.elapsed, 0) - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['SelectQuery'], 1) self.assertEqual(last_query.stats['SelectedRows'], 42) else: @@ -63,7 +63,7 @@ def test_last_query_after_execute_iter(self): self.assertEqual(last_query.elapsed, 0) - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['SelectQuery'], 1) self.assertEqual(last_query.stats['SelectedRows'], 42) else: @@ -89,7 +89,7 @@ def test_last_query_after_execute_with_progress(self): self.assertEqual(last_query.elapsed, 0) - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['SelectQuery'], 1) self.assertEqual(last_query.stats['SelectedRows'], 42) else: @@ -114,7 +114,7 @@ def test_last_query_progress_total_rows(self): self.assertGreater(last_query.elapsed, 0) - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['SelectQuery'], 1) self.assertEqual(last_query.stats['SelectedRows'], 10) else: @@ -135,7 +135,7 @@ def test_last_query_after_execute_insert(self): self.assertGreater(last_query.elapsed, 0) - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['InsertQuery'], 1) self.assertEqual(last_query.stats['InsertedRows'], 42) else: @@ -150,7 +150,7 @@ def test_override_after_subsequent_queries(self): last_query = self.client.last_query self.assertEqual(last_query.profile_info.rows_before_limit, i) - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['SelectQuery'], 1) self.assertEqual(last_query.stats['SelectedRows'], 42) else: @@ -187,7 +187,7 @@ def test_progress_info_increment(self): self.assertEqual(last_query.progress.total_rows, total_rows) last_query = self.client.last_query - if self.server_version > (22, 8): + if self.server_version >= (21, 12): self.assertEqual(last_query.stats['SelectQuery'], 1) self.assertEqual(last_query.stats['SelectedRows'], 100000000) self.assertEqual(last_query.stats['SelectedBytes'], 800000000)