Kafka_架构与流程


1.Kafka概述

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

1.1 消息队列与其应用场景

发布/订阅模式

一对多,消费者消费数据之后不会清除消息

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。

1.2 Kafka基础架构

1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即==消费者组是逻辑上的一个订阅者==。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader。

1.3 关于Kafka服务器的一些理解

Kafka存在两个服务器连接操作Kafka内部数据

kafka-topics.sh --bootstrap-server hadoop105:9092
kafka-topics.sh --list --bootstrap-server hadoop102:9092 

Kafka通过高性能的TCP网络协议进行传输通信,在客户端与服务器之间

1.4 Kafka安装配置文件

位置

kafka/config/server.properties

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

分别在hadoop103和hadoop104上修改配置文件

/opt/module/kafka/config/server.properties中的

broker.id=1、broker.id=2

注:broker.id不得重复

2.Kafka生产者

2.1分区策略

2.1.1分区原因

方便在集群中扩展,每个partition可以通过调整以适应它所在的机器,而一个topic又可以有多少个Partition组成,因此整个集群就可以适应任意大小的数据了;

可以提高并发,因为可以以Partition为单位读写了。

2.1.2分区原则

(1) 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;

(2) 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;

(3) 既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.

2.2数据可靠性保证

2.2.1生产者发送数据到topic partition的可靠性保证

