大数据量下的数据修复与回写Spark on Hive 的大数据量主键冲突排查:COUNT(DISTINCT) 的陷阱

背景与问题概述

        这一周(2025-05-26-2026-05-30)我在搞数据拟合修复优化的任务,有大量的数据需要进行数据处理及回写,大概一个表一天一分区有五六千万数据,大约一百多列的字段。         具体是这样的我先取档案,关联对应表hive对应分区的数据,然后进行算法一系列逻辑处理后,将结果输出到hive,然后再从hive回写一份到oracle里面。         

        spark资源大概我给了不小,数据大概一天40左右吧,大概12个excutor,每一个12G内存,2core吧,拟合完数据,将数据入hive时候,进行了整体去重。 包括且不限于如下操作       

1、.distinct(),         

2、对应主键的去重.dropDuplicates(id),         

3、row_number对id,type主键字段开窗取first         

4、对id,type主键字段开窗,取后续字段的max()

        经过以上操作,我的数据得以在没有主键冲突的情况下顺利的入库到hive中,并且我对入库数据进行group by id,type having count(1) >1时数据也没有出现重复的情况。        

        OK。鬼知道我对上述数据验证进行多少次跑批总结出来的上面的操作。以上是我写入hive的操作。 下面即将是从hive入到oracle艰辛的探索之路。 正常来讲经过上面的数据操作,我从hive入到oracle是不应该出现主键冲突的情况了,因为我有一部分表已经处理入库了,但有一个表就是死活入不进去,我impala都快查烂了,资源监控的同事都给我致电了。         

        为什么调了一天呢,因为跑一个 程序就要个吧小时,代码都快被我调抑郁了。

Hive数据写入阶段的去重策略

经过多次实验和验证,我总结出一套有效的去重方法,确保数据在写入Hive时不出现主键冲突:

1. 整体去重 - distinct()

val distinctDF = originalDF.distinct()

这种方法简单直接,但性能开销较大,适合小数据集或初步去重。

2. 基于主键的去重 - dropDuplicates()

val dedupByKeyDF = originalDF.dropDuplicates("id")

比整体去重更高效,只针对指定列进行去重。

3. 开窗函数取第一条记录

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._val windowSpec = Window.partitionBy("id", "type").orderBy("timestamp")
val firstRecordDF = originalDF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")

这种方法在有多条相同主键记录时,可以按指定排序条件保留一条。

4. 开窗函数取最大值记录

val maxValueDF = originalDF.groupBy("id", "type").agg(max("value1").as("value1"), max("value2").as("value2"),/* 其他字段的max操作 */)

对于需要保留最大值的场景,这种聚合方式非常有效。

Hive到Oracle的数据的迁移问题结局

尽管Hive中的数据已经严格去重,但在迁移到Oracle时仍遇到了两个主要问题:

问题1:NULL值导致的主键冲突

-- 问题发现查询
SELECT id, type, COUNT(1) 
FROM hive_table 
WHERE id IS NULL 
GROUP BY id, type 
HAVING COUNT(1) > 1;

解决方案

// 在写入Oracle前增加NULL值处理
val cleanDF = processedDF.na.fill("NULL", Seq("id")).filter("id IS NOT NULL") // 或者直接过滤

问题2:资源不足导致的作业失败

最初配置:

  • 12个Executor

  • 每个Executor 12G内存,2个核心

  • 一个表一天的分区大概处理约40GB数据

作业在运行10-20分钟后失败,经过多次调整,最终稳定运行的配置:

  • 每个Executor 45G内存,这个我觉得得看集群资源,我们集群资源很紧张,大概10TB的内存,都不太够用

  • 适当增加核心数(根据集群情况)我一般都设置2

性能优化经验总结

1. 内存配置黄金法则

对于大规模数据处理,Executor内存配置应遵循:

  • 基础内存 = 数据分区大小 × 安全系数(2-3)

  • 考虑序列化开销和中间数据结构

2. 高效去重策略选择

方法适用场景优点缺点
distinct()小数据集或全字段去重简单性能差
dropDuplicates()已知主键字段高效仅针对指定列
开窗函数需要按条件保留记录灵活可控计算开销大
聚合函数需要保留极值高效只能处理数值字段

3. NULL值处理最佳实践

  • 在数据处理的早期阶段识别和处理NULL值

  • 对于主键字段,NULL值应被替换或过滤

  • 考虑使用COALESCE或NVL函数提供默认值

4. 资源监控与调优技巧

  • 观察GC时间和频率,内存不足时GC会频繁发生

  • 监控Executor心跳丢失情况

  • 适当增加spark.memory.fraction(默认0.6)

  • 考虑启用spark.memory.offHeap.enabled使用堆外内存

