""" Argo index store """
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
import hashlib
from ..errors import DataNotFound
from ..options import OPTIONS
from .filesystems import filestore, memorystore
def safe_rewind(this_index_obj):
    """ Move a text stream back to its first character, when the stream allows it

    Parameters
    ----------
    this_index_obj: file-like object exposing ``seekable()`` and ``seek()``

    Returns
    -------
    The very same object, rewound to position 0 if it is seekable, untouched otherwise.
    """
    can_seek = this_index_obj.seekable()
    if not can_seek:
        return this_index_obj
    this_index_obj.seek(0)
    return this_index_obj
class indexfilter_proto(ABC):
    """ Class prototype for an Argo index filter

    Concrete filters must implement the ``run`` method and the ``uri`` property.
    """

    @abstractmethod
    def run(self, index_file):
        """ Take a :class:`io.TextIOWrapper` and return filtered data as a csv-like string

        Parameters
        ----------
        index_file: :class:`io.TextIOWrapper`
            Argo index text stream

        Returns
        -------
        csv rows matching the request, as in-memory string. Or None.
        """
        raise NotImplementedError("Not implemented")

    @property
    @abstractmethod
    def uri(self):
        """ Return a name for one specific filter run """
        raise NotImplementedError("Not implemented")

    @property
    def sha(self):
        """ Unique filter hash string: the sha256 hex digest of :attr:`uri` """
        return hashlib.sha256(self.uri.encode()).hexdigest()

    def search_null(self, index):
        """ Perform a null search, ie return the full Argo index file content

        The stream is rewound, then the leading comment lines (starting with ``#``) and the
        first non-comment line (the csv header) are consumed; everything after is returned.

        Parameters
        ----------
        index: :class:`io.TextIOWrapper`

        Returns
        -------
        csv index, as a string
        """
        safe_rewind(index)
        for line in index:
            # Stop right after consuming the first non-comment line, i.e. the csv header
            if not line.startswith('#'):
                break
        return index.read()
