#!/bin/env python
# -*coding: UTF-8 -*-
"""
High level helper methods to load Argo data from any source.
The facade should be able to work with all available data access points.
Validation of access point parameters (eg: wmo) is done here, not at the data/index source fetcher level.
"""
import warnings
import xarray as xr
import pandas as pd
import logging
from argopy.options import OPTIONS, _VALIDATORS
from .errors import InvalidFetcherAccessPoint, InvalidFetcher
from .utilities import list_available_data_src, list_available_index_src, is_box, is_indexbox, check_wmo
from .plotters import plot_trajectory, bar_plot
# Registry of data source fetchers: ArgoDataFetcher looks its ``src`` name up in here
AVAILABLE_DATA_SOURCES = list_available_data_src()
# Registry of index source fetchers: ArgoIndexFetcher looks its ``src`` name up in here
AVAILABLE_INDEX_SOURCES = list_available_index_src()
# Module-level logger of the fetcher facade
log = logging.getLogger("argopy.fetchers.facade")
def checkAccessPoint(AccessPoint):
    """ Decorator to validate fetcher access points of a given data source

    This decorator will check if an access point (eg: 'profile') is available for the data source (eg: 'erddap')
    used to initiate the checker. If not, an error is raised.

    Parameters
    ----------
    AccessPoint: callable
        Facade method to protect. Its ``__name__`` is the access point name looked for in the
        ``valid_access_points`` attribute of the facade instance (first positional argument of the call).

    Returns
    -------
    callable
        Wrapped access point, forwarding positional and keyword arguments unchanged.

    Raises
    ------
    InvalidFetcherAccessPoint
        When called, if the access point is not listed in ``valid_access_points`` of the facade instance.
    """
    # Local import: keeps this block self-contained without touching the file-level imports
    from functools import wraps

    @wraps(AccessPoint)  # Preserve name and docstring of the decorated access point
    def wrapper(*args, **kwargs):
        facade = args[0]  # The facade instance (``self`` of the decorated method)
        if AccessPoint.__name__ not in facade.valid_access_points:
            raise InvalidFetcherAccessPoint(
                "'%s' not available with '%s' src. Available access point(s): %s" %
                (AccessPoint.__name__, facade._src, ", ".join(facade.Fetchers.keys()))
            )
        # Keyword arguments must be forwarded, otherwise access points taking **kw can never receive them
        return AccessPoint(*args, **kwargs)

    return wrapper
