刨根问底: Kafka 到底会不会丢数据?

2022年01月16日 阅读数:1
这篇文章主要向大家介绍刨根问底: Kafka 到底会不会丢数据?,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

你们好,我是 华仔, 又跟你们见面了。java

上一篇做为专题系列的第二篇,从演进的角度带你深度剖析了关于 Kafka 请求处理全流程以及超高并发的网络架构设计的实现细节,今天开启第三篇,咱们来聊聊 Kafka 生产环境你们都比较关心的问题。微信

那么 Kafka 到底会不会丢数据呢?若是丢数据,究竟该怎么解决呢?网络

只有掌握了这些, 咱们才能处理好 Kafka 生产级的一些故障,从而更稳定地服务业务。架构

认真读完这篇文章,我相信你会对Kafka 如何解决丢数据问题,有更加深入的理解。并发

这篇文章干货不少,但愿你能够耐心读完。异步

 

01 整体概述分布式


愈来愈多的互联网公司使用消息队列来支撑本身的核心业务。因为是核心业务,通常都会要求消息传递过程当中最大限度的作到不丢失,若是中间环节出现数据丢失,就会引来用户的投诉,年末绩效就要背锅了。ide

那么使用 Kafka 到底会不会丢数据呢?若是丢数据了该怎么解决呢?为了不相似状况发生,除了要作好补偿措施,咱们更应该在系统设计的时候充分考虑系统中的各类异常状况,从而设计出一个稳定可靠的消息系统。函数

你们都知道 Kafka 的整个架构很是简洁,是分布式的架构,主要由 Producer、Broker、Consumer 三部分组成,后面剖析丢失场景会从这三部分入手来剖析。高并发

 

02 消息传递语义剖析


在深度剖析消息丢失场景以前,咱们先来聊聊「消息传递语义」究竟是个什么玩意?

所谓的消息传递语义是 Kafka 提供的 Producer 和 Consumer 之间的消息传递过程当中消息传递的保证性。主要分为三种, 以下图所示:

 

1)首先当 Producer 向 Broker 发送数据后,会进行 commit,若是 commit 成功,因为 Replica 副本机制的存在,则意味着消息不会丢失,可是 Producer 发送数据给 Broker 后,遇到网络问题而形成通讯中断,那么 Producer 就没法准确判断该消息是否已经被提交(commit),这就可能形成 at least once 语义。

2)在 Kafka 0.11.0.0 以前, 若是 Producer 没有收到消息 commit 的响应结果,它只能从新发送消息,确保消息已经被正确的传输到 Broker,从新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本以后, Producer 支持幂等传递选项,保证从新发送不会致使消息在日志出现重复。为了实现这个, Broker 为 Producer 分配了一个ID,并经过每条消息的序列号进行去重。也支持了相似事务语义来保证将消息发送到多个 Topic 分区中,保证全部消息要么都写入成功,要么都失败,这个主要用在 Topic 之间的 exactly once 语义。

其中启用幂等传递的方法配置:enable.idempotence = true。

启用事务支持的方法配置:设置属性 transcational.id = "指定值"。

3)从 Consumer 角度来剖析, 咱们知道 Offset 是由 Consumer 本身来维护的, 若是 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会形成 at most once 语义(消息会丢,但不重复)。

4) 若是 Consumer 消费消息完成后, 再更新 Offset, 若是这时 Consumer crash 掉,那么新的 Consumer 接管后从新用这个 Offset 拉取消息, 这时就会形成 at least once 语义(消息不丢,但被屡次重复处理)。

总结:默认 Kafka 提供 「at least once」语义的消息传递,容许用户经过在处理消息以前保存 Offset 的方式提供 「at most once」 语义。若是咱们能够本身实现消费幂等,理想状况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,可是这样是很难作到的。

 

从 Kafka 总体架构图咱们能够得出有三次消息传递的过程:

1)Producer 端发送消息给 Kafka Broker 端。

2)Kafka Broker 将消息进行同步并持久化数据。

3)Consumer 端从 Kafka Broker 将消息拉取并进行消费。

在以上这三步中每一步均可能会出现丢失数据的状况, 那么 Kafka 到底在什么状况下才能保证消息不丢失呢?

经过上面三步,咱们能够得出:Kafka 只对 「已提交」的消息作「最大限度的持久化保证不丢失」。

怎么理解上面这句话呢?

