flink sql读hive catalog数据,将string类型的时间戳数据排序后写入kafka,如何保障写入kafka的数据是有序的

在 Flink SQL 中,要确保从 Hive 读取的 STRING 类型时间戳数据排序后有序写入 Kafka,需要结合 批处理模式时间类型转换单分区写入Kafka 生产者配置。以下是完整解决方案:

一、核心解决方案

1. 批处理模式 + 全局排序

将作业设置为批处理模式,并对字符串时间戳进行类型转换后排序:

-- 设置为批处理模式(关键!)
SET 'execution.runtime-mode' = 'batch';-- 从 Hive 读取数据
CREATE TABLE hive_source (id STRING,ts_str STRING,  -- 字符串类型的时间戳,如 '2024-01-01 12:00:00' 或 '1718524800000'value DOUBLE
) WITH ('connector' = 'hive','catalog-name' = 'myhive','database-name' = 'default','table-name' = 'my_table'
);-- 创建 Kafka 目标表(单分区)
CREATE TABLE kafka_sink (id STRING,ts_str STRING,value DOUBLE
) WITH ('connector' = 'kafka','topic' = 'output_topic','properties.bootstrap.servers' = 'kafka:9092','properties.max.in.flight.requests.per.connection' = '1',  -- 确保生产者按顺序发送'properties.acks' = 'all',  -- 等待所有副本确认'format' = 'json'
);-- 转换时间戳类型并全局排序后写入 Kafka
INSERT INTO kafka_sink
SELECT id,ts_str,value
FROM hive_source
ORDER BY CASE WHEN REGEXP_EXTRACT(ts_str, '^\\d{4}-\\d{2}-\\d{2}', 0) != '' THEN TO_TIMESTAMP(ts_str)  -- 处理 'yyyy-MM-dd HH:mm:ss' 格式ELSE TO_TIMESTAMP_LTZ(CAST(ts_str AS BIGINT), 3)  -- 处理毫秒时间戳END ASC;  -- 按时间升序排列

2. 强制写入单 Kafka 分区

通过 固定分区键 确保所有数据写入同一 Kafka 分区:

-- 创建带分区键的 Kafka 表
CREATE TABLE kafka_sink (id STRING,ts_str STRING,value DOUBLE,partition_key STRING  -- 用于分区的字段
) WITH ('connector' = 'kafka','topic' = 'output_topic','properties.bootstrap.servers' = 'kafka:9092','format' = 'json','sink.partitioner' = 'fixed'  -- 使用固定分区器
);-- 写入时指定相同分区键(确保所有数据在同一分区内有序)
INSERT INTO kafka_sink
SELECT id,ts_str,value,'fixed_key' AS partition_key  -- 固定分区键,所有数据写入同一分区
FROM (SELECT *,CASE WHEN REGEXP_EXTRACT(ts_str, '^\\d{4}-\\d{2}-\\d{2}', 0) != '' THEN TO_TIMESTAMP(ts_str) ELSE TO_TIMESTAMP_LTZ(CAST(ts_str AS BIGINT), 3) END AS ts_time  -- 转换为时间类型FROM hive_source
)
ORDER BY ts_time ASC;  -- 按转换后的时间排序

二、关键配置说明

配置项作用
execution.runtime-mode = 'batch'启用批处理模式,支持全局排序(流模式仅支持时间属性字段排序)
properties.max.in.flight.requests.per.connection = '1'限制 Kafka 生产者并发请求数,确保消息按顺序发送
properties.acks = 'all'等待所有 Kafka 副本确认,保证消息不丢失
sink.partitioner = 'fixed'使用固定分区器,结合相同分区键,确保所有数据写入同一分区

三、注意事项

  1. 时间戳格式适配

    • 代码示例中通过 REGEXP_EXTRACT 自动判断格式(字符串日期或毫秒),需根据实际数据调整。
    • 若格式固定,可简化为单一转换函数(如 TO_TIMESTAMP(ts_str))。
  2. 性能与有序性权衡

    • 单分区写入会导致吞吐量下降,适合对顺序要求极高但数据量较小的场景。
    • 若数据量大,可考虑按时间窗口分组,每个窗口内有序写入不同分区。
  3. Kafka 主题配置

    • 确保 Kafka 主题的分区数至少为 1。若需更高吞吐量,可增加分区但需接受不同分区间可能乱序。

