第 1 章 初识 Kafka

felix.shao2025-04-14

第 1 章 初识 Kafka

前述

 本部分的相关内容引用了书籍 《深入理解 Kafka 核心设计与实践原理》 的部分内容,如需完整内容需阅读原书籍。
 Kafka “扮演”的三大角色如下:

  • 消息系统:Kafka 和传统的消息系统都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
  • 存储系统:Kafka 把消息持久化到硬盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
  • 流式处理平台:Kakfa 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

1.1 基本概念

Kafka 体系架构

 一个典型的 Kafka 体系架构包含若干 Producer、若干 Broker、若干 Consumer,以及一个 ZooKeeper 集群,如下图所示。

kafka_architecture.png
  • Producer:生产者。生产者负责创建消息,然后将其投递到 Kafka 中。
  • Consumer:消费者。消费者连接到 Kafka 上并接收消息。
  • Broker:服务代理节点。可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。
主题和分区

 另外还有两个特别重要的概念--主题(topic)与分区(partition)。Kakfa 中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个追加的日志文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。

 消息被顺序追加到每个分区日志文件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。

 每一条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定的合理,所有的消息都可以均匀地分配到不同的分区中。分区可以解决单机性能瓶颈的问题,分区可以在创建主题的时候通过指定的参数来设置,也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

多副本、leader、follower、AR、ISR、OSR、HW、LEO

 Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。

 Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者沓机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。

 分区中的副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步(可配置)的副本(包括 leader 副本在内)组成 ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。与 leader 副本同步滞后过多的副本(不包括 leader 副本在内)组成 OSR(Out-of-Sync Replicas),由此可见,AR = ISR + OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定的程度的同步,即 AR = ISR,OSR 集合为空。

 leader 副本负责维护与跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 foller 副本“追上”了 leader 副本,那么 leader 副本会把它从 OSR 集合中转移至 ISR 集合。默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为 leader,而在 OSR 集合中的副本则没有任何机会(也可以通过修改相应的参数配置来改变)。

 ISR 与 HW 和 LEO 也有紧密的关系。 HW 是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset。如下图,很多资料误将图中的 offset 为 5 的位置看作 HW,而把 offset 为 8 的位置看作 LEO,这显然是不对的。
partition_offset_name_legend.png

Kafka 的复制机制

 Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。而在异步复制方式下,follower 副本异步地从 leader 副本中复制数据,数据只要被 leader 副本写入就被认为已经成功提交。在这种情况下,如果 follower 副本都还没有复制完而落后于 leader 副本,突然 leader 副本沓机,则会造成数据丢失。Kafka 使用的这种 ISR 的方式则有效地权衡了数据可靠性和性能之间的关系。

1.2 安装和配置

资源
名称文件名
zookeeperzookeeper-3.4.6.tar.gz
kafkakafka_2.12-2.1.0.tgz

 注意 zookeeper 对 java 版本有依赖:3.5.5 要求 jdk8 版本不低于 211,3.4.6 jdk1.7 支持。

主机名配置

 $ cat /etc/hostname

# server1
$ hostnamectl set-hostname server1
# server2
$ hostnamectl set-hostname server2
# server3
$ hostnamectl set-hostname server3

$ vi /etc/hosts
192.168.37.100  server1
192.168.37.101  server2
192.168.37.102  server3

 关闭防火墙、SELinux。

  1. JDK 安装。见 JDK 安装

  2. Zookeeper 集群搭建。见 Zookeeper 集群搭建

  • 注意配置的 /etc/hostname 要一致。

3.Kafka 的安装和配置。

解压、配置 Kafka
  1. 创建目录并解压软件
$ mkdir -p /opt/kafka/kafkalogs 
$ tar -zxvf /mnt/hgfs/vmshare/kafka/kafka_2.12-2.1.0.tgz -C /opt/kafka
$ cd /opt/kafka/kafka_2.12-2.1.0
  1. 配置环境变量
$ vi /etc/profile
export KAFKA_HOME=/opt/kafka/kafka_2.12-2.1.0
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
  1. 修改配置文件 $ cd /opt/kafka/kafka_2.12-2.1.0/config/ && vi server.properties
