1 Java API
引入依赖
1 2 3 4 <dependency > <groupId > org.apache.kafka</groupId > <artifactId > kafka-clients</artifactId > </dependency >
topic要先提前创建,或者配置允许自动创建topic。
1 auto.create.topics.enable =true
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class SimpleConsumer { public static void main (String[] args) { Properties props = new Properties (); props.put("bootstrap.servers" , "192.168.44.160:9092" ); props.put("group.id" , "test-group" ); props.put("enable.auto.commit" , "true" ); props.put("auto.commit.interval.ms" , "1000" ); props.put("auto.offset.reset" , "earliest" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Arrays.asList("mytopic" )); try { while (true ){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (ConsumerRecord<String, String> record : records){ System.out.println("....." ); } } } finally { consumer.close(); } } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class SimpleProducer { public static void main (String[] args) { Properties pros = new Properties (); pros.put("bootstrap.servers" , "192.168.44.160:9092" ); pros.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); pros.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); pros.put("acks" , "1" ); pros.put("retries" , 3 ); pros.put("batch.size" , 16384 ); pros.put("linger.ms" , 5 ); pros.put("buffer.memory" , 33554432 ); pros.put("max.block.ms" , 3000 ); Producer<String, String> producer = new KafkaProducer <>(pros); for (int i=0 ;i<100 ;i++){ producer.send(new ProducerRecord <String, String>("mytopic" ,Integer.toString(i))); System.out.println("发送成功" ); } priducer.close(); } }
2 SpringBoot集成 版本对应关系:https://spring.io/projects/spring-kafka
引入依赖
1 2 3 4 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency >
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 server.port =7271 spring.kafka.bootstrap-servers =192.168.44.160:9092 spring.kafka.producer.retries =1 spring.kafka.producer.batch-size =16384 spring.kafka.producer.buffer-memory =33554432 spring.kafka.producer.acks =1 spring.kafka.producer.properties.linger.ms =5 spring.kafka.producer.key-serializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.value-serializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.auto-offset-reset =earliest spring.kafka.consumer.enable-auto-commit =true spring.kafka.consumer.auto-commit-interval =1000 spring.kafka.consumer.key-deserializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer =org.apache.kafka.common.serialization.StringDeserializer
消费者
1 2 3 4 5 6 7 8 @Component public class ConsumerListener { @KafkaListener(topics = "springboottopic", groupId="springgboottopic-group") public void onMessage (String msg) { System.out.println("收到消息:" + msg); } }
生产者
1 2 3 4 5 6 7 8 9 10 11 @Component public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public String send (@RequestParam String msg) { kafkaTemplate.send("springboottopic" , msg); return "OK" ; } }
测试
1 2 3 4 5 6 7 8 9 10 11 12 @SpringBootTest class KafkaTest { @Autowired KafkaProducer controller; @Test void contextLoads () { long time = System.currentTimeMillis(); System.out.println(time + "已经发出" ); controller.send("test" + time); } }
3 集成Canal数据同步 Canal是一款纯Java开发的数据同步工具,可以支持binlog增量订阅(注意不是全量)。binlog设置为row模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据。所以,作为一款binlog增量数据解析的工具,canal可以做的事情很多,比如备份数据,缓存的同步刷新,构建ES索引,增量数据的处理等等。
canal可以多种方式把数据增量变动的信息发送出去,比如TCP和多种MQ,目前支持kafka,RabbitMQ,RocketMQ,而且提供了同步数据到hbase,elasticsearch的适配器。工作流程:数据变动——产生binlog信息——canal服务端获取binlog信息——发送MQ消息——消费者消费MQ消息,完成后续逻辑处理。
在目标数据库上创建用户和数据库
注意121的数据库首先要开启binlog,binlog-format必须是ROW
1 2 log-bin =/var/lib/mysql/mysql-bin binlog-format =ROW
用户和数据库创建
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE USER canal IDENTIFIED BY '123456' ;GRANT SELECT , REPLICATION SLAVE, REPLICATION CLIENT ON * .* TO canal@'%' ; FLUSH PRIVILEGES;ALTER USER 'canal' @'%' IDENTIFIED WITH mysql_native_password BY '123456' ;CREATE DATABASE `canaltest` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
在192.168.44.161上安装zk和kafka
具体查看集群环境搭建
下载安装canal
1 2 3 4 5 cd /usr/local/soft/mkdir canalcd canal wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz tar -zxvf canal.deployer-1.1.4.tar.gz
需要修改的配置项:conf/canal.properties
1 2 canal.serverMode =kafka canal.mq.servers = 192.168.44.160:9092
example/instance.properties
1 2 3 4 5 6 7 canal.instance.master.address =192.168.44.121:3306 canal.instance.dbUsername =canal canal.instance.dbPassword =123456 canal.instance.defaultDatabaseName =canaltest canal.mq.topic =canal-topic
1 2 3 4 sh startup.sh tail -100f /usr/local/soft/canal/logs/canal/canal.log
4 消息幂等性 消息重复消费问题需要在消费者端解决,也就是消费者实现幂等性。考虑所有的消费者都要做一样的实现,kafka干脆在Broker实现了消息的重复性判断,大大的解放了消费者的双手。肯定要依赖于生产者的消息的唯一标识,不然是没办法知道是不是同一条消息的。
1 props.put("ennable.idempotence" , true );
enable.idempotence设置成true后,Producer自动升级成幂等性Producer。Kafka会自动去重。有两个重要机制:
1、PID(Producer ID),幂等性的生产者每个客户端都有一个唯一的编号;
2、sequence number,幂等性的生产者发送的每条消息都会带相应的sequence number,Server端就是根据这个值来判断数据是否重复。如果说发现sequence number比服务端已经记录的值要小,那肯定是出现消息重复了。
当然,这个sequence number并不是全局有序的,不能保证所有时间上的幂等。所以,它的作用范围是有限的:
1、只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区不出现重复消息。
2、只能实现单会话上的幂等性,这里的会话是Producer进程的一次运行。当重启了Producer进程之后,幂等性不保证。
这个也很容易理解,也就是说不允许生产者在一次会话中向同一个partition发送相同的消息。如果要实现多个分区的消息的原子性,就要用到kafka的事务机制了。
5 生产者事务 生产者事务是kafka2017年0.11.0.0引入的新特性,通过事务,kafka可以保证跨生产者会话的消息幂等发送。有几种情况需要开启事务:
1、假设只有1个Broker,1个topic的分区只有一个副本,如果要发送多条消息,想要让这些消息全部成功或者全部失败
2、更加复杂的情况,如果生产者发送消息到多个topic或者多个partition,它们有可能分布在不同的服务器上,需要它们全部发送成功或者全部发送失败
3、消费者和生产者在同一块代码中(consumer-process-produce),从上游接收消息,经过处理后发给下游,这个时候要保证接收消息和发送消息同时成功。
生产者跟事务相关的方法一共有5个:
对象
描述
initTransactions()
初始化事务
beginTransaction()
开启事务
commitTransaction()
提交事务
abortTransaction()
终止事务
sendOffsetsToTransaction()
这个方法是消费者和生产者在同一段代码中使用的(从上游接收消费发送给下游),在提交的时候把消费的消息的offset发给consumer Corordinator
在Spring boot中可以同executeInTransaction 方法,或者加上@Transaction的注解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 pros.put("transaction.id" , UUID.randomUUID().toString()); Producer<String, String> producer = new KafkaProducer <>(pros); producer.initTransactions();try { producer.beginTransaction(); producer.send(new ProducerRecord <String, String>("transaction-test" ,"1" ,"1" )); producer.send(new ProducerRecord <String, String>("transaction-test" ,"2" ,"2" )); Integer i = 1 /0 ; producer.send(new ProducerRecord <String, String>("transaction-test" ,"3" ,"3" )); producer.commitTransaction(); } catch (Exception e){ producer.abortTransaction(); }
kafka的分布式事务实现的核心思想。其中有几个关键点:
1)因为生产者的消息可能会跨分区,所以这里的事务是属于分布式事务。分布式事务的实现方式有很多,kafka选择了最常见的连接断提交(2PC)。如果大家都可以commit,那么就commit,否则abort。
2)既然是2PC,必须要有一个协调者的角色,叫做Transaction Coordinator。
3)事务管理器必须要有事务日志,来记录事务的状态,以便Coordinator在意外挂掉之后继续处理原来的事务,跟消费者offset的存储一样,kafka使用一个特殊的topic_transaction_state来记录事务状态。
4)如果生产者挂了,事务要在重启之后可以继续处理,接着之前未处理完的事务,或者在其他机器上处理,必须要有一个唯一的ID,这个就是transaction.id,这里使用UUID。配置了transaction.id,则此时enable.idempotence会被设置为true(事务实现的前提是幂等性)。事务ID相同的生产者,可以接着处理原来的事务。
步骤描述:
1、生产者通过initTransaction API向Coordinator注册事务ID
2、Coordinator记录事务日志
3、生产者把消息写入目标分区
4、分区和Coordinator的交互。当事务完成以后,消息的状态应该是已提交,这样消费者才可以消费到。