基于Kafka的延迟队列

实现原理

通过topic区分不同的延迟时长,每个topic对于一个延迟,比如 topic100 仅存储延迟 100ms 的消息,topic1000 仅存储延迟 1s 的消息,依次类推。

在这里插入图片描述

生产消息时,消息需按延迟时长投递到对应的topic。消费消息时,检查消息的时间,如果未到达延迟时长,则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列,可作为一种特殊的延迟队列,比如延迟 3600000ms 的处理。

消费者实现

package mainimport ("context""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定义每个topic对应的延迟时间(ms)
var topicDelayConfig = map[string]time.Duration{"delay-100ms":  100 * time.Millisecond,"delay-200ms":  200 * time.Millisecond,"delay-500ms":  500 * time.Millisecond,"delay-1000ms": 1000 * time.Millisecond,
}type delayConsumerHandler struct {// 可以添加必要的依赖,如业务处理器等
}func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info("延迟队列消费者初始化完成")return nil
}func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info("延迟队列消费者清理完成")return nil
}// ConsumeClaim 处理分区消息,实现延迟逻辑
func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic := claim.Topic()delay, exists := topicDelayConfig[topic]if !exists {logrus.Errorf("topic %s 未配置延迟时间,跳过消费", topic)// 标记所有消息为已消费,避免重复处理for range claim.Messages() {sess.MarkMessage(msg, "")}return nil}// 按顺序处理消息(假设消息时间有序)for msg := range claim.Messages() {// 检查会话是否已关闭(如重平衡发生)select {case <-sess.Context().Done():logrus.Info("会话已关闭,停止消费")return nildefault:}// 计算需要延迟的时间// 消息应该被处理的时间 = 消息产生时间 + 主题延迟时间produceTime := msg.TimestampprocessTime := produceTime.Add(delay)now := time.Now()// 如果当前时间未到处理时间,计算需要休眠的时间if now.Before(processTime) {sleepDuration := processTime.Sub(now)logrus.Debugf("消息需要延迟处理,topic=%s, offset=%d, 需等待 %v (产生时间: %v, 预计处理时间: %v)",topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期间监听会话关闭信号,避免阻塞重平衡select {case <-sess.Context().Done():logrus.Info("休眠期间会话关闭,停止消费")return nilcase <-time.After(sleepDuration):// 休眠完成,继续处理}}// 延迟时间已到,处理消息h.processMessage(msg)// 标记消息为已消费sess.MarkMessage(msg, "")}return nil
}// 实际业务处理逻辑
func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof("处理延迟消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 产生时间=%v",msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 这里添加实际的业务处理代码
}// 初始化消费者示例
func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors = trueconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange// 确保消息的Timestamp是创建时间(需要Kafka broker配置支持)config.Consumer.Fetch.Min = 1config.Consumer.Fetch.Default = 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config)
}func main() {brokers := []string{"localhost:9092"}groupID := "delay-queue-group"topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}consumer, err := newDelayConsumer(brokers, groupID)if err != nil {logrus.Fatalf("创建消费者失败: %v", err)}defer consumer.Close()handler := &delayConsumerHandler{}ctx := context.Background()// 持续消费for {if err := consumer.Consume(ctx, topics, handler); err != nil {logrus.Errorf("消费出错: %v", err)// 简单重试逻辑time.Sleep(5 * time.Second)}}
}

生产者实现

