Source code for aqua_fetch.rr._denmark

import os
from typing import Union, List, Dict

import pandas as pd

from .camels import Camels
from ..utils import check_attributes

from ._map import (
    total_precipitation,
    mean_air_temp,
    total_potential_evapotranspiration_with_specifier,
    actual_evapotranspiration,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )


[docs] class Caravan_DK(Camels): """ Reads Caravan extension Denmark - Danish dataset for large-sample hydrology following the works of `Koch and Schneider 2022 <https://doi.org/10.34194/geusb.v49.829>`_ . The dataset is downloaded from `zenodo <https://zenodo.org/record/7962379>`_ . This dataset consists of static and dynamic features from 308 danish catchments. There are 38 dynamic (time series) features from 1981-01-02 to 2020-12-31 with daily timestep and 211 static features for each of 308 catchments. Please note that there is an updated version of this dataset following the works of `Liu et al., 2024 <https://doi.org/10.5194/essd-2024-292>`_ . This dataset is associated with the :py:class:`water_datasets.CAMELS_DK` class which can be imported as follows: >>> from water_datasets import CAMELS_DK Examples --------- >>> from water_datasets import Caravan_DK >>> dataset = Caravan_DK() >>> data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (569751, 30) # 30 represents number of stations >>> data.index.names == ['time', 'dynamic_features'] True >>> df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (14609, 39) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 308 # get data by station id >>> df = dataset.fetch(stations='80001', as_dataframe=True).unstack() >>> df.shape (14609, 39) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['snow_depth_water_equivalent_mean', 'temperature_2m_mean', ... 'potential_evaporation_sum', 'total_precipitation_sum', 'streamflow']).unstack() >>> df.shape (14609, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> df = dataset.fetch(10, as_dataframe=True) >>> df.shape (569751, 10) # remember this is multi-indexed DataFrame # when we get both static and dynamic data, the returned data is a dictionary # with ``static`` and ``dynamic`` keys. >>> data = dataset.fetch(stations='80001', static_features="all", as_dataframe=True) >>> data['static'].shape, data['dynamic'].shape ((1, 211), (569751, 1)) """ url = "https://zenodo.org/record/7962379"
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, **kwargs): """ Parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. overwrite : bool If the data is already down then you can set it to True, to make a fresh download. to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc but will require netcdf5 package as well as xarry. """ super(Caravan_DK, self).__init__(path=path, **kwargs) self.path = path self._download(overwrite=overwrite) self._static_features = self.__static_features() self._dynamic_features = self.__dynamic_features() self.dyn_fname = os.path.join(self.path, 'camelsdk_dyn.nc') if to_netcdf: self._maybe_to_netcdf('camelsdk_dyn') self.boundary_file = os.path.join( path, "Caravan_DK", "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "shapefiles", "camelsdk", "camelsdk_basin_shapes.shp" ) self._create_boundary_id_map(self.boundary_file, 3)
@property def static_map(self) -> Dict[str, str]: return { 'catch_area': catchment_area(), 'catch_outlet_lat': gauge_latitude(), 'slope_mean': slope('mkm-1'), 'catch_outlet_lon': gauge_longitude(), } @property def csv_path(self): return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "timeseries", "csv", "camelsdk") @property def nc_path(self): return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "timeseries", "netcdf", "camelsdk") @property def other_attr_fpath(self): """returns path to attributes_other_camelsdk.csv file """ return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "attributes", "camelsdk", "attributes_other_camelsdk.csv") @property def caravan_attr_fpath(self): """returns path to attributes_caravan_camelsdk.csv file """ return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "attributes", "camelsdk", "attributes_caravan_camelsdk.csv") def stations(self) -> List[str]: return [fname.split(".csv")[0][9:] for fname in os.listdir(self.csv_path)] def _read_csv(self, stn: str) -> pd.DataFrame: fpath = os.path.join(self.csv_path, f"camelsdk_{stn}.csv") df = pd.read_csv(os.path.join(fpath)) df.index = pd.to_datetime(df.pop('date')) df.rename(columns=self.dyn_map, inplace=True) return df @property def dynamic_features(self) -> List[str]: """returns names of dynamic features""" return self._dynamic_features def __dynamic_features(self) -> List[str]: return self._read_csv('100006').columns.to_list() @property def static_features(self) -> List[str]: """returns static features for Denmark catchments""" return self._static_features def __static_features(self) -> List[str]: caravan = pd.read_csv(self.caravan_attr_fpath) _ = caravan.pop('gauge_id') other = pd.read_csv(self.other_attr_fpath) _ = other.pop('gauge_id') atlas = pd.read_csv(self.hyd_atlas_fpath) _ = atlas.pop('gauge_id') return pd.concat([caravan, other, atlas], axis=1).columns.to_list() @property def start(self): # start of data return pd.Timestamp('1981-01-02 00:00:00') @property def end(self) -> pd.Timestamp: # end of data return pd.Timestamp('2020-12-31 00:00:00') @property def dyn_map(self) -> Dict[str, str]: return { }
[docs] def q_mmd( self, stations: Union[str, List[str]] = 'all' ) -> pd.DataFrame: """ returns streamflow in the units of milimeter per day. This is obtained by diving ``streamflow``/area parameters ---------- stations : str/list name/names of stations. Default is ``all``, which will return area of all stations Returns -------- pd.DataFrame a pandas DataFrame whose indices are time-steps and columns are catchment/station ids. """ stations = check_attributes(stations, self.stations()) q = self.fetch_stations_features(stations, dynamic_features='obs_q_cms', as_dataframe=True) q.index = q.index.get_level_values(0) area_m2 = self.area(stations) * 1e6 # area in m2 q = (q / area_m2) * 86400 # cms to m/day return q * 1e3 # to mm/day
def _area_name(self, stn: str) -> str: return "area" # def area( # self, # stations: Union[str, List[str]] = None # ) ->pd.Series: # """ # Returns area (Km2) of all catchments as pandas series # parameters # ---------- # stations : str/list # name/names of stations. Default is None, which will return # area of all stations # Returns # -------- # pd.Series # a pandas series whose indices are catchment ids and values # are areas of corresponding catchments. # Examples # --------- # >>> from water_quality import CAMELS_DK # >>> dataset = CAMELS_DK() # >>> dataset.area() # returns area of all stations # >>> dataset.stn_coords('100010') # returns area of station whose id is 912101A # >>> dataset.stn_coords(['100010', '210062']) # returns area of two stations # """ # stations = check_attributes(stations, self.stations()) # df = pd.read_csv(self.other_attr_fpath, # dtype={"gauge_id": str, # 'gauge_lat': float, # 'gauge_lon': float, # 'area': float, # 'gauge_name': str, # 'country': str # }) # df.index = [name.split('camelsdk_')[1] for name in df['gauge_id']] # return df.loc[stations, 'area']
[docs] def stn_coords( self, stations: Union[str, List[str]] = 'all' ) -> pd.DataFrame: """ returns coordinates of stations as DataFrame with ``long`` and ``lat`` as columns. Parameters ---------- stations : name/names of stations. If not given, coordinates of all stations will be returned. Returns ------- coords : pandas DataFrame with ``long`` and ``lat`` columns. The length of dataframe will be equal to number of stations wholse coordinates are to be fetched. Examples -------- >>> dataset = Caravan_DK() >>> dataset.stn_coords() # returns coordinates of all stations >>> dataset.stn_coords('100010') # returns coordinates of station whose id is 912101A >>> dataset.stn_coords(['100010', '210062']) # returns coordinates of two stations """ df = pd.read_csv(self.other_attr_fpath, dtype={"gauge_id": str, 'gauge_lat': float, 'gauge_lon': float, 'area': float, 'gauge_name': str, 'country': str }) df.index = [name.split('camelsdk_')[1] for name in df['gauge_id']] df = df[['gauge_lat', 'gauge_lon']] df.columns = ['lat', 'long'] stations = check_attributes(stations, self.stations()) return df.loc[stations, :]
def _read_dynamic_from_csv( self, stations, dynamic_features, st=None, en=None) -> dict: features = check_attributes(dynamic_features, self.dynamic_features) dyn = {stn: self._read_csv(stn)[features] for stn in stations} return dyn
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]] = 'all', features: Union[str, List[str]] = 'all' ) -> pd.DataFrame: """ Returns static features of one or more stations. Parameters ---------- stn_id : str name/id of station/stations of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Returns ------- pd.DataFrame a pandas dataframe of shape (stations, features) Examples --------- >>> from water_datasets import Caravan_DK >>> dataset = Caravan_DK() get the names of stations >>> stns = dataset.stations() >>> len(stns) 308 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (308, 211) get static data of one station only >>> static_data = dataset.fetch_static_features('80001') >>> static_data.shape (1, 211) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['gauge_lat', 'area']) >>> static_data.shape (308, 2) >>> data = dataset.fetch_static_features('80001', features=['gauge_lat', 'area']) >>> data.shape (1, 2) """ stations = check_attributes(stn_id, self.stations()) features = check_attributes(features, self.static_features) df = pd.concat([self.hyd_atlas_attributes(), self.other_static_attributes(), self.caravan_static_attributes()], axis=1) return df.loc[stations, features]
@property def hyd_atlas_fpath(self): return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "attributes", "camelsdk", "attributes_hydroatlas_camelsdk.csv")
[docs] def hyd_atlas_attributes(self, stations='all') -> pd.DataFrame: """ Returns -------- a pandas DataFrame of shape (308, 196) """ stations = check_attributes(stations, self.stations()) df = pd.read_csv(self.hyd_atlas_fpath) indices = df.pop('gauge_id') df.index = [idx[9:] for idx in indices] return df
[docs] def other_static_attributes(self, stations='all') -> pd.DataFrame: """ Returns -------- a pandas DataFrame of shape (308, 5) """ stations = check_attributes(stations, self.stations()) df = pd.read_csv(self.other_attr_fpath) indices = df.pop('gauge_id') df.index = [idx[9:] for idx in indices] return df
[docs] def caravan_static_attributes(self, stations='all') -> pd.DataFrame: """ Returns -------- a pandas DataFrame of shape (308, 10) """ stations = check_attributes(stations, self.stations()) df = pd.read_csv(self.caravan_attr_fpath) indices = df.pop('gauge_id') df.index = [idx[9:] for idx in indices] return df