Fixing Duplicate Consumption by a Kafka Consumer

Problem Description

While using Kafka as the message queue in a data-processing pipeline, the consumer started consuming the same records repeatedly. Originally the consumer pulled 100 records per poll, processed them, slept for 10 seconds, and then repeated the cycle.

After I raised the batch size to 1000 records per poll, the duplicate consumption began. Looking at the consumer's offsets, the records were in fact being processed, but the offset commit never succeeded, so the next poll fetched the same data again.

Troubleshooting

Was auto-commit missing?

Checking the local Spring configuration showed that the consumer's enable-auto-commit was set correctly. The relevant configuration:

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      max-poll-records: 1000
      max_poll_interval: 5000

As for auto-commit-interval, I had set it to 1000, i.e. a commit once per second. Since consumption runs through a @KafkaListener-annotated listener, the process never exits, so there should be plenty of time for the commits to go through.
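For context, the consumer here is just a Spring Kafka listener. A minimal sketch of that shape (assuming batch mode is enabled, e.g. via spring.kafka.listener.type: batch; the topic name, group id, and processing step are placeholders, not the project's real code):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PostRemoveListener {

    // The listener container polls up to max-poll-records records and hands
    // them over as one batch; with enable-auto-commit=true the client commits
    // offsets roughly every auto-commit-interval milliseconds during poll().
    @KafkaListener(topics = "post_remove", groupId = "post_remove_consumer")
    public void onMessages(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            handle(record.value()); // placeholder for the real processing logic
        }
    }

    private void handle(String value) {
        // ... business logic ...
    }
}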

Trying a lower max-poll-records

Since the error only appeared after I raised max-poll-records from 100 to 1000, the suspicion was that fetching too many records in a single poll ultimately made the commit fail. After setting max-poll-records back to 100, consumption and offset commits worked normally again, so the problem clearly lay there.
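To put rough, purely illustrative numbers on it (I never measured the actual per-record cost): if each record takes on the order of 10 ms to process, a batch of 100 records finishes in about 1 second, while a batch of 1000 records keeps the loop busy for roughly 10 seconds between two poll() calls, which is far easier to push past a tight max.poll.interval.ms.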

Checking the logs for the root cause

The project's error log (error.log) showed the following:

2020-07-24 14:27:15.161 [post_remove_consumer-5-C-1] ERROR org.springframework.kafka.listener.BatchLoggingErrorHandler 254 - Error while processing:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1968)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1791)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:940)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:901)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

The log shows that the batch error handler org.springframework.kafka.listener.BatchLoggingErrorHandler reported an org.apache.kafka.clients.consumer.CommitFailedException, i.e. the offset commit failed. The exception message explains the cause in detail:

// Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
The consumer group has already revoked the partition and reassigned it to another member, so the current consumer's commit can no longer succeed.
// This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
In other words, the time between two consecutive poll() calls exceeded the configured max.poll.interval.ms, i.e. a single poll cycle spent too much time processing messages.

It also suggests concrete ways to fix it:

You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
We can either increase max.poll.interval.ms or reduce max-poll-records so that one poll() cycle finishes sooner, which resolves the error above.

So after increasing max.poll.interval.ms to a suitably larger value, consumption proceeds normally again.
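A minimal sketch of what that change can look like when the consumer factory is defined by hand (the bootstrap servers, group id, and the 900000 ms value are illustrative placeholders, not the project's real settings; pick a value comfortably above your worst-case batch processing time):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Raise max.poll.interval.ms so that one full batch fits inside it.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "post_remove_consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 900000); // 15 minutes, illustrative
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

With Spring Boot auto-configuration, the same setting is normally placed in application.yml under spring.kafka.consumer.properties, using the raw key max.poll.interval.ms.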

Summary and Reflections

In Kafka, a consumer group usually contains several consumer instances. Each of them calls poll() to fetch records from its assigned partitions, processes them, and commits the offsets to complete the cycle. Because max.poll.interval.ms is configured, if a consumer does not get back to poll() within that interval, the broker side treats it as failed, revokes its partitions, and hands them to other live consumers in the group.
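To make that contract concrete, here is a stripped-down loop against the plain consumer API (a sketch only, not the project's Spring setup; the servers, topic, and group id are placeholders, and it uses a manual commit just to make the failure point visible):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoopDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // the broker-side limit discussed above

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                // Everything between this poll() and the next one counts against
                // max.poll.interval.ms; take longer and the group rebalances,
                // after which the commit below fails with CommitFailedException.
                for (ConsumerRecord<String, String> record : records) {
                    // process record.value() ...
                }
                consumer.commitSync();
            }
        }
    }
}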

Seeing all this, the picture should be reasonably clear. One point worth stressing: when you hit a problem, dig into the logs and read every error message carefully. There is nothing mystical about code; every bug has a cause!
