Sending Logs from a Java Project to Kafka via Log4j
[toc]
# Goal
Automatically send application logs to a Kafka topic through Log4j.
# Implementation Steps
## pom.xml Dependencies
```xml
<properties>
    <kafka.client.version>2.4.1</kafka.client.version>
    <junit.version>4.13.2</junit.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.client.version}</version>
        <!-- Exclude the transitive SLF4J-to-Log4j binding so it cannot
             conflict with the Log4j setup used by this project -->
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>${kafka.client.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>
```
## Kafka Appender Class
The custom appender extends Log4j's `AppenderSkeleton`: `activateOptions()` builds a `KafkaProducer` from the properties Log4j injects, and `append()` formats each event with the configured layout and publishes it to the topic.
```java
package wiki.hadoop.log4j.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Log4j appender that ships each logging event to a Kafka topic.
 * Log4j populates the fields below through their setters, e.g.
 * log4j.appender.kafka.brokerList=... calls setBrokerList(...).
 */
public class KafkaLog4jAppender extends AppenderSkeleton {

    private String brokerList;
    private String topic;
    private String compressionType;
    private String securityProtocol;
    private String sslTruststoreLocation;
    private String sslTruststorePassword;
    private String sslKeystoreType;
    private String sslKeystoreLocation;
    private String sslKeystorePassword;
    private String saslKerberosServiceName;
    private String saslMechanism;
    private String clientJaasConfPath;
    private String clientJaasConf;
    private String kerb5ConfPath;
    private Integer maxBlockMs;

    private int retries = Integer.MAX_VALUE;
    private int requiredNumAcks = 1;
    private int deliveryTimeoutMs = 120000;
    private boolean ignoreExceptions = true;
    private boolean syncSend;

    private Producer<byte[], byte[]> producer;

    public KafkaLog4jAppender() {
    }

    public Producer<byte[], byte[]> getProducer() { return this.producer; }

    // --- Configuration accessors; Log4j's PropertySetter drives the setters ---

    public String getBrokerList() { return this.brokerList; }
    public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
    public int getRequiredNumAcks() { return this.requiredNumAcks; }
    public void setRequiredNumAcks(int requiredNumAcks) { this.requiredNumAcks = requiredNumAcks; }
    public int getRetries() { return this.retries; }
    public void setRetries(int retries) { this.retries = retries; }
    public int getDeliveryTimeoutMs() { return this.deliveryTimeoutMs; }
    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; }
    public String getCompressionType() { return this.compressionType; }
    public void setCompressionType(String compressionType) { this.compressionType = compressionType; }
    public String getTopic() { return this.topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public boolean getIgnoreExceptions() { return this.ignoreExceptions; }
    public void setIgnoreExceptions(boolean ignoreExceptions) { this.ignoreExceptions = ignoreExceptions; }
    public boolean getSyncSend() { return this.syncSend; }
    public void setSyncSend(boolean syncSend) { this.syncSend = syncSend; }
    public String getSecurityProtocol() { return this.securityProtocol; }
    public void setSecurityProtocol(String securityProtocol) { this.securityProtocol = securityProtocol; }
    public String getSslTruststoreLocation() { return this.sslTruststoreLocation; }
    public void setSslTruststoreLocation(String sslTruststoreLocation) { this.sslTruststoreLocation = sslTruststoreLocation; }
    public String getSslTruststorePassword() { return this.sslTruststorePassword; }
    public void setSslTruststorePassword(String sslTruststorePassword) { this.sslTruststorePassword = sslTruststorePassword; }
    public String getSslKeystoreType() { return this.sslKeystoreType; }
    public void setSslKeystoreType(String sslKeystoreType) { this.sslKeystoreType = sslKeystoreType; }
    public String getSslKeystoreLocation() { return this.sslKeystoreLocation; }
    public void setSslKeystoreLocation(String sslKeystoreLocation) { this.sslKeystoreLocation = sslKeystoreLocation; }
    public String getSslKeystorePassword() { return this.sslKeystorePassword; }
    public void setSslKeystorePassword(String sslKeystorePassword) { this.sslKeystorePassword = sslKeystorePassword; }
    public String getSaslKerberosServiceName() { return this.saslKerberosServiceName; }
    public void setSaslKerberosServiceName(String saslKerberosServiceName) { this.saslKerberosServiceName = saslKerberosServiceName; }
    public String getSaslMechanism() { return this.saslMechanism; }
    public void setSaslMechanism(String saslMechanism) { this.saslMechanism = saslMechanism; }
    public String getClientJaasConfPath() { return this.clientJaasConfPath; }
    public void setClientJaasConfPath(String clientJaasConfPath) { this.clientJaasConfPath = clientJaasConfPath; }
    public String getClientJaasConf() { return this.clientJaasConf; }
    public void setClientJaasConf(String clientJaasConf) { this.clientJaasConf = clientJaasConf; }
    public String getKerb5ConfPath() { return this.kerb5ConfPath; }
    public void setKerb5ConfPath(String kerb5ConfPath) { this.kerb5ConfPath = kerb5ConfPath; }
    public Integer getMaxBlockMs() { return this.maxBlockMs; }
    public void setMaxBlockMs(int maxBlockMs) { this.maxBlockMs = maxBlockMs; }

    /** Builds the KafkaProducer once Log4j has injected all properties. */
    @Override
    public void activateOptions() {
        Properties props = new Properties();
        if (this.brokerList != null) {
            props.put("bootstrap.servers", this.brokerList);
        }
        if (props.isEmpty()) {
            throw new ConfigException("The bootstrap servers property should be specified");
        }
        if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        }
        if (this.compressionType != null) {
            props.put("compression.type", this.compressionType);
        }
        props.put("acks", Integer.toString(this.requiredNumAcks));
        props.put("retries", this.retries);
        props.put("delivery.timeout.ms", this.deliveryTimeoutMs);
        if (this.securityProtocol != null) {
            props.put("security.protocol", this.securityProtocol);
        }
        if (this.securityProtocol != null && this.securityProtocol.contains("SSL")
                && this.sslTruststoreLocation != null && this.sslTruststorePassword != null) {
            props.put("ssl.truststore.location", this.sslTruststoreLocation);
            props.put("ssl.truststore.password", this.sslTruststorePassword);
            if (this.sslKeystoreType != null && this.sslKeystoreLocation != null
                    && this.sslKeystorePassword != null) {
                props.put("ssl.keystore.type", this.sslKeystoreType);
                props.put("ssl.keystore.location", this.sslKeystoreLocation);
                props.put("ssl.keystore.password", this.sslKeystorePassword);
            }
        }
        if (this.securityProtocol != null && this.securityProtocol.contains("SASL")
                && this.saslKerberosServiceName != null && this.clientJaasConfPath != null) {
            props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);
            System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);
        }
        if (this.kerb5ConfPath != null) {
            System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);
        }
        if (this.saslMechanism != null) {
            props.put("sasl.mechanism", this.saslMechanism);
        }
        if (this.clientJaasConf != null) {
            props.put("sasl.jaas.config", this.clientJaasConf);
        }
        if (this.maxBlockMs != null) {
            props.put("max.block.ms", this.maxBlockMs);
        }
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // The producer batches records bound for the same partition into
        // requests of up to batch.size bytes; if a batch does not fill up,
        // linger.ms caps how long the producer waits before sending anyway.
        // With a very small batch.size, each message may go out individually.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        this.producer = this.getKafkaProducer(props);
        LogLog.debug("Kafka producer connected to " + this.brokerList);
        LogLog.debug("Logging for topic: " + this.topic);
    }

    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
        return new KafkaProducer<>(props);
    }

    /** Formats the event with the configured layout and sends it to Kafka. */
    @Override
    protected void append(LoggingEvent event) {
        String message = this.subAppend(event);
        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
        Future<RecordMetadata> response = this.producer.send(
                new ProducerRecord<>(this.topic, message.getBytes(StandardCharsets.UTF_8)));
        if (this.syncSend) {
            try {
                response.get();
            } catch (ExecutionException | InterruptedException e) {
                if (!this.ignoreExceptions) {
                    throw new RuntimeException(e);
                }
                LogLog.debug("Exception while getting response", e);
            }
        }
    }

    private String subAppend(LoggingEvent event) {
        return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.producer.close();
        }
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }
}
```
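For quick experiments, the appender can also be wired up in code instead of through `log4j.properties`. A minimal sketch, assuming a local broker at `localhost:9092` and an existing topic named `test` (both are assumptions, not values from this article's setup):

```java
package wiki.hadoop.log4j.kafka;

import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class ProgrammaticWiringDemo {
    public static void main(String[] args) {
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setBrokerList("localhost:9092"); // assumption: local broker
        appender.setTopic("test");                // assumption: topic already exists
        appender.setSyncSend(true);               // block until each send completes
        appender.setLayout(new PatternLayout("%m%n"));
        appender.activateOptions();               // builds the KafkaProducer

        Logger logger = Logger.getLogger(ProgrammaticWiringDemo.class);
        logger.addAppender(appender);
        logger.info("hello from a programmatically configured appender");

        appender.close();                         // release the producer
    }
}
```

With `syncSend` enabled, the send fails fast if the broker is unreachable, which makes this wiring convenient for smoke tests.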
## log4j.properties
```properties
log4j.rootLogger=debug,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%l ] - [ %p ] %m%n

# -------- Kafka configuration --------
# Attach the kafka appender to these logger hierarchies (packages).
# Be careful when routing Kafka's own client loggers here: the producer the
# appender creates also logs through Log4j, which can recurse.
log4j.logger.com=info,kafka
log4j.logger.wiki=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=WARN,kafka

# Packages whose logs should be shipped (note: the appender class above
# defines no matching setter for "include", so Log4j warns and ignores it)
log4j.appender.kafka.include=com.Test;wiki.hadoop

# Appender implementation class
log4j.appender.kafka=wiki.hadoop.log4j.kafka.KafkaLog4jAppender
# Kafka topic the log events are sent to
log4j.appender.kafka.topic=test
# Kafka broker address list
log4j.appender.kafka.brokerList=172.16.24.221:9092
log4j.appender.kafka.compressionType=none
# Minimum log level this appender accepts
log4j.appender.kafka.Threshold=INFO
# Producer acks; -1 means a send only succeeds once all in-sync replicas have the record
log4j.appender.kafka.requiredNumAcks=-1
# Send mode: synchronous (true) or asynchronous (false)
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%m%n
```
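Because the `kafka` appender is bound to the `com` and `wiki` logger hierarchies, only loggers whose names fall under those prefixes are shipped to Kafka; everything else reaches stdout only through the root logger. A small illustration with hypothetical logger names:

```java
import org.apache.log4j.Logger;

public class HierarchyDemo {
    public static void main(String[] args) {
        // Shipped to Kafka (and stdout): the name falls under the "wiki"
        // hierarchy, which the properties above bind to the kafka appender.
        Logger.getLogger("wiki.hadoop.demo.Service").info("goes to Kafka");

        // stdout only: "org.example" matches neither "com" nor "wiki",
        // so this logger inherits just the root logger's stdout appender.
        Logger.getLogger("org.example.Service").info("console only");
    }
}
```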
## Sending a Test Message
A JUnit test that writes a single INFO message; the short sleep gives the asynchronous producer time to deliver it before the JVM exits.
```java
package wiki.hadoop.log4j.kafka;

import org.apache.log4j.Logger;
import org.junit.Test;

import java.util.Date;

public class KafkaLog4jAppenderTest {

    private final Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);

    @Test
    public void log2Kafka() throws InterruptedException {
        logger.info("Test message " + new Date());
        // Give the asynchronous producer time to flush before the JVM exits.
        Thread.sleep(5000);
    }
}
```
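To confirm the message actually landed in the topic, you can read it back with a small consumer. A sketch, assuming the same broker as in `log4j.properties`; the group id is arbitrary, and `kafka-console-consumer.sh --bootstrap-server 172.16.24.221:9092 --topic test --from-beginning` works just as well:

```java
package wiki.hadoop.log4j.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class LogTopicChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.24.221:9092"); // same broker as log4j.properties
        props.put("group.id", "log4j-check");                  // assumption: any unused group id
        props.put("auto.offset.reset", "earliest");            // also read messages produced earlier
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // Poll once; the appender wrote UTF-8 bytes, so StringDeserializer applies.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + " value=" + record.value());
            }
        }
    }
}
```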
# Source Code Download
https://download.csdn.net/download/zhangshenghang/88478678