Kafka入门-生产者

生产者

生产者发送流程:
在这里插入图片描述

延迟时间为0ms时,也就意味着每当有数据就会直接发送

异步发送API

异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。

普通异步发送

首先导入所需的kafka依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version>
</dependency>
public class CustomProducer {public static void main(String[] args) {//配置Properties properties = new Properties();//连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");//指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//创建Kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//异步发送数据kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i));}//关闭资源kafkaProducer.close();}
}
带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数:元数据信息和异常信息,如果异常信息为null,说明消息发送成功,如果异常现象不为null,说明消息发送失败。

修改发送方法,采用回调

//异步发送数据,并有回调函数
kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());}}});}

运行方法就能看到返回的主题、分区

主题: first 分区:2

同步发送

同步发送只需更改发送方式

//同步发送数据kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));for (int i = 0; i < 8; i++) {kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i)).get();}

为什么要分区

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡的效果。
  2. 提高并行度,生产者可以以分区为单位发送数据,而消费者则可以以分区为单位进行消费

分区策略:

  1. 默认分区策略:

    • 如果在记录中指定了分区,那么直接使用指定的分区

      例如在send方法指定分区2,key为""

          kafkaProducer.send(new ProducerRecord<>("first",2,"", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());}}});
      
    • 如果未指定分区但存在键key,则根据key的哈希值与topic的partition数目进行取余选择分区

      例如在send方法中不指定分区,设置key

      kafkaProducer.send(new ProducerRecord<>("first","haha", "learning Kafka-" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());}}});
      
    • 如果不存在分区也没有键key,那么使用黏性分区,会随机选择一个分区并且尽可能一直使用该分区,如果该分区batch已满或者已完成,kafka会再随机一个分区进行使用(和上一个分区不同)。

自定义分区器

首先自定义一个分区器

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//获取数据String msgValues = value.toString();int partition;//如果发送的数据包含aha字段则发送到0号分区,不包含则发往1号分区if(msgValues.contains("aha")){partition = 0;}else {partition = 1;}return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

在创建Kafka对象之前设置配置,选择自定义的分区器

//关联自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hzp.kafka.producer.MyPartitioner");

注意:如果使用自定义分区的同时,还在send方法内指定分区,那么以指定分区为准。

生产者提高吞吐量

生产者发消息就相当于用货车从本地仓库(缓冲区)送货到kafka,相关的参数有两个,一个是batch size批次大小,一个是linger.ms等待时间。batch size默认为16k,相当于货车的容量大小,如果货车装满了就发往kafka。但是通常情况下等待时间为0ms,也就是每当仓库来了一箱货就直接送到kafka,不管货车是否装满。

因此提高吞吐量主要有以下方法:

  1. 修改linger.ms,增长等待时间或者增加批次大小,让货车尽量装多一点货甚至装满再发送。(等待时间会造成一定的延迟,通常控制在5-100ms)
  2. 发送数据时,采取压缩的方式
  3. 增大缓冲区大小,缓冲区大小通常为32m。相当于增加仓库大小,让仓库能够存储更多的货物。
        //缓冲区大小(单位为kb,默认32M)1024*1024*32properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);//批次大小(单位为kb,默认16kb)1024*16properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);//linger.ms (单位为ms)properties.put(ProducerConfig.LINGER_MS_CONFIG,10);//压缩 设置压缩类型为snappy,可配置的值有gzip、snappy、lz4、zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

数据可靠性

数据可靠性与ACK应答级别有关

