问题描述
在使用Kafka作为消息队列进行数据处理流程的时候,出现了消费者重复消费的问题,最开始消费者每次拉取100条record,然后处理完成之后sleep 10秒继续工作,如此往复;
当我将每次拉取的条数设置成1000条的时候,发现消费者开始进行重复消费了,查看消费者的offset,发现虽然真正执行了消费,但是消费者的offset没有提交成功,下一次拉取的还是原来的数据;
排查过程
是否没有自动提交
通过查看本地的spring配置,consumer的enable-auto-commit
正确设置,具体配置如下:
1 | spring: |
对于auto-commit-interval
这个参数,我默认配置为了1000
,即1秒提交一次,由于我采用@KafkaListener
注解进行消费者的监听消费,所以进程并不会退出,那么也就有足够时间进行commit的提交;
尝试降低max-poll-records
由于我是在将max-poll-records
从100提高到1000之后报出的错误,所以就应该是我一次poll获取到太多的records导致最终提交失败,将max-poll-records
重新设置为100,有可以正常消费提交了,看来问题就出在这里;
查看日志,探究原因
查看工程的错误日志error.log
,可以看到如下错误:
1 | 2020-07-24 14:27:15.161 [post_remove_consumer-5-C-1] ERROR org.springframework.kafka.listener.BatchLoggingErrorHandler 254 - Error while processing: |
由上述日志可以看到,org.springframework.kafka.listener.BatchLoggingErrorHandler
这个批量处理程序报出错误,错误名为org.apache.kafka.clients.consumer.CommitFailedException
,也就是commit提交失败了,详细的原因解释如下:
1 | // Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. |
并且也给出了具体的解决方式:
1 | You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. |
所以我们将max.poll.interval.ms
这个参数适当的调大,就可以正常地进行消费了!
总结与思考
在kafka
中,往往我们一个消费者组中包含多个消费者节点,他们通过poll()
操作来获取要消费的分区partition
,消费完成提交commit
完成流程,但是由于配置了max.poll.interval.ms
这个参数,也就是如果一次poll()
操作没有在max.poll.interval.ms
时间内完成,kafka broker
可以认为这个消费者出问题了,就会将分区回收,并交由其他活跃的消费者进行处理!
看到这里,大概有点清楚了,必须说明的是,遇到问题一定多查log,多看各种报错信息,代码没有玄学,bug必然有其原因!