Kafka入门及架构介绍

官网:http://kafka.apache.org/documentation/

Kafka 是使用 Scala 编写具有高水平扩展和高吞吐量的分布式消息系统。

Kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer ,消息接受者成为 Consumer ,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。

无论是 Kafka集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性,为集群保存一些 meta 信息。

1 使用场景

1.1 消息传递

消息传递就是发送数据,作为TCP HTTP或者RPC的替代方案,可以实现异步,解耦,削峰(RabbitMQ、RocketMQ能做的事情,它也能做)。因为kafka的吞吐量更高,在大规模消息系统中更有优势。第二个是大数据领域的使用,比如网站行为分析。

具体可以做网站活动跟踪,日志集合,应用指标监控等。

1.2 数据集成+流计算

数据集成指的是把kafka的数据导入Hadoop,Hbase等离线数据仓库,实现数据分析。

​ 第三块是流计算。什么是流(Stream)?它不是静态的数据,而是没有边界的,源源不断地产生的数据,就像流水一样。流计算指的就是Stream对做实时的计算。

​ Kafka在0.10版本后,内置了流处理框架API——kafka Streams。

​ 所以,它跟RabbitMQ的定位差别还是比较大的,不仅仅是一个简单的消息中间件,而且是一个流处理平台。在kafka中,消息被称为日志。日志就是消息的数据文件。

2 安装与命令

2.1 CentOS单机安装

  1. 下载解压kafka
1
2
3
4
cd /usr/local/soft
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -xzvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
  1. 启动zookeeper

kafka需要依赖ZK,安装包中已经自带了一个ZK,也可以改成指定已运行的ZK。
如果改成指定的ZK需要修改修改 kafka 安装目录下的 config/server.properties 文件中的 zookeeper.connect 。这里使用自带的ZK。

1
2
3
4
# 后台启动zk
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &
# 检查zk是否启动成功
ps -ef | grep zookeeper
  1. 启动kafka
1
2
# 修改相关配置
vim config/server.properties
1
2
3
4
5
6
7
8
9
# Broker ID启动以后就不能改了
broker.id=1
# 取消注释,改成本机IP:
listeners=PLAINTEXT://192.168.44.160:9092
# num.partitions后面增加2行。
# 发送到不存在topic自动创建。允许永久删除topic。
num.partitions=1
auto.create.topics.enable=true
delete.topic.enable=true
1
2
# 后台启动kafka(kafka安装目录下)
nohup ./bin/kafka-server-start.sh ./config/server.properties &
  1. 创建Topic
1
2
3
4
5
# 创建一个名为gptest的topic,只有一个副本,一个分区
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 查看已经创建的 topic
sh bin/kafka-topics.sh -list -zookeeper localhost:2181
  1. 启动Producer
1
2
# 新窗口
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  1. 启动Consumer
1
2
# 新窗口
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  1. 删除kafka全部数据

(1)停止每台机器上的kafka;
(2)删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录;
(3)删除zookeeper上与kafka相关的znode节点;除了/zookeeper
(4)重启kafka。

2.2 单机集群安装-伪集群

  1. 下载解压
1
2
3
4
cd /usr/local/soft
wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -xzvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
  1. 修改配置文件
1
2
3
4
5
# 复制3个配置文件
cd config
cp server.properties server1.properties
cp server.properties server2.properties
cp server.properties server3.properties

修改配置文件中的broker.id分别为1、2、3
listeners这一行取消注释,端口号分别为9093、9094、9095
log.dirs分别设置为kafka-logs1、kafka-logs2、kafka-logs3(先创建)

1
mkdir -p /tmp/kafka-logs1 /tmp/kafka-logs2 /tmp/kafka-logs3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# server1.properties 的配置:
broker.id=1
listeners=PLAINTEXT://192.168.44.161:9093
log.dirs=/tmp/kafka-logs1

# server2.properties 的配置:
broker.id=2
listeners=PLAINTEXT://192.168.44.161:9094
log.dirs=/tmp/kafka-logs2

# server3.properties 的配置:
broker.id=3
listeners=PLAINTEXT://192.168.44.161:9095
log.dirs=/tmp/kafka-logs3
  1. 启动三个服务
