# Source code for argopy.stores.index.spec

import copy
import numpy as np
import pandas as pd
import logging
import time
from abc import ABC, abstractmethod
from fsspec.core import split_protocol
from urllib.parse import urlparse
from typing import Union, List
from pathlib import Path
import sys

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

from ...options import OPTIONS
from ...errors import GdacPathError, S3PathError, InvalidDataset, OptionValueError, InvalidDatasetStructure
from ...utils import isconnected, has_aws_credentials, to_list
from ...utils import Registry
from ...utils import Chunker
from ...utils import shortcut2gdac
from ...utils import deprecated

from .. import httpstore, memorystore, filestore, ftpstore, s3store
from .implementations.index_s3 import get_a_s3index

try:
    import pyarrow.csv as csv  # noqa: F401
    import pyarrow as pa
    import pyarrow.parquet as pq  # noqa: F401
except ModuleNotFoundError:
    pass

# from .argo_index_proto_s3 import search_s3

log = logging.getLogger("argopy.stores.index")


class ArgoIndexStoreProto(ABC):
    """Prototype (abstract base class) for Argo index stores

    This class handles everything that does not depend on the in-memory storage backend:

    - resolution of the ``host``, ``index_file`` and ``convention`` arguments,
    - creation of the file systems used to read the index (``self.fs["src"]``) and to
      hold search results (``self.fs["client"]``),
    - naming of search results (:attr:`cname` and the ``sha_*`` properties),
    - export to :class:`pandas.DataFrame`, copy and iteration helpers.

    Backend implementations must provide all methods decorated with ``abstractmethod``.
    """

    backend = "?"
    """Name of store backend (pandas or pyarrow)"""

    search_type = {}
    """Dictionary with search meta-data"""

    ext = None
    """Storage file extension"""

    convention_supported = [
        "ar_index_global_prof",
        "core",
        "argo_bio-profile_index",
        "bgc-b",
        "bio",
        "argo_synthetic-profile_index",
        "bgc-s",
        "synth",
        "argo_aux-profile_index",
        "aux",
        "ar_index_global_meta",
        "meta",
    ]
    """List of supported conventions"""

    _load_dict = None
    """Place holder for load_dict method"""

    # Shortcut keyword -> index file name (relative to the host)
    _INDEX_FILE_SHORTCUTS = {
        "core": "ar_index_global_prof.txt",
        "bgc-s": "argo_synthetic-profile_index.txt",
        "synth": "argo_synthetic-profile_index.txt",
        "bgc-b": "argo_bio-profile_index.txt",
        "bio": "argo_bio-profile_index.txt",
        "aux": "etc/argo-index/argo_aux-profile_index.txt",
        "meta": "ar_index_global_meta.txt",
    }

    # Shortcut keyword -> full convention name (standard csv file name, without extension)
    _CONVENTION_SHORTCUTS = {
        "core": "ar_index_global_prof",
        "bgc-s": "argo_synthetic-profile_index",
        "synth": "argo_synthetic-profile_index",
        "bgc-b": "argo_bio-profile_index",
        "bio": "argo_bio-profile_index",
        "aux": "argo_aux-profile_index",
        "meta": "ar_index_global_meta",
    }

    # Full convention name -> long name
    _CONVENTION_TITLES = {
        "ar_index_global_prof": "Profile directory file of the Argo GDAC",
        "argo_bio-profile_index": "Bio-Profile directory file of the Argo GDAC",
        "argo_synthetic-profile_index": "Synthetic-Profile directory file of the Argo GDAC",
        "argo_aux-profile_index": "Aux-Profile directory file of the Argo GDAC",
        "ar_index_global_meta": "Metadata directory file of the Argo GDAC",
    }

    # Full convention name -> csv column names
    _CONVENTION_COLUMNS = {
        "ar_index_global_prof": (
            "file", "date", "latitude", "longitude", "ocean",
            "profiler_type", "institution", "date_update",
        ),
        "argo_bio-profile_index": (
            "file", "date", "latitude", "longitude", "ocean",
            "profiler_type", "institution", "parameters", "parameter_data_mode", "date_update",
        ),
        "argo_synthetic-profile_index": (
            "file", "date", "latitude", "longitude", "ocean",
            "profiler_type", "institution", "parameters", "parameter_data_mode", "date_update",
        ),
        "argo_aux-profile_index": (
            "file", "date", "latitude", "longitude", "ocean",
            "profiler_type", "institution", "parameters", "date_update",
        ),
        "ar_index_global_meta": ("file", "profiler_type", "institution", "date_update"),
    }

    # Full convention name -> (title prefix, description of files, format version) of the file header
    _HEADER_SPECS = {
        "ar_index_global_prof": ("Profile", "individual profile", "2.0"),
        "argo_bio-profile_index": ("Bio-Profile", "individual bio-profile", "2.2"),
        "argo_synthetic-profile_index": ("Synthetic-Profile", "individual synthetic-profile", "2.2"),
        "argo_aux-profile_index": ("Aux-Profile", "aux-profile", "2.2"),
        "ar_index_global_meta": ("Metadata", "metadata", "2.0"),
    }

    def __init__(
        self,
        host: str = None,
        index_file: str = "ar_index_global_prof.txt",
        convention: str = None,
        cache: bool = False,
        cachedir: str = "",
        timeout: int = 0,
        **kwargs,
    ):
        """Create an Argo index store

        Parameters
        ----------
        host: str, optional, default=OPTIONS["gdac"]
            Local or remote (http, ftp or s3) path to a `dac` folder (compliant with GDAC structure).

            This parameter takes values like:

            - ``http`` or ``https`` for ``https://data-argo.ifremer.fr``
            - ``us-http`` or ``us-https`` for ``https://usgodae.org/pub/outgoing/argo``
            - ``ftp`` for ``ftp://ftp.ifremer.fr/ifremer/argo``
            - ``s3`` or ``aws`` for ``s3://argo-gdac-sandbox/pub/idx``
            - a local absolute path
        index_file: str, default: ``ar_index_global_prof.txt``
            Name of the csv-like text file with the index.

            This parameter takes values like:

            - ``core`` or ``ar_index_global_prof.txt``
            - ``bgc-b`` or ``argo_bio-profile_index.txt``
            - ``bgc-s`` or ``argo_synthetic-profile_index.txt``
            - ``aux`` or ``etc/argo-index/argo_aux-profile_index.txt``
            - ``meta`` or ``ar_index_global_meta.txt``
            - a local absolute path toward a file following an Argo index convention.
              When using a local file, you need to set the ``convention`` followed by the file.
        convention: str, default: None
            Set the expected format convention of the index file. This is useful when trying to load
            an index file with a custom name. If set to ``None``, we'll try to infer the convention
            from the ``index_file`` value.

            This parameter takes values like:

            - ``core`` or ``ar_index_global_prof``
            - ``bgc-b`` or ``argo_bio-profile_index``
            - ``bgc-s`` or ``argo_synthetic-profile_index``
            - ``aux`` or ``argo_aux-profile_index``
            - ``meta`` or ``ar_index_global_meta``
        cache : bool, default: False
            Use cache or not.
        cachedir: str, default: OPTIONS['cachedir']
            Folder where to store cached files.
        timeout: int, default: OPTIONS['api_timeout']
            Time out in seconds to connect to a remote host (ftp or http).

        Raises
        ------
        GdacPathError
            If the host protocol is unknown, if an ftp host is not alive, or if the index file
            (raw or gzipped) cannot be found on the host.
        S3PathError
            If a s3 host is not alive.
        OptionValueError
            If the convention (given or inferred from the index file name) is not supported.
        """
        host = OPTIONS["gdac"] if host is None else shortcut2gdac(host)
        self.host = host
        # Will be used by the .uri properties, this is introduced to deal with index files
        # not on the same root as DAC folders
        self.root = host

        # Catchup keyword for the main profile index files:
        index_file = self._INDEX_FILE_SHORTCUTS.get(index_file, index_file)
        self.index_file = index_file

        # Default number of commented lines to skip at the beginning of csv index files
        # (this is different for s3 than for ftp/http)
        self.skip_rows = 8

        # Create a File Store to access index file:
        self.cache = cache
        self.cachedir = OPTIONS["cachedir"] if cachedir == "" else cachedir
        self.timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
        self.fs = {}
        if split_protocol(host)[0] is None:
            self.fs["src"] = filestore(cache=cache, cachedir=cachedir)

        elif split_protocol(self.host)[0] in ["https", "http"]:
            # Only for https://data-argo.ifremer.fr (much faster than the ftp servers)
            self.fs["src"] = httpstore(
                cache=cache, cachedir=cachedir, timeout=timeout, size_policy="head"
            )

        elif "ftp" in split_protocol(self.host)[0]:
            if "ifremer" not in host:
                log.info(
                    "Working with a non-official Argo ftp server: %s. "
                    "Raise on issue if you wish to add your own to the valid list of FTP servers: "
                    "https://github.com/euroargodev/argopy/issues/new?title=New%%20FTP%%20server"
                    % host
                )
            if not isconnected(host):
                raise GdacPathError("This host (%s) is not alive !" % host)
            parsed_host = urlparse(self.host)
            self.fs["src"] = ftpstore(
                host=parsed_host.hostname,  # host eg: ftp.ifremer.fr
                port=0 if parsed_host.port is None else parsed_host.port,
                cache=cache,
                cachedir=cachedir,
                timeout=self.timeout,
                block_size=1000 * (2**20),
            )

        elif "s3" in split_protocol(self.host)[0]:
            # On AWS S3, index files are not under DAC root:
            if self.host == "s3://argo-gdac-sandbox/pub/idx":
                self.root = "s3://argo-gdac-sandbox/pub"
            if self.host == "s3://argo-gdac-sandbox/pub":
                self.host = "s3://argo-gdac-sandbox/pub/idx"
                self.root = "s3://argo-gdac-sandbox/pub"

            if "argo-gdac-sandbox/pub/idx" not in self.host:
                log.info(
                    "Working with a non-official Argo s3 server: %s. "
                    "Raise on issue if you wish to add your own to the valid list of S3 servers: "
                    "https://github.com/euroargodev/argopy/issues/new?title=New%%20S3%%20server"
                    % self.host
                )
            if not isconnected(self.host):
                raise S3PathError("This host (%s) is not alive !" % self.host)

            self.fs["src"] = s3store(
                cache=cache,
                cachedir=cachedir,
                anon=not has_aws_credentials(),
            )
            self.skip_rows = 10

        else:
            raise GdacPathError(
                "Unknown protocol for an Argo index store: %s" % split_protocol(host)[0]
            )

        # Create a File Store to manage search results:
        self.fs["client"] = memorystore(cache, cachedir, skip_instance_cache=True)

        # Registry to Track files opened with the memory store
        # (since it's a global store, other instances will access the same fs, we need our registry here)
        self._memory_store_content = Registry(name="memory store")

        # Registry to Track cached files related to search:
        self.search_path_cache = Registry(name="cached search")

        # Try to infer index convention from the file name:
        if convention is None:
            convention = index_file.split(self.fs["src"].fs.sep)[-1].split(".")[0]
        # Validate given and inferred conventions alike, so that an unsupported value fails
        # here with a clear message rather than later in a property:
        if convention not in self.convention_supported:
            raise OptionValueError(
                "Convention '%s' is not supported, it must be one in: %s"
                % (convention, self.convention_supported)
            )
        # Catch shortcuts for convention:
        self._convention = self._CONVENTION_SHORTCUTS.get(convention, convention)

        # Check if the index file exists
        # Allow for up to 10 try to account for some slow servers
        max_try = 1 if "invalid" in self.host else 10
        index_found = False
        for i_try in range(max_try):
            if self.fs["src"].exists(self.index_path) or self.fs["src"].exists(
                self.index_path + ".gz"
            ):
                index_found = True
                break
            if i_try < max_try - 1:
                # No need to wait once the last attempt failed
                time.sleep(1)
        if not index_found:
            raise GdacPathError("Index file does not exist: %s" % self.index_path)

        # Will init search with full index by default:
        self._nrows_index = None

        # Work with the compressed index if available:
        if self.fs["src"].exists(self.index_path + ".gz"):
            self.index_file += ".gz"

        if isinstance(self.fs["src"], s3store):
            # If the index host is on a S3 store, we add another file system that will be called to
            # bypass some search methods to improve performances.
            self.fs["s3"] = get_a_s3index(self.convention)
            # Adjust S3 bucket name and key with host and index file names:
            host_parts = Path(split_protocol(self.host)[1]).parts
            self.fs["s3"].bucket_name = host_parts[0]
            self.fs["s3"].key = str(Path(*host_parts[1:]) / self.index_file)

    def __repr__(self):
        summary = ["<argoindex.%s>" % self.backend]
        summary.append("Host: %s" % self.host)
        summary.append("Index: %s" % self.index_file)
        summary.append("Convention: %s (%s)" % (self.convention, self.convention_title))
        if hasattr(self, "index"):
            summary.append("In memory: True (%i records)" % self.N_RECORDS)
        elif "s3" in self.host:
            summary.append(
                "In memory: False [But there's no need to load the full index with a S3 host to make wmo/cycles searches]"
            )
        else:
            summary.append("In memory: False")

        if hasattr(self, "search"):
            match = "matches" if self.N_MATCH > 1 else "match"
            summary.append(
                "Searched: True (%i %s, %0.4f%%)"
                % (self.N_MATCH, match, self.N_MATCH * 100 / self.N_RECORDS)
            )
        else:
            summary.append("Searched: False")
        return "\n".join(summary)

    def _format(self, x, typ: str) -> str:
        """String formatting helper

        Parameters
        ----------
        x:
            Value to format
        typ: str
            One of ``lon``, ``lat``, ``prs`` or ``tim``. Any other value falls back on ``str(x)``.
        """
        if typ == "lon":
            if x < 0:
                x = 360.0 + x
            return ("%05d") % (x * 100.0)
        if typ == "lat":
            return ("%05d") % (x * 100.0)
        if typ == "prs":
            return ("%05d") % (np.abs(x) * 10.0)
        if typ == "tim":
            return pd.to_datetime(x).strftime("%Y-%m-%d")
        return str(x)

    @property
    def index_path(self):
        """Absolute path to the index file"""
        return self.fs["src"].fs.sep.join([self.host, self.index_file])

    @staticmethod
    def _cname_numbers(prefix: str, values) -> str:
        """Format a collection of integers as ``<prefix><n>`` items, sorted and joined with ``;``"""
        return ";".join(["%s%i" % (prefix, v) for v in sorted(values)])

    @property
    def cname(self) -> str:
        """Search query as a pretty formatted string

        Return 'full' if a search was not yet performed on the :class:`ArgoIndex` instance

        This method uses the keys of the index instance ``search_type`` property
        (LAT, LON, DATE, BOX, WMO, CYC, PARAMS, DMODE, PTYPE, PLABEL). One item is created
        for each key and items are joined with ``_and_``.
        """
        cname = "full"
        C = []

        for key in self.search_type.keys():
            value = self.search_type[key]

            if key == "LAT":
                cname = ("y=%0.2f/%0.2f") % (value[0], value[1])

            elif key == "LON":
                cname = ("x=%0.2f/%0.2f") % (value[0], value[1])

            elif key == "DATE":
                cname = ("t=%s/%s") % (
                    self._format(value[0], "tim"),
                    self._format(value[1], "tim"),
                )

            elif key == "BOX":
                cname = ("x=%0.2f/%0.2f;y=%0.2f/%0.2f;t=%s/%s") % (
                    value[0],
                    value[1],
                    value[2],
                    value[3],
                    self._format(value[4], "tim"),
                    self._format(value[5], "tim"),
                )

            elif key == "WMO":
                cname = self._cname_numbers("WMO", value)

            elif key == "CYC":
                cname = self._cname_numbers("CYC", value)

            elif key == "PARAMS":
                PARAM, LOG = value
                cname = ("_%s_" % LOG).join(PARAM)

            elif key == "DMODE":
                DMODE, LOG = value
                cname = ("_%s_" % LOG).join(
                    ["%s_%s" % (p, "".join(DMODE[p])) for p in DMODE]
                )

            elif key == "PTYPE":
                cname = self._cname_numbers("PTYPE", value)

            elif key == "PLABEL":
                cname = "_or_".join(value)

            C.append(cname)

        # Without any search key, honour the documented 'full' value rather than an empty string
        return "_and_".join(C) if C else "full"

    def _sha_from(self, path):
        """Internal post-processing for a sha

        Used by: sha_df, sha_pq, sha_h5

        The path is currently returned as is (no encoding).
        """
        sha = path  # no encoding
        return sha

    @property
    def sha_df(self) -> str:
        """Returns a unique SHA for a cname/dataframe"""
        return self._sha_from("pd-%s" % self.cname)

    @property
    def sha_pq(self) -> str:
        """Returns a unique SHA for a cname/parquet"""
        return self._sha_from("pq-%s" % self.cname)

    @property
    def sha_h5(self) -> str:
        """Returns a unique SHA for a cname/hdf5"""
        return self._sha_from("h5-%s" % self.cname)

    @property
    def _r4(self):
        """Reference table 4 "Argo data centres and institutions" as a dictionary"""
        if self._load_dict is None:
            from ...related import load_dict  # Imported on first use only

            self._load_dict = load_dict
        return self._load_dict("institutions")

    @property
    def _r8(self):
        """Reference table 8 "Argo instrument types" as a dictionary"""
        if self._load_dict is None:
            from ...related import load_dict  # Imported on first use only

            self._load_dict = load_dict
        return self._load_dict("profilers")

    @property
    def shape(self):
        """Shape of the index array"""
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        return self.index.shape

    @property
    def N_FILES(self):
        """Number of rows in search result or index if search not triggered"""
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        if hasattr(self, "search"):
            return self.search.shape[0]
        elif hasattr(self, "index"):
            return self.index.shape[0]
        else:
            raise InvalidDataset("You must, at least, load the index first !")

    @property
    def N_RECORDS(self):
        """Number of rows in the full index

        With a s3 host and no index loaded, infinity is returned.
        """
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        if hasattr(self, "index"):
            return self.index.shape[0]
        elif "s3" in self.host:
            return np.inf  # 'np.Inf' alias was removed in numpy 2.0
        else:
            raise InvalidDataset("Load the index first !")

    @property
    def N_MATCH(self):
        """Number of rows in search result"""
        # Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
        if hasattr(self, "search"):
            return self.search.shape[0]
        else:
            raise InvalidDataset("Initialised search first !")

    @property
    def convention(self):
        """Convention of the index (standard csv file name)"""
        return self._convention

    @property
    def convention_title(self):
        """Long name for the index convention

        Raises
        ------
        OptionValueError
            If the convention is unknown
        """
        convention = self._CONVENTION_SHORTCUTS.get(self.convention, self.convention)
        try:
            return self._CONVENTION_TITLES[convention]
        except KeyError:
            raise OptionValueError(
                "Convention '%s' is not supported, it must be one in: %s"
                % (self.convention, self.convention_supported)
            ) from None

    @property
    def convention_columns(self) -> List[str]:
        """CSV file column names for the index convention

        Raises
        ------
        OptionValueError
            If the convention is unknown
        """
        convention = self._CONVENTION_SHORTCUTS.get(self.convention, self.convention)
        try:
            # Return a new list on each call, so that callers can safely modify it
            return list(self._CONVENTION_COLUMNS[convention])
        except KeyError:
            raise OptionValueError(
                "Convention '%s' is not supported, it must be one in: %s"
                % (self.convention, self.convention_supported)
            ) from None

    def _same_origin(self, path):
        """Compare origin of path with current memory fs"""
        return path in self._memory_store_content

    def _commit(self, path):
        """Register a path as written to the memory store by this instance"""
        self._memory_store_content.commit(path)

    def _write(self, fs, path, obj, fmt="pq"):
        """Write internal array object to file store, possibly cached

        Parameters
        ----------
        fs: Union[filestore, memorystore]
        path: str
            Path where to write the object
        obj: :class:`pyarrow.Table` or :class:`pandas.DataFrame`
        fmt: str
            File format to use. This is "pq" (default) or "pd"
        """
        this_path = path
        write_this = {
            "pq": lambda o, h: pa.parquet.write_table(o, h),
            "pd": lambda o, h: o.to_pickle(h),  # obj is a pandas dataframe
        }
        if fmt == "parquet":
            fmt = "pq"
        if isinstance(fs, memorystore):
            # Fix for https://github.com/euroargodev/argopy/issues/345
            fs.fs.touch(this_path)

        with fs.open(this_path, "wb") as handle:
            write_this[fmt](obj, handle)
            if fs.protocol == "memory":
                self._commit(this_path)

        if self.cache:
            fs.fs.save_cache()

        return self

    def _read(self, fs, path, fmt="pq"):
        """Read internal array object from file store

        Parameters
        ----------
        fs: filestore
        path:
            Path to readable object
        fmt: str
            File format to use. This is "pq" (default) or "pd"

        Returns
        -------
        obj: :class:`pyarrow.Table` or :class:`pandas.DataFrame`
        """
        this_path = path
        read_this = {
            "pq": lambda h: pa.parquet.read_table(h),
            "pd": lambda h: pd.read_pickle(h),
        }
        if fmt == "parquet":
            fmt = "pq"
        with fs.open(this_path, "rb") as handle:
            obj = read_this[fmt](handle)
        return obj

    def clear_cache(self) -> Self:
        """Clear cache registry and files associated with this store instance."""
        self.fs["src"].clear_cache()
        self.fs["client"].clear_cache()
        self._memory_store_content.clear()
        self.search_path_cache.clear()
        return self

    def cachepath(self, path):
        """Return path to a cached file

        Parameters
        ----------
        path: str
            Path for which to return the cached file path for. You can use `index` or `search` as
            shortcuts to access path to the internal index or search tables.

        Returns
        -------
        list(str)
        """
        if path == "index" and hasattr(self, "index_path_cache"):
            path = [self.index_path_cache]
        elif path == "search":
            if len(self.search_path_cache) > 0:
                path = self.search_path_cache.data
            else:
                path = [None]
        elif not isinstance(path, list):
            path = [path]
        return [self.fs["client"].cachepath(p) for p in path]

    def to_dataframe(self, nrows=None, index=False, completed=True):  # noqa: C901
        """Return index or search results as :class:`pandas.DataFrame`

        If search not triggered, fall back on full index by default. Using index=True force to
        return the full index.

        Parameters
        ----------
        nrows: {int, None}, default: None
            Will return only the first `nrows` of search results. None returns all.
        index: bool, default: False
            Force to return the index, even if a search was performed with this store instance.
        completed: bool, default: True
            Complete the raw index columns with: Platform Number (WMO), Cycle Number, Institution
            and Profiler details. This is adding an extra computation, so if you care about
            performances, you may set this to False.

        Returns
        -------
        :class:`pandas.DataFrame`
        """

        def get_filename(s, index):
            # Path of the exported dataframe in the client file system
            if hasattr(self, "search") and not index:
                fname = s.search_path
            else:
                fname = s.index_path
            suff = "" if completed else "_raw"
            if nrows is not None:
                return fname + "/export" + suff + "#%i.pd" % nrows
            return fname + "/export" + suff + ".pd"

        df, src = self._to_dataframe(nrows=nrows, index=index)
        fname = get_filename(self, index)

        if self.cache and self.fs["client"].exists(fname):
            log.debug(
                "[%s] already processed as Dataframe, loading ... src='%s'"
                % (src, fname)
            )
            df = self._read(self.fs["client"].fs, fname, fmt="pd")
        else:
            log.debug("Converting [%s] to dataframe from scratch ..." % src)
            # Post-processing for user:
            from ...related import mapp_dict

            if nrows is not None:
                df = df.loc[0 : nrows - 1].copy()

            if "index" in df:
                df.drop("index", axis=1, inplace=True)

            df.reset_index(drop=True, inplace=True)

            if "date" in df:
                df["date"] = pd.to_datetime(df["date"], format="%Y%m%d%H%M%S")

            df["date_update"] = pd.to_datetime(df["date_update"], format="%Y%m%d%H%M%S")
            # WMO is the 2nd item of the file path:
            df["wmo"] = df["file"].apply(lambda x: int(x.split("/")[1]))
            if self.convention not in [
                "ar_index_global_meta",
            ]:
                # Cycle number is read from the file name suffix, without the "D" letter:
                df["cyc"] = df["file"].apply(
                    lambda x: int(x.split("_")[1].split(".nc")[0].replace("D", ""))
                )

            if "profiler_type" in self.convention_columns:
                # Missing profiler types are set to 9999
                df["profiler_type"] = df["profiler_type"].fillna(9999).astype(int)

            if completed:
                # institution & profiler mapping for all users
                # todo: may be we need to separate this for standard and expert users
                institution_dictionary = self._r4
                df["tmp1"] = df["institution"].apply(
                    lambda x: mapp_dict(institution_dictionary, x)
                )
                df = df.rename(
                    columns={"institution": "institution_code", "tmp1": "institution"}
                )
                df["dac"] = df["file"].apply(lambda x: x.split("/")[0])

                profiler_dictionary = self._r8

                def ev(x):
                    try:
                        return int(x)
                    except Exception:
                        return x

                df["profiler"] = df["profiler_type"].apply(
                    lambda x: mapp_dict(profiler_dictionary, ev(x))
                )
                df = df.rename(columns={"profiler_type": "profiler_code"})

            if self.cache:
                self._write(self.fs["client"], fname, df, fmt="pd")
                df = self._read(self.fs["client"].fs, fname, fmt="pd")
                if not index:
                    # Keep track of files related to search results
                    self.search_path_cache.commit(fname)
                log.debug("This dataframe saved in cache. dest='%s'" % fname)

        return df

    def to_indexfile(self):
        """Save search results on file, following the Argo standard index format"""
        raise NotImplementedError("Not implemented")

    @property
    @abstractmethod
    def search_path(self):
        """Path to search result uri

        Returns
        -------
        str
        """
        raise NotImplementedError("Not implemented")

    @property
    @abstractmethod
    def uri_full_index(self) -> List[str]:
        """List of URI from index

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @property
    @abstractmethod
    def uri(self) -> List[str]:
        """List of URI from search results

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @property
    def files(self) -> List[str]:
        """File paths listed in search results"""
        return self.read_files(index=False)

    @property
    def files_full_index(self) -> List[str]:
        """File paths listed in the index"""
        return self.read_files(index=True)

    @property
    def domain(self):
        """Space/time domain of the index

        This is different from a usual argopy ``box`` because dates are in :class:`numpy.datetime64` format.
        """
        return self.read_domain()

    @abstractmethod
    def load(self, nrows=None, force=False):
        """Load an Argo-index file content in memory

        Fill in self.index internal property

        If store is cached, caching is triggered here

        Try to load the gzipped file first, and if not found, fall back on the raw .txt file.

        Parameters
        ----------
        force: bool, default: False
            Force to refresh the index stored with this store instance
        nrows: {int, None}, default: None
            Maximum number of index rows to load
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def run(self):
        """Execute index search query (internal use)

        This method will populate the self.search internal property

        If store is cached, caching is triggered here
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def _to_dataframe(self, nrows=None, index=False):
        """Return search results as dataframe

        If store is cached, caching is triggered here

        Parameters
        ----------
        nrows: {int, None}, default: None
            Maximum number of rows to return
        index: bool, default: False
            Force to use the full index, even if a search was performed

        Returns
        -------
        tuple
            :meth:`to_dataframe` unpacks the result as ``(df, src)``, where ``df`` is a
            :class:`pandas.DataFrame` and ``src`` a label of the data source used in log messages.
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def read_wmo(self, index=False):
        """Return list of unique WMOs in index or search results

        Fall back on full index if search not found

        Parameters
        ----------
        index: bool, default: False
            Force to use the full index, even if a search was performed

        Returns
        -------
        list(int)
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def read_dac_wmo(self, index=False):
        """Return a tuple of unique [DAC, WMO] pairs from the index or search results

        Fall back on full index if search not triggered

        Returns
        -------
        tuple
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def read_params(self):
        """Return list of unique PARAMETERs in index or search results

        Fall back on full index if search not found

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def read_files(self, index=False):
        """Return file paths listed in index or search results

        Fall back on full index if search not triggered

        Returns
        -------
        list(str)
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def read_domain(self):
        """Read the space/time domain of the index

        This is different from a usual argopy ``box`` because dates are in :class:`numpy.datetime64` format.

        Fall back on full index if search not found

        Returns
        -------
        list
            [lon_min, lon_max, lat_min, lat_max, datim_min, datim_max]
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def records_per_wmo(self):
        """Return the number of records per unique WMOs in index or search results

        Fall back on full index if search not found

        Returns
        -------
        dict
            WMO are in keys, nb of records in values
        """
        raise NotImplementedError("Not implemented")

    @deprecated("this method is replaced by `ArgoIndex().query.wmo()`", version="1.1.0")
    def search_wmo(self, WMOs, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.wmo()``"""
        return self.query.wmo(WMOs, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.cyc()`", version="1.1.0")
    def search_cyc(self, CYCs, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.cyc()``"""
        return self.query.cyc(CYCs, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.wmo_cyc()`", version="1.1.0")
    def search_wmo_cyc(self, WMOs, CYCs, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.wmo_cyc()``"""
        return self.query.wmo_cyc(WMOs, CYCs, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.date()`", version="1.1.0")
    def search_tim(self, BOX, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.date()``"""
        return self.query.date(BOX, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.lon_lat()`", version="1.1.0")
    def search_lat_lon(self, BOX, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.lon_lat()``"""
        return self.query.lon_lat(BOX, nrows=nrows)  # Faster than .compose()

    @deprecated("this method is replaced by `ArgoIndex().query.box()`", version="1.1.0")
    def search_lat_lon_tim(self, BOX, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.box()``"""
        return self.query.box(BOX, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.params()`", version="1.1.0")
    def search_params(self, PARAMs, logical: str = "and", nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.params()``"""
        return self.query.params(PARAMs, logical=logical, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.parameter_data_mode()`", version="1.1.0")
    def search_parameter_data_mode(
        self, PARAMs: dict, logical: str = "and", nrows=None
    ):
        """Deprecated: this method is replaced by ``ArgoIndex().query.parameter_data_mode()``"""
        return self.query.parameter_data_mode(PARAMs, logical=logical, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.profiler_type()`", version="1.1.0")
    def search_profiler_type(self, profiler_type: List[int], nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.profiler_type()``"""
        return self.query.profiler_type(profiler_type, nrows=nrows)

    @deprecated("this method is replaced by `ArgoIndex().query.profiler_label()`", version="1.1.0")
    def search_profiler_label(self, profiler_label: str, nrows=None):
        """Deprecated: this method is replaced by ``ArgoIndex().query.profiler_label()``"""
        return self.query.profiler_label(profiler_label, nrows=nrows)

    def _insert_header(self, originalfile):
        """Insert the Argo index header at the top of a file

        The header is made of 8 commented lines followed by the csv column names of the index
        convention. The "Date of update" is set to the current UTC time.

        Parameters
        ----------
        originalfile:
            Path to the local text file to modify in place

        Returns
        -------
        The ``originalfile`` path

        Raises
        ------
        OptionValueError
            If the convention is unknown
        """
        convention = self._CONVENTION_SHORTCUTS.get(self.convention, self.convention)
        try:
            title, described_files, version = self._HEADER_SPECS[convention]
        except KeyError:
            raise OptionValueError(
                "Convention '%s' is not supported, it must be one in: %s"
                % (self.convention, self.convention_supported)
            ) from None

        header = (
            "# Title : %s directory file of the Argo Global Data Assembly Center\n"
            "# Description : The directory file describes all %s files of the argo GDAC ftp site.\n"
            "# Project : ARGO\n"
            "# Format version : %s\n"
            "# Date of update : %s\n"
            "# FTP root number 1 : ftp://ftp.ifremer.fr/ifremer/argo/dac\n"
            "# FTP root number 2 : ftp://usgodae.org/pub/outgoing/argo/dac\n"
            "# GDAC node : CORIOLIS\n"
            "%s\n"
        ) % (
            title,
            described_files,
            version,
            pd.to_datetime("now", utc=True).strftime("%Y%m%d%H%M%S"),
            ",".join(self.convention_columns),
        )

        with open(originalfile, "r") as f:
            data = f.read()

        with open(originalfile, "w") as f:
            f.write(header)
            f.write(data)

        return originalfile

    def _copy(
        self,
        deep: bool = True,
    ) -> Self:
        """Create a new instance from this one, see :meth:`copy` for details"""
        cls = self.__class__
        duplicate = copy.deepcopy if deep else copy.copy

        obj = cls.__new__(cls)
        obj.__init__(
            host=duplicate(self.host),
            index_file=duplicate(self.index_file),
            # Forward the convention, otherwise an index file with a custom name cannot be copied
            convention=duplicate(self.convention),
            timeout=duplicate(self.timeout),
            cache=duplicate(self.cache),
            cachedir=duplicate(self.cachedir),
        )

        if hasattr(self, "index"):
            obj._nrows_index = duplicate(self._nrows_index)
            obj.index = duplicate(self.index)
            if self.cache:
                obj.index_path_cache = duplicate(self.index_path_cache)

        # Search parameters and results are only preserved by a shallow copy:
        if not deep and hasattr(self, "search"):
            obj.search_type = copy.copy(self.search_type)
            obj.search_filter = copy.copy(self.search_filter)
            obj.search = copy.copy(self.search)
            if obj.cache:
                obj.search_path_cache = copy.copy(self.search_path_cache)

        return obj

    def __copy__(self) -> Self:
        return self._copy(deep=False)

    def __deepcopy__(self, memo=None) -> Self:
        # 'memo' is required by the copy.deepcopy() protocol, it is not used here
        return self._copy(deep=True)

    def copy(
        self,
        deep: bool = True,
    ) -> Self:
        """Returns a copy of this :class:`ArgoIndex` instance

        A copy is a new instance based on similar parameters (e.g. ``host`` and ``index_file``).

        A deep copy ensure complete independence between the original and the copied index.
        If the index was loaded, a new view is returned with the copied index, but search
        parameters and results are lost.

        A shallow copy preserves the index array, search parameters and results.

        Parameters
        ----------
        deep: bool, optional, default=True
            If True, make a deep copy (search parameters and results are not copied onto the
            new instance). If False, make a shallow copy (search parameters and results are
            copied onto the new instance).

        Returns
        -------
        :class:`ArgoIndex`
        """
        return self._copy(deep=deep)

    def iterfloats(self, index=False, chunksize: int = None):
        """Iterate over unique Argo floats in the full index or search results

        By default, iterate over a single float, otherwise use the `chunksize` argument to iterate
        over chunk of floats.

        Parameters
        ----------
        index: bool, optional, default=False
            Passed to :class:`ArgoIndex.read_wmo` in order to choose if we iterate over all WMOs of
            the index or only those matching search results.
        chunksize: int, optional
            Maximum chunk size

            Eg: A value of 5 will create chunks with as many as 5 WMOs each.

        Returns
        -------
        Iterator of :class:`ArgoFloat`
            With ``chunksize``, each item is a list of :class:`ArgoFloat`

        Examples
        --------
        .. code-block:: python
            :caption: Example of iteration

            # Make a search on Argo index of profiles:
            idx = ArgoIndex().query.lon_lat([lon_min, lon_max, lat_min, lat_max])

            # Then iterate over float matching the results:
            for float in idx.iterfloats():
                float  # is a ArgoFloat instance

        """
        from .. import ArgoFloat  # Prevent circular import

        wmos = self.read_wmo(index=index)

        if chunksize is not None:
            chk_opts = {
                "chunks": {"wmo": "auto"},
                "chunksize": {"wmo": chunksize},
            }
            # Re-use the list of WMOs read above, no need to read the index twice
            chunked = Chunker({"wmo": wmos}, **chk_opts).fit_transform()
            for grp in chunked:
                yield [ArgoFloat(wmo, idx=self) for wmo in grp]

        else:
            for wmo in wmos:
                yield ArgoFloat(wmo, idx=self)
class ArgoIndexExtension:
    """Prototype for ArgoIndex extensions

    All extensions should inherit from this class

    This prototype makes available:

    - the :class:`ArgoIndex` instance as ``self._obj``
    """

    def __init__(self, obj):
        self._obj = obj


class ArgoIndexSearchEngine(ArgoIndexExtension):
    """:class:`argopy.ArgoIndex` extension providing search methods to query index entries

    All search methods can be combined with the :meth:`ArgoIndex.query.compose` method, see examples.

    Examples
    --------
    .. code-block:: python
        :caption: List of search methods

        from argopy import ArgoIndex

        idx = ArgoIndex()

        idx.query.wmo
        idx.query.cyc
        idx.query.wmo_cyc

        idx.query.lon
        idx.query.lat
        idx.query.date
        idx.query.lon_lat
        idx.query.box

        idx.query.params
        idx.query.parameter_data_mode

        idx.query.profiler_type
        idx.query.profiler_label

    .. code-block:: python
        :caption: Composition of queries

        from argopy import ArgoIndex

        idx = ArgoIndex(index_file='bgc-s')

        idx.query.compose({'box': BOX, 'wmo': WMOs})
        idx.query.compose({'box': BOX, 'params': 'DOXY'})
        idx.query.compose({'box': BOX, 'params': (['DOXY', 'DOXY2'], {'logical': 'and'})})
        idx.query.compose({'params': 'DOXY', 'profiler_label': 'ARVOR'})

    Note that composing query with:

    - ``wmo`` and ``cyc`` is slower than using the ``wmo_cyc`` method
    - ``lon`` and ``lat`` is slower than using the ``lon_lat`` method
    - ``lon``, ``lat`` and ``date`` is slower than using the ``box`` method

    """

    # NOTE(review): neither this class nor ArgoIndexExtension derives from ABC
    # here, so the @abstractmethod markers below do not by themselves prevent
    # instantiation; they document what a backend implementation must provide.

    @abstractmethod
    def wmo(self):
        """Search index for floats defined by WMO

        Parameters
        ----------
        WMOs: list(int) or list(str)
            List of WMOs to search

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.wmo(2901746)
            idx.query.wmo([2901746, 4902252])
        """
        raise NotImplementedError

    @abstractmethod
    def cyc(self):
        """Search index for cycle numbers

        Parameters
        ----------
        CYCs: list(int) or list(str)
            List of cycle number to search

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.cyc(1)
            idx.query.cyc([1,2])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def wmo_cyc(self):
        """Search index for floats defined by their WMO and specific cycle numbers

        Parameters
        ----------
        WMOs: list(int) or list(str)
            List of WMOs to search
        CYCs: list(int) or list(str)
            List of cycle number to search

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.wmo_cyc(2901746, 12)
            idx.query.wmo_cyc([2901746, 4902252], [1,2])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def lon(self):
        """Search index for a meridional band

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Returns
        -------
        :class:`ArgoIndex`

        Warnings
        --------
        Only longitude bounds are used from the index box.

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.lon([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def lat(self):
        """Search index for a zonal band

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Returns
        -------
        :class:`ArgoIndex`

        Warnings
        --------
        Only latitude bounds are used from the index box.

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.lat([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def date(self):
        """Search index for a date range

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Returns
        -------
        :class:`ArgoIndex`

        Warnings
        --------
        Only date bounds are used from the index box.

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.date([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def lon_lat(self):
        """Search index for a rectangular longitude/latitude domain

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Returns
        -------
        :class:`ArgoIndex`

        Warnings
        --------
        Only lat/lon bounds are used from the index box.

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.lon_lat([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def box(self):
        """Search index for a box: a rectangular latitude/longitude domain and time range

        Parameters
        ----------
        BOX : list()
            An index box to search Argo records for.

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.box([-60, -55, 40., 45., '2007-08-01', '2007-09-01'])
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def params(self):
        """Search index for one or a list of parameters

        Parameters
        ----------
        PARAMs: str or list(str)
            A string or a list of strings to search Argo records for in the PARAMETERS columns of BGC profiles index.
        logical: str, default='and'
            Indicate to search for all (``and``) or any (``or``) of the parameters.

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='bgc-s')

            idx.query.params(['C1PHASE_DOXY', 'DOWNWELLING_PAR'])
            idx.query.params(['C1PHASE_DOXY', 'DOWNWELLING_PAR'], logical='or')

        Warnings
        --------
        This method is only available for index following the ``bgc-s``, ``bgc-b`` and ``aux`` conventions.
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def parameter_data_mode(self):
        """Search index for profiles with a parameter in a specific data mode

        Parameters
        ----------
        PARAMs: dict
            A dictionary with parameters as keys, and data mode as a string or a list of strings
        logical: str, default='and'
            Indicate to search for all (``and``) or any (``or``) of the parameters data mode. This operator applies
            between each parameter.

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='bgc-s')

            idx.query.parameter_data_mode({'TEMP': 'D'})
            idx.query.parameter_data_mode({'BBP700': 'D'})
            idx.query.parameter_data_mode({'DOXY': ['R', 'A']})
            idx.query.parameter_data_mode({'BBP700': 'D', 'DOXY': 'D'}, logical='or')
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def profiler_type(self):
        """Search index for profiler types

        Parameters
        ----------
        profiler_type: list(str)
            List of profiler types to search for. Valid types are given by integers with values from the R8 Argo
            Reference table

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='core')

            idx.query.profiler_type(845)

            from argopy import ArgoNVSReferenceTables
            valid_types = ArgoNVSReferenceTables().tbl(8)['altLabel']

        See Also
        --------
        :class:`ArgoIndex.query.profiler_label`
        """
        raise NotImplementedError("Not implemented")

    def profiler_label(self, profiler_label: str, nrows=None, composed=False):
        """Search index for profiler types with a given string in their label

        Labels are translated into profiler types, and the search is then
        delegated to :meth:`profiler_type`.

        Parameters
        ----------
        profiler_label: str
            The string to be found in the R8 Argo Reference table label.
            A list of strings is also accepted: a profiler type is selected as soon as one of the
            strings is found in its label.
        nrows: int, optional, default=None
            Passed on to :meth:`profiler_type` and to the index ``run`` method.
        composed: bool, optional, default=False
            If False, the search is executed and the :class:`ArgoIndex` is returned.
            If True, the search is not executed and the search filter is returned instead, this is
            used by :meth:`compose`.

        Returns
        -------
        :class:`ArgoIndex`
            Or the search filter if ``composed`` is True.

        Raises
        ------
        InvalidDatasetStructure
            If the index convention has no ``profiler_type`` column.

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='bgc-s')

            idx.query.profiler_label('ARVOR')

        See Also
        --------
        :class:`ArgoIndex.query.profiler_type`
        """

        def checker(profiler_label):
            # Validate the index structure and convert labels to profiler types
            if "profiler_type" not in self._obj.convention_columns:
                raise InvalidDatasetStructure(
                    "Cannot search for profiler labels in this index"
                )
            # Lazy logging arguments: safe whatever the type of profiler_label
            log.debug(
                "Argo index searching for profiler label '%s' ...", profiler_label
            )
            labels = to_list(profiler_label)
            # Each profiler type is listed once, even if several labels match it
            return [
                ptype
                for ptype, long_name in self._obj._r8.items()
                if any(label in long_name for label in labels)
            ]

        def namer(profiler_label):
            # The delegated profiler type search is reported as a label search:
            # drop its 'PTYPE' entry, if there is one.
            self._obj.search_type.pop('PTYPE', None)
            return {"PLABEL": profiler_label}

        def composer(profiler_type):
            return self.profiler_type(profiler_type, nrows=nrows, composed=True)

        profiler_type = checker(profiler_label)
        self._obj.load(nrows=self._obj._nrows_index)
        search_filter = composer(profiler_type)

        if composed:
            self._obj.search_type.update(namer(profiler_label))
            return search_filter

        self._obj.search_type = namer(profiler_label)
        self._obj.search_filter = search_filter
        self._obj.run(nrows=nrows)
        return self._obj

    def compose(self, query: dict, nrows=None):
        """Compose query with multiple search methods

        Each search method is called with ``composed=True`` and the resulting
        filters are combined with a logical ``and``.

        Parameters
        ----------
        query: dict
            A dictionary with search method as keys and search criteria as values.
            To pass options to a search method, use a tuple ``(criteria, options)`` as value, where
            ``options`` is a dictionary of keyword arguments. This dictionary is not modified.
        nrows: int, optional, default=None
            Passed on to the index ``run`` method.

        Returns
        -------
        :class:`ArgoIndex`

        Examples
        --------
        .. code-block:: python

            from argopy import ArgoIndex
            idx = ArgoIndex(index_file='bgc-s')

            idx.query.compose({'box': BOX, 'wmo': WMOs})
            idx.query.compose({'box': BOX, 'params': 'DOXY'})
            idx.query.compose({'box': BOX, 'params': (['DOXY', 'DOXY2'], {'logical': 'and'})})
            idx.query.compose({'params': 'DOXY', 'profiler_label': 'ARVOR'})

        """
        self._obj.search_type = {}

        filters = []
        for entry, arg in query.items():
            searcher = getattr(self, entry)
            if isinstance(arg, tuple):
                criteria, options = arg[0], arg[1]
                # Build a new dict so that the caller's options are left untouched
                a_filter = searcher(criteria, **{**options, 'composed': True})
            else:
                a_filter = searcher(arg, composed=True)
            filters.append(a_filter)

        self._obj.search_filter = self._obj._reduce_a_filter_list(filters, op='and')
        self._obj.run(nrows=nrows)
        return self._obj