Source code for aqua_fetch.wq._swatch


__all__ = ["SWatCh"]

import os
from typing import Union

import numpy as np
import pandas as pd

from aqua_fetch._datasets import Datasets


class SWatCh(Datasets):
    """
    The Surface Water Chemistry (SWatCh) database of 27 variables from 26322
    locations as introduced in
    `Lobke et al., 2022 <https://doi.org/10.5194/essd-14-4667-2022>`_ .
    It should be noted not all the variables are available for all the
    locations. Following are the variables available in the dataset:

        - Total Phosphorus, mixed forms
        - Sulfate
        - pH
        - Temperature, water
        - Chloride
        - Magnesium
        - Calcium
        - Sodium
        - Potassium
        - Aluminum
        - Nitrate
        - Nitrite
        - Fluoride
        - Hardness, carbonate
        - Iron
        - Ammonium
        - Organic carbon
        - Bicarbonate
        - Orthophosphate
        - Gran acid neutralizing capacity
        - Alkalinity, total
        - Inorganic carbon
        - Carbonate
        - Alkalinity, carbonate
        - Hardness, non-carbonate
        - Carbon Dioxide, free CO2
        - Alkalinity, Phenolphthalein (total hydroxide+1/2 carbonate)

    Examples
    --------
    >>> from aqua_fetch.wq._swatch import SWatCh
    >>> ds = SWatCh()
    >>> df = ds.fetch()
    >>> df.shape
    (3901296, 6)
    >>> len(ds.parameters)
    22
    >>> len(ds.sites)
    26322
    >>> coords = ds.stn_coords()
    >>> coords.shape
    (26322, 2)
    """
    url = "https://zenodo.org/record/6484939"

    def __init__(self, remove_csv_after_download=False, path=None, **kwargs):
        """
        parameters
        ----------
        remove_csv_after_download : bool (default=False)
            if True, the csv will be removed after downloading and processing.
        path : str (default=None)
            forwarded to the parent class and then stored as ``self.path``,
            the folder in which the csv and the ``.npy`` files are looked up.
        **kwargs
            forwarded unchanged to the parent class.
        """
        super().__init__(path=path, **kwargs)
        self.path = path

        self._download(tolerate_error=True)
        self._maybe_to_binary()

        if remove_csv_after_download:
            if os.path.exists(self.csv_name):
                os.remove(self.csv_name)

    @property
    def parameters(self)->list:
        """list of water quality parameters available"""
        return list(self.names.values())

    @property
    def sites(self)->list:
        """sorted list of unique site ids (read from ``loc_id.npy``)"""
        all_sites = np.load(os.path.join(self.path, 'loc_id.npy'), allow_pickle=True)
        # numpy's unique is much slower
        return list(np.sort(pd.unique(all_sites)))

    @property
    def site_names(self)->list:
        """sorted list of unique site names (read from ``location.npy``)"""
        all_sites = np.load(os.path.join(self.path, 'location.npy'), allow_pickle=True)
        # numpy's unique is much slower
        return list(np.sort(pd.unique(all_sites)))

    @property
    def csv_name(self)->str:
        """full path of the raw csv file inside ``self.path``"""
        return os.path.join(self.path, "SWatCh_v2.csv")

    @property
    def npy_files(self)->list:
        """names of all ``.npy`` files found under ``self.path`` (recursive)"""
        return [fname for f in os.walk(self.path) for fname in f[2] if fname.endswith('.npy')]

    def read_csv(self)->pd.DataFrame:
        """
        Reads the raw csv file into a dataframe which is indexed by the
        combined activity start date and time and whose columns are renamed
        according to :py:attr:`names`.

        Returns
        -------
        pd.DataFrame
        """
        # ``dict.update`` returns None, therefore the mapping has to be
        # completed *before* it is handed over to pandas, otherwise
        # ``dtype=None`` is passed and none of these dtypes are applied.
        dtypes = {col: str for col in CATS}
        dtypes.update(
            {
                # 'ActivityStartDate': np.float32,
                "ResultAnalyticalMethodID": str,
                "ActivityDepthHeightMeasure": np.float32,
                'ResultValue': np.float32,
                'ResultDetectionQuantitationLimitMeasure': np.float32,
                'MonitoringLocationLatitude': np.float32,
                'MonitoringLocationLongitude': np.float32,
            }
        )

        df = pd.read_csv(self.csv_name, dtype=dtypes)

        h = {col: "category" for col in CATS}

        # date and time are separate text columns in the csv; both are
        # removed from the frame and merged into a single datetime index
        dates = pd.to_datetime(df.pop("ActivityStartDate") + " " + df.pop("ActivityStartTime"))
        df.index = dates

        maybe_reduce_memory(df, hints=h)

        strings = ["ResultComment", "ResultAnalyticalMethodID",
                   "MonitoringLocationID", "MonitoringLocationName"]
        for col in strings:
            df[col] = df[col].astype(str)

        df.rename(columns=self.names, inplace=True)

        return df

    def stn_coords(self):
        """
        Returns the coordinates of all the stations in the dataset

        Returns
        -------
        pd.DataFrame
            A dataframe with columns 'lat', 'long' which is indexed by the
            station name ('location')
        """
        df = self.fetch(parameters=['lat', 'long', 'location'])
        # one row per station name is enough to get its coordinates
        return df.drop_duplicates(
            subset=['location'])[['lat', 'long', 'location']].set_index('location').astype(np.float32)

    def _maybe_to_binary(self):
        """reads the csv file and saves each columns in binary format using numpy.
        The csv file is 1.5 GB which takes lot of time for loading most the columns
        are not required most of the times.
        """
        # 28 ``.npy`` files are taken as the sign that the conversion has
        # already been done (presumably one per column plus the index
        # -- TODO confirm)
        if len(self.npy_files) == 28:
            return

        df = self.read_csv()

        for col in df.columns:
            np.save(os.path.join(self.path, col), df[col].values)

        np.save(os.path.join(self.path, "index"), df.index)
        return

    def _load_as_df(self, parameters)->pd.DataFrame:
        """
        Loads the ``.npy`` file of every entry in ``parameters`` together with
        ``index.npy`` and returns them as a dataframe with a datetime index.
        """
        paras = [
            np.load(os.path.join(self.path, f"{para}.npy"), allow_pickle=True)
            for para in parameters
        ]

        index = np.load(os.path.join(self.path, "index.npy"), allow_pickle=True)

        return pd.DataFrame(np.column_stack(paras),
                            columns=parameters,
                            index=pd.to_datetime(index))

    def fetch(
            self,
            parameters: Union[list, str] = None,
            station_id: Union[list, str] = None,
            station_names: Union[list, str] = None,
    )->pd.DataFrame:
        """
        parameters
        ----------
        parameters : str/list (default=None)
            Names of parameters to fetch. By default, ``name``, ``value``,
            ``val_unit``, ``location``, ``lat``, and ``long`` are read. If
            ``station_id`` is given, ``loc_id`` is read instead of ``location``.
        station_id : str/list (default=None)
            id/ids of the stations for which the data is to be fetched.
            By default, the data for all stations is fetched. If given, then
            ``station_names`` should not be given.
        station_names : str/list (default=None)
            name/names of the stations for which the data is to be fetched.
            By default, the data for all stations is fetched. If given, then
            ``station_id`` should not be given.

        Returns
        -------
        pd.DataFrame
            one column per requested parameter, indexed by the sampling time.

        Raises
        ------
        ValueError
            if both ``station_id`` and ``station_names`` are given.

        Examples
        --------
        >>> from aqua_fetch.wq._swatch import SWatCh
        >>> ds = SWatCh()
        >>> df = ds.fetch()
        >>> df.shape
        (3901296, 6)
        >>> st_name = "Jordan Lake"
        >>> df = df[df['location'] == st_name]
        >>> df.shape
        (4, 6)
        """
        def_paras = ["name", "value", "val_unit", "lat", "long"]

        if station_id is not None and station_names is not None:
            raise ValueError("Either station_id or station_names should be given. Not both.")

        if station_id is not None:
            loc, wanted = "loc_id", station_id
        else:
            loc, wanted = "location", station_names

        def_paras.append(loc)

        if parameters is None:
            parameters = def_paras

        if isinstance(parameters, str):
            parameters = [parameters]

        assert isinstance(parameters, list)

        df = self._load_as_df(parameters)

        if wanted is not None:
            if isinstance(wanted, str):
                wanted = [wanted]
            # The station column is read from disk on its own so that the
            # selection also works when the caller did not ask for it in
            # ``parameters``; all .npy files are written from the same
            # dataframe and therefore have the same length.
            keys = np.load(os.path.join(self.path, f"{loc}.npy"), allow_pickle=True)
            mask = pd.Series(keys).isin(list(wanted)).to_numpy()
            df = df[mask]

        return df

    def num_samples(
            self,
            parameter,
            station_id = None,
    )->int:
        """
        Not implemented yet, always raises ``NotImplementedError``.

        parameters
        ----------
        parameter : str
            name of the water quality parameter whose samples are to be
            quantified.
        station_id :
            if given, samples of parameter will be returned for only this
            site/sites otherwise for all sites
        """
        raise NotImplementedError

    @property
    def names(self)->dict:
        """tells the names of parameters in this class and their original
        names in SWatCh dataset in the form of a python dictionary; keys are
        the original column names and values are the names used in this class.
        """
        return {
            "LaboratoryName": "lab_name",
            'ActivityDepthHeightMeasure': "depth_height",
            'ActivityDepthHeightUnit': "depth_height_unit",
            "ActivityMediaName": "act_name",
            "ActivityType": "ActivityType",
            "MonitoringLocationHorizontalCoordinateReferenceSystem": "coord_system",
            'MonitoringLocationLongitude': "long",
            'MonitoringLocationLatitude': "lat",
            "CharacteristicName": "name",
            "ResultValue": "value",
            "ResultValueType": "val_type",
            "MonitoringLocationName": "location",
            "MonitoringLocationID": "loc_id",
            "MonitoringLocationType": "loc_type",
            "ResultDetectionQuantitationLimitType": "detect_limit",
            "ResultDetectionQuantitationLimitUnit": "detect_limit_type",
            "ResultDetectionQuantitationLimitMeasure": "detect_limit_measure",
            "ResultDetectionCondition": "detect_cond",
            "ResultAnalyticalMethodName": "method_name",
            "ResultAnalyticalMethodContext": "method_context",
            "ResultAnalyticalMethodID": "method_id",
            "ResultUnit": "val_unit",
        }