1)首先是 「已提交」的消息:当 Kafka 中 N 个 Broker 成功的收到一条消息并写入到日志文件后,它们会告诉 Producer 端这条消息已成功提交了,那么这时该消息在 Kafka 中就变成 "已提交消息" 了。

这里的 N 个 Broker 咱们怎么理解呢?这主要取决于对 "已提交" 的定义, 这里能够选择只要一个 Broker 成功保存该消息就算已提交,也能够是全部 Broker 都成功保存该消息才算是已提交。

2)其次是 「最大限度的持久化保证不丢失」,也就是说 Kafka 并不能保证在任何状况下都能作到数据不丢失。即 Kafka 不丢失数据是有前提条件的。假如这时你的消息保存在 N 个 Broker 上,那么前提条件就是这 N 个 Broker 中至少有1个是存活的,就能够保证你的消息不丢失。

也就是说 Kafka 是能作到不丢失数据的, 只不过这些消息必须是 「已提交」的消息,且还要知足必定的条件才能够。

了解了 Kafka 消息传递语义以及什么状况下能够保证不丢失数据,下面咱们来详细剖析每一个环节为何会丢数据,以及如何最大限度的避免丢失数据。

 

03 消息丢失场景剖析


Producer 端丢失场景剖析


在剖析 Producer 端数据丢失以前,咱们先来了解下 Producer 端发送消息的流程,对于不了解 Producer 的读者们,能够查看 聊聊 Kafka Producer 那点事

 

消息发送流程以下:

1)首先咱们要知道一点就是 Producer 端是直接与 Broker 中的 Leader Partition 交互的,因此在 Producer 端初始化中就须要经过 Partitioner 分区器从 Kafka 集群中获取到相关 Topic 对应的 Leader Partition 的元数据 。

2)待获取到 Leader Partition 的元数据后直接将消息发送过去。

3)Kafka Broker 对应的 Leader Partition 收到消息会先写入 Page Cache,定时刷盘进行持久化(顺序写入磁盘)。

4) Follower Partition 拉取 Leader Partition 的消息并保持同 Leader Partition 数据一致,待消息拉取完毕后须要给 Leader Partition 回复 ACK 确认消息。

5)待 Kafka Leader 与 Follower Partition 同步完数据并收到全部 ISR 中的 Replica 副本的 ACK 后,Leader Partition 会给 Producer 回复 ACK 确认消息。

 

根据上图以及消息发送流程能够得出:Producer 端为了提高发送效率,减小IO操做,发送数据的时候是将多个请求合并成一个个 RecordBatch,并将其封装转换成 Request 请求「异步」将数据发送出去(也能够按时间间隔方式,达到时间间隔自动发送),因此 Producer 端消息丢失更可能是由于消息根本就没有发送到 Kafka Broker 端

致使 Producer 端消息没有发送成功有如下缘由:

  • 网络缘由:因为网络抖动致使数据根本就没发送到 Broker 端。
  • 数据缘由:消息体太大超出 Broker 承受范围而致使 Broker 拒收消息。

另外 Kafka Producer 端也能够经过配置来确认消息是否生产成功:

 

在 Kafka Producer 端的 acks 默认配置为1, 默认级别是 at least once 语义, 并不能保证 exactly once 语义。

 

既然 Producer 端发送数据有 ACK 机制, 那么这里就可能会丢数据的!!!

  • acks = 0:因为发送后就自认为发送成功,这时若是发生网络抖动, Producer 端并不会校验 ACK 天然也就丢了,且没法重试。
  • acks = 1:消息发送 Leader Parition 接收成功就表示发送成功,这时只要 Leader Partition 不 Crash 掉,就能够保证 Leader Partition 不丢数据,可是若是 Leader Partition 异常 Crash 掉了, Follower Partition 还未同步完数据且没有 ACK,这时就会丢数据。
  • acks = -1 或者 all: 消息发送须要等待 ISR 中 Leader Partition 和 全部的 Follower Partition 都确认收到消息才算发送成功, 可靠性最高, 但也不能保证不丢数据,好比当 ISR 中只剩下 Leader Partition 了, 这样就变成 acks = 1 的状况了。


Broker 端丢失场景剖析


接下来咱们来看看 Broker 端持久化存储丢失场景, 对于不了解 Broker 的读者们,能够先看看 聊聊 Kafka Broker 那点事,数据存储过程以下图所示:

 

