"""TTAO 预处理、CNN-BiGRU-MSA 模型 时序数据回归分析

时序数据分析方法,特点:
- TTAO 预处理:通过三角拓扑结构增强时序特征的局部和全局关系
- 混合模型架构:
  - CNN 层提取局部特征模式
  - BiGRU 捕获双向时序依赖
  - 多头自注意力机制进行序列建模
  - 残差连接和层归一化提高训练稳定性
- 高级优化技术:
  - Huber 损失函数(对异常值更鲁棒)
  - 指数学习率衰减
  - 早停和学习率回调
- 完整工作流:
  - 命令行参数配置
  - 数据预处理和特征工程
  - 模型训练和评估
  - 多指标评估和可视化
- 多步预测支持:可通过 --pred_steps 参数配置预测未来多个时间步
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import os
import argparse
import warnings
warnings.filterwarnings('ignore')# 设置中文显示
def _configure_fonts():
    """Configure matplotlib fonts that can render Chinese labels.

    Kept in a helper (invoked from main) so importing this module has no
    plotting side effects.
    """
    plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]


def parse_args():
    """Parse command-line arguments for the time-series analysis pipeline."""
    parser = argparse.ArgumentParser(description='时序数据深度学习回归分析')
    parser.add_argument('--data_path', type=str, default='data.csv', help='数据文件路径')
    parser.add_argument('--target_col', type=str, default='value', help='目标列名')
    parser.add_argument('--window_size', type=int, default=10, help='滑动窗口大小')
    parser.add_argument('--pred_steps', type=int, default=1, help='预测未来步数')
    parser.add_argument('--epochs', type=int, default=50, help='训练轮次')
    parser.add_argument('--batch_size', type=int, default=32, help='批次大小')
    parser.add_argument('--model_save_path', type=str, default='best_model.h5', help='模型保存路径')
    return parser.parse_args()


def triangular_topological_aggregation_optimization(X, window_size, alpha=0.5):
    """Triangular Topological Aggregation Optimization (TTAO) preprocessing.

    Blends each time step with a triangular-weighted aggregate of its
    preceding window to enhance local/global feature relations.

    Args:
        X: array of shape (batch, seq_len, feature_dim).
        window_size: look-back window length used for aggregation.
        alpha: blend factor between the raw value and the aggregate.

    Returns:
        Array with the same shape as ``X``.  Time steps with fewer than
        ``window_size`` predecessors are passed through unchanged.

    BUG FIX vs. original: the output used to start as ``np.zeros_like(X)``
    and the loop ``range(window_size, seq_len)`` never filled the first
    ``window_size`` steps — and since main() calls this with windows where
    ``seq_len == window_size``, the loop was empty and the function returned
    all zeros, destroying the training data.  Starting from a copy makes the
    no-op case an identity transform.
    """
    batch_size, seq_len, feature_dim = X.shape
    # Copy so early steps (and the seq_len <= window_size case) keep their data.
    ttao_output = X.copy()

    # Lower-triangular weights w[j, k] = 1 - (j - k)/window_size for k <= j.
    # The matrix is invariant across time steps, so build it once (the
    # original rebuilt it inside the loop: O(seq_len * window_size^2)).
    offsets = np.subtract.outer(np.arange(window_size), np.arange(window_size))
    triangle_weights = np.tril(1.0 - offsets / window_size)
    # Averaging the j-indexed weighted sums over j equals weighting each
    # window position by the column mean of the triangle matrix.
    column_weights = triangle_weights.mean(axis=0).reshape(1, window_size, 1)

    for i in range(window_size, seq_len):
        window_data = X[:, i - window_size:i, :]
        aggregated = np.sum(window_data * column_weights, axis=1)
        ttao_output[:, i, :] = alpha * X[:, i, :] + (1 - alpha) * aggregated
    return ttao_output


def create_sequences(data, window_size, pred_steps=1):
    """Build sliding-window samples for (multi-step) forecasting.

    Args:
        data: 2-D array of shape (n, 1); only column 0 is used.
        window_size: number of past steps per input sample.
        pred_steps: number of future steps per target sample.

    Returns:
        Tuple ``(X, y)`` with shapes (m, window_size) and (m, pred_steps),
        where ``m = n - window_size - pred_steps + 1``.
    """
    X, y = [], []
    for start in range(len(data) - window_size - pred_steps + 1):
        X.append(data[start:start + window_size, 0])
        y.append(data[start + window_size:start + window_size + pred_steps, 0])
    return np.array(X), np.array(y)


def build_advanced_model(input_shape, head_size=256, num_heads=4, ff_dim=4,
                         num_transformer_blocks=4, mlp_units=None, dropout=0.1,
                         mlp_dropout=0.1, output_steps=1):
    """Build the CNN + BiGRU + multi-head self-attention regression model.

    Args:
        input_shape: (timesteps, features) of one input sample.
        head_size: key dimension of each attention head.
        num_heads: number of attention heads.
        ff_dim: hidden width of the transformer feed-forward conv.
        num_transformer_blocks: number of attention blocks.
        mlp_units: widths of the final dense layers (default [128]);
            a None sentinel replaces the original mutable default.
        dropout: dropout inside the attention blocks.
        mlp_dropout: dropout in the final MLP.
        output_steps: number of output values — wire this to --pred_steps
            so multi-step targets match the model output (the original
            always emitted Dense(1), breaking pred_steps > 1).

    Returns:
        An uncompiled tf.keras Model.
    """
    if mlp_units is None:
        mlp_units = [128]

    inputs = Input(shape=input_shape)
    x = inputs

    # 1-D convolutions extract local patterns.
    x = Conv1D(filters=64, kernel_size=3, padding="causal", activation="relu")(x)
    x = BatchNormalization()(x)
    x = Conv1D(filters=128, kernel_size=3, padding="causal", activation="relu")(x)
    x = BatchNormalization()(x)
    x = MaxPooling1D(pool_size=2)(x)

    # BiGRU captures bidirectional temporal dependencies.
    x = Bidirectional(GRU(128, return_sequences=True))(x)
    x = Dropout(0.2)(x)
    x = Bidirectional(GRU(64, return_sequences=True))(x)
    x = Dropout(0.2)(x)

    # Pre-norm transformer blocks with residual connections.
    for _ in range(num_transformer_blocks):
        residual = x
        x = LayerNormalization(epsilon=1e-6)(x)
        x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads,
                               dropout=dropout)(x, x)
        x = Dropout(dropout)(x)
        x = x + residual  # residual connection

        residual = x
        x = LayerNormalization(epsilon=1e-6)(x)
        x = Conv1D(filters=ff_dim, kernel_size=1, activation="gelu")(x)
        x = Dropout(dropout)(x)
        # BUG FIX: project back to the residual's channel count.  The
        # original used input_shape[-1] (= 1 for univariate input), which
        # collapsed the feed-forward path to a single channel and relied on
        # silent broadcasting in the residual add.
        x = Conv1D(filters=residual.shape[-1], kernel_size=1)(x)
        x = x + residual  # residual connection

    x = LayerNormalization(epsilon=1e-6)(x)
    # NOTE(review): with a channels-last tensor, data_format="channels_first"
    # makes this pool over the feature axis (as in the Keras time-series
    # transformer example) — kept as-is, but confirm this is intended.
    x = GlobalAveragePooling1D(data_format="channels_first")(x)

    # Final MLP head.
    for dim in mlp_units:
        x = Dense(dim, activation="gelu")(x)
        x = Dropout(mlp_dropout)(x)
    outputs = Dense(output_steps)(x)  # one unit per predicted step
    return Model(inputs, outputs)


def plot_training_history(history):
    """Plot training/validation loss and MAE curves to training_history.png."""
    plt.figure(figsize=(14, 5))
    # Loss curves
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='训练损失')
    plt.plot(history.history['val_loss'], label='验证损失')
    plt.title('模型损失')
    plt.ylabel('损失')
    plt.xlabel('轮次')
    plt.legend()
    # MAE curves
    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='训练MAE')
    plt.plot(history.history['val_mae'], label='验证MAE')
    plt.title('模型MAE')
    plt.ylabel('MAE')
    plt.xlabel('轮次')
    plt.legend()
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.close()


def plot_prediction_results(y_true, y_pred, title='时序数据回归预测结果'):
    """Plot true vs. predicted series to prediction_results.png."""
    plt.figure(figsize=(14, 7))
    plt.plot(y_true, label='真实值', linewidth=2)
    plt.plot(y_pred, label='预测值', alpha=0.7, linewidth=2)
    plt.title(title, fontsize=16)
    plt.xlabel('时间点', fontsize=14)
    plt.ylabel('值', fontsize=14)
    plt.legend(fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig('prediction_results.png')
    plt.close()


def main():
    """Run the full pipeline: load, preprocess, train, evaluate, visualize."""
    args = parse_args()
    _configure_fonts()

    # 1. Load data
    print(f"正在加载数据: {args.data_path}")
    try:
        data = pd.read_csv(args.data_path)
        print(f"数据加载成功,共{len(data)}条记录")
    except FileNotFoundError:
        print(f"错误:找不到数据文件 {args.data_path}")
        return

    # 2. Preprocessing
    print("正在进行数据预处理...")
    values = data[args.target_col].values.reshape(-1, 1)
    X, y = create_sequences(values, args.window_size, args.pred_steps)

    # Chronological train/test split (no shuffling for time series).
    split_idx = int(0.8 * len(X))
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    # Scale inputs and targets independently; fit on train only.
    scaler_X = MinMaxScaler(feature_range=(0, 1))
    scaler_y = MinMaxScaler(feature_range=(0, 1))
    X_train_scaled = scaler_X.fit_transform(
        X_train.reshape(-1, X_train.shape[1])).reshape(X_train.shape)
    X_test_scaled = scaler_X.transform(
        X_test.reshape(-1, X_test.shape[1])).reshape(X_test.shape)
    y_train_scaled = scaler_y.fit_transform(
        y_train.reshape(-1, y_train.shape[1])).reshape(y_train.shape)
    y_test_scaled = scaler_y.transform(
        y_test.reshape(-1, y_test.shape[1])).reshape(y_test.shape)

    # Add the channel dimension expected by Conv1D/GRU layers.
    X_train_reshaped = X_train_scaled.reshape(
        X_train_scaled.shape[0], X_train_scaled.shape[1], 1)
    X_test_reshaped = X_test_scaled.reshape(
        X_test_scaled.shape[0], X_test_scaled.shape[1], 1)
    print(f"训练集形状: {X_train_reshaped.shape}, 测试集形状: {X_test_reshaped.shape}")

    # 3. TTAO preprocessing
    print("正在应用时间拓扑聚合优化(TTAO)...")
    X_train_tta = triangular_topological_aggregation_optimization(
        X_train_reshaped, args.window_size)
    X_test_tta = triangular_topological_aggregation_optimization(
        X_test_reshaped, args.window_size)

    # 4. Build model — output width must match the multi-step target width.
    print("正在构建模型...")
    model = build_advanced_model(X_train_tta.shape[1:],
                                 output_steps=args.pred_steps)
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=1e-3, decay_steps=10000, decay_rate=0.9)
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    model.compile(optimizer=optimizer, loss='huber_loss', metrics=['mae'])
    model.summary()

    # 5. Train
    print("开始训练模型...")
    callbacks = [
        EarlyStopping(patience=15, restore_best_weights=True),
        ModelCheckpoint(args.model_save_path, save_best_only=True,
                        monitor='val_loss'),
        tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5,
                                             min_lr=1e-6),
    ]
    history = model.fit(X_train_tta, y_train_scaled,
                        epochs=args.epochs,
                        batch_size=args.batch_size,
                        validation_split=0.2,
                        callbacks=callbacks,
                        verbose=1)

    # 6. Evaluate
    print("正在评估模型...")
    loss, mae = model.evaluate(X_test_tta, y_test_scaled, verbose=0)
    print(f"测试集指标 - Huber Loss: {loss:.4f}, MAE: {mae:.4f}")

    # 7. Predict and invert scaling
    y_pred_scaled = model.predict(X_test_tta)
    y_pred = scaler_y.inverse_transform(y_pred_scaled).flatten()
    y_true = scaler_y.inverse_transform(
        y_test_scaled.reshape(-1, y_test_scaled.shape[1])).flatten()

    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    # Guard MAPE against division by zero (the original produced inf/NaN
    # whenever the series contained zeros).
    nonzero = y_true != 0
    if nonzero.any():
        mape = np.mean(np.abs(
            (y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero])) * 100
    else:
        mape = float('nan')
    print(f"附加指标 - MSE: {mse:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.2f}%")

    # 8. Visualization
    plot_training_history(history)
    plot_prediction_results(y_true, y_pred)
    print("分析完成!结果已保存为图表文件")


if __name__ == "__main__":
    main()