Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions .github/workflows/build-qs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ jobs:
with:
submodules: false
- name: Set up Python 3.11
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v6.1.0
with:
python-version: '3.11'
- name: Install floss[build]
run: pip install -e .[build]
- name: Install floss[qs]
run: pip install -e .[qs]
- name: Install floss[qs,ida]
run: pip install -e .[qs,ida]
- name: Build standalone executable
run: pyinstaller .github/pyinstaller/qs.spec
- uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.4
Expand Down Expand Up @@ -78,6 +78,19 @@ jobs:
run: chmod +x ${{ matrix.artifact_name }}
- name: Run quantumstrand -h
run: ./${{ matrix.artifact_name }} -h
- name: Set up Python 3.11
uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v6.1.0
with:
python-version: '3.11'
- name: Setup uv
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7.2.0
- name: Install IDA
if: ${{ env.IDA_LICENSE_ID != 0 }}
run: |
uv run --with ida-hcli hcli ida install --download-id ida-pro:latest --license-id ${{ secrets.IDA_LICENSE_ID }} --set-default --accept-eula --yes
env:
HCLI_API_KEY: ${{ secrets.HCLI_API_KEY }}
IDA_LICENSE_ID: ${{ secrets.IDA_LICENSE_ID }}
- name: Run quantumstrand
run: ./${{ matrix.artifact_name }} tests/data/test-decode-to-stack.exe

Expand Down
11 changes: 10 additions & 1 deletion .github/workflows/tests-qs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ jobs:
uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
with:
python-version: '3.11'
- name: Setup uv
uses: astral-sh/setup-uv@61cb8a9741eeb8a550a1b8544337180c0fc8476b # v7.2.0
- name: Install FLOSS
run: |
pip install -r requirements.txt
pip install -e .[dev,qs]
pip install -e .[dev,qs,ida]
- name: Install IDA
if: ${{ env.IDA_LICENSE_ID != 0 }}
run: |
uv run --with ida-hcli hcli ida install --download-id ida-pro:latest --license-id ${{ secrets.IDA_LICENSE_ID }} --set-default --accept-eula --yes
env:
HCLI_API_KEY: ${{ secrets.HCLI_API_KEY }}
IDA_LICENSE_ID: ${{ secrets.IDA_LICENSE_ID }}
- name: Run tests
run: pytest -k qs
105 changes: 67 additions & 38 deletions floss/qs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,33 @@
import json
import time
import bisect
import shutil
import hashlib
import logging
import pathlib
import argparse
import datetime
import tempfile
import functools
import itertools
import contextlib
from typing import Set, Dict, List, Tuple, Literal, Callable, Iterable, Optional, Sequence
from pathlib import Path
from collections import defaultdict

import pefile
import colorama
import lancelot
import rich.traceback
from pydantic import Field, BaseModel, ConfigDict
from rich.text import Text
from rich.style import Style
from rich.console import Console

try:
import ida_domain
HAS_IDA = True
except ImportError:
HAS_IDA = False

import floss.main
import floss.qs.db.gp
import floss.qs.db.oss
Expand Down Expand Up @@ -652,7 +659,7 @@ def make_tagger(db, queryfn) -> Tagger:

# supplement code analysis with a database of junk code strings
junk_db = StringGlobalPrevalenceDatabase.from_file(
pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "junk-code.jsonl.gz"
Path(floss.qs.db.__file__).parent / "data" / "gp" / "junk-code.jsonl.gz"
)
ret.append(make_tagger(junk_db, query_code_string_database))

