Kafka


消息队列

什么是消息队列

可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。

由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。

参与消息传递的双方称为 生产者 和 消费者 ,生产者负责发送消息,消费者负责处理消息。

消息队列的作用是什么(优点)

  1. 异步处理:提高系统性能(减少响应所需时间)
  2. 削峰/限流:把请求数据先存入消息队列中,消费系统再根据自己的消费能力拉取消费。
  3. 降低系统耦合性。

使用消息队列会带来哪些问题(缺点)

  • 系统可用性降低: 需要考虑消息丢失或者说 MQ 挂掉等等的情况
  • 系统复杂性提高: 需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
  • 一致性问题: 消息的真正消费者并没有正确消费消息,导致数据不一致

Kafka

Kafka 是什么

Kafka 是一个分布式流式处理平台。​

Kafka 是一个多分区、多副本且基于zookeeper协调的分布式消息系统。

Kafka 应用场景有哪些

  1. 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流

Kafka的优势

会经常拿它跟 RocketMQ、RabbitMQ 对比:

  • 高吞吐量:Kafka 每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)

  • 持久化数据存储:将消息持久化到磁盘,防止数据丢失

  • 分布式系统易于扩展:所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。

Kafka和RabbitMQ的区别

  1. 数据处理模型:Kafka是一个分布式发布-订阅消息系统,以高吞吐量和持久性为目标。RabbitMQ是一个基于AMQP(高级消息队列协议)的消息中间件,它使用队列模型来处理消息。
  2. 处理方式:Kafka通过分区和复制机制来实现高吞吐量和高可靠性。RabbitMQ使用先进先出(FIFO)的队列模型来处理消息,按照顺序逐一传递和消费消息。
  3. 数据保证:Kafka提供的是至少一次交付的语义,即消息不会丢失,但可能会重复。RabbitMQ提供的是恰好一次交付的语义,即消息不会丢失也不会重复。它使用确认机制和消息确认应答来确保消息的可靠性。
  4. 适用场景:Kafka适用于处理高吞吐量和持久化的流式数据,而RabbitMQ适用于可靠的点对点通信和任务队列。

什么是Producer、Consumer、Broker、Topic、Partition

  • Producer(生产者) : 产生消息的一方。
  • Consumer(消费者) : 消费消息的一方。
  • Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个Broker 组成一个 Kafka 集群。

每个 Broker 中又包含了 Topic 以及 Partition :

  • Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
  • Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition

Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。

Zookeeper 在 Kafka 中的作用是什么

  • Broker 注册 :每个 Broker 在启动时,都会到 Zookeeper 上进行注册,创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去

  • Topic 注册 : 同一个Topic 的消息会被分成多个分区,并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。

  • 负载均衡 : Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

Kafka如何削峰

  1. 增加分区:增加分区数量可以提高系统的并发处理能力,从而更好地分散负载,减轻压力。
  2. 提高消费者组的消费能力:多个消费者可以同时消费同一个主题的不同分区,从而提高处理速度。
  3. 批量发送和批量接收:在生产者端,可以将多个消息进行批量发送,减少网络开销。在消费者端,可以设置合适的拉取批次大小,以一次拉取多个消息,减少频繁的网络请求。
  4. 弹性扩展:在系统负载过高时,可以采用弹性扩展的方式,动态增加Kafka集群的节点或消费者的实例数量,以应对高负载情况。
  5. 参数设置(限流)
    生产者端参数和策略:
    (1)batch.size 和 linger.ms:适当增大 batch.size 参数和 linger.ms 参数的值,将多个消息进行批量发送。这样可以减少请求次数和网络传输开销,提高发送效率。
    (2)max.block.ms:设置生产者在发送缓冲区满时的阻塞时间。如果设置了较小的值,当发送缓冲区已满时,生产者将被阻塞一段时间,直到有足够空间去发送消息。这种方式可以在消息高峰期时,通过控制发送速率来缓解压力。
    消费者端参数和策略:
    (1)max.poll.records:控制每次拉取的最大记录数。通过调整该参数,限制每次消费者获取消息的数量,从而控制消费压力。
    (2)fetch.min.bytes 和 fetch.max.wait.ms:通过适当调整 fetch.min.bytesfetch.max.wait.ms 参数,控制消费者的拉取频率。较大的 fetch.min.bytes 值可以减少拉取的次数,较大的 fetch.max.wait.ms 值可以延长拉取的等待时间,从而降低消费者的请求次数。

如何保证Kafka消息队列的高可用

  1. 多分区(Partition)

创建一个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上。就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。

  1. 多副本(Replica)

每个 partition 的数据都会同步到其他机器上,形成自己的多个 replica 副本。

分区(Partition)中的多个副本之间会有一个叫做 leader 的,其他副本称为 follower。发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。

写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上数据即可。Kafka 会均匀的将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

这两种基准极大地提高了消息存储的安全性, 保证了kafka消息队列的高可用,不过也相应的增加了所需要的存储空间。

Kafka 的分区策略有哪些

所谓分区策略就是决定生产者将消息发送到哪个分区的算法。

  1. 轮询策略:默认的分区策略,非常优秀的负载均衡表现,总是能保证消息最大限度地被平均分配到所有分区上;
  2. 随机策略:实现随机策略版的 partition 方法;
  3. 按消息键保序策略:可以保证同一个 Key 的所有消息都进入到相同的分区里,由于每个分区下的消息处理是有顺序的,所以称之为消息键保序策略;

Kafka 如何保证消息的消费顺序

