Source code for aqua_fetch.rr._usgs


import os
import re
import time
import requests
from io import StringIO
from typing import List, Union, Dict
from requests.exceptions import JSONDecodeError
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import pandas as pd

from .camels import Camels
from .._backend import netCDF4, shapefile, xarray as xr
from ..utils import get_cpus
from ..utils import check_attributes
from ._hysets import HYSETS

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

DAILY_START = "1820-01-01"
DAILY_END = "2024-05-30"
HOURLY_START = "1910-01-01"
HOURLY_END = "2024-05-30"


[docs] class USGS(Camels): """ This class handles the hydrometeorological data for the USA. The daily and hourly discharge data is downloaded from `usgs/nwis website <https://waterservices.usgs.gov/nwis/dv>`_ . The data is optionally stored in a netCDF file if xarray is available. Currently the data is downloaded for only those sites/catchments that are in the `HYSETS database <https://doi.org/10.1038/s41597-020-00583-2>`_. This is because the catchment boundaries are taken from HYSETS database using :py:class:`water_datasets.HYSETS`. For hourly timestep, "iv" service is used to download the instantaneous data which is then resampled to hourly data. Data with only ``A, [92]``, ``A, [91]``, ``A, [93]``, ``A, e``, ``A`` flags is used. For daily streamflow, "dv" service is used to download the data. In this case, the data with only ``A`` and ``A, e`` flags is used. """
[docs] def __init__( self, path:Union[str, os.PathLike] = None, hysets_path: Union[str, os.PathLike] = None, verbosity:int = 1, **kwargs ): """ Parameters ---------- path : str Path to store the data """ super().__init__(path, verbosity=verbosity, **kwargs) if hysets_path is None: hysets_path = os.path.join(os.path.dirname(self.path), 'HYSETS') if os.path.exists(hysets_path): self.hysets_path = hysets_path self.hysets = HYSETS(path = hysets_path, verbosity=verbosity) self.hysets_path = self.hysets.path self._stations = self.__stations() self.metadata = maybe_make_and_get_metadata(self.path, self.stations()) self._static_features = self.__static_features() self.boundary_file = self.hysets.boundary_file
@property def static_map(self) -> Dict[str, str]: return { 'Drainage_Area_km2': catchment_area(), 'Centroid_Lat_deg_N': gauge_latitude(), 'Slope_deg': slope('degrees'), 'Centroid_Lon_deg_E': gauge_longitude(), } @property def start(self)->str: return "19500101" @property def end(self)->str: return "20181231" @property def dynamic_features(self)->List[str]: return ['obs_q_cms'] + self.hysets.dynamic_features[1:] def stations(self)->List[str]: return self._stations @property def static_features(self)->List[str]: return self._static_features @property def _q_name(self)->str: return 'obs_q_cms'
[docs] def get_boundary( self, catchment_id: str, as_type: str = 'numpy' ): """ returns boundary of a catchment in a required format Parameters ---------- catchment_id : str name/id of catchment as_type : str 'numpy' or 'geopandas' Examples -------- >>> from water_datasets import USGS >>> dataset = USGS() >>> dataset.get_boundary(dataset.stations()[0]) """ if shapefile is None: raise ModuleNotFoundError("shapefile module is not installed. Please install it to use boundary file") from shapefile import Reader bndry_sf = Reader(self.boundary_file) bndry_shp = bndry_sf.shape(self.hysets.bndry_id_map[catchment_id]) bndry_sf.close() xyz = np.array(bndry_shp.points) xyz = self.transform_coords(xyz) return xyz
[docs] def area( self, stations: Union[str, List[str]] = 'all', ) ->pd.Series: """ Returns area_gov (Km2) of all catchments as pandas series parameters ---------- stations : str/list name/names of stations. Default is None, which will return area of all stations Returns -------- pd.Series a pandas series whose indices are catchment ids and values are areas of corresponding catchments. Examples --------- >>> from water_datasets import USGS >>> dataset = USGS() >>> dataset.area() # returns area of all stations >>> dataset.area('912101A') # returns area of station whose id is 912101A >>> dataset.area(['912101A', '12388200']) # returns area of two stations """ stations = check_attributes(stations, self.stations(), 'stations') area = self.metadata['drain_area_va'] area.name = 'area' return area.loc[stations]
[docs] def fetch_static_features( self, stations: Union[str, List[str]] = "all", static_features:Union[str, List[str]] = "all", st=None, en=None, as_ts=False ) -> pd.DataFrame: """ returns static atttributes of one or multiple stations Parameters ---------- stations : str name/id of station of which to extract the data static_features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. st : en : as_ts : Examples --------- >>> from water_datasets import USGS >>> dataset = USGS() get the names of stations >>> stns = dataset.stations() >>> len(stns) 12004 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (12004, 27) get static data of one station only >>> static_data = dataset.fetch_static_features('01010070') >>> static_data.shape (1, 27) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['Drainage_Area_km2', 'Elevation_m']) >>> static_data.shape (12004, 2) """ stations = check_attributes(stations, self.stations()) map_ = self.hysets.OfficialID_WatershedID_map stations = [int(map_[stn]) for stn in stations] static_feats = self.hysets.fetch_static_features(stations, static_features, st, en, as_ts) static_feats.set_index('Official_ID', inplace=True) return static_feats
[docs] def stn_coords( self, stations:Union[str, List[str]] = 'all' ) ->pd.DataFrame: """ returns coordinates of stations as DataFrame with ``long`` and ``lat`` as columns. Parameters ---------- stations : name/names of stations. If not given, coordinates of all stations will be returned. Returns ------- coords : pandas DataFrame with ``long`` and ``lat`` columns. The length of dataframe will be equal to number of stations wholse coordinates are to be fetched. Examples -------- >>> dataset = USGS() >>> dataset.stn_coords() # returns coordinates of all stations >>> dataset.stn_coords('01010000') # returns coordinates of station whose id is 912101A >>> dataset.stn_coords(['01010000', '01010070']) # returns coordinates of two stations """ stations = check_attributes(stations, self.stations(), 'stations') coords = self.metadata.loc[:, ['dec_long_va', 'dec_lat_va']] coords.rename(columns={'dec_long_va': 'long', 'dec_lat_va': 'lat'}, inplace=True) return coords.loc[stations, :]
[docs] def fetch_stations_features( self, stations: list, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, st=None, en=None, as_dataframe: bool = False, **kwargs ): """ returns features of multiple stations Examples -------- >>> from water_datasets import USGS >>> dataset = USGS() >>> stations = dataset.stations()[0:3] >>> features = dataset.fetch_stations_features(stations) """ stations = check_attributes(stations, self.stations(), 'stations') #stations = [int(stn) for stn in stations] if dynamic_features is not None: dyn = self._fetch_dynamic_features(stations=stations, dynamic_features=dynamic_features, as_dataframe=as_dataframe, st=st, en=en, **kwargs ) if static_features is not None: # we want both static and dynamic to_return = {} static = self._fetch_static_features(station=stations, static_features=static_features, st=st, en=en, **kwargs ) to_return['static'] = static to_return['dynamic'] = dyn else: to_return = dyn elif static_features is not None: # we want only static to_return = self._fetch_static_features( station=stations, static_features=static_features, **kwargs ) else: raise ValueError(f"static features are {static_features} and dynamic features are {dynamic_features}") return to_return
def _fetch_dynamic_features( self, stations: list, dynamic_features = 'all', st=None, en=None, as_dataframe=False, as_ts=False ): """Fetches dynamic features of station.""" st, en = self._check_length(st, en) features = check_attributes(dynamic_features, self.dynamic_features.copy(), 'dynamic_features') daily_q = None if 'obs_q_cms' in features: # todo: we are reading all file even for one station daily_q = self._q(as_dataframe) if isinstance(daily_q, xr.Dataset): daily_q = daily_q.sel(time=slice(st, en))[stations] else: daily_q = daily_q.loc[st:en, stations] features.remove('obs_q_cms') if len(features) == 0: return daily_q stations_ = [int(self.hysets.OfficialID_WatershedID_map[stn]) for stn in stations] data = self.hysets._fetch_dynamic_features(stations_, features, st, en, as_dataframe) if daily_q is not None: if isinstance(daily_q, xr.Dataset): assert isinstance(data, xr.Dataset), "xarray dataset not supported" data = data.rename({int(k)-1:v for k,v in self.hysets.WatershedID_OfficialID_map.items() if v in stations}) # first create a new dimension in daily_q named dynamic_features daily_q = daily_q.expand_dims({'dynamic_features': ['obs_q_cms']}) data = xr.concat([data, daily_q], dim='dynamic_features') else: # -1 because the data in .nc files hysets starts with 0 data.rename(columns={int(k)-1:v for k,v in self.hysets.WatershedID_OfficialID_map.items()}, inplace=True) assert isinstance(data.index, pd.MultiIndex) # data is multiindex dataframe but daily_q is not # first make daily_q multiindex daily_q['dynamic_features'] = 'daily_q' daily_q.set_index('dynamic_features', append=True, inplace=True) daily_q = daily_q.reorder_levels(['time', 'dynamic_features']) data = pd.concat([data, daily_q], axis=0).sort_index() #data = data.join(daily_q, how='outer') return data def _fetch_static_features( self, station="all", static_features: Union[str, list] = 'all', st=None, en=None, as_ts=False )->pd.DataFrame: """Fetches static features of station.""" if self.verbosity>1: print('fetching static features') stations = check_attributes(station, self.stations(), 'stations') map_ = self.hysets.OfficialID_WatershedID_map stations = [int(map_[stn]) for stn in stations] static_feats = self.hysets.fetch_static_features(stations, static_features, st, en, as_ts).copy() static_feats = static_feats.set_index('Official_ID') return static_feats def __static_features(self)->List[str]: if self.verbosity>1: print('getting static features from hysets') static_feats = self.hysets.static_features static_feats.remove('Official_ID') return static_feats def _q(self, as_dataframe:bool=None, read_csv_kwargs:dict=None): if netCDF4 is None: ext = 'csv' else: ext = 'nc' if self.timestep.lower().startswith('d'): fname = f'daily_q.{ext}' else: fname = f'hourly_q.{ext}' if self.verbosity>1: print(f'reading {fname}') fpath = os.path.join(self.path, fname) if not os.path.exists(fpath): print(f"{fpath} not found. Downloading data storing it in {fpath}") self._make_csv(cpus=None) if ext == 'csv': return pd.read_csv(fpath, index_col=0, **read_csv_kwargs) else: if as_dataframe: return xr.open_dataset(fpath).to_dataframe() return xr.open_dataset(fpath) def __stations(self)->List[str]: if self.verbosity>1: print('getting stations') dataset = self._q(read_csv_kwargs=dict(nrows=2)) if isinstance(dataset, xr.Dataset): # get names of all variables return list(dataset.data_vars.keys()) else: return dataset.columns.tolist() def _make_csv( self, cpus:int = None, ): df = pd.read_csv(os.path.join(self.hysets_path, "HYSETS_watershed_properties.txt"), sep=";") sites = df.loc[df['Source']=='USGS']['Official_ID'] if cpus is None: cpus = get_cpus() - 2 if not os.path.exists(self.path): os.makedirs(self.path) make_daily_q(self.path, sites, cpus) print(f"Downloaded daily data and stored in {self.path}/daily_q.nc") #make_hourly_q(self.path, sites[9000:10000], cpus=cpus) #print(f"Downloaded hourly data and stored in {self.path}/hourly_q.nc") return
def download_metadata( site:str )->pd.DataFrame: try: metadata = _download_metadata(site) except (ValueError, IndexError): print(f"Site: {site} ValueError/IndexError") # create dataframe with nan values metadata = pd.DataFrame( np.array([np.nan, np.nan, np.nan]).reshape(1,3), columns=['dec_lat_va', 'dec_long_va', 'drain_area_va'] ) metadata = metadata[['dec_lat_va', 'dec_long_va', 'drain_area_va']] metadata.index = [site] return metadata def maybe_make_and_get_metadata( path:str, sites:List[str], cpus:int=None )->pd.DataFrame: cpus = get_cpus()-2 if cpus is None else cpus fpath = os.path.join(path, 'metadata.csv') if os.path.exists(fpath): return pd.read_csv( fpath, index_col='site_no', dtype={'site_no': str, "dec_lat_va": float, "dec_long_va": float, "drain_area_va": float}) print(f"Downloading metadata for {len(sites)} sites using {cpus} cpus") start = time.time() with ProcessPoolExecutor(max_workers=cpus) as executor: data = executor.map(download_metadata, sites) total = round((time.time() - start)/60, 2) print(f"Time taken to download metadata: {total} mins with {cpus} cpus") start = time.time() metadata = pd.concat(list(data)) # convert drain_area_va from sq miles to sq km metadata['drain_area_va'] = metadata['drain_area_va'] * 2.58999 metadata.index.name = "site_no" metadata.to_csv(fpath, index_label='site_no') return metadata def make_daily_q( path:str, sites:List[str], cpus:int, verbosity:int=1 ): if verbosity: print(f"Downloading daily data for {len(sites)} sites using {cpus} cpus") daily_files_path = os.path.join(path, 'daily_files') if not os.path.exists(daily_files_path): os.makedirs(daily_files_path) start = time.time() with ProcessPoolExecutor(max_workers=cpus) as executor: data = executor.map(download_daily_record, sites) total = round((time.time() - start)/60, 2) if verbosity: print(f"Time taken to download data: {total} mins with {cpus} cpus") start = time.time() if netCDF4 is None: save_daily_q_as_csv(path, data) else: save_daily_q_as_nc(path, data) total = round((time.time() - start)/60, 2) print(f"Time taken: to store {total} mins") return def save_daily_q_as_csv(path, data): df = pd.DataFrame(data) df.to_csv(os.path.join(path, 'daily_q.csv'), index=True) return def save_daily_q_as_nc(path, data): from netCDF4 import Dataset, date2num ncfile = Dataset(os.path.join(path, 'daily_q.nc'), mode='w', format='NETCDF4') _ = ncfile.createDimension('time', None) # unlimited dimension new_idx = pd.date_range(start=DAILY_START, end=DAILY_END, freq='D') time_var = ncfile.createVariable('time', 'f8', ('time',)) time_var.units = 'days since 1820-01-01 00:00:00' time_var.calendar = 'gregorian' time_var[:] = date2num(new_idx.to_pydatetime(), units=time_var.units, calendar=time_var.calendar) for idx, ts in enumerate(data): # create a variable for each column col_var = ncfile.createVariable(str(ts.name), datatype='f4', dimensions=('time',), complevel=4, compression='zlib', shuffle=True, least_significant_digit=4, ) # ts may have different index than new_idx/tim_demension, so we need to reindex new_ts = ts.reindex(index=new_idx) col_var[:] = new_ts.values col_var.units = "cms" col_var.description = "daily discharge" if idx % 100 == 0: print(f"Saved data for {idx} sites in .nc file") ncfile.close() return def _read_json(json): """ Following code is taken and modified after dataretrieval.utils Reads a NWIS Water Services formatted JSON into a ``pandas.DataFrame``. Parameters ---------- json: dict A JSON dictionary response to be parsed into a ``pandas.DataFrame`` Returns ------- df: ``pandas.DataFrame`` Times series data from the NWIS JSON md: :obj:`dataretrieval.utils.Metadata` A custom metadata object """ merged_df = pd.DataFrame(columns=["site_no", "datetime"]) site_list = [ ts["sourceInfo"]["siteCode"][0]["value"] for ts in json["value"]["timeSeries"] ] # create a list of indexes for each change in site no # for example, [0, 21, 22] would be the first and last indeces index_list = [0] index_list.extend( [i + 1 for i, (a, b) in enumerate(zip(site_list[:-1], site_list[1:])) if a != b] ) index_list.append(len(site_list)) for i in range(len(index_list) - 1): start = index_list[i] # [0] end = index_list[i + 1] # [21] # grab a block containing timeseries 0:21, # which are all from the same site site_block = json["value"]["timeSeries"][start:end] if not site_block: continue site_no = site_block[0]["sourceInfo"]["siteCode"][0]["value"] site_df = pd.DataFrame(columns=["datetime"]) for timeseries in site_block: param_cd = timeseries["variable"]["variableCode"][0]["value"] # check whether min, max, mean record XXX option = timeseries["variable"]["options"]["option"][0].get("value") # loop through each parameter in timeseries, then concat to the merged_df for parameter in timeseries["values"]: col_name = param_cd method = parameter["method"][0]["methodDescription"] # if len(timeseries['values']) > 1 and method: if method: # get method, format it, and append to column name method = method.strip("[]()").lower() col_name = f"{col_name}_{method}" if option: col_name = f"{col_name}_{option}" record_json = parameter["value"] if not record_json: # no data in record continue # should be able to avoid this by dumping record_json = str(record_json).replace("'", '"') # read json, converting all values to float64 and all qualifiers # Lists can't be hashed, thus we cannot df.merge on a list column record_df = pd.read_json( StringIO(record_json), orient="records", dtype={"value": "float64", "qualifiers": "unicode"}, convert_dates=False, ) record_df["qualifiers"] = ( record_df["qualifiers"].str.strip("[]").str.replace("'", "") ) record_df.rename( columns={ "value": col_name, "dateTime": "datetime", "qualifiers": col_name + "_cd", }, inplace=True, ) site_df = site_df.merge(record_df, how="outer", on="datetime") # end of site loop site_df["site_no"] = site_no merged_df = pd.concat([merged_df, site_df]) # convert to datetime, normalizing the timezone to UTC when doing so if "datetime" in merged_df.columns: merged_df["datetime"] = pd.to_datetime(merged_df["datetime"], utc=True) return merged_df def download_daily_q_nwis( site:str = '14105700', start = '1820-01-01', end='2024-05-30' )->pd.DataFrame: response = requests.get( "https://waterservices.usgs.gov/nwis/dv", params={'format': 'json', 'parameterCd': '00060', 'sites': site, 'startDT': start, 'endDT': end, 'multi_index': None}, headers={"user-agent": f"python-dataretrieval/1.0.11"}, verify=True) if response.status_code in [400, 404, 414]: raise ValueError(f"Bad Request, check that your parameters are correct. URL: {response.url}") try: site_data = _read_json(response.json()) except JSONDecodeError: site_data = pd.DataFrame() print(f"Site: {site} JSONDecodeError") if 'datetime' in site_data.columns: site_data.index = pd.to_datetime(site_data.pop('datetime')) return site_data def download_daily_record( site:str, )->pd.Series: # todo: should we store the data in csv files so that we don't have to download it again? # but using 110 cores for all 12004 sites takes around 5 minutes which is not that bad # given that it has to happen only once! #fpath = os.path.join(path, f"{site}.csv") # if os.path.exists(fpath): # return pd.read_csv(fpath, index_col=0) site_data = download_daily_q_nwis(site, start="1820-01-01", # DAILY_START end="2024-05-30", # DAILY_END ) if f'00060_Mean' in site_data.columns: # get data for stations which have A in 00060_Mean_cd column site_data = site_data[site_data['00060_Mean_cd'].isin(["A", "A, e"])] site_data = site_data['00060_Mean'] elif '00060_upstream site_Mean' in site_data.columns: site_data = site_data[site_data['00060_upstream site_Mean_cd'].isin(["A", "A, e"])] site_data = site_data['00060_upstream site_Mean'] elif "00060_upstream of dam_Mean" in site_data.columns: # "01399100" has this column site_data = site_data[site_data['00060_upstream of dam_Mean_cd'].isin(["A", "A, e"])] site_data = site_data["00060_upstream of dam_Mean"] elif "00060_2_Mean" in site_data.columns: # "01401000" has this column site_data = site_data[site_data['00060_2_Mean_cd'].isin(["A", "A, e"])] site_data = site_data["00060_2_Mean"] elif "00060_published_Mean" in site_data.columns: # "02129590" has this column site_data = site_data[site_data['00060_published_Mean_cd'].isin(["A", "A, e"])] site_data = site_data["00060_published_Mean"] elif len(site_data) == 0: # return empty series site_data = pd.Series(name=f"{site}__empty", index = pd.date_range(start="2024-01-01", end="2024-05-30", freq='D') ) else: #print(f"Site: {site} has {site_data.columns}") site_data = pd.Series(name=f"{site}__empty", index = pd.date_range(start="2024-01-01", end="2024-05-30", freq='D') ) site_data.name = site site_data.index = site_data.index.tz_localize(None) return site_data * 0.028316847 # convert cfs to cms def format_response( df: pd.DataFrame, **kwargs ) -> pd.DataFrame: """Setup index for response from query. This function formats the response from the NWIS web services, in particular it sets the index of the data frame. This function tries to convert the NWIS response into pandas datetime values localized to UTC, and if possible, uses these timestamps to define the data frame index. Parameters ---------- df: ``pandas.DataFrame`` The data frame to format service: string, optional, default is None The NWIS service that was queried, important because the 'peaks' service returns a different format than the other services. **kwargs: optional Additional keyword arguments, e.g., 'multi_index' Returns ------- df: ``pandas.DataFrame`` The formatted data frame """ mi = kwargs.pop("multi_index", True) # check for multiple sites: if "datetime" not in df.columns: # XXX: consider making site_no index return df elif len(df["site_no"].unique()) > 1 and mi: # setup multi-index df.set_index(["site_no", "datetime"], inplace=True) if hasattr(df.index.levels[1], "tzinfo") and df.index.levels[1].tzinfo is None: df = df.tz_localize("UTC", level=1) else: df.set_index(["datetime"], inplace=True) if hasattr(df.index, "tzinfo") and df.index.tzinfo is None: df = df.tz_localize("UTC") return df.sort_index() def _read_rdb(rdb): """ Convert NWIS rdb table into a ``pandas.dataframe``. Parameters ---------- rdb: string A string representation of an rdb table Returns ------- df: ``pandas.dataframe`` A formatted pandas data frame """ count = 0 for line in rdb.splitlines(): # ignore comment lines if line.startswith("#"): count = count + 1 else: break fields = re.split("[\t]", rdb.splitlines()[count]) fields = [field.replace(",", "") for field in fields] dtypes = { "site_no": str, "dec_long_va": float, "dec_lat_va": float, "parm_cd": str, "parameter_cd": str, } df = pd.read_csv( StringIO(rdb), delimiter="\t", skiprows=count + 2, names=fields, na_values="NaN", dtype=dtypes, ) df = format_response(df) return df def _download_metadata(site:str)->pd.DataFrame: response = requests.get( 'https://waterservices.usgs.gov/nwis/site', params={'sites': site, 'parameterCd': '00060', 'siteOutput': 'Expanded', 'format': 'rdb'}, headers={'user-agent': 'python-dataretrieval/1.0.11'}, verify=True ) return _read_rdb(response.text)