Source code for aqua_fetch.rr._brazil

import os
import glob
import concurrent.futures as cf
from typing import Union, List, Dict

import numpy as np
import pandas as pd

from .camels import Camels
from ..utils import check_attributes, get_cpus
from ._map import (
    min_air_temp,
    max_air_temp,
    mean_air_temp,
    mean_air_temp_with_specifier,
    min_air_temp_with_specifier,
    max_air_temp_with_specifier,
    total_potential_evapotranspiration_with_specifier,
    actual_evapotranspiration_with_specifier,
    total_precipitation_with_specifier,
    observed_streamflow_cms,
    observed_streamflow_mmd,
    mean_rel_hum_with_specifier,
    mean_windspeed_with_specifier,
    solar_radiation_with_specifier,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

# directory separator
SEP = os.sep


class CAMELS_BR(Camels):
    """
    This is a dataset of 897 Brazilian catchments with 67 static features
    and 10 dynamic features for each catchment. The dynamic features are
    timeseries from 1920-01-01 to 2019-02-28. This class
    downloads and processes CAMELS dataset of Brazil as provided by
    `VP Chagas et al., 2020 <https://doi.org/10.5194/essd-12-2075-2020>`_ .
    The simulated streamflow of 593 and raw streamflow of 3679 stations shipped
    with this data is not included in dynamic features. Both can be fetched
    through fetch_simulated_streamflow and fetch_raw_streamflow methods.

    Examples
    --------
    >>> from water_datasets import CAMELS_BR
    >>> dataset = CAMELS_BR()
    >>> df = dataset.fetch(stations=1, as_dataframe=True)
    >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it
    >>> df.shape
    (14245, 12)
    # get name of all stations as list
    >>> stns = dataset.stations()
    >>> len(stns)
    593
    # we can get data of 10% catchments as below
    >>> data = dataset.fetch(0.1, as_dataframe=True)
    >>> data.shape
    (170940, 59)
    # the data is multi-index with ``time`` and ``dynamic_features`` as indices
    >>> data.index.names == ['time', 'dynamic_features']
    True
    # get data by station id
    >>> df = dataset.fetch(stations='46035000', as_dataframe=True).unstack()
    >>> df.shape
    (14245, 12)
    # get names of available dynamic features
    >>> dataset.dynamic_features
    # get only selected dynamic features
    >>> df = dataset.fetch(1, as_dataframe=True,
    ... dynamic_features=['precipitation_cpc', 'evapotransp_mgb', 'temperature_mean', 'streamflow_m3s']).unstack()
    >>> df.shape
    (14245, 4)
    # get names of available static features
    >>> dataset.static_features
    # get data of 10 random stations
    >>> df = dataset.fetch(10, as_dataframe=True)
    >>> df.shape
    (170940, 10)  # remember this is multi-indexed DataFrame
    # when we get both static and dynamic data, the returned data is a dictionary
    # with ``static`` and ``dynamic`` keys.
    >>> data = dataset.fetch(stations='46035000', static_features="all", as_dataframe=True)
    >>> data['static'].shape, data['dynamic'].shape
    ((1, 67), (170940, 1))
    """
    # landing page of the data record
    url = "https://zenodo.org/record/3964745#.YA6rUxZS-Uk"

    # feature key -> name of the folder (nested twice on disk) holding one
    # text file per station for that feature. The insertion order matters:
    # it fixes the order of ``dynamic_features``.
    folders = {'streamflow_m3s_raw': '02_CAMELS_BR_streamflow_m3s',
               'streamflow_mm': '03_CAMELS_BR_streamflow_mm_selected_catchments',
               'simulated_streamflow_m3s': '04_CAMELS_BR_streamflow_simulated',
               'precipitation_cpc': '07_CAMELS_BR_precipitation_cpc',
               'precipitation_mswep': '06_CAMELS_BR_precipitation_mswep',
               'precipitation_chirps': '05_CAMELS_BR_precipitation_chirps',
               'evapotransp_gleam': '08_CAMELS_BR_evapotransp_gleam',
               'evapotransp_mgb': '09_CAMELS_BR_evapotransp_mgb',
               'potential_evapotransp_gleam': '10_CAMELS_BR_potential_evapotransp_gleam',
               'temperature_min': '11_CAMELS_BR_temperature_min_cpc',
               'temperature_mean': '12_CAMELS_BR_temperature_mean_cpc',
               'temperature_max': '13_CAMELS_BR_temperature_max_cpc'
               }
def __init__(self, path=None, verbosity: int = 1, **kwargs):
    """
    parameters
    ----------
    path : str
        If the data is already downloaded then provide the complete
        path to it. If None, then the data will be downloaded.
        The data is downloaded once and therefore subsequent
        calls to this class will not download the data unless
        ``overwrite`` is set to True.
    verbosity : int
        forwarded to the parent class.
    **kwargs
        forwarded to the parent class unchanged.
    """
    super().__init__(path=path, name="CAMELS_BR", verbosity=verbosity, **kwargs)
    self._download()

    # todo : dynamic data must be stored for all stations and not only for stations which are common among all attributes
    self._maybe_to_netcdf('camels_dyn_br')

    # ``path`` defaults to None, and os.path.join(None, ...) raises TypeError.
    # When the caller gave a path we keep the original location; otherwise
    # we fall back to ``self.path``.
    # NOTE(review): presumably ``self.path`` already points at the
    # ``CAMELS_BR`` directory (every other method joins dataset sub-folders
    # directly onto it) -- confirm against the parent class.
    root = self.path if path is None else os.path.join(path, "CAMELS_BR")

    self.boundary_file = os.path.join(
        root,
        "14_CAMELS_BR_catchment_boundaries",
        "14_CAMELS_BR_catchment_boundaries",
        "camels_br_catchments.shp"
    )
    self._create_boundary_id_map(self.boundary_file, 3)
@property
def static_map(self) -> Dict[str, str]:
    """Mapping from original static column names to standardized names."""
    return {
        'area': catchment_area(),
        'slope_mean': slope('degrees'),
        'gauge_lat': gauge_latitude(),
        'gauge_lon': gauge_longitude(),
    }


@property
def dyn_map(self):
    """Mapping from original dynamic feature names (table 1 in paper) to standardized names."""
    return {
        'streamflow_mm': observed_streamflow_mmd(),
        'temperature_min': min_air_temp(),
        'temperature_max': max_air_temp(),
        'temperature_mean': mean_air_temp(),
        'precipitation_mswep': total_precipitation_with_specifier('mswep'),
        'precipitation_chirps': total_precipitation_with_specifier('chirps'),
        'precipitation_cpc': total_precipitation_with_specifier('cpc'),
        'potential_evapotransp_gleam': total_potential_evapotranspiration_with_specifier('gleam'),
        'evapotransp_gleam': actual_evapotranspiration_with_specifier('gleam'),
        'evapotransp_mgb': actual_evapotranspiration_with_specifier('mgb'),
    }


@property
def dyn_generators(self):
    """
    Derived dynamic columns, as
    ``{new column: (source column, function applied to the source column)}``.
    """
    return {
        observed_streamflow_cms(): (observed_streamflow_mmd(), self.func1),
    }


def func1(self, x):
    """
    Divide ``x`` by the number of seconds in a day.

    NOTE(review): the original comment said "convert cms to mmd", but
    ``dyn_generators`` uses this to build the cms column *from* the mm/day
    column, and no catchment area is involved -- confirm the intended
    conversion.
    """
    seconds_per_day = 86400
    return x / seconds_per_day


@property
def _all_dirs(self):
    """Names of all the folders directly inside ``self.path``."""
    dirs = []
    for entry in os.listdir(self.path):
        if os.path.isdir(os.path.join(self.path, entry)):
            dirs.append(entry)
    return dirs


@property
def static_dir(self):
    """
    Path of the (doubly nested) folder whose name contains 'attributes',
    or None when there is no such folder. If several folders match, the
    last one listed is used.
    """
    matches = [name for name in self._all_dirs if "attributes" in name]
    if not matches:
        return None
    chosen = matches[-1]
    return os.path.join(self.path, f'{chosen}{SEP}{chosen}')


@property
def static_files(self):
    """All ``.txt`` files in ``static_dir`` or None if ``static_dir`` is None."""
    if self.static_dir is None:
        return None
    return glob.glob(f"{self.static_dir}/*.txt")


@property
def dynamic_features(self) -> List[str]:
    """
    Names of dynamic features: the keys of ``folders`` (without raw and
    simulated streamflow) translated through ``dyn_map``, followed by the
    generated columns of ``dyn_generators``.
    """
    excluded = ('simulated_streamflow_m3s', 'streamflow_m3s_raw')
    names = [key for key in CAMELS_BR.folders.keys() if key not in excluded]
    mapped = [self.dyn_map.get(name, name) for name in names]
    return mapped + list(self.dyn_generators.keys())


@property
def static_attribute_categories(self):
    """
    Category names derived from the static file names: the stem of each
    file with its first two underscore-separated tokens removed.
    """
    categories = []
    for fpath in self.static_files:
        stem = str(os.path.basename(fpath).split('.txt')[0])
        tokens = stem.split('_')
        categories.append('_'.join(tokens[2:]))
    return categories


@property
def static_features(self):
    """
    Names of static features. They are read from the header of
    ``static_features.csv`` when that file exists in ``self.path``,
    otherwise from the headers of the individual attribute text files.
    """
    cached = os.path.join(self.path, 'static_features.csv')

    if os.path.exists(cached):
        header = pd.read_csv(cached, index_col='gauge_id', nrows=1)
        return list(header.columns)

    attr_dir = os.path.join(self.path, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes')
    cols = []
    for fpath in glob.glob(f"{attr_dir}/*.txt"):
        # one data row is enough, only the column names are needed
        header = pd.read_csv(fpath, sep=' ', index_col='gauge_id', nrows=1)
        cols += list(header.columns)
    return cols


@property
def start(self):
    """Default start of the time series as ``YYYYMMDD`` string."""
    return "19200601"


@property
def end(self):
    """Default end of the time series as ``YYYYMMDD`` string."""
    return "20190228"
def q_mmd(
        self,
        stations: Union[str, List[str]] = "all"
) -> pd.DataFrame:
    """
    returns streamflow in the units of millimeter per day. The name of
    the original timeseries is ``streamflow_mm``.

    parameters
    ----------
    stations : str/list
        name/names of stations. Default is ``all``, which will return
        streamflow of all stations

    Returns
    --------
    pd.DataFrame
        a pandas DataFrame whose indices are time-steps and columns
        are catchment/station ids.
    """
    stations = check_attributes(stations, self.stations())
    flow = self.fetch_stations_features(
        stations,
        dynamic_features=observed_streamflow_mmd(),
        as_dataframe=True,
    )
    # keep only the first level of the index;
    # the series is already in mm/day so no unit conversion is applied
    flow.index = flow.index.get_level_values(0)
    return flow
def area(
        self,
        stations: Union[str, List[str]] = "all",
        source: str = "gsim",
) -> pd.Series:
    """
    Returns area (Km2) of all catchments as pandas series

    parameters
    ----------
    stations : str/list
        name/names of stations. Default is ``all``, which will return
        area of all stations
    source : str
        source of area calculation. It should be either ``gsim`` or ``ana``

    Returns
    --------
    pd.Series
        a pandas series whose indices are catchment ids and values
        are areas of corresponding catchments.

    Examples
    ---------
    >>> from water_datasets import CAMELS_BR
    >>> dataset = CAMELS_BR()
    >>> dataset.area()  # returns area of all stations
    >>> dataset.area('65100000')  # returns area of station whose id is 65100000
    >>> dataset.area(['65100000', '64075000'])  # returns area of two stations
    """
    column_of_source = {
        'gsim': 'area_gsim',
        'ana': 'area_ana'
    }

    stations = check_attributes(stations, self.stations())

    location_file = os.path.join(
        self.path,
        '01_CAMELS_BR_attributes',
        '01_CAMELS_BR_attributes',
        'camels_br_location.txt',
    )
    location = pd.read_csv(location_file, sep=' ')
    location.index = location['gauge_id'].astype(str)

    areas = location[column_of_source[source]]
    areas.name = 'area'
    return areas.loc[stations]
def stn_coords(
        self,
        stations: Union[str, List[str]] = 'all'
) -> pd.DataFrame:
    """
    returns coordinates of stations as DataFrame
    with ``long`` and ``lat`` as columns.

    Parameters
    ----------
    stations :
        name/names of stations. If not given, coordinates of all stations
        will be returned.

    Returns
    -------
    coords :
        pandas DataFrame with ``long`` and ``lat`` columns.
        The length of dataframe will be equal to number of stations
        whose coordinates are to be fetched.

    Examples
    --------
    >>> dataset = CAMELS_BR()
    >>> dataset.stn_coords() # returns coordinates of all stations
    >>> dataset.stn_coords('65100000')  # returns coordinates of station whose id is 65100000
    >>> dataset.stn_coords(['65100000', '64075000'])  # returns coordinates of two stations
    """
    location_file = os.path.join(
        self.path,
        '01_CAMELS_BR_attributes',
        '01_CAMELS_BR_attributes',
        'camels_br_location.txt',
    )
    location = pd.read_csv(location_file, sep=' ')
    location.index = location['gauge_id'].astype(str)

    coords = location[['gauge_lat', 'gauge_lon']].rename(
        columns={'gauge_lat': 'lat', 'gauge_lon': 'long'})

    stations = check_attributes(stations, self.stations())
    return coords.loc[stations, :]
def all_stations(self, feature: str) -> List[str]:
    """
    Tells all station ids for which data of a specific feature is available.

    The ids are the part of each file name before the first underscore,
    for every entry in the (doubly nested) folder of ``feature``.
    """
    folder = self.folders[feature]
    data_dir = os.path.join(self.path, folder, folder)

    station_ids = []
    for fname in os.listdir(data_dir):
        station_ids.append(fname.split('_')[0])
    return station_ids
def stations(
        self,
) -> List[str]:
    """
    Returns a list of station ids, i.e. the ids for which
    ``streamflow_mm`` data is available.

    Example
    -------
    >>> dataset = CAMELS_BR()
    >>> stations = dataset.stations()
    """
    station_ids = self.all_stations('streamflow_mm')
    return station_ids
def fetch_raw_streamflow(
        self,
        station_id: Union[str, List[str]] = None
) -> pd.DataFrame:
    """
    returns raw streamflow data for one or more stations.

    Parameters
    ----------
    station_id : str/list
        id/ids of stations. If None, data of all the stations for which
        raw streamflow is available is returned.

    Returns
    -------
    pd.DataFrame
        a DataFrame with one column per station, named by the station id.
        An empty DataFrame is returned when there are no stations.

    Example
    -------
    >>> dataset = CAMELS_BR()
    >>> data = dataset.fetch_raw_streamflow('10500000')
    ... # fetch raw streamflow of all the stations
    >>> x = dataset.fetch_raw_streamflow()
    """
    if station_id is None:
        station_id = self.all_stations('streamflow_m3s_raw')

    if not isinstance(station_id, list):
        station_id = [station_id]

    # _read_dynamic_feature expects the folder name on disk, not the key
    folder = self.folders['streamflow_m3s_raw']

    raw_q = []
    for stn in station_id:
        # NOTE(review): the data column inside the raw files is assumed to be
        # named 'streamflow_m3s' -- confirm against the files in ``folder``.
        stn_q = self._read_dynamic_feature(
            folder, feature='streamflow_m3s', stn_id=stn)
        # every station yields the same column name, so label by station
        stn_q.columns = [str(stn)]
        raw_q.append(stn_q)

    if not raw_q:
        # pd.concat raises on an empty list
        return pd.DataFrame()

    return pd.concat(raw_q, axis=1)
def fetch_simulated_streamflow(
        self,
        station_id: Union[str, List[str]] = None
) -> pd.DataFrame:
    """
    returns simulated streamflow data for one or more stations.

    Parameters
    ----------
    station_id : str/list
        id/ids of stations. If None, data of all the stations for which
        simulated streamflow is available is returned.

    Returns
    -------
    pd.DataFrame
        a DataFrame with one column per station, named by the station id.
        An empty DataFrame is returned when there are no stations.

    Example
    -------
    >>> dataset = CAMELS_BR()
    >>> data = dataset.fetch_simulated_streamflow('10500000')
    ... # fetch simulated streamflow of all the stations
    >>> x = dataset.fetch_simulated_streamflow()
    """
    feature = 'simulated_streamflow_m3s'

    if station_id is None:
        station_id = self.all_stations(feature)

    if not isinstance(station_id, list):
        station_id = [station_id]

    # _read_dynamic_feature expects the folder name on disk, not the key
    folder = self.folders[feature]

    sim_q = []
    for stn in station_id:
        # NOTE(review): the data column inside the files is assumed to carry
        # the same name as the feature key -- confirm against the files.
        stn_q = self._read_dynamic_feature(folder, feature=feature, stn_id=stn)
        # every station yields the same column name, so label by station
        stn_q.columns = [str(stn)]
        sim_q.append(stn_q)

    if not sim_q:
        # pd.concat raises on an empty list
        return pd.DataFrame()

    return pd.concat(sim_q, axis=1)
def _read_dynamic_from_csv(
        self,
        stations,
        attributes: Union[str, list] = 'all',
        st=None,
        en=None,
) -> Dict[str, pd.DataFrame]:
    """
    Reads the dynamic/time series data of the given stations and returns
    a dictionary mapping each station id to its DataFrame, sliced from
    ``st`` to ``en`` (``self.start``/``self.end`` when not given).

    Stations are read one by one when a single cpu is used, otherwise in
    a process pool.

    Example
    -------
    >>> dataset = CAMELS_BR()
    >>> pcp = dataset.fetch_dynamic_features('10500000', 'precipitation_cpc')
    ... # fetch all time series data associated with a station.
    >>> x = dataset.fetch_dynamic_features('51560000', dataset.dynamic_features)
    """
    features = check_attributes(attributes, self.dynamic_features, 'dynamic_features')

    st = self.start if st is None else st
    en = self.end if en is None else en

    cpus = self.processes or min(get_cpus(), 64)

    dyn = {}

    if cpus != 1:
        if self.verbosity > 0:
            print(f"getting data for {len(stations)} stations using {cpus} cpus")

        # executor.map needs one ``features`` argument per station
        features = [features] * len(stations)
        with cf.ProcessPoolExecutor(cpus) as executor:
            results = executor.map(
                self.get_dynamic_features,
                stations,
                features,
            )

        for stn_id, stn_df in zip(stations, results):
            dyn[stn_id] = stn_df.loc[st:en]
    else:
        for idx, stn_id in enumerate(stations):
            # making one separate dataframe for one station
            dyn[stn_id] = self.get_dynamic_features(stn_id, features).loc[st:en]

            if idx % 20 == 0:
                print(f"completed {idx} stations")

    if self.verbosity > 1:
        print(f"completed fetching data for {len(stations)} stations")

    return dyn


def get_dynamic_features(self, stn_id, features, st=None, en=None):
    """
    Returns a DataFrame of all dynamic features of a single station,
    including the columns generated through ``dyn_generators``.

    NOTE(review): ``features`` is accepted but not used, every feature
    of ``folders`` (except raw and simulated streamflow) is always read.
    """
    skipped = ('simulated_streamflow_m3s', 'streamflow_m3s_raw')

    feature_dfs = [
        self._read_dynamic_feature(folder, feature=feature, stn_id=stn_id, st=st, en=en)
        for feature, folder in self.folders.items()
        if feature not in skipped
    ]

    stn_df = pd.concat(feature_dfs, axis=1)

    # add derived columns whose source column is available
    for new_col, (old_col, func) in self.dyn_generators.items():
        if old_col in stn_df.columns:
            stn_df[new_col] = func(stn_df[old_col])

    stn_df.columns.name = 'dynamic_features'
    stn_df.index.name = 'time'

    return stn_df


def _read_dynamic_feature(self, folder, feature, stn_id, st=None, en=None):
    """
    Reads the column ``feature`` of a single station from the text file
    in ``folder`` and returns it as a float32 DataFrame with datetime index
    (built from the ``year``, ``month`` and ``day`` columns), sliced from
    ``st`` to ``en`` and with its column renamed through ``dyn_map``.
    """
    path = os.path.join(self.path, f'{folder}{SEP}{folder}')

    # supposing that the filename starts with stn_id and has .txt extension.
    candidates = [f for f in os.listdir(path) if f.startswith(str(stn_id)) and f.endswith('.txt')]
    assert len(candidates) == 1, f"{len(candidates)} {stn_id} in {folder} for {feature}"
    fname = candidates[0]

    fpath = os.path.join(path, fname)
    # NOTE(review): the file was just listed, so this branch looks unreachable
    if not os.path.exists(fpath):
        raise FileNotFoundError(f"file {fname} not found at {path}")

    date_cols = ['year', 'month', 'day']
    df = pd.read_csv(fpath, sep=' ')
    df.index = pd.to_datetime(df[date_cols])
    df = df.drop(date_cols, axis=1)
    df = pd.DataFrame(df.loc[st:en, feature])
    df.rename(columns=self.dyn_map, inplace=True)

    return df.astype(np.float32)
def fetch_static_features(
        self,
        stn_id: Union[str, List[str]] = "all",
        static_features: Union[str, List[str]] = "all"
) -> pd.DataFrame:
    """
    fetches static feature/features of one or more stations

    The attribute text files are merged once into ``static_features.csv``
    inside ``self.path``; later calls read that file.

    Parameters
    ----------
    stn_id : str/list
        id/ids of stations whose features are to be fetched. Default is
        ``all``.
    static_features : str/list
        name/names of features to fetch. Default is ``all``, which will
        return all the static features.

    Example
    -------
    >>> dataset = CAMELS_BR()
    >>> df = dataset.fetch_static_features('11500000')
    # read all static features of all stations
    >>> data = dataset.fetch_static_features(dataset.stations(), dataset.static_features)
    >>> data.shape
    (597, 67)
    """
    station = check_attributes(stn_id, self.stations(), 'stations')
    attributes = check_attributes(static_features, self.static_features, 'static_features')

    cache_file = os.path.join(self.path, 'static_features.csv')

    if os.path.exists(cache_file):
        static_df = pd.read_csv(cache_file, index_col='gauge_id')
    else:
        attr_dir = os.path.join(self.path, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes')
        static_df = pd.DataFrame()
        for fpath in glob.glob(f"{attr_dir}/*.txt"):
            part = pd.read_csv(fpath, sep=' ', index_col='gauge_id')
            static_df = pd.concat([static_df, part], axis=1)

        # cache the merged table for subsequent calls
        static_df.to_csv(cache_file, index_label='gauge_id')

    # station ids are handled as strings everywhere else
    static_df.index = static_df.index.astype(str)

    return pd.DataFrame(static_df.loc[station][attributes])
class CABra(Camels):
    """
    Reads and fetches CABra dataset which is catchment attribute dataset
    following the work of `Almagro et al., 2021 <https://doi.org/10.5194/hess-25-3105-2021>`_
    This dataset consists of 97 static and 12 dynamic features of 735 Brazilian
    catchments. The temporal extent is from 1980 to 2020. The dynamic features
    consist of daily hydro-meteorological time series

    Examples
    ---------
    >>> from water_datasets import CABra
    >>> dataset = CABra()
    >>> data = dataset.fetch(0.1, as_dataframe=True)
    >>> data.shape
    (131472, 73)  # 73 represents number of stations
    >>> data.index.names == ['time', 'dynamic_features']
    True
    >>> df = dataset.fetch(stations=1, as_dataframe=True)
    >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it
    >>> df.shape
    (10956, 12)
    # get name of all stations as list
    >>> stns = dataset.stations()
    >>> len(stns)
    735
    # get data by station id
    >>> df = dataset.fetch(stations='92', as_dataframe=True).unstack()
    >>> df.shape
    (10956, 12)
    # get names of available dynamic features
    >>> dataset.dynamic_features
    # get only selected dynamic features
    >>> df = dataset.fetch(1, as_dataframe=True,
    ... dynamic_features=['p_ens', 'tmax_ens', 'pet_pm', 'rh_ens', 'Streamflow']).unstack()
    >>> df.shape
    (10956, 5)
    # get names of available static features
    >>> dataset.static_features
    # get data of 10 random stations
    >>> df = dataset.fetch(10, as_dataframe=True)
    >>> df.shape
    (131472, 10)  # remember this is multi-indexed DataFrame
    # when we get both static and dynamic data, the returned data is a dictionary
    # with ``static`` and ``dynamic`` keys.
    >>> data = dataset.fetch(stations='92', static_features="all", as_dataframe=True)
    >>> data['static'].shape, data['dynamic'].shape
    ((1, 97), (131472, 1))
    """
    # landing page of the data record
    url = 'https://zenodo.org/record/7612350'
def __init__(self,
             path=None,
             overwrite=False,
             to_netcdf: bool = True,
             met_src: str = 'ens',
             **kwargs):
    """
    Parameters
    ----------
    path : str
        If the data is already downloaded then provide the complete
        path to it. If None, then the data will be downloaded.
        The data is downloaded once and therefore subsequent
        calls to this class will not download the data unless
        ``overwrite`` is set to True.
    overwrite : bool
        If the data is already downloaded then you can set it to True,
        to make a fresh download.
    to_netcdf : bool
        whether to convert all the data into one netcdf file or not.
        This will fasten repeated calls to fetch etc but will
        require netcdf5 package as well as xarray.
    met_src : str
        source of meteorological data, must be one of
        ``ens``, ``era5`` or ``ref``.
    """
    super(CABra, self).__init__(path=path, **kwargs)
    # NOTE(review): this overrides whatever the parent set for ``path``;
    # with the default ``path=None`` the os.path.join calls below would
    # fail unless ``path`` is a property with a setter -- confirm.
    self.path = path
    self.met_src = met_src
    self._download(overwrite=overwrite)

    # feature names are computed once here and served by the
    # ``dynamic_features`` / ``static_features`` properties
    self._dynamic_features = self.__dynamic_features()
    self._static_features = self.__static_features()

    # one netcdf file per meteorological source
    self.dyn_fname = os.path.join(self.path, f'cabra_{met_src}_dyn.nc')

    if to_netcdf:
        self._maybe_to_netcdf(f'cabra_{met_src}_dyn')

    self.boundary_file = os.path.join(self.path,
                                      "CABra_boundaries",
                                      "CABra_boundaries.shp")

    self._create_boundary_id_map(self.boundary_file, 2)
@property
def static_map(self) -> Dict[str, str]:
    """Mapping from original static column names to standardized names."""
    return {
        'catch_area': catchment_area(),
        'catch_slope': slope('perc'),
        'latitude': gauge_latitude(),
        'longitude': gauge_longitude(),
    }


@staticmethod
def _get_map(sf_reader, id_index=None, name: str = '') -> Dict[str, int]:
    """
    Builds a mapping from catchment id (as string) to the position of
    its record in the shapefile reader.

    ``id_index`` tells which field of a record holds the id. It must be
    given when the shapefile has more than one field; with a single
    field, index 0 is used.
    """
    # the first entry of ``fields`` is skipped
    fieldnames = [field[0] for field in sf_reader.fields[1:]]

    if len(fieldnames) <= 1:
        id_index = 0
    elif id_index is None:
        raise ValueError(f""" more than one fileds are present in {name} shapefile i.e: {fieldnames}. Please provide a value for id_idx_in_{name} that must be less than {len(fieldnames)} """)

    catch_ids_map = {}
    for position, record in enumerate(sf_reader.iterRecords()):
        catch_ids_map[str(int(record[id_index]))] = position
    return catch_ids_map


@property
def dyn_map(self):
    """
    Mapping from original dynamic feature names to standardized names.
    See table 3 in the paper
    https://hess.copernicus.org/articles/25/3105/2021/#&gid=1&pid=1
    """
    return {
        'Streamflow': observed_streamflow_cms(),
        'tmin_ens': min_air_temp_with_specifier('ens'),
        'tmax_ens': max_air_temp_with_specifier('ens'),
        'tmin_era5': min_air_temp_with_specifier('era5'),
        'tmax_era5': max_air_temp_with_specifier('era5'),
        'tmin_ref': min_air_temp_with_specifier('ref'),
        'tmax_ref': max_air_temp_with_specifier('ref'),
        'p_ens': total_precipitation_with_specifier('ens'),
        'p_ref': total_precipitation_with_specifier('ref'),
        'p_era5': total_precipitation_with_specifier('era5'),
        'rh_ens': mean_rel_hum_with_specifier('ens'),
        'rh_era5': mean_rel_hum_with_specifier('era5'),
        'rh_ref': mean_rel_hum_with_specifier('ref'),
        'wnd_ens': mean_windspeed_with_specifier('ens'),
        'wnd_era5': mean_windspeed_with_specifier('era5'),
        'wnd_ref': mean_windspeed_with_specifier('ref'),
        'et_ens': actual_evapotranspiration_with_specifier('ens'),
        'pet_pm': total_potential_evapotranspiration_with_specifier('pm'),
        'pet_pt': total_potential_evapotranspiration_with_specifier('pt'),
        'pet_hg': total_potential_evapotranspiration_with_specifier('hg'),
        'srad_ens': solar_radiation_with_specifier('ens'),  # todo: change units from MJ/m2/day to W/m2
        'srad_era5': solar_radiation_with_specifier('era5'),
        'srad_ref': solar_radiation_with_specifier('ref'),
    }


@property
def q_path(self):
    """Folder which contains the daily streamflow files."""
    parts = ("CABra_streamflow_daily_series", "CABra_daily_streamflow")
    return os.path.join(self.path, *parts)


@property
def attr_path(self):
    """Folder which contains the attribute files."""
    parts = ('CABra_attributes', 'CABra_attributes')
    return os.path.join(self.path, *parts)


@property
def dynamic_features(self) -> List[str]:
    """names of dynamic features"""
    return self._dynamic_features


def __dynamic_features(self) -> List[str]:
    """
    Finds names of dynamic features from the streamflow and meteo
    files of the first station, without the date columns.
    """
    first_stn = self.stations()[0]
    frames = [self._read_q_from_csv(first_stn), self._read_meteo_from_csv(first_stn)]
    columns = pd.concat(frames, axis=1).columns.to_list()
    date_cols = ['Year', 'Month', 'Day']
    return [col for col in columns if col not in date_cols]


@property
def static_features(self) -> List[str]:
    """names of static features"""
    return self._static_features


def __static_features(self) -> List[str]:
    """Column names of all the attribute tables joined side by side."""
    readers = (
        self.climate_attrs,
        self.general_attrs,
        self.geology_attrs,
        self.gw_attrs,
        self.hydro_distrub_attrs,
        self.lc_attrs,
        self.soil_attrs,
        self.q_attrs,
        self.topology_attrs,
    )
    combined = pd.concat([reader() for reader in readers], axis=1)
    return combined.columns.to_list()


def stations(self) -> List[str]:
    """Ids of all stations as strings, taken from the index of ``add_attrs``."""
    ids = self.add_attrs().index
    return ids.astype(str).to_list()


@property
def start(self) -> pd.Timestamp:
    """Default start of the time series."""
    return pd.Timestamp("1980-10-01")


@property
def end(self) -> pd.Timestamp:
    """Default end of the time series."""
    return pd.Timestamp("2010-09-30")
def q_mmd(
        self,
        stations: Union[str, List[str]] = 'all'
) -> pd.DataFrame:
    """
    returns streamflow in the units of millimeter per day. It is obtained
    by dividing the streamflow time series (cms) by catchment area.

    parameters
    ----------
    stations : str/list
        name/names of stations. Default is ``all``, which will return
        streamflow of all stations

    Returns
    --------
    pd.DataFrame
        a pandas DataFrame whose indices are time-steps and columns
        are catchment/station ids.
    """
    stations = check_attributes(stations, self.stations())
    flow_cms = self.fetch_stations_features(
        stations,
        dynamic_features=observed_streamflow_cms(),
        as_dataframe=True,
    )
    # keep only the first level of the index
    flow_cms.index = flow_cms.index.get_level_values(0)

    area_m2 = self.area(stations) * 1e6  # area (scaled by 1e6) in m2
    flow_m_per_day = (flow_cms / area_m2) * 86400  # cms to m/day
    return flow_m_per_day * 1e3  # to mm/day
@property
def _area_name(self) -> str:
    # Name of the static column that holds the catchment area
    # (the same key is mapped through ``catchment_area()`` in ``static_map``).
    return 'catch_area'
def stn_coords(
        self,
        stations: Union[str, List[str]] = 'all'
) -> pd.DataFrame:
    """
    returns coordinates of stations as DataFrame
    with ``long`` and ``lat`` as columns.

    Parameters
    ----------
    stations :
        name/names of stations. If not given, coordinates of all stations
        will be returned.

    Returns
    -------
    coords :
        pandas DataFrame with ``long`` and ``lat`` columns.
        The length of dataframe will be equal to number of stations
        whose coordinates are to be fetched.

    Examples
    --------
    >>> dataset = CABra()
    >>> dataset.stn_coords() # returns coordinates of all stations
    >>> dataset.stn_coords('92')  # returns coordinates of station whose id is 92
    >>> dataset.stn_coords(['92', '142'])  # returns coordinates of two stations
    """
    general = self.general_attrs()
    general.index = general.index.astype(str)

    coords = general[['latitude', 'longitude']].rename(
        columns={'latitude': 'lat', 'longitude': 'long'})

    stations = check_attributes(stations, self.stations())
    return coords.loc[stations, :]
def add_attrs(self) -> pd.DataFrame:
    """
    Returns additional catchment attributes for all catchments,
    indexed by ``CABra_ID``.
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,  # todo shouldn't it be str?
        "ANA_ID": int,
        "longitude_centroid": np.float32,
        "latitude_centroid": np.float32,
        "dist_coast": np.float32,
    }

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_additional_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        header=4,
    )
    return table.set_index('CABra_ID')
def climate_attrs(self) -> pd.DataFrame:
    """
    returns climate attributes for all catchments, indexed by ``CABra_ID``
    """
    float_cols = (
        "clim_p", "clim_tmin", "clim_tmax", "clim_rh", "clim_wind",
        "clim_srad", "clim_et", "clim_pet", "aridity_index", "p_seasonality",
    )
    # column name -> dtype, in the order of columns in the file
    schema = {"CABra_ID": int, "ANA_ID": int}  # todo shouldn't CABra_ID be str?
    schema.update({col: np.float32 for col in float_cols})
    schema["clim_quality"] = int

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_climate_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=6,
    )
    return table.set_index('CABra_ID')
def general_attrs(self) -> pd.DataFrame:
    """
    returns general attributes for all catchments, indexed by ``CABra_ID``
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,  # todo shouldn't it be str?
        "ANA_ID": int,
        "longitude": np.float32,
        "latitude": np.float32,
        "gauge_hreg": str,
        "gauge_biome": str,
        "gauge_state": str,
        "missing_data": np.float32,
        "series_length": np.float32,
        "quality_index": np.float32,
    }

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_general_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=6,
    )
    return table.set_index('CABra_ID')
def geology_attrs(self) -> pd.DataFrame:
    """
    returns geological attributes for all catchments, indexed by ``CABra_ID``
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,
        "ANA_ID": int,
        "catch_lith": str,
        "sub_porosity": np.float32,
        "sub_permeability": np.float32,
        "sub_hconduc": np.float32,
    }

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_geology_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=6,
    )
    return table.set_index('CABra_ID')
def gw_attrs(self) -> pd.DataFrame:
    """
    returns groundwater attributes for all catchments, indexed by ``CABra_ID``
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,
        "ANA_ID": int,
        "aquif_name": str,
        "aquif_type": str,
        "catch_wtd": np.float32,
        "catch_hand": np.float32,
        "hand_class": str,
        "well_number": int,
        "well_static": str,
        "well_dynamic": str,
    }

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_groundwater_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=7,
    )
    return table.set_index('CABra_ID')
