SparkStreaming参数介绍
# SparkStreaming参数介绍
- spark.streaming.concurrentJobs :增加job并行度
可以通过集中方法为streaming job配置此参数。
- spark-default中修改
全局性修改,所有的streaming job都会受到影响。
- 提交streaming job是 –conf 参数添加(推荐)
在提交job时,可以使用–conf 参数为该job添加个性化的配置。例如:
bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5
设置该streaming job的job executor 线程池大小为5,在资源充足的情况下可以同时执行5个batch job。
- 代码设置
在代码中通过sparkConf设置:
sparkConf.set("spark.streaming.concurrentJobs", "5");
或者
System.setProperty("spark.streaming.concurrentJobs", "5");
2
3
4
5
6
7
8
9
10
11
12
- spark.streaming.kafka.maxRatePerPartition:每秒每一个topic的每一个分区获取的最大消息数。
合理的批处理时间(batchDuration)
几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。 如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。 一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。 在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,直达SparkStreaming刚刚能及时处理完上一个批处理的数据,这样就是目前情况的最优值。
合理的Kafka拉取量(maxRatePerPartition重要) spark.streaming.kafka.maxRatePerPartition参数配置指定了每秒每一个topic的每一个分区获取的最大消息数。
对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的。这个参数默认是没有上限的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time