import os
import random
import warnings
from typing import Union, List, Dict
import numpy as np
import pandas as pd
from .._datasets import Datasets
from .._backend import netCDF4
from .._backend import shapefile
from .._backend import xarray as xr, plt, easy_mpl, plt_Axes
from ..utils import check_attributes
# directory separator
SEP = os.sep
def gb_message():
    """Raise an error telling the user to obtain the CAMELS-GB data manually.

    This callable is stored as the ``url`` entry of ``CAMELS-GB`` in
    ``Camels.DATASETS`` in place of a downloadable link.

    Raises
    ------
    ValueError
        always, with the link from which the data can be obtained.
    """
    link = "https://doi.org/10.5285/8344e4f3-d2ea-44f5-8afa-86d2987543a9"
    raise ValueError(f"Download the data from {link} and provide the directory "
                     f"path as dataset=Camels(data=data)")
[docs]
class Camels(Datasets):
    """
    This is the parent class for individual rainfall-runoff datasets like CAMELS-GB etc.
    This class is not meant to be for direct use. It is inherited by the child classes
    which are specific to a dataset like CAMELS-GB, CAMELS-AUS etc.
    This class first downloads the CAMELS dataset if it is not already downloaded.
    Then the selected features for a selected id are fetched and provided to the
    user using the method `fetch`.

    Attributes
    -----------
    - path str/path: directory of the dataset
    - dynamic_features list: tells which dynamic features are available in
      this dataset
    - static_features list: a list of static features.
    - static_attribute_categories list: tells which kinds of static features
      are present in this category.

    Methods
    ---------
    - stations : returns name/id of stations for which the data (dynamic features)
        exists as list of strings.
    - fetch : fetches all features (both static and dynamic type) of all
            station/gauge_ids or a specified station. It can also be used to
            fetch all features of a number of stations ids either by providing
            their gauge_id or by just saying that we need data of 20 stations
            which will then be chosen randomly.
    - fetch_dynamic_features :
            fetches specified dynamic features of one specified station. If the
            dynamic attribute is not specified, all dynamic features will be
            fetched for the specified station. If station is not specified, the
            specified dynamic features will be fetched for all stations.
    - fetch_static_features :
            works same as `fetch_dynamic_features` but for `static` features.
            Here if the `category` is not specified then static features of
            the specified station for all categories are returned.
    """
    # Download sources per dataset name. The 'url' value is either a link or
    # a callable (``gb_message``) which raises an error asking the user to
    # obtain the data manually.
    DATASETS = {
        'CAMELS_BR': {'url': "https://zenodo.org/record/3964745#.YA6rUxZS-Uk",
                      },
        'CAMELS-GB': {'url': gb_message},
    }
[docs]
def __init__(
        self,
        path: str = None,
        timestep: str = "D",
        id_idx_in_bndry_shape: int = None,
        to_netcdf: bool = True,
        overwrite: bool = False,
        verbosity: int = 1,
        **kwargs
):
    """
    Parameters
    ----------
    path : str
        if provided and the directory exists, then the data will be read
        from this directory. If provided and the directory does not exist,
        then the data will be downloaded in this directory. If not provided,
        then the data will be downloaded in the default directory.
    timestep : str
        stored as ``self.timestep``.
    id_idx_in_bndry_shape : int
        accepted for the benefit of child classes; it is not used inside
        this method. ``_create_boundary_id_map`` takes an argument of the
        same name.
    to_netcdf : bool
        stored as ``self.to_netcdf``. It is forced to False (with a warning
        if it was requested) when the netCDF4 module is not available.
    overwrite : bool
        forwarded to the parent class.
    verbosity : int
        0: no message will be printed
    kwargs : dict
        Any other keyword arguments for the Datasets class
    """
    super(Camels, self).__init__(path=path, verbosity=verbosity, overwrite=overwrite, **kwargs)

    self.bndry_id_map = {}
    self.timestep = timestep

    netcdf_missing = netCDF4 is None
    if netcdf_missing and to_netcdf:
        warnings.warn("netCDF4 module is not installed. Please install it to save data in netcdf format")

    # without netCDF4 nothing can be written in netcdf format
    self.to_netcdf = False if netcdf_missing else to_netcdf