def hydro_distrub_attrs(self) -> pd.DataFrame:
    """
    returns hydrologic disturbance attributes for all catchments,
    indexed by ``CABra_ID``
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,
        "ANA_ID": int,
        "dist_urban": int,
        "cover_urban": np.float32,
        "cover_crops": np.float32,
        "res_number": int,
        "res_area": np.float32,
        "res_volume": np.float32,
        "res_regulation": np.float32,
        "water_demand": int,
        "hdisturb_index": np.float32,
    }

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_hydrologic_disturbance_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=8,
    )
    return table.set_index('CABra_ID')
def lc_attrs(self) -> pd.DataFrame:
    """
    returns land cover attributes for all catchments, indexed by ``CABra_ID``
    """
    float_cols = (
        "cover_bare", "cover_forest", "cover_crops", "cover_grass",
        "cover_moss", "cover_shrub", "cover_urban", "cover_snow",
        "cover_waterp", "cover_waters",
        "ndvi_djf", "ndvi_mam", "ndvi_jja", "ndvi_son",
    )
    # column name -> dtype, in the order of columns in the file
    schema = {"CABra_ID": int, "ANA_ID": int, "cover_main": str}
    schema.update({col: np.float32 for col in float_cols})

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_land-cover_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=6,
    )
    return table.set_index('CABra_ID')
def soil_attrs(self) -> pd.DataFrame:
    """
    returns soil attributes for all catchments, indexed by ``CABra_ID``
    """
    float_cols = (
        "soil_sand", "soil_silt", "soil_clay",
        "soil_carbon", "soil_bulk", "soil_depth",
    )
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,
        "ANA_ID": int,
        "soil_type": str,
        "soil_textclass": str,
    }
    schema.update({col: np.float32 for col in float_cols})

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_soil_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=7,
    )
    return table.set_index('CABra_ID')
def q_attrs(self) -> pd.DataFrame:
    """
    returns streamflow attributes for all catchments, indexed by ``CABra_ID``
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,
        "ANA_ID": int,
        "q_mean": np.float32,
        "q_1": np.float32,
        "q_5": np.float32,
        "q_95": np.float32,
        "q_99": np.float32,
        "q_lf": np.float32,
        "q_ld": np.float32,
        "q_hf": np.float32,
        "q_hd": np.float32,
        "q_hfd": np.float32,
        "q_zero": int,
        "q_cv": np.float32,
        "q_lcv": np.float32,
        "q_hcv": np.float32,
        "q_elasticity": np.float32,
        "fdc_slope": np.float32,
        "baseflow_index": np.float32,
        'runoff_coef': np.float32
    }
    names = list(schema.keys())

    # no dtype is enforced for these columns, pandas infers it
    inferred = ('q_cv', 'q_mean', 'q_lcv', 'fdc_slope')
    dtypes = {col: dtype for col, dtype in schema.items() if col not in inferred}

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_streamflow_attributes.txt"),
        sep='\t',
        names=names,
        dtype=dtypes,
        encoding_errors='ignore',
        header=7,
    )
    return table.set_index('CABra_ID')
