kafka消息模型

Scroll Down

消息引擎系统

在设计一个消息引擎系统时需要考虑的两个重要因素:
1、消息设计
2、传输协议设计

消息设计

消息引擎在设计消息时一定要考虑语义的清晰和格式上的通用性。一条消息要有能够完整清晰表达业务的能力,他不能是含糊不清,语义不明甚至无法处理的。同时,为了更好地表达语义以及最大限度地提高通用性,消息通常都采用结构化的方式进行设计。比如SOAP协议中的消息就采用了XML格式,而Web Service中也支持了JSON格式的消息。kafka的消息是用二进制方式来保存的,但是依然是结构化的消息。
可以发现,不论是使用XML、JSON、二进制表示,抑或是其他自定义的结构化类型,消息主题本身一般都是结构化的数据,这给后续消息引擎系统的处理带来了极大的便利。

传输协议设计

这是消息引擎系统中更为关键的部分–如何设计消息传输的协议。从狭义的角度来说,消息传输协议指定了消息在不同系统之间传输的方式。从广义的角度来说,这类协议可能包括任何能够在不同系统间传输消息或是执行语义操作的协议或框架。
kafka自己设计了一套二进制的消息传输协议,而没有采用诸如Google PB这样的框架。

消息引擎范型

最常见的两种消息引擎范型是消息队列和发布/订阅模型。消息队列(message queue)模型是基于队列提供消息传输服务的,多用于进程间通信,以及线程间通信。该模型定义了消息队列(queue)、发送者(sender)和接收者(receiver),提供了一种点对点的消息传递方式,即发送者发送每条消息到队列的指定位置,接收者从指定位置获取消息。一旦消息被消费,就会从队列中排除该消息。每条消息由一个发送者生产出来,且只能被一个消费者处理–发送者和消费者之间是一对一的关系。
kafka-note1.drawio
而另一种模型就是发布/订阅模型(pub/sub),与前一种模型不同,它有主题(topic)的概念:一个topic可以理解为逻辑语义相近的消息容器。这种模型也定义了类似于生产者/消费者这样的角色,即发布者和订阅者。发布者将消息生产出来发送到指定的topic中,所有订阅了该topic的订阅者都可以接收到该topic下的所有消息。通常具有相同订阅topic的所有订阅者将接收同样的消息。
kafka-note001.drawio
显然kafka必须要同时支持这两种消息引擎模型。

kafka概要设计

重点部分
kafaka的设计初衷就是为了解决互联网公司超大量级数据的实时传输。为了实现这个目标,kafka在设计之初就需要考虑以下4个方面的问题。

  • 吞吐量/延时
  • 消息持久化
  • 负载均衡和故障迁移
  • 伸缩性

吞吐量/延时

对于任何一个消息引擎来说,吞吐量(throughput)都是至关重要的性能指标。通常来说,吞吐量是某种处理能力的最大值。而对于kafka而言,它的吞吐量就是每秒能够处理的消息数或者每秒能够处理的字节数。很显然,我们自然希望消息引擎的吞吐量越大越好。

消息引擎系统还有一个名为延时的性能指标。它衡量的是一段时间间隔,可能是发出某个操作与接收到操作相应之间的时间,或者是在系统中导致某些物理变更的起始时刻与变更正式生效之间的间隔。对于kafka而言,延时可以表示客户端发起请求与服务端处理请求并发送响应给客户端之间的这一段时间。显而易见,延时时间隔越短越好。

kafka是通过如下方式做到高吞吐量、低延时的。

首先,kafka的写入操作是很快的,这主要得益于它对磁盘的使用方法的不同。虽然kafka会持久化所有数据到磁盘,但本质上每次写入操作其实都只是把数据写入到操作系统的页缓存(page cache)中,然后由操作系统自行决定什么时候把页缓存中的数据写回到磁盘上。这样的设计有3个主要优势。

  • 操作系统的页缓存是在内存中分配的,所以消息写入的速度非常快。
  • kafka不必直接与底层文件系统打交道。所有繁琐的IO操作都交由操作系统来处理。
  • kafka写入操作采用追加写入的方式,避免了磁盘随机写操作。

对于普通的物理磁盘(非固态硬盘)而言,我们总是认为磁盘的读写是很慢的。事实上普通SAS磁盘随机读写的吞吐量的确是很慢的,但是磁盘的顺序读写操作其实是非常快的,它的速度甚至可以匹敌内存的随机IO速度。

