消息队列原理与实战

Entropy Lv4

本文来源于第五届字节跳动青训营活动,已收录到消息队列原理与实战 | 青训营笔记 - 掘金 (juejin.cn) ,主要记录了消息队列的学习

消息队列原理与实战

前言

场景

  • 系统崩溃

  • 服务能力有限

  • 链路耗时长尾

  • 日志存储

解决方案

  • 针对存储行为服务崩溃时,使用消息队列进行解耦
  • 针对高并发服务时,使用消息队列进行削峰
  • 针对链路耗时,使用消息队列进行异步
  • 针对日志存储,使用消息队列进行日志处理

什么是消息队列

消息队列(MQ) ,指保存消息的一个容器,本质是个队列。但这个队列需要支持高吞吐,高并发,高可用。

1.前世今生

消息中间件其实诞生的很早,早在1983年互联网应用还是一片荒芜的年代,有个在美国的印度小哥Vivek就设想了一种通用软件总线,世界上第一个现代消息队列软件The Information Bus(TIB), TIB受到了企业的欢迎,这家公司的业务发展引起了当时最牛气的IT公司IBM的注意,于是他们一开始研发了自己消息队列软件,于是才有了后来的wesphere mq,再后来微软也加入了战团。接近2000年的时候,互联网时代已经初见曙光,全球的应用程序得到了极大地丰富,对于程序之间互联互通的需求越来越强烈,但是各大IT公司之间还是牢牢建立着各种技术壁垒,以此来保证自己的商业利益,所以消息中间件在那个时候是大型企业才能够用的起的高级玩意。 但是时代的洪流不可逆转,有壁垒就有打破壁垒的后来者,2001年sun发布了jms技术,试图在各大厂商的层面上再包装一层统一的java规范。java程序只需要针对jms api编程就可以了,不需要再关注使用了什么样的消息中间件,但是jms仅仅适用于java。2004年AMQP(高级消息队列协议)诞生了,才是真正促进了消息队列的繁荣发展,任何人都可以针对AMQP的标准进行编码。有好的协议指导,再加上互联网分布式应用的迅猛发展成为了消息中间件一飞冲天的最大动力,程序应用的互联互通,发布订阅,最大契合了消息中间件的最初的设计初衷。除了刚才介绍过的收费中间件,后来开源消息中间件开始层出不穷,常见比较流行的有ActiveMQ、RabbitMQ 、Kafak、阿里的RocketMQ,以及目前存算分离的Pulsar,在目前互联网应用中消息队列中间件基本上成为标配。

业界内消息队列对比

目前比较流行的MQ

  • Kafka:分布式的、分区的、多副本的日志提交服务,在高吞吐场景下发挥较为出色。
  • RocketMQ:低延迟、强一致、高性能、高可靠、万亿级容量和灵活的可扩展性,在一些实时场景中运用较广泛。
  • Pulsar:是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体、采用存算分离的架构设计。
  • BMQ:和 Pulsar 架构类似,存算分离,初期定位是承接高吞吐的离线业务场景,逐步替换掉对应的 Kafka 集群。

2.消息队列 - Kafka

2.1 使用场景

  • 日志信息

  • Metrics 数据:搜索服务、直播服务、订单服务、支付服务

  • 用户行为:搜索、点赞、评论、收藏

2.2 如何使用 Kafka

  • 第一步:创建一个 Kafka 集群
  • 第二步:在这个集群中创建一个 Topic,并且设置好分片数量
  • 第三步:引入对应语言的 SDK 编写生产者逻辑,配置好集群和 Topic 等参数,初始化一个生产者,调用 Send 方法,将信息发送出去
  • 第四步:引入对应语言的 SDK 编写消费者逻辑,配置好集群和 Topic 等参数,初始化一个消费者,调用 Poll 方法,接受发送的信息

