Flink流处理之窗口Window
[toc]
# Flink流处理之窗口(Window)
我们已经了解了 Flink 中事件时间和水位线的概念, 那它们有什么具体应用呢?当然是做 基于时间的处理计算了。其中最常见的场景, 就是窗口聚合计算。
在流处理中,我们往往需要面对的是连续 不断、无休无止的无界流, 不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实 只能针对当前已有的数据——之后再有数据到来, 就需要继续叠加、再次输出结果。这样似乎 很“实时”,但现实中大量数据一般会同时到来,需要并行处理,这样频繁地更新结果就会给 系统带来很大负担了。
更加高效的做法是, 把无界流进行切分, 每一段数据分别进行聚合,结果只输出一次。这 就相当于将无界流的聚合转化为了有界数据集的聚合, 这就是所谓的“窗口”(Window)聚合 操作。窗口聚合其实是对实时性和处理效率的一个权衡。在实际应用中, 我们往往更关心一段 时间内数据的统计结果, 比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们 就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出 一个结果就可以了。
在 Flink 中,提供了非常丰富的窗口操作, 下面我们就来详细介绍。
# 窗口的基本概念
窗口化的流处理程序一般有两种结构,第一种结构基于分组的窗口化数据流,第二种结构基于非分组的窗口化数据流。这两种结构的唯一区别是分组的窗口化数据流调用KeyBy
操作符和Window
操作符,而非分组的窗口化数据流仅调用 WindowAll
操作符。
# 分组的窗口 Window 操作符
经过按键分区 keyBy 操作后, 数据流会按照 key 被分为多条逻辑流(logical streams),这 就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时 执行。相同 key 的数据会被发送到同一个并行子任务, 而窗口操作会基于每个 key 进行单独的 处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对 DataStream 调用.keyBy() 进行按键分区, 然后再调 用.window()定义窗口。
stream.keyBy(...)
.window(...)
2
# 非分组的窗口 WindowAll 操作符
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只 能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用 这种方式。
在代码中, 直接基于 DataStream 调用.windowAll()定义窗口。
stream.windowAll(...)
这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的, windowAll 本身就是一个非并行的操作。
# 窗口分配器
在指定数据流是否为键控后(数据流是分组流还是非分组流后),下一步就是定义一个窗口分配器。通过在KeyedStream(分组流)中调用window(...)方法或者在DataStream(非分组流)中调用windowAll(...)方法来选择窗口分配器。
所有预定义的窗口分配器中除了全局窗口是基于数据驱动的(根据元素数量来划分窗口),其他类型的窗口都是基于时间驱动的(窗口有一个开始时间戳(包括)和一个结束时间戳(不包括),它们共同描述窗口的大小),这个时间可以是处理时间,也可以是事件时间。关于时间类型的差异,可以参考《Flink流处理高级操作之时间语义》。
# Tumbling Window 滚动窗口
滚动窗口分配器将数据流中的每个元素分配到指定大小的窗口中。滚动窗口有固定的大小,并且各窗口不会出现重叠的情况。
(1) 滚动处理时间窗口
stream.keyBy(...) // 根据指定key对数据流进行分组
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //基于处理时间的滚动窗口,窗口大小为5秒
.<windowed transformation>(<window function>); //窗口函数
2
3
(2) 滚动事件时间窗口
stream.keyBy(...) // 根据指定key对数据流进行分组
.window(TumblingEventTimeWindows.of(Time.seconds(5))) //基于事件时间的滚动窗口,窗口大小为5秒
.<windowed transformation>(<window function>); //窗口函数
2
3
stream.keyBy(...) // 根据指定key对数据流进行分组
.window(TumblingEventTimeWindows.of(Time.days(5),Time.hours(-8))) //基于事件时间的滚动窗口,窗口大小为1天,偏移为8小时
.<windowed transformation>(<window function>); //窗口函数
2
3
可以传入两个 Time 类型的参数:size 和offset。第一个参 数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。这里需要多做一些解释:对于 我们之前的定义,滚动窗口其实只有一个 size 是不能唯一确定的。比如我们定义 1 天的滚动 窗口, 从每天的 0 点开始计时是可以的, 统计的就是一个自然日的所有数据;而如果从每天的 凌晨 2 点开始计时其实也完全没问题,只不过统计的数据变成了每天 2 点到第二天 2 点。这个 起始点的选取, 其实对窗口本身的类型没有影响; 而为了方便应用,默认的起始点时间戳是窗 口大小的整倍数。也就是说, 如果我们定义 1 天的窗口, 默认就从 0 点开始;如果定义 1 小时 的窗口,默认就从整点开始。而如果我们非要不从这个默认值开始,那就可以通过设置偏移量 offset 来调整。
这里读者可能会觉得奇怪: 这个功能好像没什么用, 非要弄个偏移量不是给自己找别扭 吗?这其实是有实际用途的。我们知道, 不同国家分布在不同的时区。标准时间戳其实就是 1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数, 而这个时间是以 UTC 时间, 也 就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8 小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0 点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢? 只 要设置-8 小时的偏移量就可以了:
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)));
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
2
3
# Sliding Window 滑动窗口
与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的, 而是可以“错开”一定的位置。如果看作一个窗口的运动, 那么就像是向前小步“滑动”一样。
既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两 个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代 表了窗口计算的频率。滑动的距离代表了下个窗口开始的时间间隔,而窗口大小是固定的, 所 以也就是两个窗口结束时间的间隔;窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。例如, 我们定义一个长度为 1 小时、滑动步长为 5 分钟的滑动窗口, 那么就会统 计 1 小时内的数据, 每 5 分钟统计一次。同样,滑动窗口可以基于时间定义,也可以基于数据个数定义。
我们可以看到, 当滑动步长小于窗口大小时, 滑动窗口就会出现重叠, 这时数据也可能会 被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决 定。如上图所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口 里。比如我们定义的窗口长度为 1 小时、滑动步长为 30 分钟,那么对于 8 点 55 分的数据, 应 该同时属于[8 点, 9 点)和[8 点半, 9 点半)两个窗口; 而对于 8 点 10 分的数据, 则同时属于[8 点, 9 点)和[7 点半, 8 点半)两个窗口。
所以, 滑动窗口其实是固定大小窗口的更广义的一种形式;换句话说, 滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。当然,我们也可以定义 滑动步长大于窗口大小, 这样的话就会出现窗口不重叠、但会有间隔的情况; 这时有些数据不 属于任何一个窗口, 就会出现遗漏统计。所以一般情况下,我们会让滑动步长小于窗口大小, 并尽量设置为整数倍的关系。
在一些场景中, 可能需要统计最近一段时间内的指标, 而结果的输出频率要求又很高, 甚 至要求实时更新,比如股票价格的 24 小时涨跌幅统计,或者基于一段时间内行为检测的异常 报警。这时滑动窗口无疑就是很好的实现方式。
(1) 滑动处理时间窗口
stream.keyBy(...) // 根据指定key对数据流进行分组
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))) //基于处理时间的滑动窗口,窗口大小为10秒,滑动间隙为5秒。
.<windowed transformation>(<window function>); //窗口函数、
2
3
stream.keyBy(...) // 根据指定key对数据流进行分组 .window(SlidingProcessingTimeWindows.of(Time.hours(12),Time.hours(1),Time.hours(-8))) //基于处理时间的滑动窗口,窗口大小为1天,滑动间隙为1小时,偏移为-8小时。
.<windowed transformation>(<window function>); //窗口函数、
2
(2) 滑动事件时间窗口
stream.keyBy(...) // 根据指定key对数据流进行分组
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) //基于事件时间的滑动窗口,窗口大小为10秒,滑动间隙为5秒。
.<windowed transformation>(<window function>); //窗口函数、
2
3
# Session Window 会话窗口
会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似 Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来 描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来, 那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。 这就好像我们打电话一样,如果时不时总能说点什么, 那说明还没聊完; 如果陷入了尴尬的沉 默,半天都没话说, 那自然就可以挂电话了。
与滑动窗口和滚动窗口不同, 会话窗口只能基于时间来定义, 而没有“会话计数窗口”的 概念。这很好理解,“会话”终止的标志就是“隔一段时间没有数据来”,如果不依赖时间而改 成个数,就成了“隔几个数据没有数据来”,这完全是自相矛盾的说法。
而同样是基于这个判断标准, 这“一段时间”到底是多少就很重要了, 必须明确指定。对 于会话窗口而言, 最重要的参数就是这段时间的长度(size),它表示会话的超时时间,也就 是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小 (size),那说明还在保持会话,它们就属于同一个窗口;如果 gap 大于 size,那么新来的数据 就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上, 我们可以设置静态固 定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔 gap 的 值。
考虑到事件时间语义下的乱序流,这里又会有一些麻烦。相邻两个数据的时间间隔 gap 大于指定的 size,我们认为它们属于两个会话窗口,前一个窗口就关闭;可在数据乱序的情况 下, 可能会有迟到数据, 它的时间戳刚好是在之前的两个数据之间的。这样一来, 之前我们判 断的间隔中就不是“一直没有数据”,而缩小后的间隔有可能会比 size 还要小——这代表三个 数据本来应该属于同一个会话窗口。
所以在 Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新 的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge) 操作。在 Window 算子中,对会话窗口会有单独的处理逻辑。
我们可以看到, 与前两种窗口不同, 会话窗口的长度不固定, 起始和结束时间也是不确定 的,各个分区之间窗口没有任何关联。如上图所示,会话窗口之间一定是不会重叠的,而 且会留有至少为 size 的间隔(session gap)。
在一些类似保持会话的场景下, 往往可以使用会话窗口来进行数据的处理统计。
(1) 处理时间会话窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate (...)
2
3
这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withDynamicGap(new
SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
@Override
public long extract(Tuple2<String, Long> element) {
// 提取 session gap 值返回, 单位毫秒
return element.f0.length() * 1000;
}
}))
2
3
4
5
6
7
8
9
这里.withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用 来定义 session gap 的动态提取逻辑。在这里, 我们提取了数据元素的第一个字段, 用它的长 度乘以 1000 作为会话超时的间隔。
keyBy之后,每个组的窗口结束时间都是根据当前组数据时间进行判断的。
(2) 事件时间会话窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate (...)
2
3
# Global Window 全局窗口
还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效, 会把相同key 的所有数据都分配到同一个窗口中; 说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以 这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理, 还需要自定义“触发器”(Trigger)。关于触发器,后面单独讲解。
如上图所示,可以看到,全局窗口没有结束的时间点, 所以一般在希望做更加灵活的 窗口处理时自定义使用。 Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。
stream.keyBy(...)
.window(GlobalWindows.create())//为窗口分配全局窗口分配器
.trigger(CountTrigger.of(3));//一旦窗口中的元素数量为3就会出发窗口函数对窗口进行计算
2
3
# 窗口函数
定义了窗口分配器, 我们只是知道了数据属于哪个窗口, 可以将数据收集起来了; 至于收 集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后, 必须再接上一个定义窗 口如何进行计算的操作, 这就是所谓的“窗口函数”(window functions)。
经窗口分配器处理之后,数据可以分配到对应的窗口中, 而数据流经过转换得到的数据类 型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须 进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream,如下图所示:
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。下面我们来进行分别讲解。
# 增量聚合函数(incremental aggregation functions)
窗口将数据收集起来,最基本的处理操作当然就是进行聚合。窗口对无限流的切分,可以 看作得到了一个有界数据集。如果我们等到所有数据都收集齐,在窗口到了结束时间要输出结 果的一瞬间再去进行聚合,显然就不够高效了——这相当于真的在用批处理的思路来做实时流 处理。
为了提高实时性,我们可以再次将流处理的思路发扬光大:就像 DataStream 的简单聚合 一样, 每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是 在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的 时候, 我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时 性。
典型的增量聚合函数有两个: ReduceFunction 和 AggregateFunction。
# ReduceFunction 归约函数
调用方法:
windowedStream.reduce(ReduceFunction<T> function)
只要基于 WindowedStream 调用.reduce()方法, 然 后传入 ReduceFunction 作为参数,就可以指定以归约两个元素的方式去对窗口中数据进行聚 合了
ReduceFunction 中需要重写一个 reduce 方法, 它的两个参数代表输入的两 个元素,而归约最终输出结果的数据类型,与输入的数据类型必须保持一致。也就是说,中间 聚合的状态和输出的结果,都和输入的数据类型是一样的。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//自定义数据源函数,并指定向数据流中发送元素的间隔为1000毫秒
DataStream<Tuple3<String, Integer, String>> streamSource = env.addSource(new SourceForWindow(1000));
//将数据流中元素的f0字段作为Key对数据流进行分组
DataStream<Tuple3<String, Integer, String>> reduce = streamSource.keyBy("f0")
//基于处理时间的滚动窗口,窗口大小为5秒
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//对该窗口应用ReduceFunction窗口函数,输出每个窗口中f1字段值最小的元素
.reduce((new ReduceFunction<Tuple3<String, Integer, String>>() {
@Override
public Tuple3<String, Integer, String> reduce(Tuple3<String, Integer, String> value1,Tuple3<String, Integer, String> value2) {
return value1.f1 > value2.f1 ? value2 : value1;
}
}));
reduce.print();
env.execute("WindowReduceTemplate");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# AggregateFunction 聚合函数
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状 态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前, 先将数 据转换(map)成预期结果类型; 而在有些情况下, 还需要对状态进行进一步处理才能得到输 出结果,这时它们的类型可能不同, 使用ReduceFunction 就会非常麻烦。
例如, 如果我们希望计算一组数据的平均值, 应该怎样做聚合呢? 很明显, 这时我们需要 计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商 (sum/count)。如果用 ReduceFunction,那么我们应该先把数据转换成二元组(sum, count)的形 式, 然后进行归约聚合, 最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是 一个任务, 可我们却需要 map-reduce-map 三步操作,这显然不够高效。
于是自然可以想到, 如果取消类型一致的限制,让输入数据、中间状态、输出结果三者类 型都可以不同, 不就可以一步直接搞定了吗?
Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调 用 .aggregate() 方法 ,就可 以 定义更加灵 活 的 窗 口 聚合操作 。这个方法 需要传入 一个 AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
ACC createAccumulator();
ACC add(IN value, ACC accumulator);
OUT getResult(ACC accumulator);
ACC merge(ACC a, ACC b);
}
2
3
4
5
6
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型 (IN)、累加器类型(ACC)和输出类型(OUT)。
输入类型 IN 就是输入流中元素的数据类型; 累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:
createAccumulator():创建一个累加器, 这就是为聚合创建了一个初始状态,每个聚 合任务只会调用一次。
add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进 一步聚合的过程。方法传入两个参数: 当前新到的数据 value,和当前的累加器 accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之 后都会调用这个方法。
getResult() :从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态, 然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均 值, 就可以把 sum 和count 作为状态放入累加器,而在调用这个方法时相除得到最终 结果。这个方法只在窗口要输出结果时调用。
merge():合并两个累加器, 并将合并后的状态作为一个累加器返回。这个方法只在 需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景 就是会话窗口(Session Windows)。
所以可以看到, AggregateFunction 的工作原理是: 首先调用 createAccumulator()为任务初 始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中; 等到了窗口需要输出时, 再调用 getResult()方法得到计算结果。很明显, 与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输 出的类型可以不同, 使得应用更加灵活方便。
下面是一个简单的AggregateFunction的实现,它使用AverageAccumulator类作为累加器,统计一个窗口内元素的个数,以及对元素的某个字段进行累计求和。
import com.intsmaze.flink.streaming.window.source.SourceForWindow;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class CustomWindowAggregateTemplate {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//自定义数据源函数,并指定向数据流中发送元素的间隔时间为1000毫秒
DataStream<Tuple3<String, Integer, String>> streamSource = env.addSource(new SourceForWindow(1000));
streamSource.keyBy(0)
//基于处理时间的滚动窗口,窗口大小为5秒
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//对该窗口应用AggregateFunction窗口函数
.aggregate(new Average())
.print();
env.execute("CustomWindowAggregateTemplate");
}
}
class Average implements AggregateFunction<Tuple3<String, Integer, String>, AverageAccumulator, AverageAccumulator> {
//创建一个初始状态的累加器对象
@Override
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
@Override
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
//将两个窗口中累加器对象的count字段进行相加
a.setCount(a.getCount() + b.getCount());
//将两个窗口中累加器对象的sum字段进行相加
a.setSum(a.getSum() + b.getSum());
return a;
}
//将进入窗口的元素添加进该窗口的累加器对象中
@Override
public AverageAccumulator add(Tuple3<String, Integer, String> value, AverageAccumulator acc) {
acc.setWord(value.f0);
acc.setSum(acc.getSum() + value.f1);
acc.setCount(acc.getCount() + 1);
return acc;
}
//返回窗口函数计算的结果
@Override
public AverageAccumulator getResult(AverageAccumulator acc) {
return acc;
}
}
@Data
class AverageAccumulator {
private String word;
private long count;
private long sum;
public AverageAccumulator() {
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public long getSum() {
return sum;
}
public void setSum(long sum) {
this.sum = sum;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
@Override
public String toString() {
return "AverageAccumulator{" +
"word='" + word + '\'' +
", count=" + count +
", sum=" + sum +
'}';
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# 全窗口函数 (full window functions)
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。 这样做毫无疑问是低效的:因为窗口全部的计算任务都积压在了要输出结果的那一瞬间,而在之前收集数据的漫长过程中却无所事事。这就好比平时不用功,到考试之前通宵抱佛脚,肯定不如把工夫花在日常积累上。
那为什么还需要有全窗口函数呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外, 输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式, 这就可以用全窗口函数来实现。
在 Flink 中,全窗口函数也有两种:WindowFunction
和 ProcessWindowFunction
。
# WindowFunction 窗口函数
WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream 调用.apply()方法, 传入一个 WindowFunction 的实现类。该窗口在未来会被废弃掉。
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
2
3
4
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable), 还可以拿到窗口 (Window)本身的信息。WindowFunction 接口在源码中实现如下:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
2
3
当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。这里 Collector 的用法, 与 FlatMapFunction 中相同。
不过我们也看到了, WindowFunction 能提供的上下文信息较少, 也没有更高级的功能。 事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。一般在 实际应用, 直接使用 ProcessWindowFunction 就可以了。
# ProcessWindowFunction 处理窗口函数
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底 层”,是因为除了可以拿到窗口中的所有数据之外, ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当 前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上, ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员,关于处 理函数我们会在后续章节展开讲解。
当然,这些好处是以牺牲性能和资源为代价的。作为一个全窗口函数,ProcessWindowFunction同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的WindowFunction。
ProcessWindowFunction抽象类定义如下:
package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
* for retrieving extra information.
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* Deletes any state in the {@code Context} when the Window is purged.
*
* @param context The context to which the window is being evaluated
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public void clear(Context context) throws Exception {}
/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();
/** Returns the current processing time. */
public abstract long currentProcessingTime();
/** Returns the current event-time watermark. */
public abstract long currentWatermark();
/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();
/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
/**
* Emits a record to the side output identified by the {@link OutputTag}.
*
* @param outputTag the {@code OutputTag} that identifies the side output to emit to.
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
可以看到ProcessWindowFunction抽象类定义了如下两个方法:
public abstract void process(KEY key, Context context, Iterable
elements, Collector out) : 该方法负责处理窗口内的数据,并输出0个或者多个元素。该方法中的参数key为当前处理窗口的键(前面keyBy的字段),参数context为当前处理窗口的上下文对象,存储了当前窗口的元数据信息,参数elements为当前窗口中的元素的迭代器对象,参数out为发送窗口函数计算结果的收集器。
public void clear(Context context):
在窗口函数中使用状态来进行聚合计算时,在窗口结束被清楚时也要清除该窗口内维护的状态,开发者需要再clear(...)方法清除对应的状态。
Process(...) 和 clear(...) 方法都提供了Context对象作为参数,以实现对窗口信息和状态的访问操作,Context接口定义了如下6个方法供开发者使用:
- W window(): 获取正在处理的窗口对象
- long currentProcessingTime():获取当前的处理时间
- long currentWatermark():获取当前时间的水位线
- KeyedStateStore windowState():获取每个键和每个窗口状态的状态访问器
- KeyedStateStore globalState():获取每个键全局状态的状态访问器
- void output(OutputTag
outputTag, X value):向OutputTag标识的侧端输出发送元素。
package com.intsmaze.flink.streaming.window.process;
import com.intsmaze.flink.streaming.window.source.SourceForWindow;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class ProcessWindowTemplate {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple3<String, Integer, String>> streamSource = env.addSource(new SourceForWindow(1000));
DataStream<String> process = streamSource
.keyBy(t -> t.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new UserDefinedProcessWindowFunction());
process.print("输出结果");
env.execute("ProcessWindowTemplate");
}
public static class UserDefinedProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Integer, String>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple3<String, Integer, String>> input, Collector<String> out) {
String str = "";
long count = 0;
//计算迭代器中元素的个数
for (Tuple3<String, Integer, String> in : input) {
str = StringUtils.join(str, in.toString());
count++;
}
System.out.println("窗口内元素为:" + str);
KeyedStateStore keyedStateStore = context.globalState();
KeyedStateStore keyedStateStore1 = context.windowState();
//将计算的元素个数和窗口信息及窗口的Key拼接为一个字符串作为窗口计算的结果输出
out.collect("Window: " + context.window() + " key:" + key + " count: " + count);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54