Skip to content

Conversation

@mrkickling
Copy link
Contributor

@mrkickling mrkickling commented Dec 17, 2024

This is a rather big redesign of the simulator.

MalSimulator is now very much simplified:

  • This creates a base for other wrappers (with their own observation formats) to use
  • It also allows to run the simulator more easily for other reasons than ML training
  • It works on AttackGraphs only
  • It stores agent states (Agent classes were added for this)
  • The MAL Simulator contains the state of the Attack Graph
  • step() takes AttackGraphNodes as actions and returns those that were performed (not the full state). Does this make sense?

Things related to building up observations (specifically the ParallelEnv part that before was in the MalSimulator) has now been moved to a wrapper malsim_vectorized_obs_env.py (naming here is an open question).
This wrapper builds up state step by step from the performed actions returned from MalSimulator.step(actions).
Logging for that wrapper was also factored into its own module (should it be?)

Added the class DecisionAgent (these are agents like BFS/DFS + keyboard + in the future more advanced heuristics).
This was to create a common interface for working with DecisionAgents.
All our DecisionAgents are still not done, they are still tailored towards the ParallelEnv/MalSim Vectorized Obs Env, but the goal is to have them work with the regular MalSimulator (on attack graphs state).

Any number of agents can now be specified in the scenario file. This is a cleaner solution in my opinion.

Let me know if something does not make sense.

@mrkickling mrkickling force-pushed the add-base-mal-simulator branch 3 times, most recently from aebfcb8 to 753ce55 Compare December 20, 2024 12:33
@mrkickling mrkickling force-pushed the add-base-mal-simulator branch 2 times, most recently from 502b688 to fe27e46 Compare January 15, 2025 13:52
@mrkickling mrkickling marked this pull request as ready for review January 15, 2025 14:18
@mrkickling mrkickling force-pushed the add-base-mal-simulator branch from ce89c22 to 950fed4 Compare January 15, 2025 14:22
@mrkickling mrkickling requested a review from andrewbwm January 15, 2025 14:25
@mrkickling mrkickling changed the title Add base mal simulator Simplify MalSimulator and create wrappers for other interfaces Jan 15, 2025
@mrkickling mrkickling requested a review from Hoclor January 15, 2025 14:45
@mrkickling mrkickling force-pushed the add-base-mal-simulator branch from d1b0cf8 to 2fe03ed Compare January 17, 2025 15:07
@kasanari
Copy link
Collaborator

kasanari commented Jan 17, 2025

Throwing this into the mix:

# Author: Jakob Nyberg, 2025
from collections.abc import Callable
import logging


from typing import Any

import numpy as np
import numpy.typing as npt

logger = logging.getLogger(__name__)


Array = npt.NDArray[np.int32]
BoolArray = npt.NDArray[np.bool_]


def get_new_targets(
    discovered_targets: npt.NDArray[np.int32],
    mask: tuple[npt.NDArray[np.bool_], npt.NDArray[np.bool_]],
) -> tuple[npt.NDArray[np.int32], npt.NDArray[np.int32]]:
    attack_surface = mask[1]
    surface_indexes = np.flatnonzero(attack_surface)
    new_targets = np.array(
        [idx for idx in surface_indexes if idx not in discovered_targets],
        dtype=np.int32,
    )
    return new_targets, surface_indexes


def move_target_to_back(
    current_target: np.int32 | None,
    targets: npt.NDArray[np.int32],
    attack_surface: npt.NDArray[np.int32],
) -> tuple[npt.NDArray[np.int32], np.int32 | None]:
    """
    If the current target was not compromised this turn, put it
    on the bottom of the stack and focus on next target instead
    """

    if not current_target:
        return targets, current_target

    if current_target in attack_surface:
        targets = np.concatenate((current_target, targets[:-1]))
        return targets, targets[-1]

    return targets, current_target


def choose_target(
    targets: npt.NDArray[np.int32],
    attack_surface: npt.NDArray[np.int32],
) -> tuple[npt.NDArray[np.int32], np.int32, bool]:
    # targets that have not been removed from the attack surface by another agent
    valid_targets = np.array(
        [t for t in targets if t in attack_surface], dtype=np.int32
    )

    if len(valid_targets) == 0:
        return valid_targets, np.int32(0), True

    return valid_targets[:-1], valid_targets[-1], False


