import os
import time
import random
import warnings
import concurrent.futures as cf
from typing import Union, List, Dict, Tuple
import numpy as np
import pandas as pd
from .._datasets import Datasets
from .._backend import netCDF4
from .._backend import fiona
from .._backend import xarray as xr, plt, easy_mpl, plt_Axes
from ..utils import validate_attributes, get_cpus
from .._geom_utils import (
_make_boundary_2d
)
from ._map import (
catchment_area,
gauge_latitude,
gauge_longitude,
)
# directory separator
SEP = os.sep
def gb_message():
link = "https://doi.org/10.5285/8344e4f3-d2ea-44f5-8afa-86d2987543a9"
raise ValueError(f"Dwonlaoad the data from {link} and provide the directory "
f"path as dataset=Camels(data=data)")
[docs]
class _RainfallRunoff(Datasets):
"""
This is the parent class for invidual rainfall-runoff datasets like CAMELS-GB etc.
This class is not meant to be for direct use. It is inherited by the child classes
which are specific to a dataset like CAMELS-GB, CAMELS-AUS etc.
This class first downloads the dataset if it is not already downloaded.
Then the selected features for a selected catchment/station are fetched and provided to the
user using the method `fetch`.
Attributes
-----------
- path str/path: diretory of the dataset
- dynamic_features list: tells which dynamic features are available in
this dataset
- static_features list: a list of static features.
- static_attribute_categories list: tells which kinds of static features
are present in this category.
Methods
---------
- stations : returns name/id of stations for which the data (dynamic features)
exists as list of strings.
- fetch : fetches all features (both static and dynamic type) of all
station/gauge_ids or a speficified station. It can also be used to
fetch all features of a number of stations ids either by providing
their guage_id or by just saying that we need data of 20 stations
which will then be chosen randomly.
- fetch_dynamic_features :
fetches speficied dynamic features of one specified station. If the
dynamic attribute is not specified, all dynamic features will be
fetched for the specified station. If station is not specified, the
specified dynamic features will be fetched for all stations.
- fetch_static_features :
works same as `fetch_dynamic_features` but for `static` features.
Here if the `category` is not specified then static features of
the specified station for all categories are returned.
stations : returns list of stations
"""
DATASETS = {
'CAMELS_BR': {'url': "https://zenodo.org/record/3964745#.YA6rUxZS-Uk",
},
'CAMELS-GB': {'url': gb_message},
}
[docs]
def __init__(
self,
path: str = None,
timestep: str = "D",
to_netcdf: bool = True,
overwrite: bool = False,
verbosity: int = 1,
**kwargs
):
"""
parameters
-----------
path : str
if provided and the directory exists, then the data will be read
from this directory. If provided and the directory does not exist,
then the data will be downloaded in this directory. If not provided,
then the data will be downloaded in the default directory.
timestep : str
This can only be set for datasets which are available at multiple timesteps
such as LamaHCE or LamaHIce etc.
to_netcdf : bool
whether the data should be saved in netCDF format or not
If set to true, the data will be saved in netCDF format which
can take time for the first time it is created. However, it leads
to faster I/O operations in subsequent accesses.
overwrite : bool
whether to overwrite existing files or not. If set to True, the data
will be redownloaded.
verbosity : int
This parameter determines the level of verbosity for logging messages.
- 0: no message will be printed
- 1: only important messages will be printed
- >1: any higher value greater than 1 will result in more verbose output
kwargs :
Any other keyword arguments for the parent :py:class:`Datasets` class
"""
super(_RainfallRunoff, self).__init__(path=path, verbosity=verbosity, overwrite=overwrite, **kwargs)
self.bndry_id_map_ = {}
self.timestep = timestep
if netCDF4 is None:
if to_netcdf:
msg = "netCDF4 module is not installed. Please install it to save data in netcdf format"
warnings.warn(msg, UserWarning)
to_netcdf = False
self.to_netcdf = to_netcdf
self.bbox = {"llcrnrlat": -90, "urcrnrlat": 90, "llcrnrlon": -180, "urcrnrlon": 180}
self.parallels = range(-90, 90, 30)
self.meridians = range(-180, 180, 30)
@property
def dyn_map(self) -> Dict[str, str]:
"""A dictionary that maps dynamic features to their names in the dataset."""
return {}
@property
def static_map(self) -> Dict[str, str]:
"""A dictionary that maps static features to their names in the dataset."""
return {}
@property
def static_factors(self) -> Dict[str, str]:
"""A dictionary that maps static features to the factors with they needs
to be multiplied to get the actual value"""
return {}
@property
def dyn_factors(self) -> Dict[str, float]:
return {}
@property
def boundary_id_map(self) -> str:
"""
Name of the attribute in the boundary (shapefile/.gpkg) file that
will be used to map the catchment/station id to the geometry of the
catchment/station. This is used to create the boundary id map.
if not given, then the first attribute in the boundary file will be used.
"""
return None
@property
def dyn_fname(self) -> Union[str, os.PathLike]:
"""
name of the .nc file which contains dynamic features. This file is created during dataset initialization
only if to_netcdf is True and xarray is installed and the file does not already exists. The creation of this
file can take some time however it leads to faster I/O operations.
"""
return self.name.lower() + f"_{self.timestep}.nc"
@property
def dyn_fpath(self) -> os.PathLike:
return os.path.join(self.path, self.dyn_fname)
@property
def dyn_fpath_exists(self) -> bool:
"""checks if the .nc file which contains dynamic features exists"""
return os.path.exists(self.dyn_fpath)
[docs]
def mm_to_cms(self, q_mm: pd.Series) -> pd.Series:
"""converts discharge from mm/timestep to cms"""
if self.timestep.lower().startswith('d'):
conversion_factor = 86400
elif self.timestep.lower().startswith('h'):
conversion_factor = 3600
elif self.timestep.lower().startswith('15min'):
conversion_factor = 900
else:
raise ValueError(f"Invalid timestep: {self.timestep}. ")
area_m2 = self.area(q_mm.name) * 1e6
q_md = q_mm * 0.001 # convert mm/timestep to m/timestep
return q_md * area_m2.iloc[0] / conversion_factor
[docs]
def cms_to_mm(self, q_cms:pd.Series)->pd.Series:
"""convert streamflow from cms to mm/timestep"""
if self.timestep.lower().startswith('d'):
conversion_factor = 86400
elif self.timestep.lower().startswith('h'):
conversion_factor = 3600
elif self.timestep.lower().startswith('15min'):
conversion_factor = 900
else:
raise ValueError(f"Invalid timestep: {self.timestep}. ")
area_m2 = self.area(q_cms.name) * 1e6 # area in m2
return ((q_cms * conversion_factor)/area_m2.iloc[0]) * 1e3 # cms to mm/timestep
[docs]
@staticmethod
def mean_temp(tmin:pd.Series, tmax:pd.Series)->pd.Series:
"""calculates mean temperature from tmin and tmax"""
assert len(tmin) == len(tmax), f"length of tmin {len(tmin)} and tmax {len(tmax)} must be same"
return (tmin + tmax)/2
def _create_boundary_id_map(self):
if fiona is None:
raise ModuleNotFoundError("fiona module is not installed. Please install it to use boundary file")
if hasattr(self, 'bndry_id_map_') and self.bndry_id_map_:
return self.bndry_id_map_
# Dictionary to hold {CatchID: geometry}
self.bndry_id_map_ = {}
assert os.path.exists(self.boundary_file), \
f"Boundary file {self.boundary_file} does not exist."
with fiona.open(self.boundary_file, "r") as src:
boundary_id_map = self.boundary_id_map
if boundary_id_map is None:
schema = src.schema
properties = schema['properties']
boundary_id_map = list(properties.keys())[0] # use the first property as default
if self.verbosity:
print(f"Using attribute '{boundary_id_map}' as default for boundary ID mapping.")
for feature in src:
if self.name in ['CAMELS_CH', 'CAMELS_IND', 'CABra']:
# from '2004.0' -> '2004' for CAMELS_CH
# from '03001' -> '3001' for CAMELS_IND
catch_id = str(int(feature["properties"][boundary_id_map]))
elif self.name == 'CAMELS_LUX':
idx = int(feature["properties"][boundary_id_map])
if idx < 10:
catch_id = f"ID_{str(idx).zfill(2)}"
else:
catch_id = f"ID_{idx}"
elif self.name == 'Simbi':
catch_id = feature['properties'][boundary_id_map]
catch_id = catch_id.split('-')[1]
elif self.name == 'Caravan_DK':
catch_id = str(feature["properties"][boundary_id_map])
catch_id = str(catch_id).split('_')[1]
elif self.name == 'CAMELS_US':
# in shapefile, for some stations, 0 is missing at the start
catch_id = str(feature["properties"][boundary_id_map])
if len(str(catch_id)) == 7:
catch_id = f"0{catch_id}"
else:
# since we are treating catchment/station id as string
catch_id = str(feature["properties"][boundary_id_map])
geometry = feature["geometry"]
self.bndry_id_map_[catch_id] = geometry
return self.bndry_id_map_
[docs]
def stations(self) -> List[str]:
"""
Names/ids of stations/catchment/gauges or whatever that would
be used to index each station in the dataset.
"""
# Since this is a method,
# it is called multiple times, it is better to cache the result
# and return the cached result instead of reading the data again and again
# The user is recommended to implement this method in the child class in a more efficient way.
return self._static_data().index.tolist()
def _read_dynamic(
self,
stations,
dynamic_features,
st:Union[str, pd.Timestamp] = None,
en:Union[str, pd.Timestamp] = None
) -> Dict[str, pd.DataFrame]:
st, en = self._check_length(st, en)
dyn_feats = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
stations = validate_attributes(stations, self.stations(), 'stations')
cpus = self.processes or min(get_cpus(), 16)
start = time.time()
if len(stations) < cpus:
cpus = 1
if cpus == 1:
dyn = {}
for idx, stn in enumerate(stations):
stn_df = self._read_stn_dyn(stn).loc[st:en, dyn_feats]
stn_df.columns.name = 'dynamic_features'
stn_df.index.name = 'time'
dyn[stn] = stn_df
if self.verbosity and idx % 100 == 0:
print(f"Read {idx+1}/{len(stations)} stations.")
elif self.verbosity>1 and idx % 50 == 0:
print(f"Read {idx+1}/{len(stations)} stations.")
elif self.verbosity>2 and idx % 10 == 0:
print(f"Read {idx+1}/{len(stations)} stations.")
else:
with cf.ProcessPoolExecutor(cpus) as executor:
results = executor.map(self._read_stn_dyn, stations)
dyn = {}
for stn, stn_df in zip(stations, results):
stn_df.columns.name = 'dynamic_features'
stn_df.index.name = 'time'
dyn[stn] = stn_df.loc[st:en, dyn_feats]
total = time.time() - start
if self.verbosity:
print(f"Read {len(dyn)} stations for {len(dyn_feats)} dyn features in {total:.2f} seconds with {cpus} cpus.")
return dyn
def _read_stn_dyn(self, stn: str) -> pd.DataFrame:
"""
reads dynamic data of one station
parameters
----------
stn : str
name/id of the station for which to read the dynamic data. This
must be one of the station names returned by
:meth:`stations`.
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame` with index as time and columns as dynamic features.
The index is a :obj:`pandas.DatetimeIndex` and the columns are the names of
dynamic features.
"""
raise NotImplementedError(f"Must be implemented in the child class")
[docs]
def fetch_static_features(
self,
stations: Union[str, list] = "all",
static_features: Union[str, list] = "all"
) -> pd.DataFrame:
"""Fetches all or selected static features of one or more stations.
Parameters
----------
stations : str/list
name/id of station of which to extract the data
static_features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
static features are returned.
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame`
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> camels = CAMELS_AUS()
>>> camels.fetch_static_features('912101A')
>>> camels.static_features
>>> camels.fetch_static_features('912101A',
... static_features=['elev_mean', 'relief', 'ksat', 'pop_mean'])
for CAMELS_FR
>>> from aqua_fetch import CAMELS_FR
>>> dataset = CAMELS_FR()
get the names of stations
>>> stns = dataset.stations()
>>> len(stns)
654
get all static data of all stations
>>> static_data = dataset.fetch_static_features(stns)
>>> static_data.shape
(472, 210)
get static data of one station only
>>> static_data = dataset.fetch_static_features('42600042')
>>> static_data.shape
(1, 210)
get the names of static features
>>> dataset.static_features
get only selected features of all stations
>>> static_data = dataset.fetch_static_features(stns, ['slope_mean', 'aridity'])
>>> static_data.shape
(472, 2)
>>> data = dataset.fetch_static_features('42600042', static_features=['slope_mean', 'aridity'])
>>> data.shape
(1, 2)
"""
stations = validate_attributes(stations, self.stations(), 'stations')
features = validate_attributes(static_features, self.static_features, 'static_features')
df:pd.DataFrame = self._static_data()
return df.loc[stations, features]
def _static_data(self) -> pd.DataFrame:
"""returns all static data as DataFrame. The index of the DataFrame
is the station ids and the columns are the static features.
This method must be implemented in the child class.
Returns
-------
pd.DataFrame
a :obj:`pandas.DataFrame` with index as station ids and columns as static features.
The index is a :obj:`pandas.Index` and the columns are the names of
static features.
"""
raise NotImplementedError(f"Must be implemented in the child class")
@property
def start(self) -> pd.Timestamp: # start of data
return pd.Timestamp("1800-01-01")
@property
def end(self) -> pd.Timestamp:
"""end of data"""
return pd.Timestamp.today().strftime("%Y-%m-%d")
@property
def static_features(self) -> List[str]:
"""
Returns a list of static features that are available in the dataset.
Since this is a method is called multiple times, it is better to cache the result
and return the cached result instead of reading the data again and again
or the user implementing this method in the child class in a more efficient way.
Returns
-------
List[str]
a list of static features that are available in the dataset.
The names of the features are the same as the names used in the
dataset. The names can be used to fetch the data using
:meth:`fetch_static_features`.
"""
return self._static_data().columns.tolist()
@property
def dynamic_features(self) -> List[str]:
"""
Returns a list of dynamic features that are available in the dataset.
Since this is a method is called multiple times, it is better to cache the result
and return the cached result instead of reading the data again and again
or the user implementing this method in the child class in a more efficient way.
Returns
-------
List[str]
a list of dynamic features that are available in the dataset.
The names of the features are the same as the names used in the
dataset. The names can be used to fetch the data using
:meth:`fetch_dynamic_features`.
"""
return self._read_stn_dyn(self.stations()[0]).columns.tolist()
@property
def _area_name(self) -> str:
"""name of feature from static_features to be used as area"""
raise NotImplementedError
@property
def _mm_feature_name(self) -> str:
return None
@property
def _q_name(self) -> str:
return None
@property
def _coords_name(self) -> List[str]:
"""
names of features from static_features to be used as station
coordinates (lat, long)
"""
raise NotImplementedError
[docs]
def area(
self,
stations: Union[str, List[str]] = 'all'
) -> pd.Series:
"""
Returns area (Km2) of all/selected catchments as :obj:`pandas.Series`
parameters
----------
stations : str/list (default=None)
name/names of stations. Default is ``all``, which will return
area of all stations
Returns
--------
pd.Series
a :obj:`pandas.Series` whose indices are catchment ids and values
are areas of corresponding catchments.
Examples
---------
>>> from aqua_fetch import CAMELS_CH
>>> dataset = CAMELS_CH()
>>> dataset.area() # returns area of all stations
>>> dataset.area('2004') # returns area of station whose id is 2004
>>> dataset.area(['2004', '6004']) # returns area of two stations
"""
stations = validate_attributes(stations, self.stations(), 'stations')
df = self.fetch_static_features(static_features=[catchment_area()])
return df.loc[stations, catchment_area()].astype(self.fp)
def _check_length(self, st, en):
if st is None:
st = self.start
else:
st = pd.Timestamp(st)
if en is None:
en = self.end
else:
en = pd.Timestamp(en)
return st, en
@property
def camels_dir(self):
"""Directory where all camels datasets will be saved. This will under
datasets directory"""
return os.path.join(self.base_ds_dir, "CAMELS")
[docs]
def fetch(self,
stations: Union[str, list, int, float] = "all",
dynamic_features: Union[List[str], str, None] = 'all',
static_features: Union[str, List[str], None] = None,
st: Union[None, str] = None,
en: Union[None, str] = None,
as_dataframe: bool = False,
**kwargs
) -> Tuple[pd.DataFrame, Union[Dict[str, pd.DataFrame], "Dataset"]]:
"""
Fetches the features of one or more stations.
Parameters
----------
stations :
It can have following values:
- int : number of (randomly selected) stations to fetch
- float : fraction of (randomly selected) stations to fetch
- str : name/id of station to fetch. However, if ``all`` is
provided, then all stations will be fetched.
- list : list of names/ids of stations to fetch
dynamic_features : If not None, then it is the features to be
fetched. If None, then all available features are fetched
static_features : list of static features to be fetches. None
means no static attribute will be fetched.
st : starting date of data to be returned. If None, the data will be
returned from where it is available.
en : end date of data to be returned. If None, then the data will be
returned till the date data is available.
as_dataframe : whether to return dynamic features as :obj:`pandas.DataFrame`
or as :obj:`xarray.Dataset`.
kwargs : keyword arguments to read the files
Returns
-------
tuple
A tuple of static and dynamic features. Static features are always
returned as pandas DataFrame with shape (stations, staticfeatures).
The index of static features is the station/gauge ids while the columns
are the static features. Dynamic features are returned as either
xarray Dataset or a dictionary with keys as station names and values as
pandas DataFrame. This depends upon whether `as_dataframe`
is True or False and whether the xarray module is installed or not.
If dynamic features are xarray Dataset, then it consists of `data_vars`
equal to the number of stations and `time` adn `dynamic_features` as
dimensions.
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> dataset = CAMELS_AUS()
>>> # get data of 10% of stations
>>> _, dynamic = dataset.fetch(stations=0.1, as_dataframe=True) # dynamic is a dictionary
... # fetch data of 5 (randomly selected) stations
>>> _, five_random_stn_data = dataset.fetch(stations=5, as_dataframe=True)
... # fetch data of 3 selected stations
>>> _, three_selec_stn_data = dataset.fetch(stations=['912101A','912105A','915011A'], as_dataframe=True)
... # fetch data of a single stations
>>> _, single_stn_data = dataset.fetch(stations='318076', as_dataframe=True)
... # get both static and dynamic features as dictionary
>>> static, dynamic = dataset.fetch(1, static_features="all", as_dataframe=True) # -> dict
>>> dynamic
... # get only selected dynamic features
>>> _, sel_dyn_features = dataset.fetch(stations='318076',
... dynamic_features=['q_mm_obs', 'solrad_wm2_silo'], as_dataframe=True)
... # fetch data between selected periods
>>> _, data = dataset.fetch(stations='318076', st="20010101", en="20101231", as_dataframe=True)
"""
if isinstance(stations, int):
# the user has asked to randomly provide data for some specified number of stations
stations = random.sample(self.stations(), stations)
elif isinstance(stations, list):
pass
elif isinstance(stations, str):
if stations == 'all':
stations = self.stations()
else:
stations = [stations]
elif isinstance(stations, float):
num_stations = int(len(self.stations()) * stations)
stations = random.sample(self.stations(), num_stations)
elif stations is None:
# fetch for all stations
stations = self.stations()
else:
raise TypeError(f"Unknown value provided for stations {stations}")
return self.fetch_stations_features(
stations,
dynamic_features,
static_features,
st=st,
en=en,
as_dataframe=as_dataframe,
**kwargs
)
def _maybe_to_netcdf(self):
# todo : we should save the dynamic data with default names of dynamic features
# and then convert them to the standard names using the dyn_map
# otherwise everytime we change dyn_map, we will have to convert the data again
# and more importantly, all the users who used a previous version of the dataset
# will have to download the data again, which is not good
if self.to_netcdf:
if not self.dyn_fpath_exists or self.overwrite:
# saving all the data in netCDF file using xarray
if self.verbosity: print(f'converting data to netcdf format for faster io operations')
_, data = self.fetch(static_features=None)
data.to_netcdf(self.dyn_fpath)
else:
if self.verbosity:
print(f"dynamic data already exists as {self.dyn_fpath}. "
f"To overwrite, set `overwrite=True`")
return
[docs]
def fetch_stations_features(
self,
stations: Union[str, List[str]],
dynamic_features: Union[str, List[str]] = 'all',
static_features: Union[str, List[str]] = None,
st: Union[str, pd.Timestamp] = None,
en: Union[str, pd.Timestamp] = None,
as_dataframe: bool = False,
**kwargs
) -> Tuple[pd.DataFrame, Union[Dict[str, pd.DataFrame], "Dataset"]]:
"""
Reads features of more than one stations.
parameters
----------
stations :
list of stations for which data is to be fetched.
dynamic_features :
list of dynamic features to be fetched.
if ``all``, then all dynamic features will be fetched.
static_features : list of static features to be fetched.
If ``all``, then all static features will be fetched. If None,
`then no static attribute will be fetched.
st :
start of data to be fetched.
en :
end of data to be fetched.
as_dataframe :
whether to return the dynamic data as pandas dataframe. default
is :obj:`xarray.Dataset` object
kwargs dict:
additional keyword arguments
Returns
-------
tuple
A tuple of static and dynamic features. Static features are always
returned as :obj:`pandas.DataFrame` with shape (stations, staticfeatures).
The index of static features is the station/gauge ids while the columns
are the static features. Dynamic features are returned as either
:obj:`xarray.Dataset` or a :obj:`dict` with keys as station names and values as
:obj:`pandas.DataFrame` depending upon whether `as_dataframe`
is True or False and whether the xarray module is installed or not.
If dynamic features are xarray Dataset, then it consists of `data_vars`
equal to the number of stations and `time` and `dynamic_features` as
dimensions.
Raises:
ValueError, if both dynamic_features and static_features are None
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> dataset = CAMELS_AUS()
... # find out station ids
>>> dataset.stations()
... # get data of selected stations as xarray Dataset
>>> dataset.fetch_stations_features(['912101A', '912105A', '915011A'])
... # get data of selected stations as dictionary of pandas DataFrame
>>> dataset.fetch_stations_features(['912101A', '912105A', '915011A'],
... as_dataframe=True)
... # get both dynamic and static features of selected stations
>>> dataset.fetch_stations_features(['912101A', '912105A', '915011A'],
... dynamic_features=['q_mm_obs', 'airtemp_C_mean_silo'], static_features=['elev_mean'])
"""
if xr is None:
if not as_dataframe:
if self.verbosity: warnings.warn("xarray module is not installed so as_dataframe will have no effect. "
"Dynamic features will be returned as pandas DataFrame")
as_dataframe = True
st, en = self._check_length(st, en)
static, dynamic = None, None
stations = validate_attributes(stations, self.stations(), 'stations')
if dynamic_features is not None:
dynamic_features = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features')
if netCDF4 is None or not os.path.exists(self.dyn_fpath):
# read from csv files
# following code will run only once when fetch is called inside init method
dyn = self._read_dynamic(stations, dynamic_features, st=st, en=en)
else:
ds = xr.open_dataset(self.dyn_fpath) # daataset
dyn = ds[stations].sel(dynamic_features=dynamic_features, time=slice(st, en))
ds.close()
if as_dataframe:
dyn = {stn:dyn[stn].to_pandas() for stn in dyn}
if static_features is not None:
static = self.fetch_static_features(stations, static_features)
dynamic = _handle_dynamic(dyn, as_dataframe)
else:
dynamic = _handle_dynamic(dyn, as_dataframe)
elif static_features is not None:
return self.fetch_static_features(stations, static_features), dynamic
else:
raise ValueError(f"static features are {static_features} and dynamic features are {dynamic_features}")
return static, dynamic
[docs]
def fetch_dynamic_features(
self,
station: str,
dynamic_features='all',
st=None,
en=None,
as_dataframe=False
)-> Union[pd.DataFrame, "Dataset"]:
"""Fetches all or selected dynamic features of one station.
Parameters
----------
station : str
name/id of station of which to extract the data
features : list/str, optional (default="all")
The name/names of features to fetch. By default, all available
dynamic features are returned.
st : Optional (default=None)
start time from where to fetch the data.
en : Optional (default=None)
end time untill where to fetch the data
as_dataframe : bool, optional (default=False)
if true, the returned data is pandas DataFrame otherwise it
is :obj:`xarray.Dataset`
Returns
-------
pd.DataFrame/xr.Dataset
a pandas dataframe or xarray dataset of dynamic features
If as_dataframe is True, then the returned data is a pandas
DataFrame whose index is `time` and the columns are
`dynamic_features`. If as_dataframe is False, and xarray
module is installed, then the returned data is xarray dataset with
`data_vars` equal to the number of stations and `time` and `dynamic_features`
as dimensions.
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> camels = CAMELS_AUS()
>>> camels.fetch_dynamic_features('912101A', as_dataframe=True)
>>> camels.dynamic_features
>>> camels.fetch_dynamic_features('912101A',
... dynamic_features=['airtemp_C_awap_max', 'vp_hpa_awap', 'q_cms_obs'],
... as_dataframe=True)
"""
assert isinstance(station, str), f"station id must be string is is of type {type(station)}"
station = [station]
return self.fetch_stations_features(
stations=station,
dynamic_features=dynamic_features,
static_features=None,
st=st,
en=en,
as_dataframe=as_dataframe
)[1]
[docs]
def fetch_station_features(
self,
station: str,
dynamic_features: Union[str, list, None] = 'all',
static_features: Union[str, list, None] = None,
st: Union[str, None] = None,
en: Union[str, None] = None,
**kwargs
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Fetches features for one station.
Parameters
-----------
station :
station id/gauge id for which the data is to be fetched.
dynamic_features : str/list, optional
names of dynamic features/attributes to fetch
static_features :
names of static features/attributes to be fetches
st : str,optional
starting point from which the data to be fetched. By default,
the data will be fetched from where it is available.
en : str, optional
end point of data to be fetched. By default the dat will be fetched
Returns
-------
tuple
A tuple of static and dynamic features, both as :obj:`pandas.DataFrame`.
The dataframe of static features will be of single row while the dynamic
features will be of shape (time, dynamic features).
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> dataset = CAMELS_AUS()
>>> static, dynamic = dataset.fetch_station_features('912101A')
>>> static.shape, dynamic.shape
"""
st, en = self._check_length(st, en)
static, dynamic = None, None
if dynamic_features:
dynamic = self.fetch_dynamic_features(station, dynamic_features, st=st,
en=en, **kwargs)
if xr is not None and isinstance(dynamic, xr.Dataset):
dynamic = dynamic[station].to_pandas()
if isinstance(dynamic, pd.Series):
# when single dynamic feature for a single station
#dynamic = pd.DataFrame(dynamic, columns=[dynamic_features])
dynamic = dynamic.to_frame(name=dynamic_features)
elif isinstance(dynamic, dict):
assert len(dynamic) == 1, f"Expected dynamic dict of length 1, got {len(dynamic)}"
dynamic = dynamic.popitem()[1]
if static_features is not None:
static = self.fetch_static_features(station, static_features)
elif static_features is not None:
static = self.fetch_static_features(station, static_features)
return static, dynamic
[docs]
def plot_stations(
self,
stations: List[str] = 'all',
marker='.',
color:str=None,
ax: plt_Axes = None,
show: bool = True,
**kwargs
) -> plt_Axes:
"""
plots coordinates of stations
Parameters
----------
stations :
name/names of stations. If not given, all stations will be plotted
marker :
marker to use.
color : str, optional
name of static feature to use as color.
ax : plt.Axes
matplotlib axes to draw the plot. If not given, then
new axes will be created.
show : bool
**kwargs
Returns
-------
plt.Axes
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> dataset = CAMELS_AUS()
>>> dataset.plot_stations()
>>> dataset.plot_stations(['1', '2', '3'])
>>> dataset.plot_stations(marker='o', ms=0.3)
>>> ax = dataset.plot_stations(marker='o', ms=0.3, show=False)
>>> ax.set_title("Stations")
>>> plt.show()
using area as color
>>> ds.plot_stations(color='area_km2')
"""
from easy_mpl.utils import add_cbar, map_array_to_cmap
xy = self.stn_coords(stations)
_kws = dict(
ax_kws=dict(xlabel="Longitude",
ylabel="Latitude",
title=f"{self.name} Stations (n={len(xy)})",
))
_kws.update(kwargs)
if color is not None:
assert color in self.static_features, f"color {color} is not in static features {self.static_features}"
c = self.fetch_static_features(stations, color)
c = c.astype('float32')
ul = round(c[color].quantile([0.99]).item(), 2)
if self.verbosity > 0:
print(f"Setting upper limit to {ul} for color scale")
c[c>ul] = ul
colorbar = _kws.pop('colorbar', True)
_kws['cmap'] = _kws.get('cmap', 'viridis')
ax, _= easy_mpl.scatter(
xy.loc[:, 'long'].values,
xy.loc[:, 'lat'].values,
ax=ax,
c=c.values.reshape(-1,),
show=False,
**_kws)
if colorbar:
c, mapper = map_array_to_cmap(c.values.reshape(-1,), _kws['cmap'])
add_cbar(ax, mappable=mapper, pad=0.3,
border=False,
title=color, title_kws=dict(fontsize=12))
else:
ax = easy_mpl.plot(xy.loc[:, 'long'].values,
xy.loc[:, 'lat'].values,
marker, ax=ax,
show=False,
**_kws)
if show:
plt.show()
return ax
[docs]
def q_mm(
self,
stations: Union[str, List[str]] = "all"
) -> pd.DataFrame:
"""
returns streamflow in the units of milimeter per timestep (e.g. mm/day or mm/hour). This is obtained
by diving ``q``/area
parameters
----------
stations : str/list
name/names of stations. Default is ``all``, which will return
q_mm of all stations
Returns
--------
pd.DataFrame
a :obj:`pandas.DataFrame` whose indices are time-steps and columns
are catchment/station ids.
"""
stations = validate_attributes(stations, self.stations(), 'stations')
if self._mm_feature_name is None:
_, q = self.fetch_stations_features(
stations,
dynamic_features="q_cms_obs",
as_dataframe=True)
q = pd.DataFrame.from_dict({stn:df['q_cms_obs'] for stn,df in q.items()})
area_m2 = self.area(stations) * 1e6 # area in m2
# Determine time conversion based on timestep
if self.timestep.lower().startswith('h'):
time_conversion = 3600 # seconds per hour
elif self.timestep.lower().startswith('d'):
time_conversion = 86400 # seconds per day
elif self.timestep.lower().startswith('15min'):
time_conversion = 900 # seconds per 15 minutes
else:
raise ValueError(f"Timestep '{self.timestep}' not supported.")
q = (q / area_m2) * time_conversion # cms to m
return q * 1e3 # to mm
else:
_, q = self.fetch_stations_features(
stations,
dynamic_features=self._mm_feature_name,
as_dataframe=True)
q = pd.DataFrame.from_dict({stn:df[self._mm_feature_name] for stn,df in q.items()})
return q
[docs]
def stn_coords(
self,
stations: Union[str, List[str]] = 'all'
) -> pd.DataFrame:
"""
returns coordinates of stations as DataFrame
with ``long`` and ``lat`` as columns.
Parameters
----------
stations :
name/names of stations. If not given, coordinates
of all stations will be returned.
Returns
-------
pd.DataFrame
:obj:`pandas.DataFrame` with ``long`` and ``lat`` columns.
The length of dataframe will be equal to number of stations
wholse coordinates are to be fetched.
Examples
--------
>>> from aqua_fetch import CAMELS_CH
>>> dataset = CAMELS_CH()
>>> dataset.stn_coords() # returns coordinates of all stations
>>> dataset.stn_coords('2004') # returns coordinates of station whose id is 2004
>>> dataset.stn_coords(['2004', '6004']) # returns coordinates of two stations
>>> from aqua_fetch import CAMELS_AUS
>>> dataset = CAMELS_AUS()
>>> dataset.stn_coords() # returns coordinates of all stations
>>> dataset.stn_coords('912101A') # returns coordinates of station whose id is 912101A
>>> dataset.stn_coords(['G0050115', '912101A']) # returns coordinates of two stations
"""
df = self.fetch_static_features(static_features=[gauge_latitude(), gauge_longitude()])
#df.columns = ['lat', 'long']
stations = validate_attributes(stations, self.stations(), 'stations')
df = df.loc[stations, :].astype(self.fp)
return self.transform_stn_coords(df)
[docs]
def get_boundary(
self,
catchment_id: str,
to_wgs84: bool = True
):
"""
returns boundary of a catchment in a required format
Parameters
----------
catchment_id : str
name/id of catchment
to_wgs84 : bool, optional (default=True)
if True, then the boundary will be transformed to WGS84 (EPSG:4326)
if it is not already in WGS84.
Returns
-------
geometry : fiona.Geometry
Examples
--------
>>> from aqua_fetch import CAMELS_SE
>>> dataset = CAMELS_SE()
>>> dataset.get_boundary(dataset.stations()[0])
"""
assert isinstance(catchment_id, str), f"catchment_id must be string but is of type {type(catchment_id)}"
# todo : when we repeatedly call get_boundary, we should not create the
# boundary_id_map for all catchments again
if self.name in ['Thailand', 'Japan', 'Arcticnet', 'Spain']:
bndry_id_map = self.gsha._create_boundary_id_map()
elif self.name in ['USGS']:
bndry_id_map = self.hysets._create_boundary_id_map()
else:
bndry_id_map = self._create_boundary_id_map()
if self.name in ['HYSETS']:
catchment_id = self.WatershedID_OfficialID_map[catchment_id]
elif self.name == 'Thailand':
catchment_id = catchment_id.replace('.', '_')
if self.name in ['Thailand', 'Japan', 'Arcticnet', 'Spain']:
catchment_id = f"{catchment_id}_{self.agency_name}"
geometry = bndry_id_map[catchment_id]
if to_wgs84:
geometry = self.transform_boundary(geometry)
return geometry
[docs]
def plot_catchment(
self,
catchment_id: str,
show_outlet:bool = False,
ax: plt_Axes = None,
show: bool = True,
**kwargs
):
"""
plots catchment boundaries
Parameters
----------
catchment_id : str
name/id of catchment to plot
show_outlet : bool, optional (default=False)
if True, then outlet of the catchment will be plotted as a red dot
ax : plt.Axes
matplotlib axes to draw the plot. If not given, then
new axes will be created.
show : bool
**kwargs
Returns
-------
plt.Axes
Examples
--------
>>> from aqua_fetch import CAMELS_AUS
>>> dataset = CAMELS_AUS()
>>> dataset.plot_catchment('912101A')
>>> dataset.plot_catchment('912101A', marker='o', ms=0.3)
>>> ax = dataset.plot_catchment('912101A', marker='o', ms=0.3, show=False)
>>> ax.set_title("Catchment Boundary")
>>> plt.show()
# show the outlet as well
>>> CAMELS_AUS.plot_catchment('912101A', show_outlet=True)
"""
geometry = self.get_boundary(catchment_id)
rings:List[np.ndarray] = _make_boundary_2d(geometry)
_kws = dict(
ax_kws=dict(xlabel="Longitude", ylabel="Latitude")
)
_kws.update(kwargs)
for ring in rings:
ax = easy_mpl.plot(ring[:, 0], ring[:, 1],
show=False, ax=ax, **_kws)
if show_outlet:
coords = self.stn_coords(catchment_id)
ax.scatter(coords['long'], coords['lat'], marker='o',
color='red',
s=10,
label='Outlet')
if show:
plt.show()
return ax
[docs]
def plot_num_observations(
self,
stations: Union[str, List[str]] = 'all',
dynamic_features: Union[str, List[str]] = 'all',
start: Union[str, pd.Timestamp] = None,
end: Union[str, pd.Timestamp] = None,
show_constant: bool = False,
figsize: Tuple[float, float] = None,
ax = None,
show: bool = True
):
"""
Plots the number of observations available for different dynamic features
as cumulative distribution function (CDF). This plot is not plotted if
all stations have same number of observations for a dynamic feature.
Parameters
----------
stations : Union[str, List[str]]
The stations to include in the plot. If 'all', all stations will be included.
dynamic_features : Union[str, List[str]]
The dynamic features to include in the plot. If 'all', all dynamic features will be
included.
start : Union[str, pd.Timestamp], optional
The start date for the data to consider. If None, the start date of the dataset will be used.
end : Union[str, pd.Timestamp], optional
The end date for the data to consider. If None, the end date of the dataset will be used.
show_constant : bool, optional
Whether to show features with constant number of observations across stations.
If True, these features will be included in the plot as well.
figsize : Tuple[float, float], optional
The size of the figure to create. If None, a default size will be used.
ax : plt.Axes, optional
The matplotlib axes to draw the plot. If not given, then
new axes will be created.
show : bool, optional
Whether to display the plot immediately.
Returns
-------
plt.Axes
The matplotlib axes containing the plot.
Examples
--------
>>> from aqua_fetch import CAMELS_FI
>>> dataset = CAMELS_FI()
>>> dataset.plot_num_observations()
# plotting for different time periods
>>> dataset = RainfallRunoff('CAMELS_COL')
>>> _, ax = plt.subplots()
>>> for idx, period in enumerate([("19810101", "19901231"), ("19910101", "20001231"), ("20010101", "20101231")]):
>>> start, end = period
>>> ax = dataset.plot_num_observations(
>>> dynamic_features=['q_cms_obs'],
>>> ax=ax,
>>> start=start, end=end, show=False)
>>> ax.lines[idx].set_label(f'{start} to {end}')
>>> assert isinstance(ax, plt.Axes)
>>> ax.legend()
>>> plt.show()
"""
_, dyn = self.fetch(
stations,
dynamic_features=dynamic_features,
as_dataframe=False,
st=start, en=end)
num_points = 100
if ax is None:
fig, ax = plt.subplots(figsize=figsize or (7, 7))
for feature in dyn.dynamic_features.data:
d = dyn.sel(dynamic_features=feature).to_dataframe().drop(columns=['dynamic_features'], errors='ignore')
if d.isna().sum().sum() < 10:
print(f"Skipping {feature} due to no missing values.")
continue
data = d.count().values.reshape(-1, )
x = np.linspace(np.min(data), np.max(data), num_points)
y_values = np.sum(data[:, np.newaxis] <= x, axis=0) #/ data.size
y_values = y_values.reshape(-1, )
label = f"{feature} (n={d.count().sum()})"
# if np.allclose([100.], [y_values.sum()]):
if np.all(y_values == y_values[0]):
print(f"All stations for {feature} have {int(x[0])} observations.")
if show_constant:
easy_mpl.plot(x[0], y_values[0], '*', label=label, ax=ax, show=False)
continue
#y = np.linspace(0, len(data), num_points).astype(int)
easy_mpl.plot(x, y_values, label=label, ax=ax, show=False)
ax.set_ylabel("Number of stations")
ax.set_xlabel("Number of observations")
# ax.set_ylabel("Fraction of stations")
ax.grid()
if show:
plt.show()
return ax
def _handle_dynamic(
dyn,
as_dataframe: bool
) -> Union[Dict[str, pd.DataFrame], "Dataset"]:
if as_dataframe and isinstance(dyn, dict) and isinstance(list(dyn.values())[0], pd.DataFrame):
# if the dyn is a dictionary of station, DataFames pairs, and each DataFrame's index is 'time'
for station, df in dyn.items():
assert isinstance(df, pd.DataFrame), f"Data for station {station} is not a DataFrame"
elif isinstance(dyn, dict) and isinstance(list(dyn.values())[0], pd.DataFrame):
assert xr is not None, f"For as_dataframe {as_dataframe} and dyn: {type(dyn)}, xarray module must be installed"
# dyn is a dictionary of key, DataFames and we have to return xr Dataset
dyn = xr.Dataset(dyn)
return dyn