Kafka入门4.0.0版本(基于Java、SpringBoot操作)

Kafka入门4.0.0版本(基于Java、SpringBoot操作)

一、kafka概述

Kafka最初是由LinkedIn公司开发的,是一个高可靠、高吞吐量、低延迟的分布式发布订阅消息系统,它使用Scala语言编写,并于2010年被贡献给了Apache基金会,随后成为Apache的顶级开源项目。主要特点有:

  1. 为发布和订阅提供高吞吐量
  2. 消息持久化
  3. 分布式
  4. 消费消息采用Pull模式
  5. 支持在线和离线场景

本次采用最新的kafka版本4.0.0,Kafka 4.0 最引人瞩目的变化之一,当属其默认运行在 KRaft(Kafka Raft)模式下,彻底摆脱了对 Apache ZooKeeper 的依赖。在 Kafka 的发展历程中,ZooKeeper 曾是其核心组件,负责协调分布式系统中的元数据管理、Broker 注册、主题分区分配等关键任务。然而,随着 Kafka 功能的不断丰富与用户规模的持续扩大,ZooKeeper 逐渐成为系统部署和运维中的一个复杂性来源,增加了运营成本与管理难度。

KRaft 模式的引入,标志着 Kafka 在架构上的自我进化达到了一个新高度。通过采用基于 Raft 一致性算法的共识机制,Kafka 将元数据管理内嵌于自身体系,实现了对 ZooKeeper 的无缝替代。这一转变带来了多方面的显著优势:

简化部署与运维:运维人员无需再为维护 ZooKeeper 集群投入额外精力,降低了整体运营开销。新架构减少了系统的复杂性,使得 Kafka 的安装、配置和日常管理变得更加直观和高效。

增强可扩展性:KRaft 模式下,Kafka 集群的扩展性得到了进一步提升。新增 Broker 节点的加入流程更加简便,能够更好地适应大规模数据处理场景下对系统资源动态调整的需求。

提升系统性能与稳定性:去除 ZooKeeper 这一外部依赖后,Kafka 在元数据操作的响应速度和一致性方面表现出色。尤其是在高并发写入和读取场景下,系统的稳定性和可靠性得到了增强,减少了因外部组件故障可能导致的单点问题。

  • 之前的架构

在这里插入图片描述

  • 现在的架构

在这里插入图片描述

kafka消费模型

不同消费者组可以消费全量的消息,相同消费者组内的消费者只能消费一部分。

在这里插入图片描述

kafka基本概念

Producer(生产者)

消息的生产者,负责将消息发送到Kafka集群中。

Consumer(消费者)

消息的消费者,负责从Kafka集群中读取并处理消息

Broker(服务代理节点)

Kafka集群中的一个或多个服务器,负责存储和转发消息。

Topic(主题)

Kafka中的消息以主题为单位进行归类,生产者发送消息到特定主题,消费者订阅并消费这些主题的消息。

Partition(分区)

每个主题可以细分为多个分区,分区是Kafka存储消息的物理单位,每个分区可以看作是一个有序的、不可变的消息序列。

Replica(副本)

Kafka为每个分区引入了多副本机制,以提高数据的安全性和可靠性。副本分为leader和follower,其中leader负责处理读写请求,follower负责从leader同步数据。

Consumer Group(消费者组)

由多个消费者组成,消费者组内的消费者共同消费同一个主题的消息,但每个消费者只负责消费该主题的一个或多个分区,避免消息重复消费。

kraft

通过采用基于 Raft 一致性算法的共识机制,Kafka 将元数据管理内嵌于自身体系,实现了对 ZooKeeper 的无缝替代

kafka发送端采用push模式

kafka消费端采用pull模式订阅并消费消息

在这里插入图片描述

Kafka的工作原理

