3 消费者

felix.shao2025-04-19

3 消费者

前述

 与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅的主题种拉取消息。

3.1 消费者与消费者组

消费者与消费者组

 消费者负责订阅 Kafka 中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在 Kafka 的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

 两个消费组之间互不影响,每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费。

 消费者与消费组的模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费组的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。

 以上分配逻辑都是基于默认的分区分配策略进行分析的,可以通过消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略,分区分配策略详见 7.3 节介绍。

 消息中间件一般有两种消息投递模式。

  • 点对点(P2P,Point-to-Point)模式:基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。
  • 发布/订阅(Pub/Sub)模式:定义了如何像一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。

 Kafka 同时支持两种消息投递模式,而这正式得益于消费者与消费组模型的契合。

  1. 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
  2. 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

 消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数 group.id 来配置,默认值为空字符串。
 消费者并非逻辑上的概念,它的实际的应用示例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。

3.2 客户端开发

客户端开发

 在 Kafka 的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是基于 Kafka 开源之初使用 Scala 语言编写的客户端,我们可以称之为旧消费者客户端(Old Consumer)或 Scala 消费者客户端:第二个是基从 Kafka 0.9.x 版本开始推出的使用 Java 编写的客户端,我们可以称之为新消费者客户端(New Consumer)或 Java 消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷。
 我们主要是基于目前流程的新消费者客户端来实现,一个正常的消费逻辑需要具备以下几个步骤。

  1. 配置消费者客户端参数及创建相应的消费者实例。
  2. 订阅主题。
  3. 拉取消息并消费。
  4. 提交消费唯一。
  5. 关闭生产者实例。  具体的代码示例可以看com.kafka.stu.chapter03.KafkaConsumerAnalysis。我们对各个步骤进行相应的分析。

3.2.1 必要的参数配置

必要的参数配置

 创建真正的消费者实例前需要配置相应的参数,其中有 4 个参数是必填的,如下。

  • bootstrap.servers:必填,同生产者客户端参数,默认值为“”。
  • group.id:必填,消费者隶属的消费者名称,默认值为“”。如果设置为空,则会抛出异常。一般而言,这个参数需要设置成具有一定的业务意义的名称。
  • key.serializer 和 value.serializer:必填,同生产者客户端 Producer 中的 key.serializer 和 value.serializer 参数对应,这两个参数无默认值。
  • client.id:选填,设定 KafkaConsumer 对应的客户端 id,默认值为“”。如果客户端不设置,则 KafkaConsumer 会自动生产一个非空字符串,内容形式如“consumer-1”、“consumer-2”,即字符串“consumer-”与数字的拼接。

 KafkaConsumer 中的参数众多,远非实例 initConfig() 方法中的那样只有 5 个。我们可以直接使用客户端中的 org.apache.kafka.clients.consumer.ConsumerConfig 类来避免人为因素而书写错误。

3.2.2 订阅主题与分区

订阅主题与分区
 订阅主题的方式

 集合的订阅方式有 subscribe(Collection)subscribe(Pattern)assign(Collection) 三种方式。他们分别代表了三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN 和 USER_ASSIGNED (如果没有订阅,那么订阅的状态为 NONE)。这三种状态是互斥的,在一个消费者中只能使用其中的一种。
 subscribe 的几个重载方法和 assign() 方法如下:

/**
 * 以集合的形式订阅多个主题
 * @param topics 主题列表
 * @param listener 设置相应的再均衡监听器
 */
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
/**
 * 以正则表达式的形式订阅多个主题
 * @param topics 主题列表
 * @param listener 设置相应的再均衡监听器 3.2.8 节介绍
 */
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

/**
 * 订阅主题的特定分区
 * @param partitions 唯一的参数,用来指定需要订阅的分区集合。
 */