2.3 基本概念

  • Topic:Kafka 中的逻辑队列,可以理解为每一个不同的业务场景就是一个不同的 Topic,对于这个业务来说,所有的数据都存储在这个 Topic 中。
  • Cluster:Kafka 的物理集群,每个集群中可以新建多个不同的 Topic。
  • Producer:消息的生产者,负责将业务消息发送到 Topic 当中。
  • Consumer:消息的消费者,负责消费已经发送到 Topic 中的消息。
  • Consumer Group:消费者组,不同组 Consumer 消费进度互不干涉。
  • Partition:分区,通常 Topic 会有多个分片,不同分片直接消息是可以并发处理的,能够提高单个 Topic 的吞吐。

Offset:对每一个 partition 来说,每一条消息都有一个唯一的 Offset,消息在 partition 内的相对位置信息,可以理解为唯一 ID,在 partition 内部严格递增。

Replica:分片的副本,分布在不同的机器上,可用来容灾,Leader 对外服务,Follower 异步去拉取 Leader 的数据进行同步,如果 Leader 挂掉了,可以将 Follower 提升成 Leader 再对外进行服务。

ISR:In-Sync Replicas。意思是同步中的副本,对于 Follower 来说,始终和 Leader 是有一定差距的,但是当这个 差距比较小的时候,就可以将这个 Follower 副本加入到 ISR 中,不在 ISR 中的副本是不允许提升成 Leader 的。

每个分片有多个 Replica、Leader,Replica 将会从 ISR 中选出。

2.4 数据复制

Broker 代表每一个 Kafka 的节点,所有的 Broker 节点最终组成了一个集群。多个 Broker 中有一个 Broker 同时扮演了 Controller 的角色,Controller 是整个集群的大脑,负责对副本和 Broker 进行分配。

2.5 Kafka 架构

在集群的基础上,还有一个模块是 ZooKeeper。这个模块存储了集群的元数据信息,比如副本或分区的分配信息等,Controller 计算好的方案都会存放到这个地方。

2.6 一条消息的自述

从一条消息的视角来看完整的处理流程,了解 Kafka 为什么能够支持如此高的吞吐。

思考:如果发送一条消息,等到其成功后再发一条会有什么问题?

2.7 Producer

批量发送

批量发送可以减少 IO 次数,从而加强发送能力。

思考:如果消息量很大,网络带宽不够用,如何解决?

数据压缩

通过压缩减小消息大小,目前支持 Snappy、Gzip、LZ4、ZSTD 压缩算法。

2.8 Broker

  • 数据的存储

    了解数据的存储,可以先从消息文件结构入手,再了解磁盘结构的工作原理。

  • 消息文件结构

    在每一个 Broker 中都分布这不同 Topic 的不同分片。

  • 磁盘结构

    移动磁头找到对应磁道,磁盘转动,找到对应扇区,最后写入。寻道成本比较高,因此顺序写可以减少寻道所带来的时间成本。

    在一个盘面里寻道,磁头 —> 磁道 —> 扇区。

  • 顺序写

    采用顺序写的方式进入写入,可以提高写入效率。

  • 如何找到消息?

    Consumer 通过发送 Fetch Request 请求消息数据,Broker 会指定 Offset 处的消息,按照时间窗口和消息大小窗口发送给 Consumer

    思考:寻找数据这个细节是如何实现的?

  • 偏移量索引文件,文件名是文件中第一条消息的 Offset

    通过二分找到小于目标文件 Offset 的最大索引位置,再遍历找到目标 Offset。

  • 时间戳索引文件

    通过二分找到小于目标时间戳最大的索引位置,在通过寻找 Offset 的方式找到最终数据。和 Offset 相比只是多加了时间戳以及,即通过二分找到时间戳对应的 Offset,再找到相应的文件数据。

  • 传统数据拷贝

    从磁盘空间到内核空间,内核空间到应用空间,再从应用空间到内核空间,进入消费者进程。

  • 零拷贝

    从磁盘空间到内核空间,然后进入消费者进程。

    Consumer从Broker中读取数据,通过sendfile的方式,将磁盘读到os内核缓冲区后,直接转到socket buffer进行网络发送 Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入

