Spark 3 Streaming: read from Kafka, write to Elasticsearch

1. Code

package data_import

import org.apache.spark._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.streaming._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.commons.lang3.exception.ExceptionUtils
import java.time.{Instant, LocalDateTime, ZoneId}
import alarm.Alarm
import org.elasticsearch.spark.sql._

/** User feature records: read JSON events from Kafka and upsert them into Elasticsearch. */
object KafkaImport {

  case class Parram(env: String, dst_table: String, kafka_address: String,
                    kafka_topic: String, kafka_group_id: String, trigger_time: String)

  def main(args: Array[String]): Unit = {
    val param: Parram = utils.parseParam[Parram](args)
    println("args:" + param.toString())
    try {
      processTable(param)
      Alarm.alarm(env = param.env, level = Alarm.LevelInfo, content = "UserFeature Success")
    } catch {
      case e: Exception =>
        val msg = s"UserFeature handle failed, error message: ${e.getClass()}:${e.getMessage()}" +
          s"===>${ExceptionUtils.getStackTrace(e)}==>argsMap:${param.toString()}"
        println(msg)
        Alarm.alarm(env = param.env, level = Alarm.LevelWarning, content = msg)
    }
  }

  def processTable(param: Parram): Unit = {
    val conf = new SparkConf().setAppName("appName").setMaster("yarn")
    // Micro-batch interval (in seconds) is passed in as a job argument.
    val ssc = new StreamingContext(conf, Seconds(param.trigger_time.toInt))
    val ss = SparkSession.builder
      .appName("KafkaImport")
      // Small-file merge settings (vendor-specific).
      .config("spark.sql.mergeSmallFiles.enabled", "true")
      .config("spark.sql.mergeSmallFiles.threshold.avgSize", "true")
      .config("spark.sql.mergeSmallFiles.maxSizePerTask", "true")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "YX2021@greendog")
      .getOrCreate()

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> param.kafka_address,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> param.kafka_group_id,
      "auto.offset.reset" -> "earliest",
      // Offsets are committed manually after each batch is written (see commitAsync below).
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array(param.kafka_topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val schema: StructType = StructType(List(
      StructField("key", StringType, true),
      StructField("value", StringType, true)
    ))

    stream.foreachRDD { rdd =>
      val data1 = rdd.map { record => Row(record.key, record.value) }
      // Extract the indexed fields from the JSON payload.
      val userData = ss.createDataFrame(data1, schema)
        .withColumn("id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
        .withColumn("app_id", get_json_object(col("value"), "$.ctx.app_id").cast(StringType))
        .withColumn("user_id", get_json_object(col("value"), "$.ctx.user_id").cast(StringType))
        .withColumn("session_id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
        .withColumn("time", get_json_object(col("value"), "$.time").cast(LongType))
        .withColumn("datetime", getDayHourTime(col("time")).cast(TimestampType))
      userData.show()

      println("Trying to connect to ES...")
      // Debug-only connectivity check: counts the target index on every batch, which is expensive.
      val esDF = ss.read.format("org.elasticsearch.spark.sql")
        .option("es.nodes", "192.168.145.43")
        .option("es.port", "9200")
        .option("es.net.http.auth.user", "elastic")
        .option("es.net.http.auth.pass", "YX2021@greendog")
        .load("test_saylo_user_feature_30033")
      println(s"Index contains ${esDF.count()} records")

      userData
        .select(col("id"), col("session_id"), col("value"), col("app_id"), col("datetime"))
        .filter(col("id").isNotNull && col("id") =!= "")
        .write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "192.168.145.43")
        .option("es.nodes.wan.only", "true")
        .option("es.port", "9200")
        .option("es.net.http.auth.user", "elastic")
        .option("es.net.http.auth.pass", "YX2021@greendog")
        .option("es.mapping.id", "id") // replace with your actual document-ID field
        .option("es.write.operation", "upsert")
        .mode("append")
        // ES 7.x: index name only, no "/type" suffix (see note 1 below).
        .save("test_saylo_user_feature_30033")

      // Commit the consumed offset ranges back to Kafka only after the batch has been written.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    // Block until the streaming context is stopped.
    ssc.awaitTermination()
  }

  private val getDayHourTime = udf((timestamp: Long) => {
    utils.getDayTime(timestamp)
  })
}
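
The helpers `utils.parseParam`, `utils.getDayTime`, and `Alarm` are project-internal and are not included in the original post. Below is a minimal sketch of plausible stand-ins so the code above compiles in isolation; the names come from the code, but the bodies are assumptions, not the author's implementation.

// File: src/main/scala/data_import/utils.scala (hypothetical sketch)
package data_import

import java.sql.Timestamp
import java.time.{Instant, LocalDateTime, ZoneId}

object utils {
  // Assumption: parses job arguments into the given case class; the real
  // implementation is project-specific and not shown in the original post.
  def parseParam[T](args: Array[String]): T =
    throw new NotImplementedError("project-specific argument parsing")

  // Assumption: converts epoch milliseconds to a Timestamp, so that the
  // TimestampType cast on the "datetime" column succeeds.
  def getDayTime(timestamp: Long): Timestamp =
    Timestamp.valueOf(
      LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()))
}

// File: src/main/scala/alarm/Alarm.scala (hypothetical sketch)
package alarm

object Alarm {
  val LevelInfo = "INFO"
  val LevelWarning = "WARNING"

  // Assumption: the real version pushes to an internal alerting channel;
  // this stand-in just logs to stdout.
  def alarm(env: String, level: String, content: String): Unit =
    println(s"[$level][$env] $content")
}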

2. Dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xverse.saylo.rec</groupId>
  <artifactId>saylo_rec_data_offline_v2</artifactId>
  <version>1.0.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.2.2</spark.version>
    <jackson.version>2.14.0</jackson.version>
    <shade.jar.name>${project.artifactId}-${project.version}-jar-with-dependencies.jar</shade.jar.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- ES-Hadoop connector for Spark 3.x / Scala 2.12 -->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-30_2.12</artifactId>
      <version>7.12.0</version>
    </dependency>
    <!-- needed at runtime by the ES connector; see note 2 below -->
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version> <!-- or the version you need -->
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.10.1</version> <!-- use the latest version -->
    </dependency>
    <dependency>
      <groupId>com.qcloud</groupId>
      <artifactId>cos_api</artifactId>
      <version>5.6.227</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.play</groupId>
      <artifactId>play-json_2.12</artifactId>
      <version>2.9.2</version>
    </dependency>
    <!-- Jackson dependencies -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${spark.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>1.64.0</version>
    </dependency>
    <!--
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.6</version>
    </dependency>
    -->
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-core_2.12</artifactId>
      <version>3.6.6</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.6.3</version>
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi</groupId>
      <artifactId>tencentcloud-sdk-java-cls</artifactId>
      <version>3.1.1174</version> <!-- use the latest version -->
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi.cls</groupId>
      <artifactId>tencentcloud-cls-sdk-java</artifactId>
      <version>1.0.15</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.0</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>utf-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${shade.jar.name}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <!--
                <relocation>
                  <pattern>org.apache.commons</pattern>
                  <shadedPattern>com.acme.shaded.apachecommons</shadedPattern>
                </relocation>
                -->
                <relocation>
                  <pattern>com.google.protobuf</pattern>
                  <shadedPattern>my.project.shaded.protobuf</shadedPattern>
                </relocation>
              </relocations>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>my.Application</mainClass>
                  <manifestEntries>
                    <Implementation-Version>${version}</Implementation-Version>
                    <Main-Class>my.Application</Main-Class>
                  </manifestEntries>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

3. Notes

  1. Elasticsearch 7.x no longer supports specifying a mapping type in the index path
    If your ES version is 7.x, the save call must be written as
    .save("test_saylo_user_feature_30033")
    and not
    .save("test_saylo_user_feature_30033/docs")
    Otherwise the write fails with a mapping-type error such as
    [user_id] cannot be changed from type [keyword] to [text]
    A minimal 7.x-compatible write call is sketched below.
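
    For reference, here is a minimal sketch of a 7.x-compatible write, reusing the connector options, index name, and credentials from the code in section 1 (`df` stands for any DataFrame with an "id" column):

    // Sketch: `df` is assumed to be a DataFrame with an "id" column.
    df.write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "192.168.145.43")
      .option("es.port", "9200")
      .option("es.net.http.auth.user", "elastic")
      .option("es.net.http.auth.pass", "YX2021@greendog")
      .option("es.mapping.id", "id")
      .option("es.write.operation", "upsert")
      .mode("append")
      .save("test_saylo_user_feature_30033")         // 7.x: index name only
      // .save("test_saylo_user_feature_30033/docs") // 6.x-style "index/type" — fails on 7.x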

  2. Dependency conflict
    Class not found:

    Caused by: java.lang.ClassNotFoundException: com.acme.shaded.apachecommons.httpclient.HttpConnectionManager

    The com.acme.shaded.apachecommons prefix matches the shade-plugin relocation of org.apache.commons shown (commented out) in the POM above: the relocation rewrites references to commons-httpclient classes, but if the library itself is not bundled into the jar, the relocated class does not exist at runtime. The fix is to add the dependency manually in the pom file (see the full POM above for placement):

    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
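
    When chasing this kind of conflict, one quick check (a generic JVM trick, not specific to this project) is to print which jar a class is actually loaded from:

    // Prints the jar that provides the class — useful for spotting shaded or duplicate copies.
    val cls = Class.forName("org.apache.commons.httpclient.HttpConnectionManager")
    println(cls.getProtectionDomain.getCodeSource.getLocation)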