public void assign(Collection<TopicPartition> partitions)
  • subscribe(Collection)。
     对于消费者使用集合的方式 (subscribe(Collection)) 来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准(注意是覆盖逻辑)。
  • subscribe(Pattern)。
     如果消费者采用的是正则表达式的方式 (subscribe(Pattern)) 订阅,在之后的过程中,如果有人创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在 Kafka 和其他系统之间数据复制时,这种正则表达式的方式就显得狠常见。
  • assign(Collection)。
 分区

 先看下 TopicPartition 类的结构。

public final class TopicPartition implements Serializable{
    // 分区编号
    private final int partition;
    // 主题
    private final String topic;
}

 见 KafkaConsumerAnalysis,我们可以用 assign 指定订阅分区编号为 0 的分区。

consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));

 我们需要用 KafkaConsumer 中的 partitionsFor() 方法查询指定主题的元数据信息, partitionsFor() 方法的具体定义如下。

public List<PartitionInfo> partitionsFor(String topic)

 其中 PartitionInfo 类型即为主题的分区元数据信息,此类的主要结构如下:

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
}

 PartitionInfo 类中的属性 topic 表示主题名称,partition 代表分区编号,leader 代表分区的 leader 副本所在的位置,replicas 代表分区的 AR 集合,inSyncReplicas 代表分区的 ISR 集合,offlineReplicas 代表分区的 OSR 集合。
 通过 partitionFor() 方法的协助,我们可以通过 assign() 方法来实现订阅主题(全部分区)的功能,示例代码如下:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if(partitionInfos != null){
    for(PartitionInfo tpInfo : partitionInfos){
        partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
    }
}

 取消订阅的方法为 unsubscribe() 方法,下面示例的三行代码的效果相同。

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());

 如果没有订阅任何主题和分区,那么再继续执行消费程序的时候会抛出异常。

Exception in thread "main" java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

 通过 subscribe() 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的,看接口参数可以看出来这个逻辑。

3.2.3 反序列化

反序列化

 消费者需要用反序列化器把从 Kafka 中收到的字节数组转换成响应的对象。
 反序列化器有 ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于 ByteBuffer、ByteArray、Bytes、Double、Fload、Integer、Long、Short、String 类型的序列化,这些序列化器都实现了 org.apache.kafka.common.serialization.Deserializer 接口,此接口有 3 个方法。

public interface Deserializer<T> extends Closeable {
    // 配置当前类
    default void configure(Map<String, ?> configs, boolean isKey)
    // 用来执行反序列化操作
    T deserialize(String topic, byte[] data)
    // 空方法,如果实现了此方法,则必须确保此方法的幂等性,因为这个方法很可能会被 KafkaProducer 调用多次。
    default void close()
}

 生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。
 自定义的序列化器代码见 com.kafka.stu.chapter02.serial 包下的代码,对应的反序列器示例代码见 com.kafka.stu.chapter03.CompanyDeserializable
 基于 protostuff 自定义化的序列化器示例代码见 com.kafka.stu.chapter03.protostuff 包。

TIP

 注意如无特殊需要,不建议使用自定义的序列化器或反序列化器,因为这样会增加生产者与消费者之间的耦合度,在系统升级换代的时候很容易出错。自定义的类型有一个不得不面对的问题就是 KafkaProducer 和 KafkaConsumer 之间的序列化和反序列化的兼容性。对于 StringSerializer 来说,KafkaConsumer 可以顺其自然地采用 StringDeserializer,不过对于 Company 这种专用类型而言,某个上游应用采用 CompanySerializer 进行序列化之后,下游应用也必须实现对应的 CompanyDeserializer。再者,如果上有的 Company 类型改变,那么下游也需要跟着重新实现一个新的 CompanyDeserializer。后者面临的难题可想而知。
 在实际应用中,在 Kafka 提供的序列化器和反序列化器满足不了应用需求的前提下,推荐使用 Avro、JSON、Thrift、ProtoBuf 或 ProtoStuff 等通用的序列化工具来包装,以求尽可能实现得更加通用且前后兼容。使用通用的序列化工具也需要实现 Serializer 和 Deserializer 接口,因为 Kafka 客户端的序列化和反序列化入口必须是这两个类型。

3.2.4 消息消费

