Kafka入门及架构介绍
官网:http://kafka.apache.org/documentation/
Kafka 是使用 Scala 编写具有高水平扩展和高吞吐量的分布式消息系统。
Kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer ,消息接受者成为 Consumer ,此外 kafka 集群有多个 kafka 实例组成,每个实例(server)称为 broker。
无论是 Kafka集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性,为集群保存一些 meta 信息。
1 使用场景
1.1 消息传递
消息传递就是发送数据,作为TCP HTTP或者RPC的替代方案,可以实现异步,解耦,削峰(RabbitMQ、RocketMQ能做的事情,它也能做)。因为kafka的吞吐量更高,在大规模消息系统中更有优势。第二个是大数据领域的使用,比如网站行为分析。
具体可以做网站活动跟踪,日志集合,应用指标监控等。
1.2 数据集成+流计算
数据集成指的是把kafka的数据导入Hadoop,Hbase等离线数据仓库,实现数据分析。
第三块是流计算。什么是流(Stream)?它不是静态的数据,而是没有边界的,源源不断地产生的数据,就像流水一样。流计算指的就是Stream对做实时的计算。
Kafka在0.10版本后,内置了流处理框架API——kafka Streams。
所以,它跟RabbitMQ的定位差别还是比较大的,不仅仅是一个简单的消息中间件,而且是一个流处理平台。在kafka中,消息被称为日志。日志就是消息的数据文件。
2 安装与命令
2.1 CentOS单机安装
- 下载解压kafka
1 | |
- 启动zookeeper
kafka需要依赖ZK,安装包中已经自带了一个ZK,也可以改成指定已运行的ZK。
如果改成指定的ZK需要修改修改 kafka 安装目录下的 config/server.properties 文件中的 zookeeper.connect 。这里使用自带的ZK。
1 | |
- 启动kafka
1 | |
1 | |
1 | |
- 创建Topic
1 | |
- 启动Producer
1 | |
- 启动Consumer
1 | |
- 删除kafka全部数据
(1)停止每台机器上的kafka;
(2)删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录;
(3)删除zookeeper上与kafka相关的znode节点;除了/zookeeper
(4)重启kafka。
2.2 单机集群安装-伪集群
- 下载解压
1 | |
- 修改配置文件
1 | |
修改配置文件中的broker.id分别为1、2、3
listeners这一行取消注释,端口号分别为9093、9094、9095
log.dirs分别设置为kafka-logs1、kafka-logs2、kafka-logs3(先创建)
1 | |
1 | |
- 启动三个服务
1 | |
- 创建Topic
1 | |
- 启动Consumer
1 | |
- 启动Producer
1 | |
2.3 kafka脚本介绍
| 脚本 | 作用 |
|---|---|
| kafka-server-start.sh kafka-server-stop.sh |
kafka启动停止 |
| kafka-topics.sh | 查看创建删除topic |
| kafka-console-consumer.sh | 消费者操作,例如监听topic |
| kafka-consumer-groups.sh | 消费者组操作 |
| kafka-console-producer.sh | 生产者操作,例如发送消息 |
| zookeeper-server-start.sh | zk操作,启动停止连接zk |
| kafka-reassign-partitions.sh | 分区重新分配 |
| kafka-consumer-perf-test.sh | 性能测试 |
2.4 kafka界面管理工具
Kafka没有自带管理界面,但是基于admin的接口可以开发。目前比较流行的管理界面主要是kafka-manager和kafka-eagle(国产)。
https://github.com/yahoo/kafka-manager/releases
https://github.com/smartloli/kafka-eagle
注意最新版本的cmak对java版本要求比较高,最低需要JDK11。kafka-eagle对内存要求比较高,在虚拟机中部署需要修改JVM参数才能启动。
3 架构分析

