Source code for aqua_fetch._datasets

# camels #
# https://confluence.ecmwf.int/display/COPSRV/GloFAS+Climate+Data+Store
# https://www.sciencedirect.com/journal/data-in-brief
# https://data.mendeley.com/datasets/5vzp6svhwh/4
# https://zenodo.org/record/4218413#.YA6w7BZS-Uk
# https://www.sciencedirect.com/search?qs=time%20series&pub=Data%20in%20Brief&cid=311593
# https://doi.pangaea.de/10.1594/PANGAEA.898217
# https://doi.pangaea.de/10.1594/PANGAEA.811992  # protected
# https://doi.pangaea.de/10.1594/PANGAEA.905446
# https://doi.pangaea.de/10.1594/PANGAEA.900958
# https://doi.pangaea.de/10.1594/PANGAEA.831193
# https://data.world/datagov-uk/223a8f60-e3ac-4a25-987d-587cc3a92fa1
# https://www.bafg.de/GRDC/EN/04_spcldtbss/41_ARDB/arcticHycos.html?nn=201574 flow dataset
# https://doi.pangaea.de/10.1594/PANGAEA.924561 Air temp Tiangin china


# https://zenodo.org/record/3712407#.YExYDtyRWUk
# https://zenodo.org/record/3844201#.YExYi9yRWUk
# https://zenodo.org/record/1471322#.YExYftyRWUk
# https://zenodo.org/record/3961605#.YExYcdyRWUk
# https://zenodo.org/record/1452383#.YExZRdyRWUk
# https://zenodo.org/record/4428151#.YExZPdyRWUk
# https://zenodo.org/record/3903238#.YExZItyRWUk
# https://zenodo.org/record/3670864#.YExZFdyRWUk
# https://zenodo.org/record/3834623#.YExY7tyRWUk
# https://zenodo.org/record/4029572#.YExY5NyRWUk
# https://zenodo.org/record/4552842#.YExaR9yRWUk
# https://zenodo.org/record/3466097#.YExaQ9yRWUk
# https://zenodo.org/record/4327078#.YExaCdyRWUk
# https://zenodo.org/record/3712397#.YExbsdyRWUk
# https://zenodo.org/record/3560706#.YExeztyRWUk
# https://zenodo.org/record/3698998#.YExekdyRWUk
# https://zenodo.org/record/3564237#.YExlh9yRWUl
# https://zenodo.org/record/581145#.YExeV9yRWUk
# https://zenodo.org/record/3978225#.YExeEtyRWUk
# https://zenodo.org/record/3763766#.YExdntyRWUk
# https://zenodo.org/record/3744217#.YExdi9yRWUk
# https://zenodo.org/record/3948568#.YExdeNyRWUk
# https://zenodo.org/record/3538207#.YExdbtyRWUk
# https://zenodo.org/record/1486058#.YExc-dyRWUk
# https://zenodo.org/record/3561032#.YExc7tyRWUk
# https://zenodo.org/record/1466038#.YExc3dyRWUk
# https://zenodo.org/record/581186#.YExcz9yRWUk
# https://zenodo.org/record/4572636#.YExcwNyRWUk
# https://zenodo.org/record/1267837#.YExcZNyRWUk
# https://zenodo.org/record/3808223#.YExcX9yRWUk
# https://zenodo.org/record/4447435#.YExcWtyRWUk
# https://zenodo.org/record/1300354#.YExcVdyRWUk
# https://zenodo.org/record/4308036#.YExcJdyRWUk
# https://zenodo.org/record/3459610#.YExhNNyRWUk
# https://zenodo.org/record/3763342#.YExhCdyRWUk
# https://zenodo.org/record/4559571#.YExhBNyRWUk
# https://zenodo.org/record/3663630#.YExg89yRWUk
# https://zenodo.org/record/4382937#.YExg7dyRWUk
# https://zenodo.org/record/3876148#.YExgUdyRWUk
# https://zenodo.org/record/3982962#.YExgTdyRWUk
# https://zenodo.org/record/2559480#.YExvWtyRWUk
# https://zenodo.org/record/4094684#.YExvS9yRWUk
# https://zenodo.org/record/2596929#.YExvP9yRWUk
# https://zenodo.org/record/977773#.YExvEtyRWUk
# https://zenodo.org/record/3520146#.YExu_tyRWUk
# https://zenodo.org/record/3836648#.YExu09yRWUk
# https://zenodo.org/record/4290294#.YExo5tyRWUk
# https://zenodo.org/record/2728636#.YEx4EdyRWUk
# https://zenodo.org/record/3581187#.YEx5CNyRWUk
# https://zenodo.org/record/3946242#.YEx5FtyRWUk
# https://zenodo.org/record/883100#.YEx5L9yRWUk
# https://zenodo.org/record/3239401#.YEx5gtyRWUk
# https://zenodo.org/record/4183611#.YEx5vNyRWUk
# https://zenodo.org/record/4559696#.YEx5xdyRWUk
# https://zenodo.org/record/3776011#.YEx6YdyRWUk
# https://zenodo.org/record/4315647#.YEx6v9yRWUk
# https://zenodo.org/record/1185084#.YEx77NyRWUk
# https://zenodo.org/record/4271209#.YEx7z9yRWUk
# https://zenodo.org/record/4570780#.YEx7y9yRWUk
# https://zenodo.org/record/3593395#.YEx7x9yRWUk
# https://zenodo.org/record/3632501#.YEx7qtyRWUk
# https://zenodo.org/record/1122635#.YEx7ndyRWUk
# https://zenodo.org/record/3893897#.YEx7gNyRWUk
# https://zenodo.org/record/4395737#.YEx7a9yRWUk
# https://zenodo.org/record/3779473#.YEx7aNyRWUk
# https://zenodo.org/record/1226394#.YEx7O9yRWUk
# https://zenodo.org/record/4391461#.YEx7MtyRWUk
# https://zenodo.org/record/4247833#.YEx7HtyRWUk
# https://zenodo.org/record/1486058#.YEx7G9yRWUk
# https://zenodo.org/record/3928587#.YEx7E9yRWUk
# https://zenodo.org/record/4341521#.YEx7DdyRWUk
# https://zenodo.org/record/3974871#.YEx7CdyRWUk
# https://zenodo.org/record/1298526#.YEx7B9yRWUk
# https://zenodo.org/record/57293#.YEx6_dyRWUk
# https://zenodo.org/record/4268711#.YEx6-9yRWUk
# https://zenodo.org/record/322827#.YEx69tyRWUk
# https://zenodo.org/record/1050301#.YEx6y9yRWUk
# https://zenodo.org/record/4734372#.YKc9QKGRWUk
# https://www.nature.com/articles/s41597-019-0288-y#Abs1
# https://catalogue-imos.aodn.org.au/geonetwork/srv/api/records/9e2ba32a-5da3-4ea5-b750-e6279680dd71
#https://daac.ornl.gov/cgi-bin/dsviewer.pl?ds_id=1566
# https://doi.pangaea.de/10.1594/PANGAEA.885860
# https://doi.pangaea.de/10.1594/PANGAEA.913939
# https://www.nature.com/articles/s41597-019-0346-5#Sec8


