Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ repos:
rev: '4.0.1'
hooks:
- id: flake8
args: [ "--ignore=E501,W503" ]
args: [ "--ignore=E501,W503,E203" ]
11 changes: 11 additions & 0 deletions python/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,14 @@
limitations under the License.

--------------------------------------------------------------------------------

This product includes code from Apache Avro.

* Code for initializing the Avro (de)compression codecs
* The Binary decoder for reading in an Avro byte stream

Copyright: 2014-2022 The Apache Software Foundation.
Home page: https://avro.apache.org/
License: https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------
312 changes: 268 additions & 44 deletions python/poetry.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ mmh3 = "^3.0.0"

pyarrow = { version = "^8.0.0", optional = true }

zstandard = { version = "^0.17.0", optional = true }

python-snappy = { version = "^0.6.1", optional = true }

[tool.poetry.dev-dependencies]
pytest = "^7.0.0"
pytest-checkdocs = "^2.0.0"
pre-commit = "^2.0.0"
fastavro = "^1.5.1"
coverage = { version = "^6.0.0", extras = ["toml"] }

[build-system]
Expand All @@ -55,6 +60,8 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry.extras]
pyarrow = ["pyarrow"]
snappy = ["python-snappy"]
python-snappy = ["zstandard"]

[tool.black]
line-length = 130
Expand All @@ -79,5 +86,13 @@ warn_unreachable = true
module = "mypy-pyarrow.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "mypy-snappy.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "mypy-zstandard.*"
ignore_missing_imports = true

[tool.coverage.run]
source = ['src/']
6 changes: 6 additions & 0 deletions python/spellcheck-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,9 @@ UnboundPredicate
BoundPredicate
BooleanExpression
BooleanExpressionVisitor
zigzag
unix
zlib
Codecs
codecs
uri
16 changes: 16 additions & 0 deletions python/src/iceberg/avro/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
40 changes: 40 additions & 0 deletions python/src/iceberg/avro/codecs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Contains Codecs for Python Avro.

Note that the word "codecs" means "compression/decompression algorithms" in the
Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
so don't confuse it with the Python's "codecs", which is a package mainly for
converting character sets (https://docs.python.org/3/library/codecs.html).
"""
from __future__ import annotations

from iceberg.avro.codecs.bzip2 import BZip2Codec
from iceberg.avro.codecs.codec import Codec
from iceberg.avro.codecs.deflate import DeflateCodec
from iceberg.avro.codecs.snappy_codec import SnappyCodec
from iceberg.avro.codecs.zstandard_codec import ZStandardCodec

KNOWN_CODECS: dict[str, type[Codec] | None] = {
"null": None,
"bzip2": BZip2Codec,
"snappy": SnappyCodec,
"zstandard": ZStandardCodec,
"deflate": DeflateCodec,
}
43 changes: 43 additions & 0 deletions python/src/iceberg/avro/codecs/bzip2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from iceberg.avro.codecs.codec import Codec

try:
import bz2

class BZip2Codec(Codec):
@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
compressed_data = bz2.compress(data)
return compressed_data, len(compressed_data)

@staticmethod
def decompress(data: bytes) -> bytes:
return bz2.decompress(data)

except ImportError:

class BZip2Codec(Codec): # type: ignore
@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
raise ImportError("Python bzip2 support not installed, please install the extension")

@staticmethod
def decompress(data: bytes) -> bytes:
raise ImportError("Python bzip2 support not installed, please install the extension")
33 changes: 33 additions & 0 deletions python/src/iceberg/avro/codecs/codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from abc import ABC, abstractmethod


class Codec(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: would it make sense to put this in the __init__.py file?

Copy link
Contributor Author

@Fokko Fokko Jun 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some projects do this, and some don't :) For me, it makes sense to add base classes in the init. Mostly because they need to be loaded anyway, and by adding them to the __init__.py they are read when you access the module. Also, for the case of the codecs, this avoids having yet another file. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for adding it to __init__.py. Fewer files cuts down on load time and we will need to load it anyway.

"""Abstract base class for all Avro codec classes."""

@staticmethod
@abstractmethod
def compress(data: bytes) -> tuple[bytes, int]:
...

@staticmethod
@abstractmethod
def decompress(data: bytes) -> bytes:
...
36 changes: 36 additions & 0 deletions python/src/iceberg/avro/codecs/deflate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import zlib

from iceberg.avro.codecs.codec import Codec


class DeflateCodec(Codec):
@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
# The first two characters and last character are zlib
# wrappers around deflate data.
compressed_data = zlib.compress(data)[2:-1]
return compressed_data, len(compressed_data)

@staticmethod
def decompress(data: bytes) -> bytes:
# -15 is the log of the window size; negative indicates
# "raw" (no zlib headers) decompression. See zlib.h.
return zlib.decompress(data, -15)
69 changes: 69 additions & 0 deletions python/src/iceberg/avro/codecs/snappy_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import binascii
import struct

from iceberg.avro.codecs.codec import Codec

STRUCT_CRC32 = struct.Struct(">I") # big-endian unsigned int

try:
import snappy

class SnappyCodec(Codec):
@staticmethod
def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
"""Incrementally compute CRC-32 from bytes and compare to a checksum

Args:
bytes_ (bytes): The bytes to check against `checksum`
checksum (bytes): Byte representation of a checksum

Raises:
ValueError: If the computed CRC-32 does not match the checksum
"""
if binascii.crc32(bytes_) & 0xFFFFFFFF != STRUCT_CRC32.unpack(checksum)[0]:
raise ValueError("Checksum failure")

@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
compressed_data = snappy.compress(data)
# A 4-byte, big-endian CRC32 checksum
compressed_data += STRUCT_CRC32.pack(binascii.crc32(data) & 0xFFFFFFFF)
return compressed_data, len(compressed_data)

@staticmethod
def decompress(data: bytes) -> bytes:
# Compressed data includes a 4-byte CRC32 checksum
data = data[0:-4]
uncompressed = snappy.decompress(data)
checksum = data[-4:]
SnappyCodec._check_crc32(uncompressed, checksum)
return uncompressed

except ImportError:

class SnappyCodec(Codec): # type: ignore
@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
raise ImportError("Snappy support not installed, please install using `pip install pyiceberg[snappy]`")

@staticmethod
def decompress(data: bytes) -> bytes:
raise ImportError("Snappy support not installed, please install using `pip install pyiceberg[snappy]`")
53 changes: 53 additions & 0 deletions python/src/iceberg/avro/codecs/zstandard_codec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from io import BytesIO

from iceberg.avro.codecs.codec import Codec

try:
from zstandard import ZstdCompressor, ZstdDecompressor

class ZStandardCodec(Codec):
@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
compressed_data = ZstdCompressor().compress(data)
return compressed_data, len(compressed_data)

@staticmethod
def decompress(data: bytes) -> bytes:
uncompressed = bytearray()
dctx = ZstdDecompressor()
with dctx.stream_reader(BytesIO(data)) as reader:
while True:
chunk = reader.read(16384)
if not chunk:
break
uncompressed.extend(chunk)
return uncompressed

except ImportError:

class ZStandardCodec(Codec): # type: ignore
@staticmethod
def compress(data: bytes) -> tuple[bytes, int]:
raise ImportError("Zstandard support not installed, please install using `pip install pyiceberg[zstandard]`")

@staticmethod
def decompress(data: bytes) -> bytes:
raise ImportError("Zstandard support not installed, please install using `pip install pyiceberg[zstandard]`")
Loading