Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

House cleaning #72

Merged
merged 8 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions angrop/arch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ def __init__(self, project, kernel_mode=False):
self.max_sym_mem_access = 4
self.alignment = project.arch.instruction_alignment
self.max_block_size = self.alignment * 8
self.reg_list = self._get_reg_list()
self.reg_set = self._get_reg_set()

a = project.arch
self.base_pointer = a.register_names[a.bp_offset]

def _get_reg_list(self):
def _get_reg_set(self):
"""
get the list of names of general-purpose registers
get the set of names of general-purpose registers
"""
arch = self.project.arch
_sp_reg = arch.register_names[arch.sp_offset]
_ip_reg = arch.register_names[arch.ip_offset]

# get list of general-purpose registers
self._reg_list = arch.default_symbolic_registers
default_regs = arch.default_symbolic_registers
# prune the register list of the instruction pointer and the stack pointer
return [r for r in self._reg_list if r not in (_sp_reg, _ip_reg)]
return {r for r in default_regs if r not in (_sp_reg, _ip_reg)}

def block_make_sense(self, block):
return True
Expand Down
338 changes: 43 additions & 295 deletions angrop/chain_builder/__init__.py

Large diffs are not rendered by default.

92 changes: 70 additions & 22 deletions angrop/chain_builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,35 @@ class Builder:
"""
a generic class to bootstrap more complicated chain building functionality
"""
def __init__(self, project, reg_list=None, badbytes=None, filler=None):
self.project = project
self._reg_set = set(reg_list)
self._badbytes = badbytes
self._roparg_filler = filler
def __init__(self, chain_builder):
self.chain_builder = chain_builder
self.project = chain_builder.project
self.arch = chain_builder.arch

@property
def badbytes(self):
return self.chain_builder.badbytes

@property
def roparg_filler(self):
return self.chain_builder.roparg_filler

def make_sim_state(self, pc):
"""
make a symbolic state with all general purpose register + base pointer symbolized
and emulate a `pop pc` situation
"""
arch_bytes = self.project.arch.bytes
arch_endness = self.project.arch.memory_endness

state = rop_utils.make_symbolic_state(self.project, self.arch.reg_set)
rop_utils.make_reg_symbolic(state, self.arch.base_pointer)

state.regs.ip = pc
state.add_constraints(state.memory.load(state.regs.sp, arch_bytes, endness=arch_endness) == pc)
state.regs.sp += arch_bytes
state.solver._solver.timeout = 5000
return state

@staticmethod
def _sort_chains(chains):
Expand Down Expand Up @@ -47,10 +71,35 @@ def _word_contain_badbyte(self, ptr):
else:
ptr = ptr.concreted
raw_bytes = struct.pack(self.project.arch.struct_fmt(), ptr)
if any(x in raw_bytes for x in self._badbytes):
if any(x in raw_bytes for x in self.badbytes):
return True
return False

def _get_ptr_to_writable(self, size):
"""
get a pointer to writable region that can fit `size` bytes
it shouldn't contain bad byte
"""
# get all writable segments
segs = [ s for s in self.project.loader.main_object.segments if s.is_writable ]
# enumerate through all address to find a good address
for seg in segs:
for addr in range(seg.min_addr, seg.max_addr):
if all(not self._word_contain_badbyte(x) for x in range(addr, addr+size, self.project.arch.bytes)):
return addr
return None

def _get_ptr_to_null(self):
# get all non-writable segments
segs = [ s for s in self.project.loader.main_object.segments if not s.is_writable ]
# enumerate through all address to find a good address
for seg in segs:
null = b'\x00'*self.project.arch.bytes
for addr in self.project.loader.memory.find(null, search_min=seg.min_addr, search_max=seg.max_addr):
if not self._word_contain_badbyte(addr):
return addr
return None

@rop_utils.timeout(2)
def _build_reg_setting_chain(self, gadgets, modifiable_memory_range, register_dict, stack_change):
"""
Expand All @@ -61,22 +110,15 @@ def _build_reg_setting_chain(self, gadgets, modifiable_memory_range, register_di
FIXME: trim this disgusting function
"""

# create a symbolic state
test_symbolic_state = rop_utils.make_symbolic_state(self.project, self._reg_set)
# emulate a 'pop pc' of the first gadget
test_symbolic_state = self.make_sim_state(gadgets[0].addr)

addrs = [g.addr for g in gadgets]
addrs.append(test_symbolic_state.solver.BVS("next_addr", self.project.arch.bits))

arch_bytes = self.project.arch.bytes
arch_endness = self.project.arch.memory_endness

# emulate a 'pop pc' of the first gadget
state = test_symbolic_state
state.regs.ip = addrs[0]
# the stack pointer must begin pointing to our first gadget
state.add_constraints(state.memory.load(state.regs.sp, arch_bytes, endness=arch_endness) == addrs[0])
# push the stack pointer down, like a pop would do
state.regs.sp += arch_bytes
state.solver._solver.timeout = 5000

