Source code for argopy.utils.format

"""
Manipulate Argo formatted string and print/stdout formatters
"""

import os
from urllib.parse import urlparse, parse_qs
import logging
import pandas as pd
import numpy as np
import warnings
from typing import Literal

from argopy.utils.checkers import check_cyc, check_wmo


log = logging.getLogger("argopy.utils.format")


def redact(s, n):
    """Keep the first ``n`` characters of ``s`` and mask the remaining ones with ``*``"""
    visible = s[:n]
    hidden = len(s) - n
    if hidden < 0:
        # Nothing left to mask when the string is shorter than n
        hidden = 0
    return visible + "*" * hidden


def format_oneline(s, max_width=65):
    """Return a string formatted for a line print

    Strings no longer than ``max_width`` are returned unchanged. Longer strings are
    cut in the middle: the beginning and the end are kept and joined with the
    marker ``" ... "``, so that the result is exactly ``max_width`` characters long.

    Parameters
    ----------
    s: str
        The string to format.
    max_width: int, default=65
        Maximum number of characters in the returned string.

    Returns
    -------
    str
    """
    if len(s) <= max_width:
        return s

    padding = " ... "
    room = max_width - len(padding)
    if room < 0:
        # Not even the marker fits: fall back to a hard truncation
        return s[: max(max_width, 0)]

    # Share the remaining room between both ends, the head gets the odd character
    n, q = divmod(room, 2)
    head = s[: n + q]
    # Use an explicit start index: ``s[-n:]`` would be the whole string when n == 0
    tail = s[len(s) - n :]
    return "".join([head, padding, tail])
def dirfs_relpath(fs, path):
    """Return ``path`` relative to the root folder ``fs.path`` of a file system

    Parameters
    ----------
    fs:
        Object exposing a root folder as ``fs.path`` and the path separator of
        its underlying file system as ``fs.fs.sep`` (presumably an fsspec
        directory file system, to be confirmed against callers).
    path: str or iterable of str
        Path(s) to make relative. A string is processed directly, anything else
        is processed element-wise.

    Returns
    -------
    str or list of str
        ``path`` unchanged if ``fs.path`` is empty, an empty string if ``path``
        is the root folder itself, otherwise ``path`` without the root prefix.

    Raises
    ------
    AssertionError
        If ``path`` is not located under ``fs.path``.
    """
    if not isinstance(path, str):
        return [dirfs_relpath(fs, _path) for _path in path]

    if not fs.path:
        return path

    sep = fs.fs.sep
    rooted = fs.path.startswith(sep)

    # We need to account for S3FileSystem returning paths that do not
    # start with a '/'
    if path == fs.path or (rooted and path == fs.path[1:]):
        return ""

    prefix = fs.path + sep
    if rooted and not path.startswith(sep):
        prefix = prefix[1:]

    assert path.startswith(prefix), "'%s' is not under '%s'" % (path, prefix)
    return path[len(prefix) :]
