Add profile events support #455

Open · wants to merge 3 commits into master
Changes from 1 commit
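In short: with this change the driver records the ProfileEvents packets sent by the server into a per-query dict, client.last_query.stats. A minimal usage sketch, assuming this branch is applied (the query is illustrative; the SelectQuery/SelectedRows event names are taken from the tests below):

    from clickhouse_driver import Client

    client = Client('localhost')
    client.execute('SELECT number FROM numbers(42)')

    # ProfileEvents reported by the server for the last query, keyed by
    # event name (the dict is introduced in result.py below).
    stats = client.last_query.stats
    print(stats.get('SelectQuery'))   # e.g. 1
    print(stats.get('SelectedRows'))  # e.g. 42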
8 changes: 6 additions & 2 deletions clickhouse_driver/client.py

@@ -264,6 +264,10 @@ def receive_packet(self):
             self.last_query.store_profile(packet.profile_info)
             return True

+        elif packet.type == ServerPacketTypes.PROFILE_EVENTS:
+            self.last_query.store_profile_events(packet)
+            return True
+
         else:
             return True

@@ -691,7 +695,7 @@ def receive_end_of_query(self):
             pass

         elif packet.type == ServerPacketTypes.PROFILE_EVENTS:
-            self.last_query.store_profile(packet.profile_info)
+            self.last_query.store_profile_events(packet)

         else:
             message = self.connection.unexpected_packet_message(
@@ -734,7 +738,7 @@ def receive_profile_events(self):
             packet = self.connection.receive_packet()

             if packet.type == ServerPacketTypes.PROFILE_EVENTS:
-                self.last_query.store_profile(packet.profile_info)
+                self.last_query.store_profile_events(packet)
                 break

             elif packet.type == ServerPacketTypes.PROGRESS:
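Net effect of the client.py changes: all three receive paths (receive_packet() during streaming, receive_end_of_query(), and receive_profile_events()) now hand PROFILE_EVENTS packets to the new store_profile_events(packet) instead of store_profile(packet.profile_info), which expects a BlockStreamProfileInfo rather than a ProfileEvents block.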
12 changes: 12 additions & 0 deletions clickhouse_driver/result.py

@@ -130,6 +130,7 @@ def __init__(self):
         self.profile_info = BlockStreamProfileInfo()
         self.progress = Progress()
         self.elapsed = 0
+        self.stats = {}

     def store_profile(self, profile_info):
         self.profile_info = profile_info
@@ -142,3 +143,14 @@ def store_progress(self, progress):

     def store_elapsed(self, elapsed):
         self.elapsed = elapsed
+
+    def store_profile_events(self, packet):
+        data = QueryResult([packet]).get_result()
+        column_names = [i[0] for i in packet.block.columns_with_types]
+        for row in data:
+            item = dict(zip(column_names, row))
+            name = item.get('name', '')
+            if item['type'] == 'increment':
+                self.stats[name] = self.stats.get(name, 0) + item['value']
+            elif item['type'] == 'gauge':
+                self.stats[name] = item['value']
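As a reading aid, here is a self-contained mirror of the folding logic in store_profile_events(), run on made-up events; the row shape (a 'name', 'type' and 'value' column per event) is inferred from the method itself:

    # Mirrors store_profile_events() above; the sample rows are made up.
    def fold(stats, rows):
        for item in rows:
            name = item.get('name', '')
            if item['type'] == 'increment':
                # increments accumulate across ProfileEvents packets
                stats[name] = stats.get(name, 0) + item['value']
            elif item['type'] == 'gauge':
                # gauges keep only the latest reported value
                stats[name] = item['value']
        return stats

    stats = {}
    fold(stats, [
        {'name': 'SelectedRows', 'type': 'increment', 'value': 10},
        {'name': 'SelectedRows', 'type': 'increment', 'value': 32},
        {'name': 'MemoryTrackerUsage', 'type': 'gauge', 'value': 4096},
    ])
    assert stats == {'SelectedRows': 42, 'MemoryTrackerUsage': 4096}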
Member (review comment on store_profile_events):

Currently we have 'static' attributes that store statistics (https://clickhouse-driver.readthedocs.io/en/latest/features.html#query-execution-statistics): client.last_query.progress.total_rows, client.last_query.progress.total_bytes, etc.

I'd prefer to store statistics the same way, if possible: client.last_query.stats.select_query, client.last_query.stats.selected_rows.

Author:

I use this data for analyzing queries, and the contents can differ a lot. The metrics vary with the query type and even with the queried table engine; the server version probably matters as well (I only use v23 at the moment). So there is no stable list of metrics, and the number of possible entries is too large (perhaps more than 100 in the end).
I found that about 20 of them are the most common and the most interesting for my queries. I use a pydantic model to extract them:

from __future__ import annotations  # so the CursorResult/QueryInfo hints below need not resolve

import pydantic

# CursorResult and QueryInfo are types from my own wrapper code,
# not from clickhouse-driver.

class ClickhouseStats(pydantic.BaseModel):
    elapsed: int = pydantic.Field(alias="elapsed")
    is_insert: int | None = pydantic.Field(alias="InsertQuery", default=None)
    read_bytes: int | None = pydantic.Field(alias="ReadCompressedBytes", default=None)
    write_bytes: int | None = pydantic.Field(alias="WriteBufferFromFileDescriptorWriteBytes", default=None)
    network_recv_bytes: int | None = pydantic.Field(alias="NetworkReceiveBytes", default=None)
    network_recv_time: int | None = pydantic.Field(alias="NetworkReceiveElapsedMicroseconds", default=None)
    network_send_bytes: int | None = pydantic.Field(alias="NetworkSendBytes", default=None)
    network_send_time: int | None = pydantic.Field(alias="NetworkSendElapsedMicroseconds", default=None)
    memory_usage: int | None = pydantic.Field(alias="MemoryTrackerUsage", default=None)
    memory_peak: int | None = pydantic.Field(alias="MemoryTrackerPeakUsage", default=None)
    file_open: int | None = pydantic.Field(alias="FileOpen", default=None)
    function_execute: int | None = pydantic.Field(alias="FunctionExecute", default=None)
    write_time: int | None = pydantic.Field(alias="DiskWriteElapsedMicroseconds", default=None)
    insert_rows: int | None = pydantic.Field(alias="InsertedRows", default=None)
    insert_bytes: int | None = pydantic.Field(alias="InsertedBytes", default=None)
    select_rows: int | None = pydantic.Field(alias="SelectedRows", default=None)
    select_bytes: int | None = pydantic.Field(alias="SelectedBytes", default=None)
    insert_parts: int | None = pydantic.Field(alias="InsertedCompactParts", default=None)
    real_time: int | None = pydantic.Field(alias="RealTimeMicroseconds", default=None)
    system_time: int | None = pydantic.Field(alias="SystemTimeMicroseconds", default=None)

    def __init__(self, result: CursorResult | None = None, query_info: QueryInfo | None = None):
        if query_info is None:
            query_info: QueryInfo = result.context.query_info
        super().__init__(elapsed=int(query_info.elapsed * 1000), **(query_info.stats or {}))  # TODO: 1000?

I can add these as attributes here (without pydantic), but wouldn't that be too much?

Member:

Yep, it's too much. A dict will be fine.

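Since the thread settled on a plain dict, a filtered view like the author's can still be built on the caller's side without pydantic; a minimal sketch (the alias map is abbreviated from the model above, and last_query is a finished query's QueryInfo):

    # Caller-side extraction of a stable metric subset from the raw dict.
    # The alias map is abbreviated from the pydantic model above; absent
    # events simply come out as None.
    ALIASES = {
        'is_insert': 'InsertQuery',
        'read_bytes': 'ReadCompressedBytes',
        'memory_peak': 'MemoryTrackerPeakUsage',
        'select_rows': 'SelectedRows',
        'insert_rows': 'InsertedRows',
    }

    def extract_stats(last_query):
        stats = last_query.stats or {}
        out = {key: stats.get(event) for key, event in ALIASES.items()}
        out['elapsed_ms'] = int(last_query.elapsed * 1000)
        return out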
27 changes: 25 additions & 2 deletions tests/test_query_info.py

@@ -39,6 +39,9 @@ def test_store_last_query_after_execute(self):

         self.assertGreater(last_query.elapsed, 0)

+        self.assertEqual(last_query.stats['SelectQuery'], 1)
+        self.assertEqual(last_query.stats['SelectedRows'], 42)
+
     def test_last_query_after_execute_iter(self):
         with self.sample_table():
             list(self.client.execute_iter(self.sample_query))
@@ -57,6 +60,9 @@ def test_last_query_after_execute_iter(self):

         self.assertEqual(last_query.elapsed, 0)

+        self.assertEqual(last_query.stats['SelectQuery'], 1)
+        self.assertEqual(last_query.stats['SelectedRows'], 42)
+
     def test_last_query_after_execute_with_progress(self):
         with self.sample_table():
             progress = self.client.execute_with_progress(self.sample_query)
@@ -77,6 +83,9 @@ def test_last_query_after_execute_with_progress(self):

         self.assertEqual(last_query.elapsed, 0)

+        self.assertEqual(last_query.stats['SelectQuery'], 1)
+        self.assertEqual(last_query.stats['SelectedRows'], 42)
+
     def test_last_query_progress_total_rows(self):
         self.client.execute('SELECT number FROM numbers(10) LIMIT 10')

@@ -96,6 +105,9 @@ def test_last_query_progress_total_rows(self):

         self.assertGreater(last_query.elapsed, 0)

+        self.assertEqual(last_query.stats['SelectQuery'], 1)
+        self.assertEqual(last_query.stats['SelectedRows'], 10)
+
     def test_last_query_after_execute_insert(self):
         with self.sample_table():
             self.client.execute('INSERT INTO test (foo) VALUES',
@@ -111,14 +123,19 @@ def test_last_query_after_execute_insert(self):

         self.assertGreater(last_query.elapsed, 0)

+        self.assertEqual(last_query.stats['InsertQuery'], 1)
+        self.assertEqual(last_query.stats['InsertedRows'], 42)
+
     def test_override_after_subsequent_queries(self):
         query = 'SELECT * FROM test WHERE foo < %(i)s ORDER BY foo LIMIT 5'
         with self.sample_table():
             for i in range(1, 10):
                 self.client.execute(query, {'i': i})

-                profile_info = self.client.last_query.profile_info
-                self.assertEqual(profile_info.rows_before_limit, i)
+                last_query = self.client.last_query
+                self.assertEqual(last_query.profile_info.rows_before_limit, i)
+                self.assertEqual(last_query.stats['SelectQuery'], 1)
+                self.assertEqual(last_query.stats['SelectedRows'], 42)

     def test_reset_last_query(self):
         with self.sample_table():
@@ -150,6 +167,11 @@ def test_progress_info_increment(self):
         total_rows = 100000000 if self.server_version > (19, 4) else 0
         self.assertEqual(last_query.progress.total_rows, total_rows)

+        last_query = self.client.last_query
+        self.assertEqual(last_query.stats['SelectQuery'], 1)
+        self.assertEqual(last_query.stats['SelectedRows'], 100000000)
+        self.assertEqual(last_query.stats['SelectedBytes'], 800000000)
+
     def test_progress_info_ddl(self):
         self.client.execute('DROP TABLE IF EXISTS foo')

@@ -162,3 +184,4 @@ def test_progress_info_ddl(self):
         self.assertEqual(last_query.progress.elapsed_ns, 0)

         self.assertGreater(last_query.elapsed, 0)
+        self.assertDictEqual(last_query.stats, {})