"""
Utility module for optical modelling diagnostics.
These functions are not really meant to be used directly, they consume raw 1D array of data.
You should rather use the :class:`xarray.Dataset.argo.optic` extension.
"""
import numpy as np
from typing import Tuple, Annotated, Literal, TypeVar
from scipy.ndimage import median_filter
from scipy.ndimage import uniform_filter1d
import numpy.typing as npt
DType = TypeVar("DType", bound=np.generic)
Array1D = Annotated[npt.NDArray[DType], Literal["N"]]
try:
import gsw
with_gsw = True
except ModuleNotFoundError:
with_gsw = False
[docs]
def Z_euphotic(
axis: np.ndarray,
par: np.ndarray,
method: Literal["percentage", "KdPAR"] = "percentage",
max_surface: float = 5.0,
layer_min: float = 10.0,
layer_max: float = 50.0,
) -> float:
"""Depth of the euphotic zone from unlabeled arrays of pressure and PAR
Two methods are available:
- **percentage**: Depth for which PAR is 1% that of the surface value, defined as the maximum above ``max_surface``
- **KdPAR**: -log(0.01) times the PAR attenuation coefficient over the layer between ``layer_min`` and ``layer_max``
See :class:`xarray.Dataset.argo.optic.Zeu` for more details on the methodology.
Parameters
----------
axis: numpy.ndarray, 1 dimensional
Vertical axis values, pressure or depth, positive, increasing downward, typically from the ``PRES`` parameter of an Argo float.
par: array_like, 1 dimensional
Photosynthetically available radiation, typically from the ``DOWNWELLING_PAR`` parameter of an Argo float.
method: Literal['percentage', 'KdPAR'] = 'percentage'
Computation method to use.
max_surface: float, optional, default: 5.
Used only with the ``percentage`` method.
Maximum value of the vertical axis above which the maximum PAR value is considered surface values.
layer_min: float, optional, default: 10.
layer_max: float, optional, default: 50.
Used only with the ``KdPAR`` method.
Minimum and maximum values of the vertical axis over which to compute the PAR attenuation coefficient.
Returns
-------
float
Estimate of the euphotic layer depth
See Also
--------
:class:`xarray.Dataset.argo.optic.Zeu`
"""
idx = ~np.logical_or(np.isnan(axis), np.isnan(par))
axis = axis[idx]
par = par[idx]
if method == "percentage":
try:
Surface_levels = np.where(axis <= max_surface)[0]
except:
Surface_levels = np.ndarray()
pass
result = np.nan
if Surface_levels.shape[0] > 0:
Surface_value = np.max(par[Surface_levels])
if np.any(par > (Surface_value / 100)):
index_1_percent = np.argmin(np.abs(par - (Surface_value / 100)))
result = axis[index_1_percent]
elif method == "KdPAR":
result = np.nan
layer_index = (axis >= layer_min) & (axis <= layer_max)
if np.any(layer_index):
layer_size = (
axis[layer_index][0] - axis[layer_index][-1]
) # 0 index is below (higher value) the -1 index
Kd_layer = (
-1
/ layer_size
* (np.log(par[layer_index][0]) - np.log(par[layer_index][-1]))
)
result = -np.log(0.01) / Kd_layer
return np.array(result)
[docs]
def Z_firstoptic(*args, **kwargs) -> float:
"""First optical depth from unlabeled arrays of pressure and PAR
See :class:`xarray.Dataset.argo.optic.Zpd` for more details on the methodology.
Parameters
----------
args, kwargs:
All arguments are passed to :meth:`Z_euphotic`.
Returns
-------
float
Estimate of the first optical depth
See Also
--------
:class:`xarray.Dataset.argo.optic.Zpd`, :meth:`Z_euphotic`, :mod:`argopy.utils.optical_modeling`
"""
Zeu = Z_euphotic(*args, **kwargs)
return Zeu / 4.6
[docs]
def Z_iPAR_threshold(
axis: np.ndarray, par: np.ndarray, threshold: float = 15.0, tolerance: float = 5.0
) -> float:
"""Depth where unlabelled array of PAR reaches some threshold value (closest point)
The closest level in the vertical axis for which PAR is about a ``threshold`` value, with some tolerance.
See :class:`xarray.Dataset.argo.optic.Z_iPAR_threshold` for more details on the methodology.
Parameters
----------
axis: array_like, 1 dimensional
Vertical axis values, pressure or depth, positive, increasing downward, typically from the ``PRES``
parameter of an Argo float.
par: array_like, 1 dimensional
Photosynthetically available radiation, typically from the ``DOWNWELLING_PAR`` parameter of an Argo float.
threshold: float, optional, default: 15.
Target value for ``par``. We use 15 as the default because it is the theoretical value below which
the Fchla is no longer quenched (For correction of NPQ purposes).
tolerance: float, optional, default: 5.
PAR value tolerance with regard to the target threshold. If the closest PAR value to ``threshold`` is distant by more than ``tolerance``, consider result invalid and return NaN.
Returns
-------
float
See Also
--------
:class:`xarray.Dataset.argo.optic.Z_iPAR_threshold`
"""
# index = np.argmin(np.abs(par - 15))
# Z_iPAR = axis[index]
#
# if not par[index] > 10 or not par[index] < 20:
# return np.nan
# else:
# return Z_iPAR
iz = np.argmin(np.abs(par - threshold))
par_z = par[iz]
result = axis[iz]
if np.abs(par_z - threshold) >= tolerance:
return np.nan
else:
return np.array(result)
[docs]
def DCM(
CHLA: np.ndarray,
CHLA_axis: np.ndarray,
BBP: np.ndarray,
BBP_axis: np.ndarray,
max_depth: float = 300.0,
resolution_threshold: float = 3.0,
median_filter_size: int = 5,
surface_layer: float = 15.0,
median_filter_size_bbp: int = 7,
uniform_filter1d_size_bbp: int = 5,
) -> Tuple[Literal["NO ", "DBM", "DAM"], float, float]:
"""Search and qualify Deep Chlorophyll Maxima from unlabeled arrays of pressure and CHLA/BBP
See :class:`xarray.Dataset.argo.optic.DCM` for more details on the methodology.
Parameters
----------
CHLA: array_like, 1 dimensional
Chlorophyl-a concentration profile data.
CHLA_axis: array_like, 1 dimensional
Vertical axis values, pressure or depth, positive, increasing downward, for CHLA. Typically, from the ``PRES``
parameter of an Argo float.
BBP: array_like, 1 dimensional
Particulate backscattering coefficient profile data.
BBP_axis: array_like, 1 dimensional
Vertical axis values, pressure or depth, positive, increasing downward, for BBP. Typically, from the ``PRES``
parameter of an Argo float.
max_depth: float, optional, default: 300.
Maximum depth allowed for a deep CHLA maximum to be found.
resolution_threshold: float, optional, default: 3.
CHLA vertical axis resolution threshold below which a smoother is applied.
median_filter_size: int, optional, default: 5
Size of the :func:`scipy.ndimage.median_filter` filter used with CHLA.
surface_layer: float, optional, default: 15.
Depth value defining the surface layer above which twice the median CHLA value may qualify a DCM as such.
Other Parameters
----------------
median_filter_size_bbp: int, optional, default: 7
Size of the :func:`scipy.ndimage.median_filter` filter used with BBP.
uniform_filter1d_size_bbp: int, optional, default: 7
Size of the :func:`scipy.ndimage.uniform_filter1d` filter used with BBP.
Returns
-------
str[3], Literal['NO ', 'DBM', 'DAM']
The type of Deep Chlorophyll Maxima (DCM). Possible values are:
- 'NO ': No DCM found above ``max_depth``
- 'DBM' : Deep Biomass Maximum
- 'DAM' : Deep photoAcclimation Maximum
float
Depth of the DCM, from the un-smoothed profile
float
Amplitude the DCM: CHLA value, from the un-smoothed profile.
See Also
--------
:class:`xarray.Dataset.argo.optic.DCM`
"""
# Possibly smooth the profile:
if np.diff(CHLA_axis[CHLA_axis <= max_depth]).mean().round() < resolution_threshold:
# Rolling median window:
CHLA_smooth = median_filter(CHLA, median_filter_size, mode="nearest")
else:
CHLA_smooth = np.copy(CHLA)
# Identify the CHLA maximum in the appropriate layer:
layer_CHLA = CHLA_axis <= max_depth
if ~np.any(layer_CHLA):
return "NO", np.nan, np.nan
Max_CHLA_depth = CHLA_axis[layer_CHLA][np.argmax(CHLA_smooth[layer_CHLA])]
Max_CHLA = CHLA[layer_CHLA][np.argmax(CHLA_smooth[layer_CHLA])]
# Qualify CHLA maximum as a DCM:
if np.any(CHLA_axis <= surface_layer) and Max_CHLA > 2 * np.median(
CHLA[CHLA_axis <= surface_layer]
):
DCM_type = "DCM"
else:
return "NO", np.nan, np.nan
# Check for a potential cooccurrence of the DCM depth with any deep peak of BBP:
if DCM_type == "DCM":
if (
np.diff(BBP_axis[BBP_axis <= max_depth]).mean().round()
< resolution_threshold
):
# Rolling median window 5, Rolling mean 7
BBP_smooth = median_filter(BBP, median_filter_size_bbp, mode="nearest")
BBP_smooth = uniform_filter1d(
BBP_smooth, uniform_filter1d_size_bbp, mode="nearest"
)
else:
BBP_smooth = np.copy(BBP)
# BBP maximum was searched for from the smoothed BBP profile in a layer of 20 meters around the DCM
layer_bbp = (BBP_axis >= Max_CHLA_depth - 20) & (
BBP_axis <= Max_CHLA_depth + 20
)
# once the bbp maximum and depth were identified on the smoothed profile,
# closest bbp measurements on the unsmoothed profile were accordingly identified.
Max_bbp = BBP[layer_bbp][np.argmax(BBP_smooth[layer_bbp])]
# The profile was defined as a DBM if the BBP maximum was more than 1.3
# times the BBP minimum within the top 15 meters.
if Max_bbp > 1.3 * np.min(BBP[BBP_axis <= surface_layer]):
DCM_type = "DBM" # Deep Biomass Maximum
else:
DCM_type = "DAM" # Deep photoAcclimation Maximum
if DCM_type == "NO":
return "%3s" % DCM_type, np.nan, np.nan
else:
return "%3s" % DCM_type, Max_CHLA_depth, Max_CHLA
def MLD_Func(PRES, PSAL, TEMP, LAT, LON):
"""
Parameters
----------
Process potential density using gsw package
Return MLD with Boyer Montégut method with threshold of σ(10m) + 0.03 kg.m-3
"""
SA = gsw.SA_from_SP(PSAL, PRES, LON, LAT)
CT = gsw.CT_from_t(SA, TEMP, PRES)
density = gsw.sigma0(SA, CT)
depth_density = PRES[~np.isnan(density)]
density = density[~np.isnan(density)]
if not any(depth_density < 10) or all(depth_density < 10):
return np.nan
else:
index_10m = np.argmin(np.abs(depth_density - 10))
density_at_10m = density[index_10m]
index = np.min(
np.where(density[index_10m:] > density_at_10m + 0.03) + index_10m
)
MLD = depth_density[index]
return MLD
def time_UTC_tolocal(time_64, longitude):
delta = 60 * longitude / 15 # 60 min = 15°
local = time_64 + np.timedelta64(int(delta), "m")
return local
def time_UTC_to_offset(time_64, longitude):
if abs(longitude) <= 7.5:
# return time_64
return 0
elif abs(longitude) > 7.5 and abs(longitude) < 15:
local = time_64 + np.timedelta64(int(1 * np.sign(longitude)), "h")
# return local
return 1 * np.sign(longitude)
else:
offset_hours = int((abs(longitude) + 7.5) / 15)
local = time_64 + np.timedelta64(int(offset_hours * np.sign(longitude)), "h")
# return local
return offset_hours * np.sign(longitude)
def get_solar_angle(LATITUDE, LONGITUDE, JULD):
import pvlib
location = pvlib.location.Location(LATITUDE, LONGITUDE)
solar_position = location.get_solarposition(JULD)
return solar_position["apparent_elevation"].values