Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow new capacity byte for connection handshake #73

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions qpython/fastutils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import numpy
cimport numpy

DTYPE = numpy.int
ctypedef numpy.int_t DTYPE_t
DTYPE = numpy.uint
ctypedef numpy.uint_t DTYPE_t
DTYPE8 = numpy.int
ctypedef numpy.uint8_t DTYPE8_t

Expand Down
7 changes: 5 additions & 2 deletions qpython/qconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class QConnection(object):
- `encoding` (`string`) - string encoding for data deserialization
- `reader_class` (subclass of `QReader`) - data deserializer
- `writer_class` (subclass of `QWriter`) - data serializer
- `capacity_byte` (byte) - handshake capacity byte. Use it only if you
want to override the default. See https://code.kx.com/v2/basics/ipc/#handshake
:Options:
- `raw` (`boolean`) - if ``True`` returns raw data chunk instead of parsed
data, **Default**: ``False``
Expand All @@ -78,11 +80,12 @@ class QConnection(object):
'''


def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, **options):
def __init__(self, host, port, username = None, password = None, timeout = None, encoding = 'latin-1', reader_class = None, writer_class = None, capacity_byte = b'\3', **options):
self.host = host
self.port = port
self.username = username
self.password = password
self.capacity_byte = capacity_byte

self._connection = None
self._connection_file = None
Expand Down Expand Up @@ -186,7 +189,7 @@ def _initialize(self):
'''Performs a IPC protocol handshake.'''
credentials = (self.username if self.username else '') + ':' + (self.password if self.password else '')
credentials = credentials.encode(self._encoding)
self._connection.send(credentials + b'\3\0')
self._connection.send(credentials + self.capacity_byte + b'\0')
response = self._connection.recv(1)

if len(response) != 1:
Expand Down
21 changes: 14 additions & 7 deletions qpython/qreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def read_header(self, source = None):
# skip 1 byte
self._buffer.skip()

message_size = self._buffer.get_int()
message_size = self._buffer.get_size()
return QMessage(None, message_type, message_size, message_compressed)


Expand Down Expand Up @@ -197,14 +197,14 @@ def read_data(self, message_size, is_compressed = False, **options):
if is_compressed:
if self._stream:
self._buffer.wrap(self._read_bytes(4))
uncompressed_size = -8 + self._buffer.get_int()
uncompressed_size = -8 + self._buffer.get_size()
compressed_data = self._read_bytes(message_size - 12) if self._stream else self._buffer.raw(message_size - 12)

raw_data = numpy.frombuffer(compressed_data, dtype = numpy.uint8)
if uncompressed_size <= 0:
raise QReaderException('Error while data decompression.')

raw_data = uncompress(raw_data, numpy.intc(uncompressed_size))
raw_data = uncompress(raw_data, numpy.uintc(uncompressed_size))
raw_data = numpy.ndarray.tostring(raw_data)
self._buffer.wrap(raw_data)
elif self._stream:
Expand Down Expand Up @@ -243,7 +243,7 @@ def _read_error(self, qtype = QERROR):
@parse(QSTRING)
def _read_string(self, qtype = QSTRING):
self._buffer.skip() # ignore attributes
length = self._buffer.get_int()
length = self._buffer.get_size()
return self._buffer.raw(length) if length > 0 else b''


Expand Down Expand Up @@ -284,7 +284,7 @@ def _read_temporal(self, qtype):

def _read_list(self, qtype):
self._buffer.skip() # ignore attributes
length = self._buffer.get_int()
length = self._buffer.get_size()
conversion = PY_TYPE.get(-qtype, None)

if qtype == QSYMBOL_LIST:
Expand Down Expand Up @@ -333,7 +333,7 @@ def _read_table(self, qtype = QTABLE):
@parse(QGENERAL_LIST)
def _read_general_list(self, qtype = QGENERAL_LIST):
self._buffer.skip() # ignore attributes
length = self._buffer.get_int()
length = self._buffer.get_size()

return [self._read_object() for x in range(length)]

Expand Down Expand Up @@ -373,7 +373,7 @@ def _read_adverb_function(self, qtype = QADVERB_FUNC_106):

@parse(QPROJECTION)
def _read_projection(self, qtype = QPROJECTION):
length = self._buffer.get_int()
length = self._buffer.get_size()
parameters = [ self._read_object() for x in range(length) ]
return QProjection(parameters)

Expand Down Expand Up @@ -499,6 +499,13 @@ def get_int(self):
'''
return self.get('i')

def get_size(self):
'''
Gets a single 32-bit unsinged integer from the buffer.

:returns: single unsigned integer
'''
return self.get('i') & 0xffffffff

def get_symbol(self):
'''
Expand Down
14 changes: 7 additions & 7 deletions qpython/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@


def uncompress(data, uncompressed_size):
_0 = numpy.intc(0)
_1 = numpy.intc(1)
_2 = numpy.intc(2)
_128 = numpy.intc(128)
_255 = numpy.intc(255)
_0 = numpy.uintc(0)
_1 = numpy.uintc(1)
_2 = numpy.uintc(2)
_128 = numpy.uintc(128)
_255 = numpy.uintc(255)

n, r, s, p = _0, _0, _0, _0
i, d = _1, _1
f = _255 & data[_0]

ptrs = numpy.zeros(256, dtype = numpy.intc)
ptrs = numpy.zeros(256, dtype = numpy.uintc)
uncompressed = numpy.zeros(uncompressed_size, dtype = numpy.uint8)
idx = numpy.arange(uncompressed_size, dtype = numpy.intc)
idx = numpy.arange(uncompressed_size, dtype = numpy.uintc)

while s < uncompressed_size:
pp = p + _1
Expand Down