Kafka1.1.0源码分析——KafkaConsumer介绍

  |   0 评论   |   1,236 浏览

消费Kafka集群record的客户端。

该客户端透明地处理Kafka broker的故障,并透明地适应它在集群中提取迁移的topic partition。该客户还与topic进行交互,以允许Consumer Group使用Consumer Group负载均衡消费。

Consumer维护与必要broker获取数据的TCP连接。未能在使用后关闭Consumer将泄漏这些连接。Consumer不是线程安全的。有关更多详细信息,请参阅多线程处理。

跨版本兼容性

该客户端可以与0.10.0或更新版本的broker进行通信。较早或较新的topic可能不支持某些功能。例如,0.10.0broker不支持offsetsForTimes,因为此功能是在版本0.10.1中添加的。调用正在运行的broker版本上不可用的API时,您将收到org.apache.kafka.common.errors.UnsupportedVersionException。

偏移量和Consumer位置

Kafka为partition中的每条record保留一个数字偏移量。该偏移量用作该partition内record的唯一标识符,还表示Consumer在partition中的位置。例如,位于位置5的Consumer已经消费了具有偏移量0至4的record,并且将接下来以偏移量5接收record。实际上有两种与ConsumerConsumer相关的位置概念:

Consumer的位置给出了将要发布的下一条record的偏移量。它将比Consumer在该partition中看到的最高偏移量大1。每当Consumer在调用中接收到消息以进行轮询时(长),它都会自动前进。

Committed的位置是安全存储的最后一个偏移量。如果流程失败并重新启动,则这是Consumer将恢复的偏移量。Consumer可以定期自动提交偏移量;或者可以选择通过调用其中一个提交API(例如,commitSync和commitAsync)来手动控制此提交位置。

这种区分使Consumer能够控制何时考虑使用record。这在下面进一步详细讨论。

Consumer Group和topic订阅

Kafka使用Consumer Group的概念来允许一系列进程来划分消费和处理record的工作。这些进程可以在同一台机器上运行,也可以分布在多台机器上,为处理提供可扩展性和容错能力。所有共享同一个group.id的Consumer实例将成为同一个Consumer Group的一部分。

组中的每个Consumer可以通过其中一个订阅API动态设置它想要订阅的topic列表。 Kafka会将订阅topic中的每条消息发送给每个Consumer Group中的一个进程。这是通过平衡Consumer组中所有成员之间的partition来实现的,以便每个partition只能分配给组中的一个Consumer。所以如果有一个包含四个partition的topic和一个包含两个进程的Consumer Group,每个进程将从两个partition中消费。

Consumer组中的成员资格是动态维护的:如果进程失败,分配给它的partition将被rebalance给同一组中的其他使用者。同样,如果新的Consumer加入该组,partition将从现有的Consumer转移到新的Consumer。这被称为rebalancing,下面将对此进行更详细的讨论。将新partition添加到其中一个订阅topic或者创建了与订阅的正则表达式匹配的新topic时,也会使用组rebalance。该组将通过定期元数据刷新自动检测新partition,并将其分配给组成员。

从概念上讲,您可以将Consumer组视为恰好由多个进程组成的单个逻辑Consumer。作为一个多Consumer系统,Kafka自然地支持为特定topic提供任意数量的Consumer Group而不需要重复数据(其他Consumer实际上相当便宜)。

这是消息传递系统中常见功能的轻微泛化。为了获得类似于传统消息系统中的队列的语义,所有进程将成为单个Consumer Group的一部分,因此record传递将在群组中平衡,如队列。与传统的消息传递系统不同,您可以拥有多个此类组。为了在传统的消息传递系统中获得与pub-sub类似的语义,每个进程都有自己的Consumer Group,因此每个进程都会订阅发布到该topic的所有record。

此外,当组rebalance是自动发生的,Consumer可以通过ConsumerRebalanceListener,这允许它们以完成必要的应用程序级的逻辑,诸如状态清理通知,手动偏移提交,等等,可以从在Kafka之外存储偏移量,以获取更多细节。

