Skip to content
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
737439f
Make malsim.agents a package and rename compute_action_from_dict to g…
mrkickling Dec 20, 2024
2f0145f
Simplify MAL Simulator by removing observation creation, add separate…
mrkickling Dec 20, 2024
cbcdc44
Deprecate attacker_agent_class and defender_agent_class from scenario
mrkickling Dec 20, 2024
b7fbd8b
Update CLI to use new simulator structure
mrkickling Dec 20, 2024
577039a
Update README with new scenario format
mrkickling Dec 20, 2024
c3cecaf
Update Attacker/DefenderEnv to use new MalSim and scenario format
mrkickling Dec 20, 2024
a73f65e
Update testdata and tests to work with new scenario format and simulator
mrkickling Dec 20, 2024
db42070
Inherit DecisionAgent in all decision agents
mrkickling Dec 20, 2024
b11c528
Fix cli: log issue when no action is selected
mrkickling Dec 20, 2024
806b6a2
Implement and use MalSimulator.get_agent
mrkickling Dec 20, 2024
894fb86
Move serialized obs simulator into a MalSimVectorizedObsEnv that inhe…
mrkickling Jan 10, 2025
a0ce2ff
Make MalSimVectorizedObsEnv a proper wrapper taking a simulator as on…
mrkickling Jan 13, 2025
97a7273
Move logging utils for vectorized obs mal sim env to ./wrappers
mrkickling Jan 15, 2025
44268bc
Comment adjustments
mrkickling Jan 15, 2025
dc3554c
Make agents_dict private _agents_dict
mrkickling Jan 16, 2025
aab11d0
_initialize_rewards -> _init_agent_rewards
mrkickling Jan 16, 2025
a1de11c
Add MalSimAgentView - a public read only interface for agent states i…
mrkickling Jan 17, 2025
7489788
Adapt MalSimVectorizedObsEnv to new MalSimAgentView interface
mrkickling Jan 17, 2025
1ad08bd
Fix bug: defenders need to perform action before attacker
mrkickling Jan 17, 2025
3dc117c
Use kwarg 'action_mask' in searcher decision agents, use that in rest…
mrkickling Jan 17, 2025
a8596c9
Fix log stmt
mrkickling Jan 17, 2025
be7cd66
rename module agents.base_agent->agents.decision_agent to easier diff…
mrkickling Jan 17, 2025
1c0f574
Rewrite searcher agents to use AttackGraphNodes instead of serialized…
mrkickling Jan 17, 2025
91f3a02
Add types to MalSimAgentView
mrkickling Jan 17, 2025
d0822a2
Add tests for searcher agents
mrkickling Jan 17, 2025
52eaf6a
Fix return types decision agents, make KeyboardAgent return nodes, ad…
mrkickling Jan 17, 2025
773f692
Remove unused variable
mrkickling Jan 17, 2025
9a47597
Have CLI use MalSimulator instead of SerializedObsEnv
mrkickling Jan 17, 2025
0b35b89
Fix cli test
mrkickling Jan 17, 2025
38c987f
Remove unused vars
mrkickling Jan 20, 2025
629ddc2
Rename MalSimAgent->MalSimAgentState
mrkickling Jan 21, 2025
1e2300e
Simplify example scenario test
mrkickling Jan 21, 2025
9947605
Rename: get_agent -> get_agent_state
mrkickling Jan 21, 2025
3623162
typing
mrkickling Jan 21, 2025
465ff6d
Make sure seed is sent to agents when running MalSimVectorizedObsEnv
mrkickling Jan 21, 2025
2b873af
Simplify agents
nkakouros Jan 20, 2025
c4f76c8
Update malsim vector obs env to abide to new language graph
mrkickling Jan 29, 2025
6f94db7
Give AttackGraphpNode lang_graph_attack_step in tests the tests does …
mrkickling Jan 29, 2025
5eb094a
Update test to work with new mal simulator - but for some reason it f…
mrkickling Jan 29, 2025
37c1ae3
Make sure searcher agents are deterministic by sorting the action sur…
mrkickling Jan 29, 2025
80f44b5
Allow to give agent config when loading scenario
mrkickling Jan 29, 2025
34dc192
Use relative imports
mrkickling Feb 7, 2025
029e7a2
Move logging methods into malsim_vectorized_obs_env.py
mrkickling Feb 13, 2025
63135bf
Use new API of maltoolbox=0.3
mrkickling Feb 14, 2025
561d090
Use iterator instead of list when getting first attacker in graph
mrkickling Feb 14, 2025
785d716
Remove unused import/variables
mrkickling Feb 14, 2025
410d18b
Include simulation step count in cli
mrkickling Feb 14, 2025
0a1a050
Move all MalSimulator related classes into one file (mal_simulator.py)
mrkickling Feb 19, 2025
dd877af
Import VectorizedObsEnv from wrappers instead of from sims
mrkickling Feb 19, 2025
e23b9d8
Move mal_simulator.py to root package directory instead of in ./sims …
mrkickling Feb 19, 2025
f9de585
Add MalSimulator to __init__.py so it can be imported directly from t…
mrkickling Feb 19, 2025
79793dc
Add MalSimVectorized to its __init__.py __all__
mrkickling Feb 19, 2025
57f69b6
Check asset names without using the now remove model.asset_names
mrkickling Feb 20, 2025
afc8278
Add error message if node that is stepped through is not part of simu…
mrkickling Feb 20, 2025
a6d0a90
Update tests to work with mal-toolbox version 0.3.6, also fix some mi…
andrewbwm Feb 20, 2025
d42e1a3
Move the passive agent to its own file
andrewbwm Feb 20, 2025
0ac43f6
Update gitignore to hide the logs file now used for logging
andrewbwm Feb 20, 2025
d4c123b
Make code more pythonic and simpler
nkakouros Feb 19, 2025
eaa4503
Address feedback
nkakouros Feb 21, 2025
99be8b8
Imports with extra deps should not be top-level
nkakouros Feb 22, 2025
3775f87
Rename wrappers to envs
nkakouros Feb 22, 2025
7439c5c
Have test files follow package structure
nkakouros Feb 22, 2025
d3aa4e3
Stricter check for accepting to compromise node
nkakouros Feb 22, 2025
7ce7276
Don't spam the debug log when all agents are terminated
nkakouros Feb 24, 2025
cef8b55
Fix failing tests
mrkickling Feb 24, 2025
2b5e99e
Fix broken reference to old cli module
mrkickling Feb 24, 2025
01d8997
Rework apply_attacker_entrypoints to add_attacker_entrypoints
andrewbwm Feb 24, 2025
4607704
Update malsim/mal_simulator.py
nkakouros Feb 24, 2025
a608e8a
Rework MalSimulatorAgentState to track action surface additions and r…
andrewbwm Feb 25, 2025
4bab692
Remove unused import and change default_factory in set in MalSimDefen…
mrkickling Feb 25, 2025
1168a06
Add assert to test_example_scenario test
mrkickling Feb 25, 2025
9c4a904
Remove Generic
nkakouros Feb 25, 2025
1d38531
Remove unused malsim setting
nkakouros Feb 25, 2025
938f313
Minor rewrite
nkakouros Feb 25, 2025
c27aa65
Fix tests
nkakouros Feb 25, 2025
b720c7e
Apply compromised defender penalty once
nkakouros Feb 25, 2025
2db215e
Unnest loops for faster sim initialization
nkakouros Feb 25, 2025
68cbcd0
Fix typo/bug, add comment
mrkickling Feb 26, 2025
986c8b4
Rename _disable_attack_steps -> _uncompromise_attack_steps and fix do…
mrkickling Feb 26, 2025
2da1ca3
Make MalSimulator _agents and _alive_agents private
mrkickling Feb 26, 2025
597fbe3
Fix typehint
mrkickling Feb 26, 2025
f86a66c
Have one list with registered agents, one with living agent and one d…
mrkickling Feb 26, 2025
6062ba7
Add more action surface assertions in MalSim step test
mrkickling Feb 26, 2025
2c1fcf8
Re-enable clearing the step_action_surface_removals at the beginning …
andrewbwm Feb 26, 2025
65db799
Re-enable cummulative step action surface removals
andrewbwm Feb 26, 2025
1f61dfd
Add a simple AgentStateView test to check some of the basic functiona…
andrewbwm Feb 26, 2025
785c0c9
Remove _registered_agents, have _get_defender_agents and _get_attacke…
mrkickling Feb 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ they are a setup for running a simulation. This is how the format looks like:
lang_file: <path to .mar-archive>
model_file: <path to json/yml model>

