Source code for aqua_fetch.rr._gsha


__all__ = [
    'GSHA', 
    '_GSHA', 
    'Japan'
    'Arcticnet',
    'Spain',
    'Thailand'
    ]

import os
import time
import warnings
from datetime import datetime
import concurrent.futures as cf
from typing import List, Union, Dict, Tuple

import numpy as np
import pandas as pd

from .._backend import xarray as xr

from ..utils import get_cpus
from ..utils import validate_attributes
from ..utils import merge_shapefiles_fiona
from .utils import _RainfallRunoff

from ._map import (
    total_precipitation_with_specifier,
    leaf_area_index,
    actual_evapotranspiration_with_specifier,
    total_potential_evapotranspiration_with_specifier,
    download_longwave_radiation_with_specifier,
    solar_radiation_with_specifier,
    snow_water_equivalent_with_specifier,
    mean_air_temp_with_specifier,
    mean_windspeed_with_specifier,
    u_component_of_wind_with_specifier,
    v_component_of_wind_with_specifier,
    groundwater_percentages,
    soil_moisture_layer1,
    soil_moisture_layer2,
    soil_moisture_layer3,
    soil_moisture_layer4,
    observed_streamflow_cms,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )


METEO_MAP = {
    'arcticnet': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
    'AFD': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
    'GRDC': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
    'IWRIS': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
    'MLIT': 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT/Meteorology_arcticnet_AFD_GRDC_IWRIS_MLIT',
    'HYDAT': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
    'ANA': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
    'BOM': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
    'CCRR': 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT/Meteorology_ANA_BOM_CCRR_HYDAT',
    'China': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
    'CHP': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
    'RID': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
    'USGS': 'Meteorology_PartIII_China_CHP_RID_USGS/Meteorology_China_CHP_RID_USGS',
}


