Skip to content

LeechCore_API_Python

ufrisk edited this page Aug 20, 2023 · 11 revisions

Python API

Most functionality in LeechCore is exported in a Python API. To make things easier the API is packaged in a pip package which is available as leechcorepyc on Python PIP. This is also the preferred way of installing the Python package even though it's completely possible to compile and install locally.

LeechCore handles physical memory and low-level operations. For analysis of processes and virtual memory check out MemProcFS and the memprocfs Python PIP package.

The Python PIP package is a native binary CPython C Extension and are only available for 64-bit Linux (in source and binary) and for 64-bit Windows (binary).

If using the Python API outside the Python PIP package please note that python may have to be started from the same folder as leechcorepyc.pyd (Windows) or leechcorepyc.so (Linux).

Installing:

To install LeechCore for Python please run:

pip install leechcorepyc

If successful, LeechCore should now be installed and possible to use. If doing DMA attacks using an FPGA on Linux permissions may have to be relaxed on some /dev/ devices (please see the FPGA guide section) or be run as root. On 64-bit Windows 10 it should just work.

Functionality:

leechcorepyc package:

# initialization function
leechcorepyc.LeechCore(str: device, [str: remote], [int: flags], [int: maxaddr]) -> LeechCore object

# constants used in get_option, set_option and command_data
# constants are derived from the c-header 'leechcore.h'
leechcorepyc.LC_*

LeechCore Object:

# object functions:
close()
write(int: addr, bytes: data to write) -> None
read(int: addr, int: length, [bool: zeropad_on_fail]) -> bytes object
read_scatter([int: addr1, ..., int: addrN]) -> array of dict with read pages
get_option(int: option-id leechcorepyc.LC_OPT*) -> int: option value
set_option(int: option-id leechcorepyc.LC_OPT*, int: option value) -> None
command_data(int: command-id leechcorepyc.LC_CMD*, [bytes: input data]) -> bytes: output data
tlp_write([bytes: tlp0, ..., bytes: tlpN]) -> None
tlp_read(function: read_callback, [bool: supress_tlp_cpl_on_read, bool: background_thread]) -> None
tlp_tostring(bytes: tlp) -> str: tlp as human-readable string
bar_disable() -> None
bar_enable(function: bar_callback) -> None
bar_enable_zero() -> None

# object fields:
call_statistics  # dict of call statistics
bar_info         # list of 6 dicts containing PCIe BAR info.
memmap           # dict of memory map items (read/write)
device           # device connection string
remote           # remote connection string
type             # device type name
is_writable      # bool if device memory is writable
is_volatile      # bool if device memory is live or static
is_keepalive     # enable a background read every 2s
is_remote        # bool if device is remote
is_remote_nocompress

LeechCoreBarRequest Object:

# object functions:
reply(bytes: read_reply) -> None
reply_fail() -> None

# object fields:
i_bar            # int: index of PCIe BAR (0-5).
bar              # dict: containing PCIe BAR info.
be_first         # int: first byte enable.
be_last          # int: last byte enable.
data_offset      # int: byte offset inside bar.
data_length      # int: data length.
data_write       # bytes: (or None if read).
is_read          # bool: is read request - reply with reply().
is_write         # bool: is write request.
tag              # int: pcie tlp tag id.

Minimal Example:

# Initialize using FPGA hardware device for DMA memory acquisition and then
# read 16 (0x10) bytes from physical address 0x1000.
import leechcorepyc
lc = leechcorepyc.LeechCore('fpga')
b = lc.read(0x1000, 0x10)
print(b) # -> b'\xe9M\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00?\x00\x18\x10'

Example Initialize:

The examples below show how it's possible to initialize LeechCore in multiple different ways.

Each initialization example should be run individually and not at the same time.

# 1. Initialize using FPGA hardware device for DMA memory acquisition (Intel):
import leechcorepyc
lc = leechcorepyc.LeechCore('fpga')


# 2. Initialize using FPGA hardware device for DMA memory acquisition (AMD and Thunderbolt):
# AMD systems and Thunderbolt are sensitive for reads outside allowed ranges. A memory map
# which is specific to each system hardware is recommended to be provided manually (get it
# from the target with MemProcFS or Sysinternals RAMMap). By default LeechCore will try to
# auto-detect memory unless memmap is manually specified here as -1 dummy value.
text_memmap_example = """
0000         1000 -        9cfff
0001       100000 -       101fff
0002       103000 -     ad137fff
0003    100000000 -    43e5fffff"""
import leechcorepyc
lc = leechcorepyc.LeechCore('fpga', '', 0, -1)
lc.command_data(leechcorepyc.LC_CMD_MEMMAP_SET, bytes(text_memmap_example, 'ascii'))


