import os
import concurrent.futures as cf
from typing import Union, List, Dict
import pandas as pd
from .camels import Camels
from ..utils import get_cpus
from ..utils import check_st_en
from ..utils import check_attributes, download, _unzip
from .._backend import netCDF4, xarray as xr
from ._map import (
observed_streamflow_cms,
observed_streamflow_mmd,
mean_air_temp,
min_air_temp_with_specifier,
max_air_temp_with_specifier,
max_air_temp,
min_air_temp,
mean_air_temp_with_specifier,
total_precipitation,
total_precipitation_with_specifier,
total_potential_evapotranspiration,
total_potential_evapotranspiration_with_specifier,
simulated_streamflow_cms,
actual_evapotranspiration,
actual_evapotranspiration_with_specifier,
solar_radiation_with_specifier,
mean_vapor_pressure,
mean_vapor_pressure_with_specifier,
mean_rel_hum,
mean_rel_hum_with_specifier,
rel_hum_with_specifier,
mean_windspeed,
u_component_of_wind,
v_component_of_wind,
solar_radiation,
downward_longwave_radiation,
snow_water_equivalent,
mean_specific_humidity,
)
from ._map import (
catchment_area,
gauge_latitude,
gauge_longitude,
slope
)
[docs]
class GRDCCaravan(Camels):
"""
This is a dataset of 5357 catchments from around the globe following the works of
`Faerber et al., 2023 <https://zenodo.org/records/10074416>`_ . The dataset consists of 39
dynamic (timeseries) features and 211 static features. The dynamic (timeseries) data
spands from 1950-01-02 to 2019-05-19.
if xarray + netCDF4 packages are installed then netcdf files will be downloaded
otherwise csv files will be downloaded and used.
Examples
--------
>>> from water_datasets import GRDCCaravan
>>> dataset = GRDCCaravan()
>>> df = dataset.fetch(stations=1, as_dataframe=True)
>>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it
>>> df.shape
(26801, 39)
get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
5357
get data of 10 % of stations as dataframe
>>> df = dataset.fetch(0.1, as_dataframe=True)
>>> df.shape
(1045239, 535)
The returned dataframe is a multi-indexed data
>>> df.index.names == ['time', 'dynamic_features']
True
get data by station id
>>> df = dataset.fetch(stations='GRDC_3664802', as_dataframe=True).unstack()
>>> df.shape
(26800, 39)
get names of available dynamic features
>>> dataset.dynamic_features
get only selected dynamic features
>>> data = dataset.fetch(1, as_dataframe=True,
... dynamic_features=['total_precipitation_sum', 'potential_evaporation_sum', 'temperature_2m_mean', 'streamflow']).unstack()
>>> data.shape
(26800, 4)
get names of available static features
>>> dataset.static_features
... # get data of 10 random stations
>>> df = dataset.fetch(10, as_dataframe=True)
>>> df.shape # remember this is a multiindexed dataframe
(1045239, 10)
when we get both static and dynamic data, the returned data is a dictionary
with ``static`` and ``dyanic`` keys.
>>> data = dataset.fetch(stations='GRDC_3664802', static_features="all", as_dataframe=True)
>>> data['static'].shape, data['dynamic'].shape
((1, 211), (1045200, 1))
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(5357, 2)
>>> dataset.stn_coords('GRDC_3664802') # returns coordinates of station whose id is GRDC_3664802
-26.2271 -51.0771
>>> dataset.stn_coords(['GRDC_3664802', 'GRDC_1159337']) # returns coordinates of two stations
"""
url = {
'caravan-grdc-extension-nc.tar.gz':
"https://zenodo.org/records/10074416/files/caravan-grdc-extension-nc.tar.gz?download=1",
'caravan-grdc-extension-csv.tar.gz':
"https://zenodo.org/records/10074416/files/caravan-grdc-extension-csv.tar.gz?download=1"
}
[docs]
def __init__(
self,
path=None,
overwrite: bool = False,
verbosity: int = 1,
**kwargs
):
if xr is None:
self.ftype == 'csv'
if "caravan-grdc-extension-nc.tar.gz" in self.url:
self.url.pop("caravan-grdc-extension-nc.tar.gz")
else:
self.ftype = 'netcdf'
if "caravan-grdc-extension-csv.tar.gz" in self.url:
self.url.pop("caravan-grdc-extension-csv.tar.gz")
super().__init__(path=path, verbosity=verbosity, **kwargs)
for _file, url in self.url.items():
fpath = os.path.join(self.path, _file)
if not os.path.exists(fpath) and not overwrite:
if self.verbosity > 0:
print(f"Downloading {_file} from {url + _file}")
download(url + _file, outdir=self.path, fname=_file, )
_unzip(self.path)
elif self.verbosity > 0:
print(f"{_file} at {self.path} already exists")
self.boundary_file = os.path.join(
self.shapefiles_path,
'grdc_basin_shapes.shp'
)
self._create_boundary_id_map(self.boundary_file, 0)
# so that we dont have to read the files again and again
self._stations = self.other_attributes().index.to_list()
self._static_attributes = self.static_data().columns.tolist()
self._dynamic_attributes = self._read_dynamic_for_stn(self.stations()[0]).columns.tolist()
self.dyn_fname = ''
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'gauge_lat': gauge_latitude(),
'gauge_lon': gauge_longitude(),
}
@property
def dyn_map(self):
return {
'streamflow': observed_streamflow_cms(),
'temperature_2m_mean': mean_air_temp_with_specifier('2m'),
'temperature_2m_min': min_air_temp_with_specifier('2m'),
'temperature_2m_max': max_air_temp_with_specifier('2m'),
'total_precipitation_sum': total_precipitation(),
}
@property
def static_features(self):
return self._static_attributes
@property
def dynamic_features(self):
return self._dynamic_attributes
@property
def shapefiles_path(self):
if self.ftype == 'csv':
return os.path.join(self.path, 'GRDC-Caravan-extension-csv',
'shapefiles', 'grdc')
return os.path.join(self.path, 'GRDC-Caravan-extension-nc',
'shapefiles', 'grdc')
@property
def attrs_path(self):
if self.ftype == 'csv':
return os.path.join(self.path, 'GRDC-Caravan-extension-csv',
'attributes', 'grdc')
return os.path.join(self.path, 'GRDC-Caravan-extension-nc',
'attributes', 'grdc')
@property
def ts_path(self) -> os.PathLike:
if self.ftype == 'csv':
return os.path.join(self.path, 'GRDC-Caravan-extension-csv',
'timeseries', 'grdc')
return os.path.join(self.path, 'GRDC-Caravan-extension-nc',
'timeseries', self.ftype, 'grdc')
def stations(self) -> List[str]:
return self._stations
@property
def _coords_name(self) -> List[str]:
return ['gauge_lat', 'gauge_lon']
@property
def _area_name(self) -> str:
return 'area'
@property
def start(self):
return pd.Timestamp("19500102")
@property
def end(self):
return pd.Timestamp("20230519")
@property
def _q_name(self) -> str:
return observed_streamflow_cms()
def other_attributes(self) -> pd.DataFrame:
return pd.read_csv(os.path.join(self.attrs_path, 'attributes_other_grdc.csv'), index_col='gauge_id')
def hydroatlas_attributes(self) -> pd.DataFrame:
return pd.read_csv(os.path.join(self.attrs_path, 'attributes_hydroatlas_grdc.csv'), index_col='gauge_id')
def caravan_attributes(self) -> pd.DataFrame:
return pd.read_csv(os.path.join(self.attrs_path, 'attributes_caravan_grdc.csv'), index_col='gauge_id')
def static_data(self) -> pd.DataFrame:
return pd.concat([
self.other_attributes(),
self.hydroatlas_attributes(),
self.caravan_attributes(),
], axis=1)
[docs]
def fetch_station_features(
self,
station: str,
dynamic_features: Union[str, list, None] = 'all',
static_features: Union[str, list, None] = None,
as_ts: bool = False,
st: Union[str, None] = None,
en: Union[str, None] = None,
**kwargs
) -> Dict[str, pd.DataFrame]:
"""
Fetches features for one station.
Parameters
-----------
station :
station id/gauge id for which the data is to be fetched.
dynamic_features : str/list, optional
names of dynamic features/attributes to fetch
static_features :
names of static features/attributes to be fetches
as_ts : bool
whether static features are to be converted into a time
series or not. If yes then the returned time series will be of
same length as that of dynamic attribtues.
st : str,optional
starting point from which the data to be fetched. By default,
the data will be fetched from where it is available.
en : str, optional
end point of data to be fetched. By default the dat will be fetched
Returns
-------
Dict
dataframe if as_ts is True else it returns a dictionary of static and
dynamic features for a station/gauge_id
Examples
--------
>>> from water_datasets import GRDCCaravan
>>> dataset = GRDCCaravan()
>>> dataset.fetch_station_features('912101A')
"""
dynamic_features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
st, en = check_st_en(st, en, self.start, self.end)
if self.ftype == "netcdf":
fpath = os.path.join(self.ts_path, f'{station}.nc')
df = xr.open_dataset(fpath).to_dataframe()
else:
fpath = os.path.join(self.ts_path, f'{station}.csv')
df = pd.read_csv(fpath, index_col='date', parse_dates=True)
df = df.loc[st:en, :]
df.rename(columns=self.dyn_map, inplace=True)
if static_features is not None:
static = self.fetch_static_features(station, static_features)
return {'static': static, 'dynamic': df[dynamic_features]}
[docs]
def fetch_static_features(
self,
stn_id: Union[str, list] = "all",
static_features: Union[str, list] = "all"
) -> pd.DataFrame:
"""
Returns static features of one or more stations.
Parameters
----------
stn_id : str
name/id of station/stations of which to extract the data
static_features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
static features are returned.
Returns
-------
pd.DataFrame
a pandas dataframe of shape (stations, features)
Examples
---------
>>> from water_datasets import GRDCCaravan
>>> dataset = GRDCCaravan()
get all static data of all stations
>>> static_data = dataset.fetch_static_features(stns)
>>> static_data.shape
(1555, 111)
get static data of one station only
>>> static_data = dataset.fetch_static_features('DE110010')
>>> static_data.shape
(1, 111)
get the names of static features
>>> dataset.static_features
get only selected features of all stations
>>> static_data = dataset.fetch_static_features(stns, ['p_mean', 'p_seasonality', 'frac_snow'])
>>> static_data.shape
(1555, 3)
>>> data = dataset.fetch_static_features('DE110000', static_features=['p_mean', 'p_seasonality', 'frac_snow'])
>>> data.shape
(1, 3)
"""
stations = check_attributes(stn_id, self.stations(), 'stations')
df = self.static_data()
features = check_attributes(static_features, df.columns.tolist(),
"static_features")
return df.loc[stations, features]
def _read_dynamic_from_csv(
self,
stations,
dynamic_features,
st=None,
en=None) -> dict:
dynamic_features = check_attributes(dynamic_features, self.dynamic_features)
stations = check_attributes(stations, self.stations())
if len(stations) > 10:
cpus = self.processes or min(get_cpus(), 64)
with cf.ProcessPoolExecutor(max_workers=cpus) as executor:
results = executor.map(
self._read_dynamic_for_stn,
stations,
)
dyn = {stn: data.loc[st:en, dynamic_features] for stn, data in zip(stations, results)}
else:
dyn = {
stn: self._read_dynamic_for_stn(stn).loc[st: en, dynamic_features] for stn in stations
}
return dyn
def _read_dynamic_for_stn(self, stn_id) -> pd.DataFrame:
if self.ftype == "netcdf":
fpath = os.path.join(self.ts_path, f'{stn_id}.nc')
df = xr.load_dataset(fpath).to_dataframe()
else:
fpath = os.path.join(self.ts_path, f'{stn_id}.csv')
df = pd.read_csv(fpath, index_col='date', parse_dates=True)
df.rename(columns=self.dyn_map, inplace=True)
df.index.name = 'time'
df.columns.name = 'dynamic_features'
return df