0%

大数据--spark应用性能优化常见错误处理

本篇文章会讲述spark应用的性能调优, 针对常见的数据倾斜问题提出了一些解决方案, 以及如何避免Task Not SerializableException错误 ## spark基本结构和原理 Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节(ClusterManager),Slave是集群中含有Worker进程的节点。Master作为整个集群的控制器,负责整个集群的正常运行;Worker相当于是计算节点,接收主节点命令与进行状态汇报;Executor负责任务的执行;Client作为用户的客户端负责提交应用,Driver负责控制一个应用的执行

  • job 所谓⼀个job,就是由⼀个 rdd 的 action 触发的动作,可以简 单的理解为,当你需要执⾏行⼀个 rdd 的 action 的时候,会⽣生成⼀个 job。

  • stage stage 是⼀个 job 的组成单位,就是说,⼀个 job 会被切分成 1 个或 1 个以上的 stage,然后各个 stage 会按照执⾏行顺序依次执行。在⼀个 job 中划分 stage 的⼀个重要依据是是否有 shuflle 发 ⽣ ,也就是是否会发生数据的重组(重新组织数据)。

  • task 即 stage 下的⼀个任务执⾏行单元,⼀般来说,⼀个 rdd 有多少 个 partition,就会有多少个 task,因为每⼀个 task 只是处理⼀个 partition 上的数据。

数据倾斜

何谓数据倾斜?数据倾斜指的是,并⾏处理的数据集中,某⼀部分(如 Spark 或 Kafka 的⼀个 Partition)的数据显著多于其它部分,从⽽使得该部分的处理速度成为整个数据集处理的瓶颈。 对分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗时线性下降。如果⼀台机器处理⼀批⼤量数据需要120分钟,当机器数量增加到三时,理想的耗时为 120 / 3 = 40 分钟。但是发生数据倾斜时, 任务的执行速度由数据最大的那个任务决定

如何判断数据倾斜

下面举了一些常见数据倾斜的现象 1. 绝⼤多数 task 执行得都非常快,但个别 task 执⾏极慢。比如, 总共有 1000 个 task,997 个 task 都在 1 分钟之内执行完了,但是剩余两三个task却要⼀两个小时。这种情况很常见。 2. 原本能够正常执⾏的 Spark 作业,某天突然报出 OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。

发生数据倾斜的原因

数据倾斜的原理很简单:在进⾏ shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的⼀个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发⽣数据倾斜。比如大部分 key 对应10条数据,但是个别 key 却对应了 100 万条数据,那么大部分 task 可能就只会分配到 10 条数据,然后 1 秒钟就运⾏完了。但是个别 task 可能分配到了 100 万数据,要运⾏一两个小时。因此,整个 Spark 作业的运行进度 是由运行时间最长的那个task决定的。

如何定位数据倾斜

数据倾斜只会发⽣在 shuffle 过程中。这里给⼤家罗列⼀些常用的并且可能会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、 join、cogroup、repartition 等。出现数据倾斜时,可能就是你的代码中使用了这些算⼦中的某⼀个所导致的。

处理数据倾斜的一般方法

  1. 自定义partitioner Spark 在做 Shuffle 时,默认使用 HashPartitioner 对数据进⾏分区,可以使用自定义的 Partitioner,将原本被分配到同⼀个 Task 的不同 Key 分配到不同 Task。

  2. 聚合类操作使用reducedByKey, 或者先局部聚合 使用reduceBYkey会首先在分区中计算,这样可减少部分shuffle 或者通过添加随机数的方式, 先聚合,然后删掉随机数,再次聚合,这样尽可能保证shuffle后的每个分区内的数据不会过多

  3. join操作可以使用广播变量或者增容 在做join关联时,如果其中一个RDD较小,那么建议直接做为广播变量使用, 如果两个RDD都很大, 并且存在数据倾斜(Key分布不均),可以对其中一个RDD中的key添加随机数0~n,另一个RDD扩容n倍(每个key都对应了n个),用空间换时间, 减少大规模的shuffle

正确使用spark广播变量

对于广播变量,尽量避免使用容器嵌套的形式,非常容易导致内存异常,可以先转成字符串(或者其他序列化方式),再进行传输。

这里有个按列:

RDD结构RDD[cookieid, list[obj]] 大小为百万记录, 和另一个RDD[cookieid]需要做关联, 决定采用 广播RDD[cookieid]的方式 过程中分别出现以下问题 1. spark.kryoserializer.buffer.max 设置过小, 广播时需要的buffer 根据实际系列化的对象大小 适当调大 2. Total size of serialized results of 1 tasks (1499.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) 由于广播变量是由driver端分发的, 确保driver端的内存设置合适。 3. 由于spark内部采用的kryo版本存在bug 在序列化的列表对象长度超过10亿时会产生 serializing large (> 1 billion) objects may result in a java.lang.NegativeArraySizeException 解决办法是 临时设置 spark.kryo.refferenceTrackingEnabled=false

spark序列化问题