acks:

  • 0:生产者发送过来的数据,不需要等数据落盘就应答。

    如果不等数据落盘就应答,容易造成数据丢失,生产者发送数据就不管了,可靠性差,效率高。

  • 1:生产者发送过来的数据,Leader收到数据后应答

    如果Leader接收到数据,并且应答之后,突然挂掉了,但是此时Leader还没有同步数据给其他节点,此时就造成数据丢失。生产者发送数据Leader应答,可靠性中等,效率中等。

  • -1:生产者发送的数据,Leader和ISR队列中的所有节点收齐数据后应答

    生产者发送数据需要Leader和ISR队列里面所有的Follower应答,可靠性高,效率低。

    如果Leader收到数据并且和Follower同步数据时,有一个Follower因为故障,长时间不能与Leader同步,这应该如何解决?

    解决方案:Leader维护了一个动态的in-sync replica set(ISR)也就是与Leader保持同步的Follower+Leader的集合(Leader:0,ISR:0,1,2)。如果Follower长时间未向Leader发送通信请求或者同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。这样就不用长时间等待以故障的节点。

如果分区副本为1,那么ACK应答-1和1没有区别,挂了数据就直接丢失,如果ISR里面也只有一个(Leader:0,ISR:0),那么说明没有Follower跟Leader同步,那么仍然会数据丢失。因此可以得到:数据完全可靠的条件:ACK级别设置为-1、分区副本大于等于2、ISR应答里的最小副本数量大于等于2。

通常情况下,acks=0很少使用,acks=1主要用于传输普通日志(大量但并不重要的数据),允许个别数据丢失,acks=-1一般用于传输重要的数据比如金钱这类对可靠性要求比较高的场景。

acks=-1仍然存在问题,比如现在Leader:0,ISR:0,1,2。生产者发送数据data,Leader:0接收到data后与1、2同步数据。同步数据完成之后,即将应答之前,Leader突然挂掉了,那么此时就会从1,2中选择一个成为新的Leader。假设1成为新的Leader,此时生产者没有收到应答,再次发送数据data,那么此时Leader:1就接收到了两份data数据,造成数据重复。

java设置acks,以及重试次数

//acks 设置为1
properties.put(ProducerConfig.ACKS_CONFIG,"1");
//重试次数 默认为int的最大值
properties.put(ProducerConfig.RETRIES_CONFIG,3);

数据去重

在刚刚的数据可靠性中,我们知道怎么让数据能够完全可靠,就是让ACK级别设置为-1、分区副本大于等于2、ISR应答里的最小副本数量大于等于2。从数据传递来看,这种设置就是数据传递至少一次(At Least One);而当ACK级别设置为0,那么数据传递最多一次(At Most One)。

At Least One可以保证数据不丢失,但是不能保证数据不重复,At Most One可以保证数据不重复,但是不能保证数据不丢失。那么如果既想数据不丢失,又想数据不重复,此时就要依靠幂等性和事务。

幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条数据,保证了不重复。

重复数据的判断标准就是<PID、Partition、SqlNumber>相同的消息,Broker只会持久化一条数据。Pid标识指的是ProducerId,生产者编号,Kafka每重启一次就分配一个新的;Partiton标识分区号;SqlNumber是单调自增的,因此幂等性能够保证在单分区、单会话内不重复。

幂等性的使用只需设置enable.idempotence即可,默认为true,关闭只需设置为false。

事务

事务开启之前,必须先开启幂等性。事务底层依赖幂等性。

数据有序

Kafka单分区内有序,但是多分区时,分区与分区之间无序。

数据乱序

kafka保证数据单分区有序的条件是:

  1. 如果没有开启幂等性,那么需要设置max.in.flight.request.per.connection的值为1
  2. 如果开启幂等性,那么需要设置max.in.flight.request.per.connection的值小于等于5.

在kafka1.x版本之后当kafka启用幂等,那么kafka服务端会缓存producer发来的最近5个request的元数据,而幂等性的实现依赖单调递增的序号SqlNumber。如果发送时出现乱序,那么会根据单调递增的序号进行重排序。也就是说当开启了幂等性并且缓存的请求个数小于5,那么会在服务端进行一次重新排序,让数据有序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/908338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类预测 | Matlab实现CNN-LSTM-Attention高光谱数据分类