@property
def dyn_map(self) -> Dict[str, str]:
    """Empty mapping in this parent class.

    NOTE(review): presumably child classes override this to map
    dataset-specific dynamic feature names to other names -- confirm.
    """
    return dict()
@property
def dyn_factors(self) -> Dict[str, float]:
    """Empty mapping in this parent class.

    NOTE(review): presumably child classes override this to provide
    conversion factors for dynamic features -- confirm.
    """
    return dict()
def _create_boundary_id_map(self, boundary_file, id_idx_in_bndry_shape):
    """Populate ``self.bndry_id_map`` from the records of a boundary shapefile.

    Parameters
    ----------
    boundary_file :
        path of the shapefile. If None, nothing is done.
    id_idx_in_bndry_shape : int
        index of the field (in each record) which holds the catchment id.
        It is forwarded to ``_get_map`` as ``id_index``.

    Returns
    -------
    None
        the result is stored in ``self.bndry_id_map``. If the shapefile
        module is not available, a warning is issued and the map is left
        untouched.
    """
    if boundary_file is None:
        return

    if shapefile is None:
        warnings.warn("shapefile module is not installed. Please install it to use boundary file")
        return

    from shapefile import Reader

    if self.verbosity > 1:
        print(f"loading boundary file {boundary_file}")

    assert os.path.exists(boundary_file), f"{boundary_file} does not exist"

    bndry_sf = Reader(boundary_file)
    # close the reader even if building the map fails (e.g. ``_get_map``
    # raises ValueError when the id index is required but missing)
    try:
        # some shapefiles (e.g. the one of Chile, which contains Spanish
        # characters) can not be decoded with utf-8
        if os.path.basename(bndry_sf.shapeName) in [
            'catchments_camels_cl_v1_3',
            "WKMSBSN",
            'estreams_catchments',
            'CAMELS_DE_catchments',
        ]:
            bndry_sf.encoding = 'ISO-8859-1'

        if self.verbosity > 2:
            print(f"loaded boundary file {boundary_file}")

        self.bndry_id_map = self._get_map(bndry_sf,
                                          id_index=id_idx_in_bndry_shape,
                                          name="bndry_shape")

        if self.verbosity > 2:
            print(f"created boundary id map of length {len(self.bndry_id_map)}")
    finally:
        bndry_sf.close()
    return
@staticmethod
def _get_map(sf_reader, id_index=None, name: str = '') -> Dict[str, int]:
fieldnames = [f[0] for f in sf_reader.fields[1:]]
if len(fieldnames) > 1:
if id_index is None:
raise ValueError(f"""
more than one fileds are present in {name} shapefile
i.e: {fieldnames}.
Please provide a value for id_idx_in_{name} that must be
less than {len(fieldnames)}
""")
else:
id_index = 0
if os.path.basename(sf_reader.shapeName) in ["CAMELS_CH_catchments"]:
catch_ids_map = {
str(int(rec[id_index])): idx for idx, rec in enumerate(sf_reader.iterRecords())
}
else:
catch_ids_map = {
str(rec[id_index]): idx for idx, rec in enumerate(sf_reader.iterRecords())
}
return catch_ids_map
def stations(self) -> List[str]:
    """Return names/ids of the stations of the dataset.

    This parent class does not know the stations, therefore every child
    class has to override this method.

    Raises
    ------
    NotImplementedError
        always, in this parent class.
    """
    raise NotImplementedError("The child class must implement this method")
def _read_dynamic_from_csv(self, stations, dynamic_features, st=None,
                           en=None) -> dict:
    """Read dynamic features of the given stations from the raw files.

    It is called by ``fetch_stations_features`` with already validated
    ``stations`` and ``dynamic_features`` whenever the netCDF file can not
    be used. Must be implemented by the child class.

    Raises
    ------
    NotImplementedError
        always, in this parent class.
    """
    raise NotImplementedError
