Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to specify duplicate command names in Pipeline #174

Merged
merged 6 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sup3r/bias/bias_calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ def get_node_cmd(cls, config):
f"{fun_str};\n"
"t_elap = time.time() - t0;\n")

cmd = BaseCLI.add_status_cmd(config, ModuleName.BIAS_CALC, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.BIAS_CALC
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
26 changes: 18 additions & 8 deletions sup3r/bias/bias_calc_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r bias correction calculation from a config file."""
config = BaseCLI.from_config_preflight(ModuleName.BIAS_CALC, ctx,
config_file, verbose)
Expand All @@ -56,20 +56,22 @@ def from_config(ctx, config_file, verbose=False, **__):
name = ('{}_{}'.format(basename, str(i_node).zfill(6)))
ctx.obj['NAME'] = name
node_config['job_name'] = name
node_config["pipeline_step"] = pipeline_step

cmd = BiasCalcClass.get_node_cmd(node_config)

cmd_log = '\n\t'.join(cmd.split('\n'))
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.

Parameters
Expand All @@ -79,6 +81,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -92,10 +98,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.BIAS_CALC, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r bias calc locally.

Parameters
Expand All @@ -105,8 +111,12 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.BIAS_CALC, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.BIAS_CALC, ctx, cmd, pipeline_step)


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion sup3r/pipeline/forward_pass.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,8 @@ def get_node_cmd(cls, config):
f"{cls.__name__}.run(strategy, {node_index});\n"
"t_elap = time.time() - t0;\n")

cmd = BaseCLI.add_status_cmd(config, ModuleName.FORWARD_PASS, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.FORWARD_PASS
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
26 changes: 18 additions & 8 deletions sup3r/pipeline/forward_pass_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r forward pass from a config file."""

config = BaseCLI.from_config_preflight(ModuleName.FORWARD_PASS, ctx,
Expand Down Expand Up @@ -66,19 +66,21 @@ def from_config(ctx, config_file, verbose=False, **__):
name = ('{}_{}'.format(basename, str(i_node).zfill(6)))
ctx.obj['NAME'] = name
node_config['job_name'] = name
node_config["pipeline_step"] = pipeline_step
cmd = ForwardPass.get_node_cmd(node_config)

cmd_log = '\n\t'.join(cmd.split('\n'))
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.

Parameters
Expand All @@ -88,6 +90,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -101,10 +107,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.FORWARD_PASS, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r forward pass locally.

Parameters
Expand All @@ -114,8 +120,12 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.FORWARD_PASS, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.FORWARD_PASS, ctx, cmd, pipeline_step)


if __name__ == '__main__':
Expand Down
12 changes: 9 additions & 3 deletions sup3r/postprocessing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def get_node_cmd(cls, config):
"t_elap = time.time() - t0;\n"
)

cmd = BaseCLI.add_status_cmd(config, ModuleName.DATA_COLLECT, cmd)

pipeline_step = config.get('pipeline_step') or ModuleName.DATA_COLLECT
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down Expand Up @@ -755,6 +755,7 @@ def collect(
log_file=None,
write_status=False,
job_name=None,
pipeline_step=None,
join_times=False,
target_final_meta_file=None,
n_writes=None,
Expand Down Expand Up @@ -786,6 +787,10 @@ def collect(
Flag to write status file once complete if running from pipeline.
job_name : str
Job name for status file if running from pipeline.
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``"collect``,
mimicking old reV behavior. By default, ``None``.
join_times : bool
Option to split full file list into chunks with each chunk having
the same temporal_chunk_index. The number of writes will then be
Expand Down Expand Up @@ -909,8 +914,9 @@ def collect(
'job_status': 'successful',
'runtime': (time.time() - t0) / 60,
}
pipeline_step = pipeline_step or 'collect'
Status.make_single_job_file(
os.path.dirname(out_file), 'collect', job_name, status
os.path.dirname(out_file), pipeline_step, job_name, status
)

logger.info('Finished file collection.')
26 changes: 18 additions & 8 deletions sup3r/postprocessing/data_collect_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r data collection from a config file. If dset_split is True this
each feature will be collected into a separate file."""
config = BaseCLI.from_config_preflight(ModuleName.DATA_COLLECT, ctx,
Expand Down Expand Up @@ -62,18 +62,19 @@ def from_config(ctx, config_file, verbose=False, **__):

for config in configs:
ctx.obj['NAME'] = config['job_name']
config['pipeline_step'] = pipeline_step
cmd = Collector.get_node_cmd(config)

cmd_log = '\n\t'.join(cmd.split('\n'))
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r data collection locally.

Parameters
Expand All @@ -83,12 +84,17 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli data_collect -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.DATA_COLLECT, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.DATA_COLLECT, ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.

Parameters
Expand All @@ -98,6 +104,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli data-collect -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -111,7 +121,7 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.DATA_COLLECT, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


if __name__ == '__main__':
Expand Down
22 changes: 16 additions & 6 deletions sup3r/preprocessing/data_extract_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r data extraction from a config file.

Parameters
Expand All @@ -46,6 +46,7 @@ def from_config(ctx, config_file, verbose=False, **__):
"""
config = BaseCLI.from_config_preflight(ModuleName.DATA_EXTRACT, ctx,
config_file, verbose)
config["pipeline_step"] = pipeline_step

exec_kwargs = config.get('execution_control', {})
hardware_option = exec_kwargs.pop('option', 'local')
Expand All @@ -63,7 +64,7 @@ def from_config(ctx, config_file, verbose=False, **__):
kickoff_local_job(ctx, cmd)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r data extraction locally.

Parameters
Expand All @@ -73,12 +74,17 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli data_extract -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.DATA_EXTRACT, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.DATA_EXTRACT, ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.

Parameters
Expand All @@ -88,6 +94,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli data_extract -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -101,7 +111,7 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.DATA_EXTRACT, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions sup3r/preprocessing/data_handling/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ def get_node_cmd(cls, config):
f"data_handler = {dh_init_str};\n"
"t_elap = time.time() - t0;\n")

cmd = BaseCLI.add_status_cmd(config, ModuleName.DATA_EXTRACT, cmd)

pipeline_step = config.get('pipeline_step') or ModuleName.DATA_EXTRACT
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"
return cmd.replace('\\', '/')

Expand Down
3 changes: 2 additions & 1 deletion sup3r/qa/qa.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ def get_node_cmd(cls, config):
"t_elap = time.time() - t0;\n"
)

cmd = BaseCLI.add_status_cmd(config, ModuleName.QA, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.QA
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
5 changes: 3 additions & 2 deletions sup3r/qa/qa_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run the sup3r QA module from a config file."""
BaseCLI.from_config(ModuleName.QA, Sup3rQa, ctx, config_file, verbose)
BaseCLI.from_config(ModuleName.QA, Sup3rQa, ctx, config_file, verbose,
pipeline_step)


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion sup3r/qa/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ def get_node_cmd(cls, config):
"t_elap = time.time() - t0;\n"
)

cmd = BaseCLI.add_status_cmd(config, ModuleName.STATS, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.STATS
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
Loading