diff --git a/facedancer/__init__.py b/facedancer/__init__.py index 5fac5ce..5d18974 100644 --- a/facedancer/__init__.py +++ b/facedancer/__init__.py @@ -19,13 +19,8 @@ # Alias objects to make them easier to import. from .backends import * -from .core import FacedancerUSBApp, FacedancerUSBHostApp, FacedancerBasicScheduler -from .devices import default_main as main - -# Set up our extra log levels. -import logging -from .constants import LOGLEVEL_TRACE -logging.addLevelName(LOGLEVEL_TRACE, 'TRACE') +from .core import FacedancerUSBApp, FacedancerUSBHostApp, FacedancerBasicScheduler +from .devices import default_main as main # Wildcard import. __all__ = [ @@ -35,5 +30,5 @@ 'to_any_endpoint', 'to_this_interface', 'to_any_interface', 'to_other', 'USBDirection', 'USBTransferType', 'USBUsageType', 'USBSynchronizationType', 'USBRequestType', 'USBRequestRecipient', 'USBStandardRequests', 'LanguageIDs', - 'use_automatically', 'use_inner_classes_automatically', 'LOGLEVEL_TRACE', + 'use_automatically', 'use_inner_classes_automatically', ] diff --git a/facedancer/backends/goodfet.py b/facedancer/backends/goodfet.py index a27ae22..4354550 100644 --- a/facedancer/backends/goodfet.py +++ b/facedancer/backends/goodfet.py @@ -2,10 +2,10 @@ import serial import sys import time -import logging -from ..core import FacedancerApp +from ..core import FacedancerApp from ..backends.MAXUSBApp import MAXUSBApp +from ..logging import log class GoodfetMaxUSBApp(MAXUSBApp): @@ -31,7 +31,7 @@ def appropriate_for_environment(cls, backend_name): gf.close() return True except ImportError: - logging.info("Skipping GoodFET-based devices, as pyserial isn't installed.") + log.info("Skipping GoodFET-based devices, as pyserial isn't installed.") return False except: return False diff --git a/facedancer/backends/greatdancer.py b/facedancer/backends/greatdancer.py index 913f2ca..aa15d55 100644 --- a/facedancer/backends/greatdancer.py +++ b/facedancer/backends/greatdancer.py @@ -3,15 +3,13 @@ import sys import time import codecs -import logging import traceback -from ..core import * -from ..types import * +from ..core import * +from ..types import * +from ..logging import log -# FIXME: abstract this to the logging library -LOGLEVEL_TRACE = 5 class GreatDancerApp(FacedancerApp): """ @@ -63,7 +61,7 @@ def appropriate_for_environment(cls, backend_name): gf = greatfet.GreatFET() return gf.supports_api('greatdancer') except ImportError: - logging.info("Skipping GreatFET-based devices, as the greatfet python module isn't installed.") + log.info("Skipping GreatFET-based devices, as the greatfet python module isn't installed.") return False except: return False @@ -176,7 +174,7 @@ def _generate_endpoint_config_arguments(self, config): for interface in config.get_interfaces(): for endpoint in interface.get_endpoints(): - logging.info(f"Configuring {endpoint}.") + log.info(f"Configuring {endpoint}.") triple = (endpoint.get_address(), endpoint.max_packet_size, endpoint.transfer_type,) arguments.append(triple) @@ -199,19 +197,19 @@ def connect(self, usb_device, max_ep0_packet_size=64): # Compute our quirk flags. if 'manual_set_address' in self.quirks: - logging.info("Handling SET_ADDRESS on the host side!") + log.info("Handling SET_ADDRESS on the host side!") quirks |= self.QUIRK_MANUAL_SET_ADDRESS self.api.connect(self.max_ep0_packet_size, quirks) self.connected_device = usb_device - logging.info("Connecting to host.") + log.info("Connecting to host.") def disconnect(self): """ Disconnects the GreatDancer from its target host. """ - logging.info("Disconnecting from host.") + log.info("Disconnecting from host.") self.device.comms.release_exclusive_access() self.api.disconnect() @@ -240,7 +238,7 @@ def send_on_endpoint(self, ep_num, data, blocking=True): data: The data to be sent. blocking: If true, this function will wait for the transfer to complete. """ - logging.log(LOGLEVEL_TRACE, f"EP{ep_num}/IN: <- {bytes(data)}") + log.trace(f"EP{ep_num}/IN: <- {bytes(data)}") self._wait_until_ready_to_send(ep_num) self.api.send_on_endpoint(ep_num, bytes(data)) @@ -291,7 +289,7 @@ def stall_endpoint(self, ep_num, direction=0): """ in_vs_out = "IN" if direction else "OUT" - logging.log(LOGLEVEL_TRACE, "Stalling EP{} {}".format(ep_num, in_vs_out)) + log.trace("Stalling EP{} {}".format(ep_num, in_vs_out)) self.endpoint_stalled[ep_num] = True self.api.stall_endpoint(self._endpoint_address(ep_num, direction)) @@ -566,7 +564,7 @@ def _handle_transfer_complete_on_endpoint(self, endpoint_number, direction): # callback. else: data = self._finish_primed_read_on_endpoint(endpoint_number) - logging.log(LOGLEVEL_TRACE, f"EP{endpoint_number}/OUT -> {data}") + log.trace(f"EP{endpoint_number}/OUT -> {data}") self.connected_device.handle_data_available(endpoint_number, data) @@ -675,7 +673,7 @@ def _bus_reset(self): Triggers the GreatDancer to perform its side of a bus reset. """ - logging.debug("Host issued bus reset.") + log.debug("Host issued bus reset.") if self.connected_device: self.connected_device.handle_bus_reset() diff --git a/facedancer/backends/greathost.py b/facedancer/backends/greathost.py index cb827fc..9297f64 100644 --- a/facedancer/backends/greathost.py +++ b/facedancer/backends/greathost.py @@ -6,11 +6,12 @@ import time import codecs import struct -import logging from ..core import * from ..types import * +from ..logging import log + class GreatDancerHostApp(FacedancerUSBHost): """ @@ -102,7 +103,7 @@ def appropriate_for_environment(cls, backend_name): greatfet.GreatFET() return True except ImportError: - logging.info("Skipping GreatFET-based devices, as the greatfet python module isn't installed.") + log.info("Skipping GreatFET-based devices, as the greatfet python module isn't installed.") return False except: return False diff --git a/facedancer/backends/moondancer.py b/facedancer/backends/moondancer.py index 398d985..4d2bd63 100644 --- a/facedancer/backends/moondancer.py +++ b/facedancer/backends/moondancer.py @@ -3,20 +3,14 @@ import sys import time import codecs -import functools import enum -import logging import traceback from ..core import * from ..constants import DeviceSpeed from ..types import USBDirection -# add a TRACE level to logging -logging.TRACE = 5 -logging.addLevelName(logging.TRACE, "TRACE") -logging.Logger.trace = functools.partialmethod(logging.Logger.log, logging.TRACE) -logging.trace = functools.partial(logging.log, logging.TRACE) +from ..logging import log # Quirk flags class QuirkFlag(enum.IntFlag): @@ -36,7 +30,7 @@ class InterruptEvent(enum.Enum): def parse(data): if len(data) != 2: - logging.error(f"Invalid length for InterruptEvent: {len(data)}") + log.error(f"Invalid length for InterruptEvent: {len(data)}") raise ValueError(f"Invalid length for InterruptEvent: {len(data)}") message = InterruptEvent(data[0]) message.endpoint_number = data[1] @@ -66,9 +60,7 @@ def __init__(self, device=None, verbose=0, quirks=None): verbose: The verbosity level of the given application. """ - #logging.getLogger().setLevel(logging.DEBUG) - - logging.info("Using the Moondancer backend.") + log.info("Using the Moondancer backend.") import cynthion @@ -140,7 +132,7 @@ def appropriate_for_environment(cls, backend_name): device = cynthion.Cynthion() return device.supports_api('moondancer') except ImportError: - logging.debug("Skipping Cynthion-based devices, as the cynthion python module isn't installed.") + log.debug("Skipping Cynthion-based devices, as the cynthion python module isn't installed.") return False except: return False @@ -180,14 +172,14 @@ def connect(self, usb_device, max_ep0_packet_size=64): emulated. """ - logging.debug(f"moondancer.connect(max_ep0_packet_size:{max_ep0_packet_size}, device_speed:{self.device_speed}, quirks:{self.quirks})") + log.debug(f"moondancer.connect(max_ep0_packet_size:{max_ep0_packet_size}, device_speed:{self.device_speed}, quirks:{self.quirks})") self.max_ep0_packet_size = max_ep0_packet_size # compute our quirk flags quirks = 0 if 'manual_set_address' in self.quirks: - logging.warn("Handling SET_ADDRESS on the target host side!") + log.warn("Handling SET_ADDRESS on the target host side!") quirks |= QuirkFlag.MANUAL_SET_ADDRESS # connect to target host @@ -197,13 +189,13 @@ def connect(self, usb_device, max_ep0_packet_size=64): # get device name device_name = f"{type(self.connected_device).__module__}.{type(self.connected_device).__qualname__}" - logging.info(f"Connected '{device_name}' to target host.") + log.info(f"Connected '{device_name}' to target host.") def disconnect(self): """ Disconnects Cynthion from the target host. """ - logging.info("Disconnecting from target host.") + log.info("Disconnecting from target host.") self.device.comms.release_exclusive_access() @@ -217,7 +209,7 @@ def reset(self): Triggers the Cynthion to handle its side of a bus reset. """ - logging.debug(f"moondancer.bus_reset()") + log.debug(f"moondancer.bus_reset()") self.api.bus_reset() @@ -231,7 +223,7 @@ def set_address(self, address, defer=False): defer: True iff the set_address request should wait for an active transaction to finish. """ - logging.debug(f"moondancer.set_address({address}, {defer})") + log.debug(f"moondancer.set_address({address}, {defer})") self.api.set_address(address, 1 if defer else 0) @@ -244,10 +236,10 @@ def configured(self, configuration): configuration: The USBConfiguration object applied by the SET_CONFIG request. """ - logging.debug("fmoondancer.configured({configuration})") + log.debug("fmoondancer.configured({configuration})") if configuration is None: - logging.error("Target host configuration could not be applied.") + log.error("Target host configuration could not be applied.") return # If we need to issue a configuration command, issue one. @@ -258,7 +250,7 @@ def configured(self, configuration): for interface in configuration.get_interfaces(): for endpoint in interface.get_endpoints(): - logging.debug(f"Configuring endpoint: {endpoint}.") + log.debug(f"Configuring endpoint: {endpoint}.") triple = (endpoint.get_address(), endpoint.max_packet_size, endpoint.transfer_type,) endpoint_triplets.append(triple) @@ -276,7 +268,7 @@ def configured(self, configuration): nak_status = self.api.get_nak_status() self.handle_ep_in_nak_status(nak_status) - logging.info("Target host configuration complete.") + log.info("Target host configuration complete.") def read_from_endpoint(self, endpoint_number): @@ -286,7 +278,7 @@ def read_from_endpoint(self, endpoint_number): endpoint_number: The number of the OUT endpoint on which data is to be rx'd. """ - logging.debug(f"moondancer.read_from_endpoint({endpoint_number})") + log.debug(f"moondancer.read_from_endpoint({endpoint_number})") # Read from the given endpoint... data = self.api.read_endpoint(endpoint_number) @@ -294,7 +286,7 @@ def read_from_endpoint(self, endpoint_number): # Prime endpoint to receive data again... self.api.ep_out_prime_receive(endpoint_number) - logging.trace(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)} '{data}'") + log.trace(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)} '{data}'") # Finally, return the result. return data @@ -309,11 +301,11 @@ def send_on_endpoint(self, endpoint_number, data, blocking=True): blocking: If true, this function will wait for the transfer to complete. """ - logging.debug(f"moondancer.send_on_endpoint({endpoint_number}, {len(data)}, {blocking})") + log.debug(f"moondancer.send_on_endpoint({endpoint_number}, {len(data)}, {blocking})") self.api.write_endpoint(endpoint_number, blocking, bytes(data)) - logging.trace(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, {data})") + log.trace(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, {data})") def ack_status_stage(self, direction=Direction.HOST_TO_DEVICE, endpoint_number=0, blocking=False): @@ -329,14 +321,14 @@ def ack_status_stage(self, direction=Direction.HOST_TO_DEVICE, endpoint_number=0 before returning. """ - logging.debug(f"moondancer.ack_status_stage({direction.name}, {endpoint_number}, {blocking})") + log.debug(f"moondancer.ack_status_stage({direction.name}, {endpoint_number}, {blocking})") if direction == Direction.HOST_TO_DEVICE: # 0 = HOST_TO_DEVICE (OUT) # If this was an OUT request, we'll prime the output buffer to # respond with the ZLP expected during the status stage. self.api.write_endpoint(endpoint_number, blocking, bytes([])) - logging.trace(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, [])") + log.trace(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, [])") else: # 1 = DEVICE_TO_HOST (IN) # If this was an IN request, we'll need to set up a transfer descriptor @@ -344,7 +336,7 @@ def ack_status_stage(self, direction=Direction.HOST_TO_DEVICE, endpoint_number=0 # zero length packet from the STATUS phase. self.api.ep_out_prime_receive(endpoint_number) - logging.trace(f" moondancer.api.ep_out_prime_receive({endpoint_number})") + log.trace(f" moondancer.api.ep_out_prime_receive({endpoint_number})") def stall_endpoint(self, endpoint_number, direction=0): @@ -358,7 +350,7 @@ def stall_endpoint(self, endpoint_number, direction=0): # USBDirection.IN = 1 endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number - logging.debug(f"Stalling EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})") + log.debug(f"Stalling EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})") # Mark endpoint number as stalled. self.endpoint_stalled[endpoint_number] = True @@ -366,10 +358,10 @@ def stall_endpoint(self, endpoint_number, direction=0): # Stall endpoint address. if direction: self.api.stall_endpoint_in(endpoint_number) - logging.debug(f" moondancer.api.stall_endpoint_in({endpoint_number})") + log.debug(f" moondancer.api.stall_endpoint_in({endpoint_number})") else: self.api.stall_endpoint_out(endpoint_number) - logging.debug(f" moondancer.api.stall_endpoint_out({endpoint_number})") + log.debug(f" moondancer.api.stall_endpoint_out({endpoint_number})") def stall_ep0(self, direction=0): @@ -409,7 +401,7 @@ def service_irqs(self): # Handle interrupt events. for event in events: - logging.trace(f"MD IRQ => {event}") + log.trace(f"MD IRQ => {event}") match event: case InterruptEvent.USB_BUS_RESET: self.handle_bus_reset() @@ -420,7 +412,7 @@ def service_irqs(self): case InterruptEvent.USB_SEND_COMPLETE: self.handle_send_complete(event.endpoint_number) case _: - logging.error(f"Unhandled interrupt event: {event}") + log.error(f"Unhandled interrupt event: {event}") # - Interrupt event handlers ---------------------------------------------- @@ -445,7 +437,7 @@ def handle_receive_control(self, endpoint_number): endpoint_number: The endpoint number for which a control event should be serviced. """ - logging.debug(f"handle_receive_control({endpoint_number})"); + log.debug(f"handle_receive_control({endpoint_number})"); # HACK: to maintain API compatibility with the existing facedancer API, # we need to know if a stall happens at any point during our handler. @@ -455,13 +447,13 @@ def handle_receive_control(self, endpoint_number): data = bytearray(self.api.read_control()) request = self.connected_device.create_request(data) - logging.debug(f" moondancer.api.read_control({endpoint_number}) -> {len(data)} '{request}'") + log.debug(f" moondancer.api.read_control({endpoint_number}) -> {len(data)} '{request}'") # If this is an OUT request, handle the data stage, # and add it to the request. is_out = request.get_direction() == Direction.HOST_TO_DEVICE has_data = (request.length > 0) - logging.trace(f" is_out:{is_out} has_data:{has_data}") + log.trace(f" is_out:{is_out} has_data:{has_data}") # Special case: if this is an OUT request with a data stage, we won't # handle the request until the data stage has been completed. Instead, @@ -470,15 +462,15 @@ def handle_receive_control(self, endpoint_number): # complete, triggering a corresponding code path in # in handle_transfer_complete_on_endpoint. if is_out and has_data: - logging.info(f" setup packet has data - queueing read") + log.info(f" setup packet has data - queueing read") self.pending_control_request = request return - logging.trace(f" connected_device.handle_request({request})") + log.trace(f" connected_device.handle_request({request})") self.connected_device.handle_request(request) if not is_out and not self.endpoint_stalled[endpoint_number]: - logging.trace(f" IN packet -> ack_status_stage(DEVICE_TO_HOST) ACK STATUS STAGE") + log.trace(f" IN packet -> ack_status_stage(DEVICE_TO_HOST) ACK STATUS STAGE") self.ack_status_stage(direction=Direction.DEVICE_TO_HOST) @@ -490,7 +482,7 @@ def handle_receive_packet(self, endpoint_number): endpoint_number: The endpoint number for which the transfer should be serviced. """ - logging.debug(f"handle_receive_packet({endpoint_number}) pending:{self.pending_control_request}") + log.debug(f"handle_receive_packet({endpoint_number}) pending:{self.pending_control_request}") # If we have a pending control request with a data stage... # TODO support endpoints other than EP0 @@ -499,7 +491,7 @@ def handle_receive_packet(self, endpoint_number): # Read the rest of the data from the endpoint, completing the control request. new_data = self.api.read_endpoint(endpoint_number) - logging.debug(f" handling control data stage: {len(new_data)} bytes") + log.debug(f" handling control data stage: {len(new_data)} bytes") # Append our new data to the pending control request. self.pending_control_request.data.extend(new_data) @@ -525,11 +517,11 @@ def handle_receive_packet(self, endpoint_number): # Prime endpoint to receive again. self.api.ep_out_prime_receive(endpoint_number) - logging.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)}") + log.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)}") if len(data) == 0: # it's an ack - logging.trace(" received ACK") + log.trace(" received ACK") return # Finally, pass it to the device's handler @@ -538,13 +530,13 @@ def handle_receive_packet(self, endpoint_number): # USB0_SEND_COMPLETE def handle_send_complete(self, endpoint_number): - logging.debug(f"handle_send_complete({endpoint_number})") + log.debug(f"handle_send_complete({endpoint_number})") pass # Handle pending data requests on EP_IN def handle_ep_in_nak_status(self, nak_status): nakked_endpoints = [epno for epno in range(self.SUPPORTED_ENDPOINTS) if (nak_status >> epno) & 1] for endpoint_number in nakked_endpoints: - logging.trace(f"Received IN NAK: {endpoint_number}") + log.trace(f"Received IN NAK: {endpoint_number}") if endpoint_number != 0: self.connected_device.handle_nak(endpoint_number) diff --git a/facedancer/backends/raspdancer.py b/facedancer/backends/raspdancer.py index 8434cbe..9060c02 100644 --- a/facedancer/backends/raspdancer.py +++ b/facedancer/backends/raspdancer.py @@ -9,11 +9,13 @@ import os import sys import time -import logging -from ..core import FacedancerApp +from ..core import FacedancerApp from ..backends.MAXUSBApp import MAXUSBApp +from ..logging import log + + class RaspdancerMaxUSBApp(MAXUSBApp): app_name = "MAXUSB" app_num = 0x00 # Not meaningful for us. TODO: Remove! @@ -36,7 +38,7 @@ def appropriate_for_environment(cls, backend_name): rd = Raspdancer() return True except ImportError as e: - logging.info("Skipping Raspdancer devices, as perquisites aren't installed ({}).".format(e)) + log.info("Skipping Raspdancer devices, as perquisites aren't installed ({}).".format(e)) return False except: return False diff --git a/facedancer/constants.py b/facedancer/constants.py index 01d875c..f8c25b9 100644 --- a/facedancer/constants.py +++ b/facedancer/constants.py @@ -1,8 +1,5 @@ from enum import IntEnum -# Extra log level -LOGLEVEL_TRACE = 5 - # Based on libusb's LIBUSB_SPEED_* constants. # diff --git a/facedancer/device.py b/facedancer/device.py index 1c7b124..35d7795 100644 --- a/facedancer/device.py +++ b/facedancer/device.py @@ -6,7 +6,6 @@ import sys import asyncio import struct -import logging import warnings from typing import Coroutine, Dict, Iterable, Union @@ -15,7 +14,6 @@ from prompt_toolkit import HTML, print_formatted_text from .core import FacedancerUSBApp -from .constants import LOGLEVEL_TRACE from .types import DescriptorTypes, LanguageIDs, USBStandardRequests from .types import USBDirection, USBRequestType, USBRequestRecipient @@ -28,6 +26,8 @@ from .request import USBControlRequest, USBRequestHandler from .request import standard_request_handler, to_device, get_request_handler_methods +from .logging import log + @dataclass class USBBaseDevice(USBDescribable, USBRequestHandler): @@ -183,7 +183,7 @@ async def run(self): # Sanity check to avoid common issues. if len(self.configurations) == 0: - logging.error("No configurations defined on the emulated device! " + log.error("No configurations defined on the emulated device! " "Did you forget @use_inner_classes_automatically?") if self.backend is None: @@ -312,10 +312,10 @@ def get_endpoint(self, endpoint_number: int, direction: USBDirection) -> USBEndp if self.configuration: endpoint = self.configuration.get_endpoint(endpoint_number, direction) if endpoint is None: - logging.error(f"Requested non-existent endpoint EP{endpoint_number}/{direction.name} for configured device!") + log.error(f"Requested non-existent endpoint EP{endpoint_number}/{direction.name} for configured device!") return endpoint else: - logging.error(f"Requested endpoint EP{endpoint_number}/{direction.name} for unconfigured device!") + log.error(f"Requested endpoint EP{endpoint_number}/{direction.name} for unconfigured device!") return None @@ -374,7 +374,7 @@ def handle_data_available(self, ep_num, data): def handle_bus_reset(self): """ Event handler for a bus reset. """ - logging.info("Host issued a bus reset; resetting our connection.") + log.info("Host issued a bus reset; resetting our connection.") # Clear our state back to address zero and no configuration. self.configuration = None @@ -392,7 +392,7 @@ def handle_request(self, request: USBControlRequest): Parameters: request -- the USBControlRequest object representing the relevant request """ - logging.debug(f"{self.name} received request: {request}") + log.debug(f"{self.name} received request: {request}") # Call our base USBRequestHandler method. handled = super().handle_request(request) @@ -400,7 +400,7 @@ def handle_request(self, request: USBControlRequest): # As the top-most handle_request function, we have an extra responsibility: # we'll need to stall the endpoint if no handler was found. if not handled: - logging.warning(f"Stalling unhandled {request}.") + log.warning(f"Stalling unhandled {request}.") self._add_request_suggestion(request) self.stall(direction=USBDirection.IN) @@ -426,7 +426,7 @@ def handle_data_received(self, endpoint: USBEndpoint, data: bytes): # If we're un-configured, we don't expect to receive # anything other than control data; defer to our "unexpected data". else: - logging.error(f"Received non-control data when unconfigured!" + log.error(f"Received non-control data when unconfigured!" "This is invalid host behavior.") self.handle_unexpected_data_received(endpoint.number, data) @@ -442,7 +442,7 @@ def handle_unexpected_data_received(self, endpoint_number: int, data: bytes): endpoint_number -- The endpoint number on which the data was received. data -- The raw bytes received on the relevant endpoint. """ - logging.error(f"Received {len(data)} bytes of data on invalid EP{endpoint_number}/OUT.") + log.error(f"Received {len(data)} bytes of data on invalid EP{endpoint_number}/OUT.") def handle_data_requested(self, endpoint: USBEndpoint): @@ -463,7 +463,7 @@ def handle_data_requested(self, endpoint: USBEndpoint): # If we're un-configured, we don't expect to receive # anything other than control data; defer to our "unexpected data". else: - logging.error(f"Received non-control data when unconfigured!" + log.error(f"Received non-control data when unconfigured!" "This is invalid host behavior.") self.handle_unexpected_data_requested(endpoint.number) @@ -478,7 +478,7 @@ def handle_unexpected_data_requested(self, endpoint_number: int): Parameters: endpoint_number -- The endpoint number the data was received. """ - logging.error(f"Host requested data on invalid EP{endpoint_number}/IN.") + log.error(f"Host requested data on invalid EP{endpoint_number}/IN.") def handle_buffer_empty(self, endpoint: USBEndpoint): @@ -722,7 +722,7 @@ def handle_generic_get_descriptor_request( request: USBControlRequest): """ Handle GET_DESCRIPTOR requests; per USB2 [9.4.3] """ - logging.debug(f"received GET_DESCRIPTOR request {request}") + log.debug(f"received GET_DESCRIPTOR request {request}") # Extract the core parameters from the request. descriptor_type = request.value_high @@ -741,9 +741,9 @@ def handle_generic_get_descriptor_request( response_length = min(request.length, len(response)) request.reply(response[:response_length]) - logging.log(LOGLEVEL_TRACE, f"sending {response_length} bytes in response") + log.trace(f"sending {response_length} bytes in response") else: - logging.log(LOGLEVEL_TRACE, f"stalling descriptor request") + log.trace(f"stalling descriptor request") request.stall() @@ -780,7 +780,7 @@ class USBDevice(USBBaseDevice): def handle_get_status_request(self, request): """ Handles GET_STATUS requests; per USB2 [9.4.5].""" - logging.debug("received GET_STATUS request") + log.debug("received GET_STATUS request") # self-powered and remote-wakeup (USB 2.0 Spec section 9.4.5) request.reply(b'\x03\x00') @@ -790,7 +790,7 @@ def handle_get_status_request(self, request): @to_device def handle_clear_feature_request(self, request): """ Handle CLEAR_FEATURE requests; per USB2 [9.4.1] """ - logging.debug(f"Received CLEAR_FEATURE request with type {request.number} and value {request.value}.") + log.debug(f"Received CLEAR_FEATURE request with type {request.number} and value {request.value}.") request.acknowledge() @@ -798,7 +798,7 @@ def handle_clear_feature_request(self, request): @to_device def handle_set_feature_request(self, request): """ Handle SET_FEATURE requests; per USB2 [9.4.9] """ - logging.debug("received SET_FEATURE request") + log.debug("received SET_FEATURE request") request.stall() @@ -824,7 +824,7 @@ def handle_get_descriptor_request(self, request): @to_device def handle_set_descriptor_request(self, request): """ Handle SET_DESCRIPTOr requests; per USB2 [9.4.8] """ - logging.debug("received SET_DESCRIPTOR request") + log.debug("received SET_DESCRIPTOR request") request.stall() @@ -832,7 +832,7 @@ def handle_set_descriptor_request(self, request): @to_device def handle_get_configuration_request(self, request): """ Handle GET_CONFIGURATION requests; per USB2 [9.4.2] """ - logging.debug(f"received GET_CONFIGURATION request for configuration {request.value}") + log.debug(f"received GET_CONFIGURATION request for configuration {request.value}") # If we haven't yet been configured, send back a zero configuration value. if self.configuration is None: @@ -848,7 +848,7 @@ def handle_get_configuration_request(self, request): @to_device def handle_set_configuration_request(self, request): """ Handle SET_CONFIGURATION requests; per USB2 [9.4.7] """ - logging.debug("received SET_CONFIGURATION request") + log.debug("received SET_CONFIGURATION request") # If the host is requesting configuration zero, they're asking # us to drop our configuration. @@ -873,7 +873,7 @@ def handle_set_configuration_request(self, request): @to_device def handle_get_interface_request(self, request): """ Handle GET_INTERFACE requests; per USB2 [9.4.4] """ - logging.debug("received GET_INTERFACE request") + log.debug("received GET_INTERFACE request") # TODO: support alternate interfaces. # Since we don't support alternate interfaces [yet], we'll always @@ -888,7 +888,7 @@ def handle_get_interface_request(self, request): @to_device def handle_set_interface_request(self, request): """ Handle SET_INTERFACE requests; per USB2 [9.4.10] """ - logging.debug(f"f{self.name} received SET_INTERFACE request") + log.debug(f"f{self.name} received SET_INTERFACE request") # We don't support alternate interfaces; so ACK setting # interface zero, and stall all others. @@ -903,5 +903,5 @@ def handle_set_interface_request(self, request): @to_device def handle_synch_frame_request(self, request): """ Handle SYNC_FRAME requests; per USB2 [9.4.10] """ - logging.debug(f"f{self.name} received SYNCH_FRAME request") + log.debug(f"f{self.name} received SYNCH_FRAME request") request.acknowledge() diff --git a/facedancer/devices/__init__.py b/facedancer/devices/__init__.py index 97670f8..d22c558 100644 --- a/facedancer/devices/__init__.py +++ b/facedancer/devices/__init__.py @@ -6,14 +6,9 @@ import pprint import asyncio import inspect -import logging import argparse - -# Log formatting strings. -LOG_FORMAT_COLOR = "\u001b[37;1m%(levelname)-8s| \u001b[0m\u001b[1m%(module)-15s|\u001b[0m %(message)s" -LOG_FORMAT_PLAIN = "%(levelname)-8s:n%(module)-15s>%(message)s" - +from ..logging import configure_default_logging def default_main(device_or_type, *coroutines): """ Simple, default main for FaceDancer emulation. @@ -29,14 +24,9 @@ def default_main(device_or_type, *coroutines): parser.add_argument('-v', '--verbose', help="Controls verbosity. 0=silent, 3=default, 5=spammy", default=3) args = parser.parse_args() - if sys.stdout.isatty(): - log_format = LOG_FORMAT_COLOR - else: - log_format = LOG_FORMAT_PLAIN - # Set up our logging output. python_loglevel = 50 - (int(args.verbose) * 10) - logging.basicConfig(level=python_loglevel, format=log_format) + configure_default_logging(level=python_loglevel) if inspect.isclass(device_or_type): device = device_or_type() diff --git a/facedancer/devices/ftdi.py b/facedancer/devices/ftdi.py index 9ae4d72..3764130 100644 --- a/facedancer/devices/ftdi.py +++ b/facedancer/devices/ftdi.py @@ -1,11 +1,10 @@ # pylint: disable=unused-wildcard-import, wildcard-import # -# This file is part of Facedancer. +# This file is part of FaceDancer. # """ Emulation of an FTDI USB-to-serial converter. """ import asyncio -import logging from enum import IntFlag from typing import Union @@ -14,12 +13,13 @@ from .. import * from ..classes import USBDeviceClass +from ..logging import log + OUT_ENDPOINT = 1 IN_ENDPOINT = 3 - class FTDIFlowControl(IntFlag): """ Constants describing how FTDI flow control works. """ NO_FLOW_CONTROL = 0 @@ -94,7 +94,7 @@ def reset_ftdi(self): @vendor_request_handler(number=0) def handle_reset_request(self, request): - logging.debug("Received FTDI reset; assuming initial settings.") + log.debug("Received FTDI reset; assuming initial settings.") self.reset_ftdi() request.acknowledge() @@ -102,7 +102,7 @@ def handle_reset_request(self, request): @vendor_request_handler(number=1) def handle_modem_ctrl_request(self, req): - logging.debug("received modem_ctrl request") + log.debug("received modem_ctrl request") dtr = bool(req.value & 0x0001) rts = bool(req.value & 0x0002) @@ -110,9 +110,9 @@ def handle_modem_ctrl_request(self, req): self.use_rts = bool(req.value & 0x0200) if dtr: - logging.info("DTR set -- host appears to have connected via virtual serial.") + log.info("DTR set -- host appears to have connected via virtual serial.") else: - logging.info("DTR cleared -- host appears to have disconnected from virtual serial.") + log.info("DTR cleared -- host appears to have disconnected from virtual serial.") if self.use_dtr: self.data_terminal_ready = dtr @@ -131,9 +131,9 @@ def handle_set_flow_ctrl_request(self, request): self.flow_control = FTDIFlowControl(request.value) if self.flow_control: - logging.info(f"Host has set up {self.flow_control.name} flow control.") + log.info(f"Host has set up {self.flow_control.name} flow control.") else: - logging.info(f"Host has disabled flow control.") + log.info(f"Host has disabled flow control.") request.acknowledge() except KeyError: @@ -146,7 +146,7 @@ def handle_set_baud_rate_request(self, request): """ Control request to set our baud rate. """ if request.value > 9: - logging.warning("Host specified an unknown baud rate value. Stalling.") + log.warning("Host specified an unknown baud rate value. Stalling.") request.stall() return @@ -161,13 +161,13 @@ def handle_set_baud_rate_request(self, request): elif request.value == 9: self.baud_rate = 115200 - logging.info(f"Host set baud rate to {self.baud_rate}.") + log.info(f"Host set baud rate to {self.baud_rate}.") request.acknowledge() @vendor_request_handler(number=4) def handle_set_data_request(self, request): - logging.debug("received set_data request") + log.debug("received set_data request") request.acknowledge() @@ -187,25 +187,25 @@ def handle_get_modem_status_request(self, request): @vendor_request_handler(number=6) def handle_set_event_char_request(self, request): - logging.debug("received set_event_char request") + log.debug("received set_event_char request") request.acknowledge() @vendor_request_handler(number=7) def handle_set_error_char_request(self, request): - logging.debug("received set_error_char request") + log.debug("received set_error_char request") request.acknowledge() @vendor_request_handler(number=9) def handle_set_latency_timer_request(self, request): - logging.debug("received set_latency_timer request") + log.debug("received set_latency_timer request") request.acknowledge() @vendor_request_handler(number=10) def handle_get_latency_timer_request(self, request): - logging.debug("received get_latency_timer request") + log.debug("received get_latency_timer request") # Per Travis Goodspeed, this is a "bullshit value". request.reply(b'\x01') @@ -237,7 +237,7 @@ def handle_serial_data_received(self, data): Subclasses should override this to capture data from the host. """ - logging.debug(f"Received serial data: {data}") + log.debug(f"Received serial data: {data}") def transmit(self, data: Union[str, bytes], *, blocking: bool = False, adjust_endings: bool = True): diff --git a/facedancer/devices/umass/umass.py b/facedancer/devices/umass/umass.py index 0539dc2..617d2b9 100644 --- a/facedancer/devices/umass/umass.py +++ b/facedancer/devices/umass/umass.py @@ -10,14 +10,17 @@ import struct import sys import time -import logging from enum import IntFlag from typing import Union +from .. import default_main + from ... import * from ...classes import USBDeviceClass -from .. import default_main + +from ...logging import log + ENDPOINT_OUT = 1 ENDPOINT_IN = 3 @@ -90,7 +93,7 @@ def handle_data_received(self, endpoint, data): # dispatch received data to our SCSI command handler self.scsi_command_handler.handle_data_received(data) else: - logging.warn(f"Received data on unexpected endpoint: {endpoint}") + log.warn(f"Received data on unexpected endpoint: {endpoint}") # diff --git a/facedancer/endpoint.py b/facedancer/endpoint.py index ec52bce..2c2c2e6 100644 --- a/facedancer/endpoint.py +++ b/facedancer/endpoint.py @@ -4,7 +4,6 @@ """ Functionality for describing USB endpoints. """ import struct -import logging from typing import Iterable from dataclasses import dataclass @@ -16,6 +15,8 @@ from .types import USBDirection, USBTransferType, USBSynchronizationType from .types import USBUsageType, USBStandardRequests +from .logging import log + @dataclass class USBEndpoint(USBDescribable, AutoInstantiable, USBRequestHandler): @@ -116,7 +117,7 @@ def handle_data_received(self, data: bytes): Parameters: data -- The raw bytes received. """ - logging.info(f"EP{self.number} received {len(data)} bytes of data; " + log.info(f"EP{self.number} received {len(data)} bytes of data; " "but has no handler.") @@ -131,7 +132,7 @@ def handle_buffer_empty(self): @standard_request_handler(number=USBStandardRequests.CLEAR_FEATURE) @to_this_endpoint def handle_clear_feature_request(self, request): - logging.debug(f"received CLEAR_FEATURE request for endpoint {self.number} " + log.debug(f"received CLEAR_FEATURE request for endpoint {self.number} " f"with value {req.value}") request.acknowledge() diff --git a/facedancer/filters/logging.py b/facedancer/filters/logging.py index 0140c93..b266cb3 100644 --- a/facedancer/filters/logging.py +++ b/facedancer/filters/logging.py @@ -3,9 +3,10 @@ # import datetime -import logging -from ..proxy import USBProxyFilter +from ..proxy import USBProxyFilter + +from ..logging import log class USBProxyPrettyPrintFilter(USBProxyFilter): @@ -28,14 +29,14 @@ def filter_control_in(self, req, data, stalled): """ if self.verbose > 3 and req is None: - logging.info("{} {}< --filtered out-- ".format(self.timestamp(), self.decoration)) + log.info("{} {}< --filtered out-- ".format(self.timestamp(), self.decoration)) return req, data, stalled if self.verbose > 3: - logging.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req))) + log.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req))) if self.verbose > 3 and stalled: - logging.info("{} {}< --STALLED-- ".format(self.timestamp(), self.decoration)) + log.info("{} {}< --STALLED-- ".format(self.timestamp(), self.decoration)) if self.verbose > 4 and data: is_string = (req.request == 6) and (req.value >> 8 == 3) @@ -52,11 +53,11 @@ def filter_control_out(self, req, data): # TODO: just call control_in, it's the same: if self.verbose > 3 and req is None: - logging.info("{} {}> --filtered out-- ".format(self.timestamp(), self.decoration)) + log.info("{} {}> --filtered out-- ".format(self.timestamp(), self.decoration)) return req, data if self.verbose > 3: - logging.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req))) + log.info("{} {}{}".format(self.timestamp(), self.decoration, repr(req))) if self.verbose > 4 and data: self._pretty_print_data(data, '>', self.decoration) @@ -70,9 +71,9 @@ def handle_out_request_stall(self, req, data, stalled): """ if self.verbose > 3 and req is None: if stalled: - logging.info("{} {}> --STALLED-- ".format(self.timestamp(), self.decoration)) + log.info("{} {}> --STALLED-- ".format(self.timestamp(), self.decoration)) else: - logging.info("{} {}> --STALLED, but unstalled by filter-- ".format(self.timestamp(), self.decoration)) + log.info("{} {}> --STALLED, but unstalled by filter-- ".format(self.timestamp(), self.decoration)) return req, data, stalled @@ -112,4 +113,4 @@ def _magic_decode(self, data): def _pretty_print_data(self, data, direction_marker, decoration='', is_string=False, ep_marker=''): data = self._magic_decode(data) if is_string else bytes(data) - logging.info("{} {}{}{}: {}".format(self.timestamp(), ep_marker, decoration, direction_marker, data)) + log.info("{} {}{}{}: {}".format(self.timestamp(), ep_marker, decoration, direction_marker, data)) diff --git a/facedancer/filters/standard.py b/facedancer/filters/standard.py index fe7d758..d8fdcd9 100644 --- a/facedancer/filters/standard.py +++ b/facedancer/filters/standard.py @@ -2,13 +2,13 @@ # Standard filters for USBProxy that should (almost) always be used # -import logging - from .. import * from ..descriptor import USBDescribable from ..proxy import USBProxyFilter from ..errors import * +from ..logging import log + class USBProxySetupFilters(USBProxyFilter): @@ -47,7 +47,7 @@ def filter_control_in(self, req, data, stalled): if descriptor_type == self.DESCRIPTOR_CONFIGURATION and req.length >= 32: configuration = USBDescribable.from_binary_descriptor(data) self.configurations[configuration.number] = configuration - logging.debug("-- Storing configuration {} --".format(configuration)) + log.debug("-- Storing configuration {} --".format(configuration)) if descriptor_type == self.DESCRIPTOR_DEVICE and req.length >= 7: @@ -56,7 +56,7 @@ def filter_control_in(self, req, data, stalled): device = USBDescribable.from_binary_descriptor(data) device.max_packet_size_ep0 = 64 data = bytearray(device.get_descriptor())[:len(data)] - logging.debug("-- Patched device descriptor. --") + log.debug("-- Patched device descriptor. --") return req, data, stalled @@ -81,13 +81,13 @@ def filter_control_out(self, req, data): if configuration_index in self.configurations: configuration = self.configurations[configuration_index] - logging.debug("-- Applying configuration {} --".format(configuration)) + log.debug("-- Applying configuration {} --".format(configuration)) self.device.configured(configuration) # Otherwise, the host has applied a configuration without ever reading # its descriptor. This is mighty strange behavior! else: - logging.warn("-- WARNING: Applying configuration {}, but we've never read that configuration's descriptor! --".format(configuration_index)) + log.warn("-- WARNING: Applying configuration {}, but we've never read that configuration's descriptor! --".format(configuration_index)) return req, data diff --git a/facedancer/interface.py b/facedancer/interface.py index 09b067b..b38ece8 100644 --- a/facedancer/interface.py +++ b/facedancer/interface.py @@ -4,7 +4,6 @@ """ Functionality for defining USB interfaces. """ import struct -import logging from typing import Dict, Iterable from dataclasses import dataclass, field @@ -18,6 +17,8 @@ from .request import standard_request_handler, to_this_interface from .endpoint import USBEndpoint +from .logging import log + @dataclass class USBInterface(USBDescribable, AutoInstantiable, USBRequestHandler): @@ -199,7 +200,7 @@ def get_endpoints(self): @to_this_interface def handle_get_descriptor_request(self, request): """ Handle GET_DESCRIPTOR requests; per USB2 [9.4.3] """ - logging.debug("Handling GET_DESCRIPTOR on endpoint.") + log.debug("Handling GET_DESCRIPTOR on endpoint.") # This is the same as the USBDevice get descriptor request => avoid duplication. self.get_device().handle_generic_get_descriptor_request(self, request) diff --git a/facedancer/logging.py b/facedancer/logging.py new file mode 100644 index 0000000..9f6192f --- /dev/null +++ b/facedancer/logging.py @@ -0,0 +1,36 @@ +import functools +import logging +import sys + + +LOGLEVEL_TRACE = logging.DEBUG + 2 + +LOG_FORMAT_COLOR = "\u001b[37;1m%(levelname)-8s| \u001b[0m\u001b[1m%(module)-15s|\u001b[0m %(message)s" +LOG_FORMAT_PLAIN = "%(levelname)-8s| %(module)-15s| %(message)s" + + +def configure_default_logging(level=logging.INFO, logger=logging): + if sys.stdout.isatty(): + log_format = LOG_FORMAT_COLOR + else: + log_format = LOG_FORMAT_PLAIN + + logger.basicConfig(level=level, format=log_format) + logging.getLogger("facedancer").level = level + + +def _initialize_logging(): + # add a TRACE level to logging + logging.TRACE = LOGLEVEL_TRACE + logging.addLevelName(logging.TRACE, "TRACE") + logging.Logger.trace = functools.partialmethod(logging.Logger.log, logging.TRACE) + logging.trace = functools.partial(logging.log, logging.TRACE) + + # Configure facedancer logger + logger = logging.getLogger("facedancer") + logger.level = logging.WARN + + return logger + + +log = _initialize_logging() diff --git a/facedancer/proxy.py b/facedancer/proxy.py index 8e717c6..455b682 100644 --- a/facedancer/proxy.py +++ b/facedancer/proxy.py @@ -2,10 +2,11 @@ import atexit import usb1 -import logging -from .errors import DeviceNotFoundError -from .usb import USB +from .errors import DeviceNotFoundError +from .usb import USB + +from .logging import log class LibUSB1Proxy: @@ -282,7 +283,7 @@ def __init__(self, index=0, quirks=[], scheduler=None, **kwargs): # detach it from any kernel-side driver that may prevent us # from communicating with it... device_handle = self.proxied_device.open(device, detach=True) - logging.info(f"Found {self.proxied_device.device_speed().name} speed device to proxy: {device}") + log.info(f"Found {self.proxied_device.device_speed().name} speed device to proxy: {device}") def add_filter(self, filter_object, head=False): @@ -317,7 +318,7 @@ def connect(self): # FIXME self.backend does not yet exist here so this will always fail self.backend.set_device_speed(device_speed) except Exception as e: - logging.warning(f"-- facedancer backend does not support setting device speed: {device_speed.name} --") + log.warning(f"-- facedancer backend does not support setting device speed: {device_speed.name} --") super().connect() @@ -554,7 +555,7 @@ def _proxy_in_transfer(self, endpoint): if __name__ == "__main__": - from . import FacedancerUSBApp, LOGLEVEL_TRACE + from . import FacedancerUSBApp from .filters.standard import USBProxySetupFilters from .filters.logging import USBProxyPrettyPrintFilter @@ -576,7 +577,8 @@ def _proxy_in_transfer(self, endpoint): device.add_filter(USBProxyPrettyPrintFilter(verbose=5)) async def configure_logging(): - logging.getLogger().setLevel(logging.INFO) + import logging + logging.getLogger("facedancer").setLevel(logging.INFO) from facedancer import main main(device, configure_logging()) diff --git a/facedancer/request.py b/facedancer/request.py index 21a812d..e091893 100644 --- a/facedancer/request.py +++ b/facedancer/request.py @@ -4,7 +4,6 @@ """ Functionality for declaring and working with USB control requests. """ import inspect -import logging import warnings import functools @@ -12,11 +11,9 @@ from dataclasses import dataclass from abc import ABCMeta, abstractmethod - from .descriptor import USBDescribable from .types import USBRequestRecipient, USBRequestType, USBDirection, USBStandardRequests -logger = logging.getLogger(__name__) def _wrap_with_field_matcher(func, field_name, field_value, match_index=False): """ Internal function; generates a request-refinement decorator.