Source code for aqua_fetch.rr._ccam


import os
import json
from typing import Union, List, Dict

import numpy as np
import pandas as pd

from .camels import Camels
from .._backend import shapefile
from .._backend import xarray as xr
from ..utils import merge_shapefiles
from ..utils import check_attributes, dateandtime_now

from ._map import (
    observed_streamflow_cms,
    observed_streamflow_mmd,
    min_air_temp,
    max_air_temp,
    mean_air_temp,
    mean_rel_hum,
    mean_daily_evaporation,
    max_daily_ground_surface_temp,
    min_daily_ground_surface_temp,
    mean_daily_ground_surface_temp,
    total_precipitation,
    sunshine_duration,
    max_windspeed,
    min_windspeed,
    mean_windspeed,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )


class CCAM(Camels):
    """
    Dataset for Chinese catchments.

    The CCAM dataset, published by
    `Hao et al., 2021 <https://doi.org/10.5194/essd-13-5591-2021>`_, has two
    sets. One set consists of catchment attributes, meteorological data and
    catchment boundaries of over 4000 catchments. However, this set does not
    have streamflow data. The second set consists of streamflow, catchment
    attributes, catchment boundaries and meteorological data for 102
    catchments of the Yellow River. Since this second set conforms to the
    norms of CAMELS, this class uses this second set. Therefore, the
    ``fetch``, ``stations`` and other methods/attributes of this class return
    data of only the Yellow River catchments and not for the whole of China.
    However, the first set of data can also be fetched using the
    `fetch_meteo` method of this class.

    The temporal extent of both sets is from 1999 to 2020. However, the
    streamflow time series in first set has a very large number of missing
    values. The data of the Yellow River consists of 16 dynamic features
    (time series) and 124 static features (catchment attributes).

    Examples
    ---------
    >>> from water_datasets import CCAM
    >>> dataset = CCAM()
    >>> data = dataset.fetch(0.1, as_dataframe=True)
    >>> data.shape
    (128560, 10)
    >>> data.index.names == ['time', 'dynamic_features']
    True
    >>> df = dataset.fetch(stations=1, as_dataframe=True)
    >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it
    >>> df.shape
    (8035, 16)
    # get name of all stations as list
    >>> stns = dataset.stations()
    >>> len(stns)
    102
    # get data by station id
    >>> df = dataset.fetch(stations='0010', as_dataframe=True).unstack()
    >>> df.shape
    (8035, 16)
    # get names of available dynamic features
    >>> dataset.dynamic_features
    # get only selected dynamic features
    >>> df = dataset.fetch(1, as_dataframe=True,
    ...     dynamic_features=['pre', 'tem_mean', 'evp', 'rhu', 'q']).unstack()
    >>> df.shape
    (8035, 5)
    # get names of available static features
    >>> dataset.static_features
    # get data of 10 random stations
    >>> df = dataset.fetch(10, as_dataframe=True)
    >>> df.shape
    (128560, 10)  # remember this is multi-indexed DataFrame
    # when we get both static and dynamic data, the returned data is a dictionary
    # with ``static`` and ``dynamic`` keys.
    >>> data = dataset.fetch(stations='0010', static_features="all", as_dataframe=True)
    >>> data['static'].shape, data['dynamic'].shape
    ((1, 124), (128560, 1))
    """
    url = "https://zenodo.org/record/5729444"
