Sending Spark Log4j Logs to Kafka
[toc]
# Custom KafkaAppender
Note: if you want to use the official appender, you can reference it directly; use the version matching the Kafka version you are running:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>2.4.1</version>
</dependency>
Configure the following dependencies:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.28</version>
<scope>compile</scope>
</dependency>
Create the custom KafkaLog4jAppender.java with the following content. On top of the standard appender, it implements filtering by package name.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class KafkaLog4jAppender extends AppenderSkeleton {
/** Include rules (exact logger names and package prefixes) */
private Set<String> includeSet = new HashSet<>();
private Set<String> includeMatchSet = new HashSet<>();
/** Exclude rules (exact logger names and package prefixes) */
private Set<String> excludeSet = new HashSet<>();
private Set<String> excludeMatchSet = new HashSet<>();
private String brokerList;
private String topic;
private String compressionType;
private String securityProtocol;
private String sslTruststoreLocation;
private String sslTruststorePassword;
private String sslKeystoreType;
private String sslKeystoreLocation;
private String sslKeystorePassword;
private String saslKerberosServiceName;
private String saslMechanism;
private String clientJaasConfPath;
private String clientJaasConf;
private String kerb5ConfPath;
private Integer maxBlockMs;
private int retries = 2147483647;
private int requiredNumAcks = 1;
private int deliveryTimeoutMs = 120000;
private boolean ignoreExceptions = true;
private boolean syncSend;
private Producer<byte[], byte[]> producer;
private String includes;
private String excludes;
public String getIncludes() {
return includes;
}
public void setIncludes(String includes) {
this.includes = includes;
}
public String getExcludes() {
return excludes;
}
public void setExcludes(String excludes) {
this.excludes = excludes;
}
public KafkaLog4jAppender() {}
public Producer<byte[], byte[]> getProducer() {
return this.producer;
}
public String getBrokerList() {
return this.brokerList;
}
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public int getRequiredNumAcks() {
return this.requiredNumAcks;
}
public void setRequiredNumAcks(int requiredNumAcks) {
this.requiredNumAcks = requiredNumAcks;
}
public int getRetries() {
return this.retries;
}
public void setRetries(int retries) {
this.retries = retries;
}
public int getDeliveryTimeoutMs() {
return this.deliveryTimeoutMs;
}
public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
this.deliveryTimeoutMs = deliveryTimeoutMs;
}
public String getCompressionType() {
return this.compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public String getTopic() {
return this.topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public boolean getIgnoreExceptions() {
return this.ignoreExceptions;
}
public void setIgnoreExceptions(boolean ignoreExceptions) {
this.ignoreExceptions = ignoreExceptions;
}
public boolean getSyncSend() {
return this.syncSend;
}
public void setSyncSend(boolean syncSend) {
this.syncSend = syncSend;
}
public String getSslTruststorePassword() {
return this.sslTruststorePassword;
}
public String getSslTruststoreLocation() {
return this.sslTruststoreLocation;
}
public String getSecurityProtocol() {
return this.securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public void setSslTruststoreLocation(String sslTruststoreLocation) {
this.sslTruststoreLocation = sslTruststoreLocation;
}
public void setSslTruststorePassword(String sslTruststorePassword) {
this.sslTruststorePassword = sslTruststorePassword;
}
public void setSslKeystorePassword(String sslKeystorePassword) {
this.sslKeystorePassword = sslKeystorePassword;
}
public void setSslKeystoreType(String sslKeystoreType) {
this.sslKeystoreType = sslKeystoreType;
}
public void setSslKeystoreLocation(String sslKeystoreLocation) {
this.sslKeystoreLocation = sslKeystoreLocation;
}
public void setSaslKerberosServiceName(String saslKerberosServiceName) {
this.saslKerberosServiceName = saslKerberosServiceName;
}
public void setClientJaasConfPath(String clientJaasConfPath) {
this.clientJaasConfPath = clientJaasConfPath;
}
public void setKerb5ConfPath(String kerb5ConfPath) {
this.kerb5ConfPath = kerb5ConfPath;
}
public String getSslKeystoreLocation() {
return this.sslKeystoreLocation;
}
public String getSslKeystoreType() {
return this.sslKeystoreType;
}
public String getSslKeystorePassword() {
return this.sslKeystorePassword;
}
public String getSaslKerberosServiceName() {
return this.saslKerberosServiceName;
}
public String getClientJaasConfPath() {
return this.clientJaasConfPath;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslMechanism() {
return this.saslMechanism;
}
public void setClientJaasConf(String clientJaasConf) {
this.clientJaasConf = clientJaasConf;
}
public String getClientJaasConf() {
return this.clientJaasConf;
}
public String getKerb5ConfPath() {
return this.kerb5ConfPath;
}
public int getMaxBlockMs() {
return this.maxBlockMs;
}
public void setMaxBlockMs(int maxBlockMs) {
this.maxBlockMs = maxBlockMs;
}
@Override
public void activateOptions() {
// Load the include/exclude filter rules
setFilterRules(includes, includeMatchSet, includeSet);
setFilterRules(excludes, excludeMatchSet, excludeSet);
Properties props = new Properties();
if (this.brokerList != null) {
props.put("bootstrap.servers", this.brokerList);
}
if (props.isEmpty()) {
throw new ConfigException("The bootstrap servers property should be specified");
} else if (this.topic == null) {
throw new ConfigException("Topic must be specified by the Kafka log4j appender");
} else {
if (this.compressionType != null) {
props.put("compression.type", this.compressionType);
}
props.put("acks", Integer.toString(this.requiredNumAcks));
props.put("retries", this.retries);
props.put("delivery.timeout.ms", this.deliveryTimeoutMs);
if (this.securityProtocol != null) {
props.put("security.protocol", this.securityProtocol);
}
if (this.securityProtocol != null
&& this.securityProtocol.contains("SSL")
&& this.sslTruststoreLocation != null
&& this.sslTruststorePassword != null) {
props.put("ssl.truststore.location", this.sslTruststoreLocation);
props.put("ssl.truststore.password", this.sslTruststorePassword);
if (this.sslKeystoreType != null
&& this.sslKeystoreLocation != null
&& this.sslKeystorePassword != null) {
props.put("ssl.keystore.type", this.sslKeystoreType);
props.put("ssl.keystore.location", this.sslKeystoreLocation);
props.put("ssl.keystore.password", this.sslKeystorePassword);
}
}
if (this.securityProtocol != null
&& this.securityProtocol.contains("SASL")
&& this.saslKerberosServiceName != null
&& this.clientJaasConfPath != null) {
props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);
}
if (this.kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);
}
if (this.saslMechanism != null) {
props.put("sasl.mechanism", this.saslMechanism);
}
if (this.clientJaasConf != null) {
props.put("sasl.jaas.config", this.clientJaasConf);
}
if (this.maxBlockMs != null) {
props.put("max.block.ms", this.maxBlockMs);
}
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
System.out.println("Properties:" + props);
this.producer = this.getKafkaProducer(props);
LogLog.debug("Kafka producer connected to " + this.brokerList);
LogLog.debug("Logging for topic: " + this.topic);
}
}
/**
 * Parse a comma-separated rule string into exact-match and prefix-match sets;
 * entries ending with ".*" are treated as package prefixes.
 *
 * @name setFilterRules
 * @date 2023/3/2 13:57
 * @return void
 * @param excludes the comma-separated rule string (used for both includes and excludes)
 * @param excludeMatchSet receives the prefix-match rules
 * @param excludeSet receives the exact-match rules
 * @author Jast
 */
private void setFilterRules(
String excludes, Set<String> excludeMatchSet, Set<String> excludeSet) {
if (excludes != null) {
for (String exclude : excludes.split(",")) {
if (exclude.length() > 0) {
if (exclude.endsWith(".*")) {
excludeMatchSet.add(exclude.replace(".*", ""));
} else {
excludeSet.add(exclude);
}
}
}
}
}
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
return new KafkaProducer<>(props);
}
@Override
protected void append(LoggingEvent event) {
if (filterPackageName(event)) {
return;
}
String message = this.subAppend(event);
LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
Future<RecordMetadata> response =
this.producer.send(
new ProducerRecord<>(this.topic, message.getBytes(StandardCharsets.UTF_8)));
if (this.syncSend) {
try {
response.get();
} catch (ExecutionException | InterruptedException var5) {
if (!this.ignoreExceptions) {
throw new RuntimeException(var5);
}
LogLog.debug("Exception while getting response", var5);
}
}
}
private String subAppend(LoggingEvent event) {
return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);
}
@Override
public void close() {
if (!this.closed) {
this.closed = true;
this.producer.close();
}
}
@Override
public boolean requiresLayout() {
return true;
}
/**
 * Filter by logger/package name; if this returns true, the event is not sent to Kafka.
 *
 * @name filterPackageName
 * @date 2023/2/28 16:07
 * @return boolean
 * @param event the logging event to check
 * @author Jast
 */
private boolean filterPackageName(LoggingEvent event) {
boolean flag = true;
if (includeSet.size() == 0
&& includeMatchSet.size() == 0
&& excludeSet.size() == 0
&& excludeMatchSet.size() == 0) {
return false;
}
if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
flag = false;
}
/** Class/name of the logger that produced the event */
String loggerName = event.getLoggerName();
for (String include : includeSet) {
if (loggerName.equals(include)) {
flag = false;
}
}
for (String include : includeMatchSet) {
if (loggerName.startsWith(include)) {
flag = false;
}
}
for (String exclude : excludeMatchSet) {
if (loggerName.startsWith(exclude)) {
flag = true;
}
}
for (String exclude : excludeSet) {
if (loggerName.equals(exclude)) {
flag = true;
}
}
return flag;
}
}
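The includes and excludes properties accept comma-separated logger names; an entry ending in ".*" is treated as a package prefix. The minimal sketch below is not part of the original setup: it configures the appender programmatically (broker address, topic, and package names are placeholders) just to show how the filter behaves.

```java
// Hypothetical usage sketch: configures the custom appender programmatically instead of
// via log4j.properties. Assumes KafkaLog4jAppender (defined above) is on the classpath
// in the same package, and that a Kafka broker is reachable at the placeholder address.
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class KafkaAppenderDemo {
    public static void main(String[] args) {
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setBrokerList("localhost:9092");    // placeholder broker
        appender.setTopic("test");                   // placeholder topic
        // Exact logger names and ".*" package prefixes, comma separated
        appender.setIncludes("com.example.MyJob,com.example.util.*");
        appender.setExcludes("com.example.util.NoisyHelper");
        appender.setLayout(new PatternLayout("%d %p %c - %m%n"));
        appender.activateOptions();                  // parses the rules and builds the producer

        Logger kept = Logger.getLogger("com.example.util.Print");
        kept.addAppender(appender);
        kept.info("matches the com.example.util.* include rule, so it is sent to Kafka");

        Logger dropped = Logger.getLogger("org.other.Something");
        dropped.addAppender(appender);
        dropped.info("matches no include rule, so filterPackageName() discards it");
    }
}
```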
# Modifying the log4j.properties Configuration
Modify Spark's own configuration file, located at /opt/spark-client/conf. The configuration directory may differ on other clusters.
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# Everything from here down is newly added content
# log4j.logger.com means that INFO logs from classes whose package starts with "com" are sent to Kafka. Multiple such entries can be configured. Note that the package name must be complete: for "com" you cannot write "co".
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# The official KafkaAppender (alternative)
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx
# Adding Parameters to the Launch Command
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
Notes:
- kafka-appender-1.0.0.jar is the jar built from the custom KafkaLog4jAppender class we just wrote.
- The slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar versions are used to fix a NullPointerException caused by conflicting logging versions, described in the exception section below.
- slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar must also be placed in the lib directory and submitted via --jars.
# Querying the Sent Data in Kafka After Startup
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}
There is a problem here: the @timestamp produced by net.logstash.log4j.JSONEventLayoutV1 uses the wrong time zone. We need to fix the time zone, so below we implement a custom Layout.
# Time Zone Issue: Fixed with a Custom JSONLayout
A JSONLayout has a noticeable performance cost; with large data volumes it is better not to use one, and instead collect the relevant fields yourself and then log the result, for example as sketched below.
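As a rough illustration of that alternative (a hypothetical sketch, not part of the original article), the application can assemble the fields itself, for example with fastjson, which this article adds as a dependency later, and log the resulting string through a plain appender:

```java
// Hypothetical sketch of the "assemble the fields yourself" alternative: the application
// builds the JSON string and logs it as a plain message, so no JSON layout is needed.
// Field names and values are examples only.
import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;

public class PlainJsonLogging {
    private static final Logger LOG = Logger.getLogger(PlainJsonLogging.class);

    public static void main(String[] args) {
        JSONObject event = new JSONObject();
        event.put("app_name", "xxx");                         // example user field
        event.put("message", "batch finished");               // example message
        event.put("timestamp", System.currentTimeMillis());   // raw epoch millis
        // The Kafka appender just forwards this string; no per-event layout work is done.
        LOG.info(event.toJSONString());
    }
}
```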
# Custom JSONLayout.java
In the project where we just defined the custom KafkaAppender, create a JSONLayout.java class with the following content:
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
public class JSONLayout extends Layout {
private boolean locationInfo = false;
private String customUserFields;
private boolean ignoreThrowable = false;
private boolean activeIgnoreThrowable = ignoreThrowable;
private String hostname = InetAddress.getLocalHost().getHostName();
private String threadName;
private long timestamp;
private String ndc;
private Map mdc;
private LocationInfo info;
private HashMap<String, Object> exceptionInformation;
private static Integer version = 1;
private JSONObject logstashEvent;
public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");
public static final FastDateFormat ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS =
FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);
public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";
public static String dateFormat(long timestamp) {
return ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS.format(timestamp);
}
/**
* For backwards compatibility, the default is to generate location information in the log
* messages.
*/
public JSONLayout() throws UnknownHostException {
this(true);
}
/**
* Creates a layout that optionally inserts location information into log messages.
*
* @param locationInfo whether or not to include location information in the log messages.
*/
public JSONLayout(boolean locationInfo) throws UnknownHostException {
this.locationInfo = locationInfo;
}
@Override
public String format(LoggingEvent loggingEvent) {
threadName = loggingEvent.getThreadName();
timestamp = loggingEvent.getTimeStamp();
exceptionInformation = new HashMap<String, Object>();
mdc = loggingEvent.getProperties();
ndc = loggingEvent.getNDC();
logstashEvent = new JSONObject();
String whoami = this.getClass().getSimpleName();
/**
* All v1 of the event format requires is "@timestamp" and "@version" Every other field is
* arbitrary
*/
logstashEvent.put("@version", version);
logstashEvent.put("@timestamp", dateFormat(timestamp));
/** Extract and add fields from log4j config, if defined */
if (getUserFields() != null) {
String userFlds = getUserFields();
LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);
addUserFields(userFlds);
}
/**
* Extract fields from system properties, if defined Note that CLI props will override
* conflicts with log4j config
*/
if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {
if (getUserFields() != null) {
LogLog.warn(
"["
+ whoami
+ "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");
}
String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);
LogLog.debug(
"[" + whoami + "] Got user data from system property: " + userFieldsProperty);
addUserFields(userFieldsProperty);
}
/** Now we start injecting our own stuff. */
logstashEvent.put("source_host", hostname);
logstashEvent.put("message", loggingEvent.getRenderedMessage());
if (loggingEvent.getThrowableInformation() != null) {
final ThrowableInformation throwableInformation =
loggingEvent.getThrowableInformation();
if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {
exceptionInformation.put(
"exception_class",
throwableInformation.getThrowable().getClass().getCanonicalName());
}
if (throwableInformation.getThrowable().getMessage() != null) {
exceptionInformation.put(
"exception_message", throwableInformation.getThrowable().getMessage());
}
if (throwableInformation.getThrowableStrRep() != null) {
String stackTrace =
StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");
exceptionInformation.put("stacktrace", stackTrace);
}
addEventData("exception", exceptionInformation);
}
if (locationInfo) {
info = loggingEvent.getLocationInformation();
addEventData("file", info.getFileName());
addEventData("line_number", info.getLineNumber());
addEventData("class", info.getClassName());
addEventData("method", info.getMethodName());
}
addEventData("logger_name", loggingEvent.getLoggerName());
addEventData("mdc", mdc);
addEventData("ndc", ndc);
addEventData("level", loggingEvent.getLevel().toString());
addEventData("thread_name", threadName);
return logstashEvent.toString() + "\n";
}
@Override
public boolean ignoresThrowable() {
return ignoreThrowable;
}
/**
* Query whether log messages include location information.
*
* @return true if location information is included in log messages, false otherwise.
*/
public boolean getLocationInfo() {
return locationInfo;
}
/**
* Set whether log messages should include location information.
*
* @param locationInfo true if location information should be included, false otherwise.
*/
public void setLocationInfo(boolean locationInfo) {
this.locationInfo = locationInfo;
}
public String getUserFields() {
return customUserFields;
}
public void setUserFields(String userFields) {
this.customUserFields = userFields;
}
@Override
public void activateOptions() {
activeIgnoreThrowable = ignoreThrowable;
}
private void addUserFields(String data) {
if (null != data) {
String[] pairs = data.split(",");
for (String pair : pairs) {
String[] userField = pair.split(":", 2);
// Guard against entries without a ":" so a malformed UserFields value cannot
// throw an ArrayIndexOutOfBoundsException
if (userField.length == 2 && userField[0] != null) {
String key = userField[0];
String val = userField[1];
addEventData(key, val);
}
}
}
}
private void addEventData(String keyname, Object keyval) {
if (null != keyval) {
logstashEvent.put(keyname, keyval);
}
}
}
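The core of the time-zone fix is the FastDateFormat instance bound to GMT+8. A small standalone sketch (assuming commons-lang 2.6 from the dependencies below) contrasts the UTC ISO-8601-style timestamp of JSONEventLayoutV1 with the GMT+8 format used above:

```java
// Small demo contrasting the two timestamp formats; not part of the original article.
import org.apache.commons.lang.time.FastDateFormat;
import java.util.TimeZone;

public class TimestampDemo {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        FastDateFormat utcIso = FastDateFormat.getInstance(
                "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", TimeZone.getTimeZone("UTC"));
        FastDateFormat gmt8 = FastDateFormat.getInstance(
                "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT+8"));
        System.out.println("UTC   (JSONEventLayoutV1 style): " + utcIso.format(now));
        System.out.println("GMT+8 (custom JSONLayout):       " + gmt8.format(now));
    }
}
```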
Related dependencies:
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.21</version>
<scope>provided</scope>
</dependency>
Package the project, upload it to the server, and prepare to run. In the launch command, add kafka-appender-1.0.0.jar and the related dependencies:
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
After starting, check the data again: the @timestamp field now shows the correct time.
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}
# One-Step Setup
Before reading this section, please go through the steps explained above; otherwise some of it may not make sense.
To make this easier to apply, I packaged the classes and published them to the Maven Central repository, so they can be used directly via Maven. The steps are as follows:
- Add the Maven dependency:
<dependency>
<groupId>com.gitee.jastee</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>1.0.5</version>
</dependency>
- Use a Log logger in your code to print log messages.
- Modify the Spark configuration file log4j.properties:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# Everything from here down is newly added content
# log4j.logger.com means that INFO logs from classes whose package starts with "com" are sent to Kafka. Multiple such entries can be configured. Note that the package name must be complete: for "com" you cannot write "co".
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# The official KafkaAppender (alternative)
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka BrokerList
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx
- Add the parameters to the launch command.
Too many jars to specify with --conf? You can merge the three jars into a single jar: unpack them with jar xf and repackage them with jar cvfM. Because different environments may use different slf4j versions, I did not merge them here; in practice, merge them according to your own needs.
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
- Start the program.
# Possible Exceptions
# ClassNotFoundException: xxx.KafkaLog4jAppender
The program reports an exception at startup: the jar was clearly submitted with the launch command, yet the class cannot be found.
log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender
Cause:
When Spark first starts, the jars passed via --jars have not been loaded yet. Submit our custom kafka-appender-1.0.0.jar (the jar containing KafkaLog4jAppender.java) via spark.executor.extraClassPath and spark.driver.extraClassPath instead; if you use the official appender, submit the official jar the same way.
Solution:
Add the following to the launch script:
--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \
# Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerException
at org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)
at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
... 27 more
Cause:
A logging library version problem; the version currently in use is slf4j-log4j12-1.7.30.jar.
Solution:
Use slf4j version 1.8.0-beta2:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-beta2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-beta2</version>
</dependency>
Submit the jars via the spark.driver.extraClassPath and spark.executor.extraClassPath parameters:
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
# References
https://www.jianshu.com/p/cde2b4712859
https://blog.csdn.net/epitomizelu/article/details/123687998