2.9 Consumer

消息的接收端

如何解决 Partition 在 Consumer Group 中的分配问题?

对于一个Consumer Group来说,多个分片可以并发消费,这样可以大大提高消费的效率,但需要解决 Consumer 和 Partition 的分配问题,也就是对于每一个 Partition 来讲,该由哪一个 Consumer 来消费的问题。对于这个问题,我们一般有两种解决方法,手动分配和自动分配

Low Level

通过手动分配,哪一个 Consumer 消费哪一个 Partition 完全由业务来决定

第一,手动分配,也就是Kafka中所说的 Low Level 消费方式进行消费,这种分配方式的一个好处就是启动比较快,因为对于每一个 Consumer 来说,启动的时候就已经知道了自己应该去消费哪个消费方式。这些 Consumer 再启动的时候就已经知道分配方案了

但这样这种方式存在缺点,想象一下,如果某一个 Consumer 挂掉了可能会停止一部分分片的消费,或者新增了一台Consumer,那又需要停掉整个集群,重新修改配置再上线,保证新增的 Consumer 也可以消费数据,其实上面两个问题,有时候对于线上业务来说是致命的。

因此 Kafka 也提供了另外一种方式。

High Level

Kafka 提供了自动分配的方式,这里也叫做 High Level 的消费方式,简单的来说,就是在 Broker 集群中,对于不同的 Consumer Group 来讲,都会选取一台 Broker 当做 Coordinator,而 Coordinator 的作用就是帮助 Consumer Group 进行分片的分配,也叫做分片的 Rebalance

使用这种方式,如果 Consumer Group 中有发生宕机,或者有新的 Consumer 加入,整个 partition 和 Consumer 都会重新进行分配来达到一个稳定的消费状态

2.10 Consumer Rebalance

相关参考https://www.cnblogs.com/listenfwind/p/12662968.html

小结 - 提高 Kafka 吞吐或者稳定性的功能

  • Producer:批量发送、数据压缩
  • Broker:顺序写,消息索引,零拷贝
  • Consumer:Rebalance

2.11 Kafka - 数据复制问题

对于 Kafka 来说,每一个 Broker 上都有不同 Topic 分区的不同副本,而每一个副本,会将其数据存储到该 Kafka 节点上面,对于不同的节点之间,通过副本直接的数据复制,来保证数据的最终一致性与集群的高可用。

2.12 Kafka - 重启操作

对一个机器进行重启

  • 首先会关闭一个 Broker,此时如果该 Broker 上存在副本的 Leader,那么该副本将发生 Leader 切换,切换到其他节点上并且在 ISR 中的 Follower 副本

  • 而此时,因为数据在不断的写入,对于刚刚关闭重启的 Broker 来说,和新 Leader 之间一定会存在数据的滞后,此时这个 Broker 会追赶数据,重新加入到 ISR 当中

  • 当数据追赶完成之后,需要回切 Leader,这一步叫做 prefer leader,目的是为了避免在一个集群长期运行后,所有的 Leader 都分布在少数节点上导致数据的不均衡

  • 通过上面的一个流程分析,可以发现对于一个 Broker 的重启来说,需要进行数据复制,所以时间成本会比较大

    比如一个节点重启需要10分钟,一个集群有1000个节点,如果该集群需要重启升级,则需要10000分钟,那差不多就是一个星期,这样的时间成本是非常大的。 可能会问,能不能并发多台重启,答案是不能。因为在一个两副本的集群中,重启了两台机器,对某一分片来讲,可能两个分片都在这台机器上面,则会导致该集群处于不可用的状态。这是更不能接受的。