def __init__(self, path=None, overwrite:bool=False, to_netcdf:bool = True, **kwargs):
    """
    Downloads the data (if required), optionally converts it to netCDF and
    prepares the merged catchment boundary shapefile.

    Parameters
    ----------
    path : str
        If the data is already downloaded then provide the complete
        path to it. If None, then the data will be downloaded.
        The data is downloaded once and therefore subsequent
        calls to this class will not download the data unless
        ``overwrite`` is set to True.
    overwrite : bool
        If the data is already downloaded then you can set it to True,
        to make a fresh download.
    to_netcdf : bool
        whether to convert all the data into one netcdf file or not.
        This will speed up repeated calls to fetch etc. but will
        require the netcdf5 package as well as xarray.
    **kwargs :
        forwarded unchanged to the parent class constructor.
    """
    super(CCAM, self).__init__(path=path, **kwargs)
    self.path = path
    self._download(overwrite=overwrite)

    # file name of the netCDF file holding the Yellow River dynamic data
    self.dyn_fname = os.path.join(self.path, 'ccam_dyn.nc')

    if to_netcdf:
        self._maybe_to_netcdf('ccam_dyn')
        # NOTE(review): indentation was lost in the scraped source; this call
        # is assumed to belong to the ``if to_netcdf`` branch -- confirm.
        self._maybe_meteo_to_nc()

    shp_path = os.path.join(self.path, "7_HydroMLYR", "7_HydroMLYR", "0_basin_boundary")

    # all individual shapefiles are merged into this single file
    self.boundary_file = os.path.join(shp_path, 'boundaries.shp')

    files = [file for file in os.listdir(shp_path) if file.endswith('.shp')]
    shp_files = [os.path.join(shp_path, shp_file) for shp_file in files]
    boundaries = os.path.join(shp_path, "boundaries")

    # merging is only possible when the optional shapefile backend is available
    if shapefile is not None:
        merge_shapefiles(shp_files, boundaries, add_new_field=True, ignore_previous_fields=True, verbosity=self.verbosity)

    # NOTE(review): assumed to run unconditionally (outside the
    # ``if shapefile is not None`` branch) -- confirm against the repository.
    self._create_boundary_id_map(self.boundary_file, 2)
@property
def static_map(self) -> Dict[str, str]:
    """Maps raw static feature names of this dataset to the names returned
    by the corresponding ``_map`` helpers."""
    return {
        'area': catchment_area(),
        'slope': slope('mkm-1'),
        'lat': gauge_latitude(),
        'lon': gauge_longitude(),
    }


@property
def dyn_map(self):
    """Maps raw dynamic feature names of this dataset to the names returned
    by the corresponding ``_map`` helpers."""
    return {
        # NOTE(review): ``q_mmd`` converts 'q' from m3/s to mm/day, which
        # suggests the raw 'q' is in cms although it is mapped here with
        # ``observed_streamflow_mmd`` -- confirm whether
        # ``observed_streamflow_cms`` was intended.
        'q': observed_streamflow_mmd(),
        'tem_min': min_air_temp(),
        'tem_max': max_air_temp(),
        'tem_mean': mean_air_temp(),
        'rhu': mean_rel_hum(),
        'evp': mean_daily_evaporation(),
        'gst_max': max_daily_ground_surface_temp(),
        'gst_min': min_daily_ground_surface_temp(),
        'gst_mean': mean_daily_ground_surface_temp(),
        'pre': total_precipitation(),
        'ssd': sunshine_duration(),
        'win_max': max_windspeed(),
        'win_mean': mean_windspeed(),
    }


@property
def meteo_path(self):
    """path where daily meteorological data of stations is present"""
    return os.path.join(self.path, "1_meteorological", '1_meteorological')


@property
def meteo_nc_path(self):
    """path of the netCDF file in which the meteorological data of all
    stations is stored"""
    return os.path.join(self.path, "meteo_data.nc")


@property
def meteo_stations(self) -> List[str]:
    """
    ids of the stations for which meteorological data is available.

    The ids are the file names found in ``meteo_path`` without extension.
    Station ``35616`` is excluded, as it also is when the netCDF file
    is built in ``_maybe_meteo_to_nc``.
    """
    excluded = {'35616'}
    station_ids = (fname.split('.')[0] for fname in os.listdir(self.meteo_path))
    # filtering (instead of ``list.remove``) does not raise ValueError when
    # the excluded station's file is not present in the folder
    return [stn for stn in station_ids if stn not in excluded]