鉴于这个事实,kafka在设计时采用了追加写入消息的方式,即只能在日志文件末尾追加写入新的消息,且不允许修改已写入的消息,因此它属于典型的磁盘顺序访问操作,所以kafka消息发送的吞吐量很高的。在实际使用过程中可以很轻松地做到每秒写入几万甚至几十万条消息。

kafka的消费端是如何做到高吞吐量、低延时的。之前提到kafka是把消息写入操作系统的页缓存中的。那么同样地,kafka在读取消息时会首先尝试从OS的页缓存中读取,如果命中便把消息经页缓存直接发送到网络Socket上。这个过程就是利用Linux平台的sendFile系统调用做到的,而这种技术就是零拷贝(zero copy)技术。

传统的Linux操作系统中的IO接口是依托于数据拷贝来实现的,但在零拷贝技术出现之前,一个IO操作会将同一份数据进行多次拷贝。数据传输过程还涉及内核态与用户态的上下文切换,CPU开销非常大,因此极大地限制了OS高效进行数据传输的能力。零拷贝技术很好地改善了这个问题,首先在内核驱动程序处理IO数据的时候,它不再需要进行上下文切换,节省了内核缓冲区与用户态应用程序之间的数据拷贝,同时它用直接存储器访问技术(DMA)执行IO操作,因此避免了OS内核缓冲区之间的数据拷贝。故而得名零拷贝。
Linux提供了sendFile系统调用实现了这种零拷贝技术,而kafka的消息消费机制使用的就是sendFile-严格来说是通过Java的FileChannel.transferTo方法实现的。
除了零拷贝技术,kafka由于大量使用页缓存,故读取消息时大部分消息很有可能依然保存在页缓存中,因此可以直接命中缓存,不用穿透到底层的物理磁盘上获取消息,从而极大地提升了消息读取的吞吐量。事实上,如果我们监控一个经过良好调优的kafka生产集群便可以发现,即使那些有负载的kafka服务器,其磁盘的读操作也很少,这是因为大部分的消息读取操作会直接命中页缓存。

总结一下,kafka就是依靠下列4点达到了高吞吐量、低延时的设计目标的:

  • 大量使用操作系统页缓存,内存操作速度快且命中率高。
  • kafka不直接参与物理IO操作,而是交由最擅长此事的操作系统来完成的。
  • 采用追加写入的方式,摒弃了缓慢的磁盘随机读/写操作。
  • 使用sendFile为代表的零拷贝技术加强网络间的数据传输效率。

消息持久化

kafka是要持久化消息的,而且要把消息持久化到磁盘上。这样做的好处如下。

  • 解耦消息发送与消息消费:从本质上来说,kafka最核心的功能就是提供了生产者-消费者模式的完整解决方案。通过将消息持久化使得生产者不再需要和消费者方耦合。它只是简单地把消息生产出来并交由kafka服务器保存即可,因此提升了整体的吞吐量。
  • 实现灵活的消息处理:很多kafka的下游子系统(接收kafka消息的系统)都有这样的需求-对于已经处理过的消息可能在未来的某个时间点重新处理一次,即所谓的消息重演。消息持久化便可以很方便地实现这样的需求。

负载均衡和故障转移

作为一个功能完备的分布式系统,kafka如果只提供了最基本的消息引擎功能肯定不足以帮助他脱颖而出。一套完备的消息引擎中必然要提供负载均衡(load balancing)和故障转移(fail over)功能。
负载均衡就是让系统的负载根据一定的规则均衡地分配在所有参与工作的服务器上,从而最大限度地提升系统整体的运行效率。
kafka实现负载均衡实际上是通过智能化的分区领导者选举来实现的。除了负载均衡,完备的分布式系统还需要支持故障转移。所谓故障转移,是指当服务器意外中止时,整个集群可以快速地检测到该失效(failure),并立即将该服务器上的应用或服务自动转移到其他服务器上。故障转移通常是以心跳或会话的机制来实现的,即只要主服务器与备份服务器之间的心跳无法维持或主服务器注册到服务中心的会话超时过期了,那么就认为主服务器已无法正常运行,集群会自动启动某个备份服务器来替代主服务器的工作。
kafka服务器支持故障转移的方式就是使用会话机制。每台kafka服务器启动后会以会话的形式把自己注册到zookeeper服务器上,一旦该服务运转出现问题,与zookeeper的会话便不能维持从而超时失效,此时kafka集群会选举出另一台服务器来完全代替这台服务器继续提供服务。

