ABP VNext + CloudEvents:事件驱动微服务互操作性

ABP VNext + CloudEvents:事件驱动微服务互操作性 🚀


📚 目录

  • ABP VNext + CloudEvents:事件驱动微服务互操作性 🚀
    • 一、引言 ✨
      • ☁️ TL;DR
      • 📚 背景与动机
      • 🏗️ 整体架构图
    • 二、环境准备与依赖安装 🛠️
      • 2.1 环境要求
      • 2.2 .NET 依赖安装
      • 2.3 Go 与 Python 安装
    • 三、CloudEvents 规范概览 📚
    • 四、gRPC Protobuf 定义 📦
    • 五、在 ABP VNext 中发布 & 消费 CloudEvent 🚀
      • 5.1 Program.cs 完整配置
      • 5.2 发布 CloudEvent
      • 5.3 接收 CloudEvent
    • 六、与 Knative Eventing 集成 🐳
    • 七、与 Azure Event Grid 集成 ☁️
      • 7.1 获取密钥
      • 7.2 发布 CloudEvent
      • 7.3 订阅端点
    • 八、多语言互操作示例 🌐
      • 8.1 Python Flask 消费
      • 8.2 Go 发布到 Event Grid
    • 九、示例场景 🔄
    • 十、性能、可用性与测试 📈


一、引言 ✨

☁️ TL;DR

  • 🌐 使用 CloudEvents 1.0 统一事件元数据,消除 Knative、Azure Event Grid、Kafka 等平台差异
  • ⚡️ 在 ABP VNext 中通过 Typed HttpClient、gRPC 客户端及 Polly 重试快速发布/消费事件
  • 🐍 支持 .NET、Go、Python 多语言互操作,包含完整认证、TLS/证书与错误处理
  • 🔄 演示在 Knative EventingAzure Event Grid 间双向互操作,并接入 OpenTelemetry 全链路追踪

📚 背景与动机

微服务生态中自定义事件格式难以互通;CloudEvents(CNCF 标准)定义了必需字段、JSON/Protobuf 格式与传输绑定,极大降低跨平台、跨语言的集成成本。

🏗️ 整体架构图

Structured/gRPC
HTTP/gRPC
用户界面
ABP VNext OrderService
Dapr Pub/Sub
Knative Broker
InventoryService
Azure Event Grid
AnalyticsService

二、环境准备与依赖安装 🛠️

