百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

在多模块单体应用中使用 Outbox/Inbox 模式实现可靠的事件处理

ccwgpt 2025-04-08 12:27 24 浏览 0 评论

本文介绍如何在使用多个数据库的模块化单体应用中, 通过 Outbox/Inbox 模式实现可靠的事件处理. 我们将以 ModularCRM 项目为例进行说明.

项目背景

ModularCRM 是一个集成了多个 ABP 框架开源模块的单体应用, 包括:

  • Account
  • Identity
  • Tenant Management
  • Permission Management
  • Setting Management
  • 等开源模块

除了ABP框架开源模块外, 项目还包含三个业务模块:

  • 订单模块(Products), 使用 MongoDB 数据库
  • 产品模块(Ordering), 使用 SQL Server 数据库
  • 支付模块(Payment), 使用 MongoDB 数据库

项目在 appsettings.json 中分别为 ModularCRM 和三个业务模块配置了独立的数据库连接字符串:

{
"ConnectionStrings": {
"Default": "Server=localhost,1434;Database=ModularCrm;User Id=sa;Password=1q2w3E***;TrustServerCertificate=true",
"Products": "Server=localhost,1434;Database=ModularCrm_Products;User Id=sa;Password=1q2w3E***;TrustServerCertificate=true",
"Ordering": "mongodb://localhost:27017/ModularCrm_Ordering?replicaSet=rs0",
"Payment": "mongodb://localhost:27017/ModularCrm_Payment?replicaSet=rs0"
}
}

业务场景

这些模块通过 ABP 框架的 DistributedEventBus 进行通信, 实现以下业务流程:

这里我们以一个简单的业务流程为例, 实际业务流程会更复杂. 示例代码主要用于演示和问题解决.

  1. 订单模块: 用户下单后发布 OrderPlacedEto 事件
  2. 产品模块: 订阅 OrderPlacedEto 事件后更新产品库存
  3. 支付模块: 订阅 OrderPlacedEto 事件后处理支付, 完成后发布 PaymentCompletedEto 事件
  4. 订单模块: 订阅 PaymentCompletedEto 事件后更新订单状态

实现这个流程时, 我们需要确保:

  • 下单操作和事件发布的事务一致性
  • 各模块处理消息时的事务一致性
  • 消息传递的可靠性(包括持久化、确认和重试机制)

仅使用 ABP 框架的 DistributedEventBus 无法满足上述要求, 因此我们需要引入新的机制.

Outbox/Inbox 模式解决方案

为了满足上述要求,我们采用 Outbox/Inbox 模式:

Outbox 模式

  • 将分布式事件与数据库操作在同一事务中保存
  • 通过后台作业将事件发送到分布式消息中间件
  • 确保数据更新与事件发布的一致性
  • 防止系统故障期间的消息丢失

Inbox 模式

  • 先将接收到的分布式事件保存到数据库
  • 通过事务性方式处理事件
  • 通过保存已处理消息来确保消息只被处理一次
  • 维护处理状态以实现可靠处理

如何在项目和模块中启用和配置 Outbox/Inbox, 请参考:
https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed#
outbox-inbox-for-transactional-events

模块配置

每个模块需要配置独立的 Outbox/Inbox. 由于是单体应用, 所有消息处理类都在同一个项目中, 我们需要为每个模块配置 Outbox/InboxSelector/EventSelector, 以确保模块只发送和接收它关注的消息, 避免消息重复处理.

ModularCRM 主应用配置

它会发送和接收所有ABP框架开源模块的消息.

// This selector will match all abp built-in modules and the current module.
Func<Type, bool> abpModuleSelector = type => type.Namespace != && (type.Namespace.StartsWith("Volo.") || type.Assembly == typeof(ModularCrmModule).Assembly);

Configure(options =>
{
options.Inboxes.Configure("ModularCrm", config =>
{
config.UseDbContext();
config.EventSelector = abpModuleSelector;
config.HandlerSelector = abpModuleSelector;
});

options.Outboxes.Configure("ModularCrm", config =>
{
config.UseDbContext();
config.Selector = abpModuleSelector;
});
});

订单模块配置

它只发送OrderPlacedEto事件, 并接收PaymentCompletedEto事件和执行
OrderPaymentCompletedEventHandler
.

