Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c0f5604
Rename ensemble globus JJob
DavidHuber-NOAA Mar 11, 2025
db1a43f
First stab at ensemble globus archiving
DavidHuber-NOAA Mar 18, 2025
370195a
Merge in develop; resolve conflict in gfs tasks
DavidHuber-NOAA Mar 18, 2025
6013bd0
Remove globus_earc config (not needed)
DavidHuber-NOAA Mar 18, 2025
d1cd8eb
Check for empty transfer sets
DavidHuber-NOAA Mar 19, 2025
1eadd6a
Create ensgrp-based dataset yaml files
DavidHuber-NOAA Mar 19, 2025
db7e051
Move ensgrp definitions to new config file
DavidHuber-NOAA Mar 19, 2025
e946629
Merge remote-tracking branch 'origin/develop' into feature/ens_globus
DavidHuber-NOAA Mar 19, 2025
ef2a3f3
Add globus job description
DavidHuber-NOAA Mar 19, 2025
91da1bf
Return to 40 members per group at C48-96
DavidHuber-NOAA Mar 19, 2025
6222c7b
Update earc JJob name
DavidHuber-NOAA Mar 19, 2025
3974ad2
Update CODEOWNERS
DavidHuber-NOAA Mar 19, 2025
b5d1243
Remove debug print statements
DavidHuber-NOAA Mar 19, 2025
8cb19de
Add globus dependency for enkf cleanup task
DavidHuber-NOAA Mar 19, 2025
e8be679
Merge branch 'develop' into feature/ens_globus
DavidHuber-NOAA Mar 20, 2025
fdb41e4
Add group definitions
DavidHuber-NOAA Mar 20, 2025
2517669
Merge branch 'develop' into feature/ens_globus
DavidHuber-NOAA Mar 20, 2025
38d1d8e
Apply suggestions from code review
DavidHuber-NOAA Mar 20, 2025
5d89c8f
Merge branch 'develop' into feature/ens_globus
DavidHuber-NOAA Mar 21, 2025
e8ea838
Merge branch 'develop' into feature/ens_globus
DavidHuber-NOAA Mar 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs/JGFS_ATMOS_PGRB2_SPEC_NPOESS @WenMeng-NOAA
jobs/JGFS_ATMOS_POSTSND @BoCui-NOAA
jobs/JGFS_ATMOS_VERIFICATION
jobs/JGLOBAL_AERO_ANALYSIS_* @CoryMartin-NOAA
jobs/JGLOBAL_ARCHIVE @DavidHuber-NOAA
jobs/JGLOBAL_*ARCH* @DavidHuber-NOAA
jobs/JGLOBAL_ATMENS_ANALYSIS_* @RussTreadon-NOAA @CoryMartin-NOAA @DavidNew-NOAA
jobs/JGLOBAL_ATMOS_ANALYSIS @RussTreadon-NOAA @CatherineThomas-NOAA
jobs/JGLOBAL_ATMOS_ANALYSIS_CALC @RussTreadon-NOAA @CatherineThomas-NOAA @CoryMartin-NOAA
Expand Down Expand Up @@ -75,8 +75,7 @@ scripts/exgdas_atmos_gempak_gif_ncdc.sh @GwenChen-NOAA
scripts/exgdas_atmos_nawips.sh @GwenChen-NOAA
scripts/exgdas_atmos_verfozn.sh @EdwardSafford-NOAA
scripts/exgdas_atmos_verfrad.sh @EdwardSafford-NOAA
scripts/exgdas_enkf_earc_vrfy.py @DavidHuber-NOAA
scripts/exgdas_enkf_earc_tars.py @DavidHuber-NOAA
scripts/exgdas_enkf_earc_*.py @DavidHuber-NOAA
scripts/exgdas_enkf_ecen.sh @CoryMartin-NOAA @RussTreadon-NOAA @CatherineThomas-NOAA
scripts/exgdas_enkf_post.sh @CoryMartin-NOAA @RussTreadon-NOAA @CatherineThomas-NOAA
scripts/exgdas_enkf_select_obs.sh @CoryMartin-NOAA @RussTreadon-NOAA @CatherineThomas-NOAA
Expand All @@ -97,7 +96,8 @@ scripts/exgfs_pmgr.sh
scripts/exgfs_prdgen_manager.sh
scripts/exgfs_wave_* @JessicaMeixner-NOAA @sbanihash
scripts/exglobal_aero_analysis_* @CoryMartin-NOAA
scripts/exglobal_archive.py @DavidHuber-NOAA
scripts/exglobal_archive_*.py @DavidHuber-NOAA
scripts/exglobal_globus_*.py @DavidHuber-NOAA
scripts/exglobal_atm_analysis_* @RussTreadon-NOAA @DavidNew-NOAA
scripts/exglobal_atmens_analysis_* @RussTreadon-NOAA @DavidNew-NOAA
scripts/exglobal_atmos_analysis*.sh @RussTreadon-NOAA @CoryMartin-NOAA
Expand Down
4 changes: 2 additions & 2 deletions docs/source/hpc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ The Global Workflow provides capabilities for deterministic and ensemble forecas
-
- X
- X
-
- X
* - Orion
- 2
- X
Expand All @@ -131,7 +131,7 @@ The Global Workflow provides capabilities for deterministic and ensemble forecas
-
- X
- X
-
- X
* - Gaea C5
- 3
- X
Expand Down
5 changes: 3 additions & 2 deletions docs/source/jobs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ Jobs in the GFS Configuration
+-------------------+-----------------------------------------------------------------------------------------------------------------------+
| globus_arch | Optional archive job that sends the tarballs generated by arch_tars to HPSS via globus. |
+-------------------+-----------------------------------------------------------------------------------------------------------------------+
| earcN/eamn | Archival script for EnKF: 1) Write select EnKF output to HPSS; 2) Copy select files to online archive; 3) Clean up |
| | EnKF temporary run directories; 4) Remove "old" EnKF files from rotating directory. |
| earcN/eamn | Archival script for EnKF that writes selected EnKF output to HPSS or locally |
+-------------------+-----------------------------------------------------------------------------------------------------------------------+
| globus_earcN | Additional archival script that pushes data to HPSS via Niagara. |
+-------------------+-----------------------------------------------------------------------------------------------------------------------+
| ecenN/ecmn | Recenter ensemble members around hi-res deterministic analysis. GFS v16 recenters ensemble member analysis. |
| | increments. |
Expand Down
2 changes: 1 addition & 1 deletion jobs/JGLOBAL_ENS_ARCHIVE_TARS
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "earc_tars" -c "base arch_tars earc_tars"
source "${HOMEgfs}/ush/jjob_header.sh" -e "earc_tars" -c "base arch_tars earc_tars earc_groups"


