Skip to content

Commit

Permalink
qs: documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
williballenthin committed May 30, 2023
1 parent c0268f2 commit 4d92299
Showing 1 changed file with 45 additions and 20 deletions.
65 changes: 45 additions & 20 deletions floss/qs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import argparse
import itertools
import contextlib
from typing import Set, Dict, List, Tuple, Union, Literal, Callable, Iterable, Optional, Sequence
from typing import Set, Dict, List, Tuple, Literal, Callable, Iterable, Optional, Sequence
from dataclasses import field, dataclass

import pefile
Expand All @@ -30,8 +30,6 @@
from floss.qs.db.expert import ExpertStringDatabase
from floss.qs.db.winapi import WindowsApiStringDatabase

MIN_STR_LEN = 6

logger = logging.getLogger("quantumstrand")


Expand All @@ -45,6 +43,7 @@ def timing(msg: str):

@dataclass
class Range:
"a range of contiguous integer values, such as offsets within a byte sequence"
offset: int
length: int

Expand All @@ -53,22 +52,13 @@ def end(self) -> int:
return self.offset + self.length

def slice(self, offset, size) -> "Range":
"create a new range thats a sub-range of this one"
"create a new range thats a sub-range of this one, using relative offsets"
assert offset < self.length
assert offset + size <= self.length
return Range(self.offset + offset, size)

def __contains__(self, other: Union[int, "Range"]) -> bool:
if isinstance(other, int):
# this range strictly contains the point
return self.offset <= other < self.end
elif isinstance(other, Range):
# this range strictly contains the other one
return (other.offset in self) and (other.end in self)
else:
raise TypeError(f"unsupported type: {type(other)}")

def __iter__(self):
"iterate over the values in this range"
yield from range(self.offset, self.end)

def __repr__(self):
Expand All @@ -91,10 +81,11 @@ class Slice:

@property
def data(self) -> bytes:
"get the bytes in this slice, copying the data out"
return self.buf[self.range.offset : self.range.end]

def slice(self, offset, size) -> "Slice":
"create a new slice thats a sub-slice of this one"
"create a new slice thats a sub-slice of this one, using relative offsets"
return Slice(self.buf, self.range.slice(offset, size))

@classmethod
Expand Down Expand Up @@ -130,6 +121,7 @@ def offset(self) -> int:
return self.string.slice.range.offset


MIN_STR_LEN = 6
ASCII_BYTE = r" !\"#\$%&\'\(\)\*\+,-\./0123456789:;<=>\?@ABCDEFGHIJKLMNOPQRSTUVWXYZ\[\]\^_`abcdefghijklmnopqrstuvwxyz\{\|\}\\\~\t".encode(
"ascii"
)
Expand All @@ -138,7 +130,7 @@ def offset(self) -> int:


def extract_ascii_strings(slice: Slice, n: int = MIN_STR_LEN) -> Iterable[ExtractedString]:
"""Extract ASCII strings from the given binary data."""
"enumerate ASCII strings in the given binary data"