spark新手会遇到Task NotSerializableException异常,我也曾被这个报错折磨许久,所以我深入研究了一下导致这个问题的来龙去脉。在谈这个问题之前我们先讨论一个小话题: 什么叫闭包 ### 闭包 借用某位程序员的说法, 闭包就是: 1. 闭包是一个有状态(不消失的私有数据)的函数。 2. 闭包是一个有记忆的函数。 3. 闭包相当于一个只有一个方法的紧凑对象(a compact object) 4. 面这三句话是等价的,而其中第 3 句最精妙,可以指导何时、如何用好闭包,后面我会详细分析。 首先闭包是一种函数, 普通函数如果只有局部变量那么就不会和自由变量有关系, 自由变量就是不受我函数内部控制的 贴一段经典java闭包技术: Java最常用的闭包实现办法:内部类实现通用接口,然后将内部类对象向上转型为接口类型。(内部类+接口)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Milk {
public final static String name = "纯牛奶";//名称
private static int num = 16;//数量
public Milk(){
System.out.println(name+":16/每箱");
}
/**
* 闭包
* @return 返回一个喝牛奶的动作
*/
public Active HaveMeals(){
return new Active(){
public void drink(){
if(num == 0){
System.out.println("木有了,都被你丫喝完了.");
return;
}
num--;
System.out.println("喝掉一瓶牛奶");
}
};
}

/**
* 获取剩余数量
*/
public void currentNum()
{
System.out.println(name+"剩余:"+num);
}
}

/**
* 通用接口
*/
interface Active
{
void drink();
}

public class Person {
public static void main(String[] args) {
//买一箱牛奶
Milk m = new Milk();
Active haveMeals = m.HaveMeals();
//没事喝一瓶
haveMeals.drink();
//有事喝一瓶
haveMeals.drink();
//看看还剩多少?
m.currentNum();
}
}
看着还是比较清楚的, 下面我们再来看看scala的闭包实现
1
2
var factor = 3  
val multiplier = (i:Int) => i * factor
一个函数对象需要引用到外部变量

为什么会task not serializable

让我来引用databricks官方文档 >org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

The above error can be triggered when you intialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet: 简单来说, 如果我在worker端使用了driver初始化的变量,需要保证这个对象序列化成功 比如下面的例子:

1
2
3
4
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");

rdd.map(s -> notSerializable.doSomething(s)).collect();

官方给出的处理这个错误的方法有以下几种 - Serializable the class  将引用的类做成可序列化的。 - Declare the instance only within the lambda function passed in map. 将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类; - Make the NotSerializable object as a static and create it once per machine. 将依赖的变量放到静态对象 这样会在每一个jvm中生成一次 - Call rdd.forEachPartition and create the NotSerializable object in there like this:

1
2
3
4
5
rdd.forEachPartition(iter -> {
NotSerializable notSerializable = new NotSerializable();

// ...Now process iter
});

由于Spark是分布式执行引擎,其核心抽象是弹性分布式数据集RDD,其代表了分布在不同节点的数据。Spark的计算是在executor上分布式执行的,故用户开发的关于RDD的map,flatMap,reduceByKey等transformation 操作(闭包)有如下执行过程: 1. 代码中对象在driver本地序列化 2. 对象序列化后传输到远程executor节点 3. 远程executor节点反序列化对象 4. 最终远程节点执行 故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。

我们的解决方法也很直接, 要么类可以序列化, 要么写成成员函数 先看一个错误例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
object NOTworking extends App {
new testing().doIT
}

//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)

def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}

def someFunc(a:Int) = a+1
}

我们可以让这个类继承Serializable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.spark.{SparkContext,SparkConf}

object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
new Test().doIT
}

class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))

def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}

def someFunc(a: Int) = a + 1
}
或者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.spark.{SparkContext,SparkConf}

object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
new Test().doIT
}

class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))

def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}

val someFunc = (a: Int) => a + 1
}

具体可以参考Stack Overflow上的回答。

下面再举一个例子说明可能会出现Task Not Serializable 依据上述分析的原因,由于依赖了当前类的成员变量,所以导致当前类全部需要序列化,由于当前类某些字段未做好序列化,导致出错。

1
2
3
4
5
6
7
8
9
10
11
12
13
class MyTest1(conf:String) extends Serializable{
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
private val sparkConf = new SparkConf().setAppName("AppName");
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);

private val rootDomain = conf

def getResult(): Array[(String)] = {
val result = rdd.filter(item => item.contains(rootDomain))
result.take(result.count().toInt)
}
}
如果我们将上述代码改成如下形式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyTest1(conf:String) extends Serializable{
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
@transient
private val sparkConf = new SparkConf().setAppName("AppName");
@transient
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);

private val rootDomain = conf

def getResult(): Array[(String)] = {

val result = rdd.filter(item => item.contains(rootDomain))
result.take(result.count().toInt)
}
}
对类中那些不支持序列化的成员变量标注后,使得整个类能够正常序列化,最终消除Task未序列化问题。

如何避免task not serializable

出现“task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。解决这个问题最常用的方法有: 1. 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类; 2. 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率; 3. 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。 4. 将引用的类做成可序列化的。