Source code for aqua_fetch.rr._brazil


__all__ = ['CAMELS_BR', 'CABra']

import os
import glob
import concurrent.futures as cf
from typing import Union, List, Dict

import numpy as np
import pandas as pd

from .utils import _RainfallRunoff
from ..utils import validate_attributes, get_cpus, download, unzip
from ._map import (
    min_air_temp,
    max_air_temp,
    mean_air_temp,
    mean_air_temp_with_specifier,
    min_air_temp_with_specifier,
    max_air_temp_with_specifier,
    total_potential_evapotranspiration_with_specifier,
    actual_evapotranspiration_with_specifier,
    total_precipitation_with_specifier,
    observed_streamflow_cms,
    observed_streamflow_mm,
    mean_rel_hum_with_specifier,
    mean_windspeed_with_specifier,
    solar_radiation_with_specifier,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

from .._backend import xarray as xr

# directory separator
SEP = os.sep


[docs] class CAMELS_BR(_RainfallRunoff): """ This is a dataset of 897 Brazilian catchments with 67 static features and 10 dyanmic features for each catchment. The dyanmic features are timeseries from 1920-01-01 to 2019-02-28. This class downloads and processes CAMELS dataset of Brazil as provided by `VP Changas et al., 2020 <https://doi.org/10.5194/essd-12-2075-2020>`_ . The simulated streamflow of 593 and raw streamflow of 3679 stations shipped with this data is not included in dynamic features. Both can be fetched through fetch_simulated_streamflow and fetch_raw_streamflow methods. Examples -------- >>> from aqua_fetch import CAMELS_BR >>> dataset = CAMELS_BR() ... # get data by station id >>> _, dynamic = dataset.fetch(stations='46035000', as_dataframe=True) >>> df = dynamic['46035000'] # dynamic is a dictionary of with keys as station names and values as DataFrames >>> df.shape (14245, 10) ... ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 593 ... # get data of 10 % of stations as dataframe >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 10% of stations (59 out of 593) 59 ... ... # dynamic is a dictionary whose values are dataframes of dynamic features >>> [df.shape for df in dynamic.values()] [(14245, 10), (14245, 10), (14245, 10),... (14245, 10), (14245, 10)] ... ... get the data of a single (randomly selected) station >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 1 station 1 ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, dynamic = dataset.fetch('46035000', as_dataframe=True, ... dynamic_features=['pcp_mm_cpc', 'aet_mm_mgb', 'airtemp_C_mean', 'q_cms_obs']) >>> dynamic['46035000'].shape (14245, 4) ... ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, dynamic = dataset.fetch(10, as_dataframe=True) >>> len(dynamic) # remember this is a dictionary with values as dataframe 10 ... # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='46035000', static_features="all", as_dataframe=True) >>> static.shape, len(dynamic), dynamic['46035000'].shape ((1, 67), 1, (14245, 10)) ... # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset >>> _, dynamic = dataset.fetch(10) ... type(dynamic) xarray.core.dataset.Dataset >>> dynamic.dims FrozenMappingWarningOnValuesAccess({'time': 14245, 'dynamic_features': 10}) ... >>> len(dynamic.data_vars) 10 ... >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (593, 2) >>> dataset.stn_coords('46035000') # returns coordinates of station whose id is 46035000 -12.8686 -43.3797 >>> dataset.stn_coords(['46035000', '57170000']) # returns coordinates of two stations ... # get area of a single station >>> dataset.area('46035000') # get coordinates of two stations >>> dataset.area(['46035000', '57170000']) ... # if fiona library is installed we can get the boundary as fiona Geometry >>> dataset.get_boundary('46035000') """ url = "https://zenodo.org/record/3964745#.YA6rUxZS-Uk" urls = { '01_CAMELS_BR_attributes.zip': 'https://zenodo.org/records/3964745/files/', '02_CAMELS_BR_streamflow_m3s.zip': 'https://zenodo.org/records/3964745/files/', '03_CAMELS_BR_streamflow_mm_selected_catchments.zip': 'https://zenodo.org/records/3964745/files/', '04_CAMELS_BR_streamflow_simulated.zip': 'https://zenodo.org/records/3964745/files/', '05_CAMELS_BR_precipitation_chirps.zip': 'https://zenodo.org/records/3964745/files/', '06_CAMELS_BR_precipitation_mswep.zip': 'https://zenodo.org/records/3964745/files/', '07_CAMELS_BR_precipitation_cpc.zip': 'https://zenodo.org/records/3964745/files/', '08_CAMELS_BR_evapotransp_gleam.zip': 'https://zenodo.org/records/3964745/files/', '09_CAMELS_BR_evapotransp_mgb.zip': 'https://zenodo.org/records/3964745/files/', '10_CAMELS_BR_potential_evapotransp_gleam.zip': 'https://zenodo.org/records/3964745/files/', '11_CAMELS_BR_temperature_min_cpc.zip': 'https://zenodo.org/records/3964745/files/', '12_CAMELS_BR_temperature_mean_cpc.zip': 'https://zenodo.org/records/3964745/files/', '13_CAMELS_BR_temperature_max_cpc.zip': 'https://zenodo.org/records/3964745/files/', '14_CAMELS_BR_catchment_boundaries.zip': 'https://zenodo.org/records/3964745/files/', '15_CAMELS_BR_gauges_location_shapefile.zip': 'https://zenodo.org/records/3964745/files/' } folders = {'streamflow_m3s_raw': '02_CAMELS_BR_streamflow_m3s', 'streamflow_mm': '03_CAMELS_BR_streamflow_mm_selected_catchments', 'simulated_streamflow_m3s': '04_CAMELS_BR_streamflow_simulated', 'precipitation_cpc': '07_CAMELS_BR_precipitation_cpc', 'precipitation_mswep': '06_CAMELS_BR_precipitation_mswep', 'precipitation_chirps': '05_CAMELS_BR_precipitation_chirps', 'evapotransp_gleam': '08_CAMELS_BR_evapotransp_gleam', 'evapotransp_mgb': '09_CAMELS_BR_evapotransp_mgb', 'potential_evapotransp_gleam': '10_CAMELS_BR_potential_evapotransp_gleam', 'temperature_min': '11_CAMELS_BR_temperature_min_cpc', 'temperature_mean': '12_CAMELS_BR_temperature_mean_cpc', 'temperature_max': '13_CAMELS_BR_temperature_max_cpc' }
[docs] def __init__(self, path=None, verbosity: int = 1, **kwargs): """ parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. """ super().__init__(path=path, name="CAMELS_BR", verbosity=verbosity, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) for zipfilename, url in self.urls.items(): zip_fpath = os.path.join(self.path, zipfilename) if not self.overwrite: if os.path.exists(self.path): if self.verbosity: print(f"{self.path} already exists. Skipping download.") continue unzipped_fpath = zip_fpath.replace('.zip', '') if os.path.exists(unzipped_fpath): if self.verbosity: print(f"{unzipped_fpath} already exists. Skipping download.") continue if self.dyn_fpath_exists and os.path.exists(os.path.join(self.path, 'static_features.csv')): if self.verbosity: print(f"All files already exist in {self.path}. Skipping download.") continue #if not os.path.exists(fpath) or (os.path.exists(fpath) and self.overwrite): if self.verbosity: print(f"Downloading {zipfilename} from {url + zipfilename} at {zip_fpath}") download(url + zipfilename, self.path, verbosity=self.verbosity) unzip(self.path, verbosity=self.verbosity) # elif self.verbosity>1: # print(f"{fpath} already exists") # todo : dynamic data must be stored for all stations and not only for stations which are common among all attributes self._maybe_to_netcdf()
@property def boundary_file(self) -> os.PathLike: return os.path.join( self.path, "14_CAMELS_BR_catchment_boundaries", "14_CAMELS_BR_catchment_boundaries", "camels_br_catchments.shp" ) @property def boundary_id_map(self) -> str: """ Name of the attribute in the boundary (.shp/.gpkg) file that will be used to map the catchment/station id to the geometry of the catchment/station. This is used to create the boundary id map. """ return "gauge_id" @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'slope_mean': slope('degrees'), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): # table 1 in paper return { 'streamflow_mm': observed_streamflow_mm(), 'temperature_min': min_air_temp(), 'temperature_max': max_air_temp(), 'temperature_mean': mean_air_temp(), 'precipitation_mswep': total_precipitation_with_specifier('mswep'), 'precipitation_chirps': total_precipitation_with_specifier('chirps'), 'precipitation_cpc': total_precipitation_with_specifier('cpc'), 'potential_evapotransp_gleam': total_potential_evapotranspiration_with_specifier('gleam'), 'evapotransp_gleam': actual_evapotranspiration_with_specifier('gleam'), 'evapotransp_mgb': actual_evapotranspiration_with_specifier('mgb'), } @property def dyn_generators(self): return { # new column to be created : function to be applied, inputs observed_streamflow_cms(): (self.mm_to_cms, observed_streamflow_mm()), #mean_air_temp(): (self.mean_temp, (min_air_temp(), max_air_temp())), } @property def _all_dirs(self): """All the folders in the dataset_directory""" return [f for f in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, f))] @property def static_dir(self): path = None for _dir in self._all_dirs: if "attributes" in _dir: # supposing that 'attributes' axist in only one file/folder in self.path path = os.path.join(self.path, f'{_dir}{SEP}{_dir}') return path @property def static_files(self): all_files = None if self.static_dir is not None: all_files = glob.glob(f"{self.static_dir}/*.txt") return all_files @property def dynamic_features(self) -> List[str]: features = list(CAMELS_BR.folders.keys()) features.remove('simulated_streamflow_m3s') # todo: why we need to remove this? features.remove('streamflow_m3s_raw') return [self.dyn_map.get(feature, feature) for feature in features] + list(self.dyn_generators.keys()) @property def static_attribute_categories(self): static_attrs = [] for f in self.static_files: ff = str(os.path.basename(f).split('.txt')[0]) static_attrs.append('_'.join(ff.split('_')[2:])) return static_attrs @property def static_features(self): static_fpath = os.path.join(self.path, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob( f"{os.path.join(self.path, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes')}/*.txt") cols = [] for f in files: _df = pd.read_csv(f, sep=' ', index_col='gauge_id', nrows=1) cols += list(_df.columns) else: df = pd.read_csv(static_fpath, index_col='gauge_id', nrows=1) cols = list(df.columns) return [self.static_map.get(feat, feat) for feat in cols] @property def start(self): return "19200601" @property def end(self): return "20190228"
[docs] def q_mm( self, stations: Union[str, List[str]] = "all" ) -> pd.DataFrame: """ returns streamflow in the units of milimeter per day. he name of original timeseries is ``streamflow_mm``. parameters ---------- stations : str/list name/names of stations. Default is None, which will return area of all stations Returns -------- pd.DataFrame a :obj:`pandas.DataFrame` whose indices are time-steps and columns are catchment/station ids. """ # todo: better avoid remove this method since parent class has it stations = validate_attributes(stations, self.stations()) _, q = self.fetch_stations_features(stations, dynamic_features=observed_streamflow_mm(), as_dataframe=True) #q.index = q.index.get_level_values(0) q = pd.DataFrame.from_dict({stn:df[observed_streamflow_mm()] for stn,df in q.items()}) # area_m2 = self.area(stations) * 1e6 # area in m2 # q = (q / area_m2) * 86400 # cms to m/day return q # * 1e3 # to mm/day
[docs] def area( self, stations: Union[str, List[str]] = "all", source: str = "gsim", ) -> pd.Series: """ Returns area (Km2) of all catchments as :obj:`pandas.Series` parameters ---------- stations : str/list name/names of stations. Default is None, which will return area of all stations source : str source of area calculation. It should be either ``gsim`` or ``ana`` Returns -------- pd.Series a :obj:`pandas.Series` whose indices are catchment ids and values are areas of corresponding catchments. Examples --------- >>> from aqua_fetch import CAMELS_BR >>> dataset = CAMELS_BR() >>> dataset.area() # returns area of all stations >>> dataset.stn_coords('65100000') # returns area of station whose id is 912101A >>> dataset.stn_coords(['65100000', '64075000']) # returns area of two stations """ SRC_MAP = { 'gsim': 'area_gsim', 'ana': 'area_ana' } stations = validate_attributes(stations, self.stations(), 'stations') fpath = os.path.join(self.path, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes', 'camels_br_location.txt') df = pd.read_csv(fpath, sep=' ') df.index = df['gauge_id'].astype(str) s = df[SRC_MAP[source]] s.name = 'area_km2' return s.loc[stations]
[docs] def stn_coords( self, stations: Union[str, List[str]] = 'all' ) -> pd.DataFrame: """ returns coordinates of stations as :obj:`pandas.DataFrame` with ``long`` and ``lat`` as columns. Parameters ---------- stations : name/names of stations. If not given, coordinates of all stations will be returned. Returns ------- pd.DataFrame :obj:`pandas.DataFrame` with ``long`` and ``lat`` columns. The length of dataframe will be equal to number of stations wholse coordinates are to be fetched. Examples -------- >>> dataset = CAMELS_BR() >>> dataset.stn_coords() # returns coordinates of all stations >>> dataset.stn_coords('65100000') # returns coordinates of station whose id is 912101A >>> dataset.stn_coords(['65100000', '64075000']) # returns coordinates of two stations """ fpath = os.path.join(self.path, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes', 'camels_br_location.txt') df = pd.read_csv(fpath, sep=' ') df.index = df['gauge_id'].astype(str) df = df[['gauge_lat', 'gauge_lon']] df.columns = ['lat', 'long'] stations = validate_attributes(stations, self.stations(), 'stations') return df.loc[stations, :]
[docs] def all_stations(self, feature: str) -> List[str]: """Tells all station ids for which a data of a specific attribute is available.""" # # first check if static_features.csv is available, if yes then read station ids from there # # because folders may not be available # fpath = os.path.join(self.path, 'static_features.csv') # if os.path.exists(fpath): # return pd.read_csv(fpath, usecols=[0], dtype={'gauge_id': str})['gauge_id'].values.tolist() p = self.folders[feature] return [f.split('_')[0] for f in os.listdir(os.path.join(self.path, p, p))]
[docs] def stations( self, ) -> List[str]: """ Returns a list of station ids. Example ------- >>> dataset = CAMELS_BR() >>> stations = dataset.stations() """ if self.dyn_fpath_exists and xr is not None: return list(xr.open_dataset(self.dyn_fpath).data_vars) return self.all_stations('streamflow_mm')
[docs] def fetch_raw_streamflow( self, stations: str = None ) -> pd.DataFrame: """ returns raw streamflow data for one or more stations. Example ------- >>> dataset = CAMELS_BR() >>> data = dataset.fetch_raw_streamflow('10500000') ... # fetch all time series data associated with a station. >>> x = dataset.fetch_raw_streamflow(dataset.all_stations()) """ if stations is None: stations = self.all_stations('streamflow_m3s_raw') if not isinstance(stations, list): stations = [stations] raw_q = [] for stn in stations: self._read_dynamic_feature('streamflow_m3s_raw', stn) return pd.concat(raw_q, axis=1)
[docs] def fetch_simulated_streamflow( self, stations: str = None ) -> pd.DataFrame: """ returns raw streamflow data for one or more stations. Example ------- >>> dataset = CAMELS_BR() >>> data = dataset.fetch_simulated_streamflow('10500000') ... # fetch all time series data associated with a station. >>> x = dataset.fetch_simulated_streamflow(dataset.all_stations()) """ if stations is None: stations = self.all_stations('simulated_streamflow_m3s') if not isinstance(stations, list): stations = [stations] raw_q = [] for stn in stations: self._read_dynamic_feature('simulated_streamflow_m3s', stn) return pd.concat(raw_q, axis=1)
def _read_dynamic( self, stations, attributes: Union[str, list] = 'all', st=None, en=None, ) -> Dict[str, pd.DataFrame]: """ returns the dynamic/time series attribute/attributes for one station id. Example ------- >>> dataset = CAMELS_BR() >>> pcp = dataset.fetch_dynamic_features('10500000', 'precipitation_cpc') ... # fetch all time series data associated with a station. >>> x = dataset.fetch_dynamic_features('51560000', dataset.dynamic_features) """ features = validate_attributes(attributes, self.dynamic_features, 'dynamic_features') st, en = self._check_length(st, en) cpus = self.processes or min(get_cpus(), 64) dyn = {} if cpus == 1: for idx, station in enumerate(stations): # making one separate dataframe for one station dyn[station] = self.get_dynamic_features(station, features).loc[st:en] if idx % 20 == 0: print(f"completed {idx} stations") else: if self.verbosity > 0: print(f"getting data for {len(stations)} stations using {cpus} cpus") features = [features for _ in range(len(stations))] with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map(self.get_dynamic_features, stations, features) for station, stn_df in zip(stations, results): dyn[station] = stn_df.loc[st:en] if self.verbosity > 1: print(f"completed fetching data for {len(stations)} stations") return dyn def get_dynamic_features(self, station, features, st=None, en=None): feature_dfs = [] for feature, path in self.folders.items(): if feature in ['simulated_streamflow_m3s', 'streamflow_m3s_raw']: continue feature_df = self._read_dynamic_feature(path, feature=feature, station=station, st=st, en=en) feature_dfs.append(feature_df) stn_df = pd.concat(feature_dfs, axis=1) for new_col, (func, old_col) in self.dyn_generators.items(): if isinstance(old_col, str): if old_col in stn_df.columns: # name of Series to func should be same as station id stn_df[new_col] = func(pd.Series(stn_df[old_col], name=station)) else: assert isinstance(old_col, tuple) if all([col in stn_df.columns for col in old_col]): # feed all old_cols to the function stn_df[new_col] = func(*[pd.Series(stn_df[col], name=station) for col in old_col]) stn_df.columns.name = 'dynamic_features' stn_df.index.name = 'time' return stn_df def _read_dynamic_feature(self, folder, feature, station, st=None, en=None): path = os.path.join(self.path, f'{folder}{SEP}{folder}') # supposing that the filename starts with stn_id and has .txt extension. fname = [f for f in os.listdir(path) if f.startswith(str(station)) and f.endswith('.txt')] assert len(fname) == 1, f"{len(fname)} {station} in {folder} for {feature}" fname = fname[0] if os.path.exists(os.path.join(path, fname)): df = pd.read_csv(os.path.join(path, fname), sep=' ') df.index = pd.to_datetime(df[['year', 'month', 'day']]) df = df.drop(['year', 'month', 'day'], axis=1) df = pd.DataFrame(df.loc[st:en, feature]) df.rename(columns = self.dyn_map, inplace=True) else: raise FileNotFoundError(f"file {fname} not found at {path}") return df.astype(np.float32) def _static_data(self)->pd.DataFrame: static_fpath = os.path.join(self.path, 'static_features.csv') if not os.path.exists(static_fpath): files = glob.glob( f"{os.path.join(self.path, '01_CAMELS_BR_attributes', '01_CAMELS_BR_attributes')}/*.txt") static_df = pd.DataFrame() for f in files: _df = pd.read_csv(f, sep=' ', index_col='gauge_id') static_df = pd.concat([static_df, _df], axis=1) static_df.to_csv(static_fpath, index_label='gauge_id') else: static_df = pd.read_csv(static_fpath, index_col='gauge_id') static_df.index = static_df.index.astype(str) static_df.rename(columns = self.static_map, inplace=True) return static_df
[docs] class CABra(_RainfallRunoff): """ Reads and fetches CABra dataset which is catchment attribute dataset following the work of `Almagro et al., 2021 <https://doi.org/10.5194/hess-25-3105-2021>`_ This dataset consists of 87 static and 13 dynamic features of 735 Brazilian catchments. The temporal extent is from 1980 to 2020. The dyanmic features consist of daily hydro-meteorological time series Examples --------- >>> from aqua_fetch import CABra >>> dataset = CABra() ... # get data by station id >>> _, dynamic = dataset.fetch(stations='92', as_dataframe=True) >>> df = dynamic['92'] # dynamic is a dictionary of with keys as station names and values as DataFrames >>> df.shape (10956, 13) ... ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 735 ... # get data of 10 % of stations as dataframe >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 10% of stations (73 out of 735) 73 ... ... # dynamic is a dictionary whose values are dataframes of dynamic features >>> [df.shape for df in dynamic.values()] [(10956, 13), (10956, 13), (10956, 13),... (10956, 13), (10956, 13)] ... ... get the data of a single (randomly selected) station >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 1 station 1 ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, dynamic = dataset.fetch('92', as_dataframe=True, ... dynamic_features=['pcp_mm_ens', 'airtemp_C_ens_max', 'pet_mm_pm', 'rh_%_ens', 'q_cms_obs']) >>> dynamic['92'].shape (10956, 4) ... ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, dynamic = dataset.fetch(10, as_dataframe=True) >>> len(dynamic) # remember this is a dictionary with values as dataframe 10 ... # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='92', static_features="all", as_dataframe=True) >>> static.shape, len(dynamic), dynamic['92'].shape ((1, 87), 1, (10956, 13)) # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset >>> _, dynamic = dataset.fetch(10) ... type(dynamic) xarray.core.dataset.Dataset ... >>> dynamic.dims FrozenMappingWarningOnValuesAccess({'time': 10956, 'dynamic_features': 13}) ... >>> len(dynamic.data_vars) 10 ... >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (735, 2) >>> dataset.stn_coords('92') # returns coordinates of station whose id is 92 -2.509 -47.764 >>> dataset.stn_coords(['92', '5']) # returns coordinates of two stations ... # get area of a single station >>> dataset.area('92') # get coordinates of two stations >>> dataset.area(['92', '5']) ... # if fiona library is installed we can get the boundary as fiona Geometry >>> dataset.get_boundary('92') """ url = 'https://zenodo.org/record/7612350'
[docs] def __init__(self, path=None, overwrite=False, to_netcdf: bool = True, met_src: str = 'ens', **kwargs): """ Parameters ---------- path : str If the data is alredy downloaded then provide the complete path to it. If None, then the data will be downloaded. The data is downloaded once and therefore susbsequent calls to this class will not download the data unless ``overwrite`` is set to True. overwrite : bool If the data is already down then you can set it to True, to make a fresh download. to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc but will require netCDF4 package as well as xarry. met_src : str source of meteorological data, must be one of ``ens``, ``era5`` or ``ref``. """ super(CABra, self).__init__(path=path, to_netcdf=to_netcdf, **kwargs) self.path = path self.met_src = met_src self._download(overwrite=overwrite) self._dynamic_features = self.__dynamic_features() self._static_features = self.__static_features() self._maybe_to_netcdf()
@property def dyn_fname(self) -> Union[str, os.PathLike]: """ name of the .nc file which contains dynamic features. This file is created during dataset initialization only if to_netcdf is True and xarray is installed and the file does not already exists. The creation of this file can take some time however it leads to faster I/O operations. """ return self.name.lower() + f"_{self.timestep}_{self.met_src}.nc" @property def boundary_file(self) -> os.PathLike: return os.path.join(self.path, "CABra_boundaries", "CABra_boundaries.shp") @property def boundary_id_map(self) -> str: """ Name of the attribute in the boundary (.shp/.gpkg) file that will be used to map the catchment/station id to the geometry of the catchment/station. This is used to create the boundary id map. """ return "ID_CABra" @property def static_map(self) -> Dict[str, str]: return { 'catch_area': catchment_area(), 'catch_slope': slope('perc'), 'latitude': gauge_latitude(), 'longitude': gauge_longitude(), } @property def dyn_map(self): # table 3 in the paper https://hess.copernicus.org/articles/25/3105/2021/#&gid=1&pid=1 return { 'Streamflow': observed_streamflow_cms(), 'tmin_ens': min_air_temp_with_specifier('ens'), 'tmax_ens': max_air_temp_with_specifier('ens'), 'tmin_era5': min_air_temp_with_specifier('era5'), 'tmax_era5': max_air_temp_with_specifier('era5'), 'tmin_ref': min_air_temp_with_specifier('ref'), 'tmax_ref': max_air_temp_with_specifier('ref'), 'p_ens': total_precipitation_with_specifier('ens'), 'p_ref': total_precipitation_with_specifier('ref'), 'p_era5': total_precipitation_with_specifier('era5'), 'rh_ens': mean_rel_hum_with_specifier('ens'), 'rh_era5': mean_rel_hum_with_specifier('era5'), 'rh_ref': mean_rel_hum_with_specifier('ref'), 'wnd_ens': mean_windspeed_with_specifier('ens'), 'wnd_era5': mean_windspeed_with_specifier('era5'), 'wnd_ref': mean_windspeed_with_specifier('ref'), 'et_ens': actual_evapotranspiration_with_specifier('ens'), 'pet_pm': total_potential_evapotranspiration_with_specifier('pm'), 'pet_pt': total_potential_evapotranspiration_with_specifier('pt'), 'pet_hg': total_potential_evapotranspiration_with_specifier('hg'), 'srad_ens': solar_radiation_with_specifier('ens'), # todo: change units from MJ/m2/day to W/m2 'srad_era5': solar_radiation_with_specifier('era5'), 'srad_ref': solar_radiation_with_specifier('ref'), } @property def dyn_generators(self): return { # new column to be created : function to be applied, inputs mean_air_temp_with_specifier(self.met_src): (self.mean_temp, (min_air_temp_with_specifier(self.met_src), max_air_temp_with_specifier(self.met_src))), #mean_air_temp_with_specifier('era5'): (self.mean_temp, (min_air_temp_with_specifier('era5'), max_air_temp_with_specifier('era5'))), #mean_air_temp_with_specifier('ref'): (self.mean_temp, (min_air_temp_with_specifier('ref'), max_air_temp_with_specifier('ref'))), } @property def q_path(self): return os.path.join(self.path, "CABra_streamflow_daily_series", "CABra_daily_streamflow") @property def attr_path(self): return os.path.join(self.path, 'CABra_attributes', 'CABra_attributes') @property def dynamic_features(self) -> List[str]: return self._dynamic_features def __dynamic_features(self) -> List[str]: stn = self.stations()[0] df = pd.concat([self._read_q_from_csv(stn), self._read_meteo_from_csv(stn, self.met_src)], axis=1) cols = df.columns.to_list() cols = [col for col in cols if col not in ['Year', 'Month', 'Day']] return cols @property def static_features(self) -> List[str]: """names of static features""" return self._static_features def __static_features(self) -> List[str]: df = pd.concat( [ self.climate_attrs(), self.general_attrs(), self.geology_attrs(), self.gw_attrs(), self.hydro_distrub_attrs(), self.lc_attrs(), self.soil_attrs(), self.q_attrs(), self.topology_attrs()], axis=1) # drop duplicate columns from df which might have come due to concatenation df = df.loc[:, ~df.columns.duplicated()] df.rename(columns = self.static_map, inplace=True) return df.columns.to_list()
[docs] def stations(self) -> List[str]: return self.add_attrs().index.astype(str).to_list()
@property def start(self) -> pd.Timestamp: return pd.Timestamp("1980-10-01") @property def end(self) -> pd.Timestamp: return pd.Timestamp("2010-09-30")
[docs] def add_attrs(self) -> pd.DataFrame: """ Returns additional catchment attributes """ fpath = os.path.join(self.attr_path, "CABra_additional_attributes.txt") dtypes = {"CABra_ID": int, # todo shouldn't it be str? "ANA_ID": int, "longitude_centroid": np.float32, "latitude_centroid": np.float32, "dist_coast": np.float32} add_attributes = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, header=4) add_attributes.index = add_attributes.pop('CABra_ID') return add_attributes
[docs] def climate_attrs(self) -> pd.DataFrame: """ returns climate attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_climate_attributes.txt") dtypes = {"CABra_ID": int, # todo shouldn't it be str? "ANA_ID": int, "clim_p": np.float32, "clim_tmin": np.float32, "clim_tmax": np.float32, "clim_rh": np.float32, "clim_wind": np.float32, "clim_srad": np.float32, "clim_et": np.float32, "clim_pet": np.float32, "aridity_index": np.float32, "p_seasonality": np.float32, "clim_quality": int, } clim_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=6) clim_attrs.index = clim_attrs.pop('CABra_ID') return clim_attrs
[docs] def general_attrs(self) -> pd.DataFrame: """ returns general attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_general_attributes.txt") dtypes = {"CABra_ID": int, # todo shouldn't it be str? "ANA_ID": int, "longitude": np.float32, "latitude": np.float32, "gauge_hreg": str, "gauge_biome": str, "gauge_state": str, "missing_data": np.float32, "series_length": np.float32, "quality_index": np.float32, } gen_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=6) gen_attrs.index = gen_attrs.pop('CABra_ID') return gen_attrs
[docs] def geology_attrs(self) -> pd.DataFrame: """ returns geological attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_geology_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "catch_lith": str, "sub_porosity": np.float32, "sub_permeability": np.float32, "sub_hconduc": np.float32, } gen_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=6) gen_attrs.index = gen_attrs.pop('CABra_ID') return gen_attrs
[docs] def gw_attrs(self) -> pd.DataFrame: """ returns groundwater attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_groundwater_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "aquif_name": str, "aquif_type": str, "catch_wtd": np.float32, "catch_hand": np.float32, "hand_class": str, "well_number": int, "well_static": str, "well_dynamic": str, } gen_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=7) gen_attrs.index = gen_attrs.pop('CABra_ID') return gen_attrs
[docs] def hydro_distrub_attrs(self) -> pd.DataFrame: """ returns geological attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_hydrologic_disturbance_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "dist_urban": int, "cover_urban": np.float32, "cover_crops": np.float32, "res_number": int, "res_area": np.float32, "res_volume": np.float32, "res_regulation": np.float32, "water_demand": int, "hdisturb_index": np.float32, } gen_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=8) gen_attrs.index = gen_attrs.pop('CABra_ID') return gen_attrs
[docs] def lc_attrs(self) -> pd.DataFrame: """ returns land cover attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_land-cover_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "cover_main": str, "cover_bare": np.float32, "cover_forest": np.float32, "cover_crops": np.float32, "cover_grass": np.float32, "cover_moss": np.float32, "cover_shrub": np.float32, "cover_urban": np.float32, "cover_snow": np.float32, "cover_waterp": np.float32, "cover_waters": np.float32, "ndvi_djf": np.float32, "ndvi_mam": np.float32, "ndvi_jja": np.float32, "ndvi_son": np.float32, } lc_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=6) lc_attrs.index = lc_attrs.pop('CABra_ID') return lc_attrs
[docs] def soil_attrs(self) -> pd.DataFrame: """ returns soil attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_soil_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "soil_type": str, "soil_textclass": str, "soil_sand": np.float32, "soil_silt": np.float32, "soil_clay": np.float32, "soil_carbon": np.float32, "soil_bulk": np.float32, "soil_depth": np.float32, } soil_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=7) soil_attrs.index = soil_attrs.pop('CABra_ID') return soil_attrs
[docs] def q_attrs(self) -> pd.DataFrame: """ returns streamflow attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_streamflow_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "q_mean": np.float32, "q_1": np.float32, "q_5": np.float32, "q_95": np.float32, "q_99": np.float32, "q_lf": np.float32, "q_ld": np.float32, "q_hf": np.float32, "q_hd": np.float32, "q_hfd": np.float32, "q_zero": int, "q_cv": np.float32, "q_lcv": np.float32, "q_hcv": np.float32, "q_elasticity": np.float32, "fdc_slope": np.float32, "baseflow_index": np.float32, 'runoff_coef': np.float32 } names = list(dtypes.keys()) dtypes.pop('q_cv') dtypes.pop('q_mean') dtypes.pop('q_lcv') dtypes.pop('fdc_slope') q_attrs = pd.read_csv(fpath, sep='\t', names=names, dtype=dtypes, encoding_errors='ignore', header=7) q_attrs.index = q_attrs.pop('CABra_ID') return q_attrs
[docs] def topology_attrs(self) -> pd.DataFrame: """ returns topology attributes for all catchments """ fpath = os.path.join(self.attr_path, "CABra_topography_attributes.txt") dtypes = {"CABra_ID": int, "ANA_ID": int, "catch_area": np.float32, "elev_mean": np.float32, "elev_min": np.float32, "elev_max": np.float32, "elev_gauge": np.float32, "catch_slope": np.float32, "catch_order": int, } gen_attrs = pd.read_csv(fpath, sep='\t', names=list(dtypes.keys()), dtype=dtypes, encoding_errors='ignore', header=7) gen_attrs.index = gen_attrs.pop('CABra_ID') return gen_attrs
def _read_q_from_csv(self, station: str) -> pd.DataFrame: q_fpath = os.path.join(self.q_path, f"CABra_{station}_streamflow.txt") df = pd.read_csv(q_fpath, sep='\t', header=8, names=['Year', 'Month', 'Day', 'Streamflow', 'Quality'], dtype={'Year': np.int16, 'Month': np.int16, 'Day': np.int16, # 'Streamflow': np.float32, 'Quality': np.int16} ) df.rename(columns=self.dyn_map, inplace=True) df[observed_streamflow_cms()] = df[observed_streamflow_cms()].astype(np.float32) return df def _read_meteo_from_csv( self, station: str, source="ens") -> pd.DataFrame: meteo_path = os.path.join(self.path, 'CABra_climate_daily_series', 'climate_daily', source ) meteo_fpath = os.path.join(meteo_path, f"CABra_{station}_climate_{source.upper()}.txt") dtypes = {"Year": int, "Month": int, "Day": int, f"p_{source}": np.float32, f"tmin_{source}": np.float32, f"tmax_{source}": np.float32, f"rh_{source}": np.float32, f"wnd_{source}": np.float32, f"srad_{source}": np.float32, f"et_{source}": np.float32, "pet_pm": np.float32, "pet_pt": np.float32, "pet_hg": np.float32} if source == "ref" and station in [ '1', '2', '3', '4', '5', '6', '7', '8', '9', '15', '17', '18', '19', '27', '28', '34', '526', '564', '567', '569' ]: df = pd.DataFrame(columns=list(dtypes.keys())) else: df = pd.read_csv(meteo_fpath, sep="\t", names=list(dtypes.keys()), dtype=dtypes, header=12) df.rename(columns=self.dyn_map, inplace=True) for new_col, (func, old_col) in self.dyn_generators.items(): if isinstance(old_col, str): if old_col in df.columns: # name of Series to func should be same as station id df[new_col] = func(pd.Series(df[old_col], name=station)) else: assert isinstance(old_col, tuple) if all([col in df.columns for col in old_col]): # feed all old_cols to the function df[new_col] = func(*[pd.Series(df[col], name=station) for col in old_col]) return df def _static_data(self)->pd.DataFrame: df = pd.concat([self.climate_attrs(), self.general_attrs(), self.geology_attrs(), self.gw_attrs(), self.hydro_distrub_attrs(), self.lc_attrs(), self.soil_attrs(), self.q_attrs(), self.topology_attrs()], axis=1) df.index = df.index.astype(str) # drop duplicate columns df = df.loc[:, ~df.columns.duplicated()].copy() df.rename(columns=self.static_map, inplace=True) return df def _read_dynamic( self, stations, dynamic_features, st=None, en=None ) -> dict: st, en = self._check_length(st, en) features = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features') if self.verbosity>1: print(f"getting data for {len(dynamic_features)} and for {len(stations)} stations") # qs and meteo data has different index if self.verbosity>2: print("getting streamflow data") qs = [self._read_q_from_csv(station=station) for station in stations] q_idx = pd.to_datetime( qs[0]['Year'].astype(str) + '-' + qs[0]['Month'].astype(str) + '-' + qs[0]['Day'].astype(str)) if self.verbosity>2: print("getting meteo data") meteos = [ self._read_meteo_from_csv(station=station, source=self.met_src) for station in stations] # todo : this will be correct only if we are getting data for all stations # but what if we want to get data for some random stations? # 10 because first 10 stations don't have data for "ref" source if len(meteos) < 10: met_idx = pd.to_datetime( meteos[0]['Year'].astype(str) + '-' + meteos[0]['Month'].astype(str) + '-' + meteos[0]['Day'].astype( str)) else: met_idx = pd.to_datetime( meteos[10]['Year'].astype(str) + '-' + meteos[10]['Month'].astype(str) + '-' + meteos[10]['Day'].astype( str)) met_cols = [col for col in meteos[0].columns if col not in ['Year', 'Month', 'Day']] if self.verbosity>2: print("putting data in dictionary") dyn = {} for stn, q, meteo in zip(self.stations(), qs, meteos): if len(meteo) == 0: meteo = pd.DataFrame(meteo, index=met_idx) else: meteo.index = met_idx q.index = q_idx stn_df = pd.concat( [meteo[met_cols].astype(np.float32), q[['Quality', observed_streamflow_cms()]]], axis=1)[features] stn_df.index.name = 'time' stn_df.columns.name = 'dynamic_features' dyn[stn] = stn_df.loc[st:en] return dyn