diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7e419dc085d..51ac7db706c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -210,19 +210,19 @@ ush/python/pygfs/task/aero_bmatrix.py @DavidNew-NOAA @CoryMartin-NOAA ush/python/pygfs/task/aero_emissions.py @bbakernoaa ush/python/pygfs/task/aero_prepobs.py @CoryMartin-NOAA ush/python/pygfs/task/analysis.py @DavidNew-NOAA @RussTreadon-NOAA -ush/python/pygfs/task/analysis_stats.py @CoryMartin-NOAA +ush/python/pygfs/task/analysis_stats.py @CoryMartin-NOAA @DavidNew-NOAA ush/python/pygfs/task/archive.py @DavidHuber-NOAA ush/python/pygfs/task/atm_analysis.py @DavidNew-NOAA @RussTreadon-NOAA ush/python/pygfs/task/atmens_analysis.py @DavidNew-NOAA @RussTreadon-NOAA ush/python/pygfs/task/bmatrix.py @DavidNew-NOAA ush/python/pygfs/task/gfs_forecast.py @aerorahul -ush/python/pygfs/task/marine_analysis.py @guillaumevernieres @AndrewEichmann-NOAA -ush/python/pygfs/task/marine_bmat.py @guillaumevernieres @AndrewEichmann-NOAA -ush/python/pygfs/task/marine_letkf.py @guillaumevernieres @AndrewEichmann-NOAA -ush/python/pygfs/task/oceanice_products.py @aerorahul @JesseMeng-NOAA @ChristopherHill-NOAA +ush/python/pygfs/task/marine_analysis.py @guillaumevernieres @AndrewEichmann-NOAA @DavidNew-NOAA +ush/python/pygfs/task/marine_bmat.py @guillaumevernieres @AndrewEichmann-NOAA @DavidNew-NOAA +ush/python/pygfs/task/marine_letkf.py @guillaumevernieres @AndrewEichmann-NOAA @DavidNew-NOAA +ush/python/pygfs/task/oceanice_products.py @aerorahul @JesseMeng-NOAA @ChristopherHill-NOAA @DavidNew-NOAA ush/python/pygfs/task/offline_analysis.py @CoryMartin-NOAA -ush/python/pygfs/task/snow_analysis.py @jiaruidong2017 -ush/python/pygfs/task/snowens_analysis.py @jiaruidong2017 +ush/python/pygfs/task/snow_analysis.py @jiaruidong2017 @DavidNew-NOAA +ush/python/pygfs/task/snowens_analysis.py @jiaruidong2017 @DavidNew-NOAA ush/python/pygfs/task/stage_ic.py @DavidHuber-NOAA @aerorahul ush/python/pygfs/task/upp.py @aerorahul @WenMeng-NOAA 
ush/python/pygfs/ufswm/__init__.py @aerorahul diff --git a/dev/ci/cases/yamls/ufs_hybatmDA_defaults.ci.yaml b/dev/ci/cases/yamls/ufs_hybatmDA_defaults.ci.yaml index f7fa21d4560..045a1bc8a4a 100644 --- a/dev/ci/cases/yamls/ufs_hybatmDA_defaults.ci.yaml +++ b/dev/ci/cases/yamls/ufs_hybatmDA_defaults.ci.yaml @@ -6,17 +6,19 @@ base: DO_JEDIATMENS: "YES" DO_TEST_MODE: "NO" atmanl: - JCB_ALGO_YAML_VAR: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jcb-prototype_3dvar_ufs_hybatmDA.yaml.j2" - JCB_ALGO_YAML_FV3INC: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jcb-prototype_3dvar-fv3inc_ufs_hybatmDA.yaml.j2" LAYOUT_X_ATMANL: 4 LAYOUT_Y_ATMANL: 4 + OBS_LIST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/atm_obs_list_ufs_hybatmDA.yaml.j2" + VAR_JEDI_TEST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jedi-test_3dvar_ufs_hybatmDA.yaml.j2" + FV3INC_JEDI_TEST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jedi-test_3dvar-fv3inc_ufs_hybatmDA.yaml.j2" atmensanl: - JCB_ALGO_YAML_LETKF: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jcb-prototype_lgetkf_ufs_hybatmDA.yaml.j2" - JCB_ALGO_YAML_OBS: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jcb-prototype_lgetkf_observer_ufs_hybatmDA.yaml.j2" - JCB_ALGO_YAML_SOL: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jcb-prototype_lgetkf_solver_ufs_hybatmDA.yaml.j2" - JCB_ALGO_YAML_FV3INC: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jcb-prototype_lgetkf-fv3inc_ufs_hybatmDA.yaml.j2" LAYOUT_X_ATMENSANL: 4 LAYOUT_Y_ATMENSANL: 4 + OBS_LIST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/atm_obs_list_ufs_hybatmDA.yaml.j2" + LETKF_JEDI_TEST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jedi-test_lgetkf_ufs_hybatmDA.yaml.j2" + OBS_JEDI_TEST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jedi-test_lgetkf-observer_ufs_hybatmDA.yaml.j2" + SOL_JEDI_TEST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jedi-test_lgetkf-solver_ufs_hybatmDA.yaml.j2" + FV3INC_JEDI_TEST_YAML: "${HOMEgfs}/sorc/gdas.cd/test/gw-ci/atm/jedi-test_lgetkf-fv3inc_ufs_hybatmDA.yaml.j2" esfc: DONST: "NO" nsst: diff --git 
a/dev/parm/config/gcafs/config.aeroanl.j2 b/dev/parm/config/gcafs/config.aeroanl.j2 index e97b6d088a6..fb4a3de649f 100644 --- a/dev/parm/config/gcafs/config.aeroanl.j2 +++ b/dev/parm/config/gcafs/config.aeroanl.j2 @@ -18,18 +18,12 @@ case ${CASE} in exit 4 esac export CASE_ANL -export JCB_ALGO_YAML_VAR=${PARMgfs}/gdas/aero/jcb-prototype_3dvar.yaml.j2 -export STATICB_TYPE='diffusion' -export BERROR_YAML="aero_background_error_static_${STATICB_TYPE}" -export BERROR_DATA_DIR="${FIXgfs}/gdas/aero/clim_b" -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_det_jedi_config.yaml.j2" -export STAGE_CRTM_COEFF_YAML="${PARMgfs}/gdas/aero/aero_stage_crtm_coeff.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/aero/aero_stage_jedi_fix.yaml.j2" -export STAGE_YAML="${PARMgfs}/gdas/aero/aero_det_stage.yaml.j2" -export SAVE_YAML="${PARMgfs}/gdas/aero/aero_det_save.yaml.j2" +export STATICB_TYPE='diffusion' -export AERO_BMATRIX_RESCALE_YAML="aero_gen_bmatrix_rescale_default.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_det_config.yaml.j2" +export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" +export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" diff --git a/dev/parm/config/gcafs/config.aeroanlgenb b/dev/parm/config/gcafs/config.aeroanlgenb index fc4fd2243fd..d9c8dbc2862 100644 --- a/dev/parm/config/gcafs/config.aeroanlgenb +++ b/dev/parm/config/gcafs/config.aeroanlgenb @@ -8,11 +8,10 @@ echo "BEGIN: config.aeroanlgenb" # Get task specific resources source "${EXPDIR}/config.resources" aeroanlgenb -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_bmat_jedi_config.yaml.j2" -export STAGE_YAML="${PARMgfs}/gdas/aero/aero_bmat_stage.yaml.j2" -export SAVE_YAML="${PARMgfs}/gdas/aero/aero_bmat_save.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_bmat_config.yaml.j2" +export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" +export 
BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" -export RESCALE_YAML="${PARMgfs}/gdas/jcb-gdas/aero/algorithm/aero_gen_bmatrix_rescale_default.yaml.j2" export aero_diffusion_iter=200 export aero_diffusion_horiz_len=300e3 export aero_diffusion_fixed_val=20.0 diff --git a/dev/parm/config/gcafs/yaml/defaults.yaml b/dev/parm/config/gcafs/yaml/defaults.yaml index 2c8379195fe..814eaafc9c8 100644 --- a/dev/parm/config/gcafs/yaml/defaults.yaml +++ b/dev/parm/config/gcafs/yaml/defaults.yaml @@ -37,23 +37,25 @@ base: FHOUT_AERO: 3 atmanl: - JCB_ALGO_YAML_VAR: "${PARMgfs}/gdas/atm/jcb-prototype_3dvar.yaml.j2" - JCB_ALGO_YAML_FV3INC: "${PARMgfs}/gdas/atm/jcb-prototype_3dvar-fv3inc.yaml.j2" STATICB_TYPE: "gsibec" LAYOUT_X_ATMANL: 8 LAYOUT_Y_ATMANL: 8 IO_LAYOUT_X: 1 IO_LAYOUT_Y: 1 + OBS_LIST_YAML: "${PARMgfs}/gdas/atm/atm_obs_list.yaml.j2" + VAR_JEDI_TEST_YAML: "" + FV3INC_JEDI_TEST_YAML: "" atmensanl: - JCB_ALGO_YAML_LETKF: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf.yaml.j2" - JCB_ALGO_YAML_OBS: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf_observer.yaml.j2" - JCB_ALGO_YAML_SOL: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf_solver.yaml.j2" - JCB_ALGO_YAML_FV3INC: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf-fv3inc.yaml.j2" LAYOUT_X_ATMENSANL: 8 LAYOUT_Y_ATMENSANL: 8 IO_LAYOUT_X: 1 IO_LAYOUT_Y: 1 + OBS_LIST_YAML: "${PARMgfs}/gdas/atm/atm_obs_list.yaml.j2" + LETKF_JEDI_TEST_YAML: "" + OBS_JEDI_TEST_YAML: "" + SOL_JEDI_TEST_YAML: "" + FV3INC_JEDI_TEST_YAML: "" aeroanl: IO_LAYOUT_X: 1 diff --git a/dev/parm/config/gfs/config.aeroanl.j2 b/dev/parm/config/gfs/config.aeroanl.j2 index f7d21986930..ae4efd8bcc4 100644 --- a/dev/parm/config/gfs/config.aeroanl.j2 +++ b/dev/parm/config/gfs/config.aeroanl.j2 @@ -18,16 +18,12 @@ case ${CASE} in exit 4 esac export CASE_ANL -export JCB_ALGO_YAML_VAR=${PARMgfs}/gdas/aero/jcb-prototype_3dvar.yaml.j2 + export STATICB_TYPE='diffusion' -export BERROR_YAML="aero_background_error_static_${STATICB_TYPE}" -export 
BERROR_DATA_DIR="${FIXgfs}/gdas/aero/clim_b" - -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_det_jedi_config.yaml.j2" -export STAGE_CRTM_COEFF_YAML="${PARMgfs}/gdas/aero/aero_stage_crtm_coeff.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/aero/aero_stage_jedi_fix.yaml.j2" -export STAGE_YAML="${PARMgfs}/gdas/aero/aero_det_stage.yaml.j2" -export SAVE_YAML="${PARMgfs}/gdas/aero/aero_det_save.yaml.j2" + +export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_det_config.yaml.j2" +export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" +export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" diff --git a/dev/parm/config/gfs/config.aeroanlgenb b/dev/parm/config/gfs/config.aeroanlgenb index 7fadfe33da2..d9c8dbc2862 100644 --- a/dev/parm/config/gfs/config.aeroanlgenb +++ b/dev/parm/config/gfs/config.aeroanlgenb @@ -8,9 +8,9 @@ echo "BEGIN: config.aeroanlgenb" # Get task specific resources source "${EXPDIR}/config.resources" aeroanlgenb -export JEDI_CONFIG_YAML"${PARMgfs}/gdas/aero/aero_bmat_jedi_config.yaml.j2" -export STAGE_YAML"${PARMgfs}/gdas/aero/aero_bmat_stage.yaml.j2" -export SAVE_YAML"${PARMgfs}/gdas/aero/aero_bmat_save.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/aero/aero_bmat_config.yaml.j2" +export OBS_LIST_YAML="${PARMgfs}/gdas/aero/aero_obs_list.yaml.j2" +export BIAS_FILES_YAML="${PARMgfs}/gdas/aero/aero_bias_files.yaml.j2" export aero_diffusion_iter=200 export aero_diffusion_horiz_len=300e3 diff --git a/dev/parm/config/gfs/config.analcalc_fv3jedi b/dev/parm/config/gfs/config.analcalc_fv3jedi index b0ede30dca5..233a5f694ac 100644 --- a/dev/parm/config/gfs/config.analcalc_fv3jedi +++ b/dev/parm/config/gfs/config.analcalc_fv3jedi @@ -14,9 +14,7 @@ export layout_y_analcalc_fv3jedi=2 # Get task specific resources source "${EXPDIR}/config.resources" analcalc_fv3jedi -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/atm/atm_stage_jedi_fix.yaml.j2" -export 
STAGE_YAML="${PARMgfs}/gdas/analcalc/analcalc_stage.yaml.j2" -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/analcalc/analcalc_jedi_config.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/analcalc/analcalc_config.yaml.j2" if [[ ${DOHYBVAR} = "YES" ]]; then export CASE_ANL=${CASE_ENS} diff --git a/dev/parm/config/gfs/config.atmanl.j2 b/dev/parm/config/gfs/config.atmanl.j2 index 14d2acfa959..d0e0e8d0f88 100644 --- a/dev/parm/config/gfs/config.atmanl.j2 +++ b/dev/parm/config/gfs/config.atmanl.j2 @@ -5,33 +5,25 @@ echo "BEGIN: config.atmanl" -export JCB_ALGO_YAML_VAR="{{ JCB_ALGO_YAML_VAR }}" -export JCB_ALGO_YAML_FV3INC="{{ JCB_ALGO_YAML_FV3INC }}" - +export OBS_LIST_YAML="{{ OBS_LIST_YAML }}" +export VAR_JEDI_TEST_YAML="{{ VAR_JEDI_TEST_YAML }}" +export FV3INC_JEDI_TEST_YAML="{{ FV3INC_JEDI_TEST_YAML }}" export STATICB_TYPE="{{ STATICB_TYPE }}" +export layout_x_atmanl="{{ LAYOUT_X_ATMANL }}" +export layout_y_atmanl="{{ LAYOUT_Y_ATMANL }}" +export io_layout_x="{{ IO_LAYOUT_X }}" +export io_layout_y="{{ IO_LAYOUT_Y }}" + +export TASK_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_det_config.yaml.j2" +export BIAS_FILES_YAML="${PARMgfs}/gdas/atm/atm_bias_files.yaml.j2" + export LOCALIZATION_TYPE="bump" -export INTERP_METHOD='barycentric' if [[ ${DOHYBVAR} = "YES" ]]; then # shellcheck disable=SC2153 export CASE_ANL=${CASE_ENS} - export BERROR_YAML="atmosphere_background_error_hybrid_${STATICB_TYPE}_${LOCALIZATION_TYPE}" else export CASE_ANL=${CASE} - export BERROR_YAML="atmosphere_background_error_static_${STATICB_TYPE}" fi -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_det_jedi_config.yaml.j2" -export STAGE_CRTM_COEFF_YAML="${PARMgfs}/gdas/atm/atm_stage_crtm_coeff.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/atm/atm_stage_jedi_fix.yaml.j2" -export STAGE_BKG_YAML="${PARMgfs}/gdas/atm/atm_det_stage_bkg.yaml.j2" -export STAGE_BERROR_YAML="${PARMgfs}/gdas/atm/atm_det_stage_berror_${STATICB_TYPE}.yaml.j2" -export 
STAGE_FV3ENS_YAML="${PARMgfs}/gdas/atm/atm_det_stage_fv3ens.yaml.j2" - -export layout_x_atmanl="{{ LAYOUT_X_ATMANL }}" -export layout_y_atmanl="{{ LAYOUT_Y_ATMANL }}" - -export io_layout_x="{{ IO_LAYOUT_X }}" -export io_layout_y="{{ IO_LAYOUT_Y }}" - echo "END: config.atmanl" diff --git a/dev/parm/config/gfs/config.atmensanl.j2 b/dev/parm/config/gfs/config.atmensanl.j2 index a8d72c7889d..d6b869151ab 100644 --- a/dev/parm/config/gfs/config.atmensanl.j2 +++ b/dev/parm/config/gfs/config.atmensanl.j2 @@ -5,22 +5,17 @@ echo "BEGIN: config.atmensanl" -export JCB_ALGO_YAML_LETKF="{{ JCB_ALGO_YAML_LETKF }}" -export JCB_ALGO_YAML_OBS="{{ JCB_ALGO_YAML_OBS }}" -export JCB_ALGO_YAML_SOL="{{ JCB_ALGO_YAML_SOL }}" -export JCB_ALGO_YAML_FV3INC="{{ JCB_ALGO_YAML_FV3INC }}" - -export INTERP_METHOD='barycentric' - -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_ens_jedi_config.yaml.j2" -export STAGE_CRTM_COEFF_YAML="${PARMgfs}/gdas/atm/atm_stage_crtm_coeff.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/atm/atm_stage_jedi_fix.yaml.j2" -export STAGE_BKG_YAML="${PARMgfs}/gdas/atm/atm_ens_stage_bkg.yaml.j2" - +export OBS_LIST_YAML="{{ OBS_LIST_YAML }}" +export LETKF_JEDI_TEST_YAML="{{ LETKF_JEDI_TEST_YAML }}" +export OBS_JEDI_TEST_YAML="{{ OBS_JEDI_TEST_YAML }}" +export SOL_JEDI_TEST_YAML="{{ SOL_JEDI_TEST_YAML }}" +export FV3INC_JEDI_TEST_YAML="{{ FV3INC_JEDI_TEST_YAML }}" export layout_x_atmensanl="{{ LAYOUT_X_ATMENSANL }}" export layout_y_atmensanl="{{ LAYOUT_Y_ATMENSANL }}" - export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_ens_config.yaml.j2" +export BIAS_FILES_YAML="${PARMgfs}/gdas/atm/atm_bias_files.yaml.j2" + echo "END: config.atmensanl" diff --git a/dev/parm/config/gfs/config.ecen_fv3jedi b/dev/parm/config/gfs/config.ecen_fv3jedi index bf33719bc7d..768140b2654 100644 --- a/dev/parm/config/gfs/config.ecen_fv3jedi +++ b/dev/parm/config/gfs/config.ecen_fv3jedi @@ -14,9 +14,7 @@ export 
layout_y_ecen_fv3jedi=1 # Get task specific resources source "${EXPDIR}/config.resources" ecen_fv3jedi -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_ecen_jedi_config.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/atm/atm_stage_jedi_fix.yaml.j2" -export STAGE_YAML="${PARMgfs}/gdas/atm/atm_ecen_stage.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/atm/atm_ecen_config.yaml.j2" if [[ ${DOHYBVAR} = "YES" ]]; then export CASE_ANL=${CASE_ENS} diff --git a/dev/parm/config/gfs/config.esnowanl.j2 b/dev/parm/config/gfs/config.esnowanl.j2 index 4da6cf78473..2dbf77949b4 100644 --- a/dev/parm/config/gfs/config.esnowanl.j2 +++ b/dev/parm/config/gfs/config.esnowanl.j2 @@ -8,21 +8,13 @@ echo "BEGIN: config.esnowanl" # Get task specific resources source "${EXPDIR}/config.resources" esnowanl -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/snow/snow_ens_jedi_config.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/snow/snow_stage_jedi_fix.yaml.j2" -export STAGE_OROG_YAML="${PARMgfs}/gdas/snow/snow_stage_orog.yaml.j2" -export STAGE_BERROR_YAML="${PARMgfs}/gdas/snow/snow_stage_berror.yaml.j2" -export STAGE_BKG_YAML="${PARMgfs}/gdas/snow/snow_ens_stage_bkg.yaml.j2" -export STAGE_IMS_SCF2IODA_YAML="${PARMgfs}/gdas/snow/snow_stage_ims_scf2ioda.yaml.j2" -export STAGE_GTS_YAML="${PARMgfs}/gdas/snow/obs/config/bufr2ioda_mapping.yaml.j2" -export SAVE_YAML="${PARMgfs}/gdas/snow/snow_ens_save.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/snow/snow_ens_config.yaml.j2" +export OBS_LIST_YAML="${PARMgfs}/gdas/snow/snow_obs_list.yaml.j2" # Name of the executable that applies increment to bkg and its namelist template export APPLY_INCR_EXE="${EXECgfs}/gdas_apply_incr.x" export ENS_APPLY_INCR_NML_TMPL="${PARMgfs}/gdas/snow/ens_apply_incr_nml.j2" -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/snow/snow_ens_jedi_config.yaml.j2" - export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" diff --git a/dev/parm/config/gfs/config.snowanl.j2 
b/dev/parm/config/gfs/config.snowanl.j2 index 83f1a9ae78a..0fd94aa0a72 100644 --- a/dev/parm/config/gfs/config.snowanl.j2 +++ b/dev/parm/config/gfs/config.snowanl.j2 @@ -12,12 +12,8 @@ source "${EXPDIR}/config.resources" snowanl export APPLY_INCR_EXE="${EXECgfs}/gdas_apply_incr.x" export APPLY_INCR_NML_TMPL="${PARMgfs}/gdas/snow/apply_incr_nml.j2" -export JEDI_CONFIG_YAML="${PARMgfs}/gdas/snow/snow_det_jedi_config.yaml.j2" -export STAGE_JEDI_FIX_YAML="${PARMgfs}/gdas/snow/snow_stage_jedi_fix.yaml.j2" -export STAGE_BKG_YAML="${PARMgfs}/gdas/snow/snow_det_stage_bkg.yaml.j2" -export STAGE_BERROR_YAML="${PARMgfs}/gdas/snow/snow_stage_berror.yaml.j2" -export STAGE_GTS_YAML="${PARMgfs}/gdas/snow/obs/config/bufr2ioda_mapping.yaml.j2" -export STAGE_IMS_SCF2IODA_YAML="${PARMgfs}/gdas/snow/snow_stage_ims_scf2ioda.yaml.j2" +export TASK_CONFIG_YAML="${PARMgfs}/gdas/snow/snow_det_config.yaml.j2" +export OBS_LIST_YAML="${PARMgfs}/gdas/snow/snow_obs_list.yaml.j2" export io_layout_x="{{ IO_LAYOUT_X }}" export io_layout_y="{{ IO_LAYOUT_Y }}" diff --git a/dev/parm/config/gfs/yaml/defaults.yaml b/dev/parm/config/gfs/yaml/defaults.yaml index 03788924124..d7e3c094680 100644 --- a/dev/parm/config/gfs/yaml/defaults.yaml +++ b/dev/parm/config/gfs/yaml/defaults.yaml @@ -29,23 +29,25 @@ base: NMEM_ENS_GFS_OFFSET: 20 atmanl: - JCB_ALGO_YAML_VAR: "${PARMgfs}/gdas/atm/jcb-prototype_3dvar.yaml.j2" - JCB_ALGO_YAML_FV3INC: "${PARMgfs}/gdas/atm/jcb-prototype_3dvar-fv3inc.yaml.j2" STATICB_TYPE: "gsibec" LAYOUT_X_ATMANL: 8 LAYOUT_Y_ATMANL: 8 IO_LAYOUT_X: 1 IO_LAYOUT_Y: 1 + OBS_LIST_YAML: "${PARMgfs}/gdas/atm/atm_obs_list.yaml.j2" + VAR_JEDI_TEST_YAML: "" + FV3INC_JEDI_TEST_YAML: "" atmensanl: - JCB_ALGO_YAML_LETKF: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf.yaml.j2" - JCB_ALGO_YAML_OBS: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf_observer.yaml.j2" - JCB_ALGO_YAML_SOL: "${PARMgfs}/gdas/atm/jcb-prototype_lgetkf_solver.yaml.j2" - JCB_ALGO_YAML_FV3INC: 
"${PARMgfs}/gdas/atm/jcb-prototype_lgetkf-fv3inc.yaml.j2" LAYOUT_X_ATMENSANL: 8 LAYOUT_Y_ATMENSANL: 8 IO_LAYOUT_X: 1 IO_LAYOUT_Y: 1 + OBS_LIST_YAML: "${PARMgfs}/gdas/atm/atm_obs_list.yaml.j2" + LETKF_JEDI_TEST_YAML: "" + OBS_JEDI_TEST_YAML: "" + SOL_JEDI_TEST_YAML: "" + FV3INC_JEDI_TEST_YAML: "" aeroanl: IO_LAYOUT_X: 1 diff --git a/jobs/JGDAS_AERO_ANALYSIS_GENERATE_BMATRIX b/jobs/JGDAS_AERO_ANALYSIS_GENERATE_BMATRIX index e14705ef949..2824af6cdd6 100755 --- a/jobs/JGDAS_AERO_ANALYSIS_GENERATE_BMATRIX +++ b/jobs/JGDAS_AERO_ANALYSIS_GENERATE_BMATRIX @@ -13,9 +13,12 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "aeroanlgenb" -c "base aeroanl aeroanl # Generate COM variables from templates YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COMIN_OBS:COM_OBS_TMPL \ COMOUT_CHEM_BMAT:COM_CHEM_BMAT_TMPL \ - COMIN_ATMOS_RESTART:COM_ATMOS_RESTART_TMPL + COMIN_ATMOS_RESTART:COM_ATMOS_RESTART_TMPL \ + COMOUT_CONF:COM_CONF_TMPL + mkdir -p "${COMOUT_CHEM_BMAT}" +mkdir -p "${COMOUT_CONF}" ############################################################### # Run relevant script diff --git a/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE b/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE index 44642b5b261..e2e2f065967 100755 --- a/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE +++ b/jobs/JGLOBAL_AERO_ANALYSIS_FINALIZE @@ -18,6 +18,10 @@ YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ COMOUT_CONF:COM_CONF_TMPL \ COMOUT_ATMOS_RESTART:COM_ATMOS_RESTART_TMPL +mkdir -m 755 -p "${COMOUT_CHEM_ANALYSIS}" +mkdir -m 755 -p "${COMOUT_ATMOS_RESTART}" +mkdir -m 755 -p "${COMOUT_CONF}" + ############################################################### # Run relevant script diff --git a/jobs/JGLOBAL_ATMOS_ANALYSIS_CALC_FV3JEDI b/jobs/JGLOBAL_ATMOS_ANALYSIS_CALC_FV3JEDI index 83da8f5dd88..b09d924f9d4 100755 --- a/jobs/JGLOBAL_ATMOS_ANALYSIS_CALC_FV3JEDI +++ b/jobs/JGLOBAL_ATMOS_ANALYSIS_CALC_FV3JEDI @@ -26,7 +26,11 @@ YMD=${PDY} HH=${cyc} RUN=${RUN} declare_from_tmpl -rx \ YMD=${PDY} HH=${cyc} RUN=${RUN} declare_from_tmpl -rx \ 
COMOUT_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \ - COMIN_ATMOS_HISTORY_PREV:COM_ATMOS_HISTORY_TMPL + COMIN_ATMOS_HISTORY_PREV:COM_ATMOS_HISTORY_TMPL +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \ + COMOUT_CONF:COM_CONF_TMPL + +mkdir -m 775 -p "${COMOUT_CONF}" ############################################## # Run relevant script diff --git a/jobs/JGLOBAL_ATM_ANALYSIS_INITIALIZE b/jobs/JGLOBAL_ATM_ANALYSIS_INITIALIZE index 4a470ff3b9b..8419405f040 100755 --- a/jobs/JGLOBAL_ATM_ANALYSIS_INITIALIZE +++ b/jobs/JGLOBAL_ATM_ANALYSIS_INITIALIZE @@ -24,8 +24,6 @@ RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} declare_from_tmpl -rx \ COMIN_ATMOS_ANALYSIS_PREV:COM_ATMOS_ANALYSIS_TMPL \ COMIN_ATMOS_HISTORY_PREV:COM_ATMOS_HISTORY_TMPL -mkdir -m 775 -p "${COMIN_ATMOS_ANALYSIS_PREV}" - ############################################################### # Run relevant script diff --git a/sorc/gdas.cd b/sorc/gdas.cd index 811d86b57a3..6b00f289cf8 160000 --- a/sorc/gdas.cd +++ b/sorc/gdas.cd @@ -1 +1 @@ -Subproject commit 811d86b57a34aff3452f7a2869ab1b4cf0c2daa4 +Subproject commit 6b00f289cf8333133a55bb5ac534a0ddfed74980 diff --git a/ush/python/pygfs/jedi/jedi.py b/ush/python/pygfs/jedi/jedi.py index 0420eb37284..2cb1d5a1aff 100644 --- a/ush/python/pygfs/jedi/jedi.py +++ b/ush/python/pygfs/jedi/jedi.py @@ -181,13 +181,13 @@ def render_jcb(self, task_config: AttrDict, algorithm_in: Optional[str] = None) @staticmethod @logit(logger) - def get_jedi_dict(jedi_config_yaml: str, task_config: AttrDict, expected_block_names: Optional[list] = None): + def get_jedi_dict(jedi_config_dict: dict, task_config: AttrDict, expected_block_names: Optional[list] = None): """Get dictionary of Jedi objects from YAML specifying their configuration dictionaries Parameters ---------- - jedi_config_yaml : str - path to YAML specifying configuration dictionaries for Jedi objects + jedi_config_dict : dict + dictionary parsed from a J2-YAML file specifying configuration 
dictionaries for JEDI objects task_config : str attribute-dictionary of all configuration variables associated with a GDAS task expected_block_names (optional) : str @@ -201,9 +201,6 @@ def get_jedi_dict(jedi_config_yaml: str, task_config: AttrDict, expected_block_n # Initialize dictionary of Jedi objects jedi_dict = AttrDict() - # Parse J2-YAML file for dictionary of JEDI configuration dictionaries - jedi_config_dict = parse_j2yaml(jedi_config_yaml, task_config) - # Loop through dictionary of Jedi configuration dictionaries for block_name in jedi_config_dict: # yaml_name key is set to name for this block @@ -266,9 +263,9 @@ def clean_empty_obsspaces(self): observers.clear() observers.extend(cleaned_observers) - # If no observers left in list, raise error + # Warn if no observers left in list if observers == []: - raise WorkflowException(f"No observers found in JEDI input config") + logger.warning(f"No observers found in JEDI input config") @staticmethod @logit(logger) diff --git a/ush/python/pygfs/task/aero_analysis.py b/ush/python/pygfs/task/aero_analysis.py index 28310ea1643..d5408e34e68 100644 --- a/ush/python/pygfs/task/aero_analysis.py +++ b/ush/python/pygfs/task/aero_analysis.py @@ -1,28 +1,24 @@ #!/usr/bin/env python3 import os -import glob -import gzip -import tarfile from logging import getLogger -from pprint import pformat from netCDF4 import Dataset from typing import Dict, List - -from wxflow import (AttrDict, - FileHandler, - add_to_datetime, to_fv3time, to_timedelta, - to_fv3time, - Task, - YAMLFile, parse_j2yaml, - logit) +from pygfs.task.analysis import Analysis from pygfs.jedi import Jedi +from wxflow import ( + AttrDict, + FileHandler, + to_fv3time, to_timedelta, + YAMLFile, parse_j2yaml, + logit +) import numpy as np logger = getLogger(__name__.split('.')[-1]) -class AerosolAnalysis(Task): +class AerosolAnalysis(Analysis): """ Class for JEDI-based global aerosol analysis tasks """ @@ -48,36 +44,40 @@ def __init__(self, config): _res = 
int(self.task_config['CASE'][1:]) _res_anl = int(self.task_config['CASE_ANL'][1:]) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) - # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + if self.task_config.DOIAU: + _anl_time = self.task_config.WINDOW_BEGIN + else: + _anl_time = self.task_config.current_cycle + + _bkg_times = [] + for hour in self.task_config.aero_bkg_times: + _bkg_times.append(self.task_config.WINDOW_BEGIN + to_timedelta(f"{str(hour)}H") - to_timedelta(f"{self.task_config.assim_freq}H") / 2) + + # Extend task_config with variables repeatedly used across this class + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, - 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, 'npz_anl': self.task_config['LEVS'] - 1, - 'AERO_WINDOW_BEGIN': _window_begin, - 'AERO_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", - 'aero_bkg_fhr': [fh - 3 for fh in self.task_config['aero_bkg_times']], - 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gcdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'aero_obsdatain_path': f"{self.task_config.DATA}/obs/", - 'aero_obsdataout_path': f"{self.task_config.DATA}/diags/", - 'BKG_TSTEP': "PT3H" # FGAT + 'npz': self.task_config.LEVS - 1, + 'BKG_TSTEP': "PT3H", # FGAT + 'BERROR_YAML': f'aero_background_error_static_{self.task_config.STATICB_TYPE}', + 'AERO_BMATRIX_RESCALE_YAML': 'aero_gen_bmatrix_rescale_default.yaml.j2', + 'anl_time': _anl_time, + 'bkg_times': _bkg_times, } - ) + )) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + 
self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create dictionary of Jedi objects expected_keys = ['aeroanlvar'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -85,55 +85,18 @@ def initialize(self) -> None: This method will initialize a global aerosol analysis using JEDI. This includes: - - initialize JEDI applications - - staging observation files - - staging bias correction files - - staging CRTM fix files - - staging FV3-JEDI fix files - - staging B error files - - staging model backgrounds - - creating output directories + - stage input files from COM and create output directories + - extract bias corrections from tar files + - initialize JEDI application """ - # stage observations - logger.info(f"Staging list of observation files generated from JEDI config") - obs_dict = self.jedi_dict['aeroanlvar'].render_jcb(self.task_config, 'aero_obs_staging') - FileHandler(obs_dict).sync() - logger.debug(f"Observation files:\n{pformat(obs_dict)}") + # Stage files from COM + logger.info(f"Staging files from COM") + FileHandler(self.task_config.data_in).sync() - # # stage bias corrections - logger.info(f"Staging list of bias correction files") - bias_dict = self.jedi_dict['aeroanlvar'].render_jcb(self.task_config, 'aero_bias_staging') - - if bias_dict['copy'] is None: - logger.info(f"No bias correction files to stage") - else: - try: - bias_dict['copy'] = Jedi.remove_redundant(bias_dict['copy']) - FileHandler(bias_dict).sync() - logger.debug(f"Bias correction files:\n{pformat(bias_dict)}") - - # extract bias corrections - Jedi.extract_tar_from_filehandler_dict(bias_dict) - except FileNotFoundError: - logger.error(f"Bias correction files or directories do not exist:\n{pformat(bias_dict)}") - - # stage CRTM fix files - logger.info(f"Staging CRTM 
fix files from {self.task_config.STAGE_CRTM_COEFF_YAML}") - crtm_fix_dict = parse_j2yaml(self.task_config.STAGE_CRTM_COEFF_YAML, self.task_config) - FileHandler(crtm_fix_dict).sync() - logger.debug(f"CRTM fix files:\n{pformat(crtm_fix_dict)}") - - # stage fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") - - # stage files from COM and create working directories - logger.info(f"Staging files prescribed from {self.task_config.STAGE_YAML}") - stage_dict = parse_j2yaml(self.task_config.STAGE_YAML, self.task_config) - FileHandler(stage_dict).sync() + # Extract bias corrections from tar files + logger.info(f"Extracting bias corrections from tar files") + self.untar_bias_corrections() # initialize JEDI variational application logger.info(f"Initializing JEDI variational DA application") @@ -161,56 +124,28 @@ def finalize(self) -> None: This method will finalize a global aerosol analysis using JEDI. 
This includes: - - tarring up output diag files and place in ROTDIR - - copying the generated YAML file from initialize to the ROTDIR - - copying the guess files to the ROTDIR - - applying the increments to the original RESTART files - - moving the increment files to the ROTDIR + - apply increments to the original RESTART files + - compress and tar output diag files in COM + - tar radiative bias correction files in COM + - save output files and YAMLs to COM """ - # ---- tar up diags - # path of output tar statfile - logger.info('Preparing observation space diagnostics for archiving') - aerostat = os.path.join(self.task_config.COMOUT_CHEM_ANALYSIS, f"{self.task_config['APREFIX']}aerostat.tgz") - - # get list of diag files to put in tarball - diags = glob.glob(os.path.join(self.task_config['DATA'], 'diags', 'diag*nc')) - - # gzip the files first - for diagfile in diags: - logger.info(f'Adding {diagfile} to tar file') - with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: - f_out.writelines(f_in) # ---- add increments to RESTART files logger.info('Adding increments to RESTART files') self._add_fms_cube_sphere_increments() - # tar up bias correction files - bfile = f"{self.task_config.APREFIX}aero_varbc_params.tar" - aertar = os.path.join(self.task_config.COMOUT_CHEM_ANALYSIS, bfile) - - # get lists of aerosol bias correction files to add to tarball - satlist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*satbias*nc')) - - # copy files back to COM - logger.info(f"Copying files to COM based on {self.task_config.SAVE_YAML}") - save_dict = parse_j2yaml(self.task_config.SAVE_YAML, self.task_config) - FileHandler(save_dict).sync() - - # tar aerosol bias correction files to ROTDIR - logger.info(f"Creating aerosol bias correction tar file {aertar}") - with tarfile.open(aertar, 'w') as aerbcor: - for satfile in satlist: - aerbcor.add(satfile, arcname=os.path.basename(satfile)) - logger.info(f"Add {aerbcor.getnames()}") - - # open tar file for 
writing - with tarfile.open(aerostat, "w|gz") as archive: - for diagfile in diags: - diaggzip = f"{diagfile}.gz" - archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - logger.info(f'Saved diags to {aerostat}') + # Compress and tar diag files in COM directory + self.tar_diag_files(self.task_config.COMOUT_CHEM_ANALYSIS, + f"{self.task_config['APREFIX']}aerostat.tgz") + + # Tar radiative bias correction files into COM directory + self.tar_radiative_bias_corrections(self.task_config.COMOUT_CHEM_ANALYSIS, + f"{self.task_config.APREFIX}aero_varbc_params.tar") + + # Save files to COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() def clean(self): super().clean() diff --git a/ush/python/pygfs/task/aero_bmatrix.py b/ush/python/pygfs/task/aero_bmatrix.py index cc9918d4d77..511b7dcdcf5 100644 --- a/ush/python/pygfs/task/aero_bmatrix.py +++ b/ush/python/pygfs/task/aero_bmatrix.py @@ -1,18 +1,14 @@ #!/usr/bin/env python3 -import os from logging import getLogger -from typing import List, Dict - -from wxflow import (AttrDict, FileHandler, - add_to_datetime, to_timedelta, - parse_j2yaml, logit, Task) +from pygfs.task.analysis import Analysis from pygfs.jedi import Jedi +from wxflow import AttrDict, FileHandler, add_to_datetime, to_timedelta, parse_j2yaml, logit logger = getLogger(__name__.split('.')[-1]) -class AerosolBMatrix(Task): +class AerosolBMatrix(Analysis): """ Class for global aerosol BMatrix tasks """ @@ -40,46 +36,37 @@ def __init__(self, config): _res_anl = int(self.task_config['CASE_ANL'][1:]) _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) - # fix ocnres - self.task_config.OCNRES = f"{self.task_config.OCNRES:03d}" - - # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + # Extend task_config with variables repeatedly used across this class + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 
'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, - 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, 'npz_anl': self.task_config['LEVS'] - 1, - 'AERO_WINDOW_BEGIN': _window_begin, - 'AERO_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", - 'aero_bkg_fhr': map(int, str(self.task_config['aero_bkg_times']).split(',')), - 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'aero_obsdatain_path': f"{self.task_config.DATA}/obs/", - 'aero_obsdataout_path': f"{self.task_config.DATA}/diags/", + 'npz': self.task_config.LEVS - 1, + 'BERROR_YAML': f'aero_background_error_static_{self.task_config.STATICB_TYPE}', + 'BERROR_DATA_DIR': f'{self.task_config.FIXgfs}/gdas/aero/clim_b', + 'AERO_BMATRIX_RESCALE_YAML': 'aero_gen_bmatrix_rescale_default.yaml.j2', } - ) + )) - # task_config is everything that this task should need - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create dictionary of Jedi objects expected_keys = ['aero_interpbkg', 'aero_diagb', 'aero_diffusion'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) - def initialize(self: Task) -> None: + def initialize(self) -> None: """Initialize a global aerosol B-matrix This method will initialize a global aerosol B-Matrix. 
This includes: - - staging the determinstic backgrounds - - staging fix files - - initializing the JEDI applications + - stage input files from COM and create output directories + - initialize JEDI applications Parameters ---------- @@ -90,17 +77,12 @@ def initialize(self: Task) -> None: None """ - # stage fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_list = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_list).sync() - - # stage files from COM and create working directories - logger.info(f"Staging files from COM and creating working directories {self.task_config.STAGE_YAML}") - stage_dict = parse_j2yaml(self.task_config.STAGE_YAML, self.task_config) - FileHandler(stage_dict).sync() + # Stage files from COM + logger.info(f"Staging files from COM") + FileHandler(self.task_config.data_in).sync() # initialize JEDI applications + logger.info(f"Initializing JEDI applications") self.jedi_dict['aero_interpbkg'].initialize(self.task_config) self.jedi_dict['aero_diagb'].initialize(self.task_config) self.jedi_dict['aero_diffusion'].initialize(self.task_config) @@ -137,11 +119,10 @@ def finalize(self) -> None: This method will finalize a global aerosol bmatrix using JEDI. 
This includes: - - copying the bmatrix files to COM - - copying YAMLs to COM + - save output files and YAMLs to COM """ - # save files to COMOUT - logger.info(f"Saving files to COMOUT based on {self.task_config.SAVE_YAML}") - save_dict = parse_j2yaml(self.task_config.SAVE_YAML, self.task_config) - FileHandler(save_dict).sync() + + # Save files to COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py index 1d8b38483b0..1998e5a6c04 100644 --- a/ush/python/pygfs/task/analysis.py +++ b/ush/python/pygfs/task/analysis.py @@ -1,274 +1,213 @@ #!/usr/bin/env python3 -import os import glob -import tarfile +import gzip from logging import getLogger -from pprint import pformat -from netCDF4 import Dataset -from typing import List, Dict, Any, Union, Optional - -from jcb import render -from wxflow import (parse_j2yaml, FileHandler, rm_p, logit, - Task, Executable, WorkflowException, to_fv3time, to_YMD, - Template, TemplateConstants) +import os +import tarfile +from typing import Any, Dict +from wxflow import (AttrDict, Task, WorkflowException, + add_to_datetime, to_timedelta, to_isotime, + parse_j2yaml, + logit) logger = getLogger(__name__.split('.')[-1]) class Analysis(Task): - """Parent class for GDAS tasks - - The Analysis class is the parent class for all - Global Data Assimilation System (GDAS) tasks - directly related to peforming an analysis """ + General class for JEDI-based global analysis tasks + """ + @logit(logger, name="Analysis") + def __init__(self, config: Dict[str, Any]): + """Constructor global atm analysis task - def __init__(self, config: Dict[str, Any]) -> None: - super().__init__(config) - # Store location of GDASApp jinja2 templates - self.gdasapp_j2tmpl_dir = os.path.join(self.task_config.PARMgfs, 'gdas') - # fix ocnres - self.task_config.OCNRES = f"{self.task_config.OCNRES :03d}" - - def initialize(self) -> None: - super().initialize() - 
- # all JEDI analyses need a JEDI config - self.task_config.jedi_config = self.get_jedi_config() - - # all analyses need to stage observations - obs_dict = self.get_obs_dict() - FileHandler(obs_dict).sync() - - # link jedi executable to run directory - self.link_jediexe() - - @logit(logger) - def get_jedi_config(self, algorithm: Optional[str] = None) -> Dict[str, Any]: - """Compile a dictionary of JEDI configuration from JEDIYAML template file + This method will construct a global atm analysis task. + This includes: + - extending the task_config attribute AttrDict to include parameters required for this task Parameters ---------- - algorithm (optional) : str - Name of the algorithm to use in the JEDI configuration. Will override the algorithm - set in the self.task_config.JCB_<>_YAML file + config: Dict + dictionary object containing task configuration Returns ---------- - jedi_config : Dict - a dictionary containing the fully rendered JEDI yaml configuration + None """ + super().__init__(config) - # generate JEDI YAML file - logger.info(f"Generate JEDI YAML config: {self.task_config.jedi_yaml}") - - if 'JCB_BASE_YAML' in self.task_config.keys(): - # Step 1: fill templates of the jcb base YAML file - jcb_config = parse_j2yaml(self.task_config.JCB_BASE_YAML, self.task_config) - - # Step 2: (optional) fill templates of algorithm override YAML and merge - if 'JCB_ALGO_YAML' in self.task_config.keys(): - jcb_algo_config = parse_j2yaml(self.task_config.JCB_ALGO_YAML, self.task_config) - jcb_config = {**jcb_config, **jcb_algo_config} - - # If algorithm is present override the algorithm in the JEDI config - if algorithm: - jcb_config['algorithm'] = algorithm - - # Step 3: generate the JEDI Yaml using JCB driving YAML - jedi_config = render(jcb_config) - elif 'JEDIYAML' in self.task_config.keys(): - # Generate JEDI YAML file (without using JCB) - logger.info(f"Generate JEDI YAML config: {self.task_config.jedi_yaml}") - jedi_config = parse_j2yaml(self.task_config.JEDIYAML, 
self.task_config, - searchpath=self.gdasapp_j2tmpl_dir) - logger.debug(f"JEDI config:\n{pformat(jedi_config)}") - else: - raise KeyError(f"Task config must contain JCB_BASE_YAML or JEDIYAML") + # Get assimilation window times + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + _next_cycle = add_to_datetime(self.task_config.current_cycle, to_timedelta(f"{self.task_config.assim_freq}H")) - logger.debug(f"JEDI config:\n{pformat(jedi_config)}") + # Get specific assimilation times within the assimilation window + _iau_times_iso = [] + for hour in self.task_config.IAUFHRS: + _iau_times_iso.append(to_isotime(_window_begin + to_timedelta(f"{str(hour)}H") - to_timedelta(f"{self.task_config.assim_freq}H") / 2)) - return jedi_config + # Get observations list from obs list yaml + if 'OBS_LIST_YAML' in self.task_config: + _observations = parse_j2yaml(self.task_config.OBS_LIST_YAML, self.task_config)['observations'] + else: + _observations = [] - @logit(logger) - def get_obs_dict(self) -> Dict[str, Any]: - """Compile a dictionary of observation files to copy + # Get bias correction dict from bias files yaml + if 'BIAS_FILES_YAML' in self.task_config: + _bias_files = parse_j2yaml(self.task_config.BIAS_FILES_YAML, self.task_config)['bias_files'] + else: + _bias_files = AttrDict - This method extracts 'observers' from the JEDI yaml and from that list, extracts a list of - observation files that are to be copied to the run directory - from the observation input directory + # Set prefix needed for GPREFIX, depending on the model + if self.task_config.NET == 'gcafs': + _da_prefix = 'gcdas' + else: + _da_prefix = 'gdas' + + # Extend task_config with variables that are repeatedly used across this class + self.task_config.update(AttrDict( + { + 'WINDOW_BEGIN': _window_begin, + 'WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", + 'next_cycle': _next_cycle, + 'OPREFIX': 
f"{self.task_config.RUN.replace('enkf','')}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN.replace('enkf','')}.t{self.task_config.cyc:02d}z.", + 'APREFIX_ENS': f"enkf{self.task_config.RUN.replace('enkf','')}.t{self.task_config.cyc:02d}z.", + 'GPREFIX': f"{_da_prefix}.t{self.task_config.previous_cycle.hour:02d}z.", + 'GPREFIX_ENS': f"enkf{_da_prefix}.t{self.task_config.previous_cycle.hour:02d}z.", + 'OCNRES': f"{self.task_config.OCNRES:03d}", + 'iau_times_iso': _iau_times_iso, + 'observations': _observations, + 'bias_files': _bias_files, + } + )) - Parameters - ---------- + def initialize(self) -> None: + self.initialize() - Returns - ---------- - obs_dict: Dict - a dictionary containing the list of observation files to copy for FileHandler - """ + def execute(self) -> None: + super.execute() + + def finalize(self) -> None: + super.finalize() - logger.info(f"Extracting a list of observation files from Jedi config file") - observations = find_value_in_nested_dict(self.task_config.jedi_config, 'observations') - logger.debug(f"observations:\n{pformat(observations)}") - - copylist = [] - for ob in observations['observers']: - obfile = ob['obs space']['obsdatain']['engine']['obsfile'] - basename = os.path.basename(obfile) - copylist.append([os.path.join(self.task_config['COM_OBS'], basename), obfile]) - obs_dict = { - 'mkdir': [os.path.join(self.task_config['DATA'], 'obs')], - 'copy': copylist - } - return obs_dict + def clean(self) -> None: + super().clean() @logit(logger) - def add_fv3_increments(self, inc_file_tmpl: str, bkg_file_tmpl: str, incvars: List) -> None: - """Add cubed-sphere increments to cubed-sphere backgrounds + def untar_bias_corrections(self) -> None: + """Extract bias correction files from tarballs + This method will extract bias correction files from tarballs Parameters ---------- - inc_file_tmpl : str - template of the FV3 increment file of the form: 'filetype.tile{tilenum}.nc' - bkg_file_tmpl : str - template of the FV3 
background file of the form: 'filetype.tile{tilenum}.nc' - incvars : List - List of increment variables to add to the background + None + + Returns + ---------- + None """ - for itile in range(1, self.task_config.ntiles + 1): - inc_path = inc_file_tmpl.format(tilenum=itile) - bkg_path = bkg_file_tmpl.format(tilenum=itile) - with Dataset(inc_path, mode='r') as incfile, Dataset(bkg_path, mode='a') as rstfile: - for vname in incvars: - increment = incfile.variables[vname][:] - bkg = rstfile.variables[vname][:] - anl = bkg + increment - rstfile.variables[vname][:] = anl[:] - try: - rstfile.variables[vname].delncattr('checksum') # remove the checksum so fv3 does not complain - except (AttributeError, RuntimeError): - pass # checksum is missing, move on + bias_file_list = [] + for ob in self.task_config.observations: + if ob in self.task_config.bias_files and not self.task_config.bias_files[ob] in bias_file_list: + bias_file_list.append(self.task_config.bias_files[ob]) + bias_file_path = f'{self.task_config.DATA}/obs/{self.task_config.GPREFIX}{self.task_config.bias_files[ob]}' + if os.path.exists(bias_file_path): + extract_tar(bias_file_path) + else: + logger.warning(f"Bias correction file {bias_file_path} does not exist and will be skipped") @logit(logger) - def link_jediexe(self) -> None: - """ - - This method links a JEDI executable to the run directory + def tar_diag_files(self, comout: str, tarball_name: str) -> None: + """Compress and tar diag files into COM directory Parameters ---------- - Task: GDAS task + comout: str + path to COM output directory + tarball_name: str + name of output tar file Returns ---------- None """ - exe_src = self.task_config.JEDIEXE - # TODO: linking is not permitted per EE2. Needs work in JEDI to be able to copy the exec. 
- logger.info(f"Link executable {exe_src} to DATA/") - logger.warn("Linking is not permitted per EE2.") - exe_dest = os.path.join(self.task_config.DATA, os.path.basename(exe_src)) - if os.path.exists(exe_dest): - rm_p(exe_dest) - os.symlink(exe_src, exe_dest) + # Set paths of output tar files + diagtar = os.path.join(comout, tarball_name) - return exe_dest + # Get lists of files to put in tarballs + diaglist = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) + + # Compress diag files + logger.info(f"Compressing {len(diaglist)} diag files") + for diagfile in diaglist: + with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: + f_out.writelines(f_in) + + # Create tarball of compressed diag files in COM + logger.debug(f"Creating tarball {diagtar} with {len(diaglist)} compressed diag files") + with tarfile.open(diagtar, "w") as archive: + for diagfile in diaglist: + diaggzip = f"{diagfile}.gz" + archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - @staticmethod @logit(logger) - def tgz_diags(statfile: str, diagdir: str) -> None: - """tar and gzip the diagnostic files resulting from a JEDI analysis. + def tar_radiative_bias_corrections(self, comout: str, tarball_name: str) -> None: + """Tar radiative bias correction files into COM directory Parameters ---------- - statfile : str | os.PathLike - Path to the output .tar.gz .tgz file that will contain the diag*.nc files e.g. 
atmstat.tgz - diagdir : str | os.PathLike - Directory containing JEDI diag files + comout: str + path to COM output directory + tarball_name: str + name of output tar file + + Returns + ---------- + None """ - # get list of diag files to put in tarball - diags = glob.glob(os.path.join(diagdir, 'diags', 'diag*nc')) - diags.extend(glob.glob(os.path.join(diagdir, 'diags', 'diag*nc4'))) + # Set paths of output tar files + radtar = os.path.join(comout, tarball_name) - logger.info(f"Compressing {len(diags)} diag files to {statfile}") + # Get lists of files to put in tarballs + satlist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*satbias*nc')) + tlaplist = glob.glob(os.path.join(self.task_config.DATA, 'obs', '*tlapse.txt')) - # Open tar.gz file for writing - with tarfile.open(statfile, "w:gz") as tgz: - # Add diag files to tarball - for diagfile in diags: - tgz.add(diagfile, arcname=os.path.basename(diagfile)) + # Create tarball of radiance bias correction files + logger.info(f"Creating radiance bias correction tarball {radtar}") + with tarfile.open(radtar, 'w') as radbcor: + logger.info(f"Adding {radbcor.getnames()}") + for satfile in satlist: + radbcor.add(satfile, arcname=os.path.basename(satfile)) + for tlapfile in tlaplist: + # Change GPREFIX to APREFIX in tlapse file name when adding to tarball + radbcor.add(tlapfile, arcname=os.path.basename(tlapfile.replace(self.task_config.GPREFIX, self.task_config.APREFIX))) @logit(logger) -def find_value_in_nested_dict(nested_dict: Dict, target_key: str) -> Any: - """ - Recursively search through a nested dictionary and return the value for the target key. - This returns the first target key it finds. So if a key exists in a subsequent - nested dictionary, it will not be found. 
+def extract_tar(tar_file: str) -> None: + """Extract files from a tarball + + This method extracts files from a tarball Parameters ---------- - nested_dict : Dict - Dictionary to search - target_key : str - Key to search for + tar_file + path/name of tarball Returns - ------- - Any - Value of the target key - - Raises - ------ - KeyError - If key is not found in dictionary - - TODO: if this gives issues due to landing on an incorrect key in the nested - dictionary, we will have to implement a more concrete method to search for a key - given a more complete address. See resolved conversations in PR 2387 - - # Example usage: - nested_dict = { - 'a': { - 'b': { - 'c': 1, - 'd': { - 'e': 2, - 'f': 3 - } - }, - 'g': 4 - }, - 'h': { - 'i': 5 - }, - 'j': { - 'k': 6 - } - } - - user_key = input("Enter the key to search for: ") - result = find_value_in_nested_dict(nested_dict, user_key) + ---------- + None """ - if not isinstance(nested_dict, dict): - raise TypeError(f"Input is not of type(dict)") - - result = nested_dict.get(target_key) - if result is not None: - return result - - for value in nested_dict.values(): - if isinstance(value, dict): - try: - result = find_value_in_nested_dict(value, target_key) - if result is not None: - return result - except KeyError: - pass - - raise KeyError(f"Key '{target_key}' not found in the nested dictionary") + # extract files from tar file + tar_path = os.path.dirname(tar_file) + try: + with tarfile.open(tar_file, "r") as tarball: + tarball.extractall(path=tar_path) + logger.info(f"Extract {tarball.getnames()}") + except Exception as e: + raise WorkflowException(f"An error occurred while extracting {tar_file}:\n{e}") from e diff --git a/ush/python/pygfs/task/analysis_stats.py b/ush/python/pygfs/task/analysis_stats.py index cfe34d3257f..086f289d3a1 100644 --- a/ush/python/pygfs/task/analysis_stats.py +++ b/ush/python/pygfs/task/analysis_stats.py @@ -74,7 +74,8 @@ def initialize(self) -> None: # Expected keys are what must be included 
from the JEDI config file. We can # then loop through ob space list from scripts/exglobal_analysis_stats.py expected_keys = ['aero', 'atmos', 'snow'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config) + self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys) logger.info(f"Copying files to {self.task_config.DATA}/stats") diff --git a/ush/python/pygfs/task/atm_analysis.py b/ush/python/pygfs/task/atm_analysis.py index dae5469a1c3..0ea0724ef03 100644 --- a/ush/python/pygfs/task/atm_analysis.py +++ b/ush/python/pygfs/task/atm_analysis.py @@ -1,24 +1,17 @@ #!/usr/bin/env python3 -import os -import glob -import gzip -import tarfile from logging import getLogger -from pprint import pformat -from typing import Any, Dict -from wxflow import (AttrDict, FileHandler, Task, - add_to_datetime, to_timedelta, - parse_j2yaml, - logit) +from pygfs.task.analysis import Analysis from pygfs.jedi import Jedi +from typing import Any, Dict +from wxflow import AttrDict, FileHandler, parse_j2yaml, logit logger = getLogger(__name__.split('.')[-1]) -class AtmAnalysis(Task): +class AtmAnalysis(Analysis): """ - Class for JEDI-based global atm analysis tasks + Class for JEDI-based global atm deterministic analysis tasks """ @logit(logger, name="AtmAnalysis") def __init__(self, config: Dict[str, Any]): @@ -42,37 +35,33 @@ def __init__(self, config: Dict[str, Any]): _res = int(self.task_config.CASE[1:]) _res_anl = int(self.task_config.CASE_ANL[1:]) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + + if self.task_config.DOHYBVAR: + _BERROR_YAML = f"atmosphere_background_error_hybrid_{self.task_config.STATICB_TYPE}_{self.task_config.LOCALIZATION_TYPE}" + else: + _BERROR_YAML = f"atmosphere_background_error_static_{self.task_config.STATICB_TYPE}" # Create a 
local dictionary that is repeatedly used across this class - local_dict = AttrDict( + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, - 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, 'npz_anl': self.task_config.LEVS - 1, - 'ATM_WINDOW_BEGIN': _window_begin, - 'ATM_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", - 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX_ENS': f"enkf{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'GPREFIX_ENS': f"enkfgdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'atm_obsdatain_path': f"{self.task_config.DATA}/obs/", - 'atm_obsdataout_path': f"{self.task_config.DATA}/diags/", - 'BKG_TSTEP': "PT1H" # Placeholder for 4D applications + 'npz': self.task_config.LEVS - 1, + 'BKG_TSTEP': "PT1H", # Placeholder for 4D applications + 'BERROR_YAML': _BERROR_YAML, } - ) + )) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create dictionary of Jedi objects expected_keys = ['atmanlvar', 'atmanlfv3inc'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -80,14 +69,9 @@ def initialize(self) -> None: This method will initialize a global atm analysis. 
This includes: + - stage input files from COM and create output directories + - extract bias corrections from tar files - initialize JEDI applications - - staging observation files - - staging bias correction files - - staging CRTM fix files - - staging FV3-JEDI fix files - - staging B error files - - staging model backgrounds - - creating output directories Parameters ---------- @@ -98,73 +82,17 @@ def initialize(self) -> None: None """ - # stage observations - logger.info(f"Staging list of observation files") - obs_dict = self.jedi_dict['atmanlvar'].render_jcb(self.task_config, 'atm_obs_staging') - FileHandler(obs_dict).sync() - logger.debug(f"Observation files:\n{pformat(obs_dict)}") - - # stage bias corrections - logger.info(f"Staging list of bias correction files") - bias_dict = self.jedi_dict['atmanlvar'].render_jcb(self.task_config, 'atm_bias_staging') - if bias_dict['copy'] is None: - logger.info(f"No bias correction files to stage") - else: - bias_dict['copy'] = Jedi.remove_redundant(bias_dict['copy']) - FileHandler(bias_dict).sync() - logger.debug(f"Bias correction files:\n{pformat(bias_dict)}") - - # extract bias corrections - Jedi.extract_tar_from_filehandler_dict(bias_dict) - - # stage CRTM fix files - logger.info(f"Staging CRTM fix files from {self.task_config.STAGE_CRTM_COEFF_YAML}") - crtm_fix_dict = parse_j2yaml(self.task_config.STAGE_CRTM_COEFF_YAML, self.task_config) - FileHandler(crtm_fix_dict).sync() - logger.debug(f"CRTM fix files:\n{pformat(crtm_fix_dict)}") - - # stage fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") - - # stage static background error files, otherwise it will assume ID matrix - logger.info(f"Stage files for STATICB_TYPE {self.task_config.STATICB_TYPE}") - if self.task_config.STATICB_TYPE != 
'identity': - berror_staging_dict = parse_j2yaml(self.task_config.STAGE_BERROR_YAML, self.task_config) - else: - berror_staging_dict = {} - FileHandler(berror_staging_dict).sync() - logger.debug(f"Background error files:\n{pformat(berror_staging_dict)}") - - # stage ensemble files for use in hybrid background error - if self.task_config.DOHYBVAR: - logger.debug(f"Stage ensemble files for DOHYBVAR {self.task_config.DOHYBVAR}") - fv3ens_staging_dict = parse_j2yaml(self.task_config.STAGE_FV3ENS_YAML, self.task_config) - FileHandler(fv3ens_staging_dict).sync() - logger.debug(f"Ensemble files:\n{pformat(fv3ens_staging_dict)}") - - # stage backgrounds - logger.info(f"Staging background files from {self.task_config.STAGE_BKG_YAML}") - bkg_staging_dict = parse_j2yaml(self.task_config.STAGE_BKG_YAML, self.task_config) - FileHandler(bkg_staging_dict).sync() - logger.debug(f"Background files:\n{pformat(bkg_staging_dict)}") + # Stage files from COM + logger.info(f"Staging files from COM and creating output directories") + FileHandler(self.task_config.data_in).sync() - # need output dir for diags and anl - logger.debug("Create empty output [anl, diags] directories to receive output from executable") - newdirs = [ - os.path.join(self.task_config.DATA, 'anl'), - os.path.join(self.task_config.DATA, 'diags'), - ] - FileHandler({'mkdir': newdirs}).sync() + # Extract bias corrections from tar files + logger.info(f"Extracting bias corrections from tar files") + self.untar_bias_corrections() - # initialize JEDI variational application - logger.info(f"Initializing JEDI variational DA application") + # Initialize JEDI variational application + logger.info(f"Initializing JEDI applications") self.jedi_dict['atmanlvar'].initialize(self.task_config, clean_empty_obsspaces=True) - - # initialize JEDI FV3 increment conversion application - logger.info(f"Initializing JEDI FV3 increment conversion application") self.jedi_dict['atmanlfv3inc'].initialize(self.task_config) @logit(logger) @@ -189,9 
+117,9 @@ def finalize(self) -> None: This method will finalize a global atm analysis using JEDI. This includes: - - tar output diag files and place in ROTDIR - - copy the generated YAML file from initialize to the ROTDIR - - copy the updated bias correction files to ROTDIR + - compress and tar output diag files in COM + - tar radiative bias correction files and place in COM + - save output files and YAMLs to COM Parameters ---------- @@ -202,84 +130,14 @@ def finalize(self) -> None: None """ - # ---- tar up diags - # path of output tar statfile - atmstat = os.path.join(self.task_config.COMOUT_ATMOS_ANALYSIS, f"{self.task_config.APREFIX}atmstat") - - # get list of diag files to put in tarball - diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) - - logger.info(f"Compressing {len(diags)} diag files to {atmstat}.gz") - - # gzip the files first - logger.debug(f"Gzipping {len(diags)} diag files") - for diagfile in diags: - with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: - f_out.writelines(f_in) - - # open tar file for writing - logger.debug(f"Creating tar file {atmstat} with {len(diags)} gzipped diag files") - with tarfile.open(atmstat, "w") as archive: - for diagfile in diags: - diaggzip = f"{diagfile}.gz" - archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - - # get list of yamls to copy to ROTDIR - yamls = glob.glob(os.path.join(self.task_config.DATA, '*atm*yaml')) - - # copy full YAML from executable to ROTDIR - for src in yamls: - yaml_base = os.path.splitext(os.path.basename(src))[0] - dest_yaml_name = f"{self.task_config.APREFIX}{yaml_base}.yaml" - dest = os.path.join(self.task_config.COMOUT_CONF, dest_yaml_name) - logger.debug(f"Copying {src} to {dest}") - yaml_copy = { - 'copy': [[src, dest]] - } - FileHandler(yaml_copy).sync() - - # path of output radiance bias correction tarfile - bfile = f"{self.task_config.APREFIX}rad_varbc_params.tar" - radtar = 
os.path.join(self.task_config.COMOUT_ATMOS_ANALYSIS, bfile) - - # rename and copy tlapse radiance bias correction files from obs to bc - tlapobs = glob.glob(os.path.join(self.task_config.DATA, 'obs', '*tlapse.txt')) - copylist = [] - for tlapfile in tlapobs: - obsfile = os.path.basename(tlapfile).split('.', 2) - newfile = f"{self.task_config.APREFIX}{obsfile[2]}" - copylist.append([tlapfile, os.path.join(self.task_config.DATA, 'bc', newfile)]) - tlapse_dict = { - 'copy': copylist - } - FileHandler(tlapse_dict).sync() - - # get lists of radiance bias correction files to add to tarball - satlist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*satbias*nc')) - tlaplist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*tlapse.txt')) - - # tar radiance bias correction files to ROTDIR - logger.info(f"Creating radiance bias correction tar file {radtar}") - with tarfile.open(radtar, 'w') as radbcor: - for satfile in satlist: - radbcor.add(satfile, arcname=os.path.basename(satfile)) - for tlapfile in tlaplist: - radbcor.add(tlapfile, arcname=os.path.basename(tlapfile)) - logger.info(f"Add {radbcor.getnames()}") - - # Copy FV3 atm increment to comrot directory - logger.info("Copy UFS model readable atm increment file") - inc_copy = {'copy': []} - for itile in range(6): - src = os.path.join(self.task_config.DATA, "anl", - f"{self.task_config.APREFIX}cubed_sphere_grid_atminc.tile{itile+1}.nc") - dest = self.task_config.COMOUT_ATMOS_ANALYSIS - inc_copy['copy'].append([src, dest]) + # Compress and tar diag files in COM directory + self.tar_diag_files(self.task_config.COMOUT_ATMOS_ANALYSIS, + f"{self.task_config.APREFIX}atmstat") - # copy increments - src_list, dest_list = zip(*inc_copy['copy']) - logger.debug(f"Copying {src_list}\nto {dest_list}") - FileHandler(inc_copy).sync() + # Tar radiative bias correction files into COM directory + self.tar_radiative_bias_corrections(self.task_config.COMOUT_ATMOS_ANALYSIS, + f"{self.task_config.APREFIX}rad_varbc_params.tar") - 
def clean(self): - super().clean() + # Save files from COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() diff --git a/ush/python/pygfs/task/atmens_analysis.py b/ush/python/pygfs/task/atmens_analysis.py index 6db19c11166..778cfaea8a3 100644 --- a/ush/python/pygfs/task/atmens_analysis.py +++ b/ush/python/pygfs/task/atmens_analysis.py @@ -1,24 +1,15 @@ #!/usr/bin/env python3 -import os -import glob -import gzip -import tarfile from logging import getLogger -from pprint import pformat -from typing import Dict, Any - -from wxflow import (AttrDict, FileHandler, Task, - add_to_datetime, to_timedelta, to_YMD, - parse_j2yaml, - logit, - Template, TemplateConstants) +from pygfs.task.analysis import Analysis from pygfs.jedi import Jedi +from typing import Dict, Any +from wxflow import AttrDict, FileHandler, parse_j2yaml, logit logger = getLogger(__name__.split('.')[-1]) -class AtmEnsAnalysis(Task): +class AtmEnsAnalysis(Analysis): """ Class for JEDI-based global atmens analysis tasks """ @@ -43,34 +34,24 @@ def __init__(self, config: Dict[str, Any]): super().__init__(config) _res = int(self.task_config.CASE_ENS[1:]) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, 'npz': self.task_config.LEVS - 1, - 'ATM_WINDOW_BEGIN': _window_begin, - 'ATM_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", - 'OPREFIX': f"{self.task_config.EUPD_CYC}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN.replace('enkf', '')}.t{self.task_config.cyc:02d}z.", - 'APREFIX_ENS': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'GPREFIX_ENS': 
f"enkfgdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'atm_obsdatain_path': f"./obs/", - 'atm_obsdataout_path': f"./diags/", - 'BKG_TSTEP': "PT1H" # Placeholder for 4D applications - } + 'BKG_TSTEP': "PT1H", # Placeholder for 4D applications + }) ) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create dictionary of JEDI objects expected_keys = ['atmensanlobs', 'atmensanlsol', 'atmensanlfv3inc', 'atmensanlletkf'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -78,13 +59,9 @@ def initialize(self) -> None: This method will initialize a global atmens analysis. This includes: - - initialize JEDI LETKF observer and FV3 increment converter applications - - staging observation files - - staging bias correction files - - staging CRTM fix files - - staging FV3-JEDI fix files - - staging model backgrounds - - creating output directories + - stage input files from COM and create output directories + - extract bias corrections from tar files + - initialize JEDI applications Parameters ---------- @@ -95,58 +72,18 @@ def initialize(self) -> None: None """ - # stage observations - logger.info(f"Staging list of observation files") - obs_dict = self.jedi_dict['atmensanlobs'].render_jcb(self.task_config, 'atm_obs_staging') - FileHandler(obs_dict).sync() - logger.debug(f"Observation files:\n{pformat(obs_dict)}") - - # stage bias corrections - logger.info(f"Staging list of bias correction files") - bias_dict = self.jedi_dict['atmensanlobs'].render_jcb(self.task_config, 'atm_bias_staging') - bias_dict['copy'] = Jedi.remove_redundant(bias_dict['copy']) - 
FileHandler(bias_dict).sync() - logger.debug(f"Bias correction files:\n{pformat(bias_dict)}") - - # extract bias corrections - Jedi.extract_tar_from_filehandler_dict(bias_dict) - - # stage CRTM fix files - logger.info(f"Staging CRTM fix files from {self.task_config.STAGE_CRTM_COEFF_YAML}") - crtm_fix_dict = parse_j2yaml(self.task_config.STAGE_CRTM_COEFF_YAML, self.task_config) - FileHandler(crtm_fix_dict).sync() - logger.debug(f"CRTM fix files:\n{pformat(crtm_fix_dict)}") - - # stage fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") - - # stage backgrounds - logger.info(f"Stage ensemble member background files") - bkg_staging_dict = parse_j2yaml(self.task_config.STAGE_BKG_YAML, self.task_config) - FileHandler(bkg_staging_dict).sync() - logger.debug(f"Ensemble member background files:\n{pformat(bkg_staging_dict)}") - - # need output dir for diags and anl - logger.debug("Create empty output [anl, diags] directories to receive output from executable") - newdirs = [ - os.path.join(self.task_config.DATA, 'anl'), - os.path.join(self.task_config.DATA, 'diags'), - ] - FileHandler({'mkdir': newdirs}).sync() - - # initialize JEDI LETKF observer application + # Stage files from COM + logger.info(f"Staging files from COM") + FileHandler(self.task_config.data_in).sync() + + # Extract bias corrections from tar files + logger.info(f"Extracting bias corrections from tar files") + self.untar_bias_corrections() + + # initialize JEDI applications logger.info(f"Initializing JEDI LETKF observer application") self.jedi_dict['atmensanlobs'].initialize(self.task_config, clean_empty_obsspaces=True) - - # initialize JEDI LETKF solver application - logger.info(f"Initializing JEDI LETKF solver application") self.jedi_dict['atmensanlsol'].initialize(self.task_config) - 
- # initialize JEDI FV3 increment conversion application - logger.info(f"Initializing JEDI FV3 increment conversion application") self.jedi_dict['atmensanlfv3inc'].initialize(self.task_config) @logit(logger) @@ -189,8 +126,8 @@ def finalize(self) -> None: This method will finalize a global atmens analysis using JEDI. This includes: - - tar output diag files and place in ROTDIR - - copy the generated YAML file from initialize to the ROTDIR + - compress and tar output diag files and place in COM + - save output files and YAMLs to COM Parameters ---------- @@ -201,76 +138,10 @@ def finalize(self) -> None: None """ - # ---- tar up diags - # path of output tar statfile - atmensstat = os.path.join(self.task_config.COMOUT_ATMOS_ANALYSIS_ENS, f"{self.task_config.APREFIX_ENS}atmensstat") - - # get list of diag files to put in tarball - diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) - - logger.info(f"Compressing {len(diags)} diag files to {atmensstat}.gz") - - # gzip the files first - logger.debug(f"Gzipping {len(diags)} diag files") - for diagfile in diags: - with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: - f_out.writelines(f_in) - - # open tar file for writing - logger.debug(f"Creating tar file {atmensstat} with {len(diags)} gzipped diag files") - with tarfile.open(atmensstat, "w") as archive: - for diagfile in diags: - diaggzip = f"{diagfile}.gz" - archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - - # get list of yamls to cop to ROTDIR - yamls = glob.glob(os.path.join(self.task_config.DATA, '*atmens*yaml')) - - # copy full YAML from executable to ROTDIR - for src in yamls: - logger.info(f"Copying {src} to {self.task_config.COMOUT_CONF}") - yaml_base = os.path.splitext(os.path.basename(src))[0] - dest_yaml_name = f"{self.task_config.APREFIX_ENS}{yaml_base}.yaml" - dest = os.path.join(self.task_config.COMOUT_CONF, dest_yaml_name) - logger.debug(f"Copying {src} to {dest}") - yaml_copy = { - 'copy': [[src, 
dest]] - } - FileHandler(yaml_copy).sync() - - # create template dictionaries - template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL - tmpl_inc_dict = { - 'ROTDIR': self.task_config.ROTDIR, - 'RUN': self.task_config.RUN, - 'YMD': to_YMD(self.task_config.current_cycle), - 'HH': self.task_config.current_cycle.strftime('%H') - } - - # copy ensemble mean analysis to comrot - logger.info("Copy ensemble mean analysis") - fh_dict = {'copy': [[f"{self.task_config.DATA}/anl/{self.task_config.APREFIX_ENS}cubed_sphere_grid_atmanl.ensmean.nc", - f"{self.task_config.COMOUT_ATMOS_ANALYSIS_ENS}"]]} - FileHandler(fh_dict).sync() - - # copy FV3 atm increment to comrot directory - logger.info("Copy UFS model readable atm increment file") - - # loop over ensemble members - inc_copy = {'copy': []} - for imem in range(1, self.task_config.NMEM_ENS + 1): - memchar = f"mem{imem:03d}" - - # create output path for member analysis increment - tmpl_inc_dict['MEMDIR'] = memchar - incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get) - src = os.path.join(self.task_config.DATA, 'anl', memchar, - f"{self.task_config.APREFIX_ENS}cubed_sphere_grid_atminc.nc") - dest = incdir - inc_copy['copy'].append([src, dest]) - - logger.debug(f"Copying increments") - FileHandler(inc_copy).sync() - - def clean(self): - super().clean() + # Compress and tar diag files in COM directory + self.tar_diag_files(self.task_config.COMOUT_ATMOS_ANALYSIS_ENS, + f"{self.task_config.APREFIX_ENS}atmensstat") + + # Save files from COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() diff --git a/ush/python/pygfs/task/ensemble_recenter.py b/ush/python/pygfs/task/ensemble_recenter.py index 2f50f45501e..54ba1d12339 100644 --- a/ush/python/pygfs/task/ensemble_recenter.py +++ b/ush/python/pygfs/task/ensemble_recenter.py @@ -1,24 +1,20 @@ #!/usr/bin/env python3 -from datetime import timedelta from logging import getLogger -import os -from 
pprint import pformat +from pygfs.task.analysis import Analysis from pygfs.jedi import Jedi -from wxflow import (AttrDict, FileHandler, Task, Executable, Template, TemplateConstants, - add_to_datetime, to_timedelta, to_isotime, to_YMD, - parse_j2yaml, - logit) +from typing import Dict, Any +from wxflow import AttrDict, FileHandler, parse_j2yaml, logit logger = getLogger(__name__.split('.')[-1]) -class EnsembleRecenter(Task): +class EnsembleRecenter(Analysis): """ Class for JEDI-based ensemble increment recentering """ @logit(logger, name="EnsembleRecenter") - def __init__(self, config): + def __init__(self, config: Dict[str, Any]): """Constructor for atmospheric ensemble increment recentering task This method will construct an ensemble increment recentering task @@ -39,38 +35,26 @@ def __init__(self, config): _res = int(self.task_config.CASE[1:]) _res_anl = int(self.task_config.CASE_ANL[1:]) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) - - _iau_times_iso = [] - for hour in self.task_config.IAUFHRS: - _iau_times_iso.append(to_isotime(_window_begin + to_timedelta(f"{str(hour)}H") - to_timedelta(f"{self.task_config.assim_freq}H") / 2)) # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, - 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, 'npz_anl': self.task_config.LEVS - 1, - 'ATM_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", - 'ATM_WINDOW_BEGIN': _window_begin, - 'APREFIX': f"{self.task_config.RUN.replace('enkf', '')}.t{self.task_config.cyc:02d}z.", - 'APREFIX_ENS': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'GPREFIX_ENS': f"enkfgdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'iau_times_iso': 
_iau_times_iso + 'npz': self.task_config.LEVS - 1, } - ) + )) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create dictionary of Jedi objects expected_keys = ['correction_increment', 'ensemble_recenter'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -78,9 +62,8 @@ def initialize(self) -> None: This method will initialize the ensemble increment recentering task. This includes: - - initializing the JEDI recentering application - - staging JEDI fix files - - staging backgrounds and increments + - stage input files from COM and create output directories + - initialize JEDI applications Parameters ---------- @@ -91,23 +74,15 @@ def initialize(self) -> None: None """ + # Stage files from COM + logger.info(f"Staging files from COM") + FileHandler(self.task_config.data_in).sync() + # Initialize JEDI ensemble increment recentering application - logger.info(f"Initializing JEDI ensemble recentering applications") + logger.info(f"Initializing JEDI applications") self.jedi_dict['correction_increment'].initialize(self.task_config) self.jedi_dict['ensemble_recenter'].initialize(self.task_config) - # Stage fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") - - # Stage background and increment files - logger.info(f"Staging background and increment files from {self.task_config.STAGE_YAML}") - fh_dict = 
parse_j2yaml(self.task_config.STAGE_YAML, self.task_config) - FileHandler(fh_dict).sync() - logger.debug(f"JEDI background and increment files:\n{pformat(fh_dict)}") - @logit(logger) def execute(self) -> None: """Run JEDI executable @@ -135,7 +110,7 @@ def finalize(self) -> None: This method will finalize the ensemble increment recentering task. This includes: - - Move correction increment files to the comrot directory + - save output files and YAMLs to COM Parameters ---------- @@ -146,41 +121,6 @@ def finalize(self) -> None: None """ - fh_dict = {'copy': []} - - # create template dictionaries - template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL - tmpl_inc_dict = { - 'ROTDIR': self.task_config.ROTDIR, - 'RUN': self.task_config.RUN, - 'YMD': to_YMD(self.task_config.current_cycle), - 'HH': self.task_config.current_cycle.strftime('%H') - } - - # Copy increments to COM - for imem in range(1, self.task_config.NMEM_ENS + 1): - memchar = f"mem{imem:03d}" - tmpl_inc_dict['MEMDIR'] = memchar - incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get) - for fh in self.task_config.IAUFHRS: - hr = format(fh, '03') - for itile in range(6): - src = os.path.join(self.task_config.DATA, memchar, - f"{self.task_config.APREFIX_ENS}cubed_sphere_grid_ratmi{hr}.tile{itile+1}.nc") - if fh == 6: - dest = os.path.join(incdir, - f"{self.task_config.APREFIX_ENS}cubed_sphere_grid_ratminc.tile{itile+1}.nc") - else: - dest = incdir - fh_dict['copy'].append([src, dest]) - - # Copy YAMLs to COM - for app_name in self.jedi_dict.keys(): - src = os.path.join(self.task_config.DATA, - f"{app_name}.yaml") - dest = os.path.join(self.task_config.COMOUT_CONF, - f"{self.task_config.APREFIX_ENS}{app_name}.yaml") - fh_dict['copy'].append([src, dest]) - - # Sync file handler - FileHandler(fh_dict).sync() + # Save output files to COM + logger.info(f"Saving output files to COM") + FileHandler(self.task_config.data_out).sync() diff --git 
a/ush/python/pygfs/task/fv3_analysis_calc.py b/ush/python/pygfs/task/fv3_analysis_calc.py index a287c82ad4e..5dc5000ed51 100644 --- a/ush/python/pygfs/task/fv3_analysis_calc.py +++ b/ush/python/pygfs/task/fv3_analysis_calc.py @@ -4,22 +4,20 @@ from logging import getLogger import netCDF4 as nc import os -from pprint import pformat from pygfs.jedi import Jedi -from wxflow import (AttrDict, FileHandler, Task, - parse_j2yaml, - to_timedelta, add_to_datetime, to_fv3time, to_isotime, - logit) +from pygfs.task.analysis import Analysis +from typing import Dict, Any +from wxflow import AttrDict, FileHandler, to_fv3time, parse_j2yaml, logit logger = getLogger(__name__.split('.')[-1]) -class FV3AnalysisCalc(Task): +class FV3AnalysisCalc(Analysis): """ Class for analysis calculation """ @logit(logger, name="FV3AnalysisCalc") - def __init__(self, config): + def __init__(self, config: Dict[str, Any]): """Constructor for analysis calculation task This method will construct an analysis calculation @@ -39,30 +37,23 @@ def __init__(self, config): super().__init__(config) _res = int(self.task_config.CASE[1:]) - _res_anl = int(self.task_config.CASE_ANL[1:]) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + _res_anl = int(self.task_config['CASE_ANL'][1:]) # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, - 'npz': self.task_config.LEVS - 1, 'npx_anl': _res_anl + 1, 'npy_anl': _res_anl + 1, 'npz_anl': self.task_config.LEVS - 1, - 'ATM_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", - 'ATM_WINDOW_BEGIN': _window_begin, - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX_ENS': f"enkf{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'GPREFIX_ENS': 
f"enkfgdas.t{self.task_config.previous_cycle.hour:02d}z.", + 'npz': self.task_config.LEVS - 1, } - ) + )) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create dictionary of Jedi objects expected_keys = ['atm_addincrement'] @@ -70,7 +61,7 @@ def __init__(self, config): expected_keys.append('aero_addincrement') if self.task_config.DO_JEDISNOWDA: expected_keys.append('snow_addincrement') - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -78,9 +69,8 @@ def initialize(self) -> None: This method will initialize the analysis calculation task. This includes: - - initializing the JEDI addincrement application - - staging JEDI fix files - - staging backgrounds and increments + - stage input files from COM and create output directories + - initialize JEDI applications Parameters ---------- @@ -91,6 +81,10 @@ def initialize(self) -> None: None """ + # Stage files from COM + logger.info(f"Staging files from COM") + FileHandler(self.task_config.data_in).sync() + # Initialize GDASApp JEDI addincrement application logger.info(f"Initializing GDASApp JEDI addincrement applications") self.jedi_dict['atm_addincrement'].initialize(self.task_config) @@ -99,17 +93,6 @@ def initialize(self) -> None: if self.task_config.DO_JEDISNOWDA: self.jedi_dict['snow_addincrement'].initialize(self.task_config) - # Stage fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix 
files:\n{pformat(jedi_fix_dict)}") - - # Stage background and increment files - logger.info(f"Staging background and increment files from COM") - fh_dict = parse_j2yaml(self.task_config.STAGE_YAML, self.task_config) - FileHandler(fh_dict).sync() - @logit(logger) def execute(self) -> None: """Compute analyses @@ -165,7 +148,8 @@ def finalize(self) -> None: This method will finalize the analysis calculation task. This includes: - - Move analysis files to the comrot directory + - write analysis log file + - save output files and YAMLs to COM Parameters ---------- @@ -176,26 +160,6 @@ def finalize(self) -> None: None """ - # Copy analyses to COM - fh_dict = {'copy': []} - src_prefix = f"{self.task_config.DATA}/{self.task_config.GPREFIX}" - dest_prefix = f"{self.task_config.COMOUT_ATMOS_ANALYSIS}/{self.task_config.APREFIX}" - fh_dict['copy'].append([f"{src_prefix}atmf006.nc", - f"{dest_prefix}atmanl.nc"]) - fh_dict['copy'].append([f"{src_prefix}sfcf006.nc", - f"{dest_prefix}sfcanl.nc"]) - - # Copy YAMLs to COM - for app_name in self.jedi_dict.keys(): - src = os.path.join(self.task_config.DATA, - f"{app_name}.yaml") - dest = os.path.join(self.task_config.COMOUT_ATMOS_ANALYSIS, - f"{self.task_config.APREFIX}{app_name}.yaml") - fh_dict['copy'].append([src, dest]) - - # Call FileHandler - FileHandler(fh_dict).sync() - # Write analysis log file formatted_date = datetime.now().strftime("%a %b %d %H:%M:%S %Z%Y") log_file = os.path.join(self.task_config.COMOUT_ATMOS_ANALYSIS, f"{self.task_config.RUN}.t{self.task_config.cyc}z.loganl.txt") @@ -203,6 +167,10 @@ def finalize(self) -> None: with open(log_file, "w") as file: file.write(f"{message}\n") + # Save files from COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() + @logit(logger) def insert_analysis_variables(valid_time, fn_anl: str, fn_bkg: str) -> None: diff --git a/ush/python/pygfs/task/marine_analysis.py b/ush/python/pygfs/task/marine_analysis.py index fd7fa8d2427..b374c5f0ed5 100644 
--- a/ush/python/pygfs/task/marine_analysis.py +++ b/ush/python/pygfs/task/marine_analysis.py @@ -81,7 +81,8 @@ def __init__(self, config): # Construct dictionary of JEDI objects, one for each JEDI application need for the analysis expected_keys = ['var', 'soca_incpostproc', 'soca_diag_stats'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML_DET, self.task_config, expected_keys) + jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML_DET, self.task_config) + self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys) @logit(logger) def initialize(self: Task) -> None: diff --git a/ush/python/pygfs/task/marine_bmat.py b/ush/python/pygfs/task/marine_bmat.py index ab32fc88bdf..e2c2739f893 100644 --- a/ush/python/pygfs/task/marine_bmat.py +++ b/ush/python/pygfs/task/marine_bmat.py @@ -75,7 +75,8 @@ def __init__(self, config): # Create dictionary of Jedi objects expected_keys = ['gridgen', 'soca_diagb', 'soca_parameters_diffusion_vt', 'soca_setcorscales', 'soca_parameters_diffusion_hz', 'soca_ensb', 'soca_ensweights', 'soca_chgres'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + jedi_config_dict = parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config) + self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys) @logit(logger) def initialize(self: Task) -> None: diff --git a/ush/python/pygfs/task/marine_recenter.py b/ush/python/pygfs/task/marine_recenter.py index 03fb5e2bcbc..87bc8e8c785 100644 --- a/ush/python/pygfs/task/marine_recenter.py +++ b/ush/python/pygfs/task/marine_recenter.py @@ -64,7 +64,8 @@ def __init__(self, config: Dict) -> None: # Construct dictionary of JEDI objects, one for each JEDI application need for the analysis expected_keys = ['gridgen', 'ens_handler'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + jedi_config_dict = 
parse_j2yaml(self.task_config.JEDI_CONFIG_YAML, self.task_config) + self.jedi_dict = Jedi.get_jedi_dict(jedi_config_dict, self.task_config, expected_keys) @logit(logger) def initialize(self): diff --git a/ush/python/pygfs/task/snow_analysis.py b/ush/python/pygfs/task/snow_analysis.py index 5253e186104..c9bb3ad778b 100644 --- a/ush/python/pygfs/task/snow_analysis.py +++ b/ush/python/pygfs/task/snow_analysis.py @@ -9,29 +9,25 @@ import tarfile import numpy as np from netCDF4 import Dataset - -from wxflow import (AttrDict, - FileHandler, +from pygfs.task.analysis import Analysis +from pygfs.jedi import Jedi +from wxflow import (AttrDict, Executable, FileHandler, WorkflowException, to_fv3time, to_YMD, to_YMDH, to_timedelta, add_to_datetime, to_julian, rm_p, cp, parse_j2yaml, save_as_yaml, Jinja, - Task, - logit, - Executable, - WorkflowException) -from pygfs.jedi import Jedi + logit) logger = getLogger(__name__.split('.')[-1]) -class SnowAnalysis(Task): +class SnowAnalysis(Analysis): """ Class for JEDI-based global snow analysis tasks """ - @logit(logger, name="SnowAnalysis") + @logit(logger, name="Analysis") def __init__(self, config: Dict[str, Any]): """Constructor global snow analysis task @@ -52,39 +48,34 @@ def __init__(self, config: Dict[str, Any]): super().__init__(config) _res = int(self.task_config['CASE'][1:]) - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) - # fix ocnres - self.task_config.OCNRES = f"{self.task_config.OCNRES:03d}" - - # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + # if 00z, do SCF preprocessing + _ims_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}imssnow96.asc') + logger.info(f"Checking for IMS file: {_ims_file}") + if self.task_config.cyc == 0 and os.path.exists(_ims_file): + _DO_IMS_SCF = True + else: + _DO_IMS_SCF = False + + # Extend task_config with variables repeatedly used across 
this class + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, 'npz': self.task_config.LEVS - 1, - 'SNOW_WINDOW_BEGIN': _window_begin, - 'SNOW_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", - 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'snow_obsdatain_path': os.path.join(self.task_config.DATA, 'obs'), - 'snow_obsdataout_path': os.path.join(self.task_config.DATA, 'diags'), 'snow_bkg_path': os.path.join('.', 'bkg/'), - 'res': _res, + 'ims_file': _ims_file, + 'DO_IMS_SCF': _DO_IMS_SCF, # Boolean to decide if IMS snow cover processing is done } - ) + )) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create JEDI object dictionary expected_keys = ['scf_to_ioda', 'snowanlvar'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) - - # Boolean to decide if IMS snow cover processing is done - self.task_config.DO_IMS_SCF = False + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -92,12 +83,8 @@ def initialize(self) -> None: This method will initialize a global snow analysis. 
This includes: - - initialize JEDI application - - staging model backgrounds - - staging observation files - - staging FV3-JEDI fix files - - staging B error files - - creating output directories + - stage input files from COM and create output directories + - initialize JEDI applications Parameters ---------- @@ -108,56 +95,15 @@ def initialize(self) -> None: None """ - # stage backgrounds - logger.info(f"Staging background files from {self.task_config.STAGE_BKG_YAML}") - bkg_staging_dict = parse_j2yaml(self.task_config.STAGE_BKG_YAML, self.task_config) - FileHandler(bkg_staging_dict).sync() - logger.debug(f"Background files:\n{pformat(bkg_staging_dict)}") - - # stage observations - logger.info(f"Staging list of observation files generated from JEDI config") - obs_dict = self.jedi_dict['snowanlvar'].render_jcb(self.task_config, 'snow_obs_staging') - FileHandler(obs_dict).sync() - logger.debug(f"Observation files:\n{pformat(obs_dict)}") - - # stage GTS bufr2ioda mapping YAML files - logger.info(f"Staging GTS bufr2ioda mapping YAML files from {self.task_config.STAGE_GTS_YAML}") - gts_mapping_list = parse_j2yaml(self.task_config.STAGE_GTS_YAML, self.task_config) - FileHandler(gts_mapping_list).sync() - - # stage FV3-JEDI fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") - - # staging B error files - logger.info("Stage files for static background error") - berror_staging_dict = parse_j2yaml(self.task_config.STAGE_BERROR_YAML, self.task_config) - FileHandler(berror_staging_dict).sync() - logger.debug(f"Background error files:\n{pformat(berror_staging_dict)}") - - # need output dir for diags and anl - logger.debug("Create empty output [anl, diags] directories to receive output from executable") - newdirs = [ - os.path.join(self.task_config.DATA, 
'anl'), - os.path.join(self.task_config.DATA, 'diags'), - ] - FileHandler({'mkdir': newdirs}).sync() - - # if 00z, do SCF preprocessing - if self.task_config.cyc == 0: - ims_scf_to_ioda_staging_dict = parse_j2yaml(self.task_config.STAGE_IMS_SCF2IODA_YAML, self.task_config) - FileHandler(ims_scf_to_ioda_staging_dict).sync() - self.jedi_dict['scf_to_ioda'].initialize(self.task_config) - # Check if file exists - ims_file = ims_scf_to_ioda_staging_dict['copy_opt'][0][1] - if os.path.exists(ims_file): - self.task_config.DO_IMS_SCF = True + # Stage files from COM + logger.info(f"Staging files from COM and creating output directories") + FileHandler(self.task_config.data_in).sync() # initialize JEDI variational application - logger.info(f"Initializing JEDI variational DA application") + logger.info(f"Initializing JEDI applications") self.jedi_dict['snowanlvar'].initialize(self.task_config, clean_empty_obsspaces=False) + if self.task_config.DO_IMS_SCF: + self.jedi_dict['scf_to_ioda'].initialize(self.task_config) @logit(logger) def execute(self, jedi_dict_key: str) -> None: @@ -181,10 +127,8 @@ def execute(self, jedi_dict_key: str) -> None: def finalize(self) -> None: """Performs closing actions of the Snow analysis task This method: - - tar and gzip the output diag files and place in COM/ - - copy the generated YAML file from initialize to the COM/ - - copy the analysis files to the COM/ - - copy the increment files to the COM/ + - compress and tar output diag files in COM + - save output files and YAMLs to COM Parameters ---------- @@ -192,67 +136,13 @@ def finalize(self) -> None: Instance of the SnowAnalysis object """ - # ---- tar up diags - # path of output tar statfile - snowstat = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, f"{self.task_config.APREFIX}snowstat.tgz") - - # get list of diag files to put in tarball - diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) - - logger.info(f"Compressing {len(diags)} diag files to {snowstat}") - 
- # gzip the files first - logger.debug(f"Gzipping {len(diags)} diag files") - for diagfile in diags: - with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: - f_out.writelines(f_in) - - # open tar file for writing - logger.debug(f"Creating tar file {snowstat} with {len(diags)} gzipped diag files") - with tarfile.open(snowstat, "w|gz") as archive: - for diagfile in diags: - diaggzip = f"{diagfile}.gz" - archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - - # get list of yamls to copy to ROTDIR - yamls = glob.glob(os.path.join(self.task_config.DATA, '*snow*yaml')) - - # copy full YAML from executable to ROTDIR - for src in yamls: - yaml_base = os.path.splitext(os.path.basename(src))[0] - dest_yaml_name = f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.{yaml_base}.yaml" - dest = os.path.join(self.task_config.COMOUT_CONF, dest_yaml_name) - logger.debug(f"Copying {src} to {dest}") - yaml_copy = { - 'copy': [[src, dest]] - } - FileHandler(yaml_copy).sync() - - logger.info("Copy analysis to COM") - bkgtimes = [] - if self.task_config.DOIAU: - # need both beginning and middle of window - bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN) - bkgtimes.append(self.task_config.current_cycle) - anllist = [] - for bkgtime in bkgtimes: - template = f'{to_fv3time(bkgtime)}.sfc_data.tile{{tilenum}}.nc' - for itile in range(1, self.task_config.ntiles + 1): - filename = template.format(tilenum=itile) - src = os.path.join(self.task_config.DATA, 'anl', filename) - dest = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, filename) - anllist.append([src, dest]) - FileHandler({'copy': anllist}).sync() + # Compress and tar diag files into COM directory + self.tar_diag_files(self.task_config.COMOUT_SNOW_ANALYSIS, + f"{self.task_config.APREFIX}snowstat.tgz") - logger.info('Copy increments to COM') - template = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' - inclist = [] - for itile in range(1, 
self.task_config.ntiles + 1): - filename = template.format(tilenum=itile) - src = os.path.join(self.task_config.DATA, 'anl', filename) - dest = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, filename) - inclist.append([src, dest]) - FileHandler({'copy': inclist}).sync() + # Save files to COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() @logit(logger) def add_increments(self) -> None: @@ -269,7 +159,7 @@ def add_increments(self) -> None: bkgtimes = [] if self.task_config.DOIAU: # want analysis at beginning and middle of window - bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN) + bkgtimes.append(self.task_config.WINDOW_BEGIN) bkgtimes.append(self.task_config.current_cycle) anllist = [] for bkgtime in bkgtimes: @@ -284,7 +174,7 @@ def add_increments(self) -> None: if self.task_config.DOIAU: logger.info("Copying increments to beginning of window") template_in = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' - template_out = f'snowinc.{to_fv3time(self.task_config.SNOW_WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc' + template_out = f'snowinc.{to_fv3time(self.task_config.WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc' inclist = [] for itile in range(1, self.task_config.ntiles + 1): filename_in = template_in.format(tilenum=itile) diff --git a/ush/python/pygfs/task/snowens_analysis.py b/ush/python/pygfs/task/snowens_analysis.py index 9a94a55c9e5..7a9d0c7acff 100644 --- a/ush/python/pygfs/task/snowens_analysis.py +++ b/ush/python/pygfs/task/snowens_analysis.py @@ -9,7 +9,8 @@ import tarfile import numpy as np from netCDF4 import Dataset - +from pygfs.task.analysis import Analysis +from pygfs.jedi import Jedi from wxflow import (AttrDict, FileHandler, to_fv3time, to_YMD, to_YMDH, to_timedelta, add_to_datetime, @@ -17,16 +18,14 @@ rm_p, cp, parse_j2yaml, save_as_yaml, Jinja, - Task, logit, Executable, WorkflowException) -from pygfs.jedi import Jedi logger = getLogger(__name__.split('.')[-1]) -class 
SnowEnsAnalysis(Task): +class SnowEnsAnalysis(Analysis): """ Class for JEDI-based global snow ensemble analysis tasks """ @@ -52,40 +51,34 @@ def __init__(self, config: Dict[str, Any]): super().__init__(config) _res = int(self.task_config['CASE_ENS'][1:]) - self.task_config['CASE'] = self.task_config['CASE_ENS'] - _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config['assim_freq']}H") / 2) - # fix ocnres - self.task_config.OCNRES = f"{self.task_config.OCNRES :03d}" - - # Create a local dictionary that is repeatedly used across this class - local_dict = AttrDict( + # if 00z, do SCF preprocessing + _ims_file = os.path.join(self.task_config.COMIN_OBS, f'{self.task_config.OPREFIX}imssnow96.asc') + if self.task_config.cyc == 0 and os.path.exists(_ims_file): + _DO_IMS_SCF = True + else: + _DO_IMS_SCF = False + + # Extend task_config with variables repeatedly used across this class + self.task_config.update(AttrDict( { 'npx_ges': _res + 1, 'npy_ges': _res + 1, 'npz_ges': self.task_config.LEVS - 1, 'npz': self.task_config.LEVS - 1, - 'SNOW_WINDOW_BEGIN': _window_begin, - 'SNOW_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H", - 'OPREFIX': f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.", - 'snow_obsdatain_path': os.path.join(self.task_config.DATA, 'obs'), - 'snow_obsdataout_path': os.path.join(self.task_config.DATA, 'diags'), + 'CASE': self.task_config.CASE_ENS, 'snow_bkg_path': os.path.join('.', 'bkg', 'ensmean/'), - 'res': _res, + 'ims_file': _ims_file, + 'DO_IMS_SCF': _DO_IMS_SCF, # Boolean to decide if IMS snow cover processing is done } - ) + )) - # Extend task_config with local_dict - self.task_config = AttrDict(**self.task_config, **local_dict) + # Extend task_config with content of config yaml for this task + 
self.task_config.update(parse_j2yaml(self.task_config.TASK_CONFIG_YAML, self.task_config)) # Create JEDI object dictionary expected_keys = ['scf_to_ioda', 'snowanlvar', 'esnowanlensmean'] - self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) - - # Boolean to decide if IMS snow cover processing is done - self.task_config.DO_IMS_SCF = False + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.jedi_config, self.task_config, expected_keys) @logit(logger) def initialize(self) -> None: @@ -93,12 +86,8 @@ def initialize(self) -> None: This method will initialize a global snow ensemble analysis. This includes: + - stage input files from COM and create output directories - initialize JEDI applications - - staging model backgrounds - - staging observation files - - staging FV3-JEDI fix files - - staging B error files - - creating output directories Parameters ---------- @@ -109,77 +98,16 @@ def initialize(self) -> None: None """ - # stage backgrounds - logger.info(f"Staging background files from {self.task_config.STAGE_BKG_YAML}") - bkg_staging_dict = parse_j2yaml(self.task_config.STAGE_BKG_YAML, self.task_config) - FileHandler(bkg_staging_dict).sync() - logger.debug(f"Background files:\n{pformat(bkg_staging_dict)}") - - # stage orography - logger.info(f"Staging orography files from {self.task_config.STAGE_OROG_YAML}") - orog_staging_dict = parse_j2yaml(self.task_config.STAGE_OROG_YAML, self.task_config) - FileHandler(orog_staging_dict).sync() - logger.debug(f"Orography files:\n{pformat(orog_staging_dict)}") - # note JEDI will try to read the orog files for each member, let's just symlink - logger.info("Linking orography files for each member") - oro_files = glob.glob(os.path.join(self.task_config.DATA, 'orog', 'ens', '*')) - for mem in range(1, self.task_config.NMEM_ENS + 1): - dest = os.path.join(self.task_config.DATA, 'bkg', f"mem{mem:03}") - for oro_file in oro_files: - os.symlink(oro_file, os.path.join(dest, 
os.path.basename(oro_file))) - # need to symlink orography files for the ensmean too - dest = os.path.join(self.task_config.DATA, 'bkg', 'ensmean') - for oro_file in oro_files: - os.symlink(oro_file, os.path.join(dest, os.path.basename(oro_file))) - - # stage observations - logger.info(f"Staging list of observation files generated from JEDI config") - obs_dict = self.jedi_dict['snowanlvar'].render_jcb(self.task_config, 'snow_obs_staging') - FileHandler(obs_dict).sync() - logger.debug(f"Observation files:\n{pformat(obs_dict)}") - - # stage GTS bufr2ioda mapping YAML files - logger.info(f"Staging GTS bufr2ioda mapping YAML files from {self.task_config.STAGE_GTS_YAML}") - gts_mapping_list = parse_j2yaml(self.task_config.STAGE_GTS_YAML, self.task_config) - FileHandler(gts_mapping_list).sync() - - # stage FV3-JEDI fix files - logger.info(f"Staging JEDI fix files from {self.task_config.STAGE_JEDI_FIX_YAML}") - jedi_fix_dict = parse_j2yaml(self.task_config.STAGE_JEDI_FIX_YAML, self.task_config) - FileHandler(jedi_fix_dict).sync() - logger.debug(f"JEDI fix files:\n{pformat(jedi_fix_dict)}") - - # staging B error files - logger.info("Stage files for static background error") - berror_staging_dict = parse_j2yaml(self.task_config.STAGE_BERROR_YAML, self.task_config) - FileHandler(berror_staging_dict).sync() - logger.debug(f"Background error files:\n{pformat(berror_staging_dict)}") - - # need output dir for diags and anl - logger.debug("Create empty output [anl, diags] directories to receive output from executable") - newdirs = [ - os.path.join(self.task_config.DATA, 'anl'), - os.path.join(self.task_config.DATA, 'diags'), - ] - FileHandler({'mkdir': newdirs}).sync() - - # if 00z, do SCF preprocessing - if self.task_config.cyc == 0: - ims_scf_to_ioda_staging_dict = parse_j2yaml(self.task_config.STAGE_IMS_SCF2IODA_YAML, self.task_config) - FileHandler(ims_scf_to_ioda_staging_dict).sync() - self.jedi_dict['scf_to_ioda'].initialize(self.task_config) - # Check if file exists - 
ims_file = ims_scf_to_ioda_staging_dict['copy_opt'][0][1] - if os.path.exists(ims_file): - self.task_config.DO_IMS_SCF = True + # Stage files from COM + logger.info(f"Staging files from COM and creating output directories") + FileHandler(self.task_config.data_in).sync() - # initialize JEDI variational application - logger.info(f"Initializing JEDI variational DA application") + # Initialize JEDI applications + logger.info(f"Initializing JEDI applications") self.jedi_dict['snowanlvar'].initialize(self.task_config, clean_empty_obsspaces=False) - - # initialize ensemble mean computation - logger.info(f"Initializing JEDI ensemble mean application") self.jedi_dict['esnowanlensmean'].initialize(self.task_config) + if self.task_config.DO_IMS_SCF: + self.jedi_dict['scf_to_ioda'].initialize(self.task_config) @logit(logger) def execute(self, jedi_dict_key: str) -> None: @@ -203,10 +131,8 @@ def execute(self, jedi_dict_key: str) -> None: def finalize(self) -> None: """Performs closing actions of the Snow analysis task This method: - - tar and gzip the output diag files and place in COM/ - - copy the generated YAML file from initialize to the COM/ - - copy the analysis files to the COM/ - - copy the increment files to the COM/ + - compress and tar output diag files in COM + - save output files and YAMLs to COM Parameters ---------- @@ -214,69 +140,13 @@ def finalize(self) -> None: Instance of the SnowEnsAnalysis object """ - # ---- tar up diags - # path of output tar statfile - snowstat = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, f"{self.task_config.APREFIX}snowstat.tgz") - - # get list of diag files to put in tarball - diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc')) - - logger.info(f"Compressing {len(diags)} diag files to {snowstat}") - - # gzip the files first - logger.debug(f"Gzipping {len(diags)} diag files") - for diagfile in diags: - with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out: - f_out.writelines(f_in) 
- - # open tar file for writing - logger.debug(f"Creating tar file {snowstat} with {len(diags)} gzipped diag files") - with tarfile.open(snowstat, "w|gz") as archive: - for diagfile in diags: - diaggzip = f"{diagfile}.gz" - archive.add(diaggzip, arcname=os.path.basename(diaggzip)) - - # get list of yamls to copy to ROTDIR - yamls = glob.glob(os.path.join(self.task_config.DATA, '*snow*yaml')) - - # copy full YAML from executable to ROTDIR - for src in yamls: - yaml_base = os.path.splitext(os.path.basename(src))[0] - dest_yaml_name = f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.{yaml_base}.yaml" - dest = os.path.join(self.task_config.COMOUT_CONF, dest_yaml_name) - logger.debug(f"Copying {src} to {dest}") - yaml_copy = { - 'copy': [[src, dest]] - } - FileHandler(yaml_copy).sync() + # Compress and tar diag files into COM directory + self.tar_diag_files(self.task_config.COMOUT_SNOW_ANALYSIS, + f"{self.task_config.APREFIX_ENS}snowstat.tgz") - logger.info("Copy analysis to COM") - bkgtimes = [] - if self.task_config.DOIAU: - # need both beginning and middle of window - bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN) - bkgtimes.append(self.task_config.current_cycle) - anllist = [] - for mem in range(1, self.task_config.NMEM_ENS + 1): - for bkgtime in bkgtimes: - template = f'{to_fv3time(bkgtime)}.sfc_data.tile{{tilenum}}.nc' - for itile in range(1, self.task_config.ntiles + 1): - filename = template.format(tilenum=itile) - src = os.path.join(self.task_config.DATA, 'anl', f"mem{mem:03d}", filename) - COMOUT_SNOW_ANALYSIS = self.task_config.COMOUT_SNOW_ANALYSIS.replace('ensstat', f"mem{mem:03d}") - dest = os.path.join(COMOUT_SNOW_ANALYSIS, filename) - anllist.append([src, dest]) - FileHandler({'copy': anllist}).sync() - - logger.info('Copy increments to COM') - template = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' - inclist = [] - for itile in range(1, self.task_config.ntiles + 1): - filename = 
template.format(tilenum=itile) - src = os.path.join(self.task_config.DATA, 'anl', filename) - dest = os.path.join(self.task_config.COMOUT_SNOW_ANALYSIS, filename) - inclist.append([src, dest]) - FileHandler({'copy': inclist}).sync() + # Save files to COM + logger.info(f"Saving files to COM") + FileHandler(self.task_config.data_out).sync() @logit(logger) def add_increments(self) -> None: @@ -291,7 +161,7 @@ def add_increments(self) -> None: if self.task_config.DOIAU: logger.info("Copying increments to beginning of window") template_in = f'snowinc.{to_fv3time(self.task_config.current_cycle)}.sfc_data.tile{{tilenum}}.nc' - template_out = f'snowinc.{to_fv3time(self.task_config.SNOW_WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc' + template_out = f'snowinc.{to_fv3time(self.task_config.WINDOW_BEGIN)}.sfc_data.tile{{tilenum}}.nc' inclist = [] for itile in range(1, self.task_config.ntiles + 1): filename_in = template_in.format(tilenum=itile) @@ -304,7 +174,7 @@ def add_increments(self) -> None: bkgtimes = [] if self.task_config.DOIAU: # need both beginning and middle of window - bkgtimes.append(self.task_config.SNOW_WINDOW_BEGIN) + bkgtimes.append(self.task_config.WINDOW_BEGIN) bkgtimes.append(self.task_config.current_cycle) # loop over members