In the last chapter, the reader was introduced to Spark DataFrames and shown Spark commands for creating, processing, and working with them. In this chapter, we will accelerate into more complex and challenging topics like data cleaning, statistics, and ML Pipelines. In Chapter 1, DataFrames with Spark, we covered the “what” of Spark DataFrames: the commands for doing different things. Here, we will introduce the “why” and the “how” of Spark DataFrames: asking “why are we doing this” and thinking longer term about how the steps we take at the beginning of a data problem affect us at the end.
This activity will combine the skills and techniques you learned so far in this chapter.
Imagine you are working as an intern at XYZ BigData Analytics Firm, where you are tasked with performing data analysis on a data set to solidify your Spark DataFrame skills.
In this activity you will use the data set “Kew Museum of Economic Botany: Object Dispersals to Schools, 1877-1982”, which “is a record of object dispersals from the Kew Museum of Economic Botany to schools between 1877 and 1982” and “is based on entries in the ‘Specimens Distributed’ books (or exit books) in the Museum archive.” The source of the data set can be found at https://figshare.com/articles/Schools_dispersals_data_with_BHL_links/9573956. This activity is good practice for a real-world scenario because it simulates being handed a random data set that you must process and analyze without any prior connection to the data.
Aggregation combines data into one or more groups and performs statistical operations such as average, maximum, and minimum on those groups. In Spark it is easy to group by a categorical column and perform the needed operation. DataFrames allow multiple groupings and multiple aggregations at once.
Since DataFrames are composed of named columns, Spark offers many options for performing operations on individual or multiple columns. This section will introduce converting columns to a different data type, adding calculated columns, renaming columns, and dropping columns from a DataFrame.
Now that we have created DataFrames manually and from files, it is time to actually do things with them. This section introduces the fundamental operations on DataFrames: displaying, selecting, filtering, and sorting. As discussed in Section 1.1, performing an operation on a DataFrame actually creates a new DataFrame behind the scenes in Spark, regardless of whether a new name is given to the DataFrame or the results are simply displayed. When working with DataFrames, the results can be displayed (as will be covered shortly), saved to a DataFrame with the same name as the original, or saved to a DataFrame with a different name.
Standardization is the practice of analyzing columns of data and identifying synonyms or like names for the same item. Similar to how a cat can also be identified as a kitty, kitty cat, kitten or feline, we might want to standardize all of those entries into simply “cat” so our data is less messy and more organized. This can make future processing of the data more streamlined and less complicated. It can also reduce skew, which we address in Addressing Data Cardinality and Skew.
We will learn how to standardize data in the following exercises.
NOTE
From this point onward, future code examples could utilize a line of code like this:
import spark.implicits._
This allows implicit conversions of Scala objects like RDDs into modern Spark abstractions such as Datasets, DataFrames, or Columns. It also enables many convenience functions from Spark SQL, like isin() (checking for values in a list) and desc() (sorting in descending order).
Two Types of Standardization
There are at least two basic ways to standardize something:
You can recognize when two things are the same but literally different (“puppy” and “dog”) and associate them without actually changing anything.
You can recognize when two things are the same and change them to be consistent (change instances of “puppy” to “dog”).
We’ll show ways to do both in Spark, in Scala and Python. Both involve some form of a synonym library.
Standardization through Suggestion
Exercise Setup
Follow these steps to complete the exercise in SCALA:
Import additional relevant Spark libraries using the following code:
Use the parallelize() function of Spark to turn that Sequence into an RDD as shown in the following code:
val petsRDD = spark.sparkContext.parallelize(my_previous_pets)
Create a DataFrame from the RDD and schema created using the following code:
val petsDF = spark.createDataFrame(petsRDD, StructType(schema))
Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of dog nouns. Print the results to the console as shown in the following code:
Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of cat nouns. Print the results to the console as shown in the following code:
As we can see, we were able to use custom standardization logic to decide when a cat is a cat, and a dog is a dog, even when their given type is not consistent. And, generally speaking, the isin() methodology of string comparisons in a list is relatively efficient at scale.
Follow these steps to complete the exercise in PYTHON:
Import additional relevant Spark libraries using the following code:
from pyspark.sql.functions import col
Create a List of Rows, each containing a name and type using the following code:
Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of dog nouns. Print the results to the console as shown in the following code:
Use the where() function of the DataFrame in combination with the isin() function (of the implicits library) to only keep rows where the name matches a provided list of cat nouns. Print the results to the console as shown in the following code:
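The exact snippets aren't reproduced above, so here is a minimal PySpark sketch of what those two steps might look like, assuming a petsDF DataFrame whose columns are named nickname and type (the column names and synonym lists are assumptions):
# keep only rows whose type matches a list of common dog nouns
dog_nouns = ["dog", "puppy", "puppy dog", "hound", "canine"]
petsDF.where(col("type").isin(dog_nouns)).show()

# keep only rows whose type matches a list of common cat nouns
cat_nouns = ["cat", "kitty", "kitten", "feline", "kitty cat"]
petsDF.where(col("type").isin(cat_nouns)).show()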
This example also demonstrates that you can pass a list to the isin() function, not just a comma-separated series of string values as demonstrated in the previous step.
The following is the output of the preceding code:
We can see that all cats could be identified in the data set, even though they weren’t all labeled as type ‘cat’.
Standardization through Modification
In the previous exercise, we quietly identified animals as a certain type if their given type was found in a list of common synonyms for the proper type. In this exercise, we will actually modify our data to standardize it, replacing each similar type value with its preferred, standard alternative.
NOTE
From this point onward, further Scala code examples could, where appropriate, utilize a case class. This is a Scala only abstraction that acts as a simple schema for structured, tabular-style data.
It’s an especially handy tool when paired with the Spark Dataset API, which can make powerful code even simpler to read and work with than DataFrames. It should be created outside of the main() function in Scala, or imported from its own class.
Otherwise, you will experience exceptions.
Example:
case class Person(name:String, age:Int)
Exercise Setup
Follow these steps to complete the exercise in SCALA:
Import additional relevant Spark libraries using the following code:
Then, create a case class called Pet, which contains two columns: nickname and petType using the following code:
case class Pet(nickname: String, petType: String)
Using the petsDF created in the previous exercise, use the map() function of the DataFrame to compare the petType to a list of common dog and cat synonyms, returning “dog” or “cat”, respectively, if there is a match. If there is no match, return the unmodified petType (thus assuming it cannot be standardized) as shown in the following code:
val standardized_pets = petsDF.map(pet => {
  // pull the raw values out of the Row by column name
  val nickname = pet.getString(pet.fieldIndex("nickname"))
  val petType = pet.getString(pet.fieldIndex("type"))

  // map common synonyms to the standard value; otherwise keep the original type
  val standardType =
    if (Seq("dog", "puppy", "puppy dog", "hound", "canine").contains(petType)) {
      "dog"
    }
    else if (Seq("cat", "kitty", "kitten", "feline", "kitty cat").contains(petType)) {
      "cat"
    }
    else {
      petType
    }

  Pet(nickname, standardType)
})
Print the results to the console using the following code:
standardized_pets.show()
The following is the output of the preceding code:
As we can observe, column 1 in the table displays the names of the pets, while column 2 displays the type of animal each one is, cat or dog, after passing through the standardization process.
Follow these steps to complete the exercise in PYTHON:
Create and utilize a standardize() function to compare the petType to a list of common dog and cat nouns, returning “dog” or “cat”, respectively, if there is a match:
def standardize(pet):
    name = pet[0]
    animal_type = pet[1]
    if animal_type in ["dog", "puppy", "puppy dog", "hound", "canine"]:
        return name, "dog"
    elif animal_type in ["cat", "kitty", "kitten", "feline", "kitty cat"]:
        return name, "cat"
    else:
        return pet
Then, apply the standardize() function to petsRDD (created in the previous exercise) using the map() function. Hint: You can also use a UDF on the DataFrame instead of this RDD map method, but we’ll cover that in a future exercise!
Print the results to the console using the following code:
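A minimal sketch of these last two steps, assuming the petsRDD of (name, type) records from the earlier setup:
# apply the standardization function to every record in the RDD
standardized_pets = petsRDD.map(standardize)

# collect() brings the small result set back to the driver so it can be printed
for pet in standardized_pets.collect():
    print(pet)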
We can see that annabelle, daisy, and julie are identified as cats, while roger, joe, and rosco are identified as dogs, even though their original types were not strictly labeled as such.
In this exercise, we learned how to use custom standardization logic in Spark to identify similar animals, even when their given types differed.
In the next section, we’ll cover Ordering & Sorting – useful for when you want to convey… well, order! See you then!
When working with Spark in a production setting, most likely the data will come from files stored in HDFS on a Hadoop cluster. Spark has built-in support for a lot of different file types. In this section we will cover five of the most popular data file types you will see when working with Spark in a production setting: Parquet, ORC, JSON, Avro, and CSV. Some of these file types were created recently for the Big Data world and others have been around for a while and have uses outside of Hadoop. They represent a good mix of file data types and the capabilities of Spark when dealing with files.
The remainder of chapter 1 uses an open source sample data set originally found at https://github.com/Teradata/kylo/tree/master/samples/sample-data. It contains folders with data files of each of the file types covered in this chapter. All of the folders were copied to HDFS with a path of /user/your_user_name/data. No data was changed from the original version.
Let’s see the following Mac example:
Clone the kylo GitHub open-source repository to a location on your local computer with the following code:
$ git clone https://github.com/Teradata/kylo.git
Upload the sample-data directory to your Hadoop edge node by using the scp command as shown in the following code:
Once on the edge node, use the Hadoop command -put to upload the directory of data from the edge node to a directory in HDFS as shown in the following code:
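The exact commands aren't shown here; a sketch of what they might look like follows, where the edge-node host name and directories are placeholders for your own environment:
$ scp -r kylo/samples/sample-data your_user_name@your-edge-node:/home/your_user_name/
$ hadoop fs -put /home/your_user_name/sample-data /user/your_user_name/data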
Parquet is an open-source data file format. Just as Comma Separated Values (CSV) is a file format, Parquet is a file format. However, Parquet was built from the ground up to be fast, efficient, and flexible. Parquet was originally designed for the Hadoop ecosystem, so it fits perfectly with Spark. Many traditional file formats like CSV are row-based, meaning all the rows must be read for each query or operation, which is costly and time consuming. Parquet instead uses a columnar format: it organizes data into columns rather than rows. Querying by column means reads and operations can skip entire columns that aren't needed, which makes queries very fast.
Parquet is also unique because the schema of the data is held within the files. This means Spark doesn't have to infer or guess the schema; it can create DataFrames with the correct column names and data types. Lastly, Parquet was built with advanced compression technology that physically compresses the data so it takes up less space. For example, one TB of CSV files might only be a couple hundred GB as Parquet files. This makes storage cheaper because there is less data to hold.
Creating DataFrames from Parquet files is straightforward, since Parquet files already contain the schema. There are two main ways of creating DataFrames from Parquet files, and the commands are the same in both PySpark and Spark Scala. Both options use the SparkSession variable, spark, as the starting point and then call the read method, which is used to read data from files and create DataFrames. The first option calls the parquet method, which is purpose-built to read Parquet files; its one parameter is the HDFS path to the Parquet file. The second option first calls the format method, which accepts a string naming the type of files to be read, in this case “parquet”, and then calls the load method with the HDFS path of the files to be loaded.
Note:
In this chapter many of the Python and Scala Spark commands are identical. Since Scala uses the val keyword when creating variables and Python simply needs the variable name, this section uses “... =” to indicate that a command can be written in either Python or Scala, to save space. The only difference is that Scala uses “val some_name =” while Python uses “some_name =”.
Exercise 10: Creating DataFrames from Parquet Files
To create a DataFrame using the parquet() method use the SparkSession variable and call the read method followed by the parquet() method. Then pass the full HDFS file path of the parquet file you would like to convert into a DataFrame with the following code in both PySpark and Spark Scala:
To create a DataFrame using the load() method, use the SparkSession variable and call the read method followed by the format() method that takes one parameter which is the file type of the file to be converted into a DataFrame. In this case use “parquet“. Then call the load() method with the full HDFS file path to the parquet file as shown in the following code:
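The snippets aren't reproduced above; using the “... =” convention from the preceding note, a sketch of both options might look like this, with the HDFS path standing in for wherever the sample Parquet files were uploaded:
... = spark.read.parquet("/user/your_user_name/data/parquet/userdata1.parquet")
... = spark.read.format("parquet").load("/user/your_user_name/data/parquet/userdata1.parquet")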
If we call .show(5) on any of the newly created DataFrames it will output the first five rows of the DataFrame. The data is made-up data that mimics users logging into a computer system. The output of the DataFrame would look like:
Optimized Row Columnar (ORC) is a file format designed with two main goals: increasing data processing speeds and reducing file sizes. ORC is another open-source project built for the Hadoop ecosystem, specifically Hive. Like Parquet, ORC is a columnar file format, but ORC first stores data in stripes, which are groups of row data. Within a stripe the data is stored in columns, and each stripe has a footer holding metadata about the columns in the stripe, including each column's data type, count, min, max, and sum. The ORC architecture is relatively complicated to wrap your mind around, and there is much more to it than this brief introduction, but the main point of ORC is that it is meant to be fast, and it is. ORC files also have great file compression, just like Parquet.
In PySpark and Spark Scala a DataFrame is created from ORC files by calling the orc() method or the .format("orc").load() methods.
Following are examples of creating DataFrames from ORC files using the orc() method and the load() method. The code is the same between PySpark and Spark Scala:
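A sketch of both calls, using the same convention (paths are placeholders):
... = spark.read.orc("/user/your_user_name/data/orc/userdata1.orc")
... = spark.read.format("orc").load("/user/your_user_name/data/orc/userdata1.orc")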
All of the previous examples have created DataFrames from a single file. But Spark can easily create a single DataFrame from multiple files. Instead of providing a single file in the path, all that is required is to end the path on a directory that only contains files of a single file format and schema. The following example is shown for ORC files but works for all the other file formats.
In the following example a single file is not specified, but rather a directory containing only files of the same format. Also notice the subtle difference between the two examples: in the first, the HDFS path ends in only the directory name; in the second, the HDFS path has a trailing slash. Both produce exactly the same DataFrame; it doesn't matter whether the trailing slash is present.
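A sketch of the two directory-based calls described above:
... = spark.read.orc("/user/your_user_name/data/orc")
... = spark.read.orc("/user/your_user_name/data/orc/")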
JSON, or JavaScript Object Notation, is a file format written in human-readable text in the form of key-value pairs. In JavaScript, an object looks like:
var myObj = {name: "Craig", age: 34, state: "Arkansas"};
Suppose we had a JSON file, some_file.json, with data equal to the preceding JavaScript object. The contents of that file would be {name: "Craig", age: 34, state: "Arkansas"}. The structure of JSON makes it very easy for humans to read, understand, and even write. It turns out this structure is also very easy for computers to read and parse, which makes JSON the go-to format for exchanging data between applications and servers.
The preceding JSON example is very simple, but JSON supports complex nested structures including objects, arrays, and many different data types. This capability makes it extremely flexible for holding any type of data, and because of JSON's flexible structure and ease of use for both humans and computers, JSON is very popular.
Spark has the ability to automatically infer the schema from JSON data, which makes it very easy to use JSON data in Spark. Also, the json() method accepts almost twenty optional parameters, covering everything from error handling to leading-zero options to data type settings. This chapter won't cover all of the available parameters, but the link below to the official documentation outlines them all.
Following is the code for both PySpark and Spark Scala:
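A sketch, with a placeholder path:
... = spark.read.json("/user/your_user_name/data/json/some_file.json")
... = spark.read.format("json").load("/user/your_user_name/data/json/some_file.json")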
Avro file format is another open-source technology designed for the Hadoop world. But unlike Parquet and ORC, Avro is a row-based file format. The row-based architecture and binary data format of Avro make it ideal for write-heavy operations. If your use-case is primarily focused on writing data then Avro would be a good choice. File formats like Parquet and ORC are primarily designed for reading data.
Another major advantage of Avro is its schema. In fact, Avro stores its schema in JSON, and because the schema is JSON, fields can be added or removed over time. If we had an Avro file with three columns, “name”, “age”, and “state”, the corresponding JSON schema would look like the following example. The JSON clearly delineates the record name, all of the column names and their data types, plus other metadata about how the record was generated, as shown in the following code:
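The original schema listing isn't reproduced here, but a minimal sketch of an Avro schema for those three columns might look like the following; the record name and namespace are placeholders, and a real generated schema would typically carry additional metadata:
{
  "type": "record",
  "namespace": "example",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "state", "type": "string"}
  ]
}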
In Spark, there is not an avro() method. So the only way in PySpark and Spark Scala to create a DataFrame from Avro files is to use the load() method. See the following example.
Following is the code for both PySpark and Spark Scala:
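A sketch of the load() call (the path is a placeholder; depending on your Spark version, the external spark-avro package may need to be on the classpath):
... = spark.read.format("avro").load("/user/your_user_name/data/avro/userdata1.avro")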
In addition to HDFS paths that include single files and directories, Spark also supports wildcards and collections of files & directories. Let’s explore these options using Avro files as an example.
Wildcards
Spark supports Regular Expressions (RegEx) in the HDFS path. The two most useful wildcards are the * character, which matches zero or more characters, and the ? character, which matches a single character. In the preceding Avro example, suppose the data directory held a mix of Avro files and other file formats, but we only wanted to create a DataFrame from the Avro files. The following example matches all files in the data directory that end in “.avro”:
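A sketch of that wildcard read, with a placeholder directory:
... = spark.read.format("avro").load("/user/your_user_name/data/avro/*.avro")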
Now let's say the data directory consisted of ten files, from userdata0.avro through userdata9.avro, and for some reason we wanted to create a DataFrame from only a certain subset of those files. We could provide a comma-separated collection of the full HDFS paths to each file. For example:
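A sketch in PySpark, where load() accepts a Python list of paths (in Scala the same paths would be passed to load() as comma-separated arguments); the file names are placeholders:
avro_paths = [
    "/user/your_user_name/data/avro/userdata1.avro",
    "/user/your_user_name/data/avro/userdata2.avro",
    "/user/your_user_name/data/avro/userdata3.avro"
]
userdata_df = spark.read.format("avro").load(avro_paths)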
The result would be one DataFrame with data from the three Avro files. This collection of paths will also work on any collection of files and directories. This assumes that all the files have the same schema.
Creating DataFrames from CSV Files
A Comma Separated Value (CSV) is a row-based human readable text format with the data values separated by commas. It is one of the simplest file formats. Because it is plain text the file will take up more space than the other file formats used with Spark. But it is simple to read and write and most text editors can open CSV files.
Creating a DataFrame from CSV files requires several parameters. The most important include the file separator, defined as sep; the inferSchema parameter; and the header parameter, which specifies whether the CSV file(s) have a header row of column names. In Spark, creating a DataFrame from CSV files differs noticeably between PySpark and Spark Scala.
Exercise 11: Creating DataFrames from CSV Files
In PySpark, the three options consist of spark.read.csv("…"), spark.read.format("csv").load("…"), and spark.read.load("…", format="csv"). Use the following code to use the spark.read.csv("…") option:
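A sketch of the three PySpark options, with a placeholder path and typical CSV settings:
csv_df = spark.read.csv("/user/your_user_name/data/csv/userdata1.csv", sep=",", inferSchema=True, header=True)
csv_df = spark.read.format("csv").load("/user/your_user_name/data/csv/userdata1.csv", sep=",", inferSchema=True, header=True)
csv_df = spark.read.load("/user/your_user_name/data/csv/userdata1.csv", format="csv", sep=",", inferSchema=True, header=True)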
In Spark Scala, after calling the read method, use the option() method to add additional configurations. In Python we set parameters inside the csv() or load() methods, but in Scala the only parameter passed to those methods is the HDFS path. See the following code for csv() and load():
CSV files are a very popular option for storing data, especially when data sizes are on the smaller side. This is because the data file is human readable and the files are easy to create. As a result you will create a lot of DataFrames from CSV files over your Spark career.
Like the json() method, the csv() method has many parameters. See the following official documentation links for a complete list.
Also known as grouping, aggregation is the method by which data is summarized by dividing it into common, meaningful groups. At the same time that information is grouped, you can also summarize data points from those groups through a series of SQL-like aggregate functions (i.e. functions that you can run during data aggregations).
Say you run an online e-commerce website that sells shoes. You have data logs that tell you what each customer purchased, and when.
customer | shoe_name   | sale_price | purchase_date
---------|-------------|------------|--------------
landon   | blue shoes  | 5.00       | 2020-10-01
james    | blue shoes  | 5.00       | 2020-10-04
zach     | white shoes | 6.00       | 2020-10-06

An example data set of sold shoes, called Sales.
At the end of each month, you might want to aggregate that data such that you can see how much revenue each model of shoe brought in that month. In SQL, that might look something like this:
select sum(sale_price) as revenue,
shoe_name
from sales
group by shoe_name
In this example, shoe_name is our grouping field, and the sum total of sales (for each shoe_name) is our aggregation metric. You would expect results to show something like:
item        | revenue
------------|--------
blue shoes  | 10.00
white shoes | 6.00
After all, we had two sales of blue shoes at $5/pair, and only one sale of white shoes at $6 a pair. If we had just done sum(sale_price) as revenue, but didn’t group by shoe_name, we’d simply get a total of $16.
Sum, of course, is just one example of an aggregate function. Others include min(), max(), count(), and stdev(), to name a few. All of these can be used to summarize identifiable groups within your data.
In the following exercises, we will learn to analyze data in groups by way of aggregation.
Exercise Setup
Follow these steps to complete the exercise in SCALA:
Import additional relevant Spark libraries using the following code:
Continuing to build on our animal data set (from previous exercises in this guide), create a Sequence of Rows, each containing an animal name, type, age, and color, using the following code:
Create a temporary table view of the data in Spark SQL called ‘pets’ using the following code:
petsDF.createOrReplaceTempView("pets")
You’ve now completed the initial setup for this exercise in Python.
If you chose to do the setup for this exercise in Scala, you can now proceed from this point.
Analysis through Aggregation
We now have an in-memory view (i.e. relational table representation) of our data. It only exists within the confines of our Spark application, but it enables us to run SQL queries against it as if it were a real table in a database. This will prove handy time after time in your work with Spark.
Now that we have a query-able view, let’s answer several different questions about the data by using Spark’s rich, comprehensive SQL functionality to query the temporary view we’ve created. Once a table is registered, you can query it as many times as you like, again, as if it were a real table.
As usual, we’ve provided solutions in both Scala and Python for your convenience. Where the solution is the exact same, we have coalesced the code.
What are the three most popular (i.e. recurring) names in the data?
To answer this question, we need to write a SQL query in Spark SQL that gives us the count of name occurrences in the table. You can also do this functionally (with methods on the DataFrame), but this example will showcase the pure SQL approach.
The following shows the code in SCALA and PYTHON:
spark.sql("select nickname, count(*) as occurrences from pets group by nickname order by occurrences desc limit 3").show()
The following is the output of the preceding code:
As can be seen, the three most popular names are gus, fred, and daisy.
How old is the oldest cat in the data?
We can use the functional API of Spark SQL to find the maximum age of cats in the data. As demonstrated above, you can also use pure SQL to achieve this – but this example will focus on the purely functional approach.
Follow these steps for SCALA:
Use the where() function of the DataFrame to filter the data to just cats.
Use the agg() function of the DataFrame to select the max age.
Use the show() function of the DataFrame to print the results to the console.
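Although the steps are described for Scala, the functional calls read nearly the same in both languages. A minimal PySpark sketch, assuming petsDF has type and age columns:
from pyspark.sql import functions as F

petsDF.where(petsDF["type"] == "cat").agg(F.max("age")).show()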
The following is the output of the preceding code:
+--------+
|max(age)|
+--------+
| 15|
+--------+
What are the youngest and oldest cat ages?
We can use the functional API of Spark SQL to answer this question.
Following are the steps for implementing this in SCALA:
Use the where() function of the DataFrame to filter the data to just cats.
Group the data by type using groupBy().
Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
Finally, print the results to the console using the show() function of the DataFrame.
As can be seen, the youngest cat's age is 1 and the oldest cat's age is 15.
Following are the steps for implementing this in PYTHON:
Use the where() function of the DataFrame to filter the data to just cats.
Group the data by type using groupBy().
Then, combine the agg() function of the DataFrame with the min() and max() functions to request two metrics: min and max age. Optional: rename the columns using alias().
Finally, print the results to the console using the show() function of the DataFrame.
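A minimal PySpark sketch of these steps, under the same column-name assumptions as before:
from pyspark.sql import functions as F

petsDF.where(petsDF["type"] == "cat") \
    .groupBy("type") \
    .agg(F.min("age").alias("min_age"), F.max("age").alias("max_age")) \
    .show()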
As can be seen, the output matches the Scala results: the youngest cat's age is 1 and the oldest cat's age is 15.
And that’s a rough introduction to aggregation in Spark / Spark SQL! There are a lot of powerful operations you can conduct in Spark, so keep exploring the APIs!
In the next section, we’ll cover Standardization – a practice that’s common in Data Engineering but not necessarily in Spark. See you then!
If you’ve spent any time writing SQL, or Structured Query Language, you might already be familiar with the concept of a JOIN. If you’re not, then the short explanation is that you can use it in SQL to combine two or more data tables together, leveraging a column of data that is shared or related between them.
Joining is handy in a number of ways, like supplementing your large dataset with additional information or performing lookups. There are many types of joins, like left, right, inner and full outer, and Spark has multiple implementations of each to make it convenient and fast for you as an engineer/analyst to leverage. It’s all possible using the join() method.
Inner Join: Return records that have matching values in both tables that are being joined together. Rows that do not match on both sides are not kept.
Left (Outer) Join: Return all records from the left table, and only the matched records from the right table. This is useful in situations where you need all the rows from the left table, joined with rows from the right table that match. It can be described as supplementing the Left table with Right table information.
Right (Outer) Join: Return all records from the right table, and only the matched records from the left table. This is useful in situations where you need all the rows from the right table, joined with rows from the left table that match. It’s the reversed version of the Left (Outer) Join.
Full (Outer) Join: Return all records from the left table and the right table, whether they have matching values or not. This is useful in situations where you need matching rows to be joined together while still keeping the rows from both sides that don't match.
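As a rough sketch of how these join types map onto the join() method in PySpark (the DataFrames and the join column name here are hypothetical):
# assume leftDF and rightDF each have a "name" column to join on
inner_df = leftDF.join(rightDF, leftDF["name"] == rightDF["name"], "inner")
left_df = leftDF.join(rightDF, leftDF["name"] == rightDF["name"], "left_outer")
right_df = leftDF.join(rightDF, leftDF["name"] == rightDF["name"], "right_outer")
full_df = leftDF.join(rightDF, leftDF["name"] == rightDF["name"], "full_outer")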
In the following exercise, we will see how to join two DataFrames.
Follow these steps to complete the exercise in SCALA:
Import additional relevant Spark libraries using the following code:
From the preceding table, we can observe rows having common values in the name and animal columns. Based on this, each animal's category and corresponding food are listed.
Follow these steps to complete the exercise in PYTHON:
Create a List of tuples containing an animal and its category using the following code:
This is just one way to join data in Spark. There is another way within the .join() method called the usingColumn approach.
The usingColumn Join Method
If the exercise were a bit different—say, if the join key/column of the left and right data sets had the same column name—we could enact a join slightly differently, but attain the same results. This is called the usingColumn approach, and it’s handy in its straightforwardness.
We can try the same thing in PYTHON using the following code:
leftData.join(rightData, on="columnInCommon")
There is an extension of this approach that allows you to involve multiple columns in the join, and all you have to do is provide it a Scala Sequence or Python List of the appropriate columns you wish to join on. If you’re familiar with the JOIN USING clause in SQL, this is effectively that. Keep in mind that each value in the list must exist on both sides of the join for this approach to succeed.
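For example, in Python, with hypothetical column names:
leftData.join(rightData, on=["firstColumnInCommon", "secondColumnInCommon"])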
A schema is information about the data contained in a DataFrame. Specifically, the number of columns, column names, column data type, and whether the column can contain NULLs. Without a schema, a DataFrame would be a group of disorganized things. The schema gives the DataFrame structure and meaning.
Specifying the Schema of a DataFrame
In the previous section we introduced the createDataFrame() method. In PySpark, this method looks like:
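The signature isn't reproduced above; depending on your Spark version it looks roughly like this:
spark.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)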
After the required data parameter the first optional parameter is schema. The most useful options for the schema parameter include: None (or not included), a list of column names, or a StructType.
If schema is None or left out, then Spark will try to infer the column names and the column types from the data. If schema is a list of column names, then Spark will add the column names in the order specified and will try to infer the column types from the data. In both of these cases Spark uses the number of rows specified in the second optional parameter, samplingRatio, to infer the schema from the data. If not included or given None, then only the top row is used to infer the schema.
To illustrate, say we had some data with a variable named computer_sales with columns “product_code”, “computer_name”, and “sales”. The following illustrates all the options the createDataFrame() method can handle in PySpark.
The following code is used when only the data parameter is provided or the schema is set to None or left blank:
The following code is used when the data parameter is specified along with a Python list of column names and the first two rows will be used to infer the schema:
The following is used to infer the schema from every row in the DataFrame. len() is a Python function that returns an integer of the number of values in a list. Since the number of values in the list computer_sales equals the number of rows in the DataFrame, the samplingRatio parameter will evaluate every row in the DataFrame to infer the schema:
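The three calls described above aren't reproduced individually; a sketch of what they might look like, assuming computer_sales is a Python list of [product_code, computer_name, sales] rows:
# 1) column names and types both inferred from the data
sales_df = spark.createDataFrame(computer_sales)

# 2) explicit column names, with a samplingRatio of 2 as described above
sales_df = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], 2)

# 3) samplingRatio equal to the length of the list, so every row is considered
sales_df = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], len(computer_sales))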
Exercise 6: Creating a DataFrame in PySpark with only named columns
Create a nested list called home_computers as shown in the following code:
home_computers = [["Honeywell", "Honeywell 316#Kitchen Computer", "DDP 16 Minicomputer", 1969], ["Apple Computer", "Apple II series", "6502", 1977], ["Bally Consumer Products", "Bally Astrocade", "Z80", 1977]]
Create a DataFrame, but this time the column names of the DataFrame are given explicitly as a list in the second parameter, as shown in the following code:
computers_df = spark.createDataFrame(home_computers, ["Manufacturer", "Model", "Processor", "Year"])
Since the third parameter samplingRatio is not included, Spark uses the first row of data to infer the data types of the columns.
Show the contents of the DataFrame and display the schema with the following code:
computers_df.show()
computers_df.printSchema()
Running the preceding code displays the following:
Column names make DataFrames exceptionally useful. The PySpark API makes adding column names to a DataFrame very easy.
Schemas, StructTypes, and StructFields
The most rigid and well-defined option for the schema is a StructType. It is important to note that the schema of a DataFrame is a StructType. Even if a DataFrame is created without column names and Spark infers the data types from the data, a StructType is still created in the background by Spark.
A manually created PySpark DataFrame, like the following example, still has a StructType schema:
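For instance, a quick sketch with hypothetical data:
no_schema_df = spark.createDataFrame([("desktop", 1299), ("laptop", 1999)])
print(no_schema_df.schema)
# prints a StructType containing StructFields for the inferred columns _1 and _2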
To recap, the schema of a DataFrame is stored as a StructType object. The StructType object consists of a list of StructFields, and the StructFields hold the information about the columns of a DataFrame. Use the following code for Spark Scala:
It is important to note that the schema of a DataFrame is a StructType
StructFields are objects that correspond to each column of the DataFrame and are constructed with the column name, the data type, and a boolean indicating whether the column can contain NULLs. The second parameter of a StructField is the column's data type: string, integer, decimal, datetime, and so on. To use data types in Spark, the types module must be imported. Imports in Scala and Python bring in code that is not built into the main module. For example, DataFrames are part of the main code class, but ancillary things like data types and functions are not and must be imported before they can be used in your file. The following code is the Scala import for all of the data types:
import org.apache.spark.sql.types._
The following code is the Python import for all of the data types:
from pyspark.sql.types import *
Note:
StructType and StructField are actually Spark data types themselves. They are included in the preceding imports, which bring in all the members of the types class. To import StructType and StructField individually, use the following code for Scala Spark:
import org.apache.spark.sql.types.{StructType, StructField}
To import StructType and StructField individually in PySpark, use the following code:
from pyspark.sql.types import StructType, StructField
Exercise 7: Creating a DataFrame in PySpark with a Defined Schema
Import all the PySpark data types at once (that include both StructType and StructField) and make a nested list of data with the following code:
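The exercise data isn't reproduced here; a hypothetical import and nested list might look like this:
from pyspark.sql.types import *

# hypothetical sample data for this exercise
sales_data = [[1001, "laptop", 1999.99], [1002, "monitor", 349.50], [1003, "keyboard", 89.99]]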
Spark schemas are the structure or the scaffolding of a DataFrame. Just like a building would collapse without structure, so too would a DataFrame. Without structure Spark wouldn’t be able to scale to trillions and trillions of rows.
Spark schemas are the structure or the scaffolding of a DataFrame
Another way to create a DataFrame in Spark Scala is to use Rows wrapped in a Sequence. We have already introduced the Sequence object. The Row object is one row in a DataFrame, with each value representing a cell in a different column.
However, a Sequence of Rows requires a couple of things before it can be turned into a DataFrame. First, the Sequence of Rows must be “parallelized”, which means the data held on the driver is distributed to all the nodes of the cluster. This is done by calling the parallelize() method on the SparkContext (which is obtained from the SparkSession); an example would look like spark.sparkContext.parallelize(data). Second, a StructType schema must be provided: unfortunately, not passing a schema, or trying to use the toDF() method instead, causes an error.
Exercise 8: Creating a DataFrame in Spark Scala with a Defined Schema
Import the Scala data types and then make a Sequence of Rows called grocery_items. The Seq() wraps around each Row() object. Each comma separated value in the Row() will be a column in the DataFrame:
Construct the schema using StructType and StructFields. First make a StructType, which holds a Scala List. Inside the List are the individual StructFields which make up the columns of the DataFrame:
Make a DataFrame using the createDataFrame() method. When using a Sequence of Rows we first have to parallelize the data. Inside createDataFrame(), reference the SparkSession via the spark variable, then call sparkContext, and finally parallelize followed by the data variable grocery_items. The second parameter is the StructType grocery_schema:
val grocery_df = spark.createDataFrame(spark.sparkContext.parallelize(grocery_items), grocery_schema)
The add() method can be used interchangeably with, and in addition to, StructField objects on a StructType. The add() method takes the same parameters as the StructField object. The following schemas are all equivalent representations:
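A sketch of what such equivalent schemas might look like in PySpark, assuming the wildcard types import from earlier and example column names:
schema1 = StructType([
    StructField("user_id", LongType(), True),
    StructField("product_item", StringType(), True)
])

schema2 = StructType().add(StructField("user_id", LongType(), True)) \
                      .add(StructField("product_item", StringType(), True))

schema3 = StructType().add("user_id", LongType(), True) \
                      .add("product_item", StringType(), True)

print(schema1 == schema2)
print(schema2 == schema3)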
Running the preceding code will display the following:
True
True
In Spark Scala, the add() method is exactly the same, except that an empty Scala List must be placed inside the empty StructType. See variables schema2 and schema3 for the only difference between Spark Scala and PySpark:
The add() method can also be used when adding a new column to an already existing DataFrame. In the following example, sales_schema is the schema of a DataFrame and we want to add another column to it. We can use the add() method to append a StructField to the original StructType, creating a brand new StructType.
Exercise 9: Using the add() Method
Create a schema of a StructType named sales_schema that has two columns. The first column, “user_id”, is a long data type and cannot be nullable. The second column, “product_item”, is a string data type and can be nullable. Following is the code for PySpark:
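The schema itself isn't reproduced above; a PySpark sketch of it might look like:
sales_schema = StructType([
    StructField("user_id", LongType(), False),
    StructField("product_item", StringType(), True)
])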
Create a StructField called sales_field that has a column name of “total_sales” with a long data type that can be nullable. Following is the code for PySpark:
sales_field = StructField("total_sales", LongType(), True)
Following is the code for Scala Spark:
val sales_field = StructField("total_sales", LongType, true)
Use the add() method to add the sales_field StructField to the sales_schema StructType. Following is the code for PySpark:
another_schema = sales_schema.add(sales_field)
Following is the code for Scala Spark:
val another_schema = sales_schema.add(sales_field)
Print out the new schema variable to verify it looks correct:
print(another_schema)
Use the add() method when adding columns to a DataFrame
You can use the schema method on a DataFrame in conjunction with the add() method to add new fields to the schema of an already existing DataFrame. In Spark, a DataFrame's schema is a StructType. In the preceding exercise we manually specified the schema as a StructType, but Spark has a shortcut: the schema method. It can be called on an existing DataFrame to return its schema as a StructType. So in Spark Scala or PySpark you would call some_df.schema to output the StructType schema.
Here is an example in PySpark of the output of the schema method on a DataFrame returning a StructType schema:
So, with the schema method you don't have to manually create the StructType to add a new column. Just call the schema method on the DataFrame and then use the add() method to add a column as a StructField. Following is the code for PySpark:
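A sketch of that, where some_df stands in for any existing DataFrame:
new_schema = some_df.schema.add(StructField("total_sales", LongType(), True))
print(new_schema)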
In the Modifying DataFrames section of this chapter we will see how to add new columns of data to a DataFrame.
Return column names from a schema
Use the fieldNames method on a StructType to return a list or array of the column names. This is an easy way to get the column names of a DataFrame. The fieldNames method can be called on a StructType directly or after the schema method on a DataFrame. The only difference between Spark Scala and PySpark is that PySpark requires trailing parentheses () while Spark Scala omits them.
In PySpark call the fieldNames() method on a schema and on the DataFrame to return the column names of the schema:
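For example, assuming a grocery_df DataFrame and its grocery_schema like the ones in Exercise 8, re-created in PySpark:
print(grocery_schema.fieldNames())
print(grocery_df.schema.fieldNames())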
In Spark Scala, call the fieldNames method on a schema and on the DataFrame's schema to return the column names. In Scala we cannot directly print the contents of an array the way we can print a list in Python, so here we create a variable and let Scala output the contents of the array:
val columns1 = grocery_df.schema.fieldNames
val columns2 = grocery_schema.fieldNames
The output is an array of the column names of the DataFrame:
So far we have dealt with flat and orderly DataFrame schemas, but Spark also supports nested columns, where a column can contain additional sets of data. Suppose we had a data set that looked like the following Python dictionary or JSON object:
This data set would be the result of some imaginary sales table joined to an orders table. We will look at joining DataFrames together in Chapter 3, SQL with Spark.
It is difficult to see from the nested dictionary but there are three columns: id, name, and orders. But orders is special, because it is a list of lists. In Python we can directly use this data by wrapping it in brackets as mentioned earlier.
If we used this list to make a DataFrame without specifying a schema, the output would not be very usable or readable. The following PySpark code uses the preceding nested JSON data to make a Spark DataFrame; the DataFrame and its schema are displayed to demonstrate what can happen when you make a DataFrame with nested data without a schema:
The output is not readable or user friendly with the “->” characters and Spark is trying to make a map of the data. Let’s add a schema to tell Spark exactly how we want to structure the DataFrame. The following PySpark code demonstrates the results of nested data when using a schema:
Here we referenced the order_schema inside the sales_schema. This shows how versatile schemas can be and how easy it is in Spark to construct complex schemas. Now let's make a DataFrame that is readable and well structured, as shown in the following code: