[特殊字符] 使用增量同步+MQ机制将用户数据同步到Elasticsearch

在开发用户搜索功能时,我们通常会将用户信息存储到 Elasticsearch(简称 ES) 中,以提高搜索效率。本篇文章将详细介绍我们是如何实现 MySQL 到 Elasticsearch 的增量同步,以及如何通过 MQ 消息队列实现用户信息实时更新 的机制。

一、整体思路

为了保证用户数据在 MySQL 与 ES 之间保持一致,我们采用了以下 双通道同步策略

  1. 定时任务 + 游标机制:实现 MySQL 到 ES 的增量同步

  2. 通过 MQ(消息队列) 实现实时同步用户更新/删除操作到 ES


二、定时任务增量同步逻辑详解

我们定义了一个定时任务 syncUserDataToESJob,主要用于从 user 表中 增量拉取变动数据,并同步到 ES。

✨ 增量拉取机制

为了避免全量同步的高开销,我们使用了 “更新时间 + 主键 ID”双重游标,实现分页增量同步:

List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);

其中:

  • lastSyncTime 表示上次同步的最大更新时间

  • lastMaxId 用于处理相同更新时间下的并发写入

🧠 同步逻辑核心代码如下:

@XxlJob("syncUserDataToESJob")
@GlobalTransactional
public void syncUserData() {Date lastSyncTime = syncPointService.getLastSyncTime();Long lastMaxId = syncPointService.getLastMaxId();if (lastSyncTime == null) {lastSyncTime = new Date(0); // 默认从最早开始lastMaxId = 0L;}Date maxUpdateTime = lastSyncTime;Long maxId = lastMaxId;boolean hasNewData = false;while (true) {List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);if (usersBatch.isEmpty()) break;hasNewData = true;List<EsUserDoc> esDocs = usersBatch.stream().map(this::convertToEsDoc).collect(Collectors.toList());esClient.bulkIndex(esDocs);for (User u : usersBatch) {Date updateTime = u.getUpdateTime();if (updateTime.after(maxUpdateTime)) {maxUpdateTime = updateTime;maxId = u.getId();} else if (updateTime.equals(maxUpdateTime) && u.getId() > maxId) {maxId = u.getId();}}lastSyncTime = maxUpdateTime;lastMaxId = maxId;}// 同步删除数据List<Long> deletedUserIds = userClient.selectDeletedUserIds(syncPointService.getLastSyncTime(), syncPointService.getLastMaxId());if (!deletedUserIds.isEmpty()) {esClient.bulkDeleteByIds(deletedUserIds);}if (hasNewData) {log.info("更新同步点:maxUpdateTime = {}, maxId = {}", maxUpdateTime, maxId);syncPointService.updateLastSyncPoint(maxUpdateTime, maxId);} else {log.info("本次没有增量数据,不更新同步点");}
}

📝 特别说明:

  • syncPointService 用于记录上次同步的时间点和 ID,保证每次定时任务可重复安全执行。

  • 如果服务中断重启,也不会造成数据丢失或重复。


三、用户修改通过 MQ 实时同步到 ES

虽然定时任务可以周期性同步,但如果用户更新昵称、头像、标签等信息,等待下一次定时任务才能生效,可能会造成 数据延迟

为此,我们引入了 消息队列机制,实现实时更新:

✅ 使用 MQ 的同步方案

  1. 用户信息发生变化时,在业务服务中发送一条消息:

UserUpdateMessage message = new UserUpdateMessage(userId);
rabbitTemplate.convertAndSend("user.topic.exchange", "user.update", message);
  1. 在 ES 同步服务中监听消息并处理:

@RabbitListener(queues = "user.update.queue")
public void onUserUpdate(UserUpdateMessage msg) {User user = userClient.getUserById(msg.getUserId());if (user != null) {EsUserDoc doc = convertToEsDoc(user);esClient.index(doc);}
}

💡 好处:

  • 实时:用户更新后立即同步到 ES

  • 解耦:业务逻辑与搜索逻辑分离

  • 高性能:避免频繁更新 ES


四、总结与展望

通过“定时任务 + 增量游标” 和 “消息队列实时更新” 的结合方案,我们实现了对用户数据高效且可靠的同步到 Elasticsearch。

同步方式特点使用场景
定时任务批量、容错性强周期性同步新增/修改/删除
MQ 实时快速、解耦用户主动更新资料时快速生效

未来我们还可以扩展以下能力:

  • 引入 Canal + Binlog 监听实现更彻底的实时同步

  • 支持多租户分库分表的场景下数据同步

  • 引入失败重试机制保障消息不丢


希望本文对你在做数据同步或 ES 架构设计时有所启发,欢迎点赞、收藏、评论交流!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/81289.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis缓存机制全解析

在MyBatis中&#xff0c;缓存分为一级缓存和二级缓存&#xff0c;它们的主要目的是减少数据库的访问次数&#xff0c;提高查询效率。下面简述这两种缓存的工作原理&#xff1a; 一、 一级缓存&#xff08;SqlSession级别的缓存&#xff09; 一级缓存是MyBatis默认开启的缓存机…

【短距离通信】【WiFi】WiFi7关键技术之4096-QAM、MRU

目录 3. 4096-QAM 3.1 4096-QAM 3.2 QAM 的阶数越高越好吗&#xff1f; 4. MRU 4.1 OFDMA 和 RU 4.2 MRU 资源分配 3. 4096-QAM 摘要 本章主要介绍了Wi-Fi 7引入的4096-QAM对数据传输速率的提升。 3.1 4096-QAM 对速率的提升 Wi-Fi 标准一直致力于提升数据传输速率&a…

【二刷力扣】【力扣热题100】今天的题目是:283.移动零

题目&#xff1a; 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2: 输…

机器学习中的多GPU训练模式

文章目录 一、数据并行&#xff08;Data Parallelism&#xff09;二、模型并行&#xff08;Model Parallelism&#xff09;1. 模型并行2. 张量并行&#xff08;Tensor Parallelism&#xff09; 三、流水线并行&#xff08;Pipeline Parallelism&#xff09;四、混合并行&#x…

《JavaScript 性能优化:从原理到实战的全面指南》

《JavaScript 性能优化&#xff1a;从原理到实战的全面指南》 一、JavaScript 性能优化基础理论 在深入探讨 JavaScript 性能优化技术之前&#xff0c;我们需要明白JavaScript 的执行机制和性能瓶颈产生的根本原因。JavaScript 是一种单线程、非阻塞的脚本语言&#xff0c;其…

选择合适的Azure数据库监控工具

Azure云为组织提供了众多服务&#xff0c;使其能够无缝运行应用程序、Web服务和服务器部署&#xff0c;其中包括云端数据库部署。Azure数据库能够与云应用程序实现无缝集成&#xff0c;具备可靠、易扩展和易管理的特性&#xff0c;不仅能提升数据库可用性与性能&#xff0c;同时…

9.4在 VS Code 中配置 Maven

在 VS Code 中配置 Maven 需要完成 Maven 环境安装 一、安装 Maven&#xff08;如果未安装&#xff09; 下载 Maven 访问 Apache Maven 官网&#xff0c;下载最新版本的 Maven&#xff08;如apache-maven-3.9.9-bin.zip&#xff09;。 解压文件 将下载的 ZIP 文件解压到本地目…

影刀自动化流程复用技巧:流程复用

草莓时刻会创建一个新的空白流程。但是很多时候需要复用过往基础流程&#xff0c;在此基础上进行修改即可。而而不是重新创建基础流程。 为了解决这个问题&#xff0c;我们需要了解一下影刀流程的基础结构。 影刀流程基础结构概览 影刀自动化流程的基础结构主要包括几个关键组…

理论篇六:如何在Webpack中实现持久化缓存?

在 Webpack 中实现持久化缓存可以显著提升构建速度,尤其是在大型项目中。以下是 7 种核心策略 及其详细配置方法: 一、文件哈希命名(Content Hash) 确保文件内容变化时哈希值才改变,利用浏览器缓存。 // webpack.config.js output: {filename: [name].[contenthash:8].j…

C++单例模式与线程安全

C单例模式的线程安全实践与优化-CSDN博客 https://www.zhihu.com/question/56527586/answer/2344903391 C11中的单例模式 在C11及更高版本中&#xff0c;可以使用std::call_once和std::once_flag来确保单例实例的线程安全初始化。这种方法不需要显式地使用互斥锁&#xff0c…

UE5 图片导入,拖到UI上变色

UE5会自动把蓝色的图片当成法线贴图处理&#xff0c;非常傻逼 双击出问题的图片&#xff0c;右侧面板将压缩设置从法线改回默认

服务器安装xfce桌面环境并通过浏览器操控

最近需要运行某个浏览器的脚本&#xff0c;但是服务器没有桌面环境&#xff0c;无法使用&#xff0c;遂找到了KasmVNC&#xff0c;并配合xfce实现低占用的桌面环境&#xff0c;可以直接使用浏览器进行操作 本文基于雨云——新一代云服务提供商的Debian11服务器操作&#xff0c;…

Python函数全面解析:从基础到高级特性

文章目录 Python函数全面解析&#xff1a;从基础到高级特性一、函数基础概念1. 什么是函数&#xff1f;2. 函数的组成部分 二、函数的参数传递1. 参数类型对比2. 参数传递示例 三、函数的作用域作用域示例global和nonlocal关键字 四、函数的属性和方法1. 函数的特殊属性2. 函数…

Ubuntu20.04的安装(VMware)

1.Ubuntu20.04.iso文件下载 下载网址&#xff1a;ubuntu-releases-20.04安装包下载_开源镜像站-阿里云 2.创建虚拟环境 2.1打开VMware与创建新虚拟机 点击创建新虚拟机 如果没下好可以点击稍后安装操作系统 选择linux版本选择Ubuntu 64位然后点击下一步。 注意这里需要选择一…

Kafka 的日志清理策略:delete 和 compact

Kafka delete 日志清理策略&#xff08;日志删除&#xff09; 原理&#xff1a;按照一定保留策略&#xff0c;直接删除不符合条件的日志分段。Kafka 把 topic 的一个 partition 大文件分成多个小文件段&#xff0c;通过这种方式&#xff0c;能方便地定期清除或删除已消费完的文…

Go语言中常量的命名规则详解

1. 常量的基本命名规则 1.1. 命名格式 1. 使用const关键字声明&#xff1b; 2. 命名格式&#xff1a;const 常量名 [类型] 值&#xff1b; 3. 类型可以省略&#xff0c;由编译器推断&#xff1b; 1.2. 命名风格 大小写规则&#xff1a; 1. 首字母大写&#xff1a;导出常…

22、web场景-web开发简介

22、web场景-web开发简介 Web开发是指创建和维护在互联网上运行的网站和应用程序的过程。它涉及多个技术领域&#xff0c;包括前端开发、后端开发和数据库管理&#xff0c;共同实现网站的功能和用户体验。 ### 一、Web开发的基本概念 #### 1. **Web应用程序** - **狭义上**&am…

Structured Query Language(SQL)它到底是什么?

Structured Query Language&#xff08;SQL&#xff09; 的中文意思是 “结构化查询语言”&#xff0c;它是一种专门用于管理和操作关系型数据库的标准化编程语言。以下是其核心含义和用途的总结&#xff1a; 1. 核心功能 定义数据&#xff1a;创建、修改数据库结构&#xff08…

ubuntu22.04上运行opentcs6.4版本

1、下载github上的源码&#xff1a; openTCS - Downloads 2、安装java21 我的版本是&#xff1a;java --version java 21.0.6 2025-01-21 LTS Java(TM) SE Runtime Environment (build 21.0.68-LTS-188) Java HotSpot(TM) 64-Bit Server VM (build 21.0.68-LTS-188, mixed mo…

游戏引擎学习第307天:排序组可视化

简短谈谈直播编程的一些好处。 上次结束后&#xff0c;很多人都指出代码中存在一个拼写错误&#xff0c;因此这次我们一开始就知道有一个 bug 等待修复&#xff0c;省去了调试寻找错误的时间。 今天的任务就是修复这个已知 bug&#xff0c;然后继续排查其他潜在的问题。如果短…