我们的项目使用了开源的librdkafka库,实现向kafka服务器生产发送数据的功能。使用的librdkafka的版本是1.9.0。
作为客户端程序,在开发时和客户协商确认后,支持了SASL_PLAINTEXT认证。以下概念解释引用自通义千问AI
SASL (Simple Authentication and Security Layer) 是一种框架,允许服务添加认证支持。
Kafka 支持多种 SASL 机制,其中之一就是 PLAINTEXT。
尽管名称中有“PLAINTEXT”,它实际上指的是使用的认证机制(即明文传输用户名和密码),而不是数据传输的安全性。
为了安全起见,通常会结合 SSL/TLS 来加密通信。主要用途:
用户身份验证:确认尝试连接到 Kafka 集群的客户端确实是其所声称的身份。
安全性:虽然 PLAINTEXT 机制本身不提供加密,但它可以与 SSL/TLS 结合使用以确保数据传输的安全性。
kafka服务端(github上下载kafka源码后安装)如何配置:
在 server.properties 文件中启用 SASL 和指定机制:
listeners=SASL_PLAINTEXT://your.host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
依赖librdkafka实现的C++客户端的伪代码设置如下
bool KafkaProducer::init(std::string &brokers, const string &username , const string &passwd, const bool &async, const int &size)
{...if (!username.empty() && !passwd.empty()){HLog(HGET_INFO << L"sasl authentication set, username:" << username << L" ,password:" << passwd);//security.protocol: 安全协议类型,示例为SASL_PLAINTEXTconf_->set("security.protocol", "sasl_plaintext", errstr);//sasl.mechanisms : sasl协议机制,示例为PLAIN, 表示普通文本conf_->set("sasl.mechanisms", "PLAIN", errstr);//sasl.username : 认证用户名conf_->set("sasl.username", username, errstr);//sasl.password : 认证密码 conf_->set("sasl.password", passwd, errstr);}}else{conf_->set("producer.type", "sync", errstr);}...
}
以下概念解释也是来自于通义千问AI
ACLs(Access Control Lists,访问控制列表) 是 Kafka 提供的一种方法,用于控制哪些用户或客户端可以对特定资源执行操作。ACL 定义了谁(principal)、可以在哪个资源上(resource)、执行什么操作(operation)。这里的资源可以是主题、消费者组等。ACL 组成部分
Principal:表示用户身份,通常格式为 User:<username>。
Resource Type:要控制访问权限的资源类型,如 Topic, Group, Cluster, TransactionalId。
Operation:允许的操作类型,包括但不限于 Read, Write, Create, Delete, Describe, Alter, All。
Pattern Type:资源匹配模式,支持 Literal(精确匹配),Prefixed(前缀匹配)等。
Host:指定允许从哪些主机发起请求,默认为 * 表示不限制
要启用 ACL 支持,你需要在 Kafka broker 的配置文件 server.properties 中设置以下参数:authorizer.class.name默认情况下,Kafka 使用简单的基于 Zookeeper 的 ACL 管理方式。为了启用 ACL 支持,你需要指定一个授权器类。最常用的授权器是 kafka.security.auth.SimpleAclAuthorizer。
authorizer.class.name=kafka.security.authorizer.AclAuthorizer下面是开启了ACL后的相关设置示例,允许某个用户读写某个topic
(1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权
./kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2181(2)配置ACL来让writer用户有权限写入test这个topic
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test(3)为reader用户设置test topic的读权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test(4)设置访问group的权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
问题1:
某天客户反馈kafka服务器收不到数据了,我们拿回了日志。发现了日志中报错
[1][2025-06-24][11:57:03][T2709509888][KafkaProducer.cpp ][0111][I]Kafka produce failed:Broker: Topic authorization failed .Message:{"db_name":"UniMonDB","dtagentpackagecusttime":"2023-11-03 13:29:23.0000","dtdevdowntime":"2025-06-17 15:11:49.0000","dtdevfirstfoundtime":"2024-02-20 15:47:47.0000","dtdevlastofftime":0,"dtdevstarttime":"2025-06-12 18:06:00.0000","dtdevuptime":"2025-06-24 12:41:28.0000","dtfoundlongtermnotrunningtime":"0-00-00 00:00:00.0000","dtlasttimeofuaagentdown":"2025-06-23 16:14:26.0000","dtpwdlastset":"0-00-00 00:00:00.0000","dttimeout":"0-00-00 00:00:00.0000","iagentpackagetype":64,"iagenttype":2,"iconnifno":39,"idatacollectby":0,"idevrecordid":71,"idevscrappedstatus":0,"idevtype":10,"idevtypeusedbytopo":0,"idot1xver":1,"iencryptstatus":0,"iextenstatus":0,"igetstatus":1,"ihastag":0,"iisaddedbyclient":0,"iisattrsetbyclient":0,"iisbelongtounit":1,"iisbind":0,"iisdevtypesetbyclient":0,"iisfoundbytopo":0,"iishidden":0,"iisinad":0,"iisiot":0,"iislongtermnotrunning":0,"imanuldept":0,"imultios":1,"inetdisposalstatus":0,"iregisterresult":0,"iroamingdevice":0,"isafescore":0,"iselfcheckscore":100,"isnmpagentstatus":0,"isolation":0,"istatus":1,"isysservices":0,"iuniaccessagentoldstatus":2,"iuniaccessagentstatus":1,"ivalnformat":0,"recycledstatus":0,"source_ip":"99.96.0.81","stradaccount":"","stragentpackagename":"V10820-20231103(办公网版)","strassetid":"","strbaiduloc":"","strbelong":"","strbelongdeptid":"","strclientappid":"","strconnifname":"GigabitEthernet0/0/34","strconnswitchname":"SYD-OA-S5720-02","strdevalias":"","strdevdesc":"","strdevgip":"99.96.16.187","strdevidentiy":"DC8531FB-22E0-468B-ABD5-AD6B1E53AB9F","strdevip":"99.96.16.187","strdevname":"ASDO-CBMF.sydney.cmbchina.cn","strdevoid":"","strdomain":"sydney.cmbchina.cn","stremailaccount":"","strextend":"","strgpsx":"","strgpsy":"","strip1":"099096016187","strlocation":"","strmac":"7C:57:58:10:5F:14","strnatip":"99.96.16.187","strnatip1":"","strnet":"99.96.16.0/24","strreportlink":"/notifymsg/devreport/7cda64088ed3d9ed33cb37f06953c22d.html","strres1":"","strres2":"","strsafeclass":"","strservices":"","strverofuaagent":"3.5.10820.3","strxloc":"","stryloc":"","table_name":"tbl_devbaseinfo","uidconnswitchid":"68EFEC46-C6F9-435A-8AAA-566E42E78000","uiddeptid":"8b4e251a-0560-44cb-98b5-49bbb5add077","uiddevrecordid":"DC8531FB-22E0-468B-ABD5-AD6B1E53AB9F","uiddomainid":"SL832322282903524504","uiduserid":"50381d22-ecaa-4ed5-b3e2-4120b77673d8"}
根据这个日志,找到对应的代码,可以知道第三方库报错的其实是”Broker: Topic authorization failed “
RdKafka::ErrorCode resp = producer_->produce(tpk, partition,RdKafka::Producer::RK_MSG_COPY,const_cast<char *>(data), size,key.c_str(), key.size(), NULL);if (resp != RdKafka::ERR_NO_ERROR){HString strLog;strLog << L"Kafka produce failed:" << HString(RdKafka::err2str(resp)) << L" .Message:" << HString(data);HASGlobal::pins()->mpFail->log(HASGlobal::pins()->mpFail->get(LEL_TIPS, __WFILE__, __LINE__) << strLog);...}
认证失败的报错,由此想到可能是客户端和服务端关于SASL认证的相关配置是否有差异造成。或者是开启了ACL,但是没有为这个用户开放写topic的权限
要求用户提供kafka服务端SASL的相关配置,和客户端核对无误后,要求客户运维和客户的开发沟通排查是否是ACL的相关权限问题导致,最终客户运维反馈是客户的开发同事没有为这个用户配置对于这个topic的写权限,导致客户端生产数据传给客户的kafka服务器的某个topic时,报错”Broker: Topic authorization failed“,在添加了配置后,生产者的数据可正常写入。
问题2:
客户端程序使用./手动执行后,有如下报错。
需要在/etc/hosts 文件中加上kafka服务器ip和主机名的对应关系。添加配置后可以客户端可以正常生产发送数据到kafka服务器