目录
- 背景说明
- ORM与模型优化
- 数据量处理策略
- 接口与报表优化
- 系统架构优化
- 监控与诊断工具
- 项目实战总结(案例)
- 后续优化建议
- 性能优化检查清单
- 总结
一、背景说明
在 Odoo 项目中,随着业务不断扩展,系统常常面临如下挑战:
- 模型数据量爆炸(百万级记录)
- 用户并发量高,接口响应慢
- 报表统计卡顿或失败
- 后台任务执行效率低
本总结文档归纳了我们在 Odoo 实战中应对大型业务系统常见性能问题的优化策略。
二、ORM与模型优化
1. 字段索引优化
- 为常用于搜索、排序、过滤的字段添加索引:
field_name = fields.Char(index=True)
- 使用 PostgreSQL 手动添加联合索引:
CREATE INDEX idx_order_partner_date ON sale_order (partner_id, date_order);
2. 避免 Python 层过滤
- 替换
.filtered()
/.mapped()
逻辑,改为.search(domain)
下推到数据库:records = model.search([('state', '=', 'done')])
3. 使用 with_prefetch 优化权限机制
- 避免 N+1 查询:
records = self.env["model"].with_prefetch().search([...])
三、数据量处理策略
1. 分页加载与游标优化
- 所有列表接口使用 limit/offset 控制分页:
self.env['model'].search(domain, limit=100, offset=0)
- 对于大数据量,使用游标分页避免深度分页性能问题:
# 基于ID的游标分页 last_id = request.params.get('last_id', 0) records = self.env['model'].search([('id', '>', last_id)], limit=100, order='id')
- 使用
search_read()
代替search()
+read()
减少数据库查询:data = self.env['model'].search_read(domain, fields=['name', 'date'], limit=100)
2. 批量操作优化
- 使用
create()
批量创建,避免逐条插入:# 批量创建,一次性创建多条记录 vals_list = [{'name': f'record_{i}'} for i in range(1000)] records = self.env['model'].create(vals_list)
- 批量更新使用
write()
或 SQL 语句:# 批量更新 records = self.env['model'].search([('state', '=', 'draft')]) records.write({'state': 'confirmed'})# 直接SQL更新(更高效) self.env.cr.execute("""UPDATE model_table SET state = 'confirmed' WHERE state = 'draft' AND date < %s """, (cutoff_date,))
- 使用
unlink()
批量删除:# 分批删除避免内存溢出 batch_size = 1000 total_deleted = 0 while True:records = self.env['model'].search([('to_delete', '=', True)], limit=batch_size)if not records:breakrecords.unlink()total_deleted += len(records)self.env.cr.commit() # 分批提交
3. 历史数据归档与生命周期管理
- 将历史记录归档到归档模型或外部数据库:
from odoo import models, fields, api from datetime import timedeltaclass SaleOrderArchive(models.Model):_name = 'sale.order.archive'_description = 'Sale Order Archive'_auto = False # 不自动创建表_table = 'sale_order_archive' # 指向归档表# 归档逻辑 @api.model def archive_old_records(self):cutoff_date = fields.Date.today() - timedelta(days=365)old_orders = self.env['sale.order'].search([('date_order', '<', cutoff_date),('state', 'in', ['done', 'cancel'])])# 复制到归档表for order in old_orders:self.env.cr.execute("""INSERT INTO sale_order_archive SELECT * FROM sale_order WHERE id = %s""", (order.id,))# 删除原记录old_orders.unlink()self.env.cr.commit()
- 定期清理临时数据和日志:
@api.model def clean_old_data(self):# 清理超过30天的错误日志self.env['ir.logging'].search([('create_date', '<', fields.Datetime.now() - timedelta(days=30))]).unlink()# 清理过期的会话数据self.env.cr.execute("DELETE FROM ir_sessions WHERE expiry < NOW()")
4. 数据分区与水平扩展
4.1 PostgreSQL 分区表实现详解
步骤1:创建分区主表
-- 1. 备份原表数据
CREATE TABLE sale_order_backup AS SELECT * FROM sale_order;-- 2. 重建主表为分区表
DROP TABLE IF EXISTS sale_order CASCADE;
CREATE TABLE sale_order (id SERIAL,name VARCHAR,date_order DATE NOT NULL,partner_id INTEGER,state VARCHAR,amount_total NUMERIC,company_id INTEGER,-- 其他字段...PRIMARY KEY (id, date_order) -- 分区键必须包含在主键中
) PARTITION BY RANGE (date_order);-- 3. 重建索引
CREATE INDEX idx_sale_order_partner_date ON sale_order (partner_id, date_order);
CREATE INDEX idx_sale_order_state ON sale_order (state);
步骤2:创建分区子表
-- 创建历史分区(按年)
CREATE TABLE sale_order_2022 PARTITION OF sale_order
FOR VALUES FROM ('2022-01-01') TO ('2023-01-01');CREATE TABLE sale_order_2023 PARTITION OF sale_order
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');-- 创建当前年份的月分区
CREATE TABLE sale_order_2024_01 PARTITION OF sale_order
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');CREATE TABLE sale_order_2024_02 PARTITION OF sale_order
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');CREATE TABLE sale_order_2024_03 PARTITION OF sale_order
FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');-- 继续创建其他月份分区...
CREATE TABLE sale_order_2024_04 PARTITION OF sale_order
FOR VALUES FROM ('2024-04-01') TO ('2024-05-01');CREATE TABLE sale_order_2024_05 PARTITION OF sale_order
FOR VALUES FROM ('2024-05-01') TO ('2024-06-01');CREATE TABLE sale_order_2024_06 PARTITION OF sale_order
FOR VALUES FROM ('2024-06-01') TO ('2024-07-01');CREATE TABLE sale_order_2024_07 PARTITION OF sale_order
FOR VALUES FROM ('2024-07-01') TO ('2024-08-01');CREATE TABLE sale_order_2024_08 PARTITION OF sale_order
FOR VALUES FROM ('2024-08-01') TO ('2024-09-01');CREATE TABLE sale_order_2024_09 PARTITION OF sale_order
FOR VALUES FROM ('2024-09-01') TO ('2024-10-01');CREATE TABLE sale_order_2024_10 PARTITION OF sale_order
FOR VALUES FROM ('2024-10-01') TO ('2024-11-01');CREATE TABLE sale_order_2024_11 PARTITION OF sale_order
FOR VALUES FROM ('2024-11-01') TO ('2024-12-01');CREATE TABLE sale_order_2024_12 PARTITION OF sale_order
FOR VALUES FROM ('2024-12-01') TO ('2025-01-01');-- 创建下年度分区
CREATE TABLE sale_order_2025_01 PARTITION OF sale_order
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
步骤3:数据迁移
-- 将备份数据插入分区表
INSERT INTO sale_order SELECT * FROM sale_order_backup;-- 验证数据完整性
SELECT schemaname,tablename,pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,(SELECT COUNT(*) FROM sale_order WHERE date_order >= (SELECT MIN(date_order) FROM sale_order_backup) ANDdate_order < (SELECT MIN(date_order) FROM sale_order_backup) + interval '1 month') as record_count
FROM pg_tables
WHERE tablename LIKE 'sale_order_2024%'
ORDER BY tablename;
4.2 Odoo 中的分区表配置
方法1:使用视图模型(推荐)
from odoo import models, fields, toolsclass SaleOrderPartitioned(models.Model):_name = 'sale.order'_description = 'Sale Order (Partitioned)'_table = 'sale_order' # 使用分区主表_order = 'date_order desc, id desc'# 字段定义保持不变name = fields.Char('Order Reference', required=True)date_order = fields.Datetime('Order Date', required=True)partner_id = fields.Many2one('res.partner', 'Customer')state = fields.Selection([('draft', 'Quotation'),('sent', 'Quotation Sent'),('sale', 'Sales Order'),('done', 'Locked'),('cancel', 'Cancelled'),], default='draft')# 优化查询方法@api.modeldef search(self, args, offset=0, limit=None, order=None, count=False):# 如果查询条件包含日期范围,PostgreSQL会自动进行分区裁剪return super().search(args, offset=offset, limit=limit, order=order, count=count)
方法2:月度分区模型
class SaleOrderMonthly(models.Model):_name = 'sale.order.monthly'_description = 'Sale Order Monthly View'_auto = False@api.modeldef get_monthly_data(self, year, month):"""获取指定月份的订单数据"""table_name = f'sale_order_{year}_{month:02d}'# 检查分区表是否存在self.env.cr.execute("""SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = %s)""", (table_name,))if not self.env.cr.fetchone()[0]:return []# 直接查询分区表self.env.cr.execute(f"""SELECT id, name, date_order, partner_id, amount_total, stateFROM {table_name}WHERE date_order >= %s AND date_order < %sORDER BY date_order DESC""", (f'{year}-{month:02d}-01',f'{year}-{month+1:02d}-01' if month < 12 else f'{year+1}-01-01'))return self.env.cr.dictfetchall()
4.3 自动分区管理
创建分区管理函数
-- 创建自动分区函数
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name TEXT, start_date DATE)
RETURNS VOID AS $$
DECLAREpartition_name TEXT;end_date DATE;
BEGIN-- 生成分区表名partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');end_date := start_date + interval '1 month';-- 检查分区是否已存在IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = partition_name) THEN-- 创建分区EXECUTE format('CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',partition_name, table_name, start_date, end_date);-- 创建分区索引EXECUTE format('CREATE INDEX %I ON %I (partner_id, date_order)','idx_' || partition_name || '_partner_date', partition_name);RAISE NOTICE 'Created partition: %', partition_name;END IF;
END;
$$ LANGUAGE plpgsql;-- 批量创建未来12个月的分区
DO $$
DECLAREi INTEGER;start_date DATE;
BEGINstart_date := date_trunc('month', CURRENT_DATE);FOR i IN 0..11 LOOPPERFORM create_monthly_partition('sale_order', start_date + (i || ' months')::interval);END LOOP;
END;
$$;
Odoo 定时任务创建分区
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import logging_logger = logging.getLogger(__name__)class PartitionManager(models.Model):_name = 'partition.manager'_description = 'Database Partition Manager'@api.modeldef create_future_partitions(self):"""创建未来3个月的分区"""current_date = datetime.now().date()for i in range(3):future_date = current_date + relativedelta(months=i+1)partition_name = f"sale_order_{future_date.strftime('%Y_%m')}"# 检查分区是否存在self.env.cr.execute("""SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = %s)""", (partition_name,))if not self.env.cr.fetchone()[0]:# 创建分区start_date = future_date.replace(day=1)end_date = start_date + relativedelta(months=1)self.env.cr.execute(f"""CREATE TABLE {partition_name} PARTITION OF sale_orderFOR VALUES FROM ('{start_date}') TO ('{end_date}')""")# 创建索引self.env.cr.execute(f"""CREATE INDEX idx_{partition_name}_partner_date ON {partition_name} (partner_id, date_order)""")_logger.info(f"Created partition: {partition_name}")@api.modeldef cleanup_old_partitions(self):"""清理旧分区(保留2年数据)"""cutoff_date = datetime.now().date() - relativedelta(years=2)cutoff_month = cutoff_date.strftime('%Y_%m')# 查找需要清理的分区self.env.cr.execute("""SELECT tablename FROM pg_tables WHERE tablename LIKE 'sale_order_____%%' AND tablename < 'sale_order_%s'""", (cutoff_month,))old_partitions = [row[0] for row in self.env.cr.fetchall()]for partition in old_partitions:# 备份到归档表archive_table = f"{partition}_archive"self.env.cr.execute(f"""CREATE TABLE {archive_table} AS SELECT * FROM {partition}""")# 删除分区self.env.cr.execute(f"DROP TABLE {partition}")_logger.info(f"Archived and dropped partition: {partition}")
4.4 查询优化实践
分区裁剪查询示例
class SaleOrderOptimized(models.Model):_inherit = 'sale.order'@api.modeldef get_monthly_statistics(self, year, month):"""获取月度统计(自动分区裁剪)"""start_date = f'{year}-{month:02d}-01'end_date = f'{year}-{month+1:02d}-01' if month < 12 else f'{year+1}-01-01'# PostgreSQL会自动选择正确的分区self.env.cr.execute("""SELECT state,COUNT(*) as order_count,SUM(amount_total) as total_amount,AVG(amount_total) as avg_amountFROM sale_order WHERE date_order >= %s AND date_order < %sGROUP BY state""", (start_date, end_date))return self.env.cr.dictfetchall()@api.modeldef get_customer_orders_range(self, partner_id, date_from, date_to):"""获取客户订单范围查询(跨分区优化)"""# 这个查询会自动跨多个分区return self.search([('partner_id', '=', partner_id),('date_order', '>=', date_from),('date_order', '<=', date_to)], order='date_order desc')
性能监控查询
-- 查看分区裁剪效果
EXPLAIN (ANALYZE, BUFFERS)
SELECT COUNT(*) FROM sale_order
WHERE date_order >= '2024-01-01' AND date_order < '2024-02-01';-- 查看分区大小
SELECT schemaname,tablename,pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size,pg_total_relation_size(schemaname||'.'||tablename) as bytes
FROM pg_tables
WHERE tablename LIKE 'sale_order_%'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;-- 查看查询计划中的分区使用情况
SELECT query,calls,total_exec_time,mean_exec_time,rows
FROM pg_stat_statements
WHERE query LIKE '%sale_order%'
ORDER BY total_exec_time DESC;
4.5 性能提升原理
为什么分区能提升10倍性能?
-
分区裁剪(Partition Pruning):
- 查询时只扫描相关分区,减少IO
- 例:查询1月数据时,只扫描
sale_order_2024_01
表
-
并行查询:
- 跨分区查询可以并行执行
- 每个分区独立处理,提高CPU利用率
-
索引优化:
- 每个分区有独立的索引,索引更小更快
- 减少索引碎片和维护开销
-
内存效率:
- 查询缓存更有效,热数据留在内存中
- 减少不必要的数据页面加载
实际测试对比:
-- 测试:查询单月订单统计
-- 分区前(500万记录)
EXPLAIN ANALYZE SELECT COUNT(*), SUM(amount_total)
FROM sale_order_old
WHERE date_order >= '2024-01-01' AND date_order < '2024-02-01';
-- 执行时间: 8.2 秒,扫描全表-- 分区后(同样数据量)
EXPLAIN ANALYZE SELECT COUNT(*), SUM(amount_total)
FROM sale_order
WHERE date_order >= '2024-01-01' AND date_order < '2024-02-01';
-- 执行时间: 0.8 秒,只扫描1月分区
这就是为什么按月分区能够实现10倍性能提升的原因!
5. 数据库连接池与事务优化
- 配置数据库连接池参数:
# odoo.conf db_maxconn = 64 db_template = template0# PostgreSQL配置 max_connections = 200 shared_buffers = 256MB effective_cache_size = 1GB
- 长事务拆分,避免锁表:
def process_large_dataset(self):batch_size = 1000processed = 0while True:# 使用新的环境和cursorwith self.env.registry.cursor() as cr:env = self.env(cr=cr)records = env['model'].search([('processed', '=', False)], limit=batch_size)if not records:breakfor record in records:# 处理单条记录record.process()processed += 1if processed % 100 == 0:cr.commit() # 定期提交
6. 内存管理与缓存策略
- 使用
invalidate_cache()
清理ORM缓存:def process_large_batch(self):for i in range(0, total_records, batch_size):batch = records[i:i+batch_size]# 处理批次batch.process()# 清理缓存避免内存泄漏self.env.invalidate_all()# 强制垃圾回收import gcgc.collect()
- 使用
with_context()
优化权限检查:# 跳过权限检查提升性能 records = self.env['model'].with_context(active_test=False).search(domain)# 批量操作时跳过触发器 records = self.env['model'].with_context(skip_triggers=True).create(vals_list)
7. 数据导入导出优化
- 大文件导入使用流式处理:
def import_large_csv(self, file_path):import pandas as pd# 分块读取大文件chunk_size = 10000for chunk in pd.read_csv(file_path, chunksize=chunk_size):vals_list = []for _, row in chunk.iterrows():vals_list.append({'name': row['name'],'value': row['value']})# 批量创建self.env['model'].create(vals_list)self.env.cr.commit()
- 数据导出使用游标和分页:
def export_large_data(self):import csvwith open('export.csv', 'w', newline='') as csvfile:writer = csv.writer(csvfile)writer.writerow(['ID', 'Name', 'Date']) # 写入表头offset = 0batch_size = 10000while True:records = self.env['model'].search([], limit=batch_size, offset=offset)if not records:breakfor record in records:writer.writerow([record.id, record.name, record.date])offset += batch_size# 清理缓存self.env.invalidate_all()
8. 并发控制与锁机制
- 使用数据库级别的锁避免并发冲突:
def process_with_lock(self, record_id):# 使用行级锁self.env.cr.execute("""SELECT id FROM model_table WHERE id = %s FOR UPDATE NOWAIT""", (record_id,))try:record = self.env['model'].browse(record_id)record.process()self.env.cr.commit()except Exception:self.env.cr.rollback()raise
- 使用Redis分布式锁:
import redisdef process_with_redis_lock(self, key):redis_client = redis.Redis(host='localhost', port=6379, db=0)lock_key = f"lock:{key}"# 获取分布式锁if redis_client.set(lock_key, "locked", nx=True, ex=300):try:# 执行业务逻辑self.process_business_logic()finally:# 释放锁redis_client.delete(lock_key)else:raise UserError("资源正在处理中,请稍后再试")
9. 数据压缩与存储优化
- 使用PostgreSQL的TOAST机制存储大字段:
# 大文本字段使用压缩存储 large_text = fields.Text(compress=True)
- 文件存储使用对象存储服务:
def store_large_file(self, file_data):# 使用阿里云OSS或AWS S3存储大文件import oss2bucket = oss2.Bucket(auth, endpoint, bucket_name)file_key = f"files/{self.id}/{filename}"# 上传文件bucket.put_object(file_key, file_data)# 只在数据库中存储文件路径self.file_url = f"https://{bucket_name}.{endpoint}/{file_key}"
10. 监控与预警机制
- 监控数据库表大小和增长趋势:
-- 查看表大小 SELECT schemaname,tablename,pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
- 设置数据量预警:
@api.model def check_data_volume(self):# 检查关键表的数据量large_tables = ['sale.order','account.move','stock.move']for table in large_tables:count = self.env[table].search_count([])if count > 1000000: # 超过100万条记录# 发送预警邮件self.send_alert_email(f"表 {table} 数据量过大: {count} 条记录")
11. 数据质量与清理策略
- 定期清理重复数据:
def clean_duplicate_records(self):# 查找重复记录self.env.cr.execute("""SELECT name, email, array_agg(id) as ids, count(*)FROM res_partner WHERE email IS NOT NULLGROUP BY name, emailHAVING count(*) > 1""")duplicates = self.env.cr.dictfetchall()for dup in duplicates:ids = dup['ids']# 保留第一条,删除其他self.env['res.partner'].browse(ids[1:]).unlink()
- 数据一致性检查:
@api.model def check_data_consistency(self):# 检查订单明细总额与订单总额是否一致inconsistent_orders = self.env['sale.order'].search([('amount_total', '!=', 0)]).filtered(lambda o: abs(o.amount_total - sum(o.order_line.mapped('price_subtotal'))) > 0.01)if inconsistent_orders:# 记录日志或发送通知import logging_logger = logging.getLogger(__name__)_logger.warning(f"发现 {len(inconsistent_orders)} 个金额不一致的订单")
四、接口与报表优化
1. Redis 缓存热点数据
- 使用
ir.cache
或集成 Redis 缓存频繁访问的统计数据,如看板、图表等。
2. 异步任务处理
- 导入导出、批量操作、报表生成等使用 Odoo
ir.cron
、Celery + Redis 或外部服务异步执行。
3. 大报表处理策略
- 拆分子模块单独处理数据,使用 Pandas / SQL 脚本分析后再写入 Odoo。
- 可结合 QWeb 报表批量输出 PDF / Excel 文件。
五、系统架构优化
1. 多 Worker 支持并发
- 启动参数设置:
--workers=8 --max-cron-threads=2
- workers 数量建议为:
(CPU核心数 * 2) + 1
2. 数据库读写分离
- 使用 PostgreSQL 主从架构,结合 Nginx / 自定义 DB Router 读写分离。
3. 服务拆分与微服务
- 将 Odoo 作为主业务系统,重计算部分拆出为 FastAPI / Flask 微服务。
- 使用 RabbitMQ/Kafka 做数据桥接。
六、监控与诊断工具
1. SQL 分析
- 使用
EXPLAIN ANALYZE
、pg_stat_statements 插件查看慢 SQL。
2. 性能监控
- 配置 Prometheus + Grafana 监控数据库连接数、IO、接口耗时。
- 使用 Odoo
logger.setLevel(logging.DEBUG)
捕获 ORM 行为。
3. 压力测试
- 使用
locust
,wrk
,ab
工具做接口并发测试,评估系统负载能力。
七、项目实战总结(案例)
案例一:电商订单系统优化
某电商客户订单系统月处理订单超 1000 万条,用户并发上百。
挑战:
- 订单表 5000 万条记录,单表查询超时
- 用户下单高峰期系统响应缓慢
- 财务月结报表生成耗时 8+ 小时
解决方案:
- 数据分区:按月对订单表进行分区,查询性能提升 10 倍
- 批量优化:订单导入改为批量创建,从 1000 条/分钟提升到 50000 条/分钟
- 异步处理:报表生成改为 Celery 异步 + Redis 缓存,生成时间降至 30 分钟
- 历史归档:3 年前订单数据归档到历史表,主表数据量减少 60%
效果:
- 接口响应时间从 3-5 秒降至 200-500ms
- 系统支持并发用户数从 100 提升到 500+
- 数据库查询性能整体提升 5-8 倍
案例二:制造业ERP系统大数据处理
某制造企业 ERP 系统,库存移动记录日增长 100 万条。
挑战:
- 库存移动表 2 亿条记录,库存计算严重超时
- 生产计划报表生成失败,数据量过大
- 系统内存占用过高,频繁崩溃
解决方案:
- 数据生命周期管理:历史库存移动按年归档,保留 2 年热数据
- 内存管理优化:大批量处理增加缓存清理和垃圾回收
- 并发控制:关键业务操作增加分布式锁,避免数据冲突
- 数据压缩:大文本字段启用压缩存储,减少 40% 存储空间
效果:
- 库存计算时间从 2 小时缩短到 15 分钟
- 系统内存使用率降低 50%
- 数据一致性问题减少 95%
案例三:金融交易系统优化
某金融公司交易系统,日处理交易记录 500 万条。
挑战:
- 交易记录实时同步延迟高
- 风控计算涉及大量历史数据查询
- 监管报表生成需要全量数据分析
解决方案:
- 流式处理:大文件导入改为流式分块处理
- 预计算缓存:风控指标预计算并缓存到 Redis
- 数据质量监控:建立自动化数据质量检查和修复机制
- 读写分离:报表查询使用只读副本,减少主库压力
效果:
- 数据同步延迟从 30 分钟降至 5 分钟
- 风控计算响应时间提升 20 倍
- 监管报表生成时间从 6 小时降至 1 小时
案例四:大型采购管理系统优化
某大型集团采购管理系统,管理数百万级采购记录,涉及全球多个子公司。
挑战:
- 采购记录表超过 800 万条数据,查询响应时间 8+ 秒
- 采购统计报表生成耗时超过 30 分钟,用户体验极差
- 高峰期(月末结算)系统经常超时崩溃
- 采购数据导出功能因数据量过大频繁失败
- 多维度统计查询(按供应商、类别、时间等)性能极差
详细解决方案:
-
关键字段索引优化:
-- 采购订单核心索引 CREATE INDEX idx_purchase_order_supplier_date ON purchase_order (partner_id, date_order); CREATE INDEX idx_purchase_order_state_company ON purchase_order (state, company_id); CREATE INDEX idx_purchase_order_category_date ON purchase_order (category_id, date_order);-- 采购明细索引 CREATE INDEX idx_purchase_line_product_date ON purchase_order_line (product_id, date_planned); CREATE INDEX idx_purchase_line_amount ON purchase_order_line (price_subtotal) WHERE price_subtotal > 0;
-
异步报表生成系统:
from odoo import api, models, fields import json from datetime import datetime, timedeltaclass PurchaseReportGenerator(models.Model):_name = 'purchase.report.generator'_description = '采购报表异步生成器'@api.modeldef generate_monthly_report_async(self, month, year):"""异步生成月度采购报表"""# 创建异步任务task = self.env['ir.cron'].create({'name': f'采购月报生成_{year}_{month}','model_id': self.env.ref('purchase.model_purchase_report_generator').id,'state': 'code','code': f'model.execute_monthly_report({month}, {year})','interval_number': 1,'interval_type': 'minutes','numbercall': 1,'active': True,})return {'type': 'ir.actions.client','tag': 'display_notification','params': {'message': f'月度报表生成任务已启动,任务ID: {task.id}','type': 'success',}}def execute_monthly_report(self, month, year):"""执行月度报表生成"""start_date = datetime(year, month, 1)end_date = (start_date + timedelta(days=32)).replace(day=1) - timedelta(days=1)# 分批查询数据避免内存溢出batch_size = 10000offset = 0report_data = []while True:orders = self.env['purchase.order'].search([('date_order', '>=', start_date),('date_order', '<=', end_date),('state', 'in', ['purchase', 'done'])], limit=batch_size, offset=offset)if not orders:break# 处理批次数据batch_data = self._process_batch_data(orders)report_data.extend(batch_data)offset += batch_size# 清理缓存self.env.invalidate_all()# 保存报表到文件或数据库self._save_report_data(report_data, month, year)
-
Redis 缓存统计数据:
import redis import json from datetime import datetime, timedeltaclass PurchaseStatistics(models.Model):_name = 'purchase.statistics'_description = '采购统计缓存'def _get_redis_client(self):"""获取Redis客户端"""return redis.Redis(host='localhost', port=6379, db=0)@propertydef cache_timeout(self):return 3600 # 1小时缓存@api.modeldef get_supplier_statistics(self, date_from, date_to):"""获取供应商统计数据(带缓存)"""redis_client = self._get_redis_client()cache_key = f"supplier_stats:{date_from}:{date_to}"# 尝试从缓存获取cached_data = redis_client.get(cache_key)if cached_data:return json.loads(cached_data)# 缓存不存在,查询数据库stats = self._calculate_supplier_statistics(date_from, date_to)# 存入缓存redis_client.setex(cache_key, self.cache_timeout, json.dumps(stats, default=str))return statsdef _calculate_supplier_statistics(self, date_from, date_to):"""计算供应商统计数据"""self.env.cr.execute("""SELECT rp.name as supplier_name,COUNT(po.id) as order_count,SUM(po.amount_total) as total_amount,AVG(po.amount_total) as avg_amountFROM purchase_order poJOIN res_partner rp ON po.partner_id = rp.idWHERE po.date_order >= %s AND po.date_order <= %sAND po.state IN ('purchase', 'done')GROUP BY rp.id, rp.nameORDER BY total_amount DESCLIMIT 100""", (date_from, date_to))return self.env.cr.dictfetchall()
-
分页导出优化:
import csv import io import zipfile from odoo import http from odoo.http import request, Responseclass PurchaseExportController(http.Controller):@http.route('/purchase/export/large', type='http', auth='user', methods=['POST'])def export_large_data(self, **kwargs):"""大数据量分页导出"""domain = json.loads(kwargs.get('domain', '[]'))# 创建临时zip文件zip_buffer = io.BytesIO()with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:batch_size = 50000offset = 0file_count = 1while True:# 分批查询orders = request.env['purchase.order'].search(domain, limit=batch_size, offset=offset,order='date_order desc')if not orders:break# 创建CSV文件csv_buffer = io.StringIO()writer = csv.writer(csv_buffer)# 写入表头if offset == 0:writer.writerow(['订单号', '供应商', '订单日期', '状态', '总金额', '币种'])# 写入数据for order in orders:writer.writerow([order.name,order.partner_id.name,order.date_order.strftime('%Y-%m-%d'),dict(order._fields['state'].selection)[order.state],order.amount_total,order.currency_id.name])# 添加到zip文件zip_file.writestr(f'采购订单_{file_count:03d}.csv',csv_buffer.getvalue().encode('utf-8-sig'))offset += batch_sizefile_count += 1# 清理缓存request.env.invalidate_all()# 返回zip文件zip_buffer.seek(0)return Response(zip_buffer.read(),headers=[('Content-Type', 'application/zip'),('Content-Disposition', 'attachment; filename="采购数据导出.zip"')])
-
多维度查询优化:
from odoo import models, toolsclass PurchaseAnalysis(models.Model):_name = 'purchase.analysis'_description = '采购分析'_auto = False# 创建物化视图提升查询性能def init(self):tools.drop_view_if_exists(self.env.cr, self._table)self.env.cr.execute("""CREATE OR REPLACE VIEW %s AS (SELECT row_number() OVER () AS id,po.id as order_id,po.name as order_name,po.date_order,extract(year from po.date_order) as year,extract(month from po.date_order) as month,po.partner_id,rp.name as partner_name,po.company_id,po.state,po.amount_total,po.currency_id,COUNT(pol.id) as line_count,AVG(pol.price_unit) as avg_priceFROM purchase_order poLEFT JOIN purchase_order_line pol ON po.id = pol.order_idLEFT JOIN res_partner rp ON po.partner_id = rp.idWHERE po.state IN ('purchase', 'done')GROUP BY po.id, rp.name)""" % self._table)
优化效果:
- 查询响应时间:从 8+ 秒优化到 800ms 以内
- 报表生成时间:从 30+ 分钟缩短到 3-5 分钟
- 数据导出成功率:从 60% 提升到 99%+
- 系统稳定性:月末高峰期零崩溃,支持 200+ 并发用户
- 用户满意度:显著提升,投诉率降低 90%
- 服务器资源使用:CPU 使用率降低 40%,内存使用更加稳定
关键技术指标:
- 处理数据量:800+ 万采购记录
- 并发用户数:200+
- 查询响应时间:< 1 秒
- 报表生成时间:< 5 分钟
- 数据导出速度:50,000 条/分钟
- 系统可用性:99.9%+
八、后续优化建议
- 增加模型数据生命周期管理模块,定期清理无效数据。
- 自定义 ORM 缓存中间层,降低重复查询。
- 引入 ClickHouse / Elasticsearch 做大数据分析。
九、性能优化检查清单
🔍 数据库层面
- 关键字段已添加索引
- 复合查询已创建联合索引
- 大表已实施分区策略
- 慢查询已识别并优化
- 数据库连接池已优化配置
🔧 代码层面
- 避免使用
.filtered()
和.mapped()
进行数据过滤 - 批量操作使用
create()
、write()
、unlink()
- 大数据查询使用
search_read()
- 分页查询使用游标而非深度分页
- 长事务已拆分为小批次处理
🗄️ 数据管理
- 历史数据已设置归档策略
- 临时数据有定期清理机制
- 重复数据有检测和清理
- 数据一致性有监控机制
🚀 性能监控
- 关键接口响应时间 < 2 秒
- 数据库查询平均耗时 < 500ms
- 系统内存使用率 < 80%
- 数据库连接数在合理范围
- 定期生成性能报告
📊 系统架构
- Worker 数量根据 CPU 核心数配置
- 读写分离已实施(如需要)
- 缓存策略已部署
- 异步任务处理已配置
- 监控告警系统已建立
📌 总结
Odoo 是一个强大的框架,但在面对高并发、高数据量业务时,必须结合数据库优化、异步架构、缓存机制和运维监控手段进行系统性的优化。
本指南提供了从基础优化到高级架构的完整解决方案,包含:
- 11个核心数据处理策略
- 4个完整实战案例
- 详细的分区表实施指南
- 性能优化检查清单
通过系统性地应用这些优化策略,您的 Odoo 项目能够:
- 支持百万级数据量
- 处理200+并发用户
- 实现秒级响应时间
- 保持99.9%+系统可用性】