diff --git a/modulefiles/obsforge/wcoss2.intel.lua b/modulefiles/obsforge/wcoss2.intel.lua index 23f7d11b..d18634c3 100644 --- a/modulefiles/obsforge/wcoss2.intel.lua +++ b/modulefiles/obsforge/wcoss2.intel.lua @@ -15,20 +15,20 @@ load("stack-python/3.10.13") load("craype/2.7.17") load("cray-pals/1.3.2") load("cmake/3.23.1") -load("gettext/0.19.7") -load("pcre2/10.42") -load("curl/8.4.0") +--load("gettext/0.19.7") +--load("pcre2/10.42") +--load("curl/8.4.0") load("zlib/1.2.13") -load("git/2.35.3") -load("git-lfs/2.11.0") -load("pkg-config/0.29.2") +--load("git/2.35.3") +--load("git-lfs/2.11.0") +--load("pkg-config/0.29.2") load("hdf5/1.14.0") load("parallel-netcdf/1.12.2") load("netcdf-c/4.9.2") load("nccmp/1.9.0.1") load("netcdf-fortran/4.6.1") -load("nco/5.0.6") -load("parallelio/2.5.10") +--load("nco/5.0.6") +--load("parallelio/2.5.10") load("boost/1.83.0") load("bufr/12.0.1") load("ecbuild/3.7.2") @@ -38,22 +38,22 @@ load("openblas/0.3.24") load("eckit/1.24.5") load("fftw/3.3.10") load("fckit/0.11.0") -load("fms/2023.04") -load("esmf/8.5.0") +--load("fms/2023.04") +--load("esmf/8.5.0") load("atlas/0.35.0") -load("sp/2.5.0") +--load("sp/2.5.0") load("gsl-lite/0.37.0") load("libjpeg/2.1.0") load("libpng/1.6.37") -load("libxt/1.1.5") -load("libxmu/1.1.4") -load("libxpm/3.5.12") -load("libxaw/1.0.13") +--load("libxt/1.1.5") +--load("libxmu/1.1.4") +--load("libxpm/3.5.12") +--load("libxaw/1.0.13") load("udunits/2.2.28") -load("ncview/2.1.9") +--load("ncview/2.1.9") load("netcdf-cxx4/4.3.1") -load("core/rocoto/1.3.5") -load("prod_util/2.0.14") +--load("core/rocoto/1.3.5") +--load("prod_util/2.0.14") load("py-setuptools/63.4.3") load("py-jinja2/3.1.2") @@ -61,20 +61,20 @@ load("py-netcdf4/1.5.8") load("py-pybind11/2.11.1") load("py-pycodestyle/2.11.0") load("py-pyyaml/5.4.1") -load("py-scipy/1.10.1") -load("py-xarray/2023.7.0") -load("py-f90nml/1.4.3") -load("py-pip/23.1.2") -load("py-bottleneck/1.3.7") -load("py-numexpr/2.8.4") -load("py-six/1.16.0") +--load("py-scipy/1.10.1") +--load("py-xarray/2023.7.0") +--load("py-f90nml/1.4.3") +--load("py-pip/23.1.2") +--load("py-bottleneck/1.3.7") +--load("py-numexpr/2.8.4") +--load("py-six/1.16.0") load("py-python-dateutil/2.8.2") load("py-pytz/2023.3") -load("py-pandas/1.5.3") +--load("py-pandas/1.5.3") load("py-numpy/1.24.4") -load("py-markupsafe/2.1.3") +--load("py-markupsafe/2.1.3") load("py-cftime/1.0.3.4") -load("py-packaging/23.1") +--load("py-packaging/23.1") setenv("CC","cc") setenv("CXX","CC") diff --git a/parm/config.hera.yaml b/parm/config.hera.yaml index 1dee5c59..4a2e9dc4 100644 --- a/parm/config.hera.yaml +++ b/parm/config.hera.yaml @@ -15,7 +15,7 @@ obsforge: aoddump: provider: VIIRSAOD - platforms: ['npp', 'n20', 'n21'] + platforms: ['npp', 'n20', 'n21'] # note j01==n20 thinning_threshold: 0 channel: 4 preqc: 2 diff --git a/parm/config.hercules.yaml b/parm/config.hercules.yaml index be9ce00c..9e4c336b 100644 --- a/parm/config.hercules.yaml +++ b/parm/config.hercules.yaml @@ -15,7 +15,7 @@ obsforge: aoddump: provider: VIIRSAOD - platforms: ['npp', 'n20', 'n21'] + platforms: ['npp', 'n20', 'n21'] # note j01==n20 thinning_threshold: 0 channel: 4 preqc: 2 diff --git a/parm/config.orion.yaml b/parm/config.orion.yaml index 4c5be6d9..1f58021f 100644 --- a/parm/config.orion.yaml +++ b/parm/config.orion.yaml @@ -15,7 +15,7 @@ obsforge: aoddump: provider: VIIRSAOD - platforms: ['npp', 'n20', 'n21'] + platforms: ['npp', 'j01', 'n21'] # note j01==n20 thinning_threshold: 0 channel: 4 preqc: 2 diff --git a/parm/config.wcoss2.yaml b/parm/config.wcoss2.yaml new file mode 100644 index 00000000..701eeefb --- /dev/null +++ b/parm/config.wcoss2.yaml @@ -0,0 +1,90 @@ +obsforge: + PSLOT: obsforge + HOMEobsforge: /lfs/h2/emc/da/noscrub/cory.r.martin/obsforge/obsforge/ + SDATE: 202504250000 + EDATE: 202506250000 + COMROOT: /lfs/h2/emc/ptmp/cory.r.martin/obsforge + DCOMROOT: /lfs/h1/ops/prod/dcom/ + DATAROOT: /lfs/h2/emc/stmp/cory.r.martin/RUNDIRS + SCHEDULER: pbspro + ACCOUNT: GFS-DEV + QUEUE: dev + PARTITION: dev + KEEPDATA: YES + assim_freq: 6 + +aoddump: + provider: VIIRSAOD + platforms: ['npp', 'j01', 'n21'] # note j01==n20 + thinning_threshold: 0 + channel: 4 + preqc: 2 + WALLTIME_AOD_DUMP: '00:10:00' + TASK_GEOM_AOD_DUMP: '1:ppn=1:tpp=1' + MEMORY_AOD_DUMP: 96GB + +marinedump: + providers: + ghrsst: + list: + - sst_viirs_n21_l3u + - sst_viirs_n20_l3u + - sst_viirs_npp_l3u + - sst_avhrrf_ma_l3u + - sst_avhrrf_mb_l3u + - sst_avhrrf_mc_l3u + - sst_ahi_h08_l3c + - sst_abi_g17_l3c + - sst_abi_g16_l3c + qc config: + min: -2.0 + max: 45.0 + stride: 15 + min number of obs: 10 + rads: + list: + - rads_adt_3a + - rads_adt_3b + - rads_adt_6a + - rads_adt_c2 + - rads_adt_j2 + - rads_adt_j3 + - rads_adt_sa + - rads_adt_sw + qc config: + min: -2.0 + max: 3.0 + error ratio: 1.0 + nesdis_amsr2: + list: + - icec_amsr2_north + - icec_amsr2_south + qc config: + min: 0.0 + max: 1.0 + nesdis_mirs: + list: + - icec_amsu_ma1_l2 + - icec_atms_n20_l2 + - icec_atms_n21_l2 + - icec_atms_npp_l2 +# - icec_gmi_gpm_l2 + qc config: + min: 0.0 + max: 1.0 + smap: + list: + - sss_smap_l2 + qc config: + min: 0.1 + max: 40.0 + smos: + list: + - sss_smos_l2 + qc config: + min: 0.1 + max: 40.0 + + WALLTIME_MARINE_DUMP: '00:10:00' + TASK_GEOM_MARINE_DUMP: '1:ppn=20:tpp=2' + MEMORY_MARINE_DUMP: 32GB diff --git a/parm/config.yaml b/parm/config.yaml index 7d8d61b4..c528257c 120000 --- a/parm/config.yaml +++ b/parm/config.yaml @@ -1 +1 @@ -config.hercules.yaml \ No newline at end of file +config.wcoss2.yaml \ No newline at end of file diff --git a/parm/obsforge_rocoto_template_pbspro.xml.j2 b/parm/obsforge_rocoto_template_pbspro.xml.j2 new file mode 100644 index 00000000..bda2dbe1 --- /dev/null +++ b/parm/obsforge_rocoto_template_pbspro.xml.j2 @@ -0,0 +1,137 @@ + + + + + + +]> + + + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H.log + + + {{ SDATE }} {{ EDATE }} 06:00:00 + {{ SDATE }} {{ EDATE }} 06:00:00 + + + + + {{ HOMEobsforge }}/jobs/rocoto/aoddump.sh + + obsforge_gfs_aod_dump_@H + {{ ACCOUNT }} + {{ QUEUE }} + {{ WALLTIME_AOD_DUMP }} + {{ TASK_GEOM_AOD_DUMP }} + {{ MEMORY_AOD_DUMP }} + -l place=vscatter:shared + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H/gfs_aod_dump_prep.log + + RUN_ENVIRemc + HOMEobsforge{{ HOMEobsforge }} + NETgfs + RUNgfs + CDATE@Y@m@d@H + PDY@Y@m@d + cyc@H + KEEPDATA{{ KEEPDATA }} + COMROOT{{ COMROOT }} + DCOMROOT{{ DCOMROOT }} + DATAROOT{{ DATAROOT }}/{{ PSLOT }}/gfs.@Y@m@d@H + + + + + + + + + {{ HOMEobsforge }}/jobs/rocoto/aoddump.sh + + obsforge_gdas_aod_dump_@H + {{ ACCOUNT }} + {{ QUEUE }} + {{ WALLTIME_AOD_DUMP }} + {{ TASK_GEOM_AOD_DUMP }} + {{ MEMORY_AOD_DUMP }} + -l place=vscatter:shared + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H/gdas_aod_dump_prep.log + + RUN_ENVIRemc + HOMEobsforge{{ HOMEobsforge }} + NETgfs + RUNgdas + CDATE@Y@m@d@H + PDY@Y@m@d + cyc@H + KEEPDATA{{ KEEPDATA }} + COMROOT{{ COMROOT }} + DCOMROOT{{ DCOMROOT }} + DATAROOT{{ DATAROOT }}/{{ PSLOT }}/gdas.@Y@m@d@H + + + + + + + + + {{ HOMEobsforge }}/jobs/rocoto/marinedump.sh + + obsforge_gfs_marine_dump_@H + {{ ACCOUNT }} + {{ QUEUE }} + {{ WALLTIME_MARINE_DUMP }} + {{ TASK_GEOM_MARINE_DUMP }} + {{ MEMORY_MARINE_DUMP }} + -l place=vscatter:shared + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H/gfs_marine_dump_prep.log + + RUN_ENVIRemc + HOMEobsforge{{ HOMEobsforge }} + NETgfs + RUNgfs + CDATE@Y@m@d@H + PDY@Y@m@d + cyc@H + KEEPDATA{{ KEEPDATA }} + COMROOT{{ COMROOT }} + DCOMROOT{{ DCOMROOT }} + DATAROOT{{ DATAROOT }}/{{ PSLOT }}/gfs.@Y@m@d@H + + + + + + diff --git a/parm/obsforge_rocoto_template_pbspro_realtime.xml.j2 b/parm/obsforge_rocoto_template_pbspro_realtime.xml.j2 new file mode 100644 index 00000000..4b5644b9 --- /dev/null +++ b/parm/obsforge_rocoto_template_pbspro_realtime.xml.j2 @@ -0,0 +1,100 @@ + + + + + + +]> + + + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H.log + + + 0 */6 * * 2025 * + 0 */6 * * 2025 * + 0 */6 * * 2025 * + + + + + {{ HOMEobsforge }}/jobs/rocoto/aoddump.sh + + obsforge_gcdas_aod_dump_@H + {{ ACCOUNT }} + {{ QUEUE }} + {{ WALLTIME_AOD_DUMP }} + {{ TASK_GEOM_AOD_DUMP }} + {{ MEMORY_AOD_DUMP }} + -l place=vscatter:shared + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H/gcdas_aod_dump_prep.log + + RUN_ENVIRemc + HOMEobsforge{{ HOMEobsforge }} + NETgcdas + RUNgcdas + CDATE@Y@m@d@H + PDY@Y@m@d + cyc@H + KEEPDATA{{ KEEPDATA }} + COMROOT{{ COMROOT }} + DCOMROOT{{ DCOMROOT }} + DATAROOT{{ DATAROOT }}/{{ PSLOT }}/gcdas.@Y@m@d@H + + + + + + + + + {{ HOMEobsforge }}/jobs/rocoto/marinedump.sh + + obsforge_gfs_marine_dump_@H + {{ ACCOUNT }} + {{ QUEUE }} + {{ WALLTIME_MARINE_DUMP }} + {{ TASK_GEOM_MARINE_DUMP }} + {{ MEMORY_MARINE_DUMP }} + -l place=vscatter:shared + + {{ COMROOT }}/{{ PSLOT }}/logs/@Y@m@d@H/gfs_marine_dump_prep.log + + RUN_ENVIRemc + HOMEobsforge{{ HOMEobsforge }} + NETgfs + RUNgfs + CDATE@Y@m@d@H + PDY@Y@m@d + cyc@H + KEEPDATA{{ KEEPDATA }} + COMROOT{{ COMROOT }} + DCOMROOT{{ DCOMROOT }} + DATAROOT{{ DATAROOT }}/{{ PSLOT }}/gfs.@Y@m@d@H + + + + + + diff --git a/ush/python/pyobsforge/obsdb/ghrsst_db.py b/ush/python/pyobsforge/obsdb/ghrsst_db.py index 17c3c807..efb4bda6 100644 --- a/ush/python/pyobsforge/obsdb/ghrsst_db.py +++ b/ush/python/pyobsforge/obsdb/ghrsst_db.py @@ -62,19 +62,19 @@ def ingest_files(self): obs_files = ospo_files + star_files print(f"Found {len(obs_files)} new files to ingest") - # Counter for successful ingestions - ingested_count = 0 - + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: - query = """ - INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type) - VALUES (?, ?, ?, ?, ?, ?) - """ - try: - self.insert_record(query, parsed_data) - ingested_count += 1 - except Exception as e: - print(f"Failed to insert record for {file}: {e}") - print(f"################################ Successfully ingested {ingested_count} files into the database.") + records_to_insert.append(parsed_data) + + if records_to_insert: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type) + VALUES (?, ?, ?, ?, ?, ?) + """ + try: + self.insert_records(query, records_to_insert) + print(f"################################ Successfully ingested {len(records_to_insert)} files into the database.") + except Exception as e: + print(f"Failed to insert records: {e}") diff --git a/ush/python/pyobsforge/obsdb/jrr_aod_db.py b/ush/python/pyobsforge/obsdb/jrr_aod_db.py index d894ea10..3a14b3a3 100644 --- a/ush/python/pyobsforge/obsdb/jrr_aod_db.py +++ b/ush/python/pyobsforge/obsdb/jrr_aod_db.py @@ -56,14 +56,19 @@ def ingest_files(self): """Scan the directory for new JRR-AOD observation files and insert them into the database.""" obs_files = glob.glob(os.path.join(self.base_dir, "*.nc")) print(f"Found {len(obs_files)} new files to ingest") + + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: - query = """ - INSERT INTO obs_files (filename, obs_time, receipt_time, satellite) - VALUES (?, ?, ?, ?) - """ - self.insert_record(query, parsed_data) + records_to_insert.append(parsed_data) + + if records_to_insert: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, satellite) + VALUES (?, ?, ?, ?) + """ + self.insert_records(query, records_to_insert) if __name__ == "__main__": diff --git a/ush/python/pyobsforge/obsdb/nesdis_amsr2_db.py b/ush/python/pyobsforge/obsdb/nesdis_amsr2_db.py index 1debdd41..c44c03b1 100644 --- a/ush/python/pyobsforge/obsdb/nesdis_amsr2_db.py +++ b/ush/python/pyobsforge/obsdb/nesdis_amsr2_db.py @@ -79,22 +79,23 @@ def parse_filename(self, filename): return None def ingest_files(self): - """Scan the directory for new observation files and insert them into the database.""" + """Scan the directory for new NESDIS AMSR2 observation files and insert them into the database.""" obs_files = glob.glob(os.path.join(self.base_dir, "*.nc")) + print(f"Found {len(obs_files)} new files to ingest") - # Counter for successful ingestions - ingested_count = 0 - + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: - query = """ - INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type) - VALUES (?, ?, ?, ?, ?, ?) - """ - try: - self.insert_record(query, parsed_data) - ingested_count += 1 - except Exception as e: - print(f"[DEBUG] Failed to insert record for {file}: {e}") - print(f"################################ Successfully ingested {ingested_count} files into the database.") + records_to_insert.append(parsed_data) + + if records_to_insert: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type) + VALUES (?, ?, ?, ?, ?, ?) + """ + try: + self.insert_records(query, records_to_insert) + print(f"################################ Successfully ingested {len(records_to_insert)} files into the database.") + except Exception as e: + print(f"Failed to insert records: {e}") diff --git a/ush/python/pyobsforge/obsdb/nesdis_mirs_db.py b/ush/python/pyobsforge/obsdb/nesdis_mirs_db.py index 0f984543..5228c923 100644 --- a/ush/python/pyobsforge/obsdb/nesdis_mirs_db.py +++ b/ush/python/pyobsforge/obsdb/nesdis_mirs_db.py @@ -79,26 +79,29 @@ def parse_filename(self, filename): return None def ingest_files(self): + """Scan the directory for new NESDIS MIRS observation files and insert them into the database.""" obs_files = [] for base in self.base_dir: matched = glob.glob(os.path.join(base, "*.nc")) obs_files.extend(matched) - ingested_count = 0 + print(f"[INFO] Found {len(obs_files)} new files to ingest") + + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) - if not parsed_data: + if parsed_data: + records_to_insert.append(parsed_data) + else: print(f"[WARN] Skipped (unparseable): {os.path.basename(file)}") - continue + if records_to_insert: query = """ INSERT INTO obs_files (filename, obs_time, receipt_time, instrument, satellite, obs_type) VALUES (?, ?, ?, ?, ?, ?) """ try: - self.insert_record(query, parsed_data) - ingested_count += 1 + self.insert_records(query, records_to_insert) + print(f"[INFO] Successfully ingested {len(records_to_insert)} files into the database.") except Exception as e: - print(f"[ERROR] Failed to insert {file}: {e}") - - print(f"[INFO] Successfully ingested {ingested_count} files into the database.") + print(f"[ERROR] Failed to insert records: {e}") diff --git a/ush/python/pyobsforge/obsdb/obsdb.py b/ush/python/pyobsforge/obsdb/obsdb.py index fa92d311..ba17a947 100644 --- a/ush/python/pyobsforge/obsdb/obsdb.py +++ b/ush/python/pyobsforge/obsdb/obsdb.py @@ -50,6 +50,23 @@ def insert_record(self, query: str, params: tuple) -> None: finally: self.disconnect() + def insert_records(self, query: str, params_list: list[tuple]) -> None: + """ + Insert multiple records into the database. + + :param query: SQL query for inserting records. + :param params_list: List of tuples containing the parameters for each record. + """ + self.connect() + cursor = self.connection.cursor() + try: + cursor.executemany(query, params_list) + self.connection.commit() + except sqlite3.IntegrityError: + pass # Skip duplicates + finally: + self.disconnect() + def execute_query(self, query: str, params: tuple = None) -> list: """Execute a query and return the results.""" self.connect() diff --git a/ush/python/pyobsforge/obsdb/rads_db.py b/ush/python/pyobsforge/obsdb/rads_db.py index 9d33554a..7a6912b0 100644 --- a/ush/python/pyobsforge/obsdb/rads_db.py +++ b/ush/python/pyobsforge/obsdb/rads_db.py @@ -50,14 +50,23 @@ def parse_filename(self, filename): return None def ingest_files(self): - """Scan the directory for new observation files and insert them into the database.""" - obs_files = glob.glob(os.path.join(self.base_dir, "rads_adt_??_???????.nc")) + """Scan the directory for new RADS observation files and insert them into the database.""" + obs_files = glob.glob(os.path.join(self.base_dir, "*.nc")) print(f"Found {len(obs_files)} new files to ingest") + + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: - query = """ - INSERT INTO obs_files (filename, obs_time, receipt_time, satellite) + records_to_insert.append(parsed_data) + + if records_to_insert: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, satellite) VALUES (?, ?, ?, ?) - """ - self.insert_record(query, parsed_data) + """ + try: + self.insert_records(query, records_to_insert) + print(f"################################ Successfully ingested {len(records_to_insert)} files into the database.") + except Exception as e: + print(f"Failed to insert records: {e}") diff --git a/ush/python/pyobsforge/obsdb/smap_db.py b/ush/python/pyobsforge/obsdb/smap_db.py index fbe115e4..933a007d 100644 --- a/ush/python/pyobsforge/obsdb/smap_db.py +++ b/ush/python/pyobsforge/obsdb/smap_db.py @@ -68,19 +68,21 @@ def ingest_files(self): obs_files = glob.glob(os.path.join(self.base_dir, "*.h5")) print(f"Found {len(obs_files)} new files to ingest") - # Counter for successful ingestions - ingested_count = 0 - + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: - query = """ - INSERT INTO obs_files (filename, obs_time, receipt_time, satellite, obs_type) - VALUES (?, ?, ?, ?, ?) - """ - try: - self.insert_record(query, parsed_data) - ingested_count += 1 - except Exception as e: - print(f"Failed to insert record for {file}: {e}") - print(f"################################ Successfully ingested {ingested_count} files into the database.") + records_to_insert.append(parsed_data) + else: + print(f"[DEBUG] Skipped (unparseable): {os.path.basename(file)}") + + if records_to_insert: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, satellite, obs_type) + VALUES (?, ?, ?, ?, ?) + """ + try: + self.insert_records(query, records_to_insert) + print(f"################################ Successfully ingested {len(records_to_insert)} files into the database.") + except Exception as e: + print(f"[ERROR] Failed to insert records: {e}") diff --git a/ush/python/pyobsforge/obsdb/smos_db.py b/ush/python/pyobsforge/obsdb/smos_db.py index 3ea89705..f27a3c1f 100644 --- a/ush/python/pyobsforge/obsdb/smos_db.py +++ b/ush/python/pyobsforge/obsdb/smos_db.py @@ -68,19 +68,21 @@ def ingest_files(self): obs_files = glob.glob(os.path.join(self.base_dir, "*.nc")) print(f"Found {len(obs_files)} new files to ingest") - # Counter for successful ingestions - ingested_count = 0 - + records_to_insert = [] for file in obs_files: parsed_data = self.parse_filename(file) if parsed_data: - query = """ - INSERT INTO obs_files (filename, obs_time, receipt_time, satellite, obs_type) - VALUES (?, ?, ?, ?, ?) - """ - try: - self.insert_record(query, parsed_data) - ingested_count += 1 - except Exception as e: - print(f"Failed to insert record for {file}: {e}") - print(f"################################ Successfully ingested {ingested_count} files into the database.") + records_to_insert.append(parsed_data) + else: + print(f"[DEBUG] Skipped (unparseable): {os.path.basename(file)}") + + if records_to_insert: + query = """ + INSERT INTO obs_files (filename, obs_time, receipt_time, satellite, obs_type) + VALUES (?, ?, ?, ?, ?) + """ + try: + self.insert_records(query, records_to_insert) + print(f"################################ Successfully ingested {len(records_to_insert)} files into the database.") + except Exception as e: + print(f"[ERROR] Failed to insert records: {e}") diff --git a/ush/python/pyobsforge/task/aero_prepobs.py b/ush/python/pyobsforge/task/aero_prepobs.py index 9dfe2e3e..43e85326 100644 --- a/ush/python/pyobsforge/task/aero_prepobs.py +++ b/ush/python/pyobsforge/task/aero_prepobs.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 +import glob +import os from logging import getLogger from typing import Dict, Any from wxflow import (AttrDict, Task, add_to_datetime, to_timedelta, - logit) + logit, FileHandler) from pyobsforge.obsdb.jrr_aod_db import JrrAodDatabase from pyobsforge.task.run_nc2ioda import run_nc2ioda @@ -60,7 +62,8 @@ def execute(self) -> None: if len(input_files) > 0: print(f"number of valid files: {len(input_files)}") obs_space = 'jrr_aod' - output_file = f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.{obs_space}.tm00.nc" + platform_out = 'n20' if platform == 'j01' else platform + output_file = f"{self.task_config['RUN']}.t{self.task_config['cyc']:02d}z.viirs_{platform_out}_aod.tm00.nc" context = {'provider': 'VIIRSAOD', 'window_begin': self.task_config.window_begin, 'window_end': self.task_config.window_end, @@ -74,4 +77,34 @@ def execute(self) -> None: def finalize(self) -> None: """ """ - print("finalize") + # Copy the processed ioda files to the destination directory + logger.info("Copying ioda files to destination COMROOT directory") + yyyymmdd = self.task_config['PDY'].strftime('%Y%m%d') + + comout = os.path.join(self.task_config['COMROOT'], + self.task_config['PSLOT'], + f"{self.task_config['RUN']}.{yyyymmdd}", + f"{self.task_config['cyc']:02d}", + 'chem') + + # Loop through the observation types + obs_types = ['viirs'] + src_dst_obs_list = [] # list of [src_file, dst_file] + for obs_type in obs_types: + # Create the destination directory + comout_tmp = os.path.join(comout, obs_type) + FileHandler({'mkdir': [comout_tmp]}).sync() + + # Glob the ioda files + ioda_files = glob.glob(os.path.join(self.task_config['DATA'], + f"{self.task_config['OPREFIX']}*{obs_type}_*.nc")) + for ioda_file in ioda_files: + logger.info(f"ioda_file: {ioda_file}") + src_file = ioda_file + dst_file = os.path.join(comout_tmp, os.path.basename(ioda_file)) + src_dst_obs_list.append([src_file, dst_file]) + + logger.info("Copying ioda files to destination COMROOT directory") + logger.info(f"src_dst_obs_list: {src_dst_obs_list}") + + FileHandler({'copy': src_dst_obs_list}).sync()