Consumer也可以使用assign(Collection)手动分配特定partition(类似于较旧的“简单”Consumer)。在这种情况下,动态partition分配和Consumer组协调将被禁用。

检测Consumer故障

在订阅了一组topic后,当poll(long)被调用时,Consumer将自动加入该组。poll API旨在确保Consumer的生活。只要您继续调用poll,Consumer就会留在该组中,并继续接收来自其分配的partition的消息。在底层,Consumer定期向服务器发送心跳。如果Consumer在session.timeout.ms的持续时间内崩溃或无法发送心跳,那么Consumer将被视为死亡,并且其partition将被rebalance。

Consumer也有可能会遇到“活锁”情况,即持续发送心跳,但没有取得进展。为了防止Consumer在这种情况下无限期地保持其partition,我们使用max.poll.interval.ms设置提供活跃检测机制。基本上,如果您至少不像配置的最大时间间隔那么频繁地调用轮询,则客户端将主动离开组,以便另一个Consumer可以接管其partition。发生这种情况时,您可能会看到偏移提交失败(如调用commitSync()时抛出的CommitFailedException所示)。这是一种安全机制,可以确保只有该组的活动成员才能执行offset。所以留在小组中,你必须继续调用poll。

Consumer提供两个配置设置来控制轮询循环的行为:

max.poll.interval.ms

通过增加期望的poll之间的时间间隔,您可以给Consumer更多的时间来处理从poll(长)返回的一批record。缺点是增加此值可能会延迟群组rebalance,因为Consumer只会在轮询调用内部加入rebalance。您可以使用此设置来限制完成rebalance的时间,但如果Consumer实际上无法经常调用轮询,则风险可能会更慢。

max.poll.records

使用此设置可将单个调用返回的record总数限制为轮询。这可以更容易地预测每个轮询时间间隔内必须处理的最大值。通过调整此值,您可以减少轮询间隔,这将减少组rebalance的影响。

对于消息处理时间不可预测变化的用例,这些选项都不足以满足要求。处理这些情况的推荐方式是将消息处理移动到另一个线程,以便Consumer在处理器仍在工作时继续调用轮询。必须小心确保Committed的偏移量不超过实际位置。通常,只有在线程完成处理后(取决于所需的传递语义),您必须禁用自动提交并手动提交record的已处理偏移量。另请注意,您需要暂停partition,以便在线程完成处理之前返回的内容之后,不会收到来自轮询的新record。

用法示例

消费者API提供灵活性以涵盖各种消费用例。 以下是一些演示如何使用它们的示例。

这个例子演示了Kafka的依赖于自动偏移提交的消费者API的简单用法。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records)
       System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

通过使用配置> bootstrap.servers指定要联系的一个或多个broker的列表来引导与集群的连接。此列表仅用于发现集群中的其他broker,不必是集群中服务器的详尽列表(尽管如果客户端连接时服务器停止运行,您可能需要指定多个列表)。

设置enable.auto.commit意味着偏移是通过config auto.commit.interval.ms控制的频率自动提交的。

在这个例子中,Consumer正在订阅topic foo和bar,作为一组名为test的Consumer的一部分,如group.id配置。

deserializer设置指定如何将字节转换为对象。例如,通过指定字符串反序列化器,我们说我们的record的键和值只是简单的字符串。

手动偏移控制

Consumer不必依赖Consumer定期提交消费的偏移量,而是可以控制何时应将record视为消费并因此提交offset。当消息消费与某些处理逻辑相结合时,这非常有用,因此在完成处理之前不应将消息视为消费。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records) {
       buffer.add(record);
   }
   if (buffer.size() >= minBatchSize) {
       insertIntoDb(buffer);
       consumer.commitSync();
       buffer.clear();
   }
}

在这个例子中,我们将消费一批record并将其批量存储在内存中。当我们有足够的batch record时,我们会将它们插入到数据库中。如果我们允许偏移量按照前面的示例进行自动提交,那么record会在轮询返回给Consumer后被视为消费。然后,我们的流程可能会在批处理record之后失败,但在它们被插入数据库之前就会失败。

