centos7.9使用docker-compose安装kafka

docker-compose配置文件

services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.0.1hostname: kafkacontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTKAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNALKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1kafka-manager:image: hlebalbau/kafka-manager:stablecontainer_name: kafka-managerdepends_on:- zookeeperports:- "9002:9000"environment:ZK_HOSTS: "zookeeper:2181"kAFKA_BROKERS: 192.168.1.85:9092KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

spring.application.name=kafka_demo
# application.properties
spring.kafka.bootstrap-servers=192.168.1.85:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaTestController {private final KafkaProducerService kafkaProducerService;public KafkaTestController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessageToKafka(@RequestParam(value = "topic", defaultValue = "test-topic") String topic,@RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {kafkaProducerService.sendMessage(topic, message);return "消息已发送: " + message + " 到主题: " + topic;}
}

生产者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题* @param topic 主题名称* @param message 消息内容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {if (ex == null) {System.out.println("消息发送成功: " + message +", 分区: " + result.getRecordMetadata().partition() +", 偏移量: " + result.getRecordMetadata().offset());} else {System.err.println("消息发送失败: " + ex.getMessage());}});}
}

消费者监听

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {// 监听指定的主题,groupId用于区分不同的消费者组@KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")public void consumeMessage(ConsumerRecord<String, String> record) {System.out.printf("收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());// 这里可以添加你的业务逻辑处理}
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.yykj</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka_demo</name><description>kafka_demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot Starter for Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><annotationProcessorPaths><path><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></path></annotationProcessorPaths></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/84544.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32:Modbus通信协议核心解析:关键通信技术

知识点1【 Modbus通信】 1、Modbus的概述 Modbus是OSI模型第七层的应用层报文传输协议 协议&#xff1a;说明有组包和解包的过程 2、通信机制 Modelbus是一个请求/应答协议 通信机制&#xff1a;主机轮询&#xff0c;从机应答的机制。每个从设备有唯一的地址&#xff0c;主…

LeetCode 3362.零数组变换 III:贪心+优先队列+差分数组——清晰题解

【LetMeFly】3362.零数组变换 III&#xff1a;贪心优先队列差分数组——清晰题解 力扣题目链接&#xff1a;https://leetcode.cn/problems/zero-array-transformation-iii/ 给你一个长度为 n 的整数数组 nums 和一个二维数组 queries &#xff0c;其中 queries[i] [li, ri] …

ORM++ 封装实战指南:安全高效的 C++ MySQL 数据库操作

ORM 封装实战指南&#xff1a;安全高效的 C MySQL 数据库操作 一、环境准备 1.1 依赖安装 # Ubuntu/Debian sudo apt-get install libmysqlclient-dev # CentOS sudo yum install mysql-devel# 编译时链接库 (-I 指定头文件路径 -L 指定库路径) g main.cpp -stdc17 -I/usr/i…

JESD204B 协议介绍

一、协议概述 JESD204B是由JEDEC&#xff08;固态技术协会&#xff09;制定的高速串行接口标准&#xff0c;专为模数转换器&#xff08;ADC&#xff09;、数模转换器&#xff08;DAC&#xff09;与逻辑器件&#xff08;如FPGA、ASIC&#xff09;之间的数据传输设计。其核心目标…

yolov8,c++案例汇总

文章目录 引言多目标追踪案例人体姿态估计算法手势姿态估计算法目标分割算法 引言 以下案例,基于c,ncnn,yolov8既可以在windows10/11上部署, 也可以在安卓端部署, 也可以在嵌入式端部署, 服务器端可支持部署封装为DLL,支持c/c#/java端调用 多目标追踪案例 基于yolov8, ncnn,…

运动规划实战案例 | 图解基于状态晶格(State Lattice)的路径规划(附ROS C++/Python仿真)

目录 1 控制采样 vs 状态采样2 State Lattice路径规划2.1 算法流程2.2 Lattice运动基元生成2.3 几何代价函数2.4 运动学约束启发式 3 算法仿真3.1 ROS C仿真3.2 Python仿真 1 控制采样 vs 状态采样 控制采样的技术路线源自经典的运动学建模思想。这种方法将机器人的控制指令空…

BERT框架:自然语言处理的革命性突破

引言 在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;2018年Google推出的BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;框架无疑是一场革命。作为基于Transformer架构的双向编码器表示模型&#xff0c;BERT通过预训练学习…

【Fifty Project - D31】

结束了一个超级消耗周末&#xff0c;满安排之健身梅溪湖游泳做饭喝酒羽毛球赛 完全力竭了&#xff0c;久久不能恢复过来&#xff0c;暂停健身安排了 端午后再继续 今日完成记录 TimePlan完成情况7&#xff1a;30 - 8&#xff1a;10有氧爬坡√9&#xff1a;00 - 11&#xff1a;…

信息学奥赛一本通 1547:【 例 1】区间和

【题目链接】 ybt 1547&#xff1a;【 例 1】区间和 【题目考点】 1. 线段树 2. 树状数组 【解题思路】 本题要求维护区间和&#xff0c;实现单点修改、区间查询。 解法1&#xff1a;线段树 线段树原理&#xff0c;及实现方法见&#xff1a;洛谷 P3374 【模板】树状数组…

力扣面试150题--求根节点到叶节点数字之和

Day 48 题目描述 思路 我们利用sum这个全局变量来保存总和值&#xff0c;递归函数sum来计算每个根到叶子节点路径所代表的数&#xff0c;由于我们需要遍历到每条根到叶子节点的路径&#xff0c;所有我采取了前序遍历&#xff0c;如果不是叶子节点&#xff0c;就计算到该节点代…

DJI上云API官方demo学习

1、websocket&#xff0c;所在位置如下图&#xff0c;调用的可以用//websocket搜索 2、用到的http客户端&#xff0c;axios 3、很多和后端交互都是走的http请求

uniapp开发小程序,如何根据权限动态配置按钮或页面内容

前言 写了好几个项目&#xff0c;发现小程序对权限控制非常麻烦&#xff0c;于是有了这个想法&#xff0c;但是网上找了一圈没有一个比较完善的讲解&#xff0c;因为小程序不支持自定义指令&#xff0c;所以不能像后台那样方便&#xff0c;于是就将几个博主的想法结合。 思路就…

LSTM+Transformer混合模型架构文档

LSTMTransformer混合模型架构文档 模型概述 本项目实现了一个LSTMTransformer混合模型&#xff0c;用于超临界机组协调控制系统的数据驱动建模。该模型结合了LSTM的时序建模能力和Transformer的自注意力机制&#xff0c;能够有效捕捉时间序列数据中的长期依赖关系和变量间的复…

测量尺子:多功能测量工具,科技改变生活

测量尺子是一款专业的测距仪测量万能工具箱类型手机APP&#xff0c;旨在为用户提供最贴心的测量助手。它拥有和现实测量仪器一样的测量标准&#xff0c;更简单便捷且精准的测量方式&#xff0c;最新AR科技测量更是大大拓宽了可以被测量的高度和深度。无论是日常使用、学习还是工…

结课作业01. 用户空间 MPU6050 体感鼠标驱动程序

目录 一. qt界面实现 二. 虚拟设备模拟模拟鼠标实现体感鼠标 2.1 函数声明 2.2 虚拟鼠标实现 2.2.1 虚拟鼠标创建函数 2.2.2 鼠标移动函数 2.2.3 鼠标点击函数 2.3 mpu6050相关函数实现 2.3.1 i2c设备初始化 2.3.2 mpu6050寄存器写入 2.3.3 mpu6050寄存器读取 2.3.…

深入浅出 Python Testcontainers:用容器优雅地编写集成测试

在现代软件开发中&#xff0c;自动化测试已成为敏捷开发与持续集成中的关键环节。单元测试可以快速验证函数或类的行为是否符合预期&#xff0c;而集成测试则确保多个模块协同工作时依然正确。问题是&#xff1a;如何让集成测试可靠、可重复且易于维护&#xff1f; 这时&#…

JVM 的垃圾回收器

新生代回收器 通性 会触发StW&#xff0c;暂停所有应用线程复制算法 Serial 单线程回收适合单线程系统 ParNew 多线程回收优先保证响应速度&#xff0c;降低 STW&#xff08;STW 越大&#xff0c;执行垃圾回收的时间越长&#xff0c;回收的垃圾越多&#xff0c;减少垃圾回…

【笔记】排查并解决Error in LLM call after 3 attempts: (status code: 502)

#工作记录 一、问题描述 在部署运行部署对冲基金分析工具 ai-hedge-fund 时&#xff0c;不断出现以下报错&#xff0c;导致项目运行异常&#xff1a; Error in LLM call after 3 attempts: (status code: 502) Error in LLM call after 3 attempts: [WinError 10054] 远程主…

GO 语言进阶之 Template 模板使用

更多个人笔记见&#xff1a; github个人笔记仓库 gitee 个人笔记仓库 个人学习&#xff0c;学习过程中还会不断补充&#xff5e; &#xff08;后续会更新在github上&#xff09; 文章目录 Template 模板基本示例语法1. 基本输出语法2. 控制结构3. 空白字符控制4. Must函数 Temp…

origin绘图之【如何将多条重叠、高度重叠的点线图、折线图分开】

在日常的数据可视化工作中&#xff0c;Origin 作为一款功能强大的科研绘图软件&#xff0c;广泛应用于实验数据处理、结果展示与论文图表制作等领域。然而&#xff0c;在处理多组数据、特别是绘制多条曲线的折线图或点线图时&#xff0c;常常会遇到这样一个困扰&#xff1a;多条…