# broker.id broker 的编号,每个 broker 服务器的编号需要唯一。
# listeners broker 对外提供的服务入口地址,其中 ip 配置为 0.0.0.0 时允许外部访问.
# advertised.host.name 注意别配置localhost,否则客户端解析 localhost会找不到。
# advertised.listener 一般不用配置,主要用于 IAAS 环境,即公有云环境。
#server1
broker.id = 1 
listeners=PLAINTEXT://server1:9092
advertised.host.name=server1
#server2
broker.id = 2
listeners=PLAINTEXT://server2:9092
advertised.host.name=server2
#server3
broker.id = 3
listeners=PLAINTEXT://server3:9092
advertised.host.name=server3

log.dirs=/opt/kafka/kafkalogs
zookeeper.connect=server1:2181,server2:2181,server3:2181

#在 log.retention.hours=168 下面新增下面三项
# message.max.byte 指定 broker 所能接收消息的最大值,默认值是 1000012(B),约等于976.6KB。
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
  1. 启动 Kafka 集群。
     从后台启动 Kafka 集群(3 台都需要启动)。
$ kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

 检查服务是否启动,稍等一会。
 $ jps

20348 Jps
4233 QuorumPeerMain
18991 Kafka

1.3 生产与消费

生产与消费脚本测试
  1. 创建 Topic 来验证是否创建成功。
#--replication-factor 2   复制两份
#--partitions 1   创建 1 个分区
#--topic helloworld 主题为 helloworld
$ kafka-topics.sh --create --zookeeper server1:2181 --replication-factor 2 --partitions 1 --topic helloworld
  1. 在一台服务器上创建一个发布者。
     创建一个 broker 发布者。
    $ kafka-console-producer.sh --broker-list server2:9092 --topic helloworld

  2. 在一台服务器上创建一个订阅者。
    $ kafka-console-consumer.sh --bootstrap-server server3:9092 --from-beginning --topic helloworld

生产与消费 Java 测试

 可以参考 ConsumerFastStart、ProducerFastStart。

1.4 服务端参数配置

 即 $KAFKA_HOME/config/server.properties 文件中的配置。

  • zookeeper.connect。指明要连接的 ZooKeeper 集群的服务地址,包括端口号,没有默认值,必填,多个用英文逗号隔开。可以指明 chroot 路径,如 localhost:2181/kafka,如果不指定 chroot,那么默认使用 ZooKeeper 的根路径。
  • listeners。该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的入口地址列表。配置格式为 protocal1://hostname1:port1,...。
  • broker.id。该参数用来指定 Kafka 集群中 broker 的唯一标识,默认值为 -1。
  • logs.dir 和 logs.dirs。用来配置 Kafka 日志存放的根目录。其中,都可以配置多个根目录,且 logs.dir 优先级比 log.dir,如果未配置 logs.dir,则会以 log.dir 配置为准。
  • message.max.bytes。用来指定 broker 所能接收消息的最大值。默认值为 100000012 (B),约等于 976.6 KB。如果 Producer 发送的消息大于这个参数的值,那么就会报出 RecordTooLargeException 的异常。如果需要修改此参数,那么还需要考虑 max.request.size(客户端参数)、max.message.size(topic 参数)等参数的影响。为了避免修改此参数而引起级联的影响,建议在修改此参数之前考虑分拆消息的可能性。
  • 其他参数。

附录一、参考文献

附录二、Kafka 常用命令

Kafka 常用命令
# 停止 kafka
$ kafka-server-stop.sh -daemon $KAFKA_HOME/config/server.properties

# 查看 topic
$ kafka-topics.sh --list --zookeeper localhost:2181

# 查看 topic 状态
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic helloworld

# 删除 topic(临时用,实际上删除topic要做的工作很多)
$ kafka-topics.sh --delete --zookeeper localhost:2181 --topic helloworld

# 查看 brokers 信息
$ zookeeper-shell.sh server1:2181,server2:2181,server3:2181
# zookeeper-shell.sh server2:2181 --topic helloworld --from-beginning
$ ls /brokers/ids

# 以下是 zk 相关的命令,使用客户端进入 zk
$ zkCli.sh -server 127.0.0.1:2181  #默认是不用加’-server‘参数的因为我们修改了他的端口
$ ls / # 查看目录情况 执行“ls /”
$ get /brokers/ids/1 # 标注一个重要的
$ get /brokers/topics/helloworld/partitions/0 # 还有一个是查看partion
$ quit # 退出
$ tail -50f $KAFKA_HOME/logs/kafkaServer.out
$ zkCli.sh -server server2:2181
$ ls /brokers/topics
$ rmr /brokers/topics/helloworld
$ ls /admin/delete_topics
$ rmr /admin/delete_topics/helloworld
Last Updated 4/15/2025, 9:45:58 PM