Kafka1.1.0源码分析——KafkaProducer介绍

  |   0 评论   |   1,590 浏览

基本知识

KafkaProducer是一个将record发布到Kafka集群的Kafka客户端。

Producer是线程安全的,跨线程共享单个Producer实例通常比拥有多个实例更快。

下面是一个使用Producer发送包含连续数字的字符串作为键/值对的record的简单示例。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();

异步调用

Producer由缓冲区空间池组成,该缓冲区空间保存尚未传输到服务器的record以及后台I/O线程,后者负责将这些record转换为请求并将其传输到群集。使用后未能关闭Producer将会泄露这些资源。

send()方法是异步的。当被调用时,它将record添加到暂挂record发送的缓冲区并立即返回。这使得Producer可以将单个record合并进行批处理以提高效率。

Producer维护每个Partition的未发送record的缓冲区。这些缓冲区的大小由batch.size配置指定。使其变大可以在一个批次中处理更多消息,但需要更多的内存(因为我们通常对每个活动的Partition都有一个缓冲区)。

默认情况下,即使缓冲区中存在额外的未使用空间,缓冲区也可立即发送。但是,如果你想减少请求数,你可以将linger.ms设置为大于0的值。这将指示Producer在发送请求之前等待这个毫秒数,等待更多的record到达以填满同一批次。这与TCP中的Nagle算法类似。例如,在上面的代码片段中,由于我们将逗留时间设置为1毫秒,因此可能所有100条record都将在单个请求中发送。然而,如果我们没有填充缓冲区,这个设置会给我们的请求增加1毫秒的延迟,等待更多的record到达。请注意,即使在linger.ms = 0的情况下,到达时间接近的record通常也会放在一个批次中,因此在高负载下,无论持续配置如何,都会发生配料;然而,将此设置为大于0的值可能获得更少的、更高效的请求,代价是少量的延迟。

buffer.memory控制Producer可用于缓冲的总内存量。如果record的发送速度比它们可以传送到服务器的速度快,那么这个缓冲区空间将被耗尽。当缓冲区空间耗尽时,其他发送调用将被阻止。阻塞时间的阈值由max.block.ms决定,之后它会引发TimeoutException。

持久性

acks配置用来控制请求被认为完成的标准。我们指定的“all”设置会导致阻塞在record的完全提交上,这是最慢但最持久的设置。

自动重试

如果请求失败,Producer可以自动重试,但由于我们将重试次数指定为0,它不会进行重试。启用重试也可能会导致重复(有关详细信息,请参阅有关消息传递语义的文档)。

序列化

key.serializer和value.serializer指示如何将用户提供的ProducerRecord的键和值对象转换为字节。对于简单的字符串或字节类型,您可以使用内置的org.apache.kafka.common.serialization.ByteArraySerializer或org.apache.kafka.common.serialization.StringSerializer。

幂等特性

从Kafka 0.11开始,KafkaProducer支持另外两种模式:幂等Producer和事务性Producer。幂等Producer将Kafka的交付语义从至少一次加强到刚好一次交付。特别是Producer重试将不再引入重复。事务Producer允许应用程序自动发送消息到多个Partition(和Topic!)。

要启用幂等特性,必须将enable.idempotence配置设置为true。如果设置,重试配置将默认为Integer.MAX_VALUE,并且ack配置将默认为全部。幂等Producer没有API更改,所以现有的应用程序利用此功能将不需要修改。

为了利用幂等Producer,必须避免应用程序级别的重复发送,因为这些不能去重。因此,如果应用程序启用了idempotence,则建议不要配置重试,因为它将默认为Integer.MAX_VALUE。此外,如果发送(ProducerRecord)即使进行无限次重试也会返回错误(例如,如果消息在发送前在缓冲区中到期),则建议关闭Producer并检查最后生成的消息的内容以确保它不重复。最后,Producer只能保证在单个会话中发送的消息的幂等性。

事务性保证

要使用事务性Producer和伴随的API,您必须设置transactional.id配置属性。如果设置了transactional.id,则Producer自动启用idempotence,这是幂等函数所依赖的。此外,事务中包含的Topic应配置为持久性。特别是,replication.factor应该至少为3,并且这些Topic的min.insync.replicas应该设置为2.最后,为了使事务保证从端到端实现,Consumer必须是配置为只读取已提交的消息。

transactional.id的目的是在一个Producer实例的多个会话中启用事务恢复。它通常来自分区的、有状态的、应用程序中的碎片标识符。因此,对于在Partition的应用程序中运行的每个Producer实例而言,它应该是唯一的。

所有新的事务性API都是阻塞的,并会在失败时引发异常。下面的例子说明了如何使用新的API。它与上面的例子类似,只是所有的100条消息都是单个事务的一部分。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

正如在示例中所暗示的那样,每个Producer只能有一个打开的事务。在beginTransaction()和commitTransaction()调用之间发送的所有消息都将成为单个事务的一部分。当指定transactional.id时,Producer发送的所有消息都必须是事务的一部分。

事务Producer使用异常来传达错误状态。特别是,不需要为producer.send()指定回调函数或在返回的Future上调用.get():如果任何producer.send()或事务性调用在执行期间遇到不可恢复的错误时,将抛出KafkaException事务。有关检测事务性发送错误的更多详细信息,请参阅send(ProducerRecord)文档。

通过在收到KafkaException异常时调用producer.abortTransaction(),我们可以确保任何成功的写入被标记为中止,因此继续保持事务性保证。

兼容性

该客户端可以与0.10.0或更新版本的brokers进行通信。较早或较新的brokers可能不支持某些客户端功能。例如,事务性API需要brokers版本0.11.0或更高版本。调用正在运行的brokers版本中不可用的API时,您将收到UnsupportedVersionException。


读后有收获可以支付宝请作者喝咖啡