结合生活实例,先简单认识一下什么是循环神经网络
先想个问题:为什么需要 “循环”?
你平时看句子、听语音、看视频,都是 “按顺序” 来的吧?比如 “我吃苹果” 和 “苹果吃我”,字一样但顺序不同,意思天差地别。
传统的神经网络像个 “健忘症患者”—— 处理每个字的时候,只看当前这个字,记不住前面的。比如看 “苹果吃我”,它看到 “苹果” 时,等下看到 “吃我”,早忘了 “苹果” 是啥了,自然分不清意思。
而 RNN 就像个 “有记性的人”—— 处理每个字时,会偷偷记住前面看过的内容。比如看 “苹果吃我”,它看到 “苹果” 时记下来,看到 “吃” 时,结合前面的 “苹果”,再看到 “我” 时,就知道是 “苹果吃我”(虽然不合逻辑,但能记住顺序)。
RNN 到底怎么 “记” 东西?
把 RNN 想象成一个 “复读机 + 记事本” 的结合体,处理序列数据(比如一句话)时,它会按顺序一个字一个字地 “读”,边读边记。
举个例子:用 RNN 理解句子 “我爱吃西瓜”。
第一步(看 “我”):
它先看到 “我”,心里默默记下来(这就是 “记忆”,专业名叫 “隐藏状态”),记的内容是 “现在看到了‘我’”。第二步(看 “爱”):
它不会忘了刚才的 “我”,而是把 “爱” 和之前记的 “我” 放一起,更新记忆:“现在是‘我’+‘爱’”。第三步(看 “吃”):
继续带着之前的记忆(“我 + 爱”),加上 “吃”,记忆变成:“我 + 爱 + 吃”。第四步(看 “西瓜”):
带着 “我 + 爱 + 吃” 的记忆,加上 “西瓜”,最终记忆变成整个句子的信息:“我爱吃西瓜”。
你看,它每一步都把新内容和 “之前的记忆” 混在一起,更新记忆 —— 这就是 “循环” 的意思:后一步依赖前一步的记忆,一步步传递下去。
为什么说 “权重共享”?
还是刚才的例子,RNN 处理 “我”“爱”“吃”“西瓜” 这四个字时,用的是同一套 “记东西的规则”。
就像你记日记,不管今天记开心事还是难过事,都是用同样的方式写在本子上(不会今天用中文,明天用英文)。RNN 也一样,处理每个字的逻辑完全相同,这样既能少学很多规则,又能适应不同长度的句子(比如一句话 3 个字或 10 个字,都能用同一套方法处理)。
RNN 能干啥?
说白了,就是处理 “有顺序” 的事儿:
- 看一句话猜情绪(“这部电影太烂了” 是负面,得记住每个词的顺序才能判断);
- 听语音转文字(声音是按时间顺序来的,前面的音和后面的音有关联);
- 预测明天的天气(今天、昨天的天气会影响明天,得按时间顺序记下来)。
它的毛病在哪?
RNN 的 “记性” 不好,记不住太久远的事。比如一句话特别长:“今天早上出门时忘了带伞,结果……(中间 100 个字)…… 所以全身湿透了”。
RNN 处理到 “全身湿透了” 时,可能早就忘了 “早上没带伞” 这回事了 —— 这就是 “长时记忆差”,专业叫 “梯度消失”,后面的 LSTM、GRU 就是给它加了 “备忘录”,帮它记久一点。
总结一下:
RNN 就像一个 “有短期记忆的复读机”,处理按顺序来的数据时,会把新信息和之前的记忆混在一起,一步步传递下去,所以能理解顺序的重要性。但记性不算太好,长句子容易忘事儿~
专业术语解释
循环神经网络(Recurrent Neural Network, RNN)是一类专门处理序列数据(如文本、语音、时间序列等)的神经网络,其核心是通过隐藏状态的循环传递捕捉数据中的时序依赖关系。以下从专业角度解析其基本结构与机制:
1. 核心目标
传统前馈神经网络(如 CNN、全连接网络)的输入是固定维度的非序列数据,且各层神经元间无反馈连接,无法处理时序依赖(如 “苹果吃我” 与 “我吃苹果” 的语义差异由词序决定)。 RNN 的核心设计是:让网络在处理序列的第 t 步时,能利用第 t-1 步的信息,从而建模序列中 “前因后果” 的关联。
2. 基本结构与循环机制
RNN 的结构可简化为 “输入层 - 隐藏层 - 输出层”,但其核心特征是隐藏层存在自循环连接,即隐藏层的输出会作为自身的输入参与下一时间步的计算。
关键变量定义:
核心计算公式:
隐藏状态更新(循环的核心):
输出计算:
3. 权重共享机制
RNN 的关键特性是所有时间步共享同一套参数(\(W_{hx}, W_{hh}, W_{ho}, b_h, b_o\))。 这意味着:处理序列中不同位置的元素(如第 1 个词与第 t 个词)时,使用相同的权重矩阵与偏置。
- 优势:极大减少参数数量(与序列长度 T 无关),使模型能适应任意长度的序列输入;
- 本质:建模 “序列中通用的时序规律”(如语言中 “主谓宾” 的语法规则对所有句子通用)。
4. 序列处理模式
根据输入序列与输出序列的长度关系,RNN 的应用模式可分为 4 类:
- 一对一:输入输出均为单元素(如固定长度的时序数据分类,如 “用前 3 天天气预测第 4 天”);
- 一对多:单输入生成序列(如输入 “晴天” 生成 “出门带伞?否;适合野餐?是”);
- 多对一:序列输入生成单输出(如文本情感分类,输入句子输出 “正面 / 负面”);
- 多对多:序列输入生成等长 / 不等长序列(如机器翻译,输入 “我爱你” 输出 “I love you”)。
5. 局限性
标准 RNN 的隐藏状态更新依赖线性变换与简单激活函数(如 tanh),在处理长序列(如 T>100)时会出现梯度消失 / 爆炸问题:
- 反向传播时,梯度需通过 \(W_{hh}\) 的多次矩阵乘法传递,当
时梯度会指数级衰减(消失),导致模型无法学习长距离依赖(如 “早上忘带伞...(100 词后)... 淋湿了” 的关联);
- 这一缺陷推动了 LSTM(长短期记忆网络)、GRU(门控循环单元)等改进模型的提出,通过 “门控机制” 动态控制信息的保留与遗忘。
总结:RNN 通过隐藏状态的循环传递与权重共享,实现了对序列时序依赖的建模,是处理时序数据的基础模型;其核心是公式 所体现的 “当前状态依赖历史状态” 的循环逻辑。
简易代码实战(最后附带完整代码)
1. 序列数据的表
# 生成正弦波时序数据
time = np.linspace(0, 2 * np.pi * n_samples / 10, n_samples + seq_length)
data = np.sin(time)# 创建输入序列X和目标值y
for i in range(n_samples):X.append(data[i:i+seq_length]) # 前seq_length个点作为输入y.append(data[i+seq_length]) # 下一个点作为预测目标
- 概念对应:
- 序列数据:正弦波是典型的时序数据,每个点依赖于前面的点。
- 输入序列长度:
seq_length=20
表示用前 20 个时间步预测第 21 个!!!!! - 时间步(time step):每个时间步对应序列中的一个点(如
t=1
对应data[0]
)。
2. RNN 模型结构
class SimpleRNN(nn.Module):def __init__(self, input_size, hidden_size, output_size, num_layers=1):super(SimpleRNN, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layers# 定义RNN层(核心组件)self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')# 全连接层:将RNN的输出映射到预测值self.fc = nn.Linear(hidden_size, output_size)def forward(self, x):# 初始化隐藏状态h0(形状:[层数, 批量大小, 隐藏维度])h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)# 前向传播RNN# out形状:[批量大小, 序列长度, 隐藏维度]# hn形状:[层数, 批量大小, 隐藏维度]out, hn = self.rnn(x, h0)# 只取最后一个时间步的输出(用于预测下一个值)out = out[:, -1, :] # 形状:[批量大小, 隐藏维度]# 通过全连接层得到预测值out = self.fc(out) # 形状:[批量大小, 输出维度]return out
一、模型初始化(__init__
方法)
1. 参数含义
def __init__(self, input_size, hidden_size, output_size, num_layers=1):
input_size
:每个时间步的输入特征数(本例中为 1,因为只输入正弦波的当前值)。hidden_size
:隐藏状态的维度(记忆容量),数值越大,模型能记住的信息越多(本例为 64)。output_size
:输出的维度(本例中为 1,因为只预测下一个正弦波值)。num_layers
:RNN 的层数(默认 1 层,可堆叠多层增强表达能力)。
2. RNN 层的定义
self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')
batch_first=True
:输入张量的第 1 维是批量大小([batch, seq_len, feature]
)。nonlinearity='tanh'
:使用 tanh 激活函数(将输出值压缩到 [-1, 1] 区间)。
3. 全连接层的作用
self.fc = nn.Linear(hidden_size, output_size)
- 将 RNN 的隐藏状态(
hidden_size
维)映射到最终输出(output_size
维)。 - 相当于做一个线性变换:
y = W*h + b
。
二、前向传播(forward
方法)
1. 初始化隐藏状态
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
h0
是序列开始时的初始隐藏状态,形状为[层数, 批量大小, 隐藏维度]
。- 初始化为全零向量,表示序列开始时模型没有任何先验记忆。
2. RNN 层的计算
out, hn = self.rnn(x, h0)
输入:
x
:输入序列,形状为[batch, seq_len, input_size]
(本例中为[32, 20, 1]
)。h0
:初始隐藏状态,形状为[num_layers, batch, hidden_size]
。
输出:
out
:所有时间步的隐藏状态,形状为[batch, seq_len, hidden_size]
。hn
:最后一个时间步的隐藏状态(即out
的最后一个时间步),形状为[num_layers, batch, hidden_size]
。
3. 提取最后时间步的输出
out = out[:, -1, :] # 取每个样本的最后一个时间步
out
的原始形状:[batch, seq_len, hidden_size]
(例如[32, 20, 64]
)。- 提取后形状:
[batch, hidden_size]
(例如[32, 64]
)。 - 为什么只取最后一个时间步?因为我们的任务是预测序列的下一个值,最后一个时间步的隐藏状态包含了整个序列的信息。
4. 通过全连接层生成预测
out = self.fc(out) # 将64维隐藏状态映射到1维输出
- 最终输出形状:
[batch, output_size]
(本例中为[32, 1]
)。
三、用具体例子理解数据流动
假设:
- 批量大小
batch_size=2
(同时处理 2 个序列)。 - 序列长度
seq_length=3
(每个序列有 3 个时间步)。 - 输入维度
input_size=1
(每个时间步 1 个特征)。 - 隐藏维度
hidden_size=2
(简化计算)。
1. 输入 x 的形状
x.shape = [2, 3, 1]
# 示例数据:
x = [[[0.1], [0.2], [0.3]], # 第1个序列[[0.4], [0.5], [0.6]] # 第2个序列
]
2. 初始隐藏状态 h0 的形状
h0.shape = [1, 2, 2] # [层数, 批量, 隐藏维度]
# 初始化为全零:
h0 = [[[0, 0], [0, 0]] # 第1层(唯一层)的两个批量的初始隐藏状态
]
3. RNN 计算过程(简化版)
对第 1 个序列的第 1 个时间步x[0, 0] = [0.1]
:
h_1 = tanh(W_hx * [0.1] + W_hh * [0, 0] + b_h)
# 假设W_hx = [[0.5], [0.3]], W_hh = [[0.2, 0.1], [0.4, 0.3]]
h_1 = tanh([0.5*0.1 + 0.2*0 + 0.1*0, 0.3*0.1 + 0.4*0 + 0.3*0])= tanh([0.05, 0.03])≈ [0.05, 0.03] # 经过tanh激活后的结果
类似地,计算后续时间步和其他序列,最终得到:
out.shape = [2, 3, 2]
out = [[[0.05, 0.03], [0.12, 0.08], [0.20, 0.15]], # 第1个序列的3个时间步[[0.25, 0.18], [0.35, 0.25], [0.45, 0.32]] # 第2个序列的3个时间步
]
4. 提取最后时间步并通过全连接层
out[:, -1, :] = [[0.20, 0.15], [0.45, 0.32]] # 形状:[2, 2]# 假设全连接层权重W_fc = [[0.6], [0.7]],偏置b_fc = [0.1]
final_output = [[0.20*0.6 + 0.15*0.7 + 0.1], [0.45*0.6 + 0.32*0.7 + 0.1]]≈ [[0.295], [0.584]] # 形状:[2, 1]
四、关键概念总结
隐藏状态:
- RNN 的核心是隐藏状态
h_t
,它整合了当前输入和历史信息。 - 每个时间步的计算都依赖上一步的隐藏状态,形成 “记忆链”。
- RNN 的核心是隐藏状态
权重共享:
W_hx
和W_hh
在所有时间步中保持不变,减少了参数量。
输入输出形状:
- 输入:
[batch, seq_len, input_size]
。 - 输出:
[batch, seq_len, hidden_size]
(所有时间步)或[batch, hidden_size]
(最后时间步)。
- 输入:
序列建模能力:
- 通过隐藏状态的传递,RNN 能捕捉序列中的时序依赖关系(如正弦波的周期性)。
3. 前向传播与隐藏状态传递
def forward(self, x):h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)out, hn = self.rnn(x, h0)out = out[:, -1, :] # 取最后一个时间步的输出
- 概念对应:
- 初始隐藏状态(h0):序列开始时的记忆(全零向量)。
- 隐藏状态更新:
h_t = tanh(W_hx * x_t + W_hh * h_{t-1})
每个时间步的隐藏状态h_t
整合当前输入x_t
和上一步记忆h_{t-1}
。 - 输出形状:
out
是所有时间步的隐藏状态,形状为[batch, seq_len, hidden_size]
。 - 最终输出:只取最后一个时间步的隐藏状态(
out[:, -1, :]
),用于预测下一个值。
4. 批处理与并行计算
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
- 概念对应:
- 批处理(batch):每次训练同时处理 32 个序列,加速计算。
- 输入形状:
[batch_size, seq_length, input_size]
=[32, 20, 1]
。 - 并行计算:GPU 同时处理 32 个序列的前向 / 反向传播。
5. 训练过程与损失函数
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)for epoch in range(epochs):for X_batch, y_batch in train_loader:outputs = model(X_batch)loss = criterion(outputs, y_batch) # 计算预测值与真实值的MSEoptimizer.zero_grad()loss.backward() # 反向传播计算梯度optimizer.step() # 更新参数
- 概念对应:
- 损失函数(MSE):
Loss = 1/N * Σ(y_pred - y_true)²
衡量预测值与真实值的差异。 - 反向传播:通过链式法则计算每个参数的梯度(如
dLoss/dW_hh
)。 - 梯度消失:标准 RNN 在长序列中梯度会指数衰减(这里序列较短,问题不明显)。
- 损失函数(MSE):
6. 长距离依赖的挑战
# 序列长度seq_length=20,RNN可较好处理
# 若seq_length很大(如100),标准RNN性能会下降
- 概念对应:
- 梯度消失 / 爆炸:RNN 通过
tanh
激活函数传递梯度,当序列很长时,梯度会趋近于 0 或无穷大。 - 改进方案:LSTM/GRU 通过门控机制解决这一问题(后续可尝试替换
nn.RNN
为nn.LSTM
)。
- 梯度消失 / 爆炸:RNN 通过
7. 预测与可视化
plt.plot(targets, label='True Values')
plt.plot(predictions, label='Predictions')
- 概念对应:
- 预测能力:模型学习到正弦波的周期性,能用前 20 个点预测下一个点。
- 泛化验证:测试集上的预测效果验证模型是否真正理解序列规律。
8.完整代码
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt# --------------训练 RNN 预测下一个时间步的值------------#
# 设置随机种子以确保结果可复现
torch.manual_seed(42)
np.random.seed(42)# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')# 生成简单的时序数据用于演示
n_samples = 1000
seq_length = 20
time = np.linspace(0, 2 * np.pi * n_samples / 10, n_samples + seq_length)
data = np.sin(time)
X = []
y = []
for i in range(n_samples):X.append(data[i:i + seq_length])y.append(data[i + seq_length])
X = np.array(X)
y = np.array(y)# 转换为PyTorch张量
X = torch.FloatTensor(X).view(n_samples, seq_length, 1) # [batch, seq_len, feature_dim]
y = torch.FloatTensor(y).view(n_samples, 1) # [batch, output_dim]# 划分训练集和测试集
train_size = int(0.8 * n_samples)
train_X, test_X = X[:train_size], X[train_size:]
train_y, test_y = y[:train_size], y[train_size:]# 创建数据加载器
batch_size = 32
train_dataset = TensorDataset(train_X, train_y)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)test_dataset = TensorDataset(test_X, test_y)
test_loader = DataLoader(test_dataset, batch_size=batch_size)# 定义RNN模型
class SimpleRNN(nn.Module):def __init__(self, input_size, hidden_size, output_size, num_layers=1):super(SimpleRNN, self).__init__()self.hidden_size = hidden_sizeself.num_layers = num_layers# PyTorch内置的RNN层# batch_first=True表示输入的形状为[batch, seq_len, feature_dim]self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, nonlinearity='tanh')# 全连接层:将RNN的输出映射到预测值self.fc = nn.Linear(hidden_size, output_size)def forward(self, x):# 初始化隐藏状态# 形状为[num_layers, batch_size, hidden_size]h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)# 前向传播RNN# out形状为[batch_size, seq_len, hidden_size]# hn形状为[num_layers, batch_size, hidden_size]out, hn = self.rnn(x, h0)# 我们只需要最后一个时间步的输出# 形状变为[batch_size, hidden_size]out = out[:, -1, :]# 通过全连接层得到预测值# 形状变为[batch_size, output_size]out = self.fc(out)return out# 模型参数
input_size = 1 # 输入特征维度(每个时间步的特征数)
hidden_size = 64 # 隐藏层维度
output_size = 1 # 输出维度(预测值的维度)
num_layers = 1 # RNN层数# 初始化模型
model = SimpleRNN(input_size, hidden_size, output_size, num_layers).to(device)# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 训练模型
epochs = 50
train_losses = []print("开始训练模型...")
for epoch in range(epochs):total_loss = 0for X_batch, y_batch in train_loader:X_batch, y_batch = X_batch.to(device), y_batch.to(device)# 前向传播outputs = model(X_batch)loss = criterion(outputs, y_batch)# 反向传播和优化optimizer.zero_grad()loss.backward()optimizer.step()total_loss += loss.item()# 计算平均损失avg_loss = total_loss / len(train_loader)train_losses.append(avg_loss)if (epoch + 1) % 10 == 0:print(f'Epoch [{epoch + 1}/{epochs}], Loss: {avg_loss:.4f}')# 评估模型
model.eval()
predictions = []
targets = []with torch.no_grad():for X_batch, y_batch in test_loader:X_batch, y_batch = X_batch.to(device), y_batch.to(device)# 前向传播outputs = model(X_batch)predictions.extend(outputs.cpu().numpy())targets.extend(y_batch.cpu().numpy())predictions = np.array(predictions)
targets = np.array(targets)# 可视化结果
plt.figure(figsize=(12, 5))plt.subplot(1, 2, 1)
plt.plot(train_losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')plt.subplot(1, 2, 2)
plt.plot(targets, label='True Values')
plt.plot(predictions, label='Predictions')
plt.title('Time Series Prediction')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.legend()plt.tight_layout()
plt.show()# 保存模型
torch.save(model.state_dict(), '1-0-rnn_model.pth')
print('Model saved as rnn_model.pth')