attacker_agent_class: 'BreadthFirstAttacker' | 'DepthFirstAttacker' | 'KeyboardAgent'

# For defender_agent_class, null and False are treated the same - no defender will be used in the simulation
defender_agent_class: 'BreadthFirstAttacker' | 'DepthFirstAttacker' | 'KeyboardAgent' | null | False


# Optionally add rewards for each attack step
rewards:
<full name of attack step>: <reward>
Expand All @@ -50,17 +44,19 @@ rewards:
# Data A:read: 100
...

# Add entry points to AttackGraph with attacker names
# and attack step full_names
agents:
'Attacker1':
type: 'attacker'
agent_class: BreadthFirstAttacker | DepthFirstAttacker | KeyboardAgent | null
entry_points:
- 'Credentials:6:attemptCredentialsReuse'

# Optionally add entry points to AttackGraph with attacker name and attack step full_names.
# NOTE: If attacker entry points defined in both model and scenario,
# the scenario overrides the ones in the model.
attacker_entry_points:
<attacker name>:
- <attack step full name>
'Defender1':
type: 'defender'
agent_class: BreadthFirstDefender | DepthFirstDefender | KeyboardAgent | null

# example:
# 'Attacker1':
# - 'Credentials:6:attemptCredentialsReuse'

# Optionally add observability rules that are applied to AttackGrapNodes
# to make only certain steps observable
Expand Down
11 changes: 11 additions & 0 deletions malsim/agents/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .decision_agent import PassiveAgent, DecisionAgent
from .keyboard_input import KeyboardAgent
from .searchers import BreadthFirstAttacker, DepthFirstAttacker