可以概括为以下几个步骤:

  • 消息发布: 生产者将消息发送到Kafka集群的特定主题,并可以选择发送到该主题的哪个分区。如果未指定分区,Kafka会根据负载均衡策略自动选择分区。

  • 消息存储: Kafka将接收到的消息存储在磁盘上的分区中,每个分区都是一个有序的消息序列。Kafka使用顺序写入和零拷贝技术来提高写入性能,并通过多副本机制确保数据的安全性和可靠性。

在这里插入图片描述

  • 消息消费: 消费者组中的消费者从Kafka集群中订阅并消费消息。每个消费者负责消费一个或多个分区中的消息,并确保消息至少被消费一次。消费者可以使用拉(Pull)模式或推(Push)模式从Kafka中拉取消息。

在这里插入图片描述

  • 负载均衡: Kafka通过ZooKeeper维护集群的元数据信息,包括分区和消费者的对应关系。当消费者数量或分区数量发生变化时,Kafka会重新分配分区给消费者,以实现负载均衡。

  • 容错机制: Kafka通过多副本机制实现容错。当leader副本出现故障时,Kafka会从ISR(In-Sync Replicas)集合中选择一个新的leader副本继续对外提供服务。同时,Kafka还提供了多种可靠性级别供用户选择,以满足不同的业务需求。

kafka特点

一、Kafka的持久化机制

Kafka的持久化机制主要涉及消息的存储和复制。Kafka以日志的形式存储消息,每个主题(Topic)被划分为多个分区(Partition),每个分区中的消息按照顺序进行存储。Kafka使用多个副本(Replica)保证消息的持久性和可靠性,每个分区的消息会被复制到多个副本中,以防止数据丢失。此外,Kafka还允许根据配置的保留策略来保留已消费的消息一段时间,以便在需要时进行检索和恢复。

Kafka的副本机制是其实现高可用性和数据持久性的重要基石。每个主题的每个分区都配置有多个副本,这些副本分散保存在不同的Broker上,从而能够对抗部分Broker宕机带来的数据不可用问题。Kafka的副本机制包括领导者副本(Leader Replica)和追随者副本(Follower Replica):

领导者副本:负责处理所有的读写请求,包括生产者的消息写入和消费者的消息读取。

追随者副本:从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。追随者副本不对外提供服务,只作为数据的冗余备份。

Kafka还引入了ISR(In-Sync Replicas)机制,即与领导者副本保持同步的副本集合。只有处于ISR中的副本才能参与到消息的写入和读取过程中,以确保数据的一致性和可靠性。当某个副本与领导者副本的同步延迟超过一定的阈值时,它会被踢出ISR,直到同步恢复正常。

二、Kafka的数据一致性

Kafka通过多个机制来确保数据的一致性,包括副本同步、ISR机制、生产者事务和消费者事务等:

副本同步:确保主副本将数据同步到所有副本的过程,在副本同步完成之前,生产者才会认为消息已经被成功写入。

ISR机制:通过动态调整ISR列表中的副本,确保只有可靠的副本参与到数据的读写操作,从而提高数据的一致性和可靠性。

生产者事务:Kafka的生产者事务机制可以确保消息的Exactly-Once语义,即消息不会被重复写入或丢失。生产者事务将消息的发送和位移提交等操作放在同一个事务中,一旦事务提交成功,就意味着消息已经被成功写入,并且对应的位移也已经提交。

消费者事务:虽然Kafka的消费者通常不直接支持事务但消费者可以通过提交位移(Offset)来确保消息的正确消费。消费者事务将消息的拉取和位移提交等操作放在同一个事务中,以确保消息不会被重复消费或丢失。

二、kafka应用

2.1 win11安装kafka4.0.0

下载地址:https://kafka.apache.org/downloads 下载最后一个kafka-2.13-4.0.0.tgz

在这里插入图片描述

下载好之后,把这个压缩包解压就行了,然后找到config下面的server.properties

找到log.dirs改成自己电脑上的目录

log.dirs=E:\\runSoft\\kafka\\data

在这里插入图片描述

  • 第一步 获取uuid

先打开命令行,进入到bin下面的windows目录下

命令

kafka-storage.bat random-uuid

在这里插入图片描述

