# Flume Consumes Kafka Data and Saves It to Hive
# Scenario
Use Flume to consume data from Kafka and store it in the ODS layer, tagging each record with the metadata captured at consumption time (topic, partition, offset, key, and consume time).
# Create the Hive Table
The table uses ORC storage with Snappy compression and has transactions enabled.
ORC transactional tables
- can only be managed (internal) tables
- must be bucketed

Depending on your cluster, Hive ACID support may also need to be enabled on the Hive side (for example hive.support.concurrency=true and hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager).
```sql
CREATE TABLE hr.ods_internetbar_data
(
    k_topic      string,
    k_data       string,
    k_partition  int,
    k_offset     int,
    k_key        string,
    current_time bigint
)
PARTITIONED BY (pt_dt int)
CLUSTERED BY (k_partition) INTO 5 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS orc
TBLPROPERTIES ('orc.compress'='SNAPPY', 'transactional'='true');
```
# Write the Interceptor
Idea: the interceptor reads the Kafka metadata (topic name, partition, etc.) from the event headers, joins it with the message body into a '\t'-separated record, and that record is then written to Hive.
The interceptor class is com.jast.flume.ETLInterceptor. The code is as follows:
```java
package com.jast.flume;

import cn.hutool.json.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author Jast
 * @description Custom interceptor
 * @date 2022-03-17 09:20
 */
public class ETLInterceptor implements Interceptor {

    private final String separator = "\t";
    private final String defaultValue = "";

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // When consuming from Kafka, the headers obtained here carry the Kafka metadata:
        // key, topic, partition, offset, etc., e.g.
        // {topic=ZW_WB_1010000009, partition=0, offset=5425705, key={"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"3e9fc439e42c4059b46e5d5664d08bf8","lineNum":17,"subFullLinkId":"77dc3a9680ee4d1caeb6719d87e8d3e5","totalCount":50}, timestamp=1648642188763}
        Map<String, String> headers = event.getHeaders();
        String topic = headers.getOrDefault("topic", defaultValue);
        String partition = headers.getOrDefault("partition", defaultValue);
        String offset = headers.getOrDefault("offset", defaultValue);
        String key = headers.getOrDefault("key", defaultValue);
        long currentTimeMillis = System.currentTimeMillis();
        //System.out.println("interceptor headers: " + headers);
        byte[] body = event.getBody();
        String text = new String(body, StandardCharsets.UTF_8);
        if (JSONUtil.isJson(text)) {
            // Join the fields with '\t' in the same order as serializer.fieldnames / the Hive columns.
            text = topic + separator + text + separator + partition + separator + offset
                    + separator + key + separator + currentTimeMillis;
            System.out.println("text:" + text);
            event.setBody(text.getBytes(StandardCharsets.UTF_8));
            return event;
        }
        //System.out.println("non-JSON record, filtered out");
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
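To sanity-check the interceptor locally before packaging it, a small, self-contained main method like the one below can be used. It is not part of the original project; the topic name, offset, key and JSON body are made-up sample values, and the header names mimic what the Flume Kafka source attaches to each event.

```java
package com.jast.flume;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Local smoke test for ETLInterceptor; all values below are illustrative. */
public class ETLInterceptorSmokeTest {
    public static void main(String[] args) {
        // Headers mimic what the Flume Kafka source attaches to each event.
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "ZW_WB_1010000009");
        headers.put("partition", "0");
        headers.put("offset", "5425705");
        headers.put("key", "{\"dataType\":\"ZW_WB_\"}");

        Interceptor interceptor = new ETLInterceptor.Builder().build();
        interceptor.initialize();

        // A JSON body is enriched with the Kafka metadata and kept.
        Event jsonEvent = EventBuilder.withBody(
                "{\"lineNum\":17,\"totalCount\":50}".getBytes(StandardCharsets.UTF_8), headers);
        Event result = interceptor.intercept(jsonEvent);
        // Expected: topic \t json-body \t partition \t offset \t key \t currentTimeMillis
        System.out.println(new String(result.getBody(), StandardCharsets.UTF_8));

        // A non-JSON body is dropped (intercept returns null).
        Event badEvent = EventBuilder.withBody(
                "not-json".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(interceptor.intercept(badEvent)); // prints: null

        interceptor.close();
    }
}
```

Running it should print one tab-separated line for the JSON event and `null` for the non-JSON one, mirroring how the interceptor filters events.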
pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.19</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Building the project produces the file flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar.
Put the generated jar into Flume's lib directory:
- Self-deployed Flume: copy the jar into the flume/lib directory.
- CDH Flume: copy the jar into the Flume lib directory inside the parcel, e.g. /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

```bash
cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib
```
# Write the Flume Configuration File
Pay particular attention to a1.sinks.k1.serializer.fieldnames and a1.sources.r1.interceptors.i1.type:
- a1.sinks.k1.serializer.fieldnames: maps the '\t'-separated fields produced by the interceptor onto the Hive table columns (see the sketch after the configuration below).
- a1.sources.r1.interceptors.i1.type: points at the interceptor's Builder class.

The configuration file k2h.config is as follows:
```properties
## Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Number of events per batch
a1.sources.r1.batchSize = 1000
# Maximum time (ms) before a batch is written to the channel; the batch is written
# as soon as either the size limit or the time limit is reached.
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.16:9092
# Kafka topics to consume; separate multiple topics with commas
a1.sources.r1.kafka.topics=ZW_WB_1010000009
# Default: PLAINTEXT. Set to SASL_PLAINTEXT, SASL_SSL or SSL if Kafka uses some security level.
a1.sources.r1.kafka.consumer.security.protocol=PLAINTEXT
# Configure the interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder

## channel1
# File-backed channel
a1.channels.c1.type = file
# Checkpoint directory
a1.channels.c1.checkpointDir = /var/lib/hadoop-hdfs/flume/checkpoint/behavior1
# Comma-separated list of directories for storing the log files. Using multiple directories
# on different disks can improve file channel performance.
a1.channels.c1.dataDirs = /var/lib/hadoop-hdfs/flume/data/behavior1/
# Maximum transaction size
a1.channels.c1.transactionCapacity=10000
# Maximum channel capacity
a1.channels.c1.capacity=1000000

## sink1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://192.168.60.14:9083
# Hive database name
a1.sinks.k1.hive.database = hr
# Hive table name
a1.sinks.k1.hive.table = ods_internetbar_data
# Partition value; the escape sequences are explained later in this article
a1.sinks.k1.hive.partition = %y%m%d
# Use local time (instead of the timestamp from the event headers) when replacing escape sequences
a1.sinks.k1.useLocalTimeStamp = false
# ============================================================
# e.g. with 6-hour rounding a new file is produced every 6 hours, i.e. the day is split into
# 4 buckets; if the current time is 15:40 and a new log record arrives, HDFS creates a new
# file named 2015102012, because 15:40 falls into the 12-18 bucket.
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
# collector1.sinks.sink_hdfs.hdfs.rollSize = 2048000000
# collector1.sinks.sink_hdfs.hdfs.rollCount = 0
# collector1.sinks.sink_hdfs.hdfs.rollInterval = 21600
# rollSize rolls to the next file once the current one reaches 2 GB, i.e. at that point
# HDFS renames the file being written to a finished file.
# rollInterval rolls to the next file every 6 hours.
# ============================================================
# Serializer: DELIMITED (delimiter-separated text) or JSON (single-level JSON per line)
a1.sinks.k1.serializer = DELIMITED
# Field delimiter of the data; special characters must be wrapped in double quotes, e.g. "\t"
a1.sinks.k1.serializer.delimiter = "\t"
# Regex used to parse the data; each field is parsed as one capture group
# a1.sinks.k1.serializer.regex =
# (Type: character) Custom separator used by the underlying serde. Efficiency improves if the
# fields in serializer.fieldnames are in the same order as the table columns, serializer.delimiter
# equals serializer.serdeSeparator, and the number of fields in serializer.fieldnames is less than
# or equal to the number of table columns, because the fields in the incoming event body then do
# not need to be reordered to match the table columns. Use single quotes for special characters,
# e.g. '\t'. Make sure the input fields do not contain this character.
# Note: if serializer.delimiter is a single character, it is best to set both to the same character.
a1.sinks.k1.serializer.serdeSeparator = '\t'
# Mapping from input fields to Hive columns, in input order. To skip a field, leave its column
# name empty, e.g. c1,c2,,c3 maps the 1st, 2nd and 4th input fields to Hive columns c1, c2, c3.
a1.sinks.k1.serializer.fieldnames = k_topic,k_data,k_partition,k_offset,k_key,current_time
# Maximum number of events written to Hive in a single Hive transaction, default 15000
a1.sinks.k1.batchSize=10000

## Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
```
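To make the fieldnames mapping concrete, here is a small standalone Java sketch (not part of the Flume runtime; the sample values are made up) showing how one '\t'-separated body produced by ETLInterceptor lines up with serializer.fieldnames and therefore with the columns of hr.ods_internetbar_data:

```java
import java.util.Arrays;
import java.util.List;

/** Illustration of the fieldnames-to-columns alignment; sample values are made up. */
public class FieldMappingDemo {
    public static void main(String[] args) {
        // Same order as a1.sinks.k1.serializer.fieldnames and the Hive table columns.
        List<String> fieldnames = Arrays.asList(
                "k_topic", "k_data", "k_partition", "k_offset", "k_key", "current_time");

        // One event body as emitted by the interceptor: fields joined with '\t'.
        String body = String.join("\t",
                "ZW_WB_1010000009",                   // k_topic
                "{\"lineNum\":17,\"totalCount\":50}", // k_data
                "0",                                  // k_partition
                "5425705",                            // k_offset
                "{\"dataType\":\"ZW_WB_\"}",          // k_key
                "1648642188763");                     // current_time

        // The Hive sink splits on serializer.delimiter and assigns fields to columns by position.
        String[] values = body.split("\t");
        for (int i = 0; i < fieldnames.size(); i++) {
            System.out.printf("%-12s -> %s%n", fieldnames.get(i), values[i]);
        }
    }
}
```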
The configuration above uses the escape sequence a1.sinks.k1.hive.partition = %y%m%d.
Supported escape sequences:
Alias | Description |
---|---|
%{host} | Substitute value of event header named “host”. Arbitrary header names are supported. |
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, ...) |
%A | locale’s full weekday name (Monday, Tuesday, ...) |
%b | locale’s short month name (Jan, Feb, ...) |
%B | locale’s long month name (January, February, ...) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%M | minute (00..59) |
%p | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
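As a rough illustration of how the partition value is derived (this snippet is not Flume code; the timestamp is a sample value and the result depends on the JVM timezone), %y%m%d applied to the event's timestamp header behaves like the Java date pattern "yyMMdd":

```java
import java.text.SimpleDateFormat;
import java.util.Date;

/** Shows what %y%m%d in hive.partition resolves to for a given event timestamp. */
public class PartitionEscapeDemo {
    public static void main(String[] args) {
        // Sample "timestamp" header value set by the Kafka source
        // (local time would be used instead if useLocalTimeStamp were true).
        long eventTimestamp = 1648642188763L;
        // %y -> two-digit year, %m -> month, %d -> day of month, i.e. "yyMMdd".
        String partitionValue = new SimpleDateFormat("yyMMdd").format(new Date(eventTimestamp));
        System.out.println(partitionValue); // e.g. 220330 -> partition pt_dt=220330
    }
}
```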
# Start Flume to Collect Data
```bash
flume-ng agent --conf-file k2h.config --name a1
```
# Check the Data
```sql
select k_topic, k_partition, k_offset, k_key, current_time, length(k_data)
from ods_internetbar_data
limit 10;
```
```
ZW_WB_1010000009 0 5441712 {"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":24,"subFullLinkId":"c2e3704534e745c0a779b16f059e6411","totalCount":50} 1648644001736 52105
ZW_WB_1010000009 0 5441713 {"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":25,"subFullLinkId":"c0cec18006ce40848ea3a0494a991999","totalCount":50} 1648644001736 35837
```
length(k_data) is used here just to keep the output readable, because the value of that field is very large.