Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
94143aa
Reinstate product groups
WalterKolczynski-NOAA Jan 6, 2025
c4782e2
Merge branch 'develop' into feature/prod_groups
WalterKolczynski-NOAA Jan 7, 2025
e741267
Add wave gridded post data dependency
WalterKolczynski-NOAA Jan 7, 2025
f6c476d
Fix documentation typos in tasks.py
WalterKolczynski-NOAA Jan 7, 2025
7287f0f
Use fhr in jobid for product jobs
WalterKolczynski-NOAA Jan 7, 2025
b8c832d
Add missing whitespace
WalterKolczynski-NOAA Jan 8, 2025
f0444d5
Add comment about removing output hours for replay
WalterKolczynski-NOAA Jan 8, 2025
d66e3c0
Merge branch 'develop' into feature/prod_groups
aerorahul Jan 8, 2025
23b9850
Merge branch 'develop' into feature/prod_groups
WalterKolczynski-NOAA Jan 8, 2025
833cb44
Move new task tests to separate file
WalterKolczynski-NOAA Jan 8, 2025
bc07a7f
Switch from unittest to pytest for Tasks test
WalterKolczynski-NOAA Jan 8, 2025
d6b5a5c
Extend grouping to ensstat
WalterKolczynski-NOAA Jan 8, 2025
898607c
Reset permissions on workflow tests
WalterKolczynski-NOAA Jan 8, 2025
d6492ad
Move workflow tests to tests directory
WalterKolczynski-NOAA Jan 8, 2025
9372fd2
Update ensstat rocoto stub to loop over fhr
WalterKolczynski-NOAA Jan 8, 2025
c24412d
Replace tabs with spaces in tasks test
WalterKolczynski-NOAA Jan 8, 2025
9f05093
Fix ensstat exit code
WalterKolczynski-NOAA Jan 8, 2025
a6ed2fd
Switch type hint from pipe to Union for backwards compatability
WalterKolczynski-NOAA Jan 9, 2025
77b7312
Use bisect instead of index to split fhrs
WalterKolczynski-NOAA Jan 9, 2025
0dccf85
Remove empty arrays after breakpoint splitting
WalterKolczynski-NOAA Jan 9, 2025
db1226d
Break atmos gempak into groups
WalterKolczynski-NOAA Jan 10, 2025
7f978e6
Add comment about walltime for ensstat
WalterKolczynski-NOAA Jan 10, 2025
98d6050
Fix copy/paste error
WalterKolczynski-NOAA Jan 10, 2025
48c1acb
Merge branch 'develop' into feature/prod_groups
aerorahul Jan 10, 2025
bae8d2d
Fix gempak stub
WalterKolczynski-NOAA Jan 14, 2025
cae19d4
Merge branch 'develop' into feature/prod_groups
WalterKolczynski-NOAA Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions jobs/rocoto/atmos_products.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi
export job="atmos_products"
export jobid="${job}.$$"
Comment thread
WalterKolczynski-NOAA marked this conversation as resolved.
Outdated

# Negatation needs to be before the base
fhr3_base="10#${FHR3}"
export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} ))
# shellcheck disable=SC2153
IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"

###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
export FORECAST_HOUR
for FORECAST_HOUR in "${fhr_list[@]}"; do
###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS"
status=$?
Comment thread
WalterKolczynski-NOAA marked this conversation as resolved.
[[ ${status} -ne 0 ]] && exit "${status}"
done

exit $?
exit 0
18 changes: 12 additions & 6 deletions jobs/rocoto/oceanice_products.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi
export job="oceanice_products"
export jobid="${job}.$$"

