Ravi Shekhar's Technical Blog

A Technical Blog of the Data Science Process

Geospatial Operations at Scale with Dask and Geopandas


Note: This post has interactive Bokeh graphics which may not render well on mobile devices. Try viewing the Jupyter notebook which underlies this post on NBViewer.

Part 1 : A Gentle Introduction to the Spatial Join

One problem I came across when analyzing the New York City Taxi Dataset is that from 2009 to June 2016, both the starting and stopping locations of taxi trips were given as longitude and latitude points. From July 2016 onward, to provide a degree of anonymity when releasing data to the public, the Taxi and Limousine Commission (TLC) provides only the starting and ending "taxi zones" of a trip, together with a shapefile that specifies the zone boundaries, available here. Let's load this up in GeoPandas, and set the coordinate system to 'epsg:4326', which corresponds to longitude and latitude coordinates.

In [1]:
Out[1]:
LocationID borough geometry zone
0 1 EWR POLYGON ((-74.18445299999996 40.6949959999999,... Newark Airport
1 2 Queens (POLYGON ((-73.82337597260663 40.6389870471767... Jamaica Bay
2 3 Bronx POLYGON ((-73.84792614099985 40.87134223399993... Allerton/Pelham Gardens
3 4 Manhattan POLYGON ((-73.97177410965318 40.72582128133706... Alphabet City
4 5 Staten Island POLYGON ((-74.17421738099989 40.5625680859999,... Arden Heights
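
The code for this cell is collapsed in the rendered post; a minimal sketch of what it amounts to is below (the shapefile path matches the one used in the join function later in this post, and the exact column selection is an assumption based on the output above).

import geopandas as gpd

# read the TLC taxi zone shapefile and convert it to longitude/latitude
df = gpd.read_file('../shapefiles/taxi_zones.shp')
df = df.to_crs({'init': 'epsg:4326'})

# keep only the columns shown in the output above
df = df[['LocationID', 'borough', 'geometry', 'zone']]
df.head()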

We see that the geometry column consists of polygons (from Shapely) whose vertices are defined by longitude and latitude points. Let's plot them with Bokeh, in order of ascending LocationID.

In [2]:

This is a familiar map of New York, with the 262 taxi zones shown, colored by the ID of the zone. I have added a random point (-73.966˚E, 40.78˚N) in magenta, which happens to fall in the middle of Central Park. Assigning a point to a taxi zone is something humans can do easily, but on a computer it requires solving the point-in-polygon problem. Luckily, the Shapely library provides an easy interface to such geometric operations in Python. But point in polygon is computationally expensive, and using Shapely on the 2.4 billion (longitude, latitude) pairs in the NYC Taxi Dataset to assign taxi zones would take a modern single-core CPU about four years. To speed this up, we calculate the bounding box of each taxi zone, which looks like:

In [3]:
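The collapsed cell above plots the zone bounding boxes (the Bokeh figure is not reproduced here). For a concrete sense of the two operations involved, here is a small sketch, assuming the zones GeoDataFrame is named df as above: an exact point-in-polygon test with Shapely, and the bounding box of every zone via GeoPandas.

from shapely.geometry import Point

point = Point(-73.966, 40.78)

# exact point-in-polygon test against the Central Park zone polygon
central_park = df[df.zone == 'Central Park'].geometry.iloc[0]
print(central_park.contains(point))   # True

# bounding box (minx, miny, maxx, maxy) of every taxi zone
print(df.geometry.bounds.head())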

Now, given a (longitude, latitude) coordinate pair, the bounding boxes that contain that pair can be found efficiently with an R-tree. You can find an excellent introduction to R-trees here. Only the polygons (taxi zones) whose bounding boxes contain the coordinate pair need to be examined, and point in polygon is then solved only for those (hopefully few) taxi zones. This reduces computation by a factor of roughly 100-1000. This process of assigning coordinate pairs to taxi zones is one example of a spatial join. GeoPandas provides a nice interface to efficient spatial joins in Python, and it takes care of calculating the bounding boxes and R-tree for you, as this snippet shows.

In [4]:
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'}, geometry=[Point(-73.966, 40.78)]), 
          df,
          how='left', op='within')
Out[4]:
geometry index_right LocationID borough zone
0 POINT (-73.96599999999999 40.78) 42 43 Manhattan Central Park

This does the join for the single point (drawn in magenta) on the maps above, and correctly identifies it as lying in Central Park.
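
Under the hood, GeoPandas builds an R-tree over the zone bounding boxes and runs the exact point-in-polygon test only on the candidates the tree returns. A hand-rolled sketch of that idea using the rtree package directly (an illustration, not GeoPandas' actual implementation) looks like this.

from rtree import index
from shapely.geometry import Point

# build an R-tree over the bounding boxes of the taxi zone polygons
idx = index.Index()
for i, geom in enumerate(df.geometry):
    idx.insert(i, geom.bounds)

# query: only zones whose bounding box contains the point are candidates
point = Point(-73.966, 40.78)
candidates = list(idx.intersection((point.x, point.y, point.x, point.y)))

# exact point-in-polygon test on the (hopefully few) candidates
matches = [i for i in candidates if df.geometry.iloc[i].contains(point)]
print(df.iloc[matches][['LocationID', 'zone']])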

Part 2 : Spatial Joins at scale using Dask

In my NYC transit project, I download and process the taxi dataset. Here I load a single file from the dataset (May 2016) into Dask and show the first few rows of a few columns. The file is a bit large at 1.8 GB, and Dask chooses to divide the DataFrame into 30 partitions for efficient computation. Each partition is a Pandas DataFrame, and Dask takes care of all the logic needed to view the combination as a single DataFrame. Here are a few columns.
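
The loading cell below is collapsed; it amounts to something like the following sketch (the file name, blocksize, and date parsing are assumptions).

import dask.dataframe as dd

trips = dd.read_csv('yellow_tripdata_2016-05.csv',
                    parse_dates=['tpep_pickup_datetime',
                                 'tpep_dropoff_datetime'],
                    blocksize=64 * 1024 * 1024)  # ~64 MB chunks
print("There are {} partitions.".format(trips.npartitions))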

In [20]:
There are 30 partitions. 
Out[20]:
tpep_pickup_datetime pickup_longitude pickup_latitude
0 2016-05-01 00:00:00 -73.985901 40.768040
1 2016-05-01 00:00:00 -73.991577 40.744751
2 2016-05-01 00:00:00 -73.993073 40.741573
3 2016-05-01 00:00:00 -73.991943 40.684601
4 2016-05-01 00:00:00 -74.005280 40.740192
In [21]:
Out[21]:
tpep_dropoff_datetime dropoff_longitude dropoff_latitude
0 2016-05-01 00:17:31 -73.983986 40.730099
1 2016-05-01 00:07:31 -73.975700 40.765469
2 2016-05-01 00:07:01 -73.980995 40.744633
3 2016-05-01 00:19:47 -74.002258 40.733002
4 2016-05-01 00:06:39 -73.997498 40.737564

So each trip has pickup and dropoff (longitude, latitude) coordinate pairs. To give you a feel for the data, I plot the start and end locations of the first trip, which ends in the East Village. Driving directions would add a great deal of complexity, so here I just draw an arrow as the crow flies. A spatial join identifies the pickup and dropoff taxi zones as Clinton East and East Village.
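
The plotting cell is collapsed; for reference, x, y and x2, y2 in the snippet that follows are just the first trip's pickup and dropoff coordinates (the values visible in the tables above), for example:

first_trip = trips.head(1)  # head() on a Dask DataFrame returns a Pandas DataFrame
x, y = first_trip.pickup_longitude[0], first_trip.pickup_latitude[0]
x2, y2 = first_trip.dropoff_longitude[0], first_trip.dropoff_latitude[0]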

In [22]:
In [23]:
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'},
                           geometry=[Point(x, y), Point(x2, y2)]),
          df, how='left', op='within')
Out[23]:
geometry index_right LocationID borough zone borough_categ
0 POINT (-73.98590087890625 40.76803970336913) 47 48 Manhattan Clinton East 3
1 POINT (-73.98398590087891 40.73009872436523) 78 79 Manhattan East Village 3

So, Dask DataFrames are just collections of Pandas DataFrames, and I know how to perform a spatial join on a Pandas DataFrame. Let's take advantage of Dask's map_partitions function to do the spatial join with the taxi zones on every partition. Here is the function that performs the spatial join, given a Pandas DataFrame and the names of the longitude, latitude, and taxi zone ID columns.

In [24]:
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """Joins DataFrame with Taxi Zones shapefile.

    This function takes longitude values provided by `lon_var`, and latitude
    values provided by `lat_var` in DataFrame `df`, and performs a spatial
    join with the NYC taxi_zones shapefile.

    The shapefile is hard-coded in, as this function makes a hard assumption
    of latitude and longitude coordinates. It also assumes that
    (latitude=0, longitude=0) cannot be a valid data point in your dataset,
    which is reasonable for a dataset of New York, but bad for a global
    dataset. Points that do not fall inside any taxi zone (including the
    filled-in (0, 0) placeholders) come back as np.nan.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        DataFrame containing latitude and longitude columns.
    lon_var : string
        Name of column in `df` containing longitude values. Invalid values
        should be np.nan.
    lat_var : string
        Name of column in `df` containing latitude values. Invalid values
        should be np.nan.
    locid_var : string
        Name of the returned series of taxi zone IDs.
    """

    import numpy as np
    import pandas as pd
    import geopandas
    from shapely.geometry import Point

    # make a copy since we will modify lats and lons
    localdf = df[[lon_var, lat_var]].copy()

    # missing lat/lon info is indicated by nan. Fill with zero,
    # which is outside the New York shapefile.
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)

    # load the taxi zones, keep only LocationID and geometry,
    # and convert to longitude/latitude coordinates
    shape_df = geopandas.read_file('../shapefiles/taxi_zones.shp')
    shape_df.drop(['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"],
                  axis=1, inplace=True)
    shape_df = shape_df.to_crs({'init': 'epsg:4326'})

    try:
        # build point geometries from the coordinate columns
        local_gdf = geopandas.GeoDataFrame(
            localdf, crs={'init': 'epsg:4326'},
            geometry=[Point(xy) for xy in
                      zip(localdf[lon_var], localdf[lat_var])])

        # spatial join: assign each point the LocationID of the
        # taxi zone polygon that contains it
        local_gdf = geopandas.sjoin(
            local_gdf, shape_df, how='left', op='within')

        return local_gdf.LocationID.rename(locid_var)
    except ValueError as ve:
        # if the join fails, log the error and return all-NaN IDs so the
        # partition still gets a column of the expected shape
        import traceback
        print(ve)
        traceback.print_exc()
        return pd.Series(np.nan, index=localdf.index, name=locid_var)

At Scale

Using the map_partitions function, I apply the spatial join to each of the Pandas DataFrames that make up the Dask DataFrame. For simplicity, I just call the function twice: once for pickup locations and once for dropoff locations. To help Dask determine the schema of the returned data, we specify it via `meta` as a column of floats (allowing for NaN values).

In [25]:
trips['pickup_taxizone_id'] = trips.map_partitions(
    assign_taxi_zones, "pickup_longitude", "pickup_latitude",
    "pickup_taxizone_id", meta=('pickup_taxizone_id', np.float64))
trips['dropoff_taxizone_id'] = trips.map_partitions(
    assign_taxi_zones, "dropoff_longitude", "dropoff_latitude",
    "dropoff_taxizone_id", meta=('dropoff_taxizone_id', np.float64))
trips[['pickup_taxizone_id', 'dropoff_taxizone_id']].head()
Out[25]:
pickup_taxizone_id dropoff_taxizone_id
0 48.0 79.0
1 90.0 43.0
2 234.0 170.0
3 25.0 249.0
4 158.0 249.0

At this point, the trips Dask DataFrame has valid taxi zone ID information. Let's save this data to Parquet, a columnar format that is well supported in Dask and Apache Spark. This prevents Dask from recalculating the spatial join (which is very expensive) every time an operation on the trips DataFrame is required.

In [26]:
trips.to_parquet('trips_2016-05.parquet', has_nulls=True, object_encoding='json', compression="SNAPPY")
trips = dd.read_parquet('trips_2016-05.parquet', columns=['pickup_taxizone_id', 'dropoff_taxizone_id'])

To close out this post, I'll produce a heatmap of taxi dropoff locations, aggregated by taxi zone using Dask. Unsurprisingly (for New Yorkers at least), the vast majority of taxi dropoffs are in Downtown and Midtown Manhattan. I will analyze this dataset further in future posts.
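
The plotting cell below is collapsed; the Dask side of it is just a groupby aggregation, roughly like this sketch (variable names and the merge back onto the zones GeoDataFrame df are assumptions; the Bokeh choropleth itself is omitted).

# count trips per dropoff taxi zone; any remaining column works as the count
dropoff_counts = (trips.groupby('dropoff_taxizone_id')
                       .count()['pickup_taxizone_id']
                       .compute()
                       .rename('dropoff_count'))
dropoff_counts.index = dropoff_counts.index.astype(int)

# join the counts back onto the taxi zone polygons for plotting
zones_with_counts = df.merge(dropoff_counts.to_frame(), left_on='LocationID',
                             right_index=True, how='left')
zones_with_counts.head()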

In [27]:

Summary

In this post, I described the process of a spatial join, and how to perform it at scale on a cluster using Dask and GeoPandas. I glossed over some details that matter for the full New York Taxi Dataset, but my full code is available here on GitHub. In future posts, I will analyze this data more thoroughly, and possibly look into releasing the processed data as a Parquet file for others to analyze.

Project on Github

A side note on spatial join performance

The spatial join as written above with GeoPandas can assign taxi zones to approximately 40 million taxi trips per hour on a 4 GHz 4-core i5 system when run on the New York Taxi Dataset. Much of the code that supports this join is an amalgamation of Python and wrapped C code.

This is approximately a factor of two slower than performing the same spatial join in highly optimized PostGIS C/C++ code. However, PostGIS does not use multiple cores efficiently (at least not without running multiple spatial joins simultaneously), and more importantly, the network and serialization overhead of a round trip to the PostgreSQL database puts PostgreSQL/PostGIS at roughly the same speed as the GeoPandas implementation I describe in this post, with vastly more moving parts to break.

Basically, Python is actually pretty fast for these kinds of data structure operations.