Configure(options =>
{
options.Inboxes.Configure(OrderingDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.EventSelector = type => type == typeof(PaymentCompletedEto);
config.HandlerSelector = type => type == typeof(OrderPaymentCompletedEventHandler);
});

options.Outboxes.Configure(OrderingDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.Selector = type => type == typeof(OrderPlacedEto);
});
});

产品模块配置

它只接收EntityCreatedEtoOrderPlacedEto事件, 并执行
ProductsOrderPlacedEventHandler

ProductsUserCreatedEventHandler
. 暂时不需要发送任何事件.

Configure(options =>
{
options.Inboxes.Configure(ProductsDbProperties.ConnectionStringName, config =>
{
config.UseDbContext();
config.EventSelector = type => type == typeof(EntityCreatedEto) || type == typeof(OrderPlacedEto);
config.HandlerSelector = type => type == typeof(ProductsOrderPlacedEventHandler) || type == typeof(ProductsUserCreatedEventHandler);
});

// Outboxes are not used in this module
options.Outboxes.Configure(ProductsDbProperties.ConnectionStringName, config =>
{
config.UseDbContext();
config.Selector = type => false;
});
});

支付模块配置

它只发送PaymentCompletedEto事件, 并接收OrderPlacedEto事件和执行
PaymentOrderPlacedEventHandler
.

Configure(options =>
{
options.Inboxes.Configure(PaymentDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.EventSelector = type => type == typeof(OrderPlacedEto);
config.HandlerSelector = type => type == typeof(PaymentOrderPlacedEventHandler);
});

options.Outboxes.Configure(PaymentDbProperties.ConnectionStringName, config =>
{
config.UseMongoDbContext();
config.Selector = type => type == typeof(PaymentCompletedEto);
});
});

运行ModularCRM模拟业务流程

  1. ModularCrm 目录下运行:
# 在Docker中启动SQL Server和MongoDB数据库
docker-compose up -d

# 还原安装依赖项
abp install-lib

# 迁移数据库
dotnet run --project ModularCrm --migrate-database

# 启动应用
dotnet run --project ModularCrm
  • 访问 https://localhost:44303/ 进入应用首页

  • 输入一个客户名称然后选择一个产品并提交一个订单. 稍等片刻后刷新页面可以看到订单,产品以及支付信息.

系统日志显示完整的处理流程:

[Ordering Module] Order created: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9, CustomerName: john

[Products Module] OrderPlacedEto event received: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, CustomerName: john, ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9
[Products Module] Stock count decreased for ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9

[Payment Module] OrderPlacedEto event received: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, CustomerName: john, ProductId: 0f95689f-4cb6-36f5-68bd-3a18344d32c9
[Payment Module] Payment processing completed for OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88

[Ordering Module] PaymentCompletedEto event received: OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88, PaymentId: d0a41ead-ee0f-714c-e254-3a1834504d65, PaymentMethod: CreditCard, PaymentAmount: ModularCrm.Payment.Payment.PaymentCompletedEto
[Ordering Module] Order state updated to Delivered for OrderId: b7ad3f47-0e77-bb81-082f-3a1834503e88

此外,当新用户注册时,产品模块还会接收到 EntityCreatedEto 事件, 我们会给新用户发送一个邮件, 这只是为了演示Outbox/Inbox的Selector机制.

[Products Module] UserCreated event received: UserId: "9a1f2bd0-5b28-210a-9e56-3a18344d310a", UserName: admin
[Products Module] Sending a popular products email to admin@abp.io...

总结

通过引入 Outbox/Inbox 模式, 我们实现了:

  1. 事务性的消息发送和接收
  2. 可靠的消息处理机制
  3. 多数据库环境下的模块化事件处理

ModularCRM 项目不仅实现了可靠的消息处理, 还展示了如何在单体应用中优雅地处理多数据库场景. 项目源码:
https://github.com/abpframework/abp-samples/tree/master/ModularCrm-OutboxInbox-Pattern

参考资料

  • Outbox/Inbox for transactional events https://abp.io/docs/latest/framework/infrastructure/event-bus/distributed#outbox-inbox-for-transactional-events
  • ConnectionStrings https://abp.io/docs/latest/framework/fundamentals/connection-strings
  • ABP Studio: Single Layer Solution Template https://abp.io/docs/latest/solution-templates/single-layer-web-application

相关推荐

十分钟让你学会LNMP架构负载均衡(impala负载均衡)

