Source code for aqua_fetch.rr._camels

import os
import json
import glob
import time
import shutil
import zipfile
import warnings
import concurrent.futures as cf
from typing import Union, List, Dict

import numpy as np
import pandas as pd

from .utils import _RainfallRunoff
from .._geom_utils import utm_to_lat_lon
from ..utils import get_cpus, download_and_unzip
from ..utils import check_attributes, download, unzip

from .._backend import netCDF4, xarray as xr

from ._map import (
    observed_streamflow_cms,
    observed_streamflow_mmd,
    mean_air_temp,
    min_air_temp_with_specifier,
    max_air_temp_with_specifier,
    max_air_temp,
    min_air_temp,
    mean_air_temp_with_specifier,
    total_precipitation,
    total_precipitation_with_specifier,
    total_potential_evapotranspiration,
    total_potential_evapotranspiration_with_specifier,
    simulated_streamflow_cms,
    actual_evapotranspiration,
    actual_evapotranspiration_with_specifier,
    solar_radiation_with_specifier,
    mean_vapor_pressure,
    mean_vapor_pressure_with_specifier,
    mean_rel_hum,
    mean_rel_hum_with_specifier,
    rel_hum_with_specifier,
    mean_windspeed,
    u_component_of_wind,
    v_component_of_wind,
    solar_radiation,
    downward_longwave_radiation,
    snow_water_equivalent,
    mean_specific_humidity,
    soil_moisture_layer1,
    soil_moisture_layer2,
    soil_moisture_layer3,
    soil_moisture_layer4,
    mean_dewpoint_temperature_at_2m,
    catchment_area_with_specifier,
    snow_water_equivalent_with_specifier,
    snow_depth,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope,
    gauge_elevation_meters,
    catchment_elevation_meters,
    urban_fraction,
    urban_fraction_with_specifier,
    grass_fraction,
    grass_fraction_with_specifier,
    crop_fraction,
    crop_fraction_with_specifier,
    catchment_perimeter,
    aridity_index,
    med_catchment_elevation_meters,
    soil_depth,
    population_density,
    )

# directory separator
SEP = os.sep