为了避免这种情况,只有在相应的record被插入数据库后,我们才会手动提交偏移量。这使我们能够精确控制record被视为消费的时间。这提出了相反的可能性:在插入到数据库之后但在提交之前的时间间隔内,进程可能会失败(尽管这可能仅仅是几毫秒,但这是可能的)。在这种情况下,接管消费的进程将消费上次提交的偏移量,并重复插入最后一批数据。以这种方式使用Kafka提供了通常所说的“至少一次”交付保证,因为每个record可能会被交付一次,但在失败的情况下可能会被复制。

注意:使用自动偏移提交也可以为您提供“至少一次”投递,但要求您必须将每次调用返回的所有数据都消费掉,以便在任何后续调用之前或在关闭使用者之前进行轮询(长)。如果您未能完成其中任何一项,则Committed的偏移量可能会超过消费的位置,从而导致record丢失。使用手动偏移控制的优点是,您可以直接控制record被视为“消费”的时间。

以上示例使用commitSync将所有收到的record标记为已提交。在某些情况下,您可能希望通过明确指定偏移量来更好地控制哪些record已被提交。在下面的例子中,我们在完成处理每个partition中的record之后提交偏移量。

try {
   while(running) {
       ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
       for (TopicPartition partition : records.partitions()) {
           List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
           for (ConsumerRecord<String, String> record : partitionRecords) {
               System.out.println(record.offset() + ": " + record.value());
           }
           long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
           consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
       }
   }
} finally {
 consumer.close();
}

注意:提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。因此,在调用commitSync(偏移量)时,应该在最后处理的消息的偏移量上加1。

手动partition分配

在前面的示例中,我们订阅了我们感兴趣的topic,并让Kafka根据组中的活动Consumer为这些topic动态分配partition的公平份额。但是,在某些情况下,您可能需要更好地控制分配的特定partition。例如:

  • 如果进程正在维护与该partition相关的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上维护的partition的record。

  • 如果进程本身具有高可用性,并在失败时重启(可能使用YARN,Mesos或AWS工具等集群管理框架,或作为流处理框架的一部分)。在这种情况下,不需要Kafka检测到故障并rebalance partition,因为消费进程将在另一台计算机上重新启动。

要使用这种模式,您不必使用订阅topic,只需使用您想要使用的partition的完整列表来调用assign(Collection)。

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

分配后,您可以循环调用轮询,就像在前面的示例中一样,以消费record。Consumer指定的组仍然用于提交偏移量,但现在该partition组只会随着另一个要分配的调用而改变。手动partition分配不使用组协调,因此Consumer故障不会导致分配的partition rebalance。即使每个Consumer与另一个Consumer共享一个组,也会独立行事。为避免偏移提交冲突,通常应确保groupId对于每个Consumer实例都是唯一的。

请注意,无法通过topic订阅(即使用订阅)将手动partition分配(即使用分配)与动态partition分配相混合。

在Kafka外存储偏移量

Consumer应用程序不需要使用Kafka的内置偏移量存储,它可以将偏移量存储在自己选择的存储中。对此的主要用例是允许应用程序将偏移量和消费结果存储在同一个系统中,使得结果和偏移量都以原子方式存储。这并不总是可能的,但是当它发生时,它将使消费完全原子化,并给予“恰好一次”的语义,这比使用Kafka的偏移提交功能获得的默认“至少一次”语义更强。

这里有几个这种用法的例子:

  • 如果消费结果存储在关系数据库中,则将偏移量存储在数据库中也可以允许在单个事务中提交结果和偏移量。因此,事务将成功,并且偏移量将根据所消费的内容或结果不会被更新并且偏移量不会被更新。

  • 如果结果存储在本地存储中,那么也可以在那里存储偏移量。例如,可以通过订阅特定partition并将偏移量和索引数据一起存储来构建搜索索引。如果这是以原子的方式完成的,那么即使发生崩溃而导致不同步的数据丢失,但是仍然存在相应的偏移量,通常可能会出现这种情况。这意味着在这种情况下,返回丢失最近更新的索引进程只是从它所具有的索引中恢复索引,以确保不会丢失更新。

