diff --git a/.gitignore b/.gitignore
index efb31e2..c80b77e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 ### Project specific ignores
 results*/
+src_bak
 
 ### Python ignores
 # Byte-compiled / optimized / DLL files
diff --git a/src_bak/configuration.py b/src_bak/configuration.py
deleted file mode 100755
index 3624782..0000000
--- a/src_bak/configuration.py
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""A class for test configurations on batch compute."""
-
-from getpass import getuser
-from pathlib import Path
-from re import search as re_search
-from subprocess import PIPE  # nosec
-from subprocess import run as subprocess_run  # nosec
-from tempfile import NamedTemporaryFile
-from time import sleep
-from typing import Any
-
-BASH_SHEBANG = "#!/bin/sh\n"
-JOB_ID_REGEX = r"Submitted batch job (\d+)"
-
-
-class RunConfiguration:
-    """A builder/runner for a run configuration."""
-
-    def __init__(self, name: str, run_command: str, output_file: Path):
-        """Initialise the run configuration as an empty bash file."""
-        # TODO: Are name and output file both needed?
-        self.name: str = name
-        self.output_file: Path = output_file
-        self.sbatch_config: dict[str, str] = {}
-        self.module_loads: list[str] = []
-        self.environment_variables: dict[str, str] = {}
-        self.directory: Path | None = None
-        self.build_commands: list[str] = []
-        self.run_command: str = run_command
-        self.args: str | None = None
-
-    @property
-    def sbatch_contents(self) -> str:
-        """Construct the sbatch configuration for the run."""
-        sbatch_file = BASH_SHEBANG
-
-        for key, value in self.sbatch_config.items():
-            sbatch_file += f"#SBATCH --{key}={value}\n"
-        sbatch_file += f"#SBATCH --output={self.output_file}\n"
-        if "output" in self.sbatch_config:
-            # NOTE: The output file will always override this key!
-            # This should probably be a logging statement...
-            print("WARNING: Output file configuration overridden!")
-
-        if len(self.module_loads) > 0:
-            sbatch_file += "module purge\n"
-            sbatch_file += f"module load {' '.join(self.module_loads)}\n"
-
-        for key, value in self.environment_variables.items():
-            sbatch_file += f"export {key}={value}\n"
-
-        sbatch_file += "\necho '===== ENVIRONMENT ====='\n"
-        sbatch_file += "echo '=== CPU ARCHITECTURE ==='\n"
-        sbatch_file += "lscpu\n"
-        sbatch_file += "echo '=== SLURM CONFIG ==='\n"
-        sbatch_file += "scontrol show job $SLURM_JOB_ID\n"
-        sbatch_file += "echo\n"
-
-        sbatch_file += "\necho '===== BUILD ====='\n"
-        if self.directory is not None:
-            sbatch_file += f"cd {self.directory}\n"
-        sbatch_file += "\n".join(self.build_commands) + "\n"
-
-        sbatch_file += f"\necho '===== RUN {self.name} ====='\n"
-        sbatch_file += f"time {self.run_command} {self.args}\n"
-
-        return sbatch_file
-
-    def __repr__(self) -> str:
-        """Get the sbatch configuration file defining the run."""
-        return self.sbatch_contents
-
-    def run(self) -> int | None:
-        """Run the specified run configuration."""
-        # Ensure the output directory exists before it is used
-        self.output_file.parent.mkdir(parents=True, exist_ok=True)
-
-        # Create and run the temporary sbatch file
-        with NamedTemporaryFile(
-            prefix=self.name, suffix=".sbatch", dir=Path("./"), mode="w+"
-        ) as sbatch_tmp:
-            sbatch_tmp.write(self.sbatch_contents)
-            sbatch_tmp.flush()
-            result = subprocess_run(  # nosec
-                ["sbatch", Path(sbatch_tmp.name)],  # noqa: S603, S607
-                check=True,
-                stdout=PIPE,
-            )
-            job_id_search = re_search(JOB_ID_REGEX, result.stdout.decode("utf-8"))
-            if job_id_search is None:
-                return None
-            return int(job_id_search.group(1))
-
-    @classmethod
-    def get_output_file_name(
-        cls, run_configuration_name: str, variables: dict[str, Any]
-    ) -> str:
-        """Construct an output file name for a run."""
-        # TODO: Better representation of sbatch etc than stringifying
-        variables_str = ",".join(
-            f"{name}={str(value).replace('/','').replace(' ','_')}"
-            for name, value in variables.items()
-        )
-        return f"{run_configuration_name}__{variables_str}__%j.out"
-
-
-def wait_till_queue_empty(
-    max_time_to_wait: int = 172_800, backoff: list[int] | None = None
-) -> bool:
-    """Wait until the Slurm queue is empty for the current user."""
-    if backoff is None or len(backoff) < 1:
-        backoff = [5, 10, 15, 30, 60]
-
-    time_waited = 0
-    backoff_index = 0
-    print("Starting to wait for Slurm queue")
-    while time_waited < max_time_to_wait:
-        wait_time = backoff[backoff_index]
-        sleep(wait_time)
-        print(f"Waited {time_waited}s for Slurm queue to empty")
-        time_waited += wait_time
-
-        result = subprocess_run(  # nosec
-            ["squeue", "-u", getuser()],  # noqa: S603, S607
-            check=True,
-            stdout=PIPE,
-        )
-        if result.stdout.decode("utf-8").count("\n") <= 1:
-            return True
-
-        if backoff_index < len(backoff) - 1:
-            backoff_index += 1
-
-    return False
diff --git a/src_bak/main.py b/src_bak/main.py
deleted file mode 100755
index 776b338..0000000
--- a/src_bak/main.py
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""The command-line entry point for the tool."""
-
-from argparse import ArgumentParser, Namespace
-from enum import Enum, auto
-from pathlib import Path
-
-from hpc_multibench.tui.user_interface import UserInterface
-from hpc_multibench.yaml_model import TestPlan
-
-
-class Mode(Enum):
-    """The mode for the tool to run in."""
-
-    RUN = auto()
-    ANALYSE = auto()
-    ALL = auto()
-
-
-def get_parser() -> ArgumentParser:
-    """Get the argument parser for the tool."""
-    parser = ArgumentParser(description="A tool to spawn and analyse HPC jobs.")
-    parser.add_argument(
-        "yaml_path", type=Path, help="the path to the configuration YAML file"
-    )
-    mutex_group = parser.add_mutually_exclusive_group()
-    mutex_group.add_argument(
-        "-m",
-        "--mode",
-        type=Mode,  # TODO: This needs fixing to actually work...
-        default=Mode.RUN,
-        help="the mode to run the tool in (default: run)",
-    )
-    mutex_group.add_argument(
-        "-i",
-        "--interactive",
-        action="store_true",
-        help="show the interactive TUI",
-    )
-    return parser
-
-
-def main() -> None:  # pragma: no cover
-    """Run the tool."""
-    # We can set yaml_path to `Path("./yaml_examples/kudu_plan.yaml")`
-    args: Namespace = get_parser().parse_args()
-    test_plan = TestPlan.from_yaml(args.yaml_path)
-
-    if args.interactive:
-        UserInterface(test_plan).run()
-
-    else:
-        if args.mode in (Mode.RUN, Mode.ALL):
-            test_plan.run()
-
-        if args.mode == Mode.ALL:
-            pass  # wait_till_queue_empty()
-
-        if args.mode in (Mode.ANALYSE, Mode.ALL):
-            test_plan.analyse()
diff --git a/src_bak/tui/user_interface.py b/src_bak/tui/user_interface.py
deleted file mode 100755
index 2009385..0000000
--- a/src_bak/tui/user_interface.py
+++ /dev/null
@@ -1,240 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""The definition of the user interface."""
-
-from textual.app import App, ComposeResult
-from textual.containers import Container, Horizontal, Vertical
-from textual.screen import Screen
-from textual.timer import Timer
-from textual.widgets import (
-    Button,
-    DataTable,
-    Footer,
-    Header,
-    Label,
-    ProgressBar,
-    TabbedContent,
-    TabPane,
-    TextArea,
-    Tree,
-)
-from textual.widgets.tree import TreeNode
-from textual_plotext import PlotextPlot
-
-from hpc_multibench.yaml_model import BenchModel, RunConfigurationModel, TestPlan
-
-TestPlanTreeType = RunConfigurationModel | BenchModel
-
-PLOTEXT_MARKER = "braille"
-INITIAL_TAB = "run-tab"
-
-
-class TestPlanTree(Tree[TestPlanTreeType]):
-    """A tree showing the hierarchy of benches and runs in a test plan."""
-
-    def __init__(self, *args, **kwargs) -> None:
-        """Instantiate a tree representing a test plan."""
-        self.previous_cursor_node: TreeNode[TestPlanTreeType] | None = None
-        self._app: "UserInterface" = self.app
-        super().__init__(*args, **kwargs)
-
-    def populate(self) -> None:
-        """Populate the tree with data from the test plan."""
-        for bench_name, bench in self._app.test_plan.benches.items():
-            bench_node = self.root.add(bench_name, data=bench)
-            for run_configuration_name in bench.run_configurations:
-                run_configuration: RunConfigurationModel = (
-                    self._app.test_plan.run_configurations[run_configuration_name]
-                )
-                bench_node.add(
-                    run_configuration_name,
-                    allow_expand=False,
-                    data=run_configuration,
-                )
-            bench_node.expand()
-        self.root.expand()
-
-    def action_select_cursor(self) -> None:
-        """Pass the selection back and only toggle if already selected."""
-        if self.cursor_node is not None:
-            self._app.handle_tree_selection(self.cursor_node)
-            if self.cursor_node in (self.previous_cursor_node, self.root):
-                self.cursor_node.toggle()
-            self.previous_cursor_node = self.cursor_node
-
-
-class RunDialogScreen(Screen[None]):
-    """Screen with a dialog shown while queued jobs complete."""
-
-    progress_timer: Timer
-
-    def compose(self) -> ComposeResult:
-        """Compose the structure of the dialog screen."""
-        with Vertical(id="run-dialog"):
-            yield Label(
-                "Waiting for queued jobs to complete.\nYou can continue, but may need to reload once they are complete.",
-                id="run-dialog-message",
-            )
-            yield ProgressBar(id="run-dialog-progress")
-            yield Button("Continue", variant="primary", id="run-dialog-continue")
-
-    def on_mount(self) -> None:
-        """Set up a timer to simulate progress happening."""
-        self.progress_timer = self.set_interval(1 / 10, self.make_progress)
-        self.query_one(ProgressBar).update(total=100)
-
-    def make_progress(self) -> None:
-        """Called automatically to advance the progress bar."""
-        self.query_one(ProgressBar).advance(1)
-
-    def on_button_pressed(self, event: Button.Pressed) -> None:
-        """Dismiss the modal dialog when the continue button is pressed."""
-        if event.button.id == "run-dialog-continue":
-            self.app.pop_screen()
-
-
-class UserInterface(App[None]):
-    """The interactive TUI."""
-
-    CSS_PATH = "user_interface.tcss"
-    TITLE = "HPC MultiBench"
-    SUB_TITLE = "A Swiss army knife for comparing programs on HPC resources"
-
-    BINDINGS = [
-        ("q", "quit", "Quit"),
-        # TODO: Add button to reload test plan
-    ]
-
-    def __init__(self, test_plan: TestPlan, *args, **kwargs) -> None:
-        """Instantiate the user interface for a given test plan."""
-        self.test_plan: TestPlan = test_plan
-        self.start_pane_shown: bool = True
-        super().__init__(*args, **kwargs)
-
-    def compose(self) -> ComposeResult:
-        """Compose the structure of the application."""
-        yield Header()
-        with Horizontal():
-            # The navigation bar for the test plan
-            yield TestPlanTree(label="Test Plan", id="explorer")
-
-            # The starting pane that conceals the data pane when nothing selected
-            with Container(id="start-pane"):
-                yield Label("Select a benchmark or run to start", id="start-pane-label")
-
-            with TabbedContent(initial=INITIAL_TAB, id="informer"):
-                with TabPane("Run", id="run-tab"):
-                    yield DataTable(id="run-information")
-                    # TODO: Get bash language working
-                    yield TextArea(
-                        "echo hello",
-                        id="sbatch-contents",
-                        read_only=True,
-                        show_line_numbers=True,
-                    )
-                    yield Button("Run", id="run-button")
-                with TabPane("Metrics", id="metrics-tab"):
-                    yield DataTable(id="metrics-table")
-                with TabPane("Plot", id="plot-tab"):
-                    yield PlotextPlot(id="metrics-plot")
-        yield Footer()
-
-    def on_mount(self) -> None:
-        """Initialise data when the application is created."""
-        tree = self.query_one(TestPlanTree)
-        tree.populate()
-
-    def on_button_pressed(self, event: Button.Pressed) -> None:
-        """Show the run dialog when the run button is pressed."""
-        if event.button.id == "run-button":
-            self.push_screen(RunDialogScreen())
-
-    def remove_start_pane(self) -> None:
-        """Remove the start pane from the screen."""
-        if self.start_pane_shown:
-            self.query_one("#start-pane", Container).remove()
-            self.start_pane_shown = False
-
-    def update_run_tab(self, node: TreeNode[TestPlanTreeType]) -> None:
-        """Update the run tab for the currently selected tree node."""
-        run_information = self.query_one("#run-information", DataTable)
-        sbatch_contents = self.query_one("#sbatch-contents", TextArea)
-        run_information.clear(columns=True)
-
-        if isinstance(node.data, BenchModel):
-            # TODO: This is a slightly annoying hack - but it works...
-            sbatch_contents.visible = False
-            sbatch_contents.text = ""
-            matrix_iterator = node.data.matrix_iterator
-        else:
-            sbatch_contents.visible = True
-            sbatch_contents.text = node.data.realise(
-                "", str(node.label), {}
-            ).sbatch_contents
-
-            assert node.parent is not None
-            matrix_iterator = node.parent.data.matrix_iterator
-
-        next_values = next(matrix_iterator, None)
-        if next_values is not None:
-            run_information.add_columns(*next_values.keys())
-            run_information.add_row(*next_values.values())
-            for item in matrix_iterator:
-                run_information.add_row(*item.values())
-
-    def update_metrics_tab(self, node: TreeNode[TestPlanTreeType]) -> None:
-        """Update the metrics tab for the currently selected tree node."""
-        metrics_table = self.query_one("#metrics-table", DataTable)
-        metrics_table.clear(columns=True)
-        if isinstance(node.data, BenchModel):
-            column_names = ["Name", *list(node.data.analysis.metrics.keys())]
-            metrics_table.add_columns(*column_names)
-            for results in node.data.get_analysis(str(node.label)):
-                metrics_table.add_row(*results.values())
-            # TODO: Fix sorting
-            # metrics_table.sort("Name", node.data.analysis.plot.x)
-        else:
-            assert node.parent is not None
-            metrics_table.add_columns(*node.parent.data.analysis.metrics.keys())
-            for results in node.parent.data.get_analysis(str(node.parent.label)):
-                if results["name"] == str(node.label):
-                    metrics_table.add_row(
-                        *[value for key, value in results.items() if key != "name"]
-                    )
-            # metrics_table.sort(node.parent.data.analysis.plot.x)
-
-    def update_plot_tab(self, node: TreeNode[TestPlanTreeType]) -> None:
-        """Update the plot tab for the currently selected tree node."""
-        metrics_plot_widget = self.query_one("#metrics-plot", PlotextPlot)
-        metrics_plot = metrics_plot_widget.plt
-        metrics_plot.clear_figure()
-        metrics_plot.title("Benchmark analysis")
-        if isinstance(node.data, BenchModel):
-            # metrics_plot.plot(metrics_plot.sin())
-            for name, result in node.data.comparative_plot_results(
-                str(node.label)
-            ).items():
-                metrics_plot.plot(
-                    *zip(*result, strict=True), label=name, marker=PLOTEXT_MARKER
-                )
-        else:
-            assert node.parent is not None
-            for name, result in node.parent.data.comparative_plot_results(
-                str(node.parent.label)
-            ).items():
-                if name == str(node.label):
-                    metrics_plot.plot(
-                        *zip(*result, strict=True), label=name, marker=PLOTEXT_MARKER
-                    )
-        metrics_plot_widget.refresh()
-
-    def handle_tree_selection(self, node: TreeNode[TestPlanTreeType]) -> None:
-        """Update the data panes when a tree node is selected."""
-        if node == self.query_one(TestPlanTree).root:
-            return
-
-        self.remove_start_pane()
-
-        self.update_run_tab(node)
-        self.update_metrics_tab(node)
-        self.update_plot_tab(node)
diff --git a/src_bak/tui/user_interface.tcss b/src_bak/tui/user_interface.tcss
deleted file mode 100644
index f0bad80..0000000
--- a/src_bak/tui/user_interface.tcss
+++ /dev/null
@@ -1,56 +0,0 @@
-#explorer {
-    width: 25%;
-    height: 100%;
-}
-#informer {
-    width: 75%;
-    height: 100%;
-}
-
-#start-pane {
-    align: center middle;
-    width: 75%;
-    height: 100%;
-}
-#start-pane-label {
-    text-align: center;
-    content-align: center middle;
-}
-
-#sbatch-contents {
-    margin: 1 0 0 0;
-}
-#run-button {
-    align: center bottom;
-    margin: 1 0 0 0;
-    width: 100%;
-}
-
-
-
-RunDialogScreen {
-    align: center middle;
-}
-
-#run-dialog {
-    padding: 0 1;
-    width: 60;
-    height: 11;
-    border: thick $background 80%;
-    background: $surface;
-    content-align: center middle;
-}
-#run-dialog-message {
-    width: 1fr;
-    text-align: center;
-    content-align: center middle;
-}
-#run-dialog-progress {
-    margin: 2;
-    text-align: center;
-    content-align: center middle;
-}
-#run-dialog-continue {
-    align: center bottom;
-    content-align: center middle;
-}
diff --git a/src_bak/yaml_model.py b/src_bak/yaml_model.py
deleted file mode 100755
index 302c561..0000000
--- a/src_bak/yaml_model.py
+++ /dev/null
@@ -1,219 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-A set of objects modelling the YAML schema.
-
-Sample code for adding defaults:
-```python
-class Defaults(BaseModel):
-    sbatch_config: Optional[list[str]] = None
-    module_loads: Optional[list[str]] = None
-    environment_variables: Optional[list[str]] = None
-    directory: Optional[Path] = None
-    build_commands: Optional[list[str]] = None
-    run_commands: Optional[list[str]] = None
-    args: Optional[str] = None
-```
-"""
-
-from collections.abc import Iterator
-from itertools import product
-from pathlib import Path
-from re import search as re_search
-from typing import Any
-
-import matplotlib.pyplot as plt
-from pydantic import BaseModel
-from ruamel.yaml import YAML
-from typing_extensions import Self
-
-from hpc_multibench.configuration import RunConfiguration
-
-BASE_OUTPUT_DIRECTORY = Path("results/")
-NAME_REGEX = r"===== RUN (.*) ====="
-
-
-class RunConfigurationModel(BaseModel):
-    """A Pydantic model for an executable."""
-
-    sbatch_config: dict[str, Any]
-    module_loads: list[str]
-    environment_variables: dict[str, Any]
-    directory: Path
-    build_commands: list[str]
-    run_command: str
-    args: str | None = None
-
-    def realise(
-        self, bench_name: str, run_configuration_name: str, variables: dict[str, Any]
-    ) -> RunConfiguration:
-        """Construct a run configuration from its data model."""
-        # Get the output file path
-        output_file_name = RunConfiguration.get_output_file_name(
-            run_configuration_name, variables
-        )
-        output_file = BASE_OUTPUT_DIRECTORY / bench_name / output_file_name
-
-        # TODO: Modify contents based on variables keys here
-
-        run = RunConfiguration(run_configuration_name, self.run_command, output_file)
-        run.sbatch_config = self.sbatch_config
-        run.module_loads = self.module_loads
-        run.environment_variables = self.environment_variables
-        run.directory = Path(self.directory)
-        run.build_commands = self.build_commands
-        run.args = self.args
-
-        # TODO: Fix this to work for more things than args...
-        for key, value in variables.items():
-            # TODO: Error checking on keys
-            setattr(run, key, value)
-            # if key == "args":
-            #     run.args = value
-
-        return run
-
-
-class PlotModel(BaseModel):
-    """A Pydantic model for plotting two variables."""
-
-    x: str
-    y: str
-
-
-class AnalysisModel(BaseModel):
-    """A Pydantic model for a test bench's analysis."""
-
-    metrics: dict[str, str]
-    plot: PlotModel
-
-    def parse_output_file(self, output_file: Path) -> dict[str, str] | None:
-        """Parse a run's output file into its name and metric values."""
-        run_output = output_file.read_text(encoding="utf-8")
-        results: dict[str, str] = {}
-        name_search = re_search(NAME_REGEX, run_output)
-        if name_search is None:
-            return None
-        results["name"] = name_search.group(1)
-        for name, regex in self.metrics.items():
-            metric_search = re_search(regex, run_output)
-            if metric_search is None:
-                return None
-            # TODO: Support multiple groups by lists as keys?
-            results[name] = metric_search.group(1)
-        return results
-
-
-class BenchModel(BaseModel):
-    """A Pydantic model for a test bench."""
-
-    run_configurations: list[str]
-    # NOTE: Dict insertion order is preserved, which keeps the matrix ordering stable
-    matrix: dict[str | tuple[str, ...], list[Any]]
-    analysis: AnalysisModel
-
-    def get_runs(
-        self, bench_name: str, run_configurations: dict[str, RunConfigurationModel]
-    ) -> Iterator[RunConfiguration]:
-        """Realise the bench's run configurations over its test matrix."""
-        for variables in self.matrix_iterator:
-            for run_configuration_name in self.run_configurations:
-                if run_configuration_name not in run_configurations:
-                    raise RuntimeError(
-                        f"'{run_configuration_name}' not in list of"
-                        " defined run configurations!"
-                    )
-
-                yield run_configurations[run_configuration_name].realise(
-                    bench_name, run_configuration_name, variables
-                )
-
-    def get_analysis(self, bench_name: str) -> Iterator[dict[str, str]]:
-        """Parse the output files in the bench's output directory."""
-        output_directory = BASE_OUTPUT_DIRECTORY / bench_name
-        for output_file in output_directory.iterdir():
-            results = self.analysis.parse_output_file(output_file)
-            if results is not None:
-                yield results
-
-    def comparative_plot_results(
-        self, bench_name: str
-    ) -> dict[str, list[tuple[float, float]]]:
-        """Collect the sorted (x, y) plot points for each run configuration."""
-        results: dict[str, list[tuple[float, float]]] = {}
-        for result in self.get_analysis(bench_name):
-            if result["name"] not in results:
-                results[result["name"]] = []
-            point = (
-                float(result[self.analysis.plot.x]),
-                float(result[self.analysis.plot.y]),
-            )
-            results[result["name"]].append(point)
-        return {name: sorted(value) for name, value in results.items()}
-
-    def plot(self, bench_name: str) -> None:
-        """Plot the bench's comparative results with matplotlib."""
-        results = self.comparative_plot_results(bench_name)
-        for name, result in results.items():
-            print(name, result)
-            plt.plot(*zip(*result, strict=True), marker="x", label=name)
-        plt.xlabel(self.analysis.plot.x)
-        plt.ylabel(self.analysis.plot.y)
-        plt.title("Benchmark analysis")
-        plt.legend()
-        plt.show()
-
-    @property
-    def matrix_iterator(self) -> Iterator[dict[str, Any]]:
-        """Get an iterator of values to update from the test matrix."""
-        # TODO: How does a property play with iterators?
-        shaped: list[list[list[tuple[str, Any]]]] = [
-            (
-                [[(key, value)] for value in values]
-                if isinstance(key, str)
-                else [
-                    [(k, v) for k, v in zip(key, setting, strict=True)]
-                    for setting in values
-                ]
-            )
-            for key, values in self.matrix.items()
-        ]
-        for combination in product(*shaped):
-            # # Consider the case
-            # # [sbatch_config, sbatch_config]:
-            # #   - [{"nodes": 2}, {"mem-per-cpu": 1000}]
-            # #
-            # variables = {}
-            # for items in combination:
-            #     for item in items:
-            #         if item[0] in variables and isinstance(item[1], dict):
-            #             variables[item[0]].update(item[1])
-            #         else:
-            #             variables[item[0]] = item[1]
-            # yield variables
-            yield {item[0]: item[1] for items in combination for item in items}
-
-
-class TestPlan(BaseModel):
-    """A Pydantic model for a set of test benches and their executables."""
-
-    run_configurations: dict[str, RunConfigurationModel]
-    benches: dict[str, BenchModel]
-
-    @classmethod
-    def from_yaml(cls, file: Path) -> Self:
-        """Construct the model from a YAML file."""
-        with file.open(encoding="utf-8") as handle:
-            return cls(**YAML(typ="safe").load(handle))
-
-    def run(self) -> None:
-        """Run all of the test benches in the plan."""
-        for bench_name, bench in self.benches.items():
-            for run in bench.get_runs(bench_name, self.run_configurations):
-                print(run)
-                # Run the configurations with `run.run()`
-
-    def analyse(self) -> None:
-        """Analyse all of the test benches in the plan."""
-        for bench_name, bench in self.benches.items():
-            bench.plot(bench_name)
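
Note for reviewers: the deleted yaml_model.py above defines the schema for the tool's YAML test plans. For reference while reviewing, a minimal plan accepted by those Pydantic models would look roughly like the sketch below. This is inferred from the model fields only; every concrete value (names, modules, regexes, paths) is hypothetical and not taken from the repository.

```yaml
# A hypothetical minimal test plan matching TestPlan / RunConfigurationModel / BenchModel
run_configurations:
  example-run:                                  # name referenced by benches below
    sbatch_config: {nodes: 1}                   # rendered as "#SBATCH --nodes=1"
    module_loads: []
    environment_variables: {OMP_NUM_THREADS: 4} # rendered as "export KEY=value"
    directory: ./example
    build_commands: [make]
    run_command: ./example
    args: null                                  # overridden per matrix entry

benches:
  example-bench:
    run_configurations: [example-run]
    matrix:
      args: ["-n 128", "-n 256"]                # each value is set on the realised run
    analysis:
      metrics:                                  # one capture group per regex
        "Threads": "Running with (\\d+) threads"
        "Wall time (s)": "Wall time: ([0-9.]+)"
      plot:                                     # x and y must name metrics above
        x: "Threads"
        y: "Wall time (s)"
```

With a plan like this, `RunConfiguration.get_output_file_name` would write job output under `results/example-bench/`, and `AnalysisModel.parse_output_file` would recover one (x, y) point per run for the comparative plot.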