__all__ = [
'PassiveAgent',
'DecisionAgent',
'KeyboardAgent',
'BreadthFirstAttacker',
'DepthFirstAttacker'
]
39 changes: 39 additions & 0 deletions malsim/agents/decision_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""A decision agent is a heuristic agent"""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from abc import ABC, abstractmethod

if TYPE_CHECKING:
from ..sims import MalSimAgentStateView
from maltoolbox.attackgraph import AttackGraphNode

class DecisionAgent(ABC):

@abstractmethod
def get_next_action(
self,
agent: MalSimAgentStateView,
**kwargs
) -> Optional[AttackGraphNode]:
"""
Select next action the agent will work with.
Attributes:
agent: Current state of and other info about the agent from the simulator
Returns:
The selected action or None if there are no actions to select from.
"""
...

class PassiveAgent(DecisionAgent):
def __init__(self, *args, **kwargs):
...

def get_next_action(
self,
agent: MalSimAgentStateView,
**kwargs
) -> Optional[AttackGraphNode]:
...
62 changes: 33 additions & 29 deletions malsim/agents/keyboard_input.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
import numpy as np
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional

AGENT_ATTACKER = "attacker"
AGENT_DEFENDER = "defender"
from .decision_agent import DecisionAgent
from ..sims import MalSimAgentStateView

if TYPE_CHECKING:
from maltoolbox.attackgraph import AttackGraphNode

logger = logging.getLogger(__name__)

null_action = (0, None)
class KeyboardAgent(DecisionAgent):
"""An agent that makes decisions by asking user for keyboard input"""

def __init__(self, _, **kwargs):
super().__init__(**kwargs)
logger.info("Creating KeyboardAgent")

class KeyboardAgent:
def __init__(self, vocab):
logger.debug("Create Keyboard agent.")
self.vocab = vocab
def get_next_action(
self,
agent: MalSimAgentStateView,
**kwargs
) -> Optional[AttackGraphNode]:
"""Compute action from action_surface"""

def compute_action_from_dict(self, obs: dict, mask: tuple) -> tuple:
def valid_action(user_input: str) -> bool:
if user_input == "":
return True
Expand All @@ -24,40 +33,35 @@ def valid_action(user_input: str) -> bool:
except ValueError:
return False

try:
a = associated_action[action_strings[node]]
except IndexError:
return False

if a == 0:
return True # wait is always valid
return node < len(available_actions) and node >= 0
return 0 <= node <= len(agent.action_surface)

def get_action_object(user_input: str) -> tuple:
node = int(user_input) if user_input != "" else None
action = associated_action[action_strings[node]] if user_input != "" else 0
return node, action

available_actions = np.flatnonzero(mask[1])
return node

action_strings = [self.vocab[i] for i in available_actions]
associated_action = {i: 1 for i in action_strings}
action_strings += ["wait"]
associated_action["wait"] = 0
if not agent.action_surface:
print("No actions to pick for defender")
return []

index_to_node = dict(enumerate(agent.action_surface))
user_input = "xxx"
while not valid_action(user_input):
print("Available actions:")
print("\n".join([f"{i}. {a}" for i, a in enumerate(action_strings)]))
print(
"\n".join(
[f"{i}. {n.full_name}" for i, n in index_to_node.items()]
)
)
print("Enter action or leave empty to wait:")
user_input = input("> ")

if not valid_action(user_input):
print("Invalid action.")

node, a = get_action_object(user_input)
index = get_action_object(user_input)
print(
f"Selected action: {action_strings[node] if node is not None else 'wait'}"
f"Selected action: {index_to_node[index].full_name}"
if index is not None else 'wait'
)

return (a, available_actions[node] if a != 0 else -1)
return index_to_node[index] if index is not None else None
175 changes: 68 additions & 107 deletions malsim/agents/searchers.py
Original file line number Diff line number Diff line change
@@ -1,137 +1,98 @@
from __future__ import annotations
import logging
import re

from collections import deque
from typing import Any, Deque, Dict, List, Set, Union
from typing import Optional, TYPE_CHECKING

import numpy as np

logger = logging.getLogger(__name__)

from .decision_agent import DecisionAgent
from ..sims import MalSimAgentStateView

def get_new_targets(
observation: dict, discovered_targets: Set[int], mask: tuple
) -> List[int]:
attack_surface = mask[1]
surface_indexes = list(np.flatnonzero(attack_surface))
new_targets = [idx for idx in surface_indexes if idx not in discovered_targets]
return new_targets, surface_indexes
if TYPE_CHECKING:
from maltoolbox.attackgraph import AttackGraphNode

logger = logging.getLogger(__name__)

class PassiveAttacker:
def compute_action_from_dict(self, observation, mask):
return (0, None)

class BreadthFirstAttacker:
def __init__(self, agent_config: dict) -> None:
self.targets: Deque[int] = deque([])
self.current_target: int = None
seed = (
agent_config["seed"]
if agent_config.get("seed", None)
else np.random.SeedSequence().entropy
)
self.rng = (
np.random.default_rng(seed)
if agent_config.get("randomize", False)
else None
)
class BreadthFirstAttacker(DecisionAgent):
"""A Breadth-First agent, with possible randomization at each level."""

def compute_action_from_dict(self, observation: Dict[str, Any], mask: tuple):
new_targets, surface_indexes = get_new_targets(observation, self.targets, mask)
_extend_method = "extendleft"
# Controls where newly discovered steps will be appended to the list of
# available actions. Currently used to differentiate between BFS and DFS
# agents.

# Add new targets to the back of the queue
# if desired, shuffle the new targets to make the attacker more unpredictable
if self.rng:
self.rng.shuffle(new_targets)
for c in new_targets:
self.targets.appendleft(c)
name = ' '.join(re.findall(r'[A-Z][^A-Z]*', __qualname__))
# A human-friendly name for the agent.

self.current_target, done = self.select_next_target(
self.current_target, self.targets, surface_indexes
)
default_settings = {
'randomize': False,
# Whether to randomize next target selection, still respecting the
# policy of the agent (e.g. BFS or DFS).
'seed': None,
# The random seed to initialize the randomness engine with.
}

self.current_target = None if done else self.current_target
action = 0 if done else 1
if action == 0:
logger.debug(
"Attacker Breadth First agent does not have "
"any valid targets it will terminate"
)
def __init__(self, agent_config: dict) -> None:
"""Initialize a BFS agent.
return (action, self.current_target)
Args:
agent_config: Dict with settings to override defaults
"""
self.targets: deque[AttackGraphNode] = deque()
self.current_target: Optional[AttackGraphNode] = None

@staticmethod
def select_next_target(
current_target: int,
targets: Union[List[int], Deque[int]],
attack_surface: Set[int],
) -> int:
# If the current target was not compromised, put it
# back, but on the bottom of the stack.
if current_target in attack_surface:
targets.appendleft(current_target)
current_target = targets.pop()
self.settings = self.default_settings | agent_config

while current_target not in attack_surface:
if len(targets) == 0:
return None, True
self.rng = np.random.default_rng(
self.settings['seed'] or np.random.SeedSequence()
)

current_target = targets.pop()
def get_next_action(
self, agent: MalSimAgentStateView, **kwargs
) -> Optional[AttackGraphNode]:
self._update_targets(agent.action_surface)
self._select_next_target()

return current_target, False
return self.current_target

def _update_targets(self, action_surface: list[AttackGraphNode]):

class DepthFirstAttacker:
def __init__(self, agent_config: dict) -> None:
self.current_target = -1
self.targets: List[int] = []
seed = (
agent_config["seed"]
if agent_config.get("seed", None)
else np.random.SeedSequence().entropy
)
self.rng = (
np.random.default_rng(seed)
if agent_config.get("randomize", False)
else None
)
# action surface does not have a guaranteed order,
# so for the agent to be deterministic we need to sort
action_surface.sort(key=lambda n: n.id)

def compute_action_from_dict(self, observation: Dict[str, Any], mask: tuple):
new_targets, surface_indexes = get_new_targets(observation, self.targets, mask)
new_targets = [
step
for step in action_surface
if step not in self.targets and not step.is_compromised()
]

# Add new targets to the top of the stack
if self.rng:
if self.settings['randomize']:
self.rng.shuffle(new_targets)
for c in new_targets:
self.targets.append(c)

self.current_target, done = self.select_next_target(
self.current_target, self.targets, surface_indexes
)

self.current_target = None if done else self.current_target
action = 0 if done else 1
return (action, self.current_target)

@staticmethod
def select_next_target(
current_target: int,
targets: Union[List[int], Deque[int]],
attack_surface: Set[int],
) -> int:
if current_target in attack_surface:
return current_target, False
if self.current_target in new_targets:
# If self.current_target is not yet compromised, e.g. due to TTCs,
# keep using that as the target.
new_targets.remove(self.current_target)
new_targets.append(self.current_target)

while current_target not in attack_surface:
if len(targets) == 0:
return None, True
# Enabled defenses may remove previously possible attack steps.
self.targets = deque(filter(lambda n: n.is_viable, self.targets))

current_target = targets.pop()
getattr(self.targets, self._extend_method)(new_targets)

return current_target, False
def _select_next_target(self) -> None:
"""
Implement the actual next target selection logic.
"""
try:
self.current_target = self.targets.pop()
except IndexError:
self.current_target = None


AGENTS = {
BreadthFirstAttacker.__name__: BreadthFirstAttacker,
DepthFirstAttacker.__name__: DepthFirstAttacker,
}
class DepthFirstAttacker(BreadthFirstAttacker):
_extend_method = "extend"
Loading