业务架构、应用架构、数据架构和技术架构一、几个基本概念1、pv值pv值(pageviews):页面的浏览量概念:一个网站的所有页面,在一天内,被浏览的总次数。(大型网站通常是上千万的级别)2、u...

AGV仓储机器人调度系统架构(agv物流机器人)

系统架构层次划分采用分层模块化设计,分为以下五层:1.1用户接口层功能:提供人机交互界面(Web/桌面端),支持任务下发、实时监控、数据可视化和报警管理。模块:任务管理面板:接收订单(如拣货、...

远程热部署在美团的落地实践(远程热点是什么意思)

Sonic是美团内部研发设计的一款用于热部署的IDEA插件,本文其实现原理及落地的一些技术细节。在阅读本文之前,建议大家先熟悉一下Spring源码、SpringMVC源码、SpringBoot...

springboot搭建xxl-job(分布式任务调度系统)

一、部署xxl-job服务端下载xxl-job源码:https://gitee.com/xuxueli0323/xxl-job二、导入项目、创建xxl_job数据库、修改配置文件为自己的数据库三、启动...

大模型:使用vLLM和Ray分布式部署推理应用

一、vLLM:面向大模型的高效推理框架1.核心特点专为推理优化:专注于大模型(如GPT-3、LLaMA)的高吞吐量、低延迟推理。关键技术:PagedAttention:类似操作系统内存分页管理,将K...

国产开源之光【分布式工作流调度系统】:DolphinScheduler

DolphinScheduler是一个开源的分布式工作流调度系统,旨在帮助用户以可靠、高效和可扩展的方式管理和调度大规模的数据处理工作流。它支持以图形化方式定义和管理工作流,提供了丰富的调度功能和监控...

简单可靠高效的分布式任务队列系统

#记录我的2024#大家好,又见面了,我是GitHub精选君!背景介绍在系统访问量逐渐增大,高并发、分布式系统成为了企业技术架构升级的必由之路。在这样的背景下,异步任务队列扮演着至关重要的角色,...

虚拟服务器之间如何分布式运行?(虚拟服务器部署)

  在云计算和虚拟化技术快速发展的今天,传统“单机单任务”的服务器架构早已难以满足现代业务对高并发、高可用、弹性伸缩和容错容灾的严苛要求。分布式系统应运而生,并成为支撑各类互联网平台、企业信息系统和A...

一文掌握 XXL-Job 的 6 大核心组件

XXL-Job是一个分布式任务调度平台,其核心组件主要包括以下部分,各组件相互协作实现高效的任务调度与管理:1.调度注册中心(RegistryCenter)作用:负责管理调度器(Schedule...

京东大佬问我,SpringBoot中如何做延迟队列?单机与分布式如何做?

京东大佬问我,SpringBoot中如何做延迟队列?单机如何做?分布式如何做呢?并给出案例与代码分析。嗯,用户问的是在SpringBoot中如何实现延迟队列,单机和分布式环境下分别怎么做。这个问题其实...

企业级项目组件选型(一)分布式任务调度平台

官网地址:https://www.xuxueli.com/xxl-job/能力介绍架构图安全性为提升系统安全性,调度中心和执行器进行安全性校验,双方AccessToken匹配才允许通讯;调度中心和执...

python多进程的分布式任务调度应用场景及示例

多进程的分布式任务调度可以应用于以下场景:分布式爬虫:importmultiprocessingimportrequestsdefcrawl(url):response=re...

SpringBoot整合ElasticJob实现分布式任务调度

介绍ElasticJob是面向互联网生态和海量任务的分布式调度解决方案,由两个相互独立的子项目ElasticJob-Lite和ElasticJob-Cloud组成。它通过弹性调度、资源管控、...

分布式可视化 DAG 任务调度系统 Taier 的整体流程分析

Taier作为袋鼠云的开源项目之一,是一个分布式可视化的DAG任务调度系统。旨在降低ETL开发成本,提高大数据平台稳定性,让大数据开发人员可以在Taier直接进行业务逻辑的开发,而不用关...

SpringBoot任务调度:@Scheduled与TaskExecutor全面解析

一、任务调度基础概念1.1什么是任务调度任务调度是指按照预定的时间计划或特定条件自动执行任务的过程。在现代应用开发中,任务调度扮演着至关重要的角色,它使得开发者能够自动化处理周期性任务、定时任务和异...

取消回复欢迎 发表评论: