Source code for argopy.xarray

import os
import sys
import warnings

import numpy as np
import pandas as pd
import xarray as xr
from sklearn import preprocessing
import logging

try:
    import gsw

    with_gsw = True
except ModuleNotFoundError:
    with_gsw = False

from argopy.utilities import (
    linear_interpolation_remap,
    is_list_equal,
    is_list_of_strings,
    toYearFraction,
    groupby_remap
)
from argopy.errors import InvalidDatasetStructure, DataNotFound, OptionValueError


log = logging.getLogger("argopy.xarray")


@xr.register_dataset_accessor("argo")
class ArgoAccessor:
    """

        Class registered under scope ``argo`` to access a :class:`xarray.Dataset` object.

        - Ensure all variables are of the Argo required dtype with:
        >>> ds.argo.cast_types()

        - Convert a collection of points into a collection of profiles:
        >>> ds.argo.point2profile()

        - Convert a collection of profiles to a collection of points:
        >>> ds.argo.profile2point()

        - Filter measurements according to data mode:
        >>> ds.argo.filter_data_mode()

        - Filter measurements according to QC flag values:
        >>> ds.argo.filter_qc(QC_list=[1, 2], QC_fields='all')

        - Filter variables according OWC salinity calibration requirements:
        >>> ds.argo.filter_scalib_pres(force='default')

        - Interpolate measurements on pressure levels:
        >>> ds.argo.inter_std_levels(std_lev=[10., 500., 1000.])

        - Group and reduce measurements by pressure bins:
        >>> ds.argo.groupby_pressure_bins(bins=[0, 200., 500., 1000.])
`
        - Compute and add additional variables to the dataset:
        >>> ds.argo.teos10(vlist='PV')

        - Preprocess data for OWC salinity calibration:
        >>> ds.argo.create_float_source("output_folder")

     """

    def __init__(self, xarray_obj):
        """ Init """
        self._obj = xarray_obj
        self._added = list()  # Will record all new variables added by argo
        # self._register = collections.OrderedDict() # Will register mutable instances of sub-modules like 'plot'
        # Variables present in the initial dataset
        self._vars = list(xarray_obj.variables.keys())
        # Store the initial list of dimensions
        self._dims = list(xarray_obj.dims.keys())
        self.encoding = xarray_obj.encoding
        self.attrs = xarray_obj.attrs

        if "N_PROF" in self._dims:
            self._type = "profile"
        elif "N_POINTS" in self._dims:
            self._type = "point"
        else:
            raise InvalidDatasetStructure("Argo dataset structure not recognised (dimensions N_PROF or N_POINTS not found)")

        if "PRES_ADJUSTED" in self._vars:
            self._mode = "expert"
        elif "PRES" in self._vars:
            self._mode = "standard"
        else:
            raise InvalidDatasetStructure("Argo dataset structure not recognised")

    def __repr__(self):
        # import xarray.core.formatting as xrf
        # col_width = xrf._calculate_col_width(xrf._get_col_items(self._obj.variables))
        # max_rows = xr.core.options.OPTIONS["display_max_rows"]

        summary = ["<xarray.{}.argo>".format(type(self._obj).__name__)]
        if self._type == "profile":
            summary.append("This is a collection of Argo profiles")
            summary.append(
                "N_PROF(%i) x N_LEVELS(%i) ~ N_POINTS(%i)"
                % (self.N_PROF, self.N_LEVELS, self.N_POINTS)
            )

        elif self._type == "point":
            summary.append("This is a collection of Argo points")
            summary.append(
                "N_POINTS(%i) ~ N_PROF(%i) x N_LEVELS(%i)"
                % (self.N_POINTS, self.N_PROF, self.N_LEVELS)
            )

        # dims_start = xrf.pretty_print("Dimensions:", col_width)
        # summary.append("{}({})".format(dims_start, xrf.dim_summary(self._obj)))

        return "\n".join(summary)

    @property
    def N_PROF(self):
        """Number of profiles"""
        if self._type == "point":
            dummy_argo_uid = xr.DataArray(
                self.uid(
                    self._obj["PLATFORM_NUMBER"].values,
                    self._obj["CYCLE_NUMBER"].values,
                    self._obj["DIRECTION"].values,
                ),
                dims="N_POINTS",
                coords={"N_POINTS": self._obj["N_POINTS"]},
                name="dummy_argo_uid",
            )
            N_PROF = len(np.unique(dummy_argo_uid))
        else:
            N_PROF = len(np.unique(self._obj["N_PROF"]))
        return N_PROF

    @property
    def N_LEVELS(self):
        """Number of vertical levels"""
        if self._type == "point":
            dummy_argo_uid = xr.DataArray(
                self.uid(
                    self._obj["PLATFORM_NUMBER"].values,
                    self._obj["CYCLE_NUMBER"].values,
                    self._obj["DIRECTION"].values,
                ),
                dims="N_POINTS",
                coords={"N_POINTS": self._obj["N_POINTS"]},
                name="dummy_argo_uid",
            )
            N_LEVELS = int(
                xr.DataArray(
                    np.ones_like(self._obj["N_POINTS"].values),
                    dims="N_POINTS",
                    coords={"N_POINTS": self._obj["N_POINTS"]},
                )
                .groupby(dummy_argo_uid)
                .sum()
                .max()
                .values
            )
        else:
            N_LEVELS = len(np.unique(self._obj["N_LEVELS"]))
        return N_LEVELS

    @property
    def N_POINTS(self):
        """Number of measurement points"""
        if self._type == "profile":
            N_POINTS = self.N_PROF * self.N_LEVELS
        else:
            N_POINTS = len(np.unique(self._obj["N_POINTS"]))
        return N_POINTS

    def _add_history(self, txt):
        if "history" in self._obj.attrs:
            self._obj.attrs["history"] += "; %s" % txt
        else:
            self._obj.attrs["history"] = txt

    def _where(self, cond, other=xr.core.dtypes.NA, drop: bool = False):
        """ where that preserve dtypes of Argo fields

        Parameters
        ----------
        cond : DataArray, Dataset, or callable
            Locations at which to preserve this object's values. dtype must be `bool`.
            If a callable, it must expect this object as its only parameter.
        other : scalar, DataArray or Dataset, optional
            Value to use for locations in this object where ``cond`` is False.
            By default, these locations filled with NA.
        drop : bool, optional
            If True, coordinate labels that only correspond to False values of
            the condition are dropped from the result. Mutually exclusive with
            ``other``.
        """
        this = self._obj.copy(deep=True)
        this = this.where(cond, other=other, drop=drop)
        this = this.argo.cast_types()
        # this.argo._add_history("Modified with 'where' statement")
        return this

