From 7fce84ce1121dd692fa19add87c8c5c8c0049d05 Mon Sep 17 00:00:00 2001 From: stphnlyd Date: Tue, 10 Mar 2020 17:48:28 +0800 Subject: [PATCH] allow new capacity byte for connection handshake Recent Kdb versions support new capacity bytes for connection handshake. See https://code.kx.com/v2/basics/ipc/#handshake The change is to make it possible to override the default to support large memory > 2GB. --- qpython/fastutils.pyx | 4 ++-- qpython/qconnection.py | 7 +++++-- qpython/qreader.py | 21 ++++++++++++++------- qpython/utils.py | 14 +++++++------- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/qpython/fastutils.pyx b/qpython/fastutils.pyx index 5e462e4..87fc8f8 100644 --- a/qpython/fastutils.pyx +++ b/qpython/fastutils.pyx @@ -17,8 +17,8 @@ import numpy cimport numpy -DTYPE = numpy.int -ctypedef numpy.int_t DTYPE_t +DTYPE = numpy.uint +ctypedef numpy.uint_t DTYPE_t DTYPE8 = numpy.int ctypedef numpy.uint8_t DTYPE8_t diff --git a/qpython/qconnection.py b/qpython/qconnection.py index c3054d8..3c97f27 100644 --- a/qpython/qconnection.py +++ b/qpython/qconnection.py @@ -65,6 +65,8 @@ class QConnection(object): - `encoding` (`string`) - string encoding for data deserialization - `reader_class` (subclass of `QReader`) - data deserializer - `writer_class` (subclass of `QWriter`) - data serializer + - `capacity_byte` (byte) - handshake capacity byte. Use it only if you + want to override the default. See https://code.kx.com/v2/basics/ipc/#handshake :Options: - `raw` (`boolean`) - if ``True`` returns raw data chunk instead of parsed data, **Default**: ``False`` @@ -78,11 +80,12 @@ class QConnection(object): ''' - def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, **options): + def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, capacity_byte = b'\3', **options): self.host = host self.port = port self.username = username self.password = password + self.capacity_byte = capacity_byte self._connection = None self._connection_file = None @@ -186,7 +189,7 @@ def _initialize(self): '''Performs a IPC protocol handshake.''' credentials = (self.username if self.username else '') + ':' + (self.password if self.password else '') credentials = credentials.encode(self._encoding) - self._connection.send(credentials + b'\3\0') + self._connection.send(credentials + self.capacity_byte + b'\0') response = self._connection.recv(1) if len(response) != 1: diff --git a/qpython/qreader.py b/qpython/qreader.py index 2dbdf54..e65c8a9 100644 --- a/qpython/qreader.py +++ b/qpython/qreader.py @@ -167,7 +167,7 @@ def read_header(self, source = None): # skip 1 byte self._buffer.skip() - message_size = self._buffer.get_int() + message_size = self._buffer.get_size() return QMessage(None, message_type, message_size, message_compressed) @@ -197,14 +197,14 @@ def read_data(self, message_size, is_compressed = False, **options): if is_compressed: if self._stream: self._buffer.wrap(self._read_bytes(4)) - uncompressed_size = -8 + self._buffer.get_int() + uncompressed_size = -8 + self._buffer.get_size() compressed_data = self._read_bytes(message_size - 12) if self._stream else self._buffer.raw(message_size - 12) raw_data = numpy.frombuffer(compressed_data, dtype = numpy.uint8) if uncompressed_size <= 0: raise QReaderException('Error while data decompression.') - raw_data = uncompress(raw_data, numpy.intc(uncompressed_size)) + raw_data = uncompress(raw_data, numpy.uintc(uncompressed_size)) raw_data = numpy.ndarray.tostring(raw_data) self._buffer.wrap(raw_data) elif self._stream: @@ -243,7 +243,7 @@ def _read_error(self, qtype = QERROR): @parse(QSTRING) def _read_string(self, qtype = QSTRING): self._buffer.skip() # ignore attributes - length = self._buffer.get_int() + length = self._buffer.get_size() return self._buffer.raw(length) if length > 0 else b'' @@ -284,7 +284,7 @@ def _read_temporal(self, qtype): def _read_list(self, qtype): self._buffer.skip() # ignore attributes - length = self._buffer.get_int() + length = self._buffer.get_size() conversion = PY_TYPE.get(-qtype, None) if qtype == QSYMBOL_LIST: @@ -333,7 +333,7 @@ def _read_table(self, qtype = QTABLE): @parse(QGENERAL_LIST) def _read_general_list(self, qtype = QGENERAL_LIST): self._buffer.skip() # ignore attributes - length = self._buffer.get_int() + length = self._buffer.get_size() return [self._read_object() for x in range(length)] @@ -373,7 +373,7 @@ def _read_adverb_function(self, qtype = QADVERB_FUNC_106): @parse(QPROJECTION) def _read_projection(self, qtype = QPROJECTION): - length = self._buffer.get_int() + length = self._buffer.get_size() parameters = [ self._read_object() for x in range(length) ] return QProjection(parameters) @@ -499,6 +499,13 @@ def get_int(self): ''' return self.get('i') + def get_size(self): + ''' + Gets a single 32-bit unsinged integer from the buffer. + + :returns: single unsigned integer + ''' + return self.get('i') & 0xffffffff def get_symbol(self): ''' diff --git a/qpython/utils.py b/qpython/utils.py index daebee3..7c9395c 100644 --- a/qpython/utils.py +++ b/qpython/utils.py @@ -18,19 +18,19 @@ def uncompress(data, uncompressed_size): - _0 = numpy.intc(0) - _1 = numpy.intc(1) - _2 = numpy.intc(2) - _128 = numpy.intc(128) - _255 = numpy.intc(255) + _0 = numpy.uintc(0) + _1 = numpy.uintc(1) + _2 = numpy.uintc(2) + _128 = numpy.uintc(128) + _255 = numpy.uintc(255) n, r, s, p = _0, _0, _0, _0 i, d = _1, _1 f = _255 & data[_0] - ptrs = numpy.zeros(256, dtype = numpy.intc) + ptrs = numpy.zeros(256, dtype = numpy.uintc) uncompressed = numpy.zeros(uncompressed_size, dtype = numpy.uint8) - idx = numpy.arange(uncompressed_size, dtype = numpy.intc) + idx = numpy.arange(uncompressed_size, dtype = numpy.uintc) while s < uncompressed_size: pp = p + _1