org.apache.spark.shuffle.FetchFailedException: The relative remote executor(Id: 21), which maintains the block data to fetch is dead.

Problem Description

org.apache.spark.shuffle.FetchFailedException: The relative remote executor(Id: 21), which maintains the block data to fetch is dead.
I have recently been working on Spark performance tuning, testing how different CPU core counts and memory sizes affect compute performance. Since the tests ran on a test cluster whose hardware is smaller and weaker than production, I ran into quite a few problems. One worth writing up is org.apache.spark.shuffle.FetchFailedException: Failed to connect to /xxx:43301

1. Runtime Environment

1.1 Hardware

3 test servers, A, B, and C, each with 4 cores and 16GB of memory
Each runs an HDFS DataNode and a Spark Worker
A additionally runs the HDFS NameNode
B additionally runs the Spark Master
C runs the Spark Driver

1.2 Software

HDFS 2.7.3, cluster mode
Spark 2.1.0, standalone cluster mode
Java 1.8.0_131

2. Spark Launch Parameters

2.1 Test 1

2.1.1 Test Parameters

spark.driver.cores not set, defaulting to 1
spark.driver.maxResultSize set to 2g (default is 1g)
spark.driver.memory set to 3g (default is 1g)
spark.executor.memory set to 8g (default is 1g)
spark.executor.cores not set, so each Executor takes all of a Worker's cores, here 4 (see the configuration sketch after this list)
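
For reference, here is a minimal sketch of the Test 1 settings expressed programmatically; the master URL and application name are placeholders, not values from the original setup:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the Test 1 configuration; master URL and app name are placeholders.
val conf = new SparkConf()
  .setMaster("spark://spark-master:7077")
  .setAppName("shuffle-perf-test-1")
  .set("spark.driver.maxResultSize", "2g")
  // spark.driver.memory only takes effect when set before the driver JVM
  // starts, e.g. via spark-submit --driver-memory or spark-defaults.conf.
  .set("spark.executor.memory", "8g")
  // spark.executor.cores is left unset: in standalone mode an Executor
  // then takes all available cores on its Worker (4 here).

val sc = new SparkContext(conf)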

2.1.2 Test Results

Each Spark Worker created 1 Executor, using 4 cores and 8g of memory. The job completed and produced results, taking 2 hours.

2.2 Test 2

2.2.1 Test Parameters

spark.driver.cores not set, defaulting to 1
spark.driver.maxResultSize set to 2g (default is 1g)
spark.driver.memory set to 3g (default is 1g)

To test whether running more than 1 Executor per Worker improves performance, I changed the following parameters (see the quick arithmetic check after this list):

spark.executor.memory set to 4g, half of Test 1
spark.executor.cores set to 2, half of Test 1
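
A back-of-the-envelope check of how the standalone scheduler carves up each Worker under these settings (plain arithmetic, not a Spark API):

// How each 4-core, 16GB Worker is divided under the Test 2 settings:
val workerCores      = 4  // cores offered by each Worker
val executorCores    = 2  // spark.executor.cores
val executorMemoryGb = 4  // spark.executor.memory

val executorsPerWorker = workerCores / executorCores           // 4 / 2 = 2
val memoryPerWorkerGb  = executorsPerWorker * executorMemoryGb // 2 * 4 = 8
// Same per-server totals as Test 1 (4 cores, 8g), just split across 2 JVMs.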

2.2.2 Test Results

Each Spark Worker created 2 Executors (each Worker's 4 cores / spark.executor.cores = 4 / 2 = 2), each using 2 cores and 4g of memory. Total resource usage matched Test 1: the 2 Executors on each server together used 4 cores and 8g of memory. But the job hit the following exception:

[WARN][TaskSetManager] Lost task 6.0 in stage 4.0 (TID 307, xxx, executor 0): FetchFailed(BlockManagerId(1, xxx, 33557, None), shuffleId=0, mapId=7, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /xxx:43301
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

3. Exception Analysis

Because each Executor had only half the resources available, the shuffle phase ran longer, and heavy memory use left Executors unable to respond to heartbeats within the default spark.network.timeout=120s. The affected Executor was removed and its tasks were lost:

[WARN][HeartbeatReceiver] Removing executor 5 with no recent heartbeats: 120504 ms exceeds timeout 120000 ms
[ERROR][TaskSchedulerImpl] Lost executor 5 on xxx: Executor heartbeat timed out after 120504 ms
[WARN][TaskSetManager] Lost task 8.0 in stage 4.0 (TID 309, xxx, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122504 ms

Spark's DAGScheduler resubmits the failed tasks to other Executors, but since those Executors run with the same resource configuration, the job ultimately still fails.
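
The removal above is driven by a simple driver-side liveness check. Here is a simplified model of it, an assumption for illustration rather than Spark's actual HeartbeatReceiver code:

// Simplified model: an executor is expired once the time since its last
// heartbeat exceeds spark.network.timeout.
val networkTimeoutMs = 120000L // spark.network.timeout = 120s (default)

def isExpired(lastSeenMs: Long, nowMs: Long): Boolean =
  nowMs - lastSeenMs > networkTimeoutMs

// Matches the log above: 120504 ms of silence > 120000 ms timeout.
assert(isExpired(lastSeenMs = 0L, nowMs = 120504L))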

4. Solutions

Reduce the use of shuffle-triggering operations such as reduceByKey, thereby reducing memory usage
Increase spark.network.timeout to allow more time to wait for heartbeat responses
Increase spark.executor.cores so that fewer Executors are created, reducing total memory usage
At the same time, increase spark.executor.memory so each Executor has enough memory available (see the combined sketch after this list)
Increase spark.shuffle.memoryFraction, default 0.2 (only read when spark.memory.useLegacyMode is set to true, i.e. the legacy memory management of 1.5 and earlier; it is deprecated)
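
Putting the configuration-side fixes together, a sketch of what Test 2 could look like after the adjustments; the values are illustrative assumptions to be tuned, not measured recommendations:

import org.apache.spark.SparkConf

// Illustrative values only; tune for the actual workload.
val conf = new SparkConf()
  .set("spark.network.timeout", "300s") // more headroom for slow heartbeats
  .set("spark.executor.cores", "4")     // back to 1 Executor per 4-core Worker...
  .set("spark.executor.memory", "8g")   // ...with enough memory per Executor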

For more, see the official shuffle configuration parameters: http://spark.apache.org/docs/latest/configuration.html#shuffle-behavior