伸缩性

伸缩性表示向分布式系统中增加额外的计算资源时吞吐量提升的能力。阻碍线性扩容的一个很常见的因素就是状态的保存。我们知道,不论是哪类分布式系统,集群中的每台服务器一定会维护很多内部状态。如果由服务器自己来保存这些状态信息,则必须要处理一致性问题。相反,如果服务器是无状态的,状态的保存和管理交于专门的协调服务来做(比如zookeeper),那么整个集群的服务器之间就无须繁重的状态共享,这极大地降低了维护复杂度。倘若要扩容集群节点,只需要简单地启动新的节点机器进行自动负载均衡就可以了。
kafka正是采用了这样的思想,每台kafka服务器上的状态统一交由zookeeper保管。扩展kafka集群也只需要一步:启动新的kafka服务器即可。当然这里需要言明的是,在kafka服务器上并不是所有的状态都不保存,它只保存了很轻量级的内部状态,因此在整个集群间维护状态一致性是很低的。

kafka基本概念和术语

kafka架构图如下:
kafka-note002.drawio
如果总结起来就是3部分:

  • 生产者发送消息给kafka服务器。
  • 消费者从kafka服务器读取消息。
  • kafka服务器依托zookeeper集群进行服务的协调管理。

首先,kafka是分布式的集群。一个集群可能由一台或多台机器组成。而在kafka集群中保存的每条消息都归属于一个topic。

消息

消息由消息头部、key和value组成。消息头部包括消息的CRC码、消息版本号、属性、时间戳、键长度和消息体长度。
主要下面3个属性:

  • key:消息键,对消息做partition时使用,即决定消息被保存在某topic下的哪个partition。
  • value:消息体,保存实际的消息数据。
  • timestamp:消息发送时间戳,用于流式处理及其他依赖时间的处理语义。如果不指定则取当前时间。

topic和partition

从概念上说,topic只是一个概念,代表了一类消息,也可以认为是消息被发送到的地方。通常我们可以使用topic来区分实际业务,比如业务A使用一个topic。业务B使用另外一个topic。
kafka中的topic通常会被多个消费者订阅,因此处于性能的考量,kafka并不是topic message的两级结构,而是采用了topic-partition-message的三级结构来分散负载。从本质上来讲每个kafka topic都是由若干个partition组成。
topic是由多个partition组成的,而kafka的partition是不可修改的有序消息序列,也可以说是有序的消息日志。每个partition有自己专属的partition号,通常是从0开始的。用户对partition唯一能做的操作就是在消息序列的尾部追加写入消息。partition上的每条消息都会被分配一个唯一的序列号-按照kafka的术语来说,该序列号被称为位移(offset)。该位移值是从0开始顺序递增的整数。位移信息可以唯一定位到某partition下的一条消息。
值得一提的是,kafka的partition实际上并没有太多的业务含义,它的引入就是单纯地为了提升系统的吞吐量,因此在创建kafka topic的时候可以根据集群实际配置设置具体的partition数,实现整体性能的最大化。

offset

前面说过,topic partition下的每条消息都被分配了一个位移值。实际上,kafka消费者端也有位移(offset)的概念,但一定要注意这两个offset属于不同的概念。
显然,每条消息在某个partition的位移是固定的,但是消费该partition的消费者的位移会随着消费进度不断前移,但终究不可能超过该分区最新一条消息的位移。
综合之前说的topic、partition和offset,我们可以断言kafka中一条消息其实就是<topic、partition、offset>三元组,通过该元组值我们可以在kafka集群中找到唯一对应的那条消息。

replica

partition是有序消息日志,那么一定不能只保存这一份日志,否则一旦保存partition的kafka服务器挂掉了,其上保存的消息也就丢失了。分布式系统必然要实现高可靠性,而目前实现的途径还是依靠冗余机制-简单地说,就是备份多份日志。这些备份日志在kafka中被称为副本(replica),它存在的唯一目的就是防止数据丢失。
副本分为两类:领导者副本(leader replica)和追随者副本(follower replica)。follower replica是不能提供服务给客户端的,也就是说不负责响应客户端发来的消息写入和消息消费请求。它只是被动地向领导者副本(leader replica)获取数据,而一旦leader replica所在的broker宕机,kafka会从剩余的replica中选举出新的leader继续提供服务。