def topology_attrs(self) -> pd.DataFrame:
    """
    returns topography attributes for all catchments, indexed by ``CABra_ID``
    """
    # column name -> dtype, in the order of columns in the file
    schema = {
        "CABra_ID": int,
        "ANA_ID": int,
        "catch_area": np.float32,
        "elev_mean": np.float32,
        "elev_min": np.float32,
        "elev_max": np.float32,
        "elev_gauge": np.float32,
        "catch_slope": np.float32,
        "catch_order": int,
    }

    table = pd.read_csv(
        os.path.join(self.attr_path, "CABra_topography_attributes.txt"),
        sep='\t',
        names=list(schema.keys()),
        dtype=schema,
        encoding_errors='ignore',
        header=7,
    )
    return table.set_index('CABra_ID')
def _read_q_from_csv(self, stn_id: str) -> pd.DataFrame:
    """
    Reads the streamflow file of a single station. The returned DataFrame
    has ``Year``, ``Month``, ``Day`` and ``Quality`` columns plus the
    streamflow column, which is renamed through ``dyn_map`` and
    cast to float32.
    """
    fname = f"CABra_{stn_id}_streamflow.txt"
    int_cols = ('Year', 'Month', 'Day', 'Quality')

    df = pd.read_csv(
        os.path.join(self.q_path, fname),
        sep='\t',
        header=8,
        names=['Year', 'Month', 'Day', 'Streamflow', 'Quality'],
        # dtype of Streamflow is not enforced while reading
        dtype={col: np.int16 for col in int_cols},
    )

    df.rename(columns=self.dyn_map, inplace=True)

    q_col = observed_streamflow_cms()
    df[q_col] = df[q_col].astype(np.float32)

    return df


