DataX Plugin Development: KafkaWriter
[toc]
# Download the source code
Source code: https://github.com/alibaba/DataX/releases/tag/datax_v202210
The version used here is datax_v202210.
DataX user guide: https://github.com/alibaba/DataX/blob/master/introduction.md
DataX plugin development guide: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
# Plugin development
# Create the kafkawriter module
Create a new Maven module named `kafkawriter` under the DataX project root; all of the files described below live inside that module.
# pom.xml
Besides a few default DataX dependencies, the main addition is `kafka-clients`. The full file is as follows:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafkawriter</artifactId>
<properties>
<kafka.version>1.1.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
Notes:
- maven-assembly-plugin
First, understand how DataX is packaged: with maven-assembly-plugin.
1. Purpose: it is the standard Maven packaging plugin, used to build your program together with the jars it depends on into a single package.
2. Other capabilities:
1) Assemble project dependencies, modules, site documentation and other files into a single archive.
2) Produce distribution packages in a chosen format; all mainstream formats such as zip, tar.gz, jar and war are supported, and exactly which files get packaged is fully configurable.
3) Include or exclude specific directories or files.
Overall, using maven-assembly-plugin takes two steps:
Step 1: configure maven-assembly-plugin in pom.xml and point it at a descriptor file.
Step 2: configure the concrete parameters in that descriptor file.
The block to add to the kafkawriter pom (the same plugin block as above, annotated):
```xml
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptors> <!-- path to the descriptor file -->
            <descriptor>src/main/assembly/package.xml</descriptor>
        </descriptors>
        <finalName>datax</finalName>
    </configuration>
    <executions>
        <execution>
            <id>dwzip</id>
            <phase>package</phase> <!-- bound to the package lifecycle phase -->
            <goals>
                <goal>single</goal> <!-- run only once -->
            </goals>
        </execution>
    </executions>
</plugin>
```
# plugin.json
Located at src/main/resources/plugin.json; the `class` field must be the fully qualified name of the writer class implemented below.
```json
{
"name": "kafkawriter",
"class": "com.alibaba.datax.plugin.writer.KafkaWriter",
"description": "Kafka Writer",
"developer": "Jast"
}
```
# package.xml
Located at src/main/assembly/package.xml. This descriptor copies plugin.json, the plugin jar, and its runtime dependencies into plugin/writer/kafkawriter.
```xml
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/kafkawriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
```
# Classes
# com.alibaba.datax.plugin.writer.KafkaWriter
```java
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Job and Task must never share variables: in distributed execution there is no guarantee
 * that shared state gets initialized correctly, so they may only depend on each other
 * through the configuration. prepare and post exist on both Job and Task; the plugin
 * decides where each piece of work actually belongs.
 *
 * @author mac
 */
public class KafkaWriter extends Writer {

    public static class Job extends Writer.Job {

        private static final Logger log = LoggerFactory.getLogger(Job.class);

        private Configuration conf = null;

        /**
         * init: Job-level initialization. super.getPluginJobConf() returns the part of the job
         * configuration that belongs to this plugin: a reader gets the reader section, a writer
         * gets the writer section.
         */
        @Override
        public void init() {
            // the parameters inside the writer's "parameter" object
            this.conf = super.getPluginJobConf();
            log.info("kafka writer params:{}", conf.toJSON());
            // validate the parameters
            this.validateParameter();
        }

        private void validateParameter() {
            // topic and bootstrapServers are mandatory
            this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
        }

        /**
         * prepare: global preparation, e.g. odpswriter truncates the target table here.
         */
        @Override
        public void prepare() {
        }

        /**
         * split: split the job into tasks. The argument is the split count suggested by the
         * framework, normally the configured channel number; the return value is the list of
         * task configurations.
         *
         * @param mandatoryNumber to keep reader and writer task counts equal, the writer must
         *                        split into exactly this many tasks, otherwise the framework
         *                        reports an error
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            // produce the same number of writer configurations as there are reader splits
            List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                configurations.add(conf);
            }
            return configurations;
        }

        /**
         * post: global post-processing, e.g. mysqlwriter renames the shadow table here.
         */
        @Override
        public void post() {
            log.info("job destroy ");
        }

        /**
         * destroy: clean up the Job object itself.
         */
        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {

        private static final Logger log = LoggerFactory.getLogger(Task.class);

        private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

        private Producer<String, String> producer;
        private String fieldDelimiter;
        private Configuration conf;

        /**
         * init: Task-level initialization. super.getPluginJobConf() returns the configuration of
         * this task, i.e. one element of the list returned by Job.split().
         */
        @Override
        public void init() {
            this.conf = super.getPluginJobConf();
            fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);

            // initialize the Kafka producer
            Properties props = new Properties();
            props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
            // acks=all: the leader waits until all replicas have written the record, so data is
            // not lost as long as one replica survives; this is the strongest guarantee
            props.put("acks", "all");
            props.put("retries", 0);
            // the producer sends a batch once either batch.size or linger.ms is reached;
            // 16384 bytes (16 KB) is the default batch size
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(props);
        }

        /**
         * prepare: task-level preparation.
         */
        @Override
        public void prepare() {
            super.prepare();
        }

        /**
         * startWrite: read records from the RecordReceiver (the buffer queue between reader and
         * writer) and write them to the target data source, here a Kafka topic.
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            log.info("start to writer kafka");
            Record record = null;
            // a non-null record means the reader is still producing data or the queue is not drained
            while ((record = lineReceiver.getFromReader()) != null) {
                // join the columns of one record with the configured delimiter and send it
                producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                        recordToString(record), recordToString(record)));
            }
        }

        /**
         * destroy: clean up the Task object itself.
         */
        @Override
        public void destroy() {
            if (producer != null) {
                log.info("Waiting for message to be successfully sent");
                producer.flush();
                log.info("Message sent successfully");
                producer.close();
            }
        }

        private String recordToString(Record record) {
            int recordLength = record.getColumnNumber();
            if (0 == recordLength) {
                return NEWLINE_FLAG;
            }
            Column column;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                sb.append(column.asString()).append(fieldDelimiter);
            }
            // replace the trailing delimiter with a newline
            sb.setLength(sb.length() - 1);
            sb.append(NEWLINE_FLAG);
            return sb.toString();
        }
    }
}
```
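Note that the `producer.send()` call in `startWrite()` is fire-and-forget: the result is ignored and `retries` is 0, so a failed send is dropped silently. If that matters, one option is to pass a `Callback` to `send()`. The helper below is only a sketch of the idea (the class and method are made up, not part of the plugin above); instead of logging, the callback could also hand the failed record to DataX's dirty-record collector.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical helper: the same send as in Task.startWrite(), but with a delivery callback. */
public class LoggingKafkaSender {

    private static final Logger log = LoggerFactory.getLogger(LoggingKafkaSender.class);

    /** Sends one delimited line to the topic and logs any delivery failure. */
    public static void send(Producer<String, String> producer, final String topic, String line) {
        producer.send(new ProducerRecord<String, String>(topic, line, line), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // with retries=0 the record is lost at this point; at least make it visible
                    log.error("failed to send record to topic {}", topic, exception);
                }
            }
        });
    }
}
```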
# com.alibaba.datax.plugin.writer.KafkaWriterErrorCode
```java
package com.alibaba.datax.plugin.writer;

import com.alibaba.datax.common.spi.ErrorCode;

public enum KafkaWriterErrorCode implements ErrorCode {

    REQUIRED_VALUE("KafkaWriter-00", "Required parameter is not filled .");

    private final String code;
    private final String description;

    KafkaWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}
```
# com.alibaba.datax.plugin.writer.Key
```java
package com.alibaba.datax.plugin.writer;

public class Key {

    public static final String FIELD_DELIMITER = "fieldDelimiter";
    public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
    public static final String TOPIC = "topic";
}
```
# Modify package.xml in the DataX project root
Add the following to the fileSets section:
```xml
<fileSet>
<directory>kafkawriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
```
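If you build from the DataX project root, the new module presumably also needs to be declared in the root pom.xml's modules list, roughly like this (a sketch; check it against your build):

```xml
<!-- root pom.xml of DataX: append kafkawriter to the existing <modules> list -->
<modules>
    ...
    <module>kafkawriter</module>
</modules>
```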
# Packaging
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
Or package directly from within IDEA.
The built plugin is generated under DataX-datax_v202210/kafkawriter/target/datax/plugin, as shown in the layout sketch below.
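The assembled layout should look roughly like this (a sketch; the module version is assumed to be the default 0.0.1-SNAPSHOT and the exact contents of libs depend on the resolved runtime dependencies):

```text
kafkawriter/target/datax/plugin/writer/kafkawriter/
├── plugin.json
├── kafkawriter-0.0.1-SNAPSHOT.jar
└── libs/            (kafka-clients and the other runtime dependency jars)
```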
# Install DataX
# Download DataX
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
# Extract
tar -zxvf datax.tar.gz
# Upload the custom KafkaWriter
Upload the plugin directory you built to the DataX installation directory and extract it there, for example as sketched below.
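A minimal sketch, assuming DataX was extracted to ~/datax on the target host (host name and paths are placeholders):

```bash
# on the build machine: archive the assembled plugin directory
tar -czf kafkawriter.tar.gz -C kafkawriter/target/datax/plugin/writer kafkawriter
scp kafkawriter.tar.gz hadoop@<datax-host>:~/datax/

# on the DataX host: extract it into plugin/writer/
cd ~/datax && tar -xzf kafkawriter.tar.gz -C plugin/writer/
```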
After that, `kafkawriter` should show up alongside the built-in writers:
```
[hadoop@10 ~/datax]$ ll plugin/writer/
total 148
.....
drwxr-xr-x 3 hadoop hadoop 4096 Dec 9 15:22 kafkawriter
.....
```
# Create the job configuration file text2kafka.json
What it does: read lines from a text file and send them to Kafka.
```json
{
"setting": {},
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/home/hadoop/data.txt"],
"encoding": "UTF-8",
"column": [
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "behavior_test",
"bootstrapServers": "10.16.0.2:9092",
"fieldDelimiter": ","
}
}
}
]
}
}
```
# Run it
python bin/datax.py job/text2kafka.json
A possible error:
2022-12-09 15:18:30.412 [main] WARN ConfigParser - 插件[txtfilereader,kafkawriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/home/hadoop/datax/plugin/writer/.DS_Store/plugin.json]不存在. 请检查您的配置文件.
Cause: on macOS, the hidden .DS_Store file gets packaged into the plugin directory by default; it has to be deleted, otherwise DataX fails to parse the plugins.
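One way to clean it up on the DataX host (assuming DataX lives in ~/datax):

```bash
# remove stray .DS_Store files from the plugin directories
find ~/datax/plugin -name ".DS_Store" -delete
```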
# Execution result
```
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2022-12-09 15:23:56.947 [main] INFO MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2022-12-09 15:23:56.949 [main] INFO MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2022-12-09 15:23:56.959 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-12-09 15:23:56.963 [main] INFO Engine - the machine info =>
osInfo: Tencent 1.8 25.282-b1
jvmInfo: Linux amd64 5.4.119-19-0007
cpu num: 8
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2022-12-09 15:23:56.976 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"txtfilereader",
"parameter":{
"column":[],
"encoding":"UTF-8",
"fieldDelimiter":",",
"path":[
"/home/hadoop/data.txt"
]
}
},
"writer":{
"name":"kafkawriter",
"parameter":{
"bootstrapServers":"10.16.0.2:9092",
"fieldDelimiter":",",
"topic":"behavior_test"
}
}
}
],
"setting":{
"speed":{
"channel":2
}
}
}
2022-12-09 15:23:56.988 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-12-09 15:23:56.989 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-12-09 15:23:56.989 [main] INFO JobContainer - DataX jobContainer starts job.
2022-12-09 15:23:56.991 [main] INFO JobContainer - Set jobId = 0
2022-12-09 15:23:57.003 [job-0] INFO KafkaWriter$Job - kafka writer params:{"bootstrapServers":"10.16.0.2:9092","fieldDelimiter":",","topic":"behavior_test"}
2022-12-09 15:23:57.004 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2022-12-09 15:23:57.004 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] do prepare work .
2022-12-09 15:23:57.005 [job-0] INFO TxtFileReader$Job - add file [/home/hadoop/data.txt] as a candidate to be read.
2022-12-09 15:23:57.005 [job-0] INFO TxtFileReader$Job - 您即将读取的文件数为: [1]
2022-12-09 15:23:57.005 [job-0] INFO JobContainer - DataX Writer.Job [kafkawriter] do prepare work .
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - jobContainer starts to do split ...
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - Job set Channel-Number to 2 channels.
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] splits to [1] tasks.
2022-12-09 15:23:57.006 [job-0] INFO JobContainer - DataX Writer.Job [kafkawriter] splits to [1] tasks.
2022-12-09 15:23:57.021 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2022-12-09 15:23:57.024 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2022-12-09 15:23:57.025 [job-0] INFO JobContainer - Running by standalone Mode.
2022-12-09 15:23:57.030 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-12-09 15:23:57.033 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-12-09 15:23:57.034 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2022-12-09 15:23:57.042 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-12-09 15:23:57.042 [0-0-0-reader] INFO TxtFileReader$Task - reading file : [/home/hadoop/data.txt]
2022-12-09 15:23:57.058 [0-0-0-writer] INFO ProducerConfig - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [10.16.0.2:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-12-09 15:23:57.105 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":",","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-12-09 15:23:57.158 [0-0-0-writer] INFO AppInfoParser - Kafka version : 1.1.1
2022-12-09 15:23:57.158 [0-0-0-writer] INFO AppInfoParser - Kafka commitId : 98b6346a977495f6
2022-12-09 15:23:57.159 [0-0-0-writer] INFO KafkaWriter$Task - start to writer kafka
2022-12-09 15:23:57.227 [kafka-producer-network-thread | producer-1] INFO Metadata - Cluster ID: 4SU0GyNWQpOfGrHCJfZwQw
2022-12-09 15:23:57.236 [0-0-0-writer] INFO KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2022-12-09 15:23:57.243 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[202]ms
2022-12-09 15:23:57.243 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-12-09 15:24:07.040 [job-0] INFO StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.040 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2022-12-09 15:24:07.040 [job-0] INFO JobContainer - DataX Writer.Job [kafkawriter] do post work.
2022-12-09 15:24:07.040 [job-0] INFO JobContainer - DataX Reader.Job [txtfilereader] do post work.
2022-12-09 15:24:07.040 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2022-12-09 15:24:07.041 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/hadoop/datax/hook
2022-12-09 15:24:07.042 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2022-12-09 15:24:07.042 [job-0] INFO JobContainer - PerfTrace not enable!
2022-12-09 15:24:07.043 [job-0] INFO StandAloneJobContainerCommunicator - Total 11 records, 33 bytes | Speed 3B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-12-09 15:24:07.043 [job-0] INFO JobContainer -
任务启动时刻 : 2022-12-09 15:23:56
任务结束时刻 : 2022-12-09 15:24:07
任务总计耗时 : 10s
任务平均流量 : 3B/s
记录写入速度 : 1rec/s
读出记录总数 : 11
读写失败总数 : 0
```
# Check the messages in Kafka
No screenshot here, but the messages did arrive in the topic; a quick check is shown below.
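A quick way to verify, assuming the standard Kafka CLI tools are available on a host that can reach the broker (broker address and topic are the ones from the job configuration above):

```bash
bin/kafka-console-consumer.sh --bootstrap-server 10.16.0.2:9092 --topic behavior_test --from-beginning
```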
# Sample job configurations
# Read a txt file and send to Kafka
```json
{
"setting": {},
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/home/hadoop/data.txt"],
"encoding": "UTF-8",
"column": [
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "behavior_test",
"bootstrapServers": "10.16.0.2:9092",
"fieldDelimiter": ","
}
}
}
]
}
}
```
# Read Hive and send to Kafka
Sync command:
python bin/datax.py -p "-Dday=2020-03-10" job/hive2kafka.json
The hive2kafka.json file:
```json
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/usr/hive/warehouse/ods.db/ods_tt_clue_intent/dt=${day}",
"defaultFS": "hdfs://10.16.0.12:4010",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 7,
"type": "string"
},
{
"index": 8,
"type": "string"
},
{
"index": 9,
"type": "string"
},
{
"index": 10,
"type": "string"
},
{
"index": 11,
"type": "string"
},
{
"index": 12,
"type": "string"
},
{
"index": 13,
"type": "string"
},
{
"index": 14,
"type": "string"
},
{
"index": 15,
"type": "string"
},
{
"index": 16,
"type": "string"
},
{
"index": 17,
"type": "string"
},
{
"index": 18,
"type": "string"
},
{
"index": 19,
"type": "string"
},
{
"index": 20,
"type": "string"
},
{
"index": 21,
"type": "string"
},
{
"index": 22,
"type": "string"
},
{
"index": 23,
"type": "string"
},
{
"index": 24,
"type": "string"
},
{
"index": 25,
"type": "string"
},
{
"index": 26,
"type": "string"
},
{
"index": 27,
"type": "string"
},
{
"type": "string",
"value": "${day}"
}
],
"fieldDelimiter":",",
"fileType": "orc",
"nullFormat":"\\N"
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "behavior_test",
"bootstrapServers": "10.16.0.2:9092",
"fieldDelimiter": ","
}
}
}
]
}
}
```
Reference: https://www.imooc.com/article/259830