leader和follower

在这类leader和follower系统中通常只有leader对外提供服务,follower只是被动地追随leader的状态,保持与leader的同步。follower存在的唯一价值就是充当leader的候补:一旦leader挂掉立即就会有一个追随者被选举成为一个新的leader接替它的工作。
kafka保证同一个partition的多个replica一定不会分配在同一台broker上。

producer

producer概览

简单来说,kafka producer就是负责向kafka写入数据的应用程序。kafka producer在设计上要不consumer简单一些,因为它不涉及复杂的组管理操作,即每个producer都是独立工作的,与其他的producer实例之间没有关联。目前producer的首要功能就是向某个topic的某个分区发送一条消息,所以它首先需要确认到底要向topic的哪个分区写入消息-这就是分区器要做的事情。kafka producer提供了一个默认的分区器。对于每条待发送的消息而言,如果该消息指定了key,则partition会根据key的哈希值来选择目标分区;若这条消息没有指定key,则partition使用轮询方式确认目标分区-这样可以最大限度地确保消息在所有分区上的均匀性。当然producer的API赋予了用户自行指定目标分区的权力,即用户可以在消息发送时跳过partition直接指定要发送的分区。
另外,producer也允许用户实现自定义的分区策略而非使用默认的partition,这样用户可以很灵活地根据自身的业务需求确定不同的分区策略。
在确认了目标的分区后,producer要做的第二件事就是要寻找这个分区对应的leader,也就是该分区leader副本所在的kafka broker。在下图中,producer首先使用一个线程(用户主线程,也就是用户启动producer的线程)将待发送的消息封装进一个ProducerRecord类实例,然后将其序列化之后发送给partition,再由后者确定了目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而producer的另一个工作线程(IO发送线程,也称sender线程)则负责实时地从该缓冲区中提供出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker。整个producer的工作流程大概就是这样的。
kafka-note003.drawio

consumer

consumer概览

kafka消费者是从kafka读取数据的应用。若干个consumer订阅kafka集群中的若干个topic并从kafka接收属于这些topic的信息。

消费者组(consumer group)

重点部分
消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。

这句话给出了3个非常重要的信息:

  • 一个consumer group可能有若干个consumer实例(一个group只有一个实例也是允许的);
  • 对于同一个group而言,topic的每条消费只能被发送到group下的一个consumer实例上;
  • topic消息可以被发送到多个group中。

kafka同时支持基于队列和基于发布/订阅的两种消息引擎模型。事实上kafka就是通过consumer group实现的对这两种模型支持。

  • 所有consumer实例都属于相同group-实现基于队列的模型。每条消息都只会被一个consumer实例处理。
  • consumer实例都属于不同group-实现基于发布/订阅的模型。极端情况下是每个consumer实例都设置完全不同的group,这样kafka消息就会被广播到所有consumer实例上。

位移(offset)

需要明确的是,这里的offset指代的是consumer端的offset,与分区日志中的offset是不同的含义。每个consumer都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息。这在kafka中有一个特定的术语:位移(offset)。很多消息引擎都把消费端的offset保存在服务器端(broker)。这样做的好处当然是实现简单,但会有以下3个方面的问题。

  • broker从此变成了有状态的,增加了同步成本,影响伸缩性。
  • 需要引入应答机制来确认消费成功
  • 由于要保存许多consumer的offset,故必然引入复杂的数据结构,从而造成了不必要的资源浪费。

kafka选择了另一种方式,让consumer group保存offset,那么只需要简单地保存一个长整型数据就可以了,同时kafka consumer还引入了检查点机制定期对offset进行持久化,从而简化了应答机制的实现。

位移提交

consumer客户端需要定期地向kafka集群汇报自己消费数据的进度,这一过程被称为位移提交(offset commit)。

消费者组重平衡

消息轮询

