Source code for aqua_fetch.rr._grdccaravan


import os
import concurrent.futures as cf
from typing import Union, List, Dict, Tuple, Callable, Any

import pandas as pd

from .utils import _RainfallRunoff
from ..utils import get_cpus
from ..utils import check_st_en  # todo check difference with self.check_length
from ..utils import validate_attributes, download, unzip

from .._backend import xarray as xr


from ._map import (
    observed_streamflow_cms,
    observed_streamflow_mm,
    mean_air_temp,
    min_air_temp_with_specifier,
    max_air_temp_with_specifier,
    max_air_temp,
    min_air_temp,
    mean_air_temp_with_specifier,
    total_precipitation,
    total_precipitation_with_specifier,
    total_potential_evapotranspiration,
    total_potential_evapotranspiration_with_specifier,
    simulated_streamflow_cms,
    actual_evapotranspiration,
    actual_evapotranspiration_with_specifier,
    solar_radiation_with_specifier,
    mean_vapor_pressure,
    mean_vapor_pressure_with_specifier,
    mean_rel_hum,
    mean_rel_hum_with_specifier,
    rel_hum_with_specifier,
    mean_windspeed,
    u_component_of_wind,
    v_component_of_wind,
    solar_radiation,
    downward_longwave_radiation,
    snow_water_equivalent,
    mean_specific_humidity,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

[docs] class GRDCCaravan(_RainfallRunoff): """ This is a dataset of 5357 catchments from around the globe following the works of `Faerber et al., 2023 <https://zenodo.org/records/15349031>`_ . The dataset consists of 39 dynamic (timeseries) features and 211 static features. The dynamic (timeseries) data spands from 1950-01-02 to 2019-05-19. if xarray + netCDF4 packages are installed then netcdf files will be downloaded otherwise csv files will be downloaded and used. Examples -------- >>> from aqua_fetch import GRDCCaravan >>> dataset = GRDCCaravan() ... # get data by station id >>> _, dynamic = dataset.fetch(stations='GRDC_3664802', as_dataframe=True) >>> df = dynamic['GRDC_3664802'] # dynamic is a dictionary of with keys as station names and values as DataFrames >>> df.shape (26801, 39) ... ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 5357 ... # get data of 10 % of stations as dataframe >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 10% of stations (535 out of 5357) 535 ... ... # dynamic is a dictionary whose values are dataframes of dynamic features >>> [df.shape for df in dynamic.values()] [(26801, 39), (26801, 39), (26801, 39),... (26801, 39), (26801, 39)] ... ... get the data of a single (randomly selected) station >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 1 station 1 ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, dynamic = dataset.fetch('GRDC_3664802', as_dataframe=True, ... dynamic_features=['total_precipitation_sum', 'potential_evaporation_sum', 'temperature_2m_mean', 'q_cms_obs']) >>> dynamic['GRDC_3664802'].shape (26801, 4) ... ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, dynamic = dataset.fetch(10, as_dataframe=True) >>> len(dynamic) # remember this is a dictionary with values as dataframe 10 ... # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='GRDC_3664802', static_features="all", as_dataframe=True) >>> static.shape, len(dynamic), dynamic['GRDC_3664802'].shape ((1, 211), 1, (26801, 39)) ... # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset >>> _, dynamic = dataset.fetch(10) ... type(dynamic) xarray.core.dataset.Dataset ... >>> dynamic.dims FrozenMappingWarningOnValuesAccess({'time': 26801, 'dynamic_features': 39}) ... >>> len(dynamic.data_vars) 10 ... >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (5357, 2) >>> dataset.stn_coords('GRDC_3664802') # returns coordinates of station whose id is GRDC_3664802 -26.2271 -51.0771 >>> dataset.stn_coords(['GRDC_3664802', 'GRDC_1159337']) # returns coordinates of two stations ... # get area of a single station >>> dataset.area('GRDC_3664802') # get coordinates of two stations >>> dataset.area(['GRDC_3664802', 'GRDC_1159337']) ... # if fiona library is installed we can get the boundary as fiona Geometry >>> dataset.get_boundary('GRDC_3664802') ... """ url = { 'GRDC_Caravan_extension_nc.zip': "https://zenodo.org/records/15349031/files/GRDC_Caravan_extension_nc.zip?download=1", 'GRDC_Caravan_extension_csv.zip': "https://zenodo.org/records/15349031/files/GRDC_Caravan_extension_csv.zip?download=1" }
[docs] def __init__( self, path=None, overwrite: bool = False, verbosity: int = 1, **kwargs ): if xr is None: self.ftype = 'csv' if "GRDC_Caravan_extension_nc.zip" in self.url: self.url.pop("GRDC_Caravan_extension_nc.zip") # else: self.ftype = 'netcdf' if "GRDC_Caravan_extension_csv.zip" in self.url: self.url.pop("GRDC_Caravan_extension_csv.zip") super().__init__(path=path, verbosity=verbosity, **kwargs) if not os.path.exists(self.path): if self.verbosity>1: print(f"Creating directory {self.path}") os.makedirs(self.path) for _file, url in self.url.items(): fpath = os.path.join(self.path, _file[:-4]) if os.path.exists(fpath) and not overwrite: if self.verbosity > 0: print(f"{_file} at {self.path} already exists") # zip file is present but is not extracted unzip(self.path, verbosity=verbosity) continue fpath = os.path.join(self.path, _file) if not os.path.exists(fpath) and not overwrite: if self.verbosity > 0: print(f"Downloading {_file} from {url + _file}") download(url + _file, outdir=self.path, fname=_file, ) unzip(self.path, verbosity=verbosity) elif self.verbosity > 0: print(f"{_file} at {self.path} already exists") # so that we dont have to read the files again and again self._stations = self.other_attributes().index.to_list() self._static_attributes = self._static_data().columns.tolist() self._dynamic_attributes = self._read_stn_dyn(self.stations()[0]).columns.tolist()
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.shapefiles_path, 'grdc_basin_shapes.shp' ) @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): return { 'streamflow': observed_streamflow_mm(), 'temperature_2m_mean': mean_air_temp_with_specifier('2m'), 'temperature_2m_min': min_air_temp_with_specifier('2m'), 'temperature_2m_max': max_air_temp_with_specifier('2m'), 'total_precipitation_sum': total_precipitation(), } @property def dyn_generator(self)->Dict[str, Tuple[Callable, Any]]: return { observed_streamflow_cms(): (self.mm_to_cms, observed_streamflow_mm()), } @property def static_features(self): return self._static_attributes @property def dynamic_features(self): return self._dynamic_attributes @property def shapefiles_path(self): if self.ftype == 'csv': return os.path.join(self.path, 'GRDC_Caravan_extension_csv', 'GRDC_Caravan_extension_csv', 'shapefiles', 'grdc') return os.path.join(self.path, 'GRDC_Caravan_extension_nc', 'GRDC_Caravan_extension_nc', 'shapefiles', 'grdc') @property def attrs_path(self): if self.ftype == 'csv': return os.path.join(self.path, 'GRDC_Caravan_extension_csv', 'GRDC_Caravan_extension_csv', 'attributes', 'grdc') return os.path.join(self.path, 'GRDC_Caravan_extension_nc', 'GRDC_Caravan_extension_nc', 'attributes', 'grdc') @property def ts_path(self) -> os.PathLike: if self.ftype == 'csv': return os.path.join(self.path, 'GRDC_Caravan_extension_csv', 'GRDC_Caravan_extension_csv', 'timeseries', self.ftype, 'grdc') return os.path.join(self.path, 'GRDC_Caravan_extension_nc', 'GRDC_Caravan_extension_nc', 'timeseries', self.ftype, 'grdc')
[docs] def stations(self) -> List[str]: return self._stations
@property def start(self): return pd.Timestamp("19500102") @property def end(self): return pd.Timestamp("20230518") def other_attributes(self) -> pd.DataFrame: return pd.read_csv(os.path.join(self.attrs_path, 'attributes_other_grdc.csv'), index_col='gauge_id') def hydroatlas_attributes(self) -> pd.DataFrame: return pd.read_csv(os.path.join(self.attrs_path, 'attributes_hydroatlas_grdc.csv'), index_col='gauge_id') def caravan_attributes(self) -> pd.DataFrame: return pd.read_csv(os.path.join(self.attrs_path, 'attributes_caravan_grdc.csv'), index_col='gauge_id') def _static_data(self) -> pd.DataFrame: df = pd.concat([ self.other_attributes(), self.hydroatlas_attributes(), self.caravan_attributes(), ], axis=1) df.rename(columns=self.static_map, inplace=True) return df
[docs] def fetch_station_features( self, station: str, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, st: Union[str, None] = None, en: Union[str, None] = None, **kwargs ) -> tuple[pd.DataFrame, pd.DataFrame]: """ Fetches features for one station. Parameters ----------- station : station id/gauge id for which the data is to be fetched. dynamic_features : str/list, optional names of dynamic features/attributes to fetch static_features : names of static features/attributes to be fetches st : str,optional starting point from which the data to be fetched. By default, the data will be fetched from where it is available. en : str, optional end point of data to be fetched. By default the dat will be fetched Returns ------- tuple A tuple of static and dynamic features, both as :obj:`pandas.DataFrame`. The dataframe of static features will be of single row while the dynamic features will be of shape (time, dynamic features). Examples -------- >>> from aqua_fetch import GRDCCaravan >>> dataset = GRDCCaravan() >>> dataset.fetch_station_features('912101A') """ static, dynamic = None, None st, en = self._check_length(st, en) if dynamic_features is not None: dynamic = self._read_dynamic(station, dynamic_features, st, en)[station] if static_features is not None: static = self.fetch_static_features(station, static_features) return static, dynamic
def _read_dynamic( self, stations, dynamic_features, st=None, en=None) -> dict: dynamic_features = validate_attributes(dynamic_features, self.dynamic_features) stations = validate_attributes(stations, self.stations()) st, en = self._check_length(st, en) cpus = self.processes or min(get_cpus() - 2, 64) if len(stations) > 10 and cpus>1: if self.verbosity > 1: print(f"Using {cpus} cpus to read dynamic features for {len(stations)} stations") with cf.ProcessPoolExecutor(max_workers=cpus) as executor: results = executor.map( self._read_stn_dyn, stations, ) dyn = {stn: data.loc[st:en, dynamic_features] for stn, data in zip(stations, results)} else: if self.verbosity > 1: print(f"Using single cpu to read dynamic features for {len(stations)} stations") dyn = {} for idx, stn in enumerate(stations): dyn[stn] = self._read_stn_dyn(stn).loc[st: en, dynamic_features] if self.verbosity>0 and idx % 100 == 0: print(f"Read data for {idx} stations") return dyn def _read_stn_dyn(self, station) -> pd.DataFrame: if self.ftype == "netcdf": fpath = os.path.join(self.ts_path, f'{station}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.ts_path, f'{station}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.rename(columns=self.dyn_map, inplace=True) for new_feature, (func, old_feature) in self.dyn_generator.items(): if new_feature not in df.columns: df[new_feature] = func(pd.Series(df[old_feature], name=station)) df = df.sort_index() # Ensure df always extends to self.end if df.index[-1] < self.end: # Create complete date range from start of existing data to self.end complete_range = pd.date_range(start=self.start, end=self.end, freq='D') # Reindex to fill missing dates with NaN df = df.reindex(complete_range) df.index.name = 'time' df.columns.name = 'dynamic_features' return df