#!/bin/env python
# -*coding: UTF-8 -*-
"""
High level helper methods to load Argo data from any source
The facade should be able to work with all available data access points.
Usage for LOCALFTP:
from argopy import DataFetcher as ArgoDataFetcher
argo_loader = ArgoDataFetcher(src='localftp', ds='phy')
or
argo_loader = ArgoDataFetcher(src='localftp', ds='bgc')
argo_loader.float(6902746).to_xarray()
argo_loader.float([6902746, 6902747, 6902757, 6902766]).to_xarray()
Usage for ERDDAP (default src):
from argopy import DataFetcher as ArgoDataFetcher
argo_loader = ArgoDataFetcher(src='erddap')
or
argo_loader = ArgoDataFetcher(src='erddap', cachedir='tmp', cache=True)
or
argo_loader = ArgoDataFetcher(src='erddap', ds='ref')
argo_loader.profile(6902746, 34).to_xarray()
argo_loader.profile(6902746, np.arange(12,45)).to_xarray()
argo_loader.profile(6902746, [1,12]).to_xarray()
or
argo_loader.float(6902746).to_xarray()
argo_loader.float([6902746, 6902747, 6902757, 6902766]).to_xarray()
argo_loader.profile(6902746, 1).to_xarray()  # float() rejects a CYC keyword (TypeError); use profile() to select cycles
or
argo_loader.region([-85,-45,10.,20.,0,1000.]).to_xarray()
argo_loader.region([-85,-45,10.,20.,0,1000.,'2012-01','2014-12']).to_xarray()
"""
import os
import sys
import glob
import pandas as pd
import xarray as xr
import numpy as np
import warnings
from argopy.options import OPTIONS, _VALIDATORS
from .errors import InvalidFetcherAccessPoint, InvalidFetcher
from .utilities import list_available_data_src
# Registry of usable data sources, built once at import time. Both facades
# below test membership with ``in`` and index it by source name to obtain the
# backend module-like object exposing ``access_points`` and the fetcher classes.
AVAILABLE_SOURCES = list_available_data_src()
# Import plotters :
from .plotters import plot_trajectory, plot_dac, plot_profilerType
# Highest level API / Facade:
class ArgoDataFetcher(object):
    """ Fetch and process Argo data.

    Can return data selected from:

    - one or more float(s), defined by WMOs
    - one or more profile(s), defined for one WMO and one or more CYCLE NUMBER
    - a space/time rectangular domain, defined by lat/lon/pres/time range

    Can return data from the regular Argo dataset ('phy': temperature, salinity) and the Argo referenced
    dataset used in DMQC ('ref': temperature, salinity).

    This is the main API facade.
    Specify here all options to data_fetchers.

    Access point methods (:meth:`float`, :meth:`profile`, :meth:`region`) return ``self`` so that
    calls can be chained, e.g. ``ArgoDataFetcher().float(6902746).to_xarray()``.
    """

    # Instance attributes that ``__getattr__`` answers with ``None`` when they are not set yet.
    _STATE_ATTRS = ('Fetchers', 'fetcher', 'fetcher_options', 'postproccessor')

    def __init__(self,
                 mode: str = "",
                 src: str = "",
                 ds: str = "",
                 **fetcher_kwargs):
        """ Create a facade bound to one data source.

        Parameters
        ----------
        mode: str
            User mode. An empty string (default) selects ``OPTIONS['mode']``.
        src: str
            Data source name. An empty string (default) selects ``OPTIONS['src']``.
        ds: str
            Dataset identifier. An empty string (default) selects ``OPTIONS['dataset']``;
            ``None`` selects the first entry of the backend's ``dataset_ids``.
        **fetcher_kwargs
            Forwarded unchanged to the backend fetcher constructor of each access point.

        Raises
        ------
        ValueError
            If ``src`` is not a key of ``AVAILABLE_SOURCES``.
        """
        # Facade options:
        self._mode = OPTIONS['mode'] if mode == '' else mode
        self._dataset_id = OPTIONS['dataset'] if ds == '' else ds
        self._src = OPTIONS['src'] if src == '' else src

        # NOTE(review): the validators' return values are discarded; if they signal
        # failure by returning False rather than raising, nothing is enforced here - confirm.
        _VALIDATORS['mode'](self._mode)
        _VALIDATORS['src'](self._src)
        _VALIDATORS['dataset'](self._dataset_id)

        # Load src access points:
        if self._src not in AVAILABLE_SOURCES:
            raise ValueError("Data fetcher '%s' not available" % self._src)
        Fetchers = AVAILABLE_SOURCES[self._src]

        # Auto-discovery of access points for this fetcher:
        # rq: Access point names for the facade are not the same as the access point of fetchers
        self.valid_access_points = ['profile', 'float', 'region']
        self.Fetchers = {}
        for p in Fetchers.access_points:
            if p == 'wmo':  # Required for 'profile' and 'float'
                self.Fetchers['profile'] = Fetchers.Fetch_wmo
                self.Fetchers['float'] = Fetchers.Fetch_wmo
            if p == 'box':  # Required for 'region'
                self.Fetchers['region'] = Fetchers.Fetch_box

        # Init sub-methods:
        self.fetcher = None
        if self._dataset_id is None:
            self._dataset_id = Fetchers.dataset_ids[0]
        # Forward the *resolved* dataset so that the backend and the facade
        # (mode/dataset checks below and in the access points) agree on it.
        self.fetcher_options = {'ds': self._dataset_id, **fetcher_kwargs}
        self.postproccessor = self.__empty_processor

        # Dev warnings
        # Todo Clean-up before each release
        if self._dataset_id == 'bgc' and self._mode == 'standard':
            warnings.warn(" 'BGC' dataset fetching in 'standard' user mode is not reliable. "
                          "Try to switch to 'expert' mode if you encounter errors.")

    def __repr__(self):
        """ Multi-line summary: backend fetcher (or a placeholder), source and user mode. """
        if self.fetcher:
            summary = [repr(self.fetcher)]
            summary.append("Backend: %s" % self._src)
            summary.append("User mode: %s" % self._mode)
        else:
            summary = ["<datafetcher 'Not initialised'>"]
            summary.append("Backend: %s" % self._src)
            summary.append("Fetchers: %s" % ", ".join(self.Fetchers.keys()))
            summary.append("User mode: %s" % self._mode)
        return "\n".join(summary)

    def __empty_processor(self, xds):
        """ Do nothing to a dataset """
        return xds

    def __getattr__(self, key):
        """ Validate access points

        Only called when regular attribute lookup fails. Known state attributes and
        access point names resolve to ``None``; any other name is rejected.

        Raises
        ------
        AttributeError
            For dunder names, so that protocol probing (``hasattr``, copy, pickle) behaves normally.
        InvalidFetcherAccessPoint
            For any other unknown name.
        """
        if key.startswith('__') and key.endswith('__'):
            raise AttributeError(key)
        # Read through __dict__: a plain ``self.valid_access_points`` would re-enter
        # this method (infinite recursion) whenever the attribute is not set yet.
        access_points = self.__dict__.get('valid_access_points', ())
        if key not in access_points and key not in self._STATE_ATTRS:
            raise InvalidFetcherAccessPoint("'%s' is not a valid access point" % key)
        return None

    def _init_fetcher(self, access_point, **selection):
        """ Instantiate the backend fetcher registered for ``access_point`` with the facade options. """
        if access_point not in self.Fetchers:
            raise InvalidFetcherAccessPoint("'%s' not available with '%s' src" % (access_point, self._src))
        return self.Fetchers[access_point](**selection, **self.fetcher_options)

    def _apply_mode_postprocessing(self):
        """ In 'standard' mode (except for the 'ref' dataset), chain the backend filters after fetching. """
        if self._mode == 'standard' and self._dataset_id != 'ref':
            def postprocessing(xds):
                # ``self.fetcher`` is looked up at call time, i.e. the fetcher
                # active when ``to_xarray`` runs.
                xds = self.fetcher.filter_data_mode(xds)
                xds = self.fetcher.filter_qc(xds)
                xds = self.fetcher.filter_variables(xds, self._mode)
                return xds
            self.postproccessor = postprocessing

    def float(self, wmo, **kw):
        """ Fetch data from a float

        Parameters
        ----------
        wmo
            Float identifier(s), passed to the backend fetcher as ``WMO``.
        **kw
            Only inspected to reject ``CYC``/``cyc``; other keywords are ignored.

        Returns
        -------
        This fetcher, with the 'float' access point initialized.

        Raises
        ------
        TypeError
            If a ``CYC`` or ``cyc`` keyword is given.
        InvalidFetcherAccessPoint
            If the data source does not provide a 'float' access point.
        """
        if "CYC" in kw or "cyc" in kw:
            raise TypeError("float() got an unexpected keyword argument 'cyc'. Use 'profile' access "
                            "point to fetch specific profile data.")
        self.fetcher = self._init_fetcher('float', WMO=wmo)
        self._apply_mode_postprocessing()
        return self

    def profile(self, wmo, cyc):
        """ Fetch data from a profile

        given one or more WMOs and CYCLE_NUMBER

        Returns
        -------
        This fetcher, with the 'profile' access point initialized.

        Raises
        ------
        InvalidFetcherAccessPoint
            If the data source does not provide a 'profile' access point.
        """
        self.fetcher = self._init_fetcher('profile', WMO=wmo, CYC=cyc)
        self._apply_mode_postprocessing()
        return self

    def region(self, box: list):
        """ Fetch data from a space/time domain

        Parameters
        ----------
        box: list(lon_min: float, lon_max: float, lat_min: float, lat_max: float, pres_min: float, pres_max: float,
        date_min: str, date_max: str)
            Define the domain to load all Argo data for. Longitude, latitude and pressure bounds are required, while
            the two bounding dates [date_min and date_max] are optional. If not specified, the entire time series
            is requested.

        Returns
        -------
        :class:`argopy.DataFetcher` with an access point initialized.

        Raises
        ------
        InvalidFetcherAccessPoint
            If the data source does not provide a 'region' access point.
        """
        self.fetcher = self._init_fetcher('region', box=box)
        self._apply_mode_postprocessing()
        return self

    def to_xarray(self, **kwargs):
        """ Fetch and return data as xarray.DataSet

        ``kwargs`` are forwarded to the backend fetcher's ``to_xarray``; its result is
        passed through the active post-processor before being returned.

        Raises
        ------
        InvalidFetcher
            If no access point has been initialized yet.
        """
        if not self.fetcher:
            raise InvalidFetcher(" Initialize an access point (%s) first." %
                                 ",".join(self.Fetchers.keys()))
        xds = self.fetcher.to_xarray(**kwargs)
        return self.postproccessor(xds)

    def to_dataframe(self, **kwargs):
        """ Fetch and return data as pandas.Dataframe

        Calls :meth:`to_xarray` with ``kwargs`` and converts the result with its ``to_dataframe()``.
        """
        return self.to_xarray(**kwargs).to_dataframe()
