Kafka集成Flume/Spark/Flink(大数据)/SpringBoot

Kafka集成Flume

在这里插入图片描述

Flume生产者

在这里插入图片描述
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Flume消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka集成Spark

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties  = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//创建一个生产者var producer = new KafkaProducer[String,String](properties)//发送数据for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//关闭资源producer.close()}
}

在这里插入图片描述

消费者
在这里插入图片描述

Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消费数据val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//执行代码,并阻塞ssc.start()ssc.awaitTermination()}
}

Kafka集成Flink

在这里插入图片描述

创建maven项目,导入以下依赖
在这里插入图片描述
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
在这里插入图片描述

Flink生产者

public class FlinkafkaProducer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//准备数据源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//创建一个kafka生产者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加数据源Kafka生产者stream.addSink(kafkaProducer);//执行env.execute();}
}

在这里插入图片描述

Flink消费者

public class FlinkafkaConsumer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//创建一个消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//关联消费者和flink流env.addSource(kafkaConsumer).print();//执行env.execute();}
}

Kafka集成SpringBoot

在这里插入图片描述
在这里插入图片描述

生产者
在这里插入图片描述
在这里插入图片描述
通过浏览器发送
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/85531.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

debian12.9或ubuntu,vagrant离线安装插件vagrant-libvirt,20250601

系统盘: https://mirror.lzu.edu.cn/debian-cd/12.9.0/amd64/iso-dvd/debian-12.9.0-amd64-DVD-1.iso 需要的依赖包,无需安装ruby( sudo apt install -y ruby-full ruby-dev rubygems,后来发现不安装会有编译警告,还是安装吧 ) ,无需安装 zlib1g-dev liblzma-dev libxml2-de…

2025年软件测试面试八股文(含答案+文档)

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Part1 1、你的测试职业发展是什么&#xff1f; 测试经验越多&#xff0c;测试能力越高。所以我的职业发展是需要时间积累的&#xff0c;一步步向着高级测试工程师…

[CSS3]响应式布局

导读 响应式就是一套代码, 兼容大中小不同的屏幕, 即网页内容不变, 网页布局随屏幕切换而改变 媒体查询 响应式布局的核心技术是媒体查询 媒体查询可以检测屏幕尺寸, 设置差异化的css 开发中的常用写法 使用范围属性, 划定屏幕范围 max-width 最大宽度min-width 最小宽度 …

在 Windows安装 make 的几种方式

在 Windows 上使用 make&#xff08;通常用于自动化构建 C/C 项目等&#xff09;有几种方法。以下是最常见的几种安装和使用方法&#xff1a; 文章目录 ✅ 方法一&#xff1a;使用 Chocolatey 安装 GNU Make&#xff08;推荐&#xff09;✅ 方法二&#xff1a;使用 WSL&#xf…

