Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ cd build
ctest -R test_obsforge_util
```



# Workflow usage
```console
source ush/of_setup.sh
setup_xml.py --config config.yaml --template obsforge_rocoto_template.xml.j2 --output obsforge.xml
```

load rocoto
#### Note:
To load `rocoto` on WCOSS2:
```
module use /apps/ops/test/nco/modulefiles/core
module load rocoto
Expand Down
61 changes: 61 additions & 0 deletions parm/config.orion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
obsforge:
PSLOT: obsforge
HOMEobsforge: /work/noaa/da/gvernier/runs/obsForge
SDATE: 202503141800
EDATE: 202503150000
COMROOT: /work/noaa/da/gvernier/runs/test_obsforge/COMROOT
DCOMROOT: /work2/noaa/da/common/lfs/h1/ops/prod/dcom
DATAROOT: /work/noaa/da/gvernier/runs/test_obsforge/RUNDIRS
SCHEDULER: slurm
ACCOUNT: da-cpu
QUEUE: debug
PARTITION: orion
KEEPDATA: NO
assim_freq: 6

aoddump:
provider: VIIRSAOD
platforms: ['npp', 'n20', 'n21']
thinning_threshold: 0
channel: 4
preqc: 2
WALLTIME_AOD_DUMP: '00:10:00'
TASK_GEOM_AOD_DUMP: '1:ppn=1:tpp=1'
MEMORY_AOD_DUMP: 96GB

marinedump:
providers:
ghrsst:
list:
- sst_viirs_n21_l3u
- sst_viirs_n20_l3u
- sst_viirs_npp_l3u
- sst_avhrrf_ma_l3u
- sst_avhrrf_mb_l3u
- sst_avhrrf_mc_l3u
- sst_ahi_h08_l3c
- sst_abi_g17_l3c
- sst_abi_g16_l3c
qc config:
min: -2.0
max: 45.0
stride: 15
min number of obs: 10
rads:
list:
- rads_adt_3a
- rads_adt_3b
- rads_adt_6a
- rads_adt_c2
- rads_adt_j2
- rads_adt_j3
- rads_adt_sa
- rads_adt_sw
qc config:
min: -2.0
max: 3.0
error ratio: 1.0

WALLTIME_MARINE_DUMP: '00:10:00'
TASK_GEOM_MARINE_DUMP: '1:ppn=20:tpp=2'
MEMORY_MARINE_DUMP: 32GB
47 changes: 0 additions & 47 deletions parm/config.yaml

This file was deleted.

1 change: 1 addition & 0 deletions parm/config.yaml
13 changes: 12 additions & 1 deletion parm/nc2ioda/nc2ioda.yaml.j2
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
provider: {{ provider }}
window begin: {{ window_begin | to_isotime }}
window end: {{ window_end | to_isotime }}
output file: {{ output_file }}

# Superobbing configuration
{% if binning_stride is defined%}
binning:
stride: {{ binning_stride }}
min number of obs: {{ binning_min_number_of_obs }}
{% endif %}

# Bounds for rudimentary QC
{% if bounds_min is defined and bounds_max is defined %}
bounds:
min: {{ bounds_min }}
max: {{ bounds_max }}
{% endif %}
output file: {{ output_file }}

# Ocean basin flags
{% if ocean_basin is defined %}
ocean basin: {{ ocean_basin }}
{% endif %}
input files: {{ input_files }}

# Error adjustment for re-dated observations
{% if error_ratio is defined %}
error ratio: {{ error_ratio }}
{% endif %}

