Spark Starter Guide 1.4: Modifying Spark DataFrames

Introduction

Now that we have created DataFrames manually and from files, it is time to actually do things with them. This section introduces the fundamental operations on DataFrames: displaying, selecting, filtering, and sorting. As discussed in Section 1.1, performing an operation on a DataFrame actually creates a new DataFrame behind the scenes in Spark, regardless of whether a new name is given to the DataFrame or the results are simply displayed. When working with DataFrames, the results can be displayed (as will be covered shortly), saved to a DataFrame with the same name as the original, or saved to a DataFrame with a different name.

Spark Starter Guide 1.3: Creating DataFrames from Files

Introduction

When working with Spark in a production setting, the data will most likely come from files stored in HDFS on a Hadoop cluster. Spark has built-in support for many different file types. In this section we will cover five of the most popular data file types you will see when working with Spark in production: Parquet, ORC, JSON, Avro, and CSV. Some of these file types were created recently for the Big Data world, while others have been around for a while and have uses outside of Hadoop. Together they represent a good mix of file types and showcase the capabilities of Spark when dealing with files.

The remainder of chapter 1 uses an open-source sample data set originally found at https://github.com/Teradata/kylo/tree/master/samples/sample-data. It contains folders with data files for each of the file types covered in this chapter. All of the folders were copied to HDFS under the path /user/your_user_name/data. No data was changed from the original version.

The following example shows the process on a Mac:

Clone the kylo GitHub open-source repository to a location on your local computer with the following code:

$ git clone https://github.com/Teradata/kylo.git

Upload the sample-data directory to your Hadoop edge node by using the scp command as shown in the following code:

$ scp -rp /Users/your_directory/…/kylo/samples/sample-data user_name@cluster_ip_address:/home/your_user_name

Once on the edge node, use the hadoop fs -put command to upload the directory of data from the edge node to a directory in HDFS, as shown in the following code:

$ hadoop fs -put /home/your_user_name/sample-data /user/your_user_name/data

Creating DataFrames from Parquet Files

Parquet is an open-source data file format. Just as Comma Separated Value (CSV) is a file format, so is Parquet; however, Parquet was built from the ground up to be fast, efficient, and flexible. Parquet was originally designed for the Hadoop ecosystem, so it fits perfectly with Spark. Many traditional file formats like CSV are row-based, meaning all of the rows must be read for each query or operation, which is costly and time consuming. Parquet instead uses a columnar format, meaning it organizes data into columns rather than rows. Querying by column means reads and operations can skip over entire columns that aren't needed, which makes queries very fast.

Parquet is also unique because the schema of the data is held within the files. This means Spark doesn't have to infer or guess the schema; it can create DataFrames with the correct column names and data types. Lastly, Parquet was built with advanced compression technology that physically compresses the data so it takes up less space. For example, one TB of CSV files might shrink to only a couple hundred GBs as Parquet files. This makes storage cheaper because there is less data to hold.

Creating DataFrames from Parquet files is straightforward, since Parquet files already contain the schema. There are two main ways of creating DataFrames from Parquet files, and the commands are the same in both PySpark and Spark Scala. Both options use the SparkSession variable, spark, as the starting point and then call the read method, which is used to read data from files and create DataFrames. The first option calls the parquet method, which is purpose-built for reading Parquet files; its one parameter is the HDFS path to the Parquet file. The second option first calls the format method, which accepts a string parameter naming the type of files to be read, in this case "parquet", and then calls the load method with the HDFS path to the files to be loaded.

Note:

In this chapter many of the Python and Scala Spark commands are identical. Since Scala uses the val keyword when creating variables and Python simply needs the variable name, this section uses "… =" to designate that a command can be written in either Python or Scala, to save space. The only difference is that Scala uses "val some_name =" while Python uses "some_name =".

Exercise 10: Creating DataFrames from Parquet Files

  1. To create a DataFrame using the parquet() method use the SparkSession variable and call the read method followed by the parquet() method. Then pass the full HDFS file path of the parquet file you would like to convert into a DataFrame with the following code in both PySpark and Spark Scala:
… = spark.read.parquet("hdfs://user/your_user_name/data/userdata1.parquet")
  2. To create a DataFrame using the load() method, use the SparkSession variable and call the read method followed by the format() method, which takes one parameter: the file type of the file to be converted into a DataFrame, in this case "parquet". Then call the load() method with the full HDFS file path to the Parquet file, as shown in the following code:
… = spark.read.format("parquet").load("hdfs://user/your_user_name/data/userdata1.parquet")
  3. If we call .show(5) on either of the newly created DataFrames, it will output the first five rows of the DataFrame. The data is made-up data that mimics users logging into a computer system. The output of the DataFrame would look like:
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|     country|birthdate|   salary|               title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|   Indonesia| 3/8/1971| 49756.53|    Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |      Canada|1/16/1968|150280.17|       Accountant IV|        |
|2016-02-03 01:09:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|      Russia| 2/1/1960|144972.51| Structural Engineer|        |
|2016-02-03 00:36:21|  4|    Denise|    Riley|    driley3@gmpg.org|Female| 140.35.109.83|3576031598965625|       China| 4/8/1997| 90263.05|Senior Cost Accou...|        |
|2016-02-03 05:05:31|  5|    Carlos|    Burns|cburns4@miitbeian...|      |169.113.235.40|5602256255204850|South Africa|         |     null|                    |        |
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
only showing top 5 rows
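
Because Parquet files carry their own schema, you can confirm that the column names and data types came straight from the file rather than from schema inference by printing the schema. The following is a minimal PySpark sketch, assuming the DataFrame created above was saved to a variable named parquet_df:

parquet_df = spark.read.parquet("hdfs://user/your_user_name/data/userdata1.parquet")
# printSchema() lists the column names and data types stored inside the Parquet file itself
parquet_df.printSchema()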

Creating DataFrames from ORC Files

Optimized Row Columnar (ORC) is a file format that was designed with two main goals: increasing data processing speeds and reducing file sizes. ORC is another open-source project built for the Hadoop ecosystem, specifically Hive. Like Parquet, ORC is a columnar file format, but ORC first stores data in stripes, which are groups of row data. Within a stripe the data is stored in columns, and each stripe has a footer that holds metadata about the columns in the stripe, including each column's data type, count, min, max, and sum. The ORC architecture is fairly involved and there is much more to it than this brief introduction, but the main point of ORC is that it is meant to be fast, and it is. ORC files also have great file compression, just like Parquet.

Source: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-FileStructure

Note:

ORC files often don't have file extensions; the sample ORC files used here do not.

In PySpark and Spark Scala a DataFrame is created from ORC files by calling the orc() method or the .format("orc").load() methods.

Following are examples of creating DataFrames from ORC files using the orc() method and the load() method. The code is the same between PySpark and Spark Scala:

… = spark.read.orc("hdfs://user/your_user_name/data/orc/userdata1_orc")
 
… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/userdata1_orc")

All of the previous examples have created DataFrames from a single file. But Spark can easily create a single DataFrame from multiple files. Instead of providing a single file in the path, all that is required is to end the path on a directory that contains only files of a single file format and schema. The following example is shown for ORC files but works for all the other file formats.

In the following example a single file is not specified, but rather a directory containing only files of the same format. Also, notice the subtle difference between the two examples: in the first, the HDFS path ends in only the directory name, while in the second the path has a trailing slash. Both produce exactly the same DataFrame; it doesn't matter whether the trailing slash is present.

… = spark.read.orc("hdfs://user/your_user_name/data/orc")
 
… = spark.read.format("orc").load("hdfs://user/your_user_name/data/orc/")

Creating DataFrames from JSON

JSON, or JavaScript Object Notation, is a file format written in human-readable text in the form of key-value pairs. In JavaScript, an object looks like:

var myObj = {name: "Craig", age: 34, state: "Arkansas"};

Suppose we had a JSON file, some_file.json, that held data equivalent to the preceding JavaScript object. The contents of that file would be {"name": "Craig", "age": 34, "state": "Arkansas"}. The structure of JSON makes it very easy for humans to read, understand, and even write. It turns out this structure is also very easy for computers to read and parse, which makes JSON the go-to format for exchanging data between applications and servers.

The preceding JSON example is very simple, but JSON supports complex nested structures including objects, arrays, and many different data types. This capability makes it extremely flexible at holding any type of data. Because of JSON's flexible structure and its ease of use for both humans and computers, JSON is very popular.

Spark has the ability to automatically infer the schema from JSON data, which makes it very easy to use JSON data in Spark. The json() method also accommodates almost twenty optional parameters, covering everything from error handling to leading-zero options to data type settings. This chapter won't cover all of the available parameters, but the official documentation links below outline all of them.

Following is the code for both PySpark and Spark Scala:

… = spark.read.json("hdfs://user/your_user_name/data/books1.json")
 
… = spark.read.format("json").load("hdfs://user/your_user_name/data/books1.json")

See the official documentation links for PySpark (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.json) and Scala (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(paths:String*):org.apache.spark.sql.DataFrame) to view the complete list of optional parameters.
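
As a brief illustration of those optional parameters, the following PySpark sketch passes two of the documented options, multiLine and mode, when reading the same file (the values shown are examples only, not requirements of the sample data):

json_df = spark.read.json("hdfs://user/your_user_name/data/books1.json"
  , multiLine=True      # allow a single JSON record to span multiple lines
  , mode="PERMISSIVE")  # the default mode: tolerate malformed records instead of failing the read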

Creating DataFrames from Avro Files

The Avro file format is another open-source technology designed for the Hadoop world. But unlike Parquet and ORC, Avro is a row-based file format. The row-based architecture and binary data format of Avro make it ideal for write-heavy operations; if your use case is primarily focused on writing data, then Avro would be a good choice. File formats like Parquet and ORC are primarily designed for reading data.

Another major advantage of Avro is its schema. In fact, Avro stores its schema in JSON, and because the schema is JSON, fields can be added or removed over time. If we had an Avro file with three columns, "name", "age", and "state", the corresponding JSON schema would look like the following example. The JSON clearly delineates the table name, all of the column names and their data types, plus other metadata about how the record was generated, as shown in the following code:

    {
      "type" : "record",
      "name" : "json example",
      "doc" : "Schema generated",
      "fields" : [ {
        "name" : "name",
        "type" : [ "null", "string" ],
        "default" : null,
        "columnName" : "name",
        "sqlType" : "12"
      }, {
        "name" : "age",
        "type" : [ "null", "long" ],
        "default" : null,
        "columnName" : "age",
        "sqlType" : "93"
      }, {
        "name" : "state",
        "type" : [ "null", "string" ],
        "default" : null,
        "columnName" : "state",
        "sqlType" : "2"
      } ],
      "tableName" : "employees"
    }

In Spark there is no avro() method, so the only way in PySpark and Spark Scala to create a DataFrame from Avro files is to use the load() method. See the following example.

Following is the code for both PySpark and Spark Scala:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro")

In addition to HDFS paths that point to single files and directories, Spark also supports wildcards and collections of files and directories. Let's explore these options using Avro files as an example.

Wildcards

Spark supports glob-style wildcards in the HDFS path. The two most useful wildcards are the * character, which matches zero or more characters, and the ? character, which matches a single character. Continuing the preceding Avro example, let's say the data directory contained a mix of Avro files and other file formats, but we only wanted to create a DataFrame from the Avro files. The following example matches all files in the data directory that end in ".avro":

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/*.avro")
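
The ? wildcard works similarly but matches exactly one character. For example, the following sketch would match userdata1.avro through userdata9.avro, but not a file named userdata10.avro:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata?.avro")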

Collections

Let's say the data directory contained ten files, userdata0.avro through userdata9.avro, and for some reason we wanted to create a DataFrame from only a certain subset of the files. We could provide a comma-separated collection of the full HDFS paths to each file. For example:

… = spark.read.format("avro").load("hdfs://user/your_user_name/data/userdata1.avro,hdfs://user/your_user_name/data/userdata5.avro,hdfs://user/your_user_name/data/userdata8.avro")

The result would be one DataFrame with data from the three Avro files. This collection of paths will also work for any mix of files and directories, as long as all of the files have the same schema.
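
In PySpark, the load() method will also accept a Python list of paths, which avoids building one long comma-separated string. A minimal sketch of the same three-file example:

paths = ["hdfs://user/your_user_name/data/userdata1.avro",
         "hdfs://user/your_user_name/data/userdata5.avro",
         "hdfs://user/your_user_name/data/userdata8.avro"]
# one DataFrame built from all three Avro files
avro_df = spark.read.format("avro").load(paths)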

Creating DataFrames from CSV Files

Comma Separated Value (CSV) is a row-based, human-readable text format with the data values separated by commas. It is one of the simplest file formats. Because it is plain text, a CSV file will take up more space than the other file formats used with Spark, but it is simple to read and write, and most text editors can open CSV files.

Creating a DataFrame from CSV files typically requires several parameters. The most important include the file separator, defined as sep; the inferSchema parameter; and the header parameter, which specifies whether the CSV file(s) have a header row of column names. There is also a notable difference between PySpark and Spark Scala when creating a DataFrame from CSV files.

Exercise 11: Creating DataFrames from CSV Files

  1. In PySpark, the three options consist of spark.read.csv("…"), spark.read.format("csv").load("…"), and spark.read.load("…", format="csv"). Use the following code for the spark.read.csv("…") option:
csv_pyspark_df1 = spark.read.csv("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  2. Use the following code for the spark.read.format("csv").load("…") option:
csv_pyspark_df2 = spark.read.format("csv").load("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  3. Use the following code for the spark.read.load("…", format="csv") option:
csv_pyspark_df3 = spark.read.load("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , format="csv"
  , sep=","
  , inferSchema="true"
  , header="true")
  4. In Spark Scala, after calling the read method, use the option() method to add additional configurations. In Python we set parameters inside the csv() or load() methods, but in Scala the only parameter to those methods is the HDFS path. See the following code for csv() and load():
val csv_scala_df1 = spark.read
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://user/your_user_name/data/csv/userdata1.csv")
 
val csv_scala_df2 = spark.read
  .format("csv")
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://user/your_user_name/data/csv/userdata1.csv")

CSV files are a very popular option for storing data, especially when data sizes are on the smaller side, because the files are human readable and easy to create. As a result, you will create a lot of DataFrames from CSV files over your Spark career.

Like the json() method, the csv() method has many parameters. See the following official documentation links for a complete list.

PySpark: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.csv

Scala: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@csv(paths:String*):org.apache.spark.sql.DataFrame
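
As with json(), these optional parameters are passed as keyword arguments in PySpark. The following sketch adds two of them, nullValue and dateFormat, purely as an illustration (the values and the variable name csv_pyspark_df4 are examples, not requirements of the sample data):

csv_pyspark_df4 = spark.read.csv("hdfs://user/your_user_name/data/csv/userdata1.csv"
  , sep=","
  , inferSchema="true"
  , header="true"
  , nullValue="NA"           # treat the literal string NA as a null value
  , dateFormat="MM/dd/yyyy") # pattern used when parsing date columns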


Spark Starter Guide 1.2: Spark DataFrame Schemas

Introduction

A schema is information about the data contained in a DataFrame: specifically, the number of columns, the column names, the column data types, and whether a column can contain NULLs. Without a schema, a DataFrame would just be a disorganized group of data. The schema gives the DataFrame structure and meaning.


Specifying the Schema of a DataFrame

In the previous section we introduced the createDataFrame() method. In PySpark, this method looks like:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

After the required data parameter, the first optional parameter is schema. The most useful options for the schema parameter are None (or leaving it out), a list of column names, or a StructType.

If schema is None or left out, then Spark will try to infer the column names and the column types from the data. If schema is a list of column names, then Spark will add the column names in the order specified and will try to infer the column types from the data. In both of these cases the second optional parameter, samplingRatio, controls how much of the data Spark samples when inferring the schema. If samplingRatio is not included or is set to None, only the top row is used to infer the schema.

To illustrate, say we had some data in a variable named computer_sales with columns "product_code", "computer_name", and "sales". The following examples illustrate the schema options the createDataFrame() method can handle in PySpark.
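
For reference, computer_sales could be a simple nested list like the following hypothetical sample (any data with these three columns would behave the same way):

# hypothetical sample data: product_code, computer_name, sales
computer_sales = [[1001, "Orion 9000", 1499.99],
                  [1002, "Nebula Air", 899.00],
                  [1003, "Quasar Mini", 599.50]]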

The following code is used when only the data parameter is provided or the schema is set to None or left blank:

df1 = spark.createDataFrame(computer_sales)
df2 = spark.createDataFrame(computer_sales, None)

Both DataFrames are equivalent.

The following is used when the data parameter is specified along with a Python list of column names:

df3 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"])

The following code is used when the data parameter is specified along with a Python list of column names and a samplingRatio, so that more than just the top row is sampled when inferring the schema:

df4 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], 2)

The following is used to infer the schema from every row in the DataFrame. len() is a Python function that returns the number of elements in a list; passing a samplingRatio this large ensures that every row in the DataFrame is evaluated when inferring the schema:

df5 = spark.createDataFrame(computer_sales, ["product_code", "computer_name", "sales"], len(computer_sales))

Exercise 6: Creating a DataFrame in PySpark with only named columns

  1. Create a nested list called home_computers as shown in the following code:
    home_computers = [["Honeywell", "Honeywell 316#Kitchen Computer", "DDP 16 Minicomputer", 1969], ["Apple Computer", "Apple II series", "6502", 1977], ["Bally Consumer Products", "Bally Astrocade", "Z80", 1977]]
  2. Create a DataFrame but this time the column names of the DataFrame are given explicitly as a list in the second parameter as shown in the following code:
    computers_df = spark.createDataFrame(home_computers, ["Manufacturer", "Model", "Processor", "Year"])
    Since the third parameter samplingRatio is not included, Spark uses the first row of data to infer the data types of the columns.
  3. Show the contents of the DataFrame and display the schema with the following code:
    computers_df.show()
    computers_df.printSchema()

Running the preceding code displays the following:

+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+
 
root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Processor: string (nullable = true)
 |-- Year: long (nullable = true)

Column names make DataFrames exceptionally useful. The PySpark API makes adding column names to a DataFrame very easy.

Schemas, StructTypes, and StructFields

The most rigid and defined option for schema is the StructType. It is important to note that the schema of a DataFrame is a StructType. If a DataFrame is created without column names and Spark infers the data types based upon the data, a StructType is still created in the background by Spark. 

A manually created PySpark DataFrame, like the following example, still has a StructType schema:

computers_df = spark.createDataFrame(home_computers)
computers_df.show()
+--------------------+--------------------+-------------------+----+
|        Manufacturer|               Model|          Processor|Year|
+--------------------+--------------------+-------------------+----+
|           Honeywell|Honeywell 316#Kit...|DDP 16 Minicomputer|1969|
|      Apple Computer|     Apple II series|               6502|1977|
|Bally Consumer Pr...|     Bally Astrocade|                Z80|1977|
+--------------------+--------------------+-------------------+----+

The schema can be displayed in PySpark by calling the schema method on a DataFrame like: computers_df.schema

Running the preceding code displays the following:

Out[1]: StructType(List(StructField(_1,StringType,true),StructField(_2,StringType,true),StructField(_3,StringType,true),StructField(_4,LongType,true)))

To recap, the schema of a DataFrame is stored as a StructType object. The StructType object consists of a list of StructFields. The StructFields are the information about the columns of a DataFrame. Use the following code for Spark Scala:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val schema = StructType(
  List(
    StructField("Manufacturer", StringType, true),
    StructField("Model", StringType, true),
    StructField("Processor", StringType, true),
    StructField("Year", LongType, true)
  )
)

Use the following code for PySpark:

from pyspark.sql.types import StructType, StructField, StringType, LongType
 
schema = StructType([
  StructField("Manufacturer", StringType(), True),
  StructField("Model", StringType(), True),
  StructField("Processor", StringType(), True),
  StructField("Year", LongType(), True)
])


StructFields are objects that correspond to each column of the DataFrame and are constructed with the column name, the data type, and a boolean value indicating whether the column can contain NULLs. The second parameter of a StructField is the column's data type: string, integer, decimal, datetime, and so on. To use data types in Spark, the types module must be imported. Imports in Scala and Python bring in code that is not built into the main module. For example, DataFrames are part of the main code class, but ancillary things like data types and functions are not and must be imported to be used in your file. The following code is the Scala import for all of the data types:

import org.apache.spark.sql.types._

The following code is the Python import for all of the data types:

from pyspark.sql.types import *

Note:

StructType and StructField are actually Spark data types themselves. They are included in the preceding data imports that import all the members of the data types class. To import StructType and StructField individually use the following code for Scala Spark:
import org.apache.spark.sql.types.{StructType, StructField}

To import StructType and StructField individually use the following code for PySpark:
from pyspark.sql.types import StructType, StructField

Exercise 7: Creating a DataFrame in PySpark with a Defined Schema

  1. Import all the PySpark data types at once (which include both StructType and StructField) and make a nested list of data with the following code:
from pyspark.sql.types import *

customer_list = [[111, "Jim", 45.51], [112, "Fred", 87.3], [113, "Jennifer", 313.69], [114, "Lauren", 28.78]]
  2. Construct the schema using StructType and StructField. First make a StructType that holds a Python list, as shown in the following code:
customer_schema = StructType([
  StructField("customer_id", LongType(), True),
  StructField("first_name", StringType(), True),
  StructField("avg_shopping_cart", DoubleType(), True)
])

Inside the list are the individual StructFields that make up the columns of the DataFrame.

  3. Make the DataFrame with the following code. In the createDataFrame() method the first parameter is the data and the second is the schema:
customer_df = spark.createDataFrame(customer_list, customer_schema)
  4. Display the contents and the schema of the DataFrame with the following code:
    customer_df.show()
    customer_df.printSchema()

Running the preceding code displays the following:

+-----------+----------+-----------------+
|customer_id|first_name|avg_shopping_cart|
+-----------+----------+-----------------+
|        111|       Jim|            45.51|
|        112|      Fred|             87.3|
|        113|  Jennifer|           313.69|
|        114|    Lauren|            28.78|
+-----------+----------+-----------------+
 
root
 |-- customer_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- avg_shopping_cart: double (nullable = true)

Spark schemas are the structure or the scaffolding of a DataFrame. Just like a building would collapse without structure, so too would a DataFrame. Without structure Spark wouldn’t be able to scale to trillions and trillions of rows. 


Another way to create a DataFrame in Spark Scala is to use Rows wrapped in a Sequence. We have already introduced the Sequence object. The Row object is one row in a DataFrame, with each value representing a cell in a different column.

However, a Sequence of Rows requires several things in order to be turned into a DataFrame. First, the Sequence of Rows must be "parallelized," which means the data held on the driver is distributed to all the nodes of the cluster. This is done by calling the parallelize() method on the SparkContext (which is obtained from the SparkSession); an example would look like spark.sparkContext.parallelize(data). The second requirement is providing a StructType schema. Unfortunately, not passing a schema, or using the toDF() method instead, causes an error.

Exercise 8: Creating a DataFrame in Spark Scala with a Defined Schema

  1. Import the Scala data types along with the Row class, and then make a Sequence of Rows called grocery_items. The Seq() wraps around each Row() object. Each comma separated value in the Row() will be a column in the DataFrame:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
 
val grocery_items = Seq(
  Row(1, "stuffing", 4.67),
  Row(2, "milk", 3.69),
  Row(3, "rolls", 2.99),
  Row(4, "potatoes", 5.15),
  Row(5, "turkey", 23.99)
)
  2. Construct the schema using StructType and StructFields. First make a StructType that holds a Scala list. Inside the list are the individual StructFields that make up the columns of the DataFrame:
val grocery_schema = StructType(
  List(
    StructField("id", IntegerType, true),
    StructField("item", StringType, true),
    StructField("price", DoubleType, true)
  )
)
  3. Make a DataFrame using the createDataFrame() method. When using the Sequence of Rows we first have to parallelize the data. Inside createDataFrame(), call the SparkSession with the spark variable, then call sparkContext, and then finally parallelize followed by the data variable grocery_items. The second parameter is the StructType grocery_schema:
val grocery_df = spark.createDataFrame(spark.sparkContext.parallelize(grocery_items), grocery_schema)

  4. Display the data and print the schema:

grocery_df.show()
grocery_df.printSchema()
+---+--------+-----+
| id|    item|price|
+---+--------+-----+
|  1|stuffing| 4.67|
|  2|    milk| 3.69|
|  3|   rolls| 2.99|
|  4|potatoes| 5.15|
|  5|  turkey|23.99|
+---+--------+-----+
 
root
 |-- id: integer (nullable = true)
 |-- item: string (nullable = true)
 |-- price: double (nullable = true)
 
import org.apache.spark.sql.types._
grocery_items: Seq[org.apache.spark.sql.Row] = List([1,stuffing,4.67], [2,milk,3.69], [3,rolls,2.99], [4,potatoes,5.15], [5,turkey,23.99])
grocery_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))
grocery_df: org.apache.spark.sql.DataFrame = [id: int, item: string ... 1 more field]

The add() Method

The add() method can be used interchangeably with, or in addition to, StructField objects on a StructType. The add() method takes the same parameters as the StructField object. The following schemas are all equivalent representations:

The PySpark code is as follows:

from pyspark.sql.types import StructType, StructField, StringType, LongType
 
schema1 = StructType([
  StructField("id_column", LongType(), True),
  StructField("product_desc", StringType(), True)
])
 
schema2 = StructType().add("id_column", LongType(), True).add("product_desc", StringType(), True)
 
schema3 = StructType().add(StructField("id_column", LongType(), True)).add(StructField("product_desc", StringType(), True))

We can confirm that the three schemas are equivalent by comparing the schema variables to each other and printing the results in Python:

print(schema1 == schema2)
print(schema1 == schema3)

Running the preceding code will display the following:

True
True

In Spark Scala, the add() method is exactly the same, except that an empty Scala list must be placed inside the empty StructType. See variables scala_schema2 and scala_schema3 for the only difference between Spark Scala and PySpark:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val scala_schema1 = StructType(List(
  StructField("id_column", LongType, true),
  StructField("product_desc", StringType, true)
))
 
val scala_schema2 = StructType(List()).add("id_column", LongType, true).add("product_desc", StringType, true)
 
val scala_schema3 = StructType(List()).add(StructField("id_column", LongType, true)).add(StructField("product_desc", StringType, true))

We can confirm that the three schemas are equivalent by comparing the schema variables to each other and printing the results in Scala:

println(scala_schema1 == scala_schema2)
println(scala_schema1 == scala_schema3)

Running the preceding code will display the following:

true
true
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
scala_schema1: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema2: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))
scala_schema3: org.apache.spark.sql.types.StructType = StructType(StructField(id_column,LongType,true), StructField(product_desc,StringType,true))

The add() method can also be used when adding a new column to an already existing DataFrame. In the following exercise, sales_schema is the schema of a DataFrame and we want to add another column to it. We can use the add() method to append a StructField to the original StructType, creating a brand new StructType.

Exercise 9: Using the add() Method

  1. Create a schema of a StructType named sales_schema that has two columns. The first column “user_id” is a long data type and cannot be nullable. The second column “product_item” is a string data type and can be nullable. Following is the code for PySpark:
from pyspark.sql.types import StructType, StructField, StringType, LongType
 
sales_schema = StructType([
  StructField("user_id", LongType(), False),
  StructField("product_item", StringType(), True)
])

Following is the code for Scala Spark:

import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
 
val sales_schema = StructType(List(
  StructField("user_id", LongType, false),
  StructField("product_item", StringType, true)
))
  2. Create a StructField called sales_field that has a column name of "total_sales" with a long data type that can be nullable. Following is the code for PySpark:
    sales_field = StructField("total_sales", LongType(), True)
    Following is the code for Scala Spark:
    val sales_field = StructField("total_sales", LongType, true)
  3. Use the add() method to add the sales_field StructField to the sales_schema StructType. Following is the code for PySpark:
    another_schema = sales_schema.add(sales_field)
    Following is the code for Scala Spark:
    val another_schema = sales_schema.add(sales_field)
  4. Print out the new schema variable to verify it looks correct: print(another_schema)

Running the preceding code in PySpark displays the following:

StructType(List(StructField(user_id,LongType,false),StructField(product_item,StringType,true),StructField(total_sales,LongType,true)))

Following is the code and output for Scala Spark:

println(another_schema)
StructType(StructField(user_id,LongType,false), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}
sales_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,false), StructField(product_item,StringType,true))
sales_field: org.apache.spark.sql.types.StructField = StructField(total_sales,LongType,true)
another_schema: org.apache.spark.sql.types.StructType = StructType(StructField(user_id,LongType,false), StructField(product_item,StringType,true), StructField(total_sales,LongType,true))

Use the add() method when adding columns to a DataFrame

You can use the schema method on a DataFrame in conjunction with the add() method to add new fields to the schema of an already existing DataFrame. In Spark, a DataFrame's schema is a StructType. In the preceding exercise we manually specified the schema as a StructType. Spark has a shortcut: the schema method. The schema method can be called on an existing DataFrame to return its schema, which is a StructType. So in Spark Scala or PySpark you would call some_df.schema to output the StructType schema.

Here is an example in PySpark of the output of the schema method on a DataFrame returning a StructType schema:

print(customer_df.schema)
StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true)))

Here is an example in Spark Scala of the output of the schema method on a DataFrame returning a StructType schema:

println(grocery_df.schema)
StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true))

So, with the schema method you don't have to manually create the StructType to add a new column. Just call the schema method on the DataFrame and then use the add() method to add a column as a StructField. Following is the code for PySpark:

final_schema = customer_df.schema.add(StructField("new_column", StringType(), True))
 
print(final_schema)

Running the preceding code displays the following:

StructType(List(StructField(customer_id,LongType,true),StructField(first_name,StringType,true),StructField(avg_shopping_cart,DoubleType,true),StructField(new_column,StringType,true)))

Following is the code for Scala Spark:

val final_schema = grocery_df.schema.add(StructField("new_column", StringType, true))
 
println(final_schema)

Running the preceding code displays the following:

StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))
final_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(item,StringType,true), StructField(price,DoubleType,true), StructField(new_column,StringType,true))

In the Modifying DataFrames section of this chapter we will see how to add new columns of data to a DataFrame.

Return column names from a schema

Use the fieldNames method on a StructType to return a list or array of the column names. This is an easy way to return the column names of a DataFrame. The fieldNames method can be called directly on a StructType or after the schema method on a DataFrame. The only difference between Spark Scala and PySpark is that PySpark requires trailing parentheses () while Spark Scala omits them.

In PySpark call the fieldNames() method on a schema and on the DataFrame to return the column names of the schema:

print( customer_df.schema.fieldNames() )
print( customer_schema.fieldNames() )

The output is a list of the column names of the DataFrame:

['customer_id', 'first_name', 'avg_shopping_cart'] 
['customer_id', 'first_name', 'avg_shopping_cart']

In Spark Scala call the fieldNames method on a schema and on the DataFrame to return the column names of the schema. In Scala we cannot directly print the contents of an array the way we can print a list in Python, so here we create variables and Scala outputs the contents of the arrays:

val columns1 = grocery_df.schema.fieldNames
val columns2 = grocery_schema.fieldNames

The output is an array of the column names of the DataFrame:

columns1: Array[String] = Array(id, item, price) 
columns2: Array[String] = Array(id, item, price)

Nested Schemas

So far we have dealt with flat and orderly DataFrame schemas. But Spark also supports nested columns, where a column can contain additional sets of data. Suppose we had a data set that looked like the following Python dictionary or JSON object:

{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}

This data set would be the result of an imaginary sales table joined to an orders table. We will look at joining DataFrames together in Chapter 3, SQL with Spark.

It is difficult to see from the nested dictionary, but there are three columns: id, name, and orders. The orders column is special because it holds a list of nested records. In Python we can directly use this data by wrapping it in brackets, as mentioned earlier.

nested_sales_data = [{"id":101,"name":"Jim","orders":[{"id":1,"price":45.99,"userid":101},{"id":2,"price":17.35,"userid":101}]},{"id":102,"name":"Christina","orders":[{"id":3,"price":245.86,"userid":102}]},{"id":103,"name":"Steve","orders":[{"id":4,"price":7.45,"userid":103},{"id":5,"price":8.63,"userid":103}]}]

If we used this list and made a DataFrame without specifying a schema, the output would not be very usable or readable. The following PySpark code uses the preceding nested JSON data to make a Spark DataFrame. The DataFrame and schema are displayed to demonstrate what can happen when you make a DataFrame with nested data without a schema:

ugly_df = spark.createDataFrame(nested_sales_data)
ugly_df.show(20, False)
ugly_df.printSchema()
+---+---------+------------------------------------------------------------------------+
|id |name     |orders                                                                  |
+---+---------+------------------------------------------------------------------------+
|101|Jim      |[[id ->, userid ->, price -> 45.99], [id ->, userid ->, price -> 17.35]]|
|102|Christina|[[id ->, userid ->, price -> 245.86]]                                   |
|103|Steve    |[[id ->, userid ->, price -> 7.45], [id ->, userid ->, price -> 8.63]]  |
+---+---------+------------------------------------------------------------------------+
 
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: double (valueContainsNull = true)

The output is not readable or user friendly with the "->" characters, because Spark is trying to make a map of the data. Let's add a schema to tell Spark exactly how we want to structure the DataFrame. The following PySpark code demonstrates the results of nested data when using a schema:

from pyspark.sql.types import *
 
orders_schema = [
  StructField("id", IntegerType(), True),
  StructField("price", DoubleType(), True),
  StructField("userid", IntegerType(), True)
]
 
sales_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("orders", ArrayType(StructType(orders_schema)), True)
])

Here we called the orders_schema inside the sales_schema. This shows how versatile schemas can be and how easy it is in Spark to construct complex schemas. Now let's make a DataFrame that is readable and well structured, as shown in the following code:

nested_df = spark.createDataFrame(nested_sales_data, sales_schema)
 
nested_df.show(20, False)
nested_df.printSchema()
+---+---------+----------------------------------+
|id |name     |orders                            |
+---+---------+----------------------------------+
|101|Jim      |[[1, 45.99, 101], [2, 17.35, 101]]|
|102|Christina|[[3, 245.86, 102]]                |
|103|Steve    |[[4, 7.45, 103], [5, 8.63, 103]]  |
+---+---------+----------------------------------+
 
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: integer (nullable = true)

In the next section we will move on from manually created DataFrames to creating DataFrames from files stored in Hadoop.

Spark Starter Guide 1.1: Creating Spark DataFrames Manually

Introduction

Since the Spark 2.0 release, DataFrames have been the central technology for accomplishing tasks in Spark. At its essence, a DataFrame is an immutable but distributed group of data that is assembled into named columns with a set structure. Let's look at these aspects of DataFrames individually in the next section.


Spark DataFrame Characteristics

Distributed Group of Data

Spark's principal data storage system is the Hadoop Distributed File System (HDFS). A Hadoop cluster consists of a single NameNode that manages access and metadata for the cluster, and multiple DataNodes that store the data. In Hadoop, each data file is split into many small pieces, replicated (three times by default), and stored across the DataNodes. Spark is designed to read and process this distributed data stored in HDFS. So, if data stored in HDFS is made into a DataFrame, the data actually is distributed across all the DataNodes of the Hadoop cluster.

Immutable

Spark DataFrames are immutable, which means they do not change. Adding columns, removing rows, renaming columns, performing calculations, or anything else done to a DataFrame does not actually change the original DataFrame. Instead, each time an operation is performed on a DataFrame, an entirely new DataFrame is created behind the scenes in Spark with the additional change(s). Even if you perform an operation on a DataFrame and give the result the same name, like df = df.withColumn(…), in the background Spark creates an entirely new DataFrame with the new column change and then gives that new DataFrame the same name as the original, df. If a user calls df.show() after the df = df.withColumn(…), then because the Python or Scala code is interpreted in sequential order from top to bottom, the df.show() will display the DataFrame with the added column.

It is very important to note that Spark computes all DataFrame transformations in a lazy fashion. This means that computations or operations on a DataFrame only actually happen when an action is performed. Examples of actions on a DataFrame include displaying, writing, counting, and collecting. For example, if we add a calculated column to a DataFrame, the column isn't actually added until we perform an action on the DataFrame, like displaying the top twenty rows with df.show(). For each operation performed on a DataFrame before an action is called, Spark essentially creates a blueprint, or execution plan, of the operation: the dataset plus the operation. Each time a new operation is performed, Spark updates the blueprint with the new changes. Then, when an action is called, Spark executes the blueprint and a new DataFrame is created.
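
The following is a short PySpark sketch of this behavior, assuming an existing DataFrame named df that has a numeric column named price:

from pyspark.sql.functions import lit
 
df = df.withColumn("discount", lit(0.1))  # transformation: only recorded in the execution plan
df = df.filter(df["price"] > 100)         # another transformation: still nothing is computed
df.show()                                 # action: the plan executes and the result is displayed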

Assembled into Named Columns

Spark DataFrames have structure. That structure consists of columns with column names. All data in a DataFrame is organized into named columns, even if there is just one column. For example, if the user creates a DataFrame with no column names, Spark will automatically give each column a name corresponding to its column number: _1, _2, _3, and so on.

Set Structure

Each column's name and data type make up the DataFrame's structure, which is called a schema. A DataFrame's schema may change, but it must be defined. This means columns must have a data type: integer, string, decimal, date, and so on.


To summarize the Spark characteristics: Spark is an action technology. Every operation in Spark is designed to accomplish tasks and help reveal insights on incredibly large data sets. DataFrames form the foundation for everything in Spark: organizing data, analyzing data, streaming data, performing machine learning on data, and much more.

But before we can do all of that, we first have to create DataFrames. Let’s jump right in.

Creating DataFrames Manually

The main way to create DataFrames in Spark is to use the createDataFrame() method, which is called on the SparkSession. Since Spark 2.0, the SparkSession has been the main way to interact with all of Spark's many capabilities. The SparkSession creates and exposes the SparkConf, SparkContext, and SQLContext to the entire application. Normally the SparkSession is created with a variable named "spark", but any name can be used. A basic SparkSession in Spark Scala looks like:

val spark = SparkSession
   .builder()
   .appName("some_app_name")
   .getOrCreate()

A basic SparkSession in PySpark looks like:

spark = SparkSession \
   .builder \
   .appName("some_app_name") \
   .getOrCreate()

Creating DataFrames Manually with PySpark

The createDataFrame() method in PySpark has a structure like:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

The optional parameters, schema, samplingRatio, and verifySchema, all deal with a DataFrame's schema. A schema is information about the structure of the data. These parameters and more will be covered in the next section, Spark DataFrame Schemas. For now, we will only use the first parameter, data.


The data parameter is required and can take many things. A DataFrame can be manually created from various kinds of nested lists and tuples.

In Python, a list is an ordered group of items. Lists can include objects of different data types, can be changed, and can be nested. A nested list is the easiest way to manually create a DataFrame in PySpark: each inner list forms a row in the DataFrame.

Let’s start off by showing how to create a DataFrame from a nested Python list.

Exercise 1: Creating a DataFrame in PySpark from a Nested List

In this exercise we will be creating a DataFrame in PySpark from a given nested list.

  1. Create a Python variable hadoop_list that is a nested list with the following code:
    hadoop_list = [[1, "MapReduce"], [2, "YARN"], [3, "Hive"], [4, "Pig"], [5, "Spark"], [6, "Zookeeper"]]
    This will create a DataFrame with six rows and two columns.
  2. To create the DataFrame named hadoop_df we use the SparkSession variable spark (that we created) and call the createDataFrame() method passing only the nested list with the following code:
    hadoop_df = spark.createDataFrame(hadoop_list)
  3. Finally display the contents of the DataFrame using hadoop_df.show() and display the schema of the DataFrame in a tree structure using hadoop_df.printSchema() as shown in the following code:
hadoop_df.show()
hadoop_df.printSchema()

Running the preceding code will produce the following output:

+---+---------+
| _1|       _2|
+---+---------+
|  1|MapReduce|
|  2|     YARN|
|  3|     Hive|
|  4|      Pig|
|  5|    Spark|
|  6|Zookeeper|
+---+---------+
 
root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)

You have now created your first Spark DataFrame. In this exercise the DataFrame only has six rows, but a Spark DataFrame can scale to contain 100 trillion rows and beyond. This is the power of Spark.

In the output did you notice anything that stood out? There are actually two things to note: 

  1. The column names were _1 and _2. This is because no column names were supplied when creating the DataFrame. Spark didn’t know what to call the columns, so _1 and _2 correspond to its column number going from left to right. 
  2. The output of the printSchema() method correctly inferred the data type of each column. Spark figured out the first column was of data type long, which is similar to an integer, and that the second column was a string. When the printSchema() method is called on a DataFrame the output displays the schema in a tree format. The tree format displays the column hierarchy, column names, column data type, and whether the column is nullable. The printSchema() method has no parameters.

To display the contents of the DataFrame we call the method show() on the newly created DataFrame. The show() method looks like this:

show(n=20, truncate=True)

The show() method defaults to displaying the top twenty rows and also truncates each cell to the first twenty characters. To display more or fewer than the top twenty rows, set the first parameter to any integer. To include all the characters of the cells, set the second parameter to False.

Displaying the contents of Spark DataFrames in PySpark:

address_data = [["Bob", "1348 Central Park Avenue"], ["Nicole", "734 Southwest 46th Street"], ["Jordan", "3786 Ocean City Drive"]]
address_df = spark.createDataFrame(address_data)
address_df.show()
+------+--------------------+
|    _1|                  _2|
+------+--------------------+
|   Bob|1348 Central Park...|
|Nicole|734 Southwest 46t...|
|Jordan|3786 Ocean City D...|
+------+--------------------+

Since the method defaults to displaying only the first twenty characters of each cell, the content is truncated.

To display all the characters for each cell, set the second parameter to False; to limit the output to the first two rows, set the first parameter to 2, as shown in the following code:

address_df.show(2, False)
+------+-------------------------+
|_1    |_2                       |
+------+-------------------------+
|Bob   |1348 Central Park Avenue |
|Nicole|734 Southwest 46th Street|
+------+-------------------------+
only showing top 2 rows

Note:

The second parameter, truncate, can also take an integer. If set to an integer, it will display that number of characters for each cell.
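
For example, the following sketch uses the address_df DataFrame created above and truncates every cell to its first ten characters:

address_df.show(3, 10)  # show the first 3 rows, truncating each cell to 10 characters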


In Python, a tuple is similar to a list except it is wrapped in parentheses instead of square brackets and is not changeable (immutable). Other than that, lists and tuples are the same. A nested tuple is a tuple inside another tuple.

Exercise 2: Creating a DataFrame in PySpark from a nested tuple

  1. Create a nested tuple called programming_languages with the following code:
    programming_languages = ((1, "Java", "Scalable"), (2, "C", "Portable"), (3, "Python", "Big Data, ML, AI, Robotics"), (4, "JavaScript", "Web Browsers"), (5, "Ruby", "Web Apps"))
  2. Construct a DataFrame called prog_lang_df with the following code:
    prog_lang_df = spark.createDataFrame(programming_languages)
  3. Display the five rows and set the truncate parameter to False so the entire contents of the cells will be shown. Also print the schema of the DataFrame with the following code:
prog_lang_df.show(5, False)
prog_lang_df.printSchema()
+---+----------+--------------------------+
|_1 |_2        |_3                        |
+---+----------+--------------------------+
|1  |Java      |Scalable                  |
|2  |C         |Portable                  |
|3  |Python    |Big Data, ML, AI, Robotics|
|4  |JavaScript|Web Browsers              |
|5  |Ruby      |Web Apps                  |
+---+----------+--------------------------+
 
root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

In Python, a dictionary is a set of key-value pairs wrapped in curly braces. A dictionary is similar to a list in that it is mutable, can increase or decrease in size, and can be nested. Each data element in a dictionary has a key and a value. To create a DataFrame out of dictionaries, all that is required is to wrap them in a list.

Exercise 3: Creating a DataFrame in PySpark from a list of dictionaries

  1. Create a list of dictionaries called top_mobile_phones. Inside the list make three comma separated dictionaries each with keys of “Manufacturer”, “Model”, “Year”, “Million_Units” as shown in the following code:
    top_mobile_phones = [{"Manufacturer": "Nokia", "Model": "1100", "Year": 2003, "Million_Units": 250}, {"Manufacturer": "Nokia", "Model": "1110", "Year": 2005, "Million_Units": 250}, {"Manufacturer": "Apple", "Model": "iPhone 6 & 6+", "Year": 2014, "Million_Units": 222}]
  2. Create a DataFrame called mobile_phones_df from the dictionary list as shown in the following code:
    mobile_phones_df = spark.createDataFrame(top_mobile_phones)
  3. Display the DataFrame and print the schema as shown in the following code:
mobile_phones_df.show()
mobile_phones_df.printSchema()
+------------+-------------+-------------+----+
|Manufacturer|Million_Units|        Model|Year|
+------------+-------------+-------------+----+
|       Nokia|          250|         1100|2003|
|       Nokia|          250|         1110|2005|
|       Apple|          222|iPhone 6 & 6+|2014|
+------------+-------------+-------------+----+
 
root
 |-- Manufacturer: string (nullable = true)
 |-- Million_Units: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: long (nullable = true)

Notice that we didn’t supply the column names to the DataFrame but they still appear. That is because dictionaries have “keys” and these keys make up the columns of the DataFrame. Likewise, the dictionary “values” are the cells in the DataFrame. So, by using dictionaries, Spark can display the DataFrame column names.

Creating DataFrames Manually with Spark Scala

In Scala the createDataFrame() method has only two parameters: the first is the actual data and the second is the schema. Two of the more popular options for creating a DataFrame manually in Scala are using lists and Sequences. A list in Scala is different from a list in Python: in Scala, a list is an unchangeable collection of objects of the same data type. But we can get around the requirement that every element share the same data type by holding each row of data in a tuple. In Scala, a tuple is just comma separated data enclosed in parentheses. In the following exercise, the DataFrame will have two columns because each tuple only has two values, but using this method we can add more columns by adding more values inside the tuples.

Exercise 4: Creating a DataFrame in Scala from a List

  1. Before we can create a DataFrame, we first need a list. In Scala, we can use the List() method to create a list. Create a list called reptile_species_state with three tuples of two values each inside the list, as shown in the following code:
    val reptile_species_state = List(("Arizona", 97), ("Florida", 103), ("Texas", 139))
  2. Create a DataFrame named reptile_df by calling the createDataFrame() method on the SparkSession variable spark. Pass the reptile_species_state list as a parameter to the createDataFrame() method as shown in the following code:
    val reptile_df = spark.createDataFrame(reptile_species_state)
  3. Display the DataFrame and print the schema with the following code:
reptile_df.show()
reptile_df.printSchema
+-------+---+
|     _1| _2|
+-------+---+
|Arizona| 97|
|Florida|103|
|  Texas|139|
+-------+---+
 
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)
 
reptile_species_state: List[(String, Int)] = List((Arizona,97), (Florida,103), (Texas,139))
reptile_df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

As with Python, the createDataFrame() method in Scala correctly inferred the schema and created the DataFrame.

In Scala, a Sequence and a list are very similar. In fact, a list is a type of Sequence. The main difference centers around performance: how Scala provides fast access to the elements in a list or Sequence. But in reality you can treat them the same. A Sequence is created with Seq(), with the elements inside the parentheses separated by commas.

To add column names to a manually created DataFrame in Scala use the toDF() method. toDF() takes comma separated strings and converts the DataFrame with unnamed columns into a DataFrame with named columns. 

Exercise 5: Creating a DataFrame in Scala from a Sequence

  1. In Scala, a Sequence is created using “Seq()”. Make a Sequence called bird_species_state and inside construct three tuples as shown in the following code:
    val bird_species_state = Seq(("Alaska", 506), ("California", 683), ("Colorado", 496))
  2. Create a DataFrame called birds_df. But this time call the toDF() method with comma separated strings for column names as shown in the following code:
    val birds_df = spark.createDataFrame(bird_species_state).toDF("state","bird_species")
  3. Display the contents of the DataFrame and print the schema with the following code:
birds_df.show()
birds_df.printSchema
+----------+------------+
|     state|bird_species|
+----------+------------+
|    Alaska|         506|
|California|         683|
|  Colorado|         496|
+----------+------------+
 
root
 |-- state: string (nullable = true)
 |-- bird_species: integer (nullable = false)
 
bird_species_state: Seq[(String, Int)] = List((Alaska,506), (California,683), (Colorado,496))
birds_df: org.apache.spark.sql.DataFrame = [state: string, bird_species: int]

Like Python dictionaries, Scala Sequences are prevalent and are used often. Being exposed to them will add a lot of value. In this exercise we showed how to transform data in a Sequence into a DataFrame. 

In this section we showed how to create DataFrames with just data. We will explain creating DataFrames in Scala and Python with a defined schema in the next section.