Flink流处理高级操作之时间语义
# 流处理高级操作之时间语义
[toc]
# 时间语义介绍
对于一台机器而言,“时间”自然就是指系统时间。但我们知道,Flink 是一个分布式处理 系统。分布式架构最大的特点, 就是节点彼此独立、互不影响,这带来了更高的吞吐量和容错 性;但有利必有弊, 最大的问题也来源于此。
在分布式系统中,节点“各自为政”,是没有统一时钟的, 数据和控制信息都通过网络进 行传输。比如现在有一个任务是窗口聚合,我们希望将每个小时的数据收集起来进行统计处理。 而对于并行的窗口子任务,它们所在节点不同,系统时间也会有差异; 当我们希望统计 8 点 ~9 点的数据时,对并行任务来说其实并不是“同时”的, 收集到的数据也会有误差。
那既然一个集群中有 JobManager 作为管理者, 是不是让它统一向所有 TaskManager 发送 同步时钟信号就行了呢? 这也是不行的。因为网络传输会有延迟, 而且这延迟是不确定的, 所 以 JobManager 发出的同步信号无法同时到达所有节点;想要拥有一个全局统一的时钟, 在分 布式系统里是做不到的。
另一个麻烦的问题是,在流式处理的过程中, 数据是在不同的节点间不停流动的, 这同样 也会有网络传输的延迟。这样一来, 当上下游任务需要跨节点传输数据时, 它们对于“时间” 的理解也会有所不同。例如, 上游任务在 8 点 59 分 59 秒发出一条数据, 到下游要做窗口计算 时已经是 9 点零 1 秒了, 那这条数据到底该不该被收到 8 点~9 点的窗口呢?
所以, 当我们希望对数据按照时间窗口来进行收集计算时,“时间”到底以谁为标准就非 常重要了。
我们重新梳理一下流式数据处理的过程。如上图所示, 在事件发生之后, 生成的数据被 收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取消费,进而向下 游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
很明显,这里有两个非常重要的时间点: 一个是数据产生的时间, 我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。 我们所定义的窗口操作, 到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有 所滞后。
# 1. 处理时间(Processing Time)
处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。
如果我们以它作为衡量标准,那么数据属于哪个窗口就很明显了: 只看窗口任务处理这条 数据时,当前的系统时间。比如之前举的例子,数据 8 点 59 分 59 秒产生, 而窗口计算时的时 间是 9 点零 1 秒,那么这条数据就属于 9 点—10 点的窗口;如果数据传输非常快, 9 点之前就 到了窗口任务,那么它就属于 8 点—9 点的窗口了。每个并行的窗口子任务,就只按照自己的 系统时钟划分窗口。假如我们在早上 8 点 10 分启动运行程序, 那么接下来一直到 9 点以前处 理的所有数据, 都属于第一个窗口; 9 点之后、10 点之前的所有数据就将属于第二个窗口。
这种方法非常简单粗暴,不需要各个节点之间进行协调同步,也不需要考虑数据在流中的 位置, 简单来说就是“我的地盘听我的”。所以处理时间是最简单的时间语义。
# 2. 事件时间(Event Time)
事件时间, 是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
数据一旦产生, 这个时间自然就确定了, 所以它可以作为一个属性嵌入到数据中。这其实 就是这条数据记录的“时间戳”(Timestamp)。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了, 而是依赖于数 据本身。打个比方, 这相当于任务处理的时候自己本身是没有时钟的,所以只好来一个数据就 问一下“现在几点了”;而数据本身也没有表, 只有一个自带的“出厂时间”,于是任务就基于这 个时间来确定自己的时钟。由于流处理中数据是源源不断产生的, 一般来说, 先产生的数据也 会先被处理,所以当任务不停地接到数据时, 它们的时间戳也基本上是不断增长的,就可以代 表时间的推进。
当然我们会发现,这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证 数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的 数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了, 而需要 用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。 关于水位线的概念和用法,我们会稍后介绍。
# 3.摄入时间(Ingestion Time)
TODO 待补充
# 水位线/水印(Watermark)
在介绍事件时间语义时, 我们提到了“水位线”的概念, 已经知道了它其实就是用来度量 事件时间的。那么水位线具体有什么含义,又跟数据的时间戳有什么关系呢?接下来我们就来 深入探讨一下这个流处理中的核心概念。
在实际应用中, 一般会采用事件时间语义。而水位线, 就是基于事件时间提出的概念。所 以在介绍水位线之前,我们首先来梳理一下事件时间和窗口的关系。
一个数据产生的时刻,就是流处理中事件触发的时间点, 这就是“事件时间”,一般都会以时间戳的形式作为一个字段记录在数据里。这个时间就像商品的“生产日期”一样, 一旦产 生就是固定的, 印在包装袋上, 不会因为运输辗转而变化。如果我们想要统计一段时间内的数 据,需要划分时间窗口, 这时只要判断一下时间戳就可以知道数据属于哪个窗口了。
明确了一个数据的所属窗口, 还不能直接进行计算。因为窗口处理的是有界数据, 我们需 要等窗口的数据都到齐了,才能计算出最终的统计结果。那什么时候数据就都到齐了呢?对于 时间窗口来说这很明显: 到了窗口的结束时间,自然就应该收集到了所有数据, 就可以触发计 算输出结果了。比如我们想统计 8 点~9 点的用户点击量, 那就是从 8 点开始收集数据, 到 9 点截止,将收集的数据做处理计算。这有点类似于班车, 每小时发一班,那么 8 点之后来的人都会上同一班车,到 9 点钟准时发车; 9 点之后来的人,就只好等下一班 10 点发的车了。
当然, 我们现在处理的数据本身是有时间戳的。所以为了更清楚地解释,我们将“赶班车” 这个例子中的人,换成带着生产日期的商品。所以现在我们班车的主要任务是运送货物,一辆 车就只装载 1 小时内生产出的所有商品,货到齐了就发车。比如某辆车要装的是 8 点到 9 点的 所有商品,那货什么时候到齐呢? 自然可以想到,到 9 点钟的时候商品就到齐了,可以发车了。
这里的关键问题是,“9 点钟发车”,到底是看谁的表来定时间?
在处理时间语义下,都是以当前任务所在节点的系统时间为准的。这就相当于每辆车里都 挂了一个钟,司机看到到了 9 点就直接发车。这种方式简单粗暴容易实现, 但因为车上的钟是 独立运行的,以它为标准就不能准确地判断商品的生产时间。在分布式环境下, 这样会因为网 络传输延迟的不确定而导致误差。比如有些商品在 8 点 59 分 59 秒生产出来,可是从下生产线 到运至车上又要花费几秒,那就赶不上 9 点钟这班车了。而且现在分布式系统中有很多辆 9 点发的班车,所以同时生产出的一批商品,需要平均分配到不同班车上,可这些班车距离有近 有远、上面挂的钟有快有慢, 这就可能导致有些商品上车了、有些却被漏掉;先后生产出的商 品,到达车上的顺序也可能乱掉:统计结果的正确性受到了影响。
所以在实际中我们往往需要以事件时间为准。如果考虑事件时间, 情况就复杂起来了。现 在不能直接用每辆车上挂的钟(系统时间),又没有统一的时钟,那该怎么确定发车时间呢?
现在能利用的, 就只有商品的生产时间(数据的时间戳) 了。我们可以这样思考: 一般情 况下, 商品生产出来之后,就会立即传送到车上; 所以商品到达车上的时间(系统时间) 应该稍稍滞后于商品的生产时间(数据时间戳)。如果不考虑传输过程的一点点延迟,我们就可以 直接用商品生产时间来表示当前车上的时间了。如下图所示, 到达车上的商品, 生产时间是 8 点 05 分,那么当前车上的时间就是 8 点 05 分; 又来了一个 8 点 10 分生产的商品, 现在车 上的时间就是 8 点 10 分。我们直接用数据的时间戳来指示当前的时间进展, 窗口的关闭自然 也是以数据的时间戳等于窗口结束时间为准,这就相当于可以不受网络传输延迟的影响了。像之前所说 8 点 59 分 59 秒生产出来的商品,到车上的时候不管实际时间(系统时间)是几点, 我们就认为当前是 8 点 59 分 59 秒, 所以它总是能赶上车的;而 9 点这班车, 要等到 9 点整生 产的商品到来, 才认为时间到了 9 点,这时才正式发车。这样就可以得到正确的统计结果了。
在这个处理过程中, 我们其实是基于数据的时间戳,自定义了一个“逻辑时钟”。这个时 钟的时间不会自动流逝; 它的时间进展, 就是靠着新到数据的时间戳来推动的。这样的好处在于计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的 结果都是正确的。比如双十一的时候系统处理压力大,我们可能会把大量数据缓存在 Kafka 中; 过了高峰时段之后再读取出来, 在几秒之内就可以处理完几个小时甚至几天的数据,而且 依然可以按照数据产生的时间段进行统计,所有窗口都能收集到正确的数据。而一般实时流处 理的场景中,事件时间可以基本与处理时间保持同步, 只是略微有一点延迟, 同时保证了窗口计算的正确性。
# 什么是Watermark?
在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟, 用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数 据的时间戳来驱动的。
但在分布式系统中,这种驱动方式又会有一些问题。因为数据本身在处理转换的过程中会 变化, 如果遇到窗口聚合这样的操作,其实是要攒一批数据才会输出一个结果, 那么下游的数 据就会变少,时间进度的控制就不够精细了。另外,数据向下游任务传递时,一般只能传输给 一个子任务(除广播外),这样其他的并行子任务的时钟就无法推进了。例如一个时间戳为 9点整的数据到来,当前任务的时钟就已经是 9 点了;处理完当前数据要发送到下游,如果下游 任务是一个窗口计算,并行度为 3,那么接收到这个数据的子任务,时钟也会进展到 9 点, 9 点结束的窗口就可以关闭进行计算了;而另外两个并行子任务则时间没有变化,不能进行窗口 计算。
所以我们应该把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个 时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时 钟标记,记录当前的事件时间; 这个标记可以直接广播到下游,当下游任务收到这个标记, 就 可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中, 这种用来衡量事 件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置, 就应该是在某个 数据到来之后; 这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了
如图所示, 每个事件产生的数据,都包含了一个时间戳, 我们直接用一个整数表示。 这里没有指定单位, 可以理解为秒或者毫秒(方便起见, 下面讲述统一认为是秒)。当产生于 2 秒的数据到来之后,当前的事件时间就是 2 秒; 在后面插入一个时间戳也为 2 秒的水位线, 随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间 戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们 只要将水位线广播出去, 就可以通知到所有下游任务当前的时间进度了。
水位线就像它的名字所表达的, 是数据流中的一部分, 随着数据一起流动, 在不同任务之 间传输。这看起来非常简单;接下来我们就进一步探讨一些复杂的状况。
(1) 有序流中的水位线
在理想状态下, 数据应该按照它们生成的先后顺序、排好队进入流中; 也就是说, 它们处 理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间 戳, 就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推 进。
实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一 条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同, 同时涌来的数 据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率, 一般 会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳, 如图 6-6 所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
这里需要注意的是, 水位线插入的“周期”,本身也是一个时间概念。在当前事件时间语 义下, 假如我们设定了每隔 100ms 生成一次水位线,那就是要等事件时钟推进 100ms 才能插 入; 但是事件时钟本身的进展, 本身就是靠水位线来表示的——现在要插入一个水位线,可前 提又是水位线要向前推进 100ms,这就陷入了死循环。所以对于水位线的周期性生成,周期时 间是指处理时间(系统时间),而不是事件时间。
(2) 乱序流中的水位线
有序流的处理非常简单,看起来水位线也并没有起到太大的作用。但这种情况只存在于理 想状态下。我们知道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性, 导致顺序发生改变, 这就是所谓的“乱序数据”。
这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产 生时间而言的。如图所示,一个 7 秒时产生的数据,生成时间自然要比 9 秒的数据早;但 是经过数据缓存和传输之后,处理任务可能先收到了 9 秒的数据,之后 7 秒的数据才姗姗来迟。 这时如果我们希望插入水位线, 来指示当前的事件时间进展, 又该怎么做呢?
最直观的想法自然是跟之前一样,我们还是靠数据来驱动,每来一个数据就提取它的时间 戳、插入一个水位线。不过现在的情况是数据乱序,所以有可能新的时间戳比之前的还小, 如 果直接将这个时间的水位线再插入, 我们的“时钟”就回退了——水位线就代表了时钟, 时光 不能倒流, 所以水位线的时间戳也不能减小。
解决思路也很简单: 我们插入新的水位线时, 要先判断一下时间戳是否比之前的大,否则 就不再生成新的水位线,如下图所示。也就是说,只有数据的时间戳比当前时钟大,才能推 动时钟前进,这时才插入水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需 要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新 的水位线,如下图所示。
这样做尽管可以定义出一个事件时钟,却也会带来一个非常大的问题:我们无法正确处理 “迟到”的数据。在上面的例子中,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了 9 秒;如果有一个窗口结束时间就是 9 秒(比如, 要统计 0~9 秒的所有数据),那么这时窗口 就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有 时间戳为 7 秒、 8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们 本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭, 于是这些数据就被遗漏了,这会导 致统计结果不正确。
如果用之前我们类比班车的例子,现在的状况就是商品不是按照生产时间顺序到来的,所 以有可能出现这种情况: 9 点生产的商品已经到了,我们认为已经到了 9 点,所以直接发车; 但是可能还会有 8 点 59 分 59 秒生产的商品迟到了,没有赶上这班车。那怎么解决这个问题呢?
其实我们有很多生活中的经验。假如是一个团队出去团建,那肯定希望每个人都不能落下; 如果有人因为堵车没能准时到车上, 我们可以稍微等一会儿。9 点发车, 我们可以等到 9 点 10 分, 等人都到齐了再出发。当然,实际应用的网络环境不可能跟北京的交通一样堵,所以不需 要等那么久,或许只要等一两秒钟就可以了。具体在商品班车的例子里,我们可以多等 2 秒钟, 也就是当生产时间为 9 点零 2 秒的商品到达,时钟推进到 9 点零 2 秒,这时就认为所有 8 点到 9 点生产的商品都到齐了,可以正式发车。不过这样相当于更改了发车时间,属于“违规操作”。 为了做到形式上仍然是 9 点发车,我们可以更改一下时钟推进的逻辑: 当一个商品到达时, 不 要直接用它的生产时间作为当前时间,而是减上两秒,这就相当于把车上的逻辑时钟调慢了。 这样一来, 当 9 点生产的商品到达时,我们当前车上的时间是 8 点 59 分 58 秒, 还没到发车时 间; 当 9 点零 2 秒生产的商品到达时,车上时间刚好是 9 点, 这时该到的商品都到齐了,准时 发车就没问题了。
回到上面的例子,为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒; 也就是 用当前已有数据的最大时间戳减去 2 秒, 就是要插入的水位线的时间戳,如下图所示。这 样的话,9 秒的数据到来之后, 事件时钟不会直接推进到9 秒,而是进展到了7 秒;必须等到 11 秒的数据到来之后, 事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐, 0~9 秒的窗 口就可以正确计算结果了。
如果仔细观察就会看到, 这种“等 2 秒”的策略其实并不能处理所有的乱序数据。比如 22 秒的数据到来之后, 插入的水位线时间戳为 20,也就是当前时钟已经推进到了 20 秒; 对于 10~20 秒的窗口, 这时就该关闭了。但是之后又会有 17 秒的迟到数据到来, 它本来应该属于 10~20 秒窗口,现在却被遗漏丢弃了。那又该怎么办呢?
既然现在等 2 秒还是等不到 17 秒产生的迟到数据,那自然我们可以试着多等几秒, 也就 是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确 的计算结果。对应到水位线上, 其实就是要保证, 当前时间已经进展到了这个时间戳, 在这之 后不可能再有迟到数据来了。
下面是一个示例,我们可以使用周期性的方式生成正确的水位线。
如上图所示, 第一个水位线时间戳为 7,它表示当前事件时间是 7 秒, 7 秒之前的数据 都已经到齐,之后再也不会有了;同样,第二个、第三个水位线时间戳分别为 12 和20,表示 11 秒、 20 秒之前的数据都已经到齐, 如果有对应的窗口就可以直接关闭了, 统计的结果一定 是正确的。这里由于水位线是周期性生成的,所以插入的位置不一定是在时间戳最大的数据后 面。
另外需要注意的是, 这里一个窗口所收集的数据, 并不是之前所有已经到达的数据。因为 数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。 也就是说, 上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不 会收集这个数据,进行计算依然可以得到正确的结果。关于窗口的原理, 我们会在后面继续展 开讲解。
# 水位线特性
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础 上加一些延迟来保证不丢数据, 这一点对于乱序流的正确处理非常重要。
我们可以总结一下水位线的特性:
水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
水位线是基于数据的时间戳生成的
水位线的时间戳必须单调递增, 以确保任务的事件时间时钟一直向前推进
水位线可以通过设置延迟,来保证正确处理乱序数据
一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之 前的所有数据都到齐了, 之后流中不会出现时间戳 t’ ≤ t 的数据
水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合, 完成对乱序数据的正确处理。关于这部分内容, 我们会稍后进一步展开讲解。
# 实际代码中应用
在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方
法:.assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线 来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)
具体使用时,直接用 DataStream 调用该方法即可,与普通的 transform 方法完全一样。
DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(<watermark strategy>);
2
3
这里读者可能有疑惑:不是说数据里已经有时间戳了吗, 为什么这里还要“分配”呢?这是 因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据, Flink 是无法知道数据真正产生的时间的。 当然,有些时候数据源本身就提供了时间戳信息, 比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳, 而不需要单独提取字段分配 了。
.assignTimestampsAndWatermarks()
方法需要传入一个WatermarkStrategy
作为参数,这就 是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy
中 包 含 了 一 个 “ 时 间 戳 分 配 器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator
。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
2
3
4
5
6
7
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给 元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator :主要负责按照既定的方式, 基于时间戳生成水位线。在 WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法, 它的参数有当前事件、时间戳, 以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法, 可以由 WatermarkOutput 发出水位线。周期时间 为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为 200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
1
# 无序流应用
设置Watermark
延迟时间为5秒,逗号分隔取第一个字段为事件时间(时间戳)字段
- Flink 1.9
SingleOutputStreamOperator<String> lines = env.socketTextStream("localhost", 8888)
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
//将数据中的时间字段提取出来,然后转成long类型
@Override
public long extractTimestamp(String line) {
String[] fields = line.split(",");
return Long.parseLong(fields[0]);
}
});
2
3
4
5
6
7
8
9
10
- Flink 1.13
SingleOutputStreamOperator<String> lines = env.socketTextStream("localhost", 8888)
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 5s
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
@Override
public long extractTimestamp(String s, long l) {
String[] fields = line.split(",");
return Long.parseLong(fields[0]);
}
})
);
2
3
4
5
6
7
8
9
10
11
12
# 有序流(不常用)
- Flink 1.13
SingleOutputStreamOperator<String> lines = env.socketTextStream("localhost", 8888).assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<String>()
{
@Override
public long extractTimestamp(String element, long recordTimestamp) {
String[] fields = line.split(",");
return Long.parseLong(fields[0]);
}
})
);
2
3
4
5
6
7
8
9
10
11