# Source code for argopy.stores.argo_index_proto_s3

"""
Additional index support for S3
"""
import io
import pandas as pd
import logging
from decorator import decorator
import warnings

from ..utils.checkers import check_index_cols, check_wmo, check_cyc, is_list_of_strings, has_aws_credentials, HAS_BOTO3
from ..stores import s3store


# pyarrow is an optional dependency: record whether it could be imported and,
# when it could not, provide a minimal ``pa`` placeholder so that ``pa.Table``
# can still be referenced (e.g. in annotations) at import time.
try:
    import pyarrow as pa
    import pyarrow.csv as csv  # noqa: F401
except ModuleNotFoundError:
    HAS_PYARROW = False

    class pa:
        @property
        def Table(self):
            return None
else:
    HAS_PYARROW = True

# boto3/botocore are only imported when HAS_BOTO3 (from ..utils.checkers) is true.
# UNSIGNED and Config are used below to create an unsigned (anonymous) S3 client.
if HAS_BOTO3:
    import boto3
    from botocore import UNSIGNED
    from botocore.client import Config


# Module-level logger of the S3 index prototype
log = logging.getLogger("argopy.stores.index.s3")


@decorator
def requires_pyarrow(func, *args, **kwargs):
    """Warn that pyarrow is missing, then call the decorated function anyway.

    The call is never blocked: when ``HAS_PYARROW`` is false a warning is
    emitted and ``func`` is still executed with the arguments it was given.
    """
    if HAS_PYARROW:
        return func(*args, **kwargs)

    warnings.warn("The 'pyarrow' library is not installed. Please install it to ensure full functionality.")
    return func(*args, **kwargs)


