Kafka1.1.0源码分析——KafkaConsumer消费消息的过程

  |   0 评论   |   1,156 浏览

订阅Topic

org.apache.kafka.clients.consumer.KafkaConsumer#subscribe

订阅给定的Topic列表以获取动态分配的分区。Topic订阅不是增量式的。该列表将取代当前的分配(如果有的话)。请注意,无法通过 assign(Collection)将Topic订阅与组管理进行手动分区分配。如果给定的Topic列表为空,则将其视为与unsubscribe()相同。

作为组管理的一部分,Consumer将跟踪属于特定组的Consumer列表,并在触发以下任何事件时触发重新平衡操作:

  • 任何订阅Topic的分区数都会改变

  • 订阅的Topic被创建或删除

  • Consumer组的现有成员关闭或失败

  • 一个新成员被添加到Consumer组

当这些事件中的任何一个被触发时,将首先调用提供的侦听器,以指示Consumer的分配已被撤销,然后在收到新分配时再次被调用。请注意,重新平衡只会在激活的轮询(长)期间发生,因此回调也只会在此期间被调用。提供的侦听器将立即覆盖先前调用订阅中设置的任何侦听器。但是,保证通过此接口撤消/分配的分区来自此调用中订阅的Topic。

image.png

image.png

拉取消息

org.apache.kafka.clients.consumer.KafkaConsumer#poll

获取使用其中一个订阅/分配API指定的主题或分区的数据。 在轮询数据之前未订阅任何主题或分区是错误的。

在每次投票中,消费者将尝试使用最后一次消费的偏移量作为起始偏移量并按顺序提取。 最后消耗的偏移量可以通过seek(TopicPartition,long)手动设置,或者自动设置为订阅的分区列表的最后提交偏移量有关更多详细信息,请参阅ConsumerRebalanceListener。

image.png

org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce

做一轮拉取。 除了检查新数据之外,这还会执行任何所需的偏移量提交(如果启用了自动提交)和偏移量重置(如果定义了偏移重置策略)。

image.png

image.png

org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#poll

image.png

image.png

org.apache.kafka.clients.NetworkClient#poll

image.png

org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#trySend

image.png

读后有收获可以支付宝请作者喝咖啡