[docs]
def fetch_static_features(
        self,
        stn_id: Union[str, list] = None,
        static_features: Union[str, list] = None
) -> pd.DataFrame:
    """Fetches all or selected static features of one or more stations.

    Must be implemented by the child class; this parent class only
    defines the interface.

    Parameters
    ----------
    stn_id : str/list
        name/id (or names/ids) of station(s) of which to extract the data
    static_features : list/str, optional
        The name/names of features to fetch.

    Returns
    -------
    pd.DataFrame
        a pandas dataframe

    Raises
    ------
    NotImplementedError
        always, in this parent class.

    Examples
    --------
    >>> from water_datasets import CAMELS_AUS
    >>> camels = CAMELS_AUS()
    >>> camels.fetch_static_features('224214A')
    >>> camels.static_features
    >>> camels.fetch_static_features('224214A',
    ...                                static_features=['elev_mean', 'relief', 'ksat', 'pop_mean'])
    """
    raise NotImplementedError
@property
def start(self):
    """Start of data. Used by ``_check_length`` when no start is given.
    Must be implemented by the child class."""
    raise NotImplementedError
@property
def end(self):
    """End of data. Used by ``_check_length`` when no end is given.
    Must be implemented by the child class."""
    raise NotImplementedError
@property
def dynamic_features(self) -> list:
    """Names of the available dynamic features. ``fetch_stations_features``
    validates the requested dynamic features against this list.
    Must be implemented by the child class."""
    raise NotImplementedError
@property
def _area_name(self) -> str:
    """name of feature from static_features to be used as area.
    It is read by ``area``. Must be implemented by the child class."""
    raise NotImplementedError
@property
def _mmd_feature_name(self) -> Union[str, None]:
    """Name of the dynamic feature which ``q_mmd`` fetches and returns
    without conversion. When it is None (as in this parent class),
    ``q_mmd`` instead computes the values from ``_q_name`` and ``area``."""
    return None
@property
def _q_name(self) -> Union[str, None]:
    """Name of the dynamic feature which ``q_mmd`` fetches and converts
    using the catchment area when ``_mmd_feature_name`` is None.
    It is None in this parent class."""
    return None
@property
def _coords_name(self) -> List[str]:
    """
    names of features from static_features to be used as station
    coordinates (lat, long). ``stn_coords`` fetches these features and
    names the resulting columns ``lat`` and ``long`` in that order.
    Must be implemented by the child class.
    """
    raise NotImplementedError
[docs]
def area(
        self,
        stations: Union[str, List[str]] = 'all'
) -> pd.Series:
    """
    Returns area (Km2) of all/selected catchments as pandas series

    parameters
    ----------
    stations : str/list (default="all")
        name/names of stations. Default is ``all``, which will return
        area of all stations

    Returns
    --------
    pd.Series
        a pandas series whose indices are catchment ids and values
        are areas of corresponding catchments.

    Examples
    ---------
    >>> from water_datasets import CAMELS_CH
    >>> dataset = CAMELS_CH()
    >>> dataset.area()  # returns area of all stations
    >>> dataset.area('2004')  # returns area of station whose id is 2004
    >>> dataset.area(['2004', '6004'])  # returns area of two stations
    """
    stations = check_attributes(stations, self.stations(), 'stations')

    # the area feature is fetched (for all stations) as a single column
    area_feature = [self._area_name]
    static = self.fetch_static_features(static_features=area_feature)
    static.columns = ['area']

    selected_area = static.loc[stations, 'area']
    return selected_area
def _check_length(self, st, en):
    """Resolve the start and end of the requested period.

    A value of None is replaced by ``self.start`` / ``self.end``
    respectively, anything else is converted to ``pd.Timestamp``.

    Returns
    -------
    tuple
        (start, end)
    """
    start = self.start if st is None else pd.Timestamp(st)
    end = self.end if en is None else pd.Timestamp(en)
    return start, end
def to_ts(self, static, st, en, as_ts=False, freq='D'):
    """Optionally repeat static features along a time index.

    Parameters
    ----------
    static : pd.DataFrame
        static features. Its ``values`` and ``columns`` are used.
    st, en :
        start and end of the period; resolved with ``_check_length``.
    as_ts : bool
        if False, ``static`` is returned unchanged. If True, the rows of
        ``static`` are repeated for every step between ``st`` and ``en``.
    freq : str
        frequency of the time index which is created when ``as_ts`` is True.

    Returns
    -------
    pd.DataFrame
    """
    st, en = self._check_length(st, en)

    if not as_ts:
        return static

    time_index = pd.date_range(st, en, freq=freq)
    repeated = np.repeat(static.values, len(time_index), axis=0)
    return pd.DataFrame(repeated, index=time_index, columns=static.columns)
