diff --git a/ci/cases/yamls/gefs_defaults_ci.yaml b/ci/cases/yamls/gefs_defaults_ci.yaml index 03607d911e6..99733ca821d 100644 --- a/ci/cases/yamls/gefs_defaults_ci.yaml +++ b/ci/cases/yamls/gefs_defaults_ci.yaml @@ -5,5 +5,7 @@ base: FHOUT_GFS: 6 USE_OCN_ENS_PERTURB_FILES: "NO" USE_ATM_ENS_PERTURB_FILES: "NO" + DO_BUFRSND: "NO" + DO_GEMPAK: "NO" ocn: MOM6_INTERP_ICS: "NO" diff --git a/jobs/rocoto/gempak.sh b/jobs/rocoto/gempak.sh index dc1d3f2621e..41121089d88 100755 --- a/jobs/rocoto/gempak.sh +++ b/jobs/rocoto/gempak.sh @@ -12,14 +12,14 @@ IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" export FHR3 jobid for fhr in "${fhr_list[@]}"; do - FHR3=$(printf '%03d' "${fhr}") - jobid="${job}_f${FHR3}.$$" - ############################################################### - # Execute the JJOB - ############################################################### - "${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK" - status=$? - [[ ${status} -ne 0 ]] && exit "${status}" + FHR3=$(printf '%03d' "${fhr}") + jobid="${job}_f${FHR3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK" + err=$? + [[ ${err} -ne 0 ]] && exit "${err}" done exit 0 diff --git a/jobs/rocoto/postsnd.sh b/jobs/rocoto/postsnd.sh index 8e7ba1547e1..e0a869b8976 100755 --- a/jobs/rocoto/postsnd.sh +++ b/jobs/rocoto/postsnd.sh @@ -1,24 +1,17 @@ #! /usr/bin/env bash source "${HOMEgfs}/ush/preamble.sh" - -############################################################### -# Source FV3GFS workflow modules - -. ${HOMEgfs}/ush/load_fv3gfs_modules.sh +source "${HOMEgfs}/ush/load_fv3gfs_modules.sh" status=$? if [[ ${status} -ne 0 ]]; then - exit "${status}" + exit "${status}" fi export job="postsnd" export jobid="${job}.$$" -############################################################### +################################################################ # Execute the JJOB ${HOMEgfs}/jobs/JGFS_ATMOS_POSTSND -status=$? - - -exit "${status}" - +err=$? +exit "${err}" diff --git a/parm/config/gefs/config.gempak b/parm/config/gefs/config.gempak new file mode 120000 index 00000000000..668de5ba0fb --- /dev/null +++ b/parm/config/gefs/config.gempak @@ -0,0 +1 @@ +../gfs/config.gempak \ No newline at end of file diff --git a/parm/config/gefs/config.postsnd b/parm/config/gefs/config.postsnd new file mode 120000 index 00000000000..3ca118e114d --- /dev/null +++ b/parm/config/gefs/config.postsnd @@ -0,0 +1 @@ +../gfs/config.postsnd \ No newline at end of file diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 201284042d7..88268ba9364 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -241,6 +241,20 @@ case ${step} in export is_exclusive=True ;; + "postsnd") + export walltime="02:00:00" + export ntasks=141 + export ntasks_postsndcfp=9 + export tasks_per_node=21 + export threads_per_task=6 + export tasks_per_node_postsndcfp=1 + postsnd_req_cores=$(( tasks_per_node * threads_per_task )) + if [[ ${postsnd_req_cores} -gt ${max_tasks_per_node} ]]; then + tasks_per_node=$(( max_tasks_per_node / threads_per_task )) + fi + export is_exclusive=True + ;; + "atmos_ensstat") # Walltime is per forecast hour; will be multipled by group size export walltime="00:15:00" @@ -310,6 +324,14 @@ case ${step} in export NTASKS=${ntasks} ;; + "gempak") + export walltime="00:30:00" + export ntasks=1 + export tasks_per_node=1 + export threads_per_task=1 + export memory="2GB" + ;; + "extractvars") export walltime="00:30:00" export ntasks=1 diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index 46ca1ff955c..591025e23fc 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -30,6 +30,12 @@ def _get_app_configs(self, run): options = self.run_options[run] configs = ['stage_ic', 'fcst', 'atmos_products'] + if options['do_bufrsnd']: + configs += ['postsnd'] + + if options['do_gempak']: + configs += ['gempak'] + if options['nens'] > 0: configs += ['efcs', 'atmos_ensstat'] @@ -82,6 +88,12 @@ def get_task_names(self): tasks += ['atmos_prod'] + if options['do_bufrsnd']: + tasks += ['postsnd'] + + if options['do_gempak']: + tasks += ['gempak'] + if options['nens'] > 0: tasks += ['atmos_ensstat'] @@ -100,7 +112,6 @@ def get_task_names(self): if options['do_extractvars']: tasks += ['extractvars'] - tasks += ['cleanup'] if options['do_archcom']: tasks += ['arch_tars'] if options['do_globusarch']: diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 1d26ce94925..9b20b95d32d 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -264,6 +264,105 @@ def _atmosoceaniceprod(self, component: str): return task + def postsnd(self): + + resources = self.get_resource('postsnd') + + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_mem#member#'} + deps.append(rocoto.add_dependency(dep_dict)) + + dependencies = rocoto.create_dependency(dep=deps) + + postsnd_envars = self.envars.copy() + postenvar_dict = {'ENSMEM': '#member#', + 'MEMDIR': 'mem#member#'} + + for key, value in postenvar_dict.items(): + postsnd_envars.append(rocoto.create_envar(name=key, value=str(value))) + + resources = self.get_resource('postsnd') + task_name = f'{self.run}_postsnd_mem#member#' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': postsnd_envars, + 'cycledef': self.run, + 'command': f'{self.HOMEgfs}/jobs/rocoto/postsnd.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + member_var_dict = {'member': ' '.join([str(mem).zfill(3) for mem in range(0, self.nmem + 1)])} + member_metatask_dict = {'task_name': f'{self.run}_postsnd', + 'task_dict': task_dict, + 'var_dict': member_var_dict + } + + task = rocoto.create_task(member_metatask_dict) + + return task + + def gempak(self): + + resources = self.get_resource('gempak') + + deps = [] + task = f'{self.run}_atmos_prod_mem#member#_#fhr_label#' + dep_dict = {'type': 'task', 'name': task} + deps.append(rocoto.add_dependency(dep_dict)) + + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + fhrs = self._get_forecast_hours(self.run, self._configs['gempak']) + + # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs + is_replay = self._configs['gempak']['REPLAY_ICS'] + if is_replay and 0 in fhrs: + fhrs.remove(0) + + max_tasks = self._configs['gempak']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + postenvars = self.envars.copy() + postenvar_dict = {'ENSMEM': '#member#', + 'MEMDIR': 'mem#member#', + 'FHR_LIST': '#fhr_list#'} + + for key, value in postenvar_dict.items(): + postenvars.append(rocoto.create_envar(name=key, value=str(value))) + + task_name = f'{self.run}_gempak_mem#member#_#fhr_label#' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': postenvars, + 'cycledef': self.run, + 'command': f'{self.HOMEgfs}/jobs/rocoto/gempak.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + fhr_metatask_dict = {'task_name': f'{self.run}_gempak_#member#', + 'task_dict': task_dict, + 'var_dict': fhr_var_dict} + + member_var_dict = {'member': ' '.join([str(mem).zfill(3) for mem in range(0, self.nmem + 1)])} + member_metatask_dict = {'task_name': f'{self.run}_gempak', + 'task_dict': fhr_metatask_dict, + 'var_dict': member_var_dict + } + + task = rocoto.create_task(member_metatask_dict) + + return task + def atmos_ensstat(self): resources = self.get_resource('atmos_ensstat')