diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7e5c4b8bcf3..7cb78c106de 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -42,6 +42,7 @@ jobs/JGFS_ATMOS_PGRB2_SPEC_NPOESS @WenMeng-NOAA jobs/JGFS_ATMOS_POSTSND @BoCui-NOAA jobs/JGFS_ATMOS_VERIFICATION jobs/JGLOBAL_AERO_ANALYSIS_* @CoryMartin-NOAA +jobs/JGLOBAL_ANALYSIS_STATS @CoryMartin-NOAA jobs/JGLOBAL_*ARCH* @DavidHuber-NOAA jobs/JGLOBAL_ATMENS_ANALYSIS_* @RussTreadon-NOAA @CoryMartin-NOAA @DavidNew-NOAA jobs/JGLOBAL_ATMOS_ANALYSIS @RussTreadon-NOAA @CatherineThomas-NOAA @@ -96,6 +97,7 @@ scripts/exgfs_pmgr.sh scripts/exgfs_prdgen_manager.sh scripts/exgfs_wave_* @JessicaMeixner-NOAA @sbanihash scripts/exglobal_aero_analysis_* @CoryMartin-NOAA +scripts/exglobal_analysis_stats.py @CoryMartin-NOAA scripts/exglobal_archive_*.py @DavidHuber-NOAA scripts/exglobal_globus_*.py @DavidHuber-NOAA scripts/exglobal_atm_analysis_* @RussTreadon-NOAA @DavidNew-NOAA @@ -190,6 +192,7 @@ ush/python/pygfs/task/aero_bmatrix.py @DavidNew-NOAA @CoryMartin-NOAA ush/python/pygfs/task/aero_emissions.py @bbakernoaa ush/python/pygfs/task/aero_prepobs.py @CoryMartin-NOAA ush/python/pygfs/task/analysis.py @DavidNew-NOAA @RussTreadon-NOAA +ush/python/pygfs/task/analysis_stats.py @CoryMartin-NOAA ush/python/pygfs/task/archive.py @DavidHuber-NOAA ush/python/pygfs/task/atm_analysis.py @DavidNew-NOAA @RussTreadon-NOAA ush/python/pygfs/task/atmens_analysis.py @DavidNew-NOAA @RussTreadon-NOAA diff --git a/.gitignore b/.gitignore index 0f186ee8029..1fa7bbf34ef 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,7 @@ parm/gdas/io parm/gdas/ioda parm/gdas/snow parm/gdas/soca +parm/gdas/stat parm/gdas/jcb-gdas parm/gdas/jcb-algorithms parm/monitor diff --git a/env/GAEAC6.env b/env/GAEAC6.env index 6d077770c95..56a13619a3a 100755 --- a/env/GAEAC6.env +++ b/env/GAEAC6.env @@ -146,6 +146,11 @@ case ${step} in export NTHREADS_OCNANAL=${NTHREADSmax} export APRUN_MARINEANLCHKPT="${APRUN_default} --cpus-per-task=${NTHREADS_OCNANAL}" ;; + "anlstat") + + export NTHREADS_ANLSTAT=${NTHREADSmax} + export APRUN_ANLSTAT="${APRUN_default} --cpus-per-task=${NTHREADS_ANLSTAT}" + ;; "marineanlletkf") export NTHREADS_MARINEANLLETKF=${NTHREADSmax} diff --git a/env/HERA.env b/env/HERA.env index 9d5638a8a39..fbfd19df0de 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -109,6 +109,11 @@ elif [[ "${step}" = "atmanlfv3inc" ]]; then export NTHREADS_ATMANLFV3INC=${NTHREADSmax} export APRUN_ATMANLFV3INC="${APRUN_default} --cpus-per-task=${NTHREADS_ATMANLFV3INC}" +elif [[ "${step}" = "anlstat" ]]; then + + export NTHREADS_ANLSTAT=${NTHREADSmax} + export APRUN_ANLSTAT="${APRUN_default} --cpus-per-task=${NTHREADS_ANLSTAT}" + elif [[ "${step}" = "prepobsaero" ]]; then export NTHREADS_PREPOBSAERO=${NTHREADS1} diff --git a/env/HERCULES.env b/env/HERCULES.env index 6128c4b6869..622d61394c3 100755 --- a/env/HERCULES.env +++ b/env/HERCULES.env @@ -157,6 +157,11 @@ case ${step} in export NTHREADS_MARINEANLLETKF=${NTHREADSmax} export APRUN_MARINEANLLETKF="${APRUN_default}" ;; + "anlstat") + + export NTHREADS_ANLSTAT=${NTHREADSmax} + export APRUN_ANLSTAT="${APRUN_default} --cpus-per-task=${NTHREADS_ANLSTAT}" + ;; "ecen_fv3jedi") export NTHREADS_ECEN_FV3JEDI=${NTHREADSmax} diff --git a/env/ORION.env b/env/ORION.env index 8b6ca5f3da7..d5be672a536 100755 --- a/env/ORION.env +++ b/env/ORION.env @@ -158,6 +158,11 @@ elif [[ "${step}" = "marineanlletkf" ]]; then export NTHREADS_MARINEANLLETKF=${NTHREADSmax} export APRUN_MARINEANLLETKF="${APRUN_default}" +elif [[ "${step}" = "anlstat" ]]; then + + export NTHREADS_ANLSTAT=${NTHREADSmax} + export APRUN_ANLSTAT="${APRUN_default} --cpus-per-task=${NTHREADS_ANLSTAT}" + elif [[ "${step}" = "ecen_fv3jedi" ]]; then export NTHREADS_ECEN_FV3JEDI=${NTHREADSmax} diff --git a/env/WCOSS2.env b/env/WCOSS2.env index 22f17da977b..e0e27da2351 100755 --- a/env/WCOSS2.env +++ b/env/WCOSS2.env @@ -143,6 +143,11 @@ elif [[ "${step}" = "atmanlfv3inc" ]]; then export NTHREADS_ATMANLFV3INC=${NTHREADSmax} export APRUN_ATMANLFV3INC="${APRUN_default}" +elif [[ "${step}" = "anlstat" ]]; then + + export NTHREADS_ANLSTAT=${NTHREADSmax} + export APRUN_ANLSTAT="${APRUN_default}" + elif [[ "${step}" = "ecen_fv3jedi" ]]; then export NTHREADS_ECEN_FV3JEDI=${NTHREADSmax} diff --git a/jobs/JGLOBAL_ANALYSIS_STATS b/jobs/JGLOBAL_ANALYSIS_STATS new file mode 100755 index 00000000000..70255a5848b --- /dev/null +++ b/jobs/JGLOBAL_ANALYSIS_STATS @@ -0,0 +1,44 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/jjob_header.sh" -e "anlstat" -c "base anlstat" + +############################################## +# Set variables used in the script +############################################## + + +############################################## +# Begin JOB SPECIFIC work +############################################## + +# Generate COM variables from templates +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COMIN_OBS:COM_OBS_TMPL \ + COMIN_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL \ + COMIN_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL \ + COMIN_AERO_ANALYSIS:COM_CHEM_ANALYSIS_TMPL \ + COMIN_SNOW_ANALYSIS:COM_SNOW_ANALYSIS_TMPL \ + COMOUT_CONF:COM_CONF_TMPL \ + COMOUT_ATMOS_ANLMON:COM_ATMOS_ANLMON_TMPL \ + COMOUT_OCEAN_ANLMON:COM_OCEAN_ANLMON_TMPL \ + COMOUT_AERO_ANLMON:COM_CHEM_ANLMON_TMPL \ + COMOUT_SNOW_ANLMON:COM_SNOW_ANLMON_TMPL + +############################################################### +# Run relevant script + +EXSCRIPT=${ANLSTATSPY:-${SCRgfs}/exglobal_analysis_stats.py} +${EXSCRIPT} +export err=$?; err_chk + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +exit 0 diff --git a/jobs/rocoto/anlstat.sh b/jobs/rocoto/anlstat.sh new file mode 100755 index 00000000000..5bb74d791d9 --- /dev/null +++ b/jobs/rocoto/anlstat.sh @@ -0,0 +1,20 @@ +#! /usr/bin/env bash + +set -x + +############################################################### +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" +err=$? +if [[ ${err} -ne 0 ]]; then + echo "FATAL ERROR Failed to load UFSDA modules!" + exit "${err}" +fi + +export job="anlstat" +export jobid="${job}.$$" + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_ANALYSIS_STATS" +exit $? diff --git a/parm/config/gfs/config.anlstat b/parm/config/gfs/config.anlstat new file mode 100644 index 00000000000..3293c3376ae --- /dev/null +++ b/parm/config/gfs/config.anlstat @@ -0,0 +1,17 @@ +#!/bin/bash -x + +########## config.anlstat ########## +# Analysis Stat + +echo "BEGIN: config.anlstat" + +# Get task specific resources +source "${EXPDIR}/config.resources" anlstat + +export JEDI_CONFIG_YAML="${PARMgfs}/gdas/anlstat_jedi_config.yaml.j2" +export STAT_BASE_CONFIG_YAML="${PARMgfs}/config/gfs/yaml/stat_base_config.yaml.j2" +export JCB_BASE_YAML="${PARMgfs}/gdas/stat/aero/jcb-base.yaml.j2" +export JCB_ALGO_YAML="${PARMgfs}/gdas/jcb-algorithms/anlstat.yaml.j2" +export JEDIEXE="${HOMEgfs}/sorc/gdas.cd/build/bin/ioda-stats.x" + +echo "END: config.anlstat" diff --git a/parm/config/gfs/config.base.j2 b/parm/config/gfs/config.base.j2 index e1dae0d87de..75dac7f6185 100644 --- a/parm/config/gfs/config.base.j2 +++ b/parm/config/gfs/config.base.j2 @@ -82,6 +82,7 @@ export DO_GENESIS_FSU="{{ DO_GENESIS_FSU }}" # Cyclone genesis verification (FSU export DO_VERFOZN="YES" # Ozone data assimilation monitoring export DO_VERFRAD="YES" # Radiance data assimilation monitoring export DO_VMINMON="YES" # GSI minimization monitoring +export DO_ANLSTAT="NO" # JEDI-based analysis statistics export DO_MOS="NO" # GFS Model Output Statistics - Only supported on WCOSS2 # NO for retrospective parallel; YES for real-time parallel @@ -499,6 +500,11 @@ if [[ "${DO_JEDIATMVAR}" = "YES" ]]; then export DO_VERFOZN="NO" # Ozone data assimilation monitoring export DO_VERFRAD="NO" # Radiance data assimilation monitoring export DO_VMINMON="NO" # GSI minimization monitoring + export DO_ANLSTAT="YES" # JEDI-based analysis statistics +else + if [[ ${DO_AERO} = "YES" || ${DO_JEDIOCNVAR} = "YES" || ${DO_JEDISNOWDA} = "YES " ]]; then + export DO_ANLSTAT="YES" # JEDI-based analysis statistics + fi fi # If starting ICs that are not at cycle hour diff --git a/parm/config/gfs/config.com b/parm/config/gfs/config.com index 649f82e3e34..76cb7a2c5a3 100644 --- a/parm/config/gfs/config.com +++ b/parm/config/gfs/config.com @@ -55,6 +55,7 @@ declare -rx COM_ATMOS_INPUT_TMPL=${COM_BASE}'/model/atmos/input' declare -rx COM_ATMOS_RESTART_TMPL=${COM_BASE}'/model/atmos/restart' declare -rx COM_ATMOS_ANALYSIS_TMPL=${COM_BASE}'/analysis/atmos' declare -rx COM_SNOW_ANALYSIS_TMPL=${COM_BASE}'/analysis/snow' +declare -rx COM_SNOW_ANLMON_TMPL=${COM_BASE}'/products/snow/anlmon' declare -rx COM_ATMOS_HISTORY_TMPL=${COM_BASE}'/model/atmos/history' declare -rx COM_ATMOS_MASTER_TMPL=${COM_BASE}'/model/atmos/master' declare -rx COM_ATMOS_GRIB_TMPL=${COM_BASE}'/products/atmos/grib2' @@ -68,6 +69,7 @@ declare -rx COM_ATMOS_IMAGERY_TMPL=${COM_BASE}'/products/atmos/imagery' declare -rx COM_ATMOS_OZNMON_TMPL=${COM_BASE}'/products/atmos/oznmon' declare -rx COM_ATMOS_RADMON_TMPL=${COM_BASE}'/products/atmos/radmon' declare -rx COM_ATMOS_MINMON_TMPL=${COM_BASE}'/products/atmos/minmon' +declare -rx COM_ATMOS_ANLMON_TMPL=${COM_BASE}'/products/atmos/anlmon' declare -rx COM_ATMOS_WMO_TMPL=${COM_BASE}'/products/atmos/wmo' declare -rx COM_WAVE_RESTART_TMPL=${COM_BASE}'/model/wave/restart' @@ -83,6 +85,7 @@ declare -rx COM_OCEAN_HISTORY_TMPL=${COM_BASE}'/model/ocean/history' declare -rx COM_OCEAN_RESTART_TMPL=${COM_BASE}'/model/ocean/restart' declare -rx COM_OCEAN_INPUT_TMPL=${COM_BASE}'/model/ocean/input' declare -rx COM_OCEAN_ANALYSIS_TMPL=${COM_BASE}'/analysis/ocean' +declare -rx COM_OCEAN_ANLMON_TMPL=${COM_BASE}'/products/ocean/anlmon' declare -rx COM_OCEAN_LETKF_TMPL=${COM_BASE}'/analysis/ocean/letkf' declare -rx COM_OCEAN_BMATRIX_TMPL=${COM_BASE}'/bmatrix/ocean' declare -rx COM_OCEAN_NETCDF_TMPL=${COM_BASE}'/products/ocean/netcdf' @@ -91,6 +94,7 @@ declare -rx COM_OCEAN_GRIB_GRID_TMPL=${COM_OCEAN_GRIB_TMPL}'/${GRID}' declare -rx COM_ICE_ANALYSIS_TMPL=${COM_BASE}'/analysis/ice' declare -rx COM_ICE_LETKF_TMPL=${COM_BASE}'/analysis/ice/letkf' +declare -rx COM_ICE_ANLMON_TMPL=${COM_BASE}'/products/ice/anlmon' declare -rx COM_ICE_BMATRIX_TMPL=${COM_BASE}'/bmatrix/ice' declare -rx COM_ICE_INPUT_TMPL=${COM_BASE}'/model/ice/input' declare -rx COM_ICE_HISTORY_TMPL=${COM_BASE}'/model/ice/history' @@ -102,5 +106,6 @@ declare -rx COM_ICE_GRIB_GRID_TMPL=${COM_ICE_GRIB_TMPL}'/${GRID}' declare -rx COM_CHEM_HISTORY_TMPL=${COM_BASE}'/model/chem/history' declare -rx COM_CHEM_ANALYSIS_TMPL=${COM_BASE}'/analysis/chem' declare -rx COM_CHEM_BMAT_TMPL=${COM_CHEM_ANALYSIS_TMPL}'/bmatrix' +declare -rx COM_CHEM_ANLMON_TMPL=${COM_BASE}'/products/chem/anlmon' declare -rx COM_MED_RESTART_TMPL=${COM_BASE}'/model/med/restart' diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index 6a3d5b33018..436190c03a3 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -17,7 +17,7 @@ if (( $# != 1 )); then echo "atmensanlinit atmensanlobs atmensanlsol atmensanlletkf atmensanlfv3inc atmensanlfinal ecen_fv3jedi analcalc_fv3jedi" echo "snowanl esnowanl" echo "prepobsaero aeroanlinit aeroanlvar aeroanlfinal aeroanlgenb" - echo "anal sfcanl analcalc analdiag fcst echgres" + echo "anal sfcanl analcalc analdiag anlstat fcst echgres" echo "upp atmos_products" echo "tracker genesis genesis_fsu" echo "verfozn verfrad vminmon fit2obs metp arc_vrfy arc_tars cleanup" @@ -801,6 +801,14 @@ case ${step} in memory="48GB" ;; + "anlstat") + walltime="00:30:00" + ntasks=20 + threads_per_task=1 + tasks_per_node=20 + memory="90GB" + ;; + "sfcanl") walltime="00:20:00" ntasks=${ntiles:-6} diff --git a/parm/config/gfs/yaml/stat_base_config.yaml.j2 b/parm/config/gfs/yaml/stat_base_config.yaml.j2 new file mode 100644 index 00000000000..06badfd9ca2 --- /dev/null +++ b/parm/config/gfs/yaml/stat_base_config.yaml.j2 @@ -0,0 +1,26 @@ +aero: + stat_file_path: '{{ COMIN_AERO_ANALYSIS }}' + stat_file_name: 'aerostat.tgz' + obs spaces: + - name: viirs_npp + input file: "diag_viirs_npp_aod_{{ current_cycle | to_YMDH }}.nc" + output file: "viirs_npp_{{ current_cycle | to_YMDH }}_output_aod.nc" + - name: viirs_n20 + input file: "diag_viirs_n20_aod_{{ current_cycle | to_YMDH }}.nc" + output file: "viirs_n20_{{ current_cycle | to_YMDH }}_output_aod.nc" + +atmos: + stat_file_path: '{{ COMIN_ATMOS_ANALYSIS }}' + stat_file_name: 'atmstat' + obs spaces: + - name: conventional_ps + input file: "diag_conventional_ps_{{ current_cycle | to_YMDH }}.nc" + output file: "conventional_ps_{{ current_cycle | to_YMDH }}_output_atmos.nc" + +snow: + stat_file_path: '{{ COMIN_SNOW_ANALYSIS }}' + stat_file_name: 'snowstat.tgz' + obs spaces: + - name: ims_snow + input file: "diag_ims_snow_{{ current_cycle | to_YMDH }}.nc" + output file: "ims_snow_{{ current_cycle | to_YMDH }}_output_snow.nc" diff --git a/parm/gdas/anlstat_jedi_config.yaml.j2 b/parm/gdas/anlstat_jedi_config.yaml.j2 new file mode 100644 index 00000000000..f09f6f84185 --- /dev/null +++ b/parm/gdas/anlstat_jedi_config.yaml.j2 @@ -0,0 +1,24 @@ +aero: + rundir: '{{ DATA }}' + exe_src: '{{ JEDIEXE }}' + mpi_cmd: '{{ APRUN_ANLSTAT }}' + # jedi_args: None + jcb_base_yaml: '{{ PARMgfs }}/gdas/stat/aero/jcb-base.yaml.j2' + jcb_algo_yaml: '{{ JCB_ALGO_YAML }}' + jcb_algo: 'anlstat' +atmos: + rundir: '{{ DATA }}' + exe_src: '{{ JEDIEXE }}' + mpi_cmd: '{{ APRUN_ANLSTAT }}' + # jedi_args: None + jcb_base_yaml: '{{ PARMgfs }}/gdas/stat/atmos/jcb-base.yaml.j2' + jcb_algo_yaml: '{{ JCB_ALGO_YAML }}' + jcb_algo: 'anlstat' +snow: + rundir: '{{ DATA }}' + exe_src: '{{ JEDIEXE }}' + mpi_cmd: '{{ APRUN_ANLSTAT }}' + # jedi_args: None + jcb_base_yaml: '{{ PARMgfs }}/gdas/stat/snow/jcb-base.yaml.j2' + jcb_algo_yaml: '{{ JCB_ALGO_YAML }}' + jcb_algo: 'anlstat' diff --git a/scripts/exglobal_analysis_stats.py b/scripts/exglobal_analysis_stats.py new file mode 100755 index 00000000000..745f33efecc --- /dev/null +++ b/scripts/exglobal_analysis_stats.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +# exglobal_analysis_stats.py +# This script creates a AnalysisStats class +# and runs the initialize, execute, and finalize +# methods which create and stage the runtime directory +# and create the YAML configuration +# to produce summary statistics from the desired analyses +import os + +from wxflow import Logger, cast_strdict_as_dtypedict +from pygfs.task.analysis_stats import AnalysisStats + +# Initialize root logger +logger = Logger(level='DEBUG', colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + # Instantiate the atm analysis task + AnlStats = AnalysisStats(config) + + # Create list based on DA components + AnlStats.task_config['STAT_ANALYSES'] = [] + if AnlStats.task_config.DO_AERO_ANL: + AnlStats.task_config['STAT_ANALYSES'].append('aero') + if AnlStats.task_config.DO_JEDISNOWDA: + AnlStats.task_config['STAT_ANALYSES'].append('snow') + if AnlStats.task_config.DO_JEDIATMVAR: + AnlStats.task_config['STAT_ANALYSES'].append('atmos') + + # Initialize JEDI variational analysis + AnlStats.initialize() + for anl in AnlStats.task_config.STAT_ANALYSES: + AnlStats.execute(anl) + AnlStats.finalize(anl) diff --git a/sorc/link_workflow.sh b/sorc/link_workflow.sh index fb5d16b61df..12391c398f0 100755 --- a/sorc/link_workflow.sh +++ b/sorc/link_workflow.sh @@ -227,7 +227,7 @@ fi #------------------------------ if [[ -d "${HOMEgfs}/sorc/gdas.cd" ]]; then cd "${HOMEgfs}/parm/gdas" || exit 1 - declare -a gdasapp_comps=("aero" "atm" "io" "ioda" "snow" "soca" "jcb-gdas" "jcb-algorithms") + declare -a gdasapp_comps=("aero" "atm" "io" "ioda" "snow" "soca" "jcb-gdas" "jcb-algorithms" "stat") for comp in "${gdasapp_comps[@]}"; do if [[ -d "${comp}" ]]; then rm -rf "${comp}" diff --git a/ush/python/pygfs/task/analysis_stats.py b/ush/python/pygfs/task/analysis_stats.py new file mode 100644 index 00000000000..f0def9e5353 --- /dev/null +++ b/ush/python/pygfs/task/analysis_stats.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 + +import os +import gzip +import tarfile +from logging import getLogger +from pprint import pformat +from typing import Optional, Dict, Any + +from wxflow import (AttrDict, + FileHandler, + add_to_datetime, to_timedelta, + Task, + parse_j2yaml, + logit) +from pygfs.jedi import Jedi + +logger = getLogger(__name__.split('.')[-1]) + + +class AnalysisStats(Task): + """ + Class for JEDI-based global analysis stats tasks + """ + @logit(logger, name="AnalysisStats") + def __init__(self, config: Dict[str, Any]): + """ + Constructor global analysis stats task + This method will construct a global analysis stats task. + This includes: + - extending the task_config attribute AttrDict to include parameters required for this task + - instantiate the Jedi attribute objects + Parameters + ---------- + config: Dict + dictionary object containing task configuration + Returns + ---------- + None + """ + super().__init__(config) + + _window_begin = add_to_datetime(self.task_config.current_cycle, -to_timedelta(f"{self.task_config.assim_freq}H") / 2) + + # Create a local dictionary that is repeatedly used across this class + local_dict = AttrDict( + { + 'STAT_WINDOW_BEGIN': _window_begin, + 'STAT_WINDOW_LENGTH': f"PT{self.task_config.assim_freq}H", + 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", + 'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z." + } + ) + # Extend task_config with local_dict + self.task_config = AttrDict(**self.task_config, **local_dict) + + @logit(logger) + def initialize(self) -> None: + """ + This method will initialize a global analysis stats task. + This includes: + - initialize JEDI applications + - copying stat files + Parameters + ---------- + None + Returns + ---------- + None + """ + # Create dictionary of Jedi objects + # Expected keys are what must be included from the JEDI config file. We can + # then loop through ob space list from scripts/exglobal_analysis_stats.py + expected_keys = ['aero', 'atmos', 'snow'] + self.jedi_dict = Jedi.get_jedi_dict(self.task_config.JEDI_CONFIG_YAML, self.task_config, expected_keys) + + logger.info(f"Copying files to {self.task_config.DATA}/stats") + + # Extract info from stat config file + analysis_config_dict = parse_j2yaml(self.task_config.STAT_BASE_CONFIG_YAML, self.task_config) + + # Loop through a copy of ob space list + for analysis in self.task_config.STAT_ANALYSES[:]: + logger.info(f"Working on analysis type: {analysis}") + + # Copy stat files to DATA path + input_tar = os.path.join(analysis_config_dict[analysis]['stat_file_path'], + f"{self.task_config['APREFIX']}{analysis_config_dict[analysis]['stat_file_name']}") + diag_dir_path = os.path.join(self.task_config.DATA, analysis) + + dest = os.path.join(diag_dir_path, analysis_config_dict[analysis]['stat_file_name']) + logger.info(f"Copying {input_tar} to {dest} ...") + tarball_list = [[input_tar, dest]] + FileHandler({'mkdir': [diag_dir_path], 'copy': tarball_list}).sync() + + # Open tar file + logger.info(f"Open tarred diagnostic files in {dest}") + with tarfile.open(dest, "r") as tar: + # Check if tar file is empty + if not tar.getnames(): + logger.warning(f"WARNING. The tar file {dest} is empty. No files to extract.") + logger.warning("Moving to next analysis ...") + # Remove analysis from STAT_ANALYSES and move to next + self.task_config.STAT_ANALYSES.remove(analysis) + logger.info(f"current analysis list: {self.task_config.STAT_ANALYSES}") + continue + # Extract all files to the current directory + tar.extractall(path=diag_dir_path) + + self.task_config.OBSSPACES_LIST = [] + for analysis_dict in analysis_config_dict[analysis]['obs spaces']: + # Gunzip .nc files + gz_file = os.path.join(diag_dir_path, (analysis_dict['input file'] + ".gz")) + + # Check if the file exists + if os.path.exists(gz_file): + logger.info(f"Now processing {gz_file}") + output_file = os.path.join(diag_dir_path, analysis_dict['input file']) + # Open the .gz file + with gzip.open(gz_file, 'rb') as f_in: + with open(output_file, 'wb') as f_out: + f_out.write(f_in.read()) + else: + logger.warning(f"WARNING. {gz_file} does not exist to extract.") + logger.warning("Moving to next analysis ...") + continue # Skip current analysis and move to next + + self.task_config.OBSSPACES_LIST.append(analysis_dict['name']) + + # initialize JEDI application + logger.info(f"Initializing JEDI ioda-stats extraction application") + self.jedi_dict[analysis].initialize(self.task_config) + + @logit(logger) + def execute(self, jedi_dict_key: str) -> None: + """Execute JEDI application of analysis stats + + Parameters + ---------- + jedi_dict_key + key specifying particular Jedi object in self.jedi_dict + + Returns + ---------- + None + """ + + self.jedi_dict[jedi_dict_key].execute() + + @logit(logger) + def finalize(self, jedi_dict_key: str) -> None: + """Finalize the analysis statistics job. + + This method will finalize the analysis statistics job using JEDI. + This includes: + - copying stat files to specified outdir + + Parameters + ---------- + jedi_dict_key + key specifying particular Jedi object in self.jedi_dict + + Returns + ---------- + None + """ + + analysis_config_dict = parse_j2yaml(self.task_config.STAT_BASE_CONFIG_YAML, self.task_config) + + for analysis_dict in analysis_config_dict[jedi_dict_key]['obs spaces']: + statfile = os.path.join(self.task_config.DATA, analysis_dict['output file']) + outdir = self.task_config['COMOUT_' + jedi_dict_key.upper() + '_ANLMON'] + + # Check if the directory exists; if not, create it + if not os.path.exists(outdir): + FileHandler({'mkdir': [outdir]}).sync() + + dest = os.path.join(outdir, f"{analysis_dict['output file']}") + logger.debug(f"copying {statfile} to {dest}") + stat_copy = { + 'copy': [[statfile, dest]] + } + FileHandler(stat_copy).sync() diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index 7ec8e58eed8..656f97bfeb6 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -88,6 +88,7 @@ def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: run_options[run]['do_verfozn'] = run_base.get('DO_VERFOZN', True) run_options[run]['do_verfrad'] = run_base.get('DO_VERFRAD', True) run_options[run]['do_vminmon'] = run_base.get('DO_VMINMON', True) + run_options[run]['do_anlstat'] = run_base.get('DO_ANLSTAT', True) run_options[run]['do_tracker'] = run_base.get('DO_TRACKER', True) run_options[run]['do_genesis'] = run_base.get('DO_GENESIS', True) run_options[run]['do_genesis_fsu'] = run_base.get('DO_GENESIS_FSU', False) diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index 213897e67cf..b7380b327c8 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -108,6 +108,9 @@ def _get_app_configs(self, run): if options['do_vminmon']: configs += ['vminmon'] + if options['do_anlstat']: + configs += ['anlstat'] + if options['do_tracker']: configs += ['tracker'] @@ -260,6 +263,9 @@ def get_task_names(self): if options['do_vminmon']: task_names[run] += ['vminmon'] + if options['do_anlstat']: + task_names[run] += ['anlstat'] + # gfs-only verification/tracking if run == 'gfs': if options['do_tracker']: diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 612f4c0b376..71bc2162c2d 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1797,6 +1797,40 @@ def vminmon(self): return task + def anlstat(self): + deps = [] + if self.options['do_jediatmvar']: + dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlfinal'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_jediocnvar']: + dep_dict = {'type': 'task', 'name': f'{self.run}_marineanlfinal'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_jedisnowda']: + dep_dict = {'type': 'task', 'name': f'{self.run}_snowanl'} + deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_aero_anl']: + dep_dict = {'type': 'task', 'name': f'{self.run}_aeroanlfinal'} + deps.append(rocoto.add_dependency(dep_dict)) + + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = self.get_resource('anlstat') + task_name = f'{self.run}_anlstat' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': self.envars, + 'cycledef': self.run, + 'command': f'{self.HOMEgfs}/jobs/rocoto/anlstat.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + def tracker(self): deps = [] dep_dict = {'type': 'metatask', 'name': f'{self.run}_atmos_prod'} @@ -2364,6 +2398,9 @@ def arch_tars(self): if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'} deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_anlstat']: + dep_dict = {'type': 'task', 'name': f'{self.run}_anlstat'} + deps.append(rocoto.add_dependency(dep_dict)) elif self.run in ['gdas']: dep_dict = {'type': 'task', 'name': f'{self.run}_atmanlprod'} deps.append(rocoto.add_dependency(dep_dict)) @@ -2379,6 +2416,9 @@ def arch_tars(self): if self.options['do_vminmon']: dep_dict = {'type': 'task', 'name': f'{self.run}_vminmon'} deps.append(rocoto.add_dependency(dep_dict)) + if self.options['do_anlstat']: + dep_dict = {'type': 'task', 'name': f'{self.run}_anlstat'} + deps.append(rocoto.add_dependency(dep_dict)) if self.run in ['gfs'] and self.options['do_tracker']: dep_dict = {'type': 'task', 'name': f'{self.run}_tracker'} deps.append(rocoto.add_dependency(dep_dict)) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index 512d36eadfb..ffe3b5f99ab 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -27,7 +27,7 @@ class Tasks: 'fcst', 'upp', 'atmanlprod', 'atmupp', 'goesupp', 'atmos_products', 'oceanice_products', - 'verfozn', 'verfrad', 'vminmon', + 'verfozn', 'verfrad', 'vminmon', 'anlstat', 'metp', 'fit2obs', 'extractvars', 'tracker', 'genesis', 'genesis_fsu', 'postsnd', 'awips', 'awips_20km_1p0deg', 'fbwind',