Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tool for identifying the most used rules #11439

Merged
merged 10 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions build-scripts/profile_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import argparse

try:
from utils.profile_tool import command_stats, command_sub
from utils.profile_tool import command_stats, command_sub, command_most_used_rules
except ImportError:
print("The ssg module could not be found.")
print(
Expand Down Expand Up @@ -250,11 +250,39 @@ def parse_sub_subcommand(subparsers):
)


def parse_most_used_rules_subcommand(subparsers):
    """Register the ``most-used-rules`` subcommand on *subparsers*.

    The subcommand accepts zero or more positional ``BENCHMARKS`` paths
    (defaulting to an empty list) and a ``--format`` option restricted to
    ``plain``, ``json`` or ``csv`` (defaulting to ``plain``).

    Args:
        subparsers: The object returned by
            ``argparse.ArgumentParser.add_subparsers``.
    """
    benchmarks_help = (
        "Specify XCCDF files or a SCAP source data stream files to act on. "
        "If not provided are used control files. e.g.: ~/scap-security-guide/controls"
    )
    output_formats = ["plain", "json", "csv"]

    subcommand = subparsers.add_parser(
        "most-used-rules",
        help="Generates list of all rules used by the existing profiles.",
        description=(
            "Generates list of all rules used by the existing profiles. In various formats."
        ),
    )
    subcommand.add_argument(
        "BENCHMARKS", nargs="*", type=str, default=[], help=benchmarks_help
    )
    subcommand.add_argument(
        "--format",
        choices=output_formats,
        default="plain",
        help="Which format to use for output.",
    )


def parse_args():
parser = argparse.ArgumentParser(description="Profile statistics and utilities tool")
subparsers = parser.add_subparsers(title="subcommands", dest="subcommand", required=True)

parse_stats_subcommand(subparsers)
parse_sub_subcommand(subparsers)
parse_most_used_rules_subcommand(subparsers)

args = parser.parse_args()

Expand Down Expand Up @@ -287,7 +315,11 @@ def parse_args():
return args


SUBCMDS = dict(stats=command_stats, sub=command_sub)
SUBCMDS = {
"stats": command_stats,
"sub": command_sub,
"most-used-rules": command_most_used_rules,
}


def main():
Expand Down
20 changes: 19 additions & 1 deletion docs/manual/developer/05_tools_and_utilities.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,29 @@ rules selected by another profile, run this command:

```bash
$ ./build-scripts/profile_tool.py sub --profile1 rhel7/profiles/ospp.profile --profile2 rhel7/profiles/pci-dss.profile
````
```

This will result in a new YAML profile containing the rules exclusive to the
profile pointed to by the `--profile1` option.

The tool can also generate a list of the most used rules contained in profiles from a given data stream or benchmark.

For example, to get a list of the most used rules in the benchmark for `rhel8`, run this command:

```bash
$ ./build-scripts/profile_tool.py most-used-rules build/ssg-rhel8-xccdf.xml
```

Or you can also run this command to get a list of the most used rules in the entire project:

```bash
$ ./build-scripts/profile_tool.py most-used-rules
```

The result will be a list of rules with the number of uses in the profiles.
The list can be generated as plain text, JSON or CSV
via the `--format FORMAT` parameter.

## Generating Controls from DISA's XCCDF Files

If you want a control file for product from DISA's XCCDF files you can run the following command:
Expand Down
10 changes: 8 additions & 2 deletions ssg/build_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,15 +807,21 @@ def show_profile_stats(self, profile, options):

return profile_stats

def show_all_profile_stats(self, options):
def _process_all_profile_stats(self, function_to_process_profile, *args):
all_profile_elems = self.tree.findall("./{%s}Profile" % (XCCDF12_NS))
ret = []
for elem in all_profile_elems:
profile = elem.get('id')
if profile is not None:
ret.append(self.show_profile_stats(profile, options))
ret.append(function_to_process_profile(profile, *args))
return ret

def show_all_profile_stats(self, options):
    """Call ``show_profile_stats`` for every profile and return the results.

    Delegates to ``_process_all_profile_stats``, passing ``options`` through
    as the extra argument given to ``show_profile_stats`` for each profile.

    Returns:
        The list built by ``_process_all_profile_stats``.
    """
    return self._process_all_profile_stats(self.show_profile_stats, options)

def get_all_profile_stats(self):
    """Call ``get_profile_stats`` for every profile and return the results.

    Delegates to ``_process_all_profile_stats`` with no extra arguments.

    Returns:
        The list built by ``_process_all_profile_stats``.
    """
    return self._process_all_profile_stats(self.get_profile_stats)

def console_print(self, content, width):
"""Prints the 'content' array left aligned, each time 45 characters
long, each row 'width' characters wide"""
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/utils/test_generate_most_used_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os
import sys
import pytest
from argparse import Namespace
from utils.profile_tool import command_most_used_rules

DATA_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "ssg-module", "data")
)
DATA_STREAM_PATH = os.path.join(DATA_DIR, "simple_data_stream.xml")