归根结底,kafka的consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区的消息。若要实现并行的消息读取,一种方法是使用多线程的方式,为每个要读取的分区都创建一个专门的线程去消费,另一种方法是采用类似LinuxIO模型的poll或select等,使用一个线程来同时管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。
一旦consumer订阅了topic,所有的消费逻辑包括coordinate的协调、消费者组的Rebalance以及数据的获取都会在主逻辑poll方法的一次调用中被执行。这样用户很容易使用一个线程来管理所有的consumerIO操作。
截止目前新版本的consumer是一个多线程或者说是一个双线程的Java进程-创建一个kafkaConsumer的线程被称为用户主线程,同时consumer在后台会创建一个心跳线程,该线程被称为后台心跳线程。kafkaConsumer的poll方法在用户主线程中运行。这也同时表明,消费者组执行Rebalance、消息获取、coordinate管理、异步任务结果的处理甚至位移提交等操作都是运行在用户主线程中的。
consumer订阅topic之后通常以时间循环的方式来获取订阅方案并开启消息读取。每次poll方法返回的都是订阅分区上的一组消息,当然如果某些分区没有准备好,某次poll返回的就是空的消息集合。
poll方法根据当前consumer的消费位移返回消息集合。当poll首次被调用时,新的消费者组会被创建,并根据对应的位移重设策略来设定消费者组的位移。一旦consumer开始提交位移,每个后续的Rebalance完成后都会将位置设置为上次已提交的位移。传递给poll方法的超时设定参数用于控制consumer等待消息的最大阻塞时间。由于某些原因,broker端有时候无法立即满足consumer端的获取请求(比如consumer要求至少一次获取1MB的数据,但broker端无法立即全部给出),那么此时consumer端将会阻塞以等待数据不断积累并最终满足consumer需求。如果用户不想让consumer一直处于阻塞状态,则需要给定一个超时时间。因此poll方法返回满足一下任意一个条件即可返回。

  • 要么获取了足够多的可用数据。
  • 要么等待时间超过了指定的超时设置。

位移管理

consumer端需要为每个它要读取的分区保存消费进度,即分区中当前最新消费消息的位置。该位置就被成为位移(offset)。consumer需要定期地向kafka提交自己的位置信息,实际上,这里的位移值通常是下一条待消费的消息的位置。假设consumer已经读取了某个分区中的第N条消息,那么它应该提交位移值为N,因为位移是从0开始的,位移为N的消息是第N+1条消息。这样下次consumer重启时会从第N+1条消息开始消费。总而言之,offset就是consumer端维护的位置信息。
offset对于consumer非常重要,因为它是实现消息交付语义保证的基石。常见的3种语义保证如下:

  • 最多一次,消息可能丢失,但不会重复处理。
  • 最少一次,消息不会丢失,但可能被处理多次。
  • 精确一次,消息一定会被处理且只会被处理一次。

kafka默认的是最少一次。
consumer会在kafka集群的所有broker中选择一个broker作为consumer group的coordinate,用于实现组成员管理、消费分配方案制定以及提交位移等。为每个组选择对应的coordinate的依据_consumer_offsets。和普通的kafka配置有多个分区,每个分区有多个副本。它存在的唯一目的就是保存consumer提交的位移。
当消费者首次启动时,由于没有初始的位移信息,coordinate必须为其确定初始位移值,这就是consumer参数auto.offset.reset的作用。通常情况下,consumer要么从最早的位移开始读取,要么从最新的位移开始读取。
当consumer运行了一段时间之后,它必须要提交自己的位移值。如果consumer崩溃或被关闭,它负责的分区就会被分配给其他consumer,因此一定要在其他consumer读取这些分区前就做好位移提交工作,否则就会出现消息的重复消费。
consumer提交位移的主要机制是通过向所属的coordinate发送位移请求来实现的。每个位移提交请求都会往_consumer_offsets对应分区上追加写入一条消息。消息的key是group.id、topic的分区的元组,而value就是位移值。如果consumer为同一个group的同一个topic分区提交了多次位移,那么_consumer_offsets对应的分区上就会有若干条key相同但value不同的消息,但显然我们只关心最新一次提交的那条消息。
位移提交策略对于提供消息交付语义至关重要。默认情况下,consumer是自动提交位移的,自动提交间隔是5秒。这就是说若不做特定的设置,consumer程序后台提交位移。通过设置auto.commit.interval.ms参数可以控制自动提交间隔。
所谓的手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移。在一个典型的consumer应用场景中,用户需要对poll方法返回的消息集合中的消息执行业务级的处理。用户想要确保只有消息被真正处理完成后再提交位移。如果使用自动位移提交则无法保证这种时序性,因此在这种情况下必须使用手动提交位移。设置使用手动提交位移非常简单,仅仅需要在构建kafkaConsumer时设置enable.auto.commit=false,然后调用commitSync或commitAsync方法即可。

