Source code for argopy.utils.transform

"""
Manipulate/transform xarray objects or list of objects
"""

import numpy as np
import xarray as xr
import logging
from typing import List, Union

from ..errors import InvalidDatasetStructure
from .lists import list_core_parameters


log = logging.getLogger("argopy.utils.manip")


def drop_variables_not_in_all_datasets(
    ds_collection: List[xr.Dataset],
) -> List[xr.Dataset]:
    """Drop variables that are not in all datasets (the lowest common denominator)

    Parameters
    ----------
    ds_collection: List[xarray.Dataset]
        A list of :class:`xarray.Dataset`

    Returns
    -------
    List[xarray.Dataset]
    """
    # List all possible data variables:
    vlist = []
    for res in ds_collection:
        vlist.extend(list(res.data_vars))
    vlist = np.unique(vlist)

    # Check if each variable is in each dataset:
    ishere = np.zeros((len(vlist), len(ds_collection)))
    for ir, res in enumerate(ds_collection):
        for iv, v in enumerate(res.data_vars):
            for iu, u in enumerate(vlist):
                if v == u:
                    ishere[iu, ir] = 1

    # List of variables missing from at least one dataset:
    iv_missing = np.sum(ishere, axis=1) < len(ds_collection)
    if np.any(iv_missing):
        log.debug(
            "Dropping these variables that are missing from some dataset in this list: %s"
            % vlist[iv_missing]
        )

    # List of variables to keep:
    iv_tokeep = np.sum(ishere, axis=1) == len(ds_collection)
    for ir, res in enumerate(ds_collection):
        v_to_drop = [v for v in res.data_vars if v not in vlist[iv_tokeep]]
        ds_collection[ir] = ds_collection[ir].drop_vars(v_to_drop)

    return ds_collection
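
# Illustrative usage (not part of the module): a minimal sketch showing how two
# datasets with different variables are reduced to their common variables before
# concatenation. The variable names and shapes below are made up for the example.
#
#   import numpy as np
#   import xarray as xr
#   from argopy.utils.transform import drop_variables_not_in_all_datasets
#
#   ds1 = xr.Dataset({"TEMP": ("rows", np.zeros(3)), "PSAL": ("rows", np.zeros(3))})
#   ds2 = xr.Dataset({"TEMP": ("rows", np.zeros(2))})
#   out = drop_variables_not_in_all_datasets([ds1, ds2])
#   # Both datasets now expose only 'TEMP', so xr.concat(out, dim="rows") is safe.
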
def fill_variables_not_in_all_datasets(
    ds_collection: List[xr.Dataset], concat_dim: str = "rows"
) -> List[xr.Dataset]:
    """Add empty variables to datasets so that the whole collection has the same
    :attr:`xarray.Dataset.data_vars` and :attr:`xarray.Dataset.coords`

    This is to make sure that the collection of datasets can be concatenated

    Parameters
    ----------
    ds_collection: List[xarray.Dataset]
        A list of :class:`xarray.Dataset`
    concat_dim: str, default='rows'
        Name of the dimension to use to create new variables. Typically, this is the name of the
        dimension the collection will be concatenated along afterward.

    Returns
    -------
    List[xarray.Dataset]
    """

    def first_variable_with_concat_dim(this_ds, concat_dim="rows"):
        """Return the first variable in the dataset that has the concat_dim in its dims"""
        for v in this_ds.data_vars:
            if concat_dim in this_ds[v].dims:
                return v
        return None

    def fillvalue(da):
        """Return the fill value appropriate for a dataarray dtype"""
        # https://docs.scipy.org/doc/numpy/reference/generated/numpy.dtype.kind.html#numpy.dtype.kind
        if da.dtype.kind in ["U"]:
            fillvalue = " "
        elif da.dtype.kind == "i":
            fillvalue = 99999
        elif da.dtype.kind == "M":
            fillvalue = np.datetime64("NaT")
        else:
            fillvalue = np.nan
        return fillvalue

    # List all possible data variables:
    vlist = []
    for res in ds_collection:
        vlist.extend([v for v in res.variables if concat_dim in res[v].dims])
    vlist = np.unique(vlist)

    # List all possible coordinates:
    clist = []
    for res in ds_collection:
        clist.extend([c for c in res.coords if concat_dim in res[c].dims])
    clist = np.unique(clist)

    # Get the first occurrence of each variable, to be used as a template for attributes and dtype:
    meta = {}
    for ds in ds_collection:
        for v in vlist:
            if v in ds.variables and v not in meta:
                meta[v] = {
                    "attrs": ds[v].attrs,
                    "dtype": ds[v].dtype,
                    "fill_value": fillvalue(ds[v]),
                }

    # Add missing variables to each dataset:
    datasets = [ds.copy() for ds in ds_collection]
    for ir, ds in enumerate(datasets):
        for v in vlist:
            if v not in ds.variables:
                like = ds[first_variable_with_concat_dim(ds, concat_dim=concat_dim)]
                datasets[ir][v] = xr.full_like(
                    like, fill_value=meta[v]["fill_value"], dtype=meta[v]["dtype"]
                )
                datasets[ir][v].attrs = meta[v]["attrs"]

    # Make sure that all datasets have the same set of coordinates:
    results = []
    for ir, ds in enumerate(datasets):
        results.append(datasets[ir].set_coords(clist))

    return results
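
# Illustrative usage (not part of the module): a sketch of the complementary
# strategy to the function above, where missing variables are filled rather
# than dropped. Variable names are made up for the example.
#
#   import numpy as np
#   import xarray as xr
#   from argopy.utils.transform import fill_variables_not_in_all_datasets
#
#   ds1 = xr.Dataset({"TEMP": ("rows", np.zeros(3)), "DOXY": ("rows", np.zeros(3))})
#   ds2 = xr.Dataset({"TEMP": ("rows", np.zeros(2))})
#   out = fill_variables_not_in_all_datasets([ds1, ds2], concat_dim="rows")
#   # ds2 now carries a 'DOXY' variable filled with NaN, so the whole
#   # collection can be concatenated: xr.concat(out, dim="rows")
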
def merge_param_with_param_adjusted(
    ds: xr.Dataset, param: str, errors: str = "raise"
) -> xr.Dataset:
    """Copy <PARAM>_ADJUSTED values onto <PARAM> for points where the parameter data mode is 'A' or 'D'

    After values have been copied, all <PARAM>_ADJUSTED* variables are dropped to avoid confusion.

    For core and deep datasets (ds='phy'), we use the ``DATA_MODE`` variable. For the bgc dataset
    (ds='bgc'), we use the ``<PARAM>_DATA_MODE`` variables. The type of dataset is inferred
    automatically.

    Parameters
    ----------
    ds: :class:`xarray.Dataset`
        Dataset to transform
    param: str
        Name of the parameter to merge
    errors: str, optional, default='raise'
        If 'raise': raises an InvalidDatasetStructure error if any of the expected dataset
        variables is not found. If 'ignore', fails silently and returns the unmodified dataset.

    Returns
    -------
    :class:`xarray.Dataset`
    """
    if "%s_ADJUSTED" % param not in ds:
        if errors == "raise":
            raise InvalidDatasetStructure(
                "Parameter '%s_ADJUSTED' adjusted values not found in this dataset" % param
            )
        else:
            return ds

    if ds.argo._type != "point":
        raise InvalidDatasetStructure("Method only available to a collection of points")

    core_ds = False
    if "%s_DATA_MODE" % param not in ds and param in list_core_parameters():
        if "DATA_MODE" not in ds:
            if errors == "raise":
                raise InvalidDatasetStructure(
                    "Parameter '%s' data mode not found in this dataset (no 'DATA_MODE')" % param
                )
            else:
                return ds
        else:
            core_ds = True
            # Create a bgc-like parameter data mode variable
            # (it will be dropped at the end of the process):
            ds["%s_DATA_MODE" % param] = ds["DATA_MODE"].copy()

    if param not in ds:
        ds[param] = ds["%s_ADJUSTED" % param].copy()
    if "%s_QC" % param not in ds and "%s_ADJUSTED_QC" % param in ds:
        ds["%s_QC" % param] = ds["%s_ADJUSTED_QC" % param].copy()
    if "%s_ERROR" % param not in ds and "%s_ADJUSTED_ERROR" % param in ds:
        ds["%s_ERROR" % param] = ds["%s_ADJUSTED_ERROR" % param].copy()

    ii_measured = np.logical_or.reduce(
        (
            ds["%s_DATA_MODE" % param] == "R",
            ds["%s_DATA_MODE" % param] == "A",
            ds["%s_DATA_MODE" % param] == "D",
        )
    )
    ii_missing = np.logical_and.reduce(
        (
            ds["%s_DATA_MODE" % param] != "R",
            ds["%s_DATA_MODE" % param] != "A",
            ds["%s_DATA_MODE" % param] != "D",
        )
    )
    assert ii_measured.sum() + ii_missing.sum() == len(
        ds["N_POINTS"]
    ), "Unexpected data mode values !"

    ii_measured_adj = np.logical_and.reduce(
        (
            ii_measured,
            np.logical_or.reduce(
                (ds["%s_DATA_MODE" % param] == "A", ds["%s_DATA_MODE" % param] == "D")
            ),
        )
    )

    # Copy param_adjusted values onto param indexes where data_mode is 'A' or 'D':
    ds["%s" % param].loc[dict(N_POINTS=ii_measured_adj)] = ds[
        "%s_ADJUSTED" % param
    ].loc[dict(N_POINTS=ii_measured_adj)]
    ds = ds.drop_vars(["%s_ADJUSTED" % param])

    if "%s_QC" % param in ds and "%s_ADJUSTED_QC" % param in ds:
        ds["%s_QC" % param].loc[dict(N_POINTS=ii_measured_adj)] = ds[
            "%s_ADJUSTED_QC" % param
        ].loc[dict(N_POINTS=ii_measured_adj)]
        ds = ds.drop_vars(["%s_ADJUSTED_QC" % param])

    if "%s_ERROR" % param in ds and "%s_ADJUSTED_ERROR" % param in ds:
        ds["%s_ERROR" % param].loc[dict(N_POINTS=ii_measured_adj)] = ds[
            "%s_ADJUSTED_ERROR" % param
        ].loc[dict(N_POINTS=ii_measured_adj)]
        ds = ds.drop_vars(["%s_ADJUSTED_ERROR" % param])

    if core_ds:
        ds = ds.drop_vars(["%s_DATA_MODE" % param])

    return ds
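
# Illustrative usage (not part of the module): a hedged sketch assuming a
# point-indexed dataset fetched in 'expert' mode, so that DATA_MODE and
# PSAL_ADJUSTED* variables are present. The float WMO is only an example.
#
#   from argopy import DataFetcher
#   from argopy.utils.transform import merge_param_with_param_adjusted
#
#   ds = DataFetcher(mode="expert").float(6902746).to_xarray()
#   ds = merge_param_with_param_adjusted(ds, "PSAL")
#   # 'PSAL' now holds PSAL_ADJUSTED values wherever the data mode is 'A' or 'D',
#   # and all PSAL_ADJUSTED* variables have been dropped.
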
def filter_param_by_data_mode(
    ds: xr.Dataset,
    param: str,
    dm: Union[str, List[str]] = ["R", "A", "D"],
    mask: bool = False,
    errors: str = "raise",
) -> xr.Dataset:
    """Filter measurements according to a parameter data mode

    Filter the dataset to keep points where a parameter is in any of the data modes specified.

    This method can return the filtered dataset or the filter mask.

    Notes
    -----
    - Method compatible with core, deep and BGC datasets
    - Can be applied after :meth:`xarray.Dataset.transform_data_mode`

    Parameters
    ----------
    ds: :class:`xarray.Dataset`
        The dataset to filter
    param: str
        Name of the parameter to apply the filter to
    dm: str, list(str), optional, default=['R', 'A', 'D']
        List of DATA_MODE values (string) to keep
    mask: bool, optional, default=False
        Determine if we should return the filter mask or the filtered dataset
    errors: str, optional, default='raise'
        If ``raise``, raises an InvalidDatasetStructure error if any of the expected variables is
        not found. If ``ignore``, fails silently and returns the unmodified dataset.

    Returns
    -------
    :class:`xarray.Dataset`
    """
    core_ds = False
    if "%s_DATA_MODE" % param not in ds and param in list_core_parameters():
        if "DATA_MODE" not in ds:
            if errors == "raise":
                raise InvalidDatasetStructure(
                    "Parameter '%s' data mode not found in this dataset (no 'DATA_MODE')" % param
                )
            else:
                return ds
        else:
            core_ds = True
            # Create a bgc-like parameter data mode variable
            # (it will be dropped at the end of the process):
            ds["%s_DATA_MODE" % param] = ds["DATA_MODE"].copy()

    filter = []
    for this_dm in dm:
        vname = "%s_DATA_MODE" % param
        if vname not in ds:
            log.warning("The parameter '%s' has no associated data mode" % vname)
        else:
            filter.append(ds[vname] == "%s" % this_dm.upper())
    if len(filter) > 0:
        filter = np.logical_or.reduce(filter)

    if core_ds:
        ds = ds.drop_vars(["%s_DATA_MODE" % param])

    if mask:
        return filter
    else:
        return ds.loc[dict(N_POINTS=filter)] if len(filter) > 0 else ds
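
# Illustrative usage (not part of the module): a hedged sketch assuming 'ds' is
# a point-indexed dataset carrying a 'DATA_MODE' or 'PSAL_DATA_MODE' variable:
#
#   from argopy.utils.transform import filter_param_by_data_mode
#
#   # Keep only points where PSAL is in delayed mode:
#   ds_d = filter_param_by_data_mode(ds, "PSAL", dm=["D"])
#
#   # Or retrieve the boolean filter mask instead of the filtered dataset:
#   m = filter_param_by_data_mode(ds, "PSAL", dm=["D"], mask=True)
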
def split_data_mode(ds: xr.Dataset) -> xr.Dataset:
    """Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables

    Using the list of *PARAM* found in ``STATION_PARAMETERS``, this method will create ``N_PARAM``
    new variables in the dataset: ``<PARAM>_DATA_MODE(N_PROF)``.

    The variable ``PARAMETER_DATA_MODE`` is dropped from the dataset at the end of the process.

    Parameters
    ----------
    ds: :class:`xarray.Dataset`
        Dataset to transform

    Returns
    -------
    :class:`xarray.Dataset`
    """
    if "STATION_PARAMETERS" in ds and "PARAMETER_DATA_MODE" in ds:
        u64 = lambda s: "%s%s" % (s, " " * (64 - len(s)))  # noqa: E731
        params = [p.strip() for p in np.unique(ds["STATION_PARAMETERS"])]
        for param in params:
            name = "%s_DATA_MODE" % param.replace("_PARAMETER", "").replace(
                "PARAMETER_", ""
            )
            mask = ds["STATION_PARAMETERS"] == xr.full_like(
                ds["STATION_PARAMETERS"],
                u64(param),
                dtype=ds["STATION_PARAMETERS"].dtype,
            )
            da = ds["PARAMETER_DATA_MODE"].where(mask, drop=True).isel(N_PARAM=0)
            da = da.rename(name)
            da = da.astype(ds["PARAMETER_DATA_MODE"].dtype)
            ds[name] = da

        ds = ds.drop_vars("PARAMETER_DATA_MODE")
        ds.argo.add_history("Transformed with 'split_data_mode'")

    return ds
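
# Illustrative usage (not part of the module): a hedged sketch assuming a
# multi-profile dataset with N_PROF/N_PARAM dimensions, e.g. loaded from a
# BGC 'Sprof' file. The file path below is hypothetical.
#
#   import xarray as xr
#   from argopy.utils.transform import split_data_mode
#
#   ds = xr.open_dataset("6902746_Sprof.nc")  # hypothetical local file
#   ds = split_data_mode(ds)
#   # 'PARAMETER_DATA_MODE' has been replaced by per-parameter variables,
#   # e.g. 'DOXY_DATA_MODE(N_PROF)' and 'CHLA_DATA_MODE(N_PROF)'.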