Parallel data fetching
Sometimes you may find that your request takes a long time to fetch, or simply does not succeed at all. This is probably because you're trying to fetch a large amount of data.
In this case, you can let argopy chunk your request into smaller pieces and fetch them in parallel for you. This is done with the data fetcher argument, or global option, parallel.
Parallelization can be tuned using the arguments chunks and chunks_maxsize.
By default, this goes like this:
In [1]: from argopy import DataFetcher
# Define a box to load (large enough to trigger chunking):
In [2]: box = [-60, -30, 40.0, 60.0, 0.0, 100.0, "2007-01-01", "2007-04-01"]
# Instantiate a parallel fetcher:
In [3]: f = DataFetcher(parallel=True).region(box)
Note that you can also use the option progress to display a progress bar during fetching.
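For example, a minimal sketch combining both options (using the box defined above):
# Re-instantiate the fetcher, this time with a progress bar displayed while chunks are fetched:
f = DataFetcher(parallel=True, progress=True).region(box)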
Then, simply trigger data fetching as usual:
In [4]: %%time
...: ds = f.to_xarray() # or .load().data
...:
CPU times: user 328 ms, sys: 5.14 ms, total: 333 ms
Wall time: 14.3 s
Parallelization methods
Three methods are available to set up your data fetching requests in parallel:
- multi-threading with a concurrent.futures.ThreadPoolExecutor,
- multi-processing with a concurrent.futures.ProcessPoolExecutor,
- a Dask Cluster identified by its client.
The argopy parallelization method is set with the parallel option (as a global option or a fetcher argument), which can take one of the following values:
- a boolean True or False,
- a string: thread or process,
- or a Dask client object.
When setting the boolean value parallel=True, argopy will rely on the default parallelization method defined by the option parallel_default_method.
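If you prefer, you can change that default once and for all; a sketch, assuming parallel_default_method is exposed through argopy.set_options like the other global options:
import argopy
# With this in place, parallel=True will use multi-processing instead of the 'thread' default:
argopy.set_options(parallel_default_method='process')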
You have several ways to specify which parallelization method you want to use:
using argopy global options:
In [5]: import argopy
In [6]: argopy.set_options(parallel=True) # Note: falls back on parallel_default_method='thread'
Out[6]: <argopy.options.set_options at 0x7b09529f2c50>
in a temporary context:
In [7]: with argopy.set_options(parallel='process'):
...: fetcher = DataFetcher()
...:
with an argument in the data fetcher:
In [8]: fetcher = DataFetcher(parallel='process')
Caution
Parallelizing your fetcher is useful for handling a large region of data, but it can also add significant overhead on reasonably sized requests and lead to degraded performance. We therefore do not recommend using the parallel option systematically.
Benchmarking of the current argopy processing chain has shown that most of the time needed to fetch data is spent waiting for the data server response and merging chunks of data. Chunk merging cannot currently be avoided, and the data server response time is out of argopy's control.
Caution
You may obtain different dataset sizes with and without the parallel option. This may happen if the fetching of one of the chunks fails. By default, data fetching of multiple resources fails with a warning. You can change this behaviour with the errors option of the to_xarray() fetcher method; just set it to raise like this:
DataFetcher(parallel=True).region(BOX).to_xarray(errors='raise')
You can also use silent to simply hide all messages during fetching.
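For instance, to mute these messages (a sketch; BOX stands for your own region definition):
# Hide all messages about failing chunks during fetching:
DataFetcher(parallel=True).region(BOX).to_xarray(errors='silent')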
Number of chunks: chunks
To see how many chunks your request has been split into, you can look at the uri property of the fetcher, which gives you the list of paths toward data:
# Create a large box:
In [9]: box = [-60, 0, 0.0, 60.0, 0.0, 500.0, "2007", "2010"]
# Init a parallel fetcher:
In [10]: fetcher = DataFetcher(parallel=True).region(box)
In [11]: print(len(fetcher.uri))
117
To control chunking, you can use the chunks option, which specifies the number of chunks in each direction:
- lon, lat, dpt and time for a region fetching,
- wmo for a float and profile fetching.
Example:
In [12]: fetcher = DataFetcher(parallel=True, chunks={'lon': 5}).region(box)
In [13]: len(fetcher.uri) # Check the number of chunks
Out[13]: 195
This creates 195 chunks, with 5 along the longitudinal direction, as requested.
When the chunks option is not specified for a given direction, argopy relies on auto-chunking using pre-defined maximum chunk sizes (see below). In the case above, auto-chunking also happens along latitude, depth and time: with the default maximum sizes this gives 5 (lon) × 3 (lat) × 1 (dpt) × 13 (time) = 195 chunks, not only 5.
To chunk the request along a single direction, explicitly set all the other directions to 1:
# Init a parallel fetcher:
In [14]: fetcher = DataFetcher(parallel=True,
....: chunks={'lon': 5, 'lat':1, 'dpt':1, 'time':1}).region(box)
....:
# Check the number of chunks:
In [15]: len(fetcher.uri)
Out[15]: 5
We now have 5 chunks along longitude; check out the URL parameters in the list of URIs:
In [16]: for uri in fetcher.uri:
....: print("&".join(uri.split("&")[1:-2])) # Display only the relevant URL part
....:
longitude>=-60.0&longitude<=-48.0&latitude>=0.0&latitude<=60.0&pres>=0.0&pres<=500.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
longitude>=-48.0&longitude<=-36.0&latitude>=0.0&latitude<=60.0&pres>=0.0&pres<=500.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
longitude>=-36.0&longitude<=-24.0&latitude>=0.0&latitude<=60.0&pres>=0.0&pres<=500.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
longitude>=-24.0&longitude<=-12.0&latitude>=0.0&latitude<=60.0&pres>=0.0&pres<=500.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
longitude>=-12.0&longitude<=0.0&latitude>=0.0&latitude<=60.0&pres>=0.0&pres<=500.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
Note
You may notice that if you run the last command with the argovis fetcher, you will still get more than 5 chunks (namely 65). This is because argovis is limited to requests of no more than 3 months. So, for this 3-year-long request, argopy ends up with 13 chunks along time, times 5 chunks in longitude, i.e. 65 chunks in total.
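To check this yourself, here is a sketch of the same single-direction chunking with the argovis data source (the expected chunk count follows from the 3-month limit mentioned above):
# Init a parallel fetcher on the argovis source:
fetcher = DataFetcher(src='argovis', parallel=True,
                      chunks={'lon': 5, 'lat': 1, 'dpt': 1, 'time': 1}).region(box)
len(fetcher.uri)  # 13 time chunks x 5 longitude chunks = 65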
Warning
The gdac fetcher and the float and profile access points of the argovis fetcher use a list of resources that are not chunked but fetched in parallel using a batch queue.
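As an illustration, a sketch with the gdac source and the float access point (WMO numbers borrowed from the example further below):
# The uri property lists the resources handled by the batch queue; no chunking is involved:
fetcher = DataFetcher(src='gdac', parallel=True).float([6902766, 6902772, 6902914])
fetcher.uri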
Size of chunks: chunks_maxsize
The default maximum chunk sizes for each access point dimension are:

Access point dimension | Maximum chunk size
---|---
region / lon | 20 deg
region / lat | 20 deg
region / dpt | 500 m or db
region / time | 90 days
float / wmo | 5
profile / wmo | 5
These default values are used to chunk data when the chunks parameter key is set to auto.
But you can modify the maximum chunk size allowed in each of the possible directions. This is done with the option chunks_maxsize.
For instance, if you want to make sure that your chunks are not larger than 100 meters (db) in depth (pressure), you can use:
# Create a large box:
In [17]: box = [-60, -10, 40.0, 60.0, 0.0, 500.0, "2007", "2010"]
# Init a parallel fetcher:
In [18]: fetcher = DataFetcher(parallel=True,
....: chunks_maxsize={'dpt': 100}).region(box)
....:
# Check number of chunks:
In [19]: len(fetcher.uri)
Out[19]: 195
Since this creates a large number of chunks, let's do this again and combine it with the chunks option to see more easily what's going on:
# Init a parallel fetcher with chunking along the vertical axis alone:
In [20]: fetcher = DataFetcher(parallel=True,
....: chunks_maxsize={'dpt': 100},
....: chunks={'lon':1, 'lat':1, 'dpt':'auto', 'time':1}).region(box)
....:
In [21]: for uri in fetcher.uri:
....: print("http: ... ", "&".join(uri.split("&")[1:-2])) # Display only the relevant URL part
....:
http: ... longitude>=-60&longitude<=-10&latitude>=40.0&latitude<=60.0&pres>=0.0&pres<=100.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
http: ... longitude>=-60&longitude<=-10&latitude>=40.0&latitude<=60.0&pres>=100.0&pres<=200.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
http: ... longitude>=-60&longitude<=-10&latitude>=40.0&latitude<=60.0&pres>=200.0&pres<=300.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
http: ... longitude>=-60&longitude<=-10&latitude>=40.0&latitude<=60.0&pres>=300.0&pres<=400.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
http: ... longitude>=-60&longitude<=-10&latitude>=40.0&latitude<=60.0&pres>=400.0&pres<=500.0&time>=1167609600.0&time<=1262304000.0&latitude!=NaN&longitude!=NaN
You can see that the pres constraints in this list of erddap URLs define layers no thicker than the requested 100 db.
With the profile and float access points, you can use the wmo keyword to control the number of WMOs in each chunk.
In [22]: WMO_list = [6902766, 6902772, 6902914, 6902746, 6902916, 6902915, 6902757, 6902771]
# Init a parallel fetcher with chunking along the list of WMOs:
In [23]: fetcher = DataFetcher(parallel=True,
....: chunks_maxsize={'wmo': 3}).float(WMO_list)
....:
In [24]: for uri in fetcher.uri:
....: print("http: ... ", "&".join(uri.split("&")[1:-2])) # Display only the relevant URL part
....:
http: ... platform_number=~"6902766|6902772|6902914"&latitude!=NaN&longitude!=NaN
http: ... platform_number=~"6902746|6902916|6902915"&latitude!=NaN&longitude!=NaN
http: ... platform_number=~"6902757|6902771"&latitude!=NaN&longitude!=NaN
You see here that this request for 8 floats is split into chunks of no more than 3 floats each.
Warning
At this point, there is no mechanism to chunk requests along cycle numbers for the profile access point. See #362.
Working with a Dask Cluster
The parallel option/argument can directly take a Dask Cluster client object.
This can go like this:
In [25]: from dask.distributed import Client
In [26]: client = Client(processes=True)
In [27]: print(client)
<Client: 'tcp://127.0.0.1:34041' processes=2 threads=2, memory=6.64 GiB>
In [28]: %%time
....: with argopy.set_options(parallel=client):
....: f = DataFetcher(src='argovis').region([-75, -70, 25, 40, 0, 1000, '2020-01-01', '2021-01-01'])
....: print("%i chunks to process" % len(f.uri))
....: print("\n", f)
....:
14 chunks to process
<datafetcher.argovis>
Name: Argovis Argo data fetcher for a space/time region
Domain: [x=-75.00/-70.00; y=25.00/40.00; z=0.0/1000.0; t=2020-01-01/2021-01-01]
API: https://argovis-api.colorado.edu
API KEY: 'guest' (get a free key at https://argovis-keygen.colorado.edu)
User mode: standard
Dataset: phy
Performances: cache=False, parallel=True [<Client: 'tcp://127.0.0.1:34041' processes=2 threads=2, memory=6.64 GiB>]
CPU times: user 14.2 ms, sys: 14 us, total: 14.2 ms
Wall time: 14.1 ms
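From there, data fetching works as usual; a minimal sketch to trigger the load through the cluster and release its resources afterwards:
# Fetch the data through the Dask cluster, then shut the client down:
ds = f.to_xarray()
client.close()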