Source code for aqua_fetch.rr._bull

import os
from typing import List, Union, Dict
import concurrent.futures as cf

import pandas as pd

from .utils import _RainfallRunoff
from .._backend import netCDF4
from .._backend import xarray as xr
from ._map import (
    total_potential_evapotranspiration_with_specifier,
    solar_radiation,
    max_solar_radiation,
    min_solar_radiation,
    mean_thermal_radiation,
    max_thermal_radiation,
    min_themal_radiation,
    max_air_temp_with_specifier,
    min_air_temp_with_specifier,
    mean_air_temp_with_specifier,
    total_precipitation_with_specifier,
    max_dewpoint_temperature,
    min_dewpoint_temperature,
    mean_dewpoint_temperature,
    mean_dewpoint_temperature_with_specifier,
    mean_potential_evaporation,
    observed_streamflow_cms,
    max_dewpoint_temperature,
    snow_water_equivalent,
    min_snow_water_equivalent,
    max_snow_water_equivalent,
    u_component_of_wind_with_specifier,
    v_component_of_wind_with_specifier
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

BUL_COLUMNS = [
    'snow_depth_water_equivalent_mean_BULL', 
    'surface_net_solar_radiation_mean_BULL',
    'surface_net_thermal_radiation_mean_BULL',
    'surface_pressure_mean_BULL', 'temperature_2m_mean_BULL', 'dewpoint_temperature_2m_mean_BULL',
    'u_component_of_wind_10m_mean_BULL',
    'v_component_of_wind_10m_mean_BULL', 'volumetric_soil_water_layer_1_mean_BULL',
    'volumetric_soil_water_layer_2_mean_BULL',
    'volumetric_soil_water_layer_3_mean_BULL', 'volumetric_soil_water_layer_4_mean_BULL',
    'snow_depth_water_equivalent_min_BULL',
    'surface_net_solar_radiation_min_BULL', 'surface_net_thermal_radiation_min_BULL', 'surface_pressure_min_BULL',
    'temperature_2m_min_BULL', 'dewpoint_temperature_2m_min_BULL', 'u_component_of_wind_10m_min_BULL',
    'v_component_of_wind_10m_min_BULL',
    'volumetric_soil_water_layer_1_min_BULL', 'volumetric_soil_water_layer_2_min_BULL',
    'volumetric_soil_water_layer_3_min_BULL',
    'volumetric_soil_water_layer_4_min_BULL', 'snow_depth_water_equivalent_max_BULL',
    'surface_net_solar_radiation_max_BULL',
    'surface_net_thermal_radiation_max_BULL', 'surface_pressure_max_BULL', 'temperature_2m_max_BULL',
    'dewpoint_temperature_2m_max_BULL',
    'u_component_of_wind_10m_max_BULL', 'v_component_of_wind_10m_max_BULL', 'volumetric_soil_water_layer_1_max_BULL',
    'volumetric_soil_water_layer_2_max_BULL', 'volumetric_soil_water_layer_3_max_BULL',
    'volumetric_soil_water_layer_4_max_BULL',
    'total_precipitation_sum_BULL', 'potential_evaporation_sum_BULL', 
    'streamflow_BULL'
]


[docs] class Bull(_RainfallRunoff): """ Following the works of `Aparicio et al., 2024 <https://doi.org/10.1038/s41597-024-03594-5>`_. The data is taken from the `Zenodo repository <https://zenodo.org/records/10629809>`_. This dataset contains 484 stations with 55 dynamic (time series) features and 214 static features. The dynamic features span from 1951 to 2021. Examples --------- >>> from aqua_fetch import Bull >>> dataset = Bull() ... # get data by station id >>> _, dynamic = dataset.fetch(stations='BULL_9007', as_dataframe=True) >>> df = dynamic['BULL_9007'] # dynamic is a dictionary of with keys as station names and values as DataFrames >>> df.shape (25932, 55) ... ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 484 ... # get data of 10 % of stations as dataframe >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 10% of stations (48 out of 484) 48 ... ... # dynamic is a dictionary whose values are dataframes of dynamic features >>> [df.shape for df in dynamic.values()] [(25932, 55), (25932, 55), (25932, 55),... (25932, 55), (25932, 55)] ... ... get the data of a single (randomly selected) station >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 1 station 1 ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, dynamic = dataset.fetch('BULL_9007', as_dataframe=True, ... dynamic_features=['pet_mm_AEMET', 'airtemp_C_mean_AEMET', 'pcp_mm_ERA5Land', 'q_obs_cms']) >>> dynamic['BULL_9007'].shape (25932, 4) ... ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, dynamic = dataset.fetch(10, as_dataframe=True) >>> len(dynamic) # remember this is a dictionary with values as dataframe 10 ... # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='BULL_9007', static_features="all", as_dataframe=True) >>> static.shape, len(dynamic), dynamic['BULL_9007'].shape ((1, 214), 1, (25932, 55)) ... # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset >>> _, dynamic = dataset.fetch(10) ... type(dynamic) xarray.core.dataset.Dataset ... >>> dynamic.dims FrozenMappingWarningOnValuesAccess({'time': 25932, 'dynamic_features': 55}) ... >>> len(dynamic.data_vars) 10 ... >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (484, 2) >>> dataset.stn_coords('BULL_9007') # returns coordinates of station whose id is BULL_9007 41.298 -1.967 >>> dataset.stn_coords(['BULL_9007', 'BULL_8083']) # returns coordinates of two stations ... # get area of a single station >>> dataset.area('BULL_9007') # get coordinates of two stations >>> dataset.area(['BULL_9007', 'BULL_8083']) ... # if fiona library is installed we can get the boundary as fiona Geometry >>> dataset.get_boundary('BULL_9007') """ url = "https://zenodo.org/records/10629809" # todo : why setting a dictionary does not work? # url = { # "attributes.7z": "https://zenodo.org/record/10629809/files/attributes.7z", # "shapefiles.zip": "https://zenodo.org/record/10629809/files/shapefiles.zip", # "timeseries.zip": "https://zenodo.org/record/10629809/files/timeseries.zip", # }
[docs] def __init__( self, path, overwrite=False, **kwargs ): super().__init__(path, **kwargs) self._download(overwrite=overwrite) self._unzip_7z_files() if netCDF4 is None: self.ftype = "csv" else: self.ftype = "netcdf" self._dynamic_features = self._read_stn_dyn(self.stations()[0]).columns.tolist() self._static_features = list(set(self._static_data().columns.tolist()))
#self.dyn_fname = '' @property def boundary_file(self) -> os.PathLike: return os.path.join(self.shapefiles_path, "BULL_basin_shapes.shp") @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'gauge_lat': gauge_latitude(), 'gauge_lon': gauge_longitude(), } @property def dyn_map(self): return { 'dewpoint_temperature_2m_max_BULL': max_dewpoint_temperature(), 'dewpoint_temperature_2m_mean_BULL': mean_dewpoint_temperature(), 'dewpoint_temperature_2m_min_BULL': min_dewpoint_temperature(), # todo: are we considering height 'potential_evaporation_sum_BULL': mean_potential_evaporation(), # todo: is it mean or total? 'streamflow_BULL': observed_streamflow_cms(), 'potential_evapotranspiration_AEMET': total_potential_evapotranspiration_with_specifier('AEMET'), 'potential_evapotranspiration_EMO1_arc': total_potential_evapotranspiration_with_specifier('EMO1arc'), 'potential_evapotranspiration_ERA5_Land': total_potential_evapotranspiration_with_specifier('ERA5Land'), 'surface_net_solar_radiation_mean_BULL': solar_radiation(), 'surface_net_solar_radiation_max_BULL': max_solar_radiation(), 'surface_net_solar_radiation_min_BULL': min_solar_radiation(), 'surface_net_thermal_radiation_max_BULL': max_thermal_radiation(), 'surface_net_thermal_radiation_mean_BULL': mean_thermal_radiation(), 'surface_net_thermal_radiation_min_BULL': min_themal_radiation(), 'temperature_max_AEMET': max_air_temp_with_specifier('AEMET'), 'temperature_max_EMO1_arc': max_air_temp_with_specifier('EMO1arc'), 'temperature_max_ERA5_Land': max_air_temp_with_specifier('ERA5Land'), 'temperature_mean_AEMET': mean_air_temp_with_specifier('AEMET'), 'temperature_mean_EMO1_arc': mean_air_temp_with_specifier('EMO1arc'), 'temperature_mean_ERA5_Land': mean_air_temp_with_specifier('ERA5Land'), 'temperature_min_AEMET': min_air_temp_with_specifier('AEMET'), 'temperature_min_EMO1_arc': min_air_temp_with_specifier('EMO1arc'), 'temperature_min_ERA5_Land': min_air_temp_with_specifier('ERA5Land'), 'total_precipitation_AEMET': total_precipitation_with_specifier('AEMET'), 'total_precipitation_EMO1_arc': total_precipitation_with_specifier('EMO1arc'), 'total_precipitation_ERA5_Land': total_precipitation_with_specifier('ERA5Land'), 'total_precipitation_sum_BULL': total_precipitation_with_specifier('BULL'), 'snow_depth_water_equivalent_max_BULL': max_snow_water_equivalent(), 'snow_depth_water_equivalent_mean_BULL': snow_water_equivalent(), 'snow_depth_water_equivalent_min_BULL': min_snow_water_equivalent(), #'surface_pressure_max_BULL': # todo: is it same as air pressure 'temperature_2m_max_BULL': max_air_temp_with_specifier('2m'), 'temperature_2m_mean_BULL': mean_air_temp_with_specifier('2m'), 'temperature_2m_min_BULL': min_air_temp_with_specifier('2m'), 'u_component_of_wind_10m_max_BULL': u_component_of_wind_with_specifier('max_10m'), 'u_component_of_wind_10m_mean_BULL': u_component_of_wind_with_specifier('mean_10m'), 'u_component_of_wind_10m_min_BULL': u_component_of_wind_with_specifier('min_10m'), 'v_component_of_wind_10m_max_BULL': v_component_of_wind_with_specifier('max_10m'), 'v_component_of_wind_10m_mean_BULL': v_component_of_wind_with_specifier('mean_10m'), 'v_component_of_wind_10m_min_BULL': v_component_of_wind_with_specifier('min_10m'), } @property def attributes_path(self): return os.path.join(self.path, "attributes") @property def shapefiles_path(self): return os.path.join(self.path, "shapefiles", "shapefiles") @property def ts_path(self): return os.path.join(self.path, "timeseries", "timeseries") @property def q_path(self): return os.path.join(self.ts_path, self.ftype, "streamflow") @property def aemet_path(self): return os.path.join(self.ts_path, self.ftype, "AEMET") @property def bull_path(self): return os.path.join(self.ts_path, self.ftype, "BULL") @property def era5_land_path(self): return os.path.join(self.ts_path, self.ftype, "ERA5_Land") @property def emo1_arc_path(self): return os.path.join(self.ts_path, self.ftype, "EMO1_arc") @property def start(self): return pd.Timestamp("19510102") @property def end(self): return pd.Timestamp("20211231")
[docs] def stations(self) -> List[str]: return ["BULL_" + f.split('.')[0].split('_')[1] for f in os.listdir(self.q_path)]
@property def dynamic_features(self) -> List[str]: return self._dynamic_features @property def static_features(self) -> List[str]: return self._static_features def _unzip_7z_files(self): # The attributes file is .7z file try: import py7zr except (ModuleNotFoundError, ImportError): raise ImportError('py7zr is required to extract the .7z files. Please install it using `pip install py7zr`') # get all .7z files in self.path files = [f for f in os.listdir(self.path) if f.endswith('.7z')] for file in files: fpath = os.path.join(self.path, file) extracted_path = os.path.join(self.path, file[:-3]) # remove .7z extension if not os.path.exists(extracted_path): with py7zr.SevenZipFile(fpath, mode='r') as z: z.extractall(path = self.path) print(f'Extracted {file}') return
[docs] def caravan_attributes(self) -> pd.DataFrame: """a dataframe of shape (484, 10)""" return pd.read_csv( os.path.join(self.attributes_path, "attributes_caravan_.csv"), index_col=0)
[docs] def hydroatlas_attributes(self) -> pd.DataFrame: """a dataframe of shape (484, 197)""" df = pd.read_csv( os.path.join(self.attributes_path, "attributes_hydroatlas_.csv"), index_col=0) # because self.other_attributes() has a column named 'area' df.rename(columns={'area': 'area_hydroatlas'}, inplace=True) return df
[docs] def other_attributes(self) -> pd.DataFrame: """a dataframe of shape (484, 7)""" return pd.read_csv( os.path.join(self.attributes_path, "attributes_other_ss.csv"), index_col=0)
def _static_data(self) -> pd.DataFrame: df = pd.concat([ self.caravan_attributes(), self.hydroatlas_attributes(), self.other_attributes() ], axis=1) df.rename(columns=self.static_map, inplace=True) return df def _read_stn_dyn(self, station: str) -> pd.DataFrame: station = station.split('_')[1] df = pd.concat([ self._read_q_for_stn(station), self._read_aemet_for_stn(station), self._read_bull_for_stn(station), self._read_era5_land_for_stn(station), self._read_emo1_arc_for_stn(station) ], axis=1) df.index.name = 'time' df.columns.name = 'dynamic_features' df.rename(columns=self.dyn_map, inplace=True) return df def _read_q_for_stn(self, station) -> pd.DataFrame: """a dataframe of shape (time, 1)""" if self.ftype == "netcdf": fpath = os.path.join(self.q_path, f'streamflow_{station}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.q_path, f'streamflow_{station}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' return df def _read_aemet_for_stn(self, station) -> pd.DataFrame: """ reads a dataframe of shape (time, 5) 'temperature_max_AEMET', 'temperature_min_AEMET', 'temperature_mean_AEMET', 'total_precipitation_AEMET', 'potential_evapotranspiration_AEMET' """ if self.ftype == "netcdf": fpath = os.path.join(self.aemet_path, f'AEMET_{station}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.aemet_path, f'AEMET_{station}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_AEMET' for col in df.columns] return df def _read_bull_for_stn(self, station) -> pd.DataFrame: """a dataframe of shape (time, 39) except for stn 3163""" if self.ftype == "netcdf": fpath = os.path.join(self.bull_path, f'BULL_{station}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.bull_path, f'BULL_{station}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_BULL' for col in df.columns] # todo: why are we adding _BULL to the columns if len(df.columns) == 15: # add missing columns for col in BUL_COLUMNS: if col not in df.columns: df[col] = None return df def _read_era5_land_for_stn(self, station) -> pd.DataFrame: """a dataframe of shape (time, 5) with following columns - 'temperature_max_ERA5_Land', - 'temperature_min_ERA5_Land', - 'temperature_mean_ERA5_Land', - 'total_precipitation_ERA5_Land', - 'potential_evapotranspiration_ERA5_Land' """ if self.ftype == "netcdf": fpath = os.path.join(self.era5_land_path, f'ERA5_Land_{station}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.era5_land_path, f'ERA5_Land_{station}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_ERA5_Land' for col in df.columns] return df def _read_emo1_arc_for_stn(self, station) -> pd.DataFrame: """a dataframe of shape (time, 5) with following columns - 'temperature_max_EMO1_arc' - 'temperature_min_EMO1_arc' - 'temperature_mean_EMO1_arc' - 'total_precipitation_EMO1_arc' - 'potential_evapotranspiration_EMO1_arc' """ if self.ftype == "netcdf": fpath = os.path.join(self.emo1_arc_path, f'EMO1_{station}.nc') df = xr.load_dataset(fpath).to_dataframe() else: fpath = os.path.join(self.emo1_arc_path, f'EMO1_{station}.csv') df = pd.read_csv(fpath, index_col='date', parse_dates=True) df.index.name = 'time' df.columns.name = 'dynamic_features' df.columns = [col + '_EMO1_arc' for col in df.columns] return df