How to use Predicate Subqueries in Spark SQL (i.e. subqueries in where clause)

Spark SQL supports an incredibly useful feature: predicate subqueries. Documentation on the Databricks website defines them as:

Predicate subqueries are predicates in which the operand is a subquery. Spark 2.0 supports both the EXISTS and IN based forms. Spark 2.0 currently only supports predicate subqueries in WHERE clauses.

That means you can conveniently filter rows in your main query using contents from another query. This can be incredibly helpful in situations where you need to only select rows from a table where matching contents exist in another table.

Predicate subqueries might be the solution if, say, you need to supply an otherwise massive list to an IN statement in a WHERE clause.

A good example is selecting web traffic from a click-stream table only for users defined in another table. You might want to see rows related only to certain users, but your list of users (stored in another table) is dozens, hundreds or even millions of entries long.

That is where Predicate Subqueries in Spark shine. Take for example this query below that uses predicate subquerying to filter users in clickstream based on the users found in another table, interesting_users.

select count(1)
from some_db.clickstream
where user in (select distinct user from some_db.interesting_users)

Note: Using select * for your base table might cause errors (I’ve only seen it in Zeppelin, but your results may vary), and you might need to explicitly select the columns you need. A small price to pay for this level of convenience.
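The definition quoted earlier also mentions an EXISTS form. As a rough sketch of how the two forms relate, the snippet below runs both against tiny in-memory tables. It uses Python's built-in sqlite3 module rather than Spark (an assumption made purely so the example runs without a cluster), and the table names, column names and rows are all hypothetical. The subquery shapes are standard SQL, so they should carry over to spark.sql() with little or no change.

```python
import sqlite3

# Stand-in for Spark: an in-memory SQLite database (assumption, for illustration only).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table clickstream (user_id text, url text);
    create table interesting_users (user_id text);
    insert into clickstream values
        ('alice', '/home'), ('bob', '/home'), ('alice', '/cart'), ('carol', '/faq');
    insert into interesting_users values ('alice'), ('carol'), ('dave');
""")

# IN form: keep clickstream rows whose user_id appears in the subquery result.
in_count = conn.execute("""
    select count(1)
    from clickstream
    where user_id in (select distinct user_id from interesting_users)
""").fetchone()[0]

# EXISTS form: a correlated subquery that checks for a matching row.
exists_count = conn.execute("""
    select count(1)
    from clickstream c
    where exists (select 1 from interesting_users u where u.user_id = c.user_id)
""").fetchone()[0]

print(in_count, exists_count)
```

Both queries count the same rows here. Which form reads better is mostly a matter of taste; the IN form is usually the more natural replacement for a long hard-coded list.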

Source: Databricks Blog

How to Name Cached DataFrames and SQL Views in Spark

For a while now, it’s been possible to give custom names to RDDs in Spark. One result of this is a convenient name in the Storage tab of the Spark Web UI. It looks like this:

val my_rdd = sc.parallelize(List(1,2,3))
my_rdd.setName("Some Numbers")
my_rdd.cache()

// running an action like .count() will fully materialize the rdd
my_rdd.count() 

Note: You could use an action like take or show, instead of count. But be careful.

With a cached RDD, .take() (or .show() with a DataFrame) will cache only the part of the data that was actually read (remember, Spark evaluates lazily and won’t do work until it has to).

The implication being that you might think your entire set is cached when doing one of those actions, but unless your data will be 100% shown in said action, it will not be 100% cached.
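To make that concrete, here is a toy model in plain Python. It is not Spark and does not reflect Spark's internals; ToyCachedDataset and its methods are hypothetical names, and the only behavior it imitates is "a partition is cached when an action touches it".

```python
class ToyCachedDataset:
    """Toy model of a lazily evaluated, cached, partitioned dataset (not Spark itself)."""

    def __init__(self, partitions):
        self._partitions = partitions   # list of lists, one per partition
        self.cached = {}                # partition index -> materialized rows

    def _compute(self, index):
        # A partition is only materialized (and cached) when an action touches it.
        if index not in self.cached:
            self.cached[index] = list(self._partitions[index])
        return self.cached[index]

    def take(self, n):
        # Stop reading partitions as soon as n rows have been collected.
        rows = []
        for index in range(len(self._partitions)):
            if len(rows) >= n:
                break
            rows.extend(self._compute(index))
        return rows[:n]

    def count(self):
        # Touches every partition, so everything ends up cached.
        return sum(len(self._compute(i)) for i in range(len(self._partitions)))


data = ToyCachedDataset([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

data.take(2)
cached_after_take = len(data.cached)    # only the first partition was touched

data.count()
cached_after_count = len(data.cached)   # all four partitions are now cached

print(cached_after_take, cached_after_count)
```

In this model, take(2) leaves three of the four partitions uncached, which is the situation the note above warns about.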

But DataFrames have not been given the same clear route to naming cached data, although the community has attempted and requested it.

With the approach below, though, you can start naming your DataFrames all you want. It’s very handy.

How to Nickname a DataFrame and Cache It

This method requires a few steps:

  • Create a DataFrame
  • If it needs to be repartitioned (due to skew), do that immediately
  • Create a SQL View
  • Cache using SQL Context (not precisely the same as df.cache() or df.persist(), as we’ll see)

PySpark

from pyspark.sql import Row

# Some arguments
# View names follow SQL naming rules, so no spaces or punctuation
tbl_name = "some_numbers_df"
do_cache = True
num_partitions = 50

# Create an RDD
rdd = sc.parallelize(
    [Row(1),
     Row(2),
     Row(3)])

# Create a DataFrame
df = spark.createDataFrame(rdd, ['number'])

# Optional: Repartition as desired for better performance
if(num_partitions):
    df = df.repartition(num_partitions)
    
# Create a SQL View from that DataFrame
df.createOrReplaceTempView(tbl_name)

# Cache the table using sqlContext
if(do_cache):
    sqlContext.cacheTable(tbl_name)

# Run a .count() to materialize the df in cache
df.count()

Now, your DataFrame has been named as specified in the Storage tab of the Spark Web UI. It’s worth noting that the allowances for a DataFrame nickname are more restricted (namely, no spaces are allowed) compared to RDDs. This is for good reason: it follows the SQL convention for table names – and those can’t have spaces, can they?

Plus, cached DF tables will be prefixed with an “In-memory table” designation.
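Since view names follow SQL naming rules, a descriptive label may need cleaning up before it is passed to createOrReplaceTempView(). Here is a minimal sketch of one way to do that. to_view_name is a hypothetical helper, and the rule it applies (lowercase letters, digits and underscores only) is a conservative assumption, not Spark's exact identifier grammar.

```python
import re

def to_view_name(label):
    """Turn a free-form label into a name made of lowercase letters, digits and underscores."""
    # Replace every run of other characters with a single underscore.
    name = re.sub(r"[^0-9a-zA-Z]+", "_", label).strip("_").lower()
    if not name:
        raise ValueError("label contains no usable characters")
    return name

view_name = to_view_name("Some Numbers (in a DataFrame)")
print(view_name)
```

The result could then be used as the view name, e.g. df.createOrReplaceTempView(view_name).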

So there you have it: a way to name your cached DataFrames as table views in Spark SQL. This has the additional benefit of making your Spark SQL queries easier to read and comprehend.

Always a good thing.

How to See Record Count Per Partition in a Spark DataFrame (i.e. Find Skew)

One of our greatest enemies in big data processing is skew (i.e. unevenly distributed data). This manifests itself in subtle ways, such as 99 out of 100 tasks finishing quickly, while 1 lone task takes forever to complete (or worse: never does).

Skew is largely inevitable in this line of work, and we have 2 choices:

  • Ignore it, and live with the slowdown
  • Try to find the source of the skew, and mitigate it

Ignoring issues caused by skew can be worth it sometimes, especially if the skew is not too severe, or isn’t worth the time spent for the performance gained. This is particularly true with one-off or ad-hoc analysis that isn’t likely to be repeated, and simply needs to get done.

However, the rest of the time, we need to find out where the skew is occurring, and take steps to dissolve it and get back to processing our big data. This post will show you one way to help find the source of skew in a Spark DataFrame. It won’t delve into the handful of ways to mitigate it, such as repartitioning, distributing/clustering or isolation (our new book will), but it will certainly help pinpoint where the issue may be.

Introducing… Spark Partition ID

There is a built-in function of Spark that allows you to reference the numeric ID of each partition, and perform operations against it. In our case, we’d like the .count() for each Partition ID.

By doing a simple count grouped by partition id, and optionally sorted from smallest to largest, we can see the distribution of our data across partitions. This will help us determine if our dataset is skewed.

Python / PySpark

from pyspark.sql.functions import spark_partition_id, asc, desc
df\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .show()
+-----------+-----+
|partitionId|count|
+-----------+-----+
|         21|86640|
|          4|86716|
|         19|86729|
|         13|86790|
|         31|86911|
|         25|86927|
|         24|86978|
|         15|87044|
|         10|87085|
|         18|87088|
|         17|87105|
|         22|87236|
|          5|87287|
|         29|87313|
|          2|87331|
|          8|87363|
|          1|87401|
|         16|87424|
|          9|87457|
|         14|87468|
+-----------+-----+
only showing top 20 rows

Scala / Spark

import org.apache.spark.sql.functions.{spark_partition_id, asc, desc}

df
    .groupBy(spark_partition_id)
    .count()
    .orderBy(asc("count"))
    .show()
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                  21|86640|
|                   4|86716|
|                  19|86729|
|                  13|86790|
|                  31|86911|
|                  25|86927|
|                  24|86978|
|                  15|87044|
|                  10|87085|
|                  18|87088|
|                  17|87105|
|                  22|87236|
|                   5|87287|
|                  29|87313|
|                   2|87331|
|                   8|87363|
|                   1|87401|
|                  16|87424|
|                   9|87457|
|                  14|87468|
+--------------------+-----+
only showing top 20 rows

Spark SQL

First, create a version of your DataFrame with the Partition ID added as a field. You can do this in any supported language. Here it is in Scala:

import org.apache.spark.sql.functions.spark_partition_id

val df_with_id = df.withColumn("partitionId", spark_partition_id())
df_with_id.createOrReplaceTempView("df_with_id")

Then, simply execute similar logic as above using Spark SQL (a %sql block in Zeppelin/Qubole, or spark.sql() in any supported language):

select partitionId, count(1) as num_records
from df_with_id
group by partitionId
order by num_records asc

As you can see, the partitions of our Spark DataFrame are nice and evenly distributed. No outliers here!
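If you would rather put a number on "evenly distributed", one simple option is to compare the largest partition to the average. The sketch below is plain Python; skew_ratio is a hypothetical helper, and the counts are copied from the 20 rows shown above. Because that output was sorted ascending and truncated to 20 rows, it only covers the partitions displayed; for a real check you would collect every row of the grouped result.

```python
def skew_ratio(counts):
    """Ratio of the largest partition to the mean partition size (1.0 = perfectly even)."""
    sizes = list(counts.values())
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean

# Record counts per partition ID, copied from the 20 rows shown above.
partition_counts = {
    21: 86640, 4: 86716, 19: 86729, 13: 86790, 31: 86911,
    25: 86927, 24: 86978, 15: 87044, 10: 87085, 18: 87088,
    17: 87105, 22: 87236, 5: 87287, 29: 87313, 2: 87331,
    8: 87363, 1: 87401, 16: 87424, 9: 87457, 14: 87468,
}

ratio = skew_ratio(partition_counts)
print(f"largest partition is {ratio:.3f}x the mean")
```

A ratio close to 1 means the partitions are balanced. What counts as "too skewed" is a judgment call that depends on your job.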

Let us know if you have any other tricks in the comments!

Big special thanks to this StackOverflow discussion for pointing me in the right direction!

How to Load Data from Cassandra into Hadoop using Spark

Let’s look at one way to get Cassandra data into Hadoop.

Cassandra is a great open-source solution for accessing data at web scale, thanks in no small part to its low-latency performance. And if you’re a power user of Cassandra, there’s a high probability you’ll want to analyze the data it contains to create reports, apply machine learning, or just do some good old fashioned digging.

However, Cassandra can prove difficult to use as an analytical warehouse, especially if you’re using it to serve data in production around the clock. But one approach you can take is quite simple: copy the data to Hadoop (HDFS).


How to Control File Count, Reducers and Partitions in Spark and Spark SQL

Hopefully after this you’ll get why I chose this image of unevenly packed boxes for a post about Spark partitioning.

After years of working with engineers, analysts, data scientists and general users of big data technology, I have learned a constant: people want to control the number and size of files their job or query will output… and usually for good enough reasons:

  • they wish to share the data externally or load it into a tool (and too many files becomes a headache)
  • they’re encouraged to optimize the output of a job (which is perhaps creating many small files or not utilizing an appropriate block size)

Whatever the case may be, the desire to control the number of files for a job or query is reasonable – within, ahem, reason – and in general is not too complicated. And, it’s often a very beneficial idea.

However, a thorough understanding of distributed computing paradigms like Map-Reduce (a paradigm Apache Spark follows and builds upon) can help you understand how files are created by parallelized processes. More importantly, one can learn the benefits and consequences of manipulating that behavior, and how to do so properly, or at least without degrading performance.

Controlling Initial Partition Count in Spark for an RDD

It’s actually really simple. If you’re reading a source and you want to convey the number of partitions you’d like the resulting RDD to have, you can simply include it as an argument (Spark treats it as a minimum, so you may end up with more):

val rdd = sc.textFile("file.txt", 5)

I imagine most of you know that trick, and are looking more for how to control the final output of a job, and the number of files it will result in.

So let’s do that!

Controlling Reducer / File Count in Spark

Option 1: spark.default.parallelism

In the Execution Behavior section of the Apache Spark docs, you will find a setting called spark.default.parallelism. It’s also scattered across Stack Overflow threads, sometimes as the appropriate answer and sometimes not.

It controls, according to the documentation, the…

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

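There is no fixed default. For shuffle operations like join and reduceByKey, it is the largest number of partitions in a parent RDD; for operations like parallelize, it depends on the cluster manager (typically the total number of cores across the executors).

To put it another way, this value controls the number of partitions an RDD (Resilient Distributed Dataset) will have when it is created by transformations. Recall that in Spark there are two key abstractions: transformations (logic that lazily produces new RDDs from existing RDDs) and actions (which force transformations to execute and render the data).

When working with Spark Core and RDDs, this setting will allow you to control the number of partitions an RDD will have after a reduce operation that isn’t given an explicit partition count. (If you’ve noticed how commonly you end up with Spark stages that are exactly 200 tasks, that number comes from spark.sql.shuffle.partitions, covered in Option 3 below.)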

You can alter this setting as you please. In doing so, you will instruct Spark to use this value as the partition count for any RDDs that are created from transformations, or more specifically in our case, functions that reduce (grouping, joining, etc). What this will not necessarily control is the partition count of RDDs that are created by reading a data source. That is most often a factor of the parallelization of the read, which is influenced by file/block counts (at least in HDFS). So don’t be surprised if it doesn’t change the partition count of an RDD created from reading some data (at least not initially; you can create a reduce step that will, however, conform to this setting).

So, if you find that the last step in your job/query is creating, say, 200 files, you can alter this setting to make Spark use n partitions and therefore output n files. However, you must be wary of skew that might be present in your job, as well as overall data size. As you alter parallelism, especially when decreasing it, you run the risk of losing performance and defeating the purpose of distributed computing. If you drive this setting to 1 inappropriately, you’ll have quite the slow pipeline on your hands, and will likely run into OOMs (out of memory errors) from pushing all your data to one host.

TL;DR: spark.default.parallelism is useful for controlling the parallelism of RDDs created by transformations, which are things like joins and groupBys.

Option 2: repartition() or coalesce()

You can also use the built-in transformations repartition() and coalesce(). They achieve similar results but take very different approaches.

repartition() will shuffle data across nodes to achieve as even a balance in terms of data size as it can. By default, data is not shuffled by any particular value; it’s simply redistributed until a relative balance is achieved. You can repartition(n) to any count of partitions you prefer, increasing or decreasing. You can also repartition on a column if you like.

coalesce() will allow you to decrease the number of partitions for your data but, unlike repartition(), not increase it. Generally speaking, it does far less data movement than repartition().

The main difference between the two: repartition() does a full shuffle of the data and creates relatively equal sized partitions across hosts. coalesce() on the other hand combines existing partitions to try and avoid a significant shuffle.

A popular attempt to output one file from a job or query is to use .coalesce(1) on your data. It might not scale appropriately in all scenarios, but it certainly can in some! Use with caution.
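The difference is easier to see with a toy model. The sketch below is plain Python, not Spark: toy_coalesce and toy_repartition are hypothetical functions that only imitate the general idea (merging neighbouring partitions versus dealing records out evenly), under the assumption that no data locality is involved.

```python
def toy_coalesce(partitions, n):
    """Merge neighbouring partitions into n groups without moving individual records."""
    n = min(n, len(partitions))  # like coalesce(), never increase the partition count
    total = len(partitions)
    merged = []
    for i in range(n):
        start, end = i * total // n, (i + 1) * total // n
        merged.append([row for part in partitions[start:end] for row in part])
    return merged

def toy_repartition(partitions, n):
    """Deal every record out round-robin: a full shuffle that evens out sizes."""
    shuffled = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for position, row in enumerate(rows):
        shuffled[position % n].append(row)
    return shuffled

# Four partitions with uneven sizes: 97, 1, 1 and 1 records.
skewed = [list(range(97)), [97], [98], [99]]

coalesced_sizes = [len(p) for p in toy_coalesce(skewed, 2)]
repartitioned_sizes = [len(p) for p in toy_repartition(skewed, 2)]

print(coalesced_sizes, repartitioned_sizes)
```

In this model the coalesced result keeps the imbalance it started with, while the repartitioned result is even. That is the trade-off: coalesce is cheaper but inherits whatever skew already exists.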

Option 3: spark.sql.shuffle.partitions

spark.sql.shuffle.partitions is a helpful but lesser known configuration. It is very similar to spark.default.parallelism, but applies to SparkSQL (Dataframes and Datasets) instead of Spark Core’s original RDDs.

Its definition:

Configures the number of partitions to use when shuffling data for joins or aggregations.

Default is 200.

So while you can control the partition count of RDDs made from reduce operations using spark.default.parallelism, that setting doesn’t apply for Dataframes and Datasets (which use the SparkSQL API). For those, you’ll need to use spark.sql.shuffle.partitions.

Keep in mind that this will not change the default partition count for any old Dataframe or Dataset. It will only change the default partition count for Dataframes and Datasets that are the result of reduce computations: like joins and aggregations.

So in short, you must be invoking an operation that uses a shuffle to take advantage of this editable value.
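Both settings can be supplied when the job is launched, using spark-submit's --conf flag. As a small sketch, the helper below just assembles that command line as a list of arguments. spark_submit_command is a hypothetical helper, my_job.py is a placeholder application, and the value 50 is arbitrary.

```python
def spark_submit_command(app_path, rdd_parallelism=None, sql_shuffle_partitions=None):
    """Build a spark-submit argument list with optional partition-count settings."""
    command = ["spark-submit"]
    if rdd_parallelism is not None:
        # Applies to RDD shuffles (join, reduceByKey, ...) with no explicit partition count.
        command += ["--conf", f"spark.default.parallelism={rdd_parallelism}"]
    if sql_shuffle_partitions is not None:
        # Applies to DataFrame/Dataset shuffles (joins, aggregations).
        command += ["--conf", f"spark.sql.shuffle.partitions={sql_shuffle_partitions}"]
    command.append(app_path)
    return command

command = spark_submit_command("my_job.py", rdd_parallelism=50, sql_shuffle_partitions=50)
print(" ".join(command))
```

spark.sql.shuffle.partitions can also be changed inside a running session with spark.conf.set(), which is handy when only one query needs a different value.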

Understanding the Output of your Job or Query

Because you are using a distributed computing framework to process your big data, any n number of hosts can take part in computation as well as output. But the general rule is:

You will have n files matching the n partitions that hold data at the time the write is requested. The tasks processing those partitions each write the data they have, individually and in parallel.

Knowing that, it makes sense that the number of files fluctuates with the number of final partitions (usually the output of the reducers) holding data at the end.

Example: Basic Spark App (no reduce function)

Say this app reads data into Spark from somewhere and writes it somewhere else. Pretty simple. Your output, at least on distributed storage like HDFS, will be made of n files corresponding to the n partitions of your RDD, Dataframe or Dataset. The number of partitions in that initial object will be determined by either:

  • the number of source files being read (because one task will be assigned to each file / block and will create one partition in the resulting object)
  • telling spark explicitly how many partitions you want in that initial object (see Controlling Initial Partition Count in Spark for an RDD at the top of this article for a how-to)

The simplest options you have in a job like this are to:

  • change how many underlying files are in the source data (not the easiest)
  • tell Spark how many partitions you want before the read occurs (and since there are no reduce operations, the partition count will remain the same)
  • use repartition or coalesce to manually alter the partition count of the consumed data before the write occurs

Using one of the above options, you’ll be able to control the number of files in your output.

Example: Basic Spark App (w/ reduce function)

Say this app reads data into Spark from somewhere and writes it somewhere else. But before it writes, it does a grouping operation (sum, max, etc). Your initial object will, just like in the previous scenario, have a partition count defined by either you, the method of consumption, or the data itself.

When the grouping (a transformation) occurs, the resulting data will have partitions defined by something like spark.default.parallelism or spark.sql.shuffle.partitions, or some other baked-in methodology. Because you’re reducing, you now also have access to the options explained earlier to control the partition count, and therefore the file count, of the written data.

Again, those options are repartition or coalesce, or one of the aforementioned partition parallelization settings.

In Conclusion

Sometimes it makes sense to control and define the output of your job. Sometimes it doesn’t. Whatever the reason, always remember the golden rules of distributed storage (especially in the case of Hadoop and HDFS):

  • avoid small files when possible
  • especially avoid them in large quantities (namenode headaches)
  • aim for larger files in smaller quantities (try to match your block size)
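One rough way to act on these rules is to derive a partition count from the data size and a target file size. The sketch below is plain Python; target_partition_count is a hypothetical helper, and the 128 MiB default is an assumption based on the common HDFS block size, so substitute your cluster's actual value.

```python
import math

def target_partition_count(total_bytes, target_file_bytes=128 * 1024 ** 2):
    """Number of partitions needed so each output file is roughly target_file_bytes."""
    if total_bytes <= 0:
        raise ValueError("total_bytes must be positive")
    return max(1, math.ceil(total_bytes / target_file_bytes))

ten_gib = 10 * 1024 ** 3
partitions = target_partition_count(ten_gib)
print(partitions)  # 80 files of about 128 MiB each
```

The result can be handed to repartition(n) or coalesce(n) before the write. Keep in mind that the size on disk after compression can differ a lot from the in-memory size, so treat the number as a starting point.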

But if you need to control the output for a specific reason: now you know how. 🙂

Our Spark + AI Summit 2019 Talks are Now Available Online

At the Spark + AI Summit 2019 (put on by the lovely folks at Databricks), fellow Hadoopsters Jack Chapa and Ben Storrie and I were lucky enough to have two featured talks/presentations:

Both of those took place on the final day of the summit, and were recorded for your viewing pleasure. Those presentations are now available on YouTube and the Databricks site. They are linked above and below, along with the slides (if you wish to view/download those for reference).

Additionally, all code gists featured in the presentations can be viewed on the Hadoopsters Github:

We are Speaking at Spark + AI Summit 2019!

Hi everyone, Landon here with some exciting news!

I, alongside my colleagues at SpotX (Ben Storrie and Jack Chapa), will be presenting at Spark + AI Summit 2019. It’s an exciting opportunity to share what we’ve learned over the last year or so, particularly in the realm of Spark Streaming applications.

We will be presenting two different sessions:

Headaches and Breakthroughs in Building Continuous Applications

At SpotX, we have built and maintained a portfolio of Spark Streaming applications — all of which process records in the millions per minute. From pure data ingestion, to ETL, to real-time reporting, to live customer-facing products and features, continuous applications are in our DNA. Come along with us as we outline our journey from square one to present in the world of Spark Streaming. We’ll detail what we’ve learned about efficient processing and monitoring, reliability and stability, and long term support of a streaming app. Come learn from our mistakes, and leave with some handy settings and designs you can implement in your own streaming apps.

  • Speakers: Landon Robinson & Jack Chapa
  • Topic: Streaming
  • Time: 11:00am on Thursday, April 25th
  • Room: 2007

Apache Spark Listeners: A Crash Course in Fast, Easy Monitoring

The Spark Listener interface provides a fast, simple and efficient route to monitoring and observing your Spark application – and you can start using it in minutes. In this talk, we’ll introduce the Spark Listener interfaces available in core and streaming applications, and show a few ways in which they’ve changed our world for the better at SpotX. If you’re looking for a “Eureka!” moment in monitoring or tracking of your Spark apps, look no further than Spark Listeners and this talk!

  • Speakers: Landon Robinson & Ben Storrie
  • Topic: Developer
  • Time: 5:30pm on Thursday, April 25th
  • Room: 3016

When I wrote the abstract submissions for this year’s summit, I had little expectation of either of them being accepted, let alone both. So for my part I’m proud to have the opportunity to share the exciting developments we’ve celebrated at SpotX with the wider Big Data community.

For those of you that have been reading this website for a time, you know that my biggest goal has been to democratize the spread of helpful, technical big data knowledge and breakthroughs to engineers around the world. Having these sessions at Spark + AI Summit 2019, and shared freely on YouTube afterward, is a huge step in that pursuit. I couldn’t be prouder.

If you’ll be there, definitely drop by the sessions! I’d love to meet you. Feel free to reach out to me on LinkedIn and we can sync up.

-Landon

How to Join Static Data with Streaming Data (DStream) in Spark

Today we’ll briefly showcase how to join a static dataset in Spark with a streaming “live” dataset, otherwise known as a DStream. This is helpful in a number of scenarios: like when you have a live stream of data from Kafka (or RabbitMQ, Flink, etc) that you want to join with tabular data you queried from a database (or a Hive table, or a file, etc), or anything you can normally consume into Spark.

How to Write ORC Files and Hive Partitions in Spark


ORC, or Optimized Row Columnar, is a popular big data file storage format. Its rise in popularity is due to it being highly performant, very compressible, and progressively more supported by top-level Apache products, like Hive, Crunch, Cascading, Spark, and more.

I recently wanted/needed to write ORC files from my Spark pipelines, and found specific documentation lacking. So, here’s a way to do it.