2.13 Kafka - 替换、扩容、缩容

  • 替换,本质上来讲就是一个需要追更多数据的重启操作,因为正常重启只需要追一小部分,而替换,则是需要复制整个leader的数据,时间会更长
  • 扩容,当分片分配到新的机器上以后,也是相当于要从0开始复制一些新的副本
  • 缩容,缩容节点上面的分片也会分片到集群中剩余节点上面,分配过去的副本也会从0开始去复制数据

以上三个操作均有数据复制所带来的时间成本问题,所以对于 Kafka 来说,运维操作所带来的时间成本是不容忽视的

2.14 Kafka - 负载不均衡

场景:同一个 Topic 有4个分片,两副本,对于分片1来说,数据量明显比其他分片要大,当机器 IO 达到瓶颈的时候,可能就需要把第一台 Broker上面的其他的 Partition 迁移到其他负载小的 Broker上面

但数据复制又会引起 Broker 的 IO 升高,所以问题就变成了为了去解决 IO 升高,但解决问题的过程又会带来更高的IO。所以就需要权衡 IO 设计出一个极其复杂的负载均衡策略。

Kafka - 问题总结

  1. 运维成本高
  2. 对于负载不均衡的场景,解决方案复杂
  3. 没有自己的缓存,完全依赖 Page Cache
  4. Controller 和 Coordinator 和 Broker 在同一进程中,大量 IO 会造成其性能下降

3.消息队列 - BMQ

ByteMQ,简称 BMQ,由字节跳动团队自主研发。

3.1 BMQ 简介

BMQ 兼容 Kafka 协议,存算分离,云原生消息队列,初期定位是承接高吞吐的离线业务场景,逐步替换掉对应的 Kafka 集群。

Producer —> Consumer —> Proxy —> Broker —> HDFS —> Controller —> Coordinator —> Meta

这里着重强调 Proxy 和 Broker 无状态,为与下面的运维进行比较,简单说明存算分离,适配 Kafka 协议以及不选择 Pulsar 的原因

3.2 运维操作对比

具体操作KafkaBMQ
重启需要数据复制,分钟级重启重启后可直接对外服务,秒级完成
替换需要数据复制,分钟级替换,甚至天级别替换后可直接对外服务,秒级完成
扩容需要数据复制,分钟级扩容,甚至天级别扩容后可直接对外服务,秒级完成
缩容需要数据复制,分钟级缩容,甚至天级别缩容后可直接对外服务,秒级完成

实际上对于所有节点变更的操作,都仅仅只是集群元数据的变化,通常情况下都能秒级完成,而真正的数据已经移到下层分布式文件存储去了,所以运维操作不需要额外关心数据复制所带来的时间成本

3.3 HDFS 写文件流程

同一个副本是由多个 segment 组成,BMQ 对于单个文件写入的机制,首先客户端写入前会选择一定数量的 DataNode,这个数量是副本数,然后将一个文件写入到三个节点上,切换到下一个 segment 之后,又会重新选择三个节点进行写入。这样一来,对于单个副本的所有 segment 来讲,会随机地分配到分布式文件系统的整个集群中。

3.4 BMQ 文件结构

对于 Kafka 分片数据的写入,是先在 Leader 上面写好文件,然后同步到 Follower 上,所以同一个副本的所有 Segment 都在同一台机器上面。会存在之前说到的单分片过大导致负载不均衡的问题,但在 BMQ 集群中,因为对于单个副本来讲,是随机分配到不同的节点上面的,因此不会存在 Kafka 的负载不均问题。

3.5 Broker

Partition 状态机

其实对于写入的逻辑来说,还有一个状态机的机制,用来保证不会出现同一个分片在两个 Broker 上同时启动的情况,另外也能够保证一个分片的正常运行。

