Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/pytest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,3 @@ jobs:
run: |
source obsdb/bin/activate
pytest ush/python/pyobsforge/tests/ --disable-warnings -v
pytest scripts/tests/ --disable-warnings -v
4 changes: 4 additions & 0 deletions parm/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ obsforge:
assim_freq: 6

aoddump:
provider: VIIRSAOD
platforms: ['npp', 'n20', 'n21']
thinning_threshold: 0
channel: 4
preqc: 2
WALLTIME_AOD_DUMP: '00:30:00'
TASK_GEOM_AOD_DUMP: '1:ppn=1:tpp=1'
MEMORY_AOD_DUMP: 96GB
Expand Down
12 changes: 12 additions & 0 deletions parm/nc2ioda/nc2ioda.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@ binning:
stride: {{ binning_stride }}
min number of obs: {{ binning_min_number_of_obs }}
{% endif %}
{% if bounds_min is defined and bounds_max is defined %}
bounds:
min: {{ bounds_min }}
max: {{ bounds_max }}
{% endif %}
output file: {{ output_file }}
{% if ocean_basin is defined %}
ocean basin: {{ ocean_basin }}
{% endif %}
input files: {{ input_files }}

# Used in JRR_AOD
{% if provider == "VIIRSAOD"%}
variable: aerosolOpticalDepth
thinning:
threshold: {{ thinning_threshold }}
channel: 4
preqc: 2
{% endif %}

8 changes: 4 additions & 4 deletions scripts/exobsforge_global_aod_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
config = AttrDict(**config_env, **obsforge_dict)
config = AttrDict(**config, **config_yaml['aoddump'])

AeroObs = AerosolObsPrep(config)
AeroObs.initialize()
AeroObs.execute()
AeroObs.finalize()
aeroObs = AerosolObsPrep(config)
aeroObs.initialize()
aeroObs.execute()
aeroObs.finalize()
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ def script_env(tmp_path):
# Set environment vars expected by the script
env = os.environ.copy()
env["cyc"] = "0"
env["current_cycle"] = "20250301500"
env["PDY"] = "20250315"
env["current_cycle"] = "20250301600"
env["PDY"] = "20250316"
env["RUN"] = "gdas"
return env

Expand Down Expand Up @@ -60,24 +60,28 @@ def test_run_exobsforge_script(script_env):
create_dcom(output_root=os.getenv("DCOMROOT"),
dcom_tree_file=Path(__file__).parent / "dcom_tree.txt")

# Run the script using subprocess
exec = Path(__file__).parent.parent / "exobsforge_global_marine_dump.py"
cwd = Path(__file__).parent / "tests_output/RUNDIRS/obsforge"
result = subprocess.run(
["python3", exec],
cwd=cwd,
env=env,
capture_output=True,
text=True
)

# Print the standard output
print("Standard Output:")
print(result.stdout)

# Optionally, print the standard error
print("Standard Error:")
print(result.stderr)

# Basic assertions
assert result.returncode == 0
# List of scripts to run
scripts = [
"exobsforge_global_marine_dump.py",
"exobsforge_global_aod_dump.py"
]

for script_name in scripts:
# Run each script using subprocess
exec = Path(__file__).parent.parent / script_name
cwd = Path(__file__).parent / "tests_output/RUNDIRS/obsforge"
result = subprocess.run(
["python3", exec],
cwd=cwd,
env=env,
capture_output=True,
text=True
)

print(f"Standard Output for {script_name}:")
print(result.stdout)
print(f"Standard Error for {script_name}:")
print(result.stderr)

# Basic assertions
assert result.returncode == 0
2 changes: 1 addition & 1 deletion ush/python/pyobsforge/obsdb/ghrsst_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def parse_filename(self, filename):

def ingest_files(self):
"""Scan the directory for new observation files and insert them into the database."""
obs_files = glob.glob(os.path.join(self.base_dir, "*.nc"))
obs_files = glob.glob(os.path.join(self.base_dir, "*-OSPO-L3U_GHRSST-*.nc"))
print(f"Found {len(obs_files)} new files to ingest")
for file in obs_files:
parsed_data = self.parse_filename(file)
Expand Down
29 changes: 25 additions & 4 deletions ush/python/pyobsforge/obsdb/obsdb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from logging import getLogger
import sqlite3
from datetime import datetime, timedelta
from wxflow.sqlitedb import SQLiteDB
from wxflow import FileHandler
from os.path import basename, join

logger = getLogger(__name__.split('.')[-1])


