diff --git a/README.md b/README.md index 7c22e524..5a37a77b 100644 --- a/README.md +++ b/README.md @@ -27,15 +27,14 @@ cd build ctest -R test_obsforge_util ``` - - # Workflow usage ```console source ush/of_setup.sh setup_xml.py --config config.yaml --template obsforge_rocoto_template.xml.j2 --output obsforge.xml ``` -load rocoto +#### Note: +To load `rocoto` on WCOSS2: ``` module use /apps/ops/test/nco/modulefiles/core module load rocoto diff --git a/parm/config.orion.yaml b/parm/config.orion.yaml new file mode 100644 index 00000000..98108e3b --- /dev/null +++ b/parm/config.orion.yaml @@ -0,0 +1,61 @@ +obsforge: + PSLOT: obsforge + HOMEobsforge: /work/noaa/da/gvernier/runs/obsForge + SDATE: 202503141800 + EDATE: 202503150000 + COMROOT: /work/noaa/da/gvernier/runs/test_obsforge/COMROOT + DCOMROOT: /work2/noaa/da/common/lfs/h1/ops/prod/dcom + DATAROOT: /work/noaa/da/gvernier/runs/test_obsforge/RUNDIRS + SCHEDULER: slurm + ACCOUNT: da-cpu + QUEUE: debug + PARTITION: orion + KEEPDATA: NO + assim_freq: 6 + +aoddump: + provider: VIIRSAOD + platforms: ['npp', 'n20', 'n21'] + thinning_threshold: 0 + channel: 4 + preqc: 2 + WALLTIME_AOD_DUMP: '00:10:00' + TASK_GEOM_AOD_DUMP: '1:ppn=1:tpp=1' + MEMORY_AOD_DUMP: 96GB + +marinedump: + providers: + ghrsst: + list: + - sst_viirs_n21_l3u + - sst_viirs_n20_l3u + - sst_viirs_npp_l3u + - sst_avhrrf_ma_l3u + - sst_avhrrf_mb_l3u + - sst_avhrrf_mc_l3u + - sst_ahi_h08_l3c + - sst_abi_g17_l3c + - sst_abi_g16_l3c + qc config: + min: -2.0 + max: 45.0 + stride: 15 + min number of obs: 10 + rads: + list: + - rads_adt_3a + - rads_adt_3b + - rads_adt_6a + - rads_adt_c2 + - rads_adt_j2 + - rads_adt_j3 + - rads_adt_sa + - rads_adt_sw + qc config: + min: -2.0 + max: 3.0 + error ratio: 1.0 + + WALLTIME_MARINE_DUMP: '00:10:00' + TASK_GEOM_MARINE_DUMP: '1:ppn=20:tpp=2' + MEMORY_MARINE_DUMP: 32GB diff --git a/parm/config.yaml b/parm/config.yaml deleted file mode 100644 index 6a21d036..00000000 --- a/parm/config.yaml +++ /dev/null @@ -1,47 +0,0 @@ -obsforge: - PSLOT: realtimeobs_testing - HOMEobsforge: /work2/noaa/da/gvernier/prs/obsForge - SDATE: 202503141800 - EDATE: 202503150000 - COMROOT: /work2/noaa/da/gvernier/prs/obsForge/realtimeobs_testing/COMROOT - DCOMROOT: /work2/noaa/da/common/lfs/h1/ops/prod/dcom - DATAROOT: /work2/noaa/da/gvernier/prs/obsForge/realtimeobs_testing/RUNDIRS - SCHEDULER: slurm - ACCOUNT: da-cpu - QUEUE: debug - PARTITION: hercules - KEEPDATA: NO - assim_freq: 6 - -aoddump: - provider: VIIRSAOD - platforms: ['npp', 'n20', 'n21'] - thinning_threshold: 0 - channel: 4 - preqc: 2 - WALLTIME_AOD_DUMP: '00:30:00' - TASK_GEOM_AOD_DUMP: '1:ppn=1:tpp=1' - MEMORY_AOD_DUMP: 96GB - -marinedump: - providers: - ghrsst: - list: - - sst_viirs_n21_l3u - - sst_viirs_n20_l3u - - sst_viirs_npp_l3u - - sst_avhrrf_ma_l3u - - sst_avhrrf_mb_l3u - - sst_avhrrf_mc_l3u - - sst_ahi_h08_l3c - - sst_abi_g17_l3c - - sst_abi_g16_l3c - qc config: - min: -2 - max: 45 - stride: 15 - min number of obs: 10 - - WALLTIME_MARINE_DUMP: '00:30:00' - TASK_GEOM_MARINE_DUMP: '1:ppn=40:tpp=2' - MEMORY_MARINE_DUMP: 96GB diff --git a/parm/config.yaml b/parm/config.yaml new file mode 120000 index 00000000..1d8374ce --- /dev/null +++ b/parm/config.yaml @@ -0,0 +1 @@ +config.orion.yaml \ No newline at end of file diff --git a/parm/nc2ioda/nc2ioda.yaml.j2 b/parm/nc2ioda/nc2ioda.yaml.j2 index be3cdf27..36fd3979 100644 --- a/parm/nc2ioda/nc2ioda.yaml.j2 +++ b/parm/nc2ioda/nc2ioda.yaml.j2 @@ -1,22 +1,33 @@ provider: {{ provider }} window begin: {{ window_begin | to_isotime }} window end: {{ window_end | to_isotime }} +output file: {{ output_file }} + +# Superobing configuration {% if binning_stride is defined%} binning: stride: {{ binning_stride }} min number of obs: {{ binning_min_number_of_obs }} {% endif %} + +# Bounds for rudimentary QC {% if bounds_min is defined and bounds_max is defined %} bounds: min: {{ bounds_min }} max: {{ bounds_max }} {% endif %} -output file: {{ output_file }} + +# Ocean basin flags {% if ocean_basin is defined %} ocean basin: {{ ocean_basin }} {% endif %} input files: {{ input_files }} +# Error adjustment for re-dated observations +{% if error_ratio is defined %} +error ratio: {{ error_ratio }} +{% endif %} + # Used in JRR_AOD {% if provider == "VIIRSAOD"%} variable: aerosolOpticalDepth diff --git a/sorc/bufr-query b/sorc/bufr-query index cdf5330f..33d2d5e6 160000 --- a/sorc/bufr-query +++ b/sorc/bufr-query @@ -1 +1 @@ -Subproject commit cdf5330fcaa8a91be3b72749c7538aa984f7eba7 +Subproject commit 33d2d5e68e1d177b5994042bb6d3b75e784ab174 diff --git a/ush/python/pyobsforge/obsdb/ghrsst_db.py b/ush/python/pyobsforge/obsdb/ghrsst_db.py index b30d938b..17c3c807 100644 --- a/ush/python/pyobsforge/obsdb/ghrsst_db.py +++ b/ush/python/pyobsforge/obsdb/ghrsst_db.py @@ -1,13 +1,13 @@ import os import glob -from datetime import datetime, timedelta +from datetime import datetime from pyobsforge.obsdb import BaseDatabase class GhrSstDatabase(BaseDatabase): """Class to manage an observation file database for data assimilation.""" - def __init__(self, db_name="obs_files.db", + def __init__(self, db_name="ghrsst.db", dcom_dir="/lfs/h1/ops/prod/dcom/", obs_dir="sst"): base_dir = os.path.join(dcom_dir, '*', obs_dir) @@ -57,8 +57,14 @@ def parse_filename(self, filename): def ingest_files(self): """Scan the directory for new observation files and insert them into the database.""" - obs_files = glob.glob(os.path.join(self.base_dir, "*-OSPO-L3U_GHRSST-*.nc")) + ospo_files = glob.glob(os.path.join(self.base_dir, "*-OSPO-L3?_GHRSST-*.nc")) + star_files = glob.glob(os.path.join(self.base_dir, "*-STAR-L3?_GHRSST-*.nc")) + obs_files = ospo_files + star_files print(f"Found {len(obs_files)} new files to ingest") + + # Counter for successful ingestions + ingested_count = 0 + for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: @@ -66,32 +72,9 @@ def ingest_files(self): INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type) VALUES (?, ?, ?, ?, ?, ?) """ - self.insert_record(query, parsed_data) - - -# Example Usage -if __name__ == "__main__": - db = GhrSstDatabase(db_name="sst_obs.db", - dcom_dir="/home/gvernier/Volumes/hera-s1/runs/realtimeobs/lfs/h1/ops/prod/dcom/", - obs_dir="sst") - - # Check for new files - db.ingest_files() - - # Query files for a given DA cycle - da_cycle = "20250316000000" - window_begin = datetime.strptime(da_cycle, "%Y%m%d%H%M%S") - timedelta(hours=3) - window_end = datetime.strptime(da_cycle, "%Y%m%d%H%M%S") + timedelta(hours=3) - - valid_files = db.get_valid_files(window_begin=window_begin, - window_end=window_end, - instrument="VIIRS", - satellite="NPP", - obs_type="SSTsubskin") - - print(f"Found {len(valid_files)} valid files for DA cycle {da_cycle}") - for valid_file in valid_files: - if os.path.exists(valid_file): - print(f"Valid file: {valid_file}") - else: - print(f"File does not exist: {valid_file}") + try: + self.insert_record(query, parsed_data) + ingested_count += 1 + except Exception as e: + print(f"Failed to insert record for {file}: {e}") + print(f"################################ Successfully ingested {ingested_count} files into the database.") diff --git a/ush/python/pyobsforge/obsdb/obsdb.py b/ush/python/pyobsforge/obsdb/obsdb.py index 6b744750..fa92d311 100644 --- a/ush/python/pyobsforge/obsdb/obsdb.py +++ b/ush/python/pyobsforge/obsdb/obsdb.py @@ -108,8 +108,6 @@ def get_valid_files(self, query = "SELECT receipt_time FROM obs_files WHERE filename = ?" receipt_time = self.execute_query(query, (filename,))[0][0] receipt_time = datetime.strptime(receipt_time, "%Y-%m-%d %H:%M:%S.%f") - print("receipt time, window end:", receipt_time, window_end) - print("Types - receipt_time:", type(receipt_time), "window_end:", type(window_end)) if receipt_time <= window_end - timedelta(minutes=minutes_behind_realtime[check_receipt]): continue @@ -122,7 +120,6 @@ def get_valid_files(self, for src_file in valid_files: dst_file = join(dst_dir, f"{basename(src_file)}") dst_files.append(dst_file) - logger.info(f"copying {src_file} to {dst_file}") src_dst_obs_list.append([src_file, dst_file]) FileHandler({'mkdir': [dst_dir]}).sync() FileHandler({'copy': src_dst_obs_list}).sync() diff --git a/ush/python/pyobsforge/obsdb/rads_db.py b/ush/python/pyobsforge/obsdb/rads_db.py new file mode 100644 index 00000000..f4e97e96 --- /dev/null +++ b/ush/python/pyobsforge/obsdb/rads_db.py @@ -0,0 +1,63 @@ +import os +import glob +from datetime import datetime, timedelta +from pyobsforge.obsdb import BaseDatabase + + +class RADSDatabase(BaseDatabase): + """Class to manage an observation file database for data assimilation.""" + + def __init__(self, db_name="rads.db", + dcom_dir="/lfs/h1/ops/prod/dcom/", + obs_dir="sst"): + base_dir = os.path.join(dcom_dir, '*', obs_dir) + super().__init__(db_name, base_dir) + + def create_database(self): + """ + Create the SQLite database and observation files table. + + This method initializes the database with a table named `obs_files` to store metadata + about observation files. The table contains the following columns: + + - `id`: A unique identifier for each record (auto-incremented primary key). + - `filename`: The full path to the observation file (must be unique). + - `obs_time`: The timestamp of the observation, extracted from the filename. + - `receipt_time`: The timestamp when the file was added to the `dcom` directory. + - `satellite`: The satellite from which the observation was collected (e.g., 3a, 3b, j3, ...). + + The table is created if it does not already exist. + """ + query = """ + CREATE TABLE IF NOT EXISTS obs_files ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + filename TEXT UNIQUE, + obs_time TIMESTAMP, + receipt_time TIMESTAMP, + satellite TEXT + ) + """ + self.execute_query(query) + + def parse_filename(self, filename): + """Extract metadata from filenames matching the expected pattern.""" + parts = os.path.basename(filename).replace('.', '_').split('_') + if len(parts) == 5 and parts[0] == 'rads' and parts[1] == 'adt' and parts[3].isdigit(): + obs_time = datetime.strptime(parts[3], "%Y%j") + timedelta(hours=12) + satellite = parts[2] + receipt_time = datetime.fromtimestamp(os.path.getctime(filename)) + return filename, obs_time, receipt_time, satellite + return None + + def ingest_files(self): + """Scan the directory for new observation files and insert them into the database.""" + obs_files = glob.glob(os.path.join(self.base_dir, "rads_adt_??_???????.nc")) + print(f"Found {len(obs_files)} new files to ingest") + for file in obs_files: + parsed_data = self.parse_filename(file) + if parsed_data: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, satellite) + VALUES (?, ?, ?, ?) + """ + self.insert_record(query, parsed_data) diff --git a/ush/python/pyobsforge/task/marine_prepobs.py b/ush/python/pyobsforge/task/marine_prepobs.py index dd398cf7..7e184d14 100644 --- a/ush/python/pyobsforge/task/marine_prepobs.py +++ b/ush/python/pyobsforge/task/marine_prepobs.py @@ -2,10 +2,13 @@ from logging import getLogger from typing import Dict, Any -from wxflow import AttrDict, Task, add_to_datetime, to_timedelta, logit -from pyobsforge.obsdb.ghrsst_db import GhrSstDatabase -from multiprocessing import Process -from pyobsforge.task.run_nc2ioda import run_nc2ioda +from wxflow import AttrDict, Task, add_to_datetime, to_timedelta, logit, FileHandler +from pyobsforge.task.providers import ProviderConfig +from multiprocessing import Process, Manager +from os.path import join +from datetime import timedelta +import glob +from os.path import basename logger = getLogger(__name__.split('.')[-1]) @@ -24,131 +27,141 @@ def __init__(self, config: Dict[str, Any]) -> None: { 'window_begin': _window_begin, 'window_end': _window_end, - 'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", - 'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z." + 'PREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.", } ) # task_config is everything that this task should need self.task_config = AttrDict(**self.task_config, **local_dict) - # Initialize the GHRSST database - self.ghrsst_db = GhrSstDatabase(db_name="sst_obs.db", - dcom_dir=self.task_config.DCOMROOT, - obs_dir="sst") + # Initialize the Providers + self.ghrsst = ProviderConfig.from_task_config("ghrsst", self.task_config) + self.rads = ProviderConfig.from_task_config("rads", self.task_config) + + # Initialize the list of processed ioda files + # TODO: Does not work. This should be a list of gathered ioda files that are created + # across all processes + self.ioda_files = [] @logit(logger) def initialize(self) -> None: """ """ # Update the database with new files - self.ghrsst_db.ingest_files() + self.ghrsst.db.ingest_files() + self.rads.db.ingest_files() @logit(logger) def execute(self) -> None: """ """ - processes = [] - for provider, obs_spaces in self.task_config.providers.items(): - logger.info(f"========= provider: {provider}") - - # Get the obs space QC configuration - bounds_min = obs_spaces["qc config"]["min"] - bounds_max = obs_spaces["qc config"]["max"] - binning_stride = obs_spaces["qc config"]["stride"] - binning_min_number_of_obs = obs_spaces["qc config"]["min number of obs"] - - # Process each obs space - for obs_space in obs_spaces["list"]: - logger.info(f"========= obs_space: {obs_space}") - # extract the instrument and platform from the obs_space - obs_type, instrument, platform, proc_level = obs_space.split("_") - platform = platform.upper() - instrument = instrument.upper() - logger.info(f"Processing {platform.upper()} {instrument.upper()}") - - # Start a new process - process = Process(target=self.process_obs_space, - args=(provider, obs_space, instrument, platform, - bounds_min, bounds_max, - binning_stride, binning_min_number_of_obs)) - process.start() - processes.append(process) - - # Wait for all processes to complete - completed_processes = [] - for process in processes: - process.join() - completed_processes.append(process) - logger.info(f"completed processes: {completed_processes}") + with Manager() as manager: + # Use a Manager list to share ioda_files across processes + shared_ioda_files = manager.list() + + processes = [] + for provider, obs_spaces in self.task_config.providers.items(): + logger.info(f"========= provider: {provider}") + for obs_space in obs_spaces["list"]: + logger.info(f"========= obs_space: {obs_space}") + + # Start a new process + process = Process(target=self.process_obs_space, + args=(provider, obs_space, shared_ioda_files)) + process.start() + processes.append(process) + + # Wait for all processes to complete + for process in processes: + process.join() + + # Convert the Manager list to a regular list + self.ioda_files = list(shared_ioda_files) + logger.info(f"Final ioda_files: {self.ioda_files}") @logit(logger) def process_obs_space(self, provider: str, obs_space: str, - instrument: str, - platform: str, - bounds_min: float, - bounds_max: float, - binning_stride: float, - binning_min_number_of_obs: int) -> None: + shared_ioda_files) -> None: + output_file = f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.{obs_space}.tm00.nc" + + # Process GHRSST if provider == "ghrsst": - return self.process_ghrsst(provider, obs_space, instrument, platform, - bounds_min, bounds_max, binning_stride, binning_min_number_of_obs) + parts = obs_space.split("_") + instrument = parts[1].upper() + platform = parts[2].upper() + + # Process the observation space + kwargs = { + 'provider': provider, + 'obs_space': obs_space, + 'instrument': instrument, + 'platform': platform, + 'obs_type': "SSTsubskin", + 'output_file': output_file, + 'window_begin': self.task_config.window_begin, + 'window_end': self.task_config.window_end, + 'task_config': self.task_config + } + result = self.ghrsst.process_obs_space(**kwargs) + return result + + # Process RADS + if provider == "rads": + platform = obs_space.split("_")[2] + instrument = None + # TODO(G): Get the window size from the config + window_begin = self.task_config.window_begin - timedelta(hours=72) + window_end = self.task_config.window_begin + timedelta(hours=72) + kwargs = { + 'provider': provider, + 'obs_space': obs_space, + 'instrument': instrument, + 'platform': platform, + 'obs_type': "", + 'output_file': output_file, + 'window_begin': window_begin, + 'window_end': window_end, + 'task_config': self.task_config + } + result = self.rads.process_obs_space(**kwargs) + return result else: logger.error(f"Provider {provider} not supported") - @logit(logger) - def process_ghrsst(self, - provider: str, - obs_space: str, - instrument: str, - platform: str, - bounds_min: float, - bounds_max: float, - binning_stride: float, - binning_min_number_of_obs: int) -> None: - """ - Process a single observation space by querying the database for valid files, - copying them to the appropriate directory, and running the ioda converter. - - Args: - provider (str): The data provider name. - obs_space (str): The observation space identifier. - instrument (str): The instrument used for the observations. - platform (str): The satellite platform name. - bounds_min (float): Minimum QC bound for observations. - bounds_max (float): Maximum QC bound for observations. - binning_stride (float): Stride value for binning observations. - binning_min_number_of_obs (int): Minimum number of observations required for binning. - """ - # Query the database for valid files - input_files = self.ghrsst_db.get_valid_files(window_begin=self.task_config.window_begin, - window_end=self.task_config.window_end, - dst_dir=obs_space, - instrument=instrument, - satellite=platform, - obs_type="SSTsubskin") - logger.info(f"number of valid files: {len(input_files)}") - - # Process the observations if the obs space is not empty - if len(input_files) > 0: - # Configure the ioda converter - output_file = f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.{obs_space}.tm00.nc" - context = {'provider': provider.upper(), - 'window_begin': self.task_config.window_begin, - 'window_end': self.task_config.window_end, - 'bounds_min': bounds_min, - 'bounds_max': bounds_max, - 'binning_stride': binning_stride, - 'binning_min_number_of_obs': binning_min_number_of_obs, - 'input_files': input_files, - 'output_file': output_file} - result = run_nc2ioda(self.task_config, obs_space, context) - logger.info(f"run_nc2ioda result: {result}") - @logit(logger) def finalize(self) -> None: """ """ - logger.info("finalize") + # Copy the processed ioda files to the destination directory + logger.info("Copying ioda files to destination COMROOT directory") + yyyymmdd = self.task_config['PDY'].strftime('%Y%m%d') + + comout = join(self.task_config['COMROOT'], + self.task_config['PSLOT'], + f"{self.task_config['RUN']}.{yyyymmdd}", + f"{self.task_config['cyc']:02d}", + 'ocean') + + # Loop through the observation types + obs_types = ['sst', 'adt', 'icec', 'sss'] + src_dst_obs_list = [] # list of [src_file, dst_file] + for obs_type in obs_types: + # Create the destination directory + comout_tmp = join(comout, obs_type) + FileHandler({'mkdir': [comout_tmp]}).sync() + + # Glob the ioda files + ioda_files = glob.glob(join(self.task_config['DATA'], + f"{self.task_config['PREFIX']}*{obs_type}_*.nc")) + for ioda_file in ioda_files: + logger.info(f"ioda_file: {ioda_file}") + src_file = ioda_file + dst_file = join(comout_tmp, basename(ioda_file)) + src_dst_obs_list.append([src_file, dst_file]) + + logger.info("Copying ioda files to destination COMROOT directory") + logger.info(f"src_dst_obs_list: {src_dst_obs_list}") + + FileHandler({'copy': src_dst_obs_list}).sync() diff --git a/ush/python/pyobsforge/task/providers.py b/ush/python/pyobsforge/task/providers.py new file mode 100644 index 00000000..2dd61077 --- /dev/null +++ b/ush/python/pyobsforge/task/providers.py @@ -0,0 +1,127 @@ +from logging import getLogger +from pyobsforge.obsdb.ghrsst_db import GhrSstDatabase +from pyobsforge.obsdb.rads_db import RADSDatabase +from typing import Any +from dataclasses import dataclass +from wxflow import AttrDict +from pyobsforge.task.run_nc2ioda import run_nc2ioda + +logger = getLogger(__name__.split('.')[-1]) + + +@dataclass +class QCConfig: + bounds_min: float + bounds_max: float + binning_stride: float + binning_min_number_of_obs: int + error_ratio: float + + @classmethod + def from_dict(cls, config: dict) -> "QCConfig": + # Initialize with default values + instance = cls( + bounds_min=0.0, + bounds_max=0.0, + binning_stride=0.0, + binning_min_number_of_obs=0, + error_ratio=0.0 + ) + + # Only set attributes for keys that are defined in config + if "min" in config: + instance.bounds_min = config["min"] + if "max" in config: + instance.bounds_max = config["max"] + if "stride" in config: + instance.binning_stride = config["stride"] + if "min number of obs" in config: + instance.binning_min_number_of_obs = config["min number of obs"] + if "error ratio" in config: + instance.error_ratio = config["error ratio"] + + return instance + + +class ProviderConfig: + def __init__(self, qc_config: QCConfig, db: Any): # Replace `Any` with a more specific type if desired + self.qc_config = qc_config + self.db = db + + @classmethod + def from_task_config(cls, provider_name: str, task_config: AttrDict) -> "ProviderConfig": + qc_raw = task_config.providers[provider_name]["qc config"] + qc = QCConfig.from_dict(qc_raw) + + print(f"@@@@@@@@@@@@@@@@@@@@@@@@ provider: {provider_name}") + + if provider_name == "ghrsst": + db = GhrSstDatabase(db_name=f"{provider_name}.db", dcom_dir=task_config.DCOMROOT, obs_dir="sst") + elif provider_name == "rads": + db = RADSDatabase(db_name=f"{provider_name}.db", dcom_dir=task_config.DCOMROOT, obs_dir="wgrdbul/adt") + else: + raise NotImplementedError(f"DB setup for provider {provider_name} not yet implemented") + + return cls(qc_config=qc, db=db) + + def process_obs_space(self, **kwargs) -> None: + """ + Process a single observation space by querying the database for valid files, + copying them to the appropriate directory, and running the ioda converter. + + Args: + **kwargs: Keyword arguments including: + provider: Provider name + obs_space: Observation space name + instrument: Instrument name + platform: Platform name + obs_type: Observation type + output_file: Output file path + window_begin: Beginning of time window + window_end: End of time window + task_config: Task configuration + """ + # Extract parameters from kwargs + provider = kwargs.get('provider') + obs_space = kwargs.get('obs_space') + instrument = kwargs.get('instrument') + platform = kwargs.get('platform') + obs_type = kwargs.get('obs_type') + output_file = kwargs.get('output_file') + window_begin = kwargs.get('window_begin') + window_end = kwargs.get('window_end') + task_config = kwargs.get('task_config') + + # Query the database for valid files + input_files = self.db.get_valid_files(window_begin=window_begin, + window_end=window_end, + dst_dir=obs_space, + instrument=instrument, + satellite=platform, + obs_type=obs_type) + logger.info(f"number of valid files: {len(input_files)}") + + # Process the observations if the obs space is not empty + if len(input_files) > 0: + # Configure the ioda converter + context = {'provider': provider.upper(), + 'window_begin': window_begin, + 'window_end': window_end, + 'input_files': input_files, + 'output_file': output_file} + + # Only add QC config attributes if they exist + if hasattr(self.qc_config, 'bounds_min'): + context['bounds_min'] = self.qc_config.bounds_min + if hasattr(self.qc_config, 'error_ratio'): + context['error_ratio'] = self.qc_config.error_ratio + if hasattr(self.qc_config, 'bounds_max'): + context['bounds_max'] = self.qc_config.bounds_max + if hasattr(self.qc_config, 'binning_stride'): + context['binning_stride'] = self.qc_config.binning_stride + if hasattr(self.qc_config, 'binning_min_number_of_obs'): + context['binning_min_number_of_obs'] = self.qc_config.binning_min_number_of_obs + result = run_nc2ioda(task_config, obs_space, context) + logger.info(f"run_nc2ioda result: {result}") + else: + logger.warning(f"No valid files found for {obs_space} with {instrument} on {platform}") diff --git a/ush/python/pyobsforge/tests/test_rads_database.py b/ush/python/pyobsforge/tests/test_rads_database.py new file mode 100644 index 00000000..726cd5ce --- /dev/null +++ b/ush/python/pyobsforge/tests/test_rads_database.py @@ -0,0 +1,107 @@ +import os +import glob +import tempfile +import shutil +import sqlite3 +from datetime import datetime, timedelta + +import pytest + +from pyobsforge.obsdb.rads_db import RADSDatabase + + +@pytest.fixture +def temp_obs_dir(): + """Create a temp directory with mock RADS NetCDF files.""" + base_dir = tempfile.mkdtemp() + sub_dir = os.path.join(base_dir, "some_subdir", "wgrdbul", "adt") + os.makedirs(sub_dir) + + # Desired datetime for file timestamps + mock_time = datetime(2025, 3, 16, 11, 0, 0).timestamp() + + # Create mock NetCDF files + filenames = [ + "rads_adt_3a_2025075.nc", + "rads_adt_3b_2025075.nc", + "rads_adt_6a_2025075.nc", + "rads_adt_c2_2025075.nc", + "rads_adt_j3_2025075.nc", + "rads_adt_ncoda_3a_2025075.nc", + "rads_adt_ncoda_3b_2025075.nc", + "rads_adt_ncoda_6a_2025075.nc", + "rads_adt_ncoda_c2_2025075.nc", + "rads_adt_ncoda_j3_2025075.nc", + "rads_adt_ncoda_sa_2025075.nc", + "rads_adt_ncoda_sw_2025075.nc", + "rads_adt_sa_2025075.nc", + "rads_adt_sw_2025075.nc" + "invalid_file.nc" + ] + for fname in filenames: + fname_tmp = os.path.join(sub_dir, fname) + with open(fname_tmp, "w") as f: + f.write("fake content") + os.utime(fname_tmp, (mock_time, mock_time)) # (access_time, modification_time) + + yield base_dir + shutil.rmtree(base_dir) + + +@pytest.fixture +def db(temp_obs_dir): + """Initialize test database.""" + db_path = os.path.join(temp_obs_dir, "rads_test.db") + return RADSDatabase(db_name=db_path, dcom_dir=temp_obs_dir, obs_dir="wgrdbul/adt") + + +def test_create_database(db): + db.create_database() + conn = sqlite3.connect(db.db_name) + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='obs_files'") + assert cursor.fetchone() is not None + conn.close() + + +def test_parse_valid_filename(db): + fname = "rads_adt_j3_2025075.nc" + fname = glob.glob(os.path.join(db.base_dir, fname))[0] + parsed = db.parse_filename(fname) + creation_time = datetime.fromtimestamp(os.path.getctime(fname)) + + assert parsed is not None + assert parsed[0] == fname + assert parsed[1] == datetime(2025, 3, 16, 12, 0) + assert parsed[2] == creation_time + assert parsed[3] == "j3" + + +def test_parse_invalid_filename(db): + assert db.parse_filename("rads_adt_ncoda_sw_2025073.nc") is None + assert db.parse_filename("20250316_invalid_filename.nc") is None + + +def test_ingest_files(db): + db.ingest_files() + conn = sqlite3.connect(db.db_name) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM obs_files") + count = cursor.fetchone()[0] + conn.close() + assert count == 6, "Should ingest 6 valid RADS files" + + +def test_get_valid_files(db): + db.ingest_files() + da_cycle = "20250316120000" + window_begin = datetime.strptime(da_cycle, "%Y%m%d%H%M%S") - timedelta(hours=3) + window_end = datetime.strptime(da_cycle, "%Y%m%d%H%M%S") + timedelta(hours=1) + dst_dir = 'rads' + valid_files = db.get_valid_files(window_begin=window_begin, + window_end=window_end, + dst_dir=dst_dir, + satellite="c2") + + assert any("2025075" in f for f in valid_files) + assert len(valid_files) == 1