
Question: how to stream data #122

Closed
den-crane opened this issue Feb 10, 2023 · 25 comments · Fixed by #124
Labels
bug Something isn't working

Comments

@den-crane

den-crane commented Feb 10, 2023

cat x.py
import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost', username='default', password='')

result = client.query('SELECT toString(cityHash64(number)) FROM  numbers(10000000000) where intDiv(1,number-100000)>-1000000000000000')
for i in result.result_rows:
    print( i )
python3 x.py|tail
('17933741768352543064',)
('11086356212394800338',)
('12749720657102764978',)
('10098974843304845854',)
('9676786211223313804',)
('17244299265287719294',)
('2925889024054896551',)
('7692628340459912383',)
('738548801493469449',)
('14340599725870999718',)

the same in CH client

clickhouse :) SELECT toString(cityHash64(number)) FROM  numbers(10000000000) where intDiv(1,number-100000)>-1000000000000000;

65409 rows in set. Elapsed: 0.035 sec.

Received exception from server (version 23.1.2):
Code: 153. DB::Exception: Received from localhost:9000. DB::Exception: Division by zero: while executing 'FUNCTION intDiv(1 :: 1, minus(number, 100000) :: 4) -> intDiv(1, minus(number, 100000)) Int8 : 2'. (ILLEGAL_DIVISION)
@den-crane
Author

My mistake, it reads only 5000 rows. I don't understand why, though.

@genzgd
Collaborator

genzgd commented Feb 10, 2023

It defaults to 5000 rows for queries to prevent blowing up client memory. You can change the query_limit parameter to some other value, or to 0 to allow unlimited rows -- which might be okay as a default at this point, now that the HTTP response is streamed.

@genzgd
Collaborator

genzgd commented Feb 10, 2023

You can also add a higher LIMIT to the end of the query, which should be respected.

@den-crane
Author

It defaults to 5000 rows for queries to prevent blowing up client memory

I suggest documenting it.

@den-crane
Author

den-crane commented Feb 10, 2023

BTW, is it possible to stream a result? I need to query about 50 million wide rows (~15 GB).

@genzgd
Collaborator

genzgd commented Feb 10, 2023

@den-crane
Author

Hm, I cannot make it work.

import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost', username='default', password='')

client.query_limit = 0

with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM numbers(10000000000) where intDiv(1,number-30000000)>-1000000000000000') as stream:
    for block in stream:
        print("new block")
        for row in block:
            print(row)

This SQL throws "Division by zero" after 30,000,000 rows.
I expect a print("new block") for each ~65k-row block and 30 million print(row) calls, but I see nothing.

@den-crane den-crane reopened this Feb 10, 2023
@den-crane den-crane changed the title Driver does not recognize CH errors in the middle of a result Question: how to stream data Feb 10, 2023
@genzgd
Collaborator

genzgd commented Feb 10, 2023

I think this is the result of the default True value for the client setting send_progress, which adds the HTTP setting wait_end_of_query=1. If I do this it works for me:

client = clickhouse_connect.get_client(send_progress=False)

Because of the wait_end_of_query setting, ClickHouse generates the error before it sends any data for streaming.

That default should probably also be changed, at least for streaming queries (another leftover from before streaming was implemented).

@den-crane
Author

den-crane commented Feb 10, 2023

Still, no streaming

import clickhouse_connect
import datetime
client = clickhouse_connect.get_client(send_progress=False)

client.query_limit = 0

print ("Start ", datetime.datetime.now())

with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM numbers(10000000000) where intDiv(1,number-100000000)>-1000000000000000') as stream:
    for block in stream:
        print("new block", datetime.datetime.now())
python3 x.py |head
Start  2023-02-10 19:16:26.160108
new block 2023-02-10 19:17:10.907592
new block 2023-02-10 19:17:10.919922
new block 2023-02-10 19:17:10.932581
new block 2023-02-10 19:17:10.944436
new block 2023-02-10 19:17:10.955817
new block 2023-02-10 19:17:10.967090
new block 2023-02-10 19:17:10.978131
new block 2023-02-10 19:17:10.989059
new block 2023-02-10 19:17:10.999771

Curl returns data instantly

curl "http://localhost:8123/?query=SELECT+toString(cityHash64(number))+FROM+numbers(10000000000)+where+intDiv(1,number-100000000)>-1000000000000000"

@den-crane
Author

den-crane commented Feb 10, 2023

Also, send_progress=False hides the real error (Code: 153. DB::Exception: Division by zero):

Closed HTTP response due to unexpected exception
/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.14) or chardet (3.0.4) doesn't match a supported version!
  warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported "
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/transform.py", line 57, in gen
    next_block = get_block()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/transform.py", line 42, in get_block
    column = col_type.read_column(source, num_rows, context)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/datatypes/base.py", line 128, in read_column
    column = self.read_column_data(source, num_rows, ctx)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/datatypes/base.py", line 143, in read_column_data
    return self._read_column_binary(source, num_rows, ctx)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/datatypes/string.py", line 15, in _read_column_binary
    return source.read_str_col(num_rows, ctx.encoding or self.encoding)
  File "clickhouse_connect/driverc/buffer.pyx", line 194, in clickhouse_connect.driverc.buffer.ResponseBuffer.read_str_col
  File "clickhouse_connect/driverc/buffer.pyx", line 123, in clickhouse_connect.driverc.buffer.ResponseBuffer._read_str_col
  File "clickhouse_connect/driverc/buffer.pyx", line 66, in clickhouse_connect.driverc.buffer.ResponseBuffer.read_bytes_c
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "x.py", line 10, in <module>
    for block in stream:
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/common.py", line 196, in __next__
    return next(self.gen)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/query.py", line 202, in _row_block_stream
    for block in self._column_block_stream():
RuntimeError: generator raised StopIteration
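The RuntimeError at the bottom is PEP 479 behavior: since Python 3.7, a StopIteration that escapes a generator body is converted into a RuntimeError, which is why the driver's internal StopIteration (the response buffer running dry mid-column) surfaces this way instead of cleanly ending the stream. A minimal sketch of the failure shape (hypothetical names, not the driver's actual code):

```python
def read_block():
    # Stands in for the Cython buffer hitting end-of-stream mid-column.
    raise StopIteration

def block_stream():
    # A generator that yields blocks; under PEP 479, a StopIteration
    # raised inside its body does NOT end iteration silently.
    while True:
        yield read_block()

try:
    next(block_stream())
except RuntimeError as exc:  # PEP 479 converts the escaped StopIteration
    cause = type(exc.__cause__).__name__

print(cause)  # StopIteration
```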

@genzgd
Collaborator

genzgd commented Feb 10, 2023

Based on running a profile, the delay looks like it's coming from ClickHouse doing compression. One more setting change makes it almost instant.

client = clickhouse_connect.get_client(send_progress=False, compress=False)

Bubbling the reported error up through the optimized Cython code is a bit more challenging, but I'll take a look.

@den-crane
Author

den-crane commented Feb 10, 2023

compress=False

I am sure you wait until the whole result is decompressed, then yield the result.

with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM  numbers(100000)') as stream:

Start  2023-02-10 20:18:36.166016
new block 2023-02-10 20:18:36.203521
new block 2023-02-10 20:18:36.212884

with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM  numbers(10000000)') as stream:

Start  2023-02-10 20:20:42.689736
new block 2023-02-10 20:20:47.038857
....
new block 2023-02-10 20:20:48.431567

Or the decompression buffer is huge.

@genzgd
Collaborator

genzgd commented Feb 10, 2023

compress=False tells ClickHouse not to use compression at all. On my local machine the profile showed that it was waiting for the ClickHouse server to send the response, which was causing the delay:

compress=True -- uses lz4 by default:

[Screenshot: profile output showing time spent in socket method 'recv_into']

Waiting in `recv_into` means the Python driver is waiting for ClickHouse to actually put data on the wire. Decompression on the Python side is actually pretty fast.

If compression is used, the Python side decompresses each block individually. I'm not sure how ClickHouse does the lz4 compression over HTTP.

@den-crane
Author

import clickhouse_connect
import datetime
import tracemalloc

tracemalloc.start()

client = clickhouse_connect.get_client(send_progress=False, compress=True)

client.query_limit = 0

with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM  numbers(100000000)') as stream:
    for block in stream:
        pass

current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB")
tracemalloc.stop()
compress=False
Current memory usage is 3.465356MB; Peak was 23.124167MB

compress=True
Current memory usage is 8.005755MB; Peak was 4080.914301MB

@genzgd
Collaborator

genzgd commented Feb 10, 2023

Hmm, I lied, it looks like ClickHouse picks zstd when Accept-Encoding includes both lz4 and zstd. And it appears that ClickHouse is sending the whole thing (or at least very big pieces) as a single zstd frame. I'll dig a little further.

@den-crane
Author

with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM  numbers(1000000000)') as stream:

dmesg -T |tail
[Fri Feb 10 20:44:08 2023] Out of memory: Killed process 2362915 (python3) total-vm:40294768kB, anon-rss:22543012kB, file-rss:2104kB, shmem-rss:0kB, UID:1001 pgtables:44384kB oom_score_adj:0

22 GB

@genzgd
Collaborator

genzgd commented Feb 10, 2023

compress='lz4' looks much faster, so that should probably be the only Accept-Encoding for reads by default.

@genzgd
Collaborator

genzgd commented Feb 10, 2023

Yeah, zstd is broken for streaming for whatever reason; it looks slow on both the ClickHouse side and the Python side.

compress='lz4'

Current memory usage is 7.531117MB; Peak was 25.668245MB

@genzgd genzgd added the bug Something isn't working label Feb 10, 2023
@den-crane
Author

den-crane commented Feb 10, 2023

Thanks, send_progress=False, compress='lz4' solves my task.

Though, tracemalloc is either not trustworthy or doesn't show RSS:
it reports 2412 MB, but Python never used that much.

import clickhouse_connect
import datetime
import tracemalloc

tracemalloc.start()
try:
    client = clickhouse_connect.get_client(send_progress=False, compress=False)

    client.query_limit = 0

    print ("Start ", datetime.datetime.now())

    with client.query_row_block_stream("SELECT 'aaaaaaaaaaaaaaaaaaaaaaaaaaa' FROM  numbers(10000000)") as stream:
        for block in stream:
            #pass
            print ("new block", datetime.datetime.now())
            for row in block:
                print ( row )

finally:
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB")
    tracemalloc.stop()
compress=False
/usr/bin/time -v  python3 x.py|tail -1
Peak was 24.637032MB

Maximum resident set size (kbytes): 143032

compress='lz4'
/usr/bin/time -v  python3 x.py|tail -1
Peak was 2412.769951MB

Maximum resident set size (kbytes): 605988
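On the tracemalloc/RSS discrepancy: tracemalloc only counts allocations that go through Python's memory allocator and reports a logical peak, while maximum RSS is what the OS actually mapped, so buffers allocated inside C extensions (such as the driver's Cython response buffer) can move one number without the other. A small illustrative sketch, not specific to clickhouse-connect:

```python
import tracemalloc

tracemalloc.start()

# A large transient allocation through Python's allocator counts toward
# the tracemalloc peak even though it is freed immediately afterwards...
big = bytearray(50 * 10**6)  # ~50 MB
del big

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"current={current / 10**6:.1f}MB peak={peak / 10**6:.1f}MB")
# ...while memory obtained by C code that bypasses Python's allocator
# (e.g. a raw malloc inside a Cython extension) raises RSS without ever
# appearing in these numbers, so the two peaks can diverge either way.
```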

@genzgd
Collaborator

genzgd commented Feb 10, 2023

Thanks for stress testing and prompting me to find the bad zstd behavior and rethink some defaults. I just thought compression was expensive, but it appears that ClickHouse is putting the entire HTTP response into big zstd frames (apparently 4 GB), which is not optimal. As for the maximum RSS, that indeed looks weird.

@den-crane
Author

den-crane commented Feb 10, 2023

HTTP response in big zstd frames (apparently 4GB), which is not optimal

I suggest verifying that with the ClickHouse team.
I am not sure that's true.

curl "http://localhost:8123/?enable_http_compression=1&query=SELECT+toString(cityHash64(number))+FROM+numbers(100000000)" -H 'Accept-Encoding: zstd' | /usr/bin/time -v zstd -d > out

Maximum resident set size (kbytes): 4088

It looks like the console zstd uses ~4 MB of RAM.

@genzgd
Collaborator

genzgd commented Feb 10, 2023

Yes, it needs more investigation; I'm quite possibly using the Python zstd library wrong.

@genzgd genzgd mentioned this issue Feb 11, 2023
2 tasks
@genzgd genzgd linked a pull request Feb 11, 2023 that will close this issue
2 tasks
@genzgd
Collaborator

genzgd commented Feb 11, 2023

I was using the wrong function in the zstandard library, one which expected a content size. It read the zstd frame header sent by ClickHouse incorrectly and presumed a content size of many petabytes, so it retrieved the entire response before decompressing.

zstd and lz4 performance should be pretty much the same as of the 0.5.9 release.
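The shape of that fix -- decompressing in bounded increments as input arrives, rather than a one-shot call sized by the frame's declared content size -- can be sketched with the stdlib zlib module. This is illustrative only: clickhouse-connect actually uses the third-party zstandard library, and the helper below is hypothetical, not driver code.

```python
import zlib

def stream_decompress(compressed_chunks, max_piece=64 * 1024):
    """Yield decompressed data in pieces of at most max_piece bytes.

    Memory per step stays bounded regardless of the total decompressed
    size -- the property the content-size-driven one-shot call lacked.
    """
    d = zlib.decompressobj()
    for chunk in compressed_chunks:
        piece = d.decompress(chunk, max_piece)
        if piece:
            yield piece
        # If output hit max_piece, the rest of the input chunk is parked
        # in unconsumed_tail; keep draining it in bounded pieces.
        while d.unconsumed_tail:
            piece = d.decompress(d.unconsumed_tail, max_piece)
            if piece:
                yield piece
    tail = d.flush()
    if tail:
        yield tail
```

The key design point is the max_piece bound on each decompress call: a frame header claiming a huge content size never forces a giant allocation up front, because output is produced only as fast as the consumer iterates.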

@den-crane
Author

Thank you for the fix. It works amazingly fast now.

Probably you can improve the streaming exception message by extracting Code: .*. DB::Exception:. *^$

import clickhouse_connect
import datetime
import tracemalloc

tracemalloc.start()
try:
    client = clickhouse_connect.get_client(send_progress=False, compress=True , query_limit=10000)

    client.query_limit = 0

    print ("Start ", datetime.datetime.now())

    with client.query_row_block_stream('SELECT toString(cityHash64(number)) FROM  numbers(10000000000) where intDiv(1,number-2000000)>-1000000000000000') as stream:
        for block in stream:
            #pass
            print ("new block", datetime.datetime.now())
            #for row in block:
            #    print ( row )

finally:
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB")
    tracemalloc.stop()

Now it looks like this

new block 2023-02-12 16:42:48.746552
new block 2023-02-12 16:42:48.786660
Current memory usage is 15.471932MB; Peak was 26.160725MB
/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.14) or chardet (3.0.4) doesn't match a supported version!
  warnings.warn("urllib3 ({}) or chardet ({}) doesn't match a supported "
Traceback (most recent call last):
  File "x.py", line 15, in <module>
    for block in stream:
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/common.py", line 196, in __next__
    return next(self.gen)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/query.py", line 248, in _row_block_stream
    for block in self._column_block_stream():
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/transform.py", line 63, in gen
    next_block = get_block()
  File "/home/ubuntu/.local/lib/python3.8/site-packages/clickhouse_connect/driver/transform.py", line 51, in get_block
    raise StreamFailureError(source.last_message) from None
clickhouse_connect.driver.exceptions.StreamFailureError: 63410833136910804783088432610395774261422113808077803087057503177461281993188170981031911707218394.

...several megabytes of garbage...

Code: 153. DB::Exception: Division by zero: while executing 'FUNCTION intDiv(1 :: 1, minus(number, 2000000) :: 4) -> intDiv(1, minus(number, 2000000)) Int8 : 2'. (ILLEGAL_DIVISION) (version 23.1.2.9 (official build))

@genzgd
Collaborator

genzgd commented Feb 12, 2023

I'll try the regex -- I was working under the assumption that the ClickHouse exception would be in a separate block/chunk at the end of the response, but that's clearly incorrect (or maybe only true on my current Mac ClickHouse build).