# Columns of the raw csv which are read as text and for which a
# "category" hint is passed to ``maybe_reduce_memory``.
CATS = ['ActivityDepthHeightUnit',
        'ActivityMediaName',
        'ActivityType',
        'CharacteristicName',
        'DatasetName',
        'LaboratoryName',
        'MethodSpeciation',
        'MonitoringLocationHorizontalCoordinateReferenceSystem',
        'MonitoringLocationType',
        'ResultAnalyticalMethodContext',
        'ResultDetectionCondition',
        'ResultDetectionQuantitationLimitType',
        'ResultDetectionQuantitationLimitUnit',
        'ResultSampleFraction',
        'ResultStatusID',
        'ResultUnit',
        'ResultValueType'
        ]


def _within(array, limits)->bool:
    """True if min and max of ``array`` lie inside ``limits`` (an
    ``np.iinfo``/``np.finfo`` object). The bounds are inclusive because a
    value equal to the limit is still representable in that dtype."""
    return array.min() >= limits.min and array.max() <= limits.max


def int8(array:Union[np.ndarray, pd.Series])->bool:
    """tells whether the range of ``array`` fits into ``np.int8``"""
    return _within(array, np.iinfo(np.int8))


def int16(array:Union[np.ndarray, pd.Series])->bool:
    """tells whether the range of ``array`` fits into ``np.int16``"""
    return _within(array, np.iinfo(np.int16))


