Skip to content

Commit 3c240b4

Browse files
committed
Adding Save As pcapng for ProtocolAnalyzer
1 parent 7ddf54b commit 3c240b4

File tree

6 files changed

+180
-2
lines changed

6 files changed

+180
-2
lines changed

src/urh/controller/CompareFrameController.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from PyQt5.QtCore import pyqtSlot, QTimer, Qt, pyqtSignal, QItemSelection, QItemSelectionModel, QLocale, \
99
QModelIndex
1010
from PyQt5.QtGui import QContextMenuEvent, QIcon
11-
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView
11+
from PyQt5.QtWidgets import QMessageBox, QAbstractItemView, QUndoStack, QMenu, QWidget, QHeaderView, QInputDialog
1212

1313
from urh import settings
1414
from urh.awre import AutoAssigner
@@ -864,6 +864,11 @@ def save_protocol(self):
864864

865865
if filename.endswith(".bin"):
866866
self.proto_analyzer.to_binary(filename, use_decoded=True)
867+
elif filename.endswith(".pcapng"):
868+
data_link_type, ok = QInputDialog.getInt(self, "Link type",
869+
"Interface Link Type to use (probably one between DLT_USER0-DLT_USER15 (147-162)):", 147, 0, 65535)
870+
if ok:
871+
self.proto_analyzer.to_pcapng(filename=filename, link_type=data_link_type)
867872
else:
868873
self.proto_analyzer.to_xml_file(filename=filename, decoders=self.decodings,
869874
participants=self.project_manager.participants, write_bits=True)

src/urh/dev/PCAPNG.py

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import os
2+
import struct
3+
import time
4+
import math
5+
6+
from urh.util.Logger import logger
7+
8+
# Refer to PCAPNG spec
9+
# https://www.ietf.org/staging/draft-tuexen-opsawg-pcapng-02.html
10+
11+
class PCAPNG(object):
12+
def __init__(self):
13+
self.__messages = []
14+
self.__messages_timestamps = []
15+
self.__filename = ""
16+
17+
def __build_pcapng_shb(self, shb_userappl : str = "", shb_hardware : str = "") -> bytes:
18+
BLOCKTYPE = 0x0A0D0D0A
19+
HEADERS_BLOCK_LENGTH = 28
20+
MAGIC_NUMBER = 0x1A2B3C4D
21+
VERSION_MAJOR, VERSION_MINOR = 1,0
22+
SECTIONLENGTH = 0xFFFFFFFFFFFFFFFF # -1 => Not specified
23+
24+
shb_userappl_padded_len = math.ceil(len(shb_userappl)/4) * 4
25+
shb_hardware_padded_len = math.ceil(len(shb_hardware)/4) * 4
26+
27+
total_block_len = HEADERS_BLOCK_LENGTH
28+
if shb_userappl_padded_len > 0:
29+
total_block_len += shb_userappl_padded_len + 4
30+
31+
if shb_hardware_padded_len > 0:
32+
total_block_len += shb_hardware_padded_len + 4
33+
34+
shb = struct.pack(">IIIHHQ",
35+
BLOCKTYPE,
36+
total_block_len,
37+
MAGIC_NUMBER,
38+
VERSION_MAJOR, VERSION_MINOR,
39+
SECTIONLENGTH)
40+
41+
if shb_userappl != "":
42+
SHB_USERAPPL = 4
43+
strpad = shb_userappl.ljust(shb_userappl_padded_len, "\0")
44+
shb += struct.pack(">HH", SHB_USERAPPL, shb_userappl_padded_len)
45+
shb += bytes(strpad, 'ascii')
46+
47+
if shb_hardware != "":
48+
SHB_HARDWARE = 2
49+
strpad = shb_hardware.ljust(shb_hardware_padded_len, "\0")
50+
shb += struct.pack(">HH", SHB_HARDWARE, shb_hardware_padded_len)
51+
shb += bytes(strpad, 'ascii')
52+
53+
shb += struct.pack(">I",total_block_len)
54+
return shb
55+
56+
def __build_pcapng_idb(self, link_type) -> bytes:
57+
BLOCKTYPE = 0x00000001
58+
BLOCKLENGTH = 20
59+
SNAP_LEN = 0
60+
61+
return struct.pack(">IIHHII",
62+
BLOCKTYPE,
63+
BLOCKLENGTH,
64+
link_type, 0,
65+
SNAP_LEN,
66+
BLOCKLENGTH)
67+
68+
def __build_pcapng_epb(self, packet : bytes, timestamp : float) -> bytes:
69+
BLOCKTYPE = 0x00000006
70+
BLOCKHEADERLEN = 32
71+
INTERFACE_ID = 0
72+
73+
captured_packet_len = len(packet)
74+
original_packet_len = captured_packet_len
75+
padded_packet_len = math.ceil(captured_packet_len/4) * 4
76+
padding_len = padded_packet_len - original_packet_len
77+
padded_packet = packet + bytearray(padding_len)
78+
block_total_length = BLOCKHEADERLEN + padded_packet_len
79+
timestamp_int = int(timestamp *1e6) # Set the proper resolution
80+
timestamp_high = timestamp_int >> 32
81+
timestamp_low = timestamp_int & 0x00000000FFFFFFFF
82+
83+
epb = struct.pack(">IIIIIII",
84+
BLOCKTYPE,
85+
block_total_length,
86+
INTERFACE_ID,
87+
timestamp_high,
88+
timestamp_low,
89+
captured_packet_len,
90+
original_packet_len)
91+
epb += padded_packet
92+
epb += struct.pack(">I", block_total_length)
93+
return epb
94+
95+
############################################################################################################
96+
# PUBLIC INTERFACES
97+
def create_pcapng_file(self, filename : str, shb_userappl : str = "", shb_hardware : str = "", link_type : int =147) -> bytes:
98+
if (filename == ""):
99+
return
100+
self.__filename = filename
101+
102+
shb_bytes = self.__build_pcapng_shb(shb_userappl,shb_hardware)
103+
idb_bytes = self.__build_pcapng_idb(link_type)
104+
105+
if os.path.isfile(self.__filename):
106+
logger.warning("{0} already exists. Overwriting it".format(filename))
107+
108+
with open(self.__filename, "wb") as f:
109+
f.write(shb_bytes)
110+
f.write(idb_bytes)
111+
112+
def pcapng_file_append_packet(self, packet : bytes, timestamp: float):
113+
with open(self.__filename, "ab") as f:
114+
f.write(self.__build_pcapng_epb(packet, timestamp))
115+
116+
def pcapng_file_append_multi_packets(self, packets : list):
117+
with open(self.__filename, "ab") as f:
118+
for packet, timestamp in packets:
119+
f.write(self.__build_pcapng_epb(packet, timestamp))
120+