Topic 有多个 Partition,从不同的Partition中消费消息可能满足不了我们需要的顺序

1、1 个 Topic 只对应一个 Partition

Kafka 只能为我们保证一个 Partition(分区) 中的消息有序。

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。

2 、发送消息的时候指定 key/Partition

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。

Kafka 如何保证消息不丢失

生产者丢失消息

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。

Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,在项目中采用了ListenableFuture类接收返回值,为其添加回调函数,记录发送日志,发现送失败,检查失败的原因之后重新发送。设置生产者的retries (重试次数),一般是 3,设置重发时间间隔

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
        ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

或者设置acks = all

acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即1,当我们配置 acks = all 表示只有所有副本收到消息时,生产者才会接收到来自服务器的响应。但是延迟高

消费者丢失消息

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset,如果消费者在正式消费之前突然挂掉了,消息实际上没有消费,但是 offset 却被自动提交了。kafka就会认为这个消息已经消费过了,造成了消息丢失。

这个情况解决办法是:关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。

enable-auto-commit: false

手动提交 :ack.acknowledge();

但是,这样会带来消息被重新消费的问题。比如刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

Kafka 丢失消息

考虑到:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

解决办法:

设置 acks = all

acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即1,当我们配置 acks = all 表示只有所有副本收到消息时,生产者才会接收到来自服务器的响应。

设置 replication.factor >= 3

这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

设置 min.insync.replicas > 1

代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

设置 unclean.leader.election.enable = false

当 leader 副本发生故障时不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

如何保证消息不被重复消费

kafka出现消息重复消费的原因:服务端侧已经消费的数据没有成功提交 offset(根本原因)

解决方案:消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。保证重复消费的消息也会得出正常结果

Redis 的set

  • 每个消息对应一个唯一id
  • 在消费者receive函数中,先对这个id校验,是否存在redis中
  • 如果存在,说明该消息已经被消费过了,不用再执行
  • 如果不存在,说明消息没有被消费,将id放入redis,然后消费这个消息

幂等校验的其他方法:

1、token 机制

  1. 客户端会先发送一个请求去获取 token,服务端会生成一个全局唯一的 ID 作为 token 保存在 redis 中,同时把这个 ID 返回给客户端
  2. 客户端第二次调用业务请求的时候必须携带这个 token
  3. 服务端会校验这个 token,如果校验成功,则执行业务,并删除 redis 中的 token
  4. 如果校验失败,说明 redis 中已经没有对应的 token,则表示重复操作,直接返回指定的结果给客户端

2、基于 mysql 主键唯一

利用 mysql 唯一索引的特性。

  1. 建立一张去重表,其中某个字段需要建立唯一索引
  2. 客户端去请求服务端,服务端会将这次请求的一些信息插入这张去重表中
  3. 因为表中某个字段带有唯一索引,如果插入成功,证明表中没有这次请求的信息,则执行后续的业务逻辑
  4. 如果插入失败,则代表已经执行过当前请求,直接返回

Kafka时间轮片是什么

Kafka中的时间轮片是用于处理延迟消息的一种机制。

时间轮片的概念基于时间轮(time wheel),时间轮是一种计时器算法,用于处理定时任务或延迟任务。Kafka中的时间轮由一组时间轮片组成,每个时间轮片代表一个时间单位,例如1秒、1分钟等。

当生产者发送一个延迟消息时,Kafka会将该消息放入对应的时间轮片中。随着时间的推移,时间轮不断转动,当一个时间轮片到达指定的时间单位后,其中的消息就可以被消费者消费了。消费者会定期检查时间轮,并处理到期的消息。

时间轮片提供了一种高效的方式来管理和触发到期的延迟消息。对于某些需要处理定时任务或具有延迟要求的场景非常有用,例如实时日志处理、事件驱动架构等。

大量消息在 MQ 里长时间积压,该如何解决

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  1. 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉;
  2. 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量;
  3. 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue;
  4. 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据;
  5. 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

Zookeeper

谈下你对 Zookeeper 的认识?

ZooKeeper 是一个分布式的,开源的分布式应用程序协调服务

ZooKeeper主要服务于分布式系统,可以用ZooKeeper来做:统一配置管理、统一命名服务、分布式锁、集群管理、负载均衡、分布式协调与通知、数据发布/订阅等

Zookeeper 有哪些功能

1. 集群管理:监控节点存活状态、运行请求等;

2. 主节点选举:主节点挂掉了之后可以从备用的节点开始新一轮选主,主节点选举说的就是这个选举的过程,使用 Zookeeper 可以协助完成这个过程;

3. 分布式锁:Zookeeper 提供两种锁:独占锁、共享锁。

4. 命名服务:在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息

谈下你对 ZAB 协议的了解

ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。

ZAB 协议包括两种基本的模式:崩溃恢复和消息广播。

崩溃恢复(选主)

当整个 Zookeeper 集群刚刚启动或者Leader服务器宕机、重启或者网络故障导致不存在过半的服务器与 Leader 服务器保持正常通信时,所有服务器进入崩溃恢复模式,首先选举产生新的 Leader 服务器,然后集群中 Follower 服务器开始与新的 Leader 服务器进行数据同步。(超半数投票)

消息广播(同步)

当集群中超过半数机器与该 Leader 服务器完成数据同步之后,退出恢复模式进入消息广播模式,Leader 服务器开始接收客户端的事务请求,生成事物提案来进行事务请求处理。


文章作者: Aiaa
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Aiaa !
  目录