Source code for argopy.stores.kerchunker

import fsspec
from typing import List, Union, Dict, Literal
from pathlib import Path
import json
import logging
from packaging import version

from ..utils import to_list
from . import memorystore, filestore

log = logging.getLogger("argopy.stores.kerchunk")


try:
    from kerchunk.hdf import SingleHdf5ToZarr
    from kerchunk.netCDF3 import NetCDF3ToZarr

    HAS_KERCHUNK = True
except ModuleNotFoundError:
    HAS_KERCHUNK = False
    SingleHdf5ToZarr, NetCDF3ToZarr = None, None

try:
    import dask

    HAS_DASK = True
except ModuleNotFoundError:
    HAS_DASK = False
    dask = None
    import concurrent.futures


[docs] class ArgoKerchunker: """ Argo netcdf file kerchunk helper This class is for expert users who wish to test lazy access to remote netcdf files. It is designed to be used through one of the **argopy** stores inheriting from :class:`ArgoStoreProto`. The `kerchunk <https://fsspec.github.io/kerchunk/>`_ library is required only if you need to extract zarr data from a netcdf file, i.e. execute :meth:`ArgoKerchunker.translate`. Notes ----- According to `AWS <https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html>`_, typical sizes for byte-range requests are 8 MB or 16 MB. If you intend to compute kerchunk zarr data on-demand, we don't recommend to use this method on mono or multi profile files that are only a few MB in size, because (ker)-chunking creates a significant performance overhead. Warnings -------- We noticed that kerchunk zarr data for Rtraj files can be insanely larger than the netcdf file itself. This could go from 10Mb to 228Mb ! Examples -------- .. code-block:: python :caption: :class:`ArgoKerchunker` API # Use default memory store to manage kerchunk zarr data: ak = ArgoKerchunker(store='memory') # Use a local file store to keep track of zarr kerchunk data (for later # re-use or sharing): ak = ArgoKerchunker(store='local', root='kerchunk_data_folder') # Use a remote file store to keep track of zarr kerchunk data (for later # re-use or sharing): fs = fsspec.filesystem('dir', path='s3://.../kerchunk_data_folder/', target_protocol='s3') ak = ArgoKerchunker(store=fs) # Methods: ak.supported(ncfile) ak.translate(ncfiles) ak.to_reference(ncfile) ak.pprint(ncfile) .. code-block:: python :caption: Loading one file lazily # Let's consider a remote Argo netcdf file from a s3 server supporting lazy access # (i.e. support byte range requests): ncfile = "argo-gdac-sandbox/pub/dac/coriolis/6903090/6903090_prof.nc" # Simply open the netcdf file lazily: from argopy.stores import s3store ds = s3store().open_dataset(ncfile, lazy=True) # You can also do it with the GDAC fs: from argopy.stores import gdacfs ds = gdacfs('s3').open_dataset("dac/coriolis/6903090/6903090_prof.nc", lazy=True) .. code-block:: python :caption: Translate and save references for a batch of netcdf files # Create an instance that will save netcdf to zarr references on a local # folder at "~/kerchunk_data_folder": ak = ArgoKerchunker(store='local', root='~/kerchunk_data_folder') # Get a dummy list of netcdf files: from argopy import ArgoIndex idx = ArgoIndex(host='s3').search_lat_lon_tim([-70, -55, 30, 45, '2025-01-01', '2025-02-01']) ncfiles = [af.ls_dataset()['prof'] for af in idx.iterfloats()] # Translate and save references for this batch of netcdf files: # (done in parallel, possibly using a Dask client if available) ak.translate(ncfiles, fs=idx.fs['src'], chunker='auto') """
[docs] def __init__( self, store: Union[Literal["memory", "local"], fsspec.AbstractFileSystem] = "memory", root: Union[Path, str] = ".", preload: bool = True, inline_threshold: int = 0, max_chunk_size: int = 0, storage_options: Dict = None, ): """ Parameters ---------- store: str, default='memory' Kerchunk data store, i.e. the file system used to load from and/or save to kerchunk json files root: Path, str, default='.' Use to specify a local folder to base the store preload: bool, default=True Indicate if kerchunk references already on the store should be preloaded or not. inline_threshold: int, default=0 Byte size below which an array will be embedded in the output. Use 0 to disable inlining. This argument is passed to :class:`kerchunk.netCDF3.NetCDF3ToZarr` or :class:`kerchunk.hdf.SingleHdf5ToZarr` max_chunk_size: int, default=0 How big a chunk can be before triggering subchunking. If 0, there is no subchunking, and there is never subchunking for coordinate/dimension arrays. E.g., if an array contains 10,000bytes, and this value is 6000, there will be two output chunks, split on the biggest available dimension. This argument is passed to :class:`kerchunk.netCDF3.NetCDF3ToZarr` only. storage_options: dict, default=None This argument is passed to :class:`kerchunk.netCDF3.NetCDF3ToZarr` or :class:`kerchunk.hdf.SingleHdf5ToZarr` during translation. These in turn, will pass options to fsspec when opening netcdf file. """ # Instance file system to load/save kerchunk json files if store == "memory": self.fs = memorystore() elif store == "local": root = Path(root).expanduser() if root.name == "": self.fs = filestore() else: root.mkdir(parents=True, exist_ok=True) self.fs = fsspec.filesystem("dir", path=root, target_protocol="local") elif isinstance(store, fsspec.AbstractFileSystem): self.fs = store # Passed to fsspec when opening netcdf file: self.storage_options = storage_options if storage_options is not None else {} # List of processed files register: self.kerchunk_references = {} if preload: self.update_kerchunk_references_from_store() self.inline_threshold = inline_threshold """inline_threshold: int Byte size below which an array will be embedded in the output. Use 0 to disable inlining. """ self.max_chunk_size = max_chunk_size """ max_chunk_size: int How big a chunk can be before triggering subchunking. If 0, there is no subchunking, and there is never subchunking for coordinate/dimension arrays. E.g., if an array contains 10,000bytes, and this value is 6000, there will be two output chunks, split on the biggest available dimension. [TBC] """
def __repr__(self): summary = ["<argopy.kerchunker>"] summary.append("- kerchunk data store: %s" % str(self.fs)) summary.append( "- Inline threshold: %i (byte size below which an array will be embedded in the output)" % self.inline_threshold ) summary.append( "- Maximum chunk size: %i (how big a chunk can be before triggering sub-chunking)" % self.max_chunk_size ) n = len(self.kerchunk_references) summary.append("- %i dataset%s listed in store" % (n, "s" if n > 1 else "")) return "\n".join(summary) @property def store_path(self): """Absolute path to the reference store, including protocol""" p = getattr(self.fs, "path", str(Path(".").absolute())) # Ensure the protocol is included for non-local files: if self.fs.fs.protocol[0] == "s3": p = "s3://" + fsspec.core.split_protocol(p)[-1] return p def _ncfile2jsfile(self, ncfile): """Convert a netcdf file path to a data store file path for kerchunk zarr reference (json data)""" return Path(ncfile).name.replace(".nc", "_kerchunk.json") def _ncfile2ncref(self, ncfile: Union[str, Path], fs=None): """Convert a netcdf file path to a key used in internal kerchunk_references""" # return fs.full_path(fs.info(str(ncfile))['name'], protocol=True) return fs.full_path(str(ncfile), protocol=True) def _magic2chunker(self, ncfile, fs): """Get a netcdf file path chunker alias: 'cdf3' or 'hdf5' This is based on the file binary magic value. Raises ------ :class:`ValueError` if file not recognized """ magic = fs.open(ncfile).read(3) if magic == b"CDF": return "cdf3" elif magic == b"\x89HD": return "hdf5" else: raise ValueError("No chunker for this magic: '%s')" % magic)
[docs] def nc2reference( self, ncfile: Union[str, Path], fs=None, chunker: Literal["auto", "cdf3", "hdf5"] = "auto", ): """Compute reference data for a netcdf file (kerchunk json data) This method is intended to be used internally, since it's not using the kerchunk reference store. Users should rather use the :meth:`to_reference` method to avoid to recompute reference data when available on the :class:`ArgoKerchunker` instance. Parameters ---------- ncfile : Union[str, Path] Path to a netcdf file to process fs: None An **argopy** file store, inheriting from :class:`ArgoStoreProto`. chunker : Literal['auto', 'cdf3', 'hdf5'] = 'auto' Define the kerchunker formater to use. Two formater are available: :class:`kerchunk.netCDF3.NetCDF3ToZarr` or :class:`kerchunk.hdf.SingleHdf5ToZarr`: - 'auto': detect and select formater for each netcdf of the ncfiles - 'cdf3': impose use of :class:`kerchunk.netCDF3.NetCDF3ToZarr` - 'hdf5': impose use of :class:`kerchunk.hdf.SingleHdf5ToZarr` Returns ------- dict """ chunker = self._magic2chunker(ncfile, fs) if chunker == "auto" else chunker ncfile_full = self._ncfile2ncref(ncfile, fs=fs) storage_options = self.storage_options.copy() if fs.protocol == 'ftp' and version.parse(fsspec.__version__) < version.parse("2024.10.0"): # We need https://github.com/fsspec/filesystem_spec/pull/1673 storage_options.pop('host', None) storage_options.pop('port', None) if chunker == "cdf3": chunks = NetCDF3ToZarr( ncfile_full, inline_threshold=self.inline_threshold, max_chunk_size=self.max_chunk_size, storage_options=storage_options, ) elif chunker == "hdf5": chunks = SingleHdf5ToZarr( ncfile_full, inline_threshold=self.inline_threshold, storage_options=storage_options, ) kerchunk_data = chunks.translate() kerchunk_jsfile = self._ncfile2jsfile(ncfile) with self.fs.open(kerchunk_jsfile, "wb") as f: f.write(json.dumps(kerchunk_data).encode()) return ncfile_full, kerchunk_jsfile, kerchunk_data
def update_kerchunk_references_from_store(self): """Load kerchunk data already on store""" for f in self.fs.glob("*_kerchunk.json"): with self.fs.open(f, "r") as file: kerchunk_data = json.load(file) for k, v in kerchunk_data["refs"].items(): if k != ".zgroup" and "/0" in k: if Path(v[0]).suffix == ".nc": self.kerchunk_references.update({v[0]: f}) break
[docs] def translate( self, ncfiles: Union[str, Path, List], fs=None, chunker: Literal["first", "auto", "cdf3", "hdf5"] = "first", ): """Translate netcdf file(s) into kerchunk reference data Kerchunk data are saved on the :class:`ArgoKerchunker` instance store. Once translated, netcdf file reference data are internally registered in the :attr:`ArgoKerchunker.kerchunk_references` attribute. If more than 1 netcdf file is provided, the translation is executed in parallel: - if `Dask <https://www.dask.org>`_ is available we use :class:`dask.delayed`/:meth:`dask.compute`, - otherwise we use a :class:`concurrent.futures.ThreadPoolExecutor`. Parameters ---------- ncfiles : Union[str, Path, List] One or more netcdf files to translate fs: None An **argopy** file store, inheriting from :class:`ArgoStoreProto`. chunker : Literal['first', 'auto', 'cdf3', 'hdf5'] = 'first' Define the kerchunker formater to use. Two formater are available: :class:`kerchunk.netCDF3.NetCDF3ToZarr` or :class:`kerchunk.hdf.SingleHdf5ToZarr`: - 'first': detect and select formater from the first netcdf file type - 'auto': detect and select formater for each netcdf of the nc files - 'cdf3': impose use of :class:`kerchunk.netCDF3.NetCDF3ToZarr` for all nc files - 'hdf5': impose use of :class:`kerchunk.hdf.SingleHdf5ToZarr` for all nc files Returns ------- List(dict) See Also -------- :meth:`ArgoKerchunker.to_reference` """ if not HAS_KERCHUNK: raise ModuleNotFoundError("This method requires the 'kerchunk' library") ncfiles = to_list(ncfiles) for i, f in enumerate(ncfiles): ncfiles[i] = str(f) # chunker = self._magic2chunker(ncfiles[0], fs) if chunker == "first" else chunker # if HAS_DASK: tasks = [ dask.delayed(self.nc2reference)(uri, fs=fs, chunker=chunker) for uri in ncfiles ] results = dask.compute(tasks) results = results[0] # ? else: with concurrent.futures.ThreadPoolExecutor() as executor: future_to_url = { executor.submit(self.nc2reference, uri, fs=fs, chunker=chunker): uri for uri in ncfiles } futures = concurrent.futures.as_completed(future_to_url) results = [] for future in futures: results.append(future.result()) for result in results: ncfile, kerchunk_jsfile, _ = result self.kerchunk_references.update({ncfile: kerchunk_jsfile}) return results
[docs] def to_reference(self, ncfile: Union[str, Path], fs=None, overwrite: bool = False): """Return zarr reference data for a given netcdf file path Return data from the instance store if available, otherwise trigger :meth:`ArgoKerchunker.translate` (which save data on the instance data store). This is the method to use in **argopy** file store methods :meth:`ArgoStoreProto.open_dataset` to implement laziness. Parameters ---------- ncfile : Union[str, Path] Path to netcdf file to process fs: None An **argopy** file store, inheriting from :class:`ArgoStoreProto`. Returns ------- dict See Also -------- :meth:`ArgoKerchunker.translate` """ if overwrite: self.translate(ncfile, fs=fs) elif self._ncfile2ncref(ncfile, fs=fs) not in self.kerchunk_references: if self.fs.exists(self._ncfile2jsfile(ncfile)): self.kerchunk_references.update( {self._ncfile2ncref(ncfile, fs=fs): self._ncfile2jsfile(ncfile)} ) else: self.translate(ncfile, fs=fs) # Read and load the kerchunk JSON file: kerchunk_jsfile = self.kerchunk_references[self._ncfile2ncref(ncfile, fs=fs)] with self.fs.open(kerchunk_jsfile, "r") as file: kerchunk_data = json.load(file) # Ensure that reference data corresponds to the target netcdf file: if not overwrite: target_ok = False for key, value in kerchunk_data["refs"].items(): if key not in [".zgroup", ".zattrs"] and "0." in key: if value[0] == self._ncfile2ncref(ncfile, fs=fs): target_ok = True break if not target_ok: kerchunk_data = self.to_reference(ncfile, overwrite=True, fs=fs) return kerchunk_data
[docs] def pprint(self, ncfile: Union[str, Path], params: List[str] = None, fs=None): """Pretty print kerchunk json data for a netcdf file""" params = to_list(params) if params is not None else [] kerchunk_data = self.to_reference(ncfile, fs=fs) # Pretty print JSON data keys_to_select = [".zgroup", ".zattrs", ".zmetadata"] data_to_print = {} for key, value in kerchunk_data["refs"].items(): if key in keys_to_select: if isinstance(value, str): data_to_print[key] = json.loads(value) else: data_to_print[key] = value for p in params: if p == key.split("/")[0]: if isinstance(value, str): data_to_print[key] = json.loads(value) else: data_to_print[key] = value print(json.dumps(data_to_print, indent=4))
[docs] def supported(self, ncfile: Union[str, Path], fs=None) -> bool: """Check if a netcdf file can be accessed through byte ranges For non-local files, the absolute path toward the netcdf file must include the file protocol to return a correct answer. Known Argo GDAC supporting byte ranges: - ftp://ftp.ifremer.fr/ifremer/argo - s3://argo-gdac-sandbox/pub - https://usgodae.org/pub/outgoing/argo - https://argo-gdac-sandbox.s3-eu-west-3.amazonaws.com/pub Not supporting: - https://data-argo.ifremer.fr Parameters ---------- ncfile: str, Path Absolute path toward the netcdf file to assess for lazy support, must include protocol for non-local files. """ try: return fs.first(ncfile) is not None except Exception: log.debug(f"Could not read {ncfile} with {fs}") return False