文章目录
从Kafka架构来看,理论上仍有消息丢失的可能性,但实际发生的概率极低,只有在所有副本的机器都宕机时才会丢失。
当所有代理(broker)都确认消息时,这并不意味着消息已经存储在磁盘上,而是仍在代理的内存中。这可能成为一个问题,特别是在所有代理服务器同时down的情况下。如果所有代理都位于同一可用区,这种情况更有可能发生,这是最糟糕的设计实践之一。
因此,尽管Kafka的设计大大降低了消息丢失的概率,但完全避免消息丢失是不可能的。
Producer
使用回调通知,不要使用producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法。回调可以帮助你在消息发送失败时捕获异常并进行处理。
producer.send(new ProducerRecord<>("topic", "key", "value"), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { // 处理异常 } } });
设置acks = all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
一旦领导者收到来自同步副本的确认,告诉他们已经复制了消息,它就会将确认发送回生产者。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。
props.put("acks", "all");
设置重试机制,设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0的Producer能够自动重试消息发送,避免消息丢失。
启用幂等性,设置
enable.idempotence
为true
来启用幂等生产者。这可以防止重试期间出现重复,确保从生产者的角度来看,消息只传送到代理一次。设置交付超时时间,设置
delivery.timeout.ms
用于设置所有重试的超时时间。因此,如果retries
是一个很大的数字,但超时很短,消息传递无论如何都会失败。props.put("delivery.timeout.ms", "120000"); // 2分钟
Broker
禁用非干净的领导者选举,设置
unclean.leader.election.enable = false
。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的丢失。故一般都要将该参数设置成false,即不允许这种情况的发生。防止落后太多的 Broker 选举为 Leader,从而避免消息丢失。
配置副本数量,设置
replication.factor >= 3
。这也是Broker端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。副本replica总数,包括Leader和Follower设置最小同步副本数,设置
min.insync.replicas > 1
。这依然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1。min.insync.replicas只有在acks=-1(all)时才生效确保副本数量大于最小同步副本数,确保
replication.factor > min.insync.replicas
。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1。
Consumer
- 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。
维持先消费消息(阅读),再更新位移(书签)的顺序
- 使用
commitSync
;为了更好的性能,使用commitAsync
,如果处理和提交之间发生故障,可能会出现重复。
流程
当设置 acks=all
并且生产者调用 send()
方法时,它会等待领导者的确认。一旦领导者收到来自同步副本的确认,并告知它们已经复制了消息,领导者将把确认发送回给生产者。
属性 min.insync.replicas
指定了在领导者向客户端发送确认之前必须确认的最小副本数。如果未达到这个最小值,生产者将抛出异常(例如 NotEnoughReplicas
或 NotEnoughReplicasAfterAppend
)。
在典型场景中,如果创建一个复制因子为 3 的主题,并将 min.insync.replicas
设置为 2,并使用 acks=all
,这将确保只有在大多数副本都写入成功时生产者才会确认消息提交。
当生产者在设定的 request.timeout.ms
时间内未收到确认时,它将会重新发送消息,间隔时间由 retry.backoff.ms
定义。生产者将根据设定的 retries
次数重试发送失败的消息。
另外,属性 delivery.timeout.ms
定义了调用 send()
后返回成功或失败的时间上限。
在达到最大重试次数后,如果消息仍未能成功记录,或出现其他异常情况,可以考虑将消息保存到其他数据存储。这可以通过在 send()
方法中设置回调来处理,在此类错误情况下会触发回调函数。
这些属性和机制确保了在不同网络和节点故障情况下,Kafka生产者能够尽可能地保证消息的可靠性和最终一致性。