消息消费

 Kafka 中的消息是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务器主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
 Kafka 中的消息消费是一个不断轮询的过程,消费者所要作的就是重复地调用 poll() 方法,而 poll() 方法返回的是所订阅的主题(分区)上的一组消息。而拉模式返回的是所订阅的主题(分区)上的一组消息。
 对于 poll() 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉去的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。poll() 方法的具体定义如下。

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    /**
     * 拉取消费
     * @param timeout 用来控制 poll() 方法的阻塞时间。
     */
    public ConsumerRecords<K, V> poll(final Duration timeout)
}

 timeout 的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将 timeout 设置为 0,这样 poll() 方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从 Kafka 中拉取并消费消息,则可以将这个参数设置为最大值 Long.MAX_VALUE。

 消费者消费到的每条消息的类型为 ConsumerRecord(注意与 ConsumerRecords 的区别),这个和生产者发送的消息类型 ProducerRecord 相对应,不过 ConsumerRecord 中的内容更加丰富,具体的结构参加如下代码。

public class ConsumerRecord<K, V> {

    // 主题名称
    private final String topic;
    // 所在分区的编号
    private final int partition;
    // 消息在所属分区的偏移量
    private final long offset;
    // 时间戳
    private final long timestamp;
    // 时间戳的类型,有消息创建的时间戳和消息追加到日志的时间戳两种类型
    private final TimestampType timestampType;
    // 表示 key 经过序列化之后的大小,如果 key 为空,则 serializedKeySize 值为 -1
    private final int serializedKeySize;
    // 表示 value 经过序列化之后的大小
    private final int serializedValueSize;
    // 消息的头部内容
    private final Headers headers;
    // 消息的键
    private final K key;
    // 消息的值,一般应用读取的内容
    private final V value;
    private final Optional<Integer> leaderEpoch;
    // CRC32 的校验值
    private volatile Long checksum;
}

 我们在消费消息的时候可以直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。
 poll() 方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干 ConsumerRecord,它提供了一个 iterator() 方法来循环遍历消息集内部的消息。
 示例代码见 com.kafka.stu.chapter03.dimconsumer 包下的代码,明细如下。

  • PartitionConsumer,按照分区维度消费。
  • TopicConsumer,按照主题维度消费。

3.2.5 位移提交

位移提交

 对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。一般来说:对于消息在分区中的位置,我们将 offset 称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。这样区分可以甄别出是在讲分区存储层面的内容,还是在讲消费层面的内容。当然,对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的,在某些不需要具体划分的场景下也可以用“消息位置”或直接用“offset”这个单词来表述。

 在每次调用 poll() 方法时,它返回的是还没有被消费过的消息集,要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

 在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题 _consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

 参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,图中也用了 lastConsumedOffset 这个单词来标识它。

consumer_offset.png

 非常需要明确的是,当前消费者需要提交的消费位移并不是 x,而是 x + 1,对应于图中的 position,它表示下一条需要拉取的消息的位置。
 KafkaConsumer 提供了 position(TopicPartition)committed(TopicPartition) 两个方法来分别获取上面所说的 position 和 committed offset 的值。这两个方法的定义如下所示。

public class KafkaConsumer<K, V> implements Consumer<K, V> {

    public long position(TopicPartition partition)

    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions)

}

 消费位移代码的演示见:com.kafka.stu.chapter03.offset.OffsetConsumer,测试时,注意先用生产者生产消息,查看其对应的分区,再在位移测试里面改对应分区,得到如下结果。据此可以验证 消费者到此分许消息的最大偏移量为 16,对应的消费位移 lastConsumedOffset 也就是 16。在消费完之后就执行同步提交,但是最终结果显示所提交的位移 committed offset 为 17,并且下一次所要拉取的消息的起始偏移量 position 也是 17。在本次示例中,position = committed offset = lastConsumedOffset + 1,当然 position 和 committed offset 并不会一直相同,我们后面研究。

