New York City Taxi & Limousine Commission (TLC) Trip Data Analysis Using Sparklyr and Google BigQuery


This post shows how to use Apache Spark and Google BigQuery in R via sparklyr to efficiently analyze a big dataset (NYC yellow taxi trips).

You can find the R Markdown document used to generate this post here.

Introduction

On August 3, 2015 the New York City Taxi & Limousine Commission (TLC), in partnership with the New York City Department of Information Technology and Telecommunications (DOITT), announced the availability of millions of trip records from both Yellow Medallion and Green (Street Hail Livery) Cabs. The data can be downloaded from the NYC TLC or through the NYC OpenData website. In addition, the data is available on Google BigQuery in the form of public datasets.

In this post, we show how to use Apache Spark and Google BigQuery in R via sparklyr to efficiently analyze big datasets. Specifically, we are going to analyze tip amounts paid in taxi trips.

The NYC TLC data is ~150 GB in size and consists of about 1.2 billion records. It contains pick-up/drop-off dates, times and locations, passenger counts, tip and fare amounts, etc. of yellow cab trips between 2009 and 2016.

Package Dependencies

We are using the pacman R package management tool to install and load the package dependencies.
Please note that sparklyr version 0.7.0+ (available on GitHub, but not yet released on CRAN) is needed.

<span class="k">if</span><span class="p">(</span><span class="o">!</span><span class="n">require</span><span class="p">(</span><span class="n">pacman</span><span class="p">))</span><span class="w"> </span><span class="p">{</span><span class="w">
  </span><span class="n">install.packages</span><span class="p">(</span><span class="s2">"pacman"</span><span class="p">)</span><span class="w">
  </span><span class="n">library</span><span class="p">(</span><span class="n">pacman</span><span class="p">)</span><span class="w">
</span><span class="p">}</span><span class="w">

</span><span class="c1"># From GitHub</span><span class="w">
</span><span class="n">p_load_gh</span><span class="p">(</span><span class="w">
  </span><span class="s2">"rstudio/[email protected]"</span><span class="p">,</span><span class="w"> </span><span class="c1"># version 0.7.0+ is required</span><span class="w">
  </span><span class="s2">"miraisolutions/[email protected]"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"miraisolutions/[email protected]"</span><span class="w">
</span><span class="p">)</span><span class="w">

</span><span class="c1"># From CRAN</span><span class="w">
</span><span class="n">p_load</span><span class="p">(</span><span class="w">
  </span><span class="s2">"dplyr"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"dbplyr"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"geojsonio"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"leaflet"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"knitr"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"kableExtra"</span><span class="p">,</span><span class="w">
  </span><span class="s2">"tictoc"</span><span class="w">
</span><span class="p">)</span><span class="w">
</span>

Here is a list of all the dependencies, together with a short description:

  • pacman: package management tool used to install and load the dependencies
  • sparklyr: R interface to Apache Spark
  • sparkbq: sparklyr extension for Google BigQuery
  • sparkgeo: sparklyr extension providing geospatial user-defined functions
  • dplyr: grammar of data manipulation
  • dbplyr: database backend for dplyr
  • geojsonio: conversion of data to and from GeoJSON
  • leaflet: interactive maps based on the Leaflet JavaScript library
  • knitr: dynamic report generation
  • kableExtra: styling of HTML tables
  • tictoc: timing of R expressions

Connect to Apache Spark

Before connecting to Apache Spark using sparklyr,
we need to tune some Spark configuration properties to allocate the right amount of resources in the cluster and avoid runtime failures:

  • spark.executor.instances: Number of executors to start. An executor is a process launched on a worker node that runs tasks and keeps data in memory or on disk.
  • spark.executor.cores: Number of cores to use on each executor.
  • spark.executor.memory: Amount of memory to use per executor process.
  • spark.sql.shuffle.partitions: Number of partitions to use when shuffling data for joins or aggregations.
  • spark.network.timeout: Default timeout for all network interactions.
  • spark.hadoop.fs.gs.project.id: The Google Cloud Platform (GCP) project ID, used to bill for the usage of GCP resources.
  • spark.hadoop.fs.gs.impl: A GCS setting defining the file system in the Spark configuration.
  • spark.hadoop.fs.AbstractFileSystem.gs.impl: A GCS setting defining the abstract file system in the Spark configuration.

We also register sparkgeo, which will be leveraged later to perform geospatial joins.

<span class="c1"># Spark configuration settings</span><span class="w">
</span><span class="n">config</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">spark_config</span><span class="p">()</span><span class="w">

</span><span class="c1"># Apply Spark configuration settings from R markdown document parameters</span><span class="w">
</span><span class="n">spark_param_names</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">grep</span><span class="p">(</span><span class="s2">"spark."</span><span class="p">,</span><span class="w"> </span><span class="nf">names</span><span class="p">(</span><span class="n">params</span><span class="p">),</span><span class="w">
                          </span><span class="n">fixed</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">TRUE</span><span class="p">,</span><span class="w"> </span><span class="n">value</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">TRUE</span><span class="p">)</span><span class="w">
</span><span class="n">print</span><span class="p">(</span><span class="n">params</span><span class="p">[</span><span class="n">spark_param_names</span><span class="p">])</span><span class="w">
</span><span class="c1">## $spark.executor.instances</span><span class="w">
</span><span class="c1">## [1] 4</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.executor.cores</span><span class="w">
</span><span class="c1">## [1] 8</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.executor.memory</span><span class="w">
</span><span class="c1">## [1] "34g"</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.sql.shuffle.partitions</span><span class="w">
</span><span class="c1">## [1] 1600</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.network.timeout</span><span class="w">
</span><span class="c1">## [1] 900</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.yarn.executor.memoryOverhead</span><span class="w">
</span><span class="c1">## [1] 5000</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.hadoop.fs.gs.impl</span><span class="w">
</span><span class="c1">## [1] "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"</span><span class="w">
</span><span class="c1">## </span><span class="w">
</span><span class="c1">## $spark.hadoop.fs.AbstractFileSystem.gs.impl</span><span class="w">
</span><span class="c1">## [1] "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"</span><span class="w">
</span><span class="n">config</span><span class="p">[</span><span class="n">spark_param_names</span><span class="p">]</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">params</span><span class="p">[</span><span class="n">spark_param_names</span><span class="p">]</span><span class="w">

</span><span class="c1"># Set Google Cloud Storage (GCS) settings. This allows us to read</span><span class="w">
</span><span class="c1"># files from 'gs://' URLs.</span><span class="w">
</span><span class="c1"># The Google Cloud Storage connector comes with sparkbq.</span><span class="w">
</span><span class="n">config</span><span class="p">[[</span><span class="s2">"spark.hadoop.fs.gs.project.id"</span><span class="p">]]</span><span class="w"> </span><span class="o"><-</span><span class="w"> 
  </span><span class="n">params</span><span class="o">$</span><span class="n">bigquery_billing_project_id</span><span class="w">

</span><span class="c1"># Check if Google Cloud Platform service credentials have been specified</span><span class="w">
</span><span class="c1"># (in the form of a JSON keyfile) - settings needed when running locally</span><span class="w">
</span><span class="c1"># See https://developers.google.com/identity/protocols/application-default-credentials</span><span class="w">
</span><span class="k">if</span><span class="p">(</span><span class="o">!</span><span class="nf">is.null</span><span class="p">(</span><span class="n">params</span><span class="o">$</span><span class="n">gcp_json_keyfile</span><span class="p">))</span><span class="w"> </span><span class="p">{</span><span class="w">
  </span><span class="n">Sys.setenv</span><span class="p">(</span><span class="s2">"GOOGLE_APPLICATION_CREDENTIALS"</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">params</span><span class="o">$</span><span class="n">gcp_json_keyfile</span><span class="p">)</span><span class="w">
  </span><span class="n">config</span><span class="p">[[</span><span class="s2">"spark.hadoop.google.cloud.auth.service.account.json.keyfile"</span><span class="p">]]</span><span class="w"> </span><span class="o"><-</span><span class="w"> 
    </span><span class="n">params</span><span class="o">$</span><span class="n">gcp_json_keyfile</span><span class="w">
</span><span class="p">}</span><span class="w">

</span><span class="c1"># Connect to Apache Spark</span><span class="w">
</span><span class="n">sc</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">spark_connect</span><span class="p">(</span><span class="n">master</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">params</span><span class="o">$</span><span class="n">spark_master</span><span class="p">,</span><span class="w">
                    </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">config</span><span class="p">,</span><span class="w">
                    </span><span class="n">app_name</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"NYC TLC"</span><span class="p">)</span><span class="w">
</span><span class="c1">## Re-using existing Spark connection to yarn-client</span><span class="w">

</span><span class="c1"># Register sparkgeo's user-defined functions (UDFs)</span><span class="w">
</span><span class="n">sparkgeo_register</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span><span class="w">
</span>

Google BigQuery Settings

Next, we are going to set the Google Cloud Platform project ID to use for billing purposes.[1]
We also set the Google Cloud Storage (GCS) bucket used to store temporary BigQuery files and the default BigQuery dataset location.

<span class="n">bigquery_defaults</span><span class="p">(</span><span class="w">
  </span><span class="n">billingProjectId</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">params</span><span class="o">$</span><span class="n">bigquery_billing_project_id</span><span class="p">,</span><span class="w">
  </span><span class="n">gcsBucket</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">params</span><span class="o">$</span><span class="n">bigquery_gcs_bucket</span><span class="p">,</span><span class="w">
  </span><span class="n">datasetLocation</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">params</span><span class="o">$</span><span class="n">bigquery_dataset_location</span><span class="w">
</span><span class="p">)</span><span class="w">
</span>

Import New York City Neighborhoods

In the NYC TLC trips dataset we only have pickup and dropoff locations in terms of latitude and longitude.
To compute statistics by neighborhood we will need to map these locations to New York City neighborhoods.
For this, we are going to leverage NYC neighborhood polygon information in the form of a GeoJSON file, using sparkgeo::spark_read_geojson to read the data straight into Spark and extract the relevant metadata.

<span class="n">neighborhoods</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">spark_read_geojson</span><span class="p">(</span><span class="w">
    </span><span class="n">sc</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">sc</span><span class="p">,</span><span class="w">
    </span><span class="n">name</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"neighborhoods"</span><span class="p">,</span><span class="w">
    </span><span class="n">path</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson"</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">mutate</span><span class="p">(</span><span class="n">neighborhood</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">metadata_string</span><span class="p">(</span><span class="n">metadata</span><span class="p">,</span><span class="w"> </span><span class="s2">"neighborhood"</span><span class="p">))</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">select</span><span class="p">(</span><span class="n">neighborhood</span><span class="p">,</span><span class="w"> </span><span class="n">polygon</span><span class="p">,</span><span class="w"> </span><span class="n">index</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">sdf_persist</span><span class="p">()</span><span class="w">
</span>

There are 310 neighborhoods.

sdf_persist counteracts the default lazy behaviour of Spark DataFrames: it forces any pending computation and (optionally) caches the results in memory and/or on disk.
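For example (an illustrative sketch, not part of the original analysis), the neighborhood count above can be obtained with sdf_nrow, which is one such action forcing computation, and sdf_persist accepts an explicit Spark storage level:

# Counting rows is an action: it forces the pending computation
sdf_nrow(neighborhoods)

# sdf_persist defaults to storage level "MEMORY_AND_DISK"; other Spark
# storage levels (e.g. "MEMORY_ONLY", "DISK_ONLY") can be requested explicitly
neighborhoods <- sdf_persist(neighborhoods, storage.level = "MEMORY_ONLY")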

Import NYC TLC Trip Data from Google BigQuery

We are using sparkbq::spark_read_bigquery to import the publicly available Google BigQuery NYC Taxi & Limousine trip dataset. Trip data is available in several tables (one for each year).

<span class="n">all_trips_spark_by_year</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">lapply</span><span class="p">(</span><span class="m">2009</span><span class="o">:</span><span class="m">2016</span><span class="p">,</span><span class="w"> </span><span class="k">function</span><span class="p">(</span><span class="n">year</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
    </span><span class="n">spark_read_bigquery</span><span class="p">(</span><span class="w">
      </span><span class="n">sc</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">sc</span><span class="p">,</span><span class="w">
      </span><span class="n">name</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">paste0</span><span class="p">(</span><span class="s2">"trips"</span><span class="p">,</span><span class="w"> </span><span class="n">year</span><span class="p">),</span><span class="w">
      </span><span class="n">projectId</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"bigquery-public-data"</span><span class="p">,</span><span class="w">
      </span><span class="n">datasetId</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"new_york"</span><span class="p">,</span><span class="w">
      </span><span class="n">tableId</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">paste0</span><span class="p">(</span><span class="s2">"tlc_yellow_trips_"</span><span class="p">,</span><span class="w"> </span><span class="n">year</span><span class="p">),</span><span class="w">
      </span><span class="n">repartition</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">400</span><span class="w">
    </span><span class="p">)</span><span class="w">
  </span><span class="p">})</span><span class="w">

</span><span class="c1"># Union of all trip data</span><span class="w">
</span><span class="n">all_trips_spark</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">Reduce</span><span class="p">(</span><span class="n">union_all</span><span class="p">,</span><span class="w"> </span><span class="n">all_trips_spark_by_year</span><span class="p">)</span><span class="w">
</span>

Note: At this point no data (except for schema information) has been read yet.
These operations are lazy.
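To make the laziness tangible, here is a minimal sketch (illustrative only, not part of the original analysis): inspecting the schema touches no trip data, whereas an action such as counting rows forces an actual read.

# Schema information is available without reading any trip data
sdf_schema(all_trips_spark_by_year[[1]])

# An action such as counting rows triggers an actual read from BigQuery
n_trips_2009 <- sdf_nrow(all_trips_spark_by_year[[1]])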

Since tips paid in cash are not recorded in the data, we filter the dataset for trips paid by credit card. In addition, we filter out bad data points such as negative tip amounts, implausibly low fares, and non-positive trip distances or passenger counts. We also perform a geospatial join to map pickup locations to the corresponding NYC neighborhoods, and calculate additional metrics such as trip duration and tip percentage.

<span class="n">credit_trips_spark</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">all_trips_spark</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">filter</span><span class="p">(</span><span class="w">
    </span><span class="c1"># Trips paid by credit card</span><span class="w">
    </span><span class="n">payment_type</span><span class="w"> </span><span class="o">%in%</span><span class="w"> </span><span class="nf">c</span><span class="p">(</span><span class="s2">"CREDIT"</span><span class="p">,</span><span class="w"> </span><span class="s2">"CRD"</span><span class="p">,</span><span class="w"> </span><span class="s2">"1"</span><span class="p">)</span><span class="w"> </span><span class="o">&</span><span class="w">
    </span><span class="c1"># Filter out bad data points</span><span class="w">
    </span><span class="n">fare_amount</span><span class="w"> </span><span class="o">></span><span class="w"> </span><span class="m">1</span><span class="w"> </span><span class="o">&</span><span class="w">
    </span><span class="n">tip_amount</span><span class="w"> </span><span class="o">>=</span><span class="w"> </span><span class="m">0</span><span class="w"> </span><span class="o">&</span><span class="w">
    </span><span class="n">trip_distance</span><span class="w"> </span><span class="o">></span><span class="w"> </span><span class="m">0</span><span class="w"> </span><span class="o">&</span><span class="w">
    </span><span class="n">passenger_count</span><span class="w"> </span><span class="o">></span><span class="w"> </span><span class="m">0</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="c1"># Select relevant columns only to reduce amount of data</span><span class="w">
  </span><span class="n">select</span><span class="p">(</span><span class="w">
    </span><span class="n">vendor_id</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_datetime</span><span class="p">,</span><span class="w">
    </span><span class="n">dropoff_datetime</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_latitude</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_longitude</span><span class="p">,</span><span class="w">
    </span><span class="n">trip_distance</span><span class="p">,</span><span class="w">
    </span><span class="n">passenger_count</span><span class="p">,</span><span class="w">
    </span><span class="n">fare_amount</span><span class="p">,</span><span class="w">
    </span><span class="n">tip_amount</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="c1"># Join with NYC neighborhoods</span><span class="w">
  </span><span class="n">sparkgeo</span><span class="o">::</span><span class="n">sdf_spatial_join</span><span class="p">(</span><span class="n">neighborhoods</span><span class="p">,</span><span class="w"> </span><span class="n">pickup_latitude</span><span class="p">,</span><span class="w">
                             </span><span class="n">pickup_longitude</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="c1"># NOTE: timestamps are currently returned as microseconds since the epoch</span><span class="w">
  </span><span class="n">mutate</span><span class="p">(</span><span class="w">
    </span><span class="n">trip_duration</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">(</span><span class="n">dropoff_datetime</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">pickup_datetime</span><span class="p">)</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="m">1e6</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_datetime</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">from_unixtime</span><span class="p">(</span><span class="n">pickup_datetime</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="m">1e6</span><span class="p">)</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">mutate</span><span class="p">(</span><span class="w">
    </span><span class="c1"># Split pickup date/time into separate metrics</span><span class="w">
    </span><span class="n">pickup_month</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">month</span><span class="p">(</span><span class="n">pickup_datetime</span><span class="p">),</span><span class="w">
    </span><span class="n">pickup_weekday</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">date_format</span><span class="p">(</span><span class="n">pickup_datetime</span><span class="p">,</span><span class="w"> </span><span class="s1">'EEEE'</span><span class="p">),</span><span class="w">
    </span><span class="n">pickup_hour</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">hour</span><span class="p">(</span><span class="n">pickup_datetime</span><span class="p">),</span><span class="w">
    </span><span class="c1"># Calculate tip percentage based on fare amount</span><span class="w">
    </span><span class="n">tip_pct</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">tip_amount</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">fare_amount</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="m">100</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">select</span><span class="p">(</span><span class="w">
    </span><span class="n">vendor_id</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_month</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_weekday</span><span class="p">,</span><span class="w">
    </span><span class="n">pickup_hour</span><span class="p">,</span><span class="w">
    </span><span class="n">neighborhood</span><span class="p">,</span><span class="w">
    </span><span class="n">trip_duration</span><span class="p">,</span><span class="w">
    </span><span class="n">trip_distance</span><span class="p">,</span><span class="w">
    </span><span class="n">passenger_count</span><span class="p">,</span><span class="w">
    </span><span class="n">fare_amount</span><span class="p">,</span><span class="w">
    </span><span class="n">tip_pct</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="c1"># Persist results to memory and/or disk</span><span class="w">
  </span><span class="n">sdf_persist</span><span class="p">()</span><span class="w">
</span>

There are 507,139,792 records in credit_trips_spark.

The above example also showcases the predicate pushdown feature of sparkbq: the first filter() and select() operations are pushed down to BigQuery and executed at the data source level. This can improve query performance by reducing the amount of data read from the data source.
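The Spark SQL that sparklyr generates for such a lazy pipeline can be inspected with dplyr's show_query; a minimal sketch for illustration (the connector then pushes supported filters and projections further down to BigQuery):

all_trips_spark %>%
  filter(payment_type %in% c("CREDIT", "CRD", "1")) %>%
  select(payment_type, fare_amount, tip_amount) %>%
  show_query()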

Average Tip Percentage by Pickup Neighborhood

Here we calculate the average tip percentage by pickup neighborhood for visualization in a later section. Note that the following dplyr pipeline is lazy: no computations will be triggered until we execute an appropriate action.

<span class="n">avg_tip_per_neighborhood_spark</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">credit_trips_spark</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">group_by</span><span class="p">(</span><span class="n">neighborhood</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">summarize</span><span class="p">(</span><span class="n">avg_tip_pct</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">mean</span><span class="p">(</span><span class="n">tip_pct</span><span class="p">))</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">arrange</span><span class="p">(</span><span class="n">desc</span><span class="p">(</span><span class="n">avg_tip_pct</span><span class="p">))</span><span class="w">
</span>

The following Spark SQL code gets generated by sparklyr/dbplyr for the object avg_tip_per_neighborhood_spark:

<span class="k">SELECT</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="p">(</span><span class="k">SELECT</span> <span class="nv">`neighborhood`</span><span class="p">,</span> <span class="k">AVG</span><span class="p">(</span><span class="nv">`tip_pct`</span><span class="p">)</span> <span class="k">AS</span> <span class="nv">`avg_tip_pct`</span>
<span class="k">FROM</span> <span class="nv">`sparklyr_tmp_41bf4beea2bd`</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="nv">`neighborhood`</span><span class="p">)</span> <span class="nv">`zpdkqkniln`</span>
<span class="k">ORDER</span> <span class="k">BY</span> <span class="nv">`avg_tip_pct`</span> <span class="k">DESC</span>

This SQL query will be executed by Spark when we run the collect action in the next section.

Collect and Import to R

<span class="n">avg_tip_per_neighborhood</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">avg_tip_per_neighborhood_spark</span><span class="w"> </span><span class="o">%>%</span><span class="w"> </span><span class="n">collect</span><span class="p">()</span><span class="w">

</span><span class="n">head</span><span class="p">(</span><span class="n">avg_tip_per_neighborhood</span><span class="p">,</span><span class="w"> </span><span class="n">n</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">10</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">kable</span><span class="p">(</span><span class="s2">"html"</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">kable_styling</span><span class="p">(</span><span class="n">bootstrap_options</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"striped"</span><span class="p">,</span><span class="w"> </span><span class="n">full_width</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">FALSE</span><span class="p">)</span><span class="w">
</span>
| neighborhood     | avg_tip_pct |
|------------------|-------------|
| Edgemere         | 50.60120    |
| Breezy Point     | 42.17196    |
| Far Rockaway     | 40.80644    |
| Great Kills Park | 39.45321    |
| Oakwood          | 36.26592    |
| Bloomfield       | 33.54538    |
| Graniteville     | 32.20948    |
| Woodrow          | 31.92127    |
| Rockaway Beach   | 30.93325    |
| Charleston       | 30.01376    |

collect executes the Spark query and returns the results to R.
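Note that collecting an un-aggregated Spark DataFrame with half a billion rows into the R session would be impractical. A common pattern, sketched below for illustration, is to aggregate in Spark first and collect only small results, or to fetch just a few rows for inspection:

# Fetch only a handful of rows into R for inspection
credit_trips_spark %>%
  head(5) %>%
  collect()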

Leaflet Visualization

In this section we are going to visualize the average tip percentages using RStudio’s leaflet package. The geojsonio package can read GeoJSON files and produce a SpatialPolygonsDataFrame that can be plotted directly by leaflet on a map.

<span class="c1"># Build SpatialPolygonsDataFrame from NYC neighborhood geojson file</span><span class="w">
</span><span class="n">nyc_neighborhoods</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">geojsonio</span><span class="o">::</span><span class="n">geojson_read</span><span class="p">(</span><span class="w">
    </span><span class="s2">"https://storage.googleapis.com/miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson"</span><span class="p">,</span><span class="w">
    </span><span class="n">what</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"sp"</span><span class="w">
  </span><span class="p">)</span><span class="w">

</span><span class="c1"># Merge average tip percentages with neighborhood metadata</span><span class="w">
</span><span class="n">average_tip_with_shapes</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">nyc_neighborhoods</span><span class="w">
</span><span class="n">average_tip_with_shapes</span><span class="o">@</span><span class="n">data</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">merge</span><span class="p">(</span><span class="w">
    </span><span class="n">nyc_neighborhoods</span><span class="o">@</span><span class="n">data</span><span class="p">,</span><span class="w">
    </span><span class="n">avg_tip_per_neighborhood</span><span class="p">,</span><span class="w">
    </span><span class="n">all.x</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">TRUE</span><span class="w">
  </span><span class="p">)</span><span class="w">

</span><span class="c1"># Create continuous color palette based on average tip percentages</span><span class="w">
</span><span class="n">pal</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">colorNumeric</span><span class="p">(</span><span class="w">
    </span><span class="nf">c</span><span class="p">(</span><span class="s2">"#FF0000"</span><span class="p">,</span><span class="w"> </span><span class="s2">"#FFFF00"</span><span class="p">,</span><span class="w"> </span><span class="s2">"#00FF00"</span><span class="p">),</span><span class="w">
    </span><span class="n">average_tip_with_shapes</span><span class="o">$</span><span class="n">avg_tip_pct</span><span class="w">
  </span><span class="p">)</span><span class="w">

</span><span class="c1"># Draw leaflet containing</span><span class="w">
</span><span class="c1"># - black & white OpenStreetMap map</span><span class="w">
</span><span class="c1"># - NYC neighborhoods drawn as polygons and colored according to the</span><span class="w">
</span><span class="c1">#   corresponding tip percentage</span><span class="w">
</span><span class="c1"># - legend for tip percentages</span><span class="w">
</span><span class="n">average_tip_with_shapes</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">leaflet</span><span class="p">()</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">addProviderTiles</span><span class="p">(</span><span class="w">
    </span><span class="n">providers</span><span class="o">$</span><span class="n">OpenStreetMap.BlackAndWhite</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">addPolygons</span><span class="p">(</span><span class="w">
    </span><span class="n">weight</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">1</span><span class="p">,</span><span class="w">
    </span><span class="n">opacity</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">0.7</span><span class="p">,</span><span class="w">
    </span><span class="n">smoothFactor</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">0.3</span><span class="p">,</span><span class="w">
    </span><span class="n">fillOpacity</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">0.7</span><span class="p">,</span><span class="w">
    </span><span class="n">fillColor</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="o">~</span><span class="w"> </span><span class="n">pal</span><span class="p">(</span><span class="n">avg_tip_pct</span><span class="p">),</span><span class="w">
    </span><span class="n">label</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="o">~</span><span class="w"> </span><span class="n">paste0</span><span class="p">(</span><span class="n">neighborhood</span><span class="p">,</span><span class="w"> </span><span class="s2">" - "</span><span class="p">,</span><span class="w"> </span><span class="nf">round</span><span class="p">(</span><span class="n">avg_tip_pct</span><span class="p">,</span><span class="w"> </span><span class="m">2</span><span class="p">),</span><span class="w"> </span><span class="s2">"%"</span><span class="p">),</span><span class="w">
    </span><span class="n">highlightOptions</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">highlightOptions</span><span class="p">(</span><span class="w">
      </span><span class="n">color</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"yellow"</span><span class="p">,</span><span class="w">
      </span><span class="n">weight</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">2</span><span class="p">,</span><span class="w">
      </span><span class="n">bringToFront</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">TRUE</span><span class="w">
    </span><span class="p">)</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">addLegend</span><span class="p">(</span><span class="w">
    </span><span class="n">pal</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">pal</span><span class="p">,</span><span class="w">
    </span><span class="n">values</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="o">~</span><span class="w"> </span><span class="n">avg_tip_pct</span><span class="p">,</span><span class="w">
    </span><span class="n">opacity</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">1</span><span class="p">,</span><span class="w">
    </span><span class="n">title</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"Tips %"</span><span class="p">,</span><span class="w">
    </span><span class="n">labFormat</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">labelFormat</span><span class="p">(</span><span class="n">suffix</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">' %'</span><span class="p">)</span><span class="w">
  </span><span class="p">)</span><span class="w">
</span>

[Figure: Leaflet map of average tip percentages by NYC pickup neighborhood]

Machine Learning with sparklyr

sparklyr provides bindings to Spark’s distributed machine learning library MLlib.

In the following, we build a model to predict whether a taxi driver can expect a generous tip based on provider, pickup date/time, neighborhood, trip duration and distance, passenger count and fare amount. We define a Spark ML pipeline that bucketizes tip percentages into levels and trains a random forest classifier. Tip levels are defined as follows:

  • < 25%: standard
  • >= 25%: generous

See Tipping in New York City: a guide to who, when and how much for more information on NYC tipping. Also note the difference in how providers handle suggested tips.

<span class="c1"># Prepare training and test datasets</span><span class="w">
</span><span class="n">ml_dataset</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">credit_trips_spark</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">sdf_partition</span><span class="p">(</span><span class="n">train</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">0.8</span><span class="p">,</span><span class="w"> </span><span class="n">test</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">0.2</span><span class="p">,</span><span class="w"> </span><span class="n">seed</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">2510</span><span class="p">)</span><span class="w">

</span><span class="c1"># Create machine learning pipeline</span><span class="w">
</span><span class="n">tip_level_pipeline</span><span class="w"> </span><span class="o"><-</span><span class="w">
  </span><span class="n">ml_pipeline</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span><span class="w"> </span><span class="n">uid</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"tip_level"</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="c1"># Bucketize tip percentages into levels</span><span class="w">
  </span><span class="n">ft_bucketizer</span><span class="p">(</span><span class="w">
    </span><span class="n">input_col</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"tip_pct"</span><span class="p">,</span><span class="w">
    </span><span class="n">output_col</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"tip_level"</span><span class="p">,</span><span class="w">
    </span><span class="n">splits</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="nf">c</span><span class="p">(</span><span class="m">0</span><span class="p">,</span><span class="w"> </span><span class="m">25</span><span class="p">,</span><span class="w"> </span><span class="kc">Inf</span><span class="p">),</span><span class="w">
    </span><span class="n">handle_invalid</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"skip"</span><span class="w">
  </span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">ft_r_formula</span><span class="p">(</span><span class="n">formula</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">tip_level</span><span class="w"> </span><span class="o">~</span><span class="w"> </span><span class="n">vendor_id</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">pickup_month</span><span class="w"> </span><span class="o">+</span><span class="w">
                 </span><span class="n">pickup_weekday</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">pickup_hour</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">neighborhood</span><span class="w"> </span><span class="o">+</span><span class="w">
                 </span><span class="n">trip_duration</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">trip_distance</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">passenger_count</span><span class="w"> </span><span class="o">+</span><span class="w">
                 </span><span class="n">fare_amount</span><span class="p">)</span><span class="w"> </span><span class="o">%>%</span><span class="w">
  </span><span class="n">ml_random_forest_classifier</span><span class="p">(</span><span class="n">num_trees</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="m">50</span><span class="p">)</span><span class="w">

</span><span class="c1"># Fit model</span><span class="w">
</span><span class="n">tip_level_model</span><span class="w"> </span><span class="o"><-</span><span class="w"> </span><span class="n">ml_fit</span><span class="p">(</span><span class="n">tip_level_pipeline</span><span class="p">,</span><span class="w"> </span><span class="n">dataset</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ml_dataset</span><span class="o">$</span><span class="n">train</span><span class="p">)</span><span class="w">

</span><span class="c1"># Perform prediction</span><span class="w">
</span><span class="n">tip_level_prediction</span><span class="w"> </span><span class="o"><-</span><span class="w"> 
  </span><span class="n">ml_predict</span><span class="p">(</span><span class="n">tip_level_model</span><span class="p">,</span><span class="w"> </span><span class="n">dataset</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ml_dataset</span><span class="o">$</span><span class="n">test</span><span class="p">,</span><span class="w"> 
             </span><span class="n">predicted_label_col</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"prediction"</span><span class="p">)</span><span class="w">

</span><span class="c1"># Evaluate model by calculating F1 score</span><span class="w">
</span><span class="n">ml_classification_eval</span><span class="p">(</span><span class="n">tip_level_prediction</span><span class="p">,</span><span class="w"> </span><span class="n">prediction_col</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"prediction"</span><span class="p">,</span><span class="w">
                       </span><span class="n">metric_name</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"f1"</span><span class="p">)</span><span class="w">
</span><span class="c1">## [1] 0.6900397</span><span class="w">
</span>

Disconnect from Apache Spark

<span class="n">spark_disconnect</span><span class="p">(</span><span class="n">sc</span><span class="p">)</span><span class="w">
</span>

Timings

| Description            | Execution time (minutes) |
|------------------------|--------------------------|
| Data preparation       | 40.6                     |
| Machine Learning       | 85.4                     |
| Complete Spark session | 126.1                    |
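The tictoc package listed among the dependencies lends itself to capturing such timings. Here is a minimal sketch of how measurements like these can be taken (the section labels are illustrative):

tic("Data preparation")
# ... import trip data, filter, join with neighborhoods ...
toc(log = TRUE)

tic("Machine Learning")
# ... define pipeline, fit model, evaluate ...
toc(log = TRUE)

# Print all recorded timings
tic.log(format = TRUE)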

Appendix

Provisioning a Google Dataproc Cluster

This section explains how to provision a Google Dataproc cluster for the execution
of the TLC Trip Data analyses shown above.

Install Google Cloud SDK

  • Register at https://cloud.google.com/ and create a new project (e.g. “my-project”).
  • Install the Google Cloud SDK by following the instructions at
    https://cloud.google.com/sdk/docs/quickstart-linux
  • If you already have gcloud installed, you may want to update your components
    using gcloud components update.

Log In

  • Log in to Google Cloud: gcloud auth login
  • Set default gcloud project: gcloud config set project my-project

Provision Dataproc Cluster

A Dataproc cluster can easily be provisioned on the command line as follows:

gcloud dataproc clusters create spark --async --image-version 1.2 \
 --master-machine-type n1-standard-1 --master-boot-disk-size 20 \
 --worker-machine-type n1-highmem-8 --num-workers 4 \
 --worker-boot-disk-size 10 --num-worker-local-ssds 1

The command above will launch a Google Cloud Dataproc cluster named spark.
The master node will be called spark-m and the worker nodes spark-w-0,
spark-w-1, etc.
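Note that a running cluster incurs charges even when idle. Once the analysis is finished, the cluster can be deleted with gcloud dataproc clusters delete spark.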

Dataproc Cluster Parameters

  • --async: display information about the operation in progress, without
    waiting for the operation to complete.
  • --image-version: version of the bundle including operating system,
    Big Data components and Google Cloud Platform connectors. See https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions
  • --master-machine-type: the type of machine to use for the master.
    See https://cloud.google.com/compute/docs/machine-types
  • --master-boot-disk-size: the size of the boot disk for the master node.
  • --worker-machine-type: the type of machine to use for workers. See
    https://cloud.google.com/compute/docs/machine-types
  • --num-workers: the number of worker nodes in the cluster.
  • --worker-boot-disk-size: the size of the boot disk for worker nodes.
  • --num-worker-local-ssds: the number of local SSDs to attach to each worker.

See https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create
for a complete list of all the configuration parameters.

Google Cloud Machine Types

  • n1-standard-1: Standard machine type with 1 virtual CPU and 3.75 GB of memory.
  • n1-highmem-8: High memory machine type with 8 virtual CPUs and 52 GB of memory.

See https://cloud.google.com/compute/docs/machine-types for a complete list of
all the standard Google Cloud machine types.

[1] Note that BigQuery is priced based on a flat rate for storage and a usage rate for queries.
