Source code for aqua_fetch.rr._bull

import os
from typing import List, Union, Dict
import concurrent.futures as cf

import pandas as pd

from .camels import Camels
from .._backend import netCDF4
from .._backend import xarray as xr
from ..utils import check_attributes, get_cpus
from ._map import (
    total_potential_evapotranspiration_with_specifier,
    solar_radiation,
    max_solar_radiation,
    min_solar_radiation,
    mean_thermal_radiation,
    max_thermal_radiation,
    min_themal_radiation,
    max_air_temp_with_specifier,
    min_air_temp_with_specifier,
    mean_air_temp_with_specifier,
    total_precipitation_with_specifier,
    max_dewpoint_temperature,
    min_dewpoint_temperature,
    mean_dewpoint_temperature,
    mean_dewpoint_temperature_with_specifier,
    mean_potential_evaporation,
    observed_streamflow_cms,
    max_dewpoint_temperature,
    snow_water_equivalent,
    min_snow_water_equivalent,
    max_snow_water_equivalent,
    u_component_of_wind_with_specifier,
    v_component_of_wind_with_specifier
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

BUL_COLUMNS = [
    'snow_depth_water_equivalent_mean_BULL', 'surface_net_solar_radiation_mean_BULL',
    'surface_net_thermal_radiation_mean_BULL',
    'surface_pressure_mean_BULL', 'temperature_2m_mean_BULL', 'dewpoint_temperature_2m_mean_BULL',
    'u_component_of_wind_10m_mean_BULL',
    'v_component_of_wind_10m_mean_BULL', 'volumetric_soil_water_layer_1_mean_BULL',
    'volumetric_soil_water_layer_2_mean_BULL',
    'volumetric_soil_water_layer_3_mean_BULL', 'volumetric_soil_water_layer_4_mean_BULL',
    'snow_depth_water_equivalent_min_BULL',
    'surface_net_solar_radiation_min_BULL', 'surface_net_thermal_radiation_min_BULL', 'surface_pressure_min_BULL',
    'temperature_2m_min_BULL', 'dewpoint_temperature_2m_min_BULL', 'u_component_of_wind_10m_min_BULL',
    'v_component_of_wind_10m_min_BULL',
    'volumetric_soil_water_layer_1_min_BULL', 'volumetric_soil_water_layer_2_min_BULL',
    'volumetric_soil_water_layer_3_min_BULL',
    'volumetric_soil_water_layer_4_min_BULL', 'snow_depth_water_equivalent_max_BULL',
    'surface_net_solar_radiation_max_BULL',
    'surface_net_thermal_radiation_max_BULL', 'surface_pressure_max_BULL', 'temperature_2m_max_BULL',
    'dewpoint_temperature_2m_max_BULL',
    'u_component_of_wind_10m_max_BULL', 'v_component_of_wind_10m_max_BULL', 'volumetric_soil_water_layer_1_max_BULL',
    'volumetric_soil_water_layer_2_max_BULL', 'volumetric_soil_water_layer_3_max_BULL',
    'volumetric_soil_water_layer_4_max_BULL',
    'total_precipitation_sum_BULL', 'potential_evaporation_sum_BULL', 'streamflow_BULL'
]


[docs] class Bull(Camels): """ Following the works of `Aparicio et al., 2024 <https://doi.org/10.1038/s41597-024-03594-5>`_. The data is taken from the `Zenodo repository <https://zenodo.org/records/10629809>`_. This dataset contains 484 stations with 55 dynamic (time series) features and 214 static features. The dynamic features span from 1951 to 2021. Examples --------- >>> from water_datasets import Bull >>> dataset = Bull() >>> data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (1426260, 48) # 40 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['BULL_9007'].unstack().shape # the name of station could be different (25932, 13) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 25932, 'dynamic_features': 55}) >>> len(data.data_vars) 48 >>> df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (25932, 55) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 484 # get data by station id >>> df = dataset.fetch(stations='BULL_9007', as_dataframe=True).unstack() >>> df.shape (25932, 55) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['potential_evapotranspiration_AEMET', 'temperature_mean_AEMET', ... 'total_precipitation_ERA5_Land', 'obs_q_cms']).unstack() >>> df.shape (25932, 4) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> df = dataset.fetch(10, as_dataframe=True) >>> df.shape (166166, 10) # remember this is multi-indexed DataFrame # when we get both static and dynamic data, the returned data is a dictionary # with ``static`` and ``dyanic`` keys. >>> data = dataset.fetch(stations='BULL_9007', static_features="all", as_dataframe=True) >>> data['static'].shape, data['dynamic'].shape ((1, 214), (1426260, 1)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (484, 2) >>> dataset.stn_coords('BULL_9007') # returns coordinates of station whose id is GRDC_3664802 41.298 -1.967 >>> dataset.stn_coords(['BULL_9007', 'BULL_8083']) # returns coordinates of two stations """ url = "https://zenodo.org/records/10629809"
[docs] def __init__( self, path, overwrite=False, **kwargs ): super().__init__(path, **kwargs) self._download(overwrite=overwrite) if netCDF4 is None: self.ftype = "csv" else: self.ftype = "netcdf" self._dynamic_features = self._read_dynamic_for_stn(self.stations()[0]).columns.tolist() self._static_features = list(set(self.static_data().columns.tolist())) self.boundary_file = os.path.join(self.shapefiles_path, "BULL_basin_shapes.shp") self._create_boundary_id_map(self.boundary_file, 0) self.dyn_fname = ''
@property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): return { 'dewpoint_temperature_2m_max_BULL': max_dewpoint_temperature(), 'dewpoint_temperature_2m_mean_BULL': mean_dewpoint_temperature(), 'dewpoint_temperature_2m_min_BULL': min_dewpoint_temperature(), # todo: are we considering height 'potential_evaporation_sum_BULL': mean_potential_evaporation(), # todo: is it mean or total? 'streamflow': observed_streamflow_cms(), 'potential_evapotranspiration_AEMET': total_potential_evapotranspiration_with_specifier('AEMET'), 'potential_evapotranspiration_EMO1_arc': total_potential_evapotranspiration_with_specifier('EMO1arc'), 'potential_evapotranspiration_ERA5_Land': total_potential_evapotranspiration_with_specifier('ERA5Land'), 'surface_net_solar_radiation_mean_BULL': solar_radiation(), 'surface_net_solar_radiation_max_BULL': max_solar_radiation(), 'surface_net_solar_radiation_min_BULL': min_solar_radiation(), 'surface_net_thermal_radiation_max_BULL': max_thermal_radiation(), 'surface_net_thermal_radiation_mean_BULL': mean_thermal_radiation(), 'surface_net_thermal_radiation_min_BULL': min_themal_radiation(), 'temperature_max_AEMET': max_air_temp_with_specifier('AEMET'), 'temperature_max_EMO1_arc': max_air_temp_with_specifier('EMO1arc'), 'temperature_max_ERA5_Land': max_air_temp_with_specifier('ERA5Land'), 'temperature_mean_AEMET': mean_air_temp_with_specifier('AEMET'), 'temperature_mean_EMO1_arc': mean_air_temp_with_specifier('EMO1arc'), 'temperature_mean_ERA5_Land': mean_air_temp_with_specifier('ERA5Land'), 'temperature_min_AEMET': min_air_temp_with_specifier('AEMET'), 'temperature_min_EMO1_arc': min_air_temp_with_specifier('EMO1arc'), 'temperature_min_ERA5_Land': min_air_temp_with_specifier('ERA5Land'), 'total_precipitation_AEMET': total_precipitation_with_specifier('AEMET'), 'total_precipitation_EMO1_arc': total_precipitation_with_specifier('EMO1arc'), 'total_precipitation_ERA5_Land': total_precipitation_with_specifier('ERA5Land'), 'total_precipitation_sum_BULL': total_precipitation_with_specifier('BULL'), 'snow_depth_water_equivalent_max_BULL': max_snow_water_equivalent(), 'snow_depth_water_equivalent_mean_BULL': snow_water_equivalent(), 'snow_depth_water_equivalent_min_BULL': min_snow_water_equivalent(), #'surface_pressure_max_BULL': # todo: is it same as air pressure 'temperature_2m_max_BULL': max_air_temp_with_specifier('2m'), 'temperature_2m_mean_BULL': mean_air_temp_with_specifier('2m'), 'temperature_2m_min_BULL': min_air_temp_with_specifier('2m'), 'u_component_of_wind_10m_max_BULL': u_component_of_wind_with_specifier('max_10m'), 'u_component_of_wind_10m_mean_BULL': u_component_of_wind_with_specifier('mean_10m'), 'u_component_of_wind_10m_min_BULL': u_component_of_wind_with_specifier('min_10m'), 'v_component_of_wind_10m_max_BULL': v_component_of_wind_with_specifier('max_10m'), 'v_component_of_wind_10m_mean_BULL': v_component_of_wind_with_specifier('mean_10m'), 'v_component_of_wind_10m_min_BULL': v_component_of_wind_with_specifier('min_10m'), } @property def attributes_path(self): return os.path.join(self.path, "attributes", "attributes") @property def shapefiles_path(self): return os.path.join(self.path, "shapefiles", "shapefiles") @property def ts_path(self): return os.path.join(self.path, "timeseries", "timeseries") @property def q_path(self): return os.path.join(self.ts_path, self.ftype, "streamflow") @property def aemet_path(self): return os.path.join(self.ts_path, self.ftype, "AEMET") @property def bull_path(self): return os.path.join(self.ts_path, self.ftype, "BULL") @property def era5_land_path(self): return os.path.join(self.ts_path, self.ftype, "ERA5_Land") @property def emo1_arc_path(self): return os.path.join(self.ts_path, self.ftype, "EMO1_arc") @property def _q_name(self) -> str: return "obs_q_cms" @property def _coords_name(self) -> List[str]: return ['gauge_lat', 'gauge_lon'] @property def _area_name(self) -> str: return 'area' @property def start(self): return pd.Timestamp("19510102") @property def end(self): return pd.Timestamp("20211231") def stations(self) -> List[str]: return ["BULL_" + f.split('.')[0].split('_')[1] for f in os.listdir(self.q_path)] @property def dynamic_features(self) -> List[str]: return self._dynamic_features @property def static_features(self) -> List[str]: return self._static_features
[docs] def caravan_attributes(self) -> pd.DataFrame: """a dataframe of shape (484, 10)""" return pd.read_csv( os.path.join(self.attributes_path, "attributes_caravan_.csv"), index_col=0)
[docs] def hydroatlas_attributes(self) -> pd.DataFrame: """a dataframe of shape (484, 197)""" df = pd.read_csv( os.path.join(self.attributes_path, "attributes_hydroatlas_.csv"), index_col=0) # because self.other_attributes() has a column named 'area' df.rename(columns={'area': 'area_hydroatlas'}, inplace=True) return df
[docs] def other_attributes(self) -> pd.DataFrame: """a dataframe of shape (484, 7)""" return pd.read_csv( os.path.join(self.attributes_path, "attributes_other_ss.csv"), index_col=0)
def static_data(self) -> pd.DataFrame: return pd.concat([ self.caravan_attributes(), self.hydroatlas_attributes(), self.other_attributes() ], axis=1) def _read_dynamic_for_stn(self, stn_id: str) -> pd.DataFrame: stn_id = stn_id.split('_')[1] df = pd.concat([ self._read_q_for_stn(stn_id), self._read_aemet_for_stn(stn_id), self._read_bull_for_stn(stn_id), self._read_era5_land_for_stn(stn_id), self._read_emo1_arc_for_stn(stn_id) ], axis=1) df.index.name = 'time' df.columns.name = 'dynamic_features' df.rename(columns=self.dyn_map, inplace=True) return df def _read_dynamic_from_csv( self, stations, dynamic_features, st=None, en=None) -> dict: dynamic_features = check_attributes(dynamic_features, self.dynamic_features) stations = check_attributes(stations, self.stations()) if st is None: st = self.start if en is None: en = self.end cpus = self.processes or min(get_cpus(), 64) if len(stations) > 10 and cpus > 1: with cf.ProcessPoolExecutor(max_workers=cpus) as executor: results = executor.map( self._read_dynamic_for_stn, stations, ) dyn = {stn: data.loc[st:en, dynamic_features] for stn, data in zip(stations, results)} else: dyn = { stn: self._read_dynamic_for_stn(stn).loc[st: en, dynamic_features] for stn in stations } return dyn def _read_q_for_stn(self, stn_id) -> pd.DataFrame: """a dataframe of shape (time, 1)""" if self.ftype == "netcdf": fpath = os.path.join(self.q_path, f'streamflow_{stn_id}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.q_path, f'streamflow_{stn_id}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' return df def _read_aemet_for_stn(self, stn_id) -> pd.DataFrame: """a dataframe of shape (time, 5) 'temperature_max_AEMET', 'temperature_min_AEMET', 'temperature_mean_AEMET', 'total_precipitation_AEMET', 'potential_evapotranspiration_AEMET' """ if self.ftype == "netcdf": fpath = os.path.join(self.aemet_path, f'AEMET_{stn_id}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.aemet_path, f'AEMET_{stn_id}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_AEMET' for col in df.columns] return df def _read_bull_for_stn(self, stn_id) -> pd.DataFrame: """a dataframe of shape (time, 39) except for stn 3163""" if self.ftype == "netcdf": fpath = os.path.join(self.bull_path, f'BULL_{stn_id}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.bull_path, f'BULL_{stn_id}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_BULL' for col in df.columns] # todo: why are we adding _BULL to the columns if len(df.columns) == 15: # add missing columns for col in BUL_COLUMNS: if col not in df.columns: df[col] = None return df def _read_era5_land_for_stn(self, stn_id) -> pd.DataFrame: """a dataframe of shape (time, 5) with following columns - 'temperature_max_ERA5_Land', - 'temperature_min_ERA5_Land', - 'temperature_mean_ERA5_Land', - 'total_precipitation_ERA5_Land', - 'potential_evapotranspiration_ERA5_Land' """ if self.ftype == "netcdf": fpath = os.path.join(self.era5_land_path, f'ERA5_Land_{stn_id}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.era5_land_path, f'ERA5_Land_{stn_id}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_ERA5_Land' for col in df.columns] return df def _read_emo1_arc_for_stn(self, stn_id) -> pd.DataFrame: """a dataframe of shape (time, 5) with following columns - 'temperature_max_EMO1_arc' - 'temperature_min_EMO1_arc' - 'temperature_mean_EMO1_arc' - 'total_precipitation_EMO1_arc' - 'potential_evapotranspiration_EMO1_arc' """ if self.ftype == "netcdf": fpath = os.path.join(self.emo1_arc_path, f'EMO1_{stn_id}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.emo1_arc_path, f'EMO1_{stn_id}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_EMO1_arc' for col in df.columns] return df
[docs] def fetch_static_features( self, stn_id: Union[str, List[str]] = 'all', static_features: Union[str, List[str]] = 'all' ) -> pd.DataFrame: """ Returns static features of one or more stations. Parameters ---------- stn_id : str name/id of station/stations of which to extract the data static_features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Returns ------- pd.DataFrame a pandas dataframe of shape (stations, features) Examples --------- >>> from water_datasets import Bull >>> dataset = Bull() get the names of stations >>> stns = dataset.stations() >>> len(stns) 484 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (484, 214) get static data of one station only >>> static_data = dataset.fetch_static_features('42600042') >>> static_data.shape (1, 214) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['seasonality', 'moisture_index']) >>> static_data.shape (484, 2) >>> data = dataset.fetch_static_features('42600042', static_features=['seasonality', 'moisture_index']) >>> data.shape (1, 2) """ stations = check_attributes(stn_id, self.stations()) features = check_attributes(static_features, self.static_features, 'static_features') df = self.static_data() return df.loc[stations, features]