import concurrent.futures
import io
import logging
import multiprocessing
import types
import warnings
from typing import Literal, Optional

import fsspec
import xarray as xr
from netCDF4 import Dataset

from ...errors import InvalidMethod, DataNotFound
from ...utils.transform import drop_variables_not_in_all_datasets
from ..filesystems import has_distributed, distributed
from ..filesystems import tqdm
from .http import httpstore
# Module-level logger used by ftpstore for debug/error messages (e.g. ignored url errors).
log = logging.getLogger("argopy.stores.implementation.ftp")
class ftpstore(httpstore):
    """Argo ftp file system

    Inherits from :class:`argopy.stores.httpstore` but relies on :class:`fsspec.implementations.ftp.FTPFileSystem`
    """

    protocol = "ftp"

    @property
    def host(self):
        """Host of the ftp file system, looking through a ``dir`` file system wrapper if there is one"""
        return self.fs.fs.host if self.fs.protocol == "dir" else self.fs.host

    @property
    def port(self):
        """Port of the ftp file system, looking through a ``dir`` file system wrapper if there is one"""
        return self.fs.fs.port if self.fs.protocol == "dir" else self.fs.port

    def open_dataset(
        self,
        url: str,
        errors: Literal["raise", "ignore", "silent"] = "raise",
        lazy: bool = False,
        xr_opts: Optional[dict] = None,
        **kwargs,
    ):
        """Create a :class:`xarray.Dataset` from an url pointing to a netcdf file

        Parameters
        ----------
        url: str
            The remote URL of the netcdf file to open
        errors: Literal, default: ``raise``
            Define how to handle errors raised during data fetching:

            - ``raise`` (default): Raise any error encountered
            - ``ignore``: Do not stop processing, simply issue an error message in the logging console
            - ``silent``: Do not stop processing and do not issue log message

            Note that exceptions raised by the file system while downloading ``url`` are always re-raised.
        lazy: bool, default=False
            Define if we should try to load netcdf file lazily or not

            **If this is set to False (default)** opening is done in 2 steps:

            1. Download from ``url`` raw binary data with :meth:`ftpstore.fs.cat_file`,
            2. Create a :class:`xarray.Dataset` with :func:`xarray.open_dataset`.

            Options for the second step can be passed with ``xr_opts`` (see below).

            **If this is set to True**, use a :class:`ArgoKerchunker` instance to access
            the netcdf file lazily. You can provide a specific :class:`ArgoKerchunker` instance with the ``ak``
            argument (see below). If the url does not support lazy access, a warning is issued and the file is
            loaded in memory instead.
        xr_opts: dict, optional
            Options passed to :func:`xarray.open_dataset`. This will be ignored if the ``netCDF4`` option is set
            to True. In lazy mode, when the url supports lazy access, these options are replaced by the ones
            required to open the kerchunk references.

        Other Parameters
        ----------------
        ak: :class:`ArgoKerchunker`, optional
            :class:`ArgoKerchunker` instance to use if ``lazy=True``.
        akoverwrite: bool, optional
            Determine if kerchunk data should be overwritten or not. This is passed to :meth:`ArgoKerchunker.to_reference`.
        netCDF4: bool, optional, default=False
            Return a :class:`netCDF4.Dataset` object instead of a :class:`xarray.Dataset`

        Returns
        -------
        :class:`xarray.Dataset` or :class:`netCDF4.Dataset`, or None if the dataset could not be opened and
        ``errors`` is not ``raise``

        Raises
        ------
        :class:`TypeError`
            Raised if data returned by ``url`` are not CDF or HDF5 binary data.
        :class:`DataNotFound`
            Raised if ``errors`` is set to ``raise`` and url returns no data.
        :class:`ValueError`
            Raised if ``errors`` is set to ``raise`` and both ``lazy`` and ``netCDF4`` are True.

        See Also
        --------
        :meth:`ftpstore.open_mfdataset`, :class:`ArgoKerchunker`
        """
        xr_opts = {} if xr_opts is None else xr_opts

        def load_in_memory(url, errors="raise", xr_opts=None):
            """Download url content and return data along with xarray options to open it

            Returns
            -------
            tuple: (data, xr_opts), or (None, None) if no data were returned and ``errors`` is not ``raise``
            """
            try:
                this_url = self.fs._strip_protocol(url)
                data = self.fs.cat_file(this_url)
                if data is None:
                    if errors == "raise":
                        raise DataNotFound(url)
                    elif errors == "ignore":
                        log.error("DataNotFound: %s" % url)
                    return None, None
            except Exception:
                log.debug("Error with: %s" % url)
                raise

            # Only netCDF3 ("CDF") and HDF5/netCDF4 ("\x89HDF") signatures are accepted
            magic = data[0:3]
            if magic != b"CDF" and magic != b"\x89HD":
                raise TypeError(
                    "We didn't get a CDF or HDF5 binary data as expected ! We get: %s"
                    % data
                )
            if magic == b"\x89HD":
                # HDF5 content is handed over as a file-like object
                data = io.BytesIO(data)
            return data, ({} if xr_opts is None else xr_opts)

        def load_lazily(url, errors="raise", xr_opts=None, akoverwrite: bool = False):
            """Check if url support lazy access and return kerchunk data along with xarray option to open it lazily

            Otherwise, download url content and return data along with xarray option to open it.

            Returns
            -------
            tuple:
                If the url support lazy access:
                    ("reference://", xr_opts)
                else:
                    (data, xr_opts) or (None, None) if no data were returned and ``errors`` is not ``raise``
            """
            from .. import ArgoKerchunker

            if "ak" not in kwargs:
                self.ak = ArgoKerchunker()
                self.ak.storage_options = {"host": self.host, "port": self.port}
            else:
                self.ak = kwargs["ak"]

            if self.ak.supported(url, fs=self):
                xr_opts = {
                    "engine": "zarr",
                    "backend_kwargs": {
                        "consolidated": False,
                        "storage_options": {
                            "fo": self.ak.to_reference(
                                url, overwrite=akoverwrite, fs=self
                            ),  # codespell:ignore
                            "remote_protocol": fsspec.core.split_protocol(url)[0],
                            "remote_options": self.ak.storage_options,
                        },
                    },
                }
                return "reference://", xr_opts

            warnings.warn(
                "This url does not support byte range requests so we cannot load it lazily, falling back on loading in memory.\n(url='%s')"
                % url
            )
            log.debug(
                "This url does not support byte range requests: %s"
                % self.full_path(url)
            )
            return load_in_memory(url, errors=errors, xr_opts=xr_opts)

        netCDF4 = kwargs.get("netCDF4", False)
        if lazy and netCDF4:
            if errors == "raise":
                raise ValueError("Cannot return a netCDF4.Dataset object in lazy mode")
            elif errors == "ignore":
                log.error("Cannot return a netCDF4.Dataset object in lazy mode")
            # Stop here for "silent" too: a lazy target ("reference://") cannot be turned
            # into an in-memory buffer for netCDF4.Dataset.
            return None

        if not lazy:
            target, _ = load_in_memory(url, errors=errors, xr_opts=xr_opts)
        else:
            target, xr_opts = load_lazily(
                url,
                errors=errors,
                xr_opts=xr_opts,
                akoverwrite=kwargs.get("akoverwrite", False),
            )

        if target is None:
            if errors == "raise":
                raise DataNotFound(url)
            elif errors == "ignore":
                log.error("DataNotFound from: %s" % url)
            return None

        if not netCDF4:
            ds = xr.open_dataset(target, **xr_opts)
            if "source" not in ds.encoding and isinstance(url, str):
                ds.encoding["source"] = self.full_path(url)
        else:
            # netCDF4.Dataset opens in-memory data from raw bytes or a buffer
            memory = target if isinstance(target, bytes) else target.getbuffer()
            ds = Dataset(None, memory=memory, diskless=True, mode="r")
        self.register(url)
        return ds

    def _mfprocessor_dataset(
        self,
        url,
        preprocess=None,
        preprocess_opts: Optional[dict] = None,
        open_dataset_opts: Optional[dict] = None,
        *args,
        **kwargs,
    ):
        """Open a single url and optionally pre-process it (per-url worker of :meth:`open_mfdataset`)

        Extra ``args`` and ``kwargs`` are accepted but not used.
        """
        ds = self.open_dataset(url, **(open_dataset_opts or {}))
        # Any callable is accepted (functions, methods, functools.partial, callable objects)
        if callable(preprocess):
            ds = preprocess(ds, **(preprocess_opts or {}))
        return ds

    def open_mfdataset(
        self,  # noqa: C901
        urls,
        max_workers: int = 6,
        method: str = "sequential",
        progress: bool = False,
        concat: bool = True,
        concat_dim="row",
        preprocess=None,
        preprocess_opts: Optional[dict] = None,
        open_dataset_opts: Optional[dict] = None,
        errors: str = "ignore",
        *args,
        **kwargs,
    ):
        """Open multiple ftp urls as a single xarray dataset.

        This is a version of the :class:`argopy.stores.ftpstore.open_dataset` method that is able
        to handle a list of urls/paths sequentially or in parallel.

        Parameters
        ----------
        urls: list(str)
            List of url/path to open
        max_workers: int, default: 6
            Maximum number of processes. With ``method="process"``, the default value is replaced by the
            number of available CPUs.
        method: str or :class:`distributed.client.Client`, default: ``sequential``
            The parallelization method to execute calls asynchronously:

            - ``seq`` or ``sequential`` (default): open data sequentially, no parallelization applied
            - ``process``: use a pool of at most ``max_workers`` processes
            - :class:`distributed.client.Client`: Experimental, expect this method to fail !
        progress: bool, default: False
            Display a progress bar
        concat: bool, default: True
            Concatenate results in a single :class:`xarray.Dataset` or not (in this case, function will return a
            list of :class:`xarray.Dataset`)
        concat_dim: str, default: ``row``
            Name of the dimension to use to concatenate all datasets (passed to :class:`xarray.concat`)
        preprocess: callable (optional)
            If provided, call this function on each dataset prior to concatenation
        preprocess_opts: dict (optional)
            If ``preprocess`` is provided, pass this as options
        open_dataset_opts: dict (optional)
            Options passed to :meth:`ftpstore.open_dataset` for each url
        errors: str, default: ``ignore``
            Define how to handle errors raised during data URIs fetching (not applied with a dask client):

            - ``raise``: Raise any error encountered
            - ``ignore`` (default): Do not stop processing, simply issue a debug message in logging console
            - ``silent``: Do not stop processing and do not issue log message

        Other args and kwargs are forwarded to the internal per-url worker; options for
        :meth:`ftpstore.open_dataset` must be given with ``open_dataset_opts``.

        Returns
        -------
        output: :class:`xarray.Dataset` or list of :class:`xarray.Dataset`

        Raises
        ------
        :class:`InvalidMethod`
            Raised if ``method`` is not supported.
        :class:`DataNotFound`
            Raised if no url could be opened.
        """
        strUrl = lambda x: x.replace("ftps://", "").replace("ftp://", "")  # noqa: E731

        if not isinstance(urls, list):
            urls = [urls]

        # Options forwarded to the per-url worker, identical for every execution method
        worker_opts = {
            "preprocess": preprocess,
            "preprocess_opts": {} if preprocess_opts is None else preprocess_opts,
            "open_dataset_opts": {} if open_dataset_opts is None else open_dataset_opts,
        }

        results = []
        failed = []  # urls that could not be opened when errors are not raised
        if method in ["process"]:
            if max_workers == 6:
                max_workers = multiprocessing.cpu_count()

            with concurrent.futures.ProcessPoolExecutor(
                max_workers=max_workers
            ) as executor:
                future_to_url = {
                    executor.submit(
                        self._mfprocessor_dataset, url, *args, **worker_opts, **kwargs
                    ): url
                    for url in urls
                }
                futures = concurrent.futures.as_completed(future_to_url)
                if progress:
                    futures = tqdm(
                        futures, total=len(urls), disable="disable" in [progress]
                    )

                for future in futures:
                    data = None
                    try:
                        data = future.result()
                    except Exception:
                        failed.append(future_to_url[future])
                        if errors == "ignore":
                            # See fsspec.ftp logger for more
                            log.debug(
                                "Ignored error with this url: %s"
                                % strUrl(future_to_url[future])
                            )
                        elif errors != "silent":
                            raise
                    finally:
                        results.append(data)

        elif has_distributed and isinstance(method, distributed.client.Client):
            # Use a dask client (errors are not caught in this branch)
            if progress:
                from dask.diagnostics import ProgressBar

                progress_context = ProgressBar()
            else:
                from contextlib import nullcontext

                progress_context = nullcontext()

            with progress_context:
                futures = method.map(
                    self._mfprocessor_dataset, urls, *args, **worker_opts, **kwargs
                )
                results = method.gather(futures)

        elif method in ["seq", "sequential"]:
            # Iterate over a separate name so that ``urls`` stays a plain list (reported by DataNotFound below)
            url_iterator = (
                tqdm(urls, total=len(urls), disable="disable" in [progress])
                if progress
                else urls
            )
            for url in url_iterator:
                data = None
                try:
                    data = self._mfprocessor_dataset(
                        url, *args, **worker_opts, **kwargs
                    )
                except Exception:
                    failed.append(url)
                    if errors == "ignore":
                        # See fsspec.ftp logger for more
                        log.debug("Ignored error with this url: %s" % strUrl(url))
                    elif errors != "silent":
                        raise
                finally:
                    results.append(data)

        else:
            raise InvalidMethod(method)

        # Post-process results: only keep non-empty results
        results = [r for r in results if r is not None]
        if not results:
            raise DataNotFound(urls)
        if not concat:
            return results

        results = drop_variables_not_in_all_datasets(results)
        return xr.concat(
            results,
            dim=concat_dim,
            data_vars="minimal",
            coords="minimal",
            compat="override",
        )