consumed offset is 16
commited offset is 17
the offset of the next record is 17

 对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象,两种消费异常逻辑介绍如下。

  • 上一次消费的位移是 x+2,当前一次 poll 操作所拉取的消息集为 [x+2,x+7]。
    • 1 如果拉取到消息之后就进行了位移提交,即提交了 x+8,消费到 x+5 出现了异常,在故障恢复之后,重新拉取的消息是从 x+8 开始的,也就是说 x+5 至 x+7 之间的消息未消费,即发生了消息丢失的现象。
    • 2 位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费 x+5 遇到了异常,在故障恢复之后,重新拉取的消息是从 x+2 开始的,也就是说 x+2 至 x+4 之间的消息又重新消费了一遍,即发生了重复消费的现象。
    • 3 多线程情况下,拉取到消息之后就进行了位移提交,A 线程提交了 x+6,B 线程消费 x+3 出现异常,故障恢复后,从 x+6 消费,即 x+3 至 x+5 之间的消息被丢失了。

 在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable.auto.commit 参数为 true。

 在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

 手动提交上面配置改为 false 接口,其可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()commitAsync() 两种类型的方法。

 同步提交示例 DEMO 如下。

  • 带参数的同步提交示例代码见 com.kafka.stu.chapter03.dimconsumer.SyncCommitConsumer
  • 同步批量提交示例代码见 com.kafka.stu.chapter03.dimconsumer.BufferCommitConsumer。示例代码限制 200 条数据提交,不满足时,停止服务重启又会拉到未提交的数据,即会出现重复消费问题。
  • 按分区的粒度提交位移(提高性能)的示例代码见 com.kafka.stu.chapter03.dimconsumer.PartitionDimCommitConsumer

 commitSync() 在没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等,我们可以将其捕获并做针对性的处理。
 同步提交方法如下。

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    public void commitSync() 

    // 可以更细粒度的、更精准的提交。
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)

    // 支持超时提交
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) 
}

 异步提交示例 DEMO 如下。

  • 异步提交示例代码见 com.kafka.stu.chapter03.dimconsumer.CallbackCommitConsumer

 异步提交在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。
 异步提交可以使消费者的性能得到一定的增强。
 异步提交失败时重试与否都会出现重复消费的问题,不重试时,位移提交失败,会出现重复消费(比如提交失败后,重新启动,又会拉到刚才那条消息);重试时,也有重复消费问题(比如批量消息时,有部分已被消费,但是提交失败,重试则会出现重复消费等)。

 异步提交方法如下。

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    public void commitAsync() 
    public void commitAsync(OffsetCommitCallback callback)
    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
}

3.2.6 控制或关闭消费

控制或关闭消费

 KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    // 暂停某些分区在拉取操作时返回数据给客户端
    public void pause(Collection<TopicPartition> partitions)
    // 恢复某些分区向客户端返回数据的操作
    public void resume(Collection<TopicPartition> partitions)
    // 返回被暂停的分区集合
    public Set<TopicPartition> paused() 
}

 优雅退出循环有 while(isRunning.get()),还有一种 KafkaConsumer 的 wakeup() 方法,wakeup() 方法是 KafkaConsumer 中唯一可以从其他线程里安全调用的方法(KafkaConsumer 是非线程安全的)。调用 wakeup() 方法后可以退出 poll() 的逻辑,并抛出 WakeupException 的异常。我们也不需要处理 WakeupException 的异常,它只是一种跳出循环的方式。跳出循环以后一定要显示地执行关闭动作以释放运行过程中占用的各种系统资源,包括内存资源、Socket 连接等。
 close() 方法略。

3.2.7 指定位移消费

指定位移消费

 在 Kafka 中每当消费者查找不到所记录的消费位移时(如消费者的位移信息过期而被删除等),就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。如果将 auto.offset.reset 参数配置为“earliest”,那么消费者会从起始处,也就是 0 开始消费。
 可以加一个新的消费组,然后看客户端日志信息如 Resetting offset for partition 相关的消费位移日志信息。
 除了查找不到消费位移,位移越界也会触发 auto.offset.reset 参数的执行,见下面的 seek 系列的方法介绍。