def argo_split_path(this_path):  # noqa C901
    """Split path from a GDAC ftp style Argo netcdf file and return information

    >>> argo_split_path('/dac/coriolis/6901035/profiles/D6901035_001D.nc')
    >>> argo_split_path('https://data-argo.ifremer.fr/dac/csiro/5903939/profiles/D5903939_103.nc')

    Parameters
    ----------
    this_path: str
        Path or url of an Argo netcdf file, following one of the folder structures
        ``dac/<DAC>/<FloatWmoID>/`` or ``dac/<DAC>/<FloatWmoID>/profiles``.

    Returns
    -------
    dict
        Dictionary sorted by keys, with ``dac``, ``extension``, ``name``, ``origin``,
        ``path``, ``type`` and ``wmo``, plus ``data_mode`` and ``direction`` when
        they can be read from the file name.

    Raises
    ------
    ValueError
        If the path has no ``dac`` folder, if the DAC name is not a known one or
        if the file extension is not ``.nc``.
    """
    dacs = [
        "aoml",
        "bodc",
        "coriolis",
        "csio",
        "csiro",
        "incois",
        "jma",
        "kma",
        "kordi",  # todo: remove this entry after some time, it will not be valid after 2025/06/30
        "kiost",
        "meds",
        "nmdis",
    ]
    output = {}

    def detect_path_separator(path):
        """Return the file path separator found in ``path``, or None if there is none"""
        # Check for the default OS separator
        if os.sep in path:
            return os.sep
        # Check for the alternative separator, if it exists (e.g., '/' on Windows)
        if os.altsep and os.altsep in path:
            return os.altsep
        # No separator detected
        return None

    def split_path(p, sep="/"):
        """Split a pathname.  Returns tuple "(head, tail)" where "tail" is
        everything after the final slash.  Either part may be empty."""
        # Same as posixpath.py but we get to choose the file separator !
        p = os.fspath(p)
        i = p.rfind(sep) + 1
        head, tail = p[:i], p[i:]
        if head and head != sep * len(head):
            head = head.rstrip(sep)
        return head, tail

    def fix_localhost(host):
        """Return the scheme and netloc of a local ftp/http test server url, or an empty string"""
        if "ftp://localhost:" in host:
            return "ftp://%s" % (urlparse(host).netloc)
        if "http://127.0.0.1:" in host:
            return "http://%s" % (urlparse(host).netloc)
        return ""

    def log_context():
        """Log the parsing state to help with debugging, right before an error is raised"""
        for item in (this_path, path, sep, path_parts, output):
            log.warning(item)

    known_origins = [
        "https://data-argo.ifremer.fr",
        "https://usgodae.org/pub/outgoing/argo",
        "ftp://ftp.ifremer.fr/ifremer/argo",
        "ftp://usgodae.org/pub/outgoing/argo",
        "s3://argo-gdac-sandbox/pub",
        fix_localhost(this_path),
        "",
    ]

    # Check if this is a path with a known "origin":
    # If not, fills value with an empty string "" (the last entry always matches)
    output["origin"] = next(
        origin for origin in known_origins if this_path.startswith(origin)
    )
    output["origin"] = "." if output["origin"] == "" else output["origin"] + "/"

    if output["origin"] != ".":
        sep = "/"
    else:
        # A bare file name holds no separator at all: fall back on the OS one so that
        # the lack of a 'dac' folder is reported below, rather than a TypeError here
        sep = detect_path_separator(this_path) or os.sep

    (path, file) = split_path(this_path, sep=sep)

    output["path"] = path.replace(output["origin"], "")
    output["name"] = file

    # Deal with the path:
    # dac/<DAC>/<FloatWmoID>/
    # dac/<DAC>/<FloatWmoID>/profiles
    path_parts = path.split(sep)

    try:
        # Adjust origin and path for local files:
        # This ensures that output['path'] is agnostic to users and can be reused on any gdac compliant architecture
        output["origin"] = sep.join(path_parts[0 : path_parts.index("dac")])
        output["origin"] = sep if output["origin"] == "" else output["origin"]
        output["path"] = sep.join(path_parts[path_parts.index("dac") :])

        # Extract file information
        if path_parts[-1] == "profiles":
            output["type"] = "Mono-cycle profile file"
            output["wmo"] = path_parts[-2]
            output["dac"] = path_parts[-3]
        else:
            output["type"] = "Multi-cycle profile file"
            output["wmo"] = path_parts[-1]
            output["dac"] = path_parts[-2]

    except Exception:
        log_context()
        raise

    if output["dac"] not in dacs:
        log.debug("This is not a Argo GDAC compliant file path: %s" % path)
        log_context()
        raise ValueError(
            "This is not a Argo GDAC compliant file path (invalid DAC name: '%s')"
            % output["dac"]
        )
    elif output["dac"] == "kordi" and pd.to_datetime("now", utc=True) > pd.to_datetime(
        "2025-06-30", utc=True
    ):
        warnings.warn("DAC 'kordi' has been deprecated by ADMT. Use 'kiost' instead.")

    # Deal with the file name:
    filename, file_extension = os.path.splitext(output["name"])
    output["extension"] = file_extension
    if file_extension != ".nc":
        raise ValueError(
            "This is not a Argo GDAC compliant file path (invalid file extension: '%s')"
            % file_extension
        )
    filename_parts = output["name"].split("_")

    if "Mono" in output["type"]:
        # Letters found before the WMO in the file name
        prefix = filename_parts[0].split(output["wmo"])[0]
        if "R" in prefix:
            output["data_mode"] = "R, Real-time data"
        if "D" in prefix:
            output["data_mode"] = "D, Delayed-time data"

        if "S" in prefix:
            output["type"] = "S, Synthetic BGC Mono-cycle profile file"
        if "M" in prefix:
            output["type"] = "M, Merged BGC Mono-cycle profile file"
        if "B" in prefix:
            output["type"] = "B, BGC Mono-cycle profile file"

        suffix = filename_parts[-1].split(output["wmo"])[-1]
        if "D" in suffix:
            output["direction"] = "D, descending profiles"
        # NOTE(review): suffix still holds the cycle number and the file extension
        # here, so this branch looks unreachable - to be confirmed
        elif suffix == "" and "Mono" in output["type"]:
            output["direction"] = "A, ascending profiles (implicit)"

    else:
        typ = filename_parts[-1].split(".nc")[0]
        if typ == "prof":
            output["type"] = "Multi-cycle file"
        if typ == "Sprof":
            output["type"] = "S, Synthetic BGC Multi-cycle profiles file"
        if typ == "tech":
            output["type"] = "Technical data file"
        if typ == "meta":
            output["type"] = "Metadata file"
        if "traj" in typ:
            # possible typ = [Rtraj, Dtraj, BRtraj, BDtraj]
            output["type"], i = "Trajectory file", 0
            if typ[0] == "B":
                output["type"], i = "BGC Trajectory file", 1
            # Slice instead of index: a file name with no data mode letter (eg: 'traj')
            # must reach the implicit case below instead of raising an IndexError
            mode = typ.split("traj")[0][i : i + 1]
            if mode == "D":
                output["data_mode"] = "D, Delayed-time data"
            elif mode == "R":
                output["data_mode"] = "R, Real-time data"
            else:
                output["data_mode"] = "R, Real-time data (implicit)"

    return dict(sorted(output.items()))