class s3index:
    """A prototype for an Argo index store relying on remote CSV or PQ index data on S3.

    The index is scanned/searched directly on the s3 server using the
    :class:`boto3.client.select_object_content` boto3 method.

    The key limitation here is that we can only search for WMO and CYC. This is due to
    the fact that GM could not manage to convert (CAST) latitude, longitude and time to
    a more appropriate data type to execute a search. All variables are considered
    string by default.

    This base class reads ``self.key``, ``self.CompressionType``, ``self.convention``,
    ``self.colNames`` and ``self.empty_pq`` but does not define them: they must be
    provided by a subclass.

    Examples
    --------
    .. code-block:: python

        idx = s3index()

        idx.search_wmo(6903091)
        idx.search_wmo([13857, 6903091])

        idx.search_cyc(1)
        idx.search_cyc(1, nrows=100)
        idx.search_cyc([0, 1])
        idx.search_cyc([0, 1], nrows=100)

        idx.search_wmo_cyc(6903091, 1)
        idx.search_wmo_cyc(6903091, [1, 2])
        idx.search_wmo_cyc([13857, 6903091], 1)
        idx.search_wmo_cyc([13857, 6903091], [1, 2])

        idx.pd  # Return search results as a :class:`pd.DataFrame`
        idx.pq  # Return search results as a :class:`pa.Table`

        idx.stats  # Data processing stats
    """

    bucket_name = "argo-gdac-sandbox"
    """Name of the S3 bucket"""

    def __init__(self):
        """Create the boto3 S3 client and reset the query statistics.

        A default (signed) client is created when ``has_aws_credentials()`` is true,
        otherwise the client is configured to send unsigned (anonymous) requests.

        Raises
        ------
        ModuleNotFoundError
            If the optional ``boto3`` library is not available (``HAS_BOTO3`` is false).
        """
        if not HAS_BOTO3:
            # Without this guard, the code below fails with an obscure NameError on 'boto3'
            raise ModuleNotFoundError(
                "The 'boto3' library is not installed. Please install it to use the S3 index store."
            )

        # Create a boto3 client to interface with S3
        if has_aws_credentials():
            self.fs = boto3.client("s3")
            # Best-effort debug output only: it relies on private botocore attributes,
            # so a failure here must never prevent the store from being created.
            try:
                credentials = self.fs._request_signer._credentials
                log.debug("self.fs._request_signer._credentials")
                log.debug(credentials)
                access_key = credentials.get_frozen_credentials().access_key
                log.debug("Found AWS Credentials for access_key='%s'" % access_key)
            except Exception:
                log.debug("Unable to inspect the AWS credentials of the boto3 client", exc_info=True)
        else:
            self.fs = boto3.client("s3", config=Config(signature_version=UNSIGNED))
            log.debug("No AWS Credentials found, running UNSIGNED anonymous boto3 requests")
            # search engines won't be available !

        self.stats_last = {}  # 'Stats' details of the most recent query
        self.stats = {}  # 'Stats' details of every query, keyed by SQL expression
        self._sql_logic = 'unique'  # 'unique': one statement with OR-ed criteria, 'split': one statement per criterion

    # NOTE: this method must stay defined before the ``pd`` property below. Once the
    # property exists, the name ``pd`` in the class body no longer refers to pandas and
    # the ``pd.DataFrame`` annotation could not be evaluated.
    def _sio2pd(self, obj_io: io.StringIO) -> pd.DataFrame:
        """Parse raw CSV search results into a :class:`pandas.DataFrame`.

        Columns are named after ``self.colNames`` and validated with
        ``check_index_cols`` against ``self.convention``. The buffer is rewound
        afterwards so that it can be parsed again.
        """
        index = pd.read_csv(
            obj_io,
            sep=",",
            index_col=None,
            header=None,
            skiprows=0,
            nrows=None,
            names=self.colNames,
        )
        check_index_cols(index.columns.to_list(), convention=self.convention)
        obj_io.seek(0)  # Rewind
        return index

    @requires_pyarrow
    def _sio2pq(self, obj_io: io.StringIO) -> pa.Table:
        """Parse raw CSV search results into a :class:`pyarrow.Table`.

        The ``date`` and ``date_update`` columns are parsed as second-resolution
        timestamps. When there is nothing to parse, ``self.empty_pq`` is returned
        instead. The buffer is rewound afterwards so that it can be parsed again.
        """
        raw = obj_io.read().encode('utf8')
        if raw:
            index = csv.read_csv(
                io.BytesIO(raw),
                read_options=csv.ReadOptions(
                    use_threads=True,
                    skip_rows=0,
                    column_names=self.colNames,
                ),
                convert_options=csv.ConvertOptions(
                    column_types={
                        "date": pa.timestamp("s"),  # , tz="utc"
                        "date_update": pa.timestamp("s"),
                    },
                    timestamp_parsers=["%Y%m%d%H%M%S"],
                ),
            )
        else:
            index = self.empty_pq

        check_index_cols(index.column_names, convention=self.convention)
        obj_io.seek(0)  # Rewind
        return index

    def query(self, sql_expression: str) -> str:
        """Run one SQL expression on the remote index file and return the matching CSV rows.

        Uses ``select_object_content`` so that the CSV data are filtered on the server
        before being downloaded. The 'Stats' details reported with the response are
        stored in ``self.stats_last`` and in ``self.stats[sql_expression]``.

        Parameters
        ----------
        sql_expression: str
            The SQL statement to execute.

        Returns
        -------
        str
            The selected records, as CSV text.
        """
        s3_object = self.fs.select_object_content(
            Bucket=self.bucket_name,
            Key=self.key,
            ExpressionType="SQL",
            Expression=sql_expression,
            InputSerialization={
                "CSV": {
                    "FileHeaderInfo": "IGNORE",
                    'Comments': '#',
                    'QuoteEscapeCharacter': '"',
                    'RecordDelimiter': '\n',
                    'FieldDelimiter': ',',
                    'QuoteCharacter': '"',
                    'AllowQuotedRecordDelimiter': False,
                },
                "CompressionType": self.CompressionType,
            },
            OutputSerialization={"CSV": {}},
        )

        # Iterate over the filtered CSV data.
        # Chunks are kept as bytes and decoded only once at the end: a 'Records' event
        # may carry a partial record, so decoding each chunk on its own could fail on a
        # multi-byte character split between two events.
        chunks = []
        for event in s3_object["Payload"]:
            if "Records" in event:
                chunks.append(event["Records"]["Payload"])
            elif 'Stats' in event:
                stats = event['Stats']['Details']
                self.stats_last = stats
                self.stats.update({sql_expression: stats})

        return b"".join(chunks).decode("utf-8")

    def run(self):
        """Execute ``self.sql_expression`` and store the raw results in ``self.search``.

        ``self.sql_expression`` is normalised to a list of statements; the results of
        all statements are concatenated, in order, into a single :class:`io.StringIO`.

        Returns
        -------
        self
        """
        if not is_list_of_strings(self.sql_expression):
            self.sql_expression = [self.sql_expression]

        self.search = io.StringIO("".join(self.query(sql) for sql in self.sql_expression))
        return self

    def _build_sql(self, patterns, nrows=None):
        """Assemble the SQL statement(s) selecting rows whose 1st column matches ``patterns``.

        Parameters
        ----------
        patterns: list of str
            SQL ``LIKE`` patterns to apply to the first column (``s._1``).
        nrows: int, optional
            If not None, a ``LIMIT`` clause is appended to every statement. With the
            'split' logic the limit therefore applies to each statement separately.

        Returns
        -------
        str or list of str
            A list with one statement per pattern when ``self._sql_logic`` is 'split',
            otherwise a single statement with all the patterns OR-ed together.

        Raises
        ------
        ValueError
            If ``patterns`` is empty and a single statement is requested.
        """
        select = "SELECT * FROM s3object s WHERE "
        limit = "" if nrows is None else " LIMIT %i" % nrows
        clauses = ["s._1 LIKE '{}'".format(pattern) for pattern in patterns]

        if self._sql_logic == 'split':
            # The limit is added to each statement: appending it to the list itself
            # would extend the list with single characters.
            return [select + clause + limit for clause in clauses]

        if not clauses:
            raise ValueError("At least one search criterion is required")
        return select + " OR ".join(clauses) + limit

    def search_wmo(self, WMOs, nrows=None):
        """Search the index for one or more float WMO numbers.

        Parameters
        ----------
        WMOs:
            WMO number(s), validated with ``check_wmo``.
        nrows: int, optional
            Maximum number of rows requested with a SQL ``LIMIT`` clause.

        Returns
        -------
        self
        """
        WMOs = check_wmo(WMOs)
        patterns = ["%/{wmo}/profiles/%".format(wmo=wmo) for wmo in WMOs]
        self.sql_expression = self._build_sql(patterns, nrows=nrows)
        self.run()
        return self

    def search_cyc(self, CYCs, nrows=None):
        """Search the index for one or more cycle numbers.

        Parameters
        ----------
        CYCs:
            Cycle number(s), validated with ``check_cyc``.
        nrows: int, optional
            Maximum number of rows requested with a SQL ``LIMIT`` clause.

        Returns
        -------
        self
        """
        CYCs = check_cyc(CYCs)
        patterns = ["%/%/profiles/%_{cyc:03d}.nc".format(cyc=cyc) for cyc in CYCs]
        self.sql_expression = self._build_sql(patterns, nrows=nrows)
        self.run()
        return self

    def search_wmo_cyc(self, WMOs, CYCs, nrows=None):
        """Search the index for every combination of the given WMO and cycle numbers.

        Parameters
        ----------
        WMOs:
            WMO number(s), validated with ``check_wmo``.
        CYCs:
            Cycle number(s), validated with ``check_cyc``.
        nrows: int, optional
            Maximum number of rows requested with a SQL ``LIMIT`` clause.

        Returns
        -------
        self
        """
        WMOs = check_wmo(WMOs)
        CYCs = check_cyc(CYCs)
        patterns = [
            "%/{wmo}/profiles/%_{cyc:03d}.nc".format(wmo=wmo, cyc=cyc)
            for wmo in WMOs
            for cyc in CYCs
        ]
        self.sql_expression = self._build_sql(patterns, nrows=nrows)
        self.run()
        return self

    @property
    def pd(self) -> pd.DataFrame:
        """Return search result as a :class:`pd.DataFrame`"""
        if not hasattr(self, 'search'):
            raise Exception('Execute a search first !')
        return self._sio2pd(self.search)

    @property
    @requires_pyarrow
    def pq(self) -> pa.Table:
        """Return search result as a :class:`pa.Table`"""
        if not hasattr(self, 'search'):
            raise Exception('Execute a search first !')
        return self._sio2pq(self.search)