def _read_meteo_from_csv(
        self,
        stn_id: str,
        source="ens") -> pd.DataFrame:
    """
    Reads the meteorological file of a single station for the given
    ``source``, with columns renamed through ``dyn_map``. For the stations
    listed below, an empty DataFrame is returned for the ``ref`` source
    instead of reading a file.

    NOTE(review): the column names carry the ``_ens`` suffix whatever
    the ``source`` is -- confirm this is intended for ``era5``/``ref``.
    """
    meteo_fpath = os.path.join(
        self.path,
        'CABra_climate_daily_series',
        'climate_daily',
        source,
        f"CABra_{stn_id}_climate_{source.upper()}.txt",
    )

    float_cols = (
        "p_ens", "tmin_ens", "tmax_ens", "rh_ens", "wnd_ens",
        "srad_ens", "et_ens", "pet_pm", "pet_pt", "pet_hg",
    )
    # column name -> dtype, in the order of columns in the file
    dtypes = {"Year": int, "Month": int, "Day": int}
    dtypes.update({col: np.float32 for col in float_cols})

    stations_skipped_for_ref = (
        '1', '2', '3', '4', '5', '6', '7', '8', '9', '15', '17', '18', '19',
        '27', '28', '34', '526', '564', '567', '569',
    )

    if source == "ref" and stn_id in stations_skipped_for_ref:
        df = pd.DataFrame(columns=list(dtypes.keys()))
    else:
        df = pd.read_csv(
            meteo_fpath,
            sep="\t",
            names=list(dtypes.keys()),
            dtype=dtypes,
            header=12,
        )

    df.rename(columns=self.dyn_map, inplace=True)

    return df
