一、全量同步方案设计
1.1 基础命令模板
sqoop import \
--connect jdbc:mysql://mysql_host:3306/db_name \
--username user \
--password pass \
--table source_table \
--hive-import \
--hive-table target_table \
--hive-overwrite \ # 覆盖已有表
--num-mappers 8 \ # 并行度(根据集群资源调整)
--split-by id \ # 分片字段(需为数字类型)
--fields-terminated-by '\001' \
--lines-terminated-by '\n' \
--null-string '\\N' \
--null-non-string '\\N'
1.2 关键参数说明
参数 | 说明 | 必须项 |
---|---|---|
--hive-import | 启用Hive表自动创建 | 否 |
--hive-drop-import-delims | 删除Hive默认分隔符 | 推荐 |
--mapreduce.job.name | 自定义任务名称 | 否 |
--autoreset-to-one-mapper | 无主键表自动单线程 | 推荐 |
1.3 多表批量处理脚本
#!/bin/bash# 配置参数
DB_CONFIG="mysql_host:3306/db_name"
USER="root"
PASS="password"
HIVE_DB="dw"
TABLES=("user" "order" "product") # 表名列表# 循环处理每个表
for TABLE in "${TABLES[@]}"
doecho "正在同步表: $TABLE"sqoop import \--connect "jdbc:mysql://$DB_CONFIG" \--username $USER \--password $PASS \--table $TABLE \--hive-import \--hive-database $HIVE_DB \--hive-overwrite \--num-mappers $(get_conf $TABLE) \ # 动态获取并行度--split-by $(get_split_col $TABLE) \ # 动态获取分片字段--null-string '\\N' \--null-non-string '\\N'
done
二、增量同步方案设计
2.1 Append模式(新增数据)
sqoop import \
--connect jdbc:mysql://mysql_host:3306/db_name \
--username user \
--password pass \
--table source_table \
--hive-import \
--hive-table target_table \
--incremental append \
--check-column update_time \ # 时间戳字段
--last-value '2024-05-01 00:00:00' \
--num-mappers 4
2.2 LastModified模式(更新数据)
sqoop import \
--connect jdbc:mysql://mysql_host:3306/db_name \
--username user \
--password pass \
--table source_table \
--hive-import \
--hive-table target_table \
--incremental lastmodified \
--check-column update_time \
--last-value '2024-05-01 00:00:00' \
--merge-key id \ # 主键合并
--num-mappers 4
2.3 自动化增量管理
# 动态获取最后同步时间
LAST_VALUE=$(hive -e "SELECT MAX(update_time) FROM target_table")# 执行增量同步
sqoop import \
--check-column update_time \
--last-value "$LAST_VALUE" \
--incremental append \
--hive-import \
--hive-table target_table
三、多表同步增强方案
3.1 配置驱动模式
table_sync.conf
[user]
table=user
split_col=id
parallel=8[order]
table=order
split_col=order_id
parallel=12[product]
table=product
split_col=product_id
parallel=6
批量执行脚本
#!/bin/bashCONFIG_FILE="table_sync.conf"
HIVE_DB="dw"while IFS='=' read -r key value
doif [[ $key == "table" ]]; thenTABLE=${value}echo "处理表: $TABLE"sqoop import \--connect "jdbc:mysql://mysql_host:3306/db_name" \--username user \--password pass \--table $TABLE \--hive-import \--hive-database $HIVE_DB \--hive-overwrite \--num-mappers $(grep "^${TABLE}=" $CONFIG_FILE | cut -d'=' -f2) \--split-by $(grep "^${TABLE}=" $CONFIG_FILE | cut -d'=' -f3) \--null-string '\\N'fi
done < $CONFIG_FILE
3.2 全量+增量混合模式
#!/bin/bash# 全量同步配置
FULL_SYNC_TABLES=("config" "lookup")# 增量同步配置
INCREMENTAL_TABLES=("user" "order")# 执行全量同步
for TABLE in "${FULL_SYNC_TABLES[@]}"
dosqoop import \--table $TABLE \--hive-import \--hive-overwrite
done# 执行增量同步
for TABLE in "${INCREMENTAL_TABLES[@]}"
dosqoop import \--table $TABLE \--incremental append \--check-column update_time \--last-value $(hive -e "SELECT MAX(update_time) FROM $TABLE")
done
四、关键问题解决方案
4.1 数据类型映射
# 显式指定类型映射(解决TINYINT转BOOLEAN问题)
sqoop import \
--map-column-hive status=STRING \
--map-column-hive is_valid=BOOLEAN
4.2 分区表同步
# 按日期分区
sqoop import \
--hive-partition-key dt \
--hive-partition-value $(date +%Y-%m-%d) \
--where "dt='${DATE}'"
4.3 性能优化参数
# 压缩传输(提升30%网络效率)
sqoop import \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec# 内存调优(避免OOM)
sqoop import \
--driver-memory 4G \
--executor-memory 8G
五、监控与容错机制
5.1 同步状态记录
-- 创建同步状态表
CREATE TABLE sync_status (table_name VARCHAR(50) PRIMARY KEY,last_sync_time TIMESTAMP,sync_type VARCHAR(20) -- FULL/APPEND
);
5.2 自动重试策略
MAX_RETRY=3
RETRY_COUNT=0while [ $RETRY_COUNT -lt $MAX_RETRY ]; dosqoop import ... || {RETRY_COUNT=$((RETRY_COUNT+1))sleep 300}
done
5.3 异常检测脚本
#!/bin/bash# 检查Hive表行数
HIVE_COUNT=$(hive -e "SELECT COUNT(*) FROM target_table")# 检查MySQL行数
MYSQL_COUNT=$(mysql -uroot -ppass -D db -e "SELECT COUNT(*) FROM source_table")if [ $HIVE_COUNT -ne $MYSQL_COUNT ]; thenecho "数据不一致!差异行数:$(expr $MYSQL_COUNT - $HIVE_COUNT)"# 触发告警send_alert "Sqoop同步异常"
fi
六、生产环境最佳实践
-
元数据管理
使用sqoop import-all-tables
同步全库时,需提前在Hive创建对应数据库 -
增量同步策略
增量频率 | 适用场景 | 检查字段 ---------|----------|---------- 每分钟 | 实时日志 | log_ts 每小时 | 交易流水 | update_time 每天 | 统计报表 | dt
-
资源隔离方案
# 为不同业务分配独立队列 sqoop import \ --queue hadoop_yarn_queue_olap
-
版本兼容性
MySQL版本 推荐Sqoop版本 注意事项 5.7 1.4.7 需添加JDBC驱动 8.0 1.4.7 需升级Connector/J