@property
def camels_dir(self):
    """Directory where all camels datasets will be saved. This is the
    ``CAMELS`` folder under the datasets directory (``self.base_ds_dir``)."""
    parent_dir = self.base_ds_dir
    return os.path.join(parent_dir, "CAMELS")
[docs]
def fetch(self,
          stations: Union[str, list, int, float] = "all",
          dynamic_features: Union[List[str], str, None] = 'all',
          static_features: Union[str, List[str], None] = None,
          st: Union[None, str] = None,
          en: Union[None, str] = None,
          as_dataframe: bool = False,
          **kwargs
          ) -> Union[dict, pd.DataFrame]:
    """
    Fetches the features of one or more stations.

    Arguments:
        stations :
            It can have following values:

            - int : number of (randomly selected) stations to fetch
            - float : fraction of (randomly selected) stations to fetch
            - str : name/id of station to fetch. However, if ``all`` is
              provided, then all stations will be fetched.
            - list : list of names/ids of stations to fetch
            - None : all stations will be fetched
        dynamic_features : names of dynamic features to be fetched.
            ``all`` (default) means all available dynamic features. It is
            forwarded as it is to :meth:`fetch_stations_features`.
        static_features : list of static features to be fetched. None
            means no static attribute will be fetched.
        st : starting date of data to be returned. If None, the data will be
            returned from where it is available.
        en : end date of data to be returned. If None, then the data will be
            returned till the date data is available.
        as_dataframe : whether to return dynamic features as pandas
            dataframe or as xarray dataset.
        kwargs : keyword arguments to read the files

    returns:
        whatever :meth:`fetch_stations_features` returns for the selected
        stations. If both static and dynamic features are obtained, it is a
        dictionary with ``dynamic`` and ``static`` keys.
        Otherwise either dynamic or static features are returned.

    Raises:
        TypeError, if ``stations`` is of an unsupported type

    Examples
    --------
    >>> from water_datasets import CAMELS_AUS
    >>> dataset = CAMELS_AUS()
    >>> # get data of 10% of stations
    >>> df = dataset.fetch(stations=0.1, as_dataframe=True)  # returns a multiindex dataframe
    ...  # fetch data of 5 (randomly selected) stations
    >>> five_random_stn_data = dataset.fetch(stations=5, as_dataframe=True)
    ... # fetch data of 3 selected stations
    >>> three_selec_stn_data = dataset.fetch(stations=['912101A','912105A','915011A'], as_dataframe=True)
    ... # fetch data of a single stations
    >>> single_stn_data = dataset.fetch(stations='318076', as_dataframe=True)
    ... # get both static and dynamic features as dictionary
    >>> data = dataset.fetch(1, static_features="all", as_dataframe=True)  # -> dict
    >>> data['dynamic']
    ... # get only selected dynamic features
    >>> sel_dyn_features = dataset.fetch(stations='318076',
    ...     dynamic_features=['streamflow_MLd', 'solarrad_AWAP'], as_dataframe=True)
    ... # fetch data between selected periods
    >>> data = dataset.fetch(stations='318076', st="20010101", en="20101231", as_dataframe=True)
    """
    if stations is None:
        # fetch for all stations
        selected = self.stations()
    elif isinstance(stations, str):
        selected = self.stations() if stations == 'all' else [stations]
    elif isinstance(stations, list):
        selected = stations
    elif isinstance(stations, int):
        # a randomly chosen subset of the requested size
        selected = random.sample(self.stations(), stations)
    elif isinstance(stations, float):
        # a randomly chosen fraction of all the stations
        num_stations = int(len(self.stations()) * stations)
        selected = random.sample(self.stations(), num_stations)
    else:
        raise TypeError(f"Unknown value provided for stations {stations}")

    return self.fetch_stations_features(
        selected,
        dynamic_features,
        static_features,
        st=st,
        en=en,
        as_dataframe=as_dataframe,
        **kwargs
    )