# ETP
# https://zenodo.org/record/4038399#.YEx6INyRWUk
# https://zenodo.org/record/4601596#.YEx6M9yRWUk
# https://zenodo.org/record/3981919#.YEx6ONyRWUk
# https://zenodo.org/record/4271331#.YEx6PdyRWUk
# https://zenodo.org/record/3726856#.YEx6RdyRWUk
# https://zenodo.org/record/4580292#.YEx6TtyRWUk
# https://zenodo.org/record/1044306#.YEx6UNyRWUk
# https://zenodo.org/record/3891936#.YEx7S9yRWUk
# https://zenodo.org/record/4060319#.YEx7QtyRWUk


# rr
# https://zenodo.org/record/3341592#.YEx5RtyRWUk
# https://zenodo.org/record/3931582#.YEx5W9yRWUk
# https://zenodo.org/record/3528098#.YEx64NyRWUk
# https://hess.copernicus.org/articles/25/3105/2021/
# https://www.nature.com/articles/s41597-019-0282-4#Sec12
# https://www.nature.com/articles/sdata201880#Tab3
# https://edg.epa.gov/metadata/catalog/search/resource/details.page?uuid=https://doi.org/10.23719/1378947

# air
# https://zenodo.org/record/4311854#.YExpwNyRWUk
# https://zenodo.org/record/4281271#.YExpYNyRWUk


# ocean
# https://zenodo.org/record/4600696#.YExpSdyRWUk


