数据湖Iceberg-Flink DataFrame集成(7)
[toc]
数据湖Iceberg-简介(1) (opens new window) 数据湖Iceberg-存储结构(2) (opens new window) 数据湖Iceberg-Hive集成Iceberg(3) (opens new window) 数据湖Iceberg-SparkSQL集成(4) (opens new window) 数据湖Iceberg-FlinkSQL集成(5) (opens new window) 数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6) (opens new window) 数据湖Iceberg-Flink DataFrame集成(7) (opens new window)
# 环境准备
# 配置pom.xml
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.3</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> <!–不会打包到依赖中,只参与编译,不参与运行 –>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!--idea运行时也有webui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.16 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.14</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# 配置log4j
resources目录下新建log4j.properties。
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
1
2
3
4
5
2
3
4
5
# 写入数据
目前支持DataStream
def writeData() = {
val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
val env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1)
val input: SingleOutputStreamOperator[RowData] = env.fromElements("")
.map(x => {
import org.apache.flink.table.data.GenericRowData
val genericRowData = new GenericRowData(2)
genericRowData.setField(0, 123455L)
genericRowData.setField(1, StringData.fromString("99L"))
genericRowData
})
val loader: TableLoader = TableLoader.fromHadoopTable(path)
FlinkSink.forRowData(input)
.tableLoader(loader)
// .set("write-format", "orc") //设置其他参数
// .set(FlinkWriteOptions.OVERWRITE_MODE, "true") //设置其他参数
.append() // append方式
// .overwrite(true) //overwrite方式
// .upsert(true) // upsert方式
env.execute()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
写入配置参数选项
选项 | 默认值 | 说明 |
---|---|---|
write-format | Parquet 同write.format.default | 写入操作使用的文件格式:Parquet, avro或orc |
target-file-size-bytes | 536870912(512MB) 同write.target-file-size-bytes | 控制生成的文件的大小,目标大约为这么多字节 |
upsert-enabled | 同write.upsert.enabled, | |
overwrite-enabled | false | 覆盖表的数据,不能和UPSERT模式同时开启 |
distribution-mode | None 同 write.distribution-mode | 定义写数据的分布方式: none:不打乱行; hash:按分区键散列分布; range:如果表有SortOrder,则通过分区键或排序键分配 |
compression-codec | 同 write.(fileformat).compression-codec | |
compression-level | 同 write.(fileformat).compression-level | |
compression-strategy | 同write.orc.compression-strategy |
# 读取数据
# 常规Source写法
# Batch方式
/**
* 读取 Iceberg 表数据的方法
*
* @return 返回读取的数据
* @author Jast
*/
def readDataForBatch() = {
// 定义Iceberg表路径
val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
// 获取Flink执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从路径中加载Iceberg表
val tableLoader = TableLoader.fromHadoopTable(path);
// 构建一个消费Iceberg表数据的DataStream
val batch: DataStream[RowData] = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
// .startSnapshotId() 可以指定从哪个快照开始
.streaming(false)
.build();
// 对每行数据的第0个和第1个字段构造一个二元组,返回值类型是Tuple2[Long, StringData],并打印出数据内容
batch.map(r => (r.getLong(0), r.getString(1)))
.returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
.print("数据")
// 执行Flink任务
env.execute("Test Iceberg Read");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Streaming方式
def readDataForStream() = {
// 定义Iceberg表路径
val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
// 获取Flink执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从路径中加载Iceberg表
val tableLoader = TableLoader.fromHadoopTable(path);
// 构建一个消费Iceberg表数据的DataStream
val batch: DataStream[RowData] = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.build();
// 对每行数据的第0个和第1个字段构造一个二元组,返回值类型是Tuple2[Long, StringData],并打印出数据内容
batch.map(r => (r.getLong(0), r.getString(1)))
.returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
.print("数据流")
// 执行Flink任务
env.execute("Test Iceberg Read");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# FLIP-27 Source写法
# Batch方式
def readDataForBatchFLIP27() = {
// 定义Iceberg表路径
val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
// 获取Flink执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从路径中加载Iceberg表
val tableLoader = TableLoader.fromHadoopTable(path);
val source: IcebergSource[RowData] = IcebergSource.forRowData()
.tableLoader(tableLoader)
.assignerFactory(new SimpleSplitAssignerFactory)
.build()
val batch: DataStream[RowData] = env.fromSource(
source,
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
"Iceberg Source",
TypeInformation.of(classOf[RowData])
)
// 对每行数据的第0个和第1个字段构造一个二元组,返回值类型是Tuple2[Long, StringData],并打印出数据内容
batch.map(r => (r.getLong(0), r.getString(1)))
.returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
.print("FLIP27数据")
// 执行Flink任务
env.execute("Test Iceberg Read");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Streaming方式
def readDataForStreamFLIP27() = {
// 定义Iceberg表路径
val path = "hdfs://172.16.24.194:8020/iceberg/iceberg-hadoop/iceberg_db/sample0424"
// 获取Flink执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从路径中加载Iceberg表
val tableLoader = TableLoader.fromHadoopTable(path);
val source: IcebergSource[RowData] = IcebergSource.forRowData()
.tableLoader(tableLoader)
.assignerFactory(new SimpleSplitAssignerFactory)
.streaming(true)
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
.build()
val batch: DataStream[RowData] = env.fromSource(
source,
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
"Iceberg Source",
TypeInformation.of(classOf[RowData])
)
// 对每行数据的第0个和第1个字段构造一个二元组,返回值类型是Tuple2[Long, StringData],并打印出数据内容
batch.map(r => (r.getLong(0), r.getString(1)))
.returns(TypeInformation.of(new TypeHint[(Long, StringData)] {}))
.print("FLIP27数据流")
// 执行Flink任务
env.execute("Test Iceberg Read");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 合并小文件
/**
* 合并小文件
*
* @return
*/
def mergeSmallFiles() = {
// 1.获取 Table对象
// 1.1 创建 catalog对象
// val conf: Configuration = new Configuration()
// val hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg")
// 1.2 通过 catalog加载 Table对象
// val table: Table = hadoopCatalog.loadTable(TableIdentifier.of("default", "a"))
// 有Table对象,就可以获取元数据、进行维护表的操作
// System.out.println(table.history());
// System.out.println(table.expireSnapshots().expireOlderThan());
// 2.通过 Actions 来操作 合并
// Actions.forTable(table)
// .rewriteDataFiles()
// .targetSizeInBytes(1024L)
// .execute();
// 1. 获取 Table 对象
// 1.1 创建 Configuration 对象
val conf: Configuration = new Configuration()
// 1.2 创建 HadoopCatalog 对象,传入 Configuration 对象和表路径字符串
val hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg")
// 1.3 通过 TableIdentifier.of() 方法创建 TableIdentifier 对象,传入库名和表名
val tableId: TableIdentifier = TableIdentifier.of("default", "a")
// 1.4 通过 HadoopCatalog 对象的 loadTable() 方法加载 Table 对象,传入 TableIdentifier 对象
val table: Table = hadoopCatalog.loadTable(tableId)
// 2. 通过 Actions 来操作合并
// 2.1 调用 Actions.forTable() 方法,传入 Table 对象,实例化 Actions 对象
val actions: Actions = Actions.forTable(table)
// 2.2 调用 rewriteDataFiles() 方法,实现合并操作
val rewritten: RewriteDataFilesAction = actions.rewriteDataFiles()
// 2.3 设置合并后数据文件的目标大小为 1024 字节,调用 targetSizeInBytes() 方法
val withTargetSize: RewriteDataFiles = rewritten.targetSizeInBytes(1024L)
// 2.4 执行合并操作,调用 execute() 方法
withTargetSize.execute()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
上次更新: 2023/04/26, 17:38:02