基于Kafka实现简单的延时队列

生命无罪,健康万岁,我是laity。

我曾七次鄙视自己的灵魂:

第一次,当它本可进取时,却故作谦卑;

第二次,当它在空虚时,用爱欲来填充;

第三次,在困难和容易之间,它选择了容易;

第四次,它犯了错,却借由别人也会犯错来宽慰自己;

第五次,它自由软弱,却把它认为是生命的坚韧;

第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

基于Kafka实现简单的延时队列

业务场景:
listener kafka 中的指定topic,接收并处理其中的message,再基于websocket向前端推送数据,前端接收到数据后将数据放置到定时队列中,进行5s的倒计时
,情况1:时间到了进行触发下一步的接口(该操作为自动操作)
,情况2:时间未到有人为干预进行点击进入下一步的接口(该操作为人工操作)。
问题:当前端页面进行切换页面后,前端是无法将定时队列中的数据进行存储,从而进行清空;
解决方案:kafka的延时队列解决切换页面后未处理的message;
1.解决流程:
(1)listener 到 message 并处理后直接进行走一遍自动操作,
(2)并将存入库中的saveId进行返回至websocket推向前端的JSON数据中,
(3)再通过写好的send方法将JSONObject发送至kafka的消息队列中,
(4)之后不论人工还是自动都进行update操作(基于saveId可以去查询到’自动处理’时的留痕,判断update_time的是否存在,不在则进行update(因为触发update只有当前端时间到了或kafka延时队列的listener))。

实现流程

1.引入pom依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><optional>true</optional>
</dependency>

2.yaml进行自动配置

spring:application:# 应用名称name: youServerNamekafka:bootstrap-servers: youKafkaIp:9092consumer:enable-auto-commit: falsevalue-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializerkey-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

3.各各类的实现

DelayEnum.java

import lombok.AllArgsConstructor;
import lombok.Getter;/*** @author laity*/
@Getter
@AllArgsConstructor
public enum DelayEnum {FIVE_S(5, "topic_message_5s"),TEN_S(10, "topic_message_10s");private final int delay_time;private final String topic_name;
}

KafkaDelayMsg.java

import lombok.Data;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author laity */
@Data
public class KafkaDelayMsg implements Delayed {private String msg; // content - 可以换成对应的实体private DelayEnum delayEnum;private long time;public KafkaDelayMsg() {}public KafkaDelayMsg(String msg, DelayEnum delayEnum) {this.msg = msg;this.delayEnum = delayEnum;this.time = System.currentTimeMillis() + delayEnum.getDelay_time() * 1000;}/* 延时队列实现的关键 */@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return Long.compare(this.time, ((KafkaDelayMsg) o).time);}
}