# Used in JRR_AOD
{% if provider == "VIIRSAOD"%}
variable: aerosolOpticalDepth
Expand Down
2 changes: 1 addition & 1 deletion sorc/bufr-query
Submodule bufr-query updated 40 files
+0 −3 .readthedocs.yaml
+1 −3 core/CMakeLists.txt
+7 −13 core/include/bufr/DataObject.h
+3 −36 core/include/bufr/encoders/Description.h
+0 −124 core/include/bufr/encoders/EncoderBase.h
+6 −4 core/include/bufr/encoders/netcdf/Encoder.h
+9 −112 core/src/encoders/Description.cpp
+0 −420 core/src/encoders/EncoderBase.cpp
+245 −13 core/src/encoders/netcdf/Encoder.cpp
+1 −2 docs/conf.py
+0 −20 docs/python_api.rst
+5 −21 docs/software_architecture.rst
+ docs/uml/BUFR_BigPicture.png
+ docs/uml/BUFR_BigPictureSeq.png
+ docs/uml/BUFR_CreateQuerySet.png
+ docs/uml/BUFR_DataCollection.png
+0 −57 docs/uml/BUFR_EncoderClassDiagram.puml
+ docs/uml/BUFR_QueryClassDiagram.png
+ docs/uml/BUFR_ResultSetGet.png
+ docs/uml/BUFR_Tokenizer.png
+64 −66 docs/yaml.rst
+0 −1 python/CMakeLists.txt
+1 −2 python/bufr/encoders/__init__.py
+0 −1 python/bufr/encoders/zarr/__init__.py
+0 −107 python/bufr/encoders/zarr/encoder.py
+0 −2 python/py_bufr.cpp
+0 −149 python/py_encoder_base.cpp
+1 −212 python/py_encoder_description.cpp
+1 −2 python/py_mpi.cpp
+0 −1 python/py_mpi.h
+0 −6 python/py_parser.cpp
+41 −51 test/CMakeLists.txt
+0 −55 test/testinput/bufrtest_dim_labels_mapping.yaml
+11 −11 test/testinput/bufrtest_python_mpi_test.py
+5 −58 test/testinput/bufrtest_python_test.py
+0 −1 tools/CMakeLists.txt
+1 −2 tools/bufr2netcdf/bufr2netcdf.cpp
+0 −8 tools/bufr2zarr/CMakeLists.txt
+0 −34 tools/bufr2zarr/bufr2zarr.py
+4 −15 tools/build_scripts/bufr_comp.sh
47 changes: 15 additions & 32 deletions ush/python/pyobsforge/obsdb/ghrsst_db.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os
import glob
from datetime import datetime, timedelta
from datetime import datetime
from pyobsforge.obsdb import BaseDatabase


class GhrSstDatabase(BaseDatabase):
"""Class to manage an observation file database for data assimilation."""