[docs] def cast_types(self): # noqa: C901 """ Make sure variables are of the appropriate types according to Argo This is hard coded, but should be retrieved from an API somewhere. Should be able to handle all possible variables encountered in the Argo dataset. """ ds = self._obj list_str = [ "PLATFORM_NUMBER", "DATA_MODE", "DIRECTION", "DATA_CENTRE", "DATA_TYPE", "FORMAT_VERSION", "HANDBOOK_VERSION", "PROJECT_NAME", "PI_NAME", "STATION_PARAMETERS", "DATA_CENTER", "DC_REFERENCE", "DATA_STATE_INDICATOR", "PLATFORM_TYPE", "FIRMWARE_VERSION", "POSITIONING_SYSTEM", "PROFILE_PRES_QC", "PROFILE_PSAL_QC", "PROFILE_TEMP_QC", "PARAMETER", "SCIENTIFIC_CALIB_EQUATION", "SCIENTIFIC_CALIB_COEFFICIENT", "SCIENTIFIC_CALIB_COMMENT", "HISTORY_INSTITUTION", "HISTORY_STEP", "HISTORY_SOFTWARE", "HISTORY_SOFTWARE_RELEASE", "HISTORY_REFERENCE", "HISTORY_QCTEST", "HISTORY_ACTION", "HISTORY_PARAMETER", "VERTICAL_SAMPLING_SCHEME", "FLOAT_SERIAL_NO", ] list_int = [ "PLATFORM_NUMBER", "WMO_INST_TYPE", "WMO_INST_TYPE", "CYCLE_NUMBER", "CONFIG_MISSION_NUMBER", ] list_datetime = [ "REFERENCE_DATE_TIME", "DATE_CREATION", "DATE_UPDATE", "JULD", "JULD_LOCATION", "SCIENTIFIC_CALIB_DATE", "HISTORY_DATE", "TIME" ] def cast_this(da, type): """ Low-level casting of DataArray values """ try: da.values = da.values.astype(type) da.attrs["casted"] = 1 except Exception: print("Oops!", sys.exc_info()[0], "occurred.") print("Fail to cast: ", da.dtype, "into:", type, "for: ", da.name) print("Encountered unique values:", np.unique(da)) return da def cast_this_da(da): """ Cast any DataArray """ da.attrs["casted"] = 0 if v in list_str and da.dtype == "O": # Object da = cast_this(da, str) if v in list_int: # and da.dtype == 'O': # Object da = cast_this(da, int) if v in list_datetime and da.dtype == "O": # Object if ( "conventions" in da.attrs and da.attrs["conventions"] == "YYYYMMDDHHMISS" ): if da.size != 0: if len(da.dims) <= 1: val = da.astype(str).values.astype("U14") # This should not happen, but still ! That's real world data val[val == " "] = "nan" da.values = pd.to_datetime(val, format="%Y%m%d%H%M%S") else: s = da.stack(dummy_index=da.dims) val = s.astype(str).values.astype("U14") # This should not happen, but still ! That's real world data val[val == " "] = "nan" s.values = pd.to_datetime(val, format="%Y%m%d%H%M%S") da.values = s.unstack("dummy_index") da = cast_this(da, np.datetime64) else: da = cast_this(da, np.datetime64) elif v == "SCIENTIFIC_CALIB_DATE": da = cast_this(da, str) s = da.stack(dummy_index=da.dims) s.values = pd.to_datetime(s.values, format="%Y%m%d%H%M%S") da.values = (s.unstack("dummy_index")).values da = cast_this(da, np.datetime64) if "QC" in v and "PROFILE" not in v and "QCTEST" not in v: if da.dtype == "O": # convert object to string da = cast_this(da, str) # Address weird string values: # (replace missing or nan values by a '0' that will be cast as an integer later if da.dtype == "<U3": # string, len 3 because of a 'nan' somewhere ii = ( da == " " ) # This should not happen, but still ! That's real world data da = xr.where(ii, "0", da) ii = ( da == "nan" ) # This should not happen, but still ! That's real world data da = xr.where(ii, "0", da) # Get back to regular U1 string da = cast_this(da, np.dtype("U1")) if da.dtype == "<U1": # string ii = ( da == "" ) # This should not happen, but still ! That's real world data da = xr.where(ii, "0", da) ii = ( da == " " ) # This should not happen, but still ! That's real world data da = xr.where(ii, "0", da) ii = ( da == "n" ) # This should not happen, but still ! That's real world data da = xr.where(ii, "0", da) # finally convert QC strings to integers: da = cast_this(da, int) if da.dtype != "O": da.attrs["casted"] = 1 return da for v in ds.variables: try: ds[v] = cast_this_da(ds[v]) except Exception: print("Oops!", sys.exc_info()[0], "occurred.") print("Fail to cast: %s " % v) print("Encountered unique values:", np.unique(ds[v])) raise return ds
def uid(self, wmo_or_uid, cyc=None, direction=None): """ UID encoder/decoder Parameters ---------- int WMO number (to encode) or UID (to decode) cyc: int, optional Cycle number (to encode), not used to decode direction: str, optional Direction of the profile, must be 'A' (Ascending) or 'D' (Descending) Returns ------- int or tuple of int Examples -------- >>> unique_float_profile_id = uid(690024,13,'A') # Encode >>> wmo, cyc, drc = uid(unique_float_profile_id) # Decode """ le = preprocessing.LabelEncoder() le.fit(["A", "D"]) def encode_direction(x): y = 1 - le.transform(x) return np.where(y == 0, -1, y) def decode_direction(x): y = 1 - np.where(x == -1, 0, x) return le.inverse_transform(y) offset = 1e5 if cyc is not None: # ENCODER if direction is not None: return ( encode_direction(direction) * np.vectorize(int)(offset * wmo_or_uid + cyc).ravel() ) else: return np.vectorize(int)(offset * wmo_or_uid + cyc).ravel() else: # DECODER drc = decode_direction(np.sign(wmo_or_uid)) wmo = np.vectorize(int)(np.abs(wmo_or_uid) / offset) cyc = -np.vectorize(int)(offset * wmo - np.abs(wmo_or_uid)) return wmo, cyc, drc
[docs] def point2profile(self): # noqa: C901 """ Transform a collection of points into a collection of profiles """ if self._type != "point": raise InvalidDatasetStructure( "Method only available to a collection of points" ) this = self._obj # Should not be modified def fillvalue(da): """ Return fillvalue for a dataarray """ # https://docs.scipy.org/doc/numpy/reference/generated/numpy.dtype.kind.html#numpy.dtype.kind if da.dtype.kind in ["U"]: fillvalue = " " elif da.dtype.kind == "i": fillvalue = 99999 elif da.dtype.kind == "M": fillvalue = np.datetime64("NaT") else: fillvalue = np.nan return fillvalue # Find the number of profiles (N_PROF) and vertical levels (N_LEVELS): dummy_argo_uid = xr.DataArray( self.uid( this["PLATFORM_NUMBER"].values, this["CYCLE_NUMBER"].values, this["DIRECTION"].values, ), dims="N_POINTS", coords={"N_POINTS": this["N_POINTS"]}, name="dummy_argo_uid", ) N_PROF = len(np.unique(dummy_argo_uid)) N_LEVELS = int( xr.DataArray( np.ones_like(this["N_POINTS"].values), dims="N_POINTS", coords={"N_POINTS": this["N_POINTS"]}, ) .groupby(dummy_argo_uid) .sum() .max() .values ) assert N_PROF * N_LEVELS >= len(this["N_POINTS"]) # Store the initial set of coordinates: coords_list = list(this.coords) this = this.reset_coords() # For each variables, determine if it has unique value by profile, # if yes: the transformed variable should be [N_PROF] # if no: the transformed variable should be [N_PROF, N_LEVELS] count = np.zeros((N_PROF, len(this.data_vars)), "int") for i_prof, grp in enumerate(this.groupby(dummy_argo_uid)): i_uid, prof = grp for iv, vname in enumerate(this.data_vars): count[i_prof, iv] = len(np.unique(prof[vname])) # Variables with a unique value for each profiles: list_1d = list(np.array(this.data_vars)[count.sum(axis=0) == count.shape[0]]) # Variables with more than 1 value for each profiles: list_2d = list(np.array(this.data_vars)[count.sum(axis=0) != count.shape[0]]) # Create new empty dataset: new_ds = [] for vname in list_2d: new_ds.append( xr.DataArray( np.full( (N_PROF, N_LEVELS), fillvalue(this[vname]), dtype=this[vname].dtype, ), dims=["N_PROF", "N_LEVELS"], coords={ "N_PROF": np.arange(N_PROF), "N_LEVELS": np.arange(N_LEVELS), }, attrs=this[vname].attrs, name=vname, ) ) for vname in list_1d: new_ds.append( xr.DataArray( np.full((N_PROF,), fillvalue(this[vname]), dtype=this[vname].dtype), dims=["N_PROF"], coords={"N_PROF": np.arange(N_PROF)}, attrs=this[vname].attrs, name=vname, ) ) new_ds = xr.merge(new_ds) # Now fill in each profile values: for i_prof, grp in enumerate(this.groupby(dummy_argo_uid)): i_uid, prof = grp for iv, vname in enumerate(this.data_vars): # ['N_PROF', 'N_LEVELS'] array: if len(new_ds[vname].dims) == 2: y = new_ds[vname].values x = prof[vname].values try: y[i_prof, 0: len(x)] = x except Exception: print(vname, "input", x.shape, "output", y[i_prof, :].shape) raise new_ds[vname].values = y else: # ['N_PROF', ] array: y = new_ds[vname].values x = prof[vname].values y[i_prof] = np.unique(x)[0] # Restore coordinate variables: new_ds = new_ds.set_coords([c for c in coords_list if c in new_ds]) # Misc formatting new_ds = new_ds.sortby("TIME") new_ds = new_ds.argo.cast_types() new_ds = new_ds[np.sort(new_ds.data_vars)] new_ds.encoding = self.encoding # Preserve low-level encoding information new_ds.attrs = self.attrs # Preserve original attributes new_ds.argo._add_history("Transformed with point2profile") new_ds.argo._type = "profile" return new_ds
[docs] def profile2point(self): """ Convert a collection of profiles to a collection of points """ if self._type != "profile": raise InvalidDatasetStructure( "Method only available for a collection of profiles (N_PROF dimemsion)" ) ds = self._obj # Remove all variables for which a dimension is length=0 (eg: N_HISTORY) dim_list = [] for v in ds.data_vars: dims = ds[v].dims for d in dims: if len(ds[d]) == 0: dim_list.append(d) break # Drop dimensions and associated variables from this dataset ds = ds.drop_dims(np.unique(dim_list)) # Remove any variable that is not with dimensions (N_PROF,) or (N_PROF, N_LEVELS) for v in ds: dims = list(ds[v].dims) dims = ".".join(dims) if dims not in ["N_PROF", "N_PROF.N_LEVELS"]: ds = ds.drop_vars(v) (ds,) = xr.broadcast(ds) ds = ds.stack({"N_POINTS": list(ds.dims)}) ds = ds.reset_index("N_POINTS").drop_vars(["N_PROF", "N_LEVELS"]) possible_coords = ["LATITUDE", "LONGITUDE", "TIME", "JULD", "N_POINTS"] for c in [c for c in possible_coords if c in ds.data_vars]: ds = ds.set_coords(c) # Remove index without data (useless points) ds = ds.where(~np.isnan(ds["PRES"]), drop=1) ds = ds.sortby("TIME") ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) ds = ds.argo.cast_types() ds = ds[np.sort(ds.data_vars)] ds.encoding = self.encoding # Preserve low-level encoding information ds.attrs = self.attrs # Preserve original attributes ds.argo._add_history("Transformed with profile2point") ds.argo._type = "point" return ds
[docs] def filter_data_mode( # noqa: C901 self, keep_error: bool = True, errors: str = "raise" ): """ Filter variables according to their data mode This filter applies to <PARAM> and <PARAM_QC> For data mode 'R' and 'A': keep <PARAM> (eg: 'PRES', 'TEMP' and 'PSAL') For data mode 'D': keep <PARAM_ADJUSTED> (eg: 'PRES_ADJUSTED', 'TEMP_ADJUSTED' and 'PSAL_ADJUSTED') Since ADJUSTED variables are not required anymore after the filter, all *ADJUSTED* variables are dropped in order to avoid confusion wrt variable content. DATA_MODE is preserved for the record. Parameters ---------- keep_error: bool, optional If true (default) keep the measurements error fields or not. errors: {'raise','ignore'}, optional If 'raise' (default), raises a InvalidDatasetStructure error if any of the expected dataset variables is not found. If 'ignore', fails silently and return unmodified dataset. Returns ------- :class:`xarray.Dataset` """ if self._type != "point": raise InvalidDatasetStructure( "Method only available to a collection of points" ) ######### # Sub-functions ######### def safe_where_eq(xds, key, value): # xds.where(xds[key] == value, drop=True) is not safe to empty time variables, cf issue #64 try: return xds.where(xds[key] == value, drop=True) except ValueError as v: if v.args[0] == ( "zero-size array to reduction operation " "minimum which has no identity" ): # A bug in xarray will cause a ValueError if trying to # decode the times in a NetCDF file with length 0. # See: # https://github.com/pydata/xarray/issues/1329 # https://github.com/euroargodev/argopy/issues/64 # Here, we just need to return an empty array TIME = xds["TIME"] xds = xds.drop_vars("TIME") xds = xds.where(xds[key] == value, drop=True) xds["TIME"] = xr.DataArray( np.arange(len(xds["N_POINTS"])), dims="N_POINTS", attrs=TIME.attrs, ).astype(np.datetime64) xds = xds.set_coords("TIME") return xds def ds_split_datamode(xds): """ Create one dataset for each of the data_mode Split full dataset into 3 datasets """ # Real-time: argo_r = safe_where_eq(xds, "DATA_MODE", "R") for v in plist: vname = v.upper() + "_ADJUSTED" if vname in argo_r: argo_r = argo_r.drop_vars(vname) vname = v.upper() + "_ADJUSTED_QC" if vname in argo_r: argo_r = argo_r.drop_vars(vname) vname = v.upper() + "_ADJUSTED_ERROR" if vname in argo_r: argo_r = argo_r.drop_vars(vname) # Real-time adjusted: argo_a = safe_where_eq(xds, "DATA_MODE", "A") for v in plist: vname = v.upper() if vname in argo_a: argo_a = argo_a.drop_vars(vname) vname = v.upper() + "_QC" if vname in argo_a: argo_a = argo_a.drop_vars(vname) # Delayed mode: argo_d = safe_where_eq(xds, "DATA_MODE", "D") return argo_r, argo_a, argo_d def fill_adjusted_nan(this_ds, vname): """Fill in the adjusted field with the non-adjusted wherever it is NaN Ensure to have values even for bad QC data in delayed mode """ ii = this_ds.where(np.isnan(this_ds[vname + "_ADJUSTED"]), drop=1)[ "N_POINTS" ] this_ds[vname + "_ADJUSTED"].loc[dict(N_POINTS=ii)] = this_ds[vname].loc[ dict(N_POINTS=ii) ] return this_ds def merge_arrays(this_argo_r, this_argo_a, this_argo_d, this_vname): """ Merge one variable from 3 DataArrays Based on xarray merge function with ’no_conflicts’: only values which are not null in all datasets must be equal. The returned dataset then contains the combination of all non-null values. Return a xarray.DataArray """ def merge_this(a1, a2, a3): return xr.merge((xr.merge((a1, a2)), a3)) DA = merge_this( this_argo_r[this_vname], this_argo_a[this_vname + "_ADJUSTED"].rename(this_vname), this_argo_d[this_vname + "_ADJUSTED"].rename(this_vname), ) DA_QC = merge_this( this_argo_r[this_vname + "_QC"], this_argo_a[this_vname + "_ADJUSTED_QC"].rename(this_vname + "_QC"), this_argo_d[this_vname + "_ADJUSTED_QC"].rename(this_vname + "_QC"), ) if keep_error: DA_ERROR = xr.merge( ( this_argo_a[this_vname + "_ADJUSTED_ERROR"].rename( this_vname + "_ERROR" ), this_argo_d[this_vname + "_ADJUSTED_ERROR"].rename( this_vname + "_ERROR" ), ) ) DA = merge_this(DA, DA_QC, DA_ERROR) else: DA = xr.merge((DA, DA_QC)) return DA ######### # filter ######### ds = self._obj if "DATA_MODE" not in ds: if errors: raise InvalidDatasetStructure( "Method only available for dataset with a 'DATA_MODE' variable " ) else: # todo should raise a warning instead ? return ds # Define variables to filter: possible_list = [ "PRES", "TEMP", "PSAL", "DOXY", "CHLA", "BBP532", "BBP700", "DOWNWELLING_PAR", "DOWN_IRRADIANCE380", "DOWN_IRRADIANCE412", "DOWN_IRRADIANCE490", ] plist = [p for p in possible_list if p in ds.data_vars] # Create one dataset for each of the data_mode: argo_r, argo_a, argo_d = ds_split_datamode(ds) # Fill in the adjusted field with the non-adjusted wherever it is NaN for v in plist: argo_d = fill_adjusted_nan(argo_d, v.upper()) # Drop QC fields in delayed mode dataset: for v in plist: vname = v.upper() if vname in argo_d: argo_d = argo_d.drop_vars(vname) vname = v.upper() + "_QC" if vname in argo_d: argo_d = argo_d.drop_vars(vname) # Create new arrays with the appropriate variables: vlist = [merge_arrays(argo_r, argo_a, argo_d, v) for v in plist] # Create final dataset by merging all available variables final = xr.merge(vlist) # Merge with all other variables: other_variables = list( set([v for v in list(ds.data_vars) if "ADJUSTED" not in v]) - set(list(final.data_vars)) ) # other_variables.remove('DATA_MODE') # Not necessary anymore for p in other_variables: final = xr.merge((final, ds[p])) final.attrs = ds.attrs final.argo._add_history("Variables filtered according to DATA_MODE") final = final[np.sort(final.data_vars)] # Cast data types and add attributes: final = final.argo.cast_types() return final
[docs] def filter_qc( # noqa: C901 self, QC_list=[1, 2], QC_fields="all", drop=True, mode="all", mask=False ): """ Filter data set according to QC values Filter the dataset to keep points where ``all`` or ``any`` of the QC fields has a value in the list of integer QC flags. This method can return the filtered dataset or the filter mask. Parameters ---------- QC_list: list(int) List of QC flag values (integers) to keep QC_fields: 'all' or list(str) List of QC fields to consider to apply the filter. By default we use all available QC fields drop: bool Drop values not matching the QC filter, default is True mode: str Must be ``all`` (default) or ``any``. Boolean operator on QC values: should we keep points matching ``all`` QC fields or 'any' one of them. mask: bool ``False`` by default. Determine if we should return the QC mask or the filtered dataset. Returns ------- :class:`xarray.Dataset` """ if self._type != "point": raise InvalidDatasetStructure( "Method only available to a collection of points" ) if mode not in ["all", "any"]: raise ValueError("Mode must be 'all' or 'any'") # Make sure we deal with a list of integers: if not isinstance(QC_list, list): if isinstance(QC_list, np.ndarray): QC_list = list(QC_list) else: QC_list = [QC_list] QC_list = [abs(int(qc)) for qc in QC_list] this = self._obj # Extract QC fields: if isinstance(QC_fields, str) and QC_fields == "all": QC_fields = [] for v in this.data_vars: if "QC" in v and "PROFILE" not in v: QC_fields.append(v) elif is_list_of_strings(QC_fields): for v in QC_fields: if v not in this.data_vars: raise ValueError( "%s not found in this dataset while trying to apply QC filter" % v ) else: raise ValueError( "Invalid content for parameter 'QC_fields'. Use 'all' or a list of strings" ) log.debug( "filter_qc: Filtering dataset to keep points with QC in %s for '%s' fields in %s" % (QC_list, mode, ",".join(QC_fields)) ) # log.debug("filter_qc: Filter applied to '%s' of the fields: %s" % (mode, ",".join(QC_fields))) QC_fields = this[QC_fields] for v in QC_fields.data_vars: QC_fields[v] = QC_fields[v].astype(int) # Now apply filter this_mask = xr.DataArray( np.zeros_like(QC_fields["N_POINTS"]), dims=["N_POINTS"], coords={"N_POINTS": QC_fields["N_POINTS"]}, ) for v in QC_fields.data_vars: for qc_value in QC_list: this_mask += QC_fields[v] == qc_value if mode == "all": this_mask = this_mask == len(QC_fields) # all else: this_mask = this_mask >= 1 # any if not mask: this = this.argo._where(this_mask, drop=drop) this.argo._add_history("Variables selected according to QC") # this = this.argo.cast_types() return this else: return this_mask
[docs] def filter_scalib_pres(self, force: str = "default", inplace: bool = True): """ Filter variables according to OWC salinity calibration software requirements By default: this filter will return a dataset with raw PRES, PSAL and TEMP; and if PRES is adjusted, PRES variable will be replaced by PRES_ADJUSTED. With option force='raw', you can force the filter to return a dataset with raw PRES, PSAL and TEMP whether PRES is adjusted or not. With option force='adjusted', you can force the filter to return a dataset where PRES/PSAL and TEMP replaced with adjusted variables: PRES_ADJUSTED, PSAL_ADJUSTED, TEMP_ADJUSTED. Since ADJUSTED variables are not required anymore after the filter, all *ADJUSTED* variables are dropped in order to avoid confusion wrt variable content. Parameters ---------- force: str Use force='default' to load PRES/PSAL/TEMP or PRES_ADJUSTED/PSAL/TEMP according to PRES_ADJUSTED filled or not. Use force='raw' to force load of PRES/PSAL/TEMP Use force='adjusted' to force load of PRES_ADJUSTED/PSAL_ADJUSTED/TEMP_ADJUSTED inplace: boolean, True by default If True, return the filtered input :class:`xarray.Dataset` If False, return a new :class:`xarray.Dataset` Returns ------- :class:`xarray.Dataset` """ if not with_gsw: raise ModuleNotFoundError("This functionality requires the gsw library") this = self._obj # Will work with a collection of points to_profile = False if this.argo._type == "profile": to_profile = True this = this.argo.profile2point() if force == "raw": # PRES/PSAL/TEMP are not changed # All ADJUSTED variables are removed (not required anymore, avoid confusion with variable content): this = this.drop_vars([v for v in this.data_vars if "ADJUSTED" in v]) elif force == "adjusted": # PRES/PSAL/TEMP are replaced by PRES_ADJUSTED/PSAL_ADJUSTED/TEMP_ADJUSTED for v in ["PRES", "PSAL", "TEMP"]: if "%s_ADJUSTED" % v in this.data_vars: this[v] = this["%s_ADJUSTED" % v] this["%s_ERROR" % v] = this["%s_ADJUSTED_ERROR" % v] this["%s_QC" % v] = this["%s_ADJUSTED_QC" % v] else: raise InvalidDatasetStructure( "%s_ADJUSTED not in this dataset. Tip: fetch data in 'expert' mode" % v ) # All ADJUSTED variables are removed (not required anymore, avoid confusion with variable content): this = this.drop_vars([v for v in this.data_vars if "ADJUSTED" in v]) else: # In default mode, we just need to do something if PRES_ADJUSTED is different from PRES, meaning # pressure was adjusted: if np.any(this["PRES_ADJUSTED"] == this["PRES"]): # Yes # We need to recompute salinity with adjusted pressur, so # Compute raw conductivity from raw salinity and raw pressure: cndc = gsw.C_from_SP( this["PSAL"].values, this["TEMP"].values, this["PRES"].values ) # Then recompute salinity with adjusted pressure: sp = gsw.SP_from_C( cndc, this["TEMP"].values, this["PRES_ADJUSTED"].values ) # Now fill in filtered variables (no need to change TEMP): this["PRES"] = this["PRES_ADJUSTED"] this["PRES_QC"] = this["PRES_ADJUSTED_QC"] this["PSAL"].values = sp # Finally drop everything not required anymore: this = this.drop_vars([v for v in this.data_vars if "ADJUSTED" in v]) # Manage output: this.argo._add_history("Variables filtered according to OWC methodology") this = this[np.sort(this.data_vars)] if to_profile: this = this.argo.point2profile() # Manage output: if inplace: self._obj = this return self._obj else: return this
[docs] def interp_std_levels(self, std_lev: list or np.array, axis: str = 'PRES'): """ Interpolate measurements to standard pressure levels Parameters ---------- std_lev: list or np.array Standard pressure levels used for interpolation. It has to be 1-dimensional and monotonic. axis: str, default: ``PRES`` The dataset variable to use as pressure axis. This could be ``PRES`` or ``PRES_ADJUSTED``. Returns ------- :class:`xarray.Dataset` """ this_dsp = self._obj if (type(std_lev) is np.ndarray) | (type(std_lev) is list): std_lev = np.array(std_lev) if (np.any(sorted(std_lev) != std_lev)) | (np.any(std_lev < 0)): raise ValueError( "Standard levels must be a list or a numpy array of positive and sorted values" ) else: raise ValueError( "Standard levels must be a list or a numpy array of positive and sorted values" ) if axis not in ['PRES', 'PRES_ADJUSTED']: raise ValueError("'axis' option must be 'PRES' or 'PRES_ADJUSTED'") if self._type != "profile": raise InvalidDatasetStructure( "Method only available for a collection of profiles" ) # Will work with a collection of profiles: # to_point = False # if this_ds.argo._type == "point": # to_point = True # this_dsp = this_ds.argo.point2profile() # else: # this_dsp = this_ds.copy(deep=True) # Selecting profiles that have a max(pressure) > max(std_lev) to avoid extrapolation in that direction # For levels < min(pressure), first level values of the profile are extended to surface. i1 = this_dsp[axis].max("N_LEVELS") >= std_lev[-1] this_dsp = this_dsp.where(i1, drop=True) # check if any profile is left, ie if any profile match the requested depth if len(this_dsp["N_PROF"]) == 0: warnings.warn( "None of the profiles can be interpolated (not reaching the requested depth range)." ) return None # add new vertical dimensions, this has to be in the datasets to apply ufunc later this_dsp["Z_LEVELS"] = xr.DataArray(std_lev, dims={"Z_LEVELS": std_lev}) # init ds_out = xr.Dataset() # vars to interpolate datavars = [ dv for dv in list(this_dsp.variables) if set(["N_LEVELS", "N_PROF"]) == set(this_dsp[dv].dims) and "QC" not in dv and "ERROR" not in dv ] # coords coords = [dv for dv in list(this_dsp.coords)] # vars depending on N_PROF only solovars = [ dv for dv in list(this_dsp.variables) if dv not in datavars and dv not in coords and "QC" not in dv and "ERROR" not in dv ] for dv in datavars: ds_out[dv] = linear_interpolation_remap( this_dsp[axis], this_dsp[dv], this_dsp["Z_LEVELS"], z_dim="N_LEVELS", z_regridded_dim="Z_LEVELS", ) ds_out = ds_out.rename({"remapped": "%s_INTERPOLATED" % axis}) for sv in solovars: ds_out[sv] = this_dsp[sv] for co in coords: ds_out.coords[co] = this_dsp[co] ds_out = ds_out.drop_vars(["N_LEVELS", "Z_LEVELS"]) ds_out = ds_out[np.sort(ds_out.data_vars)] ds_out = ds_out.argo.cast_types() ds_out.attrs = self.attrs # Preserve original attributes ds_out.argo._add_history("Interpolated on standard %s levels" % axis) # if to_point: # ds_out = ds_out.argo.profile2point() return ds_out
[docs] def groupby_pressure_bins(self, # noqa: C901 bins: list or np.array, axis: str = 'PRES', right: bool = False, select: str = 'deep', squeeze: bool = True, merge: bool = True): """ Group measurements by pressure bins This method can be used to subsample and align an irregular dataset (pressure not being similar in all profiles) on a set of pressure bins. The output dataset could then be used to perform statistics along the ``N_PROF`` dimension because ``N_LEVELS`` will corresponds to similar pressure bins, while avoiding to interpolate data. Parameters ---------- bins: list or np.array, Array of bins. It has to be 1-dimensional and monotonic. Bins of data are localised using values from options `axis` (default: ``PRES``) and `right` (default: ``False``), see below. axis: str, default: ``PRES`` The dataset variable to use as pressure axis. This could be ``PRES`` or ``PRES_ADJUSTED`` right: bool, default: False Indicating whether the bin intervals include the right or the left bin edge. Default behavior is (right==False) indicating that the interval does not include the right edge. The left bin end is open in this case, i.e., bins[i-1] <= x < bins[i] is the default behavior for monotonically increasing bins. Note the ``merge`` option is intended to work only for the default ``right=False``. select: {'deep','shallow','middle','random','min','max','mean','median'}, default: 'deep' The value selection method for bins. This selection can be based on values at the pressure axis level with: ``deep`` (default), ``shallow``, ``middle``, ``random``. For instance, ``select='deep'`` will lead to the value returned for a bin to be taken at the deepest pressure level in the bin. Or this selection can be based on statistics of measurements in a bin. Stats available are: ``min``, ``max``, ``mean``, ``median``. For instance ``select='mean'`` will lead to the value returned for a bin to be the mean of all measurements in the bin. squeeze: bool, default: True Squeeze from the output bin levels without measurements. merge: bool, default: True Optimize the output bins axis size by merging levels with/without data. The pressure bins axis is modified accordingly. This means that the return ``STD_PRES_BINS`` axis has not necessarily the same size as the input ``bins``. Returns ------- :class:`xarray.Dataset` See Also -------- :class:`numpy.digitize`, :class:`argopy.utilities.groupby_remap` """ this_ds = self._obj if (type(bins) is np.ndarray) | (type(bins) is list): bins = np.array(bins) if (np.any(sorted(bins) != bins)) | (np.any(bins < 0)): raise ValueError( "Standard bins must be a list or a numpy array of positive and sorted values" ) else: raise ValueError( "Standard bins must be a list or a numpy array of positive and sorted values" ) if axis not in ['PRES', 'PRES_ADJUSTED']: raise ValueError("'axis' option must be 'PRES' or 'PRES_ADJUSTED'") # Will work with a collection of profiles: to_point = False if this_ds.argo._type == "point": to_point = True this_dsp = this_ds.argo.point2profile() else: this_dsp = this_ds.copy(deep=True) # Adjust bins axis if we possibly have to squeeze empty bins: h, bin_edges = np.histogram(np.unique(np.round(this_dsp[axis], 1)), bins) N_bins_empty = len(np.where(h == 0)[0]) # check if any profile is left, ie if any profile match the requested bins if N_bins_empty == len(h): warnings.warn( "None of the profiles can be aligned (pressure values out of bins range)." ) return None if N_bins_empty > 0 and squeeze: log.debug( "bins axis was squeezed to full bins only (%i bins found empty out of %i)" % (N_bins_empty, len(bins))) bins = bins[np.where(h > 0)] def replace_i_level_values(this_da, this_i_level, new_values_along_profiles): """ Convenience fct to update only one level of a ["N_PROF", "N_LEVELS"] xr.DataArray""" if this_da.dims == ("N_PROF", "N_LEVELS"): values = this_da.values values[:, this_i_level] = new_values_along_profiles this_da.values = values # else: # raise ValueError("Array not with expected ['N_PROF', 'N_LEVELS'] shape") return this_da def nanmerge(x, y): """ Merge two 1D array Given 2 arrays x, y of 1 dimension, return a new array with: - x values where x is not NaN - y values where x is NaN """ z = x.copy() for i, v in enumerate(x): if np.isnan(v): z[i] = y[i] return z merged_is_nan = lambda l1, l2: len(np.unique(np.where(np.isnan(l1.values + l2.values)))) == len(l1) # noqa: E731 def merge_bin_matching_levels(this_ds: xr.Dataset) -> xr.Dataset: """ Levels merger of type 'bins' value Merge pair of lines with the following pattern: nan, VAL, VAL, nan, VAL, VAL BINVAL, nan, nan, BINVAL, nan, nan This pattern is due to the bins definition: bins[i] <= x < bins[i+1] Parameters ---------- :class:`xarray.Dataset` Returns ------- :class:`xarray.Dataset` """ new_ds = this_ds.copy(deep=True) N_LEVELS = new_ds.argo.N_LEVELS idel = [] for i_level in range(0, N_LEVELS - 1 - 1): this_ds_level = this_ds[axis].isel(N_LEVELS=i_level) this_ds_dw = this_ds[axis].isel(N_LEVELS=i_level + 1) pres_dw = np.unique(this_ds_dw[~np.isnan(this_ds_dw)]) if len(pres_dw) == 1 \ and pres_dw[0] in this_ds["STD_%s_BINS" % axis] \ and merged_is_nan(this_ds_level, this_ds_dw): new_values = nanmerge(this_ds_dw.values, this_ds_level.values) replace_i_level_values(new_ds[axis], i_level, new_values) idel.append(i_level + 1) ikeep = [i for i in np.arange(0, new_ds.argo.N_LEVELS - 1) if i not in idel] new_ds = new_ds.isel(N_LEVELS=ikeep) new_ds = new_ds.assign_coords({'N_LEVELS': np.arange(0, len(new_ds['N_LEVELS']))}) val = new_ds[axis].values new_ds[axis].values = np.where(val == 0, np.nan, val) return new_ds def merge_all_matching_levels(this_ds: xr.Dataset) -> xr.Dataset: """ Levels merger Merge any pair of levels with a "matching" pattern like this: VAL, VAL, VAL, nan, nan, VAL, nan, nan, nan, nan, nan, VAL, VAL, nan, VAL, nan This pattern is due to a strict application of the bins definition. But when bins are small (eg: 10db), many bins can have no data. This has the consequence to change the size and number of the bins. Parameters ---------- :class:`xarray.Dataset` Returns ------- :class:`xarray.Dataset` """ new_ds = this_ds.copy(deep=True) N_LEVELS = new_ds.argo.N_LEVELS idel = [] for i_level in range(0, N_LEVELS): if i_level + 1 < N_LEVELS: this_ds_level = this_ds[axis].isel(N_LEVELS=i_level) this_ds_dw = this_ds[axis].isel(N_LEVELS=i_level + 1) if merged_is_nan(this_ds_level, this_ds_dw): new_values = nanmerge(this_ds_level.values, this_ds_dw.values) replace_i_level_values(new_ds[axis], i_level, new_values) idel.append(i_level + 1) ikeep = [i for i in np.arange(0, new_ds.argo.N_LEVELS - 1) if i not in idel] new_ds = new_ds.isel(N_LEVELS=ikeep) new_ds = new_ds.assign_coords({'N_LEVELS': np.arange(0, len(new_ds['N_LEVELS']))}) val = new_ds[axis].values new_ds[axis].values = np.where(val == 0, np.nan, val) return new_ds # init new_ds = [] # add new vertical dimensions, this has to be in the datasets to apply ufunc later this_dsp["Z_LEVELS"] = xr.DataArray(bins, dims={"Z_LEVELS": bins}) # vars to align if select in ["shallow", "deep", "middle", "random"]: datavars = [ dv for dv in list(this_dsp.data_vars) if set(["N_LEVELS", "N_PROF"]) == set(this_dsp[dv].dims) ] else: datavars = [ dv for dv in list(this_dsp.data_vars) if set(["N_LEVELS", "N_PROF"]) == set(this_dsp[dv].dims) and "QC" not in dv and "ERROR" not in dv ] # All other variables: othervars = [ dv for dv in list(this_dsp.variables) if dv not in datavars and dv not in this_dsp.coords ] # Sub-sample and align: for dv in datavars: v = groupby_remap( this_dsp[axis], this_dsp[dv], this_dsp["Z_LEVELS"], z_dim="N_LEVELS", z_regridded_dim="Z_LEVELS", select=select, right=right ) v.name = this_dsp[dv].name v.attrs = this_dsp[dv].attrs new_ds.append(v) # Finish new_ds = xr.merge(new_ds) new_ds = new_ds.rename({"remapped": "N_LEVELS"}) new_ds = new_ds.assign_coords({'N_LEVELS': range(0, len(new_ds['N_LEVELS']))}) # new_ds["STD_%s_BINS" % axis] = new_ds['N_LEVELS'] new_ds["STD_%s_BINS" % axis] = xr.DataArray(bins, dims=['N_LEVELS'], attrs={'Comment': "Range of bins is: bins[i] <= x < bins[i+1] for i=[0,N_LEVELS-2]\n" "Last bins is bins[N_LEVELS-1] <= x"} ) new_ds = new_ds.set_coords("STD_%s_BINS" % axis) new_ds.attrs = this_ds.attrs for dv in othervars: new_ds[dv] = this_dsp[dv] new_ds = new_ds.argo.cast_types() new_ds = new_ds[np.sort(new_ds.data_vars)] new_ds.attrs = this_dsp.attrs # Preserve original attributes new_ds.argo._add_history("Sub-sampled and re-aligned on standard bins") if merge: new_ds = merge_bin_matching_levels(new_ds) new_ds = merge_all_matching_levels(new_ds) if to_point: new_ds = new_ds.argo.profile2point() return new_ds
[docs] def teos10( # noqa: C901 self, vlist: list = ["SA", "CT", "SIG0", "N2", "PV", "PTEMP"], inplace: bool = True): """ Add TEOS10 variables to the dataset By default, adds: 'SA', 'CT' Other possible variables: 'SIG0', 'N2', 'PV', 'PTEMP', 'SOUND_SPEED' Relies on the gsw library. If one exists, the correct CF standard name will be added to the attrs. Parameters ---------- vlist: list(str) List with the name of variables to add. Must be a list containing one or more of the following string values: * ``SA`` Adds an absolute salinity variable * ``CT`` Adds a conservative temperature variable * ``SIG0`` Adds a potential density anomaly variable referenced to 0 dbar * ``N2`` Adds a buoyancy (Brunt-Vaisala) frequency squared variable. This variable has been regridded to the original pressure levels in the Dataset using a linear interpolation. * ``PV`` Adds a planetary vorticity variable calculated from :math:`\\frac{f N^2}{\\text{gravity}}`. This is not a TEOS-10 variable from the gsw toolbox, but is provided for convenience. This variable has been regridded to the original pressure levels in the Dataset using a linear interpolation. * ``PTEMP`` Add potential temperature * ``SOUND_SPEED`` Add sound speed * ``CNDC`` Add Electrical Conductivity inplace: boolean, True by default * If True, return the input :class:`xarray.Dataset` with new TEOS10 variables added as a new :class:`xarray.DataArray`. * If False, return a :class:`xarray.Dataset` with new TEOS10 variables Returns ------- :class:`xarray.Dataset` """ if not with_gsw: raise ModuleNotFoundError("This functionality requires the gsw library") allowed = ["SA", "CT", "SIG0", "N2", "PV", "PTEMP", "SOUND_SPEED", "CNDC"] if any(var not in allowed for var in vlist): raise ValueError( f"vlist must be a subset of {allowed}, instead found {vlist}" ) if is_list_equal(vlist, ["SA", "CT", "SIG0", "N2", "PV", "PTEMP"]): warnings.warn( "Default variables will be reduced to 'SA' and 'CT' in 0.1.9", category=FutureWarning, ) this = self._obj to_profile = False if self._type == "profile": to_profile = True this = this.argo.profile2point() # Get base variables as numpy arrays: psal = this["PSAL"].values temp = this["TEMP"].values pres = this["PRES"].values lon = this["LONGITUDE"].values lat = this["LATITUDE"].values # Coriolis f = gsw.f(lat) # Absolute salinity sa = gsw.SA_from_SP(psal, pres, lon, lat) # Conservative temperature ct = gsw.CT_from_t(sa, temp, pres) # Potential Temperature if "PTEMP" in vlist: pt = gsw.pt_from_CT(sa, ct) # Potential density referenced to surface if "SIG0" in vlist: sig0 = gsw.sigma0(sa, ct) # Electrical conductivity if "CNDC" in vlist: cndc = gsw.C_from_SP(psal, temp, pres) # N2 if "N2" in vlist or "PV" in vlist: n2_mid, p_mid = gsw.Nsquared(sa, ct, pres, lat) # N2 on the CT grid: ishallow = (slice(0, -1), Ellipsis) ideep = (slice(1, None), Ellipsis) def mid(x): return 0.5 * (x[ideep] + x[ishallow]) n2 = np.zeros(ct.shape) * np.nan n2[1:-1] = mid(n2_mid) # PV: if "PV" in vlist: pv = f * n2 / gsw.grav(lat, pres) # Sound Speed: if "SOUND_SPEED" in vlist: cs = gsw.sound_speed(sa, ct, pres) # Back to the dataset: that = [] if "SA" in vlist: SA = xr.DataArray(sa, coords=this["PSAL"].coords, name="SA") SA.attrs["long_name"] = "Absolute Salinity" SA.attrs["standard_name"] = "sea_water_absolute_salinity" SA.attrs["unit"] = "g/kg" that.append(SA) if "CT" in vlist: CT = xr.DataArray(ct, coords=this["TEMP"].coords, name="CT") CT.attrs["long_name"] = "Conservative Temperature" CT.attrs["standard_name"] = "sea_water_conservative_temperature" CT.attrs["unit"] = "degC" that.append(CT) if "SIG0" in vlist: SIG0 = xr.DataArray(sig0, coords=this["TEMP"].coords, name="SIG0") SIG0.attrs[ "long_name" ] = "Potential density anomaly with reference pressure of 0 dbar" SIG0.attrs["standard_name"] = "sea_water_sigma_theta" SIG0.attrs["unit"] = "kg/m^3" that.append(SIG0) if "CNDC" in vlist: CNDC = xr.DataArray(cndc, coords=this["TEMP"].coords, name="CNDC") CNDC.attrs["long_name"] = "Electrical Conductivity" CNDC.attrs["standard_name"] = "sea_water_electrical_conductivity" CNDC.attrs["unit"] = "mS/cm" that.append(CNDC) if "N2" in vlist: N2 = xr.DataArray(n2, coords=this["TEMP"].coords, name="N2") N2.attrs["long_name"] = "Squared buoyancy frequency" N2.attrs["unit"] = "1/s^2" that.append(N2) if "PV" in vlist: PV = xr.DataArray(pv, coords=this["TEMP"].coords, name="PV") PV.attrs["long_name"] = "Planetary Potential Vorticity" PV.attrs["unit"] = "1/m/s" that.append(PV) if "PTEMP" in vlist: PTEMP = xr.DataArray(pt, coords=this["TEMP"].coords, name="PTEMP") PTEMP.attrs["long_name"] = "Potential Temperature" PTEMP.attrs["standard_name"] = "sea_water_potential_temperature" PTEMP.attrs["unit"] = "degC" that.append(PTEMP) if "SOUND_SPEED" in vlist: CS = xr.DataArray(cs, coords=this["TEMP"].coords, name="SOUND_SPEED") CS.attrs["long_name"] = "Speed of sound" CS.attrs["standard_name"] = "speed_of_sound_in_sea_water" CS.attrs["unit"] = "m/s" that.append(CS) # Create a dataset with all new variables: that = xr.merge(that) # Add to the dataset essential Argo variables (allows to keep using the argo accessor): that = that.assign( { k: this[k] for k in [ "TIME", " LATITUDE", "LONGITUDE", "PRES", "PRES_ADJUSTED", "PLATFORM_NUMBER", "CYCLE_NUMBER", "DIRECTION", ] if k in this } ) # Manage output: if inplace: # Merge previous with new variables for v in that.variables: this[v] = that[v] if to_profile: this = this.argo.point2profile() for k in this: if k not in self._obj: self._obj[k] = this[k] return self._obj else: if to_profile: return that.argo.point2profile() else: return that
[docs] def create_float_source(self, # noqa: C901 path: str or os.PathLike = None, force: str = "default", select: str = 'deep', file_pref: str = '', file_suff: str = '', format: str = '5', do_compression: bool = True, debug_output: bool = False): """ Preprocess data for OWC software calibration This method can create a FLOAT SOURCE file (i.e. the .mat file that usually goes into /float_source/) for OWC software. The FLOAT SOURCE file is saved as: ``<path>/<file_pref><float_WMO><file_suff>.mat`` where ``<float_WMO>`` is automatically extracted from the dataset variable PLATFORM_NUMBER (in order to avoid mismatch between user input and data content). So if this dataset has measurements from more than one float, more than one Matlab file will be created. By default, variables loaded are raw PRES, PSAL and TEMP. If PRES is adjusted, variables loaded are PRES_ADJUSTED, raw PSAL calibrated in pressure and raw TEMP. You can force the program to load raw PRES, PSAL and TEMP whatever PRES is adjusted or not: >>> ds.argo.create_float_source(force='raw') or you can force the program to load adjusted variables: PRES_ADJUSTED, PSAL_ADJUSTED, TEMP_ADJUSTED >>> ds.argo.create_float_source(force='adjusted') **Pre-processing details**: #. select only ascending profiles #. subsample vertical levels to keep the deepest pressure levels on each 10db bins from the surface down to the deepest level. #. align pressure values, i.e. make sure that a pressure index corresponds to measurements from the same binned pressure values. This can lead to modify the number of levels in the dataset. #. filter variables according to the ``force`` option (see below) #. filter variables according to QC flags: * Remove measurements where timestamp QC is >= 3 * Keep measurements where pressure QC is anything but 3 * Keep measurements where pressure, temperature or salinity QC are anything but 4 #. remove dummy values: salinity not in [0/50], potential temperature not in [-10/50] and pressure not in [0/60000]. Bounds inclusive. #. convert timestamp to fractional year #. convert longitudes to 0-360 Parameters ---------- path: str or path-like, optional Path or folder name to which to save this Matlab file. If no path is provided, this function returns the resulting Matlab file as :class:`xarray.Dataset`. force: {"default", "raw", "adjusted"}, default: "default" If force='default' will load PRES/PSAL/TEMP or PRES_ADJUSTED/PSAL/TEMP according to PRES_ADJUSTED filled or not. If force='raw' will load PRES/PSAL/TEMP If force='adjusted' will load PRES_ADJUSTED/PSAL_ADJUSTED/TEMP_ADJUSTED select: {'deep','shallow','middle','random','min','max','mean','median'}, default: 'deep' file_pref: str, optional Preffix to add at the beginning of output file(s). file_suff: str, optional Suffix to add at the end of output file(s). do_compression: bool, optional Whether or not to compress matrices on write. Default is True. format: {'5', '4'}, string, optional Matlab file format version. '5' (the default) for MATLAB 5 and up (to 7.2). Use '4' for MATLAB 4 .mat files. Returns ------- :class:`xarray.Dataset` The output dataset, or Matlab file, will have the following variables (``n`` is the number of profiles, ``m`` is the number of vertical levels): - ``DATES`` (1xn): decimal year, e.g. 10 Dec 2000 = 2000.939726 - ``LAT`` (1xn): decimal degrees, -ve means south of the equator, e.g. 20.5S = -20.5 - ``LONG`` (1xn): decimal degrees, from 0 to 360, e.g. 98.5W in the eastern Pacific = 261.5E - ``PROFILE_NO`` (1xn): this goes from 1 to n. PROFILE_NO is the same as CYCLE_NO in the Argo files - ``PRES`` (mxn): dbar, from shallow to deep, e.g. 10, 20, 30 ... These have to line up along a fixed nominal depth axis. - ``TEMP`` (mxn): in-situ IPTS-90 - ``SAL`` (mxn): PSS-78 - ``PTMP`` (mxn): potential temperature referenced to zero pressure, use SAL in PSS-78 and in-situ TEMP in IPTS-90 for calculation. """ this = self._obj if ( "history" in this.attrs and "DATA_MODE" in this.attrs["history"] and "QC" in this.attrs["history"] ): # This is surely a dataset fetch with 'standard' mode, we can't deal with this, we need 'expert' file raise InvalidDatasetStructure( "Need a full Argo dataset to create OWC float source. " "This dataset was probably loaded with a 'standard' user mode. " "Try to fetch float data in 'expert' mode" ) if force not in ["default", "raw", "adjusted"]: raise OptionValueError( "force option must be 'default', 'raw' or 'adjusted'." ) log.debug("===================== START create_float_source in '%s' mode" % force) if len(np.unique(this['PLATFORM_NUMBER'])) > 1: log.debug("Found more than one 1 float in this dataset, will split processing") def ds2mat(this_dsp): # Return a Matlab dictionary with dataset data to be used by savemat: mdata = {} mdata["PROFILE_NO"] = ( this_dsp["PROFILE_NO"].astype("uint8").values.T[np.newaxis, :] ) # 1-based index in Matlab mdata["DATES"] = this_dsp["DATES"].values.T[np.newaxis, :] mdata["LAT"] = this_dsp["LAT"].values.T[np.newaxis, :] mdata["LONG"] = this_dsp["LONG"].values.T[np.newaxis, :] mdata["PRES"] = this_dsp["PRES"].values mdata["TEMP"] = this_dsp["TEMP"].values mdata["PTMP"] = this_dsp["PTMP"].values mdata["SAL"] = this_dsp["SAL"].values return mdata def pretty_print_count(dd, txt): # if dd.argo._type == "point": # np = len(dd['N_POINTS'].values) # nc = len(dd.argo.point2profile()['N_PROF'].values) # else: # np = len(dd.argo.profile2point()['N_POINTS'].values) # nc = len(dd['N_PROF'].values) out = [] np, nc = dd.argo.N_POINTS, dd.argo.N_PROF out.append("%i points / %i profiles in dataset %s" % (np, nc, txt)) # np.unique(this['PSAL_QC'].values)) # out.append(pd.to_datetime(dd['TIME'][0].values).strftime('%Y/%m/%d %H:%M:%S')) return "\n".join(out) def getfilled_bins(pressure, bins): ip = np.digitize(np.unique(pressure), bins, right=False) ii, ij = np.unique(ip, return_index=True) ii = ii[np.where(ii - 1 > 0)] - 1 return bins[ii] def preprocess_one_float(this_one: xr.Dataset, this_path: str or os.PathLike = None, select: str = 'deep', debug_output: bool = False): """ Run the entire preprocessing on a given dataset with one float data """ # Add potential temperature: if "PTEMP" not in this_one: this_one = this_one.argo.teos10(vlist=["PTEMP"], inplace=True) # Only use Ascending profiles: # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L143 this_one = this_one.argo._where(this_one["DIRECTION"] == "A", drop=True) log.debug(pretty_print_count(this_one, "after direction selection")) # Todo: ensure we load only the primary profile of cycles with multiple sampling schemes: # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L194 # # Subsample and align vertical levels (max 1 level every 10db): # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L208 # this_one = this_one.argo.align_std_bins(inplace=False) # log.debug(pretty_print_count(this_one, "after vertical levels subsampling")) # Filter variables according to OWC workflow # (I don't understand why this_one come at the end of the Matlab routine ...) # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L258 this_one = this_one.argo.filter_scalib_pres(force=force, inplace=False) log.debug(pretty_print_count(this_one, "after pressure fields selection")) # Filter along some QC: # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L372 this_one = this_one.argo.filter_qc( QC_list=[0, 1, 2], QC_fields=["TIME_QC"], drop=True ) # Matlab says to reject > 3 # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L420 this_one = this_one.argo.filter_qc( QC_list=[v for v in range(10) if v != 3], QC_fields=["PRES_QC"], drop=True ) # Matlab says to keep != 3 this_one = this_one.argo.filter_qc( QC_list=[v for v in range(10) if v != 4], QC_fields=["PRES_QC", "TEMP_QC", "PSAL_QC"], drop=True, mode="any", ) # Matlab says to keep != 4 if len(this_one["N_POINTS"]) == 0: raise DataNotFound( "All data have been discarded because either PSAL_QC or TEMP_QC is filled with 4 or" " PRES_QC is filled with 3 or 4\n" "NO SOURCE FILE WILL BE GENERATED !!!" ) log.debug(pretty_print_count(this_one, "after QC filter")) # Exclude dummies # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L427 this_one = ( this_one .argo._where(this_one["PSAL"] <= 50, drop=True) .argo._where(this_one["PSAL"] >= 0, drop=True) .argo._where(this_one["PTEMP"] <= 50, drop=True) .argo._where(this_one["PTEMP"] >= -10, drop=True) .argo._where(this_one["PRES"] <= 6000, drop=True) .argo._where(this_one["PRES"] >= 0, drop=True) ) if len(this_one["N_POINTS"]) == 0: raise DataNotFound( "All data have been discarded because they are filled with values out of range\n" "NO SOURCE FILE WILL BE GENERATED !!!" ) log.debug(pretty_print_count(this_one, "after dummy values exclusion")) # Transform measurements to a collection of profiles for Matlab-like formation: this_one = this_one.argo.point2profile() # Subsample and align vertical levels (max 1 level every 10db): # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L208 # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L451 bins = np.arange(0.0, np.max(this_one["PRES"]) + 10.0, 10.0) this_one = this_one.argo.groupby_pressure_bins(bins=bins, select=select, axis='PRES') log.debug(pretty_print_count(this_one, "after vertical levels subsampling and re-alignment")) # Compute fractional year: # https://github.com/euroargodev/dm_floats/blob/c580b15202facaa0848ebe109103abe508d0dd5b/src/ow_source/create_float_source.m#L334 DATES = np.array( [toYearFraction(d) for d in pd.to_datetime(this_one["TIME"].values)] )[np.newaxis, :] # Read measurements: PRES = this_one["PRES"].values.T # (mxn) TEMP = this_one["TEMP"].values.T # (mxn) PTMP = this_one["PTEMP"].values.T # (mxn) SAL = this_one["PSAL"].values.T # (mxn) LAT = this_one["LATITUDE"].values[np.newaxis, :] LONG = this_one["LONGITUDE"].values[np.newaxis, :] LONG[0][np.argwhere(LONG[0] < 0)] = LONG[0][np.argwhere(LONG[0] < 0)] + 360 PROFILE_NO = this_one["CYCLE_NUMBER"].values[np.newaxis, :] # Create dataset with preprocessed data: this_one_dsp_processed = xr.DataArray( PRES, dims=["m", "n"], coords={"m": np.arange(0, PRES.shape[0]), "n": np.arange(0, PRES.shape[1])}, name="PRES", ).to_dataset(promote_attrs=False) this_one_dsp_processed["TEMP"] = xr.DataArray( TEMP, dims=["m", "n"], coords={"m": np.arange(0, TEMP.shape[0]), "n": np.arange(0, TEMP.shape[1])}, name="TEMP", ) this_one_dsp_processed["PTMP"] = xr.DataArray( PTMP, dims=["m", "n"], coords={"m": np.arange(0, PTMP.shape[0]), "n": np.arange(0, PTMP.shape[1])}, name="PTMP", ) this_one_dsp_processed["SAL"] = xr.DataArray( SAL, dims=["m", "n"], coords={"m": np.arange(0, SAL.shape[0]), "n": np.arange(0, SAL.shape[1])}, name="SAL", ) this_one_dsp_processed["PROFILE_NO"] = xr.DataArray( PROFILE_NO[0, :], dims=["n"], coords={"n": np.arange(0, PROFILE_NO.shape[1])}, name="PROFILE_NO", ) this_one_dsp_processed["DATES"] = xr.DataArray( DATES[0, :], dims=["n"], coords={"n": np.arange(0, DATES.shape[1])}, name="DATES", ) this_one_dsp_processed["LAT"] = xr.DataArray( LAT[0, :], dims=["n"], coords={"n": np.arange(0, LAT.shape[1])}, name="LAT" ) this_one_dsp_processed["LONG"] = xr.DataArray( LONG[0, :], dims=["n"], coords={"n": np.arange(0, LONG.shape[1])}, name="LONG", ) this_one_dsp_processed["m"].attrs = {"long_name": "vertical levels"} this_one_dsp_processed["n"].attrs = {"long_name": "profiles"} # Create Matlab dictionary with preprocessed data (to be used by savemat): mdata = ds2mat(this_one_dsp_processed) # Output log.debug("float source data saved in: %s" % this_path) if this_path is None: if debug_output: return mdata, this_one_dsp_processed, this_one # For debug/devel else: return this_one_dsp_processed else: from scipy.io import savemat # Validity check of the path type is delegated to savemat return savemat(this_path, mdata, appendmat=False, format=format, do_compression=do_compression) # Run pre-processing for each float data output = {} for WMO in np.unique(this['PLATFORM_NUMBER']): log.debug("> Preprocessing data for float WMO %i" % WMO) this_float = this.argo._where(this['PLATFORM_NUMBER'] == WMO, drop=True) if path is None: output[WMO] = preprocess_one_float(this_float, this_path=path, select=select, debug_output=debug_output) else: os.makedirs(path, exist_ok=True) # Make path exists float_path = os.path.join(path, "%s%i%s.mat" % (file_pref, WMO, file_suff)) preprocess_one_float(this_float, this_path=float_path, select=select, debug_output=debug_output) output[WMO] = float_path if path is None: log.debug("===================== END create_float_source") return output