@property
def yr_data_path(self):
    """path of the folder which contains one sub-folder per Yellow River
    catchment"""
    return os.path.join(self.path, "7_HydroMLYR", "7_HydroMLYR", '1_data')
def q_mmd(
        self,
        stations: Union[str, List[str]] = "all"
) -> pd.DataFrame:
    """
    Returns streamflow in the units of millimeter per day. This is obtained
    by dividing ``q`` by the catchment area.

    parameters
    ----------
    stations : str/list
        name/names of stations. Default is ``all``, which will return
        streamflow of all stations

    Returns
    --------
    pd.DataFrame
        a pandas DataFrame whose indices are time-steps and columns
        are catchment/station ids.
    """
    seconds_per_day = 86400

    stations = check_attributes(stations, self.stations(), 'stations')

    discharge = self.fetch_stations_features(
        stations, dynamic_features='q', as_dataframe=True)
    # keep only the first (time) level of the multi-index
    discharge.index = discharge.index.get_level_values(0)

    # presumably the area is stored in km2, hence the factor 1e6 to get m2
    area_m2 = self.area(stations) * 1e6

    depth_m_per_day = (discharge / area_m2) * seconds_per_day  # cms to m/day
    return depth_m_per_day * 1e3  # to mm/day
@property
def _area_name(self) -> str:
    """name of the static feature which holds the catchment area"""
    area_feature = 'area'
    return area_feature


@property
def _coords_name(self) -> List[str]:
    """names of the static features which hold the coordinates,
    latitude first and then longitude"""
    latitude, longitude = 'lat', 'lon'
    return [latitude, longitude]
def stations(self) -> List[str]:
    """
    Returns station ids for catchments on Yellow River.

    The ids are the names of the sub-folders of ``yr_data_path``. They are
    returned in sorted order, because ``os.listdir`` gives no guarantee about
    ordering, and entries which are not folders are skipped, because the data
    of every station is read from inside its own folder.

    Returns
    -------
    list
        sorted list of station ids
    """
    root = self.yr_data_path
    return sorted(
        entry for entry in os.listdir(root)
        if os.path.isdir(os.path.join(root, entry))
    )
@property
def dynamic_features(self) -> List[str]:
    """names of hydro-meteorological time series data for Yellow River catchments"""
    names = [
        'pre', 'evp', 'gst_mean', 'prs_mean', 'tem_mean', 'rhu', 'win_mean',
        'gst_min', 'prs_min', 'tem_min', 'gst_max', 'prs_max', 'tem_max',
        'ssd', 'win_max', 'q',
    ]
    return names


@property
def static_features(self) -> List[str]:
    """names of static features for Yellow River catchments.

    The names are the keys of the ``attributes.json`` file of the first
    station returned by ``stations``.
    """
    root = self.yr_data_path
    first_station = self.stations()[0]
    with open(os.path.join(root, first_station, 'attributes.json'), 'r') as handle:
        attributes = json.load(handle)
    return list(attributes.keys())


@property
def start(self):
    """first time-step of the data"""
    return pd.Timestamp('1999-01-02 00:00:00')


@property
def end(self):
    """last time-step of the data"""
    return pd.Timestamp('2020-12-31 00:00:00')


def _read_meteo_from_csv(
        self,
        stn_id: str
) -> pd.DataFrame:
    """
    Reads the daily meteorological data of a single station from its text
    file and returns it as a DataFrame indexed by date.
    This data is from 1990-01-01 to 2021-03-31.

    The returned dataframe has following columns
        - 'PRE'
        - 'TEM': temperature
        - 'PRS': pressure
        - 'RHU',
        - 'EVP',
        - 'WIN',
        - 'SSD': sunshine duration
        - 'GST': ground surface temperature
        - 'PET'
    """
    # the following two stations have multiple entries for the same date
    stations_with_duplicates = ['17456', '18161']

    frame = pd.read_csv(os.path.join(self.meteo_path, f"{stn_id}.txt"))
    frame.index = pd.to_datetime(frame.pop("Date"))

    # make sure that the PET column is always present
    if 'PET' not in frame:
        frame['PET'] = None

    if stn_id in stations_with_duplicates:
        return drop_duplicate_indices(frame)
    return frame


