import os
import warnings
import numpy as np
import pandas as pd
import xarray as xr
from typing import Union
from fsspec.core import split_protocol
import fsspec
from socket import gaierror
import urllib.request
import json
import logging
import importlib
from ..options import OPTIONS
from ..errors import InvalidDatasetStructure, GdacPathError, InvalidFetcher
from .lists import list_available_data_src, list_available_index_src
from .casting import to_list
log = logging.getLogger("argopy.utils.checkers")
if importlib.util.find_spec("s3fs") is not None:
HAS_S3 = True
import s3fs
else:
HAS_S3 = False
if importlib.util.find_spec("boto3") is not None:
HAS_BOTO3 = True
import boto3
else:
HAS_BOTO3 = False
def is_indexbox(box: list, errors="raise"):
"""Check if this array matches a 2d or 3d index box definition
Argopy expects one of the following 2 format to define an index box:
- box = [lon_min, lon_max, lat_min, lat_max]
- box = [lon_min, lon_max, lat_min, lat_max, datim_min, datim_max]
This function check for this format compliance.
Parameters
----------
box: list
errors: str, default='raise'
Returns
-------
bool
"""
def is_dateconvertible(d):
try:
pd.to_datetime(d)
isit = True
except Exception:
isit = False
return isit
tests = {}
# Formats:
tests["index box must be a list"] = lambda b: isinstance(b, list)
tests["index box must be a list with 4 or 6 elements"] = lambda b: len(b) in [4, 6]
# Types:
tests["lon_min must be numeric"] = lambda b: (
isinstance(b[0], int) or isinstance(b[0], (np.floating, float))
)
tests["lon_max must be numeric"] = lambda b: (
isinstance(b[1], int) or isinstance(b[1], (np.floating, float))
)
tests["lat_min must be numeric"] = lambda b: (
isinstance(b[2], int) or isinstance(b[2], (np.floating, float))
)
tests["lat_max must be numeric"] = lambda b: (
isinstance(b[3], int) or isinstance(b[3], (np.floating, float))
)
if len(box) > 4:
tests[
"datetim_min must be a string convertible to a Pandas datetime"
] = lambda b: isinstance(b[-2], str) and is_dateconvertible(b[-2])
tests[
"datetim_max must be a string convertible to a Pandas datetime"
] = lambda b: isinstance(b[-1], str) and is_dateconvertible(b[-1])
# Ranges:
tests["lon_min must be in [-180;180] or [0;360]"] = (
lambda b: b[0] >= -180.0 and b[0] <= 360.0
)
tests["lon_max must be in [-180;180] or [0;360]"] = (
lambda b: b[1] >= -180.0 and b[1] <= 360.0
)
tests["lat_min must be in [-90;90]"] = lambda b: b[2] >= -90.0 and b[2] <= 90
tests["lat_max must be in [-90;90]"] = lambda b: b[3] >= -90.0 and b[3] <= 90.0
# Orders:
tests["lon_max must be larger than lon_min"] = lambda b: b[0] < b[1]
tests["lat_max must be larger than lat_min"] = lambda b: b[2] < b[3]
if len(box) > 4:
tests["datetim_max must come after datetim_min"] = lambda b: pd.to_datetime(
b[-2]
) < pd.to_datetime(b[-1])
error = None
for msg, test in tests.items():
if not test(box):
error = msg
break
if error and errors == "raise":
raise ValueError("%s: %s" % (box, error))
elif error:
return False
else:
return True
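# Illustrative usage of is_indexbox (a sketch; the coordinate values below are arbitrary examples):
#   >>> is_indexbox([-60, -55, 40.0, 45.0])                              # 2d box -> True
#   >>> is_indexbox([-60, -55, 40.0, 45.0, "2007-08-01", "2007-09-01"])  # 3d box -> True
#   >>> is_indexbox([-60, -55, 40.0], errors="ignore")                   # wrong length -> False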
def is_box(box: list, errors="raise"):
"""Check if this array matches a 3d or 4d data box definition
Argopy expects one of the following 2 format to define a box:
- box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max]
- box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max]
This function check for this format compliance.
Parameters
----------
box: list
errors: 'raise'
Returns
-------
bool
"""
def is_dateconvertible(d):
try:
pd.to_datetime(d)
isit = True
except Exception:
isit = False
return isit
tests = {}
# Formats:
tests["box must be a list"] = lambda b: isinstance(b, list)
tests["box must be a list with 6 or 8 elements"] = lambda b: len(b) in [6, 8]
# Types:
tests["lon_min must be numeric"] = lambda b: (
isinstance(b[0], int) or isinstance(b[0], (np.floating, float))
)
tests["lon_max must be numeric"] = lambda b: (
isinstance(b[1], int) or isinstance(b[1], (np.floating, float))
)
tests["lat_min must be numeric"] = lambda b: (
isinstance(b[2], int) or isinstance(b[2], (np.floating, float))
)
tests["lat_max must be numeric"] = lambda b: (
isinstance(b[3], int) or isinstance(b[3], (np.floating, float))
)
tests["pres_min must be numeric"] = lambda b: (
isinstance(b[4], int) or isinstance(b[4], (np.floating, float))
)
tests["pres_max must be numeric"] = lambda b: (
isinstance(b[5], int) or isinstance(b[5], (np.floating, float))
)
if len(box) == 8:
tests[
"datetim_min must be an object convertible to a Pandas datetime"
] = lambda b: is_dateconvertible(b[-2])
tests[
"datetim_max must be an object convertible to a Pandas datetime"
] = lambda b: is_dateconvertible(b[-1])
# Ranges:
tests["lon_min must be in [-180;180] or [0;360]"] = (
lambda b: b[0] >= -180.0 and b[0] <= 360.0
)
tests["lon_max must be in [-180;180] or [0;360]"] = (
lambda b: b[1] >= -180.0 and b[1] <= 360.0
)
tests["lat_min must be in [-90;90]"] = lambda b: b[2] >= -90.0 and b[2] <= 90
tests["lat_max must be in [-90;90]"] = lambda b: b[3] >= -90.0 and b[3] <= 90.0
tests["pres_min must be in [0;10000]"] = lambda b: b[4] >= 0 and b[4] <= 10000
tests["pres_max must be in [0;10000]"] = lambda b: b[5] >= 0 and b[5] <= 10000
# Orders:
tests["lon_max must be larger than lon_min"] = lambda b: b[0] <= b[1]
tests["lat_max must be larger than lat_min"] = lambda b: b[2] <= b[3]
tests["pres_max must be larger than pres_min"] = lambda b: b[4] <= b[5]
if len(box) == 8:
tests["datetim_max must come after datetim_min"] = lambda b: pd.to_datetime(
b[-2]
) <= pd.to_datetime(b[-1])
error = None
for msg, test in tests.items():
if not test(box):
error = msg
break
if error and errors == "raise":
raise ValueError("%s: %s" % (box, error))
elif error:
return False
else:
return True
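# Illustrative usage of is_box (a sketch; the coordinate and pressure values below are arbitrary examples):
#   >>> is_box([-60, -55, 40.0, 45.0, 0.0, 100.0])                               # 3d box -> True
#   >>> is_box([-60, -55, 40.0, 45.0, 0.0, 100.0, "2007-08-01", "2007-09-01"])   # 4d box -> True
#   >>> is_box([-60, -55, 40.0, 45.0], errors="ignore")                          # wrong length -> False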
def is_list_of_strings(lst):
return isinstance(lst, list) and all(isinstance(elem, str) for elem in lst)
def is_list_of_dicts(lst):
return all(isinstance(x, dict) for x in lst)
def is_list_of_datasets(lst):
return all(isinstance(x, xr.Dataset) for x in lst)
def is_list_equal(lst1, lst2):
"""Return true if 2 lists contain same elements"""
return len(lst1) == len(lst2) and len(lst1) == sum(
[1 for i, j in zip(lst1, lst2) if i == j]
)
def check_wmo(lst, errors="raise"):
"""Validate a WMO option and returned it as a list of integers
Parameters
----------
wmo: int
WMO must be an integer or an iterable with elements that can be casted as integers
errors: {'raise', 'warn', 'ignore'}
Possibly raises a ValueError exception or UserWarning, otherwise fails silently.
Returns
-------
list(int)
"""
is_wmo(lst, errors=errors)
# Make sure we deal with a list
lst = to_list(lst)
# Then cast list elements as integers
return [abs(int(x)) for x in lst]
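# Illustrative usage of check_wmo (a sketch; the WMO numbers below are arbitrary 7-digit examples):
#   >>> check_wmo(6902746)                # -> [6902746]
#   >>> check_wmo([6902746, "6902747"])   # strings castable to int are accepted -> [6902746, 6902747]
#   >>> check_wmo("dummy")                # raises ValueError with the default errors='raise'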
def is_wmo(lst, errors="raise"): # noqa: C901
"""Check if a WMO is valid
Parameters
----------
wmo: int, list(int), array(int)
WMO must be a single or a list of 5/7 digit positive numbers
errors: {'raise', 'warn', 'ignore'}
Possibly raises a ValueError exception or UserWarning, otherwise fails silently.
Returns
-------
bool
True if wmo is indeed a list of integers
"""
# Make sure we deal with a list
lst = to_list(lst)
# Error message:
# msg = "WMO must be an integer or an iterable with elements that can be casted as integers"
msg = "WMO must be a single or a list of 5/7 digit positive numbers. Invalid: '{}'".format
# Then try to cast list elements as integers, return True if ok
result = True
try:
for x in lst:
if not str(x).isdigit():
result = False
if (len(str(x)) != 5) and (len(str(x)) != 7):
result = False
if int(x) <= 0:
result = False
except Exception:
result = False
if errors == "raise":
raise ValueError(msg(x))
elif errors == "warn":
warnings.warn(msg(x))
    if not result:
        if errors == "raise":
            raise ValueError(msg(x))
        elif errors == "warn":
            warnings.warn(msg(x))
    return result
def check_cyc(lst, errors="raise"):
"""Validate a CYC option and returned it as a list of integers
Parameters
----------
cyc: int
CYC must be an integer or an iterable with elements that can be casted as positive integers
errors: {'raise', 'warn', 'ignore'}
Possibly raises a ValueError exception or UserWarning, otherwise fails silently.
Returns
-------
list(int)
"""
is_cyc(lst, errors=errors)
# Make sure we deal with a list
lst = to_list(lst)
# Then cast list elements as integers
return [abs(int(x)) for x in lst]
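# Illustrative usage of check_cyc (a sketch; the cycle numbers below are arbitrary examples):
#   >>> check_cyc(1)            # -> [1]
#   >>> check_cyc([1, "12"])    # strings castable to int are accepted -> [1, 12]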
def is_cyc(lst, errors="raise"): # noqa: C901
"""Check if a CYC is valid
Parameters
----------
cyc: int, list(int), array(int)
CYC must be a single or a list of at most 4 digit positive numbers
errors: {'raise', 'warn', 'ignore'}
Possibly raises a ValueError exception or UserWarning, otherwise fails silently.
Returns
-------
bool
True if cyc is indeed a list of integers
"""
# Make sure we deal with a list
lst = to_list(lst)
# Error message:
msg = "CYC must be a single or a list of at most 4 digit positive numbers. Invalid: '{}'".format
# Then try to cast list elements as integers, return True if ok
result = True
try:
for x in lst:
if not str(x).isdigit():
result = False
if len(str(x)) > 4:
result = False
if int(x) < 0:
result = False
except Exception:
result = False
if errors == "raise":
raise ValueError(msg(x))
elif errors == "warn":
warnings.warn(msg(x))
    if not result:
        if errors == "raise":
            raise ValueError(msg(x))
        elif errors == "warn":
            warnings.warn(msg(x))
    return result
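# Illustrative behaviour of the is_wmo/is_cyc validators with the 'errors' policy (a sketch):
#   >>> is_wmo(6902746)                    # valid 7-digit WMO -> True
#   >>> is_wmo("dummy", errors="ignore")   # invalid, fails silently -> False
#   >>> is_cyc(12345, errors="ignore")     # more than 4 digits -> False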
def check_index_cols(column_names: list, convention: str = "ar_index_global_prof"):
"""
ar_index_global_prof.txt: Index of profile files
Profile directory file of the Argo Global Data Assembly Center
file,date,latitude,longitude,ocean,profiler_type,institution,date_update
argo_bio-profile_index.txt: bgc Argo profiles index file
The directory file describes all individual bio-profile files of the argo GDAC ftp site.
file,date,latitude,longitude,ocean,profiler_type,institution,parameters,parameter_data_mode,date_update
"""
# Default for 'ar_index_global_prof'
ref = [
"file",
"date",
"latitude",
"longitude",
"ocean",
"profiler_type",
"institution",
"date_update",
]
if (
convention == "argo_bio-profile_index"
or convention == "argo_synthetic-profile_index"
):
ref = [
"file",
"date",
"latitude",
"longitude",
"ocean",
"profiler_type",
"institution",
"parameters",
"parameter_data_mode",
"date_update",
]
if (
convention == "argo_aux-profile_index"
):
# ['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution', 'parameters', 'date_update']
ref = [
"file",
"date",
"latitude",
"longitude",
"ocean",
"profiler_type",
"institution",
"parameters",
"date_update",
]
if not is_list_equal(column_names, ref):
log.debug("Expected (convention=%s): %s, got: %s" % (convention, ";".join(ref), ";".join(column_names)))
raise InvalidDatasetStructure("Unexpected column names in this index !")
else:
return column_names
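# Illustrative usage of check_index_cols (a sketch, using the core-profile convention listed in the docstring):
#   >>> cols = ["file", "date", "latitude", "longitude", "ocean",
#   ...         "profiler_type", "institution", "date_update"]
#   >>> check_index_cols(cols)                                        # returns cols unchanged
#   >>> check_index_cols(cols, convention="argo_bio-profile_index")   # raises InvalidDatasetStructure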
def check_gdac_path(path, errors="ignore"): # noqa: C901
"""Check if a path has the expected GDAC structure
    Expected GDAC structure::
        .
        └── dac
            ├── aoml
            ├── ...
            ├── coriolis
            ├── ...
            ├── meds
            └── nmdis
    This check will return True if a ``dac`` folder is found under ``path``.
    Examples::
        >>> check_gdac_path("https://data-argo.ifremer.fr")  # True
        >>> check_gdac_path("https://usgodae.org/pub/outgoing/argo")  # True
        >>> check_gdac_path("ftp://ftp.ifremer.fr/ifremer/argo")  # True
        >>> check_gdac_path("/home/ref-argo/gdac")  # True
        >>> check_gdac_path("https://www.ifremer.fr")  # False
        >>> check_gdac_path("ftp://usgodae.org/pub/outgoing")  # False
    Parameters
    ----------
    path: str
        Path name to check, including access protocol
    errors: str, default="ignore"
        "ignore", "raise" or "warn"
    Returns
    -------
    checked: boolean
        True if a ``dac`` folder is found under ``path``, False otherwise
    """
# Create a file system for this path
if split_protocol(path)[0] is None:
fs = fsspec.filesystem("file")
elif "https" in split_protocol(path)[0]:
fs = fsspec.filesystem("http")
elif "ftp" in split_protocol(path)[0]:
try:
host = split_protocol(path)[-1].split("/")[0]
fs = fsspec.filesystem("ftp", host=host)
except gaierror:
if errors == "raise":
raise GdacPathError("Can't get address info (GAIerror) on '%s'" % host)
elif errors == "warn":
warnings.warn("Can't get address info (GAIerror) on '%s'" % host)
return False
else:
return False
else:
raise GdacPathError(
"Unknown protocol for an Argo GDAC host: %s" % split_protocol(path)[0]
)
# dacs = [
# "aoml",
# "bodc",
# "coriolis",
# "csio",
# "csiro",
# "incois",
# "jma",
# "kma",
# "kordi",
# "meds",
# "nmdis",
# ]
# Case 1:
check1 = (
fs.exists(path)
and fs.exists(fs.sep.join([path, "dac"]))
# and np.any([fs.exists(fs.sep.join([path, "dac", dac])) for dac in dacs]) # Take too much time on http/ftp GDAC server
)
if check1:
return True
elif errors == "raise":
raise GdacPathError(
"This path is not GDAC compliant (no `dac` folder with legitimate sub-folder):\n%s"
% path
)
elif errors == "warn":
warnings.warn("This path is not GDAC compliant:\n%s" % path)
return False
else:
return False
def isconnected(host: str = "https://www.ifremer.fr", maxtry: int = 10):
"""Check if an URL is alive
Parameters
----------
host: str
URL to use, 'https://www.ifremer.fr' by default
maxtry: int, default: 10
Maximum number of host connections to try before
Returns
-------
bool
"""
def test_retry(host, checker, maxtry):
it = 0
while it < maxtry:
try:
checker(host)
result, it = True, maxtry
except Exception:
result, it = False, it + 1
return result
def check_local(host):
return os.path.exists(host)
def check_remote(host):
return urllib.request.urlopen(
host, timeout=1
) # nosec B310 because host protocol already checked
def check_s3(host):
return s3fs.S3FileSystem(anon=True).info(host)
if split_protocol(host)[0] in ["http", "https", "ftp", "sftp"]:
return test_retry(host, check_remote, maxtry)
elif split_protocol(host)[0] == "s3":
if HAS_S3:
return test_retry(host, check_s3, maxtry)
else:
raise ValueError(
"Can't check if an S3 server is connected without the 's3fs' library. Please update your environment "
"with this dependency.")
else:
return test_retry(host, check_local, 1)
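# Illustrative usage of isconnected (a sketch; results depend on network access and on local paths):
#   >>> isconnected()                                      # ping https://www.ifremer.fr
#   >>> isconnected("ftp://ftp.ifremer.fr/ifremer/argo")   # remote ftp host, checked with urllib
#   >>> isconnected("/tmp")                                # local paths are checked with os.path.exists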
def urlhaskeyword(url: str = "", keyword: str = "", maxtry: int = 10):
"""Check if a keyword is in the content of a URL
Parameters
----------
url: str
keyword: str
maxtry: int, default: 10
Maximum number of host connections to try before returning False
Returns
-------
bool
"""
it = 0
while it < maxtry:
try:
with fsspec.open(url) as f:
data = f.read()
result = keyword in str(data)
it = maxtry
except Exception:
result, it = False, it + 1
return result
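# Illustrative usage of urlhaskeyword (a sketch; the URL and keyword below are hypothetical):
#   >>> urlhaskeyword("https://example.org/status.json", "ok")   # True only if 'ok' appears in the page content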
def isalive(api_server_check: Union[str, dict] = "") -> bool:
"""Check if an API is alive or not
2 methods are available:
- URL Ping
- keyword Check
Parameters
----------
api_server_check
Url string or dictionary with [``url``, ``keyword``] keys.
- For a string, uses: :class:`argopy.utilities.isconnected`
- For a dictionary, uses: :class:`argopy.utilities.urlhaskeyword`
Returns
-------
bool
"""
# log.debug("isalive: %s" % api_server_check)
if isinstance(api_server_check, dict):
return urlhaskeyword(
url=api_server_check["url"], keyword=api_server_check["keyword"]
)
else:
return isconnected(api_server_check)
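# Illustrative usage of isalive (a sketch; the keyword-check URL below is hypothetical):
#   >>> isalive("https://www.ifremer.fr")                                     # plain URL ping via isconnected
#   >>> isalive({"url": "https://example.org/status.json", "keyword": "ok"})  # keyword check via urlhaskeyword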
def isAPIconnected(src="erddap", data=True):
"""Check if a source API is alive or not
The API is connected when it has a live URL or valid folder path.
Parameters
----------
src: str
        The data or index source name, 'erddap' by default
    data: bool
        If True, check the data fetcher (default); if False, check the index fetcher
Returns
-------
bool
"""
if data:
list_src = list_available_data_src()
else:
list_src = list_available_index_src()
if src in list_src and getattr(list_src[src], "api_server_check", None):
return isalive(list_src[src].api_server_check)
else:
raise InvalidFetcher
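# Illustrative usage of isAPIconnected (a sketch; result depends on network access):
#   >>> isAPIconnected(src="erddap", data=True)    # check the erddap data fetcher
#   >>> isAPIconnected(src="erddap", data=False)   # check the erddap index fetcher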
def erddap_ds_exists(
ds: Union[list, str] = "ArgoFloats", erddap: str = None, maxtry: int = 2
) -> bool:
"""Check if a dataset exists on a remote erddap server
    Parameters
    ----------
    ds: str or list(str), default='ArgoFloats'
        Name of the erddap dataset(s) to check
    erddap: str, default=OPTIONS['erddap']
        Url of the erddap server
    maxtry: int, default: 2
        Maximum number of host connections to try
    Returns
    -------
    bool or list(bool)
"""
if erddap is None:
erddap = OPTIONS["erddap"]
# log.debug("from erddap_ds_exists: %s" % erddap)
if isconnected(erddap, maxtry=maxtry):
from ..stores import httpstore # must import here to avoid circular import
with httpstore(timeout=OPTIONS["api_timeout"]).open(
"".join([erddap, "/info/index.json"])
) as of:
erddap_index = json.load(of)
if is_list_of_strings(ds):
return [
this_ds in [row[-1] for row in erddap_index["table"]["rows"]]
for this_ds in ds
]
else:
return ds in [row[-1] for row in erddap_index["table"]["rows"]]
else:
log.debug("Cannot reach erddap server: %s" % erddap)
warnings.warn(
"Return False because we cannot reach the erddap server %s" % erddap
)
return False
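# Illustrative usage of erddap_ds_exists (a sketch; result depends on the erddap server being reachable,
# and the second dataset name below is hypothetical):
#   >>> erddap_ds_exists("ArgoFloats")                        # -> bool
#   >>> erddap_ds_exists(["ArgoFloats", "SomeOtherDataset"])  # -> list of bool, one per dataset name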
def has_aws_credentials():
    """Check if the environment provides AWS credentials (requires the optional boto3 dependency)"""
    if HAS_BOTO3:
        client = boto3.client("s3")
        # Rely on botocore internals to check whether credentials were resolved:
        return client._request_signer._credentials is not None
    else:
        raise Exception("boto3 is not available!")
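# Illustrative usage of has_aws_credentials (a sketch; only meaningful when boto3 is installed):
#   >>> if HAS_BOTO3:
#   ...     has_aws_credentials()   # True if boto3 resolved credentials from the environment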