每条record都带有自己的偏移量,所以要管理自己的偏移量,只需执行以下操作:

  • 配置enable.auto.commit = false

  • 使用每个ConsumerRecord提供的偏移量来保存您的位置。

  • 在重新启动时,使用seek(TopicPartition,long)恢复Consumer的位置。

当partition分配也是手动完成时(这可能在上述搜索索引用例中),这种用法是最简单的。如果partition分配自动完成,需要特别注意处理partition分配更改的情况。这可以通过在对subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)的调用中提供ConsumerRebalanceListener实例来完成。例如,当从Consumer获取partition时,Consumer将通过实施ConsumerRebalanceListener.onPartitionsRevoked(Collection)来提交这些partition的偏移量。当partition分配给Consumer时,Consumer需要查找这些新partition的偏移量,并通过实现ConsumerRebalanceListener.onPartitionsAssigned(Collection)将Consumer正确初始化到该位置。

ConsumerRebalanceListener的另一个常见用途是刷新应用程序为其他位置移动的partition而保留的任何缓存。

控制Consumer的位置

在大多数使用情况下,Consumer将从头到尾简单地使用record,定期提交其位置(自动或手动)。然而Kafka允许Consumer手动控制其位置,随意在partition中向前或向后移动。这意味着Consumer可以重新使用旧的record,或跳过最近的record而不需要实际使用中间record。

有几种情况可以手动控制Consumer的位置。

一种情况是对时间敏感的record处理,对于落后远远不足以追赶处理所有record的Consumer而言,它可能是有意义的,而只是跳到最近的record。

另一个用例是用于维护本地状态的系统,如前一节所述。在这样的系统中,Consumer希望在启动时将其位置初始化为本地存储中包含的任何位置。同样,如果本地状态被破坏(比如说因为磁盘丢失),通过重新使用所有数据并重新创建状态(假设Kafka保留足够的历史record),可以在新机器上重新创建状态。

Kafka允许使用seek(TopicPartition,long)指定位置来指定新的位置。用于寻求服务器维护的最早和最近偏移的特殊方法也可用(分别为seekToBeginning(Collection)和seekToEnd(Collection))。

消费流量控制

如果Consumer被分配了多个partition来获取数据,它将尝试同时从所有partition中消费,从而有效地为这些partition提供相同的优先级以供消费。但是在某些情况下,Consumer可能需要首先专注于从指定partition的全部速度中提取某些子集,并且只有在这些partition中的数据很少或不需要消费时才开始提取其他partition。

其中一种情况是流处理,其中处理器从两个topic中提取并在这两个流上执行连接。当其中一个topic长期滞后于另一个topic时,处理器想暂停从前面的topic中获取,以便赶上滞后的流。另一个例子是,当Consumer开始有很多历史数据赶上的时候,这些应用程序通常希望获得关于某些topic的最新数据,然后再考虑获取其他topic。

Kafka支持通过使用暂停(收集)和恢复(收集)来暂停指定分配的partition上的消费并分别在未来的轮询(长)调用中恢复指定的暂停partition上的消费,从而动态控制消费流量。

读取事务性消息

在Kafka 0.11.0中引入了事务,其中应用程序可以以原子方式写入多个topic和partition。为了实现这一点,从这些partition读取的Consumer应该被配置为只读取提交的数据。这可以通过在Consumer的配置中设置isolation.level = read_committed来实现。

在read_committed模式下,使用者将只读取已成功提交的事务消息。它将继续像以前一样读取非事务性消息。在read_committed模式下没有客户端缓冲。相反,read_committed使用者的partition的结束偏移量将是属于未处理事务的partition中第一条消息的偏移量。该偏移量被称为“最后稳定偏移量”(LSO)。

一个read_committed的Consumer只会读取LSO并过滤掉任何已被中止的事务性消息。 LSO还影响read_committed使用者的seekToEnd(Collection)和endOffsets(Collection)的行为,其详细信息位于每个方法的文档中。最后,读取滞后指标也被调整为相对于读取委托Consumer的LSO。