auto.offset.reset 参数还有一个可配置的值-“none”,配置为此值就意味着出现查不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出 NoOffsetForPartitionException 异常。
 消息的拉取是根据 poll() 方法来处理的,里面的逻辑对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的起始位置。提供的 auto.offset.reset 参数也只能在找不到消费位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们得以坠前消费或回溯消费。 seek() 方法的具体定义如下。

void seek(TopicPartition partition, long offset)

 seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的。也就是说,在执行 seek() 方法之前需要先执行一次 poll() 方法,等到分配到分区之后才可以重置消费位置。
 seek() 方法的使用示例代码见 com.kafka.stu.chapter03.offset.SeekConsumer。如果消费组内的消费者在启动的时候能够找到消费位移,除非发生位移越界,否则 auto.offset.reset 参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要 seek() 方法了。
 使用 seek() 方法从分区末尾消费的示例代码见 com.kafka.stu.chapter03.offset.SeekLastConsumerendOffsets() 的具体方法定义如下。

Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
// timeout 等待获取的超时时间,若没有指定,那么其由客户端参数 request.timeout.ms 来这时,默认是 30000
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)

 其中 partitions 参数表示分区集合,而 timeout 参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值,那么 endOffsets 方法的等待时间由客户端参数 request.timeout.ms 来设置,默认值是 30000。与 endOffsets 对应的是 beginningOffsets() 方法,一个分区的起始位置起初是 0,但并不代表每时每刻都为 0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加。beginningOffsets() 方法的具体定义如下:

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
// timeout 参数同 endOffsets
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)

 其实 KafkaConsumer 中直接提供了 seekToBeginning() 方法和 seekToEnd() 方法来实现这两个功能,这两个方法的具体定义如下。

void seekToBeginning(Collection<TopicPartition> partitions)
void seekToEnd(Collection<TopicPartition> partitions);

 有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天 8 点之后的消息,这个需求更符合正常的思维逻辑。KafkaConsumer 提供了一个 offsetsForTimes() 方法,通过 timestamp 来查询与此对应的分区位置。

// key 为待查询的分区,value 为待查询的时间戳,该方法会返回时间戳大于等于待查询 时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp 字段。  
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)

 offsetsForTimes 方法的参数 timestampsToSearch 是一个 Map 类型,key 为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffserAndTimeStamp 中的 offset 和 timestamp 字段。
 offsetsForTimes() 和 seek() 方法之间的使用方法见示例 com.kafka.stu.chapter03.offset.TimeOffsetConsumer。
 Kafka 中的消费位移是存储在一个内部主题中的,而 seek() 方法可以突破这一限制:消费位移可以保存在任意的存储介质,例如数据库、文件系统等。seek() 方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。

3.2.8 再均衡

再均衡

 再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。
 前面介绍 subscribe() 方法时提及再均衡监听器 ConsumerRebalanceListener。再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作,其定义如下:

public interface ConsumerRebalanceListener {

    /**
     * 在再均衡开始之前和消费者停止读取消息之后被调用。
     * 可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。
     * @param partitions 再均衡前所分配到的分区
     */
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    /**
     * 在重新分配分区之后和消费者开始读取消费之前被调用。
     * @param partitions 再均衡后所分配到的分区
     */
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

 再均衡监听器示例代码见 com.kafka.stu.chapter03.rebalance.RebalanceConsumer,将消费位移暂存到 cache,这样在正常消费的时候可以通过 commitAsync() 方法来异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的 onPartitionsRevoked() 回调执行 commitSync() 方法同步提交消费位移,以尽量避免一些不必要的重复消费。
 再均衡监听器还可以配合外部存储使用。我们可以将消费位移保存在数据库中,这里可以通过再均衡监听器查找分配到的分区的消费位移,并且配合 seek() 方法来进一步优化代码逻辑。
 这里只是简单的介绍了再均衡监听器的用法,再均衡期间消费者客户端与 Kafka 服务端之间的交互逻辑及相关原理并不简单,会再后面介绍更多细节。

3.2.9 消费者拦截器

消费者拦截器

