"""
Fetcher to retrieve CTD reference data from Ifremer erddap
"""
import xarray as xr
import logging
from ..options import OPTIONS
from ..utils.chunking import Chunker
from ..utils.geo import conv_lon
from ..stores import httpstore_erddap_auth
from .erddap_data import ErddapArgoDataFetcher
# Load erddapy according to available version (breaking changes in v0.8.0)
try:
from erddapy import ERDDAP
from erddapy.utilities import parse_dates, quote_string_constraints
except: # noqa: E722
# >= v0.8.0
from erddapy.erddapy import ERDDAP # noqa: F401
from erddapy.erddapy import _quote_string_constraints as quote_string_constraints # noqa: F401
from erddapy.erddapy import parse_dates # noqa: F401
# Soon ! https://github.com/ioos/erddapy
log = logging.getLogger("argopy.erddap.refdata")
access_points = ["box"]
exit_formats = ["xarray"]
dataset_ids = ["ref-ctd"] # First is default
api_server = OPTIONS["erddap"] # API root url
api_server_check = (
OPTIONS["erddap"] + "/info/ArgoFloats/index.json"
) # URL to check if the API is alive
[docs]class ErddapREFDataFetcher(ErddapArgoDataFetcher):
"""Manage access to Argo CTD-reference data through Ifremer ERDDAP"""
# @doc_inherit
[docs] def __init__(self, **kwargs):
"""Instantiate an authenticated ERDDAP Argo data fetcher
Parameters
----------
cache: bool (optional)
Cache data or not (default: False)
cachedir: str (optional)
Path to cache folder
api_timeout: int (optional)
Erddap request time out in seconds. Set to OPTIONS['api_timeout'] by default.
"""
kwargs["ds"] = "ref-ctd"
super().__init__(**kwargs)
kw = kwargs
[
kw.pop(p)
for p in [
"ds",
"cache",
"cachedir",
"parallel",
"parallel_method",
"progress",
"chunks",
"chunks_maxsize",
"api_timeout",
"box",
]
if p in kw
]
login_page = "%s/login.html" % self.server.rstrip("/")
self.fs = httpstore_erddap_auth(
login=login_page, auto=False, **{**kw, **self.store_opts}
)
def __repr__(self):
summary = [super().__repr__()]
summary.append(
"Performances: cache=%s, parallel=%s"
% (str(self.fs.cache), str(self.parallel_method))
)
summary.append("User mode: %s" % "expert")
summary.append("Dataset: %s" % self.dataset_id)
return "\n".join(summary)
def _add_attributes(self, this): # noqa: C901
"""Add variables attributes not return by erddap requests
This is hard coded, but should be retrieved from an API somewhere
"""
this = super()._add_attributes(this)
if "DIRECTION" in this.data_vars:
this["DIRECTION"].attrs[
"comment"
] = "Set to 'A' for all CTD stations by default"
if "PLATFORM_NUMBER" in this.data_vars:
this["PLATFORM_NUMBER"].attrs["long_name"] = "Fake unique identifier"
this["PLATFORM_NUMBER"].attrs[
"comment"
] = "This was inferred from EXPOCODE and is not a real WMO"
if "CYCLE_NUMBER" in this.data_vars:
this["CYCLE_NUMBER"].attrs["long_name"] = "Station number"
this["CYCLE_NUMBER"].attrs[
"comment"
] = "This was computed using unique TIME for each EXPOCODE"
this["CYCLE_NUMBER"].attrs["convention"] = "-"
return this
def _init_erddapy(self):
# Init erddapy
self.erddap = ERDDAP(server=str(self.server), protocol="tabledap")
self.erddap.response = "nc"
self.erddap.dataset_id = "Argo-ref-ctd"
return self
@property
def _minimal_vlist(self):
"""Return the minimal list of variables to retrieve measurements for"""
# vlist = super()._minimal_vlist
vlist = list()
plist = ["latitude", "longitude", "time"]
[vlist.append(p) for p in plist]
plist = ["pres", "temp", "psal", "ptmp", "source", "qclevel"]
[vlist.append(p) for p in plist]
return vlist
def to_xarray(self, errors: str = "ignore"): # noqa: C901
"""Load CTD-Reference data and return a xarray.DataSet"""
ds = super().to_xarray(errors=errors)
ds = ds.rename({"SOURCE": "EXPOCODE"})
ds["DIRECTION"] = xr.full_like(ds["EXPOCODE"], "A", dtype=str)
g = []
for iplatform, grp in enumerate(ds.groupby("EXPOCODE")):
code, this_ds = grp
for istation, sub_grp in enumerate(this_ds.groupby("TIME")):
sub_grp[-1]["CYCLE_NUMBER"] = xr.full_like(
sub_grp[-1]["TIME"], istation, int
)
sub_grp[-1]["PLATFORM_NUMBER"] = xr.full_like(
sub_grp[-1]["TIME"], iplatform + 900000, int
)
g.append(sub_grp[-1])
ds = xr.concat(
g, dim="N_POINTS", data_vars="minimal", coords="minimal", compat="override"
)
ds.attrs["DATA_ID"] = "ARGO_Reference_CTD"
ds.attrs["DOI"] = "-"
# Cast data types and add variable attributes (not available in the csv download):
ds = self._add_attributes(ds)
ds = ds.argo.cast_types()
return ds
[docs]class Fetch_box(ErddapREFDataFetcher):
"""Manage access to Argo CTD-reference data through Ifremer ERDDAP for: an ocean rectangle"""
def init(self, box: list, **kw):
"""Create Argo data loader
Parameters
----------
box : list(float, float, float, float, float, float, str, str)
The box domain to load all Argo data for:
box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max]
or:
box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max]
"""
self.BOX = box.copy()
self.definition = (
"Ifremer erddap Argo CTD-REFERENCE data fetcher for a space/time region"
)
return self
def define_constraints(self):
"""Define request constraints"""
self.erddap.constraints = {"longitude>=": conv_lon(self.BOX[0], conv='360')}
self.erddap.constraints.update({"longitude<=": conv_lon(self.BOX[1], conv='360')})
self.erddap.constraints.update({"latitude>=": self.BOX[2]})
self.erddap.constraints.update({"latitude<=": self.BOX[3]})
self.erddap.constraints.update({"pres>=": self.BOX[4]})
self.erddap.constraints.update({"pres<=": self.BOX[5]})
if len(self.BOX) == 8:
self.erddap.constraints.update({"time>=": self.BOX[6]})
self.erddap.constraints.update({"time<=": self.BOX[7]})
return None
@property
def uri(self):
"""List of files to load for a request
Returns
-------
list(str)
"""
if not self.parallel:
return [self.get_url()]
else:
self.Chunker = Chunker(
{"box": self.BOX}, chunks=self.chunks, chunksize=self.chunks_maxsize
)
boxes = self.Chunker.fit_transform()
urls = []
for box in boxes:
urls.append(
Fetch_box(
box=box, ds=self.dataset_id, fs=self.fs, server=self.server
).get_url()
)
return urls