Tool to find likely stored block from deflate streams of a known file #99
import struct
import argparse

# Helper class to read bits from byte data, least significant bit first,
# as required by the deflate format (RFC 1951)
class BitReader:
    def __init__(self, data):
        self.data = data
        self.pos = 0      # index of the next byte to fetch
        self.bit_pos = 0  # next bit within the current byte

    def read_bits(self, num_bits):
        result = 0
        for i in range(num_bits):
            if self.bit_pos == 0:
                self.current_byte = self.data[self.pos]
                self.pos += 1
            result |= ((self.current_byte >> self.bit_pos) & 1) << i
            self.bit_pos = (self.bit_pos + 1) % 8
        return result

    def align_to_byte(self):
        """Discard any remaining bits of the current byte."""
        self.bit_pos = 0

    def read_bytes(self, num_bytes):
        result = self.data[self.pos:self.pos + num_bytes]
        self.pos += num_bytes
        return result
# Helper functions for Huffman decoding
def build_huffman_tree(code_lengths):
    """Builds a canonical Huffman code table for the given code lengths
    (RFC 1951, section 3.2.2).

    Returns a dict mapping (code length, code) pairs to symbols. Keying by
    the pair avoids collisions between equal code values of different lengths.
    """
    code_to_symbol = {}
    max_bits = max(code_lengths)
    bl_count = [0] * (max_bits + 1)
    next_code = [0] * (max_bits + 1)
    for length in code_lengths:
        if length != 0:  # unused symbols must not be counted
            bl_count[length] += 1
    code = 0
    for bits in range(1, max_bits + 1):
        code = (code + bl_count[bits - 1]) << 1
        next_code[bits] = code
    for symbol, length in enumerate(code_lengths):
        if length != 0:
            code_to_symbol[(length, next_code[length])] = symbol
            next_code[length] += 1
    return code_to_symbol

def decode_huffman_code(bit_reader, huffman_tree):
    """Decodes one symbol; Huffman code bits are packed most significant bit first."""
    code = 0
    length = 0
    while True:
        code = (code << 1) | bit_reader.read_bits(1)
        length += 1
        if (length, code) in huffman_tree:
            return huffman_tree[(length, code)]
def parse_deflate_stream(data):
    """Parse a deflate stream and return byte offsets of stored block data."""
    offsets = []
    bit_reader = BitReader(data)
    while bit_reader.pos < len(data):
        # A block header is 3 bits: BFINAL (1 bit), then BTYPE (2 bits)
        last_block = bit_reader.read_bits(1) == 1
        block_type = bit_reader.read_bits(2)
        if block_type == 0:  # Stored block
            # Stored block contents start at the next byte boundary
            bit_reader.align_to_byte()
            if bit_reader.pos + 4 > len(data):
                raise ValueError("Unexpected end of data")
            length, nlength = struct.unpack_from("<HH", data, bit_reader.pos)
            bit_reader.pos += 4
            if length != (~nlength & 0xFFFF):
                raise ValueError("Stored block length does not match its one's complement")
            offsets.append(bit_reader.pos)
            bit_reader.pos += length  # skip over the stored bytes themselves
        elif block_type == 1:  # Fixed Huffman
            parse_fixed_huffman_block(bit_reader)
        elif block_type == 2:  # Dynamic Huffman
            parse_dynamic_huffman_block(bit_reader)
        else:
            raise ValueError("Reserved block type encountered")
        if last_block:
            break
    return offsets

def skip_block_data(bit_reader, lit_len_tree, dist_tree):
    """Decode symbols until end of block, discarding the decompressed output.

    A length/distance pair refers to bytes already produced by decompression,
    so only its extra bits are consumed from the stream; nothing is skipped.
    """
    while True:
        symbol = decode_huffman_code(bit_reader, lit_len_tree)
        if symbol < 256:  # Literal byte
            continue
        if symbol == 256:  # End of block
            return
        # Length/Distance pair: consume the extra bits of both codes
        decode_length(bit_reader, symbol)
        decode_distance(bit_reader, dist_tree)

def parse_fixed_huffman_block(bit_reader):
    """Parse a fixed Huffman block."""
    skip_block_data(bit_reader, build_fixed_huffman_tree(), build_fixed_distance_tree())
def parse_dynamic_huffman_block(bit_reader):
    """Parse a dynamic Huffman block."""
    hlit = bit_reader.read_bits(5) + 257   # number of literal/length codes
    hdist = bit_reader.read_bits(5) + 1    # number of distance codes
    hclen = bit_reader.read_bits(4) + 4    # number of code length codes
    code_length_order = [16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15]
    code_lengths = [0] * 19
    for i in range(hclen):
        code_lengths[code_length_order[i]] = bit_reader.read_bits(3)
    code_length_tree = build_huffman_tree(code_lengths)
    # Literal/length and distance code lengths form one run-length encoded sequence
    lengths = []
    while len(lengths) < hlit + hdist:
        symbol = decode_huffman_code(bit_reader, code_length_tree)
        if symbol <= 15:    # literal code length
            lengths.append(symbol)
        elif symbol == 16:  # repeat previous length 3-6 times
            lengths.extend([lengths[-1]] * (bit_reader.read_bits(2) + 3))
        elif symbol == 17:  # repeat zero 3-10 times
            lengths.extend([0] * (bit_reader.read_bits(3) + 3))
        else:               # symbol == 18: repeat zero 11-138 times
            lengths.extend([0] * (bit_reader.read_bits(7) + 11))
    lit_len_tree = build_huffman_tree(lengths[:hlit])
    dist_tree = build_huffman_tree(lengths[hlit:hlit + hdist])
    skip_block_data(bit_reader, lit_len_tree, dist_tree)
def build_fixed_huffman_tree():
    """Builds the fixed Huffman code for literals/lengths (RFC 1951, section 3.2.6)."""
    lit_len_lengths = [8] * 144 + [9] * 112 + [7] * 24 + [8] * 8
    return build_huffman_tree(lit_len_lengths)

def build_fixed_distance_tree():
    """Builds the fixed Huffman code for distances: 5 bits for all 32 symbols."""
    return build_huffman_tree([5] * 32)

def decode_length(bit_reader, symbol):
    """Decodes a match length from a length symbol (257-285)."""
    if symbol <= 264:
        return symbol - 254  # lengths 3-10, no extra bits
    if symbol == 285:
        return 258
    extra_bits = (symbol - 265) // 4 + 1
    base_length = ((4 + (symbol - 265) % 4) << extra_bits) + 3
    return base_length + bit_reader.read_bits(extra_bits)

def decode_distance(bit_reader, dist_tree):
    """Decodes a match distance using the distance Huffman tree."""
    symbol = decode_huffman_code(bit_reader, dist_tree)
    if symbol < 4:
        return symbol + 1  # distances 1-4, no extra bits
    extra_bits = symbol // 2 - 1
    base_distance = ((2 + symbol % 2) << extra_bits) + 1
    return base_distance + bit_reader.read_bits(extra_bits)
def brute_force_offsets(deflate_streams):
    """Intersect the stored block offsets found in multiple deflate streams."""
    common_offsets = None
    for stream in deflate_streams:
        offsets = parse_deflate_stream(stream)
        if common_offsets is None:
            common_offsets = set(offsets)
        else:
            common_offsets &= set(offsets)
    return common_offsets

def main():
    parser = argparse.ArgumentParser(description="Find common stored block offsets in deflate streams.")
    parser.add_argument('files', metavar='F', type=str, nargs='+', help='Paths to deflate stream files')
    args = parser.parse_args()
    deflate_streams = []
    for file in args.files:
        with open(file, 'rb') as f:
            deflate_streams.append(f.read())
    common_offsets = brute_force_offsets(deflate_streams)
    print("Common stored block offsets:", common_offsets)

if __name__ == "__main__":
    main()
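For example, assuming the script above is saved as find_stored_blocks.py (a name chosen here for illustration) and the raw deflate streams of the known file have already been dumped to disk, it could be invoked as:

python3 find_stored_blocks.py known_file_store.deflate known_file_fast.deflate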
Thank you for sharing this. An idea to make the tool more useful: it would be better to list the overlapping intervals (offset and length) of stored data relative to the original file, together with their corresponding positions in the deflate streams.
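A minimal sketch of what that interval intersection could look like, assuming each stream's stored blocks have already been mapped to (original_offset, length, stream_offset) triples; all names here are hypothetical, not part of the tool above:

def common_stored_intervals(blocks_per_stream):
    """Return (start, end) intervals of the original file that are covered
    by a stored block in every stream."""
    def to_intervals(blocks):
        return [(off, off + length) for off, length, _ in blocks]
    # Start from the first stream's intervals and intersect with each other stream
    common = to_intervals(blocks_per_stream[0])
    for blocks in blocks_per_stream[1:]:
        next_common = []
        for a_start, a_end in common:
            for b_start, b_end in to_intervals(blocks):
                start, end = max(a_start, b_start), min(a_end, b_end)
                if start < end:
                    next_common.append((start, end))
        common = next_common
    return common

# common_stored_intervals([[(0, 100, 10)], [(50, 100, 7)]]) -> [(50, 100)]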
Thank you for the idea to automate this task. I put together a script that takes a list of files (or a wildcard) and finds offsets with matching values across all of them. It performs the search from the beginning of the file as well as from the end (positive and negative offsets). This could be useful either for finding shared blocks of bytes across various "zippers" and compression settings of a known plaintext file, or for finding common byte areas in a specific file format that could then be used against an encrypted archive with a stored block.

import sys
import glob
import argparse
from itertools import combinations

def find_matching_offsets(filenames, file_contents):
    contents = [file_contents[filename] for filename in filenames]
    min_length = min(len(content) for content in contents)
    positive_matches = []
    negative_matches = []
    # Positive offsets: scan forward from the start of each file
    offset = 0
    while offset < min_length:
        bytes_at_offset = [content[offset] for content in contents]
        if len(set(bytes_at_offset)) == 1:
            start_offset = offset
            matching_bytes = [bytes_at_offset[0]]
            offset += 1
            while offset < min_length:
                bytes_at_offset = [content[offset] for content in contents]
                if len(set(bytes_at_offset)) == 1:
                    matching_bytes.append(bytes_at_offset[0])
                    offset += 1
                else:
                    break
            end_offset = offset - 1
            positive_matches.append((start_offset, end_offset, matching_bytes))
        else:
            offset += 1
    # Negative offsets: scan backward from the end of each file
    offset = 1
    while offset <= min_length:
        bytes_at_offset = [content[-offset] for content in contents]
        if len(set(bytes_at_offset)) == 1:
            start_offset = offset
            matching_bytes = [bytes_at_offset[0]]
            offset += 1
            while offset <= min_length:
                bytes_at_offset = [content[-offset] for content in contents]
                if len(set(bytes_at_offset)) == 1:
                    matching_bytes.append(bytes_at_offset[0])
                    offset += 1
                else:
                    break
            end_offset = offset - 1
            negative_matches.append((start_offset, end_offset, matching_bytes))
        else:
            offset += 1
    total_matches = sum(len(matching_bytes) for _, _, matching_bytes in positive_matches + negative_matches)
    return positive_matches, negative_matches, total_matches
def print_matches(positive_matches, negative_matches):
    if positive_matches:
        print("Matching bytes at positive offsets:")
        for start_offset, end_offset, matching_bytes in positive_matches:
            if start_offset == end_offset:
                print(f"Offset {start_offset}: {matching_bytes[0]:02x}")
            else:
                byte_sequence = ' '.join(f"{b:02x}" for b in matching_bytes)
                print(f"Offsets {start_offset}-{end_offset}: {byte_sequence}")
    else:
        print("No matching bytes at positive offsets.")
    if negative_matches:
        print("\nMatching bytes at negative offsets:")
        for start_offset, end_offset, matching_bytes in negative_matches:
            if start_offset == end_offset:
                print(f"Offset -{start_offset}: {matching_bytes[0]:02x}")
            else:
                # Bytes were collected back to front; restore file order for display
                byte_sequence = ' '.join(f"{b:02x}" for b in reversed(matching_bytes))
                print(f"Offsets -{end_offset} to -{start_offset}: {byte_sequence}")
    else:
        print("\nNo matching bytes at negative offsets.")
def main():
    parser = argparse.ArgumentParser(description='Find matching bytes at the same offsets in multiple files.')
    parser.add_argument('filenames', nargs='+', help='Filenames or wildcard pattern')
    parser.add_argument('-m', '--matches', type=int, default=1, help='Desired minimum number of matching offsets')
    args = parser.parse_args()
    filenames = args.filenames
    minimum_matches = args.matches
    if len(filenames) == 1:
        # Use the single argument as a wildcard pattern
        filenames = glob.glob(filenames[0])
        if len(filenames) < 1:
            print("Wildcard pattern must match at least one file.")
            sys.exit(1)
    # Read file contents
    file_contents = {}
    for filename in filenames:
        try:
            with open(filename, 'rb') as f:
                file_contents[filename] = f.read()
        except IOError as e:
            print(f"Error reading file {filename}: {e}")
            sys.exit(1)
    matched = False
    n = len(filenames)
    # Search the largest groups of files first, shrinking until enough matches are found
    for group_size in range(n, 1, -1):
        for combo in combinations(filenames, group_size):
            positive_matches, negative_matches, total_matches = find_matching_offsets(combo, file_contents)
            if total_matches >= minimum_matches:
                print(f"Found {total_matches} matching bytes.")
                print(f"Matching set of {minimum_matches} or more bytes found in a group of {group_size} out of {n} files:")
                print("Comparing files:")
                for filename in combo:
                    print(f" - {filename}")
                print()
                print_matches(positive_matches, negative_matches)
                matched = True
                break  # Exit the combinations loop
        if matched:
            break  # Exit the group_size loop
    if not matched:
        print(f"No combination of files yielded at least {minimum_matches} matching offsets.")

if __name__ == '__main__':
    main()

You run it like this:
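For example, assuming the script is saved as find_matching_offsets.py (a hypothetical name) and the deflate streams sit in the current directory, the quoted wildcard is expanded by the script itself:

python3 find_matching_offsets.py "known_file_*.deflate"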
If you get back fewer matching offsets than required (let's say we need at least 40), you can specify that number using the -m/--matches option; the script will then search progressively smaller groups of files until some combination yields at least that many matching bytes.
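For instance, to require at least 40 matching offsets (same hypothetical script name as above):

python3 find_matching_offsets.py "known_file_*.deflate" -m 40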
Lastly, a much simplified script that exports the raw deflate stream of each compressed ZIP entry as an individual .deflate file.

import os
import sys
import glob
import struct
import argparse
from zipfile import ZipFile

def extract_deflate_blocks(zip_path):
    zip_base_name = os.path.splitext(os.path.basename(zip_path))[0]
    with open(zip_path, 'rb') as f:
        zip_file = ZipFile(f)
        for zip_info in zip_file.infolist():
            # Only process files that are compressed using the deflate method
            if zip_info.compress_type != 8:  # 8 is the code for deflate compression
                print(f"Skipping {zip_info.filename}: not deflate compressed.")
                continue
            print(f"Processing {zip_info.filename} in {zip_base_name}...")
            # Seek to the entry's local file header
            f.seek(zip_info.header_offset)
            # Read and parse the 30-byte fixed part of the local file header
            local_file_header = f.read(30)
            (signature, version_needed, flag, compression_method,
             last_mod_time, last_mod_date, crc32, compressed_size,
             uncompressed_size, filename_length, extra_field_length) = \
                struct.unpack('<IHHHHHIIIHH', local_file_header)
            if signature != 0x04034b50:
                print(f"Error: Invalid local file header signature for {zip_info.filename}.")
                continue
            # Skip over the filename and extra field to reach the compressed data
            f.seek(filename_length + extra_field_length, os.SEEK_CUR)
            # Read the raw deflate stream; compress_size comes from the central
            # directory, so it is valid even if the local header size field is zero
            compressed_data = f.read(zip_info.compress_size)
            output_filename = f"{zip_base_name}_{os.path.splitext(zip_info.filename)[0]}.deflate"
            output_dir = os.path.dirname(output_filename)
            if output_dir and not os.path.exists(output_dir):
                os.makedirs(output_dir)
            with open(output_filename, 'wb') as output_file:
                output_file.write(compressed_data)
            print(f"Extracted deflate data saved to {output_filename}.")
def main():
    parser = argparse.ArgumentParser(description='Extract deflate blocks from ZIP archive(s).')
    parser.add_argument('filenames', nargs='+', help='Filenames or wildcard pattern')
    args = parser.parse_args()
    filenames = args.filenames
    if len(filenames) == 1:
        # Use the single argument as a wildcard pattern
        filenames = glob.glob(filenames[0])
        if len(filenames) < 1:
            print("Wildcard pattern must match at least one file.")
            sys.exit(1)
    for filename in filenames:
        if not os.path.isfile(filename):
            print(f"Error: File '{filename}' does not exist.")
            continue
        extract_deflate_blocks(filename)

if __name__ == "__main__":
    main()
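Assuming it is saved as extract_deflate_blocks.py (again a name chosen for illustration), running it on a set of archives writes one .deflate file per compressed entry:

python3 extract_deflate_blocks.py "known_file_*.zip"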
Originally posted by @kimci86 in #95 (comment)