Spark Starter Guide 4.10: How to Filter on Aggregate Columns

Previous post: Spark Starter Guide 4.9: How to Rank Data

HAVING is similar to filtering (filter() or where() in the DataFrame API, or a WHERE clause in SQL), but the use cases differ slightly. While filtering lets you apply conditions to individual rows of your data to limit the result set, HAVING lets you apply conditions to aggregate functions computed over your data to limit the result set.

Both limit your result set – but the difference in how they are applied is the key. In short: WHERE filters operate at the row level, before any aggregation; HAVING filters operate at the aggregate level, after grouping. As a result, using a HAVING clause can also simplify (or outright eliminate) the need for some sub-queries.
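Here is a minimal sketch of the distinction (it assumes the pets temporary view that we build later in this exercise). The first query drops individual rows; the second drops entire groups:

// Row-level: WHERE removes individual animals before any grouping happens
spark.sql("select nickname, age from pets where age > 10").show()

// Aggregate-level: HAVING removes whole categories after their ages are summed
spark.sql("select type, sum(age) as total_age from pets group by type having total_age > 10").show()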

Let’s look at a fuller example.

In previous entries in The Spark Starter Guide, we used filtering to remove animals from a dataset if their name didn’t start with the letter ‘c’. We also used filtering to remove any animals categorized as pets from our dataset. HAVING is useful if you want to aggregate a metric in your dataset and then filter your dataset further based on that aggregated metric. In other words, it’s like a WHERE clause for aggregate functions.


Exercise Setup

Which 3 categories of animals are, collectively (i.e. in total), the oldest?

Put more simply, this question is asking: “if you were to sum up the ages of each animal in a type/category, which three categories would have the greatest combined age?” Thankfully, Spark could not make this simpler.


Follow these steps to complete the exercise in Scala:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

These give us programmatic access to Spark SQL functions such as max() and min(), along with the types we need to define a schema.

Import spark.implicits, which enables handy operations such as the $ column syntax used in a later step, using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing a name, type, age, and color, using the following code:

val my_previous_pets = Seq(Row("fido", "dog", 4, "brown"),
                               Row("annabelle", "cat", 15, "white"),
                               Row("fred", "bear", 29, "brown"),
                               Row("daisy", "cat", 8, "black"),
                               Row("jerry", "cat", 1, "white"),
                               Row("fred", "parrot", 1, "brown"),
                               Row("gus", "fish", 1, "gold"),
                               Row("gus", "dog", 11, "black"),
                               Row("daisy", "iguana", 2, "green"),
                               Row("rufus", "dog", 10, "gold"))

Create a schema that corresponds to the data using the following code:

val schema = List(
      StructField("nickname", StringType, nullable = true),
      StructField("type", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and the schema created earlier using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Create a temporary view of the data in Spark SQL called pets using the following code:

petsDF.createOrReplaceTempView("pets")
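As a quick, optional sanity check (not required for the exercise), you can print the schema and query the new view to confirm the setup:

petsDF.printSchema()                    // confirm column names and types
spark.sql("select * from pets").show()  // confirm the view is queryable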

From here, you have two options for completing this task. You can use SQL code, or write the same logic programmatically.

Option 1: Write a SQL query in Spark SQL that computes the collective age of each type of animal and keeps only the groups whose collective age is greater than 10, using the following code:

spark.sql("select type, sum(age) as total_age from pets group by type having total_age > 10 order by total_age desc").show()

Option 2: Write the equivalent of that SQL query using programmatic Spark functions to calculate the collective age of each group of animals, returning only the groups with a collective age greater than 10.

petsDF.groupBy("type")
      .agg("age" -> "sum")
      .withColumnRenamed("sum(age)", "total_age")
      .where("total_age > 10")
      .orderBy($"total_age".desc)
      .show()

The following is the output of the preceding code:

+----+---------+
|type|total_age|
+----+---------+
|bear|       29|
| dog|       25|
| cat|       24|
+----+---------+

As you can see, the only categories of animals in our dataset that are, collectively, older than 10 are bears, dogs, and cats – which also answers our original top-three question.
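A note on the design choice in Option 2: the withColumnRenamed() step is only needed because the string-based agg("age" -> "sum") names its output column sum(age). An equivalent sketch (assuming you also import sum from org.apache.spark.sql.functions) aliases the aggregate up front instead:

import org.apache.spark.sql.functions.sum

petsDF.groupBy("type")
      .agg(sum($"age").as("total_age")) // name the aggregate column directly
      .where($"total_age" > 10)
      .orderBy($"total_age".desc)
      .show()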


Follow these steps to complete the exercise in Python:

Create a List of tuples, each containing a name, type, age, and color, using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided list of column names (Spark will infer the types) using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView('pets')

You have two options for accomplishing this task: using pure SQL, or taking the programmatic approach.

Option 1: Write a SQL query in Spark SQL that computes the collective age of each type of animal and keeps only the groups whose collective age is greater than 10, using the following code:

spark.sql("select type, "
          "sum(age) as total_age "
          "from pets "
          "group by type "
          "having total_age > 10 "
          "order by total_age desc").show()

Option 2: Use the programmatic, function-chaining alternative to Spark SQL to calculate the collective age of each type of animal. Only retain the groups with a collective age over 10.

from pyspark.sql.functions import col  # needed for the col() reference below

petsDF.groupBy("type")\
    .sum("age")\
    .withColumnRenamed("sum(age)", "total_age")\
    .where("total_age > 10")\
    .orderBy(col("total_age").desc())\
    .show()

The following is the output of the preceding code:

+----+---------+
|type|total_age|
+----+---------+
|bear|       29|
| dog|       25|
| cat|       24|
+----+---------+

As expected, only the three groups above pass muster for our condition – the same result we got in Scala. They have what it takes!
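Just as in Scala, the withColumnRenamed() step exists only because sum("age") names its output column sum(age). An equivalent sketch, assuming you import sum from pyspark.sql.functions (aliased here so it does not shadow Python’s built-in sum), names the aggregate directly:

from pyspark.sql.functions import col, sum as sum_

petsDF.groupBy("type")\
    .agg(sum_("age").alias("total_age"))\
    .where(col("total_age") > 10)\
    .orderBy(col("total_age").desc())\
    .show()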

Coming Up

In the next section, we’ll learn about normalized and de-normalized data, and one way Spark can address navigating them.