Kafka Broker 集群接收到数据后会将数据进行持久化存储到磁盘,为了提升吞吐量和性能,采用的是「异步批量刷盘的策略」,也就是说按照必定的消息量和间隔时间进行刷盘。首先会将数据存储到 「PageCache」 中,至于何时将 Cache 中的数据刷盘是由「操做系统」根据本身的策略决定或者调用 fsync 命令进行强制刷盘,若是此时 Broker 宕机 Crash 掉,且选举了一个落后 Leader Partition 不少的 Follower Partition 成为新的 Leader Partition,那么落后的消息数据就会丢失。

既然 Broker 端消息存储是经过异步批量刷盘的,那么这里就可能会丢数据的!!!

  • 因为 Kafka 中并无提供「同步刷盘」的方式,因此说从单个 Broker 来看仍是颇有可能丢失数据的。
  • kafka 经过「多 Partition (分区)多 Replica(副本)机制」已经能够最大限度的保证数据不丢失,若是数据已经写入 PageCache 中可是还没来得及刷写到磁盘,此时若是所在 Broker 忽然宕机挂掉或者停电,极端状况仍是会形成数据丢失。

 

Consumer 端丢失场景剖析


接下来咱们来看看 Consumer 端消费数据丢失场景,对于不了解 Consumer 的读者们,能够先看看 聊聊 Kafka Consumer 那点事, 咱们先来看看消费流程:

 

 

 

1)Consumer 拉取数据以前跟 Producer 发送数据同样, 须要经过订阅关系获取到集群元数据, 找到相关 Topic 对应的 Leader Partition 的元数据。

2)而后 Consumer 经过 Pull 模式主动的去 Kafka 集群中拉取消息。

3)在这个过程当中,有个消费者组的概念(不了解的能够看上面连接文章),多个 Consumer 能够组成一个消费者组即 Consumer Group,每一个消费者组都有一个Group-Id。同一个 Consumer Group 中的 Consumer 能够消费同一个 Topic 下不一样分区的数据,可是不会出现多个 Consumer 去消费同一个分区的数据。

4)拉取到消息后进行业务逻辑处理,待处理完成后,会进行 ACK 确认,即提交 Offset 消费位移进度记录。

5)最后 Offset 会被保存到 Kafka Broker 集群中的 __consumer_offsets 这个 Topic 中,且每一个 Consumer 保存本身的 Offset 进度。

根据上图以及消息消费流程能够得出消费主要分为两个阶段:

  • 获取元数据并从 Kafka Broker 集群拉取数据。
  • 处理消息,并标记消息已经被消费,提交 Offset 记录。

 

 

既然 Consumer 拉取后消息最终是要提交 Offset, 那么这里就可能会丢数据的!!!

  • 可能使用的「自动提交 Offset 方式
  • 拉取消息后「先提交 Offset,后处理消息」,若是此时处理消息的时候异常宕机,因为 Offset 已经提交了, 待 Consumer 重启后,会从以前已提交的 Offset 下一个位置从新开始消费, 以前未处理完成的消息不会被再次处理,对于该 Consumer 来讲消息就丢失了。
  • 拉取消息后「先处理消息,在进行提交 Offset」, 若是此时在提交以前发生异常宕机,因为没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 从新拉取消息,不会出现消息丢失的状况, 可是会出现重复消费的状况,这里只能业务本身保证幂等性。

04 消息丢失解决方案


上面带你从 Producer、Broker、Consumer 三端剖析了可能丢失数据的场景,下面咱们就来看看如何解决才能最大限度的保证消息不丢失。

 

Producer 端解决方案


在剖析 Producer 端丢失场景的时候, 咱们得出其是经过「异步」方式进行发送的,因此若是此时是使用「发后即焚」的方式发送,即调用 Producer.send(msg) 会当即返回,因为没有回调,可能因网络缘由致使 Broker 并无收到消息,此时就丢失了。

所以咱们能够从如下几方面进行解决 Producer 端消息丢失问题:

 

4.1.1 更换调用方式:

弃用调用发后即焚的方式,使用带回调通知函数的方法进行发送消息,即 Producer.send(msg, callback), 这样一旦发现发送失败, 就能够作针对性处理。

 

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

  

(1)网络抖动致使消息丢失,Producer 端能够进行重试。

(2)消息大小不合格,能够进行适当调整,符合 Broker 承受范围再发送。

