import os
import warnings
from typing import Union, List, Dict, Tuple
import numpy as np
import pandas as pd
try:
from netCDF4 import Dataset
except (ModuleNotFoundError, ImportError):
pass
from .utils import _RainfallRunoff
from .._backend import xarray as xr
from ..utils import validate_attributes, download, unzip
from ._map import (
observed_streamflow_cms,
observed_streamflow_mm,
min_air_temp,
max_air_temp,
mean_air_temp,
total_precipitation,
snow_water_equivalent,
mean_dewpoint_temperature_at_2m,
max_air_temp_with_specifier,
min_air_temp_with_specifier,
u_component_of_wind_at_10m,
v_component_of_wind_at_10m,
mean_daily_evaporation_with_specifier,
cloud_cover,
downward_longwave_radiation,
mean_thermal_radiation,
snow_density,
mean_daily_evaporation,
snowfall,
snowmelt,
mean_air_pressure,
solar_radiation,
net_longwave_radiation,
net_solar_radiation,
)
from ._map import (
catchment_area,
gauge_latitude,
gauge_longitude,
slope
)
[docs]
class HYSETS(_RainfallRunoff):
"""
database for hydrometeorological modeling of 14,425 North American watersheds
from 1950-2023 following the work of `Arsenault et al., 2020 <https://doi.org/10.1038/s41597-020-00583-2>`_
This data has 20 dynamic features and 30 static features. Most of the dynamic features
have more than one source. The data is available in netcdf format therefore,
this package requires xarray and netCDF4 to be installed..
Following data_source are available.
+---------------+------------------------------+
|sources | dynamic_features |
+===============+==============================+
|SNODAS_SWE | dscharge, swe |
+---------------+------------------------------+
|SCDNA | discharge, pr, tasmin, tasmax|
+---------------+------------------------------+
|nonQC_stations | discharge, pr, tasmin, tasmax|
+---------------+------------------------------+
|Livneh | discharge, pr, tasmin, tasmax|
+---------------+------------------------------+
|ERA5 | discharge, pr, tasmax, tasmin|
+---------------+------------------------------+
|ERAS5Land_SWE | discharge, swe |
+---------------+------------------------------+
|ERA5Land | discharge, pr, tasmax, tasmin|
+---------------+------------------------------+
all sources contain one or more following dynamic_features
with following shapes
+----------------------------+------------------+
|dynamic_features | shape |
+============================+==================+
|time | (25202,) |
+----------------------------+------------------+
|watershedID | (14425,) |
+----------------------------+------------------+
|drainage_area | (14425,) |
+----------------------------+------------------+
|drainage_area_GSIM | (14425,) |
+----------------------------+------------------+
|flag_GSIM_boundaries | (14425,) |
+----------------------------+------------------+
|flag_artificial_boundaries | (14425,) |
+----------------------------+------------------+
|centroid_lat | (14425,) |
+----------------------------+------------------+
|centroid_lon | (14425,) |
+----------------------------+------------------+
|elevation | (14425,) |
+----------------------------+------------------+
|slope | (14425,) |
+----------------------------+------------------+
|discharge | (14425, 25202) |
+----------------------------+------------------+
|pr | (14425, 25202) |
+----------------------------+------------------+
|tasmax | (14425, 25202) |
+----------------------------+------------------+
|tasmin | (14425, 25202) |
+----------------------------+------------------+
Examples
--------
>>> from aqua_fetch import HYSETS
>>> dataset = HYSETS()
... # get data by station id
>>> _, dynamic = dataset.fetch(stations='5', as_dataframe=True)
>>> df = dynamic['5'] # dynamic is a dictionary of with keys as station names and values as DataFrames
>>> df.shape
(27028, 20)
...
... # get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
14425
... # get data of 10 % of stations as dataframe
>>> _, dynamic = dataset.fetch(0.1, as_dataframe=True)
>>> len(dynamic) # dynamic has data for 10% of stations (1442 out of 14425)
1442
...
... # dynamic is a dictionary whose values are dataframes of dynamic features
>>> [df.shape for df in dynamic.values()]
[(27028, 20), (27028, 20), (27028, 20),... (27028, 20), (27028, 20)]
...
... get the data of a single (randomly selected) station
>>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True)
>>> len(dynamic) # dynamic has data for 1 station
1
... # get names of available dynamic features
>>> dataset.dynamic_features
... # get only selected dynamic features
>>> _, dynamic = dataset.fetch('5', as_dataframe=True,
... dynamic_features=['evap_mm', 'pcp_mm', 'snowmelt_mm', 'swe_mm', 'q_cms_obs'])
>>> dynamic['5'].shape
(27028, 5)
...
... # get names of available static features
>>> dataset.static_features
... # get data of 10 random stations
>>> _, dynamic = dataset.fetch(10, as_dataframe=True)
>>> len(dynamic) # remember this is a dictionary with values as dataframe
10
...
# If we get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='5', static_features="all", as_dataframe=True)
>>> static.shape, len(dynamic), dynamic['5'].shape
((1, 30), 1, (27028, 20))
...
# If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset
>>> _, dynamic = dataset.fetch(10)
... type(dynamic)
xarray.core.dataset.Dataset
...
>>> dynamic.dims
FrozenMappingWarningOnValuesAccess({'time': 27028, 'dynamic_features': 20})
...
>>> len(dynamic.data_vars)
10
...
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(14425, 2)
>>> dataset.stn_coords('5') # returns coordinates of station whose id is 5
47.091389 -67.731392
>>> dataset.stn_coords(['5', '12']) # returns coordinates of two stations
...
# get area of a single station
>>> dataset.area('5')
# get coordinates of two stations
>>> dataset.area(['5', '12'])
...
# if fiona library is installed we can get the boundary as fiona Geometry
>>> dataset.get_boundary('5')
"""
doi = "https://doi.org/10.1038/s41597-020-00583-2"
url = {
'HYSETS_watershed_boundaries.zip': 'https://osf.io/download/p8unw/',
'HYSETS_watershed_properties.txt': 'https://osf.io/download/us795/',
'HYSETS_2023_update_ERA5.nc': 'https://osf.io/download/fdnc8/',
'HYSETS_2023_update_ERA5Land.nc': 'https://osf.io/download/4vt2s/',
'HYSETS_2023_update_Livneh.nc': 'https://osf.io/download/4jgpt/',
'HYSETS_2023_update_monthly_meteorological_data.nc': 'https://osf.io/download/sc4ge/',
'HYSETS_2023_update_SNODAS.nc': 'https://osf.io/download/46wa7/',
'HYSETS_2023_update_SCDNA.nc': 'https://osf.io/download/q8za6/',
'HYSETS_2023_update_NRCAN.nc': 'https://osf.io/download/vfpre/',
'HYSETS_2023_update_nonQC_stations.nc': 'https://osf.io/download/eu8gr/',
'HYSETS_2023_update_QC_stations.nc': 'https://osf.io/download/sbfd2/',
'HYSETS_elevation_bands_100m.csv': 'https://osf.io/download/stzn7/',
'NOTES.txt': 'https://osf.io/download/cfm7q/',
}
sources = {
'10m_u_component_of_wind': ['ERA5', 'ERA5Land'],
'10m_v_component_of_wind': ['ERA5', 'ERA5Land'],
'2m_dewpoint': ['ERA5', 'ERA5Land'],
'2m_tasmax': ['ERA5', 'NRCAN', 'Livneh', 'QC_stations', 'nonQC_stations', 'ERA5Land', 'SCDNA'],
'2m_tasmin': ['ERA5', 'NRCAN', 'Livneh', 'QC_stations', 'nonQC_stations', 'ERA5Land', 'SCDNA'],
'discharge': ['ERA5', 'NRCAN', 'ERA5Land', 'Livneh', 'nonQC_stations', 'SCDNA', 'SNODAS', 'QC_stations'],
'evaporation': ['ERA5', 'ERA5Land'],
'snow_density': ['ERA5', 'ERA5Land'],
'snow_evaporation': ['ERA5', 'ERA5Land'],
'snow_water_equivalent': ['ERA5', 'ERA5Land'],
'snowfall': ['ERA5', 'ERA5Land'],
'snowmelt': ['ERA5', 'ERA5Land'],
'surface_downwards_solar_radiation': ['ERA5', 'ERA5Land'],
'surface_downwards_thermal_radiation': ['ERA5', 'ERA5Land'],
'surface_net_solar_radiation': ['ERA5', 'ERA5Land'],
'surface_net_thermal_radiation': ['ERA5', 'ERA5Land'],
'surface_pressure': ['ERA5', 'ERA5Land'],
'surface_runoff': ['ERA5', 'ERA5Land'],
'swe': ['SNODAS'],
'total_cloud_cover': ['ERA5'],
'total_precipitation': ['ERA5', 'NRCAN', 'Livneh', 'QC_stations', 'nonQC_stations', 'ERA5Land', 'SCDNA'],
'total_runoff': ['ERA5', 'ERA5Land'],
}
def_src = {
'10m_u_component_of_wind': 'ERA5',
'10m_v_component_of_wind': 'ERA5',
'2m_dewpoint': 'ERA5',
'2m_tasmax': 'ERA5',
'2m_tasmin': 'ERA5',
'discharge': 'ERA5',
'evaporation': 'ERA5',
'snow_density': 'ERA5',
'snow_evaporation': 'ERA5',
'snow_water_equivalent': 'ERA5',
'snowfall': 'ERA5',
'snowmelt': 'ERA5',
'surface_downwards_solar_radiation': 'ERA5',
'surface_downwards_thermal_radiation': 'ERA5',
'surface_net_solar_radiation': 'ERA5',
'surface_net_thermal_radiation': 'ERA5',
'surface_pressure': 'ERA5',
'surface_runoff': 'ERA5',
#'swe': 'SNODAS',
'total_cloud_cover': 'ERA5',
'total_precipitation': 'ERA5',
#'total_runoff': 'ERA5',
}
[docs]
def __init__(self,
path: str,
sources:Dict[str, str] = None,
**kwargs
):
"""
parameters
--------------
path : str
The path under which the data is to be saved or is saved already.
If the data is alredy downloaded then provide the path under which
HYSETS data is located. If None, then the data will be downloaded.
The data is downloaded once and therefore susbsequent
calls to this class will not download the data unless
``overwrite`` is set to True.
sources : dict
sources for each dynamic feature. The keys should be dynamic features
and values should be sources. Available sources for the dynamic
features are as below
- 10m_u_component_of_wind: ['ERA5', 'ERA5Land']
- 10m_v_component_of_wind: ['ERA5', 'ERA5Land']
- 2m_dewpoint: ['ERA5', 'ERA5Land']
- 2m_tasmax: ['NRCAN', 'Livneh', 'QC_stations', 'ERA5', 'nonQC_stations', 'ERA5Land', 'SCDNA']
- 2m_tasmin: ['NRCAN', 'Livneh', 'QC_stations', 'ERA5', 'nonQC_stations', 'ERA5Land', 'SCDNA']
- discharge: ['NRCAN', 'ERA5', 'ERA5Land', 'Livneh', 'nonQC_stations', 'SCDNA', 'SNODAS', 'QC_stations']
- evaporation: ['ERA5', 'ERA5Land']
- snow_density: ['ERA5', 'ERA5Land']
- snow_evaporation: ['ERA5', 'ERA5Land']
- snow_water_equivalent: ['ERA5', 'ERA5Land', 'SNODAS']
- snowfall: ['ERA5', 'ERA5Land']
- snowmelt: ['ERA5', 'ERA5Land']
- surface_downwards_solar_radiation: ['ERA5', 'ERA5Land']
- surface_downwards_thermal_radiation: ['ERA5', 'ERA5Land']
- surface_net_solar_radiation: ['ERA5', 'ERA5Land']
- surface_net_thermal_radiation: ['ERA5', 'ERA5Land']
- surface_pressure: ['ERA5', 'ERA5Land']
- surface_runoff: ['ERA5', 'ERA5Land']
- total_cloud_cover: ['ERA5']
- total_precipitation: ['NRCAN', 'Livneh', 'QC_stations', 'ERA5', 'nonQC_stations', 'ERA5Land', 'SCDNA']
kwargs :
arguments for ``_RainfallRunoff`` base class
"""
if sources is not None:
assert isinstance(sources, dict), 'sources must be a dictionary'
for key, val in sources.items():
assert key in self.sources, f'{key} is not a valid source'
assert val in self.sources[key], f'{val} is not a valid source for {key}. Available sources are {self.sources[key]}'
self.sources = sources
else:
self.sources = self.def_src.copy()
super().__init__(path=path, **kwargs)
if not os.path.exists(self.path):
os.makedirs(self.path)
for fname, url in self.url.items():
fpath = os.path.join(self.path, fname)
if not os.path.exists(fpath):
if self.verbosity:
print(f'downloading {fname}')
download(url, self.path, fname)
unzip(self.path, verbosity=self.verbosity)
self._maybe_to_netcdf()
self.bbox = {"llcrnrlat": 18, "urcrnrlat": 80.384358,
"llcrnrlon": -168.0, "urcrnrlon": -55.0}
self.parallels = np.arange(18, 80, 7)
self.meridians = np.arange(-168, -55, 12)
@property
def boundary_file(self) -> os.PathLike:
return os.path.join(self.path,
"HYSETS_watershed_boundaries",
"HYSETS_watershed_boundaries_20200730.shp")
@property
def boundary_id_map(self)->str:
"""
Name of the attribute in the boundary (.shp/.gpkg) file that
will be used to map the catchment/station id to the geometry of the
catchment/station. This is used to create the boundary id map.
"""
return "OfficialID"
@property
def static_map(self) -> Dict[str, str]:
return {
'Drainage_Area_km2': catchment_area(), # todo: why give preference to, Drainage_Area_GSIM_km2
'Centroid_Lat_deg_N': gauge_latitude(),
'Slope_deg': slope('degrees'),
'Centroid_Lon_deg_E': gauge_longitude(),
}
@property
def dyn_map(self)->Dict[str, str]:
return {
'10m_u_component_of_wind': u_component_of_wind_at_10m(),
'10m_v_component_of_wind': v_component_of_wind_at_10m(),
'2m_dewpoint': mean_dewpoint_temperature_at_2m(),
'2m_tasmax': max_air_temp_with_specifier('2m'),
'2m_tasmin': min_air_temp_with_specifier('2m'),
'discharge': observed_streamflow_cms(),
'evaporation': mean_daily_evaporation(),
'snow_density': snow_density(),
'snow_evaporation': mean_daily_evaporation_with_specifier('snow'),
'snow_water_equivalent': snow_water_equivalent(),
'snowfall': snowfall(),
'snowmelt': snowmelt(),
'surface_downwards_solar_radiation': solar_radiation(), # surface_downwards_solar_radiation_shortwave in J/m2
'surface_downwards_thermal_radiation': downward_longwave_radiation(), # surface_downwards_thermal_radiation_longwave in J/m2
'surface_net_solar_radiation': net_solar_radiation(), # surface_net_solar_radiation_shortwave in J/m2
'surface_net_thermal_radiation': net_longwave_radiation(), # surface_net_thermal_radiation_longwave in J/m2
'surface_pressure': mean_air_pressure(), # convert Pa to hPa
'surface_runoff': observed_streamflow_mm(),
'total_cloud_cover': cloud_cover(),
'total_precipitation': total_precipitation(),
# 'total_runoff': observed_streamflow_mm(), todo : it appears same as runoff?
}
@property
def dyn_generators(self):
return {
# new column to be created : function to be applied, inputs
mean_air_temp(): (self.mean_temp, (min_air_temp(), max_air_temp())),
}
@property
def dynamic_features(self)->List[str]:
return sorted(list(self.dyn_map.values()))
def _maybe_to_netcdf(self):
for src in list(set(list(self.sources.values()))):
fname = f'HYSETS_2023_update_{src}.nc'
outpath = os.path.join(self.path, f'HYSETS_2023_update_{src}1.nc')
if not os.path.exists(outpath):
self.transform(fname)
return
@property
def static_features(self)->List[str]:
df = self._static_data(nrows=2)
return df.columns.to_list()
[docs]
def stations(self) -> List[str]:
"""
retuns a list of station names. The ``Watershed_ID`` of the station is used
as station name instead of ``Official_ID``. This is because in .nc files
watershed_ID is used for stations instead of Official_ID. ``Official_ID``
starts with 1, 2, 3 and so on while ``Watershed_ID`` is a code from
meteo agency such as ``01AD002`` for station 1.
Returns
-------
list
a list of ids of stations
Examples
--------
>>> from aqua_fetch import HYSETS
>>> dataset = HYSETS()
... # get name of all stations as list
>>> dataset.stations()
"""
return super().stations()
@property
def WatershedID_OfficialID_map(self):
"""A dictionary mapping Watershed_ID to Official_ID.
For example '01AD002': '1'
"""
return self._static_data(
usecols=['Watershed_ID', 'Official_ID']
).loc[:, 'Official_ID'].to_dict()
@property
def OfficialID_WatershedID_map(self):
"""A dictionary mapping Official_ID to Watershed_ID.
For example '1': '01AD002'
"""
s = self._static_data(usecols=['Watershed_ID', 'Official_ID'])
return {v:k for k,v in s.loc[:, 'Official_ID'].to_dict().items()}
@property
def start(self)->pd.Timestamp:
return pd.Timestamp("19500101")
@property
def end(self)->pd.Timestamp:
return pd.Timestamp("20231231")
[docs]
def usgs_stations(self)->List[str]:
"""Returns the names of stations which are taken from USGS as list"""
df = pd.read_csv(os.path.join(self.path, "HYSETS_watershed_properties.txt"),
sep=",")
return df.loc[df['Source']=='USGS']['Watershed_ID'].astype(str).tolist()
[docs]
def area(
self,
stations: Union[str, List[str]] = 'all',
source:str = 'other'
) ->pd.Series:
"""
Returns area_gov (Km2) of all catchments as :obj:`pandas.Series`
parameters
----------
stations : str/list
name/names of stations. Default is None, which will return
area of all stations
source : str
source of area calculation. It should be either ``gsim`` or ``other``
Returns
--------
pd.Series
a :obj:`pandas.Series` whose indices are catchment ids and values
are areas of corresponding catchments.
Examples
---------
>>> from aqua_fetch import HYSETS
>>> dataset = HYSETS()
>>> dataset.area() # returns area of all stations
>>> dataset.area('92') # returns area of station whose id is 912101A
>>> dataset.area(['92', '142']) # returns area of two stations
"""
stations = validate_attributes(stations, self.stations())
SRC_MAP = {
'gsim': 'Drainage_Area_GSIM_km2',
'other': 'area_km2'
}
s = self.fetch_static_features(
static_features=[SRC_MAP[source]],
)
s.columns = ['area_km2']
return s.loc[stations, 'area_km2']
[docs]
def fetch_stations_features(
self,
stations: list,
dynamic_features: Union[str, list, None] = 'all',
static_features: Union[str, list, None] = None,
st=None,
en=None,
as_dataframe: bool = False,
**kwargs
) -> Tuple[pd.DataFrame, Union[pd.DataFrame, "Dataset"]]:
"""returns features of multiple stations
Examples
--------
>>> from aqua_fetch import HYSETS
>>> dataset = HYSETS()
>>> stations = dataset.stations()[0:3]
>>> features = dataset.fetch_stations_features(stations)
"""
if xr is None:
if not as_dataframe:
if self.verbosity: warnings.warn("xarray module is not installed so as_dataframe will have no effect. "
"Dynamic features will be returned as pandas DataFrame")
as_dataframe = True
stations = validate_attributes(stations, self.stations())
stations_int = [int(stn) for stn in stations]
static, dynamic = None, None
if dynamic_features is not None:
dynamic = self._fetch_dynamic_features(stations=stations_int,
dynamic_features=dynamic_features,
as_dataframe=as_dataframe,
st=st,
en=en,
**kwargs
)
if static_features is not None: # we want both static and dynamic
static = self.fetch_static_features(stations,
static_features=static_features,
)
elif static_features is not None:
# we want only static
static = self.fetch_static_features(
stations,
static_features=static_features,
)
else:
raise ValueError
return static, dynamic
[docs]
def fetch_dynamic_features(
self,
station,
dynamic_features = 'all',
st=None,
en=None,
as_dataframe=False
):
"""Fetches dynamic features of one station.
Examples
--------
>>> from aqua_fetch import HYSETS
>>> dataset = HYSETS()
>>> dyn_features = dataset.fetch_dynamic_features('station_name')
"""
station = [int(station)]
return self._fetch_dynamic_features(
stations=station,
dynamic_features=dynamic_features,
st=st,
en=en,
as_dataframe=as_dataframe
)
def _fetch_dynamic_features(
self,
stations: List[int],
dynamic_features = 'all',
st=None,
en=None,
as_dataframe=False
):
"""Fetches dynamic features of station."""
# first put all dynamic features in a single Dataset of shape (time, watershed) with dynamic_features as data_vars.
# Then converting it to (dynamic_features, time) with watershed as data_vars. This method is faster
# for fewer dynamic features but slower for many dynamic features.
stations_1 = np.subtract(stations, 1).astype(str).tolist()
st, en = self._check_length(st, en)
attrs = validate_attributes(dynamic_features, self.dynamic_features)
dyn_map_ = {v:k for k,v in self.dyn_map.items()}
xds = None
features = {}
for idx, f in enumerate(attrs):
f_ = dyn_map_[f]
fpath = os.path.join(self.path, f'HYSETS_2023_update_{self.sources[f_]}1.nc')
if xds is None or f_ not in xds.dynamic_features:
xds = xr.open_dataset(fpath)
features[f] = xds.sel(dynamic_features=[f_], time=slice(st, en))[stations_1]
if self.verbosity>1:
print(f"{idx+1}/{len(attrs)} fetched {f}")
if self.verbosity>1: print('concatenating along features')
xds = xr.concat(list(features.values()), dim='dynamic_features')
if self.verbosity>1: print('transposing')
old_vals = xds.coords["dynamic_features"].values
new_vals = [self.dyn_map[val] for val in old_vals]
xds = xds.assign_coords(dynamic_features=new_vals)
# we need to add +1 to the names of data_vars
old_vars = list(xds.data_vars)
new_data_vars = [f"{int(var)+1}" for var in old_vars]
data_vars_map = {old_var: new_var for old_var, new_var in zip(old_vars, new_data_vars)}
xds = xds.rename(data_vars_map)
if as_dataframe:
stations = np.array(stations).astype(str).tolist()
return {stn:xds[stn].to_pandas() for stn in stations}
return xds
def _static_data(self, usecols=None, nrows=None):
"""
reads the HYSETS_watershed_properties.txt file while using `Watershed_ID`
as index instead of ``Official_ID``. Watershed_ID starts with 1,2,3 and so on
while ``Official_ID`` is code from meteo agency such as ``01AD002`` for station 1.
"""
fname = os.path.join(self.path, 'HYSETS_watershed_properties.txt')
static_df = pd.read_csv(fname, index_col='Watershed_ID', sep=',', usecols=usecols, nrows=nrows)
static_df.index = static_df.index.astype(str)
static_df.rename(columns=self.static_map, inplace=True)
return static_df
def transform(
self,
fname: str,
):
fpath = os.path.join(self.path, fname)
if self.verbosity: print(f'transforming {fname}')
ds = xr.open_dataset(fpath)
ds = ds[[var for var in ds.data_vars if len(ds[var].dims) == 2]]
dyn_vars = list(ds.data_vars) # e.g. ["var1", "var2", ...]
# We'll manually combine them into a new DataArray
arr_list = []
for idx, var in enumerate(dyn_vars):
array = ds[var].values
arr_list.append(array)
if self.verbosity>1: print(f"{idx+1}/{len(dyn_vars)} Fetched {var} {array.shape}")
if self.verbosity: print('stacking arrays')
data = np.stack(arr_list, axis=2)
data_var_names = [str(i) for i in range(len(data))]
if self.verbosity: print('creating xarray dataset')
xds = xr.Dataset(
{name: (["time", "dynamic_features"], data[i, :, :].astype(np.float32)) for i, name in enumerate(data_var_names)},
coords={
"time": ds.time, # Replace with actual time coordinates if available
"dynamic_features": dyn_vars # Replace with actual feature names if available
}
)
outpath = os.path.join(self.path, f"{fname.split('.')[0]}1.nc")
if self.verbosity: print(f'saving as {outpath}')
xds.to_netcdf(
outpath,
encoding={var: {"dtype": "float32",
'zlib': True,
'complevel': 3,
'least_significant_digit': 4} for var in xds.data_vars}
)
return xds