Sunday, 31 May 2015

Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle

This post is reproduced from the Cloudera blog; thanks to Ilya Ganelin.
Our thanks to Ilya Ganelin, Senior Data Engineer at Capital One Labs, for the guest post below about his hard-earned lessons from using Spark.
I started using Apache Spark in late 2014, learning it at the same time as I learned Scala, so I had to wrap my head around the various complexities of a new language as well as a new computational framework. This process was a great in-depth introduction to the world of Big Data (I previously worked as an electrical engineer for Boeing), and I very quickly found myself deep in the guts of Spark. The hands-on experience paid off; I now feel extremely comfortable with Spark as my go-to tool for a wide variety of data analytics tasks, but my journey here was no cakewalk.
Capital One’s original use case for Spark was to surface product recommendations for a set of 25 million users and 10 million products, one of the largest datasets available for this type of modeling. Moreover, we had the goal of considering all possible products, specifically to capture the “long tail” (AKA less frequently purchased items). That problem is even harder, since to generate all possible pairings of user and products, you’d have 250e12 combinations—which is more data than you can store in memory, or even on disk. Not only were we ingesting more data than Spark could readily handle, but we also had to step away from the standard use case for Spark (batch processing with RDDs) and actually decompose the process of generating recommendations into an iterative operation.
Learning how to properly configure Spark, and use its powerful API in a way that didn’t cause things to break, taught me a lot about its internals. I also learned that at the time, there really wasn’t a consolidated resource that explained how those pieces fit together. The end goal of Spark is to abstract away those internals so the end-user doesn’t have to worry about them, but at the time I was using it (and to some degree today), to write efficient and functional Spark code you need to know what’s going on under the hood. This blog post is intended to reveal just that: to teach the curious reader what’s happening, and to highlight some simple and tangible lessons for writing better Spark programs.
Note: this post is not intended as a ground-zero introduction. Rather, the reader should have some familiarity with Spark’s execution model and the basics of Spark’s API.

The Pieces
First, let’s review the key players in the Spark infrastructure. I won’t touch on all the components, but there are a few basics that I’ll cover. These are partitioning, caching, serialization, and the shuffle operation.
Partitions
Spark’s basic abstraction is the Resilient Distributed Dataset, or RDD. The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that under the hood, you’re dealing with fragmented data. That fragmentation is what enables Spark to execute in parallel, and the level of fragmentation is a function of the number of partitions of your RDD. The number of partitions is important because each task in a stage operates on one partition at a time (and loads the data in that partition into memory). Consequently, if you have fewer partitions than available executor cores, you will wind up under-utilizing your cluster. Furthermore, since with fewer partitions there’s more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, your performance may degrade as you take a greater hit from network and disk I/O. Ultimately this concept ties into Spark’s notion of parallelism and how you can tune it (see the discussion of tuning parallelism here) to optimize performance.
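As an illustration, here is a minimal sketch of controlling partition counts explicitly with the RDD API (the counts of 200, 1000, and 100 are placeholders, not recommendations):
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("partition-demo"))

    // Ask for 200 partitions up front when creating the RDD.
    val nums = sc.parallelize(1 to 1000000, numSlices = 200)
    println(nums.partitions.length)     // inspect the current partition count

    // Increase the partition count (this triggers a shuffle) ...
    val wider = nums.repartition(1000)
    // ... or reduce it without a full shuffle.
    val narrower = wider.coalesce(100)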
Caching
Spark is a big deal for two main reasons: first, as already mentioned, it has a really simple and useful API for complex processes. The other big thing is that unlike your standard MapReduce program, Spark lets you cache intermediate results in memory. Caching can dramatically improve performance if you have data structures that are used frequently, such as a lookup table or a matrix of scores in a machine-learning algorithm. Caching can also introduce problems since it will often require huge chunks of memory; tuning this process is its own challenge, but doing so can increase performance by several orders of magnitude.
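For example, here is a minimal caching sketch (the HDFS path and data layout are hypothetical); the RDD is persisted because two separate actions reuse it:
    import org.apache.spark.storage.StorageLevel

    // (userId, score) pairs parsed from a hypothetical comma-separated file.
    val scores = sc.textFile("hdfs:///data/scores").map { line =>
      val Array(user, score) = line.split(",")
      (user, score.toDouble)
    }

    // Keep it in memory, spilling to disk if it does not fit, since we reuse it.
    scores.persist(StorageLevel.MEMORY_AND_DISK)

    val total  = scores.values.sum()   // first action materializes the cache
    val maxVal = scores.values.max()   // second action reuses the cached data

    scores.unpersist()                 // release the memory when finished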
Serialization
In distributed computing, you generally want to avoid writing data back and forth because it’s expensive. Instead, the common paradigm is to bring your code to your data. This is why many frameworks are based on the JVM, which lets you execute code on the same machine as the data. Serialization is the process of translating this code into an ideally compressed format for efficient transfer over the network. By default, Spark uses the standard Java serializer. However, you can get much faster and more memory-efficient serialization using Kryo serialization. Switching to Kryo can reduce memory pressure on your cluster and improve stability.
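Switching serializers is a configuration change; a minimal sketch looks like the following (MyRecord is a hypothetical application class, and registering it lets Kryo write a small numeric ID instead of the full class name):
    import org.apache.spark.SparkConf

    case class MyRecord(id: Long, name: String)   // hypothetical application class

    val conf = new SparkConf()
      .setAppName("kryo-demo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[MyRecord]))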
Shuffle
Even though moving data is expensive, sometimes it’s necessary. For example, certain operations need to consolidate data on a single node so that it can be co-located in memory, such as when you want to perform a reduce operation across all values associated with a particular key of a key-value RDD (reduceByKey()). This expensive reorganization of data is known as the shuffle. The shuffle involves serialization as well as Akka, Spark’s internal messaging system, thereby consuming disk and network I/O while increasing memory pressure from garbage collection. Improperly configured Akka settings or serialization settings can cause problems depending on the size of your data. There is an excellent write-up of what happens during a shuffle in this Cloudera Engineering blog post.
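As a concrete, minimal example of an operation that shuffles, reduceByKey must co-locate every value for a key before it can produce a final sum (the toy data below is purely illustrative):
    val sales = sc.parallelize(Seq(("apples", 3), ("pears", 2), ("apples", 5)))

    // reduceByKey combines values locally within each partition first, then
    // shuffles only the partial sums across the network to build per-key totals.
    val totals = sales.reduceByKey(_ + _)
    totals.collect().foreach(println)   // (apples,8), (pears,2)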

Lessons

Next, let’s delve into how these components all come together when you write a Spark program and, specifically, what lessons I’ve learned about tying all these pieces together.
Lesson 1: Spark Gorges on Memory
As I mentioned previously, part of Spark’s power is its ability to cache things in memory. The downside of this amazing utility is that when you use it, Spark transforms into a total memory hog. First, the JVM and YARN (if you’re using it) consume a significant amount of memory, leaving less than you expect for data movement and caching. Next, there’s metadata that accumulates on the driver as a byproduct of shuffle operations and becomes particularly problematic during long-running jobs (multi-day). Finally, Java or Scala classes may introduce hidden overhead in your RDDs. A 10-character Java string may actually consume as much as 60 bytes! To pour salt in the wound, actually tracking down the source of a problem can be nearly impossible since a Spark program may have logs distributed across the cluster, have hundreds of tasks executing per second, and errors may not always propagate all the way up the stack when exceptions are thrown.
Thus, the first thing to do is to tame this unruly beast. For starters, it’s critical to partition wisely, to manage memory pressure as well as to ensure complete resource utilization. Next, you must always know your data—size, types, and how it’s distributed. This last bit is important since otherwise you may wind up with skewed distribution of data across partitions. A simple solution for this last problem is to use a custom partitioner. Last, as mentioned above, Kryo serialization is faster and more efficient.
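Here is a minimal sketch of the custom-partitioner idea, assuming keys are (userId, productId) pairs and we want every record for a given user co-located in one partition (the class and names are hypothetical):
    import org.apache.spark.Partitioner

    class UserPartitioner(override val numPartitions: Int) extends Partitioner {
      private def nonNegativeMod(x: Int): Int = {
        val m = x % numPartitions
        if (m < 0) m + numPartitions else m
      }
      override def getPartition(key: Any): Int = key match {
        case (userId: Long, _) => nonNegativeMod(userId.hashCode)  // partition by user only
        case other             => nonNegativeMod(other.hashCode)   // fall back to hashing
      }
    }

    // Usage, with a hypothetical RDD[((Long, Long), Double)] of ((user, product), score):
    // val byUser = ratings.partitionBy(new UserPartitioner(200))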
To deal with the issue of accumulating metadata, you have two options. First, you can set the spark.cleaner.ttl parameter to trigger automatic cleanups. However, this will also wipe out any persisted RDDs and I found that it caused problems when trying to subsequently interact with HDFS. The other solution, which I ended up implementing in my case, is to simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up.
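A minimal sketch of splitting the work into batches and persisting intermediate results (the paths, batch count, and processBatch function are hypothetical stand-ins); in practice each batch can also be submitted as its own application so the driver starts completely fresh:
    import org.apache.spark.rdd.RDD

    def processBatch(lines: RDD[String]): RDD[String] =
      lines.map(_.toUpperCase)   // stand-in for the real per-batch computation

    val batchPaths = (0 until 10).map(i => s"hdfs:///input/batch-$i")

    batchPaths.zipWithIndex.foreach { case (path, i) =>
      processBatch(sc.textFile(path)).saveAsTextFile(s"hdfs:///output/batch-$i")
    }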
Lesson 2: Avoid Data Movement
In general, I found that avoiding shuffles and minimizing data transfers helped me write programs that ran faster and executed more reliably. Keep in mind: there are occasional cases when having an extra shuffle can help, such as when you have data that can’t be automatically partitioned into many partitions (see “When More Shuffles are Better” here).
So, how does one avoid shipping data? The obvious answer is to avoid operations that trigger shuffles: repartition and coalesce, the ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
Spark also provides two mechanisms in particular that help us here. Broadcast variables are read-only variables that are cached locally in memory on each machine, eliminating the need to ship a copy with every task. Using broadcast variables can also let you do efficient joins between large and small RDDs, or store a lookup table in memory that provides more efficient retrieval than doing an RDD lookup().
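Here is a minimal sketch of that pattern: a small product-name table is broadcast once per executor and used as an in-memory lookup, so the large RDD is never shuffled (the paths and names are hypothetical):
    // Small lookup table: productId -> productName, collected to the driver.
    val productNames = sc.textFile("hdfs:///data/products").map { line =>
      val Array(id, name) = line.split(",")
      (id.toLong, name)
    }.collectAsMap()

    val productNamesBc = sc.broadcast(productNames)

    // Large RDD of (productId, amount); the "join" becomes a map-side lookup.
    val purchases = sc.parallelize(Seq((1L, 9.99), (2L, 3.50)))
    val withNames = purchases.map { case (productId, amount) =>
      (productNamesBc.value.getOrElse(productId, "unknown"), amount)
    }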
Accumulators are a way to efficiently update a variable in parallel during execution. Accumulators differ from broadcast variables in that they may only be read on the driver process, but they allow Spark programs to efficiently aggregate results such as counters, sums, or generated lists. An important note about accumulators is that they’re not limited to basic types; you can accumulate any Accumulable class.
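A minimal accumulator sketch using the Spark 1.x API contemporary with this post (the path and record layout are hypothetical): malformed lines are counted as a side effect of parsing, with no extra pass over the data.
    // Counts malformed records across all tasks; readable only on the driver.
    val badRecords = sc.accumulator(0L)

    val parsed = sc.textFile("hdfs:///data/events").flatMap { line =>
      val fields = line.split(",")
      if (fields.length == 3) Some((fields(0), fields(1), fields(2)))
      else { badRecords += 1L; None }
    }

    parsed.count()                                  // an action runs the tasks
    println(s"bad records: ${badRecords.value}")    // read the result on the driver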
Lesson 3: Avoiding Data Movement is Hard
The challenge of using the above mechanisms is that, for example, to broadcast an RDD you need to first collect() it on the driver node. To accumulate the results of distributed execution you need to serialize that data back to the driver and aggregate it there. The upshot of this is that all of a sudden you’re increasing the memory pressure on the driver. Between collected RDDs, the persistent metadata discussed previously, and Accumulators, you may quickly run out of memory on the driver. You can, of course, increase the amount of memory allocated by setting spark.driver.memory, but that only works to a degree.
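For reference, a minimal configuration sketch (the sizes are placeholders); note that spark.driver.memory generally has to be supplied via spark-submit or spark-defaults.conf, since the driver JVM is already running by the time a programmatic SparkConf is read:
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("recommender")
      .set("spark.driver.memory", "8g")         // driver heap (set via spark-submit in practice)
      .set("spark.driver.maxResultSize", "2g")  // cap on results collected back to the driver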
Above, I mentioned a few things: how a smaller number of partitions increases the size footprint of each partition, and how Spark uses Akka for messaging. If your 2GB RDD is only partitioned into 20 partitions, to serialize the data from one partition to another node, you’ll need to ship a 100MB chunk of data from each partition via Akka. But, by default, Akka’s buffer is only 10 MB! One can get around this by setting akka.frameSize, but this is another example of needing to fundamentally understand your data and how all these moving pieces come together.
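Concretely, a minimal sketch of the two knobs in that example (the values are placeholders): either raise the frame size (the Spark 1.x property is spark.akka.frameSize, in MB) or simply repartition so each partition stays comfortably small.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("frame-size-demo")
      .set("spark.akka.frameSize", "128")   // MB; the default is 10

    // Alternatively, keep partitions small: ~400 partitions puts a 2GB RDD
    // at roughly 5MB per partition.
    // val finer = bigRdd.repartition(400)   // bigRdd is hypothetical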
Lesson 4: Speed!
The first three lessons deal primarily with stability. Next, I want to briefly describe how I got dramatic performance gains.
The first is fairly obvious: liberal use of caching. Of course, you don’t have infinite memory so you must cache wisely, but in general, if you use some data twice, cache it. Also, unused RDDs that go out of scope will be automatically un-persisted but they may be explicitly released with unpersist().
Second, I found broadcast variables extremely useful. I use them regularly for large maps and lookup tables. They have a significant advantage over an RDD that is cached in memory since a lookup in an RDD, even one cached in memory, will still be O(m), where m is the length of data in a single partition. In contrast, a broadcasted hash map will have a lookup of O(1).
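To make the contrast concrete, here is a minimal sketch (with toy stand-in data) of the same id-to-score mapping held both ways:
    // The same (id -> score) data as a cached pair RDD and as a broadcast map.
    val scoresRdd = sc.parallelize(Seq((1L, 0.9), (2L, 0.4))).cache()
    val scoresMap = sc.broadcast(scoresRdd.collectAsMap())

    val viaRdd = scoresRdd.lookup(1L)      // scans partition data: O(m) per lookup
    val viaMap = scoresMap.value.get(1L)   // in-memory hash lookup: O(1)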
Finally, there are times when I had to help Spark parallelize its execution. In my case, I needed to perform a per-key operation between two RDDs—that is, select a key-value pair from the first and perform some computation with that data against the second RDD. In a naive implementation, Spark would process these keys one at a time, so I was of course getting terrible resource utilization on my cluster. I’d have only a few tasks running at a time and most of my executors weren’t used. Because I wasn’t using the RDD API to run this code, I couldn’t benefit from Spark’s built-in parallelism. The ultimate solution was simple: create a thread pool on the driver that would process multiple keys at the same time. Each of the threads would generate tasks that were submitted to YARN and now I could easily ramp up my CPU utilization.
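A minimal sketch of that driver-side thread pool (the per-key computation is a trivial stand-in; Spark's scheduler is thread-safe, so each thread can safely submit its own jobs):
    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    val pairs = sc.parallelize(Seq((1L, 2.0), (1L, 3.0), (2L, 5.0)))

    // Stand-in for the real per-key work, which runs its own RDD actions.
    def processKey(key: Long): Double =
      pairs.filter(_._1 == key).values.sum()

    val pool = Executors.newFixedThreadPool(8)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    val keys    = Seq(1L, 2L)
    val futures = keys.map(k => Future(processKey(k)))
    val results = futures.map(f => Await.result(f, Duration.Inf))

    pool.shutdown()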
The final lesson here is that Spark can only help you so long as you help it to do so.

Conclusion

Ideally, the above concept breakdown and lessons (coupled with the provided documentation links) help clarify some of Spark’s underlying complexity. I’ve made liberal citations to the sources that helped me most when it came to understanding all these various mechanisms, as well as those that guided me through learning how to configure and use Spark’s more advanced features. I simply hope you find them as useful as I have!
