Flink09_CEP


本文参考:

官网

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/libs/cep.html

阿里云社区(精品)

https://developer.aliyun.com/article/738451

0x0开始

1.概览

CEP的意思是复杂事件处理,例如:起床–>洗漱–>吃饭–>上班等一系列串联起来的事件流形成的模式称为CEP。如果发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就可以把这种非正常的事件流匹配出来进行分析,看看今天是不是起晚了。

CEP应用实例

复杂事件处理(Complex Event Processing,CEP)API的基本架构

graph LR
A[CEP]-->B[格式:Pattern.begin]-->H[时间上下文]
B-->G[迟到数据处理]
A-->C[next]-->E[时间作用域:within]
A-->D[followedBy]-->E[时间作用域:within]

是构建在 DataStream API上的,首先需要用户创建定义一个个pattern,然后通过链表将由前后逻辑关系的pattern串在一起,构成模式匹配的逻辑表达。

然后需要用户利用NFACompiler,将模式进行分拆,创建出NFA(非确定有限自动机)对象,NFA包含了该次模式匹配的各个状态和状态间转换的表达式。整个示意图就像如下:

2.CEP的使用步骤

  1. 首先需要定义一个模式(Pattern)

  2. 接着把定义好的模式绑定在DataStream上

  3. 最后就可以在具有CEP功能的DataStream上将匹配的结果进行处理

  • 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
  • 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
  • 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

FlinkCEP原理

Flink CEP内部是用NFA(非确定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态作为起点,经过一系列的中间状态,达到终态。点分为起始状态中间状态最终状态三种,边分为takeignoreproceed三种。

  • take:必须存在一个条件判断,当到来的消息满足take边条件判断时,把这个消息放入结果集,将状态转移到下一状态。
  • ignore:当消息到来时,可以忽略这个消息,将状态自旋在当前不变,是一个自己到自己的状态转移。
  • proceed:又叫做状态的空转移,当前状态可以不依赖于消息到来而直接转移到下一状态。举个例子,当用户购买商品时,如果购买前有一个咨询客服的行为,需要把咨询客服行为和购买行为两个消息一起放到结果集中向下游输出;如果购买前没有咨询客服的行为,只需把购买行为放到结果集中向下游输出就可以了。 也就是说,如果有咨询客服的行为,就存在咨询客服状态的上的消息保存,如果没有咨询客服的行为,就不存在咨询客服状态的上的消息保存,咨询客服状态是由一条proceed边和下游的购买状态相连。

0x1模式API

目标:从有序的简单事件流中发现一些高阶特征

输入:一个或多个由简单事件构成的事件流

处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

输出:满足规则的复杂事件

CEP实现的功能

处理事件的规则,被叫做“模式”(Pattern)

Flink CEP 提供了 Pattern API,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列

0x2检测模式PatternAPI

个体模式(Individual Patterns)

组成复杂规则的每一个单独的模式定义,就是“个体模式”

组合模式(Combining Patterns,也叫模式序列)

很多个体模式组合起来,就形成了整个的模式序列

模式序列必须以一个“初始模式”开始

模式组(Groups of patterns)

将一个模式序列作为条件嵌套在个体模式里,成为一组模式

0x3CEP库中的时间

当一个模式通过 within 关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能够处理这些超时的部分匹配,select 和 flatSelect API 调用允许指定超时处理程序

超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个 OutputTag 定义接收到的超时事件序列

CEP中,事件的处理顺序很重要。在使用事件时间时,为了保证事件按照正确的顺序被处理,一个事件到来后会先被放到一个缓冲区中, 在缓冲区里事件都按照时间戳从小到大排序,当水位线到达后,缓冲区中所有小于水位线的事件被处理。这意味着水位线之间的数据都按照时间戳被顺序处理。

注意 这个库假定按照事件时间时水位线一定是正确的。

为了保证跨水位线的事件按照事件时间处理,Flink CEP库假定水位线一定是正确的,并且把时间戳小于最新水位线的事件看作是晚到的。 晚到的事件不会被处理。你也可以指定一个侧输出标志来收集比最新水位线晚到的事件,你可以这样做

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)
    .select(
        new PatternSelectFunction<Event, ComplexEvent>() {...}
    );

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

0x4CEP算例

package com.ecust.patternapi;


import com.ecust.beans.SensorReading;
import javassist.compiler.ast.CallExpr;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
 * @author JueQian
 * @create 01-20 9:44
 * 需求,探测传感器,五秒内连续两次温度超过30度
 */

public class Flink01_CEP_Test {
    public static void main(String[] args) throws Exception {
        //0x0 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        //0x1 个体模式-singleton/looping 找出5秒内,连续两次温度超过30度报警 todo next
        Pattern<SensorReading, SensorReading> pattern = Pattern.<SensorReading>begin("start").where(new SimpleCondition<SensorReading>() {
            @Override
            public boolean filter(SensorReading sensorReading) throws Exception {
                return sensorReading.getTemp() > 30;
            }
        }).next("mid").where(new SimpleCondition<SensorReading>() {
            @Override
            public boolean filter(SensorReading sensorReading) throws Exception {
                return sensorReading.getTemp() > 30;
            }
        }).within(Time.seconds(5000L));

        //0x2 将数据流映射为样例类
        SingleOutputStreamOperator<SensorReading> operator = source.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                String id = fields[0];
                long ts = Long.parseLong(fields[1]);
                double temp = Double.parseDouble(fields[2]);
                return new SensorReading(id, ts, temp);
            }
        });

        //0x3 将流进行按key分组
        KeyedStream<SensorReading, String> keyedStream = operator.keyBy(SensorReading::getId);

        //0x4 将pattern作用于流
        PatternStream<SensorReading> patternStream = CEP.pattern(keyedStream, pattern);

        //0x5 将流选择出来
        SingleOutputStreamOperator<String> select = patternStream.select(new PatternSelectFunction<SensorReading, String>() {
            @Override
            public String select(Map<String, List<SensorReading>> map) throws Exception {
                //注意这个数据格式Map<定义的模式名,符合模式名的事件列表>
                SensorReading start = map.get("start").get(0);
                SensorReading end = map.get("mid").get(0);
//                return start.get(0).getId()+"在"+start.get(0).getTs()+"-"+end.get(0).getTs()+"温度超过30度不下降,当前温度"+end.get(0).getTemp();
                return start+"-"+end;
            }
        });

        //0x6 打印
        select.print();

        //0x7 执行计划
        env.execute();
    }
}

文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录