具有事务消息的partition将包括指示事务结果的提交或中止标记。标记不会返回到应用程序,但日志中有偏移量。因此,使用交易消息从topic阅读的应用程序将看到消费的偏移量中存在差距。这些缺失的消息将成为事务标记,并在两个隔离级别中被Consumer过滤掉。此外,使用read_committed的使用者的应用程序也可能会由于中止事务而看到间隙,因为Consumer不会返回这些消息,而是会有有效的偏移量。

多线程处理

KafkaConsumer不是线程安全的。所有网络I / O都发生在调用的应用程序的线程中。Consumer有责任确保多线程访问已正确同步。未同步的访问将导致ConcurrentModificationException。

这个规则唯一的例外是wakeup(),它可以安全地从外部线程使用来中断一个活动的操作。在这种情况下,将从该操作的阻塞线程抛出org.apache.kafka.common.errors.WakeupException。这可以用来关闭另一个线程的Consumer。以下片段显示了典型模式:

public class KafkaConsumerRunner implements Runnable {
	private final AtomicBoolean closed = new AtomicBoolean(false);
	private final KafkaConsumer consumer;

	public void run() {
	   try {
	       consumer.subscribe(Arrays.asList("topic"));
	       while (!closed.get()) {
	           ConsumerRecords records = consumer.poll(10000);
	           // Handle new records
	       }
	   } catch (WakeupException e) {
	       // Ignore exception if closing
	       if (!closed.get()) throw e;
	   } finally {
	       consumer.close();
	   }
	}
	
	// Shutdown hook which can be called from a separate thread
	public void shutdown() {
	   closed.set(true);
	   consumer.wakeup();
	}
}

然后在一个单独的线程中,可以通过设置关闭标志并唤醒Consumer来关闭Consumer。

closed.set(true);
consumer.wakeup();

请注意,尽管可以使用线程中断而不是wakeup()来中止阻塞操作(在这种情况下,将引发InterruptException),但我们不鼓励使用它们,因为它们可能导致Consumer的干净关闭被中止。对于那些不能使用唤醒()的情况,主要支持中断。当Consumer线程由不知道Kafka客户端的代码管理时。

我们故意避免实现特定的线程模型进行处理。这为实现record的多线程处理留下了几个选项。

1.每个线程一个Consumer

一个简单的选择是给每个线程自己的Consumer实例。以下是这种方法的优缺点:

  • PRO:这是最容易实现的

  • PRO:它通常是最快的,因为不需要线程间的协调

  • PRO:它使得每个partition的按序处理非常容易实现(每个线程只按照接收它们的顺序处理消息)。

  • CON:更多的Consumer意味着更多的TCP连接到集群(每个线程一个)。通常Kafka非常有效地处理连接,所以这通常是一个小的成本。

  • CON:多个Consumer意味着更多的请求被发送到服务器,并且稍微减少可能导致I / O吞吐量下降的数据批量。

  • CON:所有进程中的总线程数将受到partition总数的限制。

2.解耦消费和处理

另一种方法是使一个或多个Consumer线程执行所有数据消费,并将ConsumerRecords实例移交给实际处理record处理的处理器线程池所消费的阻塞队列。该选项同样具有优点和缺点:

  • PRO:该选项允许独立扩展Consumer和处理器的数量。这可以让单个Consumer提供多个处理器线程,避免对partition进行任何限制。

  • CON:保证处理器之间的顺序需要特别小心,因为线程将独立执行,因为线程执行时机的运行,较早的数据块可能会在稍后的数据块之后实际处理。对于没有订阅要求的处理来说,这不是问题。

  • CON:手动提交该位置变得更加困难,因为它要求所有线程协调一致以确保该partition的处理已完成。

这种方法有许多可能的变化。例如,每个处理器线程可以有自己的队列,并且Consumer线程可以使用TopicPartition散列到这些队列中以确保


读后有收获可以支付宝请作者喝咖啡