Skip to content

Commit

Permalink
Add support for output entities in CSV configuration (#49)
Browse files Browse the repository at this point in the history
* refactor: add missing output entity to csv export

* refactor: add include output entities flag

---------

Co-authored-by: Tendai Marengereke <[email protected]>
  • Loading branch information
tendai-zw and Tendai Marengereke authored Apr 27, 2023
1 parent b0c3eae commit 534a2cb
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 11 deletions.
31 changes: 23 additions & 8 deletions maltego_trx/decorator_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@
serialize_xml,
)

TRANSFORMS_CSV_HEADER = (
LEGACY_TRANSFORMS_CSV_HEADER = (
"Owner,Author,Disclaimer,Description,Version,"
"Name,UIName,URL,entityName,"
"oAuthSettingId,transformSettingIDs,seedIDs"
)

TRANSFORMS_CSV_HEADER = (
"Owner,Author,Disclaimer,Description,Version,"
"Name,UIName,URL,entityName,"
"oAuthSettingId,transformSettingIDs,seedIDs,outputEntities"
)

SETTINGS_CSV_HEADER = "Name,Type,Display,DefaultValue,Optional,Popup"


Expand Down Expand Up @@ -127,7 +134,7 @@ def decorated(transform_callable: object):

return decorated

def _create_transforms_config(self) -> Iterable[str]:
def _create_transforms_config(self, include_output_entities: bool = True) -> Iterable[str]:
global_settings_full_names = [gs.id for gs in self.global_settings]

for transform_name, transform_meta in self.transform_metas.items():
Expand All @@ -149,22 +156,30 @@ def _create_transforms_config(self) -> Iterable[str]:
";".join(self.oauth_settings_id),
# combine global and transform scoped settings
";".join(chain(meta_settings, global_settings_full_names)),
";".join(self.seed_ids),
";".join(self.seed_ids)
]

if include_output_entities:
transform_row.append(";".join(transform_meta.output_entities))

escaped_fields = escape_csv_fields(*transform_row)
yield ",".join(escaped_fields)

def write_transforms_config(
self, config_path: str = "./transforms.csv", csv_line_limit: int = 100
self, config_path: str = "./transforms.csv", csv_line_limit: int = 100, include_output_entities: bool = True
):
"""Exports the collected transform metadata as a csv-file to config_path"""

csv_lines = self._create_transforms_config()
csv_lines = self._create_transforms_config(include_output_entities=include_output_entities)

export_as_csv(
TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)
if not include_output_entities:
export_as_csv(
LEGACY_TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)
else:
export_as_csv(
TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)

def _create_settings_config(self) -> Iterable[str]:
chained_settings = chain(
Expand Down
2 changes: 1 addition & 1 deletion maltego_trx/template_dir/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

register_transform_classes(transforms)

registry.write_transforms_config()
registry.write_transforms_config(include_output_entities=True)
registry.write_settings_config()

if __name__ == '__main__':
Expand Down
51 changes: 49 additions & 2 deletions tests/test_decorator_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TransformSetting,
TransformRegistry,
TRANSFORMS_CSV_HEADER,
LEGACY_TRANSFORMS_CSV_HEADER,
SETTINGS_CSV_HEADER,
TransformSet,
)
Expand Down Expand Up @@ -120,7 +121,21 @@ class TransformCsvLine(NamedTuple):
oauth_id: str
settings_ids: str
seed_ids: str
output_entities: str

class LegacyTransformCsvLine(NamedTuple):
owner: str
author: str
disclaimer: str
description: str
version: str
name: str
display_name: str
host: str
input_entity: str
oauth_id: str
settings_ids: str
seed_ids: str

class SettingCsvLine(NamedTuple):
name: str
Expand All @@ -139,9 +154,10 @@ def test_transform_to_csv(registry: TransformRegistry):
tx_meta = registry.transform_metas.get(path_name)
tx_settings = registry.transform_settings.get(path_name, [])

registry.write_transforms_config()
output_path = "./transforms_with_output_entities.csv"
registry.write_transforms_config(config_path=output_path,include_output_entities=True)

with open("./transforms.csv") as transforms_csv:
with open(output_path) as transforms_csv:
header = next(transforms_csv)
assert header.rstrip("\n") == TRANSFORMS_CSV_HEADER

Expand All @@ -160,7 +176,38 @@ def test_transform_to_csv(registry: TransformRegistry):
assert data.oauth_id == registry.oauth_settings_id
assert data.settings_ids.split(";") == [s.id for s in tx_settings]
assert data.seed_ids.split(";") == registry.seed_ids
assert data.output_entities.split(";") == tx_meta.output_entities

def test_transform_to_legacy_csv(registry: TransformRegistry):
random_class = make_transform(registry)

path_name = name_to_path(random_class.__name__)

tx_meta = registry.transform_metas.get(path_name)
tx_settings = registry.transform_settings.get(path_name, [])

output_path = "./transforms_no_output_entities.csv"
registry.write_transforms_config(config_path=output_path,include_output_entities=False)

with open(output_path) as transforms_csv:
header = next(transforms_csv)
assert header.rstrip("\n") == LEGACY_TRANSFORMS_CSV_HEADER

line = next(transforms_csv).rstrip("\n")
data: LegacyTransformCsvLine = LegacyTransformCsvLine(*line.split(","))

assert data.owner == registry.owner
assert data.author == registry.author
assert data.disclaimer == tx_meta.disclaimer
assert data.description == tx_meta.description
assert data.version == registry.version
assert data.name == tx_meta.class_name
assert data.display_name == tx_meta.display_name
assert data.host == os.path.join(registry.host_url, "run", path_name)
assert data.input_entity == tx_meta.input_entity
assert data.oauth_id == registry.oauth_settings_id
assert data.settings_ids.split(";") == [s.id for s in tx_settings]
assert data.seed_ids.split(";") == registry.seed_ids

def test_setting_to_csv(registry: TransformRegistry):
local_setting = make_transform_setting(global_setting=False)
Expand Down

0 comments on commit 534a2cb

Please sign in to comment.