2 生产者
2 生产者
前述
从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序。
2.1 客户端开发
客户端开发
一个正常的生产逻辑需要具备以下几个步骤:
- 配置生产者客户端参数及创建相应的生产者实例。
- 构建待发送的消息。
- 发送消息。
- 关闭生产者实例。
具体的代码示例可以看com.kafka.stu.chapter02.KafkaProducerAnalysis
。其中 ProducerRecord 类的定义如下:
public class ProducerRecord<K, V> {
private final String topic; // 主题
private final Integer partition; // 分区号
private final Headers headers; // 消息头部
private final K key; // 键
private final V value; // 值
private final Long timestamp; // 消息的时间戳
// 省略其他成员方法和构造方法
}
- key 是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。topic 可以进行归类,而 key 可以让消息再进行二次归类,同一个 key 的消息会被划分到同一个分区中。有 key 的消息还可以支持日志压缩的功能,详见后续小节。
- value 是指消息体,一般不为空,如果为空则表示特定的消息--墓碑消息,详见后续小节。
- timestamp 是指消息的时间戳,它有 CreateTime 和 LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间,详见后续小节。
2.1.1 必要的参数配置
必要的参数配置
创建真正的生产者实例前需要配置相应的参数,如下:
- bootstrap.servers:必填,指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,可以配置部分 broker 节点的地址,因为客户端可以自己发现其他 broker 节点的地址,不过还是建议配置两个以上的地址,防止一个沓机。
- key.serializer 和 value.serializer:必填,broker 端接收的消息必须以字节数组(byte[])的形式存在。指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。
- client.id:选填,设定 KafkaProducer 对应的客户端 id,默认值为“”。如果客户端不设置,则 KafkaProducer 会自动生产一个非空字符串,内容形式如“producer-1”、“producer-2”,即字符串“producer-”与数字的拼接。
KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。
2.1.2 消息的发送
消息的发送
消息的发送
在创建完生产者实例以后,接下来的工作就是构建消息,即创建 ProducerRecord 对象。
public class ProducerRecord<K, V> {
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
}
调用 ProducerRecord 方法,其部分构造方法如上,针对不同的消息,需要不同的 ProducerRecord 消息,在实际应用中创建 ProducerRecord 对象是一个非常频繁的动作。
创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。
- 发后即忘:它只管往 Kafka 中发送消息而并不关心消息是否正确到达。这种方式的性能最高,可靠性也最差。
KafkaProducerAnalysis
即为这种方式。 - 同步:可靠性高,要么消息被发送成功,要么发生异常。同步方式性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。
- 异步。
同步
KafkaProducer 的 send() 方法并非是 void 类型,而是 Future<RecordMetadata>
类型,send() 方法有 2 个重载方法,具体定义如下:
// future 结果即 send 方法本身就是异步的
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
要实现同步的发送方式,可以利用返回的 Future 对象实现,示例如下:
try{
// send() 方法本身就是异步的,调用 get() 方法来阻塞等待 Kafka 的响应。
producer.send(record).get();
}
还可以如下这种实现同步:
// com.kafka.stu.chapter02.KafkaProducerRecordTest
// 可以获取 RecordMetadata,里面包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量、时间戳等。如果不需要直接使用 get() 方式更省事。
RecordMetadata metadata;
metadata = future.get();
System.out.println("send success : "+ metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
KafkaProducer 中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。重试之后可以重新恢复。不可重试的异常,比如 RecordTooLargeException 异常,暗示了所发送的消息太大,KafkaProducer 对此不会进行任何重试,直接抛出异常。
对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries 参数的默认值为 0,配置方式参考如下。
// 配置了 10 次重试。如果重试了 10 次之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。
props.put(ProducerConfig.RETRIES_CONFIG, 10)
TIP
retries 参数的默认值为 0,即不会对此错误进行重试处理,若不对此进行异常捕获和处理,则会丢失消息。
异步
一般是在 send() 方法里指定一个 CallBack 的回调函数,Kafka 在返回响应时调用该函数来实现异步的发送确认,示例代码如下。
// com.kafka.stu.chapter02.KafkaProducerRecordCallbackTest
producer.send(record, new Callback() {
// metadata 和 e 是互斥的
// 消息发送成功时,metadata 不为 null 而 exception 为 null
// 消息发送异常时,metadata 为 null 而 exception 不为 null
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null){
e.printStackTrace();
} else {
System.out.println("send success : "+ metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
对于同一个分区而言,如果消息 record1 于 record2 之前先发送,那么 KafkaProducer 就可以保证对应的 callback1 在 callback2 之前调用,也就是说,回调函数的调用也可以保证分区有序。
批量发送消息见 com.kafka.stu.chapter02.KafkaBatchProducerRecordTest
。
close() 方法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。与此同时,KafkaProducer 还提供了一个带超时时间的 close() 方法,具体定义如下:public void close(Duration timeout)
。
2.1.3 序列化
序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器把从 Kafka 中收到的字节数组转换成响应的对象。
序列化器有 org.apache.kafka.common.serialization.StringSerializer
,除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,此接口有 3 个方法:
// 配置当前类,如 StringSerializer 主要用来确认编码类型。
default void configure(Map<String, ?> configs, boolean isKey)
// 用来执行序列化操作
default byte[] serialize(String topic, Headers headers, T data)
// 一般情况下是空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被 KafkaProducer 调用多次。
default void close()
生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。
自定义的序列化器代码见 com.kafka.stu.chapter02.serial
包下的代码。对应的反序列化器见 3.2.3 节
。
2.1.4 分区器
分区器
消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必须的,而序列化器是必须的。消息经过序列化器之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号;如果没有指定,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它实现了 org.apache.kafka.clients.producer.Partitioner
接口,这个接口中定义了 2 个方法,具体如下。
public interface Partitioner extends Configurable, Closeable {
/**
* 计算分区号
* @param topic 主题
* @param key 键
* @param keyBytes 序列化的键
* @param value 值
* @param valueBytes 序列化的值
* @param cluster 集群的元数据信息
*/
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster)
// 关闭分区器的时候回收一些资源
public void close()
}
Partitioner 接口还有一个父接口 org.apache.kafka.common.Configurable,这个接口中只有一个方法。
public interface Configurable {
/**
* 主要用来获取配置信息及初始化数据
* Configure this class with the given key-value pairs
*/
void configure(Map<String, ?> configs);
}
在默认分区器 DefaultPartitioner 的实现中,close() 是空方法,而在 partition() 方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入同一个分区。如果 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区。
在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证 key 与分区之间的映射关系了。
还可以自定义分区器,我们可以通过自定义的分区器 DemoPartitioner 来打破这一限制,具体的实现可以查看代码org.apache.kafka.clients.producer.internals.DefaultPartitioner
。定义号默认分区后,可以通过参数 partitioner.class 来显式指定这个分区器。
2.1.5 生产者拦截器
生产者拦截器
Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类的工作。
生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer.ProducerInterceptor
接口。ProducerInterceptor 接口中包含 3 个方法。
public interface ProducerInterceptor<K, V> extends Configurable {
/**
* 在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。
* 一般来说最耗不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息,如果要修改,
* 则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改 key 不仅会影响分区的计算,
* 同样会影响 broker 端日志压缩(Log Compaction)的功能。
* @param record
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
/**
* Kafka 会在消息被应答(Acknowledgement)之前或消息发送失败时调用该方法,优先于用户设定的 Callback 之前执行。
* 该方法运行在 Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
* @param metadata
* @param exception
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception)
/**
* close() 方法组要用于在关闭拦截器时执行一些资源的清理工作。
*/
public void close()
}
生产者拦截器使用可以查看代码 com.kafka.stu.chapter02.inter.ProducerInterceptorPrefix
,演示了为每条消息添加一个前缀 “prefix1-”,并且通过 onAcknowledgement() 方法来计算发送消息的成功率。
KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置时,多个拦截器之间用逗号隔开)。拦截链可以查看代码 com.kafka.stu.chapter02.inter.ProducerInterceptorPrefixPlus
。
如果拦截链的某个拦截器的执行需要依赖于前一个拦截器的输出,那么就有可能产生“副作用”。设想一下,如果前一个拦截器由于异常而执行失败,那么这个拦截器也就跟着无法继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行(DEBUG 验证,拦截器链某一个拦截器的 onSend 抛出异常,也不会影响下一个拦截器的 onSend 逻辑执行)。
2.2 原理分析
2.2.1 整体架构
生产者客户端的整体架构
生产者客户端的整体架构如下图所示。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即 32 MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000,即 60 秒。
主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque<ProducerBatch>
。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord(这样可以使字节的使用更加紧凑)。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在 RecordAccumulator 的内部还有一个 BufferPool,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size 参数来指定,默认值为 16382B,即 16KB。我们可以适当地调大 batch.size 参数以便多缓存一些消息。
ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。
Sender 从 RecordAccumulator 中获取到缓存的消息之后,会进一步将原本<分区, Deque<ProducerBatch>>
的保存形式转变成<Node, List<ProducerBatch>>
的形式,其中 Node 表示 Kafka 集群的 broker 节点。对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络 I/O 层面的转换。
在转换成<Node, List<ProducerBatch>>
的形式之后,Sender 还会进一步封装成<Node, Request>
的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。
请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为Map<NodeId, Deque<Request>>
,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection
,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较 Deque<Request>
的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
2.2.2 元数据的更新
元数据的更新
InFlightRequests 还可以获得 leastLoadedNode ,即所有 Node 中负载最小的那一个。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
如下,创建了一条消息 ProducerRecord。
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "java Analysis hello");
我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者指定)目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才闹将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息。
bootstrap.servers 参数只需要配置部分 broker 节点的地址,因为客户端可以自己发现其他 broker 节点的地址,这一过程也属于元数据的更新操作。与此同时,分区数量及 leader 副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。
元数据是指 Kafka 集群的元数据,它们记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上,哪些副本在 AR、ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过 metadata.max.age.ms
时间没有更新元数据都会引起元数据的更新操作。客户端参数 metadata.max.age.ms
的默认值为 300000,即 5 分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode,然后向这个 Node 发送 MetadataRequest 请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的,在创建完 MetadataRequest 之后同样会存入 InFlightRequests,之后的步骤就和发送消息时类似。元数据虽然由 Sender 线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过 synchronized 和 final 关键字来保障。
2.3 重要的生产者参数
挑一些重要的参数进行讲解。
1. acks
这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks 参数有 3 种类型的值(都是字符串类型)。
- acks = 1。默认值即为 1。生产者发送消息之后,只要分区的 leader 副本成功的写入消息,生产端就会收到来自服务端的成功响应,说明发送成功。如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,生产者就会收到一个错误的响应,为了避免消息丢失,生产者就会选择重发消息;如果消息写入 leader 副本并成功响应给生产者,并且在其他 follower 副本拉取之前 leader 副本崩溃,此时消息还会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks 设置为 1,是消息可靠性和吞吐量之间的折中方案。
- acks = 0。生产者发送消息之后,不需要等待任何服务端的响应。如果在消息从发送到写入 时,Kafka 的过程中出现异常,导致 时,Kafka 并没有收到消息,此时生产者是不知道的,消息也就丢失了。acks 设置为 0 时,Kafka 可以达到最大的吞吐量。
- acks = -1 或 acks = all。生产者在消息发送之后,需要等待 ISR 中所有的副本都成功写入消息此案能够收到服务端的成功响应。acks 设置为 -1,可以达到相对最强的可靠性。但这不一定是最可靠的,因为 ISR 中可能就只有 leader 副本,这样就退化成了 acks = 1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动。
注意 acks 参数配置的值是一个字符串类型,而不是整数类型,否则会报异常。
2. max.request.size
生产者客户端能发送消息的最大值,默认值为 1048576B,1MB。不建议盲目修改,这个参数涉及其他的一些参数的联动,比如 broker 端的 message.max.bytes
参数,如果 broker 的 message.max.bytes
参数设置为 10,而 max.request.size
设置为 20,当发送一条大小为 15B 的消息时,生产者参数就会报错。
3. retries 和 retry.backoff.ms
生产者重试次数,默认值为 0。消息在从生产者从发出到成功写入 broker 之前可能发生一些临时性异常,比如网络抖动、leader 副本选举等,这些异常往往是可以自行恢复的,生产者可以配置 retries 的值,通过生产端的内部重试来恢复而不是一味的将异常抛给生产者;如果重试达到设定次数,生产者才会放弃重试并抛出异常。但是!并不是所有的异常都可以通过重试来解决,比如消息过大,超过 max.request.size
参数配置的数值。
重试还和参数 retry.backoff.ms
有关,默认值为 100,用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms
之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。
重试时可能会出现错序的现象,一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection
配置为 1,而不是把 acks 配置为 0,不过这样也会影响整体的吞吐。max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
4. compression.type
这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”,“snappy”和“lz4”。对消息进行压缩可以极大的减少网络传输量、降低网络 I/O、从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。
5. connection.max.idle.ms
这个参数用来指定多久之后关闭限制的连接,默认值 540000(ms),即 9 分钟。
6. linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待更多的消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为 0。ProducerBatch 在被填满或者等待时间超过 linger.ms
值时发送出去。增大这个参数的值会增加消息的延迟(消费端接收延迟),但能够提升一定的吞吐量。
7. receive.buffer.bytes
这个参数用来设置 socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 32768(B),即 32KB。如果设置为 -1,则使用操作系统的默认值。如果 Producer 和 Kafka 处于不同的机房,则可以适当的调大这个参数值。
8. send.buffer.bytes
这个参数用来设置 socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为 131072(B),即 128KB。与 receive.buffer.bytes
参数一样,如果设置为 -1,则使用操作系统的默认值。
9. request.timeout.ms
这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 30000(ms)。请求超时之后可以选择进行重试。这个参数需要比 broker 端参数 replica.lag.time.max.ms
值要大,这样可以减少因客户端重试引起的消息重复的概率。
远不止这些参数,有些参数会在后续的小节予以介绍,或查阅 官方文档配置 Producer Configs
了解详细的参数等。