DataStream实现WordCount

目录

  • 读取文本数据
  • 读取端口数据

事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。

读取文本数据

需要处理数据如下:

hello flink
hello java
hello world

在这里插入图片描述

package com.tsg.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class BoundedStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = dataStreamSource.flatMap((String value, Collector<Tuple2<String, Long>> out) -> {String[] split = value.split(" ");for (String s : split) {out.collect(Tuple2.of(s, 1L));}}).returns(Types.TUPLE(Types.STRING,Types.LONG));
//        KeyedStream<Tuple2<String, Long>, Tuple> tuple2TupleKeyedStream = wordAndOne.keyBy(0);KeyedStream<Tuple2<String, Long>, String> tuple2TupleKeyedStream = wordAndOne.keyBy(data->data.f0);SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2TupleKeyedStream.sum(1);sum.print();env.execute();}
}

在这里插入图片描述

读取端口数据

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

package com.tsg.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StreamWordCount {public static void main(String[] args) throws Exception {// 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从参数中提取主机名和端口号ParameterTool parameterTool = ParameterTool.fromArgs(args);String hostname = parameterTool.get("host");int port = parameterTool.getInt("port");DataStreamSource<String> lineStream = env.socketTextStream(hostname,port);
//        DataStreamSource<String> lineStream = env.socketTextStream("master", 7777);SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = lineStream.flatMap((String str, Collector<Tuple2<String, Long>> out) -> {// 注意这里的Collector是org.apache.flink.util.Collector;String[] split = str.split(" ");for (String s : split) {out.collect(Tuple2.of(s, 1L));}}).returns(Types.TUPLE(Types.STRING,Types.LONG));KeyedStream<Tuple2<String, Long>, Tuple> tuple2TupleKeyedStream = tuple2SingleOutputStreamOperator.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2TupleKeyedStream.sum(1);sum.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/94088.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/94088.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

imx6ull-驱动开发篇37——Linux MISC 驱动实验

目录 MISC 设备驱动 miscdevice结构体 misc_register 函数 misc_deregister 函数 实验程序编写 修改设备树 驱动程序编写 miscbeep.c miscbeepApp.c Makefile 文件 运行测试 MISC 驱动也叫做杂项驱动&#xff0c;也就是当某些外设无法进行分类的时候就可以使用 MISC…

C# 项目“交互式展厅管理客户端“针对的是“.NETFramework,Version=v4.8”,但此计算机上没有安装它。

C# 项目“交互式展厅管理客户端"针对的是".NETFramework,Versionv4.8”&#xff0c;但此计算机上没有安装它。 解决方法&#xff1a; C# 项目“交互式展厅管理客户端"针对的是".NETFramework,Versionv4.8”&#xff0c;但此计算机上没有安装它。 下载地址…

FFmpeg及 RTSP、RTMP

FFmpeg 是一个功能强大的跨平台开源音视频处理工具集 &#xff0c;集录制、转码、编解码、流媒体传输等功能于一体&#xff0c;被广泛应用于音视频处理、直播、点播等场景。它支持几乎所有主流的音视频格式和协议&#xff0c;是许多媒体软件&#xff08;如 VLC、YouTube、抖音等…

金山办公的服务端开发工程师-25届春招笔试编程题

1.作弊 溪染&#xff1a;六王毕&#xff0c;四海一&#xff1b;蜀山兀&#xff0c;阿房出。覆压三百余里&#xff0c;隔离天日。骊山北构而西折&#xff0c;直走咸阳。二川溶溶&#xff0c;流入宫墙。五步一楼&#xff0c;十步一阁&#xff1b;廊腰缦回&#xff0c;檐牙高啄&am…

注意力机制中为什么q与k^T相乘是注意力分数

要理解 “qkT\mathbf{q} \times \mathbf{k}^TqkT 是注意力分数”&#xff0c;核心是抓住注意力机制的本质目标 ——量化 “查询&#xff08;q&#xff09;” 与 “键&#xff08;k&#xff09;” 之间的关联程度&#xff0c;而向量点积&#xff08;矩阵相乘的元素本质&#xff…

Krea Video:Krea AI推出的AI视频生成工具

本文转载自&#xff1a;Krea Video&#xff1a;Krea AI推出的AI视频生成工具 - Hello123工具导航 ** 一、平台定位与技术特性 Krea Video 是 Krea AI 推出的 AI 视频生成工具&#xff0c;通过结合关键帧图像与文本提示实现精准视频控制。用户可自定义视频首尾帧、为每张图片设…

C++初阶(2)C++入门基础1

C是在C的基础之上&#xff0c;容纳进去了面向对象编程思想&#xff0c;并增加了许多有用的库&#xff0c;以及编程范式 等。熟悉C语言之后&#xff0c;对C学习有一定的帮助。 本章节主要目标&#xff1a; 补充C语言语法的不足&#xff0c;以及C是如何对C语言设计不合理的地方…

ANSI终端色彩控制知识散播(II):封装的层次(Python)——不同的逻辑“一样”的预期

基础高阶各有色&#xff0c;本原纯真动乾坤。 笔记模板由python脚本于2025-08-22 18:05:28创建&#xff0c;本篇笔记适合喜欢终端色彩ansi编码和python的coder翻阅。 学习的细节是欢悦的历程 博客的核心价值&#xff1a;在于输出思考与经验&#xff0c;而不仅仅是知识的简单复述…

前端无感刷新 Token 的 Axios 封装方案

在现代前端应用中&#xff0c;基于 Token 的身份验证已成为主流方案。然而&#xff0c;Token 过期问题常常困扰开发者 —— 如何在不打断用户操作的情况下自动刷新 Token&#xff0c;实现 "无感刷新" 体验&#xff1f;本文将详细介绍基于 Axios 的解决方案。什么是无…

【数据结构】线性表——链表

这里写自定义目录标题线性表链表&#xff08;链式存储&#xff09;单链表的定义单链表初始化不带头结点的单链表初始化带头结点的单链表初始化单链表的插入按位序插入带头结点不带头结点指定结点的后插操作指定结点的前插操作单链表的删除按位序删除&#xff08;带头结点&#…

容器安全实践(三):信任、约定与“安全基线”镜像库

容器安全实践&#xff08;一&#xff09;&#xff1a;概念篇 - 从“想当然”到“真相” 容器安全实践&#xff08;二&#xff09;&#xff1a;实践篇 - 从 Dockerfile 到 Pod 的权限深耕 在系列的前两篇文章中&#xff0c;我们探讨了容器安全的底层原理&#xff0c;并详细阐述…

百度面试题:赛马问题

题目现在有25匹马和一个赛马场&#xff0c;赛马场有5条跑道&#xff08;即一次只能比较5匹马&#xff09;&#xff0c;并且没有秒表等计时工具&#xff0c;因此每次赛马只能知道这5匹马的相对时间而非绝对时间。问&#xff1a;如何筛选出跑的最快的3匹马&#xff1f;需要比赛几…

centos下安装Nginx(搭建高可用集群)

CentOS-7下安装Nginx的详细过程_centos7安装nginx-CSDN博客 centos换yum软件管理包镜像 CentOS 7.* 更换国内镜像源完整指南_centos7更换国内源-CSDN博客 VMware虚拟机上CentOS配置nginx后,本机无法访问 执行命令&#xff1a;/sbin/iptables -I INPUT -p tcp --dport 80 -j…

实时视频技术选型深度解析:RTSP、RTMP 与 WebRTC 的边界

引言&#xff1a;WebRTC 的“光环”与现实落差 在实时音视频领域&#xff0c;WebRTC 常常被贴上“终极解决方案”的标签&#xff1a;浏览器原生支持、无需插件、点对点传输、毫秒级延迟&#xff0c;这些特性让它在媒体和开发者群体中拥有了近乎神话般的地位。许多人甚至认为&a…

基于深度学习的阿尔茨海默症MRI图像分类系统

基于深度学习的阿尔茨海默症MRI图像分类系统 项目概述 阿尔茨海默症是一种进行性神经退行性疾病&#xff0c;早期诊断对于患者的治疗和生活质量至关重要。本项目利用深度学习技术&#xff0c;基于MRI脑部扫描图像&#xff0c;构建了一个高精度的阿尔茨海默症分类系统&#xff0…

54 C++ 现代C++编程艺术3-移动构造函数

C 现代C编程艺术3-移动构造函数 文章目录C 现代C编程艺术3-移动构造函数场景1&#xff1a;动态数组资源转移 #include <iostream> #include <vector> class DynamicArray { int* data; size_t size; public: // 移动构造函数&#xff08;关键实现&#xf…

Sping Boot + RabbitMQ :如何在Spring Boot中整合RabbitMQ实现消息可靠投递?

Spring Boot整合RabbitMQ实现消息可靠投递全解析 在分布式系统中&#xff0c;消息中间件是解耦、异步、流量削峰的核心组件。RabbitMQ作为高可靠、易扩展的AMQP协议实现&#xff0c;被广泛应用于企业级场景。但消息传递过程中可能因网络波动、服务宕机等问题导致消息丢失&#…

STAR-CCM+|K-epsilon湍流模型溯源

【1】引言 三维CFD仿真经典软件很多&#xff0c;我接触过的有Ansys和STAR-CCM两种。因为一些机缘&#xff0c;我使用STAR-CCM更多&#xff0c;今天就来回顾一下STAR-CCM中K-epsilon湍流模型的基本定义。 【2】学习地址介绍 点击链接User Guide可以到达网页版本的STAR-CCM 24…

osgEarth 图像融合正片叠底

* 需求&#xff1a;* 高程渲染图 RGB.tif、 山体阴影图 AMP.tif** 高程渲染图 rgb波段分别 乘以 山体阴影图r波段&#xff0c; 然后除以255(AI说 读取的纹理就已经归一化到了 0~1 范围&#xff0c;不用除以 255)。本人遥感知识匮乏。问了AI,以上 需求在许多商业软件上已实现。…

Java接口响应速度优化

在 Java 开发中&#xff0c;接口响应速度直接影响用户体验和系统吞吐量。优化接口性能需要从代码、数据库、缓存、架构等多个维度综合考量&#xff0c;以下是具体方案及详细解析&#xff1a;一、代码层面优化代码是接口性能的基础&#xff0c;低效的代码会直接导致响应缓慢。1.…