# -*- coding: utf-8 -*-
"""
argopy.data_fetchers.erddap
~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains Argo data fetcher for Ifremer ERDDAP.
This is not intended to be used directly, only by the facade at fetchers.py
"""
import xarray as xr
import pandas as pd
import numpy as np
import copy
import time
from abc import abstractmethod
import getpass
from typing import Union
import fnmatch
from aiohttp import ClientResponseError
import logging
from ..options import OPTIONS
from ..utils.format import format_oneline
from ..stores import httpstore
from ..errors import ErddapServerError, DataNotFound
from ..stores import indexstore_pd as ArgoIndex # make sure we work with the Pandas index store
from ..utils import is_list_of_strings, to_list,Chunker
from .proto import ArgoDataFetcherProto
# Load erddapy according to available version (breaking changes in v0.8.0)
try:
from erddapy import ERDDAP
from erddapy.utilities import parse_dates, quote_string_constraints
except: # noqa: E722
# >= v0.8.0
from erddapy.erddapy import ERDDAP
from erddapy.erddapy import _quote_string_constraints as quote_string_constraints
from erddapy.erddapy import parse_dates
# Soon ! https://github.com/ioos/erddapy
log = logging.getLogger("argopy.erddap.data")
access_points = ["wmo", "box"]
exit_formats = ["xarray"]
dataset_ids = ["phy", "ref", "bgc"] # First is default
api_server = OPTIONS["erddap"] # API root url
api_server_check = (
OPTIONS["erddap"] + "/info/ArgoFloats/index.json"
) # URL to check if the API is alive
[docs]class ErddapArgoDataFetcher(ArgoDataFetcherProto):
"""Manage access to Argo data through Ifremer ERDDAP
ERDDAP transaction are managed with the erddapy library
This class is a prototype not meant to be instantiated directly
"""
###
# Methods to be customised for a specific erddap request
###
@abstractmethod
def init(self, *args, **kwargs):
"""Initialisation for a specific fetcher"""
raise NotImplementedError("ErddapArgoDataFetcher.init not implemented")
@abstractmethod
def define_constraints(self):
"""Define erddapy constraints"""
raise NotImplementedError(
"ErddapArgoDataFetcher.define_constraints not implemented"
)
@property
@abstractmethod
def uri(self) -> list:
"""Return the list of Unique Resource Identifier (URI) to download data"""
raise NotImplementedError("ErddapArgoDataFetcher.uri not implemented")
###
# Methods that must not change
###
[docs] def __init__( # noqa: C901
self,
ds: str = "",
cache: bool = False,
cachedir: str = "",
parallel: bool = False,
parallel_method: str = "erddap", # Alternative to 'thread' with a dashboard
progress: bool = False,
chunks: str = "auto",
chunks_maxsize: dict = {},
api_timeout: int = 0,
params: Union[str, list] = "all",
measured: Union[str, list] = None,
**kwargs,
):
"""Instantiate an ERDDAP Argo data fetcher
Parameters
----------
ds: str (optional)
Dataset to load: 'phy' or 'ref' or 'bgc'
cache: bool (optional)
Cache data or not (default: False)
cachedir: str (optional)
Path to cache folder
parallel: bool (optional)
Chunk request to use parallel fetching (default: False)
parallel_method: str (optional)
Define the parallelization method: ``thread``, ``process`` or a :class:`dask.distributed.client.Client`.
progress: bool (optional)
Show a progress bar or not when ``parallel`` is set to True.
chunks: 'auto' or dict of integers (optional)
Dictionary with request access point as keys and number of chunks to create as values.
Eg: {'wmo': 10} will create a maximum of 10 chunks along WMOs when used with ``Fetch_wmo``.
chunks_maxsize: dict (optional)
Dictionary with request access point as keys and chunk size as values (used as maximum values in
'auto' chunking).
Eg: {'wmo': 5} will create chunks with as many as 5 WMOs each.
api_timeout: int (optional)
Erddap request time out in seconds. Set to OPTIONS['api_timeout'] by default.
params: Union[str, list] (optional, default='all')
List of BGC essential variables to retrieve, i.e. that will be in the output :class:`xr.DataSet``.
By default, this is set to ``all``, i.e. any variable found in at least of the profile in the data
selection will be included in the output.
measured: Union[str, list] (optional, default=None)
List of BGC essential variables that can't be NaN. If set to 'all', this is an easy way to reduce the size of the
:class:`xr.DataSet`` to points where all variables have been measured. Otherwise, provide a simple list of
variables.
"""
timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout
self.definition = "Ifremer erddap Argo data fetcher"
self.dataset_id = OPTIONS["dataset"] if ds == "" else ds
self.server = kwargs["server"] if "server" in kwargs else OPTIONS["erddap"]
self.store_opts = {
"cache": cache,
"cachedir": cachedir,
"timeout": timeout,
"size_policy": "head",
}
self.fs = kwargs["fs"] if "fs" in kwargs else httpstore(**self.store_opts)
if not isinstance(parallel, bool):
parallel_method = parallel
parallel = True
if parallel_method not in ["thread", "seq", "erddap"]:
raise ValueError(
"erddap only support multi-threading, use 'thread' or 'erddap' instead of '%s'"
% parallel_method
)
self.parallel = parallel
self.parallel_method = parallel_method
self.progress = progress
self.chunks = chunks
self.chunks_maxsize = chunks_maxsize
self.init(**kwargs)
self._init_erddapy()
if self.dataset_id == "bgc":
# Create an ArgoIndex instance:
# This will be used to:
# - retrieve the list of BGC variables to ask the erddap server
# - get <param>_data_mode information because we can't get it from the server
self.indexfs = kwargs['indexfs'] if 'indexfs' in kwargs else ArgoIndex(
index_file='argo_synthetic-profile_index.txt', # the only available in the erddap
cache=kwargs['cache_index'] if 'cache_index' in kwargs else cache,
cachedir=cachedir,
timeout=timeout,
)
# To handle bugs in the erddap server, we need the list of parameters on the server:
# todo: Remove this when bug fixed
data = self.fs.open_json(
self.server + "/info/ArgoFloats-synthetic-BGC/index.json"
)
self._bgc_vlist_erddap = [
row[1] for row in data["table"]["rows"] if row[0] == "variable"
]
# Handle the 'params' argument:
self._bgc_params = to_list(params)
if isinstance(params, str):
if params == "all":
params = self._bgc_vlist_avail
else:
params = to_list(params)
elif params is None:
raise ValueError()
elif params[0] == "all":
params = self._bgc_vlist_avail
elif not is_list_of_strings(params):
raise ValueError("'params' argument must be a list of strings")
# raise ValueError("'params' argument must be a list of strings (possibly with a * wildcard)")
self._bgc_vlist_requested = [p.upper() for p in params]
# self._bgc_vlist_requested = self._bgc_handle_wildcard(self._bgc_vlist_requested)
for p in ["PRES", "TEMP", "PSAL"]:
if p not in self._bgc_vlist_requested:
self._bgc_vlist_requested.append(p)
# Handle the 'measured' argument:
self._bgc_measured = to_list(measured)
if isinstance(measured, str):
if measured == "all":
measured = self._bgc_vlist_requested
else:
measured = to_list(measured)
elif self._bgc_measured[0] is None:
measured = []
elif self._bgc_measured[0] == "all":
measured = self._bgc_vlist_requested
elif not is_list_of_strings(self._bgc_measured):
raise ValueError("'measured' argument must be a list of strings")
# raise ValueError("'measured' argument must be a list of strings (possibly with a * wildcard)")
self._bgc_vlist_measured = [m.upper() for m in measured]
# self._bgc_vlist_measured = self._bgc_handle_wildcard(self._bgc_vlist_measured)
def __repr__(self):
summary = ["<datafetcher.erddap>"]
summary.append("Name: %s" % self.definition)
summary.append("API: %s" % self.server)
summary.append("Domain: %s" % format_oneline(self.cname()))
if self.dataset_id == "bgc":
summary.append("BGC variables: %s" % self._bgc_vlist_requested)
summary.append(
"BGC 'must be measured' variables: %s" % self._bgc_vlist_measured
)
return "\n".join(summary)
@property
def server(self):
"""URL of the Erddap server"""
return self._server
@server.setter
def server(self, value):
self._server = value
if hasattr(self, "erddap") and self.erddap.server != value:
log.debug("The erddap server has been modified, updating internal data")
self._init_erddapy()
def _add_attributes(self, this): # noqa: C901
"""Add variables attributes not return by erddap requests (csv)
This is hard coded, but should be retrieved from an API somewhere
"""
for v in this.data_vars:
param = "PRES"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "Sea Pressure",
"standard_name": "sea_water_pressure",
"units": "decibar",
"valid_min": 0.0,
"valid_max": 12000.0,
"resolution": 0.1,
"axis": "Z",
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)
for v in this.data_vars:
param = "TEMP"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE",
"standard_name": "sea_water_temperature",
"units": "degree_Celsius",
"valid_min": -2.0,
"valid_max": 40.0,
"resolution": 0.001,
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)
for v in this.data_vars:
param = "PSAL"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "PRACTICAL SALINITY",
"standard_name": "sea_water_salinity",
"units": "psu",
"valid_min": 0.0,
"valid_max": 43.0,
"resolution": 0.001,
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)
for v in this.data_vars:
param = "DOXY"
if v in [param, "%s_ADJUSTED" % param, "%s_ADJUSTED_ERROR" % param]:
this[v].attrs = {
"long_name": "Dissolved oxygen",
"standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
"units": "micromole/kg",
"valid_min": -5.0,
"valid_max": 600.0,
"resolution": 0.001,
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)
for v in this.data_vars:
if "_QC" in v:
attrs = {
"long_name": "Global quality flag of %s profile" % v,
"conventions": "Argo reference table 2a",
"casted": this[v].attrs["casted"]
if "casted" in this[v].attrs
else 0,
}
this[v].attrs = attrs
if "CYCLE_NUMBER" in this.data_vars:
this["CYCLE_NUMBER"].attrs = {
"long_name": "Float cycle number",
"conventions": "0..N, 0 : launch cycle (if exists), 1 : first complete cycle",
"casted": this["CYCLE_NUMBER"].attrs["casted"]
if "casted" in this["CYCLE_NUMBER"].attrs
else 0,
}
if "DIRECTION" in this.data_vars:
this["DIRECTION"].attrs = {
"long_name": "Direction of the station profiles",
"conventions": "A: ascending profiles, D: descending profiles",
"casted": this["DIRECTION"].attrs["casted"]
if "casted" in this["DIRECTION"].attrs
else 0,
}
if "PLATFORM_NUMBER" in this.data_vars:
this["PLATFORM_NUMBER"].attrs = {
"long_name": "Float unique identifier",
"conventions": "WMO float identifier : A9IIIII",
"casted": this["PLATFORM_NUMBER"].attrs["casted"]
if "casted" in this["PLATFORM_NUMBER"].attrs
else 0,
}
if "DATA_MODE" in this.data_vars:
this["DATA_MODE"].attrs = {
"long_name": "Delayed mode or real time data",
"conventions": "R : real time; D : delayed mode; A : real time with adjustment",
"casted": this["DATA_MODE"].attrs["casted"]
if "casted" in this["DATA_MODE"].attrs
else 0,
}
if self.dataset_id == "bgc":
for param in self._bgc_vlist_requested:
if "%s_DATA_MODE" % param in this.data_vars:
this["%s_DATA_MODE" % param].attrs = {
"long_name": "Delayed mode or real time data",
"conventions": "R : real time; D : delayed mode; A : real time with adjustment",
"casted": this["%s_DATA_MODE" % param].attrs["casted"]
if "casted" in this["%s_DATA_MODE" % param].attrs
else 0,
}
return this
def _init_erddapy(self):
# Init erddapy
self.erddap = ERDDAP(server=str(self.server), protocol="tabledap")
self.erddap.response = (
"nc" # This is a major change in v0.4, we used to work with csv files
)
if self.dataset_id == "phy":
self.erddap.dataset_id = "ArgoFloats"
elif self.dataset_id == "ref":
self.erddap.dataset_id = "ArgoFloats-ref"
elif self.dataset_id == "bgc":
self.erddap.dataset_id = "ArgoFloats-synthetic-BGC"
elif self.dataset_id == "fail":
self.erddap.dataset_id = "invalid_db"
else:
raise ValueError(
"Invalid database short name for Ifremer erddap (use: 'phy', 'bgc' or 'ref')"
)
return self
@property
def _bgc_vlist_avail(self):
"""Return the list of BGC parameters available for this access point
Apply search criteria in the index, then retrieve the list of parameters
"""
if hasattr(self, "WMO"):
if hasattr(self, "CYC") and self.CYC is not None:
self.indexfs.search_wmo_cyc(self.WMO, self.CYC)
else:
self.indexfs.search_wmo(self.WMO)
elif hasattr(self, "BOX"):
if len(self.indexBOX) == 4:
self.indexfs.search_lat_lon(self.indexBOX)
else:
self.indexfs.search_lat_lon_tim(self.indexBOX)
params = self.indexfs.read_params()
# Temporarily remove from params those missing on the erddap server:
# params = [p for p in params if p.lower() in self._bgc_vlist_erddap]
results = []
for p in params:
if p.lower() in self._bgc_vlist_erddap:
results.append(p)
else:
log.error(
"Removed '%s' because it's not available on the erddap, but it must !"
% p
)
return results
# def _bgc_handle_wildcard(self, param_list):
# """In a list, replace item with wildcard by available BGC parameter(s)"""
# is_valid_param = lambda x: x in list( # noqa: E731
# argopy.ArgoNVSReferenceTables().tbl(3)["altLabel"]
# )
# results = param_list.copy()
# for p in param_list:
# if not is_valid_param(p):
# if "*" not in p:
# raise ValueError(
# "Invalid BGC parameter '%s' (not listed in Argo reference table 3)"
# % p
# )
# else:
# match = fnmatch.filter(self._bgc_vlist_avail, p)
# if len(match) > 0:
# [results.append(m) for m in match]
# results.remove(p)
# return results
@property
def _minimal_vlist(self):
"""Return the minimal list of variables to retrieve measurements for"""
vlist = list()
if self.dataset_id == "phy":
plist = [
"data_mode",
"latitude",
"longitude",
"position_qc",
"time",
"time_qc",
"direction",
"platform_number",
"cycle_number",
"config_mission_number",
"vertical_sampling_scheme",
]
[vlist.append(p) for p in plist]
plist = ["pres", "temp", "psal"]
[vlist.append(p) for p in plist]
[vlist.append(p + "_qc") for p in plist]
[vlist.append(p + "_adjusted") for p in plist]
[vlist.append(p + "_adjusted_qc") for p in plist]
[vlist.append(p + "_adjusted_error") for p in plist]
if self.dataset_id == "bgc":
plist = [
# "parameter_data_mode", # never !!!
"latitude",
"longitude",
"position_qc",
"time",
"time_qc",
"direction",
"platform_number",
"cycle_number",
"config_mission_number",
]
[vlist.append(p) for p in plist]
# Search in the profile index the list of parameters to load:
params = self._bgc_vlist_requested
# log.debug("erddap-bgc parameters to load: %s" % params)
for p in params:
vname = p.lower()
vlist.append("%s" % vname)
vlist.append("%s_qc" % vname)
vlist.append("%s_adjusted" % vname)
vlist.append("%s_adjusted_qc" % vname)
vlist.append("%s_adjusted_error" % vname)
# vlist.append("profile_%s_qc" % vname) # not in the database
elif self.dataset_id == "ref":
plist = ["latitude", "longitude", "time", "platform_number", "cycle_number"]
[vlist.append(p) for p in plist]
plist = ["pres", "temp", "psal", "ptmp"]
[vlist.append(p) for p in plist]
return vlist
def cname(self):
"""Return a unique string defining the constraints"""
return self._cname()
@property
def cachepath(self):
"""Return path to cached file(s) for this request
Returns
-------
list(str)
"""
return [self.fs.cachepath(uri) for uri in self.uri]
def get_url(self):
"""Return the URL to download requested data
Returns
-------
str
"""
# Replace erddapy get_download_url()
# We need to replace it to better handle http responses with by-passing the _check_url_response
# https://github.com/ioos/erddapy/blob/fa1f2c15304938cd0aa132946c22b0427fd61c81/erddapy/erddapy.py#L247
# First part of the URL:
protocol = self.erddap.protocol
dataset_id = self.erddap.dataset_id
response = self.erddap.response
url = f"{self.erddap.server}/{protocol}/{dataset_id}.{response}?"
# Add variables to retrieve:
self.erddap.variables = (
self._minimal_vlist
) # Define the list of variables to retrieve
variables = self.erddap.variables
variables = ",".join(variables)
url += f"{variables}"
# Load constraints to implement the access point:
self.define_constraints() # from Fetch_box or Fetch_wmo
# Possibly add more constraints for the BGC dataset:
if self.dataset_id == "bgc":
params = self._bgc_vlist_measured
for p in params:
self.erddap.constraints.update({"%s!=" % p.lower(): "NaN"})
# Possibly add more constraints to make requests smaller:
for p in ["latitude", "longitude"]:
self.erddap.constraints.update({"%s!=" % p.lower(): "NaN"})
# for p in ['platform_number']:
# self.erddap.constraints.update({"%s!=" % p.lower(): '"NaN"'})
# Post-process all constraints:
constraints = self.erddap.constraints
_constraints = copy.copy(constraints)
for k, v in _constraints.items():
if k.startswith("time"):
_constraints.update({k: parse_dates(v)})
_constraints = quote_string_constraints(_constraints)
# Remove double-quote around NaN for numerical values:
for k, v in _constraints.items():
if v == '"NaN"':
_constraints.update({k: "NaN"})
_constraints = "".join([f"&{k}{v}" for k, v in _constraints.items()])
url += f"{_constraints}"
# Last part:
url += "&distinct()"
url += '&orderBy("time,pres")'
return url
@property
def N_POINTS(self) -> int:
"""Number of measurements expected to be returned by a request"""
url = self.get_url().replace("." + self.erddap.response, ".ncHeader")
try:
ncHeader = str(self.fs.fs.cat_file(url))
lines = [line for line in ncHeader.splitlines() if "row = " in line][0]
return int(lines.split("=")[1].split(";")[0])
except Exception:
raise ErddapServerError(
"Erddap server can't return ncHeader for this url. "
)
def post_process(self, this_ds, add_dm: bool = True, URI: list = None): # noqa: C901
"""Post-process a xarray.DataSet created from a netcdf erddap response
This method can also be applied on a regular dataset to re-enforce format compliance
"""
if "row" in this_ds.dims:
this_ds = this_ds.rename({"row": "N_POINTS"})
# Set coordinates:
coords = ("LATITUDE", "LONGITUDE", "TIME", "N_POINTS")
this_ds = this_ds.reset_coords()
this_ds["N_POINTS"] = this_ds["N_POINTS"]
# Convert all coordinate variable names to upper case
for v in this_ds.data_vars:
this_ds = this_ds.rename({v: v.upper()})
this_ds = this_ds.set_coords(coords)
if self.dataset_id == "ref":
this_ds["DIRECTION"] = xr.full_like(this_ds["CYCLE_NUMBER"], "A", dtype=str)
# Cast data types:
# log.debug("erddap.post_process WMO=%s" % to_list(np.unique(this_ds['PLATFORM_NUMBER'].values)))
this_ds = this_ds.argo.cast_types()
# log.debug("erddap.post_process WMO=%s" % to_list(np.unique(this_ds['PLATFORM_NUMBER'].values)))
# if '999' in to_list(np.unique(this_ds['PLATFORM_NUMBER'].values)):
# log.error(this_ds.attrs)
# With BGC, some points may not have a PLATFORM_NUMBER !
# So, we remove these
if self.dataset_id == "bgc" and "999" in to_list(
np.unique(this_ds["PLATFORM_NUMBER"].values)
):
log.error("Found points without WMO !")
this_ds = this_ds.where(this_ds["PLATFORM_NUMBER"] != "999", drop=True)
this_ds = this_ds.argo.cast_types(overwrite=True)
log.debug(
"erddap.post_process WMO=%s"
% to_list(np.unique(this_ds["PLATFORM_NUMBER"].values))
)
# log.debug("erddap.post_process (add_dm=%s): %s" % (add_dm, str(this_ds)))
if self.dataset_id == "bgc" and add_dm:
this_ds = self._add_parameters_data_mode_ds(this_ds)
this_ds = this_ds.argo.cast_types(overwrite=False)
# log.debug("erddap.post_process (add_dm=%s): %s" % (add_dm, str(this_ds)))
# Overwrite Erddap attributes with those from Argo standards:
this_ds = self._add_attributes(this_ds)
# In the case of a parallel download, this is a trick to preserve the chunk uri in the chunk dataset:
# (otherwise all chunks have the same list of uri)
Fetched_url = this_ds.attrs.get('Fetched_url', False)
Fetched_constraints = this_ds.attrs.get('Fetched_constraints', False)
# Finally overwrite erddap attributes with those from argopy:
this_ds.attrs = {}
if self.dataset_id == "phy":
this_ds.attrs["DATA_ID"] = "ARGO"
this_ds.attrs["DOI"] = "http://doi.org/10.17882/42182"
elif self.dataset_id == "ref":
this_ds.attrs["DATA_ID"] = "ARGO_Reference"
this_ds.attrs["DOI"] = "-"
elif self.dataset_id == "bgc":
this_ds.attrs["DATA_ID"] = "ARGO-BGC"
this_ds.attrs["DOI"] = "http://doi.org/10.17882/42182"
this_ds.attrs["Fetched_from"] = self.erddap.server
try:
this_ds.attrs["Fetched_by"] = getpass.getuser()
except:
this_ds.attrs["Fetched_by"] = 'anonymous'
this_ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime(
"%Y/%m/%d"
)
this_ds.attrs["Fetched_constraints"] = self.cname() if not Fetched_constraints else Fetched_constraints
this_ds.attrs["Fetched_uri"] = URI if not Fetched_url else Fetched_url
this_ds = this_ds[np.sort(this_ds.data_vars)]
if self.dataset_id == "bgc":
n_zero = np.count_nonzero(np.isnan(np.unique(this_ds["PLATFORM_NUMBER"])))
if n_zero > 0:
log.error("Some points (%i) have no PLATFORM_NUMBER !" % n_zero)
return this_ds
def to_xarray( # noqa: C901
self,
errors: str = "ignore",
add_dm: bool = None,
concat: bool = True,
max_workers: int = 6,
):
"""Load Argo data and return a xarray.DataSet"""
URI = self.uri # Call it once
# Should we compute (from the index) and add DATA_MODE for BGC variables:
add_dm = self.dataset_id == "bgc" if add_dm is None else bool(add_dm)
# Download data
if not self.parallel:
if len(URI) == 1:
try:
# log_argopy_callerstack()
results = self.fs.open_dataset(URI[0])
results = self.post_process(results, add_dm=add_dm, URI=URI)
except ClientResponseError as e:
if "Proxy Error" in e.message:
raise ErddapServerError(
"Your request is probably taking longer than expected to be handled by the server. "
"You can try to relaunch you're request or use the 'parallel' option "
"to chunk it into small requests."
)
log.debug(str(e))
else:
raise ErddapServerError(e.message)
else:
try:
results = self.fs.open_mfdataset(
URI,
method="erddap",
max_workers=1,
progress=self.progress,
errors=errors,
concat=concat,
concat_dim="N_POINTS",
preprocess=self.post_process,
preprocess_opts={"add_dm": False, "URI": URI},
final_opts={"data_vars": "all"},
)
if results is not None:
if self.progress:
print("Final post-processing of the merged dataset () ...")
results = self.post_process(
results, **{"add_dm": add_dm, "URI": URI}
)
except ClientResponseError as e:
raise ErddapServerError(e.message)
else:
try:
results = self.fs.open_mfdataset(
URI,
method="erddap",
progress=self.progress,
max_workers=max_workers,
errors=errors,
concat=concat,
concat_dim="N_POINTS",
preprocess=self.post_process,
preprocess_opts={"add_dm": False, "URI": URI},
final_opts={"data_vars": "all"},
)
if concat:
if results is not None:
if self.progress:
print("Final post-processing of the merged dataset () ...")
results = self.post_process(
results, **{"add_dm": add_dm, "URI": URI}
)
except DataNotFound:
if self.dataset_id == "bgc" and len(self._bgc_vlist_measured) > 0:
msg = (
"Your BGC request returned no data. This may be due to the 'measured' "
"argument that imposes constraints impossible to fulfill for the "
f"access point defined ({self.cname()}), i.e. some "
"variables in 'measured' are not available in some floats in this "
"access point."
)
raise DataNotFound(msg)
else:
raise
except ClientResponseError as e:
raise ErddapServerError(e.message)
if concat:
results = self.filter_points(results)
# Final checks
if self.dataset_id == "bgc" and concat and len(self._bgc_vlist_measured) > 0:
empty = []
for v in self._bgc_vlist_measured:
if np.count_nonzero(results[v]) != len(results["N_POINTS"]):
empty.append(v)
if len(empty) > 0:
msg = (
"After processing, your BGC request still return final data with NaNs (%s). "
"This may be due to the 'measured' argument ('%s') that imposes a no-NaN constraint "
"impossible to fulfill for the access point defined (%s)]. "
"\nUsing the 'measured' argument, you can try to minimize the list of variables to "
"return without NaNs, or set it to 'None' to return all samples."
% (",".join(to_list(v)), ",".join(self._bgc_measured), self.cname())
)
raise ValueError(msg)
return results
def filter_data_mode(self, ds: xr.Dataset, **kwargs):
"""Apply xarray argo accessor filter_data_mode method"""
ds = ds.argo.filter_data_mode(errors="ignore", **kwargs)
if ds.argo._type == "point":
ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
return ds
def filter_qc(self, ds: xr.Dataset, **kwargs):
"""Apply xarray argo accessor filter_qc method"""
ds = ds.argo.filter_qc(**kwargs)
if ds.argo._type == "point":
ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
return ds
def filter_researchmode(self, ds: xr.Dataset, *args, **kwargs) -> xr.Dataset:
"""Filter dataset for research user mode
This filter will select only QC=1 delayed mode data with pressure errors smaller than 20db
Use this filter instead of filter_data_mode and filter_qc
"""
ds = ds.argo.filter_researchmode()
if ds.argo._type == "point":
ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
return ds
def filter_points(self, ds):
"""Enforce request criteria
Sometimes, erddap data postprocessing may modify the content wrt user requirements. So we need
to adjust for this.
"""
# Enforce the 'measured' argument for BGC:
if self.dataset_id == "bgc":
if len(self._bgc_vlist_measured) == 0:
return ds
else:
this_mask = xr.DataArray(
np.zeros_like(ds["N_POINTS"]),
dims=["N_POINTS"],
coords={"N_POINTS": ds["N_POINTS"]},
)
log.debug("Keep samples without NaN in %s" % self._bgc_vlist_measured)
for v in self._bgc_vlist_measured:
this_mask += ds[v].notnull()
this_mask = this_mask == len(self._bgc_vlist_measured)
ds = ds.argo._where(this_mask, drop=True)
ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
return ds
def _add_parameters_data_mode_ds(self, this_ds): # noqa: C901
"""Compute and add <PARAM>_DATA_MODE variables to a xarray dataset
This requires an ArgoIndex instance as Pandas Dataframe
todo: Code this for the pyarrow index backend
This method consume a collection of points
"""
# import time
def list_WMO_CYC(this_ds):
"""Given a dataset, return a list with all possible (PLATFORM_NUMBER, CYCLE_NUMBER) tuple"""
profiles = []
for wmo, grp in this_ds.groupby("PLATFORM_NUMBER"):
[
profiles.append((int(wmo), int(cyc)))
for cyc in np.unique(grp["CYCLE_NUMBER"])
]
return profiles
def list_WMO(this_ds):
"""Return all possible WMO as a list"""
return to_list(np.unique(this_ds["PLATFORM_NUMBER"].values))
def complete_df(this_df, params):
"""Add 'wmo', 'cyc' and '<param>_data_mode' columns to this dataframe"""
this_df["wmo"] = this_df["file"].apply(lambda x: int(x.split("/")[1]))
this_df["cyc"] = this_df["file"].apply(
lambda x: int(x.split("_")[-1].split(".nc")[0].replace("D", ""))
)
this_df["variables"] = df["parameters"].apply(lambda x: x.split())
for param in params:
df["%s_data_mode" % param] = this_df.apply(
lambda x: x["parameter_data_mode"][x["variables"].index(param)]
if param in x["variables"]
else "",
axis=1,
)
return this_df
def read_DM(this_df, wmo, cyc, param):
"""Return one parameter data mode for a given wmo/cyc and index dataframe"""
filt = []
filt.append(this_df["wmo"].isin([wmo]))
filt.append(this_df["cyc"].isin([cyc]))
sub_df = df[np.logical_and.reduce(filt)]
if sub_df.shape[0] == 0:
log.debug(
"Found a profile in the dataset, but not in the index ! wmo=%i, cyc=%i"
% (wmo, cyc)
)
# This can happen if a Synthetic netcdf file was generated from a non-BGC float.
# The file exists, but it doesn't have BGC variables. Float is usually not listed in the index.
return ""
else:
return sub_df["%s_data_mode" % param].values[-1]
def print_etime(txt, t0):
now = time.process_time()
print("⏰ %s: %0.2f seconds" % (txt, now - t0))
return now
# timer = time.process_time()
profiles = list_WMO_CYC(this_ds)
params = [
p
for p in self.indexfs.search_wmo(list_WMO(this_ds)).read_params()
if p in this_ds
]
# timer = print_etime('Read profiles and params from ds', timer)
df = self.indexfs.search_wmo(list_WMO(this_ds)).to_dataframe(completed=False)
df = complete_df(df, params)
# timer = print_etime('Index search wmo and export to dataframe', timer)
CYCLE_NUMBER = this_ds["CYCLE_NUMBER"].values
PLATFORM_NUMBER = this_ds["PLATFORM_NUMBER"].values
N_POINTS = this_ds["N_POINTS"].values
for param in params:
# print("=" * 50)
# print("Filling DATA MODE for %s ..." % param)
# tims = {'init': 0, 'read_DM': 0, 'isin': 0, 'where': 0, 'fill': 0}
for iprof, prof in enumerate(profiles):
wmo, cyc = prof
# t0 = time.process_time()
if "%s_DATA_MODE" % param not in this_ds:
this_ds["%s_DATA_MODE" % param] = xr.full_like(
this_ds["CYCLE_NUMBER"], dtype=str, fill_value=""
)
# now = time.process_time()
# tims['init'] += now - t0
# t0 = now
param_data_mode = read_DM(df, wmo, cyc, param)
# now = time.process_time()
# tims['read_DM'] += now - t0
# t0 = now
i_cyc = CYCLE_NUMBER == cyc
i_wmo = PLATFORM_NUMBER == wmo
# now = time.process_time()
# tims['isin'] += now - t0
# t0 = now
i_points = N_POINTS[np.logical_and(i_cyc, i_wmo)]
# now = time.process_time()
# tims['where'] += now - t0
# t0 = now
this_ds["%s_DATA_MODE" % param][i_points] = param_data_mode
# now = time.process_time()
# tims['fill'] += now - t0
this_ds["%s_DATA_MODE" % param] = this_ds["%s_DATA_MODE" % param].astype(
"<U1"
)
# timer = print_etime('Processed %s (%i profiles)' % (param, len(profiles)), timer)
return this_ds
[docs]class Fetch_wmo(ErddapArgoDataFetcher):
"""Manage access to Argo data through Ifremer ERDDAP for: a list of WMOs
This class is instantiated when a call is made to these facade access points:
- `ArgoDataFetcher(src='erddap').float(**)`
- `ArgoDataFetcher(src='erddap').profile(**)`
"""
def init(self, WMO=[], CYC=None, **kw):
"""Create Argo data loader for WMOs
Parameters
----------
WMO : list(int)
The list of WMOs to load all Argo data for.
CYC : int, np.array(int), list(int)
The cycle numbers to load.
"""
self.WMO = WMO
self.CYC = CYC
self.definition = "?"
if self.dataset_id == "phy":
self.definition = "Ifremer erddap Argo data fetcher"
elif self.dataset_id == "bgc":
self.definition = "Ifremer erddap Argo BGC data fetcher"
elif self.dataset_id == "ref":
self.definition = "Ifremer erddap Argo REFERENCE data fetcher"
if self.CYC is not None:
self.definition = "%s for profiles" % self.definition
else:
self.definition = "%s for floats" % self.definition
return self
def define_constraints(self):
"""Define erddap constraints"""
self.erddap.constraints = {
"platform_number=~": "|".join(["%i" % i for i in self.WMO])
}
if self.CYC is not None:
self.erddap.constraints.update(
{"cycle_number=~": "|".join(["%i" % i for i in self.CYC])}
)
return self
@property
def uri(self):
"""List of URLs to load for a request
Returns
-------
list(str)
"""
if not self.parallel:
chunks = "auto"
chunks_maxsize = {"wmo": 5}
if self.dataset_id == "bgc":
chunks_maxsize = {"wmo": 1}
else:
chunks = self.chunks
chunks_maxsize = self.chunks_maxsize
if self.dataset_id == "bgc":
chunks_maxsize["wmo"] = 1
self.Chunker = Chunker(
{"wmo": self.WMO}, chunks=chunks, chunksize=chunks_maxsize
)
wmo_grps = self.Chunker.fit_transform()
urls = []
opts = {
"ds": self.dataset_id,
"fs": self.fs,
"server": self.server,
"parallel": False,
"CYC": self.CYC,
}
if self.dataset_id == "bgc":
opts["params"] = self._bgc_params
opts["measured"] = self._bgc_measured
opts["indexfs"] = self.indexfs
for wmos in wmo_grps:
urls.append(
Fetch_wmo(
WMO=wmos,
**opts,
).get_url()
)
return urls
[docs]class Fetch_box(ErddapArgoDataFetcher):
"""Manage access to Argo data through Ifremer ERDDAP for: an ocean rectangle"""
def init(self, box: list, **kw):
"""Create Argo data loader
Parameters
----------
box : list(float, float, float, float, float, float, str, str)
The box domain to load all Argo data for:
box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max]
or:
box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max]
"""
self.BOX = box.copy()
self.indexBOX = [self.BOX[ii] for ii in [0, 1, 2, 3]]
if len(self.BOX) == 8:
self.indexBOX = [self.BOX[ii] for ii in [0, 1, 2, 3, 6, 7]]
if self.dataset_id == "phy":
self.definition = "Ifremer erddap Argo data fetcher for a space/time region"
elif self.dataset_id == "bgc":
self.definition = (
"Ifremer erddap Argo BGC data fetcher for a space/time region"
)
elif self.dataset_id == "ref":
self.definition = (
"Ifremer erddap Argo REFERENCE data fetcher for a space/time region"
)
return self
def define_constraints(self):
"""Define request constraints"""
self.erddap.constraints = {"longitude>=": self.BOX[0]}
self.erddap.constraints.update({"longitude<=": self.BOX[1]})
self.erddap.constraints.update({"latitude>=": self.BOX[2]})
self.erddap.constraints.update({"latitude<=": self.BOX[3]})
self.erddap.constraints.update({"pres>=": self.BOX[4]})
self.erddap.constraints.update({"pres<=": self.BOX[5]})
if len(self.BOX) == 8:
self.erddap.constraints.update({"time>=": self.BOX[6]})
self.erddap.constraints.update({"time<=": self.BOX[7]})
return None
@property
def uri(self):
"""List of files to load for a request
Returns
-------
list(str)
"""
if not self.parallel:
return [self.get_url()]
else:
self.Chunker = Chunker(
{"box": self.BOX}, chunks=self.chunks, chunksize=self.chunks_maxsize
)
boxes = self.Chunker.fit_transform()
urls = []
opts = {"ds": self.dataset_id, "fs": self.fs, "server": self.server}
if self.dataset_id == "bgc":
opts["params"] = self._bgc_params
opts["measured"] = self._bgc_measured
opts["indexfs"] = self.indexfs
for box in boxes:
try:
fb = Fetch_box(
box=box,
**opts,
)
urls.append(fb.get_url())
except DataNotFound:
log.debug("This box fetcher will contain no data")
return urls