class BaseDatabase(SQLiteDB):
Expand Down Expand Up @@ -57,21 +62,25 @@ def execute_query(self, query: str, params: tuple = None) -> list:
def get_valid_files(self,
window_begin: datetime,
window_end: datetime,
dst_dir: str,
instrument: str = None,
satellite: str = None,
obs_type: str = None,
check_receipt: str = "none") -> list:
"""
Retrieve a list of observation files within a specified time window, possibly filtered by instrument,
satellite, and observation type. The check_receipt parameter can be 'gdas', 'gfs', or 'none'.
Retrieve and copy to dst_dir a list of observation files within a specified time window, possibly filtered by instrument,
satellite, and observation type. The check_receipt parameter can be 'gdas', 'gfs', or 'none'. If 'gdas' or
'gfs' is specified, files are further filtered based on their receipt time to ensure they meet the
required delay criteria.

:param window_begin: Start of the time window (datetime object).
:param window_end: End of the time window (datetime object).
:param dst_dir: Destination directory where valid files will be copied.
:param instrument: (Optional) Filter by instrument name.
:param satellite: (Optional) Filter by satellite name.
:param obs_type: (Optional) Filter by observation type.
:param check_receipt: (Optional) Specify receipt time check ('gdas', 'gfs', or 'none').
:return: List of valid observation file names.
:return: List of valid observation file paths in the destination directory.
"""

