云原生时代 Kafka 深度实践:02快速上手与环境搭建

2.1 本地开发环境搭建

单机模式安装

  1. 下载与解压:前往Apache Kafka 官网,下载最新稳定版本的 Kafka 二进制包(如kafka_2.13-3.6.0.tgz,其中2.13为 Scala 版本)。解压到本地目录,例如/opt/kafka
tar -xzf kafka\_2.13-3.6.0.tgz
mv kafka\_2.13-3.6.0 /opt/kafka
  1. 配置文件调整:Kafka 的核心配置文件位于/opt/kafka/config目录下。
  • server.properties:修改关键参数,如listeners=PLAINTEXT://``localhost:9092指定 Broker 监听地址和端口;log.dirs=/var/lib/kafka-logs设置消息存储目录;zookeeper.connect=``localhost:2181(若使用 Zookeeper)配置元数据管理地址。
  • zookeeper.properties(若未单独安装 Zookeeper):可保持默认配置,默认数据存储目录为/tmp/zookeeper,端口为2181
  1. 启动服务:依次启动 Zookeeper 和 Kafka Broker:
# 启动Zookeeper(若未单独安装)
/opt/kafka/bin/zookeeper-server-start.sh 
/opt/kafka/config/zookeeper.properties# 启动Kafka Broker
/opt/kafka/bin/kafka-server-start.sh 
/opt/kafka/config/server.properties

启动后,Kafka 将在localhost:9092监听 Producer 和 Consumer 的请求。

Docker 容器化部署

使用 Docker Compose 可快速搭建多节点 Kafka 集群,并简化环境管理:

  1. 创建docker-compose.yml文件
version: '3'  # 指定Docker Compose文件版本为3services:zookeeper:image: confluentinc/cp-zookeeper:7.3.0  # 使用Confluent提供的Zookeeper镜像,版本7.3.0environment:ZOOKEEPER_CLIENT_PORT: 2181  # 设置Zookeeper客户端连接端口为2181ZOOKEEPER_TICK_TIME: 2000  # 设置Zookeeper的心跳时间(单位:毫秒)ports:- "2181:2181"  # 将容器内的2181端口映射到主机的2181端口kafka:image: confluentinc/cp-kafka:7.3.0  # 使用Confluent提供的Kafka镜像,版本7.3.0depends_on:- zookeeper  # 指定Kafka服务依赖于Zookeeper服务environment:KAFKA_BROKER_ID: 1  # 设置Kafka broker的唯一IDKAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'  # 指定Kafka连接的Zookeeper地址KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT  # 定义监听器安全协议映射KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_INTERNAL://localhost:9093  # 定义对外广播的监听器地址KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093  # 定义Kafka监听的地址和端口KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL  # 指定broker间通信使用的监听器名称ports:- "9092:9092"  # 将容器内的9092端口映射到主机的9092端口
  1. 启动集群:在包含docker-compose.yml的目录下执行:
docker-compose up -d

此配置启动一个单节点 Zookeeper 和一个 Kafka Broker,通过映射本地端口9092实现外部访问。如需扩展集群,可增加kafka服务实例并调整配置。

2.2 基础操作入门

命令行工具实战

  1. 创建 Topic:使用kafka-topics.sh命令创建一个名为test_topic,包含 3 个分区、2 个副本的 Topic:

/opt/kafka/bin/kafka-topics.sh --create \--topic test_topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 2
  1. 生产与消费消息
  • 生产者:通过kafka-console-producer.shtest_topic发送消息:
/opt/kafka/bin/kafka-console-producer.sh \--topic test_topic \--bootstrap-server localhost:9092

输入消息内容(如Hello, Kafka!)并回车发送。

  • 消费者:使用kafka-console-consumer.shtest_topic拉取消息,支持从头开始消费或从最新位置消费:
# 从头开始消费
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--from-beginning \--bootstrap-server localhost:9092# 从最新位置消费
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--bootstrap-server localhost:9092
  1. 查看 Topic 元数据:使用--describe参数查看test_topic的分区分布、Leader 副本等信息:
/opt/kafka/bin/kafka-topics.sh --describe \--topic test_topic \--bootstrap-server localhost:9092
  1. 消费位移管理:默认情况下,Consumer 自动提交 Offset。如需手动提交,可在消费时添加--enable-auto-commit=false参数,并通过commitSync()commitAsync()方法控制提交时机。

2.3 首个 Java 程序:Producer & Consumer

Maven 依赖配置

pom.xml中添加 Kafka 客户端依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

Producer 代码示例

