Skip to content

Commit b7fd265

Browse files
authored
Adding Save As pcapng for ProtocolAnalyzer (#970)
1 parent 8004f45 commit b7fd265

File tree

6 files changed

+165
-2
lines changed

6 files changed

+165
-2
lines changed

src/urh/controller/CompareFrameController.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from PyQt5.QtCore import pyqtSlot, QTimer, Qt, pyqtSignal, QItemSelection, QItemSelectionModel, QLocale, \
99
QModelIndex
1010
from PyQt5.QtGui import QContextMenuEvent, QIcon
11-
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView
11+
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView, QInputDialog
1212

1313
from urh import settings
1414
from urh.awre import AutoAssigner
@@ -864,6 +864,11 @@ def save_protocol(self):
864864

865865
if filename.endswith(".bin"):
866866
self.proto_analyzer.to_binary(filename, use_decoded=True)
867+
elif filename.endswith(".pcapng"):
868+
data_link_type, ok = QInputDialog.getInt(self, "Link type",
869+
"Interface Link Type to use (probably one between DLT_USER0-DLT_USER15 (147-162)):", 147, 0, 65535)
870+
if ok:
871+
self.proto_analyzer.to_pcapng(filename=filename, link_type=data_link_type)
867872
else:
868873
self.proto_analyzer.to_xml_file(filename=filename, decoders=self.decodings,
869874
participants=self.project_manager.participants, write_bits=True)

src/urh/dev/PCAPNG.py

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import os
2+
import struct
3+
import math
4+
5+
from urh.util.Logger import logger
6+
7+
8+
# Refer to PCAPNG spec
9+
# https://www.ietf.org/staging/draft-tuexen-opsawg-pcapng-02.html
10+
11+
def _build_pcapng_shb(shb_userappl: str = "", shb_hardware: str = "") -> bytes:
12+
BLOCKTYPE = 0x0A0D0D0A
13+
HEADERS_BLOCK_LENGTH = 28
14+
MAGIC_NUMBER = 0x1A2B3C4D
15+
VERSION_MAJOR, VERSION_MINOR = 1, 0
16+
SECTIONLENGTH = 0xFFFFFFFFFFFFFFFF # -1 => Not specified
17+
18+
shb_userappl_padded_len = math.ceil(len(shb_userappl) / 4) * 4
19+
shb_hardware_padded_len = math.ceil(len(shb_hardware) / 4) * 4
20+
21+
total_block_len = HEADERS_BLOCK_LENGTH
22+
if shb_userappl_padded_len > 0:
23+
total_block_len += shb_userappl_padded_len + 4
24+
25+
if shb_hardware_padded_len > 0:
26+
total_block_len += shb_hardware_padded_len + 4
27+
28+
shb = struct.pack(">IIIHHQ",
29+
BLOCKTYPE,
30+
total_block_len,
31+
MAGIC_NUMBER,
32+
VERSION_MAJOR, VERSION_MINOR,
33+
SECTIONLENGTH)
34+
35+
if shb_userappl != "":
36+
SHB_USERAPPL = 4
37+
strpad = shb_userappl.ljust(shb_userappl_padded_len, "\0")
38+
shb += struct.pack(">HH", SHB_USERAPPL, shb_userappl_padded_len)
39+
shb += bytes(strpad, 'ascii')
40+
41+
if shb_hardware != "":
42+
SHB_HARDWARE = 2
43+
strpad = shb_hardware.ljust(shb_hardware_padded_len, "\0")
44+
shb += struct.pack(">HH", SHB_HARDWARE, shb_hardware_padded_len)
45+
shb += bytes(strpad, 'ascii')
46+
47+
shb += struct.pack(">I", total_block_len)
48+
return shb
49+
50+
def _build_pcapng_idb(link_type) -> bytes:
51+
BLOCKTYPE = 0x00000001
52+
BLOCKLENGTH = 20
53+
SNAP_LEN = 0
54+
55+
return struct.pack(">IIHHII",
56+
BLOCKTYPE,
57+
BLOCKLENGTH,
58+
link_type, 0,
59+
SNAP_LEN,
60+
BLOCKLENGTH)
61+
62+
def _build_pcapng_epb(packet: bytes, timestamp: float) -> bytes:
63+
BLOCKTYPE = 0x00000006
64+
BLOCKHEADERLEN = 32
65+
INTERFACE_ID = 0
66+
67+
captured_packet_len = len(packet)
68+
original_packet_len = captured_packet_len
69+
padded_packet_len = math.ceil(captured_packet_len / 4) * 4
70+
padding_len = padded_packet_len - original_packet_len
71+
padded_packet = packet + bytearray(padding_len)
72+
block_total_length = BLOCKHEADERLEN + padded_packet_len
73+
timestamp_int = int(timestamp * 1e6) # Set the proper resolution
74+
timestamp_high = timestamp_int >> 32
75+
timestamp_low = timestamp_int & 0x00000000FFFFFFFF
76+
77+
epb = struct.pack(">IIIIIII",
78+
BLOCKTYPE,
79+
block_total_length,
80+
INTERFACE_ID,
81+
timestamp_high,
82+
timestamp_low,
83+
captured_packet_len,
84+
original_packet_len)
85+
epb += padded_packet
86+
epb += struct.pack(">I", block_total_length)
87+
return epb
88+
89+
def create_pcapng_file(filename: str, shb_userappl: str = "", shb_hardware: str = "",
90+
link_type: int = 147) -> bytes:
91+
if filename == "":
92+
return
93+
94+
shb_bytes = _build_pcapng_shb(shb_userappl, shb_hardware)
95+
idb_bytes = _build_pcapng_idb(link_type)
96+
97+
if os.path.isfile(filename):
98+
logger.warning("{0} already exists. Overwriting it".format(filename))
99+
100+
with open(filename, "wb") as f:
101+
f.write(shb_bytes)
102+
f.write(idb_bytes)
103+
104+
def append_packets_to_pcapng(filename: str, packets: list, timestamps: list):
105+
with open(filename, "ab") as f:
106+
for packet, timestamp in zip(packets, timestamps):
107+
f.write(_build_pcapng_epb(packet, timestamp))

src/urh/signalprocessing/Message.py

+4
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ def decoded_ascii_array(self) -> array.array:
302302
def decoded_ascii_str(self) -> str:
303303
return "".join(map(chr, self.decoded_ascii_array))
304304

305+
@property
306+
def decoded_ascii_buffer(self) -> bytes:
307+
return self.decoded_ascii_array.tobytes()
308+
305309
def __get_bit_range_from_hex_or_ascii_index(self, from_index: int, decoded: bool, is_hex: bool) -> tuple:
306310
bits = self.decoded_bits if decoded else self.plain_bits
307311
factor = 4 if is_hex else 8

src/urh/signalprocessing/ProtocolAnalyzer.py

+8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from urh import settings
1010
from urh.cythonext import signal_functions
11+
import urh.dev.PCAPNG as PCAPNG
1112
from urh.signalprocessing.Encoding import Encoding
1213
from urh.signalprocessing.Message import Message
1314
from urh.signalprocessing.MessageType import MessageType
@@ -645,6 +646,13 @@ def from_xml_file(self, filename: str, read_bits=False):
645646
root = tree.getroot()
646647
self.from_xml_tag(root, read_bits=read_bits)
647648

649+
def to_pcapng(self, filename : str, hardware_desc_name: str = "", link_type: int = 147):
650+
PCAPNG.create_pcapng_file(filename=filename, shb_userappl="Universal Radio Hacker", shb_hardware=hardware_desc_name, link_type=link_type)
651+
PCAPNG.append_packets_to_pcapng(
652+
filename=filename,
653+
packets=(msg.decoded_ascii_buffer for msg in self.messages),
654+
timestamps=(msg.timestamp for msg in self.messages))
655+
648656
def eliminate(self):
649657
self.message_types = None
650658
self.messages = None

src/urh/util/FileOperator.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
WAV_FILE_FILTER = "Waveform Audio File Format (*.wav *.wave)"
4343
PROTOCOL_FILE_FILTER = "Protocol (*.proto.xml *.proto)"
4444
BINARY_PROTOCOL_FILE_FILTER = "Binary Protocol (*.bin)"
45+
WIRESHARK_FILE_FILTER = "Wireshark File (*.pcapng)"
4546
PLAIN_BITS_FILE_FILTER = "Plain Bits (*.txt)"
4647
FUZZING_FILE_FILTER = "Fuzzing Profile (*.fuzz.xml *.fuzz)"
4748
SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)"
@@ -93,7 +94,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f
9394
elif caption == "Export spectrogram":
9495
name_filter = "Frequency Time (*.ft);;Frequency Time Amplitude (*.fta)"
9596
elif caption == "Save protocol":
96-
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER])
97+
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER, WIRESHARK_FILE_FILTER])
9798
elif caption == "Export demodulated":
9899
name_filter = ";;".join([WAV_FILE_FILTER, SUB_FILE_FILTER])
99100
else:

tests/test_util.py

+38
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from tests.utils_testing import get_path_for_data_file
1010
from urh import settings
1111
from urh.dev.PCAP import PCAP
12+
import urh.dev.PCAPNG as PCAPNG
1213
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
1314
from urh.signalprocessing.Signal import Signal
1415
from urh.util import util
@@ -71,6 +72,43 @@ def test_write_pcap(self):
7172
pcap = PCAP()
7273
pcap.write_packets(proto_analyzer.messages, os.path.join(tempfile.gettempdir(), "test.pcap"), 1e6)
7374

75+
def test_write_pcapng(self):
76+
signal = Signal(get_path_for_data_file("ask.complex"), "ASK-Test")
77+
signal.modulation_type = "ASK"
78+
signal.samples_per_symbol = 295
79+
signal.center = -0.1667
80+
self.assertEqual(signal.num_samples, 13710)
81+
82+
proto_analyzer = ProtocolAnalyzer(signal)
83+
proto_analyzer.get_protocol_from_signal()
84+
self.assertEqual(proto_analyzer.decoded_hex_str[0], "b25b6db6c80")
85+
86+
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
87+
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
88+
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
89+
90+
filepath = os.path.join(tempfile.gettempdir(), "test.pcapng")
91+
PCAPNG.create_pcapng_file(filepath, "Universal Radio Hacker Test", "TestHW", 147)
92+
PCAPNG.append_packets_to_pcapng(
93+
filename=filepath,
94+
packets=(msg.decoded_ascii_buffer for msg in proto_analyzer.messages),
95+
timestamps=(msg.timestamp for msg in proto_analyzer.messages))
96+
97+
# As we don't have PCAPNG importers, we'll verify output just by checking file size, PCAPNG SHB type number
98+
# and that all msg bytes were written somewhere inside output file
99+
filechecks = False
100+
if os.path.isfile(filepath): # ok, file exist
101+
with open(filepath, "rb") as f:
102+
filecontents = f.read()
103+
# min file len= SHB + IDB + 4 EPB msgs
104+
minfilelen = 28 + 20 + (4 * (32 + len(proto_analyzer.messages[0].decoded_ascii_buffer)))
105+
if len(filecontents) >= minfilelen: # ok, min file length passed
106+
if filecontents.find(b'\x0A\x0D\x0D\x0A') >= 0: # ok, seems that SHB was written
107+
if filecontents.find(proto_analyzer.messages[0].decoded_ascii_buffer) >= 0: # ok, msg bytes written
108+
filechecks = True
109+
110+
self.assertTrue(filechecks)
111+
74112
def test_de_bruijn_fuzzing(self):
75113
self.assertEqual(c_util.de_bruijn(3), array.array("B", [0, 0, 0, 1, 0, 1, 1, 1]))
76114
self.assertEqual(c_util.de_bruijn(4), array.array("B", [0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1]))

0 commit comments

Comments
 (0)