分类预测 | Matlab实现CNN-LSTM-Attention高光谱数据分类 目录 分类预测 | Matlab实现CNN-LSTM-Attention高光谱数据分类分类效果功能概述程序设计参考资料 分类效果 功能概述 代码功能 该MATLAB代码实现了一个结合CNN、LSTM和注意力机制的高光谱数据分类模型&#xff0c;核心…

gemini和chatgpt数据对比:谁在卷性能、价格和场景?

先把结论“剧透”给赶时间的朋友&#xff1a;顶配 Gemini Ultra/2.5 Pro 在纸面成绩上普遍领先&#xff0c;而 ChatGPT 家族&#xff08;GPT-4o / o3 / 4.1&#xff09;则在延迟、生态和稳定性上占优。下面把核心数据拆开讲&#xff0c;方便你对号入座。附带参考来源&#xff0…

代码训练LeetCode(23)随机访问元素

代码训练(23)LeetCode之随机访问元素 Author: Once Day Date: 2025年6月5日 漫漫长路&#xff0c;才刚刚开始… 全系列文章可参考专栏: 十年代码训练_Once-Day的博客-CSDN博客 参考文章: 380. O(1) 时间插入、删除和获取随机元素 - 力扣&#xff08;LeetCode&#xff09;力…

C++面试5——对象存储区域详解

C++对象存储区域详解 核心观点:内存是程序员的战场,存储区域决定对象的生杀大权!栈对象自动赴死,堆对象生死由你,全局对象永生不死,常量区对象只读不灭。 一、四大地域生死簿 栈区(Stack) • 特点:自动分配释放,速度极快(类似高铁进出站) • 生存期:函数大括号{}就…

STM32 智能小车项目 L298N 电机驱动模块

今天开始着手做智能小车的项目了 在智能小车或机器人项目中&#xff0c;我们经常会听到一个词叫 “H 桥电机驱动”&#xff0c;尤其是常见的 L298N 模块&#xff0c;就是基于“双 H 桥”原理设计的。那么&#xff0c;“H 桥”到底是什么&#xff1f;为什么要用“双 H 桥”来驱动…

python项目如何创建docker环境

这里写自定义目录标题 python项目创建docker环境docker配置国内镜像源构建一个Docker 镜像验证镜像合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPant…

MySQL-多表关系、多表查询

一. 一对多(多对一) 1. 例如&#xff1b;一个部门下有多个员工 在数据库表中多的一方(员工表)、添加字段&#xff0c;来关联一的一方(部门表)的主键 二. 外键约束 1.如将部门表的部门直接删除&#xff0c;然而员工表还存在其部门下的员工&#xff0c;出现了数据的不一致问题&am…

【 HarmonyOS 5 入门系列 】鸿蒙HarmonyOS示例项目讲解

【 HarmonyOS 5 入门系列 】鸿蒙HarmonyOS示例项目讲解 一、前言&#xff1a;移动开发声明式 UI 框架的技术变革 在移动操作系统的发展历程中&#xff0c;UI 开发模式经历了从命令式到声明式的重大变革。 根据华为开发者联盟 2024 年数据报告显示&#xff0c;HarmonyOS 设备…

【SSM】SpringMVC学习笔记7:前后端数据传输协议和异常处理

这篇学习笔记是Spring系列笔记的第7篇&#xff0c;该笔记是笔者在学习黑马程序员SSM框架教程课程期间的笔记&#xff0c;供自己和他人参考。 Spring学习笔记目录 笔记1&#xff1a;【SSM】Spring基础&#xff1a; IoC配置学习笔记-CSDN博客 对应黑马课程P1~P20的内容。 笔记2…

借助 Spring AI 和 LM Studio 为业务系统引入本地 AI 能力

Spring AI 1.0.0-SNAPSHOTLM Studio 0.3.16qwen3-4b 参考 Unable to use spring ai with LMStudio using spring-ai openai module Issue #2441 spring-projects/spring-ai GitHub LM Studio 下载安装 LM Studio下载 qwen3-4b 模型。对于 qwen3 系列模型&#xff0c;测试…

C++学习-入门到精通【13】标准库的容器和迭代器