优化Demo示例代码

  /*** @date 2025-05-30* @author hebei_xidaocun_laoli*/
// 1. 读取原始数据
val rawDF = spark.table("source_table").where("dt = '20250530'") // 按分区过滤// 2. 多阶段去重处理
val stage1DF = rawDF.dropDuplicates("id") // 初步去重val windowSpec = Window.partitionBy("id", "type").orderBy(col("update_time").desc)
val stage2DF = stage1DF.withColumn("rn", row_number().over(windowSpec)).filter("rn = 1").drop("rn")// 3. NULL值处理
val cleanDF = stage2DF.na.fill(Map("id" -> "NULL_ID","type" -> "DEFAULT"
)).filter("id != 'NULL_ID'") // 或者保留但确保不冲突// 4. 写入Hive
cleanDF.write.mode("overwrite").partitionBy("dt").saveAsTable("result_hive_table")// 5. 配置优化后写入Oracle
cleanDF.write.format("jdbc").option("url", "jdbc:oracle:thin:@//host:port/service").option("dbtable", "target_table").option("user", "username").option("password", "password").option("batchsize", 10000) // 调整批量大小.option("isolationLevel", "NONE") // 对于大数据量写入可提高性能.mode("append").save()

通过这次项目,总结了以下经验:

  1. 数据质量优先:在数据处理早期阶段解决NULL值、重复数据等问题

  2. 渐进式调优:从较小资源开始,逐步增加直至作业稳定运行

  3. 监控驱动:密切监控作业执行情况,特别是GC和内存使用指标

  4. 文档记录:记录每次调整的参数和效果,形成知识库

        大数据处理中的问题往往不是单一因素导致的,需要综合考虑数据特性、处理逻辑和集群资源。希望诸君避免类似的"坑",更高效地完成大数据处理任务。

        这个资源调优是真的恶心,代码没问题,就是和资源有问题,跑着跑着就突然报错了,唉,还好这个端午节前解决了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/83360.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于 AUTOSAR 的域控产品软件开发:从 CP 到 AP 的跨越

基于 AUTOSAR 的域控产品软件开发:从 CP 到 AP 的跨越 一、AUTOSAR AP 架构解析:面向智能汽车的自适应框架 (一)引言 随着汽车智能化向 L3 演进,传统 AUTOSAR CP(经典平台)在实时性、动态性和…

Nacos 配置管理案例:nacos-spring-cloud-config-example详解

一、结构说明:基于Spring Cloud Alibaba的微服务示例 nacos-spring-cloud-config-example : 服务提供者 二、技术栈:Spring BootSpring CloudSpring Cloud Alibaba Nacos Actuator(可选:监控) 三、使用环境 安装…

BUUCTF[ACTF2020 新生赛]Include 1题解

BUUCTF[ACTF2020 新生赛]Include 1题解 题目分析:知识准备:php://filter 过滤器参数说明常用过滤器功能对照表 开始解题:原理解析构造payload 总结 题目分析: 生成靶机,打开网址,查看源码,抓包…

vscode + cmake + ninja+ gcc 搭建MCU开发环境

vscode cmake ninja gcc 搭建MCU开发环境 文章目录 vscode cmake ninja gcc 搭建MCU开发环境1. 前言2. 工具安装及介绍2.1 gcc2.1.1 gcc 介绍2.1.2 gcc 下载及安装 2.2 ninja2.2.1 ninja 介绍2.2 ninja 安装 2.3 cmake2.3.1 cmake 介绍2.3.2 cmake 安装 2.4 VScode 3. 上手…

九(1). 引用作为函数参数的使用

引用作为参数使用 在 C 中,引用作为函数参数是一种高效且灵活的参数传递方式,它避免了拷贝开销,同时允许函数直接操作原始数据。 以下是关于引用作为参数的详细使用指南和最佳实践: 1. 引用作为参数的基本用法 (1) 普通引用&…

Linux多路TTS混音播放:让多个语音同时清晰可听

Linux多路TTS混音播放:让多个语音同时清晰可听 为什么需要多路混音播放?技术原理概述第一步:配置ALSA dmix混音插件为什么需要dmix?具体配置步骤第二步:生成TTS语音文件为什么需要格式转换?Python生成脚本第三步:实现多路同时播放播放器设计原理Python实现代码多路同时播…

Spring AI 1.0 GA 深度解析:构建企业级AI应用的全栈实践指南

目录 Spring AI 1.0 核心架构解析统一接口与多模型支持检索增强生成(RAG)全流程实战对话记忆与工具调用进阶模型评估与可观测性体系企业级应用案例与最佳实践未来演进与技术展望1. Spring AI 1.0 核心架构解析 1.1 技术架构演进 #mermaid-svg-ymTZMAaxOwd4OAMu {font-family…

