Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ find_package(oops REQUIRED)
find_package(ioda REQUIRED)

add_subdirectory(preproc)
add_subdirectory(b2i)
add_subdirectory(test)

1 change: 1 addition & 0 deletions utils/b2i/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_subdirectory(test)
33 changes: 33 additions & 0 deletions utils/b2i/argo_ioda_variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import numpy as np
from pyiodaconv import bufr
from b2iconverter.ioda_variables import IODAVariables


class ArgoIODAVariables(IODAVariables):
def __init__(self):
super().__init__()

def build_query(self):
q = super().build_query()
q.add('stationID', '*/WMOP')
q.add('latitude', '*/CLATH')
q.add('longitude', '*/CLONH')
q.add('depth', '*/GLPFDATA/WPRES')
q.add('temp', '*/GLPFDATA/SSTH')
q.add('saln', '*/GLPFDATA/SALNH')
return q

def set_from_query_result(self, r):
super().set_from_query_result(r)
# convert depth in pressure units to meters (rho * g * h)
self.metadata.depth = np.float32(self.metadata.depth.astype(float) * 0.0001)

def filter(self):
TS_mask = self.TemperatureFilter() & self.SalinityFilter()
# Separate ARGO profiles from subpfl tank
# the index for ARGO floats where the second number of the stationID=9
id_mask = [True if str(x)[1] == '9' else False for x in self.metadata.stationID]
mask = TS_mask & id_mask
self.metadata.filter(mask)
self.temp = self.temp[mask]
self.saln = self.saln[mask]
Empty file.
71 changes: 71 additions & 0 deletions utils/b2i/b2iconverter/bufr2ioda_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import json
import yaml
import os
import sys


# configuration file can be either json or yaml
# this config class provides the functions that determine
# the names and the paths of the bufr input and the ioda output files
# these functions can be overridden in the converter

class Bufr2iodaConfig:
def __init__(self, script_name, config_file, platform_description):
self.script_name = script_name
self.platform_description = platform_description

_, file_extension = os.path.splitext(config_file)
if file_extension == ".json":
with open(config_file, "r") as file:
config = json.load(file)
self.read_config(config)
elif file_extension == ".yaml":
with open(config_file, "r") as file:
config = yaml.safe_load(file)
self.read_config(config)
else:
print("Fatal error: Unknown file extension = ", file_extension)
sys.exit(1)

def read_config(self, config):
# Get parameters from configuration
self.data_format = config["data_format"]
self.source = config["source"]
self.data_type = config["data_type"]
self.data_description = config["data_description"]
self.data_provider = config["data_provider"]
self.cycle_type = config["cycle_type"]
self.cycle_datetime = config["cycle_datetime"]
self.dump_dir = config["dump_directory"]
self.ioda_dir = config["ioda_directory"]
self.ocean_basin = config["ocean_basin"]

self.yyyymmdd = self.cycle_datetime[0:8]
self.hh = self.cycle_datetime[8:10]

# General Information
self.converter = 'BUFR to IODA Converter'

def ocean_basin_nc_file_path(self):
return self.ocean_basin

def bufr_filename(self):
return f"{self.cycle_datetime}-{self.cycle_type}.t{self.hh}z.{self.data_format}.tm00.bufr_d"

def bufr_filepath(self):
return os.path.join(self.dump_dir, self.bufr_filename())

def ioda_filename(self):
return f"{self.cycle_type}.t{self.hh}z.insitu_profile_{self.data_format}.{self.cycle_datetime}.nc4"

def ioda_filepath(self):
return os.path.join(self.ioda_dir, self.ioda_filename())

def create_ioda_attributes(self, obsspace, date_range):
obsspace.write_attr('Converter', self.converter)
obsspace.write_attr('source', self.source)
obsspace.write_attr('sourceFiles', self.bufr_filename())
obsspace.write_attr('dataProviderOrigin', self.data_provider)
obsspace.write_attr('description', self.data_description)
obsspace.write_attr('datetimeRange', date_range)
obsspace.write_attr('platformLongDescription', self.platform_description)
133 changes: 133 additions & 0 deletions utils/b2i/b2iconverter/bufr2ioda_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import sys
import numpy as np
import numpy.ma as ma
import os
import time
from datetime import datetime
from pyiodaconv import bufr
from collections import namedtuple
from pyioda import ioda_obs_space as ioda_ospace
from .util import parse_arguments, run_diff
from .bufr2ioda_config import Bufr2iodaConfig
import logging
import tempfile