[docs] class GSHA(_RainfallRunoff): """ Global streamflow characteristics, hydrometeorology and catchment attributes following `Peirong et al., 2023 <https://doi.org/10.5194/essd-16-1559-2024>`_. The data is downloaded from its `zenodo repository <https://zenodo.org/record/8090704>`_. It should be noted that this dataset does not contain observed streamflow data. It has 21568 stations, 26 dynamic (meteorological + storage) features with daily timestep, 21 dynamic features (landcover + streamflow indices + reservoir) with yearly timestep and 35 static features. Examples -------- >>> from aqua_fetch import GSHA >>> dataset = GSHA() >>> len(dataset.stations()) 21568 >>> dataset.agencies ['arcticnet', 'AFD', 'GRDC', 'IWRIS', 'MLIT', 'HYDAT', 'ANA', 'BOM', 'CCRR', 'China', 'CHP', 'RID', 'USGS'] >>> dataset.start Timestamp('1979-01-01 00:00:00') >>> dataset.end Timestamp('2022-12-31 00:00:00') >>> dataset.static_features ['ele_mt_uav', 'slp_dg_uav', 'lat', 'long', 'area_km2', 'agency', ...] >>> len(dataset.dynamic_features) 26 >>> len(dataset.daily_dynamic_features) 26 >>> len(dataset.yearly_dynamic_features) 21 >>> dataset.fetch_static_features('1001_arcticnet') fetch static features for all stations of arcticnet agency >>> dataset.fetch_static_features(agency='arcticnet') fetch static features for all stations of arcticnet agency >>> ds.fetch_dynamic_features(agency='arcticnet') """ url = "https://zenodo.org/record/8090704"
[docs] def __init__(self, path=None, overwrite:bool = False, to_netcdf: bool = True, **kwargs): """ Parameters ---------- to_netcdf : bool whether to convert all the data into one netcdf file or not. This will fasten repeated calls to fetch etc but will require netCDF4 package as well as xarry. """ super(GSHA, self).__init__(path=path, to_netcdf=to_netcdf, overwrite=overwrite, **kwargs) self.path = path files = ['Global_files.zip', 'GSHAreadme.docx', 'LAI.zip', 'Landcover.zip', 'Meteorology_PartI_arcticnet_AFD_GRDC_IWRIS_MLIT.zip', 'Meteorology_ PartII_ANA_BOM_CCRR_HYDAT.zip', 'Meteorology_PartIII_China_CHP_RID_USGS.zip', 'Reservoir.zip', 'Storage.zip', 'StreamflowIndices.zip', 'WatershedPolygons.zip', 'WatershedsAll.csv' ] self._download() self._maybe_merge_shapefiles() fpath = os.path.join(self.path, "Global_files", "Global_files", 'WatershedsAll.csv') wsAll = pd.read_csv(fpath) wsAll.columns = ['station_id', 'lat', 'long', 'area', 'agency'] wsAll.index = wsAll.pop('station_id') self.wsAll = wsAll[~wsAll.index.duplicated(keep='first')].copy() self._daily_dynamic_features = None # lazy initialization, computed when accessed self._yearly_dynamic_features = None # lazy initialization, computed when accessed self._static_features = self.__static_features()
@property def boundary_file(self) -> os.PathLike: return os.path.join(self.path, "boundaries.shp") @property def boundary_id_map(self) -> str: return "gauge_id" @property def agencies(self) -> List[str]: """ returns the names of agencies as list - ``arcticnet`` : Antarctica - ``AFD`` : Spain - ``GRDC`` : Global - ``IWRIS`` : India - ``MLIT`` : Japan - ``HYDAT`` : Canada - ``ANA``: Brazil - ``BOM`` : Australia - ``CCRR`` : Chile - ``China`` - ``CHP`` : China - ``RID`` : Thailand - ``USGS`` """ return self.wsAll.loc[:, 'agency'].unique() @property def daily_dynamic_features(self) -> List[str]: if self._daily_dynamic_features is None: self._daily_dynamic_features = self.__daily_dynamic_features() return self._daily_dynamic_features @property def yearly_dynamic_features(self) -> List[str]: if self._yearly_dynamic_features is None: self._yearly_dynamic_features = self.__yearly_dynamic_features() return self._yearly_dynamic_features @property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'lat': gauge_latitude(), 'slp_dg_uav': slope('degrees'), 'long': gauge_longitude(), } @property def dyn_map(self): return { 'EP_GLEAM': actual_evapotranspiration_with_specifier('gleam'), 'EP_REA': actual_evapotranspiration_with_specifier('rea'), 'GLEAM_PET': total_potential_evapotranspiration_with_specifier('gleam'), 'GW': groundwater_percentages(), 'HPET_PET': total_potential_evapotranspiration_with_specifier('hpet'), 'LONGRAD_ERA': download_longwave_radiation_with_specifier('era5'), 'LONGRAD_MERRA': download_longwave_radiation_with_specifier('merra2'), 'P_EMEarth': total_precipitation_with_specifier('emearth'), 'P_MSWEP': total_precipitation_with_specifier('mswep'), 'SHORTRAD_ERA': solar_radiation_with_specifier('era5'), 'SHORTRAD_MERRA': solar_radiation_with_specifier('merra2'), 'SML1': soil_moisture_layer1(), 'SML2': soil_moisture_layer2(), 'SML3': soil_moisture_layer3(), 'SML4': soil_moisture_layer4(), 'SWDE': snow_water_equivalent_with_specifier('era5'), # change m to mm 'T_ERA': mean_air_temp_with_specifier('era5'), # change from K to C 'T_EUSTACE': mean_air_temp_with_specifier('eustace'), # change from K to C 'T_MERRA': mean_air_temp_with_specifier('merra2'), # change from K to C 'WINDERA': mean_windspeed_with_specifier('era5'), 'WINDMERRA': mean_windspeed_with_specifier('merra'), 'WINDU_ERA': u_component_of_wind_with_specifier('era5'), 'WINDU_MERRA': u_component_of_wind_with_specifier('merra'), 'WINDV_ERA': v_component_of_wind_with_specifier('era5'), 'WINDV_MERRA': v_component_of_wind_with_specifier('merra'), 'lai': leaf_area_index(), } @property def dyn_factors(self): return { snow_water_equivalent_with_specifier('era5'): lambda x: x * 1000, mean_air_temp_with_specifier('era5'): lambda x: x - 273.15, mean_air_temp_with_specifier('eustace'): lambda x: x - 273.15, mean_air_temp_with_specifier('merra2'): lambda x: x - 273.15, } def __daily_dynamic_features(self): return pd.concat( [self.meteo_vars_stn('1001_arcticnet'), self.storage_vars_stn('1001_arcticnet'), ], axis=1 ).columns.tolist() + ['lai'] def __yearly_dynamic_features(self): return pd.concat( [self.lc_variables_stn('1001_arcticnet'), self.streamflow_indices_stn('1001_arcticnet'), self.reservoir_variables_stn('1001_arcticnet') ], axis=1 ).columns.tolist() @property def start(self) -> pd.Timestamp: return pd.Timestamp('1979-01-01') @property def end(self) -> pd.Timestamp: return pd.Timestamp('2022-12-31') @property def static_features(self) -> List[str]: return self._static_features @property def dynamic_features(self) -> List[str]: return self.daily_dynamic_features def __static_features(self)->List[str]: df = pd.concat( [self.atlas('1001_arcticnet'), self.uncertainty('1001_arcticnet'), self.wsAll.copy(), ], axis=1 ) #+ self.wsAll df.rename(columns = self.static_map, inplace = True) return df.columns.tolist()
[docs] def agency_of_stn(self, stn: str) -> str: """find the agency to which a station belongs """ return self.wsAll.loc[stn, 'agency']
[docs] def agency_stations(self, agency: str) -> List[str]: """returns the station ids from a particular agency""" return self.wsAll[self.wsAll['agency'] == agency].index.tolist()
def _maybe_merge_shapefiles(self): shp_path = os.path.join(self.path, 'WatershedPolygons', 'WatershedPolygons') out_shapefile = os.path.join(self.path, 'boundaries.shp') if not os.path.exists(out_shapefile): shp_files = [os.path.join(shp_path, filename) for filename in os.listdir(shp_path) if filename.endswith('.shp')] merge_shapefiles_fiona(shp_files, out_shapefile, copy_properties=True) return def _get_stations( self, stations: Union[str, List[str]] = "all", agency: Union[str, List[str]] = "all" ) -> List[str]: if agency != "all" and stations != 'all': raise ValueError("Either provide agency or stations not both") if agency != "all": agency = validate_attributes(agency, self.agencies, 'agency') stations = self.wsAll[self.wsAll['agency'].isin(agency)].index.tolist() else: stations = validate_attributes(stations, self.stations(), 'stations') return stations
[docs] def stn_coords(self, stations: List[str] = "all", agency: List[str] = "all") -> pd.DataFrame: """ returns the latitude and longitude of stations Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (n, 2) where n is the number of stations Examples -------- >>> from aqua_fetch import GSHA >>> dataset = GSHA() >>> dataset.stn_coords('1001_arcticnet') >>> dataset.stn_coords(['1001_arcticnet', '1002_arcticnet']) get coordinates for all stations of arcticnet agency >>> dataset.stn_coords(agency='arcticnet') """ stations = self._get_stations(stations, agency) return self.wsAll.loc[stations, ['lat', 'long']].copy()
[docs] def stations(self, agency: str = "all") -> List[str]: """returns names of stations as list""" if agency != "all": agency = validate_attributes(agency, self.agencies, 'agency') return self.wsAll[self.wsAll['agency'].isin(agency)].index.tolist() return self.wsAll.index.tolist()
[docs] def area(self, stations: List[str] = "all", agency: List[str] = "all") -> pd.Series: """area of catchments""" stations = self._get_stations(stations, agency) s = self.wsAll.loc[stations, 'area'] s.name = 'area_km2' return s
[docs] def uncertainty( self, stations: List[str] = "all", agency: List[str] = "all" ) -> pd.DataFrame: """ Uncertainty estimates of all meteorological variables over all watersheds - P_uncertainty (%) Precipitation uncertainty estimates (in percentage). Uncertainties are calculated from EM-Earth deterministic and MSWEP datasets. - T_uncertainty (%) Temperature uncertainty estimates (in percentage). Uncertainties are calculated from EUSTACE, MERRA-2, and ERA5 datasets. - EVP_uncertainty (%) Actual evapotranspiration uncertainty estimates (in percentage). Uncertainties are calculated from GLEAM and REA datasets. - LRAD_uncertainty (%) Downward longwave radiation uncertainty estimates (in percentage). Uncertainties are calculated from MERRA-2 and ERA5-land datasets. - SRAD_uncertainty (%) Downward shortwave radiation uncertainty estimates (in percentage). Uncertainties are calculated from MERRA-2 and ERA5-land datasets. - wind_uncertainty (%) Wind speed uncertainty estimates (in percentage). The u- and v- components are aggregated on each grid to obtain wind speed. Uncertainties are calculated from MERRA-2 and ERA5-land datasets. - pet_uncertainty (%) Potential evapotranspiration uncertainty estimates (in percentage). Uncertainties are calculated from GLEAM and REA datasets. Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (n, 7) where n is the number of stations """ stations = self._get_stations(stations, agency) fpath = os.path.join( self.path, "Global_files", "Global_files", 'Uncertainty.csv') df = pd.read_csv(fpath, index_col=0) df = df[~df.index.duplicated(keep='first')] return df.loc[stations, :]
[docs] def atlas(self, stations: List[str] = "all", agency: List[str] = "all") -> pd.DataFrame: """ The link table between GSHA watershed IDs and RiverATLAS river reach IDs, as well as the selected static attributes Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (n, 24) where n is the number of stations """ stations = self._get_stations(stations, agency) fpath = os.path.join( self.path, "Global_files", "Global_files", 'GSHA_ATLAS.csv') df = pd.read_csv(fpath, index_col=0) df = df[~df.index.duplicated(keep='first')] return df.loc[stations, :]
[docs] def lc_variables_stn(self, stn: str) -> pd.DataFrame: """ Landcover variables for a given station which have yearly timestep. Following three landcover variables are provided: - urban_fraction(%): Ratio of urban extent to the entire watershed area (percentage). - forest_fraction(%): Ratio of forest extent to the entire watershed area (percentage). - cropland_fraction(%): Ratio of cropland extent to the entire watershed area (percentage). Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (n, 3) where n is the number of years """ return lc_variable_stn(self.path, stn)
[docs] def lc_variables( self, stations: List[str] = "all", agency: List[str] = "all" ): """ Landcover variables for one or more than one station either as :obj:`xarray.Dataset` or dictionary. The data has yearly timestep. """ stations = self._get_stations(stations, agency) lc_vars = self.lc_vars_all_stns() if isinstance(lc_vars, xr.Dataset): return lc_vars[stations] else: return {stn: lc_vars[stn] for stn in stations}
def lc_vars_all_stns( self ): nc_path = os.path.join(self.path, 'lc_variables.nc') if self.to_netcdf and os.path.exists(nc_path): if self.verbosity: print(f"Reading from pre-existing {nc_path}") return xr.open_dataset(nc_path) cpus = self.processes or max(get_cpus() - 2, 1) start = time.time() stations = self.stations() paths = [self.path for _ in range(len(stations))] if self.verbosity: print(f"Reading landcover variables for {len(stations)} stations using {cpus} cpus") with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( lc_variable_stn, paths, stations, ) if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds") if self.to_netcdf: encoding = {var: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for var in stations()} ds = xr.Dataset({stn: xr.DataArray(val) for stn, val in zip(stations, results)}) if self.verbosity: print(f"Saving to {nc_path}") ds.to_netcdf(nc_path, encoding=encoding) else: ds = {stn: df for stn, df in zip(stations, results)} return ds
[docs] def reservoir_variables_stn(self, stn: str) -> pd.DataFrame: """ Reservoir variables for a given station from 1979 to 2020 with yearly timestep. Following two reservoir variables are provided: - ``capacity``: Reservoir capacity of the year in the watershed (m3). To avoid including too many missing values, we use the ICOLD capacity in the linked table of the GeoDAR dataset. - ``dor``: Degree of regulation of the watershed (yearly reservoir capacity/yearly mean flow). If yearly mean flow is missing, the value is substituted with the average of all mean flow values. Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (42, 2) where 42 is the number of years """ return reservoir_vars_stn(self.path, stn)
[docs] def reservoir_variables( self, stations: List[str] = "all", agency: List[str] = "all" ): """ Reservoir variables for one or more than one station either as :obj:`xarray.Dataset` or dictionary. The data has yearly timestep. """ stations = self._get_stations(stations, agency) lc_vars = self.reservoir_vars_all_stns() if isinstance(lc_vars, xr.Dataset): return lc_vars[stations] else: return {stn: lc_vars[stn] for stn in stations}
def reservoir_vars_all_stns( self ): nc_path = os.path.join(self.path, 'reservoir_variables.nc') if self.to_netcdf and os.path.exists(nc_path): if self.verbosity: print(f"Reading from pre-existing {nc_path}") return xr.open_dataset(nc_path) cpus = self.processes or max(get_cpus() - 2, 1) start = time.time() stations = self.stations() paths = [self.path for _ in range(len(stations))] if self.verbosity: print(f"Reading reservoir variables for {len(stations)} stations using {cpus} cpus") with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( reservoir_vars_stn, paths, stations, ) if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds") if self.to_netcdf: encoding = {var: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for var in stations} ds = xr.Dataset({stn: xr.DataArray(val) for stn, val in zip(stations, results)}) self.print(f"Saving to {nc_path}") ds.to_netcdf(nc_path, encoding=encoding) else: ds = {stn: df for stn, df in zip(stations, results)} return ds
[docs] def streamflow_indices_stn(self, stn: str) -> pd.DataFrame: """ Streamflow indices for a given station which have yearly timestep. Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (n, 16) where n is the number of years """ return streamflow_indices_stn(self.path, stn)
[docs] def streamflow_indices( self, stations: List[str] = "all", agency: List[str] = "all" ): """ Landcover variables for one or more than one station either as :obj:`xarray.Dataset` or dictionary. The data has yearly timestep. """ stations = self._get_stations(stations, agency) lc_vars = self.streamflow_indices_all_stations() if isinstance(lc_vars, xr.Dataset): return lc_vars[stations] else: return {stn: lc_vars[stn] for stn in stations}
def streamflow_indices_all_stations( self ): nc_path = os.path.join(self.path, 'streamflow_indices.nc') if self.to_netcdf and os.path.exists(nc_path): if self.verbosity: print(f"Reading from pre-existing {nc_path}") return xr.open_dataset(nc_path) cpus = self.processes or max(get_cpus() - 2, 1) start = time.time() stations = self.stations() paths = [self.path for _ in range(len(stations))] if self.verbosity: print(f"Reading streamflow indices for {len(stations)} stations using {cpus} cpus") # takes ~20 seconds with 110 cpus with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( streamflow_indices_stn, paths, stations, ) if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds") if self.to_netcdf: encoding = {var: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for var in stations()} ds = xr.Dataset({str(stn): xr.DataArray(df) for stn, df in zip(stations, results)}) if self.verbosity: print(f"Saving to {nc_path}") ds.to_netcdf(nc_path, encoding=encoding) else: ds = {stn: df for stn, df in zip(stations, results)} return ds
[docs] def lai_stn(self, stn: str) -> pd.Series: """ Daily leaf area index. As per documentation, due to satellite data quality, some watersheds might have relatively serious data missing issue. The data is from 1981-01-01 to 2020-12-31. Returns ------- pd.Series a :obj:`pandas.Series` of shape (14571,) where 14571 is the number of days """ nc_fpath = os.path.join(self.path, 'lai.nc') if os.path.exists(nc_fpath) and xr is not None: ds = xr.open_dataset(nc_fpath) ser = ds[stn].to_pandas() ds.close() return ser return lai_stn(self.path, stn)
[docs] def fetch_lai( self, stations: List[str] = "all", agency: List[str] = "all" ): """ Leaf Area Index timeseries for one or more than one station either as :obj:`xarray.Dataset` or :obj:`pandas.DataFrame`. The data has daily timestep. """ stations = self._get_stations(stations, agency) lai = self.lai_all_stns() return lai[stations]
def lai_all_stns( self ): if self.to_netcdf: nc_path = os.path.join(self.path, 'lai.nc') if os.path.exists(nc_path): if self.verbosity: print(f"Reading from pre-existing {nc_path}") return xr.open_dataset(nc_path) elif os.path.exists(os.path.join(self.path, 'lai.csv')): if self.verbosity: print(f"Reading from pre-existing {self.path}") return pd.read_csv(os.path.join(self.path, 'lai.csv'), index_col=0) cpus = self.processes or max(get_cpus() - 2, 1) start = time.time() stations = self.stations() paths = [self.path for _ in range(len(stations))] if self.verbosity: print(f"Reading lai for {len(stations)} stations using {cpus} cpus") with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( lai_stn, paths, stations, ) if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds") if self.to_netcdf: encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in stations} nc_path = os.path.join(self.path, 'lai.nc') ds = xr.Dataset({stn: xr.DataArray(val) for stn, val in zip(stations, results)}) print(f"Saving to {nc_path}") ds.to_netcdf(nc_path, encoding=encoding) else: ds = pd.concat(results, axis=1) csv_path = os.path.join(self.path, 'lai.csv') if self.verbosity: print(f"Saving to {csv_path}") ds.to_csv(csv_path, index=True) return ds
[docs] def meteo_vars(self)->List[str]: """ returns names of meteorological variables """ return self.meteo_vars_stn('1001_arcticnet').columns.tolist()
[docs] def meteo_vars_stn( self, stn: str ) -> pd.DataFrame: """ Daily meteorological variables from 1979-01-01 to 2022-12-31 for a given station. Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (16071, 19) where n is the number of days """ nc_path = os.path.join(self.path, 'meteo_vars.nc') if os.path.exists(nc_path) and xr is not None: # even if the files exist, we may not have xarray installed ds = xr.open_dataset(nc_path) df = ds[stn].to_pandas() ds.close() return df path = os.path.join( self.path, METEO_MAP[self.agency_of_stn(stn)], f'{stn}.csv' ) return self._meteo_vars_stn(path)
[docs] def meteo_vars_all_stns(self): """ Meteorological variables from 1979-01-01 to 2022-12-31 for all stations either as :obj:`xarray.Dataset` or dictionary. The data has daily timestep. """ nc_path = os.path.join(self.path, 'meteo_vars.nc') if self.to_netcdf and os.path.exists(nc_path): if self.verbosity: print(f"Reading from pre-existing {nc_path}") return xr.open_dataset(nc_path) meteo_vars = {} paths = [os.path.join( self.path, METEO_MAP[self.agency_of_stn(stn)], f'{stn}.csv') for stn in self.stations()] cpus = self.processes or max(get_cpus() - 2, 1) start = time.time() if self.verbosity: print(f"Reading meteorological variables for {len(self.stations())} stations using {cpus} cpus") # takes ~ 1538 seconds with 110 cpus with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( self._meteo_vars_stn, paths ) if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds") if self.to_netcdf: for stn, df in zip(self.stations(), results): meteo_vars[stn] = df encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in meteo_vars.keys()} if self.verbosity>1: print(f"Creating xr.Dataset from {len(meteo_vars)} stations") ds = xr.Dataset(meteo_vars) if self.verbosity: print(f"Saving to {nc_path}") ds.to_netcdf(nc_path, encoding=encoding) else: ds = {stn: df for stn, df in zip(self.stations(), results)} return ds
[docs] def fetch_meteo_vars( self, stations: List[str] = "all", agency: List[str] = "all" ): """ Meteorological variables from 1979-01-01 to 2022-12-31 for one or more than one station either as :obj:`xarray.Dataset` or dictionary. The data has daily timestep. """ if agency != "all" and stations != 'all': raise ValueError("Either provide agency or stations not both") if agency != "all": agency = validate_attributes(agency, self.agencies, 'agency') stations = self.wsAll[self.wsAll['agency'].isin(agency)].index.tolist() else: stations = validate_attributes(stations, self.stations(), 'stations') meteo_vars = self.meteo_vars_all_stns() if isinstance(meteo_vars, xr.Dataset): return meteo_vars[stations] else: return {stn: meteo_vars[stn] for stn in stations}
[docs] def storage_vars_stn( self, stn: str ) -> pd.DataFrame: """ Daily Water storage term variables from 1979-01-01 to 2021-12-31 for a given station. - SM_layer1: 0-7 cm soil moisture from ERA5 land soil water layer 1 (m3/m3) for 1979-2021. - SM_layer2: 7-28 cm soil moisture from ERA5 land soil water layer 2 (m3/m3) for 1979-2021. - SM_layer3: 28-100 cm soil moisture from ERA5 land soil water layer 3 (m3/m3) for 1979-2021. - SM_layer4: 100-289 cm soil moisture from ERA5 land soil water layer 4 (m3/m3) for 1979-2021. - SWDE: Snow water equivalent from ERA5 snow depth water equivalent (m of water equivalent) for 1979-2021. - groundwater(%): Groundwater percentage from GRACE-FO data assimilation (%) for 2003-2021 (weekly). Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (15706, 6) where n is the number of days """ nc_path = os.path.join(self.path, 'storage.nc') if os.path.exists(nc_path) and xr is not None: ds = xr.open_dataset(nc_path) df = ds[stn].to_pandas() ds.close() return df path = os.path.join( self.path, "Storage", "Storage", f'{stn}.csv' ) return self._storage_vars_stn(path)
[docs] def storage_vars_all_stns(self): """ Water storage term variables from 1979-01-01 to 2021-12-31 for all stations either as :obj:`xarray.Dataset` or dictionary. The data has daily timestep. """ nc_path = os.path.join(self.path, 'storage.nc') if self.to_netcdf and os.path.exists(nc_path): if self.verbosity: print(f"Reading from pre-existing {nc_path}") return xr.open_dataset(nc_path) storage_vars = {} paths = [os.path.join( self.path, "Storage", "Storage", f'{stn}.csv') for stn in self.stations()] cpus = self.processes or max(get_cpus() - 2, 1) start = time.time() if self.verbosity: print(f"Reading storage vars for {len(self.stations())} stations using {cpus} cpus") # takes ~ 975 seconds with 110 cpus with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map( self._storage_vars_stn, paths ) if self.verbosity: print(f"Time taken: {time.time() - start:.2f} seconds") if self.to_netcdf: encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in self.stations()} for stn, df in zip(self.stations(), results): storage_vars[stn] = df ds = xr.Dataset(storage_vars) if self.verbosity: print(f"Saving to {nc_path}") ds.to_netcdf(nc_path, encoding=encoding) else: ds = {stn: df for stn, df in zip(self.stations(), results)} return ds
[docs] def storage_vars(self)->List[str]: """returns names of storage variables""" return self.storage_vars_stn('1001_arcticnet').columns.tolist()
[docs] def fetch_storage_vars( self, stations: List[str] = "all", agency: List[str] = "all" ): """ Water storage term variables from 1979-01-01 to 2021-12-31 for one or more than one station either as :obj:`xarray.Dataset` or dictionary. The data has daily timestep. """ stations = self._get_stations(stations, agency) meteo_vars = self.storage_vars_all_stns() if isinstance(meteo_vars, xr.Dataset): return meteo_vars[stations] else: return {stn: meteo_vars[stn] for stn in stations}
[docs] def fetch_static_features( self, stations: Union[str, List[str]] = "all", static_features: Union[str, List[str]] = "all", agency: List[str] = "all", ) -> pd.DataFrame: """ Returns static features of one or more stations. Parameters ---------- stations : str name/id of station/stations of which to extract the data static_features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (stations, features) Examples --------- >>> from aqua_fetch import GSHA >>> dataset = GSHA() get the names of stations >>> stns = dataset.stations() >>> len(stns) 21568 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (21568, 35) get static data of one station only >>> static_data = dataset.fetch_static_features('1001_arcticnet') >>> static_data.shape (1, 35) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['ele_mt_uav', 'slp_dg_uav']) >>> static_data.shape (21568, 2) >>> data = dataset.fetch_static_features('1001_arcticnet', static_features=['slp_dg_uav', 'slp_dg_uav']) >>> data.shape (1, 2) >>> out = ds.fetch_static_features(agency='arcticnet') >>> out.shape (106, 35 """ stations = self._get_stations(stations, agency) features = validate_attributes(static_features, self.static_features, 'static_features') df = pd.concat([ self.atlas(stations), self.uncertainty(stations), self.wsAll.loc[stations, :] ], axis=1 )#.loc[:, features] df.rename(columns=self.static_map, inplace=True) return df.loc[:, features]
[docs] def fetch_stn_dynamic_features( self, station: str, dynamic_features='all', st : Union[str, pd.Timestamp] = None, en : Union[str, pd.Timestamp] = None ) -> pd.DataFrame: """ Fetches all or selected dynamic features of one station. Parameters ---------- station : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available dynamic features are returned. Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (n, features) where n is the number of days Examples -------- >>> from aqua_fetch import GSHA >>> dataset = GSHA() >>> data = dataset.fetch_stn_dynamic_features('1001_arcticnet') >>> data.shape (16071, 26) >>> dataset.dynamic_features >>> data = dataset.fetch_stn_dynamic_features('1001_arcticnet', ... dynamic_features=['airtemp_C_mean_era5', 'pcp_mm_mswep']) >>> data.shape (16071, 2) """ features = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features') st, en = self._check_length(st, en) out = pd.concat( [self.meteo_vars_stn(station), self.storage_vars_stn(station), self.lai_stn(station).rename('lai') ], axis=1 ).loc[st:en, features] out.columns.name = 'dynamic_features' return out
[docs] def fetch_dynamic_features( self, stations: Union[List[str], str] = "all", dynamic_features='all', st=None, en=None, as_dataframe=False, agency: List[str] = "all", )-> "Dataset": """ Fetches all or selected dynamic features of one station. Parameters ---------- stations : str name/id of station of which to extract the data features : list/str, optional (default="all") The name/names of features to fetch. By default, all available dynamic features are returned. st : Optional (default=None) start time from where to fetch the data. en : Optional (default=None) end time untill where to fetch the data as_dataframe : bool, optional (default=False) if true, the returned data is :obj:`pandas.DataFrame` otherwise it is xarray dataset Examples -------- >>> from aqua_fetch import GSHA >>> dataset = GSHA() >>> data = dataset.fetch_dynamic_features('1001_arcticnet', as_dataframe=True) >>> data.shape (16071, 26) >>> dataset.dynamic_features >>> stns = ['1001_arcticnet', '10062_arcticnet'] >>> data = dataset.fetch_dynamic_features(stns, ... dynamic_features=['airtemp_C_mean_era5', 'pcp_mm_mswep']) """ # todo : extremely slow even for two stations stations = self._get_stations(stations, agency) features = validate_attributes(dynamic_features, self.dynamic_features, 'dynamic_features') st, en = self._check_length(st, en) if len(stations) == 1: if as_dataframe: stn_df = self.fetch_stn_dynamic_features(stations[0], features, st, en) return {stations[0]: stn_df} else: return xr.Dataset({stations[0]: xr.DataArray(self.fetch_stn_dynamic_features(stations[0], features, st, en))}) if as_dataframe: # raise NotImplementedError("as_dataframe=True is not implemented yet for multiple stations") dynamic = {stn:self.fetch_stn_dynamic_features(stn, features, st, en) for stn in stations} return dynamic # todo : we should read meteo, storage and lai only when they are required! meteo_vars = self.fetch_meteo_vars(stations) storage_vars = self.fetch_storage_vars(stations) if 'lai' in features: # since lai does not have 'features' dimension, we need to add it lai = self.fetch_lai(stations).expand_dims({'features': ['lai']}) ds = xr.concat([meteo_vars, storage_vars, lai], dim='features') else: ds = xr.concat([meteo_vars, storage_vars], dim='features') ds = ds.rename({'features': 'dynamic_features'}) return ds.sel(time=slice(st, en), dynamic_features=features)
def _meteo_vars_stn(self, fpath) -> pd.DataFrame: if not os.path.exists(fpath): raise FileNotFoundError(f"{fpath} not found") df = pd.read_csv(fpath, index_col=0) df.index = pd.to_datetime(df.index) df.index.name = 'time' df.columns.name = 'features' df.rename(columns=self.dyn_map, inplace=True) for col, func in self.dyn_factors.items(): if col in df.columns: df[col] = df[col].apply(func) return df.astype(self.fp) def _storage_vars_stn(self, fpath) -> pd.DataFrame: if not os.path.exists(fpath): raise FileNotFoundError(f"{fpath} not found") df = pd.read_csv(fpath, index_col=0, dtype={'SWDE': np.float32, 'SML1': np.float32, 'SML2': np.float32, 'SML3': np.float32, 'SML4': np.float32, 'GW': np.float32, } ) df.index = pd.to_datetime(df.index) df.index.name = 'time' df.columns.name = 'features' df.rename(columns=self.dyn_map, inplace=True) for col, func in self.dyn_factors.items(): if col in df.columns: df[col] = df[col].apply(func) return df
[docs] class _GSHA(_RainfallRunoff): """ Parent class for those datasets which uses static and dynamic features from GSHA dataset . The following dataset classes are based on this class: - :py:class:`aqua_fetch.Japan` - :py:class:`aqua_fetch.Thailand` - :py:class:`aqua_fetch.Spain` """
[docs] def __init__( self, gsha_path: Union[str, os.PathLike] = None, verbosity: int = 1, **kwargs): super(_GSHA, self).__init__(verbosity=verbosity, **kwargs) if gsha_path is None: self.gsha_path = os.path.dirname(self.path) else: self.gsha_path = gsha_path self.gsha = GSHA(path=self.gsha_path, verbosity=verbosity) self._stations = self.__stations()
@property def boundary_file(self) -> os.PathLike: return self.gsha.boundary_file @property def start(self) -> pd.Timestamp: return pd.Timestamp('1979-01-01') @property def end(self) -> pd.Timestamp: return pd.Timestamp('2022-12-31') @property def dynamic_features(self) -> List[str]: return [observed_streamflow_cms()] + self.gsha.dynamic_features @property def static_features(self) -> List[str]: return self.gsha.static_features
[docs] def stations(self) -> List[str]: return self._stations
def __stations(self) -> List[str]: """ returns names of only those stations which are also documented by GSHA. """ return [stn.split('_')[0] for stn in self.gsha.agency_stations(self.agency_name)] def _fetch_dynamic_features( self, stations: list, dynamic_features='all', st=None, en=None, as_dataframe=False, ): """Fetches dynamic features of station.""" # todo : setting as_dataframe=True is too slow for 500 stations st, en = self._check_length(st, en) features = validate_attributes(dynamic_features, self.dynamic_features.copy(), 'dynamic_features') daily_q = None if observed_streamflow_cms() in features: daily_q = self.fetch_q(as_dataframe) if isinstance(daily_q, xr.Dataset): daily_q = daily_q.sel(time=slice(st, en))[stations] else: daily_q = daily_q.loc[st:en, stations] features.remove(observed_streamflow_cms()) if len(features) == 0: if isinstance(daily_q, pd.DataFrame): # as_dataframe is True so daily_q = {stn:pd.DataFrame(daily_q[stn].rename(observed_streamflow_cms())) for stn in daily_q.columns} return daily_q stations_ = [f"{stn}_{self.agency_name}" for stn in stations] data = self.gsha.fetch_dynamic_features(stations_, features, st, en, as_dataframe) if daily_q is not None: if isinstance(daily_q, xr.Dataset): assert isinstance(data, xr.Dataset), "xarray dataset not supported" data = data.rename({stn: stn.split('_')[0] for stn in data.data_vars}) # first create a new dimension in daily_q named dynamic_features daily_q = daily_q.expand_dims({'dynamic_features': [observed_streamflow_cms()]}) data = xr.concat([data, daily_q], dim='dynamic_features').sel(time=slice(st, en)) else: assert isinstance(data, dict), type(data) data = {stn.split('_')[0]: stn_data for stn, stn_data in data.items()} for stn,meteo_df in data.items(): stn_df = pd.concat([meteo_df, daily_q[stn].rename(observed_streamflow_cms())], axis=1) data[stn] = stn_df else: if isinstance(data, xr.Dataset): data = data.rename({stn: stn.split('_')[0] for stn in data.data_vars}) else: assert isinstance(data, dict) data = {stn.split('_')[0]: stn_data for stn, stn_data in data.items()} return data def _fetch_static_features( self, station="all", static_features: Union[str, list] = 'all', st=None, en=None, ) -> pd.DataFrame: """Fetches static features of station.""" if self.verbosity > 1: print('fetching static features') stations = validate_attributes(station, self.stations(), 'stations') stations_ = [f"{stn}_{self.agency_name}" for stn in stations] static_feats = self.gsha.fetch_static_features(stations_, static_features).copy() static_feats.index = [stn.split('_')[0] for stn in static_feats.index] return static_feats
[docs] def fetch_stations_features( self, stations: list, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, st=None, en=None, as_dataframe: bool = False, **kwargs ) -> Tuple[pd.DataFrame, Union[pd.DataFrame, "Dataset"]]: """ returns features of multiple stations Examples -------- >>> from aqua_fetch import Arcticnet >>> dataset = Arcticnet() >>> stations = dataset.stations() >>> features = dataset.fetch_stations_features(stations) Returns ------- tuple A tuple of static and dynamic features. Static features are always returned as :obj:`pandas.DataFrame` with shape (stations, staticfeatures). The index of static features is the station/gauge ids while the columns are the static features. Dynamic features are returned as either xarray Dataset or :obj:`pandas.DataFrame` depending upon whether `as_dataframe` is True or False and whether the xarray module is installed or not. If dynamic features are xarray Dataset, then it consists of `data_vars` equal to the number of stations and `time` adn `dynamic_features` as dimensions. If dynamic features are returned as pandas DataFrame, then the first index is `time` and the second index is `dynamic_features`. """ stations = validate_attributes(stations, self.stations(), 'stations') if xr is None: if not as_dataframe: if self.verbosity: warnings.warn("xarray module is not installed so as_dataframe will have no effect. " "Dynamic features will be returned as pandas DataFrame") as_dataframe = True static, dynamic = None, None if dynamic_features is not None: dynamic = self._fetch_dynamic_features(stations=stations, dynamic_features=dynamic_features, as_dataframe=as_dataframe, st=st, en=en, **kwargs ) if static_features is not None: # we want both static and dynamic static = self._fetch_static_features(station=stations, static_features=static_features, st=st, en=en, **kwargs ) elif static_features is not None: # we want only static static = self._fetch_static_features( station=stations, static_features=static_features, **kwargs ) else: raise ValueError(f""" static features are {static_features} and dynamic features are also {dynamic_features}""") return static, dynamic
[docs] def fetch_static_features( self, stations: Union[str, List[str]] = "all", static_features: Union[str, List[str]] = "all", st=None, en=None, ) -> pd.DataFrame: """ returns static atttributes of one or multiple stations Parameters ---------- stations : str name/id of station of which to extract the data static_features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. st : en : Examples --------- >>> from aqua_fetch import Japan >>> dataset = Japan() get the names of stations >>> stns = dataset.stations() >>> len(stns) 12004 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (12004, 27) get static data of one station only >>> static_data = dataset.fetch_static_features('01010070') >>> static_data.shape (1, 27) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['Drainage_Area_km2', 'Elevation_m']) >>> static_data.shape (12004, 2) """ return self._fetch_static_features(stations, static_features, st, en)
def streamflow_indices_stn( ds_path: Union[str, os.PathLike], stn: str ) -> pd.DataFrame: fpath = os.path.join( ds_path, "StreamflowIndices_yearly", "StreamflowIndices", f'{stn}.csv') if not os.path.exists(fpath): raise FileNotFoundError(f"{fpath} not found") df = pd.read_csv( fpath, index_col=0, dtype={'1- percentile': np.float32, '10- percentile': np.float32, '25- percentile': np.float32, 'median': np.float32, '75- percentile': np.float32, '90- percentile': np.float32, '99- percentile': np.float32, 'mean': np.float32, 'maximum (AMF)': np.float32, 'AMF occurrence date': str, 'frequency of high-flow days': np.int32, 'average duration of high-flow events': np.float32, 'frequency of low-flow days': np.int32, 'average duration of low-flow events': np.float32, 'number of days with Q=0 (days)': np.int32, 'valid observation days (days)': np.int32, } ) df.index = pd.to_datetime(df.index, format='%Y') df.index.name = 'years' df.columns.name = 'streamflow_indices' return df def lc_variable_stn(ds_path, stn: str) -> pd.DataFrame: nc_path = os.path.join(ds_path, 'lc_variables.nc') if os.path.exists(nc_path) and xr is not None: ds = xr.open_dataset(nc_path) df = ds[stn].to_pandas() ds.close() return df fpath = os.path.join( ds_path, "Landcover", "Landcover", f'{stn}.csv') if not os.path.exists(fpath): raise FileNotFoundError(f"{ds_path} for station {stn} not found") df = pd.read_csv(fpath, index_col=0, # dtype={'1- percentile': np.float32} ) df.index = pd.to_datetime(df.pop('year'), format='%Y') df.index.name = 'years' df.columns.name = 'lc_variables' return df def reservoir_vars_stn(ds_path, stn: str) -> pd.DataFrame: fpath = os.path.join( ds_path, "Reservoir", "Reservoir", f'{stn}.csv') nc_path = os.path.join(ds_path, 'reservoir_variables.nc') if os.path.exists(nc_path) and xr is not None: ds = xr.open_dataset(nc_path) df = ds[stn].to_pandas() ds.close() return df if not os.path.exists(fpath): raise FileNotFoundError(f"{ds_path} for station {stn} not found") df = pd.read_csv(fpath, index_col=0, dtype={'capacity': np.float32, 'dor': np.float32, 'year': np.int32} ) df.index = pd.to_datetime(df.index, format='%Y') df.index.name = 'years' df.columns.name = 'reservoir_variables' return df def lai_stn(ds_path, stn: str) -> pd.Series: fpath = os.path.join( ds_path, "LAI", "LAI", f'{stn}.csv') if not os.path.exists(fpath): raise FileNotFoundError(f"{ds_path} for station {stn} not found") df = pd.read_csv(fpath, index_col='date', dtype={stn: np.float32} ) df.index = pd.to_datetime(df.index) df.index.name = 'time' return df[stn] # the dates for data to be downloaded START_YEAR = 1979 END_YEAR = datetime.now().year
[docs] class Japan(_GSHA): """ Data of 694 catchments of Japan from `river.go.jp website <http://www1.river.go.jp>`_ . The meteorological data static catchment features and catchment boundaries taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore, the number of static features are 35 and dynamic features are 27 and the data is available from 1979-01-01 to 2022-12-31. """
[docs] def __init__( self, path:Union[str, os.PathLike] = None, gsha_path:Union[str, os.PathLike] = None, verbosity:int=1, **kwargs): super().__init__(path=path, verbosity=verbosity, gsha_path=gsha_path, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) self.bbox = {'llcrnrlat': 30, 'urcrnrlat': 45, 'llcrnrlon': 128, 'urcrnrlon':149} self.parallels = range(30, 45, 2) self.meridians = range(128, 149, 3)
@property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'lat': gauge_latitude(), 'long': gauge_longitude(), } @property def agency_name(self)->str: return 'MLIT' # def _maybe_move_and_merge_shpfiles(self): # out_shp_file = os.path.join(self.path, "boundaries.shp") # if not os.path.exists(out_shp_file): # df = self.gsha._coords() # jpn_stns = df.loc[df['agency'] == 'MLIT'] # shp_path = os.path.join(self.gsha.path, "WatershedPolygons", # "WatershedPolygons") # shp_files = [os.path.join(shp_path, f"{filename}.shp") for filename in jpn_stns['station_id'].values] # for f in shp_files: # assert os.path.exists(f) # merge_shapefiles(shp_files, out_shp_file, add_new_field=True) # return
[docs] def fetch_q(self, as_dataframe:bool=True)->pd.DataFrame: """reads daily streamflow for all stations and puts them in a single file named data.csv. If data.csv is already present, then it is read and its contents are returned as dataframe. """ if self.timestep.lower() in ('daily', 'd'): df = download_daily_data( self.stations(), self.path, verbosity=self.verbosity, cpus=self.processes ) else: df = self.get_hourly_data(cpus=self.processes) df.index.name = 'time' if as_dataframe: return df df = xr.Dataset({stn: xr.DataArray(df.loc[:, stn]) for stn in df.columns}) return df
def get_hourly_data(self, cpus=None): hourly_file = os.path.join(self.path, 'hourly_data.csv') if os.path.exists(hourly_file): print(f"reading hourly data from {hourly_file}") q = pd.read_csv(hourly_file, index_col=0) q.index = pd.to_datetime(q.index) return q path = os.path.join(self.path, 'hourly_files') if not os.path.exists(path): os.makedirs(path) cpus = cpus or max(get_cpus() - 2, 1) if self.verbosity>0: print(f"preparing hourly data using {cpus} cpus") stn_qs = [] for idx, stn in enumerate(self.stations()): stn_q = download_hourly_stn(path, stn=stn, cpus=cpus, en_yr=datetime.now().year+1, verbosity=self.verbosity) if self.verbosity>0: print(f"{idx} {stn}, {len(stn_q)}, {stn_q.index[0]}") stn_qs.append(stn_q) q = pd.concat(stn_qs, axis=1) q.to_csv(hourly_file, index=True, index_label='time') return q
def download_daily_stn_yr( stn:str="309191289913130", yr:int=1979, )->pd.Series: """downloads daily data for a year for a station""" url = f'http://www1.river.go.jp/cgi-bin/DspWaterData.exe?KIND=7&ID={stn}&BGNDATE={yr}0131&ENDDATE={yr}1231&KAWABOU=NO' df = pd.read_html(url, encoding='EUC-JP')[1].loc[2:, 1:].reset_index(drop=True) # make a dictionary with months as keys and number of days as values days_in_month = {1: 31, 2: 28, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31} # if it is a leap year, change the number of days in February if yr % 4 == 0: days_in_month[2] = 29 assert len(df)<13, len(df) yearly_data = [] for i in range(0, len(df)): row = df.iloc[i, 0:days_in_month[i+1]] yearly_data.append(row) stn_data = pd.concat(yearly_data).reset_index(drop=True) stn_data.index = pd.date_range(start=f'{yr}-01-01', end=f'{yr}-12-31', freq='D') stn_data.name = stn return stn_data def download_daily_data( stations:List[str], path:Union[str, os.PathLike], verbosity:int=1, cpus:int=None )->pd.DataFrame: """downloads daily data for all stations""" csv_path = os.path.join(path, 'daily_q.csv') if os.path.exists(csv_path): if verbosity: print(f"reading daily data from {csv_path}") return pd.read_csv(csv_path, index_col=0, parse_dates=True) years = range(START_YEAR, END_YEAR+1) stations_ = [[stn]*len(years) for stn in stations] # flatten the list stations_ = [item for sublist in stations_ for item in sublist] years_ = list(years) * len(stations) cpus = cpus or max(get_cpus() - 2, 1) if verbosity: print(f"downloading daily data for {len(stations)} stations from {years[0]} to {years[-1]}") if verbosity>1: print(f"Total function calls: {len(stations_)} with {cpus} cpus") # It takes ~ 100 seconds to download data for a single station for all years (1979-2024) with 1 cpu start = time.time() if cpus == 1: all_data = [] for idx, stn in enumerate(stations): stn_data = [] for yr in years: stn_yr_data = download_daily_stn_yr(stn, yr) stn_data.append(stn_yr_data) stn_data = pd.concat(stn_data, axis=0) stn_data.name = stn all_data.append(stn_data) if verbosity==1 and idx % 1000 == 0: print(f"Data for {idx+1} stations downloaded") elif verbosity==2 and idx % 500 == 0: print(f"Data for {idx+1} stations downloaded") elif verbosity==3 and idx % 100 == 0: print(f"Data for {idx+1} stations downloaded") elif verbosity==4 and idx % 10 == 0: print(f"Data for {idx+1} stations downloaded") elif verbosity>4: print(f"Data for {idx+1} stations downloaded") else: with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map(download_daily_stn_yr, stations_, years_) all_data = [] for idx, stn in enumerate(stations): stn_data = [] for yr in years: stn_yr_data = next(results) stn_data.append(stn_yr_data) if verbosity==1 and idx % 1000 == 0: print(f"{idx} stations downloaded") elif verbosity==2 and idx % 500 == 0: print(f"{idx} stations downloaded") elif verbosity>2 and idx % 100 == 0: print(f"{idx} stations downloaded") stn_data = pd.concat(stn_data, axis=0) stn_data.name = stn if verbosity>2: print(f"total number of years: {yr} for {stn} with shape {stn_data.shape}") all_data.append(stn_data) if verbosity: print(f"total time taken to download data: {time.time() - start}") if verbosity>2: print(f"total number of stations: {len(all_data)} each with shape {all_data[0].shape}") all_data = pd.concat(all_data, axis=1) all_data = all_data.replace({'−': np.nan, '欠測': np.nan}) all_data = all_data.astype(np.float32) if verbosity: print(f"saving daily data to {csv_path} with shape {all_data.shape}") all_data.to_csv(csv_path) return all_data def download_hourly_stn_day( stn:str="309191289913130", st:str="20211227", en:str="20211227" ): """download hourly data for a single day for a single station""" url = f"http://www1.river.go.jp/cgi-bin/SrchSiteSuiData2.exe?SUIKEI=90336000&BGNDATE={st}&ENDDATE={en}&ID={stn}:0202;" data = pd.read_html(url, encoding='EUC-JP')[0] df = data.iloc[7:] df.columns = ['date', 'time', stn] if len(df)>0: # make sure that we have data for all 24 hours assert len(df) == 24, len(df) else: df.pop(stn) df.insert(2, stn, [np.nan for _ in range(24)]) df.index = pd.date_range(pd.Timestamp(st), periods=24, freq="h") return df[stn] def download_hourly_stn( path:Union[str, os.PathLike], stn:str="301031281101030", st_yr:int=1980, en_yr:int=2024, cpus:int=64, verbosity:int = 1 )->pd.Series: """downlaod Japanese hourly data for a station from st_yr to en_yr""" fpath = os.path.join(path, f"{stn}.csv") if os.path.exists(fpath): if verbosity>0: print(f"{stn} already exists") return pd.read_csv(fpath, index_col=0) starts, ends = [], [] for yr in range(st_yr, en_yr): if yr % 4 == 0: n_days = 366 else: n_days = 365 for j_day in range(0, n_days): # convert jday to date date = pd.Timestamp(f"{yr}-01-01") + pd.Timedelta(days=j_day) st = date.strftime("%Y%m%d") en = date.strftime("%Y%m%d") starts.append(st) ends.append(en) stations = [stn for _ in range(len(starts))] with cf.ProcessPoolExecutor(cpus) as executor: results = executor.map(download_hourly_stn_day, stations, starts, ends) yr_df = [] for res in results: yr_df.append(res) q = pd.concat(yr_df) # replace "欠測" with np.nan q = q.replace("欠測", np.nan) q = q.replace('-', np.nan) q = q.dropna().astype(np.float32).sort_index() # drop duplicated index q = q[~q.index.duplicated(keep='first')] q.to_csv(fpath) return q
[docs] class Arcticnet(_GSHA): """ Data of 106 catchments of arctic region from `r-arcticnet project <https://www.r-arcticnet.sr.unh.edu/v4.0/AllData/index.html>`_ . The meteorological data static catchment features and catchment boundaries taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore, the number of static features are 35 and dynamic features are 27 and the data is available from 1979-01-01 to 2003-12-31 although the observed streamflow (q_cms_obs) for some stations is available as earlier as from 1913-01-01. """
[docs] def __init__( self, path:Union[str, os.PathLike] = None, gsha_path:Union[str, os.PathLike] = None, verbosity:int=1, **kwargs): super().__init__(path=path, gsha_path=gsha_path, verbosity=verbosity, **kwargs) if not os.path.exists(self.path): os.makedirs(self.path) self.metadata = self.get_metadata() self._get_q() self._stations = [stn for stn in self.all_stations() if stn in self.gsha_arctic_stns()] self._static_features = self.gsha.static_features self.bbox = {'llcrnrlat': 50.0, 'urcrnrlat': 80.0, 'llcrnrlon': 25.0, 'urcrnrlon': 130.0} self.parallels = range(50, 80, 7) self.meridians = range(25, 130, 10)
@property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'lat': gauge_latitude(), 'long': gauge_longitude(), } @property def end(self)->pd.Timestamp: return pd.Timestamp('2003-12-31')
[docs] def stations(self)->List[str]: return self._stations
def all_stations(self): return self.metadata.index.astype(str).tolist() @property def agency_name(self)->str: return 'arcticnet' def get_metadata(self): metadata_path = os.path.join(self.path, "metadata.csv") if not os.path.exists(metadata_path): df = pd.read_csv( "https://www.r-arcticnet.sr.unh.edu/v4.0/russia-arcticnet/Daily_SiteAttributes.txt", #"https://www.r-arcticnet.sr.unh.edu/v4.0/data/SiteAttributes.txt", sep="\t", encoding_errors='ignore', ) df.index = df.pop('Code') df.to_csv(metadata_path, index=True) else: df = pd.read_csv(metadata_path, index_col=0) return df def fetch_q(self, as_dataframe:bool=True): nc_path = os.path.join(self.path, "daily_q.nc") if os.path.exists(nc_path): if self.verbosity: print(f"Reading {nc_path}") q_ds = xr.open_dataset(nc_path) else: q_ds = xr.Dataset({stn:self.get_stn_q(stn) for stn in self.stations()}) # rename dimension/coordinate to 'time' in q_ds q_ds = q_ds.rename({'dim_0':'time'}) encoding = {stn: {'dtype': 'float32', 'zlib': True, 'complevel': 3} for stn in self.stations()} if self.verbosity: print(f"Writing {nc_path}") q_ds.to_netcdf(nc_path, encoding=encoding) if as_dataframe: q_ds = q_ds.to_dataframe() return q_ds def _get_q(self, as_dataframe:bool=True): q_path = os.path.join(self.path, "daily_q.csv") if not os.path.exists(q_path): df = pd.read_csv( "https://www.r-arcticnet.sr.unh.edu/v4.0/russia-arcticnet/discharge_m3_s_UNH-UCLA.txt", #"https://www.r-arcticnet.sr.unh.edu/v4.0/data/Discharge_ms.txt", sep="\t") df.to_csv(q_path) else: df = pd.read_csv(q_path, index_col=0) return df def get_stn_q(self, stn: str): q = self._get_q() stn_q = q.loc[q['Code']==int(stn), :].copy() stn_q.drop('Code', axis=1, inplace=True) # Function to check leap year and adjust February days def adjust_feb_days(year): if (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0): return 29 return 28 month_days1 = { 'Jan': 31, 'Feb': 28, # February will be adjusted dynamically 'Mar': 31, 'Apr': 30, 'May': 31, 'Jun': 30, 'Jul': 31, 'Aug': 31, 'Sep': 30, 'Oct': 31, 'Nov': 30, 'Dec': 31 } # Prepare a mapping of month names to the number of days month_days2 = { 1: 31, 2: 28, # February will be adjusted dynamically 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31 } # Melt the DataFrame to convert it from wide format to long format df_long = stn_q.melt(id_vars=['Year', 'Day'], value_vars=['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'], var_name='Month', value_name='Value') month_to_int = {month: i + 1 for i, month in enumerate(month_days1.keys())} df_long['Month'] = df_long['Month'].map(month_to_int).astype(int) def get_days_in_month(row): try: if int(row.Month) == 2: return adjust_feb_days(row.Year) return month_days2[int(row.Month)] except KeyError as e: print(f"KeyError encountered: {e} with row details: {row}") return None # Or handle the error as needed df_long['DaysInMonth'] = df_long.apply(get_days_in_month, axis=1) # Filter out invalid days (e.g., February 30) df_long = df_long[df_long['Day'] <= df_long['DaysInMonth']] # Create a datetime column from the Year, Month, Day df_long.index = pd.to_datetime(df_long[['Year', 'Month', 'Day']]) return pd.Series(df_long['Value'], name=str(stn)) def gsha_arctic_stns(self)->List[str]: df = self.gsha.wsAll.copy() return [stn.split('_')[0] for stn in df[df.index.str.endswith('_arcticnet')].index]
[docs] class Spain(_GSHA): """ Data of 889 catchments of Spain from `ceh-es <https://ceh-flumen64.cedex.es>`_ website. The meteorological data static catchment features and catchment boundaries taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore, the number of static features are 35 and dynamic features are 27 and the data is available from 1979-01-01 to 2020-09-30. """
[docs] def __init__( self, path:Union[str, os.PathLike] = None, gsha_path:Union[str, os.PathLike] = None, overwrite:bool=False, verbosity:int=1, **kwargs): super().__init__( path=path, gsha_path=gsha_path, overwrite=overwrite, verbosity=verbosity, **kwargs) self.areas = [ "CANTABRICO", "DUERO", "EBRO", "GALICIA COSTA", "GUADALQUIVIR", "GUADIANA", "JUCAR", "MIÑO-SIL", "SEGURA", "TAJO" ] self.bbox = {"llcrnrlat": 36.0, "urcrnrlat": 44.0, "llcrnrlon": -10.0, "urcrnrlon": 4.0} self.parallels = np.arange(36.0, 44.0, 2.0) self.meridians = np.arange(-10.0, 4.0, 2.0)
@property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'lat': gauge_latitude(), 'long': gauge_longitude(), } @property def end(self)->pd.Timestamp: return pd.Timestamp('2020-09-30') @property def agency_name(self)->str: return 'AFD'
[docs] def daily_q_all_areas(self)->pd.DataFrame: """Daily data of gauging stations in river from all areas Retuns ------ 16_806_305 rows x 3 """ dfs = [] for area in self.areas: df = self.daily_q_area(area) dfs.append(df) return pd.concat(dfs)
[docs] def daily_q_area(self, area:str)->pd.DataFrame: """Reads Daily data of gauging stations in river which is in afliq.csv file""" url = f"https://ceh-flumen64.cedex.es/anuarioaforos//anuario-2019-2020/{area}/afliq.csv" df = pd.read_csv(url, #os.path.join(self.path, area, "afliq.csv"), sep=';') idx = pd.to_datetime(df.pop('fecha'), dayfirst=True) df.index = idx df.index.name = "date" df.columns = ['stations', 'height_m', "q_cms"] return df
[docs] def fetch_q(self, as_dataframe:bool=True): """ returns daily q of all stations Returns ------- pd.DataFrame a :obj:`pandas.DataFrame` of shape (39721, 1447) """ fpath = os.path.join(self.path, 'daily_q.csv') if os.path.exists(fpath): data= pd.read_csv(fpath, index_col='Unnamed: 0') data.index = pd.to_datetime(data.index) data.index.name = 'time' if as_dataframe: return data return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns}) q = self.daily_q_all_areas() st = [] en = [] for g_name, grp in q.groupby('stations'): st.append(grp.sort_index().index[0]) en.append(grp.sort_index().index[-1]) start = pd.to_datetime(st).sort_values()[0] end = pd.to_datetime(en).sort_values()[-1] daily_qs = [] for stn, stn_df in q.groupby('stations'): q_ts = pd.Series(name=stn, index=pd.date_range(start, end=end, freq="d"), dtype=np.float32) q_ts[stn_df.index] = stn_df['q_cms'] daily_qs.append(q_ts) data = pd.concat(daily_qs, axis=1) data.to_csv(fpath) data.index.name = 'time' if as_dataframe: return data return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
[docs] class Thailand(_GSHA): """ Data of 73 catchments of Thailand from `RID project <https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/rid-river/disc_d.html>`_ . The meteorological data static catchment features and catchment boundaries taken from `GSHA <https://doi.org/10.5194/essd-16-1559-2024>`_ project. Therefore, the number of static features are 35 and dynamic features are 27 and the data is available from 1980-01-01 to 1999-12-31. """ url = { 'disc_d_1980_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1980_RIDall.zip', 'disc_d_1981_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1981_RIDall.zip', 'disc_d_1982_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1982_RIDall.zip', 'disc_d_1983_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1983_RIDall.zip', 'disc_d_1984_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1984_RIDall.zip', 'disc_d_1985_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1985_RIDall.zip', 'disc_d_1986_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1986_RIDall.zip', 'disc_d_1987_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1987_RIDall.zip', 'disc_d_1988_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1988_RIDall.zip', 'disc_d_1989_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1989_RIDall.zip', 'disc_d_1990_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1990_RIDall.zip', 'disc_d_1991_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1991_RIDall.zip', 'disc_d_1992_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1992_RIDall.zip', 'disc_d_1993_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1993_RIDall.zip', 'disc_d_1994_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1994_RIDall.zip', 'disc_d_1995_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1995_RIDall.zip', 'disc_d_1996_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1996_RIDall.zip', 'disc_d_1997_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1997_RIDall.zip', 'disc_d_1998_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1998_RIDall.zip', 'disc_d_1999_RIDall.zip': 'https://hydro.iis.u-tokyo.ac.jp/GAME-T/GAIN-T/routine/data/disc/disc_d_1999_RIDall.zip', }
[docs] def __init__( self, path:Union[str, os.PathLike] = None, gsha_path:Union[str, os.PathLike] = None, overwrite:bool=False, verbosity:int=1, **kwargs): super().__init__( path=path, gsha_path=gsha_path, overwrite=overwrite, verbosity=verbosity, **kwargs) self._download(overwrite=overwrite) self.bbox = {'llcrnrlat': 5.5, 'urcrnrlat': 21.5, 'llcrnrlon': 97.5, 'urcrnrlon':105.5} self.parallels = range(5, 22, 2) self.meridians = range(97, 106, 2)
@property def static_map(self) -> Dict[str, str]: return { 'area': catchment_area(), 'lat': gauge_latitude(), 'long': gauge_longitude(), } @property def agency_name(self)->str: return 'RID' @property def start(self)->pd.Timestamp: return pd.Timestamp('1980-01-01') @property def end(self)->pd.Timestamp: return pd.Timestamp('1999-12-31')
[docs] def fetch_q(self, as_dataframe:bool=True): """reads q""" fpath = os.path.join(self.path, 'daily_q.csv') if os.path.exists(fpath): if self.verbosity: print(f"Reading {fpath}") data = pd.read_csv(fpath, index_col=0, parse_dates=True) if as_dataframe: return data return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns}) datas = [] for year in range(1980, 2000): data = self._read_year(year) datas.append(data) data = pd.concat(datas) #data.columns = [column.replace('.', '_') for column in data.columns.tolist()] data.index.name = 'time' if self.verbosity: print(f"Writing {fpath}") data.to_csv(fpath) if as_dataframe: return data return xr.Dataset({stn: xr.DataArray(data.loc[:, stn]) for stn in data.columns})
def _read_year(self, year:int): year_path = os.path.join(self.path, f'disc_d_{year}_RIDall') yr_dfs = [] stn_ids = [] ndays = 365 if year%4==0: ndays = 366 for file in os.listdir(year_path): fpath = os.path.join(year_path, file) df = pd.read_csv( fpath, sep='\t', names=['index', 'q_cms'], nrows=ndays, na_values=-9999.0, ) df.index = pd.to_datetime(df.pop('index')) yr_dfs.append(df) station = file.split('RID')[1].split('_m3s')[0] stn_ids.append(station) df = pd.concat(yr_dfs, axis=1) df.columns = stn_ids return df