Flink-05学习 接上节,将FlinkJedisPoolConfig 从Kafka写入Redis

上节成功实现了FlinkKafkaConsumer消费Kafka数据,并将数据写入到控制台,接下来将继续将计算的结果输入到redis中。

pom.xml

引入redis到pom包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.maven</groupId><artifactId>maven-plugin-api</artifactId><version>2.0</version></dependency><dependency><groupId>org.apache.maven.plugin-tools</groupId><artifactId>maven-plugin-annotations</artifactId><version>3.2</version></dependency><dependency><groupId>org.codehaus.plexus</groupId><artifactId>plexus-utils</artifactId><version>3.0.8</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><!--mybatis坐标--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.4.5</version></dependency><!--mysql驱动坐标--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.6</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-plugin-plugin</artifactId><version>3.2</version><executions><execution><phase>package</phase><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins>
</build>
</project>

KafkaProducer.java 生产数据存入Kafka

同上一节,具体代码

package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author snowsong*/
public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 集群的初始连接地址props.put("bootstrap.servers", "172.16.1.173:9092");// 序列化器 将 Java 对象序列化为字节数组props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 消息循环for (int i = 0; i < 50; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);producer.send(record);System.out.println("send: " + key);Thread.sleep(200);}// 关闭生产者producer.close();}
}

启动服务类

Flink消费Kafka,并将结果存入redis。
设置FlinkRedisConfig

   // 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);

MyRedisMapper
它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。

public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}

starApp的完整代码如下:

package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** @author snowsong*/
public class StartApp {private static final String REDIS_SERVER = "0.0.0.0";private static final Integer REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客户端的连接参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "172.16.1.173:9092");FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",new SimpleStringSchema(), properties);DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer);// 将接收的数据映射为二元组SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {/*** 将输入的字符串映射为 Tuple2 对象。** @param value 输入的字符串* @return 一个包含两个元素的 Tuple2 对象,第一个元素为 "l_words",第二个元素为输入的字符串* @throws Exception 如果发生异常,则抛出该异常*/@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});// 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。* 它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。*/public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}}}

运行结果

请添加图片描述

存入redis结果
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913051.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913051.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

git教程-pycharm使用tag打标签

一.生成tag标签 前言 当我们的代码完成了第一阶段的需求&#xff0c;版本稳定后&#xff0c;希望能出个稳定版本。于是在 commit 后需要打个 tag 标签&#xff0c;也就是我们平常说的版本号&#xff0c;如v1.0版本 本篇讲解如何使用 pycharm 打 tag 标签&#xff0c;并推送到…

PHP Error: 深入解析与处理技巧

PHP Error: 深入解析与处理技巧 引言 PHP作为一种广泛使用的服务器端脚本语言,在Web开发领域占据着重要地位。然而,任何编程语言都难以避免错误的发生。本文将深入探讨PHP错误处理的相关知识,包括错误类型、错误显示、错误日志以及错误处理技巧,帮助开发者更好地应对和解…

21、企业行政办公(OA)数字化转型:系统如何重塑企业高效运营新范式

企业行政办公是营造高效工作环境、提升员工幸福感和归属感的重要基石&#xff0c;更是传递组织温度与价值关怀的第一窗口。在数字化转型浪潮席卷各行各业的今天&#xff0c;企业行政办公领域正经历一场静默但深刻的变革。据统计&#xff0c;采用智能化OA系统的企业&#xff0c;…

基于开源AI智能名片链动2+1模式S2B2C商城小程序的抖音渠道力拓展与多渠道利润增长研究

摘要&#xff1a;在数字化商业竞争日益激烈的背景下&#xff0c;抖音平台凭借其庞大的流量基础和兴趣电商生态&#xff0c;成为品牌增长的关键阵地。渠道力作为品牌增长的核心驱动力&#xff0c;以抖音势能为内核&#xff0c;通过流量与销量的外溢效应&#xff0c;可显著提升品…

基于二维码的视频合集高效管理与分发技术

一、 视频资源聚合的技术挑战与解决方案 在企业培训、在线教育和产品展示等场景中&#xff0c;视频资源的结构化组织与高效分发始终是技术实现的核心挑战。传统方案往往面临三大痛点&#xff1a;资源碎片化导致的管理混乱、多视频序列播放的用户体验不佳、以及跨平台兼容性问题…

GPT-2论文阅读:Language Models are Unsupervised Multitask Learners

本文解析 OpenAI 2019 年发布的里程碑式论文&#xff0c;该论文首次提出了 GPT-2 模型&#xff0c;揭示了语言模型作为无监督多任务学习器的革命性潜力。文章的核心观点是&#xff1a;语言模型在无监督训练过程中&#xff0c;可以隐式地学习多种任务&#xff0c;无需特定任务微…

R 语言安装使用教程

一、R 语言简介 R 是一种用于统计分析、数据挖掘和可视化的编程语言和环境。它在学术界和数据分析领域中广泛使用&#xff0c;拥有丰富的统计函数库和绘图功能。 二、安装 R 语言 2.1 下载 R 安装包 前往 CRAN 官网下载适合你操作系统的安装程序&#xff1a; 官网地址&…

智能Agent场景实战指南 Day 1:智能Agent概述与架构设计