export FORECAST_HOUR=$(( 10#${FHR3} ))
# shellcheck disable=SC2153
IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"

###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
export FORECAST_HOUR
for FORECAST_HOUR in "${fhr_list[@]}"; do
###############################################################
# Execute the JJOB
###############################################################
"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS"
Comment thread
WalterKolczynski-NOAA marked this conversation as resolved.
status=$?
[[ ${status} -ne 0 ]] && exit "${status}"
done

exit $?
exit 0
19 changes: 13 additions & 6 deletions jobs/rocoto/wavepostsbs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ source "${HOMEgfs}/ush/preamble.sh"
###############################################################
# Source FV3GFS workflow modules
#. ${HOMEgfs}/ush/load_fv3gfs_modules.sh
. ${HOMEgfs}/ush/load_ufswm_modules.sh
source "${HOMEgfs}/ush/load_ufswm_modules.sh"
status=$?
[[ ${status} -ne 0 ]] && exit ${status}
[[ ${status} -ne 0 ]] && exit "${status}"

export job="wavepostsbs"
export jobid="${job}.$$"

###############################################################
# Execute the JJOB
${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS
status=$?
[[ ${status} -ne 0 ]] && exit ${status}
# shellcheck disable=SC2153
IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}"

export FHR3
for FORECAST_HOUR in "${fhr_list[@]}"; do
FHR3=$(printf '%03d' "${FORECAST_HOUR}")
# Execute the JJOB
"${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS"
Comment thread
WalterKolczynski-NOAA marked this conversation as resolved.
status=$?
[[ ${status} -ne 0 ]] && exit "${status}"
done

exit 0
4 changes: 2 additions & 2 deletions parm/config/gefs/config.atmos_products
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products"
# Get task specific resources
. "${EXPDIR}/config.resources" atmos_products

# No. of forecast hours to process in a single job
export NFHRS_PER_GROUP=3
# Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Scripts used by this job
export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh"
Expand Down
4 changes: 2 additions & 2 deletions parm/config/gefs/config.oceanice_products
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ source "${EXPDIR}/config.resources" oceanice_products

export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products_gefs.yaml"

# No. of forecast hours to process in a single job
export NFHRS_PER_GROUP=3
# Maximum number of rocoto tasks per member
export MAX_TASKS=25

echo "END: config.oceanice_products"
5 changes: 4 additions & 1 deletion parm/config/gefs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ case ${step} in
;;

"atmos_products")
# Walltime is per forecast hour; will be multipled by group size
export walltime="00:15:00"
export ntasks=24
export threads_per_task=1
Expand All @@ -250,6 +251,7 @@ case ${step} in
;;

"oceanice_products")
# Walltime is per forecast hour; will be multipled by group size
export walltime="00:15:00"
export ntasks=1
export tasks_per_node=1
Expand All @@ -258,7 +260,8 @@ case ${step} in
;;

"wavepostsbs")
export walltime="03:00:00"
# Walltime is per forecast hour; will be multipled by group size
export walltime="00:15:00"
export ntasks=1
export threads_per_task=1
export tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
Expand Down
3 changes: 3 additions & 0 deletions parm/config/gefs/config.wavepostsbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs"
# Get task specific resources
source "${EXPDIR}/config.resources" wavepostsbs

# Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Subgrid info for grib2 encoding
export WAV_SUBGRBSRC=""
export WAV_SUBGRB=""
Expand Down
4 changes: 2 additions & 2 deletions parm/config/gfs/config.atmos_products
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products"
# Get task specific resources
. "${EXPDIR}/config.resources" atmos_products

# No. of forecast hours to process in a single job
export NFHRS_PER_GROUP=3
## Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Scripts used by this job
export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh"
Expand Down
3 changes: 3 additions & 0 deletions parm/config/gfs/config.oceanice_products
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ echo "BEGIN: config.oceanice_products"
# Get task specific resources
source "${EXPDIR}/config.resources" oceanice_products

# Maximum number of rocoto tasks per member
export MAX_TASKS=25

export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products.yaml"

# No. of forecast hours to process in a single job
Expand Down
7 changes: 5 additions & 2 deletions parm/config/gfs/config.resources
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ case ${step} in
;;

"wavepostsbs")
walltime_gdas="00:20:00"
walltime_gfs="03:00:00"
# Walltime is per forecast hour; will be multipled by group size
walltime_gdas="00:15:00"
walltime_gfs="00:15:00"
ntasks=8
threads_per_task=1
tasks_per_node=$(( max_tasks_per_node / threads_per_task ))
Expand Down Expand Up @@ -911,6 +912,7 @@ case ${step} in
;;

"oceanice_products")
# Walltime is per forecast hour; will be multipled by group size
walltime="00:15:00"
ntasks=1
tasks_per_node=1
Expand Down Expand Up @@ -944,6 +946,7 @@ case ${step} in
;;

"atmos_products")
# Walltime is per forecast hour; will be multipled by group size
walltime="00:15:00"
ntasks=24
threads_per_task=1
Expand Down
3 changes: 3 additions & 0 deletions parm/config/gfs/config.wavepostsbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs"
# Get task specific resources
source "${EXPDIR}/config.resources" wavepostsbs

# Maximum number of rocoto tasks per member
export MAX_TASKS=25

