An Empirical Study of Out of Memory Errors in Apache Spark

Cause identified by:

Cause identified by | No.
Users themselves | 33
Experts | 29
Us | 5
Total | 67

Cases:

Sources | User-defined code | GraphX | MLlib | #Total | #Reproduced | #Unknown
StackOverflow | 16 | 1 | 2 | 19 | 3 | 26
Spark mailing list | 42 | 1 | 5 | 48 | 14 | 80
Total | 58 | 2 | 7 | 67 | 17 | 106

Cause patterns:

Category | Pattern | No.
Large data stored in memory | Large buffered data | 2
Large data stored in memory | Large cached data | 7
Abnormal dataflow | Improper data partition | 13
Abnormal dataflow | Hotspot key | 7
Abnormal dataflow | Large single record | 1
Memory-consuming user code | Large external data | 0
Memory-consuming user code | Large intermediate results | 2
Memory-consuming user code | Large accumulated results | 10
Driver | Large generated results | 9
Driver | Large collected results | 16
Total | | 67

Fix suggestions:

Suggestion | Related pattern | No.
Lower buffer size | Large data buffers | 2
Lower cache size | Large cached data | 2
Lower storage level | | 3
Add partition number, repartition | Improper data partition | 10
Change partition function | | 1
Change key (add an auxiliary key) | Hotspot key | 1
Sub-divide the group, cut down the data associated with a single key | | 2
Aggregate partial values for each group | | 1
Write multiple times, split the single large record | Large single record | 1
Add partition number, change storage level | Large intermediate results |
Adjust the parameter, move the array outside the iterator (for reuse), add partition number (2) | Large accumulated results |
| Large external data |
Reduce partition number | Large generated results |
Remove collect() (2), reduce task number, tree reduce / tree aggregation, use a small k | Large collected results |

Large buffered data

1. A: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

User: We used JavaPairRDD.repartitionAndSortWithinPartitions on 100 GB of data and it kept failing similarly to your app. Then we looked at the YARN logs on the specific nodes and found out that we had some kind of out-of-memory problem, so YARN interrupted the execution. Our solution was to change/add spark.shuffle.memoryFraction 0 in .../spark/conf/spark-defaults.conf. That allowed us to handle a much larger (but unfortunately not infinite) amount of data this way.

Job type: User-defined (StackOverflow)
Causes: Large buffered data (User)
Fix suggestions: lower the buffer size (User)
Fix details: Change spark.shuffle.memoryFraction to 0 to force it to spill permanently; the price was about 20% of running time.

2. MLlib/ALS: java.lang.OutOfMemoryError: Java heap space

User: I am running into an out-of-memory error while running ALS using MLlib on a reasonably small data set consisting of around 6 million ratings.
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)

Expert: I am not sure this can help you. I have 57 million ratings, about 4 million users, and 4K items. I used 7-14 total executor cores and 13 GB of executor memory; the cluster has 4 nodes, each with 4 cores and at most 16 GB of memory. I found that the following settings may help avoid this problem: conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2; conf.set("spark.storage.memoryFraction", "0.3") // default is 0.6. I also have to keep the rank value under 40, otherwise this problem occurs.

Job type: User-defined (Mailing list)
Causes: Large buffered data (User)
Fix suggestions: decrease the rank value, lower the buffer size, lower the cache limit (Expert)
Fix details: decrease the rank value (keep the rank under 40)
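To make the expert's advice in case 2 concrete, here is a minimal sketch (not taken from the original thread) of an MLlib ALS job that applies the quoted settings. Note that spark.shuffle.memoryFraction and spark.storage.memoryFraction belong to the static memory manager of Spark 1.x and are ignored by the unified memory manager introduced in Spark 1.6; the input path, field layout, and the rank/iteration values are illustrative assumptions.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object AlsMemoryTuning {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ALSMemoryTuning")
      // Settings quoted by the expert (Spark 1.x static memory manager):
      .set("spark.shuffle.memoryFraction", "0.65") // default is 0.2
      .set("spark.storage.memoryFraction", "0.3")  // default is 0.6
    val sc = new SparkContext(conf)

    // Hypothetical input: one "user,item,rating" triple per line.
    val ratings = sc.textFile("hdfs:///path/to/ratings.csv").map { line =>
      val Array(user, item, rating) = line.split(',')
      Rating(user.toInt, item.toInt, rating.toDouble)
    }

    // Keep the factorization rank modest; the expert reported OOMs above rank ~40.
    val model = ALS.train(ratings, /* rank = */ 30, /* iterations = */ 10, /* lambda = */ 0.01)
    println(s"Trained ALS model with rank ${model.rank}")

    sc.stop()
  }
}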
Large cached data

1. Tiers of caching

User: I noticed that some algorithms, such as GraphX, liberally cache RDDs for efficiency, which makes sense. However, it can also leave a long trail of unused yet cached RDDs that might push other RDDs out of memory.

Expert: I think tiers/priorities for caching are a very good idea and I'd be interested to see what others think. In addition to letting libraries cache RDDs liberally, it could also unify memory management across other parts of Spark. For example, small shuffles benefit from explicitly keeping the shuffle outputs in memory rather than writing them to disk, possibly due to filesystem overhead. To prevent in-memory shuffle outputs from competing with application RDDs, Spark could mark them as lower priority and specify that they should be dropped to disk when memory runs low.

Job type: User-defined (Mailing list)
Causes: Large cached data (User)
Fix suggestions: none

2. Kryo serialization slow and runs OOM

User: When I load my dataset, transform it with some one-to-one transformations, and try to cache the eventual RDD, it runs really slowly and then runs out of memory. When I remove the Kryo serializer and default back to Java serialization, it works just fine and is able to load and cache the 700 GB of resultant data.

Job type: User-defined (Mailing list)
Causes: Large cached data (User)
Fix suggestions: none

3. Problems with broadcasting a large data structure

User: Spark repeatedly fails to broadcast a large object on a cluster of 25 machines for me. I have a cluster of 20 machines, and I just run the broadcast example; all I do is change the data size in the example to 400 MB, which is really a small data size, but I ran into the same problem as you.

Expert: 400 MB isn't really that big. Broadcast is expected to work with several GB of data, even on larger clusters (hundreds of machines). If you are using the default HttpBroadcast, then Akka isn't used to move the broadcast data. But the block manager can run out of memory if you repeatedly broadcast large objects. Another scenario is that the master isn't receiving any heartbeats from the block manager because the control messages are getting dropped due to bulk data movement. Can you provide a bit more detail on your network setup? Also, you can try doing a binary search over the size of the broadcast data to see at what size it breaks (i.e., try to broadcast 10 MB, then 20, then 40, etc.), and limit each run to a single iteration of the example (right now it tries to broadcast 3 consecutive times). If you are using a newer branch, you can also try the new TorrentBroadcast implementation. Your code is broadcasting 400 MB 30 times, and those broadcasts are not being evicted from the cache fast enough, which, I think, is causing the block managers to run out of memory.

Job type: User-defined (Mailing list)
Causes: Large cached data (Expert)
Fix suggestions: try TorrentBroadcast

4. Spark streaming questions

User: Can someone explain the usage of cache with respect to Spark Streaming? For example, if we do stream.cache(), will the cache remain constant, with all the partitions of the RDDs for that stream present across the nodes, or will it be regularly updated as new batches come in?

Expert: If you call DStream.persist (persist == cache == true), then all RDDs generated by the DStream will be persisted in the cache (in the BlockManager). As new RDDs are generated and persisted, old RDDs from the same DStream will fall out of memory, either by LRU or explicitly if spark.streaming.unpersist is set to true. Well, it is clear that the combineByKey is taking the most time, 7 seconds, so you need to increase the number of reducers in the reduceByKeyAndWindow operation. That should distribute the computation across more of the cores and therefore speed up the processing of each batch.

Job type: Streaming (Mailing list)
Causes: Large cached data (Expert)
Fix suggestions: change the storage level, increase the reduce number, add combineByKey()
Fix details: To fall back to disk you have to use MEMORY_AND_DISK_SER or MEMORY_AND_DISK_SER_2. Note that SER = keep data serialized, good for GC behavior (see the programming guide), and _2 = replicate twice.
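As a rough illustration of the fixes suggested for case 4 (serialized storage that can spill to disk, plus more reducers in the windowed aggregation), here is a minimal Spark Streaming sketch. The socket source, window sizes, and the reducer count of 64 are placeholder assumptions, not values from the original thread.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCountsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowedCountsSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder source; any DStream works the same way.
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1L))

    // SER = keep the data serialized (smaller, GC-friendly); this storage
    // level can drop blocks to disk instead of OOMing when memory fills up.
    pairs.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Pass an explicit partition count so the windowed reduce is spread
    // over more tasks, as the expert recommends.
    val counts = pairs.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b, // reduce function
      Seconds(60),                 // window length
      Seconds(10),                 // slide interval
      64                           // number of reduce partitions
    )
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}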
5. OOM when calling cache on an RDD with big data

User: I have a very simple job that simply caches the hadoopRDD by calling cache/persist on it. I tried MEMORY_ONLY, MEMORY_AND_DISK, and DISK_ONLY as the caching strategy, and I always get an OOM on the executors. It works fine if I do not call cache or persist on the RDD.

Job type: User-defined (Mailing list)
Causes: Large cached data (User)
Fix suggestions: none

6. [GraphX] Some problems with using SVDPlusPlus

User: The implementation of SVDPlusPlus shows that it produces two new graphs in each iteration, which are also cached in memory. However, as the iterations go on, more and more graphs are cached and an out-of-memory error happens. So I think it may need to unpersist the old graph that will not be used any more and add a few lines of code; the details are shown as follows:

Job type: GraphX (Mailing list)
Causes: Large cached data (User)
Fix suggestions: unpersist the old graph that will not be used any more

7. [0.9.0] MEMORY_AND_DISK_SER not falling back to disk

User: My understanding of the MEMORY_AND_DISK_SER persistence level was that if an RDD could fit into memory then it would be left there (same as MEMORY_ONLY), and only if it was too big for memory would it spill to disk. Here's how the docs describe it: What I'm observing, though, is that really large RDDs are actually causing OOMs. I'm not sure if this is a regression in 0.9.0 or if it has been this way for some time. I dropped spark.storage.memoryFraction down to 0.5 but still OOM'd, so I sent it all the way down to 0.1 and didn't get an OOM.

Expert: This probably means that there's not enough free memory for the "scratch" space used for computations, so we OOM before the Spark cache decides that it's full and starts to spill stuff. Try reducing spark.storage.memoryFraction (default is 0.66, try 0.5).

Job type: User-defined (Mailing list)
Causes: Large cached data (Expert)
Fix suggestions: lower the cache size
Fix details: dropped spark.storage.memoryFraction down to 0.5 but still OOM'd, so sent it all the way to 0.1 and didn't get an OOM (fixed)

Improper data partition

1. Q: Spark runs out of memory when grouping by key

User: I am attempting to perform a simple transformation of Common Crawl data using Spark hosted on EC2, following this guide; my code looks like this: So my basic question is, what is necessary to write a Spark task that can group by key with an almost unlimited amount of input without running out of memory?

Expert: The most common cause of java.lang.OutOfMemoryError exceptions in shuffle tasks (such as groupByKey, reduceByKey, etc.) is a low level of parallelism.

Job type: User-defined (StackOverflow)
Causes: Improper data partition (Expert)
Fix suggestions: add partition number
Fix details: Increase the default value by setting the spark.default.parallelism property (suggestion)
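A minimal sketch of the expert's advice for case 1, assuming a hypothetical pair RDD built from text input; the path, the field layout, and the value 400 for the parallelism are illustrative. Raising spark.default.parallelism, or passing an explicit partition count to the shuffle, spreads each group over more, smaller tasks; replacing groupByKey with reduceByKey additionally combines values map-side, so no task must hold a whole group.

import org.apache.spark.{SparkConf, SparkContext}

object GroupWithMoreParallelism {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("GroupWithMoreParallelism")
      // Default number of partitions used by shuffles when none is given.
      .set("spark.default.parallelism", "400")
    val sc = new SparkContext(conf)

    // Hypothetical input: tab-separated (key, payload) records.
    val pairs = sc.textFile("hdfs:///path/to/input").map { line =>
      val fields = line.split('\t')
      (fields(0), fields(1).length.toLong)
    }

    // reduceByKey combines values map-side and here also gets an explicit
    // partition count, so no single task has to materialize an entire group.
    val totals = pairs.reduceByKey(_ + _, 400)
    totals.saveAsTextFile("hdfs:///path/to/output")

    sc.stop()
  }
}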
2. Q: Regarding Spark input data partitioning and coalesce

User: I partition the input data (80 million records) into partitions using RDD.coalesce(numberOfPartitions) before submitting it to the mapper/reducer function. Without using coalesce() or repartition() on the input data, Spark executes really slowly and fails with an out-of-memory exception.

Expert: Determining the number of partitions is a bit tricky. Spark by default will try to infer a sensible number of partitions.

Job type: User-defined (StackOverflow)
Causes: Improper data partition (User)
Fix suggestions: repartition() (User)

3. Q: Why does a Spark RDD partition have a 2 GB limit for HDFS?

User: Integer.MAX_VALUE is about 2 GB, and it seems that some partition ran out of memory. So I repartitioned my RDD into 1000 partitions, so that each partition holds far less data than before. Finally, the problem is solved!

Expert: It is a critical issue which prevents the use of Spark with very large datasets. Increasing the number of partitions can resolve it (as in the OP's case), but it is not always feasible, for instance when there is a large chain of transformations, part of which can increase the data (flatMap etc.), or in cases where the data is skewed.

Job type: User-defined (StackOverflow)
Causes: Improper data partition (User)
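To illustrate the repartitioning fix used in cases 2 and 3, here is a minimal sketch; the paths are placeholders, and the partition count of 1000 is simply the value the user in case 3 reported. A single Spark block/partition must fit in a ByteBuffer, hence the roughly Integer.MAX_VALUE (~2 GB) ceiling; splitting the data into more partitions keeps each one well below that.

import org.apache.spark.{SparkConf, SparkContext}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RepartitionSketch"))

    // Placeholder input that arrives in a handful of very large partitions.
    val raw = sc.textFile("hdfs:///path/to/input")
    println(s"partitions before: ${raw.partitions.length}")

    // repartition() always shuffles, so it can both grow and shrink the
    // partition count; coalesce(n) without a shuffle can only shrink it.
    val resized = raw.repartition(1000)
    println(s"partitions after: ${resized.partitions.length}")

    // Downstream stages now work on many small partitions, each far below
    // the ~2 GB per-block limit the question describes.
    resized.saveAsTextFile("hdfs:///path/to/output")

    sc.stop()
  }
}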