__all__ = [
"EStreams",
"_EStreams",
"Finland",
"Ireland",
"Italy",
"Poland",
"Portugal",
"Slovenia"
]
import os
import time
import warnings
import urllib.parse
from io import StringIO
from datetime import datetime
import concurrent.futures as cf
from urllib.error import HTTPError
from typing import Union, List, Dict
from concurrent.futures import ProcessPoolExecutor
import requests
import numpy as np
import pandas as pd
try:
import xml.etree.ElementTree as ET
except (ModuleNotFoundError, ImportError):
ET = None
from .._backend import xarray as xr
from ..utils import get_cpus
from ..utils import validate_attributes
from .utils import _RainfallRunoff
from ._map import (
catchment_area,
gauge_latitude,
gauge_longitude,
slope
)
from ._map import (
min_air_pressure,
total_precipitation,
mean_potential_evapotranspiration,
mean_rel_hum,
mean_air_temp,
mean_windspeed,
max_air_temp,
min_air_temp,
solar_radiation,
observed_streamflow_cms,
)
# todo : a lot of methods in subclasses of _EStreams are redundant
[docs]
class EStreams(_RainfallRunoff):
"""
Handles EStreams data following the work of
`Nascimento et al., 2024 <https://doi.org/10.1038/s41597-024-03706-1>`_ .
The data is available at its `zenodo repository <https://zenodo.org/records/13961394>`_ .
It should be noted that this dataset does not contain observed streamflow data.
It has 17130 stations, 9 dynamic (meteorological) features with daily timestep, 27 dynamic
features with yearly timestep and 214 static features. The dynamic features
are from 1950-01-01 to 2023-06-30.
Examples
--------
>>> from aqua_fetch import EStreams
>>> dataset = EStreams()
"""
url = "https://zenodo.org/records/13961394"
[docs]
def __init__(self, path=None, **kwargs):
super().__init__(path, **kwargs)
self._download()
self.md = self.gauge_stations()
self.md.rename(columns={'area_estreams': catchment_area()}, inplace=True)
self._stations = self.__stations()
self._dynamic_features = None # lazy loading, will be loaded when first accessed
self._static_features = None # lazy loading, will be loaded when first accessed
self.bbox = {"llcrnrlat": 30, "urcrnrlat": 80, "llcrnrlon": -30, "urcrnrlon": 60}
self.parallels = range(30, 80, 15)
self.meridians = range(-30, 60, 20)
@property
def boundary_file(self) -> os.PathLike:
return os.path.join(self.path2,
"shapefiles",
"estreams_catchments.shp")
@property
def path2(self):
return os.path.join(self.path, 'EStreams', 'EStreams')
@property
def dynamic_features(self) -> List[str]:
if self._dynamic_features is None:
self._dynamic_features = self.meteo_data_station('IEEP0281').columns.tolist()
return self._dynamic_features
@property
def static_features(self):
if self._static_features is None:
self._static_features = self._static_data().columns.tolist()
return self._static_features
@property
def start(self) -> pd.Timestamp:
return pd.Timestamp('1950-01-01')
@property
def end(self) -> pd.Timestamp:
return pd.Timestamp('2023-06-30')
@property
def static_map(self) -> Dict[str, str]:
return {
'area_estreams': catchment_area(),
'lat': gauge_latitude(),
'slope_sawicz': slope('no_unit'),
'lon': gauge_longitude(),
}
@property
def dyn_map(self):
return {
't_min': min_air_temp(),
't_max': max_air_temp(),
't_mean': mean_air_temp(),
'p_mean': total_precipitation(),
'pet_mean': mean_potential_evapotranspiration(),
'rh_mean': mean_rel_hum(),
'sp_min': min_air_pressure(),
'swr_mean': solar_radiation(),
'ws_mean': mean_windspeed()
}
@property
def nc_path(self):
return os.path.join(self.path, 'EStreams', 'meteorology.nc')
def _static_data(self) -> pd.DataFrame:
"""
Returns a dataframe with static attributes of catchments
"""
static_path = os.path.join(self.path2, 'attributes', 'static_attributes')
dfs = [self.hydro_clim_sigs(), self.md.copy()]
for f in os.listdir(static_path):
if f.endswith('.csv'):
df = pd.read_csv(os.path.join(static_path, f), index_col='basin_id', dtype={'basin_id': str})
dfs.append(df)
df = pd.concat(dfs, axis=1)
df.rename(columns=self.static_map, inplace=True)
df.columns.name = 'static_features'
df.index.name = 'station_id'
return df
[docs]
def gauge_stations(self) -> pd.DataFrame:
"""
reads the file estreams_gauging_stations.csv as dataframe
"""
df = pd.read_csv(
os.path.join(self.path2, 'streamflow_gauges', 'estreams_gauging_stations.csv'),
index_col='basin_id',
dtype={'basin_id': str}
)
return df
[docs]
def stations(self) -> List[str]:
"""
Returns a list of all station names. Note that the `basin_id` column is
used as the station name.
"""
return self._stations
def __stations(self) -> List[str]:
df = pd.read_csv(
os.path.join(self.path2, 'streamflow_gauges', 'estreams_gauging_stations.csv'),
usecols=['basin_id', 'lat'],
dtype={'basin_id': str}
)
df.set_index('basin_id', inplace=True)
return df.index.tolist()
@property
def countries(self) -> List[str]:
"""
returns the names of 39 countries covered by EStreams as list
"""
return self.md.loc[:, 'gauge_country'].unique().tolist()
[docs]
def country_of_stn(self, stn: str) -> str:
"""find the agency to which a station belongs """
return self.md.loc[stn, 'gauge_country']
[docs]
def country_stations(self, country: str) -> List[str]:
"""returns the station ids from a particular country"""
return self.md[self.md['gauge_country'] == country].index.tolist()
[docs]
def stn_coords(self, stations: List[str] = "all", countries: List[str] = "all") -> pd.DataFrame:
"""
Returns the coordinates of one or more stations
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame` of shape (stations, 2)
Examples
--------
>>> from aqua_fetch import EStreams
>>> dataset = EStreams()
>>> dataset.stn_coords('IEEP0281')
>>> dataset.stn_coords(['IEEP0281', 'IEEP0282'])
>>> dataset.stn_coords(countries='IE')
"""
stations = self._get_stations(countries, stations)
df = pd.read_csv(
os.path.join(self.path2, 'streamflow_gauges', 'estreams_gauging_stations.csv'),
usecols=['basin_id', 'lat', 'lon'],
dtype={'basin_id': str}
)
df.set_index('basin_id', inplace=True)
df.rename(columns={'lon': 'long'}, inplace=True)
return df.loc[stations]
def _get_stations(self, countries: List[str] = "all", stations: List[str] = "all") -> List[str]:
if countries != "all" and stations != 'all':
raise ValueError("Either provide countries or stations not both")
if countries != "all":
countries = validate_attributes(countries, self.countries, 'countries')
stations = self.md[self.md['gauge_country'].isin(countries)].index.tolist()
else:
stations = validate_attributes(stations, self.stations(), 'stations')
return stations
[docs]
def area(self, stations: List[str] = "all", countries: List[str] = "all") -> pd.Series:
"""area of catchments im km2"""
stations = self._get_stations(countries, stations)
return self.md.loc[stations, catchment_area()]
[docs]
def meteo_data_station(self, station: str) -> pd.DataFrame:
"""
Returns the meteorological data of a single station.
Parameters
----------
station : str
name/id of station of which to extract the data
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame` of meteorological data of shape (time, 9)
"""
if os.path.exists(self.nc_path) and xr is not None:
if self.verbosity > 2:
print(f"Reading {station} from {self.nc_path}")
ds = xr.open_dataset(self.nc_path)
df = ds[station].to_pandas()
ds.close()
return df
df = pd.read_csv(
os.path.join(self.path2, 'meteorology', f'estreams_meteorology_{station}.csv'),
index_col='date',
parse_dates=True
)
df.columns.name = 'dynamic_features'
df.index.name = 'time'
df.rename(columns=self.dyn_map, inplace=True)
return df
[docs]
def meteo_data(
self,
stations: Union[str, List[str]] = "all",
countries: Union[List[str], str] = "all"
):
"""
Returns the meteorological data of one or more stations
either as dictionary of dataframes or xarray Dataset
"""
stations = self._get_stations(countries, stations)
out = self._metedo_data_all_stations()
if isinstance(out, dict):
return {stn: out[stn] for stn in stations}
return out[stations]
def _metedo_data_all_stations(self):
"""
Returns the meteorological data of all stations
"""
if self.to_netcdf and os.path.exists(self.nc_path):
if self.verbosity > 1:
print(f"Reading from {self.nc_path}")
# todo :with xarray 2025.1.2 it is causing following error, however tested successfully with xarray 2025.10.1
# ValueError: Failed to decode variable 'time': unable to decode time units 'days since 1950-01-01 00:00:00' with "calendar 'proleptic_gregorian'".
return xr.open_dataset(self.nc_path)
cpus = self.processes or max(get_cpus() - 2, 1)
stations = self.stations()
meteo_vars = {}
if self.verbosity:
print(f"Fetching meteorological data of {len(stations)} stations using {cpus} cpus")
start = time.time()
with cf.ProcessPoolExecutor(cpus) as exe: # takes ~500 secs with 110 cpus
dfs = exe.map(self.meteo_data_station, stations)
if self.verbosity:
print(f"Fetching meteorological data took {time.time() - start:.2f} seconds")
if self.to_netcdf:
for stn, df in zip(self.stations(), dfs):
meteo_vars[stn] = df
encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in meteo_vars.keys()}
meteo_vars = xr.Dataset(meteo_vars)
if self.verbosity: print(f"Saving to {self.nc_path}")
meteo_vars.to_netcdf(self.nc_path, encoding=encoding)
return meteo_vars
[docs]
def hydro_clim_sigs(
self,
stations: List[str] = "all",
countries: List[str] = "all"
) -> pd.DataFrame:
"""
Returns the hydro-climatic signatures of one or more stations
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame` of hydro-climatic signatures of shape (stations, 31)
"""
stations = self._get_stations(countries, stations)
df = pd.read_csv(
os.path.join(
self.path2,
'hydroclimatic_signatures',
'estreams_hydrometeo_signatures.csv'),
index_col='basin_id',
dtype={'basin_id': str}
)
return df.loc[stations, :]
[docs]
def fetch_stn_dynamic_features(
self,
station: str,
dynamic_features='all',
st:Union[str, pd.Timestamp] = None,
en:Union[str, pd.Timestamp] = None,
) -> pd.DataFrame:
"""
Fetches all or selected dynamic features of one station.
Parameters
----------
station : str
name/id of station of which to extract the data
features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
dynamic features are returned.
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame` of shape (n, features) where n is the number of days
Examples
--------
>>> from aqua_fetch import EStreams
>>> camels = EStreams()
>>> camels.fetch_stn_dynamic_features('IEEP0281')
>>> camels.dynamic_features
>>> camels.fetch_stn_dynamic_features('IEEP0281',
... features=['p_mean', 't_mean', 'pet_mean'])
"""
features = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
st, en = self._check_length(st, en)
return self.meteo_data_station(station).loc[st:en, features]
[docs]
def fetch_dynamic_features(
self,
stations: Union[List[str], str] = "all",
dynamic_features='all',
st=None,
en=None,
as_dataframe=False,
countries: Union[str, List[str]] = "all",
):
"""Fetches all or selected dynamic features of one station.
Parameters
----------
stations : str
name/id of station of which to extract the data
features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
dynamic features are returned.
st : Optional (default=None)
start time from where to fetch the data.
en : Optional (default=None)
end time untill where to fetch the data
as_dataframe : bool, optional (default=False)
if true, the returned data is :obj:`pandas.DataFrame` otherwise it
is :obj:`xarray.Dataset`
Examples
--------
>>> from aqua_fetch import EStreams
>>> camels = EStreams()
>>> camels.fetch_dynamic_features('IEEP0281', as_dataframe=True)
>>> camels.dynamic_features
>>> camels.fetch_dynamic_features('IEEP0281',
... features=['p_mean', 't_mean', 'pet_mean'],
... as_dataframe=True)
"""
stations = self._get_stations(countries, stations)
features = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
if len(stations) == 1:
if as_dataframe:
stn_df = self.fetch_stn_dynamic_features(stations[0], features, st, en)
return {stations[0]: stn_df}
else:
return xr.Dataset({stations[0]: xr.DataArray(self.fetch_stn_dynamic_features(stations[0], features, st, en))})
if as_dataframe:
data = {stn:self.fetch_stn_dynamic_features(stn, features, st, en) for stn in stations}
# raise NotImplementedError("as_dataframe=True is not implemented yet")
return data
return self.meteo_data(stations).sel(dynamic_features=features)
[docs]
class _EStreams(_RainfallRunoff):
"""
Parent/Helper class for those datasets which use static and dynamic data from EStreams.
It handles specifically following classes
- :py:class:`aqua_fetch.Finland`
- :py:class:`aqua_fetch.Ireland`
- :py:class:`aqua_fetch.Italy`
- :py:class:`aqua_fetch.Poland`
- :py:class:`aqua_fetch.Portugal`
- :py:class:`aqua_fetch.Slovenia`
"""
[docs]
def __init__(
self,
path: Union[str, os.PathLike] = None,
estreams_path: Union[str, os.PathLike] = None,
overwrite: bool = False,
verbosity: int = 1,
**kwargs):
super().__init__(path, verbosity=verbosity, overwrite=overwrite, **kwargs)
if not os.path.exists(self.path):
os.makedirs(self.path)
if estreams_path is None:
if self.verbosity:
print(f"estreams_path is not provided, using {os.path.dirname(self.path)} as default")
self.estreams_path = os.path.dirname(self.path)
else:
self.estreams_path = estreams_path
self.estreams = EStreams(path=self.estreams_path, overwrite=overwrite, verbosity=verbosity)
self.md = self.estreams.md.loc[self.estreams.md['gauge_country'] == self.country_name]
self._stations = self.estreams.country_stations(self.country_name)
self.boundary_file = self.estreams.boundary_file
self.bndry_id_map_ = self.estreams.bndry_id_map_.copy()
@property
def dynamic_features(self) -> List[str]:
return [observed_streamflow_cms()] + self.estreams.dynamic_features
@property
def static_features(self) -> List[str]:
return self.estreams.static_features
@property
def country_name(self) -> str:
return NotImplementedError
@property
def _coords_name(self) -> List[str]:
return ['lat', 'lon']
@property
def _area_name(self) -> str:
# area_official is the area given by data provides but it contains nans
# supposing that estreams people's method is better/updated one
return catchment_area()
@property
def _q_name(self) -> str:
return observed_streamflow_cms()
@property
def start(self) -> pd.Timestamp:
return pd.Timestamp('1950-01-01')
@property
def end(self) -> pd.Timestamp:
return pd.Timestamp('2023-06-30')
[docs]
def stations(self) -> List[str]:
"""
Returns a list of all station names. Note that the `basin_id` column is
used as the station name.
"""
return self._stations
[docs]
def gauge_id_basin_id_map(self)->dict:
"""
For example for Portugal, it is
guage_id : '03J/02H'
basin_id 'PT000001'
'03J/02H' -> 'PT000001'
for Slovenia, it is
gauge id : 1060
basin_id : SI000001
'1060' -> 'SI000001'
"""
return {k:v for v,k in self.md['gauge_id'].to_dict().items()}
def _fetch_dynamic_features(
self,
stations: list,
dynamic_features='all',
st=None,
en=None,
as_dataframe=False,
):
"""Fetches dynamic features of station."""
st, en = self._check_length(st, en)
features = validate_attributes(dynamic_features, self.dynamic_features.copy(), 'dynamic_features')
daily_q = None
if observed_streamflow_cms() in features:
daily_q = self.fetch_q(as_dataframe)
if isinstance(daily_q, xr.Dataset):
daily_q = daily_q.sel(time=slice(st, en))[stations]
else:
daily_q = daily_q.loc[st:en, stations]
features.remove(observed_streamflow_cms())
if len(features) == 0:
if isinstance(daily_q, pd.DataFrame): # as_dataframe is True so
daily_q = {stn:pd.DataFrame(daily_q[stn].rename(observed_streamflow_cms())) for stn in daily_q.columns}
return daily_q
# stations_ = [f"{stn}_{self.agency_name}" for stn in stations]
data = self.estreams.fetch_dynamic_features(stations, features, st, en, as_dataframe)
if daily_q is not None:
if isinstance(daily_q, xr.Dataset):
assert isinstance(data, xr.Dataset), "xarray dataset not supported"
data = data.rename({stn: stn.split('_')[0] for stn in data.data_vars})
# first create a new dimension in daily_q named dynamic_features
daily_q = daily_q.expand_dims({'dynamic_features': [observed_streamflow_cms()]})
data = xr.concat([data, daily_q], dim='dynamic_features').sel(time=slice(st, en))
else:
assert isinstance(data, dict)
data = {stn.split('_')[0]: stn_data for stn, stn_data in data.items()}
for stn,meteo_df in data.items():
stn_df = pd.concat([meteo_df, daily_q[stn].rename(observed_streamflow_cms())], axis=1)
data[stn] = stn_df
else:
if isinstance(data, xr.Dataset):
data = data.rename({stn: stn.split('_')[0] for stn in data.data_vars})
else:
assert isinstance(data, dict)
data = {stn.split('_')[0]: stn_data for stn, stn_data in data.items()}
return data
def _fetch_static_features(
self,
station="all",
static_features: Union[str, list] = 'all',
**kwargs
) -> pd.DataFrame:
"""Fetches static features of station."""
if self.verbosity > 1:
print('fetching static features')
stations = validate_attributes(station, self.stations(), 'stations')
# stations_ = [f"{stn}_{self.agency_name}" for stn in stations]
static_feats = self.estreams.fetch_static_features(stations, static_features).copy()
# static_feats.index = [stn.split('_')[0] for stn in static_feats.index]
return static_feats
[docs]
def fetch_stations_features(
self,
stations: list,
dynamic_features: Union[str, list, None] = 'all',
static_features: Union[str, list, None] = None,
st=None,
en=None,
as_dataframe: bool = False,
**kwargs
):
"""
returns features of multiple stations
Returns
-------
tuple
A tuple of static and dynamic features. Static features are always
returned as :obj:`pandas.DataFrame` with shape (stations, static features).
The index of static features' DataFrame is the station/gauge ids while the columns
are names of the static features. Dynamic features are returned either as
:obj:`xarray.Dataset` or a python dictionary whose keys are station names and values
are :obj:`pandas.DataFrame` depending upon whether `as_dataframe`
is True or False and whether the :obj:`xarray` library is installed or not.
If dynamic features are :obj:`xarray.Dataset`, then this dataset consists of `data_vars`
equal to the number of stations and station names as :obj:`xarray.Dataset.variables`
and `time` and `dynamic_features` as dimensions and coordinates.
Examples
--------
>>> from aqua_fetch import Arcticnet
>>> dataset = Arcticnet()
>>> stations = dataset.stations()
>>> features = dataset.fetch_stations_features(stations)
"""
stations = validate_attributes(stations, self.stations(), 'stations')
static, dynamic = None, None
if xr is None:
if not as_dataframe:
if self.verbosity: warnings.warn("xarray module is not installed so as_dataframe will have no effect. "
"Dynamic features will be returned as pandas DataFrame")
as_dataframe = True
if dynamic_features is not None:
dynamic = self._fetch_dynamic_features(stations=stations,
dynamic_features=dynamic_features,
as_dataframe=as_dataframe,
st=st,
en=en,
**kwargs
)
if static_features is not None: # we want both static and dynamic
static = self._fetch_static_features(station=stations,
static_features=static_features,
**kwargs
)
elif static_features is not None:
# we want only static
static = self._fetch_static_features(
station=stations,
static_features=static_features,
**kwargs
)
else:
raise ValueError(f"""
static features are {static_features} and dynamic features are also {dynamic_features}""")
return static, dynamic
[docs]
def fetch_static_features(
self,
stations: Union[str, List[str]] = "all",
static_features: Union[str, List[str]] = "all",
countries: List[str] = "all",
) -> pd.DataFrame:
"""
returns static atttributes of one or multiple stations
Parameters
----------
stations : str
name/id of station of which to extract the data
static_features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
static features are returned.
Examples
---------
>>> from aqua_fetch import Japan
>>> dataset = Japan()
get the names of stations
>>> stns = dataset.stations()
>>> len(stns)
12004
get all static data of all stations
>>> static_data = dataset.fetch_static_features(stns)
>>> static_data.shape
(12004, 27)
get static data of one station only
>>> static_data = dataset.fetch_static_features('01010070')
>>> static_data.shape
(1, 27)
get the names of static features
>>> dataset.static_features
get only selected features of all stations
>>> static_data = dataset.fetch_static_features(stns, ['Drainage_Area_km2', 'Elevation_m'])
>>> static_data.shape
(12004, 2)
"""
# stations = self.estreams._get_stations(countries, stations)
return self._fetch_static_features(stations, static_features)
START_YEAR = 2012
# todo : why q for only 239 stations is downloaded and others return HTTPError, it is
# due to wrong fromatting error in pd.read_csv?
# better to save all the data downloaded i.e. water level and temperature as well
[docs]
class Finland(_EStreams):
"""
Data of 669 catchments of Finland.
The observed streamflow data is downloaded from
https://wwwi3.ymparisto.fi .
The meteorological data, stattic catchment
features and catchment boundaries are
taken from :py:class:`aqua_fetch.EStreams` follwoing the works
of `Nascimento et al., 2024 <https://doi.org/10.5194/hess-25-471-2021>`_ . Therefore,
the number of static features are 214 and dynamic features are 10 and the
data is available from 2012-01-01 to 2023-06-30.
Examples
---------
>>> from aqua_fetch import Finland
>>> dataset = Finland()
>>> _, data = dataset.fetch(0.1) # the returned data will be a xarray Dataset
>>> type(data)
xarray.core.dataset.Dataset
>>> data.dims
FrozenMappingWarningOnValuesAccess({'time': 4199, 'dynamic_features': 10})
>>> len(data.data_vars) # number of stations for which data has been fetched
66
>>> _, data = dataset.fetch(stations=1) # get data of only one random station
# get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
669
# get data by station id
>>> _, data = dataset.fetch(stations='FI000001')
# get names of available dynamic features
>>> dataset.dynamic_features
# get only selected dynamic features
>>> _, data = dataset.fetch(1,
... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs'])
# get names of available static features
>>> dataset.static_features
# get data of 10 random stations
>>> _, data = dataset.fetch(10)
>>> len(data.data_vars)
10
# If we want to get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='FI000001', static_features="all")
>>> static.shape, len(dynamic.data_vars)
((1, 214), 1)
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(669, 2)
>>> dataset.stn_coords('FI000001') # returns coordinates of station whose id is FI000001
64.226288 27.736528
>>> dataset.stn_coords(['FI000001', 'FI000002']) # returns coordinates of two stations
FI000001 64.226288 27.736528
FI000002 64.226288 27.736528
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
estreams_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, estreams_path=estreams_path, verbosity=verbosity, **kwargs)
self.bbox = {'llcrnrlat': 59.0, 'urcrnrlat': 70.0, 'llcrnrlon': 19.0, 'urcrnrlon': 31.0}
self.parallels = range(59, 70, 2)
self.meridians = range(19, 31, 2)
@property
def country_name(self)->str:
return 'FI'
@property
def start(self)->pd.Timestamp:
return pd.Timestamp('2012-01-01')
[docs]
def gauge_id_basin_id_map(self)->dict:
# guage_id '5902650'
# basin_id 'FI000001'
# '5902650' -> 'FI000001'
return {k:v for v,k in self.md['gauge_id'].to_dict().items()}
def basin_id_gauge_id_map(self)->dict:
# guage_id '5902650'
# basin_id 'FI000001'
# 'FI000001' -> '5902650'
return self.md['gauge_id'].to_dict()
[docs]
def fetch_q(self, as_dataframe:bool=True, overwrite:bool=False):
"""
downloads (if not already downloaded) and returns the daily streamflow data of Finland.
either as :obj:`pandas.DataFrame` or as xarray dataset.
"""
fpath = os.path.join(self.path, 'daily_q.csv')
if not os.path.exists(fpath) or overwrite:
if self.verbosity: print("Downloading discharge data For Finland")
df_2001_2023 = self.download_2001_2023()
df_2024 = self.download_2024()
data = pd.concat([df_2001_2023, df_2024])
data.index.name = 'time'
data.to_csv(fpath, index_label="index")
else:
if self.verbosity>1:
print(f"Reading from pre-existing {fpath} file")
data = pd.read_csv(fpath, index_col="index",
na_values=['-'])
data.index = pd.to_datetime(data.index)
data.index.name = 'time'
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
def download_2024(self):
if self.verbosity:
print("Downloading 2024 year data")
dfs = []
failures = 0
for idx, bsn_id in enumerate(self.stations()):
gauge_id = self.basin_id_gauge_id_map()[bsn_id]
url = f"https://wwwi3.ymparisto.fi/i3/tilanne/ENG/discharge/image/bigimage/Q{gauge_id}.txt"
try:
df = pd.read_csv(url,
#delim_whitespace=True,
sep='\s+',
skiprows=10,
encoding="ISO-8859-1",
decimal=',',
names=['date', bsn_id, 'avg', 'min', 'max'],
index_col='date',
parse_dates=True,
dayfirst=True,
na_values=['-']
)
except HTTPError:
failures += 1
warnings.warn(f" {idx} Failed to download {bsn_id} {failures}", UserWarning)
df = pd.DataFrame(columns=['date', bsn_id, 'avg', 'min', 'max'])
if self.verbosity>2:
print(f"{idx}: for {bsn_id} {df.shape}")
dfs.append(df[bsn_id].astype('float32'))
df_2024 = pd.concat(dfs, axis=1)
if self.verbosity:
print(f"Downloaded data of shape {df_2024.shape} for 2024")
return df_2024
def download_2001_2023(self):
if self.verbosity: print("Downloading 2012-2023 year data")
cpus = self.processes or max(get_cpus()-2, 1)
if self.verbosity:
print(f"downloading daily data for {len(self.stations())} stations from {2012} to {2023} with {cpus} cpus")
if cpus == 1:
all_data = self.download_data_seq()
else:
all_data = self.download_data_parallel(cpus)
if self.verbosity>2:
print(f"total number of stations: {len(all_data)} each with shape {all_data[0].shape}")
df_2012_2023 = pd.concat(all_data, axis=1)
if self.verbosity: print(f"Downloaded data of shape {df_2012_2023.shape} for 2001-2023")
return df_2012_2023
def download_data_parallel(self, cpus:int=None):
# todo : taking forever to download the data
start = time.time()
stations = self.stations()
_map = self.basin_id_gauge_id_map()
basin_ids = [_map[stn] for stn in stations]
years = range(START_YEAR, 2024)
stations_ = [[stn]*len(years) for stn in stations]
# flatten the list
stations_ = [item for sublist in stations_ for item in sublist]
basin_ids_ = [[bsn_id]*len(years) for bsn_id in basin_ids]
basin_ids_ = [item for sublist in basin_ids_ for item in sublist]
years_ = list(years) * len(stations)
if self.verbosity>1:
print(f"Total function calls: {len(stations_)} with {cpus} cpus")
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(download_daily_stn_yr, basin_ids_, stations_,
years_)
if self.verbosity:
print(f"total time taken to download data: {time.time() - start}")
all_data = []
for bsn_id, stn in zip(basin_ids, stations):
stn_data = []
for yr in years:
stn_yr_data = next(results)
stn_data.append(stn_yr_data[bsn_id])
stn_data = pd.concat(stn_data, axis=0)
stn_data.name = stn
if self.verbosity>2:
print(f"for {stn} with shape {stn_data.shape}")
all_data.append(stn_data)
return all_data
def download_data_seq(self):
# takes around 1 hour to download all the data
failures = 0
dfs = []
stations = self.stations()
for idx, bsn_id in enumerate(stations):
gauge_id = self.basin_id_gauge_id_map()[bsn_id]
stn_dfs = []
for year in range(2012, 2024):
url = f"https://wwwi3.ymparisto.fi/i3/kktiedote/ENG/{year}/discharge/image/bigimage/Q{gauge_id}.txt"
if year == 2012: skiprows = 7
elif year in [2013, 2014]: skiprows = 9
else: skiprows = 10
try:
yr_df = pd.read_csv(url,
#delim_whitespace=True,
sep='\s+',
skiprows=skiprows,
encoding="ISO-8859-1",
decimal=',',
names=['date', bsn_id, 'avg', 'min', 'max'],
index_col='date',
parse_dates=True,
dayfirst=True,
na_values=['-'],
)
except HTTPError:
failures += 1
warnings.warn(f" {idx} Failed to download {bsn_id} {year} {failures}", UserWarning)
yr_df = pd.DataFrame(
columns=['date', bsn_id, 'avg', 'min', 'max'],
)
stn_dfs.append(yr_df)
if len(stn_dfs) > 0:
stn_df = pd.concat(stn_dfs, axis=0)
if self.verbosity:
print(f"{idx}/{len(stations)}: for {bsn_id} {stn_df.shape} {len(stn_dfs)}")
dfs.append(stn_df[bsn_id].astype('float32'))
return dfs
def download_daily_stn_yr(
gauge_id:str,
bsn_id:str,
year:int
)->pd.DataFrame:
url = f"https://wwwi3.ymparisto.fi/i3/kktiedote/ENG/{year}/discharge/image/bigimage/Q{gauge_id}.txt"
if year == 2012: skiprows = 7
elif year in [2013, 2014]: skiprows = 9
else: skiprows = 10
try:
yr_df = pd.read_csv(url,
#delim_whitespace=True,
sep='\s+',
skiprows=skiprows,
encoding="ISO-8859-1",
decimal=',',
names=['date', bsn_id, 'avg', 'min', 'max'],
index_col='date',
parse_dates=True,
dayfirst=True,
na_values=['-'],
)
except HTTPError:
yr_df = pd.DataFrame(
columns=['date', bsn_id, 'avg', 'min', 'max'],
)
return yr_df
[docs]
class Ireland(_EStreams):
"""
Data of 464 catchments of Ireland. Out of these 464 catchments,
280 are from OPW and 184 are from EPA.
The observed streamflow data for EPA stations is downloaded from
https://epawebapp.epa.ie/Hydronet/#Flow while the observed streamflow for OPW
stations is downloaded from https://waterlevel.ie/hydro-data/#/overview/Waterlevel.
It should be that out of 280 OPW stations, streamflow data is available for only 129
stations. The meteorological data, static catchment
features and catchment boundaries are
taken from :py:class:`aqua_fetch.EStreams` follwoing the works
of `Nascimento et al., 2024 <https://doi.org/10.5194/hess-25-471-2021>`_ project. Therefore,
the number of static features are 214 and dynamic features are 10 and the
data is available from 1992-01-01 to 2020-06-31.
Examples
---------
>>> from aqua_fetch import Ireland
>>> dataset = Ireland()
>>> _, data = dataset.fetch(0.1) # the returned data will be a xarray Dataset
>>> type(data)
xarray.core.dataset.Dataset
>>> data.dims
FrozenMappingWarningOnValuesAccess({'time': 26844, 'dynamic_features': 10})
>>> len(data.data_vars) # number of stations for which data has been fetched
46
>>> _, data = dataset.fetch(stations=1) # get data of only one random station
# get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
464
# get data by station id
>>> _, data = dataset.fetch(stations='IEEP0281')
# get names of available dynamic features
>>> dataset.dynamic_features
# get only selected dynamic features
>>> _, data = dataset.fetch(1,
... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs'])
# get names of available static features
>>> dataset.static_features
# get data of 10 random stations
>>> _, data = dataset.fetch(10)
>>> len(data.data_vars)
10
# If we want to get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='IEEP0281', static_features="all")
>>> static.shape, len(dynamic.data_vars)
((1, 214), 1)
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(464, 2)
>>> dataset.stn_coords('IEEP0281') # returns coordinates of station whose id is IEEP0281
52.217434 -8.494649
>>> dataset.stn_coords(['IEEP0281', 'IEEP0282']) # returns coordinates of two stations
IEEP0281 52.217434 -8.494649
IEEP0282 54.284546 -6.921607
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
estreams_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, estreams_path=estreams_path, verbosity=verbosity, **kwargs)
self.bbox = {'llcrnrlat': 51.0, 'urcrnrlat': 55.5, 'llcrnrlon': -11.0, 'urcrnrlon': -5.0}
self.parallels = range(51, 56, 1)
self.meridians = range(-11, -5, 2)
@property
def country_name(self)->str:
return 'IE'
@property
def epa_stations(self)->List[str]:
md = self.estreams.md
stns = md.loc[(md['gauge_country']=='IE') & (md['gauge_provider']=='IE_EPA')]['gauge_id']
epa_stns = stns.tolist()
if self.timestep in ('H', 'hourly'): epa_stns.remove('38004') # todo:
return epa_stns
@property
def opw_stations(self)->List[str]:
md = self.estreams.md
stns = md.loc[(md['gauge_country']=='IE') & (md['gauge_provider']=='IE_OPW')]['gauge_id']
return stns.tolist()
def is_opw_station(self, stn)->bool:
return stn in self.opw_stations
def is_epa_station(self, stn)->bool:
return stn in self.epa_stations
[docs]
def gauge_id_basin_id_map(self)->dict:
"""
A dictionary whose keys are gauge_id and values are basin_id.
Supposing guage_id is '18118' and basin_id is 'IEEP0281'
then '18118' -> 'IEEP0281'
"""
return {k:v for v,k in self.md['gauge_id'].to_dict().items()}
def basin_id_gauge_id_map(self)->dict:
# guage_id '18118'
# basin_id 'IEEP0281'
# 'IEEP0281' -> '18118'
return self.md['gauge_id'].to_dict()
def fetch_q(
self,
as_dataframe:bool=True,
overwrite:bool=False,
):
fname = 'daily_q' if self.timestep in ["D", 'daily'] else 'hourly_q'
ext = '.csv'
fpath = os.path.join(self.path, fname + ext)
if not os.path.exists(fpath) or overwrite:
cpus = self.processes or max(get_cpus() - 2, 1)
if cpus > 1:
epa_df = self.download_epa_data_parallel(cpus=cpus)
opw_df = self.download_opw_data_parallel(cpus=cpus)
else:
epa_df = self.download_epa_data_seq()
opw_df = self.download_opw_data_seq()
data = pd.concat([epa_df, opw_df], axis=1)
data.index.name = 'time'
data.index = pd.to_datetime(data.index)
assert data.index.tz is None, "timezone info found in index"
data.rename(columns=self.gauge_id_basin_id_map(), inplace=True)
if ext == '.csv':
data.to_csv(fpath, index_label="index")
else:
data = xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
data.to_netcdf(fpath)
else:
if self.verbosity > 1: print(f"Reading from pre-exising {fpath}")
if ext == '.csv':
data = pd.read_csv(fpath, index_col="index")
data.index = pd.to_datetime(data.index)
data.index.name = 'time'
data.rename(columns=self.gauge_id_basin_id_map(), inplace=True)
else:
data = xr.open_dataset(fpath)
if self.verbosity > 1: print(f"opened {fpath}")
if isinstance(data, pd.DataFrame):
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
else:
if as_dataframe:
data = data.to_dataframe()
data.index.name = 'time'
return data
return data
[docs]
def download_epa_data_seq(self):
"""
Examples
---------
>>> epa_df = download_epa_data()
"""
folder = {'D': 'daily', 'H': 'hourly'}[self.timestep]
all_epa_data_file = os.path.join(self.path, f"epa_{folder}.csv")
if os.path.exists(all_epa_data_file):
if self.verbosity>1: print(f"{all_epa_data_file} already exists")
df = pd.read_csv(all_epa_data_file, index_col=0, parse_dates=True)
print(f"{all_epa_data_file} already exists")
return df
print("Downloading EPA data Sequentially")
epa_failiures = 0
epa_dfs = []
for idx, stn in enumerate(self.epa_stations):
fpath = os.path.join(self.path, "EPA", folder, f"{stn}.csv")
print(f"{idx}/{len(self.epa_stations)} Downloading {stn}")
df, epa_failiures = _download_epa_stn_data(fpath, self.timestep)
epa_dfs.append(df)
print(f'total epa failiures: {epa_failiures}')
print(f'total epa dfs: {len(epa_dfs)}')
df = pd.concat(epa_dfs, axis=1).astype('float32')
if self.verbosity>1: print(f"Downloaded total epa dfs: {len(epa_dfs)} saving to {all_epa_data_file}")
df.to_csv(all_epa_data_file)
return df
def download_epa_data_parallel(self, cpus=None):
if cpus is None:
cpus = self.processes or max(get_cpus() - 2, 1)
folder = {'D': 'daily', 'H': 'hourly'}[self.timestep]
all_epa_data_file = os.path.join(self.path, f"epa_{folder}.csv")
if os.path.exists(all_epa_data_file):
df = pd.read_csv(all_epa_data_file, index_col=0, parse_dates=True)
print(f"{all_epa_data_file} already exists")
return df
timesteps = [self.timestep] * len(self.epa_stations)
fpaths = [os.path.join(self.path, "EPA", folder, f"{stn}.csv") for stn in self.epa_stations]
print(f"Downloading {len(fpaths)} EPA stations using {cpus} cpus at {os.path.join(self.path, 'EPA', folder)}")
with ProcessPoolExecutor(cpus) as executor:
epa_dfs = list(executor.map(_download_epa_stn_data, fpaths, timesteps))
df = pd.concat([val[0] for val in epa_dfs], axis=1).astype('float32')
if self.timestep in ["D", 'daily']:
# remove hourly information from the index
# 2000-01-01 01:00:00 -> 2000-01-01
df.index = df.index.normalize()
print(f'Downloaded total epa dfs: {len(epa_dfs)}')
df.to_csv(all_epa_data_file)
return df
def download_opw_data_parallel(self, cpus=None):
folder = {'D': 'daily', 'H': 'hourly'}[self.timestep]
all_opw_data_file = os.path.join(self.path, f"opw_{folder}.csv")
if os.path.exists(all_opw_data_file):
df = pd.read_csv(all_opw_data_file, index_col=0, parse_dates=True)
print(f"{all_opw_data_file} already exists")
return df
fpaths = [os.path.join(self.path, "OPW", folder, f"{stn}.csv") for stn in self.opw_stations]
if self.verbosity:
print(f"Downloading {len(fpaths)} OPW stations using {cpus} cpus at {os.path.join(self.path, 'OPW', folder)}")
with ProcessPoolExecutor(cpus) as executor:
opw_dfs = list(executor.map(_download_opw_stn_data, fpaths, [self.timestep]*len(self.opw_stations)))
opw_df = [df for df in opw_dfs]
opw_df = pd.concat(opw_df, axis=1).astype('float32')
assert opw_df.index.tz is None, "opw_df index is not tz naive"
if self.verbosity:
print(f"Downloaded total opw dfs: {len(opw_dfs)}")
print(f"Saving opw data {opw_df.shape} to {all_opw_data_file}")
failures = [df for df in opw_dfs if len(df)==0]
if len(failures)>0:
self.opw_failures = [s.name for s in failures]
warnings.warn(f"Failed to download {len(failures)} OPW stations")
opw_df.to_csv(all_opw_data_file)
return opw_df
[docs]
def download_opw_data_seq(self):
"""
Examples
---------
>>> opw_df = download_opw_data()
"""
folder = {'D': 'daily', 'H': 'hourly'}[self.timestep]
all_opw_data_file = os.path.join(self.path, f"opw_{folder}.csv")
if os.path.exists(all_opw_data_file):
df = pd.read_csv(all_opw_data_file, index_col=0, parse_dates=True)
if self.verbosity: print(f"{all_opw_data_file} already exists")
return df
if self.verbosity: print("Downloading OPW data")
failiures = 0
opw_dfs = []
for idx, stn in enumerate(self.opw_stations):
fpath = os.path.join(self.path, "OPW", folder, f"{stn}.csv")
print(f"{idx}/{len(self.opw_stations)} Downloading {stn}")
df = _download_opw_stn_data(fpath, self.timestep)
opw_dfs.append(df)
if len(df)==0:
failiures += 1
if self.verbosity:
print(f"total failiures: {failiures}")
print(f"total opw dfs: {len(opw_dfs)}")
opw_dfs1 = [df for df in opw_dfs if len(df)>0]
opw_df = pd.concat(opw_dfs1, axis=1).astype('float32')
assert opw_df.index.tz is None, "opw_df index is not tz naive"
if self.verbosity:
print(f"Saving opw data {opw_df.shape} to {all_opw_data_file}")
opw_df.to_csv(all_opw_data_file)
return opw_df
def _download_epa_stn_data(fpath, timestep="D")->pd.Series:
stn = os.path.basename(fpath).split('.')[0]
if timestep in ("D", 'daily'):
fname = "daymean.zip"
else:
fname = "15min.zip"
epa_failiures = 0
url = f"https://epawebapp.epa.ie/Hydronet/output/internet/stations/DUB/{stn}/Q/complete_{fname}"
url2 = f"https://epawebapp.epa.ie/Hydronet/output/internet/stations/MON/{stn}/Q/complete_{fname}"
url3 = f"https://epawebapp.epa.ie/Hydronet/output/internet/stations/ATH/{stn}/Q/complete_{fname}"
url4 = f"https://epawebapp.epa.ie/Hydronet/output/internet/stations/COR/{stn}/Q/complete_{fname}"
url5 = f"https://epawebapp.epa.ie/Hydronet/output/internet/stations/KIK/{stn}/Q/complete_{fname}"
url6 = f"https://epawebapp.epa.ie/Hydronet/output/internet/stations/CAS/{stn}/Q/complete_{fname}"
try:
df = pd.read_csv(url,
comment='#',
sep=';',
names=["timestamp", stn, "qflag"]
)
except HTTPError:
try:
df = pd.read_csv(url2,
comment='#',
sep=';',
names=["timestamp", stn, "qflag"])
except HTTPError:
try:
df = pd.read_csv(url3,
comment='#',
sep=';',
names=["timestamp", stn, "qflag"])
except HTTPError:
try:
df = pd.read_csv(url4,
comment='#',
sep=';',
names=["timestamp", stn, "qflag"])
except HTTPError:
try:
df = pd.read_csv(url5,
comment='#',
sep=';',
names=["timestamp", stn, "qflag"])
except HTTPError:
try:
df = pd.read_csv(url6,
comment='#',
sep=';',
names=["timestamp", stn, "qflag"])
except HTTPError:
print(f"Failed to download {stn}")
epa_failiures += 1
pass
df.index = pd.to_datetime(df.pop('timestamp'))
# considering quality codes https://epawebapp.epa.ie/Hydronet/#FAQ
# Quality codes: Good, nan, Suspect, Extrapolated, Unchecked, Excellent, Estimated
df = df.loc[~df['qflag'].isin(['Unchecked'])]
if timestep == "D":
return df[stn], epa_failiures
return df[stn].resample(timestep).mean(), epa_failiures
def _download_opw_stn_data(fpath, timestep="D")->pd.Series:
stn = os.path.basename(fpath).split('.')[0]
# we don't/can't download daily data
if timestep == "daily":
timestep = "D"
elif timestep == "hourly":
timestep = "H"
elif timestep == "D":
pass
else:
raise ValueError(f"timestep should be either 'D' or 'H' but it is {timestep}")
url = f"https://waterlevel.ie/hydro-data/data/internet/stations/0/{stn}/Q/Discharge_complete.zip"
try:
df = pd.read_csv(url,
comment='#',
sep=';',
names=["timestamp", stn, "q_code"]
)
except HTTPError:
warnings.warn(f"Failed to download {stn}", UserWarning)
df = pd.Series(name=stn)
df.index = pd.to_datetime(df.pop('timestamp'))
if df.index.tz is not None:
df.index = df.index.tz_convert("UTC").tz_localize(None)
# considering quality codes as given here https://waterlevel.ie/hydro-data/#/html/qualitycodes
# df['q_code'] has following values : 36, 46, 31, 56, 96, 225, 101, 32, 99, 254
# 96 and 254 are provisional values and can be changed!
# 101 is erronous while 36 and 46 contain fair and siginificant errors respectively.
# get rows where q_code is not 96 or 254
df = df.loc[~df['q_code'].isin([96, 254])]
stn_data = df[stn]
#stn_data = stn_data.resample(timestep).apply(lambda subdata: tw_resampler(subdata, stn_data.sort_index(), timestep))
stn_data = stn_data.resample(timestep).mean()
return stn_data
[docs]
class Italy(_EStreams):
"""
Data of 294 catchments of Italy.
The observed streamflow data is downloaded from
http://www.hiscentral.isprambiente.gov.it/hiscentral/hydromap.aspx?map=obsclient .
The meteorological data, static catchment
features and catchment boundaries are
taken from :py:class:`aqua_fetch.EStreams` follwoing the works
of `Nascimento et al., 2024 <https://doi.org/10.5194/hess-25-471-2021>`_ . Therefore,
the number of static features are 214 and dynamic features are 10 and the
data is available from 1992-01-01 to 2020-06-31.
Examples
---------
>>> from aqua_fetch import Italy
>>> dataset = Italy()
>>> _, data = dataset.fetch(0.1) # the returned data will be a xarray Dataset
>>> type(data)
xarray.core.dataset.Dataset
>>> data.dims
FrozenMappingWarningOnValuesAccess({'time': 26844, 'dynamic_features': 10})
>>> len(data.data_vars) # number of stations for which data has been fetched
29
>>> _, data = dataset.fetch(stations=1) # get data of only one random station
# get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
294
# get data by station id
>>> _, data = dataset.fetch(stations='ITIS0001')
# get names of available dynamic features
>>> dataset.dynamic_features
# get only selected dynamic features
>>> _, data = dataset.fetch(1,
... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs'])
# get names of available static features
>>> dataset.static_features
# get data of 10 random stations
>>> _, data = dataset.fetch(10)
>>> len(data.data_vars)
10
# If we want to get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='ITIS0001', static_features="all")
>>> static.shape, len(dynamic.data_vars)
((1, 214), 1)
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(294, 2)
>>> dataset.stn_coords('ITIS0001') # returns coordinates of station whose id is ITIS0001
42.835835 13.919167
>>> dataset.stn_coords(['ITIS0001', 'ITIS0002']) # returns coordinates of two stations
ITIS0001 42.835835 13.919167
ITIS0002 42.783890 13.905833
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
estreams_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, estreams_path=estreams_path, verbosity=verbosity, **kwargs)
self._stations = self.ispra_stations()
self.bbox = {'llcrnrlat': 35.0, 'llcrnrlon': 6.0, 'urcrnrlat': 48.0, 'urcrnrlon': 19.0}
self.parallels = [35, 40, 45]
self.meridians = [10, 15]
@property
def country_name(self)->str:
return 'IT'
[docs]
def gauge_id_basin_id_map(self)->dict:
# guage_id 'hsl-abr:5010'
# basin_id 'ITIS0001'
# 'hsl-abr:5010' -> 'ITIS0001'
return {k:v for v,k in self.md['gauge_id'].to_dict().items()}
def ispra_stations_gauge_ids(self)->List[str]:
return self.md.loc[self.md['gauge_provider']=='IT_ISPRA']['gauge_id'].to_list()
def ispra_stations(self)->List[str]:
return self.md.loc[self.md['gauge_provider']=='IT_ISPRA'].index.to_list()
def all_stations(self)->List[str]:
return self.estreams.country_stations("IT")
def fetch_q(self, as_dataframe:bool=True):
fpath = os.path.join(self.path, 'daily_q.csv')
if not os.path.exists(fpath) or self.overwrite:
data = self.download_ispra_data()
data.to_csv(fpath, index_label="time")
else:
if self.verbosity > 1:
print(f"Reading q from pre-existing {fpath} file")
data = pd.read_csv(fpath, index_col="time")
data.index = pd.to_datetime(data.index)
data.index.name = "time"
# replace 'hsl-abr:5010' with 'ITIS0001'
data = data.rename(columns=self.gauge_id_basin_id_map()).sort_index()
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
def download_ispra_data(self):
if self.verbosity > 1:
print("Downloading ISPRA data")
dfs = []
for idx, station in enumerate(self.ispra_stations_gauge_ids()):
initial = station.split(":")[0]
response = requests.get(
f"http://hydroserver.ddns.net/italia/{initial}/index.php/default/services/cuahsi_1_1.asmx/GetValuesObject?authToken=&location={station}&variable={initial}:Discharge&startDate=1900-01-01&endDate=2020-12-31")
root = ET.fromstring(response.content)
namespace = {'ns': 'http://www.cuahsi.org/waterML/1.1/'}
# Extract the time series data
timeseries = []
for value in root.findall('.//ns:value', namespace):
date_time = value.attrib['dateTime']
data_value = value.text
timeseries.append({'dateTime': date_time, 'value': data_value})
df = pd.DataFrame(timeseries)
df.index = pd.to_datetime(df.pop('dateTime'))
df.columns = [station]
print(idx, station, df.shape)
dfs.append(df)
df = pd.concat(dfs, axis=1)
return df
def download_ispra_stn(station):
initial = station.split(":")[0]
response = requests.get(
f"http://hydroserver.ddns.net/italia/{initial}/index.php/default/services/cuahsi_1_1.asmx/GetValuesObject?authToken=&location={station}&variable={initial}:Discharge&startDate=1900-01-01&endDate=2020-12-31")
root = ET.fromstring(response.content)
namespace = {'ns': 'http://www.cuahsi.org/waterML/1.1/'}
# Extract the time series data
timeseries = []
for value in root.findall('.//ns:value', namespace):
date_time = value.attrib['dateTime']
data_value = value.text
timeseries.append({'dateTime': date_time, 'value': data_value})
df = pd.DataFrame(timeseries)
df.index = pd.to_datetime(df.pop('dateTime'))
df.columns = [station]
# todo: why concatenating the 1077 stations in prior to 2023 and 833
# stations from 2023 become 1181? Like a lot of new stations come in 2023?
[docs]
class Poland(_EStreams):
"""
Data of 1287 catchments of Poland.
The observed streamflow data is downloaded from
https://danepubliczne.imgw.pl .
The meteorological data, static catchment
features and catchment boundaries are
taken from :py:class:`aqua_fetch.EStreams` follwoing the works
of `Nascimento et al., 2024 <https://doi.org/10.5194/hess-25-471-2021>`_ . Therefore,
the number of static features are 214 and dynamic features are 10 and the
data is available from 1951-01-01 to 2023-06-30.
Examples
---------
>>> from aqua_fetch import Poland
>>> dataset = Poland()
>>> _, data = dataset.fetch(0.1) # the returned data will be a xarray Dataset
>>> type(data)
xarray.core.dataset.Dataset
>>> data.dims
FrozenMappingWarningOnValuesAccess({'time': 26844, 'dynamic_features': 10})
>>> len(data.data_vars) # number of stations for which data has been fetched
128
>>> _, data = dataset.fetch(stations=1) # get data of only one random station
# get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
1287
# get data by station id
>>> _, data = dataset.fetch(stations='PL000001')
# get names of available dynamic features
>>> dataset.dynamic_features
# get only selected dynamic features
>>> _, data = dataset.fetch(1,
... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs'])
# get names of available static features
>>> dataset.static_features
# get data of 10 random stations
>>> _, data = dataset.fetch(10)
>>> len(data.data_vars)
10
# If we want to get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='PL000001', static_features="all")
>>> static.shape, len(dynamic.data_vars)
((1, 214), 1)
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(1287, 2)
>>> dataset.stn_coords('PL000001') # returns coordinates of station whose id is PL000001
49.921848 18.327913
>>> dataset.stn_coords(['PL000001', 'PL000002']) # returns coordinates of two stations
PL000001 49.921848 18.327913
PL000002 49.954769 18.326323
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
estreams_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, estreams_path=estreams_path, verbosity=verbosity, **kwargs)
self.bbox = {"llcrnrlat": 49.0, "urcrnrlat": 55.0, "llcrnrlon": 14.0, "urcrnrlon": 25.0}
self.parallels = range(50, 56, 2)
self.meridians = range(14, 26, 2)
@property
def country_name(self)->str:
return 'PL'
[docs]
def gauge_id_basin_id_map(self)->dict:
# guage_id '149180020'
# basin_id 'PL000001'
# '149180020' -> 'PL000001'
return {k:v for v,k in self.md['gauge_id'].to_dict().items()}
def basin_id_gauge_id_map(self)->dict:
# guage_id '149180020'
# basin_id 'PL000001'
# 'PL000001' -> '149180020'
return self.md['gauge_id'].to_dict()
@property
def zip_files_dir(self)->str:
"""path where zip files will be stored"""
return os.path.join(self.path, 'zip_files')
@property
def csv_files_dir(self)->str:
"""path where csv (obtained after extracting zip files) files will be stored"""
return os.path.join(self.path, 'csv_files')
def fetch_q(self, as_dataframe:bool=True):
fpath = os.path.join(self.path, 'daily_q.csv')
if not os.path.exists(fpath):
data = self._make_csv()
else:
if self.verbosity: print(f"Reading from existing {fpath} file")
data = pd.read_csv(fpath, index_col="time")
data.index = pd.to_datetime(data.index)
data.index.name = 'time'
# replace '149180020' with 'PL000001'
data = data.rename(columns=self.gauge_id_basin_id_map()).sort_index()
# todo: make sure that the following stations actually not have any data
if data.shape[1]<len(self.stations()):
warnings.warn(f"{len(self.stations())-data.shape[1]} stations are missing in the downloaded data")
for stn in self.stations():
if stn not in data.columns:
data[stn] = np.nan
if as_dataframe:
return data
return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
def _make_csv(self):
years = []
months = []
for yr in range(1951, 2023):
for month in range(1, 13):
month = str(month).zfill(2)
years.append(yr)
months.append(month)
cpus = self.processes or max(get_cpus()-2, 1)
if self.verbosity:
print(f"Downloading zip files using {cpus} cpus")
print(f"Total files to download: {len(years)}")
start = time.time()
with cf.ProcessPoolExecutor(max_workers=cpus) as executor:
data = list(executor.map(download_single_file, years, months))
if self.verbosity:
print(f"Downloaded all files in {time.time()-start} seconds")
data = pd.concat([df for df in data], axis=0)
if self.verbosity>1:
print(f"Data until 2022 has shape: {data.shape}")
data23 = download_data_2023(2023)
if self.verbosity>1:
print(f"Data for 2023 has shape: {data23.shape}")
data = pd.concat([data, data23], axis=0)
if self.verbosity>1:
print(f"Data after 2023 has shape: {data.shape}")
data.sort_index(inplace=True)
data.index.name = 'time'
if self.verbosity:
print(f"Saving daily discharge data {data.shape} to {self.path}")
data.to_csv(os.path.join(self.path, "daily_q.csv"), index_label="time")
return data
def download_single_file(year, month:str):
url = f"https://danepubliczne.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_hydrologiczne/dobowe/{year}/codz_{year}_{month}.zip"
try:
df = pd.read_csv(
url,
compression='zip',
encoding="ISO-8859-1",
engine='python',
on_bad_lines="skip",
names=['stn_id', 'year', 'day', 'q_cms', 'month'],
usecols=[0, 3, 5, 7, 9],
# sometimes casting month to int fails
# also don't cast q_cms to float32 since it makes 99999.999 to 100000.0
dtype={'stn_id': str, 'year': 'int', 'day': 'int', #'q_cms': np.float32, #'month': 'int'
},
#parse_dates={'date': ['year', 'month', 'day']},
#index_col='date',
)
except HTTPError:
raise Exception(f"Failed to download {url}")
# sometimes (such as 1992-07) month column has missing values
month = df.loc[:, 'month']
month = month.ffill()
yr, month, day = df.loc[:, 'year'].astype(int), month.astype(int), df.loc[:, 'day'].astype(int)
df.index = pd.to_datetime(pd.DataFrame({
'year': yr,
'month': month,
'day': day,
}))
df = df.pivot_table(index=df.index, columns="stn_id", values="q_cms")
df.columns = [col.replace(' ', '') for col in df.columns]
df.rename(columns={"149180020": "149180020"}, inplace=True)
# as per documentation, 99999.999 is missing value
df.replace(99999.999, np.nan, inplace=True)
if df.index.tz is not None:
df.index = df.index.tz_convert("UTC").tz_localize(None)
df.sort_index(inplace=True)
return df
def download_data_2023(year):
df = pd.read_csv(
f"https://danepubliczne.imgw.pl/data/dane_pomiarowo_obserwacyjne/dane_hydrologiczne/dobowe/{year}/codz_{year}.zip",
compression='zip',
encoding="ISO-8859-1",
engine='python',
on_bad_lines="skip",
sep=';',
names=['stn_id', 'year', 'day', 'q_cms', 'month'],
usecols=[0, 3, 5, 7, 9],
dtype={'stn_id': str, 'year': 'int', 'day': 'int', 'q_cms': np.float32, 'month': 'int'},
parse_dates={'date': ['year', 'month', 'day']},
index_col='date',
na_values=[99999.999]
)
df.replace("149180020", "149180020", inplace=True)
df = df.pivot_table(index=df.index, columns="stn_id", values="q_cms")
# replace empty splace in column names
df.columns = [col.replace(' ', '') for col in df.columns]
# as per documentation, 99999.999 is missing value
df.replace(99999.999, np.nan, inplace=True)
try:
df.index = pd.to_datetime(df.index)
except Exception:
raise Exception(f"Failed to convert index to datetime for {year}")
if df.index.tz is not None:
df.index = df.index.tz_convert("UTC").tz_localize(None)
return df
[docs]
class Portugal(_EStreams):
"""
Data of 280 catchments of Portugal.
The observed streamflow data is downloaded from
https://snirh.apambiente.pt .
The meteorological data, static catchment
features and catchment boundaries for the 280 catchments are
taken from :py:class:`aqua_fetch.EStreams` follwoing the works
of `Nascimento et al., 2024 <https://doi.org/10.5194/hess-25-471-2021>`_ project. Therefore,
the number of static features are 214 and dynamic features are 10 and the
data is available from 1972-01-01 to 2022-12-31.
Examples
---------
>>> from aqua_fetch import Portugal
>>> dataset = Portugal()
>>> _, data = dataset.fetch(0.1) # the returned data will be a xarray Dataset
>>> type(data)
xarray.core.dataset.Dataset
>>> data.dims
FrozenMappingWarningOnValuesAccess({'time': 18628, 'dynamic_features': 10})
>>> len(data.data_vars) # number of stations for which data has been fetched
28
>>> _, data = dataset.fetch(stations=1) # get data of only one random station
# get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
280
# get data by station id
>>> _, data = dataset.fetch(stations='PT000001')
# get names of available dynamic features
>>> dataset.dynamic_features
# get only selected dynamic features
>>> _, data = dataset.fetch(1,
... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs'])
# get names of available static features
>>> dataset.static_features
# get data of 10 random stations
>>> _, data = dataset.fetch(10)
>>> len(data.data_vars)
10
# If we want to get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='PT000001', static_features="all")
>>> static.shape, len(dynamic.data_vars)
((1, 214), 1)
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(280, 2)
>>> dataset.stn_coords('PT000001') # returns coordinates of station whose id is PT000001
41.794998 -7.969
>>> dataset.stn_coords(['PT000001', 'PT000002']) # returns coordinates of two stations
PT000001 41.794998 -7.969
PT000002 39.679001 -8.437
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
estreams_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, estreams_path=estreams_path, verbosity=verbosity, **kwargs)
fpath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'portugal_stn_codes.csv')
self.codes = pd.read_csv(fpath, index_col=0)
self.bbox = {'llcrnrlat': 36.0, 'urcrnrlat': 43.0, 'llcrnrlon': -9, 'urcrnrlon': -6}
self.parallels = range(36, 43, 2)
self.meridians = range(-9, -6, 1)
@property
def country_name(self)->str:
return 'PT'
@property
def start(self)->pd.Timestamp:
return pd.Timestamp('1972-01-01')
@property
def end(self)->pd.Timestamp:
return pd.Timestamp('2022-12-31')
[docs]
def gauge_id_basin_id_map(self)->dict:
# guage_id '03J/02H'
# basin_id 'PT000001'
# '03J/02H' -> 'PT000001'
return {k:v for v,k in self.md['gauge_id'].to_dict().items()}
[docs]
def download_q_data_seq(self):
"""downloads q data sequentially"""
if self.verbosity: print("Downloading q data sequentially")
start = time.time()
data = []
for i in range(len(self.codes)):
g_code = self.codes.iloc[i, 1]
stn_data = download_stn_data(g_code)
stn_data.name = self.codes.index[i]
data.append(stn_data)
if self.verbosity and i%5 == 0:
print(i, "q files downloaded")
tot_time = round ((time.time() - start) / 60, 2)
if self.verbosity: print(f"Total Time taken {tot_time} minutes")
return pd.concat(data, axis=1)
[docs]
def download_q_data_parallel(self, cpus:int=4):
"""downloads q data in parallel"""
start = time.time()
data = []
with ProcessPoolExecutor(max_workers=cpus) as executor:
futures = [executor.submit(download_stn_data, g_code) for g_code in self.codes.iloc[:, 1]]
for i, future in enumerate(futures):
stn_data = future.result()
stn_data.name = self.codes.index[i]
data.append(stn_data)
if i%10 == 0:
print(i, "Done")
tot_time = round ((time.time() - start) / 60, 2)
if self.verbosity: print(f"Total Time taken {tot_time} minutes")
return pd.concat(data, axis=1)
[docs]
def fetch_q(
self,
as_dataframe:bool=True,
):
"""
returns the streamflow data of Portugal as xarray.Dataset or pandas.DataFrame
Returns
-------
xarray.Dataset or pandas.DataFrame. If as_dataframe is True, returns pandas.DataFrame
with columns as station codes and index as time. If as_dataframe is False, returns
xarray.Dataset with station codes as variables and time as dimension.
"""
fname = 'daily_q.csv'
fpath = os.path.join(self.path, fname)
if not os.path.exists(fpath) or self.overwrite:
if self.verbosity>1: print(f"Downloading q data at {self.path}")
cpus = self.processes or max(get_cpus() - 2, 1)
if cpus > 1:
q_df = self.download_q_data_parallel(cpus=cpus)
else:
q_df = self.download_q_data_seq()
else:
if self.verbosity: print(f"Reading q data from pre-existing file {fpath}")
q_df = pd.read_csv(fpath, index_col=0)
q_df.index = pd.to_datetime(q_df.index, dayfirst=True)
q_df.index.name = 'time'
# q_df columns are 03J/02H 15G/02H 11H/02H which needs to be mapped to PT000001 PT000002 PT000003
# because stations are identified by basin_id
q_df = q_df.rename(columns=self.gauge_id_basin_id_map()).sort_index()
if as_dataframe:
return q_df
return xr.Dataset({stn: xr.DataArray(q_df.loc[:, stn]) for stn in q_df.columns})
def download_stn_data(gauge_code:int)->pd.Series:
url = f'https://snirh.apambiente.pt/snirh/_dadosbase/site/paraCSV/dados_csv.php?sites={gauge_code}&pars=1850&tmin=01/01/1972&tmax=31/12/2022&formato=csv'
# Add headers if needed (you may need to adjust these)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
}
response = requests.get(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
data = StringIO(response.text)
# head = pd.read_csv(data, skiprows=1,
# nrows=1)
df = pd.read_csv(data, skiprows=3, index_col=0, parse_dates=True, dayfirst=True)
assert df.columns[0] == 'Caudal médio diário (m3/s)'
s = df.iloc[0:-1, 0]
s.name = str(gauge_code)
else:
warnings.warn(f"Failed to retrieve data: {response.status_code}")
s = pd.Series(name=str(gauge_code))
return s
[docs]
class Slovenia(_EStreams):
"""
Data of 117 catchments of Slovenia.
The observed streamflow data is downloaded from https://vode.arso.gov.si .
The meteorological data, static catchment
features and catchment boundaries for the 117 catchments are
taken from :py:class:`aqua_fetch.EStreams` follwoing the works
of `Nascimento et al., 2024 <https://doi.org/10.5194/hess-25-471-2021>`_ project. Therefore,
the number of static features are 214 and dynamic features are 10 and the
data is available from 1950-01-01 to 2023-12-31 .
Examples
---------
>>> from aqua_fetch import Slovenia
>>> dataset = Slovenia()
>>> _, data = dataset.fetch(0.1) # the returned data will be a xarray Dataset
>>> type(data)
xarray.core.dataset.Dataset
>>> data.dims
FrozenMappingWarningOnValuesAccess({'time': 27028, 'dynamic_features': 10})
>>> len(data.data_vars)
10
>>> _, df = dataset.fetch(stations=1) # get data of only one random station
# get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
117
# get data by station id
>>> _, data = dataset.fetch(stations='SI000090')
# get names of available dynamic features
>>> dataset.dynamic_features
# get only selected dynamic features
>>> _, data = dataset.fetch(1,
... dynamic_features=['pcp_mm', 'rh_%', 'airtemp_C_mean', 'pet_mm', 'q_cms_obs'])
# get names of available static features
>>> dataset.static_features
# get data of 10 random stations
>>> _, data = dataset.fetch(10)
# If we want to get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='SI000090', static_features="all")
>>> static.shape, len(dynamic.data_vars)
((1, 214), 1)
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(117, 2)
>>> dataset.stn_coords('SI000090') # returns coordinates of station whose id is SI000090
45.865093 15.460184
>>> dataset.stn_coords(['SI000090', 'SI000002']) # returns coordinates of two stations
SI000090 45.865093 15.460184
SI000002 46.648823 16.059244
"""
[docs]
def __init__(
self,
path:Union[str, os.PathLike] = None,
estreams_path:Union[str, os.PathLike] = None,
verbosity:int=1,
**kwargs):
super().__init__(path=path, estreams_path=estreams_path, verbosity=verbosity, **kwargs)
self.bbox = {'llcrnrlat': 45.0, 'urcrnrlat': 47.0, 'llcrnrlon': 13.0, 'urcrnrlon': 17.0}
self.parallels = range(45, 47, 1)
self.meridians = range(13, 17, 1)
@property
def end(self) -> pd.Timestamp:
return pd.Timestamp('2023-12-31')
@property
def country_name(self) -> str:
return 'SI'
[docs]
def fetch_q(
self,
as_dataframe:bool=True,
):
"""
returns the streamflow data of Portugal as xarray.Dataset or pandas.DataFrame
Returns
-------
xarray.Dataset or pandas.DataFrame. If as_dataframe is True, returns pandas.DataFrame
with columns as station codes and index as time. If as_dataframe is False, returns
xarray.Dataset with station codes as variables and time as dimension.
"""
fname = 'daily_q.csv'
fpath = os.path.join(self.path, fname)
if not os.path.exists(fpath) or self.overwrite:
if self.verbosity>1: print(f"Downloading q data at {self.path}")
q_df = download_slovenia_q(self.md, outpath=fpath,
cpus=self.processes or min(get_cpus() - 2, 16))
else:
if self.verbosity: print(f"Reading q data from pre-existing file {fpath}")
q_df = pd.read_csv(fpath, index_col=0)
q_df.index = pd.to_datetime(q_df.index)
q_df.index.name = 'time'
# because stations are identified by basin_id
q_df = q_df.rename(columns=self.gauge_id_basin_id_map()).sort_index()
if as_dataframe:
return q_df
return xr.Dataset({stn: xr.DataArray(q_df.loc[:, stn]) for stn in q_df.columns})
def download_slovenia_q(
metadata:pd.DataFrame,
outpath:Union[str, os.PathLike],
cpus = 1
) -> pd.DataFrame:
"""
Downloads streamflow data for Slovenia stations.
Parameters
----------
metadata : pd.DataFrame
DataFrame containing metadata for the stations.
Returns
-------
pd.DataFrame
DataFrame containing the downloaded streamflow data.
"""
# todo : we should parallelize stations-years combined
cpus = cpus or min(get_cpus() - 2, 8)
if cpus > 1:
print(f"Download operation will be parallelized using {cpus} CPUs")
dirname = os.path.dirname(outpath)
# get current year
current_year = datetime.now().year
q_dfs = []
wl_dfs = []
wt_dfs = []
for i in range(len(metadata)):
row = metadata.iloc[i]
gauge_id = row['gauge_id']
gauge_name = row['gauge_name']
# math.isnan(start) or math.isnan(end):
st_yr = 1950
en_yr = current_year + 1
stn_dfs = []
if cpus > 1:
# Download all year combinations in parallel
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(download_slovenia_stn, [row]*len(range(st_yr, en_yr)), range(st_yr, en_yr))
for yr_df in results:
stn_dfs.append(yr_df)
else:
for year in range(st_yr, en_yr):
yr_df = download_slovenia_stn(row, year)
stn_dfs.append(yr_df)
print(f"downloaded data for {i}/{len(metadata)}: {gauge_id} - {gauge_name} for year {year}")
stn_df = pd.concat(stn_dfs)
stn_df.rename(columns={
'pretok (m3/s)': 'q_cms_obs',
'vodostaj (cm)': 'water_level_cm',
'temp. vode (°C)': 'water_temp_celsius',
'vsebnost suspendiranega materiala (g/m3)': 'suspended_solids_g_per_m3',
}, inplace=True)
q_dfs.append(stn_df['q_cms_obs'].rename(gauge_id))
wl_dfs.append(stn_df['water_level_cm'].rename(gauge_id))
wt_dfs.append(stn_df['water_temp_celsius'].rename(gauge_id))
print(st_yr, en_yr, stn_df.index[0], stn_df.index[-1])
if cpus:
print(f"Downloaded data for {i+1}/{len(metadata)}: {gauge_id} - {gauge_name}")
q_df = pd.concat(q_dfs, axis=1)
wl_df = pd.concat(wl_dfs, axis=1)
wt_df = pd.concat(wt_dfs, axis=1)
q_df.to_csv(outpath, index=True, index_label='date')
wl_df.to_csv(os.path.join(dirname, 'daily_wl.csv'), index=True, index_label='date')
wt_df.to_csv(os.path.join(dirname, 'daily_wt.csv'), index=True, index_label='date')
return q_df
def download_slovenia_stn(row:pd.Series, year:int)->pd.DataFrame:
#row, year = input_data
gauge_id = row['gauge_id']
river = row['river']
url = f"https://vode.arso.gov.si/hidarhiv/pov_arhiv_tab.php?p_vodotok={river}&p_postaja={gauge_id}&p_leto={year}&b_arhiv=Prika%C5%BEi&p_export=txt"
encoded_url = urllib.parse.quote(url, safe=':/?=&')
yr_df = pd.read_csv(encoded_url, encoding='utf-8', sep=';', index_col=0, parse_dates=True)
yr_df.index = pd.to_datetime(yr_df.index, format='%d.%m.%Y')
if 'pretok (m3/s)' not in yr_df.columns:
yr_df['pretok (m3/s)'] = np.nan
if 'vodostaj (cm)' not in yr_df.columns:
yr_df['vodostaj (cm)'] = np.nan
if 'temp. vode (°C)' not in yr_df.columns:
yr_df['temp. vode (°C)'] = np.nan
# Handle comma decimal separator before converting to float
for col in yr_df.columns:
if yr_df[col].dtype == 'object': # Only process string/object columns
yr_df[col] = yr_df[col].astype(str).str.replace(',', '.', regex=False)
# Replace 'nan' strings back to actual NaN
yr_df[col] = yr_df[col].replace('nan', np.nan)
try:
yr_df = yr_df.astype('float32')
except ValueError as e:
print(gauge_id, year)
raise e
return yr_df