KafkaUtil.java

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;import java.util.HashMap;
import java.util.Map;/*** @author laity*/
@Slf4j
public class KafkaUtil {KafkaProducer<String, String> kafkaProducer = null;private static final Map<String, Object> map = new HashMap<>();public KafkaUtil() {map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.x.x:9092");map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());}// sendpublic void send(KafkaDelayMsg msg) {KafkaProducer producer = new KafkaProducer(map);if (msg.getDelayEnum().getDelay_time() == DelayEnum.FIVE_S.getDelay_time()) {KafkaDelayQueue.FIVE_S.add(msg);}KafkaDelayQueue.run(producer);}
}

KafkaDelayQueue.java

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author laity 设计队列*/
@Slf4j
public class KafkaDelayQueue {public static DelayQueue<KafkaDelayMsg> FIVE_S = new DelayQueue<>();// 实际sendMsg的functionpublic static void run(KafkaProducer<String, String> producer) {ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.execute(() -> {log.info("=============== 开始推送执行 ==================");while (true) {try {KafkaDelayMsg take = FIVE_S.take();// 推送数据RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(take.getDelayEnum().getTopic_name(), take.getMsg())).get();log.info("============================= CONTENT:" + take.toString() + "-" + recordMetadata.topic() + "-" + recordMetadata.partition() + " ===============================");} catch (Exception e) {e.printStackTrace();}}});}
}

KafkaConsumer.java - 业务代码

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;/*** @author laity 消费*/@Component
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class KafkaConsumer {private final KelCjTaskResultMapper mapper;// beanRef的功能public String getTopicName() {return DelayEnum.FIVE_S.getTopic_name();}// @KafkaListener(topics = "send_message_5s", groupId = "consumer-laity")@KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")public void listen(String message) {// 1.实现你的业务}
}

总结

年轻人,你的职责是平整土地,而非焦虑时光。你做三四月的事,在八九月自有答案。我是Laity,正在前进的Laity。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/93015.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/93015.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OceanBase 4.3.5 解析:DDL性能诊断

背景DDL操作通常耗时较长&#xff0c;特别是涉及补数据流程的DDL语句。在执行过程中&#xff0c;用户面临两个主要痛点&#xff1a;一是无法实时获取DDL执行进度&#xff0c;难以区分长时间运行是正常现象还是由内部异常导致的停滞&#xff1b;二是执行效率经常低于预期&#x…

幸福网咖订座点餐小程序的设计与实现

文章目录前言详细视频演示具体实现截图后端框架SpringBoot微信小程序持久层框架MyBaits成功系统案例&#xff1a;参考代码数据库源码获取前言 博主介绍:CSDN特邀作者、985高校计算机专业毕业、现任某互联网大厂高级全栈开发工程师、Gitee/掘金/华为云/阿里云/GitHub等平台持续…

C语言————练习题册(答案版)

目录 每日更新5-10题&#xff0c;感兴趣可以订阅 一.理解函数、操作符、占位符 1.1 欢迎来到C语言的世界 1.2 输入和输出 1.3 浮点数的打印 1.4 字符串的打印 1.14 I am iron man 1.5 求和运算 1.6 计算比例 1.7 求商求余 1.8 不同数位上的数字 1.8.1 求个位数 1.8…

haproxy配置详解

1、haproxy简介 HAProxy是法国开发者 威利塔罗(Willy Tarreau) 在2000年使用C语言开发的一个开源软件 是一款具备高并发(万级以上)、高性能的TCP和HTTP负载均衡器 支持基于cookie的持久性&#xff0c;自动故障切换&#xff0c;支持正则表达式及web状态统计 企业版网站&#xff…

计网-TCP可靠传输

TCP&#xff08;传输控制协议&#xff09;的可靠传输是通过一系列机制保证数据准确、有序、不丢失地到达接收方。以下是TCP可靠传输的详细过程及核心机制&#xff1a;1. 数据分块与序列号&#xff08;Seq&#xff09;分块&#xff1a;应用层数据被分割成适合传输的TCP报文段&am…

数智管理学(三十九)

第三章 数智化对管理理论的冲击第三节 系统理论与生态化管理的强化系统理论作为理解企业运作与环境互动的重要框架&#xff0c;一直强调企业是一个由多个相互关联子系统构成的整体&#xff0c;其核心要素包括整体性、开放性、动态性和反馈机制。在传统管理视角下&#xff0c;这…

哈希表(c语言)

文章目录哈希表哈希表知识点哈希表概念负载因子哈希表的优缺点哈希冲突哈希函数常见哈希函数处理哈希冲突开放定址法线性探测二次探测链地址法哈希表的实现哈希表的核心:HashMap核心函数&#xff1a;从创建到销毁创建哈希表&#xff1a;hashmap_create()销毁哈希表:hashmap_des…

【Canvas与旗帜】条纹版大明三辰旗

【成图】【代码】<!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>十三条纹版大明三辰旗 Draft1</title><style type"text/…

【Java】空指针(NullPointerException)异常深度攻坚:从底层原理到架构级防御,老司机的实战经验

写Java代码这些年&#xff0c;空指针异常&#xff08;NullPointerException&#xff09;就像甩不掉的影子。线上排查问题时&#xff0c;十次有八次最后定位到的都是某个对象没处理好null值。但多数人解决问题只停留在加个if (obj ! null)的层面&#xff0c;没从根本上想过为什么…

【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 主页-评论用户时间占比环形饼状图实现

大家好&#xff0c;我是java1234_小锋老师&#xff0c;最近写了一套【NLP舆情分析】基于python微博舆情分析可视化系统(flaskpandasecharts)视频教程&#xff0c;持续更新中&#xff0c;计划月底更新完&#xff0c;感谢支持。今天讲解主页-评论用户时间占比环形饼状图实现 视频…

Redis面试精讲 Day 5:Redis内存管理与过期策略

【Redis面试精讲 Day 5】Redis内存管理与过期策略 开篇 欢迎来到"Redis面试精讲"系列的第5天&#xff01;今天我们将深入探讨Redis内存管理与过期策略&#xff0c;这是面试中经常被问及的核心知识点。对于后端工程师而言&#xff0c;理解Redis如何高效管理内存、处…

ICMPv6报文类型详解表

一、错误报文类型&#xff08;Type 1-127&#xff09;Type值名称Code范围触发条件示例典型用途1Destination Unreachable0-60: 无路由到目标1: 通信被管理员禁止2: 地址不可达3: 端口不可达4: 分片需要但DF标志设置5: 源路由失败6: 目的地址不可达网络故障诊断2Packet Too Big0…

配置nodejs

第一步确认 node.exe 和 npm 存在 例如安装目录D:\nodejs检查是否存在以下文件&#xff1a; node.exenpm.cmdnpx.cmd 第二步&#xff1a;添加环境变量 PATH 图形化操作步骤&#xff08;Windows&#xff09;&#xff1a; 右键「此电脑」→「属性」点击左侧 「高级系统设置」弹出…

MySQL的命令行客户端

MySQL中的一些程序&#xff1a;MySQL在安装完成的时候&#xff0c;一般都会包含如下程序&#xff1a;在Linux系统下&#xff0c;通过/usr/bin目录下&#xff0c;可以通过命令查看&#xff1a;以下是常用的MySQL程序&#xff1a;程序名作用mysqldMySQL的守护进程即MySQL服务器&a…

C# 值类型与引用类型的储存方式_堆栈_

目录 值类型 引用类型 修改stu3的值 stu也被修改了 为什么? &#xff08;对象之间&#xff09; 值类型中&#xff0c;值全在栈中单独存储&#xff0c;变量之间不会影响 结构体中&#xff0c;结构体全在栈中&#xff0c;结构体与结构体之间也不会相互影响 静态资源区 值类…

解锁永久会员的白噪音软件:睡眠助手

如今的年轻人压力普遍较大&#xff0c;学会解压至关重要。这期就为大家推荐一款优秀的白噪音软件&#xff0c;在压力大时听听&#xff0c;能起到不错的解压效果。 睡眠助手 文末获取 这款软件的特别版本十分出色&#xff0c;知晓的人不多。它已解锁永久会员&#xff0c;无需登…

uniapp使用css实现进度条带动画过渡效果

一、效果 二、实现原理 1.uni.createAnimation 动画函数 2.初始化uni.createAnimation方法 3.监听值的变化调用动画执行方法 三、代码 1.实现方式比较简单&#xff0c;目前是vue3的写法&#xff0c;vue2只需要稍微改动即可 <template><view class"layout_progre…

高级分布式系统调试:调试的科学与 USE 方法实战

高级分布式系统调试:调试的科学与 USE 方法实战 前言:从“救火”到“探案” 当一个复杂的分布式系统出现“灰色故障”——例如“服务有时会变慢”、“偶尔出现超时错误”——我们该从何处着手?随机地查看 Grafana 仪表盘,或者漫无目的地 tail -f 日志,往往效率低下,甚至…

栈算法之【有效括号】

目录 LeetCode-20题 LeetCode-20题 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括号必须以正确的顺序闭合。 每…

大模型——Data Agent:超越 BI 与 AI 的边界

Data Agent:超越 BI 与 AI 的边界 1. 数据工具的演进路径 在数据分析领域,技术工具经历了多个阶段的演进。这些演进不仅反映了技术的进步,也体现了用户需求和使用场景的变化。 Excel 时代:告别手工作业,陷入“表格泥潭“,早期数据分析依赖 Excel,实现基础数据记录、计…