Source code for argopy.data_fetchers.localftp_data

#!/bin/env python
# -*coding: UTF-8 -*-
"""
Argo data fetcher for a local copy of GDAC ftp.

This is not intended to be used directly, only by the facade at fetchers.py

Since the GDAC ftp is organised by DAC/WMO folders, we start by implementing the 'float' and 'profile' entry points.



About the index local ftp fetcher:

We have a large index csv file, "ar_index_global_prof.txt", that is about 200 MB.
For a given request, we need to load/read it
and then apply a filter to select the lines matching the request.
With the current version, a dataframe of the full index is cached
and then another cached file is created for the result of the filter:

df_full = pd.read_csv("index.txt")
df_small = filter(df_full)
write_on_file(df_small)

I think we can avoid this with a virtual file system.
When a request is done, we ... (todo: this note was left unfinished)



"""
import os
from glob import glob
import numpy as np
import pandas as pd
from abc import abstractmethod
import warnings
import getpass

from .proto import ArgoDataFetcherProto
from argopy.errors import NetCDF4FileNotFoundError
from argopy.utilities import (
    list_standard_variables,
    check_localftp,
    format_oneline
)
from argopy.options import OPTIONS
from argopy.stores import filestore, indexstore, indexfilter_box
from argopy.plotters import open_dashboard

# Access points implemented in this module (see the Fetch_wmo and Fetch_box classes)
access_points = ["wmo", "box"]
# Output formats this fetcher can produce (see the ``to_xarray`` method)
exit_formats = ["xarray"]
dataset_ids = ["phy", "bgc"]  # First is default
# Path to the local ftp copy, as read from the argopy options at import time.
# NOTE(review): presumably used by the facade to check that this data source is available -- confirm
api_server_check = OPTIONS["local_ftp"]


