Kafka使用

1 Java API

  1. 引入依赖
1
2
3
4
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

topic要先提前创建,或者配置允许自动创建topic。

1
auto.create.topics.enable=true
  1. 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SimpleConsumer{
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.44.160:9092");
props.put("group.id", "test-group");
//是否自动提交偏移量,只有commit之后才更新消费组的offset
props.put("enable.auto.commit", "true");
//消费者自动提交的间隔
props.put("auto.commit.interval.ms", "1000");
//从最早的数据开始消费, earliest | latest | none
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅队列
consumer.subscribe(Arrays.asList("mytopic"));
try {
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record : records){
System.out.println(".....");
}
}
} finally {
consumer.close();
}
}
}
  1. 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SimpleProducer {
public static void main(String[] args) {
Properties pros = new Properties();
pros.put("bootstrap.servers", "192.168.44.160:9092");
pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//0 发出去就确认 | 1 leader落盘就确认 | all 所有Follower同步完才确认
pros.put("acks", "1");
//异常自动重试次数
pros.put("retries", 3);
//多少条数据发送一次,默认16K
pros.put("batch.size", 16384);
//批量发送的等待时间
pros.put("linger.ms", 5);
//客户端缓冲区大小,默认为32M,满了也会触发消息发送
pros.put("buffer.memory", 33554432);
//获取元素据时生产者的阻塞时间,超时后抛出异常
pros.put("max.block.ms", 3000);

Producer<String, String> producer = new KafkaProducer<>(pros);

for(int i=0;i<100;i++){
producer.send(new ProducerRecord<String, String>("mytopic",Integer.toString(i)));
System.out.println("发送成功");
}
priducer.close();
}
}

2 SpringBoot集成

版本对应关系:https://spring.io/projects/spring-kafka

  1. 引入依赖
1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
  1. 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
server.port=7271
spring.kafka.bootstrap-servers=192.168.44.160:9092

#producer
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.properties.linger.ms=5
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringDeserializer
#consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  1. 消费者
1
2
3
4
5
6
7
8
//使用注解@KafkaListener监听Topic
@Component
public class ConsumerListener{
@KafkaListener(topics = "springboottopic", groupId="springgboottopic-group")
public void onMessage(String msg){
System.out.println("收到消息:" + msg);
}
}
  1. 生产者
1
2
3
4
5
6
7
8
9
10
11
//注入模板方法KafkaTemplate发送消息,注意send方法有很多重载。异步回调ListenableFuture。
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public String send(@RequestParam String msg){
kafkaTemplate.send("springboottopic", msg);
return "OK";
}
}
  1. 测试
1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest
class KafkaTest {
@Autowired
KafkaProducer controller;

@Test
void contextLoads(){
long time = System.currentTimeMillis();
System.out.println(time + "已经发出");
controller.send("test" + time);
}
}

3 集成Canal数据同步

Canal是一款纯Java开发的数据同步工具,可以支持binlog增量订阅(注意不是全量)。binlog设置为row模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据。所以,作为一款binlog增量数据解析的工具,canal可以做的事情很多,比如备份数据,缓存的同步刷新,构建ES索引,增量数据的处理等等。

canal可以多种方式把数据增量变动的信息发送出去,比如TCP和多种MQ,目前支持kafka,RabbitMQ,RocketMQ,而且提供了同步数据到hbase,elasticsearch的适配器。工作流程:数据变动——产生binlog信息——canal服务端获取binlog信息——发送MQ消息——消费者消费MQ消息,完成后续逻辑处理。

  1. 在目标数据库上创建用户和数据库

注意121的数据库首先要开启binlog,binlog-format必须是ROW

1
2
log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW

用户和数据库创建

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 创建canal专用的用户,用于访问master获取binlog
CREATE USER canal IDENTIFIED BY '123456';

-- 给canal用户分配查询和复制的权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@'%';

-- 刷新权限
FLUSH PRIVILEGES;

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';

-- 创建测试数据库
CREATE DATABASE `canaltest` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
  1. 在192.168.44.161上安装zk和kafka

具体查看集群环境搭建

  1. 下载安装canal
1
2
3
4
5
cd /usr/local/soft/
mkdir canal
cd canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz

需要修改的配置项:conf/canal.properties

1
2
canal.serverMode=kafka
canal.mq.servers = 192.168.44.160:9092

example/instance.properties

1
2
3
4
5
6
7
canal.instance.master.address=192.168.44.121:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
# 新增
canal.instance.defaultDatabaseName=canaltest
# 这个topic会自动创建
canal.mq.topic=canal-topic
1
2
3
4
# 启动canal
sh startup.sh
# 查看实例日志
tail -100f /usr/local/soft/canal/logs/canal/canal.log