##############################################
Expand Down
File renamed without changes.
15 changes: 10 additions & 5 deletions jobs/JGDAS_ENKF_GLOBUS_ARCH → jobs/JGLOBAL_ENS_GLOBUS_ARCH
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "globus_earc" -c "base globus globus_earc"
source "${HOMEgfs}/ush/jjob_header.sh" -e "globus_earc" -c "base globus earc_groups"


##############################################
# Set variables used in the script
##############################################
MEMDIR="ensstat" YMD=${PDY} HH=${cyc} declare_from_tmpl -rx \
COMIN_CONF:COM_CONF_TMPL


###############################################################
# Run globus transfer script
# Run globus script
###############################################################

"${SCRgfs}/exgdas_enkf_globus_earc.py"
${GLOBALGLOBUSARCHSH:-${SCRgfs}/exglobal_globus_earc.py}
status=$?
[[ ${status} -ne 0 ]] && exit "${status}"

###############################################################

##############################################
# End JOB SPECIFIC work
##############################################
Expand Down
2 changes: 1 addition & 1 deletion jobs/rocoto/earc_vrfy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export jobid="${job}.$$"

###############################################################
# Execute the JJOB
"${HOMEgfs}/jobs/JGDAS_ENKF_ARCHIVE_VRFY"
"${HOMEgfs}/jobs/JGLOBAL_ENS_ARCHIVE_VRFY"
status=$?

exit "${status}"
24 changes: 24 additions & 0 deletions jobs/rocoto/globus_earc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#! /usr/bin/env bash
# Wrapper script for the globus_earc job: sets up the environment, runs the
# JGLOBAL_ENS_GLOBUS_ARCH j-job, and exits with that j-job's return status.

source "${HOMEgfs}/ush/preamble.sh"

###############################################################
# Source FV3GFS workflow modules
. "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
status=$?
# Stop immediately if the module setup reported a failure
if [[ ${status} -ne 0 ]]; then exit "${status}"; fi

###############################################################
# setup python path for workflow utilities and tasks
# (appends to an existing PYTHONPATH; the ':' separator is only emitted when
# PYTHONPATH was already set and non-empty)
PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${HOMEgfs}/ush/python"
export PYTHONPATH

# jobid is the job name suffixed with this shell's process ID
export job="globus_earc"
export jobid="${job}.$$"

###############################################################
# Execute the JJOB
"${HOMEgfs}/jobs/JGLOBAL_ENS_GLOBUS_ARCH"
status=$?

# Propagate the j-job's exit status to the caller
exit "${status}"
26 changes: 26 additions & 0 deletions parm/config/gfs/config.earc_groups
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#! /usr/bin/env bash

########## config.earc_groups ########
# Specifies the grouping strategy for ensemble archiving
# This is used by the earc_tars and globus_earc jobs