query = """
Expand Down Expand Up @@ -106,4 +115,16 @@ def get_valid_files(self,

valid_files.append(filename)

return valid_files
# Copy files to the destination directory
dst_files = []
if len(valid_files) > 0:
src_dst_obs_list = [] # list of [src_file, dst_file]
for src_file in valid_files:
dst_file = join(dst_dir, f"{basename(src_file)}")
dst_files.append(dst_file)
logger.info(f"copying {src_file} to {dst_file}")
src_dst_obs_list.append([src_file, dst_file])
FileHandler({'mkdir': [dst_dir]}).sync()
FileHandler({'copy': src_dst_obs_list}).sync()

return dst_files
31 changes: 29 additions & 2 deletions ush/python/pyobsforge/task/aero_prepobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from wxflow import (AttrDict, Task, add_to_datetime, to_timedelta,
logit)
from pyobsforge.obsdb.jrr_aod_db import JrrAodDatabase
from pyobsforge.task.run_nc2ioda import run_nc2ioda

logger = getLogger(__name__.split('.')[-1])

Expand All @@ -31,17 +33,42 @@ def __init__(self, config: Dict[str, Any]) -> None:
# task_config is everything that this task should need
self.task_config = AttrDict(**self.task_config, **local_dict)

# Initialize the JRR_AOD database
self.jrr_aod_db = JrrAodDatabase(db_name="jrr_aod_obs.db",
dcom_dir=self.task_config.DCOMROOT,
obs_dir="jrr_aod")

@logit(logger)
def initialize(self) -> None:
"""
"""
print("initialize")
# Update the database with new files
self.jrr_aod_db.ingest_files()

@logit(logger)
def execute(self) -> None:
"""
"""
print("execute")
for platform in self.task_config.platforms:
print(f"========= platform: {platform}")
input_files = self.jrr_aod_db.get_valid_files(window_begin=self.task_config.window_begin,
window_end=self.task_config.window_end,
dst_dir='jrr_aod',
satellite=platform)
logger.info(f"number of valid files: {len(input_files)}")

if len(input_files) > 0:
print(f"number of valid files: {len(input_files)}")
obs_space = 'jrr_aod'
output_file = f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.{obs_space}.tm00.nc"
context = {'provider': 'VIIRSAOD',
'window_begin': self.task_config.window_begin,
'window_end': self.task_config.window_end,
'thinning_threshold': 0,
'input_files': input_files,
'output_file': output_file}
result = run_nc2ioda(self.task_config, obs_space, context)
logger.info(f"run_nc2ioda result: {result}")

@logit(logger)
def finalize(self) -> None:
Expand Down
47 changes: 8 additions & 39 deletions ush/python/pyobsforge/task/marine_prepobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

from logging import getLogger
from typing import Dict, Any
import subprocess
from wxflow import (AttrDict, Task, add_to_datetime, to_timedelta,
logit, FileHandler, Jinja, save_as_yaml)
from wxflow import AttrDict, Task, add_to_datetime, to_timedelta, logit
from pyobsforge.obsdb.ghrsst_db import GhrSstDatabase
from os.path import basename, join
from multiprocessing import Process
from pyobsforge.task.run_nc2ioda import run_nc2ioda

logger = getLogger(__name__.split('.')[-1])

Expand Down Expand Up @@ -125,26 +123,16 @@ def process_ghrsst(self,
binning_min_number_of_obs (int): Minimum number of observations required for binning.
"""
# Query the database for valid files
valid_files = self.ghrsst_db.get_valid_files(window_begin=self.task_config.window_begin,
input_files = self.ghrsst_db.get_valid_files(window_begin=self.task_config.window_begin,
window_end=self.task_config.window_end,
dst_dir=obs_space,
instrument=instrument,
satellite=platform,
obs_type="SSTsubskin")
logger.info(f"number of valid files: {len(valid_files)}")
logger.info(f"number of valid files: {len(input_files)}")

# Process the observations if the obs space is not empty
if len(valid_files) > 0:
# Copy the valid files to the RUNDIR
src_dst_obs_list = [] # list of [src_file, dst_file]
input_files = [] # list of dst_files used as input to the ioda converter
for src_file in valid_files:
dst_file = f"{obs_space}/{basename(src_file)}"
input_files.append(dst_file)
logger.info(f"copying {src_file} to {dst_file}")
src_dst_obs_list.append([src_file, dst_file])
FileHandler({'mkdir': [obs_space]}).sync()
FileHandler({'copy': src_dst_obs_list}).sync()

if len(input_files) > 0:
# Configure the ioda converter
output_file = f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.{obs_space}.tm00.nc"
context = {'provider': provider.upper(),
Expand All @@ -156,27 +144,8 @@ def process_ghrsst(self,
'binning_min_number_of_obs': binning_min_number_of_obs,
'input_files': input_files,
'output_file': output_file}
jinja_template = join(self.task_config['HOMEobsforge'], "parm", "nc2ioda", "nc2ioda.yaml.j2")
yaml_config = Jinja(jinja_template, context).render
nc2ioda_yaml = join(self.task_config['DATA'], obs_space, f"{obs_space}_nc2ioda.yaml")
save_as_yaml(yaml_config, nc2ioda_yaml)

# Run the ioda converter
nc2ioda_exe = join(self.task_config['HOMEobsforge'], 'build', 'bin', 'obsforge_obsprovider2ioda.x')
try:
result = subprocess.run([nc2ioda_exe, nc2ioda_yaml],
cwd=self.task_config['DATA'],
capture_output=True,
text=True)
logger.info(f"Standard Output: \n{result.stdout}")
# TODO (G): Figure out what to do with failure.
# Ignore failures for now and just issue a warning
if result.returncode != 0:
logger.error(f"Standard Error: \n{result.stderr}")
return 0
except subprocess.CalledProcessError as e:
logger.warning(f"ioda converter failed with error {e}, \
return code {e.returncode}")
result = run_nc2ioda(self.task_config, obs_space, context)
logger.info(f"run_nc2ioda result: {result}")

@logit(logger)
def finalize(self) -> None:
Expand Down
41 changes: 41 additions & 0 deletions ush/python/pyobsforge/task/run_nc2ioda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from logging import getLogger
import subprocess
from wxflow import save_as_yaml, parse_j2yaml
from os.path import join

logger = getLogger(__name__.split('.')[-1])


def run_nc2ioda(task_config: dict, obs_space: str, context: dict) -> int:
"""
Executes the nc2ioda conversion process using a Jinja2 template and a YAML configuration.

Args:
task_config (dict): Configuration dictionary containing paths and settings for the task.
obs_space (str): Observation space identifier used to generate file paths.
context (dict): Context dictionary with variables to render the Jinja2 template.

Returns:
int: Returns 0 upon successful execution. Logs errors and warnings for failures.
"""
jinja_template = join(task_config['HOMEobsforge'], "parm", "nc2ioda", "nc2ioda.yaml.j2")
yaml_config = parse_j2yaml(jinja_template, context)
nc2ioda_yaml = join(task_config['DATA'], obs_space, f"{obs_space}_nc2ioda.yaml")
save_as_yaml(yaml_config, nc2ioda_yaml)

# Run the ioda converter
nc2ioda_exe = join(task_config['HOMEobsforge'], 'build', 'bin', 'obsforge_obsprovider2ioda.x')
try:
result = subprocess.run([nc2ioda_exe, nc2ioda_yaml],
cwd=task_config['DATA'],
capture_output=True,
text=True)
logger.info(f"Standard Output: \n{result.stdout}")
# TODO (G): Figure out what to do with failures.
# Ignore failures for now and just issue a warning
if result.returncode != 0:
logger.error(f"Standard Error: \n{result.stderr}")
return 0
except subprocess.CalledProcessError as e:
logger.warning(f"ioda converter failed with error {e}, \
return code {e.returncode}")
Loading