
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Disclaimer:
# Functions get_sys_info, netcdf_and_hdf5_versions and show_versions are from:
#   xarray/util/print_versions.py
#

import os
import sys
import warnings
import urllib.request
import json

import importlib
import locale
import platform
import struct
import subprocess

import xarray as xr
import numpy as np
from scipy import interpolate

import pickle
import pkg_resources
import shutil

from argopy.options import OPTIONS, set_options
from argopy.stores import httpstore
from argopy.errors import FtpPathError, InvalidFetcher

path2pkl = pkg_resources.resource_filename('argopy', 'assets/')


def clear_cache():
    """ Delete argopy cache folder """
    if os.path.exists(OPTIONS['cachedir']):
        shutil.rmtree(OPTIONS['cachedir'])
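# Usage sketch: the folder removed is given by the 'cachedir' option, so it can
# be redirected before clearing (the path below is a placeholder):
#
#   with set_options(cachedir='/tmp/argopy_cache'):
#       clear_cache()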
def load_dict(ptype):
    if ptype == 'profilers':
        with open(os.path.join(path2pkl, 'dict_profilers.pickle'), 'rb') as f:
            loaded_dict = pickle.load(f)
        return loaded_dict
    elif ptype == 'institutions':
        with open(os.path.join(path2pkl, 'dict_institutions.pickle'), 'rb') as f:
            loaded_dict = pickle.load(f)
        return loaded_dict
    else:
        raise ValueError("Invalid dictionary pickle file")


def mapp_dict(Adictionnary, Avalue):
    if Avalue not in Adictionnary:
        return "Unknown"
    else:
        return Adictionnary[Avalue]
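# Usage sketch (the institution code 'IF' is an illustrative assumption; actual
# keys depend on the pickled dictionaries shipped under assets/):
#
#   institutions = load_dict('institutions')
#   mapp_dict(institutions, 'IF')    # -> the institution full name
#   mapp_dict(institutions, '???')   # -> 'Unknown' for codes not in the dict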
def list_available_data_src():
    """ List all available data sources """
    AVAILABLE_SOURCES = {}

    try:
        from .data_fetchers import erddap_data as Erddap_Fetchers
        AVAILABLE_SOURCES['erddap'] = Erddap_Fetchers
    except Exception:
        warnings.warn("An error occurred while loading the ERDDAP data fetcher, "
                      "it will not be available!\n%s\n%s"
                      % (sys.exc_info()[0], sys.exc_info()[1]))

    try:
        from .data_fetchers import localftp_data as LocalFTP_Fetchers
        AVAILABLE_SOURCES['localftp'] = LocalFTP_Fetchers
    except Exception:
        warnings.warn("An error occurred while loading the local FTP data fetcher, "
                      "it will not be available!\n%s\n%s"
                      % (sys.exc_info()[0], sys.exc_info()[1]))

    try:
        from .data_fetchers import argovis_data as ArgoVis_Fetchers
        AVAILABLE_SOURCES['argovis'] = ArgoVis_Fetchers
    except Exception:
        warnings.warn("An error occurred while loading the ArgoVis data fetcher, "
                      "it will not be available!\n%s\n%s"
                      % (sys.exc_info()[0], sys.exc_info()[1]))

    return AVAILABLE_SOURCES
def list_available_index_src():
    """ List all available index sources """
    AVAILABLE_SOURCES = {}

    try:
        from .data_fetchers import erddap_index as Erddap_Fetchers
        AVAILABLE_SOURCES['erddap'] = Erddap_Fetchers
    except Exception:
        warnings.warn("An error occurred while loading the ERDDAP index fetcher, "
                      "it will not be available!\n%s\n%s"
                      % (sys.exc_info()[0], sys.exc_info()[1]))

    try:
        from .data_fetchers import localftp_index as LocalFTP_Fetchers
        AVAILABLE_SOURCES['localftp'] = LocalFTP_Fetchers
    except Exception:
        warnings.warn("An error occurred while loading the local FTP index fetcher, "
                      "it will not be available!\n%s\n%s"
                      % (sys.exc_info()[0], sys.exc_info()[1]))

    return AVAILABLE_SOURCES
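# Usage sketch: both listing functions return a dict mapping source names to
# fetcher modules, so checking availability reduces to a key lookup:
#
#   'erddap' in list_available_data_src()   # True if the ERDDAP fetcher loaded
#   list(list_available_index_src())        # e.g. ['erddap', 'localftp']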
def list_standard_variables():
    """ Return the list of variables for standard users """
    return ['DATA_MODE', 'LATITUDE', 'LONGITUDE', 'POSITION_QC', 'DIRECTION', 'PLATFORM_NUMBER',
            'CYCLE_NUMBER', 'PRES', 'TEMP', 'PSAL', 'PRES_QC', 'TEMP_QC', 'PSAL_QC',
            'PRES_ADJUSTED', 'TEMP_ADJUSTED', 'PSAL_ADJUSTED',
            'PRES_ADJUSTED_QC', 'TEMP_ADJUSTED_QC', 'PSAL_ADJUSTED_QC',
            'PRES_ADJUSTED_ERROR', 'TEMP_ADJUSTED_ERROR', 'PSAL_ADJUSTED_ERROR',
            'JULD', 'JULD_QC', 'TIME', 'TIME_QC']


def list_multiprofile_file_variables():
    """ Return the list of variables in a netcdf multiprofile file.

        This is for files created by GDAC under <DAC>/<WMO>/<WMO>_prof.nc
    """
    return ['CONFIG_MISSION_NUMBER', 'CYCLE_NUMBER', 'DATA_CENTRE', 'DATA_MODE',
            'DATA_STATE_INDICATOR', 'DATA_TYPE', 'DATE_CREATION', 'DATE_UPDATE',
            'DC_REFERENCE', 'DIRECTION', 'FIRMWARE_VERSION', 'FLOAT_SERIAL_NO',
            'FORMAT_VERSION', 'HANDBOOK_VERSION', 'HISTORY_ACTION', 'HISTORY_DATE',
            'HISTORY_INSTITUTION', 'HISTORY_PARAMETER', 'HISTORY_PREVIOUS_VALUE',
            'HISTORY_QCTEST', 'HISTORY_REFERENCE', 'HISTORY_SOFTWARE',
            'HISTORY_SOFTWARE_RELEASE', 'HISTORY_START_PRES', 'HISTORY_STEP',
            'HISTORY_STOP_PRES', 'JULD', 'JULD_LOCATION', 'JULD_QC', 'LATITUDE',
            'LONGITUDE', 'PARAMETER', 'PI_NAME', 'PLATFORM_NUMBER', 'PLATFORM_TYPE',
            'POSITIONING_SYSTEM', 'POSITION_QC', 'PRES', 'PRES_ADJUSTED',
            'PRES_ADJUSTED_ERROR', 'PRES_ADJUSTED_QC', 'PRES_QC', 'PROFILE_PRES_QC',
            'PROFILE_PSAL_QC', 'PROFILE_TEMP_QC', 'PROJECT_NAME', 'PSAL', 'PSAL_ADJUSTED',
            'PSAL_ADJUSTED_ERROR', 'PSAL_ADJUSTED_QC', 'PSAL_QC', 'REFERENCE_DATE_TIME',
            'SCIENTIFIC_CALIB_COEFFICIENT', 'SCIENTIFIC_CALIB_COMMENT',
            'SCIENTIFIC_CALIB_DATE', 'SCIENTIFIC_CALIB_EQUATION', 'STATION_PARAMETERS',
            'TEMP', 'TEMP_ADJUSTED', 'TEMP_ADJUSTED_ERROR', 'TEMP_ADJUSTED_QC',
            'TEMP_QC', 'VERTICAL_SAMPLING_SCHEME', 'WMO_INST_TYPE']


def check_localftp(path, errors: str = "ignore"):
    """ Check if the path has the expected GDAC ftp structure

        Check if the path is structured like:
        .
        └── dac
            ├── aoml
            ├── ...
            ├── coriolis
            ├── ...
            ├── meds
            └── nmdis

        Parameters
        ----------
        path: str
            Path name to check
        errors: str
            "ignore" (default), "raise" or "warn"

        Returns
        -------
        checked: boolean
            True if at least one DAC folder is found under path/dac/<dac_name>
            False otherwise
    """
    dacs = ['aoml', 'bodc', 'coriolis', 'csio', 'csiro', 'incois', 'jma', 'kma',
            'kordi', 'meds', 'nmdis']

    # Case 1:
    check1 = os.path.isdir(path) and os.path.isdir(os.path.join(path, "dac")) \
        and np.any([os.path.isdir(os.path.join(path, "dac", dac)) for dac in dacs])
    if check1:
        return True
    elif errors == 'raise':
        # This was possible up to v0.1.3:
        check2 = os.path.isdir(path) and np.any([os.path.isdir(os.path.join(path, dac)) for dac in dacs])
        if check2:
            raise FtpPathError("This path is no longer GDAC compliant for argopy.\n"
                               "Please make sure you point toward a path with a 'dac' folder:\n%s" % path)
        else:
            raise FtpPathError("This path is not GDAC compliant:\n%s" % path)
    elif errors == 'warn':
        # This was possible up to v0.1.3:
        check2 = os.path.isdir(path) and np.any([os.path.isdir(os.path.join(path, dac)) for dac in dacs])
        if check2:
            warnings.warn("This path is no longer GDAC compliant for argopy. "
                          "This will raise an error in the future.\n"
                          "Please make sure you point toward a path with a 'dac' folder:\n%s" % path)
            return False
        else:
            warnings.warn("This path is not GDAC compliant:\n%s" % path)
            return False
    else:
        return False


def get_sys_info():
    """ Return system information as a list of (key, value) tuples """
    blob = []

    # get full commit hash
    commit = None
    if os.path.isdir(".git") and os.path.isdir("argopy"):
        try:
            pipe = subprocess.Popen(
                'git log --format="%H" -n 1'.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            so, serr = pipe.communicate()
        except Exception:
            pass
        else:
            if pipe.returncode == 0:
                commit = so
                try:
                    commit = so.decode("utf-8")
                except ValueError:
                    pass
                commit = commit.strip().strip('"')

    blob.append(("commit", commit))

    try:
        (sysname, nodename, release, version, machine, processor) = platform.uname()
        blob.extend(
            [
                ("python", sys.version),
                ("python-bits", struct.calcsize("P") * 8),
                ("OS", "%s" % (sysname)),
                ("OS-release", "%s" % (release)),
                ("machine", "%s" % (machine)),
                ("processor", "%s" % (processor)),
                ("byteorder", "%s" % sys.byteorder),
                ("LC_ALL", "%s" % os.environ.get("LC_ALL", "None")),
                ("LANG", "%s" % os.environ.get("LANG", "None")),
                ("LOCALE", "%s.%s" % locale.getlocale()),
            ]
        )
    except Exception:
        pass

    return blob


def netcdf_and_hdf5_versions():
    libhdf5_version = None
    libnetcdf_version = None
    try:
        import netCDF4
        libhdf5_version = netCDF4.__hdf5libversion__
        libnetcdf_version = netCDF4.__netcdf4libversion__
    except ImportError:
        try:
            import h5py
            libhdf5_version = h5py.version.hdf5_version
        except ImportError:
            pass
    return [("libhdf5", libhdf5_version), ("libnetcdf", libnetcdf_version)]
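# Usage sketch for check_localftp (the path below is a placeholder for a local
# GDAC mirror):
#
#   check_localftp('/data/ftp_copy')                  # silent, returns True/False
#   check_localftp('/data/ftp_copy', errors='raise')  # raises FtpPathError if not compliant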
def show_versions(file=sys.stdout):
    """ Print the versions of argopy and its dependencies

        Parameters
        ----------
        file : file-like, optional
            print to the given file-like object. Defaults to sys.stdout.
    """
    sys_info = get_sys_info()

    try:
        sys_info.extend(netcdf_and_hdf5_versions())
    except Exception as e:
        print(f"Error collecting netcdf / hdf5 version: {e}")

    deps = [
        # (MODULE_NAME, f(mod) -> mod version)
        ("argopy", lambda mod: mod.__version__),
        ("xarray", lambda mod: mod.__version__),
        ("pandas", lambda mod: mod.__version__),
        ("numpy", lambda mod: mod.__version__),
        ("scipy", lambda mod: mod.__version__),
        # argopy optionals
        ("netCDF4", lambda mod: mod.__version__),
        ("pydap", lambda mod: mod.__version__),
        ("h5netcdf", lambda mod: mod.__version__),
        ("h5py", lambda mod: mod.__version__),
        ("Nio", lambda mod: mod.__version__),
        ("zarr", lambda mod: mod.__version__),
        ("cftime", lambda mod: mod.__version__),
        ("nc_time_axis", lambda mod: mod.__version__),
        ("PseudoNetCDF", lambda mod: mod.__version__),
        ("rasterio", lambda mod: mod.__version__),
        ("cfgrib", lambda mod: mod.__version__),
        ("iris", lambda mod: mod.__version__),
        ("bottleneck", lambda mod: mod.__version__),
        ("dask", lambda mod: mod.__version__),
        ("distributed", lambda mod: mod.__version__),
        ("matplotlib", lambda mod: mod.__version__),
        ("cartopy", lambda mod: mod.__version__),
        ("seaborn", lambda mod: mod.__version__),
        ("numbagg", lambda mod: mod.__version__),
        # argopy setup/test
        ("setuptools", lambda mod: mod.__version__),
        ("pip", lambda mod: mod.__version__),
        ("conda", lambda mod: mod.__version__),
        ("pytest", lambda mod: mod.__version__),
        # Misc.
        ("IPython", lambda mod: mod.__version__),
        ("sphinx", lambda mod: mod.__version__),
    ]

    deps_blob = list()
    for (modname, ver_f) in deps:
        try:
            if modname in sys.modules:
                mod = sys.modules[modname]
            else:
                mod = importlib.import_module(modname)
        except Exception:
            deps_blob.append((modname, None))
        else:
            try:
                ver = ver_f(mod)
                deps_blob.append((modname, ver))
            except Exception:
                deps_blob.append((modname, "installed"))

    print("\nINSTALLED VERSIONS", file=file)
    print("------------------", file=file)

    for k, stat in sys_info:
        print(f"{k}: {stat}", file=file)

    print("", file=file)
    for k, stat in deps_blob:
        print(f"{k}: {stat}", file=file)
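# Usage sketch: the report goes to sys.stdout by default, but any file-like
# object works:
#
#   with open('versions.txt', 'w') as f:
#       show_versions(file=f)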
def isconnected(host='http://www.ifremer.fr'):
    """ Check if we have a live internet connection

        Parameters
        ----------
        host: str
            URL to use, 'http://www.ifremer.fr' by default

        Returns
        -------
        bool
    """
    try:
        urllib.request.urlopen(host, timeout=2)  # Python 3.x
        return True
    except Exception:
        return False


def isAPIconnected(src='erddap', data=True):
    """ Check if a source web API is alive or not

        Parameters
        ----------
        src: str
            The data or index source name, 'erddap' default
        data: bool
            If True check the data fetcher (default), if False, check the index fetcher

        Returns
        -------
        bool
    """
    if data:
        AVAILABLE_SOURCES = list_available_data_src()
    else:
        AVAILABLE_SOURCES = list_available_index_src()

    if src in AVAILABLE_SOURCES and getattr(AVAILABLE_SOURCES[src], "api_server_check", None):
        with set_options(src=src):
            return isconnected(AVAILABLE_SOURCES[src].api_server_check)
    else:
        raise InvalidFetcher


def erddap_ds_exists(ds="ArgoFloats"):
    """ Check if a dataset exists on the Ifremer erddap server, return a bool """
    # e = ArgoDataFetcher(src='erddap').float(wmo=0).fetcher
    # erddap_index = json.load(urlopen(e.erddap.server + "/info/index.json"))
    # erddap_index = json.load(urlopen("http://www.ifremer.fr/erddap/info/index.json"))
    with httpstore(timeout=120).open("http://www.ifremer.fr/erddap/info/index.json") as of:
        erddap_index = json.load(of)
    return ds in [row[-1] for row in erddap_index['table']['rows']]
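# Usage sketch: these checks can guard network-dependent code; note that
# isAPIconnected raises InvalidFetcher if the source is unavailable or has no
# 'api_server_check' attribute:
#
#   if isconnected():
#       print('erddap API alive:', isAPIconnected(src='erddap', data=True))
#       print('ArgoFloats dataset exists:', erddap_ds_exists('ArgoFloats'))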
def open_etopo1(box, res='l'):
    """ Download ETOPO for a box

        Parameters
        ----------
        box: [xmin, xmax, ymin, ymax]
        res: str
            Resolution: 'l' for low (0.1 deg, default) or 'h' for high (0.016 deg)

        Returns
        -------
        xarray.DataArray
    """
    # This function is in utilities to anticipate usage outside of plotting,
    # eg interpolation, grounding detection
    resx, resy = 0.1, 0.1
    if res == 'h':
        resx, resy = 0.016, 0.016

    uri = ("https://gis.ngdc.noaa.gov/mapviewer-support/wcs-proxy/wcs.groovy?filename=etopo1.nc"
           "&request=getcoverage&version=1.0.0&service=wcs&coverage=etopo1&CRS=EPSG:4326&format=netcdf"
           "&resx={}&resy={}"
           "&bbox={}").format
    thisurl = uri(resx, resy, ",".join([str(b) for b in [box[0], box[2], box[1], box[3]]]))
    ds = httpstore(cache=True).open_dataset(thisurl)
    da = ds['Band1'].rename("topo")
    for a in ds.attrs:
        da.attrs[a] = ds.attrs[a]
    da.attrs['Data source'] = 'https://maps.ngdc.noaa.gov/viewers/wcs-client/'
    da.attrs['URI'] = thisurl
    return da
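# Usage sketch: fetch low-resolution topography for a small North Atlantic box
# (box is [xmin, xmax, ymin, ymax] in degrees; the values are arbitrary examples):
#
#   topo = open_etopo1([-75, -45, 20, 30], res='l')
#   topo.plot()  # requires matplotlib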
#
# From xarrayutils : https://github.com/jbusecke/xarrayutils/blob/master/xarrayutils/vertical_coordinates.py
# Direct integration of those 2 functions to minimize dependencies and allow tuning them to our needs
#
def linear_interpolation_remap(z, data, z_regridded, z_dim=None,
                               z_regridded_dim="regridded", output_dim="remapped"):

    # interpolation called in xarray ufunc
    def _regular_interp(x, y, target_values):
        # remove all nans from input x and y
        idx = np.logical_or(np.isnan(x), np.isnan(y))
        x = x[~idx]
        y = y[~idx]

        # replace nans in target_values with out-of-bound values (just in case)
        target_values = np.where(~np.isnan(target_values), target_values, np.nanmax(x) + 1)

        # Interpolate with fill value parameter to extend min pressure toward 0
        interpolated = interpolate.interp1d(
            x, y, bounds_error=False, fill_value=(y[0], y[-1]))(target_values)
        return interpolated

    # infer dim from input
    if z_dim is None:
        if len(z.dims) != 1:
            raise RuntimeError("if z_dim is not specified, z must be a 1D array.")
        dim = z.dims[0]
    else:
        dim = z_dim

    # if a dataset is passed, drop all data_vars that don't contain dim
    if isinstance(data, xr.Dataset):
        raise ValueError("Dataset input is not supported yet")
        # TODO: for a dataset input just apply the function for each appropriate array

    kwargs = dict(
        input_core_dims=[[dim], [dim], [z_regridded_dim]],
        output_core_dims=[[output_dim]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[data.dtype],
        output_sizes={output_dim: len(z_regridded[z_regridded_dim])},
    )
    remapped = xr.apply_ufunc(_regular_interp, z, data, z_regridded, **kwargs)

    remapped.coords[output_dim] = z_regridded.rename(
        {z_regridded_dim: output_dim}
    ).coords[output_dim]
    return remapped

####################
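# Usage sketch for linear_interpolation_remap (a minimal synthetic example; the
# variable and dimension names are illustrative assumptions, not an argopy API):
# remap a (N_PROF, N_LEVELS) temperature field from its per-profile pressure
# axis onto standard pressure levels.
#
#   import numpy as np
#   import xarray as xr
#
#   pres = xr.DataArray(np.tile(np.arange(0., 500., 10.), (5, 1)),
#                       dims=['N_PROF', 'N_LEVELS'])
#   temp = xr.DataArray(20. - 0.02 * pres, dims=['N_PROF', 'N_LEVELS'])
#   levels = np.arange(0., 500., 50.)
#   std_lev = xr.DataArray(levels, dims=['Z_LEVELS'], coords={'Z_LEVELS': levels})
#
#   temp_i = linear_interpolation_remap(pres, temp, std_lev,
#                                       z_dim='N_LEVELS',
#                                       z_regridded_dim='Z_LEVELS')
#   temp_i.dims  # ('N_PROF', 'remapped')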