上节成功实现了使用 FlinkKafkaConsumer 消费 Kafka 数据,并将数据输出到控制台;本节将继续把计算结果写入 Redis。
pom.xml
在 pom.xml 中引入 Redis 连接器依赖(flink-connector-redis_2.11):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven</groupId>
            <artifactId>maven-plugin-api</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugin-tools</groupId>
            <artifactId>maven-plugin-annotations</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.plexus</groupId>
            <artifactId>plexus-utils</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
        <!-- MyBatis coordinates -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!-- MySQL driver coordinates -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
            <scope>runtime</scope>
        </dependency>
        <!-- Kafka source connector and client used by the Flink job and the test producer -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- Redis sink connector (provides RedisSink / RedisMapper / FlinkJedisPoolConfig) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--
              NOTE(review): the <filters> configuration below has the shape of
              maven-shade-plugin configuration (stripping signature files from a
              shaded jar), yet it is attached to maven-plugin-plugin and the
              execution declares no goal. Confirm which plugin was intended.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-plugin-plugin</artifactId>
                <version>3.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
KafkaTestProducer.java:生产数据并写入 Kafka
同上一节,具体代码
package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @author snowsong*/
public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();// Kafka 集群的初始连接地址props.put("bootstrap.servers", "172.16.1.173:9092");// 序列化器 将 Java 对象序列化为字节数组props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// kafka生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 消息循环for (int i = 0; i < 50; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);producer.send(record);System.out.println("send: " + key);Thread.sleep(200);}// 关闭生产者producer.close();}
}
启动服务类
Flink消费Kafka,并将结果存入redis。
设置 FlinkJedisPoolConfig
// 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);
MyRedisMapper
它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。
public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String getValueFromData(Tuple2<String,String> data) {return data.f1;}
StartApp 的完整代码如下:
package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** @author snowsong*/
public class StartApp {private static final String REDIS_SERVER = "0.0.0.0";private static final Integer REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客户端的连接参数Properties properties = new Properties();properties.setProperty("bootstrap.servers", "172.16.1.173:9092");FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",new SimpleStringSchema(), properties);DataStreamSource dataStreamSource = env.addSource(flinkKafkaConsumer);// 将接收的数据映射为二元组SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {/*** 将输入的字符串映射为 Tuple2 对象。** @param value 输入的字符串* @return 一个包含两个元素的 Tuple2 对象,第一个元素为 "l_words",第二个元素为输入的字符串* @throws Exception 如果发生异常,则抛出该异常*/@Overridepublic Tuple2<String, String> map(String value) throws Exception {return new Tuple2<>("l_words", value);}});// 配置 Redis 连接池,设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象,用于将数据写入 RedisRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中,作为数据的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。* 它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。*/public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {/*** 获取当前命令的描述信息。** @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** @param data 一个包含两个字符串元素的Tuple2对象* @return 返回Tuple2对象的第一个元素,即键*/@Overridepublic String getKeyFromData(Tuple2<String,String> data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** @param data 一个包含两个字符串元素的元组* @return 元组中的第二个元素的值*/@Overridepublic String 
getValueFromData(Tuple2<String,String> data) {return data.f1;}}}
运行结果
存入redis结果