Kafka简介

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理库。

功能

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
  2. 容错的持久方式存储记录消息流: Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险·。
  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

应用场景

  1. 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

Kafka特性

  1. 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  2. 可扩展性:kafka集群支持热扩展。
  3. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  4. 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  5. 高并发:支持数千个客户端同时读写。

消息模型

队列模型(点对点式)

简单来说就是使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。 比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

队列模型存在的问题:

假如我们存在这样一种情况:我们需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完成的消息内容。

例如:Java中常使用的阻塞队列BlockingQueue

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

值得注意的是,BlockingQueue使用了大量的synchronized关键字保证了线程安全,在多线程并发的过程,能够有效处理put-puttake-take方法之间的竞争而不会出现数据丢失或者重复消费的情况,这也是我们自己徒手实现阻塞队列时需要注意的关键点。

我们可以通过阻塞队列完成简单的生产者-消费者模式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.nowcoder.community;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {

public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}

}

class Producer implements Runnable {

private BlockingQueue<Integer> queue;

public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

class Consumer implements Runnable {

private BlockingQueue<Integer> queue;

public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

结果如下,我们发现由于生产者的速度明显快于消费者的速度,阻塞队列很快进入满的状态并阻塞,而最终由于生产者没有进一步的生产并且队列已空时,消费者也无法消费最终进入阻塞状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-1消费:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-3消费:8
Thread-0生产:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-3消费:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-0生产:10
Thread-3消费:9
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-3消费:9
Thread-0生产:10
Thread-0生产:10
Thread-3消费:9
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-2消费:9
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-2消费:9
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-2消费:9
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-0生产:10
Thread-3消费:9
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-3消费:9
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-0生产:10
Thread-3消费:9
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-1消费:8
Thread-2消费:7
Thread-2消费:6
Thread-1消费:5
Thread-3消费:4
Thread-3消费:3
Thread-2消费:2
Thread-3消费:1
Thread-3消费:0

发布-订阅模型(Kafka消息模型)

发布订阅模型(Pub-Sub) 使用主题(Topic) 作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的

在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。

MQ消息队列功能

  1. 异步处理

    例如用户注册环节中,我们需要经历用户注册 → 发送注册邮件 → 发送注册短信的过程,而通过消息队列我们可以异步实现,直接将请求操作写入消息队列,等待消费者消费执行

  2. 应用解藕

    现在有两个系统:订单系统/库存系统。如果库存系统异常,则订单调用库存失败,导致下单失败。说明订单系统和库存系统耦合度太高。而通过消息队列,下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦。

  3. 流量削峰

    因为流量过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列。用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束的页面!

Kafka总体架构

基于 Kafka-ZooKeeper 的分布式消息队列系统总体架构如下:

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

架构相关组件

  • Producer:生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
  • Broker:Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
  • Topic:producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
  • Partition:每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
  • Consumer:消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
  • Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
  • replicas:partition 的副本,保障 partition 的高可用;
  • leader:replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
  • follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
  • controller:Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
  • ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 meta 信息等。

P.S. Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。

Topic和Partition

一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:

为何将Topic进行Partition分区?

如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个 Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 randomkey-hash轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。

也就是说,Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。

在创建 topic 时可以在 $KAFKA_HOME/config/server.properties 中指定这个 partition 的数量(如下所示),当然可以在 topic 创建之后去修改 partition 的数量。

1
2
3
4
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

在发送一条消息时,可以指定这个消息的 key,producer 根据这个 key 和 partition 机制来判断这个消息发送到哪个partition。partition 机制可以通过指定 producer 的 partition.class 这一参数来指定(即支持自定义),该 class 必须实现 kafka.producer.Partitioner 接口。

Kafka 如何保证消息的消费顺序?

我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:更改用户会员等级、根据会员等级计算订单价格。假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。所以每次添加消息到 Partition(分区) 的时候都会采用尾加法。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。

文件存储机制

分段机制

Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。然而 topic 在物理层面又能以 partition 为分组,一个 topic 可以分成若干个 partition。事实上,partition 并不是最终的存储粒度,partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成。

分段原因?

如果就以 partition 为最小存储单位,可以想象,当 Kafka producer 不断发送消息,必然会引起 partition 文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响,因此,需以 segment 为单位将 partition 进一步细分。每个 partition(目录)相当于一个巨型文件被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中消息数量不一定相等)这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 partition 只需要支持顺序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours} 等若干参数)决定。

分段存储结构

segment 文件由两部分组成,分别为 “.index” 文件和 “.log” 文件,分别表示为 segment 索引文件和数据文件。这两个文件的命令规则为:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充,如下:

1
2
3
4
5
6
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

以上面的 segment 文件为例,展示出 segment:00000000000000170410 的 “.index” 文件和 “.log” 文件的对应的关系,如下图:

“.index” 索引文件存储大量的元数据,“.log” 数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。其中以 “.index” 索引文件中的元数据 [3, 348] 为例,在 “.log” 数据文件表示第 3 个消息,即在全局 partition 中表示 170410+3=170413 个消息,该消息的物理偏移地址为 348。

复制原理和同步方式

Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。

如图:

其中:

  • LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。
  • HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念。

同步副本 ISR

为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:

Leader Replica:主副本,可以处理请求作响应;

