Skip to content

Commit 71f4211

Browse files
authored
Merge pull request #101 from us-irs/improve-sp-parser-api
Improved space packet parser API
2 parents dc0b42e + 161781c commit 71f4211

File tree

4 files changed

+163
-86
lines changed

4 files changed

+163
-86
lines changed

Diff for: CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
88

99
# [unreleased]
1010

11+
# [v0.28.0] 2025-02-03
12+
13+
## Changed
14+
15+
- Improved the space packet parser API: The `parse_space_packets` function simply expects
16+
a byte buffer and returns results and useful context information inside a `ParseResult`
17+
structure. The former `parse_space_packets` was renamed to `parse_space_packets_from_deque`
18+
and now treats the provided `deque` read-only.
19+
1120
# [v0.27.0] 2025-01-15
1221

1322
## Changed

Diff for: src/spacepackets/ccsds/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
PacketId,
77
PacketSeqCtrl,
88
PacketType,
9+
ParserResult,
910
SequenceFlags,
1011
SpacePacket,
1112
SpacePacketHeader,
1213
SpHeader,
1314
get_total_space_packet_len_from_len_field,
15+
parse_space_packets,
16+
parse_space_packets_from_deque,
1417
)
1518
from .time import * # noqa: F403 # re-export
1619

@@ -20,9 +23,12 @@
2023
"PacketId",
2124
"PacketSeqCtrl",
2225
"PacketType",
26+
"ParserResult",
2327
"SequenceFlags",
2428
"SpHeader",
2529
"SpacePacket",
2630
"SpacePacketHeader",
2731
"get_total_space_packet_len_from_len_field",
32+
"parse_space_packets",
33+
"parse_space_packets_from_deque",
2834
]

Diff for: src/spacepackets/ccsds/spacepacket.py

+84-51
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from __future__ import annotations
55

6+
import dataclasses
67
import enum
78
import struct
89
from abc import ABC, abstractmethod
@@ -553,68 +554,100 @@ def get_total_space_packet_len_from_len_field(len_field: int) -> int:
553554
return len_field + SPACE_PACKET_HEADER_SIZE + 1
554555

555556

