🛰️ Offline-First and Conflict Resolution: Edge Sync with ABP vNext + PWA
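📚 Table of Contents
- 🛰️ Offline-First and Conflict Resolution: Edge Sync with ABP vNext + PWA
- 0. Environment 🚀
- 1. Scenarios and Goals (Pain Points → Metrics) 🎯
- 2. Architecture and Sequence 🗺️
  - 2.1 Overall Architecture
  - 2.2 Write → Replay → Merge → Catch-up Sequence
- 3. Protocol and Data Model 📜
  - 3.1 Client `oplog` and `checkpoint`
  - 3.2 Server Response (pull)
  - 3.3 Idempotency & Auditing
- 4. Strategy Matrix 🧠
  - 4.1 Strategy Selection Decision Tree 🌳
- 5. Frontend Implementation 📱
  - 5.1 IndexedDB Schema (`idb`)
  - 5.2 Service Worker Caching Strategy (Workbox)
  - 5.3 Enqueue + Optimistic Update + Replay Trigger
  - 5.4 Replay Strategy (Batching / Byte Cap / Backoff / Circuit Breaker) 🧯
  - 5.5 Conflict UI 🧩
  - 5.6 Replay State Machine (Mermaid) 🕹️
- 6. Backend Implementation 🧰
  - 6.1 DTOs and Controller
  - 6.2 Strategy Configuration
  - 6.3 Key LWW/OT/CRDT Implementations
  - 6.4 CRDT Class Diagram 📐
- 7. Observability: Prometheus / Grafana / Alerting 📊
  - 7.1 Backend Metrics
  - 7.2 Prometheus Scrape Config (Snippet)
  - 7.3 Alert Rule (Conflict Rate > 2% for 10m) 🚨
- 8. Testing and Evaluation 🧪
  - 8.1 xUnit Unit Tests (Strategy Correctness)
  - 8.2 Playwright E2E (Offline → Replay → Catch-up)
  - 8.3 k6 Load Test Script (/push)
- 9. Offline / Reconnect / Replay: Edge Cases and Degradation 🧷
- 10. Security and Compliance 🔐
- 11. Deployment and Limitations 🚢
- 12. FAQ 💡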
0. Environment 🚀
```bash
# Backend
dotnet tool update -g Volo.Abp.Cli
abp new Acme.SyncDemo -t app --database-provider ef
cd Acme.SyncDemo
cd src/Acme.SyncDemo.HttpApi.Host   # host project; the exact path depends on the template/UI options
abp add-package Volo.Abp.AspNetCore.SignalR
dotnet add package prometheus-net.AspNetCore
cd ../..
dotnet build

# Frontend
npm create vite@latest pwa-edge-sync -- --template react-ts
cd pwa-edge-sync
npm i idb workbox-window workbox-routing workbox-strategies workbox-background-sync yjs diff-match-patch
npm run dev

# Load testing and E2E (optional)
# k6 is a standalone binary, not an npm package; install it with your OS package manager (e.g. `brew install k6` on macOS)
k6 run scripts/k6-push.js
```
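1. Scenarios and Goals (Pain Points → Metrics) 🎯
- 🛠️ Typical scenarios: inspections and quality checks, field work orders, workstations with poor signal
- SLOs:
  - ✅ Offline entry and editing are 100% available
  - ⚡ Replay latency after reconnecting: p95 ≤ 3 s
  - 🧩 Conflicts are explainable, reversible, and auditable (field-level LWW/OT/CRDT)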
2. Architecture and Sequence 🗺️
2.1 Overall Architecture
2.2 Write → Replay → Merge → Catch-up Sequence
3. Protocol and Data Model 📜
3.1 Client `oplog` and `checkpoint`
```json
{
  "opId": "uuid",
  "tenant": "t1",
  "entity": "Order",
  "key": "O-1001",
  "op": "update",
  "patch": [{ "path": "/qty", "op": "replace", "value": 12 }],
  "clientClock": 123,
  "baseVersion": 130,
  "ts": 1690000000,
  "author": "u1"
}
```
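The oplog entry above can be pinned down as a TypeScript type, which keeps enqueue, replay, and tests consistent. This is a sketch under assumptions: `ts` is taken to be Unix seconds (as in the sample), `clientClock` a per-client increasing counter, the `create`/`delete` values of `op` are assumed (the sample only shows `update`), and `makeReplaceOp` is a hypothetical helper.

```typescript
export interface JsonPatchOp {
  op: 'add' | 'remove' | 'replace';
  path: string;   // RFC 6901 JSON Pointer, e.g. "/qty"
  value?: unknown;
}

export interface OpLogEntry {
  opId: string;          // UUID, the primary idempotency key
  tenant: string;
  entity: string;
  key: string;
  op: 'create' | 'update' | 'delete'; // create/delete are assumed values
  patch: JsonPatchOp[];
  clientClock: number;   // per-client increasing counter (assumption)
  baseVersion: number;   // server version the edit was based on
  ts: number;            // Unix seconds, like the sample (assumption)
  author: string;
}

// Hypothetical helper: build an "update" op that replaces one top-level field
export function makeReplaceOp(args: {
  tenant: string; entity: string; key: string; field: string; value: unknown;
  clientClock: number; baseVersion: number; author: string;
}): OpLogEntry {
  // RFC 6901 escaping: "~" → "~0" first, then "/" → "~1"
  const token = args.field.replace(/~/g, '~0').replace(/\//g, '~1');
  return {
    opId: crypto.randomUUID(),
    tenant: args.tenant,
    entity: args.entity,
    key: args.key,
    op: 'update',
    patch: [{ path: `/${token}`, op: 'replace', value: args.value }],
    clientClock: args.clientClock,
    baseVersion: args.baseVersion,
    ts: Math.floor(Date.now() / 1000),
    author: args.author,
  };
}
```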
3.2 Server Response (pull)
```json
{
  "since": 120,
  "next": 140,
  "events": [
    {
      "entity": "Order",
      "key": "O-1001",
      "patch": [{ "path": "/qty", "op": "replace", "value": 10 }],
      "serverClock": 130,
      "version": "etag-130",
      "strategy": "LWW"
    }
  ],
  "applied": ["uuid"]
}
```
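To make the pull contract concrete, here is a hedged sketch of applying a response like the one above to a local state map and returning `next` as the checkpoint to store. `applyPull` and `setAtPointer` are hypothetical names; only `replace`/`add` on object paths are handled (array indices and `-` are not), which is a deliberate simplification.

```typescript
type PatchOp = { op: string; path: string; value?: unknown };
type ServerEvent = {
  entity: string; key: string; patch: PatchOp[];
  serverClock: number; version: string; strategy: string;
};
type PullResponse = { since: number; next: number; events: ServerEvent[] };
type Doc = Record<string, unknown>;

// Set a value at an RFC 6901 pointer inside nested objects (object paths only)
function setAtPointer(doc: Doc, pointer: string, value: unknown): void {
  const parts = pointer.split('/').slice(1).map(p => p.replace(/~1/g, '/').replace(/~0/g, '~'));
  let target: Doc = doc;
  for (const part of parts.slice(0, -1)) {
    if (typeof target[part] !== 'object' || target[part] === null) target[part] = {};
    target = target[part] as Doc;
  }
  target[parts[parts.length - 1]] = value;
}

// Apply server events in order; returns the checkpoint to persist afterwards
export function applyPull(state: Map<string, Doc>, res: PullResponse): number {
  for (const ev of res.events) {
    const id = `${ev.entity}/${ev.key}`;
    const doc = state.get(id) ?? {};
    for (const p of ev.patch) {
      if (p.op === 'replace' || p.op === 'add') setAtPointer(doc, p.path, p.value);
    }
    state.set(id, doc);
  }
  return res.next;
}
```

Persisting the returned value only after every event has been applied means a crash mid-apply simply re-pulls the same range, which is harmless for `replace` semantics.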
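3.3 Idempotency & Auditing
- 🔁 Idempotent deduplication: `opId` + `(tenant, entity, key, clientClock)`
- 🕵️ Auditing: snapshots of old and new values, the strategy applied, the operator, and `baseVersion`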
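A minimal sketch of the two dedup keys listed above, assuming an in-memory store purely for illustration (a server would more likely use unique indexes). `Deduper` is a hypothetical name. Note that the tuple contains no client or author id, so it assumes `clientClock` values do not collide across clients for the same key.

```typescript
type OpIdentity = { opId: string; tenant: string; entity: string; key: string; clientClock: number };

export class Deduper {
  private readonly seenOpIds = new Set<string>();
  private readonly seenTuples = new Set<string>();

  // true the first time an op is seen; false for a replayed or duplicate op
  accept(op: OpIdentity): boolean {
    // JSON.stringify of an array avoids delimiter collisions between fields
    const tuple = JSON.stringify([op.tenant, op.entity, op.key, op.clientClock]);
    if (this.seenOpIds.has(op.opId) || this.seenTuples.has(tuple)) return false;
    this.seenOpIds.add(op.opId);
    this.seenTuples.add(tuple);
    return true;
  }
}
```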
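4. Strategy Matrix 🧠

| Field type / scenario | Recommended strategy | Key points |
|---|---|---|
| Plain form fields | LWW | Simple; risk of overwrites → pair with snapshots/undo |
| Rich text / descriptions | OT | Requires `baseVersion` + transform |
| Tags / sets | CRDT (LWW-Element-Set) | add/remove carry a timestamp; tombstones need GC |
| Counters | CRDT (PN-Counter) | Merge P and N per node by taking the max; value = P - N |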
4.1 Strategy Selection Decision Tree 🌳
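The decision tree can be expressed as a small function following the matrix above (text → OT, sets/counters → CRDT, plain fields → LWW). The `FieldKind` categories, the `"Entity.field"` policy keys, and the fallback to LWW for unconfigured fields are assumptions; the strategy names mirror the backend `ConflictStrategy` enum.

```typescript
export type Strategy = 'Lww' | 'OtText' | 'CrdtSet' | 'CrdtPnCounter';
export type FieldKind = 'scalar' | 'richText' | 'tagSet' | 'counter';

export function chooseStrategy(kind: FieldKind): Strategy {
  switch (kind) {
    case 'richText': return 'OtText';        // text / order-sensitive
    case 'tagSet':   return 'CrdtSet';       // concurrent add/remove
    case 'counter':  return 'CrdtPnCounter'; // concurrent increments/decrements
    case 'scalar':   return 'Lww';           // plain form field
  }
}

// Per-field configuration keyed by "Entity.field"; unconfigured fields fall back to LWW
export function policyFor(policies: Record<string, FieldKind>, entity: string, field: string): Strategy {
  return chooseStrategy(policies[`${entity}.${field}`] ?? 'scalar');
}
```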
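5. Frontend Implementation 📱
5.1 IndexedDB Schema (`idb`)
```ts
// src/db.ts
import { openDB } from 'idb';

export const dbPromise = openDB('edge-sync', 2, {
  // idb passes the version-change transaction as the 4th argument
  upgrade(db, oldVersion, _newVersion, transaction) {
    if (oldVersion < 1) {
      db.createObjectStore('state', { keyPath: ['entity', 'key'] }); // local entity snapshots
      db.createObjectStore('oplog', { keyPath: 'opId' });             // pending ops awaiting push
      db.createObjectStore('meta', { keyPath: 'k' });                 // checkpoint and other metadata
    }
    if (oldVersion < 2) {
      // Index on clientClock so replay can walk ops in order
      transaction.objectStore('oplog').createIndex('clock', 'clientClock');
    }
  },
});
```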
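5.2 Service Worker Caching Strategy (Workbox)
```js
// public/sw.js
// Bare module imports need a bundling step (e.g. a Workbox/Vite PWA build); files in public/ are served as-is.
import { registerRoute } from 'workbox-routing';
import { NetworkFirst, StaleWhileRevalidate, NetworkOnly } from 'workbox-strategies';
import { BackgroundSyncPlugin } from 'workbox-background-sync';

// Page navigations: try the network first, fall back to the cache when offline
registerRoute(({ request }) => request.mode === 'navigate', new NetworkFirst());
// Static assets: serve from cache, refresh in the background
registerRoute(({ request }) => ['style', 'script', 'image'].includes(request.destination), new StaleWhileRevalidate());

// Failed POSTs to /api/sync/push are queued and retried by Background Sync (maxRetentionTime is in minutes).
// These retries can overlap with the app-level replay loop; server-side opId dedup keeps that safe.
const pushQueue = new BackgroundSyncPlugin('push-queue', { maxRetentionTime: 60 });
registerRoute(({ url }) => url.pathname.startsWith('/api/sync/push'), new NetworkOnly({ plugins: [pushQueue] }), 'POST');
```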
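5.3 Enqueue + Optimistic Update + Replay Trigger
```ts
// src/sync/enqueue.ts
import { dbPromise } from '../db';
import { triggerSync } from './replay';
import { applyOptimistic } from './optimistic'; // app-specific (not shown): writes op.patch into the 'state' store

export async function enqueue(op: any) {
  const db = await dbPromise;
  await db.add('oplog', op);  // persist first so the op survives reloads and crashes
  await applyOptimistic(op);  // update local state so the UI reflects the change immediately
  void triggerSync();         // try to replay now; a no-op while offline
}
```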
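5.4 Replay Strategy (Batching / Byte Cap / Backoff / Circuit Breaker) 🧯
```ts
// src/sync/replay.ts
import { dbPromise } from '../db';
import { applyEvents, handleConflicts } from './merge'; // app-specific (not shown)

const MAX_BATCH_OPS = 200;
const MAX_BATCH_BYTES = 256 * 1024; // 256 KB
const BASE_DELAY = 500;             // ms
const MAX_DELAY = 10_000;           // ms
const COOLDOWN = 20_000;            // circuit-breaker cooldown, ms

let backoff = 0;
let circuitOpen = false;
let lastFailureAt = 0;
let inFlight = false;

export async function triggerSync(): Promise<void> {
  if (!navigator.onLine || inFlight) return;                        // offline or already syncing
  if (circuitOpen && Date.now() - lastFailureAt < COOLDOWN) return; // circuit open: still cooling down
  inFlight = true;
  try {
    await flushOnce();
    backoff = 0;
    circuitOpen = false;
  } catch {
    backoff = Math.min(MAX_DELAY, backoff ? backoff * 2 : BASE_DELAY);
    lastFailureAt = Date.now();
    if (backoff === MAX_DELAY) circuitOpen = true;
    // Schedule the next attempt; while the circuit is open, wait out the whole cooldown
    setTimeout(() => void triggerSync(), circuitOpen ? COOLDOWN : backoff);
  } finally {
    inFlight = false;
  }
}

// Resume as soon as the browser reports connectivity again
window.addEventListener('online', () => void triggerSync());

async function flushOnce(): Promise<void> {
  const db = await dbPromise;

  // 1) Read one batch in clientClock order. IndexedDB transactions auto-commit once control
  //    returns to the event loop, so no transaction may stay open across fetch().
  const readTx = db.transaction(['oplog', 'meta'], 'readonly');
  const batch: any[] = [];
  let size = 0;
  let cursor = await readTx.objectStore('oplog').index('clock').openCursor();
  while (cursor && batch.length < MAX_BATCH_OPS) {
    const bytes = new Blob([JSON.stringify(cursor.value)]).size;
    if (batch.length > 0 && size + bytes > MAX_BATCH_BYTES) break; // an oversized op is still sent alone
    batch.push(cursor.value);
    size += bytes;
    cursor = await cursor.continue();
  }
  const ckp: number = (await readTx.objectStore('meta').get('checkpoint'))?.value ?? 0;
  await readTx.done;
  if (batch.length === 0) return;

  // 2) Push the batch
  const res = await fetch('/api/sync/push', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ ops: batch, clientCheckpoint: ckp }),
  });
  if (res.status === 409) {
    await handleConflicts(await res.json()); // hand over to the merge flow / conflict UI
    throw new Error('409-conflict');
  }
  if (!res.ok) throw new Error(`push-failed: ${res.status}`);
  const { applied }: { applied: string[] } = await res.json();

  // 3) Drop acknowledged ops in a fresh transaction
  const delTx = db.transaction('oplog', 'readwrite');
  await Promise.all([...applied.map(id => delTx.store.delete(id)), delTx.done]);

  // 4) Pull from the previous checkpoint so other clients' events are not skipped,
  //    then advance the checkpoint to the server's `next`
  const pull = await fetch(`/api/sync/pull?since=${ckp}`);
  if (!pull.ok) throw new Error(`pull-failed: ${pull.status}`);
  const data = await pull.json();
  await applyEvents(data.events);
  await db.put('meta', { k: 'checkpoint', value: data.next });
}
```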
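5.5 Conflict UI 🧩
```tsx
// src/components/ConflictModal.tsx
import { useMemo, useState } from 'react';
import DiffMatchPatch from 'diff-match-patch';

type TextDoc = { text: string };
type Resolution = { op: 'replace'; path: '/text'; value: string };

// `base` is the common ancestor (the text at the op's baseVersion); a true merge needs all three versions
export function ConflictModal({ base, server, client, onResolve }: {
  base: TextDoc; server: TextDoc; client: TextDoc; onResolve: (patch: Resolution) => void;
}) {
  const [strategy, setStrategy] = useState<'server' | 'client' | 'merge'>('merge');
  const { diffs, merged } = useMemo(() => {
    const dmp = new DiffMatchPatch();
    const diffs = dmp.diff_main(server.text, client.text); // server vs. local, shown to the user
    dmp.diff_cleanupSemantic(diffs);
    // 3-way merge: replay the local edits (base → client) on top of the server text
    const merged = dmp.patch_apply(dmp.patch_make(base.text, client.text), server.text)[0];
    return { diffs, merged };
  }, [base.text, server.text, client.text]);

  // Every choice resolves to the same patch shape
  const value = strategy === 'merge' ? merged : strategy === 'client' ? client.text : server.text;
  return (
    <div className="modal">
      <h3>Resolve conflict</h3>
      <select value={strategy} onChange={e => setStrategy(e.target.value as typeof strategy)}>
        <option value="merge">Smart merge (OT/Diff)</option>
        <option value="server">Keep server version</option>
        <option value="client">Keep local version</option>
      </select>
      <pre className="diff">{JSON.stringify(diffs, null, 2)}</pre>
      <button onClick={() => onResolve({ op: 'replace', path: '/text', value })}>Apply</button>
    </div>
  );
}
```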
5.6 Replay State Machine (Mermaid) 🕹️
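The replay loop from 5.4 can also be modeled as an explicit, pure state machine, which makes the backoff and circuit-breaker transitions easy to unit-test. The state and event names here are assumptions chosen to mirror `triggerSync()`; the timing constants reuse 5.4's values, and after the cooldown a retry starts at the maximum delay, so one more failure reopens the circuit (as in 5.4).

```typescript
export type ReplayState =
  | { kind: 'idle' }
  | { kind: 'offline' }
  | { kind: 'syncing'; backoff: number }
  | { kind: 'backoff'; backoff: number }
  | { kind: 'circuitOpen'; until: number };

export type ReplayEvent =
  | { type: 'trigger'; now: number }
  | { type: 'online' }
  | { type: 'offline' }
  | { type: 'success' }
  | { type: 'failure'; now: number };

const BASE_DELAY = 500, MAX_DELAY = 10_000, COOLDOWN = 20_000;

export function replayTransition(s: ReplayState, e: ReplayEvent): ReplayState {
  switch (e.type) {
    case 'offline': return { kind: 'offline' };
    case 'online':  return s.kind === 'offline' ? { kind: 'idle' } : s;
    case 'trigger':
      if (s.kind === 'offline' || s.kind === 'syncing') return s; // nothing to do / already running
      if (s.kind === 'circuitOpen' && e.now < s.until) return s;  // still cooling down
      return {
        kind: 'syncing',
        backoff: s.kind === 'backoff' ? s.backoff : s.kind === 'circuitOpen' ? MAX_DELAY : 0,
      };
    case 'success': return { kind: 'idle' };
    case 'failure': {
      if (s.kind !== 'syncing') return s;
      const backoff = Math.min(MAX_DELAY, s.backoff ? s.backoff * 2 : BASE_DELAY);
      return backoff === MAX_DELAY
        ? { kind: 'circuitOpen', until: e.now + COOLDOWN }
        : { kind: 'backoff', backoff };
    }
  }
}
```

With these constants, five consecutive failures leave the loop in `backoff` at 8000 ms, and the sixth opens the circuit for 20 s.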
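6. Backend Implementation 🧰
6.1 DTOs and Controller
```csharp
// Contracts.cs
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

public sealed record ClientOp(
    Guid OpId, string Tenant, string Entity, string Key, string Op,
    JsonElement Patch, long ClientClock, long BaseVersion, long Ts, string Author);

public sealed record PushRequest(List<ClientOp> Ops, long ClientCheckpoint);

// Conflict summary: the controller only relies on Total; the rest of the shape is an assumption
public sealed record ConflictReport(int Total, List<ServerEvent> Items);

public sealed record PushResponse(IEnumerable<Guid> Applied, long ServerCheckpoint, ConflictReport? Conflicts);

public sealed record ServerEvent(
    string Entity, string Key, JsonElement Patch, long ServerClock, string Version, string Strategy);

public sealed record PullResponse(long Since, long Next, List<ServerEvent> Events);

// Inferred from how the controller uses it
public interface ISyncService
{
    Task<PushResponse> PushAsync(PushRequest req);
    Task<PullResponse> PullAsync(long since);
}
```
```csharp
// SyncController.cs
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Volo.Abp.AspNetCore.Mvc;

[Route("api/sync")]
public class SyncController : AbpController
{
    private readonly ISyncService _svc;
    public SyncController(ISyncService svc) => _svc = svc;

    [HttpPost("push")]
    public async Task<ActionResult<PushResponse>> PushAsync([FromBody] PushRequest req)
    {
        var result = await _svc.PushAsync(req);
        // Any conflict → 409 carrying the full result (applied ops + conflict report)
        if (result.Conflicts is { Total: > 0 })
            return StatusCode(StatusCodes.Status409Conflict, result);
        return Ok(result);
    }

    [HttpGet("pull")]
    public Task<PullResponse> PullAsync([FromQuery] long since) => _svc.PullAsync(since);
}
```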
6.2 Strategy Configuration
```csharp
public enum ConflictStrategy { Lww, OtText, CrdtSet, CrdtPnCounter }
public sealed record FieldPolicy(string Field, ConflictStrategy Strategy);
public interface IPolicyProvider { FieldPolicy GetPolicy(string entity, string field); }
```
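6.3 Key LWW/OT/CRDT Implementations
LWW (overwrite + snapshot audit)
```csharp
// Member of the merge service. Later clock wins; on equal clocks the client wins.
// The overwritten value should be snapshotted for auditing/undo (not shown).
public static JsonElement LwwMerge(JsonElement server, JsonElement client, long sClock, long cClock)
    => cClock >= sClock ? client : server;
```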
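The "snapshot audit" half of LWW is easiest to see when the merge returns the audit record alongside the winning value. This TypeScript sketch uses the same rule as `LwwMerge` above (client wins ties); the `AuditEntry` shape and `lwwMergeField` name are assumptions.

```typescript
export interface AuditEntry {
  field: string;
  oldValue: unknown;   // server value before the merge (the snapshot)
  newValue: unknown;   // value after the merge
  strategy: 'Lww';
  winner: 'server' | 'client';
  author: string;
}

export function lwwMergeField(
  field: string,
  server: { value: unknown; clock: number },
  client: { value: unknown; clock: number; author: string },
): { value: unknown; audit: AuditEntry } {
  const clientWins = client.clock >= server.clock; // equal clocks → client wins
  const value = clientWins ? client.value : server.value;
  return {
    value,
    audit: {
      field, oldValue: server.value, newValue: value, strategy: 'Lww',
      winner: clientWins ? 'client' : 'server', author: client.author,
    },
  };
}
```

Storing `oldValue` with every overwrite is what makes an "undo this replay" action possible later.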
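OT (simplified text OT: transform + apply)
```csharp
using System;
using System.Collections.Generic;

public sealed record TextOp(string Type, int Pos, string? Text, int? Len, long BaseVersion);

public static class OtTransform
{
    // Shift client op positions past the concurrent server ops. Simplified: only positions are
    // transformed; server "replace" ops and overlapping delete ranges are not fully handled.
    // Records are immutable, so transformed ops are created with `with`.
    public static List<TextOp> Transform(List<TextOp> clientOps, List<TextOp> serverOps)
    {
        var result = new List<TextOp>(clientOps);
        foreach (var s in serverOps)
        {
            for (var i = 0; i < result.Count; i++)
            {
                var c = result[i];
                if (s.Type == "insert" && c.Pos >= s.Pos)
                    result[i] = c with { Pos = c.Pos + s.Text!.Length };
                else if (s.Type == "delete" && c.Pos > s.Pos)
                    // inside the deleted range → collapse to its start; after it → shift left
                    result[i] = c with { Pos = Math.Max(s.Pos, c.Pos - s.Len!.Value) };
            }
        }
        return result;
    }

    public static string Apply(string baseText, IEnumerable<TextOp> ops)
    {
        var text = baseText;
        foreach (var op in ops)
        {
            text = op.Type switch
            {
                "insert"  => text[..op.Pos] + op.Text + text[op.Pos..],
                "delete"  => text[..op.Pos] + text[(op.Pos + op.Len!.Value)..],
                "replace" => text[..op.Pos] + op.Text + text[(op.Pos + op.Len!.Value)..],
                _ => text,
            };
        }
        return text;
    }
}
```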
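A worked example helps show what "transform" buys. The TypeScript sketch below uses the same position-shifting idea as `OtTransform`, for a single client op against a single server op; tie-breaking (a server insert at the same position goes first) and the lack of range splitting are simplifications, and the names are hypothetical.

```typescript
export type TextOp =
  | { type: 'insert'; pos: number; text: string }
  | { type: 'delete'; pos: number; len: number };

// Rewrite client op `c` so it applies after the concurrent server op `s`
export function transform(c: TextOp, s: TextOp): TextOp {
  if (s.type === 'insert') {
    return c.pos >= s.pos ? { ...c, pos: c.pos + s.text.length } : c;
  }
  // s is a delete: positions inside the deleted range collapse to its start
  return c.pos > s.pos ? { ...c, pos: Math.max(s.pos, c.pos - s.len) } : c;
}

export function apply(text: string, op: TextOp): string {
  return op.type === 'insert'
    ? text.slice(0, op.pos) + op.text + text.slice(op.pos)
    : text.slice(0, op.pos) + text.slice(op.pos + op.len);
}
```

For example, with base text `"abc"`, the server inserts `"X"` at 0 while the client appends `"Y"` at 3. Applying the client op unchanged to the server's `"Xabc"` would give `"XabcY"` only by luck of position; transforming shifts it to position 4, so `apply(apply("abc", s), transform(c, s))` yields `"XabcY"` regardless.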
CRDT: LWW-Element-Set + PN-Counter (with GC)
```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record CrdtTag(string Value, DateTime Ts, bool IsRemove);

public sealed class LwwElementSet
{
    private readonly Dictionary<string, CrdtTag> _map = new();

    // Later timestamp wins; on a tie the remove wins, so Add, Remove, and Merge all agree
    private static bool Wins(CrdtTag candidate, CrdtTag current) =>
        candidate.Ts > current.Ts || (candidate.Ts == current.Ts && candidate.IsRemove && !current.IsRemove);

    private void Put(CrdtTag tag)
    {
        if (!_map.TryGetValue(tag.Value, out var cur) || Wins(tag, cur)) _map[tag.Value] = tag;
    }

    public void Add(string v, DateTime ts) => Put(new(v, ts, false));
    public void Remove(string v, DateTime ts) => Put(new(v, ts, true));

    public IEnumerable<string> Values => _map.Where(kv => !kv.Value.IsRemove).Select(kv => kv.Key);

    // Drop tombstones older than the TTL (timestamps assumed UTC). A late add that is older than a
    // collected tombstone can resurrect the element, so the TTL should exceed the longest offline window.
    public void Gc(TimeSpan tombstoneTtl)
    {
        var cutoff = DateTime.UtcNow - tombstoneTtl;
        var keys = _map.Where(kv => kv.Value.IsRemove && kv.Value.Ts < cutoff).Select(kv => kv.Key).ToList();
        foreach (var k in keys) _map.Remove(k);
    }

    public static LwwElementSet Merge(LwwElementSet a, LwwElementSet b)
    {
        var r = new LwwElementSet();
        foreach (var tag in a._map.Values.Concat(b._map.Values)) r.Put(tag);
        return r;
    }
}

public sealed class PnCounter
{
    private readonly Dictionary<string, long> _p = new();
    private readonly Dictionary<string, long> _n = new();

    public void Inc(string node, long delta = 1) => _p[node] = _p.GetValueOrDefault(node) + delta;
    public void Dec(string node, long delta = 1) => _n[node] = _n.GetValueOrDefault(node) + delta;
    public long Value => _p.Values.Sum() - _n.Values.Sum();

    // Per-node max on both P and N keeps the merge commutative, associative, and idempotent
    public static PnCounter Merge(PnCounter a, PnCounter b)
    {
        var r = new PnCounter();
        foreach (var k in a._p.Keys.Union(b._p.Keys)) r._p[k] = Math.Max(a._p.GetValueOrDefault(k), b._p.GetValueOrDefault(k));
        foreach (var k in a._n.Keys.Union(b._n.Keys)) r._n[k] = Math.Max(a._n.GetValueOrDefault(k), b._n.GetValueOrDefault(k));
        return r;
    }
}
```
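The client may also need to merge counters locally before display. Here is an immutable TypeScript sketch of the same PN-Counter rule (per-node max on P and N, value = sum(P) - sum(N)); the function names are hypothetical.

```typescript
export type PnCounter = { p: Record<string, number>; n: Record<string, number> };

export const emptyCounter = (): PnCounter => ({ p: {}, n: {} });

// Each node only ever increases its own P or N entry
export function inc(c: PnCounter, node: string, delta = 1): PnCounter {
  return { ...c, p: { ...c.p, [node]: (c.p[node] ?? 0) + delta } };
}
export function dec(c: PnCounter, node: string, delta = 1): PnCounter {
  return { ...c, n: { ...c.n, [node]: (c.n[node] ?? 0) + delta } };
}

function maxMerge(a: Record<string, number>, b: Record<string, number>): Record<string, number> {
  const r: Record<string, number> = { ...a };
  for (const [k, v] of Object.entries(b)) r[k] = Math.max(r[k] ?? 0, v);
  return r;
}

export const merge = (a: PnCounter, b: PnCounter): PnCounter =>
  ({ p: maxMerge(a.p, b.p), n: maxMerge(a.n, b.n) });

const sum = (m: Record<string, number>) => Object.values(m).reduce((s, v) => s + v, 0);
export const value = (c: PnCounter): number => sum(c.p) - sum(c.n);
```

If device A increments by 3 while device B increments once and decrements once, merging in either order (or merging the result with itself) yields a value of 3.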
6.4 CRDT Class Diagram 📐
7. Observability: Prometheus / Grafana / Alerting 📊
7.1 Backend Metrics
```csharp
// Program.cs
using Prometheus;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.UseHttpMetrics();       // per-request HTTP metrics
app.MapMetrics("/metrics"); // scrape endpoint for Prometheus

app.Run();
```
```csharp
// SyncMetrics.cs
using Prometheus;

public static class SyncMetrics
{
    public static readonly Counter PushTotal = Metrics.CreateCounter("sync_push_total", "push requests");
    public static readonly Counter PullTotal = Metrics.CreateCounter("sync_pull_total", "pull requests");
    public static readonly Counter ConflictTotal = Metrics.CreateCounter("sync_conflict_total", "conflicts");

    // PowersOfTenDividedBuckets takes powers of ten: (0, 4, 5) covers 10^0..10^4 ms (up to 10 s),
    // with 5 buckets per power
    public static readonly Histogram PushLatency = Metrics.CreateHistogram(
        "sync_push_latency_ms", "push latency (ms)",
        new HistogramConfiguration { Buckets = Histogram.PowersOfTenDividedBuckets(0, 4, 5) });
}
```
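Inside `ISyncService.PushAsync`:
```csharp
var sw = Stopwatch.StartNew(); // System.Diagnostics
try
{
    SyncMetrics.PushTotal.Inc();
    // ... merge / persist ...
    // when conflicts are detected: SyncMetrics.ConflictTotal.Inc(conflictCount); (conflictCount is hypothetical)
}
finally
{
    SyncMetrics.PushLatency.Observe(sw.Elapsed.TotalMilliseconds);
}
```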
7.2 Prometheus Scrape Config (Snippet)
```yaml
scrape_configs:
  - job_name: 'acme-syncdemo'
    scrape_interval: 5s
    static_configs:
      - targets: ['syncdemo-host:8080'] # scrapes /metrics (the default metrics_path)
```
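7.3 Alert Rule (Conflict Rate > 2% for 10m) 🚨
```yaml
groups:
  - name: sync-alerts
    rules:
      - alert: HighConflictRate
        # conflicts per push request (not per individual op)
        expr: (rate(sync_conflict_total[10m]) / rate(sync_push_total[10m])) > 0.02
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Edge Sync high conflict rate"
          description: "Conflict rate has exceeded 2% for 10 minutes; check strategies, batch size, and hot fields."
```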
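8. Testing and Evaluation 🧪
8.1 xUnit Unit Tests (Strategy Correctness)
```csharp
using System.Text.Json;
using Xunit;

public class LwwTests
{
    [Fact]
    public void LaterClientWins()
    {
        var server = JsonDocument.Parse("{\"qty\":10}").RootElement;
        var client = JsonDocument.Parse("{\"qty\":12}").RootElement;
        // LwwMerge from 6.3, assumed accessible here (e.g. via `using static` on its containing class)
        var merged = LwwMerge(server, client, sClock: 100, cClock: 120);
        Assert.Equal(12, merged.GetProperty("qty").GetInt32());
    }
}
```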
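8.2 Playwright E2E (Offline → Replay → Catch-up)
- Create/edit while offline → replay after reconnecting → verify that the UI and the server agree
- Concurrent edits → the decision made in the conflict dialog takes effect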
8.3 k6 Load Test Script (/push)
```js
// scripts/k6-push.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { uuidv4 } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js';

export const options = { vus: 50, duration: '1m' };

export default function () {
  // 50 ops per request; keys repeat across iterations of the same VU, so 409s may occur
  const body = JSON.stringify({
    ops: Array.from({ length: 50 }, (_, i) => ({
      opId: uuidv4(),
      tenant: 't1',
      entity: 'Order',
      key: `O-${__VU}-${i}`,
      op: 'update',
      patch: [{ path: '/qty', op: 'replace', value: Math.floor(Math.random() * 100) }],
      clientClock: Date.now(),
      baseVersion: 0,
      ts: Date.now(),
      author: 'u1',
    })),
    clientCheckpoint: 0,
  });
  const res = http.post('http://localhost:8080/api/sync/push', body, {
    headers: { 'Content-Type': 'application/json' },
  });
  check(res, { 'status 200/409': (r) => r.status === 200 || r.status === 409 });
  sleep(0.2);
}
```
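9. Offline / Reconnect / Replay: Edge Cases and Degradation 🧷
- 🧱 Queue limits: two thresholds, op count and total bytes; when reached → notify the user and compact incrementally (merge repeated edits to the same field into one)
- 🔁 Exponential backoff: start at 500 ms, cap at 10 s; after repeated failures → open the circuit breaker and retry after a cooldown window
- 🧭 Failure classification:
  - Retryable (timeout/5xx) → back off and replay
  - Semantic conflict (409) → merge flow (dialog or silent)
  - Identity (401/403) → prompt the user to sign in again; degrade to read-only
- 📱 iOS limitation: no Background Sync → a foreground "Sync now" button + replay when the SW activates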
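Two of the ideas above, incremental compaction and failure classification, can be sketched as pure functions. Assumptions: compaction only touches entity/key groups made entirely of `update` ops with `replace` patches, keeps the last value per path (paths assumed not to overlap, e.g. no `/a` together with `/a/b`), takes `baseVersion` from the earliest op, and must only run on ops not yet sent. The extra `other` class for remaining 4xx codes is not in the list above.

```typescript
type PatchOp = { op: string; path: string; value?: unknown };
type PendingOp = {
  opId: string; entity: string; key: string; op: string;
  patch: PatchOp[]; clientClock: number; baseVersion: number; ts: number;
};

export function compact(ops: PendingOp[]): PendingOp[] {
  const groups = new Map<string, PendingOp[]>();
  for (const op of ops) {
    const id = JSON.stringify([op.entity, op.key]);
    groups.set(id, [...(groups.get(id) ?? []), op]);
  }
  const out: PendingOp[] = [];
  for (const group of groups.values()) {
    const sorted = [...group].sort((x, y) => x.clientClock - y.clientClock);
    const compactable = sorted.every(o => o.op === 'update' && o.patch.every(p => p.op === 'replace'));
    if (!compactable || sorted.length === 1) { out.push(...sorted); continue; }
    const byPath = new Map<string, PatchOp>();
    for (const o of sorted) for (const p of o.patch) byPath.set(p.path, p); // last write per path wins
    const last = sorted[sorted.length - 1];
    // identity/clock from the newest op, baseVersion from the oldest
    out.push({ ...last, patch: [...byPath.values()], baseVersion: sorted[0].baseVersion });
  }
  return out.sort((x, y) => x.clientClock - y.clientClock);
}

// status undefined = network error or timeout
export type FailureClass = 'retry' | 'conflict' | 'auth' | 'other';
export function classify(status?: number): FailureClass {
  if (status === undefined || status >= 500) return 'retry';
  if (status === 409) return 'conflict';
  if (status === 401 || status === 403) return 'auth';
  return 'other';
}
```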
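10. Security and Compliance 🔐
- 🔒 Encrypt sensitive fields in IndexedDB: WebCrypto AES-GCM (key bound to the session, destroyed on sign-out)
- 🧰 Minimal caching: cache only the fields you need, scoped and isolated per tenant/user
- 🕵️ Auditable and reversible: store snapshot diffs and which strategy was applied; support "undo this replay"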
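A minimal sketch of field-level encryption with WebCrypto AES-GCM, assuming one session key that is generated as non-extractable and discarded on sign-out. The `{ iv, ct }` record shape and function names are assumptions; IndexedDB can store both that record and the key object directly, since typed arrays, `ArrayBuffer`, and `CryptoKey` are structured-cloneable.

```typescript
// Non-extractable: the raw key bytes can never be exported from WebCrypto
export function createSessionKey() {
  return crypto.subtle.generateKey({ name: 'AES-GCM', length: 256 }, false, ['encrypt', 'decrypt']);
}
export type SessionKey = Awaited<ReturnType<typeof createSessionKey>>;

export async function encryptField(key: SessionKey, value: unknown) {
  const iv = crypto.getRandomValues(new Uint8Array(12)); // fresh 96-bit IV for every encryption
  const data = new TextEncoder().encode(JSON.stringify(value));
  const ct = await crypto.subtle.encrypt({ name: 'AES-GCM', iv }, key, data);
  return { iv, ct };
}
export type Sealed = Awaited<ReturnType<typeof encryptField>>;

export async function decryptField<T>(key: SessionKey, sealed: Sealed): Promise<T> {
  // Throws if the key is wrong or the ciphertext was tampered with (GCM authentication)
  const pt = await crypto.subtle.decrypt({ name: 'AES-GCM', iv: sealed.iv }, key, sealed.ct);
  return JSON.parse(new TextDecoder().decode(pt)) as T;
}
```

Never reusing an IV with the same key is essential for AES-GCM, which is why a new random IV is generated per call and stored next to the ciphertext.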
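11. Deployment and Limitations 🚢
- ⚡ Cache the App Shell for first paint and prefetch critical data
- 🌓 Gradual rollout: toggle offline mode and strategies per tenant or user group
- 🌐 Tuning for weak-network regions: adjust API timeouts, use smaller batches, and back off more steeply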
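12. FAQ 💡
Q1: How do I choose between OT and CRDT?
- Text / order-sensitive data → OT; sets and counters → CRDT; plain fields → LWW. Mixing strategies per field is common, so you need a per-field strategy configuration.
Q2: How do I keep LWW overwrites from clobbering good data?
- Enable "snapshot comparison + decision window"; prefer OT for high-risk fields (remarks, descriptions); route critical fields through a manual arbitration loop.
Q3: What if tombstones pile up?
- Configure `tombstoneTtl` (e.g. 7 days) plus scheduled GC (see `LwwElementSet.Gc`); rebuild in segments when necessary.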