重平衡(rebalance)

重点部分
consumer group的rebalance本质上是一组协议,它规定了一个consumer group是如何达成一致来分配订阅topic的所有分区的。假设某个组下有20个实例,该组订阅了一个有着100分区的topic。正常情况下kafka会为每个consumer平均分配5个分区。这个分配过程就被称为rebalance。当consumer成功地执行rebalance后,组订阅topic的每个分区只会分配给组内的一个consumer实例。
新版本的consumer使用了kafka内置的一个全新的组协调协议。对于每个组而言,kafka的某个broker会被选举为组协调者。coordinate负责对组的状态进行管理,它的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinate负责对组执行rebalance操作。
组rebalance触发的条件有以下3个。

  • 组成员发生变更,比如新的consumer加入组,或已有consumer主动离开组,再或是已有consumer崩溃时则触发rebalance。
  • 组订阅topic数发生变化,比如使用基于正则表达式的订阅,当正则表达式的新topic被创建时则会触发rebalance。
  • 组订阅topic的分区数发生变更,比如使用命令行脚本增加了订阅的topic的分区数。

kafka新版本consumer默认提供了3种分配策略,分别是range策略、round-robin策略和sticky策略。
所谓的分配策略决定了订阅topic的每个分区会被分配给哪个consumer。range策略主要是基于范围的思想。它将单个topic的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段并依次分配给每个consumer;round-robin策略则会把所有topic的所有分区顺序摆开,然后轮询式地分配给各个consumer。最新发布的sticky策略有效地避免了上述两种策略完全无视历史分配方案的缺陷,采用了有粘性的策略对所有consumer实例进行分配,可以规避极端情况下的数据倾斜并且在两次rebalance间最大限度地维持了之前的分配方案。
通常意义尚认为,如果group下所有的consumer实例的订阅是相同的,那么使用round-robin会带来更公平的分配方案,否则使用range策略的效果更好。
rebalance流程
consumer group在执行rebalance之前必须先确定coordinator所在的broker,并创建与该broker相互通信的Socket连接。确定coordinator的算法与确定offset被提交到_consumer_offsets目标分区的算法是相同的。算法如下:

  • 计算Math.abs(groupID.hashcode)%offsets.topic.num.partitions参数值(默认是50)。
  • 寻找_consumer_offsets分区10的leader副本所在的broker,该broker即为这个group的coordinator。

成功连接coordinator之后便可以执行rebalance操作。目前rebalance主要分为两步:加入组合同步更新分配方案。

  • 加入组:这一步组内所有的consumer向coordinator发送JoinGroup请求。当收集全JoinGroup请求后,coordinator从中选择一个consumer担任group的leader,并把所有成员信息以及它们的订阅信息发送给leader。特别需要注意的是,group的leader和coordinator不是一个概念。leader是某个consumer实例,coordinator通常是kafka集群中的一个broker。另外leader而非coordinator负责为整个group的所有成员指定分配方案。
  • 同步更新分配方案:这一步中leader开始指定分配方案,即根据前面提到的分配策略决定每个consumer都负责哪些topic的哪些分区。一旦分配完成,leader会把这个分配方案封装进SyncGroup请求并发送给coordinator。

kafka的设计原理

broker端设计架构

broker是kafka最重要的组件,本质上它是一个功能载体,承载了绝大多数的kafka服务。一个broker通常是以服务器的形式出现的,对用户而言,broker的主要功能就是持久化消息以及将消息队列中的消息从发送端传输到消费端。
一个kafka分区本质上就是一个备份日志,即利用多份相同的备份共同提供冗余机制来保持系统的高可用性。这些备份在kafka中被称为副本(replica)。kafka把分区的所有副本均匀地分配到所有broker上,并从这些副本中挑选一个作为leader副本对外提供服务,而其他副本被称为follower副本,只能被动地向leader副本请求数据,从而保持与leader副本的同步。
假如leader副本永远工作正常,那么其实不需要follower副本。但是kafka leader副本所在的broker可能因为各种各样的原因而随时宕机。一旦发生这种情况,follower副本会竞争成为新leader。显然不是所有的follower都有资格去竞选leader。前面说过,follower被动地向leader请求数据。对于那些落后的leader进度太多的follower而言,它们是没有资格竞选leader的,毕竟它们手中握有的数据太旧了,如果允许它们成为leader,会造成数据丢失,而这对clients而言是灾难性的。鉴于这个原因,kafka引入了ISR概念。
所谓ISR,就是kafka集群动态维护一组同步副本集合(in-sync replicas)。每个topic分区都会有自己的ISR列表,ISR中的所有副本都与leader保持同步状态。值得注意的是,leader副本总是包含在ISR中的,只有ISR中的副本才有资格被选举为leader。而producer写入的一条kafka消息只有被ISR中的所有副本都接收到,才被视为已提交的状态。由此可见,若ISR中有N个副本,那么该分区中最多可以忍受N-1个副本崩溃而不丢失已提交消息。
1、follower副本同步
重点部分
follower副本只做一件事情:向leader副本请求数据。
kafka-replica-note-001.drawio