def erddapuri2fetchobj(uri: str) -> dict:
    """Translate an Ifremer ERDDAP URI into fetcher arguments

    The constraints found in the URI are returned as a dictionary with either a
    ``box`` entry, or a ``wmo`` entry possibly completed with a ``cyc`` entry.

    Raises
    ------
    ValueError
        If coordinates constraints come without pressure constraints, or if no
        known constraint is found in the URI.
    """
    constraints = parse_qs(uri)

    def first(key):
        # parse_qs maps each name to a list of values, only the first one is used
        return constraints[key][0]

    def as_day(key):
        # Time constraints are read as a number of seconds and returned as a date string
        return pd.to_datetime(float(first(key)), unit="s").strftime("%Y-%m-%d")

    def as_ids(key):
        # Identifiers come as a '|' separated list, decorated with '~' and '"'
        return first(key).replace("~", "").replace('"', "").split("|")

    fetchobj = {}

    if "longitude>" in constraints:
        # Recreate the box definition:
        domain = [
            float(first(key))
            for key in ("longitude>", "longitude<", "latitude>", "latitude<")
        ]

        if "pres>" in constraints:
            pressure = "pres"
        elif "pres_adjusted>" in constraints:
            pressure = "pres_adjusted"
        else:
            raise ValueError(
                "This erddap uri is invalid, it must have pressure constraints with coordinates constraints: %s"
                % uri
            )
        domain.append(float(first(pressure + ">")))
        domain.append(float(first(pressure + "<")))

        if "time>" in constraints:
            domain.append(as_day("time>"))
            domain.append(as_day("time<"))

        fetchobj["box"] = domain

    elif "platform_number" in constraints:
        fetchobj["wmo"] = check_wmo(as_ids("platform_number"))
        if "cycle_number" in constraints:
            fetchobj["cyc"] = check_cyc(as_ids("cycle_number"))

    if not fetchobj:
        raise ValueError("This is not a typical Argo Ifremer Erddap uri")
    return fetchobj
