Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions hathor/serialization/encoding/ecv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module implements ECV (Exponential-continuation Varint) for integers (signed or unsigned).

ECV is based on LEB128, but instead of using a continuation bit on every byte, it only uses one on bytes whose 1-based
position is a power of two (1, 2, 4, 8, ...). Each set continuation bit therefore doubles the encoded size instead of
adding a single byte, so an integer that is N bytes long needs O(log(N)) continuation bits instead of O(N).

- Layout is little-endian across bytes (like LEB128).
- Bytes at 1-based positions that are powers of two are *control* bytes:
MSB = continuation flag, lower 7 bits are payload.
- Other bytes are full 8-bit payload bytes.
- Total length is always a power of two (1, 2, 4, 8, ... bytes).

>>> se = Serializer.build_bytes_serializer()
>>> se.write_bytes(b'test') # writes 74657374
>>> encode_ecv(se, 0, signed=True) # writes 00
>>> encode_ecv(se, 624485, signed=False) # writes e58e2600
>>> encode_ecv(se, -123456, signed=True) # writes c0bbf87f
>>> bytes(se.finalize()).hex()
'7465737400e58e2600c0bbf87f'

>>> data = bytes.fromhex('00 e58e2600 c0bbf87f 74657374')
>>> de = Deserializer.build_bytes_deserializer(data)
>>> decode_ecv(de, signed=True) # reads 00
0
>>> decode_ecv(de, signed=False) # reads e58e2600
624485
>>> decode_ecv(de, signed=True) # reads c0bbf87f
-123456
>>> bytes(de.read_all()) # reads 74657374
b'test'
>>> de.finalize()
"""

from hathor.serialization import Deserializer, Serializer


def _is_pow2(x: int) -> bool:
    """ Returns True when `x` is a positive power of two (1, 2, 4, 8, ...), False otherwise."""
    # `x & -x` isolates the lowest set bit; it equals `x` only when exactly one bit is set
    return x > 0 and (x & -x) == x


def encode_ecv(serializer: Serializer, value: int, *, signed: bool) -> None:
    """ Encodes an integer using ECV.

    Caller must explicitly choose `signed=True` or `signed=False`.

    Bytes are written little-endian through `serializer.write_byte`. Bytes at 1-based positions that are powers of two
    carry 7 payload bits plus a continuation flag in the MSB; every other byte carries 8 payload bits. Encoding always
    stops on a control byte, so the output length is a power of two.

    Raises `ValueError` if `value` is negative and `signed=False`.

    This module's docstring has more details on ECV and examples.
    """
    if not signed and value < 0:
        raise ValueError('cannot encode value <0 as unsigned')
    pos = 1
    while True:
        if _is_pow2(pos):
            # control byte: low 7 bits are payload, MSB is the continuation flag
            byte = value & 0b0111_1111
            value >>= 7
            if signed:
                # we can stop once the remaining bits are pure sign extension of this byte's bit 6, which is the bit
                # the decoder uses to recover the sign
                sign_bit_set = (byte & 0b0100_0000) != 0
                is_last = (value == 0 and not sign_bit_set) or (value == -1 and sign_bit_set)
            else:
                # unsigned values are done as soon as no payload bits remain
                is_last = value == 0
            if is_last:
                serializer.write_byte(byte)
                return
            byte |= 0b1000_0000
        else:
            # plain payload byte: all 8 bits are data
            byte = value & 0b1111_1111
            value >>= 8
        serializer.write_byte(byte)
        pos += 1


def decode_ecv(deserializer: Deserializer, *, signed: bool) -> int:
    """ Decodes an ECV-encoded integer.

    Caller must explicitly choose `signed=True` or `signed=False`.

    Bytes are consumed one at a time through `deserializer.read_byte` until a control byte (1-based position that is a
    power of two) with a cleared continuation flag is found.

    This module's docstring has more details on ECV and examples.
    """
    value = 0
    bits_read = 0
    index = 1
    while True:
        raw = deserializer.read_byte()
        is_control = _is_pow2(index)
        index += 1
        if not is_control:
            # plain payload byte: all 8 bits are data
            value |= raw << bits_read
            bits_read += 8
            continue
        # control byte: low 7 bits are payload, MSB is the continuation flag
        value |= (raw & 0b0111_1111) << bits_read
        bits_read += 7
        if (raw & 0b1000_0000) != 0:
            continue
        if signed and (raw & 0b0100_0000) != 0:
            # bit 6 of the last byte is the sign: extend it over all higher bits
            value |= -(1 << bits_read)
        return value
3 changes: 2 additions & 1 deletion hathor/serialization/encoding/leb128.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,5 @@ def decode_leb128(deserializer: Deserializer, *, signed: bool) -> int:
if (byte & 0b1000_0000) == 0:
if signed and (byte & 0b0100_0000) != 0:
return result | -(1 << shift)
return result
else:
return result
162 changes: 162 additions & 0 deletions tests/serialization/test_ecv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import pytest


def _do_round_trip_test_with_size(n: int, encoded_size: int, signed: bool) -> None:
    """Encode `n` with ECV, check the encoded length, then check that decoding gives `n` back."""
    from hathor.serialization import Deserializer, Serializer
    from hathor.serialization.encoding.ecv import decode_ecv, encode_ecv

    # encode and verify that the output occupies exactly the expected number of bytes
    writer = Serializer.build_bytes_serializer()
    encode_ecv(writer, n, signed=signed)
    payload = bytes(writer.finalize())
    assert len(payload) == encoded_size

    # decode the same bytes and verify that the original value is recovered
    reader = Deserializer.build_bytes_deserializer(payload)
    decoded = decode_ecv(reader, signed=signed)
    assert decoded == n


def _is_pow2(x: int) -> bool:
    """Return True when `x` is a positive power of two, False otherwise."""
    if x <= 0:
        return False
    # a power of two has a single set bit, so clearing the lowest set bit leaves zero
    return (x & (x - 1)) == 0


def _capacity_bits_for_size(size: int) -> int:
    """Return how many payload bits an ECV encoding of `size` bytes can hold.

    `size` must be a power of two; anything else fails the assertion.
    """
    assert _is_pow2(size), f'ECV size must be a power of two, got {size}'
    # for a power-of-two size, bit_length() counts the power-of-two positions up to `size` (1, 2, 4, ..., size);
    # each of those bytes gives up one bit to the continuation flag
    flag_bits = size.bit_length()
    total_bits = 8 * size
    return total_bits - flag_bits


# Signed sample values, keyed by the number of bytes their ECV encoding is expected to occupy.
EXAMPLES_SIGNED_BY_SIZE = {
    # 1 byte carries 7 payload bits: -64..63
    1: [0, 1, 2, 3, 4, 50, 62, 63, -1, -2, -3, -63, -64],
    # 2 bytes carry 14 payload bits: -8192..8191
    2: [64, 65, 66, 1000, 3001, 8190, 8191, -65, -66, -67, -3000, -8191, -8192],
    # 4 bytes carry 29 payload bits: -268435456..268435455
    4: [
        8192, 8193, 9000, 100000, 1048574, 1048575, 1048576, 268435454, 268435455,
        -8193, -8194, -100000, -1048575, -1048576, -1048577, -268435455, -268435456,
    ],
}


def gen_signed_test_cases():
    """Build the `(value, expected_encoded_size)` pairs used to parametrize the signed round-trip test."""
    cases = []
    # hand-picked examples, paired with the size they are listed under
    for size, values in EXAMPLES_SIGNED_BY_SIZE.items():
        cases.extend((value, size) for value in values)
    # boundary values for sizes 4, 8, 16, 32 and 64: the extremes that fit in `size` bytes, and the values closest to
    # zero that no longer fit in half that size
    for exponent in range(2, 7):
        size = 1 << exponent
        bits = _capacity_bits_for_size(size)
        prev_bits = _capacity_bits_for_size(size >> 1)
        smallest_positive = 1 << (prev_bits - 1)
        largest_positive = (1 << (bits - 1)) - 1
        most_negative = -(1 << (bits - 1))
        least_negative = -(1 << (prev_bits - 1)) - 1
        cases.extend([
            (smallest_positive, size),
            (largest_positive, size),
            (most_negative, size),
            (least_negative, size),
        ])
    return cases


@pytest.mark.parametrize('n, encoded_size', gen_signed_test_cases())
def test_signed_round_trip_with_size(n, encoded_size):
    """Signed values must encode to the expected number of bytes and decode back unchanged."""
    _do_round_trip_test_with_size(n, encoded_size, signed=True)


# Unsigned sample values, keyed by the number of bytes their ECV encoding is expected to occupy.
EXAMPLES_UNSIGNED_BY_SIZE = {
    # 1 byte carries 7 payload bits: 0..127
    1: [0, 1, 2, 3, 4, 50, 62, 63, 64, 126, 127],
    # 2 bytes carry 14 payload bits: up to 16383
    2: [128, 129, 1000, 3001, 8190, 8191, 8192, 16382, 16383],
    # 4 bytes carry 29 payload bits: up to 536870911
    4: [
        16384, 100000, 1048574, 1048575, 1048576,
        2097150, 2097151, 2097152, 536870910, 536870911,
    ],
}


def gen_unsigned_test_cases():
    """Build the `(value, expected_encoded_size)` pairs used to parametrize the unsigned round-trip test."""
    cases = []
    # hand-picked examples, paired with the size they are listed under
    for size, values in EXAMPLES_UNSIGNED_BY_SIZE.items():
        cases.extend((value, size) for value in values)
    # boundary values for sizes 4, 8, 16, 32 and 64: the first value that no longer fits in half the size, and the
    # largest value that fits in `size` bytes
    for exponent in range(2, 7):
        size = 1 << exponent
        smallest = 1 << _capacity_bits_for_size(size >> 1)
        largest = (1 << _capacity_bits_for_size(size)) - 1
        cases.extend([(smallest, size), (largest, size)])
    return cases


@pytest.mark.parametrize('n, encoded_size', gen_unsigned_test_cases())
def test_unsigned_round_trip_with_size(n, encoded_size):
    """Unsigned values must encode to the expected number of bytes and decode back unchanged."""
    _do_round_trip_test_with_size(n, encoded_size, signed=False)
Loading