单机Kafka配置ssl并在springboot使用

目录

  • SSL证书
    • 生成根证书
    • 生成服务端和客户端证书
      • 生成keystore.jks和truststore.jks
      • 辅助脚本
      • 单独生成truststore.jks
  • 环境配置
    • hosts文件
    • kafka server.properties配置ssl
  • 启动kafka
  • kafka基础操作
  • springboot集成
    • 准备工作
    • 需要配置的文件
    • 开始消费

SSL证书

证书主要包含两大类,一个是根证书,用于签发和认证证书。其他证书可以用同一个根证书签发,也可以用不同的根证书签发各自的证书,使用同一个的话比较方便管理,这样所有节点的trust可以公用,即只需要生成一次,其他节点复制就可以。
整个证书生成过程大概如下图:
在这里插入图片描述
最终用于认证的是keystore.jks和truststore.jks,两个证书的作用分别是:
keystore.jks:证明自己的身份,自己的keystore.jks是由别人truststore.jks包含的ca-cert.pem签发就可以证明
truststore.jks:认证别人是否可信,看到别人的keystore.jks里有自己truststore.jks包含的ca-cert.pem就认为可信
这里要注意的是,如果要信任别人,就要在truststore中导入别人的根证书,这里是因为用的同一个根证书签发,所以导入的根证书一样,否则应该交叉导入,也就是客户端导入服务端的,服务端导入客户端的。

备注:后面的脚本直接复制使用有可能会出现下面的错误,这是由于不同系统的换行符不一致导致的,需要转换成对应系统兼容的

$'\r': command not found
: invalid optionet: -
set: usage: set [-abefhkmnptuvxBCHP] [-o option-name] [--] [arg ...]

以在Linux中使用为例,可以使用Notepad++按照以下操作路径修改一下保存之后覆盖掉就可以了。

在这里插入图片描述

生成根证书

生成根证书包含流程图的第一步,这时会生成根证书和他的私钥,命令脚本如下:

#!/bin/bashset -e# === 配置部分 需要根据自己的实际情况进行调整===
#根证书
CA_CERT="ca-cert.pem"
#根证书私钥
CA_KEY="ca-key.pem"
#有效期
VALIDITY=365
#subj
SUBJ="/CN=KafkaCA"
# 检查目录
if [ ! -d "/usr/ca/ssl" ]; thenmkdir -p /usr/ca/sslchmod 700 /usr/ca/ssl
ficd /usr/ca/ssl
# 检查文件是否已存在
if [ -f "$CA_CERT" ] || [ -f "$CA_KEY" ]; thenecho "错误: CA证书或私钥已存在,请先删除或备份现有文件"exit 1
fi#正式生成证书,以下内容可以不用调整
echo "=== 步骤 1: 生成自签名 CA ==="
openssl req -new -x509 \-keyout $CA_KEY \-out $CA_CERT \-days $VALIDITY \-nodes \-subj $SUBJ# 设置文件权限
chmod 600 "$CA_KEY"

有效命令其实就是最后一句,其他的是因为放在脚本中所以进行一些通用配置,方便复用

生成服务端和客户端证书

这个阶段包含流程图的2-7,在都用同一个CA的情况下,步骤7只需要执行一次,然后复制到所有需要用到的计算机中就可以。

生成keystore.jks和truststore.jks

#!/bin/bashset -e# === 配置部分 需要根据自己的实际情况进行调整===
ALIAS="kafka"
KEYSTORE="keystore.jks"
TRUSTSTORE="truststore.jks"
CSR="sign.csr"
SIGN="signed.crt"
STOREPASS="123456"
KEYPASS="654321"
DNAME="CN=localhost, OU=IT, O=Kafka, L=City, S=State, C=CN"
VALIDITY=365
#上一步生成的根证书
CA_CERT="/usr/ca/ssl/ca-cert.pem"
CA_KEY="/usr/ca/ssl/ca-key.pem"
# 检查目录
if [ ! -d "/usr/ca/ssl" ]; thenecho "错误: 目录 /usr/ca/ssl 不存在"exit 1
ficd /usr/ca/ssl#正式生成各个证书,以下内容可以不用调整
echo "=== 步骤 1: 生成Keystore证书和私钥 ==="
keytool -genkeypair \-alias $ALIAS \-keyalg RSA \-keysize 2048 \-validity $VALIDITY \-keystore $KEYSTORE \-storepass $STOREPASS \-keypass $KEYPASS \-dname "$DNAME"echo "=== 步骤 2: 生成证书签名请求 (CSR) ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-certreq \-file $CSR \-storepass $STOREPASSecho "=== 步骤 3: 使用 CA 签名证书 ==="
openssl x509 -req \-CA $CA_CERT \-CAkey $CA_KEY \-in $CSR \-out $SIGN \-days $VALIDITY \-CAcreateserialecho "=== 步骤 4: 将 CA 根证书导入Keystore ==="
keytool -keystore $KEYSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -nopromptecho "=== 步骤 5: 将签名证书导入 Keystore ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-import -file $SIGN \-storepass $STOREPASS -noprompt#使用同一个CA证书,在多个计算机使用时,下面这步可以只执行一次,每次新生成也不影响
echo "=== 步骤 6: 创建Truststore(导入 CA 根证书) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