def fetch_static_features(
        self,
        stn_id: Union[str, List[str]] = 'all',
        static_features: Union[str, List[str]] = 'all'
) -> pd.DataFrame:
    """
    Returns static features of one or more stations.

    Parameters
    ----------
    stn_id : str
        name/id of station/stations of which to extract the data
    static_features : list/str, optional (default="all")
        The name/names of features to fetch. By default, all available
        static features are returned.

    Returns
    -------
    pd.DataFrame
        a pandas dataframe of shape (stations, features)

    Examples
    ---------
    >>> from water_datasets import CABra
    >>> dataset = CABra()
    get the names of stations
    >>> stns = dataset.stations()
    >>> len(stns)
        735
    get all static data of all stations
    >>> static_data = dataset.fetch_static_features(stns)
    >>> static_data.shape
       (735, 97)
    get static data of one station only
    >>> static_data = dataset.fetch_static_features('92')
    >>> static_data.shape
       (1, 97)
    get the names of static features
    >>> dataset.static_features
    get only selected features of all stations
    >>> static_data = dataset.fetch_static_features(stns, ['gauge_lat', 'area'])
    >>> static_data.shape
       (735, 2)
    >>> data = dataset.fetch_static_features('92', static_features=['gauge_lat', 'area'])
    >>> data.shape
       (1, 2)
    """
    stations = check_attributes(stn_id, self.stations())
    features = check_attributes(static_features, self.static_features, 'static_features')

    # one table per attribute category, all indexed by CABra_ID
    readers = (
        self.climate_attrs,
        self.general_attrs,
        self.geology_attrs,
        self.gw_attrs,
        self.hydro_distrub_attrs,
        self.lc_attrs,
        self.soil_attrs,
        self.q_attrs,
        self.topology_attrs,
    )
    combined = pd.concat([reader() for reader in readers], axis=1)
    combined.index = combined.index.astype(str)

    # drop duplicate columns, keeping the first occurrence
    is_first = ~combined.columns.duplicated()
    combined = combined.loc[:, is_first].copy()

    return combined.loc[stations, features]