[docs] class CAMELS_US(_RainfallRunoff): """ This is a dataset of 671 US catchments with 59 static features and 8 dyanmic features for each catchment. The dyanmic features are timeseries from 1980-01-01 to 2014-12-31. This class downloads and processes CAMELS dataset of 671 catchments named as CAMELS from `ucar.edu <https://ral.ucar.edu/solutions/products/camels>`_ following `Newman et al., 2015 <https://doi.org/10.5194/hess-19-209-2015>`_ , `Newman et al., 2022 <https://gdex.ucar.edu/dataset/camels.html.>`_ and `Addor et al., 2017 <https://hess.copernicus.org/articles/21/5293/2017/>`_. Examples -------- >>> from aqua_fetch import CAMELS_US >>> dataset = CAMELS_US() >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (12784, 8) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 671 # we can get data of 10% catchments as below >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (460488, 51) # the data is multi-index with ``time`` and ``dynamic_features`` as indices >>> data.index.names == ['time', 'dynamic_features'] True # get data by station id >>> _, df = dataset.fetch(stations='11478500', as_dataframe=True) >>> df.unstack().shape (12784, 8) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm', 'solrad_wm2', 'airtemp_C_max', 'airtemp_C_min', 'q_cms_obs']) >>> df.unstack().shape (12784, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (102272, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='11478500', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 59), (12784, 8)) """ DATASETS = ['CAMELS_US'] url = { 'camels_attributes_v2.0.pdf': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_attributes_v2.0.xlsx': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_clim.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_geol.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_hydro.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_name.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_soil.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_topo.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'camels_vege.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'readme.txt': 'https://gdex.ucar.edu/dataset/camels/file/', 'basin_timeseries_v1p2_metForcing_obsFlow.zip': 'https://gdex.ucar.edu/dataset/camels/file/', 'basin_set_full_res.zip': 'https://gdex.ucar.edu/dataset/camels/file/', } folders = {'basin_mean_daymet': f'basin_mean_forcing{SEP}daymet', 'basin_mean_maurer': f'basin_mean_forcing{SEP}maurer', 'basin_mean_nldas': f'basin_mean_forcing{SEP}nldas', 'basin_mean_v1p15_daymet': f'basin_mean_forcing{SEP}v1p15{SEP}daymet', 'basin_mean_v1p15_nldas': f'basin_mean_forcing{SEP}v1p15{SEP}nldas', 'elev_bands': f'elev{SEP}daymet', 'hru': f'hru_forcing{SEP}daymet'} dynamic_features_ = ['dayl(s)', 'prcp(mm/day)', 'srad(W/m2)', 'swe(mm)', 'tmax(C)', 'tmin(C)', 'vp(Pa)', 'Flow']
[docs] def __init__( self, path:Union[str, os.PathLike]=None, data_source: str = 'basin_mean_daymet', **kwargs ): """ parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. data_source : str allowed values are - basin_mean_daymet - basin_mean_maurer - basin_mean_nldas - basin_mean_v1p15_daymet - basin_mean_v1p15_nldas - elev_bands - hru """ assert data_source in self.folders, f'allwed data sources are {self.folders.keys()}' self.data_source = data_source super().__init__(path=path, name="CAMELS_US", **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) for fname, url in self.url.items(): fpath = os.path.join(self.path, fname) furl = f"{url}{fname}" if not os.path.exists(fpath) or self.overwrite: if self.verbosity: print(f"downloading {fname} from {url}") download(furl, self.path, fname, verbosity=self.verbosity) unzip(self.path, verbosity=self.verbosity) self.dataset_dir = os.path.join( self.path, f'basin_timeseries_v1p2_metForcing_obsFlow{SEP}basin_dataset_public_v1p2') self._static_features = self._static_data().columns.tolist() self._maybe_to_netcdf('camels_us_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "basin_set_full_res", "HCDN_nhru_final_671.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'area_gages2': catchment_area(), 'slope_mean': slope('mkm-1'), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): return { 'Flow': observed_streamflow_cms(), # todo : check units 'tmin(C)': min_air_temp(), 'tmax(C)': max_air_temp(), 'prcp(mm/day)': total_precipitation(), 'swe(mm)': snow_water_equivalent(), 'pet_mean': total_potential_evapotranspiration(), 'vp(Pa)': mean_vapor_pressure(), # todo: convert frmo Pa to hpa 'srad(W/m2)': solar_radiation(), } @property def dyn_factors(self) -> Dict[str, float]: return { observed_streamflow_cms(): 0.0283168, } @property def start(self): return "19800101" @property def end(self): return "20141231" @property def static_features(self)->List[str]: return self._static_features @property def dynamic_features(self) -> List[str]: return [self.dyn_map.get(feat, feat) for feat in self.dynamic_features_]
[docs] def stations(self) -> list: stns = [] for _dir in os.listdir(os.path.join(self.dataset_dir, 'usgs_streamflow')): cat = os.path.join(self.dataset_dir, f'usgs_streamflow{SEP}{_dir}') stns += [fname.split('_')[0] for fname in os.listdir(cat)] # remove stations for which static values are not available for stn in ['06775500', '06846500', '09535100']: stns.remove(stn) return stns
def _read_stn_dyn( self, stn:str, ): assert isinstance(stn, str) df = None dir_name = self.folders[self.data_source] for cat in os.listdir(os.path.join(self.dataset_dir, dir_name)): cat_dirs = os.listdir(os.path.join(self.dataset_dir, f'{dir_name}{SEP}{cat}')) stn_file = f'{stn}_lump_cida_forcing_leap.txt' if stn_file in cat_dirs: df = pd.read_csv(os.path.join(self.dataset_dir, f'{dir_name}{SEP}{cat}{SEP}{stn_file}'), sep="\s+|;|:", skiprows=4, engine='python', names=['Year', 'Mnth', 'Day', 'Hr', 'dayl(s)', 'prcp(mm/day)', 'srad(W/m2)', 'swe(mm)', 'tmax(C)', 'tmin(C)', 'vp(Pa)'], ) df.index = pd.to_datetime( df['Year'].map(str) + '-' + df['Mnth'].map(str) + '-' + df['Day'].map(str)) flow_dir = os.path.join(self.dataset_dir, 'usgs_streamflow') for cat in os.listdir(flow_dir): cat_dirs = os.listdir(os.path.join(flow_dir, cat)) stn_file = f'{stn}_streamflow_qc.txt' if stn_file in cat_dirs: fpath = os.path.join(flow_dir, f'{cat}{SEP}{stn_file}') q_df = pd.read_csv(fpath, sep=r"\s+", names=['station', 'Year', 'Month', 'Day', 'Flow', 'Flag'], engine='python') q_df.index = pd.to_datetime( q_df['Year'].map(str) + '-' + q_df['Month'].map(str) + '-' + q_df['Day'].map(str)) stn_df = pd.concat([ df[['dayl(s)', 'prcp(mm/day)', 'srad(W/m2)', 'swe(mm)', 'tmax(C)', 'tmin(C)', 'vp(Pa)']], q_df['Flow']], axis=1) stn_df.rename(columns=self.dyn_map, inplace=True) for col, fact in self.dyn_factors.items(): if col in stn_df.columns: stn_df[col] *= fact return stn_df def _static_data(self)->pd.DataFrame: static_fpath = os.path.join(self.path, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob(f"{self.path}/*.txt") static_dfs = [] for f in files: if not f.endswith('readme.txt'): if self.verbosity>2: print(f"reading {f}") # index should be read as string _df = pd.read_csv(f, sep=';', index_col='gauge_id', dtype={'gauge_id': str}) static_dfs.append(_df) static_df = pd.concat(static_dfs, axis=1) static_df.to_csv(static_fpath, index_label='gauge_id') else: # index should be read as string bcs it has 0s at the start static_df = pd.read_csv(static_fpath, index_col='gauge_id', dtype={'gauge_id': str}) static_df.index = static_df.index.astype(str) static_df = static_df#.loc[stn_id][features] if isinstance(static_df, pd.Series): static_df = pd.DataFrame(static_df).transpose() static_df.rename(columns=self.static_map, inplace=True) return static_df
[docs] class CAMELS_GB(_RainfallRunoff): """ This is a dataset of 671 catchments with 145 static features and 10 dyanmic features for each catchment following the work of `Coxon et al., 2020 <https://doi.org/10.5194/essd-12-2459-2020>`__. The dyanmic features are timeseries from 1970-10-01 to 2015-09-30. The data is downloaded from `ceh website <https://data-package.ceh.ac.uk/data/8344e4f3-d2ea-44f5-8afa-86d2987543a9.zip>`_ Examples -------- >>> from aqua_fetch import CAMELS_GB >>> dataset = CAMELS_GB() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (164360, 67) >>> data.index.names == ['time', 'dynamic_features'] True >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (16436, 10) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 671 # get data by station id >>> _, df = dataset.fetch(stations='97002', as_dataframe=True) >>> df.unstack().shape (16436, 10) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['windspeed_mps', 'airtemp_C_mean', 'pet_mm', 'pcp_mm', 'q_cms_obs']) >>> df.unstack().shape (16436, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (164360, 10) # remember this is multi-indexed DataFrame # when we get both static and dynamic data, the returned data is a dictionary # with ``static`` and ``dyanic`` keys. >>> static, dynamic = dataset.fetch(stations='97002', static_features="all", as_dataframe=True) >>> static, dynamic.shape ((1, 290), (164360, 1)) """ dynamic_features_ = ["precipitation", "pet", "temperature", "discharge_spec", "discharge_vol", "peti", "humidity", "shortwave_rad", "longwave_rad", "windspeed"]
[docs] def __init__(self, path=None, **kwargs): """ parameters ------------ path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. """ super().__init__(name="CAMELS_GB", path=path, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) if not os.path.exists(os.path.join(self.path, 'camels_gb')): download( outdir=self.path, url="https://data-package.ceh.ac.uk/data/8344e4f3-d2ea-44f5-8afa-86d2987543a9.zip", fname="camels_gb.zip" ) if self.verbosity > 0: print("unzipping the downloaded file") unzip(self.path, verbosity=self.verbosity) # rename the folder camels_gb/8344e4f3-d2ea-44f5-8afa-86d2987543a9 to camels_gb/caemls_gb shutil.move( os.path.join(self.path, 'camels_gb', '8344e4f3-d2ea-44f5-8afa-86d2987543a9'), os.path.join(self.path, 'camels_gb', 'camels_gb') ) else: if self.verbosity > 0: print(f"dataset is already available at {self.path}") self._static_features = self._static_data().columns.tolist() self._maybe_to_netcdf('camels_gb_dyn') if not os.path.exists(self.boundary_file): unzip(self.data_path, verbosity=self.verbosity)
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.data_path, "CAMELS_GB_catchment_boundaries", "CAMELS_GB_catchment_boundaries.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'slope_fdc': slope(''), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): # table 1 in https://essd.copernicus.org/articles/12/2459/2020/#&gid=1&pid=1 return { 'discharge_vol': observed_streamflow_cms(), 'discharge_spec': observed_streamflow_mmd(), 'temperature': mean_air_temp(), 'humidity': mean_rel_hum(), # todo: convert from g/kg to % 'windspeed': mean_windspeed(), 'precipitation': total_precipitation(), 'pet': total_potential_evapotranspiration(), 'peti': total_potential_evapotranspiration_with_specifier('intercep'), 'shortwave_rad': solar_radiation(), 'longwave_rad': downward_longwave_radiation(), } @property def data_path(self): return os.path.join(self.path, 'camels_gb', 'camels_gb', 'data') @property def static_attribute_categories(self) -> list: features = [] for f in os.listdir(self.data_path): if os.path.isfile(os.path.join(self.data_path, f)) and f.endswith('csv'): features.append(f.split('_')[2]) return features @property def start(self): return pd.Timestamp("19701001") @property def end(self): return pd.Timestamp("20150930") @property def static_features(self)->List[str]: return self._static_features @property def dynamic_features(self) -> List[str]: return [self.dyn_map.get(feat, feat) for feat in self.dynamic_features_]
[docs] def stations(self, to_exclude=None): # CAMELS_GB_hydromet_timeseries_StationID_number path = os.path.join(self.data_path, 'timeseries') gauge_ids = [] for f in os.listdir(path): gauge_ids.append(f.split('_')[4]) return gauge_ids
@property def _mmd_feature_name(self) -> str: return observed_streamflow_mmd() @property def _area_name(self) -> str: return 'area' @property def _coords_name(self) -> List[str]: return ['gauge_lat', 'gauge_lon'] def _read_stn_dyn( self, stn:str )->pd.DataFrame: # making one separate dataframe for one station path = os.path.join(self.data_path, f"timeseries") fname = f"CAMELS_GB_hydromet_timeseries_{stn}_19701001-20150930.csv" df = pd.read_csv(os.path.join(path, fname), index_col='date') df.index = pd.to_datetime(df.index) df.index.freq = pd.infer_freq(df.index) df.rename(columns=self.dyn_map, inplace=True) return df def _static_data(self)->pd.DataFrame: static_fpath = os.path.join(self.data_path, 'static_features.csv') if os.path.exists(static_fpath): static_df = pd.read_csv(static_fpath, index_col='gauge_id') else: files = glob.glob(f"{self.data_path}/*.csv") static_dfs = [] for f in files: _df = pd.read_csv(f, index_col='gauge_id') static_dfs.append(_df) static_df = pd.concat(static_dfs, axis=1) static_df.to_csv(static_fpath) static_df.index = static_df.index.astype(str) static_df.rename(columns=self.static_map, inplace=True) return static_df
[docs] class CAMELS_AUS(_RainfallRunoff): """ This is a dataset of 561 Australian catchments with 187 static features and 28 dyanmic features for each catchment. The dyanmic features are timeseries from 1950-01-01 to 2022-03-31. This class Reads CAMELS-AUS dataset of `Fowler et al., 2024 <https://doi.org/10.5194/essd-2024-263>`_ . If ``version`` is 1 then this class reads data following `Fowler et al., 2021 <https://doi.org/10.5194/essd-13-3847-2021>`_ which is a dataset of 222 Australian catchments with 161 static features and 26 dyanmic features for each catchment. The dyanmic features are timeseries from 1957-01-01 to 2018-12-31. Examples -------- >>> from aqua_fetch import CAMELS_AUS >>> dataset = CAMELS_AUS() >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape # if you are using version 1 then the shape will be (21184, 28) (26388, 28) ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 222 ... # get data of 10 % of stations as dataframe >>> _, df = dataset.fetch(0.1, as_dataframe=True) >>> df.shape (550784, 22) ... # The returned dataframe is a multi-indexed data >>> df.index.names == ['time', 'dynamic_features'] True ... # get data by station id >>> df = dataset.fetch(stations='912101A', as_dataframe=True)[1].unstack() >>> df.shape # if you are using version 1 then the shape will be (21184, 28) (26388, 28) ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, data = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['airtemp_C_awap_max', 'pcp_mm_awap', 'et_morton_actual_SILO', 'q_cms_obs']) >>> data.unstack().shape # if you are using version 1 then the shape will be (21184, 4) (26388, 4) ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape # remember this is a multiindexed dataframe (26388, 260) # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='912101A', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack()shape >>> ((1, 166), (26388, 28)) """ url = 'https://doi.pangaea.de/10.1594/PANGAEA.921850' url_v2 = "https://zenodo.org/records/13350616" urls = {1: { "01_id_name_metadata.zip": "https://download.pangaea.de/dataset/921850/files/", "02_location_boundary_area.zip": "https://download.pangaea.de/dataset/921850/files/", "03_streamflow.zip": "https://download.pangaea.de/dataset/921850/files/", "04_attributes.zip": "https://download.pangaea.de/dataset/921850/files/", "05_hydrometeorology.zip": "https://download.pangaea.de/dataset/921850/files/", "CAMELS_AUS_Attributes&Indices_MasterTable.csv": "https://download.pangaea.de/dataset/921850/files/", # "Units_01_TimeseriesData.pdf": "https://download.pangaea.de/dataset/921850/files/", # "Units_02_AttributeMasterTable.pdf": "https://download.pangaea.de/dataset/921850/files/", }, 2: { "01_id_name_metadata.zip": "https://zenodo.org/records/13350616/files/", "02_location_boundary_area.zip": "https://zenodo.org/records/13350616/files/", "03_streamflow.zip": "https://zenodo.org/records/13350616/files/", "04_attributes.zip": "https://zenodo.org/records/13350616/files/", "05_hydrometeorology.zip": "https://zenodo.org/records/13350616/files/", "CAMELS_AUS_Attributes&Indices_MasterTable.csv": "https://zenodo.org/records/13350616/files/", } } folders = {1: { 'streamflow_MLd': f'03_streamflow{SEP}03_streamflow', 'streamflow_MLd_inclInfilled': f'03_streamflow{SEP}03_streamflow', 'streamflow_mmd': f'03_streamflow{SEP}03_streamflow', 'et_morton_actual_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_morton_point_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_morton_wet_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_short_crop_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_tall_crop_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'evap_morton_lake_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'evap_pan_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'evap_syn_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'precipitation_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries', 'precipitation_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries', 'precipitation_var_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries', 'solarrad_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP', 'tmax_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP', 'tmin_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP', 'vprp_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AWAP', 'mslp_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'radiation_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'rh_tmax_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'rh_tmin_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'tmax_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'tmin_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'vp_deficit_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'vp_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', }, 2: { 'streamflow_MLd': f'03_streamflow{SEP}03_streamflow', 'streamflow_MLd_inclInfilled': f'03_streamflow{SEP}03_streamflow', 'streamflow_mmd': f'03_streamflow{SEP}03_streamflow', 'et_morton_actual_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_morton_point_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_morton_wet_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_short_crop_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'et_tall_crop_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'evap_morton_lake_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'evap_pan_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'evap_syn_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}02_EvaporativeDemand_timeseries', 'precipitation_AGCD': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries', 'precipitation_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries', 'precipitation_var_AGCD': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}01_precipitation_timeseries', # 'solarrad_AWAP': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AGCD{SEP}solarrad_AWAP', 'tmax_AGCD': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AGCD', 'tmin_AGCD': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AGCD', 'vapourpres_h09_AGCD': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AGCD', 'vapourpres_h15_AGCD': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}AGCD', 'mslp_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'radiation_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'rh_tmax_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'rh_tmin_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'tmax_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'tmin_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'vp_deficit_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', 'vp_SILO': f'05_hydrometeorology{SEP}05_hydrometeorology{SEP}03_Other{SEP}SILO', } }
[docs] def __init__( self, path: str = None, version: int = 2, to_netcdf: bool = True, overwrite: bool = False, verbosity: int = 1, **kwargs ): """ Arguments: path: path where the CAMELS_AUS dataset has been downloaded. This path must contain five zip files and one xlsx file. If None, then the data will be downloaded. version: version of the dataset to download. Allowed values are 1 and 2. to_netcdf : """ self.version = version super().__init__(path=path, verbosity=verbosity, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) for _file, url in self.urls[version].items(): fpath = os.path.join(self.path, _file) if os.path.exists(fpath) and overwrite: os.remove(fpath) if verbosity > 0: print(f"Re-downloading {_file} from {url + _file} at {fpath}") download(url + _file, outdir=self.path, fname=_file, verbosity=self.verbosity) elif not os.path.exists(fpath): if verbosity > 0: print(f"Downloading {_file} from {url + _file} at {fpath}") download(url + _file, outdir=self.path, fname=_file, verbosity=self.verbosity) elif verbosity > 0: print(f"{_file} already exists at {self.path}") # maybe the .zip file has been downloaded previously but not unzipped unzip(self.path, verbosity=verbosity, overwrite=overwrite) if netCDF4 is None: to_netcdf = False if to_netcdf: self._maybe_to_netcdf('camels_aus_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "02_location_boundary_area", "02_location_boundary_area", "shp", "CAMELS_AUS_Boundaries_adopted.shp" if self.version == 1 else "CAMELS_AUS_v2_Boundaries_adopted.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'catchment_area': catchment_area(), 'maen_slope_pct': slope('%'), 'lat_outlet': gauge_latitude(), 'long_outlet': gauge_longitude(), } @property def dyn_map(self): # table 2 in https://essd.copernicus.org/articles/13/3847/2021/#&gid=1&pid=1 return { 'streamflow_MLd': observed_streamflow_cms(), 'streamflow_mmd': observed_streamflow_mmd(), 'tmin_SILO': min_air_temp_with_specifier('silo'), 'tmax_SILO': max_air_temp_with_specifier('silo'), 'tmin_AWAP': min_air_temp_with_specifier('awap'), 'tmax_AWAP': max_air_temp_with_specifier('awap'), 'et_morton_actual_SILO': actual_evapotranspiration_with_specifier('silo_morton'), 'et_morton_point_SILO': actual_evapotranspiration_with_specifier('silo_morton_point'), 'et_short_crop_SILO': actual_evapotranspiration_with_specifier('silo_short_crop'), 'et_tall_crop_SILO': actual_evapotranspiration_with_specifier('silo_tall_crop'), 'precipitation_AWAP': total_precipitation_with_specifier('awap'), 'precipitation_SILO': total_precipitation_with_specifier('silo'), 'solarrad_AWAP': solar_radiation_with_specifier('awap'), # convert MJ/m2/day to W/m2 'radiation_SILO': solar_radiation_with_specifier('silo'), # convert MJ/m2/day to W/m2 'vp_SILO': mean_vapor_pressure_with_specifier('silo'), 'vprp_AWAP': mean_vapor_pressure_with_specifier('awap'), 'rh_tmax_SILO': mean_rel_hum_with_specifier('silo_tmax'), 'rh_tmin_SILO': mean_rel_hum_with_specifier('silo_tmin'), 'vapourpres_h09_AGCD': mean_vapor_pressure_with_specifier('agcd_h09'), 'vapourpres_h15_AGCD': mean_vapor_pressure_with_specifier('agcd_h15'), 'tmin_AGCD': min_air_temp_with_specifier('agcd'), 'tmax_AGCD': max_air_temp_with_specifier('agcd'), 'precipitation_AGCD': total_precipitation_with_specifier('agcd'), #'mslp_SILO': mean_sea_level_pressure_with_specifier('silo'), } @property def dyn_factors(self): return { observed_streamflow_cms(): 0.01157, } @property def dyn_generators(self): if self.version == 1: return { # new column to be created : function to be applied, inputs mean_air_temp_with_specifier('silo'): (self.mean_temp, (min_air_temp_with_specifier('silo'), max_air_temp_with_specifier('silo'))), mean_air_temp_with_specifier('awap'): (self.mean_temp, (min_air_temp_with_specifier('awap'), max_air_temp_with_specifier('awap'))), } else: return { # new column to be created : function to be applied, inputs mean_air_temp_with_specifier('silo'): (self.mean_temp, (min_air_temp_with_specifier('silo'), max_air_temp_with_specifier('silo'))), mean_air_temp_with_specifier('agcd'): (self.mean_temp, (min_air_temp_with_specifier('agcd'), max_air_temp_with_specifier('agcd'))), } @property def start(self): return "19500101" @property def end(self): return "20181231" if self.version == 1 else "20220331" @property def location(self): return "Australia"
[docs] def stations(self, as_list=True) -> list: fname = os.path.join(self.path, f"01_id_name_metadata{SEP}01_id_name_metadata{SEP}id_name_metadata.csv") df = pd.read_csv(fname) if as_list: return df['station_id'].to_list() else: return df
@property def static_attribute_categories(self): features = [] path = os.path.join(self.path, f'04_attributes{SEP}04_attributes') for f in os.listdir(path): if os.path.isfile(os.path.join(path, f)) and f.endswith('csv'): f = str(f.split('.csv')[0]) features.append(''.join(f.split('_')[2:])) return features @property def static_features(self) -> List[str]: return self._static_data().columns.tolist() @property def dynamic_features(self) -> list: return [self.dyn_map.get(feat, feat) for feat in list(self.folders[self.version].keys())] + list(self.dyn_generators.keys()) def _static_data(self, #stations, #features, st=None, en=None): #features = check_attributes(features, self.static_features, 'static_features') static_fname = 'CAMELS_AUS_Attributes&Indices_MasterTable.csv' static_fpath = os.path.join(self.path, static_fname) static_df = pd.read_csv(static_fpath, index_col='station_id') static_df.index = static_df.index.astype(str) #static_df = static_df.loc[stations]#[features] if isinstance(static_df, pd.Series): static_df = pd.DataFrame(static_df).transpose() static_df.rename(columns=self.static_map, inplace=True) return static_df def _read_dynamic( self, stations, dynamic_features, st=None, en=None, ): # todo currently it reads the data for all stations even if we want # only one station. This makes it quite slow when we want to get data for only # one station st, en = self._check_length(st, en) dynamic_features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features') dyn_attrs = {} dyn = {} for _attr in list(self.folders[self.version].keys()): _path = os.path.join(self.path, f'{self.folders[self.version][_attr]}{SEP}{_attr}.csv') attr_df = pd.read_csv(_path, na_values=['-99.99']) attr_df.index = pd.to_datetime(attr_df[['year', 'month', 'day']]) [attr_df.pop(col) for col in ['year', 'month', 'day']] dyn_attrs[_attr] = attr_df # making one separate dataframe for one station for idx, stn in enumerate(stations): stn_df = pd.DataFrame() for attr, attr_df in dyn_attrs.items(): # if attr in dynamic_features: stn_df[attr] = attr_df[stn] stn_df.rename(columns=self.dyn_map, inplace=True) for col, fact in self.dyn_factors.items(): if col in stn_df.columns: stn_df[col] = stn_df[col] * fact for new_col, (func, old_col) in self.dyn_generators.items(): if isinstance(old_col, str): if old_col in stn_df.columns: # name of Series to func should be same as station id stn_df[new_col] = func(pd.Series(stn_df[old_col], name=stn)) else: assert isinstance(old_col, tuple) if all([col in stn_df.columns for col in old_col]): # feed all old_cols to the function stn_df[new_col] = func(*[pd.Series(stn_df[col], name=stn) for col in old_col]) if self.verbosity>1 and idx % 100 == 0: print(f"processed {idx} stations") stn_df.index.name = 'time' stn_df.columns.name = 'dynamic_features' dyn[stn] = stn_df.loc[st:en, dynamic_features] return dyn
[docs] class CAMELS_CL(_RainfallRunoff): """ This is a dataset of 516 Chilean catchments with 104 static features and 12 dyanmic features for each catchment. The dyanmic features are timeseries from 1913-02-15 to 2018-03-09. This class downloads and processes CAMELS dataset of Chile following the work of `Alvarez-Garreton et al., 2018 <https://doi.org/10.5194/hess-22-5817-2018>`_ . Examples --------- >>> from aqua_fetch import CAMELS_CL >>> dataset = CAMELS_CL() >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (38374, 12) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 516 # we can get data of 10% catchments as below >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (460488, 51) # the data is multi-index with ``time`` and ``dynamic_features`` as indices >>> df.index.names == ['time', 'dynamic_features'] True # get data by station id >>> _, df = dataset.fetch(stations='8350001', as_dataframe=True) >>> df.unstack().shape (38374, 12) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pet_mm_hargreaves', 'pcp_mm_mswep', 'airtemp_C_mean', 'q_cms_obs']) >>> df.unstack().shape (38374, 4) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (460488, 10) # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='8350001', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape >>> ((1, 104), (38374, 12)) """ urls = { "1_CAMELScl_attributes.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "2_CAMELScl_streamflow_m3s.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "3_CAMELScl_streamflow_mm.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "4_CAMELScl_precip_cr2met.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "5_CAMELScl_precip_chirps.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "6_CAMELScl_precip_mswep.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "7_CAMELScl_precip_tmpa.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "8_CAMELScl_tmin_cr2met.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "9_CAMELScl_tmax_cr2met.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "10_CAMELScl_tmean_cr2met.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "11_CAMELScl_pet_8d_modis.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "12_CAMELScl_pet_hargreaves.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "13_CAMELScl_swe.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "14_CAMELScl_catch_hierarchy.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", "CAMELScl_catchment_boundaries.zip": "https://store.pangaea.de/Publications/Alvarez-Garreton-etal_2018/", } dynamic_features_ = ['streamflow_m3s', 'streamflow_mm', 'precip_cr2met', 'precip_chirps', 'precip_mswep', 'precip_tmpa', 'tmin_cr2met', 'tmax_cr2met', 'tmean_cr2met', 'pet_8d_modis', 'pet_hargreaves', 'swe' ]
[docs] def __init__(self, path: str = None, **kwargs, ): """ Arguments: path: path where the CAMELS-CL dataset has been downloaded. This path must contain five zip files and one xlsx file. """ super().__init__(path=path, **kwargs) self.path = path if not os.path.exists(self.path): os.makedirs(self.path) for _file, url in self.urls.items(): fpath = os.path.join(self.path, _file) if not os.path.exists(fpath) or (os.path.exists(fpath) and self.overwrite): if self.verbosity: print(f"Downloading {_file} from {url + _file} at {fpath}") download(url + _file, self.path, verbosity=self.verbosity) unzip(self.path, verbosity=self.verbosity) self._static_features = self._static_data().columns.tolist() self.dyn_fname = os.path.join(self.path, 'camels_cl_dyn.nc') self._maybe_to_netcdf('camels_cl_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "CAMELScl_catchment_boundaries", "CAMELScl_catchment_boundaries", "catchments_camels_cl_v1.3.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'slope_mean': slope('mkm-1'), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): return { 'streamflow_m3s': observed_streamflow_cms(), 'streamflow_mm': observed_streamflow_mmd(), 'tmin_cr2met': min_air_temp(), 'tmax_cr2met': max_air_temp(), 'tmean_cr2met': mean_air_temp(), 'precip_mswep': total_precipitation_with_specifier('mswep'), 'precip_tmpa': total_precipitation_with_specifier('tmpa'), 'precip_cr2met': total_precipitation_with_specifier('cr2met'), 'precip_chirps': total_precipitation_with_specifier('chirps'), 'pet_hargreaves': total_potential_evapotranspiration_with_specifier('hargreaves'), 'pet_8d_modis': total_potential_evapotranspiration_with_specifier('modis'), } @property def _all_dirs(self): """All the folders in the dataset_directory""" return [f for f in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, f))] @property def start(self): return "19130215" @property def end(self): return "20180309" @property def location(self): return "Chile" @property def static_features(self) -> List[str]: return self._static_features def _static_data(self)->pd.DataFrame: path = os.path.join(self.path, f"1_CAMELScl_attributes{SEP}1_CAMELScl_attributes.txt") df = pd.read_csv(path, sep='\t', index_col='gauge_id') df = df.transpose() df.rename(columns=self.static_map, inplace=True) # remove empty space in index of df df.index = df.index.str.strip() return df @property def dynamic_features(self) -> List[str]: return [self.dyn_map.get(feat, feat) for feat in self.dynamic_features_] @property def _mmd_feature_name(self) -> str: return observed_streamflow_mmd()
[docs] def stn_coords( self, stations: Union[str, List[str]] = "all" ) -> pd.DataFrame: """ returns coordinates of stations as DataFrame with ``long`` and ``lat`` as columns. Parameters ---------- stations : name/names of stations. If not given, coordinates of all stations will be returned. Returns ------- pd.DataFrame :obj:`pandas.DataFrame` with ``long`` and ``lat`` columns. The length of dataframe will be equal to number of stations wholse coordinates are to be fetched. Examples -------- >>> dataset = CAMELS_CL() >>> dataset.stn_coords() # returns coordinates of all stations >>> dataset.stn_coords('12872001') # returns coordinates of station whose id is 912101A >>> dataset.stn_coords(['12872001', '12876004']) # returns coordinates of two stations """ fpath = os.path.join(self.path, '1_CAMELScl_attributes', '1_CAMELScl_attributes.txt') df = pd.read_csv(fpath, sep='\t', index_col='gauge_id') df = df.loc[['gauge_lat', 'gauge_lon'], :].transpose() df.columns = ['lat', 'long'] stations = check_attributes(stations, self.stations(), 'stations') df.index = [index.strip() for index in df.index] return df.loc[stations, :].astype(self.fp)
[docs] def stations(self) -> list: """ Tells all station ids for which a data of a specific attribute is available. """ stn_fname = os.path.join(self.path, 'stations.json') if not os.path.exists(stn_fname): _stations = {} for dyn_attr in self.dynamic_features_: for _dir in self._all_dirs: if dyn_attr in _dir: fname = os.path.join(self.path, f"{_dir}{SEP}{_dir}.txt") df = pd.read_csv(fname, sep='\t', nrows=2, index_col='gauge_id') _stations[dyn_attr] = list(df.columns) stns = list(set.intersection(*map(set, list(_stations.values())))) with open(stn_fname, 'w') as fp: json.dump(stns, fp) else: with open(stn_fname, 'r') as fp: stns = json.load(fp) return stns
def _read_dynamic(self, stations, dynamic_features, st=None, en=None): dyn = {} st, en = self._check_length(st, en) dynamic_features = check_attributes(dynamic_features, self.dynamic_features, 'dynamic_features') assert all(stn in self.stations() for stn in stations) dynamic_features = check_attributes(dynamic_features, self.dynamic_features) # reading all dynnamic features dyn_attrs = {} for attr in self.dynamic_features_: fname = [f for f in self._all_dirs if '_' + attr in f][0] fname = os.path.join(self.path, f'{fname}{SEP}{fname}.txt') df = pd.read_csv(fname, sep='\t', index_col=['gauge_id'], na_values=" ") df.index = pd.to_datetime(df.index) dyn_attrs[attr] = df[st:en] # making one separate dataframe for one station for stn in stations: stn_df = pd.DataFrame() for attr, attr_df in dyn_attrs.items(): # if attr in dynamic_features: stn_df[attr] = attr_df[stn] stn_df.rename(columns=self.dyn_map, inplace=True) stn_df.index.name = 'time' stn_df.columns.name = 'dynamic_features' dyn[stn] = stn_df.loc[st:en, dynamic_features] return dyn
[docs] class CAMELS_CH(_RainfallRunoff): """ Data of 331 Swiss catchments from `Hoege et al., 2023 <https://doi.org/10.5194/essd-15-5755-2023>`_ . The dataset consists of 209 static catchment features and 9 dynamic features. The dynamic features span from 19810101 to 20201231 with daily timestep. For daily (``D``) ``timestep``, only streamflow is available for 170 swiss catchments. The hourly (``H``) streamflow data is obtained from `Kauzlaric et al., 2023 <https://zenodo.org/records/7691294>`_ . Examples --------- >>> from aqua_fetch import CAMELS_CH >>> dataset = CAMELS_CH() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (128560, 10) >>> data.index.names == ['time', 'dynamic_features'] True >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (8036, 9) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 331 # get data by station id >>> _, df = dataset.fetch(stations='2004', as_dataframe=True) >>> df.unstack().shape (8036, 9) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, dynamic_features=['pcp_mm', 'airtemp_C_mean', 'q_cms_obs']) >>> df.unstack().shape (8036, 3) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (72324, 10) # remember this is multi-indexed DataFrame # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='2004', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 209), (8036, 9)) """ url = { 'camels_ch.zip': "https://zenodo.org/record/7957061", 'DischargeDBHydroCH.zip': 'https://zenodo.org/records/7691294' }
[docs] def __init__( self, path=None, overwrite: bool = False, to_netcdf: bool = True, timestep: str = 'D', **kwargs ): """ Parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. overwrite : bool If the data is already down then you can set it to True, to make a fresh download. to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc. but will require netcdf5 package as well as xarry. """ super().__init__(path=path, **kwargs) self.timestep = timestep if timestep == 'D' and 'DischargeDBHydroCH.zip' in self.url: self.url.pop('DischargeDBHydroCH.zip') self._download(overwrite=overwrite) self._dynamic_features = self._read_stn_dyn(self.stations()[0]).columns.tolist() if to_netcdf: self._maybe_to_netcdf('camels_ch_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, 'camels_ch', 'camels_ch', 'catchment_delineations', 'CAMELS_CH_catchments.shp' ) @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'slope_mean': slope('degrees'), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): # table 1 in https://essd.copernicus.org/articles/15/5755/2023/ return { 'discharge_vol(m3/s)': observed_streamflow_cms(), # 'discharge_vol(m3/s)': 'sim_q_cms', 'discharge_spec(mm/d)': observed_streamflow_mmd(), 'temperature_min(°C)': min_air_temp(), 'temperature_max(°C)': max_air_temp(), 'temperature_mean(°C)': mean_air_temp(), 'precipitation(mm/d)': total_precipitation(), 'swe(mm)': snow_water_equivalent(), } @property def camels_path(self) -> Union[str, os.PathLike]: return os.path.join(self.path, 'camels_ch', 'camels_ch') @property def static_path(self) -> Union[str, os.PathLike]: return os.path.join(self.camels_path, 'static_attributes') @property def dynamic_path(self) -> Union[str, os.PathLike]: return os.path.join(self.camels_path, 'time_series', 'observation_based') @property def glacier_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_glacier_attributes.csv') @property def clim_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_climate_attributes_obs.csv') @property def geol_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_geology_attributes.csv') @property def supp_geol_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_geology_attributes_supplement.csv') @property def hum_inf_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_humaninfluence_attributes.csv') @property def hydrogeol_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_hydrogeology_attributes.csv') @property def hydrol_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_hydrology_attributes_obs.csv') @property def lc_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_landcover_attributes.csv') @property def soil_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_soil_attributes.csv') @property def topo_attr_path(self) -> Union[str, os.PathLike]: return os.path.join(self.static_path, 'CAMELS_CH_topographic_attributes.csv') @property def static_features(self): return self._static_data().columns.tolist() @property def dynamic_features(self) -> List[str]: return self._dynamic_features
[docs] def all_hourly_stations(self) -> List[str]: """Names of all stations which have hourly data""" return pd.read_excel( os.path.join(self.path, 'Inventory_discharge_hydroCH.xlsx'), dtype={'ID': str} )['ID'].values.tolist()
[docs] def hourly_stations(self) -> List[str]: """ IDs of those stations which have hourly data and which are also part of CAMELS-CH dataset """ return [stn for stn in self.all_hourly_stations() if stn in self.stations()]
@property def start(self): # start of data return pd.Timestamp('1981-01-01') @property def end(self): # end of data return pd.Timestamp('2020-12-31')
[docs] def stations(self) -> List[str]: """Returns station ids for catchments""" stns = pd.read_csv( self.glacier_attr_path, sep=';', skiprows=1 )['gauge_id'].values.tolist() return [str(stn) for stn in stns]
@property def foen_path(self) -> Union[str, os.PathLike]: return os.path.join(self.path, 'DischargeDBHydroCH', 'DischargeDBHydroCH', 'CH', 'FOEN')
[docs] def foen_stations(self) -> List[str]: """Returns all the stations in the FOEN folder""" return os.listdir(self.foen_path)
def read_hourly_q_ch(self, stn: str) -> pd.DataFrame: stn = f"Q_{stn}_hourly.asc" fname = [fname for fname in self.foen_stations() if stn in fname][0] fpath = os.path.join(self.foen_path, fname) q = pd.read_csv(fpath, sep="\t", parse_dates=[['YYYY', 'MM', 'DD', 'HH']], index_col='YYYY_MM_DD_HH', ) q.index = pd.to_datetime(q.index) q.columns = ['q_cms'] q.index.name = "time" return q
[docs] def glacier_attrs(self) -> pd.DataFrame: """ returns a dataframe with four columns - 'glac_area' - 'glac_vol' - 'glac_mass' - 'glac_area_neighbours' """ df = pd.read_csv( self.glacier_attr_path, sep=';', skiprows=1, index_col='gauge_id', dtype=np.float32 ) df.index = df.index.astype(int).astype(str) return df
[docs] def climate_attrs(self) -> pd.DataFrame: """returns 14 climate attributes of catchments. """ df = pd.read_csv( self.clim_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype={ 'gauge_id': str, 'p_mean': float, 'aridity': float, 'pet_mean': float, 'p_seasonality': float, 'frac_snow': float, 'high_prec_freq': float, 'high_prec_dur': float, 'high_prec_timing': str, 'low_prec_timing': str } ) return df
[docs] def geol_attrs(self) -> pd.DataFrame: """15 geological features""" df = pd.read_csv( self.geol_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype=np.float32 ) df.index = df.index.astype(int).astype(str) return df
[docs] def supp_geol_attrs(self) -> pd.DataFrame: """supplimentary geological features""" df = pd.read_csv( self.supp_geol_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype=np.float32 ) df.index = df.index.astype(int).astype(str) return df
[docs] def human_inf_attrs(self) -> pd.DataFrame: """ 14 athropogenic factors """ df = pd.read_csv( self.hum_inf_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype={ 'gauge_id': str, 'n_inhabitants': int, 'dens_inhabitants': float, 'hp_count': int, 'hp_qturb': float, 'hp_inst_turb': float, 'hp_max_power': float, 'num_reservoir': int, 'reservoir_cap': float, 'reservoir_he': float, 'reservoir_fs': float, 'reservoir_irr': float, 'reservoir_nousedata': float, # 'reservoir_year_first': int, # 'reservoir_year_last': int } ) return df
[docs] def hydrogeol_attrs(self) -> pd.DataFrame: """10 hydrogeological factors""" df = pd.read_csv( self.hydrogeol_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype=float ) df.index = df.index.astype(int).astype(str) return df
[docs] def hydrol_attrs(self) -> pd.DataFrame: """14 hydrological parameters + 2 useful infos""" df = pd.read_csv( self.hydrol_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype={ 'gauge_id': str, 'sign_number_of_years': int, 'q_mean': float, 'runoff_ratio': float, 'stream_elas': float, 'slope_fdc': float, 'baseflow_index_landson': float, 'hfd_mean': float, 'Q5': float, 'Q95': float, 'high_q_freq': float, 'high_q_dur': float, 'low_q_freq': float } ) return df
[docs] def landcolover_attrs(self) -> pd.DataFrame: """13 landcover parameters""" return pd.read_csv( self.lc_attr_path, skiprows=1, sep=';', index_col='gauge_id', dtype={ 'gauge_id': str, 'crop_perc': float, 'grass_perc': float, 'scrub_perc': float, 'dwood_perc': float, 'mixed_wood_perc': float, 'ewood_perc': float, 'wetlands_perc': float, 'inwater_perc': float, 'ice_perc': float, 'loose_rock_perc': float, 'rock_perc': float, 'urban_perc': float, 'dom_land_cover': str } )
[docs] def soil_attrs(self) -> pd.DataFrame: """80 soil parameters""" df = pd.read_csv( self.soil_attr_path, skiprows=1, sep=';', index_col='gauge_id' ) df.index = df.index.astype(int).astype(str) return df
[docs] def topo_attrs(self) -> pd.DataFrame: """topographic parameters""" df = pd.read_csv( self.topo_attr_path, skiprows=1, sep=';', index_col='gauge_id', encoding="unicode_escape" ) df.index = df.index.astype(int).astype(str) return df
def _static_data(self)->pd.DataFrame: df = pd.concat( [ self.climate_attrs(), self.geol_attrs(), self.supp_geol_attrs(), self.glacier_attrs(), self.human_inf_attrs(), self.hydrogeol_attrs(), self.hydrol_attrs(), self.landcolover_attrs(), self.soil_attrs(), self.topo_attrs(), ], axis=1) df.index = df.index.astype(str) df.rename(columns=self.static_map, inplace=True) return df def _read_stn_dyn(self, station: str) -> pd.DataFrame: """ Reads daily dynamic (meteorological + streamflow) data for one catchment and returns as DataFrame """ df = pd.read_csv( os.path.join(self.dynamic_path, f"CAMELS_CH_obs_based_{station}.csv"), sep=';', index_col='date', parse_dates=True, dtype=np.float32 ) df.rename(columns=self.dyn_map, inplace=True) return df
[docs] class CAMELS_DE(_RainfallRunoff): """ This is the data from 1555 German catchments following the work of `Loritz et al., 2024 <https://doi.org/10.5194/essd-16-5625-2024>`_ . The data is downloaded from `zenodo <https://zenodo.org/record/12733968>`_ . This data consists of 155 static and 21 dynamic features. The dynamic features span from 1951-01-01 to 2020-12-31 with daily timestep. Examples -------- >>> from aqua_fetch import CAMELS_DE >>> dataset = CAMELS_DE() >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (25568, 21) get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 1555 get data of 10 % of stations as dataframe >>> _, df = dataset.fetch(0.1, as_dataframe=True) >>> df.shape (536928, 155) The returned dataframe is a multi-indexed data >>> df.index.names == ['time', 'dynamic_features'] True get data by station id >>> _, df = dataset.fetch(stations='DE110260', as_dataframe=True) >>> df.unstack().shape (25568, 21) get names of available dynamic features >>> dataset.dynamic_features get only selected dynamic features >>> _, data = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['airtemp_C_mean', 'rh_%', 'pcp_mm_mean', 'q_cms_obs']) >>> data.unstack().shape (25568, 4) get names of available static features >>> dataset.static_features get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape # remember this is a multiindexed dataframe (536928, 10) If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='DE110260', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 111), (25568, 21)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (1555, 2) >>> dataset.stn_coords('DE110250') # returns coordinates of station whose id is DE110250 47.925221 8.191595 >>> dataset.stn_coords(['DE110250', 'DE110260']) # returns coordinates of two stations """ url = "https://zenodo.org/record/12733968"
[docs] def __init__( self, path=None, overwrite: bool = False, to_netcdf: bool = True, verbosity: int = 1, **kwargs ): """ Parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. overwrite : bool If the data is already down then you can set it to True, to make a fresh download. to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc. but will require netCDF5 package as well as xarray. """ super().__init__(path=path, verbosity=verbosity, **kwargs) self._download(overwrite=overwrite) if to_netcdf and netCDF4 is None: warnings.warn("netCDF4 is not installed. Therefore, the data will not be converted to netcdf format.") to_netcdf = False if to_netcdf: self._maybe_to_netcdf('camels_de_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join(self.path, "camels_de", "CAMELS_DE_catchment_boundaries", "catchments", "CAMELS_DE_catchments.shp") @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'slope_fdc': slope(''), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): # table 1 in https://essd.copernicus.org/articles/16/5625/2024/#&gid=1&pid=1 return { 'discharge_vol': observed_streamflow_cms(), 'discharge_spec': observed_streamflow_mmd(), 'temperature_min': min_air_temp(), 'temperature_max': max_air_temp(), 'temperature_mean': mean_air_temp(), # 'precipitation_mean': 'pcp_mm', 'precipitation_mean': total_precipitation_with_specifier('mean'), # todo: is it mean or total? 'precipitation_median': total_precipitation_with_specifier('median'), 'precipitation_stdev': total_precipitation_with_specifier('std'), 'precipitation_min': total_precipitation_with_specifier('min'), 'precipitation_max': total_precipitation_with_specifier('max'), 'humidity_mean': mean_rel_hum(), 'humidity_median': rel_hum_with_specifier('med'), 'humidity_stdev': rel_hum_with_specifier('std'), 'humidity_min': rel_hum_with_specifier('min'), 'humidity_max': rel_hum_with_specifier('max'), # 'water_level': # observed daily water level, 'radiation_global_stdev': solar_radiation_with_specifier('std'), 'radiation_global_min': solar_radiation_with_specifier('min'), 'radiation_global_median': solar_radiation_with_specifier('med'), 'radiation_global_mean': solar_radiation_with_specifier('mean'), 'radiation_global_max': solar_radiation_with_specifier('max'), } @property def ts_dir(self) -> str: return os.path.join(self.path, 'camels_de', 'timeseries') @property def clim_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_climatic_attributes.csv') @property def hum_infl_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_humaninfluence_attributes.csv') @property def hydrogeol_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_hydrogeology_attributes.csv') @property def hydrol_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_hydrologic_attributes.csv') @property def lc_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_landcover_attributes.csv') @property def sim_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_simulation_benchmark.csv') @property def soil_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_soil_attributes.csv') @property def topo_attr_path(self) -> str: return os.path.join(self.path, 'camels_de', 'CAMELS_DE_topographic_attributes.csv')
[docs] def stations(self) -> List[str]: return [f.split('_')[4].split('.')[0] for f in os.listdir(self.ts_dir)]
def clim_attrs(self) -> pd.DataFrame: return pd.read_csv(self.clim_attr_path, index_col='gauge_id', # dtype=np.float32 ) def hum_infl_attrs(self) -> pd.DataFrame: return pd.read_csv(self.hum_infl_path, index_col='gauge_id', # dtype=np.float32 ) def hydrogeol_attrs(self) -> pd.DataFrame: return pd.read_csv(self.hydrogeol_attr_path, index_col='gauge_id', # dtype=np.float32 ) def hydrol_attrs(self) -> pd.DataFrame: return pd.read_csv(self.hydrol_attr_path, index_col='gauge_id', # dtype=np.float32 ) def lc_attrs(self) -> pd.DataFrame: return pd.read_csv(self.lc_attr_path, index_col='gauge_id', # dtype=np.float32 ) def sim_attrs(self) -> pd.DataFrame: return pd.read_csv(self.sim_attr_path, index_col='gauge_id', # dtype=np.float32 ) def soil_attrs(self) -> pd.DataFrame: return pd.read_csv(self.soil_attr_path, index_col='gauge_id', # dtype=np.float32 ) def topo_attrs(self) -> pd.DataFrame: return pd.read_csv(self.topo_attr_path, index_col='gauge_id', # dtype=np.float32 ) def _static_data(self) -> pd.DataFrame: df = pd.concat([ self.clim_attrs(), self.hum_infl_attrs(), self.hydrogeol_attrs(), self.hydrol_attrs(), self.lc_attrs(), self.sim_attrs(), self.soil_attrs(), self.topo_attrs() ], axis=1) df.rename(columns=self.static_map, inplace=True) return df def _read_stn_dyn(self, station) -> pd.DataFrame: """ Reads daily dynamic (meteorological + streamflow) data for one catchment and returns as DataFrame """ df = pd.read_csv( os.path.join(self.ts_dir, f"CAMELS_DE_hydromet_timeseries_{station}.csv"), # sep=';', index_col='date', parse_dates=True, # dtype=np.float32 ) df.rename(columns=self.dyn_map, inplace=True) return df @property def start(self): return pd.Timestamp('1951-01-01') @property def end(self): return pd.Timestamp('2020-12-31') @property def dynamic_features(self) -> List[str]: return self._read_stn_dyn(self.stations()[0]).columns.tolist() @property def static_features(self) -> List[str]: return self._static_data().columns.tolist() @property def _coords_name(self) -> List[str]: return ['gauge_lat', 'gauge_lon'] @property def _area_name(self) -> str: return 'area' @property def _mmd_feature_name(self) -> str: """Observed catchment-specific discharge (converted to millimetres per day using catchment areas""" return observed_streamflow_mmd()
[docs] class CAMELS_SE(_RainfallRunoff): """ Dataset of 50 Swedish catchments following the works of `Teutschbein et al., 2024 <https://doi.org/10.1002/gdj3.239>`_ . The data is downloaded from Swedish National Data Service `website <https://snd.se/en/catalogue/dataset/2023-173>`_ . The dataset consists of 76 static catchment features and 4 dynamic features. The dynamic features span from 19610101 to 20201231 with daily timestep. Examples -------- >>> from aqua_fetch import CAMELS_SE >>> dataset = CAMELS_SE() >>> _, df = dataset.fetch(stations=1, as_dataframe=True) >>> _, df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (21915, 4) get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 50 get data of 10 % of stations as dataframe >>> _, df = dataset.fetch(0.1, as_dataframe=True) >>> df.shape (87660, 5) The returned dataframe is a multi-indexed data >>> df.index.names == ['time', 'dynamic_features'] True get data by station id >>> _, df = dataset.fetch(stations='5', as_dataframe=True) >>> df.unstack().shape (21915, 4) get names of available dynamic features >>> dataset.dynamic_features get only selected dynamic features >>> _, data = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['q_cms_obs', 'q_mmd_obs', 'pcp_mm', 'airtemp_C_mean']) >>> data.unstack().shape (21915, 4) get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape # remember this is a multiindexed dataframe (87660, 10) If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='5', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 76), (21915, 4)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (50, 2) >>> dataset.stn_coords('5') # returns coordinates of station whose id is GRDC_3664802 68.0356 21.9758 >>> dataset.stn_coords(['5', '200']) # returns coordinates of two stations """ url = { 'catchment properties.zip': 'https://api.researchdata.se/dataset/2023-173/1/file/data?filePath=catchment+properties.zip', 'catchment time series.zip': 'https://api.researchdata.se/dataset/2023-173/1/file/data?filePath=catchment+time+series.zip', 'catchment_GIS_shapefiles.zip': 'https://api.researchdata.se/dataset/2023-173/1/file/data?filePath=catchment_GIS_shapefiles.zip', 'Documentation_2024-01-02.pdf': 'https://api.researchdata.se/dataset/2023-173/1/file/documentation?filePath=Documentation_2024-01-02.pdf' }
[docs] def __init__( self, path: str = None, to_netcdf: bool = True, overwrite: bool = False, verbosity: int = 1, **kwargs ): """ Arguments: path: path where the CAMELS_SE dataset has been downloaded. This path must contain five zip files and one xlsx file. If None, then the data will be downloaded. to_netcdf : """ super().__init__(path=path, verbosity=verbosity, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) for _file, url in self.url.items(): fpath = os.path.join(self.path, _file) if not os.path.exists(fpath) and not overwrite: if verbosity > 0: print(f"Downloading {_file} from {url}") download(url, outdir=self.path, fname=_file, verbosity=self.verbosity) unzip(self.path, verbosity=self.verbosity) else: if self.verbosity > 0: print(f"{_file} at {self.path} already exists") self._static_features = list(set(self._static_data().columns.tolist())) self._stations = self.physical_properties().index.to_list() self._dynamic_features = self._read_stn_dyn(self.stations()[0], nrows=2).columns.tolist() if to_netcdf and netCDF4 is None: warnings.warn("netCDF4 is not installed. Therefore, the data will not be converted to netcdf format.") to_netcdf = False if to_netcdf: self._maybe_to_netcdf('camels_se_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join(self.path, 'catchment_GIS_shapefiles', 'catchment_GIS_shapefiles', 'Sweden_catchments_50_boundaries_WGS84.shp') @property def static_map(self) -> Dict[str, str]: return { 'Area_km2': catchment_area(), 'slope_mean_degree': slope('degrees'), 'Latitude_WGS84': gauge_latitude(), 'Longitude_WGS84': gauge_longitude(), } @property def dyn_map(self): return { 'Qobs_m3s': observed_streamflow_cms(), 'Qobs_mm': observed_streamflow_mmd(), 'Tobs_C': mean_air_temp(), 'Pobs_mm': total_precipitation(), } @property def static_features(self): return self._static_features @property def dynamic_features(self) -> List[str]: return self._dynamic_features @property def properties_path(self): return os.path.join(self.path, 'catchment properties', 'catchment properties') @property def ts_dir(self) -> os.PathLike: return os.path.join(self.path, 'catchment time series', 'catchment time series') @property def _mmd_feature_name(self) -> str: return observed_streamflow_cms() @property def start(self): return pd.Timestamp("19610101") @property def end(self): return pd.Timestamp("20201231")
[docs] def stations(self) -> List[str]: return self._stations
def landcover(self) -> pd.DataFrame: return pd.read_csv( os.path.join(self.properties_path, 'catchments_landcover.csv'), index_col='ID', dtype={'ID': str}) def physical_properties(self) -> pd.DataFrame: return pd.read_csv( os.path.join(self.properties_path, 'catchments_physical_properties.csv'), index_col='ID', dtype={'ID': str}) def soil_classes(self) -> pd.DataFrame: df = pd.read_csv( os.path.join(self.properties_path, 'catchments_soil_classes.csv'), index_col='ID', dtype={'ID': str}) df.columns = [f"{c}_sc" for c in df.columns] return df def hydro_signatures_1961_2020(self) -> pd.DataFrame: df = pd.read_csv( os.path.join(self.properties_path, 'catchments_hydrological_signatures_1961_2020.csv'), index_col='ID', dtype={'ID': str}) df.columns = [f"{c}_hs" for c in df.columns] return df def hydro_signatures_CNP_1961_1990(self) -> pd.DataFrame: df = pd.read_csv( os.path.join(self.properties_path, 'catchments_hydrological_signatures_CNP1_1961_1990.csv'), index_col='ID', dtype={'ID': str}) df.columns = [f"{c}_CNP_61_90" for c in df.columns] return df def hydro_signatures_CNP_1990_2020(self) -> pd.DataFrame: df = pd.read_csv( os.path.join(self.properties_path, 'catchments_hydrological_signatures_CNP2_1991_2020.csv'), index_col='ID', dtype={'ID': str}) df.columns = [f"{c}_CNP_91_20" for c in df.columns] return df def _static_data(self) -> pd.DataFrame: df = pd.concat([ self.landcover(), self.physical_properties(), self.soil_classes(), self.hydro_signatures_1961_2020(), self.hydro_signatures_CNP_1961_1990(), self.hydro_signatures_CNP_1990_2020() ], axis=1) df.rename(columns=self.static_map, inplace=True) return df def _read_stn_dyn(self, station, nrows=None) -> pd.DataFrame: """ Reads daily dynamic (meteorological + streamflow) data for one catchment and returns as DataFrame """ # find file starting with 'catchment_id_stn_id_' in self.path stn_id = f'catchment_id_{station}_' fname = [f for f in os.listdir(self.ts_dir) if f.startswith(stn_id)] assert len(fname) == 1 fname = fname[0] df = pd.read_csv( os.path.join(self.ts_dir, fname), index_col='Year_Month_Day', parse_dates=[['Year', 'Month', 'Day']], dtype={'Qobs_m3s': np.float32, 'Qobs_mm': np.float32, 'Pobs_mm': np.float32, 'Tobs_C': np.float32}, nrows=nrows, ) for old_name, new_name in self.dyn_map.items(): if old_name in df.columns: df.rename(columns={old_name: new_name}, inplace=True) return df
[docs] class CAMELS_DK(_RainfallRunoff): """ This is an updated version of :py class:`aqua_fetch.rr.Caravan_DK` dataset . This dataset was presented by `Liu et al., 2024 <https://doi.org/10.5194/essd-2024-292>`_ and is available at `dataverse <https://dataverse.geus.dk/dataset.xhtml?persistentId=doi:10.22008/FK2/AZXSYP>`_ . This dataset consists of 119 static and 13 dynamic features from 3330 Danish catchments. The dynamic (time series) features span from 1989-01-02 to 2023-12-31 with daily timestep. However, the streamflow observations are available for only 304 catchments. Examples --------- >>> from aqua_fetch import CAMELS_DK >>> dataset = CAMELS_DK() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (166166, 30) # 30 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['54130033'].unstack().shape (12782, 13) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 12782, 'dynamic_features': 13}) >>> len(data.data_vars) 30 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (12782, 13) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 304 # get data by station id >>> _, df = dataset.fetch(stations='54130033', as_dataframe=True) >>> df.unstack().shape (12782, 13) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['Abstraction', 'pet_mm', 'airtemp_C_mean', 'pcp_mm', 'q_cms_obs']) >>> df.unstack().shape (12782, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (166166, 10) # remember this is multi-indexed DataFrame # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='54130033', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 119), (12782, 13)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (304, 2) >>> dataset.stn_coords('54130033') # returns coordinates of station whose id is GRDC_3664802 6131379.493 559057.7232 >>> dataset.stn_coords(['54130033', '13210113']) # returns coordinates of two stations """ url = { 'CAMELS_DK_304_gauging_catchment_boundaries.cpg': 'https://dataverse.geus.dk/api/access/datafile/83017', 'CAMELS_DK_304_gauging_catchment_boundaries.prj': 'https://dataverse.geus.dk/api/access/datafile/83019', 'CAMELS_DK_304_gauging_catchment_boundaries.shp': 'https://dataverse.geus.dk/api/access/datafile/83021', 'CAMELS_DK_304_gauging_catchment_boundaries.dbf': 'https://dataverse.geus.dk/api/access/datafile/83020', 'CAMELS_DK_304_gauging_catchment_boundaries.shx': 'https://dataverse.geus.dk/api/access/datafile/83018', 'CAMELS_DK_304_gauging_stations.cpg': 'https://dataverse.geus.dk/api/access/datafile/83008', 'CAMELS_DK_304_gauging_stations.dbf': 'https://dataverse.geus.dk/api/access/datafile/83010', 'CAMELS_DK_304_gauging_stations.prj': 'https://dataverse.geus.dk/api/access/datafile/83009', 'CAMELS_DK_304_gauging_stations.shp': 'https://dataverse.geus.dk/api/access/datafile/83011', 'CAMELS_DK_304_gauging_stations.shx': 'https://dataverse.geus.dk/api/access/datafile/83007', 'CAMELS_DK_climate.csv': 'https://dataverse.geus.dk/api/access/datafile/83123', 'CAMELS_DK_geology.csv': 'https://dataverse.geus.dk/api/access/datafile/83124', 'CAMELS_DK_georegion.dbf': 'https://dataverse.geus.dk/api/access/datafile/83030', 'CAMELS_DK_georegion.prj': 'https://dataverse.geus.dk/api/access/datafile/83026', 'CAMELS_DK_georegion.sbn': 'https://dataverse.geus.dk/api/access/datafile/83027', 'CAMELS_DK_georegion.sbx': 'https://dataverse.geus.dk/api/access/datafile/83028', 'CAMELS_DK_georegion.shp': 'https://dataverse.geus.dk/api/access/datafile/83029', 'CAMELS_DK_georegion.shx': 'https://dataverse.geus.dk/api/access/datafile/83031', 'CAMELS_DK_landuse.csv': 'https://dataverse.geus.dk/api/access/datafile/83125', 'CAMELS_DK_script.py': 'https://dataverse.geus.dk/api/access/datafile/83135', 'CAMELS_DK_signature_obs_based.csv': 'https://dataverse.geus.dk/api/access/datafile/83131', 'CAMELS_DK_signature_sim_based.csv': 'https://dataverse.geus.dk/api/access/datafile/83132', 'CAMELS_DK_soil.csv': 'https://dataverse.geus.dk/api/access/datafile/83126', 'CAMELS_DK_topography.csv': 'https://dataverse.geus.dk/api/access/datafile/83127', 'Data_description.pdf': 'https://dataverse.geus.dk/api/access/datafile/83138', 'Gauged_catchments.zip': 'https://dataverse.geus.dk/api/access/datafile/83022', 'Ungauged_catchments.zip': 'https://dataverse.geus.dk/api/access/datafile/83025', }
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, **kwargs): """ Parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. overwrite : bool If the data is already down then you can set it to True, to make a fresh download. to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc but will require netcdf5 package as well as xarry. """ super(CAMELS_DK, self).__init__(path=path, **kwargs) self._download(overwrite=overwrite) # self.dyn_fname = os.path.join(self.path, 'camelsdk_dyn.nc') self._static_features = self._static_data().columns.to_list() self._dynamic_features = self._read_csv(self.stations()[0]).columns.to_list() if to_netcdf: self._maybe_to_netcdf('camels_dk_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "CAMELS_DK_304_gauging_catchment_boundaries.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'catch_area': catchment_area(), 'slope_mean': slope('mkm-1'), 'catch_outlet_lat': gauge_latitude(), 'catch_outlet_lon': gauge_longitude(), } @property def dyn_map(self): # table 1 in https://essd.copernicus.org/preprints/essd-2024-292/essd-2024-292.pdf return { 'Qobs': observed_streamflow_cms(), 'temperature': mean_air_temp(), 'precipitation': total_precipitation(), 'pet': total_potential_evapotranspiration(), # todo: should we write method (makkink) 'Qsim': simulated_streamflow_cms(), "DKM_eta": actual_evapotranspiration() } @property def gaug_catch_path(self): return os.path.join(self.path, "Gauged_catchments", "Gauged_catchments") @property def climate_fpath(self): return os.path.join(self.path, "CAMELS_DK_climate.csv") @property def geology_fpath(self): return os.path.join(self.path, "CAMELS_DK_geology.csv") @property def landuse_fpath(self): return os.path.join(self.path, "CAMELS_DK_landuse.csv") @property def soil_fpath(self): return os.path.join(self.path, "CAMELS_DK_soil.csv") @property def topography_fpath(self): return os.path.join(self.path, "CAMELS_DK_topography.csv") @property def signature_obs_fpath(self): return os.path.join(self.path, "CAMELS_DK_signature_obs_based.csv") @property def signature_sim_fpath(self): return os.path.join(self.path, "CAMELS_DK_signature_sim_based.csv") def climate_data(self): df = pd.read_csv(self.climate_fpath, index_col=0) df.index = df.index.astype(str) return df def geology_data(self): df = pd.read_csv(self.geology_fpath, index_col=0) df.index = df.index.astype(str) return df def landuse_data(self): df = pd.read_csv(self.landuse_fpath, index_col=0) df.index = df.index.astype(str) return df def soil_data(self): df = pd.read_csv(self.soil_fpath, index_col=0) df.index = df.index.astype(str) return df def topography_data(self): df = pd.read_csv(self.topography_fpath, index_col=0) df.index = df.index.astype(str) return df def signature_obs_data(self): df = pd.read_csv(self.signature_obs_fpath, index_col=0) df.index = df.index.astype(str) return df def signature_sim_data(self): df = pd.read_csv(self.signature_sim_fpath, index_col=0) df.index = df.index.astype(str) return df def _static_data(self) -> pd.DataFrame: """combination of topographic + soil + landuse + geology + climate features Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (3330, 119) """ df = pd.concat([self.climate_data(), self.geology_data(), self.landuse_data(), self.soil_data(), self.topography_data() ], axis=1) df.rename(columns=self.static_map, inplace=True) return df
[docs] def stations(self) -> List[str]: return [fname.split(".csv")[0].split('_')[4] for fname in os.listdir(self.gaug_catch_path)]
def _read_csv(self, stn: str) -> pd.DataFrame: fpath = os.path.join(self.gaug_catch_path, f"CAMELS_DK_obs_based_{stn}.csv") df = pd.read_csv(os.path.join(fpath), parse_dates=True, index_col='time') df.columns.name = 'dynamic_features' df.pop('catch_id') df = df.astype(np.float32) df.rename(columns=self.dyn_map, inplace=True) # for old_name, new_name in self.dyn_map.items(): # if old_name in df.columns: # df.rename(columns={old_name: new_name}, inplace=True) return df @property def dynamic_features(self) -> List[str]: """returns names of dynamic features""" return self._dynamic_features @property def static_features(self) -> List[str]: """returns static features for Denmark catchments""" return self._static_features @property def _coords_name(self) -> List[str]: return ['catch_outlet_lat', 'catch_outlet_lon'] @property def _area_name(self) -> str: return 'catch_area' @property def _q_name(self) -> str: return observed_streamflow_cms() @property def start(self) -> pd.Timestamp: # start of data return pd.Timestamp('1989-01-02 00:00:00') @property def end(self) -> pd.Timestamp: # end of data return pd.Timestamp('2023-12-31 00:00:00') def _read_dynamic( self, stations, dynamic_features, st=None, en=None) -> dict: st, en = self._check_length(st, en) features = check_attributes(dynamic_features, self.dynamic_features) dyn = {stn: self._read_csv(stn).loc[st:en, features] for stn in stations} return dyn
[docs] def transform_stn_coords(self, df:pd.DataFrame)->pd.DataFrame: ct_m = pd.DataFrame(columns=['lat', 'long'], index=df.index) # Test the function using lat, long in c DataFrame for i in range(0, len(df)): lat, lon = utm_to_lat_lon(df.iloc[i, 1], df.iloc[i, 0], 32) ct_m.iloc[i] = [lat, lon] return ct_m
[docs] def transform_coords(self, coords): """ Transforms the coordinates to the required format. """ # from EPSG:25832 - ETRS89 / UTM zone 32N to WGS84 return coords
[docs] class CAMELS_IND(_RainfallRunoff): """ Dataset of 472 catchments from Republic of India following the works of `Mangukiya et al., 2024 <https://doi.org/10.5194/essd-2024-379>`_. The dataset consists of 210 static catchment features and 20 dynamic features. The dynamic features span from 19800101 to 20201231 with daily timestep. Examples --------- >>> from aqua_fetch import CAMELS_IND >>> dataset = CAMELS_IND() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (299520, 47) # 47 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['17015'].unstack().shape (14976, 20) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 14976, 'dynamic_features': 20}) >>> len(data.data_vars) 47 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (14976, 20) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 472 # get data by station id >>> _, df = dataset.fetch(stations='3001', as_dataframe=True) >>> df.unstack().shape (14976, 20) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs']) >>> df.unstack().shape (14976, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (299520, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='3001', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 210), (14976, 20)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (472, 2) >>> dataset.stn_coords('3001') # returns coordinates of station whose id is 3001 18.3861 80.3917 >>> dataset.stn_coords(['3001', '17021']) # returns coordinates of two stations """ url = "https://zenodo.org/records/13221214"
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, **kwargs): super(CAMELS_IND, self).__init__(path=path, **kwargs) self._download(overwrite=overwrite) names = pd.read_csv( os.path.join(self.static_path, "camels_India_name.txt"), sep=";", index_col=0, dtype={0: str} ) id_str = names.index.to_list() id_int = names.index.astype(int).to_list() self.id_map = {str(k): v for k, v in zip(id_int, id_str)} self._static_features = self._static_data().columns.to_list() self._dynamic_features = self._read_stn_dyn(self.stations()[0]).columns.to_list() if to_netcdf: self._maybe_to_netcdf('camels_ind_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "shapefiles_catchment", "Merged", "all_catchments.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'cwc_area': catchment_area(), 'slope_mean': slope('degrees'), 'cwc_lat': gauge_latitude(), 'cwc_lon': gauge_longitude(), } @property def dyn_map(self): # Table A1 return { # 'streamflow_cms': 'obs_q_cms', 'tmin(C)': min_air_temp(), 'tmax(C)': max_air_temp(), 'tavg(C)': mean_air_temp(), 'prcp(mm/day)': total_precipitation(), 'rel_hum(%)': mean_rel_hum(), 'wind(m/s)': mean_windspeed(), 'wind_u(m/s)': u_component_of_wind(), 'wind_v(m/s)': v_component_of_wind(), # surface downward short-wave radiation flux 'srad_sw(w/m2)': solar_radiation(), # surface downward long-wave radiation flux 'srad_lw(w/m2)': downward_longwave_radiation(), #'sm_lvl2(kg/m2)', # soil moisture of layer 1 (0-0.1 m below ground) #'sm_lvl2(kg/m2)', #'sm_lvl3(kg/m2)', #'sm_lvl4(kg/m2)': , 'pet_gleam(mm/day)': total_potential_evapotranspiration_with_specifier('gleam'), 'pet(mm/day)': total_potential_evapotranspiration(), 'aet_gleam(mm/day)': actual_evapotranspiration_with_specifier('gleam'), #'evap_canopy(kg/m2/s)': evaporation } @property def static_path(self) -> os.PathLike: return os.path.join(self.path, "attributes_txt") @property def q_path(self) -> os.PathLike: return os.path.join(self.path, "streamflow_timeseries") @property def forcings_path(self) -> os.PathLike: return os.path.join(self.path, "catchment_mean_forcings") @property def dynamic_features(self) -> List[str]: """returns names of dynamic features""" return self._dynamic_features @property def static_features(self) -> List[str]: """returns static features for Denmark catchments""" return self._static_features @property def start(self) -> pd.Timestamp: # start of data return pd.Timestamp('1980-01-01') @property def end(self) -> pd.Timestamp: # end of data return pd.Timestamp('2020-12-31') def stn_forcing_path(self, stn: str) -> os.PathLike: return os.path.join( self.forcings_path, self.id_map.get(stn)[0:2], f"{self.id_map.get(stn)}.csv" )
[docs] def stations(self) -> List[str]: """ returns names of stations a list **Node:** 0s are omitted from the start of the station names which means 03001 is returned as 3001 """ stns = pd.read_csv( os.path.join(self.static_path, "camels_India_name.txt"), sep=";", index_col=0, dtype={0: str} ).index.to_list() return [str(int(stn)) for stn in stns]
def _static_data(self) -> pd.DataFrame: """ combination of topographic + soil + landuse + geology + climate + hydro + climate + anthropogenic features Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (3330, 119) """ files = glob.glob(f"{self.static_path}/*.txt") dfs = [] for f in files: df = pd.read_csv(f, sep=";", index_col=0) df.index = df.index.astype(str) dfs.append(df) df = pd.concat(dfs, axis=1) df.rename(columns=self.static_map, inplace=True) return df def _read_q(self, stn: str = None, ) -> pd.DataFrame: """reads observed streamflow data""" fpath = os.path.join(self.q_path, f"streamflow_observed.csv") cols = ['year', 'month', 'day'] if stn is not None: cols.append(stn) else: cols = None df = pd.read_csv(os.path.join(fpath), index_col='year_month_day', parse_dates=[['year', 'month', 'day']], usecols=cols, ) return df.astype(np.float32) def _read_forcings(self, stn: str) -> pd.DataFrame: """reads the foring data for a given station""" fpath = self.stn_forcing_path(stn) df = pd.read_csv(fpath, index_col='year_month_day', parse_dates=[['year', 'month', 'day']], ) return df.astype(np.float32) def _read_stn_dyn(self, stn: str) -> pd.DataFrame: """reads dynamic data for a given station""" q = self._read_q(stn)[stn] q.name = observed_streamflow_cms() df = pd.concat([self._read_forcings(stn), pd.DataFrame(q)], axis=1) for old_name, new_name in self.dyn_map.items(): if old_name in df.columns: df.rename(columns={old_name: new_name}, inplace=True) return df
[docs] class CAMELS_FR(_RainfallRunoff): """ Dataset of 654 catchments from France following the works of `Delaigue et al., 2024 <https://doi.org/10.5194/essd-2024-415>`_. The dataset consists of 344 static catchment features and 22 dynamic features. The dynamic features span from 1970101 to 20211231 with daily timestep. Examples --------- >>> from aqua_fetch import CAMELS_FR >>> dataset = CAMELS_FR() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (166166, 30) # 30 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['J421191001'].unstack().shape (12782, 13) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 460928, 'dynamic_features': 13}) >>> len(data.data_vars) 36 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (18993, 22) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 654 # get data by station id >>> _, df = dataset.fetch(stations='J421191001', as_dataframe=True) >>> df.unstack().shape (18993, 22) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm', 'spechum_gkg', 'airtemp_C_mean', 'pet_mm_pm', 'q_cms_obs']) >>> df.unstack().shape (18993, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (417846, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='J421191001', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 334), (18993, 22)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (654, 2) >>> dataset.stn_coords('J421191001') # returns coordinates of station whose id is J421191001 48.006298 -4.063848 >>> dataset.stn_coords(['J421191001', 'U104401001']) # returns coordinates of two stations J421191001 48.006298 -4.063848 U104401001 173.170761 -34.918594 """ url = { "ADDITIONAL_LICENSES.zip": "https://entrepot.recherche.data.gouv.fr/api/access/datafile/343463", "CAMELS_FR_attributes.zip": "https://entrepot.recherche.data.gouv.fr/api/access/datafile/343464", 'CAMELS_FR_geography.zip': 'https://entrepot.recherche.data.gouv.fr/api/access/datafile/343465', 'CAMELS_FR_time_series.zip': 'https://entrepot.recherche.data.gouv.fr/api/access/datafile/343470', 'README.md': 'https://entrepot.recherche.data.gouv.fr/api/access/datafile/431300', 'CAMELS-FR_description.ods': 'https://entrepot.recherche.data.gouv.fr/api/access/datafile/348740', }
[docs] def __init__(self, path=None, overwrite=False, **kwargs): super().__init__(path=path, **kwargs) self._download(overwrite=overwrite) self._stations = self.__stations() self._static_features = list(set(self._static_data().columns.to_list())) self._dynamic_features = self._read_stn_dyn(self.stations()[0]).columns.to_list() if self.to_netcdf: self._maybe_to_netcdf('camels_fr_dyn')
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, 'CAMELS_FR_geography', 'CAMELS_FR_geography', 'CAMELS_FR_catchment_boundaries.gpkg') @property def static_map(self) -> Dict[str, str]: return { 'hyd_slope_fdc': slope(''), # 'sit_latitude', 'sit_longitude', todo : what is difference between site and guage lat/lon? # gauge latitude/longitude in WGS84 (Hydroportail coordinates) 'sta_x_w84': gauge_longitude(), 'sta_y_w84': gauge_latitude(), # todo: should we use sta_x_w84_snap and sta_y_w84_snap which are # gauge longitude in WGS84 (INRAE's own estimation, snapped on thorical river network) 'sit_area_topo': catchment_area(), } @property def dyn_map(self)->Dict[str, str]: return { # streamflow in liters per second 'tsd_q_l': observed_streamflow_cms(), # streamflow in milimeters per day 'tsd_q_mm': observed_streamflow_mmd(), 'tsd_wind': mean_windspeed(), 'tsd_temp_min': min_air_temp(), # minimum air temperature over the period (18h day-1, 18h day] 'tsd_temp_max': max_air_temp(), # maximum air temperature over the period (18h day-1, 18h day] 'tsd_temp': mean_air_temp(), # mean air temperature over the period (18h day-1, 18h day] # short wave visible radiation over the period (0h day, 0h day+1] 'tsd_rad_ssi': solar_radiation(), # todo: convert from J cm⁻² to W m⁻² # long wave atmospheric radiation over the period (0h day, 0h day+1] 'tsd_rad_dli': downward_longwave_radiation(), # todo : convert from J cm⁻² to W m⁻² # specific air humidity over the period (0h day, 0h day+1] 'tsd_humid': mean_specific_humidity(), # PET over the period (0h day, 0h day+1] (Penman-Monteith method with a modified albedo when snow lies on the ground) 'tsd_pet_pm': total_potential_evapotranspiration_with_specifier('pm'), 'tsd_pet_pe': total_potential_evapotranspiration_with_specifier('pe'), 'tsd_pet_ou': total_potential_evapotranspiration_with_specifier('ou'), # total precipitation (liquid + solid) over the period (6h day, 6h day+1] 'tsd_prec': total_precipitation(), # solid fraction of precipitation over the period (6h day, 6h day+1] 'tsd_prec_solid_frac': total_precipitation_with_specifier('solfrac'), # todo : check its units? } @property def daily_ts_path(self) -> os.PathLike: return os.path.join(self.path, "CAMELS_FR_time_series", "CAMELS_FR_time_series", "daily") @property def attr_path(self) -> os.PathLike: return os.path.join(self.path, "CAMELS_FR_attributes", "CAMELS_FR_attributes") @property def static_attr_path(self) -> os.PathLike: return os.path.join(self.attr_path, "static_attributes") @property def ts_stat_path(self) -> os.PathLike: return os.path.join(self.attr_path, "time_series_statistics") @property def static_features(self) -> List[str]: """returns static features for Denmark catchments""" return self._static_features @property def dynamic_features(self) -> List[str]: """returns names of dynamic features""" return self._dynamic_features @property def start(self) -> pd.Timestamp: # start of data return pd.Timestamp('1970-01-01') @property def end(self) -> pd.Timestamp: # end of data return pd.Timestamp('2021-12-31') def __stations(self) -> List[str]: return pd.read_csv(os.path.join( self.static_attr_path, "CAMELS_FR_human_influences_dams.csv"), sep=";", index_col=0).index.to_list()
[docs] def stations(self) -> List[str]: return self._stations
@property def geog_path(self) -> os.PathLike: return os.path.join(self.path, "CAMELS_FR_geography", "CAMELS_FR_geography")
[docs] def static_attrs(self) -> pd.DataFrame: """ combination of topographic + soil + landuse + geology + climate + hydro + climate + anthropogenic features Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (654, xxxx) """ files = glob.glob(f"{self.static_attr_path}/*.csv") dfs = [] for f in files: df = pd.read_csv(f, sep=";", index_col=0) df.index = df.index.astype(str) if len(df) == 654: dfs.append(df) elif self.verbosity > 1: print(f"skipping {os.path.basename(f)} as it has {len(df)} rows") static_attrs = pd.concat(dfs, axis=1) gen_attrs = pd.read_csv( os.path.join(self.static_attr_path, "CAMELS_FR_site_general_attributes.csv"), sep=";", index_col=0, ) # in gen_attrs the stn_id has lenght of 8 while in static_attrs it is 10 # so adding the last two digits to the gen_attrs _map = {stn[0:-2]: stn for stn in static_attrs.index} gen_attrs = gen_attrs.rename(index=_map) if self.verbosity: for stn in static_attrs.index: if stn not in gen_attrs.index: print(stn, " not found in site_general_attributes.csv") static_attrs = pd.concat([gen_attrs, static_attrs], axis=1) return static_attrs
[docs] def ts_attrs(self) -> pd.DataFrame: """ daily_timeseries statistics of all catchments Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (654, xxxx) """ files = glob.glob(f"{self.ts_stat_path}/*.csv") dfs = [] for f in files: df = pd.read_csv(f, sep=";", index_col=0) df.index = df.index.astype(str) if len(df) == 654: dfs.append(df) elif self.verbosity > 1: print(f"skipping {os.path.basename(f)} as it has {len(df)} rows") return pd.concat(dfs, axis=1)
def _static_data(self) -> pd.DataFrame: """ static attributes plus timeseries statistics Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (654, xxxx) """ static_data = pd.concat([ self.static_attrs(), self.ts_attrs() ], axis=1) # remove duplicated columns df = static_data.loc[:, ~static_data.columns.duplicated()].copy() df.rename(columns=self.static_map, inplace=True) return df def _read_stn_dyn(self, station: str) -> pd.DataFrame: df = pd.read_csv( os.path.join(self.daily_ts_path, f"CAMELS_FR_tsd_{station}.csv"), sep=";", index_col=0, parse_dates=True, comment="#", ) df.rename(columns=self.dyn_map, inplace=True) return df
class CAMELS_SPAT(_RainfallRunoff): """ Dataset of 1426 catchments from North America (USA and Canada) following the works of `Knoben et al., 2025 <https://doi.org/10.5194/egusphere-2025-893>`_. """ stn_name = 'USA_14141500' time_step = 'obs-hourly' # or obs_daily scale = 'macro-scale' # or headwater or meso-scale data_type = 'observations' # or url = { f'https://www.frdr-dfdr.ca/repo/files/1/published/publication_1211/submitted_data/{data_type}/{scale}/{time_step}/{stn_name}_hourly_flow_observations.nc', }
[docs] class CAMELS_NZ(_RainfallRunoff): """ Dataset of 369 catchments from New Zealand following the works of `Harrigan et al., 2025 <https://doi.org/10.5194/essd-2025-244>`_. The dataset consists of 39 static catchment features and 5 dynamic features. The dynamic features span from 19720101 to 20240802 with hourly timestep. The data is downloaded from `figshare <https://doi.org/10.26021/canterburynz.28827644>`_. Examples --------- >>> from aqua_fetch import CAMELS_NZ >>> dataset = CAMELS_NZ() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (2304640, 36) # 36 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['74321'].unstack().shape (460928, 5) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 460928, 'dynamic_features': 5}) >>> len(data.data_vars) 36 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (460928, 5) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 369 # get data by station id >>> _, df = dataset.fetch(stations='74321', as_dataframe=True) >>> df.unstack().shape (460928, 5) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs']) >>> df.unstack().shape (460928, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (2304640, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='74321', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 39), (460928, 5)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (369, 2) >>> dataset.stn_coords('74321') # returns coordinates of station whose id is 74321 -45.945599 170.101486 >>> dataset.stn_coords(['74321', '802']) # returns coordinates of two stations 74321 -45.945599 170.101486 802 -34.918594 173.170761 """ url = "https://figshare.canterbury.ac.nz/ndownloader/articles/28827644/versions/1"
[docs] def __init__(self, path:Union[str, os.PathLike]=None, timestep = 'H', **kwargs): super().__init__(name="CAMELS_NZ", path=path, timestep=timestep, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) if not os.path.exists(os.path.join(self.path, 'camels_nz.zip')) and not self.overwrite: download( outdir=self.path, url=self.url, fname="camels_nz.zip", verbosity=self.verbosity, ) unzip(self.path, verbosity=self.verbosity) # unzip the .zip files which are inside the camels_nz folder unzip(os.path.join(self.path, 'camels_nz'), verbosity=self.verbosity) if self.to_netcdf: self._maybe_to_netcdf('camels_nz_dyn')
@property def boundary_file(self)-> os.PathLike: return os.path.join( self.shapefile_path, "catnz_SpatialJoin.shp" ) @property def dyn_map(self)->Dict[str, str]: return { 'flow': observed_streamflow_cms(), 'temperature': mean_air_temp(), 'Relative_humidity': mean_rel_hum(), 'precipitation': total_precipitation(), 'PET': total_potential_evapotranspiration(), } @property def static_map(self) -> Dict[str, str]: return { 'latitude': gauge_latitude(), 'longitude': gauge_longitude(), 'uparea': catchment_area(), 'elevation': gauge_elevation_meters(), 'usAveSlope': slope('degrees') } @property def start(self) -> pd.Timestamp: return pd.Timestamp('1972-01-01 00:00:00') @property def end(self) -> pd.Timestamp: return pd.Timestamp('2024-08-02 09:00:00') @property def dynamic_features(self) -> List[str]: """returns names of dynamic features""" return [self.dyn_map[feature] for feature in self._path_map] @property def static_features(self) -> List[str]: """returns static features for New Zealand catchments""" return self._static_data(nrows=2).columns.to_list()
[docs] def stations(self)->List[str]: fpath = os.path.join(self.static_path, '4.CAMELS_NZ_Geology.csv') df = pd.read_csv(fpath, index_col=0, usecols=[0, 1]) return df.index.astype(str).tolist()
@property def temp_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_Temperature') @property def precip_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_Precipitation') @property def q_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_Streamflow') @property def shapefile_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_Catchment_Boundaries') @property def pet_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_PET') @property def rh_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_Relative_Humidity') @property def static_path(self) -> os.PathLike: return os.path.join(self.path, 'camels_nz', 'CAMELS_NZ_Catchment_Atrributes') def _static_data(self, nrows:int = None) -> pd.DataFrame: """ static attributes of catchments Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (369, 39) """ dfs = [] idx = 0 # read all .csv files in static_path for csv_file in glob.glob(os.path.join(self.static_path, '*.csv')): df = pd.read_csv(csv_file, index_col=0, nrows=nrows) df.index = df.index.astype(str) if idx > 0: df.drop(columns=['RID', 'StationName', 'latitude', 'longitude'], inplace=True, errors='ignore') dfs.append(df) idx += 1 static_data = pd.concat(dfs, axis=1) static_data.rename(columns=self.static_map, inplace=True) return static_data @property def _nodata_stns(self): """data from following stations is not available. The corresponding files are empty.""" return ['75253', "75261", "75265", "75276", "75294", "15408", "15410", "15453", "33356", "52916", "74318", "74321", "1114629"] def _read_dynamic_para( self, stations:Union[str, List[str]] = "all", para_name:str = "PET", )-> pd.DataFrame: """ reads dynamic data for a given parameter for given stations. """ assert para_name in list(self._path_map.keys()) cpus = self.processes or min(get_cpus(), 32) stations = check_attributes(stations, self.stations(), 'stations') start = time.time() if cpus == 1: q_dfs = [] for _, stn in enumerate(stations): stn_q = self._read_stn_dyn_para(stn, para_name) q_dfs.append(stn_q) if self.verbosity and _ % 100 == 0: print(f"Read {len(q_dfs)} stations so far...") else: with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( self._read_stn_dyn_para, stations, (para_name for _ in range(len(stations))) ) q_dfs = [stn_q for stn_q in results] total = time.time() - start if self.verbosity: print(f"Read {len(q_dfs)} stations for {para_name} in {total:.2f} seconds with {cpus} cpus.") q_df = pd.concat(q_dfs, axis=1) return q_df @property def _path_map(self) -> Dict[str, os.PathLike]: return { 'PET': self.pet_path, 'precipitation': self.precip_path, 'Relative_humidity': self.rh_path, 'temperature': self.temp_path, 'flow': self.q_path, } def _read_stn_dyn_para(self, stn:str, para_name:str) -> pd.Series: """ read dynamic data for a given station and parameter. """ stn_q = pd.Series(dtype=np.float32, name=stn) fname = { 'Relative_humidity': 'RH' } fpath = os.path.join( self._path_map[para_name], f'{fname.get(para_name, para_name)}_station_id_{stn}.csv') if os.path.exists(fpath): if para_name == 'flow' and stn in self._nodata_stns: return stn_q try: stn_q = pd.read_csv(fpath, index_col=0, parse_dates=True, na_values=['NA ']) except pd.errors.EmptyDataError: print(f"Warning: {para_name}_station_id_{stn}.csv is empty. Skipping station {stn}.") return stn_q format = '%m/%d/%Y %H:%M' if para_name == 'flow' and stn == '57521': format = '%d/%m/%Y %H:%M' stn_q.index = pd.to_datetime(stn_q.index, format=format) stn_q = stn_q[para_name].astype(np.float32).rename(stn) else: if self.verbosity>1: print(f"Warning: {para_name}_station_id_{stn}.csv does not exist. Skipping station {stn}.") stn_q = pd.Series(dtype=np.float32, name=stn) # remove rows with duplicated index, ideally there should not be any stn_q = stn_q[~stn_q.index.duplicated(keep='first')] return stn_q def _read_stn_dyn(self, stn:str)->pd.DataFrame: """ reads dynamic data for a given station """ stn_dfs = [] for para in self._path_map.keys(): stn_para = self._read_stn_dyn_para(stn, para) stn_dfs.append(stn_para.rename(para, inplace=True)) stn_df = pd.concat(stn_dfs, axis=1) stn_df.index = pd.to_datetime(stn_df.index) # convert the temperature to Celcius from Kelvin stn_df['temperature'] = stn_df['temperature'] - 273.15 stn_df.rename(columns=self.dyn_map, inplace=True) return stn_df
[docs] class CAMELS_COL(_RainfallRunoff): """ Dataset of 347 catchments from Colombia following the works of `Jimenez et al., 2025 <https://doi.org/10.5194/essd-2025-200>`_. The dataset consists of 255 static catchment features and 6 dynamic features. The dynamic features span from 19810101 to 20221231 with daily timestep. The data is downloaded from `Zenodo <https://zenodo.org/records/15554735>`_. Examples --------- >>> from aqua_fetch import CAMELS_COL >>> dataset = CAMELS_COL() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (92040, 34) # 34 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['35067040'].unstack().shape (15340, 5) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 15340, 'dynamic_features': 6}) >>> len(data.data_vars) 34 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (15340, 6) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 347 # get data by station id >>> _, df = dataset.fetch(stations='35067040', as_dataframe=True) >>> df.unstack().shape (15340, 6) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs']) >>> df.unstack().shape (15340, 6) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (2304640, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='35067040', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 255), (15340, 6)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (347, 2) >>> dataset.stn_coords('35067040') # returns coordinates of station whose id is 35067040 4.746433 -73.587807 >>> dataset.stn_coords(['35067040', '21187030']) # returns coordinates of two stations 35067040 4.746433 -73.587807 21187030 4.203826 -75.092720 """ url = "https://zenodo.org/records/15554735"
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, **kwargs): super(CAMELS_COL, self).__init__( path=path, to_netcdf=to_netcdf, **kwargs) self._download(overwrite=overwrite) if self.to_netcdf: self._maybe_to_netcdf(f'camels_lux_dyn')
@property def bbox(self) -> Dict[str, float]: return dict(llcrnrlon=-80, llcrnrlat=-5, urcrnrlon=-65, urcrnrlat=12) @property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "03_CAMELS_COL_Basin_boundary", "03_CAMELS_COL_Basin_boundary", "CAMELS_COL_catchments_boundaries.shp" ) @property def dyn_map(self) -> Dict[str, str]: """ dynamic features map for CAMELS-LUX catchments """ return { 'streamflow': observed_streamflow_cms(), 'pr': total_precipitation(), # CHIRPS V2 't_mean': mean_air_temp(), 't_min': min_air_temp(), 't_max': max_air_temp(), 'poten_evapo': total_potential_evapotranspiration(), } @property def static_map(self) -> Dict[str, str]: return { 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), 'area': catchment_area(), 'gauge_elev': gauge_elevation_meters(), 'perimeter': catchment_perimeter(), } @property def ts_path(self) -> os.PathLike: return os.path.join( self.path, "04_CAMELS_COL_Hydrometeorological_data", "04_CAMELS_COL_Hydrometeorological_data", )
[docs] def stations(self) -> List[str]: return [fname[14:22] for fname in os.listdir(self.ts_path)]
@property def dynamic_features(self) -> List[str]: df = self._read_stn_dyn(self.stations()[0], nrows=2) return df.columns.to_list() @property def static_features(self) -> List[str]: # todo : calling this method again and again can be slow """ returns static features for Colombia catchments """ df = self._static_data() return df.columns.to_list() @property def start(self) -> pd.Timestamp: return pd.Timestamp('1981-01-01') @property def end(self) -> pd.Timestamp: return pd.Timestamp('2022-12-31') def _read_stn_dyn(self, stn:str, nrows=None)->pd.DataFrame: """ reads dynamic data for a given station """ stn_df = pd.read_csv( os.path.join(self.ts_path, f"Hydromet_data_{stn}.txt.txt"), sep='\t', index_col=0, parse_dates=True, nrows=nrows, ) stn_df.index = pd.to_datetime(stn_df.index) if stn_df.index.has_duplicates: print(f"Warning: {stn} has duplicated index. Removing duplicates.") stn_df.rename(columns=self.dyn_map, inplace=True) return stn_df def _soil_data(self) -> pd.DataFrame: """ reads 07_CAMELS_COL_Soil_characteristics.xlsx file """ df = pd.read_excel( os.path.join(self.path, "07_CAMELS_COL_Soil_characteristics.xlsx"), index_col=0, dtype={0: str}, ).T df.index = [name.split('_')[1] for name in df.index] return df def _lc_data(self) -> pd.DataFrame: """ reads 06_CAMELS_COL_Land_cover_characteristics.xlsx file """ df = pd.read_excel( os.path.join(self.path, "06_CAMELS_COL_Land_cover_characteristics.xlsx"), index_col=0, dtype={0: str}, ).T df.index = [name.split('_')[1] for name in df.index] df = df.dropna(axis=1, how='all') return df def _geol_data(self) -> pd.DataFrame: """ reads 05_CAMELS_COL_Geology_characteristics.xlsx file """ df = pd.read_excel( os.path.join(self.path, "05_CAMELS_COL_Geologic_characteristics.xlsx"), index_col=0, dtype={0: str}, usecols="D:MM", ).T df.index = [name.split('_')[1] for name in df.index] df = df.dropna(axis=1, how='all') return df def _static_data(self) -> pd.DataFrame: """ static attributes of catchments Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (347, 255) """ dfs = [] idx = 0 # read all .csv files for xlsx_file in [ '02_CAMELS_COL_Catchment_information', '08_CAMELS_COL_Climatic_indices', '09_CAMELS_COL_Hydrological_signatures', '10_CAMELS_COL_Physiograpic_characteristics']: #if not csv_file.endswith('basin_id.csv'): df = pd.read_excel(os.path.join(self.path, f"{xlsx_file}.xlsx"), index_col=0, dtype={0: str}) df.index = df.index.astype(str) dfs.append(df) idx += 1 static_data = pd.concat(dfs, axis=1) soil = self._soil_data() lc = self._lc_data() geol = self._geol_data() static_data = pd.concat([static_data, soil, lc, geol], axis=1) static_data.rename(columns=self.static_map, inplace=True) for col, fac in self.static_factors.items(): if col in static_data.columns: static_data[col] *= fac # convert latitude and longitude from EPSG:3395 to EPSG:4326 R = 6378137.0 # Earth's radius in meters for EPSG:3395 static_data[gauge_longitude()] = np.degrees(static_data[gauge_longitude()] / R) static_data[gauge_latitude()] = np.degrees(2 * np.arctan(np.exp(static_data[gauge_latitude()] / R)) - np.pi / 2) return static_data
[docs] class CAMELS_SK(_RainfallRunoff): """ Dataset of 178 catchments from South Korea following the work of `Kim et al., 2025 <https://doi.org/10.5281/zenodo.15073263>`_. The dataset consists of 215 static catchment features and 17 dynamic features. The dynamic features span from 20000101 to 20191231 with hourly timestep. Examples --------- >>> from aqua_fetch import CAMELS_SK >>> dataset = CAMELS_SK() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (2980440, 17) # 17 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['2013615'].unstack().shape (175320, 13) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 175320, 'dynamic_features': 17}) >>> len(data.data_vars) 17 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (175320, 17) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 178 # get data by station id >>> _, df = dataset.fetch(stations='2013615', as_dataframe=True) >>> df.unstack().shape (175320, 17) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['total_precipitation', 'snow_depth', 'air_temp_obs', 'potential_evaporation', 'q_cms_obs']) >>> df.unstack().shape (175320, 17) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (155225, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='2013615', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 215), (175320, 17)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (178, 2) >>> dataset.stn_coords('2013615') # returns coordinates of station whose id is 2013615 35.880798 128.173096 >>> dataset.stn_coords(['2013615', '2017620']) # returns coordinates of two stations 2013615 35.880798 128.173096 2017620 35.527500 128.359207 """ url = "https://zenodo.org/records/15073264"
[docs] def __init__(self, path=None, timestep:str = "H", to_netcdf: bool = True, **kwargs): super(CAMELS_SK, self).__init__( path=path, timestep=timestep, to_netcdf=to_netcdf, **kwargs) self._download(overwrite=self.overwrite) if self.to_netcdf: self._maybe_to_netcdf(f'camels_sk_dyn_{self.timestep}') self._unzip_7z_files()
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "shp", "KFM_bas.shp" ) @property def start(self) -> pd.Timestamp: """ start of data """ return pd.Timestamp('2000-01-01') @property def end(self) -> pd.Timestamp: """ end of data """ return pd.Timestamp('2019-12-31 23:59:59') @property def ts_path(self) -> os.PathLike: return os.path.join(self.path, "timeseries", "timeseries") @property def static_features(self) -> List[str]: return self._static_data(nrows=2).columns.to_list() @property def dynamic_features(self) -> List[str]: return self._read_stn_dyn(self.stations()[0], nrows=2).columns.to_list()
[docs] def stations(self) -> List[str]: """ returns names of stations as a list """ return [fname.split('.')[0] for fname in os.listdir(self.ts_path)]
@property def static_map(self) -> Dict[str, str]: return { 'Area': catchment_area(), 'Area_HydroATLAS': catchment_area_with_specifier('hydroatlas'), 'Lon_gage': gauge_longitude(), 'Lat_gage': gauge_latitude(), } @property def dyn_map(self) -> Dict[str, str]: """ dynamic features map for CAMELS-SK catchments """ return { # todo not sure about the units of these features # 'total_precipitation': total_precipitation(), # 'temperature_2m': mean_air_temp_with_specifier('2m'), # 'dewpoint_temperature_2m': mean_dewpoint_temperature_at_2m(), # 'potential_evaporation': total_potential_evapotranspiration(), # 'u_component_of_wind_10m': u_component_of_wind(), # 'v_component_of_wind_10m': v_component_of_wind(), # 'surface_net_solar_radiation': solar_radiation(), # 'air_temp_obs': mean_air_temp(), # 'precip_obs': total_precipitation(), # 'wind_sp_obs': mean_windspeed(), 'streamflow': observed_streamflow_cms(), } def _unzip_7z_files(self): # The attributes file is .7z file try: import py7zr except (ModuleNotFoundError, ImportError): raise ImportError('py7zr is required to extract the .7z files. Please install it using `pip install py7zr`') if not os.path.exists(self.boundary_file): fpath = os.path.join(self.path, 'shp.7z') with py7zr.SevenZipFile(fpath, mode='r') as z: z.extractall(path = self.path) print(f'Extracted {fpath}') return def _read_stn_dyn(self, stn:str, nrows=None)->pd.DataFrame: """ reads dynamic data for a given station """ stn_df = pd.read_csv( os.path.join(self.ts_path, f"{stn}.csv"), index_col=0, parse_dates=True, nrows=nrows, ) stn_df.index = pd.to_datetime(stn_df.index) if stn_df.index.has_duplicates: print(f"Warning: {stn} has duplicated index. Removing duplicates.") stn_df.rename(columns=self.dyn_map, inplace=True) return stn_df def _static_data(self, nrows:int = None) -> pd.DataFrame: """ static attributes of catchments Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (178, 239) """ dfs = [] idx = 0 # read all .csv files in static_path for csv_file in glob.glob(os.path.join(self.path, '*.csv')): df = pd.read_csv(csv_file, index_col=0, nrows=nrows) df.index = df.index.astype(str) if df.index.duplicated().any(): if self.verbosity > 1: print(f"Warning: Duplicated indices found in {csv_file}. Dropping duplicates.") df = df.drop_duplicates() if csv_file.endswith('HydroATLAS.csv'): val_stns = [stn for stn in df.index if stn in self.stations()] df = df.loc[val_stns, :] dfs.append(df) idx += 1 static_data = pd.concat(dfs, axis=1) # remove duplicated columns static_data = static_data.loc[:, ~static_data.columns.duplicated()].copy() static_data.rename(columns=self.static_map, inplace=True) return static_data
[docs] class CAMELS_LUX(_RainfallRunoff): """ Dataset of 56 catchments from Luxembourg following the work of `Nijzink et al., 2025 <https://doi.org/10.5194/essd-2024-482>`_. The dataset consists of 61 static catchment features and 25 dynamic features. The dynamic features span from 20040101 to 20211231 with daily, hourly, and 15-minute timesteps. The data is downloaded from `Zenodo <https://zenodo.org/records/14910359>`_. Examples --------- >>> from aqua_fetch import CAMELS_LUX >>> dataset = CAMELS_LUX() >>> _, data = dataset.fetch(0.5, as_dataframe=True) >>> data.shape (155225, 28) # 28 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['ID_02'].unstack().shape (6209, 13) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 6209, 'dynamic_features': 25}) >>> len(data.data_vars) 5 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (6209, 25) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 56 # get data by station id >>> _, df = dataset.fetch(stations='ID_02', as_dataframe=True) >>> df.unstack().shape (6209, 25) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm_station', 'rh_%', 'airtemp_C_mean', 'pet_mm_pm', 'q_cms_obs']) >>> df.unstack().shape (6209, 25) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (155225, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='ID_02', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 61), (6209, 25)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (56, 2) >>> dataset.stn_coords('ID_02') # returns coordinates of station whose id is ID_02 49.586288 6.14908 >>> dataset.stn_coords(['ID_01', 'ID_02']) # returns coordinates of two stations ID_01 49.526478 6.114957 ID_02 49.586288 6.14908 """ url = "https://zenodo.org/records/14910359"
[docs] def __init__(self, path=None, timestep:str = 'D', overwrite=False, to_netcdf: bool = True, **kwargs): assert timestep in ['D', 'H', '15Min'], "timestep must be one of ['D', 'H', '15Min']" super(CAMELS_LUX, self).__init__( path=path, timestep=timestep, to_netcdf=to_netcdf, **kwargs) self._download(overwrite=overwrite) # if self.to_netcdf: self._maybe_to_netcdf(f'camels_lux_dyn_{self.timestep}')
@property def static_features(self) -> List[str]: return self._static_data().columns.to_list() @property def dynamic_features(self) -> List[str]: df = self._read_stn_dyn(self.stations()[0], nrows=2) return df.columns.to_list()
[docs] def stations(self) -> List[str]: """ returns names of stations a list """ return pd.read_csv( os.path.join(self.path, "CAMELS-LUX", "basin_id.csv"), header=None, index_col=0, dtype={0: str} ).index.to_list()
@property def boundary_file(self): return os.path.join( self.path, "CAMELS-LUX_shapefiles", "catchments_CAMELS-LUX.shp" ) @property def static_map(self) -> Dict[str, str]: return { 'Lat': gauge_latitude(), 'Lon': gauge_longitude(), 'area_km2': catchment_area(), 'SLOPE_MEAN': slope('degree'), 'Z_MEAN': catchment_elevation_meters(), 'grassland': grass_fraction(), 'agricultural_land': crop_fraction(), 'urban': urban_fraction(), 'perimeter_km': catchment_perimeter(), } @property def static_factors(self) -> Dict[str, float]: """ static factors for CAMELS-LUX catchments """ return { urban_fraction(): 0.01, grass_fraction(): 0.01, crop_fraction(): 0.01, } @property def dyn_map(self) -> Dict[str, str]: """ dynamic features map for CAMELS-LUX catchments """ return { 'Q': observed_streamflow_cms(), 'Qspec': observed_streamflow_mmd(), 'RR_rad': total_precipitation_with_specifier('radar'), 'RR_stn': total_precipitation_with_specifier('station'), 'tp': total_precipitation_with_specifier('era5'), 't2m': mean_air_temp(), 'PET_Oudin': total_potential_evapotranspiration_with_specifier('oudin'), 'PET_PM': total_potential_evapotranspiration_with_specifier('pm'), 'q': mean_specific_humidity(), # todo : convert from kg/kg -> g/kg 'rh': mean_rel_hum(), 'ws10500': mean_windspeed(), 'swvl1': soil_moisture_layer1(), 'swvl2': soil_moisture_layer2(), 'swvl3': soil_moisture_layer3(), 'swvl4': soil_moisture_layer4(), } @property def start(self) -> pd.Timestamp: return pd.Timestamp('2004-01-01') @property def end(self) -> pd.Timestamp: return pd.Timestamp('2021-12-31') @property def ts_path(self) -> os.PathLike: return os.path.join(self.path, "CAMELS-LUX", "timeseries") @property def topo_fpath(self) -> os.PathLike: return os.path.join(self.path, "CAMELS-LUX", "CAMELS_LUX_topographic_attributes.csv") @property def daily_ts_path(self) -> os.PathLike: return os.path.join(self.ts_path, "daily") @property def hourly_ts_path(self) -> os.PathLike: return os.path.join(self.ts_path, "hourly") @property def subhourly_ts_path(self) -> os.PathLike: return os.path.join(self.ts_path, "15Min") def _static_data(self) -> pd.DataFrame: """ static attributes of catchments Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (56, 61) """ dfs = [] idx = 0 # read all .csv files for csv_file in glob.glob(os.path.join(self.path, 'CAMELS-LUX', '*.csv')): if not csv_file.endswith('basin_id.csv'): df = pd.read_csv(csv_file, index_col=0, dtype={0: str}) df.index = df.index.astype(str) dfs.append(df) idx += 1 static_data = pd.concat(dfs, axis=1) static_data.rename(columns=self.static_map, inplace=True) # static_factors should be called after renaming the columns for col, fac in self.static_factors.items(): if col in static_data.columns: static_data[col] *= fac return static_data def _read_stn_dyn(self, stn:str, nrows=None)->pd.DataFrame: """ reads dynamic data for a given station """ ts_path = { 'D': self.daily_ts_path, 'H': self.hourly_ts_path, '15Min': self.subhourly_ts_path } stn_df = pd.read_csv( os.path.join(ts_path[self.timestep], f"CAMELS_LUX_hydromet_timeseries_{stn}.csv"), index_col=0, parse_dates=True, nrows=nrows, ) stn_df.index = pd.to_datetime(stn_df.index) if stn_df.index.has_duplicates: print(f"Warning: {stn} has duplicated index. Removing duplicates.") # drop rows with duplicated index, ideally there should not be any if self.timestep == '15Min': stn_df = stn_df[~stn_df.index.duplicated(keep='first')] stn_df.rename(columns=self.dyn_map, inplace=True) return stn_df
class CAMELS_DEBY(_RainfallRunoff): """ lumped and gridded data at hourly and daily timestep for 210 Bavarian (Germany) catchments following the work of `Anwar et al., 2025 <https://doi.org/10.5281/zenodo.14893685>`_. """ class HYD_Responses(_RainfallRunoff): """ `von Matt et al., 2025 <https://doi.org/10.5281/zenodo.14713274> """ class CAMELS_ES(_RainfallRunoff): """ """ url = "https://zenodo.org/records/15040948"
[docs] class CAMELS_FI(_RainfallRunoff): """ Dataset of 320 Finnish catchments with 16 dynamic features and 106 static features. The dynamic features span from 19610101 to 20231231 with daily timestep. The data is downloaded from `Zenodo <https://zenodo.org/records/16257216>`_. Examples --------- >>> from aqua_fetch import CAMELS_FI >>> dataset = CAMELS_FI() >>> _, data = dataset.fetch(0.1, as_dataframe=True) >>> data.shape (368160, 32) # 32 represents number of stations Since data is a multi-index dataframe, we can get data of one station as below >>> data['1156'].unstack().shape (23010, 16) If we don't set as_dataframe=True, then the returned data will be a xarray Dataset >>> _, data = dataset.fetch(0.1) >>> type(data) xarray.core.dataset.Dataset >>> data.dims FrozenMappingWarningOnValuesAccess({'time': 23010, 'dynamic_features': 16}) >>> len(data.data_vars) 32 >>> _, df = dataset.fetch(stations=1, as_dataframe=True) # get data of only one random station >>> df = df.unstack() # the returned dataframe is a multi-indexed dataframe so we have to unstack it >>> df.shape (23010, 16) # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 320 # get data by station id >>> _, df = dataset.fetch(stations='1156', as_dataframe=True) >>> df.unstack().shape (23010, 16) # get names of available dynamic features >>> dataset.dynamic_features # get only selected dynamic features >>> _, df = dataset.fetch(1, as_dataframe=True, ... dynamic_features=['pcp_mm', 'snowdepth_m', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs']) >>> df.unstack().shape (23010, 5) # get names of available static features >>> dataset.static_features # get data of 10 random stations >>> _, df = dataset.fetch(10, as_dataframe=True) >>> df.shape (368160, 10) # remember this is multi-indexed DataFrame # If we want to get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='1156', static_features="all", as_dataframe=True) >>> static.shape, dynamic.unstack().shape ((1, 106), (23010, 16)) >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (320, 2) >>> dataset.stn_coords('1156') # returns coordinates of station whose id is 1156 62.253101 24.444099 >>> dataset.stn_coords(['1156', '1116']) # returns coordinates of two stations 1156 62.253101 24.444099 1116 60.385201 22.355301 """ url = "https://zenodo.org/records/16257216"
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, **kwargs): super(CAMELS_FI, self).__init__( path=path, to_netcdf=to_netcdf, **kwargs) self._download(overwrite=overwrite) self._unzip_boundaries() self._maybe_to_netcdf(f'camels_fi_dyn')
@property def data_path(self) -> os.PathLike: return os.path.join(self.path, "CAMELS-FI", "CAMELS-FI", "data") @property def boundary_path(self) -> os.PathLike: return os.path.join(self.data_path, "CAMELS_FI_catchment_boundaries") @property def boundary_file(self) -> os.PathLike: return os.path.join( self.boundary_path, "CAMELS_FI_catchment_boundaries.shp" ) @property def ts_path(self) -> os.PathLike: return os.path.join( self.data_path, "timeseries", )
[docs] def stations(self) -> List[str]: return [ fname.split('.')[0].split('_')[4] for fname in os.listdir(self.ts_path) if fname.endswith('.csv') ]
@property def dyn_map(self) -> Dict[str, str]: """ dynamic features map for CAMELS-FI catchments """ return { 'discharge_vol': observed_streamflow_cms(), 'discharge_spec': observed_streamflow_mmd(), 'precipitation': total_precipitation(), 'pet': total_potential_evapotranspiration(), 'temperature_min': min_air_temp(), 'temperature_mean': mean_air_temp(), 'temperature_max': max_air_temp(), 'humidity_rel': mean_rel_hum(), 'snow_depth': snow_depth(), # change from cm to m 'swe': snow_water_equivalent_with_specifier('era5'), 'swe_cci3-1': snow_water_equivalent_with_specifier('cci3-1'), } @property def static_map(self) -> Dict[str, str]: return { 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), 'area': catchment_area(), 'slope': slope('percent'), #'slope_fdc': catchment_elevation_meters(), 'aridity': aridity_index(), 'grass_perc_2000': grass_fraction_with_specifier('2000'), 'urban_perc_2000': urban_fraction_with_specifier('2000'), 'crop_perc_2000': crop_fraction_with_specifier('2000'), 'grass_perc_2006': grass_fraction_with_specifier('2006'), 'urban_perc_2006': urban_fraction_with_specifier('2006'), 'crop_perc_2006': crop_fraction_with_specifier('2006'), 'grass_perc_2012': grass_fraction_with_specifier('2012'), 'urban_perc_2012': urban_fraction_with_specifier('2012'), 'crop_perc_2012': crop_fraction_with_specifier('2012'), 'grass_perc_2018': grass_fraction_with_specifier('2018'), 'urban_perc_2018': urban_fraction_with_specifier('2018'), 'crop_perc_2018': crop_fraction_with_specifier('2018'), 'elev_gauge': gauge_elevation_meters(), 'elev_50': med_catchment_elevation_meters(), 'soil_depth': soil_depth(), 'dens_inhabitants': population_density(), } @property def static_factors(self) -> Dict[str, float]: """ static factors for CAMELS-LUX catchments """ return { grass_fraction_with_specifier('2000'): 0.01, urban_fraction_with_specifier('2000'): 0.01, crop_fraction_with_specifier('2000'): 0.01, grass_fraction_with_specifier('2006'): 0.01, urban_fraction_with_specifier('2006'): 0.01, crop_fraction_with_specifier('2006'): 0.01, grass_fraction_with_specifier('2012'): 0.01, urban_fraction_with_specifier('2012'): 0.01, crop_fraction_with_specifier('2012'): 0.01, grass_fraction_with_specifier('2018'): 0.01, urban_fraction_with_specifier('2018'): 0.01, crop_fraction_with_specifier('2018'): 0.01, } @property def start(self) -> pd.Timestamp: """ start of data """ return pd.Timestamp('1961-01-01') @property def end(self) -> pd.Timestamp: """ end of data """ return pd.Timestamp('2023-12-31') def _static_data(self) -> pd.DataFrame: """ static attributes of catchments Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of static features of all catchments of shape (320, 106) """ csv_files = glob.glob(os.path.join(self.data_path, '*.csv')) dfs = [] for csv_file in csv_files: df = pd.read_csv( csv_file, index_col=0, dtype={0: str} ) df.index = df.index.astype(str) dfs.append(df) static_data = pd.concat(dfs, axis=1) static_data.rename(columns=self.static_map, inplace=True) # static_factors should be called after renaming the columns for col, fac in self.static_factors.items(): if col in static_data.columns: static_data[col] *= fac return static_data def _read_stn_dyn(self, stn:str, nrows=None)->pd.DataFrame: """ reads dynamic data for a given station """ fpath = os.path.join( self.ts_path, f"CAMELS_FI_hydromet_timeseries_{stn}_19610101-20231231.csv") df = pd.read_csv(fpath, index_col=0, parse_dates=True, nrows=nrows) df.index = pd.to_datetime(df.index) if df.index.has_duplicates: print(f"Warning: {stn} has duplicated index. Removing duplicates.") df.rename(columns=self.dyn_map, inplace=True) return df def _unzip_boundaries(self): if not os.path.exists(self.boundary_path): zip_file = os.path.join(self.data_path, "CAMELS_FI_catchment_boundaries.zip") if os.path.exists(zip_file): if self.verbosity: print(f"Unzipping boundary file {os.path.basename(zip_file)} to {self.data_path}") with zipfile.ZipFile(zip_file, 'r') as zip_ref: zip_ref.extractall(self.boundary_path) else: raise FileNotFoundError(f"Boundary file {zip_file} not found.") return
class CAMELSH(_RainfallRunoff): """ Dataset of ~5,000 catchments from united states of america following the work of `Tran et al., (2025) <https://doi.org/10.1038/s41597-025-05612-6>`_ . Please note that usage of this dataset requires xarray and netCDF4 libraries. """ url = { "Hourly2.zip": "https://zenodo.org/records/16729675", # contains observed q and water level "timeseries_nonobs.7z": "https://zenodo.org/records/15070091", # contains NLDAS forcing data "attributes.7z": "https://zenodo.org/records/15066778", "info.csv": "https://zenodo.org/records/15066778", "shapefiles.7z": "https://zenodo.org/records/15066778" } def __init__(self, path=None, overwrite=False, **kwargs, ): super(CAMELSH, self).__init__( path=path, timestep="H", **kwargs) for fname, url in self.url.items(): fpath = os.path.join(self.path, fname) if not os.path.exists(fpath) or overwrite: download_and_unzip(self.path, url, include=[fname], verbosity=self.verbosity) @property def h2_path(self) -> os.PathLike: return os.path.join(self.path, "Hourly2", "Hourly2") @property def attr_path(self) -> os.PathLike: return os.path.join(self.path, "attributes") @property def sf_path(self) -> os.PathLike: return os.path.join(self.path, "shapefiles") @property def boundary_file(self) -> os.PathLike: return os.path.join( self.sf_path, "CAMELSH_shapefile.shp" ) def _read_stn_dyn(self, stn): fpath = os.path.join(self.h2_path, f"{stn}_hourly.csv") if not os.path.exists(fpath): raise FileNotFoundError(f"Dynamic data for station {stn} not found in {self.h2_path}") ds = xr.open_dataset(fpath, engine='netcdf4') return ds def _static_data(self) -> pd.DataFrame: """ reads static data for all stations """ csv_files = glob.glob(os.path.join(self.attr_path, '*.csv')) dfs = [] for csv_file in csv_files: df = pd.read_csv( csv_file, index_col=0, dtype={0: str} ) df.index = df.index.astype(str) dfs.append(df) return pd.concat(dfs)