Kafka_运维与实战


0x0 Kafka运维命令

1.连接zk

由于kafka的各种元数据都存储在zk,要连接kafka集群也要通过zk获取各个broker的ip端口然后连接broker。因此,大多数kafka自带的运维命令都要指定zk的地址,比如用kafka-topics列出所有topics:

kafka-topics --zookeeper localhost:2181/kafka --list

--zookeeper参数是必须指定的。另外,如果kafka集群启动的时候在配置文件中指定了namespace,记得要在zk的地址后面也要加上kafka所属的namespace。否则kafka就找不到kafka集群的相关元数据了。

由于kafka的元数据都存储在zk,因此掌握好如何查看zk的数据也是运维kafka集群的一个关键。

zkCli 命令

zookeeper安装包一般都会提供zkCli命令来让用户连接zookeeper集群。

./zkCli.sh -timeout 5000 -r -server ip:port

之后进入zk的交互界面,就可以输出相关命令查看zk的数据了。

# 查看kafka集群下所有的brokers id列表
ls /brokers/ids
# 查看 1003 broker的信息
get /brokers/ids/1003

其他zk交互界面的命令这里不多做介绍,zkCli的帮助文档已经写的很清楚了。

2.topic 相关

kafka-topics可以进行和topics相关的一些操作。下面介绍一下如何运用该命令来操作kafka topics。

该命令最终是调用kafka源码中的TopicCommand类来实现的。

生产与消费

#查看指定topic的详细信息
bin/kafka-topics.sh --zookeeper  hadoop102:2181/kafka --describe --topic test

#往topic里生产数据
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test

#消费topic
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning  --topic test

列出所有的topic & 获取命令帮助

# 列出帮助文档,英文好的同学基本看帮助文档就可以指定大概怎么使用该命令了
kafka-topics --help
# 列出kafka集群下的所有topics,这里需要指定kafka机器元数据存储所在的zk机器地址,记得如果有namespace,要也加上,否则将连不上kafka集群
 bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

创建topic

# 创建一个topic为test的topic,并指定分区数为5,副本数为1。这里的副本数不能超过broker的数量,否则会报错
kafka-topics --topic timeTest --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 5
# 创建时指定副本在哪个broker上,多个partition之间用逗号分隔,副本之间用":"分割,第一个副本默认是leader
kafka-topics.sh --zookeeper 172.19.0.5:2181 --topic lyt2 --create --replica-assignment 1001:1002,1001:1002,1001:1002

列出所有topic的详情

通过 --describe 参数可以列出我们指定的topics详情,包括 partitions、leader、replicas、isr等。

kafka-topics.sh --zookeeper localhost:2181/kafka --describe test test_yangjb
# 输出
Topic:test  PartitionCount:5    ReplicationFactor:3 Configs:
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001,1002,1003    Isr: 1002,1001,1003
    Topic: test Partition: 1    Leader: 1002    Replicas: 1002,1003,1001    Isr: 1002,1003,1001
    Topic: test Partition: 2    Leader: 1003    Replicas: 1003,1001,1002    Isr: 1002,1001,1003
    Topic: test Partition: 3    Leader: 1001    Replicas: 1001,1003,1002    Isr: 1002,1001,1003
    Topic: test Partition: 4    Leader: 1002    Replicas: 1002,1001,1003    Isr: 1002,1001,1003

下面是一些 使用—describe时可以使用的其他参数

# 只列出修改了默认配置的那些topic。并可以查看修改了哪些topic配置
--topics-with-overrides
# 列出那些目前没有leader的topic
--under-replicated-partitions
# 列出那些正在同步的topic或者同步出现异常的topic
--under-replicated-partitions

删除topic

注意,kafka删除topic是异步的,因此并不是命令返回了topic就已经被成功删除。而是等待后台的删除任务执行成功才真正删除该topic。

kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test

修改topic相关信息

通过 --alter 参数可以修改topic的信息,能修改的信息包括 partition数量、replica分配情况、topic配置。如果要修改 partition数量时,修改的后的数量一定要比当前的数量大,否则会报错。