def _read_dynamic_from_csv(
        self,
        stations,
        dynamic_features,
        st=None,
        en=None
) -> dict:
    """
    Reads streamflow and meteorological time series of the given stations.

    Parameters
    ----------
    stations :
        ids of the stations to read.
    dynamic_features :
        name/names of dynamic features, validated against
        ``self.dynamic_features``.
    st, en :
        accepted for interface compatibility.
        NOTE(review): they are not applied here -- confirm that the caller
        does the time slicing.

    Returns
    -------
    dict
        maps each requested station id to a DataFrame whose index is
        ``time`` and whose columns are the requested ``dynamic_features``.
        Empty when no station is requested.
    """
    features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')

    if len(stations) == 0:
        # nothing to read, and qs[0] below would fail
        return {}

    if self.verbosity > 1:
        print(f"getting data for {len(features)} features and for {len(stations)} stations")

    def _date_index(frame: pd.DataFrame):
        # builds a datetime index from the Year/Month/Day columns
        return pd.to_datetime(
            frame['Year'].astype(str) + '-'
            + frame['Month'].astype(str) + '-'
            + frame['Day'].astype(str))

    # qs and meteo data has different index
    if self.verbosity > 2:
        print("getting streamflow data")
    qs = [self._read_q_from_csv(stn_id=stn_id) for stn_id in stations]
    q_idx = _date_index(qs[0])

    if self.verbosity > 2:
        print("getting meteo data")
    meteos = [
        self._read_meteo_from_csv(stn_id=stn_id, source=self.met_src)
        for stn_id in stations]

    # Some stations have no data for the "ref" source and come back empty,
    # so the index is taken from the first non-empty frame rather than
    # from a fixed position (which breaks when few stations are requested).
    first_with_data = next((m for m in meteos if len(m) > 0), None)
    if first_with_data is None:
        # no meteo data at all: the (all NaN) meteo columns follow
        # the streamflow index
        met_idx = q_idx
    else:
        met_idx = _date_index(first_with_data)

    met_cols = [col for col in meteos[0].columns if col not in ['Year', 'Month', 'Day']]

    dyn = {}

    # pair the data with the *requested* stations; pairing with
    # self.stations() labels a subset with the wrong station ids
    for stn, q, meteo in zip(stations, qs, meteos):

        if len(meteo) == 0:
            meteo = pd.DataFrame(meteo, index=met_idx)
        else:
            meteo.index = met_idx

        q.index = q_idx

        stn_df = pd.concat(
            [meteo[met_cols].astype(np.float32), q[['Quality', observed_streamflow_cms()]]],
            axis=1)[features]
        stn_df.index.name = 'time'
        stn_df.columns.name = 'dynamic_features'
        dyn[stn] = stn_df

    return dyn