import os
import json
from typing import Union, List, Dict
import numpy as np
import pandas as pd
from .utils import _RainfallRunoff
from .._backend import fiona
from .._backend import xarray as xr
from ..utils import merge_shapefiles_fiona
from ..utils import validate_attributes, dateandtime_now
from ._map import (
observed_streamflow_cms,
observed_streamflow_mm,
min_air_temp,
max_air_temp,
mean_air_temp,
mean_rel_hum,
mean_daily_evaporation,
max_daily_ground_surface_temp,
min_daily_ground_surface_temp,
mean_daily_ground_surface_temp,
total_precipitation,
sunshine_duration,
max_windspeed,
min_windspeed,
mean_windspeed,
)
from ._map import (
catchment_area,
gauge_latitude,
gauge_longitude,
slope
)
[docs]
class CCAM(_RainfallRunoff):
"""
Dataset for Yellow River (China) catchments. The CCAM dataset was published by
`Hao et al., 2021 <https://doi.org/10.5194/essd-13-5591-2021>`_ and has two sets.
One set consists of catchment attributes, meteorological data, catchment boundaries
of over 4000 catchments. However this data does not have streamflow data. The second
set consists of streamflow, catchment attributes, catchment boundaries and meteorological
data for 102 catchments of Yellow River. Since this second set conforms to the norms
of CAMELS, this class uses this second set. Therefore, the ``fetch``, ``stations`` and other
methods/attributes of this class return data of only Yellow River catchments
and not for whole china. However, the first set of data is can
also be fetched using `fetch_meteo` method of this class. The temporal extent of both
sets is from 1999 to 2020. However, the streamflow time series in first set has very
large number of missing values. The data of Yellow river consists fo 16 dynamic
features (time series) and 124 static features (catchment attributes).
Examples
---------
>>> from aqua_fetch import CCAM
>>> dataset = CCAM()
... # get data by station id
>>> _, dynamic = dataset.fetch(stations='0010', as_dataframe=True)
>>> df = dynamic['0010'] # dynamic is a dictionary of with keys as station names and values as DataFrames
>>> df.shape
(8035, 16)
...
... # get name of all stations as list
>>> stns = dataset.stations()
>>> len(stns)
102
... # get data of 10 % of stations as dataframe
>>> _, dynamic = dataset.fetch(0.1, as_dataframe=True)
>>> len(dynamic) # dynamic has data for 10% of stations (10 out of 102)
10
...
... # dynamic is a dictionary whose values are dataframes of dynamic features
>>> [df.shape for df in dynamic.values()]
[(8035, 16), (8035, 16), (8035, 16),... (8035, 16), (8035, 16)]
...
... get the data of a single (randomly selected) station
>>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True)
>>> len(dynamic) # dynamic has data for 1 station
1
... # get names of available dynamic features
>>> dataset.dynamic_features
... # get only selected dynamic features
>>> _, dynamic = dataset.fetch('0010', as_dataframe=True,
... dynamic_features=['pcp_mm', 'airtemp_C_mean', 'evap_mm', 'rh_%', 'q_cms_obs'])
>>> dynamic['0010'].shape
(8035, 5)
...
... # get names of available static features
>>> dataset.static_features
... # get data of 10 random stations
>>> _, dynamic = dataset.fetch(10, as_dataframe=True)
>>> len(dynamic) # remember this is a dictionary with values as dataframe
10
...
# If we get both static and dynamic data
>>> static, dynamic = dataset.fetch(stations='0010', static_features="all", as_dataframe=True)
>>> static.shape, len(dynamic), dynamic['0010'].shape
((1, 124), 1, (8035, 8))
...
# If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset
>>> _, dynamic = dataset.fetch(10)
... type(dynamic)
xarray.core.dataset.Dataset
...
>>> dynamic.dims
FrozenMappingWarningOnValuesAccess({'time': 8035, 'dynamic_features': 16})
...
>>> len(dynamic.data_vars)
10
...
>>> coords = dataset.stn_coords() # returns coordinates of all stations
>>> coords.shape
(102, 2)
>>> dataset.stn_coords('0010') # returns coordinates of station whose id is 0010
36.059803 112.3638
>>> dataset.stn_coords(['0010', '0104']) # returns coordinates of two stations
...
# get area of a single station
>>> dataset.area('0010')
# get coordinates of two stations
>>> dataset.area(['0010', '0104'])
...
# if fiona library is installed we can get the boundary as fiona Geometry
>>> dataset.get_boundary('0010')
"""
url = "https://zenodo.org/record/5729444"
[docs]
def __init__(self,
path=None,
overwrite:bool=False,
to_netcdf:bool = True,
**kwargs):
"""
Parameters
----------
path : str
If the data is alredy downloaded then provide the complete
path to it. If None, then the data will be downloaded.
The data is downloaded once and therefore susbsequent
calls to this class will not download the data unless
``overwrite`` is set to True.
overwrite : bool
If the data is already down then you can set it to True,
to make a fresh download.
to_netcdf : bool
whether to convert all the data into one netcdf file or not.
This will fasten repeated calls to fetch etc but will
require netCDF4 package as well as xarry.
"""
super(CCAM, self).__init__(path=path, **kwargs)
self.path = path
self._download(overwrite=overwrite)
if to_netcdf:
self._maybe_to_netcdf()
self._maybe_meteo_to_nc()
shp_path = os.path.join(self.path,
"7_HydroMLYR",
"7_HydroMLYR",
"0_basin_boundary")
files = [file for file in os.listdir(shp_path) if file.endswith('.shp')]
shp_files = [os.path.join(shp_path, shp_file) for shp_file in files]
boundaries = os.path.join(shp_path, "boundaries")
if fiona is not None:
merge_shapefiles_fiona(shp_files, boundaries)
self.bbox = {'llcrnrlat': 20.0, 'urcrnrlat': 50.0, 'llcrnrlon': 70.0, 'urcrnrlon': 140.0}
self.parallels = range(20, 50, 5)
self.meridians = range(70, 140, 10)
@property
def boundary_file(self) -> os.PathLike:
return os.path.join(
self.path,
"7_HydroMLYR",
"7_HydroMLYR",
"0_basin_boundary",
"boundaries",
"boundaries.shp")
@property
def static_map(self) -> Dict[str, str]:
return {
'area': catchment_area(),
'slope': slope('mkm-1'),
'lat': gauge_latitude(),
'lon': gauge_longitude(),
}
@property
def dyn_map(self):
return {
'q': observed_streamflow_cms(), # todo: check the units
'tem_min': min_air_temp(),
'tem_max': max_air_temp(),
'tem_mean': mean_air_temp(),
'rhu': mean_rel_hum(),
'evp': mean_daily_evaporation(),
'gst_max': max_daily_ground_surface_temp(),
'gst_min': min_daily_ground_surface_temp(),
'gst_mean': mean_daily_ground_surface_temp(),
'pre': total_precipitation(),
'ssd': sunshine_duration(),
'win_max': max_windspeed(),
'win_mean': mean_windspeed(),
}
@property
def meteo_path(self):
"""path where daily meteorological data of stations is present"""
return os.path.join(self.path, "1_meteorological", '1_meteorological')
@property
def meteo_nc_path(self):
return os.path.join(self.path, "meteo_data.nc")
@property
def meteo_stations(self)->List[str]:
stations = [fpath.split('.')[0] for fpath in os.listdir(self.meteo_path)]
stations.remove('35616')
return stations
@property
def yr_data_path(self):
return os.path.join(self.path, "7_HydroMLYR", "7_HydroMLYR", '1_data')
[docs]
def stations(self):
"""Returns station ids for catchments on Yellow River"""
return os.listdir(self.yr_data_path)
@property
def dynamic_features(self)->List[str]:
"""names of hydro-meteorological time series data for Yellow River catchments"""
return [self.dyn_map.get(feat, feat) for feat in ['pre', 'evp', 'gst_mean', 'prs_mean', 'tem_mean', 'rhu', 'win_mean',
'gst_min', 'prs_min', 'tem_min', 'gst_max', 'prs_max', 'tem_max', 'ssd',
'win_max', 'q']]
@property
def static_features(self)->List[str]:
"""names of static features for Yellow River catchments"""
attr_fpath = os.path.join(self.yr_data_path, self.stations()[0], 'attributes.json')
with open(attr_fpath, 'r') as fp:
data = json.load(fp)
return [self.static_map.get(feat, feat) for feat in data.keys()]
@property
def start(self): # start of data
return pd.Timestamp('1999-01-02 00:00:00')
@property
def end(self): # end of data
return pd.Timestamp('2020-12-31 00:00:00')
def _read_meteo_from_csv(
self,
station:str
)->pd.DataFrame:
"""returns daily meteorological data of one station as DataFrame after reading it
from csv file. This data is from 1990-01-01 to 2021-03-31. The returned
dataframe has following columns
- 'PRE'
- 'TEM': temperature
- 'PRS': pressure
- 'RHU',
- 'EVP',
- 'WIN',
- 'SSD': sunshine duration
- 'GST': ground surface temperature
- 'PET'
"""
fpath = os.path.join(self.meteo_path, f"{station}.txt")
df = pd.read_csv(fpath)
df.index = pd.to_datetime(df.pop("Date"))
if 'PET' not in df:
df['PET'] = None
# following two stations have multiple enteries
if station in ['17456', '18161']:
df = drop_duplicate_indices(df)
return df
def _maybe_meteo_to_nc(self):
if os.path.exists(self.meteo_nc_path):
if self.verbosity>1:
print(f"meteo data already converted to netcdf file at {self.meteo_nc_path}")
return
if self.verbosity:
print(f"converting meteo data to netcdf file")
stations = os.listdir(self.meteo_path)
dyn = {}
for idx, stn in enumerate(stations):
if stn not in ['35616.txt']:
stn_id = stn.split('.')[0]
dyn[stn_id] = self._read_meteo_from_csv(stn_id).astype(np.float32)
if self.verbosity and idx % 500 == 0:
print(f"{idx}/{len(stations)} stations processed")
elif self.verbosity>1 and idx % 200 == 0:
print(f"{idx}/{len(stations)} stations processed")
data_vars = {}
coords = {}
for k, v in dyn.items():
data_vars[k] = (['time', 'dynamic_features'], v)
index = v.index
index.name = 'time'
coords = {
'dynamic_features': list(v.columns),
'time': index
}
if self.verbosity>1:
print(f"Creating xarray dataset")
xds = xr.Dataset(
data_vars=data_vars,
coords=coords,
attrs={'date': f"create on {dateandtime_now()}"}
)
if self.verbosity>1:
print(f"creating netcdf file at {self.meteo_nc_path}")
xds.to_netcdf(self.meteo_nc_path)
return
[docs]
def fetch_meteo(
self,
station:Union[str, List[str]] = "all",
features:Union[str, List[str]] = "all",
st = '1990-01-01',
en = '2021-03-31',
as_dataframe:bool = True
):
"""
fetches meteorological data of 4902 chinese catchments
Examples
---------
>>> from aqua_fetch import CCAM
>>> dataset = CCAM()
>>> dynamic_features = ['PRE', 'TEM', 'PRS', 'RHU', 'EVP', 'WIN', 'PET']
>>> st = '1999-01-01'
>>> en = '2020-03-31'
>>> xds = dataset.fetch_meteo(features=features, st=st, en=en)
"""
def_features = ['PRE', 'TEM', 'PRS', 'RHU', 'EVP', 'WIN', 'SSD', 'GST', 'PET']
features = validate_attributes(features, def_features)
stations = validate_attributes(station, self.meteo_stations)
if xr is None:
raise ModuleNotFoundError(f"xarray must be installed")
else:
dyn = xr.open_dataset(self.meteo_nc_path)
dyn = dyn[stations].sel(dynamic_features=features, time=slice(st, en))
if as_dataframe:
dyn = dyn.to_dataframe(['time', 'dynamic_features'])
return dyn
def _read_yr_dynamic_from_csv(
self,
station:str
)->pd.DataFrame:
"""
Reads daily dynamic (meteorological + streamflow) data for one catchment of
yellow river and returns as DataFrame
"""
meteo_fpath = os.path.join(self.yr_data_path, station, 'meteorological.txt')
q_fpath = os.path.join(self.yr_data_path, station, 'streamflow_raw.txt')
meteo = pd.read_csv(meteo_fpath)
meteo.index = pd.to_datetime(meteo.pop('date'))
q = pd.read_csv(q_fpath)
q.index = pd.to_datetime(q.pop('date'))
df = pd.concat([meteo, q], axis=1).astype(np.float32)
df.rename(columns=self.dyn_map, inplace=True)
df.index.name = 'time'
df.columns.name = 'dynamic_features'
return df
def _read_dynamic(
self,
stations,
dynamic_features,
st=None,
en=None)->dict:
"""reads dynamic data of one or more catchments located along Yellow River basin
"""
attributes = validate_attributes(dynamic_features, self.dynamic_features)
dyn = {stn: self._read_yr_dynamic_from_csv(stn).loc["19990101": "20201231", attributes] for stn in stations}
# making sure that data for all stations has same dimensions by inserting nans
# and removign duplicates
dyn = {stn:drop_duplicate_indices(data) for stn, data in dyn.items()}
dummy = pd.DataFrame(index=pd.date_range("19990101", "20201231", freq="D"))
dummy.index.name = 'time'
dummy.columns.name = 'dynamic_features'
dyn = {stn: pd.concat([v, dummy], axis=1) for stn, v in dyn.items()}
return dyn
def _static_data(self)->pd.DataFrame:
ds = []
for stn in self.stations():
d = self._read_yr_static(stn)
ds.append(d)
df = pd.concat(ds, axis=1).transpose()
df.rename(columns=self.static_map, inplace=True)
return df
def _read_yr_static(
self,
station:str
)->pd.Series:
"""
Reads catchment attributes data for Yellow River catchments
"""
fpath = os.path.join(self.yr_data_path, station, 'attributes.json')
with open(fpath, 'r') as fp:
data = json.load(fp)
return pd.Series(data, name=station)
def drop_duplicate_indices(df):
return df[~df.index.duplicated(keep='first')]