spark算子调优

如果是普通的map,比如一个partition中有1万条数据;ok,那么你的function要执行和计算1万次。 但是,使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。

如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下,比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。所以说普通的map操作通常不会导致内存的OOM异常。 但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。

1、每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。
2、每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不同,这个时候很容易发生什么问题?数据倾斜。。。。
比如说,第二个partition的数据量才100;但是第三个partition的数据量是900;那么在后面的task处理逻辑一样的情况下,不同的task要处理的数据量可能差别达到了9倍,甚至10倍以上;同样也就导致了速度的差别在9倍,甚至10倍以上。

1、针对第一个问题,我们希望可以进行partition的压缩吧,因为数据量变少了,那么partition其实也完全可以对应的变少。比如原来是4个partition,现在完全可以变成2个partition。那么就只要用后面的2个task来处理即可。就不会造成task计算资源的浪费。(不必要,针对只有一点点数据的partition,还去启动一个task来计算)
2、针对第二个问题,其实解决方案跟第一个问题是一样的;也是去压缩partition,尽量让每个partition的数据量差不多。那么这样的话,后面的task分配到的partition的数据量也就差不多。不会造成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。

优化数据库连接操作,foreach每条数据都创建一次连接,foreachPartition每个partition创建一次连接。

如果一个partition的数量真的特别特别大,可能会发生OOM,内存溢出问题。

用Spark SQL的那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你自己通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。

repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。
然后从repartition以后的RDD,再往后,并行度和task数量,就会按照你预期的来了。就可以避免跟Spark SQL绑定在一个stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑。

reduceByKey,相较于普通的shuffle操作(比如groupByKey),它的一个特点,就是说,会进行map端的本地聚合。

a、用reduceByKey对性能的提升
1、在本地进行聚合以后,在map端的数据量就变少了,减少磁盘IO。而且可以减少磁盘空间的占用。
2、下一个stage,拉取数据的量,也就变少了。减少网络的数据传输的性能消耗。
3、在reduce端进行数据缓存的内存占用变少了。
4、reduce端,要进行聚合的数据量也变少了。
b、reduceByKey在什么情况下使用呢?
1、非常普通的,比如说,就是要实现类似于wordcount程序一样的,对每个key对应的值,进行某种数据公式或者算法的计算(累加、类乘)
2、对于一些类似于要对每个key进行一些字符串拼接的这种较为复杂的操作,可以自己衡量一下,其实有时,也是可以使用reduceByKey来实现的。但是不太好实现。如果真能够实现出来,对性能绝对是有帮助的。(shuffle基本上就占了整个spark作业的90%以上的性能消耗,主要能对shuffle进行一定的调优,都是有价值的)

随机文章