2.1 环境要求

  • Kubernetes v1.25+(含 Knative Eventing v1.10+
  • Azure 订阅:具备 Event Grid 主题 与访问密钥
  • .NET 9 SDK
  • Go 1.20+
  • Python 3.9+

2.2 .NET 依赖安装

dotnet add package CloudNative.CloudEvents               --version 2.8.0
dotnet add package CloudNative.CloudEvents.Http          --version 2.8.0
dotnet add package CloudNative.CloudEvents.Core          --version 2.8.0
dotnet add package CloudNative.CloudEvents.Protobuf      --version 2.8.0
dotnet add package CloudNative.CloudEvents.SystemTextJson--version 2.8.0
dotnet add package CloudNative.CloudEvents.AspNetCore    --version 2.8.0
dotnet add package Microsoft.Extensions.Http.Polly       --version 8.0.0
dotnet add package Azure.Messaging.EventGrid             --version 5.11.0
dotnet add package Dapr.Client                           --version 1.11.0   # 可选

2.3 Go 与 Python 安装

go get github.com/cloudevents/sdk-go/v2
pip install cloudevents flask

三、CloudEvents 规范概览 📚

  • 必需字段specversionidsourcetype

  • 常用字段timedatacontenttypedataschema、扩展属性

  • 传输模式

    • Structured(完整 JSON)
    • Binary(HTTP Header + Body)
    • gRPC(Protobuf)
  • 原生兼容:Knative Broker、Azure Event Grid、Kafka、Dapr Pub/Sub


四、gRPC Protobuf 定义 📦

  1. 从 NuGet 包 CloudNative.CloudEvents.Protobufproto/ 目录复制官方 cloudevents.proto 到项目 Protos/

  2. Protos/mycompany.events.proto 定义业务契约:

    // Protos/mycompany.events.proto
    syntax = "proto3";
    package mycompany.events;import "cloudevents.proto";service CloudEventService {rpc Send (SendRequest) returns (SendResponse);
    }message SendRequest {io.cloudevents.v1.CloudEvent event = 1;
    }
    message SendResponse {}
    
  3. .csproj 中添加:

    <ItemGroup><Protobuf Include="Protos\cloudevents.proto" GrpcServices="None" /><Protobuf Include="Protos\mycompany.events.proto" GrpcServices="Server;Client" />
    </ItemGroup>
    

五、在 ABP VNext 中发布 & 消费 CloudEvent 🚀

5.1 Program.cs 完整配置

var builder = WebApplication.CreateBuilder(args);// 1. 配置 Authentication & Authorization
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme).AddJwtBearer(options =>{options.Authority = builder.Configuration["Jwt:Authority"];options.Audience = builder.Configuration["Jwt:Audience"];options.TokenValidationParameters = new TokenValidationParameters{ValidateIssuer = true,ValidateAudience = true,ValidateLifetime = true,ValidateIssuerSigningKey = true,IssuerSigningKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(builder.Configuration["Jwt:Key"]))};});
builder.Services.AddAuthorization();// 2. 注册 Dapr Client(可选)
builder.Services.AddDaprClient();// 3. 控制器 & CloudEvents JSON 格式化
builder.Services.AddControllers().AddCloudEventsJsonFormatters();// 4. Typed HttpClient(Knative Broker)
builder.Services.AddHttpClient("CloudEventClient", client =>
{client.BaseAddress = new Uri("http://broker-ingress.knative-eventing.svc.cluster.local/default/");client.DefaultRequestHeaders.Add("Content-Type", "application/cloudevents+json");
})
.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, _ => TimeSpan.FromMilliseconds(200)));// 5. 注册 gRPC 客户端(含自签名证书示例)
builder.Services.AddGrpcClient<CloudEventService.CloudEventServiceClient>(o =>
{o.Address = new Uri("https://grpc-server:5001");
})
.ConfigurePrimaryHttpMessageHandler(() =>
{var handler = new HttpClientHandler();handler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;return handler;
});// 6. 注册 Event Grid 客户端
builder.Services.AddSingleton(sp =>
{var config = sp.GetRequiredService<IConfiguration>();return new EventGridPublisherClient(new Uri(config["EventGrid:Endpoint"]),new AzureKeyCredential(config["EventGrid:Key"]));
});// 7. OpenTelemetry(Tracing + Metrics)
builder.Services.AddOpenTelemetryTracing(b => b.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddGrpcClientInstrumentation().AddJaegerExporter());
builder.Services.AddOpenTelemetryMetrics(m => m.AddPrometheusExporter());var app = builder.Build();
app.UseAuthentication();
app.UseAuthorization();// 暴露 Prometheus /metrics 端点
app.UseOpenTelemetryPrometheusScrapingEndpoint();app.MapControllers();
app.Run();

5.2 发布 CloudEvent

