Merge pull request #31 from axiom-data-science/add_parquet
added parquet as config option
kthyng authored Nov 11, 2024
2 parents 6b3951b + 1b9762c commit 11c91bb
Showing 10 changed files with 170 additions and 55 deletions.
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -145,7 +145,7 @@ m.show_config(ptm_level=[1,2,3], excludestring=":").keys()

This section is split into two: first options that are available to all models (thus are handled in the Manager) and those for `OpenDriftModel` (the only model option currently).

This is not currently a comprehensive list but a place where extra details are included that might not be clear or available elsewhere. For more information look at the configuration information (previous section) and the docstrings for each class.
This is not currently a comprehensive list but a place where extra details are included that might not be clear or available elsewhere. For more information look at the configuration information (previous section) and the docstrings for each class in the API.

### Manager options, available to all models

16 changes: 16 additions & 0 deletions docs/quick_start.md
@@ -55,6 +55,22 @@ ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 --dry-run

`m.outfile_name` is printed to the screen after the command has been run. `ptm` is installed as an entry point with `particle-tracking-manager`.


If you are running this locally (this applies to Axiom people), you'll want to run it like this:

```
ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 ocean_model_local=True start_time=2000-1-1T01
```

where you should include `ocean_model_local=True` since you are running the model locally on a server. In that case you also need to input a `start_time`, since a kerchunk file will be created on the fly for the `ocean_model` you select.

Similarly, for a dry run you would do:

```
ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 ocean_model_local=True start_time=2000-1-1T01 --dry-run
```
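
The same local run can also be set up from Python. This is only a sketch, assuming `start_time` is accepted as a string and that calling `add_reader()` with no arguments sets up the local reader for a known `ocean_model`:

```
import particle_tracking_manager as ptm

# sketch of the local NWGOA run above via the Python API
m = ptm.OpenDriftModel(
    lon=-151,
    lat=59,
    ocean_model="NWGOA",
    steps=1,
    ocean_model_local=True,
    start_time="2000-1-1T01",  # required when ocean_model_local=True
)
m.add_reader()  # assumed to build the kerchunk file on the fly for the local model
m.seed()
m.run()
```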


+++

(new_reader)=
7 changes: 6 additions & 1 deletion docs/whats_new.md
@@ -1,8 +1,13 @@
# What's New

## v0.9.2 (November 11, 2024)

* Added ability to save output files as parquet instead of netcdf.
* Partially updated docs

## v0.9.1 (October 25, 2024)

* Added local model option of CIOFS Fresh for which kerchunk files also can be generated on the fly
* Added local model option of CIOFS Fresh for which kerchunk files also can be generated on the fly.

## v0.9.0 (July 26, 2024)

10 changes: 10 additions & 0 deletions particle_tracking_manager/models/opendrift/config.json
@@ -204,5 +204,15 @@
"default": "low",
"ptm_level": 3,
"description": "Log verbosity"
},
"output_format": {
"type": "enum",
"enum": [
"netcdf",
"parquet"
],
"default": "netcdf",
"description": "Output file format. Options are \"netcdf\" or \"parquet\".",
"ptm_level": 2
}
}
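
Following the pattern of the new tests below, the option can be selected when constructing the model; the CLI form here is a sketch that assumes `output_format` is passed like the other `key=value` options:

```
import particle_tracking_manager as ptm

# Python API (mirrors the new test in tests/test_opendrift.py)
m = ptm.OpenDriftModel(output_format="parquet")

# CLI sketch, assuming output_format works as a key=value argument:
# ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 output_format=parquet
```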
140 changes: 93 additions & 47 deletions particle_tracking_manager/models/opendrift/opendrift.py
@@ -144,6 +144,8 @@ class OpenDriftModel(ParticleTrackingManager):
Oil mass is biodegraded (eaten by bacteria).
log : str, optional
Options are "low" and "high" verbosity for log, by default "low"
output_format : str, default "netcdf"
Name of input/output module type to use for writing Lagrangian model output. Default is "netcdf".
Notes
-----
@@ -164,6 +166,7 @@ class OpenDriftModel(ParticleTrackingManager):
o: Union[OceanDrift, Leeway, LarvalFish, OpenOil]
horizontal_diffusivity: Optional[float]
config_model: dict
output_format: str

def __init__(
self,
@@ -221,6 +224,7 @@ def __init__(
],
biodegradation: bool = config_model["biodegradation"]["default"],
log: str = config_model["log"]["default"],
output_format: str = config_model["output_format"]["default"],
**kw,
) -> None:
"""Inputs for OpenDrift model."""
@@ -248,18 +252,29 @@ def __init__(
# so do this before super initialization
self.__dict__["drift_model"] = drift_model

# # need output_format defined right away
# self.__dict__["output_format"] = output_format

# do this right away so I can query the object
# we don't actually input output_format here because we first output to netcdf, then
# resave as parquet after adding in extra config
if self.drift_model == "Leeway":
o = Leeway(loglevel=self.loglevel)
o = Leeway(loglevel=self.loglevel) # , output_format=self.output_format)

elif self.drift_model == "OceanDrift":
o = OceanDrift(loglevel=self.loglevel)
o = OceanDrift(
loglevel=self.loglevel
) # , output_format=self.output_format)

elif self.drift_model == "LarvalFish":
o = LarvalFish(loglevel=self.loglevel)
o = LarvalFish(
loglevel=self.loglevel
) # , output_format=self.output_format)

elif self.drift_model == "OpenOil":
o = OpenOil(loglevel=self.loglevel, weathering_model="noaa")
o = OpenOil(
loglevel=self.loglevel, weathering_model="noaa"
) # , output_format=self.output_format)

else:
raise ValueError(f"Drifter model {self.drift_model} is not recognized.")
@@ -662,13 +677,18 @@ def run_add_reader(
"hraw",
"snow_thick",
]
if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."
)
start = f"{self.start_time.year}-{str(self.start_time.month).zfill(2)}-{str(self.start_time.day).zfill(2)}"
end = f"{self.end_time.year}-{str(self.end_time.month).zfill(2)}-{str(self.end_time.day).zfill(2)}"
loc_local = make_nwgoa_kerchunk(start=start, end=end)

if self.ocean_model_local:

if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."
)
start_time = self.start_time
start = f"{start_time.year}-{str(start_time.month).zfill(2)}-{str(start_time.day).zfill(2)}"
end_time = self.end_time
end = f"{end_time.year}-{str(end_time.month).zfill(2)}-{str(end_time.day).zfill(2)}"
loc_local = make_nwgoa_kerchunk(start=start, end=end)

# loc_local = "/mnt/depot/data/packrat/prod/aoos/nwgoa/processed/nwgoa_kerchunk.parq"
loc_remote = (
@@ -682,29 +702,34 @@ def run_add_reader(
"wetdry_mask_psi",
]
if self.ocean_model == "CIOFS":
if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."

if self.ocean_model_local:

if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."
)
start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}"
end = f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}"
loc_local = make_ciofs_kerchunk(
start=start, end=end, name="ciofs"
)
start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}"
end = (
f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}"
)
loc_local = make_ciofs_kerchunk(start=start, end=end, name="ciofs")
loc_remote = "http://xpublish-ciofs.srv.axds.co/datasets/ciofs_hindcast/zarr/"

elif self.ocean_model == "CIOFSFRESH":
if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."

if self.ocean_model_local:

if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."
)
start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}"

end = f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}"
loc_local = make_ciofs_kerchunk(
start=start, end=end, name="ciofs_fresh"
)
start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}"
end = (
f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}"
)
loc_local = make_ciofs_kerchunk(
start=start, end=end, name="ciofs_fresh"
)
loc_remote = None

elif self.ocean_model == "CIOFSOP":
Expand All @@ -715,17 +740,21 @@ def run_add_reader(
"v_northward": "y_sea_water_velocity",
}
)
if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."

if self.ocean_model_local:

if self.start_time is None:
raise ValueError(
"Need to set start_time ahead of time to add local reader."
)
start = f"{self.start_time.year}-{str(self.start_time.month).zfill(2)}-{str(self.start_time.day).zfill(2)}"
end = f"{self.end_time.year}-{str(self.end_time.month).zfill(2)}-{str(self.end_time.day).zfill(2)}"

loc_local = make_ciofs_kerchunk(
start=start, end=end, name="aws_ciofs_with_angle"
)
start = f"{self.start_time.year}-{str(self.start_time.month).zfill(2)}-{str(self.start_time.day).zfill(2)}"
end = f"{self.end_time.year}-{str(self.end_time.month).zfill(2)}-{str(self.end_time.day).zfill(2)}"
# loc_local = "/mnt/depot/data/packrat/prod/noaa/coops/ofs/aws_ciofs/processed/aws_ciofs_kerchunk.parq"

loc_local = make_ciofs_kerchunk(
start=start, end=end, name="aws_ciofs_with_angle"
)
# loc_local = "/mnt/depot/data/packrat/prod/noaa/coops/ofs/aws_ciofs/processed/aws_ciofs_kerchunk.parq"
loc_remote = "https://thredds.aoos.org/thredds/dodsC/AWS_CIOFS.nc"

elif self.ocean_model == "user_input":
@@ -964,10 +993,21 @@ def run_drifters(self):

self.o._config = config_input_to_opendrift # only OpenDrift config

output_file_initial = (
f"{self.output_file}_initial"
or f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}.nc"
output_file = (
self.output_file
or f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}"
)
output_file_initial = f"{output_file}_initial" + ".nc"

# initially output to netcdf even if parquet has been selected
# since I do this weird 2 step saving process

# if self.output_format == "netcdf":
# output_file_initial += ".nc"
# elif self.output_format == "parquet":
# output_file_initial += ".parq"
# else:
# raise ValueError(f"output_format {self.output_format} not recognized.")

self.o.run(
time_step=timedir * self.time_step,
@@ -987,13 +1027,19 @@
v = str(v)
ds.attrs[f"ptm_config_{k}"] = v

# Make new output file
output_file = (
self.output_file
or f"output-results_{datetime.datetime.utcnow():%Y-%m-%dT%H%M:%SZ}.nc"
)
if self.output_format == "netcdf":
output_file += ".nc"
elif self.output_format == "parquet":
output_file += ".parq"
else:
raise ValueError(f"output_format {self.output_format} not recognized.")

ds.to_netcdf(output_file)
if self.output_format == "netcdf":
ds.to_netcdf(output_file)
elif self.output_format == "parquet":
ds.to_dataframe().to_parquet(output_file)
else:
raise ValueError(f"output_format {self.output_format} not recognized.")

# update with new path name
self.o.outfile_name = output_file
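
Because the parquet output is written from the flattened dataset via `ds.to_dataframe().to_parquet(...)`, it can be read back with pandas. A minimal sketch, with a hypothetical file name (the actual name is stored in `m.o.outfile_name` after a run):

```
import pandas as pd

# hypothetical output file name; use m.o.outfile_name from your run
df = pd.read_parquet("output-results_2024-11-11T1200:00Z.parq")
print(df.head())
```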
5 changes: 5 additions & 0 deletions particle_tracking_manager/models/opendrift/utils.py
@@ -207,6 +211,11 @@ def make_nwgoa_kerchunk(start, end):
and Path(j).stem.split("nwgoa_")[1] <= end
]

if json_list == []:
raise ValueError(
f"No files found in {output_dir_single_files} for {start} to {end}"
)

# account for double compression
# Look at individual variables in the files to see what needs to be changed with
# h5dump -d ocean_time -p /mnt/depot/data/packrat/prod/aoos/nwgoa/processed/1999/nwgoa_1999-02-01.nc
6 changes: 3 additions & 3 deletions particle_tracking_manager/the_manager.py
@@ -136,7 +136,7 @@ class ParticleTrackingManager:
dataset before inputting to PTM. Setting this to True may save computation time but
will be less accurate, especially in the tidal flat regions of the model.
output_file : Optional[str], optional
Name of output file to save, by default None. If None, default is set in the model. With ".nc" suffix.
Name of output file to save, by default None. If None, default is set in the model. Without any suffix.
Notes
-----
@@ -230,12 +230,12 @@ def __init__(
if self.__dict__["output_file"] is None:
self.__dict__[
"output_file"
] = f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}.nc"
] = f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}"

## set up log for this simulation
# Create a file handler
assert self.__dict__["output_file"] is not None
logfile_name = self.__dict__["output_file"].replace(".nc", ".log")
logfile_name = self.__dict__["output_file"] + ".log"
self.file_handler = logging.FileHandler(logfile_name)

# Create a formatter and add it to the handler
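
With this change the base `output_file` name carries no suffix; the suffixes are appended separately. A small sketch of the resulting names, assuming an `output_file` of `"my_run"`:

```
output_file = "my_run"               # base name, no suffix (set in the manager)
logfile_name = output_file + ".log"  # -> "my_run.log"
# the data suffix is added later in OpenDriftModel.run_drifters:
netcdf_name = output_file + ".nc"    # if output_format == "netcdf"
parquet_name = output_file + ".parq" # if output_format == "parquet"
```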
2 changes: 1 addition & 1 deletion tests/test_manager.py
@@ -61,7 +61,7 @@ def test_seed():
def test_set_start_time_ahead():
"""Test set start_time ahead when using start_time for local kerchunk file setup."""

m = ptm.OpenDriftModel(ocean_model="CIOFSOP")
m = ptm.OpenDriftModel(ocean_model="CIOFSOP", ocean_model_local=True)

# this causes the check
with pytest.raises(ValueError):
10 changes: 10 additions & 0 deletions tests/test_opendrift.py
@@ -312,6 +312,16 @@ def test_leeway_model_stokes_drift_true(self):
# assert not self.m.show_config(key="stokes_drift")["value"]


def test_output_format():
"""Check output_format."""

m = OpenDriftModel(output_format="netcdf")
assert m.output_format == "netcdf"

m = OpenDriftModel(output_format="parquet")
assert m.output_format == "parquet"


def test_horizontal_diffusivity_logic():
"""Check logic for using default horizontal diff values for known models."""

27 changes: 25 additions & 2 deletions tests/test_realistic.py
@@ -20,15 +20,38 @@ def test_add_new_reader():


@pytest.mark.slow
def test_run():
def test_run_parquet():
"""Set up and run."""

import xroms

seeding_kwargs = dict(lon=-90, lat=28.7, number=1)
manager = ptm.OpenDriftModel(**seeding_kwargs, use_static_masks=True, steps=2)
manager = ptm.OpenDriftModel(
**seeding_kwargs, use_static_masks=True, steps=2, output_format="parquet"
)
url = xroms.datasets.CLOVER.fetch("ROMS_example_full_grid.nc")
ds = xr.open_dataset(url, decode_times=False)
manager.add_reader(ds=ds, name="txla")
manager.seed()
manager.run()

assert "parq" in manager.o.outfile_name


@pytest.mark.slow
def test_run_netcdf():
"""Set up and run."""

import xroms

seeding_kwargs = dict(lon=-90, lat=28.7, number=1)
manager = ptm.OpenDriftModel(
**seeding_kwargs, use_static_masks=True, steps=2, output_format="netcdf"
)
url = xroms.datasets.CLOVER.fetch("ROMS_example_full_grid.nc")
ds = xr.open_dataset(url, decode_times=False)
manager.add_reader(ds=ds, name="txla")
manager.seed()
manager.run()

assert "nc" in manager.o.outfile_name
