Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions parm/config.orion.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
obsforge:
PSLOT: obsforge
HOMEobsforge: /work/noaa/da/gvernier/runs/obsForge
HOMEobsforge: /work2/noaa/da/mchoi3/temp/obsForge
SDATE: 202503141800
EDATE: 202503150000
COMROOT: /work/noaa/da/gvernier/runs/test_obsforge/COMROOT
COMROOT: /work2/noaa/da/mchoi3/temp/test_obsForge/COMROOT
DCOMROOT: /work2/noaa/da/common/lfs/h1/ops/prod/dcom
DATAROOT: /work/noaa/da/gvernier/runs/test_obsforge/RUNDIRS
DATAROOT: /work2/noaa/da/mchoi3/temp/test_obsForge/RUNDIRS
SCHEDULER: slurm
ACCOUNT: da-cpu
QUEUE: debug
PARTITION: orion
PARTITION: hercules
Comment thread
apchoiCMD marked this conversation as resolved.
Outdated
KEEPDATA: NO
assim_freq: 6

Expand Down Expand Up @@ -55,6 +55,13 @@ marinedump:
min: -2.0
max: 3.0
error ratio: 1.0
amsr2:
list:
- seaice_gw1_nh
- seaice_gw1_sh
qc config:
min: 0.0
max: 1.0

WALLTIME_MARINE_DUMP: '00:10:00'
TASK_GEOM_MARINE_DUMP: '1:ppn=20:tpp=2'
Expand Down
86 changes: 86 additions & 0 deletions ush/python/pyobsforge/obsdb/amsr2_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os
import glob
from datetime import datetime
from pyobsforge.obsdb import BaseDatabase


class Amsr2Database(BaseDatabase):
"""Class to manage an observation file database for data assimilation."""

def __init__(self, db_name="amsr2.db",
dcom_dir="/lfs/h1/ops/prod/dcom/",
obs_dir="seaice/pda"):
base_dir = os.path.join(dcom_dir, '*', obs_dir)
super().__init__(db_name, base_dir)

def create_database(self):
"""
Create the SQLite database and observation files table.

This method initializes the database with a table named `obs_files` to store metadata
about observation files. The table contains the following columns:

- `id`: A unique identifier for each record (auto-incremented primary key).
- `filename`: The full path to the observation file (must be unique).
- `obs_time`: The timestamp of the observation, extracted from the filename.
- `receipt_time`: The timestamp when the file was added to the `dcom` directory.
- `instrument`: The instrument used to collect the observation (e.g., AMSR2).
- `satellite`: The satellite from which the observation was collected (e.g., GW1).
- `obs_type`: The type of observation (e.g., SEAICE)

The table is created if it does not already exist.
"""
query = """
CREATE TABLE IF NOT EXISTS obs_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE,
obs_time TIMESTAMP,
receipt_time TIMESTAMP,
instrument TEXT,
satellite TEXT,
obs_type TEXT
)
"""
self.execute_query(query)

def parse_filename(self, filename):
# Example filename:
# AMSR2-SEAICE-NH_v2r2_GW1_s202503140032240_e202503140211220_c202503140245560.nc
parts = os.path.basename(filename).replace('_', '-').split('-')
try:
if len(parts) >= 8 and parts[0] == 'AMSR2':
instrument = parts[0]
obs_type = parts[1]
satellite = parts[4]
obs_time_str = parts[5][1:16]
receipt_time_str = parts[7].split('.')[0][1:16]

obs_time = datetime.strptime(obs_time_str, "%Y%m%d%H%M%S%f")
receipt_time = datetime.strptime(receipt_time_str, "%Y%m%d%H%M%S%f")
return filename, obs_time, receipt_time, instrument, satellite, obs_type
except Exception as e:
print(f"[DEBUG] Error parsing filename {filename}: {e}")
Comment thread
guillaumevernieres marked this conversation as resolved.
return None

def ingest_files(self):
"""Scan the directory for new observation files and insert them into the database."""
obs_files = glob.glob(os.path.join(self.base_dir, "*.nc"))
print(f"[INFO] Found {len(obs_files)} new files to ingest")
print(f"[INFO] Files found: {obs_files}")

# Counter for successful ingestions
ingested_count = 0