def get_fake_args():
    """Build the argument namespace passed to ``command_most_used_rules``.

    The namespace selects the ``most-used-rules`` subcommand, points
    ``BENCHMARKS`` at ``DATA_STREAM_PATH`` and requests ``plain`` output.
    """
    fake_args = Namespace(
        subcommand="most-used-rules",
        BENCHMARKS=[str(DATA_STREAM_PATH)],
        format="plain",
    )
    return fake_args


@pytest.mark.skipif(sys.version_info[0] < 3, reason="requires python3")
def test_command(capsys):
    """Run the command in plain format and check one expected output line."""
    command_most_used_rules(get_fake_args())
    stdout = capsys.readouterr().out
    assert "xccdf_com.example.www_rule_test-pass: 1" in stdout
1 change: 1 addition & 0 deletions utils/profile_tool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .sub import command_sub
from .stats import command_stats
from .most_used_rules import command_most_used_rules
80 changes: 80 additions & 0 deletions utils/profile_tool/most_used_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import sys
import json

from ssg.build_profile import XCCDFBenchmark


PYTHON_2 = sys.version_info[0] < 3

if not PYTHON_2:
from .profile import get_profile
from ..controleval import (
load_controls_manager,
get_available_products,
get_product_profiles_files,
)


def _count_rules_per_rules_list(rules_list, rules):
for rule in rules_list:
if rule in rules:
rules[rule] += 1
else:
rules[rule] = 1


def _count_rules_per_benchmark(benchmark, rules):
    """Tally the rules of every profile found in one benchmark.

    Args:
        benchmark: Value handed to the ``XCCDFBenchmark`` constructor.
        rules: Mapping of rule identifier to count; updated in place.
    """
    xccdf = XCCDFBenchmark(benchmark)
    for profile_stats in xccdf.get_all_profile_stats():
        # A profile entry without a "rules" key contributes nothing.
        selected = profile_stats.get("rules", [])
        _count_rules_per_rules_list(selected, rules)


def _get_profiles_for_product(ctrls_mgr, product):
    """Return one profile object per profile file of *product*.

    Args:
        ctrls_mgr: Object whose ``policies`` attribute is passed to
            ``get_profile``.
        product: Product identifier passed to ``get_product_profiles_files``.

    Returns:
        A list of the values returned by ``get_profile``, in file order.
    """
    profiles_files = get_product_profiles_files(product)
    return [
        get_profile(profiles_files, profile_file, ctrls_mgr.policies)
        for profile_file in profiles_files
    ]


def _process_all_products_from_controls(rules):
    """Tally the rules of every profile of every available product.

    Args:
        rules: Mapping of rule identifier to count; updated in place.

    Raises:
        Exception: When running under Python 2 (``PYTHON_2`` is true).
    """
    if PYTHON_2:
        raise Exception("This feature is not supported for python2.")

    for product in get_available_products():
        ctrls_mgr = load_controls_manager("./controls/", product)
        product_profiles = _get_profiles_for_product(ctrls_mgr, product)
        for product_profile in product_profiles:
            _count_rules_per_rules_list(product_profile.rules, rules)


def _sorted_rules(rules):
sorted_rules = {
k: v
for k, v in sorted(rules.items(), key=lambda x: x[1], reverse=True)
}
return sorted_rules


def command_most_used_rules(args):
    """Print how many times each rule is used, most used first.

    When ``args.BENCHMARKS`` is empty the tally comes from the control files
    of all products; otherwise each listed benchmark is counted.

    Args:
        args: Parsed arguments providing ``BENCHMARKS`` and ``format``
            (``"json"``, ``"csv"``, or anything else for plain text).
    """
    rules = {}
    if args.BENCHMARKS:
        for benchmark in args.BENCHMARKS:
            _count_rules_per_benchmark(benchmark, rules)
    else:
        _process_all_products_from_controls(rules)

    sorted_rules = _sorted_rules(rules)

    # JSON is emitted as a single document; the other formats are line based.
    if args.format == "json":
        print(json.dumps(sorted_rules, indent=4))
        return

    line_template = "{}: {}"
    if args.format == "csv":
        print("rule_id,count_of_profiles")
        line_template = "{},{}"

    for rule_id, rule_count in sorted_rules.items():
        print(line_template.format(rule_id, rule_count))
100 changes: 100 additions & 0 deletions utils/profile_tool/profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from ..controleval import get_parameter_from_yaml


def _get_extends_profile_path(profiles_files, profile_name):
for profile_path in profiles_files:
if f"{profile_name}.profile" in profile_path:
return profile_path
return None