def _maybe_to_netcdf(self, fname: str):
    """Save the dynamic data in a netCDF file if that is required.

    The path of the file, ``<self.path>/<fname>.nc``, is always stored
    as ``self.dyn_fname``. The file is written only when ``self.to_netcdf``
    is true and either the file does not exist or ``self.overwrite`` is true.
    """
    self.dyn_fname = os.path.join(self.path, f'{fname}.nc')

    if not self.to_netcdf:
        return

    if os.path.exists(self.dyn_fname) and not self.overwrite:
        # an existing file is kept unless overwriting was asked for
        return

    # saving all the data in netCDF file using xarray
    print(f'converting data to netcdf format for faster io operations')
    dynamic_data = self.fetch(static_features=None)
    dynamic_data.to_netcdf(self.dyn_fname)
    return
[docs]
def fetch_stations_features(
        self,
        stations: Union[str, List[str]],
        dynamic_features: Union[str, List[str]] = 'all',
        static_features: Union[str, List[str]] = None,
        st: Union[str, pd.Timestamp] = None,
        en: Union[str, pd.Timestamp] = None,
        as_dataframe: bool = False,
        **kwargs
):
    """Reads features of more than one stations.

    parameters
    ----------
    stations :
        list of stations for which data is to be fetched.
    dynamic_features :
        list of dynamic features to be fetched.
        if ``all``, then all dynamic features will be fetched.
        If None, then no dynamic feature will be fetched.
    static_features : list of static features to be fetched.
        If ``all``, then all static features will be fetched. If None,
        then no static attribute will be fetched.
    st :
        start of data to be fetched.
    en :
        end of data to be fetched.
    as_dataframe :
        whether to return the dynamic data as pandas dataframe. default
        is xr.Dataset object
    kwargs dict:
        additional keyword arguments. They are accepted but not used
        inside this method.

    Returns
    -------
    Dynamic and static features of multiple stations. Dynamic features
    are by default returned as xr.Dataset unless `as_dataframe` is True, in
    such a case, it is a pandas dataframe with multiindex. If xr.Dataset,
    it consists of `data_vars` equal to number of stations and for each
    station, the `DataArray` is of dimensions (time, dynamic_features).
    where `time` is defined by `st` and `en` i.e. length of `DataArray`.
    In case, when the returned object is pandas DataFrame, the first index
    is `time` and second index is `dynamic_features`. Static features
    are always returned as pandas DataFrame and have shape
    `(stations, static_features)`. If `dynamic_features` is None,
    then they are not returned and the returned value only consists of
    static features. Same holds true for `static_features`.
    If both are not None, then the returned type is a dictionary with
    `static` and `dynamic` keys.

    Raises:
        ValueError, if both dynamic_features and static_features are None

    Examples
    --------
    >>> from water_datasets import CAMELS_AUS
    >>> dataset = CAMELS_AUS()
    ... # find out station ids
    >>> dataset.stations()
    ... # get data of selected stations as xarray Dataset
    >>> dataset.fetch_stations_features(['912101A', '912105A', '915011A'])
    ... # get data of selected stations as pandas DataFrame
    >>> dataset.fetch_stations_features(['912101A', '912105A', '915011A'],
    ...  as_dataframe=True)
    ... # get both dynamic and static features of selected stations
    >>> dataset.fetch_stations_features(['912101A', '912105A', '915011A'],
    ... dynamic_features=['streamflow_mmd', 'tmax_AWAP'], static_features=['elev_mean'])
    """
    # fail fast, before any data is touched, when nothing is requested
    if dynamic_features is None and static_features is None:
        raise ValueError(
            "Nothing to fetch: both `dynamic_features` and `static_features` "
            "are None. Please specify at least one of them."
        )

    st, en = self._check_length(st, en)
    stations = check_attributes(stations, self.stations(), 'stations')

    if dynamic_features is None:
        # only static features are required
        return self.fetch_static_features(stations, static_features)

    dynamic_features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')

    if netCDF4 is None or not os.path.exists(self.dyn_fname):
        # read from csv files
        # following code will run only once when fetch is called inside init method
        dyn = self._read_dynamic_from_csv(stations, dynamic_features, st=st, en=en)
    else:
        dyn = xr.open_dataset(self.dyn_fname)  # dataset
        dyn = dyn[stations].sel(dynamic_features=dynamic_features, time=slice(st, en))
        if as_dataframe:
            dyn = dyn.to_dataframe(['time', 'dynamic_features'])

    if static_features is None:
        return _handle_dynamic(dyn, as_dataframe)

    static = self.fetch_static_features(stations, static_features)
    dyn = _handle_dynamic(dyn, as_dataframe)
    return {'dynamic': dyn, 'static': static}