# Water Quality
# https://zenodo.org/record/1495558#.YExqFtyRWUk

# Flow
# https://zenodo.org/record/3941890#.YExp5NyRWUk
# https://zenodo.org/record/1206188#.YExn-dyRWUk
# https://zenodo.org/record/4394503#.YEx6ndyRWUk
# https://zenodo.org/record/3240954#.YEx6s9yRWUk


# Groundwater
# https://zenodo.org/record/3887120#.YExq1tyRWUk
# https://zenodo.org/record/3928587#.YExnztyRWUk
# https://zenodo.org/record/1158631#.YEx7ZdyRWUk
# https://zenodo.org/record/4139912#.YEx7XdyRWUk


# Weather
# https://zenodo.org/record/3678799#.YExsP9yRWUk
# https://zenodo.org/record/3679247#.YExsOdyRWUk
# https://zenodo.org/record/3678789#.YExsN9yRWUk
# https://zenodo.org/record/4567325#.YExqjtyRWUk
# https://zenodo.org/record/3549899#.YExqNdyRWUk
# https://zenodo.org/record/4319773#.YExoq9yRWUk
# https://zenodo.org/record/4319770#.YExooNyRWUk
# https://zenodo.org/record/4319756#.YExnl9yRWUk
# https://zenodo.org/record/854619#.YExnityRWUk
# https://essd.copernicus.org/articles/13/1289/2021/
# https://essd.copernicus.org/articles/13/1307/2021/
# https://www.tr32db.uni-koeln.de/search/view.php?dataID=1786
# https://doi.org/10.3334/ORNLDAAC/1840

#DWD
# https://opendata.dwd.de/climate_environment/CDC/observations_germany/

# geologic
# https://zenodo.org/record/4536561#.YExpQNyRWUk
# https://zenodo.org/record/2549499#.YExo09yRWUk


# 2D time series datasets
# https://zenodo.org/record/1135230#.YExYotyRWUk
# https://zenodo.org/record/2630456#.YExb4tyRWUk
# https://zenodo.org/record/4559368#.YExd1NyRWUk
# https://zenodo.org/record/4542076#.YExuxtyRWUk
# https://zenodo.org/record/4489056#.YExoBtyRWUk
# https://zenodo.org/record/1157344#.YExnqNyRWUk
# https://www.nature.com/articles/s41597-020-0450-6
# https://www.nature.com/articles/sdata201542#Abs1
# https://www.nature.com/articles/s41597-019-0228-x
# https://zenodo.org/record/4058167
# https://www.nature.com/articles/sdata2018224#Sec10


# soil
# https://www.tr32db.uni-koeln.de/search/view.php?dataID=1839
# https://www.tr32db.uni-koeln.de/search/view.php?dataID=1838
# https://www.tr32db.uni-koeln.de/search/view.php?dataID=1837
# https://www.tr32db.uni-koeln.de/search/view.php?dataID=1760
# https://www.tr32db.uni-koeln.de/search/view.php?dataID=1761

import os
import glob
import random
import warnings
from typing import Union

import numpy as np
import pandas as pd

from ._backend import netCDF4, xarray as xr

from .download_pangaea import PanDataSet
from .utils import maybe_download, download_and_unzip, unzip_all_in_dir, download
from .utils import check_attributes, check_st_en


SEP = os.sep
# TODO, add visualization

# TODO all available datasets should be available using a single interface instead of importing each separately
DATASETS = [
    'ISWDC',
    'SEBAL_ET_CHINA',
    'GeoChemMatane',
    'PrecipBerlin',
    'HydroChemJava',
    'WaterChemVictoriaLakes',
    'WaterChemEcuador',
    'HydrocarbonsGabes',
    'SedimentAmersee',
    'FlowTetRiver',
    'HoloceneTemp',
    'RiverTempEroo',
    'StreamTempSpain',
    'FlowSedDenmark',
    'FlowSamoylov',
    'EtpPcpSamoylov',
    'RiverIsotope',
    'WQCantareira',
    'RiverTempSpain',
    'HydrometricParana',
    'FlowBenin',
    'YamaguchiClimateJp',
    'WQJordan2',
    'WQJordan',
    'Weisssee'
            ]


