《智能运维实践 苏娜 孙琳 王鸽著 人工智能技术丛书 自然语言处理的常用算法 日志异常检测 根因定位 网络流量异常检测 清华大学出版社》【摘要 书评 试读】- 京东图书
数据转换是数据预处理中的关键步骤,用于将数据从原始格式转换为适合分析和建模的形式。这一过程能够提高数据质量,确保不同来源或格式的数据在分析时具有一致性和可比性。数据转换通常包括多种技术和方法,例如单位转换、标准化、格式化、归一化以及特征编码等。下面以传感器数据为例,详细介绍数据转换的方法和过程,如示例6-4所示,示例文件为demo/code/chapter6/Data_transformation.py。
1. 示例背景
运维团队需要处理来自不同系统的监控数据,这些数据可能存在以下问题:
- 单位不统一:存储容量单位有GB、MB、KB,时间单位有毫秒(ms)、秒(s)、纳秒(ns)。
- 格式不一致:时间戳格式有UNIX时间戳、ISO8601等多种格式。
- 标准化缺失:状态码存在数字(如200)和文本(如"success")混用的情况。
2. 数据转换
具体数据转换如下:
- 单位转换:将所有容量统一为GB,时间统一为秒。
- 时间标准化:转换为UTC时区的ISO8601格式。
- 状态码规范化:统一为HTTP状态码数字。
- 数据格式化:输出为结构化的JSON格式文件。
【示例6-4】数据转换
import re
import json
from datetime import datetime
import pytz
from enum import Enum# 示例原始数据
raw_data = ['{"host": "server1", "disk_usage": "500GB", "response_time":
120ms", "timestamp": 1678923456, "status": "success"}','{"host": "server2", "disk_usage": "250000MB", "response_time":
0.45", "timestamp": "2023-03-15T12:30:45+08:00", "status": 200}','{"host": "server3", "disk_usage": "1000000KB", "response_time":
1.2s", "timestamp": "15/Mar/2023:12:30:45", "status": "error"}'
]class StatusCode(Enum):SUCCESS = 200ERROR = 500UNKNOWN = 0def convert_size_to_gb(size_str):"""将存储容量统一转换为GB"""size_str = str(size_str).upper()if 'GB' in size_str:return float(size_str.replace('GB', ''))elif 'MB' in size_str:return float(size_str.replace('MB', '')) / 1024elif 'KB' in size_str:return float(size_str.replace('KB', '')) / (1024 * 1024)else: # 无单位假定为GBreturn float(size_str)def convert_time_to_seconds(time_str):"""将时间统一转换为秒"""time_str = str(time_str).lower()if 'ms' in time_str:return float(time_str.replace('ms', '')) / 1000elif 's' in time_str:return float(time_str.replace('s', ''))elif 'ns' in time_str:return float(time_str.replace('ns', '')) / 1e9else: # 无单位假定为秒return float(time_str)def standardize_timestamp(timestamp):"""标准化时间戳为UTC ISO8601格式"""if isinstance(timestamp, (int, float)):return datetime.fromtimestamp(timestamp, pytz.utc).isoformat()elif 'T' in str(timestamp):dt = datetime.fromisoformat(str(timestamp))if dt.tzinfo is None:dt = pytz.utc.localize(dt)else:dt = dt.astimezone(pytz.utc)return dt.isoformat()elif '/' in str(timestamp):dt = datetime.strptime(str(timestamp), '%d/%b/%Y:%H:%M:%S')dt = pytz.timezone('Asia/Shanghai').localize(dt). astimezone(pytz.utc)return dt.isoformat()return Nonedef standardize_status(status):"""标准化状态码"""status = str(status).lower()if status in ('success', 'ok', '200'):return StatusCode.SUCCESS.valueelif status in ('error', 'fail', '500'):return StatusCode.ERROR.valueelif status.isdigit():return int(status)return StatusCode.UNKNOWN.valuedef process_data_entry(entry):"""处理单个数据条目"""try:data = json.loads(entry)except json.JSONDecodeError:print(f"无效JSON格式: {entry}")return Nonereturn {'host': data['host'],'disk_usage_gb': convert_size_to_gb(data['disk_usage']),'response_time_seconds':
onvert_time_to_seconds(data['response_time']),'timestamp_utc': standardize_timestamp(data['timestamp']),'status_code': standardize_status(data['status'])}def analyze_processed_data(processed_data):"""分析处理后的数据"""valid_data = [x for x in processed_data if x is not None]disk_usages = [x['disk_usage_gb'] for x in valid_data]response_times = [x['response_time_seconds'] for x in valid_data]status_codes = [x['status_code'] for x in valid_data]return {'total_entries': len(processed_data),'valid_entries': len(valid_data),'avg_disk_usage': sum(disk_usages) / len(disk_usages) if disk_usages else None,'avg_response_time': sum(response_times) / len(response_times) if response_times else None,'status_distribution': {code: status_codes.count(code) for code in set(status_codes)}}def main():print("原始数据示例:")for data in raw_data[:2]:print(data)print("\n正在处理数据...")processed_data = [process_data_entry(entry) for entry in raw_data]print("\n标准化后的数据示例:")for data in processed_data[:2]:print(json.dumps(data, indent=2))# 分析数据analysis = analyze_processed_data(processed_data)print("\n数据分析结果:")print(json.dumps(analysis, indent=2))# 保存结果with open('data\processed_data.json', 'w') as f:json.dump(processed_data, f, indent=2)print("\n结果已保存到 data\processed_data.json")if __name__ == "__main__":main()
代码解释:
1)单位转换函数
- convert_size_to_gb(): 将存储容量单位从GB、MB、KB统一转换为GB。
- convert_time_to_seconds(): 将时间单位从毫秒(ms)、秒(s)、纳秒(ns)统一转换为秒。
2)标准化函数
- standardize_timestamp(): 处理多种时间戳格式,并统一为UTC时区的ISO8601格式。
- standardize_status(): 统一状态码表示,使用枚举规范状态码。
3)数据处理流程
- 使用process_data_entry()处理单个数据条目,应用所有转换规则。
- 保留原始数据以便追溯。
- 简化处理逻辑,不处理缺失值情况。
4)数据分析
- 计算平均磁盘使用量和响应时间。
- 统计状态码分布。
- 输出数据质量报告。
运行示例6-4,处理结果保存至.\data\processed_data.json,控制台输出如下:
原始数据示例:
{"host": "server1", "disk_usage": "500GB", "response_time": "120ms", "timestamp": 1678923456, "status": "success"}
{"host": "server2", "disk_usage": "250000MB", "response_time": "0.45", "timestamp": "2023-03-15T12:30:45+08:00", "status": 200}正在处理数据...标准化后的数据示例:
{"host": "server1","disk_usage_gb": 500.0,"response_time_seconds": 0.12,"timestamp_utc": "2023-03-15T23:37:36+00:00","status_code": 200
}
{"host": "server2","disk_usage_gb": 244.140625,"response_time_seconds": 0.45,"timestamp_utc": "2023-03-15T04:30:45+00:00","status_code": 200
}数据分析结果:
{"total_entries": 3,"valid_entries": 3,"avg_disk_usage": 248.3647664388021,"avg_response_time": 0.59,"status_distribution": {"200": 2,"500": 1}
}结果已保存到 data\processed_data.json