Flink Sql 按分钟或日期统计数据量

一、环境版本

环境版本
Flink1.17.0
Kafka2.12
MySQL5.7.33

【注意】Flink 1.13版本增加Cumulate Window,之前版本Flink Sql 没有 Trigger 功能,长时间的窗口不能在中途触发计算,输出中间结果。比如每 10S 更新一次截止到当前的pv、uv。只能用Trigger配合State实现,可参考如下实现方式:
Flink DataStream 按分钟或日期统计数据量

二、MySQL建表脚本

create table user_log
(id      int auto_increment comment '主键'primary key,uid     int    not null comment '用户id',event   int    not null comment '用户行为',logtime bigint null comment '日志时间'
)comment '用户日志表,作为验证数据源';

三、用户日志类

新建maven项目

用以定义Kafka和MySQL中Schema

/*** 用户日志类*/
@Data
public class UserLog {//用户uidprivate int uid;//用户行为private int event;//日志时间private Date logtime;//获取日期,用于按日期统计数据public String getFormatDate() {return DateUtil.format(logtime, "yyyyMMdd");}//获取时间,精确到分钟public String getFormatTime() {return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";}
}
}

四、用户数据生成器

/*** 用户数据生成器*/
public class UserLogGenerator {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.自定义数据生成器SourceDataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(// 指定GeneratorFunction 实现类new GeneratorFunction<Long, UserLog>(){// 定义随机数数据生成器public RandomDataGenerator generator;@Overridepublic void open(SourceReaderContext readerContext) throws Exception {generator = new RandomDataGenerator();}@Overridepublic UserLog map(Long aLong) throws Exception {UserLog userLog = new UserLog();//随机生成用户uiduserLog.setUid(generator.nextInt(1, 50));//随机生成用户行为userLog.setEvent(generator.nextInt(1, 2));//随机生成用户数据时间userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));return userLog;}},// 指定输出数据的总行数
//                60 * 60 * 10,1200,// 指定每秒发射的记录数RateLimiterStrategy.perSecond(10),// 指定返回值类型, 将Java的StockPrice封装成到TypeInformationTypeInformation.of(UserLog.class));DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");//输出生成数据
//        dataGeneratorSourceStream.print();//kafka数据写入KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder().setBootstrapServers("hadoop01:9092").setRecordSerializer(KafkaRecordSerializationSchema.<UserLog>builder().setTopic("userLog").setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes()).build()).build();dataGeneratorSourceStream.sinkTo(kafkaSink);//MySQL数据写入,用以数据验证SinkFunction<UserLog> jdbcSink = JdbcSink.sink("insert into user_log (uid, event, logtime) values (?, ?, ?)",new JdbcStatementBuilder<UserLog>() {@Overridepublic void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {preparedStatement.setInt(1, userLog.getUid());preparedStatement.setInt(2, userLog.getEvent());preparedStatement.setLong(3, userLog.getLogtime().getTime());}},JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://192.168.31.116:3306/demo").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("root").build());dataGeneratorSourceStream.addSink(jdbcSink);env.execute();}
}

五、Sql按分钟或日期统计PV和UV

public class UserLogSql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);env.setParallelism(1);// 创建一个输入表SourceTableString sourceDDL = "create table user_log\n" +"(\n" +"    uid  INT\n" +"    , event INT\n" +"    , logtime BIGINT\n" +"    , rowtime AS TO_TIMESTAMP_LTZ(logtime, 3)\n" +"    , WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND\n" +") with (\n" +"      'connector' = 'kafka'\n" +"      ,'topic' = 'userLog'\n" +"      ,'properties.bootstrap.servers' = 'hadoop01:9092'\n" +"      ,'scan.startup.mode' = 'latest-offset'\n" +"      ,'format' = 'json'\n" +");";tableEnv.executeSql(sourceDDL);// 统计每分钟PV和UVString result = "select\n" +" date_format(window_start, 'yyyy-MM-dd') cal_day\n" +" , date_format(window_start, 'HH:mm:ss') start_time\n" +" , date_format(window_end, 'HH:mm:ss') end_time\n" +" , count(uid) pv\n" +" , count(distinct uid) uv\n" +"FROM TABLE(\n" +// 每隔10秒触发一次计算,窗口大小为1天
//                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))\n" +// 每隔10秒触发一次计算,窗口大小为10秒"    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '10' SECOND))\n" +"  GROUP BY window_start, window_end\n" +";";// 输出sql执行结果tableEnv.executeSql(result).print();}
}

六、sql-client方式执行Sql

# 建表语句
create table user_log
(uid  INT,event INT,logtime BIGINT,rowtime AS TO_TIMESTAMP_LTZ(logtime, 3) ,WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) with ('connector' = 'kafka','topic' = 'userLog''properties.bootstrap.servers' = 'hadoop01:9092','scan.startup.mode' = 'latest-offset','format' = 'json',
);# pv、uv计算语句, 每隔10秒触发一次计算,窗口大小为1天
selectdate_format(window_start, 'yyyy-MM-dd') cal_day,date_format(window_start, 'HH:mm:ss') start_time,date_format(window_end, 'HH:mm:ss') end_time,count(uid) pv,count(distinct uid) uv
FROM TABLE(CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))GROUP BY window_start, window_end;