深度学习笔记25-RNN心脏病预测(Pytorch)

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、前期准备 1.数据处理 import torch.nn.functional as F import numpy as np import pandas as pd import torch from torch import nn dfpd.read_csv(r&…

Pytorch知识点2

Pytorch知识点 1、官方教程2、张量&#x1f9f1; 0、数组概念&#x1f9f1; 1. 创建张量&#x1f4d0; 2. 张量形状与维度&#x1f522; 3. 张量数据类型➗ 4. 张量的数学与逻辑操作&#x1f504; 5. 张量的就地操作&#x1f4e6; 6. 复制张量&#x1f680; 7. 将张量移动到加速…

池中锦鲤的自我修养,聊聊蓄水池算法

面试如泡池&#xff0c;蓄水似人生 起初你满怀期待跳进大厂池子&#xff0c;以为自己是天选之子&#xff0c;结果发现池子里早挤满了和你一样的“锦鲤候选人”。HR的渔网一撒&#xff0c;捞谁全看概率——这不就是蓄水池算法的精髓吗&#xff1f; 初入池&#xff08;i≤k&…

Linux应用开发之网络套接字编程

套接字&#xff08;Socket&#xff09;是计算机网络数据通信的基本概念和编程接口&#xff0c;允许不同主机上的进程&#xff08;运行中的程序&#xff09;通过网络进行数据交换。它为应用层软件提供了发送和接收数据的能力&#xff0c;使得开发者可以在不用深入了解底层网络细…

小白的进阶之路系列之六----人工智能从初步到精通pytorch数据集与数据加载器

本文将介绍以下内容: 数据集与数据加载器 数据迁移 如何建立神经网络 数据集与数据加载器 处理数据样本的代码可能会变得混乱且难以维护;理想情况下,我们希望我们的数据集代码与模型训练代码解耦,以获得更好的可读性和模块化。PyTorch提供了两个数据原语:torch.utils…

深入理解设计模式之中介者模式

深入理解设计模式之&#xff1a;中介者模式&#xff08;Mediator Pattern&#xff09; 一、什么是中介者模式&#xff1f; 中介者模式&#xff08;Mediator Pattern&#xff09;是一种行为型设计模式。它通过引入一个中介对象&#xff0c;来封装一组对象之间的交互&#xff0…

基于通义千问的儿童陪伴学习和成长的智能应用架构。

1.整体架构概览 我们的儿童聊天助手将采用典型的语音交互系统架构,结合大模型能力和外部知识库: 2. 技术方案分解 2.1. 前端应用/设备 选择: 移动App(iOS/Android)、Web应用,或者集成到智能音箱/平板等硬件设备中。技术栈: 移动App: React Native / Flutter (跨平台…

Python Day40

Task&#xff1a; 1.彩色和灰度图片测试和训练的规范写法&#xff1a;封装在函数中 2.展平操作&#xff1a;除第一个维度batchsize外全部展平 3.dropout操作&#xff1a;训练阶段随机丢弃神经元&#xff0c;测试阶段eval模式关闭dropout 作业&#xff1a;仔细学习下测试和训练代…

WordPress_suretriggers 权限绕过漏洞复现(CVE-2025-3102)

免责申明: 本文所描述的漏洞及其复现步骤仅供网络安全研究与教育目的使用。任何人不得将本文提供的信息用于非法目的或未经授权的系统测试。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权,请及时与我们联系,我们将尽快处理并删除相关内容。 前…

基于Spring Boot 电商书城平台系统设计与实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…

LeetCode 39.组合总和:回溯法与剪枝优化的完美结合

一、问题本质与形式化定义 1.1 题目形式化描述 输入&#xff1a;无重复整数数组candidates、目标值target输出&#xff1a;所有和为target的组合集合&#xff0c;满足&#xff1a; 元素可重复使用组合内元素非降序&#xff08;避免重复解&#xff09;解集无重复组合 1.2 问…

windows11安装编译QtMvvm

windows11安装编译QtMvvm 1 从github下载代码2 官方的Download/Installtion3 自行构建编译QtMvvm遇到的问题3.1 `qmake`问题执行命令报错原因分析qmake报错:找不到编译器 cl解决方案3.2 `make qmake_all`问题执行命令报错原因分析make命令未识别解决方案3.3 缺少`perl`问题执行…

unix/linux source 命令,其历史争议、兼容性、生态、未来展望

现在把目光投向unix/linux source命令的历史争议、兼容性、生态和未来展望,这能让我们更全面地理解一个技术点在更广阔的图景中所处的位置。 一、历史争议与设计权衡 虽然 source (或 .) 命令功能强大且不可或缺,但在其发展和使用过程中,也存在一些微妙的争议或设计上的权衡…

开发时如何通过Service暴露应用?ClusterIP、NodePort和LoadBalancer类型的使用场景分别是什么?

一、Service核心概念 Service通过标签选择器&#xff08;Label Selector&#xff09;关联Pod&#xff0c;为动态变化的Pod集合提供稳定的虚拟IP和DNS名称&#xff0c;主要解决&#xff1a; 服务发现负载均衡流量路由 二、Service类型详解 1. ClusterIP&#xff08;默认类型…

从线性代数到线性回归——机器学习视角

真正不懂数学就能理解机器学习其实是个神话。我认为&#xff0c;AI 在商业世界可以不懂数学甚至不懂编程也能应用&#xff0c;但对于技术人员来说&#xff0c;一些基础数学是必须的。本文收集了我认为理解学习本质所必需的数学基础&#xff0c;至少在概念层面要掌握。毕竟&…

华为IP(7)

端口隔离技术 产生的背景 1.以太交换网络中为了实现报文之间的二层隔离&#xff0c;用户通常将不同的端口加入不同的VLAN&#xff0c;实现二层广播域的隔离。 2.大型网络中&#xff0c;业务需求种类繁多&#xff0c;只通过VLAN实现二层隔离&#xff0c;会浪费有限的VLAN资源…