class s3index_core(s3index):
    """S3 index store settings for the ``ar_index_global_prof`` index file."""

    # key, CompressionType = "pub/idx/ar_index_global_prof.txt", "NONE"
    key = "pub/idx/ar_index_global_prof.txt.gz"
    """Path to the index source file"""

    CompressionType = "GZIP"
    """Compression used by the index source file"""

    convention = "ar_index_global_prof"
    """Argo convention of the index source file"""

    colNames = [
        "file",
        "date",
        "latitude",
        "longitude",
        "ocean",
        "profiler_type",
        "institution",
        "date_update",
    ]
    """List of the index column names"""

    @property
    @requires_pyarrow
    def empty_pq(self):
        """Return a :class:`pa.Table` without any row, with one typed column per index column."""
        # Declare each column once; the empty data are derived from this list
        typed_columns = [
            ("file", pa.string()),
            ("date", pa.timestamp('s')),
            ("latitude", pa.float64()),
            ("longitude", pa.float64()),
            ("ocean", pa.string()),
            ("profiler_type", pa.int64()),
            ("institution", pa.string()),
            ("date_update", pa.timestamp('s')),
        ]
        no_data = {name: [] for name, _ in typed_columns}
        return pa.Table.from_pydict(no_data, schema=pa.schema(typed_columns))
