# Source code for argopy.utils.chunking

import numpy as np
import pandas as pd
from functools import reduce
from ..errors import InvalidFetcherAccessPoint
from .checkers import is_box

import collections

try:
    # Import the submodule explicitly: ``import collections`` alone does not
    # guarantee that the ``collections.abc`` attribute is bound.
    import collections.abc as collectionsAbc
except ImportError:  # legacy Python without the collections.abc submodule
    collectionsAbc = collections


[docs] class Chunker: """To chunk fetcher requests""" # Default maximum chunks size for all possible request parameters default_chunksize = { "box": { "lon": 20, # degree "lat": 20, # degree "dpt": 500, # meters/db "time": 3 * 30, }, # Days "wmo": {"wmo": 5, "cyc": 100}, # Nb of floats } # Nb of cycles
[docs] def __init__(self, request: dict, chunks: str = "auto", chunksize: dict = {}): """Create a request Chunker Allow to easily split an access point request into chunks Parameters ---------- request: dict Access point request to be chunked. One of the following: - {'box': [lon_min, lon_max, lat_min, lat_max, dpt_min, dpt_max, time_min, time_max]} - {'box': [lon_min, lon_max, lat_min, lat_max, dpt_min, dpt_max]} - {'wmo': [wmo1, wmo2, ...], 'cyc': [0,1, ...]} chunks: 'auto' or dict Dictionary with request access point as keys and number of chunks to create as values. Eg: {'wmo':10} will create a maximum of 10 chunks along WMOs. chunksize: dict, optional Dictionary with request access point as keys and chunk size as values (used as maximum values in 'auto' chunking). Eg: {'wmo': 5} will create chunks with as many as 5 WMOs each. """ self.request = request if "box" in self.request: is_box(self.request["box"]) if len(self.request["box"]) == 8: self.this_chunker = self._chunker_box4d elif len(self.request["box"]) == 6: self.this_chunker = self._chunker_box3d elif "wmo" in self.request: self.this_chunker = self._chunker_wmo else: raise InvalidFetcherAccessPoint( "'%s' not valid access point" % ",".join(self.request.keys()) ) default = self.default_chunksize[[k for k in self.request.keys()][0]] if len(chunksize) == 0: # chunksize = {} chunksize = default if not isinstance(chunksize, collectionsAbc.Mapping): raise ValueError("chunksize must be mappable") else: # merge with default: chunksize = {**default, **chunksize} self.chunksize = collections.OrderedDict(sorted(chunksize.items())) default = {k: "auto" for k in self.chunksize.keys()} if chunks == "auto": # auto for all chunks = default elif len(chunks) == 0: # chunks = {}, i.e. 
chunk=1 for all chunks = {k: 1 for k in self.request} if not isinstance(chunks, collectionsAbc.Mapping): raise ValueError("chunks must be 'auto' or mappable") chunks = {**default, **chunks} self.chunks = collections.OrderedDict(sorted(chunks.items()))
def _split(self, lst, n=1): """Yield successive n-sized chunks from lst""" for i in range(0, len(lst), n): yield lst[i : i + n] def _split_list_bychunknb(self, lst, n=1): """Split list in n-imposed chunks of similar size The last chunk may contain less element than the others, depending on the size of the list. """ res = [] s = int(np.floor_divide(len(lst), n)) for i in self._split(lst, s): res.append(i) if len(res) > n: res[n - 1 : :] = [reduce(lambda i, j: i + j, res[n - 1 : :])] return res def _split_list_bychunksize(self, lst, max_size=1): """Split list in chunks of imposed size The last chunk may contain less element than the others, depending on the size of the list. """ res = [] for i in self._split(lst, max_size): res.append(i) return res def _split_box(self, large_box, n=1, d="x"): # noqa: C901 """Split a box domain in one direction in n-imposed equal chunks""" if d == "x": i_left, i_right = 0, 1 if d == "y": i_left, i_right = 2, 3 if d == "z": i_left, i_right = 4, 5 if d == "t": i_left, i_right = 6, 7 if n == 1: return [large_box] boxes = [] if d in ["x", "y", "z"]: n += 1 # Required because we split in linspace bins = np.linspace(large_box[i_left], large_box[i_right], n) for ii, left in enumerate(bins): if ii < len(bins) - 1: right = bins[ii + 1] this_box = large_box.copy() this_box[i_left] = left this_box[i_right] = right boxes.append(this_box) elif "t" in d: dates = pd.to_datetime(large_box[i_left : i_right + 1]) date_bounds = [ d.strftime("%Y%m%d%H%M%S") for d in pd.date_range(dates[0], dates[1], periods=n + 1) ] for i1, i2 in zip(np.arange(0, n), np.arange(1, n + 1)): left, right = date_bounds[i1], date_bounds[i2] this_box = large_box.copy() this_box[i_left] = left this_box[i_right] = right boxes.append(this_box) return boxes def _split_this_4Dbox(self, box, nx=1, ny=1, nz=1, nt=1): box_list = [] split_x = self._split_box(box, n=nx, d="x") for bx in split_x: split_y = self._split_box(bx, n=ny, d="y") for bxy in split_y: split_z = self._split_box(bxy, 
n=nz, d="z") for bxyz in split_z: split_t = self._split_box(bxyz, n=nt, d="t") for bxyzt in split_t: box_list.append(bxyzt) return box_list def _split_this_3Dbox(self, box, nx=1, ny=1, nz=1): box_list = [] split_x = self._split_box(box, n=nx, d="x") for bx in split_x: split_y = self._split_box(bx, n=ny, d="y") for bxy in split_y: split_z = self._split_box(bxy, n=nz, d="z") for bxyz in split_z: box_list.append(bxyz) return box_list def _chunker_box4d(self, request, chunks, chunks_maxsize): # noqa: C901 BOX = request["box"] n_chunks = chunks for axis, n in n_chunks.items(): if n == "auto": if axis == "lon": Lx = BOX[1] - BOX[0] if Lx > chunks_maxsize["lon"]: # Max box size in longitude n_chunks["lon"] = int( np.ceil(np.divide(Lx, chunks_maxsize["lon"])) ) else: n_chunks["lon"] = 1 if axis == "lat": Ly = BOX[3] - BOX[2] if Ly > chunks_maxsize["lat"]: # Max box size in latitude n_chunks["lat"] = int( np.ceil(np.divide(Ly, chunks_maxsize["lat"])) ) else: n_chunks["lat"] = 1 if axis == "dpt": Lz = BOX[5] - BOX[4] if Lz > chunks_maxsize["dpt"]: # Max box size in depth n_chunks["dpt"] = int( np.ceil(np.divide(Lz, chunks_maxsize["dpt"])) ) else: n_chunks["dpt"] = 1 if axis == "time": Lt = np.timedelta64( pd.to_datetime(BOX[7]) - pd.to_datetime(BOX[6]), "D" ) MaxLen = np.timedelta64(chunks_maxsize["time"], "D") if Lt > MaxLen: # Max box size in time n_chunks["time"] = int(np.ceil(np.divide(Lt, MaxLen))) else: n_chunks["time"] = 1 boxes = self._split_this_4Dbox( BOX, nx=n_chunks["lon"], ny=n_chunks["lat"], nz=n_chunks["dpt"], nt=n_chunks["time"], ) return {"chunks": sorted(n_chunks), "values": boxes} def _chunker_box3d(self, request, chunks, chunks_maxsize): BOX = request["box"] n_chunks = chunks for axis, n in n_chunks.items(): if n == "auto": if axis == "lon": Lx = BOX[1] - BOX[0] if Lx > chunks_maxsize["lon"]: # Max box size in longitude n_chunks["lon"] = int( np.floor_divide(Lx, chunks_maxsize["lon"]) ) else: n_chunks["lon"] = 1 if axis == "lat": Ly = BOX[3] - BOX[2] if 
Ly > chunks_maxsize["lat"]: # Max box size in latitude n_chunks["lat"] = int( np.floor_divide(Ly, chunks_maxsize["lat"]) ) else: n_chunks["lat"] = 1 if axis == "dpt": Lz = BOX[5] - BOX[4] if Lz > chunks_maxsize["dpt"]: # Max box size in depth n_chunks["dpt"] = int( np.floor_divide(Lz, chunks_maxsize["dpt"]) ) else: n_chunks["dpt"] = 1 # if axis == 'time': # Lt = np.timedelta64(pd.to_datetime(BOX[5]) - pd.to_datetime(BOX[4]), 'D') # MaxLen = np.timedelta64(chunks_maxsize['time'], 'D') # if Lt > MaxLen: # Max box size in time # n_chunks['time'] = int(np.floor_divide(Lt, MaxLen)) # else: # n_chunks['time'] = 1 boxes = self._split_this_3Dbox( BOX, nx=n_chunks["lon"], ny=n_chunks["lat"], nz=n_chunks["dpt"] ) return {"chunks": sorted(n_chunks), "values": boxes} def _chunker_wmo(self, request, chunks, chunks_maxsize): WMO = request["wmo"] n_chunks = chunks if n_chunks["wmo"] == "auto": wmo_grps = self._split_list_bychunksize(WMO, max_size=chunks_maxsize["wmo"]) else: n = np.min([n_chunks["wmo"], len(WMO)]) wmo_grps = self._split_list_bychunknb(WMO, n=n) n_chunks["wmo"] = len(wmo_grps) return {"chunks": sorted(n_chunks), "values": wmo_grps} def fit_transform(self): """Chunk a fetcher request Returns ------- list """ self._results = self.this_chunker(self.request, self.chunks, self.chunksize) # self.chunks = self._results['chunks'] return self._results["values"]