0x0 ProcessFunctionAPI概述
我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
Flink提供了8个Process Function:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
0x1 状态编程
流式计算分为无状态和有状态两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警告。有状态的计算则会基于多个事件输出结果。
使用状态编程的算例
- 按时间:
例如,计算过去一小时的平均温度,就是有状态的计算。
- 复杂事件检测
例如,若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的计算。
流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
下图展示了无状态流处理和有状态流处理的主要区别。无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。
有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。
1.状态的类型
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
算子状态(operator state)
键控状态(keyed state)
2.有状态的算子和应用程序
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
l 算子状态(operator state)
l 键控状态(keyed state)
2.1 算子状态(operator state)
算子状态的作用范围限定为算子任务。
状态与算子挂钩
这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Flink为算子状态提供三种基本数据结构:
列表状态(List state)
将状态表示为一组数据的列表。
联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
2.2 键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
3.键控状态的支持类型与基本操作
(1)Flink的Keyed State支持以下数据类型:
ValueState<T>//保存单个的值,值的类型为T。
get操作: ValueState.value()
set操作: ValueState.update(T value)
(2)ListState<T>保存一个列表,列表里的元素的数据类型为T。基本操作如下:
ListState.add(T value)
ListState.addAll(List<T> values)
ListState.get()返回Iterable<T>
ListState.update(List<T> values)
(3)MapState<K, V>保存Key-Value对。
MapState.get(UK key)
MapState.put(UK key, UV value)
MapState.contains(UK key)
MapState.remove(UK key)
(4)ReducingState<T>
(5)AggregatingState<I, O>
(6)State.clear()是清空操作。
0x2 实战案例
package com.ecust.state;
import com.ecust.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* @author JueQian
* @create 01-18 9:01
* 需求:监测传感器温度,在10秒内不下降,使用processAPI.onTimer
*/
public class Flink02_State_ProcessOnTimer {
public static void main(String[] args) throws Exception {
//0x0 获取执行环境与端口数据源
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
//0x1 将数据源转换为SensorReading对象
SingleOutputStreamOperator<SensorReading> sensorStream = source.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields = s.split(",");
String id = fields[0];
long ts = Long.parseLong(fields[1]);
double temp = Double.parseDouble(fields[2]);
return new SensorReading(id, ts, temp);
}
});
//0x2 分组,然后使用processAPi定时统计计算
sensorStream.keyBy(SensorReading::getId).process(new MyKeyedProcessFunction(10));
//0x3 执行计划
env.execute();
}
public static class MyKeyedProcessFunction extends KeyedProcessFunction<String,SensorReading,SensorReading>{
//间隔属性
private long interval;
//状态属性
private ValueState<Double> lastTemp;
//定时器属性
private ValueState<Long> timer;
public MyKeyedProcessFunction() {
}
//定义构造器外部传入参数
public MyKeyedProcessFunction(long interval) {
this.interval=interval;
}
//生命周期函数
@Override
public void open(Configuration parameters) throws Exception {
//获取上下文对象
lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Double.class));
//获取定时器记录状态
timer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Long.class));
}
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<SensorReading> collector) throws Exception {
//获取现在传感器的温度
Double temp = sensorReading.getTemp();
//获取上次传感器的温度
Double last_temp = lastTemp.value();
//获取当前时间
long ts = context.timerService().currentProcessingTime();
if (last_temp!=null && temp<last_temp){
Long lastTs = timer.value();
context.timerService().deleteProcessingTimeTimer(lastTs);
//注册新的10秒的定时器
context.timerService().registerProcessingTimeTimer(ts+interval*1000L);
//更新状态
timer.update(ts+interval*1000L);
}else if (last_temp==null){
//注册新的10秒的定时器
context.timerService().registerProcessingTimeTimer(ts+interval*1000L);
//更新状态
timer.update(ts+interval*1000L);
}
lastTemp.update(temp);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {
System.out.println("温度在"+interval+"秒内没有下降");
}
}
}
0x3 状态后端
1.概述
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
本地的状态管理
将检查点(checkpoint)状态写入远程存储
用途:用 Data Stream API 编写的程序通常以各种形式保存状态:
- 在 Window 触发之前要么收集元素、要么聚合
- 转换函数可以使用 key/value 格式的状态接口来存储状态
- 转换函数可以实现
CheckpointedFunction
接口,使其本地变量具有容错能力
2.状态后端分类:
MemoryStateBackend
在 MemoryStateBackend 内部,数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子持有存储着状态值、触发器的 hash table。
在 CheckPoint 时,State Backend 对状态进行快照,并将快照信息作为 CheckPoint 应答消息的一部分发送给 JobManager(master),同时 JobManager 也将快照信息存储在堆内存中。
何时使用MemoryStateBackend?
MemoryStateBackend 能配置异步快照。强烈建议使用异步快照来防止数据流阻塞,注意,异步快照默认是开启的。 用户可以在实例化 MemoryStateBackend
的时候,将相应布尔类型的构造参数设置为 false
来关闭异步快照(仅在 debug 的时候使用),例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
MemoryStateBackend 的限制:
- 默认情况下,每个独立的状态大小限制是 5 MB。在 MemoryStateBackend 的构造器中可以增加其大小。
- 无论配置的最大状态内存大小(MAX_MEM_STATE_SIZE)有多大,都不能大于 akka frame 大小(看配置参数)。
- 聚合后的状态必须能够放进 JobManager 的内存中。
MemoryStateBackend 适用场景:
- 本地开发和调试。
- 状态很小的 Job,例如:由每次只处理一条记录的函数(Map、FlatMap、Filter 等)构成的 Job。Kafka Consumer 仅仅需要非常小的状态。
建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。
FsStateBackend
FsStateBackend 需要配置一个文件系统的 URL(类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。
FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。
FsStateBackend 默认使用==异步快照==来防止 CheckPoint 写状态时对数据处理造成阻塞。 用户可以在实例化 FsStateBackend
的时候,将相应布尔类型的构造参数设置为 false
来关闭异步快照,例如:
new FsStateBackend(path, false);
FsStateBackend 适用场景:
- 状态比较大、窗口比较长、key/value 状态比较大的 Job。
- 所有高可用的场景。
建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
yaml文件配置
# 用于存储 operator state 快照的 State Backend
state.backend: filesystem
# 存储快照的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
==RocksDBStateBackend==
RocksDBStateBackend 需要配置一个文件系统的 URL (类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。
RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 CheckPoint 的元数据文件中)。
RocksDBStateBackend 只支持异步快照。
RocksDBStateBackend 的限制:
- 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节(2G, 重要信息: RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节==(4G)==,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。
RocksDBStateBackend 的适用场景:
- 状态非常大、窗口非常长、key/value 状态非常大的 Job。
- 所有高可用的场景。
注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 FsStateBackend 相比,RocksDBStateBackend 允许存储非常大的状态。 然而,这也意味着使用 RocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
</dependency>
设置状态后端为RocksDBStateBackend:
设置状态后端的配置
3.异步快照
参考:
https://arxiv.org/abs/1506.08603 (Lightweight Asynchronous Snapshots for Distributed Dataflows)
3.1 概述
flink的checkpoint(检查点)的原理:checkpoint是目前主流的分布式流式处理框架用于恢复失败作业而保证数据不丢失的常用方法,也是flink实现exactly-once的基础。
以checkpoint为基础,定期生成全局的状态快照(global stat snapshot),当出现作业失败,将集群状态恢复到上一个可用的global stat snapshot再开始继续计算,从而保证数据不丢失。
同时这个“checkpoint/snapshot”还必须尽可能不影响正常的流式计算过程,不能说在生成“checkpoint/snapshot”的时候,对整个集群的处理速度有很大影响,甚至停下来(文章中举了一个反例,叫Naiad,这个不了解),那么这种方案等同于不可用。
Flink采用的是轻量级异步快照,叫做 ABS(Asynchronous Barrier Snapshotting).
3.2 异步快照
所以flink有一篇另外描述exactly-once的文章https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html。大概思路是:把属于一个全局快照的所有sink数据一次性提交,提交成功才算该全局快照执行成功。另外外部存储还要有事务性写入(一批数据要么写入都成功,要么都失败)的功能。kafka 0.11开始有提供事务性写入功能。
0x4 RocksDB State Backend
1.增量快照
RocksDBStateBackend 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
一个增量快照是基于(通常多个)前序快照构建的。由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。
和基于全量快照的恢复时间相比,如果网络带宽是瓶颈,那么基于增量快照恢复可能会消耗更多时间,因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大;而当 CPU 或者 IO 是瓶颈的时候,基于增量快照恢复会更快,因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表,而是可以直接基于 sst 文件加载。
虽然状态数据量很大时我们推荐使用增量快照,但这并不是默认的快照机制,您需要通过下述配置手动开启该功能:
- 在
flink-conf.yaml
中设置:state.backend.incremental: true
或者 - 在代码中按照右侧方式配置(来覆盖默认配置):
RocksDBStateBackend backend = new RocksDBStateBackend(filebackend, true);
需要注意的是,一旦启用了增量快照,网页上展示的 Checkpointed Data Size
只代表增量上传的数据量,而不是一次快照的完整数据量。
2.内存管理
Flink 致力于控制整个进程的内存消耗,以确保 Flink 任务管理器(TaskManager)有良好的内存使用,从而既不会在容器(Docker/Kubernetes, Yarn等)环境中由于内存超用被杀掉,也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降,致使性能下降。
为了达到上述目标,Flink 默认将 RocksDB 的可用内存配置为任务管理器的单槽(per-slot)托管内存量。这将为大多数应用程序提供良好的开箱即用体验,即大多数应用程序不需要调整 RocksDB 配置,简单的增加 Flink 的托管内存即可改善内存相关性能问题。
当然,您也可以选择不使用 Flink 自带的内存管理,而是手动为 RocksDB 的每个列族(ColumnFamily)分配内存(每个算子的每个 state 都对应一个列族)。这为专业用户提供了对 RocksDB 进行更细粒度控制的途径,但同时也意味着用户需要自行保证总内存消耗不会超过(尤其是容器)环境的限制。请参阅 large state tuning 了解有关大状态数据性能调优的一些指导原则。
总结:flink会分配一个slot给RocksDB
==RocksDB 使用托管内存==
这个功能默认打开,并且可以通过 state.backend.rocksdb.memory.managed
配置项控制。
Flink 并不直接控制 RocksDB 的 native 内存分配,而是通过配置 RocksDB 来确保其使用的内存正好与 Flink 的托管内存预算相同。这是在任务槽(per-slot)级别上完成的(托管内存以任务槽为粒度计算)。
为了设置 RocksDB 实例的总内存使用量,Flink 对同一个任务槽上的所有 RocksDB 实例使用共享的 cache 以及 write buffer manager。 共享 cache 将对 RocksDB 中内存消耗的三个主要来源(块缓存、索引和bloom过滤器、MemTables)设置上限。
Flink还提供了两个参数来控制写路径(MemTable)和读路径(索引及过滤器,读缓存)之间的内存分配。当您看到 RocksDB 由于缺少写缓冲内存(频繁刷新)或读缓存未命中而性能不佳时,可以使用这些参数调整读写间的内存分配。
state.backend.rocksdb.memory.write-buffer-ratio
,默认值0.5
,即 50% 的给定内存会分配给写缓冲区使用。state.backend.rocksdb.memory.high-prio-pool-ratio
,默认值0.1
,即 10% 的 block cache 内存会优先分配给索引及过滤器。 我们强烈建议不要将此值设置为零,以防止索引和过滤器被频繁踢出缓存而导致性能问题。此外,我们默认将L0级的过滤器和索引将被固定到缓存中以提高性能,更多详细信息请参阅 RocksDB 文档。
注意 上述机制开启时将覆盖用户在 PredefinedOptions
和 RocksDBOptionsFactory
中对 block cache 和 write buffer 进行的配置。
注意 仅面向专业用户:若要手动控制内存,可以将 state.backend.rocksdb.memory.managed
设置为 false
,并通过 ColumnFamilyOptions
配置 RocksDB。 或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过 state.backend.rocksdb.memory.fixed-per-slot
选项)。 注意在这两种情况下,用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。
3.计时器(内存 vs. RocksDB)
计时器(Timer)用于安排稍后的操作(基于事件时间或处理时间),例如触发窗口或回调 ProcessFunction
。
当选择 RocksDBStateBackend 时,默认情况下计时器也存储在 RocksDB 中。这是一种健壮且可扩展的方式,允许应用程序使用很多个计时器。另一方面,在 RocksDB 中维护计时器会有一定的成本,因此 Flink 也提供了将计时器存储在 JVM 堆上而使用 RocksDB 存储其他状态的选项。当计时器数量较少时,基于堆的计时器可以有更好的性能。
您可以通过将 state.backend.rocksdb.timer-service.factory
配置项设置为 heap
(而不是默认的 rocksdb
)来将计时器存储在堆上。
注意 在 RocksDBStateBackend 中使用基于堆的计时器的组合当前不支持计时器状态的异步快照。其他状态(如 keyed state)可以被异步快照。
4.对于RocksDB总结
RocksDB是一个本地库,
它直接从进程分配内存, 而不是从JVM分配内存。
分配给 RocksDB 的任何内存都必须被考虑在内,
通常需要将这部分内存从任务管理器(TaskManager)的JVM堆中减去。
不这样做可能会导致JVM进程由于分配的内存超过申请值而被 YARN/Mesos 等资源管理框架终止。
算控状态
Operator ID | State
------------+------------------------
source-id | State of StatefulSource
mapper-id | State of StatefulMapper
目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储
RocksDB
您也可以通过配置一个 RocksDBOptionsFactory
来手动控制 RocksDB 的选项。此机制使您可以对列族的设置进行细粒度控制,例如内存使用、线程、Compaction 设置等。目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。
0x5 RocksDB数据库系统
当选择 RocksDB 作为状态后端时,状态将作为序列化字节串存在于堆外内存(off-heap) 存储或本地磁盘中。
RocksDB 是一个以日志合并树( LSM 树)作为索引结构的 KV 存储引擎。当用于在 Flink 中存储 kv 状态时,键由 的序列化字节串组成,而值由状态的序列化字节组成。每次注册 kv 状态时,它都会映射到列族(column-family)(类似于传统数据库中的表),并将键值对以字节串存储在 RocksDB 中。这意味着每次读写(READ or WRITE)操作都必须对数据进行反序列化或者序列化,与 Flink 内置的 in-memory 状态后端相比,会有一些性能开销。
使用 RocksDB 作为状态后端有许多优点:
- 不受 Java 垃圾回收的影响,与 heap 对象相比,它的内存开销更低,并且是目前唯一支持增量检查点(incremental checkpointing)的选项。
- 使用 RocksDB,状态大小仅受限于本地可用的磁盘空间大小,这很适合 state 特别大的 Flink 作业。
下面的图表将进一步阐明 RocksDB 的基本读写操作。
RocksDB 的一次写入操作将把数据写入到内存的 MemTable 中。当 MemTable 写满时,它将成为 READ ONLY MemTable,并被一个新申请的 MemTable 替换。只读 MemTable 被后台线程周期性地刷新到磁盘中,生成按键排序的只读文件,这便是所谓的 SSTables。这些 SSTable 是不可变的,通过后台的多路归并实现进一步的整合。如前所述,对于 RocksDB,每个注册状态都是一个列族,这意味着每个状态都包含自己的 MemTables 和 SSTables 集。