Expand Down Expand Up @@ -957,11 +964,15 @@ def _merge_overlapping_ranges(ranges: List[Tuple[int, int]]) -> List[Tuple[int,
return merged_ranges


def _get_code_ranges(ws: lancelot.Workspace, pe: pefile.PE, slice_: Slice) -> List[Tuple[int, int]]:
def _get_code_ranges(db, pe: pefile.PE, slice_: Slice) -> List[Tuple[int, int]]:
"""
Extract and return the raw, unmerged code ranges from a PE file.

db is an ida_domain.Database instance.
"""
base_address = ws.base_address
from ida_domain import flowchart

base_address = db.metadata.base_address

# cache because getting the offset is slow
@functools.lru_cache(maxsize=None)
Expand All @@ -973,26 +984,29 @@ def get_offset_from_rva_cached(rva):
return None

code_ranges: List[Tuple[int, int]] = []
for function in ws.get_functions():
cfg = ws.build_cfg(function)
for bb in cfg.basic_blocks.values():
va = bb.address
rva = va - base_address
offset = get_offset_from_rva_cached(rva)
if offset is None:
continue
for function in db.functions:
try:
fc = flowchart.FlowChart(db, function)
for block in fc:
va: int = block.start_ea
rva: int = va - base_address
offset: int = get_offset_from_rva_cached(rva)
if offset is None:
continue

size = bb.length
size: int = block.end_ea - block.start_ea

if not slice_.contains_range(offset, size):
logger.warning("lancelot identified code at an invalid location, skipping basic block at 0x%x", rva)
continue
if not slice_.contains_range(offset, size):
logger.warning("IDA identified code at an invalid location, skipping basic block at 0x%x", rva)
continue

code_ranges.append((offset, offset + size - 1))
code_ranges.append((offset, offset + size - 1))
except Exception as e:
logger.warning("Failed to get flowchart for function at 0x%x: %s", function.start_ea, e)
return code_ranges


def compute_pe_layout(slice: Slice, xor_key: int | None) -> Layout:
def compute_pe_layout(slice: Slice, xor_key: int | None, path: Optional[Path] = None) -> Layout:
data = slice.data

try:
Expand All @@ -1008,21 +1022,33 @@ def compute_pe_layout(slice: Slice, xor_key: int | None) -> Layout:
for offset in structure.slice.range:
structures_by_address[offset] = structure

# lancelot only accepts bytes, not mmap
ws = None
with timing("lancelot: load workspace"):
try:
ws = lancelot.from_bytes(data)
except ValueError as e:
logger.warning("lancelot failed to load workspace: %s", e)

# contains the file offsets of bytes that are part of recognized instructions.
code_offsets = OffsetRanges()
if ws:
with timing("lancelot: find code"):
code_ranges = _get_code_ranges(ws, pe, slice)
merged_code_ranges = _merge_overlapping_ranges(code_ranges)
code_offsets = OffsetRanges.from_merged_ranges(merged_code_ranges)

if path and not HAS_IDA:
logger.debug("ida-domain not available, skipping code analysis")

if path and HAS_IDA:
from ida_domain import Database
from ida_domain.database import IdaCommandOptions

with tempfile.TemporaryDirectory() as tmpdir:
# TODO: if there's already an .i64, maybe we should use that instead.
work_path = Path(tmpdir) / path.name
shutil.copy2(path, work_path)

logger.debug("ida-domain: opening database...")
opts = IdaCommandOptions(
# plugin_options: point the primary and secondary Lumina servers at 0.0.0.0 to disable Lumina,
# which sometimes provides bad names, including overwriting names from debug info.
auto_analysis=True,
plugin_options="lumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R",
)
with Database.open(path=str(work_path), args=opts, save_on_close=False) as db:
with timing("ida-domain: find code"):
code_ranges = _get_code_ranges(db, pe, slice)
merged_code_ranges = _merge_overlapping_ranges(code_ranges)
code_offsets = OffsetRanges.from_merged_ranges(merged_code_ranges)

layout = PELayout(
slice=slice,
Expand Down Expand Up @@ -1157,7 +1183,11 @@ def collect_pe_resources(dir_data: pefile.ResourceDirData, path: Tuple[str, ...]

for resource in resources:
# parse content of resources, such as embedded PE files
resource.add_child(compute_layout(resource.slice))
#
# IDA can't load more than one file at once, nor can it load from memory,
# so we can't analyze the code of the embedded file, unfortunately.
# this is why we pass `path=None`.
resource.add_child(compute_layout(resource.slice, path=None))

for resource in resources:
# place resources into their parent section, usually .rsrc
Expand All @@ -1173,7 +1203,7 @@ def xor_static(data: bytes, i: int) -> bytes:
return bytes(c ^ i for c in data)


def compute_layout(slice: Slice) -> Layout:
def compute_layout(slice: Slice, path: Optional[Path] = None) -> Layout:

# TODO don't do this for text or other obvious non-xored data

Expand Down Expand Up @@ -1202,8 +1232,7 @@ def compute_layout(slice: Slice) -> Layout:
# Try to parse as PE file
if decoded_slice.data.startswith(b"MZ"):
try:
# lancelot may panic here, which we can't currently catch from Python
return compute_pe_layout(decoded_slice, xor_key)
return compute_pe_layout(decoded_slice, xor_key, path)
except ValueError as e:
logger.debug("failed to parse as PE file: %s", e)
# Fall back to using the default binary layout
Expand Down Expand Up @@ -1496,7 +1525,7 @@ def main():
sys.stdout.reconfigure(encoding="utf-8")
colorama.just_fix_windows_console()

path = pathlib.Path(args.path)
path = Path(args.path)
if not path.exists():
logging.error("%s does not exist", path)
return 1
Expand All @@ -1520,7 +1549,7 @@ def main():
slice = Slice.from_bytes(buf=buf)

# build the layout tree that describes the structures and ranges of the file.
layout = compute_layout(slice)
layout = compute_layout(slice, path)

# recursively populate the `.strings: List[ExtractedString]` field of each layout node.
extract_layout_strings(layout, args.min_length)
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ qs = [
"dnfile==0.13.0",
"colorama==0.4.6",
"msgspec==0.14.2",
"python-lancelot==0.8.10",
]
ida = [
"ida-domain>=0.3.0,<0.4.0",
]
dev = [
"pre-commit==4.2.0",
Expand Down
55 changes: 30 additions & 25 deletions tests/test_qs_code_ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pefile
import pytest
import lancelot
from ida_domain import database, flowchart

from floss.qs.main import (
Range,
Expand Down Expand Up @@ -66,39 +66,44 @@ def get_offset_from_rva(rva):
return pe


@pytest.fixture
def mock_ws():
"""Fixture for a mocked lancelot.Workspace object."""
ws = MagicMock(spec=lancelot.Workspace)
ws.base_address = 0x400000
@pytest.fixture()
def mock_db(monkeypatch):
"""Fixture for a mocked ida_domain.database.Database object."""
db = MagicMock(spec=database.Database)
db.metadata.base_address = 0x400000

# Mock functions and basic blocks
func1 = Mock()
func1.start_ea = 0x401000
func2 = Mock()
ws.get_functions.return_value = [func1, func2]

bb1 = Mock(address=0x401000, length=0x10) # rva: 0x1000, offset: 0x2000
bb2 = Mock(address=0x401020, length=0x15) # rva: 0x1020, offset: 0x2020
bb3 = Mock(address=0x402000, length=0x20) # rva: 0x2000, offset: 0x3000
func2.start_ea = 0x402000
db.functions = [func1, func2]

# Setup cfg for each function
cfg1 = Mock(basic_blocks={bb1.address: bb1, bb2.address: bb2})
cfg2 = Mock(basic_blocks={bb3.address: bb3})
bb1 = Mock(start_ea=0x401000, end_ea=0x401010) # rva: 0x1000, offset: 0x2000, size 0x10
bb2 = Mock(start_ea=0x401020, end_ea=0x401035) # rva: 0x1020, offset: 0x2020, size 0x15
bb3 = Mock(start_ea=0x402000, end_ea=0x402020) # rva: 0x2000, offset: 0x3000, size 0x20

def build_cfg(func):
# Setup FlowChart mock
def mock_flowchart_init(self, database, func, **kwargs):
if func == func1:
return cfg1
return cfg2
self._blocks = [bb1, bb2]
else:
self._blocks = [bb3]

def mock_flowchart_iter(self):
return iter(self._blocks)

monkeypatch.setattr(flowchart.FlowChart, "__init__", mock_flowchart_init)
monkeypatch.setattr(flowchart.FlowChart, "__iter__", mock_flowchart_iter)

ws.build_cfg.side_effect = build_cfg
return ws
return db


def test_get_code_ranges_basic(mock_ws, mock_pe):
def test_get_code_ranges_basic(mock_db, mock_pe):
"""Test basic extraction of code ranges."""
# Slice covers the entire mock file
slice_ = Slice(buf=b"", range=Range(offset=0, length=0x5000))
ranges = _get_code_ranges(mock_ws, mock_pe, slice_)
ranges = _get_code_ranges(mock_db, mock_pe, slice_)

assert ranges == [
(0x2000, 0x200F), # bb1: offset 0x2000, size 0x10
Expand All @@ -107,17 +112,17 @@ def test_get_code_ranges_basic(mock_ws, mock_pe):
]


def test_get_code_ranges_skips_invalid_offset(mock_ws, mock_pe):
def test_get_code_ranges_skips_invalid_offset(mock_db, mock_pe):
"""Test that it skips basic blocks that fall outside the slice."""
# Slice is small and only covers the first basic block
slice_ = Slice(buf=b"", range=Range(offset=0, length=0x2010))
ranges = _get_code_ranges(mock_ws, mock_pe, slice_)
ranges = _get_code_ranges(mock_db, mock_pe, slice_)

# Only bb1 should be included
assert ranges == [(0x2000, 0x200F)]


def test_get_code_ranges_handles_pe_error(mock_ws, mock_pe):
def test_get_code_ranges_handles_pe_error(mock_db, mock_pe):
"""Test that it handles PEFormatError when getting an offset."""

# Make one of the RVA lookups fail
Expand All @@ -129,7 +134,7 @@ def get_offset_from_rva_with_error(rva):
mock_pe.get_offset_from_rva.side_effect = get_offset_from_rva_with_error

slice_ = Slice(buf=b"", range=Range(offset=0, length=0x5000))
ranges = _get_code_ranges(mock_ws, mock_pe, slice_)
ranges = _get_code_ranges(mock_db, mock_pe, slice_)

# bb2 should be skipped
assert ranges == [
Expand Down
4 changes: 3 additions & 1 deletion tests/test_qs_pma0101.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def pma0101_layout():
binary_path = Path("tests") / Path("data") / Path("pma") / Path("Practical Malware Analysis Lab 01-01.dll_")
slice_buf = binary_path.read_bytes()
file_slice = Slice.from_bytes(slice_buf)
layout = compute_layout(file_slice)
layout = compute_layout(file_slice, path=binary_path)
extract_layout_strings(layout, 6)
taggers = load_databases()
layout.tag_strings(taggers)
Expand Down Expand Up @@ -89,4 +89,6 @@ def test_strings(pma0101_layout):

# assert count of expected strings not tagged as #code or #reloc
filtered_strings = [s for s in all_strings if not s.tags.intersection({"#code", "#reloc"})]

# if this count is 18, a string that is expected to be tagged #code is not being filtered out
assert len(filtered_strings) == 17