src/urh/signalprocessing/Message.py

+4
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ def decoded_ascii_array(self) -> array.array:
302302
def decoded_ascii_str(self) -> str:
303303
return "".join(map(chr, self.decoded_ascii_array))
304304

305+
@property
306+
def decoded_ascii_buffer(self) -> bytes:
307+
return self.decoded_ascii_array.tobytes()
308+
305309
def __get_bit_range_from_hex_or_ascii_index(self, from_index: int, decoded: bool, is_hex: bool) -> tuple:
306310
bits = self.decoded_bits if decoded else self.plain_bits
307311
factor = 4 if is_hex else 8

src/urh/signalprocessing/ProtocolAnalyzer.py

+9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from urh import settings
1010
from urh.cythonext import signal_functions
11+
from urh.dev.PCAPNG import PCAPNG
1112
from urh.signalprocessing.Encoding import Encoding
1213
from urh.signalprocessing.Message import Message
1314
from urh.signalprocessing.MessageType import MessageType
@@ -645,6 +646,14 @@ def from_xml_file(self, filename: str, read_bits=False):
645646
root = tree.getroot()
646647
self.from_xml_tag(root, read_bits=read_bits)
647648

649+
def to_pcapng(self, filename : str, hardware_desc_name: str = "", link_type: int = 147):
650+
pcapng = PCAPNG()
651+
pcapng.create_pcapng_file(filename=filename, shb_userappl="Universal Radio Hacker", shb_hardware=hardware_desc_name, link_type=link_type)
652+
msg_list = []
653+
for msg in self.messages:
654+
msg_list.append([msg.decoded_ascii_buffer, msg.timestamp])
655+
pcapng.pcapng_file_append_multi_packets(msg_list)
656+
648657
def eliminate(self):
649658
self.message_types = None
650659
self.messages = None