class ArgoDataFetcher:
    """ Fetcher and post-processor of Argo data (API facade)

    An instance is first bound to a data source (``src``), a dataset (``ds``) and a user mode (``mode``).
    One of the access point methods (``float``, ``profile`` or ``region``) then instantiates the
    data source fetcher, and ``load``, ``to_xarray``, ``to_dataframe`` or ``to_index`` retrieve the data.
    """

    def __init__(self, mode: str = "", src: str = "", ds: str = "", **fetcher_kwargs):
        """ Create a fetcher instance

        Parameters
        ----------
        mode: str, optional
            User mode. Eg: ``standard`` or ``expert``. Set to OPTIONS['mode'] by default if empty.
        src: str, optional
            Source of the data to use. Eg: ``erddap``. Set to OPTIONS['src'] by default if empty.
        ds: str, optional
            Name of the dataset to load. Eg: ``phy``. Set to OPTIONS['dataset'] by default if empty.
        **fetcher_kwargs: optional
            Additional arguments passed on data source instance creation of each access points.

        Returns
        -------
        :class:`argopy.fetchers.ArgoDataFetcher`

        Raises
        ------
        InvalidFetcher
            If ``src`` is not one of the available data sources.
        ValueError
            If the dataset is not listed in the ``dataset_ids`` of the selected data source.
        """
        # Facade options:
        self._mode = OPTIONS["mode"] if mode == "" else mode
        self._dataset_id = OPTIONS["dataset"] if ds == "" else ds
        self._src = OPTIONS["src"] if src == "" else src

        _VALIDATORS["mode"](self._mode)
        _VALIDATORS["src"](self._src)
        _VALIDATORS["dataset"](self._dataset_id)

        # Load data source access points:
        if self._src not in AVAILABLE_DATA_SOURCES:
            raise InvalidFetcher(
                "Requested data fetcher '%s' not available ! Please try again with any of: %s"
                % (self._src, "\n".join(AVAILABLE_DATA_SOURCES))
            )
        Fetchers = AVAILABLE_DATA_SOURCES[self._src]

        # Auto-discovery of access points for this fetcher:
        # rq: Access point names for the facade are not the same as the access point of fetchers
        self.Fetchers = {}
        self.valid_access_points = []
        for p in Fetchers.access_points:
            if p == "box":  # Required for 'region'
                self.Fetchers["region"] = Fetchers.Fetch_box
                self.valid_access_points.append("region")
            if p == "wmo":  # Required for 'profile' and 'float'
                self.Fetchers["float"] = Fetchers.Fetch_wmo
                self.valid_access_points.append("float")
                self.Fetchers["profile"] = Fetchers.Fetch_wmo
                self.valid_access_points.append("profile")

        # Init sub-methods:
        self.fetcher = None
        if self._dataset_id not in Fetchers.dataset_ids:
            raise ValueError(
                "%s dataset is not available for this data source (%s)"
                % (self._dataset_id, self._src)
            )
        self.fetcher_kwargs = {**fetcher_kwargs}
        self.fetcher_options = {**{"ds": self._dataset_id}, **fetcher_kwargs}
        self.postproccessor = self.__empty_processor
        self._AccessPoint = None
        self._AccessPoint_data = None

        # Init data structure holders:
        self._index = None
        self._data = None
        # Loading state is set explicitly rather than relying on the __getattr__ fallback:
        self._loaded = False
        self._request = ""

        # Dev warnings
        # Todo Clean-up before each release
        if self._dataset_id == "bgc" and self._mode == "standard":
            warnings.warn(
                "'BGC' dataset fetching in 'standard' user mode is not reliable. "
                "Try to switch to 'expert' mode if you encounter errors."
            )

    def __repr__(self):
        """ Text summary of the fetcher definition

        This string is also used by ``load`` and ``to_index`` to detect a change in the fetcher definition.
        """
        if self.fetcher:
            summary = [self.fetcher.__repr__()]
        else:
            summary = ["<datafetcher.%s> 'No access point initialised'" % self._src]
            summary.append("Available access points: %s" % ", ".join(self.Fetchers.keys()))

        if "parallel" in self.fetcher_options:
            summary.append(
                "Backend: %s (parallel=%s)"
                % (self._src, str(self.fetcher_options["parallel"]))
            )
        else:
            summary.append("Backend: %s" % self._src)

        summary.append("User mode: %s" % self._mode)
        summary.append("Dataset: %s" % self._dataset_id)
        return "\n".join(summary)

    def __empty_processor(self, xds):
        """ Do nothing to a dataset """
        return xds

    def _standard_postprocessing(self, xds):
        """ Post-processing chain installed by access points in 'standard' user mode

        Applies the ``filter_data_mode``, ``filter_qc`` and ``filter_variables`` methods of the current
        data source fetcher, in that order. The fetcher is resolved at call time.
        """
        xds = self.fetcher.filter_data_mode(xds)
        xds = self.fetcher.filter_qc(xds)
        xds = self.fetcher.filter_variables(xds, self._mode)
        return xds

    def __getattr__(self, key):
        """ Validate access points

        Only called when the regular attribute lookup fails. A small set of known attribute names and the
        valid access point names resolve to None, any other name raises.

        Raises
        ------
        AttributeError
            For special (dunder) names, so that protocol probes (eg: copy, pickle, hasattr) behave normally.
        InvalidFetcherAccessPoint
            For any other unknown name.
        """
        if key.startswith("__") and key.endswith("__"):
            raise AttributeError(key)

        valid_attrs = [
            "Fetchers",
            "fetcher",
            "fetcher_options",
            "postproccessor",
            "data",
            "index",
            "_loaded",
            "_request"
        ]
        # Read from the instance dict: going through self.valid_access_points would re-enter
        # __getattr__ endlessly on an instance where __init__ has not run (eg: while being copied)
        access_points = self.__dict__.get("valid_access_points", ())
        if key not in access_points and key not in valid_attrs:
            raise InvalidFetcherAccessPoint("'%s' is not a valid access point" % key)
        return None

    @property
    def uri(self):
        """ List of resources to load for a request

        This can be a list of paths or urls, depending on the data source selected.

        Returns
        -------
        list(str)

        Raises
        ------
        InvalidFetcherAccessPoint
            If no access point has been initialised yet.
        """
        if self.fetcher:
            return self.fetcher.uri
        else:
            raise InvalidFetcherAccessPoint(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )

    @property
    def data(self):
        """ Data structure

        Data are loaded on first access if they are not in memory yet.

        Returns
        --------
        :class:`xarray.Dataset`
        """
        if not isinstance(self._data, xr.Dataset):
            self.load()
        return self._data

    @property
    def index(self):
        """ Index structure, as returned by the to_index method

        Data are loaded on first access if the index is not in memory yet.

        Returns
        --------
        :class:`pandas.DataFrame`
        """
        if not isinstance(self._index, pd.DataFrame):
            self.load()
        return self._index

    def dashboard(self, **kw):
        """ Return the dashboard of the current data source fetcher

        This is a best-effort call: any error (including no access point being initialised) is turned
        into a warning and None is returned.
        """
        try:
            return self.fetcher.dashboard(**kw)
        except Exception:
            warnings.warn(
                "dashboard not available for this fetcher access point (%s/%s)"
                % (self._src, self._AccessPoint)
            )

    @checkAccessPoint
    def float(self, wmo, **kw):
        """ Float data fetcher

        Parameters
        ----------
        wmo: int, list(int)
            Define the list of Argo floats to load data for. This is a list of integers with WMO float identifiers.
            WMO is the World Meteorological Organization.

        Returns
        -------
        :class:`argopy.fetchers.ArgoDataFetcher.float`
            A data source fetcher for all float profiles

        Raises
        ------
        TypeError
            If a ``cyc`` or ``CYC`` keyword argument is given: use the ``profile`` access point instead.
        """
        wmo = check_wmo(wmo)  # Check and return a valid list of WMOs
        if "CYC" in kw or "cyc" in kw:
            raise TypeError(
                "float() got an unexpected keyword argument 'cyc'. Use 'profile' access "
                "point to fetch specific profile data."
            )
        self.fetcher = self.Fetchers["float"](WMO=wmo, **self.fetcher_options)
        self._AccessPoint = "float"  # Register the requested access point
        self._AccessPoint_data = {'wmo': wmo}  # Register the requested access point data

        if self._mode == "standard" and self._dataset_id != "ref":
            self.postproccessor = self._standard_postprocessing

        return self

    @checkAccessPoint
    def profile(self, wmo, cyc):
        """ Profile data fetcher

        Parameters
        ----------
        wmo: int, list(int)
            Define the list of Argo floats to load data for. This is a list of integers with WMO float identifiers.
            WMO is the World Meteorological Organization.
        cyc: list(int)
            Define the list of cycle numbers to load for each Argo floats listed in ``wmo``.

        Returns
        -------
        :class:`argopy.fetchers.ArgoDataFetcher.profile`
            A data source fetcher for specific float profiles
        """
        wmo = check_wmo(wmo)  # Check and return a valid list of WMOs
        self.fetcher = self.Fetchers["profile"](WMO=wmo, CYC=cyc, **self.fetcher_options)
        self._AccessPoint = "profile"  # Register the requested access point
        self._AccessPoint_data = {'wmo': wmo, 'cyc': cyc}  # Register the requested access point data

        if self._mode == "standard" and self._dataset_id != "ref":
            self.postproccessor = self._standard_postprocessing

        return self

    @checkAccessPoint
    def region(self, box: list):
        """ Space/time domain data fetcher

        Parameters
        ----------
        box: list()
            Define the domain to load Argo data for. The box list is made of:
                - lon_min: float, lon_max: float,
                - lat_min: float, lat_max: float,
                - dpt_min: float, dpt_max: float,
                - date_min: str (optional), date_max: str (optional)

            Longitude, latitude and pressure bounds are required, while the two bounding dates are optional.
            If bounding dates are not specified, the entire time series is fetched.
            Eg: [-60, -55, 40., 45., 0., 10., '2007-08-01', '2007-09-01']

        Returns
        -------
        :class:`argopy.fetchers.ArgoDataFetcher`
            A data source fetcher for a space/time domain
        """
        is_box(box, errors="raise")  # Validate the box definition
        self.fetcher = self.Fetchers["region"](box=box, **self.fetcher_options)
        self._AccessPoint = "region"  # Register the requested access point
        self._AccessPoint_data = {'box': box}  # Register the requested access point data

        if self._mode == "standard" and self._dataset_id != "ref":
            self.postproccessor = self._standard_postprocessing

        return self

    def to_xarray(self, **kwargs):
        """ Fetch and return data as xarray.Dataset

        Keyword arguments are passed to the ``to_xarray`` method of the data source fetcher. The result
        goes through the registered post-processor before being returned.

        Returns
        -------
        :class:`xarray.Dataset`

        Raises
        ------
        InvalidFetcher
            If no access point has been initialised yet.
        """
        if not self.fetcher:
            raise InvalidFetcher(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        xds = self.fetcher.to_xarray(**kwargs)
        xds = self.postproccessor(xds)
        return xds

    def to_dataframe(self, **kwargs):
        """ Fetch and return data as pandas.DataFrame

        Data are loaded in memory first, keyword arguments are passed to the ``to_dataframe`` method
        of the loaded dataset.

        Returns
        -------
        :class:`pandas.DataFrame`

        Raises
        ------
        InvalidFetcher
            If no access point has been initialised yet.
        """
        if not self.fetcher:
            raise InvalidFetcher(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        return self.load().data.to_dataframe(**kwargs)

    def to_index(self, full: bool = False):
        """ Create an index of Argo data

        Parameters
        ----------
        full: bool
            Should extract a full index, as returned by an IndexFetcher or only a space/time
            index of fetched profiles (this is the default choice, i.e. full=False).

        Returns
        -------
        :class:`pandas.DataFrame`
        """
        if not full:
            self.load()
            ds = self.data.argo.point2profile()
            # Keep only the platform number among data variables, then drop the vertical dimension:
            df = (
                ds.drop_vars(set(ds.data_vars) - {"PLATFORM_NUMBER"})
                .drop_dims("N_LEVELS")
                .to_dataframe()
            )
            df = (
                df.reset_index()
                .rename(
                    columns={
                        "PLATFORM_NUMBER": "wmo",
                        "LONGITUDE": "longitude",
                        "LATITUDE": "latitude",
                        "TIME": "date",
                    }
                )
                .drop(columns="N_PROF")
            )
            df = df[["date", "latitude", "longitude", "wmo"]]

        else:
            # Instantiate and load an IndexFetcher:
            index_loader = ArgoIndexFetcher(mode=self._mode,
                                            src=self._src,
                                            ds=self._dataset_id,
                                            **self.fetcher_kwargs)
            if self._AccessPoint == 'float':
                index_loader.float(self._AccessPoint_data['wmo']).load()
            if self._AccessPoint == 'profile':
                index_loader.profile(self._AccessPoint_data['wmo'], self._AccessPoint_data['cyc']).load()
            if self._AccessPoint == 'region':
                # Convert data box to index box (remove depth info):
                index_box = self._AccessPoint_data['box'].copy()
                del index_box[4:6]
                index_loader.region(index_box).load()
            df = index_loader.index

            if self._loaded and self._mode == 'standard' and len(self._index) != len(df):
                warnings.warn("Loading a full index in 'standard' user mode may lead to more profiles in the "
                              "index than reported in data.")

            # Possibly replace the light index with the full version:
            if not self._loaded or self._request == self.__repr__():
                self._index = df

        return df

    def load(self, force: bool = False, **kwargs):
        """ Load data in memory

        Apply the default to_xarray() and to_index() methods and store results in memory.
        Access loaded measurements structure with the `data` and `index` properties::

            ds = ArgoDataFetcher().profile(6902746, 34).load().data
            # or
            df = ArgoDataFetcher().float(6902746).load().index

        Parameters
        ----------
        force: bool
            Force loading, default is False.
        **kwargs: optional
            Passed to ``to_xarray``.

        Returns
        -------
        :class:`argopy.fetchers.ArgoDataFetcher.float`
            Data fetcher with `data` and `index` properties in memory
        """
        # Force to load data if the fetcher definition has changed
        if self._loaded and self._request != self.__repr__():
            force = True

        if not self._loaded or force:
            # Fetch measurements:
            self._data = self.to_xarray(**kwargs)
            # Next 2 lines must come before ._index because to_index() calls back on .load() to read .data
            self._request = self.__repr__()  # Save definition of loaded data
            self._loaded = True
            # Extract measurements index from data:
            self._index = self.to_index(full=False)
        return self

    def clear_cache(self):
        """ Clear data cached by fetcher

        Raises
        ------
        InvalidFetcher
            If no access point has been initialised yet.
        """
        if not self.fetcher:
            raise InvalidFetcher(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        return self.fetcher.clear_cache()

    def plot(self, ptype="trajectory", **kwargs):
        """ Create custom plots from data

        Data are loaded first. For the 'dac' and 'profiler' plots, the full index is fetched when the
        index in memory lacks the required column.

        Parameters
        ----------
        ptype: str
            Type of plot to generate. This can be: 'trajectory', 'profiler', 'dac' (or 'institution').
        **kwargs: optional
            Passed to the plotting function.

        Returns
        -------
        fig: :class:`matplotlib.figure.Figure`
        ax: :class:`matplotlib.axes.Axes`

        Raises
        ------
        ValueError
            If ``ptype`` is not a known plot type.
        """
        self.load()
        if ptype in ["dac", "institution"]:
            if "institution" not in self.index:
                self.to_index(full=True)
            return bar_plot(self.index, by="institution", **kwargs)
        elif ptype == "profiler":
            if "profiler" not in self.index:
                self.to_index(full=True)
            return bar_plot(self.index, by="profiler", **kwargs)
        elif ptype == "trajectory":
            return plot_trajectory(self.index, **kwargs)
        else:
            raise ValueError(
                "Type of plot unavailable. Use: 'dac', 'profiler' or 'trajectory' (default)"
            )
class ArgoIndexFetcher:
    """ Fetcher of Argo index data (API facade)

    Specs discussion :
    https://github.com/euroargodev/argopy/issues/8
    https://github.com/euroargodev/argopy/pull/6

    Usage::

        from argopy import ArgoIndexFetcher
        idx = ArgoIndexFetcher().region([-75, -65, 10, 20])
        idx.plot("trajectory")
        df = idx.to_dataframe()

    Fetch and process Argo index.

    Can return metadata from index of :
        - one or more float(s), defined by WMOs
        - one or more profile(s), defined for one WMO and one or more CYCLE NUMBER
        - a space/time rectangular domain, defined by lat/lon/time range

    NOTE(review): the spec discussion also mentions using an index fetcher as an input of the data
    fetcher (``ArgoDataFetcher(index=idx)``). No dedicated handling of such an argument is visible in
    this facade: confirm before relying on it.
    """

    def __init__(self, mode: str = "", src: str = "", ds: str = "", **fetcher_kwargs):
        """ Create an index fetcher instance

        Parameters
        ----------
        mode: str, optional
            User mode. Set to OPTIONS['mode'] by default if empty.
        src: str, optional
            Source of the index to use. Set to OPTIONS['src'] by default if empty.
        ds: str, optional
            Name of the dataset. Set to OPTIONS['dataset'] by default if empty.
        **fetcher_kwargs: optional
            Additional arguments passed on index source instance creation of each access points.

        Raises
        ------
        InvalidFetcher
            If ``src`` is not one of the available index sources.
        ValueError
            If the dataset is not listed in the ``dataset_ids`` of the selected index source.
        """
        # Facade options:
        self._mode = OPTIONS["mode"] if mode == "" else mode
        self._dataset_id = OPTIONS["dataset"] if ds == "" else ds
        self._src = OPTIONS["src"] if src == "" else src

        _VALIDATORS["mode"](self._mode)
        _VALIDATORS["src"](self._src)

        # Load data source access points:
        if self._src not in AVAILABLE_INDEX_SOURCES:
            raise InvalidFetcher(
                "Requested index fetcher '%s' not available ! "
                "Please try again with any of: %s"
                % (self._src, "\n".join(AVAILABLE_INDEX_SOURCES))
            )
        Fetchers = AVAILABLE_INDEX_SOURCES[self._src]

        # Auto-discovery of access points for this fetcher:
        # rq: Access point names for the facade are not the same as the access point of fetchers
        self.Fetchers = {}
        self.valid_access_points = []
        for p in Fetchers.access_points:
            if p == "box":  # Required for 'region'
                self.Fetchers["region"] = Fetchers.Fetch_box
                self.valid_access_points.append("region")
            if p == "wmo":  # Required for 'profile' and 'float'
                self.Fetchers["float"] = Fetchers.Fetch_wmo
                self.valid_access_points.append("float")
                self.Fetchers["profile"] = Fetchers.Fetch_wmo
                self.valid_access_points.append("profile")

        # Init sub-methods:
        self.fetcher = None
        if self._dataset_id not in Fetchers.dataset_ids:
            raise ValueError(
                "%s dataset is not available for this index source (%s)"
                % (self._dataset_id, self._src)
            )
        self.fetcher_options = {**fetcher_kwargs}
        self.postproccessor = self.__empty_processor
        self._AccessPoint = None

        # Init data structure holders:
        self._index = None
        # Loading state is set explicitly rather than relying on the __getattr__ fallback:
        self._loaded = False
        self._request = ""

    def __repr__(self):
        """ Text summary of the fetcher definition

        This string is also used by ``load`` to detect a change in the fetcher definition.
        """
        if self.fetcher:
            summary = [self.fetcher.__repr__()]
        else:
            summary = ["<indexfetcher.%s> 'No access point initialised'" % self._src]
            summary.append("Available access points: %s" % ", ".join(self.Fetchers.keys()))
        summary.append("Backend: %s" % self._src)
        summary.append("User mode: %s" % self._mode)
        summary.append("Dataset: %s" % self._dataset_id)
        return "\n".join(summary)

    def __empty_processor(self, xds):
        """ Do nothing to a dataset """
        return xds

    def __getattr__(self, key):
        """ Validate access points

        Only called when the regular attribute lookup fails. A small set of known attribute names and the
        valid access point names resolve to None, any other name raises.

        Raises
        ------
        AttributeError
            For special (dunder) names, so that protocol probes (eg: copy, pickle, hasattr) behave normally.
        InvalidFetcherAccessPoint
            For any other unknown name.
        """
        if key.startswith("__") and key.endswith("__"):
            raise AttributeError(key)

        valid_attrs = [
            "Fetchers",
            "fetcher",
            "fetcher_options",
            "postproccessor",
            "index",
            "_loaded",
            "_request",
        ]
        # Read from the instance dict: going through self.valid_access_points would re-enter
        # __getattr__ endlessly on an instance where __init__ has not run (eg: while being copied)
        access_points = self.__dict__.get("valid_access_points", ())
        if key not in access_points and key not in valid_attrs:
            raise InvalidFetcherAccessPoint("'%s' is not a valid access point" % key)
        return None

    @property
    def index(self):
        """ Index structure

        The index is loaded on first access if it is not in memory yet.

        Returns
        --------
        :class:`pandas.DataFrame`
        """
        if not isinstance(self._index, pd.DataFrame):
            self.load()
        return self._index

    @checkAccessPoint
    def float(self, wmo):
        """ Float index fetcher

        Parameters
        ----------
        wmo: list(int)
            Define the list of Argo floats to load data for. This is a list of integers with WMO numbers.

        Returns
        -------
        :class:`argopy.fetchers.ArgoIndexFetcher.float`
            An index source fetcher for all float profiles index
        """
        wmo = check_wmo(wmo)  # Check and return a valid list of WMOs
        self.fetcher = self.Fetchers["float"](WMO=wmo, **self.fetcher_options)
        self._AccessPoint = "float"  # Register the requested access point
        return self

    @checkAccessPoint
    def profile(self, wmo, cyc):
        """ Profile index fetcher

        Parameters
        ----------
        wmo: int, list(int)
            Define the list of Argo floats to load index for. This is a list of integers with WMO float identifiers.
            WMO is the World Meteorological Organization.
        cyc: list(int)
            Define the list of cycle numbers to load for each Argo floats listed in ``wmo``.

        Returns
        -------
        :class:`argopy.fetchers.ArgoIndexFetcher`
            A index fetcher initialised for specific float profiles
        """
        wmo = check_wmo(wmo)  # Check and return a valid list of WMOs
        self.fetcher = self.Fetchers["profile"](WMO=wmo, CYC=cyc, **self.fetcher_options)
        self._AccessPoint = "profile"  # Register the requested access point
        return self

    @checkAccessPoint
    def region(self, box):
        """ Space/time domain index fetcher

        Parameters
        ----------
        box: list()
            Define the domain to load Argo index for. The box list is made of:
                - lon_min: float, lon_max: float,
                - lat_min: float, lat_max: float,
                - date_min: str (optional), date_max: str (optional)

            Longitude and latitude bounds are required, while the two bounding dates are optional.
            If bounding dates are not specified, the entire time series is fetched.
            Eg: [-60, -55, 40., 45., '2007-08-01', '2007-09-01']

        Returns
        -------
        :class:`argopy.fetchers.ArgoIndexFetcher`
            A index fetcher initialised for a space/time domain

        Warning
        -------
        Note that the box option for an index fetcher does not have pressure bounds, contrary to the data fetcher.
        """
        is_indexbox(box, errors="raise")  # Validate the box definition
        self.fetcher = self.Fetchers["region"](box=box, **self.fetcher_options)
        self._AccessPoint = "region"  # Register the requested access point
        return self

    def to_dataframe(self, **kwargs):
        """ Fetch and return index data as pandas DataFrame

        Keyword arguments are passed to the ``to_dataframe`` method of the index source fetcher.

        Returns
        -------
        :class:`pandas.DataFrame`

        Raises
        ------
        InvalidFetcher
            If no access point has been initialised yet.
        """
        if not self.fetcher:
            raise InvalidFetcher(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        return self.fetcher.to_dataframe(**kwargs)

    def to_xarray(self, **kwargs):
        """ Fetch and return index data as xarray Dataset

        This is a shortcut to .load().index.to_xarray()

        Returns
        -------
        :class:`xarray.Dataset`

        Raises
        ------
        InvalidFetcherAccessPoint
            If no access point has been initialised yet.
        """
        if self._AccessPoint not in self.valid_access_points:
            raise InvalidFetcherAccessPoint(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        return self.load().index.to_xarray(**kwargs)

    def to_csv(self, file: str = "output_file.csv"):
        """ Fetch and save index data as csv in a file

        This is a shortcut to .load().index.to_csv()

        Parameters
        ----------
        file: str
            Path of the csv file to write, default is "output_file.csv".

        Returns
        -------
        None

        Raises
        ------
        InvalidFetcherAccessPoint
            If no access point has been initialised yet.
        """
        if self._AccessPoint not in self.valid_access_points:
            raise InvalidFetcherAccessPoint(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        return self.load().index.to_csv(file)

    def load(self, force: bool = False):
        """ Load index in memory

        Apply the default to_dataframe() method and store results in memory.
        Access loaded index structure with the `index` property::

            df = ArgoIndexFetcher().float(6902746).load().index

        Parameters
        ----------
        force: bool
            Force loading, default is False.

        Returns
        -------
        :class:`argopy.fetchers.ArgoIndexFetcher.float`
            Index fetcher with `index` property in memory
        """
        # Force to load data if the fetcher definition has changed
        if self._loaded and self._request != self.__repr__():
            force = True

        if not self._loaded or force:
            self._index = self.to_dataframe()
            self._request = self.__repr__()  # Save definition of loaded data
            self._loaded = True
        return self

    def plot(self, ptype="trajectory", **kwargs):
        """ Create custom plots from index

        The index is loaded first.

        Parameters
        ----------
        ptype: str
            Type of plot to generate. This can be: 'trajectory', 'profiler', 'dac' (or 'institution').
        **kwargs: optional
            Passed to the plotting function.

        Returns
        -------
        fig: :class:`matplotlib.figure.Figure`
        ax: :class:`matplotlib.axes.Axes`

        Raises
        ------
        ValueError
            If ``ptype`` is not a known plot type.
        """
        self.load()
        if ptype in ["dac", "institution"]:
            return bar_plot(self.index, by="institution", **kwargs)
        elif ptype == "profiler":
            return bar_plot(self.index, by="profiler", **kwargs)
        elif ptype == "trajectory":
            return plot_trajectory(self.index.sort_values(["file"]), **kwargs)
        else:
            raise ValueError(
                "Type of plot unavailable. Use: 'dac', 'profiler' or 'trajectory' (default)"
            )

    def clear_cache(self):
        """ Clear fetcher cached data

        Raises
        ------
        InvalidFetcher
            If no access point has been initialised yet.
        """
        # Same guard as the data fetcher facade: fail with a clear message rather than on None
        if not self.fetcher:
            raise InvalidFetcher(
                " Initialize an access point (%s) first."
                % ",".join(self.Fetchers.keys())
            )
        return self.fetcher.clear_cache()