辅助脚本

如果正在生成服务端证书,需要把相关证书配置到server.properties可以在上面脚本中增加一下内容:

#顺便生成后续Kafka要配置的内容,直接复制到server.properties文件
cat <<EOF############ SSL 配置 - server.properties 中添加 ############listeners=SSL://:9092
#下面的localhost需要改成ip,否则只有自己能连上
advertised.listeners=SSL://localhost:9092
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
#这里配置成双向认证
ssl.client.auth=required
# 不验证客户端证书
#ssl.client.auth=none  #############################################################EOF

如果是为客户端生成证书,可以增加一下内容:

#如果是客户端就增加使用以下脚本生成的文件去执行Kafka相关命令
echo "=== 创建 Kafka 客户端配置文件 client.properties ==="
cat <<EOF > client.properties
security.protocol=SSL
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
ssl.endpoint.identification.algorithm=
group.id=test-group
#如果单向认证就不用添加下面三个配置
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
EOF

单独生成truststore.jks

#!/bin/bashset -e
# === 配置部分 需要根据自己的实际情况进行调整===
TRUSTSTORE="truststore.jks"
STOREPASS="123456"
#信任的根证书
CA_CERT="/usr/ca/ssl/ca-cert.pem"
echo "=== 步骤 6: 创建Truststore(导入 CA 根证书) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

环境配置

hosts文件

文件位置:
Windows hosts:C:\Windows\System32\drivers\etc\hosts
Linux hosts:/etc/hosts
添加内容:公网IP kafka
如果是本机使用也可以直接用内网IP

kafka server.properties配置ssl

把生成jks那步输出的内容增加到server.properties中就可以,如果不是第一次配置,就只增加自己需要配置的内容即可。或者没有增加辅助脚本的话,直接把下面内容中keystore和truststore的位置手动替换一下就行:

#下面两项原来如果已经配置过就不要重复
listeners=SSL://:9092
#下面的localhost需要改成ip,否则只有自己能连上
advertised.listeners=SSL://localhost:9092security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=这里替换成keystore的路径
ssl.keystore.password=keystore的密码
ssl.key.password=key的密码
ssl.truststore.location=这里替换成truststore的路径
ssl.truststore.password=truststore的密码#这里配置成双向认证
ssl.client.auth=required
# 不验证客户端证书
#ssl.client.auth=none  

启动kafka

cd到kafka安装路径下可以直接执行下面命令,或者使用绝对路径
/bin/zookeeper-server-start.sh -daemon /config/zookeeper.properties
/bin/kafka-server-start.sh -daemon /config/server.properties

kafka基础操作

client.properties的路径要换成自己的
创建topic

 bin/kafka-topics.sh   --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test-topic  --command-config /usr/local/kafka/ssl/client.properties

查看topic

 bin/kafka-topics.sh --list --bootstrap-server kafka:9092  --command-config /usr/ca/ssl-server/ssl-client/client-ssl.properties

生成消息

 bin/kafka-console-producer.sh  --bootstrap-server  kafka:9092 --topic test-topic --producer.config /usr/local/kafka/ssl/client.properties

springboot集成

准备工作

1、生成客户端keystore.jks和truststore.jks
这时候spring程序作为客户端,所以需要为他生成一个keystore.jks和truststore.jks,然后放到项目或别的位置,配置到项目中用来认证。如果只是暂时测试一下是否能连通,也可以讨巧,直接用服务端的同一套keystore.jks和truststore.jks,但是这样的操作不能用到正式中。

2、修改项目所在计算机的hosts文件

需要配置的文件

pom:引入kafka依赖,有说需要版本对应的,我直接没有指定版本也是可以的

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml:增加Kafka配置项,当然也可以放到代码里

