Spark Starter Guide 1.7: Chapter 1 Activity

This activity will combine the skills and techniques you learned so far in this chapter. 

Imagine you are working as an intern at XYZ BigData Analytics Firm, where you are tasked with performing data analysis on a data set to solidify your Spark DataFrame skills. 

In this activity you will use the data set “Kew Museum of Economic Botany: Object Dispersals to Schools, 1877-1982” which “is a record of object dispersals from the Kew Museum of Economic Botany to schools between 1877 and 1982” that “is based on entries in the ‘Specimens Distributed’ books (or exit books) in the Museum archive.” The source of the data set can be found at https://figshare.com/articles/Schools_dispersals_data_with_BHL_links/9573956 This activity is a good practice for a real-world scenario because it simulates being given a random data set that you must process and analyze when you have no prior connection to the data set.

Read More »

Spark Starter Guide 1.4: Modifying Spark DataFrames

Introduction

Now that we have created DataFrames manually and from files, it is time to actually do things with them. This section introduces the fundamental operations on DataFrames: displaying, selecting, filtering, and sorting. As discussed in the Section 1.1, performing operations on a DataFrame actually creates a new DataFrame behind the scenes in Spark, regardless if a new name is given to the DataFrame or the results are displayed. When working with DataFrames the results can be displayed (as will be covered shortly), the results can be saved to a DataFrame with the same name as the original, or the results can be saved to a different DataFrame name.

Read More »

Spark Starter Guide 4.7: How to Standardize Data

Spark Starter Guide 4.7: How to Standardize Data

Previous post: Spark Starter Guide 4.6: How to Aggregate Data

Introduction

Standardization is the practice of analyzing columns of data and identifying synonyms or like names for the same item. Similar to how a cat can also be identified as a kitty, kitty cat, kitten or feline, we might want to standardize all of those entries into simply “cat” so our data is less messy and more organized. This can make future processing of the data more streamlined and less complicated. It can also reduce skew, which we address in Addressing Data Cardinality and Skew.

We will learn how to standardize data in the following exercises.


NOTE

From this point onward, future code examples could utilize a line of code like this:

import spark.implicits._

This allows for implicit conversions for Scala objects like RDDs into modern Spark abstractions like Dataset, DataFrame, or Columns. It also supports many convenience functions from Spark SQL like isin() (checking for values in a list) or desc() (descending order sorting).


Two Types of Standardization

There are at least two basic ways to standardize something:

  1. You can recognize when two things are the same but literally different (“puppy” and “dog”) and associate them without actually changing anything.

  2. You can recognize when two things are the same and change them to be consistent (change instances of “puppy” to “dog”).

We’ll show ways to do both in Spark, in Scala and Python. Both involve some form of a synonym library.

Standardization through Suggestion

Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Import spark.implicits, which will be useful for handy operations in a later step using the following code:

import spark.implicits._

Create a Sequence of Rows, each containing an animal name and type using the following code:

val my_previous_pets = Seq(Row("annabelle", "cat"),
                           Row("daisy", "kitten"),
                           Row("roger", "puppy"),
                           Row("joe", "puppy dog"),
                           Row("rosco", "dog"),
                           Row("julie", "feline"))

Create a schema that corresponds to the data using the following code:

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of dog nouns. Print the results to the console as shown in the following code:

