Hadoopsters is Moving to Medium.com

For nearly 7 years, I’ve been writing Big Data and Data Engineering tutorials for Hadoop, Spark, and beyond – and I’ve been doing it on a website I created with my friends and co-workers – this website – Hadoopsters.com

A peek at our analytics from the year level.

In 2021, our biggest year yet, we saw over 47,000 unique visitors, and served nearly 62,000 views. Since the site launched 2,430 days ago, we’ve helped over 189,000 visitors, and served just over a quarter-million views (and we never ran ads). 

We created Hadoopsters out of the love of sharing knowledge, and a passion for filling documentation gaps in what we knew would be an explosive new career path for many: Data Engineering! In the early days of Hadoop, my peers and I recognized the significant gap in documentation for solving some of the most frustrating technical problems – and we wanted to help solve it.

Hadoopsters as a website has been a WordPress blog this entire time… until now! We have officially moved to Medium.com as a Medium Publication. 

Introducing the new Hadoopsters.com!

So, why did we do this?

Firstly, economics. Hosting a website all on our own (via WordPress) is just not worth the expense – especially since we choose not to run advertising. Moving to Medium allows us to have effectively zero hosting expense, and we only pay a small amount annually to keep our domain mapped to Medium.

Secondly, distribution. The Medium platform presents an excellent opportunity to circulate our work where tech writing already thrives. It has built-in distribution through its article recommendation engine, and it has quickly become the go-to place for tech writing. This allows our work to reach a much broader audience, organically, and without the need for intentional marketing (again, more money) to drive people to our specific website.

I am extremely excited about reaching more Data Engineers and Data Scientists through knowledge sharing, and making the data world an even more well-documented place.

If you wish to follow my writing, all of it will continue to be through Hadoopsters – just now on Medium. 🙂 

Thank you to everyone who has read and leveraged our work through this website. We hope to make an impact on your work and career for years to come. Be well!

TL;DR

Hadoopsters.com now fully routes to our Medium publication. Our legacy website (this one), which has reverted to hadoopsters.wordpress.com, will live on as long as WordPress continues to serve it. We’ve migrated (cloned) select historical articles from this site to Medium, but every article ever written here will remain here. All new content will be produced exclusively on Medium.


Spark Starter Guide 4.10: How to Filter on Aggregate Columns

Previous post: Spark Starter Guide 4.9: How to Rank Data

Having is similar to filtering (filter() or where() in the DataFrame API, or a WHERE clause in SQL), but the use cases differ slightly. While filtering allows you to apply conditions on your data to limit the result set, Having allows you to apply conditions on aggregate functions computed over your data to limit your result set.

Both limit your result set – but the difference in how they are applied is the key. In short: Where filters are for row-level filtering. Having filters are for aggregate-level filtering. As a result, using a Having statement can also simplify (or outright eliminate) the need for some sub-queries.
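Before diving in, here is a minimal, hedged sketch of the difference. It assumes a hypothetical salesDF DataFrame with shoe_name and sale_price columns, registered as a temporary view named sales – not the article’s actual example:

import org.apache.spark.sql.functions.sum

// Pure SQL: WHERE filters rows before grouping; HAVING filters groups after aggregation.
spark.sql("""
  SELECT shoe_name, sum(sale_price) AS revenue
  FROM sales
  WHERE sale_price > 0          -- row-level filter
  GROUP BY shoe_name
  HAVING sum(sale_price) > 10   -- aggregate-level filter
""").show()

// Roughly equivalent functional form: filter on the aggregate column after agg().
salesDF.where("sale_price > 0")
  .groupBy("shoe_name")
  .agg(sum("sale_price").alias("revenue"))
  .where("revenue > 10")
  .show()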

Let’s look at an example.

Read More »

Spark Starter Guide 4.9: How to Rank Data

Previous post: Spark Starter Guide 4.8: How to Order and Sort Data

Ranking is, fundamentally, ordering based on a condition. So, in essence, it’s like a combination of a where clause and an order by clause – the exception being that data is not removed through ranking; it is, well, ranked instead. While ordering allows you to sort data based on a column, ranking allows you to allocate a number (e.g. row number or rank) to each row (based on a column or condition) so that you can use it in logical decision making, like selecting a top result, or applying further transformations.
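To preview the idea (a rough sketch only – the full exercise is in the article), ranking in Spark is typically expressed with a window function; this assumes a hypothetical scoresDF DataFrame with player and score columns:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, rank, row_number}

// Assign a rank to every row based on score, without removing any rows.
val byScore = Window.orderBy(desc("score"))

scoresDF
  .withColumn("rank", rank().over(byScore))              // ties share a rank
  .withColumn("row_number", row_number().over(byScore))  // strictly sequential
  .where("rank <= 3")                                    // e.g. keep only a top result
  .show()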

Read More »

Spark Starter Guide 4.8: How to Order and Sort Data

Previous post: Spark Starter Guide 4.7: How to Standardize Data

Ordering is useful for when you want to convey… well, order. To be more specific, Ordering (also known as sorting) is most often used in the final analysis or output of your data pipeline, as a way to display data in an organized fashion based on criteria. This results in data that is sorted, and ideally easier to understand.
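As a tiny, hedged preview (assuming the petsDF DataFrame used later in this guide, with nickname and age columns), ordering in Spark looks like this:

import org.apache.spark.sql.functions.{asc, desc}

// Sort the output by age, oldest first, breaking ties by nickname.
petsDF.orderBy(desc("age"), asc("nickname")).show()

// sort() is an alias for orderBy() and behaves the same way.
petsDF.sort(asc("age")).show()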

Read More »

Spark Starter Guide 4.7: How to Standardize Data

Previous post: Spark Starter Guide 4.6: How to Aggregate Data

Introduction

Standardization is the practice of analyzing columns of data and identifying synonyms or like names for the same item. Similar to how a cat can also be identified as a kitty, kitty cat, kitten or feline, we might want to standardize all of those entries into simply “cat” so our data is less messy and more organized. This can make future processing of the data more streamlined and less complicated. It can also reduce skew, which we address in Addressing Data Cardinality and Skew.

We will learn how to standardize data in the following exercises.


NOTE

From this point onward, code examples may utilize a line of code like this:

import spark.implicits._

This enables implicit conversions of Scala objects, like RDDs, into modern Spark abstractions like Dataset, DataFrame, or Column. It also pairs with many convenience functions from Spark SQL, like isin() (checking for values in a list) or desc() (descending order sorting).
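For instance, here is a rough sketch of what that import enables (df is a hypothetical DataFrame with type and age columns; the $-prefixed column syntax comes from the implicits, while isin() and desc are methods on Column):

import spark.implicits._

// $"..." column references become available once the implicits are in scope.
df.where($"type".isin("cat", "dog"))
  .orderBy($"age".desc)
  .show()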


Two Types of Standardization

There are at least two basic ways to standardize something:

  1. You can recognize when two things are the same but literally different (“puppy” and “dog”) and associate them without actually changing anything.

  2. You can recognize when two things are the same and change them to be consistent (change instances of “puppy” to “dog”).

We’ll show ways to do both in Spark, in Scala and Python. Both involve some form of a synonym library.

Standardization through Suggestion

Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Import spark.implicits, which will be useful for handy operations in a later step using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing an animal name and type using the following code:

val my_previous_pets = Seq(Row("annabelle", "cat"),
                           Row("daisy", "kitten"),
                           Row("roger", "puppy"),
                           Row("joe", "puppy dog"),
                           Row("rosco", "dog"),
                           Row("julie", "feline"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Use the where() function of the DataFrame in combination with the isin() function of the Column class (the $"..." column syntax comes from spark.implicits) to only keep rows where the type matches a provided list of dog nouns. Print the results to the console as shown in the following code:

val dogs = petsDF
  .where($"type".isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()

The following is the output of the preceding code:

+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+

Use the where() function of the DataFrame in combination with the isin() function of the Column class to only keep rows where the type matches a provided list of cat nouns. Print the results to the console as shown in the following code:

val cats = petsDF
  .where($"type".isin("cat", "kitty", "kitten", "feline", "kitty cat"))

cats.show()

The following is the output of the preceding code:

+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+

As we can see, we were able to use custom standardization logic to decide when a cat is a cat, and a dog is a dog, even when their given type is not consistent. And, generally speaking, the isin() methodology of string comparisons in a list is relatively efficient at scale.


Follow these steps to complete the exercise in PYTHON:

Import additional relevant Spark libraries using the following code:

from pyspark.sql import Row
from pyspark.sql.functions import col

Create a List of Rows, each containing a name and type using the following code:

my_previous_pets = [Row("annabelle", "cat"),
                    Row("daisy", "kitten"),
                    Row("roger", "puppy"),
                    Row("joe", "puppy dog"),
                    Row("rosco", "dog"),
                    Row("julie", "feline")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type'])

Use the where() function of the DataFrame in combination with the isin() function of the Column class to only keep rows where the type matches a provided list of dog nouns. Print the results to the console as shown in the following code:

dogs = petsDF \
    .where(col("type").isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()

The following is the output of the preceding code:

+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+

Use the where() function of the DataFrame in combination with the isin() function of the Column class to only keep rows where the type matches a provided list of cat nouns. Print the results to the console as shown in the following code:

cats = petsDF \
    .where(col("type").isin(["cat", "kitty", "kitten", "feline", "kitty cat"]))

cats.show()

This example also demonstrates that you can pass a list to the isin() function, not just a comma-separated list of string values as demonstrated in the previous step.

The following is the output of the preceding code:

+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+

We can see that all cats could be identified in the data set, even though they weren’t all labeled as type ‘cat’.

Standardization through Modification

In the previous exercise, we quietly identified animals as a certain type if their given type was found in a list of common synonyms for the proper type. In this exercise, we will actually modify our data to be standardized, by replacing each synonymous type value with its preferred, standard alternative.


NOTE

From this point onward, further Scala code examples may, where appropriate, utilize a case class. This is a Scala-only abstraction that acts as a simple schema for structured, tabular-style data.

It’s an especially handy tool when paired with the Spark Dataset API, which can make powerful code even simpler to read and work with than DataFrames. It should be declared outside of the main() function in Scala, or imported from its own class; otherwise, you will experience exceptions.

Example:

case class Person(name:String, age:Int)
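As a hypothetical illustration (not part of the exercise that follows), a DataFrame whose columns line up with such a case class can be converted into a strongly typed Dataset:

import spark.implicits._

// Assuming the Person case class above is declared at the top level, a DataFrame
// with matching column names and types becomes a Dataset[Person], giving typed
// field access (p.name, p.age) inside map, filter, and friends.
val peopleDF = Seq(("annabelle", 15), ("rufus", 10)).toDF("name", "age")
val peopleDS = peopleDF.as[Person]

peopleDS.filter(p => p.age > 12).show()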


Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Then, create a case class called Pet, which contains two columns: nickname and petType using the following code:

case class Pet(nickname: String, petType: String)

Using the petsDF created in the previous exercise, use the map() function of the DataFrame to compare the petType to a list of common dog and cat synonyms – returning "dog" or "cat", respectively, if there is a match. If there is no match, return the unmodified petType (thus assuming it cannot be standardized), as shown in the following code:

val standardized_pets = petsDF.map(pet => {

  val nickname = pet.getString(pet.fieldIndex("nickname"))
  val petType = pet.getString(pet.fieldIndex("type"))

  val standardType =
    if (Seq("dog", "puppy", "puppy dog", "hound", "canine").contains(petType)) {
      "dog"
    }
    else if (Seq("cat", "kitty", "kitten", "feline", "kitty cat").contains(petType)) {
      "cat"
    }
    else {
      petType
    }

  Pet(nickname, standardType)
})

Print the results to the console using the following code:

standardized_pets.show()

The following is the output of the preceding code:

+---------+-------+
| nickname|petType|
+---------+-------+
|annabelle|    cat|
|    daisy|    cat|
|    roger|    dog|
|      joe|    dog|
|    rosco|    dog|
|    julie|    cat|
+---------+-------+

As we can observe, the nickname column displays the names of the pets, while the petType column displays the kind of animal each one is – cat or dog – after passing through the standardization process.

Follow these steps to complete the exercise in PYTHON:

Create and utilize a standardize() function to compare the petType to a list of common dog and cat nouns – returning “dog” or “cat”, respectively, if there is a match.

def standardize(pet):

    name = pet[0]
    animal_type = pet[1]

    if animal_type in ["dog", "puppy", "puppy dog", "hound", "canine"]:
        return name, "dog"
    elif animal_type in ["cat", "kitty", "kitten", "feline", "kitty cat"]:
        return name, "cat"
    else:
        return pet

Then, apply the standardize() function to petsRDD (created in the previous exercise) using the map() function. Hint: You can also use a UDF on the DataFrame instead of this RDD map method, but we’ll cover that in a future exercise!

Convert the results back into a DataFrame and print them to the console using the following code:

standardizedPets = petsRDD.map(standardize)
standardizedPetsDF = spark.createDataFrame(standardizedPets, ['nickname', 'type'])

standardizedPetsDF.show()

The following is the output of the preceding code:

+---------+----+
| nickname|type|
+---------+----+
|annabelle| cat|
|    daisy| cat|
|    roger| dog|
|      joe| dog|
|    rosco| dog|
|    julie| cat|
+---------+----+

We can see that annabelle, daisy and julie are identified as cats, while roger, joe and rosco are identified as dogs, even though their types, starting out, were not labeled strictly as such.

In this exercise, we learned how to use custom standardization logic in Spark to identify similar animals, even when their given types differed.

In the next section, we’ll cover Ordering & Sorting – useful for when you want to convey… well, order! See you then!

Spark Starter Guide 4.6: How to Aggregate Data

Previous post: Spark Starter Guide 4.5: How to Join DataFrames

Introduction

Also known as grouping, aggregation is the method by which data is summarized by dividing it into common, meaningful groups. At the same time that information is grouped, you can also summarize data points from those groups through a series of SQL-like aggregate functions (i.e. functions that you can run during data aggregations).

Say you run an online e-commerce website that sells shoes. You have data logs that tell you what each customer purchased, and when.

+--------+-----------+----------+-------------+
|customer|  shoe_name|sale_price|purchase_date|
+--------+-----------+----------+-------------+
|  landon| blue shoes|      5.00|   2020-10-01|
|   james| blue shoes|      5.00|   2020-10-04|
|    zach|white shoes|      6.00|   2020-10-06|
+--------+-----------+----------+-------------+

An example data set of sold shoes, called Sales.

At the end of each month, you might want to aggregate that data such that you can see how much revenue each model of shoe brought in that month. In SQL, that might look something like this:

select sum(sale_price) as revenue, 
       shoe_name 
from sales 
group by shoe_name

In this example, shoe_name is our grouping field, and the sum total of sales (for each shoe_name) is our aggregation metric. You would expect results to show something like:

+-----------+-------+
|       item|revenue|
+-----------+-------+
| blue shoes|  10.00|
|white shoes|   6.00|
+-----------+-------+
After all, we had two sales of blue shoes at $5/pair, and only one sale of white shoes at $6 a pair. If we had just done sum(sale_price) as revenue, but didn’t group by shoe_name, we’d simply get a total of $16.

Sum, of course, is just one example of an aggregate function. Others include min(), max(), count(), and stddev(), to name a few. All of these can be used to summarize identifiable groups within your data.
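To make that concrete, here is a rough sketch of the shoe example using Spark’s functional API (salesDF is a hypothetical DataFrame holding the table shown above):

import org.apache.spark.sql.functions.{count, max, min, sum}

// Group by shoe_name and compute several aggregates per group.
salesDF.groupBy("shoe_name")
  .agg(
    sum("sale_price").alias("revenue"),
    count("sale_price").alias("num_sales"),
    min("sale_price").alias("cheapest_sale"),
    max("sale_price").alias("priciest_sale"))
  .show()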

In the following exercises, we will learn to analyze data in groups by way of aggregation.


Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Continuing to build on our animal data set (from previous exercises in this guide), create a Sequence of Rows, each containing an animal name, type, age and color, using the following code:

val my_previous_pets = Seq(
  Row("fido", "dog", 4, "brown"),
  Row("annabelle", "cat", 15, "white"),
  Row("fred", "bear", 29, "brown"),
  Row("gus", "parakeet", 2, "black"),
  Row("daisy", "cat", 8, "black"),
  Row("jerry", "cat", 1, "white"),
  Row("fred", "parrot", 1, "brown"),
  Row("gus", "fish", 1, "gold"),
  Row("gus", "dog", 11, "black"),
  Row("daisy", "iguana", 2, "green"),
  Row("rufus", "dog", 10, "gold"))

Create a schema that mirrors the data you just created using the following code. The schema will be used by Spark to form a DataFrame.

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView("pets")

You’ve now completed the initial setup for this exercise in Scala. Skip past the subsequent section in Python to continue.


Follow these steps to complete the exercise in PYTHON:

Import relevant Spark SQL libraries using the following code:

from pyspark.sql import functions as F

Continuing to build on our animal data set, create a List of tuples, each containing an animal name, type, age and color, using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("gus", "parakeet", 2, "black"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView('pets')

You’ve now completed the initial setup for this exercise in Python.

If you chose to do the setup for this exercise in Scala, you can now proceed from this point.

Analysis through Aggregation

We now have an in-memory view (i.e. relational table representation) of our data. It only exists within the confines of our Spark application, but it enables us to run SQL queries against it as if it were a real table in a database. This will prove handy time after time in your work with Spark.

Now that we have a query-able view, let’s answer several different questions about the data by using Spark’s rich, comprehensive SQL functionality to query the temporary view we’ve created. Once a table is registered, you can query it as many times as you like, again, as if it were a real table.

As usual, we’ve provided solutions in both Scala and Python for your convenience. Where the solution is the exact same, we have coalesced the code.

What are the three most popular (i.e. recurring) names in the data?

To answer this question, we need to write a SQL query in Spark SQL that gives us the count of name occurrences in the table. You can also do this functionally (with methods on the DataFrame), but this example will showcase the pure SQL approach.

The following shows the code in SCALA and PYTHON:

spark.sql("select nickname, count(*) as occurrences from pets group by nickname order by occurrences desc limit 3").show()

The following is the output of the preceding code:

+--------+-----------+
|nickname|occurrences|
+--------+-----------+
|     gus|          3|
|    fred|          2|
|   daisy|          2|
+--------+-----------+

As can be seen, the three most popular names are gus, fred, and daisy.
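For reference, a roughly equivalent functional (non-SQL) version in Scala might look like the following sketch – note that the count() column is simply named count rather than occurrences:

import org.apache.spark.sql.functions.desc

// The same question, expressed with DataFrame methods instead of a SQL string.
petsDF.groupBy("nickname")
  .count()
  .orderBy(desc("count"))
  .limit(3)
  .show()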

How old is the oldest cat in the data?

We can use the functional API of Spark SQL to find the maximum age of cats in the data. As demonstrated above, you can also use pure SQL to achieve this – but this example will focus on the purely functional approach.

Follow these steps for SCALA:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Use the agg() function of the DataFrame to select the max age.
  3. Use the show() function of the DataFrame to print the results to the console.

In code form:

petsDF.where("type = 'cat'")
      .agg(Map("age" -> "max"))
      .show()

The following is the output of the preceding code:

+--------+
|max(age)|
+--------+
|      15|
+--------+

The oldest cat in our data is 15!

Follow these steps for PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Use the agg() function of the DataFrame to select the max age.
  3. Use the show() function of the DataFrame to print the results to the console.

In code form:

petsDF.where("type = 'cat'")\
    .agg({"age": "max"})\
    .show()

The following is the output of the preceding code:

+--------+
|max(age)|
+--------+
|      15|
+--------+

What are the youngest and oldest cat ages?

We can use the functional API of Spark SQL to answer this question.

Following are the steps for implementing this in SCALA:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Group the data by type using groupBy().
  3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
  4. Finally, print the results to the console using the show() function of the DataFrame.

In code form:

petsDF.where("type = 'cat'")
  .groupBy("type")
  .agg(min("age").alias("min_age"), max("age").alias("max_age"))
  .show()

The following is the output of the preceding code:

+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+

As can be seen, the youngest cat’s age is 1 and the oldest cat’s age is 15.

Following are the steps for implementing this in PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Group the data by type using groupBy().
  3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
  4. Finally, print the results to the console using the show() function of the DataFrame.

In code form:

petsDF.where(petsDF["type"] == "cat") \
    .groupBy("type") \
    .agg(F.min("age").alias("min_age"), F.max("age").alias("max_age")) \
    .show()

The following is the output of the preceding code:

+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+

What is the average dog age?

We can use the functional API of Spark SQL to find the average age of dogs.

Following are the steps for implementing this in SCALA:

  1. Use the where() function of the DataFrame to filter the data to just dogs.
  2. Group the data by type using groupBy().
  3. Use the agg() function of the DataFrame to select the average age.
  4. Finally, print the results to the console using the show() function of the DataFrame.

In code form:

petsDF.where("type = 'dog'")
    .groupBy("type")
    .agg("age" -> "avg")
    .show()

The following is the output of the preceding code:

+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+

As can be seen, the average dog age is approximately 8.33.

Following are the steps for implementing this in PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just dogs.
  2. Group the data by type using groupBy().
  3. Use the agg() function of the DataFrame to select the average age.
  4. Finally, print the results to the console using the show() function of the DataFrame.

In code form:

petsDF.where("type = 'dog'")\
    .groupBy("type")\
    .agg(F.avg("age"))\
    .show()

The following is the output of the preceding code:

+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+

As can be seen, the average age of dogs is approximately 8.33.

How many pets of each color are there in the data?

We can use the functional API of Spark SQL to find how many pets of each color exist in the data.

Following are the steps for implementing this in SCALA and PYTHON:

  1. Group the data by color using groupBy().
  2. Use the count() function of the grouped result to count the records in each group.
  3. Finally, print the results to the console using the show() function of the DataFrame.

In code form:

petsDF.groupBy("color").count().show()

The following is the output of the preceding code:

+-----+-----+
|color|count|
+-----+-----+
|green|    1|
|white|    2|
| gold|    2|
|black|    2|
|brown|    3|
+-----+-----+

The number of pets of each color can be seen in the output above.

And that’s a rough introduction to aggregation in Spark / Spark SQL! There are a lot of powerful operations you can conduct in Spark, so keep exploring the APIs!

In the next section, we’ll cover Standardization – a practice that’s common in Data Engineering but not necessarily in Spark. See you then!

Spark Starter Guide 4.5: How to Join DataFrames

Previous post: Spark Starter Guide 4.4: How to Filter Data

Introduction

If you’ve spent any time writing SQL, or Structured Query Language, you might already be familiar with the concept of a JOIN. If you’re not, then the short explanation is that you can use it in SQL to combine two or more data tables together, leveraging a column of data that is shared or related between them.

Joining is handy in a number of ways, like supplementing your large dataset with additional information or performing lookups. There are many types of joins, like left, right, inner and full outer, and Spark has multiple implementations of each to make it convenient and fast for you as an engineer/analyst to leverage. It’s all possible using the join() method.

  • Inner Join: Return records that have matching values in both tables that are being joined together. Rows that do not match on both sides are not kept.
  • Left (Outer) Join: Return all records from the left table, and only the matched records from the right table. This is useful in situations where you need all the rows from the left table, joined with rows from the right table that match. It can be described as supplementing the Left table with Right table information.
  • Right (Outer) Join: Return all records from the right table, and only the matched records from the left table. This is useful in situations where you need all the rows from the right table, joined with rows from the left table that match. It’s the reversed version of the Left (Outer) Join.
  • Full (Outer) Join: Return all records from both the left table and the right table, whether they have matching values or not. This is useful in situations where you need matching rows to be joined together, while also keeping the non-matching rows from both sides.
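In Spark, the join type is requested with a string argument to join(); the following sketch shows the idea (leftData and rightData are placeholder DataFrames, and the join columns mirror the exercise below):

import org.apache.spark.sql.functions.col

// The same join expression, with different join types requested by name.
val innerJoined = leftData.join(rightData, col("name") === col("animal"), "inner")
val leftJoined  = leftData.join(rightData, col("name") === col("animal"), "left")
val rightJoined = leftData.join(rightData, col("name") === col("animal"), "right")
val fullJoined  = leftData.join(rightData, col("name") === col("animal"), "full")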

In the following exercise, we will see how to join two DataFrames.


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

Create a Sequence of Rows where the content is a tuple containing an animal and its category using the following code:

val categorized_animals = Seq(Row("dog", "pet"),
                              Row("cat", "pet"),
                              Row("bear", "wild"))

Create a schema that corresponds to the data using the following code:

val schema_animals = List(
  StructField("name", StringType, nullable = true),
  StructField("category", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)

Create a DataFrame from the RDD and schema created using the following code:

val animalData = spark.createDataFrame(animalDataRDD, StructType(schema_animals))

Create a Sequence of Rows where the content is a tuple containing an animal and its food using the following code:

val animal_foods = Seq(Row("dog", "kibble"),
                       Row("cat", "canned tuna"),
                       Row("bear", "salmon"))

Again, we will create a schema that corresponds to the data from the preceding step using the following code:

val schema_foods = List(
  StructField("animal", StringType, nullable = true),
  StructField("food", StringType, nullable = true)
)

Using the parallelize() function of Spark we will turn that Sequence into an RDD as shown in the following code:

val animalFoodRDD = spark.sparkContext.parallelize(animal_foods)

We will then create a DataFrame from the RDD and schema created using the following code:

val animalFoods = spark.createDataFrame(animalFoodRDD, StructType(schema_foods))

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

val animals_enhanced = animalData.join(
  animalFoods,
  col("name") === col("animal"),
  "left")

animals_enhanced.show()

The following is the output of the preceding code:

+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

From the preceding table, we can observe that rows were matched on common values in the name and animal columns; each animal’s category and corresponding food are now listed together.


Follow these steps to complete the exercise in PYTHON:

Create a List of tuples containing an animal and its category using the following code:

categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild")]

Create a List of tuples containing an animal and its food using the following code:

animal_foods = [("dog", "kibble"), ("cat", "canned tuna"), ("bear", "salmon")]

Use the parallelize() function of Spark to turn those Lists into RDDs as shown in the following code:

animalDataRDD = sc.parallelize(categorized_animals)
animalFoodRDD = sc.parallelize(animal_foods)

Create DataFrames from the RDDs using the following code:

animalData = spark.createDataFrame(animalDataRDD, ['name', 'category'])
animalFoods = spark.createDataFrame(animalFoodRDD, ['animal', 'food'])

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

animals_enhanced = animalData.join(animalFoods, animalData.name == animalFoods.animal)
animals_enhanced.show()

The following is the output of the preceding code:

+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

This is just one way to join data in Spark. There is another way within the .join() method called the usingColumn approach.

The usingColumn Join Method

If the exercise were a bit different—say, if the join key/column of the left and right data sets had the same column name—we could enact a join slightly differently, but attain the same results. This is called the usingColumn approach, and it’s handy in its straightforwardness.

The following shows the code in SCALA:

leftData.join(rightData, usingColumn = "columnInCommon")

We can try the same thing in PYTHON using the following code:

leftData.join(rightData, on="columnInCommon")

There is an extension of this approach that allows you to involve multiple columns in the join, and all you have to do is provide it a Scala Sequence or Python List of the appropriate columns you wish to join on. If you’re familiar with the JOIN USING clause in SQL, this is effectively that. Keep in mind that each value in the list must exist on both sides of the join for this approach to succeed.

Following shows the code in SCALA:

leftData.join(rightData, usingColumns = Seq("firstColumnInCommon", "secondColumnInCommon"))

We can try the same thing in PYTHON using the following code:

leftData.join(rightData, on=['firstColumnInCommon', 'secondColumnInCommon'])

These methods are by no means the only way to join data, but they are the most common and straightforward.

In the next section, we will learn data aggregation.

Spark Starter Guide 4.4: How to Filter Data

Previous post: Spark Starter Guide 4.3: How to Deduplicate Data


Introduction

Filtering data is important when you need to remove entries of information that are irrelevant or troublesome in your pipeline. It can also be used apart from data cleaning as a way to fork different chunks of data onto different routes based on their contents.

Spark, unsurprisingly, has a clean and simple way to filter data: the aptly named .filter(). Alternatively, you can use .where(), which is an alias for .filter(). Like the deduplication method, filtering is commonly applied in the Transform stage of ETL.
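As a brief aside (a sketch, using the animalData DataFrame created in the exercise below), .filter() and .where() accept either a SQL expression string or a Column expression, so the same condition can be written two ways:

import org.apache.spark.sql.functions.col

// SQL-expression string form
animalData.filter("category != 'pet'").show()

// Equivalent Column-expression form
animalData.filter(col("category") =!= "pet").show()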

In the following exercise, we will learn how to filter data in a Spark DataFrame step by step.


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Create a Sequence of Rows where the content is a tuple containing an animal and its category using the following code:

val categorized_animals = Seq(Row("dog", "pet"),
                              Row("cat", "pet"),
                              Row("bear", "wild"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("name", StringType, nullable = true),
  StructField("category", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)

Create a DataFrame from the RDD and schema created using the following code:

val animalData = spark.createDataFrame(animalDataRDD, StructType(schema))

Filter out any animal names that start with the letter c and print results to the console using the following code:

val nonCats = animalData.filter("name not like 'c%'")
nonCats.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
| dog|     pet|
|bear|    wild|
+----+--------+

In the above output, we can see that the data has been filtered such that only animal names that do not start with c – dog and bear, but not cat – remain.

Apply another filter, this time removing any animals that are pets (keeping only those that are not), and print the results to the console using the following code:

val nonPets = animalData.filter("category != 'pet'")
nonPets.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
|bear|    wild|
+----+--------+

The above output shows a table where only wild animals (i.e. non-pets) remain in our data set.


Follow these steps to complete the exercise in PYTHON:

Create a List of tuples containing an animal and its category using the following code:

categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild")]

Use the parallelize() function of Spark to turn that List into an RDD using the following code:

animalDataRDD = sc.parallelize(categorized_animals)

Create a DataFrame from the RDD using the following code:

animalsDF = spark.createDataFrame(animalDataRDD, ['name', 'category'])

Filter out any animal names that start with the letter c, and print results to the console using the following code:

nonCats = animalsDF.filter("name not like 'c%'")
nonCats.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
| dog|     pet|
|bear|    wild|
+----+--------+

Above, we can see that only dog and bear remain after the filtering step.

Apply another filter, this time removing any animals that are pets (keeping only those that are not), using the following code:

nonPets = animalsDF.filter("category != 'pet'")
nonPets.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
|bear|    wild|
+----+--------+

In this exercise, we learned how to filter a given set of data based on conditions using the .filter() method.

In the next section, we will learn the concept of Joins — and the different approaches you have to use them in Spark.

Spark Starter Guide 4.3: How to Deduplicate Data

Previous post: Spark Starter Guide 4.2: How to Create a Spark Session

Introduction

The process of removing duplicate records from a collection of data is called deduplication. This is often necessary early in the process, before data is analyzed further. In fact, it is typically associated with the Transform stage of ETL (though it’s common in the Load stage as well). Spark has a very clever and reliable function, available on the DataFrame object, that can do just that: dropDuplicates().

It looks like this in practice, in Scala and Python respectively:

// scala
val deduplicatedDataFrame = dataFrame.dropDuplicates()
# python
deduplicatedDataFrame = dataFrame.dropDuplicates()
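As a hedged aside, dropDuplicates() can also take a subset of column names, deduplicating on just those fields and keeping one row per distinct combination:

// scala
val dedupedByName = dataFrame.dropDuplicates("name")
val dedupedByPair = dataFrame.dropDuplicates("name", "category")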

Cleaning data is a necessary step in making it more useful. It can be as simple as removing duplicate entries or filtering out unwanted information. Thankfully, Spark provides a suite of handy functions that make the process straightforward. Let’s demonstrate the built-in deduplication functionality of Spark by applying it to a simple set of data.

In the following exercise, we will learn how to deduplicate data in a Spark DataFrame.


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Create a Sequence of Rows where the content is a tuple containing an animal and its category using the following code:

val categorized_animals = Seq(Row("dog", "pet"),
                              Row("cat", "pet"),
                              Row("bear", "wild"),
                              Row("cat", "pet"),
                              Row("cat", "pet"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("name", StringType, nullable = true),
  StructField("category", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)

Create a DataFrame from the RDD and schema created, and print the results to console using the following code:

val animalData = spark.createDataFrame(animalDataRDD, StructType(schema))
animalData.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
| dog|     pet|
| cat|     pet|
|bear|    wild|
| cat|     pet|
| cat|     pet|
+----+--------+

From the preceding output, you can notice that the ("cat", "pet") row appears three times. We will remove the duplicates in the next step using the dropDuplicates() method.

Drop the duplicate rows and print the results to the console using the following code:

val deduped = animalData.dropDuplicates()
deduped.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
|bear|    wild|
| dog|     pet|
| cat|     pet|
+----+--------+

Follow these steps to complete the exercise in PYTHON:

Create a List of tuples containing an animal and its category using the following code:

categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild"), ("cat", "pet"), ("cat", "pet")]

Use the parallelize() function of Spark to turn that List into an RDD using the following code:

animalDataRDD = sc.parallelize(categorized_animals)

Create a DataFrame from the RDD and print the results to the console using the following code:

animalsDF = spark.createDataFrame(animalDataRDD, ['name', 'category'])
animalsDF.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
| dog|     pet|
| cat|     pet|
|bear|    wild|
| cat|     pet|
| cat|     pet|
+----+--------+

From the preceding output, we can see that all the animals are categorized and that the ("cat", "pet") row appears three times. In the next step, we will use the dropDuplicates() method to fix this.

Drop the duplicate rows and print the results to the console using the following code:

deduplicated = animalsDF.dropDuplicates()
deduplicated.show()

The following is the output of the preceding code:

+----+--------+
|name|category|
+----+--------+
|bear|    wild|
| dog|     pet|
| cat|     pet|
+----+--------+

When you run the final step, you will see that the duplicate entries have been deleted.

In the next section, we will see the importance of filtering data and how it can be done.

Spark Starter Guide 4.2: How to Create a Spark Session

Previous post: Spark Starter Guide 4.1: Introduction to Data Pipelines

Image courtesy of Databricks: How to use SparkSession in Apache Spark 2.0

In this exercise, you’ll learn how to create a Spark Session – the basic building block for working with Spark in a modern context (ha, that’s a joke you’ll get later). In shorter terms: it’s the modern entry point into Spark.

Introduction

The Spark Session was introduced in the 2.0 release of Apache Spark, and was designed to both replace and consolidate the previous methods for accessing Spark: contexts! Sessions also made it easier to:

  • configure runtime properties of Spark applications after instantiation (e.g. spark.sql.shuffle.partitions)
  • create DataFrames and DataSets
  • use Spark SQL and access Hive

So, instead of using the Spark Context, SQL Context and Hive Context objects of the past, you can simply use a Spark Session. You maintain all of the convenience of those legacy objects with one straightforward instantiation.
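For example, here is a rough sketch of both configuration styles mentioned above (the property value is just an illustration):

import org.apache.spark.sql.SparkSession

// Configuration can be supplied while building the session...
val spark = SparkSession
  .builder()
  .appName("My Spark App")
  .master("local[2]")
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()

// ...or adjusted at runtime, after instantiation.
spark.conf.set("spark.sql.shuffle.partitions", "16")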


NOTE

All exercises in this chapter can be written in a project on your local workstation (using an editor like IntelliJ, Eclipse, PyCharm, and so on), or in a Notebook (Jupyter, Qubole, Databricks, and so on).

Code will execute the same in both environments, so the choice of what to use is up to you!


The steps for creating a Spark Session are provided in two different languages: Scala and Python. Select your API of choice and proceed!

Follow these steps to complete the exercise in SCALA:

  1. Import SparkSession from the Spark SQL library using the following code:

import org.apache.spark.sql.SparkSession

  2. Create a Spark Session in local mode using the following code:

val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("My Spark App")
    .getOrCreate()

Follow these steps to complete the exercise in PYTHON:

  1. Import SparkSession from the Spark SQL library using the following code:

from pyspark.sql import SparkSession

  2. Create a Spark Session in local mode using the following code:

spark = SparkSession\
    .builder\
    .appName("My Spark App")\
    .master("local[2]")\
    .getOrCreate()

Now, you’re ready to start using Spark. That might not have been very exciting, but in the next exercise we’ll dive headfirst into our first actual Spark application – data deduplication!