class indexfilter_wmo(indexfilter_proto):
    """ Legacy Index filter based on WMO and/or CYCLE_NUMBER. Deprecated soon.

    This is intended to be used by instances of an indexstore

    Examples
    --------
    # Create filters:
    filt = indexfilter_wmo(WMO=13857)
    filt = indexfilter_wmo(WMO=13857, CYC=np.arange(1,10))
    filt = indexfilter_wmo(WMO=[13857, 13858, 12], CYC=12)
    filt = indexfilter_wmo(WMO=[13857, 13858, 12], CYC=[1, 12])
    filt = indexfilter_wmo(CYC=250)
    filt = indexfilter_wmo()

    # Filter name:
    print(filt.uri)

    # Direct usage:
    with open("/Volumes/Data/ARGO/ar_index_global_prof.txt", "r") as f:
        results = filt.run(f)

    # With the indexstore:
    indexstore(cache=1, index_file="/Volumes/Data/ARGO/ar_index_global_prof.txt").read_csv(filt)
    """

    def __init__(self, WMO=None, CYC=None, **kwargs):
        """ Create Argo index filter for WMOs/CYCs

        Parameters
        ----------
        WMO : int, list(int), optional
            The list of WMOs to search. None or an empty list means no restriction on floats.
        CYC : int, np.array(int), list(int), optional
            The cycle numbers to search for each WMO. None means no restriction on cycles.
        **kwargs:
            Ignored.
        """
        if WMO is None:
            WMO = []
        elif isinstance(WMO, (int, np.integer)):
            WMO = [WMO]  # Make sure we deal with a list
        if CYC is not None:
            if not isinstance(CYC, np.ndarray):
                # int, list, tuple, range, numpy scalar...: make sure we deal with an array of integers
                CYC = np.array(CYC, dtype='int')
            CYC = np.atleast_1d(CYC)  # A 0-d array could not be iterated/sorted later on
        self.WMO = sorted(WMO)
        self.CYC = CYC

    @property
    def uri(self):
        """ Return a unique name for this filter instance

        Built as ``WMO<wmo>_..._CYC<cyc>_...``; ``AllWMOs_CYC...`` when no WMO is given and
        ``FULL`` when neither WMO nor CYC is given. Names longer than 256 characters are
        replaced by their sha256 hex digest.
        """
        has_cyc = isinstance(self.CYC, np.ndarray)
        parts = ["WMO%i" % w for w in sorted(self.WMO)]
        if not parts:
            parts = ["AllWMOs"] if has_cyc else ["FULL"]
        if has_cyc:
            parts.extend("CYC%0.4d" % c for c in sorted(self.CYC))
        listname = "_".join(parts)
        if len(listname) > 256:
            listname = hashlib.sha256(listname.encode()).hexdigest()
        return listname

    def define_search_this(self, cyc):
        """ Return a search function for a given array of cycle numbers

        The returned function tells whether an index line contains ``_<cyc>.nc`` for one of
        the cycle numbers, the cycle being zero-padded to 3 digits ("%0.3d" prints 4 digits
        for cycles above 999). Descending profile files (``_<cyc>D.nc``) are not matched.
        """
        # The leading underscore anchors the match on the cycle number field of the file
        # name, so that cycle 5 ("_005.nc") does not also match cycle 1005 ("_1005.nc").
        patterns = ["_%0.3d.nc" % c for c in np.atleast_1d(cyc)]

        def search_this(this_line):
            return any(p in this_line for p in patterns)

        return search_this

    def search_one_wmo(self, index, wmo):
        """ Search for a WMO in an argo index file

        Parameters
        ----------
        index: _io.TextIOWrapper
        wmo: int

        Returns
        -------
        csv chunk matching the request, as a string. Or None
        """
        safe_rewind(index)
        # Search for the wmo at the beginning of the file name under: /<dac>/<wmo>/profiles/
        target = "/%i/" % wmo
        found = []
        for line in index:
            if target in line:  # much faster than re
                found.append(line)
            elif found:
                # Since the index is sorted, once past the float lines we can stop reading
                break
        return "".join(found) if found else None

    def search_any_wmo_cyc(self, index, cyc):
        """ Search for cycle numbers, whatever the WMO, in an argo index file

        Parameters
        ----------
        index: _io.TextIOWrapper
        cyc: array of integers

        Returns
        -------
        csv chunk matching the request, as a string. Or None
        """
        search_this = self.define_search_this(cyc)
        safe_rewind(index)
        found = [line for line in index if search_this(line)]
        return "".join(found) if found else None

    def search_one_wmo_cyc(self, index, wmo, cyc):
        """ Search for a WMO and CYC in an argo index file

        Parameters
        ----------
        index: _io.TextIOWrapper
        wmo: int
        cyc: array of integers

        Returns
        -------
        csv chunk matching the request, as a string. Or None
        """
        # Look for the float first:
        wmo_results = self.search_one_wmo(index, wmo)
        if not wmo_results:
            return None
        # Then look for the profiles (split on line breaks only, not on any whitespace):
        search_this = self.define_search_this(cyc)
        found = [line + "\n" for line in wmo_results.splitlines() if search_this(line)]
        return "".join(found) if found else None

    def run(self, index_file):
        """ Run search on an Argo index file

        Parameters
        ----------
        index_file: class:`io.TextIOWrapper`
            Argo index text stream

        Returns
        -------
        csv rows matching the request, as in-memory string. Or None.
        """
        has_cyc = isinstance(self.CYC, np.ndarray)
        if len(self.WMO) > 1:
            if has_cyc:
                chunks = [self.search_one_wmo_cyc(index_file, w, self.CYC) for w in self.WMO]
            else:
                chunks = [self.search_one_wmo(index_file, w) for w in self.WMO]
            return "".join(c for c in chunks if c)
        if len(self.WMO) == 0:  # Search for cycle numbers only
            if has_cyc:
                return self.search_any_wmo_cyc(index_file, self.CYC)
            # No wmo, No cyc, return the full index:
            return self.search_null(index_file)
        if has_cyc:
            return self.search_one_wmo_cyc(index_file, self.WMO[0], self.CYC)
        return self.search_one_wmo(index_file, self.WMO[0])