 和生产者一样,消费者也有拦截器。消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。
 消费者拦截器需要自定义实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口。ConsumerInterceptor 接口包含 3 个方法:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    /**
     * KafkaConsumer 会在 poll() 方法返回之前调用此方法来对消息进行相应的定制化操作,比如修改返回的消息内容、
     * 按照某种规则过滤消息(可能会减少 poll() 方法返回的消息的个数)。
     * 如果在本方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。
     * @param record 
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    /**
     * KafkaConsumer 会在提交完消费位移之后调用此方法,可以使用该方法来记录跟踪所提交的位移信息,
     * 比如当消费者使用 commitSync 的无参方法时,我们不知道提交的消费位移的具体细节,而使用该方法却可以做到这一点。
     * @param record 
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * 和生产者一致
     * @param record 
     */
    public void close();
}

 在某些业务场景中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效,它也就不需要再被继续处理了。
 使用消费者拦截器来实现一个 TTL 的功能示例代码见 com.kafka.stu.chapter03.inter.ConsumerInterceptorTTL。使用这种功能时需注意,带参数的位移提交方式,即 public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) ,有可能提交了错误的位移信息。在一次消息拉取的批次中,可能含有最大偏移量的消息会被消费者拦截器过滤。
 消费者拦截器链同生产者拦截器链一致。

3.2.10 多线程实现

多线程实现

 KafkaProducer 是线程安全的,然而 KafkaConsumer 却是非线程安全的。KafkaConsumer 中定义了一个 acquire() 方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常。
 KafkaConsumer 中的每个公用方法在执行所要执行的动作之前都会调用这个 acquire() 方法,只有 wakeup() 方法是个例外。

public class KafkaConsumer<K, V> implements Consumer<K, V> {
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); 

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        refcount.incrementAndGet();
    }
}

 acquire() 方法和我们通常所说的锁(synchronized、Lock 等)不同,它不会造成阻塞等待,我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire() 方法和 release() 方法成对出现,表示相应的加锁和解锁操作。release() 方法也很简单,具体定义如下。

private void release() {
    if (refcount.decrementAndGet() == 0)
        currentThread.set(NO_CURRENT_THREAD);
}

 KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于 Kafka 中消息保留机制的使用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,第一种也是最常见的方式;线程封闭即为每个线程实例化一个 KafkaConsumer 对象。
 一个线程对应一个 KafkaConsumer 实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数。当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。
示例代码见 com.kafka.stu.chapter03.pool.FirstMultiConsumerThreadDemo,它的优点是每个线程可以按顺序消费各个分区中的消息。缺点也很明显,每个消费线程都要维护一个独立的 TCP 连接,如果分区数和 consumerThreadNum 的值都很大,那么会造成不小的系统开销。
 一般而言 poll() 拉取消息的速度是相当快的,而整体消费的瓶颈也正是在处理消息这一块,对应示例代码的处理消息模块,如果我们通过一定的方式来改进这一部分,那么我们就能带动整体消费性能的提升。因此,我们有第三种方式,将处理消息模块改成多线程的实现方式。可以基于 KafkaConsumerThread 里面通过线程池的方式来处理一批批的消息。注意可以将 ThreadPoolExcecutor 里的最后一个参数设置的是 CallerRunsPolicy(),这样可以防止线程池的总体消费能力跟不上 poll() 拉取的能力,从而导致异常现象的发生。这种方式还可以横向扩展,通过开启多个 KafkaConsumerThread 实例来进一步提升整体的消费能力。
 第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek() 等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用的极少,一般不推荐。一般而言,分区是消费线程的最小划分单位。
 第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少 TCP 连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。示例代码略,要点是使用多线程进行消息处理,需要考虑顺序处理,后面补下代码。
 以下几点就不展开讨论了,可以注意下:

  • 第三种方式的消息顺序处理。
  • 多线程时的位移提交,需要共享变量。
  • 批量消费时的消息丢失,比如一个处理线程 RecordHandle1 正在处理 offset 为 0 ~ 99 的消息,而另一个处理线程 RecordHandle2 已经处理完了 offset 为 100 ~ 199 的消息并进行了位移提交,此时如果 RecordHandle1 发生异常,则之后的消费只能从 200 开始而无法再次消费 0 ~ 99 的消息,从而造成了消息丢失的现象。一种解决思路是滑动窗口实现的,具体不展开讨论了。