C学习-入门到精通【13】标准库的容器和迭代器 目录 C学习-入门到精通【13】标准库的容器和迭代器一、标准模板库简介1.容器简介2.STL容器总览3.近容器4.STL容器的通用函数5.首类容器的通用typedef6.对容器元素的要求 二、迭代器简介1.使用istream_iterator输入&#xff0c;使用…

Vue Router的核心实现原理深度解析

1. Vue Router的基本架构 Vue Router的核心功能是实现前端路由&#xff0c;即在不重新加载页面的情况下更改应用的视图。它的基本架构包括&#xff1a; 路由配置&#xff1a;定义路径与组件的映射关系路由实例&#xff1a;管理路由状态和提供导航方法路由视图&#xff1a;渲染…

设计模式——状态设计模式(行为型)

摘要 状态设计模式是一种行为型设计模式&#xff0c;核心在于允许对象在内部状态改变时改变行为。它通过状态对象封装不同行为&#xff0c;使状态切换灵活清晰。该模式包含环境类、抽象状态类和具体状态类等角色&#xff0c;具有避免大量分支判断、符合单一职责和开闭原则等特…

C++ 观察者模式:设计与实现详解

一、引言 在现代软件开发中,组件间的交互与通信是系统设计的核心挑战之一。观察者模式(Observer Pattern)作为一种行为设计模式,提供了一种优雅的解决方案,用于实现对象间的一对多依赖关系。本文将深入探讨 C++ 中观察者模式的设计理念、实现方式及其应用场景。 二、观察…

Windows 账号管理与安全指南

Windows 账号管理与安全指南 概述 Windows 账号管理是系统安全的基础&#xff0c;了解如何正确创建、管理和保护用户账户对于系统管理员和安全专业人员至关重要。本文详细介绍 Windows 系统中的账户管理命令、隐藏账户创建方法以及安全防护措施。 基础账户管理命令 net use…

[蓝桥杯]摆动序列

摆动序列 题目描述 如果一个序列的奇数项都比前一项大&#xff0c;偶数项都比前一项小&#xff0c;则称为一个摆动序列。即 a2i<a2i−1,a2i1 >a2ia2i​<a2i−1​,a2i1​ >a2i​。 小明想知道&#xff0c;长度为 mm&#xff0c;每个数都是 1 到 nn 之间的正整数的…

Python 网络编程 -- WebSocket编程

作者主要是为了用python构建实时网络通信程序。 概念性的东西越简单越好理解,因此,下面我从晚上摘抄的概念 我的理解。 什么是网络通信? 更确切地说&#xff0c;网络通信是两台计算机上的两个进程之间的通信。比如&#xff0c;浏览器进程和新浪服务器上的某个Web服务进程在通…

GM DC Monitor如何实现TCP端口状态监控-操作分享

本节讲解如何通过现有指标提取监控脚本制作自定义的TCP端口监控指标 一、功能介绍 通过提取已有的监控指标的监控命令&#xff0c;来自定义TCP端口的监控指标。 二、配置端口监控 1&#xff09;定位监控脚本 确定脚本及参数如下&#xff1a; check_protocol_tcp.pl --plug…

LabVIEW与Modbus/TCP温湿度监控系统

基于LabVIEW 开发平台与 Modbus/TCP 通信协议&#xff0c;设计一套适用于实验室环境的温湿度数据采集监控系统。通过上位机与高精度温湿度采集设备的远程通信&#xff0c;实现多设备温湿度数据的实时采集、存储、分析及报警功能&#xff0c;解决传统人工采集效率低、环境适应性…

Ntfs!ReadIndexBuffer函数分析之nt!CcGetVirtualAddress函数之nt!CcGetVacbMiss

第一部分&#xff1a; NtfsMapStream( IrpContext, Scb, LlBytesFromIndexBlocks( IndexBlock, Scb->ScbType.Index.IndexBlockByteShift ), Scb->ScbType.Index.BytesPerIndexBuffer, &am…