# the converter takes a configuration class as input,
# creates the logger and provides a method to run the converter
# and to test the result
# for testing purposes a temporary file is written by the logger
# and it is compared to a reference file
# the logger provides simple human readable numbers,
# as well as cryptographic hashes generated from input
# the hashes are deterministic, yet they are supposedly capable
# of detecting an error with high probability

class Bufr2ioda_Converter:
def __init__(self, bufr2ioda_config, ioda_vars, logfile):
ioda_vars.set_ocean_basin_nc_file(bufr2ioda_config.ocean_basin_nc_file_path())
self.bufr2ioda_config = bufr2ioda_config
self.ioda_vars = ioda_vars
self.logfile = logfile
self.setup_logging(bufr2ioda_config.script_name, self.logfile)

def setup_logging(self, script_name, logfile):
self.logger = logging.getLogger(script_name)
self.logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.INFO)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)

self.logger.addHandler(console_handler)

if (logfile):
self.file_handler = logging.FileHandler(logfile)
self.file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('%(message)s')
self.file_handler.setFormatter(file_formatter)

def run(self):
start_time = time.time()

self.logger.debug(f"build_query")
q = self.ioda_vars.build_query()

bufrfile_path = self.bufr2ioda_config.bufr_filepath()
self.logger.debug(f"ExecuteQuery: BUFR file = {bufrfile_path}")
with bufr.File(bufrfile_path) as f:
r = f.execute(q)

# process query results and set ioda variables
self.ioda_vars.set_from_query_result(r)

n_obs = self.ioda_vars.number_of_obs()
self.logger.debug(f"Query result has {n_obs} obs")
if (n_obs == 0):
self.logger.warning(f"No obs! Quitting.")
sys.exit(0)

self.ioda_vars.filter()

n_obs = self.ioda_vars.number_of_obs()
self.logger.debug(f"Filtered result has {n_obs} obs")
if (n_obs == 0):
self.logger.warning(f"No obs! Quitting.")
sys.exit(0)
self.logger.debug(f"Number of temperature obs = {self.ioda_vars.number_of_temp_obs()}")
self.logger.debug(f"Number of salinity obs = {self.ioda_vars.number_of_saln_obs()}")

# set seqNum, PreQC, ObsError, OceanBasin
self.ioda_vars.additional_vars.construct()

iodafile_path = self.bufr2ioda_config.ioda_filepath()
path, fname = os.path.split(iodafile_path)
os.makedirs(path, exist_ok=True)

metadata = self.ioda_vars.metadata

dims = {'Location': np.arange(0, metadata.lat.shape[0])}
obsspace = ioda_ospace.ObsSpace(iodafile_path, mode='w', dim_dict=dims)
self.logger.debug(f"Created IODA file: {iodafile_path}")

date_range = [str(metadata.dateTime.min()), str(metadata.dateTime.max())]
self.logger.debug(f"CreateGlobalAttributes")
self.bufr2ioda_config.create_ioda_attributes(obsspace, date_range)

self.logger.debug(f"ioda_vars.write_to_ioda_file")
self.ioda_vars.write_to_ioda_file(obsspace)

if (self.logfile):
self.logger.addHandler(self.file_handler)
self.ioda_vars.log(self.logger)
if (self.logfile):
self.logger.removeHandler(self.file_handler)

end_time = time.time()
running_time = end_time - start_time
self.logger.debug(f"Total running time: {running_time} seconds")

def test(self, test_file):
with tempfile.NamedTemporaryFile(delete=False, suffix='.log') as temp_log_file:
temp_log_file_name = temp_log_file.name
file_handler = logging.FileHandler(temp_log_file_name)
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('%(message)s')
file_handler.setFormatter(file_formatter)

self.logger.debug(f"TEST: created a temporary log file {temp_log_file_name}")
self.logger.debug(f"TEST: running diff with reference file {test_file}")
self.logger.addHandler(file_handler)

