Source code for aqua_fetch.rr._denmark

import os
from typing import Union, List, Dict

import pandas as pd

from .utils import _RainfallRunoff
from ..utils import validate_attributes

from ._map import (
    total_precipitation,
    mean_air_temp,
    total_potential_evapotranspiration_with_specifier,
    actual_evapotranspiration,
    observed_streamflow_cms,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )


[docs] class Caravan_DK(_RainfallRunoff): """ Reads Caravan extension Denmark - Danish dataset for large-sample hydrology following the works of `Koch and Schneider 2022 <https://doi.org/10.34194/geusb.v49.829>`_ . The dataset is downloaded from `zenodo <https://zenodo.org/record/7962379>`_ . This dataset consists of static and dynamic features from 308 danish catchments. There are 38 dynamic (time series) features from 1981-01-02 to 2020-12-31 with daily timestep and 211 static features for each of 308 catchments. Please note that there is an updated version of this dataset following the works of `Liu et al., 2024 <https://doi.org/10.5194/essd-2024-292>`_ . This dataset is associated with the :py:class:`aqua_fetch.CAMELS_DK` class which can be imported as follows: >>> from aqua_fetch import CAMELS_DK Examples --------- >>> from aqua_fetch import Caravan_DK >>> dataset = Caravan_DK() ... # get data by station id >>> _, dynamic = dataset.fetch(stations='80001', as_dataframe=True) >>> df = dynamic['80001'] # dynamic is a dictionary of with keys as station names and values as DataFrames >>> df.shape (14609, 39) ... ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 308 ... # get data of 10 % of stations as dataframe >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 10% of stations (31 out of 308) 31 ... ... # dynamic is a dictionary whose values are dataframes of dynamic features >>> [df.shape for df in dynamic.values()] [(14609, 39), (14609, 39), (14609, 39),... (14609, 39), (14609, 39)] ... ... get the data of a single (randomly selected) station >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 1 station 1 ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, dynamic = dataset.fetch('80001', as_dataframe=True, ... dynamic_features=['snow_depth_water_equivalent_mean', 'temperature_2m_mean', 'q_cms_obs']) >>> dynamic['80001'].shape (14609, 3) ... ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, dynamic = dataset.fetch(10, as_dataframe=True) >>> len(dynamic) # remember this is a dictionary with values as dataframe 10 ... # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='80001', static_features="all", as_dataframe=True) >>> static.shape, len(dynamic), dynamic['80001'].shape ((1, 211), 1, (14609, 39)) ... # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset >>> _, dynamic = dataset.fetch(10) ... type(dynamic) xarray.core.dataset.Dataset ... >>> dynamic.dims FrozenMappingWarningOnValuesAccess({'time': 14609, 'dynamic_features': 39}) ... >>> len(dynamic.data_vars) 10 ... >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (308, 2) >>> dataset.stn_coords('80001') # returns coordinates of station whose id is 80001 57.10371 10.3516 >>> dataset.stn_coords(['80001', '240001']) # returns coordinates of two stations ... # get area of a single station >>> dataset.area('80001') # get coordinates of two stations >>> dataset.area(['80001', '240001']) ... # if fiona library is installed we can get the boundary as fiona Geometry >>> dataset.get_boundary('80001') """ url = "https://zenodo.org/record/7962379"
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, **kwargs): """ Parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. overwrite : bool If the data is already down then you can set it to True, to make a fresh download. to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc but will require netCDF4 package as well as xarry. """ super(Caravan_DK, self).__init__(path=path, to_netcdf=to_netcdf, **kwargs) self.path = path self._download(overwrite=overwrite) self._static_features = self._static_data().columns.to_list() self._dynamic_features = self.__dynamic_features() self._maybe_to_netcdf()
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, #"Caravan_DK", "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "shapefiles", "camelsdk", "camelsdk_basin_shapes.shp" ) @property def boundary_id_map(self) -> str: return "gauge_id" @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'gauge_lat': gauge_latitude(), 'slope_mean': slope('mkm-1'), 'gauge_lon': gauge_longitude(), } @property def csv_path(self): return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "timeseries", "csv", "camelsdk") @property def nc_path(self): return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "timeseries", "netcdf", "camelsdk") @property def other_attr_fpath(self): """returns path to attributes_other_camelsdk.csv file """ return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "attributes", "camelsdk", "attributes_other_camelsdk.csv") @property def caravan_attr_fpath(self): """returns path to attributes_caravan_camelsdk.csv file """ return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "attributes", "camelsdk", "attributes_caravan_camelsdk.csv")
[docs] def stations(self) -> List[str]: return [fname.split(".csv")[0][9:] for fname in os.listdir(self.csv_path)]
def _read_csv(self, stn: str) -> pd.DataFrame: fpath = os.path.join(self.csv_path, f"camelsdk_{stn}.csv") df = pd.read_csv(os.path.join(fpath)) df.index = pd.to_datetime(df.pop('date')) df.rename(columns=self.dyn_map, inplace=True) df.index.name = 'time' df.columns.name = 'dynamic_features' return df @property def dynamic_features(self) -> List[str]: """returns names of dynamic features""" return self._dynamic_features def __dynamic_features(self) -> List[str]: return self._read_csv('100006').columns.to_list() @property def static_features(self) -> List[str]: """returns static features for Denmark catchments""" return self._static_features @property def start(self): # start of data return pd.Timestamp('1981-01-02 00:00:00') @property def end(self) -> pd.Timestamp: # end of data return pd.Timestamp('2020-12-31 00:00:00') @property def dyn_map(self) -> Dict[str, str]: return { 'streamflow': observed_streamflow_cms(), }
[docs] def stn_coords( self, stations: Union[str, List[str]] = 'all' ) -> pd.DataFrame: """ returns coordinates of stations as DataFrame with ``long`` and ``lat`` as columns. Parameters ---------- stations : name/names of stations. If not given, coordinates of all stations will be returned. Returns ------- pd.DataFrame :obj:`pandas.DataFrame` with ``long`` and ``lat`` columns. The length of dataframe will be equal to number of stations wholse coordinates are to be fetched. Examples -------- >>> dataset = Caravan_DK() >>> dataset.stn_coords() # returns coordinates of all stations >>> dataset.stn_coords('100010') # returns coordinates of station whose id is 912101A >>> dataset.stn_coords(['100010', '210062']) # returns coordinates of two stations """ df = pd.read_csv(self.other_attr_fpath, dtype={"gauge_id": str, 'gauge_lat': float, 'gauge_lon': float, 'area': float, 'gauge_name': str, 'country': str }) df.index = [name.split('camelsdk_')[1] for name in df['gauge_id']] df = df[['gauge_lat', 'gauge_lon']] df.columns = ['lat', 'long'] stations = validate_attributes(stations, self.stations()) return df.loc[stations, :]
def _read_dynamic( self, stations, dynamic_features, st=None, en=None) -> dict: st, en = self._check_length(st, en) features = validate_attributes(dynamic_features, self.dynamic_features) dyn = {stn: self._read_csv(stn).loc[st:en, features] for stn in stations} return dyn def _static_data(self)->pd.DataFrame: df = pd.concat([self.hyd_atlas_attributes(), self.other_static_attributes(), self.caravan_static_attributes()], axis=1) df.rename(columns=self.static_map, inplace=True) return df @property def hyd_atlas_fpath(self): return os.path.join(self.path, "Caravan_extension_DK", "Caravan_extension_DK", "Caravan_extension_DK", "attributes", "camelsdk", "attributes_hydroatlas_camelsdk.csv")
[docs] def hyd_atlas_attributes(self, stations='all') -> pd.DataFrame: """ Returns -------- a :obj:`pandas.DataFrame` of shape (308, 196) """ stations = validate_attributes(stations, self.stations()) df = pd.read_csv(self.hyd_atlas_fpath) indices = df.pop('gauge_id') df.index = [idx[9:] for idx in indices] return df
[docs] def other_static_attributes(self, stations='all') -> pd.DataFrame: """ Returns -------- a :obj:`pandas.DataFrame` of shape (308, 5) """ stations = validate_attributes(stations, self.stations()) df = pd.read_csv(self.other_attr_fpath) indices = df.pop('gauge_id') df.index = [idx[9:] for idx in indices] return df
[docs] def caravan_static_attributes(self, stations='all') -> pd.DataFrame: """ Returns -------- a :obj:`pandas.DataFrame` of shape (308, 10) """ stations = validate_attributes(stations, self.stations()) df = pd.read_csv(self.caravan_attr_fpath) indices = df.pop('gauge_id') df.index = [idx[9:] for idx in indices] return df