Docker 安装 Redis 容器

系列文章目录 文章目录 系列文章目录前言1 获取redis镜像2 创建和部署redis容器3 查看redis是否启动成功4 使用Redis客户端验证连接总结 前言 搭建环境: ubuntu22.04.05 docker redis: 7.0.10 测试环境: windows: win11 Redis测试客户端:Ti…

学习vue3阶段性复习(插槽,Pinia,生命周期)

目录 插槽(匿名插槽,具名插槽) 插槽概述 匿名插槽 具名插槽 Pinia(统一管理,共享数据) pinia概述 安装和使用Pinia 1 使用命令下载Pinia 2 再main.js中导入,注册到vue框架中 3使用pinia 持久化存储插件 1 第一步&…

嵌入式Linux 期末复习指南(上)

鉴于互联网上针对本科目相关复习视频及资料过少, 撰写本篇期末复习指南用作期末复习知识点扫盲,以应对本科期末考试及格之用。 由于任课老师并透露考试范围或任何有关试卷的相关信息,本篇指南基于教材、上机实验报告及作者经验编写&#xff0…

VScode ios 模拟器安装cocoapods

使用 Homebrew 安装(推荐) 如果你有 Homebrew,直接用它安装更稳定: brew install cocoapods

Python趣学篇:用Pygame打造绚烂流星雨动画

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《Python星球日记》 目录 一、项目简介与效果展示二、技术栈与核…

可视化大屏通用模板Axure原型设计案例

本文将介绍一款基于Axure设计的可视化大屏通用模板,适用于城市、网络安全、园区、交通、社区、工业、医疗、能源等多个领域。 模板概述 这款Axure可视化大屏通用模板集成了多种数据展示模块和组件,旨在为用户提供一个灵活、可定制的数据展示平台。无论…

20250530-C#知识:万物之父Object

C#知识:万物之父Object Object类(即object)是所有类的基类,这里面的方法还是需要好好了解一下。 1、Object类 是顶级父类,其他类默认都是Object类的子类(自定义类也会默认继承Object类)可以用O…

苹果应用开发详细教程(2025最新版)

苹果应用开发详细教程(2025最新版) 第一阶段:开发环境搭建 硬件准备 Mac电脑(macOS Monterey 12或更高版本)iPhone/iPad(真机调试建议iOS 16+)软件安装 # 通过App Store安装Xcode xcode-select --installXcode 15+(包含Swift 5.9编译器)安装CocoaPods(依赖管理工具)…

flutter项目迁移空安全

重中之重 备份好项目文件,甚至连已经加载好的flutter库也可以备份。环境包升级 2.1 不要直接换成flutter:3.0以上的版本,这样做既有基本的库兼容问题,又有空安全下的语法问题(整个项目中需要增加 late、?、!的语法错误,一片报错的…

架构师面试题整理

以下是从提供的HTML代码中提取的所有class"title-txt"的文本内容,已排除重复项并按顺序整理: 缓存专题 实战解决大规模缓存击穿导致线上数据库压力暴增面试常问的缓存穿透是怎么回事基于DCL机制解决突发性热点缓存并发重建问题实战Redis分布…

pytest 中 fixture 与类继承交互导致的问题

文章目录 问题分析将属性绑定到 **类** 上使用 scopefunction 解决方法为什么有两个不同的对象核心原因:fixture 的执行上下文scopefunction 的情况scopeclass 的情况 为什么 pytest 要这样做?这是 pytest 的设计局限 总结 本文探讨 Pytest 中 fixture 作…

uniapp+ts模拟popup弹出框(下拉框)

效果图&#xff08;未展开的样子&#xff09;&#xff1a; 效果图&#xff08;展开的样子&#xff09;&#xff1a; 子组件代码&#xff1a; <!--* Date: 2024-04-26 14:30:00* LastEditTime: 2025-05-29 09:01:06* Description: 技术服务 --> <template><view …

中小型企业大数据平台全栈搭建:Hive+HDFS+YARN+Hue+ZooKeeper+MySQL+Sqoop+Azkaban 保姆级配置指南

目录 背景‌一、环境规划与依赖准备‌1. 服务器规划(3节点集群)2. 系统与依赖‌3. Hadoop生态组件版本与下载路径4. 架构图二、Hadoop(HDFS+YARN)安装与配置‌1. 下载与解压(所有节点)2. HDFS高可用配置3. YARN资源配置‌4. 启动Hadoop集群三、MySQL安装与Hive元数据配置…