Flink01_工具与架构


0x0.Flink的应用

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

1.处理无界和有界数据

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为 无界 或者 有界流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

有界流与无界流

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

2.Flink的强大之处

  • 处理每天处理数万亿的事件,
  • 应用维护几TB大小的状态, 和
  • 应用在数千个内核上运行

3.如何利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

img

0x1.Flink的操作工具

0.流处理应用的基本组件

可以由流处理框架构建和执行的应用程序类型是由框架对 状态时间 的支持程度来决定的。在下文中,我们将对上述这些流处理应用的基本组件逐一进行描述,并对 Flink 处理它们的方法进行细致剖析。

1.流Stream

显而易见,(数据)流是流处理的基本要素。然而,流也拥有着多种特征。这些特征决定了流如何以及何时被处理。Flink 是一个能够处理任何类型数据流的强大处理框架。

  • 有界无界 的数据流:流可以是无界的;也可以是有界的,例如固定大小的数据集。Flink 在无界的数据流处理上拥有诸多功能强大的特性,同时也针对有界的数据流开发了专用的高效算子。
  • 实时历史记录 的数据流:所有的数据都是以流的方式产生,但用户通常会使用两种截然不同的方法处理数据。或是在数据生成时进行实时的处理;亦或是先将数据流持久化到存储系统中——例如文件系统或对象存储,然后再进行批处理。Flink 的应用能够同时支持处理实时以及历史记录数据流。

2.状态State

只有在每一个单独的事件上进行转换操作的应用才不需要状态,换言之,每一个具有一定复杂度的流处理应用都是有状态的。任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

算子与状态协同处理流

应用状态是 Flink 中的特点,Flink 提供了许多状态管理相关的特性支持,其中包括:

  • 多种状态基础类型:Flink 为多种不同的数据结构提供了相对应的状态基础类型,例如原子值(value),列表(list)以及映射(map)。开发者可以基于处理函数对状态的访问方式,选择最高效、最适合的状态基础类型。
  • 插件化的State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。RocksDB 是一种高效的嵌入式、持久化键值存储引擎。Flink 也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。因此,Flink 能够在应用程序发生故障时,对应用程序透明,不造成正确性的影响。
  • 超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩

3.时间WaterMark

时间是流处理应用另一个重要的组成部分。因为事件总是在特定时间点发生,所以大多数的事件流都拥有事件本身所固有的时间语义。进一步而言,许多常见的流计算都基于时间语义,例如窗口聚合、会话计算、模式检测和基于时间的 join。流处理的一个重要方面是应用程序如何衡量时间,即区分事件时间(event-time)和处理时间(processing-time)。

Flink 提供了丰富的时间语义支持。

  • 事件时间模式:使用事件时间语义的流处理应用根据事件本身自带的时间戳进行结果的计算。因此,无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。
  • Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。
  • 迟到数据处理:当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。
  • 处理时间模式:除了事件时间模式,Flink 还支持处理时间语义。处理时间模式根据处理引擎的机器时钟触发计算,一般适用于有着严格的低延迟需求,并且能够容忍近似结果的流处理应用。

4.分层 API

Flink 根据抽象程度分层,提供了三种不同的 API。每一种 API 在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景。

flink分层API

ProcessFunction

ProcessFunction 是 Flink 所提供的最具表达力的接口。ProcessFunction 可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数。因此,你可以利用 ProcessFunction 实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑。

DataStream API

DataStream API 为许多通用的流处理操作提供了处理原语。这些操作包括窗口、逐条记录的转换操作,在处理事件时进行外部数据库查询等。DataStream API 支持 Java 和 Scala 语言,预先定义了例如map()reduce()aggregate() 等函数。你可以通过扩展实现预定义接口或使用 Java、Scala 的 lambda 表达式实现自定义的函数。

下面的代码示例展示了如何捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。

SQL & Table API

Flink 支持两种关系型的 API,Table API 和 SQL。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。

Flink 的关系型 API 旨在简化数据分析数据流水线和 ETL 应用的定义。

下面的代码示例展示了如何使用 SQL 语句查询捕获会话时间范围内所有的点击流事件,并对每一次会话的点击量进行计数。此示例与上述 DataStream API 中的示例有着相同的逻辑。

0x2.Flink的稳定性