package mainimport ("errors""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定义允许的延迟时长(毫秒)及其对应的Topic
var allowedDelays = map[time.Duration]string{100 * time.Millisecond:  "delay-100ms",200 * time.Millisecond:  "delay-200ms",500 * time.Millisecond:  "delay-500ms",1000 * time.Millisecond: "delay-1000ms",// 可根据需要添加更多允许的延迟时长
}// DelayProducer 延迟消息生产者
type DelayProducer struct {producer sarama.SyncProducer
}// NewDelayProducer 创建延迟消息生产者实例
func NewDelayProducer(brokers []string) (*DelayProducer, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 3config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer(brokers, config)if err != nil {return nil, err}return &DelayProducer{producer: producer,}, nil
}// SendDelayMessage 发送延迟消息
// 参数:
//   - key: 消息键
//   - value: 消息内容
//   - delay: 延迟时长
// 返回:
//   - 消息的分区和偏移量
//   - 错误信息(若延迟不合法或发送失败)
func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校验延迟时长是否合法topic, ok := allowedDelays[delay]if !ok {return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")}// 2. 创建消息,设置当前时间为消息时间戳(供消费者计算延迟)msg := &sarama.ProducerMessage{Topic:     topic,Key:       sarama.ByteEncoder(key),Value:     sarama.ByteEncoder(value),Timestamp: time.Now(), // 记录消息发送时间,用于消费者计算处理时间}// 3. 发送消息partition, offset, err = p.producer.SendMessage(msg)if err != nil {logrus.Errorf("发送延迟消息失败: %v, 延迟时长: %v", err, delay)return 0, 0, err}logrus.Infof("发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v",topic, partition, offset, delay)return partition, offset, nil
}// Close 关闭生产者
func (p *DelayProducer) Close() error {return p.producer.Close()
}// 使用示例
func main() {// 初始化生产者producer, err := NewDelayProducer([]string{"localhost:9092"})if err != nil {logrus.Fatalf("初始化生产者失败: %v", err)}defer producer.Close()// 发送合法延迟消息_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("这是一条延迟消息"),100*time.Millisecond, // 合法延迟)if err != nil {logrus.Error("发送消息失败:", err)}// 尝试发送非法延迟消息(会被拒绝)_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("这是一条非法延迟消息"),300*time.Millisecond, // 不允许的延迟)if err != nil {logrus.Error("发送消息失败:", err) // 会输出非法延迟的错误}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/920797.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/920797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW转速仪校准系统

LabVIEW 与机器视觉的智能校准系统以工控机为核心&#xff0c;整合标准源、智能相机等硬件&#xff0c;通过软件实现校准流程自动化&#xff0c;支持 500-6000r/min 转速范围校准&#xff0c;覆盖 5 类转速测量仪&#xff0c;校准时间缩短约 70%&#xff0c;满足计量院高效、精…

Synchronized 概述

1. 初识 synchronized 是 Java 中的关键字&#xff0c;是一种 同步锁 &#xff0c;可重入锁&#xff0c;悲观锁。它修饰的对象有以下几种&#xff1a; 具体表现为以下3种形式。 对于普通同步方法&#xff0c;锁是当前实例对象。 对于静态同步方法&#xff0c;锁是当前类的 Clas…

通过Auth.log来查看VPS服务器是否被扫描和暴力破解及解决办法

说明&#xff1a;很多人vps可能出现过被扫的情况&#xff0c;有的还被爆破了&#xff0c;这里提供下查看方法 查看用密码登陆成功的IP地址及次数grep "Accepted password for root" /var/log/auth.log | awk {print $11} | sort | uniq -c | sort -nr | more查看用密…

碰一碰发视频手机版源码开发:支持OEM

**从事开发 20 年&#xff0c;见过不少技术风口起起落落&#xff0c;最近 “碰一碰发视频” 又成了热门话题。不少同行或刚入行的年轻人来问我&#xff0c;手机版源码开发该从哪下手&#xff0c;怕踩坑、怕走弯路。今天就以一个老程序员的视角&#xff0c;把碰一碰发视频手机版…

只出现一次的数字(总结)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、给定一个整数数组nums&#xff0c;除了某个元素只出现一次以外&#xff0c;其余元素均出现两次。找出那个只出现一次的元素二、给你一个整数数组nums&#x…

Cesium 入门教程(十一):Camera相机功能展示

文章目录一&#xff0c;Cesium 实际示例&#xff08;含源代码&#xff09;1&#xff0c;vuecesium&#xff1a; 围绕一个固定点自动左右旋转2&#xff0c;vuecesium&#xff1a; flyto一个具体的实体位置3&#xff0c;vuecesium&#xff1a; flyto一个具体的点位置4&#xff0c…

go语言基本排序算法

package mainimport "fmt"func main() {BubbleSort()SelectSort()InsertSort()MergeSort()QuickSort()HeapSort()ShellSort() }//冒泡排序 func BubbleSort() {str : []int{9, 1, 5, 8, 3, 7, 4, 6, 2}for i : 0; i < len(str)-1; i {flag : falsefor j : len(str…

一步完成CalDAV账户同步,日历服务助力钉钉日历日程集中管理

在信息爆炸节奏飞快的今天&#xff0c;高效的管理时间已经成为我们工作和生活中的核心竞争力&#xff0c;复杂纷繁的日程安排&#xff0c;无处不在的提醒需求以及跨设备同步的困扰&#xff0c;这些问题仿佛都在呼唤着一个更智能、更便捷、更可靠的解决方案。 而华为日历App&am…

企业内部机密视频安全保护|如何防止企业内部机密视频泄露?

在企业数字化进程飞速发展的今天&#xff0c;视频内容已成为承载企业内部培训、战略会议、产品机密和核心技术的关键载体。一次意外的泄露&#xff0c;不仅可能导致知识产权流失&#xff0c;更会让企业声誉和市场竞争力遭受重创。面对无孔不入的安全威胁&#xff0c;企业该如何…

C# Deconstruct | 简化元组与对象的数据提取

官方文档&#xff1a;析构元组和其他类型 - C# | Microsoft Learn 标签&#xff1a;Deconstruct、Tuple、record、模式匹配 PS&#xff1a;record相关内容后续还会继续更新&#x1f504; 模式匹配可以查看我的另一篇&#x1f449;模式匹配 目录1. 概述2. 基本用法2.1 元组解…

R 语言 ComplexUpset 包实战:替代 Venn 图的高级集合可视化方案

摘要 在生物信息学、数据挖掘等领域的集合分析中,传统 Venn 图在多维度数据展示时存在信息拥挤、可读性差等问题。本文基于 R 语言的 ComplexUpset 包,以基因表达研究为场景,从包安装、数据准备到可视化实现,完整演示如何制作正刊级别的集合交集图,解决多条件下差异基因(…

​导游|基于SprinBoot+vue的在线预约导游系统

在线预约导游系统 基于SprinBootvue的在线预约导游系统 一、前言 二、系统设计 三、系统功能设计 前台功能实现 后台功能实现 管理员模块实现 导游模块实现 用户模块实现 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&am…

SQL server 异常 出现错误 824

2025-08-27 01:36:37,324 ERROR c.z.i.w.DatabaseUtils [Scheduled-7] Error executeStoredProcedure SQL script: sp_RefreshDWDByDateFive警告: 在 08 27 2025 1:36AM 出现错误 824。请记录该错误和时间&#xff0c;并与您的系统管理员联系。 2025-08-27 01:36:37,332 ERROR …

制造业生产线连贯性动作识别系统开发

制造业生产线连贯性动作识别系统开发 第一部分&#xff1a;项目概述与理论基础 1.1 项目背景与意义 在现代智能制造环境中&#xff0c;尽管自动化程度不断提高&#xff0c;但人工操作仍然在复杂装配任务中扮演着不可替代的角色。研究表明&#xff0c;人机协作被视为打破传统人机…

什么是Jmeter? Jmeter工作原理是什么?

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 第一篇 什么是 JMeter&#xff1f;JMeter 工作原理 1.1 什么是 JMeter Apache JMeter 是 Apache 组织开发的基于 Java 的压力测试工具。用于对软件做压力测试&a…

Linux网络基础1(一)之计算机网络背景

文章目录计算机网络背景网络发展认识 "协议"高小琴例子方言例子计算机网络背景 网络发展 独立模式: 计算机之间相互独立; 网络互联: 多台计算机连接在一起, 完成数据共享; 局域网LAN: 计算机数量更多了, 通过交换机和路由器连接在一起; 广域网WAN: 将远隔千里的计算…

如何在数学建模赛中实现模型创新?

模型创新性在国赛数学建模中&#xff0c;完备性是论文的基本要求&#xff0c;而创新性则是决定论文能否脱颖而出的关键因素。所谓创新&#xff0c;并不仅仅指提出完全新颖的数学理论&#xff0c;而是能够在已有方法的基础上&#xff0c;通过新的问题切入点、假设修正、模型优化…

【重磅发布】flutter_chen_updater-版本升级更新

Flutter Chen Updater 一个功能强大的Flutter应用内更新插件&#xff0c;支持Android APK自动下载、安装和iOS跳转App Store。 ✨ 特性 ✅ 跨平台支持: Android APK自动更新&#xff0c;iOS跳转App Store✅ 智能下载: 支持断点续传、文件校验、多重备用方案✅ 权限管理: 自动处…

docker 1分钟 快速搭建 redis 哨兵集群

使用 docker-compose 1 分钟搭建好 1主2从3哨兵的 redis 哨兵集群 目录结构 redis-sentinel-cluster ├── check_redis.sh ├── docker-compose.yml ├── redis │ └── redis.conf ├── sentinel │ └── sentinel.confdocker-compose.yml 配置 version: 3…

Git与DevOps实战:从版本控制到自动化部署

一、版本控制1.什么是版本控制&#xff1f;版本控制用于高效追踪和管理项目开发中的代码、配置及文档变更历史&#xff0c;确保团队成员始终使用正确版本&#xff0c;并支持版本回溯、差异比较和文件恢复。它能带来以下优势&#xff1a;通过历史记录保障数据安全与完整性&#…