Find an image dataset on Kaggle, train a CNN on it, and visualize the predictions with Grad-CAM.
Advanced: split the code into multiple files (config.py, data_loader.py, grad_cam.py, kaggle_downloader.py, main.py, model_builder.py and trainer.py, listed below).
config.py
import os


# Base configuration class
class Config:
    def __init__(self):
        # Kaggle credentials
        self.kaggle_username = ""  # Kaggle username
        self.kaggle_key = ""       # Kaggle API key

        # Dataset settings (Kaggle dataset slugs use the "owner/dataset-name" form)
        self.dataset_name = "paultimothymooney/chest-xray-pneumonia"  # default: chest X-ray pneumonia dataset
        self.data_dir = "data"
        # Adjust these if the extracted dataset uses a different folder layout
        self.train_dir = os.path.join(self.data_dir, "train")
        self.val_dir = os.path.join(self.data_dir, "val")
        self.test_dir = os.path.join(self.data_dir, "test")

        # Model settings
        self.model_save_path = "models/cnn_model.h5"
        self.img_width, self.img_height = 224, 224
        self.batch_size = 32
        self.epochs = 10
        self.learning_rate = 0.001

        # Grad-CAM settings
        self.gradcam_output_dir = "gradcam_output"
        self.target_layer = "block5_conv3"  # last conv layer of VGG16; adjust for other models
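The directory settings above assume the dataset unpacks straight into data/train, data/val and data/test. Kaggle archives often extract into their own subfolder instead (the pneumonia dataset, for example, typically unpacks into a chest_xray/ directory), so a quick sanity check such as the hypothetical sketch below can catch a wrong path before flow_from_directory fails:

import os
from config import Config

# Hypothetical helper, not part of the modules above: warn about missing split directories.
config = Config()
for split, path in [("train", config.train_dir), ("val", config.val_dir), ("test", config.test_dir)]:
    if not os.path.isdir(path):
        print(f"Warning: {split} directory not found at {path}; "
              f"check the extracted layout under {config.data_dir} and adjust Config.")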
data_loader.py
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from config import Config


class DataLoader:
    def __init__(self, config: Config):
        self.config = config
        self.train_generator = None
        self.val_generator = None
        self.test_generator = None
        self.class_indices = None

    def setup_data_generators(self):
        # Data augmentation for the training set
        train_datagen = ImageDataGenerator(
            rescale=1. / 255,
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            fill_mode='nearest'
        )
        test_datagen = ImageDataGenerator(rescale=1. / 255)

        # Create the data generators
        self.train_generator = train_datagen.flow_from_directory(
            self.config.train_dir,
            target_size=(self.config.img_width, self.config.img_height),
            batch_size=self.config.batch_size,
            class_mode='categorical'
        )
        self.val_generator = test_datagen.flow_from_directory(
            self.config.val_dir,
            target_size=(self.config.img_width, self.config.img_height),
            batch_size=self.config.batch_size,
            class_mode='categorical'
        )
        self.test_generator = test_datagen.flow_from_directory(
            self.config.test_dir,
            target_size=(self.config.img_width, self.config.img_height),
            batch_size=self.config.batch_size,
            class_mode='categorical',
            shuffle=False
        )
        self.class_indices = self.train_generator.class_indices
        return self.train_generator, self.val_generator, self.test_generator

    def get_class_names(self):
        if self.class_indices is None:
            self.setup_data_generators()
        return list(self.class_indices.keys())
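A minimal usage sketch for the loader, assuming the directories configured in Config already contain images (the class names shown are only an example for the pneumonia dataset):

from config import Config
from data_loader import DataLoader

config = Config()
loader = DataLoader(config)
train_gen, val_gen, test_gen = loader.setup_data_generators()
print(loader.get_class_names())      # e.g. ['NORMAL', 'PNEUMONIA'] for the pneumonia dataset
images, labels = next(train_gen)     # one augmented batch: typically (32, 224, 224, 3), one-hot labels
print(images.shape, labels.shape)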
grad_cam.py
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import cv2
from tensorflow.keras.models import Model
from config import Config


class GradCAM:
    def __init__(self, model, class_names, config: Config):
        self.model = model
        self.class_names = class_names
        self.config = config
        os.makedirs(self.config.gradcam_output_dir, exist_ok=True)

    def generate_heatmap(self, img_array, layer_name=None):
        if layer_name is None:
            layer_name = self.config.target_layer

        # Build a model that maps the input image to the target conv layer's
        # activations and the final predictions
        grad_model = Model(
            inputs=self.model.inputs,
            outputs=[self.model.get_layer(layer_name).output, self.model.output]
        )

        # Record the forward pass for gradient computation
        with tf.GradientTape() as tape:
            conv_outputs, predictions = grad_model(img_array)
            class_idx = np.argmax(predictions[0])
            class_name = self.class_names[class_idx]
            loss = predictions[:, class_idx]

        # Gradient of the class score w.r.t. the conv layer output
        grads = tape.gradient(loss, conv_outputs)

        # Global-average-pool the gradients to get one weight per channel
        pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

        # Weight the activation maps by the pooled gradients
        conv_outputs = conv_outputs[0]
        heatmap = tf.reduce_mean(tf.multiply(pooled_grads, conv_outputs), axis=-1)

        # ReLU and normalize the heatmap to [0, 1] (epsilon guards against division by zero)
        heatmap = np.maximum(heatmap, 0) / (np.max(heatmap) + 1e-8)

        return heatmap, class_name, float(predictions[0][class_idx])

    def overlay_heatmap(self, heatmap, img_path, alpha=0.4):
        # Load the original image
        img = cv2.imread(img_path)
        img = cv2.resize(img, (self.config.img_width, self.config.img_height))

        # Resize the heatmap to the image size
        heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))

        # Convert the heatmap to a color map
        heatmap = np.uint8(255 * heatmap)
        heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)

        # Blend the heatmap with the original image (clip to keep valid pixel values)
        superimposed_img = np.clip(heatmap * alpha + img, 0, 255).astype(np.uint8)

        return img, heatmap, superimposed_img

    def process_image(self, img_path, layer_name=None):
        # Load and preprocess the image (same 1/255 rescaling as in training)
        img = tf.keras.preprocessing.image.load_img(
            img_path, target_size=(self.config.img_width, self.config.img_height))
        img_array = tf.keras.preprocessing.image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = img_array / 255.0

        # Generate the heatmap
        heatmap, class_name, confidence = self.generate_heatmap(img_array, layer_name)

        # Overlay the heatmap on the original image
        original_img, heatmap_img, superimposed_img = self.overlay_heatmap(heatmap, img_path)

        # Output path for the saved figure
        filename = os.path.basename(img_path)
        output_path = os.path.join(self.config.gradcam_output_dir, f"gradcam_{filename}")

        # Build the visualization
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        axes[0].imshow(cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB))
        axes[0].set_title('Original image')
        axes[0].axis('off')
        axes[1].imshow(cv2.cvtColor(heatmap_img, cv2.COLOR_BGR2RGB))
        axes[1].set_title('Grad-CAM heatmap')
        axes[1].axis('off')
        axes[2].imshow(cv2.cvtColor(superimposed_img, cv2.COLOR_BGR2RGB))
        axes[2].set_title(f'Overlay - {class_name} ({confidence:.2%})')
        axes[2].axis('off')
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()

        return output_path, class_name, confidence
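generate_heatmap follows the standard Grad-CAM recipe: the gradient of the predicted class score with respect to the chosen convolutional layer is global-average-pooled into one weight per channel, the weighted activation maps are averaged across channels, and the result is passed through a ReLU and normalized. A minimal end-to-end usage sketch, with a placeholder image path and assuming a trained model has already been saved:

import tensorflow as tf
from config import Config
from data_loader import DataLoader
from grad_cam import GradCAM

config = Config()
model = tf.keras.models.load_model(config.model_save_path)   # assumes a trained model was saved
class_names = DataLoader(config).get_class_names()

grad_cam = GradCAM(model, class_names, config)
# "path/to/some_test_image.jpeg" is a placeholder; use any image from the test split
output_path, class_name, confidence = grad_cam.process_image("path/to/some_test_image.jpeg")
print(output_path, class_name, f"{confidence:.2%}")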
kaggle_downloader.py
import os
import json
import kaggle
from kaggle.api.kaggle_api_extended import KaggleApi
from config import Config
import zipfile


class KaggleDownloader:
    def __init__(self, config: Config):
        self.config = config
        self.api = None

    def authenticate(self):
        # Provide the Kaggle API credentials via environment variables
        os.environ['KAGGLE_USERNAME'] = self.config.kaggle_username
        os.environ['KAGGLE_KEY'] = self.config.kaggle_key

        # Initialize the API client
        self.api = KaggleApi()
        self.api.authenticate()

    def download_dataset(self):
        if not self.api:
            self.authenticate()

        # Create the data directory
        os.makedirs(self.config.data_dir, exist_ok=True)

        # Download the dataset
        print(f"Downloading dataset: {self.config.dataset_name}")
        self.api.dataset_download_files(
            self.config.dataset_name,
            path=self.config.data_dir,
            unzip=True
        )
        print(f"Dataset downloaded to: {self.config.data_dir}")

        # Extract any remaining zip files (if needed)
        for file in os.listdir(self.config.data_dir):
            if file.endswith('.zip'):
                zip_path = os.path.join(self.config.data_dir, file)
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall(self.config.data_dir)
                os.remove(zip_path)
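A minimal usage sketch for the downloader. Depending on the version of the kaggle package, importing it may already attempt to authenticate, so the credentials should be available beforehand, either filled into Config (placeholders below) or stored in ~/.kaggle/kaggle.json:

from config import Config
from kaggle_downloader import KaggleDownloader

config = Config()
config.kaggle_username = "your_username"   # placeholder
config.kaggle_key = "your_api_key"         # placeholder
KaggleDownloader(config).download_dataset()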
main.py
import argparse
from config import Config
from kaggle_downloader import KaggleDownloader
from data_loader import DataLoader
from model_builder import ModelBuilder
from trainer import Trainer
from grad_cam import GradCAM
import tensorflow as tf
import os


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description='Train a CNN on a Kaggle image dataset and visualize it with Grad-CAM')
    parser.add_argument('--download', action='store_true', help='download the Kaggle dataset')
    parser.add_argument('--train', action='store_true', help='train the model')
    parser.add_argument('--evaluate', action='store_true', help='evaluate the model')
    parser.add_argument('--visualize', action='store_true', help='run Grad-CAM visualization')
    parser.add_argument('--dataset', type=str, help='Kaggle dataset name (owner/dataset-name)')
    parser.add_argument('--model_type', type=str, default='vgg16', choices=['simple', 'vgg16'], help='model type')
    parser.add_argument('--img_path', type=str, help='image path for Grad-CAM visualization')
    args = parser.parse_args()

    # Configuration
    config = Config()

    # Override the configured dataset from the command line
    if args.dataset:
        config.dataset_name = args.dataset

    # 1. Download the Kaggle dataset
    if args.download:
        downloader = KaggleDownloader(config)
        downloader.download_dataset()

    # 2. Load the data
    data_loader = DataLoader(config)
    train_generator, val_generator, test_generator = data_loader.setup_data_generators()
    class_names = data_loader.get_class_names()
    print(f"Classes: {class_names}")

    # 3. Build the model
    model_builder = ModelBuilder(config, len(class_names))
    if args.model_type == 'simple':
        model = model_builder.build_simple_cnn()
    else:
        model = model_builder.build_vgg16_model()

    # 4. Train the model
    if args.train:
        trainer = Trainer(config)
        history = trainer.train(model, train_generator, val_generator)
        print("Model training finished")

    # 5. Evaluate the model
    if args.evaluate:
        if os.path.exists(config.model_save_path):
            model = tf.keras.models.load_model(config.model_save_path)
            print("Loaded saved model")
        test_loss, test_acc = model.evaluate(test_generator)
        print(f"Test accuracy: {test_acc:.2%}")

    # 6. Grad-CAM visualization
    if args.visualize:
        if os.path.exists(config.model_save_path):
            model = tf.keras.models.load_model(config.model_save_path)
            print("Loaded saved model for visualization")
        if args.img_path and os.path.exists(args.img_path):
            grad_cam = GradCAM(model, class_names, config)
            output_path, class_name, confidence = grad_cam.process_image(args.img_path)
            print(f"Visualization finished, result saved to: {output_path}")
            print(f"Predicted class: {class_name}, confidence: {confidence:.2%}")
        else:
            print("Please provide a valid image path")


if __name__ == "__main__":
    main()
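A typical end-to-end run of main.py first downloads the data and trains, e.g. python main.py --download --train --model_type vgg16, then evaluates with python main.py --evaluate, and finally produces a Grad-CAM visualization with python main.py --visualize --img_path followed by the path to any test image of your choice. Note that if no saved model exists yet, the evaluate and visualize steps fall back to the freshly built, untrained model.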
model_builder.py
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.applications import VGG16
from tensorflow.keras.optimizers import Adam
from config import Config


class ModelBuilder:
    def __init__(self, config: Config, num_classes: int):
        self.config = config
        self.num_classes = num_classes

    def build_simple_cnn(self):
        # Build a simple CNN model
        model = Sequential([
            Conv2D(32, (3, 3), activation='relu',
                   input_shape=(self.config.img_width, self.config.img_height, 3)),
            MaxPooling2D((2, 2)),
            Conv2D(64, (3, 3), activation='relu'),
            MaxPooling2D((2, 2)),
            Conv2D(128, (3, 3), activation='relu'),
            MaxPooling2D((2, 2)),
            Flatten(),
            Dense(128, activation='relu'),
            Dropout(0.5),
            Dense(self.num_classes, activation='softmax')
        ])

        model.compile(
            optimizer=Adam(learning_rate=self.config.learning_rate),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        return model

    def build_vgg16_model(self, fine_tune=False):
        # Build a model on top of pretrained VGG16
        base_model = VGG16(
            weights='imagenet',
            include_top=False,
            input_shape=(self.config.img_width, self.config.img_height, 3)
        )

        # Freeze the pretrained layers unless fine-tuning
        if not fine_tune:
            for layer in base_model.layers:
                layer.trainable = False

        # Add a custom classification head
        x = base_model.output
        x = Flatten()(x)
        x = Dense(256, activation='relu')(x)
        x = Dropout(0.5)(x)
        predictions = Dense(self.num_classes, activation='softmax')(x)

        model = Model(inputs=base_model.input, outputs=predictions)

        model.compile(
            optimizer=Adam(learning_rate=self.config.learning_rate),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        return model
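Config.target_layer has to name a convolutional layer that actually exists in the model being visualized ('block5_conv3' only exists in the VGG16 variant). A small hypothetical helper for finding a suitable layer name, here applied to the simple CNN with a placeholder class count:

from tensorflow.keras.layers import Conv2D
from config import Config
from model_builder import ModelBuilder

config = Config()
model = ModelBuilder(config, num_classes=2).build_simple_cnn()   # num_classes=2 is a placeholder
conv_layer_names = [layer.name for layer in model.layers if isinstance(layer, Conv2D)]
print(conv_layer_names)   # the last entry is usually the best Grad-CAM target layer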
trainer.py
import os
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from config import Config


class Trainer:
    def __init__(self, config: Config):
        self.config = config

    def train(self, model, train_generator, val_generator):
        # Create the directory for saving the model
        os.makedirs(os.path.dirname(self.config.model_save_path), exist_ok=True)

        # Define the callbacks
        callbacks = [
            ModelCheckpoint(
                self.config.model_save_path,
                monitor='val_accuracy',
                save_best_only=True,
                mode='max',
                verbose=1
            ),
            EarlyStopping(
                monitor='val_loss',
                patience=5,
                restore_best_weights=True,
                verbose=1
            ),
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.2,
                patience=3,
                min_lr=0.00001,
                verbose=1
            )
        ]

        # Train the model (max(1, ...) guards against zero steps on very small splits)
        history = model.fit(
            train_generator,
            steps_per_epoch=max(1, train_generator.samples // self.config.batch_size),
            validation_data=val_generator,
            validation_steps=max(1, val_generator.samples // self.config.batch_size),
            epochs=self.config.epochs,
            callbacks=callbacks
        )
        return history
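The History object returned by Trainer.train can be plotted to check for overfitting; a minimal optional sketch (the output filename is arbitrary):

import matplotlib.pyplot as plt

def plot_history(history, output_path="training_history.png"):
    # 'accuracy' / 'val_accuracy' are the metric names produced by metrics=['accuracy']
    plt.plot(history.history['accuracy'], label='train accuracy')
    plt.plot(history.history['val_accuracy'], label='val accuracy')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend()
    plt.savefig(output_path)
    plt.close()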
@浙大疏锦行