@浙大疏锦行
作业:
对之前的信贷项目,利用神经网络训练下,尝试用到目前的知识点让代码更加规范和美观。
# Standard library
import time
import warnings

# Third-party
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm

# Silence noisy library warnings for cleaner notebook output.
warnings.filterwarnings("ignore")

# Select the first GPU when available, otherwise fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
def preprocess_data(data_path):data = pd.read_csv(data_path)# 标签编码home_ownership_mapping = {'Own Home': 1, 'Rent': 2, 'Have Mortgage': 3, 'Home Mortgage': 4}data['Home Ownership'] = data['Home Ownership'].map(home_ownership_mapping)years_in_job_mapping = {'< 1 year': 1, '1 year': 2, '2 years': 3, '3 years': 4,'4 years': 5, '5 years': 6, '6 years': 7, '7 years': 8,'8 years': 9, '9 years': 10, '10+ years': 11}data['Years in current job'] = data['Years in current job'].map(years_in_job_mapping)# 独热编码data = pd.get_dummies(data, columns=['Purpose'])# 转换bool为intfor col in data.select_dtypes(include=['bool']).columns:data[col] = data[col].astype(int)# Term映射term_mapping = {'Short Term': 0, 'Long Term': 1}data['Term'] = data['Term'].map(term_mapping)data.rename(columns={'Term': 'Long Term'}, inplace=True)# 填充缺失值for feature in data.select_dtypes(include=['int64', 'float64']).columns:mode_value = data[feature].mode()[0]data[feature].fillna(mode_value, inplace=True)return data# 加载并预处理数据
# Load and preprocess the full dataset from disk.
data = preprocess_data('data.csv')
X = data.drop('Credit Default', axis=1)
y = data['Credit Default']

# Train/test split; stratify keeps the class ratio identical in both splits.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y
)

# Min-max scaling: fit on the training split only, then apply to the test
# split — prevents information leaking from test into train.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert arrays/Series to tensors and move them onto the selected device.
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train.values).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test.values).to(device)
class MLP(nn.Module):def __init__(self, input_dim, hidden_dim=10, output_dim=2):super().__init__()self.layers = nn.Sequential(nn.Linear(input_dim, hidden_dim),nn.ReLU(),nn.Linear(hidden_dim, output_dim))def forward(self, x):return self.layers(x)# 初始化模型
# Model setup: infer the input width from the training matrix.
input_dim = X_train.shape[1]
model = MLP(input_dim=input_dim, output_dim=2).to(device)  # binary task -> 2 logits

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training hyperparameters.
num_epochs = 1000
batch_size = 64
# NOTE(review): integer division drops the final partial batch each epoch.
n_batches = len(X_train) // batch_size

# Training loop with manual mini-batching over pre-loaded device tensors.
losses = []
start_time = time.time()
with tqdm(range(num_epochs), desc="训练进度") as pbar:
    for epoch in pbar:
        epoch_loss = 0
        for i in range(n_batches):
            start = i * batch_size
            end = start + batch_size
            batch_X = X_train[start:end]
            batch_y = y_train[start:end]

            # Forward pass.
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        # Track the per-epoch average loss and show it on the progress bar.
        avg_loss = epoch_loss / n_batches
        losses.append(avg_loss)
        pbar.set_postfix({'Loss': f'{avg_loss:.4f}'})

print(f'训练时间: {time.time()-start_time:.2f}秒')
# Plot the per-epoch average training loss.
plt.figure(figsize=(10, 6))
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.grid(True)
plt.show()

# Evaluate accuracy on the held-out test set (no gradients needed).
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    _, predicted = torch.max(outputs, 1)  # argmax over the class logits
    accuracy = (predicted == y_test).float().mean()
    print(f'测试集准确率: {accuracy.item()*100:.2f}%')
使用设备: cuda:0
训练进度: 100%|██████████| 1000/1000 [03:01<00:00, 5.50it/s, Loss=0.4466]
训练时间: 181.73秒
测试集准确率: 77.20%
代码耗时过长,进行优化
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np
import time
from tqdm import tqdm
import matplotlib.pyplot as plt
import warnings

# Suppress library warnings for cleaner output.
warnings.filterwarnings("ignore")

# Device configuration: prefer the first GPU, fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
def preprocess_data(data_path):"""优化点1:封装预处理逻辑为函数"""data = pd.read_csv(data_path)# 标签编码(优化点2:使用字典映射替代if-else)mapping_dicts = {'Home Ownership': {'Own Home': 1, 'Rent': 2, 'Have Mortgage': 3, 'Home Mortgage': 4},'Years in current job': {'< 1 year': 1, '1 year': 2, '2 years': 3, '3 years': 4,'4 years': 5, '5 years': 6, '6 years': 7, '7 years': 8,'8 years': 9, '9 years': 10, '10+ years': 11},'Term': {'Short Term': 0, 'Long Term': 1}}for col, mapping in mapping_dicts.items():data[col] = data[col].map(mapping)# 独热编码(优化点3:自动处理新特征)data = pd.get_dummies(data, columns=['Purpose'], drop_first=True)# 优化点4:自动类型转换for col in data.select_dtypes(include=['bool', 'uint8']).columns:data[col] = data[col].astype(int)# 优化点5:统一缺失值处理num_cols = data.select_dtypes(include=['int64', 'float64']).columnsdata[num_cols] = data[num_cols].fillna(data[num_cols].mode().iloc[0])return data# ==================== 数据加载与分割 ====================
# Load and preprocess the dataset.
data = preprocess_data('data.csv')
X = data.drop('Credit Default', axis=1)
y = data['Credit Default']