# step through each gadget
# for each gadget, constrain memory addresses and add constraints for the successor
Expand Down Expand Up @@ -110,20 +152,20 @@ def _build_reg_setting_chain(self, gadgets, modifiable_memory_range, register_di
test_symbolic_state.add_constraints(var == v.data)

# constrain the "filler" values
if self._roparg_filler is not None:
if self.roparg_filler is not None:
for i in range(stack_change // bytes_per_pop):
sym_word = test_symbolic_state.memory.load(sp + bytes_per_pop*i, bytes_per_pop,
endness=self.project.arch.memory_endness)
# check if we can constrain val to be the roparg_filler
if test_symbolic_state.solver.satisfiable((sym_word == self._roparg_filler,)) and \
rebase_state.solver.satisfiable((sym_word == self._roparg_filler,)):
if test_symbolic_state.solver.satisfiable((sym_word == self.roparg_filler,)) and \
rebase_state.solver.satisfiable((sym_word == self.roparg_filler,)):
# constrain the val to be the roparg_filler
test_symbolic_state.add_constraints(sym_word == self._roparg_filler)
rebase_state.add_constraints(sym_word == self._roparg_filler)
test_symbolic_state.add_constraints(sym_word == self.roparg_filler)
rebase_state.add_constraints(sym_word == self.roparg_filler)

# create the ropchain
chain = RopChain(self.project, self, state=test_symbolic_state.copy(),
badbytes=self._badbytes)
badbytes=self.badbytes)

# iterate through the stack values that need to be in the chain
for i in range(stack_change // bytes_per_pop):
Expand Down Expand Up @@ -152,3 +194,9 @@ def _build_reg_setting_chain(self, gadgets, modifiable_memory_range, register_di
if len(gadgets) > 0:
raise RopException("Didnt find all gadget addresses, something must've broke")
return chain

def _get_fill_val(self):
if self.roparg_filler is not None:
return self.roparg_filler
else:
return claripy.BVS("filler", self.project.arch.bits)
109 changes: 109 additions & 0 deletions angrop/chain_builder/func_caller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import logging

import angr

from .builder import Builder
from ..errors import RopException
from ..rop_gadget import RopGadget

l = logging.getLogger(__name__)

class FuncCaller(Builder):
"""
handle function calls by automatically detecting the target binary's
calling convention
"""

def _func_call(self, func_gadget, cc, args, extra_regs=None, modifiable_memory_range=None, preserve_regs=None,
use_partial_controllers=False, needs_return=True):
assert type(args) in [list, tuple], "function arguments must be a list or tuple!"
arch_bytes = self.project.arch.bytes
registers = {} if extra_regs is None else extra_regs
if preserve_regs is None:
preserve_regs = []

# distinguish register and stack arguments
register_arguments = args
stack_arguments = []
if len(args) > len(cc.ARG_REGS):
register_arguments = args[:len(cc.ARG_REGS)]
stack_arguments = args[len(cc.ARG_REGS):]

# set register arguments
for arg, reg in zip(register_arguments, cc.ARG_REGS):
registers[reg] = arg
for reg in preserve_regs:
registers.pop(reg, None)
chain = self.chain_builder.set_regs(modifiable_memory_range=modifiable_memory_range,
use_partial_controllers=use_partial_controllers,
**registers)

# invoke the function
chain.add_gadget(func_gadget)
for _ in range(func_gadget.stack_change//arch_bytes-1):
chain.add_value(self._get_fill_val())

# we are done here if there is no stack arguments
if not stack_arguments:
return chain

# handle stack arguments:
# 1. we need to pop the arguments after use
# 2. push the stack arguments

# step 1: find a stack cleaner (a gadget that can pop all the stack args)
# with the smallest stack change
stack_cleaner = None
if needs_return:
for g in self.chain_builder.gadgets:
# just pop plz
if g.mem_reads or g.mem_writes or g.mem_changes:
continue
# at least can pop all the args
if g.stack_change < arch_bytes * (len(stack_arguments)+1):
continue

if stack_cleaner is None or g.stack_change < stack_cleaner.stack_change:
stack_cleaner = g

if stack_cleaner is None:
raise RopException(f"Fail to find a stack cleaner that can pop {len(stack_arguments)} words!")

# in case we can't find a stack_cleaner and we don't need to return
if stack_cleaner is None:
stack_cleaner = RopGadget(self._get_fill_val())
stack_cleaner.stack_change = arch_bytes * (len(stack_arguments)+1)

chain.add_gadget(stack_cleaner)
stack_arguments += [self._get_fill_val()]*(stack_cleaner.stack_change//arch_bytes - len(stack_arguments)-1)
for arg in stack_arguments:
chain.add_value(arg)

return chain

def func_call(self, address, args, **kwargs):
"""
:param address: address or name of function to call
:param args: a list/tuple of arguments to the function
:param preserve_regs: list of registers which shouldn't be set
:param needs_return: whether to continue the ROP after invoking the function
:return: a RopChain which inovkes the function with the arguments
"""
# is it a symbol?
if isinstance(address, str):
symbol = address
symobj = self.project.loader.main_object.get_symbol(symbol)
if hasattr(self.project.loader.main_object, 'plt') and address in self.project.loader.main_object.plt:
address = self.project.loader.main_object.plt[symbol]
elif symobj is not None:
address = symobj.rebased_addr
else:
raise RopException("Symbol passed to func_call does not exist in the binary")

cc = angr.default_cc(
self.project.arch.name,
platform=self.project.simos.name if self.project.simos is not None else None,
)(self.project.arch)
func_gadget = RopGadget(address)
func_gadget.stack_change = self.project.arch.bytes
return self._func_call(func_gadget, cc, args, **kwargs)
Loading