基于librdkafka开发的C++客户端,生产者生产发送数据失败问题处理

在这里插入图片描述
我们的项目使用了开源的librdkafka库,实现向kafka服务器生产发送数据的功能。使用的librdkafka的版本是1.9.0。

作为客户端程序,在开发时和客户协商确认后,支持了SASL_PLAINTEXT认证。以下概念解释引用自通义千问AI

SASL (Simple Authentication and Security Layer) 是一种框架,允许服务添加认证支持。
Kafka 支持多种 SASL 机制,其中之一就是 PLAINTEXT。
尽管名称中有“PLAINTEXT”,它实际上指的是使用的认证机制(即明文传输用户名和密码),而不是数据传输的安全性。
为了安全起见,通常会结合 SSL/TLS 来加密通信。主要用途:
用户身份验证:确认尝试连接到 Kafka 集群的客户端确实是其所声称的身份。
安全性:虽然 PLAINTEXT 机制本身不提供加密,但它可以与 SSL/TLS 结合使用以确保数据传输的安全性。

kafka服务端(github上下载kafka源码后安装)如何配置:
在 server.properties 文件中启用 SASL 和指定机制:
listeners=SASL_PLAINTEXT://your.host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

依赖librdkafka实现的C++客户端的伪代码设置如下

bool KafkaProducer::init(std::string &brokers, const string &username , const string &passwd, const bool &async, const int &size)
{...if (!username.empty() && !passwd.empty()){HLog(HGET_INFO << L"sasl authentication set, username:" << username << L" ,password:" << passwd);//security.protocol: 安全协议类型,示例为SASL_PLAINTEXTconf_->set("security.protocol", "sasl_plaintext", errstr);//sasl.mechanisms : sasl协议机制,示例为PLAIN, 表示普通文本conf_->set("sasl.mechanisms", "PLAIN", errstr);//sasl.username : 认证用户名conf_->set("sasl.username", username, errstr);//sasl.password : 认证密码		conf_->set("sasl.password", passwd, errstr);}}else{conf_->set("producer.type", "sync", errstr);}...
}

以下概念解释也是来自于通义千问AI

ACLs(Access Control Lists,访问控制列表) 是 Kafka 提供的一种方法,用于控制哪些用户或客户端可以对特定资源执行操作。ACL 定义了谁(principal)、可以在哪个资源上(resource)、执行什么操作(operation)。这里的资源可以是主题、消费者组等。ACL 组成部分
Principal:表示用户身份,通常格式为 User:<username>。
Resource Type:要控制访问权限的资源类型,如 Topic, Group, Cluster, TransactionalId。
Operation:允许的操作类型,包括但不限于 Read, Write, Create, Delete, Describe, Alter, All。
Pattern Type:资源匹配模式,支持 Literal(精确匹配),Prefixed(前缀匹配)等。
Host:指定允许从哪些主机发起请求,默认为 * 表示不限制
要启用 ACL 支持,你需要在 Kafka broker 的配置文件 server.properties 中设置以下参数:authorizer.class.name默认情况下,Kafka 使用简单的基于 Zookeeper 的 ACL 管理方式。为了启用 ACL 支持,你需要指定一个授权器类。最常用的授权器是 kafka.security.auth.SimpleAclAuthorizer。
authorizer.class.name=kafka.security.authorizer.AclAuthorizer下面是开启了ACL后的相关设置示例,允许某个用户读写某个topic
(1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181(2)配置ACL来让writer用户有权限写入test这个topic
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test(3)为reader用户设置test topic的读权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test(4)设置访问group的权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group

问题1:
某天客户反馈kafka服务器收不到数据了,我们拿回了日志。发现了日志中报错
[1][2025-06-24][11:57:03][T2709509888][KafkaProducer.cpp ][0111][I]Kafka produce failed:Broker: Topic authorization failed .Message:{"db_name":"UniMonDB","dtagentpackagecusttime":"2023-11-03 13:29:23.0000","dtdevdowntime":"2025-06-17 15:11:49.0000","dtdevfirstfoundtime":"2024-02-20 15:47:47.0000","dtdevlastofftime":0,"dtdevstarttime":"2025-06-12 18:06:00.0000","dtdevuptime":"2025-06-24 12:41:28.0000","dtfoundlongtermnotrunningtime":"0-00-00 00:00:00.0000","dtlasttimeofuaagentdown":"2025-06-23 16:14:26.0000","dtpwdlastset":"0-00-00 00:00:00.0000","dttimeout":"0-00-00 00:00:00.0000","iagentpackagetype":64,"iagenttype":2,"iconnifno":39,"idatacollectby":0,"idevrecordid":71,"idevscrappedstatus":0,"idevtype":10,"idevtypeusedbytopo":0,"idot1xver":1,"iencryptstatus":0,"iextenstatus":0,"igetstatus":1,"ihastag":0,"iisaddedbyclient":0,"iisattrsetbyclient":0,"iisbelongtounit":1,"iisbind":0,"iisdevtypesetbyclient":0,"iisfoundbytopo":0,"iishidden":0,"iisinad":0,"iisiot":0,"iislongtermnotrunning":0,"imanuldept":0,"imultios":1,"inetdisposalstatus":0,"iregisterresult":0,"iroamingdevice":0,"isafescore":0,"iselfcheckscore":100,"isnmpagentstatus":0,"isolation":0,"istatus":1,"isysservices":0,"iuniaccessagentoldstatus":2,"iuniaccessagentstatus":1,"ivalnformat":0,"recycledstatus":0,"source_ip":"99.96.0.81","stradaccount":"","stragentpackagename":"V10820-20231103(办公网版)","strassetid":"","strbaiduloc":"","strbelong":"","strbelongdeptid":"","strclientappid":"","strconnifname":"GigabitEthernet0/0/34","strconnswitchname":"SYD-OA-S5720-02","strdevalias":"","strdevdesc":"","strdevgip":"99.96.16.187","strdevidentiy":"DC8531FB-22E0-468B-ABD5-AD6B1E53AB9F","strdevip":"99.96.16.187","strdevname":"ASDO-CBMF.sydney.cmbchina.cn","strdevoid":"","strdomain":"sydney.cmbchina.cn","stremailaccount":"","strextend":"","strgpsx":"","strgpsy":"","strip1":"099096016187","strlocation":"","strmac":"7C:57:58:10:5F:14","strnatip":"99.96.16.187","strnatip1":"","strnet":"99.96.16.0/24","strreportlink":"/notifymsg/devreport/7cda64088ed3d9ed33cb37f06953c22d.html","strres1":"","strres2":"","strsafeclass":"","strservices":"","strverofuaagent":"3.5.10820.3","strxloc":"","stryloc":"","table_name":"tbl_devbaseinfo","uidconnswitchid":"68EFEC46-C6F9-435A-8AAA-566E42E78000","uiddeptid":"8b4e251a-0560-44cb-98b5-49bbb5add077","uiddevrecordid":"DC8531FB-22E0-468B-ABD5-AD6B1E53AB9F","uiddomainid":"SL832322282903524504","uiduserid":"50381d22-ecaa-4ed5-b3e2-4120b77673d8"}

根据这个日志,找到对应的代码,可以知道第三方库报错的其实是”Broker: Topic authorization failed “

RdKafka::ErrorCode resp = producer_->produce(tpk, partition,RdKafka::Producer::RK_MSG_COPY,const_cast<char *>(data), size,key.c_str(), key.size(), NULL);if (resp != RdKafka::ERR_NO_ERROR){HString strLog;strLog << L"Kafka produce failed:" << HString(RdKafka::err2str(resp)) << L" .Message:" << HString(data);HASGlobal::pins()->mpFail->log(HASGlobal::pins()->mpFail->get(LEL_TIPS, __WFILE__, __LINE__) << strLog);...}

认证失败的报错,由此想到可能是客户端和服务端关于SASL认证的相关配置是否有差异造成。或者是开启了ACL,但是没有为这个用户开放写topic的权限

要求用户提供kafka服务端SASL的相关配置,和客户端核对无误后,要求客户运维和客户的开发沟通排查是否是ACL的相关权限问题导致,最终客户运维反馈是客户的开发同事没有为这个用户配置对于这个topic的写权限,导致客户端生产数据传给客户的kafka服务器的某个topic时,报错”Broker: Topic authorization failed“,在添加了配置后,生产者的数据可正常写入。

问题2:
客户端程序使用./手动执行后,有如下报错。
需要在/etc/hosts 文件中加上kafka服务器ip和主机名的对应关系。添加配置后可以客户端可以正常生产发送数据到kafka服务器
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/912474.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/912474.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenGL之yaw、pitch、fov 和 lookAt

在 3D 图形学中&#xff0c;yaw、pitch、fov 和 lookAt 都是控制摄像机&#xff08;Camera&#xff09;行为的关键参数&#xff0c;但它们的 作用层级 和 使用场景 不同。 1. yaw、pitch、fov 的作用 (1) yaw&#xff08;偏航角&#xff09; 作用&#xff1a;控制摄像机 左右…

STM32-第一节-新建工程,GPIO,点亮LED,蜂鸣器

一、新建工程&#xff1a; 1.Keil中新建工程&#xff0c;选择开发板型号。 2.工程文件夹建立Start&#xff0c;Library等分类&#xff0c;复制模版工程中的文件到工程文件夹中。 3.在Keil中添加分组&#xff0c;添加文件。 4.工程选项设置&#xff1a; c/c中&#xff1a;Inc…

Rust标量、复合类型与自定义类型、第三方并发结构

以下是 Rust 中标量类型、对象类型&#xff08;含结构体、复合类型、堆分配类型&#xff09;以及常用第三方并发数据结构的完整分类、示例和区别对比&#xff0c;帮助你系统掌握它们的本质异同&#xff1a; &#x1f7e2; 一、标量类型&#xff08;Scalar Types&#xff0c;存储…

基于STM32温湿度检测—串口显示

基于STM32温湿度检测 &#xff08;仿真&#xff0b;程序&#xff09; 功能介绍 具体功能&#xff1a; 1.使用DHT11检测温湿度&#xff1b; 2.单片机处理完控制LCD1602显示温湿度&#xff1b; 3.单片机也通过串口显示检测到的温湿度&#xff1b; 添加图片注释&#xff0c;不…

Windows 10 查询 Nginx 进程教程

1. 打开命令提示符&#xff08;CMD&#xff09; 按 Win R&#xff0c;输入 cmd&#xff0c;回车。或者在开始菜单搜索栏输入“cmd”&#xff0c;选择“命令提示符”。 2. 查看是否有正在运行的 Nginx 进程 输入命令&#xff1a; tasklist | findstr nginx这个命令会列出所有…

使用 Kafka 优化物流系统的实践与思考

使用 Kafka 优化物流系统的实践与思考 在现代物流系统中&#xff0c;订单处理、仓储管理、运输调度等环节复杂且实时性要求高。为了满足异步解耦、高吞吐、高可用、事件驱动和数据可靠性等需求&#xff0c;Kafka 作为分布式消息队列和流处理平台&#xff0c;成为了我们的首选。…

Rust中模式匹配let Some(gas_price) = tx.gas_price用法

你问得非常好&#xff0c;let Some(gas_price) tx.gas_price 是 Rust 中的一种模式匹配写法&#xff0c;它用于从 Option 类型中提取值。 ✅ 背景知识&#xff1a;什么是 Option&#xff1f; 在 Rust 中&#xff0c;如果一个值可能存在也可能不存在&#xff0c;就会用 Option…

什么是LLM大语言模型

什么是LLM大语言模型 LLM的全称是&#xff0c;Large Language Model&#xff0c;简称LLM&#xff0c;翻译为大语言模型&#xff0c;其核心是模拟人类语言的复杂规律&#xff0c;实现语义理解、推理分析、文本生成等任务&#xff0c;主要目的是实现能读懂和说出人类语言的模型。…

杂谈-架构时代演进

关于未来 5-10 年软件系统演化方向 1. 云原生 ➝ 超云原生&#xff08;Post Cloud Native&#xff09; Kubernetes 平台自治化&#xff1a; K8s Operator 日益强大&#xff0c;逐步具备自愈、自动扩缩容、自动调优能力。 云厂商与企业私有云逐步融合为一体…

如何查看服务器的运行日志?

&#x1f7e2; 一、Linux服务器 Linux日志都在**/var/log**目录下&#xff0c;最常用的有&#xff1a; &#x1f4c2; 常用日志文件 文件内容/var/log/messages大部分系统日志&#xff08;CentOS常见&#xff09;/var/log/syslog系统消息日志&#xff08;Ubuntu/Debian常见&a…

在幸狐RV1106开发板上用gcc14.2本地编译安装postgresql 17.5数据库

在幸狐RV1106开发板上用gcc14.2本地编译安装postgresql 17.5数据库 编译环境&#xff1a; RV1106G3 Linux luckfox-rv1106 5.10.160 #3 Fri Jun 27 14:16:20 AWST 2025 armv7l GNU/Linux BusyBox v1.36.1 gcc version 14.2.0 (GCC) GNU ld (GNU Binutils) 2.44 GNU Make 4.4 n…

Go语言中map[string]interface{} 和 map[string]string的区别

在 Go 语言中&#xff0c;map[string]interface{} 和 map[string]string 是两种不同类型的 map&#xff0c;它们的主要区别在于值的类型以及这种差异带来的使用场景和灵活性的不同。 1. 值的类型 map[string]interface{}&#xff1a;这里的 interface{} 表示 Go 中的空接口类型…

AdGuard Home 安装及使用

AdGuard Home 是 AdGuard 开源的一个私人 DNS 服务端,只需在网关部署,即可实现全局域网的广告拦截与隐私反追踪。在 DNS 解析的过程中,匹配规则库内的 URL 进行拦截,同时在客户端中,还可以通过自定义过滤规则实现网页 DOM 的拦截。 基于 Golang 编写的 AdGuard Home,官方…

绕过 GraphQL 暴力破解保护

题目要求&#xff1a; 本实验的用户登录机制由 GraphQL API 提供支持。API 终端节点有一个速率限制器&#xff0c;如果它在短时间内收到来自同一源的太多请求&#xff0c;则会返回错误。 要解决实验问题&#xff0c;请暴力破解登录机制以 .使用身份验证实验室密码列表作为密码…

C/C++ 使用rapidjson库 操作Json格式文件(创建、插入、解析、修改、删除)

目录 一、前言 1.简介 2.兼容性 3.下载 4.安装 5.官方文档 6.自述 二、封装Json 1. 创建一个 Document 对象 2. "key":"value" 3. { } 4. [ ] 5. [ { }, { } ] 6. [ [ ], [ ] ] 7. { [ ], [ ] } 8. { { }, { } } 9. 将Document转换为字符串…

免安装一键修复网络诊断 + 权限修复!打印机共享错误工具适配 Win7/10/11

各位打印小能手们&#xff01;你们有没有遇到过共享打印机出问题&#xff0c;搞得自己焦头烂额的情况&#xff1f;比如系统一更新&#xff0c;打印机就连不上&#xff0c;打印任务失败&#xff0c;真的是让人崩溃啊&#xff01;别慌&#xff0c;今天就给大家全面介绍一款打印机…

电脑分屏快捷键5

按window右箭头&#xff1a; 按window左箭头&#xff1a;

nt!CcFlushCache函数分析之nt!CcFindBcb

nt!CcFindBcb函数分析 第一部分&#xff1a; 1: kd> p nt!CcAcquireByteRangeForWrite0x377: 80a13c49 e866e4ffff call nt!CcFindBcb (80a120b4) 1: kd> t nt!CcFindBcb: 80a120b4 55 push ebp 1: kd> kc # 00 nt!CcFindBcb 01 nt!CcAcqu…

矩阵及矩阵快速幂

一.矩阵与模板 【模板】矩阵求和 时间限制&#xff1a;1秒 内存限制&#xff1a;128M 题目描述 给出两个&#x1d45b;行&#x1d45a;列的矩阵&#xff0c;求两个矩阵的和 输入描述 第一行输入两个以空格分隔的整数&#x1d45b;,&#x1d45a;&#xff0c;表示矩…

rk3588获取探维雷达数据

可以在上期部署完 FASTLIO 的工作空间内&#xff0c;继续部署探维雷达的驱动程序。不要问为什么不用 mid360&#xff0c;因为我手上只有探维雷达。 探维雷达的驱动链接&#xff1a;https://github.com/TanwayLab/tanwaylidar_view/blob/main/README.md 下载驱动并编译 因为沿…