for file in obs_files:
parsed_data = self.parse_filename(file)
if parsed_data:
query = """
INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type)
VALUES (?, ?, ?, ?, ?, ?)
"""
try:
self.insert_record(query, parsed_data)
ingested_count += 1
except Exception as e:
print(f"[DEBUG] Failed to insert record for {file}: {e}")
print(f"################################ Successfully ingested {ingested_count} files into the database.")
5 changes: 4 additions & 1 deletion ush/python/pyobsforge/obsdb/obsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ def get_valid_files(self,
if check_receipt in ["gdas", "gfs"]:
query = "SELECT receipt_time FROM obs_files WHERE filename = ?"
receipt_time = self.execute_query(query, (filename,))[0][0]
receipt_time = datetime.strptime(receipt_time, "%Y-%m-%d %H:%M:%S.%f")
try:
Comment thread
apchoiCMD marked this conversation as resolved.
Outdated
receipt_time = datetime.strptime(receipt_time, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
receipt_time = datetime.strptime(receipt_time, "%Y-%m-%d %H:%M:%S")
if receipt_time <= window_end - timedelta(minutes=minutes_behind_realtime[check_receipt]):
continue

Expand Down
2 changes: 1 addition & 1 deletion ush/python/pyobsforge/obsdb/rads_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class RADSDatabase(BaseDatabase):

def __init__(self, db_name="rads.db",
dcom_dir="/lfs/h1/ops/prod/dcom/",
obs_dir="sst"):
obs_dir="wgrdbul/adt"):
base_dir = os.path.join(dcom_dir, '*', obs_dir)
super().__init__(db_name, base_dir)

Expand Down
79 changes: 79 additions & 0 deletions ush/python/pyobsforge/obsdb/smap_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
import glob
from datetime import datetime
from pyobsforge.obsdb import BaseDatabase


class SmapDatabase(BaseDatabase):
"""Class to manage an observation file database for data assimilation."""

def __init__(self, db_name="smap.db",
dcom_dir="/lfs/h1/ops/prod/dcom/",
obs_dir="wtxtbul/satSSS/SMAP"):
base_dir = os.path.join(dcom_dir, '*', obs_dir)
super().__init__(db_name, base_dir)

def create_database(self):
"""
Create the SQLite database and observation files table.

This method initializes the database with a table named `obs_files` to store metadata
about observation files. The table contains the following columns:

- `id`: A unique identifier for each record (auto-incremented primary key).
- `filename`: The full path to the observation file (must be unique).
- `obs_time`: The timestamp of the observation, extracted from the filename.
- `receipt_time`: The timestamp when the file was added to the `dcom` directory.
- `satellite`: The satellite from which the observation was collected (e.g., GW1).

The table is created if it does not already exist.
"""
query = """
CREATE TABLE IF NOT EXISTS obs_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE,
obs_time TIMESTAMP,
receipt_time TIMESTAMP,
satellite TEXT
)
"""
self.execute_query(query)

def parse_filename(self, filename):
# patten: SMAP_L2B_SSS_NRT_54047_A_20250315T011742.h5
basename = os.path.basename(filename)
parts = basename.split('_')
try:
if basename.startswith("SMAP_L2B_SSS_NRT") and len(parts) >= 7:
Comment thread
apchoiCMD marked this conversation as resolved.
Outdated
satellite = "SMAP"
timestamp_with_ext = parts[6]
timestamp_str = os.path.splitext(timestamp_with_ext)[0]
obs_time = datetime.strptime(timestamp_str, "%Y%m%dT%H%M%S")
receipt_time = datetime.fromtimestamp(os.path.getctime(filename))
return filename, obs_time, receipt_time, satellite

except ValueError as e:
print(f"[DEBUG] Error parsing filename {filename}: {e}")
return None

def ingest_files(self):
"""Scan the directory for new observation files and insert them into the database."""
obs_files = glob.glob(os.path.join(self.base_dir, "*.h5"))
print(f"Found {len(obs_files)} new files to ingest")

# Counter for successful ingestions
ingested_count = 0

for file in obs_files:
parsed_data = self.parse_filename(file)
if parsed_data:
query = """
INSERT INTO obs_files (filename, obs_time, receipt_time, satellite)
VALUES (?, ?, ?, ?)
"""
try:
self.insert_record(query, parsed_data)
ingested_count += 1
except Exception as e:
print(f"Failed to insert record for {file}: {e}")
print(f"################################ Successfully ingested {ingested_count} files into the database.")
78 changes: 78 additions & 0 deletions ush/python/pyobsforge/obsdb/smos_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import os
import glob
from datetime import datetime
from pyobsforge.obsdb import BaseDatabase


class SmosDatabase(BaseDatabase):
"""Class to manage an observation file database for data assimilation."""

def __init__(self, db_name="smos.db",
dcom_dir="/lfs/h1/ops/prod/dcom/",
obs_dir="wtxtbul/satSSS/SMOS"):
base_dir = os.path.join(dcom_dir, '*', obs_dir)
super().__init__(db_name, base_dir)

def create_database(self):
"""
Create the SQLite database and observation files table.

This method initializes the database with a table named `obs_files` to store metadata
about observation files. The table contains the following columns:

- `id`: A unique identifier for each record (auto-incremented primary key).
- `filename`: The full path to the observation file (must be unique).
- `obs_time`: The timestamp of the observation, extracted from the filename.
- `receipt_time`: The timestamp when the file was added to the `dcom` directory.
- `satellite`: The satellite from which the observation was collected (e.g., GW1).

The table is created if it does not already exist.
"""
query = """
CREATE TABLE IF NOT EXISTS obs_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE,
obs_time TIMESTAMP,
receipt_time TIMESTAMP,
satellite TEXT
)
"""
self.execute_query(query)

def parse_filename(self, filename):
# patten: SM_OPER_MIR_OSUDP2_20250315T001156_20250315T010515_700_001_1.nc
basename = os.path.basename(filename)
parts = basename.split('_')
try:
if basename.startswith("SM_OPER_MIR_OSUDP") and len(parts) >= 6:
Comment thread
apchoiCMD marked this conversation as resolved.
Outdated
satellite = "SMOS"
start_time_str = parts[4]
obs_time = datetime.strptime(start_time_str, "%Y%m%dT%H%M%S")
receipt_time = datetime.fromtimestamp(os.path.getctime(filename))
return filename, obs_time, receipt_time, satellite

except ValueError as e:
print(f"[DEBUG] Error parsing filename {filename}: {e}")
return None

def ingest_files(self):
"""Scan the directory for new observation files and insert them into the database."""
obs_files = glob.glob(os.path.join(self.base_dir, "*.nc"))
print(f"Found {len(obs_files)} new files to ingest")

# Counter for successful ingestions
ingested_count = 0

for file in obs_files:
parsed_data = self.parse_filename(file)
if parsed_data:
query = """
INSERT INTO obs_files (filename, obs_time, receipt_time, satellite)
VALUES (?, ?, ?, ?)
"""
try:
self.insert_record(query, parsed_data)
ingested_count += 1
except Exception as e:
print(f"Failed to insert record for {file}: {e}")
print(f"################################ Successfully ingested {ingested_count} files into the database.")
19 changes: 19 additions & 0 deletions ush/python/pyobsforge/task/marine_prepobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, config: Dict[str, Any]) -> None:
# Initialize the Providers
self.ghrsst = ProviderConfig.from_task_config("ghrsst", self.task_config)
self.rads = ProviderConfig.from_task_config("rads", self.task_config)
self.amsr2 = ProviderConfig.from_task_config("amsr2", self.task_config)

