Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for output entities in CSV configuration #49

Merged
merged 2 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions maltego_trx/decorator_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
serialize_xml,
)

TRANSFORMS_CSV_HEADER = (
LEGACY_TRANSFORMS_CSV_HEADER = (
"Owner,Author,Disclaimer,Description,Version,"
"Name,UIName,URL,entityName,"
"oAuthSettingId,transformSettingIDs,seedIDs"
)

TRANSFORMS_CSV_HEADER = (
"Owner,Author,Disclaimer,Description,Version,"
"Name,UIName,URL,entityName,"
"oAuthSettingId,transformSettingIDs,seedIDs,outputEntities"
)

SETTINGS_CSV_HEADER = "Name,Type,Display,DefaultValue,Optional,Popup"


Expand Down Expand Up @@ -127,7 +134,7 @@ def decorated(transform_callable: object):

return decorated

def _create_transforms_config(self) -> Iterable[str]:
def _create_transforms_config(self, include_output_entities: bool = True) -> Iterable[str]:
global_settings_full_names = [gs.id for gs in self.global_settings]

for transform_name, transform_meta in self.transform_metas.items():
Expand All @@ -149,22 +156,30 @@ def _create_transforms_config(self) -> Iterable[str]:
";".join(self.oauth_settings_id),
# combine global and transform scoped settings
";".join(chain(meta_settings, global_settings_full_names)),
";".join(self.seed_ids),
";".join(self.seed_ids)
]

if include_output_entities:
transform_row.append(";".join(transform_meta.output_entities))

escaped_fields = escape_csv_fields(*transform_row)
yield ",".join(escaped_fields)

def write_transforms_config(
self, config_path: str = "./transforms.csv", csv_line_limit: int = 100
self, config_path: str = "./transforms.csv", csv_line_limit: int = 100, include_output_entities: bool = True
):
"""Exports the collected transform metadata as a csv-file to config_path"""

csv_lines = self._create_transforms_config()
csv_lines = self._create_transforms_config(include_output_entities=include_output_entities)

export_as_csv(
TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)
if not include_output_entities:
export_as_csv(
LEGACY_TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)
else:
export_as_csv(
TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)

def _create_settings_config(self) -> Iterable[str]:
chained_settings = chain(
Expand Down
2 changes: 1 addition & 1 deletion maltego_trx/template_dir/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

register_transform_classes(transforms)

registry.write_transforms_config()
registry.write_transforms_config(include_output_entities=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does default to True anyway

registry.write_settings_config()

if __name__ == '__main__':
Expand Down
51 changes: 49 additions & 2 deletions tests/test_decorator_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TransformSetting,
TransformRegistry,
TRANSFORMS_CSV_HEADER,
LEGACY_TRANSFORMS_CSV_HEADER,
SETTINGS_CSV_HEADER,
TransformSet,
)
Expand Down Expand Up @@ -120,7 +121,21 @@ class TransformCsvLine(NamedTuple):
oauth_id: str
settings_ids: str
seed_ids: str
output_entities: str

class LegacyTransformCsvLine(NamedTuple):
owner: str
author: str
disclaimer: str
description: str
version: str
name: str
display_name: str
host: str
input_entity: str
oauth_id: str
settings_ids: str
seed_ids: str

class SettingCsvLine(NamedTuple):
name: str
Expand All @@ -139,9 +154,10 @@ def test_transform_to_csv(registry: TransformRegistry):
tx_meta = registry.transform_metas.get(path_name)
tx_settings = registry.transform_settings.get(path_name, [])

registry.write_transforms_config()
output_path = "./transforms_with_output_entities.csv"
registry.write_transforms_config(config_path=output_path,include_output_entities=True)

with open("./transforms.csv") as transforms_csv:
with open(output_path) as transforms_csv:
header = next(transforms_csv)
assert header.rstrip("\n") == TRANSFORMS_CSV_HEADER

Expand All @@ -160,7 +176,38 @@ def test_transform_to_csv(registry: TransformRegistry):
assert data.oauth_id == registry.oauth_settings_id
assert data.settings_ids.split(";") == [s.id for s in tx_settings]
assert data.seed_ids.split(";") == registry.seed_ids
assert data.output_entities.split(";") == tx_meta.output_entities

def test_transform_to_legacy_csv(registry: TransformRegistry):
random_class = make_transform(registry)

path_name = name_to_path(random_class.__name__)

tx_meta = registry.transform_metas.get(path_name)
tx_settings = registry.transform_settings.get(path_name, [])

output_path = "./transforms_no_output_entities.csv"
registry.write_transforms_config(config_path=output_path,include_output_entities=False)

with open(output_path) as transforms_csv:
header = next(transforms_csv)
assert header.rstrip("\n") == LEGACY_TRANSFORMS_CSV_HEADER

line = next(transforms_csv).rstrip("\n")
data: LegacyTransformCsvLine = LegacyTransformCsvLine(*line.split(","))

assert data.owner == registry.owner
assert data.author == registry.author
assert data.disclaimer == tx_meta.disclaimer
assert data.description == tx_meta.description
assert data.version == registry.version
assert data.name == tx_meta.class_name
assert data.display_name == tx_meta.display_name
assert data.host == os.path.join(registry.host_url, "run", path_name)
assert data.input_entity == tx_meta.input_entity
assert data.oauth_id == registry.oauth_settings_id
assert data.settings_ids.split(";") == [s.id for s in tx_settings]
assert data.seed_ids.split(";") == registry.seed_ids

def test_setting_to_csv(registry: TransformRegistry):
local_setting = make_transform_setting(global_setting=False)
Expand Down