__all__ = [
'GSHA',
'_GSHA',
'Japan'
'Arcticnet',
'Spain',
'Thailand'
]
import os
import time
from typing import List, Union, Dict
import concurrent.futures as cf
import numpy as np
import pandas as pd
from .._backend import xarray as xr, shapefile
from ..utils import get_cpus
from ..utils import check_attributes
from ..utils import merge_shapefiles
from .camels import Camels
from ._map import (
total_precipitation_with_specifier,
leaf_area_index,
actual_evapotranspiration_with_specifier,
total_potential_evapotranspiration_with_specifier,
download_longwave_radiation_with_specifier,
solar_radiation_with_specifier,
snow_water_equivalent_with_specifier,
mean_air_temp_with_specifier,
mean_windspeed_with_specifier,
u_component_of_wind_with_specifier,
v_component_of_wind_with_specifier,
groundwater_percentages,
soil_moisture_layer1,
soil_moisture_layer2,
soil_moisture_layer3,
soil_moisture_layer4,
observed_streamflow_cms,
)
from ._map import (
catchment_area,
gauge_latitude,
gauge_longitude,
slope
)
METEO_MAP = {
'arcticnet': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
'AFD': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
'GRDC': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
'IWRIS': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
'MLIT': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
'HYDAT': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
'ANA': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
'BOM': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
'CCRR': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
'China': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
'CHP': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
'RID': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
'USGS': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
}
[docs]
class GSHA(Camels):
"""
Global streamflow characteristics, hydrometeorology and catchment
attributes following `Peirong et al., 2023 <https://doi.org/10.5194/essd-16-1559-2024>`_.
The data is downloaded from its `zenodo repository <https://zenodo.org/record/8090704>`_.
It should be noted that this dataset does not contain observed streamflow data.
It has 21568 stations, 26 dynamic (meteorological + storage) features with daily timestep, 21 dynamic
features (landcover + streamflow indices + reservoir) with yearly timestep and 35 static features.
Examples
--------
>>> from water_datasets import GSHA
>>> dataset = GSHA()
>>> len(dataset.stations())
21568
>>> dataset.agencies
['arcticnet', 'AFD', 'GRDC', 'IWRIS', 'MLIT', 'HYDAT', 'ANA', 'BOM', 'CCRR', 'China', 'CHP', 'RID', 'USGS']
>>> dataset.start
Timestamp('1979-01-01 00:00:00')
>>> dataset.end
Timestamp('2022-12-31 00:00:00')
>>> dataset.static_features
['ele_mt_uav', 'slp_dg_uav', 'lat', 'long', 'area', 'agency', ...]
>>> len(dataset.dynamic_features)
26
>>> len(dataset.daily_dynamic_features)
26
>>> len(dataset.yearly_dynamic_features)
21
>>> dataset.fetch_static_features('1001_arcticnet')
fetch static features for all stations of arcticnet agency
>>> dataset.fetch_static_features(agency='arcticnet')
fetch static features for all stations of arcticnet agency
>>> ds.fetch_dynamic_features(agency='arcticnet')
"""
url = "https://zenodo.org/record/8090704"
[docs]
def __init__(self,
path=None,
overwrite=False,
to_netcdf: bool = True,
**kwargs):
"""
Parameters
----------
to_netcdf : bool
whether to convert all the data into one netcdf file or not.
This will fasten repeated calls to fetch etc but will
require netcdf5 package as well as xarry.
"""
super(GSHA, self).__init__(path=path, to_netcdf=to_netcdf, **kwargs)
self.path = path
files = ['Global_files.zip',
'GSHAreadme.docx',
'LAI.zip',
'Landcover.zip',
'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT.zip',
'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT.zip',
'Meteorology_PartIII_China_CHP_RID_USGS.zip',
'Reservoir.zip',
'Storage.zip',
'StreamflowIndices.zip',
'WatershedPolygons.zip',
'WatershedsAll.csv'
]
# self._download(overwrite=overwrite, files_to_check=files)
self._maybe_merge_shapefiles()
fpath = os.path.join(self.path,
"Global_files",
"Global_files",
'WatershedsAll.csv')
wsAll = pd.read_csv(fpath)
wsAll.columns = ['station_id', 'lat', 'long', 'area', 'agency']
wsAll.index = wsAll.pop('station_id')
self.wsAll = wsAll[~wsAll.index.duplicated(keep='first')].copy()
self.boundary_file = os.path.join(self.path, "boundaries.shp")
self._create_boundary_id_map(self.boundary_file, 7)
self._daily_dynamic_features = self.__daily_dynamic_features()
self._yearly_dynamic_features = self.__yearly_dynamic_features()
self._static_features = self.__static_features()
@property
def agencies(self) -> List[str]:
"""
returns the names of agencies as list
- ``arcticnet`` : Antarctica
- ``AFD`` : Spain
- ``GRDC`` : Global
- ``IWRIS`` : India
- ``MLIT`` : Japan
- ``HYDAT`` : Canada
- ``ANA``: Brazil
- ``BOM`` : Australia
- ``CCRR`` : Chile
- ``China``
- ``CHP`` : China
- ``RID`` : Thailand
- ``USGS``
"""
return self.wsAll.loc[:, 'agency'].unique()
@property
def daily_dynamic_features(self) -> List[str]:
return self._daily_dynamic_features
@property
def yearly_dynamic_features(self) -> List[str]:
return self._yearly_dynamic_features
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'lat': gauge_latitude(),
'slp_dg_uav': slope('degrees'),
'long': gauge_longitude(),
}
@property
def dyn_map(self):
return {
'EP_GLEAM': actual_evapotranspiration_with_specifier('gleam'),
'EP_REA': actual_evapotranspiration_with_specifier('rea'),
'GLEAM_PET': total_potential_evapotranspiration_with_specifier('gleam'),
'GW': groundwater_percentages(),
'HPET_PET': total_potential_evapotranspiration_with_specifier('hpet'),
'LONGRAD_ERA': download_longwave_radiation_with_specifier('era5'),
'LONGRAD_MERRA': download_longwave_radiation_with_specifier('merra2'),
'P_EMEarth': total_precipitation_with_specifier('emearth'),
'P_MSWEP': total_precipitation_with_specifier('mswep'),
'SHORTRAD_ERA': solar_radiation_with_specifier('era5'),
'SHORTRAD_MERRA': solar_radiation_with_specifier('merra2'),
'SML1': soil_moisture_layer1(),
'SML2': soil_moisture_layer2(),
'SML3': soil_moisture_layer3(),
'SML4': soil_moisture_layer4(),
'SWDE': snow_water_equivalent_with_specifier('era5'), # change m to mm
'T_ERA': mean_air_temp_with_specifier('era5'), # change from K to C
'T_EUSTACE': mean_air_temp_with_specifier('eustace'), # change from K to C
'T_MERRA': mean_air_temp_with_specifier('merra2'), # change from K to C
'WINDERA': mean_windspeed_with_specifier('era5'),
'WINDMERRA': mean_windspeed_with_specifier('merra'),
'WINDU_ERA': u_component_of_wind_with_specifier('era5'),
'WINDU_MERRA': u_component_of_wind_with_specifier('merra'),
'WINDV_ERA': v_component_of_wind_with_specifier('era5'),
'WINDV_MERRA': v_component_of_wind_with_specifier('merra'),
'lai': leaf_area_index(),
}
@property
def dyn_factors(self):
return {
snow_water_equivalent_with_specifier('era5'): lambda x: x * 1000,
mean_air_temp_with_specifier('era5'): lambda x: x - 273.15,
mean_air_temp_with_specifier('eustace'): lambda x: x - 273.15,
mean_air_temp_with_specifier('merra2'): lambda x: x - 273.15,
}
def __daily_dynamic_features(self):
return pd.concat(
[self.meteo_vars_stn('1001_arcticnet'),
self.storage_vars_stn('1001_arcticnet'),
],
axis=1
).columns.tolist() + ['lai']
def __yearly_dynamic_features(self):
return pd.concat(
[self.lc_variables_stn('1001_arcticnet'),
self.streamflow_indices_stn('1001_arcticnet'),
self.reservoir_variables_stn('1001_arcticnet')
],
axis=1
).columns.tolist()
@property
def start(self) -> pd.Timestamp:
return pd.Timestamp('1979-01-01')
@property
def end(self) -> pd.Timestamp:
return pd.Timestamp('2022-12-31')
@property
def static_features(self) -> List[str]:
return self._static_features
@property
def dynamic_features(self) -> List[str]:
return self.daily_dynamic_features
def __static_features(self):
return pd.concat(
[self.atlas('1001_arcticnet'),
self.uncertainty('1001_arcticnet')
],
axis=1
).columns.tolist() + self.wsAll.columns.tolist()
[docs]
def agency_of_stn(self, stn: str) -> str:
"""find the agency to which a station belongs """
return self.wsAll.loc[stn, 'agency']
[docs]
def agency_stations(self, agency: str) -> List[str]:
"""returns the station ids from a particular agency"""
return self.wsAll[self.wsAll['agency'] == agency].index.tolist()
def _maybe_merge_shapefiles(self):
shp_path = os.path.join(self.path, 'WatershedPolygons', 'WatershedPolygons')
out_shapefile = os.path.join(self.path, 'boundaries.shp')
if not os.path.exists(out_shapefile):
shp_files = [os.path.join(shp_path, filename) for filename in os.listdir(shp_path) if
filename.endswith('.shp')]
merge_shapefiles(shp_files, out_shapefile,
add_new_field=True)
return
def _get_stations(
self,
stations: Union[str, List[str]] = "all",
agency: Union[str, List[str]] = "all"
) -> List[str]:
if agency != "all" and stations != 'all':
raise ValueError("Either provide agency or stations not both")
if agency != "all":
agency = check_attributes(agency, self.agencies, 'agency')
stations = self.wsAll[self.wsAll['agency'].isin(agency)].index.tolist()
else:
stations = check_attributes(stations, self.stations(), 'stations')
return stations
[docs]
def stn_coords(self, stations: List[str] = "all", agency: List[str] = "all") -> pd.DataFrame:
"""
returns the latitude and longitude of stations
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (n, 2) where n is the number of stations
Examples
--------
>>> from water_datasets import GSHA
>>> dataset = GSHA()
>>> dataset.stn_coords('1001_arcticnet')
>>> dataset.stn_coords(['1001_arcticnet', '1002_arcticnet'])
get coordinates for all stations of arcticnet agency
>>> dataset.stn_coords(agency='arcticnet')
"""
stations = self._get_stations(stations, agency)
return self.wsAll.loc[stations, ['lat', 'long']].copy()
[docs]
def stations(self, agency: str = "all") -> List[str]:
"""returns names of stations as list"""
if agency != "all":
agency = check_attributes(agency, self.agencies, 'agency')
return self.wsAll[self.wsAll['agency'].isin(agency)].index.tolist()
return self.wsAll.index.tolist()
[docs]
def area(self, stations: List[str] = "all", agency: List[str] = "all") -> pd.Series:
"""area of catchments"""
stations = self._get_stations(stations, agency)
return self.wsAll.loc[stations, 'area']
[docs]
def uncertainty(
self,
stations: List[str] = "all",
agency: List[str] = "all"
) -> pd.DataFrame:
"""
Uncertainty estimates of all meteorological variables over all watersheds
- P_uncertainty (%) Precipitation uncertainty estimates (in percentage). Uncertainties are calculated from EM-Earth deterministic and MSWEP datasets.
- T_uncertainty (%) Temperature uncertainty estimates (in percentage). Uncertainties are calculated from EUSTACE, MERRA-2, and ERA5 datasets.
- EVP_uncertainty (%) Actual evapotranspiration uncertainty estimates (in percentage). Uncertainties are calculated from GLEAM and REA datasets.
- LRAD_uncertainty (%) Downward longwave radiation uncertainty estimates (in percentage). Uncertainties are calculated from MERRA-2 and ERA5-land datasets.
- SRAD_uncertainty (%) Downward shortwave radiation uncertainty estimates (in percentage). Uncertainties are calculated from MERRA-2 and ERA5-land datasets.
- wind_uncertainty (%) Wind speed uncertainty estimates (in percentage). The u- and v- components are aggregated on each grid to obtain wind speed. Uncertainties are calculated from MERRA-2 and ERA5-land datasets.
- pet_uncertainty (%) Potential evapotranspiration uncertainty estimates (in percentage). Uncertainties are calculated from GLEAM and REA datasets.
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (n, 7) where n is the number of stations
"""
stations = self._get_stations(stations, agency)
fpath = os.path.join(
self.path,
"Global_files",
"Global_files",
'Uncertainty.csv')
df = pd.read_csv(fpath, index_col=0)
df = df[~df.index.duplicated(keep='first')]
return df.loc[stations, :]
[docs]
def atlas(self, stations: List[str] = "all", agency: List[str] = "all") -> pd.DataFrame:
"""
The link table between GSHA watershed IDs and RiverATLAS
river reach IDs, as well as the selected static attributes
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (n, 24) where n is the number of stations
"""
stations = self._get_stations(stations, agency)
fpath = os.path.join(
self.path,
"Global_files",
"Global_files",
'GSHA_ATLAS.csv')
df = pd.read_csv(fpath, index_col=0)
df = df[~df.index.duplicated(keep='first')]
return df.loc[stations, :]
[docs]
def lc_variables_stn(self, stn: str) -> pd.DataFrame:
"""
Landcover variables for a given station which have yearly timestep.
Following three landcover variables are provided:
- urban_fraction(%): Ratio of urban extent to the entire watershed area (percentage).
- forest_fraction(%): Ratio of forest extent to the entire watershed area (percentage).
- cropland_fraction(%): Ratio of cropland extent to the entire watershed area (percentage).
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (n, 3) where n is the number of years
"""
return lc_variable_stn(self.path, stn)
[docs]
def lc_variables(
self,
stations: List[str] = "all",
agency: List[str] = "all"
):
"""
Landcover variables for one or more than one station either
as xr.Dataset or dictionary. The data has yearly timestep.
"""
stations = self._get_stations(stations, agency)
lc_vars = lc_vars_all_stns(self.path)
if isinstance(lc_vars, xr.Dataset):
return lc_vars[stations]
else:
return {stn: lc_vars[stn] for stn in stations}
[docs]
def reservoir_variables_stn(self, stn: str) -> pd.DataFrame:
"""
Reservoir variables for a given station from 1979 to 2020 with yearly timestep.
Following two reservoir variables are provided:
- ``capacity``: Reservoir capacity of the year in the watershed (m3). To avoid including too many missing values, we use the ICOLD capacity in the linked table of the GeoDAR dataset.
- ``dor``: Degree of regulation of the watershed (yearly reservoir capacity/yearly mean flow). If yearly mean flow is missing, the value is substituted with the average of all mean flow values.
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (42, 2) where 42 is the number of years
"""
return reservoir_vars_stn(self.path, stn)
[docs]
def reservoir_variables(
self,
stations: List[str] = "all",
agency: List[str] = "all"
):
"""
Reservoir variables for one or more than one station either
as xr.Dataset or dictionary. The data has yearly timestep.
"""
stations = self._get_stations(stations, agency)
lc_vars = reservoir_vars_all_stns(self.path)
if isinstance(lc_vars, xr.Dataset):
return lc_vars[stations]
else:
return {stn: lc_vars[stn] for stn in stations}
[docs]
def streamflow_indices_stn(self, stn: str) -> pd.DataFrame:
"""
Streamflow indices for a given station which have yearly timestep.
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (n, 16) where n is the number of years
"""
return streamflow_indices_stn(self.path, stn)
[docs]
def streamflow_indices(
self,
stations: List[str] = "all",
agency: List[str] = "all"
):
"""
Landcover variables for one or more than one station either
as xr.Dataset or dictionary. The data has yearly timestep.
"""
stations = self._get_stations(stations, agency)
lc_vars = streamflow_indices_all_stations(
self.path,
to_netcdf=self.to_netcdf,
verbosity=self.verbosity
)
if isinstance(lc_vars, xr.Dataset):
return lc_vars[stations]
else:
return {stn: lc_vars[stn] for stn in stations}
[docs]
def lai_stn(self, stn: str) -> pd.Series:
"""
Daily leaf area index. As per documentation, due to satellite data quality,
some watersheds might have relatively serious data missing issue. The data is
from 1981-01-01 to 2020-12-31.
Returns
-------
pd.Series
a pandas Series of shape (14571,) where 14571 is the number of days
"""
return lai_stn(self.path, stn)
[docs]
def lai(
self,
stations: List[str] = "all",
agency: List[str] = "all"
):
"""
Leaf Area Index timeseries for one or more than one station either
as xr.Dataset or pandas DataFrame. The data has daily timestep.
"""
stations = self._get_stations(stations, agency)
lai = lai_all_stns(
self.path,
to_netcdf=self.to_netcdf,
verbosity=self.verbosity
)
return lai[stations]
[docs]
def meteo_vars_stn(self, stn: str) -> pd.DataFrame:
"""
Daily meteorological variables from 1979-01-01 to 2022-12-31 for a given station.
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (16071, 19) where n is the number of days
"""
path = os.path.join(
self.path,
METEO_MAP[self.agency_of_stn(stn)],
f'{stn}.csv'
)
return self._meteo_vars_stn(path)
[docs]
def meteo_vars_all_stns(self):
"""
Meteorological variables from 1979-01-01 to 2022-12-31 for all stations either
as xr.Dataset or dictionary. The data has daily timestep.
"""
nc_path = os.path.join(self.path, 'meteo_vars.nc')
if self.to_netcdf and os.path.exists(nc_path):
if self.verbosity: print(f"Reading from pre-existing {nc_path}")
return xr.open_dataset(nc_path)
meteo_vars = {}
paths = [os.path.join(
self.path,
METEO_MAP[self.agency_of_stn(stn)],
f'{stn}.csv') for stn in self.stations()]
cpus = self.processes or get_cpus() - 2
start = time.time()
if self.verbosity:
print(f"Reading meteorological variables for {len(self.stations())} stations using {cpus} cpus")
# takes ~ 1538 seconds with 110 cpus
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(
self._meteo_vars_stn,
paths
)
if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds")
if self.to_netcdf:
for stn, df in zip(self.stations(), results):
meteo_vars[stn] = df
encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in meteo_vars.keys()}
ds = xr.Dataset(meteo_vars)
if self.verbosity: print(f"Saving to {nc_path}")
ds.to_netcdf(nc_path, encoding=encoding)
else:
ds = {stn: df for stn, df in zip(self.stations(), results)}
return ds
[docs]
def meteo_vars(
self,
stations: List[str] = "all",
agency: List[str] = "all"
):
"""
Meteorological variables from 1979-01-01 to 2022-12-31 for one or more than one station either
as xr.Dataset or dictionary. The data has daily timestep.
"""
if agency != "all" and stations != 'all':
raise ValueError("Either provide agency or stations not both")
if agency != "all":
agency = check_attributes(agency, self.agencies, 'agency')
stations = self.wsAll[self.wsAll['agency'].isin(agency)].index.tolist()
else:
stations = check_attributes(stations, self.stations(), 'stations')
meteo_vars = self.meteo_vars_all_stns()
if isinstance(meteo_vars, xr.Dataset):
return meteo_vars[stations]
else:
return {stn: meteo_vars[stn] for stn in stations}
[docs]
def storage_vars_stn(self, stn: str) -> pd.DataFrame:
"""
Daily Water storage term variables from 1979-01-01 to 2021-12-31 for a given station.
- SM_layer1: 0-7 cm soil moisture from ERA5 land soil water layer 1 (m3/m3) for 1979-2021.
- SM_layer2: 7-28 cm soil moisture from ERA5 land soil water layer 2 (m3/m3) for 1979-2021.
- SM_layer3: 28-100 cm soil moisture from ERA5 land soil water layer 3 (m3/m3) for 1979-2021.
- SM_layer4: 100-289 cm soil moisture from ERA5 land soil water layer 4 (m3/m3) for 1979-2021.
- SWDE: Snow water equivalent from ERA5 snow depth water equivalent (m of water equivalent) for 1979-2021.
- groundwater(%): Groundwater percentage from GRACE-FO data assimilation (%) for 2003-2021 (weekly).
Returns
-------
pd.DataFrame
a pandas DataFrame of shape (15706, 6) where n is the number of days
"""
path = os.path.join(
self.path,
"Storage",
"Storage",
f'{stn}.csv'
)
return self._storage_vars_stn(path)
[docs]
def storage_vars_all_stns(self):
"""
Water storage term variables from 1979-01-01 to 2021-12-31 for all stations either
as xr.Dataset or dictionary. The data has daily timestep.
"""
nc_path = os.path.join(self.path, 'storage.nc')
if self.to_netcdf and os.path.exists(nc_path):
if self.verbosity: print(f"Reading from pre-existing {nc_path}")
return xr.open_dataset(nc_path)
storage_vars = {}
paths = [os.path.join(
self.path,
"Storage",
"Storage",
f'{stn}.csv') for stn in self.stations()]
cpus = self.processes or get_cpus() - 2
start = time.time()
if self.verbosity: print(f"Reading storage vars for {len(self.stations())} stations using {cpus} cpus")
# takes ~ 975 seconds with 110 cpus
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(
self._storage_vars_stn,
paths
)
if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds")
if self.to_netcdf:
encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in self.stations()}
for stn, df in zip(self.stations(), results):
storage_vars[stn] = df
ds = xr.Dataset(storage_vars)
if self.verbosity: print(f"Saving to {nc_path}")
ds.to_netcdf(nc_path, encoding=encoding)
else:
ds = {stn: df for stn, df in zip(self.stations(), results)}
return ds
[docs]
def storage_vars(
self,
stations: List[str] = "all",
agency: List[str] = "all"
):
"""
Water storage term variables from 1979-01-01 to 2021-12-31 for one or more than one station either
as xr.Dataset or dictionary. The data has daily timestep.
"""
stations = self._get_stations(stations, agency)
meteo_vars = self.storage_vars_all_stns()
if isinstance(meteo_vars, xr.Dataset):
return meteo_vars[stations]
else:
return {stn: meteo_vars[stn] for stn in stations}
[docs]
def fetch_static_features(
self,
stations: Union[str, List[str]] = "all",
static_features: Union[str, List[str]] = "all",
agency: List[str] = "all",
) -> pd.DataFrame:
"""
Returns static features of one or more stations.
Parameters
----------
stations : str
name/id of station/stations of which to extract the data
static_features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
static features are returned.
Returns
-------
pd.DataFrame
a pandas dataframe of shape (stations, features)
Examples
---------
>>> from water_datasets import GSHA
>>> dataset = GSHA()
get the names of stations
>>> stns = dataset.stations()
>>> len(stns)
21568
get all static data of all stations
>>> static_data = dataset.fetch_static_features(stns)
>>> static_data.shape
(21568, 35)
get static data of one station only
>>> static_data = dataset.fetch_static_features('1001_arcticnet')
>>> static_data.shape
(1, 35)
get the names of static features
>>> dataset.static_features
get only selected features of all stations
>>> static_data = dataset.fetch_static_features(stns, ['ele_mt_uav', 'slp_dg_uav'])
>>> static_data.shape
(21568, 2)
>>> data = dataset.fetch_static_features('1001_arcticnet', static_features=['slp_dg_uav', 'slp_dg_uav'])
>>> data.shape
(1, 2)
>>> out = ds.fetch_static_features(agency='arcticnet')
>>> out.shape
(106, 35
"""
stations = self._get_stations(stations, agency)
features = check_attributes(static_features, self.static_features, 'static_features')
return pd.concat([
self.atlas(stations),
self.uncertainty(stations),
self.wsAll.loc[stations, :]
],
axis=1
).loc[:, features]
[docs]
def fetch_stn_dynamic_features(
self,
stn_id: str,
dynamic_features='all',
) -> pd.DataFrame:
"""
Fetches all or selected dynamic features of one station.
Parameters
----------
stn_id : str
name/id of station of which to extract the data
features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
dynamic features are returned.
Returns
-------
pd.DataFrame
a pandas dataframe of shape (n, features) where n is the number of days
Examples
--------
>>> from water_datasets import GSHA
>>> camels = GSHA()
>>> camels.fetch_stn_dynamic_features('1001_arcticnet').unstack()
>>> camels.dynamic_features
>>> camels.fetch_stn_dynamic_features('1001_arcticnet',
... features=['tmax_AWAP', 'vprp_AWAP']).unstack()
"""
features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
out = pd.concat(
[self.meteo_vars_stn(stn_id),
self.storage_vars_stn(stn_id),
self.lai_stn(stn_id).rename('lai')
],
axis=1
).loc[:, features]
out.columns.name = 'dynamic_features'
return out
[docs]
def fetch_dynamic_features(
self,
stations: Union[List[str], str] = "all",
dynamic_features='all',
st=None,
en=None,
as_dataframe=False,
agency: List[str] = "all",
):
"""Fetches all or selected dynamic features of one station.
Parameters
----------
stations : str
name/id of station of which to extract the data
features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
dynamic features are returned.
st : Optional (default=None)
start time from where to fetch the data.
en : Optional (default=None)
end time untill where to fetch the data
as_dataframe : bool, optional (default=False)
if true, the returned data is pandas DataFrame otherwise it
is xarray dataset
Examples
--------
>>> from water_datasets import GSHA
>>> camels = GSHA()
>>> camels.fetch_dynamic_features('1001_arcticnet', as_dataframe=True).unstack()
>>> camels.dynamic_features
>>> camels.fetch_dynamic_features('1001_arcticnet',
... features=['tmax_AWAP', 'vprp_AWAP', 'streamflow_mmd'],
... as_dataframe=True).unstack()
"""
stations = self._get_stations(stations, agency)
features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
st, en = self._check_length(st, en)
if len(stations) == 1:
if as_dataframe:
return self.fetch_stn_dynamic_features(stations[0], features)
else:
return xr.Dataset({stations[0]: xr.DataArray(self.fetch_stn_dynamic_features(stations[0], features))})
if as_dataframe:
raise NotImplementedError("as_dataframe=True is not implemented yet")
meteo_vars = self.meteo_vars(stations)
storage_vars = self.storage_vars(stations)
# since lai does not have 'features' dimension, we need to add it
lai = self.lai(stations).expand_dims({'features': ['lai']})
ds = xr.concat([meteo_vars, storage_vars, lai], dim='features')
ds = ds.rename({'features': 'dynamic_features'})
return ds.sel(time=slice(st, en))
def _meteo_vars_stn(self, fpath) -> pd.DataFrame:
if not os.path.exists(fpath):
raise FileNotFoundError(f"{fpath} not found")
df = pd.read_csv(fpath, index_col=0,
)
df.index = pd.to_datetime(df.index)
df.index.name = 'time'
df.columns.name = 'features'
df.rename(columns=self.dyn_map, inplace=True)
for col, func in self.dyn_factors.items():
if col in df.columns:
df[col] = df[col].apply(func)
return df
def _storage_vars_stn(self, fpath) -> pd.DataFrame:
if not os.path.exists(fpath):
raise FileNotFoundError(f"{fpath} not found")
df = pd.read_csv(fpath, index_col=0,
dtype={'SWDE': np.float32,
'SML1': np.float32,
'SML2': np.float32,
'SML3': np.float32,
'SML4': np.float32,
'GW': np.float32, }
)
df.index = pd.to_datetime(df.index)
df.index.name = 'time'
df.columns.name = 'features'
df.rename(columns=self.dyn_map, inplace=True)
for col, func in self.dyn_factors.items():
if col in df.columns:
df[col] = df[col].apply(func)
return df
[docs]
class _GSHA(Camels):
"""
Parent class for those datasets which uses static and dynamic features from
GSHA dataset . The following dataset classes are based on this class:
- py:class:`water_datasets.Japan`
- py:class:`water_datasets.Thailand`
- py:class:`water_datasets.Spain`
"""
[docs]
def __init__(
self,
gsha_path: Union[str, os.PathLike] = None,
verbosity: int = 1,
**kwargs):
super(_GSHA, self).__init__(verbosity=verbosity, **kwargs)
if gsha_path is None:
self.gsha_path = os.path.dirname(self.path)
else:
self.gsha_path = gsha_path
self.gsha = GSHA(path=self.gsha_path, verbosity=verbosity)
self.boundary_file = self.gsha.boundary_file
self._stations = self.__stations()
@property
def start(self) -> pd.Timestamp:
return pd.Timestamp('1979-01-01')
@property
def end(self) -> pd.Timestamp:
return pd.Timestamp('2022-12-31')
@property
def dynamic_features(self) -> List[str]:
return [observed_streamflow_cms()] + self.gsha.dynamic_features
@property
def static_features(self) -> List[str]:
return self.gsha.static_features
@property
def _coords_name(self) -> List[str]:
return ['lat', 'long']
@property
def _area_name(self) -> str:
return 'area'
@property
def _q_name(self) -> str:
return observed_streamflow_cms()
def stations(self) -> List[str]:
return self._stations
def __stations(self) -> List[str]:
"""
returns names of only those stations which are also documented
by GSHA.
"""
return [stn.split('_')[0] for stn in self.gsha.agency_stations(self.agency_name)]
[docs]
def get_boundary(
self,
catchment_id: str,
as_type: str = 'numpy'
):
"""
returns boundary of a catchment in a required format
Parameters
----------
catchment_id : str
name/id of catchment
as_type : str
'numpy' or 'geopandas'
Examples
--------
>>> from water_datasets import Japan
>>> dataset = Japan()
>>> dataset.get_boundary(dataset.stations()[0])
"""
if shapefile is None:
raise ModuleNotFoundError("shapefile module is not installed. Please install it to use boundary file")
from shapefile import Reader
bndry_sf = Reader(self.boundary_file)
if self.agency_name == 'RID':
catchment_id = catchment_id.replace('.', '_')
bndry_shp = bndry_sf.shape(self.gsha.bndry_id_map[f"{catchment_id}_{self.agency_name}"])
bndry_sf.close()
xyz = np.array(bndry_shp.points)
xyz = self.transform_coords(xyz)
return xyz
def _fetch_dynamic_features(
self,
stations: list,
dynamic_features='all',
st=None,
en=None,
as_dataframe=False,
as_ts=False
):
"""Fetches dynamic features of station."""
st, en = self._check_length(st, en)
features = check_attributes(dynamic_features, self.dynamic_features.copy(), 'dynamic_features')
daily_q = None
if observed_streamflow_cms() in features:
daily_q = self.get_q(as_dataframe)
if isinstance(daily_q, xr.Dataset):
daily_q = daily_q.sel(time=slice(st, en))[stations]
else:
daily_q = daily_q.loc[st:en, stations]
features.remove(observed_streamflow_cms())
if len(features) == 0:
return daily_q
stations_ = [f"{stn}_{self.agency_name}" for stn in stations]
data = self.gsha.fetch_dynamic_features(stations_, features, st, en, as_dataframe)
if daily_q is not None:
if isinstance(daily_q, xr.Dataset):
assert isinstance(data, xr.Dataset), "xarray dataset not supported"
data = data.rename({stn: stn.split('_')[0] for stn in data.data_vars})
# first create a new dimension in daily_q named dynamic_features
daily_q = daily_q.expand_dims({'dynamic_features': [observed_streamflow_cms()]})
data = xr.concat([data, daily_q], dim='dynamic_features').sel(time=slice(st, en))
else:
# -1 because the data in .nc files hysets starts with 0
data.rename(columns={stn: stn.split('_')[0] for stn in stations}, inplace=True)
assert isinstance(data.index, pd.MultiIndex)
# data is multiindex dataframe but daily_q is not
# first make daily_q multiindex
daily_q['dynamic_features'] = 'daily_q'
daily_q.set_index('dynamic_features', append=True, inplace=True)
daily_q = daily_q.reorder_levels(['time', 'dynamic_features'])
data = pd.concat([data, daily_q], axis=0).sort_index()
return data
def _fetch_static_features(
self,
station="all",
static_features: Union[str, list] = 'all',
st=None,
en=None,
as_ts=False
) -> pd.DataFrame:
"""Fetches static features of station."""
if self.verbosity > 1:
print('fetching static features')
stations = check_attributes(station, self.stations(), 'stations')
stations_ = [f"{stn}_{self.agency_name}" for stn in stations]
static_feats = self.gsha.fetch_static_features(stations_, static_features).copy()
static_feats.index = [stn.split('_')[0] for stn in static_feats.index]
return static_feats
[docs]
def fetch_stations_features(
self,
stations: list,
dynamic_features: Union[str, list, None] = 'all',
static_features: Union[str, list, None] = None,
st=None,
en=None,
as_dataframe: bool = False,
**kwargs
):
"""
returns features of multiple stations
Examples
--------
>>> from water_datasets import Arcticnet
>>> dataset = Arcticnet()
>>> stations = dataset.stations()
>>> features = dataset.fetch_stations_features(stations)
"""
stations = check_attributes(stations, self.stations(), 'stations')
if dynamic_features is not None:
dyn = self._fetch_dynamic_features(stations=stations,
dynamic_features=dynamic_features,
as_dataframe=as_dataframe,
st=st,
en=en,
**kwargs
)
if static_features is not None: # we want both static and dynamic
to_return = {}
static = self._fetch_static_features(station=stations,
static_features=static_features,
st=st,
en=en,
**kwargs
)
to_return['static'] = static
to_return['dynamic'] = dyn
else:
to_return = dyn
elif static_features is not None:
# we want only static
to_return = self._fetch_static_features(
station=stations,
static_features=static_features,
**kwargs
)
else:
raise ValueError(f"""
static features are {static_features} and dynamic features are also {dynamic_features}""")
return to_return
[docs]
def fetch_static_features(
self,
stations: Union[str, List[str]] = "all",
static_features: Union[str, List[str]] = "all",
st=None,
en=None,
as_ts=False
) -> pd.DataFrame:
"""
returns static atttributes of one or multiple stations
Parameters
----------
stations : str
name/id of station of which to extract the data
static_features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
static features are returned.
st :
en :
as_ts :
Examples
---------
>>> from water_datasets import Japan
>>> dataset = Japan()
get the names of stations
>>> stns = dataset.stations()
>>> len(stns)
12004
get all static data of all stations
>>> static_data = dataset.fetch_static_features(stns)
>>> static_data.shape
(12004, 27)
get static data of one station only
>>> static_data = dataset.fetch_static_features('01010070')
>>> static_data.shape
(1, 27)
get the names of static features
>>> dataset.static_features
get only selected features of all stations
>>> static_data = dataset.fetch_static_features(stns, ['Drainage_Area_km2', 'Elevation_m'])
>>> static_data.shape
(12004, 2)
"""
return self._fetch_static_features(stations, static_features, st, en, as_ts)
def streamflow_indices_all_stations(
ds_path: Union[str, os.PathLike],
cpus: int = None,
to_netcdf: bool = True,
verbosity: int = 1
):
nc_path = os.path.join(ds_path, 'streamflow_indices.nc')
if to_netcdf and os.path.exists(nc_path):
if verbosity: print(f"Reading from pre-existing {nc_path}")
return xr.open_dataset(nc_path)
cpus = cpus or get_cpus() - 2
start = time.time()
stations = GSHA(os.path.dirname(ds_path)).stations()
paths = [ds_path for _ in range(len(stations))]
if verbosity: print(f"Reading streamflow indices for {len(stations)} stations using {cpus} cpus")
# takes ~20 seconds with 110 cpus
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(
streamflow_indices_stn,
paths,
stations,
)
if verbosity: print(f"Time taken: {time.time() - start:.2f} seconds")
if to_netcdf:
encoding = {var: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for var in stations()}
ds = xr.Dataset({str(stn): xr.DataArray(df) for stn, df in zip(stations, results)})
if verbosity: print(f"Saving to {nc_path}")
ds.to_netcdf(nc_path, encoding=encoding)
else:
ds = {stn: df for stn, df in zip(stations, results)}
return ds
def streamflow_indices_stn(
ds_path: Union[str, os.PathLike],
stn: str
) -> pd.DataFrame:
fpath = os.path.join(
ds_path,
"StreamflowIndices",
"StreamflowIndices",
f'{stn}.csv')
if not os.path.exists(fpath):
raise FileNotFoundError(f"{ds_path} for station {stn} not found")
df = pd.read_csv(
fpath, index_col=0,
dtype={'1- percentile': np.float32,
'10- percentile': np.float32,
'25- percentile': np.float32,
'median': np.float32,
'75- percentile': np.float32,
'90- percentile': np.float32,
'99- percentile': np.float32,
'mean': np.float32,
'maximum (AMF)': np.float32,
'AMF occurrence date': str,
'frequency of high-flow days': np.int32,
'average duration of high-flow events': np.float32,
'frequency of low-flow days': np.int32,
'average duration of low-flow events': np.float32,
'number of days with Q=0 (days)': np.int32,
'valid observation days (days)': np.int32,
}
)
df.index = pd.to_datetime(df.index, format='%Y')
df.index.name = 'years'
df.columns.name = 'streamflow_indices'
return df
def lc_variable_stn(ds_path, stn: str) -> pd.DataFrame:
fpath = os.path.join(
ds_path,
"Landcover",
"Landcover",
f'{stn}.csv')
if not os.path.exists(fpath):
raise FileNotFoundError(f"{ds_path} for station {stn} not found")
df = pd.read_csv(fpath, index_col=0,
# dtype={'1- percentile': np.float32}
)
df.index = pd.to_datetime(df.pop('year'), format='%Y')
df.index.name = 'years'
df.columns.name = 'lc_variables'
return df
def lc_vars_all_stns(
ds_path: Union[str, os.PathLike],
cpus: int = None,
to_netcdf: bool = True,
verbosity: int = 1
):
nc_path = os.path.join(ds_path, 'lc_variables.nc')
if to_netcdf and os.path.exists(nc_path):
if verbosity: print(f"Reading from pre-existing {nc_path}")
return xr.open_dataset(nc_path)
cpus = cpus or get_cpus() - 2
start = time.time()
stations = GSHA(os.path.dirname(ds_path)).stations()
paths = [ds_path for _ in range(len(stations))]
if verbosity: print(f"Reading landcover variables for {len(stations)} stations using {cpus} cpus")
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(
lc_variable_stn,
paths,
stations,
)
print(f"Time taken: {time.time() - start:.2f} seconds")
if to_netcdf:
encoding = {var: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for var in stations()}
ds = xr.Dataset({stn: xr.DataArray(val) for stn, val in zip(stations, results)})
print(f"Saving to {nc_path}")
ds.to_netcdf(nc_path, encoding=encoding)
else:
ds = {stn: df for stn, df in zip(stations, results)}
return ds
def reservoir_vars_stn(ds_path, stn: str) -> pd.DataFrame:
fpath = os.path.join(
ds_path,
"Reservoir",
"Reservoir",
f'{stn}.csv')
if not os.path.exists(fpath):
raise FileNotFoundError(f"{ds_path} for station {stn} not found")
df = pd.read_csv(fpath, index_col=0,
dtype={'capacity': np.float32, 'dor': np.float32, 'year': np.int32}
)
df.index = pd.to_datetime(df.index, format='%Y')
df.index.name = 'years'
df.columns.name = 'reservoir_variables'
return df
def reservoir_vars_all_stns(
ds_path: Union[str, os.PathLike],
cpus: int = None,
to_netcdf: bool = True,
verbosity: int = 1
):
nc_path = os.path.join(ds_path, 'reservoir_variables.nc')
if to_netcdf and os.path.exists(nc_path):
if verbosity: print(f"Reading from pre-existing {nc_path}")
return xr.open_dataset(nc_path)
cpus = cpus or get_cpus() - 2
start = time.time()
stations = GSHA(os.path.dirname(ds_path)).stations()
paths = [ds_path for _ in range(len(stations))]
if verbosity: print(f"Reading reservoir variables for {len(stations)} stations using {cpus} cpus")
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(
reservoir_vars_stn,
paths,
stations,
)
print(f"Time taken: {time.time() - start:.2f} seconds")
if to_netcdf:
encoding = {var: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for var in stations}
ds = xr.Dataset({stn: xr.DataArray(val) for stn, val in zip(stations, results)})
print(f"Saving to {nc_path}")
ds.to_netcdf(nc_path, encoding=encoding)
else:
ds = {stn: df for stn, df in zip(stations, results)}
return ds
def lai_stn(ds_path, stn: str) -> pd.Series:
fpath = os.path.join(
ds_path,
"LAI",
"LAI",
f'{stn}.csv')
if not os.path.exists(fpath):
raise FileNotFoundError(f"{ds_path} for station {stn} not found")
df = pd.read_csv(fpath, index_col='date',
dtype={stn: np.float32}
)
df.index = pd.to_datetime(df.index)
df.index.name = 'time'
return df[stn]
def lai_all_stns(
ds_path: Union[str, os.PathLike],
cpus: int = None,
to_netcdf: bool = True,
verbosity: int = 1
):
if to_netcdf:
nc_path = os.path.join(ds_path, 'lai.nc')
if os.path.exists(nc_path):
if verbosity: print(f"Reading from pre-existing {nc_path}")
return xr.open_dataset(nc_path)
elif os.path.exists(os.path.join(ds_path, 'lai.csv')):
if verbosity: print(f"Reading from pre-existing {ds_path}")
return pd.read_csv(os.path.join(ds_path, 'lai.csv'), index_col=0)
cpus = cpus or get_cpus() - 2
start = time.time()
stations = GSHA(os.path.dirname(ds_path)).stations()
paths = [ds_path for _ in range(len(stations))]
if verbosity: print(f"Reading lai for {len(stations)} stations using {cpus} cpus")
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(
lai_stn,
paths,
stations,
)
if verbosity: print(f"Time taken: {time.time() - start:.2f} seconds")
if to_netcdf:
encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in stations}
nc_path = os.path.join(ds_path, 'lai.nc')
ds = xr.Dataset({stn: xr.DataArray(val) for stn, val in zip(stations, results)})
print(f"Saving to {nc_path}")
ds.to_netcdf(nc_path, encoding=encoding)
else:
ds = pd.concat(results, axis=1)
csv_path = os.path.join(ds_path, 'lai.csv')
if verbosity: print(f"Saving to {csv_path}")
ds.to_csv(csv_path, index=True)
return ds
# the dates for data to be downloaded
START_YEAR = 1979
END_YEAR = 2023
[docs]
class Japan(_GSHA):
"""
Data of 694 catchments of Japan from
`river.go.jp website <http://www1.river.go.jp>`_ .
The meteorological data static catchment features and catchment boundaries
taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore,
the number of staic features are 35 and dynamic features are 27 and the
data is available from 1979-01-01 to 2022-12-31.
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
gsha_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, verbosity=verbosity,
gsha_path=gsha_path, **kwargs)
if not os.path.exists(self.path):
os.makedirs(self.path)
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'lat': gauge_latitude(),
'long': gauge_longitude(),
}
@property
def agency_name(self)->str:
return 'MLIT'
def _maybe_move_and_merge_shpfiles(self):
out_shp_file = os.path.join(self.path, "boundaries.shp")
if not os.path.exists(out_shp_file):
df = self.gsha._coords()
jpn_stns = df.loc[df['agency'] == 'MLIT']
shp_path = os.path.join(self.gsha.path, "WatershedPolygons",
"WatershedPolygons")
shp_files = [os.path.join(shp_path, f"{filename}.shp") for filename in jpn_stns['station_id'].values]
for f in shp_files:
assert os.path.exists(f)
merge_shapefiles(shp_files, out_shp_file, add_new_field=True)
return
[docs]
def get_q(self, as_dataframe:bool=True)->pd.DataFrame:
"""reads daily streamflow for all stations and puts them in a single
file named data.csv. If data.csv is already present, then it is read
and its contents are returned as dataframe.
"""
if self.timestep in ('daily', 'D'):
df = download_daily_data(
self.stations(),
self.path,
verbosity=self.verbosity,
cpus=self.processes
)
else:
df = self.get_hourly_data(cpus=self.processes)
df.index.name = 'time'
if as_dataframe:
return df
df = xr.Dataset({stn: xr.DataArray(df.loc[:, stn]) for stn in df.columns})
return df
def get_hourly_data(self, cpus=None):
hourly_file = os.path.join(self.path, 'hourly_data.csv')
if os.path.exists(hourly_file):
print(f"reading hourly data from {hourly_file}")
return pd.read_csv(hourly_file, index_col=0)
path = os.path.join(self.path, 'hourly_files')
if self.verbosity>0: print(f"preparing hourly data using {cpus} cpus")
stn_qs = []
for idx, stn in enumerate(self.stations()):
stn_q = download_hourly_stn(path, stn=stn, cpus=cpus, verbosity=self.verbosity)
if self.verbosity>0: print(f"{idx} {stn}, {len(stn_q)}, {stn_q.index[0]}")
stn_qs.append(stn_q)
q = pd.concat(stn_qs, axis=1)
q.to_csv(hourly_file)
return q
def download_daily_stn_yr(
stn:str="309191289913130",
yr:int=1979,
)->pd.Series:
"""downloads daily data for a year for a station"""
url = f'http://www1.river.go.jp/cgi-bin/DspWaterData.exe?KIND=7&ID={stn}&BGNDATE={yr}0131&ENDDATE={yr}1231&KAWABOU=NO'
df = pd.read_html(url, encoding='EUC-JP')[1].loc[2:, 1:].reset_index(drop=True)
# make a dictionary with months as keys and number of days as values
days_in_month = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
# if it is a leap year, change the number of days in February
if yr % 4 == 0:
days_in_month[2] = 29
assert len(df)<13, len(df)
yearly_data = []
for i in range(0, len(df)):
row = df.iloc[i, 0:days_in_month[i+1]]
yearly_data.append(row)
stn_data = pd.concat(yearly_data).reset_index(drop=True)
stn_data.index = pd.date_range(start=f'{yr}-01-01', end=f'{yr}-12-31', freq='D')
stn_data.name = stn
return stn_data
def download_daily_data(
stations:List[str],
path:Union[str, os.PathLike],
verbosity:int=1,
cpus:int=None
):
"""downloads daily data for all stations"""
csv_path = os.path.join(path, 'daily_q.csv')
if os.path.exists(csv_path):
if verbosity:
print(f"reading daily data from {csv_path}")
return pd.read_csv(csv_path, index_col=0, parse_dates=True)
years = range(START_YEAR, END_YEAR+1)
stations_ = [[stn]*len(years) for stn in stations]
# flatten the list
stations_ = [item for sublist in stations_ for item in sublist]
years_ = list(years) * len(stations)
cpus = cpus or get_cpus()-2
if verbosity:
print(f"downloading daily data for {len(stations)} stations from {years[0]} to {years[-1]}")
if verbosity>1:
print(f"Total function calls: {len(stations_)} with {cpus} cpus")
start = time.time()
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(download_daily_stn_yr, stations_, years_)
if verbosity:
print(f"total time taken to download data: {time.time() - start}")
all_data = []
for stn in stations:
stn_data = []
for yr in years:
stn_yr_data = next(results)
stn_data.append(stn_yr_data)
stn_data = pd.concat(stn_data, axis=0)
stn_data.name = stn
if verbosity>2:
print(f"total number of years: {yr} for {stn} with shape {stn_data.shape}")
all_data.append(stn_data)
if verbosity>2:
print(f"total number of stations: {len(all_data)} each with shape {all_data[0].shape}")
all_data = pd.concat(all_data, axis=1)
all_data = all_data.replace({'−': np.nan, '欠測': np.nan})
all_data = all_data.astype(np.float32)
if verbosity:
print(f"saving daily data to {csv_path} with shape {all_data.shape}")
all_data.to_csv(csv_path)
return
def download_hourly_stn_day(
stn:str="309191289913130",
st:str="20211227",
en:str="20211227"
):
"""download hourly data for a single day for a single station"""
url = f"http://www1.river.go.jp/cgi-bin/SrchSiteSuiData2.exe?SUIKEI=90336000&BGNDATE={st}&ENDDATE={en}&ID={stn}:0202;"
data = pd.read_html(url)[0]
df = data.iloc[7:]
df.columns = ['date', 'time', stn]
if len(df)>0:
# make sure that we have data for all 24 hours
assert len(df) == 24, len(df)
else:
df.pop(stn)
df.insert(2, stn, [np.nan for _ in range(24)])
df.index = pd.date_range(pd.Timestamp(st), periods=24, freq="H")
return df[stn]
def download_hourly_stn(
path:Union[str, os.PathLike],
stn:str="301031281101030",
st_yr:int=1980,
en_yr:int=2024,
cpus:int=64,
verbosity:int = 1
)->pd.Series:
fpath = os.path.join(path, f"{stn}.csv")
if os.path.exists(fpath):
if verbosity>0: print(f"{stn} already exists")
return pd.read_csv(fpath, index_col=0)
starts, ends = [], []
for yr in range(st_yr, en_yr):
if yr % 4 == 0:
n_days = 366
else:
n_days = 365
for j_day in range(0, n_days):
# convert jday to date
date = pd.Timestamp(f"{yr}-01-01") + pd.Timedelta(days=j_day)
st = date.strftime("%Y%m%d")
en = date.strftime("%Y%m%d")
starts.append(st)
ends.append(en)
stations = [stn for _ in range(len(starts))]
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(download_hourly_stn_day, stations, starts, ends)
yr_df = []
for res in results:
yr_df.append(res)
q = pd.concat(yr_df)
# replace "欠測" with np.nan
q = q.replace("欠測", np.nan)
q = q.replace('-', np.nan)
q = q.dropna().astype(np.float32).sort_index()
# drop duplicated index
q = q[~q.index.duplicated(keep='first')]
q.to_csv(fpath)
return q
[docs]
class Arcticnet(_GSHA):
"""
Data of 106 catchments of arctic region from
`r-arcticnet project <https://www.r-arcticnet.sr.unh.edu/v4.0/AllData/index.html>`_ .
The meteorological data static catchment features and catchment boundaries
taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore,
the number of staic features are 35 and dynamic features are 27 and the
data is available from 1979-01-01 to 2003-12-31.
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
gsha_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, gsha_path=gsha_path, verbosity=verbosity, **kwargs)
if not os.path.exists(self.path):
os.makedirs(self.path)
self.metadata = self.get_metadata()
self._get_q()
self._stations = [stn for stn in self.all_stations() if stn in self.gsha_arctic_stns()]
self._static_features = self.gsha.static_features
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'lat': gauge_latitude(),
'long': gauge_longitude(),
}
@property
def end(self)->pd.Timestamp:
return pd.Timestamp('2003-12-31')
def stations(self)->List[str]:
return self._stations
def all_stations(self):
return self.metadata.index.astype(str).tolist()
@property
def agency_name(self)->str:
return 'arcticnet'
def get_metadata(self):
metadata_path = os.path.join(self.path, "metadata.csv")
if not os.path.exists(metadata_path):
df = pd.read_csv(
"https://www.r-arcticnet.sr.unh.edu/v4.0/russia-arcticnet/Daily_SiteAttributes.txt",
#"https://www.r-arcticnet.sr.unh.edu/v4.0/data/SiteAttributes.txt",
sep="\t",
encoding_errors='ignore',
)
df.index = df.pop('Code')
df.to_csv(metadata_path, index=True)
else:
df = pd.read_csv(metadata_path, index_col=0)
return df
def get_q(self, as_dataframe:bool=True):
nc_path = os.path.join(self.path, "daily_q.nc")
if os.path.exists(nc_path):
if self.verbosity:
print(f"Reading {nc_path}")
q_ds = xr.open_dataset(nc_path)
else:
q_ds = xr.Dataset({stn:self.get_stn_q(stn) for stn in self.stations()})
# rename dimension/coordinate to 'time' in q_ds
q_ds = q_ds.rename({'dim_0':'time'})
encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in self.stations()}
if self.verbosity:
print(f"Writing {nc_path}")
q_ds.to_netcdf(nc_path, encoding=encoding)
if as_dataframe:
q_ds = q_ds.to_dataframe()
return q_ds
def _get_q(self, as_dataframe:bool=True):
q_path = os.path.join(self.path, "daily_q.csv")
if not os.path.exists(q_path):
df = pd.read_csv(
"https://www.r-arcticnet.sr.unh.edu/v4.0/russia-arcticnet/discharge_m3_s_UNH-UCLA.txt",
#"https://www.r-arcticnet.sr.unh.edu/v4.0/data/Discharge_ms.txt",
sep="\t")
df.to_csv(q_path)
else:
df = pd.read_csv(q_path, index_col=0)
return df
def get_stn_q(self, stn: str):
q = self._get_q()
stn_q = q.loc[q['Code']==int(stn), :].copy()
stn_q.drop('Code', axis=1, inplace=True)
# Function to check leap year and adjust February days
def adjust_feb_days(year):
if (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0):
return 29
return 28
month_days1 = {
'Jan': 31, 'Feb': 28, # February will be adjusted dynamically
'Mar': 31, 'Apr': 30, 'May': 31, 'Jun': 30,
'Jul': 31, 'Aug': 31, 'Sep': 30, 'Oct': 31,
'Nov': 30, 'Dec': 31
}
# Prepare a mapping of month names to the number of days
month_days2 = {
1: 31, 2: 28, # February will be adjusted dynamically
3: 31, 4: 30, 5: 31, 6: 30,
7: 31, 8: 31, 9: 30, 10: 31,
11: 30, 12: 31
}
# Melt the DataFrame to convert it from wide format to long format
df_long = stn_q.melt(id_vars=['Year', 'Day'], value_vars=['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'],
var_name='Month', value_name='Value')
month_to_int = {month: i + 1 for i, month in enumerate(month_days1.keys())}
df_long['Month'] = df_long['Month'].map(month_to_int).astype(int)
def get_days_in_month(row):
try:
if int(row.Month) == 2:
return adjust_feb_days(row.Year)
return month_days2[int(row.Month)]
except KeyError as e:
print(f"KeyError encountered: {e} with row details: {row}")
return None # Or handle the error as needed
df_long['DaysInMonth'] = df_long.apply(get_days_in_month, axis=1)
# Filter out invalid days (e.g., February 30)
df_long = df_long[df_long['Day'] <= df_long['DaysInMonth']]
# Create a datetime column from the Year, Month, Day
df_long.index = pd.to_datetime(df_long[['Year', 'Month', 'Day']])
return pd.Series(df_long['Value'], name=str(stn))
def gsha_arctic_stns(self)->List[str]:
df = self.gsha.wsAll.copy()
return [stn.split('_')[0] for stn in df[df.index.str.endswith('_arcticnet')].index]
[docs]
class Spain(_GSHA):
"""
Data of 889 catchments of Spain from
`ceh-es <https://ceh-flumen64.cedex.es>`_ website.
The meteorological data static catchment features and catchment boundaries
taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore,
the number of staic features are 35 and dynamic features are 27 and the
data is available from 1979-01-01 to 2020-09-30.
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
gsha_path:Union[str, os.PathLike] = None,
overwrite:bool=False,
verbosity:int=1,
**kwargs):
super().__init__(
path=path,
gsha_path=gsha_path,
overwrite=overwrite,
verbosity=verbosity,
**kwargs)
self.areas = [
"CANTABRICO", "DUERO", "EBRO", "GALICIA COSTA",
"GUADALQUIVIR", "GUADIANA", "JUCAR", "MIÑO-SIL",
"SEGURA", "TAJO"
]
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'lat': gauge_latitude(),
'long': gauge_longitude(),
}
@property
def end(self)->pd.Timestamp:
return pd.Timestamp('2020-09-30')
@property
def agency_name(self)->str:
return 'AFD'
[docs]
def daily_q_all_areas(self)->pd.DataFrame:
"""Daily data of gauging stations in river from all areas
Retuns
------
16_806_305 rows x 3
"""
dfs = []
for area in self.areas:
df = self.daily_q_area(area)
dfs.append(df)
return pd.concat(dfs)
[docs]
def daily_q_area(self, area:str)->pd.DataFrame:
"""Reads Daily data of gauging stations in river which is in afliq.csv file"""
url = f"https://ceh-flumen64.cedex.es/anuarioaforos//anuario-2019-2020/{area}/afliq.csv"
df = pd.read_csv(url, #os.path.join(self.path, area, "afliq.csv"),
sep=';')
idx = pd.to_datetime(df.pop('fecha'), dayfirst=True)
df.index = idx
df.index.name = "date"
df.columns = ['stations', 'height_m', "q_cms"]
return df
[docs]
def get_q(self, as_dataframe:bool=True):
"""
returns daily q of all stations
Returns
-------
pd.DataFrame
a pandas dataframe of shape (39721, 1447)
"""
fpath = os.path.join(self.path, 'daily_q.csv')
if os.path.exists(fpath):
data= pd.read_csv(fpath, index_col='Unnamed: 0')
data.index = pd.to_datetime(data.index)
data.index.name = 'time'
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
q = self.daily_q_all_areas()
st = []
en = []
for g_name, grp in q.groupby('stations'):
st.append(grp.sort_index().index[0])
en.append(grp.sort_index().index[-1])
start = pd.to_datetime(st).sort_values()[0]
end = pd.to_datetime(en).sort_values()[-1]
daily_qs = []
for stn, stn_df in q.groupby('stations'):
q_ts = pd.Series(name=stn,
index=pd.date_range(start, end=end, freq="d"),
dtype=np.float32)
q_ts[stn_df.index] = stn_df['q_cms']
daily_qs.append(q_ts)
data = pd.concat(daily_qs, axis=1)
data.to_csv(fpath)
data.index.name = 'time'
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
[docs]
class Thailand(_GSHA):
"""
Data of 73 catchments of Thailand from
`RID project <https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/rid-river/disc_d.html>`_ .
The meteorological data static catchment features and catchment boundaries
taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore,
the number of staic features are 35 and dynamic features are 27 and the
data is available from 1980-01-01 to 1999-12-31.
"""
url = {
'disc_d_1980_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1980_RIDall.zip',
'disc_d_1981_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1981_RIDall.zip',
'disc_d_1982_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1982_RIDall.zip',
'disc_d_1983_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1983_RIDall.zip',
'disc_d_1984_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1984_RIDall.zip',
'disc_d_1985_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1985_RIDall.zip',
'disc_d_1986_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1986_RIDall.zip',
'disc_d_1987_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1987_RIDall.zip',
'disc_d_1988_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1988_RIDall.zip',
'disc_d_1989_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1989_RIDall.zip',
'disc_d_1990_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1990_RIDall.zip',
'disc_d_1991_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1991_RIDall.zip',
'disc_d_1992_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1992_RIDall.zip',
'disc_d_1993_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1993_RIDall.zip',
'disc_d_1994_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1994_RIDall.zip',
'disc_d_1995_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1995_RIDall.zip',
'disc_d_1996_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1996_RIDall.zip',
'disc_d_1997_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1997_RIDall.zip',
'disc_d_1998_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1998_RIDall.zip',
'disc_d_1999_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1999_RIDall.zip',
}
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
gsha_path:Union[str, os.PathLike] = None,
overwrite:bool=False,
verbosity:int=1,
**kwargs):
super().__init__(
path=path,
gsha_path=gsha_path,
overwrite=overwrite,
verbosity=verbosity,
**kwargs)
self._download(overwrite=overwrite)
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'lat': gauge_latitude(),
'long': gauge_longitude(),
}
@property
def agency_name(self)->str:
return 'RID'
@property
def start(self)->pd.Timestamp:
return pd.Timestamp('1980-01-01')
@property
def end(self)->pd.Timestamp:
return pd.Timestamp('1999-12-31')
[docs]
def get_q(self, as_dataframe:bool=True):
"""reads q"""
fpath = os.path.join(self.path, 'daily_q.csv')
if os.path.exists(fpath):
if self.verbosity:
print(f"Reading {fpath}")
data = pd.read_csv(fpath, index_col=0, parse_dates=True)
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
datas = []
for year in range(1980, 2000):
data = self._read_year(year)
datas.append(data)
data = pd.concat(datas)
#data.columns = [column.replace('.', '_') for column in data.columns.tolist()]
data.index.name = 'time'
if self.verbosity:
print(f"Writing {fpath}")
data.to_csv(fpath)
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
def _read_year(self, year:int):
year_path = os.path.join(self.path, f'disc_d_{year}_RIDall')
yr_dfs = []
stn_ids = []
ndays = 365
if year%4==0:
ndays = 366
for file in os.listdir(year_path):
fpath = os.path.join(year_path, file)
df = pd.read_csv(
fpath,
sep='\t',
names=['index', 'q_cms'],
nrows=ndays,
na_values=-9999.0,
)
df.index = pd.to_datetime(df.pop('index'))
yr_dfs.append(df)
stn_id = file.split('RID')[1].split('_m3s')[0]
stn_ids.append(stn_id)
df = pd.concat(yr_dfs, axis=1)
df.columns = stn_ids
return df