class Datasets(object):
    """
    This is the base class for datasets

    Note:
        We don't host datasets. Each dataset is downloaded fromt he target remote
        server and saved into local disk.
    """
    def __init__(
            self,
            name=None,
            units=None,
            path:str = None,
            verbosity:int = 1,
            processes:int = None,
            remove_zip:bool = False,
            overwrite:bool = False,
            float_precision:str = np.float32,
            integer_precision:str = np.int32
    ):
        """
        Arguments:
            name : str (default=None)
                name of dataset
            units : str, (default=None)
                the unit system being used
            path : str (default=None)
                path where the data is available (manually downloaded).
                If None, it will be downloaded
            processes : int
                number of processes to use for parallel processing
            verbosity : int
                determines the amount of information to be printed
            remove_zip : bool
                whether to remove the zip files after unz
        """
        if name is None:
            name = self.__class__.__name__

        if units is not None:
            assert units in ['si', 'imperial', 'metric']

        self.units = units
        self.name = name
        self.processes = processes
        self.verbosity = verbosity
        self.path = path
        self.remove_zip = remove_zip
        self.overwrite = overwrite
        self.fp = float_precision
        self.ip = integer_precision

    @property
    def url(self):
        raise NotImplementedError(f"url must be defined.")

    @property
    def base_ds_dir(self):
        """Base datasets directory"""
        return os.path.join(os.path.dirname(__file__), 'data')

    @property
    def path(self):
        return self._path

    @path.setter
    def path(self, x):
        if x is None:
            # path is not given dataset is not downloaded yet
            if self.__class__.__name__.startswith('CAMELS'):
                x = os.path.join(self.camels_dir, self.__class__.__name__)
            else:
                x = os.path.join(self.base_ds_dir, self.__class__.__name__)

            if not os.path.exists(x):
                os.makedirs(x)
        else:
            #assert os.path.exists(x), f"The path {x} does not exist"
            if not os.path.exists(x) and self.verbosity>0:
                print(f"The path {x} does not exist. Creating the directory.")
            x = os.path.join(x, self.__class__.__name__)
        # sanity_check(self.name, x)
        self._path = x

    def _download(self, overwrite=False, **kwargs):
        """Downloads the dataset. If already downloaded, then

        Parameters
        -----------
        overwrite : bool
        **kwargs :
            any keyword arguments for maybe_download function
        """
        maybe_download(
            self.path, 
            overwrite=overwrite,
            url=self.url, 
            name=self.name,
            verbosity=self.verbosity,
            **kwargs
            )
        return

    def _download_and_unzip(self):
        # todo : this function should not exist, we should always call _download
        download_and_unzip(self.path, self.url)
        return

    def download_from_pangaea(self, overwrite=False):

        if os.path.exists(self.path):
            if overwrite:
                print("removing previously downloaded data and downloading again")
            else:
                print(f"The path {self.path} already exists.")
                self.data_files = [f for f in os.listdir(self.path) if f.endswith('.txt')]
                self.metadata_files = [f for f in os.listdir(self.path) if f.endswith('.json')]
                if len(self.data_files) == 0:
                    print(f"The path {self.path} is empty so downloading the files again")
                    self._download_from_pangaea()
        else:
            os.makedirs(self.path)
            self._download_from_pangaea()
        return

    def _download_from_pangaea(self):
        self.data_files = []
        self.metadata_files = []
        ds = PanDataSet(self.url)
        kids = ds.children()
        if len(kids) > 1:
            for kid in kids:
                kid_ds = PanDataSet(kid)
                fname = kid_ds.download(self.path)
                self.metadata_files.append(fname + '._metadata.json')
                self.data_files.append(fname + '.txt')
        else:
            fname = ds.download(self.path)
            self.metadata_files.append(fname + '._metadata.json')
            self.data_files.append(fname + '.txt')
        return
    
    def remove_zip_files(self):
        for f in os.listdir(self.path):
            if f.endswith('.zip') or f.endswith(".gz"):
                os.remove(os.path.join(self.path, f))
        return

    def maybe_remove_zip_files(self):
        if self.remove_zip:
            self.remove_zip_files()
        return


