文章概要
本文详细介绍 Pandas 的进阶主题,包括:
- 自定义函数
- 高级索引
- 数据导出
- 实际应用示例
自定义函数
函数应用
# 基本函数应用
def calculate_bonus(salary, performance):"""计算奖金Args:salary (float): 基本工资performance (float): 绩效分数 (0-1)Returns:float: 奖金金额"""return salary * performance * 0.1# 应用到 DataFrame
df['bonus'] = df.apply(lambda x: calculate_bonus(x['salary'], x['performance']),axis=1
)# 使用 applymap 应用到所有元素
def format_currency(x):"""格式化货币Args:x (float): 金额Returns:str: 格式化后的金额"""return f"${x:,.2f}"df = df.applymap(format_currency)# 使用 transform 保持索引
def normalize_column(x):"""标准化列Args:x (pd.Series): 输入序列Returns:pd.Series: 标准化后的序列"""return (x - x.mean()) / x.std()df['normalized_value'] = df.groupby('category')['value'].transform(normalize_column)
函数映射
# 使用 map 进行值映射
category_map = {'A': 'High','B': 'Medium','C': 'Low'
}df['category_level'] = df['category'].map(category_map)# 使用 replace 进行值替换
df['status'] = df['status'].replace({'active': 1,'inactive': 0,'pending': -1
})# 使用 apply 进行复杂映射
def map_complex_value(row):"""复杂值映射Args:row (pd.Series): 行数据Returns:str: 映射后的值"""if row['age'] < 18:return 'Minor'elif row['age'] < 65:return 'Adult'else:return 'Senior'df['age_group'] = df.apply(map_complex_value, axis=1)
函数优化
# 使用 numba 优化函数
from numba import jit@jit(nopython=True)
def calculate_statistics(values):"""计算统计量Args:values (np.array): 数值数组Returns:tuple: (均值, 标准差)"""mean = 0.0for x in values:mean += xmean /= len(values)std = 0.0for x in values:std += (x - mean) ** 2std = (std / len(values)) ** 0.5return mean, std# 使用向量化操作
def vectorized_calculation(df):"""向量化计算Args:df (pd.DataFrame): 输入数据Returns:pd.DataFrame: 计算结果"""# 使用 numpy 的向量化操作df['result'] = np.where(df['value'] > df['threshold'],df['value'] * 1.1,df['value'] * 0.9)return df# 使用并行处理
from multiprocessing import Pooldef parallel_process(df, func, n_cores=4):"""并行处理Args:df (pd.DataFrame): 输入数据func (function): 处理函数n_cores (int): 核心数Returns:pd.DataFrame: 处理结果"""# 分割数据chunks = np.array_split(df, n_cores)# 创建进程池pool = Pool(n_cores)# 并行处理results = pool.map(func, chunks)# 合并结果return pd.concat(results)
高级索引
多级索引
# 创建多级索引
df = pd.DataFrame({'value': np.random.randn(100),'category': np.random.choice(['A', 'B', 'C'], 100),'subcategory': np.random.choice(['X', 'Y', 'Z'], 100)
})# 设置多级索引
df = df.set_index(['category', 'subcategory'])# 使用多级索引
# 选择特定类别
df.loc['A']# 选择特定子类别
df.loc[('A', 'X')]# 使用 xs 进行交叉选择
df.xs('X', level='subcategory')# 重置索引
df = df.reset_index()# 使用 stack 和 unstack
df_stacked = df.stack()
df_unstacked = df.unstack()
索引操作
# 设置索引
df = df.set_index('date')# 重置索引
df = df.reset_index()# 重命名索引
df.index.name = 'date'
df.index = df.index.rename('date')# 索引排序
df = df.sort_index()# 索引对齐
df1 = pd.DataFrame({'A': [1, 2, 3]}, index=['a', 'b', 'c'])
df2 = pd.DataFrame({'B': [4, 5, 6]}, index=['b', 'c', 'd'])
df_aligned = df1.align(df2, join='outer')# 索引转换
df.index = pd.to_datetime(df.index)
索引优化
# 检查索引是否唯一
is_unique = df.index.is_unique# 检查索引是否单调
is_monotonic = df.index.is_monotonic# 检查索引是否已排序
is_sorted = df.index.is_monotonic_increasing# 优化索引
def optimize_index(df):"""优化索引Args:df (pd.DataFrame): 输入数据Returns:pd.DataFrame: 优化后的数据"""# 检查索引类型if isinstance(df.index, pd.DatetimeIndex):# 确保索引已排序if not df.index.is_monotonic:df = df.sort_index()# 检查索引是否连续if not df.index.is_monotonic_increasing:df = df.reindex(pd.date_range(df.index.min(),df.index.max(),freq='D'))return df# 使用示例
df = pd.DataFrame({'value': np.random.randn(100)
}, index=pd.date_range('2023-01-01', periods=100))# 优化索引
df = optimize_index(df)
数据导出
格式转换
# 导出为 CSV
df.to_csv('output.csv', index=False)# 导出为 Excel
df.to_excel('output.xlsx', sheet_name='Sheet1', index=False)# 导出为 JSON
df.to_json('output.json', orient='records')# 导出为 SQL
from sqlalchemy import create_engine
engine = create_engine('sqlite:///database.db')
df.to_sql('table_name', engine, if_exists='replace')# 导出为 HTML
df.to_html('output.html')# 导出为 Markdown
df.to_markdown('output.md')
数据压缩
# 使用 gzip 压缩
df.to_csv('output.csv.gz', compression='gzip', index=False)# 使用 zip 压缩
df.to_csv('output.csv.zip', compression='zip', index=False)# 使用 bz2 压缩
df.to_csv('output.csv.bz2', compression='bz2', index=False)# 使用 xz 压缩
df.to_csv('output.csv.xz', compression='xz', index=False)# 自定义压缩函数
def compress_data(df, output_file, compression='gzip'):"""压缩数据Args:df (pd.DataFrame): 输入数据output_file (str): 输出文件路径compression (str): 压缩方式"""df.to_csv(output_file, compression=compression, index=False)
批量处理
# 批量导出
def batch_export(df, output_dir, chunk_size=10000):"""批量导出数据Args:df (pd.DataFrame): 输入数据output_dir (str): 输出目录chunk_size (int): 块大小"""# 创建输出目录import osos.makedirs(output_dir, exist_ok=True)# 分块导出for i, chunk in enumerate(np.array_split(df, len(df) // chunk_size + 1)):output_file = os.path.join(output_dir, f'chunk_{i}.csv')chunk.to_csv(output_file, index=False)# 批量转换格式
def batch_convert(input_dir, output_dir, input_format='csv', output_format='excel'):"""批量转换格式Args:input_dir (str): 输入目录output_dir (str): 输出目录input_format (str): 输入格式output_format (str): 输出格式"""# 创建输出目录import osos.makedirs(output_dir, exist_ok=True)# 获取输入文件列表input_files = [f for f in os.listdir(input_dir) if f.endswith(f'.{input_format}')]# 转换每个文件for input_file in input_files:# 读取输入文件input_path = os.path.join(input_dir, input_file)df = pd.read_csv(input_path)# 生成输出文件名output_file = os.path.splitext(input_file)[0] + f'.{output_format}'output_path = os.path.join(output_dir, output_file)# 导出文件if output_format == 'excel':df.to_excel(output_path, index=False)elif output_format == 'json':df.to_json(output_path, orient='records')elif output_format == 'csv':df.to_csv(output_path, index=False)
总结
进阶主题部分涵盖了:
- 自定义函数(函数应用、函数映射、函数优化)
- 高级索引(多级索引、索引操作、索引优化)
- 数据导出(格式转换、数据压缩、批量处理)
- 实际应用示例
掌握这些进阶主题对于提升 Pandas 使用水平至关重要,它可以帮助我们:
- 提高代码效率
- 优化数据处理
- 增强数据导出能力
- 提升代码质量
建议在实际项目中注意:
- 合理使用自定义函数
- 优化索引操作
- 选择合适的导出格式
- 注意数据压缩
- 考虑批量处理
- 保持代码可维护性
- 持续学习新特性