def __init__(self, db_name="obs_files.db",
def __init__(self, db_name="ghrsst.db",
dcom_dir="/lfs/h1/ops/prod/dcom/",
obs_dir="sst"):
base_dir = os.path.join(dcom_dir, '*', obs_dir)
Expand Down Expand Up @@ -57,41 +57,24 @@ def parse_filename(self, filename):

def ingest_files(self):
"""Scan the directory for new observation files and insert them into the database."""
obs_files = glob.glob(os.path.join(self.base_dir, "*-OSPO-L3U_GHRSST-*.nc"))
ospo_files = glob.glob(os.path.join(self.base_dir, "*-OSPO-L3?_GHRSST-*.nc"))
star_files = glob.glob(os.path.join(self.base_dir, "*-STAR-L3?_GHRSST-*.nc"))
obs_files = ospo_files + star_files
print(f"Found {len(obs_files)} new files to ingest")

# Counter for successful ingestions
ingested_count = 0

for file in obs_files:
parsed_data = self.parse_filename(file)
if parsed_data:
query = """
INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type)
VALUES (?, ?, ?, ?, ?, ?)
"""
self.insert_record(query, parsed_data)


# Example Usage
if __name__ == "__main__":
db = GhrSstDatabase(db_name="sst_obs.db",
dcom_dir="/home/gvernier/Volumes/hera-s1/runs/realtimeobs/lfs/h1/ops/prod/dcom/",
obs_dir="sst")

# Check for new files
db.ingest_files()

# Query files for a given DA cycle
da_cycle = "20250316000000"
window_begin = datetime.strptime(da_cycle, "%Y%m%d%H%M%S") - timedelta(hours=3)
window_end = datetime.strptime(da_cycle, "%Y%m%d%H%M%S") + timedelta(hours=3)

valid_files = db.get_valid_files(window_begin=window_begin,
window_end=window_end,
instrument="VIIRS",
satellite="NPP",
obs_type="SSTsubskin")

print(f"Found {len(valid_files)} valid files for DA cycle {da_cycle}")
for valid_file in valid_files:
if os.path.exists(valid_file):
print(f"Valid file: {valid_file}")
else:
print(f"File does not exist: {valid_file}")
try:
self.insert_record(query, parsed_data)
ingested_count += 1
except Exception as e:
print(f"Failed to insert record for {file}: {e}")
print(f"################################ Successfully ingested {ingested_count} files into the database.")
3 changes: 0 additions & 3 deletions ush/python/pyobsforge/obsdb/obsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ def get_valid_files(self,
query = "SELECT receipt_time FROM obs_files WHERE filename = ?"
receipt_time = self.execute_query(query, (filename,))[0][0]
receipt_time = datetime.strptime(receipt_time, "%Y-%m-%d %H:%M:%S.%f")
print("receipt time, window end:", receipt_time, window_end)
print("Types - receipt_time:", type(receipt_time), "window_end:", type(window_end))
if receipt_time <= window_end - timedelta(minutes=minutes_behind_realtime[check_receipt]):
continue

Expand All @@ -122,7 +120,6 @@ def get_valid_files(self,
for src_file in valid_files:
dst_file = join(dst_dir, f"{basename(src_file)}")
dst_files.append(dst_file)
logger.info(f"copying {src_file} to {dst_file}")
src_dst_obs_list.append([src_file, dst_file])
FileHandler({'mkdir': [dst_dir]}).sync()
FileHandler({'copy': src_dst_obs_list}).sync()
Expand Down
63 changes: 63 additions & 0 deletions ush/python/pyobsforge/obsdb/rads_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import glob
from datetime import datetime, timedelta
from pyobsforge.obsdb import BaseDatabase


class RADSDatabase(BaseDatabase):
    """Catalogue of RADS `rads_adt_*` observation files kept in a SQLite table.

    Files are discovered with a glob under ``<dcom_dir>/*/<obs_dir>`` and one
    row per file is stored in the ``obs_files`` table.
    """

    def __init__(self, db_name="rads.db",
                 dcom_dir="/lfs/h1/ops/prod/dcom/",
                 obs_dir="sst"):
        """
        Build the glob-style base directory and hand it to the base class.

        Args:
            db_name: Name of the database file passed to ``BaseDatabase``.
            dcom_dir: Root of the dcom tree.
            obs_dir: Sub-directory, below each first-level directory of
                ``dcom_dir``, that holds the files to ingest.
        """
        # NOTE(review): the default "sst" looks like it may have been carried
        # over from the SST database class; confirm the sub-directory that
        # actually holds the rads_adt_* files. Left unchanged so existing
        # callers keep working.
        # The '*' component is a wildcard expanded later by glob in
        # `ingest_files` (assumes BaseDatabase stores it as `self.base_dir`).
        base_dir = os.path.join(dcom_dir, '*', obs_dir)
        super().__init__(db_name, base_dir)

    def create_database(self):
        """
        Create the SQLite database and observation files table.

        This method initializes the database with a table named `obs_files` to store metadata
        about observation files. The table contains the following columns:

        - `id`: A unique identifier for each record (auto-incremented primary key).
        - `filename`: The full path to the observation file (must be unique).
        - `obs_time`: The timestamp of the observation, extracted from the filename.
        - `receipt_time`: The file's ctime as reported by `os.path.getctime`, used as
          the time the file arrived in the `dcom` directory.
        - `satellite`: The satellite from which the observation was collected (e.g., 3a, 3b, j3, ...).

        The table is created if it does not already exist.
        """
        query = """
            CREATE TABLE IF NOT EXISTS obs_files (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT UNIQUE,
                obs_time TIMESTAMP,
                receipt_time TIMESTAMP,
                satellite TEXT
            )
        """
        self.execute_query(query)

    def parse_filename(self, filename):
        """
        Extract metadata from a ``rads_adt_<satellite>_<YYYYDDD>.<ext>`` filename.

        Args:
            filename: Path to the candidate file; only its basename is parsed,
                but the full path is used to read the file's ctime.

        Returns:
            A tuple ``(filename, obs_time, receipt_time, satellite)`` when the
            name matches the expected pattern and carries a valid year and
            day-of-year stamp, otherwise ``None``.
        """
        # Treat the extension separator like a field separator so that
        # "rads_adt_3a_2025073.nc" splits into exactly five fields.
        parts = os.path.basename(filename).replace('.', '_').split('_')
        if len(parts) != 5 or parts[0] != 'rads' or parts[1] != 'adt':
            return None
        if not parts[3].isdigit():
            return None

        try:
            # Year + day-of-year from the name, shifted to 12:00 of that day
            # (presumably the nominal centre of a daily file -- TODO confirm).
            obs_time = datetime.strptime(parts[3], "%Y%j") + timedelta(hours=12)
        except ValueError:
            # All-digit stamp that is not a valid year/day-of-year
            # (e.g. "2025000"): treat it like any other non-matching name
            # instead of aborting the whole ingest.
            return None

        satellite = parts[2]
        receipt_time = datetime.fromtimestamp(os.path.getctime(filename))
        return filename, obs_time, receipt_time, satellite

    def ingest_files(self):
        """Scan the directory for new observation files and insert them into the database."""
        # The statement does not depend on the file, so build it once.
        query = """
            INSERT INTO obs_files (filename, obs_time, receipt_time, satellite)
            VALUES (?, ?, ?, ?)
        """
        obs_files = glob.glob(os.path.join(self.base_dir, "rads_adt_??_???????.nc"))
        # The count is every file matching the pattern, whether or not it is
        # already recorded in the database.
        print(f"Found {len(obs_files)} new files to ingest")
        for file in obs_files:
            parsed_data = self.parse_filename(file)
            if parsed_data:
                self.insert_record(query, parsed_data)
Loading