# Set the number of ensemble members to archive per earc_tars/globus_earc job
# The group size (NMEM_EARCGRP) is selected from the ensemble resolution
# (CASE_ENS); higher resolutions use fewer members per group.
case "${CASE_ENS}" in
"C48" | "C96")
export NMEM_EARCGRP=80
;;
"C192")
export NMEM_EARCGRP=20
;;
"C384" | "C768")
export NMEM_EARCGRP=10
;;
"C1152")
export NMEM_EARCGRP=4
;;
*)
# Any resolution not listed above is treated as a fatal configuration error
echo "FATAL ERROR: Unknown ensemble resolution ${CASE_ENS}, ABORT!"
exit 1
;;
esac
22 changes: 1 addition & 21 deletions parm/config/gfs/config.earc_tars
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,14 @@ echo "BEGIN: config.earc_tars"
# Get task specific resources
. "${EXPDIR}/config.resources" "arch_tars"

# Set the number of ensemble members to archive per earc_tars job
case "${CASE_ENS}" in
"C48" | "C96")
export NMEM_EARCGRP=80
;;
"C192")
export NMEM_EARCGRP=20
;;
"C384" | "C768")
export NMEM_EARCGRP=10
;;
"C1152")
export NMEM_EARCGRP=4
;;
*)
echo "FATAL ERROR: Unknown ensemble resolution ${CASE_ENS}, ABORT!"
exit 1
;;
esac

#--starting and ending hours of previous cycles to be removed from rotating directory
export RMOLDSTD_ENKF=144
export RMOLDEND_ENKF=24

# If we are running globus archiving, create tarballs in a temporary location
if [[ "${ARCHCOM_TO}" == "globus_hpss" ]]; then
export ENSGRP="${ENSGRP:-0}"
export ATARDIR="${DATAROOT}/archive_rotdir/${pslot}/${RUN}/${ENSGRP}"
export ATARDIR="${DATAROOT}/archive_rotdir/${PSLOT}/${RUN}/${ENSGRP}"
export LOCALARCH="YES"
fi

Expand Down
7 changes: 5 additions & 2 deletions scripts/exgdas_enkf_earc_tars.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def main():

# Also import all COMIN* directory and template variables
for key in archive.task_config.keys():
if key.startswith("COM"):
archive_dict[key] = archive.task_config[key]
if key.startswith(("COM_", "COMIN_")):
archive_dict[key] = archive.task_config.get(key)

with chdir(config.ROTDIR):

Expand All @@ -50,6 +50,9 @@ def main():
for atardir_set in atardir_sets:
archive.execute_backup_dataset(atardir_set)

# Clean up any temporary files
archive.clean()


if __name__ == '__main__':
main()
7 changes: 3 additions & 4 deletions scripts/exglobal_archive_tars.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ def main():

archive_dict = AttrDict()
for key in keys:
try:
archive_dict[key] = archive.task_config[key]
except KeyError:
logger.warning(f"WARNING: key ({key}) not found in archive.task_config!")
archive_dict[key] = archive.task_config.get(key)
if archive_dict[key] is None:
logger.warning(f"WARNING: key ({key}) not found in task_config!")

# Also import all COMIN* and COMOUT* directory and template variables
for key in archive.task_config.keys():
Expand Down
51 changes: 51 additions & 0 deletions scripts/exglobal_globus_earc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python3

import os

from pygfs.task.globus_hpss import GlobusHpss
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)


@logit(logger)
def main():
    """Send this job's tarballs to HPSS through the GlobusHpss task.

    The task configuration is built from the process environment. A subset of
    its keys is copied into a dictionary that is handed to
    ``GlobusHpss.configure``; keys that are absent are logged and skipped.
    The "standard" transfer set is sent first, then "rstprod"; a set with no
    locations is skipped.

    Raises
    ------
    RuntimeError
        If neither transfer set contained any locations.
    """

    # Instantiate the globus object from the type-cast environment
    globus = GlobusHpss(cast_strdict_as_dtypedict(os.environ))

    wanted_keys = [
        'STAGE_DIR', 'current_cycle', 'RUN', 'PDY', 'HOMEgfs', 'sven_dropbox',
        'doorman_gendel', 'DATASETS_YAML', 'PARMgfs', 'COMIN_CONF', 'KEEPDATA',
        'jobid', 'hpss_target_dir', 'server_home', 'SERVER_NAME', 'DOORMAN_ROOT',
        'CLIENT_GLOBUS_UUID', 'SERVER_GLOBUS_UUID', 'PSLOT', 'ENSGRP',
    ]

    # Copy over whichever of the wanted keys the task configuration provides
    globus_dict = AttrDict()
    for name in wanted_keys:
        try:
            globus_dict[name] = globus.task_config[name]
        except KeyError:
            logger.warning(f"WARNING: key ({name}) not found in globus.task_config!")

    # Determine which tarballs to send
    transfer_sets = globus.configure(globus_dict)

    # Send the tarballs to HPSS via Niagara. Start with non-rstprod (standard) data
    sets_sent = 0
    for set_name in ("standard", "rstprod"):
        if len(transfer_sets[set_name]['locations']) == 0:
            # Nothing to transfer for this set
            continue
        globus.execute_transfer_data(transfer_sets[set_name], set_name == "rstprod")
        sets_sent += 1

    # An archive job with nothing at all to transfer is treated as an error
    if sets_sent == 0:
        raise RuntimeError("FATAL ERROR: Transfer sets were all empty!")

    # Clean up any temporary files
    globus.clean()