class indexfilter_box(indexfilter_proto):
    """ Legacy Index filter based on LATITUDE, LONGITUDE, DATE. Deprecated soon.

    This is intended to be used by instances of an indexstore

    Examples
    --------
    # Create filters:
    filt = indexfilter_box(BOX=[-70, -65, 30., 35.])
    filt = indexfilter_box(BOX=[-70, -65, 30., 35., '2012-01-01', '2012-06-30'])

    # Filter name:
    print(filt.uri)

    # Direct usage:
    with open("/Volumes/Data/ARGO/ar_index_global_prof.txt", "r") as f:
        results = filt.run(f)

    # With the indexstore:
    indexstore(cache=1, index_file="/Volumes/Data/ARGO/ar_index_global_prof.txt").read_csv(filt)
    """

    def __init__(self, BOX=None, **kwargs):
        """ Create Argo index filter for LATITUDE, LONGITUDE, DATE

        Parameters
        ----------
        box : list(float, float, float, float, str, str)
            The box domain to load all Argo data for:
            box = [lon_min, lon_max, lat_min, lat_max, datim_min, datim_max]
        **kwargs:
            Ignored.
        """
        # NOTE(review): BOX is not validated here
        self.BOX = [] if BOX is None else BOX

    def _format(self, x, typ):
        """ string formatting helper """
        if typ == 'lon':
            if x < 0:
                x = 360. + x
            return ("%05d") % (x * 100.)
        if typ == 'lat':
            return ("%05d") % (x * 100.)
        if typ == 'prs':
            return ("%05d") % (np.abs(x) * 10.)
        if typ == 'tim':
            return pd.to_datetime(x).strftime('%Y%m%d')
        return str(x)

    @property
    def uri(self):
        """ Return a unique name for this filter instance

        This is the sha256 hex digest of a human readable description of the box.
        """
        BOX = self.BOX
        if len(BOX) == 4:
            boxname = ("[x=%0.2f/%0.2f; y=%0.2f/%0.2f]") % (BOX[0], BOX[1], BOX[2], BOX[3])
        else:
            boxname = ("[x=%0.2f/%0.2f; y=%0.2f/%0.2f; t=%s/%s]") % (BOX[0], BOX[1], BOX[2], BOX[3], BOX[4], BOX[5])
        return hashlib.sha256(boxname.encode()).hexdigest()

    def search_latlon(self, index):
        """ Search index lines with a position inside the box (bounds included)

        Parameters
        ----------
        index: _io.TextIOWrapper

        Returns
        -------
        csv chunk matching the request, as a string. Or None
        """
        safe_rewind(index)
        iv_lat, iv_lon = 2, 3
        lon_min, lon_max, lat_min, lat_max = self.BOX[0:4]
        # Skip the comment lines ('#') and the csv header line that follows them:
        for line in index:
            if not line.startswith('#'):
                break
        found = []
        for line in index:
            fields = line.split(",")
            # Lines with a missing position are ignored
            if fields[iv_lon] != "" and fields[iv_lat] != "":
                x = float(fields[iv_lon])
                y = float(fields[iv_lat])
                if lon_min <= x <= lon_max and lat_min <= y <= lat_max:
                    found.append(line)
        return "".join(found) if found else None

    def search_tim(self, index):
        """ Search csv lines with a date inside the box time range (bounds included)

        Parameters
        ----------
        index: str csv like

        Returns
        -------
        csv chunk matching the request, as a string. Or None
        """
        iv_tim = 1
        tim_min = pd.to_datetime(self.BOX[4])
        tim_max = pd.to_datetime(self.BOX[5])
        found = []
        # Split on line breaks only, so a field containing a space does not break a row
        for line in index.splitlines():
            if not line:
                continue
            fields = line.split(",")
            if fields[iv_tim] != "":
                t = pd.to_datetime(str(fields[iv_tim]))
                if tim_min <= t <= tim_max:
                    found.append(line + "\n")
        return "".join(found) if found else None

    def search_latlontim(self, index):
        """ Search index lines inside the box, in space and time

        Parameters
        ----------
        index: _io.TextIOWrapper

        Returns
        -------
        csv chunk matching the request, as a string. Or None
        """
        # First search in space:
        results = self.search_latlon(index)
        # Then refine in time:
        if results:
            results = self.search_tim(results)
        return results

    def run(self, index_file):
        """ Run search on an Argo index file

        Parameters
        ----------
        index_file: class:`io.TextIOWrapper`
            Argo index text stream

        Returns
        -------
        csv rows matching the request, as in-memory string. Or None.
        """
        if len(self.BOX) == 4:
            return self.search_latlon(index_file)
        return self.search_latlontim(index_file)
