From ba8c3db9a2d767e91a0c66f43c94ea8dcad246af Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Wed, 22 May 2024 16:29:19 -0400 Subject: [PATCH 01/26] first pass at creating pipestat config and associating psms with pifaces --- looper/__init__.py | 1 + looper/cli_pydantic.py | 4 ++ looper/conductor.py | 7 ++ looper/plugins.py | 9 +++ looper/project.py | 122 +++++++++++++++++++++++++++++++++++ tests/smoketests/test_run.py | 29 +++++++++ 6 files changed, 172 insertions(+) diff --git a/looper/__init__.py b/looper/__init__.py index fe751d02d..5db46a828 100644 --- a/looper/__init__.py +++ b/looper/__init__.py @@ -25,6 +25,7 @@ write_sample_yaml_cwl, write_sample_yaml_prj, write_custom_template, + # write_local_pipestat_config, ) from .const import * from .pipeline_interface import PipelineInterface diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index 924a6c7eb..f25e6ebd5 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -241,6 +241,10 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None): selector_flag=subcommand_args.sel_flag, exclusion_flag=subcommand_args.exc_flag, ) as prj: + + # Check at the beginning if user wants to use pipestat and pipestat is configurable + is_pipestat_configured = prj._check_if_pipestat_configured_2() + if subcommand_name in ["run", "rerun"]: rerun = subcommand_name == "rerun" run = Runner(prj) diff --git a/looper/conductor.py b/looper/conductor.py index 0ebf554b9..84ee24963 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -90,6 +90,13 @@ def write_pipestat_config(looper_pipestat_config_path, pipestat_config_dict): :param dict pipestat_config_dict: the dict containing key value pairs to be written to the pipestat configutation return bool """ + + if not os.path.exists(os.path.dirname(looper_pipestat_config_path)): + try: + os.makedirs(os.path.dirname(looper_pipestat_config_path)) + except FileExistsError: + pass + with open(looper_pipestat_config_path, "w") as f: yaml.dump(pipestat_config_dict, f) _LOGGER.debug( diff --git a/looper/plugins.py b/looper/plugins.py index dc34283e0..b0c4a246c 100644 --- a/looper/plugins.py +++ b/looper/plugins.py @@ -158,3 +158,12 @@ def write_sample_yaml(namespaces): ) sample.to_yaml(sample["sample_yaml_path"], add_prj_ref=False) return {"sample": sample} + + +# def write_local_pipestat_config(namespaces): +# +# config_path = "" +# +# print(config_path) +# +# return config_path diff --git a/looper/project.py b/looper/project.py index fba207e9c..1df8f447d 100644 --- a/looper/project.py +++ b/looper/project.py @@ -468,6 +468,128 @@ def get_pipestat_managers(self, sample_name=None, project_level=False): for pipeline_name, pipestat_vars in pipestat_configs.items() } + def _check_if_pipestat_configured_2(self): + + # First check if pipestat key is in looper_config, if not return false + + if PIPESTAT_KEY not in self[EXTRA_KEY]: + return False + else: + # If pipestat key is available assume user desires pipestat usage + # This should return True OR raise an exception at this point. + return self._get_pipestat_configuration2() + + def _get_pipestat_configuration2(self): + + # First check if it already exists + print("DEBUG!") + + for piface in self.pipeline_interfaces: + print(piface) + # first check if this piface has a psm? + + if not self._check_for_existing_pipestat_config(piface): + self._create_pipestat_config(piface) + + return True + + def _check_for_existing_pipestat_config(self, piface): + """ + + config files should be in looper output directory and named as: + + pipestat_config_pipelinename.yaml + + """ + + # Cannot do much if we cannot retrieve the pipeline_name + try: + pipeline_name = piface.data["pipeline_name"] + except KeyError: + raise Exception( + "To use pipestat, a pipeline_name must be set in the pipeline interface." + ) + + config_file_name = f"pipestat_config_{pipeline_name}.yaml" + output_dir = expandpath(self.output_dir) + + config_file_path = os.path.join( + # os.path.dirname(output_dir), config_file_name + output_dir, + config_file_name, + ) + + return os.path.exists(config_file_path) + + def _create_pipestat_config(self, piface): + """ + Each piface needs its own config file and associated psm + """ + + if PIPESTAT_KEY in self[EXTRA_KEY]: + pipestat_config_dict = self[EXTRA_KEY][PIPESTAT_KEY] + else: + _LOGGER.debug( + f"'{PIPESTAT_KEY}' not found in '{LOOPER_KEY}' section of the " + f"project configuration file." + ) + # We cannot use pipestat without it being defined in the looper config file. + raise ValueError + + # Expand paths in the event ENV variables were used in config files + output_dir = expandpath(self.output_dir) + + pipestat_config_dict.update({"output_dir": output_dir}) + + if "output_schema" in piface.data: + schema_path = expandpath(piface.data["output_schema"]) + if not os.path.isabs(schema_path): + # Get path relative to the pipeline_interface + schema_path = os.path.join( + os.path.dirname(piface.pipe_iface_file), schema_path + ) + pipestat_config_dict.update({"schema_path": schema_path}) + if "pipeline_name" in piface.data: + pipeline_name = piface.data["pipeline_name"] + pipestat_config_dict.update({"pipeline_name": piface.data["pipeline_name"]}) + if "pipeline_type" in piface.data: + pipestat_config_dict.update({"pipeline_type": piface.data["pipeline_type"]}) + + try: + # TODO if user gives non-absolute path should we force results to be in a pipeline folder? + # TODO otherwise pipelines could write to the same results file! + results_file_path = expandpath(pipestat_config_dict["results_file_path"]) + if not os.path.exists(os.path.dirname(results_file_path)): + results_file_path = os.path.join( + os.path.dirname(output_dir), results_file_path + ) + pipestat_config_dict.update({"results_file_path": results_file_path}) + except KeyError: + results_file_path = None + + try: + flag_file_dir = expandpath(pipestat_config_dict["flag_file_dir"]) + if not os.path.isabs(flag_file_dir): + flag_file_dir = os.path.join(os.path.dirname(output_dir), flag_file_dir) + pipestat_config_dict.update({"flag_file_dir": flag_file_dir}) + except KeyError: + flag_file_dir = None + + # Pipestat_dict_ is now updated from all sources and can be written to a yaml. + pipestat_config_path = os.path.join( + # os.path.dirname(output_dir), f"pipestat_config_{pipeline_name}.yaml" + output_dir, + f"pipestat_config_{pipeline_name}.yaml", + ) + + # Two end goals, create a config file + write_pipestat_config(pipestat_config_path, pipestat_config_dict) + + # piface['psm'] = PipestatManager(config_file=pipestat_config_path) + piface.psm = PipestatManager(config_file=pipestat_config_path) + + return None + def _check_if_pipestat_configured(self, project_level=False): """ A helper method determining whether pipestat configuration is complete diff --git a/tests/smoketests/test_run.py b/tests/smoketests/test_run.py index 05231f594..a94fa9b8a 100644 --- a/tests/smoketests/test_run.py +++ b/tests/smoketests/test_run.py @@ -443,6 +443,35 @@ def test_looper_command_templates_hooks(self, prep_temp_pep, cmd): sd = os.path.join(get_outdir(tp), "submission") verify_filecount_in_dir(sd, "test.txt", 3) + # @pytest.mark.parametrize( + # "plugin,appendix", + # [ + # ("looper.write_local_pipestat_config", "submission.yaml"), + # ], + # ) + # def test_looper_pipestat_plugins(self, prep_temp_pep_pipestat, plugin, appendix): + # # tp = prep_temp_pep + # tp = prep_temp_pep_pipestat + # pep_dir = os.path.dirname(tp) + # pipeline_interface1 = os.path.join( + # pep_dir, "pipeline_pipestat/pipeline_interface.yaml" + # ) + # + # with mod_yaml_data(pipeline_interface1) as piface_data: + # piface_data.update({PRE_SUBMIT_HOOK_KEY: {}}) + # piface_data[PRE_SUBMIT_HOOK_KEY].update({PRE_SUBMIT_PY_FUN_KEY: {}}) + # piface_data[PRE_SUBMIT_HOOK_KEY][PRE_SUBMIT_PY_FUN_KEY] = [plugin] + # + # # x = test_args_expansion(tp, "run") + # x = ["run", "--looper-config", tp, "--dry-run"] + # # x.pop(-1) + # try: + # main(test_args=x) + # except Exception as err: + # raise pytest.fail(f"DID RAISE {err}") + # sd = os.path.join(get_outdir(tp), "submission") + # verify_filecount_in_dir(sd, appendix, 3) + class TestLooperRunSubmissionScript: def test_looper_run_produces_submission_scripts(self, prep_temp_pep): From 1ce86fb143d45a8e9eafc4483eff83fe0c4c94f7 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 10:37:51 -0400 Subject: [PATCH 02/26] fix bug with pipestat key present but value being None --- looper/project.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/looper/project.py b/looper/project.py index 1df8f447d..132d35bb0 100644 --- a/looper/project.py +++ b/looper/project.py @@ -474,10 +474,13 @@ def _check_if_pipestat_configured_2(self): if PIPESTAT_KEY not in self[EXTRA_KEY]: return False - else: - # If pipestat key is available assume user desires pipestat usage - # This should return True OR raise an exception at this point. - return self._get_pipestat_configuration2() + elif PIPESTAT_KEY in self[EXTRA_KEY]: + if self[EXTRA_KEY][PIPESTAT_KEY] is None: + return False + else: + # If pipestat key is available assume user desires pipestat usage + # This should return True OR raise an exception at this point. + return self._get_pipestat_configuration2() def _get_pipestat_configuration2(self): From 8040e299c5c763d273cee4f648d6d13836868fbb Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 10:48:47 -0400 Subject: [PATCH 03/26] use_pipestat = is_pipestat_configured --- looper/cli_pydantic.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index f25e6ebd5..cb7e9e90e 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -275,11 +275,13 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None): if subcommand_name == "destroy": return Destroyer(prj)(subcommand_args) - use_pipestat = ( - prj.pipestat_configured_project - if getattr(subcommand_args, "project", None) - else prj.pipestat_configured - ) + # use_pipestat = ( + # prj.pipestat_configured_project + # if getattr(subcommand_args, "project", None) + # else prj.pipestat_configured + # ) + + use_pipestat = is_pipestat_configured if subcommand_name == "table": if use_pipestat: From 396ea40f61b00ecd935dfc6cb0390ca70906abaa Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 11:45:10 -0400 Subject: [PATCH 04/26] ensure a pipestatmanager is created if a config file does already exist --- looper/conductor.py | 6 +++--- looper/looper.py | 6 ++---- looper/project.py | 13 ++++++++++--- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/looper/conductor.py b/looper/conductor.py index 84ee24963..813270a18 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -300,12 +300,12 @@ def add_sample(self, sample, rerun=False): ) ) if self.prj.pipestat_configured: - psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name) - sample_statuses = psms[self.pl_name].get_status( + # psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name) + sample_statuses = self.pl_iface.psm.get_status( record_identifier=sample.sample_name ) if sample_statuses == "failed" and rerun is True: - psms[self.pl_name].set_status( + self.pl_iface.psm.set_status( record_identifier=sample.sample_name, status_identifier="waiting" ) sample_statuses = "waiting" diff --git a/looper/looper.py b/looper/looper.py index b044ef1d9..bb5508f93 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -423,10 +423,8 @@ def __call__(self, args, top_level_args=None, rerun=False, **compute_kwargs): ) submission_conductors[piface.pipe_iface_file] = conductor - _LOGGER.info(f"Pipestat compatible: {self.prj.pipestat_configured_project}") - self.debug["Pipestat compatible"] = ( - self.prj.pipestat_configured_project or self.prj.pipestat_configured - ) + _LOGGER.info(f"Pipestat compatible: {self.prj.pipestat_configured}") + self.debug["Pipestat compatible"] = self.prj.pipestat_configured for sample in select_samples(prj=self.prj, args=args): pl_fails = [] diff --git a/looper/project.py b/looper/project.py index 132d35bb0..3b4ba1224 100644 --- a/looper/project.py +++ b/looper/project.py @@ -340,7 +340,7 @@ def pipestat_configured(self): :return bool: whether pipestat configuration is complete """ - return self._check_if_pipestat_configured() + return self._check_if_pipestat_configured_2() @cached_property def pipestat_configured_project(self): @@ -491,8 +491,12 @@ def _get_pipestat_configuration2(self): print(piface) # first check if this piface has a psm? - if not self._check_for_existing_pipestat_config(piface): + pipestat_config_path = self._check_for_existing_pipestat_config(piface) + + if not pipestat_config_path: self._create_pipestat_config(piface) + else: + piface.psm = PipestatManager(config_file=pipestat_config_path) return True @@ -522,7 +526,10 @@ def _check_for_existing_pipestat_config(self, piface): config_file_name, ) - return os.path.exists(config_file_path) + if os.path.exists(config_file_path): + return config_file_path + else: + return None def _create_pipestat_config(self, piface): """ From 970f18be882ec30617c06844dba03851a6de5947 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 12:11:24 -0400 Subject: [PATCH 05/26] modify comprehensive tests to have correct paths --- tests/test_comprehensive.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py index 5026eb538..65566d578 100644 --- a/tests/test_comprehensive.py +++ b/tests/test_comprehensive.py @@ -66,6 +66,8 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): # open up the project config and replace the derived attributes with the path to the data. In a way, this simulates using the environment variables. pipestat_project_file = get_project_config_path(path_to_looper_config) + pipestat_pipeline_interface_file = os.path.join(pipestat_dir, "pipeline_pipestat/pipeline_interface.yaml") + with open(pipestat_project_file, "r") as f: pipestat_project_data = safe_load(f) @@ -73,6 +75,11 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): os.path.join(pipestat_dir, "data/{sample_name}.txt") ) + with open(pipestat_pipeline_interface_file, "r") as f: + pipestat_piface_data = safe_load(f) + + pipeline_name = pipestat_piface_data["pipeline_name"] + with open(pipestat_project_file, "w") as f: dump(pipestat_project_data, f) @@ -92,7 +99,8 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): # looper cannot create flags, the pipeline or pipestat does that # if you do not specify flag dir, pipestat places them in the same dir as config file - path_to_pipestat_config = os.path.join(pipestat_dir, "looper_pipestat_config.yaml") + path_to_pipestat_config = os.path.join(pipestat_dir, f"results/pipestat_config_{pipeline_name}.yaml") + # pipestat_config_example_pipestat_pipeline.yaml psm = PipestatManager(config_file=path_to_pipestat_config) psm.set_status(record_identifier="frog_1", status_identifier="completed") psm.set_status(record_identifier="frog_2", status_identifier="completed") From 99c2e24c703dcb47b0cd762eb3659ce05689e5fe Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 13:36:22 -0400 Subject: [PATCH 06/26] refactor Checker code --- looper/looper.py | 55 +++++++++++++++++++++++++------------ tests/test_comprehensive.py | 10 +++++-- 2 files changed, 45 insertions(+), 20 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index bb5508f93..9788a45d0 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -88,21 +88,41 @@ def __call__(self, args): # aggregate pipeline status data status = {} + + psms = {} if getattr(args, "project", None): - psms = self.prj.get_pipestat_managers(project_level=True) - for pipeline_name, psm in psms.items(): - s = psm.get_status() or "unknown" - status.setdefault(pipeline_name, {}) - status[pipeline_name][self.prj.name] = s - _LOGGER.debug(f"{self.prj.name} ({pipeline_name}): {s}") + # psms = self.prj.get_pipestat_managers(project_level=True) + # for pipeline_name, psm in psms.items(): + # s = psm.get_status() or "unknown" + # status.setdefault(pipeline_name, {}) + # status[pipeline_name][self.prj.name] = s + # _LOGGER.debug(f"{self.prj.name} ({pipeline_name}): {s}") + + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "project": + psms[piface.psm.pipeline_name] = piface.psm + s = piface.psm.get_status() or "unknown" + status.setdefault(piface.psm.pipeline_name, {}) + status[piface.psm.pipeline_name][self.prj.name] = s + _LOGGER.debug(f"{self.prj.name} ({piface.psm.pipeline_name}): {s}") + else: for sample in self.prj.samples: - psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name) - for pipeline_name, psm in psms.items(): - s = psm.get_status(record_identifier=sample.sample_name) - status.setdefault(pipeline_name, {}) - status[pipeline_name][sample.sample_name] = s - _LOGGER.debug(f"{sample.sample_name} ({pipeline_name}): {s}") + for piface in sample.project.pipeline_interfaces: + if piface.psm.pipeline_type == "sample": + psms[piface.psm.pipeline_name] = piface.psm + s = piface.psm.get_status(record_identifier=sample.sample_name) + status.setdefault(piface.psm.pipeline_name, {}) + status[piface.psm.pipeline_name][sample.sample_name] = s + _LOGGER.debug( + f"{sample.sample_name} ({piface.psm.pipeline_name}): {s}" + ) + # psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name) + # for pipeline_name, psm in psms.items(): + # s = psm.get_status(record_identifier=sample.sample_name) + # status.setdefault(pipeline_name, {}) + # status[pipeline_name][sample.sample_name] = s + # _LOGGER.debug(f"{sample.sample_name} ({pipeline_name}): {s}") console = Console() @@ -116,7 +136,7 @@ def __call__(self, args): ) table.add_column(f"Status", justify="center") table.add_column("Jobs count/total jobs", justify="center") - for status_id in psm.status_schema.keys(): + for status_id in psms[pipeline_name].status_schema.keys(): status_list = list(pipeline_status.values()) if status_id in status_list: status_count = status_list.count(status_id) @@ -141,7 +161,7 @@ def __call__(self, args): for name, status_id in pipeline_status.items(): try: color = Color.from_rgb( - *psm.status_schema[status_id]["color"] + *psms[pipeline_name].status_schema[status_id]["color"] ).name except KeyError: color = "#bcbcbc" @@ -150,16 +170,17 @@ def __call__(self, args): console.print(table) if args.describe_codes: + # TODO this needs to be redone because it only takes the last psm in the list and gets status code and descriptions table = Table( show_header=True, header_style="bold magenta", title=f"Status codes description", - width=len(psm.status_schema_source) + 20, - caption=f"Descriptions source: {psm.status_schema_source}", + width=len(psms[pipeline_name].status_schema_source) + 20, + caption=f"Descriptions source: {psms[pipeline_name].status_schema_source}", ) table.add_column("Status code", justify="center") table.add_column("Description", justify="left") - for status, status_obj in psm.status_schema.items(): + for status, status_obj in psms[pipeline_name].status_schema.items(): if "description" in status_obj: desc = status_obj["description"] else: diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py index 65566d578..eb22fb608 100644 --- a/tests/test_comprehensive.py +++ b/tests/test_comprehensive.py @@ -66,7 +66,9 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): # open up the project config and replace the derived attributes with the path to the data. In a way, this simulates using the environment variables. pipestat_project_file = get_project_config_path(path_to_looper_config) - pipestat_pipeline_interface_file = os.path.join(pipestat_dir, "pipeline_pipestat/pipeline_interface.yaml") + pipestat_pipeline_interface_file = os.path.join( + pipestat_dir, "pipeline_pipestat/pipeline_interface.yaml" + ) with open(pipestat_project_file, "r") as f: pipestat_project_data = safe_load(f) @@ -78,7 +80,7 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): with open(pipestat_pipeline_interface_file, "r") as f: pipestat_piface_data = safe_load(f) - pipeline_name = pipestat_piface_data["pipeline_name"] + pipeline_name = pipestat_piface_data["pipeline_name"] with open(pipestat_project_file, "w") as f: dump(pipestat_project_data, f) @@ -99,7 +101,9 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): # looper cannot create flags, the pipeline or pipestat does that # if you do not specify flag dir, pipestat places them in the same dir as config file - path_to_pipestat_config = os.path.join(pipestat_dir, f"results/pipestat_config_{pipeline_name}.yaml") + path_to_pipestat_config = os.path.join( + pipestat_dir, f"results/pipestat_config_{pipeline_name}.yaml" + ) # pipestat_config_example_pipestat_pipeline.yaml psm = PipestatManager(config_file=path_to_pipestat_config) psm.set_status(record_identifier="frog_1", status_identifier="completed") From f54f85daaec6286e528e285b60638d4ff6ca8f37 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 14:02:42 -0400 Subject: [PATCH 07/26] refactor Reporter code --- looper/looper.py | 58 +++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index 9788a45d0..734eb1c48 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -583,35 +583,53 @@ def __call__(self, args): portable = args.portable + psms = {} + if project_level: - psms = self.prj.get_pipestat_managers(project_level=True) - print(psms) - for name, psm in psms.items(): - # Summarize will generate the static HTML Report Function - report_directory = psm.summarize( - looper_samples=self.prj.samples, portable=portable - ) + # psms = self.prj.get_pipestat_managers(project_level=True) + # print(psms) + # for name, psm in psms.items(): + # # Summarize will generate the static HTML Report Function + # report_directory = psm.summarize( + # looper_samples=self.prj.samples, portable=portable + # ) + + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "project": + psms[piface.psm.pipeline_name] = piface.psm + report_directory = piface.psm.summarize( + looper_samples=self.prj.samples, portable=portable + ) print(f"Report directory: {report_directory}") self.debug["report_directory"] = report_directory return self.debug else: - for piface_source_samples in self.prj._samples_by_piface( - self.prj.piface_key - ).values(): - # For each piface_key, we have a list of samples, but we only need one sample from the list to - # call the related pipestat manager object which will pull ALL samples when using psm.summarize - first_sample_name = list(piface_source_samples)[0] - psms = self.prj.get_pipestat_managers( - sample_name=first_sample_name, project_level=False - ) - print(psms) - for name, psm in psms.items(): - # Summarize will generate the static HTML Report Function - report_directory = psm.summarize( + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "sample": + psms[piface.psm.pipeline_name] = piface.psm + report_directory = piface.psm.summarize( looper_samples=self.prj.samples, portable=portable ) print(f"Report directory: {report_directory}") self.debug["report_directory"] = report_directory + + # for piface_source_samples in self.prj._samples_by_piface( + # self.prj.piface_key + # ).values(): + # # For each piface_key, we have a list of samples, but we only need one sample from the list to + # # call the related pipestat manager object which will pull ALL samples when using psm.summarize + # first_sample_name = list(piface_source_samples)[0] + # psms = self.prj.get_pipestat_managers( + # sample_name=first_sample_name, project_level=False + # ) + # print(psms) + # for name, psm in psms.items(): + # # Summarize will generate the static HTML Report Function + # report_directory = psm.summarize( + # looper_samples=self.prj.samples, portable=portable + # ) + # print(f"Report directory: {report_directory}") + # self.debug["report_directory"] = report_directory return self.debug From 946b408f319e2472e5c41dab6355b389862618aa Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 14:17:22 -0400 Subject: [PATCH 08/26] refactor Linker and Tabulator --- looper/looper.py | 79 ++++++++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index 734eb1c48..fa8785ff2 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -642,23 +642,35 @@ def __call__(self, args): project_level = getattr(args, "project", None) link_dir = getattr(args, "output_dir", None) + psms = {} + if project_level: - psms = self.prj.get_pipestat_managers(project_level=True) - for name, psm in psms.items(): - linked_results_path = psm.link(link_dir=link_dir) - print(f"Linked directory: {linked_results_path}") + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "project": + psms[piface.psm.pipeline_name] = piface.psm + linked_results_path = piface.psm.link(link_dir=link_dir) + print(f"Linked directory: {linked_results_path}") + # psms = self.prj.get_pipestat_managers(project_level=True) + # for name, psm in psms.items(): + # linked_results_path = psm.link(link_dir=link_dir) + # print(f"Linked directory: {linked_results_path}") else: - for piface_source_samples in self.prj._samples_by_piface( - self.prj.piface_key - ).values(): - # For each piface_key, we have a list of samples, but we only need one sample from the list to - # call the related pipestat manager object which will pull ALL samples when using psm.summarize - first_sample_name = list(piface_source_samples)[0] - psms = self.prj.get_pipestat_managers( - sample_name=first_sample_name, project_level=False - ) - for name, psm in psms.items(): - linked_results_path = psm.link(link_dir=link_dir) + # for piface_source_samples in self.prj._samples_by_piface( + # self.prj.piface_key + # ).values(): + # # For each piface_key, we have a list of samples, but we only need one sample from the list to + # # call the related pipestat manager object which will pull ALL samples when using psm.summarize + # first_sample_name = list(piface_source_samples)[0] + # psms = self.prj.get_pipestat_managers( + # sample_name=first_sample_name, project_level=False + # ) + # for name, psm in psms.items(): + # linked_results_path = psm.link(link_dir=link_dir) + # print(f"Linked directory: {linked_results_path}") + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "sample": + psms[piface.psm.pipeline_name] = piface.psm + linked_results_path = piface.psm.link(link_dir=link_dir) print(f"Linked directory: {linked_results_path}") @@ -672,22 +684,31 @@ def __call__(self, args): # p = self.prj project_level = getattr(args, "project", None) results = [] + psms = {} if project_level: - psms = self.prj.get_pipestat_managers(project_level=True) - for name, psm in psms.items(): - results = psm.table() + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "project": + psms[piface.psm.pipeline_name] = piface.psm + results = piface.psm.table() + # psms = self.prj.get_pipestat_managers(project_level=True) + # for name, psm in psms.items(): + # results = psm.table() else: - for piface_source_samples in self.prj._samples_by_piface( - self.prj.piface_key - ).values(): - # For each piface_key, we have a list of samples, but we only need one sample from the list to - # call the related pipestat manager object which will pull ALL samples when using psm.table - first_sample_name = list(piface_source_samples)[0] - psms = self.prj.get_pipestat_managers( - sample_name=first_sample_name, project_level=False - ) - for name, psm in psms.items(): - results = psm.table() + # for piface_source_samples in self.prj._samples_by_piface( + # self.prj.piface_key + # ).values(): + # # For each piface_key, we have a list of samples, but we only need one sample from the list to + # # call the related pipestat manager object which will pull ALL samples when using psm.table + # first_sample_name = list(piface_source_samples)[0] + # psms = self.prj.get_pipestat_managers( + # sample_name=first_sample_name, project_level=False + # ) + # for name, psm in psms.items(): + # results = psm.table() + for piface in self.prj.pipeline_interfaces: + if piface.psm.pipeline_type == "sample": + psms[piface.psm.pipeline_name] = piface.psm + results = piface.psm.table() # Results contains paths to stats and object summaries. return results From 9989c9ff893e44a088a7d431633e7d278db697e3 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 14:24:52 -0400 Subject: [PATCH 09/26] refactor Destroyer --- looper/looper.py | 67 ++++++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index fa8785ff2..3145c90f6 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -263,7 +263,7 @@ def __call__(self, args, preview_flag=True): """ use_pipestat = ( - self.prj.pipestat_configured_project + self.prj.pipestat_configured if getattr(args, "project", None) else self.prj.pipestat_configured ) @@ -748,8 +748,12 @@ def destroy_summary(prj, dry_run=False, project_level=False): This function is for use with pipestat configured projects. """ + psms = {} if project_level: - psms = prj.get_pipestat_managers(project_level=True) + for piface in prj.pipeline_interfaces: + if piface.psm.pipeline_type == "project": + psms[piface.psm.pipeline_name] = piface.psm + for name, psm in psms.items(): _remove_or_dry_run( [ @@ -773,35 +777,38 @@ def destroy_summary(prj, dry_run=False, project_level=False): dry_run, ) else: - for piface_source_samples in prj._samples_by_piface(prj.piface_key).values(): - # For each piface_key, we have a list of samples, but we only need one sample from the list to - # call the related pipestat manager object which will pull ALL samples when using psm.table - first_sample_name = list(piface_source_samples)[0] - psms = prj.get_pipestat_managers( - sample_name=first_sample_name, project_level=False + for piface in prj.pipeline_interfaces: + if piface.psm.pipeline_type == "sample": + psms[piface.psm.pipeline_name] = piface.psm + # for piface_source_samples in prj._samples_by_piface(prj.piface_key).values(): + # # For each piface_key, we have a list of samples, but we only need one sample from the list to + # # call the related pipestat manager object which will pull ALL samples when using psm.table + # first_sample_name = list(piface_source_samples)[0] + # psms = prj.get_pipestat_managers( + # sample_name=first_sample_name, project_level=False + # ) + for name, psm in psms.items(): + _remove_or_dry_run( + [ + get_file_for_table( + psm, pipeline_name=psm.pipeline_name, directory="reports" + ), + get_file_for_table( + psm, + pipeline_name=psm.pipeline_name, + appendix="stats_summary.tsv", + ), + get_file_for_table( + psm, + pipeline_name=psm.pipeline_name, + appendix="objs_summary.yaml", + ), + os.path.join( + os.path.dirname(psm.config_path), "aggregate_results.yaml" + ), + ], + dry_run, ) - for name, psm in psms.items(): - _remove_or_dry_run( - [ - get_file_for_table( - psm, pipeline_name=psm.pipeline_name, directory="reports" - ), - get_file_for_table( - psm, - pipeline_name=psm.pipeline_name, - appendix="stats_summary.tsv", - ), - get_file_for_table( - psm, - pipeline_name=psm.pipeline_name, - appendix="objs_summary.yaml", - ), - os.path.join( - os.path.dirname(psm.config_path), "aggregate_results.yaml" - ), - ], - dry_run, - ) class LooperCounter(object): From 08e4fa0f74eadb36a783b22e5c9f6400ea6d8b52 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Thu, 23 May 2024 18:00:40 -0400 Subject: [PATCH 10/26] attempt refactor for obtaining project level pifaces and associated psms --- looper/conductor.py | 6 +++++- looper/looper.py | 30 +++++++++++++++--------------- looper/project.py | 40 ++++++++++++++++++++++++++++------------ 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/looper/conductor.py b/looper/conductor.py index 813270a18..f54fe63db 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -273,8 +273,12 @@ def is_project_submittable(self, force=False): :param bool frorce: whether to force the project submission (ignore status/flags) """ + psms = {} if self.prj.pipestat_configured_project: - psm = self.prj.get_pipestat_managers(project_level=True)[self.pl_name] + for piface in self.prj.project_pipeline_interfaces: + if piface.psm.pipeline_type == "project": + psms[piface.psm.pipeline_name] = piface.psm + psm = psms[self.pl_name] status = psm.get_status() if not force and status is not None: _LOGGER.info(f"> Skipping project. Determined status: {status}") diff --git a/looper/looper.py b/looper/looper.py index 3145c90f6..1cad8ff48 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -339,7 +339,7 @@ def __call__(self, args, **compute_kwargs): """ jobs = 0 self.debug = {} - project_pifaces = self.prj.project_pipeline_interface_sources + project_pifaces = self.prj.project_pipeline_interfaces if not project_pifaces: raise MisconfigurationException( "Looper requires a pointer to at least one project pipeline. " @@ -349,27 +349,27 @@ def __call__(self, args, **compute_kwargs): ) self.counter = LooperCounter(len(project_pifaces)) for project_piface in project_pifaces: - try: - project_piface_object = PipelineInterface( - project_piface, pipeline_type="project" - ) - except (IOError, ValidationError) as e: - _LOGGER.warning( - "Ignoring invalid pipeline interface source: {}. " - "Caught exception: {}".format( - project_piface, getattr(e, "message", repr(e)) - ) - ) - continue + # try: + # project_piface_object = PipelineInterface( + # project_piface, pipeline_type="project" + # ) + # except (IOError, ValidationError) as e: + # _LOGGER.warning( + # "Ignoring invalid pipeline interface source: {}. " + # "Caught exception: {}".format( + # project_piface, getattr(e, "message", repr(e)) + # ) + # ) + # continue _LOGGER.info( self.counter.show( name=self.prj.name, type="project", - pipeline_name=project_piface_object.pipeline_name, + pipeline_name=project_piface.pipeline_name, ) ) conductor = SubmissionConductor( - pipeline_interface=project_piface_object, + pipeline_interface=project_piface, prj=self.prj, compute_variables=compute_kwargs, delay=getattr(args, "time_delay", None), diff --git a/looper/project.py b/looper/project.py index 3b4ba1224..ff89f5b2f 100644 --- a/looper/project.py +++ b/looper/project.py @@ -349,7 +349,7 @@ def pipestat_configured_project(self): :return bool: whether pipestat configuration is complete """ - return self._check_if_pipestat_configured(project_level=True) + return self._check_if_pipestat_configured_2(pipeline_type="project") def get_sample_piface(self, sample_name): """ @@ -468,7 +468,7 @@ def get_pipestat_managers(self, sample_name=None, project_level=False): for pipeline_name, pipestat_vars in pipestat_configs.items() } - def _check_if_pipestat_configured_2(self): + def _check_if_pipestat_configured_2(self, pipeline_type="sample"): # First check if pipestat key is in looper_config, if not return false @@ -480,23 +480,39 @@ def _check_if_pipestat_configured_2(self): else: # If pipestat key is available assume user desires pipestat usage # This should return True OR raise an exception at this point. - return self._get_pipestat_configuration2() + return self._get_pipestat_configuration2(pipeline_type) - def _get_pipestat_configuration2(self): + def _get_pipestat_configuration2(self, pipeline_type="sample"): # First check if it already exists print("DEBUG!") - for piface in self.pipeline_interfaces: - print(piface) - # first check if this piface has a psm? + if pipeline_type == "sample": + for piface in self.pipeline_interfaces: + print(piface) + # first check if this piface has a psm? - pipestat_config_path = self._check_for_existing_pipestat_config(piface) + pipestat_config_path = self._check_for_existing_pipestat_config(piface) - if not pipestat_config_path: - self._create_pipestat_config(piface) - else: - piface.psm = PipestatManager(config_file=pipestat_config_path) + if not pipestat_config_path: + self._create_pipestat_config(piface) + else: + piface.psm = PipestatManager(config_file=pipestat_config_path) + + elif pipeline_type == "project": + for prj_piface in self.project_pipeline_interfaces: + pipestat_config_path = self._check_for_existing_pipestat_config( + prj_piface + ) + + if not pipestat_config_path: + self._create_pipestat_config(prj_piface) + else: + prj_piface.psm = PipestatManager(config_file=pipestat_config_path) + else: + _LOGGER.error( + msg="No pipeline type specified during pipestat configuration" + ) return True From 25d5d6af48d1f84406ed437a2feb554449789ef2 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Fri, 24 May 2024 14:49:54 -0400 Subject: [PATCH 11/26] bump pipestat req to 0.9.2a1 --- requirements/requirements-all.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index 108303465..ab0eff0f2 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -6,7 +6,7 @@ logmuse>=0.2.0 pandas>=2.0.2 pephubclient>=0.4.0 peppy>=0.40.0 -pipestat>=0.8.3a1 +pipestat>=0.9.2a1 pyyaml>=3.12 rich>=9.10.0 ubiquerg>=0.5.2 From e1b3aefb05d29b08683bc7628e162c49a5620f33 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Fri, 24 May 2024 14:59:20 -0400 Subject: [PATCH 12/26] set multi_pipelines to True so that sample and project pipelines can be reported to the same results.yaml file. --- looper/project.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/looper/project.py b/looper/project.py index ff89f5b2f..6cd3ceee4 100644 --- a/looper/project.py +++ b/looper/project.py @@ -497,7 +497,9 @@ def _get_pipestat_configuration2(self, pipeline_type="sample"): if not pipestat_config_path: self._create_pipestat_config(piface) else: - piface.psm = PipestatManager(config_file=pipestat_config_path) + piface.psm = PipestatManager( + config_file=pipestat_config_path, multi_pipelines=True + ) elif pipeline_type == "project": for prj_piface in self.project_pipeline_interfaces: @@ -508,7 +510,9 @@ def _get_pipestat_configuration2(self, pipeline_type="sample"): if not pipestat_config_path: self._create_pipestat_config(prj_piface) else: - prj_piface.psm = PipestatManager(config_file=pipestat_config_path) + prj_piface.psm = PipestatManager( + config_file=pipestat_config_path, multi_pipelines=True + ) else: _LOGGER.error( msg="No pipeline type specified during pipestat configuration" @@ -612,7 +616,9 @@ def _create_pipestat_config(self, piface): write_pipestat_config(pipestat_config_path, pipestat_config_dict) # piface['psm'] = PipestatManager(config_file=pipestat_config_path) - piface.psm = PipestatManager(config_file=pipestat_config_path) + piface.psm = PipestatManager( + config_file=pipestat_config_path, multi_pipelines=True + ) return None From f63e1ca8eac81b3531fc9783606534a2599f92fc Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Fri, 24 May 2024 15:10:28 -0400 Subject: [PATCH 13/26] Add warning for mismatched pipeline names --- looper/project.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/looper/project.py b/looper/project.py index 6cd3ceee4..ba0881a14 100644 --- a/looper/project.py +++ b/looper/project.py @@ -3,6 +3,8 @@ import itertools import os +from yaml import safe_load + try: from functools import cached_property except ImportError: @@ -579,12 +581,30 @@ def _create_pipestat_config(self, piface): os.path.dirname(piface.pipe_iface_file), schema_path ) pipestat_config_dict.update({"schema_path": schema_path}) + try: + with open(schema_path, "r") as f: + output_schema_data = safe_load(f) + output_schema_pipeline_name = output_schema_data[ + PIPELINE_INTERFACE_PIPELINE_NAME_KEY + ] + except Exception: + output_schema_pipeline_name = None + else: + output_schema_pipeline_name = None if "pipeline_name" in piface.data: pipeline_name = piface.data["pipeline_name"] pipestat_config_dict.update({"pipeline_name": piface.data["pipeline_name"]}) + else: + pipeline_name = None if "pipeline_type" in piface.data: pipestat_config_dict.update({"pipeline_type": piface.data["pipeline_type"]}) + # Warn user if there is a mismatch in pipeline_names from sources!!! + if pipeline_name != output_schema_pipeline_name: + _LOGGER.warning( + msg=f"Pipeline name mismatch detected. Pipeline interface: {pipeline_name} Output schema: {output_schema_pipeline_name} Defaulting to pipeline_interface value." + ) + try: # TODO if user gives non-absolute path should we force results to be in a pipeline folder? # TODO otherwise pipelines could write to the same results file! From e95b62a833e0737f7da37c846a245b4d06aa0ab3 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 09:51:43 -0400 Subject: [PATCH 14/26] refactor _set_pipestat_namespace --- looper/conductor.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/looper/conductor.py b/looper/conductor.py index f54fe63db..378e2c9a1 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -548,12 +548,7 @@ def _set_pipestat_namespace( :return yacman.YAMLConfigManager: pipestat namespace """ try: - psms = ( - self.prj.get_pipestat_managers(sample_name) - if sample_name - else self.prj.get_pipestat_managers(project_level=True) - ) - psm = psms[self.pl_iface.pipeline_name] + psm = self.pl_iface.psm except (PipestatError, AttributeError) as e: # pipestat section faulty or not found in project.looper or sample # or project is missing required pipestat attributes From 2e1d1368ce776225c1716c094f2b8d215a655392 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 10:19:19 -0400 Subject: [PATCH 15/26] remove unused code, finish Destroyer refactor --- looper/looper.py | 8 +-- looper/project.py | 151 ---------------------------------------------- 2 files changed, 4 insertions(+), 155 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index 1cad8ff48..0827a0507 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -277,7 +277,7 @@ def __call__(self, args, preview_flag=True): ) _LOGGER.info("Removing results:") - + psms = {} for sample in select_samples(prj=self.prj, args=args): _LOGGER.info(self.counter.show(sample.sample_name)) sample_output_folder = sample_folder(self.prj, sample) @@ -286,9 +286,9 @@ def __call__(self, args, preview_flag=True): _LOGGER.info(str(sample_output_folder)) else: if use_pipestat: - psms = self.prj.get_pipestat_managers( - sample_name=sample.sample_name - ) + for piface in sample.project.pipeline_interfaces: + if piface.psm.pipeline_type == "sample": + psms[piface.psm.pipeline_name] = piface.psm for pipeline_name, psm in psms.items(): psm.backend.remove_record( record_identifier=sample.sample_name, rm_record=True diff --git a/looper/project.py b/looper/project.py index ba0881a14..0155e138a 100644 --- a/looper/project.py +++ b/looper/project.py @@ -449,27 +449,6 @@ def get_schemas(pifaces, schema_key=INPUT_SCHEMA_KEY): schema_set.update([schema_file]) return list(schema_set) - def get_pipestat_managers(self, sample_name=None, project_level=False): - """ - Get a collection of pipestat managers for the selected sample or project. - - The number of pipestat managers corresponds to the number of unique - output schemas in the pipeline interfaces specified by the sample or project. - - :param str sample_name: sample name to get pipestat managers for - :param bool project_level: whether the project PipestatManagers - should be returned - :return dict[str, pipestat.PipestatManager]: a mapping of pipestat - managers by pipeline interface name - """ - pipestat_configs = self._get_pipestat_configuration( - sample_name=sample_name, project_level=project_level - ) - return { - pipeline_name: PipestatManager(**pipestat_vars) - for pipeline_name, pipestat_vars in pipestat_configs.items() - } - def _check_if_pipestat_configured_2(self, pipeline_type="sample"): # First check if pipestat key is in looper_config, if not return false @@ -642,136 +621,6 @@ def _create_pipestat_config(self, piface): return None - def _check_if_pipestat_configured(self, project_level=False): - """ - A helper method determining whether pipestat configuration is complete - - :param bool project_level: whether the project pipestat config should be checked - :return bool: whether pipestat configuration is complete - """ - try: - if project_level: - pipestat_configured = self._get_pipestat_configuration( - sample_name=None, project_level=project_level - ) - else: - for s in self.samples: - pipestat_configured = self._get_pipestat_configuration( - sample_name=s.sample_name - ) - except Exception as e: - context = ( - f"Project '{self.name}'" - if project_level - else f"Sample '{s.sample_name}'" - ) - _LOGGER.debug( - f"Pipestat configuration incomplete for {context}; " - f"caught exception: {getattr(e, 'message', repr(e))}" - ) - return False - else: - if pipestat_configured is not None and pipestat_configured != {}: - return True - else: - return False - - def _get_pipestat_configuration(self, sample_name=None, project_level=False): - """ - Get all required pipestat configuration variables from looper_config file - """ - - ret = {} - if not project_level and sample_name is None: - raise ValueError( - "Must provide the sample_name to determine the " - "sample to get the PipestatManagers for" - ) - - if PIPESTAT_KEY in self[EXTRA_KEY]: - pipestat_config_dict = self[EXTRA_KEY][PIPESTAT_KEY] - else: - _LOGGER.debug( - f"'{PIPESTAT_KEY}' not found in '{LOOPER_KEY}' section of the " - f"project configuration file." - ) - # We cannot use pipestat without it being defined in the looper config file. - raise ValueError - - # Expand paths in the event ENV variables were used in config files - output_dir = expandpath(self.output_dir) - - # Get looper user configured items first and update the pipestat_config_dict - try: - results_file_path = expandpath(pipestat_config_dict["results_file_path"]) - if not os.path.exists(os.path.dirname(results_file_path)): - results_file_path = os.path.join( - os.path.dirname(output_dir), results_file_path - ) - pipestat_config_dict.update({"results_file_path": results_file_path}) - except KeyError: - results_file_path = None - - try: - flag_file_dir = expandpath(pipestat_config_dict["flag_file_dir"]) - if not os.path.isabs(flag_file_dir): - flag_file_dir = os.path.join(os.path.dirname(output_dir), flag_file_dir) - pipestat_config_dict.update({"flag_file_dir": flag_file_dir}) - except KeyError: - flag_file_dir = None - - if sample_name: - pipestat_config_dict.update({"record_identifier": sample_name}) - - if project_level and "project_name" in pipestat_config_dict: - pipestat_config_dict.update( - {"project_name": pipestat_config_dict["project_name"]} - ) - - if project_level and "{record_identifier}" in results_file_path: - # if project level and using {record_identifier}, pipestat needs some sort of record_identifier during creation - pipestat_config_dict.update( - {"record_identifier": "default_project_record_identifier"} - ) - - pipestat_config_dict.update({"output_dir": output_dir}) - - pifaces = ( - self.project_pipeline_interfaces - if project_level - else self._interfaces_by_sample[sample_name] - ) - - for piface in pifaces: - # We must also obtain additional pipestat items from the pipeline author's piface - if "output_schema" in piface.data: - schema_path = expandpath(piface.data["output_schema"]) - if not os.path.isabs(schema_path): - # Get path relative to the pipeline_interface - schema_path = os.path.join( - os.path.dirname(piface.pipe_iface_file), schema_path - ) - pipestat_config_dict.update({"schema_path": schema_path}) - if "pipeline_name" in piface.data: - pipestat_config_dict.update( - {"pipeline_name": piface.data["pipeline_name"]} - ) - if "pipeline_type" in piface.data: - pipestat_config_dict.update( - {"pipeline_type": piface.data["pipeline_type"]} - ) - - # Pipestat_dict_ is now updated from all sources and can be written to a yaml. - looper_pipestat_config_path = os.path.join( - os.path.dirname(output_dir), "looper_pipestat_config.yaml" - ) - write_pipestat_config(looper_pipestat_config_path, pipestat_config_dict) - - ret[piface.pipeline_name] = { - "config_file": looper_pipestat_config_path, - } - return ret - def populate_pipeline_outputs(self): """ Populate project and sample output attributes based on output schemas From 55eb51fd21253147c68197d403c2c8cefa23adbe Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 10:24:16 -0400 Subject: [PATCH 16/26] refactor func names --- looper/cli_pydantic.py | 2 +- looper/project.py | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index cb7e9e90e..5d2410277 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -243,7 +243,7 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None): ) as prj: # Check at the beginning if user wants to use pipestat and pipestat is configurable - is_pipestat_configured = prj._check_if_pipestat_configured_2() + is_pipestat_configured = prj._check_if_pipestat_configured() if subcommand_name in ["run", "rerun"]: rerun = subcommand_name == "rerun" diff --git a/looper/project.py b/looper/project.py index 0155e138a..0969572f8 100644 --- a/looper/project.py +++ b/looper/project.py @@ -342,7 +342,7 @@ def pipestat_configured(self): :return bool: whether pipestat configuration is complete """ - return self._check_if_pipestat_configured_2() + return self._check_if_pipestat_configured() @cached_property def pipestat_configured_project(self): @@ -351,7 +351,7 @@ def pipestat_configured_project(self): :return bool: whether pipestat configuration is complete """ - return self._check_if_pipestat_configured_2(pipeline_type="project") + return self._check_if_pipestat_configured(pipeline_type="project") def get_sample_piface(self, sample_name): """ @@ -449,7 +449,7 @@ def get_schemas(pifaces, schema_key=INPUT_SCHEMA_KEY): schema_set.update([schema_file]) return list(schema_set) - def _check_if_pipestat_configured_2(self, pipeline_type="sample"): + def _check_if_pipestat_configured(self, pipeline_type="sample"): # First check if pipestat key is in looper_config, if not return false @@ -461,12 +461,11 @@ def _check_if_pipestat_configured_2(self, pipeline_type="sample"): else: # If pipestat key is available assume user desires pipestat usage # This should return True OR raise an exception at this point. - return self._get_pipestat_configuration2(pipeline_type) + return self._get_pipestat_configuration(pipeline_type) - def _get_pipestat_configuration2(self, pipeline_type="sample"): + def _get_pipestat_configuration(self, pipeline_type="sample"): # First check if it already exists - print("DEBUG!") if pipeline_type == "sample": for piface in self.pipeline_interfaces: From 60d807c85447cb57969a541a7d354845297570e5 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 10:26:54 -0400 Subject: [PATCH 17/26] remove unnecessary variable duplication --- looper/cli_pydantic.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index 5d2410277..6f60d8b8b 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -275,34 +275,26 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None): if subcommand_name == "destroy": return Destroyer(prj)(subcommand_args) - # use_pipestat = ( - # prj.pipestat_configured_project - # if getattr(subcommand_args, "project", None) - # else prj.pipestat_configured - # ) - - use_pipestat = is_pipestat_configured - if subcommand_name == "table": - if use_pipestat: + if is_pipestat_configured: return Tabulator(prj)(subcommand_args) else: raise PipestatConfigurationException("table") if subcommand_name == "report": - if use_pipestat: + if is_pipestat_configured: return Reporter(prj)(subcommand_args) else: raise PipestatConfigurationException("report") if subcommand_name == "link": - if use_pipestat: + if is_pipestat_configured: Linker(prj)(subcommand_args) else: raise PipestatConfigurationException("link") if subcommand_name == "check": - if use_pipestat: + if is_pipestat_configured: return Checker(prj)(subcommand_args) else: raise PipestatConfigurationException("check") From f476e68c9b3b589350dbc3929393f8b891e0fab4 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 10:32:10 -0400 Subject: [PATCH 18/26] remove commented code --- looper/looper.py | 86 ------------------------------------------------ 1 file changed, 86 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index 0827a0507..a0aea285a 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -91,12 +91,6 @@ def __call__(self, args): psms = {} if getattr(args, "project", None): - # psms = self.prj.get_pipestat_managers(project_level=True) - # for pipeline_name, psm in psms.items(): - # s = psm.get_status() or "unknown" - # status.setdefault(pipeline_name, {}) - # status[pipeline_name][self.prj.name] = s - # _LOGGER.debug(f"{self.prj.name} ({pipeline_name}): {s}") for piface in self.prj.pipeline_interfaces: if piface.psm.pipeline_type == "project": @@ -117,12 +111,6 @@ def __call__(self, args): _LOGGER.debug( f"{sample.sample_name} ({piface.psm.pipeline_name}): {s}" ) - # psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name) - # for pipeline_name, psm in psms.items(): - # s = psm.get_status(record_identifier=sample.sample_name) - # status.setdefault(pipeline_name, {}) - # status[pipeline_name][sample.sample_name] = s - # _LOGGER.debug(f"{sample.sample_name} ({pipeline_name}): {s}") console = Console() @@ -349,18 +337,6 @@ def __call__(self, args, **compute_kwargs): ) self.counter = LooperCounter(len(project_pifaces)) for project_piface in project_pifaces: - # try: - # project_piface_object = PipelineInterface( - # project_piface, pipeline_type="project" - # ) - # except (IOError, ValidationError) as e: - # _LOGGER.warning( - # "Ignoring invalid pipeline interface source: {}. " - # "Caught exception: {}".format( - # project_piface, getattr(e, "message", repr(e)) - # ) - # ) - # continue _LOGGER.info( self.counter.show( name=self.prj.name, @@ -586,13 +562,6 @@ def __call__(self, args): psms = {} if project_level: - # psms = self.prj.get_pipestat_managers(project_level=True) - # print(psms) - # for name, psm in psms.items(): - # # Summarize will generate the static HTML Report Function - # report_directory = psm.summarize( - # looper_samples=self.prj.samples, portable=portable - # ) for piface in self.prj.pipeline_interfaces: if piface.psm.pipeline_type == "project": @@ -612,24 +581,6 @@ def __call__(self, args): ) print(f"Report directory: {report_directory}") self.debug["report_directory"] = report_directory - - # for piface_source_samples in self.prj._samples_by_piface( - # self.prj.piface_key - # ).values(): - # # For each piface_key, we have a list of samples, but we only need one sample from the list to - # # call the related pipestat manager object which will pull ALL samples when using psm.summarize - # first_sample_name = list(piface_source_samples)[0] - # psms = self.prj.get_pipestat_managers( - # sample_name=first_sample_name, project_level=False - # ) - # print(psms) - # for name, psm in psms.items(): - # # Summarize will generate the static HTML Report Function - # report_directory = psm.summarize( - # looper_samples=self.prj.samples, portable=portable - # ) - # print(f"Report directory: {report_directory}") - # self.debug["report_directory"] = report_directory return self.debug @@ -650,23 +601,7 @@ def __call__(self, args): psms[piface.psm.pipeline_name] = piface.psm linked_results_path = piface.psm.link(link_dir=link_dir) print(f"Linked directory: {linked_results_path}") - # psms = self.prj.get_pipestat_managers(project_level=True) - # for name, psm in psms.items(): - # linked_results_path = psm.link(link_dir=link_dir) - # print(f"Linked directory: {linked_results_path}") else: - # for piface_source_samples in self.prj._samples_by_piface( - # self.prj.piface_key - # ).values(): - # # For each piface_key, we have a list of samples, but we only need one sample from the list to - # # call the related pipestat manager object which will pull ALL samples when using psm.summarize - # first_sample_name = list(piface_source_samples)[0] - # psms = self.prj.get_pipestat_managers( - # sample_name=first_sample_name, project_level=False - # ) - # for name, psm in psms.items(): - # linked_results_path = psm.link(link_dir=link_dir) - # print(f"Linked directory: {linked_results_path}") for piface in self.prj.pipeline_interfaces: if piface.psm.pipeline_type == "sample": psms[piface.psm.pipeline_name] = piface.psm @@ -690,21 +625,7 @@ def __call__(self, args): if piface.psm.pipeline_type == "project": psms[piface.psm.pipeline_name] = piface.psm results = piface.psm.table() - # psms = self.prj.get_pipestat_managers(project_level=True) - # for name, psm in psms.items(): - # results = psm.table() else: - # for piface_source_samples in self.prj._samples_by_piface( - # self.prj.piface_key - # ).values(): - # # For each piface_key, we have a list of samples, but we only need one sample from the list to - # # call the related pipestat manager object which will pull ALL samples when using psm.table - # first_sample_name = list(piface_source_samples)[0] - # psms = self.prj.get_pipestat_managers( - # sample_name=first_sample_name, project_level=False - # ) - # for name, psm in psms.items(): - # results = psm.table() for piface in self.prj.pipeline_interfaces: if piface.psm.pipeline_type == "sample": psms[piface.psm.pipeline_name] = piface.psm @@ -780,13 +701,6 @@ def destroy_summary(prj, dry_run=False, project_level=False): for piface in prj.pipeline_interfaces: if piface.psm.pipeline_type == "sample": psms[piface.psm.pipeline_name] = piface.psm - # for piface_source_samples in prj._samples_by_piface(prj.piface_key).values(): - # # For each piface_key, we have a list of samples, but we only need one sample from the list to - # # call the related pipestat manager object which will pull ALL samples when using psm.table - # first_sample_name = list(piface_source_samples)[0] - # psms = prj.get_pipestat_managers( - # sample_name=first_sample_name, project_level=False - # ) for name, psm in psms.items(): _remove_or_dry_run( [ From 64c81f15a330eda8b787fd75299a6fa28acfaa8d Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 10:44:27 -0400 Subject: [PATCH 19/26] add "--project" argument back to Looper --- looper/command_models/arguments.py | 9 +++++++-- looper/command_models/commands.py | 1 + tests/test_comprehensive.py | 9 +++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/looper/command_models/arguments.py b/looper/command_models/arguments.py index 75855511c..8c484d33d 100644 --- a/looper/command_models/arguments.py +++ b/looper/command_models/arguments.py @@ -192,13 +192,13 @@ class ArgumentEnum(enum.Enum): name="sample_pipeline_interfaces", alias="-S", default=(List, []), - description="Paths to looper sample config files", + description="Paths to looper sample pipeline interfaces", ) PROJECT_PIPELINE_INTERFACES = Argument( name="project_pipeline_interfaces", alias="-P", default=(List, []), - description="Paths to looper project config files", + description="Paths to looper project pipeline interfaces", ) AMEND = Argument( name="amend", default=(List, []), description="List of amendments to activate" @@ -276,3 +276,8 @@ class ArgumentEnum(enum.Enum): default=(bool, False), description="Makes html report portable.", ) + PROJECT_LEVEL = Argument( + name="project", + default=(bool, False), + description="Is this command executed for project-level?", + ) diff --git a/looper/command_models/commands.py b/looper/command_models/commands.py index e9025d909..233cfd0b7 100644 --- a/looper/command_models/commands.py +++ b/looper/command_models/commands.py @@ -60,6 +60,7 @@ def create_model(self) -> Type[pydantic.BaseModel]: ArgumentEnum.PIPESTAT.value, ArgumentEnum.SETTINGS.value, ArgumentEnum.AMEND.value, + ArgumentEnum.PROJECT_LEVEL.value, ] RunParser = Command( diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py index eb22fb608..472df3e24 100644 --- a/tests/test_comprehensive.py +++ b/tests/test_comprehensive.py @@ -118,6 +118,15 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): except Exception: raise pytest.fail("DID RAISE {0}".format(Exception)) + # Now use looper check to get project level statuses + x = ["check", "--looper-config", path_to_looper_config, "--project"] + + try: + result = main(test_args=x) + assert result == {} + except Exception: + raise pytest.fail("DID RAISE {0}".format(Exception)) + # TEST LOOPER REPORT x = ["report", "--looper-config", path_to_looper_config] From ba7f03ee9dbd1df7e56c253832427a339677ad8b Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 11:49:32 -0400 Subject: [PATCH 20/26] refactor obtaining results_file_path during pipestat configuration creation --- looper/project.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/looper/project.py b/looper/project.py index 0969572f8..5c952c72e 100644 --- a/looper/project.py +++ b/looper/project.py @@ -584,13 +584,23 @@ def _create_pipestat_config(self, piface): ) try: - # TODO if user gives non-absolute path should we force results to be in a pipeline folder? - # TODO otherwise pipelines could write to the same results file! results_file_path = expandpath(pipestat_config_dict["results_file_path"]) - if not os.path.exists(os.path.dirname(results_file_path)): - results_file_path = os.path.join( - os.path.dirname(output_dir), results_file_path - ) + + if not os.path.isabs(results_file_path): + # e.g. user configures "results.yaml" as results_file_path + if "{record_identifier}" in results_file_path: + # this is specifically to check if the user wishes tro generate a file for EACH record + if not os.path.exists(os.path.dirname(results_file_path)): + results_file_path = os.path.join(output_dir, results_file_path) + else: + if not os.path.exists(os.path.dirname(results_file_path)): + results_file_path = os.path.join( + output_dir, f"{pipeline_name}/", results_file_path + ) + else: + # Do nothing because the user has given an absolute file path + pass + pipestat_config_dict.update({"results_file_path": results_file_path}) except KeyError: results_file_path = None From 19d3130c6480c301b2359e2c62e57be4613083d6 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 11:55:31 -0400 Subject: [PATCH 21/26] remove commented code --- looper/conductor.py | 1 - looper/plugins.py | 9 --------- tests/smoketests/test_run.py | 29 ----------------------------- tests/test_comprehensive.py | 2 +- 4 files changed, 1 insertion(+), 40 deletions(-) diff --git a/looper/conductor.py b/looper/conductor.py index 378e2c9a1..3f04f1450 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -304,7 +304,6 @@ def add_sample(self, sample, rerun=False): ) ) if self.prj.pipestat_configured: - # psms = self.prj.get_pipestat_managers(sample_name=sample.sample_name) sample_statuses = self.pl_iface.psm.get_status( record_identifier=sample.sample_name ) diff --git a/looper/plugins.py b/looper/plugins.py index b0c4a246c..dc34283e0 100644 --- a/looper/plugins.py +++ b/looper/plugins.py @@ -158,12 +158,3 @@ def write_sample_yaml(namespaces): ) sample.to_yaml(sample["sample_yaml_path"], add_prj_ref=False) return {"sample": sample} - - -# def write_local_pipestat_config(namespaces): -# -# config_path = "" -# -# print(config_path) -# -# return config_path diff --git a/tests/smoketests/test_run.py b/tests/smoketests/test_run.py index a94fa9b8a..05231f594 100644 --- a/tests/smoketests/test_run.py +++ b/tests/smoketests/test_run.py @@ -443,35 +443,6 @@ def test_looper_command_templates_hooks(self, prep_temp_pep, cmd): sd = os.path.join(get_outdir(tp), "submission") verify_filecount_in_dir(sd, "test.txt", 3) - # @pytest.mark.parametrize( - # "plugin,appendix", - # [ - # ("looper.write_local_pipestat_config", "submission.yaml"), - # ], - # ) - # def test_looper_pipestat_plugins(self, prep_temp_pep_pipestat, plugin, appendix): - # # tp = prep_temp_pep - # tp = prep_temp_pep_pipestat - # pep_dir = os.path.dirname(tp) - # pipeline_interface1 = os.path.join( - # pep_dir, "pipeline_pipestat/pipeline_interface.yaml" - # ) - # - # with mod_yaml_data(pipeline_interface1) as piface_data: - # piface_data.update({PRE_SUBMIT_HOOK_KEY: {}}) - # piface_data[PRE_SUBMIT_HOOK_KEY].update({PRE_SUBMIT_PY_FUN_KEY: {}}) - # piface_data[PRE_SUBMIT_HOOK_KEY][PRE_SUBMIT_PY_FUN_KEY] = [plugin] - # - # # x = test_args_expansion(tp, "run") - # x = ["run", "--looper-config", tp, "--dry-run"] - # # x.pop(-1) - # try: - # main(test_args=x) - # except Exception as err: - # raise pytest.fail(f"DID RAISE {err}") - # sd = os.path.join(get_outdir(tp), "submission") - # verify_filecount_in_dir(sd, appendix, 3) - class TestLooperRunSubmissionScript: def test_looper_run_produces_submission_scripts(self, prep_temp_pep): diff --git a/tests/test_comprehensive.py b/tests/test_comprehensive.py index 472df3e24..c0f0f81fb 100644 --- a/tests/test_comprehensive.py +++ b/tests/test_comprehensive.py @@ -104,7 +104,7 @@ def test_comprehensive_looper_pipestat(prep_temp_pep_pipestat): path_to_pipestat_config = os.path.join( pipestat_dir, f"results/pipestat_config_{pipeline_name}.yaml" ) - # pipestat_config_example_pipestat_pipeline.yaml + psm = PipestatManager(config_file=path_to_pipestat_config) psm.set_status(record_identifier="frog_1", status_identifier="completed") psm.set_status(record_identifier="frog_2", status_identifier="completed") From 5c9ea759b63849377cc32736d407840dc90cae75 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 12:06:27 -0400 Subject: [PATCH 22/26] remove print statements, change info to debug --- looper/looper.py | 2 +- looper/project.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/looper/looper.py b/looper/looper.py index a0aea285a..934793aea 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -420,7 +420,7 @@ def __call__(self, args, top_level_args=None, rerun=False, **compute_kwargs): ) submission_conductors[piface.pipe_iface_file] = conductor - _LOGGER.info(f"Pipestat compatible: {self.prj.pipestat_configured}") + _LOGGER.debug(f"Pipestat compatible: {self.prj.pipestat_configured}") self.debug["Pipestat compatible"] = self.prj.pipestat_configured for sample in select_samples(prj=self.prj, args=args): diff --git a/looper/project.py b/looper/project.py index 5c952c72e..9eec91067 100644 --- a/looper/project.py +++ b/looper/project.py @@ -469,8 +469,6 @@ def _get_pipestat_configuration(self, pipeline_type="sample"): if pipeline_type == "sample": for piface in self.pipeline_interfaces: - print(piface) - # first check if this piface has a psm? pipestat_config_path = self._check_for_existing_pipestat_config(piface) From c1825c16cd8c85e7fec9d642fc30364a7599cb4e Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 12:14:08 -0400 Subject: [PATCH 23/26] ensure we are checking for project level pipestat configuration if using --project --- looper/__init__.py | 1 - looper/cli_pydantic.py | 6 +++++- looper/looper.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/looper/__init__.py b/looper/__init__.py index 5db46a828..fe751d02d 100644 --- a/looper/__init__.py +++ b/looper/__init__.py @@ -25,7 +25,6 @@ write_sample_yaml_cwl, write_sample_yaml_prj, write_custom_template, - # write_local_pipestat_config, ) from .const import * from .pipeline_interface import PipelineInterface diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index 6f60d8b8b..74d6d4e31 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -243,7 +243,11 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None): ) as prj: # Check at the beginning if user wants to use pipestat and pipestat is configurable - is_pipestat_configured = prj._check_if_pipestat_configured() + is_pipestat_configured = ( + prj._check_if_pipestat_configured(pipeline_type="project") + if getattr(args, "project", None) + else prj._check_if_pipestat_configured() + ) if subcommand_name in ["run", "rerun"]: rerun = subcommand_name == "rerun" diff --git a/looper/looper.py b/looper/looper.py index 934793aea..45a4bf0b0 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -251,7 +251,7 @@ def __call__(self, args, preview_flag=True): """ use_pipestat = ( - self.prj.pipestat_configured + self.prj.pipestat_configured_project if getattr(args, "project", None) else self.prj.pipestat_configured ) From 541113949b726cf2b82c5c63a5d325b5aaf4a227 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 12:27:21 -0400 Subject: [PATCH 24/26] reduce looper verbosity by switching to logger.debug in some places --- looper/conductor.py | 1 - looper/looper.py | 4 ++-- looper/project.py | 2 -- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/looper/conductor.py b/looper/conductor.py index 3f04f1450..7b2930326 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -635,7 +635,6 @@ def write_script(self, pool, size): argstring = jinja_render_template_strictly( template=templ, namespaces=namespaces ) - print(argstring) except UndefinedError as jinja_exception: _LOGGER.warning(NOT_SUB_MSG.format(str(jinja_exception))) except KeyError as e: diff --git a/looper/looper.py b/looper/looper.py index 45a4bf0b0..3edc254bc 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -493,7 +493,7 @@ def __call__(self, args, top_level_args=None, rerun=False, **compute_kwargs): len(processed_samples), num_samples ) ) - _LOGGER.info("Commands submitted: {} of {}".format(cmd_sub_total, max_cmds)) + _LOGGER.debug("Commands submitted: {} of {}".format(cmd_sub_total, max_cmds)) self.debug[DEBUG_COMMANDS] = "{} of {}".format(cmd_sub_total, max_cmds) if getattr(args, "dry_run", None): job_sub_total_if_real = job_sub_total @@ -501,7 +501,7 @@ def __call__(self, args, top_level_args=None, rerun=False, **compute_kwargs): _LOGGER.info( f"Dry run. No jobs were actually submitted, but {job_sub_total_if_real} would have been." ) - _LOGGER.info("Jobs submitted: {}".format(job_sub_total)) + _LOGGER.debug("Jobs submitted: {}".format(job_sub_total)) self.debug[DEBUG_JOBS] = job_sub_total # Restructure sample/failure data for display. diff --git a/looper/project.py b/looper/project.py index 9eec91067..46f32ad00 100644 --- a/looper/project.py +++ b/looper/project.py @@ -613,7 +613,6 @@ def _create_pipestat_config(self, piface): # Pipestat_dict_ is now updated from all sources and can be written to a yaml. pipestat_config_path = os.path.join( - # os.path.dirname(output_dir), f"pipestat_config_{pipeline_name}.yaml" output_dir, f"pipestat_config_{pipeline_name}.yaml", ) @@ -621,7 +620,6 @@ def _create_pipestat_config(self, piface): # Two end goals, create a config file write_pipestat_config(pipestat_config_path, pipestat_config_dict) - # piface['psm'] = PipestatManager(config_file=pipestat_config_path) piface.psm = PipestatManager( config_file=pipestat_config_path, multi_pipelines=True ) From d4f576c4b28f50321ec4ae6539a9d292c1fc8de7 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 14:44:21 -0400 Subject: [PATCH 25/26] bumbp pipestat req to v0.9.2 --- requirements/requirements-all.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index ab0eff0f2..3a558c224 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -6,7 +6,7 @@ logmuse>=0.2.0 pandas>=2.0.2 pephubclient>=0.4.0 peppy>=0.40.0 -pipestat>=0.9.2a1 +pipestat>=0.9.2 pyyaml>=3.12 rich>=9.10.0 ubiquerg>=0.5.2 From affd8d4e206ab768b3732c9b789d55fafca8fc69 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Mon, 3 Jun 2024 16:33:17 -0400 Subject: [PATCH 26/26] add enum for pipeline type #360 --- looper/cli_pydantic.py | 3 ++- looper/conductor.py | 3 ++- looper/const.py | 8 ++++++++ looper/looper.py | 24 +++++++++++++----------- looper/project.py | 21 +++++++++++++-------- 5 files changed, 38 insertions(+), 21 deletions(-) diff --git a/looper/cli_pydantic.py b/looper/cli_pydantic.py index 74d6d4e31..bfd189fdd 100644 --- a/looper/cli_pydantic.py +++ b/looper/cli_pydantic.py @@ -29,6 +29,7 @@ from divvy import select_divvy_config +from .const import PipelineLevel from . import __version__ from .command_models.arguments import ArgumentEnum @@ -244,7 +245,7 @@ def run_looper(args: TopLevelParser, parser: ArgumentParser, test_args=None): # Check at the beginning if user wants to use pipestat and pipestat is configurable is_pipestat_configured = ( - prj._check_if_pipestat_configured(pipeline_type="project") + prj._check_if_pipestat_configured(pipeline_type=PipelineLevel.PROJECT.value) if getattr(args, "project", None) else prj._check_if_pipestat_configured() ) diff --git a/looper/conductor.py b/looper/conductor.py index 7b2930326..ffbb1b547 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -27,6 +27,7 @@ from .exceptions import JobSubmissionException, SampleFailedException from .processed_project import populate_sample_paths from .utils import fetch_sample_flags, jinja_render_template_strictly +from .const import PipelineLevel _LOGGER = logging.getLogger(__name__) @@ -276,7 +277,7 @@ def is_project_submittable(self, force=False): psms = {} if self.prj.pipestat_configured_project: for piface in self.prj.project_pipeline_interfaces: - if piface.psm.pipeline_type == "project": + if piface.psm.pipeline_type == PipelineLevel.PROJECT.value: psms[piface.psm.pipeline_name] = piface.psm psm = psms[self.pl_name] status = psm.get_status() diff --git a/looper/const.py b/looper/const.py index a866f2d84..ca70851da 100644 --- a/looper/const.py +++ b/looper/const.py @@ -1,6 +1,7 @@ """ Shared project constants """ import os +from enum import Enum __author__ = "Databio lab" __email__ = "nathan@code.databio.org" @@ -268,3 +269,10 @@ def _get_apperance_dict(type, templ=APPEARANCE_BY_FLAG): "init-piface": "Initialize generic pipeline interface.", "link": "Create directory of symlinks for reported results.", } + +# Add project/sample enum + + +class PipelineLevel(Enum): + SAMPLE = "sample" + PROJECT = "project" diff --git a/looper/looper.py b/looper/looper.py index 3edc254bc..ee3670c28 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -16,6 +16,8 @@ # Need specific sequence of actions for colorama imports? from colorama import init +from .const import PipelineLevel + init() from shutil import rmtree @@ -93,7 +95,7 @@ def __call__(self, args): if getattr(args, "project", None): for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "project": + if piface.psm.pipeline_type == PipelineLevel.PROJECT.value: psms[piface.psm.pipeline_name] = piface.psm s = piface.psm.get_status() or "unknown" status.setdefault(piface.psm.pipeline_name, {}) @@ -103,7 +105,7 @@ def __call__(self, args): else: for sample in self.prj.samples: for piface in sample.project.pipeline_interfaces: - if piface.psm.pipeline_type == "sample": + if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value: psms[piface.psm.pipeline_name] = piface.psm s = piface.psm.get_status(record_identifier=sample.sample_name) status.setdefault(piface.psm.pipeline_name, {}) @@ -275,7 +277,7 @@ def __call__(self, args, preview_flag=True): else: if use_pipestat: for piface in sample.project.pipeline_interfaces: - if piface.psm.pipeline_type == "sample": + if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value: psms[piface.psm.pipeline_name] = piface.psm for pipeline_name, psm in psms.items(): psm.backend.remove_record( @@ -564,7 +566,7 @@ def __call__(self, args): if project_level: for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "project": + if piface.psm.pipeline_type == PipelineLevel.PROJECT.value: psms[piface.psm.pipeline_name] = piface.psm report_directory = piface.psm.summarize( looper_samples=self.prj.samples, portable=portable @@ -574,7 +576,7 @@ def __call__(self, args): return self.debug else: for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "sample": + if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value: psms[piface.psm.pipeline_name] = piface.psm report_directory = piface.psm.summarize( looper_samples=self.prj.samples, portable=portable @@ -597,13 +599,13 @@ def __call__(self, args): if project_level: for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "project": + if piface.psm.pipeline_type == PipelineLevel.PROJECT.value: psms[piface.psm.pipeline_name] = piface.psm linked_results_path = piface.psm.link(link_dir=link_dir) print(f"Linked directory: {linked_results_path}") else: for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "sample": + if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value: psms[piface.psm.pipeline_name] = piface.psm linked_results_path = piface.psm.link(link_dir=link_dir) print(f"Linked directory: {linked_results_path}") @@ -622,12 +624,12 @@ def __call__(self, args): psms = {} if project_level: for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "project": + if piface.psm.pipeline_type == PipelineLevel.PROJECT.value: psms[piface.psm.pipeline_name] = piface.psm results = piface.psm.table() else: for piface in self.prj.pipeline_interfaces: - if piface.psm.pipeline_type == "sample": + if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value: psms[piface.psm.pipeline_name] = piface.psm results = piface.psm.table() # Results contains paths to stats and object summaries. @@ -672,7 +674,7 @@ def destroy_summary(prj, dry_run=False, project_level=False): psms = {} if project_level: for piface in prj.pipeline_interfaces: - if piface.psm.pipeline_type == "project": + if piface.psm.pipeline_type == PipelineLevel.PROJECT.value: psms[piface.psm.pipeline_name] = piface.psm for name, psm in psms.items(): @@ -699,7 +701,7 @@ def destroy_summary(prj, dry_run=False, project_level=False): ) else: for piface in prj.pipeline_interfaces: - if piface.psm.pipeline_type == "sample": + if piface.psm.pipeline_type == PipelineLevel.SAMPLE.value: psms[piface.psm.pipeline_name] = piface.psm for name, psm in psms.items(): _remove_or_dry_run( diff --git a/looper/project.py b/looper/project.py index 46f32ad00..16684ba74 100644 --- a/looper/project.py +++ b/looper/project.py @@ -28,6 +28,7 @@ from .pipeline_interface import PipelineInterface from .processed_project import populate_project_paths, populate_sample_paths from .utils import * +from .const import PipelineLevel __all__ = ["Project"] @@ -308,7 +309,7 @@ def project_pipeline_interfaces(self): :return list[looper.PipelineInterface]: list of pipeline interfaces """ return [ - PipelineInterface(pi, pipeline_type="project") + PipelineInterface(pi, pipeline_type=PipelineLevel.PROJECT.value) for pi in self.project_pipeline_interface_sources ] @@ -351,7 +352,9 @@ def pipestat_configured_project(self): :return bool: whether pipestat configuration is complete """ - return self._check_if_pipestat_configured(pipeline_type="project") + return self._check_if_pipestat_configured( + pipeline_type=PipelineLevel.PROJECT.value + ) def get_sample_piface(self, sample_name): """ @@ -449,7 +452,7 @@ def get_schemas(pifaces, schema_key=INPUT_SCHEMA_KEY): schema_set.update([schema_file]) return list(schema_set) - def _check_if_pipestat_configured(self, pipeline_type="sample"): + def _check_if_pipestat_configured(self, pipeline_type=PipelineLevel.SAMPLE.value): # First check if pipestat key is in looper_config, if not return false @@ -463,11 +466,11 @@ def _check_if_pipestat_configured(self, pipeline_type="sample"): # This should return True OR raise an exception at this point. return self._get_pipestat_configuration(pipeline_type) - def _get_pipestat_configuration(self, pipeline_type="sample"): + def _get_pipestat_configuration(self, pipeline_type=PipelineLevel.SAMPLE.value): # First check if it already exists - if pipeline_type == "sample": + if pipeline_type == PipelineLevel.SAMPLE.value: for piface in self.pipeline_interfaces: pipestat_config_path = self._check_for_existing_pipestat_config(piface) @@ -479,7 +482,7 @@ def _get_pipestat_configuration(self, pipeline_type="sample"): config_file=pipestat_config_path, multi_pipelines=True ) - elif pipeline_type == "project": + elif pipeline_type == PipelineLevel.PROJECT.value: for prj_piface in self.project_pipeline_interfaces: pipestat_config_path = self._check_for_existing_pipestat_config( prj_piface @@ -691,7 +694,7 @@ def _piface_by_samples(self): pifaces_by_sample = {} for source, sample_names in self._samples_by_interface.items(): try: - pi = PipelineInterface(source, pipeline_type="sample") + pi = PipelineInterface(source, pipeline_type=PipelineLevel.SAMPLE.value) except PipelineInterfaceConfigError as e: _LOGGER.debug(f"Skipping pipeline interface creation: {e}") else: @@ -742,7 +745,9 @@ def _samples_by_piface(self, piface_key): for source in piface_srcs: source = self._resolve_path_with_cfg(source) try: - PipelineInterface(source, pipeline_type="sample") + PipelineInterface( + source, pipeline_type=PipelineLevel.SAMPLE.value + ) except ( ValidationError, IOError,