kafka SASL/PLAIN 认证及 ACL 权限控制

一、Zookeeper 配置 SASL/PLAIN 认证(每个zookeeper节点都要做)

1.1 在 zookeeper 的 conf 目录下,创建 zk_server_jaas.conf 文件,内容如下

Server {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_kafka="kafka";
};

username="admin" 是 zookeeper 实例之间通信的用户;user_kafka="kafka" 是 kafka broker 与 zookeeper 连接的时候的认证用户,密码为=后面的值;

1.2 修改 zoo.cfg ,添加以下内容

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.3 因为认证的时候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 这个是 kafka-client.jar 里面,所有需要将相应的 jar 包拷贝到 zookeeper 安装根目录的 lib 目录下, 大概要 copy 以下这些 jar 包

mv /opt/module/kafka_2.12-2.4.1/libs/kafka-clients-2.4.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/lz4-java-1.6.0.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/osgi-resource-locator-1.0.1.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/slf4j-api-1.7.28.jar /opt/module/zookeeper/lib/
mv /opt/module/kafka_2.12-2.4.1/libs/snappy-java-1.1.7.3.jar /opt/module/zookeeper/lib/

1.4 修改 zookeeper 启动命令参数,在文件末尾追加以下内容

SERVER_JVMFLAGS="-Djava.security.auth.login.config=/opt/module/zookeeper/conf/zk_server_jaas.conf"

1.5 重启 zookeeper 服务。

二、Kafka Server 配置 SASL/PLAIN 认证(每个kafka节点都要做)

2.1 修改 kafka server.properties 文件,添加以下内容(各个node修改为自己的hostname)

listeners=SASL_PLAINTEXT://node2:9092
advertised.listeners=SASL_PLAINTEXT://node2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer  
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true 
# 设置本例中admin为超级用户
super.users=User:admin

2.2 在kafka config 目录下创建 kafka_server_jass.conf 文件,内容如下

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_consumer="consumer"
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafka";
};

KafkaServer 段里面配置了 broker 之间的认证配置以及 client 和 broker 之间的认证配置

KafkaServer.username, KafkaServer.password 用于broker之间的相互认证

KafkaServer.user_admin和KafkaServer.user_consumer 用于 client 和broker 之间的认证, 下面我们 client 里面都用用户 consumer 进行认证

Client 段里面定义 username 和 password 用于 broker 与 zookeeper 连接的认证;

2.3 修改 kafka 启动命令脚本工具 kafka-server-start.sh,添加 java.security.auth.login.config 环境变量

将最后一句

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

修改为

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

2.4 修改完所有节点后,重启 kafka 服务,查看启动日志。

三、 Kafka client 的认证配置

3.1 在 Kafka 的 config 目录下创建 kafka_client_jaas.conf 文件,内容如下

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="consumer"password="consumer";
};

3.2 修改 kafka 的 consumer.properties 和 producer.properties,添加以下内容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

3.3 修改 producer 启动脚本参数,修改 kafka-console-producer.sh 文件

将最后一行

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

修改为

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"

3.4 修改 consumer 启动脚本参数,修改 kafka-console-consumer.sh 文件

将最后一行

exec $(dirname $0)/kafka-run-class.sh  kafka.tools.ConsoleConsumer "$@"

修改为

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/module/kafka_2.12-2.4.1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"

四、ACL 配置

4.1 因为我们有个超级用户 admin,所以可以用 admin 做生产者,admin不需要做 ACL 配置

4.2 为 test topic 添加消费者用户 consumer

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241 --add --allow-principal User:consumer --consumer --topic test --group test_group

4.3 测试

生产者测试

kafka-console-producer.sh --broker-list 
node2 --topic test --producer.config /opt/module/kafka_2.12-2.4.1/config/producer.properties

消费者测试

kafka-console-consumer.sh --bootstrap-server 
node2:9092 --topic test --from-beginning --consumer.config 
/opt/module/kafka_2.12-2.4.1/config/consumer.properties  

4.4 Flink 代码消费测试

object TestKafkaSource {// Kafka Broker 地址private val BOOTSTRAP_SERVERS = "node2:9092"// SASL/PLAIN 认证配置private val SASL_USERNAME = "consumer" private val SASL_PASSWORD = "consumer"// Topic 名称private val ALLOWED_TOPIC = "test"private val DENIED_TOPIC = "test1"def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval props = new Propertiesprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group")props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")// SASL/PLAIN 配置props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")props.put("sasl.mechanism", "PLAIN")props.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", SASL_USERNAME, SASL_PASSWORD))val dataSource: DataStreamSource[String] = env.addSource(new FlinkKafkaConsumer[String](ALLOWED_TOPIC , new SimpleStringSchema(), props))dataSource.print()env.execute()}
}

五、kafka-exporter 配置(使用了 kafka-exporter 监控 kafka 集群才做)

5.1 在kafka_server_jaas.conf 文件中加入 kafka-exporter 用户信息,内容如下

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_consumer="consumer"user_kafka-exporter="kafka-exporter"
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafka";
};

5.2 分发到所有 kafka broker ,并重启 kafka 服务

5.3 为 kafka-exporter 用户配置权限

kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --cluster
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Describe --group '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic '*'
kafka-acls.sh --authorizer-properties zookeeper.connect=node2:2181/kafka241   --add --allow-principal User:kafka-exporter   --operation Read --topic __consumer_offsets

5.4 配置 kafka_export 服务启动参数,在 kafka-export 安装目录下创建一个文件 start_kafka_export ,内容如下

#!/bin/bash
export KAFKA_SASL_USERNAME="kafka-exporter"
export KAFKA_SASL_PASSWORD="kafka-exporter"
export KAFKA_SASL_MECHANISM="PLAIN"/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181/kafka241 \--sasl.enabled \--sasl.username=$KAFKA_SASL_USERNAME \--sasl.password=$KAFKA_SASL_PASSWORD \--sasl.mechanism=$KAFKA_SASL_MECHANISM

5.5 修改 kafka_export.service 文件,内容如下

[Unit]
Description=kafka_exporter
Wants=prometheus.service
After=network.target prometheus.service[Service]
Type=simple
User=bigdata
Group=bigdata
Restart=on-failure
WorkingDirectory=/opt/module/kafka_exporter-1.7.0
#ExecStart=/opt/module/kafka_exporter-1.7.0/kafka_exporter --kafka.server=node2:9092 --web.listen-address=:9308 --zookeeper.server=node2:2181
ExecStart=/opt/module/kafka_exporter-1.7.0/start_kafka_exporter
[Install]
WantedBy=multi-user.target

5.6 重新加载 systemctl 管理的服务

sudo systemctl daemon-reload

5.7 启动 kafka_exporter 服务

sudo systemctl start kafka_exporter

5.8 上 grafana 查看是否能获取 kafka 状态

六、 参考文档

6.1 官网文档 Apache Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/81584.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

20250528-C#知识:函数简介及函数重载

C#知识:函数简介及函数重载 本文主要介绍函数参数和函数重载相关的知识点 1、函数 函数一般写在类中 一般函数调用 static int Add(int num, int value){num value;return num;}//一般函数调用,发生值类型参数的复制int num 1;Add(num, 1); //调用…

Vue内置指令与自定义指令

一、前言 在 Vue 开发中,指令(Directives) 是一种非常强大的特性,它允许我们以声明式的方式操作 DOM。Vue 提供了一些常用的内置指令,如 v-if、v-show、v-bind、v-on 等,同时也支持开发者根据需求创建自己…

华为AP6050DN无线接入点瘦模式转胖模式

引言 华为AP6050DN是一款企业级商用的无线接入点。由于产品定位原因,其默认工作在瘦模式下,即须经AC统一控制和管理,是不能直接充当普通的无线路由器来使用的。 而本文的目的,就是让其能脱离AC的统一控制和管理,当作普通无线路由器来使用。 硬件准备 华为AP6050DN无线接…

程序员出海之英语-使用手册

为什么现在实时翻译工具这么牛逼了,AI转译这么准确了,我还在这里跟老古董一样吭哧吭哧学英语呢? 这是因为我们始终是和人打交道,不仅仅是为了考试,看懂官方文章,听懂官方视频。这里为什么说官方&#xff0c…

Java 事务管理:在分布式系统中实现可靠的数据一致性

Java 事务管理:在分布式系统中实现可靠的数据一致性 在当今的软件开发领域,分布式系统逐渐成为主流架构。然而,这也给事务管理带来了巨大的挑战。本文将深入探讨 Java 事务管理在分布式系统中的关键要点,并通过详细代码实例展示如…

微信小程序关于截图、录屏拦截

1.安卓 安卓: 在需要禁止的页面添加 onShow() {if (wx.setVisualEffectOnCapture) {wx.setVisualEffectOnCapture({visualEffect: hidden,complete: function(res) {}})}},// 页面隐藏和销毁时需要释放防截屏录屏设置onHide() {if (wx.setVisualEffectOnCapture) {w…

使用 PySpark 从 Kafka 读取数据流并处理为表

使用 PySpark 从 Kafka 读取数据流并处理为表 下面是一个完整的指南,展示如何通过 PySpark 从 Kafka 消费数据流,并将其处理为可以执行 SQL 查询的表。 1. 环境准备 确保已安装: Apache Spark (包含Spark SQL和Spark Streaming)KafkaPySpark对应的Ka…

第十天的尝试

目录 一、每日一言 二、练习题 三、效果展示 四、下次题目 五、总结 一、每日一言 哈哈,十天缺了两天,我写的文章现在质量不高,所以我可能考虑,应该一星期或者三四天出点高质量的文章,同时很开心大家能够学到知识&a…

mediapipe标注视频姿态关键点(基础版加进阶版)

前言 手语视频流的识别有两种大的分类,一种是直接将视频输入进网络,一种是识别了关键点之后再进入网络。所以这篇文章我就要来讲讲如何用mediapipe对手语视频进行关键点标注。 代码 需要直接使用代码的,我就放这里了。环境自己配置一下吧&…

Redis数据迁移方案及持久化机制详解

#作者:任少近 文章目录 前言Redis的持久化机制RDBAOF Redis save和bgsave的区别redis数据迁移redis单机-单机数据迁移redis 主从-主从数据迁移redis 单机-cluster数据迁移redis cluster –redis cluster数据迁移 前言 Redis数据迁移是常见需求,主要包括…

图论回溯

图论 200.岛屿数量DFS 给你一个由 ‘1’(陆地)和 ‘0’(水)组成的的二维网格,请你计算网格中岛屿的数量。岛屿总是被水包围,并且每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。此外&#xff…

真实网络项目中交换机常用的配置与解析

一、配置三层链路聚合增加链路带宽 1.组网需求 某企业有多个部门分布在不同的地区,由于业务发展的需要,不同区域的部门与部门之间有进行带有VLAN Tag的报文的传输需求。采用透明网桥的远程桥接和QinQ功能,可以实现企业在不同区域部门之间进…

【Redis】过期键删除策略,LRU和LFU在redis中的实现,缓存与数据库双写一致性问题,go案例

一、Redis 中的过期键删除策略有哪些? 采用了 惰性删除 和 定期删除 两种策略处理过期键: 1. 惰性删除(Lazy Deletion) 机制:只有在访问 key 时才检查是否过期,如果已过期则立刻删除。优点:对…

为什么单张表索引数量建议控制在 6 个以内

单张表索引数量建议控制在6个以内的主要原因包括以下几点‌: ‌性能影响‌:索引会占用额外的磁盘空间。如果索引数量过多,会占用大量的磁盘空间,尤其是在数据量较大的情况下,索引占用的空间可能会超过数据本身。此外&…

深度学习实战109-智能医疗随访与健康管理系统:基于Qwen3(32B)、LangChain框架、MCP协议和RAG技术研发

大家好,我是微学AI,今天给大家介绍一下深度学习实战109-智能医疗随访与健康管理系统:基于Qwen3(32B)、LangChain框架、MCP协议和RAG技术研发。在当今医疗信息化快速发展的背景下,医疗随访与健康管理面临着数据分散、信息整合困难、个性化方案生成效率低等挑战。传统的医疗随…

聊一聊 .NET Dump 中的 Linux信号机制

一:背景 1. 讲故事 当 .NET程序 在Linux上崩溃时,我们可以配置一些参考拿到对应程序的core文件,拿到core文件后用windbg打开,往往会看到这样的一句信息 Signal SIGABRT code SI_USER (Sent by kill, sigsend, raise)&#xff0c…

如何在uniapp H5中实现路由守卫

目录 Vue3 app.config.globalProperties 1. 创建 Vue 应用实例 2. 添加全局属性或方法 3. 在组件中使用全局属性或方法 beforeEach在uniapp的注册 1、在H5中这两个对象是都存在的。「router:route」但是功能并不全面,具体可参考下图。 2、刚刚测试了一下,在微信小程序…

无人机降落伞设计要点难点及原理!

一、设计要点 1. 伞体结构与折叠方式 伞体需采用轻量化且高强度的材料(如抗撕裂尼龙或芳纶纤维),并通过多重折叠设计(如三重折叠缝合)减少展开时的阻力,同时增强局部承力区域的强度。 伞衣的几何参数&am…

AI时代新词-AI增强现实(AI - Enhanced Reality)

一、什么是AI增强现实(AI - Enhanced Reality)? AI增强现实(AI - Enhanced Reality)是指将人工智能(AI)技术与增强现实(Augmented Reality,简称AR)技术相结合…

基于Matlab实现各种光谱数据预处理

在IT领域,尤其是在数据分析和科学研究中,光谱数据的预处理是至关重要的步骤。光谱数据通常包含了丰富的信息,但往往受到噪声、杂散光、背景信号等因素的影响,需要通过预处理来提取有效信号,提高分析的准确性和可靠性。…