class UriCName:
    """Return a CNAME from an Ifremer ERDDAP fetcher instance or uri string

    Parameters
    ----------
    obj:
        Either an object with a ``BOX`` attribute, an object with a ``WMO`` (and
        possibly ``CYC``) attribute, or an url string with a ``/tabledap/`` path.

    Raises
    ------
    ValueError
        If ``obj`` is none of the above.
    """

    def _is_url(self, url) -> bool:
        """Check that ``url`` is a string with a scheme and a network location"""
        # Anything but a string can't be an url: urlparse fails on arbitrary objects
        if not isinstance(url, str):
            return False
        parsed = urlparse(url)
        return bool(parsed.scheme and parsed.netloc)

    def __init__(self, obj):
        if hasattr(obj, "BOX"):
            self.BOX = obj.BOX
        elif hasattr(obj, "WMO"):
            self.WMO = obj.WMO
            if hasattr(obj, "CYC"):
                self.CYC = obj.CYC
        elif self._is_url(obj) and "/tabledap/" in obj:
            # Recover the fetcher arguments from the uri constraints
            obj = erddapuri2fetchobj(obj)
            if "box" in obj:
                self.BOX = obj["box"]
            elif "wmo" in obj:
                self.WMO = obj["wmo"]
                if "cyc" in obj:
                    self.CYC = obj["cyc"]
        else:
            raise ValueError(
                "This class is only available with Erddap uri string requests or an ArgoDataFetcherProto instance"
            )

    def _format(self, x, typ: str) -> str:
        """string formatting helper"""
        if typ == "lon":
            if x < 0:
                # Negative longitudes are shifted by 360
                x = 360.0 + x
            return ("%05d") % (x * 100.0)
        if typ == "lat":
            return ("%05d") % (x * 100.0)
        if typ == "prs":
            return ("%05d") % (np.abs(x) * 10.0)
        if typ == "tim":
            return pd.to_datetime(x).strftime("%Y-%m-%d")
        return str(x)

    def __repr__(self):
        return self.cname

    def _wmo_cyc(self, wmo) -> str:
        """Label for one float followed by all cycle numbers, sorted"""
        return "WMO%i_%s" % (
            wmo,
            "_".join(["CYC%i" % (cyc) for cyc in sorted(self.CYC)]),
        )

    @property
    def cname(self) -> str:
        """Fetcher one line string definition helper"""
        cname = "?"

        if hasattr(self, "BOX"):
            BOX = self.BOX
            # Horizontal extent is always printed, then depth and time if available
            cname = ("[x=%0.2f/%0.2f; y=%0.2f/%0.2f]") % (
                BOX[0],
                BOX[1],
                BOX[2],
                BOX[3],
            )
            if len(BOX) == 6:
                cname = ("[x=%0.2f/%0.2f; y=%0.2f/%0.2f; z=%0.1f/%0.1f]") % (
                    BOX[0],
                    BOX[1],
                    BOX[2],
                    BOX[3],
                    BOX[4],
                    BOX[5],
                )
            if len(BOX) == 8:
                cname = ("[x=%0.2f/%0.2f; y=%0.2f/%0.2f; z=%0.1f/%0.1f; t=%s/%s]") % (
                    BOX[0],
                    BOX[1],
                    BOX[2],
                    BOX[3],
                    BOX[4],
                    BOX[5],
                    self._format(BOX[6], "tim"),
                    self._format(BOX[7], "tim"),
                )

        elif hasattr(self, "WMO"):
            with_cycles = hasattr(self, "CYC") and self.CYC is not None
            if len(self.WMO) == 1:
                if with_cycles:
                    cname = self._wmo_cyc(self.WMO[0])
                else:
                    cname = "WMO%i" % (self.WMO[0])
            else:
                cname = ";".join(["WMO%i" % wmo for wmo in sorted(self.WMO)])
                if with_cycles:
                    cname = ";".join([self._wmo_cyc(wmo) for wmo in self.WMO])

            if hasattr(self, "dataset_id"):
                cname = self.dataset_id + ";" + cname

        return cname
