Source code for aqua_fetch.rr._hype


import os
import json
from typing import Union, List

import numpy as np
import pandas as pd 

from .utils import _RainfallRunoff
from ..utils import validate_attributes


class HYPE(_RainfallRunoff):
    """
    Downloads and preprocesses HYPE [1]_ dataset from Lindstroem et al., 2010 [2]_ .
    This is a rainfall-runoff dataset of Costa Rica of 564 stations from 1985 to
    2019 at daily, monthly and yearly time steps.

    Examples
    --------
    >>> from aqua_fetch import HYPE
    >>> dataset = HYPE()
    ... # get data by station id
    >>> _, dynamic = dataset.fetch(stations='564', as_dataframe=True)
    >>> df = dynamic['564']  # dynamic is a dictionary with station names as keys and DataFrames as values
    >>> df.shape
    (12783, 9)
    ...
    ... # get name of all stations as list
    >>> stns = dataset.stations()
    >>> len(stns)
    564
    ... # get data of 10 % of stations as dataframe
    >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True)
    >>> len(dynamic)  # dynamic has data for roughly 10% of the 564 stations
    ...
    ... # dynamic is a dictionary whose values are dataframes of dynamic features
    >>> [df.shape for df in dynamic.values()]
    [(12783, 9), (12783, 9), (12783, 9),... (12783, 9), (12783, 9)]
    ...
    ... # get the data of a single (randomly selected) station
    >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True)
    >>> len(dynamic)  # dynamic has data for 1 station
    1
    ... # get names of available dynamic features
    >>> dataset.dynamic_features
    ... # get only selected dynamic features
    >>> _, dynamic = dataset.fetch('564', as_dataframe=True,
    ...     dynamic_features=['AET_mm', 'Prec_mm', 'Streamflow_mm'])
    >>> dynamic['564'].shape
    (12783, 3)
    ...
    ... # HYPE ships no static features
    >>> dataset.static_features
    []
    ... # get data of 10 random stations
    >>> _, dynamic = dataset.fetch(10, as_dataframe=True)
    >>> len(dynamic)  # remember this is a dictionary with values as dataframe
    10
    ...
    # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset
    >>> _, dynamic = dataset.fetch(10)
    ... type(dynamic)
    xarray.core.dataset.Dataset
    ...
    >>> dynamic.dims
    FrozenMappingWarningOnValuesAccess({'time': 12783, 'dynamic_features': 9})
    ...
    >>> len(dynamic.data_vars)
    10
    ...
    >>> coords = dataset.stn_coords()  # returns coordinates of all stations
    >>> coords.shape
    (564, 2)
    >>> dataset.stn_coords('564')  # returns coordinates of station whose id is 564
    >>> dataset.stn_coords(['564', '563'])  # returns coordinates of two stations
    ...
    # get area of a single station
    >>> dataset.area('564')
    # get area of two stations
    >>> dataset.area(['564', '563'])
    ...
    # if fiona library is installed we can get the boundary as fiona Geometry
    >>> dataset.get_boundary('564')

    .. [1] https://zenodo.org/record/4029572

    .. [2] https://doi.org/10.2166/nh.2010.007
    """

    url = [
        "https://zenodo.org/record/581435",
        "https://zenodo.org/record/4029572"
    ]

    dynamic_features = [
        'AET_mm',
        'Baseflow_mm',
        'Infiltration_mm',
        'SM_mm',
        'Streamflow_mm',
        'Runoff_mm',
        'Qsim_m3-s',
        'Prec_mm',
        'PET_mm'
    ]

    def __init__(self, time_step: str = 'daily', path=None, **kwargs):
        """
        Parameters
        ----------
        time_step : str
            one of ``daily``, ``month`` or ``year``
        path : str
            If the data is already downloaded then provide the complete
            path to it. If None, then the data will be downloaded.
            The data is downloaded once and therefore subsequent
            calls to this class will not download the data unless
            ``overwrite`` is set to True.
        **kwargs
            key word arguments forwarded to the parent class
        """
        valid_steps = ['daily', 'month', 'year']
        # NOTE(review): an assert is stripped under ``python -O``; it is kept
        # so that callers still see the AssertionError they get today.
        assert time_step in valid_steps, (
            f"time_step must be one of {valid_steps} but is {time_step!r}")

        self.time_step = time_step
        self.path = path
        super().__init__(path=path, **kwargs)

        self._download()

        fpath = os.path.join(self.path, 'hype_year_dyn.nc')
        if not os.path.exists(fpath):
            # the netCDF conversion works on ``self.time_step``, so it is
            # switched temporarily to run the conversion for every time step
            for step in valid_steps:
                self.time_step = step
                self._maybe_to_netcdf()

        # make sure the instance ends up with the time step the user asked for
        self.time_step = time_step

    def stations(self) -> list:
        """Returns the ids of all 564 stations ('1' ... '564') as a list of strings."""
        _stations = np.arange(1, 565).astype(str)
        return list(_stations)

    @property
    def static_features(self):
        """HYPE has no static features, so this is always an empty list."""
        return []

    def _read_dynamic(
            self,
            stations: list,
            features: Union[str, list] = 'all',
            st=None,
            en=None,
    ):
        """
        Reads the dynamic features of the given stations from the csv files.

        Parameters
        ----------
        stations : list
            ids of the stations to read. Each id must be a column in the
            csv files.
        features : str/list
            name/names of dynamic features to read, validated against
            :attr:`dynamic_features`.
        st, en :
            currently unused; every feature is sliced from ``self.start``
            to ``self.end``.

        Returns
        -------
        dict
            maps each station id to a :obj:`pandas.DataFrame` whose index is
            named ``time`` and whose columns (named ``dynamic_features``) are
            the requested features.
        """
        dynamic_features = validate_attributes(features, self.dynamic_features)

        # one DataFrame (time x stations) per feature, keyed by the feature
        # name without the time step, e.g. 'Prec_mm' is read from
        # 'Prec_daily_mm.csv'
        feature_dfs = {}
        for dyn_attr in dynamic_features:
            parts = dyn_attr.split('_')
            pref, suff = parts[0], parts[-1]
            fname = f"{pref}_{self.time_step}_{suff}.csv"
            fpath = os.path.join(self.path, fname)

            # these two files spell the date column differently
            index_col_name = 'DATE'
            if fname in ['SM_month_mm.csv', 'SM_year_mm.csv']:
                index_col_name = 'Date'

            _df = pd.read_csv(fpath, index_col=index_col_name)
            _df.index = pd.to_datetime(_df.index)
            # todo, some stations have wider range than self.st/self.en
            feature_dfs[f"{pref}_{suff}"] = _df.loc[self.start:self.end]

        stns_dfs = {}
        # the loop variable is deliberately not called ``st`` so that it
        # does not shadow the ``st`` argument
        for stn in stations:
            stn_df = pd.concat(
                [feature_df[stn] for feature_df in feature_dfs.values()],
                axis=1)
            stn_df.columns = list(feature_dfs)
            stn_df.columns.name = 'dynamic_features'
            stn_df.index.name = 'time'
            stns_dfs[stn] = stn_df

        return stns_dfs

    @property
    def _mm_feature_name(self) -> str:
        """Name of the dynamic feature which holds streamflow in mm."""
        return 'Streamflow_mm'

    def fetch_static_features(self, station, static_features=None):
        """static data for HYPE is not available.

        Raises
        ------
        ValueError
            always, because HYPE has no static features.
        """
        raise ValueError(f'No static feature for {self.name}')

    def _hype_catchment_features(self) -> list:
        """Returns the ``features`` list of ``Catchments_CostaRica.geojson``."""
        fpath = os.path.join(self.path, 'Catchments_CostaRica.geojson')
        with open(fpath, 'r') as fp:
            data = json.load(fp)
        return data['features']

    def area(
            self,
            stations: Union[str, List[str]] = "all"
    ) -> pd.Series:
        """
        Returns area (Km2) of all catchments as :obj:`pandas.Series`

        parameters
        ----------
        stations : str/list
            name/names of stations. Default is ``"all"``, which will return
            area of all stations

        Returns
        --------
        pd.Series
            a :obj:`pandas.Series` whose indices are catchment ids and values
            are areas of corresponding catchments.

        Examples
        ---------
        >>> from aqua_fetch import HYPE
        >>> dataset = HYPE()
        >>> dataset.area()  # returns area of all stations
        >>> dataset.area('2')  # returns area of station whose id is 2
        >>> dataset.area(['2', '564'])  # returns area of two stations
        """
        stations = validate_attributes(stations, self.stations(), 'stations')

        features = self._hype_catchment_features()

        # the geojson stores the area in square meters
        areas = [feature['properties']['Area m2'] / 1e6 for feature in features]
        indices = [str(feature['properties']['subid']) for feature in features]

        s = pd.Series(
            np.array(areas),
            name="area_km2",
            index=indices)
        return s.loc[stations]

    def stn_coords(
            self,
            stations: Union[str, List[str]] = "all"
    ) -> pd.DataFrame:
        """
        returns coordinates of stations as DataFrame
        with ``lat`` and ``long`` as columns.

        Parameters
        ----------
        stations :
            name/names of stations. If not given, coordinates of all stations
            will be returned.

        Returns
        -------
        pd.DataFrame
            a :obj:`pandas.DataFrame` whose index is the station id and whose
            columns are ``lat`` and ``long``.

        Examples
        --------
        >>> dataset = HYPE()
        >>> dataset.stn_coords()  # returns coordinates of all stations
        >>> dataset.stn_coords('2')  # returns coordinates of station whose id is 2
        >>> dataset.stn_coords(['2', '564'])  # returns coordinates of two stations
        """
        stations = validate_attributes(stations, self.stations(), 'stations')

        lats = []
        longs = []
        indices = []
        for feature in self._hype_catchment_features():
            # latitude is read from the properties, whereas longitude is
            # taken as the smallest x-value of the first element of the
            # geometry coordinates
            # NOTE(review): presumably there is no longitude property - confirm
            xy = np.array(feature['geometry']['coordinates'][0])
            longs.append(xy[:, 0].min())
            lats.append(feature['properties']['Latitude'])
            indices.append(str(feature['properties']['subid']))

        df = pd.DataFrame(
            np.column_stack([np.array(lats), np.array(longs)]),
            columns=['lat', 'long'],
            index=indices)
        return df.loc[stations, :]

    @property
    def start(self):
        """First day of the record, as a ``YYYYMMDD`` string."""
        return '19850101'

    @property
    def end(self):
        """Last day of the record, as a ``YYYYMMDD`` string."""
        return '20191231'