七、数据验证

  1. 启动 UserLogGenerator
  2. 启动 UserLogSql或在sql-client执行Sql
  3. 在MySQL中验证查询

转换时间戳

时间戳转换前转换后
w_start2025-08-16 14:45:401755326740000
w_end2025-08-16 14:45:501755326750000
select count(distinct uid) from user_log where logtime< 1755326750000 and logtime>=1755326740000;
# 与MySql中输出一致SQL Query Result (Table)                                                               Refresh: 1 s                                                      Page: Last of 1                                              Updated: 23:50:09.972 cal_day                     start_time                       end_time                   pv                   uv2025-08-15                       23:45:30                       23:45:40                   15                   152025-08-15                       23:45:40                       23:45:50                  101                   452025-08-15                       23:45:50                       23:46:00                  104                   422025-08-15                       23:46:00                       23:46:10                  100                   422025-08-15                       23:46:10                       23:46:20                   97                   452025-08-15                       23:46:20                       23:46:30                  104                   402025-08-15                       23:46:30                       23:46:40                   97                   422025-08-15                       23:46:40                       23:46:50                   99                   442025-08-15                       23:46:50                       23:47:00                  103                   442025-08-15                       23:47:00                       23:47:10                   97                   442025-08-15                       23:47:10                       23:47:20                  100                   43

八、常见问题

  1. sql-client执行查询,缺少kafka包
# 运行SQL命令
Flink SQL> select * from user_log;
# 报错
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

解决方法

# 下载flink对应版本的kafka包,放到flink的lib目录下
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar -P ${FLINK_HOME}/lib/

九、参考鸣谢

Flink 实时统计历史 pv、uv
Flink Cumulate Window

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/93347.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/93347.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 2460.对数组执行操作

给你一个下标从 0 开始的数组 nums &#xff0c;数组大小为 n &#xff0c;且由 非负 整数组成。 你需要对数组执行 n - 1 步操作&#xff0c;其中第 i 步操作&#xff08;从 0 开始计数&#xff09;要求对 nums 中第 i 个元素执行下述指令&#xff1a; 如果 nums[i] nums[i …

深入解析 @nestjs/typeorm的 forRoot 与 forFeature

nestjs/typeorm 是 NestJS 与 TypeORM 集成的官方模块&#xff0c;提供了 forRoot() 和 forFeature() 两个核心静态方法用于配置数据库连接和实体注册。本文将深入解析这两个方法的机制、使用场景和最佳实践。 一、TypeOrmModule.forRoot() - 全局数据库配置 forRoot() 方法用于…

关于simplifyweibo_4_moods数据集的分类问题

本来打算用情感分类数据集拿Transformer模型来练练手&#xff0c;发现训练效果并不好。当我分析了这个数据集的标签后发现问题了&#xff1a; 查看标签的分布&#xff1a; import pandas as pd# 先直接读取数据&#xff0c;不进行后续处理 data_file ~/data/simplifyweibo_4_m…

Custom SRP - Baked Light

https://catlikecoding.com/unity/tutorials/custom-srp/baked-light/本篇教程介绍将静态光照烘焙到 light map 和 light prob 中.首先贴上我遇到的问题,希望遇到的同学帮忙解答:实践本教程过程中,定义的 MetaPass 没有效果, Unity 始终在使用默认的 meta pass,我使用的是 unit…

[Python]PTA:实验2-3-1-for 求1到100的和

本题要求编写程序&#xff0c;计算表达式 1 2 3 ... 100 的值。输入格式&#xff1a;本题无输入。输出格式&#xff1a;按照以下格式输出&#xff1a;sum 累加和代码如下&#xff1a;x0 for i in range(1,101,1):xi print("sum {}".format(x))

【解决笔记】MyBatis-Plus 中无 selectList 方法

MyBatis-Plus 中无 selectList 方法的解决笔记 核心前提 MyBatis-Plus 的 BaseMapper 接口内置了 selectList 等基础查询方法&#xff0c;继承该接口可直接使用&#xff0c;无需手动实现。 无 selectList 方法的两种情况及解决方式 1. 未继承 BaseMapper&#xff08;推荐方案&a…

一周学会Matplotlib3 Python 数据可视化-绘制箱线图(Box)

锋哥原创的Matplotlib3 Python数据可视化视频教程&#xff1a; 2026版 Matplotlib3 Python 数据可视化 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 课程介绍 本课程讲解利用python进行数据可视化 科研绘图-Matplotlib&#xff0c;学习Matplotlib图形参数基本设置&…

4.4 vue3生命周期函数

vue3生命周期函数生命周期钩子名称对比表阶段Vue 2 选项式 APIVue 3 组合式 API说明创建前beforeCreateonBeforeCreate&#xff08;已废弃&#xff09;Vue 3 中 setup() 替代创建完成createdsetup()&#xff08;替代&#xff09;setup 是入口&#xff0c;代替 beforeCreate 和 …