3.1 Broker
kafka作为一个中间件,是存储和转发消息的,它做的事情优点像中介,所以把kafka的服务叫做Broker,默认是9092端口。生产者和消费者都需要跟这个Broker建立一个连接,才可以实现消息的收发。
3.2 消息
客户端之间传输的数据叫做消息,或者叫做记录。在客户端的代码中。Reocrd可以是一个KV键值对。
生产者对应的封装类是ProducerRecord,消费者对应的封装类是ConsumerRecord。消息在传输过程中需要序列化,所以代码中要指定序列化工具。消息在服务端的存储格式(RecordBatc和Record):http://kafka.apache.org/documentation/#messageformat
3.3 生产者
发送消息的一方叫做生产者,接收消息的一方叫做消费者。为了提升消息发送速率,生产者不是逐条发送消息给Broker,而是批量发送的。多少条发送一次由一个参数决定。
1 | |
3.4 消费者
一般来说消费者获得消息有两种模式,一种是pull模式,一种是push模式。
Pull模式就是消息放在Broker,消费者自己决定什么时候去获取。Push模式是消息放在Consumer,只要有消息到达Broker,都直接推给消费者。
RabbitMQ Consumer支持push和pull,一般使用push。Kafka只有pull模式。
为什么消费者用pull,官网已经说得很明白了:http://kafka.apache.org/documentation/#design_pull
在push模式下,如果消息产生速度远远大于消费者消费消息的速率,那消费者就会不堪重负,直到挂掉。而且消费者可以自己控制一次到底获取多少条消息:max.poll.records。默认500。在poll方法中可以指定。
3.5 Topic
kafka中,队列叫做Topic,是一个逻辑的概念,可以理解为一组消息的集合(不同业务用途的消息)。生产者和Topic以及Topic和消费者的关系都是多对多。一个生产者可以发送消息到多个Topic,一个消费者也可以从多个Topic获取消息(但是不建议这么做)。
注意,生产者发送消息时,如果Topic不存在,会自动创建。由一个参数控制: auto.create.topics.enable
默认为true。如果要彻底删掉一个Topic,这个参数必须改为false,否则只有代码使用这个Topic,它就会自动创建。
https://kafka.apache.org/documentation/#auto.crete.topics.enable
3.6 Partition与Cluster
如果说一个Topic的消息太多,会带来两个问题:
1、不方便横向扩展,比如想要在集群中把数据分布在不同的机器上实现扩展,而不是通过升级硬件做到,如果一个Topic的消息无法在物理上拆分到多台机器的时候,这个是做不到的。
2、并发或者负载的问题,所以的客户端操作的都是同一个Topic,在搞并发的场景下性能会大大下降。
怎么解决这个问题呢?想到的就是把一个Topic进行拆分(分片的思想)。
kafka引入了一个分区(Partition)的概念。一个Topic可以划分成多个分区。分区在创建topic的时候指定,每个topic至少有一个分区。如果没有指定分区数,默认的分区数是一个,这个参数可以修改:
1 | |
1 | |
partitions是分区数,replication-factor是主题的副本数。
partition思想上有点类似与分库分表,实现的也是横向扩展和负载的目的。
每个partition都有一个物理目录。在配置的数据目录下(日志就是数据):/tem/kafka-logs
跟RabbitMQ不一样的地方是,Partition中的消息被读取之后不会被删除,所以一批消息在一个Partition里面顺序,追加写入的。这个也是kafka吞吐量大的一个很重要的原因。
3.7 Partition副本Replica机制
如果partiton的数据只存储一份,在发生网络或者硬件故障的时候,该分区的数据就无法访问或者无法恢复了。kafka在0.8版本后增加了副本机制。每个partition可以有若干个副本(Replica),副本必须在不同的Broker上面。一般说的副本包括其中的主节点。由repliacation-factor指定一个Topic的副本数。服务端有一个参数控制默认的副本数:
1 | |
3.8 Segment
kafka的数据是放在后缀.log的文件中的。如果一个partiton只有一个log文件,消息不断地追加,这个log文件也会变得越来越大,这个时候要检索数据效率就很低了。所以干脆把partition再做一个切分,切分出来的单位就叫做段(Segment)。实际上kafka的存储文件是划分成段来存储的。默认存储路径:/tmp/kafka-logs
每个segment都有至少1个数据文件和2个索引文件,这三个文件是成套出现的。segment默认大小是1G,由这个参数控制:log.segment.bytes

3.9 Consumer Group
Consumer Group消费组的概念,在代码中通过group id来配置。消费同一个topic的消费者不一定是同一组,只有group id相同的消费者才是同一个消费者组。
注意:同一个group中的消费者,不能浪费相同的partition——partition要在消费者之间分配。
怎么理解呢?在上大学的时候是没有固定的教室的吧?教室中的座位可以理解成partition,分区。一个教室由很多班级都可以使用,一个班级就可以理解为consumer group。很显然,对于一个班级来说,是不可能两个人坐一张桌子的。但是对于不同的班级,却是可以的。比如编号为0的这张桌子,5班的某个学生用,6班的某个学生用,7班的某个学生用。
那就会有一个问题,如果学生比桌子多,或者学生比桌子少,怎么办?
1、如果消费者比partition少,一个消费者可能消费多个partition(两张桌子凑在一起,躺在桌子上都可以)
2、如果消费者比partition多,肯定有消费者没有partition可以消费(两个人不能挤在一张桌子上,有一个人站着上课了)。不会出现一个group中的消费者消费同一个partition的情况。
如果想要同时消费同一个partition的消息,那么需要其他的组来消费。
3.10 Consumer Offset
partition中的消息是顺序写入的,被读取之后不会被删除。如果消息者挂了或者下一次读取,想要接着上次的位置读取消息,或者从某个特定的位置读取消息,怎么办呢?会不会出现重复消费的情况?因为消息是有序的,可以对消息进行编号,用来标识一条唯一的消息。
这个编号就把它叫做offset,偏移量。offset记录着下一条将要发送给consumer的消息的序号。这个消费者跟partition之间的的偏移量没有保存在ZK,而是直接保存在服务端。