一、updateStateByKey

官网原话:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(1)

也即是说它会统计全局的key的状态,就算没有数据输入,它也会在每一个批次的时候返回之前的key的状态。

缺点:若数据量太大的话,需要checkpoint的数据会占用较大的存储,效率低下。

程序示例如下:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(2)

二、mapWithState

mapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,有一点增量的感觉。效率更高,生产中建议使用

官方代码如下

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(3)

三、源码分析

upateStateByKey:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(4)

跟进去 PairDStreamFunctions ,发现最终调用的是自己的updateStateByKey。

其中updateFunc就要传入的参数,他是一个函数,Seq[V]表示当前key对应的所有值,

Option[S] 是当前key的历史状态,返回的是新的状态。

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(5)

最终调用:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(6)

再跟进去 new StateDStream:

在这里面new出了一个StateDStream对象。在其compute方法中,会先获取上一个batch计算出的RDD(包含了至程序开始到上一个batch单词的累计计数),然后在获取本次batch中StateDStream的父类计算出的RDD(本次batch的单词计数)分别是prevStateRDD和parentRDD,然后在调用 computeUsingPreviousRDD 方法:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(7)

在这里两个RDD进行cogroup然后应用updateStateByKey传入的函数。我们知道cogroup的性能是比较低下,参考【http://lxw1234.com/archives/2015/07/384.htm】。

mapWithState:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(8)

说明:StateSpec 封装了状态管理函数,并在该方法中创建了MapWithStateDStreamImpl对象。

MapWithStateDStreamImpl 中创建了一个InternalMapWithStateDStream类型对象internalStream,在MapWithStateDStreamImpl的compute方法中调用了internalStream的getOrCompute方法。

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(9)

InternalMapWithStateDStream中没有getOrCompute方法,这里调用的是其父类 DStream 的getOrCpmpute方法,该方法中最终会调用InternalMapWithStateDStream的Compute方法:

sparkstreaming数据解析(Streaming状态管理函数的选择比较)(10)

根据给定的时间生成一个MapWithStateRDD,首先获取了先前状态的RDD:preStateRDD和当前时间的RDD:dataRDD,然后对dataRDD基于先前状态RDD的分区器进行重新分区获取partitionedDataRDD。最后将preStateRDD,partitionedDataRDD和用户定义的函数mappingFunction传给新生成的MapWithStateRDD对象返回。

后续若有兴趣可以继续跟进MapWithStateRDD的compute方法,限于篇幅不再展示。

,