class s3index_bgc_bio(s3index):
    """S3 index store settings for the ``argo_bio-profile_index`` index file."""

    key = "pub/idx/argo_bio-profile_index.txt.gz"
    """Path to the index source file"""

    CompressionType = "GZIP"
    """Compression used by the index source file"""

    convention = "argo_bio-profile_index"
    """Argo convention of the index source file"""

    colNames = [
        "file",
        "date",
        "latitude",
        "longitude",
        "ocean",
        "profiler_type",
        "institution",
        "parameters",
        "parameter_data_mode",
        "date_update",
    ]
    """List of the index column names"""

    @property
    @requires_pyarrow
    def empty_pq(self):
        """Return a :class:`pa.Table` without any row, with one typed column per index column."""
        # Declare each column once; the empty data are derived from this list
        typed_columns = [
            ("file", pa.string()),
            ("date", pa.timestamp('s')),
            ("latitude", pa.float64()),
            ("longitude", pa.float64()),
            ("ocean", pa.string()),
            ("profiler_type", pa.int64()),
            ("institution", pa.string()),
            ("parameters", pa.string()),
            ("parameter_data_mode", pa.string()),
            ("date_update", pa.timestamp('s')),
        ]
        no_data = {name: [] for name, _ in typed_columns}
        return pa.Table.from_pydict(no_data, schema=pa.schema(typed_columns))
class s3index_bgc_synthetic(s3index_bgc_bio):
    """S3 index store settings for the ``argo_synthetic-profile_index`` index file.

    Everything not overridden here (column names, empty table) comes from
    :class:`s3index_bgc_bio`.
    """

    key = "pub/idx/argo_synthetic-profile_index.txt.gz"
    """Path to the index source file"""

    CompressionType = "GZIP"
    """Compression used by the index source file"""

    convention = "argo_synthetic-profile_index"
    """Argo convention of the index source file"""
def get_a_s3index(convention):
    """Return a new S3 index store instance for an Argo index ``convention``.

    Supported conventions are "ar_index_global_prof", "argo_bio-profile_index" and
    "argo_synthetic-profile_index". Any other value returns ``None``.
    """
    stores = (
        ("ar_index_global_prof", s3index_core),
        ("argo_bio-profile_index", s3index_bgc_bio),
        ("argo_synthetic-profile_index", s3index_bgc_synthetic),
    )
    for known_convention, store in stores:
        if convention == known_convention:
            return store()
    return None


@decorator
def search_s3(func, *args, **kwargs):
    """Decorator for search methods patched for S3 store

    This decorator will bypass :class:`argopy.stores.indexstore` search methods with a boto3 more
    efficient design when using the S3 Argo index store.

    Note that search methods are bypassed only if the index was not loaded before, otherwise we're
    using the store original method working with the internal index structure (pandas dataframe or
    pyarrow table).
    """
    idx = args[0]
    method = func.__name__

    # Anything that is not a patched search method on a not-yet-loaded S3 index goes
    # straight to the original method.
    patched = ('search_wmo', 'search_cyc', 'search_wmo_cyc')
    if method not in patched or hasattr(idx, 'index') or not isinstance(idx.fs['src'], s3store):
        return func(*args, **kwargs)

    def as_text(values):
        # Semicolon-separated rendering used in the debug messages
        return ";".join([str(value) for value in values])

    if method == 'search_wmo':
        WMOs, nrows = args[1], args[2]
        WMOs = check_wmo(WMOs)  # Check and return a valid list of WMOs
        log.debug(
            "Argo index searching for WMOs=[%s] using boto3 SQL request ..."
            % as_text(WMOs)
        )
        idx.fs['s3'].search_wmo(WMOs, nrows=nrows)
        idx.search_type = {"WMO": WMOs}

    elif method == 'search_cyc':
        CYCs, nrows = args[1], args[2]
        CYCs = check_cyc(CYCs)  # Check and return a valid list of CYCs
        log.debug(
            "Argo index searching for CYCs=[%s] using boto3 SQL request ..."
            % as_text(CYCs)
        )
        idx.fs['s3'].search_cyc(CYCs, nrows=nrows)
        idx.search_type = {"CYC": CYCs}

    else:  # 'search_wmo_cyc'
        WMOs, CYCs, nrows = args[1], args[2], args[3]
        WMOs = check_wmo(WMOs)  # Check and return a valid list of WMOs
        CYCs = check_cyc(CYCs)  # Check and return a valid list of CYCs
        log.debug(
            "Argo index searching for WMOs=[%s] and CYCs=[%s] using boto3 SQL request ..."
            % (as_text(WMOs), as_text(CYCs))
        )
        idx.fs['s3'].search_wmo_cyc(WMOs, CYCs, nrows=nrows)
        idx.search_type = {"WMO": WMOs, "CYC": CYCs}

    # Expose the SQL statement(s) and the results (read through the 'pd' or 'pq'
    # property selected by idx.ext) on the index store itself
    idx.search_filter = idx.fs['s3'].sql_expression
    idx.search = getattr(idx.fs['s3'], idx.ext)
    return idx