经过以上方式能够保证最大限度消息能够发送成功。

 

4.1.2 ACK 确认机制:

该参数表明了对"已提交"消息的定义。

须要将 request.required.acks 设置为 -1/ all,-1/all 表示有多少个副本 Broker 所有收到消息,才认为是消息提交成功的标识。

针对 acks = -1/ all , 这里有两种很是典型的状况:

(1)数据发送到 Leader Partition, 且全部的 ISR 成员所有同步完数据, 此时,Leader Partition 异常 Crash 掉,那么会选举新的 Leader Partition,数据不会丢失, 以下图所示:

 

(2)数据发送到 Leader Partition,部分 ISR 成员同步完成,此时 Leader Partition 异常 Crash, 剩下的 Follower Partition 均可能被选举成新的 Leader Partition,会给 Producer 端发送失败标识, 后续会从新发送数据,数据可能会重复, 以下图所示:

 

所以经过上面分析,咱们还须要经过其余参数配置来进行保证:

replication.factor >= 2

min.insync.replicas > 1

这是 Broker 端的配置,下面会详细介绍。

 

4.1.3 重试次数 retries:

该参数表示 Producer 端发送消息的重试次数。

须要将 retries 设置为大于0的数, 在 Kafka 2.4 版本中默认设置为Integer.MAX_VALUE。另外若是须要保证发送消息的顺序性,配置以下:

retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1

这样 Producer 端就会一直进行重试直到 Broker 端返回 ACK 标识,同时只有一个链接向 Broker 发送数据保证了消息的顺序性。

 

4.1.4 重试时间 retry.backoff.ms:
该参数表示消息发送超时后两次重试之间的间隔时间,避免无效的频繁重试,默认值为100ms, 推荐设置为300ms。

 

Broker 端解决方案


在剖析 Broker 端丢失场景的时候, 咱们得出其是经过「异步批量刷盘」的策略,先将数据存储到 「PageCache」,再进行异步刷盘, 因为没有提供 「同步刷盘」策略, 所以 Kafka 是经过「多分区多副本」的方式来最大限度的保证数据不丢失。

咱们能够经过如下参数配合来保证:

4.2.1 unclean.leader.election.enable:

该参数表示有哪些 Follower 能够有资格被选举为 Leader , 若是一个 Follower 的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,所以咱们要将其设置为false,防止此类状况发生。

4.2.2 replication.factor:
该参数表示分区副本的个数。建议设置 replication.factor >=3, 这样若是 Leader 副本异常 Crash 掉,Follower 副本会被选举为新的 Leader 副本继续提供服务。

4.2.3 min.insync.replicas:
该参数表示消息至少要被写入成功到 ISR 多少个副本才算"已提交",建议设置min.insync.replicas > 1, 这样才能够提高消息持久性,保证数据不丢失。

另外咱们还须要确保一下 replication.factor > min.insync.replicas, 若是相等,只要有一个副本异常 Crash 掉,整个分区就没法正常工做了,所以推荐设置成: replication.factor = min.insync.replicas +1, 最大限度保证系统可用性。

 

Consumer 端解决方案


在剖析 Consumer 端丢失场景的时候,咱们得出其拉取完消息后是须要提交 Offset 位移信息的,所以为了避免丢数据,正确的作法是:拉取数据、业务逻辑处理、提交消费 Offset 位移信息。


咱们还须要设置参数 enable.auto.commit = false, 采用手动提交位移的方式。

另外对于消费消息重复的状况,业务本身保证幂等性, 保证只成功消费一次便可

 

05 总结


这里,咱们一块儿来总结一下这篇文章的重点。

一、从 Kafka 总体架构上概述了可能发生数据丢失的环节。

二、带你剖析了「消息传递语义」的概念, 肯定了 Kafka 只对「已提交」的消息作「最大限度的持久化保证不丢失」。

三、带你剖析了 Producer、Broker、Consumer 三端可能致使数据丢失的场景以及具体的高可靠解决方案。

若是个人文章对你有所帮助,还请帮忙点赞、在看、转发一下,很是感谢!

 

最后说一句(求关注,别白嫖我)

坚持总结, 持续输出高质量文章  关注个人微信公众号: 【华仔聊技术】

也能够加我微信好友:meng_philip, 回复 【加群】 能够跟BAT大咖一块儿学习进步