# 3. Initialize using FPGA hardware device with debug console (non-python) output
# extra verbose (-v -vv) at initialization and then disable debug output.
import leechcorepyc
lc = leechcorepyc.LeechCore('fpga', '', leechcorepyc.LC_CONFIG_PRINTF_ENABLED +
                           leechcorepyc.LC_CONFIG_PRINTF_V + leechcorepyc.LC_CONFIG_PRINTF_VV)
lc.set_option(leechcorepyc.LC_OPT_CORE_PRINTF_ENABLE, 0)


# 4. Initialize using FPGA hardware device with debug console (non-python) output
# super extra verbose (-v -vv -vvv) to see all PCIe TLPs at initialization and
# then disable debug output.
import leechcorepyc
lc = leechcorepyc.LeechCore('fpga', '', leechcorepyc.LC_CONFIG_PRINTF_ENABLED +
                           leechcorepyc.LC_CONFIG_PRINTF_V + leechcorepyc.LC_CONFIG_PRINTF_VV +
                           leechcorepyc.LC_CONFIG_PRINTF_VVV)
lc.set_option(leechcorepyc.LC_OPT_CORE_PRINTF_ENABLE, 0)


# 5. Initialize reading VirtualBox .core dump file as read-only memory device.
# LeechCore also supports .raw, .dmp, .core, .vmware dump files.
# Memory map is retrieved from dump files to the '.memmap' field.
import leechcorepyc
lc = leechcorepyc.LeechCore('file://C:/dumps/win10.core')
print(lc.memmap)
#    ->
#    [{'base': 0, 'size': 3758096384, 'offset': 18860},
#    {'base': 3758096384, 'size': 50331648, 'offset': 3758115244}, ...]


# 6. Access Hyper-V machine live memory with LiveCloudLd from Hyper-V host
# via LeechAgent service running on Hyper-V host.
# (currently only available on leechcorepyc running on Windows]
import leechcorepyc
lc = leechcorepyc.LeechCore('hvmm', 'rpc://[email protected]:hv2.contoso.com')
#    -> TypeError: Unable to initialize.
print(leechcorepyc.GetLastError())
#    ->
#    Please select the ID of the virtual machine
#    [id = 0] [HV2]-ad-ca1
#    [id = 1] [HV2]-ad-dc2
#    [id = 2] [HV2]-ad-vivado
#    [id = 3] [HV2]-ad-dev
lc = leechcorepyc.LeechCore('hvmm://id=3', 'rpc://[email protected]:hv2.contoso.com')

Example Read/Write Physical Memory:

The below examples will assume a LeechCore object have already been initialized as x.

Not all memory is writable. FPGA DMA and Hyper-V LiveCloudKd is writable while files are read-only.

# 1. Read 3 memory pages (4096-byte aligned chunks) at non-contiguous
# locations at the same time in one  optimized call to the device.
# This is the recommended high-performance way to access memory.
mem = lc.read_scatter([0x1000, 0x7000, 0x01000000])
print(mem)
#   ->
#   [{'addr': 4096, 'data': b'\xe8\x90\xff\xf7\x00\x00 ...'},
#    {'addr': 28672, 'data': b'\x00\x00\x00\x00\x00\x00 ...'},
#    {'addr': 16777216, 'data': b'hbin\x00\xf0!\x02\x00 ...'}]


# 2. Read memory at any size and starting position. The call will
# fail if not all memory is read even if partial memory is read.
# read 10 bytes from address 0x10a0
mem = lc.read(0x10a0, 10)
print(mem) # -> b'\x00\xa0b\x00\x00\x00\x00\x00\x00 '


# 3. Read memory at any size and starting position. The call will
# zero-pad any memory that is failed as long as at least one byte
# of memory is successfully read.
# read 10 bytes from address 0xffff8
mem = lc.read(0xffff8, 10, True)
print(mem) # -> b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'


# 4. Write bytes to address 0x10a0.
print(lc.read(0x10a0, 8)) # -> b'\x00\xa0b\x00\x00\x00\x00\x00'
lc.write(0x10a0, b'\xc0\xfe\xfe\x00\x00\x00\x00\x00')
print(lc.read(0x10a0, 8)) # -> b'\xc0\xfe\xfe\x00\x00\x00\x00\x00'

