核心方法
- 逐行读取 - 最常用,内存占用O(1)
- 分块读取 - 适合超大文件,可控制内存使用
- 内存映射 - 高性能,虚拟内存映射
- 缓冲读取 - 平衡性能和内存
特殊场景处理
- CSV文件 - 使用pandas的chunksize参数
- JSON Lines - 逐行解析JSON对象
- 文本分析 - 内存高效的单词计数示例
关键优化技巧
- 使用生成器 - 避免一次性加载所有数据到内存
- 合理设置块大小 - 平衡内存使用和IO效率
- 进度监控 - 实时显示处理进度
- 错误处理 - 处理编码错误、文件不存在等异常
使用建议
- 小于100MB: 直接读取到内存
- 100MB-1GB: 使用逐行读取或小块读取
- 大于1GB: 使用内存映射或大块分批处理
- 结构化数据: 使用pandas的chunksize参数
这些方法可以处理几GB甚至几十GB的文件而不会导致内存溢出。根据您的具体需求选择最适合的方法即可。
示例代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
大文件读取示例 - 避免内存溢出的多种方法
"""import os
import sys
import mmap
import csv
import json
from typing import Generator, Iterator
import pandas as pdclass LargeFileReader:"""大文件读取工具类"""def __init__(self, file_path: str, encoding: str = 'utf-8'):self.file_path = file_pathself.encoding = encodingdef get_file_size(self) -> int:"""获取文件大小(字节)"""return os.path.getsize(self.file_path)def get_file_size_mb(self) -> float:"""获取文件大小(MB)"""return self.get_file_size() / (1024 * 1024)def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法1: 逐行读取 - 最常用的方法内存使用量: O(1) - 每次只加载一行适用场景: 文本文件、日志文件"""try:with open(file_path, 'r', encoding=encoding) as file:for line_num, line in enumerate(file, 1):# 处理每一行yield line.strip() # 去除行尾换行符# 可选:显示进度if line_num % 10000 == 0:print(f"已处理 {line_num} 行")except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"编码错误: {e}")def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法2: 按块读取 - 适合处理二进制文件或超大文本文件内存使用量: O(chunk_size) - 每次加载指定大小的块适用场景: 二进制文件、超大文本文件"""try:with open(file_path, 'r', encoding=encoding) as file:while True:chunk = file.read(chunk_size)if not chunk:breakyield chunkexcept FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"编码错误: {e}")def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:"""方法3: 内存映射文件 - 高性能读取内存使用量: 虚拟内存映射,物理内存按需加载适用场景: 需要随机访问的大文件、高性能要求"""try:with open(file_path, 'r', encoding=encoding) as file:with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:# 逐行读取for line in iter(mmapped_file.readline, b""):yield line.decode(encoding).strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except Exception as e:print(f"内存映射错误: {e}")def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法4: 缓冲区读取 - 平衡性能和内存使用内存使用量: O(buffer_size)适用场景: 需要自定义缓冲区大小的场景"""try:with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:for line in file:yield line.strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"编码错误: {e}")def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:"""处理大型CSV文件的示例使用pandas的chunksize参数分块读取"""print(f"开始处理CSV文件: {file_path}")try:# 分块读取CSV文件chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)total_rows = 0for chunk_num, chunk in enumerate(chunk_iter, 1):# 处理当前块print(f"处理第 {chunk_num} 块,包含 {len(chunk)} 行")# 示例处理:统计每列的基本信息print(f"列名: {list(chunk.columns)}")print(f"数据类型: {chunk.dtypes.to_dict()}")# 这里可以添加你的数据处理逻辑# 例如:数据清洗、计算、转换等total_rows += len(chunk)# 可选:限制处理的块数量(用于测试)if chunk_num >= 5: # 只处理前5块breakprint(f"总共处理了 {total_rows} 行数据")except FileNotFoundError:print(f"CSV文件未找到: {file_path}")except pd.errors.EmptyDataError:print("CSV文件为空")except Exception as e:print(f"处理CSV文件时出错: {e}")def process_large_json_lines(file_path: str) -> None:"""处理大型JSON Lines文件 (.jsonl)每行是一个独立的JSON对象"""print(f"开始处理JSON Lines文件: {file_path}")try:with open(file_path, 'r', encoding='utf-8') as file:for line_num, line in enumerate(file, 1):line = line.strip()if not line:continuetry:# 解析JSON对象json_obj = json.loads(line)# 处理JSON对象# 这里添加你的处理逻辑print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")# 示例:提取特定字段if isinstance(json_obj, dict):keys = list(json_obj.keys())[:5] # 只显示前5个键print(f" 键: {keys}")except json.JSONDecodeError as e:print(f"第 {line_num} 行JSON解析错误: {e}")continue# 显示进度if line_num % 1000 == 0:print(f"已处理 {line_num} 行")# 可选:限制处理行数(用于测试)if line_num >= 10000:breakexcept FileNotFoundError:print(f"JSON Lines文件未找到: {file_path}")def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:"""带进度回调的文件处理示例"""reader = LargeFileReader(file_path)file_size_mb = reader.get_file_size_mb()print(f"文件大小: {file_size_mb:.2f} MB")print("开始处理文件...")processed_lines = 0for line in method1_line_by_line(file_path):# 处理每一行# 这里添加你的处理逻辑line_length = len(line)processed_lines += 1# 进度回调if processed_lines % callback_interval == 0:print(f"已处理 {processed_lines:,} 行")# 可选:限制处理行数(用于测试)if processed_lines >= 50000:print("达到处理限制,停止处理")breakprint(f"处理完成,总共处理了 {processed_lines:,} 行")def memory_efficient_word_count(file_path: str) -> dict:"""内存高效的单词计数示例适用于超大文本文件"""word_count = {}print("开始统计单词频率...")for line_num, line in enumerate(method1_line_by_line(file_path), 1):# 简单的单词分割(可以根据需要改进)words = line.lower().split()for word in words:# 清理单词(去除标点符号等)clean_word = ''.join(c for c in word if c.isalnum())if clean_word:word_count[clean_word] = word_count.get(clean_word, 0) + 1# 显示进度if line_num % 10000 == 0:print(f"已处理 {line_num} 行,当前词汇量: {len(word_count)}")print(f"统计完成,总词汇量: {len(word_count)}")# 返回前10个最常用的单词sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)return dict(sorted_words[:10])def main():"""主函数 - 演示各种大文件读取方法"""# 注意:请替换为你的实际文件路径test_file = "large_file.txt" # 替换为实际的大文件路径csv_file = "large_data.csv" # 替换为实际的CSV文件路径json_file = "large_data.jsonl" # 替换为实际的JSON Lines文件路径print("=== 大文件读取示例 ===\n")# 检查文件是否存在if not os.path.exists(test_file):print(f"测试文件 {test_file} 不存在")print("请创建一个测试文件或修改文件路径")return# 创建文件读取器reader = LargeFileReader(test_file)print(f"文件路径: {test_file}")print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")# 示例1: 逐行读取(推荐用于大多数文本文件)print("=== 方法1: 逐行读取 ===")line_count = 0for line in method1_line_by_line(test_file):line_count += 1if line_count <= 5: # 只显示前5行print(f"第{line_count}行: {line[:50]}...")if line_count >= 10000: # 限制处理行数breakprint(f"处理了 {line_count} 行\n")# 示例2: 块读取print("=== 方法2: 块读取 ===")chunk_count = 0for chunk in method2_chunk_reading(test_file, chunk_size=1024):chunk_count += 1if chunk_count <= 3: # 只显示前3块print(f"块{chunk_count}: {len(chunk)} 字符")if chunk_count >= 10: # 限制处理块数breakprint(f"处理了 {chunk_count} 个块\n")# 示例3: 内存映射print("=== 方法3: 内存映射 ===")mmap_count = 0try:for line in method3_mmap_reading(test_file):mmap_count += 1if mmap_count >= 10000: # 限制处理行数breakprint(f"使用内存映射处理了 {mmap_count} 行\n")except Exception as e:print(f"内存映射失败: {e}\n")# 示例4: 带进度的处理print("=== 方法4: 带进度回调的处理 ===")process_with_progress_callback(test_file, callback_interval=5000)print()# 示例5: CSV文件处理if os.path.exists(csv_file):print("=== CSV文件处理 ===")process_large_csv(csv_file, chunk_size=1000)print()# 示例6: JSON Lines文件处理if os.path.exists(json_file):print("=== JSON Lines文件处理 ===")process_large_json_lines(json_file)print()# 示例7: 单词计数print("=== 内存高效单词计数 ===")try:top_words = memory_efficient_word_count(test_file)print("前10个最常用单词:")for word, count in top_words.items():print(f" {word}: {count}")except Exception as e:print(f"单词统计失败: {e}")if __name__ == "__main__":main()# 性能优化建议:
"""
1. 选择合适的方法:- 逐行读取: 适用于大多数文本文件- 块读取: 适用于二进制文件或需要自定义处理块的场景- 内存映射: 适用于需要随机访问或高性能要求的场景- pandas分块: 适用于结构化数据(CSV)2. 内存优化:- 及时释放不需要的变量- 使用生成器而不是列表- 避免一次性加载整个文件到内存3. 性能优化:- 合理设置缓冲区大小- 使用适当的编码- 考虑使用多进程/多线程处理4. 错误处理:- 处理文件不存在的情况- 处理编码错误- 处理磁盘空间不足等IO错误
"""
Excel大文件读取方法
1. pandas分块读取 (.xlsx, .xls)
- 适合中等大小的Excel文件
- 可以处理多个工作表
- 支持数据类型自动识别
2. openpyxl逐行读取 (.xlsx) - 推荐
- 内存效率最高的方法
- 使用
read_only=True
模式 - 真正的逐行处理,内存占用O(1)
- 适合处理超大Excel文件
3. xlrd处理 (.xls)
- 专门处理旧版Excel格式
- 分块读取支持
- 适合Legacy Excel文件
4. pyxlsb处理 (.xlsb)
- 处理Excel二进制格式
- 读取速度快,文件小
- 需要额外安装pyxlsb库
新增功能特点
✅ 智能文件信息获取 - 不加载全部数据就能获取文件结构信息
✅ 内存使用对比 - 实时监控不同方法的内存消耗
✅ 批量数据处理 - 支持批次处理和进度监控
✅ 多格式支持 - 支持.xlsx、.xls、.xlsb三种格式
✅ 错误处理 - 完善的异常处理机制
安装依赖
bash
pip install pandas openpyxl xlrd pyxlsb psutil
使用建议
- 小于50MB: 可以使用pandas直接读取
- 50MB-500MB: 使用pandas分块读取
- 大于500MB: 推荐使用openpyxl逐行读取
- 超大文件: 考虑转换为CSV或Parquet格式
这套方案可以处理几GB甚至更大的Excel文件而不会内存溢出,特别是openpyxl的逐行读取方法,是处理超大Excel文件的最佳选择!
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
大文件读取示例 - 避免内存溢出的多种方法
"""import os
import sys
import mmap
import csv
import json
from typing import Generator, Iterator
import pandas as pdclass LargeFileReader:"""大文件读取工具类"""def __init__(self, file_path: str, encoding: str = 'utf-8'):self.file_path = file_pathself.encoding = encodingdef get_file_size(self) -> int:"""获取文件大小(字节)"""return os.path.getsize(self.file_path)def get_file_size_mb(self) -> float:"""获取文件大小(MB)"""return self.get_file_size() / (1024 * 1024)def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法1: 逐行读取 - 最常用的方法内存使用量: O(1) - 每次只加载一行适用场景: 文本文件、日志文件"""try:with open(file_path, 'r', encoding=encoding) as file:for line_num, line in enumerate(file, 1):# 处理每一行yield line.strip() # 去除行尾换行符# 可选:显示进度if line_num % 10000 == 0:print(f"已处理 {line_num} 行")except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"编码错误: {e}")def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法2: 按块读取 - 适合处理二进制文件或超大文本文件内存使用量: O(chunk_size) - 每次加载指定大小的块适用场景: 二进制文件、超大文本文件"""try:with open(file_path, 'r', encoding=encoding) as file:while True:chunk = file.read(chunk_size)if not chunk:breakyield chunkexcept FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"编码错误: {e}")def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:"""方法3: 内存映射文件 - 高性能读取内存使用量: 虚拟内存映射,物理内存按需加载适用场景: 需要随机访问的大文件、高性能要求"""try:with open(file_path, 'r', encoding=encoding) as file:with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:# 逐行读取for line in iter(mmapped_file.readline, b""):yield line.decode(encoding).strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except Exception as e:print(f"内存映射错误: {e}")def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:"""方法4: 缓冲区读取 - 平衡性能和内存使用内存使用量: O(buffer_size)适用场景: 需要自定义缓冲区大小的场景"""try:with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:for line in file:yield line.strip()except FileNotFoundError:print(f"文件未找到: {file_path}")except UnicodeDecodeError as e:print(f"编码错误: {e}")def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:"""处理大型CSV文件的示例使用pandas的chunksize参数分块读取"""print(f"开始处理CSV文件: {file_path}")try:# 分块读取CSV文件chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)total_rows = 0for chunk_num, chunk in enumerate(chunk_iter, 1):# 处理当前块print(f"处理第 {chunk_num} 块,包含 {len(chunk)} 行")# 示例处理:统计每列的基本信息print(f"列名: {list(chunk.columns)}")print(f"数据类型: {chunk.dtypes.to_dict()}")# 这里可以添加你的数据处理逻辑# 例如:数据清洗、计算、转换等total_rows += len(chunk)# 可选:限制处理的块数量(用于测试)if chunk_num >= 5: # 只处理前5块breakprint(f"总共处理了 {total_rows} 行数据")except FileNotFoundError:print(f"CSV文件未找到: {file_path}")except pd.errors.EmptyDataError:print("CSV文件为空")except Exception as e:print(f"处理CSV文件时出错: {e}")def process_large_json_lines(file_path: str) -> None:"""处理大型JSON Lines文件 (.jsonl)每行是一个独立的JSON对象"""print(f"开始处理JSON Lines文件: {file_path}")try:with open(file_path, 'r', encoding='utf-8') as file:for line_num, line in enumerate(file, 1):line = line.strip()if not line:continuetry:# 解析JSON对象json_obj = json.loads(line)# 处理JSON对象# 这里添加你的处理逻辑print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")# 示例:提取特定字段if isinstance(json_obj, dict):keys = list(json_obj.keys())[:5] # 只显示前5个键print(f" 键: {keys}")except json.JSONDecodeError as e:print(f"第 {line_num} 行JSON解析错误: {e}")continue# 显示进度if line_num % 1000 == 0:print(f"已处理 {line_num} 行")# 可选:限制处理行数(用于测试)if line_num >= 10000:breakexcept FileNotFoundError:print(f"JSON Lines文件未找到: {file_path}")def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:"""带进度回调的文件处理示例"""reader = LargeFileReader(file_path)file_size_mb = reader.get_file_size_mb()print(f"文件大小: {file_size_mb:.2f} MB")print("开始处理文件...")processed_lines = 0for line in method1_line_by_line(file_path):# 处理每一行# 这里添加你的处理逻辑line_length = len(line)processed_lines += 1# 进度回调if processed_lines % callback_interval == 0:print(f"已处理 {processed_lines:,} 行")# 可选:限制处理行数(用于测试)if processed_lines >= 50000:print("达到处理限制,停止处理")breakprint(f"处理完成,总共处理了 {processed_lines:,} 行")def memory_efficient_word_count(file_path: str) -> dict:"""内存高效的单词计数示例适用于超大文本文件"""word_count = {}print("开始统计单词频率...")for line_num, line in enumerate(method1_line_by_line(file_path), 1):# 简单的单词分割(可以根据需要改进)words = line.lower().split()for word in words:# 清理单词(去除标点符号等)clean_word = ''.join(c for c in word if c.isalnum())if clean_word:word_count[clean_word] = word_count.get(clean_word, 0) + 1# 显示进度if line_num % 10000 == 0:print(f"已处理 {line_num} 行,当前词汇量: {len(word_count)}")print(f"统计完成,总词汇量: {len(word_count)}")# 返回前10个最常用的单词sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)return dict(sorted_words[:10])def main():"""主函数 - 演示各种大文件读取方法"""# 注意:请替换为你的实际文件路径test_file = "large_file.txt" # 替换为实际的大文件路径csv_file = "large_data.csv" # 替换为实际的CSV文件路径json_file = "large_data.jsonl" # 替换为实际的JSON Lines文件路径print("=== 大文件读取示例 ===\n")# 检查文件是否存在if not os.path.exists(test_file):print(f"测试文件 {test_file} 不存在")print("请创建一个测试文件或修改文件路径")return# 创建文件读取器reader = LargeFileReader(test_file)print(f"文件路径: {test_file}")print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")# 示例1: 逐行读取(推荐用于大多数文本文件)print("=== 方法1: 逐行读取 ===")line_count = 0for line in method1_line_by_line(test_file):line_count += 1if line_count <= 5: # 只显示前5行print(f"第{line_count}行: {line[:50]}...")if line_count >= 10000: # 限制处理行数breakprint(f"处理了 {line_count} 行\n")# 示例2: 块读取print("=== 方法2: 块读取 ===")chunk_count = 0for chunk in method2_chunk_reading(test_file, chunk_size=1024):chunk_count += 1if chunk_count <= 3: # 只显示前3块print(f"块{chunk_count}: {len(chunk)} 字符")if chunk_count >= 10: # 限制处理块数breakprint(f"处理了 {chunk_count} 个块\n")# 示例3: 内存映射print("=== 方法3: 内存映射 ===")mmap_count = 0try:for line in method3_mmap_reading(test_file):mmap_count += 1if mmap_count >= 10000: # 限制处理行数breakprint(f"使用内存映射处理了 {mmap_count} 行\n")except Exception as e:print(f"内存映射失败: {e}\n")# 示例4: 带进度的处理print("=== 方法4: 带进度回调的处理 ===")process_with_progress_callback(test_file, callback_interval=5000)print()# 示例5: CSV文件处理if os.path.exists(csv_file):print("=== CSV文件处理 ===")process_large_csv(csv_file, chunk_size=1000)print()# 示例6: JSON Lines文件处理if os.path.exists(json_file):print("=== JSON Lines文件处理 ===")process_large_json_lines(json_file)print()# 示例7: 单词计数print("=== 内存高效单词计数 ===")try:top_words = memory_efficient_word_count(test_file)print("前10个最常用单词:")for word, count in top_words.items():print(f" {word}: {count}")except Exception as e:print(f"单词统计失败: {e}")if __name__ == "__main__":main()# 性能优化建议:
"""
Excel文件大文件读取最佳实践:1. 选择合适的库和方法:- openpyxl (read_only=True): 最佳内存效率,支持.xlsx- pandas + chunksize: 易用但内存使用较高- xlrd: 适用于.xls文件- pyxlsb: 适用于.xlsb文件2. Excel特有优化:- 使用read_only=True模式- 设置data_only=True跳过公式计算- 逐行读取而不是一次性加载整个工作表- 按批次处理数据3. 内存优化策略:- 使用生成器和迭代器- 及时释放DataFrame和工作簿对象- 避免同时打开多个工作簿- 考虑将数据转换为更高效的格式(如Parquet)4. 性能建议:- 对于超大Excel文件,考虑先转换为CSV格式- 使用多进程处理多个工作表- 设置合适的批次大小(1000-10000行)- 在SSD上处理文件以提高IO速度5. 文件格式选择:- .xlsx: 现代格式,压缩率高,但读取较慢- .xls: 旧格式,读取快但文件大- .xlsb: 二进制格式,读取快,文件小- .csv: 最快的读取速度,但丢失Excel特性6. 错误处理:- 处理损坏的Excel文件- 处理不同的数据类型和格式- 处理合并单元格- 处理隐藏的行和列安装所需的依赖:
pip install pandas openpyxl xlrd pyxlsb psutil
"""