class LocalFTPArgoDataFetcher(ArgoDataFetcherProto):
    """ Manage access to Argo data from a local copy of GDAC ftp

    This is the base class of the local ftp fetchers. A concrete fetcher must implement
    :meth:`init` (request specific initialisation) and the :attr:`uri` property (list of
    files to load).
    """

    ###
    # Methods to be customised for a specific request
    ###
    @abstractmethod
    def init(self, *args, **kwargs):
        """ Initialisation for a specific fetcher """
        raise NotImplementedError("Not implemented")

    # @abstractmethod
    # def list_argo_files(self, errors: str = 'raise'):
    #     """ Set the internal list of absolute path of all files to load
    #     This function must define the attribute: self._list_of_argo_files with a list of path(s)
    #
    #     Parameters
    #     ----------
    #     errors: {'raise','ignore'}, optional
    #         If 'raise' (default), raises a NetCDF4FileNotFoundError error if any of the requested
    #         files cannot be found. If 'ignore', file not found is skipped when fetching data.
    #     """
    #     pass

    ###
    # Methods that must not change
    ###
    def __init__(
        self,
        local_ftp: str = "",
        ds: str = "",
        cache: bool = False,
        cachedir: str = "",
        dimension: str = "point",
        errors: str = "raise",
        parallel: bool = False,
        parallel_method: str = "thread",
        progress: bool = False,
        chunks: str = "auto",
        chunks_maxsize: dict = None,
        **kwargs
    ):
        """ Init fetcher

        Parameters
        ----------
        local_ftp: str (optional)
            Path to the local directory where the 'dac' folder is located.
            If empty, ``OPTIONS["local_ftp"]`` is used.
        ds: str (optional)
            Dataset to load: 'phy' or 'ref' or 'bgc'. If empty, ``OPTIONS["dataset"]`` is used.
        errors: str (optional)
            If set to 'raise' (default), will raise a NetCDF4FileNotFoundError error if any of the requested
            files cannot be found. If set to 'ignore', the file not found is skipped when fetching data.
        cache: bool (optional)
            Cache data or not (default: False)
        cachedir: str (optional)
            Path to cache folder
        dimension: str
            Main dimension of the output dataset. This can be "profile" to retrieve a collection of
            profiles, or "point" (default) to have data as a collection of measurements.
            This can be used to optimise performances.
            Note that this argument is not read by this constructor.
        parallel: bool (optional)
            Chunk request to use parallel fetching (default: False). The parallelization method
            (``thread`` or ``process``) can also be given directly through this argument.
        parallel_method: str (optional)
            Define the parallelization method: ``thread``, ``process`` or a
            :class:`dask.distributed.client.Client`. This fetcher only accepts ``thread`` and ``process``.
        progress: bool (optional)
            Show a progress bar or not when fetching data.
        chunks: 'auto' or dict of integers (optional)
            Dictionary with request access point as keys and number of chunks to create as values.
            Eg:

            - ``{'wmo': 10}`` will create a maximum of 10 chunks along WMOs when used with ``Fetch_wmo``.
            - ``{'lon': 2}`` will create a maximum of 2 chunks along longitude when used with ``Fetch_box``.

        chunks_maxsize: dict (optional)
            Dictionary with request access point as keys and chunk size as values (used as maximum values in
            'auto' chunking). Eg: ``{'wmo': 5}`` will create chunks with as many as 5 WMOs each.
            Default is an empty dictionary.
        **kwargs
            Passed to :meth:`init` of the concrete fetcher.

        Raises
        ------
        ValueError
            If the parallelization method is not ``thread`` or ``process``.
        """
        self.cache = cache
        self.cachedir = cachedir
        self.fs = filestore(cache=self.cache, cachedir=self.cachedir)
        self.errors = errors

        if not isinstance(parallel, bool):
            # The parallelization method is passed through the argument 'parallel':
            parallel_method = parallel
            if parallel in ["thread", "process"]:
                parallel = True
        if parallel_method not in ["thread", "process"]:
            raise ValueError(
                "localftp only support multi-threading and processing ('%s' unknown)"
                % parallel_method
            )
        self.parallel = parallel
        self.parallel_method = parallel_method
        self.progress = progress
        self.chunks = chunks
        # A new dictionary is created for each instance, so that fetchers never share
        # (and never mutate) a common default object:
        self.chunks_maxsize = {} if chunks_maxsize is None else chunks_maxsize

        self.definition = "Local ftp Argo data fetcher"
        self.dataset_id = OPTIONS["dataset"] if ds == "" else ds

        self.local_ftp = OPTIONS["local_ftp"] if local_ftp == "" else local_ftp
        check_localftp(self.local_ftp, errors="raise")  # Validate local_ftp

        self.init(**kwargs)

    def __repr__(self):
        summary = ["<datafetcher.localftp>"]
        summary.append("Name: %s" % self.definition)
        summary.append("FTP: %s" % self.local_ftp)
        summary.append("Domain: %s" % format_oneline(self.cname()))
        return "\n".join(summary)

    def cname(self):
        """ Return a unique string defining the constraints """
        return self._cname()

    def get_path(self, wmo: int, cyc: int = None) -> str:  # noqa: C901
        """ Return the absolute path toward the netcdf source file of a given wmo/cyc pair and a dataset

        Based on the dataset, the wmo and the cycle requested, return the absolute path toward the file to load.

        The file is searched using its expected file name pattern (following GDAC conventions).

        If more than one file are found to match the pattern, a preferred file is looked for
        (depending on the dataset and on the cycle requested) and if no file is a preferred one,
        the first one (alphabetically) is returned.

        If no files match the pattern, the function can raise an error or fail silently and return None.

        Parameters
        ----------
        wmo: int
            WMO float code
        cyc: int, optional
            Cycle number (None by default)

        Returns
        -------
        netcdf_file_path : str
            None is returned if no file is found and ``self.errors`` is not 'raise'.

        Raises
        ------
        NetCDF4FileNotFoundError
            If no file is found and ``self.errors`` is 'raise'.
        """
        # This function will be used whatever the access point, since we are working with a GDAC like set of files
        def _filepathpattern(wmo, cyc=None):
            """ Return a file path pattern to scan for a given wmo/cyc pair

            Based on the dataset and the cycle number requested, construct the closest file path pattern to be loaded

            This path is absolute, the pattern can contain '*', and it is the file path, so it has '.nc' extension

            Returns
            -------
            file_path_pattern : str
            """
            if cyc is None:
                # Multi-profile file:
                # dac/<DacName>/<FloatWmoID>/<FloatWmoID>_<S>prof.nc
                if self.dataset_id == "phy":
                    return os.path.sep.join(
                        [self.local_ftp, "dac", "*", str(wmo), "%i_prof.nc" % wmo]
                    )
                elif self.dataset_id == "bgc":
                    return os.path.sep.join(
                        [self.local_ftp, "dac", "*", str(wmo), "%i_Sprof.nc" % wmo]
                    )
            else:
                # Single profile file:
                # dac/<DacName>/<FloatWmoID>/profiles/<B/M/S><R/D><FloatWmoID>_<XXX><D>.nc
                # '%0.3d' pads the cycle number to at least 3 digits and leaves larger
                # numbers untouched, so cycles above 999 need no specific pattern.
                return os.path.sep.join(
                    [
                        self.local_ftp,
                        "dac",
                        "*",
                        str(wmo),
                        "profiles",
                        "*%i_%0.3d*.nc" % (wmo, cyc),
                    ]
                )

        pattern = _filepathpattern(wmo, cyc)
        lst = sorted(glob(pattern))
        # lst = sorted(self.fs.glob(pattern))  # Much slower than the regular glob !

        if len(lst) == 1:
            return lst[0]

        if len(lst) == 0:
            if self.errors == "raise":
                raise NetCDF4FileNotFoundError(pattern)
            # Otherwise remain silent/ignore
            # todo: should raise a warning instead ?
            return None

        # warnings.warn("More than one file to load for a single float cycle ! Return the 1st one by default.")
        # The choice of the file to load depends on the user mode and dataset requested.
        # todo: define a robust choice
        if self.dataset_id == "phy":
            if cyc is None:
                # Use the synthetic profile:
                def preferred(path):
                    return os.path.basename(path).startswith("S")

            else:
                # Use the ascent profile.
                # The descent marker 'D' is the last character of the file name *before*
                # the '.nc' extension, so the extension must be removed for the test.
                def preferred(path):
                    stem = os.path.splitext(os.path.basename(path))[0]
                    return not stem.endswith("D")

        elif self.dataset_id == "bgc":

            def preferred(path):
                return os.path.basename(path).startswith("M")

        else:
            preferred = None

        if preferred is not None:
            for path in lst:
                if preferred(path):
                    return path

        # No preferred file among the candidates: fall back on the first one (alphabetically)
        return lst[0]

    @property
    @abstractmethod
    def uri(self):
        """ Return the list of files to load

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @property
    def cachepath(self):
        """ Return path to cache file(s) for this request

        Returns
        -------
        list(str)
        """
        return [self.fs.cachepath(url) for url in self.uri]

    def _set_argopy_attrs(self, ds, fetched_uri):
        """ Replace the attributes of a dataset with the argopy ones

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            Dataset to update, its attributes are modified in place
        fetched_uri:
            Value of the ``Fetched_uri`` attribute

        Returns
        -------
        :class:`xarray.Dataset`
        """
        ds.attrs = {}
        if self.dataset_id == "phy":
            ds.attrs["DATA_ID"] = "ARGO"
        if self.dataset_id == "bgc":
            ds.attrs["DATA_ID"] = "ARGO-BGC"
        ds.attrs["DOI"] = "http://doi.org/10.17882/42182"
        ds.attrs["Fetched_from"] = self.local_ftp
        ds.attrs["Fetched_by"] = getpass.getuser()
        ds.attrs["Fetched_date"] = pd.to_datetime("now").strftime("%Y/%m/%d")
        ds.attrs["Fetched_constraints"] = self.cname()
        ds.attrs["Fetched_uri"] = fetched_uri
        return ds

    def _preprocess_multiprof(self, ds):
        """ Pre-process one Argo multi-profile file as a collection of points

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
            Dataset to process

        Returns
        -------
        :class:`xarray.Dataset`
        """
        # Replace JULD and JULD_QC by TIME and TIME_QC
        ds = ds.rename(
            {"JULD": "TIME", "JULD_QC": "TIME_QC", "JULD_LOCATION": "TIME_LOCATION"}
        )
        ds["TIME"].attrs = {
            "long_name": "Datetime (UTC) of the station",
            "standard_name": "time",
        }

        # Cast data types:
        ds = ds.argo.cast_types()

        # Enforce real pressure resolution: 0.1 db
        for vname in ds.data_vars:
            if "PRES" in vname and "QC" not in vname:
                ds[vname].values = np.round(ds[vname].values, 1)

        # Remove variables without dimensions:
        # todo: We should be able to find a way to keep them somewhere in the data structure
        for v in ds.data_vars:
            if len(list(ds[v].dims)) == 0:
                ds = ds.drop_vars(v)

        # Default output is a collection of points along N_POINTS
        ds = ds.argo.profile2point()

        # Remove netcdf file attributes and replace them with argopy ones:
        ds = self._set_argopy_attrs(ds, ds.encoding["source"])
        ds = ds[np.sort(ds.data_vars)]

        return ds

    def to_xarray(self, errors: str = "ignore"):
        """ Load Argo data and return a xarray.Dataset

        Parameters
        ----------
        errors: str (optional)
            Passed as the ``errors`` argument of the file store ``open_mfdataset`` method
            ('ignore' by default).

        Returns
        -------
        :class:`xarray.Dataset`
        """
        # Download data:
        if not self.parallel:
            method = "sequential"
        else:
            method = self.parallel_method
        ds = self.fs.open_mfdataset(
            self.uri,
            method=method,
            concat_dim="N_POINTS",
            concat=True,
            preprocess=self._preprocess_multiprof,
            progress=self.progress,
            errors=errors,
            decode_cf=1,
            use_cftime=0,
            mask_and_scale=1,
        )

        # Data post-processing:
        # Re-index to avoid duplicate values
        ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
        ds = ds.set_coords("N_POINTS")
        ds = ds.sortby("TIME")

        # Remove netcdf file attributes and replace them with simplified argopy ones:
        if len(self.uri) == 1:
            fetched_uri = self.uri[0]
        else:
            fetched_uri = ";".join(self.uri)
        ds = self._set_argopy_attrs(ds, fetched_uri)

        return ds

    def filter_data_mode(self, ds, **kwargs):
        """ Apply the argo accessor data mode filter and re-index points

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
        **kwargs
            Passed to ``ds.argo.filter_data_mode``

        Returns
        -------
        :class:`xarray.Dataset`
        """
        ds = ds.argo.filter_data_mode(errors="ignore", **kwargs)
        if ds.argo._type == "point":
            ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
        return ds

    def filter_qc(self, ds, **kwargs):
        """ Apply the argo accessor QC filter and re-index points

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
        **kwargs
            Passed to ``ds.argo.filter_qc``

        Returns
        -------
        :class:`xarray.Dataset`
        """
        ds = ds.argo.filter_qc(**kwargs)
        if ds.argo._type == "point":
            ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"]))
        return ds

    def filter_variables(self, ds, mode="standard"):
        """ Remove from a dataset the variables that are not standard ones

        Parameters
        ----------
        ds: :class:`xarray.Dataset`
        mode: str
            If 'standard' (default), all data variables that are not listed by
            ``list_standard_variables()`` are removed. Otherwise the dataset is returned unchanged.

        Returns
        -------
        :class:`xarray.Dataset`
        """
        if mode == "standard":
            to_remove = sorted(
                list(set(list(ds.data_vars)) - set(list_standard_variables()))
            )
            return ds.drop_vars(to_remove)
        else:
            return ds
class Fetch_wmo(LocalFTPArgoDataFetcher):
    """ Manage access to local ftp Argo data for: a list of WMOs """

    def init(self, WMO: list = None, CYC=None, **kwargs):
        """ Create Argo data loader for WMOs

        Parameters
        ----------
        WMO: list(int)
            The list of WMOs to load all Argo data for. Default is an empty list.
        CYC: int, np.array(int), list(int)
            The cycle numbers to load. If None (default), the multi-profile file of each
            float is loaded.

        Returns
        -------
        self
        """
        if isinstance(CYC, (int, np.integer)):
            # Make sure we deal with an array of integers (numpy integer scalars included)
            CYC = np.array((CYC,), dtype="int")
        if isinstance(CYC, list):
            # Make sure we deal with an array of integers
            CYC = np.array(CYC, dtype="int")
        # A new list is created for each instance, so that fetchers never share a default object:
        self.WMO = [] if WMO is None else WMO
        self.CYC = CYC

        return self

    @property
    def uri(self):
        """ List of files to load for a request

        The list is computed on the first access and then kept in ``self._list_of_argo_files``.

        Files that cannot be found are skipped when ``self.errors`` is not 'raise'
        (in this case ``get_path`` returns None instead of raising).

        Returns
        -------
        list(str)
        """

        def list_bunch(wmos, cycs):
            this = []
            for wmo in wmos:
                # A single None cycle stands for the multi-profile file of the float
                for cyc in [None] if cycs is None else cycs:
                    path = self.get_path(wmo, cyc)
                    if path is not None:  # File not found and errors are ignored: skip it
                        this.append(path)
            return this

        # Get list of files to load:
        if not hasattr(self, "_list_of_argo_files"):
            # if not self.parallel:
            #     self._list_of_argo_files = list_bunch(self.WMO, self.CYC)
            # else:
            #     C = Chunker({'wmo': self.WMO}, chunks=self.chunks, chunksize=self.chunks_maxsize)
            #     wmo_grps = C.fit_transform()
            #     self._list_of_argo_files = []
            #     for wmos in wmo_grps:
            #         self._list_of_argo_files.append(list_bunch(wmos, self.CYC))
            self._list_of_argo_files = list_bunch(self.WMO, self.CYC)

        return self._list_of_argo_files

    def dashboard(self, **kw):
        """ Open the dashboard of a single float request

        A warning is raised, and None is returned, if the request is not for exactly one float.

        Parameters
        ----------
        **kw
            Passed to ``open_dashboard``
        """
        if len(self.WMO) == 1:
            return open_dashboard(wmo=self.WMO[0], **kw)
        else:
            warnings.warn("Dashboard only available for a single float request")
class Fetch_box(LocalFTPArgoDataFetcher):
    """ Manage access to local ftp Argo data for: a rectangular space/time domain """

    def init(self, box: list):
        """ Create Argo data loader

        Parameters
        ----------
        box : list()
            The box domain to load all Argo data for, with one of the following convention:

            - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max]
            - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max]
        """
        # We use a full domain definition (x, y, z, t) as argument for compatibility with the other fetchers
        # but at this point, we internally work only with x, y and t.
        self.BOX = box
        self.indexBOX = [box[ii] for ii in [0, 1, 2, 3]]
        if len(box) == 8:
            self.indexBOX = [box[ii] for ii in [0, 1, 2, 3, 6, 7]]

        # if len(box) == 6:
        #     # Select the last months of data:
        #     end = pd.to_datetime('now')
        #     start = end - pd.DateOffset(months=1)
        #     self.BOX.append(start.strftime('%Y-%m-%d'))
        #     self.BOX.append(end.strftime('%Y-%m-%d'))

        self.fs_index = indexstore(
            self.cache,
            self.cachedir,
            os.path.sep.join([self.local_ftp, "ar_index_global_prof.txt"]),
        )
        # todo we would need to check if the index file is there !

    @property
    def uri(self):
        """ List of files to load for a request

        The list is computed on the first access and then kept in ``self._list_of_argo_files``.

        Profiles listed in the index but missing from the local ftp copy are skipped,
        unless ``self.errors`` is 'raise'.

        Returns
        -------
        list(str)

        Raises
        ------
        NetCDF4FileNotFoundError
            If a file listed in the index is not found and ``self.errors`` is 'raise'.
        """
        if not hasattr(self, "_list_of_argo_files"):
            # The list is built locally and only stored once complete, so that an error
            # raised along the way never leaves a partial list behind for the next access.
            files = []
            # Fetch the index to retrieve the list of profiles to load:
            filt = indexfilter_box(self.indexBOX)
            df_index = self.fs_index.read_csv(filt)
            if isinstance(df_index, pd.DataFrame):
                # Ok, we found profiles in the index file,
                # so now we can make sure these files exist:
                for file in list(df_index["file"]):
                    abs_file = os.path.sep.join([self.local_ftp, "dac", file])
                    if self.fs.exists(abs_file):
                        files.append(abs_file)
                    elif self.errors == "raise":
                        raise NetCDF4FileNotFoundError(abs_file)
                    # Otherwise remain silent/ignore and skip this file
                    # todo should raise a warning instead ?
            self._list_of_argo_files = files
        return self._list_of_argo_files