Apache Flink 是一个针对无界和有界数据流进行有状态计算的框架。由于许多流应用程序旨在以最短的停机时间连续运行,因此流处理器必须提供出色的故障恢复能力,以及在应用程序运行期间进行监控和维护的工具。

Apache Flink 非常注重流数据处理的可运维性。因此在这一小节中,我们将详细介绍 Flink 的故障恢复机制,并介绍其管理和监控应用的功能。

1.如何保持7 * 24小时稳定运行

Flink通过几下多种机制维护应用可持续运行及其一致性:

  • 检查点的一致性: Flink的故障恢复机制是通过建立分布式应用服务状态一致性检查点实现的,当有故障产生时,应用服务会重启后,再重新加载上一次成功备份的状态检查点信息。结合可重放的数据源,该特性可保证精确一次(exactly-once)的状态一致性。
  • 高效的检查点: 如果一个应用要维护一个TB级的状态信息,对此应用的状态建立检查点服务的资源开销是很高的,为了减小因检查点服务对应用的延迟性(SLAs服务等级协议)的影响,Flink采用异步及增量的方式构建检查点服务。
  • 端到端的精确一次: Flink 为某些特定的存储支持了事务型输出的功能,及时在发生故障的情况下,也能够保证精确一次的输出。
  • 集成多种集群管理服务: Flink已与多种集群管理服务紧密集成,如 Hadoop YARN, Mesos, 以及 Kubernetes。当集群中某个流程任务失败后,一个新的流程服务会自动启动并替代它继续执行。
  • 内置高可用服务: Flink内置了为解决单点故障问题的高可用性服务模块,此模块是基于Apache ZooKeeper 技术实现的,Apache ZooKeeper是一种可靠的、交互式的、分布式协调服务组件。

2.如何升级、迁移、暂停、恢复应用服务

而Flink的 Savepoint 服务就是为解决升级服务过程中记录流应用状态信息及其相关难题而产生的一种唯一的、强大的组件。一个 Savepoint,就是一个应用服务状态的一致性快照,因此其与checkpoint组件的很相似,但是与checkpoint相比,Savepoint 需要手动触发启动,而且当流应用服务停止时,它并不会自动删除。Savepoint 常被应用于启动一个已含有状态的流服务,并初始化其(备份时)状态。Savepoint 有以下特点:

  • 便于升级应用服务版本: Savepoint 常在应用版本升级时使用,当前应用的新版本更新升级时,可以根据上一个版本程序记录的 Savepoint 内的服务状态信息来重启服务。它也可能会使用更早的 Savepoint 还原点来重启服务,以便于修复由于有缺陷的程序版本导致的不正确的程序运行结果。
  • 方便集群服务移植: 通过使用 Savepoint,流服务应用可以自由的在不同集群中迁移部署。
  • 方便Flink版本升级: 通过使用 Savepoint,可以使应用服务在升级Flink时,更加安全便捷。
  • 增加应用并行服务的扩展性: Savepoint 也常在增加或减少应用服务集群的并行度时使用。
  • 便于A/B测试及假设分析场景对比结果: 通过把同一应用在使用不同版本的应用程序,基于同一个 Savepoint 还原点启动服务时,可以测试对比2个或多个版本程序的性能及服务质量。
  • 暂停和恢复服务: 一个应用服务可以在新建一个 Savepoint 后再停止服务,以便于后面任何时间点再根据这个实时刷新的 Savepoint 还原点进行恢复服务。
  • 归档服务: Savepoint 还提供还原点的归档服务,以便于用户能够指定时间点的 Savepoint 的服务数据进行重置应用服务的状态,进行恢复服务。

3.如何监控和控制应用服务

如其它应用服务一样,持续运行的流应用服务也需要监控及集成到一些基础设施资源管理服务中,例如一个组件的监控服务及日志服务等。监控服务有助于预测问题并提前做出反应,日志服务提供日志记录能够帮助追踪、调查、分析故障发生的根本原因。最后,便捷易用的访问控制应用服务运行的接口也是Flink的一个重要的亮点特征。

