diff --git a/dev/parm/config/gfs/yaml/defaults.yaml b/dev/parm/config/gfs/yaml/defaults.yaml index 15ed548d145..79bd5db0044 100644 --- a/dev/parm/config/gfs/yaml/defaults.yaml +++ b/dev/parm/config/gfs/yaml/defaults.yaml @@ -17,6 +17,10 @@ base: DO_METP: "YES" FHMAX_GFS: 120 FHMAX_HF_GFS: 48 + FHMAX_GFS_00: 384 + FHMAX_GFS_06: 180 + FHMAX_GFS_12: 384 + FHMAX_GFS_18: 180 FCST_BREAKPOINTS: "" DO_GSISOILDA: "NO" DO_LAND_IAU: ".false." diff --git a/dev/workflow/CYCLE_SPECIFIC_FORECAST_LENGTHS.md b/dev/workflow/CYCLE_SPECIFIC_FORECAST_LENGTHS.md new file mode 100644 index 00000000000..1ad8b39b810 --- /dev/null +++ b/dev/workflow/CYCLE_SPECIFIC_FORECAST_LENGTHS.md @@ -0,0 +1,89 @@ +# Cycle-Specific Forecast Lengths for GFS + +## Overview + +This feature allows the Global Forecast System (GFS) to run different forecast cycles (00z, 06z, 12z, 18z) to different output lengths. This helps save compute time by running shorter forecasts for some cycles while maintaining longer forecasts for others. + +## Configuration + +To enable cycle-specific forecast lengths, add the following parameters to your configuration YAML file (e.g., `dev/parm/config/gfs/yaml/defaults.yaml`): + +```yaml +base: + FHMAX_GFS_00: 384 # 16 days for 00z cycle + FHMAX_GFS_06: 180 # 7.5 days for 06z cycle + FHMAX_GFS_12: 384 # 16 days for 12z cycle + FHMAX_GFS_18: 180 # 7.5 days for 18z cycle +``` + +These values override the default `FHMAX_GFS` for their respective cycles. + +## How It Works + +### Cycledef Generation + +When cycle-specific FHMAX values are defined, the workflow generates separate Rocoto cycledefs for each cycle: +- `gfs_00` - Runs at 00z +- `gfs_06` - Runs at 06z +- `gfs_12` - Runs at 12z +- `gfs_18` - Runs at 18z + +The standard `gfs` cycledef is still generated for compatibility. + +### Task Generation + +Tasks that depend on forecast length automatically detect cycle-specific FHMAX values: + +- **Product Generation Tasks** (atmos_prod, ocean_prod, ice_prod): + - When FHMAX values differ across cycles, separate metatasks are created + - Example: `gfs_atmos_prod_00`, `gfs_atmos_prod_06`, etc. + - Each metatask runs only for its designated cycle with appropriate forecast hours + +- **UPP Tasks** (atmupp, goesupp): + - Similar behavior to product tasks + - Cycle-specific versions created when needed + +### Backward Compatibility + +When cycle-specific FHMAX values are not defined or are all the same, the system uses the standard implementation with a single metatask, maintaining full backward compatibility. + +## Example: Operational-like Setup + +16-day forecasts for synoptic cycles (00z, 12z) and 7.5-day forecasts for intermediate cycles (06z, 18z): + +```yaml +base: + INTERVAL_GFS: 6 # Run GFS every 6 hours + FHMAX_GFS: 120 # Default fallback (5 days) + FHMAX_HF_GFS: 48 # High-frequency output to 48 hours + FHMAX_GFS_00: 384 # 16 days for 00z + FHMAX_GFS_06: 180 # 7.5 days for 06z + FHMAX_GFS_12: 384 # 16 days for 12z + FHMAX_GFS_18: 180 # 7.5 days for 18z +``` + +## Implementation Details + +### Code Locations + +- **Configuration**: `dev/parm/config/gfs/yaml/defaults.yaml` +- **Cycledef Generation**: `dev/workflow/rocoto/gfs_cycled_xml.py` +- **Forecast Hour Utilities**: `dev/workflow/rocoto/tasks.py` +- **Task Generation**: `dev/workflow/rocoto/gfs_tasks.py` + +### Forecast Hour Calculation + +For each cycle, forecast hours are determined by: +1. Check if `FHMAX_GFS_` exists in configuration +2. If yes, use that value; otherwise fall back to `FHMAX_GFS` +3. Generate forecast hour list using high-frequency and standard intervals + +Example for 00z with FHMAX_GFS_00 = 384: +- Hours 0-48: Every 3 hours (high-frequency) +- Hours 48-384: Every 6 hours (standard) +- Total: 73 forecast hours + +Example for 06z with FHMAX_GFS_06 = 180: +- Hours 0-48: Every 3 hours (high-frequency) +- Hours 48-180: Every 6 hours (standard) +- Total: 39 forecast hours diff --git a/dev/workflow/rocoto/gfs_cycled_xml.py b/dev/workflow/rocoto/gfs_cycled_xml.py index 6b4542251de..96c31cff2b8 100644 --- a/dev/workflow/rocoto/gfs_cycled_xml.py +++ b/dev/workflow/rocoto/gfs_cycled_xml.py @@ -79,6 +79,28 @@ def get_cycledefs(self): interval_gfs_str = timedelta_to_HMS(interval_gfs) strings.append(f'\t{sdate_gfs_str} {edate_gfs_str} {interval_gfs_str}') + # Create cycle-specific cycledefs for GFS (00z, 06z, 12z, 18z) + # This allows different forecast lengths for different cycles + if interval_gfs <= to_timedelta('6H'): + for cyc in ['00', '06', '12', '18']: + # Find first occurrence of this cycle hour at or after sdate_gfs + cyc_hour = int(cyc) + sdate_cyc = sdate_gfs.replace(hour=cyc_hour) + if sdate_cyc < sdate_gfs: + # Move to next day if we're past this hour + sdate_cyc = sdate_cyc + to_timedelta('24H') + # Find last occurrence at or before edate_gfs + edate_cyc = edate_gfs.replace(hour=cyc_hour) + if edate_cyc > edate_gfs: + # Move back a day if we're past the end date + edate_cyc = edate_cyc - to_timedelta('24H') + + if sdate_cyc <= edate_cyc: + sdate_cyc_str = sdate_cyc.strftime("%Y%m%d%H%M") + edate_cyc_str = edate_cyc.strftime("%Y%m%d%H%M") + interval_cyc_str = timedelta_to_HMS(to_timedelta('24H')) + strings.append(f'\t{sdate_cyc_str} {edate_cyc_str} {interval_cyc_str}') + date2_gfs = sdate_gfs + interval_gfs date2_gfs_str = date2_gfs.strftime("%Y%m%d%H%M") if date2_gfs <= edate_gfs: diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index e5c95f8510e..165a470443e 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -1160,50 +1160,122 @@ def _upptask(self, upp_run="forecast", task_id="atmupp"): if upp_run not in VALID_UPP_RUN: raise KeyError(f"{upp_run} is invalid; UPP_RUN options are: {('|').join(VALID_UPP_RUN)}") - postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#', - 'UPP_RUN': upp_run} - for key, value in postenvar_dict.items(): - postenvars.append(rocoto.create_envar(name=key, value=str(value))) - - atm_hist_path = self._template_to_rocoto_cycstring(self._base["COM_ATMOS_HISTORY_TMPL"]) - deps = [] - data = f'{atm_hist_path}/{self.run}.t@Hz.atmf#fhr#.nc' - dep_dict = {'type': 'data', 'data': data, 'age': 120} - deps.append(rocoto.add_dependency(dep_dict)) - data = f'{atm_hist_path}/{self.run}.t@Hz.sfcf#fhr#.nc' - dep_dict = {'type': 'data', 'data': data, 'age': 120} - deps.append(rocoto.add_dependency(dep_dict)) - data = f'{atm_hist_path}/{self.run}.t@Hz.atm.logf#fhr#.txt' - dep_dict = {'type': 'data', 'data': data, 'age': 60} - deps.append(rocoto.add_dependency(dep_dict)) - dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') - cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run - resources = self.get_resource('upp') - - task_name = f'{self.run}_{task_id}_f#fhr#' - task_dict = {'task_name': task_name, - 'resources': resources, - 'dependency': dependencies, - 'envars': postenvars, - 'cycledef': cycledef, - 'command': f'{self.HOMEgfs}/dev/jobs/upp.sh', - 'job_name': f'{self.pslot}_{task_name}_@H', - 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', - 'maxtries': '&MAXTRIES;' - } + # Check if we have cycle-specific FHMAX values for GFS + use_cycle_specific = False + cycles = [] + if self.run in ['gfs']: + # Check if any cycle-specific FHMAX values are defined + cycle_fhmax_defined = any([f'FHMAX_GFS_{cyc}' in self._configs['upp'] for cyc in ['00', '06', '12', '18']]) + if cycle_fhmax_defined: + # Check if they're different from each other + fhmax_values = set() + for cyc in ['00', '06', '12', '18']: + fhmax = self._get_cycle_specific_fhmax(self._configs['upp'], cyc) + fhmax_values.add(fhmax) + + # If there are different FHMAX values, use cycle-specific tasks + if len(fhmax_values) > 1: + use_cycle_specific = True + cycles = ['00', '06', '12', '18'] + + if use_cycle_specific: + # Create cycle-specific tasks + all_tasks = [] + for cyc in cycles: + fhrs = self._get_forecast_hours_for_cycle(self.run, self._configs['upp'], cyc) + if not fhrs: + continue + + postenvars = self.envars.copy() + postenvar_dict = {'FHR3': '#fhr#', 'UPP_RUN': upp_run} + for key, value in postenvar_dict.items(): + postenvars.append(rocoto.create_envar(name=key, value=str(value))) + + atm_hist_path = self._template_to_rocoto_cycstring(self._base["COM_ATMOS_HISTORY_TMPL"]) + deps = [] + data = f'{atm_hist_path}/{self.run}.t@Hz.atmf#fhr#.nc' + dep_dict = {'type': 'data', 'data': data, 'age': 120} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'{atm_hist_path}/{self.run}.t@Hz.sfcf#fhr#.nc' + dep_dict = {'type': 'data', 'data': data, 'age': 120} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'{atm_hist_path}/{self.run}.t@Hz.atm.logf#fhr#.txt' + dep_dict = {'type': 'data', 'data': data, 'age': 60} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') + + cycledef = f'{self.run}_{cyc}' + resources = self.get_resource('upp') + + task_name = f'{self.run}_{task_id}_{cyc}_f#fhr#' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': postenvars, + 'cycledef': cycledef, + 'command': f'{self.HOMEgfs}/dev/jobs/upp.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} + + metatask_dict = {'task_name': f'{self.run}_{task_id}_{cyc}', + 'task_dict': task_dict, + 'var_dict': fhr_var_dict + } + + cycle_task = rocoto.create_task(metatask_dict) + all_tasks.extend(cycle_task) + + return all_tasks + else: + # Standard implementation for all cycles + postenvars = self.envars.copy() + postenvar_dict = {'FHR3': '#fhr#', + 'UPP_RUN': upp_run} + for key, value in postenvar_dict.items(): + postenvars.append(rocoto.create_envar(name=key, value=str(value))) + + atm_hist_path = self._template_to_rocoto_cycstring(self._base["COM_ATMOS_HISTORY_TMPL"]) + deps = [] + data = f'{atm_hist_path}/{self.run}.t@Hz.atmf#fhr#.nc' + dep_dict = {'type': 'data', 'data': data, 'age': 120} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'{atm_hist_path}/{self.run}.t@Hz.sfcf#fhr#.nc' + dep_dict = {'type': 'data', 'data': data, 'age': 120} + deps.append(rocoto.add_dependency(dep_dict)) + data = f'{atm_hist_path}/{self.run}.t@Hz.atm.logf#fhr#.txt' + dep_dict = {'type': 'data', 'data': data, 'age': 60} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='and') + cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run + resources = self.get_resource('upp') + + task_name = f'{self.run}_{task_id}_f#fhr#' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': postenvars, + 'cycledef': cycledef, + 'command': f'{self.HOMEgfs}/dev/jobs/upp.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } - fhrs = self._get_forecast_hours(self.run, self._configs['upp']) - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} + fhrs = self._get_forecast_hours(self.run, self._configs['upp']) + fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - metatask_dict = {'task_name': f'{self.run}_{task_id}', - 'task_dict': task_dict, - 'var_dict': fhr_var_dict - } + metatask_dict = {'task_name': f'{self.run}_{task_id}', + 'task_dict': task_dict, + 'var_dict': fhr_var_dict + } - task = rocoto.create_task(metatask_dict) + task = rocoto.create_task(metatask_dict) - return task + return task def atmos_prod(self): return self._atmosoceaniceprod('atmos') @@ -1234,6 +1306,40 @@ def _atmosoceaniceprod(self, component: str): max_tasks = self._configs[config]['MAX_TASKS'] resources = self.get_resource(component_dict['config']) + # Check if we have cycle-specific FHMAX values for GFS + use_cycle_specific = False + cycles = [] + if self.run in ['gfs']: + # Check if any cycle-specific FHMAX values are defined + cycle_fhmax_defined = any([f'FHMAX_GFS_{cyc}' in self._configs[config] for cyc in ['00', '06', '12', '18']]) + if cycle_fhmax_defined: + # Check if they're different from each other + fhmax_values = set() + for cyc in ['00', '06', '12', '18']: + fhmax = self._get_cycle_specific_fhmax(self._configs[config], cyc) + fhmax_values.add(fhmax) + + # If there are different FHMAX values, use cycle-specific tasks + if len(fhmax_values) > 1: + use_cycle_specific = True + cycles = ['00', '06', '12', '18'] + + if use_cycle_specific: + # Create cycle-specific metatasks + return self._create_cycle_specific_prod_tasks(component, component_dict, cycles) + else: + # Create standard metatask for all cycles + return self._create_standard_prod_task(component, component_dict) + + def _create_standard_prod_task(self, component: str, component_dict: dict): + """Create standard product task that works for all cycles.""" + config = component_dict['config'] + history_path_tmpl = component_dict['history_path_tmpl'] + history_file_tmpl = component_dict['history_file_tmpl'] + + max_tasks = self._configs[config]['MAX_TASKS'] + resources = self.get_resource(config) + fhrs = self._get_forecast_hours(self.run, self._configs[config], component) # ocean/ice components do not have fhr 0 as they are averaged output @@ -1289,6 +1395,80 @@ def _atmosoceaniceprod(self, component: str): return task + def _create_cycle_specific_prod_tasks(self, component: str, component_dict: dict, cycles: list): + """Create cycle-specific product tasks with different forecast lengths.""" + config = component_dict['config'] + history_path_tmpl = component_dict['history_path_tmpl'] + history_file_tmpl = component_dict['history_file_tmpl'] + + max_tasks = self._configs[config]['MAX_TASKS'] + + # Create a metatask that contains cycle-specific tasks + all_tasks = [] + + for cyc in cycles: + # Get cycle-specific forecast hours + fhrs = self._get_forecast_hours_for_cycle(self.run, self._configs[config], cyc, component) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + # Skip if no forecast hours for this cycle + if not fhrs: + continue + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Delay triggering ocean products task to next next forecast hour to ensure all data is available + if component == 'ocean': + fhr3_next = fhr_var_dict['fhr3_next'].split(' ') + fhr3_nextp1 = fhr3_next[1:] + fhr3_nextp1.append(fhr3_next[-1]) # repeat last forecast hour to maintain same number of groups + fhr_var_dict['fhr3_nextp1'] = ' '.join(fhr3_nextp1) + + # Adjust walltime based on the largest group + resources = self.get_resource(config).copy() + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + postenvars = self.envars.copy() + postenvar_dict = {'FHR_LIST': '#fhr_list#', 'COMPONENT': component} + for key, value in postenvar_dict.items(): + postenvars.append(rocoto.create_envar(name=key, value=str(value))) + + history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl]) + deps = [] + data = f'{history_path}/{history_file_tmpl}' + dep_dict = {'type': 'data', 'data': data, 'age': 120} + deps.append(rocoto.add_dependency(dep_dict)) + dep_dict = {'type': 'metatask', 'name': f'{self.run}_fcst'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') + + cycledef = f'{self.run}_{cyc}' + + task_name = f'{self.run}_{component}_prod_{cyc}_#fhr_label#' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': postenvars, + 'cycledef': cycledef, + 'command': f"{self.HOMEgfs}/dev/jobs/{config}.sh", + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + metatask_dict = {'task_name': f'{self.run}_{component}_prod_{cyc}', + 'task_dict': task_dict, + 'var_dict': fhr_var_dict} + + cycle_task = rocoto.create_task(metatask_dict) + all_tasks.extend(cycle_task) + + return all_tasks + def wavepostsbs(self): wave_grid = self._configs['base']['waveGRD'] diff --git a/dev/workflow/rocoto/tasks.py b/dev/workflow/rocoto/tasks.py index 79c380ffeb2..96d0b8d5b9c 100644 --- a/dev/workflow/rocoto/tasks.py +++ b/dev/workflow/rocoto/tasks.py @@ -170,6 +170,34 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> TemplateConstants.DOLLAR_CURLY_BRACE, rocoto_conversion_dict.get) + @staticmethod + def _get_cycle_specific_fhmax(config, cyc=None): + """ + Get cycle-specific FHMAX_GFS value if available. + + Parameters + ---------- + config : dict + Configuration dictionary containing FHMAX parameters + cyc : str, optional + Cycle hour (00, 06, 12, 18). If None, returns default FHMAX_GFS + + Returns + ------- + int + Maximum forecast hour for the specified cycle + """ + if cyc is None: + return config.get('FHMAX_GFS', 120) + + # Check for cycle-specific FHMAX (e.g., FHMAX_GFS_00, FHMAX_GFS_12) + cycle_fhmax_key = f'FHMAX_GFS_{cyc}' + if cycle_fhmax_key in config: + return config[cycle_fhmax_key] + + # Fall back to default FHMAX_GFS + return config.get('FHMAX_GFS', 120) + @staticmethod def _get_forecast_hours(run, config, component='atmos') -> List[str]: # Make a local copy of the config to avoid modifying the original @@ -212,6 +240,38 @@ def _get_forecast_hours(run, config, component='atmos') -> List[str]: return fhrs + @staticmethod + def _get_forecast_hours_for_cycle(run, config, cyc, component='atmos') -> List[str]: + """ + Get forecast hours for a specific cycle, using cycle-specific FHMAX if available. + + Parameters + ---------- + run : str + Run name (gfs, gdas, etc.) + config : dict + Configuration dictionary + cyc : str + Cycle hour (00, 06, 12, 18) + component : str, optional + Component name (atmos, ocean, ice, wave) + + Returns + ------- + List[str] + List of forecast hours for the specified cycle + """ + # Make a local copy of the config to avoid modifying the original + local_config = config.copy() + + # Get cycle-specific FHMAX if available + if run in ['gfs', 'gefs', 'sfs', 'gcafs']: + cycle_fhmax = Tasks._get_cycle_specific_fhmax(config, cyc) + local_config['FHMAX_GFS'] = cycle_fhmax + + # Use the standard method to get forecast hours with the updated config + return Tasks._get_forecast_hours(run, local_config, component) + @staticmethod def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) -> List[dict]: '''