if not slice.range.length:
return
Expand All @@ -158,7 +150,7 @@ def extract_ascii_strings(slice: Slice, n: int = MIN_STR_LEN) -> Iterable[Extrac


def extract_unicode_strings(slice: Slice, n: int = MIN_STR_LEN) -> Iterable[ExtractedString]:
"""Extract naive UTF-16 strings from the given binary data."""
"enumerate naive UTF-16 strings in the given binary data"

if not slice.range.length:
return
Expand All @@ -183,6 +175,7 @@ def extract_unicode_strings(slice: Slice, n: int = MIN_STR_LEN) -> Iterable[Extr


def extract_strings(slice: Slice, n: int = MIN_STR_LEN) -> Iterable[ExtractedString]:
"enumerate ASCII and naive UTF-16 strings in the given binary data"
return list(
sorted(
itertools.chain(extract_ascii_strings(slice, n), extract_unicode_strings(slice, n)),
Expand Down Expand Up @@ -464,9 +457,14 @@ def load_databases() -> Sequence[Tagger]:

data_path = pathlib.Path(floss.qs.db.oss.__file__).parent / "data"

# below i use a `if True` blocks to delineate the different databases.
# these could be functions, at the expense of more visual noise.
# note that each one creates a closure over the database object.

if True:
winapi_database = floss.qs.db.winapi.WindowsApiStringDatabase.from_dir(data_path / "winapi")

# note closure over winapi_database
def winapi_database_tagger(s: ExtractedString) -> Sequence[Tag]:
return query_winapi_name_database(winapi_database, s.string)

Expand All @@ -475,6 +473,7 @@ def winapi_database_tagger(s: ExtractedString) -> Sequence[Tag]:
if True:
capa_expert_database = ExpertStringDatabase.from_file(data_path / "expert" / "capa.jsonl")

# note closure over capa_expert_database
def capa_expert_database_tagger(s: ExtractedString) -> Sequence[Tag]:
return query_expert_string_database(capa_expert_database, s.string)

Expand All @@ -487,6 +486,7 @@ def capa_expert_database_tagger(s: ExtractedString) -> Sequence[Tag]:

library_databases.append(OpenSourceStringDatabase.from_file(data_path / "crt" / "msvc_v143.jsonl.gz"))

# note closure over library_databases
def library_databases_tagger(s: ExtractedString) -> Sequence[Tag]:
return query_library_string_databases(library_databases, s.string)

Expand All @@ -501,6 +501,7 @@ def library_databases_tagger(s: ExtractedString) -> Sequence[Tag]:
StringGlobalPrevalenceDatabase.from_file(data_path / "gp" / "cwindb-dotnet.jsonl.gz")
)

# note closure over global_prevalence_database
def global_prevalence_database_tagger(s: ExtractedString) -> Sequence[Tag]:
return query_global_prevalence_database(global_prevalence_database, s.string)

Expand All @@ -509,6 +510,7 @@ def global_prevalence_database_tagger(s: ExtractedString) -> Sequence[Tag]:
if True:
global_prevalence_hash_database_xaa = StringHashDatabase.from_file(data_path / "gp" / "xaa-hashes.bin")

# note closure over global_prevalence_hash_database_xaa
def global_prevalence_hash_database_xaa_tagger(s: ExtractedString) -> Sequence[Tag]:
return query_global_prevalence_hash_database(global_prevalence_hash_database_xaa, s.string)

Expand All @@ -517,6 +519,7 @@ def global_prevalence_hash_database_xaa_tagger(s: ExtractedString) -> Sequence[T
if True:
global_prevalence_hash_database_yaa = StringHashDatabase.from_file(data_path / "gp" / "yaa-hashes.bin")

# note closure over global_prevalence_hash_database_yaa
def global_prevalence_hash_database_yaa_tagger(s: ExtractedString) -> Sequence[Tag]:
return query_global_prevalence_hash_database(global_prevalence_hash_database_yaa, s.string)

Expand All @@ -527,6 +530,27 @@ def global_prevalence_hash_database_yaa_tagger(s: ExtractedString) -> Sequence[T

@dataclass
class Layout(abc.ABC):
"""
recursively describe a region of a data, as a tree.
the compute_layout routines construct this tree.
each node in the tree (Layout), describes a range of the data.
it may have children, which describes sub-ranges of the data.
children don't overlap nor extend before/beyond the parent range.
children are ordered by their offset in the data.
children don't have to be contiguous - there can be gaps, or none at all.
there are routines for traversing to the prior/next sibling, if any,
and accessor properties for the parent and children.
each node has a nice human readable name.
each node has a list of strings that are contained by the node;
these strings don't overlap with any children strings, they're only found in the gaps.
note that `Layout` is the abstract base class for nodes in the tree.
subclasses are used to represent different types of regions,
such as a PE file, a section, a segment, or a resource.
subclasses can provide more specific behavior when it comes to tagging strings.
"""
slice: Slice

# human readable name
Expand Down Expand Up @@ -588,10 +612,12 @@ def add_child(self, child: "Layout"):

@property
def offset(self) -> int:
"convenience"
return self.slice.range.offset

@property
def end(self) -> int:
"convenience"
return self.slice.range.end

def tag_strings(self, taggers: Sequence[Tagger]):
Expand Down Expand Up @@ -650,8 +676,7 @@ class SectionLayout(Layout):

@dataclass
class SegmentLayout(Layout):
"""region not covered by any section"""

"""region not covered by any section, such as PE header or overlay"""
pass


Expand Down Expand Up @@ -1014,7 +1039,7 @@ def render_strings(
# rsrc: BINARY/102/0 (pe)
return render_strings(console, layout.children[0], tag_rules, depth, name_hint=layout.name)

BORDER_STYLE = Style(color="grey50")
BORDER_STYLE = MUTED_STYLE

name = layout.name
if name_hint:
Expand Down

0 comments on commit 4d92299

Please sign in to comment.