Spark实时流数据处理实例(SparkStreaming通话记录消息处理)

所用资源:

通过网盘分享的文件:spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar等4个文件
链接: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwd=hnpg 提取码: hnpg

1.需求分析 :

假定有一个手机通信计费系统,用户通话时在基站交换机上临时保存了相关记录,由于交换机的容量 有限且分散在各地,因此需要及时将这些通话记录汇总到计费系统中进行长时间保存,以方便后续的 统计分析。

2.准备工作:

(1)确保Kafka服务已经启动,可在Linux终端窗体中使用jps命令查看具体的进程

spark@vm01:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[1] 2770

spark@vm01:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties &

[2] 3128

spark@vm01:/usr/local/kafka$ jps

2770 QuorumPeerMain

3128 Kafka

2104 Main

3529 Jps

(2)创建从130到139的十个主题,为简单起见,通过kafka附带的脚本命令来完成

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 130

Created topic 130.

查看:

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181

130

(3)启动HDFS服务,使用jps查看是否有相关进程在运行

spark@vm01:/usr/local/kafka$ start-dfs.sh

Starting namenodes on [localhost]

localhost: starting namenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-namenode-vm01.out

localhost: starting datanode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-datanode-vm01.out

Starting secondary namenodes [0.0.0.0]

0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-secondarynamenode-vm01.out

spark@vm01:/usr/local/kafka$ ^C

spark@vm01:/usr/local/kafka$ jps

4081 SecondaryNameNode

2770 QuorumPeerMain

4181 Jps

3128 Kafka

2104 Main

3710 NameNode

3887 DataNode

(4)在HDFS根目录中创建datas目录及日期的子目录,根据自己当前运行的程序时间进行创建即可

spark@vm01:/usr/local/kafka$ cd

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202505

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202506

(5)在python3.6环境中安装一个kafka-python库,以便程序能够正常访问kafka,后面需要填写一个专门的python程序来模拟基站交换机随机产生通话记录

spark@vm01:~$ sudo pip install kafka-python

[sudo] spark 的密码:

(6)启动PyCharm集成开发环境,在其中创建一个名为SparkKafkaBilling的项目,对应的Python解释器使用python3.6即可

点击file,newproject

文件名(位置):/home/spark/PycharmProjects/SparkKafkaBilling

编译:python3.6

点击创建

3.通话记录生产者模拟:

(1)在新建的项目SparkKafkaBilling中创建CallMsgProducer.py文件,然后输入代码,负责按照要求的记录格式模拟产生通话消息,并将其发送到Kafka的topic中。

from kafka import KafkaProducer

import random, datetime, time

# 产生一个以13开头的手机号字符串,共11位

def gen_phone_num():

    phone = '13'

    for x in range(9):

        phone = phone + str(random.randint(0, 9))

    return phone

(2)为了持续不断地生成新的通话记录信息,可以使用一个循环创建符合格式要求的通话记录信息字符串,且每产生一条消息后休眠随机的时长,然后继续生成下一条通话记录

# Kafka的消息生产者对象准备

producer = KafkaProducer(bootstrap_servers="localhost:9092")

working = True

tformat = '%Y-%m-%d %H:%M:%S'     #设置时间日志格式

while working:

    # 主叫号码,被叫号码,呼叫时间(模拟当前时间的前一天),接通时间,挂断时间

    src_phone = gen_phone_num()

    dst_phone = gen_phone_num()

    dail_time = datetime.datetime.now() + datetime.timedelta(days=-1)

    call_time = dail_time + datetime.timedelta(seconds=random.randint(0, 10))

    hangup_time = call_time + datetime.timedelta(seconds=random.randint(5, 600))

    # 将时间格式化为所需的字符串格式,类似2025-05-27 09:30:25

    s_dail_time = dail_time.strftime(tformat)

    s_call_time = call_time.strftime(tformat)

    s_hangup_time = hangup_time.strftime(tformat)

    # 生成通话记录消息字符串

    record = '%s,%s,%s,%s,%s' % (src_phone, dst_phone, s_dail_time, s_call_time, s_hangup_time)

    print('send : ', record)

    # 通话记录的主叫号码前三位为topic主题

    topic = src_phone[0:3]

    # 将通话记录字符串转换为字节数组

    msg = bytes(record, encoding='utf-8')

    # 调用send()将通话记录消息发送给Kafka

    producer.send(topic=topic, key=b"call",value=msg)

    # 休眠一个随机的时长,为一个0-1秒之间的随机小数

    time.sleep( random.random() )

producer.close()

4.消息接收者测试:

(1)在SparkKafkaBilling项目中创建CallMsgBilling.py文件,将Kafka中130~139这10个topic(主题)的消息接收并在屏幕上打印显示出来

如果报错 先执行(2),再重新运行

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

sc = SparkContext('local[2]','KafkaStreamBilling')

sc.setLogLevel("OFF")

ssc = StreamingContext(sc, 5)

streamRdd = KafkaUtils.createDirectStream(ssc,

               topics = ['130','131','132','133','134',

                         '135','136','137','138','139'],

               kafkaParams = {"metadata.broker.list":"localhost:9092"} )

streamRdd.pprint()

ssc.start()

ssc.awaitTermination()

(2)打开一个Linux终端窗体,在其中输入下面的命令,将消息接收者程序提交到Spark中运行,其中用到的spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar依赖库文件此前已下载放在~/streaming目录中,为避免每次提交应用程序时在命令行手动指定,可以将其复制到集群的各节点Spark安装目录中(位于/usr/local/spark/jars目录)

spark@vm01:~$ ls streaming

 FileStreamDemo.py

'IDLE (Python 3.7 64-bit).lnk'

 KafkaStreamDemo.py

 NetworkWordCountall.py

 NetworkWordCount.py

 NetworkWordCount.py.txt

 NetworkWordCountSave.py

 NetworkWordCountWindow.py

 spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar

spark@vm01:~$ cp streaming/*.jar /usr/local/spark/jars

spark@vm01:~$ cd PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit CallMsgBilling.py

(3)回到PyCharm集成开发环境中,运行CallMsgProducer.py程序,在底部会源源不断地显示模拟产生的通话记录消息

(4)再切换到运行消息接收者程序的Linux终端窗体,发现其不断地接收发送过来的消息

从输出结果可以清楚地看到,接收的Kafka消息是一系列(K,V)键值对形式的二元组,其中的K代表

CallMsgProducer.py程序中设定的"call"字符串,V代表消息内容。键(K)可以设置成任意字符串,当然

也可以不设置,实际使用的是二元组里面的值(V),即消息内容

5.Spark Streaming通话记录消息处理:将生成的通话记录消息进行简单的处理并保存在HDFS中

(1)在项目的main.py文件中将原有代码删除,并添加下面的代码

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession

from datetime import datetime

# 初始化sc、ssc、spark等几个核心变量

spark = SparkSession.builder \

    .master('local[2]') \

    .appName('KafkaStreamingBilling') \

    .getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("OFF")  #关闭日志

ssc = StreamingContext(sc, 5)

spark = SparkSession(sc)

(2)定义process()和saveYmCallData()函数

# 定义一个处理消息的函数,返回一个通话记录的元组

# (主叫号码,呼叫时间,接通时间,挂断时间,通话时长,被叫号码,年月)

def process(x):

    v = x[1].split(',')

    tformat = '%Y-%m-%d %H:%M:%S'

    d1 = datetime.strptime(v[3], tformat)

    d2 = datetime.strptime(v[4], tformat)

    ym = '%d%02d' % (d1.year, d1.month)

    sec = (d2-d1).seconds

    minutes = sec//60 if sec%60==0 else sec//60+1

    return (v[0],v[2],v[3],v[4],str(minutes),v[1],ym)

# 根据参数row中的年月信息,获取相应的通话消息记录,并保存到HDFS

def saveYmCallData(row):

    year_month = row.ym

    path = "hdfs://localhost:9000/datas/" + year_month + "/"

    ymdf = spark.sql("select * from phonecall where ym='" + year_month +"'")

    ymdf.drop('ym').write.save(path, format="csv", mode="append")

(3)再定义一个save()函数,以实现DStream的通话记录消息保存

# 保存DStream的消息记录

def save(rdd):

    if not rdd.isEmpty():

        rdd2 = rdd.map(lambda x: process(x))

        print(rdd2.count())

        df0 = rdd2.toDF(['src_phone', 'dail_time', 'call_time', 'hangup_time',

                         'call_minutes', 'dst_phone', 'ym'])

        df0.createOrReplaceTempView("phonecall")

        df1 = spark.sql('select distinct ym from phonecall')

        if df1.count() == 1:

            print('ooooooooooo')

            year_month = df1.first().ym

            path = "hdfs://localhost:9000/datas/" + year_month + "/"

            df0.drop("ym").write.save(path, format="csv", mode="append")

        else:

            df1.foreach(saveYmCallData)

(4)通过Kafka数据源创建一个DSteam对象,并开始Spark Streaming应用程序的循环

# 从Kafka的多个topic中接收消息

streamRdd = KafkaUtils.createDirectStream(ssc,

               topics = ['130','131','132','133','134',

                         '135','136','137','138','139'],

               kafkaParams = {"metadata.broker.list":"localhost:9092"})

streamRdd.pprint()

streamRdd.foreachRDD(save)

ssc.start()

ssc.awaitTermination()

(5)功能代码编写完毕,现在可以切换到Linux终端窗体,启动main.py程序

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit main.py

(6)再打开一个新的Linux终端窗体,启动消息生产者程序CallMsgProducer.py

cd ~/PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ python CallMsgProducer.py

然后可以查看main.py程序所在终端窗体显示的通话记录消息

(7)最后,在HDFS上可以验证收到的通话记录消息是否被成功保存,注意应将下面目录路径中的年月改为实际的时间,这是因为数据是按照当前机器时间在运行的

spark@vm01:~$ hdfs dfs -cat /datas/202505/part-*

至此,我们就完成了整个通话记录处理功能的实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/81911.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue3处理number输入框避免NaN

在 Vue3 中处理 number 类型输入框避免显示 NaN&#xff0c;核心在于正确处理用户输入的非数字值。以下是几种解决方案&#xff1a; 方案1&#xff1a;使用字符串中转 计算属性&#xff08;推荐&#xff09; vue 复制 下载 <template><input v-model"input…

Python自动化之selenium语句——浏览器设置显示尺寸、截图、刷新网页

目录 一、浏览器设置最大化、最小化 1.浏览器最大化 2.浏览器最小化 二、浏览器打开的位置、尺寸 1.浏览器打开位置 2.浏览器打开尺寸 三、浏览器截图 1.截图语句 2.运行成功后查看 四、刷新网页 上一节实现了打开浏览器、打开指定网址、关闭浏览器的操作&#xff0c…

model.classifier:分类头

model.classifier:分类头 分类头(model.classifier)含义 在基于Transformer架构的模型(如BERT、GPT等 )用于分类任务时,“分类头(model.classifier)” 是模型的一个重要组成部分。以Hugging Face的Transformers库为例,许多预训练模型在完成通用的预训练任务(如语言…

4.1.2 操作数据集

在本实战中&#xff0c;我们深入学习了Spark SQL的操作数据集&#xff0c;包括了解Spark会话、准备数据文件、启动Spark Shell以及获取和操作学生数据集。通过Spark Shell&#xff0c;我们可以直接使用SparkSession实例来加载、转换和处理数据。我们学习了如何将文本文件加载为…

LangChain整合Milvus向量数据库实战:数据新增与删除操作

导读&#xff1a;在AI应用开发中&#xff0c;向量数据库已成为处理大规模语义搜索和相似性匹配的核心组件。本文通过详实的代码示例&#xff0c;深入探讨LangChain框架与Milvus向量数据库的集成实践&#xff0c;为开发者提供生产级别的向量数据管理解决方案。 文章聚焦于向量数…

从根源解决Augment免费额度限制问题:Windows详细教程

从根源解决Augment免费额度限制问题&#xff1a;Windows详细教程 本文将详细介绍如何在Windows系统上解决Augment AI助手的"Too many free trials"限制问题&#xff0c;通过清理VS Code缓存和修改设备ID实现无限制使用Augment的方法。 视频地址 augment从根源上解决免…

IoTDB 集成 DBeaver,简易操作实现时序数据清晰管理

数据结构一目了然&#xff0c;跨库分析轻松实现&#xff0c;方便 IoTDB “内部构造”管理&#xff01; 随着物联网场景对时序数据处理需求激增&#xff0c;时序数据库与数据库管理工具的集成尤为关键。作为数据资产的 “智能管家”&#xff0c;借助数据库管理工具的可视化操作界…

应用层协议http(无代码版)

目录 认识URL urlencode 和 urldecode HTTP 协议请求与响应格式 HTTP 的请求方法 GET 方法 POST 方法 HTTP 的状态码 HTTP 常见 Header Location 关于 connection 报头 HTTP版本 远程连接服务器工具 setsockopt 我们来学习应用层协议http。 虽然我们说, 应用层协…

Cangjie 中的值类型与引用类型

1. 值类型和引用类型 1.1 值的存储方式 所有变量在底层实现中&#xff0c;都会关联一个具体的“值”&#xff0c;这个值可能存储在 内存地址 或 寄存器 中。 寄存器用于优化常用变量的访问速度。只有局部、小、频繁使用的变量才更可能被分配到寄存器中。实际行为由编译器根据…

使用el-input数字校验,输入汉字之后校验取消不掉

先说说复现方式 本来input是只能输入数字的&#xff0c;然后你不小心输入了汉字&#xff0c;触发校验了&#xff0c;然后这时候&#xff0c;你发现校验取消不掉了 就这样了 咋办啊&#xff0c;你一看校验没错啊&#xff0c;各种number啥的也写了,发现没问题啊 <el-inputv…

使用 Zabbix 监控 MySQL 存储空间和性能指标的完整实践指南

目录 引言 一、最终目标支持功能 二、监控方案设计 2.1 技术选型 2.2 设计思路 三、实现步骤 3.1 准备工作 3.11 创建 MySQL 监控账号 3.12 配置 .my.cnf 文件 3.2 编写统一脚本 3.3 配置 Zabbix Agent UserParameter 3.4 Zabbix 前端配置建议 四、总结 引言 MySQL …

多元素纳米颗粒:开启能源催化新纪元

在能源转型的浪潮中&#xff0c;纳米催化剂正成为推动能源技术突破的关键力量。多元素纳米颗粒&#xff08;Polyelemental Nanoparticles&#xff09;凭借其独特的元素协同效应&#xff0c;展现出在能源催化领域的巨大潜力。然而&#xff0c;合成这些复杂体系的纳米颗粒面临着诸…

铁路行业数字化应用建设方案

数字化转型面临的挑战 铁路行业正处于数字化转型的关键时期&#xff0c;铁路行业应用场景复杂&#xff0c;数据量巨大&#xff0c;传统信息化建设模式难以满足日益增长的业务需求。铁路企业亟需引入敏捷高效的数字化工具&#xff0c;加速推进业务创新&#xff0c;实现提质增效…

PlankAssembly 笔记 DeepWiki 正交视图三维重建

manycore-research/PlankAssembly | DeepWiki PlankAssembly项目原理 这个项目是一个基于深度学习的3D重建系统&#xff0c;其核心原理是从三个正交视图的工程图纸中重建出3D形状的结构化程序表示。 核心技术原理 1. 问题定义 PlankAssembly旨在从三个正交视图的工程图纸中…

分布式不同数据的一致性模型

1. 强一致性&#xff08;Strong Consistency&#xff09; 定义&#xff1a;所有节点在任何时间点看到的数据完全一致&#xff0c;读操作总是返回最近的写操作结果。特点&#xff1a; 写操作完成后&#xff0c;所有后续读操作都能立即看到更新。通常需要同步机制&#xff08;如…

C文件操作1

一、为什么使用文件 如果没有文件&#xff0c;我们写的程序的数据是存储在电脑的内存中&#xff0c;如果程序退出&#xff0c;内存回收&#xff0c;数据就丢失 了&#xff0c;等再次运行程序&#xff0c;是看不到上次程序的数据的&#xff0c;如果要将数据进行持久化的保存&am…

Centos7.x内网环境Jenkins前端打包环境配置

Centos7.x内网环境Jenkins前端打包环境配置 参考地址&#xff1a; https://www.cnblogs.com/guangdelw/p/18763336 https://2048.csdn.net/682c1be8606a8318e857d687.html 前言&#xff1a;环境描述和目标 最近公司新接了一个项目&#xff0c;要求是&#xff1a;需要再桌面…

Hash 的工程优势: port range 匹配

昨天和朋友聊到 “如何匹配一个 port range”&#xff0c;觉得挺有意思&#xff0c;简单写篇散文。 回想起十多年前&#xff0c;我移植并优化了 nf-HiPAC&#xff0c;当时还看不上 ipset hash&#xff0c;后来大约七八年前&#xff0c;我又舔 nftables&#xff0c;因为用它可直…

kafka学习笔记(三、消费者Consumer使用教程——使用实例及及核心流程源码讲解)

1.核心概念与架构 1.1.消费者与消费者组 Kafka消费者是订阅主题&#xff08;Topic&#xff09;并拉取消息的客户端实例&#xff0c;其核心逻辑通过KafkaConsumer类实现。消费者组&#xff08;Consumer Group&#xff09;是由多个逻辑关联的消费者组成的集合。 核心规则 同一…

《java创世手记》---java基础篇(下)

《Java 创世手记 - 基础篇&#xff08;下&#xff09;》 第五章&#xff1a;契约与规范 —— 接口 (Interfaces) 与抽象类 (Abstract Classes) 造物主&#xff0c;在你日益繁荣的世界里&#xff0c;你发现仅仅依靠“继承”来构建“物种体系”有时会遇到一些限制。比如&#x…