Flink04_Timestamp and Window: Practice and Source Code Analysis


Table table = input
  .window([GroupWindow w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
//Tumble ----------
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));    

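0x1.Windows Overview

Stream processing engines are designed to process unbounded data sets, that is, data sets that keep growing and are essentially infinite. A window is a way to cut that unbounded data into finite chunks for processing.

Windows are at the heart of processing unbounded streams: a window splits an infinite stream into finite-size "buckets" on which we can run computations.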

/**
 * @author JueQian
 * @create 01-12 11:59
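 * Overview:
 * Two window types: time windows and count windows
 * Three window behaviors: tumbling, sliding, session (time windows only)
 * Two kinds of window functions: incremental (aggregate: sum, min, max) and full-window (apply)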
 */
public class Flink01_TimeWindow {
    public static void main(String[] args) throws Exception {
        //0x0 create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        //0x1 read from the custom source
        DataStreamSource<SensorReading> sensorSource = env.addSource(new Flink05_Source_UDFSource.MySource());

        //0x2 use a time window to find the highest temperature in each window
        WindowedStream<SensorReading, String, TimeWindow> windowStream = sensorSource.keyBy(SensorReading::getId).timeWindow(Time.seconds(5));

        SingleOutputStreamOperator<SensorReading> temp = windowStream.maxBy("temp");

        //0x3 print
        temp.print("最高温度");

        //0x4 execute
        env.execute();
    }
}

There are two kinds of window functions

graph LR
A[Window function]-->B[Incremental window function]
A-->C[Full-window function]

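Windows fall into two categories:

CountWindow: creates a window from a specified number of elements, independent of time.

TimeWindow: creates windows based on time.

Note

CountWindow has no session variant: a session ends after a period of time with no data, and "a gap of several elements with no data" is not a meaningful notion when counting elements.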

graph LR
A[window]-->B[CountWindow]
A-->C[TimeWindow]
C-->D[Tumbling Window]
C-->E[Sliding window]
C-->F[Session Window]
B-->G[Tumbling Window]
B-->H[Sliding window]

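1.TimeWindow

TimeWindows can be divided into three kinds according to how the window is built: tumbling windows, sliding windows, and session windows.

1.Tumbling Windows

Data is sliced according to a fixed window length.

Characteristics: aligned to time, fixed window length, no overlap.

The tumbling window assigner assigns each element to a window of the specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window of 5 minutes, windows are created as shown in the figure below:

Tumbling window diagram

Use case: BI statistics and similar work (aggregations over each time period).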

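2.Sliding Windows

A sliding window is a more general form of the fixed window; it is defined by a fixed window length and a slide interval.

Characteristics: aligned to time, fixed window length, windows may overlap.

The sliding window assigner assigns elements to windows of fixed length. As with tumbling windows, the window size is set by the window size parameter; a second parameter, the window slide, controls how frequently a new sliding window is started.

Consequently, sliding windows overlap when the slide is smaller than the window size, and in that case an element is assigned to multiple windows.

For example, with a 10-minute window and a 5-minute slide, you get a window every 5 minutes that contains the data produced during the last 10 minutes, as shown in the figure below:

Sliding window diagram

Use case: statistics over the most recent period (for example, computing an interface's failure rate over the last 5 minutes to decide whether to raise an alert).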


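3. Session Windows

A session window is made up of a series of events followed by a timeout gap of a specified length, similar to a session in a web application: once no new data has been received for a while, the current window closes and the next element starts a new one.

Characteristics: not aligned to time.

The session window assigner groups elements by sessions of activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed start or end time. Instead, a session window closes when it receives no elements for a fixed period of time, that is, when a gap of inactivity occurs.

A session window is configured with a session gap, which defines the length of the inactive period. When that period expires, the current session closes and subsequent elements are assigned to a new session window.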
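The gap rule described above can be sketched in plain Java, without Flink. This is a simplified illustration, not Flink's implementation: the class and method names are hypothetical, the input is assumed to be already sorted by timestamp, and treating a gap of exactly gapMs as still belonging to the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindowSketch {

    /**
     * Groups sorted event timestamps (milliseconds) into sessions.
     * A new session starts when the distance to the previous event exceeds gapMs.
     */
    public static List<List<Long>> sessions(long[] sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            boolean gapExceeded = !current.isEmpty()
                    && ts - current.get(current.size() - 1) > gapMs;
            if (gapExceeded) {
                result.add(current);          // close the current session
                current = new ArrayList<>();  // and start a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);              // the last, still open session
        }
        return result;
    }

    public static void main(String[] args) {
        // Three events close together, a 7-second pause, then two more events.
        System.out.println(sessions(new long[]{0L, 1000L, 2000L, 9000L, 9500L}, 5000L));
        // prints [[0, 1000, 2000], [9000, 9500]]
    }
}
```

With a 5-second gap, the 7-second pause between 2000 and 9000 splits the input into two sessions, which is why session windows have no fixed length or alignment.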

Session window diagram

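2.CountWindow

A CountWindow fires based on the number of elements with the same key in the window; when it fires, only the results for keys whose element count has reached the window size are computed.

Note: the window_size of a CountWindow is the number of elements with the same key, not the total number of input elements.

1.Tumbling window

The default CountWindow is a tumbling window: only the window size needs to be specified, and the window fires when the element count reaches that size.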

DataStream<SensorReading> minTempPerWindowStream = 
    dataStream
    .keyBy(SensorReading::getId)
    .countWindow(5)
    .minBy("temperature");
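To make the per-key counting concrete, here is a minimal plain-Java sketch of a tumbling count window that emits the minimum once a key has collected enough elements. It does not use Flink; the class name, the add method, and the choice of returning null while a window is still filling are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountWindowSketch {

    private final int size;                                   // window_size per key
    private final Map<String, List<Double>> buffers = new HashMap<>();

    public CountWindowSketch(int size) {
        this.size = size;
    }

    /**
     * Adds one value for the given key. Returns the minimum of the window
     * when that key has reached `size` elements, otherwise null.
     */
    public Double add(String key, double value) {
        List<Double> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null;                                      // this key's window is not full yet
        }
        double min = Collections.min(buffer);
        buffer.clear();                                       // tumbling: start an empty window
        return min;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(2);
        System.out.println(window.add("sensor_1", 3.0));      // null
        System.out.println(window.add("sensor_2", 1.0));      // null, different key
        System.out.println(window.add("sensor_1", 2.0));      // 2.0
    }
}
```

The second call does not fire anything even though two elements have arrived in total, because the count is kept per key.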

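2.Sliding window

The sliding and tumbling variants use exactly the same function name; the sliding variant simply takes two arguments, window_size and sliding_size.

In the code below sliding_size is 2, so a computation runs every time two elements with the same key arrive, and each computation covers at most the last 10 elements.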

DataStream<SensorReading> minTempPerWindowStream = 
    dataStream
    .keyBy(SensorReading::getId)
    .countWindow(10,2)
    .minBy("temperature");

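3.WindowFunction

A window function defines the computation to run on the data collected in a window. There are two main kinds:

Incremental aggregation functions

Each element is processed as it arrives, and only a small piece of state is kept. Typical incremental aggregation functions are ReduceFunction and AggregateFunction.

Full window functions

All elements of the window are collected first and then iterated over when the computation runs. ProcessWindowFunction is a full window function.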
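The difference between the two styles is mostly about what state is held while the window is open. The plain-Java sketch below contrasts them for a maximum; it does not use Flink's ReduceFunction or ProcessWindowFunction interfaces, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregationStyles {

    /** Incremental style: fold each element into one running value as it arrives. */
    public static double incrementalMax(double[] values) {
        double state = Double.NEGATIVE_INFINITY;   // the only state that is kept
        for (double v : values) {
            state = Math.max(state, v);            // work happens per element
        }
        return state;
    }

    /** Full-window style: buffer every element, then iterate once the window fires. */
    public static double fullWindowMax(double[] values) {
        List<Double> buffer = new ArrayList<>();
        for (double v : values) {
            buffer.add(v);                         // arrival only stores the element
        }
        double max = Double.NEGATIVE_INFINITY;
        for (double v : buffer) {                  // all the work happens at firing time
            max = Math.max(max, v);
        }
        return max;
    }

    public static void main(String[] args) {
        double[] temps = {21.5, 19.0, 25.25, 24.0};
        System.out.println(incrementalMax(temps)); // 25.25
        System.out.println(fullWindowMax(temps));  // 25.25
    }
}
```

Both return the same result; the incremental version keeps a single number, while the full-window version keeps every element but can see the whole window at once.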

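4.Feature API

KeyedWindows

1.trigger(): trigger

Defines when the window closes, triggering the computation and emitting the result

2.evictor(): evictor

Defines the logic for removing certain elements

3.allowedLateness(): allow late data to be processed

4.sideOutputLateData(): send late data to a side output

5.getSideOutput(): get the side output stream

Non_KeyedWindows

windowAll()

trigger()

evictor()

allowedLateness()

sideOutputLateData()

getSideOutput()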

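0x2.Time Semantics and Watermark

1.Time semantics in Flink

Stream processing in Flink involves several different notions of time, as shown in the figure below:

Flink time semantics diagram

Event Time: the time recorded in the log entry itself

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// From this call onward, every stream created from env uses this time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// The timestamp field itself is specified later, with assignTimestampsAndWatermarks

Ingestion Time: the time at which the data enters Flink

Processing Time: the local system time of each operator that performs a time-based operation; it depends on the machine. In the API used in these notes (Flink before 1.12) the default time characteristic is Processing Time.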

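2.How out-of-order data arises

Cause: because of network delays, distribution across machines, and similar factors, data does not arrive in the order in which the events actually occurred.

Testing with Kafka

2> sensor_0:2021-01-13 12:56:34
2> sensor_0:2021-01-13 12:56:36
2> sensor_0:2021-01-13 12:56:38
2> sensor_0:2021-01-13 12:56:40
2> sensor_0:2021-01-13 12:56:42
1> sensor_0:2021-01-13 15:52:53//arrives ahead of 12:56:44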
2> sensor_0:2021-01-13 12:56:44
2> sensor_0:2021-01-13 12:56:46
2> sensor_0:2021-01-13 12:56:48
1> sensor_0:2021-01-13 15:52:55
2> sensor_0:2021-01-13 12:56:50
2> sensor_0:2021-01-13 12:56:52
1> sensor_0:2021-01-13 15:52:57
2> sensor_0:2021-01-13 12:56:54
1> sensor_0:2021-01-13 15:52:59
2> sensor_0:2021-01-13 12:56:56
1> sensor_0:2021-01-13 15:53:01

3.Introducing the watermark

//0x0 Event-time semantics must be enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// From this call onward, every stream created from env uses this time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//0x1 The timestamp field and the watermark strategy must be specified
dataStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<SensorReading>
    (Time.milliseconds(1000)) {
        @Override
        public long extractTimestamp(SensorReading element) {
            // convert seconds to milliseconds
            return element.getTimestamp() * 1000L;
        }
    });

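Watermark (in Chinese, 水印 or 水位线)

A watermark is what closes windows

Key point

A watermark is essentially a timestamp inserted into the stream; it determines when a window closes.

A 5-minute window starts at 00:00

The watermark is defined with a 1-minute delay

The window then closes only once event time has reached 00:06, rather than 00:05

During that extra minute, data whose event time still falls within minutes 0-5 can still enter the window

(The purpose is to let delayed data into the window: the watermark lags behind the largest timestamp seen so far)

Put plainly

Window: holds the events that occurred within a given time span

Watermark: lets data whose event time falls inside the window, but which arrives late, still enter the window

Watermark and windows diagram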

0x3.Window Source Code Analysis

1.Greenwich time (UTC)

Navigation

timewindow->of

Tumbling window code

public static TumblingProcessingTimeWindows of(Time size) {
        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0L);
    }

Opening a one-day window

//0x3 key the stream
      KeyedStream<SensorReading, String> one2Sensor = sensorWithTime.keyBy(SensorReading::getId);

      //0x4 open the window
      SingleOutputStreamOperator<SensorReading> result = one2Sensor.timeWindow(Time.seconds(5)).min("temp");

      //windows are aligned to UTC (Greenwich time)
      one2Sensor.timeWindow(Time.days(1));//in China (UTC+8) this window runs from 08:00 to 08:00 the next day
      //to get a window aligned to the local day in China
  one2Sensor.window(TumblingProcessingTimeWindows.of(Time.days(1),Time.hours(-8)));

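2.Window boundaries on round times

Left-closed, right-open

A window is a bucket

2.1 How the source aligns window boundaries

Source

timewindow->TumblingProcessingTimeWindows->assignWindows->getWindowStartWithOffset

 //compute the start of the window that contains the timestamp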
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

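Parameters:

offset=0 by default (the time-zone shift, for example for UTC+8)

timestamp: the timestamp of the element

adding windowSize keeps the operand of the modulo non-negative

Worked case: an element arrives at 20:36 and the tumbling window is 15 minutes long; the window it belongs to starts at 20:30

With offset = 0 and a non-negative timestamp, the formula above simplifies to
$$
timestamp - (timestamp \bmod windowSize)
$$
Example:

10 - (10 mod 6) = 10 - 4 = 6

First take the timestamp modulo the window size

Then subtract that remainder from the timestamp to get the start of the window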
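The arithmetic can be checked directly. The sketch below copies the body of getWindowStartWithOffset into a standalone class (the class name WindowStartSketch and the windowStart method are hypothetical) and runs the 20:36 case with a 15-minute window, plus a one-day window shifted by -8 hours for UTC+8. Timestamps are counted in milliseconds from midnight UTC of an arbitrary day 0, which is an assumption made to keep the numbers readable.

```java
public class WindowStartSketch {

    /** Same arithmetic as getWindowStartWithOffset shown above. */
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;

        // 20:36 with a 15-minute tumbling window and no offset
        long t = 20 * hour + 36 * minute;
        long start = windowStart(t, 0L, 15 * minute);
        System.out.println(start / hour + ":" + (start % hour) / minute);   // prints 20:30

        // One-day window with offset -8h: 04:00 UTC is 12:00 in UTC+8,
        // so the local day started 12 hours earlier, at -8h relative to day 0.
        long dayStart = windowStart(4 * hour, -8 * hour, 24 * hour);
        System.out.println(dayStart / hour);                                // prints -8
    }
}
```

The second result, 8 hours before midnight UTC, is 16:00 UTC of the previous day, which is 00:00 in UTC+8. That is the alignment the Time.hours(-8) offset is meant to produce.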

2.2 Left-closed, right-open in the source (end-1)

timewindow.java->end

//the largest timestamp that still belongs to this window
@Override
	public long maxTimestamp() {
		return end - 1;
	}

2.3 Why an element belongs to several sliding windows: source analysis

timeWindow->SlidingProcessingTimeWindows.class->assignWindows

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        timestamp = context.getCurrentProcessingTime();
        List<TimeWindow> windows = 
            new ArrayList((int)(this.size / this.slide));//sized for the number of windows the element belongs to
        long lastStart = TimeWindow
            .getWindowStartWithOffset(
            	timestamp, 
            	this.offset, 
            	this.slide);//the slide is passed in as the windowSize argument
        for(long start = lastStart; //walk backwards from the latest window
            start > timestamp - this.size; 
            start -= this.slide) {//step back by one slide
            windows.add(new TimeWindow(start, start + this.size));
        }
        return windows;
    }
//getWindowStartWithOffset
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}

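Sliding window source

Sliding window source diagram

Summary

With a 15-second window and a 5-second slide, each element belongs to three overlapping windows

The source treats each slide step as a tumbling window of length slide in order to align the window starts,

then collects the resulting windows in an ArrayList

For a 15-second window, it first computes the start of the latest window and then works backwards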
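The backwards walk is easy to trace by hand. The sketch below repeats the loop from assignWindows in plain Java, with each window represented as a two-element array {start, end} instead of Flink's TimeWindow; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssignSketch {

    /** Returns every {start, end} window of the given size and slide that contains timestamp. */
    public static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // Start of the latest window containing the timestamp: align to the slide, not the size.
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 15-second window, 5-second slide, element at 17 s
        for (long[] w : assign(17_000L, 15_000L, 5_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [15000, 30000), then [10000, 25000), then [5000, 20000)
    }
}
```

For the element at 17 s, lastStart is 15 s; stepping back by 5 s gives starts 10 s and 5 s, and the loop stops at 0 s because the window [0, 15 s) no longer contains 17 s.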

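0x4.WaterMark in Depth: Usage and Source Code

Open question, not yet resolved!

A quick walkthrough

What the watermark delay really does: postpone the closing of the window

[window:0-5]

[watermark:2]

data accepted [0-5]

closing time [7]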
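The numbers above can be reproduced with a small plain-Java model of a bounded-out-of-orderness watermark. This is a sketch under two assumptions: the watermark is recomputed after every event (Flink emits it periodically instead), and a window fires once the watermark reaches end - 1. The class and method names are hypothetical.

```java
public class BoundedWatermarkSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmitted = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Same trick as the Flink extractor: avoids underflow in the subtraction below.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    public void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    public long currentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmitted) {
            lastEmitted = potential;     // the watermark never moves backwards
        }
        return lastEmitted;
    }

    /** Returns the timestamp of the first event after which the window ending at windowEnd fires, or -1. */
    public static long firstFiringEvent(long[] events, long delay, long windowEnd) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(delay);
        for (long ts : events) {
            wm.onEvent(ts);
            if (wm.currentWatermark() >= windowEnd - 1) {
                return ts;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Window [0 s, 5 s), 2 s delay; the event at 3 s arrives late, after 6 s.
        long[] events = {0L, 1000L, 2000L, 3000L, 4000L, 5000L, 6000L, 3000L, 7000L};
        System.out.println(firstFiringEvent(events, 2000L, 5000L));   // prints 7000
    }
}
```

The late event at 3 s arrives while the watermark is still 4 s, so it can still enter the window; the window only fires when the event at 7 s pushes the watermark to 5 s.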

1.WaterMark usage example

Sample data: find the maximum value per 5-second window, with a 2-second watermark delay and 2 seconds of allowed lateness

sensor_1,1610506280,10
sensor_1,1610506281,20
sensor_1,1610506282,30
sensor_1,1610506283,40
sensor_1,1610506284,50
sensor_1,1610506285,60
sensor_1,1610506286,70
sensor_1,1610506287,80
sensor_1,1610506288,90
sensor_1,1610506290,100
package com.ecust.watermark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

/**
 * @author JueQian
 * @create 01-13 11:43
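 * Delayed departure: the watermark tolerates out-of-order data
 * Waiting for latecomers after departure time: allowedLateness accepts late data and tolerates network delay
 * Side output: sideOutputLateData keeps late records from being lost (a little accuracy in the main result is traded for efficiency)
 * todo the side output does not take part in the window computation; it must be collected and handled separately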
 */
public class Flink02_Window_WaterMark_Lateness {
    public static void main(String[] args) throws Exception {

        //0x0 create the execution environment and read data from a socket
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> socketSource = env.socketTextStream("hadoop102", 9999);

        //0x1 parse each line into a structured record
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> mapStream = socketSource.map(new MapFunction<String, Tuple3<String, Long, Double>>() {
            @Override
            public Tuple3<String, Long, Double> map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Tuple3<>(fields[0], Long.parseLong(fields[1])*1000, Double.parseDouble(fields[2]));
            }
        });

        //0x2 enable event time and set a 2-second watermark delay
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> watermarks = mapStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Double>>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(Tuple3<String, Long, Double> stringLongDoubleTuple3) {
                return stringLongDoubleTuple3.f1;
            }
        });

        //0x3 open the window and configure allowed lateness and the side output
        //key by the sensor id (field 0); field 1 is the timestamp
        WindowedStream<Tuple3<String, Long, Double>, Tuple, TimeWindow> timeWindow = watermarks.keyBy(0).timeWindow(Time.seconds(5));
        WindowedStream<Tuple3<String, Long, Double>, Tuple, TimeWindow> windowedStream = timeWindow
                .allowedLateness(Time.seconds(2))
                .sideOutputLateData(new OutputTag<Tuple3<String, Long, Double>>("side") {});
                //compute the maximum temperature within each window
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> streamOperator = windowedStream.maxBy(2);
        //0x4 print
        streamOperator.print("主输出流");
        streamOperator.getSideOutput(new OutputTag<Tuple3<String, Long, Double>>("side") {}).print("测输出流");

        //0x5 execute
        env.execute();
    }
}
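How the watermark, allowedLateness, and the side output divide up incoming elements can be modelled with a few comparisons. The sketch below is a simplified plain-Java model, not Flink's WindowOperator: it assumes a window fires when the watermark reaches end - 1 and is discarded once the watermark reaches end - 1 + allowedLateness. The class, enum, and method names are hypothetical.

```java
public class LatenessSketch {

    public enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /**
     * Classifies an element that belongs to the window ending at windowEnd,
     * given the watermark at the moment the element arrives.
     */
    public static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;              // windows are [start, end)
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;                     // window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED;           // window fired but is still kept; it fires again
        }
        return Outcome.SIDE_OUTPUT;                     // window state is gone; route to the side output
    }

    public static void main(String[] args) {
        // Window [0 s, 5 s) with 2 s of allowed lateness, as in the program above.
        System.out.println(classify(5000L, 2000L, 3000L));   // ON_TIME
        System.out.println(classify(5000L, 2000L, 5000L));   // LATE_BUT_ACCEPTED
        System.out.println(classify(5000L, 2000L, 7000L));   // SIDE_OUTPUT
    }
}
```

Because the watermark itself trails the largest timestamp by 2 seconds in the program above, an element for the window [0 s, 5 s) is only diverted to the side output after an event with a timestamp of roughly 9 s or later has been seen.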

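2.Source analysis: assignTimestampsAndWatermarks

assignTimestampsAndWatermarks->
    AssignerWithPeriodicWatermarks->
    ctrl+H->

Inheritance diagram of the periodic watermark assigners

Ascending timestamps

AscendingTimestampExtractor()//ascending

Uses the element's own timestamp, which is assumed to increase monotonically

Source:

A watermark is a mechanism for measuring the progress of event time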

public final long extractTimestamp(T element, long elementPrevTimestamp) {
        long newTimestamp = this.extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            this.violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    public final Watermark getCurrentWatermark() {
        return new Watermark(this.currentTimestamp == -9223372036854775808L ? -9223372036854775808L : this.currentTimestamp - 1L);
    }

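3.Watermark propagation

Such an operator’s current event time is the minimum of its input streams’ event times. As its input streams update their event times, so does the operator.

An operator's event time is the minimum of the event times of its input streams

Watermark propagation diagram

A new watermark is generated periodically, every 200 ms by default

  1. Watermarks are broadcast to downstream operators
  2. An operator with several inputs uses the smallest of the watermarks received from its inputs as its own watermark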
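The minimum rule in the list above can be sketched as a small tracker that remembers the latest watermark per input channel. This is a plain-Java illustration rather than Flink's internal implementation; the class name and the onWatermark method are hypothetical.

```java
import java.util.Arrays;

public class MinWatermarkSketch {

    private final long[] channelWatermarks;   // latest watermark seen on each input
    private long current = Long.MIN_VALUE;    // the operator's own watermark

    public MinWatermarkSketch(int inputs) {
        channelWatermarks = new long[inputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the operator's resulting watermark. */
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > current) {
            current = min;                    // advance only, never go backwards
        }
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkSketch operator = new MinWatermarkSketch(2);
        System.out.println(operator.onWatermark(0, 5000L));   // still Long.MIN_VALUE: input 1 is silent
        System.out.println(operator.onWatermark(1, 3000L));   // 3000
        System.out.println(operator.onWatermark(1, 8000L));   // 5000, now limited by input 0
    }
}
```

The first call shows why one idle input holds back the whole operator: until every input has reported a watermark, the minimum stays at its initial value.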

Test

package com.ecust.watermark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

/**
 * @author JueQian
 * @create 01-13 11:43
 * Tests the ascending-timestamp extractor, which is commonly used with file streams
 */
public class Flink04_Window_WaterMark_AscendingTimes {
    public static void main(String[] args) throws Exception {

        //0x0 create the execution environment and read data from a socket
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> socketSource = env.socketTextStream("hadoop102", 9999);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //use the ascending-timestamp assigner
        SingleOutputStreamOperator<String> assignTimestampsAndWatermarks = socketSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
            @Override
            public long extractAscendingTimestamp(String s) {
                String[] fields = s.split(",");
                return Long.parseLong(fields[1])*1000L;
            }
        });

        //0x1 parse each line into a structured record
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> mapStream = assignTimestampsAndWatermarks.map(new MapFunction<String, Tuple3<String, Long, Double>>() {
            @Override
            public Tuple3<String, Long, Double> map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Tuple3<>(fields[0], Long.parseLong(fields[1])*1000, Double.parseDouble(fields[2]));
            }
        });

        //0x3 open the window and configure allowed lateness and the side output; key by the sensor id (field 0)
        WindowedStream<Tuple3<String, Long, Double>, Tuple, TimeWindow> timeWindow = mapStream.keyBy(0).timeWindow(Time.seconds(5));
        WindowedStream<Tuple3<String, Long, Double>, Tuple, TimeWindow> windowedStream = timeWindow
                .allowedLateness(Time.seconds(2))
                .sideOutputLateData(new OutputTag<Tuple3<String, Long, Double>>("side") {});

        SingleOutputStreamOperator<Tuple3<String, Long, Double>> streamOperator = windowedStream.maxBy(2);

        //0x4 print
        streamOperator.print("主输出流");
        streamOperator.getSideOutput(new OutputTag<Tuple3<String, Long, Double>>("side") {}).print("测输出流");

        //0x5 execute
        env.execute();
    }
}
#test settings----------------
waterMark = 2
window=5
#-------------------------
sensor_1,1610506279,5

sensor_1,1610506282,30
sensor_1,1610506283,40
#the window should have fired at 282, but output only appeared after 283 was entered

4.AssignerWithPeriodicWatermarks

The watermark is extracted at a fixed interval

//200 ms by default
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

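Firing conditions

A window fires when the following conditions hold:

  1. watermark >= window_end_time - 1 (the window's maxTimestamp)
  2. there is data in [window_start_time,window_end_time)

The window fires only when both conditions are met.

The watermark is tracked per operator instance rather than per key, so it also advances when elements with a different key arrive.

A watermark asserts that all data with a timestamp less than or equal to it has arrived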

5.WaterMark source code

example

//assign timestamps and the watermark strategy
SingleOutputStreamOperator<String> assignTimestampsAndWatermarks = socketSource.
                assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<String>
                                (Time.milliseconds(200)) {
            @Override
            public long extractTimestamp(String s) {
                String[] fields = s.split(",");
                return Long.parseLong(fields[1]) * 1000L;
            }
        });

5.1 The class and method that produce the watermark

Navigation

new BoundedOutOfOrdernessTimestampExtractor->
    getCurrentWatermark()

How the watermark is generated

BoundedOutOfOrdernessTimestampExtractor source

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

	private static final long serialVersionUID = 1L;

	/** The largest timestamp seen so far */
	private long currentMaxTimestamp;

	/** The timestamp of the last emitted watermark; defaults to Long.MIN_VALUE */
	private long lastEmittedWatermark = Long.MIN_VALUE;

	/** The maximum out-of-orderness */
	private final long maxOutOfOrderness;

	public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
		if (maxOutOfOrderness.toMilliseconds() < 0) {
			throw new RuntimeException("Tried to set the maximum allowed " +
				"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
		}
        //store the out-of-orderness in milliseconds
		this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
		this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
	}
	
    
    //returns the maximum out-of-orderness that was passed to the constructor
	public long getMaxOutOfOrdernessInMillis() {
		return maxOutOfOrderness;
	}

	/**
	 * Extracts the timestamp from the given element.
	 */
	public abstract long extractTimestamp(T element);

	@Override
	public final Watermark getCurrentWatermark() {
		// this guarantees that the watermark never goes backwards.
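        // out-of-order data may carry smaller timestamps, so the watermark is derived from the largest timestamp seen so far
        
        // monotonicity: lastEmittedWatermark is updated only when the candidate is at least as large as the last emitted value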
		long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
		if (potentialWM >= lastEmittedWatermark) {
			lastEmittedWatermark = potentialWM;
		}
		return new Watermark(lastEmittedWatermark);
	}

	@Override
	public final long extractTimestamp(T element, long previousElementTimestamp) {
		long timestamp = extractTimestamp(element);
		if (timestamp > currentMaxTimestamp) {
			currentMaxTimestamp = timestamp;
		}
		return timestamp;
	}
}

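The Watermark class

Navigation

Search for Watermark directly

/**
 Operators that do not buffer elements forward a watermark downstream as soon as they receive it
 Operators that buffer elements, such as window operators, must first emit the results triggered by the watermark and only then forward the watermark downstream
 */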
@PublicEvolving
public final class Watermark extends StreamElement {

	/** The watermark that signifies end-of-event-time. */
	public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

	// ------------------------------------------------------------------------

	/** The timestamp of the watermark in milliseconds. */
	private final long timestamp;

	/**
	 * Creates a new watermark with the given timestamp in milliseconds.
	 */
	public Watermark(long timestamp) {
		this.timestamp = timestamp;
	}

	/**
	 * Returns the timestamp associated with this {@link Watermark} in milliseconds.
	 */
	public long getTimestamp() {
		return timestamp;
	}
}

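Spark's RDD has many implementation classes

Each of them has a compute method; the logic we pass in ends up in compute, and that is how the computation runs

In Flink, every map and flatMap likewise corresponds to an implementation of StreamOperator

The operator that generates watermarks

Navigation

map.assignTimestampsAndWatermarks->
    new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
StreamOperator->
     TimestampsAndPeriodicWatermarksOperator

Source analysis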

/**
 * Stream operator that extracts timestamps from the elements of the stream and generates periodic watermarks
 * @param <T> The type of the input elements
 */
public class TimestampsAndPeriodicWatermarksOperator<T>
		extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

	private static final long serialVersionUID = 1L;

	private transient long watermarkInterval;

	private transient long currentWatermark;

	public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
		super(assigner);
		this.chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();

		currentWatermark = Long.MIN_VALUE;
		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

		if (watermarkInterval > 0) {
			long now = getProcessingTimeService().getCurrentProcessingTime();
			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
		}
	}

	@Override
	public void processElement(StreamRecord<T> element) throws Exception {
		final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
				element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

		output.collect(element.replace(element.getValue(), newTimestamp));
	}

	@Override
	public void onProcessingTime(long timestamp) throws Exception {
		// register next timer
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}

		long now = getProcessingTimeService().getCurrentProcessingTime();
		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
	}

	/**
	 * Override the base implementation to completely ignore watermarks propagated from
	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
	 * watermarks from here).
	 */
	@Override
	public void processWatermark(Watermark mark) throws Exception {
		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
		// to signal the end of input and to not block watermark progress downstream
		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
			currentWatermark = Long.MAX_VALUE;
			output.emitWatermark(mark);
		}
	}

	@Override
	public void close() throws Exception {
		super.close();

		// emit a final watermark if the new watermark is larger than the current one
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}
	}
}

6.Watermark hands-on analysis

Data

sensor_1,1610506279,5
sensor_1,1610506280,10
sensor_1,1610506281,20
sensor_1,1610506282,30
sensor_1,1610506283,40
sensor_1,1610506284,50
sensor_1,1610506285,60
sensor_1,1610506286,70
sensor_1,1610506287,80
sensor_1,1610506290,100
sensor_1,1610506292,100

Program

package com.ecust.watermark;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;

/**
 * @author JueQian
 * @create 01-14 15:17
 * This program re-implements the watermark logic by hand, following the source code
 */
public class Flink07_Window_WaterMark_MyTest {
    public static void main(String[] args) throws Exception {
        //0x0 create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //0x1 enable event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //0x2 set the watermark generation interval
        env.getConfig().setAutoWatermarkInterval(2000);

        //0x3 read data from nc (socket)
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop102", 9999);

        //0x4 assign the event-time field
        SingleOutputStreamOperator<String> watermarks = socketTextStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            long currentTimeStamp = 0L;//largest event timestamp seen so far

            //maximum delay allowed for out-of-order data
            long maxDelayAllowed = 2000L;

            //last emitted watermark, initially Long.MIN_VALUE
            private long lastEmittedWatermark = Long.MIN_VALUE;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                //candidate watermark
                long potentialWM = currentTimeStamp - maxDelayAllowed;
                //keep the watermark monotonically non-decreasing
                if (potentialWM >= lastEmittedWatermark) {
                    lastEmittedWatermark = potentialWM;
                }
                System.out.println("当前水位线:" + lastEmittedWatermark);
                return new Watermark(lastEmittedWatermark);
            }

            @Override
            public long extractTimestamp(String s, long l) {
                String[] fields = s.split(",");
                long timeStamp = Long.parseLong(fields[1])*1000L;
                //keep the largest timestamp seen so far
                currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                //print the element together with the last emitted watermark
                System.out.println("key:" + fields[0] + ",EventTime:" + timeStamp + ",水位线:" + lastEmittedWatermark);
                return timeStamp;
            }
        });

        //0x4 convert each line to a (key, 1) pair
        SingleOutputStreamOperator<Tuple2<String, Integer>> string2One = watermarks.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Tuple2<>(fields[0],1);
            }
        });

        //0x5 key the stream, open the window, and aggregate
        string2One.keyBy(0).timeWindow(Time.seconds(5)).sum(1).print(">>>>>>>>>>>>输出数据>>>>>>>>>");

        //0x6 execute
        env.execute();
    }
}

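Windowing: responsible for how windows are created, for example fixed windows or sliding windows. Once the window strategy is configured, windows are created dynamically as time advances, producing one window after another, each covering a time range [start time, end time). Each window is a container for the event records that fall within its range: it collects records, and when the specified condition is met, the computation over the records in the window is triggered.

WaterMark: harder to grasp. It can be defined as a function E=f(P): given the processing system's current processing time P, a strategy f maps it to an event time E. Plotted on a graph, E is a curve whose shape depends on f. Suppose that at processing time 12:00:00 I want to map to event time 11:59:30. Then event records delayed by up to 30 seconds (event times in the range 11:59:30~12:00:00) that reach the system are all assigned to the window whose time range contains processing time 12:00:00. Events with an event time beyond 12:00:00 are left to the Trigger to compensate for.

Trigger: a compensation mechanism for cases where the assignment of event records to windows described above does not achieve the desired result for the business requirement. For example, when an event record reaches the system after the watermark has passed, that is, after the time range of its window, there are many options: drop it; still assign it to the window if it arrives within a 3-second delay; accept only the first 5 event records that arrive after the window's time range; and so on. These are all strategies, defined to meet business needs, for triggering a recomputation of the window, so the mechanism is very flexible.

6.1 Summary

A watermark is essentially a timestamp that is continuously generated and inserted into the stream

The watermark is the time that actually triggers computation in buffering operators

It is used to handle out-of-order data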

7.Watermark hands-on analysis 2

Custom data source

package com.ecust.source;

import com.ecust.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * @author Jinxin Li
 * @create 2021-01-08 21:07
 */
public class Flink05_Source_UDFSource {
    public static void main(String[] args) throws Exception {
        //0x0 create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //0x1 add the custom source
        DataStreamSource<SensorReading> sensorDS = env.addSource(new MySource());

        //0x2 format each reading as a string and print it
        SingleOutputStreamOperator<String> map = sensorDS.map(new MapFunction<SensorReading, String>() {
            @Override
            public String map(SensorReading sensorReading) throws Exception {
                return sensorReading.getId() + "," + sensorReading.getTs() + "," + sensorReading.getTemp();
            }
        });
        map.print();

        //0x3 start the job
        env.execute();
    }

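    //0x0 the custom data source
    public static class MySource implements SourceFunction<SensorReading>{

        //flag that controls the emit loop; volatile because cancel() is called from another thread
        private volatile boolean running = true;

        //random number generator
        private Random random = new Random();


        //latest reading per sensor, seeded with a baseline temperature
        private Map<String,SensorReading> map = new HashMap<String,SensorReading>();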

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            //0x0 create the initial reading for each sensor
            for (int i = 0; i < 1; i++) {
                SensorReading sensorReading = map.put("sensor_" + i, new SensorReading("sensor_" + (i+1), System.currentTimeMillis()-10000000L, 60D+random.nextGaussian() * 20));
            }
            while (running){
                //0x1 emit data
                for (String s : map.keySet()) {
                    //next temperature: previous value plus Gaussian noise
                    double v = map.get(s).getTemp() + random.nextGaussian() * 4;
                    //store the new temperature
                    map.get(s).setTemp(v);

                    //advance the timestamp by a random 0-999 ms so that it never decreases
                    long ts = map.get(s).getTs() + random.nextInt(1000);
                    SensorReading sensorReading = map.get(s);
                    sensorReading.setTs(ts);
//                    System.out.println(i);
//                    System.out.println(map.get(s).getTs());

                    //emit the reading
                    sourceContext.collect(sensorReading);

                    //pause briefly
                    Thread.sleep(10);
                }
            }
        }

        @Override
        public void cancel() {
            running=false;

        }
    }
}

Simulated real-world data

package com.ecust.watermark;

import com.ecust.beans.SensorReading;
import com.ecust.source.Flink05_Source_UDFSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;

/**
 * @author JueQian
 * @create 01-14 15:17
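 * This program re-implements the watermark logic by hand, following the source code
 * The previous program mainly tested watermark generation,
 * but it used socket data.
 * Socket data arrives slowly, so several watermarks are usually generated after a single element, which does not reflect normal use.
 * This program uses a custom data source instead.
 *
 * Parameters: watermark interval: 2s
 * Tumbling window: 5s
 * Out-of-orderness: 2s
 * Event rate: 0.3s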
 */
public class Flink08_Window_WaterMark_MyTest1 {
    public static void main(String[] args) throws Exception {

        //0x0 create the execution environment and configure it
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //set the interval after setStreamTimeCharacteristic, which resets it to 200 ms for event time
        env.getConfig().setAutoWatermarkInterval(2000L);

        //0x1 获取自定义数据源
        DataStreamSource<SensorReading> sensor = env.addSource(new Flink05_Source_UDFSource.MySource());
        SingleOutputStreamOperator<SensorReading> singleOutputStreamOperator = sensor.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
            //当前流时间
            long currentMaxTimestamp = 0L;
            //当前流waterMark
            long lastEmittedWatermark = -9223372036854775808L;
            //当前流无序度
            long maxOutOfOrderness = 2000L;

            @Override
            public Watermark getCurrentWatermark() {
                long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
                // keep the watermark monotonically non-decreasing
                if (potentialWM >= this.lastEmittedWatermark) {
                    lastEmittedWatermark = potentialWM;
                }
                System.out.println("Current watermark: " + lastEmittedWatermark);
                return new Watermark(lastEmittedWatermark);
            }

            @Override
            public long extractTimestamp(SensorReading sensorReading, long l) {
                Long timestamp = sensorReading.getTs();
                // an event with a smaller timestamp does not move the maximum timestamp
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                System.out.println("key:" + sensorReading.getId() + ",EventTime:" + timestamp + ",watermark:" + lastEmittedWatermark);
                return timestamp;
            }
        });

        singleOutputStreamOperator
                .keyBy(SensorReading::getId)
                .timeWindow(Time.seconds(5))
                .maxBy("temp")
                .print("Output>>>>>>>>>>>>>");

        env.execute("test");
    }
}

Results

Current watermark: 1610660784344
    // the watermark is updated; windows that end at or before it are closed (window bounds below show only the last digits)
key:sensor_1,EventTime:1610660787098,watermark:1610660784344//[85000,90000)
key:sensor_1,EventTime:1610660787819,watermark:1610660784344//[85000,90000)
key:sensor_1,EventTime:1610660788714,watermark:1610660784344//[85000,90000)
key:sensor_1,EventTime:1610660789119,watermark:1610660784344//[85000,90000)
key:sensor_1,EventTime:1610660789771,watermark:1610660784344//[85000,90000)
key:sensor_1,EventTime:1610660790678,watermark:1610660784344//[90000,95000)
key:sensor_1,EventTime:1610660791328,watermark:1610660784344//[90000,95000)
key:sensor_1,EventTime:1610660791645,watermark:1610660784344//[90000,95000)
	...
key:sensor_1,EventTime:1610660795319,watermark:1610660784344//[95000,100000)
key:sensor_1,EventTime:1610660795944,watermark:1610660784344//[95000,100000)
key:sensor_1,EventTime:1610660796642,watermark:1610660784344//[95000,100000)
key:sensor_1,EventTime:1610660796926,watermark:1610660784344//[95000,100000)
Current watermark: 1610660794926 //[the watermark advances to 94926 > 90000, so the windows ending at or before 90000 are closed]
key:sensor_1,EventTime:1610660797035,watermark:1610660794926
key:sensor_1,EventTime:1610660797673,watermark:1610660794926
key:sensor_1,EventTime:1610660798467,watermark:1610660794926
Output>> SensorReading(id=sensor_1, ts=1610660782065, temp=71.80984937005488)
Output>> SensorReading(id=sensor_1, ts=1610660789119, temp=93.17010762180462)
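
The window bounds and the firing moment in the log above can be checked by hand. Below is a minimal sketch in plain Java (no Flink dependency), assuming the usual tumbling-window arithmetic: the window start is aligned to a multiple of the window size, and an event-time window [start, end) fires once the watermark reaches end - 1. The class and method names are made up for illustration and are not Flink API.

```java
public class TumblingWindowSketch {

    /** Start of the tumbling window that contains the timestamp (all values in ms). */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 5000L; // 5 s tumbling window, as in the program above

        // First record of the log: EventTime 1610660787098
        long start = windowStart(1610660787098L, 0L, size);
        long end = start + size;
        System.out.println("window = [" + start + ", " + end + ")");

        // The two watermarks printed in the log
        System.out.println("fires at 1610660784344: " + fires(end, 1610660784344L));
        System.out.println("fires at 1610660794926: " + fires(end, 1610660794926L));
    }
}
```

Under these assumptions the record lands in [1610660785000, 1610660790000), which does not fire at the first watermark but does fire at the second, matching the log.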

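7.1 Summary

Open question: it is unclear why the watermark is not updated at the configured interval (it updates every 200 ms).

The watermark is updated every 200 ms. After each update, any window that is still open and whose end the watermark has passed is triggered for computation.

A window is a buffering operator: it caches elements until it fires.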

graph LR
A[window]-->B[timeWindow]
A-->C[globalWindow]
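  • Collections.singletonList() returns an immutable list that holds exactly one element, which saves memory. The returned object is an internal implementation class of Collections that does not support add or set; calling either one throws an UnsupportedOperationException.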
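
A small standalone check of the singletonList behaviour described above. The class name, helper names, and the string values are only for illustration.

```java
import java.util.Collections;
import java.util.List;

public class SingletonListSketch {

    /** Returns true if the list rejects add() with UnsupportedOperationException. */
    static boolean rejectsAdd(List<String> list) {
        try {
            list.add("another-window");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    /** Returns true if the list rejects set() with UnsupportedOperationException. */
    static boolean rejectsSet(List<String> list) {
        try {
            list.set(0, "replaced-window");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> windows = Collections.singletonList("window-1");
        System.out.println("size = " + windows.size());
        System.out.println("add rejected = " + rejectsAdd(windows));
        System.out.println("set rejected = " + rejectsSet(windows));
    }
}
```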

0x5 Summary of the Watermark and Window source structure

1. Source structure of watermarks and timestamp extractors

Used to extract timestamps and generate watermarks.

graph TD
A[TimestampAssigner]-->B[AssignerWithPeriodicWatermarks]
A-->C[AssignerWithPunctuatedWatermarks]
B-->D[AscendingTimestampExtractor]
B-->G[BoundedOutOfOrdernessTimestampExtractor]

2. Source structure of data streams

Kinds of data streams

graph TD
A[DataStream]-->B[SingleOutputStreamOperator]
A-->C[SplitStream]
A-->D[KeyedStream]
B-->G[IterativeStream]
B-->E[DataStreamSource]

3. Source structure of operators

Kinds of operators and how they are structured

graph TD
A[StreamOperator]-->B[TwoInputStreamOperator]
A-->C[OneInputStreamOperator]
C-->D[TimestampsAndPeriodicWatermarksOperator]
C-->F[WindowOperator]
C-->G[StreamMap]
C-->I[...]

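4. Source-level flow of watermark usage

How the watermark is obtained and how it is sent downstream

First extract the event time, then obtain the current watermark.

graph TD
A[extract-AssignerWithPeriodicWatermarks]-->B[extractTimestamp]-->C[getCurrentWatermark]

Conclusion: the watermark seen alongside each record is the one from the previous emission. extractTimestamp only updates the largest timestamp; the watermark itself (largest timestamp minus the out-of-orderness) is recomputed the next time getCurrentWatermark runs.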

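Emitting watermarks in a loop

Executed by: TimestampsAndPeriodicWatermarksOperator

graph TD
A[extract-Operator]-->B[open: register 200 ms timer]--fires-->C[onProcessingTime: getCurrentWatermark]
C--re-register-->D[200 ms timer]
D--fires-->C
C-->E{new>current}
E--emitWatermark-->F[downstream operator]

Conclusion: a timer callback emits the watermark every 200 ms (the interval is configurable), but a watermark is only sent downstream when the newly generated one is greater than the current one.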
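
The timer-driven loop above can be imitated in a few lines of plain Java. This is a simplified simulation, not Flink's operator code: the class, its fields, and the onElement/onTimer method names are hypothetical, and the timer is replaced by explicit onTimer() calls. It only illustrates the rule that a new watermark is forwarded when it is strictly greater than the last one.

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicWatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start high enough that the first subtraction cannot overflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per record: only tracks the largest timestamp seen so far. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Called per timer tick: forwards the watermark only if it advanced. */
    void onTimer() {
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    /** Feeds a few records and timer ticks; returns the watermarks sent downstream. */
    static List<Long> demo() {
        PeriodicWatermarkSketch op = new PeriodicWatermarkSketch(2000L);
        op.onTimer();          // no records yet: nothing is emitted
        op.onElement(1000L);
        op.onElement(3000L);
        op.onTimer();          // emits 3000 - 2000 = 1000
        op.onElement(2500L);   // out-of-order record: the maximum stays at 3000
        op.onTimer();          // candidate is still 1000: nothing is emitted
        op.onElement(6000L);
        op.onTimer();          // emits 6000 - 2000 = 4000
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1000, 4000]
    }
}
```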

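5. Inheritance tree of the Watermark class

graph TD
A[StreamElement]-->B[StreamRecord]
A-->C[StreamStatus]
A-->D[Watermark]
A-->E[LatencyMarker]

**Conclusion:** a watermark is essentially a timestamp that travels in the stream as an element, just like a record.

6. Summary: the watermark workflow in the abstract

Assign - extract - emit

Abstract diagram of the watermark workflow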

0x6 Window source inheritance relationships

As the diagram above shows, KeyedStream extends DataStream.

1. The window methods of KeyedStream

graph LR
A[KeyedStream]-->B[m-timeWindow]-->H[window/TumblingEventTimeWindows]
A-->C[m-countWindow]-->I[window/GlobalWindows.create]
A-->D[m-window]-->E[WindowedStream/this assigner]

**Conclusion:** convenience methods such as timeWindow all delegate to window(windowAssigner) under the hood.

window in turn returns a WindowedStream.

2. Role and source of WindowAssigner

Flink has several built-in types of window assigners, which are illustrated below:

Window assigners

* A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
*
* <p>In a window operation, elements are grouped by their key (if available) and by the windows to which it was assigned. The set of elements with the same key and window is called a pane.
* When a {@link Trigger} decides that a certain pane should fire the
* {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
* to produce output elements for that pane.

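Its main job is to assign zero or more windows to each incoming element.

A set of elements that share the same key and window is called a pane.

For event-time windows, once the watermark passes the end of the window the Trigger fires and the WindowFunction is applied to the pane.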

graph LR
A[WindowAssigner]-->B[SlidingEventTimeWindows]
A-->C[TumblingEventTimeWindows]
A-->D[...]

// Fields of WindowedStream
/** The window assigner. */
private final WindowAssigner<? super T, W> windowAssigner;

/** The trigger that is used for window evaluation/emission. */
private Trigger<? super T, ? super W> trigger;

/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;

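3. Summary

There are really only two window types: TimeWindow and GlobalWindow.

countWindow is implemented as GlobalWindows plus a custom trigger (and, for the sliding variant, an evictor).

Methods such as timeWindow all delegate to window(windowAssigner).

The WindowAssigner implementations represent the various window types.

window is backed by WindowedStream.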

// KeyedStream#countWindow delegates to window(...)
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}

// KeyedStream#window returns a WindowedStream
@PublicEvolving
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
    return new WindowedStream<>(this, assigner);
}

//----------------------------------------------------

// WindowedStream
/*
 * The stream of elements is split into windows based on a {windowing.assigners.WindowAssigner}.
 * Window emission is triggered based on a {windowing.triggers.Trigger}.
 */

@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
        WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.windowAssigner = windowAssigner;
    this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
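
To see what the GlobalWindows + evictor + trigger combination in countWindow(size, slide) amounts to, here is a minimal simulation in plain Java. It is only a sketch of the semantics under stated assumptions (one key, a single never-ending buffer, fire every `slide` elements, keep only the last `size` elements before evaluating). The class and method names are hypothetical and this is not how Flink's operator is written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountWindowSketch {

    /** Returns the contents handed to the window function at each firing. */
    static List<List<Integer>> slidingCountWindows(List<Integer> input, int size, int slide) {
        List<Integer> buffer = new ArrayList<>();      // stands in for the single global window
        List<List<Integer>> fired = new ArrayList<>();
        int sinceLastFire = 0;

        for (int value : input) {
            buffer.add(value);
            sinceLastFire++;
            if (sinceLastFire == slide) {              // trigger role: fire every `slide` elements
                sinceLastFire = 0;
                while (buffer.size() > size) {         // evictor role: keep only the last `size`
                    buffer.remove(0);
                }
                fired.add(new ArrayList<>(buffer));
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        // size = 4, slide = 2
        System.out.println(slidingCountWindows(input, 4, 2));
        // prints [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

The first firing sees fewer than `size` elements because the trigger counts arrivals, not full windows; only the evictor enforces the size limit.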

Inheritance tree of the various windows

graph LR
A[window]-->B[windowAssigner]-->C[the various windows]

3. Understanding WindowAssigner (updated 2021-01-25)

Understanding WindowAssigner

Understanding the WindowAssigner source


Author: Jinxin Li
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit Jinxin Li when reposting!