Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pluggable compression serde #407

Merged
merged 5 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ coverage.xml
#Docs
docs/_build
docs/apidoc/
venv/
sontek marked this conversation as resolved.
Show resolved Hide resolved
63 changes: 59 additions & 4 deletions pymemcache/serde.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
import logging
from io import BytesIO
import pickle
import zlib

FLAG_BYTES = 0
FLAG_PICKLE = 1 << 0
FLAG_INTEGER = 1 << 1
FLAG_LONG = 1 << 2
FLAG_COMPRESSED = 1 << 3 # unused, to main compatibility with python-memcached
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any insight into how this flag still maps back to python-memcached. Also would this flag cause any issues since two folks can use the same flag but different compression algorithms?

Also I wonder what compatibility we still have with python-memcached before this change and now.

Copy link
Collaborator

@jparise jparise Jul 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relatedly, a way to handle a generally compressed values with different compressors is to introduce a prefix.

serde = CompressedSerde(..., prefix="z")

... and then prepend that prefix on serialization (e.g. value = prefix + "|" + value). On deserialization, validate and strip the prefix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

python-memcached used zlib by default but I don't think at this point there is any chance of compatibility since we are supporting multiple versions of pickle and we wouldn't be able to guarantee versions across the serialization.

I like the idea of adding a prefix on there so we identify which compressor was used. In the end the client is only going to have one (via serde) so its not going to be able to handle decompressing/compressing multiple versions but we could at least raise an error that says "Found keys compressed with a different serializer".

I don't know if we do this today though. For example, we don't track pickle versions so we can select the correct one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jparise @jogo How strongly are you feeling that you want to be able to identify the compression algorithm between clients that are configured with different serde? I have a feeling its not that high of a priority since we don't do that today with the pickle serializer but wanted to check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think to do it properly we'd have to do what we discussed before, which I'm not really a fan of:

#53 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's not a high priority now. I mainly wanted to explicitly make sure we weren't trying to keep any python-memcached compatibility.

FLAG_COMPRESSED = 1 << 3
FLAG_TEXT = 1 << 4

# Pickle protocol version (highest available to runtime)
Expand Down Expand Up @@ -121,6 +122,63 @@ def deserialize(self, key, value, flags):
return python_memcache_deserializer(key, value, flags)


pickle_serde = PickleSerde()


class CompressedSerde:
"""
An object which implements the serialization/deserialization protocol for
:py:class:`pymemcache.client.base.Client` and its descendants with
configurable compression.
"""

def __init__(
self,
compress=zlib.compress,
decompress=zlib.decompress,
serde=pickle_serde,
# Discovered scientifically by testing at what point the serialization
sontek marked this conversation as resolved.
Show resolved Hide resolved
# begins to improve, with a little padded on since compression adds
# CPU overhead
# >>> foo = 'foo'*4
# >>> len(zlib.compress(foo.encode('utf-8'))), len(foo)
# (13, 12)
# >>> foo = 'foo'*5
# >>> len(zlib.compress(foo.encode('utf-8'))), len(foo)
# (13, 15)
min_compress_len=30,
):
self._serde = serde
self._compress = compress
self._decompress = decompress
self._min_compress_len = min_compress_len

def serialize(self, key, value):
value, flags = self._serde.serialize(key, value)

if len(value) > self._min_compress_len > 0:
old_value = value
value = self._compress(value)
# Don't use the compressed value if our end result is actually
# larger uncompressed.
if len(old_value) < len(value):
value = old_value
else:
flags |= FLAG_COMPRESSED

return value, flags

def deserialize(self, key, value, flags):
if flags & FLAG_COMPRESSED:
value = self._decompress(value)

value = self._serde.deserialize(key, value, flags)
return value


compressed_serde = CompressedSerde()