val dogs = petsDF
.where($"type"
.isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()

The following is the output of the preceding code:

+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+

Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of cat nouns. Print the results to the console as shown in the following code:

val cats = petsDF
.where($"type"
.isin ("cat", "kitty", "kitten", "feline", "kitty cat"))

cats.show()

The following is the output of the preceding code:

+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+

As we can see, we were able to use custom standardization logic to decide when a cat is a cat, and a dog is a dog, even when their given type is not consistent. And, generally speaking, the isin() methodology of string comparisons in a list is relatively efficient at scale.


Follow these steps to complete the exercise in PYTHON:

Import additional relevant Spark libraries using the following code:

from pyspark.sql.functions import col

Create a List of Rows, each containing a name and type using the following code:

my_previous_pets = [Row("annabelle", "cat"),
                    Row("daisy", "kitten"),
                    Row("roger", "puppy"),
                    Row("joe", "puppy dog"),
                    Row("rosco", "dog"),
                    Row("julie", "feline")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type'])

Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of dog nouns. Print the results to the console as shown in the following code:

dogs = petsDF
.where(col("type")
.isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()

The following is the output of the preceding code:

+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+

Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of cat nouns. Print the results to the console as shown in the following code:

cats = petsDF
.where(col("type")
.isin(["cat", "kitty", "kitten", "feline", "kitty cat"]))

cats.show()

This example also demonstrates that you can pass a list to the isin() function, not just a comma-separated list of strings values as demonstrated in the previous step.

The following is the output of the preceding code:

+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+

We can see that all cats could be identified in the data set, even though they weren’t all labeled as type ‘cat’.

Standardization through Modification

In the previous exercise, we would quietly identify animals as a certain type if their type was found in a list of common synonyms for the proper type. In this exercise, we will actually modify our data to be standardized, by replacing the similar type value with its preferred, standard alternative.


NOTE

From this point onward, further Scala code examples could, where appropriate, utilize a case class. This is a Scala only abstraction that acts as a simple schema for structured, tabular-style data.

It’s an especially handy tool when paired with the Spark Dataset API, which can make powerful code even simpler to read and work with than DataFrames. It should be created outside of the main() function in Scala, or imported from its own class.

Otherwise, you will experience exceptions.

Example:

case class Person(name:String, age:Int)


Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Then, create a case class called Pet, which contains two columns: nickname and petType using the following code:

case class Pet(nickname: String, petType: String)

Using the petsDF created in the previous exercise, Use the map() function of the DataFrame to compare the petType to a list of common dog and cat synonyms – returning “dog” or “cat”, respectively if there is a match. If there is not a match, return the unmodified petType (thus assuming it cannot be standardized) as shown in the following code:

val standardized_pets = petsDF.map(pet => {

  val nickname = pet.getString(pet.fieldIndex("nickname"))
  val petType = pet.getString(pet.fieldIndex("type"))

  val standardType =
    if (Seq("dog", "puppy", "puppy dog", "hound", "canine").contains(petType)){
      "dog"
    }
    else if (Seq("cat", "kitty", "kitten", "feline", "kitty cat").contains(petType)){
    "cat"
    }
  else{
      petType
    }

  Pet(nickname, standardType)
})

Print the results to the console using the following code:

standardized_pets.show()

The following is the output of the preceding code:

+---------+---+
|       _1| _2|
+---------+---+
|annabelle|cat|
|    daisy|cat|
|    roger|dog|
|      joe|dog|
|    rosco|dog|
|    julie|cat|
+---------+---+

As we can observe, Column 1 in the table displays the names of the pets while column 2 displays the type of pet animal they are, cat or dog, after passing through the standardization process.

Follow these steps to complete the exercise in PYTHON:

Create and utilize a standardize() function to compare the petType to a list of common dog and cat nouns – returning “dog” or “cat”, respectively, if there is a match.

def standardize(pet):

    name = pet[0]
    animal_type = pet[1]

    if animal_type in ["dog", "puppy", "puppy dog", "hound", "canine"]:
        return name, "dog"
    elif animal_type in ["cat", "kitty", "kitten", "feline", "kitty cat"]:
        return name, "cat"
    else:
        return pet

Then, apply the standardize() function to petsRDD (created in the previous exercise) using the map() function. Hint: You can also use a UDF on the DataFrame instead of this RDD map method, but we’ll cover that in a future exercise!

Print the results to the console using the following code:

standardizedPets = petsRDD.map(standardize)
standardizedPetsDF = spark.createDataFrame(standardizedPets, ['nickname', 'type'])

standardizedPetsDF.show()

The following is the output of the preceding code:

+---------+----+
| nickname|type|
+---------+----+
|annabelle| cat|
|    daisy| cat|
|    roger| dog|
|      joe| dog|
|    rosco| dog|
|    julie| cat|
+---------+----+

We can see that annabelle, daisy and julie are identified as cats, while roger, joe and rosco are identified as dogs, even though their types, starting out, were not labeled strictly as such.

In this exercise, we learned how to use custom standardization logic in Spark to identify similar animals, even when their given types differed.

In the next section, we’ll cover Ordering & Sorting – useful for when you want to convey… well, order! See you then!

Spark Starter Guide 1.3: Creating DataFrames from Files

Introduction

When working with Spark in a production setting, most likely the data will come from files stored in HDFS on a Hadoop cluster. Spark has built-in support for a lot of different file types. In this section we will cover five of the most popular data file types you will see when working with Spark in a production setting: Parquet, ORC, JSON, Avro, and CSV. Some of these file types were created recently for the Big Data world and others have been around for a while and have uses outside of Hadoop. They represent a good mix of file data types and the capabilities of Spark when dealing with files.

The remainder of chapter 1 uses an open source sample data set originally found at https://github.com/Teradata/kylo/tree/master/samples/sample-data. It contains folders with data files of each of the file types covered in this chapter. All of the folders were copied to HDFS with a path of /user/your_user_name/data. No data was changed from the original version.

Let’s see the following Mac example:

Clone the kylo GitHub open-source repository to a location on your local computer with the following code:

$ git clone https://github.com/Teradata/kylo.git

Upload the sample-data directory to your Hadoop edge node by using the scp command as shown in the following code:

$ scp -rp /Users/your_directory/…/kylo/samples/sample-data user_name@cluster_ip_address:/home/your_user_name

Once on the edge node, use the Hadoop command -put to upload the directory of data from the edge node to a directory in HDFS as shown in the following code:

$ hadoop fs -put /home/your_user_name/sample-data /user/your_user_name/data

Creating DataFrames from Parquet Files

Parquet is an open-source data file format. Just like Comma Separated Value (CSV) is a type of file format, Parquet is a file format. However, Parquet was built from the ground up to be very fast, efficient, and flexible. Parquet was originally designed for the Hadoop ecosystem, so it fits perfectly with Spark. Many traditional file formats like CSV are row-based. Meaning all the rows will have to be read for each query or operation. This is costly and time consuming. Instead Parquet has a columnar format. Which means it organizes data into columns instead of rows. Querying by column means reads and operations can skip over entire columns that aren’t needed. This makes queries very fast. 

Parquet also is unique because the schema of the data is held within the files. This means Spark doesn’t have to infer or guess the schema, it can create DataFrames with the correct column names and data types. Lastly, Parquet was built with advanced compression technology that physically compresses the data for the purpose of taking up less space. For example, one TB of CSV files would only be a couple hundred GBs as Parquet files. This makes storage costs cheaper because there is less data to hold.

Creating DataFrames from Parquet files is straightforward, since Parquet files already contain the schema. There are two main ways of creating DataFrames from Parquet files and the commands are the same in both PySpark and Spark Scala. Both options use the SparkSession variable, spark, as the starting point and then call the read method. The read method is used to read data from files and create DataFrames. The first option calls the parquet method that is purposely designed to read parquet files. The parquet method has one parameter and that is the HDFS path to the parquet file. The second option first calls the format method that accepts a string parameter of the type of files to be read. In this case “parquet”. Then it calls the load method that is the HDFS path to files to be loaded. 

Note:

In this chapter many of the Python and Scala Spark commands are identical. Since Scala uses the val keyword when creating variables and Python simply needs the variable name, this section will use “... =” to designate that the command can be written in either Python or Scala to save space. The only difference being Scala uses “val some_name =” while Python uses “some_name =“.

Exercise 10: Creating DataFrames from Parquet Files

  1. To create a DataFrame using the parquet() method use the SparkSession variable and call the read method followed by the parquet() method. Then pass the full HDFS file path of the parquet file you would like to convert into a DataFrame with the following code in both PySpark and Spark Scala:
… = spark.read.parquet("hdfs://user/your_user_name/data/userdata1.parquet")
  1. To create a DataFrame using the load() method, use the SparkSession variable and call the read method followed by the format() method that takes one parameter which is the file type of the file to be converted into a DataFrame. In this case use “parquet“. Then call the load() method with the full HDFS file path to the parquet file as shown in the following code:
… = spark.read.format("parquet").load("hdfs://user/your_user_name/data/userdata1.parquet")
  1. If we call .show(5) on any of the newly created DataFrames it will output the first five rows of the DataFrame. The data is made-up data that mimics users logging into a computer system. The output of the DataFrame would look like:
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural Engineer|        |
|2016-02-03 00:36:21|  4|    Denise|    Riley|    driley3@gmpg.org|Female| 140.35.109.83|3576031598965625|       China| 4/8/1997| 90263.05|Senior Cost Accou...|        |
|2016-02-03 05:05:31|  5|    Carlos|    Burns|cburns4@miitbeian...|      |169.113.235.40|5602256255204850|South Africa|         |     null|                    |        |
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
only showing top 5 rows

Creating DataFrames from ORC Files

The Optimized Row Columnar (ORC) is a file format that was designed with two main goals: increase data processing speeds and reducing file sizes. ORC is another open-source project built for the Hadoop ecosystem, specifically Hive. Like Parquet, ORC is a columnar file format. But ORC stores data first into stripes which are groups of row data. Within a stripe the data is stored in columns. Also, each stripe has a footer that holds metadata about the columns in the stripe, including each column’s data type, count, min, max, and sum. The ORC architecture is relatively complicated to describe and wrap your mind around and there is much more to it than this brief introduction. But the main point of ORC is that it is meant to be fast and it is. Also, ORC files have great file compression just like Parquet.

Source: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-FileStructure

Note

ORC files don’t have file extensions.

In PySpark and Spark Scala a DataFrame is created from ORC files by calling the orc() method or the .format("orc").load() methods.

Following are examples of creating DataFrames from ORC files using the orc() method and the load() method. The code is the same between PySpark and Spark Scala:

… = spark.read.orc("hdfs://user/your_user_name/data/orc/userdata1_orc")
 
… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/userdata1_orc")

All of the previous examples have created DataFrames from a single file. But Spark can easily create a single DataFrame from multiple files. Instead of providing a single file in the path, all that is required is to end the path on a directory that only contains files of single file format and schema. The following example is shown for ORC files but works for all the other file formats.

In the following example a single file is not specified but a directory containing only files of the same format. Also, notice the subtle difference between the two examples. In the HDFS path, the first example ends in only the directory name. In the second example, the HDFS path has a trailing slash. Both of these produce the exact same DataFrame. It doesn’t matter whether the trailing slash is present. 

… = spark.read.orc("hdfs://user/your_user_name/data/orc")
 
… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/")

Creating DataFrames from JSON

JSON, or JavaScript Object Notation is a file format written in human-readable text in the form of key-value pairs. In JavaScript, an object looks like:

var myObj = {name: "Craig", age: 34, state: "Arkansas"};

Suppose we had a JSON file, some_file.json, that had data equal to the preceding JavaScript object. The contents of that file would be  {name: "Craig", age: 33, state: "Arkansas"}. The structure of JSON makes it very easy for humans to read, understand, and even write. But it turns out this JSON structure is very easy for computers to read and parse. Which makes JSON the go-to format for exchanging data between applications and servers. 

The preceding JSON example is a very simple example but JSON supports very complex nested structures including objects, arrays, and many different data types. This capability makes it extremely flexible at holding any type of data. Because of JSON’s flexible structure and ease of use for both humans and computers JSON is very popular. 

Spark has the ability to automatically infer the schema from JSON data. This makes it very easy to use JSON data in Spark. Also, the json() method has almost twenty optional parameters it accommodates. Everything from error handling to leading zero options to data type configurations. This chapter won’t cover all of the available paramters. But the link below to the official documentation outlines all the paramters.

Following is the code for both PySpark and Spark Scala:

… = spark.read.json("hdfs://user/your_user_name/data/books1.json")
 
… = spark.read.format("json").load("hdfs://user/your_user_name/data/books1.json")

See the official documentation links for PySpark (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.json) and Scala (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(paths:String*):org.apache.spark.sql.DataFrame) to view the complete list optional parameters.

Creating DataFrames from Avro Files

Avro file format is another open-source technology designed for the Hadoop world. But unlike Parquet and ORC, Avro is a row-based file format. The row-based architecture and binary data format of Avro make it ideal for write-heavy operations. If your use-case is primarily focused on writing data then Avro would be a good choice. File formats like Parquet and ORC are primarily designed for reading data. 

Another major advantage of Avro is its schema. In fact, Avro stores its schema in JSON. And because Avro’s schema is JSON, the schema fields can be added or subtracted overtime. If we had an Avro file with three columns, “name”, “age”, and “state”, the corresponding JSON schema code would look like the following example. The JSON clearly delineates the table name, all of the column names and their data types, plus other metadata about how the record was generated as shown in the following code:

    {
      "type" : "record",
      "name" : "json example",
      "doc" : "Schema generated"
      "fields" : [ {
        "name" : "name",
        "type" : [ "null", "string" ],
        "default" : null,
        "columnName" : "name",
        "sqlType" : "12"
      }, {
        "name" : "age",
        "type" : [ "null", "long" ],
        "default" : null,
        "columnName" : "age",
        "sqlType" : "93"
      }, {
        "name" : "state",
        "type" : [ "null", "string" ],
        "default" : null,
        "columnName" : "state",
        "sqlType" : "2"
      } ],
      "tableName" : "employees"
    }

In Spark, there is not an avro() method. So the only way in PySpark and Spark Scala to create a DataFrame from Avro files is to use the load() method. See the following example.

Following is the code for both PySpark and Spark Scala:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro")

In addition to HDFS paths that include single files and directories, Spark also supports wildcards and collections of files & directories. Let’s explore these options using Avro files as an example. 

Wildcards

Spark supports Regular Expressions (RegEx) in the HDFS path. The two most beneficial wildcards are the * character which matches on zero or more characters and the ? character which matches on a single character. In the preceding Avro example, let’s say in the data directory were a mix of Avro files and other files formats. But we only wanted to create a DataFrame of the Avro files. The following example searches for all files in the data directory that end in “.avro”:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/*.avro")

Collections

Let’s say the data directory consisted of ten files from userdata0.avro through userdata9.avro. And for some reason we wanted to create a DataFrame from a certain subset of the files. We could provide a comma separated collection of the full HDFS paths to each file. For example:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro,hdfs://user/data/userdata5.avro,hdfs://user/your_user_name/data/userdata8.avro")

The result would be one DataFrame with data from the three Avro files. This collection of paths will also work on any collection of files and directories. This assumes that all the files have the same schema. 

Creating DataFrames from CSV Files

Comma Separated Value (CSV) is a row-based human readable text format with the data values separated by commas. It is one of the simplest file formats. Because it is plain text the file will take up more space than the other file formats used with Spark. But it is simple to read and write and most text editors can open CSV files.

Creating a DataFrame from CSV files requires several parameters. The most important parameters include the file separator defined as sep, the inferSchema parameter, and the header parameter which specifies if the CSV file(s) have a header row of column names. In Spark, there is a major difference in creating a DataFrame from CSV files between PySpark and Spark Scala. 

Exercise 11: Creating DataFrames from CSV Files

  1. In PySpark, the three options consist of spark.read.csv("…")spark.read.format("csv").load("…"), and spark.read.load("…", format="csv"). Use the following code to use the spark.read.csv("…") option:
csv_pyspark_df1 = spark.read.csv("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  1. Use the following code to use the spark.read.format("csv").load("…") option:
csv_pyspark_df2 = spark.read.format("csv").load("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  1. Use the following code to use the spark.read.load("…", format="csv") option:
csv_pyspark_df3 = spark.read.load("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , format="csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  1. In Spark Scala, after calling the read method, use the option() method to add additional configurations. In Python we set parameters inside the csv() or load() methods. But in Scala the only parameter is the HDFS path. See the following code for csv() and load():
val csv_scala_df1 = spark.read
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://user/your_user_name/data/csv/userdata1.csv")
 
val csv_scala_df2 = spark.read
  .format("csv")
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://user/your_user_name/data/csv/userdata1.csv")

CSV files are a very popular option for storing data, especially when data sizes are on the smaller side. This is because the data file is human readable and the files are easy to create. As a result you will create a lot of DataFrames from CSV files over your Spark career. 

Like the json() method, the csv() method has many parameters. See the following official documentation links for a complete list.

PySpark: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.csv

Scala: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@csv(paths:String*):org.apache.spark.sql.DataFrame


Spark Starter Guide 4.6: How to Aggregate Data

Spark Starter Guide 4.6: How to Aggregate Data

Previous post: Spark Starter Guide 4.5: How to Join DataFrames

Introduction

Also known as grouping, aggregation is the method by which data is summarized by dividing it into common, meaningful groups. At the same time that information is grouped, you can also summarize data points from those groups through a series of SQL-like aggregate functions (i.e. functions that you can run during data aggregations).

Say you run an online e-commerce website that sells shoes. You have data logs that tell you what each customer purchased, and when.

customershoe_namesale_pricepurchase_date
landonblue shoes5.002020-10-01
jamesblue shoes5.002020-10-04
zachwhite shoes6.002020-10-06
An example data set of sold shoes, called Sales.

At the end of each month, you might want to aggregate that data such that you can see how much revenue each model of shoe brought in that month. In SQL, that might look something like this:

select sum(sale_price) as revenue, 
       shoe_name 
from sales 
group by shoe_name

In this example, shoe_name is our grouping field, and the sum total of sales (for each shoe_name) is our aggregation metric. You would expect results to show something like:

itemrevenue
blue shoes10.00
white shoes6.00
After all, we had two sales of blue shoes at $5/pair, and only one sale of white shoes at $6 a pair. If we had just done sum(sale_price) as revenue, but didn’t group by shoe_name, we’d simply get a total of $16.

Sum, of course, is just one example of an aggregate function. Others include min(), max(), count(), and stdev(), to name a few. All of these can be used to summarize identifiable groups within your data.

In the following exercises, we will learn to analyze data in groups by way of aggregation.


Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

Continuing to build on our animal data set (from previous exercises in this guide), Create a Sequence of Rows, each containing an animal name, type, age and color using the following code:

val my_previous_pets = Seq(
  Row("fido", "dog", 4, "brown"),
  Row("annabelle", "cat", 15, "white"),
  Row("fred", "bear", 29, "brown"),
  Row("gus", "parakeet", 2, "black"),
  Row("daisy", "cat", 8, "black"),
  Row("jerry", "cat", 1, "white"),
  Row("fred", "parrot", 1, "brown"),
  Row("gus", "fish", 1, "gold"),
  Row("gus", "dog", 11, "black"),
  Row("daisy", "iguana", 2, "green"),
  Row("rufus", "dog", 10, "gold"))

Create a schema that mirrors the data you just created using the following code. The schema will be used by Spark to form a DataFrame.

val schema = List(
  StructField("nickname", StringType, nullable = true),
  StructField("type", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("color", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val petsRDD = spark.sparkContext.parallelize(my_previous_pets)

Create a DataFrame from the RDD and schema created using the following code:

val petsDF = spark.createDataFrame(petsRDD, StructType(schema))

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView("pets")

You’ve now completed the initial setup for this exercise in Scala. Skip past the subsequent section in Python to continue.


Follow these steps to complete the exercise in PYTHON:

Import relevant Spark SQL libraries using the following code:

from pyspark.sql import functions as F

Continuing to build on our animal data set, Create a List of Rows, each containing an animal name, type, age and color using the following code:

my_previous_pets = [("fido", "dog", 4, "brown"),
                    ("annabelle", "cat", 15, "white"),
                    ("fred", "bear", 29, "brown"),
                    ("gus", "parakeet", 2, "black"),
                    ("daisy", "cat", 8, "black"),
                    ("jerry", "cat", 1, "white"),
                    ("fred", "parrot", 1, "brown"),
                    ("gus", "fish", 1, "gold"),
                    ("gus", "dog", 11, "black"),
                    ("daisy", "iguana", 2, "green"),
                    ("rufus", "dog", 10, "gold")]

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

petsRDD = sc.parallelize(my_previous_pets)

Create a DataFrame from the RDD and a provided schema using the following code:

petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

petsDF.createOrReplaceTempView('pets')

You’ve now completed the initial setup for this exercise in Python.

If you chose to do the setup for this exercise in Scala, you can now proceed from this point.

Analysis through Aggregation

We now have an in-memory view (i.e. relational table representation) of our data. It only exists within the confines of our Spark application, but it enables us to run SQL queries against it as if it were a real table in a database. This will prove handy time after time in your work with Spark.

Now that we have a query-able view, let’s answer several different questions about the data by using Spark’s rich, comprehensive SQL functionality to query the temporary view we’ve created. Once a table is registered, you can query it as many times as you like, again, as if it were a real table.

As usual, we’ve provided solutions in both Scala and Python for your convenience. Where the solution is the exact same, we have coalesced the code.

What are the three most popular (i.e. recurring) names in the data?

To answer this question, we need to write a SQL query in Spark SQL that gives us the count of name occurrences in the table. You can also do this functionally (with methods on the DataFrame), but this example will showcase the pure SQL approach.

The following shows the code in SCALA and PYTHON:

spark.sql("select nickname, count(*) as occurrences from pets group by nickname order by occurrences desc limit 3").show()

The following is the output of the preceding code:

+--------+-----------+
|nickname|occurrences|
+--------+-----------+
|     gus|          3|
|    fred|          2|
|   daisy|          2|
+--------+-----------+

As can be seen the three most popular names are gus, fred, and daisy.

How old is the oldest cat in the data?

We can use the functional API of Spark SQL to find the maximum age of cats in the data. As demonstrated above, you can also use pure SQL to achieve this – but this example will focus on the purely functional approach.

Follow these steps for SCALA:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Use the agg() function of the DataFrame to select the max age.
  3. Use the show() function of the DataFrame to print the results to the console.

In code form:

petsDF.where("type = 'cat'")
      .agg(Map("age" -> "max"))
      .show()

The following is the output of the preceding code:

+--------+
|max(age)|
+--------+
|      15|
+--------+

The oldest cat in our data is 15!

Follow these steps for PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Use the agg() function of the DataFrame to select the max age.
  3. Use the show() function of the DataFrame to print the results to the console.

In code form:

petsDF.where("type = 'cat'")\
    .agg({"age": "max"})\
    .show()

The following is the output of the preceding code:

+--------+
|max(age)|
+--------+
|      15|
+--------+

What are the youngest and oldest cat ages?

We can use the functional API of Spark SQL to to answer this question.

Following are the steps for implementing this in SCALA:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Group the data by type using groupBy().
  3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where("type = 'cat'")
  .groupBy("type")
  .agg(min("age").alias("min_age"), max("age").alias("max_age"))
  .show()

The following is the output of the preceding code:

+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+

As can be seen the youngest cat’s age is 1 and the oldest cat’s age is 15.

Following are the steps for implementing this in PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just cats.
  2. Group the data by type using groupBy().
  3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where(petsDF["type"] == "cat") \
    .groupBy("type") \
    .agg(F.min("age"), F.max("age")) \
    .show()

The following is the output of the preceding code:

+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+

What is the average dog age?

We can use the functional API of Spark SQL to find the average age of dogs.

Following are the steps for implementing this in SCALA:

  1. Use the where() function of the DataFrame to filter the data to just dogs.
  2. Group the data by type using groupBy().
  3. Use the agg() function of the DataFrame to select the average age.
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where("type = 'dog'")
    .groupBy("type")
    .agg("age" -> "avg")
    .show()

The following is the output of the preceding code:

+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+

As can be seen the average dog’s age is 8.334.

Following are the steps for implementing this in PYTHON:

  1. Use the where() function of the DataFrame to filter the data to just dogs.
  2. Group the data by type using groupBy().
  3. Use the agg() function of the DataFrame to select the average age.
  4. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.where("type = 'dog'")\
    .groupBy("type")\
    .agg(F.avg("age"))\
    .show()

The following is the output of the preceding code:

+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+

As can be seen the average age of dogs is 8.334.

How many pets of each color are there in the data?

We can use the functional API of Spark SQL to find how many pets of each color exist in the data.

Following are the steps for implementing this in SCALA and PYTHON:

  1. Group the data by type using groupBy().
  2. Use the groupBy() function of the DataFrame to count the records in each group.
  3. Finally, print the results to the console using the show() function of the DataFrame.
petsDF.groupBy("color").count().show()

The following is the output of the preceding code:

+-----+-----+
|color|count|
+-----+-----+
|green|    1|
|white|    2|
| gold|    2|
|black|    2|
|brown|    3|
+-----+-----+

As can be seen the number of pets in each colors can be seen here.

And that’s a rough introduction to aggregation in Spark / Spark SQL! There are a lot of powerful operations you can conduct in Spark, so keep exploring the APIs!

In the next section, we’ll cover Standardization – a practice that’s common in Data Engineering but not necessarily in Spark. See you then!

Spark Starter Guide 4.5: How to Join DataFrames

Spark Starter Guide 4.5: How to Join DataFrames

Previous post: Spark Starter Guide 4.4: How to Filter Data

Introduction

If you’ve spent any time writing SQL, or Structured Query Language, you might already be familiar with the concept of a JOIN. If you’re not, then the short explanation is that you can use it in SQL to combine two or more data tables together, leveraging a column of data that is shared or related between them.

Joining is handy in a number of ways, like supplementing your large dataset with additional information or performing lookups. There are many types of joins, like left, right, inner and full outer, and Spark has multiple implementations of each to make it convenient and fast for you as an engineer/analyst to leverage. It’s all possible using the join() method.

  • Inner Join: Return records that have matching values in both tables that are being joined together. Rows that do not match on both sides are not kept.
  • Left (Outer) Join: Return all records from the left table, and only the matched records from the right table. This is useful in situations where you need all the rows from the left table, joined with rows from the right table that match. It can be described as supplementing the Left table with Right table information.
  • Right (Outer) Join: Return all records from the right table, and only the matched records from the left table. This is useful in situations where you need all the rows from the right table, joined with rows from the left table that match. It’s the reversed version of the Left (Outer) Join.
  • Full (Outer) Join: Return all records from the left table and the right table, whether they have matching values or not. This is useful in situations where you need like rows to be joined together, but keeping the rows that don’t from both sides.

In the following exercise, we will see how to join two DataFrames.


Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

Create a Sequence of Rows where the content is a tuple containing an animal and its category using the following code:

val categorized_animals = Seq(Row("dog", "pet"),
                              Row("cat", "pet"),
                              Row("bear", "wild"))

Create a schema that corresponds to the data using the following code:

val schema_animals = List(
  StructField("name", StringType, nullable = true),
  StructField("category", StringType, nullable = true)
)

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)

Create a DataFrame from the RDD and schema created using the following code:

val animalData = spark.createDataFrame(animalDataRDD, StructType(schema_animals))

Create a Sequence of Rows where the content is a tuple containing an animal and its food using the following code:

val animal_foods = Seq(Row("dog", "kibble"),
                       Row("cat", "canned tuna"),
                       Row("bear", "salmon"))

Again, we will create a schema that corresponds to the data from the preceding step using the following code:

val schema_foods = List(
  StructField("animal", StringType, nullable = true),
  StructField("food", StringType, nullable = true)
)

Using the parallelize() function of Spark we will turn that Sequence into an RDD as shown in the following code:

val animalFoodRDD = spark.sparkContext.parallelize(animal_foods)

We will then create a DataFrame from the RDD and schema created using the following code:

val animalFoods = spark.createDataFrame(animalFoodRDD, StructType(schema_foods))

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

val animals_enhanced = animalData.join(
      animalFoods, 
      joinExprs = col(colName = "name") === col(colName = "animal"),
      joinType = "left")

animals_enhanced.show()

The following is the output of the preceding code:

+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

From the preceding table, we can observe rows having common values in the name and animal columns. Based on this, their category and corresponding food is listed.


Follow these steps to complete the exercise in PYTHON:

Create a List of tuples containing an animal and its category using the following code:

categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild")]

Create a List of tuples containing an animal and its food using the following code:

animal_foods = [("dog", "kibble"), ("cat", "canned tuna"), ("bear", "salmon")]

Use the parallelize() function of Spark to turn those Lists into RDDs as shown in the following code:

animalDataRDD = sc.parallelize(categorized_animals)
animalFoodRDD = sc.parallelize(animal_foods)

Create DataFrames from the RDDs using the following code:

animalData = spark.createDataFrame(animalDataRDD, ['name', 'category'])
animalFoods = spark.createDataFrame(animalFoodRDD, ['animal', 'food'])

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

animals_enhanced = animalData.join(animalFoods, animalData.name == animalFoods.animal)
animals_enhanced.show()

The following is the output of the preceding code:

+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

This is just one way to join data in Spark. There is another way within the .join() method called the usingColumn approach.

The usingColumn Join Method

If the exercise were a bit different—say, if the join key/column of the left and right data sets had the same column name—we could enact a join slightly differently, but attain the same results. This is called the usingColumn approach, and it’s handy in its straightforwardness.

The following shows the code in SCALA:

leftData.join(rightData, usingColumn = "columnInCommon")

We can try the same thing in PYTHON using the following code:

leftData.join(rightData, on="columnInCommon")

There is an extension of this approach that allows you to involve multiple columns in the join, and all you have to do is provide it a Scala Sequence or Python List of the appropriate columns you wish to join on. If you’re familiar with the JOIN USING clause in SQL, this is effectively that. Keep in mind that each value in the list must exist on both sides of the join for this approach to succeed.

Following shows the code in SCALA:

leftData.join(rightData, usingColumns = Seq("firstColumnInCommon", "secondColumnInCommon"))

We can try the same thing in PYTHON using the following code:

leftData.join(rightData, on=['firstColumnInCommon', 'secondColumnInCommon'])

These methods are by no means the only way to join data, but they are the most common and straightforward.

In the next section, we will learn data aggregation.

Spark Starter Guide 1.2: Spark DataFrame Schemas

Introduction

A schema is information about the data contained in a DataFrame. Specifically, the number of columns, column names, column data type, and whether the column can contain NULLs. Without a schema, a DataFrame would be a group of disorganized things. The schema gives the DataFrame structure and meaning.

Without a schema, a DataFrame would be a group of disorganized things. The schema gives the DataFrame structure and meaning.

Specifying the Schema of a DataFrame

In the previous section we introduced the createDataFrame() method. In PySpark, this method looks like:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

After the required data parameter the first optional parameter is schema. The most useful options for the schema parameter include: None (or not included), a list of column names, or a StructType.

If schema is None or left out, then Spark will try to infer the column names and the column types from the data. If schema is a list of column names, then Spark will add the column names in the order specified and will try to infer the column types from the data. In both of these cases Spark uses the number of rows specified in the second optional parameter, samplingRatio, to infer the schema from the data. If not included or given None, then only the top row is used to infer the schema.

To illustrate, say we had some data with a variable named computer_sales with columns “product_code”, “computer_name”, and “sales”. The following illustrates all the options the createDataFrame() method can handle in PySpark.

The following code is used when only the data parameter is provided or the schema is set to None or left blank:

df1 = spark.createDataFrame(computer_sales)
df2 = spark.createDataFrame(computer_sales, None)

Both DataFrames are equivalent.

The following is used when the data parameter is specified along with a Python list of column names:

df3 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"])

The following code is used when the data parameter is specified along with a Python list of column names and the first two rows will be used to infer the schema:

df4 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], 2)

The following is used to infer the schema from every row in the DataFrame. len() is a Python function that returns an integer of the number of values in a list. Since the number of values in the list computer_sales equals the number of rows in the DataFrame, the samplingRatio parameter will evaluate every row in the DataFrame to infer the schema:

df5 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], len(computer_sales))

Exercise 6: Creating a DataFrame in PySpark with only named columns

  1. Create a nested list called home_computers as shown in the following code:
    home_computers = [["Honeywell", "Honeywell 316#Kitchen Computer", "DDP 16 Minicomputer", 1969], ["Apple Computer", "Apple II series", "6502", 1977], ["Bally Consumer Products", "Bally Astrocade", "Z80", 1977]]
  2. Create a DataFrame but this time the column names of the DataFrame are given explicitly as a list in the second parameter as shown in the following code:
    computers_df = spark.createDataFrame(home_computers, ["Manufacturer", "Model", "Processor", "Year"])
    Since the third parameter samplingRatio is not included, Spark uses the first row of data to infer the data types of the columns.
  3. Show the contents of the DataFrame and display the schema with the following code:
    computers_df.show()
    computers_df.printSchema()

Running the preceding code displays the following:

+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+
 
root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Processor: string (nullable = true)
 |-- Year: long (nullable = true)

Columns names make DataFrames exceptionally useful. The PySpark API makes adding columns names to a DataFrame very easy. 

Schemas, StructTypes, and StructFields

The most rigid and defined option for schema is the StructType. It is important to note that the schema of a DataFrame is a StructType. If a DataFrame is created without column names and Spark infers the data types based upon the data, a StructType is still created in the background by Spark. 

A manually created PySpark DataFrame, like the following example, still has a StructType schema:

computers_df = spark.createDataFrame(home_computers)
computers_df.show()
+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+

The schema can be displayed in PySpark by calling the schema method on a DataFrame like: computers_df.schema

Running the preceding code displays the following:

Out[1]: StructType(List(StructField(_1,StringType,true),StructField(_2,StringType,true),StructField(_3,StringType,true),StructField(_4,LongType,true)))

To recap, the schema of a DataFrame is stored as a StructType object. The StructType object consists of a list of StructFields. The StructFields are the information about the columns of a DataFrame. Use the following code for Spark Scala:

import org.apache.spark.sql.types.{StructType, StructField}
 
val schema = StructType(
  List(
    StructField("Manufacturer", IntegerType, true),
    StructField("Model", StringType, true),
    StructField("Processor", StringType, true),
    StructField("Year", LongType, true)
  )
)

Use the following code for PySpark:

from pyspark.sql.types import StructType, StructField
 
schema = StructType([
  StructField("Manufacturer", StringType(), True),
  StructField("Model", StringType(), True),
  StructField("Processor", StringType(), True),
  StructField("Year", LongType(), True)
])

It is important to note that the schema of a DataFrame is a StructType

StructFields are objects that correspond to each column of the DataFrame and are constructed with the name, data type, and a boolean value of whether the column can contain NULLs. The second parameter of a StructFieldis the columns data type: string, integer, decimal, datetime, and so on. To use data types in Spark the types module must be called. Imports in Scala and Python are code that is not built-in the main module. For example, DataFrames are part of the main code class. But ancillary things like data types and functions are not and must be imported to be used in your file. The following code is the Scala import for all of the data types:

import org.apache.spark.sql.types._

The following code is the Python import for all of the data types:

from pyspark.sql.types import *

Note:

StructType and StructField are actually Spark data types themselves. They are included in the preceding data imports that import all the members of the data types class. To import StructType and StructField individually use the following code for Scala Spark:
import org.apache.spark.sql.types.{StructType, StructField}

To import StructType and StructField individually use the following code for PySpark:
from pyspark.sql.types import StructType, StructField

Exercise 7: Creating a DataFrame in PySpark with a Defined Schema

  1. Import all the PySpark data types at once (that include both StructType and StructField) and make a nested list of data with the following code:
from pyspark.sql.types import *

customer_list = [[111, "Jim", 45.51], [112, "Fred", 87.3], [113, "Jennifer", 313.69], [114, "Lauren", 28.78]]
  1. Construct the schema using the StructType and StructField. First make a StructType which holds a Python list as shown in the following code:
customer_schema = StructType([
  StructField("customer_id", LongType(), True),
  StructField("first_name", StringType(), True),
  StructField("avg_shopping_cart", DoubleType(), True)
])

Inside the list are the individual StructField which make up the columns of the DataFrame.

  1. Make the DataFrame with the code below. In the createDataFrame() method the first parameter is the data and the second is the schema.
customer_df = spark.createDataFrame(customer_list, customer_schema)
  1. Display the contents and the schema of the DataFrame with the following code:
    customer_df.show()
    customer_df.printSchema()

Running the preceding code displays the following:

+-----------+----------+-----------------+
|customer_id|first_name|avg_shopping_cart|
+-----------+----------+-----------------+
|        111|       Jim|            45.51|
|        112|      Fred|             87.3|
|        113|  Jennifer|           313.69|
|        114|    Lauren|            28.78|
+-----------+----------+-----------------+
 
root
 |-- customer_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- avg_shopping_cart: double (nullable = true)

Spark schemas are the structure or the scaffolding of a DataFrame. Just like a building would collapse without structure, so too would a DataFrame. Without structure Spark wouldn’t be able to scale to trillions and trillions of rows. 

Spark schemas are the structure or the scaffolding of a DataFrame

Another way to create a DataFrame in Spark Scala is to use Rows wrapped in a Sequence. We have already introduced the Sequence object. The Row object is one row in a DataFrame with each value representing cells in different columns.

However, a Sequence of Rows requires several things in order to be created into a DataFrame. First a Sequence of Rows must be “parallelized” which means the data that is held in the driver is distributed to all the nodes of the clusters. This is done by calling the parallelize() method on the SparkContext (which is created from the SparkSession). An example would look like spark.sparkContext.parallelize(data). The second thing required to make a DataFrame out of a Sequence of Rows is providing a StructType schema. Unfortunately, not passing a schema or using the toDF() method causes an error.

Exercise 8: Creating a DataFrame in Spark Scala with a Defined Schema

  1. Import the Scala data types and then make a Sequence of Rows called grocery_items. The Seq() wraps around each Row() object. Each comma separated value in the Row() will be a column in the DataFrame:
import org.apache.spark.sql.types._
 
val grocery_items = Seq(
  Row(1, "stuffing", 4.67),
  Row(2, "milk", 3.69),
  Row(3, "rolls", 2.99),
  Row(4, "potatoes", 5.15),
  Row(5, "turkey", 23.99)
)
  1. Construct the schema using the StructType and StructFields. First make a StructType which holds a Scala list. Inside the list are the individual StructFields which make up the columns of the DataFrame
val grocery_schema = StructType(
  List(
    StructField("id", IntegerType, true),
    StructField("item", StringType, true),
    StructField("price", DoubleType, true)
  )
)
  1. Make a DataFrame using the createDataFrame() method. But when using the Sequence of Rows we first have to paralize the data. Inside createDataFrame() call the SparkSession by the “spark” variable then call sparkContext and then finally parallelize followed by the data variable grocery_items. The second parameter is the StructType grocery_schema:
val grocery_df = spark.createDataFrame(spark.sparkContext.parallelize(grocery_items), grocery_schema)

Display the data and print the schema:

grocery_df.show()
grocery_df.printSchema()
+---+--------+-----+
| id|    item|price|
+---+--------+-----+
|  1|stuffing| 4.67|
|  2|    milk| 3.69|
|  3|   rolls| 2.99|
|  4|potatoes| 5.15|
|  5|  turkey|23.99|
+---+--------+-----+
 
root
 |-- id: integer (nullable = true)
 |-- item: string (nullable = true)
 |-- price: double (nullable = true)
 
import org.apache.spark.sql.types._
grocery_items: Seq[org.apache.spark.sql.Row] = List([1,stuffing,4.67], [2,milk,3.69], [3,rols,2.99], [4,potatoes,5.15], [5,turkey,23.99])
grocery_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))
grocery_df: org.apache.spark.sql.DataFrame = [id: int, item: string ... 1 more field]

The add() Method

The add() method can be used interchangeably and in addition to the StructFields objects on a StructType. The add() method takes the same parameters as the StructField object. The following schemas are all equivalent representations:

The PySpark code is as follows:

from pyspark.sql.types import StructType, StructField, StringType, LongType
 
schema1 = StructType([
  StructField("id_column", LongType(), True),
  StructField("product_desc", StringType(), True)
])
 
schema2 = StructType().add("id_column", LongType(), True).add("product_desc", StringType(), True)
 
schema3 = StructType().add(StructField("id_column", LongType(), True)).add(StructField("product_desc", StringType(), True))

We can confirm that the two schemas are equivalent by comparing if the schema variables are equal to each other and printing the results in Python:

print(schema1 == schema2)
print(schema1 == schema3)

Running the preceding code will display the following:

True
True

In Spark Scala, the add() method is exactly the same except for a blank Scala list must be placed inside the blank StructType. See variables schema2 and schema3 for the only difference between Spark Scala and PySpark:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val scala_schema1 = StructType(List(
  StructField("id_column", LongType, true),
  StructField("product_desc", StringType, true)
))
 
val scala_schema2 = StructType(List()).add("id_column", LongType, true).add("product_desc", StringType, true)
 
val scala_schema3 = StructType(List()).add(StructField("id_column", LongType, true)).add(StructField("product_desc", StringType, true))

We can confirm that the two schemas are equivalent by comparing if the schema variables are equal to each other and printing the results in Scala:

println(scala_schema1 == scala_schema2)
println(scala_schema1 == scala_schema3)

Running the preceding code will display the following:

true
true
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
scala_schema1: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema2: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema3: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))

The add() method can be used when adding a new column to already existing DataFrame. In the following example, sales_schema is the schema of a DataFrame. And we want to add another column to the DataFrame. We can use the add() method on a StructField to the original StructType to create a brand new StructType.

Exercise 9: Using the add() Method

  1. Create a schema of a StructType named sales_schema that has two columns. The first column “user_id” is a long data type and cannot be nullable. The second column “product_item” is a string data type and can be nullable. Following is the code for PySpark:
from pyspark.sql.types import StructType, StructField, StringType, LongType
 
sales_schema = StructType([
  StructField("user_id", LongType(), False),
  StructField("product_item", StringType(), True)
])

Following is the code for Scala Spark:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val sales_schema = StructType(List(
  StructField("user_id", LongType, true),
  StructField("product_item", StringType, true)
))
  1. Create a StructField called sales_field that has a column name of “total_sales” with a long data type that can be nullable. Following is the code for PySpark:
    sales_field = StructField("total_sales", LongType(), True)
    Following is the code for Scala Spark:
    val sales_field = StructField("total_sales", LongType, true)
  2. Use the add() method to add the sales_field StructField to the sales_schema StructType. Following is the code for PySpark:
    another_schema = sales_schema.add(sales_field)
    Following is the code for Scala Spark:
    val another_schema = sales_schema.add(sales_field)
  3. Print out the new schema variable to verify it looks correctly: print(another_schema)

Following is the code for PySpark:

StructType(List(StructField(user_id,LongType,false),StructField(product_item,StringType,true),StructField(total_sales,LongType,true)))

Following is the code for Scala Spark:

println(another_schema)
StructType(StructField(user_id,LongType,true), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
sales_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,true), StructField(product_item,StringType,true))
sales_field: org.apache.spark.sql.types.StructField = StructField(total_sales,LongType,true)
another_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,true), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))

Use the add() method when adding columns to a DataFrame

You can use the schema method on a DataFrame in conjunction with the add() method to add new fields to the schema of an already existing DataFrame. In Spark, a DataFrame’s schema is a StructType. In the preceding exercise we manually specified the schema as StructType. Spark has a shortcut: the schema method. The method schema can be called on an existing DataFrame to return its schema, that is a StructType. So in Spark Scala or PySpark you would call some_df.schema to output the StructType schema.

Here is an example in PySpark of the output of the schema method on a DataFrame returning a StructType schema:

print(customer_df.schema)
StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true)))

Here is an example in Spark Scala of the output of the schema method on a DataFrame returning a StructType schema:

println(grocery_df.schema)
StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))

So, with the schema method you don’t have to manually create the StructType to add a new column. Just call the schema method on the DataFrame and then use the add method to add a column as a StructField. Following is the code for PySpark:

final_schema = customer_df.schema.add(StructField("new_column", StringType(), True))
 
print(final_schema)

Running the preceding code displays the following:

StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true),StructField(column_x,StringType,true),StructField(column_x,StringType,true),StructField(new_column,StringType,true),StructField(new_column,StringType,true)))

Following is the code for Scala Spark:

val final_schema = grocery_df.schema.add(StructField("new_column", StringType, true))
 
println(final_schema)

Running the preceding code displays the following:

StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))
final_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))

In the Modifying DataFrames section of this chapter we will see how to add new columns of data to a DataFrame.

Return column names from a schema

Use the fieldNames method on a StructType to return a list or array of the column names. This is an easy way to return the column names of a DataFrame. The fieldNames method can be called on a StructType or after the schema method on a DataFrame. The only difference between Spark Scala and PySpark is that PySpark requires trailing parenthesis () and Spark Scala omits the parenthesis. 

In PySpark call the fieldNames() method on a schema and on the DataFrame to return the column names of the schema:

print( customer_df.schema.fieldNames() )
print( customer_schema.fieldNames() )

The output is a list of the column names of the DataFrame:

['customer_id', 'first_name', 'avg_shopping_cart'] 
['customer_id', 'first_name', 'avg_shopping_cart']

In Spark Scala call the fieldNames() method on a schema and on the DataFrame to return the column names of the schema. In Scala we cannot directly print the contents of an array like we can print a list in Python. So here we create a variable and Scala outputs the contents of the array:

val columns1 = grocery_df.schema.fieldNames
val columns2 = grocery_schema.fieldNames

The output is an array of the column names of the DataFrame:

columns1: Array[String] = Array(id, item, price) 
columns2: Array[String] = Array(id, item, price)

Nested Schemas

So far, we have dealt with flat and orderly DataFrame schema. But Spark supports nested columns where a column can contain more sets of data. 

So far, we have dealt with flat and orderly DataFrame schema. But Spark supports nested columns where a column can contain more sets of data. Suppose we had a data set that looked like the following Python dictionary or JSON object: 

{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}

This data set would be the result of some imaginary sales tables that was joined to an orders table. We will look at joining DataFrames together in Chapter 3SQL with Spark.

It is difficult to see from the nested dictionary but there are three columns: idname, and orders. But orders is special, because it is a list of lists. In Python we can directly use this data by wrapping it in brackets as mentioned earlier.

nested_sales_data = [{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}]

If we used this list and made a DataFrame without specifying a schema, the output would not be very usable or readable. The following PySpark code uses the preceding nested JSON data to make a Spark DataFrame. The DataFrame and schema is displayed to demonstrate what can happen when you make a DataFrame with nested data without a schema:

ugly_df = spark.createDataFrame(nested_sales_data)
ugly_df.show(20, False)
ugly_df.printSchema()
+---+---------+------------------------------------------------------------------------+
|id |name     |orders                                                                  |
+---+---------+------------------------------------------------------------------------+
|101|Jim      |[[id ->, userid ->, price -> 45.99], [id ->, userid ->, price -> 17.35]]|
|102|Christina|[[id ->, userid ->, price -> 245.86]]                                   |
|103|Steve    |[[id ->, userid ->, price -> 7.45], [id ->, userid ->, price -> 8.63]]  |
+---+---------+------------------------------------------------------------------------+
 
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: double (valueContainsNull = true)

The output is not readable or user friendly with the “->” characters and Spark is trying to make a map of the data. Let’s add a schema to tell Spark exactly how we want to structure the DataFrame. The following PySpark code demonstrates the results of nested data when using a schema:

from pyspark.sql.types import *
 
orders_schema = [
  StructField("id", IntegerType(), True),
  StructField("price", DoubleType(), True),
  StructField("userid", IntegerType(), True)
]
 
sales_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("orders", ArrayType(StructType(orders_schema)), True)
])

Here we called the order_schema inside the sales_schema. This shows how versatile schemas can be and how easy it is in Spark to construct complex schemas. Now let’s make a DataFrame that is readable and well structured as shown in the following code:

nested_df = spark.createDataFrame(nested_sales_data, sales_schema)
 
nested_df.show(20, False)
nested_df.printSchema()
+---+---------+----------------------------------+
|id |name     |orders                            |
+---+---------+----------------------------------+
|101|Jim      |[[1, 45.99, 101], [2, 17.35, 101]]|
|102|Christina|[[3, 245.86, 102]]                |
|103|Steve    |[[4, 7.45, 103], [5, 8.63, 103]]  |
+---+---------+----------------------------------+
 
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: integer (nullable = true)

In the next section we will move on from manually created DataFrames to creating DataFrames from files stored in Hadoop.

Spark Starter Guide 1.1: Creating Spark DataFrames Manually

Introduction

Since the Spark 2.0 version update, DataFrames have been the central technology for accomplishing tasks in Spark. At its essence, DataFrames are an immutable but distributed group of data that is assembled into named columns with a set structure. Let’s look at the aspects of DataFrames individually in the next section.

DataFrames are an immutable but distributed group of data that is assembled into named columns with a set structure

Spark DataFrame Characteristics

Distributed Group of Data

Spark’s principle data storage device is the Hadoop Distributed File System (HDFS). A Hadoop cluster consists of a single NameNode that manages access & metadata of the cluster and multiple DataNodes that store the data. In Hadoop, each data file is split up into many small pieces, replicated at least three times, and stored across the DataNodes. Spark is designed to be able to read and process this distributed data stored in HDFS. So, if data is stored in HDFS and that data is created into a DataFrame, then the data actually is distributed on all the DataNodes of the Hadoop cluster.  

Immutable

Spark DataFrames are immutable which means they do not change. So, adding columns, removing rows, renaming columns, performing calculations, or anything else done to a DataFrame does not actually change the original DataFrame. Instead, each time an operation is performed on a DataFrame an entirely new DataFrame is created, behind the scenes in Spark, with the additional change(s). Even if you performed an operation on a DataFrame and called the DataFrame the same name, like df = df.withColumn(…), in the background Spark creates an entirely new DataFrame with the new column change and then renames the new DataFrame with the same name as the original, df. If an user performs a df.show() after the df = df.withColumn(…), because the Python or Scala code is interpreted in sequential order from top to bottom, the df.show() would display the DataFrame with the added column.

It is very important to note that Spark computes all DataFrame transformations in a lazy fashion. What that means is that computations or operations on a DataFrame actually only happen when a physical action is performed. Examples of an action on a DataFrame include displaying, writing, counting, and collecting on a DataFrame. For example, if we add a calculated column to a DataFrame, the column isn’t actually added until we perform an action on the DataFrame, like displaying the top twenty rows of the DataFrame with df.show(). For each operation performed on a DataFrame, but before an action is called, Spark essentially creates a blueprint or execution plan of the operation. This blueprint would consist of a function that would be the dataset plus the operation. And each time a new operation is performed Spark updates the blueprint with the new changes. Then when an action is called, Spark executes the blueprint and a new DataFrame is created.

Assembled into Named Columns

Spark DataFrames have structure. That structure consists of columns with column names. All data in a DataFrame is organized into named columns: even if there is just one column. For example, if the user creates a DataFrame with no column names, Spark will automatically give each column a name corresponding to its column number: _1, _2, _3, and so on.

Set Structure

Each column’s name and the data type of the column make up its structure, which is called a schema. A DataFrames schema may change but it must be defined. This means columns must have a data type: integer, string, decimal, date, and so on.


To summarize the Spark characteristics – Spark is an action technology. Every operation in Spark is designed to accomplish tasks and help reveal insights on incredibly large data sets. DataFrames form the foundation for everything in Spark: organizing data, analyzing data, streaming data, performing machine learning on data, and much more.  

But before we can do all of that, we first have to create DataFrames. Let’s jump right in.

Creating DataFrames Manually

The main way to create DataFrames in Spark is to use the createDataFrame() method. This method is called on the SparkSession to create DataFrames. Since Spark 2.0, the SparkSession is the main way to interact with all of Sparks’ many capabilities. The SparkSession creates and exposes the SparkConfSparkContext, and SQLContext to the entire application. Normally the SparkSession is created with a variable named “spark” but any name can be used. A basic SparkSession in Spark Scala looks like:

val spark = SparkSession
   .builder()
   .appName("some_app_name")
   .getOrCreate()

A basic SparkSession in PySpark looks like:

spark = SparkSession \
   .builder \
   .appName("some_app_name") \
   .getOrCreate()

Creating DataFrames Manually with PySpark

The createDataFrame() method in PySpark has a structure like:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

The optional parameters, schemasamplingRatio, and verifySchema all deal with a DataFrames schema. A schema is information about the structure of the data. These parameters and more will be covered in the next section Spark DataFrame Schemas. For now, we will only use the first parameter, data.

A schema is information about the structure of the data

The parameter data is a required parameter and can take many things. A DataFrame can be manually created from various kinds of nested lists and tuples. 

In Python, a list is a group of ordered things. Lists can include objects of different data types, are ordered, can be changed, and be nested. A nested list is the easiest way to manually create a DataFrame in PySpark. Each inside list forms a row in the DataFrame.

Let’s start off by showing how to create a DataFrame from a nested Python list.

Exercise 1: Creating a DataFrame in PySpark from a Nested List

In this exercise we will be creating a DataFrame in PySpark from a given set of nested list. 

  1. Create a Python variable hadoop_list that is a nested list with the following code:
    hadoop_list = [[1, "MapReduce"], [2, "YARN"], [3, "Hive"], [4, "Pig"], [5, "Spark"], [6, "Zookeeper"]]
    This will create a DataFrame with six rows and two columns.
  2. To create the DataFrame named hadoop_df we use the SparkSession variable spark (that we created) and call the createDataFrame() method passing only the nested list with the following code:
    hadoop_df = spark.createDataFrame(hadoop_list)
  3. Finally display the contents of the DataFrame using hadoop_df.show() and display the schema of the DataFrame in a tree structure using hadoop_df.printSchema() as shown in the following code:
hadoop_df.show()
hadoop_df.printSchema()

Running the preceding code will produce the following output:

+---+---------+
| _1|       _2|
+---+---------+
|  1|MapReduce|
|  2|     YARN|
|  3|     Hive|
|  4|      Pig|
|  5|    Spark|
|  6|Zookeeper|
+---+---------+
 
root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)

You have now created your first Spark DataFrame. In this exercise the DataFrame only has six rows. But a Spark DataFrame can scale infinitely to contain 100 trillion rows and beyond. This the power of Spark. 

In the output did you notice anything that stood out? There are actually two things to note: 

  1. The column names were _1 and _2. This is because no column names were supplied when creating the DataFrame. Spark didn’t know what to call the columns, so _1 and _2 correspond to its column number going from left to right. 
  2. The output of the printSchema() method correctly inferred the data type of each column. Spark figured out the first column was of data type long, which is similar to an integer, and that the second column was a string. When the printSchema() method is called on a DataFrame the output displays the schema in a tree format. The tree format displays the column hierarchy, column names, column data type, and whether the column is nullable. The printSchema() method has no parameters.

To display the contents of the DataFrame we call the method show() on the newly created DataFrame. The show() method looks like this:

show(n=20, truncate=True)

The show() method defaults to displaying the top twenty rows and also truncates each cell to the first twenty characters. To display more or less than the top twenty rows set the first parameter to any integer. To include all the characters of the cells, set the second parameter to False.

Displaying the contents of Spark DataFrames in PySpark:

address_data = [["Bob", "1348 Central Park Avenue"], ["Nicole", "734 Southwest 46th Street"], ["Jordan", "3786 Ocean City Drive"]]
address_df = spark.createDataFrame(address_data)
address_df.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
|   Bob|1348 Central Park...|
|Nicole|734 Southwest 46t...|
|Jordan|3786 Ocean City D...|
+------+--------------------+

Since the method defaults to the displaying only the first twenty characters of each cell, the content is truncated.

To display all the characters for each cell set the second parameter to False and to limit the output to the first two rows, set the first parameter to 2, as shown in the following code:

address_df.show(2, False)
+------+-------------------------+
|_1    |_2                       |
+------+-------------------------+
|Bob   |1348 Central Park Avenue |
|Nicole|734 Southwest 46th Street|
+------+-------------------------+
only showing top 2 rows

Note:

The second parameter, truncate, can also take integers. If set to an integer, it will display the number of characters equal to the integer for each cell.


In Python, a tuple is similar to a list except it is wrapped in parentheses instead of square brackets and is not changeable (immutable). Other than that, lists and tuples are the same. A nested tuple is a tuple inside another tuple.

Exercise 2: Creating a DataFrame in PySpark from a nested tuple

  1. Create a nested tuple called programming_languages with the following code:
    programming_languages = ((1, "Java", "Scalable"), (2, "C", "Portable"), (3, "Python", "Big Data, ML, AI, Robotics"), (4, "JavaScript", "Web Browsers"), (5, "Ruby", "Web Apps"))
  2. Construct a DataFrame called prog_lang_df with the following code:
    prog_lang_df = spark.createDataFrame(programming_languages)
  3. Display the five rows and set the truncate parameter to False so the entire contents of the cells will be shown. Also print the schema of the DataFrame with the following code:
prog_lang_df.show(5, False)
prog_lang_df.printSchema()
+---+----------+--------------------------+
|_1 |_2        |_3                        |
+---+----------+--------------------------+
|1  |Java      |Scalable                  |
|2  |C         |Portable                  |
|3  |Python    |Big Data, ML, AI, Robotics|
|4  |JavaScript|Web Browsers              |
|5  |Ruby      |Web Apps                  |
+---+----------+--------------------------+
 
root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

In Python, a dictionary is a key-value pair wrapped in curly braces. A dictionary is similar to a list, in that it is mutable, can increase or decrease in size, and be nested. Each data element in a dictionary has a key and a value. To create a DataFrame out of a dictionary all that is required is to wrap it in a list. 

Exercise 3: Creating a DataFrame in PySpark from a list of dictionaries

  1. Create a list of dictionaries called top_mobile_phones. Inside the list make three comma separated dictionaries each with keys of “Manufacturer”, “Model”, “Year”, “Million_Units” as shown in the following code:
    top_mobile_phones = [{"Manufacturer": "Nokia", "Model": "1100", "Year": 2003, "Million_Units": 250}, {"Manufacturer": "Nokia", "Model": "1110", "Year": 2005, "Million_Units": 250}, {"Manufacturer": "Apple", "Model": "iPhone 6 & 6+", "Year": 2014, "Million_Units": 222}]
  2. Create a DataFrame called mobile_phones_df from the dictionary list as shown in the following code:
    mobile_phones_df = spark.createDataFrame(top_mobile_phones)
  3. Display the DataFrame and print the schema as shown in the following code:
mobile_phones_df.show()
mobile_phones_df.printSchema()
+------------+-------------+-------------+----+
|Manufacturer|Million_Units|        Model|Year|
+------------+-------------+-------------+----+
|       Nokia|          250|         1100|2003|
|       Nokia|          250|         1110|2005|
|       Apple|          222|iPhone 6 & 6+|2014|
+------------+-------------+-------------+----+
 
root
 |-- Manufacturer: string (nullable = true)
 |-- Million_Units: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: long (nullable = true)

Notice that we didn’t supply the column names to the DataFrame but they still appear. That is because dictionaries have “keys” and these keys make up the columns of the DataFrame. Likewise, the dictionary “values” are the cells in the DataFrame. So, by using dictionaries, Spark can display the DataFrame column names.

Creating DataFrames Manually with Spark Scala

In Scala the createDataFrame() method only has two parameters, the first is the actual data and the second is schema. Two of the more popular options for creating a DataFrame manually in Scala are using lists and Sequences. A list in Scala is different than a list in Python. In Scala, a list is an unchangeable collection of objects of the same data type. But we can get around Scala’s list immutability by holding each row of data in a tuple. In Scala, a tuple is just comma separated data enclosed by parenthesis. In the following exercise, the DataFrame will have two columns because the tuple only has two values. But using this method we can add more columns by adding more values inside the tuple.

Exercise 4: Creating a DataFrame in Scala from a List

  1. Before we can create a DataFrame, we first need a list. In Scala, we can use the List() method to create a list. Create a list called reptile_species_state with three tuples of two vales each inside the list as shown in the following code:
    val reptile_species_state = List(("Arizona", 97), ("Florida", 103), ("Texas", 139))
  2. Create a DataFrame named reptile_df by calling the createDataFrame() method on the SparkSession variable spark. Pass the reptile_species_state list as a parameter to the createDataFrame() method as shown in the following code:
    val reptile_df = spark.createDataFrame(reptile_species_state)
  3. Display the DataFrame and print the schema with the following code:
reptile_df.show()
reptile_df.printSchema
+-------+---+
|     _1| _2|
+-------+---+
|Arizona| 97|
|Florida|103|
|  Texas|139|
+-------+---+
 
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)
 
reptile_species_state: List[(String, Int)] = List((Arizona,97), (Florida,103), (Texas,139))
reptile_df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

As with Python, the createDataFrame() method in Scala correctly inferred the schema and created the DataFrame.

In Scala, a Sequence and a list are very similar. In fact, a list is a type of Sequence. The main difference centers around performance: how Scala provides fast access to the elements in a list or Sequence. But in reality you can treat them the same. A Sequence is called like Seq() with the elements inside the parenthesis separated by commas.

To add column names to a manually created DataFrame in Scala use the toDF() method. toDF() takes comma separated strings and converts the DataFrame with unnamed columns into a DataFrame with named columns. 

Exercise 5: Creating a DataFrame in Scala from a Sequence

  1. In Scala, a Sequence is created using “Seq()”. Make a Sequence called bird_species_state and inside construct three tuples as shown in the following code:
    val bird_species_state = Seq(("Alaska", 506), ("California", 683), ("Colorado", 496))
  2. Create a DataFrame called birds_df. But this time call the toDF() method with comma separated strings for column names as shown in the following code:
    val birds_df = spark.createDataFrame(bird_species_state).toDF("state","bird_species")
  3. Display the contents of the DataFrame and print the schema with the following code:
birds_df.show()
birds_df.printSchema
+----------+------------+
|     state|bird_species|
+----------+------------+
|    Alaska|         506|
|California|         683|
|  Colorado|         496|
+----------+------------+
 
root
 |-- state: string (nullable = true)
 |-- bird_species: integer (nullable = false)
 
bird_species_state: Seq[(String, Int)] = List((Alaska,506), (California,683), (Colorado,496))
birds_df: org.apache.spark.sql.DataFrame = [state: string, bird_species: int]

Like Python dictionaries, Scala Sequences are prevalent and are used often. Being exposed to them will add a lot of value. In this exercise we showed how to transform data in a Sequence into a DataFrame. 

In this section we showed how to create DataFrame with just data. We will explain creating DataFrames in Scala and Python with a defined schema in the next section.