Kafka1.1.0源码分析——KafkaProducer发送消息的过程

  |   0 评论   |   2,490 浏览

概述

Kafka提供了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以拉取消息。Kafka提供了Java版本的Producer实现——KafkaProducer,其可以轻松实现同步/异步发送消息、批量发送、超时重发等功能。

使用KafkaProducer发送消息的一般流程

简单的阻塞调用

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

非阻塞的调用

提供回调函数,当请求完成时将会调用回调函数

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if(e != null) {
           e.printStackTrace();
        } else {
           System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});

发送消息的整体流程

image.png

各个步骤的含义

  1. ProducerIntercptor对消息进行拦截

  2. Serialzer对key和value进行序列化

  3. Partitioner对消息选择合适的分区

  4. RecordAccumulator收集消息,实现批量发送

  5. Sender从RecordAccumulator获取消息

  6. 构造ClientRequest

  7. 将ClientRequest交给Network,准备发送

  8. Network将请求放入KafkaChannel的缓存

  9. 发送请求

  10. 收到响应,调用ClientResponse

  11. 调用RecordBatch的回调函数,最终调用到每一个消息上注册的回调函数

在这里主要涉及到两个线程:

主线程主要负责封装消息成ProducerRecord对象,之后调用send方法将消息放入RecordAccumulator中暂存

Sender线程负责将消息构造成请求,并从RecordAccumulator取出消息消息并批量发送

发送消息过程分析

send方法

通过上面的示例可以看到,send方法是发送消息的入口方法,实现如下:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // 拦截可能被修改的Record; 此方法不会抛出异常
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

该方法异步地向Topic发送record,并在发送已确认时回调提供的回调函数。

发送是异步的,一旦record被存储在等待发送的缓冲区中,该方法将立即返回。

这允许并行地发送多个record,而不是阻塞在每个record之后等待响应。


发送的结果是一个RecordMetadata,指定了record发送到的分区,分配的偏移量和record的时间戳。

如果Topic使用CreateTime,则时间戳将是用户提供的时间戳或record发送时间(如果用户未指定record的时间戳记)。

如果该Topic使用LogAppendTime,则附加消息时,时间戳将是Kafka broker本地时间。


由于发送调用是异步的,因此将返回将被分配给此record的RecordMetadata的Future。在这个Future上调用get()会阻塞,直到关联的请求完成,然后返回record的Metadata或抛出发送record时发生的任何异常。

对于发送到同一分区的record的回调函数保证按顺序执行。也就是说,在下面的例子中,callback1保证在callback2之前执行:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

当作为事务的一部分使用时,不需要为了检测发送错误而定义回调函数或检查future结果。如果出现任何不可恢复的错误导致发送调用失败,则最终的commitTransaction()调用将失败并抛出上次失败的发送的异常。发生这种情况时,应用程序应调用abortTransaction()来重置状态并继续发送数据。


调用abortTransaction()不能解决某些事务性发送错误。特别是,如果事务发送带有ProducerFencedException、org.apache.kafka.common.errors.OutOfOrderSequenceException、org.apache.kafka.common.errors.UnsupportedVersionException,或AuthorizationException,那么唯一的选择就是调用close()。致命错误致使Producer进入停业状态,其中future的API调用将继续提出同样的错误underyling包裹在一个新的KafkaException。


当幂等性启用,但没有配置transactional.id时,将是相同的画面。在这种情况下,org.apache.kafka.common.errors.UnsupportedVersionException和AuthorizationException被认为是致命的错误。但是,ProducerFencedException不需要处理。另外,也能够继续接收org.apache.kafka.common.errors.OutOfOrderSequenceException后发送,但是这样做可能会导致出未决的消息的顺序递送。为了确保正确的排序,你应该关闭Producer并创建一个新的实例。


如果目标Topic的消息格式没有升级到0.11.0.0,幂等与事务产生的请求将失败,并报出org.apache.kafka.common.errors.UnsupportedForMessageFormatException错误。如果在事务过程中遇到这种情况,则可以中止并继续。但请注意,直到Topic升级前,发送到同一Topic将继续接收相同的异常。


请注意,回调函数通常会在Producer的I/O线程中执行,因此应该相当快,否则会延迟从其他线程发送消息。如果要执行阻塞或计算昂贵的回调函数,建议使用自己的java.util.concurrent.Executor在回调函数中并行处理。


ProducerIntercptor对消息进行拦截

定义方法

通过参数interceptor.classes指定多个拦截器,默认为空

ProducerInterceptors#onSend

遍历拦截器,调用其onSend方法,对Record进行处理

image.png

这是在客户端将记录发送到KafkaProducer之前调用的,在key和value被序列化之前。 该方法调用ProducerInterceptor.onSend(ProducerRecord)方法。

从第一个拦截器的onSend()返回的ProducerRecord被传递给第二个拦截器onSend(),等等在拦截器链中。 从这个方法返回从最后一个拦截器返回的记录。 此方法不会抛出异常。 任何拦截器方法抛出的异常都会被捕获并被忽略。 如果链中间的拦截器(通常修改记录)引发异常,链中的下一个拦截器将被前一个拦截器返回的记录调用,该记录不会引发异常。

KafkaProducer#doSend概览

序列化key & value

image.png

计算分区、将record放入RecordAccumulator队列

image.png

Serialzer对key和value进行序列化

定义方法

key.serializer=

value.serializer=

调用keySerializer、valueSerializer对key和value进行序列化

image.png

Partitioner对消息选择合适的分区

image.png

image.png

调用partitioner.partition对record进行分区

RecordAccumulator收集消息,实现批量发送

image.png

org.apache.kafka.clients.producer.internals.RecordAccumulator#append

查找或创建RecordBatch,然后尝试追加Record

image.png

没有in-progress的RecordBatch,申请并创建一个新的,然后尝试追加Record

image.png

Sender从RecordAccumulator获取消息

org.apache.kafka.clients.producer.internals.Sender#run(long)

如果配置了事务

image.png

image.png

Sender#sendProducerData

获取待发送数据的分区列表

如果有分区的leader未知,则强制刷新元数据

image.png

创建ProduceRequest

image.png

image.png

构造ClientRequest

org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest

image.png

image.png

将ClientRequest交给Network,准备发送

image.png

Network将请求放入KafkaChannel的缓存

org.apache.kafka.clients.NetworkClient#doSend

image.png

发送请求

org.apache.kafka.clients.NetworkClient#poll

image.png

收到响应,调用ClientResponse

org.apache.kafka.clients.NetworkClient#completeResponses

image.png

调用RecordBatch的回调函数,最终调用到每一个消息上注册的回调函数

org.apache.kafka.clients.ClientResponse#onComplete

image.png


读后有收获可以支付宝请作者喝咖啡