Example Read/Write PCIe TLPs:

The example below displays how raw PCIe TLPs may be both written and read. PCIe TLP read/write requires a supported FPGA device connected to PCI Express of the target system.

import time
import binascii
import leechcorepyc

# Callback function to retrieve received PCIe TLPs. TLPs are forwarded
# to the registered callback function one at a time 'inline'.
def tlp_read_cb(tlp_bytes, tlp_desc):
    print('TLP CALLBACK RECEIVED:')
    print(tlp_desc)

try:
    # Initialize a new LeechCore FPGA backed object with default options.
    lc = leechcorepyc.LeechCore('fpga')
    #
    # By default TLPs from the host system towards the device PCIe
    # configuration space is filtered. Disable this filtering to
    # receive most TLPs (config read TLPs towards Xilinx FPGA core
    # config space is still not received). This is done by zero bit
    # number 204 (25 bytes + 4 bits) in 'pcileech_fifo.sv'.
    lc.command_data(leechcorepyc.LC_CMD_FPGA_CFGREGCFG_MARKWR + 25, b'\x00\x00\x10\x00')
    #
    # Register TLP callback for received TLPs. Callback can either
    # be peformed automatically in the background or whenever the
    # callback register function is called. In this case it will be
    # in the background. Also, TLPs related to memory reads will not
    # be filtered.
    lc.tlp_read(tlp_read_cb, False, True)
    #
    # Create a read TLP and send it. Sent TLPs are not delivered via
    # callback so print the TLP on-screen manually before sending it.
    # Results will be received by the callback.
    # TLP reads 6 DWORDS (24 bytes) from address 0x1000).
    wr_tlp_1 = binascii.unhexlify("00000006040000ff00001000")
    print('TLP TO SEND:')
    print(x.tlp_tostring(wr_tlp_1))
    lc.tlp_write([wr_tlp_1])
    #
    # TLPs will continue to be received by the callback function
    # until Python program exits or LeechCore object is closed.
    # As an example it may be interesting to run on the target
    # system, as root, 'lspci -d 10ee:0666 -xxx' to see config
    # space read requests delivered to the callback function.
    time.sleep(1)
    input("Press Enter to exit")
    lc.close()
except Exception as e:
    print(e)

Example Read/Write PCIe TLPs:

The example shows how to implement a PCIe BAR programmatically. The LeechCore instance in the example is initialized from an existing MemProcFS instance. This is however not required.

import time
import leechcorepyc
import memprocfs

# Callback function to retrieve request from the host system towards a device PCIe BAR.
# Requests may be either read or write. Read requests should be replied to.
def bar_cb(bar, req):
    try:
        print('BAR CALLBACK RECEIVED %s' % (req))
        if req.is_write:
            # write request - handle the write!
            print(req.data_write)
        else:
            reply_data = bytearray(req.data_length)
            for i in range(0, req.data_length):
                 reply_data[i] = (req.data_offset + req.data_length + i) & 0xff
            req.reply(bytes(reply_data))
    except Exception as e:
        print(e)

try:
    # Initialize MemProcFS and retrieve the numeric LeechCore handle.
    # From the numeric handle initialize a proper LeechCore object.
    # (uncomment three lines below to use this initialization method)
    #vmm = memprocfs.Vmm(['-device', 'fpga'])
    #lc_handle_numeric = vmm.get_config(memprocfs.VMMDLL_OPT_CORE_LEECHCORE_HANDLE)
    #lc = leechcorepyc.LeechCore('existing://' + str(lc_handle_numeric))
    #
    # Alternatively, initialize LeechCore directly without MemProcFS
    lc = leechcorepyc.LeechCore('fpga')
    #
    # View BAR information:
    lc.bar_info
    #
    # Enable a BAR ZERO implementation.
    # All BAR reads will be responded with bytes containing zeroes/nulls.
    lc.bar_enable_zero()
    #
    # Enable a BAR CUSTOM implementation via callback function.
    # This will supersede the previous bar_enable_zero() implementation.
    lc.bar_enable(bar_cb)
    #
    time.sleep(1)
    input("Press Enter to exit")
    # Disable any active BAR implementations.
    lc.bar_disable()
    # Close the LeechCore handle.
    # The handle inside the Vmm object will still be usable.
    lc.close()
except Exception as e:
    print(e)