Source code for argopy.stores.fsspec_wrappers

import os
import io
import xarray as xr
import pandas as pd
import requests
import fsspec
import shutil
import pickle
import json
import tempfile
from IPython.core.display import display, HTML

from argopy.options import OPTIONS
from argopy.errors import ErddapServerError, FileSystemHasNoCache, CacheFileNotFound
from abc import ABC, abstractmethod


class argo_store_proto(ABC):  # Should this class inherits from fsspec.spec.AbstractFileSystem ?
    """ Base class for argopy file stores built on top of fsspec

        Sub-classes set the ``protocol`` class attribute and implement ``open_dataset``
        and ``open_dataframe``.

        When ``cache`` is enabled, the target file system is wrapped in a fsspec "filecache"
        file system and the store keeps a registry of the uri it opened, so that the
        matching cache entries can be removed with ``clear_cache``.
    """
    protocol = ''  # One in fsspec.registry.known_implementations

    def __init__(self, cache: bool = False, cachedir: str = "", **kw):
        """ Create a file storage system for Argo data

            Parameters
            ----------
            cache : bool (False)
                Wrap the file system in a fsspec "filecache" file system.
            cachedir : str (from OPTIONS)
                Folder where cached files are stored. An empty string means
                ``OPTIONS['cachedir']``.
            **kw
                Other arguments forwarded to ``fsspec.filesystem``.

        """
        self.cache = cache
        self.cachedir = OPTIONS['cachedir'] if cachedir == '' else cachedir
        if not self.cache:
            self.fs = fsspec.filesystem(self.protocol, **kw)
        else:
            self.fs = fsspec.filesystem("filecache",
                                        target_protocol=self.protocol,
                                        target_options={'simple_links': True},
                                        cache_storage=self.cachedir,
                                        expiry_time=86400, cache_check=10, **kw)
            # We use a refresh rate for cache of 1 day,
            # since this is the update frequency of the Ifremer erddap
            self.cache_registry = []  # Will hold uri cached by this store instance

    def open(self, path, *args, **kwargs):
        """ Register ``path`` with this store and open it with the underlying file system """
        self.register(path)
        return self.fs.open(path, *args, **kwargs)

    def glob(self, path, **kwargs):
        """ Forward to the ``glob`` method of the underlying file system """
        return self.fs.glob(path, **kwargs)

    def exists(self, path, *args):
        """ Forward to the ``exists`` method of the underlying file system """
        return self.fs.exists(path, *args)

    def store_path(self, uri):
        """ Return ``uri`` prefixed with "<target protocol>://", unless it already starts with the protocol

            NOTE(review): this reads ``self.fs.target_protocol``. In this class, only the
            "filecache" file system is created with a ``target_protocol``, so this method is
            presumably meant to be used with ``cache=True`` only - to be confirmed.
        """
        if not uri.startswith(self.fs.target_protocol):
            path = self.fs.target_protocol + "://" + uri
        else:
            path = uri
        return path

    def register(self, uri):
        """ Keep track of files open with this instance (does nothing without cache) """
        if self.cache:
            self.cache_registry.append(self.store_path(uri))

    def cachepath(self, uri: str, errors: str = 'raise'):
        """ Return path to cached file for a given URI

            Parameters
            ----------
            uri : str
            errors : str ('raise')
                With 'raise', an exception is raised when the path cannot be determined.
                With any other value, None is returned instead.

            Returns
            -------
            str or None

            Raises
            ------
            FileSystemHasNoCache
                If this store has no cache and ``errors`` is 'raise'.
            CacheFileNotFound
                If ``uri`` is not in the cache registry and ``errors`` is 'raise'.
        """
        if not self.cache:
            if errors == 'raise':
                raise FileSystemHasNoCache("%s has no cache system" % type(self.fs))
        else:
            store_path = self.store_path(uri)
            self.fs.load_cache()  # Refresh the registry before looking the uri up
            if store_path in self.fs.cached_files[-1]:
                return os.path.sep.join([self.cachedir, self.fs.cached_files[-1][store_path]['fn']])
            elif errors == 'raise':
                raise CacheFileNotFound("No cached file found in %s for: \n%s" % (self.fs.storage[-1], uri))

    def _clear_cache_item(self, uri):
        """ Open fsspec cache registry (pickle file) and remove entry for an uri

            The cached data file of ``uri`` is deleted and the registry file is rewritten
            without the entry. Other entries are kept.

            Parameters
            ----------
            uri : str
                Registry key to remove, i.e. an uri as returned by ``store_path``.
        """
        # See the "save_cache()" method in:
        # https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/cached.html#WholeFileCacheFileSystem
        storage = self.fs.storage[-1]
        fn = os.path.join(storage, "cache")
        if os.path.exists(fn):
            # NOTE(security): pickle.load executes arbitrary code from the file, the cache
            # folder must therefore not be writable by untrusted users.
            with open(fn, "rb") as f:
                cached_files = pickle.load(f)
        else:
            # No registry on disk: fall back on the registry held in memory
            cached_files = self.fs.cached_files[-1]
        cache = {}
        for k, v in cached_files.items():
            if k != uri:
                cache[k] = v.copy()
            else:
                try:
                    os.remove(os.path.join(storage, v['fn']))
                except FileNotFoundError:
                    # Data file already gone (e.g. uri registered twice while the
                    # registry is only in memory): nothing left to delete.
                    pass
        # Write to a securely created temporary file first, then move it in place.
        # (tempfile.mktemp is deprecated: the name it returns can be taken by another
        # process before the file is created)
        with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
            pickle.dump(cache, f)
            fn2 = f.name
        shutil.move(fn2, fn)

    def clear_cache(self):
        """ Remove cache files and entry from uri open with this store instance """
        if self.cache:
            for uri in self.cache_registry:
                self._clear_cache_item(uri)

    @abstractmethod
    def open_dataset(self):
        """ To be implemented by sub-classes """
        pass

    @abstractmethod
    def open_dataframe(self):
        """ To be implemented by sub-classes """
        pass


