import numpy as np
import pandas as pd
from functools import reduce
from ..errors import InvalidFetcherAccessPoint
from .checkers import is_box
import collections
try:
collectionsAbc = collections.abc
except AttributeError:
collectionsAbc = collections
[docs]
class Chunker:
"""To chunk fetcher requests"""
# Default maximum chunks size for all possible request parameters
default_chunksize = {
"box": {
"lon": 20, # degree
"lat": 20, # degree
"dpt": 500, # meters/db
"time": 3 * 30,
}, # Days
"wmo": {"wmo": 5, "cyc": 100}, # Nb of floats
} # Nb of cycles
[docs]
def __init__(self, request: dict, chunks: str = "auto", chunksize: dict = {}):
"""Create a request Chunker
Allow to easily split an access point request into chunks
Parameters
----------
request: dict
Access point request to be chunked. One of the following:
- {'box': [lon_min, lon_max, lat_min, lat_max, dpt_min, dpt_max, time_min, time_max]}
- {'box': [lon_min, lon_max, lat_min, lat_max, dpt_min, dpt_max]}
- {'wmo': [wmo1, wmo2, ...], 'cyc': [0,1, ...]}
chunks: 'auto' or dict
Dictionary with request access point as keys and number of chunks to create as values.
Eg: {'wmo':10} will create a maximum of 10 chunks along WMOs.
chunksize: dict, optional
Dictionary with request access point as keys and chunk size as values (used as maximum values in
'auto' chunking).
Eg: {'wmo': 5} will create chunks with as many as 5 WMOs each.
"""
self.request = request
if "box" in self.request:
is_box(self.request["box"])
if len(self.request["box"]) == 8:
self.this_chunker = self._chunker_box4d
elif len(self.request["box"]) == 6:
self.this_chunker = self._chunker_box3d
elif "wmo" in self.request:
self.this_chunker = self._chunker_wmo
else:
raise InvalidFetcherAccessPoint(
"'%s' not valid access point" % ",".join(self.request.keys())
)
default = self.default_chunksize[[k for k in self.request.keys()][0]]
if len(chunksize) == 0: # chunksize = {}
chunksize = default
if not isinstance(chunksize, collectionsAbc.Mapping):
raise ValueError("chunksize must be mappable")
else: # merge with default:
chunksize = {**default, **chunksize}
self.chunksize = collections.OrderedDict(sorted(chunksize.items()))
default = {k: "auto" for k in self.chunksize.keys()}
if chunks == "auto": # auto for all
chunks = default
elif len(chunks) == 0: # chunks = {}, i.e. chunk=1 for all
chunks = {k: 1 for k in self.request}
if not isinstance(chunks, collectionsAbc.Mapping):
raise ValueError("chunks must be 'auto' or mappable")
chunks = {**default, **chunks}
self.chunks = collections.OrderedDict(sorted(chunks.items()))
def _split(self, lst, n=1):
"""Yield successive n-sized chunks from lst"""
for i in range(0, len(lst), n):
yield lst[i : i + n]
def _split_list_bychunknb(self, lst, n=1):
"""Split list in n-imposed chunks of similar size
The last chunk may contain less element than the others, depending on the size of the list.
"""
res = []
s = int(np.floor_divide(len(lst), n))
for i in self._split(lst, s):
res.append(i)
if len(res) > n:
res[n - 1 : :] = [reduce(lambda i, j: i + j, res[n - 1 : :])]
return res
def _split_list_bychunksize(self, lst, max_size=1):
"""Split list in chunks of imposed size
The last chunk may contain less element than the others, depending on the size of the list.
"""
res = []
for i in self._split(lst, max_size):
res.append(i)
return res
def _split_box(self, large_box, n=1, d="x"): # noqa: C901
"""Split a box domain in one direction in n-imposed equal chunks"""
if d == "x":
i_left, i_right = 0, 1
if d == "y":
i_left, i_right = 2, 3
if d == "z":
i_left, i_right = 4, 5
if d == "t":
i_left, i_right = 6, 7
if n == 1:
return [large_box]
boxes = []
if d in ["x", "y", "z"]:
n += 1 # Required because we split in linspace
bins = np.linspace(large_box[i_left], large_box[i_right], n)
for ii, left in enumerate(bins):
if ii < len(bins) - 1:
right = bins[ii + 1]
this_box = large_box.copy()
this_box[i_left] = left
this_box[i_right] = right
boxes.append(this_box)
elif "t" in d:
dates = pd.to_datetime(large_box[i_left : i_right + 1])
date_bounds = [
d.strftime("%Y%m%d%H%M%S")
for d in pd.date_range(dates[0], dates[1], periods=n + 1)
]
for i1, i2 in zip(np.arange(0, n), np.arange(1, n + 1)):
left, right = date_bounds[i1], date_bounds[i2]
this_box = large_box.copy()
this_box[i_left] = left
this_box[i_right] = right
boxes.append(this_box)
return boxes
def _split_this_4Dbox(self, box, nx=1, ny=1, nz=1, nt=1):
box_list = []
split_x = self._split_box(box, n=nx, d="x")
for bx in split_x:
split_y = self._split_box(bx, n=ny, d="y")
for bxy in split_y:
split_z = self._split_box(bxy, n=nz, d="z")
for bxyz in split_z:
split_t = self._split_box(bxyz, n=nt, d="t")
for bxyzt in split_t:
box_list.append(bxyzt)
return box_list
def _split_this_3Dbox(self, box, nx=1, ny=1, nz=1):
box_list = []
split_x = self._split_box(box, n=nx, d="x")
for bx in split_x:
split_y = self._split_box(bx, n=ny, d="y")
for bxy in split_y:
split_z = self._split_box(bxy, n=nz, d="z")
for bxyz in split_z:
box_list.append(bxyz)
return box_list
def _chunker_box4d(self, request, chunks, chunks_maxsize): # noqa: C901
BOX = request["box"]
n_chunks = chunks
for axis, n in n_chunks.items():
if n == "auto":
if axis == "lon":
Lx = BOX[1] - BOX[0]
if Lx > chunks_maxsize["lon"]: # Max box size in longitude
n_chunks["lon"] = int(
np.ceil(np.divide(Lx, chunks_maxsize["lon"]))
)
else:
n_chunks["lon"] = 1
if axis == "lat":
Ly = BOX[3] - BOX[2]
if Ly > chunks_maxsize["lat"]: # Max box size in latitude
n_chunks["lat"] = int(
np.ceil(np.divide(Ly, chunks_maxsize["lat"]))
)
else:
n_chunks["lat"] = 1
if axis == "dpt":
Lz = BOX[5] - BOX[4]
if Lz > chunks_maxsize["dpt"]: # Max box size in depth
n_chunks["dpt"] = int(
np.ceil(np.divide(Lz, chunks_maxsize["dpt"]))
)
else:
n_chunks["dpt"] = 1
if axis == "time":
Lt = np.timedelta64(
pd.to_datetime(BOX[7]) - pd.to_datetime(BOX[6]), "D"
)
MaxLen = np.timedelta64(chunks_maxsize["time"], "D")
if Lt > MaxLen: # Max box size in time
n_chunks["time"] = int(np.ceil(np.divide(Lt, MaxLen)))
else:
n_chunks["time"] = 1
boxes = self._split_this_4Dbox(
BOX,
nx=n_chunks["lon"],
ny=n_chunks["lat"],
nz=n_chunks["dpt"],
nt=n_chunks["time"],
)
return {"chunks": sorted(n_chunks), "values": boxes}
def _chunker_box3d(self, request, chunks, chunks_maxsize):
BOX = request["box"]
n_chunks = chunks
for axis, n in n_chunks.items():
if n == "auto":
if axis == "lon":
Lx = BOX[1] - BOX[0]
if Lx > chunks_maxsize["lon"]: # Max box size in longitude
n_chunks["lon"] = int(
np.floor_divide(Lx, chunks_maxsize["lon"])
)
else:
n_chunks["lon"] = 1
if axis == "lat":
Ly = BOX[3] - BOX[2]
if Ly > chunks_maxsize["lat"]: # Max box size in latitude
n_chunks["lat"] = int(
np.floor_divide(Ly, chunks_maxsize["lat"])
)
else:
n_chunks["lat"] = 1
if axis == "dpt":
Lz = BOX[5] - BOX[4]
if Lz > chunks_maxsize["dpt"]: # Max box size in depth
n_chunks["dpt"] = int(
np.floor_divide(Lz, chunks_maxsize["dpt"])
)
else:
n_chunks["dpt"] = 1
# if axis == 'time':
# Lt = np.timedelta64(pd.to_datetime(BOX[5]) - pd.to_datetime(BOX[4]), 'D')
# MaxLen = np.timedelta64(chunks_maxsize['time'], 'D')
# if Lt > MaxLen: # Max box size in time
# n_chunks['time'] = int(np.floor_divide(Lt, MaxLen))
# else:
# n_chunks['time'] = 1
boxes = self._split_this_3Dbox(
BOX, nx=n_chunks["lon"], ny=n_chunks["lat"], nz=n_chunks["dpt"]
)
return {"chunks": sorted(n_chunks), "values": boxes}
def _chunker_wmo(self, request, chunks, chunks_maxsize):
WMO = request["wmo"]
n_chunks = chunks
if n_chunks["wmo"] == "auto":
wmo_grps = self._split_list_bychunksize(WMO, max_size=chunks_maxsize["wmo"])
else:
n = np.min([n_chunks["wmo"], len(WMO)])
wmo_grps = self._split_list_bychunknb(WMO, n=n)
n_chunks["wmo"] = len(wmo_grps)
return {"chunks": sorted(n_chunks), "values": wmo_grps}
def fit_transform(self):
"""Chunk a fetcher request
Returns
-------
list
"""
self._results = self.this_chunker(self.request, self.chunks, self.chunksize)
# self.chunks = self._results['chunks']
return self._results["values"]