EF Core Bulk Writes and "Soft Real-Time" Consistency: Trade-offs Among ExecuteUpdate / COPY / SqlBulkCopy, and Transaction Considerations ✨
📚 Table of Contents
- EF Core Bulk Writes and "Soft Real-Time" Consistency: Trade-offs Among ExecuteUpdate / COPY / SqlBulkCopy, and Transaction Considerations ✨
- 1. Terminology and Goals 🧭
- 2. Technology Selection Overview 🧰
- 3. Modeling and Hotspot Isolation 🌋
- 4. The Write Channel: Backpressure + Micro-Batching 🛣️
- 5. Three Bulk Write Approaches and Their Engineering Details 🔧
- 5.1 Set-Based Update/Delete: `ExecuteUpdate / ExecuteDelete` ✍️
- 5.2 PostgreSQL: `COPY BINARY` (Npgsql) 🐘🚀
- 5.3 SQL Server: `SqlBulkCopy` (regular + streaming) 🧱⚙️
- 6. Transactions and Isolation Levels 🧪
- 7. Consistency Strategies (a Business-Facing Choice) ⚖️
- 8. Read/Write Consistency and Caching 🧩
- 9. Observability and Alerting (Minimum Set) 📈🔔
- 10. Experiments and Benchmarks 🧪🧪
- 10.1 Microbenchmarks (BenchmarkDotNet)
- 10.2 Load Testing (k6): **uuidv4()** ✅
- 11. Failure and Rollback Strategies 🛡️
- 12. Code and Configuration Listings 🧱
- 12.1 Model
- 12.2 DbContext (shared between PG and SQL Server)
- 12.3 Bulk Implementations (Core Methods)
- 12.4 Web API Ingest Endpoint (Rate Limiting / Payload Limits / Validation)
- 12.5 appsettings (Key Parameters)
- 13. Decision Tree 🌳
TL;DR
- Insert-heavy workloads: use COPY (Binary) on PostgreSQL and SqlBulkCopy on SQL Server.
- Update/delete-heavy workloads: prefer ExecuteUpdate/ExecuteDelete (set-based DML, no entity loading).
- Upsert: PG uses `INSERT ... ON CONFLICT DO UPDATE`; SQL Server uses `MERGE` (small batches plus a unique constraint, and handle with caution).
- Consistency: for high throughput, prefer a bounded channel + micro-batches, flushed on dual thresholds (time window + row count); the read side promises a soft real-time SLO and supports backfill.

Key facts:
- `COPY FROM` fires table triggers and check constraints (but not rules); `timestamptz` accepts UTC only.
- `SqlBulkCopy` by default fires no triggers and checks no constraints; enable both explicitly via options. Switch to streaming for very large batches.
1. Terminology and Goals 🧭
- Bulk write: submit many rows of DML in one round trip, maximizing throughput and amortizing round-trip/log/WAL overhead.
- Soft real-time consistency: the read model may lag by tens of milliseconds to a few seconds, but with an observable upper bound (SLO) and backfill (idempotent compensation/replay).
- Goals: maximize write throughput (without overwhelming the database), keep read latency bounded, make failures recoverable, and keep the whole pipeline observable.
2. Technology Selection Overview 🧰

| Scenario | First choice | Alternative | Key points |
|---|---|---|---|
| Bulk insert (PG) | COPY (Binary) | Multi-row INSERT / ON CONFLICT | COPY has the best throughput; COPY FROM fires table triggers and check constraints (not rules). |
| Bulk insert (SQL Server) | SqlBulkCopy | Multi-row INSERT / MERGE | By default no triggers or constraint checks; enable `FireTriggers` / `CheckConstraints` as needed; `TableLock` is available. |
| Bulk update/delete | ExecuteUpdate/ExecuteDelete | MERGE / hand-written SQL | Set-based DML: update/delete in a single statement without loading entities. |
| Upsert | PG ON CONFLICT DO UPDATE | SQL Server MERGE | Requires a unique constraint/idempotency key; MERGE has a long history of defects, so keep batches small and test thoroughly before production. |
| Read-your-writes | Small synchronous commits + version watermark | — | Throughput-limited; suited to strongly consistent UIs. |
| Soft real-time (recommended) | Bounded channel + micro-batch flush | Outbox/events → read store | Bounded catch-up time (SLO); expose refresh/notification plus a watermark. |
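The Upsert row in the table above maps to the following statements. This is a sketch assuming the `metrics` table and `(tenant_id, id)` key from the listings in section 12; the parameter placeholders are illustrative. The `HOLDLOCK` hint on `MERGE` guards against the well-known race between concurrent merges on the same key.

```sql
-- PostgreSQL: upsert keyed on the (tenant_id, id) unique constraint
INSERT INTO metrics (tenant_id, id, ts, value)
VALUES ($1, $2, $3, $4)
ON CONFLICT (tenant_id, id)
DO UPDATE SET ts = EXCLUDED.ts, value = EXCLUDED.value;

-- SQL Server: MERGE equivalent; keep batches small, and add HOLDLOCK
-- so two concurrent merges cannot both miss the match and both insert
MERGE dbo.metrics WITH (HOLDLOCK) AS t
USING (VALUES (@tenantId, @id, @ts, @value)) AS s (tenant_id, id, ts, value)
    ON t.tenant_id = s.tenant_id AND t.id = s.id
WHEN MATCHED THEN
    UPDATE SET t.ts = s.ts, t.value = s.value
WHEN NOT MATCHED THEN
    INSERT (tenant_id, id, ts, value)
    VALUES (s.tenant_id, s.id, s.ts, s.value);
```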
Data path (layering + batch trigger conditions) — diagram not reproduced here.
3. Modeling and Hotspot Isolation 🌋
- Idempotency key: e.g. a unique constraint on `(tenant_id, business_id)` to support upserts and replay; upsert logging should record affected row counts and conflicting keys for auditing.
- Partitioning/sharding: spread write hotspots across partitions/indexes. SQL Server offers partitioned tables and a suitable `FILLFACTOR`; PostgreSQL offers declarative partitioning.
- Index strategy: before a large import, consider disabling or dropping secondary nonclustered indexes and rebuilding afterwards, or set a lower `FILLFACTOR` to reduce page splits.
- Job/queue tables (PG): `FOR UPDATE SKIP LOCKED` lets multiple consumers pull work without blocking each other, a good fit for a soft real-time pipeline.
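The queue-table pattern can be sketched as follows, against a hypothetical `jobs` table (the table and its `status`, `created_at`, `claimed_at` columns are illustrative, not part of this article's schema):

```sql
-- Each consumer claims up to 100 pending jobs; rows locked by a peer
-- are skipped rather than waited on, so consumers never block each other.
WITH claimed AS (
    SELECT id
    FROM jobs
    WHERE status = 'pending'
    ORDER BY created_at
    LIMIT 100
    FOR UPDATE SKIP LOCKED
)
UPDATE jobs j
SET status = 'processing', claimed_at = now()
FROM claimed c
WHERE j.id = c.id
RETURNING j.id;
```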
4. The Write Channel: Backpressure + Micro-Batching 🛣️
Key ingredients: a bounded channel (full → wait/drop/degrade), event-driven consumption (`WaitToReadAsync`), dual thresholds (row count / time), error containment with a dead-letter sink, and idempotent replay.
```csharp
// Program.cs
builder.Services.AddHostedService<BulkWriter>();
builder.Services.AddSingleton(Channel.CreateBounded<WriteItem>(
    new BoundedChannelOptions(50_000)
    {
        FullMode = BoundedChannelFullMode.Wait, // or DropOldest/DropNewest/DropWrite, as the workload allows
        SingleReader = true,
        SingleWriter = false
    }));

// Optional: DbContext pooling to reduce allocation overhead
builder.Services.AddDbContextPool<AppDbContext>(o => { /* options */ });

public sealed class BulkWriter : BackgroundService
{
    private readonly Channel<WriteItem> _ch;
    private readonly IServiceProvider _sp;
    private const int MaxBatch = 5000;
    private static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(100);

    public BulkWriter(Channel<WriteItem> ch, IServiceProvider sp) { _ch = ch; _sp = sp; }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var buffer = new List<WriteItem>(MaxBatch);
        var lastFlush = System.Diagnostics.Stopwatch.StartNew();
        while (await _ch.Reader.WaitToReadAsync(ct))
        {
            while (_ch.Reader.TryRead(out var item))
            {
                buffer.Add(item);
                if (buffer.Count >= MaxBatch) break;
            }
            // Dual threshold: flush on batch size OR elapsed time.
            // Note: the time threshold is only re-evaluated when new items
            // arrive; a fully idle channel will not flush a partial buffer.
            if (buffer.Count >= MaxBatch || lastFlush.Elapsed >= MaxWait)
            {
                await FlushSafeAsync(buffer, ct);
                buffer.Clear();
                lastFlush.Restart();
            }
        }
        // Drain the remainder once the channel completes.
        if (buffer.Count > 0) await FlushSafeAsync(buffer, ct);
    }

    private async Task FlushSafeAsync(List<WriteItem> batch, CancellationToken ct)
    {
        if (batch.Count == 0) return;
        try { await FlushAsync(batch, ct); }
        catch (Exception ex)
        {
            // TODO: metrics/logging
            await DeadLetterSink.WriteAsync(batch, ex, ct); // -> dead letters
        }
    }

    private async Task FlushAsync(List<WriteItem> batch, CancellationToken ct)
    {
        using var scope = _sp.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        if (db.Database.IsNpgsql())
            await BulkImpl.PostgresCopyAsync(db, batch, ct);
        else if (db.Database.IsSqlServer())
        {
            if (batch.Count >= 100_000)
                await BulkImpl.SqlServerBulkCopyStreamAsync(db, batch, ct); // very large batch: streaming
            else
                await BulkImpl.SqlServerBulkCopyAsync(db, batch, ct);       // regular: DataTable
        }
    }
}
```
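The `FullMode` choice decides what producers experience when the channel fills up. A minimal, standalone sketch of the two most common modes:

```csharp
using System;
using System.Threading.Channels;

// FullMode.Wait: a full channel rejects TryWrite, which is the
// backpressure signal an ingest endpoint can turn into HTTP 429.
var waitCh = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});
Console.WriteLine(waitCh.Writer.TryWrite(1)); // True
Console.WriteLine(waitCh.Writer.TryWrite(2)); // True
Console.WriteLine(waitCh.Writer.TryWrite(3)); // False: full, caller must wait or shed load

// FullMode.DropOldest: the write always "succeeds", but the oldest queued
// item is evicted - acceptable for metrics samples, not for orders.
var dropCh = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest
});
dropCh.Writer.TryWrite(1);
dropCh.Writer.TryWrite(2);
Console.WriteLine(dropCh.Writer.TryWrite(3)); // True: item 1 was evicted
dropCh.Reader.TryRead(out var first);
Console.WriteLine(first); // 2
```

This is why the article's ingest controller can map a failed `TryWrite` directly to `429 Too Many Requests` under `Wait`, while the `Drop*` modes trade data loss for constant-time writes.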
Sequence (backpressure / Accepted / batch flush) — diagram not reproduced here.
5. Three Bulk Write Approaches and Their Engineering Details 🔧
5.1 Set-Based Update/Delete: `ExecuteUpdate / ExecuteDelete` ✍️
```csharp
var cutoff = DateTimeOffset.UtcNow.AddHours(-1);
var affected = await db.Orders
    .Where(o => o.TenantId == tenant
             && o.Status == OrderStatus.Pending
             && o.UpdatedAt < cutoff)
    .ExecuteUpdateAsync(s => s
        .SetProperty(o => o.Status, _ => OrderStatus.Closed)
        .SetProperty(o => o.UpdatedAt, _ => DateTimeOffset.UtcNow), ct);
// Record `affected` for auditing. Note: this path bypasses optimistic
// concurrency tokens / row versions.
```
5.2 PostgreSQL: `COPY BINARY` (Npgsql) 🐘🚀
```csharp
await using var conn = (NpgsqlConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

await using var wr = conn.BeginBinaryImport(
    "COPY public.metrics (tenant_id, id, ts, value) FROM STDIN (FORMAT BINARY)");
foreach (var r in rows)
{
    wr.StartRow();
    wr.Write(r.TenantId, NpgsqlDbType.Text);
    wr.Write(r.Id, NpgsqlDbType.Uuid);
    // ✅ timestamptz accepts UTC only (offset 0)
    wr.Write(r.Ts.ToUniversalTime(), NpgsqlDbType.TimestampTz);
    wr.Write(r.Value, NpgsqlDbType.Double);
}
await wr.CompleteAsync(ct); // disposing without Complete cancels and rolls back
```
To put several COPY operations into one transaction, open an outer `BeginTransaction()`, run the COPYs, then `Commit()` once at the end.
5.3 SQL Server: `SqlBulkCopy` (regular + streaming) 🧱⚙️
Regular mode (DataTable) — easy to reproduce quickly:
```csharp
await using var conn = (SqlConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);
await using var tx = await conn.BeginTransactionAsync(ct);

using var bulk = new SqlBulkCopy(conn,
    SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.TableLock,
    (SqlTransaction)tx)
{
    DestinationTableName = "[dbo].[metrics]",
    BatchSize = 5000,
    BulkCopyTimeout = 120
};

var table = new DataTable();
table.Columns.Add("tenant_id", typeof(string));
table.Columns.Add("id", typeof(Guid));
table.Columns.Add("ts", typeof(DateTimeOffset)); // -> datetimeoffset
table.Columns.Add("value", typeof(double));
foreach (var r in rows) table.Rows.Add(r.TenantId, r.Id, r.Ts, r.Value);

bulk.ColumnMappings.Add("tenant_id", "tenant_id");
bulk.ColumnMappings.Add("id", "id");
bulk.ColumnMappings.Add("ts", "ts");
bulk.ColumnMappings.Add("value", "value");

bulk.NotifyAfter = 5000;
bulk.SqlRowsCopied += (_, e) => Console.WriteLine($"Copied: {e.RowsCopied}");

await bulk.WriteToServerAsync(table, ct);
await tx.CommitAsync(ct);
```
Very large batches (streaming, low memory) — example cutover threshold `>= 100_000`:
```csharp
// NOTE: SqlBulkCopy streams from an IDataReader/DbDataReader; it has no
// overload accepting an IEnumerator<SqlDataRecord> (that pattern belongs to
// table-valued parameters). Here the list is adapted with ObjectReader from
// the FastMember package, which wraps IEnumerable<T> without materializing
// a DataTable.
public static async Task SqlServerBulkCopyStreamAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
{
    var conn = (SqlConnection)db.Database.GetDbConnection();
    if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

    using var bulk = new SqlBulkCopy(conn)
    {
        DestinationTableName = "[dbo].[metrics]",
        BatchSize = 5000,
        BulkCopyTimeout = 120,
        EnableStreaming = true
    };
    bulk.ColumnMappings.Add("TenantId", "tenant_id");
    bulk.ColumnMappings.Add("Id", "id");
    bulk.ColumnMappings.Add("Ts", "ts");
    bulk.ColumnMappings.Add("Value", "value");

    using var reader = ObjectReader.Create(rows, "TenantId", "Id", "Ts", "Value");
    await bulk.WriteToServerAsync(reader, ct);
}
```
Note: by default `SqlBulkCopy` fires no triggers and checks no constraints; opt in explicitly via `SqlBulkCopyOptions` if you need behavior equivalent to regular DML. If the table has an identity column and the source values must be preserved, use `KeepIdentity`.
6. Transactions and Isolation Levels 🧪
- Many small commits vs. one large commit: small batches shorten lock hold times and make rollbacks cheap; one large batch yields higher throughput but a bigger blast radius on failure.
- Isolation levels:
  - SQL Server: enable RCSI (`READ_COMMITTED_SNAPSHOT ON`) so READ COMMITTED reads row versions, reducing reader/writer conflicts.
  - PostgreSQL: use `FOR UPDATE SKIP LOCKED` on job/queue tables so consumers don't fight over the same row.
- Idempotency: a unique key plus upsert guarantees effectively-once semantics.
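Enabling RCSI is a one-time database-level setting. A sketch, assuming a database named `app` (the `WITH ROLLBACK IMMEDIATE` clause evicts open sessions, which the switch otherwise waits on):

```sql
-- Switch READ COMMITTED to row-versioned reads for this database
ALTER DATABASE app SET READ_COMMITTED_SNAPSHOT ON WITH ROLLBACK IMMEDIATE;

-- Verify the setting took effect
SELECT name, is_read_committed_snapshot_on
FROM sys.databases
WHERE name = 'app';
```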
7. Consistency Strategies (a Business-Facing Choice) ⚖️

| Strategy | Description | Read-your-writes | Throughput | Implementation |
|---|---|---|---|---|
| Strong synchronous | API waits for the commit | Strong | Medium | Small synchronous commits |
| Soft real-time (recommended) | API ACKs fast; background batch write with a bounded catch-up time (SLO) | Configurable | High | Bounded channel + micro-batches + watermark |
| Eventual | Read store built asynchronously | Weak | Highest | Outbox/events → read store |
Consistency strategy map (with p99 SLA) — diagram not reproduced here.
8. Read/Write Consistency and Caching 🧩
- After a write, keep a short-lived write-side cache (30–120 s TTL) keyed by `(tenant, key, version)`.
- After a batch update, publish invalidation events or a watermark (e.g. `last_applied_offset`) so the front end can tell whether it has caught up.
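The watermark check itself is tiny. A minimal sketch — the names besides `last_applied_offset` are illustrative, and in production the value would live in Redis or a table rather than a local field:

```csharp
using System;

// The batch writer publishes the highest offset it has flushed; readers
// compare it against the offset their own write produced.
long lastAppliedOffset = 0;

void PublishWatermark(long flushedUpTo) =>
    // Monotonic: a late or duplicate flush report never moves it backwards.
    lastAppliedOffset = Math.Max(lastAppliedOffset, flushedUpTo);

bool IsCaughtUp(long clientOffset) => lastAppliedOffset >= clientOffset;

PublishWatermark(100);               // writer flushed through offset 100
Console.WriteLine(IsCaughtUp(90));   // True: that write is readable
Console.WriteLine(IsCaughtUp(150));  // False: still in flight; poll or wait for a notification
```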
9. Observability and Alerting (Minimum Set) 📈🔔
- Write channel: enqueue rate, queue depth, high/low watermarks, batch size, batch duration p95/p99, failure/retry rate.
- Database: lock waits, WAL/log growth, checkpoint duration, index bloat.
- SLO: enqueue → readable p99 ≤ 2 s; alert after 5 consecutive minutes out of bounds.
- Implementation tip: wrap metrics uniformly with Prometheus/OpenTelemetry (collect `SqlRowsCopied`, EF Core `LogTo` output, queue depth, and so on).
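The p99 behind the SLO can be computed with the nearest-rank rule. A minimal sketch of the math only — a real deployment would use an OpenTelemetry/Prometheus histogram instead of sorting raw samples:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Nearest-rank percentile: the ceil(p/100 * N)-th smallest sample (1-based).
static double Percentile(IReadOnlyList<double> samples, double p)
{
    var sorted = samples.OrderBy(x => x).ToArray();
    int rank = (int)Math.Ceiling(p / 100.0 * sorted.Length);
    return sorted[Math.Clamp(rank - 1, 0, sorted.Length - 1)];
}

// Enqueue-to-readable latencies for one window: 10, 20, ..., 1000 ms.
var latencies = Enumerable.Range(1, 100).Select(i => (double)i * 10).ToList();
double p99 = Percentile(latencies, 99);
Console.WriteLine($"p99 = {p99} ms, SLO ok = {p99 <= 2000}"); // p99 = 990 ms, SLO ok = True
```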
10. Experiments and Benchmarks 🧪🧪
10.1 Microbenchmarks (BenchmarkDotNet)

```csharp
[MemoryDiagnoser]
public class BulkBench
{
    private readonly List<WriteItem> _batch = DataGen.Generate(5000);
    private AppDbContext _db = default!;

    [GlobalSetup]
    public void Setup() => _db = DbFactory.Create();

    [Benchmark] public Task PG_Copy() => BulkImpl.PostgresCopyAsync(_db, _batch, CancellationToken.None);
    [Benchmark] public Task SQL_BulkCopy() => BulkImpl.SqlServerBulkCopyAsync(_db, _batch, CancellationToken.None);
    [Benchmark] public Task EF_ExecuteUpdate() => BulkImpl.ExecuteUpdateAsync(_db, CancellationToken.None);
}
```
10.2 Load Testing (k6): uuidv4() ✅
```javascript
import http from 'k6/http';
import { sleep } from 'k6';
import { uuidv4 } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js';

export const options = { vus: 50, duration: '2m' };

function genItems(n) {
  const items = [];
  for (let i = 0; i < n; i++) {
    items.push({
      tenantId: 't1',
      id: uuidv4(), // ✅
      ts: new Date().toISOString(),
      value: Math.random()
    });
  }
  return items;
}

export default function () {
  const payload = JSON.stringify({ tenantId: 't1', items: genItems(100) });
  http.post('http://localhost:5000/api/ingest', payload, {
    headers: { 'Content-Type': 'application/json' }
  });
  sleep(0.1);
}
```
11. Failure and Rollback Strategies 🛡️
- Bisect failed batches: split a failed batch in half repeatedly to isolate the bad record (or bad subset); route bad data to dead letters with the failure reason and a payload hash.
- Retries: exponential backoff; past the attempt limit, move to dead letters. An end-of-day idempotent compensation job replays by unique key.
- Auditing: a batch table recording `batch_id`, `count`, `failures`, `dur_ms`, and timestamps.

Prepare a `dead_letters` table (batch ID, exception summary, payload hash, first/last seen timestamps, retry count).
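The bisection step above can be sketched as pure logic. `tryWrite` stands in for the real bulk write (it would wrap `FlushAsync` in practice) and the `int` payloads are illustrative:

```csharp
using System;
using System.Collections.Generic;

// Recursively halve a failed batch until each poison row is isolated.
// Rows in any half that writes cleanly are committed; single rows that
// still fail go to the dead-letter list.
static (List<int> ok, List<int> dead) Bisect(List<int> batch, Func<List<int>, bool> tryWrite)
{
    var ok = new List<int>();
    var dead = new List<int>();
    void Go(List<int> b)
    {
        if (b.Count == 0) return;
        if (tryWrite(b)) { ok.AddRange(b); return; }
        if (b.Count == 1) { dead.Add(b[0]); return; } // isolated poison row
        int mid = b.Count / 2;
        Go(b.GetRange(0, mid));
        Go(b.GetRange(mid, b.Count - mid));
    }
    Go(batch);
    return (ok, dead);
}

// Simulate a write that fails whenever the batch contains the poison value 13.
var (ok, dead) = Bisect(new List<int> { 1, 2, 13, 4, 5 }, b => !b.Contains(13));
Console.WriteLine(string.Join(",", ok));   // 1,2,4,5
Console.WriteLine(string.Join(",", dead)); // 13
```

With a batch of N rows and one poison row, this costs O(log N) extra write attempts instead of N single-row retries.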
12. Code and Configuration Listings 🧱
12.1 Model

```csharp
public record WriteItem(string TenantId, Guid Id, DateTimeOffset Ts, double Value);

public class Metric
{
    public string TenantId { get; set; } = default!;
    public Guid Id { get; set; }
    public DateTimeOffset Ts { get; set; }
    public double Value { get; set; }
}
```
12.2 DbContext (shared between PG and SQL Server)

```csharp
public class AppDbContext : DbContext
{
    public DbSet<Metric> Metrics => Set<Metric>();
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    protected override void OnModelCreating(ModelBuilder b)
    {
        b.Entity<Metric>(e =>
        {
            e.ToTable("metrics");
            e.HasKey(x => new { x.TenantId, x.Id }); // idempotency key
            e.Property(x => x.Ts).HasColumnName("ts");
            e.HasIndex(x => x.Ts);
        });
    }
}
```
12.3 Bulk Implementations (Core Methods)

```csharp
public static class BulkImpl
{
    public static async Task PostgresCopyAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
    {
        var conn = (NpgsqlConnection)db.Database.GetDbConnection();
        if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

        await using var wr = conn.BeginBinaryImport(
            "COPY public.metrics (tenant_id, id, ts, value) FROM STDIN (FORMAT BINARY)");
        foreach (var r in rows)
        {
            wr.StartRow();
            wr.Write(r.TenantId, NpgsqlDbType.Text);
            wr.Write(r.Id, NpgsqlDbType.Uuid);
            wr.Write(r.Ts.ToUniversalTime(), NpgsqlDbType.TimestampTz); // ✅ UTC only
            wr.Write(r.Value, NpgsqlDbType.Double);
        }
        await wr.CompleteAsync(ct); // disposing without Complete rolls back
    }

    // Regular mode: DataTable
    public static async Task SqlServerBulkCopyAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
    {
        var table = new DataTable();
        table.Columns.Add("tenant_id", typeof(string));
        table.Columns.Add("id", typeof(Guid));
        table.Columns.Add("ts", typeof(DateTimeOffset));
        table.Columns.Add("value", typeof(double));
        foreach (var r in rows) table.Rows.Add(r.TenantId, r.Id, r.Ts, r.Value);

        var conn = (SqlConnection)db.Database.GetDbConnection();
        if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

        using var bulk = new SqlBulkCopy(conn,
            SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.TableLock,
            null)
        {
            DestinationTableName = "[dbo].[metrics]",
            BatchSize = 5000,
            BulkCopyTimeout = 120
        };
        bulk.ColumnMappings.Add("tenant_id", "tenant_id");
        bulk.ColumnMappings.Add("id", "id");
        bulk.ColumnMappings.Add("ts", "ts");
        bulk.ColumnMappings.Add("value", "value");
        bulk.NotifyAfter = 5000;
        bulk.SqlRowsCopied += (_, e) => Console.WriteLine($"Copied: {e.RowsCopied}");

        await bulk.WriteToServerAsync(table, ct);
    }

    // Streaming mode (low memory): SqlBulkCopy reads from an
    // IDataReader/DbDataReader, not from an IEnumerator<SqlDataRecord>;
    // ObjectReader (FastMember package) adapts IEnumerable<T> accordingly.
    public static async Task SqlServerBulkCopyStreamAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
    {
        var conn = (SqlConnection)db.Database.GetDbConnection();
        if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

        using var bulk = new SqlBulkCopy(conn)
        {
            DestinationTableName = "[dbo].[metrics]",
            BatchSize = 5000,
            BulkCopyTimeout = 120,
            EnableStreaming = true
        };
        bulk.ColumnMappings.Add("TenantId", "tenant_id");
        bulk.ColumnMappings.Add("Id", "id");
        bulk.ColumnMappings.Add("Ts", "ts");
        bulk.ColumnMappings.Add("Value", "value");

        using var reader = ObjectReader.Create(rows, "TenantId", "Id", "Ts", "Value");
        await bulk.WriteToServerAsync(reader, ct);
    }

    public static async Task ExecuteUpdateAsync(AppDbContext db, CancellationToken ct)
    {
        var cutoff = DateTimeOffset.UtcNow.AddHours(-1);
        var rows = await db.Metrics
            .Where(m => m.Ts < cutoff)
            .ExecuteUpdateAsync(s => s.SetProperty(m => m.Value, m => m.Value * 0.99), ct);
        // rows -> audit log
    }
}
```
12.4 Web API Ingest Endpoint (Rate Limiting / Payload Limits / Validation)

```csharp
[ApiController, Route("api/ingest")]
public class IngestController : ControllerBase
{
    private readonly Channel<WriteItem> _ch;
    public IngestController(Channel<WriteItem> ch) => _ch = ch;

    [HttpPost]
    [RequestSizeLimit(5 * 1024 * 1024)] // 5 MB guard against oversized payloads; raise as needed
    public async Task<IActionResult> Post([FromBody] IngestRequest req, CancellationToken ct)
    {
        if (req.Items is null || req.Items.Count == 0 || req.Items.Count > 10_000)
            return BadRequest("Items count out of range");

        foreach (var it in req.Items)
        {
            var ok = await _ch.Writer.WaitToWriteAsync(ct) && _ch.Writer.TryWrite(it);
            if (!ok) return StatusCode(StatusCodes.Status429TooManyRequests); // backpressure
        }
        return Accepted(); // soft real-time: fast ACK
    }
}

public record IngestRequest(string TenantId, List<WriteItem> Items);
```
12.5 appsettings (Key Parameters)

```json
{
  "Bulk": {
    "MaxBatchSize": 5000,
    "MaxBatchIntervalMs": 100,
    "ChannelCapacity": 50000,
    "Retry": { "MaxAttempts": 3, "BaseDelayMs": 100 }
  },
  "ConnectionStrings": {
    "Pg": "Host=localhost;Username=postgres;Password=postgres;Database=app;",
    "Sql": "Server=localhost,1433;User Id=sa;Password=Pass@word1;Encrypt=False;TrustServerCertificate=True"
  }
}
```