图中比较重要的位移信息如下:
1、起始位移(base offset):表示该副本当前所含第一条消息的offset。
2、高水印值(high watermark,HW):副本高水印值。它保存了该副本最新一条已提交消息的位移。leader分区的HW值决定了副本中已提交消息的范围,也确定了consumer能够获取的消息的上限,超过HW值的所有消息都被视为未提交成功的,因而consumer是看不到的。另外值得注意的是,不是只有leader副本才有HW值。实际上每个follower副本都有HW值,只不过只有leader副本的HW值才能决定clients能看到的消息数量。
3、日志末端位移(log end offset,LEO):副本日志中下一条待写入消息的offset。所有副本都需要维护自己的LEO信息。每当leader副本接收到producer端推送的消息,它会更新自己的LEO(通常是加1)。同样,follower副本向leader副本请求到数据后也会增加自己的LEO。事实上只有ISR中的所有副本都更新了对应的LEO之后。leader副本才会向右移动HW值表明消息写入成功。

比如,现在有一个producer向broker1所在的leader副本发送了一条消息,接下来会发生什么呢?
1、broker1上的leader副本接收到消息,把自己的LEO值更新为1。
2、broker2和broker3上的follower副本各自发送请求给broker1。
3、broker1分别把该消息推送给follower副本。
4、follower副本接收到消息后各自更新自己的LEO为1。
5、leader副本接收到其他follower副本的数据请求响应(response)之后,更新HW值为1,。此时位移为0这条消息可以被consumer消费。
对于设置了ack=-1的producer而言,只有完整的做完上面所有的5步操作,producer才能正常返回,这也标志着这条消息发送成功。
2、ISR设计
kafka提供了一个参数replica.lag.max.message,用于控制follower副本落后leader副本的额消息数。一旦超过这个消息数,则视为该follower为不同步状态,从而需要被kafka踢出ISR。
初始状态下所有follower副本都是和leader副本同步的,所有follower都能追上leader的LEO。现在假设producer生产了1条消息给leader,而broker3上的follower副本经历了一次full GC。此时更新之后leader的LEO不再与HW值相等,但最新生产的这条消息不会被认为已提交,除非broker3上的follower副本被踢出ISR或者追上leader的LEO。由于replica.lag.max.messages被设置为4,而broker3上的follower只落后1条消息,并不满足不同步条件,因此不会从LSR中移除。对于broker3上的副本而言,只需要追上leader的LEO。如果我们假设broker3在执行full GC停顿了100毫秒之后重新追上了leader的进度,那么此时leader的LEO和HW相等。
新版本之后,kafka去掉了replica.lag.max.messages参数,改用统一的参数同时检测由于慢以及进程卡壳而导致之后-即follower副本落后leader副本的时间间隔。这个唯一的参数就是replica.lag.time.max.ms,默认是10秒。对于请求速度追不上的情况,监测机制也发生了变化0如果一个follower副本落后leader的时间持续性地超过这个参数值,那么该follower副本就是不同步的。这样即使出现刚刚提到的producer瞬时流量的值,只要follower不是持续性的落后,它就不会反复地在ISR中进出。
在kafka中,水印表示的就是位置信息,即位移(offset)。
一个kafka分区下通常存在多个副本(replica)用于实现数据冗余,进一步实现高可用性。副本根据角色不同分为如下3类:

  • leader副本:响应clients段读写请求的副本。
  • follower副本:被动地备份leader副本
  • ISR副本集合:包含leader副本和所有与leader副本保持同步的follower副本。