src/urh/util/FileOperator.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
WAV_FILE_FILTER = "Waveform Audio File Format (*.wav *.wave)"
4343
PROTOCOL_FILE_FILTER = "Protocol (*.proto.xml *.proto)"
4444
BINARY_PROTOCOL_FILE_FILTER = "Binary Protocol (*.bin)"
45+
WIRESHARK_FILE_FILTER = "Wireshark File (*.pcapng)"
4546
PLAIN_BITS_FILE_FILTER = "Plain Bits (*.txt)"
4647
FUZZING_FILE_FILTER = "Fuzzing Profile (*.fuzz.xml *.fuzz)"
4748
SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)"
@@ -93,7 +94,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f
9394
elif caption == "Export spectrogram":
9495
name_filter = "Frequency Time (*.ft);;Frequency Time Amplitude (*.fta)"
9596
elif caption == "Save protocol":
96-
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER])
97+
name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER, WIRESHARK_FILE_FILTER])
9798
elif caption == "Export demodulated":
9899
name_filter = WAV_FILE_FILTER
99100
else:

tests/test_util.py

+39
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from tests.utils_testing import get_path_for_data_file
1010
from urh import settings
1111
from urh.dev.PCAP import PCAP
12+
from urh.dev.PCAPNG import PCAPNG
1213
from urh.signalprocessing.ProtocolAnalyzer import ProtocolAnalyzer
1314
from urh.signalprocessing.Signal import Signal
1415
from urh.util import util
@@ -71,6 +72,44 @@ def test_write_pcap(self):
7172
pcap = PCAP()
7273
pcap.write_packets(proto_analyzer.messages, os.path.join(tempfile.gettempdir(), "test.pcap"), 1e6)
7374

75+
def test_write_pcapng(self):
76+
signal = Signal(get_path_for_data_file("ask.complex"), "ASK-Test")
77+
signal.modulation_type = "ASK"
78+
signal.samples_per_symbol = 295
79+
signal.center = -0.1667
80+
self.assertEqual(signal.num_samples, 13710)
81+
82+
proto_analyzer = ProtocolAnalyzer(signal)
83+
proto_analyzer.get_protocol_from_signal()
84+
self.assertEqual(proto_analyzer.decoded_hex_str[0], "b25b6db6c80")
85+
86+
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
87+
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
88+
proto_analyzer.messages.append(copy.deepcopy(proto_analyzer.messages[0]))
89+
90+
pcap = PCAPNG()
91+
pcap.create_pcapng_file(os.path.join(tempfile.gettempdir(), "test.pcapng"), "Universal Radio Hacker Test", "TestHW", 147)
92+
msg_list = []
93+
for msg in proto_analyzer.messages:
94+
msg_list.append([msg.decoded_ascii_buffer, msg.timestamp])
95+
pcap.pcapng_file_append_multi_packets(msg_list)
96+
97+
# As we don't have PCAPNG importers, we'll verify output just by checking file size, PCAPNG SHB type number
98+
# and that all msg bytes were written somewhere inside output file
99+
filepath = os.path.join(tempfile.gettempdir(), "test.pcapng")
100+
filechecks = False
101+
if os.path.isfile(filepath): # ok, file exist
102+
with open(filepath, "rb") as f:
103+
filecontents = f.read()
104+
# min file len= SHB + IDB + 4 EPB msgs
105+
minfilelen = 28 + 20 + (4 * (32 + len(proto_analyzer.messages[0].decoded_ascii_buffer)))
106+
if len(filecontents) >= minfilelen: # ok, min file length passed
107+
if filecontents.find(b'\x0A\x0D\x0D\x0A') >= 0: # ok, seems that SHB was written
108+
if filecontents.find(proto_analyzer.messages[0].decoded_ascii_buffer) >= 0: # ok, msg bytes written
109+
filechecks = True
110+
111+
self.assertTrue(filechecks)
112+
74113
def test_de_bruijn_fuzzing(self):
75114
self.assertEqual(c_util.de_bruijn(3), array.array("B", [0, 0, 0, 1, 0, 1, 1, 1]))
76115
self.assertEqual(c_util.de_bruijn(4), array.array("B", [0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1]))

0 commit comments

Comments
 (0)