-
Notifications
You must be signed in to change notification settings - Fork 50
Stand alone script to run a single atm 3DVar analysis #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
8d7a698
cb92c95
6bdc0c7
fc675ba
7574707
54623ce
b722e28
2b40148
a8a50c5
2707343
60a0307
a90f2b4
ed54794
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| covariance model: FV3JEDI-ID | ||
|
RussTreadon-NOAA marked this conversation as resolved.
|
||
| date: $(window_begin) | ||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| #!/usr/bin/env python3 | ||
| import argparse | ||
| import datetime as dt | ||
| import logging | ||
| import os | ||
| import re | ||
| import subprocess | ||
| import sys | ||
| import yaml | ||
|
|
||
|
|
||
| def run_atm_var_analysis(yamlconfig): | ||
| logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') | ||
| # open YAML file to get config | ||
| try: | ||
| with open(yamlconfig, 'r') as yamlconfig_opened: | ||
| all_config_dict = yaml.safe_load(yamlconfig_opened) | ||
| logging.info(f'Loading configuration from {yamlconfig}') | ||
| except Exception as e: | ||
| logging.error(f'Error occurred when attempting to load: {yamlconfig}, error: {e}') | ||
| # create working directory | ||
| workdir = all_config_dict['working directory'] | ||
| try: | ||
| os.makedirs(workdir, exist_ok=True) | ||
| logging.info(f'Created working directory {workdir}') | ||
| except OSError as error: | ||
| logging.error(f'Error creating {workdir}: {error}') | ||
| # add your ufsda python package to path | ||
| ufsda_path = os.path.join(all_config_dict['GDASApp home'], 'ush') | ||
| sys.path.append(ufsda_path) | ||
| import ufsda | ||
|
aerorahul marked this conversation as resolved.
|
||
| # compute config for YAML for executable | ||
| analysis_subconfig = all_config_dict['analysis options'] | ||
| valid_time = analysis_subconfig['valid_time'] | ||
| h = re.findall('PT(\\d+)H', analysis_subconfig['atm_window_length'])[0] | ||
| prev_cycle = valid_time - dt.timedelta(hours=int(h)) | ||
| window_begin = valid_time - dt.timedelta(hours=int(h)/2) | ||
| cyc = valid_time.strftime("%H") | ||
| cdate = valid_time.strftime("%Y%m%d%H") | ||
| gcyc = prev_cycle.strftime("%H") | ||
| gdate = prev_cycle.strftime("%Y%m%d%H") | ||
| var_config = { | ||
| 'BERROR_YAML': analysis_subconfig['berror_yaml'], | ||
| 'OBS_YAML_DIR': analysis_subconfig['obs_yaml_dir'], | ||
| 'OBS_LIST': analysis_subconfig['obs_list'], | ||
| 'atm': analysis_subconfig['atm'], | ||
| 'layout_x': str(analysis_subconfig['layout_x']), | ||
| 'layout_y': str(analysis_subconfig['layout_y']), | ||
| 'BKG_DIR': os.path.join(workdir, 'bkg'), | ||
| 'fv3jedi_fix_dir': os.path.join(workdir, 'Data', 'fv3files'), | ||
| 'fv3jedi_fieldset_dir': os.path.join(workdir, 'Data', 'fieldsets'), | ||
| 'ANL_DIR': os.path.join(workdir, 'anl'), | ||
| 'fv3jedi_staticb_dir': os.path.join(workdir, 'berror'), | ||
| 'BIAS_DIR': os.path.join(workdir, 'obs'), | ||
| 'CRTM_COEFF_DIR': os.path.join(workdir, 'crtm'), | ||
| 'BIAS_PREFIX': f"{analysis_subconfig['dump']}.t{gcyc}z", | ||
| 'BIAS_DATE': f"{gdate}", | ||
| 'DIAG_DIR': os.path.join(workdir, 'diags'), | ||
| 'OBS_DIR': os.path.join(workdir, 'obs'), | ||
| 'OBS_PREFIX': f"{analysis_subconfig['dump']}.t{cyc}z", | ||
| 'OBS_DATE': f"{cdate}", | ||
| 'valid_time': f"{valid_time.strftime('%Y-%m-%dT%H:%M:%SZ')}", | ||
| 'window_begin': f"{window_begin.strftime('%Y-%m-%dT%H:%M:%SZ')}", | ||
| 'prev_valid_time': f"{prev_cycle.strftime('%Y-%m-%dT%H:%M:%SZ')}", | ||
| 'atm_window_length': analysis_subconfig['atm_window_length'], | ||
| 'CASE': analysis_subconfig['case'], | ||
| 'CASE_ENKF': analysis_subconfig['case_enkf'], | ||
| 'DOHYBVAR': analysis_subconfig['dohybvar'], | ||
| 'LEVS': str(analysis_subconfig['levs']), | ||
| } | ||
| template = analysis_subconfig['var_yaml'] | ||
| output_file = os.path.join(workdir, 'fv3jedi_var.yaml') | ||
| # set some environment variables | ||
| os.environ['PARMgfs'] = os.path.join(all_config_dict['GDASApp home'], 'parm') | ||
| # generate YAML for executable based on input config | ||
| logging.info(f'Using YAML template {template}') | ||
| ufsda.yamltools.genYAML(var_config, template=template, output=output_file) | ||
| logging.info(f'Wrote Variational DA YAML file to {output_file}') | ||
| # use R2D2 to stage backgrounds, obs, bias correction files, etc. | ||
| ufsda.stage.gdas_single_cycle(var_config) | ||
| # link additional fix files needed (CRTM, fieldsets, etc.) | ||
| gdasfix = analysis_subconfig['gdas_fix_root'] | ||
| ufsda.stage.gdas_fix(gdasfix, workdir, var_config) | ||
| # link executable | ||
| varexe = os.path.join(all_config_dict['GDASApp home'], 'build', 'bin', 'fv3jedi_var.x') | ||
| ufsda.disk_utils.symlink(varexe, os.path.join(workdir, 'fv3jedi_var.x')) | ||
| # create output directories | ||
| ufsda.disk_utils.mkdir(os.path.join(workdir, 'diags')) | ||
| ufsda.disk_utils.mkdir(os.path.join(workdir, 'anl')) | ||
| # generate job submission script | ||
| job_script = ufsda.misc_utils.create_batch_job(all_config_dict['job options'], | ||
| workdir, | ||
| os.path.join(workdir, 'fv3jedi_var.x'), | ||
| output_file) | ||
| # submit job to queue | ||
| ufsda.misc_utils.submit_batch_job(all_config_dict['job options'], workdir, job_script) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| parser = argparse.ArgumentParser() | ||
| parser.add_argument('-c', '--config', type=str, help='Input YAML Configuration', required=True) | ||
| args = parser.parse_args() | ||
| run_atm_var_analysis(args.config) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| from .disk_utils import * | ||
| from .disk_utils import mkdir, symlink | ||
| from .ufs_yaml import gen_yaml, parse_config | ||
| import ufsda.stage | ||
| import ufsda.r2d2 | ||
| import ufsda.post | ||
| import ufsda.yamltools | ||
| from .misc_utils import isTrue | ||
| from .misc_utils import isTrue, create_batch_job, submit_batch_job |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,8 @@ | ||
| import logging | ||
| import os | ||
| import subprocess | ||
|
|
||
|
|
||
| def isTrue(str_in): | ||
| """ isTrue(str_in) | ||
| - function to translate shell variables to python logical variables | ||
|
|
@@ -10,3 +15,38 @@ def isTrue(str_in): | |
| else: | ||
| status = False | ||
| return status | ||
|
|
||
|
|
||
| def create_batch_job(job_config, working_dir, exe_path, yaml_path): | ||
| """ | ||
| create a batch job submission shell script | ||
| """ | ||
| # open up a file for writing | ||
| batch_script = os.path.join(working_dir, 'submit_job.sh') | ||
| with open(batch_script, 'w') as f: | ||
| f.write('#!/bin/bash\n') | ||
| if job_config['machine'] in ['orion', 'hera']: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest using
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I thought about that, but wanted to make it simpler for the user. Should I do the workflow approach where scheduler is the if statement but it is determined based on a dict (
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @aerorahul how is this? |
||
| f.write('#SBATCH -J GDASApp\n') | ||
| f.write('#SBATCH -o GDASApp.o%J\n') | ||
| f.write(f"#SBATCH -A {job_config['account']}\n") | ||
| f.write(f"#SBATCH -q {job_config['queue']}\n") | ||
| f.write(f"#SBATCH -p {job_config['partition']}\n") | ||
| f.write(f"#SBATCH --ntasks={job_config['ntasks']}\n") | ||
| f.write(f"#SBATCH --cpus-per-task={job_config['cpus-per-task']}\n") | ||
| f.write(f"#SBATCH --exclusive\n") | ||
| f.write(f"#SBATCH -t {job_config['walltime']}\n") | ||
| f.write(f"module use {job_config['modulepath']}\n") | ||
| f.write(f"module load GDAS/{job_config['machine']}\n") | ||
| f.write(f"cd {working_dir}\n") | ||
| if job_config['machine'] in ['orion', 'hera']: | ||
| f.write(f"srun -n $SLURM_NTASKS {exe_path} {yaml_path}\n") | ||
| logging.info(f"Wrote batch submission script to {batch_script}") | ||
| return batch_script | ||
|
|
||
|
|
||
| def submit_batch_job(job_config, working_dir, job_script): | ||
| """ | ||
| submit a batch job | ||
| """ | ||
| if job_config['machine'] in ['orion', 'hera']: | ||
| subprocess.Popen(f"sbatch {job_script}", cwd=working_dir, shell=True) | ||
Uh oh!
There was an error while loading. Please reload this page.