# 将partition数量修改成7个
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
# 通过 --replica-assignment 参数指定新增partition的副本分布情况
# 如果原先的partition数量是3,那么新增的一个分区的副本分布应该在1002和1003
kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 1001:1002,1001:1002,1001:1002,1002:1003
# 修改topic test的配置 flush.ms =30000 。
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --config flush.ms=30000
# 删除topic test的 flush.ms 配置
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --delete-config flush.ms

注意,在后续的kafka版本中,关于topic的配置的修改删除可能会被移到kafka-configs.sh中。官方建议使用kafka-configs来修改topic的配置。

3.分区副本重分配

在数据量大的情况下,各个broker上的数据量经常会不一致,有的broker上数据非常大,有的则很小,为了让数据更均匀的分布在各个broker,我们就要学会对topic的partion进行分区副本重分配。

首先建立一个json文件,用来描述如何分配分区副本。

assign.json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 1,
      "replicas": [
        1002,
        1003
      ]
    },
    {
      "topic": "test",
      "partition": 2,
      "replicas": [
        1003,
        1002
      ]
    }
  ],
  "version": 1
}

文件中只要指定要重新分配副本的分区号就可以,不需要列出所有分区。

提交分区副本重分配任务:

# --execute 参数表示执行
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --execute
# --verify 参数表示查看分区副本重分配任务的执行状态
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --verify

让系统自动帮我们生成重分配json文件:

执行命令之前需要建立一个json文件,告诉系统要重分配哪些分区:

gen.json:

{
  "topics": [
    {
      "topic": "foo"
    }
  ],
  "version": 1
}

接着执行命令

# --generate 表示生成重分配的json文件
# --topics-to-move-json-file 指定要重分配哪些topic
# --broker-list 表示要分配到哪些broker上去
kafka-reassign-partitions --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file gen.json --broker-list 1001,1002,1003

其他参数

# 指定重分配时,在一个broker上,各个日志目录之间复制数据的阈值,最低要求 1 KB/s
# 如果重分配任务正在进行,第二次执行会修改原来设置的阈值
--replica-alter-log-dirs
# 指定重分配时,在不同broker之间传输数据的阈值,最低要求 1 KB/s
# 如果重分配任务正在进行,第二次执行会修改原来设置的阈值
--throttle
# 等待重分配任务开始的超时时间
--timeout

分区副本重分配过程

详情可以看kafka源码的KafkaController#onPartitionReassignment()的方法注解。

RAR = Reassigned replicas,目标要分配的副本情况
OAR = Original list of replicas for partition,原先的副本分配情况
AR = current assigned replicas,当前的副本分配情况

  1. 更新zk处的partition副本配置:AR=RAR+OAR
  2. 向所有RAR+OAR的副本发送元数据更新请求
  3. 将新增的那部分的副本状态设置为NewReplica。也就是 RAR-OAR 那部分副本
  4. 等待所有的副本和leader保持同步。也就是抱着RAR+OAR的副本都在isr中了
  5. 将所有在RAR中的副本状态都设置为OnlineReplica
  6. 在内存中先将AR=RAR
  7. 如果leader不在RAR中,就需要重新竞选leader。采用ReassignedPartitionLeaderSelector选举
  8. 将所有准备移除的副本状态设置为OfflineReplica。也就是OAR-RAR的那部分副本。这时partition的isr会收缩
  9. 将所有准备移除的副本状态设置为NonExistentReplica。这时所在的分区副本数据会被删除。
  10. 将内存中的AR更新到zk
  11. 更新zk的/admin/reassign_partitions路径,移除这个partition
  12. 发送新的元数据到各个broker上

假设当前有OAR = {1, 2, 3}, RAR = {4,5,6},在进行partition reaassigned的过程中会发生如下变化

AR leader/isr 步骤
{1,2,3} 1/{1,2,3} 初始状态
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步骤2
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步骤4
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步骤7
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步骤8
{4,5,6} 4/{4,5,6} 步骤10

4.删除某个partition的数据