def compute_action(
    permute_func: Callable[[Array], Array],
    action_func: Callable[
        [np.int32 | None, Array, Array], tuple[Array, np.int32 | None, bool]
    ],
    add_targets_func: Callable[[Array, Array], Array],
):
    def _compute_action(
        targets: Array,
        mask: tuple[BoolArray, BoolArray],
        current_target: np.int32 | None,
    ):
        new_targets, surface_indexes = get_new_targets(targets, mask)

        targets, current_target, done = action_func(
            current_target,
            add_targets_func(targets, permute_func(new_targets)),
            surface_indexes,
        )

        action = 0 if done else 1
        if action == 0:
            logger.debug(
                'Attacker agent does not have any valid targets and will terminate'
            )

        logger.debug(f'Attacker targets: {targets}')
        logger.debug(f'Attacker current target: {current_target}')
        logger.debug(f'Attacker action: {action}')
        return targets, action, current_target

    return _compute_action


def create_permute_func(seed: int | None, randomize: bool) -> Callable[[Array], Array]:
    s = seed if seed else np.random.SeedSequence().entropy
    rng = np.random.default_rng(s) if randomize else None
    return rng.permutation if rng else lambda x: x


class BreadthFirstAttacker:
    def __init__(self, agent_config: dict[str, Any]) -> None:
        self.current_target: np.int32 | None = None
        self.targets: npt.NDArray[np.int32] = np.array([], dtype=np.int32)

        permute_func = create_permute_func(
            agent_config.get('seed', None), agent_config.get('randomize', False)
        )

        self.compute_action = compute_action(
            permute_func, self._action_func, self._add_new_targets_func
        )

    def __call__(
        self,
        _: dict[str, Any],
        mask: tuple[npt.NDArray[np.bool_], npt.NDArray[np.bool_]],
    ):
        self.targets, action, self.current_target = self.compute_action(
            self.targets, mask, self.current_target
        )

        return (action, self.current_target)

    @staticmethod
    def _action_func(
        current_target: np.int32 | None,
        targets: Array,
        surface_indexes: Array,
    ):
        targets, current_target = move_target_to_back(
            current_target, targets, surface_indexes
        )
        return choose_target(targets, surface_indexes)

    @staticmethod
    def _add_new_targets_func(targets: Array, new_targets: Array):
        new_targets = np.flip(new_targets)  # to comply with the original implementation
        return np.concatenate([new_targets, targets])


class DepthFirstAttacker:
    def __init__(self, agent_config: dict[str, Any]) -> None:
        self.current_target: np.int32 | None = None
        self.targets: npt.NDArray[np.int32] = np.array([], dtype=np.int32)

        permute_func = create_permute_func(
            agent_config.get('seed', None), agent_config.get('randomize', False)
        )

        self.compute_action = compute_action(
            permute_func, self._action_func, self._add_new_targets_func
        )

    def __call__(
        self,
        _: dict[str, Any],
        mask: tuple[npt.NDArray[np.bool_], npt.NDArray[np.bool_]],
    ):
        self.targets, action, self.current_target = self.compute_action(
            self.targets, mask, self.current_target
        )

        return (action, self.current_target)

    @staticmethod
    def _action_func(
        current_target: np.int32 | None,
        targets: Array,
        surface_indexes: Array,
    ):
        # keep working on a target unless it has been removed from the attack surface
        return (
            choose_target(targets, surface_indexes)
            if current_target not in surface_indexes
            else (targets, current_target, False)
        )

    @staticmethod
    def _add_new_targets_func(targets: Array, new_targets: Array):
        # add new targets to the front of the list, so that the agent works on the latest targets first
        return np.concatenate([targets, new_targets])

@mrkickling mrkickling force-pushed the add-base-mal-simulator branch 2 times, most recently from d208cd3 to 45a1182 Compare January 29, 2025 08:58
nkakouros and others added 24 commits February 24, 2025 16:37
- test_step: required attack surface to be set
- test_pz: had to make sure ParallelEnv returned only alive agents on .agents
…emovals

separately.

Have both Attacker and Defender AgentStates use the common
performed_nodes property.

Cleanup return values of attacker and defenders steps and update the
AgentState within the _attacker/_defender_step as much as possible.
…ict with agent states

- All are private
- @Property MalSimulator.agent_states is still the public interface to fetch AgentStates
@andrewbwm andrewbwm merged commit d120750 into main Feb 26, 2025
5 checks passed
@andrewbwm andrewbwm deleted the add-base-mal-simulator branch February 26, 2025 13:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants