# Spark Starter Guide 2.1: DataFrame Data Analysis

## Introduction

In the last chapter, the reader was introduced to Spark DataFrames and shown Spark commands for creating, processing, and working with them. In this chapter, we will accelerate into more complex and challenging topics like data cleaning, statistics, and ML Pipelines. In Chapter 1, DataFrames with Spark, we talked about the “what” of Spark DataFrames: the commands to do different things. Here, we will introduce the “why” and the “how” of Spark DataFrames: starting to ask “why are we doing this?” and thinking longer term about how the steps we take at the beginning of a data problem impact us at the end.

# Spark Starter Guide 1.7: Chapter 1 Activity

This activity will combine the skills and techniques you learned so far in this chapter.

Imagine you are working as an intern at XYZ BigData Analytics Firm, where you are tasked with performing data analysis on a data set to solidify your Spark DataFrame skills.

In this activity you will use the data set “Kew Museum of Economic Botany: Object Dispersals to Schools, 1877-1982”, which “is a record of object dispersals from the Kew Museum of Economic Botany to schools between 1877 and 1982” that “is based on entries in the ‘Specimens Distributed’ books (or exit books) in the Museum archive.” The source of the data set can be found at https://figshare.com/articles/Schools_dispersals_data_with_BHL_links/9573956. This activity is good practice for a real-world scenario because it simulates being given a random data set that you must process and analyze when you have no prior connection to the data.

# Spark Starter Guide 1.6: DataFrame Aggregations

## Introduction

Aggregation is the process of combining data into one or more groups and performing statistical operations, like average, maximum, and minimum, on those groups. In Spark it is easy to group by a categorical column and perform the needed operation, and DataFrames allow multiple groupings and multiple aggregations at once.

# Spark Starter Guide 1.5: DataFrame Columns

## Introduction

Since DataFrames are comprised of named columns, Spark offers many options for performing operations on individual or multiple columns. This section will introduce converting columns to a different data type, adding calculated columns, renaming columns, and dropping columns from a DataFrame.

# Spark Starter Guide 1.4: Modifying Spark DataFrames

## Introduction

Now that we have created DataFrames manually and from files, it is time to actually do things with them. This section introduces the fundamental operations on DataFrames: displaying, selecting, filtering, and sorting. As discussed in Section 1.1, performing operations on a DataFrame actually creates a new DataFrame behind the scenes in Spark, regardless of whether a new name is given to the DataFrame or the results are displayed. When working with DataFrames, the results can be displayed (as will be covered shortly), saved to a DataFrame with the same name as the original, or saved to a different DataFrame name.

# Spark Starter Guide 4.7: How to Standardize Data

Previous post: Spark Starter Guide 4.6: How to Aggregate Data

## Introduction

Standardization is the practice of analyzing columns of data and identifying synonyms or like names for the same item. Similar to how a cat can also be identified as a kitty, kitty cat, kitten or feline, we might want to standardize all of those entries into simply “cat” so our data is less messy and more organized. This can make future processing of the data more streamlined and less complicated. It can also reduce skew, which we address in Addressing Data Cardinality and Skew.

We will learn how to standardize data in the following exercises.

NOTE

From this point onward, code examples may utilize a line of code like this:

`import spark.implicits._`

This allows implicit conversions of Scala objects like RDDs into modern Spark abstractions like Datasets, DataFrames, and Columns. It also enables many convenience functions from Spark SQL, like `isin()` (checking for values in a list) and `desc()` (descending-order sorting).

## Two Types of Standardization

There are at least two basic ways to standardize something:

1. You can recognize when two things are the same but literally different (“puppy” and “dog”) and associate them without actually changing anything.

2. You can recognize when two things are the same and change them to be consistent (change instances of “puppy” to “dog”).

We’ll show ways to do both in Spark, in Scala and Python. Both involve some form of a synonym library.
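Before turning to Spark, the synonym-library idea itself can be sketched in a few lines of plain Python (the dictionary contents here are purely illustrative):

```python
# A tiny synonym library: map each known variant to its standard term.
SYNONYMS = {
    "puppy": "dog", "puppy dog": "dog", "hound": "dog", "canine": "dog",
    "kitty": "cat", "kitten": "cat", "feline": "cat", "kitty cat": "cat",
}

def is_same(a, b):
    # Approach 1: recognize a match without changing the stored data.
    return SYNONYMS.get(a, a) == SYNONYMS.get(b, b)

def standardize(term):
    # Approach 2: rewrite the term to its standard form.
    return SYNONYMS.get(term, term)

print(is_same("puppy", "dog"))   # True
print(standardize("kitten"))     # cat
print(standardize("iguana"))     # iguana (unknown terms pass through unchanged)
```

The Spark exercises below apply these same two approaches, just expressed with DataFrame operations instead of a dictionary lookup.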

## Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
```

Import spark.implicits, which will be useful for handy operations in a later step using the following code:

```scala
import spark.implicits._
```

Create a Sequence of Rows, each containing an animal name and type using the following code:

```scala
val my_previous_pets = Seq(Row("annabelle", "cat"),
Row("daisy", "kitten"),
Row("roger", "puppy"),
Row("joe", "puppy dog"),
Row("rosco", "dog"),
Row("julie", "feline"))
```

Create a schema that corresponds to the data using the following code:

```scala
val schema = List(
StructField("nickname", StringType, nullable = true),
StructField("type", StringType, nullable = true)
)
```

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

```scala
val petsRDD = spark.sparkContext.parallelize(my_previous_pets)
```

Create a DataFrame from the RDD and schema created using the following code:

```scala
val petsDF = spark.createDataFrame(petsRDD, StructType(schema))
```

Use the where() function of the DataFrame in combination with the isin() function (enabled by the implicits import) to keep only rows where the type matches a provided list of dog nouns. Print the results to the console as shown in the following code:

```scala
val dogs = petsDF
  .where($"type"
  .isin("dog", "puppy", "puppy dog", "hound", "canine"))

dogs.show()
```

The following is the output of the preceding code:

```
+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+
```

Use the where() function of the DataFrame in combination with the isin() function (enabled by the implicits import) to keep only rows where the type matches a provided list of cat nouns. Print the results to the console as shown in the following code:

```scala
val cats = petsDF
  .where($"type"
  .isin("cat", "kitty", "kitten", "feline", "kitty cat"))

cats.show()
```

The following is the output of the preceding code:

```
+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+
```

As we can see, we were able to use custom standardization logic to decide when a cat is a cat, and a dog is a dog, even when their given type is not consistent. And, generally speaking, the isin() methodology of string comparisons in a list is relatively efficient at scale.

Follow these steps to complete the exercise in PYTHON:

Import additional relevant Spark libraries using the following code:

```python
from pyspark.sql import Row
from pyspark.sql.functions import col
```

Create a List of Rows, each containing a name and type using the following code:

```python
my_previous_pets = [Row("annabelle", "cat"),
Row("daisy", "kitten"),
Row("roger", "puppy"),
Row("joe", "puppy dog"),
Row("rosco", "dog"),
Row("julie", "feline")]
```

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

```python
petsRDD = sc.parallelize(my_previous_pets)
```

Create a DataFrame from the RDD and a provided schema using the following code:

```python
petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type'])
```

Use the where() function of the DataFrame in combination with the isin() function (a Column method) to keep only rows where the type matches a provided list of dog nouns. Print the results to the console as shown in the following code:

```python
dogs = (petsDF
    .where(col("type")
    .isin("dog", "puppy", "puppy dog", "hound", "canine")))

dogs.show()
```

The following is the output of the preceding code:

```
+--------+---------+
|nickname|     type|
+--------+---------+
|   roger|    puppy|
|     joe|puppy dog|
|   rosco|      dog|
+--------+---------+
```

Use the where() function of the DataFrame in combination with the isin() function (a Column method) to keep only rows where the type matches a provided list of cat nouns. Print the results to the console as shown in the following code:

```python
cats = (petsDF
    .where(col("type")
    .isin(["cat", "kitty", "kitten", "feline", "kitty cat"])))

cats.show()
```

This example also demonstrates that you can pass a list to the isin() function, not just a comma-separated sequence of string values as demonstrated in the previous step.

The following is the output of the preceding code:

```
+---------+------+
| nickname|  type|
+---------+------+
|annabelle|   cat|
|    daisy|kitten|
|    julie|feline|
+---------+------+
```

We can see that all cats could be identified in the data set, even though they weren’t all labeled as type ‘cat’.

## Standardization through Modification

In the previous exercise, we quietly identified animals as a certain type if their type was found in a list of common synonyms for the proper type. In this exercise, we will actually modify our data to be standardized, replacing each similar type value with its preferred, standard alternative.

NOTE

From this point onward, Scala code examples may, where appropriate, utilize a case class. This is a Scala-only abstraction that acts as a simple schema for structured, tabular-style data.

It’s an especially handy tool when paired with the Spark Dataset API, which can make powerful code even simpler to read and work with than DataFrames. The case class should be declared outside of the main() function in Scala, or imported from its own class.

Otherwise, you will experience exceptions.

Example:

`case class Person(name:String, age:Int)`

## Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
```

Then, create a case class called Pet, which contains two columns: nickname and petType using the following code:

```scala
case class Pet(nickname: String, petType: String)
```

Using the petsDF created in the previous exercise, use the map() function of the DataFrame to compare the petType to a list of common dog and cat synonyms, returning “dog” or “cat”, respectively, if there is a match. If there is not a match, return the unmodified petType (thus assuming it cannot be standardized) as shown in the following code:

```scala
val standardized_pets = petsDF.map(pet => {
  val nickname = pet.getString(pet.fieldIndex("nickname"))
  val petType = pet.getString(pet.fieldIndex("type"))

  val standardType =
    if (Seq("dog", "puppy", "puppy dog", "hound", "canine").contains(petType)) {
      "dog"
    } else if (Seq("cat", "kitty", "kitten", "feline", "kitty cat").contains(petType)) {
      "cat"
    } else {
      petType
    }

  Pet(nickname, standardType)
})
```

Print the results to the console using the following code:

```scala
standardized_pets.show()
```

The following is the output of the preceding code:

```
+---------+---+
|       _1| _2|
+---------+---+
|annabelle|cat|
|    daisy|cat|
|    roger|dog|
|      joe|dog|
|    rosco|dog|
|    julie|cat|
+---------+---+
```

As we can observe, column 1 in the table displays the names of the pets while column 2 displays the type of pet they are, cat or dog, after passing through the standardization process.

Follow these steps to complete the exercise in PYTHON:

Create and utilize a standardize() function to compare the petType to a list of common dog and cat nouns, returning “dog” or “cat”, respectively, if there is a match. If there is no match, return the pet unchanged.

```python
def standardize(pet):
    name = pet[0]
    animal_type = pet[1]

    if animal_type in ["dog", "puppy", "puppy dog", "hound", "canine"]:
        return name, "dog"
    elif animal_type in ["cat", "kitty", "kitten", "feline", "kitty cat"]:
        return name, "cat"
    else:
        return pet
```

Then, apply the standardize() function to petsRDD (created in the previous exercise) using the map() function. Hint: You can also use a UDF on the DataFrame instead of this RDD map method, but we’ll cover that in a future exercise!

Print the results to the console using the following code:

```python
standardizedPets = petsRDD.map(standardize)
standardizedPetsDF = spark.createDataFrame(standardizedPets, ['nickname', 'type'])

standardizedPetsDF.show()
```

The following is the output of the preceding code:

```
+---------+----+
| nickname|type|
+---------+----+
|annabelle| cat|
|    daisy| cat|
|    roger| dog|
|      joe| dog|
|    rosco| dog|
|    julie| cat|
+---------+----+
```

We can see that annabelle, daisy and julie are identified as cats, while roger, joe and rosco are identified as dogs, even though their types, starting out, were not labeled strictly as such.

In this exercise, we learned how to use custom standardization logic in Spark to identify similar animals, even when their given types differed.

In the next section, we’ll cover Ordering & Sorting – useful for when you want to convey… well, order! See you then!

# Introduction

When working with Spark in a production setting, most likely the data will come from files stored in HDFS on a Hadoop cluster. Spark has built-in support for a lot of different file types. In this section we will cover five of the most popular data file types you will see when working with Spark in a production setting: Parquet, ORC, JSON, Avro, and CSV. Some of these file types were created recently for the Big Data world and others have been around for a while and have uses outside of Hadoop. They represent a good mix of file data types and the capabilities of Spark when dealing with files.

The remainder of chapter 1 uses an open source sample data set originally found at https://github.com/Teradata/kylo/tree/master/samples/sample-data. It contains folders with data files of each of the file types covered in this chapter. All of the folders were copied to HDFS with a path of `/user/your_user_name/data`. No data was changed from the original version.

Let’s see the following Mac example:

Clone the kylo GitHub open-source repository to a location on your local computer with the following code:

```shell
$ git clone https://github.com/Teradata/kylo.git
```

Upload the sample-data directory to your Hadoop edge node by using the `scp` command as shown in the following code:

```shell
$ scp -rp /Users/your_directory/…/kylo/samples/sample-data user_name@cluster_ip_address:/home/your_user_name
```

Once on the edge node, use the Hadoop command `-put` to upload the directory of data from the edge node to a directory in HDFS as shown in the following code:

```shell
$ hadoop fs -put /home/your_user_name/sample-data /user/your_user_name/data
```

## Creating DataFrames from Parquet Files

Parquet is an open-source data file format. Just like Comma Separated Value (CSV), Parquet is a file format; however, Parquet was built from the ground up to be very fast, efficient, and flexible. Parquet was originally designed for the Hadoop ecosystem, so it fits perfectly with Spark. Many traditional file formats like CSV are row-based, meaning all of the rows must be read for each query or operation, which is costly and time-consuming. Parquet instead has a columnar format, meaning it organizes data into columns instead of rows. Querying by column means reads and operations can skip over entire columns that aren’t needed, which makes queries very fast.
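The row-versus-column trade-off can be illustrated without Spark or Parquet at all. Below is a toy sketch in plain Python (invented pet records) showing the same data stored both ways, where the columnar layout lets a query touch only the one column it needs:

```python
# The same data in a row-based layout (one tuple per record)...
rows = [
    ("annabelle", "cat", 15),
    ("rufus", "dog", 10),
    ("gus", "parakeet", 2),
]

# ...and in a columnar layout (one list per column).
columns = {
    "nickname": ["annabelle", "rufus", "gus"],
    "type": ["cat", "dog", "parakeet"],
    "age": [15, 10, 2],
}

# Row-based: averaging age still walks every full record.
avg_row = sum(r[2] for r in rows) / len(rows)

# Columnar: the query reads the "age" column and skips the rest entirely.
avg_col = sum(columns["age"]) / len(columns["age"])

print(avg_row, avg_col)  # 9.0 9.0
```

Real columnar formats add compression, encodings, and on-disk layout on top of this idea, but the reason column-oriented queries are fast is exactly this: unneeded columns are never read.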

Parquet is also unique because the schema of the data is held within the files. This means Spark doesn’t have to infer or guess the schema; it can create DataFrames with the correct column names and data types. Lastly, Parquet was built with advanced compression technology that physically compresses the data so it takes up less space. For example, one TB of CSV files would only be a couple hundred GBs as Parquet files. This makes storage costs cheaper because there is less data to hold.

Creating DataFrames from Parquet files is straightforward, since Parquet files already contain the schema. There are two main ways of creating DataFrames from Parquet files, and the commands are the same in both PySpark and Spark Scala. Both options use the `SparkSession` variable, `spark`, as the starting point and then call the `read` method, which is used to read data from files and create DataFrames. The first option calls the `parquet` method, which is purposely designed to read Parquet files and takes one parameter: the HDFS path to the Parquet file. The second option first calls the `format` method, which accepts a string parameter naming the type of files to be read, in this case “parquet”, and then calls the `load` method, which takes the HDFS path to the files to be loaded.

Note:

In this chapter many of the Python and Scala Spark commands are identical. Since Scala uses the `val` keyword when creating variables and Python simply needs the variable name, this section will use “`... =`” to designate that the command can be written in either Python or Scala to save space. The only difference being Scala uses “`val some_name =`” while Python uses “`some_name =`“.

## Exercise 10: Creating DataFrames from Parquet Files

1. To create a DataFrame using the parquet() method, use the SparkSession variable and call the read method followed by the parquet() method. Then pass the full HDFS file path of the parquet file you would like to convert into a DataFrame with the following code in both PySpark and Spark Scala:
```
… = spark.read.parquet("hdfs://user/your_user_name/data/userdata1.parquet")
```
1. To create a DataFrame using the `load()` method, use the `SparkSession` variable and call the `read` method followed by the `format()` method that takes one parameter which is the file type of the file to be converted into a DataFrame. In this case use “`parquet`“. Then call the `load()` method with the full HDFS file path to the parquet file as shown in the following code:
```
… = spark.read.format("parquet").load("hdfs://user/your_user_name/data/userdata1.parquet")
```
1. If we call `.show(5)` on any of the newly created DataFrames it will output the first five rows of the DataFrame. The data is made-up data that mimics users logging into a computer system. The output of the DataFrame would look like:
```
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural Engineer|        |
|2016-02-03 00:36:21|  4|    Denise|    Riley|    driley3@gmpg.org|Female| 140.35.109.83|3576031598965625|       China| 4/8/1997| 90263.05|Senior Cost Accou...|        |
|2016-02-03 05:05:31|  5|    Carlos|    Burns|cburns4@miitbeian...|      |169.113.235.40|5602256255204850|South Africa|         |     null|                    |        |
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
only showing top 5 rows
```

## Creating DataFrames from ORC Files

Optimized Row Columnar (ORC) is a file format that was designed with two main goals: increasing data processing speeds and reducing file sizes. ORC is another open-source project built for the Hadoop ecosystem, specifically Hive. Like Parquet, ORC is a columnar file format, but ORC stores data first into stripes, which are groups of row data. Within a stripe the data is stored in columns, and each stripe has a footer that holds metadata about the columns in the stripe, including each column’s data type, count, min, max, and sum. The ORC architecture is relatively complicated to wrap your mind around, and there is much more to it than this brief introduction, but the main point of ORC is that it is meant to be fast, and it is. ORC files also have great file compression, just like Parquet.

Note

ORC files don’t have file extensions.

In PySpark and Spark Scala a DataFrame is created from ORC files by calling the `orc()` method or the `.format("orc").load()` methods.

Following are examples of creating DataFrames from ORC files using the `orc()` method and the `load()` method. The code is the same between PySpark and Spark Scala:

```
… = spark.read.orc("hdfs://user/your_user_name/data/orc/userdata1_orc")

… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/userdata1_orc")
```

All of the previous examples have created DataFrames from a single file. But Spark can easily create a single DataFrame from multiple files. Instead of providing a single file in the path, all that is required is to end the path on a directory that only contains files of single file format and schema. The following example is shown for ORC files but works for all the other file formats.

In the following example a single file is not specified but a directory containing only files of the same format. Also, notice the subtle difference between the two examples. In the HDFS path, the first example ends in only the directory name. In the second example, the HDFS path has a trailing slash. Both of these produce the exact same DataFrame. It doesn’t matter whether the trailing slash is present.

```
… = spark.read.orc("hdfs://user/your_user_name/data/orc")

… = spark.read.orc("hdfs://user/your_user_name/data/orc/")
```

## Creating DataFrames from JSON

JSON, or JavaScript Object Notation, is a file format written in human-readable text in the form of key-value pairs. In JavaScript, an object looks like:

`var myObj = {name: "Craig", age: 34, state: "Arkansas"};`

Suppose we had a JSON file, `some_file.json`, with data equal to the preceding JavaScript object. The contents of that file would be `{"name": "Craig", "age": 34, "state": "Arkansas"}` (in JSON, the keys must be quoted strings). The structure of JSON makes it very easy for humans to read, understand, and even write. It turns out this structure is also very easy for computers to read and parse, which makes JSON the go-to format for exchanging data between applications and servers.
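Because JSON is just text, parsing it is trivial in practically any language. For instance, with Python's standard `json` module:

```python
import json

# Parse JSON text into a native Python object (a dict here).
text = '{"name": "Craig", "age": 34, "state": "Arkansas"}'
obj = json.loads(text)

print(obj["name"], obj["age"])  # Craig 34

# Serializing back to JSON text is just as easy.
print(json.dumps(obj))
```

This round trip, from text to native objects and back, is exactly why JSON works so well as an interchange format between applications and servers.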

The preceding JSON example is a very simple example but JSON supports very complex nested structures including objects, arrays, and many different data types. This capability makes it extremely flexible at holding any type of data. Because of JSON’s flexible structure and ease of use for both humans and computers JSON is very popular.

Spark has the ability to automatically infer the schema from JSON data, which makes it very easy to use JSON data in Spark. Also, the `json()` method accommodates almost twenty optional parameters, covering everything from error handling to leading-zero options to data type configurations. This chapter won’t cover all of the available parameters, but the links below to the official documentation outline them all.

Following is the code for both PySpark and Spark Scala:

```
… = spark.read.json("hdfs://user/your_user_name/data/books1.json")
```

See the official documentation links for PySpark (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.json) and Scala (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(paths:String*):org.apache.spark.sql.DataFrame) to view the complete list of optional parameters.

## Creating DataFrames from Avro Files

Avro file format is another open-source technology designed for the Hadoop world. But unlike Parquet and ORC, Avro is a row-based file format. The row-based architecture and binary data format of Avro make it ideal for write-heavy operations. If your use-case is primarily focused on writing data then Avro would be a good choice. File formats like Parquet and ORC are primarily designed for reading data.

Another major advantage of Avro is its schema, which Avro stores in JSON. And because Avro’s schema is JSON, schema fields can be added or subtracted over time. If we had an Avro file with three columns, “name”, “age”, and “state”, the corresponding JSON schema would look like the following example. The JSON clearly delineates the table name, all of the column names and their data types, plus other metadata about how the record was generated, as shown in the following code:

```json
{
  "type" : "record",
  "name" : "json example",
  "doc" : "Schema generated",
  "fields" : [ {
    "name" : "name",
    "type" : [ "null", "string" ],
    "default" : null,
    "columnName" : "name",
    "sqlType" : "12"
  }, {
    "name" : "age",
    "type" : [ "null", "long" ],
    "default" : null,
    "columnName" : "age",
    "sqlType" : "93"
  }, {
    "name" : "state",
    "type" : [ "null", "string" ],
    "default" : null,
    "columnName" : "state",
    "sqlType" : "2"
  } ],
  "tableName" : "employees"
}
```
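Since the schema is plain JSON, you can inspect it with ordinary tools. Here is a quick sketch using Python's standard `json` module to pull the field names and types out of a trimmed-down version of the schema above:

```python
import json

# A minimal Avro-style schema (trimmed from the example above).
schema_text = """
{
  "type": "record",
  "name": "json example",
  "fields": [
    {"name": "name",  "type": ["null", "string"]},
    {"name": "age",   "type": ["null", "long"]},
    {"name": "state", "type": ["null", "string"]}
  ]
}
"""

schema = json.loads(schema_text)

# Each field's union type is ["null", <actual type>]; grab the actual type.
fields = [(f["name"], f["type"][1]) for f in schema["fields"]]
print(fields)  # [('name', 'string'), ('age', 'long'), ('state', 'string')]
```

This readability is part of what makes evolving an Avro schema over time manageable: the schema itself is data you can parse, diff, and validate.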

In Spark, there is not an `avro()` method, so the only way in PySpark and Spark Scala to create a DataFrame from Avro files is to use the `load()` method. Following is the code for both PySpark and Spark Scala:

```
… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro")
```

In addition to HDFS paths that include single files and directories, Spark also supports wildcards and collections of files and directories. Let’s explore these options using Avro files as an example.

### Wildcards

Spark supports glob patterns (shell-style wildcards, not full regular expressions) in the HDFS path. The two most useful wildcards are the `*` character, which matches zero or more characters, and the `?` character, which matches exactly one character. In the preceding Avro example, let’s say the `data` directory held a mix of Avro files and other file formats, but we only wanted to create a DataFrame from the Avro files. The following example matches all files in the `data` directory that end in “.avro”:

```
… = spark.read.format("avro").load("hdfs://user/your_user_name/data/*.avro")
```
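These wildcards follow shell-glob semantics, which Python's standard `fnmatch` module mirrors, so you can check locally which names a pattern would match before pointing Spark at it (the file names below are made up):

```python
from fnmatch import fnmatch

# Hypothetical directory listing mixing Avro files with other formats.
files = ["userdata1.avro", "userdata2.avro", "userdata1.parquet", "notes.txt"]

# '*' matches zero or more characters: keep only the Avro files.
avro_files = [f for f in files if fnmatch(f, "*.avro")]
print(avro_files)  # ['userdata1.avro', 'userdata2.avro']

# '?' matches exactly one character.
print(fnmatch("userdata5.avro", "userdata?.avro"))   # True
print(fnmatch("userdata10.avro", "userdata?.avro"))  # False ('10' is two characters)
```

The same `*.avro` pattern in the Spark path above selects files the same way, just evaluated against the HDFS directory listing instead of a Python list.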

### Collections

Let’s say the data directory consisted of ten files from `userdata0.avro` through `userdata9.avro`. And for some reason we wanted to create a DataFrame from a certain subset of the files. We could provide a comma separated collection of the full HDFS paths to each file. For example:

```
… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro,hdfs://user/your_user_name/data/userdata5.avro,hdfs://user/your_user_name/data/userdata8.avro")
```

The result would be one DataFrame with data from the three Avro files. This collection of paths will also work on any collection of files and directories. This assumes that all the files have the same schema.

## Creating DataFrames from CSV Files

Comma Separated Value (CSV) is a row-based human readable text format with the data values separated by commas. It is one of the simplest file formats. Because it is plain text the file will take up more space than the other file formats used with Spark. But it is simple to read and write and most text editors can open CSV files.

Creating a DataFrame from CSV files requires several parameters. The most important include the file separator, defined as `sep`; the `inferSchema` parameter; and the `header` parameter, which specifies whether the CSV file(s) have a header row of column names. In Spark, there is a major difference between PySpark and Spark Scala when creating a DataFrame from CSV files.
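The same knobs exist outside Spark. Python's standard `csv` module makes the roles of the separator and the header row easy to see (the data here is made up, held in memory for the example):

```python
import csv
import io

# A tiny CSV "file" with a header row of column names.
data = "nickname,type,age\nannabelle,cat,15\nrufus,dog,10\n"

# DictReader treats the first row as the header (like header="true"),
# and delimiter plays the role of Spark's sep parameter.
reader = csv.DictReader(io.StringIO(data), delimiter=",")
rows = list(reader)

print(rows[0]["nickname"])  # annabelle

# Every value comes back as a string -- there is no schema inference here,
# which is exactly the work Spark's inferSchema option does for you.
print(rows[1]["age"])  # 10 (a string, not an int)
```

Keeping these three ideas straight, separator, header, and schema inference, makes the Spark options in the next exercise read naturally.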

## Exercise 11: Creating DataFrames from CSV Files

1. In PySpark, the three options consist of `spark.read.csv("…")`, `spark.read.format("csv").load("…")`, and `spark.read.load("…", format="csv")`. Use the following code to use the `spark.read.csv("…")` option:
```python
csv_pyspark_df1 = spark.read.csv("hdfs://user/your_user_name/data/csv/userdata1.csv"
    , sep=","
    , inferSchema="true")
```
1. Use the following code to use the `spark.read.format("csv").load("…")` option:
```python
csv_pyspark_df2 = spark.read.format("csv").load("hdfs://user/your_user_name/data/csv/userdata1.csv"
    , sep=","
    , inferSchema="true")
```
1. Use the following code to use the `spark.read.load("…", format="csv")` option:
```python
csv_pyspark_df3 = spark.read.load("hdfs://user/your_user_name/data/csv/userdata1.csv"
    , format="csv"
    , sep=","
    , inferSchema="true")
```
1. In Spark Scala, after calling the `read` method, use the `option()` method to add additional configurations. In Python we set parameters inside the `csv()` or `load()` methods. But in Scala the only parameter is the HDFS path. See the following code for `csv()` and `load()`:
```scala
val csv_scala_df1 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .csv("hdfs://user/your_user_name/data/csv/userdata1.csv")

val csv_scala_df2 = spark.read
  .format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .load("hdfs://user/your_user_name/data/csv/userdata1.csv")
```

CSV files are a very popular option for storing data, especially when data sizes are on the smaller side. This is because the data file is human readable and the files are easy to create. As a result you will create a lot of DataFrames from CSV files over your Spark career.

Like the `json()` method, the `csv()` method has many parameters. See the official documentation for a complete list.

# Spark Starter Guide 4.6: How to Aggregate Data

Previous post: Spark Starter Guide 4.5: How to Join DataFrames

## Introduction

Also known as grouping, aggregation is the method by which data is summarized by dividing it into common, meaningful groups. At the same time that information is grouped, you can also summarize data points from those groups through a series of SQL-like aggregate functions (i.e. functions that you can run during data aggregations).

Say you run an online e-commerce website that sells shoes. You have data logs that tell you what each customer purchased, and when.

At the end of each month, you might want to aggregate that data such that you can see how much revenue each model of shoe brought in that month. In SQL, that might look something like this:

```sql
select sum(sale_price) as revenue,
shoe_name
from sales
group by shoe_name
```

In this example, shoe_name is our grouping field, and the sum total of sales (for each shoe_name) is our aggregation metric: the results would contain one row per shoe model, with that model’s total revenue for the month.

Sum, of course, is just one example of an aggregate function. Others include min(), max(), count(), and stddev(), to name a few. All of these can be used to summarize identifiable groups within your data.
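The mechanics of that SQL are simple to sketch in plain Python: bucket the rows by the grouping field, then fold an aggregate function over each bucket (the sales figures and shoe names here are invented):

```python
from collections import defaultdict

# Invented sales log: (shoe_name, sale_price)
sales = [("runner", 60.0), ("loafer", 45.0), ("runner", 55.0), ("loafer", 50.0)]

# GROUP BY shoe_name ... SUM(sale_price)
revenue = defaultdict(float)
for shoe_name, sale_price in sales:
    revenue[shoe_name] += sale_price

print(dict(revenue))  # {'runner': 115.0, 'loafer': 95.0}
```

Swapping the `+=` fold for `min`, `max`, or a count is all it takes to express the other aggregate functions; Spark's groupBy does the same thing, distributed across the cluster.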

In the following exercises, we will learn to analyze data in groups by way of aggregation.

## Exercise Setup

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

```scala
import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
```

Continuing to build on our animal data set (from previous exercises in this guide), create a Sequence of Rows, each containing an animal name, type, age, and color, using the following code:

```scala
val my_previous_pets = Seq(
Row("fido", "dog", 4, "brown"),
Row("annabelle", "cat", 15, "white"),
Row("fred", "bear", 29, "brown"),
Row("gus", "parakeet", 2, "black"),
Row("daisy", "cat", 8, "black"),
Row("jerry", "cat", 1, "white"),
Row("fred", "parrot", 1, "brown"),
Row("gus", "fish", 1, "gold"),
Row("gus", "dog", 11, "black"),
Row("daisy", "iguana", 2, "green"),
Row("rufus", "dog", 10, "gold"))
```

Create a schema that mirrors the data you just created using the following code. The schema will be used by Spark to form a DataFrame.

```val schema = List(
StructField("nickname", StringType, nullable = true),
StructField("type", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("color", StringType, nullable = true)
)
```

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

```val petsRDD = spark.sparkContext.parallelize(my_previous_pets)
```

Create a DataFrame from the RDD and schema created using the following code:

```val petsDF = spark.createDataFrame(petsRDD, StructType(schema))
```

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

```petsDF.createOrReplaceTempView("pets")
```

You’ve now completed the initial setup for this exercise in Scala. Skip past the subsequent section in Python to continue.

Follow these steps to complete the exercise in PYTHON:

Import relevant Spark SQL libraries using the following code:

```from pyspark.sql import functions as F
```

Continuing to build on our animal data set, create a List of tuples, each containing an animal name, type, age, and color, using the following code:

```my_previous_pets = [("fido", "dog", 4, "brown"),
("annabelle", "cat", 15, "white"),
("fred", "bear", 29, "brown"),
("gus", "parakeet", 2, "black"),
("daisy", "cat", 8, "black"),
("jerry", "cat", 1, "white"),
("fred", "parrot", 1, "brown"),
("gus", "fish", 1, "gold"),
("gus", "dog", 11, "black"),
("daisy", "iguana", 2, "green"),
("rufus", "dog", 10, "gold")]
```

Use the parallelize() function of Spark to turn that List into an RDD as shown in the following code:

```petsRDD = sc.parallelize(my_previous_pets)
```

Create a DataFrame from the RDD and a provided schema using the following code:

```petsDF = spark.createDataFrame(petsRDD, ['nickname', 'type', 'age', 'color'])
```

Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:

```petsDF.createOrReplaceTempView('pets')
```

You’ve now completed the initial setup for this exercise in Python.

If you chose to do the setup for this exercise in Scala, you can now proceed from this point.

## Analysis through Aggregation

We now have an in-memory view (i.e. relational table representation) of our data. It only exists within the confines of our Spark application, but it enables us to run SQL queries against it as if it were a real table in a database. This will prove handy time after time in your work with Spark.

Now that we have a query-able view, let’s answer several different questions about the data by using Spark’s rich, comprehensive SQL functionality to query the temporary view we’ve created. Once a table is registered, you can query it as many times as you like, again, as if it were a real table.

As usual, we’ve provided solutions in both Scala and Python for your convenience. Where the solution is exactly the same, we have consolidated the code.

### What are the three most popular (i.e. recurring) names in the data?

To answer this question, we need to write a SQL query in Spark SQL that gives us the count of name occurrences in the table. You can also do this functionally (with methods on the DataFrame), but this example will showcase the pure SQL approach.

The following shows the code in SCALA and PYTHON:

```spark.sql("select nickname, count(*) as occurrences from pets group by nickname order by occurrences desc limit 3").show()
```

The following is the output of the preceding code:

```+--------+-----------+
|nickname|occurrences|
+--------+-----------+
|     gus|          3|
|    fred|          2|
|   daisy|          2|
+--------+-----------+
```

As can be seen, the three most popular names are gus, fred, and daisy.

### How old is the oldest cat in the data?

We can use the functional API of Spark SQL to find the maximum age of cats in the data. As demonstrated above, you can also use pure SQL to achieve this, but this example will focus on the purely functional approach.

Following are the steps for implementing this in SCALA:

1. Use the where() function of the DataFrame to filter the data to just cats.
2. Use the agg() function of the DataFrame to select the max age.
3. Use the show() function of the DataFrame to print the results to the console.

In code form:

```petsDF.where("type = 'cat'")
.agg(Map("age" -> "max"))
.show()
```

The following is the output of the preceding code:

```+--------+
|max(age)|
+--------+
|      15|
+--------+
```

The oldest cat in our data is 15!

Following are the steps for implementing this in PYTHON:

1. Use the where() function of the DataFrame to filter the data to just cats.
2. Use the agg() function of the DataFrame to select the max age.
3. Use the show() function of the DataFrame to print the results to the console.

In code form:

```petsDF.where("type = 'cat'")\
.agg({"age": "max"})\
.show()
```

The following is the output of the preceding code:

```+--------+
|max(age)|
+--------+
|      15|
+--------+
```

### What are the youngest and oldest cat ages?

We can use the functional API of Spark SQL to answer this question.

Following are the steps for implementing this in SCALA:

1. Use the where() function of the DataFrame to filter the data to just cats.
2. Group the data by type using groupBy().
3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
4. Finally, print the results to the console using the show() function of the DataFrame.
```petsDF.where("type = 'cat'")
.groupBy("type")
.agg(min("age").alias("min_age"), max("age").alias("max_age"))
.show()
```

The following is the output of the preceding code:

```+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+
```

As can be seen, the youngest cat’s age is 1 and the oldest cat’s age is 15.

Following are the steps for implementing this in PYTHON:

1. Use the where() function of the DataFrame to filter the data to just cats.
2. Group the data by type using groupBy().
3. Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
4. Finally, print the results to the console using the show() function of the DataFrame.
```petsDF.where(petsDF["type"] == "cat") \
.groupBy("type") \
.agg(F.min("age").alias("min_age"), F.max("age").alias("max_age")) \
.show()
```

The following is the output of the preceding code:

```+----+-------+-------+
|type|min_age|max_age|
+----+-------+-------+
| cat|      1|     15|
+----+-------+-------+
```

### What is the average dog age?

We can use the functional API of Spark SQL to find the average age of dogs.

Following are the steps for implementing this in SCALA:

1. Use the where() function of the DataFrame to filter the data to just dogs.
2. Group the data by type using groupBy().
3. Use the agg() function of the DataFrame to select the average age.
4. Finally, print the results to the console using the show() function of the DataFrame.
```petsDF.where("type = 'dog'")
.groupBy("type")
.agg("age" -> "avg")
.show()
```

The following is the output of the preceding code:

```+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+
```

As can be seen, the average dog age is approximately 8.33.

Following are the steps for implementing this in PYTHON:

1. Use the where() function of the DataFrame to filter the data to just dogs.
2. Group the data by type using groupBy().
3. Use the agg() function of the DataFrame to select the average age.
4. Finally, print the results to the console using the show() function of the DataFrame.
```petsDF.where("type = 'dog'")\
.groupBy("type")\
.agg(F.avg("age"))\
.show()
```

The following is the output of the preceding code:

```+----+-----------------+
|type|         avg(age)|
+----+-----------------+
| dog|8.333333333333334|
+----+-----------------+
```

As can be seen, the average age of dogs is approximately 8.33.

### How many pets of each color are there in the data?

We can use the functional API of Spark SQL to find how many pets of each color exist in the data.

Following are the steps for implementing this in SCALA and PYTHON:

1. Group the data by color using groupBy().
2. Use the count() function of the DataFrame to count the records in each group.
3. Finally, print the results to the console using the show() function of the DataFrame.
```petsDF.groupBy("color").count().show()
```

The following is the output of the preceding code:

```+-----+-----+
|color|count|
+-----+-----+
|green|    1|
|white|    2|
| gold|    2|
|black|    2|
|brown|    3|
+-----+-----+
```

As can be seen, the preceding output shows the number of pets of each color.

And that’s a rough introduction to aggregation in Spark / Spark SQL! There are a lot of powerful operations you can conduct in Spark, so keep exploring the APIs!

In the next section, we’ll cover Standardization – a practice that’s common in Data Engineering but not necessarily in Spark. See you then!

# Spark Starter Guide 4.5: How to Join DataFrames

Previous post: Spark Starter Guide 4.4: How to Filter Data

## Introduction

If you’ve spent any time writing SQL, or Structured Query Language, you might already be familiar with the concept of a JOIN. If you’re not, then the short explanation is that you can use it in SQL to combine two or more data tables together, leveraging a column of data that is shared or related between them.

Joining is handy in a number of ways, like supplementing your large dataset with additional information or performing lookups. There are many types of joins, like left, right, inner and full outer, and Spark has multiple implementations of each to make it convenient and fast for you as an engineer/analyst to leverage. It’s all possible using the join() method.

• Inner Join: Return records that have matching values in both tables that are being joined together. Rows that do not match on both sides are not kept.
• Left (Outer) Join: Return all records from the left table, and only the matched records from the right table. This is useful in situations where you need all the rows from the left table, joined with rows from the right table that match. It can be described as supplementing the Left table with Right table information.
• Right (Outer) Join: Return all records from the right table, and only the matched records from the left table. This is useful in situations where you need all the rows from the right table, joined with rows from the left table that match. It’s the reversed version of the Left (Outer) Join.
• Full (Outer) Join: Return all records from the left table and the right table, whether they have matching values or not. This is useful in situations where you need matching rows joined together, while also keeping the non-matching rows from both sides.
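To make these definitions concrete, here is a minimal pure-Python sketch of inner versus left join semantics, using made-up animal rows (not Spark code):

```python
# Hypothetical left and right "tables" sharing an animal-name key
left = [("dog", "pet"), ("cat", "pet"), ("axolotl", "exotic")]
right = [("dog", "kibble"), ("cat", "canned tuna")]

right_lookup = dict(right)

# Inner join: only rows whose key appears on both sides survive
inner = [(name, cat, right_lookup[name]) for name, cat in left if name in right_lookup]

# Left (outer) join: every left row survives; unmatched rows get None on the right
left_outer = [(name, cat, right_lookup.get(name)) for name, cat in left]

print(inner)
print(left_outer)
```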

In the following exercise, we will see how to join two DataFrames.

Follow these steps to complete the exercise in SCALA:

Import additional relevant Spark libraries using the following code:

```import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col
```

Create a Sequence of Rows where the content is a tuple containing an animal and its category using the following code:

```val categorized_animals = Seq(Row("dog", "pet"),
Row("cat", "pet"),
Row("bear", "wild"))
```

Create a schema that corresponds to the data using the following code:

```val schema_animals = List(
StructField("name", StringType, nullable = true),
StructField("category", StringType, nullable = true)
)
```

Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:

```val animalDataRDD = spark.sparkContext.parallelize(categorized_animals)
```

Create a DataFrame from the RDD and schema created using the following code:

```val animalData = spark.createDataFrame(animalDataRDD, StructType(schema_animals))
```

Create a Sequence of Rows where the content is a tuple containing an animal and its food using the following code:

```val animal_foods = Seq(Row("dog", "kibble"),
Row("cat", "canned tuna"),
Row("bear", "salmon"))
```

Again, we will create a schema that corresponds to the data from the preceding step using the following code:

```val schema_foods = List(
StructField("animal", StringType, nullable = true),
StructField("food", StringType, nullable = true)
)
```

Using the parallelize() function of Spark we will turn that Sequence into an RDD as shown in the following code:

```val animalFoodRDD = spark.sparkContext.parallelize(animal_foods)
```

We will then create a DataFrame from the RDD and schema created using the following code:

```val animalFoods = spark.createDataFrame(animalFoodRDD, StructType(schema_foods))
```

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

```val animals_enhanced = animalData.join(
animalFoods,
col("name") === col("animal"),
"left")

animals_enhanced.show()
```

The following is the output of the preceding code:

```+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+
```

From the preceding table, we can observe rows having common values in the name and animal columns; based on this match, each animal’s category and corresponding food are listed.

Follow these steps to complete the exercise in PYTHON:

Create a List of tuples containing an animal and its category using the following code:

```categorized_animals = [("dog", "pet"), ("cat", "pet"), ("bear", "wild")]
```

Create a List of tuples containing an animal and its food using the following code:

```animal_foods = [("dog", "kibble"), ("cat", "canned tuna"), ("bear", "salmon")]
```

Use the parallelize() function of Spark to turn those Lists into RDDs as shown in the following code:

```animalDataRDD = sc.parallelize(categorized_animals)
animalFoodRDD = sc.parallelize(animal_foods)
```

Create DataFrames from the RDDs using the following code:

```animalData = spark.createDataFrame(animalDataRDD, ['name', 'category'])
animalFoods = spark.createDataFrame(animalFoodRDD, ['animal', 'food'])
```

Join one DataFrame to the other on the value they have in common: the animal name. Print the results to the console using the following code:

```animals_enhanced = animalData.join(animalFoods, animalData.name == animalFoods.animal)
animals_enhanced.show()
```

The following is the output of the preceding code:

```+----+--------+------+-----------+
|name|category|animal|       food|
+----+--------+------+-----------+
| dog|     pet|   dog|     kibble|
|bear|    wild|  bear|     salmon|
| cat|     pet|   cat|canned tuna|
+----+--------+------+-----------+

```

This is just one way to join data in Spark. There is another way within the .join() method called the usingColumn approach.

## The usingColumn Join Method

If the exercise were a bit different—say, if the join key/column of the left and right data sets had the same column name—we could enact a join slightly differently, but attain the same results. This is called the usingColumn approach, and it’s handy in its straightforwardness.

The following shows the code in SCALA:

```leftData.join(rightData, usingColumn = "columnInCommon")
```

We can try the same thing in PYTHON using the following code:

```leftData.join(rightData, on="columnInCommon")
```

There is an extension of this approach that allows you to involve multiple columns in the join, and all you have to do is provide it a Scala Sequence or Python List of the appropriate columns you wish to join on. If you’re familiar with the JOIN USING clause in SQL, this is effectively that. Keep in mind that each value in the list must exist on both sides of the join for this approach to succeed.

Following shows the code in SCALA:

```leftData.join(rightData, usingColumns = Seq("firstColumnInCommon", "secondColumnInCommon"))
```

We can try the same thing in PYTHON using the following code:

```leftData.join(rightData, on=['firstColumnInCommon', 'secondColumnInCommon'])
```
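As a plain-Python analogy (not Spark code), joining on multiple columns amounts to matching on a tuple key built from the shared columns; the region/product rows below are made up for illustration:

```python
# Hypothetical rows keyed by two columns: (region, product)
left = [("us", "shoe", 10), ("eu", "shoe", 7)]
right = [("us", "shoe", 59.99)]

# Build a lookup keyed by the tuple of join columns
right_lookup = {(region, product): price for region, product, price in right}

# Only rows whose (region, product) pair exists on both sides match
joined = [(region, product, qty, right_lookup[(region, product)])
          for region, product, qty in left if (region, product) in right_lookup]
print(joined)
```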

These methods are by no means the only way to join data, but they are the most common and straightforward.

In the next section, we will learn how to aggregate data.

# Introduction

A schema is information about the data contained in a DataFrame. Specifically, the number of columns, column names, column data type, and whether the column can contain NULLs. Without a schema, a DataFrame would be an unstructured collection of values. The schema gives the DataFrame structure and meaning.

## Specifying the Schema of a DataFrame

In the previous section we introduced the createDataFrame() method. In PySpark, this method looks like:

``createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)``

After the required `data` parameter the first optional parameter is `schema`. The most useful options for the `schema` parameter include: `None` (or not included), a list of column names, or a `StructType`.

If `schema` is `None` or left out, then Spark will try to infer the column names and the column types from the `data`. If `schema` is a list of column names, then Spark will add the column names in the order specified and will try to infer the column types from the `data`. In both of these cases Spark uses the second optional parameter, `samplingRatio`, which is the fraction of rows to sample when inferring the schema from the `data`. If it is not included or is `None`, then only the top row is used to infer the schema.
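The effect of inferring types from only the top row can be sketched outside Spark; the rows and the simple int-to-float widening rule below are simplified assumptions for illustration:

```python
# Hypothetical rows: the first row's third column is an int,
# but a later row holds a float in the same position
rows = [(101, "Mac", 5), (102, "ThinkPad", 7.5)]

def infer_types(sample):
    """Infer one type per column from sampled rows, widening int to float."""
    types = [type(v) for v in sample[0]]
    for row in sample[1:]:
        for i, v in enumerate(row):
            if types[i] is int and type(v) is float:
                types[i] = float
    return types

top_row_only = infer_types(rows[:1])  # sees only the int in the first row
full_scan = infer_types(rows)         # widens the third column to float
```

Sampling more rows catches type mismatches that the top row alone would hide.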

To illustrate, say we had some data with a variable named `computer_sales` with columns “product_code”, “computer_name”, and “sales”. The following illustrates all the options the `createDataFrame()` method can handle in PySpark.

The following code is used when only the data parameter is provided or the schema is set to `None` or left blank:

```df1 = spark.createDataFrame(computer_sales)
df2 = spark.createDataFrame(computer_sales, None)
```

Both DataFrames are equivalent.

The following is used when the `data` parameter is specified along with a Python list of column names:

```df3 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"])
```

The following code is used when the `data` parameter is specified along with a Python list of column names and a `samplingRatio`; since `samplingRatio` is a fraction of rows to sample, Spark will infer the column types from roughly half of the rows:

```df4 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], 0.5)
```

The following is used to infer the schema from every row in the DataFrame; a `samplingRatio` of `1.0` tells Spark to sample the entire data set:

```df5 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], 1.0)
```

## Exercise 6: Creating a DataFrame in PySpark with only named columns

1. Create a nested list called home_computers as shown in the following code:
`home_computers = [["Honeywell", "Honeywell 316#Kitchen Computer", "DDP 16 Minicomputer", 1969], ["Apple Computer", "Apple II series", "6502", 1977], ["Bally Consumer Products", "Bally Astrocade", "Z80", 1977]]`
2. Create a DataFrame but this time the column names of the DataFrame are given explicitly as a list in the second parameter as shown in the following code:
`computers_df = spark.createDataFrame(home_computers, ["Manufacturer", "Model", "Processor", "Year"])`
Since the third parameter `samplingRatio` is not included, Spark uses the first row of data to infer the data types of the columns.
3. Show the contents of the DataFrame and display the schema with the following code:
`computers_df.show()`
`computers_df.printSchema()`

Running the preceding code displays the following:

```+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+

root
|-- Manufacturer: string (nullable = true)
|-- Model: string (nullable = true)
|-- Processor: string (nullable = true)
|-- Year: long (nullable = true)
```

Column names make DataFrames exceptionally useful. The PySpark API makes adding column names to a DataFrame very easy.

## Schemas, StructTypes, and StructFields

The most rigid and defined option for `schema` is the `StructType`. It is important to note that the schema of a DataFrame is a `StructType`. If a DataFrame is created without column names and Spark infers the data types based upon the data, a `StructType` is still created in the background by Spark.

A manually created PySpark DataFrame, like the following example, still has a `StructType` schema:

```computers_df = spark.createDataFrame(home_computers)
computers_df.show()
```
```+--------------------+--------------------+-------------------+----+
|                  _1|                  _2|                 _3|  _4|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+
```

The schema can be displayed in PySpark by calling the `schema` method on a DataFrame like: `computers_df.schema`

Running the preceding code displays the following:

```Out: StructType(List(StructField(_1,StringType,true),StructField(_2,StringType,true),StructField(_3,StringType,true),StructField(_4,LongType,true)))
```

To recap, the `schema` of a DataFrame is stored as a `StructType` object. The `StructType` object consists of a list of `StructFields`. The `StructFields` are the information about the columns of a DataFrame. Use the following code for Spark Scala:

```import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val schema = StructType(
List(
StructField("Manufacturer", StringType, true),
StructField("Model", StringType, true),
StructField("Processor", StringType, true),
StructField("Year", LongType, true)
)
)
```

Use the following code for PySpark:

```from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
StructField("Manufacturer", StringType(), True),
StructField("Model", StringType(), True),
StructField("Processor", StringType(), True),
StructField("Year", LongType(), True)
])
```

`StructFields` are objects that correspond to each column of the DataFrame and are constructed with the name, data type, and a boolean value of whether the column can contain NULLs. The second parameter of a `StructField` is the column’s data type: string, integer, decimal, datetime, and so on. To use data types in Spark the `types` module must be imported. Imports in Scala and Python bring in code that is not built into the main module. For example, DataFrames are part of the main code class, but ancillary things like data types and functions are not and must be imported to be used in your file. The following code is the Scala import for all of the data types:

``import org.apache.spark.sql.types._``

The following code is the Python import for all of the data types:

``from pyspark.sql.types import *``

Note:

`StructType` and `StructField` are actually Spark data types themselves. They are included in the preceding data imports that import all the members of the data types class. To import `StructType` and `StructField` individually use the following code for Scala Spark:
`import org.apache.spark.sql.types.{StructType, StructField}`

To import `StructType` and `StructField` individually use the following code for PySpark:
`from pyspark.sql.types import StructType, StructField`

## Exercise 7: Creating a DataFrame in PySpark with a Defined Schema

1. Import all the PySpark data types at once (including both `StructType` and `StructField`) and make a nested list of data with the following code:
```from pyspark.sql.types import *

customer_list = [[111, "Jim", 45.51], [112, "Fred", 87.3], [113, "Jennifer", 313.69], [114, "Lauren", 28.78]]
```
2. Construct the schema using the `StructType` and `StructField`. First make a `StructType` which holds a Python list as shown in the following code:
```customer_schema = StructType([
StructField("customer_id", LongType(), True),
StructField("first_name", StringType(), True),
StructField("avg_shopping_cart", DoubleType(), True)
])
```

Inside the list are the individual `StructField` which make up the columns of the DataFrame.

3. Make the DataFrame with the code below. In the `createDataFrame()` method the first parameter is the data and the second is the schema.
```customer_df = spark.createDataFrame(customer_list, customer_schema)
```
4. Display the contents and the schema of the DataFrame with the following code:
`customer_df.show()`
`customer_df.printSchema()`

Running the preceding code displays the following:

```+-----------+----------+-----------------+
|customer_id|first_name|avg_shopping_cart|
+-----------+----------+-----------------+
|        111|       Jim|            45.51|
|        112|      Fred|             87.3|
|        113|  Jennifer|           313.69|
|        114|    Lauren|            28.78|
+-----------+----------+-----------------+

root
|-- customer_id: long (nullable = true)
|-- first_name: string (nullable = true)
|-- avg_shopping_cart: double (nullable = true)
```

Spark schemas are the structure or the scaffolding of a DataFrame. Just like a building would collapse without structure, so too would a DataFrame. Without structure Spark wouldn’t be able to scale to trillions and trillions of rows.

Another way to create a DataFrame in Spark Scala is to use Rows wrapped in a Sequence. We have already introduced the Sequence object. The Row object is one row in a DataFrame with each value representing cells in different columns.

However, a Sequence of Rows requires several things in order to be created into a DataFrame. First a Sequence of Rows must be “parallelized” which means the data that is held in the driver is distributed to all the nodes of the clusters. This is done by calling the `parallelize()` method on the `SparkContext` (which is created from the `SparkSession`). An example would look like `spark.sparkContext.parallelize(data)`. The second thing required to make a DataFrame out of a Sequence of Rows is providing a `StructType` schema. Unfortunately, not passing a schema or using the `toDF()` method causes an error.

## Exercise 8: Creating a DataFrame in Spark Scala with a Defined Schema

1. Import the Scala data types and then make a Sequence of Rows called `grocery_items`. The `Seq()` wraps around each `Row()` object. Each comma-separated value in the `Row()` will be a column in the DataFrame:
```import org.apache.spark.sql.types._

val grocery_items = Seq(
Row(1, "stuffing", 4.67),
Row(2, "milk", 3.69),
Row(3, "rolls", 2.99),
Row(4, "potatoes", 5.15),
Row(5, "turkey", 23.99)
)
```
2. Construct the schema using the StructType and StructFields. First make a StructType which holds a Scala list. Inside the list are the individual StructFields which make up the columns of the DataFrame:
```val grocery_schema = StructType(
List(
StructField("id", IntegerType, true),
StructField("item", StringType, true),
StructField("price", DoubleType, true)
)
)
```
3. Make a DataFrame using the `createDataFrame()` method. But when using the Sequence of Rows we first have to parallelize the data. Inside `createDataFrame()` call the `SparkSession` by the “spark” variable, then call `sparkContext`, and then finally `parallelize` followed by the data variable `grocery_items`. The second parameter is the StructType `grocery_schema`:
```val grocery_df = spark.createDataFrame(spark.sparkContext.parallelize(grocery_items), grocery_schema)
```

Display the data and print the schema:

```grocery_df.show()
grocery_df.printSchema()
```
```+---+--------+-----+
| id|    item|price|
+---+--------+-----+
|  1|stuffing| 4.67|
|  2|    milk| 3.69|
|  3|   rolls| 2.99|
|  4|potatoes| 5.15|
|  5|  turkey|23.99|
+---+--------+-----+

root
|-- id: integer (nullable = true)
|-- item: string (nullable = true)
|-- price: double (nullable = true)

import org.apache.spark.sql.types._
grocery_items: Seq[org.apache.spark.sql.Row] = List([1,stuffing,4.67], [2,milk,3.69], [3,rolls,2.99], [4,potatoes,5.15], [5,turkey,23.99])
grocery_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))
grocery_df: org.apache.spark.sql.DataFrame = [id: int, item: string ... 1 more field]
```

## The `add()` Method

The `add()` method can be used in place of, or in combination with, `StructField` objects when building a `StructType`. The `add()` method takes the same parameters as the `StructField` object. The following schemas are all equivalent representations:

The PySpark code is as follows:

```from pyspark.sql.types import StructType, StructField, StringType, LongType

schema1 = StructType([
StructField("id_column", LongType(), True),
StructField("product_desc", StringType(), True)
])

schema2 = StructType().add("id_column", LongType(), True).add("product_desc", StringType(), True)

schema3 = StructType().add(StructField("id_column", LongType(), True)).add(StructField("product_desc", StringType(), True))
```

We can confirm that the two schemas are equivalent by comparing if the schema variables are equal to each other and printing the results in Python:

```print(schema1 == schema2)
print(schema1 == schema3)
```

Running the preceding code will display the following:

```True
True
```

In Spark Scala, the `add()` method is exactly the same, except that a blank Scala list must be placed inside the blank `StructType`. See variables `scala_schema2` and `scala_schema3` for the only difference between Spark Scala and PySpark:

```import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val scala_schema1 = StructType(List(
StructField("id_column", LongType, true),
StructField("product_desc", StringType, true)
))

val scala_schema2 = StructType(List()).add("id_column", LongType, true).add("product_desc", StringType, true)

val scala_schema3 = StructType(List()).add(StructField("id_column", LongType, true)).add(StructField("product_desc", StringType, true))
```

We can confirm that the two schemas are equivalent by comparing if the schema variables are equal to each other and printing the results in Scala:

```println(scala_schema1 == scala_schema2)
println(scala_schema1 == scala_schema3)
```

Running the preceding code will display the following:

```true
true
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
scala_schema1: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema2: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema3: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
```

The `add()` method can be used when adding a new column to an already existing DataFrame’s schema. In the following example, `sales_schema` is the schema of a DataFrame, and we want to add another column to the DataFrame. We can use the `add()` method to append a `StructField` to the original `StructType`, creating a brand new `StructType`.

## Exercise 9: Using the add() Method

1. Create a schema of a StructType named sales_schema that has two columns. The first column “user_id” is a long data type and cannot be nullable. The second column “product_item” is a string data type and can be nullable. Following is the code for PySpark:
```from pyspark.sql.types import StructType, StructField, StringType, LongType

sales_schema = StructType([
StructField("user_id", LongType(), False),
StructField("product_item", StringType(), True)
])
```

Following is the code for Scala Spark:

```import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val sales_schema = StructType(List(
StructField("user_id", LongType, false),
StructField("product_item", StringType, true)
))
```
2. Create a StructField called `sales_field` that has a column name of “total_sales” with a long data type that can be nullable. Following is the code for PySpark:
`sales_field = StructField("total_sales", LongType(), True)`
Following is the code for Scala Spark:
`val sales_field = StructField("total_sales", LongType, true)`
3. Use the `add()` method to add the `sales_field` StructField to the `sales_schema` StructType. Following is the code for PySpark:
`another_schema = sales_schema.add(sales_field)`
Following is the code for Scala Spark:
`val another_schema = sales_schema.add(sales_field)`
4. Print out the new schema variable to verify it looks correct. Following is the code for PySpark: `print(another_schema)`

Running the preceding code displays the following:

```StructType(List(StructField(user_id,LongType,false),StructField(product_item,StringType,true),StructField(total_sales,LongType,true)))
```

Following is the code for Scala Spark:

```println(another_schema)
```

Running the preceding code displays the following:
```StructType(StructField(user_id,LongType,false), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
sales_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,false), StructField(product_item,StringType,true))
sales_field: org.apache.spark.sql.types.StructField = StructField(total_sales,LongType,true)
another_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,false), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))
```

## Use the `add()` method when adding columns to a DataFrame

You can use the `schema` method on a DataFrame in conjunction with the `add()` method to add new fields to the schema of an already existing DataFrame. In Spark, a DataFrame’s schema is a `StructType`. In the preceding exercise we manually specified the schema as a `StructType`. Spark has a shortcut: the `schema` method, which can be called on an existing DataFrame to return its schema as a `StructType`. So in Spark Scala or PySpark you would call `some_df.schema` to output the `StructType` schema.

Here is an example in PySpark of the output of the `schema` method on a DataFrame returning a `StructType` schema:

```print(customer_df.schema)
```
```StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true)))
```

Here is an example in Spark Scala of the output of the `schema` method on a DataFrame returning a `StructType` schema:

```println(grocery_df.schema)
```
```StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))
```

So, with the `schema` method you don’t have to manually create the `StructType` to add a new column. Just call the `schema` method on the DataFrame and then use the `add()` method to add a column as a `StructField`. Following is the code for PySpark:

```final_schema = customer_df.schema.add(StructField("new_column", StringType(), True))

print(final_schema)
```

Running the preceding code displays the following:

```StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true),StructField(new_column,StringType,true)))
```

Following is the code for Scala Spark:

```val final_schema = grocery_df.schema.add(StructField("new_column", StringType, true))

println(final_schema)
```

Running the preceding code displays the following:

```StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))
final_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))
```

In the Modifying DataFrames section of this chapter we will see how to add new columns of data to a DataFrame.

## Return column names from a schema

Use the `fieldNames` method on a `StructType` to return a list or array of the column names. This is an easy way to return the column names of a DataFrame. The `fieldNames` method can be called on a `StructType` directly or after the `schema` method on a DataFrame. The only difference between Spark Scala and PySpark is that PySpark requires the trailing parentheses `()` while Spark Scala omits them.

In PySpark, call the `fieldNames()` method on a standalone schema and on a DataFrame’s schema to return the column names:

```print( customer_df.schema.fieldNames() )
print( customer_schema.fieldNames() )
```

The output is a list of the column names of the DataFrame:

```['customer_id', 'first_name', 'avg_shopping_cart']
['customer_id', 'first_name', 'avg_shopping_cart']
```

In Spark Scala, call the `fieldNames` method (without parentheses) on a standalone schema and on a DataFrame’s schema to return the column names. In Scala we cannot directly print the contents of an array like we can print a list in Python, so here we create variables and the Scala REPL outputs the contents of each array:

```val columns1 = grocery_df.schema.fieldNames
val columns2 = grocery_schema.fieldNames
```

The output is an array of the column names of the DataFrame:

```columns1: Array[String] = Array(id, item, price)
columns2: Array[String] = Array(id, item, price)
```

## Nested Schemas

So far, we have dealt with flat and orderly DataFrame schemas. But Spark supports nested columns, where a column can contain additional sets of data. Suppose we had a data set that looked like the following Python dictionary or JSON object:

```{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}
```

This data set would be the result of an imaginary sales table that was joined to an orders table. We will look at joining DataFrames together in Chapter 3, SQL with Spark.

It is difficult to see from the nested dictionary, but there are three columns: `id`, `name`, and `orders`. But `orders` is special, because it is a list of dictionaries, one per order. In Python we can directly use this data by wrapping it in brackets, as shown in the following code:

```nested_sales_data = [{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}]
```

If we used this list and made a DataFrame without specifying a schema, the output would not be very usable or readable. The following PySpark code uses the preceding nested JSON data to make a Spark DataFrame. The DataFrame and its schema are displayed to demonstrate what can happen when you make a DataFrame with nested data without a schema:

```ugly_df = spark.createDataFrame(nested_sales_data)
ugly_df.show(20, False)
ugly_df.printSchema()
```
```+---+---------+------------------------------------------------------------------------+
|id |name     |orders                                                                  |
+---+---------+------------------------------------------------------------------------+
|101|Jim      |[[id ->, userid ->, price -> 45.99], [id ->, userid ->, price -> 17.35]]|
|102|Christina|[[id ->, userid ->, price -> 245.86]]                                   |
|103|Steve    |[[id ->, userid ->, price -> 7.45], [id ->, userid ->, price -> 8.63]]  |
+---+---------+------------------------------------------------------------------------+

root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
|    |-- element: map (containsNull = true)
|    |    |-- key: string
|    |    |-- value: double (valueContainsNull = true)
```

The output is not readable or user friendly with the “`->`” characters, and Spark is trying to make a map of the data. Let’s add a schema to tell Spark exactly how we want to structure the DataFrame. The following PySpark code demonstrates the results of nested data when using a schema:

```from pyspark.sql.types import *

orders_schema = [
StructField("id", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("userid", IntegerType(), True)
]

sales_schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("orders", ArrayType(StructType(orders_schema)), True)
])
```

Here we nested the `orders_schema` inside the `sales_schema`. This shows how versatile schemas can be and how easy it is in Spark to construct complex schemas. Now let’s make a DataFrame that is readable and well structured, as shown in the following code:

```nested_df = spark.createDataFrame(nested_sales_data, sales_schema)

nested_df.show(20, False)
nested_df.printSchema()
```
```+---+---------+----------------------------------+
|id |name     |orders                            |
+---+---------+----------------------------------+
|101|Jim      |[[1, 45.99, 101], [2, 17.35, 101]]|
|102|Christina|[[3, 245.86, 102]]                |
|103|Steve    |[[4, 7.45, 103], [5, 8.63, 103]]  |
+---+---------+----------------------------------+

root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- orders: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- id: integer (nullable = true)
|    |    |-- price: double (nullable = true)
|    |    |-- userid: integer (nullable = true)
```

In the next section we will move on from manually created DataFrames to creating DataFrames from files stored in Hadoop.