From 650e9a585095c7390c4d21833a9b8fbb8c03596d Mon Sep 17 00:00:00 2001 From: Kristen Thyng Date: Mon, 11 Nov 2024 13:13:31 -0800 Subject: [PATCH 1/2] added parquet as config option updated tests and some docs, a few other things --- docs/configuration.md | 2 +- docs/quick_start.md | 16 ++ docs/whats_new.md | 7 +- .../models/opendrift/config.json | 10 ++ .../models/opendrift/opendrift.py | 140 ++++++++++++------ .../models/opendrift/utils.py | 5 + particle_tracking_manager/the_manager.py | 6 +- tests/test_opendrift.py | 10 ++ tests/test_realistic.py | 27 +++- 9 files changed, 169 insertions(+), 54 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 1e54a71..e10124b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -145,7 +145,7 @@ m.show_config(ptm_level=[1,2,3], excludestring=":").keys() This section is split into two: first options that are available to all models (thus are handled in the Manager) and those for `OpenDriftModel` (the only model option currently). -This is not currently a comprehensive list but a place where extra details are included that might not be clear or available elsewhere. For more information look at the configuration information (previous section) and the docstrings for each class. +This is not currently a comprehensive list but a place where extra details are included that might not be clear or available elsewhere. For more information look at the configuration information (previous section) and the docstrings for each class in the API. ### Manager options, available to all models diff --git a/docs/quick_start.md b/docs/quick_start.md index 7673c85..c422c40 100644 --- a/docs/quick_start.md +++ b/docs/quick_start.md @@ -55,6 +55,22 @@ ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 --dry-run `m.outfile_name` is printed to the screen after the command has been run. `ptm` is installed as an entry point with `particle-tracking-manager`. + +If you are running this locally (this is for Axiom people), you'll want to run it like this: + +``` +ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 ocean_model_local=True start_time=2000-1-1T01 +``` + +where you should include `ocean_model_local=True` since you are running the model locally on a server, if you are doing so, you need to input a `start_time` since it will create a kerchunk file on the fly for `ocean_model` that you select. + +Similarly you would do: + +``` +ptm lon=-151 lat=59 ocean_model=NWGOA steps=1 ocean_model_local=True start_time=2000-1-1T01 --dry-run +``` + + +++ (new_reader)= diff --git a/docs/whats_new.md b/docs/whats_new.md index 1856c94..bad715f 100644 --- a/docs/whats_new.md +++ b/docs/whats_new.md @@ -1,8 +1,13 @@ # What's New +## v0.9.2 (November 11, 2024) + +* Added ability to save output files as parquet instead of netcdf. +* Partially updated docs + ## v0.9.1 (October 25, 2024) -* Added local model option of CIOFS Fresh for which kerchunk files also can be generated on the fly +* Added local model option of CIOFS Fresh for which kerchunk files also can be generated on the fly. ## v0.9.0 (July 26, 2024) diff --git a/particle_tracking_manager/models/opendrift/config.json b/particle_tracking_manager/models/opendrift/config.json index 9d12f85..6b1edae 100644 --- a/particle_tracking_manager/models/opendrift/config.json +++ b/particle_tracking_manager/models/opendrift/config.json @@ -204,5 +204,15 @@ "default": "low", "ptm_level": 3, "description": "Log verbosity" + }, + "output_format": { + "type": "enum", + "enum": [ + "netcdf", + "parquet" + ], + "default": "netcdf", + "description": "Output file format. Options are \"netcdf\" or \"parquet\".", + "ptm_level": 2 } } diff --git a/particle_tracking_manager/models/opendrift/opendrift.py b/particle_tracking_manager/models/opendrift/opendrift.py index 603d235..de8121d 100644 --- a/particle_tracking_manager/models/opendrift/opendrift.py +++ b/particle_tracking_manager/models/opendrift/opendrift.py @@ -144,6 +144,8 @@ class OpenDriftModel(ParticleTrackingManager): Oil mass is biodegraded (eaten by bacteria). log : str, optional Options are "low" and "high" verbosity for log, by default "low" + output_format : str, default "netcdf" + Name of input/output module type to use for writing Lagrangian model output. Default is "netcdf". Notes ----- @@ -164,6 +166,7 @@ class OpenDriftModel(ParticleTrackingManager): o: Union[OceanDrift, Leeway, LarvalFish, OpenOil] horizontal_diffusivity: Optional[float] config_model: dict + output_format: str def __init__( self, @@ -221,6 +224,7 @@ def __init__( ], biodegradation: bool = config_model["biodegradation"]["default"], log: str = config_model["log"]["default"], + output_format: str = config_model["output_format"]["default"], **kw, ) -> None: """Inputs for OpenDrift model.""" @@ -248,18 +252,29 @@ def __init__( # so do this before super initialization self.__dict__["drift_model"] = drift_model + # # need output_format defined right away + # self.__dict__["output_format"] = output_format + # do this right away so I can query the object + # we don't actually input output_format here because we first output to netcdf, then + # resave as parquet after adding in extra config if self.drift_model == "Leeway": - o = Leeway(loglevel=self.loglevel) + o = Leeway(loglevel=self.loglevel) # , output_format=self.output_format) elif self.drift_model == "OceanDrift": - o = OceanDrift(loglevel=self.loglevel) + o = OceanDrift( + loglevel=self.loglevel + ) # , output_format=self.output_format) elif self.drift_model == "LarvalFish": - o = LarvalFish(loglevel=self.loglevel) + o = LarvalFish( + loglevel=self.loglevel + ) # , output_format=self.output_format) elif self.drift_model == "OpenOil": - o = OpenOil(loglevel=self.loglevel, weathering_model="noaa") + o = OpenOil( + loglevel=self.loglevel, weathering_model="noaa" + ) # , output_format=self.output_format) else: raise ValueError(f"Drifter model {self.drift_model} is not recognized.") @@ -662,13 +677,18 @@ def run_add_reader( "hraw", "snow_thick", ] - if self.start_time is None: - raise ValueError( - "Need to set start_time ahead of time to add local reader." - ) - start = f"{self.start_time.year}-{str(self.start_time.month).zfill(2)}-{str(self.start_time.day).zfill(2)}" - end = f"{self.end_time.year}-{str(self.end_time.month).zfill(2)}-{str(self.end_time.day).zfill(2)}" - loc_local = make_nwgoa_kerchunk(start=start, end=end) + + if self.ocean_model_local: + + if self.start_time is None: + raise ValueError( + "Need to set start_time ahead of time to add local reader." + ) + start_time = self.start_time + start = f"{start_time.year}-{str(start_time.month).zfill(2)}-{str(start_time.day).zfill(2)}" + end_time = self.end_time + end = f"{end_time.year}-{str(end_time.month).zfill(2)}-{str(end_time.day).zfill(2)}" + loc_local = make_nwgoa_kerchunk(start=start, end=end) # loc_local = "/mnt/depot/data/packrat/prod/aoos/nwgoa/processed/nwgoa_kerchunk.parq" loc_remote = ( @@ -682,29 +702,34 @@ def run_add_reader( "wetdry_mask_psi", ] if self.ocean_model == "CIOFS": - if self.start_time is None: - raise ValueError( - "Need to set start_time ahead of time to add local reader." + + if self.ocean_model_local: + + if self.start_time is None: + raise ValueError( + "Need to set start_time ahead of time to add local reader." + ) + start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}" + end = f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}" + loc_local = make_ciofs_kerchunk( + start=start, end=end, name="ciofs" ) - start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}" - end = ( - f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}" - ) - loc_local = make_ciofs_kerchunk(start=start, end=end, name="ciofs") loc_remote = "http://xpublish-ciofs.srv.axds.co/datasets/ciofs_hindcast/zarr/" elif self.ocean_model == "CIOFSFRESH": - if self.start_time is None: - raise ValueError( - "Need to set start_time ahead of time to add local reader." + + if self.ocean_model_local: + + if self.start_time is None: + raise ValueError( + "Need to set start_time ahead of time to add local reader." + ) + start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}" + + end = f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}" + loc_local = make_ciofs_kerchunk( + start=start, end=end, name="ciofs_fresh" ) - start = f"{self.start_time.year}_{str(self.start_time.dayofyear - 1).zfill(4)}" - end = ( - f"{self.end_time.year}_{str(self.end_time.dayofyear).zfill(4)}" - ) - loc_local = make_ciofs_kerchunk( - start=start, end=end, name="ciofs_fresh" - ) loc_remote = None elif self.ocean_model == "CIOFSOP": @@ -715,17 +740,21 @@ def run_add_reader( "v_northward": "y_sea_water_velocity", } ) - if self.start_time is None: - raise ValueError( - "Need to set start_time ahead of time to add local reader." + + if self.ocean_model_local: + + if self.start_time is None: + raise ValueError( + "Need to set start_time ahead of time to add local reader." + ) + start = f"{self.start_time.year}-{str(self.start_time.month).zfill(2)}-{str(self.start_time.day).zfill(2)}" + end = f"{self.end_time.year}-{str(self.end_time.month).zfill(2)}-{str(self.end_time.day).zfill(2)}" + + loc_local = make_ciofs_kerchunk( + start=start, end=end, name="aws_ciofs_with_angle" ) - start = f"{self.start_time.year}-{str(self.start_time.month).zfill(2)}-{str(self.start_time.day).zfill(2)}" - end = f"{self.end_time.year}-{str(self.end_time.month).zfill(2)}-{str(self.end_time.day).zfill(2)}" + # loc_local = "/mnt/depot/data/packrat/prod/noaa/coops/ofs/aws_ciofs/processed/aws_ciofs_kerchunk.parq" - loc_local = make_ciofs_kerchunk( - start=start, end=end, name="aws_ciofs_with_angle" - ) - # loc_local = "/mnt/depot/data/packrat/prod/noaa/coops/ofs/aws_ciofs/processed/aws_ciofs_kerchunk.parq" loc_remote = "https://thredds.aoos.org/thredds/dodsC/AWS_CIOFS.nc" elif self.ocean_model == "user_input": @@ -964,10 +993,21 @@ def run_drifters(self): self.o._config = config_input_to_opendrift # only OpenDrift config - output_file_initial = ( - f"{self.output_file}_initial" - or f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}.nc" + output_file = ( + self.output_file + or f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}" ) + output_file_initial = f"{output_file}_initial" + ".nc" + + # initially output to netcdf even if parquet has been selected + # since I do this weird 2 step saving process + + # if self.output_format == "netcdf": + # output_file_initial += ".nc" + # elif self.output_format == "parquet": + # output_file_initial += ".parq" + # else: + # raise ValueError(f"output_format {self.output_format} not recognized.") self.o.run( time_step=timedir * self.time_step, @@ -987,13 +1027,19 @@ def run_drifters(self): v = str(v) ds.attrs[f"ptm_config_{k}"] = v - # Make new output file - output_file = ( - self.output_file - or f"output-results_{datetime.datetime.utcnow():%Y-%m-%dT%H%M:%SZ}.nc" - ) + if self.output_format == "netcdf": + output_file += ".nc" + elif self.output_format == "parquet": + output_file += ".parq" + else: + raise ValueError(f"output_format {self.output_format} not recognized.") - ds.to_netcdf(output_file) + if self.output_format == "netcdf": + ds.to_netcdf(output_file) + elif self.output_format == "parquet": + ds.to_dataframe().to_parquet(output_file) + else: + raise ValueError(f"output_format {self.output_format} not recognized.") # update with new path name self.o.outfile_name = output_file diff --git a/particle_tracking_manager/models/opendrift/utils.py b/particle_tracking_manager/models/opendrift/utils.py index 8cb52d3..676cb51 100644 --- a/particle_tracking_manager/models/opendrift/utils.py +++ b/particle_tracking_manager/models/opendrift/utils.py @@ -207,6 +207,11 @@ def make_nwgoa_kerchunk(start, end): and Path(j).stem.split("nwgoa_")[1] <= end ] + if json_list == []: + raise ValueError( + f"No files found in {output_dir_single_files} for {start} to {end}" + ) + # account for double compression # Look at individual variables in the files to see what needs to be changed with # h5dump -d ocean_time -p /mnt/depot/data/packrat/prod/aoos/nwgoa/processed/1999/nwgoa_1999-02-01.nc diff --git a/particle_tracking_manager/the_manager.py b/particle_tracking_manager/the_manager.py index 00a82d4..d4fee25 100644 --- a/particle_tracking_manager/the_manager.py +++ b/particle_tracking_manager/the_manager.py @@ -136,7 +136,7 @@ class ParticleTrackingManager: dataset before inputting to PTM. Setting this to True may save computation time but will be less accurate, especially in the tidal flat regions of the model. output_file : Optional[str], optional - Name of output file to save, by default None. If None, default is set in the model. With ".nc" suffix. + Name of output file to save, by default None. If None, default is set in the model. Without any suffix. Notes ----- @@ -230,12 +230,12 @@ def __init__( if self.__dict__["output_file"] is None: self.__dict__[ "output_file" - ] = f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}.nc" + ] = f"output-results_{datetime.datetime.now():%Y-%m-%dT%H%M:%SZ}" ## set up log for this simulation # Create a file handler assert self.__dict__["output_file"] is not None - logfile_name = self.__dict__["output_file"].replace(".nc", ".log") + logfile_name = self.__dict__["output_file"] + ".log" self.file_handler = logging.FileHandler(logfile_name) # Create a formatter and add it to the handler diff --git a/tests/test_opendrift.py b/tests/test_opendrift.py index 41c1e9f..5a87597 100644 --- a/tests/test_opendrift.py +++ b/tests/test_opendrift.py @@ -312,6 +312,16 @@ def test_leeway_model_stokes_drift_true(self): # assert not self.m.show_config(key="stokes_drift")["value"] +def test_output_format(): + """Check output_format.""" + + m = OpenDriftModel(output_format="netcdf") + assert m.output_format == "netcdf" + + m = OpenDriftModel(output_format="parquet") + assert m.output_format == "parquet" + + def test_horizontal_diffusivity_logic(): """Check logic for using default horizontal diff values for known models.""" diff --git a/tests/test_realistic.py b/tests/test_realistic.py index f621702..fdb9368 100644 --- a/tests/test_realistic.py +++ b/tests/test_realistic.py @@ -20,15 +20,38 @@ def test_add_new_reader(): @pytest.mark.slow -def test_run(): +def test_run_parquet(): """Set up and run.""" import xroms seeding_kwargs = dict(lon=-90, lat=28.7, number=1) - manager = ptm.OpenDriftModel(**seeding_kwargs, use_static_masks=True, steps=2) + manager = ptm.OpenDriftModel( + **seeding_kwargs, use_static_masks=True, steps=2, output_format="parquet" + ) url = xroms.datasets.CLOVER.fetch("ROMS_example_full_grid.nc") ds = xr.open_dataset(url, decode_times=False) manager.add_reader(ds=ds, name="txla") manager.seed() manager.run() + + assert "parq" in manager.o.outfile_name + + +@pytest.mark.slow +def test_run_netcdf(): + """Set up and run.""" + + import xroms + + seeding_kwargs = dict(lon=-90, lat=28.7, number=1) + manager = ptm.OpenDriftModel( + **seeding_kwargs, use_static_masks=True, steps=2, output_format="netcdf" + ) + url = xroms.datasets.CLOVER.fetch("ROMS_example_full_grid.nc") + ds = xr.open_dataset(url, decode_times=False) + manager.add_reader(ds=ds, name="txla") + manager.seed() + manager.run() + + assert "nc" in manager.o.outfile_name From 1b9762c044069f35ef52578cebd48d2e7c68c694 Mon Sep 17 00:00:00 2001 From: Kristen Thyng Date: Mon, 11 Nov 2024 13:38:20 -0800 Subject: [PATCH 2/2] fixed test --- tests/test_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_manager.py b/tests/test_manager.py index b5dc42f..8a39c2b 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -61,7 +61,7 @@ def test_seed(): def test_set_start_time_ahead(): """Test set start_time ahead when using start_time for local kerchunk file setup.""" - m = ptm.OpenDriftModel(ocean_model="CIOFSOP") + m = ptm.OpenDriftModel(ocean_model="CIOFSOP", ocean_model_local=True) # this causes the check with pytest.raises(ValueError):