[docs] class Weisssee(Datasets): dynamic_attributes = ['Precipitation_measurements', 'long_wave_upward_radiation', 'snow_density_at_30cm', 'long_wave_downward_radiation' ] url = '10.1594/PANGAEA.898217'
[docs] def __init__(self, path=None, overwrite=False, **kwargs): super(Weisssee, self).__init__(path=path, **kwargs) #self.path = path self.download_from_pangaea(overwrite=overwrite)
[docs] def fetch(self, **kwargs): """ Examples -------- >>> from water_quality import Weisssee >>> dataset = Weisssee() >>> data = dataset.fetch() """ data = {} for f in self.data_files: fpath = os.path.join(self.path, f) df = pd.read_csv(fpath, **kwargs) if 'index_col' in kwargs: df.index = pd.to_datetime(df.index) data[f.split('.txt')[0]] = df return data
class ETP_CHN_SEBAL(Datasets): url = "https://zenodo.org/record/4218413#.YBNhThZS-Ul" class ISWDC(Datasets): url = "https://zenodo.org/record/2616035#.YBNl5hZS-Uk" class WQJordan(Weisssee): """Jordan River water quality data of 9 variables for two variables.""" url = 'https://doi.pangaea.de/10.1594/PANGAEA.919103' class WQJordan2(Weisssee): """Stage and Turbidity data of Jordan River""" url = '10.1594/PANGAEA.919104' class YamaguchiClimateJp(Weisssee): """Daily climate and flow data of Japan from 2006 2018""" url = "https://doi.pangaea.de/10.1594/PANGAEA.909880" class FlowBenin(Weisssee): """Flow data""" url = "10.1594/PANGAEA.831196" class HydrometricParana(Weisssee): """Daily and monthly water level and flow data of Parana river Argentina from 1875 to 2017.""" url = "https://doi.pangaea.de/10.1594/PANGAEA.882613" class RiverTempSpain(Weisssee): """Daily mean stream temperatures in Central Spain for different periods.""" url = "https://doi.pangaea.de/10.1594/PANGAEA.879494" class WQCantareira(Weisssee): """Water quality and quantity primary data from field campaigns in the Cantareira Water Supply System, period Oct. 2013 - May 2014""" url = "https://doi.pangaea.de/10.1594/PANGAEA.892384" class RiverIsotope(Weisssee): """399 δ18O and δD values in river surface waters of Indian River""" url = "https://doi.pangaea.de/10.1594/PANGAEA.912582" class EtpPcpSamoylov(Weisssee): """Evpotranspiration and Precipitation at station TOWER on Samoylov Island Russia from 20110524 to 20110819 with 30 minute frequency""" url = "10.1594/PANGAEA.811076" class FlowSamoylov(Weisssee): """Net lateral flow at station INT2 on Samoylov Island Russia from 20110612 to 20110819 with 30 minute frequency""" url = "10.1594/PANGAEA.811072" class FlowSedDenmark(Weisssee): """Flow and suspended sediment concentration fields over tidal bedforms, ADCP profile""" url = "10.1594/PANGAEA.841977" class StreamTempSpain(Weisssee): """Daily Mean Stream Temperature at station Tormes3, Central Spain from 199711 to 199906.""" url = "https://doi.pangaea.de/10.1594/PANGAEA.879507" class RiverTempEroo(Weisssee): """Water temperature records in the Eroo River and some tributaries (Selenga River basin, Mongolia, 2011-2012)""" url = "10.1594/PANGAEA.890070" class HoloceneTemp(Weisssee): """Holocene temperature reconstructions for northeastern North America and the northwestern Atlantic, core Big_Round_Lake.""" url = "10.1594/PANGAEA.905446" class FlowTetRiver(Weisssee): """Daily mean river discharge at meteorological station Perpignan upstream, Têt basin France from 1980 to 2000.""" url = "10.1594/PANGAEA.226925" class SedimentAmersee(Weisssee): """Occurence of flood laminae in sediments of Ammersee""" url = "10.1594/PANGAEA.746240" class HydrocarbonsGabes(Weisssee): """Concentration and geological parameters of n-alkanes and n-alkenes in surface sediments from the Gulf of Gabes, Tunisia""" url = "10.1594/PANGAEA.774595" class WaterChemEcuador(Weisssee): """weekly and biweekly Water chemistry of cloud forest streams at baseflow conditions, Rio San Francisco, Ecuador""" url = "10.1594/PANGAEA.778629" class WaterChemVictoriaLakes(Weisssee): """Surface water chemistry of northern Victoria Land lakes""" url = "10.1594/PANGAEA.807883" class HydroChemJava(Weisssee): """Hydrochemical data from subsurface rivers, coastal and submarine springsin a karstic region in southern Java.""" url = "10.1594/PANGAEA.882178" class PrecipBerlin(Weisssee): """Sub-hourly Berlin Dahlem precipitation time-series 2001-2013""" url = "10.1594/PANGAEA.883587" class GeoChemMatane(Weisssee): """Geochemical data collected in shallow groundwater and river water in a subpolar environment (Matane river, QC, Canada).""" url = "10.1594/PANGAEA.908290" class HydroMeteorAndes(Datasets): """Hydrometeriological dataset of tropical Andes region""" url = ["https://springernature.figshare.com/ndownloader/files/10514506", "https://springernature.figshare.com/ndownloader/files/10514509"]
[docs] class WeatherJena(Datasets): """ 10 minute weather dataset of Jena, Germany hosted at https://www.bgc-jena.mpg.de/wetter/index.html from 2002 onwards. Examples -------- >>> from water_quality import WeatherJena >>> dataset = WeatherJena() >>> data = dataset.fetch() >>> data.sum() """ url = "https://www.bgc-jena.mpg.de/wetter/weather_data.html" STARTS = { 'roof': 2004, 'saale': 2003, 'soil': 2007 } PREFIX = { 'roof': 'mpi_roof', 'saale': 'mpi_saale', 'soil': 'MPI_Soil' }
[docs] def __init__(self, path=None, obs_loc='roof'): """ The ETP data is collected at three different locations i.e. roof, soil and saale(hall). Parameters ---------- obs_loc : str, optional (default=roof) location of observation. It can be one of following - roof - soil - saale """ if obs_loc not in ['roof', 'soil', 'saale']: raise ValueError self.obs_loc = obs_loc super().__init__(path=path) sub_dir = os.path.join(self.path, self.obs_loc) if not os.path.exists(sub_dir): os.makedirs(sub_dir) self._download(sub_dir) if xr is None: warnings.warn(""" loading data from csv files is slow. Try installing xarray and netcdf for faster loading """) #download_all_http_directory(self.url, sub_dir, match_name=self.obs_loc) unzip_all_in_dir(sub_dir, 'zip') else: nc_path = os.path.join(sub_dir, "data.nc") if not os.path.exists(nc_path): #download_all_http_directory(self.url, sub_dir, match_name=self.obs_loc) unzip_all_in_dir(sub_dir, 'zip') print("converting data to netcdf file. This will happen only once.") df = self._read_as_df() ndf = pd.DataFrame() for _col in df.columns: col = _col.replace("/", "_") ndf[col] = df[_col].copy() ndf.replace('********', np.nan, inplace=True) ndf['Rn (W_m**2)'] = ndf['Rn (W_m**2)'].astype(np.float32) ndf.to_xarray().to_netcdf(nc_path)
def _download(self, path): """downloads the dataset""" if os.path.exists(path) and len(os.listdir(path)) > 0: return for year in range(self.STARTS[self.obs_loc], 2024): for period in ['a', 'b']: url = f"https://www.bgc-jena.mpg.de/wetter/{self.PREFIX[self.obs_loc]}_{year}{period}.zip" download_and_unzip(path, url=url, name=f"mpi_{self.obs_loc}_{year}{period}.zip", verbosity=self.verbosity) return @property def dynamic_features(self)->list: """returns names of features available""" return self.fetch().columns.tolist()
[docs] def fetch( self, st: Union[str, int, pd.DatetimeIndex] = None, en: Union[str, int, pd.DatetimeIndex] = None ) -> pd.DataFrame: """ Fetches the time series data between given period as pandas dataframe. Parameters ---------- st : Optional start of data to be fetched. If None, the data from start (2003-01-01) will be retuned en : Optional end of data to be fetched. If None, the data from till (2021-12-31) end be retuned. Returns ------- pd.DataFrame a pandas dataframe of shape (972111, 21) Examples -------- >>> from water_quality import WeatherJena >>> dataset = WeatherJena() >>> data = dataset.fetch() >>> data.shape (972111, 21) ... # get data between specific period >>> data = dataset.fetch("20110101", "20201231") >>> data.shape (525622, 21) """ sub_dir = os.path.join(self.path, self.obs_loc) if xr is None: df = self._read_as_df() else: nc_path = os.path.join(sub_dir, "data.nc") df = xr.load_dataset(nc_path).to_dataframe() if 'Date Time' in df: df.index = pd.to_datetime(df.pop('Date Time')) if isinstance(st, int): if en is None: en = len(df) assert isinstance(en, int) return df.iloc[st:en] elif st is not None: return df.loc[st:en] return df
def _read_as_df(self)->pd.DataFrame: sub_dir = os.path.join(self.path, self.obs_loc) all_files = glob.glob(f"{sub_dir}/*.csv") dfs = [] for fpath in all_files: f_df = pd.read_csv(fpath, index_col='Date Time', encoding='unicode_escape', na_values=-9999) f_df.index = pd.to_datetime(f_df.index, format='%d.%m.%Y %H:%M:%S') dfs.append(f_df) df = pd.concat(dfs) return df.sort_index()
[docs] class SWECanada(Datasets): """ Daily Canadian historical Snow Water Equivalent dataset from 1928 to 2020 from Brown_ et al., 2019 . Examples -------- >>> from water_quality import SWECanada >>> swe = SWECanada() ... # get names of all available stations >>> stns = swe.stations() >>> len(stns) 2607 ... # get data of one station >>> df1 = swe.fetch('SCD-NS010') >>> df1['SCD-NS010'].shape (33816, 3) ... # get data of 10 stations >>> df5 = swe.fetch(5, st='20110101') >>> df5.keys() ['YT-10AA-SC01', 'ALE-05CA805', 'SCD-NF078', 'SCD-NF086', 'INA-07RA01B'] >>> [v.shape for v in df5.values()] [(3500, 3), (3500, 3), (3500, 3), (3500, 3), (3500, 3)] ... # get data of 0.1% of stations >>> df2 = swe.fetch(0.001, st='20110101') ... # get data of one stations starting from 2011 >>> df3 = swe.fetch('ALE-05AE810', st='20110101') >>> df3.keys() >>> ['ALE-05AE810'] >>> df4 = swe.fetch(stns[0:10], st='20110101') .. _Brown: https://doi.org/10.1080/07055900.2019.1598843 """ url = "https://zenodo.org/records/10835278" features = ['snw', 'snd', 'den'] q_flags = ['data_flag_snw', 'data_flag_snd', 'qc_flag_snw', 'qc_flag_snd']
[docs] def __init__(self, path=None, **kwargs): super().__init__(path=path, **kwargs) #self.path = path self._download()
def stations(self) -> list: nc = netCDF4.Dataset(os.path.join(self.path, 'CanSWE-CanEEN_1928-2023_v6.nc')) s = nc['station_id'][:] return s.tolist() @property def start(self): return '19280101' @property def end(self): return '20230731'
[docs] def fetch( self, station_id: Union[None, str, float, int, list] = None, features: Union[None, str, list] = None, q_flags: Union[None, str, list] = None, st=None, en=None ) -> dict: """ Fetches time series data from selected stations. Parameters ---------- station_id : station/stations to be retrieved. In None, then data from all stations will be returned. features : Names of features to be retrieved. Following features are allowed: - ``snw`` snow water equivalent kg/m3 - ``snd`` snow depth m - ``den`` snowpack bulk density kg/m3 If None, then all three features will be retrieved. q_flags : If None, then no qflags will be returned. Following q_flag values are available. - ``data_flag_snw`` - ``data_flag_snd`` - ``qc_flag_snw`` - ``qc_flag_snd`` st : start of data to be retrieved en : end of data to be retrived. Returns ------- dict a dictionary of dataframes of shape (st:en, features + q_flags) whose length is equal to length of stations being considered. """ # todo, q_flags not working if station_id is None: station_id = self.stations() elif isinstance(station_id, str): station_id = [station_id] elif isinstance(station_id, list): pass elif isinstance(station_id, int): station_id = random.sample(self.stations(), station_id) elif isinstance(station_id, float): num_stations = int(len(self.stations()) * station_id) station_id = random.sample(self.stations(), num_stations) stns = self.stations() stn_id_dict = {k: v for k, v in zip(stns, np.arange(len(stns)))} stn_id_dict_inv = {v: k for k, v in stn_id_dict.items()} stn_ids = [stn_id_dict[i] for i in station_id] features = check_attributes(features, self.features) qflags = [] if q_flags is not None: qflags = check_attributes(q_flags, self.q_flags) features_to_fetch = features + qflags all_stn_data = {} for stn in stn_ids: stn_df = self.fetch_station_attributes(stn, features_to_fetch, st=st, en=en) all_stn_data[stn_id_dict_inv[stn]] = stn_df return all_stn_data
[docs] def fetch_station_attributes(self, stn, features_to_fetch, st=None, en=None, ) -> pd.DataFrame: """fetches attributes of one station""" # st, en = self._check_length(st, en) nc = netCDF4.Dataset(os.path.join(self.path, 'CanSWE-CanEEN_1928-2023_v6.nc')) stn_df = pd.DataFrame(columns=features_to_fetch) for var in nc.variables: if var in features_to_fetch: ma = np.array(nc[var][:]) ma[ma == nc[var]._FillValue] = np.nan ta = ma[stn, :] # target array of on station s = pd.Series(ta, index=pd.date_range(self.start, self.end, freq='D'), name=var) stn_df[var] = s[st:en] nc.close() return stn_df
class RRAlpineCatchments(Datasets): """ Modelled runoff in contrasting Alpine catchments in Austria from 1981 to 2100 using 14 models follwoing the work of Hanus et al., 2021 [12]_ . past 1981 - 2010 future .. [12] https://hess.copernicus.org/preprints/hess-2021-92/ """ url = "https://zenodo.org/record/4539986" def __init__(self, **kwargs): super().__init__(**kwargs) self._download() class ETPAgroForestGermany(Datasets): """ Evapotranspiration over agroforestry sites in Germany https://doi.org/10.5194/bg-17-5183-2020 SiteName_Landuse_Content_Figures_Tables.csv """ url = "https://zenodo.org/record/4038399" class ETPTelesinaItaly(Datasets): """ Daily rain and reference evapotranspiration for three years 2002-2004 """ url = "https://zenodo.org/record/3726856"
[docs] def gw_punjab( data_type:str = "full", country:str = None, )->pd.DataFrame: """ groundwater level (meters below ground level) dataset from Punjab region (Pakistan and north-west India) following the study of `MacAllister et al., 2022 <https://doi.org/10.1038/s41561-022-00926-1>`_. parameters ---------- data_type : str (default="full") either ``full`` or ``LTS``. The ``full`` contains the full dataset, there are 68783 rows of observed groundwater level data from 4028 individual sites. In ``LTS`` there are 7547 rows of groundwater level observations from 130 individual sites, which have water level data available for a period of more than 40 years and from which at least two thirds of the annual observations are available. country : str (default=None) the country for which data to retrieve. Either ``PAK`` or ``IND``. Returns ------- pd.DataFrame a pandas DataFrame with datetime index Examples --------- >>> from water_quality import gw_punjab >>> full_data = gw_punjab() find out the earliest observation >>> print(full_data.sort_index().head(1)) >>> lts_data = gw_punjab() >>> lts_data.shape (68782, 4) >>> df_pak = gw_punjab(country="PAK") >>> df_pak.sort_index().dropna().head(1) """ f = 'https://webservices.bgs.ac.uk/accessions/download/167240?fileName=India_Pakistan_WL_NGDC.xlsx' ds_dir =os.path.join(os.path.dirname(__file__), "data", 'gw_punjab') if not os.path.exists(ds_dir): os.makedirs(ds_dir) fname = os.path.join(ds_dir, "gw_punjab.xlsx") if not os.path.exists(fname): print(f"downloading {fname}") download(f, fname) assert data_type in ("full", "LTS") if data_type == "full": sheet_name = "Full_dataset" else: sheet_name = "LTS" df = pd.read_excel(fname, sheet_name=sheet_name) if sheet_name == "LTS": df.iloc[5571, 3] = '01/10/1887' df.iloc[5572, 3] = '01/10/1892' df.iloc[6227, 3] = '01/10/1887' df.iloc[5511, 3] = '01/10/1887' df.iloc[5512, 3] = '01/10/1892' df.iloc[6228, 3] = '01/10/1892' df.index = pd.to_datetime(df.pop("DATE")) if country: if country == "PAK": pak_stations = [st for st in df['OW_ID'].unique() if st.startswith("PAK")] df = df[df['OW_ID'].isin(pak_stations)] else: pak_stations = [st for st in df['OW_ID'].unique() if st.startswith("IND")] df = df[df['OW_ID'].isin(pak_stations)] return df