PySpark by Example

Here are my worked examples from the very useful LinkedIn Learning course PySpark by Example, by Jonathan Fernandes: https://www.linkedin.com/learning/apache-pyspark-by-example


Learning PySpark by Example

Over the past 12 months or so I have been learning and playing with Apache Spark. I went through the brilliant book by Bill Chambers and Matei Zaharia, Spark: The Definitive Guide, which covers Spark in depth and gives plenty of code snippets one can try out in the spark-shell. Whilst the book is indeed very detailed and provides great examples, the datasets included for you to get your hands on are on the order of megabytes (with the exception of the activity-data dataset used for the Streaming examples).

$ du -sh data/* | sort -rh
1.2G    data/activity-data
90M     data/retail-data
71M     data/deep-learning-images
42M     data/bike-data
208K    data/flight-data
104K    data/sample_libsvm_data.txt
32K     data/sample_movielens_ratings.txt
32K     data/regression
32K     data/multiclass-classification
32K     data/clustering
32K     data/binary-classification
12K     data/simple-ml-integers
12K     data/flight-data-hive
8.0K    data/simple-ml
4.0K    data/simple-ml-scaling
4.0K    data/README.md

For this reason, I wanted to try out PySpark by Example, which works with the City of Chicago's reported-crimes.csv dataset, a file of around 1.6 GB.

Another reason the course and its dataset appealed to me was that I could use them as an excuse to explore the plotting capabilities of Plotly, an interactive plotting library. The dataset has location data as well as fields whose distributions can be plotted.

The Data

As mentioned, the course uses the City of Chicago's reported crimes data.

$ wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
$ mv rows.csv\?accessType\=DOWNLOAD data/reported-crimes.csv
>>> from pyspark.sql.functions import to_timestamp,col,lit
>>> rc = spark.read.csv('data/reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))
>>> rc.show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11|        null|        null|2001|08/05/2017 03:50:...|    null|     null|    null|
|11227287|   JB147188|2017-10-08 03:00:00|  092XX S RACINE AVE|0281|CRIM SEXUAL ASSAULT|      NON-AGGRAVATED|           RESIDENCE| false|   false|2222|     022|  21|            73|      02|        null|        null|2017|02/11/2018 03:57:...|    null|     null|    null|
|11227583|   JB147595|2017-03-28 14:00:00|     026XX W 79TH ST|0620|           BURGLARY|      UNLAWFUL ENTRY|               OTHER| false|   false|0835|     008|  18|            70|      05|        null|        null|2017|02/11/2018 03:57:...|    null|     null|    null|
|11227293|   JB147230|2017-09-09 20:17:00|060XX S EBERHART AVE|0810|              THEFT|           OVER $500|           RESIDENCE| false|   false|0313|     003|  20|            42|      06|        null|        null|2017|02/11/2018 03:57:...|    null|     null|    null|
|11227634|   JB147599|2017-08-26 10:00:00| 001XX W RANDOLPH ST|0281|CRIM SEXUAL ASSAULT|      NON-AGGRAVATED|         HOTEL/MOTEL| false|   false|0122|     001|  42|            32|      02|        null|        null|2017|02/11/2018 03:57:...|    null|     null|    null|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
only showing top 5 rows
>>> rc.columns
['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location']
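
The pattern 'MM/dd/yyyy hh:mm:ss a' passed to to_timestamp above describes a month/day/year date followed by a 12-hour clock time with an AM/PM marker. As a quick sanity check outside Spark, the sketch below parses a string in the same layout with Python's datetime. It assumes '%m/%d/%Y %I:%M:%S %p' is the strptime equivalent of that pattern, and parse_chicago_timestamp is a hypothetical helper name, not something from the course.

```python
from datetime import datetime

# Assumed strptime equivalent of the Spark pattern 'MM/dd/yyyy hh:mm:ss a':
# %I is the hour on a 12-hour clock and %p is the AM/PM marker.
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def parse_chicago_timestamp(text):
    """Parse a timestamp laid out like the raw Date / Updated On columns."""
    return datetime.strptime(text, PY_FORMAT)


# Value taken from the Updated On column shown later in this post.
parsed = parse_chicago_timestamp("02/10/2018 03:50:01 PM")
print(parsed)  # 2018-02-10 15:50:01
```

If the AM/PM marker were ignored, afternoon times would come out twelve hours early, so it is worth spot-checking a PM value like this one.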

A second dataset, listing the police stations, is also used:

$ wget -O data/police-stations.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
>>> ps = spark.read.csv("data/police-stations.csv", header=True)
>>> ps.show(5, truncate=False)
+------------+--------------+--------------------+-------+-----+-----+-------------------------------------------------------------------------------+------------+------------+------------+------------+------------+-----------+------------+-------------------------------+
|DISTRICT    |DISTRICT NAME |ADDRESS             |CITY   |STATE|ZIP  |WEBSITE                                                                        |PHONE       |FAX         |TTY         |X COORDINATE|Y COORDINATE|LATITUDE   |LONGITUDE   |LOCATION                       |
+------------+--------------+--------------------+-------+-----+-----+-------------------------------------------------------------------------------+------------+------------+------------+------------+------------+-----------+------------+-------------------------------+
|Headquarters|Headquarters  |3510 S Michigan Ave |Chicago|IL   |60653|http://home.chicagopolice.org                                                  |null        |null        |null        |1177731.401 |1881697.404 |41.83070169|-87.62339535|(41.8307016873, -87.6233953459)|
|1           |Central       |1718 S State St     |Chicago|IL   |60616|http://home.chicagopolice.org/community/districts/1st-district-central/        |312-745-4290|312-745-3694|312-745-3693|1176569.052 |1891771.704 |41.85837259|-87.62735617|(41.8583725929, -87.627356171) |
|6           |Gresham       |7808 S Halsted St   |Chicago|IL   |60620|http://home.chicagopolice.org/community/districts/6th-district-gresham/        |312-745-3617|312-745-3649|312-745-3639|1172283.013 |1853022.646 |41.75213684|-87.64422891|(41.7521368378, -87.6442289066)|
|11          |Harrison      |3151 W Harrison St  |Chicago|IL   |60612|http://home.chicagopolice.org/community/districts/11th-district-harrison/      |312-746-8386|312-746-4281|312-746-5151|1155244.069 |1897148.755 |41.87358229|-87.70548813|(41.8735822883, -87.705488126) |
|16          |Jefferson Park|5151 N Milwaukee Ave|Chicago|IL   |60630|http://home.chicagopolice.org/community/districts/16th-district-jefferson-park/|312-742-4480|312-742-4421|312-742-4423|1138480.758 |1933660.473 |41.97409445|-87.76614884|(41.9740944511, -87.7661488432)|
+------------+--------------+--------------------+-------+-----+-----+-------------------------------------------------------------------------------+------------+------------+------------+------------+------------+-----------+------------+-------------------------------+
only showing top 5 rows
>>> ps.columns
['DISTRICT', 'DISTRICT NAME', 'ADDRESS', 'CITY', 'STATE', 'ZIP', 'WEBSITE', 'PHONE', 'FAX', 'TTY', 'X COORDINATE', 'Y COORDINATE', 'LATITUDE', 'LONGITUDE', 'LOCATION']

Exploratory Data Analysis and Challenge Questions

Before we do this, let's cache the dataset in memory for faster querying; this relieves us of the burden of reading from disk each time we want to run a query.

>>> rc.cache()
DataFrame[ID: string, Case Number: string, Date: timestamp, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: string, Domestic: string, Beat: string, District: string, Ward: string, Community Area: string, FBI Code: string, X Coordinate: string, Y Coordinate: string, Year: string, Updated On: string, Latitude: string, Longitude: string, Location: string]
>>> rc.count()
6752020

Note that cache() is evaluated lazily, so an action is required to trigger it. Here we simply run a count() action to make sure the data is actually cached.

Display only the first 4 rows of the columns Case Number, Date and Arrest

>>> rc.select('Case Number', 'Date', 'Arrest').show(4)
+-----------+-------------------+------+
|Case Number|               Date|Arrest|
+-----------+-------------------+------+
|   JA366925|2001-01-01 11:00:00| false|
|   JB147188|2017-10-08 03:00:00| false|
|   JB147595|2017-03-28 14:00:00| false|
|   JB147230|2017-09-09 20:17:00| false|
+-----------+-------------------+------+
only showing top 4 rows

What are the top 10 reported crimes by Primary Type, in descending order of occurrence?

>>> rc.groupBy('Primary Type').count().show(5)
+--------------------+-----+
|        Primary Type|count|
+--------------------+-----+
|OFFENSE INVOLVING...|45709|
|CRIMINAL SEXUAL A...|  333|
|            STALKING| 3384|
|PUBLIC PEACE VIOL...|47780|
|           OBSCENITY|  582|
+--------------------+-----+
only showing top 5 rows

But we want these in descending order of count, so:

>>> rc.groupBy('Primary Type').count().orderBy('count', ascending=False).show(10)
+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1418293|
|            BATTERY|1232001|
|    CRIMINAL DAMAGE| 771399|
|          NARCOTICS| 711609|
|      OTHER OFFENSE| 418802|
|            ASSAULT| 418479|
|           BURGLARY| 388009|
|MOTOR VEHICLE THEFT| 314101|
| DECEPTIVE PRACTICE| 265567|
|            ROBBERY| 255566|
+-------------------+-------+
only showing top 10 rows

What percentage of reported crimes resulted in an arrest?

>>> rc.select('Arrest').distinct().show()
+------+
|Arrest|
+------+
| false|
|  true|
+------+
>>> rc.where(col('Arrest') == 'true').count() / rc.select('Arrest').count()
0.2775467193521346
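
The query returns a fraction rather than a percentage. A minimal way to present it as one, using only Python's string formatting (the variable name arrest_fraction is my own):

```python
# Value returned by the query above.
arrest_fraction = 0.2775467193521346

# The '%' format type multiplies by 100 and appends a percent sign.
arrest_percentage = f"{arrest_fraction:.2%}"
print(arrest_percentage)  # 27.75%
```

So roughly 28% of the reported crimes in this snapshot resulted in an arrest.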

What are the top 3 locations for reported crimes?

>>> rc.groupBy('Location Description').count().orderBy('count', ascending=False).show(3)
+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|1770359|
|           RESIDENCE|1144528|
|           APARTMENT| 698091|
+--------------------+-------+
only showing top 3 rows

What is the most frequently reported non-criminal activity?

>>> rc.select('Primary Type').distinct().count()
36
So there are 36 distinct primary types. Let's list them all:

>>> rc.select('Primary Type').distinct().orderBy(col('Primary Type')).show(36, truncate=False)
+---------------------------------+
|Primary Type                     |
+---------------------------------+
|ARSON                            |
|ASSAULT                          |
|BATTERY                          |
|BURGLARY                         |
|CONCEALED CARRY LICENSE VIOLATION|
|CRIM SEXUAL ASSAULT              |
|CRIMINAL DAMAGE                  |
|CRIMINAL SEXUAL ASSAULT          |
|CRIMINAL TRESPASS                |
|DECEPTIVE PRACTICE               |
|DOMESTIC VIOLENCE                |
|GAMBLING                         |
|HOMICIDE                         |
|HUMAN TRAFFICKING                |
|INTERFERENCE WITH PUBLIC OFFICER |
|INTIMIDATION                     |
|KIDNAPPING                       |
|LIQUOR LAW VIOLATION             |
|MOTOR VEHICLE THEFT              |
|NARCOTICS                        |
|NON - CRIMINAL                   |
|NON-CRIMINAL                     |
|NON-CRIMINAL (SUBJECT SPECIFIED) |
|OBSCENITY                        |
|OFFENSE INVOLVING CHILDREN       |
|OTHER NARCOTIC VIOLATION         |
|OTHER OFFENSE                    |
|PROSTITUTION                     |
|PUBLIC INDECENCY                 |
|PUBLIC PEACE VIOLATION           |
|RITUALISM                        |
|ROBBERY                          |
|SEX OFFENSE                      |
|STALKING                         |
|THEFT                            |
|WEAPONS VIOLATION                |
+---------------------------------+
>>> nc = rc.filter( (col('Primary Type') == 'NON - CRIMINAL') | (col('Primary Type') == 'NON-CRIMINAL ') | (col('Primary Type') == 'NON-CRIMINAL (SUBJECT SPECIFIED)') )
>>> nc.show(5, truncate=False)
+--------+-----------+-------------------+---------------------+----+--------------+-----------------+-------------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|ID      |Case Number|Date               |Block                |IUCR|Primary Type  |Description      |Location Description           |Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude    |Longitude    |Location                     |
+--------+-----------+-------------------+---------------------+----+--------------+-----------------+-------------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
|10062441|HY250685   |2015-05-07 13:20:00|012XX S HARDING AVE  |5114|NON - CRIMINAL|FOID - REVOCATION|RESIDENCE                      |false |false   |1011|010     |24  |29            |26      |1150243     |1894129     |2015|02/10/2018 03:50:01 PM|41.865394646|-87.723928428|(41.865394646, -87.723928428)|
|10064717|HY253344   |2015-05-08 13:15:00|051XX S WENTWORTH AVE|5114|NON - CRIMINAL|FOID - REVOCATION|POLICE FACILITY/VEH PARKING LOT|false |false   |0225|002     |3   |37            |26      |1175826     |1871120     |2015|02/10/2018 03:50:01 PM|41.80171934 |-87.630703621|(41.80171934, -87.630703621) |
|10072565|HY261001   |2015-05-14 10:30:00|006XX N WELLS ST     |5114|NON - CRIMINAL|FOID - REVOCATION|STREET                         |false |false   |1832|018     |42  |8             |26      |1174623     |1904537     |2015|02/10/2018 03:50:01 PM|41.89344506 |-87.634117632|(41.89344506, -87.634117632) |
|10109156|HY297801   |2015-06-12 09:00:00|053XX S NEVA AVE     |5114|NON - CRIMINAL|FOID - REVOCATION|RESIDENCE                      |false |false   |0811|008     |23  |56            |26      |1129584     |1868411     |2015|02/10/2018 03:50:01 PM|41.795197456|-87.800355525|(41.795197456, -87.800355525)|
|10115077|HY304017   |2015-06-16 19:00:00|081XX S WHIPPLE ST   |5114|NON - CRIMINAL|FOID - REVOCATION|RESIDENCE                      |false |false   |0835|008     |18  |70            |26      |1157460     |1850515     |2015|02/10/2018 03:50:01 PM|41.745568408|-87.698616805|(41.745568408, -87.698616805)|
+--------+-----------+-------------------+---------------------+----+--------------+-----------------+-------------------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+------------+-------------+-----------------------------+
only showing top 5 rows
>>> nc.groupBy(col('Description')).count().orderBy('count', ascending=False).show(truncate=False)
+--------------------------------------+-----+
|Description                           |count|
+--------------------------------------+-----+
|FOID - REVOCATION                     |38   |
|NOTIFICATION OF CIVIL NO CONTACT ORDER|9    |
+--------------------------------------+-----+
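
One thing to watch in the filter above: the second literal is written as 'NON-CRIMINAL ' with a trailing space, so rows whose label is exactly 'NON-CRIMINAL' may not be matched by it, and the counts shown may therefore cover only two of the three variants. A less fragile approach might be to normalise the labels first and pass the matching ones to isin. The sketch below does the normalisation in plain Python; is_non_criminal is a hypothetical helper, and in practice the list of labels would be collected from the distinct() query rather than typed in.

```python
import re


def is_non_criminal(label):
    """Return True for any spelling variant of the NON-CRIMINAL primary type."""
    # Trim the ends and collapse whitespace around hyphens, so that
    # 'NON - CRIMINAL' and 'NON-CRIMINAL ' both become 'NON-CRIMINAL'.
    normalised = re.sub(r"\s*-\s*", "-", label.strip().upper())
    return normalised.startswith("NON-CRIMINAL")


# The three variants from the distinct() listing above, plus two controls.
labels = [
    "NON - CRIMINAL",
    "NON-CRIMINAL",
    "NON-CRIMINAL (SUBJECT SPECIFIED)",
    "NARCOTICS",
    "OTHER NARCOTIC VIOLATION",
]
non_criminal_labels = [label for label in labels if is_non_criminal(label)]
print(non_criminal_labels)

# Hypothetical use with the DataFrame from this post (not run here):
# nc = rc.filter(col('Primary Type').isin(non_criminal_labels))
```

Because the original spellings are kept in non_criminal_labels, the isin filter compares against the values exactly as they appear in the column.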

Using a bar chart, show which day of the week has the highest number of reported crimes.

>>> from pyspark.sql.functions import count, dayofweek, date_format
>>> rc.groupBy(dayofweek(col('Date')), date_format(col('Date'), 'E')).agg(count("*")).show()
+---------------+--------------------+--------+
|dayofweek(Date)|date_format(Date, E)|count(1)|
+---------------+--------------------+--------+
|              2|                 Mon|  952646|
|              6|                 Fri| 1016882|
|              1|                 Sun|  911174|
|              5|                 Thu|  964457|
|              4|                 Wed|  973801|
|              3|                 Tue|  967965|
|              7|                 Sat|  965095|
+---------------+--------------------+--------+

>>> ss = rc.groupBy(dayofweek(col('Date')), date_format(col('Date'), 'E')).count().orderBy('dayofweek(Date)')
>>> ss.show()
+---------------+--------------------+-------+
|dayofweek(Date)|date_format(Date, E)|  count|
+---------------+--------------------+-------+
|              1|                 Sun| 911174|
|              2|                 Mon| 952646|
|              3|                 Tue| 967965|
|              4|                 Wed| 973801|
|              5|                 Thu| 964457|
|              6|                 Fri|1016882|
|              7|                 Sat| 965095|
+---------------+--------------------+-------+

>>> type(ss)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> sbar = ss.withColumn("Day of Week", col("date_format(Date, E)"))
>>> import plotly.express as px
>>> fig = px.bar(sbar.toPandas(), x="Day of Week", y="count")
>>> fig.show()
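
In the tables above, dayofweek numbers the days from 1 (Sunday) to 7 (Saturday), which is why ordering by it puts Sunday first on the bar chart. This differs from Python's own conventions, so here is a small sketch of the mapping. spark_dayofweek is a hypothetical helper that only mimics the numbering; it does not call Spark.

```python
from datetime import date


def spark_dayofweek(d):
    """Mimic the numbering in the tables above: 1 = Sunday ... 7 = Saturday."""
    # isoweekday() gives 1 = Monday ... 7 = Sunday, so shift by one and wrap.
    return d.isoweekday() % 7 + 1


# Dates taken from the first rows of the dataset shown earlier.
for d in (date(2017, 10, 8), date(2017, 3, 28), date(2017, 9, 9)):
    print(d, d.strftime("%a"), spark_dayofweek(d))
```

The three dates fall on a Sunday, a Tuesday and a Saturday, so the helper returns 1, 3 and 7 respectively.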

Maps

Now let's explore Plotly's functionality for plotting data with latitude and longitude coordinates.

Below is a figure made from a sub-sample of the data of 100 rows.

Let's scale this up! Now we'll try 50,000 points:

Awesome! I would have loved to show all the points (800,000), but my laptop crashes each time I try. I assume this is simply a memory issue, and it might well be fine on other machines.
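
One way to find out how many points a given machine can handle would be to sample by fraction on the Spark side before converting to pandas, so that only the sub-sample ever reaches the driver. Below is a minimal sketch of the arithmetic; sample_fraction and points are hypothetical names, and the Spark call is left as a comment because it is not what produced the figures in this post.

```python
def sample_fraction(target_rows, total_rows):
    """Fraction of rows to keep so that roughly target_rows remain."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    # Cap at 1.0: asking for more rows than exist means keeping everything.
    return min(1.0, target_rows / total_rows)


# 800,000 is the number of points mentioned above; 50,000 is the size that worked.
fraction = sample_fraction(50_000, 800_000)
print(fraction)  # 0.0625

# Hypothetical use (not run here); `points` stands for a DataFrame holding
# only the rows to plot, and sample() keeps approximately that share of them:
# pdf = points.sample(fraction=fraction, seed=42).toPandas()
```

Raising the target step by step (100,000, then 200,000 and so on) would show where memory runs out without having to attempt the full set first.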

Conclusions

Playing with this dataset has been fun, and it has been interesting to follow the course while exploring Plotly at the same time. I've listed below a few of the things I learned whilst writing this post. Overall, I have learned a lot and look forward to exploring more "big-data" sets with PySpark and Plotly.

Things I Learned Along the Way

  1. One key thing I learned while writing this post was how to embed interactive Plotly figures in markdown so that they can be rendered in the blog with ease.

    This can simply be done using the to_html(..) function:

    import plotly
    plotly.io.to_html(fig, include_plotlyjs=False, full_html=False)
    
    This spits out a <div> element one can then place into their desired markdown, which will then translate as rendered HTML.

    To ensure just the <div> element is returned, full_html=False is required. Another thing to remember is that this returns the element as a string, so the leading and trailing apostrophes that mark it as a string need to be removed. In the process of discovering this, a potential "bug" was found in the function itself, resulting in excessive \n characters being generated. So the function actually used for this post is:

    import importlib.util
    spec = importlib.util.spec_from_file_location("plotly", "/Users/tallamjr/github/forks/plotly.py/packages/python/plotly/plotly/io/_html.py")
    foo = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(foo)
    foo.to_html(fig, include_plotlyjs=False, full_html=False)
    

    This points to a forked version of the plotly codebase while I have an outstanding PR waiting to be reviewed.

    A final thing to mention is that for any of the plots above to show up at all, even with the <div> elements, one needs to include the necessary JavaScript tags. Therefore, the head.html file for this blog contains:

    $ sed -n 14,17p layouts/partials/head.html
    <!-- Plotly embeddings
    REF: http://www.kellieottoboni.com/posts/2017/08/plotly-markup/
    ================================================== -->
    <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
    
  2. Another thing I discovered was how to get better formatting from the .show() command on Spark DataFrames. My approach is explained in a StackOverflow answer to the questions "pyspark show dataframe as table with horizontal scroll in ipython notebook" and "Improve PySpark DataFrame.show output to fit Jupyter notebook":

    Adding to the answers given above by @karan-singla and @vijay-jangir, a handy one-liner to comment
    out the `white-space: pre-wrap` styling can be done like so:
    
    $ awk -i inplace '/pre-wrap/ {$0="/*"$0"*/"}1' $(dirname `python -c "import notebook as nb;print(nb.__file__)"`)/static/style/style.min.css
    
    This translates as: use `awk` to update _inplace_ the lines that contain `pre-wrap` so that they
    are surrounded by `/* -- */`, i.e. commented out, in the `style.min.css` file found in your
    working Python environment.
    
    This, in theory, can then be used as an alias if one uses multiple environments, say with Anaconda.
    - https://stackoverflow.com/a/24884616/4521950
    - https://stackoverflow.com/questions/16529716/save-modifications-in-place-with-awk
    
  3. Finally, this is not necessarily something I learned during this post, but it opened my eyes to the possibilities that are available when using nbconvert. The notebook for this post has been rendered here using the following command:

 $ jupyter nbconvert --ExecutePreprocessor.kernel_name=python --ExecutePreprocessor.timeout=600 --to html --execute PySpark-by-Example.ipynb --output-dir /Users/tallamjr/www/blog/static/notebooks

After looking for ways to tie this in with point 1 above, and for how one can add custom CSS, I discovered the numerous customisations that are possible. Some examples can be found at https://github.com/jupyter/nbconvert-examples
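
Coming back to point 1: the apostrophes around the to_html output appear when the returned string is echoed in the interpreter, which displays its repr (newlines are shown as \n in that same display). A possible way to avoid stripping them by hand is to print the string or write it straight to a file. The sketch below uses a made-up <div> string as a stand-in for the real to_html output, and plot.html is a hypothetical file name.

```python
from pathlib import Path
import tempfile

# Stand-in for the string returned by
# to_html(fig, include_plotlyjs=False, full_html=False).
div = '<div id="example-plot"></div>\n'

# Echoing a value in the interpreter shows its repr: quoted, with \n escaped.
print(repr(div))

# Writing the string itself produces the raw markup, with no surrounding quotes.
out_path = Path(tempfile.mkdtemp()) / "plot.html"
out_path.write_text(div, encoding="utf-8")
print(out_path.read_text(encoding="utf-8"))
```

The contents of the written file can then be pasted or included into the markdown as they are.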

References and Resources