每个kafka副本对象都持有两个重要的属性:日志末端位移(log end offset,LEO)和高水印(HW)。注意是所有的副本,而不只是leader副本。以下是这两个属性的解释。

  • LEO:日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值。
  • HW:高水印值,任何一个副本对象的HW值一定不大于其LEO值,而小于或等于HW值的所有消息被认为是已提交的或已备份的。

如果把LEO和HW看作两个指针,那么它们定位的机制是不同的:任意时刻,HW指向的是实实在在的消息,而LEO总是指向下一条待写入消息,也就是说LEO指向的位置上是没有消息的。

LEO更新机制
follower副本只是被动地向leader副本请求数据,具体表现为follower副本不停地向leader副本所在的broker发送FETCH请求,一旦获取消息,便写入自己的日志中进行备份。
那么follower副本的LEO是何时更新的呢?严格来说,kafka设计了两套follower副本LEO属性:一套LEO值保存在follower副本所在broker的缓存上;另一套LEO值保存在leader副本所在broker的缓存上,leader副本所在机器的缓存上保存了该分区下所有follower副本的LEO属性值。

1、follower副本端的follower副本LEO何时更新?
follower副本端的follower副本LEO值就是指该副本对象底层日志的LEO值,也就是说,每当新写入一条消息时,其LEO值就会加1.在follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从而自动更新其LEO值。
2、leader副本端的follower副本LEO何时更新?
leader副本端的follower副本LEO的更新发生在leader处理follower FETCH请求时。一旦leader接收到follower发送的fetch请求,它首先会从自己的log中读取相应的数据,但是在给follower返回数据之前它先去更新follower的LEO。

HW更新机制
follower更新HW发生在其更新LEO之后,一旦follower向log写完数据,它就会尝试更新HW值。具体算法就是比较当前LEO值与FETCH响应中leader的HW值,取两者的小者作为新的HW值。

在以下四种情况下,leader会尝试更新分区的HW值

  • 副本称为leader副本时:当某个副本成为分区的leader副本,kafka会尝试更新分区HW。
  • broker出现崩溃导致副本被踢出ISR时:若有broker崩溃,则必须查看是否会波及此分区,因此检查分区HW值是否需要更新是由必要的。
  • producer向leader副本写入消息时:因为写入消息会更新leader的LEO,故有必要再查看HW值是否也需要更新。
  • leader处理follower FETCH请求时:当leader处理follower的FETCH的请求时,首先会从底层log读取数据,之后再尝试更新分区HW值。

日志存储

日志记录按照被写入的顺序保存,读取日志以从左到右的方式进行。每条记录都会被分配一个唯一的且顺序增加的记录号作为定位消息的唯一标识。
日志中记录的排序通常按照时间排序,即位于日志左边部分的记录发生时间通常要小于位于右边部分的记录。
kafka的日志设计都是以分区为单位的,即每个分区都有它自己的日志,该日志被称为分区日志(partition log)。producer生产kafka消息时需要确定该消息被发送到的分区,然后kafka broker把该消息写入该分区对应的日志中。
具体对于每个日志而言,kafka又将其进一步细分成日志段文件以及日志段索引文件。可以这样说,每个分区日志都是由若干组日志段文件+索引文件构成的。
创建topic时,kafka为该topic的每个分区在文件系统中创建了一个对应的子目录,名字就是topic-分区号。
索引文件
除了.log文件,kafka分区日志还包含两个特殊的文件.index和.timeindex,它们都是索引文件,分别被称为位移索引文件和时间戳索引文件。前者可以帮助broker更快地定位记录所在的物理文件位置,而后者则是根据给定的时间戳查找对应的位移信息。

它们都属于稀疏索引文件(spare index file),每个索引文件都由若干条索引项(index entry)组成。kafka不会为每条消息记录都保存对应的索引项,而是待写入若干条记录后才增加一个索引项。

不论是位移索引文件还是时间戳索引文件,它们中的索引项都按照某种规律进行升序排列。对于位移索引文件而言,它是按照位移顺序保存的;而时间戳索引文件则严格按照时间戳顺序保存。由于有了这种升序规律,kafka可以利用二分法查找算法来搜寻目标索引项,从而降低了整体时间复杂度到O(logN)。
时间戳索引项保存的是时间戳与位移的映射关系。给定时间戳之后根据次索引文件只能找到不大于该时间戳的最大位移,稍后kafka还需要拿着返回的位移再去位移索引文件中定位真实的物理文件位置。该索引文件中的时间戳一定是按照升序排列的。