使用kafka-delete-records命令可以删除指定topic-partition在指定offset之前的所有数据。

该命令是kafka在0.11版本之后才支持的。

首先需要编写删除offset描述json文件:

delete.json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 24
    }
  ],
  "version": 1
}

上面的json文件表示删除topic是test的0号parition的24之前的所有offset,也就是1-23这些offset的数据都会被删除掉。

kafka-delete-records --bootstrap-server 127.0.0.1:9092 --offset-json-file delete.json

5.全局&topic配置修改

通过kafka-configs命令,我们可以修改broker的配置,以及topic的配置、client和user的配置。

配置更新原理

kafka-configs命令修改配置后会被写到对应的zookeeper的节点上持久化,之后kafka集群重启后还会加载这些配置,并覆盖配置文件的那些配置。也就是说,如果在此处设置了某个配置项,之后在配置文件中对这个配置项的改动都不会起作用,因为被覆盖了。

用该命令修改了配置后,可以在zk的节点下看到对应的配置内容。

节点目录一般是 /config/entityType/entityName,entityType可以是brokers、topics、users、clients。entityName表示具体的名称,比如broker的id,topic的名称等。

比如要看0号broker修改过的配置项,可以在zk交互界面中输入

# /kafka 是命名空间
get /kafka/config/brokers/0

修改broker配置

# 将0号broker的配置 log.cleaner.backoff.ms修改成1000,flush.ms 也修改成1000
# --alter 表示要修改配置项
# --add-config 后面跟着要修改的配置项
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --add-config log.cleaner.backoff.ms=1000,flush.ms=1000 --alter
# 删除0号broker 对 log.cleaner.backoff.ms的配置
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --delete-config log.cleaner.backoff.ms --alter
# 列出0号broker修改过的配置项
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --describe

修改topic的配置

# 将test这个topic的 delete.retention.ms修改成1000,flush.ms 也修改成1000
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --add-config delete.retention.ms=1000,flush.ms=1000 --alter
# 删除test这个topic的 delete.retention.ms和flush.ms配置项
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --delete-config delete.retention.ms,flush.ms --alter
# 列出 test这个topic修改过的配置项
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --describe

修改client的配置

这里的client是指客户端,也就是produer或者consumer。客户端支持修改的配置有

# 请求限制
request_percentage
# 推送消息时的流量控制
producer_byte_rate
# 消费时的流量控制
consumer_byte_rate

通过指定clientId我们可以控制指定客户端的配置,从而控制他们的流量不会超过我们设定的值

# 设置 客户端id 为test的 producer_byte_rate和consumer_byte_rate为1024
kafka-configs --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name test

6.查看broker上磁盘的使用情况

在0.11版本中,新增一个命令kafka-log-dirs可以查看broker的磁盘使用情况。

该命令可以从两个维度观察磁盘的使用情况,一个是指定broker id,查看该broker的数据目录的各个topic parition的占用大小。还可以直接指定topic,查看这些topic的partition在各个broker上的使用情况。甚至可以两个过滤条件一起用,同时指定brokerId和topic。

# 查看0、1号broker上各个topic partition的磁盘使用情况
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --broker-list 0,1 --describe
# 查看topic:test 在各个broker上的磁盘使用情况
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --describe
# 查看topic test 在0号broker上的磁盘使用情况
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --broker-list 0 --describe

输出示例:

{
  "version": 1,
  "brokers": [
    {
      "broker": 1001,
      "logDirs": [
        {
          "logDir": "/kafka/kafka-logs-7da01186c90a",
          "error": null,
          "partitions": [
            {
              "partition": "test-4",
              "size": 0,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test-0",
              "size": 0,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    }
  ]
}

7.使用kafka-preferred-replica-election进行leader选举

当我们查看某个topic partition时,会输出该partiton replica的列表,其中replica列表的第一个replica被kafka称为preferred replica。

Topic: test Partition: 0    Leader: 1002    Replicas: 1001,1002,1003    Isr: 1002,1001,1003

上面的test partition-0中,1001就是那个preferred replica。在大多情况下,preferred replica一般就是leader,但是有些情况可能不是。因此,kafka提供了kafka-preferred-replica-election来将preferred replica选举成leader。

首先我们需要编辑prefered.json 文件:

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    }
  ]
}

该文件告诉工具我们要对topic test的partition-0进行preferred选举:

kafka-preferred-replica-election --zookeeper 127.0.0.1:2181 --path-to-json-file prefered.json

8.声明:转载来源

0x1 Kafka eagle监控

1.Kafka Eagle如何安装?

Kafka Eagle安装部署非常方便,可以从官网下载最新版本进行安装,或者从Github下载最新的Release源代码进行编译安装。

修改kafka内部配置

vim /kafka/bin/kafka-server-start.sh
#----------------需要修改内容---------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
#----------------修改内容-------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

例如,从官网下载Kafka Eagle安装包,按如下命令操作即可:

#解压安装包在software里面
tar -zxvf kafka-eagle-v1.4.2-bin.tar.gz
#进入文件,然后进行解压至
tar -zxvf kafka-eagle-web-1.4.5-bin.tar.gz -C /opt/module/
#修改名称
mv kafka-eagle-web-1.4.5/ eagle
#给ke.sh添加权限
chmod u+x ke.sh

配置环境变量

# 配置JAVA_HOME和KE_HOME:
vi ~/.bash_profile
export JAVA_HOME=/hadoop/jdk8
export KE_HOME=/hadoop/kafka-eagle
export PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin

修改conf/system-config.properties

######################################
# 设置Kafka多集群,这里只需要设置Zookeeper,
# 系统会自动识别Kafka Broker
######################################
# kafka.eagle.zk.cluster.alias=cluster1
# cluster1.zk.list=127.0.0.1:2181
# cluster2.zk.list=127.0.0.1:2181/plain
# cluster3.zk.list=127.0.0.1:2181/scram
# cluster4.zk.list=vmn4:2181
#--
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
#--

######################################
# Zookeeper线程池最大连接数
######################################
kafka.zk.limit.size=25

######################################
# Kafka Eagle的页面访问端口
######################################
kafka.eagle.webui.port=8048

######################################
# 存储消费信息的类型,一般在0.9版本之前,消费
# 信息会默认存储在Zookeeper中,所以存储类型
# 设置zookeeper即可,如果是在0.10版本之后,
# 消费者信息默认存储在Kafka中,所以存储类型
# 设置为kafka。而且,在使用消费者API时,尽量
# 客户端Kafka API版本和Kafka服务端的版本保持
# 一致性。
######################################
cluster1.kafka.eagle.offset.storage=kafka
#cluster2.kafka.eagle.offset.storage=kafka
#cluster3.kafka.eagle.offset.storage=kafka
#cluster4.kafka.eagle.offset.storage=kafka

######################################
# 开启性能监控,数据默认保留30天
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30


######################################
# KSQL查询Topic数据默认是最新的5000条,如果
# 在使用KSQL查询的过程中出现异常,可以将下面
# 的false属性修改为true,Kafka Eagle会在
# 系统中自动修复错误。
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false

######################################
# 删除Kafka Topic时需要输入删除密钥,由
# 管理员执行
######################################
kafka.eagle.topic.token=keadmin

######################################
# 开启Kafka ACL特性,例如SCRAM或者PLAIN,
# 一般生产环境会使用SCRAM来做ACL,应为SCRAM
# 可以动态创建和管理用户。
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
cluster1.kafka.eagle.sasl.client.id=

cluster2.kafka.eagle.sasl.enable=true
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
cluster2.kafka.eagle.sasl.client.id=

######################################
# 存储Kafka Eagle元数据信息的数据库,目前支持
# MySQL和Sqlite,默认使用Sqlite进行存储
######################################
# kafka.eagle.driver=com.mysql.jdbc.Driver
# kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
# kafka.eagle.username=root
# kafka.eagle.password=123456

#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/Users/dengjie/webserver/hadoop/sqlite/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=root

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

启动

# 启动Kafka Eagle系统,执行如下命令:
ke.sh start

同时,我们还可以执行其他的命令,参数如下:

# 查看Kafka Eagle运行状态
ke.sh status

# 停止Kafka Eagle
ke.sh stop

# 查看Kafka Eagle GC情况
ke.sh gc

# 查看Kafka Eagle服务器资源占用情况,例如TCP、句柄等
ke.sh stats

# 查看Kafka Eagle版本号
ke.sh version

# 查看Kafka Eagle服务器上JDK的编码情况(如果JDK编码不是UTF-8,可能会有异常出现,执行如下命令,根据提示来修复JDK编码问题)
ke.sh jdk

# 查看Kafka Eagle中是否存在某个类(如果需要精确,类名前面可以加上包名)
ke.sh find [ClassName]

2.总结

总的来说,Kafka Eagle提供了简单、易用的页面,部署方便。同时,提供非常详细的操作手册,根据官网提供的操作手册来安装Kafka Eagle,一般都可以正常使用。另外,有时候可能会在日志中发现一些连接超时或是空指针异常,对于这类问题,首先需要检测Kafka集群的各个Broker节点JMX_PORT是否开启(这个Kafka默认是不开启),然后就是空指针异常问题,这类问题通常发生在Kafka集群配置了ACL,这就需要认真检测Kafka Eagle配置文件中ACL信息是否正确(比如设置的用户名和密码是否正确,以及用户是否拥有访问Topic的权限等)

...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms8G -Xmx8G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    # 开启JMX_PORT端口,端口开启后,Kafka Eagle系统会自动感知获取
    export JMX_PORT="9999"
    # 注释脚本中默认的信息
    # export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

如果大家对Kafka Eagle感兴趣,可以关注一波。后续,在Github上笔者也会公布每个版本的开发计划,修复大家在issues上面提的需求和问题,将Kafka Eagle建设的更加完善、易用。

Kafka Eagle源代码地址:https://github.com/smartloli/kafka-eagle

0x3 Kafka遇见的问题

1.生产错误

[2021-01-11 08:50:00,535] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {testTopc1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

解决

vi config/server.properties
add below line:
#添加以下行:
listeners=PLAINTEXT://localhost:9092

#No need to change the advertised.listeners as it picks up the value from std listener property. 不需要更改 advertised.listeners,因为它从 std listener 属性获取值
#Hostname and port the broker will advertise to producers and consumers. If not set,
#代理将向生产者和消费者发布广告的 Hostname 和 port,

#it uses the value for "listeners" if configured 如果配置,它将使用“ listeners”的值
#Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName().

stop the Kafka broker:

bin/kafka-server-stop.sh
restart broker:

bin/kafka-server-start.sh -daemon config/server.properties
#and now you should not see any issues.

2.listen介绍

那么先看看文字类描述:

  • listeners: 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
  • advertised.listeners:和 listeners 相比多了个 advertisedAdvertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。

比如说:

listeners: INSIDE://172.17.0.10:9092,OUTSIDE://172.17.0.10:9094
advertised_listeners: INSIDE://172.17.0.10:9092,OUTSIDE://<公网 ip>:端口
kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
kafka_inter_broker_listener_name: "INSIDE"

advertised_listeners 监听器会注册在 zookeeper 中;

当我们对 172.17.0.10:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INSIDE 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口;

同理,当我们对 <公网 ip>:端口 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 OUTSIDE 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 172.17.0.10:9094

总结:advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners

什么场景用到

只有内网

比如在公司搭建的 kafka 集群,只有内网中的服务可以用,这种情况下,只需要用 listeners 就行

listeners: <协议名称>://<内网ip>:<端口>

例如:

listeners: SASL_PLAINTEXT://192.168.0.4:9092

内外网

docker 中或者 在类似阿里云主机上部署 kafka 集群,这种情况下是 需要用到 advertised_listeners

docker 为例:

listeners: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
advertised_listeners: INSIDE://localhost:9092,OUTSIDE://<宿主机ip>:<宿主机暴露的端口>
kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
kafka_inter_broker_listener_name: "INSIDE"

kafka


文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录