首先 Controller 做好分片的分配之后,如果在该 Broker 分配到了 Broker,首先会 start 这个分片,然后进入 Recover 状态,这个状态主要有两个目的

  • 第一个目的是获取分片写入权利,即对于 hdfs 来讲,只会允许一个分片进行写入,只有拿到这个权利的分片才能写入。

  • 第二个目的是如果上次分片是异常中断的,没有进行 save checkpoint,这里会重新进行一次 save checkpoint,然后就进入了正常的写流程状态,创建文件,写入数据,到一定大小之后又开始建立新的文件进行写入。

写文件流程
  • 数据校验:CRC , 参数是否合法

  • 校验完成后,会把数据放入Buffer中,通过一个异步的Write Thread线程将数据最终写入到底层的存储系统当中

    这里有一个地方需要注意一下,就是对于业务的写入来说,可以配置返回方式,可以在写完缓存之后直接返回,另外也可以数据真正写入存储系统后再返回,对于这两个来说前者损失了数据的可靠性,带来了吞吐性能的优势,因为只写入内存是比较快的,但如果在下一次 flush 前发生宕机了,这个时候数据就有可能丢失,后者的话,因为数据已经写入了存储系统,这个时候也不需要担心数据丢失,相应的来说吞吐就会小一些

Thread 的具体逻辑

  • 首先会将 Buffer 中的数据取出来,调用底层写入逻辑,在一定的时间周期上去 flush, flush 完成后开始建立 Index,也就是 offset 和 timestamp 对应消息具体位置的映射关系

  • Index建立好以后,会 save 一次 checkpoint,即表示 checkpoint 后的数据是可以被消费的,试想一下,如果没有 checkpoint 的情况下会发生什么问题,如果 flush 完成之后宕机,index 还没有建立,这个数据是不应该被消费的

  • 最后当文件到达一定大小之后,需要建立一个新的 segment 文件来写入

写文件 Failover

建立一个新的文件,会随机挑选与副本数量相当的数据节点进行写入,但如果此时挑选节点中有一个出现了问题,导致不能正常写入了,应该怎么处理?

可以重新找正常的节点创建新的文件进行写入,这样也就保证了写入的可用性。

3.6 Proxy

  • 首先 Consumer 发送一个 Fetch Request,然后会有一个 Wait 流程,那么他的作用是什么?

    想象一个 Topic,如果一直没有数据写入,那么此时 consumer 就会一直发送 Fetch Request,如果 Consumer 数量过多,BMQ 的 server端 是扛不住这个请求的。

    因此设置了一个等待机制,如果没有 fetch 到指定大小的数据,那么 proxy 会等待一定的时间,再返回给用户侧,这样也就降低了 fetch 请求的 IO 次数,经过 wait 流程后,会到 Cache 里面去寻找是否有存在想要的数据,如果有直接返回,如果没有,再开始去存储系统当中寻找,首先会 Open 这个文件,然后通过 Index 找到数据所在的具体位置,从这个位置开始读取数据

3.7 多机房部署

为什么需要多机房部署?

其实对于一个高可用的服务,除了要防止单机故障所带来的的影响意外,也要防止机房级故障所带来的影响,比如机房断点,机房之间网络故障等等。

BMQ 的多机房部署:Proxy —> Broker —> Meta —> HDFS

3.8 BMQ - 高级特性

泳道 —> Databus —> Mirror —> Index —> Parquet

3.9 泳道消息

开发流程:开发 —> BOE —> PPE —> Prod

BOE:Bytedance Offline Environment,是一套完全独立的线下机房环境

PPE:Product Preview Environment,即产品预览环境

BOE 测试:多个人同时测试,需要等待上一个人测试完成。每多一个测试人员,都需要重新搭建一个相同配置的 Topic,造成人力和资源的浪费。

PPE 验证:对于 PPE 的消费者来说,资源没有生产环境多,所以无法承受生产环境的流量。解决主干泳道流量隔离问题以及泳道资源重复创建问题。

3.10 Databus

直接使用原生 SDK 的问题

  1. 客户端配置较为复杂
  2. 不支持动态配置,更改配置需要停掉服务
  3. 对于 latency 不是很敏感的业务,batch 效果不佳