能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送==ack==(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

2.2.2何时发送ack

确保有follower与leader同步完成

leader再发送ack,这样才能保证leader挂掉之后,能在follower中选出新的leader

2.2.3acks

0:这一操作提供了一个最低的延迟,partition的leader接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据;

1:
partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据

-1(all): partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复

2.2.4多少个follower同步完成之后发送ack

同步方案

半数

全部

ack这里要使用zookeeper进行选举

Kafka选择全部同步完成,才会发送ack

2.2.5ISR

Leader维护了一个动态的in-sync replica set
(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

2.3故障处理细节

leader和 follower故障处理细节

LEO:指的是每个副本最大的offset; Log End Offset

HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。 High Watermark

HW是同步速度最慢的分区决定的follower

2.3.1follower故障

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

2.3.2leader故障

==数据丢失==

leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

如果ack等级为0或者是1,此时leader分区已经接受到信息并且提交==ack==(注意broker提交是ack),但是此时发生故障,被踢出ISR,然后follower此时没有落盘成功. 此时重新选举follower自动将使用自己HW让其他分区同步,此时会发生丢失一部分数据

2.4 Exactly Once语义

将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即==At Least Once==语义。

相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即==At Most Once==语义。

At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。

但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

2.4.1数据是如何发生重复的呢?

graph LR
A[Producer]--数据重复-->B[Server]
B-->C[Consumer]

一般情况在,在ack应答等级为-1时,当leader与follower都已经落盘成功之后,这个时候会发送ack,但是这个时候leader崩掉,无法返回ack,这个时候会发生数据重复

0.11版本的Kafka,引入了一项重大特性:幂等性。

所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。

幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least Once + 幂等性 = Exactly Once

要启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。

而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

Exactly Once(At least Once + idempotence)

(PID,Partition,SeqNumber)

如果Broker检查这个信息,如果相同就不会在持有

2.4.2Exactly Once依然存在的问题?

PID是生产者在启动时分配的一个PID,如果这个生产者宕机,重新启动,PID会发生改变,相同信息依然会被broker持有

3.Kakfa Broker

Kafka是一个分布式集群

kafkaBroker内部架构

kafka内部是由分布式集群组成

3.1Broker内部文件

首先Kafka中的消息时以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。

#查看kafka内部的数据
cd /opt/module/kafka/data
#ATLAS使用Kafka的分区
drwxrwxr-x  2 atguigu atguigu 4096 1125 23:05 ATLAS_ENTITIES-0
drwxrwxr-x  2 atguigu atguigu 4096 1125 23:05 ATLAS_ENTITIES-1
drwxrwxr-x  2 atguigu atguigu 4096 1125 23:05 ATLAS_ENTITIES-2
...
#系统维护偏移量offsets的系统分区
drwxrwxr-x. 2 atguigu atguigu 4096 1126 15:40 __consumer_offsets-11
...
#用户自动的topic
drwxrwxr-x  2 atguigu atguigu 4096 1126 15:42 testTopic-1
drwxrwxr-x. 2 atguigu atguigu 4096 1126 15:40 topic_log-0

注意topic是一个逻辑概念,并非由topic组成,而是

不同分区内的一组文件组成(leader follower)

#进入其中一个分区
cd topic_log-0
-rw-rw-r-- 1 atguigu atguigu  0 1126 15:42 00000000000000007372.index
-rw-rw-r-- 1 atguigu atguigu  0 1124 16:38 00000000000000007372.log
-rw-rw-r-- 1 atguigu atguigu  0 1126 15:42 00000000000000007372.timeindex
-rw-rw-r-- 1 atguigu atguigu 12 1126 15:40 leader-epoch-checkpoint

进入topic的一个分区文件,我们可以发现如上四个文件

分片与索引机制

由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片索引机制,将每个partition分为多个segment。

每个segment对应两个文件——“.index”文件和“.log”文件。

这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。

例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。

index和log文件以当前segment的第一条消息的offset命名。下图为index文件和log文件的结构示意图。

index(数据在log中的索引)

log(真实的数据)timeindex (数据发送的时间索引) ,

时间索引和index索引均是用来提高查询数据效率;

三者密不可分

分片与索引详情图

索引文件

3.2关于leader-epoch-checkpoint

源码链接

https://github.com/apache/kafka/blob/ba237c5d21abb8b63c5edf53517654a214157582/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala#L43
In Kafka, a leader epoch refers to the number of leaders previously assigned by the controller. Every time a leader fails, the controller selects the new leader, increments the current "leader epoch" by 1, and shares the leader epoch with all replicas. The replicas use the leader epoch as a means of verifying the current leader. If a leader fails and returns, when it tries to contact other replicas, it will send what it believes is the current leader epoch. The replicas will ignore the messages sent with outdated leader epochs.

The leader-epoch-checkpoint file contains two columns: epochs and offsets, as shown here. Each row is a checkpoint for the latest recorded leader epoch and the leader's latest offset upon becoming leader. Both replicas and leaders contain this file. Its role is for checking what range of offsets pertain to which epoch.

说白了就是每一个topic维护了一个leader的版本,当心leader选举出来,leader的版本会加1

一共三台机器一共三行两列,第一列是leader版本,第二行是leader的offset

0
1
23 7372

3.3关于分区与副本的理解

在kafka中,每个主题可以有多个分区,每个分区又可以有多个副本

在这个多个副本中.只有一个是leader.其他的都是follower副本,仅有leader副本对外提供服务

4.Kafka消费者

4.1拉取模式

pull

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。

4.2分区分配策略

首先要明白,分区分配策略其实就是指引Kafka的消费者去读取Kafka内的topic分区中的文件

RoundRobin,Range , Sticky

4.2.1为什么同一个消费者组内的只有一个消费者消费分区?

pull拉取之后提交offset

5.zookeeper

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。

Controller的管理工作都是依赖于Zookeeper的。

以下为partition的leader选举过程:

在操作上,我们有一下符合规范的ZooKeeper安装方式:
在物理/硬件/网络上的冗余:尽量不要把他们放在同一个机架上,合适的硬件配置(但不要过分),尽量保持电源,网络等。一个典型的ZooKeeper集群有5或7台服务器,分别允许宕机2台和3台服务器。如果你想部署一个小型集群,3台服务器也可以部署,但是要记住,在这种情况下你只能宕机1台服务器。
I/O隔离:如果你有大量的写入操作流入,你几乎肯定会把事务日志放在一组特定的磁盘上。写入事物日志是同步的(但为了性能会分批写入),因此并发写入会明显影响性能。数据快照是异步落盘,因此通常可以与操作系统和消息日志文件共享磁盘性能。你可以配置dataLogDir参数单独为服务器配置磁盘组。
应用隔离:除非你真的了解其他应用的运行模式,否则不要和ZooKeeper安装在一起,最好是单独部署运行ZooKeeper(尽管ZooKeeper可以均衡的利用硬件资源)。
谨慎使用虚拟化:他的运行状况取决于你的集群架构,读写模式和SLA,即便是由虚拟化层引入的微小开销也可能造成ZooKeeper的中断,毕竟ZooKeeper对此十分敏感。
ZooKeeper配置: 他是java运行的,首先确保你给他分配足够的堆空间(我们通常配置3-5G,但这是根据我们现有数据实际情况来定的)。不幸的是,我们没有一个好的固定公式来确定他的值,但是要记住分配个ZooKeeper的堆空间越大,快照也就越大,从而会影响快照的恢复时间。实际上,如果快照变得太大(几个G),那你能需要增加initlimit参数的值,以便为服务器提供足够的时间来恢复并加入集群。
监控:JMX和4个字母的命令(ZooKeeper提供的一系列命令,如:conf,cons,dump等)非常有用,他们在某些功能上重复了(这种情况下我们更喜欢4lw命令,他们似乎更容易预测情况,至少,他们和基础设施监控兼容性更好)
不要过度构建集群:大型集群,尤其是大量写入的情况下,意味着大量的集群内部通信(集群成员节点的写入和后续的仲裁更新),但是过小的将集群将承担不必要的风险。添加更多的服务器可以增加集群的读取能力。
总体来看,我们应尽量保持zookeeper尽可能小的处理负载(标准增长容量规划) 并尽可能的简单。与官方版本相比,我们尽量对配置和应用布局不做什么更改,尽可能保持官方原版。基于这些原因,我们倾向于跳过操作系统打包的版本。因为为了有更好的表现,它倾向于把关注点放在可能“混乱”的标准系统层上。

6.Kafka事务

3.6.1 Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。

这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。

Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。

Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

graph LR
A[producer]--Transaction ID&PID-->B[Server]
A--交互-->C[Transaction Coordinator]
C--写入事务存储事务状态-->D[事务topic]
D--读取事务状态-->A
D-->B

3.6.2事务状态是如何得到保存的呢?

看下面Kafka生产流程哦

3.6.3 Consumer事务(精准一次性消费)

上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。

这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

如果想完成Consumer端的精准一次性消费,那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质(比如mysql)。这部分知识会在后续项目部分涉及。

7.Kafka API与消息发送流程

7.1消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator

main线程将消息发送给RecordAccumulator,

Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

Kafka发送消息流程

7.2 API

需要练习

8.kafka监控

 [atguigu@hadoop102 eagle]$ bin/ke.sh start
... ...
... ...
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.202.102:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
[atguigu@hadoop102 eagle]$
#注意端口号8048

文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录