Source code for aqua_fetch.wq._oligotrend


__all__ = ["Oligotrend"]

import os
from typing import Union, List, Dict

import pandas as pd
import numpy as np

from .._datasets import Datasets
from ..utils import validate_attributes


[docs] class Oligotrend(Datasets): """ A global database of multi-decadal (1986-2023) timeseries of chlorophyll-a and 16 others including N and P, from 1846 unique monitoring locations across estuaries (n=238), lakes (n=687), and rivers (969). The datasets consists of 4.3 million observations and most timeseries cover the period 1986-2022 and comprise at least 15 years of Chl-a observations. For more details, see `Minaudo et al., 2025 <https://doi.org/10.5194/essd-17-3411-2025>_`. The data is fetched from `EDI data portal <https://portal.edirepository.org/nis/mapbrowse?packageid=edi.1778.3>`_. Examples -------- >>> from aqua_fetch import Oligotrend >>> ds = Oligotrend(path='/path/to/data') get names of parameters in the dataset >>> ds.parameters() >>> len(ds.parameters()) 17 get list of stations in the dataset >>> ds.stations() >>> len(ds.stations()) 1846 >>> len(ds.lakes()) 685 >>> len(ds.rivers()) 924 >>> len(ds.estuaries()) 237 get parameters of a single station >>> data = ds.fetch_stn_parameters('lake_atlanticoceanseaboard_usa12721') >>> data.shape (303, 3) get all parameters for specific stations >>> data = ds.fetch_stns_parameters(['river_ebro_9027', 'river_elbe_elbe_10']) >>> data['river_ebro_9027'].shape (287, 8) >>> data['river_elbe_elbe_10'].shape (8154, 12) Get only 'chla' parameter for the stations >>> data1 = ds.fetch_stns_parameters(['river_ebro_9027', 'river_elbe_elbe_10'], ... parameters=['chla']) >>> data1['river_ebro_9027'].shape (177, 1) >>> data1['river_elbe_elbe_10'].shape (413, 1) """ # todo : why stations are 1846 and not 1894 as mentioned in the paper? url = { 'data_sources_oligotrend.csv': "https://pasta.lternet.edu/package/data/eml/edi/1778/3/c828f5056b9c46b8e120bc2c9406de05", 'oligotrend_L1.csv': "https://pasta.lternet.edu/package/data/eml/edi/1778/3/cc48f89ff50a51a6e9dbf9e35fc16c20", "oligotrend_L1_xy_gis.csv": "https://pasta.lternet.edu/package/data/eml/edi/1778/3/4fb9081418e2f8112f094bb284197dde", "oligotrend_L2_trends.csv": "https://pasta.lternet.edu/package/data/eml/edi/1778/3/00446b372fabf8e97bb23c2d47482a0d", }
[docs] def __init__(self, path=None, **kwargs): super().__init__(path=path, **kwargs) self.ds_dir = path self._download() self._stations = self.gis_data().index.tolist() self._parameters = self.l1_data()['variable'].unique().tolist()
[docs] def stn_coords(self)->pd.DataFrame: """ Returns the coordinates of all the stations in the dataset in wgs84 projection. Returns ------- pd.DataFrame A dataframe with columns 'lat', 'long' """ coords = self.gis_data()[['Y', 'X']] coords.columns = ['lat', 'long'] return coords
[docs] def parameters(self)->List[str]: """ Returns the list of names of parameters in the dataset. """ return self._parameters
[docs] def stations(self)->List[str]: """ returns the list of stations in the dataset """ return self._stations
[docs] def lakes(self)->List[str]: """ Returns the list of stations which are lakes in the dataset. """ df = self.gis_data() return df.loc[df['ecsystm'] == 'lake', :].index.tolist()
[docs] def rivers(self)->List[str]: """ Returns the list of stations which are rivers in the dataset. """ df = self.gis_data() return df.loc[df['ecsystm'] == 'river', :].index.tolist()
[docs] def estuaries(self)->List[str]: """ Returns the list of stations which are estuaries in the dataset. """ df = self.gis_data() return df.loc[df['ecsystm'] == 'estuary', :].index.tolist()
[docs] def sources(self): """ Returns the sources of the dataset. """ df = pd.read_csv(os.path.join(self.path, 'data_sources_oligotrend.csv')) return df
[docs] def gis_data(self)->pd.DataFrame: """ Returns the GIS data of the dataset. """ df = pd.read_csv(os.path.join(self.path, 'oligotrend_L1_xy_gis.csv'), index_col='uniqID') return df
[docs] def l1_data(self)->pd.DataFrame: """ Returns the oligotrend_L1.csv file and returns as dataframe of shape 5056630, 7. """ df = pd.read_csv(os.path.join(self.path, 'oligotrend_L1.csv'), dtype={ 'basin': str, 'ecosystm': str, 'id': str, 'variable': str, 'value': np.float32, 'flat': int, }, parse_dates=['date'], date_format=lambda x: pd.to_datetime(x, format='%Y-%m-%d') ) # the vlaues in id column have .csv at the end, we remove it df['id'] = df['id'].str.replace('.csv', '', regex=False) return df
[docs] def fetch_stns_parameters( self, stns: Union[str, List[str]], parameters: Union[str, List[str]] = "all", )-> Dict[str, pd.DataFrame]: """ Fetches the parameters for the given stations. Parameters ---------- stns : str or list of str The station(s) to fetch the parameters for. parameters : str or list of str, optional The parameter(s) to fetch. If 'all', all parameters are fetched. Returns ------- Dict[str, pd.DataFrame] A dictionary with the station id as key and a dataframe of parameters as value. Examples -------- >>> data = ds.fetch_stns_parameters(['river_ebro_9027', 'river_elbe_elbe_10']) >>> data['river_ebro_9027'].shape (287, 8) >>> data['river_elbe_elbe_10'].shape (8154, 12) >>> data = ds.fetch_stns_parameters(['river_ebro_9027', 'river_elbe_elbe_10'], 'chla') >>> data['river_ebro_9027'].shape (177, 1) >>> data['river_elbe_elbe_10'].shape (413, 1) """ df = self.l1_data() stns = validate_attributes(stns, self.stations(), 'stns') parameters = validate_attributes(parameters, self.parameters(), 'parameters') cond1 = df['id'].isin(stns) cond2 = df['variable'].isin(parameters) df = df.loc[cond1 & cond2, :] # make a dictionary with the station id as key and a dataframe of parameters as value results = {} for stn in stns: stn_df = df.loc[df['id'] == stn, ['variable', 'date', 'value']] # for some date, variable paris there are more than 1 value which means there are duplicates # for example for stn river_ebro_9027 for chla on date 1991-07-01, there are 2 values # so we have use aggfunc, ideally we should not have duplicates stn_df = stn_df.pivot_table(index='date', columns='variable', values='value', aggfunc='mean') results[stn] = stn_df return results
[docs] def fetch_stn_parameters( self, stn:str, parameters: Union[str, List[str]] = "all", ): """ Examples -------- >>> stn_df = ds.fetch_stn_parameters('lake_atlanticoceanseaboard_usa12721') >>> stn_df.shape (303, 3) """ assert isinstance(stn, str), "stn should be a string" df = self.l1_data() stn = validate_attributes(stn, self.stations(), 'stn') parameters = validate_attributes(parameters, self.parameters(), 'parameters') cond1 = df['id'].isin(stn) cond2 = df['variable'].isin(parameters) df = df.loc[cond1 & cond2, :] df = df[['variable', 'date', 'value']].pivot(index='date', columns='variable', values='value') return df
[docs] def get_stations( self, parameter:str, ecosystm: str = 'river', ) -> pd.Series: """ Returns a list of stations that have the specified parameter. Examples -------- >>>> chla_stns = ds.get_stations('chla') >>>> len(chla_stns) 969 """ df = self.l1_data() cond1 = df['variable'] == parameter cond2 = df['ecosystem'] == ecosystm return df.loc[cond1 & cond2, 'id'].unique().tolist()
def num_obs(self, parameter:str): assert isinstance(parameter, str), "parameter should be a string" assert parameter in self.parameters(), f"parameter {parameter} not found in {self.parameters()}" l1_data = self.l1_data() obs_counts = l1_data.groupby(['id', 'variable']).size().reset_index(name='count') para_counts = obs_counts.loc[obs_counts['variable']==parameter] # use the 'id' column as index para_counts = para_counts.set_index('id') para_counts.pop('variable') return para_counts.loc[self.stations(), 'count']