In the previous section, 2.1 DataFrame Data Analysis, we used US census data and processed the columns to create a DataFrame called census_df. After processing and organizing the data, we would like to save it as files for later use. In Spark, the best and most often used location to save data is HDFS. As we saw in 1.3: Creating DataFrames from Files, we can read files from HDFS to create a DataFrame. Likewise, we can also write a DataFrame to HDFS as files in different file formats. This section will cover writing DataFrames to HDFS as Parquet, ORC, JSON, CSV, and Avro files.
We will use our census_df DataFrame to write to HDFS. To write a DataFrame, chain the write method onto the DataFrame name, in our case census_df. The write method has many options for writing data files, but the main one is the save() method. It has several parameters, the first being the HDFS directory path you would like to write to. Spark and Hadoop make this very easy because the directory path you specify when writing to HDFS doesn’t have to exist: at runtime, if the path doesn’t exist, HDFS creates it and places your files there. Here is a basic example in PySpark and Spark Scala of writing the census_df DataFrame to HDFS:
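A minimal sketch of that basic write in PySpark; the hdfs://…/ portion is a placeholder for your cluster’s HDFS path, and the spark_output directory name is only an example:

```python
# Write census_df to HDFS using Spark's default file format (Parquet)
census_df.write.save("hdfs://…/spark_output")
```

The Spark Scala call is identical: census_df.write.save("hdfs://…/spark_output").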
Spark offers many options when writing DataFrames as files to HDFS. Spark allows us to control whether data is appended or overwritten, which compression technology to use, and a host of other file-format-specific options. In this section we will explain writing DataFrames to HDFS as Parquet, ORC, JSON, CSV, and Avro file formats. As we cover each file format we will introduce additional options that can be used when writing files to HDFS.
Writing DataFrames to HDFS as Parquet Files
There are two ways to write a DataFrame as Parquet files to HDFS: the parquet() method and the save() method. Using the parquet() method is the easiest and most straightforward: call the write method on a DataFrame and then directly call the parquet() method. The parquet() method has only one parameter: the HDFS directory path where you would like the files saved. The PySpark and Spark Scala code looks like:
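A sketch of the call in PySpark (the hdfs://…/ prefix stands in for your cluster’s HDFS path):

```python
# Write census_df as Parquet files to the given HDFS directory
census_df.write.parquet("hdfs://…/spark_output/parquet")
```

The Spark Scala call is identical.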
Writing a DataFrame to HDFS doesn’t produce a Spark console output. If the command does not produce an error then the files were written as your command specified.
The other option for writing a DataFrame to Parquet files is to use the save() method. The save() method allows more control over the final output and is used in conjunction with the format() method. The format() method specifies the output file format used by the save() method. It has one parameter, a string representing the file type, such as "parquet", "csv", or "avro". In our case, we would like to write the “census_df” DataFrame as Parquet files to HDFS. The PySpark and Spark Scala code is:
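A sketch of the save() version in PySpark; the output directory name is only an example:

```python
# Same result as parquet(), but via format() and save()
census_df.write.format("parquet").save("hdfs://…/spark_output/parquet")
```

The Spark Scala call is identical.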
The save() method allows more options when writing data. One of the choices is the compression type of the individual files written to HDFS. To set the compression, use the option() method, where the first parameter is "compression" and the second is the compression type.
The purpose of compressing files is to take up less space while storing the data. Files that take up less space are important because they cost less to store and are faster to send across the network of the cluster. In Spark there are multiple compression types to choose from when writing data. Each compression option is different and will result in different performance. Here are the compression options for the Parquet, ORC, and JSON file formats:
Parquet: none, uncompressed, snappy (the default), gzip, lzo, brotli, lz4, and zstd
ORC: none, snappy (the default), zlib, and lzo
JSON: none (the default), bzip2, gzip, lz4, snappy, and deflate
As an example, if we wanted to write a DataFrame to HDFS as Parquet files compressed with gzip, the PySpark and Spark Scala code would look like:
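In PySpark, a sketch of that compressed write (the parquet_gzip directory name is just an example):

```python
# Write Parquet files compressed with the gzip codec
(census_df.write
    .format("parquet")
    .option("compression", "gzip")
    .save("hdfs://…/spark_output/parquet_gzip"))
```

The Spark Scala version drops the outer parentheses and backslash-free line continuations but is otherwise the same.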
Writing DataFrames to HDFS as ORC Files
Similar to writing Parquet files, there are two options when writing DataFrames as ORC files: the orc() method and the save() method. The orc() method simply saves a DataFrame to HDFS as ORC files.
PySpark and Spark Scala code:
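A sketch in PySpark (the Spark Scala call is identical):

```python
# Write census_df as ORC files to the given HDFS directory
census_df.write.orc("hdfs://…/spark_output/orc")
```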
The save() method requires using .format("orc") to specify the files as ORC. The PySpark and Spark Scala code:
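A sketch in PySpark:

```python
# Write ORC files via format() and save()
census_df.write.format("orc").save("hdfs://…/spark_output/orc")
```

The Spark Scala call is identical.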
Another option that can be used with the save() method is the mode() method. The mode() method specifies the behavior when writing data files to a directory that already exists. There are four options to pass to the mode() method as a string:
overwrite: Overwrite the existing data
append: Append contents of this DataFrame to existing data
ignore: Silently ignore this operation if data already exists
errorifexists: Default option, throw an exception at runtime if data already exists
A couple of examples of specifying the mode() method in PySpark and Spark Scala look like:
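Two sketches in PySpark, one for each of the most common modes (the Spark Scala calls are identical):

```python
# Overwrite whatever data already exists at the target path
census_df.write.format("orc").mode("overwrite").save("hdfs://…/spark_output/orc")

# Append this DataFrame's rows to the existing data instead
census_df.write.format("orc").mode("append").save("hdfs://…/spark_output/orc")
```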
Writing DataFrames to HDFS as JSON Files
Writing a DataFrame to HDFS as JSON files using the json() method in PySpark and Spark Scala looks like:
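A sketch in PySpark (the Spark Scala call is identical):

```python
# Write census_df as JSON files, one JSON object per row
census_df.write.json("hdfs://…/spark_output/json")
```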
The json() method has several other options for specifying how the JSON objects are written. The optional parameters include compression, dateFormat, timestampFormat, encoding, and lineSep. To include multiple options in the writing process, you can chain multiple option() methods together to specify as many as you need.
In the PySpark and Spark Scala examples below we use multiple option() methods to set the JSON date format and the line separator.
census_df.write \
    .format("json") \
    .mode("overwrite") \
    .option("dateFormat", "yyyy/MM/dd") \
    .option("lineSep", "\n") \
    .save("hdfs://…/spark_output/json")

census_df.write
    .format("json")
    .mode("overwrite")
    .option("dateFormat", "yyyy/MM/dd")
    .option("lineSep", "\n")
    .save("hdfs://…/spark_output/json")
Writing DataFrames to HDFS as CSV Files
Writing CSV files to HDFS supports over fifteen optional parameters. We will not cover all of them, but will highlight some of the more common choices. See the official PySpark and Spark Scala API documentation for the complete list.
In the following code we write the “census_df” DataFrame as CSV files with a comma separator, double quotes as the quote character, and a header row of column names.
(census_df.write
    .format("csv")
    .mode("overwrite")
    .option("sep", ",")
    .option("quote", '"')
    .option("header", "true")
    .save("hdfs://…/spark_output/csv"))

census_df.write
    .mode("overwrite")
    .option("sep", ",")
    .option("quote", "\"")
    .option("header", "true")
    .csv("hdfs://…/spark_output/csv")
Writing DataFrames to HDFS as Avro Files
There is no avro() method in Spark, so the only way to write a DataFrame to HDFS as Avro files is to use the save() method in conjunction with format("avro"). Note that Avro support comes from the external spark-avro package, which must be available on the cluster. The PySpark and Spark Scala code:
census_df.write \
    .format("avro") \
    .mode("overwrite") \
    .save("hdfs://…/spark_output/avro")

census_df.write
    .format("avro")
    .mode("append")
    .save("hdfs://…/spark_output/avro")
DataFrame Partitions, Repartitioning, and Partitioning
In Spark the word “partition” is used a lot and has several different usages and meanings. This section will cover partitions, repartitioning, and partitioning.
In Spark, the data in a DataFrame is held in memory, and if the size of the data exceeds the amount of available memory then the data spills over to the local disk drives of the worker nodes of the cluster. This local disk space is not the same as HDFS, though. Since Spark is designed to run on a distributed system of machines, the data of a DataFrame is split into smaller pieces and distributed across the memory and/or local storage of the worker nodes. In that sense, a Spark DataFrame is divided logically, i.e., partitioned. So a partition is an atomic subset of a DataFrame’s data residing in memory and/or on the local disk of a worker node. The purpose of DataFrame partitions is that data split into smaller chunks can have computations executed on it in parallel, which greatly speeds up computations.
A partition is an atomic subset of a DataFrame’s data residing in memory and/or on the local disk of a worker node
We can see how many partitions our DataFrame is split into by converting our DataFrame into an RDD with the help of the rdd attribute and the getNumPartitions() method. Because, at their core, both RDDs and DataFrames are immutable and distributed collections of data, converting between them is easy and painless. The default number of partitions that a DataFrame is split into largely depends on the total number of cores across all the worker nodes of your Hadoop cluster. For example, if a DataFrame consisted of 100 gigabytes of data and the Hadoop cluster had a total of sixty-four cores, then the DataFrame would be split into approximately sixty-four partitions, each holding roughly 1.5 gigabytes of data. Also, there is no size limit on the amount of data in a partition: as long as the Hadoop cluster can hold the entire data size, Hadoop and Spark will scale to hold any partition size.
The image below is a great visual representation of DataFrame partitions split up between the worker nodes of the cluster.
To get the number of partitions of a DataFrame, call the rdd attribute on your DataFrame and then call the getNumPartitions() method. In the examples below, the PySpark DataFrame is split into sixteen partitions while the Scala DataFrame is split into eight. Because both use the same census_df DataFrame, each PySpark partition holds less data; however, the sum of all the partitions for the PySpark and Spark Scala DataFrames is the same.
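In PySpark the call looks like the sketch below; in Spark Scala it is census_df.rdd.getNumPartitions, whose console output is shown underneath:

```python
# Access the DataFrame's underlying RDD and ask for its partition count
census_df.rdd.getNumPartitions()   # returned 16 in the PySpark example described here
```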
res5: Int = 8
Spark allows us to control the number of DataFrame partitions. Repartitioning is increasing or decreasing the number of partitions in order to adjust the balance of data in an existing DataFrame. Repartitioning creates a new DataFrame that is hash partitioned, which means that rows are distributed as evenly as possible across the newly created partitions.
Repartitioning is increasing or decreasing the number of partitions in order to adjust the balance of data in an existing DataFrame
Why would we need to repartition a DataFrame? Spark does not repartition when filtering or reducing the size of a DataFrame. So, imagine we had a DataFrame with 1 billion rows of data split into 10,000 partitions; that would be 100,000 rows of data per partition. If we dramatically filtered the data or took a statistical sample and reduced the DataFrame to, say, 50,000 rows, the DataFrame would still have 10,000 partitions. That would be 50,000 rows of data spread over 10,000 partitions, an average of only five rows per partition, which is not very efficient. To optimize the data in the new DataFrame, Spark has two methods, repartition() and coalesce(), that can be used to adjust the number of partitions to something more appropriate given the overall size of the DataFrame. However, the two methods are very different in how they function and in their parameters. Let’s look at coalesce() first.
The coalesce() method returns a new DataFrame with the number of partitions specified as a parameter. However, there is a significant limitation: coalesce() can only reduce the number of partitions; it cannot increase the number of partitions of a DataFrame. If a user executes a coalesce() command that specifies a larger number of partitions than the DataFrame currently has, the partition count stays the same and the operation does nothing. The reason that coalesce() can only reduce partitions is that it doesn’t shuffle the data when repartitioning. Shuffling is the process of moving data between partitions. Shuffles are very computationally expensive because Spark must touch the data in all the partitions and move it around. Since coalesce() doesn’t do a shuffle, it combines or stacks the current partitions into a smaller number of partitions. This allows the coalesce() method to operate very quickly compared to the other method, repartition().
Shuffling is the process of moving data between partitions
To reduce the census_df DataFrame to four partitions with the coalesce() method:
four_parts_df = census_df.coalesce(4)
four_parts_df.rdd.getNumPartitions()

val four_parts_df = census_df.coalesce(4)
four_parts_df.rdd.getNumPartitions
res6: Int = 4
The repartition() method can either increase or decrease the number of partitions of a DataFrame, because it performs a full data shuffle each time. Take caution when repartitioning a very large DataFrame, because the shuffle can take a long time to complete. repartition() returns a new DataFrame according to the partition expression supplied in its parameters. It has two optional parameters that can be used together or separately, for a total of three separate configurations. The first parameter is an integer: the target number of partitions you would like to increase or decrease to. Here are examples of increasing and decreasing the number of partitions.
ten_parts_df = census_df.repartition(10)
ten_parts_df.rdd.getNumPartitions()

val three_parts_df = census_df.repartition(3)
three_parts_df.rdd.getNumPartitions
res7: Int = 3
The second option is to pass one or more column names to the repartition() method. It is important to note that when partitioning the data of a DataFrame by a column or columns, Spark will create a minimum of 200 partitions. Even if a DataFrame already had only 100 partitions and we repartitioned it by a column, Spark would still create 200 partitions, and some number of them would be completely empty of data. The 200-partition value is controlled by the configuration “spark.sql.shuffle.partitions”. We can verify this value by executing the following Spark command in PySpark and Spark Scala:
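A sketch, assuming the active SparkSession is available as the usual spark variable (the Spark Scala call is the same):

```python
# Read the shuffle-partition setting from the active SparkSession;
# the value comes back as a string, "200" by default
spark.conf.get("spark.sql.shuffle.partitions")
```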
Let’s show some real examples of partitioning by column. The repartition() method will accept column names as strings or as column objects.
state_part_df = census_df.repartition("us_state")
state_part_df.rdd.getNumPartitions()
Using Spark Scala to partition by two columns:
val state_county_part = census_df.repartition(census_df("us_state"), census_df("county_name"))
state_county_part.rdd.getNumPartitions
Even though there are only fifty-one distinct state values (the fifty states plus the District of Columbia), the output is still “200”:
res8: Int = 200
The last option is to combine the previous two options and specify the number of partitions along with a column.
state_50_parts = census_df.repartition(50, "us_state")
state_50_parts.rdd.getNumPartitions()
After setting the number of DataFrame partitions, another way to verify the partition count is to write the DataFrame to HDFS: the number of files written to HDFS will equal the number of DataFrame partitions, for both the coalesce() and repartition() methods.
parts_to_files = census_df.repartition(10)
print(parts_to_files.rdd.getNumPartitions())
parts_to_files.write.format("avro").mode("overwrite").save("hdfs://…/spark_output/parts_to_files")
And if we list out the files in HDFS at /spark_output/parts_to_files, we see something similar to the following:
spark_output/parts_to_files/part-00000-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113746-1-c000.avro
spark_output/parts_to_files/part-00001-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113747-1-c000.avro
spark_output/parts_to_files/part-00002-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113748-1-c000.avro
spark_output/parts_to_files/part-00003-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113749-1-c000.avro
spark_output/parts_to_files/part-00004-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113750-1-c000.avro
spark_output/parts_to_files/part-00005-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113751-1-c000.avro
spark_output/parts_to_files/part-00006-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113752-1-c000.avro
spark_output/parts_to_files/part-00007-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113753-1-c000.avro
spark_output/parts_to_files/part-00008-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113754-1-c000.avro
spark_output/parts_to_files/part-00009-tid-164469901362115015-a3cd28ec-a8af-4f6c-80f4-f7d3406a6024-4113755-1-c000.avro
Partitioning is completely different from the partitions of a DataFrame or repartitioning those partitions. Partitioning is the physical writing of data as files in HDFS, organized into subdirectories corresponding to a column or columns in the data. It is mainly used to improve query performance by distributing the data horizontally, assuming that the partitioning scheme corresponds to how users will likely filter the data.
The image below (taken from a great partition article) is a great visual representation of the structure of partitioning. In this hypothetical example, the data has two levels of partitioning: by year and by month. Each column of blue folders corresponds to an actual column in the data: “date_year” and “date_month”. Each unique value in the “date_year” and “date_month” columns (2016, etc., or 03, etc.) gets its own directory in HDFS. All of the data files reside in the last partition directory.
Partitioning is the physical writing of data as files in HDFS organized into subdirectories corresponding to a column or columns in the data
Partitioning uses the partitionBy() method when writing a DataFrame to HDFS. The partitionBy() method can be used in conjunction with the save() method we introduced at the beginning of this section. Let’s look at an example.
(census_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("us_state")
    .save("hdfs://…/spark_output/part_by_state"))
The partitionBy() method takes only one or more column names as strings. As stated earlier, partitioning literally alters the layout of the files in HDFS by writing the data into subdirectories based upon a column. So in the previous example, we saved the data in a directory named part_by_state, which is located inside another directory called spark_output. If we were writing data to HDFS without partitioning, the data files (Parquet, ORC, etc.) would be saved directly inside the part_by_state directory. When partitioning, however, the part_by_state directory holds subdirectories corresponding to the column name or names provided to the partitionBy() method. So in our example, the contents of the directory part_by_state would be fifty-one directories: one for each of the fifty U.S. states plus the District of Columbia. Here are the first eleven directories (which are partitions) in the part_by_state directory:
us_state=Alabama
us_state=Alaska
us_state=Arizona
us_state=Arkansas
us_state=California
us_state=Colorado
us_state=Connecticut
us_state=Delaware
us_state=District of Columbia
us_state=Florida
us_state=Georgia
…
So the actual directory names are the strings “us_state=Alabama”, “us_state=California”, and so on. The string to the left of the equals sign is the column name, “us_state”, and the string to the right is a distinct value of that column. Within each state directory are a file or files of the specified format containing the rows of data where the column equals that particular partition value. In our case, the partition “us_state=Florida” holds all of the rows of the census_df DataFrame where the “us_state” column equals “Florida”. This is what it means to partition data.
We can partition by multiple columns by passing multiple column-name strings, separated by commas, to the partitionBy() method. Here is a double-partitioned example in Spark Scala.
census_df.write
    .format("orc")
    .mode("overwrite")
    .partitionBy("us_state", "county_name")
    .save("hdfs://…/spark_output/part_by_state_orc")
When partitioning by multiple columns, each subsequent partition directory is nested inside the previous one. So in this example, the directory part_by_state_orc contains all fifty-one state partitions. Each state partition directory in turn holds a county partition for every distinct “county_name” value in that state. The output is a nested directory structure that can continue for as long as there are additional columns. The directory structure would look like the example below, with the actual files residing in each of the “county_name” directories inside each state directory.
us_state=Alabama/county_name=Autauga County
us_state=Alabama/county_name=Baldwin County
us_state=Alabama/county_name=Barbour County
us_state=Alabama/county_name=Bibb County
us_state=Alabama/county_name=Blount County
us_state=Alabama/county_name=Bullock County
us_state=Alabama/county_name=Butler County
us_state=Alabama/county_name=Calhoun County
us_state=Alabama/county_name=Chambers County
us_state=Alabama/county_name=Cherokee County
…
Partitioning is best used on really big data; small data (like our census data of 1.2 million rows) really doesn’t need to be partitioned in HDFS. Partition columns should be categorical or discrete columns, because those columns have a finite and relatively low number of distinct values. The term for this is cardinality: the number of distinct values in a group. In our case, the group is the column we would like to partition on. It is best for partition columns to have low cardinality, because a very large number of partition values would require so much time scanning the partitions that it counteracts the benefits. A good rule of thumb is to select a partition column with no more than ten thousand distinct values. Overall, partitioning is a great strategy for optimizing the physical layout of your data that can drastically increase query speeds, because it allows Spark to skip entire sections of your data.
In our example above, choosing US State and/or County as partition columns is a good choice because of their low cardinality, and because the data is roughly evenly distributed between the values. Likewise, if you have retail data, it is a good idea to partition it by a date string column, for example “YYYY-MM-DD”: retail transactions are naturally split up by date, so each new day gets its own partition, and since there are only 365 partitions in a year the cardinality is low. If you had worldwide retail sales, it might be a good idea to add a second partition by country. Another important consideration is to avoid partition columns that would result in small amounts of data inside each partition. This is called “the small files problem”. A file is “small” when it is smaller than the HDFS block size, and a lot of small files is an even bigger problem because every file, directory, and block has to be represented in the cluster’s NameNode. It takes a lot of time and processing power to scan an extremely large number of small files; Hadoop was designed to read large files. Furthermore, in Hadoop files are distributed, meaning they are split up into smaller pieces. If the data files are small to begin with, Hadoop will take an incredibly long time scanning the data.
An example of the small files problem would be partitioning your data by hour when each hour only holds 5 MB of data. Two years of data would be almost 20,000 partitions (2 × 365 × 24 = 17,520). If a user performed a query over two years of data, Spark would have to read all of these partitions just to process a small amount of data. It would be better to pick a partition column with a lower cardinality in this case. Regardless, just remember: good partition columns are columns that naturally follow users’ queries and have low cardinality.