Flink流处理之ProcessFunction
# ProcessFunction
[toc]
# 基本概念
ProcessFunction函数是一个低级的流处理函数,可以将其看做一个具有Keyed状态和定时访问权限的FlatMapFunction,它通过调用输入数据流中收到的每个事件(元素)来处理事件(元素)。
- 转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如我们常用的MapFunction转换算子就无法访问时间戳或者当前事件的事件时间。
- 基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
- Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
# 8个ProcessFunction
- Flink提供了8个Process Function:
- **1)**ProcessFunction dataStream
- **2)**KeyedProcessFunction 用于KeyedStream,keyBy之后的流处理
- **3)**CoProcessFunction 用于connect连接的流
- **4)**ProcessJoinFunction 用于join流操作
- **5)**BroadcastProcessFunction 用于广播
- **6)**KeyedBroadcastProcessFunction keyBy之后的广播
- **7)**ProcessWindowFunction 窗口增量聚合
- **8)**ProcessAllWindowFunction 全窗口聚合
# KeyedProcessFunction 使用案例
# 实现功能
通过socketTextStream读取9999端口数据,统计在一定时间内不同类型商品的销售总额度,如果持续销售额度为0,则执行定时器通知老板,是不是卖某种类型商品的员工偷懒了(只做功能演示,根据个人业务来使用,比如统计UV等操作)
# 实现代码
package com.jast.flink.processfunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* KeyedProcessFunction Demo
*/
object KeyedProcessFunctionDemo {
def main(args: Array[String]): Unit = {
//创建Stream环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//通过9999socket接口接收数据
val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
//数据接收格式为:商品,价格
//e.g. 帽子,38
//e.g. 衣服,199
val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1)))
.setParallelism(4) //设置并行度为4
typeAndData.keyBy(0) //根据key(商品)进行聚合
.process(new MyprocessFunction()) //调用自定义KeyedProcessFunction函数
.print("结果") // 输出函数返回结果,前面加上"结果"
env.execute()
}
/**
* 实现:
* 根据key分类,统计每个key进来的数据量,定期统计数量,如果数量为0则预警
*/
class MyprocessFunction extends KeyedProcessFunction[Tuple,(String,String),String]{
//统计间隔时间
val delayTime : Long = 1000 * 10
/**
* 状态存储变量
* cjcount 自定义名称
*/
lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]]))
/**
* 定时器
* @name onTimer
* @date 2022/4/6 上午11:13
* @return void
* @param timestamp 定时器出发的时间
* @param ctx
* @param out
* @author Jast
*/
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
printf("定时器触发,时间为:%d,状态为:%s,key为:%s\n",timestamp,state.value(),ctx.getCurrentKey)
if(state.value()._2==0){
//该时间段数据为0,进行预警
printf("类型为:%s,数据为0,预警\n",state.value()._1)
}
//定期数据统计完成后,清零
state.update(state.value()._1,0)
//再次注册定时器执行
val currentTime: Long = ctx.timerService().currentProcessingTime()
ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
}
/**
* 处理数据方法
* @name processElement
* @date 2022/4/6 上午11:12
* @return void
* @param value 数据的数据值
* @param ctx 存储的上下文信息
* @param out 返回数据格式
* @author Jast
*/
override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = {
printf("状态值:%s,state是否为空:%s\n",state.value(),(state.value()==null))
if(state.value() == null){
//当前Key首次执行状态值为空,进行初始化赋值
//获取时间
val currentTime: Long = ctx.timerService().currentProcessingTime()
//注册定时器 delayTime 秒后触发(执行onTimer方法)
ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
printf("定时器注册时间:%d\n",currentTime + delayTime)
// 更新state值
state.update(value._1,value._2.toInt)
} else{
//统计数据
val key: String = state.value()._1
var count: Long = state.value()._2
count += value._2.toInt
//更新state值
state.update((key,count))
}
//返回任务的名称,并附加子任务指示符,例如“MyTask(3/6)”,
println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value)
printf("状态值:%s\n",state.value())
//返回处理后结果
out.collect("处理后返回数据->"+value)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 测试执行
(1) 在终端启动端口准备发送数据使用
mac@Mac ~ % nc -lk 9999
1
**(2)**启动程序
**(3)**发送数据
mac@Mac ~ % nc -lk 9999
衣服,123
裤子,1234
衣服,1
1
2
3
4
2
3
4
**(4)**控制台输出内容
状态值:null,state是否为空:true
定时器注册时间:1649213908697
KeyedProcess -> Sink: Print to Std. Out (8/8)->(衣服,123)
状态值:(衣服,123)
结果:8> 处理后返回数据->(衣服,123)
状态值:null,state是否为空:true
定时器注册时间:1649213911821
KeyedProcess -> Sink: Print to Std. Out (4/8)->(裤子,1234)
状态值:(裤子,1234)
结果:4> 处理后返回数据->(裤子,1234)
状态值:(衣服,123),state是否为空:false
KeyedProcess -> Sink: Print to Std. Out (8/8)->(衣服,1)
状态值:(衣服,124)
结果:8> 处理后返回数据->(衣服,1)
定时器触发,时间为:1649213908697,状态为:(衣服,124),key为:(衣服)
定时器触发,时间为:1649213911821,状态为:(裤子,1234),key为:(裤子)
定时器触发,时间为:1649213918708,状态为:(衣服,0),key为:(衣服)
类型为:衣服,数据为0,预警
定时器触发,时间为:1649213921832,状态为:(裤子,0),key为:(裤子)
类型为:裤子,数据为0,预警
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
可以看到我们输入的内容,根据类型统计输出
定时器触发,时间为:1649213908697,状态为:(衣服,124),key为:(衣服)
定时器触发,时间为:1649213911821,状态为:(裤子,1234),key为:(裤子)
1
2
2
随后我们不进行数据输入,定时器触发进行预警操作
定时器触发,时间为:1649213918708,状态为:(衣服,0),key为:(衣服)
类型为:衣服,数据为0,预警
定时器触发,时间为:1649213921832,状态为:(裤子,0),key为:(裤子)
类型为:裤子,数据为0,预警
1
2
3
4
2
3
4
# CoProcessFunction
http://www.manongjc.com/detail/23-umgdbbcybtgvihr.html
# BroadcastProcessFunction
https://cloud.tencent.com/developer/article/1983497
上次更新: 2023/03/10, 17:00:47