def int32(array:Union[np.ndarray, pd.Series])->bool:
    """tells whether the range of ``array`` fits into ``np.int32``"""
    return _within(array, np.iinfo(np.int32))


def int64(array:Union[np.ndarray, pd.Series])->bool:
    """tells whether the range of ``array`` fits into ``np.int64``"""
    return _within(array, np.iinfo(np.int64))


def float16(array:Union[np.ndarray, pd.Series])->bool:
    """tells whether the range of ``array`` fits into ``np.float16``"""
    return _within(array, np.finfo(np.float16))


def float32(array:Union[np.ndarray, pd.Series])->bool:
    """tells whether the range of ``array`` fits into ``np.float32``"""
    return _within(array, np.finfo(np.float32))


def maybe_convert_int(series:pd.Series)->pd.Series:
    """casts ``series`` to the narrowest signed integer type which covers
    its range; the series is returned unchanged if none of them does."""
    if int8(series):
        return series.astype(np.int8)
    if int16(series):
        return series.astype(np.int16)
    if int32(series):
        return series.astype(np.int32)
    if int64(series):
        return series.astype(np.int64)
    return series


def maybe_convert_float(series:pd.Series)->pd.Series:
    """casts ``series`` to ``float16`` or ``float32`` if its range fits,
    otherwise the series is returned unchanged."""
    # NOTE(review): only the range is checked here, not the precision, so
    # the cast can round the values -- confirm this is acceptable.
    if float16(series):
        return series.astype(np.float16)
    if float32(series):
        return series.astype(np.float32)
    return series


