# Source code for argopy.data_fetchers.erddap_data

# -*- coding: utf-8 -*-

"""
argopy.data_fetchers.erddap
~~~~~~~~~~~~~~~~~~~~~~~~~~~

This module contains Argo data fetcher for Ifremer ERDDAP.

This is not intended to be used directly, only by the facade at fetchers.py

"""

import pandas as pd
import numpy as np
import copy

from abc import abstractmethod
import getpass

from .proto import ArgoDataFetcherProto
from argopy.options import OPTIONS
from argopy.utilities import list_standard_variables, Chunker, format_oneline
from argopy.stores import httpstore
from ..errors import ErddapServerError
from aiohttp import ClientResponseError


# Load erddapy according to available version (breaking changes in v0.8.0):
# the helper functions were made private and moved in v0.8.0.
try:
    from erddapy import ERDDAP
    from erddapy.utilities import parse_dates, quote_string_constraints
except ImportError:
    # >= v0.8.0
    from erddapy.erddapy import ERDDAP
    from erddapy.erddapy import _quote_string_constraints as quote_string_constraints
    from erddapy.erddapy import parse_dates


access_points = ['wmo', 'box']
exit_formats = ['xarray']
dataset_ids = ['phy', 'ref', 'bgc']  # First is default
api_server = 'https://erddap.ifremer.fr/erddap/'  # API root url
# URL to check if the API is alive.
# Note: ``api_server`` already ends with a '/', so do not prepend another
# one here (would produce a '//' in the path).
api_server_check = api_server + 'info/ArgoFloats/index.json'


class ErddapArgoDataFetcher(ArgoDataFetcherProto):
    """ Manage access to Argo data through Ifremer ERDDAP

    ERDDAP transaction are managed with the erddapy library.

    This class is a prototype not meant to be instantiated directly:
    use one of the concrete subclasses (e.g. ``Fetch_wmo``, ``Fetch_box``)
    which implement the abstract methods below.
    """

    ###
    # Methods to be customised for a specific erddap request
    ###
    @abstractmethod
    def init(self, *args, **kwargs):
        """ Initialisation for a specific fetcher """
        raise NotImplementedError("ErddapArgoDataFetcher.init not implemented")

    @abstractmethod
    def define_constraints(self):
        """ Define erddapy constraints """
        raise NotImplementedError("ErddapArgoDataFetcher.define_constraints not implemented")

    @property
    @abstractmethod
    def uri(self) -> list:
        """ Return the list of Unique Resource Identifier (URI) to download data """
        raise NotImplementedError("ErddapArgoDataFetcher.uri not implemented")

    ###
    # Methods that must not change
    ###
