中间件Kafka已经使用好多年,一直没有时间来详细梳理其中原理,先从写入kafka开始,这里有几个概念需要理解;

基础名词概念

生产者:产生消息体,写入kafka;

Broker:用于接收生产者和消费者请求的服务器,同时作为存储消息体的物理存储;

topic:消息主题,逻辑上存储某一类消息的标记;

分区:topic对应的消息写到物理磁盘时为分摊磁盘压力增加写入QPS,将消息写入不同的磁盘区域,一个消息写入磁盘的区域的个数为分区数量;

消费者:消息中间件读取消息的程序;

消费者组:从消息中间件读取消息时按不同分组设置偏移量offset;

代理:一般来说,这里的代理都是负载,用于节点之间的负载均衡,转发来自生产者或者消费者的请求;

副本:简单来说就是另一个存储文件,比如mysql的只读库就可以看成一个副本,一般使用副本主要有三个方面的优势,能够提供数据的冗余,在宕机或者磁盘损坏的时候,及时恢复数据,能够提供横向的扩展,通过增加机器(类似于mysql的只读库)来提高读操作的吞吐量 *改善数据的局部性,比如cdn的节点数据,将数据落到用户最近的节点上。

生产发送过程

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(1)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(2)

编辑

  1. 生产者随机链接一个broker,根据topic发送meta协议请求;
  2. broker返回topic对应的所有分区数据,包括leader和leader所在的broker;
  3. 生产者会跟所有broker都建立好TCP长链接;
  4. 生产者在客户端根据消息发送策略计算要发送到的分区的broker;
  5. 生产者发送消息,客户端在消息积攒到发送条件时触发批量发送;
副本详细

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(3)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(4)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(5)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(6)

在正常情况下,Kafka中的每个Topic都会有很多个分区,每个分区又会存在多个副本。在这些副本中,存在一个leader分区,而剩下的分区叫做 follower,所有对分区的读写操作都是对leader分区进行的。所以当我们向Kafka写消息或者从Kafka读取消息的时候,必须先找到对应分区的Leader及其所在的Broker地址,这样才可以进行后续的操作;

如上图中有3个分区,每个分区有两个副本,箭头起始为leader分区即主分区,箭头指向为分区副本;

副本参数:replica-factor,指的是每一个分区有多少个副本一般情况下,有多少个broker就设置多少个副本。

replica-factor 副本因子控制消息保存在几个broker(服务器)上,一般情况下副本数等于broker的个数。一个broker服务下,不可以创建多个副本因子。

创建主题时,副本因子应该小于等于可用的broker数。副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本;

主副本叫做leader,从副本叫做 follower(在有多个副本的情况下,kafka会为同一个分区下的所有分区,设定角色关系:一个leader和N个 follower),处于同步状态的副本叫做in-sync-replicas(ISR);follower通过拉的方式从leader同步数据。

消费者和生产者都是从leader读写数据,不与follower交互。

副本因子的作用:让kafka读取数据和写入数据时的可靠性。副本因子是包含本身,同一个副本因子不能放在同一个broker中。如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个中,选择一个leader,但不会在其他的broker中,另启动一个副本(因为在另一台启动的话,存在数据传递,只要在机器之间有数据传递,就会长时间占用网络IO,kafka是一个高吞吐量的消息系统,这个情况不允许发生)所以不会在另一个broker中启动。

如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。lsr表示:当前可用的副本。

replica-factor数量如何设置?

一般来讲,与broker的数量保持一致,超过broker数量会导致一个broker会包含同一个分区多个副本,失去了容灾意义,还占用过多的磁盘;

副本分配算法如下:

  1. 将所有N Broker和待分配的i个Partition排序.将第i个Partition分配到第(i mod n)个Broker上.
  2. 将第i个Partition的第j个副本分配到第((i j) mod n)个Broker上.
  3. 基本保障每个leader分区分散在不同broker,保障每个replication分散在不同机器;
分区数量设计为多少合适?

一般根据实际需求进行选择,比如要保证消息顺序场景,这种需求必须只能一个分区,因为分区多于1个,会造成数据并发写入多个分区,进而不能保证消息顺序;

而非顺序要求的场景,我们尽量按流量和资源预算来设定,如我们有流量写入诉求:2G/s

这样就意味着,我们每秒写入2G,那么消费也要达到每秒2G的性能;

我们拿日志采集为例,1个请求大小:5k,每秒2G,每秒总共有X=419430个消息写入;

单个消息消费耗时:Y=5毫秒;

单线程每秒:Z = 1000/Y = 200个;

那么我们折算一下X个请求1秒内需要线程数:N=X / Z = 419430 / 200 = 2097个线程;