四、验证方法

  1. 检查 Kafka 消息顺序

    kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--topic output_topic \--from-beginning | jq -r '.ts_str'  # 使用 jq 解析 JSON 中的时间戳字段
    
  2. 在 Flink WebUI 中观察

    • 访问 http://jobmanager-host:8081,查看作业是否正常完成,以及 sink 算子的并行度是否为 1(若设置)。

五、总结

要保障写入 Kafka 的数据有序,需同时满足:

  1. 批处理模式:确保全局排序生效。
  2. 类型转换:将字符串时间戳正确转换为 TIMESTAMPTIMESTAMP_LTZ 类型。
  3. 单分区写入:通过固定分区键将所有数据路由到同一 Kafka 分区。
  4. 生产者配置:限制并发请求,确保消息按顺序发送和确认。

通过以上步骤,可实现从 Hive 到 Kafka 的有序数据传输。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/89188.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/89188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

7.17 滑动窗口 |assign |memo

lcp56. memo优化tle或者改用bfsclass Solution {int m, n;int dx[4] {0, 0, 1, -1};int dy[4] {1, -1, 0, 0};public:int conveyorBelt(vector<string>& matrix, vector<int>& start, vector<int>& end) {int ret INT_MAX;m matrix.size();n…

统计功效是什么?

统计功效的通俗理解可以把“统计功效”想象成侦探破案的能力——它代表统计检验&#xff08;侦探&#xff09;在犯罪事实确实存在&#xff08;真实效应存在&#xff09;时&#xff0c;成功发现真相&#xff08;检测出效应&#xff09;的概率。核心比喻假设你是一个侦探&#xf…

大语言模型(LLM)训练的教师强制(Teacher Forcing)方法

大语言模型&#xff08;LLM&#xff09;在训练时使用一种名为“教师强制&#xff08;Teacher Forcing&#xff09;”的方法&#xff0c;而不是它们在推理&#xff08;生成文本&#xff09;时使用的“自回归&#xff08;Autoregressive&#xff09;”方法 。阐明关于LLM训练的一…

归一化与激活函数:深度学习的双引擎

归一化和激活函数区别 归一化和激活函数是深度学习中两个不同但又存在关联的技术,前者聚焦于“数据分布的调整”,后者聚焦于“引入非线性与输出转换”。 Softmax 既可以被视为一种归一化操作,也属于激活函数 因为它同时满足两者的核心特征,只是从不同角度定义:从“输出…

C# --- 单例类错误初始化 + 没有释放资源导致线程泄漏

C# --- 单例类错误初始化 没有释放资源导致线程泄漏Background原因分析问题一&#xff1a; 错误初始化&#xff08;使用了箭头函数&#xff09;问题一&#xff1a; 没有Dispose资源Background 背景: service A的其中一个Api会向mq发送消息问题&#xff1a;线上发现这个服务经常…

MySQL基础学习之DML,DQL(二)

这里写目录标题一、DML1、INSERT语句1)、给指定列添加数据2)、给全部列添加数据3)、批量数据添加数据4)、操作2、UPDATE语句3、DELETE语句二、DQL1、单表查询1&#xff09;查询语法2&#xff09;查询全部3&#xff09;查询部分4&#xff09;条件查询5&#xff09;聚合函数6&…

在 Linux 系统中实现 Spring Boot 程序自动启动的最佳实践

在实际部署 Spring Boot 项目的生产环境中&#xff0c;如何确保服务自动启动&#xff08;如开机自动运行、宕机自动恢复&#xff09;是一项基础而关键的运维能力。本文将系统介绍如何在 Linux 中将 Spring Boot 应用注册为 systemd 服务&#xff0c;实现进程守护与自动启动。&a…

如何建立项目团队的自驱力文化?

建立项目团队的自驱力文化&#xff0c;关键在于赋权机制、目标共创、持续反馈、内在激励、价值认同。 其中&#xff0c;“目标共创”尤其重要。项目成员若未参与目标制定&#xff0c;仅被动接受任务&#xff0c;将很难激发责任感和参与热情。反之&#xff0c;通过共创目标&…

【React Native】布局文件-底部TabBar

布局文件-底部tabBar 内容配置 export default function Layout() {return (<Tabs />); }默认会将布局文件是将与它在同一个目录的所有文件&#xff0c;包括下级目录的文件&#xff0c;全都配置成Tab了。&#xff1a; 这样做显然不对&#xff0c;正确的做法是 在app目…

CompareFace使用

CompareFace 使用 CompareFace 有三种服务&#xff0c;分别是人脸识别&#xff08;RECOGNITION&#xff09;、人脸验证&#xff08;VERIFICATION&#xff09;、人脸检测&#xff08;DETECTION&#xff09;。 人脸识别其实就是人脸身份识别(每张照片只有一个人脸)&#xff0c;…

APP测试之Monkey压力测试

&#xff08;一&#xff09;Monkey简介 Monkey意指猴子&#xff0c;顽皮淘气。所以Monkey测试&#xff0c;顾名思义也就像猴子一样在软件上乱敲按键&#xff0c;猴子什么都不懂&#xff0c;就爱捣乱。 Monkey 是 Android SDK 自带的命令行工具&#xff0c;它通过向系统发送伪…

时序大模型为时序数据库带来的变革与机遇

时序数据&#xff08;Time Series Data&#xff09;作为记录系统状态随时间变化的重要数据类型&#xff0c;在物联网、金融交易、工业监控等领域呈爆炸式增长。传统时序数据库专注于高效存储和查询时序数据&#xff0c;而时序大模型&#xff08;Time Series Foundation Models&…

深入核心:理解Spring Boot的三大基石:起步依赖、自动配置与内嵌容器

深入核心&#xff1a;理解Spring Boot的三大基石&#xff1a;起步依赖、自动配置与内嵌容器 摘要&#xff1a;在上一章&#xff0c;我们领略了Spring Boot带来的革命性开发体验。但魔法的背后&#xff0c;必有其科学的支撑。本章将带你深入Spring Boot的内核&#xff0c;系统性…

达梦数据库配置兼容MySQL

前言 作为一名数据库管理员或开发者&#xff0c;当项目需要从MySQL迁移到达梦数据库时&#xff0c;最关心的莫过于兼容性问题。达梦作为国产数据库的佼佼者&#xff0c;提供了良好的MySQL兼容模式&#xff0c;今天我就来分享一下如何配置达梦数据库以实现对MySQL的兼容。 一、为…

js与vue基础学习

vue创建项目 安装node安装node、npm、cnpm node -v npm -v #npm服务器位置处于国外&#xff0c;下载包的速度会比较缓慢。阿里为国内用户提供的cnpm&#xff0c;他是npm的镜像&#xff0c;下载第三方包时&#xff0c;们完全可以使用cnpm来替代npm。 cnpm -v在node中执行JavaScr…

【开源.NET】一个 .NET 开源美观、灵活易用、功能强大的图表库

文章目录一、项目介绍二、适用场景三、功能模块四、功能特点五、效果展示六、开源地址一、项目介绍 LiveCharts2 是一个开源、简单、灵活、交互式且功能强大的 .NET 图表库。LiveCharts2 现在几乎可以在任何地方运行&#xff1a;Maui、Uno Platform、Blazor-wasm、WPF、WinFor…

使用Whistle自定义接口返回内容:Mock流式JSON数据全解析

一.mock接口返回数据流程 定位目标接口 在Whistle的Network面板中找到需要Mock的接口&#xff0c;右键点击请求信息&#xff0c;选择COPY -> URL复制完整URL&#xff0c;确保URL路径精确到具体接口。准备Mock数据 点击对应接口&#xff0c;在右侧面板切换到response标签页&a…

【前端】富文本编辑器插件 wangEditor 5 基本使用(Vue2)

https://www.wangeditor.com/v5 一、安装 首先安装editor yarn add wangeditor/editor # 或者 npm install wangeditor/editor --save安装Vue2组件 yarn add wangeditor/editor-for-vue # 或者 npm install wangeditor/editor-for-vue --save或者Vue3 yarn add wangeditor/…

自适应哈希索引 和 日志缓冲区

目录 1. 自适应哈希索引在内存中的位置 2. 自适应哈希索引的作用 3. 为什么要创建自适应哈希索引 4. 适应哈希索引的Key -Value如何设置&#xff1f; 5. 日志缓冲区在内存中的位置 6. 日志缓冲区的作用 7. 日志不通过LogBuffer直接写入磁盘不行吗&#xff1f; 1. 自适应哈…

中国旅行社协会在京召开“文旅人工智能应用研讨会”,助力文旅创新发展

7月15日&#xff0c;由中国旅行社协会数字经济专业委员会和在线旅行服务商分会联合主办的“人工智能技术在文旅产业中的应用”研讨会在北京举行。中国旅行社协会副会长、秘书长孙桂珍出席并致辞&#xff0c;中国工程院外籍院士、具身智能机器人专家张建伟、北京第二外国语学院旅…