class ArgoIndexFetcher(object):
    """ Fetch and process Argo index.

    Specs discussion :
    https://github.com/euroargodev/argopy/issues/8
    https://github.com/euroargodev/argopy/pull/6

    Usage :

        from argopy import ArgoIndexFetcher
        idx = ArgoIndexFetcher().region([-75, -65, 10, 20])
        idx.plot('trajectory')
        idx.to_dataframe()

    Can return metadata from index of :

    - one or more float(s), defined by WMOs
    - a space/time rectangular domain, defined by lat/lon/pres/time range

    NOTE(review): earlier notes mention using an index object as input to the data
    fetcher (``ArgoDataFetcher(index=idx)``); nothing in this class implements that - confirm
    backend support before relying on it.

    Specify here all options to data_fetchers
    """

    # Instance attributes that ``__getattr__`` answers with ``None`` when they are not set yet.
    _STATE_ATTRS = ('Fetchers', 'fetcher', 'fetcher_options', 'postproccessor')

    def __init__(self,
                 mode: str = "",
                 src: str = "",
                 **fetcher_kwargs):
        """ Create an index facade bound to one data source.

        Parameters
        ----------
        mode: str
            User mode. An empty string (default) selects ``OPTIONS['mode']``.
        src: str
            Data source name. An empty string (default) selects ``OPTIONS['src']``.
        **fetcher_kwargs
            Forwarded unchanged to the backend index fetcher constructor of each access point.

        Raises
        ------
        ValueError
            If ``src`` is not a key of ``AVAILABLE_SOURCES``.
        """
        # Facade options:
        self._mode = OPTIONS['mode'] if mode == '' else mode
        self._src = OPTIONS['src'] if src == '' else src

        # NOTE(review): the validators' return values are discarded; if they signal
        # failure by returning False rather than raising, nothing is enforced here - confirm.
        _VALIDATORS['mode'](self._mode)
        _VALIDATORS['src'](self._src)

        # Load src access points:
        if self._src not in AVAILABLE_SOURCES:
            raise ValueError("Fetcher '%s' not available" % self._src)
        Fetchers = AVAILABLE_SOURCES[self._src]

        # Auto-discovery of access points for this fetcher:
        # rq: Access point names for the facade are not the same as the access point of fetchers
        self.valid_access_points = ['float', 'region']
        self.Fetchers = {}
        for p in Fetchers.access_points:
            if p == 'wmo':  # Required for 'float'
                self.Fetchers['float'] = Fetchers.IndexFetcher_wmo
            if p == 'box':  # Required for 'region'
                self.Fetchers['region'] = Fetchers.IndexFetcher_box

        # Init sub-methods:
        self.fetcher = None
        self.fetcher_options = {**fetcher_kwargs}
        self.postproccessor = self.__empty_processor

    def __repr__(self):
        """ Multi-line summary: backend fetcher (or a placeholder) and user mode. """
        if self.fetcher:
            summary = [repr(self.fetcher)]
            summary.append("User mode: %s" % self._mode)
        else:
            summary = ["<indexfetcher 'Not initialised'>"]
            summary.append("Fetchers: 'float' or 'region'")
            summary.append("User mode: %s" % self._mode)
        return "\n".join(summary)

    def __empty_processor(self, xds):
        """ Do nothing to a dataset """
        return xds

    def __getattr__(self, key):
        """ Validate access points

        Only called when regular attribute lookup fails. Known state attributes and
        access point names resolve to ``None``; any other name is rejected.

        Raises
        ------
        AttributeError
            For dunder names, so that protocol probing (``hasattr``, copy, pickle) behaves normally.
        InvalidFetcherAccessPoint
            For any other unknown name.
        """
        if key.startswith('__') and key.endswith('__'):
            raise AttributeError(key)
        # Read through __dict__: a plain ``self.valid_access_points`` would re-enter
        # this method (infinite recursion) whenever the attribute is not set yet.
        access_points = self.__dict__.get('valid_access_points', ())
        if key not in access_points and key not in self._STATE_ATTRS:
            raise InvalidFetcherAccessPoint("'%s' is not a valid access point" % key)
        return None

    def _require_fetcher(self):
        """ Return the active backend fetcher, or raise if no access point was initialized. """
        if not self.fetcher:
            raise InvalidFetcher(" Initialize an access point (%s) first." %
                                 ",".join(self.Fetchers.keys()))
        return self.fetcher

    def float(self, wmo):
        """ Load index for one or more WMOs

        Returns
        -------
        This fetcher, with the 'float' access point initialized.

        Raises
        ------
        InvalidFetcherAccessPoint
            If the data source does not provide a 'float' access point.
        """
        if 'float' not in self.Fetchers:
            raise InvalidFetcherAccessPoint("'float' not available with '%s' src" % self._src)
        self.fetcher = self.Fetchers['float'](WMO=wmo, **self.fetcher_options)
        return self

    def region(self, box):
        """ Load index for a rectangular space/time domain region

        Returns
        -------
        This fetcher, with the 'region' access point initialized.

        Raises
        ------
        InvalidFetcherAccessPoint
            If the data source does not provide a 'region' access point.
        """
        if 'region' not in self.Fetchers:
            raise InvalidFetcherAccessPoint("'region' not available with '%s' src" % self._src)
        self.fetcher = self.Fetchers['region'](box=box, **self.fetcher_options)
        return self

    def to_dataframe(self, **kwargs):
        """ Fetch index and return pandas.Dataframe

        ``kwargs`` are forwarded to the backend fetcher's ``to_dataframe``.

        Raises
        ------
        InvalidFetcher
            If no access point has been initialized yet.
        """
        return self._require_fetcher().to_dataframe(**kwargs)

    def to_xarray(self, **kwargs):
        """ Fetch index and return xr.dataset

        ``kwargs`` are forwarded to the backend fetcher's ``to_xarray``.

        Raises
        ------
        InvalidFetcher
            If no access point has been initialized yet.
        """
        return self._require_fetcher().to_xarray(**kwargs)

    def to_csv(self, file: str = 'output_file.csv'):
        """ Fetch index and write it as csv

        Parameters
        ----------
        file: str
            Destination handed to ``DataFrame.to_csv``.

        Returns
        -------
        Whatever ``DataFrame.to_csv(file)`` returns.

        Raises
        ------
        InvalidFetcher
            If no access point has been initialized yet (raised by :meth:`to_dataframe`).
        """
        return self.to_dataframe().to_csv(file)

    def plot(self, ptype='trajectory'):
        """ Create custom plots from index

        Parameters
        ----------
        ptype: str
            Type of plot to generate. This can be: 'trajectory', 'profiler', 'dac'.

        Returns
        -------
        fig : :class:`matplotlib.pyplot.figure.Figure`
            Figure instance

        Raises
        ------
        ValueError
            If ``ptype`` is not one of the supported plot types.
        """
        # Reject an unknown plot type before paying for the index download.
        if ptype not in ('dac', 'profiler', 'trajectory'):
            raise ValueError("Type of plot unavailable. Use: 'dac', 'profiler' or 'trajectory' (default)")
        idx = self.to_dataframe()
        if ptype == 'dac':
            return plot_dac(idx)
        if ptype == 'profiler':
            return plot_profilerType(idx)
        return plot_trajectory(idx.sort_values(['file']))