|
| 1 | +from __future__ import annotations |
| 2 | +import logging |
| 3 | +from typing import List, Union, Callable |
| 4 | + |
| 5 | +from .types import ChunkType, PayloadType |
| 6 | +from .page import UsmPage, pack_pages, get_pages |
| 7 | +from .tools import bytes_to_hex, is_valid_chunk |
| 8 | + |
| 9 | + |
| 10 | +class UsmChunk: |
| 11 | + def __init__( |
| 12 | + self, |
| 13 | + chunk_type: ChunkType, |
| 14 | + payload_type: PayloadType, |
| 15 | + payload: Union[bytes, List[UsmPage]], |
| 16 | + frame_rate: int = 30, |
| 17 | + frame_time: int = 0, |
| 18 | + padding: Union[int, Callable[[int], int]] = 0, |
| 19 | + channel_number: int = 0, |
| 20 | + payload_offset: int = 0x18, |
| 21 | + encoding: str = "UTF-8", |
| 22 | + ): |
| 23 | + self.chunk_type = chunk_type |
| 24 | + self.payload_type = payload_type |
| 25 | + self.payload = payload |
| 26 | + self.frame_rate = frame_rate |
| 27 | + self.frame_time = frame_time |
| 28 | + self._padding = padding |
| 29 | + self.channel_number = channel_number |
| 30 | + self.payload_offset = payload_offset |
| 31 | + self.encoding = encoding |
| 32 | + |
| 33 | + @property |
| 34 | + def padding(self) -> int: |
| 35 | + """The number of byte padding a chunk will have when packed.""" |
| 36 | + if isinstance(self._padding, int): |
| 37 | + return self._padding |
| 38 | + |
| 39 | + if isinstance(self.payload, list): |
| 40 | + payload_size = len(pack_pages(self.payload, self.encoding)) |
| 41 | + else: |
| 42 | + payload_size = len(self.payload) |
| 43 | + |
| 44 | + return self._padding(0x20 + payload_size) |
| 45 | + |
| 46 | + def __len__(self) -> int: |
| 47 | + """Returns the packed length of a chunk. Including _padding.""" |
| 48 | + if isinstance(self.payload, list): |
| 49 | + payload_size = len(pack_pages(self.payload, self.encoding)) |
| 50 | + else: |
| 51 | + payload_size = len(self.payload) |
| 52 | + |
| 53 | + if isinstance(self._padding, int): |
| 54 | + padding = self._padding |
| 55 | + else: |
| 56 | + padding = self._padding(0x20 + payload_size) |
| 57 | + |
| 58 | + return 0x20 + payload_size + padding |
| 59 | + |
| 60 | + @classmethod |
| 61 | + def from_bytes(cls, chunk: bytes, encoding: str = "UTF-8") -> UsmChunk: |
| 62 | + chunk = bytearray(chunk) |
| 63 | + signature = chunk[:0x4] |
| 64 | + |
| 65 | + chunksize = int.from_bytes(chunk[0x4:0x8], "big") |
| 66 | + # r08: 1 byte |
| 67 | + payload_offset = chunk[0x9] |
| 68 | + padding = int.from_bytes(chunk[0xA:0xC], "big") |
| 69 | + channel_number = chunk[0xC] |
| 70 | + # r0D: 1 byte |
| 71 | + # r0E: 1 byte |
| 72 | + |
| 73 | + payload_type = PayloadType.from_int(chunk[0xF] & 0x3) |
| 74 | + |
| 75 | + frame_time = int.from_bytes(chunk[0x10:0x14], "big") |
| 76 | + frame_rate = int.from_bytes(chunk[0x14:0x18], "big") |
| 77 | + # r18: 4 bytes |
| 78 | + # r1C: 4 bytes |
| 79 | + |
| 80 | + logging.debug( |
| 81 | + "UsmChunk: Chunk type: %s, chunk size: %x, r08: %x, payload offset: %x " |
| 82 | + + "padding: %x, chno: %x, r0D: %x, r0E: %x, payload type: %s " |
| 83 | + + "frame time: %x, frame rate: %d, r18: %s, r1C: %s", |
| 84 | + bytes_to_hex(signature), |
| 85 | + chunksize, |
| 86 | + chunk[0x8], |
| 87 | + payload_offset, |
| 88 | + padding, |
| 89 | + channel_number, |
| 90 | + chunk[0xD], |
| 91 | + chunk[0xE], |
| 92 | + payload_type, |
| 93 | + frame_time, |
| 94 | + frame_rate, |
| 95 | + bytes_to_hex(chunk[0x18:0x1C]), |
| 96 | + bytes_to_hex(chunk[0x1C:0x20]), |
| 97 | + ) |
| 98 | + |
| 99 | + if not is_valid_chunk(signature): |
| 100 | + raise ValueError(f"Invalid signature: {bytes_to_hex(signature)}") |
| 101 | + |
| 102 | + payload_begin = 0x08 + payload_offset |
| 103 | + payload_size = chunksize - padding - payload_offset |
| 104 | + payload: bytearray = chunk[payload_begin : payload_begin + payload_size] |
| 105 | + |
| 106 | + # Get pages for header and seek payload types |
| 107 | + if payload_type in [PayloadType.HEADER, PayloadType.METADATA]: |
| 108 | + payload: List[UsmPage] = get_pages(payload, encoding) |
| 109 | + for page in payload: |
| 110 | + logging.debug("Name: %s, Contents: %s", page.name, page.dict) |
| 111 | + |
| 112 | + return cls( |
| 113 | + ChunkType.from_bytes(signature), |
| 114 | + payload_type, |
| 115 | + payload, |
| 116 | + frame_rate, |
| 117 | + frame_time=frame_time, |
| 118 | + padding=padding, |
| 119 | + channel_number=channel_number, |
| 120 | + payload_offset=payload_begin, |
| 121 | + ) |
| 122 | + |
| 123 | + def pack(self) -> bytes: |
| 124 | + result = bytearray() |
| 125 | + result += self.chunk_type.value |
| 126 | + |
| 127 | + if isinstance(self.payload, list): |
| 128 | + payload = pack_pages(self.payload, self.encoding) |
| 129 | + else: |
| 130 | + payload = self.payload |
| 131 | + |
| 132 | + if isinstance(self._padding, int): |
| 133 | + padding = self._padding |
| 134 | + else: |
| 135 | + padding = self._padding(0x20 + len(payload)) |
| 136 | + |
| 137 | + chunksize = 0x18 + len(payload) + padding |
| 138 | + result += chunksize.to_bytes(4, "big") |
| 139 | + result += bytes(1) |
| 140 | + result += (0x18).to_bytes(1, "big") |
| 141 | + result += padding.to_bytes(2, "big") |
| 142 | + result += self.channel_number.to_bytes(1, "big") |
| 143 | + result += bytes(2) |
| 144 | + result += self.payload_type.value.to_bytes(1, "big") |
| 145 | + result += self.frame_time.to_bytes(4, "big") |
| 146 | + result += self.frame_rate.to_bytes(4, "big") |
| 147 | + |
| 148 | + result += bytearray(8) |
| 149 | + result += payload |
| 150 | + result += bytearray(padding) |
| 151 | + return bytes(result) |
0 commit comments