# Initialize the list of processed ioda files
# TODO: Does not work. This should be a list of gathered ioda files that are created
Expand All @@ -50,6 +51,7 @@ def initialize(self) -> None:
# Update the database with new files
self.ghrsst.db.ingest_files()
self.rads.db.ingest_files()
self.amsr2.db.ingest_files()

@logit(logger)
def execute(self) -> None:
Expand Down Expand Up @@ -127,6 +129,23 @@ def process_obs_space(self,
}
result = self.rads.process_obs_space(**kwargs)
return result

# Process AMSR2
if provider == "amsr2":
Comment thread
apchoiCMD marked this conversation as resolved.
Outdated
parts = obs_space.split("_")
platform = parts[1].upper()
kwargs = {
'provider': provider,
'obs_space': obs_space,
'platform': platform,
'obs_type': "seaice",
'output_file': output_file,
'window_begin': self.task_config.window_begin,
'window_end': self.task_config.window_end,
'task_config': self.task_config
}
result = self.amsr2.process_obs_space(**kwargs)
return result
else:
logger.error(f"Provider {provider} not supported")

Expand Down
5 changes: 5 additions & 0 deletions ush/python/pyobsforge/task/providers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from logging import getLogger
from pyobsforge.obsdb.ghrsst_db import GhrSstDatabase
from pyobsforge.obsdb.rads_db import RADSDatabase
from pyobsforge.obsdb.amsr2_db import Amsr2Database
from typing import Any
from dataclasses import dataclass
from wxflow import AttrDict
Expand Down Expand Up @@ -59,6 +60,8 @@ def from_task_config(cls, provider_name: str, task_config: AttrDict) -> "Provide
db = GhrSstDatabase(db_name=f"{provider_name}.db", dcom_dir=task_config.DCOMROOT, obs_dir="sst")
elif provider_name == "rads":
db = RADSDatabase(db_name=f"{provider_name}.db", dcom_dir=task_config.DCOMROOT, obs_dir="wgrdbul/adt")
elif provider_name == "amsr2":
db = Amsr2Database(db_name=f"{provider_name}.db", dcom_dir=task_config.DCOMROOT, obs_dir="seaice/pda")
else:
raise NotImplementedError(f"DB setup for provider {provider_name} not yet implemented")

Expand Down Expand Up @@ -92,6 +95,8 @@ def process_obs_space(self, **kwargs) -> None:
window_end = kwargs.get('window_end')
task_config = kwargs.get('task_config')

logger.debug(f"obs_type for provider {provider}: {obs_type}")
Comment thread
apchoiCMD marked this conversation as resolved.
Outdated

# Query the database for valid files
input_files = self.db.get_valid_files(window_begin=window_begin,
window_end=window_end,
Expand Down
Loading