def _maybe_meteo_to_nc(self):
    """
    Saves the meteorological data of all stations, except the one stored in
    ``35616.txt``, into a single netCDF file at ``meteo_nc_path``.
    Nothing is done if that file already exists.
    """
    if os.path.exists(self.meteo_nc_path):
        return

    frames = {}
    for fname in os.listdir(self.meteo_path):
        if fname != '35616.txt':
            station = fname.split('.')[0]
            frames[station] = self._read_meteo_from_csv(station).astype(np.float32)

    data_vars = {}
    coords = {}
    for station, frame in frames.items():
        data_vars[station] = (['time', 'dynamic_features'], frame)
        time_index = frame.index
        time_index.name = 'time'
        # the coordinates of the station processed last are the ones used
        coords = {'dynamic_features': list(frame.columns), 'time': time_index}

    dataset = xr.Dataset(
        data_vars=data_vars,
        coords=coords,
        attrs={'date': f"create on {dateandtime_now()}"}
    )
    dataset.to_netcdf(self.meteo_nc_path)
def fetch_meteo(
        self,
        stn_id: Union[str, List[str]] = "all",
        features: Union[str, List[str]] = "all",
        st='1990-01-01',
        en='2021-03-31',
        as_dataframe: bool = True
):
    """
    fetches meteorological data of 4902 chinese catchments

    Parameters
    ----------
    stn_id : str/list
        id/ids of the stations whose data is to be fetched. By default the
        data of all stations in ``meteo_stations`` is fetched.
    features : str/list
        name/names of the features to fetch. It must be one or more of
        'PRE', 'TEM', 'PRS', 'RHU', 'EVP', 'WIN', 'SSD', 'GST' and 'PET'.
        By default all of them are fetched.
    st :
        start of the period for which the data is returned
    en :
        end of the period for which the data is returned
    as_dataframe : bool
        if True, the data is converted to a pandas DataFrame with ``time``
        and ``dynamic_features`` as index, otherwise the selected xarray
        data is returned as it is.

    Raises
    ------
    ModuleNotFoundError
        if xarray is not installed

    Examples
    --------
    >>> from water_datasets import CCAM
    >>> dataset = CCAM()
    >>> features = ['PRE', 'TEM', 'PRS', 'RHU', 'EVP', 'WIN', 'PET']
    >>> st = '1999-01-01'
    >>> en = '2020-03-31'
    >>> xds = dataset.fetch_meteo(features=features, st=st, en=en)
    """
    def_features = ['PRE', 'TEM', 'PRS', 'RHU', 'EVP', 'WIN', 'SSD', 'GST', 'PET']
    features = check_attributes(features, def_features)
    stations = check_attributes(stn_id, self.meteo_stations)

    if xr is None:
        raise ModuleNotFoundError("xarray must be installed")

    # The netCDF file is missing when it was never built or was removed
    # afterwards. ``_maybe_meteo_to_nc`` returns immediately when the file
    # already exists, so this call is cheap in the usual case.
    self._maybe_meteo_to_nc()

    dyn = xr.open_dataset(self.meteo_nc_path)
    dyn = dyn[stations].sel(dynamic_features=features, time=slice(st, en))

    if as_dataframe:
        dyn = dyn.to_dataframe(['time', 'dynamic_features'])
    return dyn
