消息队列
什么是消息队列
可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。
由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
参与消息传递的双方称为 生产者 和 消费者 ,生产者负责发送消息,消费者负责处理消息。
消息队列的作用是什么(优点)
- 异步处理:提高系统性能(减少响应所需时间)
- 削峰/限流:把请求数据先存入消息队列中,消费系统再根据自己的消费能力拉取消费。
- 降低系统耦合性。
使用消息队列会带来哪些问题(缺点)
- 系统可用性降低: 需要考虑消息丢失或者说 MQ 挂掉等等的情况
- 系统复杂性提高: 需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
- 一致性问题: 消息的真正消费者并没有正确消费消息,导致数据不一致
Kafka
Kafka 是什么
Kafka 是一个分布式流式处理平台。
Kafka 是一个多分区、多副本且基于zookeeper协调的分布式消息系统。
Kafka 应用场景有哪些
- 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
- 数据处理: 构建实时的流数据处理程序来转换或处理数据流
Kafka的优势
会经常拿它跟 RocketMQ、RabbitMQ 对比:
高吞吐量:Kafka 每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)
持久化数据存储:将消息持久化到磁盘,防止数据丢失
分布式系统易于扩展:所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
Kafka和RabbitMQ的区别
- 数据处理模型:Kafka是一个分布式发布-订阅消息系统,以高吞吐量和持久性为目标。RabbitMQ是一个基于AMQP(高级消息队列协议)的消息中间件,它使用队列模型来处理消息。
- 处理方式:Kafka通过分区和复制机制来实现高吞吐量和高可靠性。RabbitMQ使用先进先出(FIFO)的队列模型来处理消息,按照顺序逐一传递和消费消息。
- 数据保证:Kafka提供的是至少一次交付的语义,即消息不会丢失,但可能会重复。RabbitMQ提供的是恰好一次交付的语义,即消息不会丢失也不会重复。它使用确认机制和消息确认应答来确保消息的可靠性。
- 适用场景:Kafka适用于处理高吞吐量和持久化的流式数据,而RabbitMQ适用于可靠的点对点通信和任务队列。
什么是Producer、Consumer、Broker、Topic、Partition
- Producer(生产者) : 产生消息的一方。
- Consumer(消费者) : 消费消息的一方。
- Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个Broker 组成一个 Kafka 集群。
每个 Broker 中又包含了 Topic 以及 Partition :
- Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
- Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition
Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。
Zookeeper 在 Kafka 中的作用是什么
Broker 注册 :每个 Broker 在启动时,都会到 Zookeeper 上进行注册,创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
Topic 注册 : 同一个Topic 的消息会被分成多个分区,并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。
负载均衡 : Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
Kafka如何削峰
- 增加分区:增加分区数量可以提高系统的并发处理能力,从而更好地分散负载,减轻压力。
- 提高消费者组的消费能力:多个消费者可以同时消费同一个主题的不同分区,从而提高处理速度。
- 批量发送和批量接收:在生产者端,可以将多个消息进行批量发送,减少网络开销。在消费者端,可以设置合适的拉取批次大小,以一次拉取多个消息,减少频繁的网络请求。
- 弹性扩展:在系统负载过高时,可以采用弹性扩展的方式,动态增加Kafka集群的节点或消费者的实例数量,以应对高负载情况。
- 参数设置(限流)
生产者端参数和策略:
(1)batch.size 和 linger.ms:适当增大batch.size
参数和linger.ms
参数的值,将多个消息进行批量发送。这样可以减少请求次数和网络传输开销,提高发送效率。
(2)max.block.ms:设置生产者在发送缓冲区满时的阻塞时间。如果设置了较小的值,当发送缓冲区已满时,生产者将被阻塞一段时间,直到有足够空间去发送消息。这种方式可以在消息高峰期时,通过控制发送速率来缓解压力。
消费者端参数和策略:
(1)max.poll.records:控制每次拉取的最大记录数。通过调整该参数,限制每次消费者获取消息的数量,从而控制消费压力。
(2)fetch.min.bytes 和 fetch.max.wait.ms:通过适当调整fetch.min.bytes
和fetch.max.wait.ms
参数,控制消费者的拉取频率。较大的fetch.min.bytes
值可以减少拉取的次数,较大的fetch.max.wait.ms
值可以延长拉取的等待时间,从而降低消费者的请求次数。
如何保证Kafka消息队列的高可用
- 多分区(Partition)
创建一个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上。就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
- 多副本(Replica)
每个 partition 的数据都会同步到其他机器上,形成自己的多个 replica 副本。
分区(Partition)中的多个副本之间会有一个叫做 leader 的,其他副本称为 follower。发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上数据即可。Kafka 会均匀的将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。
这两种基准极大地提高了消息存储的安全性, 保证了kafka消息队列的高可用,不过也相应的增加了所需要的存储空间。
Kafka 的分区策略有哪些
所谓分区策略就是决定生产者将消息发送到哪个分区的算法。
- 轮询策略:默认的分区策略,非常优秀的负载均衡表现,总是能保证消息最大限度地被平均分配到所有分区上;
- 随机策略:实现随机策略版的 partition 方法;
- 按消息键保序策略:可以保证同一个 Key 的所有消息都进入到相同的分区里,由于每个分区下的消息处理是有顺序的,所以称之为消息键保序策略;
Kafka 如何保证消息的消费顺序
Topic 有多个 Partition,从不同的Partition中消费消息可能满足不了我们需要的顺序
1、1 个 Topic 只对应一个 Partition
Kafka 只能为我们保证一个 Partition(分区) 中的消息有序。
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
2 、发送消息的时候指定 key/Partition
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
Kafka 如何保证消息不丢失
生产者丢失消息
生产者(Producer) 调用send
方法发送消息之后,消息可能因为网络问题并没有发送过去。
Kafka 生产者(Producer) 使用 send
方法发送消息实际上是异步的操作,在项目中采用了ListenableFuture类接收返回值,为其添加回调函数,记录发送日志,发现送失败,检查失败的原因之后重新发送。设置生产者的retries
(重试次数),一般是 3,设置重发时间间隔
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
或者设置acks = all
acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即1,当我们配置 acks = all 表示只有所有副本收到消息时,生产者才会接收到来自服务器的响应。但是延迟高
消费者丢失消息
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset,如果消费者在正式消费之前突然挂掉了,消息实际上没有消费,但是 offset 却被自动提交了。kafka就会认为这个消息已经消费过了,造成了消息丢失。
这个情况解决办法是:关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。
enable-auto-commit: false
手动提交 :ack.acknowledge();
但是,这样会带来消息被重新消费的问题。比如刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
Kafka 丢失消息
考虑到:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
解决办法:
设置 acks = all
acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即1,当我们配置 acks = all 表示只有所有副本收到消息时,生产者才会接收到来自服务器的响应。
设置 replication.factor >= 3
这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置 min.insync.replicas > 1
代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
设置 unclean.leader.election.enable = false
当 leader 副本发生故障时不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
如何保证消息不被重复消费
kafka出现消息重复消费的原因:服务端侧已经消费的数据没有成功提交 offset(根本原因)
解决方案:消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。保证重复消费的消息也会得出正常结果
Redis 的set
- 每个消息对应一个唯一id
- 在消费者receive函数中,先对这个id校验,是否存在redis中
- 如果存在,说明该消息已经被消费过了,不用再执行
- 如果不存在,说明消息没有被消费,将id放入redis,然后消费这个消息
幂等校验的其他方法:
1、token 机制
- 客户端会先发送一个请求去获取 token,服务端会生成一个全局唯一的 ID 作为 token 保存在 redis 中,同时把这个 ID 返回给客户端
- 客户端第二次调用业务请求的时候必须携带这个 token
- 服务端会校验这个 token,如果校验成功,则执行业务,并删除 redis 中的 token
- 如果校验失败,说明 redis 中已经没有对应的 token,则表示重复操作,直接返回指定的结果给客户端
2、基于 mysql 主键唯一
利用 mysql 唯一索引的特性。
- 建立一张去重表,其中某个字段需要建立唯一索引
- 客户端去请求服务端,服务端会将这次请求的一些信息插入这张去重表中
- 因为表中某个字段带有唯一索引,如果插入成功,证明表中没有这次请求的信息,则执行后续的业务逻辑
- 如果插入失败,则代表已经执行过当前请求,直接返回
Kafka时间轮片是什么
Kafka中的时间轮片是用于处理延迟消息的一种机制。
时间轮片的概念基于时间轮(time wheel),时间轮是一种计时器算法,用于处理定时任务或延迟任务。Kafka中的时间轮由一组时间轮片组成,每个时间轮片代表一个时间单位,例如1秒、1分钟等。
当生产者发送一个延迟消息时,Kafka会将该消息放入对应的时间轮片中。随着时间的推移,时间轮不断转动,当一个时间轮片到达指定的时间单位后,其中的消息就可以被消费者消费了。消费者会定期检查时间轮,并处理到期的消息。
时间轮片提供了一种高效的方式来管理和触发到期的延迟消息。对于某些需要处理定时任务或具有延迟要求的场景非常有用,例如实时日志处理、事件驱动架构等。
大量消息在 MQ 里长时间积压,该如何解决
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉;
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量;
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue;
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据;
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
Zookeeper
谈下你对 Zookeeper 的认识?
ZooKeeper 是一个分布式的,开源的分布式应用程序协调服务
ZooKeeper主要服务于分布式系统,可以用ZooKeeper来做:统一配置管理、统一命名服务、分布式锁、集群管理、负载均衡、分布式协调与通知、数据发布/订阅等
Zookeeper 有哪些功能
1. 集群管理:监控节点存活状态、运行请求等;
2. 主节点选举:主节点挂掉了之后可以从备用的节点开始新一轮选主,主节点选举说的就是这个选举的过程,使用 Zookeeper 可以协助完成这个过程;
3. 分布式锁:Zookeeper 提供两种锁:独占锁、共享锁。
4. 命名服务:在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息
谈下你对 ZAB 协议的了解
ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。
ZAB 协议包括两种基本的模式:崩溃恢复和消息广播。
崩溃恢复(选主)
当整个 Zookeeper 集群刚刚启动或者Leader服务器宕机、重启或者网络故障导致不存在过半的服务器与 Leader 服务器保持正常通信时,所有服务器进入崩溃恢复模式,首先选举产生新的 Leader 服务器,然后集群中 Follower 服务器开始与新的 Leader 服务器进行数据同步。(超半数投票)
消息广播(同步)
当集群中超过半数机器与该 Leader 服务器完成数据同步之后,退出恢复模式进入消息广播模式,Leader 服务器开始接收客户端的事务请求,生成事物提案来进行事务请求处理。