[docs]
def fetch_dynamic_features(
        self,
        stn_id: str,
        dynamic_features='all',
        st=None,
        en=None,
        as_dataframe=False
):
    """Fetches all or selected dynamic features of one station.

    Parameters
    ----------
    stn_id : str
        name/id of station of which to extract the data
    dynamic_features : list/str, optional (default="all")
        The name/names of features to fetch. By default, all available
        dynamic features are returned.
    st : Optional (default=None)
        start time from where to fetch the data.
    en : Optional (default=None)
        end time until where to fetch the data
    as_dataframe : bool, optional (default=False)
        if true, the returned data is pandas DataFrame otherwise it
        is xarray dataset

    Returns
    -------
    the output of :meth:`fetch_stations_features` for this single station
    without static features.

    Raises
    ------
    AssertionError
        if ``stn_id`` is not a string.

    Examples
    --------
    >>> from water_datasets import CAMELS_AUS
    >>> camels = CAMELS_AUS()
    >>> camels.fetch_dynamic_features('224214A', as_dataframe=True).unstack()
    >>> camels.dynamic_features
    >>> camels.fetch_dynamic_features('224214A',
    ... dynamic_features=['tmax_AWAP', 'vprp_AWAP', 'streamflow_mmd'],
    ... as_dataframe=True).unstack()
    """
    assert isinstance(stn_id, str), f"station id must be string but is of type {type(stn_id)}"

    # fetch_stations_features expects a list of stations
    station = [stn_id]
    return self.fetch_stations_features(
        station,
        dynamic_features,
        None,
        st=st,
        en=en,
        as_dataframe=as_dataframe
    )
[docs]
def fetch_station_features(
        self,
        station: str,
        dynamic_features: Union[str, list, None] = 'all',
        static_features: Union[str, list, None] = None,
        as_ts: bool = False,
        st: Union[str, None] = None,
        en: Union[str, None] = None,
        **kwargs
) -> Union[pd.DataFrame, dict]:
    """
    Fetches features for one station.

    Parameters
    -----------
        station :
            station id/gauge id for which the data is to be fetched.
        dynamic_features : str/list, optional
            names of dynamic features/attributes to fetch
        static_features :
            names of static features/attributes to be fetched
        as_ts : bool
            whether static features are to be joined with the dynamic
            features as columns of a single dataframe or not.
        st : str,optional
            starting point from which the data to be fetched. By default,
            the data will be fetched from where it is available.
        en : str, optional
            end point of data to be fetched. By default the data will be
            fetched till where it is available.
        kwargs :
            keyword arguments for :meth:`fetch_dynamic_features`. If
            ``as_dataframe`` is not given, it is set to True because the
            dynamic features are combined using pandas.

    Returns
    -------
    pd.DataFrame or dict
        a dataframe when only dynamic or only static features are
        fetched or when ``as_ts`` is True. Otherwise a dictionary with
        ``dynamic`` and ``static`` keys.

    Examples
    --------
        >>> from water_datasets import CAMELS_AUS
        >>> dataset = CAMELS_AUS()
        >>> dataset.fetch_station_features('912101A')

    """
    st, en = self._check_length(st, en)

    # fetch_dynamic_features returns an xarray object unless as_dataframe
    # is True, and pd.concat below can only combine pandas objects.
    kwargs.setdefault('as_dataframe', True)

    station_df = pd.DataFrame()
    if dynamic_features:
        dynamic = self.fetch_dynamic_features(station, dynamic_features, st=st,
                                              en=en, **kwargs)
        station_df = pd.concat([station_df, dynamic])

        if static_features is not None:
            static = self.fetch_static_features(station, static_features)

            if as_ts:
                station_df = pd.concat([station_df, static], axis=1)
            else:
                station_df = {'dynamic': station_df, 'static': static}

    elif static_features is not None:
        station_df = self.fetch_static_features(station, static_features)

    return station_df