单机16C32G机器,假设我们开到16个线程,每个线程对应一个分区,不考虑多路复用的话,那么我们大概需要131台机器,来存储这些;

当然,这里主要从消费角度来考虑,并没有考虑并行消费,和批量消费场景;

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(7)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(8)

实际使用中,我们用了80台32C的机器,处理了大概每秒5G的流量;

数据写入Kafka的策略

kafka提供了三种分区策略:轮询策略、随机策略、按消息键保序策略

1、轮询策略

这是默认的分区策略,能够保证消息最大限度的被平均分配到所有分区

2、随机策略(已经过时了)

也就是生产的消息被随机分配到不同的分区,实际的表现逊于轮询策略;实际上,老的kafka版本用的是随机策略,新的版本已经改成轮询策略了

3、按消息键保序策略

生产消息时,为每条消息定义消息键key,消息键是一个有着明确含义的业务字符串,可以是业务ID之类的;通过消息键,相同的消息键的消息能被保证写入相同的分区;

看看kafka提供的轮询策略的实现:RoundRobinPartitioner

public class RoundRobinPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap(); public RoundRobinPartitioner() { } public void configure(Map<String, ?> configs) { } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = Cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int nextValue = this.nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> { return new AtomicInteger(0); }); return counter.getAndIncrement(); } public void close() { } }

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(9)

同时我们可以自定义分区策略,只需要实现分区策略接口即可,org.apache.kafka.clients.producer.Partitioner

public interface Partitioner extends Configurable, Closeable { int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6); void close(); default void onNewbatch(String topic, Cluster cluster, int prevPartition) { } }

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(10)

写入ACK

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

在生产者通过配置acks ,根据acks参数来实现不同场景的写入需求:0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;

-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会 造成数据重复。这里很多文章都漏掉了一个重要的点,follower并不是全部的follower,而是处于ISR列表中的follower,后面单独写一篇来介绍ISR列表;

生产者重试机制

创建生产者时,可以指定retries参数,如果向broker发送消息时抛出异常,并且异常是可重试异常RetriableException,那么此时就会按照指定的次数进行重试

1、哪些情况下可以重试

(1)没有到delivery超时时间

(2)剩余重试次数大于0

(3)异常类型为RetriableException或者使用事务管理器时允许重试

如下重试代码

private boolean canRetry(ProducerBatch batch, PartitionResponse response, long now) { boolean var10000; label29: { if (!batch.hasReachedDeliveryTimeout(this.accumulator.getDeliveryTimeoutMs(), now) && batch.attempts() < this.retries && !batch.isDone()) { if (this.transactionManager == null) { if (response.error.exception() instanceof RetriableException) { break label29; } } else if (this.transactionManager.canRetry(response, batch)) { break label29; } } var10000 = false; return var10000; } var10000 = true; return var10000; }

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(11)

生产写入回调

kafka客户端中使用了很多的回调方式处理请求。基本思路是将回调函数暂存到Clientrequest中,而ClientRequest会暂存到inFlightRequests中,当返回response的时候,从inFlightRequests中读取对应的ClientRequest,并调用request中的回调函数完成处理。inFlightRequests是请求和响应处理的桥梁.

为什么要把回调做成异步,而不做成同步发送后立即得到发送结果,这里是出于发送的时候默认是采取批量发送的,批量发送会把客户端消息攒积达到批量发送条件后才会发送,所以并不会立即就得到发送的结果;

看看两种不同的回调执行方式;

@Autowired KafkaTemplate<String, String> kafkaTemplate; public void testCallBack1 () { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}"); CompletableFuture<SendResult<String, String>> completable = future.completable(); completable.whenCompleteAsync((n, e) -> { if (null != e) { System.out.println("发送报错了"); } else { System.out.println("发送成功了!"); } }); } public void testCallBack2() { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}"); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable throwable) { //失败 } @Override public void onSuccess(SendResult<String, String> stringStringSendResult) { //成功 } }); }

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(12)

下面补充我们公司的里MQ架构方案

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(13)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(14)

这里是常规的生产消费架构,网上都能查到搭建方式;

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(15)

kafka消息顺序(中间件-Kafka原理-生产者如何把消息写进Broker)(16)

这是改良过后的架构方案,主要说一下优势和解决的问题

常规的架构正常使用是没有问题的,主要的问题在于当生产者暴增时,会造成客户端与broker有大量TCP链接,而且是长链接。曾经一度我们生产者发展到20w的时候,broker大概有10个物理机,造成每个物理机上,长期有20-30w的TCP链接。导致有些生产者容易被挤掉线,并且掉线后很难再找到链接上来的机会;

遂,改到在broker前面增加了四层负载的方式,保障所有生产者和消费者都能顺畅链接;

,