【智能Agent场景实战指南 Day 1】智能Agent概述与架构设计 引言 欢迎来到"智能Agent场景实战指南"系列的第一天&#xff01;今天我们将深入探讨智能Agent的基本概念和架构设计。在这个大模型时代&#xff0c;智能Agent已成为连接AI技术与实际业务场景的关键桥梁&am…

Plan-Grounded Large Language Models forDual Goal Conversational Settings

Plan-Grounded Large Language Models for Dual Goal Conversational Settings - ACL Anthologyhttps://aclanthology.org/2024.eacl-long.77/ 1. 概述 引导用户完成诸如烹饪或 DIY 之类的手动任务(Choi 等,2022),对于当前的大型语言模型(LLMs)来说是一个新颖且具有挑战…

python打卡day57@浙大疏锦行

知识点回顾 序列数据的处理&#xff1a; 处理非平稳性&#xff1a;n阶差分处理季节性&#xff1a;季节性差分自回归性无需处理 模型的选择 AR(p) 自回归模型&#xff1a;当前值受到过去p个值的影响MA(q) 移动平均模型&#xff1a;当前值收到短期冲击的影响&#xff0c;且冲击影…

YOLOv11性能评估全解析:从理论到实战的指标指南

深入剖析目标检测核心指标,掌握模型优化的关键密码 为什么需要性能评估指标? 在目标检测领域,YOLO系列模型以其卓越的速度-精度平衡成为行业标杆。当我们训练或使用YOLOv11模型时,一个核心问题始终存在:如何量化模型的性能? 性能评估指标正是回答这个问题的关键工具,它…

【Linux内核及内核编程】Linux2.6 后的内核特点

2003 年发布的 Linux 2.6 内核是一个里程碑&#xff0c;它标志着 Linux 从 “极客玩具” 向全场景操作系统的蜕变。如果说 2.4 内核是 Linux 进入企业级市场的起点&#xff0c;那么 2.6 及后续版本则是一场从内到外的 “现代化革命”&#xff0c;不仅让 Linux 在服务器、桌面、…

GO 语言学习 之 结构体

在 Go 语言中&#xff0c;结构体&#xff08;struct&#xff09;是一种用户自定义的数据类型&#xff0c;它可以包含多种不同类型的数据组合在一起。结构体为组织和管理相关数据提供了一种有效的方式&#xff0c;常用于表示现实世界中的对象或概念。如果你懂C/C&#xff0c;那么…

ubuntu 启动SSH 服务

在Ubuntu系统中&#xff0c;启动SSH服务需要确保SSH服务已经安装&#xff0c;并且正确配置。以下是详细步骤&#xff1a; 一、检查SSH服务是否已安装 检查SSH服务是否安装 打开终端&#xff08;Terminal&#xff09;。 输入以下命令来检查SSH服务是否已安装&#xff1a; bash…

【3.4 漫画分布式共识算法】

3.4 漫画分布式共识算法 🎭 人物介绍 小明:对分布式共识算法好奇的开发者架构师老王:分布式系统专家,精通各种共识算法📚 共识算法概述 小明:“老王,分布式系统中为什么需要共识算法?” 架构师老王:“想象一下,你有多个服务器需要就某个决定达成一致,比如选出一…

程序计数器(PC)是什么?

程序计数器&#xff08;PC&#xff09;是什么&#xff1f; 程序计数器&#xff08;PC&#xff09;详解 程序计数器&#xff08;Program Counter, PC&#xff09; 是CPU中的一个关键寄存器&#xff0c;用于存储下一条待执行指令的内存地址。它控制程序的执行流程&#xff0c;是…

影楼精修-智能修图Agent

今天给大家介绍一篇令人惊喜的论文《JarvisArt: Liberating Human Artistic Creativity via an Intelligent Photo Retouching Agent》 论文地址&#xff1a;https://arxiv.org/pdf/2506.17612 Code&#xff08;暂无代码&#xff09;&#xff1a;https://github.com/LYL1015/…

帕金森与健康人相关数据和处理方法(一些文献的记录)

主要的帕金森脑电数据进行一些分类分析的文章。 帕金森病 2004 年至 2023 年脑电图研究的文献计量分析对于研究的分析以及关键研究和趋势从脑电图信号中检测帕金森病&#xff0c;采用离散小波变换、不同熵度量和机器学习技术使用机器学习和深度学习方法分析不同模态的数据以诊…

优象光流模块,基于python的数据读取demo

优象光流模块&#xff0c;型号UP-FLOW-LC-302-3C&#xff0c;准备将其应用于设备的运行速度测量&#xff0c;物美价廉。 厂家提供的数据格式表&#xff1a; 实测用python的serial包readline()函数读取到的帧数据&#xff1a; 与官方的给定略有出入&#xff0c;不过主要字节的顺…

模型部署与推理--利用libtorch模型部署与推理

文章目录 1从pytorch导出pt文件2下载并配置libtorch3推理4结果&#xff1a;时间对比&#xff1a;推理结果&#xff1a; 参考 以deeplabv3plus为例讲解怎么利用libtorch部署在c上模型。关于libtorch和pt文件请参考我之前的博客。 1从pytorch导出pt文件 if __name__ __main__: …