Kafka自定义分区策略实战避坑指南

文章目录

    • 概要
    • 代码示例
    • 小结

概要

kafka生产者发送消息默认根据总分区数和设置的key计算哈希取余数,key不变就默认存放在一个分区,没有key则随机数分区,明显默认的是最不好用的,那kafka也提供了一个轮询分区策略,我自己使用的是一言难尽,具体我也没有深究下去,那么针对业务硬性要求消息按照升序或降序轮询分区,就需要我们自己定义分区策略了。

有多少小伙伴第一次配置自定义分区策略时,发现分区总是按照倍数分区,并没有按照指定的规则去分区呢?嘿嘿,相信没阅读过源码的都应该踩过这一个坑,原因在于生产者发送消息时,kafka会先去分区策略那里逛一圈,拿到本次分区值,再去执行下一步流程,而在真正执行发送消息之前,kafka会再次进入分区策略内拿取本次的分区值,那么轮询策略一般按照依次递增或递减,致使发送消息时都会拿到自增两次后的分区值。

好,知道了问题所在,那就简单了,修改逻辑就行了呗,这一块考虑到使用分区策略一般是应对多个消息的产生同时发送,所以就涉及到并发了,那么并发就要考虑线程安全,这里推荐使用原子自增类和原子Boolean(非必要),能不使用锁就不使用锁,具体根据各位的业务而定吧,那话不多说,上代码。

代码示例

package org.example.springkafkademo.config;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class CustomerPartitioner implements Partitioner {//针对并发设计,使分区数量原子自增private static AtomicInteger nextPartition  = new AtomicInteger(0);//二次进入判断机制private static AtomicBoolean flag = new AtomicBoolean(false);@Overridepublic int partition(String topic, Object key, byte[] bytes, Object o1, byte[] keyBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);//最大自增值int numPartitions = partitions.size();if (key == null) {//二次判断机制为true则说明自增过一次,需要返回自增之前的值if (flag.get()){flag.set(false);return nextPartition.get()-1;}//原子类将旧值返回再自增int next = nextPartition.getAndIncrement();//如果自增后与大于最大值或相等则直接cas赋值0,使下一次的轮询从0开始if (next >= numPartitions) {nextPartition.compareAndSet(numPartitions, 0);}//标记已经进入过一次flag.set(true);System.out.println("分区值:" + next);return next;} else {// 如果key不为null,则使用默认的分区策略return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

小结

本文分享kafka实现自定义轮询策略,在应对需要将大量的消息轮询发送给分区的场景时,可以采纳本文的代码逻辑,但是并不是适配所有分区轮询,毕竟业务逻辑不是定死的,各位小伙伴一定要结合实际业务逻辑,针对性的对代码进行修改扩展。
有哪里不懂得小伙伴可留言或私信,如与本文章有不同观点欢迎讨论留言,大家一起进步。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/907344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF 全屏显示实现(无标题栏按钮 + 自定义退出按钮)

WPF 全屏显示实现&#xff08;无标题栏按钮 自定义退出按钮&#xff09; 完整实现代码 MainWindow.xaml <Window x:Class"FullScreenApp.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas…

sqli_labs第二十九/三十/三十一关——hpp注入

一&#xff1a;HTTP参数污染&#xff1a; hpp&#xff08;http parameter pollution)注入中&#xff0c;可以通过在hppt的请求中注入多个同名参数来绕过安全过滤 原理&#xff1a;php默认只取最后一个同名参数 比如在这一关里&#xff0c;可能对第一个id参数进行消毒处理&a…

【STM32】按键控制LED 光敏传感器控制蜂鸣器

&#x1f50e;【博主简介】&#x1f50e; &#x1f3c5;CSDN博客专家 &#x1f3c5;2021年博客之星物联网与嵌入式开发TOP5 &#x1f3c5;2022年博客之星物联网与嵌入式开发TOP4 &#x1f3c5;2021年2022年C站百大博主 &#x1f3c5;华为云开发…

华为OD机试真题——斗地主之顺子(2025B卷:100分)Java/python/JavaScript/C/C++/GO最佳实现

2025 B卷 100分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…

Qt找不到windows API报错:error: LNK2019: 无法解析的外部符号 __imp_OpenClipboard

笔者在开发中出现的bug完整报错如下&#xff1a; spcm_ostools_win.obj:-1: error: LNK2019: 无法解析的外部符号 __imp_OpenClipboard&#xff0c;函数 "void __cdecl spcmdrv::vCopyToClipboard(char const *,unsigned __int64)" (?vCopyToClipboardspcmdrvYAXPE…

4.8.4 利用Spark SQL实现分组排行榜

在本次实战中&#xff0c;我们的目标是利用Spark SQL实现分组排行榜&#xff0c;特别是计算每个学生分数最高的前3个成绩。任务的原始数据由一组学生成绩组成&#xff0c;每个学生可能有多个成绩记录。我们首先将这些数据读入Spark DataFrame&#xff0c;然后按学生姓名分组&am…

[PyMySQL]

掌握pymysql对数据库实现增删改查数据库工具类封装,数据库操作应用场景数据库操作应用场景 校验测试数据 : 删除员工 :构造测试数据 : 测试数据使用一次就失效,不能重复使用 : 添加员工(is_delete)测试数据在展开测试前无法确定是否存在 : 查询,修改,删除员工操作步骤:!~~~~~~~…

cs224w课程学习笔记-第12课

cs224w课程学习笔记-第12课 知识图谱问答 前言一、问答类型分类二、路径查询(Path queries)2.1 直观查询方法2.2 TransE 扩展2.3 TransE 能力分析 三、连词查询(conjunctive queries)3.1 Query2box 原理1)、投影2)、交集查询&#xff08;AND 操作)3)、联合查询&#xff08;OR 操…

