From d98aa8eceba63bf789676d2cc15ccfe3c6d0c142 Mon Sep 17 00:00:00 2001 From: SarathUW Date: Mon, 19 Feb 2024 14:25:24 -0800 Subject: [PATCH 1/3] Fixing the forecasting plugin issues --- src/rat/plugins/forecasting.py | 2 +- src/rat/run_rat.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rat/plugins/forecasting.py b/src/rat/plugins/forecasting.py index c0beb8e..4646a82 100644 --- a/src/rat/plugins/forecasting.py +++ b/src/rat/plugins/forecasting.py @@ -15,7 +15,7 @@ from datetime import date from rat.utils.run_command import run_command import ruamel_yaml as ryaml -import cfgrib +# import cfgrib from rat.data_processing.metsim_input_processing import CombinedNC from rat.rat_basin import rat_basin from rat.toolbox.config import update_config diff --git a/src/rat/run_rat.py b/src/rat/run_rat.py index 795e7d8..b90002f 100644 --- a/src/rat/run_rat.py +++ b/src/rat/run_rat.py @@ -90,8 +90,8 @@ def run_rat(config_fn, operational_latency=None): config_copy = copy.deepcopy(config) # Run RAT for basin no_errors, latest_altimetry_cycle = rat_basin(config, log) - # Run RAT forecast for basin if forecast is True - if config['PLUGINS']['forecast']: + # Run RAT forecast for basin if forecast is True + if config.get('PLUGINS', {}).get('forecasting'): log.info('############## Starting RAT forecast for '+config['BASIN']['basin_name']+' #################') forecast_no_errors = forecast(config, log) if(forecast_no_errors>0): @@ -172,7 +172,7 @@ def run_rat(config_fn, operational_latency=None): ryaml_client.dump(config, config_fn.open('w')) no_errors, latest_altimetry_cycle = rat_basin(config_copy, log) # Run RAT forecast for basin if forecast is True - if config['PLUGINS']['forecast']: + if config.get('PLUGINS', {}).get('forecasting'): log.info('############## Starting RAT forecast for '+config['BASIN']['basin_name']+' #################') forecast_no_errors = forecast(config, log) if(forecast_no_errors>0): From 673ba37d3655ff6bc63cac2a16665a8b33419546 Mon Sep 17 00:00:00 2001 From: SarathUW Date: Mon, 19 Feb 2024 14:37:55 -0800 Subject: [PATCH 2/3] ResORR plugin for RAT --- src/rat/plugins/resorr/data_prep.py | 318 ++++++++++++++++++++++++++++ src/rat/plugins/resorr/network.py | 301 ++++++++++++++++++++++++++ src/rat/plugins/resorr/reservoir.py | 90 ++++++++ src/rat/plugins/resorr/runResorr.py | 134 ++++++++++++ src/rat/rat_basin.py | 15 +- 5 files changed, 857 insertions(+), 1 deletion(-) create mode 100644 src/rat/plugins/resorr/data_prep.py create mode 100644 src/rat/plugins/resorr/network.py create mode 100644 src/rat/plugins/resorr/reservoir.py create mode 100644 src/rat/plugins/resorr/runResorr.py diff --git a/src/rat/plugins/resorr/data_prep.py b/src/rat/plugins/resorr/data_prep.py new file mode 100644 index 0000000..47a317b --- /dev/null +++ b/src/rat/plugins/resorr/data_prep.py @@ -0,0 +1,318 @@ +import geopandas as gpd +from pathlib import Path +import pandas as pd +import xarray as xr +import rioxarray as rxr +import numpy as np +import networkx as nx +import geonetworkx as gnx + + +def generate_network( + flow_dir_fn, + stations_fn, + save_dir=None, + dist_proj=None, + elevation_fn=None + ) -> (gpd.GeoDataFrame, gpd.GeoDataFrame): + """generate reservoir network using flow direction file and reservoir locations. 
+
+    Args:
+        flow_dir_fn (str or Path): path to flow direction file
+        stations_fn (str or Path): path to reservoir locations file
+        save_dir (str or Path): path to save directory
+        dist_proj (str, optional): projection to use for optionally calculating distance between reservoirs. Defaults to None.
+        elevation_fn (str or Path, optional): path to elevation file used for optionally adding elevation data to each reservoir. Defaults to None.
+
+    Returns:
+        gnx.GeoDiGraph: reservoir network graph. The edge and node GeoDataFrames are also written to save_dir as shapefiles when save_dir is passed.
+    """
+
+    # check if files exist
+    flow_dir_fn = Path(flow_dir_fn)
+    assert flow_dir_fn.exists()
+
+    stations_fn = Path(stations_fn)
+    assert stations_fn.exists()
+
+    if save_dir:
+        save_dir = Path(save_dir)
+        if not save_dir.exists():
+            print(f"passed save_dir does not exist, creating {save_dir}")
+            save_dir.mkdir(parents=True)
+
+    if elevation_fn:
+        elevation_fn = Path(elevation_fn)
+        assert elevation_fn.exists()
+
+    fdr = rxr.open_rasterio(flow_dir_fn, masked=True)
+    band = fdr.sel(band=1)
+
+    band_vicfmt = band
+
+    reservoirs = gpd.read_file(stations_fn)
+    reservoirs['geometry'] = gpd.points_from_xy(reservoirs['lon'], reservoirs['lat'])
+    reservoirs.set_crs('epsg:4326', inplace=True)
+
+    reservoir_location_raster = xr.full_like(band_vicfmt, np.nan)
+    for resid, row in reservoirs.iterrows():
+        reslat = float(row.lat)
+        reslon = float(row.lon)
+
+        rast_lat = reservoir_location_raster.indexes['y'].get_indexer([reslat], method="nearest")[0]
+        rast_lon = reservoir_location_raster.indexes['x'].get_indexer([reslon], method="nearest")[0]
+
+        reservoir_location_raster[rast_lat, rast_lon] = resid
+
+    # convert all points to nodes. Use index value to identify
+    G = gnx.GeoDiGraph()
+    G.add_nodes_from(reservoirs.index)
+
+    # D8 flow direction codes mapped to (row, col) steps
+    operations = {
+        1: [-1, 0],   # N
+        2: [-1, 1],   # NE
+        3: [0, 1],    # E
+        4: [1, 1],    # SE
+        5: [1, 0],    # S
+        6: [1, -1],   # SW
+        7: [0, -1],   # W
+        8: [-1, -1],  # NW
+    }
+
+    for node in G.nodes:
+        resdata = reservoirs[reservoirs.index==node]
+
+        x = float(resdata['lon'].values[0])
+        y = float(resdata['lat'].values[0])
+
+        idxx = band_vicfmt.indexes['x'].get_indexer([x], method="nearest")[0]
+        idxy = band_vicfmt.indexes['y'].get_indexer([y], method="nearest")[0]
+
+        # travel downstream until another node, np.nan or out-of-bounds is found, or if travelling in a loop
+        visited = [(idxx, idxy)]
+        current_pix = band_vicfmt.isel(x=idxx, y=idxy)
+
+        attrs_n = {
+            node: {
+                'x': reservoirs['geometry'][node].x,
+                'y': reservoirs['geometry'][node].y,
+                'name': reservoirs['name'][node]
+            }
+        }
+        nx.set_node_attributes(G, attrs_n)
+
+        if not np.isnan(current_pix):
+            END = False
+            while not END:
+                op = operations[int(current_pix)]
+                new_idxy, new_idxx = np.array((idxy, idxx)) + np.array(op)
+                idxy, idxx = new_idxy, new_idxx
+
+                if (new_idxx, new_idxy) in visited:
+                    # in a loop, exit
+                    END = True
+                    break
+                else:
+                    visited.append((new_idxx, new_idxy))
+
+                current_pix = band_vicfmt.isel(x=new_idxx, y=new_idxy)
+                if np.isnan(current_pix):
+                    # NaN value found, exit loop
+                    END = True
+                    break
+
+                try:
+                    any_reservoir = reservoir_location_raster.isel(x=new_idxx, y=new_idxy)
+                    if not np.isnan(any_reservoir):
+                        # another reservoir found
+                        G.add_edge(node, int(any_reservoir))
+                        if dist_proj:
+                            attrs_e = {
+                                (node, int(any_reservoir)): {
+                                    'length': reservoirs.to_crs(dist_proj)['geometry'][node].distance(reservoirs.to_crs(dist_proj)['geometry'][int(any_reservoir)])
+                                }
+                            }
+                            nx.set_edge_attributes(G, attrs_e)
+                        END = True
+                        break
+                except IndexError:
+                    # reached the edge of the raster, exit
+                    END = True
+    G_gdf = gpd.GeoDataFrame(gnx.graph_edges_to_gdf(G))
+    G_gdf_pts = gpd.GeoDataFrame(gnx.graph_nodes_to_gdf(G))
+
+    # add elevation data to nodes
+    if elevation_fn:
+        elev = rxr.open_rasterio(elevation_fn, chunks='auto')
+
+        G_gdf_pts['elevation'] = G_gdf_pts[['x', 'y']].apply(lambda row: float(elev.sel(x=row.x, y=row.y, method='nearest')), axis=1)
+
+    if save_dir:
+        pts_save_fn = Path(save_dir) / 'rivreg_network_pts.shp'
+        edges_save_fn = Path(save_dir) / 'rivreg_network.shp'
+
+        G_gdf_pts.to_file(pts_save_fn)
+        G_gdf.to_file(edges_save_fn)
+
+    return G
+
+# aggregate
+def aggregate(ds, frequency='weekly'):
+    if frequency == 'daily':
+        resampled = ds
+        resampled['dt'] = ds['time'].resample(time='1D').count()
+    elif frequency == 'weekly':
+        resampled = ds.resample(time='1W').mean()
+        resampled['dt'] = ds['time'].resample(time='1W').count()
+    elif frequency == 'monthly':
+        resampled = ds.resample(time='1M').mean()
+        resampled['dt'] = ds['time'].resample(time='1M').count()
+    elif frequency == 'annual':
+        resampled = ds.resample(time='1Y').mean()
+        resampled['dt'] = ds['time'].resample(time='1Y').count()
+    else:
+        raise ValueError(f'frequency {frequency} not supported')
+
+    return resampled
+
+def calculate_volumes(
+        ds,
+        fluxes=['unregulated_inflow', 'storage_change']
+    ):
+    """Calculate volume values using flow rates and ∆t
+
+    Args:
+        ds (xr.Dataset): Dataset containing flow rates in m3/day
+        fluxes (list, optional): List of fluxes to calculate volumes for. Defaults to ['unregulated_inflow', 'storage_change'].
+    """
+    for flux in fluxes:
+        ds[flux] = ds[flux] * ds['dt']
+        ds[flux].attrs['units'] = 'm3'
+        ds[flux].attrs['long_name'] = f'Volume of {flux}'
+        ds[flux].attrs['description'] = f'Volume of {flux} in m3'
+
+    return ds
+
+def _rat_read_inflow(unregulated_inflow_fn, resorr_node_id, rat_output_level='final_outputs') -> xr.Dataset:
+    """Returns inflow generated by RAT under pristine conditions as unregulated inflow.
+
+    Args:
+        unregulated_inflow_fn (str): path of the unregulated inflow file
+        resorr_node_id (int): node id of the reservoir in the ResORR network
+        rat_output_level (str, optional): whether to read from `final_outputs` or `rat_outputs`, which are slightly different in units and file formatting. Defaults to 'final_outputs'.
+
+    Returns:
+        xr.Dataset: unregulated inflow
+    """
+    if rat_output_level == 'final_outputs':
+        unregulated_inflow = pd.read_csv(unregulated_inflow_fn, parse_dates=['date']).rename({
+            'date': 'time',
+            'inflow (m3/d)': 'unregulated_inflow'
+        }, axis='columns')
+    elif rat_output_level == 'rat_outputs':
+        unregulated_inflow = pd.read_csv(unregulated_inflow_fn, parse_dates=['date']).rename({
+            'date': 'time',
+            'streamflow': 'unregulated_inflow'
+        }, axis='columns')
+        unregulated_inflow['unregulated_inflow'] = unregulated_inflow['unregulated_inflow'] * (24*60*60) # m3/s -> m3/day
+    else:
+        raise ValueError(f'rat_output_level {rat_output_level} not supported')
+
+    unregulated_inflow['node'] = resorr_node_id
+    unregulated_inflow.set_index(['time', 'node'], inplace=True)
+    unregulated_inflow = unregulated_inflow.to_xarray()
+
+    return unregulated_inflow
+
+def _rat_read_storage_change(storage_change_fn, resorr_node_id):
+    storage_change = pd.read_csv(storage_change_fn, parse_dates=['date']).rename({
+        'date': 'time',
+        'dS (m3)': 'storage_change'
+    }, axis='columns')[['time', 'storage_change']]
+
+    # convert storage_change to daily - https://stackoverflow.com/a/73724900
+    storage_change = storage_change.set_index('time')
+    storage_change = storage_change.resample('1D').apply(lambda x: np.nan if x.empty else x)
+    groups = storage_change['storage_change'].notna()[::-1].cumsum()
+    storage_change['storage_change'] = storage_change['storage_change'].fillna(0).groupby(groups).transform('mean')
+    storage_change['node'] = resorr_node_id
+    storage_change = storage_change.reset_index().set_index(['time', 'node'])
+    storage_change = storage_change.to_xarray()
+
+    return storage_change
+
+def generate_forcings_from_rat(
+        reservoir_network,
+        inflow_dir,
+        storage_change_dir,
+        save_dir,
+        aggregate_freq='daily',
+        rat_output_level='final_outputs'
+    ):
+    """Generate ResORR forcings (unregulated inflow and storage change volumes) from RAT outputs.
+
+    Args:
+        reservoir_network (gnx.GeoDiGraph or str or pathlib.Path): reservoir network as GeoDiGraph or path to directory containing files generated by generate_network
+        inflow_dir (str or Path): path to directory containing inflow files
+        storage_change_dir (str or Path): path to directory containing storage change files
+        save_dir (str or Path): directory to save regulation data to
+        aggregate_freq (str, optional): aggregation frequency ('daily', 'weekly', 'monthly' or 'annual'). Defaults to 'daily'.
+        rat_output_level (str, optional): whether inflow files come from `final_outputs` or `rat_outputs`. Defaults to 'final_outputs'.
+    """
+    # check if files exist
+    inflow_dir = Path(inflow_dir)
+    assert inflow_dir.exists()
+
+    storage_change_dir = Path(storage_change_dir)
+    assert storage_change_dir.exists()
+
+    save_dir = Path(save_dir)
+    if not save_dir.exists():
+        print(f"passed save_dir does not exist, creating {save_dir}")
+        save_dir.mkdir(parents=True)
+
+    if isinstance(reservoir_network, gnx.GeoDiGraph):
+        G = reservoir_network
+    elif isinstance(reservoir_network, (str, Path)):
+        network_dir = Path(reservoir_network)
+        network_edges = gpd.read_file(network_dir / 'rivreg_network.shp')
+        network_pts = gpd.read_file(network_dir / 'rivreg_network_pts.shp')
+        G = gnx.read_geofiles(network_edges, network_pts, directed=True)
+    else:
+        raise TypeError(f"reservoir_network must be of type gnx.GeoDiGraph or str or pathlib.Path. type of passed reservoir_network - {type(reservoir_network)}")
+
+    datasets_to_join = []
+    for node_id in G:
+        node = G.nodes[node_id]
+        name = node['name']
+
+        # unregulated inflow
+        unregulated_inflow_fn = inflow_dir / f"{name}.csv"
+
+        if not unregulated_inflow_fn.exists():
+            print(f"Missing {unregulated_inflow_fn}")
+            continue
+        unregulated_inflow = _rat_read_inflow(
+            unregulated_inflow_fn, node_id, rat_output_level=rat_output_level)
+        datasets_to_join.append(unregulated_inflow)
+
+        # storage change
+        storage_change_fn = storage_change_dir / f"{name}.csv"
+        if storage_change_fn.exists():
+            storage_change = _rat_read_storage_change(storage_change_fn, node_id)
+            datasets_to_join.append(storage_change)
+
+    rat_data = xr.merge(datasets_to_join)
+
+    aggregated_volumes = calculate_volumes(aggregate(rat_data, frequency=aggregate_freq))
+
+    forcings = xr.Dataset(
+        data_vars={
+            'theoretical_natural_runoff': aggregated_volumes['unregulated_inflow'],
+            'storage_change': aggregated_volumes['storage_change'],
+            'dt': aggregated_volumes['dt']
+        }
+    )
+
+    # save regulation data
+    forcings.to_netcdf(save_dir / 'resorr_forcings.nc')
+
+    return forcings
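For orientation, the two public entry points in data_prep.py compose into a small pipeline: generate_network walks the D8 flow-direction raster to connect reservoirs into a directed graph, and generate_forcings_from_rat converts RAT inflow and storage-change outputs into ResORR forcings. A minimal sketch of that composition; the basin path and the stations file name are illustrative, while the sub-directory layout mirrors the defaults used by runResorr.py further below:

    from pathlib import Path
    from rat.plugins.resorr.data_prep import generate_network, generate_forcings_from_rat

    basin = Path('/data/rat/basins/example_basin')  # hypothetical basin data directory
    G = generate_network(
        flow_dir_fn=basin / 'ro' / 'pars' / 'fl.tif',       # VIC-format flow direction raster
        stations_fn=basin / 'stations_latlon.csv',          # reservoir locations with 'lat'/'lon' columns
        save_dir=basin / 'post_processing' / 'resorr_network'
    )
    forcings = generate_forcings_from_rat(
        G,
        inflow_dir=basin / 'rat_outputs' / 'inflow',
        storage_change_dir=basin / 'final_outputs' / 'dels',
        save_dir=basin / 'post_processing' / 'resorr_network',
        aggregate_freq='daily',
        rat_output_level='rat_outputs'
    )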
diff --git a/src/rat/plugins/resorr/network.py b/src/rat/plugins/resorr/network.py
new file mode 100644
index 0000000..ba16f14
--- /dev/null
+++ b/src/rat/plugins/resorr/network.py
@@ -0,0 +1,301 @@
+
+import numpy as np
+import pandas as pd
+import xarray as xr
+
+import networkx as nx
+
+from .reservoir import Reservoir
+
+
+class ReservoirNetwork(nx.DiGraph):
+    """Directed reservoir network that simulates regulation node by node."""
+    def __init__(self, network, start_time, *args, **kwargs):
+        super().__init__(network, *args, **kwargs)
+        self.data = xr.Dataset(
+            coords={
+                'node': list(self.nodes),
+                'time': pd.date_range(start_time, periods=1, freq='1D')
+            }
+        )
+        self.network = {node: Reservoir(node) for node in self.nodes}
+        self.time = start_time
+
+        self.FIRST_RUN = 1
+
+    def create_field(self, var, fill_value=0.0):
+        """Create a new field if not already present in self.data"""
+        if var not in self.data.variables:
+            self.data[var] = xr.DataArray(
+                data=np.full((len(self.nodes), len(self.data.time)), fill_value),
+                dims=['node', 'time'],
+                coords={'node': self.nodes, 'time': self.data.time}
+            )
+
+    def insert_new_time_step(self, time):
+        """Insert a new time step and fill with np.nan for all variables"""
+        if time not in self.data.time:
+            data_vars = {}
+            for var in self.data.variables:
+                if var not in ['node', 'time']:
+                    data_vars[var] = (['node', 'time'], np.full((len(self.nodes), 1), np.nan))
+            new_timestep_ds = xr.Dataset(data_vars=data_vars, coords={'node': self.nodes, 'time': [time]})
+            # self.data = xr.merge([self.data, new_timestep_ds])
+            self.data = xr.concat([self.data, new_timestep_ds], dim='time')
+        else:
+            raise ValueError(f"Time {time} already exists in data.")
+
+    def update(self, forcings, dt=1, algorithm='wb', reservoir_algorithm='outlet'):
+        """Update the reservoir network for one time step.
+
+        Args:
+            forcings (xr.Dataset): forcings for the current time step, indexed by node and time.
+            dt (int, optional): time step in days. Defaults to 1 day.
+            algorithm (str, optional): Defaults to 'wb'.
+            - hydraulic - outflow from reservoir is simulated to estimate storage change.
+            - wb - water balance: required forcings: `storage_change`, `theoretical_natural_runoff`.
+            - wb_obs_outflow - water balance using observed outflow: required forcings: `storage_change`, `theoretical_natural_runoff`, `obs_outflow`.
+            - wb_travel_time - water balance with travel time: required forcings: `storage_change`, `unregulated_inflow`. This requires a `travel_time` attribute at each edge of the network.
+ - wb_obs_outflow_upstream - water balance using observed outflow only at the most upstream dam: required forcings: `storage_change`, `theoretical_natural_runoff`, `obs_outflow`. + - wb_obs_inflow_upstream - water balance using observed inflow only at the most upstream dam: required forcings: `storage_change`, `theoretical_natural_runoff`, `obs_inflow`. + reservoir_algorithm (str, optional): Defaults to 'outlet'. + - outlet - outflow from reservoir is calculated using outlet discharge equation. + """ + + if self.FIRST_RUN: + self.create_field('inflow', np.nan) + self.create_field('outflow', np.nan) + self.create_field('regulated_runoff', np.nan) + self.create_field('natural_runoff', np.nan) + self.create_field('theoretical_natural_runoff', np.nan) + self.create_field('storage', np.nan) + self.create_field('storage_change', np.nan) + self.create_field('regulation', np.nan) + self.FIRST_RUN = 0 + else: + self.time += pd.Timedelta(days=dt) + self.insert_new_time_step(self.time) + + # insert forcings into data + for var in forcings.variables: + if var not in ['node', 'time']: + # if var not in self.data.variables: + self.data[var] = forcings[var]#.sel(time=self.time) + # else: + # self.data[var].loc[dict(time=self.time)] = forcings[var].sel(time=self.time) + + if algorithm == 'hydraulic': + self._alg_hydraulic(forcings, dt, reservoir_algorithm=reservoir_algorithm) + + if algorithm == 'hydraulic_travel_time': + self._alg_hydraulic_travel_time(forcings, dt) + + if algorithm == 'wb': + self._alg_wb(forcings) + + if algorithm == 'wb_obs_outflow': + self._alg_wb_obs_outflow(forcings) + + if algorithm == 'wb_travel_time': + self._alg_wb_travel_time(forcings) + + if algorithm == 'wb_obs_outflow_upstream': + self._alg_wb_obs_outflow_upstream(forcings) + + if algorithm == 'wb_obs_inflow_upstream': + self._alg_wb_obs_inflow_upstream(forcings) + + def _alg_wb(self, forcings): + for node in list(nx.topological_sort(self)): + storage_change = float(forcings['storage_change'].sel(node=node, time=self.time)) + theoretical_natural_runoff = float(forcings['theoretical_natural_runoff'].sel(node=node, time=self.time)) + + self.data['theoretical_natural_runoff'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff + + upstream_dams = list(self.predecessors(node)) + natural_runoff = theoretical_natural_runoff + regulated_runoff = 0.0 + if len(upstream_dams) > 0: + regulated_runoff = sum([float(self.data['outflow'].sel(node=n, time=self.time)) for n in upstream_dams]) + natural_runoff -= sum([float(self.data['theoretical_natural_runoff'].sel(node=n, time=self.time)) for n in upstream_dams]) + + inflow = max([0, float(natural_runoff + regulated_runoff)]) + outflow = max([0, inflow - storage_change]) + regulation = theoretical_natural_runoff - inflow + + self.data['inflow'].loc[dict(node=node, time=self.time)] = inflow + self.data['outflow'].loc[dict(node=node, time=self.time)] = outflow + self.data['regulation'].loc[dict(node=node, time=self.time)] = regulation + self.data['natural_runoff'].loc[dict(node=node, time=self.time)] = natural_runoff + self.data['regulated_runoff'].loc[dict(node=node, time=self.time)] = regulated_runoff + self.data['storage_change'].loc[dict(node=node, time=self.time)] = storage_change + + def _alg_wb_obs_outflow(self, forcings): + for node in list(nx.topological_sort(self)): + storage_change = float(forcings['storage_change'].sel(node=node, time=self.time)) + theoretical_natural_runoff = float(forcings['theoretical_natural_runoff'].sel(node=node, time=self.time)) + + 
self.data['theoretical_natural_runoff'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff + + upstream_dams = list(self.predecessors(node)) + natural_runoff = theoretical_natural_runoff + regulated_runoff = 0.0 + if len(upstream_dams) > 0: + regulated_runoff = sum([float(self.data['obs_outflow'].sel(node=n, time=self.time)) for n in upstream_dams]) + natural_runoff -= sum([float(self.data['theoretical_natural_runoff'].sel(node=n, time=self.time)) for n in upstream_dams]) + + inflow = max([0, float(natural_runoff + regulated_runoff)]) + outflow = max([0, inflow - storage_change]) + regulation = theoretical_natural_runoff - inflow + + self.data['inflow'].loc[dict(node=node, time=self.time)] = inflow + self.data['outflow'].loc[dict(node=node, time=self.time)] = outflow + self.data['regulation'].loc[dict(node=node, time=self.time)] = regulation + self.data['natural_runoff'].loc[dict(node=node, time=self.time)] = natural_runoff + self.data['regulated_runoff'].loc[dict(node=node, time=self.time)] = regulated_runoff + self.data['storage_change'].loc[dict(node=node, time=self.time)] = storage_change + + def _alg_wb_travel_time(self, forcings): + for node in list(nx.topological_sort(self)): + storage_change = float(forcings['storage_change'].sel(node=node, time=self.time)) + unregulated_inflow = float(forcings['unregulated_inflow'].sel(node=node, time=self.time)) + + upstreams = list(self.predecessors(node)) + upstream_outflow = 0.0 + upstream_unregulated_inflow = 0.0 + if len(upstreams) > 0: + time_lags = [self.time - pd.to_timedelta(round(self.get_edge_data(upstream, node)['travel_time']), 'd') for upstream in upstreams] + upstream_outflow = sum([float(self.data['outflow'].sel(node=n, time=t)) for n, t in zip(upstreams, time_lags)]) + upstream_unregulated_inflow = sum([float(self.data['unregulated_inflow'].sel(node=n, time=t)) for n, t in zip(upstreams, time_lags)]) + + regulated_inflow = max([0, float(unregulated_inflow - upstream_unregulated_inflow + upstream_outflow)]) + outflow = max([0, regulated_inflow - storage_change]) + + self.data['regulated_inflow'].loc[dict(node=node, time=self.time)] = regulated_inflow + self.data['outflow'].loc[dict(node=node, time=self.time)] = outflow + + def _alg_hydraulic(self, forcings, dt, reservoir_algorithm='linear_reservoir'): + for node in list(nx.topological_sort(self)): + # get reservoir object + res = self.network[node] + # get inflow for this node + natural_runoff = (forcings['natural_runoff'].sel(node=node, time=slice(self.time - pd.Timedelta(days=dt), self.time)).mean() * dt).data # m3 + + # if there are upstream nodes, the inflow + regulated_runoff = 0 + theoretical_natural_runoff = natural_runoff.copy() + upstreams = list(self.predecessors(node)) + if len(upstreams) > 0: + # sum the outflow from upstream nodes to the inflow of this node + regulated_runoff += sum([self.data['outflow'].sel(node=n, time=self.time) for n in upstreams]).data + theoretical_natural_runoff += sum([self.data['theoretical_natural_runoff'].sel(node=n, time=self.time) for n in upstreams]).data + + inflow = natural_runoff + regulated_runoff + res.update(inflow, dt=1, algorithm=reservoir_algorithm) # run one time-step of the reservoir model + + res_data = res.dumpds() + + self.data['inflow'].loc[dict(node=node, time=self.time)] = res_data['inflow'] + self.data['outflow'].loc[dict(node=node, time=self.time)] = res_data['outflow'] + self.data['regulation'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff - res_data['inflow'] + 
self.data['storage_change'].loc[dict(node=node, time=self.time)] = res_data['storage_change'] + self.data['storage'].loc[dict(node=node, time=self.time)] = res_data['storage'] + self.data['regulated_runoff'].loc[dict(node=node, time=self.time)] = regulated_runoff + self.data['theoretical_natural_runoff'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff + + def _alg_hydraulic_travel_time(self, forcings, dt): + for node in list(nx.topological_sort(self)): + # get reservoir object + res = self.network[node] + # get inflow for this node + natural_runoff = (forcings['natural_runoff'].sel(node=node, time=slice(self.time - pd.Timedelta(days=dt), self.time)).mean() * dt).data # m3 + + # if there are upstream nodes, the inflow + regulated_runoff = 0 + theoretical_natural_runoff = natural_runoff.copy() + upstreams = list(self.predecessors(node)) + if len(upstreams) > 0: + # sum the outflow from upstream nodes to the inflow of this node + time_lags = [self.get_edge_data(upstream, node)['travel_time'] for upstream in upstreams] + #### NOTE: method='nearest' is used as of now to get the nearest available upstream outflow value to the self.time-time_lag time. + #### this must be handled later, either by interpolating, or using np.nan for the first few time steps. + regulated_runoff += sum([self.data['outflow'].sel(node=n, time=self.time-pd.Timedelta(lag, 'D')) for n, lag in zip(upstreams, time_lags)]).data + theoretical_natural_runoff += sum([self.data['theoretical_natural_runoff'].sel(node=n, time=self.time-pd.Timedelta(lag, 'D')) for n, lag in zip(upstreams, time_lags)]).data + + inflow = natural_runoff + regulated_runoff + res.update(inflow, 1) # run one time-step of the reservoir model + res_data = res.dumpds() + + # update data + self.data['inflow'].loc[dict(node=node, time=self.time)] = res_data['inflow'] + self.data['outflow'].loc[dict(node=node, time=self.time)] = res_data['outflow'] + self.data['regulation'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff - res_data['inflow'] + self.data['storage_change'].loc[dict(node=node, time=self.time)] = res_data['storage_change'] + self.data['storage'].loc[dict(node=node, time=self.time)] = res_data['storage'] + self.data['regulated_runoff'].loc[dict(node=node, time=self.time)] = regulated_runoff + self.data['theoretical_natural_runoff'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff + + def _alg_wb_obs_outflow_upstream(self, forcings): + for node in list(nx.topological_sort(self)): + storage_change = float(forcings['storage_change'].sel(node=node, time=self.time)) + theoretical_natural_runoff = float(forcings['theoretical_natural_runoff'].sel(node=node, time=self.time)) + + self.data['theoretical_natural_runoff'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff + + upstream_dams = list(self.predecessors(node)) + natural_runoff = theoretical_natural_runoff + regulated_runoff = 0.0 + if len(upstream_dams) > 0: + regulated_runoff = sum([float(self.data['outflow'].sel(node=n, time=self.time)) for n in upstream_dams]) + natural_runoff -= sum([float(self.data['theoretical_natural_runoff'].sel(node=n, time=self.time)) for n in upstream_dams]) + + inflow = max([0, float(natural_runoff + regulated_runoff)]) + outflow = max([0, inflow - storage_change]) + regulation = theoretical_natural_runoff - inflow + else: + outflow = float(self.data['obs_outflow'].sel(node=node, time=self.time)) + if np.isnan(outflow): + # try to calculate using observed inflow and observed storage change + obs_inflow = 
float(forcings['obs_inflow'].sel(node=node, time=self.time)) + if np.isnan(obs_inflow): + # if observed inflow is also nan, use theoretical natural runoff + outflow = max([0, theoretical_natural_runoff - storage_change]) + else: + outflow = max([0, obs_inflow - storage_change]) + inflow = outflow + storage_change + regulation = theoretical_natural_runoff - inflow + + self.data['inflow'].loc[dict(node=node, time=self.time)] = inflow + self.data['outflow'].loc[dict(node=node, time=self.time)] = outflow + self.data['regulation'].loc[dict(node=node, time=self.time)] = regulation + self.data['natural_runoff'].loc[dict(node=node, time=self.time)] = natural_runoff + self.data['regulated_runoff'].loc[dict(node=node, time=self.time)] = regulated_runoff + self.data['storage_change'].loc[dict(node=node, time=self.time)] = storage_change + + def _alg_wb_obs_inflow_upstream(self, forcings): + for node in list(nx.topological_sort(self)): + storage_change = float(forcings['storage_change'].sel(node=node, time=self.time)) + theoretical_natural_runoff = float(forcings['theoretical_natural_runoff'].sel(node=node, time=self.time)) + + self.data['theoretical_natural_runoff'].loc[dict(node=node, time=self.time)] = theoretical_natural_runoff + + upstream_dams = list(self.predecessors(node)) + natural_runoff = theoretical_natural_runoff + regulated_runoff = 0.0 + if len(upstream_dams) > 0: + regulated_runoff = sum([float(self.data['outflow'].sel(node=n, time=self.time)) for n in upstream_dams]) + natural_runoff -= sum([float(self.data['theoretical_natural_runoff'].sel(node=n, time=self.time)) for n in upstream_dams]) + + inflow = max([0, float(natural_runoff + regulated_runoff)]) + outflow = max([0, inflow - storage_change]) + regulation = theoretical_natural_runoff - inflow + else: + obs_inflow = float(self.data['obs_inflow'].sel(node=node, time=self.time)) + outflow = max([0, obs_inflow - storage_change]) + inflow = max([0, outflow + storage_change]) + regulation = theoretical_natural_runoff - inflow + + self.data['inflow'].loc[dict(node=node, time=self.time)] = inflow + self.data['outflow'].loc[dict(node=node, time=self.time)] = outflow + self.data['regulation'].loc[dict(node=node, time=self.time)] = regulation + self.data['natural_runoff'].loc[dict(node=node, time=self.time)] = natural_runoff + self.data['regulated_runoff'].loc[dict(node=node, time=self.time)] = regulated_runoff + self.data['storage_change'].loc[dict(node=node, time=self.time)] = storage_change \ No newline at end of file diff --git a/src/rat/plugins/resorr/reservoir.py b/src/rat/plugins/resorr/reservoir.py new file mode 100644 index 0000000..8306eac --- /dev/null +++ b/src/rat/plugins/resorr/reservoir.py @@ -0,0 +1,90 @@ +import numpy as np +import pandas as pd +import xarray as xr +from pathlib import Path +import networkx as nx +import geonetworkx as gnx + +G = 9.81 # m/s2 + +class Reservoir: + def __init__(self, node, start_time='2000-01-01', **kwargs): + self.node = node + + self.reservoir_breadth = 500 # m + self.reservoir_depth = 500 # m + self.outlet_height = 0 # m + self.outlet_diameter = 0.1 # m + + self.reaction_factor = 0.01 # 1/d + self.storage = 1e4 # m3 + self.inflow = 0 # m3/d + self.storage_change = np.nan + self.time = start_time if isinstance(start_time, pd.Timestamp) else pd.to_datetime(start_time) + self.outlet_area = np.pi * (self.outlet_diameter / 2) ** 2 + + # calculate derived properties + self.water_height = self.storage / (self.reservoir_breadth * self.reservoir_depth) + self.height_above_outlet = max([0, 
self.water_height - self.outlet_height]) # height above outlet, or 0 if below outlet
+        self.outlet_velocity = np.sqrt(2 * G * self.height_above_outlet)
+        self.outlet_flow = self.outlet_area * self.outlet_velocity
+
+        self.FIRST_RUN = 1
+
+    def update(self, inflow, algorithm='linear_reservoir', dt=1):
+        """update for one time step
+
+        Args:
+            inflow (number): inflow rate (m3/d)
+            algorithm (str, optional): reservoir routing algorithm, 'outlet' or 'linear_reservoir'. Defaults to 'linear_reservoir'.
+            dt (number): time step (1 day)
+        """
+        # prev_time = self.time
+        # self.time += pd.Timedelta(seconds=dt)
+        if self.FIRST_RUN:
+            self.FIRST_RUN = 0  # if this is the first run, don't advance time
+        else:
+            self.time += pd.Timedelta(days=dt)
+
+        self.inflow = inflow
+
+        if algorithm == 'outlet':
+            self._alg_outlet(dt)
+
+        if algorithm == 'linear_reservoir':
+            self._alg_linear_reservoir(dt)
+
+    def _alg_linear_reservoir(self, dt):
+        self.outflow = self.reaction_factor * self.storage
+        self.storage_change = self.inflow - self.outflow
+        self.storage += self.storage_change * dt
+
+        return {
+            'inflow': self.inflow,
+            'outflow': self.outflow,
+            'storage': self.storage,
+            'storage_change': self.storage_change,
+        }
+
+    def _alg_outlet(self, dt):
+        last_storage = self.storage
+        self.storage += self.inflow * dt
+
+        # using water height method
+        self.water_height = self.storage / (self.reservoir_breadth * self.reservoir_depth)
+        self.height_above_outlet = max([0, self.water_height - self.outlet_height])
+
+        self.outlet_velocity = np.sqrt(2 * G * self.height_above_outlet)  # m/s
+        self.outflow = self.outlet_area * self.outlet_velocity  # m3/d
+
+        self.storage -= self.outflow * dt  # update storage again after outflow is calculated
+        self.storage_change = self.storage - last_storage  # calculate storage change
+
+    def dumpds(self):
+        return {
+            'inflow': self.inflow,
+            'outflow': self.outflow,
+            'storage': self.storage,
+            'storage_change': self.storage_change,
+            'water_height': self.water_height,
+            'height_above_outlet': self.height_above_outlet,
+        }
\ No newline at end of file
diff --git a/src/rat/plugins/resorr/runResorr.py b/src/rat/plugins/resorr/runResorr.py
new file mode 100644
index 0000000..5205cf6
--- /dev/null
+++ b/src/rat/plugins/resorr/runResorr.py
@@ -0,0 +1,134 @@
+import rioxarray as rxr
+import xarray as xr
+import numpy as np
+import geopandas as gpd
+import pandas as pd
+import matplotlib.pyplot as plt
+from pathlib import Path
+import networkx as nx
+import os
+
+from logging import getLogger
+from rat.utils.logging import LOG_NAME, LOG_LEVEL1_NAME
+from rat.utils.utils import create_directory
+from rat.plugins.resorr.data_prep import generate_network
+from rat.plugins.resorr.data_prep import generate_forcings_from_rat, generate_network
+from rat.plugins.resorr.network import ReservoirNetwork
+from tqdm.notebook import tqdm
+
+log = getLogger(f"{LOG_NAME}.{__name__}")
+log_level1 = getLogger(f"{LOG_LEVEL1_NAME}.{__name__}")
+
+def runResorr(basin_data_dir, basin_station_latlon_file, resorr_startDate, resorr_endDate):
+    '''
+    Runs the regulation algorithm ResORR.
+    Dams connected to each other in a basin are identified and regulated reservoir data are computed.
+
+    Parameters
+    ----------
+    basin_data_dir: str
+        Path to the RAT basin folder
+    basin_station_latlon_file: str
+        Path to file containing reservoir locations as latitude, longitude.
+    resorr_startDate : datetime.date
+        Start date for ResORR calculation
+    resorr_endDate : datetime.date
+        End date for ResORR calculation
+
+    '''
+    # read in the flow direction file and reservoir location file
+    flow_dir_fn = os.path.join(basin_data_dir,'ro','pars','fl.tif')
+    res_location_fn = basin_station_latlon_file
+    # creating directory to store the network files
+    save_dir = create_directory(os.path.join(basin_data_dir,'post_processing','resorr_network'), True)
+
+    # read in the reservoir location file, create a geodataframe and set the crs.
+    reservoirs = gpd.read_file(res_location_fn)
+    reservoirs['geometry'] = gpd.points_from_xy(reservoirs['lon'], reservoirs['lat'])
+    reservoirs.set_crs('epsg:4326', inplace=True)
+
+    # generating the network
+    try:
+        G = generate_network(
+            flow_dir_fn, res_location_fn, save_dir
+        )
+    except Exception as e:
+        log.info(f'RESORR: Network generation failed due to error: {e}')
+        log_level1.warning('RESORR failed to run because there are no regulated reservoirs in the basin. Please validate the flow direction file if this is unexpected.')
+        return
+    else:
+        log.info('RESORR: Network generated')
+        log.info(f'Network files stored at {save_dir}')
+
+    # generating the forcings
+    try:
+        forcings = generate_forcings_from_rat(
+            G,
+            os.path.join(basin_data_dir,'rat_outputs','inflow'),
+            os.path.join(basin_data_dir,'final_outputs','dels'),
+            save_dir,
+            rat_output_level='rat_outputs',
+            aggregate_freq='daily'
+        )
+
+        start_time = pd.to_datetime(resorr_startDate.strftime("%Y-%m-%d"))
+        end_time = pd.to_datetime(resorr_endDate.strftime("%Y-%m-%d"))
+        forcings = forcings.sel(time=slice(start_time, end_time))
+
+        reservoir_network = ReservoirNetwork(G, start_time)
+
+        for timestep in forcings.time.values:
+            dt = forcings['dt'].sel(time=timestep).values.item()
+            reservoir_network.update(forcings, dt, 'wb')
+
+        reservoir_network.data.to_netcdf(os.path.join(save_dir, 'resorr_outputs.nc'))
+
+        # Saving the regulated reservoir data to csv in the RAT final_outputs folder
+        filetype_list = ['inflow','outflow']
+        # List of reservoirs that are part of the regulated network
+        reg_res_edges = list(G.edges())
+        reg_res_nodes = []
+        for edge in reg_res_edges:
+            for res_node in edge:
+                reg_res_nodes.append(res_node)
+
+        for node in reservoir_network.data.node.values:
+            res_name = reservoirs['name'][node]
+            if node in reg_res_nodes:
+                for file_type in filetype_list:
+                    # Prepping the regulated data
+                    reg_data = reservoir_network.data[file_type].sel(node = node)
+                    reg_data = reg_data.to_dataframe()
+                    reg_data.drop('node', axis = 1, inplace = True)
+                    reg_data.rename(columns = {'time':'date'}, inplace = True)
+                    reg_data['date'] = reg_data.index
+                    reg_data.reset_index(drop=True, inplace=True)
+                    reg_data = pd.DataFrame(reg_data)
+                    cols = reg_data.columns.tolist()
+                    cols = cols[-1:] + cols[:-1]
+                    reg_data = reg_data[cols]
+
+                    # Saving the regulated data to csv
+                    filename = os.path.join(basin_data_dir,'final_outputs',file_type,'Regulated',res_name + '.csv')
+                    if not os.path.exists(filename):
+                        create_directory(os.path.join(basin_data_dir,'final_outputs',file_type,'Regulated'), True)
+                        reg_data.to_csv(filename, index = False)
+                    else:
+                        file_df = pd.read_csv(filename)
+                        file_df = pd.concat([file_df, reg_data], axis=0)
+                        file_df.to_csv(filename, index = False)
+
+    except Exception as e:
+        log.exception(f'RESORR failed due to error: {e}')
+        log_level1.error('RESORR failed to run. Please check Level-2 log file for more details.')
+        return
+    else:
+        log_level1.info('RESORR run completed successfully')
+
+
+
+
+
+
+
+
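For reference, the default 'wb' update that this driver requests reduces to a simple per-node water balance on volumes, evaluated in topological order. A worked numeric illustration of one node and one time step, following _alg_wb in network.py (the numbers are made up for illustration, not RAT output):

    # volumes in m3 for one time step at one node
    theoretical_natural_runoff = 8.0e6  # runoff the node would receive in a pristine basin
    upstream_natural = 4.0e6            # pristine runoff already accounted at upstream dams
    upstream_outflow = 3.0e6            # what the upstream dams actually released
    storage_change = 1.5e6              # observed dS at this node from RAT

    natural_runoff = theoretical_natural_runoff - upstream_natural  # locally generated runoff: 4.0e6
    inflow = max(0.0, natural_runoff + upstream_outflow)            # regulated inflow: 7.0e6
    outflow = max(0.0, inflow - storage_change)                     # 5.5e6
    regulation = theoretical_natural_runoff - inflow                # 1.0e6 held back upstream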
diff --git a/src/rat/rat_basin.py b/src/rat/rat_basin.py
index 8ba1a66..fe874dd 100644
--- a/src/rat/rat_basin.py
+++ b/src/rat/rat_basin.py
@@ -33,6 +33,8 @@
 from rat.utils.convert_to_final_outputs import convert_sarea, convert_inflow, convert_dels, convert_evaporation, convert_outflow, convert_altimeter, copy_aec_files
 
+from rat.plugins.resorr.runResorr import runResorr
+
 # Step-(-1): Reading Configuration settings to run RAT
 # Step-0: Creating required directory structure for RAT
 # Step-1: Downloading and Pre-processing of meteorolgical data
@@ -763,7 +765,18 @@ def rat_basin(config, rat_logger, forecast_mode=False):
             rat_logger.info("Converted Area Elevation Curve to the Output Format.")
         else:
             rat_logger.info("Converted Area Elevation Curve to the Output Format.")
-
+
+    ## Plugins: RESORR
+    if config.get('PLUGINS', {}).get('resorr'):
+        resorr_startDate = config['BASIN']['start']
+        resorr_endDate = config['BASIN']['end']
+        # check if basin_station_latlon_file exists
+        if os.path.exists(basin_station_latlon_file):
+            rat_logger.info("Running RESORR")
+            runResorr(basin_data_dir, basin_station_latlon_file, resorr_startDate, resorr_endDate)
+        else:
+            rat_logger.warning("No station latlon file found to run RESORR. Try running Step-8 or provide station_latlon_path in the routing section of the config file.")
+
     # Clearing out memory space as per user input
     if(config['CLEAN_UP'].get('clean_metsim')):
         rat_logger.info("Clearing up memory space: Removal of metsim output files")

From 2200b9f69a7071f5dd88438a56515e34edd008a6 Mon Sep 17 00:00:00 2001
From: SarathUW
Date: Mon, 19 Feb 2024 19:19:38 -0800
Subject: [PATCH 3/3] Removed import tqdm

---
 src/rat/plugins/resorr/runResorr.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/rat/plugins/resorr/runResorr.py b/src/rat/plugins/resorr/runResorr.py
index 5205cf6..7141a48 100644
--- a/src/rat/plugins/resorr/runResorr.py
+++ b/src/rat/plugins/resorr/runResorr.py
@@ -14,7 +14,6 @@ from rat.plugins.resorr.data_prep import generate_network
 from rat.plugins.resorr.data_prep import generate_forcings_from_rat, generate_network
 from rat.plugins.resorr.network import ReservoirNetwork
-from tqdm.notebook import tqdm
 
 log = getLogger(f"{LOG_NAME}.{__name__}")
 log_level1 = getLogger(f"{LOG_LEVEL1_NAME}.{__name__}")
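Taken together, the three patches gate both plugins on keys under the PLUGINS block of the RAT configuration. A minimal sketch of the dictionary shape the code above expects once the YAML config is loaded; only the keys read by these patches are shown, the surrounding RAT sections are assumed:

    config = {
        'PLUGINS': {
            'forecasting': True,  # checked by run_rat.py via config.get('PLUGINS', {}).get('forecasting')
            'resorr': True        # checked by rat_basin.py via config.get('PLUGINS', {}).get('resorr')
        },
        # ... BASIN, CLEAN_UP and the other usual RAT sections
    }

Note that patch 1 renames the forecast switch from config['PLUGINS']['forecast'] to the safer config.get('PLUGINS', {}).get('forecasting'), so existing configs that still use the old 'forecast' key will silently skip forecasting until they are updated.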