Spark Starter Guide 4.9: How to Rank Data

Previous post: Spark Starter Guide 4.8: How to Order and Sort Data

Ranking is, fundamentally, ordering based on a condition. In essence, it's like a combination of a where clause and an order by clause, the exception being that data is not removed through ranking; it is, well, ranked instead. While ordering allows you to sort data based on a column, ranking allows you to assign a number (e.g. a row number or rank) to each row, based on a column or condition, so that you can use it in logical decision making, like selecting a top result or applying further transformations.

One very common ranking function is row_number(), which assigns a unique sequential value, or "rank", to each row within a grouping, based on a specification. That specification, at least in Spark, is controlled by partitioning and ordering the dataset. The result allows you, for example, to perform "top n" analysis in Spark.

Let’s look at an example where ranking can be applied to data in Spark.


Exercise Setup

In this exercise, we will use ranking to answer the following question: “Who are the top 2 cats and dogs in each category?”

Yes, the preceding question could be answered without ranking, but ranking makes the solution easier to read, understand, and achieve; that's what makes it so powerful and popular. Now, let's show how to answer the question.

In this article, some code examples will utilize the following function:

withColumn()

This adds a new column to a DataFrame, or replaces an existing column of the same name, using the logic you supply. It's commonly utilized to apply logic to a specific column of a DataFrame or Dataset. A related function, withColumnRenamed(), is also available; it simply renames an existing column without changing its values.
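As a quick illustration, here is a minimal sketch of both functions. (The DataFrame df and the dog_years column are hypothetical, assumed only for this example.)

import org.apache.spark.sql.functions.col

// Add a new column (or overwrite one with the same name),
// derived here from the existing age column.
val withDogYears = df.withColumn("dog_years", col("age") * 7)

// Rename an existing column without changing its values.
val renamed = withDogYears.withColumnRenamed("dog_years", "age_in_dog_years")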


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

These give us access to the key components in our code: the Row and schema types used to build the DataFrame, the windowing specification, and the row_number ranking function.

Import spark.implicits, which enables handy operations (such as the $ column syntax used in a later step), using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing a name, type, age and color using the following code:

val my_previous_pets = Seq(
  Row("fido", "dog", 4, "brown"),
  Row("annabelle", "cat", 15, "white"),
  Row("fred", "bear", 29, "brown"),
  Row("daisy", "cat", 8, "black"),
  Row("jerry", "cat", 1, "white"),
  Row("fred", "parrot", 1, "brown"),
  Row("gus", "fish", 1, "gold"),
  Row("gus", "dog", 11, "black"),
  Row("daisy", "iguana", 2, "green"),
  Row("rufus", "dog", 10, "gold")
)

Create a schema that corresponds to the data using the following code:

val schema = List(
      StructField("nickname", StringType, nullable = true),
      StructField("type", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))
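If you want to sanity-check that the schema was applied, calling printSchema() on the DataFrame should produce output like the following:

petsDF.printSchema()

root
 |-- nickname: string (nullable = true)
 |-- type: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- color: string (nullable = true)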

Create a temporary table view of the data in Spark SQL called pets using the following code:

petsDF.createOrReplaceTempView("pets")

Create a Window that is partitioned by type and ordered (i.e. ranked) by age, in descending order, using the following code:

val window = Window.partitionBy("type").orderBy($"age".desc)
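A quick note on ties before moving on: row_number() always assigns unique, consecutive numbers, so two pets of the same age within a type would still receive different row numbers, in an arbitrary order. If you would rather have ties share a rank, the same functions library also provides rank() and dense_rank(), which accept the same window. Here is a minimal sketch (no pets in this dataset actually tie on age, but the distinction matters on real data):

// row_number(): always unique (1, 2, 3, ...)
// rank():       tied rows share a number, with gaps after them
// dense_rank(): tied rows share a number, with no gaps
petsDF.withColumn("row_number", row_number().over(window))
      .withColumn("rank", rank().over(window))
      .withColumn("dense_rank", dense_rank().over(window))
      .show()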

Using the withColumn() function of the DataFrame, apply the row_number() function (from the Spark SQL functions you imported) over your window. Finish the logic by renaming the new row_number column to rank, filtering down to the top two ranks of each group we care about (cats and dogs), and printing the results to the console using the following code:

petsDF.withColumn("row_number", row_number().over(window))
          .withColumnRenamed("row_number", "rank")
          .where("rank <= 2 and (type = 'dog' or type = 'cat')")
          .orderBy("type", "nickname")
          .show()

The following is the output of the preceding code:

+---------+----+---+-----+----+
| nickname|type|age|color|rank|
+---------+----+---+-----+----+
|annabelle| cat| 15|white|   1|
|    daisy| cat|  8|black|   2|
|      gus| dog| 11|black|   1|
|    rufus| dog| 10| gold|   2|
+---------+----+---+-----+----+

As you can see, the animals in each category are ranked by age in descending order. TL;DR: the oldest animal in each category is ranked highest (i.e. #1).
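Incidentally, the pets temporary view registered earlier lets you express the same logic in Spark SQL, if that reads more naturally to you. Here is a sketch of one equivalent query:

spark.sql("""
  SELECT nickname, type, age, color, rank
  FROM (
    SELECT *,
           row_number() OVER (PARTITION BY type ORDER BY age DESC) AS rank
    FROM pets
  ) ranked
  WHERE rank <= 2 AND type IN ('cat', 'dog')
  ORDER BY type, nickname
""").show()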


Follow these steps to complete the exercise in PYTHON:

Import additional relevant Spark libraries using the following code:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

These allow us to access two key components in our code: the windowing specification and the row_number ranking function.

Create a List of tuples, each containing a name, type, age and color, using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView('pets')

Create a Window that is partitioned by type and ordered (i.e. ranked) by age, in descending order, using the following code:

window = Window.partitionBy("type").orderBy(F.col("age").desc())

Using the withColumn() function of the DataFrame, apply the row_number() function (from the Spark SQL functions you imported as F) over your window. Finish the logic by renaming the new row_number column to rank, filtering down to the top two ranks of each group we care about (cats and dogs), and printing the results to the console using the following code:

petsDF.withColumn("row_number", F.row_number().over(window))\
    .withColumnRenamed("row_number", "rank")\
    .where("rank <= 2 and (type = 'dog' or type = 'cat')")\
    .orderBy("type", "nickname")\
    .show()

The following is the output of the preceding code:

+---------+----+---+-----+----+
| nickname|type|age|color|rank|
+---------+----+---+-----+----+
|annabelle| cat| 15|white|   1|
|    daisy| cat|  8|black|   2|
|      gus| dog| 11|black|   1|
|    rufus| dog| 10| gold|   2|
+---------+----+---+-----+----+

As you can see, Annabelle is out ahead of the pack at the ripe old age of fifteen, with Daisy pulling into second place at a still impressive eight years of age. It was a closer call between Gus and Rufus in the dog category, but hey, it's a dog-eat-dog world out there. You can't win them all.

Coming Up

In the next section, we will learn how Having (filtering on an aggregate column) is used! It’s a great function to Have! Ha!