[docs]
def plot_stations(
        self,
        stations: List[str] = 'all',
        marker='.',
        ax: plt_Axes = None,
        show: bool = True,
        **kwargs
) -> plt_Axes:
    """
    plots coordinates of stations

    Parameters
    ----------
    stations :
        name/names of stations. If not given, all stations will be plotted
    marker :
        marker to use.
    ax : plt.Axes
        matplotlib axes to draw the plot. If not given, then
        new axes will be created.
    show : bool
        whether to call ``plt.show()`` after drawing or not
    **kwargs
        any additional keyword arguments for ``easy_mpl.plot``

    Returns
    -------
    plt.Axes

    Examples
    --------
    >>> from water_datasets import CAMELS_AUS
    >>> dataset = CAMELS_AUS()
    >>> dataset.plot_stations()
    >>> dataset.plot_stations(['1', '2', '3'])
    >>> dataset.plot_stations(marker='o', ms=0.3)
    >>> ax = dataset.plot_stations(marker='o', ms=0.3, show=False)
    >>> ax.set_title("Stations")
    >>> plt.show()

    """
    coords = self.stn_coords(stations)
    longitudes = coords.loc[:, 'long'].values
    latitudes = coords.loc[:, 'lat'].values

    # showing is deferred so that the caller's ``show`` decides it
    ax = easy_mpl.plot(longitudes, latitudes, marker, ax=ax, show=False, **kwargs)

    if show:
        plt.show()

    return ax
[docs]
def q_mmd(
        self,
        stations: Union[str, List[str]] = "all"
) -> pd.DataFrame:
    """
    returns streamflow in the units of millimeter per day. If the dataset
    provides such a feature (``_mmd_feature_name``), it is returned as it is.
    Otherwise it is obtained by dividing ``q`` by area.

    parameters
    ----------
    stations : str/list
        name/names of stations. Default is ``all``, which will return
        streamflow of all stations

    Returns
    --------
    pd.DataFrame
        a pandas DataFrame whose indices are time-steps and columns
        are catchment/station ids.

    """
    stations = check_attributes(stations, self.stations())

    mmd_name = self._mmd_feature_name
    needs_conversion = mmd_name is None
    # todo: we should always use obs_q_cms
    feature = self._q_name if needs_conversion else mmd_name

    q = self.fetch_stations_features(
        stations,
        dynamic_features=feature,
        as_dataframe=True)
    # only the first level of the index is kept
    q.index = q.index.get_level_values(0)

    if not needs_conversion:
        return q

    area_m2 = self.area(stations) * 1e6  # area in m2
    q = (q / area_m2) * 86400  # cms to m/day
    return q * 1e3  # to mm/day
[docs]
def stn_coords(
        self,
        stations: Union[str, List[str]] = 'all'
) -> pd.DataFrame:
    """
    returns coordinates of stations as DataFrame
    with ``long`` and ``lat`` as columns.

    Parameters
    ----------
    stations :
        name/names of stations. If not given, coordinates
        of all stations will be returned.

    Returns
    -------
    coords :
        pandas DataFrame with ``long`` and ``lat`` columns.
        The length of dataframe will be equal to number of stations
        whose coordinates are to be fetched.

    Examples
    --------
    >>> from water_datasets import CAMELS_CH
    >>> dataset = CAMELS_CH()
    >>> dataset.stn_coords() # returns coordinates of all stations
    >>> dataset.stn_coords('2004')  # returns coordinates of station whose id is 2004
    >>> dataset.stn_coords(['2004', '6004'])  # returns coordinates of two stations

    >>> from water_datasets import CAMELS_AUS
    >>> dataset = CAMELS_AUS()
    >>> dataset.stn_coords() # returns coordinates of all stations
    >>> dataset.stn_coords('912101A')  # returns coordinates of station whose id is 912101A
    >>> dataset.stn_coords(['G0050115', '912101A'])  # returns coordinates of two stations
    """
    # coordinates are fetched for all stations and the selection is made afterwards
    coords = self.fetch_static_features(static_features=self._coords_name)
    coords.columns = ['lat', 'long']

    stations = check_attributes(stations, self.stations())

    selected = coords.loc[stations, :]
    selected = selected.astype(self.fp)
    return self.transform_stn_coords(selected)