using CloudNative.CloudEvents;
using CloudNative.CloudEvents.Protobuf;
using CloudNative.CloudEvents.Http;
using CloudNative.CloudEvents.SystemTextJson;
using Azure.Messaging.EventGrid;public class OrderService
{private readonly HttpClient _http;private readonly CloudEventService.CloudEventServiceClient _grpc;private readonly EventGridPublisherClient _egClient;private readonly Dapr.Client.DaprClient _dapr;private readonly ILogger<OrderService> _logger;public OrderService(IHttpClientFactory httpFactory,CloudEventService.CloudEventServiceClient grpc,EventGridPublisherClient egClient,Dapr.Client.DaprClient dapr,ILogger<OrderService> logger){_http    = httpFactory.CreateClient("CloudEventClient");_grpc    = grpc;_egClient= egClient;_dapr    = dapr;_logger  = logger;}public async Task PublishAsync(Guid orderId, decimal amount){var ce = new CloudEvent("com.mycompany.order.created", new Uri("urn:abp:orderservice")){Id              = Guid.NewGuid().ToString(),Time            = DateTimeOffset.UtcNow,DataContentType = "application/json",Data            = new { OrderId = orderId, Amount = amount }};ce.DataSchema = new Uri("https://schemas.mycompany.com/order/1.0");ce.Extensions["version"] = "1.0";// 1. HTTP Structuredvar httpContent = new CloudEventContent(ce, ContentMode.Structured, new JsonEventFormatter());var resp = await _http.PostAsync("", httpContent);resp.EnsureSuccessStatusCode();// 2. gRPC Binarytry{var protoEvent = ce.ToProto();await _grpc.SendAsync(new SendRequest { Event = protoEvent });}catch (RpcException ex){_logger.LogError(ex, "gRPC send failed for {EventId}", ce.Id);throw;}// 3. Azure Event Gridawait _egClient.SendCloudEventAsync(ce);// 4. Dapr Pub/Sub(可选)await _dapr.PublishEventAsync("pubsub", "order.created", ce);}
}

5.3 接收 CloudEvent

[ApiController]
[Route("api/events")]
public class EventsController : ControllerBase
{private readonly IOrderAppService _orders;private readonly ILogger<EventsController> _logger;public EventsController(IOrderAppService orders, ILogger<EventsController> logger){_orders = orders;_logger = logger;}[HttpPost][Authorize]public async Task<IActionResult> Receive([FromBody] CloudEvent ce){try{var order = ce.Data.ToObject<OrderCreatedDto>();await _orders.ProcessOrderAsync(order);return Ok();}catch (Exception ex){_logger.LogError(ex, "Processing failed for CloudEvent {EventId}", ce.Id);return StatusCode(500);}}
}

六、与 Knative Eventing 集成 🐳

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:name: default---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:name: order-trigger
spec:broker: defaultfilter:attributes:type: com.mycompany.order.createdsubscriber:uri: http://my-abp-app.default.svc.cluster.local/api/eventsdelivery:retry: 5backoffPolicy: exponentialdeadLetterSink:uri: http://deadletter.default.svc.cluster.local
POST /default
OrderService
Knative Broker
InventoryService
AnalyticsService

七、与 Azure Event Grid 集成 ☁️

7.1 获取密钥

topicKey=$(az eventgrid topic key list \--name myTopic \--resource-group myRg \--query key1 -o tsv)

EventGrid:EndpointEventGrid:Key 写入 appsettings.json 或环境变量。

7.2 发布 CloudEvent

// egClient 通过 DI 注入
await _egClient.SendCloudEventAsync(ce);

7.3 订阅端点

[HttpPost("api/eventgrid")]
public IActionResult OnEvent([FromBody] CloudEvent ce)
{_logger.LogInformation("EG Received {EventId}", ce.Id);return Ok();
}

八、多语言互操作示例 🌐

8.1 Python Flask 消费

pip install cloudevents flask
from flask import Flask, request, abort
from cloudevents.http import from_httpapp = Flask(__name__)@app.route("/python-events", methods=["POST"])
def receive():try:ce = from_http(request.headers, request.get_data())print("📥 Received:", ce["id"], ce.data)return "", 200except Exception:abort(400)if __name__ == "__main__":app.run(port=3000)

8.2 Go 发布到 Event Grid

import ("context""log""os"cloudevents "github.com/cloudevents/sdk-go/v2"
)func main() {target := os.Getenv("EVENT_GRID_ENDPOINT")key    := os.Getenv("EVENT_GRID_KEY")c, err := cloudevents.NewClientHTTP(cloudevents.WithTarget(target),cloudevents.WithHeader("aeg-sas-key", key),)if err != nil {log.Fatalf("❌ client error: %v", err)}e := cloudevents.NewEvent()e.SetSource("urn:go:inventory")e.SetType("com.mycompany.inventory.updated")e.SetData(cloudevents.ApplicationJSON, map[string]int{"productId": 123, "qty": 10})if res := c.Send(context.Background(), e); cloudevents.IsUndelivered(res) {log.Fatalf("❌ send failed: %v", res)}log.Println("✅ Event sent")
}

九、示例场景 🔄

用户界面OrderServiceKnative BrokerInventoryServiceEvent GridAnalyticsServiceDatabaseSubmit OrderPublish order.createdTrigger Inventory ReductionSend to Event GridDistribute eventWrite Reports用户界面OrderServiceKnative BrokerInventoryServiceEvent GridAnalyticsServiceDatabase

十、性能、可用性与测试 📈

  • HTTP vs gRPC

    • HTTP Structured 易调试;gRPC Binary 延迟更低、吞吐更高
  • 重试 & 死信

    • Knative:retry + deadLetterSink
    • Event Grid:指数退避重试 + 死信存储
  • Schema 管理

    • 使用 DataSchema 与扩展属性版本化事件
    • 可结合 Schema Registry(如 Azure Schema Registry)
  • 安全

    • 全链路 HTTPS + JWT/SAS 验证 + 消息签名
  • 测试示例

    • xUnit 集成测试WebApplicationFactory<Program> 验证 /api/events
    • k6 性能脚本(HTTP vs gRPC 对比)
// k6 script snippet
import http from 'k6/http';
import grpc from 'k6/net/grpc';const client = new grpc.Client();
client.load(['protos'], 'mycompany.events.proto');
client.connect('grpc-server:5001', { plaintext: true });export default function() {http.post('http://broker-ingress.knative-eventing.svc.cluster.local/default',JSON.stringify({ /* CloudEvent JSON */ }),{ headers: { 'Content-Type': 'application/cloudevents+json' } });client.invoke('mycompany.events.CloudEventService/Send',{ event: {/* proto CloudEvent */} });
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/91852.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/91852.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件测试测评公司关于HTTP安全头配置与测试?

浏览器和服务器之间那几行看不见的HTTP安全头配置&#xff0c;往往是抵御网络攻击的关键防线。作为软件测试测评公司&#xff0c;我们发现超过六成的高危漏洞源于安全头缺失或误配。别小看这些响应头&#xff0c;它们能直接掐断跨站脚本、点击劫持、数据嗅探的攻击路径。五条命…

Mysql集成技术

目录 mysql的编译安装与部署 1.编译安装mysql 2.部署mysql mysql主从复制 什么是mysql主从复制&#xff1f; 1.配置master 2.配置slave 3.存在数据时添加slave2 4.GTID模式 什么是GTID模式&#xff1f; 配置GTID 5.延迟复制 6.慢查询日志 核心作用 开启慢查询日志…

《MySQL进阶核心技术剖析(一): 存储引擎》

目录 一、存储引擎 1.1 MySQL体系结构 1.2 存储引擎介绍 1). 建表时指定存储引擎 2). 查询当前数据库支持的存储引擎 1.3 存储引擎特点 1.3.1 InnoDB 1.3.2 MyISAM 1.3.3 Memory 1.3.4 区别及特点 1.4 存储引擎选择 一、存储引擎 1.1 MySQL体系结构 1). 连接层 最上…

