"""
Argo file index store prototype
"""
import numpy as np
import pandas as pd
import logging
from abc import ABC, abstractmethod
from fsspec.core import split_protocol
from ..options import OPTIONS
from ..errors import FtpPathError, InvalidDataset, CacheFileNotFound
from ..utilities import Registry
from .filesystems import httpstore, memorystore, filestore, ftpstore
try:
import pyarrow.csv as csv
import pyarrow as pa
import pyarrow.parquet as pq
except ModuleNotFoundError:
pass
log = logging.getLogger("argopy.stores.index")
class ArgoIndexStoreProto(ABC):
    """Prototype (abstract base class) of an Argo index file store

    This class holds what is common to all index store backends: file system
    set-up, naming of search constraints, cache book-keeping and export to
    :class:`pandas.DataFrame`. Loading and searching are abstract and must be
    implemented by backend specific subclasses.

    Examples
    --------
    An index store is instantiated with the access path (host) and the index file:

    >>> idx = indexstore()
    >>> idx = indexstore(host="ftp://ftp.ifremer.fr/ifremer/argo")
    >>> idx = indexstore(host="https://data-argo.ifremer.fr", index_file="ar_index_global_prof.txt")
    >>> idx = indexstore(host="https://data-argo.ifremer.fr", index_file="ar_index_global_prof.txt", cache=True)

    Index methods and properties:

    >>> idx.load()
    >>> idx.load(nrows=12)  # Only load the first N rows of the index
    >>> idx.N_RECORDS  # Shortcut for length of 1st dimension of the index array
    >>> idx.index  # internal storage structure of the full index (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
    >>> idx.shape  # shape of the full index array
    >>> idx.uri_full_index  # List of absolute path to files from the full index table column 'file'
    >>> idx.to_dataframe(index=True)  # Convert index to user-friendly :class:`pandas.DataFrame`
    >>> idx.to_dataframe(index=True, nrows=2)  # Only returns the first nrows of the index

    Search methods and properties:

    >>> idx.search_wmo(1901393)
    >>> idx.search_cyc(1)
    >>> idx.search_wmo_cyc(1901393, [1,12])
    >>> idx.search_tim([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])  # Take an index BOX definition
    >>> idx.search_lat_lon([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])  # Take an index BOX definition
    >>> idx.search_lat_lon_tim([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])  # Take an index BOX definition
    >>> idx.N_MATCH  # Shortcut for length of 1st dimension of the search results array
    >>> idx.search  # Internal table with search results
    >>> idx.uri  # List of absolute path to files from the search results table column 'file'
    >>> idx.run()  # Run the search and save results in cache if necessary
    >>> idx.to_dataframe()  # Convert search results to user-friendly :class:`pandas.DataFrame`
    >>> idx.to_dataframe(nrows=2)  # Only returns the first nrows of the search results

    Misc:

    >>> idx.cname
    >>> idx.read_wmo()
    >>> idx.records_per_wmo()
    """

    backend = "?"
    """Name of store backend"""

    search_type = {}
    """Dictionary with search meta-data"""

    ext = None
    """Storage file extension"""

    def __init__(
        self,
        host: str = "https://data-argo.ifremer.fr",
        index_file: str = "ar_index_global_prof.txt",
        cache: bool = False,
        cachedir: str = "",
        timeout: int = 0,
    ):
        """ Create an Argo index file store

        Parameters
        ----------
        host: str, default: ``https://data-argo.ifremer.fr``
            Host is a local or remote ftp/http path to a `dac` folder (GDAC structure compliant). This takes values
            like: ``ftp://ftp.ifremer.fr/ifremer/argo``, ``ftp://usgodae.org/pub/outgoing/argo`` or a local absolute path.
        index_file: str, default: ``ar_index_global_prof.txt``
            Name of the csv-like text file with the index
        cache : bool, default: False
            Use cache or not.
        cachedir : str, default: ``OPTIONS['cachedir']``
            Folder where to store cached files
        timeout : int, default: ``OPTIONS['api_timeout']``
            Timeout handed over to remote (http/ftp) file systems. The value 0 selects the default.

        Raises
        ------
        FtpPathError
            If the ftp host is not one of the known Argo servers, if the host protocol is not supported
            or if the index file can't be found on the host.
        """
        self.host = host
        self.index_file = index_file
        self.cache = cache
        self.cachedir = OPTIONS["cachedir"] if cachedir == "" else cachedir
        timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout

        # Give each instance its own search meta-data dictionary so that an
        # in-place update can never leak through the shared class attribute.
        self.search_type = {}

        self.fs = {}
        protocol, host_path = split_protocol(host)  # protocol is None for a local path
        if protocol is None:
            self.fs["src"] = filestore(cache=cache, cachedir=cachedir)
        elif "https" in protocol:
            # Only for https://data-argo.ifremer.fr (much faster than the ftp servers)
            self.fs["src"] = httpstore(
                cache=cache, cachedir=cachedir, timeout=timeout, size_policy="head"
            )
        elif "ftp" in protocol:
            if "ifremer" not in host and "usgodae" not in host:
                raise FtpPathError(
                    "Unknown Argo ftp: %s. Raise on issue if you wish to add your own to the\n"
                    "valid list of FTP servers:\n"
                    "https://github.com/euroargodev/argopy/issues/new?title=New%%20FTP%%20server"
                    % host
                )
            self.fs["src"] = ftpstore(
                host=host_path.split("/")[0],  # host eg: ftp.ifremer.fr
                cache=cache,
                cachedir=cachedir,
                timeout=timeout,
                block_size=1000 * (2 ** 20),
            )
        else:
            raise FtpPathError(
                "Unknown protocol for an Argo index store: %s" % protocol
            )

        self.fs["client"] = memorystore(cache, cachedir)  # Manage search results
        # Track files opened with this memory store, since it's a global store:
        self._memory_store_content = Registry(name='memory store')
        # Track cached files related to search:
        self.search_path_cache = Registry(name='cached search')

        self.index_path = self.fs["src"].fs.sep.join([self.host, self.index_file])
        if not self.fs["src"].exists(self.index_path):
            raise FtpPathError("Index file does not exist: %s" % self.index_path)

    def __repr__(self):
        """ Multi-line summary of the store: backend, host, index file, load and search status """
        summary = ["<argoindex.%s>" % self.backend]
        summary.append("Host: %s" % self.host)
        summary.append("Index: %s" % self.index_file)

        loaded = hasattr(self, "index")
        if loaded:
            summary.append("Loaded: True (%i records)" % self.N_RECORDS)
        else:
            summary.append("Loaded: False")

        if hasattr(self, "search"):
            n_match = self.N_MATCH
            match = "matches" if n_match > 1 else "match"
            if loaded and self.N_RECORDS > 0:
                summary.append(
                    "Searched: True (%i %s, %0.4f%%)"
                    % (n_match, match, n_match * 100 / self.N_RECORDS)
                )
            else:
                # No meaningful ratio without a loaded, non-empty index: a repr must never raise
                summary.append("Searched: True (%i %s)" % (n_match, match))
        else:
            summary.append("Searched: False")

        return "\n".join(summary)

    def _format(self, x, typ: str) -> str:
        """ String formatting helper

        Parameters
        ----------
        x:
            Value to format
        typ: str
            One of ``lon`` (negative values are shifted by +360), ``lat``, ``prs`` (absolute value)
            or ``tim`` (formatted as ``%Y-%m-%d``). Any other value falls back on ``str(x)``.

        Returns
        -------
        str
        """
        if typ == "lon":
            if x < 0:
                x = 360.0 + x
            return ("%05d") % (x * 100.0)
        if typ == "lat":
            return ("%05d") % (x * 100.0)
        if typ == "prs":
            return ("%05d") % (np.abs(x) * 10.0)
        if typ == "tim":
            return pd.to_datetime(x).strftime("%Y-%m-%d")
        return str(x)

    @property
    def cname(self) -> str:
        """ Return the search constraint(s) as a pretty formatted string

        Return 'full' if a search was not yet performed on the indexstore instance

        This method uses the BOX, WMO, CYC keys of the index instance ``search_type`` property.
        WMO and CYC values are always listed in sorted order, so that the result does not depend
        on the order used to specify the search.
        """
        cname = "full"

        if "BOX" in self.search_type:
            BOX = self.search_type["BOX"]
            cname = ("x=%0.2f/%0.2f;y=%0.2f/%0.2f") % (BOX[0], BOX[1], BOX[2], BOX[3])
            if len(BOX) == 6:
                # Index box with date bounds
                cname = "%s;t=%s/%s" % (
                    cname,
                    self._format(BOX[4], "tim"),
                    self._format(BOX[5], "tim"),
                )

        elif "WMO" in self.search_type:
            WMO = sorted(self.search_type["WMO"])
            if "CYC" in self.search_type:
                cycles = "_".join(
                    ["CYC%i" % cyc for cyc in sorted(self.search_type["CYC"])]
                )
                cname = ";".join(["WMO%i_%s" % (wmo, cycles) for wmo in WMO])
            else:
                cname = ";".join(["WMO%i" % wmo for wmo in WMO])

        elif "CYC" in self.search_type:
            cname = ";".join(
                ["CYC%i" % cyc for cyc in sorted(self.search_type["CYC"])]
            )

        return cname

    def _sha_from(self, path):
        """ Internal post-processing for a sha

        Used by: sha_df, sha_pq, sha_h5

        The input is currently returned unchanged (no hashing/encoding is applied).
        """
        return path

    @property
    def sha_df(self) -> str:
        """ Returns the identifier of a cname/dataframe: ``cname`` prefixed with ``pd-`` """
        return self._sha_from("pd-%s" % self.cname)

    @property
    def sha_pq(self) -> str:
        """ Returns the identifier of a cname/parquet: ``cname`` prefixed with ``pq-`` """
        return self._sha_from("pq-%s" % self.cname)

    @property
    def sha_h5(self) -> str:
        """ Returns the identifier of a cname/hdf5: ``cname`` prefixed with ``h5-`` """
        return self._sha_from("h5-%s" % self.cname)

    @property
    def shape(self):
        """ Shape of the index array """
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        return self.index.shape

    @property
    def N_FILES(self):
        """ Number of rows in search result or index if search not triggered

        Raises
        ------
        InvalidDataset
            If neither a search nor the index is available
        """
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        if hasattr(self, "search"):
            return self.search.shape[0]
        elif hasattr(self, "index"):
            return self.index.shape[0]
        else:
            raise InvalidDataset("You must, at least, load the index first !")

    @property
    def N_RECORDS(self):
        """ Number of rows in the full index

        Raises
        ------
        InvalidDataset
            If the index was not loaded
        """
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        if hasattr(self, "index"):
            return self.index.shape[0]
        else:
            raise InvalidDataset("Load the index first !")

    @property
    def N_MATCH(self):
        """ Number of rows in search result

        Raises
        ------
        InvalidDataset
            If no search results are available
        """
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        if hasattr(self, "search"):
            return self.search.shape[0]
        else:
            raise InvalidDataset("Initialised search first !")

    def _same_origin(self, path):
        """Compare origin of path with current memory fs

        Returns True if ``path`` was registered by this instance with :meth:`_commit`.
        """
        return path in self._memory_store_content

    def _commit(self, path):
        """Register ``path`` as a file written to the memory store by this instance"""
        self._memory_store_content.commit(path)

    def _write(self, fs, path, obj, fmt="pq"):
        """ Write internal array object to file store

        Parameters
        ----------
        fs: filestore
        path:
            Path to write the object to
        obj: :class:`pyarrow.Table` or :class:`pandas.DataFrame`
        fmt: str
            File format to use. This is "pq" (default) or "pd". "parquet" is accepted as an alias of "pq".

        Raises
        ------
        KeyError
            If ``fmt`` is not a known file format
        """
        write_this = {
            'pq': lambda o, h: pq.write_table(o, h),
            'pd': lambda o, h: o.to_pickle(h),  # obj is a pandas dataframe
        }
        if fmt == 'parquet':
            fmt = 'pq'
        # Resolve the writer before opening the target, so that an unknown
        # format does not leave an empty/truncated file behind:
        writer = write_this[fmt]

        with fs.open(path, "wb") as handle:
            writer(obj, handle)
            if fs.protocol == 'memory':
                self._commit(path)

        return self

    def _read(self, fs, path, fmt="pq"):
        """ Read internal array object from file store

        Parameters
        ----------
        fs: filestore
        path:
            Path to readable object
        fmt: str
            File format to use. This is "pq" (default) or "pd". "parquet" is accepted as an alias of "pq".

        Returns
        -------
        obj: :class:`pyarrow.Table` or :class:`pandas.DataFrame`

        Raises
        ------
        KeyError
            If ``fmt`` is not a known file format
        """
        read_this = {
            'pq': lambda h: pq.read_table(h),
            # NOTE(review): un-pickling executes arbitrary code, this must only be used on
            # files written by this store (see _write), never on untrusted content.
            'pd': lambda h: pd.read_pickle(h),
        }
        if fmt == 'parquet':
            fmt = 'pq'
        reader = read_this[fmt]  # Fail on unknown format before touching the file store

        with fs.open(path, "rb") as handle:
            obj = reader(handle)
        return obj

    def clear_cache(self):
        """Clear cache registry and files associated with this store instance."""
        self.fs["src"].clear_cache()
        self.fs["client"].clear_cache()
        self._memory_store_content.clear()
        self.search_path_cache.clear()
        return self

    def cachepath(self, path):
        """ Return path to a cached file

        Parameters
        ----------
        path: str
            Path for which to return the cached file path for. You can use `index` or `search` as shortcuts
            to access path to the internal index or search tables.

        Returns
        -------
        list(str)
            One entry, as returned by the client file store ``cachepath`` method, for each path
        """
        if path == 'index' and hasattr(self, 'index_path_cache'):
            path = [self.index_path_cache]
        elif path == 'search':
            if len(self.search_path_cache) > 0:
                path = self.search_path_cache.data
            else:
                # No cached file tracked for the search: let the client store deal with it
                path = [None]
        elif not isinstance(path, list):
            path = [path]
        return [self.fs["client"].cachepath(p) for p in path]

    def to_dataframe(self, nrows=None, index=False):  # noqa: C901
        """ Return index or search results as :class:`pandas.DataFrame`

        If search not triggered, fall back on full index by default. Using index=True force to return the full index.

        Parameters
        ----------
        nrows: {int, None}, default: None
            Will return only the first `nrows` of search results. None returns all.
        index: bool, default: False
            Force to return the index, even if a search was performed with this store instance.

        Returns
        -------
        :class:`pandas.DataFrame`
        """
        def get_filename():
            # Name of the (possibly cached) export of the table to convert
            if hasattr(self, "search") and not index:
                fname = self.search_path
            else:
                fname = self.index_path
            if nrows is not None:
                return fname + "/export" + "#%i.pd" % nrows
            return fname + "/export.pd"

        # NOTE(review): the backend conversion is triggered even if a cached export exists,
        # because it provides `src` and backend specific checks. Kept as is on purpose.
        df, src = self._to_dataframe(nrows=nrows, index=index)
        fname = get_filename()

        if self.cache and self.fs["client"].exists(fname):
            log.debug(
                "[%s] already processed as Dataframe, loading ... src='%s'"
                % (src, fname)
            )
            return self._read(self.fs["client"].fs, fname, fmt="pd")

        log.debug("Converting [%s] to dataframe from scratch ..." % src)
        # Post-processing for user:
        from argopy.utilities import load_dict, mapp_dict

        # Only work with new objects from here, the table handed over by the backend is left untouched
        if nrows is not None:
            df = df.iloc[:nrows]  # positional: must not depend on row labels
        if "index" in df:
            df = df.drop("index", axis=1)
        df = df.reset_index(drop=True)

        df["wmo"] = df["file"].apply(lambda x: int(x.split("/")[1]))
        df["date"] = pd.to_datetime(df["date"], format="%Y%m%d%H%M%S")
        df["date_update"] = pd.to_datetime(df["date_update"], format="%Y%m%d%H%M%S")

        # institution & profiler mapping for all users
        # todo: may be we need to separate this for standard and expert users
        institution_dictionnary = load_dict("institutions")
        df["tmp1"] = df["institution"].apply(
            lambda x: mapp_dict(institution_dictionnary, x)
        )
        df = df.rename(
            columns={"institution": "institution_code", "tmp1": "institution"}
        )

        profiler_dictionnary = load_dict("profilers")
        profiler_dictionnary["?"] = "?"

        def ev(x):
            # Profiler types are mapped with their integer code whenever possible
            try:
                return int(x)
            except Exception:
                return x

        df["profiler"] = df["profiler_type"].apply(
            lambda x: mapp_dict(profiler_dictionnary, ev(x))
        )
        df = df.rename(columns={"profiler_type": "profiler_code"})

        if self.cache:
            self._write(self.fs["client"], fname, df, fmt="pd")
            df = self._read(self.fs["client"].fs, fname, fmt="pd")
            if not index:
                self.search_path_cache.commit(fname)  # Keep track of files related to search results
            log.debug("This dataframe saved in cache. dest='%s'" % fname)

        return df

    @property
    @abstractmethod
    def search_path(self):
        """ Path to search result uri

        Returns
        -------
        str
        """
        raise NotImplementedError("Not implemented")

    @property
    @abstractmethod
    def uri_full_index(self):
        """ List of URI from index

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @property
    @abstractmethod
    def uri(self):
        """ List of URI from search results

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def load(self, nrows=None, force=False):
        """ Load an Argo-index file content

        Fill in self.index internal property
        If store is cached, caching is triggered here

        Try to load the gzipped file first, and if not found, fall back on the raw .txt file.

        Parameters
        ----------
        force: bool, default: False
            Force to refresh the index stored with this store instance
        nrows: {int, None}, default: None
            Maximum number of index rows to load
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def run(self):
        """ Filter index with search criteria

        Fill in self.search internal property
        If store is cached, caching is triggered here
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def _to_dataframe(self, nrows=None, index=False) -> tuple:
        """ Return search results (or the full index) as dataframe

        If store is cached, caching is triggered here

        Parameters
        ----------
        nrows: {int, None}, default: None
            Number of rows requested by :meth:`to_dataframe`
        index: bool, default: False
            Force to use the full index, even if a search was performed

        Returns
        -------
        tuple
            ``(df, src)`` where ``df`` is a :class:`pandas.DataFrame` and ``src`` is a label of
            the table it comes from, used in log messages by :meth:`to_dataframe`
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def read_wmo(self):
        """ Return list of unique WMOs in search results

        Fall back on full index if search not found

        Returns
        -------
        list(int)
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def records_per_wmo(self):
        """ Return the number of records per unique WMOs in search results

        Fall back on full index if search not found

        Returns
        -------
        dict
            WMO are in keys, nb of records in values
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def search_wmo(self, WMOs):
        """ Search index for floats defined by their WMO

        - Define search method
        - Trigger self.run() to set self.search internal property

        Parameters
        ----------
        WMOs: list(int)
            List of WMOs to search

        Examples
        --------
        >>> idx.search_wmo(2901746)
        >>> idx.search_wmo([2901746, 4902252])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def search_cyc(self, CYCs):
        """ Search index for cycle numbers

        Parameters
        ----------
        CYCs: list(int)
            List of CYCs to search

        Examples
        --------
        >>> idx.search_cyc(1)
        >>> idx.search_cyc([1,2])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def search_wmo_cyc(self, WMOs, CYCs):
        """ Search index for floats defined by their WMO and specific cycle numbers

        Parameters
        ----------
        WMOs: list(int)
            List of WMOs to search
        CYCs: list(int)
            List of CYCs to search

        Examples
        --------
        >>> idx.search_wmo_cyc(2901746, 12)
        >>> idx.search_wmo_cyc([2901746, 4902252], [1,2])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def search_tim(self, BOX):
        """ Search index for a time range

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Warnings
        --------
        Only date bounds are considered from the index box.

        Examples
        --------
        >>> idx.search_tim([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def search_lat_lon(self, BOX):
        """ Search index for a rectangular latitude/longitude domain

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Warnings
        --------
        Only lat/lon bounds are considered from the index box.

        Examples
        --------
        >>> idx.search_lat_lon([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def search_lat_lon_tim(self, BOX):
        """ Search index for a rectangular latitude/longitude domain and time range

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Examples
        --------
        >>> idx.search_lat_lon_tim([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")