Source code for aqua_fetch.rr._usgs


import os
import re
import time
import warnings
import requests
from io import StringIO
from datetime import datetime
from typing import List, Union, Dict, Tuple
from requests.exceptions import JSONDecodeError
from requests.exceptions import ContentDecodingError
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import pandas as pd

from .utils import _RainfallRunoff
from .._backend import netCDF4, xarray as xr
from ..utils import get_cpus
from ..utils import validate_attributes
from ._hysets import HYSETS
from ._utils import tw_resampler

from ._map import (
    observed_streamflow_cms,
)

from ._map import (
    catchment_area,
    gauge_latitude,
    gauge_longitude,
    slope
    )

DAILY_START = "1820-01-01"
DAILY_END = "2024-12-31"
HOURLY_START = "1910-01-01"
HOURLY_END = "2024-12-31"


[docs] class USGS(_RainfallRunoff): """ This class handles the hydrometeorological data for the USA. The daily and hourly discharge data is downloaded from `usgs/nwis website <https://waterservices.usgs.gov/nwis/dv>`_ . The data is optionally stored in a netCDF file if xarray is available. Currently the data is downloaded for only those sites/catchments that are in the `HYSETS database <https://doi.org/10.1038/s41597-020-00583-2>`_. This is because the catchment boundaries are taken from HYSETS database using :py:class:`aqua_fetch.HYSETS`. For hourly timestep, "iv" service is used to download the instantaneous data which is then resampled to hourly data. Data with only ``A, [92]``, ``A, [91]``, ``A, [93]``, ``A, e``, ``A`` flags is used. For daily streamflow, "dv" service is used to download the data. In this case, the data with only ``A`` and ``A, e`` flags is used. Examples -------- >>> from aqua_fetch import USGS >>> dataset = USGS() ... # get data by station id >>> _, dynamic = dataset.fetch(stations='01010000', as_dataframe=True) >>> df = dynamic['01010000'] # dynamic is a dictionary of with keys as station names and values as DataFrames >>> df.shape (27028, 20) ... ... # get name of all stations as list >>> stns = dataset.stations() >>> len(stns) 12004 ... # get data of 10 % of stations as dataframe >>> _, dynamic = dataset.fetch(0.1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 10% of stations (1200 out of 12004) 1200 ... ... # dynamic is a dictionary whose values are dataframes of dynamic features >>> [df.shape for df in dynamic.values()] [(27028, 20), (27028, 20), (27028, 20),... (27028, 20), (27028, 20)] ... ... get the data of a single (randomly selected) station >>> _, dynamic = dataset.fetch(stations=1, as_dataframe=True) >>> len(dynamic) # dynamic has data for 1 station 1 ... # get names of available dynamic features >>> dataset.dynamic_features ... # get only selected dynamic features >>> _, dynamic = dataset.fetch('01010000', as_dataframe=True, ... dynamic_features=['pcp_mm', 'snowmelt_mm', 'airtemp_C_2m_min', 'swe_mm', 'q_cms_obs']) >>> dynamic['01010000'].shape (27028, 4) ... ... # get names of available static features >>> dataset.static_features ... # get data of 10 random stations >>> _, dynamic = dataset.fetch(10, as_dataframe=True) >>> len(dynamic) # remember this is a dictionary with values as dataframe 10 ... # If we get both static and dynamic data >>> static, dynamic = dataset.fetch(stations='01010000', static_features="all", as_dataframe=True) >>> static.shape, len(dynamic), dynamic['01010000'].shape ((1, 29), 1, (27028, 20)) ... # If we don't set as_dataframe=True and have xarray installed then the returned data will be a xarray Dataset >>> _, dynamic = dataset.fetch(10) ... type(dynamic) xarray.core.dataset.Dataset ... >>> dynamic.dims FrozenMappingWarningOnValuesAccess({'time': 27028, 'dynamic_features': 20}) ... >>> len(dynamic.data_vars) 10 ... >>> coords = dataset.stn_coords() # returns coordinates of all stations >>> coords.shape (671, 2) >>> dataset.stn_coords('01010000') # returns coordinates of station whose id is 01010000 -69.715556 46.700556 >>> dataset.stn_coords(['01010000', '01010070']) # returns coordinates of two stations ... # get area of a single station >>> dataset.area('01010000') # get coordinates of two stations >>> dataset.area(['01010000', '01010070']) ... # if fiona library is installed we can get the boundary as fiona Geometry >>> dataset.get_boundary('01010000') """
[docs] def __init__( self, path:Union[str, os.PathLike] = None, hysets_path: Union[str, os.PathLike] = None, verbosity:int = 1, **kwargs ): """ Parameters ---------- path : str Path to store the data """ super().__init__(path, verbosity=verbosity, **kwargs) if hysets_path is None: hysets_path = os.path.dirname(self.path) # if os.path.exists(hysets_path): # self.hysets_path = hysets_path if self.verbosity> 1: print(f"hysets_path is {hysets_path}") self.hysets = HYSETS(path = hysets_path, verbosity=verbosity-1) self.hysets_path = self.hysets.path self._stations = self.__stations() self.metadata = maybe_make_and_get_metadata(self.path, self.stations()) self._static_features = self.__static_features() self.bbox = {"llcrnrlat": 22, "urcrnrlat": 75, "llcrnrlon": -168.0, "urcrnrlon": -65.0} self.parallels = np.arange(22, 75, 7) self.meridians = np.arange(-168, -65, 12)
@property def boundary_file(self) -> os.PathLike: return self.hysets.boundary_file @property def static_map(self) -> Dict[str, str]: return { 'Drainage_Area_km2': catchment_area(), 'Centroid_Lat_deg_N': gauge_latitude(), 'Slope_deg': slope('degrees'), 'Centroid_Lon_deg_E': gauge_longitude(), } @property def start(self)->str: return "19500101" @property def end(self)->str: return "20231231" @property def dynamic_features(self)->List[str]: return self.hysets.dynamic_features
[docs] def stations(self)->List[str]: return self._stations
@property def static_features(self)->List[str]: return self._static_features @property def _q_name(self)->str: return observed_streamflow_cms()
[docs] def area( self, stations: Union[str, List[str]] = 'all', ) ->pd.Series: """ Returns area_gov (Km2) of all catchments as :obj:`pandas.Series` parameters ---------- stations : str/list name/names of stations. Default is None, which will return area of all stations Returns -------- pd.Series a :obj:`pandas.Series` whose indices are catchment ids and values are areas of corresponding catchments. Examples --------- >>> from aqua_fetch import USGS >>> dataset = USGS() >>> dataset.area() # returns area of all stations >>> dataset.area('912101A') # returns area of station whose id is 912101A >>> dataset.area(['912101A', '12388200']) # returns area of two stations """ stations = validate_attributes(stations, self.stations(), 'stations') area = self.metadata['drain_area_va'] area.name = 'area_km2' return area.loc[stations]
[docs] def fetch_static_features( self, stations: Union[str, List[str]] = "all", static_features:Union[str, List[str]] = "all" ) -> pd.DataFrame: """ returns static atttributes of one or multiple stations Parameters ---------- stations : str name/id of station of which to extract the data static_features : list/str, optional (default="all") The name/names of features to fetch. By default, all available static features are returned. Examples --------- >>> from aqua_fetch import USGS >>> dataset = USGS() get the names of stations >>> stns = dataset.stations() >>> len(stns) 12004 get all static data of all stations >>> static_data = dataset.fetch_static_features(stns) >>> static_data.shape (12004, 27) get static data of one station only >>> static_data = dataset.fetch_static_features('01010070') >>> static_data.shape (1, 27) get the names of static features >>> dataset.static_features get only selected features of all stations >>> static_data = dataset.fetch_static_features(stns, ['area_km2', 'Elevation_m']) >>> static_data.shape (12004, 2) """ if isinstance(static_features, str) and static_features != 'all': # we want Official_ID because that will be used as index later on static_features = [static_features, 'Official_ID'] stations = validate_attributes(stations, self.stations(), 'stations') map_ = self.hysets.OfficialID_WatershedID_map stations = [map_[stn] for stn in stations] static_feats = self.hysets.fetch_static_features(stations, static_features) static_feats.set_index('Official_ID', inplace=True) return static_feats
[docs] def stn_coords( self, stations:Union[str, List[str]] = 'all' ) ->pd.DataFrame: """ returns coordinates of stations as DataFrame with ``long`` and ``lat`` as columns. Parameters ---------- stations : name/names of stations. If not given, coordinates of all stations will be returned. Returns ------- pd.DataFrame :obj:`pandas.DataFrame` with ``long`` and ``lat`` columns. The length of dataframe will be equal to number of stations wholse coordinates are to be fetched. Examples -------- >>> dataset = USGS() >>> dataset.stn_coords() # returns coordinates of all stations >>> dataset.stn_coords('01010000') # returns coordinates of station whose id is 912101A >>> dataset.stn_coords(['01010000', '01010070']) # returns coordinates of two stations """ stations = validate_attributes(stations, self.stations(), 'stations') coords = self.metadata.loc[:, ['dec_long_va', 'dec_lat_va']] coords.rename(columns={'dec_long_va': 'long', 'dec_lat_va': 'lat'}, inplace=True) return coords.loc[stations, :]
[docs] def fetch_stations_features( self, stations: list, dynamic_features: Union[str, list, None] = 'all', static_features: Union[str, list, None] = None, st=None, en=None, as_dataframe: bool = False, **kwargs ) -> Tuple[pd.DataFrame, Union[Dict[str, pd.DataFrame], "Dataset"]]: """ returns features of multiple stations Examples -------- >>> from aqua_fetch import USGS >>> dataset = USGS() >>> stations = dataset.stations()[0:3] >>> features = dataset.fetch_stations_features(stations) """ stations = validate_attributes(stations, self.stations(), 'stations') static, dynamic = None, None if xr is None: if not as_dataframe: if self.verbosity: warnings.warn("xarray module is not installed so as_dataframe will have no effect. " "Dynamic features will be returned as pandas DataFrame") as_dataframe = True # todo : reading data for 500 stations is taking ~ 10 minutes which is not sustainable if dynamic_features is not None: dynamic = self._fetch_dynamic_features(stations=stations, dynamic_features=dynamic_features, as_dataframe=as_dataframe, st=st, en=en, **kwargs ) if static_features is not None: # we want both static and dynamic static = self._fetch_static_features(station=stations, static_features=static_features ) elif static_features is not None: # we want only static static = self._fetch_static_features( station=stations, static_features=static_features ) else: raise ValueError(f"static features are {static_features} and dynamic features are {dynamic_features}") return static, dynamic
def _fetch_dynamic_features( self, stations: list, dynamic_features = 'all', st=None, en=None, as_dataframe=False ): """Fetches dynamic features of station.""" st, en = self._check_length(st, en) features = validate_attributes(dynamic_features, self.dynamic_features.copy(), 'dynamic_features') if self.verbosity>2: print(f"fetching {len(features)} dynamic features for {len(stations)} stations from {st} to {en}") daily_q = None if observed_streamflow_cms() in features: # todo: we are reading all file even for one station daily_q = self._q(as_dataframe) if isinstance(daily_q, xr.Dataset): daily_q = daily_q.sel(time=slice(st, en))[stations] else: daily_q = daily_q.loc[st:en, stations] features.remove(observed_streamflow_cms()) if len(features) == 0: if isinstance(daily_q, pd.DataFrame): daily_q = {stn:pd.DataFrame(daily_q[stn].rename(observed_streamflow_cms())) for stn in daily_q.columns} return daily_q karte = self.hysets.OfficialID_WatershedID_map stations_ = [int(karte[stn]) for stn in stations] data = self.hysets._fetch_dynamic_features(stations_, features, st, en, as_dataframe) if daily_q is not None: if isinstance(daily_q, xr.Dataset): assert isinstance(data, xr.Dataset), "xarray dataset not supported" # todo : why shoule we not subtract 1 karte = {str(int(k)):v for k,v in self.hysets.WatershedID_OfficialID_map.items() if v in stations} data = data.rename(karte) # first create a new dimension in daily_q named dynamic_features daily_q = daily_q.expand_dims({'dynamic_features': [observed_streamflow_cms()]}) data = xr.concat([data, daily_q], dim='dynamic_features') else: # todo : -1 ? #data.rename(columns={str(int(k)):v for k,v in self.hysets.WatershedID_OfficialID_map.items()}, inplace=True) assert isinstance(data, dict) # rename the keys in data using self.hysets.WatershedID_OfficialID_map data = {self.hysets.WatershedID_OfficialID_map.get(k, k): v for k, v in data.items()} for k,meteo_df in data.items(): stn_df = pd.concat([meteo_df, daily_q[k].rename(observed_streamflow_cms())], axis=1) data[k] = stn_df else: if isinstance(data, xr.Dataset): karte = {str(int(k)):v for k,v in self.hysets.WatershedID_OfficialID_map.items() if v in stations} data = data.rename(karte) else: assert isinstance(data, dict) # rename the keys in data using self.hysets.WatershedID_OfficialID_map data = {self.hysets.WatershedID_OfficialID_map.get(k, k): v for k, v in data.items()} return data def _fetch_static_features( self, station="all", static_features: Union[str, list] = 'all' )->pd.DataFrame: """Fetches static features of station.""" if self.verbosity>1: print('fetching static features') stations = validate_attributes(station, self.stations(), 'stations') map_ = self.hysets.OfficialID_WatershedID_map stations = [map_[stn] for stn in stations] static_feats = self.hysets.fetch_static_features(stations, static_features).copy() static_feats = static_feats.set_index('Official_ID') return static_feats def __static_features(self)->List[str]: if self.verbosity>1: print('getting static features from hysets') static_feats = self.hysets.static_features static_feats.remove('Official_ID') return static_feats def _q(self, as_dataframe:bool=None, read_csv_kwargs:dict=None): if netCDF4 is None: ext = 'csv' else: ext = 'nc' if self.timestep.lower().startswith('d'): fname = f'daily_q.{ext}' else: fname = f'hourly_q.{ext}' if self.verbosity>1: print(f'reading {fname}') fpath = os.path.join(self.path, fname) if not os.path.exists(fpath): print(f"{fpath} not found. Downloading data storing it in {fpath}") self._make_csv(cpus=None) if ext == 'csv': return pd.read_csv(fpath, index_col=0, **read_csv_kwargs) else: ds = xr.open_dataset(fpath) if as_dataframe: return ds['q_cms_obs'].to_pandas() ds = ds['q_cms_obs'].assign_coords(stations=ds['stations']).to_dataset(dim='stations') return ds def __stations(self)->List[str]: if self.verbosity>1: print('getting stations') dataset = self._q(read_csv_kwargs=dict(nrows=2)) if xr is not None: if isinstance(dataset, xr.Dataset): # get names of all variables return list(dataset.data_vars.keys()) #return [stn for stn in dataset.stations.data] else: return dataset.columns.tolist() return dataset.columns.tolist() def _make_csv( self, cpus:int = None, ): df = pd.read_csv( os.path.join(self.hysets_path, "HYSETS_watershed_properties.txt"), sep=",") sites = df.loc[df['Source']=='USGS']['Official_ID'] if cpus is None: cpus = max(get_cpus() - 2, 1) if not os.path.exists(self.path): os.makedirs(self.path) make_daily_q(self.path, sites, cpus=cpus) if self.verbosity: print(f"Downloaded daily data and stored at {self.path}/daily_q.nc") #make_hourly_q(self.path, sites[9000:10000], cpus=cpus) #print(f"Downloaded hourly data and stored in {self.path}/hourly_q.nc") return
def download_metadata( site:str )->pd.DataFrame: try: metadata = _download_metadata(site) except (ValueError, IndexError): print(f"Site: {site} ValueError/IndexError") # create dataframe with nan values metadata = pd.DataFrame( np.array([np.nan, np.nan, np.nan]).reshape(1,3), columns=['dec_lat_va', 'dec_long_va', 'drain_area_va'] ) metadata = metadata[['dec_lat_va', 'dec_long_va', 'drain_area_va']] metadata.index = [site] return metadata def maybe_make_and_get_metadata( path:str, sites:List[str], cpus:int=None )->pd.DataFrame: cpus = max(get_cpus()-2, 1) if cpus is None else cpus fpath = os.path.join(path, 'metadata.csv') if os.path.exists(fpath): return pd.read_csv( fpath, index_col='site_no', dtype={'site_no': str, "dec_lat_va": float, "dec_long_va": float, "drain_area_va": float}) print(f"Downloading metadata for {len(sites)} sites using {cpus} cpus") start = time.time() with ProcessPoolExecutor(max_workers=cpus) as executor: data = executor.map(download_metadata, sites) total = round((time.time() - start)/60, 2) print(f"Time taken to download metadata: {total} mins with {cpus} cpus") start = time.time() metadata = pd.concat(list(data)) # convert drain_area_va from sq miles to sq km metadata['drain_area_va'] = metadata['drain_area_va'] * 2.58999 metadata.index.name = "site_no" metadata.to_csv(fpath, index_label='site_no') return metadata def make_daily_q( path:str, sites:List[str], start = "1900-01-01", end = None, cpus:int=None, out_fname:str = "daily_q", # dron't drop stations with all nans by default # because self.stations() or self.stn_coords() may fail drop_all_nan_stns:bool=False, save_stns_as_data_vars:bool=False, verbosity:int=1 ): if netCDF4 is None: out_fname += '.csv' else: out_fname += '.nc' assert os.path.exists(path), f"{path} does not exist" if cpus is None: cpus = max(get_cpus()-2, 1) if end is None: end = datetime.today().strftime("%Y-%m-%d") out_fpath = os.path.join(path, out_fname) if verbosity: print(f"Downloading daily data for {len(sites)} sites using {cpus} cpus") if verbosity: print(f"Final file will be stored in {out_fpath}") start_c = time.time() with ProcessPoolExecutor(max_workers=cpus) as executor: data = executor.map(download_daily_record, sites, [start]*len(sites), [end]*len(sites)) data = pd.concat(list(data), axis=1) data.index = pd.to_datetime(data.index) data.index.name = 'time' total = round((time.time() - start_c)/60, 2) if verbosity: print(f"Time taken to download data: {total} mins with {cpus} cpus") # find out columns with all nans all_nan_cols = data.columns[data.isna().all()].tolist() if len(all_nan_cols) > 0: print(f"found {len(all_nan_cols)} stations with all nans") if drop_all_nan_stns: data = data.drop(columns=all_nan_cols) start_c = time.time() if netCDF4 is None: save_daily_q_as_csv(path, data, out_fname=out_fname) else: save_daily_q_as_nc(path, data, out_fname, save_stns_as_data_vars) #save_daily_q_as_csv(path, data) total = round((time.time() - start_c)/60, 2) print(f"Time taken: to write {total} mins for {data.shape[1]} stations") return data def save_daily_q_as_csv(path, data): df = pd.DataFrame(data) df.to_csv(os.path.join(path, 'daily_q.csv'), index=True, index_label='time') return def save_daily_q_as_nc( path, data:pd.DataFrame, out_fname:str = "daily_q.nc", save_stns_as_data_vars:bool=False, ): from netCDF4 import Dataset, date2num start_yr = data.index[0].year if save_stns_as_data_vars: ncfile = Dataset(os.path.join(path, out_fname), mode='w', format='NETCDF4') _ = ncfile.createDimension('time', None) # unlimited dimension new_idx = pd.date_range(start=data.index[0], end=data.index[-1], freq='D') time_var = ncfile.createVariable('time', 'f8', ('time',)) time_var.units = f'days since {start_yr}-01-01 00:00:00' time_var.calendar = 'gregorian' time_var[:] = date2num(new_idx.to_pydatetime(), units=time_var.units, calendar=time_var.calendar) for idx, ts in enumerate(data): # create a variable for each column col_var = ncfile.createVariable(str(ts.name), datatype='f4', dimensions=('time',), complevel=2, compression='zlib', shuffle=True, least_significant_digit=4, chunksizes=(data.shape[0],) ) # ts may have different index than new_idx/tim_demension, so we need to reindex new_ts = ts.reindex(index=new_idx) col_var[:] = new_ts.values col_var.units = "cms" col_var.description = "daily discharge" if idx % 100 == 0: print(f"Saved data for {idx} sites in .nc file") ncfile.close() else: # saving as a single variable with stations as dimension (appears to be more efficient i/o wise) out_path = os.path.join(path, out_fname) with Dataset(out_path, "w", format="NETCDF4") as nc: # create the dimensions which will be used to create variable _ = nc.createDimension('time', data.shape[0]) _ = nc.createDimension('stations', data.shape[1]) col_var = nc.createVariable(observed_streamflow_cms(), datatype=np.float32, dimensions=('time', 'stations'), complevel=4, compression='zlib', shuffle=True, least_significant_digit=4, # chunk size equal to data for one station chunksizes=(data.shape[0], 1) ) col_var[:] = data.values col_var.description = "daily discharge" col_var.units = "cms" station_var = nc.createVariable('stations', str, ('stations',)) station_var[:] = np.array(data.columns).astype(str) # Create time variable with CF-compliant encoding time_var = nc.createVariable('time', 'i4', ('time',)) time_var.calendar = 'standard' time_var.long_name = 'time' time_var.standard_name = 'time' time_var.axis = 'T' time_strings = data.index.strftime("%Y%j").tolist() # Convert time strings from YYYYJJJ to YYYYMMDD format datetime_objects = [datetime.strptime(ts, "%Y%j") for ts in time_strings] time_var.units = f'days since {start_yr}-01-01 00:00:00' # Convert to days since reference date reference_date = datetime(start_yr, 1, 1) time_values = [(dt - reference_date).total_seconds() / 86400 for dt in datetime_objects] time_var[:] = time_values return def _read_json(json): """ Following code is taken and modified after dataretrieval.utils Reads a NWIS Water Services formatted JSON into a ``pandas.DataFrame``. Parameters ---------- json: dict A JSON dictionary response to be parsed into a ``pandas.DataFrame`` Returns ------- df: ``pandas.DataFrame`` Times series data from the NWIS JSON md: :obj:`dataretrieval.utils.Metadata` A custom metadata object """ merged_df = pd.DataFrame(columns=["site_no", "datetime"]) site_list = [ ts["sourceInfo"]["siteCode"][0]["value"] for ts in json["value"]["timeSeries"] ] # create a list of indexes for each change in site no # for example, [0, 21, 22] would be the first and last indeces index_list = [0] index_list.extend( [i + 1 for i, (a, b) in enumerate(zip(site_list[:-1], site_list[1:])) if a != b] ) index_list.append(len(site_list)) for i in range(len(index_list) - 1): start = index_list[i] # [0] end = index_list[i + 1] # [21] # grab a block containing timeseries 0:21, # which are all from the same site site_block = json["value"]["timeSeries"][start:end] if not site_block: continue site_no = site_block[0]["sourceInfo"]["siteCode"][0]["value"] site_df = pd.DataFrame(columns=["datetime"]) for timeseries in site_block: param_cd = timeseries["variable"]["variableCode"][0]["value"] # check whether min, max, mean record XXX option = timeseries["variable"]["options"]["option"][0].get("value") # loop through each parameter in timeseries, then concat to the merged_df for parameter in timeseries["values"]: col_name = param_cd method = parameter["method"][0]["methodDescription"] # if len(timeseries['values']) > 1 and method: if method: # get method, format it, and append to column name method = method.strip("[]()").lower() col_name = f"{col_name}_{method}" if option: col_name = f"{col_name}_{option}" record_json = parameter["value"] if not record_json: # no data in record continue # should be able to avoid this by dumping record_json = str(record_json).replace("'", '"') # read json, converting all values to float64 and all qualifiers # Lists can't be hashed, thus we cannot df.merge on a list column record_df = pd.read_json( StringIO(record_json), orient="records", dtype={"value": "float64", "qualifiers": "unicode"}, convert_dates=False, ) record_df["qualifiers"] = ( record_df["qualifiers"].str.strip("[]").str.replace("'", "") ) record_df.rename( columns={ "value": col_name, "dateTime": "datetime", "qualifiers": col_name + "_cd", }, inplace=True, ) site_df = site_df.merge(record_df, how="outer", on="datetime") # end of site loop site_df["site_no"] = site_no merged_df = pd.concat([merged_df, site_df]) # convert to datetime, normalizing the timezone to UTC when doing so if "datetime" in merged_df.columns: merged_df["datetime"] = pd.to_datetime(merged_df["datetime"], utc=True) return merged_df def download_daily_q_nwis( site:str = '14105700', start = '1900-01-01', end='2024-12-31' )->pd.DataFrame: response = requests.get( "https://waterservices.usgs.gov/nwis/dv", params={'format': 'json', 'parameterCd': '00060', 'sites': site, 'startDT': start, 'endDT': end, 'multi_index': None}, headers={"user-agent": f"python-dataretrieval/1.0.11"}, verify=True) if response.status_code in [400, 404, 414]: raise ValueError(f"Bad Request, check that your parameters are correct. URL: {response.url}") try: site_data = _read_json(response.json()) except JSONDecodeError: site_data = pd.DataFrame() print(f"Site: {site} JSONDecodeError") if 'datetime' in site_data.columns: site_data.index = pd.to_datetime(site_data.pop('datetime')) return site_data def download_daily_record( site:str, start = "1900-01-01", end:str = None, )->pd.Series: # todo: should we store the data in csv files so that we don't have to download it again? # but using 110 cores for all 12004 sites takes around 5 minutes which is not that bad # given that it has to happen only once! #fpath = os.path.join(path, f"{site}.csv") # if os.path.exists(fpath): # return pd.read_csv(fpath, index_col=0) site_data = download_daily_q_nwis(site, start=start, end=end, ) if f'00060_Mean' in site_data.columns: # get data for stations which have A in 00060_Mean_cd column site_data = site_data[site_data['00060_Mean_cd'].isin(["A", "A, e"])] site_data = site_data['00060_Mean'] elif '00060_upstream site_Mean' in site_data.columns: site_data = site_data[site_data['00060_upstream site_Mean_cd'].isin(["A", "A, e"])] site_data = site_data['00060_upstream site_Mean'] elif "00060_upstream of dam_Mean" in site_data.columns: # "01399100" has this column site_data = site_data[site_data['00060_upstream of dam_Mean_cd'].isin(["A", "A, e"])] site_data = site_data["00060_upstream of dam_Mean"] elif "00060_2_Mean" in site_data.columns: # "01401000" has this column site_data = site_data[site_data['00060_2_Mean_cd'].isin(["A", "A, e"])] site_data = site_data["00060_2_Mean"] elif "00060_published_Mean" in site_data.columns: # "02129590" has this column site_data = site_data[site_data['00060_published_Mean_cd'].isin(["A", "A, e"])] site_data = site_data["00060_published_Mean"] elif len(site_data) == 0: # return empty series site_data = pd.Series(name=f"{site}__empty", index = pd.date_range(start="2024-01-01", end="2024-12-31", freq='D') ) else: #print(f"Site: {site} has {site_data.columns}") site_data = pd.Series(name=f"{site}__empty", index = pd.date_range(start="2024-01-01", end="2024-12-31", freq='D') ) site_data.name = site if site_data.index.tz is not None: site_data.index = site_data.index.tz_convert("UTC").tz_localize(None) return site_data * 0.028316847 # convert cfs to cms def format_response( df: pd.DataFrame, **kwargs ) -> pd.DataFrame: """Setup index for response from query. This function formats the response from the NWIS web services, in particular it sets the index of the data frame. This function tries to convert the NWIS response into pandas datetime values localized to UTC, and if possible, uses these timestamps to define the data frame index. Parameters ---------- df: ``pandas.DataFrame`` The data frame to format service: string, optional, default is None The NWIS service that was queried, important because the 'peaks' service returns a different format than the other services. **kwargs: optional Additional keyword arguments, e.g., 'multi_index' Returns ------- df: ``pandas.DataFrame`` The formatted data frame """ mi = kwargs.pop("multi_index", True) # check for multiple sites: if "datetime" not in df.columns: # XXX: consider making site_no index return df elif len(df["site_no"].unique()) > 1 and mi: # setup multi-index df.set_index(["site_no", "datetime"], inplace=True) if hasattr(df.index.levels[1], "tzinfo") and df.index.levels[1].tzinfo is None: df = df.tz_localize("UTC", level=1) else: df.set_index(["datetime"], inplace=True) if hasattr(df.index, "tzinfo") and df.index.tzinfo is None: df = df.tz_localize("UTC") return df.sort_index() def _read_rdb(rdb): """ The following code is taken and modified after dataretrieval github repo Convert NWIS rdb table into a ``pandas.dataframe``. Parameters ---------- rdb: string A string representation of an rdb table Returns ------- df: ``pandas.dataframe`` A formatted pandas data frame """ count = 0 for line in rdb.splitlines(): # ignore comment lines if line.startswith("#"): count = count + 1 else: break fields = re.split("[\t]", rdb.splitlines()[count]) fields = [field.replace(",", "") for field in fields] dtypes = { "site_no": str, "dec_long_va": float, "dec_lat_va": float, "parm_cd": str, "parameter_cd": str, } df = pd.read_csv( StringIO(rdb), delimiter="\t", skiprows=count + 2, names=fields, na_values="NaN", dtype=dtypes, ) df = format_response(df) return df def _download_metadata(site:str)->pd.DataFrame: response = requests.get( 'https://waterservices.usgs.gov/nwis/site', params={'sites': site, 'parameterCd': '00060', 'siteOutput': 'Expanded', 'format': 'rdb'}, headers={'user-agent': 'python-dataretrieval/1.0.11'}, verify=True ) return _read_rdb(response.text) def download_hourly_q_nwis( site:str = "09246200", start = '1995-08-11', end='1995-08-12' )->pd.DataFrame: response = requests.get( "https://waterservices.usgs.gov/nwis/iv", params={'format': 'json', 'parameterCd': '00060', 'sites': site, 'startDT': start, 'endDT': end, 'multi_index': None, }, headers={"user-agent": f"python-dataretrieval/1.0.11"}, verify=True) if response.status_code in [400, 404, 414]: raise ValueError(f"Bad Request, check that your parameters are correct. URL: {response.url}") try: site_data = _read_json(response.json()) except JSONDecodeError: site_data = pd.DataFrame() print(f"Site: {site} JSONDecodeError") if 'datetime' in site_data.columns: site_data.index = pd.to_datetime(site_data.pop('datetime')) return site_data def download_hourly_record( site:str, path:str, start:str = "1979-02-11", end:str = None, overwrite:bool=False, save_empty_file:bool=False )->pd.Series: fpath = os.path.join(path, f"{site}.csv") if os.path.exists(fpath) and not overwrite: site_data = pd.read_csv(fpath, index_col=0, parse_dates=True, dtype={site:'float32'}) site_data = site_data[site] site_data.name = site return site_data try: site_data = download_hourly_q_nwis( site, start=start, end=end, ) except (JSONDecodeError, ContentDecodingError, TypeError): site_data = pd.Series( index = pd.date_range(start="2024-01-01", end="2024-01-30", freq='h') ) if len(site_data) == 0: # return empty series site_data = pd.Series( index = pd.date_range(start="2024-01-01", end="2024-01-02", freq='h') ) elif isinstance(site_data, pd.DataFrame) and site_data.columns[1] == '00060': site_data = site_data[site_data['00060_cd'].isin(['A, [92]', 'A, [91]', 'A, [93]', 'A, e', 'A'])] site_data = site_data['00060'].resample('h').apply(lambda subdata: tw_resampler(subdata, site_data['00060'].sort_index())) else: site_data = pd.Series( index = pd.date_range(start="2024-01-01", end="2024-01-02", freq='h')) site_data.name = site # site_data for some stations is not tz aware, so we make it tz aware first # and then convert to UTC and remove tz info so that the csv files can be # joined later without any issues if site_data.index.tzinfo is not None: site_data.index = site_data.index.tz_convert("UTC").tz_localize(None) site_data = site_data * 0.028316847 # convert cfs to cms site_data = site_data.astype('float32') site_data = site_data.sort_index() if len(site_data.dropna()) > 0 or save_empty_file: site_data.to_csv(fpath, index=True, index_label='time') return site_data