class indexstore():
    """Legacy Argo index store. Deprecated soon.

    Examples
    --------
    BOX = [-60, -55, 40., 45., '2007-08-01', '2007-09-01']
    filt = indexfilter_box(BOX)
    df = indexstore(cache=True).read_csv(filt)
    """

    def __init__(self,
                 cache: bool = False,
                 cachedir: str = "",
                 index_file: str = "ar_index_global_prof.txt",
                 **kw):
        """ Create a file storage system for Argo index file requests

        Parameters
        ----------
        cache : bool (False)
            Passed to the underlying file stores; when True, search results are also saved
            in the search store.
        cachedir : str (used value in global OPTIONS)
            Cache directory; an empty string means ``OPTIONS['cachedir']``.
        index_file: str ("ar_index_global_prof.txt")
            Argo index file, opened through the 'index' file store.
        **kw:
            Ignored.
        """
        self.index_file = index_file
        self.cache = cache
        self.cachedir = OPTIONS['cachedir'] if cachedir == '' else cachedir
        # Give the resolved directory to both stores so they agree with self.cachedir
        self.fs = {
            'index': filestore(cache, self.cachedir),  # Manage the full index
            'search': memorystore(cache, self.cachedir),  # Manage the search results
        }

    def cachepath(self, uri: str, errors: str = 'raise'):
        """ Return path to cached file for a given URI """
        return self.fs['search'].cachepath(uri, errors)

    def clear_cache(self):
        """ Clear the cache of both the index and the search stores """
        self.fs['index'].clear_cache()
        self.fs['search'].clear_cache()

    def res2dataframe(self, results):
        """ Convert a csv like string into a DataFrame

        Blank lines are ignored. If one column has a missing value (``,,`` in the row), the
        row is skipped.

        Parameters
        ----------
        results: str

        Returns
        -------
        :class:`pandas.Dataframe`
        """
        cols_name = ['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution', 'date_update']
        # An explicit datetime unit is required: pandas >= 2 refuses unit-less 'datetime64'
        cols_type = {'file': np.str_, 'date': 'datetime64[ns]', 'latitude': np.float32, 'longitude': np.float32,
                     'ocean': np.str_, 'profiler_type': np.str_, 'institution': np.str_,
                     'date_update': 'datetime64[ns]'}
        # Filter out empty lines instead of blindly dropping the last row, so that results
        # without a trailing newline keep their last row
        data = [x.split(',') for x in results.splitlines() if x and ",," not in x]
        return pd.DataFrame(data, columns=cols_name).astype(cols_type)

    def read_csv(self, search):
        """ Run a search on an csv Argo index file and return a Pandas DataFrame with results

        Parameters
        ----------
        search: :class:`indexfilter_wmo` or :class:`indexfilter_box`
            Class instance inheriting from :class:`indexfilter_proto`

        Returns
        -------
        :class:`pandas.DataFrame`

        Raises
        ------
        DataNotFound
            If the search returns nothing.
        """
        if self.fs['search'].exists(search.uri):
            # Search already in memory: load ALL the cached rows (not only the first line)
            with self.fs['search'].fs.open(search.uri, "r") as of:
                results = of.read()
        else:
            # Run search from scratch:
            with self.fs['index'].open(self.index_file, "r") as f:
                results = search.run(f)
            if not results:
                raise DataNotFound("No data found in index: %s" % search.uri)
            # and save results for caching:
            if self.cache:
                with self.fs['search'].open(search.uri, "w") as of:
                    of.write(results)  # Save in "memory"
                with self.fs['search'].fs.open(search.uri, "r") as of:
                    of.readline()  # Trigger save in cache file; the full `results` are kept
        return self.res2dataframe(results)