Skip to content

Commit

Permalink
STY: Code reuse within LzwCodec.encode (#2885)
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinThoma authored Sep 29, 2024
1 parent 42de71a commit 8e1799e
Showing 1 changed file with 21 additions and 34 deletions.
55 changes: 21 additions & 34 deletions pypdf/_codecs/_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,23 @@ class LzwCodec(Codec):
INITIAL_BITS_PER_CODE = 9 # Initial code bit width
MAX_BITS_PER_CODE = 12 # Maximum code bit width

def __init__(self) -> None:
"""Initialize codec and reset the compression table."""
self.clear_table()

def clear_table(self) -> None:
"""Reset the encoding table and coding state to initial conditions."""
def _initialize_encoding_table(self) -> None:
"""
Reset the encoding table and the coding state to their initial conditions.

After this call the table holds only the 256 single-byte sequences, the
next free code is the one directly after ``EOD_MARKER`` and the code
width is back at ``INITIAL_BITS_PER_CODE``.
"""
# Seed the table with every single-byte sequence, mapped to its own byte value.
self.table: Dict[bytes, int] = {bytes([i]): i for i in range(256)}
# The first code available for new sequences follows the end-of-data marker.
self.next_code = self.EOD_MARKER + 1
# Restart with the initial code width.
self.bits_per_code = self.INITIAL_BITS_PER_CODE
# Largest value that fits into ``bits_per_code`` bits.
self.max_code_value = (1 << self.bits_per_code) - 1

def _increase_next_code(self) -> None:
"""
Advance next_code by one and widen the code size if necessary.

When the incremented ``next_code`` no longer fits into the current
``bits_per_code`` bits, ``bits_per_code`` grows by one bit and
``max_code_value`` is recomputed. The width never grows beyond
``MAX_BITS_PER_CODE``; once that limit is reached only ``next_code``
keeps increasing.
"""
self.next_code += 1
if (
# The new code exceeds what the current width can represent ...
self.next_code > self.max_code_value
# ... and there is still room to widen the codes.
and self.bits_per_code < self.MAX_BITS_PER_CODE
):
self.bits_per_code += 1
# Largest value that fits into the widened code.
self.max_code_value = (1 << self.bits_per_code) - 1

def encode(self, data: bytes) -> bytes:
"""
Encode data using the LZW compression algorithm.
Expand All @@ -66,7 +72,7 @@ def encode(self, data: bytes) -> bytes:

# The encoder shall begin by issuing a clear-table code
result_codes.append(self.CLEAR_TABLE_MARKER)
self.clear_table()
self._initialize_encoding_table()

current_sequence = b""
for byte in data:
Expand All @@ -82,37 +88,28 @@ def encode(self, data: bytes) -> bytes:
# Add the new sequence to the table if there's room
if self.next_code <= (1 << self.MAX_BITS_PER_CODE) - 1:
self.table[next_sequence] = self.next_code
self.next_code += 1
# Increase bits_per_code if necessary
if (
self.next_code > self.max_code_value
and self.bits_per_code < self.MAX_BITS_PER_CODE
):
self.bits_per_code += 1
self.max_code_value = (1 << self.bits_per_code) - 1
self._increase_next_code()
else:
# If the table is full, emit a clear-table command
result_codes.append(self.CLEAR_TABLE_MARKER)
self.clear_table()
self._initialize_encoding_table()

# Start new sequence
current_sequence = bytes([byte])

# Ensure everything actually is encoded
if current_sequence:
result_codes.append(self.table[current_sequence])

result_codes.append(self.EOD_MARKER)

return self.pack_codes_into_bytes(result_codes)
return self._pack_codes_into_bytes(result_codes)

def pack_codes_into_bytes(self, codes: List[int]) -> bytes:
def _pack_codes_into_bytes(self, codes: List[int]) -> bytes:
"""
Convert the list of result codes into a continuous byte stream, with codes packed as per the code bit-width.
The bit-width starts at 9 bits and expands as needed.
"""
# Reset coding state
self.clear_table()
self._initialize_encoding_table()
buffer = 0
bits_in_buffer = 0
output = bytearray()
Expand All @@ -128,22 +125,12 @@ def pack_codes_into_bytes(self, codes: List[int]) -> bytes:
bits_in_buffer -= 8
output.append((buffer >> bits_in_buffer) & 0xFF)

# After a clear-table marker, reset coding state
if code == self.CLEAR_TABLE_MARKER:
self.clear_table()
self._initialize_encoding_table()
elif code == self.EOD_MARKER:
# Do not increment next_code for EOD_MARKER
pass
continue
else:
# Increase next_code after processing each code (except special codes)
self.next_code += 1
# Increase bits_per_code if necessary
if (
self.next_code > self.max_code_value
and self.bits_per_code < self.MAX_BITS_PER_CODE
):
self.bits_per_code += 1
self.max_code_value = (1 << self.bits_per_code) - 1
self._increase_next_code()

# Flush any remaining bits in the buffer
if bits_in_buffer > 0:
Expand Down

0 comments on commit 8e1799e

Please sign in to comment.