def _process_extends(profiles_files, file, policies, profile):
    """Load the profile named by the ``extends`` key of *file* into *profile*.

    Nothing happens unless the ``extends`` value is a string.

    Args:
        profiles_files: Iterable of profile file paths to search.
        file: Path of the profile file being processed.
        policies: Policies forwarded to ``get_profile``.
        profile: Profile object that ``get_profile`` is asked to fill.

    Raises:
        Exception: When no profile file matches the ``extends`` value.
    """
    parent_name = get_parameter_from_yaml(file, "extends")
    if not isinstance(parent_name, str):
        return

    parent_path = _get_extends_profile_path(profiles_files, parent_name)
    if parent_path is None:
        raise Exception("There is no Extension '{}' Profile.".format(parent_name))

    # The passed-in profile is handed over, so the return value is not needed.
    get_profile(profiles_files, parent_path, policies, profile)


def _process_selections(file, profile, policies):
    """Apply the ``selections`` entries of *file* to *profile*.

    Entries containing ``:`` but no ``=`` go to ``profile.add_from_policy``;
    all other entries go to ``profile.add_rule``. ``profile.clean_rules`` is
    called once after all entries have been handled.

    Args:
        file: Path of the profile file being processed.
        profile: Object providing ``add_from_policy``, ``add_rule`` and
            ``clean_rules``.
        policies: Policies forwarded to ``profile.add_from_policy``.
    """
    for entry in get_parameter_from_yaml(file, "selections"):
        if "=" in entry or ":" not in entry:
            profile.add_rule(entry)
        else:
            profile.add_from_policy(policies, entry)
    profile.clean_rules()


def get_profile(profiles_files, file, policies, profile=None):
    """Build, or continue filling, a profile from the profile file *file*.

    Args:
        profiles_files: Iterable of profile file paths, used to resolve
            ``extends`` references.
        file: Path of the profile file to process.
        policies: Policies used when handling selections.
        profile: Existing profile to fill. When ``None``, a new ``Profile`` is
            created from *file* and its ``title`` value.

    Returns:
        The profile that was filled.
    """
    if profile is None:
        profile = Profile(file, get_parameter_from_yaml(file, "title"))

    # The extended profile is processed first, then this file's selections.
    _process_extends(profiles_files, file, policies, profile)
    _process_selections(file, profile, policies)
    return profile


class Profile:
def __init__(self, path, title):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered to also count the variables? I think it is easy to extend and this information might be useful as well.

If so, the next function can also be renamed from add_rule to add_rule_or_var, for example. However, it would be probably better to extend this in another PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can easily be expanded if necessary.

self.path = path
self.title = title
self.rules = []
self.unselected_rules = []

def add_rule(self, rule_id):
    """Record one selection entry.

    Entries starting with ``!`` are appended to ``self.unselected_rules``
    unchanged. Other entries are appended to ``self.rules`` unless they
    contain ``=``, in which case they are ignored.
    """
    if rule_id.startswith("!"):
        self.unselected_rules.append(rule_id)
    elif "=" not in rule_id:
        self.rules.append(rule_id)

def add_rules(self, rules):
    """Pass every entry of *rules* to ``self.add_rule``, in order."""
    for rule_id in rules:
        self.add_rule(rule_id)

def clean_rules(self):
    """Drop every unselected rule from ``self.rules``.

    Each entry of ``self.unselected_rules`` has its ``!`` characters removed
    and all occurrences of the resulting identifier are deleted from
    ``self.rules``. The list is modified in place and the order of the
    remaining rules is preserved.
    """
    unselected_ids = {rule.replace("!", "") for rule in self.unselected_rules}
    if not unselected_ids:
        return
    # list.remove() would delete only the first occurrence, so a rule added
    # more than once would survive its unselection. Filter instead, using
    # slice assignment so the same list object is kept.
    self.rules[:] = [rule for rule in self.rules if rule not in unselected_ids]

@staticmethod
def _get_sel(selected):
policy = None
control = None
level = None
if selected.count(":") == 2:
policy, control, level = selected.split(":")
else:
policy, control = selected.split(":")
return policy, control, level

@staticmethod
def _get_levels(policy, level):
levels = [level]
if policy.levels_by_id.get(level).inherits_from is not None:
levels.extend(policy.levels_by_id.get(level).inherits_from)
return levels

def add_from_policy(self, policies, selected):
    """Add the rules referenced by a ``policy:control[:level]`` selection.

    A control other than ``all`` adds only that control's rules. With ``all``
    and no level, the rules of every control in the policy are added. With
    ``all`` and a level, only controls sharing at least one level with the
    list returned by ``self._get_levels`` contribute their rules.

    Args:
        policies: Mapping of policy identifier to policy object.
        selected: The selection string to resolve.
    """
    policy_id, control_id, level = self._get_sel(selected)
    policy = policies[policy_id]

    if control_id != "all":
        self.add_rules(policy.controls_by_id[control_id].rules)
        return

    if level is None:
        for policy_control in policy.controls:
            self.add_rules(policy_control.rules)
        return

    wanted_levels = self._get_levels(policy, level)
    for policy_control in policy.controls:
        if set(policy_control.levels) & set(wanted_levels):
            self.add_rules(policy_control.rules)
Loading