diff --git a/env/HERA.env b/env/HERA.env index 2157e900313..94bab36703a 100755 --- a/env/HERA.env +++ b/env/HERA.env @@ -102,6 +102,13 @@ elif [[ "${step}" = "atmanlfv3inc" ]]; then [[ ${NTHREADS_ATMANLFV3INC} -gt ${nth_max} ]] && export NTHREADS_ATMANLFV3INC=${nth_max} export APRUN_ATMANLFV3INC="${launcher} -n ${npe_atmanlfv3inc} --cpus-per-task=${NTHREADS_ATMANLFV3INC}" +elif [[ "${step}" = "prepobsaero" ]]; then + + nth_max=$((npe_node_max / npe_node_prepobsaero)) + + export NTHREADS_PREPOBSAERO=${nth_prepobsaero:-1} + export APRUN_PREPOBSAERO="${launcher} -n ${npe_prepobsaero} --cpus-per-task=${NTHREADS_PREPOBSAERO}" + elif [[ "${step}" = "snowanl" ]]; then nth_max=$((npe_node_max / npe_node_snowanl)) diff --git a/env/HERCULES.env b/env/HERCULES.env index 0824ba913a6..d43dedad8d8 100755 --- a/env/HERCULES.env +++ b/env/HERCULES.env @@ -99,6 +99,12 @@ case ${step} in [[ ${NTHREADS_AEROANL} -gt ${nth_max} ]] && export NTHREADS_AEROANL=${nth_max} export APRUN_AEROANL="${launcher} -n ${npe_aeroanlrun} --cpus-per-task=${NTHREADS_AEROANL}" ;; + "prepobsaero") + nth_max=$((npe_node_max / npe_node_prepobsaero)) + + export NTHREADS_PREPOBSAERO=${nth_prepobsaero:-1} + export APRUN_PREPOBSAERO="${launcher} -n ${npe_prepobsaero} --cpus-per-task=${NTHREADS_PREPOBSAERO}" +;; "snowanl") nth_max=$((npe_node_max / npe_node_snowanl)) diff --git a/env/JET.env b/env/JET.env index 5bd88dc93a2..668ec1c2e4d 100755 --- a/env/JET.env +++ b/env/JET.env @@ -82,6 +82,13 @@ elif [[ "${step}" = "aeroanlrun" ]]; then [[ ${NTHREADS_AEROANL} -gt ${nth_max} ]] && export NTHREADS_AEROANL=${nth_max} export APRUN_AEROANL="${launcher} -n ${npe_aeroanlrun}" +elif [[ "${step}" = "prepobsaero" ]]; then + + nth_max=$((npe_node_max / npe_node_prepobsaero)) + + export NTHREADS_PREPOBSAERO=${nth_prepobsaero:-1} + export APRUN_PREPOBSAERO="${launcher} -n ${npe_prepobsaero} --cpus-per-task=${NTHREADS_PREPOBSAERO}" + elif [[ "${step}" = "snowanl" ]]; then nth_max=$((npe_node_max / npe_node_snowanl)) diff 
--git a/env/ORION.env b/env/ORION.env index f701e55aa2e..afd1cda0521 100755 --- a/env/ORION.env +++ b/env/ORION.env @@ -90,6 +90,13 @@ elif [[ "${step}" = "aeroanlrun" ]]; then [[ ${NTHREADS_AEROANL} -gt ${nth_max} ]] && export NTHREADS_AEROANL=${nth_max} export APRUN_AEROANL="${launcher} -n ${npe_aeroanlrun} --cpus-per-task=${NTHREADS_AEROANL}" +elif [[ "${step}" = "prepobsaero" ]]; then + + nth_max=$((npe_node_max / npe_node_prepobsaero)) + + export NTHREADS_PREPOBSAERO=${nth_prepobsaero:-1} + export APRUN_PREPOBSAERO="${launcher} -n ${npe_prepobsaero} --cpus-per-task=${NTHREADS_PREPOBSAERO}" + elif [[ "${step}" = "snowanl" ]]; then nth_max=$((npe_node_max / npe_node_snowanl)) diff --git a/env/S4.env b/env/S4.env index 9ba3a61b01c..8a368bf1d67 100755 --- a/env/S4.env +++ b/env/S4.env @@ -82,6 +82,13 @@ elif [[ "${step}" = "aeroanlrun" ]]; then [[ ${NTHREADS_AEROANL} -gt ${nth_max} ]] && export NTHREADS_AEROANL=${nth_max} export APRUN_AEROANL="${launcher} -n ${npe_aeroanlrun}" +elif [[ "${step}" = "prepobsaero" ]]; then + + nth_max=$((npe_node_max / npe_node_prepobsaero)) + + export NTHREADS_PREPOBSAERO=${nth_prepobsaero:-1} + export APRUN_PREPOBSAERO="${launcher} -n ${npe_prepobsaero} --cpus-per-task=${NTHREADS_PREPOBSAERO}" + elif [[ "${step}" = "snowanl" ]]; then nth_max=$((npe_node_max / npe_node_snowanl)) diff --git a/env/WCOSS2.env b/env/WCOSS2.env index 0876e4127dc..9fe9179e6b0 100755 --- a/env/WCOSS2.env +++ b/env/WCOSS2.env @@ -76,6 +76,13 @@ elif [[ "${step}" = "aeroanlrun" ]]; then [[ ${NTHREADS_AEROANL} -gt ${nth_max} ]] && export NTHREADS_AEROANL=${nth_max} export APRUN_AEROANL="${launcher} -n ${npe_aeroanlrun}" +elif [[ "${step}" = "prepobsaero" ]]; then + + nth_max=$((npe_node_max / npe_node_prepobsaero)) + + export NTHREADS_PREPOBSAERO=${nth_prepobsaero:-1} + export APRUN_PREPOBSAERO="${launcher} -n ${npe_prepobsaero} --ppn ${npe_node_prepobsaero} --cpu-bind depth --depth=${NTHREADS_PREPOBSAERO}" + elif [[ "${step}" = "snowanl" ]]; then
nth_max=$((npe_node_max / npe_node_snowanl)) diff --git a/jobs/JGLOBAL_PREP_OBS_AERO b/jobs/JGLOBAL_PREP_OBS_AERO new file mode 100755 index 00000000000..7fe701898f2 --- /dev/null +++ b/jobs/JGLOBAL_PREP_OBS_AERO @@ -0,0 +1,43 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" +source "${HOMEgfs}/ush/jjob_header.sh" -e "prepobsaero" -c "base prepobsaero" + +############################################## +# Set variables used in the script +############################################## + +export COMIN_OBS="${DATA}" +YMD=${PDY} HH=${cyc} declare_from_tmpl -rx COMOUT_OBS:COM_OBS_TMPL + +############################################## +# Begin JOB SPECIFIC work +############################################## + +############################################################### +# Run relevant script + +EXSCRIPT=${GDASPREPAEROOBSPY:-${SCRgfs}/exglobal_prep_obs_aero.py} +${EXSCRIPT} +status=$? +[[ ${status} -ne 0 ]] && exit "${status}" + + +############################################## +# End JOB SPECIFIC work +############################################## + +############################################## +# Final processing +############################################## +if [[ -e "${pgmout}" ]] ; then + cat "${pgmout}" +fi + +########################################## +# Remove the Temporary working directory +########################################## +cd "${DATAROOT}" || exit +[[ "${KEEPDATA}" = "NO" ]] && rm -rf "${DATA}" + +exit 0 diff --git a/jobs/rocoto/prepobsaero.sh b/jobs/rocoto/prepobsaero.sh new file mode 100755 index 00000000000..89da7547e8d --- /dev/null +++ b/jobs/rocoto/prepobsaero.sh @@ -0,0 +1,24 @@ +#! /usr/bin/env bash + +source "${HOMEgfs}/ush/preamble.sh" + +############################################################### +# Source UFSDA workflow modules +. "${HOMEgfs}/ush/load_ufsda_modules.sh" +status=$? 
+[[ ${status} -ne 0 ]] && exit "${status}" + +export job="prepobsaero" +export jobid="${job}.$$" + +############################################################### +# setup python path for workflow utilities and tasks +wxflowPATH="${HOMEgfs}/ush/python:${HOMEgfs}/ush/python/wxflow/src" +PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}${wxflowPATH}" +export PYTHONPATH + +############################################################### +# Execute the JJOB +"${HOMEgfs}/jobs/JGLOBAL_PREP_OBS_AERO" +status=$? +exit "${status}" diff --git a/parm/archive/arcdir.yaml.j2 b/parm/archive/arcdir.yaml.j2 index fab93cc2a41..1ed7422761a 100644 --- a/parm/archive/arcdir.yaml.j2 +++ b/parm/archive/arcdir.yaml.j2 @@ -47,6 +47,11 @@ Deterministic: &deterministic - ["{{ COM_CHEM_ANALYSIS }}/{{ head }}aerostat", "{{ ARCDIR }}/aerostat.{{ RUN }}.{{ cycle_YMDH }}"] {% endif %} + {% if DO_PREP_OBS_AERO %} + - ["{{ COM_OBS }}/{{ head }}aeroobs", "{{ ARCDIR }}/aeroobs.{{ RUN }}.{{ cycle_YMDH }}"] + - ["{{ COM_OBS }}/{{ head }}aerorawobs", "{{ ARCDIR }}/aerorawobs.{{ RUN }}.{{ cycle_YMDH }}"] + {% endif %} + {% endif %} {% if RUN == "gfs" %} diff --git a/parm/archive/gdas.yaml.j2 b/parm/archive/gdas.yaml.j2 index fd5e0741116..26540156cd9 100644 --- a/parm/archive/gdas.yaml.j2 +++ b/parm/archive/gdas.yaml.j2 @@ -69,6 +69,10 @@ gdas: {% if AERO_ANL_CDUMP == "gdas" or AERO_ANL_CDUMP == "both" %} - "{{ COM_CHEM_ANALYSIS | relpath(ROTDIR) }}/{{ head }}aerostat" {% endif %} + {% if DO_PREP_OBS_AERO %} + - "{{ COM_OBS | relpath(ROTDIR) }}/{{ head }}aeroobs" + - "{{ COM_OBS | relpath(ROTDIR) }}/{{ head }}aerorawobs" + {% endif %} {% if DO_JEDISNOWDA %} - "{{ COM_SNOW_ANALYSIS | relpath(ROTDIR) }}/{{ head }}snowstat.tgz" {% endif %} diff --git a/parm/archive/gfsa.yaml.j2 b/parm/archive/gfsa.yaml.j2 index df90a1a71e3..0a8e65d3ef1 100644 --- a/parm/archive/gfsa.yaml.j2 +++ b/parm/archive/gfsa.yaml.j2 @@ -40,6 +40,10 @@ gfsa: {% if AERO_ANL_CDUMP == "gfs" or AERO_ANL_CDUMP == "both" %} - "{{ COM_CHEM_ANALYSIS | 
relpath(ROTDIR) }}/{{ head }}aerostat" {% endif %} + {% if DO_PREP_OBS_AERO %} + - "{{ COM_OBS | relpath(ROTDIR) }}/{{ head }}aeroobs" + - "{{ COM_OBS | relpath(ROTDIR) }}/{{ head }}aerorawobs" + {% endif %} # BUFR inputs - "{{ COM_OBS | relpath(ROTDIR) }}/{{ head }}nsstbufr" diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 6cc1b6d7448..fd1ffdc415a 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -178,6 +178,7 @@ export DO_WAVE="NO" export DO_OCN="NO" export DO_ICE="NO" export DO_AERO="NO" +export DO_PREP_OBS_AERO="NO" export AERO_FCST_CDUMP="" # When to run aerosol forecast: gdas, gfs, or both export AERO_ANL_CDUMP="" # When to run aerosol analysis: gdas, gfs, or both export WAVE_CDUMP="" # When to include wave suite: gdas, gfs, or both diff --git a/parm/config/gfs/config.prepobsaero b/parm/config/gfs/config.prepobsaero new file mode 100644 index 00000000000..f70138991c8 --- /dev/null +++ b/parm/config/gfs/config.prepobsaero @@ -0,0 +1,17 @@ +#!/bin/bash -x + +########## config.prepobsaero ########## +# Prepare and thin/superob aerosol observations + +echo "BEGIN: config.prepobsaero" + +# Get task specific resources +source "${EXPDIR}/config.resources" prepobsaero + +export OBSPROCYAML="${PARMgfs}/gdas/aero/obs/lists/gdas_aero_obsproc.yaml.j2" +export OBSPROCEXE="${EXECgfs}/gdas_obsprovider2ioda.x" +export VIIRS_DATA_DIR="/scratch2/NCEPDEV/stmp3/Yaping.Wang/VIIRS/AWS/" +export SENSORS="npp,n20" + + +echo "END: config.prepobsaero" diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index 0972f74f9cc..1d1fd2e3c15 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -13,7 +13,7 @@ if (( $# != 1 )); then echo "atmanlinit atmanlvar atmanlfv3inc atmanlfinal" echo "atmensanlinit atmensanlletkf atmensanlfv3inc atmensanlfinal" echo "snowanl" - echo "aeroanlinit aeroanlrun aeroanlfinal" + echo "prepobsaero aeroanlinit aeroanlrun aeroanlfinal" echo "anal
sfcanl analcalc analdiag fcst echgres" echo "upp atmos_products" echo "tracker genesis genesis_fsu" @@ -289,6 +289,14 @@ case ${step} in export npe_node_snowanl=$(( npe_node_max / nth_snowanl )) ;; + "prepobsaero") + export wtime_prepobsaero="00:30:00" + export npe_prepobsaero=1 + export nth_prepobsaero=1 + export npe_node_prepobsaero=1 + export memory_prepobsaero="96GB" + ;; + "aeroanlinit") # below lines are for creating JEDI YAML case ${CASE} in diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index e38d0abf72b..bcd8d522d9c 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -19,8 +19,8 @@ def main(): # Pull out all the configuration keys needed to run the rest of archive steps keys = ['ATARDIR', 'current_cycle', 'FHMIN', 'FHMAX', 'FHOUT', 'RUN', 'PDY', - 'DO_VERFRAD', 'DO_VMINMON', 'DO_VERFOZN', 'DO_ICE', 'DO_AERO', 'PARMgfs', - 'DO_OCN', 'DO_WAVE', 'WRITE_DOPOST', 'PSLOT', 'HPSSARCH', 'DO_MOS', + 'DO_VERFRAD', 'DO_VMINMON', 'DO_VERFOZN', 'DO_ICE', 'DO_AERO', 'DO_PREP_OBS_AERO', + 'PARMgfs', 'DO_OCN', 'DO_WAVE', 'WRITE_DOPOST', 'PSLOT', 'HPSSARCH', 'DO_MOS', 'DO_JEDISNOWDA', 'LOCALARCH', 'REALTIME', 'ROTDIR', 'ARCH_WARMICFREQ', 'ARCH_FCSTICFREQ', 'ARCH_CYC', 'assim_freq', 'ARCDIR', 'SDATE', 'FHMIN_GFS', 'FHMAX_GFS', 'FHOUT_GFS', 'ARCH_GAUSSIAN', 'MODE', diff --git a/scripts/exglobal_prep_obs_aero.py b/scripts/exglobal_prep_obs_aero.py new file mode 100755 index 00000000000..08548e68749 --- /dev/null +++ b/scripts/exglobal_prep_obs_aero.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# exglobal_prep_obs_aero.py +# This script collect available viirs +# obs files, combine and preprocess +# them. 
+import os + +from wxflow import Logger, cast_strdict_as_dtypedict +from pygfs.task.aero_prepobs import AerosolObsPrep + +# Initialize root logger +logger = Logger(level='DEBUG', colored_log=True) + + +if __name__ == '__main__': + + # Take configuration from environment and cast it as python dictionary + config = cast_strdict_as_dtypedict(os.environ) + + AeroObs = AerosolObsPrep(config) + AeroObs.initialize() + AeroObs.runConverter() + AeroObs.finalize() diff --git a/ush/python/pygfs/task/aero_prepobs.py b/ush/python/pygfs/task/aero_prepobs.py new file mode 100644 index 00000000000..f2344241a92 --- /dev/null +++ b/ush/python/pygfs/task/aero_prepobs.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 + +import os +import glob +import gzip +import tarfile +import re +from logging import getLogger +from typing import List, Dict, Any, Union + +from wxflow import (AttrDict, FileHandler, rm_p, rmdir, + Task, add_to_datetime, to_timedelta, to_datetime, + datetime_to_YMD, + chdir, Executable, WorkflowException, + parse_j2yaml, save_as_yaml, logit) + +logger = getLogger(__name__.split('.')[-1]) + + +class AerosolObsPrep(Task): + """ + Class for preparing and managing aerosol observations + """ + def __init__(self, config: Dict[str, Any]) -> None: + super().__init__(config) + + _window_begin = add_to_datetime(self.runtime_config.current_cycle, -to_timedelta(f"{self.config['assim_freq']}H") / 2) + _window_end = add_to_datetime(self.runtime_config.current_cycle, +to_timedelta(f"{self.config['assim_freq']}H") / 2) + + local_dict = AttrDict( + { + 'window_begin': _window_begin, + 'window_end': _window_end, + 'sensors': str(self.config['SENSORS']).split(','), + 'data_dir': self.config['VIIRS_DATA_DIR'], + 'input_files': '', + 'OPREFIX': f"{self.runtime_config.RUN}.t{self.runtime_config.cyc:02d}z.", + 'APREFIX': f"{self.runtime_config.RUN}.t{self.runtime_config.cyc:02d}z." 
+ } + ) + + # task_config is everything that this task should need + self.task_config = AttrDict(**self.config, **self.runtime_config, **local_dict) + + @logit(logger) + def initialize(self) -> None: + """ + List needed raw obs files. + Copy the raw obs files to $DATA/obs. + Link over the needed executable. + Generate corresponding YAML file. + Run IODA converter. + """ + self.task_config.DATA_OBS = os.path.join(self.task_config.DATA, 'obs') + if os.path.exists(self.task_config.DATA_OBS): + rmdir(self.task_config.DATA_OBS) + FileHandler({'mkdir': [self.task_config.DATA_OBS]}).sync() + + self.task_config.prepaero_yaml = [] + for sensor in self.task_config.sensors: + raw_files = self.list_raw_files(sensor) + self.task_config.input_files = self.copy_obs(raw_files) + self.link_obsconvexe() + self.task_config.prepaero_config = self.get_obsproc_config(sensor) + + # generate converter YAML file + template = f"{self.runtime_config.CDUMP}.t{self.runtime_config['cyc']:02d}z.prepaero_viirs_{sensor}.yaml" + _prepaero_yaml = os.path.join(self.runtime_config.DATA, template) + self.task_config.prepaero_yaml.append(_prepaero_yaml) + logger.debug(f"Generate PrepAeroObs YAML file: {_prepaero_yaml}") + save_as_yaml(self.task_config.prepaero_config, _prepaero_yaml) + logger.info(f"Wrote PrepAeroObs YAML to: {_prepaero_yaml}") + + @logit(logger) + def list_raw_files(self, sensor) -> List[str]: + """ + List all files in the predefined directory that match the predefined sensor and within the time window. 
+ """ + if sensor == 'n20': + sensor = 'j01' + dir1 = os.path.join(self.task_config.data_dir, datetime_to_YMD(self.task_config.window_begin)) + dir2 = os.path.join(self.task_config.data_dir, datetime_to_YMD(self.task_config.window_end)) + + if dir1 == dir2: + files = os.listdir(dir1) + allfiles = [os.path.join(dir1, file) for file in files] + allfiles.sort() + else: + files_1 = os.listdir(dir1) + allfiles_1 = [os.path.join(dir1, file) for file in files_1] + files_2 = os.listdir(dir2) + allfiles_2 = [os.path.join(dir2, file) for file in files_2] + allfiles = sorted(allfiles_1 + allfiles_2) + matching_files = [] + try: + for file in allfiles: + basename = os.path.basename(file) + pattern = r"s(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{3})" + match = re.match(pattern, basename.split('_')[3]) + yyyy, mm, dd, HH, MM = match.group(1), match.group(2), match.group(3), match.group(4), match.group(5) + fstart = to_datetime(f'{yyyy}-{mm}-{dd}T{HH}:{MM}Z') + if sensor == basename.split('_')[2]: + # temporally select obs files based on time stamp in the filename. + if (fstart > self.task_config.window_begin) and (fstart < self.task_config.window_end): + matching_files.append(file) + logger.info("Found %d matching files.", len(matching_files)) + except FileNotFoundError: + logger.error("The specified file/directory does not exist.") + raise + return matching_files + + @logit(logger) + def copy_obs(self, inputfiles) -> List[str]: + """ + Copy the raw obs files to $DATA/obs.
+ """ + copylist = [] + destlist = [] + for filename in inputfiles: + basename = os.path.basename(filename) + dest = os.path.join(self.task_config.DATA_OBS, basename) + copylist.append([filename, dest]) + destlist.append(dest) + FileHandler({'copy': copylist}).sync() + + return destlist + + @logit(logger) + def get_obsproc_config(self, sensor) -> Dict[str, Any]: + """ + Compile a dictionary of obs proc configuration from OBSPROCYAML template file + Parameters + ---------- + Returns + ---------- + obsproc_config : Dict + a dictionary containing the fully rendered obs proc yaml configuration + """ + self.task_config.sensor = sensor + # generate JEDI YAML file + logger.info(f"Generate gdas_obsprovider2ioda YAML config: {self.task_config.OBSPROCYAML}") + prepaero_config = parse_j2yaml(self.task_config.OBSPROCYAML, self.task_config) + + return prepaero_config + + @logit(logger) + def link_obsconvexe(self) -> None: + """ + This method links the gdas executable to the run directory + Parameters + ---------- + None + Returns + ---------- + None + """ + exe_src = self.task_config.OBSPROCEXE + + logger.info(f"Link executable {exe_src} to DATA/") + exe_dest = os.path.join(self.task_config.DATA, os.path.basename(exe_src)) + if os.path.lexists(exe_dest): + rm_p(exe_dest) + os.symlink(exe_src, exe_dest) + + return + + @logit(logger) + def runConverter(self) -> None: + """ + Run the IODA converter gdas_obsprovider2ioda.x + """ + chdir(self.task_config.DATA) + exec_cmd = Executable(self.task_config.APRUN_PREPOBSAERO) + exec_name = os.path.join(self.task_config.DATA, os.path.basename(self.task_config.OBSPROCEXE)) + exec_cmd.add_default_arg(exec_name) + + for prepaero_yaml in self.task_config.prepaero_yaml: + try: + logger.debug(f"Executing {exec_cmd} on {prepaero_yaml}") + exec_cmd(f"{prepaero_yaml}") + except OSError: + raise OSError(f"Failed to execute {exec_cmd} on {prepaero_yaml}") + except Exception: + raise WorkflowException(f"An error occurred during execution of {exec_cmd} on
{prepaero_yaml}") + + pass + + @logit(logger) + def finalize(self) -> None: + """ + Copy the output viirs files to COMOUT_OBS. + Tar and archive the output files. + Tar and archive the raw obs files. + """ + # get list of viirs files + obsfiles = glob.glob(os.path.join(self.task_config['DATA'], '*viirs*nc4')) + copylist = [] + for obsfile in obsfiles: + basename = os.path.basename(obsfile) + src = os.path.join(self.task_config['DATA'], basename) + dest = os.path.join(self.task_config.COMOUT_OBS, basename) + copylist.append([src, dest]) + FileHandler({'copy': copylist}).sync() + + # gzip the files first + for obsfile in obsfiles: + with open(obsfile, 'rb') as f_in, gzip.open(f"{obsfile}.gz", 'wb') as f_out: + f_out.writelines(f_in) + + aeroobs = os.path.join(self.task_config.COMOUT_OBS, f"{self.task_config['APREFIX']}aeroobs") + # open tar file for writing + with tarfile.open(aeroobs, "w") as archive: + for obsfile in obsfiles: + aeroobsgzip = f"{obsfile}.gz" + archive.add(aeroobsgzip, arcname=os.path.basename(aeroobsgzip)) + # get list of raw viirs L2 files + rawfiles = glob.glob(os.path.join(self.task_config.DATA_OBS, 'JRR-AOD*')) + # gzip the raw L2 files first + for rawfile in rawfiles: + with open(rawfile, 'rb') as f_in, gzip.open(f"{rawfile}.gz", 'wb') as f_out: + f_out.writelines(f_in) + + aerorawobs = os.path.join(self.task_config.COMOUT_OBS, f"{self.task_config['APREFIX']}aerorawobs") + # open tar file for writing + with tarfile.open(aerorawobs, "w") as archive: + for rawfile in rawfiles: + aerorawobsgzip = f"{rawfile}.gz" + archive.add(aerorawobsgzip, arcname=os.path.basename(aerorawobsgzip)) + copylist = [] + for prepaero_yaml in self.task_config.prepaero_yaml: + basename = os.path.basename(prepaero_yaml) + dest = os.path.join(self.task_config.COMOUT_OBS, basename) + copylist.append([prepaero_yaml, dest]) + FileHandler({'copy': copylist}).sync() + + pass diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index
50a9a7cdd05..6a4d240fe5f 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -51,6 +51,7 @@ def __init__(self, conf: Configuration) -> None: self.do_ocean = _base.get('DO_OCN', False) self.do_ice = _base.get('DO_ICE', False) self.do_aero = _base.get('DO_AERO', False) + self.do_prep_obs_aero = _base.get('DO_PREP_OBS_AERO', False) self.do_bufrsnd = _base.get('DO_BUFRSND', False) self.do_gempak = _base.get('DO_GEMPAK', False) self.do_awips = _base.get('DO_AWIPS', False) diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index f7f9b5b5e68..175ddb07bfa 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -108,6 +108,8 @@ def _get_app_configs(self): if self.do_aero: configs += ['aeroanlinit', 'aeroanlrun', 'aeroanlfinal'] + if self.do_prep_obs_aero: + configs += ['prepobsaero'] if self.do_jedisnowda: configs += ['prepsnowobs', 'snowanl'] @@ -178,6 +180,8 @@ def get_task_names(self): if self.do_aero and 'gdas' in self.aero_anl_cdumps: gdas_tasks += ['aeroanlinit', 'aeroanlrun', 'aeroanlfinal'] + if self.do_prep_obs_aero: + gdas_tasks += ['prepobsaero'] gdas_tasks += ['atmanlupp', 'atmanlprod', 'fcst'] @@ -215,6 +219,8 @@ def get_task_names(self): if self.do_aero and 'gfs' in self.aero_anl_cdumps: gfs_tasks += ['aeroanlinit', 'aeroanlrun', 'aeroanlfinal'] + if self.do_prep_obs_aero: + gfs_tasks += ['prepobsaero'] gfs_tasks += ['atmanlupp', 'atmanlprod', 'fcst'] diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index fa218c6713b..0fd468b3b46 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -483,10 +483,35 @@ def atmanlfinal(self): return task + def prepobsaero(self): + deps = [] + dep_dict = {'type': 'task', 'name': f'{self.cdump}prep'} + deps.append(rocoto.add_dependency(dep_dict)) + dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + + resources = 
self.get_resource('prepobsaero') + task_name = f'{self.cdump}prepobsaero' + task_dict = {'task_name': task_name, + 'resources': resources, + 'dependency': dependencies, + 'envars': self.envars, + 'cycledef': self.cdump.replace('enkf', ''), + 'command': f'{self.HOMEgfs}/jobs/rocoto/prepobsaero.sh', + 'job_name': f'{self.pslot}_{task_name}_@H', + 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', + 'maxtries': '&MAXTRIES;' + } + + task = rocoto.create_task(task_dict) + + return task + def aeroanlinit(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}prep'} + if self.app_config.do_prep_obs_aero: + dep_dict = {'type': 'task', 'name': f'{self.cdump}prepobsaero'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)