官方文档:Working with State | Apache Flink
有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能:
- 数据流中的数据有重复,想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
- 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
- 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。
1)键控状态(Keyed State)
keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream 上使用,在Java/Scala API上可以通过 stream.keyBy(...) 得到 KeyedStream,在Python API上可以通过 stream.key_by(...) 得到 KeyedStream。
1、控件状态特点2、键控状态类型
- 键控状态是根据输入数据流中定义的键(key)来维护和访问的
- Flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态
- 当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key
键控状态类型
说明
方法
ValueState[T]
值状态,保存一个可以更新和检索的值
ValueState.update(value: T)ValueState.value()
ListState[T]
列表状态,保存一个元素的列表可以往这个列表中追加数据,并在当前的列表上进行检索。
ListState.add(value: T)ListState.addAll(values: java.util.List[T])ListState.update(values: java.util.List[T])ListState.get()(注意:返回的是Iterable[T])
ReducingState
聚合状态,保存一个单值,表示添加到状态的所有值的聚合,接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
ReducingState.add(value: T)ReducingState.get()
AggregatingState<IN, OUT>
聚合状态,保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
AggregatingState.add(value: T)AggregatingState.get()
MapState<UK, UV>
映射状态,维护了一个映射列表,保存Key-Value对。
MapState.get(key: K)MapState.put(key: K, value: V)MapState.contains(key: K)MapState.remove(key: K)
【温馨提示】所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
3、状态有效期 (TTL)任何类型的 keyed state 都可以有 有效期 (TTL)。所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。
【官网示例】
package com import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.common.time.Time object StateTest001 { def main(args: Array[String]): Unit = { val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String]) stateDescriptor.enableTimeToLive(ttlConfig) } }
TTL 配置有以下几个选项:
1)过期数据的清理
- newBuilder 的第一个参数表示数据的有效期,是【必选项】。
- TTL 的更新策略(默认是 OnCreateAndWrite)StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:
2)全量快照时进行清理
import org.apache.flink.api.common.state.StateTtlConfig val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .disableCleanupInBackground .build
可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:
import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.time.Time val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot .build
【温馨提示】这种策略在 RocksDBStateBackend 的增量 Checkpoint 模式下无效。
3)增量数据清理另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
4)在 RocksDB 压缩时清理如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。该特性可以通过 StateTtlConfig 进行配置:
import org.apache.flink.api.common.state.StateTtlConfig val ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupInRocksdbCompactFilter(1000) .build
【注意】
4、键控状态的使用
- 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
- 增量清理会增加数据处理的耗时。
- 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
- 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
- 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
除了上面描述的接口之外,Scala API 还在 KeyedStream 上对 map() 和 flatMap() 访问 ValueState 提供了一个更便捷的接口mapWithState。 用户函数能够通过 Option 获取当前 ValueState 的值,并且返回即将保存到状态的值。
2)算子状态(Operatior State)
val stream: DataStream[(String, Int)] = ... val counts: DataStream[(String, Int)] = stream .keyBy(_._1) .mapWithState((in: (String, Int), count: Option[Int]) => count match { case Some(c) => ( (in._1, c), Some(c in._2) ) case None => ( (in._1, 0), Some(in._2) ) })
算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态。Kafka Connector 是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。
【温馨提示】 Python DataStream API 仍无法支持算子状态。
1、算子状态特点2、算子状态类型
- 算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态
- 状态对于同一子任务而言是共享的
- 算子状态不能由相同或不同算子的另一个子任务访问
3)广播状态 (Broadcast State)
键控状态类型
说明
列表状态(ListState)
将状态表示为一组数据的列表
联合列表状态(UnionListState)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复
广播状态(BroadcastState)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。广播状态和其他算子状态的不同之处在于:
- 它具有 map 格式,
- 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,
- 这类算子可以拥有不同命名的多个广播状态 。
【温馨提示】 Python DataStream API 仍无法支持算子状态。
二、状态后端(State Backends)状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend) ,状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。
1)三种状态存储方式
存储方式
说明
MemoryStateBackend
【默认模式】状将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,将checkpoint存储在JobManager的内存中。主要适用于本地开发和调试。
FsStateBackend
基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。
RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
【温馨提示】特别在 MemoryStateBackend 内使用HeapKeyedStateBackend时,Checkpoint 序列化数据阶段默认有最大 5 MB数据的限制。
对于HeapKeyedStateBackend,有两种实现:
2)配置方式
- 支持异步 Checkpoint(默认):存储格式 CopyOnWriteStateMap
- 仅支持同步 Checkpoint:存储格式 NestedStateMap
Flink 支持使用两种方式来配置后端管理器:
1、【第一种方式】基于代码方式进行配置【温馨提示】只对当前作业生效
// 配置 FsStateBackend env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink/checkpoints")); // 配置 RocksDBStateBackend env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:port/flink/checkpoints"));
配置 RocksDBStateBackend 时,需要额外导入下面的依赖:
2、【第二种方式】基于 flink-conf.yaml 配置文件的方式进行配置
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.9.0</version> </dependency>
【温馨提示】对所有部署在该集群上的作业都生效
三、容错机制(checkpoint)
state.backend: filesystem state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints
checkpoint是Flink容错的核心机制。它可以定期地将各个Operator处理的数据进行快照存储( Snapshot )。如果Flink程序出现宕机,可以重新从这些快照中恢复数据。
1)一致性谈到容错性,就没法避免一致性这个概念。所谓一致性就是:成功处理故障并恢复之后得到的结果与没有发生任何故障是得到的结果相比,前者的正确性。换句大白话,就是故障的发生是否影响得到的结果。在流处理过程,一致性分为3个级别:
- at-most-once:至多一次。故障发生之后,计算结果可能丢失,就是无法保证结果的正确性;
- at-least-once:至少一次。计算结果可能大于正确值,但绝不会小于正确值,就是计算程序发生故障后可能多算,但是绝不可能少算;
- exactly-once:精确一次。系统保证发生故障后得到的计算结果的值和正确值一致;
Flink的容错机制保证了exactly-once,也可以选择at-least-once。Flink的容错机制是通过对数据流不停的做快照(snapshot)实现的。
2)检查点(checkpoint)Flink 中的每个方法或算子都能够是有状态的,为了让状态容错,Flink 需要为状态添加 checkpoint(检查点)。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。官方文档:Checkpointing | Apache Flink
1、开启与配置 Checkpoint默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n)来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。
2、Checkpoint 属性
属性
说明
精确一次(exactly-once)
你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择保证等级级别。
checkpoint 超时
如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。
checkpoints 之间的最小时间
该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
checkpoint 可容忍连续失败次数
该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。
并发 checkpoint 的数目
默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。
externalized checkpoints
你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。
【官网示例】
3)从检查点恢复状态
val env = StreamExecutionEnvironment.getExecutionEnvironment() // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000) // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(60000) // 允许两个连续的 checkpoint 错误 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2) // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留 env.getCheckpointConfig().enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // 开启实验性的 unaligned checkpoints env.getCheckpointConfig.enableUnalignedCheckpoints()
4)检查点的实现算法
- 【第一步】遇到故障之后,第一步就是重启应用
- 【第二步】是从 checkpoint 中读取状态,将状态重置,从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
- 【第三步】开始消费并处理检查点到发生故障之间的所有数据,这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly- once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置
5)检查点算法
- 【一种简单的想法】:暂停应用,保存状态到检查点,再重新恢复应用
- 【Flink 的改进实现】:基于 Chandy-Lamport 算法的分布式快照将检查点的保存和数据处理分离开,不暂停整个应用
基于Chandy-Lamport算法实现的分布式快照
1、检查点分界线(Checkpoint Barrier)
2、Barrier对齐
- 将barrier插入到数据流中,作为数据流的一部分和数据一起向下流动。Barrier不会干扰正常数据,数据流严格有序。
- 一个barrier把数据流分割成两部分:一部分进入到当前快照,另一部分进入到下一个快照。
- 每一个barrier都带有快照ID,并且barrier之前的数据都进入了此快照。Barrier不会干扰数据流处理,所以非常轻量。
- 多个不同快照的多个barrier会在流中同时出现,即多个快照可能同时创建。
当一个opeator有多个输入流的时候,checkpoint barrier n 会进行对齐,就是已到达的会先缓存到buffer里等待其他未到达的,一旦所有流都到达,则会向下游广播,exactly-once 就是利用这一特性实现的,at least once 因为不会进行对齐,就会导致有的数据被重复处理。
3、执行一次检查点步骤
- jobManager会向每个source任务发送一条带有新检查点ID的消息,通过这种方式来启动检查点。
- 数据源将他们各自的状态写入检查点后,并向下游所有分区发出一个检查点barrier。状态后端在状态存入检查点之后,会返回通知给source任务,source任务再向jobmanager确认检查点完成。
- barrier向下游传递,下游任务会等待所有输入分区的barrier的到达后再做状态保存通知jobmanager状态保存完成,并再向下游所有分区发送收到的检查点barrier。
【温馨提示】对于barrier已经到达的分区,继续到达的数据会被缓存;对于barrier未到达的分区,数据会被正常处理所有barrier都到达后,做完状态保存且向下游发送检查点barrier后,当前任务继续处理缓存的数据和后面到来的数据。
6)保存点(savepoint)1、概述
- sink任务向jobmanager确认状态保存到checkpoint完成。即所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了。
2、savepoint触发的三种方式
- Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints);
- 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点;
- Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作;
- 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等。
7)检查点(checkpoint)与 保存点(savepoint)的区别与联系
- 使用 flink savepoint 命令触发 Savepoint,其是在程序运行期间触发 savepoint。
- 使用 flink cancel -s 命令,取消作业时,并触发 Savepoint。
- 使用 Rest API 触发 Savepoint,格式为:/jobs/:jobid /savepoints
- checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
- savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。
- checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
- checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。
未完待续,请耐心等待~
,