sqli-labs:Less-26关卡详细解析

1. 思路&#x1f680; 本关的SQL语句为&#xff1a; $sql"SELECT * FROM users WHERE id$id LIMIT 0,1";注入类型&#xff1a;字符串型&#xff08;单引号包裹&#xff09;、GET操作提示&#xff1a;参数需以闭合关键参数&#xff1a;id php输出语句的部分代码&am…

Spring Boot 的事务注解 @Transactional 失效的几种情况

开发中我们经常会用到 Spring Boot 的事务注解&#xff0c;为含有多种操作的方法添加事务&#xff0c;做到如果某一个环节出错&#xff0c;全部回滚的效果。但是在开发中可能会因为不了解事务机制&#xff0c;而导致我们的方法使用了 Transactional 注解但是没有生效的情况&…

#C语言——刷题攻略:牛客编程入门训练(四):运算

&#x1f31f;菜鸟主页&#xff1a;晨非辰的主页 &#x1f440;学习专栏&#xff1a;《C语言刷题合集》 &#x1f4aa;学习阶段&#xff1a;C语言方向初学者 ⏳名言欣赏&#xff1a;"代码行数决定你的下限&#xff0c;算法思维决定你的上限。" 目录 1. BC25 牛牛买电…

阻抗分析中的软件解调计算

接上篇 重温无功功率测量-CSDN博客 已知被测阻抗两端电压与流过 通过两个ADC同步采集到。 激励频率10k, 采样率1M, 每周期100个点 关键是:采样率除以激励频率, 得是4的倍数... 所以ADC不能自由运行, 得用一个timer来触发. 因为要进行同相分量正交分量计算。 1&#xff1a;直…

ubuntu 镜像克隆

一、克隆 1、准备 一个u盘&#xff08;制作启动盘&#xff09; 一个移动固态硬盘&#xff08;大于要克隆系统盘的1.2倍&#xff09; 2、使用 rufus生成系统启动盘 &#xff08;1&#xff09;下载ubuntu iso 桌面版 https://cn.ubuntu.com/download &#xff08;2&#x…

Axure下拉菜单:从基础交互到高保真元件库应用

在Web端产品设计中&#xff0c;下拉菜单&#xff08;Dropdown Menu&#xff09; 是用户与系统交互的核心组件之一&#xff0c;它通过隐藏次要选项、节省页面空间的方式&#xff0c;提升信息密度与操作效率。无论是基础下拉菜单、图标式下拉菜单&#xff0c;还是复杂的多级下拉菜…