def cfgnameparser(name: str) -> dict[str, str]: """Configuration parameter name parser Get parameter name and unit as dictionary out of R18 prefLabel. Unit is always lower case. """ assert name.split("_")[0] == 'CONFIG', "This is not a valid configuration parameter (see R18 prefLabel)" unit = name.split("_")[-1] label = "".join(name.split("_")[1:-1]) return {'label': label, 'unit': unit.lower()} def group_cycles_by_missions(cycles: dict[int, int], output: Literal['group', 'list'] = 'group') -> dict[int, str] | dict[int, list[int]]: """ Parameters ---------- cycles: dict[int, int] A dictionary mapping cycle (keys) on mission numbers (values). output: Literal['group', 'list'], default='group' Returns ------- dict[int, str] | dict[int, list[int]] A dictionary mapping mission numbers (keys) on group of cycle numbers (values). If output is set to 'group', values are a string (eg '1>3') and if output is set to 'list', values are the list of cycle numbers as integers. """ is_suite = lambda x: list(range(np.min(x), np.max(x) + 1)) == sorted(x) def group_consecutive(lst): if not lst: return [] # Sort the list to ensure consecutive values are adjacent lst = sorted(lst) groups = [[lst[0]]] for i in range(1, len(lst)): if lst[i] == groups[-1][-1] + 1: groups[-1].append(lst[i]) else: groups.append([lst[i]]) return groups missions = np.unique(list(cycles.values())) mission_cycles = {} for m in missions: mission_cycles.update({int(m): []}) for cyc, mis in cycles.items(): if mis == m: mission_cycles[int(m)].append(cyc) if output == 'list': return mission_cycles else: results = {} for mis, cycs in mission_cycles.items(): if len(cycs) == 1: txt = f"{cycs[0]}" elif is_suite(cycs): txt = f"{np.min(cycs)}>{np.max(cycs)}" else: grps = group_consecutive(cycs) summary = [] for grp in grps: summary.append(f"{np.min(grp)}>{np.max(grp)}") txt = ",".join(summary) results.update({int(mis): txt}) return results def mono2multi(flist : list[str], convention : str = 'core', sep :str = '/') -> 
list[str]: """Convert a list of mono-profile files to a list of multi-profile files The multi-profile file name is based on an :class:`ArgoIndex` convention. Parameters ---------- flist: list[str] A list of mono-profile files (relative GDAC paths), as output for :meth:`ArgoIndex.read_files`. convention: str, optional, default = 'ar_index_global_prof' The Argo index convention from which `flist` was extracted. Can be 'ar_index_global_prof' or 'argo_synthetic-profile_index'. sep: str, optional, default = '/' GDAC file system separator used in flist Returns ------- list(str) """ def _mono2multi(mono_path): meta = argo_split_path(mono_path) if convention == "ar_index_global_prof": return sep.join( [ meta["origin"], "dac", meta["dac"], meta["wmo"], "%s_prof.nc" % meta["wmo"], ] ) elif convention in ["argo_synthetic-profile_index"]: return sep.join( [ meta["origin"], "dac", meta["dac"], meta["wmo"], "%s_Sprof.nc" % meta["wmo"], ] ) else: raise ValueError("Method not available for this index (only 'ar_index_global_prof' and 'argo_synthetic-profile_index' allowed).") new_uri = [_mono2multi(uri)[2:] for uri in flist] new_uri = list(set(new_uri)) return new_uri def urnparser(urn: str) -> dict[str]: """Parsing RFC 8141 compliant uniform resource names (URN) from NVS SDN stands for SeaDataNet Parameters ---------- urn: str Uniform resource names of the 'SDN:{listid}:{version}:{termid}' or 'SDN:{listid}::{termid}' Returns ------- dict[str, str] Components of the URN: 'listid', 'version' and 'termid' """ pp = urn.split(":") if len(pp) == 4 and pp[0] == 'SDN': return {'listid': pp[1], 'version': pp[2], 'termid': pp[3]} else: raise ValueError(f"This NVS URN '{urn}' does not follow the pattern: 'SDN:listid:version:termid' or 'SDN:listid::termid' for NVS2.0") def ppliststr(l: list[str], last : str = 'and', n : int | None = None) -> str: """Pretty print a list of strings Examples -------- .. 
code-block:: python ppliststr(['a', 'b', 'c', 'd']) -> "'a', 'b', 'c' and 'd'" ppliststr(['a', 'b'], last='or') -> "'a' or 'b'" ppliststr(['a', 'b', 'c', 'd'], n=3) -> "'a', 'b', 'c' and more ..." """ n = n if n is not None else len(l) if n == 0: return "" s: str = "" ii: int = 0 m: int = len(l) while ii < m: item = l[ii] if ii == n: s += f" {last} more ..." break if ii == 0: s += f"'{item}'" elif ii == len(l) - 1: s += f" {last} '{item}'" else: s += f", '{item}'" ii += 1 return s