From 94143aaa8ae9ae4559b3d6ca83f231a612a2ef21 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Mon, 6 Jan 2025 15:37:17 -0600 Subject: [PATCH 01/21] Reinstate product groups Testing with full-sized GEFS found that the sheer number of tasks overloads rocoto, resulting in `rocotorun` taking over 10 min to complete or hanging entirely. To reduce the number of tasks, product groups are reimplemented so that multiple forecast hours are processed in a single task. However, the implementation is a little different from the previous one. The jobs where groups are enabled (atmos_products, oceanice_products, and wavepostsbs) have a new variable, `MAX_TASKS`, that controls how many groups to use. This setting is currently *per member*. The forecast hours to be processed are then divided into this many groups as evenly as possible without crossing forecast segment boundaries. The walltime for those jobs is then multiplied by the number of forecast hours in the largest group. A number of helper methods are added to Tasks to determine these groups and make a standard metatask variable dict in a centralized location. There is also a function to multiply the walltime, but this may be better off relocated to wxflow with the other time functions. As part of switching from a single value to a list, hours are no longer passed by rocoto as zero-padded values. The lists are comma-delimited (without spaces) and split apart in the job stub (`jobs/rocoto/*`), so each j-job call is still a single forecast hour. The offline post (upp) job is not broken into groups, since it really isn't used outside the analysis anymore. Gempak jobs that run over multiple forecast hours also aren't broken into groups yet. 
Resolves #2999 --- jobs/rocoto/atmos_products.sh | 20 ++- jobs/rocoto/oceanice_products.sh | 18 +- jobs/rocoto/wavepostsbs.sh | 19 ++- parm/config/gefs/config.atmos_products | 4 +- parm/config/gefs/config.oceanice_products | 4 +- parm/config/gefs/config.resources | 5 +- parm/config/gefs/config.wavepostsbs | 3 + parm/config/gfs/config.atmos_products | 4 +- parm/config/gfs/config.oceanice_products | 3 + parm/config/gfs/config.resources | 7 +- parm/config/gfs/config.wavepostsbs | 3 + workflow/rocoto/gefs_tasks.py | 72 ++++---- workflow/rocoto/gfs_tasks.py | 56 +++--- workflow/rocoto/tasks.py | 199 +++++++++++++++++++++- 14 files changed, 331 insertions(+), 86 deletions(-) diff --git a/jobs/rocoto/atmos_products.sh b/jobs/rocoto/atmos_products.sh index f6adbcf861a..5d545975194 100755 --- a/jobs/rocoto/atmos_products.sh +++ b/jobs/rocoto/atmos_products.sh @@ -15,13 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi export job="atmos_products" export jobid="${job}.$$" -# Negatation needs to be before the base -fhr3_base="10#${FHR3}" -export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS" +export FORECAST_HOUR +for FORECAST_HOUR in "${fhr_list[@]}"; do + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? 
+exit 0 diff --git a/jobs/rocoto/oceanice_products.sh b/jobs/rocoto/oceanice_products.sh index 2a3b617d054..d72afcb99f4 100755 --- a/jobs/rocoto/oceanice_products.sh +++ b/jobs/rocoto/oceanice_products.sh @@ -15,11 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi export job="oceanice_products" export jobid="${job}.$$" -export FORECAST_HOUR=$(( 10#${FHR3} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS" +export FORECAST_HOUR +for FORECAST_HOUR in "${fhr_list[@]}"; do + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? +exit 0 diff --git a/jobs/rocoto/wavepostsbs.sh b/jobs/rocoto/wavepostsbs.sh index f4789210d87..b2eec789ae8 100755 --- a/jobs/rocoto/wavepostsbs.sh +++ b/jobs/rocoto/wavepostsbs.sh @@ -5,17 +5,24 @@ source "${HOMEgfs}/ush/preamble.sh" ############################################################### # Source FV3GFS workflow modules #. ${HOMEgfs}/ush/load_fv3gfs_modules.sh -. ${HOMEgfs}/ush/load_ufswm_modules.sh +source "${HOMEgfs}/ush/load_ufswm_modules.sh" status=$? -[[ ${status} -ne 0 ]] && exit ${status} +[[ ${status} -ne 0 ]] && exit "${status}" export job="wavepostsbs" export jobid="${job}.$$" ############################################################### -# Execute the JJOB -${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS -status=$? 
-[[ ${status} -ne 0 ]] && exit ${status} +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" + +export FHR3 +for FORECAST_HOUR in "${fhr_list[@]}"; do + FHR3=$(printf '%03d' "${FORECAST_HOUR}") + # Execute the JJOB + "${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done exit 0 diff --git a/parm/config/gefs/config.atmos_products b/parm/config/gefs/config.atmos_products index e8aae324e19..d1f36a7bc92 100644 --- a/parm/config/gefs/config.atmos_products +++ b/parm/config/gefs/config.atmos_products @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products" # Get task specific resources . "${EXPDIR}/config.resources" atmos_products -# No. of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 # Scripts used by this job export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh" diff --git a/parm/config/gefs/config.oceanice_products b/parm/config/gefs/config.oceanice_products index 3b8b064947d..6bb604d0ca9 100644 --- a/parm/config/gefs/config.oceanice_products +++ b/parm/config/gefs/config.oceanice_products @@ -9,7 +9,7 @@ source "${EXPDIR}/config.resources" oceanice_products export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products_gefs.yaml" -# No. 
of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 echo "END: config.oceanice_products" diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 68f81c10395..b221e89fe94 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -234,6 +234,7 @@ case ${step} in ;; "atmos_products") + # Walltime is per forecast hour; will be multipled by group size export walltime="00:15:00" export ntasks=24 export threads_per_task=1 @@ -250,6 +251,7 @@ case ${step} in ;; "oceanice_products") + # Walltime is per forecast hour; will be multipled by group size export walltime="00:15:00" export ntasks=1 export tasks_per_node=1 @@ -258,7 +260,8 @@ case ${step} in ;; "wavepostsbs") - export walltime="03:00:00" + # Walltime is per forecast hour; will be multipled by group size + export walltime="00:15:00" export ntasks=1 export threads_per_task=1 export tasks_per_node=$(( max_tasks_per_node / threads_per_task )) diff --git a/parm/config/gefs/config.wavepostsbs b/parm/config/gefs/config.wavepostsbs index 82cec321dae..b43ea33d406 100644 --- a/parm/config/gefs/config.wavepostsbs +++ b/parm/config/gefs/config.wavepostsbs @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs" # Get task specific resources source "${EXPDIR}/config.resources" wavepostsbs +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + # Subgrid info for grib2 encoding export WAV_SUBGRBSRC="" export WAV_SUBGRB="" diff --git a/parm/config/gfs/config.atmos_products b/parm/config/gfs/config.atmos_products index 451f5eff863..5b6e4067b5d 100644 --- a/parm/config/gfs/config.atmos_products +++ b/parm/config/gfs/config.atmos_products @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products" # Get task specific resources . "${EXPDIR}/config.resources" atmos_products -# No. 
of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +## Maximum number of rocoto tasks per member +export MAX_TASKS=25 # Scripts used by this job export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh" diff --git a/parm/config/gfs/config.oceanice_products b/parm/config/gfs/config.oceanice_products index 9e5c5b1c683..a618cbe10c2 100644 --- a/parm/config/gfs/config.oceanice_products +++ b/parm/config/gfs/config.oceanice_products @@ -7,6 +7,9 @@ echo "BEGIN: config.oceanice_products" # Get task specific resources source "${EXPDIR}/config.resources" oceanice_products +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products.yaml" # No. of forecast hours to process in a single job diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index eeb33716c0a..c8eb7592be4 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -191,8 +191,9 @@ case ${step} in ;; "wavepostsbs") - walltime_gdas="00:20:00" - walltime_gfs="03:00:00" + # Walltime is per forecast hour; will be multipled by group size + walltime_gdas="00:15:00" + walltime_gfs="00:15:00" ntasks=8 threads_per_task=1 tasks_per_node=$(( max_tasks_per_node / threads_per_task )) @@ -911,6 +912,7 @@ case ${step} in ;; "oceanice_products") + # Walltime is per forecast hour; will be multipled by group size walltime="00:15:00" ntasks=1 tasks_per_node=1 @@ -944,6 +946,7 @@ case ${step} in ;; "atmos_products") + # Walltime is per forecast hour; will be multipled by group size walltime="00:15:00" ntasks=24 threads_per_task=1 diff --git a/parm/config/gfs/config.wavepostsbs b/parm/config/gfs/config.wavepostsbs index 82cec321dae..b43ea33d406 100644 --- a/parm/config/gfs/config.wavepostsbs +++ b/parm/config/gfs/config.wavepostsbs @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs" # Get task specific resources source "${EXPDIR}/config.resources" wavepostsbs +# Maximum number of 
rocoto tasks per member +export MAX_TASKS=25 + # Subgrid info for grib2 encoding export WAV_SUBGRBSRC="" export WAV_SUBGRB="" diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index ca29bcdf1ee..293236e5a63 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -190,39 +190,57 @@ def _atmosoceaniceprod(self, component: str): fhout_ice_gfs = self._configs['base']['FHOUT_ICE_GFS'] products_dict = {'atmos': {'config': 'atmos_products', 'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL', - 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'}, + 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'}, + 'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr3_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}} + 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr3_last#.nc'}} component_dict = products_dict[component] config = component_dict['config'] history_path_tmpl = component_dict['history_path_tmpl'] history_file_tmpl = component_dict['history_file_tmpl'] + max_tasks = self._configs[config]['MAX_TASKS'] resources = self.get_resource(config) + fhrs = self._get_forecast_hours('gefs', self._configs[config], component) + + # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs + is_replay = self._configs[config]['REPLAY_ICS'] + if is_replay and component in ['atmos'] and 0 in fhrs: + fhrs.remove(0) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest 
group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl], {'MEMDIR': 'mem#member#'}) deps = [] data = f'{history_path}/{history_file_tmpl}' dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'metatask', 'name': 'gefs_fcst_mem#member#'} + dep_dict = {'type': 'task', 'name': 'gefs_fcst_mem#member#_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') postenvars = self.envars.copy() postenvar_dict = {'ENSMEM': '#member#', 'MEMDIR': 'mem#member#', - 'FHR3': '#fhr#', + 'FHR_LIST': '#fhr_list#', 'COMPONENT': component} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) - task_name = f'gefs_{component}_prod_mem#member#_f#fhr#' + task_name = f'gefs_{component}_prod_mem#member#_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -233,22 +251,6 @@ def _atmosoceaniceprod(self, component: str): 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;'} - fhrs = self._get_forecast_hours('gefs', self._configs[config], component) - - # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs - is_replay = self._configs[config]['REPLAY_ICS'] - if is_replay and component in ['atmos'] and 0 in fhrs: - fhrs.remove(0) - - # ocean/ice components do not have fhr 0 as they are averaged output - if component in ['ocean', 'ice'] and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - if component in ['ocean']: - fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] - fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) - 
fhr_metatask_dict = {'task_name': f'gefs_{component}_prod_#member#', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -308,22 +310,35 @@ def atmos_ensstat(self): return task def wavepostsbs(self): + deps = [] - dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'} + dep_dict = {'type': 'task', 'name': f'gefs_fcst_mem#member#_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') + is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] + if is_replay: + fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] + + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + wave_post_envars = self.envars.copy() postenvar_dict = {'ENSMEM': '#member#', 'MEMDIR': 'mem#member#', - 'FHR3': '#fhr#', + 'FHR_LIST': '#fhr_list#', } for key, value in postenvar_dict.items(): wave_post_envars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('wavepostsbs') - task_name = f'gefs_wave_post_grid_mem#member#_f#fhr#' + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + task_name = f'gefs_wave_post_grid_mem#member#_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -335,13 +350,6 @@ def wavepostsbs(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') - is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] - if is_replay: - fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'gefs_wave_post_grid_#member#', 'task_dict': task_dict, 'var_dict': fhr_var_dict} 
diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index d2a3e437192..1b52186f658 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1029,7 +1029,7 @@ def atmanlupp(self): def atmanlprod(self): postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '-001'} + postenvar_dict = {'FHR_LIST': '-1'} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) @@ -1126,21 +1126,36 @@ def _atmosoceaniceprod(self, component: str): products_dict = {'atmos': {'config': 'atmos_products', 'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL', - 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'}, + 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr_next#.nc'}, + 'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr3_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr#.nc'}} + 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr3_last#.nc'}} component_dict = products_dict[component] config = component_dict['config'] history_path_tmpl = component_dict['history_path_tmpl'] history_file_tmpl = component_dict['history_file_tmpl'] + max_tasks = self._configs[config]['MAX_TASKS'] + resources = self.get_resource(component_dict['config']) + + fhrs = self._get_forecast_hours(self.run, self._configs[config], component) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = 
Tasks.multiply_HMS(resources['walltime'], largest_group) + postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#', 'COMPONENT': component} + postenvar_dict = {'FHR_LIST': '#fhr_list#', 'COMPONENT': component} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) @@ -1154,9 +1169,8 @@ def _atmosoceaniceprod(self, component: str): dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run - resources = self.get_resource(component_dict['config']) - task_name = f'{self.run}_{component}_prod_f#fhr#' + task_name = f'{self.run}_{component}_prod_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1168,17 +1182,6 @@ def _atmosoceaniceprod(self, component: str): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours(self.run, self._configs[config], component) - - # ocean/ice components do not have fhr 0 as they are averaged output - if component in ['ocean', 'ice'] and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - - if component in ['ocean']: - fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] - fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) metatask_dict = {'task_name': f'{self.run}_{component}_prod', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -1194,13 +1197,21 @@ def wavepostsbs(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + wave_post_envars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#'} + postenvar_dict = {'FHR_LIST': '#fhr_list#'} for key, value in postenvar_dict.items(): 
wave_post_envars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('wavepostsbs') - task_name = f'{self.run}_wavepostsbs_f#fhr#' + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + task_name = f'{self.run}_wavepostsbs_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1212,12 +1223,9 @@ def wavepostsbs(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') - - fhr_metatask_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} metatask_dict = {'task_name': f'{self.run}_wavepostsbs', 'task_dict': task_dict, - 'var_dict': fhr_metatask_dict} + 'var_dict': fhr_var_dict} task = rocoto.create_task(metatask_dict) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index d9c769ffbea..7a42c6594c4 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -3,7 +3,7 @@ import numpy as np from applications.applications import AppConfig import rocoto.rocoto as rocoto -from wxflow import Template, TemplateConstants, to_timedelta +from wxflow import Template, TemplateConstants, to_timedelta, timedelta_to_HMS from typing import List __all__ = ['Tasks'] @@ -176,6 +176,203 @@ def _get_forecast_hours(run, config, component='atmos') -> List[str]: return fhrs + @staticmethod + def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) -> List[dict]: + ''' + Split forecast hours into a number of groups, obeying a list of pre-set breakpoints. + + Takes a list of forecast hours and splits it into a number of groups while obeying + a list of pre-set breakpoints and recording which segment each belongs to. 
+ + Parameters + ---------- + fhrs: List[int] + List of forecast hours to break into groups + ngroups: int + Number of groups to split the forecast hours into + breakpoints: List[int] + List of preset forecast hour break points to use (default: []) + + Returns + ------- + List[dict]: List of dicts, where each dict contains two keys: + 'fhrs': the forecast hours for that group + 'seg': the forecast segment (from the original breakpoint list) + the group belong to + ''' + if breakpoints is None: + breakpoints = [] + + num_segs = len(breakpoints) + 1 + if num_segs > ngroups: + raise ValueError(f"Number of segments ({num_segs}) is greater than the number of groups ({ngroups}") + + if ngroups > len(fhrs): + ngroups = len(fhrs) + + # First, split at segment boundaries + fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [fhrs.index(bpnt) + 1 for bpnt in breakpoints])] + seg_lens = [len(seg) for seg in fhrs_segs] + + # Initialize each segment to be split into one job group + ngroups_segs = [1 for _ in range(0, len(fhrs_segs))] + + # For remaining job groups, iteratively assign to the segment with the most + # hours per group + for _ in range(0, ngroups - len(fhrs_segs)): + current_lens = [size / weight for size, weight in zip(seg_lens, ngroups_segs)] + index_max = max(range(len(current_lens)), key=current_lens.__getitem__) + ngroups_segs[index_max] += 1 + + # Now that we know how many groups each forecast segment should be split into, + # Split them and flatten to a single list. 
+ groups = [] + for seg_num, (fhrs_seg, ngroups_seg) in enumerate(zip(fhrs_segs, ngroups_segs)): + [groups.append({'fhrs': grp.tolist(), 'seg': seg_num}) for grp in np.array_split(fhrs_seg, ngroups_seg)] + + return groups + + @staticmethod + def test_job_groups(): + test_array = list(range(0, 24)) + + # Test simple splitting with no breakpoints + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups + + # Test with a break point that aligns with normal split point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups + + # Test with a break point not at a normal splilt point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, + {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, + {'fhrs': [20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups + + # Test highly skewed break point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0}, + {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0}, + {'fhrs': [23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups + + # Test with two break points that align + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11, 17]) == test_groups + + # Test with two 
skewed break points + test_groups = [{'fhrs': [0, 1], 'seg': 0}, + {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1}, + {'fhrs': [8, 9, 10, 11, 12], 'seg': 1}, + {'fhrs': [13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22], 'seg': 1}, + {'fhrs': [23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups + + # Test slightly irregular break points + test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0}, + {'fhrs': [4, 5, 6], 'seg': 0}, + {'fhrs': [7, 8, 9, 10], 'seg': 1}, + {'fhrs': [11, 12, 13, 14], 'seg': 1}, + {'fhrs': [15, 16, 17, 18], 'seg': 1}, + {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[6, 18]) == test_groups + + # Test more groups than fhrs available + test_array = list(range(0, 6)) + test_groups = [{'fhrs': [0], 'seg': 0}, + {'fhrs': [1], 'seg': 0}, + {'fhrs': [2], 'seg': 0}, + {'fhrs': [3], 'seg': 0}, + {'fhrs': [4], 'seg': 0}, + {'fhrs': [5], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups + + def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict: + ''' + Prepare a metatask dictionary for forecast hour groups. + + Takes a list of forecast hours and splits it into a number of groups while not + crossing forecast segment boundaries. Then use that to prepare a dict with key + variable lists for use in a rocoto metatask. 
+ + Parameters + ---------- + fhrs: List[int] + List of forecast hours to break into groups + ngroups: int + Number of groups to split the forecast hours into + + Returns + ------- + dict: Several variable lists for use in rocoto metatasks: + fhr_list: list of comma-separated lists of fhr groups + fhr_label: list of labels corrsponding to the fhr range + fhr3_last: list of the last fhr in each group, formatted to three digits + fhr3_next: list of the fhr that would follow each group, formatted to + three digits + seg_dep: list of segments each group belongs to + ''' + fhr_breakpoints = self.options['fcst_segments'][1:-1] + group_dicts = Tasks.get_job_groups(fhrs=fhrs, ngroups=ngroups, breakpoints=fhr_breakpoints) + + fhrs_group = [dct['fhrs'] for dct in group_dicts] + fhrs_first = [grp[0] for grp in fhrs_group] + fhrs_last = [grp[-1] for grp in fhrs_group] + fhrs_next = fhrs_first[1:] + [fhrs_last[-1] + (fhrs[-1] - fhrs[-2])] + grp_str = [f'f{grp[0]:03d}-f{grp[-1]:03d}' if len(grp) > 1 else f'f{grp[0]:03d}' for grp in fhrs_group] + seg_deps = [f'seg{dct["seg"]}' for dct in group_dicts] + + fhr_var_dict = {'fhr_list': ' '.join(([','.join(str(fhr) for fhr in grp) for grp in fhrs_group])), + 'fhr_label': ' '.join(grp_str), + 'seg_dep': ' '.join(seg_deps), + 'fhr3_last': ' '.join([f'{fhr:03d}' for fhr in fhrs_last]), + 'fhr3_next': ' '.join([f'{fhr:03d}' for fhr in fhrs_next]) + } + + return fhr_var_dict + + @staticmethod + def multiply_HMS(hms_timedelta: str, multiplier: int | float) -> str: + ''' + Multiplies an HMS timedelta string + + Parameters + ---------- + hms_timedelta: str + String represnting a time delta in HH:MM:SS format + multiplier: int | float + Value to multiply the time delta by + + Returns + ------- + str: String represnting a time delta in HH:MM:SS format + + ''' + input_timedelta = to_timedelta(hms_timedelta) + output_timedelta = input_timedelta * multiplier + return timedelta_to_HMS(output_timedelta) + + @staticmethod + def test_multiply_HMS(): + 
assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00' + assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00' + assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00' + assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' + assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' + assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' + def get_resource(self, task_name): """ Given a task name (task_name) and its configuration (task_names), From e7412671e62d7121da81c3808e6829fcfaef27c0 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Tue, 7 Jan 2025 17:36:44 -0600 Subject: [PATCH 02/21] Add wave gridded post data dependency Adds an alternative data dependency for gridded wave post so jobs can begin before the entire forecast is complete. Uses the existence of the next output file as the trigger, to prevent reading incomplete files. Resolves #3210 --- workflow/rocoto/gefs_tasks.py | 10 ++++++++-- workflow/rocoto/gfs_tasks.py | 10 ++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 293236e5a63..9e1a6b6b2b8 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -311,10 +311,16 @@ def atmos_ensstat(self): def wavepostsbs(self): + wave_grid = self._configs['base']['waveGRD'] + history_path = self._template_to_rocoto_cycstring(self._base['COM_WAVE_HISTORY_TMPL'], {'MEMDIR': 'mem#member#'}) + history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S' + deps = [] - dep_dict = {'type': 'task', 'name': f'gefs_fcst_mem#member#_#seg_dep#'} + dep_dict = {'type': 'data', 'data':[history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_mem#member#_#seg_dep#'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') fhrs = 
self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 1b52186f658..5a8f7de7270 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1192,10 +1192,16 @@ def _atmosoceaniceprod(self, component: str): def wavepostsbs(self): + wave_grid = self._configs['base']['waveGRD'] + history_path = self._template_to_rocoto_cycstring(self._base['COM_WAVE_HISTORY_TMPL']) + history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S' + deps = [] - dep_dict = {'type': 'metatask', 'name': f'{self.run}_fcst'} + dep_dict = {'type': 'data', 'data':[history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps) + dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_#seg_dep#'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] From f6c476de148009303bd6255ee505ed1fd0e9004d Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Tue, 7 Jan 2025 17:43:01 -0600 Subject: [PATCH 03/21] Fix documentation typos in tasks.py --- workflow/rocoto/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index 7a42c6594c4..efaf37c8200 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -250,7 +250,7 @@ def test_job_groups(): {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups - # Test with a break point not at a normal splilt point + # Test with a break point not at a normal split point test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 
0}, {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, @@ -318,7 +318,7 @@ def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict: ------- dict: Several variable lists for use in rocoto metatasks: fhr_list: list of comma-separated lists of fhr groups - fhr_label: list of labels corrsponding to the fhr range + fhr_label: list of labels corresponding to the fhr range fhr3_last: list of the last fhr in each group, formatted to three digits fhr3_next: list of the fhr that would follow each group, formatted to three digits @@ -351,13 +351,13 @@ def multiply_HMS(hms_timedelta: str, multiplier: int | float) -> str: Parameters ---------- hms_timedelta: str - String represnting a time delta in HH:MM:SS format + String representing a time delta in HH:MM:SS format multiplier: int | float Value to multiply the time delta by Returns ------- - str: String represnting a time delta in HH:MM:SS format + str: String representing a time delta in HH:MM:SS format ''' input_timedelta = to_timedelta(hms_timedelta) From 7287f0fd2cf08192db70f7f9a3e2f68e1ff0d938 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Tue, 7 Jan 2025 17:49:31 -0600 Subject: [PATCH 04/21] Use fhr in jobid for product jobs --- jobs/rocoto/atmos_products.sh | 5 +++-- jobs/rocoto/oceanice_products.sh | 5 +++-- jobs/rocoto/wavepostsbs.sh | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/jobs/rocoto/atmos_products.sh b/jobs/rocoto/atmos_products.sh index 5d545975194..947b06dfc23 100755 --- a/jobs/rocoto/atmos_products.sh +++ b/jobs/rocoto/atmos_products.sh @@ -13,13 +13,14 @@ status=$? 
if (( status != 0 )); then exit "${status}"; fi export job="atmos_products" -export jobid="${job}.$$" # shellcheck disable=SC2153 IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -export FORECAST_HOUR +export FORECAST_HOUR jobid for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${fhr3}.$$" ############################################################### # Execute the JJOB ############################################################### diff --git a/jobs/rocoto/oceanice_products.sh b/jobs/rocoto/oceanice_products.sh index d72afcb99f4..c3e03cea1aa 100755 --- a/jobs/rocoto/oceanice_products.sh +++ b/jobs/rocoto/oceanice_products.sh @@ -13,13 +13,14 @@ status=$? if (( status != 0 )); then exit "${status}"; fi export job="oceanice_products" -export jobid="${job}.$$" # shellcheck disable=SC2153 IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -export FORECAST_HOUR +export FORECAST_HOUR jobid for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_${COMPONENT}_f${fhr3}.$$" ############################################################### # Execute the JJOB ############################################################### diff --git a/jobs/rocoto/wavepostsbs.sh b/jobs/rocoto/wavepostsbs.sh index b2eec789ae8..ff81c2a9d30 100755 --- a/jobs/rocoto/wavepostsbs.sh +++ b/jobs/rocoto/wavepostsbs.sh @@ -10,15 +10,15 @@ status=$? [[ ${status} -ne 0 ]] && exit "${status}" export job="wavepostsbs" -export jobid="${job}.$$" ############################################################### # shellcheck disable=SC2153 IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -export FHR3 +export FHR3 jobid for FORECAST_HOUR in "${fhr_list[@]}"; do FHR3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${FHR3}.$$" # Execute the JJOB "${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS" status=$? 
From b8c832dbad6d533b16a03a893d81334578d06a6e Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Tue, 7 Jan 2025 18:07:04 -0600 Subject: [PATCH 05/21] Add missing whitespace --- workflow/rocoto/gefs_tasks.py | 2 +- workflow/rocoto/gfs_tasks.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 9e1a6b6b2b8..89b7498b4c0 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -316,7 +316,7 @@ def wavepostsbs(self): history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S' deps = [] - dep_dict = {'type': 'data', 'data':[history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} + dep_dict = {'type': 'data', 'data': [history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_mem#member#_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 5a8f7de7270..6e506017518 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1197,7 +1197,7 @@ def wavepostsbs(self): history_file = f'/{self.run}wave.out_grd.{wave_grid}.@Y@m@d.@H@M@S' deps = [] - dep_dict = {'type': 'data', 'data':[history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} + dep_dict = {'type': 'data', 'data': [history_path, history_file], 'offset': [None, '#fhr3_next#:00:00']} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': f'{self.run}_fcst_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) From f0444d50bec61195dd25ea7fa2608cb6ead01750 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Tue, 7 Jan 2025 18:10:24 -0600 Subject: [PATCH 06/21] Add comment about removing output hours for replay --- workflow/rocoto/gefs_tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflow/rocoto/gefs_tasks.py 
b/workflow/rocoto/gefs_tasks.py index 89b7498b4c0..2cb7166c7a9 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -323,6 +323,8 @@ def wavepostsbs(self): dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') + + # When using replay, output does not start until hour 3 is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] if is_replay: fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] From 833cb44c222c1568d60eb0e5b820fd7f8a849a09 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 12:30:04 -0600 Subject: [PATCH 07/21] Move new task tests to separate file Moves the new task tests to a separate file. Also changes the permissions on the existing test files so they can be executed. --- workflow/rocoto/tasks.py | 76 ----------------------------- workflow/test_configuration.py | 0 workflow/test_hosts.py | 0 workflow/test_tasks.py | 89 ++++++++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 76 deletions(-) mode change 100644 => 100755 workflow/test_configuration.py mode change 100644 => 100755 workflow/test_hosts.py create mode 100755 workflow/test_tasks.py diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index efaf37c8200..77971837ab1 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -232,73 +232,6 @@ def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) return groups - @staticmethod - def test_job_groups(): - test_array = list(range(0, 24)) - - # Test simple splitting with no breakpoints - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, - {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, - {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0}, - {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups - - # Test with a break point that aligns with normal split point - test_groups = [{'fhrs': [0, 
1, 2, 3, 4, 5], 'seg': 0}, - {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, - {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, - {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups - - # Test with a break point not at a normal split point - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, - {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, - {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, - {'fhrs': [20, 21, 22, 23], 'seg': 1}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups - - # Test highly skewed break point - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, - {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0}, - {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0}, - {'fhrs': [23], 'seg': 1}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups - - # Test with two break points that align - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, - {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, - {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, - {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11, 17]) == test_groups - - # Test with two skewed break points - test_groups = [{'fhrs': [0, 1], 'seg': 0}, - {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1}, - {'fhrs': [8, 9, 10, 11, 12], 'seg': 1}, - {'fhrs': [13, 14, 15, 16, 17], 'seg': 1}, - {'fhrs': [18, 19, 20, 21, 22], 'seg': 1}, - {'fhrs': [23], 'seg': 2}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups - - # Test slightly irregular break points - test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0}, - {'fhrs': [4, 5, 6], 'seg': 0}, - {'fhrs': [7, 8, 9, 10], 'seg': 1}, - {'fhrs': [11, 12, 13, 14], 'seg': 1}, - {'fhrs': [15, 16, 17, 18], 'seg': 1}, - {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, 
breakpoints=[6, 18]) == test_groups - - # Test more groups than fhrs available - test_array = list(range(0, 6)) - test_groups = [{'fhrs': [0], 'seg': 0}, - {'fhrs': [1], 'seg': 0}, - {'fhrs': [2], 'seg': 0}, - {'fhrs': [3], 'seg': 0}, - {'fhrs': [4], 'seg': 0}, - {'fhrs': [5], 'seg': 0}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups - def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict: ''' Prepare a metatask dictionary for forecast hour groups. @@ -364,15 +297,6 @@ def multiply_HMS(hms_timedelta: str, multiplier: int | float) -> str: output_timedelta = input_timedelta * multiplier return timedelta_to_HMS(output_timedelta) - @staticmethod - def test_multiply_HMS(): - assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00' - assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00' - assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00' - assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' - assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' - assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' - def get_resource(self, task_name): """ Given a task name (task_name) and its configuration (task_names), diff --git a/workflow/test_configuration.py b/workflow/test_configuration.py old mode 100644 new mode 100755 diff --git a/workflow/test_hosts.py b/workflow/test_hosts.py old mode 100644 new mode 100755 diff --git a/workflow/test_tasks.py b/workflow/test_tasks.py new file mode 100755 index 00000000000..8c77f95e458 --- /dev/null +++ b/workflow/test_tasks.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +from rocoto.tasks import Tasks +import unittest + +class TestTasks(unittest.TestCase): + + ''' + Tasks class tests + + Note: this is currently only testing a small fraction of the class. 
+ ''' + + def test_job_groups(self): + test_array = list(range(0, 24)) + + # Test simple splitting with no breakpoints + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups + + # Test with a break point that aligns with normal split point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups + + # Test with a break point not at a normal split point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, + {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, + {'fhrs': [20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups + + # Test highly skewed break point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0}, + {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0}, + {'fhrs': [23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups + + # Test with two break points that align + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11, 17]) == test_groups + + # Test with two skewed break points + test_groups = [{'fhrs': [0, 1], 'seg': 0}, + {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1}, + {'fhrs': [8, 9, 10, 11, 12], 'seg': 1}, + {'fhrs': [13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22], 
'seg': 1}, + {'fhrs': [23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups + + # Test slightly irregular break points + test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0}, + {'fhrs': [4, 5, 6], 'seg': 0}, + {'fhrs': [7, 8, 9, 10], 'seg': 1}, + {'fhrs': [11, 12, 13, 14], 'seg': 1}, + {'fhrs': [15, 16, 17, 18], 'seg': 1}, + {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[6, 18]) == test_groups + + # Test more groups than fhrs available + test_array = list(range(0, 6)) + test_groups = [{'fhrs': [0], 'seg': 0}, + {'fhrs': [1], 'seg': 0}, + {'fhrs': [2], 'seg': 0}, + {'fhrs': [3], 'seg': 0}, + {'fhrs': [4], 'seg': 0}, + {'fhrs': [5], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups + + def test_multiply_HMS(self): + assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00' + assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00' + assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00' + assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' + assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' + assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' + +if __name__ == '__main__': + unittest.main() From bc07a7f32d768ee9d466e4957e40aa3fe92a760f Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 12:55:47 -0600 Subject: [PATCH 08/21] Switch from unittest to pytest for Tasks test --- workflow/test_tasks.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/workflow/test_tasks.py b/workflow/test_tasks.py index 8c77f95e458..5b672551f37 100755 --- a/workflow/test_tasks.py +++ b/workflow/test_tasks.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 from rocoto.tasks import Tasks -import unittest -class TestTasks(unittest.TestCase): +class TestTasks: ''' Tasks class tests @@ -84,6 +83,3 @@ def test_multiply_HMS(self): assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' assert 
Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' - -if __name__ == '__main__': - unittest.main() From d6b5a5c16a788414f0f08b2852dadb1f46d0660c Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 13:13:58 -0600 Subject: [PATCH 09/21] Extend grouping to ensstat --- parm/config/gefs/config.atmos_ensstat | 3 +++ parm/config/gefs/config.resources | 4 ++-- workflow/rocoto/gefs_tasks.py | 27 ++++++++++++++++----------- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/parm/config/gefs/config.atmos_ensstat b/parm/config/gefs/config.atmos_ensstat index d371f758870..b5426595236 100644 --- a/parm/config/gefs/config.atmos_ensstat +++ b/parm/config/gefs/config.atmos_ensstat @@ -5,6 +5,9 @@ echo "BEGIN: config.atmos_ensstat" +# Maximum number of rocoto tasks +export MAX_TASKS=25 + # Get task specific resources . "${EXPDIR}/config.resources" atmos_ensstat diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 8deb9e92bd2..0a368372af3 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -243,7 +243,7 @@ case ${step} in ;; "atmos_ensstat") - export walltime="00:30:00" + export walltime="00:15:00" export ntasks=6 export threads_per_task=1 export tasks_per_node="${ntasks}" @@ -331,7 +331,7 @@ case ${step} in ;; "cleanup") - export walltime="00:15:00" + export walltime="00:30:00" export ntasks=1 export tasks_per_node=1 export threads_per_task=1 diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 2cb7166c7a9..07c9263eece 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -270,18 +270,32 @@ def atmos_ensstat(self): deps = [] for member in range(0, self.nmem + 1): - task = f'gefs_atmos_prod_mem{member:03d}_f#fhr#' + task = f'gefs_atmos_prod_mem{member:03d}_#fhr_label#' dep_dict = {'type': 'task', 'name': task} deps.append(rocoto.add_dependency(dep_dict)) dependencies = 
rocoto.create_dependency(dep_condition='and', dep=deps) + fhrs = self._get_forecast_hours('gefs', self._configs['atmos_ensstat']) + + # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs + is_replay = self._configs['atmos_ensstat']['REPLAY_ICS'] + if is_replay and 0 in fhrs: + fhrs.remove(0) + + max_tasks = self._configs['atmos_ensstat']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + postenvars = self.envars.copy() postenvar_dict = {'FHR3': '#fhr#'} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) - task_name = f'gefs_atmos_ensstat_f#fhr#' + task_name = f'gefs_atmos_ensstat_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -292,15 +306,6 @@ def atmos_ensstat(self): 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;'} - fhrs = self._get_forecast_hours('gefs', self._configs['atmos_ensstat']) - - # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs - is_replay = self._configs['atmos_ensstat']['REPLAY_ICS'] - if is_replay and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'gefs_atmos_ensstat', 'task_dict': task_dict, 'var_dict': fhr_var_dict} From 898607c1175359c7bfcc6da7403ecc27c2899d4c Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 13:59:32 -0600 Subject: [PATCH 10/21] Reset permissions on workflow tests --- workflow/test_configuration.py | 0 workflow/test_hosts.py | 0 workflow/test_tasks.py | 2 -- 3 files changed, 2 deletions(-) mode change 100755 => 100644 workflow/test_configuration.py mode 
change 100755 => 100644 workflow/test_hosts.py mode change 100755 => 100644 workflow/test_tasks.py diff --git a/workflow/test_configuration.py b/workflow/test_configuration.py old mode 100755 new mode 100644 diff --git a/workflow/test_hosts.py b/workflow/test_hosts.py old mode 100755 new mode 100644 diff --git a/workflow/test_tasks.py b/workflow/test_tasks.py old mode 100755 new mode 100644 index 5b672551f37..896fa390683 --- a/workflow/test_tasks.py +++ b/workflow/test_tasks.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - from rocoto.tasks import Tasks class TestTasks: From d6492ad945fd27641e90aa3e83b726bdcdf69a02 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 14:07:48 -0600 Subject: [PATCH 11/21] Move workflow tests to tests directory --- workflow/tests/__init__.py | 0 workflow/{ => tests}/test_configuration.py | 0 workflow/{ => tests}/test_hosts.py | 0 workflow/{ => tests}/test_tasks.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 workflow/tests/__init__.py rename workflow/{ => tests}/test_configuration.py (100%) rename workflow/{ => tests}/test_hosts.py (100%) rename workflow/{ => tests}/test_tasks.py (100%) diff --git a/workflow/tests/__init__.py b/workflow/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/workflow/test_configuration.py b/workflow/tests/test_configuration.py similarity index 100% rename from workflow/test_configuration.py rename to workflow/tests/test_configuration.py diff --git a/workflow/test_hosts.py b/workflow/tests/test_hosts.py similarity index 100% rename from workflow/test_hosts.py rename to workflow/tests/test_hosts.py diff --git a/workflow/test_tasks.py b/workflow/tests/test_tasks.py similarity index 100% rename from workflow/test_tasks.py rename to workflow/tests/test_tasks.py From 9372fd23978a78c208012faa72c71168d3a4b229 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 14:09:44 -0600 Subject: [PATCH 12/21] Update ensstat rocoto 
stub to loop over fhr --- jobs/rocoto/atmos_ensstat.sh | 19 +++++++++++++------ workflow/rocoto/gefs_tasks.py | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/jobs/rocoto/atmos_ensstat.sh b/jobs/rocoto/atmos_ensstat.sh index 76ed7f0a727..46aae58b905 100755 --- a/jobs/rocoto/atmos_ensstat.sh +++ b/jobs/rocoto/atmos_ensstat.sh @@ -13,13 +13,20 @@ status=$? if (( status != 0 )); then exit "${status}"; fi export job="atmos_ensstat" -export jobid="${job}.$$" -export FORECAST_HOUR=$(( 10#${FHR3} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT" +export FORECAST_HOUR jobid +for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${fhr3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_ATMOS_ENSSTAT" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done exit $? diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index 07c9263eece..f89d3dbbb0e 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -291,7 +291,7 @@ def atmos_ensstat(self): resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#'} + postenvar_dict = {'FHR_LIST': '#fhr_list#'} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) From c24412d45d52bf8fb97fe2f1aac456bab75b5e1a Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 14:25:26 -0600 Subject: [PATCH 13/21] Replace tabs with spaces in tasks test How did those tabs get in there? 
--- workflow/tests/test_tasks.py | 137 ++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/workflow/tests/test_tasks.py b/workflow/tests/test_tasks.py index 896fa390683..7e7e7eb5e7b 100644 --- a/workflow/tests/test_tasks.py +++ b/workflow/tests/test_tasks.py @@ -1,83 +1,84 @@ from rocoto.tasks import Tasks + class TestTasks: - ''' - Tasks class tests + ''' + Tasks class tests - Note: this is currently only testing a small fraction of the class. - ''' + Note: this is currently only testing a small fraction of the class. + ''' - def test_job_groups(self): - test_array = list(range(0, 24)) + def test_job_groups(self): + test_array = list(range(0, 24)) - # Test simple splitting with no breakpoints - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, - {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, - {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0}, - {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups + # Test simple splitting with no breakpoints + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups - # Test with a break point that aligns with normal split point - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, - {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, - {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, - {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups + # Test with a break point that aligns with normal split point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, 
ngroups=4, breakpoints=[11]) == test_groups - # Test with a break point not at a normal split point - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, - {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, - {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, - {'fhrs': [20, 21, 22, 23], 'seg': 1}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups + # Test with a break point not at a normal split point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, + {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, + {'fhrs': [20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups - # Test highly skewed break point - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, - {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0}, - {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0}, - {'fhrs': [23], 'seg': 1}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups + # Test highly skewed break point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0}, + {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0}, + {'fhrs': [23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups - # Test with two break points that align - test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, - {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, - {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, - {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11, 17]) == test_groups + # Test with two break points that align + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, 
ngroups=4, breakpoints=[11, 17]) == test_groups - # Test with two skewed break points - test_groups = [{'fhrs': [0, 1], 'seg': 0}, - {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1}, - {'fhrs': [8, 9, 10, 11, 12], 'seg': 1}, - {'fhrs': [13, 14, 15, 16, 17], 'seg': 1}, - {'fhrs': [18, 19, 20, 21, 22], 'seg': 1}, - {'fhrs': [23], 'seg': 2}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups + # Test with two skewed break points + test_groups = [{'fhrs': [0, 1], 'seg': 0}, + {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1}, + {'fhrs': [8, 9, 10, 11, 12], 'seg': 1}, + {'fhrs': [13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22], 'seg': 1}, + {'fhrs': [23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups - # Test slightly irregular break points - test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0}, - {'fhrs': [4, 5, 6], 'seg': 0}, - {'fhrs': [7, 8, 9, 10], 'seg': 1}, - {'fhrs': [11, 12, 13, 14], 'seg': 1}, - {'fhrs': [15, 16, 17, 18], 'seg': 1}, - {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[6, 18]) == test_groups + # Test slightly irregular break points + test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0}, + {'fhrs': [4, 5, 6], 'seg': 0}, + {'fhrs': [7, 8, 9, 10], 'seg': 1}, + {'fhrs': [11, 12, 13, 14], 'seg': 1}, + {'fhrs': [15, 16, 17, 18], 'seg': 1}, + {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[6, 18]) == test_groups - # Test more groups than fhrs available - test_array = list(range(0, 6)) - test_groups = [{'fhrs': [0], 'seg': 0}, - {'fhrs': [1], 'seg': 0}, - {'fhrs': [2], 'seg': 0}, - {'fhrs': [3], 'seg': 0}, - {'fhrs': [4], 'seg': 0}, - {'fhrs': [5], 'seg': 0}] - assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups + # Test more groups than fhrs available + test_array = list(range(0, 6)) + test_groups = [{'fhrs': [0], 'seg': 
0}, + {'fhrs': [1], 'seg': 0}, + {'fhrs': [2], 'seg': 0}, + {'fhrs': [3], 'seg': 0}, + {'fhrs': [4], 'seg': 0}, + {'fhrs': [5], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups - def test_multiply_HMS(self): - assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00' - assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00' - assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00' - assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' - assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' - assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' + def test_multiply_HMS(self): + assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00' + assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00' + assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00' + assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' + assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' + assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' From 9f05093c27a935b755ad5d3da499351bbe7f9014 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 14:27:13 -0600 Subject: [PATCH 14/21] Fix ensstat exit code --- jobs/rocoto/atmos_ensstat.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/rocoto/atmos_ensstat.sh b/jobs/rocoto/atmos_ensstat.sh index 46aae58b905..617cbd77f89 100755 --- a/jobs/rocoto/atmos_ensstat.sh +++ b/jobs/rocoto/atmos_ensstat.sh @@ -29,4 +29,4 @@ for FORECAST_HOUR in "${fhr_list[@]}"; do [[ ${status} -ne 0 ]] && exit "${status}" done -exit $? 
+exit 0 From a6ed2fdd5da7f6972b78a601891f6468787519eb Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 22:55:53 -0600 Subject: [PATCH 15/21] Switch type hint from pipe to Union for backwards compatibility --- workflow/rocoto/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index 77971837ab1..028b1760c6e 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -4,7 +4,7 @@ from applications.applications import AppConfig import rocoto.rocoto as rocoto from wxflow import Template, TemplateConstants, to_timedelta, timedelta_to_HMS -from typing import List +from typing import List, Union __all__ = ['Tasks'] @@ -277,7 +277,7 @@ def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict: return fhr_var_dict @staticmethod - def multiply_HMS(hms_timedelta: str, multiplier: int | float) -> str: + def multiply_HMS(hms_timedelta: str, multiplier: Union[int, float]) -> str: ''' Multiplies an HMS timedelta string From 77b7312df782a628b92bef98d0c338a988bdcdd2 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 23:18:55 -0600 Subject: [PATCH 16/21] Use bisect instead of index to split fhrs Switches from using index to using bisect_right. This makes the split more forgiving, since the fhr of the breakpoint need not be in the fhr array. Unit tests still pass with this change. 
--- workflow/rocoto/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index 028b1760c6e..a1566a1e62f 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -5,6 +5,7 @@ import rocoto.rocoto as rocoto from wxflow import Template, TemplateConstants, to_timedelta, timedelta_to_HMS from typing import List, Union +from bisect import bisect_right __all__ = ['Tasks'] @@ -211,7 +212,7 @@ def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) ngroups = len(fhrs) # First, split at segment boundaries - fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [fhrs.index(bpnt) + 1 for bpnt in breakpoints])] + fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [bisect_right(fhrs, bpnt) for bpnt in breakpoints])] seg_lens = [len(seg) for seg in fhrs_segs] # Initialize each segment to be split into one job group From 0dccf85d066919c1562a0e295aad93fcd8feb20f Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Wed, 8 Jan 2025 23:53:01 -0600 Subject: [PATCH 17/21] Remove empty arrays after breakpoint splitting --- workflow/rocoto/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index a1566a1e62f..3c215414b58 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -212,7 +212,7 @@ def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) ngroups = len(fhrs) # First, split at segment boundaries - fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [bisect_right(fhrs, bpnt) for bpnt in breakpoints])] + fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [bisect_right(fhrs, bpnt) for bpnt in breakpoints if bpnt < max(fhrs)])] seg_lens = [len(seg) for seg in fhrs_segs] # Initialize each segment to be split into one job group From db1226d19a7696564386cad83a19d539f5fd4a35 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: 
Fri, 10 Jan 2025 01:02:52 -0600 Subject: [PATCH 18/21] Break atmos gempak into groups --- jobs/rocoto/gempak.sh | 19 ++++++++++++++----- parm/config/gfs/config.gempak | 5 ++++- parm/config/gfs/config.resources | 1 + workflow/rocoto/gfs_tasks.py | 18 ++++++++++++------ 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/jobs/rocoto/gempak.sh b/jobs/rocoto/gempak.sh index f5aea2379dd..d712e9a41d3 100755 --- a/jobs/rocoto/gempak.sh +++ b/jobs/rocoto/gempak.sh @@ -6,11 +6,20 @@ status=$? if (( status != 0 )); then exit "${status}"; fi export job="gempak" -export jobid="${job}.$$" +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -# Execute the JJOB -"${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK" +export FORECAST_HOUR jobid +for FORECAST_HOUR in "${fhr_list[@]}"; do + fhr3=$(printf '%03d' "${FORECAST_HOUR}") + jobid="${job}_f${fhr3}.$$" + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/J${RUN^^}_ATMOS_GEMPAK" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -status=$? -exit "${status}" +exit 0 diff --git a/parm/config/gfs/config.gempak b/parm/config/gfs/config.gempak index 791770ba4af..db5e85af3f7 100644 --- a/parm/config/gfs/config.gempak +++ b/parm/config/gfs/config.gempak @@ -5,7 +5,10 @@ echo "BEGIN: config.gempak" +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + # Get task specific resources -. 
$EXPDIR/config.resources gempak +source "${EXPDIR}/config.resources" gempak echo "END: config.gempak" diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index c8eb7592be4..97e6da005cd 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -1276,6 +1276,7 @@ case ${step} in ;; "gempak") + # Walltime is per forecast hour; will be multiplied by group size walltime="00:30:00" ntasks_gdas=2 ntasks_gfs=28 diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index 6e506017518..e73343c65c1 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1526,17 +1526,26 @@ def awips_20km_1p0deg(self): def gempak(self): deps = [] - dep_dict = {'type': 'task', 'name': f'{self.run}_atmos_prod_f#fhr#'} + dep_dict = {'type': 'task', 'name': f'{self.run}_atmos_prod_#fhr_label#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + fhrs = self._get_forecast_hours(self.run, self._configs['gempak']) + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + resources = self.get_resource('wavepostsbs') + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + gempak_vars = self.envars.copy() - gempak_dict = {'FHR3': '#fhr#'} + gempak_dict = {'FHR_LIST': '#fhr_list#'} for key, value in gempak_dict.items(): gempak_vars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('gempak') - task_name = f'{self.run}_gempak_f#fhr#' + task_name = f'{self.run}_gempak_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1548,9 +1557,6 @@ def gempak(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours(self.run, 
self._configs['gempak']) - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'{self.run}_gempak', 'task_dict': task_dict, 'var_dict': fhr_var_dict} From 7f978e6af4fa41d6cf4dc530323a09b4a34d3905 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Fri, 10 Jan 2025 01:09:47 -0600 Subject: [PATCH 19/21] Add comment about walltime for ensstat --- parm/config/gefs/config.resources | 1 + 1 file changed, 1 insertion(+) diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 0a368372af3..bb33f3eb029 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -243,6 +243,7 @@ case ${step} in ;; "atmos_ensstat") + # Walltime is per forecast hour; will be multiplied by group size export walltime="00:15:00" export ntasks=6 export threads_per_task=1 From 98d6050718ca5beaa876f6ce9bcce3dae3f7cf58 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Fri, 10 Jan 2025 01:28:02 -0600 Subject: [PATCH 20/21] Fix copy/paste error --- workflow/rocoto/gfs_tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index e73343c65c1..9820cfacb9a 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1531,10 +1531,10 @@ def gempak(self): dependencies = rocoto.create_dependency(dep=deps) fhrs = self._get_forecast_hours(self.run, self._configs['gempak']) - max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + max_tasks = self._configs['gempak']['MAX_TASKS'] fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) - resources = self.get_resource('wavepostsbs') + resources = self.get_resource('gempak') # Adjust walltime based on the largest group largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) From 
bae8d2de3b2fc3f5c9493dad6d4d46e3ab523081 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Tue, 14 Jan 2025 09:47:54 -0600 Subject: [PATCH 21/21] Fix gempak stub --- jobs/rocoto/gempak.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/jobs/rocoto/gempak.sh b/jobs/rocoto/gempak.sh index d712e9a41d3..dc1d3f2621e 100755 --- a/jobs/rocoto/gempak.sh +++ b/jobs/rocoto/gempak.sh @@ -10,10 +10,10 @@ export job="gempak" # shellcheck disable=SC2153 IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -export FORECAST_HOUR jobid -for FORECAST_HOUR in "${fhr_list[@]}"; do - fhr3=$(printf '%03d' "${FORECAST_HOUR}") - jobid="${job}_f${fhr3}.$$" +export FHR3 jobid +for fhr in "${fhr_list[@]}"; do + FHR3=$(printf '%03d' "${fhr}") + jobid="${job}_f${FHR3}.$$" ############################################################### # Execute the JJOB ###############################################################