3.2.11 重要的消费者参数

 挑一些重要的参数进行讲解。

1. fetch.min.bytes

 该参数用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量,默认值为 1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。

2. fetch.max.bytes

 该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从 Kakfa 中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么还是可以正常消费。与此相关的吗,Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes (对应于主题端参数 max.message.bytes)来设置。

3. fetch.max.wait.ms

 这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。该参数用于指定 Kafka 的等待时间,默认值为 500(ms)。如果 Kafka 中没有足够多的消息而满足不了 fetch.min.bytes 参数的要求,那么最终会等待 500 ms。这个参数的设定和 Consumer 与 Kafka 之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

4. max.partition.fetch.bytes

 用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为 1048576(B),即 1MB。这个参数与 fetch.max.bytes 参数类似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。

5. max.poll.records

 用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为 500 (条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。

6. connections.max.idle.ms

 用来指定在多久之后关闭闲置的连接,默认值是 540000 (ms),即 9 分钟。

7. exclude.internal.topics

 Kafka 中有两个内部的主题:_consumer_offsets 和 _transaction_state。exclude.internal.topics 用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection) 的方式而不能使用 subsrcibe(Pattern) 的方式来订阅内部主题,设置为 false 则没有这个限制。

8. receive.buffer.bytes

 用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为 65536(B),即 64 KB。如果设置为 -1,则使用操作系统的默认值。如果 Consumer 与 Kafka 处于不同的机房,则可以适当调大这个参数值。

9. sender.buffer.bytes

 用来设置 Socket 发送消息缓冲区(SO_SNDBUG)的大小,默认值为 131072(B),即 128 KB,与 receive.buffer.bytes 参数一样,如果设置为 -1,则使用操作系统的默认值。

10. request.timeout.ms

 用来配置 Consumer 等待请求响应的最长时间,默认值为 30000 (ms)。

11. metadata.max.age.ms

 用来配置元数据的过期时间,默认值为 300000(ms),即 5 分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入。

12. reconnect.backoff.ms

 用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为 50 (ms)。这种机制适用于消费者向 broker 发送的所有请求。

13. retry.backoff.ms

 用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为 100 (ms)。

14. isolation.level

 用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,及只能消费到 LEO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到 HW 处的位置。

其他参数

 还有些参数不一一展开介绍了:

参数默认值作用
bootstrap.servers""连接 Kafka 集群所需的 broker 地址清单。
key.deserializer消息中 key 所对应的反序列化类。
value.deserializer消息中 key 所对应的反序列化类。
group.id消费者所隶属的消费组的唯一标识,即消费组的名称。
client.id消费组客户端的 id。
heartbeat.interval.ms3000当使用 Kafka 的分组管理功能时,心跳到消费者协调器之间的预计时间。
session.timeout.ms10000组管理协议中用来检测消费者是否失效的超时时间。
max.poll.interval.ms300000当通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔还没有发起 poll 操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作。
auto.offset.resetlatest参数值为字符串类型,有效值为“earliest”、“latest”、“none”,配置为其余值会抛出异常。
enable.auto.committrueboolean 类型,配置是否开启自带提交消费位移的功能,默认开启。
auto.commit.interval.ms5000enable.auto.commit 参数设置为 true 时才有效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔。
partition.assignment.strategyorg.apache.kafka.clients.consumer.RangeAssignor消费者的分区分配策略
interceptor.class""用来配置消费者客户端的拦截器
Last Updated 4/20/2025, 9:23:27 PM