class filestore(argo_store_proto):
    """Wrapper around fsspec file stores

        https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.local.LocalFileSystem

    """
    protocol = 'file'

    def open_dataset(self, url, **kwargs):
        """ Return a xarray.dataset from an url

            Parameters
            ----------
            url: str
                Path to resources passed to xarray.open_dataset
            **kwargs
                Other arguments passed to xarray.open_dataset

            Returns
            -------
            :class:`xarray.Dataset`
        """
        with self.fs.open(url) as of:
            ds = xr.open_dataset(of, **kwargs)
            # The file handle is closed when leaving this block: read the data now,
            # otherwise lazily loaded variables cannot be accessed afterwards.
            ds.load()
        self.register(url)
        return ds

    def open_dataframe(self, url, **kwargs):
        """ Return a pandas.dataframe from an url that is a csv resource

            Parameters
            ----------
            url: str
                Path to csv resources passed to pandas.read_csv
            **kwargs
                Other arguments passed to pandas.read_csv

            Returns
            -------
            :class:`pandas.DataFrame`
        """
        with self.fs.open(url) as of:
            df = pd.read_csv(of, **kwargs)
        self.register(url)
        return df


class httpstore(argo_store_proto):
    """Wrapper around fsspec http file store

        This wrapper intends to make argopy safer to failures from http requests
        This wrapper is primarily used by the Erddap data/index fetchers
    """
    protocol = "http"

    def _verbose_exceptions(self, e):
        """ Raise a more informative error from a requests.HTTPError

            The error message is made of the status code, the response body and the URL
            that triggered the error.

            Parameters
            ----------
            e: requests.HTTPError

            Raises
            ------
            ErddapServerError
                When the response body matches one of the server side errors checked below.
            requests.HTTPError
                For the other 404, 413 and 500 responses, and whenever ``raise_for_status``
                raises for the other status codes. ``e`` is raised again if it holds no response.
        """
        r = e.response  # https://requests.readthedocs.io/en/master/api/#requests.Response
        if r is None:
            # Nothing to report about: let the original error go through
            raise e
        url = r.url
        status = r.status_code
        # Decode the body only once, so that it can be both displayed and added to the message
        body = (r.content or b"").decode("utf-8", errors="replace")

        def describe(text):
            # Assemble the verbose message: status code, response text and faulty URL
            return "\n".join(["Error %i " % status,
                              text,
                              "The URL triggering this error was: \n%s" % url])

        # 4XX client error response
        if status == 404:  # Empty response
            msg = describe(body.replace("Error", ""))
            if "Currently unknown datasetID" in msg:
                raise ErddapServerError("Dataset not found in the Erddap, try again later. "
                                        "The server may be rebooting. \n%s" % msg)
            else:
                raise requests.HTTPError(msg)

        elif status == 413:  # Too large request
            msg = describe(body.replace("Error", ""))
            if "Payload Too Large" in msg:
                raise ErddapServerError("Your query produced too much data. "
                                        "Try to request less data.\n%s" % msg)
            else:
                raise requests.HTTPError(msg)

        # 5XX server error response
        elif status == 500:  # 500 Internal Server Error
            # The content-type header may be missing
            if "text/html" in (r.headers.get('content-type') or ''):
                display(HTML(body))
            msg = describe(body)
            if "No space left on device" in msg or "java.io.EOFException" in msg:
                raise ErddapServerError("An error occured on the Erddap server side. "
                                        "Please contact assistance@ifremer.fr to ask a "
                                        "reboot of the erddap server. \n%s" % msg)
            else:
                raise requests.HTTPError(msg)

        else:
            print(describe(body))
            r.raise_for_status()

    def open_json(self, url, **kwargs):
        """ Return a json from an url, or verbose errors

            Parameters
            ----------
            url: str
            **kwargs
                Other arguments passed to json.load

            Returns
            -------
            json

            Notes
            -----
            json.JSONDecodeError are not caught here and propagate unchanged.
        """
        try:
            with self.fs.open(url) as of:
                js = json.load(of, **kwargs)
            self.register(url)
            return js
        except requests.HTTPError as e:
            self._verbose_exceptions(e)

    def open_dataset(self, url, **kwargs):
        """ Return a xarray.dataset from an url, or verbose errors

            Parameters
            ----------
            url: str
            **kwargs
                Other arguments passed to xarray.open_dataset

            Returns
            -------
            :class:`xarray.Dataset`
        """
        try:
            with self.fs.open(url) as of:
                ds = xr.open_dataset(of, **kwargs)
                # The file handle is closed when leaving this block: read the data now,
                # otherwise lazily loaded variables cannot be accessed afterwards.
                ds.load()
            self.register(url)
            return ds
        except requests.HTTPError as e:
            self._verbose_exceptions(e)

    def open_dataframe(self, url, **kwargs):
        """ Return a pandas.dataframe from an url with csv response, or verbose errors

            Parameters
            ----------
            url: str
            **kwargs
                Other arguments passed to pandas.read_csv

            Returns
            -------
            :class:`pandas.DataFrame`
        """
        try:
            with self.fs.open(url) as of:
                df = pd.read_csv(of, **kwargs)
            self.register(url)
            return df
        except requests.HTTPError as e:
            self._verbose_exceptions(e)


class memorystore(filestore):
    """ File store using the fsspec 'memory' protocol

        Note that this inherits from filestore, not argo_store_proto
    """
    protocol = 'memory'