def memory_usage(dataframe):
    """memory taken by ``dataframe`` in MB, rounded to 4 decimal places"""
    return round(dataframe.memory_usage().sum() / 1024**2, 4)


def maybe_reduce_memory(dataframe:pd.DataFrame, hints=None)->pd.DataFrame:
    """
    Tries to shrink the columns of ``dataframe`` to smaller dtypes. The
    dataframe is modified in place and also returned. The memory usage
    before and after is printed.

    parameters
    ----------
    dataframe : pd.DataFrame
    hints : dict (default=None)
        column name -> dtype name ('int...', 'float...' or 'cat...'). A hint
        takes precedence over the current dtype of the column. Columns
        without a hint are treated according to their current dtype.

    Returns
    -------
    pd.DataFrame
        the same object as ``dataframe``
    """
    init_memory = memory_usage(dataframe)

    # every column starts with its own dtype as hint, user hints override it
    _hints = {col: dataframe[col].dtype.name for col in dataframe.columns}
    if hints:
        _hints.update(hints)

    for col in dataframe.columns:
        col_dtype = dataframe[col].dtype.name
        hint = _hints[col]

        if 'int' in hint:
            dataframe[col] = maybe_convert_int(dataframe[col])
        elif 'float' in hint:
            dataframe[col] = maybe_convert_float(dataframe[col])
        # reached when the hint names something else (e.g. 'category')
        # but the column itself is numeric
        elif 'int' in col_dtype:
            dataframe[col] = maybe_convert_int(dataframe[col])
        elif 'float' in col_dtype:
            dataframe[col] = maybe_convert_float(dataframe[col])
        elif col_dtype in ['object'] and 'cat' in hint:
            dataframe[col] = dataframe[col].astype('category')

    print(f"memory reduced from {init_memory} to {memory_usage(dataframe)}")
    return dataframe