556-
def parse_space_packets(
557-
analysis_queue: deque[bytearray], packet_ids: Sequence[PacketId]
558-
) -> list[bytearray]:
559-
"""Given a deque of bytearrays, parse for space packets. This funtion expects the deque
560-
to be filled on the right side, for example with :py:meth:`collections.deque.append`.
561-
If a split packet with a valid header is detected, this function will re-insert the header into
562-
the given deque on the right side.
557+
@dataclasses.dataclass
558+
class ParserResult:
559+
#: List of parsed space packets.
560+
tm_list: list[bytes]
561+
#: Range of bytes which were skipped during parsing. This can happen if there are
562+
#: broken/invalid spacepackets or packets/bytes which are not CCSDS spacepackets in the
563+
#: datastream. This context information can be used for debugging.
564+
skipped_ranges: list[range]
565+
#: Number of bytes scanned. Incomplete packets will not increment this number. Therefore,
566+
#: this can be smaller than the total size of the provided buffer. Furthermore, the packet
567+
#: parser will not scan fragments at the buffer end which are smaller than the minimum CCSDS
568+
#: space packet size.
569+
scanned_bytes: int
563570

564-
:param analysis_queue:
565-
:param packet_ids:
566-
:return:
571+
@classmethod
572+
def empty(cls) -> ParserResult:
573+
return ParserResult(tm_list=[], skipped_ranges=[], scanned_bytes=0)
574+
575+
def num_of_found_packets(self) -> int:
576+
return len(self.tm_list)
577+
578+
579+
def parse_space_packets_from_deque(
580+
analysis_queue: deque[bytearray | bytes], packet_ids: Sequence[PacketId]
581+
) -> ParserResult:
582+
"""Given a deque of bytearrays, parse for space packets which start with the provided
583+
list of packet IDs.
584+
585+
This funtion expects the deque to be filled on the right side, for example with
586+
:py:meth:`collections.deque.append`. This function only reads the given deque. It fills
587+
the provided queue content into a regular byte buffer and then calls
588+
:py:meth:`parse_space_packets`.
589+
590+
The user needs to take care to clear the analysis queue depending on how this API is used. The
591+
number of bytes scanned is returned as part of the :py:class:`ParserResult` and can be used to
592+
clear the deque or only keep relevant portions for the next parse call.
567593
"""
568-
ids_raw = [packet_id.raw() for packet_id in packet_ids]
569-
tm_list = []
570-
concatenated_packets = bytearray()
594+
packet_buf = bytearray()
571595
if not analysis_queue:
572-
return tm_list
573-
while analysis_queue:
596+
return ParserResult.empty()
597+
for value in analysis_queue:
574598
# Put it all in one buffer
575-
concatenated_packets.extend(analysis_queue.popleft())
599+
packet_buf.extend(value)
600+
return parse_space_packets(packet_buf, packet_ids)
601+
602+
603+
def parse_space_packets(buf: bytearray | bytes, packet_ids: Sequence[PacketId]) -> ParserResult:
604+
"""Given a byte buffer, parse for space packets which start with the provided list of packet
605+
IDs.
606+
607+
If there are broken/invalid spacepackets or packets/bytes which are not CCSDS spacepackets in
608+
the datastream, those bytes will be skipped, and the range of skipped bytes is returned as
609+
part of the :py:class:`ParserResult`. This context information can be used for debugging or
610+
management of invalid data.
611+
612+
In case of incomplete packets where are partial packet header is found at the end of the buffer,
613+
the scanned bytes number will be the start of the incomplete packet. The number of scanned bytes
614+
returned might also be smaller than the buffer size for invalid data because the packet parser
615+
will not scan fragments at the end of the buffer which are smaller than the minimum
616+
space packet header length.
617+
"""
618+
tm_list = []
619+
skipped_ranges = []
620+
621+
packet_id_list = [packet_id.raw() for packet_id in packet_ids]
576622
current_idx = 0
577-
if len(concatenated_packets) < 6:
578-
return tm_list
579-
# Packet ID detected
623+
skip_start = None
624+
if len(buf) < 6:
625+
return ParserResult(tm_list, skipped_ranges, current_idx)
580626
while True:
581-
# Can't even parse CCSDS header. Wait for more data to arrive.
582-
if current_idx + CCSDS_HEADER_LEN >= len(concatenated_packets):
627+
# Can't even parse CCSDS packet ID. Wait for more data to arrive.
628+
if current_idx + CCSDS_HEADER_LEN >= len(buf):
629+
if skip_start is not None:
630+
skipped_ranges.append(range(skip_start, current_idx))
583631
break
584632
current_packet_id = (
585-
struct.unpack("!H", concatenated_packets[current_idx : current_idx + 2])[0]
586-
& PACKET_ID_MASK
633+
struct.unpack("!H", buf[current_idx : current_idx + 2])[0] & PACKET_ID_MASK
587634
)
588-
if current_packet_id in ids_raw:
589-
result, current_idx = __handle_packet_id_match(
590-
concatenated_packets=concatenated_packets,
591-
analysis_queue=analysis_queue,
592-
current_idx=current_idx,
593-
tm_list=tm_list,
635+
# Packet ID detected
636+
if current_packet_id in packet_id_list:
637+
total_packet_len = get_total_space_packet_len_from_len_field(
638+
struct.unpack("!H", buf[current_idx + 4 : current_idx + 6])[0]
594639
)
595-
if result != 0:
640+
# Partial header.
641+
if current_idx + total_packet_len > len(buf):
596642
break
643+
tm_list.append(buf[current_idx : current_idx + total_packet_len])
644+
if skip_start is not None:
645+
skipped_ranges.append(range(skip_start, current_idx))
646+
skip_start = None
647+
current_idx += total_packet_len
597648
else:
649+
if skip_start is None:
650+
skip_start = current_idx
598651
# Keep parsing until a packet ID is found
599652
current_idx += 1
600-
return tm_list
601-
602-
603-
def __handle_packet_id_match(
604-
concatenated_packets: bytearray,
605-
analysis_queue: deque[bytearray],
606-
current_idx: int,
607-
tm_list: list[bytearray],
608-
) -> tuple[int, int]:
609-
total_packet_len = get_total_space_packet_len_from_len_field(
610-
struct.unpack("!H", concatenated_packets[current_idx + 4 : current_idx + 6])[0]
611-
)
612-
# Might be part of packet. Put back into analysis queue as whole
613-
if current_idx + total_packet_len > len(concatenated_packets):
614-
# Clear the queue first. We are done with parsing
615-
analysis_queue.clear()
616-
analysis_queue.append(concatenated_packets[current_idx:])
617-
return -1, current_idx
618-
tm_list.append(concatenated_packets[current_idx : current_idx + total_packet_len])
619-
current_idx += total_packet_len
620-
return 0, current_idx
653+
return ParserResult(tm_list, skipped_ranges, current_idx)

Diff for: tests/ccsds/test_sp_parser.py

+64-35
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from unittest import TestCase
33

44
from spacepackets.ccsds import CdsShortTimestamp
5-
from spacepackets.ccsds.spacepacket import parse_space_packets
5+
from spacepackets.ccsds.spacepacket import parse_space_packets, parse_space_packets_from_deque
66
from spacepackets.ecss.tm import PusTm
77

88

@@ -18,14 +18,33 @@ def setUp(self) -> None:
1818
self.packet_ids = (self.tm_packet.packet_id,)
1919
self.tm_packet_raw = self.tm_packet.pack()
2020
self.packet_deque = deque()
21+
self.packet_buf = bytearray()
2122

2223
def test_sp_parser(self):
23-
self.packet_deque.appendleft(self.tm_packet_raw)
24-
self.packet_deque.appendleft(self.tm_packet_raw)
25-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
26-
self.assertEqual(len(sp_list), 2)
27-
self.assertEqual(sp_list[0], self.tm_packet_raw)
28-
self.assertEqual(sp_list[1], self.tm_packet_raw)
24+
self.packet_buf.extend(self.tm_packet_raw)
25+
self.packet_buf.extend(self.tm_packet_raw)
26+
27+
result = parse_space_packets(buf=self.packet_buf, packet_ids=self.packet_ids)
28+
self.assertEqual(len(result.tm_list), 2)
29+
self.assertEqual(result.tm_list[0], self.tm_packet_raw)
30+
self.assertEqual(result.tm_list[1], self.tm_packet_raw)
31+
self.assertEqual(result.scanned_bytes, len(self.tm_packet_raw) * 2)
32+
self.assertEqual(len(result.skipped_ranges), 0)
33+
34+
def test_sp_parser_with_deque(self):
35+
self.packet_deque.append(self.tm_packet_raw)
36+
self.packet_deque.append(self.tm_packet_raw)
37+
result = parse_space_packets_from_deque(self.packet_deque, self.packet_ids)
38+
self.assertEqual(len(result.tm_list), 2)
39+
self.assertEqual(result.tm_list[0], self.tm_packet_raw)
40+
self.assertEqual(result.tm_list[1], self.tm_packet_raw)
41+
self.assertEqual(result.scanned_bytes, len(self.tm_packet_raw) * 2)
42+
self.assertEqual(len(result.skipped_ranges), 0)
43+
self.assertEqual(len(self.packet_deque), 2)
44+
flattened_deque = bytearray()
45+
while self.packet_deque:
46+
flattened_deque.extend(self.packet_deque.popleft())
47+
self.assertEqual(len(flattened_deque), len(self.tm_packet_raw) * 2)
2948

3049
def test_sp_parser_crap_data_is_skipped(self):
3150
other_larger_packet = PusTm(
@@ -36,42 +55,52 @@ def test_sp_parser_crap_data_is_skipped(self):
3655
timestamp=CdsShortTimestamp.empty().pack(),
3756
)
3857
other_larger_packet_raw = other_larger_packet.pack()
39-
self.packet_deque.append(self.tm_packet_raw)
40-
self.packet_deque.append(bytearray(8))
41-
self.packet_deque.append(other_larger_packet_raw)
42-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
43-
self.assertEqual(len(sp_list), 2)
44-
self.assertEqual(sp_list[0], self.tm_packet_raw)
45-
self.assertEqual(sp_list[1], other_larger_packet_raw)
58+
self.packet_buf.extend(self.tm_packet_raw)
59+
# Crap data, could also be a CCSDS packet with an unknown packet ID.
60+
self.packet_buf.extend(bytearray(8))
61+
self.packet_buf.extend(other_larger_packet_raw)
62+
result = parse_space_packets(self.packet_buf, packet_ids=self.packet_ids)
63+
self.assertEqual(len(result.tm_list), 2)
64+
self.assertEqual(result.tm_list[0], self.tm_packet_raw)
65+
self.assertEqual(result.tm_list[1], other_larger_packet_raw)
66+
self.assertEqual(
67+
result.skipped_ranges, [range(len(self.tm_packet_raw), len(self.tm_packet_raw) + 8)]
68+
)
4669

4770
def test_sp_parser_crap_data(self):
48-
self.packet_deque.appendleft(bytearray(3))
49-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
50-
self.assertEqual(len(sp_list), 0)
51-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
52-
self.assertEqual(len(sp_list), 0)
71+
self.packet_buf.extend(bytearray(3))
72+
result = parse_space_packets(self.packet_buf, packet_ids=self.packet_ids)
73+
self.assertEqual(len(result.tm_list), 0)
74+
self.assertEqual(result.scanned_bytes, 0)
75+
self.assertEqual(result.skipped_ranges, [])
76+
77+
self.packet_buf = bytearray(7)
78+
result = parse_space_packets(self.packet_buf, packet_ids=self.packet_ids)
79+
self.assertEqual(len(result.tm_list), 0)
80+
# Scanned one byte
81+
self.assertEqual(result.scanned_bytes, 1)
82+
self.assertEqual(result.skipped_ranges, [range(1)])
5383

5484
def test_broken_packet(self):
5585
# slice TM packet in half
5686
tm_packet_first_half = self.tm_packet_raw[:10]
5787
tm_packet_second_half = self.tm_packet_raw[10:]
58-
self.packet_deque.appendleft(tm_packet_first_half)
59-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
60-
self.assertEqual(len(sp_list), 0)
61-
self.assertEqual(len(self.packet_deque), 1)
62-
self.packet_deque.append(tm_packet_second_half)
63-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
64-
self.assertEqual(len(sp_list), 1)
65-
self.assertEqual(len(self.packet_deque), 0)
66-
self.assertEqual(sp_list[0], self.tm_packet_raw)
88+
self.packet_buf.extend(tm_packet_first_half)
89+
result = parse_space_packets(self.packet_buf, packet_ids=self.packet_ids)
90+
self.assertEqual(len(result.tm_list), 0)
91+
self.assertEqual(result.scanned_bytes, 0)
92+
self.packet_buf.extend(tm_packet_second_half)
93+
result = parse_space_packets(self.packet_buf, packet_ids=self.packet_ids)
94+
self.assertEqual(len(result.tm_list), 1)
95+
self.assertEqual(result.tm_list[0], self.tm_packet_raw)
96+
self.assertEqual(result.scanned_bytes, len(self.tm_packet_raw))
6797

6898
def test_broken_packet_at_end(self):
69-
self.packet_deque.append(self.tm_packet_raw)
99+
self.packet_buf.extend(self.tm_packet_raw)
70100
# slice TM packet in half
71101
tm_packet_first_half = self.tm_packet_raw[:10]
72-
self.packet_deque.append(tm_packet_first_half)
73-
sp_list = parse_space_packets(analysis_queue=self.packet_deque, packet_ids=self.packet_ids)
74-
self.assertEqual(len(sp_list), 1)
75-
self.assertEqual(len(self.packet_deque), 1)
76-
self.assertEqual(self.packet_deque.pop(), tm_packet_first_half)
77-
self.assertEqual(sp_list[0], self.tm_packet_raw)
102+
self.packet_buf.extend(tm_packet_first_half)
103+
result = parse_space_packets(self.packet_buf, packet_ids=self.packet_ids)
104+
self.assertEqual(len(result.tm_list), 1)
105+
self.assertEqual(result.tm_list[0], self.tm_packet_raw)
106+
self.assertEqual(result.scanned_bytes, len(self.tm_packet_raw))

0 commit comments

Comments
 (0)