# Stratified split keeps the class ratio identical in train and test.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y
)

# Fit the scaler on the training split only to avoid data leakage.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
class MLP(nn.Module):"""优化点8:参数化模型结构"""def __init__(self, input_dim, hidden_dims=[64, 32], output_dim=2):super().__init__()layers = []prev_dim = input_dim# 动态构建隐藏层for hidden_dim in hidden_dims:layers.extend([nn.Linear(prev_dim, hidden_dim),nn.ReLU(),nn.Dropout(0.3) # 优化点9:添加Dropout防止过拟合])prev_dim = hidden_dimlayers.append(nn.Linear(prev_dim, output_dim))self.net = nn.Sequential(*layers)def forward(self, x):return self.net(x)# ==================== 训练配置 ====================
# Convert numpy arrays / Series to tensors on the target device.
X_train_t = torch.FloatTensor(X_train).to(device)
y_train_t = torch.LongTensor(y_train.values).to(device)
X_test_t = torch.FloatTensor(X_test).to(device)
y_test_t = torch.LongTensor(y_test.values).to(device)

# DataLoader handles shuffling and mini-batching.
train_dataset = TensorDataset(X_train_t, y_train_t)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

# Model sized from the feature count.
model = MLP(input_dim=X_train.shape[1]).to(device)

# Loss plus AdamW, i.e. Adam with decoupled weight decay.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# Cosine-annealing learning-rate schedule with a 50-step period.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
def train_model(model, train_loader, criterion, optimizer, num_epochs=100):"""优化点13:封装训练过程"""train_losses = []best_loss = float('inf')patience, counter = 5, 0pbar = tqdm(range(num_epochs), desc="训练进度")for epoch in pbar:model.train()epoch_loss = 0for batch_X, batch_y in train_loader:optimizer.zero_grad()outputs = model(batch_X)loss = criterion(outputs, batch_y)loss.backward()optimizer.step()epoch_loss += loss.item()# 计算平均损失avg_loss = epoch_loss / len(train_loader)train_losses.append(avg_loss)# 早停机制(优化点14)if avg_loss < best_loss:best_loss = avg_losscounter = 0torch.save(model.state_dict(), 'best_model.pth')else:counter += 1if counter >= patience:print(f"\n早停触发,最佳损失: {best_loss:.4f}")break# 更新进度条pbar.set_postfix({'Loss': f'{avg_loss:.4f}','LR': f"{optimizer.param_groups[0]['lr']:.2e}"})scheduler.step()return train_losses# 执行训练
# ---- Run training ----
start_time = time.time()
loss_history = train_model(model, train_loader, criterion, optimizer, num_epochs=100)
print(f"训练耗时: {time.time()-start_time:.2f}秒")

# ---- Visualize the training-loss curve ----
plt.figure(figsize=(10, 5))
plt.plot(loss_history, label='训练损失')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('训练损失曲线')
plt.legend()
plt.grid(True)
plt.show()

# ---- Evaluate the best checkpoint on the test set ----
model.load_state_dict(torch.load('best_model.pth'))
model.eval()
with torch.no_grad():
    y_pred = model(X_test_t)
    _, predicted = torch.max(y_pred, 1)  # argmax over the class logits
    acc = accuracy_score(y_test, predicted.cpu())
    print(f"\n测试集准确率: {acc*100:.2f}%")

# Per-class precision/recall/F1 breakdown.
from sklearn.metrics import classification_report
print("\n分类报告:")
print(classification_report(y_test, predicted.cpu()))
使用设备: cuda:0
训练进度: 33%|███▎ | 33/100 [00:06<00:13, 5.12it/s, Loss=0.4611, LR=2.87e-04]早停触发,最佳损失: 0.4596
训练耗时: 6.45秒
测试集准确率: 77.80%

分类报告:
              precision    recall  f1-score   support

           0       0.77      0.98      0.86      1077
           1       0.83      0.27      0.40       423

    accuracy                           0.78      1500
   macro avg       0.80      0.62      0.63      1500
weighted avg       0.79      0.78      0.73      1500
探索性作业(随意完成):
尝试进入nn.Module中,查看他的方法
方法一:使用 dir()
查看所有方法和属性
# Method 1: dir() lists every method and attribute of nn.Module.
import torch.nn as nn

print(dir(nn.Module))
这会输出 nn.Module
的所有成员,包括方法、属性和特殊方法(以 __
开头和结尾的)。
方法二:使用 help()
查看详细文档
help(nn.Module)
这会显示 nn.Module
的完整文档,包括方法的详细说明、参数和返回值。
方法三:查看特定方法的源代码
如果你想深入了解某个方法的实现,可以使用 inspect
模块查看源代码:
import inspect
# Method 3: inspect.getsource shows the implementation of a method.
from torch.nn import Module

print(inspect.getsource(Module.forward))