2
2
from unittest import TestCase
3
3
4
4
from spacepackets .ccsds import CdsShortTimestamp
5
- from spacepackets .ccsds .spacepacket import parse_space_packets
5
+ from spacepackets .ccsds .spacepacket import parse_space_packets , parse_space_packets_from_deque
6
6
from spacepackets .ecss .tm import PusTm
7
7
8
8
@@ -18,14 +18,33 @@ def setUp(self) -> None:
18
18
self .packet_ids = (self .tm_packet .packet_id ,)
19
19
self .tm_packet_raw = self .tm_packet .pack ()
20
20
self .packet_deque = deque ()
21
+ self .packet_buf = bytearray ()
21
22
22
23
def test_sp_parser (self ):
23
- self .packet_deque .appendleft (self .tm_packet_raw )
24
- self .packet_deque .appendleft (self .tm_packet_raw )
25
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
26
- self .assertEqual (len (sp_list ), 2 )
27
- self .assertEqual (sp_list [0 ], self .tm_packet_raw )
28
- self .assertEqual (sp_list [1 ], self .tm_packet_raw )
24
+ self .packet_buf .extend (self .tm_packet_raw )
25
+ self .packet_buf .extend (self .tm_packet_raw )
26
+
27
+ result = parse_space_packets (buf = self .packet_buf , packet_ids = self .packet_ids )
28
+ self .assertEqual (len (result .tm_list ), 2 )
29
+ self .assertEqual (result .tm_list [0 ], self .tm_packet_raw )
30
+ self .assertEqual (result .tm_list [1 ], self .tm_packet_raw )
31
+ self .assertEqual (result .scanned_bytes , len (self .tm_packet_raw ) * 2 )
32
+ self .assertEqual (len (result .skipped_ranges ), 0 )
33
+
34
+ def test_sp_parser_with_deque (self ):
35
+ self .packet_deque .append (self .tm_packet_raw )
36
+ self .packet_deque .append (self .tm_packet_raw )
37
+ result = parse_space_packets_from_deque (self .packet_deque , self .packet_ids )
38
+ self .assertEqual (len (result .tm_list ), 2 )
39
+ self .assertEqual (result .tm_list [0 ], self .tm_packet_raw )
40
+ self .assertEqual (result .tm_list [1 ], self .tm_packet_raw )
41
+ self .assertEqual (result .scanned_bytes , len (self .tm_packet_raw ) * 2 )
42
+ self .assertEqual (len (result .skipped_ranges ), 0 )
43
+ self .assertEqual (len (self .packet_deque ), 2 )
44
+ flattened_deque = bytearray ()
45
+ while self .packet_deque :
46
+ flattened_deque .extend (self .packet_deque .popleft ())
47
+ self .assertEqual (len (flattened_deque ), len (self .tm_packet_raw ) * 2 )
29
48
30
49
def test_sp_parser_crap_data_is_skipped (self ):
31
50
other_larger_packet = PusTm (
@@ -36,42 +55,52 @@ def test_sp_parser_crap_data_is_skipped(self):
36
55
timestamp = CdsShortTimestamp .empty ().pack (),
37
56
)
38
57
other_larger_packet_raw = other_larger_packet .pack ()
39
- self .packet_deque .append (self .tm_packet_raw )
40
- self .packet_deque .append (bytearray (8 ))
41
- self .packet_deque .append (other_larger_packet_raw )
42
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
43
- self .assertEqual (len (sp_list ), 2 )
44
- self .assertEqual (sp_list [0 ], self .tm_packet_raw )
45
- self .assertEqual (sp_list [1 ], other_larger_packet_raw )
58
+ self .packet_buf .extend (self .tm_packet_raw )
59
+ # Crap data, could also be a CCSDS packet with an unknown packet ID.
60
+ self .packet_buf .extend (bytearray (8 ))
61
+ self .packet_buf .extend (other_larger_packet_raw )
62
+ result = parse_space_packets (self .packet_buf , packet_ids = self .packet_ids )
63
+ self .assertEqual (len (result .tm_list ), 2 )
64
+ self .assertEqual (result .tm_list [0 ], self .tm_packet_raw )
65
+ self .assertEqual (result .tm_list [1 ], other_larger_packet_raw )
66
+ self .assertEqual (
67
+ result .skipped_ranges , [range (len (self .tm_packet_raw ), len (self .tm_packet_raw ) + 8 )]
68
+ )
46
69
47
70
def test_sp_parser_crap_data (self ):
48
- self .packet_deque .appendleft (bytearray (3 ))
49
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
50
- self .assertEqual (len (sp_list ), 0 )
51
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
52
- self .assertEqual (len (sp_list ), 0 )
71
+ self .packet_buf .extend (bytearray (3 ))
72
+ result = parse_space_packets (self .packet_buf , packet_ids = self .packet_ids )
73
+ self .assertEqual (len (result .tm_list ), 0 )
74
+ self .assertEqual (result .scanned_bytes , 0 )
75
+ self .assertEqual (result .skipped_ranges , [])
76
+
77
+ self .packet_buf = bytearray (7 )
78
+ result = parse_space_packets (self .packet_buf , packet_ids = self .packet_ids )
79
+ self .assertEqual (len (result .tm_list ), 0 )
80
+ # Scanned one byte
81
+ self .assertEqual (result .scanned_bytes , 1 )
82
+ self .assertEqual (result .skipped_ranges , [range (1 )])
53
83
54
84
def test_broken_packet (self ):
55
85
# slice TM packet in half
56
86
tm_packet_first_half = self .tm_packet_raw [:10 ]
57
87
tm_packet_second_half = self .tm_packet_raw [10 :]
58
- self .packet_deque . appendleft (tm_packet_first_half )
59
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
60
- self .assertEqual (len (sp_list ), 0 )
61
- self .assertEqual (len ( self . packet_deque ), 1 )
62
- self .packet_deque . append (tm_packet_second_half )
63
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
64
- self .assertEqual (len (sp_list ), 1 )
65
- self .assertEqual (len ( self . packet_deque ), 0 )
66
- self .assertEqual (sp_list [ 0 ], self .tm_packet_raw )
88
+ self .packet_buf . extend (tm_packet_first_half )
89
+ result = parse_space_packets (self .packet_buf , packet_ids = self .packet_ids )
90
+ self .assertEqual (len (result . tm_list ), 0 )
91
+ self .assertEqual (result . scanned_bytes , 0 )
92
+ self .packet_buf . extend (tm_packet_second_half )
93
+ result = parse_space_packets (self .packet_buf , packet_ids = self .packet_ids )
94
+ self .assertEqual (len (result . tm_list ), 1 )
95
+ self .assertEqual (result . tm_list [ 0 ], self . tm_packet_raw )
96
+ self .assertEqual (result . scanned_bytes , len ( self .tm_packet_raw ) )
67
97
68
98
def test_broken_packet_at_end (self ):
69
- self .packet_deque . append (self .tm_packet_raw )
99
+ self .packet_buf . extend (self .tm_packet_raw )
70
100
# slice TM packet in half
71
101
tm_packet_first_half = self .tm_packet_raw [:10 ]
72
- self .packet_deque .append (tm_packet_first_half )
73
- sp_list = parse_space_packets (analysis_queue = self .packet_deque , packet_ids = self .packet_ids )
74
- self .assertEqual (len (sp_list ), 1 )
75
- self .assertEqual (len (self .packet_deque ), 1 )
76
- self .assertEqual (self .packet_deque .pop (), tm_packet_first_half )
77
- self .assertEqual (sp_list [0 ], self .tm_packet_raw )
102
+ self .packet_buf .extend (tm_packet_first_half )
103
+ result = parse_space_packets (self .packet_buf , packet_ids = self .packet_ids )
104
+ self .assertEqual (len (result .tm_list ), 1 )
105
+ self .assertEqual (result .tm_list [0 ], self .tm_packet_raw )
106
+ self .assertEqual (result .scanned_bytes , len (self .tm_packet_raw ))
0 commit comments