# 图集 **Repository Path**: zhongxianliang/atlas ## Basic Information - **Project Name**: 图集 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-05-18 - **Last Updated**: 2024-05-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README Kafka ⼀ 概述 Kafka传统定义: Kafka是⼀个分布式基于发布/订阅模式的消息队列 发布/订阅模式: 消息的发布者不会将消息直接发送给特定的订阅者, ⽽是将发布的消息分为不同的类 别, 订阅者只接收⾃⼰感兴趣的消息 消息队列 传统消息队列的应⽤场景 • 缓存/削峰: 有助于控制和优化数据流经过系统的速度, 解决⽣产消息和消费消息的处理速度不⼀致 的情况 • 解耦: 允许独⽴的扩展和修改两边的处理过程, 只要确保遵守同样的接⼝约束即可 • 异步通信: 允许⽤⼾把⼀个消息放⼊队列, 但并不⽴即处理它, 然后再有需要的时候再进⾏处理 消息队列的两种模式 • 点对点模式: 消费者主动拉取数据, 消息收到后发送确认信息, 消息队列随即删除数据 • 发布/订阅模式: 消息的发布者不会将消息直接发送给特定的订阅者, ⽽是将发布的消息分为不同的 类别, 订阅者只接收⾃⼰感兴趣的消息 ◦ 可以有多个 topic 主题, ⽅便订阅者后期订阅 ◦ 消费者消费数据之后不删除数据 ◦ 每个消费者相互独⽴, 都可以消费到数据 Kafka 基础架构 1. 为了⽅便扩展和提⾼吞吐量, ⼀个 topic 氛围多个 partition 2. 配合分区的设计, 提出消费者组的概念, 组内每个消费者并⾏消费(注意: ⼀个 partition 中的数据只能 由消费者组中的⼀个 group 来消费) 3. 为了提⾼可⽤性, 为每个 partition 增加若⼲个副本 ⼆ Kafka ⽣产者 1.发送原理 在消息发送的过程中, 涉及到了两个线程: main 线程和 Sender 线程 在 main 线程中创建了⼀个双端队列 RecordAccumulator main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发 送到 Kafka Broker 2.异步发送 API 普通异步发送 1 // 配置 2 Properties properties = new Properties(); 3 // 连接集群 bootstrap.servers 4 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx,xxx,xxx"); 5 // 指定对应的key和value 6 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.clas 7 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.cl 8 9 // 创建kafka⽣产者对象 10 KafkaProducer kafkaProducer = new KafkaProducer<>(properties); 11 12 // 发送数据 13 for(int i=0; i<5; i++){ 14 kafkaProducer.send(new ProducerRecord<>("first", "first" + i)); 15 } 16 17 // 关闭资源 18 kafkaProducer.close; 带回调函数的异步发送 1 // 配置 2 Properties properties = new Properties(); 3 // 连接集群 bootstrap.servers 4 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx,xxx,xxx"); 5 // 指定对应的key和value 6 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.clas 7 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.cl 8 9 // 创建kafka⽣产者对象 10 KafkaProducer kafkaProducer = new KafkaProducer<>(properties); 11 12 // 发送数据 13 for(int i=0; i<5; i++){ 14 kafkaProducer.send(new ProducerRecord<>("first", "first" + i), new Callback( 15 @Override 16 public void onCompletion(RecordMetadata metadata, Exception exception) { 17 if (exception == null){ 18 System..out.println("主题: " + metadata.topic() + "分区: " + meat 19 } 20 } 21 }); 22 } 23 24 // 关闭资源 25 kafkaProducer.close; 3.同步发送 API 1 // 配置 2 Properties properties = new Properties(); 3 // 连接集群 bootstrap.servers 4 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx,xxx,xxx"); 5 // 指定对应的key和value 6 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.clas 7 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.cl 8 9 // 创建kafka⽣产者对象 10 KafkaProducer kafkaProducer = new KafkaProducer<>(properties); 11 12 // 发送数据 13 for(int i=0; i<5; i++){ 14 kafkaProducer.send(new ProducerRecord<>("first", "first" + i)).get(); 15 } 16 17 // 关闭资源 18 kafkaProducer.close; 4.⽣产者分区 分区的好处 1. 便于合理使⽤存储资源,每个Position在⼀个Broker上存储,可以把海量的数据按照分区切割成⼀ 块⼀块的数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果 2. 提⾼并⾏度,⽣产者可以以分区为单位发送数据,消费者可以以分区为单位进⾏消费数据 ⽣产者发送消息的分区策略 1. 如果制定了分区,就按照指定的分区发送 2. 如果没有指定分区但是有key,那就使⽤key%分区数来进⾏分区选定 注意在⽣产环境中⼀般将表名作为key来指定分区,以便于管理 3. 如果都没有,选择粘性分区,当当前分区满了,才发送给下⼀个分区 ⾃定义分区器 ⾃定义分区器 1 package com.lcha.kafka_pro.producer; 2 3 import org.apache.kafka.clients.producer.Partitioner; 4 import org.apache.kafka.common.Cluster; 5 6 import java.util.Map; 7 8 public class MyPartitioner implements Partitioner { 9 10 @Override 11 public int partition(String topic, Object key, byte[] keyBytes, Object value 12 // 获取数据 13 String msgValue = value.toString(); 14 int partition; 15 if (msgValue.contains("lcha")){ 16 partition = 0; 17 } else { 18 partition = 1; 19 } 20 return partition; 21 } 22 23 @Override 24 public void close() { 25 26 } 27 28 @Override 29 public void configure(Map configure) { 30 31 } 32 } 使⾃定义分区器⽣效 1 package com.lcha.kafka_pro.producer; 2 3 import org.apache.kafka.clients.producer.*; 4 import org.apache.kafka.common.serialization.StringSerializer; 5 6 import java.util.Properties; 7 8 public class CustomProducer { 9 public static void main(String[] args) { 10 // 配置 11 Properties properties = new Properties(); 12 // 连接集群 bootstrap.servers 13 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx,xxx,xxx"); 14 // 指定对应的key和value 15 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSeriali 16 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSeria 17 18 // 关联⾃定义分区器 19 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.lcha.kafka_ 20 21 // 创建kafka⽣产者对象 22 KafkaProducer kafkaProducer = new KafkaProducer<>(proper 23 24 // 发送数据 25 for(int i=0; i<5; i++){ 26 kafkaProducer.send(new ProducerRecord<>("first", "first" + i), new C 27 @Override 28 public void onCompletion(RecordMetadata metadata, Exception exce 29 if (exception == null){ 30 System.out.println("主题: " + metadata.topic() + "分区: " 31 } 32 } 33 }); 34 } 35 36 // 关闭资源 37 kafkaProducer.close(); 38 } 39 } 注意:⾃定义分区器也同样可以处理⼀些脏数据,在某些数据不符合要求的时候可以发送到指定的分 区进⾏统⼀处理 5.⽣产者如何提⾼吞吐量 ⾸先这4个参数都会影响kafka的吞吐量 • batch.size:批次⼤⼩,默认为16k • linger.ms:等待时间,⼀般为5-100ms • compression.type:压缩类型,snappy⽤的最多 • RecordAccumulator:缓冲区⼤⼩ 6.数据可靠性 • ack=0:⽣产者发送过来的数据,不需要等数据落盘应答 • ack=1:⽣产者发送过来的数据,Leader收到数据后应答 • sck=-1:⽣产者发送过来的数据,Leader和isr队列⾥⾯的所有节点收⻬数据后应答 注意:如果某⼀台Follower因为某种故障不能与Leader进⾏同步导致迟迟⽆法返回应答信息怎么 办? Leader维护了⼀个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader 集合(leader:0,isr:0,1,2)。如果Follower⻓时间未向Leader发送请求信息或者同步数据,则 将Follower踢出ISR 数据完全可靠条件=ACK级别设置为-1 + 分区副本⼤于等于2 + ISR应答的最⼩副本数量⼤于2 1 // acks 2 properties.put(ProducerConfig.ACKS_CONFIG, "1"); 3 // 重试次数 4 properties.put(ProducerConfig.RETRIES_CONFIG, 3); 7.数据去重 ⾄少⼀次=ACK级别设置为-1 + 分区副本⼤于等于2 + ISR应答的最⼩副本数量⼤于2 可以保证数据不丢失,但不能保证数据不重复 最多⼀次=ACK级别设置为0 可以保证数据不重复,但不能保证数据不丢失 怎么做到既可以保证数据不重复,还可以保证数据不丢失? 幂等性 幂等性:Producer⽆论向Broker发送多少次重复数据,Broker端最多只会持久化⼀条数据,保证不重 复 判断标准:具有相同主键的消息提交时,Broker只会持久化⼀条。其 中 PID 在kafka每次重启时都会分配⼀个新的,Partition表⽰分区号,Sequence Number是单调递增 的 但幂等性只能保证在单分区单次会话不重复 精确⼀次 = 幂等性 + ⾄少⼀次 ⽣产者事务 开启事务,必须开启幂等性 Producer 在使⽤事务功能之前,必须先⾃定义⼀个唯⼀的 transactional.id,有个这个id,即使客⼾ 端挂掉重启后也能继续处理未完成的事务 8.数据乱序 1. kafka在1.x版本之前保证数据单分区有序,条件如下: max.in.flight.requests.per.connection = 1 2. kafka在1.x版本之后保证数据单分区有序,条件如下: a. 未开启幂等性 max.in.flight.requests.per.connection = 1 b. 开启幂等性 max.in.flight.requests.per.connection <= 5 原因:在启⽤幂等性之后,kafka服务端会缓存producer发来的最近5个request数据,⽆论如 何,都可以保证最近5个request数据是有序的 三 Kafka Broker 1.Kafka Broker ⼯作流程 zookeeper存储的 kafka 信息 重点信息: 1. /kafka/brokers/ids [0,1,2] ⽤来记录有哪些服务器 2. /kafka/brokers/topics/first/partitions/0/state {"leader": 1, "isr": [1,0,2]} 记录谁是leader,有哪 些服务可⽤ Kafka Broker ⼯作原理 2.kafka 副本 副本基本信息 1. Kafka 副本的作⽤:提⾼数据可靠性 2. Kafka 默认副本1个,⽣产环境⼀般配置2个,保证数据可靠性。太多的副本会增加磁盘存储空间, 增加⽹络上数据传输,降低效率 3. Kafka 中的副本分为:leader和follower,⽣产者只会将数据发送给leader,follower⾃⼰找leader 进⾏数据同步 4. kafka分区中的所有副本统称为AR(Assigned Repllicas) AR = ISR + OSR(同步时延迟过多的副本) Follower 故障处理 LEO:每个副本的最后⼀个offset,LEO其实就是最新的offset+1 HW:所有副本中最⼩的LEO 1. follower发⽣故障后会被临时踢出ISR 2. 这个期间leader和follower继续接收数据 3. 待改follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log⽂件⾼于HW的部分截取 掉,从HW开始向Leader进⾏同步 4. 等该follower的LEO⼤于等于该Partition的HW后,就可以重新加⼊ISR Leader 故障处理 1. Leader发⽣故障后,会从ISR中选出⼀个新的Leader 2. 为保证多个副本之间的数据⼀致性,其余的Follower会先将各⾃的log⽂件⾼于HW的部分截取掉, 然后从新的Leader同步数据 注意:这只能保证副本之间的数据⼀致性,并不能保证数据不丢失或者不重复 四 Kafka 消费者 1.Kafka 消费⽅式 kafka主要采取的是消费者主动拉取数据的⽅式,因为如果采取推送的⽅式,很难适应所有消费者的消 费速率 坏处:如果kafka没有数据,消费者可能会陷⼊循环之中,⼀直返回空数据 2.消费者组 消费者组(Consumer Group):消费者组,由多个consumer组成,形成⼀个消费者组的条件,是所 有消费者的groupid相同 • 消费者组内每个消费者负责消费不同分区的数据,⼀个分区只能由⼀个组内消费者消费 • 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的⼀个订阅者 3.消费者组初始化流程 coordinator:辅助实现消费者组的初始化和分区的分配 coordinator节点选择 = groupid 的 hashcode 值 % 50 (__consumer_offset的分区数量) 4.消费者 API 订阅主题 1 package com.lcha.kafka_pro; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.apache.kafka.clients.consumer.ConsumerRecords; 6 import org.apache.kafka.clients.consumer.KafkaConsumer; 7 import org.apache.kafka.common.serialization.StringDeserializer; 8 9 import java.time.Duration; 10 import java.util.ArrayList; 11 import java.util.Properties; 12 13 public class CustConsumer { 14 public static void main(String[] args) { 15 16 // 0.配置 17 Properties properties = new Properties(); 18 // 链接 19 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx,xxx,xxx"); 20 // 反序列化 21 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeser 22 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDes 23 // 消费者id 24 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); 25 26 // 1.创建⼀个消费者 27 KafkaConsumer kafkaConsumer = new KafkaConsumer<>(proper 28 // 2.定义主题 29 ArrayList topics = new ArrayList<>(); 30 topics.add("first"); 31 kafkaConsumer.subscribe(topics); 32 // 3.消费数据 33 while (true) { 34 ConsumerRecords consumerRecords = kafkaConsumer.poll 35 for (ConsumerRecord consumerRecord : consumerRecords 36 System.out.println(consumerRecord); 37 } 38 } 39 } 40 } 订阅分区 1 package com.lcha.kafka_pro; 2 3 import org.apache.kafka.clients.consumer.ConsumerConfig; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.apache.kafka.clients.consumer.ConsumerRecords; 6 import org.apache.kafka.clients.consumer.KafkaConsumer; 7 import org.apache.kafka.common.TopicPartition; 8 import org.apache.kafka.common.serialization.StringDeserializer; 9 10 import java.time.Duration; 11 import java.util.ArrayList; 12 import java.util.Properties; 13 14 public class CustConsumer { 15 public static void main(String[] args) { 16 17 // 0.配置 18 Properties properties = new Properties(); 19 // 链接 20 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx,xxx,xxx"); 21 // 反序列化 22 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeser 23 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDes 24 // 消费者id 25 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); 26 27 // 1.创建⼀个消费者 28 KafkaConsumer kafkaConsumer = new KafkaConsumer<>(proper 29 // 2.订阅主题对应的分区 30 ArrayList topicsPartitions = new ArrayList<>(); 31 topicsPartitions.add(new TopicPartition("first", 0)); 32 kafkaConsumer.assign(topicsPartitions); 33 // 3.消费数据 34 while (true) { 35 ConsumerRecords consumerRecords = kafkaConsumer.poll 36 for (ConsumerRecord consumerRecord : consumerRecords 37 System.out.println(consumerRecord); 38 } 39 } 40 } 41 } 五 集成 SpringBoot 0.配置⽂件 1 # 链接kafka集群 2 spring.kafka.bootstrap-servers=xxx:xxx,xxx:xxx 3 4 # key value 对应的序列化 5 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.Strin 6 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.Str 7 8 # key value的反序列化 9 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.Str 10 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S 11 12 # 消费者组id 13 spring.kafka.consumer.group-id=lcha 1.SpringBoot ⽣产者 1 package com.lcha.kafka_pro.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.kafka.core.KafkaTemplate; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.RestController; 7 8 @RestController 9 public class ProducerController { 10 11 @Autowired 12 KafkaTemplate kafka; 13 14 @RequestMapping("/lcha") 15 public String data(String msg){ 16 // 通过kafka发送数据 17 kafka.send("first", msg); 18 return "ok"; 19 } 20 } 2.SpringBoot 消费者 1 package com.lcha.kafka_pro.controller; 2 3 import org.springframework.kafka.annotation.KafkaListener; 4 5 public class KafkaConsumer { 6 7 @KafkaListener(topics = "first") 8 public void consumerTopic(String msg){ 9 System.out.println("收到消息: " + msg); 10 } 11 }