无脑整合springboot2.7+nacos2.2.3+dubbo3.2.9实现远程调用及配置中心

简介&#xff1a; 好久没有写博客了&#xff0c;最近辞职了有时间进行一次分享&#xff0c;今天我们主要是使用单体服务springboot整合nacos实现配置中心&#xff0c;然后整合dubbo来实现远程的rpc调用。如下是本地案例架构图&#xff0c;生产者和消费者的配置在nacos配置中心上…

腾讯位置商业授权微信小程序逆地址解析(坐标位置描述)

微信小程序JavaScript SDK 开发指南 逆地址解析(坐标位置描述) reverseGeocoder(options:Object) 本接口提供由坐标到坐标所在位置的文字描述的转换&#xff0c;输入坐标返回地理位置信息和附近poi列表。 注&#xff1a;坐标系采用gcj02坐标系 options属性说明 属性类型必填…

3D商品展示:技术狂欢下的普及困局

当微软推出Copilot 3D——仅需一张照片即可生成可编辑的3D模型时&#xff0c;业界曾欢呼“建模门槛彻底消失”。然而技术的美好愿景却撞上现实的铜墙铁壁&#xff1a;当前电商平台3D商品加载卡顿导致用户跳出率超60%&#xff0c;企业3D化渗透率仍不足34%。绚烂的技术烟花下&…

(Arxiv-2025)Stand-In:一种轻量化、即插即用的身份控制方法用于视频生成

Stand-In&#xff1a;一种轻量化、即插即用的身份控制方法用于视频生成 paper是WeChat发布在Arxiv 2025的工作 paper title:Stand-In: A Lightweight and Plug-and-Play Identity Control for Video Generation Code&#xff1a;链接 图1&#xff1a;给定一张参考图像&#xff…

数据科学与爬虫技术学习笔记

数据科学与爬虫技术学习笔记 一、数据科学基础库 1. NumPy&#xff1a;数值计算的基石 NumPy 是 Python 科学计算的核心库&#xff0c;专为数组和矩阵操作设计&#xff0c;能大幅简化循环操作&#xff0c;提供丰富的数学函数。 核心优势&#xff1a;高效处理同类型元素的多维…

学习嵌入式之硬件——I2C

一、I2C1.定义内部集成电路的简称&#xff0c;半双工串行同步通信&#xff0c;是芯片和芯片之间的通信方式&#xff1b;通常只有一个主机&#xff0c;多个从机&#xff0c;采用主从应答的方式上图所示是IIC的总线的使用场景&#xff0c;所有挂载在IIC总线上的设备都有两根信号线…

使用websockt

封装websocktHooksimport { ref, onMounted, onUnmounted } from vue;/*** webSocket的Hooks* param {string} websocket链接地址* */ export function useWebSocket(url: string) {// 核心状态 const data: Ref<any> ref(null);//收到websocket返回的数据const socke…

Jmeter自定义脚本

目录 log&#xff1a;输出类 Label&#xff1a;你自定义的组件的名称 FileName&#xff1a;添加的脚本文件的文件名 Parameters&#xff1a;你传入的参数&#xff0c;是一个字符串 args&#xff1a;你传入的参数&#xff0c;是一个数组 Parameters和args的异同&#xff1…

飞算 JavaAI 电商零售场景实践:从订单峰值到供应链协同的全链路技术革新

目录 一、电商核心场景的技术攻坚 1.1 分布式订单系统的事务一致性设计 1.1.1 TCC 模式下的订单创建流程 1.1.2 订单状态机的可靠流转 1.2 高并发秒杀系统的架构设计 1.2.1 多级限流与流量削峰 1.2.2 库存防超卖机制 1.3 智能推荐与用户行为分析 1.3.1 用户行为实时采…

51单片机-51单片机介绍

51单片机介绍单片机简介什么是单片机呢&#xff1f;单片机是一种集成电路芯片&#xff0c;采用超大规模集成电路技术将中央处理器&#xff08;CPU&#xff09;、随机存储器&#xff08;RAM&#xff09;、只读存储器&#xff08;ROM&#xff09;、多种I/O口、中断系统、定时器/计…

8月AI面试工具测评:破解规模化招聘难题

金秋校招临近&#xff0c;企业面临“百万简历涌入VS面试官团队告急”的典型困境。传统线下面试效率低下、标准参差&#xff0c;难以应对短时间内爆发式的人才筛选需求。AI面试工具凭借自动化与智能化特性成为破局关键&#xff0c;但市面上产品良莠不齐——究竟哪款能兼顾效率与…

Debian新一代的APT软件源配置文件格式DEB822详解

Debian 的 DEB822 格式详解&#xff1a;新一代 APT 源配置 DEB822 是一种基于 RFC 822 数据格式的配置文件语法&#xff0c;Debian 新一代的 APT 软件源配置文件格式就采用了 DEB822。DEB822 格式从 Debian 11 (Bullseye) 开始被引入&#xff0c;并在 Debian 12 (Bookworm) 中成…