使用Databus 的优点

  1. 简化消息队列客户端复杂度
  2. 解耦业务与 Topic
  3. 缓解集群压力,提高吞吐

3.11 Mirror

思考:是否可以通过多机房部署的方式,解决跨 Region 读写的问题?

使用 Mirror 通过最终一致的方式,解决跨 Region 读写问题。

3.12 Index

思考:如何通过写入的 LogId、UserId 或者其他的业务字段进行消息的查询?

直接在 BMQ 中将数据结构化,配置索引 DDL,异步构建索引后,通过 Index Query 服务读出数据。

3.13 Parquet

Apache Parquet 是 Hadoop 生态圈中一种新型列式存储格式,它可以兼容 Hadoop 生态圈中大多数计算框架(Hadoop、Spark等),被多种查询引擎支持(Hive、Impala、Drill 等)。

  • 有行式存储和列式存储

  • 直接在 BMQ 中将数据结构化,通过 Parquet Engine,可以使用不同的方式构建 Parquet 格式文件

小结

  1. BMQ 的框架模型(解决Kafka 存在的问题)
  2. BMQ 的读写流程(Failover 机制,写入状态机)
  3. BMQ 高级特性(泳道、Databus、Mirror、Index、Parquet)

4.消息队列 - RocketMQ

使用场景:针对电商业务线,其业务涉及广泛,如注册、订单、库存、物流等;同时,也会涉及许多业务峰值时刻,如秒杀活动、周年庆、定期优惠等。

4.1 RocketMQ 基本概念

名称KafkaRocketMQ
逻辑队列TopicTopic
消息体MessageMessage
标签Tag
分区PartitionConsumer Queue
生产者ProducerProducer
生产者集群Producer Group
消费者ConsumerConsumer
消费者集群Consumer GroupConsumer Group
集群控制器ControllerNameserver

Producer,Consumer,Broker这三个部分,Kafka 和 RocketMQ 是一样的,而 Kafka 中的 Partition 概念在这里叫做 Consumer Queue。

4.2 RocketMQ 架构

  • 数据流也是通过 Producer 发送给 Broker 集群,再由 Consumer 进行消费

  • Broker 节点有 Master 和 Slave 的概念

  • NameServer 为集群提供轻量级服务发现和路由

4.3 存储模型

对于一个 Broker 来说所有的消息的会 append 到一个 CommitLog 上面,然后按照不同的 Queue,重新 Dispatch 到不同的 Consumer 中,这样 Consumer 就可以按照 Queue 进行拉取消费

但需要注意的是,这里的 Consumer Queue 所存储的并不是真实的数据,真实的数据其实只存在 CommitLog 中,这里存的仅仅是这个 Queue 所有消息在 CommitLog 上面的位置,相当于是这个 Queue 的一个密集索引

4.4 RocketMQ - 高级特性

事务场景:事务消息

延迟发送:延迟消息

处理失败:消费重试和死信队列

具体参考RocketMQ 中文官网

小结

  1. RocketMQ 的基本概念(Queue、Tag)
  2. RocketMQ 的底层原理(架构模型、存储模型)
  3. RocketMQ 的高级特性(事务消息、重试和死信队列,延迟队列)

总结

  • 前世今生:消息队列的发展历程
  • Kafka:基本概念、架构设计、底层原理、架构缺点
  • BMQ:架构设计、底层原理、与 Kafka 比较、高级特性
  • RocketMQ:架构设计、底层原理、高级特性

参考资料

再过半小时,你就能明白kafka的工作原理了

详细解析kafka之 kafka消费者组与重平衡机制

RocketMQ 中文官网

  • 标题: 消息队列原理与实战
  • 作者: Entropy
  • 创建于 : 2023-02-10 18:17:22
  • 更新于 : 2023-04-01 07:55:52
  • 链接: https://www.entropy-tree.top/2023/02/10/golang-day13/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论