Kafka中的消息传递保证语义

  |   0 评论   |   3,916 浏览

Kafka提供的消息传递保证语义

Kafka提供的Producer和Consumer之间的消息传递保证语义(Message Delivery Guarantee Semantics)有以下三种

  • At most once—消息可能会丢,但绝不会重复传递;

  • At least once—消息绝不会丢,但可能会重复传递;

  • Exactly once—每条消息只会被精确地传递一次:既不会多,也不会少;

可以分为两个问题来看:发送消息的保证和消费消息的保证。

Producer的消息传递语义

at-least-once传递语义

当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。

在0.11.0.0之前,如果一个Producer没有收到消息提交的响应,它只能重新发送消息,确保消息已经正确传输到broker中,这提供了at-least-once传递语义,因为如果原来的请求实际上成功了,则在重新发送时将再次把消息写入到日志中。

exactly-once传递语义

自0.11.0.0起,Kafka Producer支持幂等传递选项,保证重新发送不会导致在日志中出现重复项。为了实现这个目的,broker为每个Producer分配一个ID,并通过每个消息的序列号来进行去重。

启用幂等传递的方法是配置:enable.idempotence=true

从0.11.0.0开始,Producer支持使用类似事务的语义将消息发送到多个topic分区:即所有消息要么都被成功写入,或者都没有。这个主要用于Kafka topic之间“exactly-once“处理。

启用事务支持的方法是:设置属性transcational.id为一个指定字符串

并不是所有的场景都需要这么强的保证,对于延迟敏感的情况,Producer可以通过request.required.acks参数指定它期望的持久性级别。如Producer指定它想要等待消息的committed,则这可能需要10毫秒量级的延迟。然而,Producer也可以指定它想要完全异步地执行发送,或者它只等到leader(不需要副本的响应)的响应。

request.required.acks持久性级别

参数值解释优缺点
0producer发出消息即完成发送,不等待确认延迟最小、可靠性最差,最容易丢失消息
1当且仅当leader收到消息返回确认信号后认为发送成功只有当leader crash,而且未被同步至其他follower时才丢消息
-1只有当leader以及所有follower都收到消息确认后,才发送成功最好的可靠性,延迟也较大。但是还是有可能丢消息

Consumer的offset记录方式

记录offset的位置

我们知道,Kafka的服务端并不会记录Consumer的消费位置,而是由Consumer自己决定如何如何保存、如何记录其消费的offset。旧版本的Kafka将消费位置记录在Zookeeper中,在新版本中,为了缓解Zookeeper集群的压力,在Kafka的服务端中添加了一个名为”__consumer_offsets“的内部Topic

image.png

使用"__consumer_offsets" Topic记录Consumer的消费位置只是默认选项,仍然可以根据业务需求将offset记录在别的存储系统中。

当记录offset到外部系统时,需要将Consumer的当前位置与实际要存储为输出的位置进行协调。实现这一目标的典型方法是在Consumer位置的存储和Consumer输出的存储之间引入两阶段的”提交“。也可以更简单一些,通过让Consumer将其offset存储在与其输出相同的位置。这样最好,因为大多数的输出系统不支持两阶段”提交“。例如,一个Kafka Connect connector,它填充HDFS中的数据以及它读取的数据的offset,以保证数据和offset都被更新,或者都不更新。 对于需要这些更强大语义的许多其他数据系统,我们遵循类似的模式,为此,消息不具有用来去重的主键。

提交offset方法

在Consumer消费消息的过程中,提交offset的时机显得非常重要,因为它决定了Consumer故障重启后的消费位置。

自动提交offset

通过将enable.auto.commit设置为true,可以启用自动提交,auto.commit.interval.ms则设置了自动提交的时间间隔。

自动提交是由轮询循环驱动的。当轮询时,Consumer检查是否提交,如果是的话,它将提交上次轮询中返回的偏移量。

手动提交offset

  • org.apache.kafka.clients.consumer.KafkaConsumer#commitSync()    同步提交

  • org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync()    异步提交

Consumer的消息传递语义

如果Consumer从未崩溃,它可以将这个位置存储在内存中,但是如果Consumer失败了,我们希望这个topic分区被另一个进程接管,那么新进程将需要选择一个合适的位置开始处理。假设Consumer读取了一些信息 - 它有几个选项用于处理消息并更新其位置。

at-most-once传递语义

读取消息,然后在日志中保存它的位置,最后处理消息。

在这种情况下,有可能Consumer保存了位置之后,但是处理消息输出之前崩溃了。如下图

less.png

在这种情况下,接管处理的进程会在已保存的位置开始,即使该位置之前有几个消息尚未处理。在Consumer处理失败消息的情况下,不进行处理。

at-least-once传递语义

读取消息,处理消息,最后保存消息的位置。

在这种情况下,可能消费进程处理消息之后,但保存它的位置之前崩溃了。如下图

twins.png

在这种情况下,当新的进程接管了它,这将接收已经被处理的前几个消息。这就符合了“至少一次”的语义。在多数情况下消息有一个主键,以便更新幂等(其任意多次执行所产生的影响均与一次执行的影响相同)。

exactly-once传递语义

当从一个Kafka Topic消费并Produce到另一个topic时(例如Kafka Stream),我们可以利用之前提到0.11.0.0中的Producer的新事务功能。Consumer的位置可以像一个消息一样存储到topic中,我们就可以使用接收处理数据并输出topic相同的事务,将offset写入到Kafka。如果事务中断,则Consumer的位置将恢复到老的值,根据其”隔离级别“,其他Consumer将不会看到输出topic产生的数据,在默认的”读取未提交“隔离级别中,所有消息对Consumer都是可见的,即使是被中断的事务的消息。但是在”读取已提交“中,Consumer将只从已提交的事务中返回消息。

总结

  1. Kafka默认是保证“至少一次”消息传递;

  2. 通过禁止Producer重试和在处理消息前提交它的offset可以实现“最多一次”消息传递;

  3. 在Kafka Streams中支持“正好一次”消息传递;

  4. 在Kafka topics之间传递和处理数据时,通过带事务的Producer/Consumer也可以提供“正好一次”的消息传递;

  5. 当和其它存储系统传递数据时,如果要保证“正好一次”消息传递,需要与其配合,但Kafka提供了偏移量,所以实现起来也很简单(可参考Kafka Connect)。

读后有收获可以支付宝请作者喝咖啡