先获取uuid,我的uuid为ANVnC_s-QYGJF1C7wu9Aww

  • 第二步 格式化日志

命令:

kafka-storage.bat format --standalone -t PPEZ2LW8T8yjZNWnfNHorQ -c ../../config/server.properties

在这里插入图片描述

  • 第三步 启动

打开命令行,进入到bin下面的windows目录下 启动命令

kafka-server-start.bat ../../config/server.properties
创建topic
kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
启动一个消费端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
启动一个生产端
kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
问题
1、如果提示如下
命令行  输入行太长。
命令语法不正确。

则需要把目录变短,目录太长,win11不让输入。

2,tgz需要解压两次
只解压一次是不行的,tgz是打包之后压缩的。

3、如果启动失败,需要重新配置

重新配置时。把log.dirs的路径下面的东西清空
2.2 java开发kafka

第一步,引入依赖

 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version>
</dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version>
</dependency>

第二步,建立生产者

public class Producer {public static void main(String[] args) {Map<String,Object> props = new HashMap<>();//  kafka 集群 节点props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");String topic = "test";KafkaProducer<String,  String> producer = new KafkaProducer(props);producer.send(new ProducerRecord<String, String>(topic, "key", "value-1"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-2"));producer.send(new ProducerRecord<String, String>(topic, "key", "value-3"));producer.close();}}

ProducerRecord 是 Kafka 中的一个核心类,它代表了一组 Kafka 需要发送的 key/value 键值对,它由记录要发送到的主题名称(Topic Name),**可选的分区号(Partition Number)**以及可选的键值对构成。

在这里插入图片描述

第三步、建立消费者类

public class Consumer {public static void main(String[] args){Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String , String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("test"));while (true) {ConsumerRecords<String,String> records =  consumer.poll(Duration.ofDays(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("partition = %d ,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());}}}
}

运行效果

在这里插入图片描述

2.3 spring boot整合kafka

第一步,引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent in repository --></parent><artifactId>spring_boot_kafka_demo</artifactId><packaging>jar</packaging><name>spring_boot_kafka_demo Maven Webapp</name><url>http://maven.apache.org</url><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>4.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

第二步,编写配置文件

编写resources下的application.yml

spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: earliest

第三步,编写生产者

@Service
public class Producer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");}
}

第四步,编写消费者

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

第五步,编写启动类

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}

运行效果

在这里插入图片描述

2.4 记录日志到kafka中

第一步,在2.3的基础上,添加依赖

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.12</version> <!-- Spring Boot 3.x 推荐版本 -->
</dependency>

第二步,添加kafka的日志appender类

public class KafkaLogbackAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {private String topic = "application-logs";private String bootstrapServers = "localhost:9092";private KafkaProducer<String, String> producer;@Overridepublic void start() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());this.producer = new KafkaProducer<>(props);super.start();}@Overrideprotected void append(ILoggingEvent eventObject) {String msg = eventObject.getFormattedMessage();producer.send(new ProducerRecord<>(topic, msg));}@Overridepublic void stop() {if (producer != null) {producer.close();}super.stop();}// Getter and Setter for XML configpublic void setTopic(String topic) {this.topic = topic;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}
}

第三步,在resources下添加logback-spring.xml文件

<configuration debug="false" scan="true" scanPeriod="30 seconds"><!-- 定义日志格式 --><property name="PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/><!-- 控制台输出 --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>${PATTERN}</pattern></encoder></appender><!-- Kafka Appender --><appender name="KAFKA" class="com.demo.KafkaLogbackAppender"><bootstrapServers>localhost:9092</bootstrapServers><topic>application-logs</topic></appender><!-- 根日志输出 --><root level="info"><appender-ref ref="STDOUT"/><appender-ref ref="KAFKA"/></root></configuration>

第四步,修改Producer类

@Service
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("topic1", message);}@PostConstructpublic void init() {sendMessage("Hello, Kafka!");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");logger.info("Message sent");}
}

第五步,修改Consumer类

@Component
public class Consumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}@KafkaListener(id = "myId2", topics = "application-logs")public void listen2(String in) {System.out.println("resinfo:"+in);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/83661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react react-router-dom中获取自定义参数v6.4版本之后

路由配置, AutnToken 组件作为权限、登录管理 import { createBrowserRouter, Navigate } from react-router-dom; import Layout from /layout/index; import Login from /pages/login; import Page404 from /pages/404;import AutnToken from /components/authToken; import…

AI中的Prompt

1. System 作用&#xff1a;设定 AI 的“角色设定”和“行为准则”。 内容&#xff1a;通常是描述 LLM 的身份、语气、行为范围、约束规则。 类似&#xff1a;在大语言模型中是最优先被考虑的提示。 示例&#xff1a; 你是一个专业的商品评价分析助手&#xff0c;请根据用户…

从人工到智能:IACheck如何重构检测报告审核工作流?

从人工到智能&#xff1a;IACheck如何重构检测报告审核工作流&#xff1f; 在当今AI技术迅猛发展的时代&#xff0c;各行各业正经历从“人工驱动”到“智能驱动”的根本性变革。检测认证&#xff08;TIC&#xff09;行业作为关乎质量与安全的重要支柱&#xff0c;也不例外。在…

React事件处理:如何给按钮绑定onClick点击事件?

系列回顾&#xff1a; 在前几篇文章中&#xff0c;我们已经学会了如何使用 State 管理组件的内部数据&#xff0c;以及如何通过 Props 实现父子组件之间的通信。我们的组件现在已经有了“数据”和“外观”。但是&#xff0c;它还像一个只能看的“模型”&#xff0c;无法与用户进…

【机器学习|学习笔记】粒子群优化(Particle Swarm Optimization, PSO)详解,附代码。

【机器学习|学习笔记】粒子群优化&#xff08;Particle Swarm Optimization, PSO&#xff09;详解&#xff0c;附代码。 【机器学习|学习笔记】粒子群优化&#xff08;Particle Swarm Optimization, PSO&#xff09;详解&#xff0c;附代码。 文章目录 【机器学习|学习笔记】粒…

深度剖析:AI 社媒矩阵营销工具,如何高效获客?

在社交媒体营销领域&#xff0c;竞争日益激烈&#xff0c;传统的社媒矩阵运营方式面临诸多挑战。而 AI 社媒矩阵营销工具的出现&#xff0c;正以前所未有的方式重构社媒矩阵的底层架构&#xff0c;为营销人员带来了全新的机遇与变革。接下来&#xff0c;我们将从技术破局、实战…

Spring XML 常用命名空间配置

Spring XML 常用命名空间配置 下面是一个综合性的Spring XML配置样例&#xff0c;展示了各种常用命名空间的使用方式&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans&quo…

UE5场景漫游——开始界面及关卡跳转

UE中实现UMG游戏界面搭建及蓝图控制&#xff0c;点击游戏界面中的按钮实现关卡的跳转效果。 一、游戏界面显示。1.创建UMG&#xff0c;2.搭建UI。3.关卡蓝图控制显示 二、点击按钮之后实现关卡跳转

CSS 外边距合并(Margin Collapsing)问题研究

在 CSS 中&#xff0c;margin-top 属性会导致外部 DIV 移动的现象主要是由于 外边距合并&#xff08;Margin Collapsing&#xff09; 造成的。这是 CSS 盒模型的一个特性&#xff0c;可能会与直觉相悖。 外边距合并的原理 当一个元素&#xff08;如内部 DIV&#xff09;的 ma…

清理电脑C磁盘,方法N:使用【360软件】中的【清理C盘空间】

1、先下载并打开【360安全卫士】&#xff0c;点击如下位置&#xff1a; 之后&#xff0c;可以把这个东西&#xff0c;创建快捷方式到电脑桌面&#xff0c;方便以后使用&#xff1a;

微服务集成seata分布式事务 at模式快速验证

微服务集成Seata分布式事务 本次demo代码地址业务场景&#xff1a;一般用于以下场景&#xff1a;使用 AT 模式的优势&#xff08;适用于快速验证&#xff09;&#xff1a;快速验证建议步骤&#xff1a;注意事项&#xff1a; 工具环境微服务版本选择Nacos 环境搭建与启动nacos 下…

LLM基础5_从零开始实现 GPT 模型

基于GitHub项目&#xff1a;https://github.com/datawhalechina/llms-from-scratch-cn 设计 LLM 的架构 GPT 模型基于 Transformer 的 decoder-only 架构&#xff0c;其主要特点包括&#xff1a; 顺序生成文本 参数数量庞大&#xff08;而非代码量复杂&#xff09; 大量重复…

Android 中 linux 命令查询设备信息

一、getprop 命令 在 Linux 系统中&#xff0c; getprop 命令通常用于获取 Android 设备的系统属性&#xff0c;这些属性包括设备型号、Android 版本、电池状态等。 1、获取 Android 版本号 adb shell getprop ro.build.version.release2、获取设备型号 adb shell getprop …

26考研 | 王道 | 计算机组成原理 | 六、总线

26考研 | 王道 | 计算机组成原理 | 六、总线 文章目录 26考研 | 王道 | 计算机组成原理 | 六、总线6.1 总线概述1. 总线概述2. 总线的性能指标 6.2 总线仲裁&#xff08;考纲没有&#xff0c;看了留个印象&#xff09;6.3 总线操作和定时6.4 总线标准&#xff08;考纲没有&…

SE(Secure Element)加密芯片与MCU协同工作的典型流程

以下是SE&#xff08;Secure Element&#xff09;加密芯片与MCU协同工作的典型流程&#xff0c;综合安全认证、数据保护及防篡改机制&#xff1a; 一、基础认证流程&#xff08;参数保护方案&#xff09; 密钥预置‌ SE芯片与MCU分别预置相同的3DES密钥&#xff08;Key1、Key2…

数据库——MongoDB

一、介绍 1. MongoDB 概述 MongoDB 是一款由 C 语言编写的开源 NoSQL 数据库&#xff0c;采用分布式文件存储设计。作为介于关系型和非关系型数据库之间的产品&#xff0c;它是 NoSQL 数据库中最接近传统关系数据库的解决方案&#xff0c;同时保留了 NoSQL 的灵活性和扩展性。…

WebSocket 前端断连原因与检测方法

文章目录 前言WebSocket 前端断连原因与检测方法常见 WebSocket 断连原因及检测方式聊天系统场景下的断连问题与影响行情推送场景下的断连问题与影响React 前端应对断连的稳健策略自动重连机制的设计与节流控制心跳机制的实现与保持连接存活连接状态管理与 React 集成错误提示与…

2025年真实面试问题汇总(三)

线上数据库数据丢失如何恢复 线上数据库数据丢失的恢复方法需要根据数据丢失原因、备份情况及数据库类型&#xff08;如MySQL、SQL Server、PostgreSQL等&#xff09;综合处理&#xff0c;以下是通用的分步指南&#xff1a; 一、紧急止损&#xff1a;暂停写入&#xff0c;防止…

Android音视频多媒体开源框架基础大全

安卓多媒体开发框架中&#xff0c;从音频采集&#xff0c;视频采集&#xff0c;到音视频处理&#xff0c;音视频播放显示分别有哪些常用的框架&#xff1f;分成六章&#xff0c;这里一次帮你总结完。 音视频的主要流程是录制、处理、编解码和播放显示。本文也遵循这个流程展开…

安卓上架华为应用市场、应用宝、iosAppStore上架流程,保姆级记录(1)

上架前请准备好apk、备案、软著、企业开发者账号&#xff01;&#xff01;&#xff01;其余准备好app相关的截图、介绍、测试账号&#xff0c;没讲解明白的评论区留言~ 华为应用市场 1、登录账号 打开 华为开发者平台 https://developer.huawei.com/consumer/cn/ 2.登录企…