1
2
3
4
5
6
# 先启动zk,再启动kafka
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
./kafka-server-start.sh -daemon ../config/server3.properties

# 如果遇到zk node exists的问题,先把brokers节点删掉(临时解决方案)。
  1. 创建Topic
1
2
3
4
# 创建一个名为gptest的topic,只有一个副本,一个分区
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看已经创建的 topic
sh kafka-topics.sh -list -zookeeper localhost:2181
  1. 启动Consumer
1
2
# 新窗口
sh kafka-console-consumer.sh --bootstrap-server 192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095 --topic test --from-beginning
  1. 启动Producer
1
2
# 新窗口
sh kafka-console-producer.sh --broker-list 192.168.44.161:9093,192.168.44.161:9094,192.168.44.161:9095 --topic test

2.3 kafka脚本介绍

脚本 作用
kafka-server-start.sh
kafka-server-stop.sh
kafka启动停止
kafka-topics.sh 查看创建删除topic
kafka-console-consumer.sh 消费者操作,例如监听topic
kafka-consumer-groups.sh 消费者组操作
kafka-console-producer.sh 生产者操作,例如发送消息
zookeeper-server-start.sh zk操作,启动停止连接zk
kafka-reassign-partitions.sh 分区重新分配
kafka-consumer-perf-test.sh 性能测试

2.4 kafka界面管理工具

Kafka没有自带管理界面,但是基于admin的接口可以开发。目前比较流行的管理界面主要是kafka-manager和kafka-eagle(国产)。

https://github.com/yahoo/kafka-manager/releases

https://github.com/smartloli/kafka-eagle

注意最新版本的cmak对java版本要求比较高,最低需要JDK11。kafka-eagle对内存要求比较高,在虚拟机中部署需要修改JVM参数才能启动。

3 架构分析

3.1 Broker

kafka作为一个中间件,是存储和转发消息的,它做的事情优点像中介,所以把kafka的服务叫做Broker,默认是9092端口。生产者和消费者都需要跟这个Broker建立一个连接,才可以实现消息的收发。

3.2 消息

客户端之间传输的数据叫做消息,或者叫做记录。在客户端的代码中。Reocrd可以是一个KV键值对。

生产者对应的封装类是ProducerRecord,消费者对应的封装类是ConsumerRecord。消息在传输过程中需要序列化,所以代码中要指定序列化工具。消息在服务端的存储格式(RecordBatc和Record):http://kafka.apache.org/documentation/#messageformat

3.3 生产者

发送消息的一方叫做生产者,接收消息的一方叫做消费者。为了提升消息发送速率,生产者不是逐条发送消息给Broker,而是批量发送的。多少条发送一次由一个参数决定。

1
pros.put("batch.size", 16384);

3.4 消费者

一般来说消费者获得消息有两种模式,一种是pull模式,一种是push模式。

Pull模式就是消息放在Broker,消费者自己决定什么时候去获取。Push模式是消息放在Consumer,只要有消息到达Broker,都直接推给消费者。

RabbitMQ Consumer支持push和pull,一般使用push。Kafka只有pull模式。

为什么消费者用pull,官网已经说得很明白了:http://kafka.apache.org/documentation/#design_pull

在push模式下,如果消息产生速度远远大于消费者消费消息的速率,那消费者就会不堪重负,直到挂掉。而且消费者可以自己控制一次到底获取多少条消息:max.poll.records。默认500。在poll方法中可以指定。

3.5 Topic

kafka中,队列叫做Topic,是一个逻辑的概念,可以理解为一组消息的集合(不同业务用途的消息)。生产者和Topic以及Topic和消费者的关系都是多对多。一个生产者可以发送消息到多个Topic,一个消费者也可以从多个Topic获取消息(但是不建议这么做)。

注意,生产者发送消息时,如果Topic不存在,会自动创建。由一个参数控制: auto.create.topics.enable

默认为true。如果要彻底删掉一个Topic,这个参数必须改为false,否则只有代码使用这个Topic,它就会自动创建。

https://kafka.apache.org/documentation/#auto.crete.topics.enable

3.6 Partition与Cluster