spring:kafka:#kafka代理地址bootstrap-servers: kafka:9092ssl:protocol: SSL###服务端证书配置的时候设置的密码#broke对client的认证,ssl.client.auth=required时需要key的配置key-store-location: classpath:/certs/keystore.jkskey-store-password: 123456key-password: 654321#client对broke的认证trust-store-password: 123456trust-store-location: classpath:/certs/truststore.jkskey-store-type: JKS#不验证主机名  properties:ssl:endpoint:identification:algorithm: ''security:protocol: SSL#认证的配置就到这里了,下面的配置可以根据自己的习惯配置#消息发送失败重试次数producer:retries: 0# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: consumer-dev-groupauto-offset-reset: earliestenable-auto-commit: falsemax-poll-records: 30# 指定消息key和消息体的编解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manualtype: batchconcurrency: 1#kafka监听的topic和group
report:kafka:#接收kafka消息的topic和groupproducerTopic: test-topicreportGroup: test-group

开始消费

如果需要生成可以找别的教程,我直接通过命令进行生产的,只是写了一个消费(后面有时间可以补上)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class Consumer {@KafkaListener(topics = "${report.kafka.producerTopic}", groupId = "${report.kafka.reportGroup}")public void reportConsumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {log.info("---------- 从Kafka上接收消息 -----");for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("offset是" + consumerRecord.offset() + "," + consumerRecord.partition());String value = consumerRecord.value();System.out.println("接到的内容:"+value);//具体的业务处理逻辑可以写在后面}// 手动批量ackack.acknowledge();log.info("kafka提交成功");}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/82505.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PCB设计教程【入门篇】——电路分析基础-元件数据手册

前言 本教程基于B站Expert电子实验室的PCB设计教学的整理&#xff0c;为个人学习记录&#xff0c;旨在帮助PCB设计新手入门。所有内容仅作学习交流使用&#xff0c;无任何商业目的。若涉及侵权&#xff0c;请随时联系&#xff0c;将会立即处理 目录 前言 一、数据手册的重要…

Vue2实现Office文档(docx、xlsx、pdf)在线预览

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

【辰辉创聚生物】JAK-STAT信号通路相关蛋白:细胞信号传导的核心枢纽

在细胞间复杂的信号传递网络中&#xff0c;Janus 激酶 - 信号转导和转录激活因子&#xff08;JAK-STAT&#xff09;信号通路犹如一条高速信息公路&#xff0c;承担着传递细胞外信号、调控基因表达的重要使命。JAK-STAT 信号通路相关蛋白作为这条信息公路上的 “关键节点” 和 “…

OceanBase数据库从入门到精通(运维监控篇)

文章目录 一、OceanBase 运维监控体系概述二、OceanBase 系统表与元数据查询2.1 元数据查询基础2.2 核心系统表详解2.3 分区元数据查询实战三、OceanBase 性能监控SQL详解3.1 关键性能指标监控3.2 SQL性能分析实战四、OceanBase 空间使用监控4.1 表空间监控体系4.2 空间使用趋势…

linux 进程间通信_共享内存

目录 一、什么是共享内存&#xff1f; 二、共享内存的特点 优点 缺点 三、使用共享内存的基本函数 1、创建共享内存shmget() 2、挂接共享内存shmat 3、脱离挂接shmdt 4、共享内存控制shmctl 5.查看和删除共享内存 comm.hpp server.cc Client.cc Makefile 一、什么…

Spring Boot 登录实现:JWT 与 Session 全面对比与实战讲解

Spring Boot 登录实现&#xff1a;JWT 与 Session 全面对比与实战讲解 2025.5.21-23:11今天在学习黑马点评时突然发现用的是与苍穹外卖jwt不一样的登录方式-Session&#xff0c;于是就想记录一下这两种方式有什么不同 在实际开发中&#xff0c;登录认证是后端最基础也是最重要…

Vue中的 VueComponent

VueComponent 组件的本质 Vue 组件是一个可复用的 Vue 实例。每个组件本质上就是通过 Vue.extend() 创建的构造函数&#xff0c;或者在 Vue 3 中是由函数式 API&#xff08;Composition API&#xff09;创建的。 // Vue 2 const MyComponent Vue.extend({template: <div…

使用 FFmpeg 将视频转换为高质量 GIF(保留原始尺寸和帧率)

在制作教程动图、产品展示、前端 UI 演示等场景中,我们经常需要将视频转换为体积合适且清晰的 GIF 动图。本文将详细介绍如何使用 FFmpeg 工具将视频转为高质量 GIF,包括: ✅ 保留原视频尺寸或自定义缩放✅ 保留原始帧率或自定义帧率✅ 使用调色板优化色彩质量✅ 降低体积同…

【自然语言处理与大模型】大模型Agent四大的组件

大模型Agent是基于大型语言模型构建的智能体&#xff0c;它们能够模拟独立思考过程&#xff0c;灵活调用各类工具&#xff0c;逐步达成预设目标。这类智能体的设计旨在通过感知、思考与行动三者的紧密结合来完成复杂任务。下面将从大模型大脑&#xff08;LLM&#xff09;、规划…

《软件工程》第 11 章 - 结构化软件开发

结构化软件开发是一种传统且经典的软件开发方法&#xff0c;它强调将软件系统分解为多个独立的模块&#xff0c;通过数据流和控制流来描述系统的行为。本章将结合 Java 代码示例、可视化图表&#xff0c;深入讲解面向数据流的分析与设计方法以及实时系统设计的相关内容。 11.1 …

初步尝试AI应用开发平台——Dify的本地部署和应用开发

随着大语言模型LLM和相关应用的流行&#xff0c;在本地部署并构建知识库&#xff0c;结合企业的行业经验或个人的知识积累进行定制化开发&#xff0c;是LLM的一个重点发展方向&#xff0c;在此方向上也涌现出了众多软件框架和工具集&#xff0c;Dify就是其中广受关注的一款&…

高阶数据结构——哈希表的实现

目录 1.概念引入 2.哈希的概念&#xff1a; 2.1 什么叫映射&#xff1f; 2.2 直接定址法 2.3 哈希冲突&#xff08;哈希碰撞&#xff09; 2.4 负载因子 2.5 哈希函数 2.5.1 除法散列法&#xff08;除留余数法&#xff09; 2.5.2 乘法散列法&#xff08;了解&#xff09…

7.安卓逆向2-frida hook技术-介绍

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a;图灵Python学院 工具下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1bb8NhJc9eTuLzQr39lF55Q?pwdzy89 提取码&#xff1…

DB-GPT扩展自定义Agent配置说明

简介 文章主要介绍了如何扩展一个自定义Agent&#xff0c;这里是用官方提供的总结摘要的Agent做了个示例&#xff0c;先给大家看下显示效果 代码目录 博主将代码放在core目录了&#xff0c;后续经过对源码的解读感觉放在dbgpt_serve.agent.agents.expand目录下可能更合适&…

Android 架构演进之路:从 MVC 到 MVI,拥抱单向数据流的革命

在移动应用开发的世界里&#xff0c;架构模式的演进从未停歇。从早期的 MVC 到后来的 MVP、MVVM&#xff0c;每一次变革都在尝试解决前一代架构的痛点。而今天&#xff0c;我们将探讨一种全新的架构模式 ——MVI&#xff08;Model-View-Intent&#xff09;&#xff0c;它借鉴了…

【YOLOv8-pose部署至RK3588】模型训练→转换RKNN→开发板部署

已在GitHub开源与本博客同步的YOLOv8_RK3588_object_pose 项目&#xff0c;地址&#xff1a;https://github.com/A7bert777/YOLOv8_RK3588_object_pose 详细使用教程&#xff0c;可参考README.md或参考本博客第六章 模型部署 文章目录 一、项目回顾二、文件梳理三、YOLOv8-pose…

集成30+办公功能的实用工具

软件介绍 本文介绍的软件是千峰办公助手。 软件功能概述与开发目的 千峰办公助手集成了自动任务、系统工具、文件工具、PDF工具、OCR图文识别、文字处理、电子表格七个模块&#xff0c;拥有30余项实用功能。作者开发该软件的目的是解决常见办公痛点&#xff0c;把机械操作交…

IDEA启动报错:Cannot invoke “org.flowable.common.engine.impl.persistence.ent

1.问题 项目启动报错信息 java.lang.NullPointerException: Cannot invoke "org.flowable.common.engine.impl.persistence.ent 2.问题解析 出现这个问题是在项目中集成了Flowable或Activiti工作流&#xff0c;开启自动创建工作流创建的表&#xff0c;因为不同环境的数据…

网络安全--PHP第三天

今天学习文件上传的相关知识 上传的前端页面如下 upload.html <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"&g…

【愚公系列】《生产线数字化设计与仿真》004-颜色分类站仿真(基础概念)

🌟【技术大咖愚公搬代码:全栈专家的成长之路,你关注的宝藏博主在这里!】🌟 📣开发者圈持续输出高质量干货的"愚公精神"践行者——全网百万开发者都在追更的顶级技术博主! 👉 江湖人称"愚公搬代码",用七年如一日的精神深耕技术领域,以"…