import org.apache.kafka.clients.producer.*;  
import java.util.Properties;  
import java.util.concurrent.ExecutionException;  public class KafkaProducerExample {public static void main(String[] args) {// 1. 配置Kafka生产者属性Properties props = new Properties();// 设置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置值的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置消息确认机制:等待所有副本确认(最可靠但最慢)props.put(ProducerConfig.ACKS_CONFIG, "all");// 设置发送失败时的重试次数props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2. 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 3. 创建要发送的消息记录// 参数:topic名称,消息key,消息valueProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key1", "message1");try {// 4. 发送消息(同步方式)// send()返回Future,get()会阻塞直到收到响应RecordMetadata metadata = producer.send(record).get();// 5. 打印消息发送成功的元数据System.out.println("Message sent to partition " + metadata.partition() + " with offset " + metadata.offset());} catch (InterruptedException | ExecutionException e) {// 6. 处理发送过程中可能出现的异常e.printStackTrace();} finally {// 7. 关闭生产者(重要!避免资源泄漏)producer.close();}}
}

Consumer 代码示例

import org.apache.kafka.clients.consumer.*; 
import java.time.Duration;                  
import java.util.Collections;              
import java.util.Properties;               public class KafkaConsumerExample {public static void main(String[] args) {// 1. 配置Kafka消费者属性Properties props = new Properties();// 设置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置消费者组ID(同一组内的消费者共享消息)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");// 设置键的反序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置值的反序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 注意:默认是自动提交offset,这里我们改为手动提交(见下方commitSync())// 2. 创建Kafka消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅主题(可以订阅多个主题,这里用单例集合订阅单个主题)consumer.subscribe(Collections.singletonList("test_topic"));// 4. 持续轮询消息while (true) {// poll()方法获取消息,参数是等待时间(避免CPU空转)// 返回一批记录(可能包含0到多条消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 5. 处理收到的每条消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}// 6. 手动同步提交offset(确保消息被成功处理后再提交)// 注意:生产环境应考虑错误处理和异步提交(commitAsync)consumer.commitSync(); }// 实际应用中应该添加关闭逻辑(如通过ShutdownHook)// consumer.close();}
}

上述 Java 程序分别实现了消息的生产与消费,通过配置 Producer 和 Consumer 的参数,可灵活控制消息发送策略与消费行为。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/85014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue Hook Store 设计模式最佳实践指南

Vue Hook Store 设计模式最佳实践指南 一、引言 在 Vue 3 组合式 API 与 TypeScript 普及的背景下&#xff0c;Hook Store 设计模式应运而生&#xff0c;它结合了 Vue 组合式 API 的灵活性与状态管理的最佳实践&#xff0c;为开发者提供了一种轻量级、可测试且易于维护的状态…

无人机多人协同控制技术解析

一、运行方式 无人机多人点对点控制通常采用以下两种模式&#xff1a; 1. 主从控制模式 指定一个主控用户拥有最高优先级&#xff0c;负责飞行路径规划、紧急操作等关键指令&#xff1b;其他用户作为观察者&#xff0c;仅能查看实时画面或提交辅助指令&#xff0c;需经主…

树型表查询方法 —— SQL递归

目录 引言&#xff1a; 自链接查询&#xff1a; 递归查询&#xff1a; 编写service接口实现&#xff1a; 引言&#xff1a; 看下图&#xff0c;这是 course_category 课程分类表的结构&#xff1a; 这张表是一个树型结构&#xff0c;通过父结点id将各元素组成一个树。 我…

微服务难题?Nacos服务发现来救场

文章目录 前言1.什么是服务发现2.Nacos 闪亮登场2.1 服务注册2.2 服务发现 3.Nacos 的优势3.1 简单易用3.2 高可用3.3 动态配置 4.实战演练4.1安装 Nacos4.2 服务注册与发现示例代码&#xff08;以 Spring Boot 为例&#xff09; 总结 前言 大家好&#xff0c;我是沛哥儿。今天…

AStar低代码平台-脚本调用C#方法

修改报工表表单&#xff0c;右键定义弹出菜单&#xff0c;新增一个菜单项&#xff0c;并在点击事件脚本中编写调用脚本。 编译脚本&#xff0c;然后在模块代码里面定义这个方法&#xff1a; public async Task<int> on_call_import(DataRow curRow) {PrintDataRow(cur…

python调用langchain实现RAG

一、安装langchain 安装依赖 python -m venv env.\env\Scripts\activatepip3 install langchainpip3 install langchain-corepip3 install langchain-openaipip3 install langchain-communitypip3 install dashscopepip3 install langchain_postgrespip3 install "psyc…

大学大模型教学:基于NC数据的全球气象可视化解决方案

引言 气象数据通常以NetCDF(Network Common Data Form)格式存储,这是一种广泛应用于科学数据存储的二进制文件格式。在大学气象学及相关专业的教学中,掌握如何读取、处理和可视化NC数据是一项重要技能。本文将详细介绍基于Python的NC数据处理与可视化解决方案,包含完整的代…

ORB-SLAM2学习笔记:ComputeKeyPointsOctTree分析过程记录

ComputeKeyPointsOctTree是ORB特征提取器中计算关键点的部分&#xff0c;特别是使用八叉树&#xff08;OctTree&#xff09;方法进行关键点分布。 首先&#xff0c;函数参数是vector<vector的引用allKeypoints&#xff0c;用来存储各层的关键点。代码开头调整了allKeypoint…

LeetCode Hot100(多维动态规划)

62. 不同路径 比较板子的dp&#xff0c;实际上就是到达一个点有两种方式&#xff0c;从上面来或者是左边&#xff0c;加起来就可以了 class Solution {public int uniquePaths(int m, int n) {int [][]arr new int[m2][n2];arr[1][1]1;for(int i1;i<m;i){for(int j1;j<…

Oracle MOVE ONLINE 实现原理

Oracle MOVE ONLINE 实现原理 Oracle 的 MOVE ONLINE 操作是一种在线重组表的技术&#xff0c;允许在不中断业务的情况下重新组织表数据。以下是其实现原理的详细分析&#xff1a; 基本概念 MOVE ONLINE 是 Oracle 12c 引入的特性&#xff0c;用于替代传统的 ALTER TABLE ..…

工作流长任务处置方案

以下是前后端协作处理长任务工作流的完整实现方案&#xff0c;结合技术选型与设计要点&#xff0c;以清晰结构呈现&#xff1a; 一、后端实现方案 异步任务队列架构 • 技术选型&#xff1a; ◦ 消息队列&#xff1a;NATS&#xff08;轻量级&#xff09;或 RabbitMQ&#xf…

RabbitMQ仲裁队列高可用架构解析

#作者&#xff1a;闫乾苓 文章目录 概述工作原理1.节点之间的交互2.消息复制3.共识机制4.选举领导者5.消息持久化6.自动故障转移 集群环境节点管理仲裁队列增加集群节点重新平衡仲裁队列leader所在节点仲裁队列减少集群节点 副本管理add_member 在给定节点上添加仲裁队列成员&…

fingerprint2浏览器指纹使用记录

我在uniapp-vue3-H5端使用的&#xff0c;记录一下 抄的这里前端使用fingerprintjs2获取浏览器指纹fingerprintjs2是通过设备浏览器信息获取浏览器指纹的插件&#xff08; - 掘金 1、安装依赖 npm i fingerprintjs2 -S2、抽成模块文件&#xff0c;/utils/Fingerprint2.js 生成指…

深度学习面试八股简略速览

在准备深度学习面试时&#xff0c;你可能会感到有些不知所措。毕竟&#xff0c;深度学习是一个庞大且不断发展的领域&#xff0c;涉及众多复杂的技术和概念。但别担心&#xff0c;本文将为你提供一份全面的指南&#xff0c;从基础理论到实际应用&#xff0c;帮助你在面试中脱颖…

使用 Redis 作为向量数据库

一、什么是向量数据库&#xff1f; 向量&#xff08;Vector&#xff09;&#xff1a;在机器学习和 AI 中&#xff0c;向量是由一系列数字组成的序列&#xff0c;用于数值化地描述数据的特征或语义。文本、图像、音频等非结构化数据可以通过模型转换成固定长度的向量。 向量数据…

变量的计算

不同类型变量之间的计算 数字型变量可以直接计算 在python中&#xff0c;数字型变量可以直接通过算术运算符计算bool型变量&#xff1a;True 对应数字1 &#xff1b;False 对应数字0、 字符串变量 使用 拼接字符串 使用 * 拼接指定倍数的相同字符串 变量的输入&#xff1a;&…

PostgreSQL学会如何建表

开始使用PostgreSQL之前&#xff0c; 上一节我们说了怎样安装它。 PostgreSQL可能已经安装到你的电脑上了,安装后postgre服务默认在电脑开机时运行启动。 一.了解PostgreSQL的运行 PostgreSQL使用一种客户端/服务器&#xff08;C/S&#xff09;模型。 和其他典型的客户端/服务…

Linux驱动学习笔记(十)

热插拔 1.热插拔&#xff1a;就是带电插拔&#xff0c;即允许用户在不关闭系统&#xff0c;不切断电源的情况下拆卸或安装硬盘&#xff0c;板卡等设备。热插拔是内核和用户空间之间&#xff0c;通过调用用户空间程序实现交互来实现的&#xff0c;当内核发生了某种热拔插事件时…

大模型应用开发第五讲:成熟度模型:从ChatGPT(L2)到未来自主Agent(L4)

大模型应用开发第五讲&#xff1a;成熟度模型&#xff1a;从ChatGPT&#xff08;L2&#xff09;到未来自主Agent&#xff08;L4&#xff09; 资料取自《大模型应用开发&#xff1a;动手做AI Agent 》。 查看总目录&#xff1a;学习大纲 关于DeepSeek本地部署指南可以看下我之…

Delphi 导入excel

Delphi导入Excel的常见方法可分为两种主流方案&#xff1a;基于OLE自动化操作Excel原生接口和利用第三方组件库。以下为具体实现流程及注意事项&#xff1a; ‌一、OLE自动化方案&#xff08;推荐基础场景&#xff09;‌ 该方法通过COM接口调用本地安装的Excel程序&#xff0c…