# Subgrid info for grib2 encoding
export WAV_SUBGRBSRC=""
export WAV_SUBGRB=""
Expand Down
72 changes: 40 additions & 32 deletions workflow/rocoto/gefs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,39 +190,57 @@ def _atmosoceaniceprod(self, component: str):
fhout_ice_gfs = self._configs['base']['FHOUT_ICE_GFS']
products_dict = {'atmos': {'config': 'atmos_products',
'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL',
'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'},
'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'},
'ocean': {'config': 'oceanice_products',
'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL',
'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'},
'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr3_next#.nc'},
'ice': {'config': 'oceanice_products',
'history_path_tmpl': 'COM_ICE_HISTORY_TMPL',
'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}}
'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr3_last#.nc'}}

component_dict = products_dict[component]
config = component_dict['config']
history_path_tmpl = component_dict['history_path_tmpl']
history_file_tmpl = component_dict['history_file_tmpl']

max_tasks = self._configs[config]['MAX_TASKS']
resources = self.get_resource(config)

fhrs = self._get_forecast_hours('gefs', self._configs[config], component)

# when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
is_replay = self._configs[config]['REPLAY_ICS']
if is_replay and component in ['atmos'] and 0 in fhrs:
fhrs.remove(0)

# ocean/ice components do not have fhr 0 as they are averaged output
if component in ['ocean', 'ice'] and 0 in fhrs:
fhrs.remove(0)

fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)

# Adjust walltime based on the largest group
largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)

history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl], {'MEMDIR': 'mem#member#'})
deps = []
data = f'{history_path}/{history_file_tmpl}'
dep_dict = {'type': 'data', 'data': data, 'age': 120}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': 'gefs_fcst_mem#member#'}
dep_dict = {'type': 'task', 'name': 'gefs_fcst_mem#member#_#seg_dep#'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps, dep_condition='or')

postenvars = self.envars.copy()
postenvar_dict = {'ENSMEM': '#member#',
'MEMDIR': 'mem#member#',
'FHR3': '#fhr#',
'FHR_LIST': '#fhr_list#',
'COMPONENT': component}
for key, value in postenvar_dict.items():
postenvars.append(rocoto.create_envar(name=key, value=str(value)))

task_name = f'gefs_{component}_prod_mem#member#_f#fhr#'
task_name = f'gefs_{component}_prod_mem#member#_#fhr_label#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
Expand All @@ -233,22 +251,6 @@ def _atmosoceaniceprod(self, component: str):
'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
'maxtries': '&MAXTRIES;'}

fhrs = self._get_forecast_hours('gefs', self._configs[config], component)

# when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs
is_replay = self._configs[config]['REPLAY_ICS']
if is_replay and component in ['atmos'] and 0 in fhrs:
fhrs.remove(0)

# ocean/ice components do not have fhr 0 as they are averaged output
if component in ['ocean', 'ice'] and 0 in fhrs:
fhrs.remove(0)

fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}
if component in ['ocean']:
fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])]
fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next])

fhr_metatask_dict = {'task_name': f'gefs_{component}_prod_#member#',
'task_dict': task_dict,
'var_dict': fhr_var_dict}
Expand Down Expand Up @@ -308,22 +310,35 @@ def atmos_ensstat(self):
return task

def wavepostsbs(self):

Comment thread
WalterKolczynski-NOAA marked this conversation as resolved.
deps = []
dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'}
dep_dict = {'type': 'task', 'name': f'gefs_fcst_mem#member#_#seg_dep#'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave')
is_replay = self._configs['wavepostsbs']['REPLAY_ICS']
if is_replay:
fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]]
Comment thread
WalterKolczynski-NOAA marked this conversation as resolved.

max_tasks = self._configs['wavepostsbs']['MAX_TASKS']
fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks)

wave_post_envars = self.envars.copy()
postenvar_dict = {'ENSMEM': '#member#',
'MEMDIR': 'mem#member#',
'FHR3': '#fhr#',
'FHR_LIST': '#fhr_list#',
}
for key, value in postenvar_dict.items():
wave_post_envars.append(rocoto.create_envar(name=key, value=str(value)))

resources = self.get_resource('wavepostsbs')

task_name = f'gefs_wave_post_grid_mem#member#_f#fhr#'
# Adjust walltime based on the largest group
largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')])
resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group)

task_name = f'gefs_wave_post_grid_mem#member#_#fhr_label#'
task_dict = {'task_name': task_name,
'resources': resources,
'dependency': dependencies,
Expand All @@ -335,13 +350,6 @@ def wavepostsbs(self):
'maxtries': '&MAXTRIES;'
}

fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave')
is_replay = self._configs['wavepostsbs']['REPLAY_ICS']
if is_replay:
fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]]

fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])}

fhr_metatask_dict = {'task_name': f'gefs_wave_post_grid_#member#',
'task_dict': task_dict,
'var_dict': fhr_var_dict}
Expand Down
Loading