4 消息幂等性

消息重复消费问题需要在消费者端解决,也就是消费者实现幂等性。考虑所有的消费者都要做一样的实现,kafka干脆在Broker实现了消息的重复性判断,大大的解放了消费者的双手。肯定要依赖于生产者的消息的唯一标识,不然是没办法知道是不是同一条消息的。

1
props.put("ennable.idempotence", true);

enable.idempotence设置成true后,Producer自动升级成幂等性Producer。Kafka会自动去重。有两个重要机制:

​ 1、PID(Producer ID),幂等性的生产者每个客户端都有一个唯一的编号;

​ 2、sequence number,幂等性的生产者发送的每条消息都会带相应的sequence number,Server端就是根据这个值来判断数据是否重复。如果说发现sequence number比服务端已经记录的值要小,那肯定是出现消息重复了。

​ 当然,这个sequence number并不是全局有序的,不能保证所有时间上的幂等。所以,它的作用范围是有限的:

​ 1、只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区不出现重复消息。

​ 2、只能实现单会话上的幂等性,这里的会话是Producer进程的一次运行。当重启了Producer进程之后,幂等性不保证。

​ 这个也很容易理解,也就是说不允许生产者在一次会话中向同一个partition发送相同的消息。如果要实现多个分区的消息的原子性,就要用到kafka的事务机制了。

5 生产者事务

生产者事务是kafka2017年0.11.0.0引入的新特性,通过事务,kafka可以保证跨生产者会话的消息幂等发送。有几种情况需要开启事务:

​ 1、假设只有1个Broker,1个topic的分区只有一个副本,如果要发送多条消息,想要让这些消息全部成功或者全部失败

​ 2、更加复杂的情况,如果生产者发送消息到多个topic或者多个partition,它们有可能分布在不同的服务器上,需要它们全部发送成功或者全部发送失败

​ 3、消费者和生产者在同一块代码中(consumer-process-produce),从上游接收消息,经过处理后发给下游,这个时候要保证接收消息和发送消息同时成功。

​ 生产者跟事务相关的方法一共有5个:

对象 描述
initTransactions() 初始化事务
beginTransaction() 开启事务
commitTransaction() 提交事务
abortTransaction() 终止事务
sendOffsetsToTransaction() 这个方法是消费者和生产者在同一段代码中使用的(从上游接收消费发送给下游),在提交的时候把消费的消息的offset发给consumer Corordinator

在Spring boot中可以同executeInTransaction 方法,或者加上@Transaction的注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//事务ID,唯一
pros.put("transaction.id", UUID.randomUUID().toString());
Producer<String, String> producer = new KafkaProducer<>(pros);
//初始化事务
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<String, String>("transaction-test","1","1"));
producer.send(new ProducerRecord<String, String>("transaction-test","2","2"));
Integer i = 1/0;
producer.send(new ProducerRecord<String, String>("transaction-test","3","3"));
//提交事务
producer.commitTransaction();
} catch(Exception e){
//终止事务
producer.abortTransaction();
}

kafka的分布式事务实现的核心思想。其中有几个关键点:

​ 1)因为生产者的消息可能会跨分区,所以这里的事务是属于分布式事务。分布式事务的实现方式有很多,kafka选择了最常见的连接断提交(2PC)。如果大家都可以commit,那么就commit,否则abort。

​ 2)既然是2PC,必须要有一个协调者的角色,叫做Transaction Coordinator。

​ 3)事务管理器必须要有事务日志,来记录事务的状态,以便Coordinator在意外挂掉之后继续处理原来的事务,跟消费者offset的存储一样,kafka使用一个特殊的topic_transaction_state来记录事务状态。

​ 4)如果生产者挂了,事务要在重启之后可以继续处理,接着之前未处理完的事务,或者在其他机器上处理,必须要有一个唯一的ID,这个就是transaction.id,这里使用UUID。配置了transaction.id,则此时enable.idempotence会被设置为true(事务实现的前提是幂等性)。事务ID相同的生产者,可以接着处理原来的事务。

步骤描述:

​ 1、生产者通过initTransaction API向Coordinator注册事务ID

​ 2、Coordinator记录事务日志

​ 3、生产者把消息写入目标分区

​ 4、分区和Coordinator的交互。当事务完成以后,消息的状态应该是已提交,这样消费者才可以消费到。


Kafka使用
http://www.zivjie.cn/2023/06/03/消息队列/Kafka/kafka使用/
作者
Francis
发布于
2023年6月3日
许可协议