Flink与许多常见的日志记录和监视服务集成得很好,并提供了一个REST API来控制应用服务和查询应用信息。具体表现如下:

  • Web UI方式: Flink提供了一个web UI来观察、监视和调试正在运行的应用服务。并且还可以执行或取消组件或任务的执行。
  • 日志集成服务:Flink实现了流行的slf4j日志接口,并与日志框架log4jlogback集成。
  • 指标服务: Flink提供了一个复杂的度量系统来收集和报告系统和用户定义的度量指标信息。度量信息可以导出到多个报表组件服务,包括 JMX, Ganglia, Graphite, Prometheus, StatsD, Datadog, 和 Slf4j.
  • 标准的WEB REST API接口服务: Flink提供多种REST API接口,有提交新应用程序、获取正在运行的应用程序的Savepoint服务信息、取消应用服务等接口。REST API还提供元数据信息和已采集的运行中或完成后的应用服务的指标信息。

0x3.Flink的组件

Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)。因为Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机上。每个组件的职责如下:

1.作业管理器(JobManager)

控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。

JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。

JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。

一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。

2.资源管理器(ResourceManager)

主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。

Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。

如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。

(通信:Spark-netty框架,Flink-rpc框架)

3.任务管理器(TaskManager)

Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。

插槽的数量限制了TaskManager能够执行的任务数量。

启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。

JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。

4.分发器(Dispatcher)

可以跨作业运行,它为应用提交提供了REST接口。

当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。

Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

0x4.Flink的提交与运行架构

具体地,如果我们将Flink集群部署到YARN上,那么就会有如下的提交流程:

1.提交流程

Yarn-提交流程

Per-Job模式没有现成的slot

(1) 当任务执行1.SubmitJob的时候,请求Yarn的ResourceManager去开一个新的YARN Application Master进程,这个进程包含一个JobManager线程和一个属于Flink Master的ResourceManager.2.Start application 3.master with Flink master

FlowGraph

(2) JobManager负责对任务进行切分,划分任务,然后根据任务去ResourceManager 4.Request Slots

(3) RM向YarnRM请求资源,RM返回一个使用Container包装好的TaskManager 5.Start TaskManager containers

(4) TaskManager向RM注册Slots 6.Register slots

(5) 然后RM得到注册好的TaskManager之后,请求使用TaskManager7.Resuest slot,然后TaskManager 8.Offer slot 给JobManager

(6) JobManager执行任务9.Execute task

2.TaskManager与Slots

Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。

为什么要使用Slots

为了控制一个TaskManager能够运行多个task,使用slot隔离内存,一个worker至少有一个task slot。

每个task slot表示TaskManager拥有资源的一个固定大小的子集

Task Slot

Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;

而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。

3.Slots与Slots共享组

TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task。

为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念,通过 Task Slot 来定义Flink 中的计算资源。

solt 对TaskManager内存进行平均分配,每个solt内存都相同,加起来和等于TaskManager可用内存,但是仅仅对内存做了隔离,并没有对cpu进行隔离。

同一个JVM不同slots内部通信

而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。

4.共享组示例

共享组是将一个Slot的任务链截断,另起一个Slot,增加并行度与运行效率

public class Flink02_Wc_Groups {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //0x0 使用工具类从命令行获取参数
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        //0x1 获取端口数据
        DataStreamSource<String> portDS = env.socketTextStream(host, port);

        //0x2 处理数据计算wordCount
        SingleOutputStreamOperator<Tuple2<String, Integer>> word2One = portDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> tuple2 = new Tuple2<>(word, 1);
                    collector.collect(tuple2);
                }
            }
        }).slotSharingGroup("group1");

        //0x3 打印数据
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedOne = word2One.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> one2Sum = keyedOne.sum(1).slotSharingGroup("group2");
        one2Sum.print().slotSharingGroup("group3");

        //0x4
        env.execute();

    }
}

我们简单使用Standalone-mode提交

bin/flink run \
-c com.ecust.groups.Flink01_Wc_Groups \
-p 2 \
flink-core-1.0-SNAPSHOT.jar \
--host hadoop102 
--port 9999 \

Slot共享组执行计划

任务运行后的slot使用情况观察

slot共享组slot使用情况

5.任务链

观察上述的执行图,一个小框就是一个任务链(可以有多个人subTask)

Shuffle任务不能合并任务链

能够合并的任务链

  1. one2One
  2. 并行度相同
  3. 同一个共享组

将算子链接成task是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。

任务链的合并


文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录