Follower Replica:从副本,只是做备份;当主副本挂掉的时候,可以从从副本中选出一个来作为新的主副本

如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。

一个 partition 有多个副本(replicas),为了提高可靠性,这些副本分散在不同的 broker 上,由于带宽、读写性能、网络延迟等因素,同一时刻,这些副本的状态通常是不一致的:即 followers 与 leader 的状态不一致。那么,如何保证新选举出的 leader 是优选呢? Kafka 机制中,leader 将负责维护和跟踪一个 ISR(In-Sync Replicas)列表,即同步副本队列,这个列表里面的副本与 leader 保持同步,状态一致。如果新的 leader 从 ISR 列表中的副本中选出,那么就可以保证新 leader 为优选。

虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsets.topic.replication.factor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replica.lag.time.max.ms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。

P.S. ISR中包括:leader + 与leader保持同步的followers。

HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。

下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:

1
/brokers/topics/[topic]/partitions/[partition]/state

目前,有两个地方会对这个 ZooKeeper 的节点进行维护。

  1. Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epochcontroller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
  2. leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。

消息确认机制

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

1. request.required.acks = 1

这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。

2. request.required.acks = 0

producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。

3. request.required.acks = -1(all)

producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。

注意:参数 min.insync.replicas

如果要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas 配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。不难理解,如果 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有leader),将无法保证可靠性,此时拒绝客户端的写请求以防止消息丢失。

HW机制

考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?

这里就需要 HW 的协同配合了。如前所述,一个 partition 中的 ISR 列表中,leader 的 HW 是所有 ISR 列表里副本中最小的那个的 LEO。类似于木桶原理,水位取决于最低那块短板。

如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) 操作,在宕机时 log 文件之后直接做追加操作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步操作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。

如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。

当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。

P.S. 消息队列仍然有丢失的风险,即commit操作失败了。例如leader相关信息未完全同步到ISR副本中时leader宕机

Leader选举策略

为了保证可靠性,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer。如此,可以避免因部分数据被写进 leader,而尚未被任何 follower 复制就宕机的情况下而造成数据丢失。对于 producer 而言,它可以选择是否等待消息 commit,这可以通过参数 request.required.acks 来设置。这种机制可以确保:只要 ISR 中有一个或者以上的 follower,一条被 commit 的消息就不会丢失。

leader 在一个消息被 commit 前等待 follower 的数量越多,与 leader 保持同步的 follower 就越多,可靠性就越高,但这也会造成吞吐率的下降。

Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader 保持同步,只有 ISR 里的成员才能有被选为 leader 的可能(通过参数配置:unclean.leader.election.enable=false)。在这种模式下,对于 f+1 个副本,一个 Kafka topic 能在保证不丢失已经 commit 消息的前提下容忍 f 个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。但与 “少数服从多数” 策略不同的是,Kafka ISR 列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。

P.S.

关于unclean.leader.election.enable参数:

当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:

  1. 等待 ISR 中任意一个 replica 恢复过来,并且选它作为 leader;
  2. 选择第一个恢复过来的 replica(并不一定是在 ISR 中)作为leader。

如何选择呢?这就需要在可用性和一致性当中作出抉择。如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR 中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。

选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经 commit 的消息,从而造成消息丢失。默认情况下,Kafka 采用第二种策略,即 unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略。

unclean.leader.election.enable 这个参数对于 leader 的选举、系统的可用性以及数据的可靠性都有至关重要的影响。生产环境中应慎重权衡。

Spring使用Kafka

首先配置并启动ZooKeeper,确认端口号等相关信息

配置Kafka相关信息并在本地启动,可以直接在本地环境中运行也可在docker中运行

pom.xml中配置Kafka相关依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

并在application.properties中进行相关配置

1
2
3
4
5
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

具体测试代码如下,以牛客论坛项目为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.nowcoder.community;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

@Autowired
private KafkaProducer kafkaProducer;

@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");

try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

@Component
class KafkaProducer {

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}

}

@Component
class KafkaConsumer {

@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}


}

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。为了避免相关情况,我们不能默认在调用send方法发送消息之后消息消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。

1
2
3
4
5
6
7
8
public void sendMessage(String topic, Object o) {
// 分区编号最好为 null,交给 kafka 自己去分配
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, null, System.currentTimeMillis(), String.valueOf(o.hashCode()), o);

ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(producerRecord);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
}

消费者Consumer被动接受消费

注意到consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

消息同步问题

数据丢失

与之前提及的类似,我们可以:

设置 acks = all

解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

设置 replication.factor >= 3

为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

设置 min.insync.replicas > 1

一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下加入两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1

设置 unclean.leader.election.enable = false

Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false

我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

重复消费

原因

  • 消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。
  • 设置enable.auto.commit为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。
  • 消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。

解决

  1. 提高消费者的处理速度。例如:对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的Rebalance。可根据实际消息速率适当调小max.poll.records的值。
  2. 引入消息去重机制。例如:生成消息时,在消息中加入唯一标识符如消息id等。在消费端,可以保存最近的max.poll.records条消息id到redis或mysql表中,这样在消费消息时先通过查询去重后,再进行消息的处理。
  3. 保证消费者逻辑幂等。

参考相关文档网页

菜鸡真的学到了,补坑扫盲了

[1]深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列

[2]Kafka总结