self.ioda_vars.log(self.logger)

result = run_diff(temp_log_file_name, test_file, self.logger)
if result:
self.logger.error(f"TEST ERROR: files are different")
else:
self.logger.info(f"TEST passed: files are identical")

return result
99 changes: 99 additions & 0 deletions utils/b2i/b2iconverter/ioda_addl_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import numpy as np
import sys
from .ocean import OceanBasin
from .util import *


class IODAAdditionalVariables:
def __init__(self, ioda_vars):
self.ioda_vars = ioda_vars
self.ocean = OceanBasin()

def construct(self):
self.seqNum = compute_seq_num(self.ioda_vars.metadata.lon, self.ioda_vars.metadata.lat)
n = len(self.seqNum)
self.PreQC = (np.ma.masked_array(np.full(n, 0))).astype(np.int32)
self.ObsError_temp = \
np.float32(np.ma.masked_array(np.full(n, self.ioda_vars.T_error)))
self.ObsError_saln = \
np.float32(np.ma.masked_array(np.full(n, self.ioda_vars.S_error)))
self.compute_ocean_basin()

def set_ocean_basin_nc_file(self, nc_file_path):
self.ocean.set_ocean_basin_nc_file(nc_file_path)

def compute_ocean_basin(self):
lat = self.ioda_vars.metadata.lat
lon = self.ioda_vars.metadata.lon
self.ocean.read_nc_file()
ob = self.ocean.get_station_basin(lat, lon)
self.OceanBasin = np.array(ob, dtype=np.int32)

def set_temperature_error(self, e):
self.T_error = e

def set_salinity_error(self, e):
self.S_error = e

def write_to_ioda_file(self, obsspace):
self.write_seq_num(obsspace)
self.write_preqc(obsspace, self.ioda_vars.T_name)
self.write_preqc(obsspace, self.ioda_vars.S_name)
self.write_obs_errorT(obsspace)
self.write_obs_errorS(obsspace)
self.write_ocean_basin(obsspace)

def log(self, logger):
self.log_seq_num(logger)
self.log_preqc(logger)
self.log_obs_error_temp(logger)
self.log_obs_error_saln(logger)
self.log_ocean_basin(logger)

#########################################################################

def write_seq_num(self, obsspace):
write_seq_num(obsspace, self.seqNum, self.PreQC.dtype, self.PreQC.fill_value)

# should the long name be "PreQC" + name?
def write_preqc(self, obsspace, name):
obsspace.create_var("PreQC/" + name, dtype=self.PreQC.dtype, fillval=self.PreQC.fill_value) \
.write_attr('long_name', 'PreQC') \
.write_data(self.PreQC)

def write_obs_errorT(self, obsspace):
write_obs_error(obsspace, "ObsError/" + self.ioda_vars.T_name, "degC", self.ObsError_temp)

def write_obs_errorS(self, obsspace):
write_obs_error(obsspace, "ObsError/" + self.ioda_vars.S_name, "psu", self.ObsError_saln)

def write_ocean_basin(self, obsspace):
write_ocean_basin(obsspace, self.OceanBasin, self.PreQC.dtype, self.PreQC.fill_value)

#########################################################################

def log_seq_num(self, logger):
log_variable(logger, "seqNum", self.seqNum)
logger.debug(f"seqNum hash = {compute_hash(self.seqNum)}")

def log_preqc(self, logger):
log_variable(logger, "PreQC", self.PreQC)

def log_obs_error_temp(self, logger):
log_variable(logger, "ObsError_temp", self.ObsError_temp)

def log_obs_error_saln(self, logger):
log_variable(logger, "ObsError_saln", self.ObsError_saln)

def log_ocean_basin(self, logger):
log_variable(logger, "OceanBasin", self.OceanBasin)
logger.debug(f"OceanBasin hash = {compute_hash(self.OceanBasin)}")

#########################################################################


def compute_seq_num(lon, lat):
combined = np.stack((lon, lat), axis=-1)
unique_combined, seqNum = np.unique(combined, axis=0, return_inverse=True)
seqNum = seqNum.astype(np.int32)
return seqNum
Loading