def _read_yr_dynamic_from_csv(
        self,
        stn_id: str
) -> pd.DataFrame:
    """
    Reads daily dynamic (meteorological + streamflow) data of a single
    Yellow River catchment. The two files of the catchment are read
    separately, indexed by their ``date`` column, joined column-wise and
    returned as a float32 DataFrame.
    """
    station_dir = os.path.join(self.yr_data_path, stn_id)

    frames = []
    # meteorological data first and streamflow afterwards, which decides
    # the order of the columns in the returned DataFrame
    for fname in ('meteorological.txt', 'streamflow_raw.txt'):
        frame = pd.read_csv(os.path.join(station_dir, fname))
        frame.index = pd.to_datetime(frame.pop('date'))
        frames.append(frame)

    return pd.concat(frames, axis=1).astype(np.float32)


def _read_dynamic_from_csv(
        self,
        stations,
        dynamic_features,
        st=None,
        en=None) -> dict:
    """
    Reads dynamic data of one or more catchments located along the
    Yellow River basin.

    The data is always read for the period 1999-01-01 to 2020-12-31;
    ``st`` and ``en`` are accepted but not used in this method.

    Returns
    -------
    dict
        a dictionary whose keys are station ids and whose values are
        DataFrames of the dynamic features of that station.
    """
    attributes = check_attributes(dynamic_features, self.dynamic_features)

    # An empty DataFrame with a complete daily index. Concatenating with it
    # makes sure that the data of all stations has same dimensions, by
    # inserting nans for the missing days.
    calendar = pd.DataFrame(index=pd.date_range("19990101", "20201231", freq="D"))

    dyn = {}
    for stn in stations:
        data = self._read_yr_dynamic_from_csv(stn).loc["19990101": "20201231", attributes]
        data = drop_duplicate_indices(data)  # removing duplicates
        dyn[stn] = pd.concat([data, calendar], axis=1)

    return dyn
def fetch_static_features(
        self,
        stn_id: Union[str, List[str]] = "all",
        static_features: Union[str, List[str]] = "all"
) -> pd.DataFrame:
    """
    Returns static features of one or more stations.

    Parameters
    ----------
    stn_id : str
        name/id of station/stations of which to extract the data
    static_features : list/str, optional (default="all")
        The name/names of features to fetch. By default, all available
        static features are returned.

    Returns
    -------
    pd.DataFrame
        a pandas dataframe of shape (stations, features)

    Examples
    ---------
    >>> from water_datasets import CCAM
    >>> dataset = CCAM()
    get the names of stations
    >>> stns = dataset.stations()
    >>> len(stns)
        102
    get all static data of all stations
    >>> static_data = dataset.fetch_static_features(stns)
    >>> static_data.shape
       (102, 124)
    get static data of one station only
    >>> static_data = dataset.fetch_static_features('0140')
    >>> static_data.shape
       (1, 124)
    get the names of static features
    >>> dataset.static_features
    get only selected features of all stations
    >>> static_data = dataset.fetch_static_features(stns, ['lon', 'lat', 'area'])
    >>> static_data.shape
       (102, 3)
    >>> data = dataset.fetch_static_features('0140', static_features=['lon', 'lat', 'area'])
    >>> data.shape
       (1, 3)
    """
    stations = check_attributes(stn_id, self.stations(), 'stations')
    features = check_attributes(static_features, self.static_features, 'static_features')

    # one Series per station, each named after its station id
    per_station = [self._read_yr_static(stn) for stn in stations]

    # stations become the rows after transposing
    table = pd.concat(per_station, axis=1).transpose()
    return table.loc[:, features]
def _read_yr_static(
        self,
        stn_id: str
) -> pd.Series:
    """
    Reads the catchment attributes of a single Yellow River catchment from
    its ``attributes.json`` file and returns them as a Series which is
    named after the station id.
    """
    station_dir = os.path.join(self.yr_data_path, stn_id)
    with open(os.path.join(station_dir, 'attributes.json'), 'r') as handle:
        attributes = json.load(handle)
    return pd.Series(attributes, name=stn_id)
def drop_duplicate_indices(df):
    """Returns ``df`` without the rows whose index label has already
    occurred, i.e. only the first row of every index label is kept."""
    is_repeated = df.index.duplicated(keep='first')
    return df[~is_repeated]