class LegacyWrappingSerde:
"""
This class defines how to wrap legacy de/serialization functions into a
Expand All @@ -141,6 +199,3 @@ def _default_serialize(self, key, value):

def _default_deserialize(self, key, value, flags):
return value


pickle_serde = PickleSerde()
87 changes: 77 additions & 10 deletions pymemcache/test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
import pytest

from pymemcache.client.base import Client
from pymemcache.exceptions import MemcacheIllegalInputError, MemcacheClientError
from pymemcache.serde import PickleSerde, pickle_serde
from pymemcache.exceptions import (
MemcacheIllegalInputError,
MemcacheClientError,
MemcacheServerError,
)
from pymemcache.serde import (
compressed_serde,
PickleSerde,
pickle_serde,
)


def get_set_helper(client, key, value, key2, value2):
Expand All @@ -41,8 +49,15 @@ def get_set_helper(client, key, value, key2, value2):


@pytest.mark.integration()
def test_get_set(client_class, host, port, socket_module):
client = client_class((host, port), socket_module=socket_module)
@pytest.mark.parametrize(
"serde",
[
pickle_serde,
compressed_serde,
],
)
def test_get_set(client_class, host, port, serde, socket_module):
client = client_class((host, port), serde=serde, socket_module=socket_module)
client.flush_all()

key = b"key"
Expand All @@ -53,9 +68,16 @@ def test_get_set(client_class, host, port, socket_module):


@pytest.mark.integration()
def test_get_set_unicode_key(client_class, host, port, socket_module):
@pytest.mark.parametrize(
"serde",
[
pickle_serde,
compressed_serde,
],
)
def test_get_set_unicode_key(client_class, host, port, serde, socket_module):
client = client_class(
(host, port), socket_module=socket_module, allow_unicode_keys=True
(host, port), serde=serde, socket_module=socket_module, allow_unicode_keys=True
)
client.flush_all()

Expand All @@ -67,8 +89,15 @@ def test_get_set_unicode_key(client_class, host, port, socket_module):


@pytest.mark.integration()
def test_add_replace(client_class, host, port, socket_module):
client = client_class((host, port), socket_module=socket_module)
@pytest.mark.parametrize(
"serde",
[
pickle_serde,
compressed_serde,
],
)
def test_add_replace(client_class, host, port, serde, socket_module):
client = client_class((host, port), serde=serde, socket_module=socket_module)
client.flush_all()

result = client.add(b"key", b"value", noreply=False)
Expand Down Expand Up @@ -277,8 +306,15 @@ def check(value):


@pytest.mark.integration()
def test_serde_serialization(client_class, host, port, socket_module):
serde_serialization_helper(client_class, host, port, socket_module, pickle_serde)
@pytest.mark.parametrize(
"serde",
[
pickle_serde,
compressed_serde,
],
)
def test_serde_serialization(client_class, host, port, socket_module, serde):
serde_serialization_helper(client_class, host, port, socket_module, serde)


@pytest.mark.integration()
Expand Down Expand Up @@ -350,3 +386,34 @@ def test_tls(client_class, tls_host, tls_port, socket_module, tls_context):
key2 = b"key2"
value2 = b"value2"
get_set_helper(client, key, value, key2, value2)


@pytest.mark.integration()
@pytest.mark.parametrize(
"serde,should_fail",
[
(pickle_serde, True),
(compressed_serde, False),
],
)
def test_get_set_large(
client_class,
host,
port,
serde,
socket_module,
should_fail,
):
client = client_class((host, port), serde=serde, socket_module=socket_module)
client.flush_all()

key = b"key"
value = b"value" * 1024 * 1024
key2 = b"key2"
value2 = b"value2" * 1024 * 1024

if should_fail:
with pytest.raises(MemcacheServerError):
get_set_helper(client, key, value, key2, value2)
else:
get_set_helper(client, key, value, key2, value2)
106 changes: 87 additions & 19 deletions pymemcache/test/test_serde.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from unittest import TestCase

from pymemcache.serde import (
CompressedSerde,
pickle_serde,
PickleSerde,
FLAG_BYTES,
FLAG_COMPRESSED,
FLAG_PICKLE,
FLAG_INTEGER,
FLAG_TEXT,
)
import pytest
import pickle
import sys
import zlib


class CustomInt(int):
Expand All @@ -23,39 +27,40 @@ class CustomInt(int):
pass


@pytest.mark.unit()
class TestSerde(TestCase):
serde = pickle_serde
def check(serde, value, expected_flags):
serialized, flags = serde.serialize(b"key", value)
assert flags == expected_flags

# pymemcache stores values as byte strings, so we immediately the value
# if needed so deserialized works as it would with a real server
if not isinstance(serialized, bytes):
serialized = str(serialized).encode("ascii")

def check(self, value, expected_flags):
serialized, flags = self.serde.serialize(b"key", value)
assert flags == expected_flags
deserialized = serde.deserialize(b"key", serialized, flags)
assert deserialized == value

# pymemcache stores values as byte strings, so we immediately the value
# if needed so deserialized works as it would with a real server
if not isinstance(serialized, bytes):
serialized = str(serialized).encode("ascii")

deserialized = self.serde.deserialize(b"key", serialized, flags)
assert deserialized == value
@pytest.mark.unit()
class TestSerde:
serde = pickle_serde

def test_bytes(self):
self.check(b"value", FLAG_BYTES)
self.check(b"\xc2\xa3 $ \xe2\x82\xac", FLAG_BYTES) # £ $ €
check(self.serde, b"value", FLAG_BYTES)
check(self.serde, b"\xc2\xa3 $ \xe2\x82\xac", FLAG_BYTES) # £ $ €

def test_unicode(self):
self.check("value", FLAG_TEXT)
self.check("£ $ €", FLAG_TEXT)
check(self.serde, "value", FLAG_TEXT)
check(self.serde, "£ $ €", FLAG_TEXT)

def test_int(self):
self.check(1, FLAG_INTEGER)
check(self.serde, 1, FLAG_INTEGER)

def test_pickleable(self):
self.check({"a": "dict"}, FLAG_PICKLE)
check(self.serde, {"a": "dict"}, FLAG_PICKLE)

def test_subtype(self):
# Subclass of a native type will be restored as the same type
self.check(CustomInt(123123), FLAG_PICKLE)
check(self.serde, CustomInt(123123), FLAG_PICKLE)


@pytest.mark.unit()
Expand All @@ -76,3 +81,66 @@ class TestSerdePickleVersion2(TestCase):
@pytest.mark.unit()
class TestSerdePickleVersionHighest(TestCase):
serde = PickleSerde(pickle_version=pickle.HIGHEST_PROTOCOL)


@pytest.mark.parametrize("serde", [pickle_serde, CompressedSerde()])
@pytest.mark.unit()
def test_compressed_simple(serde):
sontek marked this conversation as resolved.
Show resolved Hide resolved
# test_bytes
check(serde, b"value", FLAG_BYTES)
check(serde, b"\xc2\xa3 $ \xe2\x82\xac", FLAG_BYTES) # £ $ €

# test_unicode
check(serde, "value", FLAG_TEXT)
check(serde, "£ $ €", FLAG_TEXT)

# test_int
check(serde, 1, FLAG_INTEGER)

# test_pickleable
check(serde, {"a": "dict"}, FLAG_PICKLE)

# test_subtype
# Subclass of a native type will be restored as the same type
check(serde, CustomInt(12312), FLAG_PICKLE)


@pytest.mark.parametrize(
"serde",
[
CompressedSerde(),
# Custom compression. This could be something like lz4
CompressedSerde(
compress=lambda value: zlib.compress(value, 9),
decompress=lambda value: zlib.decompress(value),
),
],
)
@pytest.mark.unit()
def test_compressed_complex(serde):
# test_bytes
check(serde, b"value" * 10, FLAG_BYTES | FLAG_COMPRESSED)
check(serde, b"\xc2\xa3 $ \xe2\x82\xac" * 10, FLAG_BYTES | FLAG_COMPRESSED) # £ $ €

# test_unicode
check(serde, "value" * 10, FLAG_TEXT | FLAG_COMPRESSED)
check(serde, "£ $ €" * 10, FLAG_TEXT | FLAG_COMPRESSED)

# test_int, doesn't make sense to compress
check(serde, sys.maxsize, FLAG_INTEGER)

# test_pickleable
check(
serde,
{
"foo": "bar",
"baz": "qux",
"uno": "dos",
"tres": "tres",
},
FLAG_PICKLE | FLAG_COMPRESSED,
)

# test_subtype
# Subclass of a native type will be restored as the same type
check(serde, CustomInt(sys.maxsize), FLAG_PICKLE | FLAG_COMPRESSED)