if __name__ == '__main__':
main()
19 changes: 15 additions & 4 deletions ush/python/pygfs/task/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ def configure_tars(self, arch_dict: Dict[str, Any]) -> (List[Dict[str, Any]]):
atardir_sets.append(dataset)

# Save the tarball list as a YAML in case we are using globus
self._create_datasets_yaml(atardir_sets)
group = arch_dict.get("ENSGRP", -1)
self._create_datasets_yaml(atardir_sets, group)

return atardir_sets

Expand Down Expand Up @@ -605,18 +606,28 @@ def _pop_git_info(self, arch_dict: Dict[str, Any]) -> Dict[str, Any]:
return

@logit(logger)
def _create_datasets_yaml(self, datasets):
def _create_datasets_yaml(self, datasets, group=-1):
"""
Go through the dataset dictionaries, extract the tarball names and has_rstprod
boolean, and write a YAML with the info in COM_CONF.
boolean, and write a YAML with the info in COM_CONF. The group number is appended
to the YAML name if it is not -1.
Comment thread
DavidHuber-NOAA marked this conversation as resolved.
Group definitions
group=-1: deterministic (non-ensemble)
group=0: ensemble aggregates (means, spreads, etc)
group=1..n: groups of individual ensemble members
"""

if len(datasets) == 0:
logger.warning("WARNING: Skipping dataset YAML creation as no datasets were provided.")
return

com_conf = self.task_config.COMOUT_CONF
yaml_filename = "backup_tarballs.yaml"

if group < 0:
Comment thread
aerorahul marked this conversation as resolved.
yaml_filename = "backup_tarballs.yaml"
else:
yaml_filename = f"backup_tarballs_group{group}.yaml"

yaml_filename = os.path.join(com_conf, yaml_filename)

output_yaml = {}
Expand Down
14 changes: 7 additions & 7 deletions ush/python/pygfs/task/globus_hpss.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,17 @@ def configure(self, globus_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[s
com_conf = globus_dict.COMIN_CONF

# Collect the files and properties from the input YAML
backup_yaml = os.path.join(com_conf, "backup_tarballs.yaml")
group = globus_dict.get("ENSGRP", -1)
if group < 0:
backup_yaml = os.path.join(com_conf, "backup_tarballs.yaml")
else:
backup_yaml = os.path.join(com_conf, f"backup_tarballs_group{group}.yaml")

# Parse the list of tarballs to archive
if os.path.isfile(backup_yaml):
backup_set = AttrDict(**parse_yaml(backup_yaml))
else:
raise FileNotFoundError("Backup tarball YAML is missing! ({backup_yaml})")
raise FileNotFoundError(f"Backup tarball YAML is missing! ({backup_yaml})")

# Create a standard and rstprod backup sets for any restricted tarballs
standard_backup_set = []
Expand Down Expand Up @@ -190,7 +194,7 @@ def configure(self, globus_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[s
todo_script = Jinja(todo_jinja, data=globus_dict, allow_missing=False).render
transfer_sets["standard"]["todo"] = todo_script

rstprod_todo_jinja = os.path.join(globus_parm, "rstprod_todo.sh.j2")
rstprod_todo_jinja = os.path.join(globus_parm, "todo_rstprod.sh.j2")
rstprod_todo_script = Jinja(rstprod_todo_jinja, data=globus_dict, allow_missing=False).render
transfer_sets["rstprod"]["todo"] = rstprod_todo_script

Expand Down Expand Up @@ -253,10 +257,6 @@ def execute_transfer_data(self, transfer_set: Dict[str, Any], has_rstprod: bool)
with open("init_xfer.sh", "w") as init_f:
init_f.write(transfer_set["init_xfer.sh"])

# Make run_doorman.sh and init_xfer.sh executable
os.chmod("run_doorman.sh", 0o740)
os.chmod("init_xfer.sh", 0o740)

# Initialize the server
self._init_server(transfer_set["server_job_dir"])

Expand Down
Loading