复现YOLOV5+训练指定数据集

一、复现YOLOV5代码 1.github下载&#xff1a;https://github.com/MIPIT-Team/SSA-YOLO 2.配置环境&#xff1a;创建虚拟环境yolo5 conda create -n yolo5 python3.9 #对应文件夹下pip install -r requirements.txt报错&#xff1a;ERROR: pips dependency resolver does no…

Agents-SDK智能体开发[4]之集成MCP入门

文章目录说明一 Agents SDK接入MCP1.1 MCP技术回顾1.2 MCP基础实践流程1.2.1 天气查询服务器Server创建流程1.2.2 服务器依赖安装和代码编写1.2.3 环境配置文件1.2.4 客户端代码编写1.3 测试运行二 MCPAgents SDK基础调用2.1 weather_server.py2.2 client_agent.py2.3 运行测试…

Camera相机人脸识别系列专题分析之十九:MTK ISP6S平台FDNode传递三方FFD到APP流程解析

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: Camera相机人脸识别系列专题分析之十九:MTK平台FDNode传递三方FFD到APP流程解析 目录 一、背景 二、:OcamMeta传递FFD到APP 2.1:OcamMeta 2.2 :OcamMeta::process更新FFD 2.…

【实时Linux实战系列】构建实时监测与报警系统

在实时系统中&#xff0c;监测与报警系统是确保系统正常运行和及时响应异常情况的关键组件。实时监测与报警系统能够实时收集系统数据&#xff0c;分析关键事件&#xff0c;并在检测到异常时发出警报。这种系统广泛应用于工业自动化、医疗设备监控、网络安全等领域。掌握实时监…

PHP入门及数据类型

PHP数据类型 PHP标记 //HTML风格 <?phpecho "hello world"; ?> //简短风格 <?echo "hello world"; ?>数据类型 PHP 最初源于 Perl 语言&#xff0c;与 Perl 类似&#xff0c;PHP 对数据类型采取较为宽松的态度。PHP 规定&#xff0c;变量数…

沸点 | 嬴图参加世界人工智能大会

2025 WAIC于 7 月 26 日至 28 日在上海举行。大会展览面积突破 7 万平方米&#xff0c;800 余家企业参展。嬴图作为图数据库领域的领先企业&#xff0c;携前沿技术与创新应用精彩亮相。​大会期间&#xff0c;嬴图创始人兼CEO孙宇熙与来自全球的顶尖学者、企业代表共同探讨人工…

2. 字符设备驱动

一、设备号 1.1. 什么是设备号 设备号是用来标记一类设备以及区分这类设备中具体个体的一组号码。 设备号由主设备号和次设备号组成。主设备号的作用为标记一类设备、用于标识设备驱动程序,而次设备号的作用是为了区分这类设备中的具体个体设备及用于标识同一驱动程序下的具…

uboot armv8 启动流程之 linker script

section 详细说明.text按如下顺序&#xff0c;中断向量表vectors, 启动入口代码start.o,普通text, glue &#xff08;arm thumb2 相互调用时自动生成的代码&#xff09;*(.vectors)CPUDIR/start.o (.text*)*(.text*)*(.glue*)__image_copy_start 标记为text 段入口&#xff0c;…

xxljob总结

XXL-Job 支持多种任务类型&#xff0c;以下是常见任务类型的示例 Demo&#xff0c;包含核心配置和代码片段&#xff0c;帮助快速理解用法&#xff1a;一、Bean模式任务&#xff08;最常用&#xff09;通过注解 XxlJob 定义任务方法&#xff0c;直接在 Spring 容器中管理&…

Python包安全工程实践:构建安全可靠的Python生态系统

在现代计算环境中&#xff0c;性能往往是Python包成功的关键因素。本文将深入探讨Python包的性能优化技术&#xff0c;包括并发编程模型、性能分析工具、内存优化策略以及原生代码集成等高级主题&#xff0c;帮助你构建高性能的Python组件。1. 性能分析基础1.1 性能分析工具矩阵…

kubernetes基础知识

个人博客站—运维鹿: http://www.kervin24.top CSDN博客—做个超努力的小奚&#xff1a; https://blog.csdn.net/qq_52914969?typeblog一、kubernetes介绍Kubernetes本质是一组服务器集群&#xff0c;它可以在集群的每个节点上运行特定的程序&#xff0c;来对节点中的容器进行…