[docs] def __init__( self, ds: str = "", cache: bool = False, cachedir: str = "", parallel: bool = False, parallel_method: str = "thread", progress: bool = False, chunks: str = "auto", chunks_maxsize: dict = {}, api_timeout: int = 0, **kwargs, ): """ Instantiate an ERDDAP Argo data fetcher Parameters ---------- ds: str (optional) Dataset to load: 'phy' or 'ref' or 'bgc' cache: bool (optional) Cache data or not (default: False) cachedir: str (optional) Path to cache folder parallel: bool (optional) Chunk request to use parallel fetching (default: False) parallel_method: str (optional) Define the parallelization method: ``thread``, ``process`` or a :class:`dask.distributed.client.Client`. progress: bool (optional) Show a progress bar or not when ``parallel`` is set to True. chunks: 'auto' or dict of integers (optional) Dictionary with request access point as keys and number of chunks to create as values. Eg: {'wmo': 10} will create a maximum of 10 chunks along WMOs when used with ``Fetch_wmo``. chunks_maxsize: dict (optional) Dictionary with request access point as keys and chunk size as values (used as maximum values in 'auto' chunking). Eg: {'wmo': 5} will create chunks with as many as 5 WMOs each. api_timeout: int (optional) Erddap request time out in seconds. Set to OPTIONS['api_timeout'] by default. 
""" timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout self.fs = httpstore(cache=cache, cachedir=cachedir, timeout=timeout, size_policy='head') self.definition = "Ifremer erddap Argo data fetcher" self.dataset_id = OPTIONS["dataset"] if ds == "" else ds self.server = api_server if not isinstance(parallel, bool): parallel_method = parallel parallel = True if parallel_method not in ["thread"]: raise ValueError( "erddap only support multi-threading, use 'thread' instead of '%s'" % parallel_method ) self.parallel = parallel self.parallel_method = parallel_method self.progress = progress self.chunks = chunks self.chunks_maxsize = chunks_maxsize self.init(**kwargs) self._init_erddapy()
def __repr__(self): summary = ["<datafetcher.erddap>"] summary.append("Name: %s" % self.definition) summary.append("API: %s" % api_server) summary.append("Domain: %s" % format_oneline(self.cname())) return "\n".join(summary) def _add_attributes(self, this): # noqa: C901 """ Add variables attributes not return by erddap requests (csv) This is hard coded, but should be retrieved from an API somewhere """ for v in this.data_vars: if "TEMP" in v and "_QC" not in v: this[v].attrs = { "long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE", "standard_name": "sea_water_temperature", "units": "degree_Celsius", "valid_min": -2.0, "valid_max": 40.0, "resolution": 0.001, } if "ERROR" in v: this[v].attrs["long_name"] = ( "ERROR IN %s" % this[v].attrs["long_name"] ) for v in this.data_vars: if "PSAL" in v and "_QC" not in v: this[v].attrs = { "long_name": "PRACTICAL SALINITY", "standard_name": "sea_water_salinity", "units": "psu", "valid_min": 0.0, "valid_max": 43.0, "resolution": 0.001, } if "ERROR" in v: this[v].attrs["long_name"] = ( "ERROR IN %s" % this[v].attrs["long_name"] ) for v in this.data_vars: if "PRES" in v and "_QC" not in v: this[v].attrs = { "long_name": "Sea Pressure", "standard_name": "sea_water_pressure", "units": "decibar", "valid_min": 0.0, "valid_max": 12000.0, "resolution": 0.1, "axis": "Z", } if "ERROR" in v: this[v].attrs["long_name"] = ( "ERROR IN %s" % this[v].attrs["long_name"] ) for v in this.data_vars: if "DOXY" in v and "_QC" not in v: this[v].attrs = { "long_name": "Dissolved oxygen", "standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water", "units": "micromole/kg", "valid_min": -5.0, "valid_max": 600.0, "resolution": 0.001, } if "ERROR" in v: this[v].attrs["long_name"] = ( "ERROR IN %s" % this[v].attrs["long_name"] ) for v in this.data_vars: if "_QC" in v: attrs = { "long_name": "Global quality flag of %s profile" % v, "convention": "Argo reference table 2a", } this[v].attrs = attrs if "CYCLE_NUMBER" in this.data_vars: 
this["CYCLE_NUMBER"].attrs = { "long_name": "Float cycle number", "convention": "0..N, 0 : launch cycle (if exists), 1 : first complete cycle", } if "DATA_MODE" in this.data_vars: this["DATA_MODE"].attrs = { "long_name": "Delayed mode or real time data", "convention": "R : real time; D : delayed mode; A : real time with adjustment", } if "DIRECTION" in this.data_vars: this["DIRECTION"].attrs = { "long_name": "Direction of the station profiles", "convention": "A: ascending profiles, D: descending profiles", } if "PLATFORM_NUMBER" in this.data_vars: this["PLATFORM_NUMBER"].attrs = { "long_name": "Float unique identifier", "convention": "WMO float identifier : A9IIIII", } return this def _init_erddapy(self): # Init erddapy self.erddap = ERDDAP(server=self.server, protocol="tabledap") self.erddap.response = ( "nc" # This is a major change in v0.4, we used to work with csv files ) if self.dataset_id == "phy": self.erddap.dataset_id = "ArgoFloats" elif self.dataset_id == "ref": self.erddap.dataset_id = "ArgoFloats-ref" elif self.dataset_id == "bgc": self.erddap.dataset_id = "ArgoFloats-synthetic-BGC" elif self.dataset_id == "fail": self.erddap.dataset_id = "invalid_db" else: raise ValueError( "Invalid database short name for Ifremer erddap (use: 'phy', 'bgc' or 'ref')" ) return self @property def _minimal_vlist(self): """ Return the minimal list of variables to retrieve measurements for """ vlist = list() if self.dataset_id == "phy": plist = [ "data_mode", "latitude", "longitude", "position_qc", "time", "time_qc", "direction", "platform_number", "cycle_number", "config_mission_number", "vertical_sampling_scheme", ] [vlist.append(p) for p in plist] plist = ["pres", "temp", "psal"] [vlist.append(p) for p in plist] [vlist.append(p + "_qc") for p in plist] [vlist.append(p + "_adjusted") for p in plist] [vlist.append(p + "_adjusted_qc") for p in plist] [vlist.append(p + "_adjusted_error") for p in plist] if self.dataset_id == "bgc": plist = [ "parameter_data_mode", 
"latitude", "longitude", "position_qc", "time", "time_qc", "direction", "platform_number", "cycle_number", "config_mission_number", ] [vlist.append(p) for p in plist] plist = ["pres", "temp", "psal", "cndc", "doxy", "beta_backscattering", "fluorescence_chla", # "fluorescence_cdom", # "side_scattering_turbidity", # "transmittance_particle_beam_attenuation", "bbp", "turbidity", "cp", "chla", "cdom", "nitrate", ] [vlist.append(p) for p in plist] [vlist.append(p + "_qc") for p in plist] [vlist.append(p + "_adjusted") for p in plist] [vlist.append(p + "_adjusted_qc") for p in plist] [vlist.append(p + "_adjusted_error") for p in plist] elif self.dataset_id == "ref": plist = ["latitude", "longitude", "time", "platform_number", "cycle_number"] [vlist.append(p) for p in plist] plist = ["pres", "temp", "psal", "ptmp"] [vlist.append(p) for p in plist] return vlist @property def _dtype(self): """ Return a dictionary of data types for each variable requested to erddap in the minimal vlist """ dref = { "data_mode": object, "latitude": np.float64, "longitude": np.float64, "position_qc": np.int64, "time": object, "time_qc": np.int64, "direction": object, "platform_number": np.int64, "config_mission_number": np.int64, "vertical_sampling_scheme": object, "cycle_number": np.int64, "pres": np.float64, "temp": np.float64, "psal": np.float64, "doxy": np.float64, "pres_qc": np.int64, "temp_qc": object, "psal_qc": object, "doxy_qc": object, "pres_adjusted": np.float64, "temp_adjusted": np.float64, "psal_adjusted": np.float64, "doxy_adjusted": np.float64, "pres_adjusted_qc": object, "temp_adjusted_qc": object, "psal_adjusted_qc": object, "doxy_adjusted_qc": object, "pres_adjusted_error": np.float64, "temp_adjusted_error": np.float64, "psal_adjusted_error": np.float64, "doxy_adjusted_error": np.float64, "ptmp": np.float64, } plist = self._minimal_vlist response = {} for p in plist: if p in dref: response[p] = dref[p] else: response[p] = object return response def cname(self): """ Return a 
unique string defining the constraints """ return self._cname() @property def cachepath(self): """ Return path to cached file(s) for this request Returns ------- list(str) """ return [self.fs.cachepath(uri) for uri in self.uri] def get_url(self): """ Return the URL to download data requested Returns ------- str """ # Replace erddapy get_download_url() # We need to replace it to better handle http responses with by-passing the _check_url_response # https://github.com/ioos/erddapy/blob/fa1f2c15304938cd0aa132946c22b0427fd61c81/erddapy/erddapy.py#L247 # First part of the URL: protocol = self.erddap.protocol dataset_id = self.erddap.dataset_id response = self.erddap.response url = f"{self.erddap.server}/{protocol}/{dataset_id}.{response}?" # Add variables to retrieve: self.erddap.variables = ( self._minimal_vlist ) # Define the list of variables to retrieve variables = self.erddap.variables variables = ",".join(variables) url += f"{variables}" # Add constraints: self.define_constraints() # Define constraint to select this box of data (affect self.erddap.constraints) constraints = self.erddap.constraints _constraints = copy.copy(constraints) for k, v in _constraints.items(): if k.startswith("time"): _constraints.update({k: parse_dates(v)}) _constraints = quote_string_constraints(_constraints) _constraints = "".join([f"&{k}{v}" for k, v in _constraints.items()]) url += f"{_constraints}" # Last part: url += '&distinct()&orderBy("time,pres")' return url @property def N_POINTS(self) -> int: """ Number of measurements expected to be returned by a request """ try: url = self.get_url().replace("." + self.erddap.response, ".ncHeader") with self.fs.open(url) as of: ncHeader = of.read().decode("utf-8") lines = [line for line in ncHeader.splitlines() if "row = " in line][0] return int(lines.split("=")[1].split(";")[0]) except Exception: raise ErddapServerError("Erddap server can't return ncHeader for this url. 
") def to_xarray(self, errors: str = 'ignore'): """ Load Argo data and return a xarray.DataSet """ # Download data if not self.parallel: if len(self.uri) == 1: try: ds = self.fs.open_dataset(self.uri[0]) except ClientResponseError as e: raise ErddapServerError(e.message) else: try: ds = self.fs.open_mfdataset( self.uri, method="sequential", progress=self.progress, errors=errors ) except ClientResponseError as e: raise ErddapServerError(e.message) else: try: ds = self.fs.open_mfdataset( self.uri, method=self.parallel_method, progress=self.progress, errors=errors ) except ClientResponseError as e: raise ErddapServerError(e.message) ds = ds.rename({"row": "N_POINTS"}) # Post-process the xarray.DataSet: # Set coordinates: coords = ("LATITUDE", "LONGITUDE", "TIME", "N_POINTS") ds = ds.reset_coords() ds["N_POINTS"] = ds["N_POINTS"] # Convert all coordinate variable names to upper case for v in ds.data_vars: ds = ds.rename({v: v.upper()}) ds = ds.set_coords(coords) # Cast data types and add variable attributes (not available in the csv download): ds = self._add_attributes(ds) ds = ds.argo.cast_types() # More convention: # ds = ds.rename({'pres': 'pressure'}) # Remove erddap file attributes and replace them with argopy ones: ds.attrs = {} if self.dataset_id == "phy": ds.attrs["DATA_ID"] = "ARGO" elif self.dataset_id == "ref": ds.attrs["DATA_ID"] = "ARGO_Reference" elif self.dataset_id == "bgc": ds.attrs["DATA_ID"] = "ARGO-BGC" ds.attrs["DOI"] = "http://doi.org/10.17882/42182" ds.attrs["Fetched_from"] = self.erddap.server ds.attrs["Fetched_by"] = getpass.getuser() ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime("%Y/%m/%d") ds.attrs["Fetched_constraints"] = self.cname() ds.attrs["Fetched_uri"] = self.uri ds = ds[np.sort(ds.data_vars)] # return ds def filter_data_mode(self, ds, **kwargs): ds = ds.argo.filter_data_mode(errors="ignore", **kwargs) if ds.argo._type == "point": ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) return ds def filter_qc(self, ds, 
**kwargs): ds = ds.argo.filter_qc(**kwargs) if ds.argo._type == "point": ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) return ds def filter_variables(self, ds, mode="standard"): if mode == "standard": to_remove = sorted( list(set(list(ds.data_vars)) - set(list_standard_variables())) ) return ds.drop_vars(to_remove) else: return ds
class Fetch_wmo(ErddapArgoDataFetcher):
    """ Manage access to Argo data through Ifremer ERDDAP for: a list of WMOs

    This class is instantiated when a call is made to these facade access points:
        - `ArgoDataFetcher(src='erddap').float(**)`
        - `ArgoDataFetcher(src='erddap').profile(**)`
    """

    def init(self, WMO=None, CYC=None, **kw):
        """ Create Argo data loader for WMOs

        Parameters
        ----------
        WMO : list(int)
            The list of WMOs to load all Argo data for.
        CYC : int, np.array(int), list(int)
            The cycle numbers to load.
        """
        # Use a None sentinel instead of the mutable ``WMO=[]`` default,
        # which would be shared across all instances:
        self.WMO = [] if WMO is None else WMO
        self.CYC = CYC

        self.definition = "?"
        if self.dataset_id == "phy":
            self.definition = "Ifremer erddap Argo data fetcher for floats"
        elif self.dataset_id == "ref":
            self.definition = "Ifremer erddap Argo REFERENCE data fetcher for floats"
        return self

    def define_constraints(self):
        """ Define erddap constraints """
        # Regexp-match any of the requested WMOs (and possibly cycle numbers):
        self.erddap.constraints = {
            "platform_number=~": "|".join(["%i" % i for i in self.WMO])
        }
        if self.CYC is not None:
            self.erddap.constraints.update(
                {"cycle_number=~": "|".join(["%i" % i for i in self.CYC])}
            )
        return self

    @property
    def uri(self):
        """ List of URLs to load for a request

        Returns
        -------
        list(str)
        """
        if not self.parallel:
            # Even in sequential mode, chunk WMOs (max 5 per URL) to keep
            # request URLs short enough for the server:
            chunks = "auto"
            chunks_maxsize = {'wmo': 5}
        else:
            chunks = self.chunks
            chunks_maxsize = self.chunks_maxsize
        self.Chunker = Chunker(
            {"wmo": self.WMO}, chunks=chunks, chunksize=chunks_maxsize
        )
        wmo_grps = self.Chunker.fit_transform()
        urls = []
        for wmos in wmo_grps:
            # Delegate URL building to a throw-away sequential fetcher per chunk:
            urls.append(
                Fetch_wmo(
                    WMO=wmos, CYC=self.CYC, ds=self.dataset_id, parallel=False
                ).get_url()
            )
        return urls
class Fetch_box(ErddapArgoDataFetcher):
    """ Manage access to Argo data through Ifremer ERDDAP for: an ocean rectangle """

    def init(self, box: list, **kw):
        """ Create Argo data loader

        Parameters
        ----------
        box : list(float, float, float, float, float, float, str, str)
            The box domain to load all Argo data for:
                box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max]
            or:
                box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max]
        """
        # Keep a private copy so later mutations of the caller's list are harmless:
        self.BOX = box.copy()

        if self.dataset_id == "phy":
            self.definition = "Ifremer erddap Argo data fetcher for a space/time region"
        elif self.dataset_id == "ref":
            self.definition = (
                "Ifremer erddap Argo REFERENCE data fetcher for a space/time region"
            )
        return self

    def define_constraints(self):
        """ Define request constraints """
        # Map box boundaries onto erddap constraint keys, in order; the two
        # time bounds are present only for an 8-element box:
        keys = ["longitude>=", "longitude<=", "latitude>=", "latitude<=", "pres>=", "pres<="]
        if len(self.BOX) == 8:
            keys = keys + ["time>=", "time<="]
        self.erddap.constraints = dict(zip(keys, self.BOX))
        return None

    @property
    def uri(self):
        """ List of files to load for a request

        Returns
        -------
        list(str)
        """
        if self.parallel:
            # Split the box into sub-domains and build one URL per sub-box:
            self.Chunker = Chunker(
                {"box": self.BOX}, chunks=self.chunks, chunksize=self.chunks_maxsize
            )
            return [
                Fetch_box(box=sub_box, ds=self.dataset_id).get_url()
                for sub_box in self.Chunker.fit_transform()
            ]
        return [self.get_url()]