AI任务相关解决方案2-基于WOA-CNN-BIGRU-Transformer模型解决光纤通信中的非线性问题

文章目录 1. 项目背景与研究意义1.1 光纤通信中的非线性问题1.2 神经网络在光纤非线性补偿中的应用现状 2. 现有模型 CNN-BIGRU-attention 分析2.1 模型架构与工作原理2.2 模型性能评估与局限性 3. 新模型优化方案3.1 WOA算法原理与优势3.2 WOA-CNN-BIGRU-MHA模型构建3.3 WOA-C…

HTTP Accept简介

一、HTTP Accept是什么 HTTP协议是一个客户端和服务器之间进行通信的标准协议&#xff0c;它定义了发送请求和响应的格式。而HTTP Accept是HTTP协议中的一个HTTP头部&#xff0c;用于告诉服务器请求方所期望的响应格式。这些格式可以是媒体类型、字符集、语言等信息。 HTTP A…

39-居住证管理系统(小程序)

技术栈: springBootVueMysqlUni-app 功能点: 群众端 警方端 管理员端 群众端: 1.首页: 轮播图展示、公告信息列表 2.公告栏: 公告查看及评论 3.我的: 联系我们: 可在线咨询管理员问题 实时回复 居住证登记申请 回执单查看 领证信息查看 4.个人中心: 个人信息查看及修改…

鸿蒙OSUniApp 开发的滑动图片墙组件#三方框架 #Uniapp

UniApp 开发的滑动图片墙组件 前言 在移动应用中&#xff0c;图片墙是一种极具视觉冲击力的内容展示方式&#xff0c;广泛应用于相册、商品展示、社交分享等场景。一个优秀的滑动图片墙组件不仅要支持流畅的滑动浏览&#xff0c;还要兼容不同设备的分辨率和性能&#xff0c;尤…

碰一碰系统源码搭建==saas系统

搭建“碰一碰”系统&#xff08;通常指基于NFC或蓝牙的短距离交互功能&#xff09;的源码实现&#xff0c;需结合具体技术栈和功能需求。以下是关键步骤和示例代码&#xff1a; 技术选型 NFC模式&#xff1a;适用于Android/iOS设备的近场通信&#xff0c;需处理NDEF协议。蓝牙…

自动驾驶决策规划框架详解:从理论到实践

欢迎来到《自动驾驶决策规划框架详解:从理论到实践》的第二章。在本章中,我们将深入探讨自动驾驶系统中至关重要的“大脑”——决策规划模块。我们将从基本概念入手,逐步解析主流的决策规划框架,包括经典的路径速度解耦方法、工业界广泛应用的Apollo Planning框架、应对复杂…

服务器定时任务查看和编辑

在 Ubuntu 系统中&#xff0c;查看当前系统中已开启的定时任务主要有以下几种方式&#xff0c;分别针对不同类型的定时任务管理方式&#xff08;如 crontab、systemd timer 等&#xff09;&#xff1a; 查看服务器定时任务 一、查看用户级别的 Crontab 任务 每个用户都可以配…

小白的进阶之路系列之四----人工智能从初步到精通pytorch自定义数据集下

本篇涵盖的内容 在之前的文章中,我们已经讨论了如何获取数据,转换数据以及如何准备自定义数据集,本篇文章将涵盖更加深入的问题,希望通过详细的代码示例,帮助大家了解PyTorch自定义数据集是如何应对各种复杂实际情况中,数据处理的。 更加详细的,我们将讨论下面一些内容…

DeepSeek实战:打造智能数据分析与可视化系统

DeepSeek实战:打造智能数据分析与可视化系统 1. 数据智能时代:DeepSeek数据分析系统入门 在数据驱动的决策时代,智能数据分析系统正成为企业核心竞争力。本节将使用DeepSeek构建一个从数据清洗到可视化分析的全流程智能系统。 1.1 系统核心功能架构 class DataAnalysisS…

力扣100题---字母异位词分组

1.字母异位词分组 给你一个字符串数组&#xff0c;请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 方法一&#xff1a;字母排序 class Solution {public List<List<String>> groupAnagr…

使用子查询在 SQL Server 中进行数据操作

在 SQL Server 中&#xff0c;子查询&#xff08;Subquery&#xff09;是一种在查询中嵌套另一个查询的技术&#xff0c;可以用来执行复杂的查询、过滤数据或进行数据计算。子查询通常被用在 SELECT、INSERT、UPDATE 或 DELETE 语句中&#xff0c;可以帮助我们高效地解决问题。…

Flask集成pyotp生成动态口令

Python中的pyotp模块是一个用于生成和验证一次性密码&#xff08;OTP&#xff09;的库&#xff0c;支持基于时间&#xff08;TOTP&#xff09;和计数器&#xff08;HOTP&#xff09;的两种主流算法。它遵循RFC 4226&#xff08;HOTP&#xff09;和RFC 6238&#xff08;TOTP&…