Spatial partitioning in Dask-GeoPandas
Dask-GeoPandas implements a spatial_shuffle method to repartition Dask GeoDataFrames geographically.
For those who are not familiar with Dask, a Dask DataFrame is internally split into many partitions, where each partition is a pandas DataFrame. Partitions are split vertically, along the index.
There are numerous strategies that can be used to partition Dask DataFrames, which determine how the elements of a DataFrame are separated into each resulting partition. Common strategies to partition Dask DataFrames include using a fixed number of partitions or partitions based on column values. Dask-GeoPandas can partition GeoDataFrames by taking advantage of their spatial structure.
In the rest of this notebook, we use an example dataset to describe how Dask-GeoPandas takes advantage of this spatial structure and some best practices.
import geopandas
import dask_geopandas
import matplotlib.pyplot as plt
We first read USA boundaries from gadm.org:
path = "https://geodata.ucdavis.edu/gadm/gadm4.0/gpkg/gadm40_USA.gpkg"
usa = geopandas.read_file(path, layer="ADM_1")
usa.head()
# Filter and rename cols
usa = usa[["NAME_1", "geometry"]].rename(columns={"NAME_1": "State"})
usa.info()
<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 51 entries, 0 to 50
Data columns (total 2 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 State 51 non-null object
1 geometry 51 non-null geometry
dtypes: geometry(1), object(1)
memory usage: 944.0+ bytes
usa.plot()
<AxesSubplot:>
We can quickly filter for the contiguous United States, the 48 adjoining states plus the District of Columbia (DC), using the cx indexer. This filters geometries using a bounding box.
us_cont = usa.cx[-150:-50, 20:50]
us_cont.info()
<class 'geopandas.geodataframe.GeoDataFrame'>
Int64Index: 49 entries, 0 to 50
Data columns (total 2 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 State 49 non-null object
1 geometry 49 non-null geometry
dtypes: geometry(1), object(1)
memory usage: 1.1+ KB
us_cont.plot(facecolor="none", linewidth=0.5, edgecolor="red")
<AxesSubplot:>
We can then transform the geopandas.GeoDataFrame into a dask_geopandas.GeoDataFrame with a fixed number of arbitrary partitions. This does not yet spatially partition the Dask GeoDataFrame.
Note
We can also read (and export) parquet files in Dask-GeoPandas containing spatially partitioned data.
d_gdf = dask_geopandas.from_geopandas(us_cont, npartitions=4)
d_gdf
              State  geometry
npartitions=4
0            object  geometry
15              ...       ...
28              ...       ...
41              ...       ...
50              ...       ...
By visualising the convex hull of each partition, we can get a feel for how the Dask GeoDataFrame has been partitioned using a fixed number of partitions. A useful spatial partitioning scheme is one that minimises the degree of spatial overlap between partitions. By default, using a fixed number of partitions does a poor job of spatially partitioning our example data; there is a high degree of overlap between partitions.
d_gdf.calculate_spatial_partitions() # convex hull
d_gdf.spatial_partitions
0 POLYGON ((-81.96430 24.52042, -82.87569 24.610...
1 POLYGON ((-89.42000 28.92833, -90.92028 29.048...
2 POLYGON ((-109.04997 31.33190, -114.63287 35.0...
3 POLYGON ((-97.40186 25.83808, -97.40523 25.838...
dtype: geometry
fig, ax = plt.subplots(1,1, figsize=(12,6))
us_cont.plot(ax=ax)
d_gdf.spatial_partitions.plot(ax=ax, cmap="tab20", alpha=0.5)
ax.set_axis_off()
plt.show()
Spatially partitioning a Dask GeoDataFrame is done using spatial_shuffle, which is based on Dask's set_index and spatial sorting.
Spatial sorting methods
spatial_shuffle supports three different partitioning methods, which provide a simple and relatively inexpensive way of representing two-dimensional objects in one-dimensional space. These are selected using the by parameter:
Hilbert distance (default)
Morton distance
Geohash
The first two methods are space-filling curves: lines that pass through every point in space in a particular order, visiting each point exactly once, so that each point lies a unique distance along the curve. The range of both curves covers the entire two-dimensional unit square, which means that the Hilbert and Morton curves can handle projected coordinates. In general, we recommend the default Hilbert distance because it has better order-preserving behaviour. The Hilbert curve has a remarkable mathematical property: the distance between two consecutive points along the curve is always equal to one. By contrast, the Morton distance or “Z-order curve” does not hold this property, and there are larger “jumps” in distance between some consecutive points. Currently, distances along the curves are calculated from the mid points of geometries. Whilst this is inexpensive, mid points do not represent complex geometries well; future work will examine methods for better representing complex geometries.
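To make the space-filling-curve idea concrete, here is a minimal pure-Python sketch of Morton (Z-order) encoding via bit interleaving. This illustrates the general technique on integer grid coordinates; it is not Dask-GeoPandas' internal implementation.

```python
def morton_code(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of two grid coordinates into a Z-order (Morton) code."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)      # x bits go to even positions
        code |= ((y >> i) & 1) << (2 * i + 1)  # y bits go to odd positions
    return code

# Sorting grid cells by their Morton code traces the Z-shaped curve:
# (0, 0) -> 0, (1, 0) -> 1, (0, 1) -> 2, (1, 1) -> 3.
# Note the "jump": codes 1 and 2 are consecutive along the curve, but the
# cells (1, 0) and (0, 1) are not adjacent in the grid.
order = sorted([(1, 1), (0, 1), (1, 0), (0, 0)], key=lambda p: morton_code(*p))
```

Geometries that are close in this one-dimensional ordering tend to be close in space, which is what makes sorting by such a code a cheap way to group nearby rows into the same partition.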
The Geohash, not to be confused with geohashing, is a hierarchical spatial data structure that subdivides space into grid-shaped buckets. Whilst the Geohash does allow binary searches or spatial indexing, like an rtree or quadtree, it is limited to encoding latitude and longitude coordinates. A Geohash can be represented either as a text string or as an integer; the longer the string/integer, the more precise the grid cell.
Below we compare the different sorting methods using the default parameters. All three sorting methods reduce the degree of spatial overlap, but it remains high and could be further reduced by controlling specific parameters.
hilbert = d_gdf.spatial_shuffle(by="hilbert")
morton = d_gdf.spatial_shuffle(by="morton")
geohash = d_gdf.spatial_shuffle(by="geohash")
fig, axes = plt.subplots(nrows=1,ncols=3, figsize=(25,12))
ax1, ax2, ax3 = axes.flatten()
for ax in axes:
us_cont.plot(ax=ax)
hilbert.spatial_partitions.plot(ax=ax1, cmap="tab20", alpha=0.5)
morton.spatial_partitions.plot(ax=ax2, cmap="tab20", alpha=0.5)
geohash.spatial_partitions.plot(ax=ax3, cmap="tab20", alpha=0.5)
[axi.set_axis_off() for axi in axes.ravel()]
ax1.set_title("Hilbert", size=16)
ax2.set_title("Morton", size=16)
ax3.set_title("Geohash", size=16)
plt.show()
Number of partitions
The first parameter is npartitions, the number of partitions. By default, this is the same as the number of partitions of the original Dask GeoDataFrame. Increasing the number of partitions reduces the degree of spatial overlap between partitions. Below is a comparison of three approaches:
The original Dask GeoDataFrame with no spatial partitioning
The same Dask GeoDataFrame spatially shuffled with the default number of partitions as the original Dask GeoDataFrame
The same Dask GeoDataFrame using 20 partitions.
It’s quite clear that using 20 partitions does reduce the overall degree of spatial overlap between partitions. The appropriate number of partitions depends on your data structure and how you plan to use your data.
hilbert20 = d_gdf.spatial_shuffle(by="hilbert", npartitions=20)
fig, axes = plt.subplots(nrows=1,ncols=3, figsize=(25,12))
ax1, ax2, ax3 = axes.flatten()
for ax in axes:
us_cont.plot(ax=ax)
d_gdf.spatial_partitions.plot(ax=ax1, cmap="tab20", alpha=0.5)
hilbert.spatial_partitions.plot(ax=ax2, cmap="tab20", alpha=0.5)
hilbert20.spatial_partitions.plot(ax=ax3, cmap="tab20", alpha=0.5)
[axi.set_axis_off() for axi in axes.ravel()]
ax1.set_title("No spatial shuffle, with 4 partitions", size=16)
ax2.set_title("Spatial shuffle using default npartitions", size=16)
ax3.set_title("Spatial shuffle using 20 partitions", size=16)
plt.show()
Level of precision
The level parameter represents the precision of each partitioning method. It defaults to 16 for both the Hilbert and Morton distances. The Geohash method does not use this parameter and defaults to its integer representation. We have found no significant penalty in selecting the maximum precision.
Summary
Dask offers some guidance on partitioning pandas DataFrames, which also applies to Dask-GeoPandas GeoDataFrames:
Try GeoPandas first - GeoPandas can often be faster and easier to use than Dask-GeoPandas for small data problems.
If you are running into memory and/or performance issues with GeoPandas, choose a number of partitions such that each partition of the Dask GeoDataFrame fits comfortably into memory, ideally smaller than a gigabyte. For example, if you are handling a 10 GB dataset, first try using 10 partitions.
One common use case of Dask-GeoPandas is to read in large amounts of data, reduce it down and then iterate on a much smaller amount of data. This may only be for a single component of your workflow, so it might make sense to revert back to GeoPandas once this component is complete.