# Operating MySQL from Flink
[toc]
# Reading and writing MySQL inside a function

Example: a `ProcessWindowFunction` that opens a JDBC connection in `open()`, writes a row, and reads rows back.
```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.concurrent.TimeUnit

import cn.hutool.core.date.DateUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

@SerialVersionUID(1L)
class AlertProcessWindowFunction extends ProcessWindowFunction[(Int, ConsumerRecord[String, String]), (String, Int), Int, TimeWindow] {

  /** MySQL connection, created once per parallel instance in open() */
  private var connection: Connection = null
  private var ps: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Read the global job parameters registered on the ExecutionConfig
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    // MySQL connection settings
    val url = params.getRequired("mysql.jdbc.url")
    val username = params.getRequired("mysql.username")
    val password = params.getRequired("mysql.password")
    println("Initializing MySQL connection: " + url + "\t" + username)
    // Create the MySQL connection
    connection = DriverManager.getConnection(url, username, password)

    // Write a row to MySQL
    var pstm: PreparedStatement = null
    try {
      pstm = connection.prepareStatement("INSERT INTO alert (data_type, contrast_table) VALUES (?, ?)")
      pstm.setString(1, "Test type")
      pstm.setString(2, "Test table " + DateUtil.date())
      pstm.executeUpdate()
    } finally {
      if (pstm != null) {
        pstm.close()
      }
    }
    println("Insert succeeded")

    // Read rows back from MySQL
    ps = connection.prepareStatement("SELECT data_type, contrast_table FROM alert")
    val resultSet = ps.executeQuery()
    while (resultSet.next()) {
      val tuple = Tuple2.apply(resultSet.getString(1), resultSet.getString(2))
      println("Read row: " + tuple._1 + "," + tuple._2)
    }
  }

  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])], out: Collector[(String, Int)]): Unit = {
    elements.foreach(element => {
      println(element._2.topic())
      println(element._2.partition())
      println(element._2.value())
      TimeUnit.SECONDS.sleep(10)
    })
    out.collect(("facebook", 0))
  }

  override def close(): Unit = {
    super.close()
    // Closing the connection also closes the statements created from it
    if (connection != null) connection.close()
  }
}
```
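The function reads its JDBC settings from Flink's global job parameters, so the driver program has to register a `ParameterTool` before the job starts. A minimal sketch, assuming the settings live in a properties file (the class name, file name, and job name here are illustrative, not from the original):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlertJobDriver {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Load mysql.jdbc.url / mysql.username / mysql.password; the file name is a placeholder.
        ParameterTool params = ParameterTool.fromPropertiesFile("job.properties");
        // Registered here, the parameters become visible to getGlobalJobParameters() in open().
        env.getConfig().setGlobalJobParameters(params);
        // ... build the keyed, windowed stream and attach AlertProcessWindowFunction here ...
        env.execute("alert-job");
    }
}
```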
# Saving with a MySQL sink
- MysqlSink
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// The type parameter of RichSinkFunction<ActivityBean> is the sink's input type;
// ActivityBean is a user-defined entity class.
public class MysqlSink extends RichSinkFunction<ActivityBean> {

    private transient Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the MySQL connection once per parallel sink instance
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
    }

    @Override
    public void invoke(ActivityBean bean, Context context) throws Exception {
        PreparedStatement pstm = null;
        try {
            // Upsert: insert, or update the counter if the row already exists
            pstm = connection.prepareStatement(
                    "INSERT INTO t_activity_counts (aid, event_type, counts) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE counts = ?");
            pstm.setString(1, bean.aid);
            pstm.setInt(2, bean.eventType);
            pstm.setInt(3, bean.count);
            pstm.setInt(4, bean.count);
            pstm.executeUpdate();
        } finally {
            if (pstm != null) {
                pstm.close();
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the connection
        connection.close();
    }
}
```
Invocation:

```scala
stream.map(value => {
  val v = new ActivityBean
  // ... populate v from value ...
  v
}).addSink(new MysqlSink)
```
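`ActivityBean` is referenced but not defined in this post. Since `MysqlSink` reads `aid`, `eventType`, and `count` as public fields, a minimal hypothetical version would be:

```java
// Hypothetical entity class; only the fields MysqlSink actually reads are shown.
public class ActivityBean {
    public String aid;     // activity id
    public int eventType;  // event type
    public int count;      // aggregated count to upsert
}
```

Note that for the `ON DUPLICATE KEY UPDATE` clause to behave as an upsert, `t_activity_counts` needs a primary or unique key over the identifying columns, presumably `aid` and `event_type`.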
# Two-phase-commit transactions with MySQL

A two-phase-commit sink protects the data produced between one successful checkpoint and the next. With a plain sink, if the job fails partway through, the rows written after the last successful checkpoint have already been stored in MySQL even though they are not covered by any checkpoint; when the job restarts from that checkpoint, the same data is replayed and written to MySQL again, producing duplicates. By holding all writes in an open transaction and committing only once the checkpoint completes, the two-phase-commit sink avoids both losing and duplicating that window of data.
- MySqlTwoPhaseCommitSink

```java
package com.jast.flink.day07;

import com.jast.flink.util.DruidConnectionPool;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySqlTwoPhaseCommitSink extends
        TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, MySqlTwoPhaseCommitSink.ConnectionState, Void> {

    public MySqlTwoPhaseCommitSink() {
        // Serializers for the transaction handle (Kryo) and the unused user context (Void)
        super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected ConnectionState beginTransaction() throws Exception {
        System.out.println("=====> beginTransaction... ");
        //Class.forName("com.mysql.jdbc.Driver");
        //Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
        Connection connection = DruidConnectionPool.getConnection();
        // Disable auto-commit so all writes stay in one open transaction
        connection.setAutoCommit(false);
        return new ConnectionState(connection);
    }

    @Override
    protected void invoke(ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {
        Connection connection = connectionState.connection;
        PreparedStatement pstm = connection.prepareStatement(
                "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");
        pstm.setString(1, value.f0);
        pstm.setInt(2, value.f1);
        pstm.setInt(3, value.f1);
        pstm.executeUpdate();
        pstm.close();
    }

    @Override
    protected void preCommit(ConnectionState connectionState) throws Exception {
        System.out.println("=====> preCommit... " + connectionState);
    }

    @Override
    protected void commit(ConnectionState connectionState) {
        // Called after the checkpoint completes: make the writes visible
        System.out.println("=====> commit... ");
        Connection connection = connectionState.connection;
        try {
            connection.commit();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to commit the transaction", e);
        }
    }

    @Override
    protected void abort(ConnectionState connectionState) {
        // Called on failure: discard the uncommitted writes
        System.out.println("=====> abort... ");
        Connection connection = connectionState.connection;
        try {
            connection.rollback();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to roll back the transaction", e);
        }
    }

    static class ConnectionState {
        // transient: a live JDBC connection cannot be meaningfully checkpointed
        private final transient Connection connection;

        ConnectionState(Connection connection) {
            this.connection = connection;
        }
    }
}
```
- DruidConnectionPool
```java
package com.jast.flink.util;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

public class DruidConnectionPool {

    private transient static DataSource dataSource = null;
    private transient static Properties props = new Properties();

    static {
        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8");
        props.put("username", "root");
        props.put("password", "123456");
        try {
            dataSource = DruidDataSourceFactory.createDataSource(props);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private DruidConnectionPool() {
    }

    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }
}
```
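`TwoPhaseCommitSinkFunction` commits only when a checkpoint completes, so checkpointing with exactly-once semantics must be enabled or nothing is ever committed. A minimal driver sketch (the class name, checkpoint interval, and job wiring are placeholders):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoPhaseCommitJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10s with exactly-once semantics; commit() fires on checkpoint completion.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
        // ... build the Tuple2<String, Integer> word-count stream here ...
        //     .addSink(new MySqlTwoPhaseCommitSink());
        env.execute("two-phase-commit-job");
    }
}
```

One design caveat: this pattern keeps a MySQL transaction open for the full checkpoint interval, so the interval must stay well below MySQL's transaction and connection timeouts.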
# Periodically refreshing MySQL data inside a function

Goal: periodically re-query reference data from MySQL so the stream can use the latest values when evaluating records.

- Only the periodic-fetch logic is shown.
```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

class AlertProcessWindowFunction extends ProcessWindowFunction[(Int, ConsumerRecord[String, String]), (String, Int), Int, TimeWindow] {

  /** MySQL connection */
  private var connection: Connection = null
  // Written by the refresh thread and read in process(), so mark it volatile
  @volatile private var alertMap: Map[String, List[(String, String, String, String)]] = null
  private var ps: PreparedStatement = null
  private var executor: ScheduledExecutorService = null

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Read the global job parameters
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    // MySQL connection settings
    val url = params.getRequired("mysql.jdbc.url")
    val username = params.getRequired("mysql.username")
    val password = params.getRequired("mysql.password")
    println("Initializing MySQL connection: " + url + "\t" + username)
    // Create the MySQL connection
    connection = DriverManager.getConnection(url, username, password)

    // Re-query MySQL every 20 seconds and refresh the in-memory alert rules
    executor = Executors.newScheduledThreadPool(1)
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // Read the alert rules from MySQL
        ps = connection.prepareStatement("SELECT data_type, contrast_table, contrast_field, business_type FROM alert")
        val resultSet = ps.executeQuery()
        val list = new util.ArrayList[(String, (String, String, String, String))]
        while (resultSet.next()) {
          val tuple = Tuple4.apply(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4))
          list.add((resultSet.getString(1), tuple))
          println("Read row: " + tuple._1 + "," + tuple._2)
        }
        import scala.collection.JavaConverters._
        val rows = list.asScala.toList
        // Index the alert rules by contrast_table for the later stream checks
        alertMap = rows.map(x => (x._2._2, x._2)).groupBy(_._1).map(x => (x._1, x._2.map(_._2)))
        println("alertMap:" + alertMap)
      }
    }, 0, 20, TimeUnit.SECONDS)
  }

  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])], out: Collector[(String, Int)]): Unit = {
    out.collect(("facebook", 0))
  }

  override def close(): Unit = {
    super.close()
    // Stop the refresh thread, then close the MySQL connection
    if (executor != null) executor.shutdownNow()
    if (connection != null) connection.close()
  }
}
```