PySpark Window Functions
Window functions are useful when we need to perform aggregate operations on DataFrame columns within a given window frame. A PySpark window function operates on a set of rows and returns a single value for each row of the input. There are three kinds of window functions available in PySpark SQL: ranking functions, analytic functions, and aggregate functions.
To explain the functionality of these functions, a dataset of IMDb ranked movies from Kaggle is used.
Dataset Link: https://www.kaggle.com/stefanoleone992/imdb-extensive-dataset
The selected dataset has many columns available, but for explanation purposes only the title, year, genre, and duration columns are considered.
Before we apply window functions to our data, we first need to:
1. Establish a SparkSession, which is the entry point to Spark, by importing SparkSession from pyspark.sql.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Colab") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()
2. Create a PySpark DataFrame (Here a CSV file is loaded to create the DataFrame)
df = spark.read.csv("/content/drive/My Drive/IMDbmovies.csv", header=True, inferSchema=True)
3. Partition the data, using the Window.partitionBy() function.
4. Order the data, using the orderBy() function.
Here the data is partitioned by genre and ordered by year.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("genre").orderBy("year")
Now we are ready to check the functionality of each function. Let's discuss them one by one.
1. PySpark Window Ranking Functions
row_number()
df.withColumn("row_number",row_number().over(windowSpec)) \.select("title","year","genre","duration","row_number").show(10)
Output
+--------------------+----+--------------------+--------+----------+
|               title|year|               genre|duration|row_number|
+--------------------+----+--------------------+--------+----------+
|The Wishing Ring:...|1914|       Comedy, Drama|      54|         1|
|The Social Secretary|1916|       Comedy, Drama|      52|         2|
|Rebecca of Sunnyb...|1917|       Comedy, Drama|      78|         3|
|              M'Liss|1918|       Comedy, Drama|      73|         4|
|              Mickey|1918|       Comedy, Drama|      93|         5|
|          Miss Jerry|1894|             Romance|      45|         1|
In this example, you can see that the data is partitioned by the genre of the film, so films of the same genre are grouped into the same partition. The rows in each partition are ordered by the film's year of release. row_number() gives each row in a window partition a sequential number starting from 1, which increases until the partition ends; every new partition begins again at 1.
For example, all rows with Comedy, Drama as the genre are partitioned into the same group, and the rows in that partition are ordered by year in ascending order (1914, 1916, 1917, 1918, ...). The rows in this partition are given the numbers 1 to 5 because the partition has 5 rows.
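The same result can be expressed in Spark SQL with the standard ROW_NUMBER() window function. Below is a minimal sketch, assuming the DataFrame has been registered as a temporary view named movies (that view name is my own choice, not from the original example):

# Register the DataFrame so it can be queried with SQL; "movies" is an illustrative name.
df.createOrReplaceTempView("movies")

spark.sql("""
    SELECT title, year, genre, duration,
           ROW_NUMBER() OVER (PARTITION BY genre ORDER BY year) AS row_number
    FROM movies
""").show(10)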
rank()
Within each window partition, the rank() window function assigns a rank to every row. If the ordering value is the same in more than one row of the same partition, those rows get the same rank, and the function leaves a gap in the rank sequence after them.
from pyspark.sql.functions import rank

df.withColumn("rank", rank().over(windowSpec)) \
    .select("title", "year", "genre", "duration", "rank").show(50)
Output
+--------------------+----+--------------------+--------+----+
|               title|year|               genre|duration|rank|
+--------------------+----+--------------------+--------+----+
|      Den sorte drøm|1911|               Drama|      53|   1|
|         Richard III|1912|               Drama|      55|   2|
|            Atlantis|1913|               Drama|     121|   3|
|Il calvario di un...|1913|               Drama|      96|   3|
|Ma l'amor mio non...|1914|               Drama|      90|   5|
Consider the above output, which shows the first rows of the Drama partition ranked by year of release. Since the third and fourth rows have the same year (1913), they receive the same rank. The function then skips rank 4, and the next row is ranked 5, because rank() leaves gaps after ties.
dense_rank()
This function is very similar to rank(), but whereas rank() leaves gaps after ties, dense_rank() does not: the ranks stay consecutive.
from pyspark.sql.functions import dense_rank

df.withColumn("dense_rank", dense_rank().over(windowSpec)) \
    .select("title", "year", "genre", "duration", "dense_rank").show(50)
Output
+--------------------+----+--------------------+--------+----------+
|               title|year|               genre|duration|dense_rank|
+--------------------+----+--------------------+--------+----------+
|      Den sorte drøm|1911|               Drama|      53|         1|
|         Richard III|1912|               Drama|      55|         2|
|            Atlantis|1913|               Drama|     121|         3|
|Il calvario di un...|1913|               Drama|      96|         3|
|Ma l'amor mio non...|1914|               Drama|      90|         4|
In this output you can see that, since the third and fourth rows have the same year, they receive the same rank, and the fifth row is ranked 4. With rank(), the fifth row would get the rank 5, because rank() skips ranks after ties; with dense_rank(), no ranks are skipped.
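To see the difference in one place, here is a small sketch that computes rank() and dense_rank() over the same window side by side; for tied years the rank column jumps while the dense_rank column stays consecutive:

from pyspark.sql.functions import rank, dense_rank

# Both ranking functions share the same partition-by-genre, order-by-year window.
df.withColumn("rank", rank().over(windowSpec)) \
    .withColumn("dense_rank", dense_rank().over(windowSpec)) \
    .select("title", "year", "genre", "rank", "dense_rank").show(10)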
percent_rank()
This function gives the relative rank of each row within its window partition. In a selected partition, it is calculated as:

percent_rank = (rank of the row - 1) / (number of rows - 1)
from pyspark.sql.functions import percent_rank

df.withColumn("percent_rank", percent_rank().over(windowSpec)) \
    .select("title", "year", "genre", "duration", "percent_rank").show(10)
Output
Let's apply the formula and calculate the percent rank for the second row, considering the first partition, which has the genre Comedy, Drama:

percent_rank = (rank of the row - 1) / (number of rows - 1)
             = (2 - 1) / (5 - 1)
             = 1/4
             = 0.25
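The formula can also be checked against percent_rank() directly by rebuilding it from rank() and a per-partition row count. A sketch (note that, unlike percent_rank(), which returns 0.0 for a single-row partition, this manual version returns null there because of the division by zero):

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, count, percent_rank

# A window spanning the whole partition, with no ordering, for the row count.
partitionOnly = Window.partitionBy("genre")

df.withColumn("percent_rank", percent_rank().over(windowSpec)) \
    .withColumn("manual",
                (rank().over(windowSpec) - 1) / (count("*").over(partitionOnly) - 1)) \
    .select("title", "year", "genre", "percent_rank", "manual").show(10)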
ntile()
The ntile() function divides the rows of an ordered partition into a given number of roughly equal groups and assigns each row the number of its group, starting from 1. I have used 2 as the argument to ntile() in the example below, so it returns a group number of either 1 or 2. Given 3 as the argument, it would divide each partition into roughly 3 equal groups.
from pyspark.sql.functions import ntile

df.withColumn("ntile", ntile(2).over(windowSpec)) \
    .select("title", "year", "genre", "duration", "ntile").show(10)
Output
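As a sketch of the three-group case mentioned above, passing 3 instead of 2 simply splits each ordered partition into three roughly equal groups numbered 1 to 3:

from pyspark.sql.functions import ntile

# Same window; only the bucket count changes.
df.withColumn("ntile", ntile(3).over(windowSpec)) \
    .select("title", "year", "genre", "duration", "ntile").show(10)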
2. PySpark Window Analytic Functions
cume_dist()
The cume_dist() function computes the cumulative distribution of a value within its window partition; in other words, it determines the value's relative position within a set of values. The cumulative distribution is calculated using the following formula:
cumulative distribution = N / total rows

N: the number of rows with a value less than or equal to the current row's value.
total rows: the total number of rows in the partition.
from pyspark.sql.functions import cume_dist

df.withColumn("cume_dist", cume_dist().over(windowSpec)) \
    .select("title", "year", "genre", "duration", "cume_dist").show(10)
Output
+--------------------+----+--------------------+--------+---------+
|               title|year|               genre|duration|cume_dist|
+--------------------+----+--------------------+--------+---------+
|The Wishing Ring:...|1914|       Comedy, Drama|      54|      0.2|
|The Social Secretary|1916|       Comedy, Drama|      52|      0.4|
|Rebecca of Sunnyb...|1917|       Comedy, Drama|      78|      0.6|
|              M'Liss|1918|       Comedy, Drama|      73|      1.0|
|              Mickey|1918|       Comedy, Drama|      93|      1.0|
|          Miss Jerry|1894|             Romance|      45|      0.5|
In this output you can see that the partition with the genre Comedy, Drama has five rows. Let's calculate the cumulative distribution for the third row of that partition. The year value of the 3rd row is 1917, and there are three rows with a year less than or equal to 1917, so N = 3. There are 5 rows in total in the partition. Hence,

cumulative distribution = N / total rows = 3 / 5 = 0.6
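cume_dist() can likewise be rebuilt from the formula. With an ordered window, Spark's default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which includes ties, so count("*") over windowSpec is exactly N. A sketch:

from pyspark.sql.window import Window
from pyspark.sql.functions import count

# N = rows up to and including the current row's year (ties included);
# the denominator counts the whole partition.
df.withColumn("manual_cume_dist",
              count("*").over(windowSpec) /
              count("*").over(Window.partitionBy("genre"))) \
    .select("title", "year", "genre", "duration", "manual_cume_dist").show(10)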
lag()
The lag() function is a window function that lets you look back a given number of rows and access data from a previous row relative to the current row.
from pyspark.sql.functions import lag

df.withColumn("lag", lag("duration", 1).over(windowSpec)) \
    .select("title", "year", "genre", "duration", "lag").show(10)
Output
The first value of the lag column in each partition is null because there is no previous film. Since we used an offset of 1, the lag() window function returns, for each row of the year-ordered partition, the duration of the film one row earlier.
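lag() also takes an optional third argument, a default value returned instead of null when the look-back leaves the partition, e.g. lag("duration", 1, 0). A typical use is comparing each row with its predecessor; the sketch below computes the change in duration relative to the previous film of the same genre (the derived column names are my own, chosen for illustration):

from pyspark.sql.functions import col, lag

# "prev_duration" and "duration_change" are illustrative names, not from the original example.
df.withColumn("prev_duration", lag("duration", 1).over(windowSpec)) \
    .withColumn("duration_change", col("duration") - col("prev_duration")) \
    .select("title", "year", "genre", "duration", "duration_change").show(10)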
lead()
lead() is a window function that gives access to a row at a given physical offset after the current row. In other words, you can access data from a following row relative to the current row.
from pyspark.sql.functions import lead

df.withColumn("lead", lead("duration", 1).over(windowSpec)) \
    .select("title", "year", "genre", "duration", "lead").show(10)
Output
Since we used an offset of 1, the lead() window function returns, for each row of the year-ordered partition, the duration of the film one row later; the last row of each partition gets null because there is no following row.
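The symmetric sketch with lead(), comparing each film with the next one of the same genre (again, the derived column names are illustrative, not from the original example):

from pyspark.sql.functions import col, lead

df.withColumn("next_duration", lead("duration", 1).over(windowSpec)) \
    .withColumn("change_to_next", col("next_duration") - col("duration")) \
    .select("title", "year", "genre", "duration", "change_to_next").show(10)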
3. PySpark Window Aggregate Functions
We can use aggregate window functions with a WindowSpec to get the sum, minimum, maximum, and average of a given column. We don't need an ORDER BY clause when dealing with these aggregate functions. In this example, we calculate the sum, min, max, and avg of the duration column.
from pyspark.sql.functions import col, avg, sum, min, max, row_number

windowSpecAgg = Window.partitionBy("genre")

df.withColumn("row", row_number().over(windowSpec)) \
    .withColumn("avg", avg(col("duration")).over(windowSpecAgg)) \
    .withColumn("sum", sum(col("duration")).over(windowSpecAgg)) \
    .withColumn("min", min(col("duration")).over(windowSpecAgg)) \
    .withColumn("max", max(col("duration")).over(windowSpecAgg)) \
    .where(col("row") == 1) \
    .select("genre", "avg", "sum", "min", "max") \
    .show()
Output
+--------------------+------------------+----+---+---+
|               genre|               avg| sum|min|max|
+--------------------+------------------+----+---+---+
|       Comedy, Drama|              70.0| 350| 52| 93|
|             Romance|              52.5| 105| 45| 60|
|Comedy, Drama, Ro...|              65.0|  65| 65| 65|
|Adventure, Drama,...|              68.0|  68| 68| 68|
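Since these window aggregates are constant within each partition and the query keeps only one row per genre, the same summary could also be written as a plain groupBy, sketched below. The window-function version earns its keep when you want the aggregates alongside the per-row columns instead of collapsing the rows:

from pyspark.sql.functions import avg, sum, min, max

# One output row per genre; aliases chosen to match the columns above.
df.groupBy("genre") \
    .agg(avg("duration").alias("avg"),
         sum("duration").alias("sum"),
         min("duration").alias("min"),
         max("duration").alias("max")) \
    .show()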