如果说一个Topic的消息太多,会带来两个问题:

​ 1、不方便横向扩展,比如想要在集群中把数据分布在不同的机器上实现扩展,而不是通过升级硬件做到,如果一个Topic的消息无法在物理上拆分到多台机器的时候,这个是做不到的。

​ 2、并发或者负载的问题,所以的客户端操作的都是同一个Topic,在搞并发的场景下性能会大大下降。

​ 怎么解决这个问题呢?想到的就是把一个Topic进行拆分(分片的思想)。

​ kafka引入了一个分区(Partition)的概念。一个Topic可以划分成多个分区。分区在创建topic的时候指定,每个topic至少有一个分区。如果没有指定分区数,默认的分区数是一个,这个参数可以修改:

1
num.partitions=1
1
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

partitions是分区数,replication-factor是主题的副本数。

partition思想上有点类似与分库分表,实现的也是横向扩展和负载的目的。

每个partition都有一个物理目录。在配置的数据目录下(日志就是数据):/tem/kafka-logs

跟RabbitMQ不一样的地方是,Partition中的消息被读取之后不会被删除,所以一批消息在一个Partition里面顺序,追加写入的。这个也是kafka吞吐量大的一个很重要的原因。

3.7 Partition副本Replica机制

如果partiton的数据只存储一份,在发生网络或者硬件故障的时候,该分区的数据就无法访问或者无法恢复了。kafka在0.8版本后增加了副本机制。每个partition可以有若干个副本(Replica),副本必须在不同的Broker上面。一般说的副本包括其中的主节点。由repliacation-factor指定一个Topic的副本数。服务端有一个参数控制默认的副本数:

1
offsets.topic.replication.factor

3.8 Segment

kafka的数据是放在后缀.log的文件中的。如果一个partiton只有一个log文件,消息不断地追加,这个log文件也会变得越来越大,这个时候要检索数据效率就很低了。所以干脆把partition再做一个切分,切分出来的单位就叫做段(Segment)。实际上kafka的存储文件是划分成段来存储的。默认存储路径:/tmp/kafka-logs

每个segment都有至少1个数据文件和2个索引文件,这三个文件是成套出现的。segment默认大小是1G,由这个参数控制:log.segment.bytes

3.9 Consumer Group

Consumer Group消费组的概念,在代码中通过group id来配置。消费同一个topic的消费者不一定是同一组,只有group id相同的消费者才是同一个消费者组。

注意:同一个group中的消费者,不能浪费相同的partition——partition要在消费者之间分配。

怎么理解呢?在上大学的时候是没有固定的教室的吧?教室中的座位可以理解成partition,分区。一个教室由很多班级都可以使用,一个班级就可以理解为consumer group。很显然,对于一个班级来说,是不可能两个人坐一张桌子的。但是对于不同的班级,却是可以的。比如编号为0的这张桌子,5班的某个学生用,6班的某个学生用,7班的某个学生用。

​ 那就会有一个问题,如果学生比桌子多,或者学生比桌子少,怎么办?

​ 1、如果消费者比partition少,一个消费者可能消费多个partition(两张桌子凑在一起,躺在桌子上都可以)

​ 2、如果消费者比partition多,肯定有消费者没有partition可以消费(两个人不能挤在一张桌子上,有一个人站着上课了)。不会出现一个group中的消费者消费同一个partition的情况。

​ 如果想要同时消费同一个partition的消息,那么需要其他的组来消费。

3.10 Consumer Offset

partition中的消息是顺序写入的,被读取之后不会被删除。如果消息者挂了或者下一次读取,想要接着上次的位置读取消息,或者从某个特定的位置读取消息,怎么办呢?会不会出现重复消费的情况?因为消息是有序的,可以对消息进行编号,用来标识一条唯一的消息。

这个编号就把它叫做offset,偏移量。offset记录着下一条将要发送给consumer的消息的序号。这个消费者跟partition之间的的偏移量没有保存在ZK,而是直接保存在服务端。


Kafka入门及架构介绍
http://www.zivjie.cn/2023/06/03/消息队列/Kafka/kafka入门及架构介绍/
作者
Francis
发布于
2023年6月3日
许可协议