[docs]
def get_boundary(
        self,
        catchment_id: str,
        as_type: str = 'numpy'
):
    """
    returns boundary of a catchment

    Parameters
    ----------
    catchment_id : str
        name/id of catchment. It must be a key of ``self.bndry_id_map``.
    as_type : str
        'numpy' or 'geopandas'. It is currently not used; the points
        of the boundary are always returned as numpy array after being
        passed through ``self.transform_coords``.

    Raises
    ------
    ModuleNotFoundError
        if the shapefile module is not available
    KeyError
        if ``catchment_id`` is not present in ``self.bndry_id_map``

    Examples
    --------
    >>> from water_datasets import CAMELS_SE
    >>> dataset = CAMELS_SE()
    >>> dataset.get_boundary(dataset.stations()[0])
    """
    if shapefile is None:
        raise ModuleNotFoundError("shapefile module is not installed. Please install it to use boundary file")

    # resolve the position of the shape before opening the file so that an
    # unknown catchment id does not leave an opened reader behind
    shape_idx = self.bndry_id_map[catchment_id]

    from shapefile import Reader

    bndry_sf = Reader(self.boundary_file)
    try:
        bndry_shp = bndry_sf.shape(shape_idx)
    finally:
        # the reader is closed even when reading of the shape fails
        bndry_sf.close()

    xyz = np.array(bndry_shp.points)

    xyz = self.transform_coords(xyz)

    return xyz
[docs]
def plot_catchment(
        self,
        catchment_id: str,
        ax: plt_Axes = None,
        show: bool = True,
        **kwargs
) -> plt.Axes:
    """
    plots catchment boundaries

    Parameters
    ----------
    catchment_id : str
        name/id of the catchment whose boundary is to be plotted
    ax : plt.Axes
        matplotlib axes to draw the plot. If not given, then
        new axes will be created.
    show : bool
        whether to call ``plt.show()`` after drawing or not
    **kwargs
        any additional keyword arguments for ``easy_mpl.plot``

    Returns
    -------
    plt.Axes

    Raises
    ------
    NotImplementedError
        if the boundary is not a 2-dimensional numpy array

    Examples
    --------
    >>> from water_datasets import CAMELS_AUS
    >>> dataset = CAMELS_AUS()
    >>> stn = dataset.stations()[0]
    >>> dataset.plot_catchment(stn)
    >>> dataset.plot_catchment(stn, marker='o', ms=0.3)
    >>> ax = dataset.plot_catchment(stn, marker='o', ms=0.3, show=False)
    >>> ax.set_title("Catchment Boundaries")
    >>> plt.show()

    """
    catchment = self.get_boundary(catchment_id)

    # only boundaries in the form of a 2d numpy array are supported
    if not isinstance(catchment, np.ndarray):
        raise NotImplementedError
    if catchment.ndim != 2:
        raise NotImplementedError

    x_points = catchment[:, 0]
    y_points = catchment[:, 1]
    ax = easy_mpl.plot(x_points, y_points, show=False, ax=ax, **kwargs)

    if show:
        plt.show()
    return ax
def _handle_dynamic(dyn, as_dataframe: bool):
    """Convert a dictionary of DataFrames into a single object.

    If ``dyn`` is a dictionary whose first value is a pandas DataFrame,
    it is converted into an xr.Dataset and, when ``as_dataframe`` is True,
    further into a DataFrame indexed by ``time`` and ``dynamic_features``.
    Anything else is returned unchanged.
    """
    is_dict_of_frames = isinstance(dyn, dict) and isinstance(list(dyn.values())[0], pd.DataFrame)
    if not is_dict_of_frames:
        return dyn

    dataset = xr.Dataset(dyn)
    if not as_dataframe:
        return dataset

    # todo weird that we have to first convert to xr.Dataset and then to DataFrame
    return dataset.to_dataframe(['time', 'dynamic_features'])