IT星球论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 70|回复: 0

通过一系列破坏行为加深对spark RDD 的理解(或者是猜测)...

[复制链接]

2004

主题

1

好友

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

优秀会员 助人为乐 辛勤工作 技术精英 多才多艺 优秀班竹 灌水天才 星球管理 宣传大使 灌水之王 财富勋章 版主勋章 动漫勋章 勤奋会员 论坛精英 PS高手 心 8 闪游皮肤 双鱼座 8★8➹ 志愿者 乖

发表于 2017-3-22 12:43:12 |显示全部楼层
通过一系列破坏行为加深对spark RDD 的理解(或者是猜测)(Python 版)

这个实验由一个实验案例产生,实验中,需要对一个数据集进行维护,其中有一个需要对数据一条条进行插入:

下面是最二的写法:

rdd=sc.parallelize([-1])for i in range(10000):    rdd=rdd.union(sc.parallelize())

每次插入数据,新建一个rdd,然后union。

后果是:

Java.lang.OutOfMemoryError: GC overhead limit exceeded

at org.apache.spark.rdd.UnionRDD$$anonfun$getPartitions$2$$anonfun$apply$1.apply(UnionRDD.scala:69)
        at org.apache.spark.rdd.UnionRDD$$anonfun$getPartitions$2$$anonfun$apply$1.apply(UnionRDD.scala:68)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.rdd.UnionRDD$$anonfun$getPartitions$2.apply(UnionRDD.scala:68)
        at org.apache.spark.rdd.UnionRDD$$anonfun$getPartitions$2.apply(UnionRDD.scala:68)

。。。

第2119次循环时报的错,但不能对rdd进行操作。也就是说看不出来成功插入的条数,反正就是没意义。

方法二:也挺二,其实是第一种二的基础上改进一下

count=0rdd=sc.parallelize([-1])for i in range(10000):    rdd=rdd.union(sc.parallelize())    count=count+1    if(count>100):        rdd.take(1)        count=0

: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

第605次循环就报错了,一样的二,类似的结果,不一样的原因

也许还有第三个实验:

count=0rdd=sc.parallelize([-1])for i in range(10000):    rdd=rdd.union(sc.parallelize()).persist()    count=count+1    if(count>100):        rdd.take(1)        count=0

跟实验二差不多。

现在说一下原因。

首先类比一下我们在内存中干同样的事情,

我们有一个数组,循环一万次,每次插入一个元素,似乎没有任何问题。我们也经常这么做。

那我们经常把rdd看作数组(或者list)对待,现在要插入一个元素,似乎只能是用union。为什么就不行?

RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。而在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

可见rdd于数组还是有区别。在问题一中,union都被记录下来,而没有去执行,然后,就把内存撑爆了。

第二次,我每过100次,就执行一次action,这样,union就会被执行了,好聪明啊。

但还是挂了,而且挂的更惨。

这是因为,计算的时候,rdd并不把上次rdd的结果作为输入,而是,依然把最初的数据装进来,从头开始算,但是,要算的东西太多,所以堆栈溢出。

第三次,表明persist操作并无明显的作用。

好吧,突然又想到实验四。

count=0rdd=sc.parallelize([-1])for i in range(10000):    rdd=rdd.union(sc.parallelize()).persist()    count=count+1    if(count>100):        myarray=rdd.collect()        rdd=sc.parallelize(myarray)        count=0

结果还在跑,但感觉问题不大。

就是这样的话,数据量过大的话,myarray还是把内存撑爆啊。


综上:最好的做法是,在内存里攒着别插,然后攒够一万条就插一次。

而且在别的场景中,你可以制造大规模的数据,但不要制造大规模的transformation操作。

通过一系列破坏行为加深对spark RDD 的理解(或者是猜测)(python 版)
该会员没有填写今日想说内容.
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

回顶部