此博客文章是关于比较社区已发布的域事件与集成事件模式的几种方法。我可能会根据反馈和我们将在短期/中期内完成的一些实现来发展这篇文章。所以,请随时在本文末尾的评论中讨论它。我愿意接受各种变化。 ?
在讨论“域事件”时,有一些缺乏明确性,因为有时你可能会谈论发生的域事件,但是在仍然在内存对象中的操作的同一事务范围内(所以我实际上,它仍然没有发生另一方面,您可能正在讨论已在持久性存储中提交的事件,并且由于状态传播以及多个微服务,有界上下文或外部系统之间的一致性而必须发布这些事件。在后一种情况下,从应用程序和持久性的角度来看,它确实发生在过去。
后一种类型的事件可以用作“集成事件”,因为它的相关数据是保持肯定的,因此,您可以使用EventBus.Publish()的任何实现,并将该事件异步发布给任何订阅者,无论订阅者是否在 - 内存或远程和进程外,您正在通过真实的消息传递系统进行通信,如Service Bus或任何支持Pubs / Subs模型的基于队列的系统。
让我们从第一个模型开始。一种内存/事务域事件。
域事件
提到的区别非常重要,因为在第一个模型(纯域事件)中,您必须在进程内(内存中)并在同一事务或操作中执行所有操作。您必须从域实体类(AggregateRoot或子域实体)中的业务方法引发事件,并处理来自多个处理程序(任何订阅的任何人)的事件,但所有事件都发生在同一事务范围内。这是至关重要的,因为如果在发布域事件时,该事件以“一次又一次”的方式异步发布到其他地方,包括远程系统的可能性,那么在提交原始事务之前就会发生事件传播,因此,如果如果您的原始事务或操作失败,您的原始本地域和订阅系统之间的数据就会不一致(可能是在进程外,就像另一个微服务一样)。
但是,如果这些事件仅在同一事务范围和内存中处理,则该方法很好并提供“完全封装的域模型”。
简而言之,内存中的Domain事件及其副作用(事件处理程序的操作)需要在同一逻辑事务中发生。
Udi Dahan在几篇博客文章中清楚地揭示了这个模型:
- http://udidahan.com/2009/06/14/domain-events-salvation/
- http://udidahan.com/2008/08/25/domain-events-take-2/
- http://udidahan.com/2008/02/29/how-to-create-fully-encapsulated-domain-…
还有Jimmy Bogard撰写的这篇好博文:
如果我记得很清楚,Vaugh Vernon对Domain Events的实现非常相似。
您可以在DomainEvents类实现中有差异。它可以是类似于Udi方法的静态类,也可以使用IoC容器(如Autofac等)来获取DomainEvents类,并将该类连接起来进行进程内通信。但重要的一点是,在持久化状态之前,您在Domain实体中引发事件,因此,所有事件处理程序必须在同一事务中执行,否则您可能会获得不一致的状态。
例如,从Udi的使用Domain实体的示例:
public class Customer
{
public void DoSomething()
{
//Operations to change Customer’s state to preferred (in-memory)
//…
//Raise the Domain event
DomainEvents.Raise(new CustomerBecamePreferred()
{ Customer = this
});
}
}
或者在Jimmy Bogard的初始示例中(在他的其他演化模式之前)使用此代码片段与另一个域实体:
public Payment RecordPayment(decimal paymentAmount, IBalanceCalculator balanceCalculator)
{
var payment = new Payment(paymentAmount, this);
_payments.Add(payment);
Balance = balanceCalculator.Calculate(this);
if (Balance == 0)
DomainEvents.Raise(new FeePaidOff(this));
return payment;
}
在这两种情况下,您都会在域实体中引发事件,此时该事件只是内存中的对象仍未保留。因此,无论订阅该事件的事件处理程序发生什么,都必须在同一组操作或事务中进行。这就是为什么该事件不应该是基于异步消息传递(如队列或服务总线)的集成事件,除非您以某种方式使其成为事务性的,因为事件处理程序可能正在传播仍未在数据库中持久化的事件,如果您原始事务失败,您将在远程系统(称为微服务,有界上下文或远程子系统甚至不同的应用程序)和原始域模型之间存在不一致。
处理事件时,订阅事件的任何事件处理程序都可以使用其他AggregateRoot对象运行其他域操作,但同样,您仍需要在同一事务范围内。
public class FeePaidOffHandler : IHandler<FeePaidOff>
{
private readonly ICustomerRepository _customerRepository;
public FeePaidOffHandler(ICustomerRepository customerRepository)
{
_customerRepository = customerRepository;
}
public void Handle(FeePaidOff args)
{
var fee = args.Fee;
var customer = _customerRepository.GetCustomerChargedForFee(fee);
customer.UpdateAtRiskStatus();
}
}
因此,域事件作为模式并不总是适用。但是,对于作为同一域模型的一部分并且是同一事务的一部分的断开连接的聚合的基于内存事件的通信,域事件可以很好地确保在同一微服务或有界上下文中的单个域模型之间的一致性。
两阶段域事件(吉米·博加德)
史蒂夫史密斯也在GitHub及其博客上发布的一些例子中使用了这种方法。
注意:我称之为“两阶段”域事件,因为显然有两个解耦阶段。但它与“基于DTC等的两阶段提交事务”无关......
因此,Jimmy Bogard写了一篇有趣的博客文章(一个更好的域事件模式),他描述了一种使用域事件的替代方法,它仍然意味着在同一事务范围或内存中的操作组内的事件副作用操作,但使用一种改进的方法,它将提升事件操作与调度操作分离。
他开始强调原始方法的问题,即DomainEvents类立即调度给处理程序。这使得测试您的域模型变得困难,因为在提升域事件时立即执行副作用(由事件处理程序执行的操作)。
但是,如果不是直接从域实体中提升域事件(请记住,在这种情况下事件将立即处理),而是记录发生的域事件,并且在提交事务之前,您将在此处调度所有这些域事件。点?
您现在可以看到,在实体类中,您不是在提升/发布域事件,而是仅向同一范围内的集合添加事件。
public Payment RecordPayment(decimal paymentAmount, IBalanceCalculator balanceCalculator)
{
var payment = new Payment(paymentAmount, this);
_payments.Add(payment);
Balance = balanceCalculator.Calculate(this);
if (Balance == 0)
Events.Add(new FeePaidOff(this));
return payment;
}
Events集合将是每个Domain Entity需要事件的子事件集合(通常AggregateRoots是需要事件的实体,最重要的是)。
最后,您需要实际触发这些域事件。这是你可以挂钩到我们正在使用的任何ORM的基础设施层的东西。在Entity Framework中,您可以挂钩到SaveChanges方法,如下所示:
public override int SaveChanges() {
var domainEventEntities = ChangeTracker.Entries<IEntity>()
.Select(po => po.Entity)
.Where(po => po.Events.Any())
.ToArray();
foreach (var entity in domainEventEntities)
{
var events = entity.Events.ToArray();
entity.Events.Clear();
foreach (var domainEvent in events)
{
_dispatcher.Dispatch(domainEvent);
}
}
return base.SaveChanges();
}
在任何情况下,您都可以看到在使用EF DbContext SaveChanges()提交原始事务操作之前,您将立即调度事件。这就是为什么这些事件及其事件处理程序仍然需要在内存中运行并作为同一事务的一部分。除此之外,如果您发送远程和异步事件(如集成事件)但“SaveChanges()”失败,您的多个模型,微服务或外部系统之间的数据将不一致。
只有在通过某些事务工件(如事务队列)发布/调度消息的情况下,才可以使用此方法进行消息传递(进程外通信),因此如果原始的.SaveChanges()失败,您将接受事件 - 在由其他微服务或外部系统处理之前从事务队列返回消息。
无论如何,与常规域事件相比,这种方法的优点在于,您可以使用Events.Add()从调度到处理程序来解除域事件的引发。
集成事件
处理集成事件通常与在同一事务中使用域事件不同。您可能需要发送异步事件以将更改从一个原始域模型(例如,原始微服务或原始有界上下文)传递和传播到多个订阅的微服务甚至外部订阅的应用程序。
由于这种异步通信,在100%确定原始操作(事件)过去确实发生之前,不应发布集成事件。这意味着它被持久存储在数据库或任何持久系统中。如果不这样做,最终可能会在多个微服务,有界上下文或外部应用程序中产生不一致的信息。
这种集成事件非常类似于完整CQRS中使用的事件与“写入数据库”不同的“读取数据库”,您需要将事务或“写入数据库”中的更改/事件传播到“读取数据库”。
一个非常类似的场景是在通信进程外微服务时,每个微服务拥有自己的模型,因为它实际上也是相同的模式。
因此,当您发布集成事件时,通常会在将数据保存到持久数据库后立即从基础结构层或应用程序层执行,例如,Greg Young在其简化的CQRS + EventSource示例中这样做:
https://github.com/gregoryyoung/m-r/blob/master/SimpleCQRS/EventStore.cs
public class EventStore : IEventStore
{
//Additional parts of the EventStore…
//…
public void SaveEvents(Guid aggregateId, IEnumerable<Event> events, int expectedVersion)
{
//Validations and obtaining objects, etc…
// iterate through current aggregate events increasing version with each processed event
foreach (var @event in events)
{
i++;
@event.Version = i;
// push event to the event descriptors list for current aggregate
eventDescriptors.Add(new EventDescriptor(aggregateId,@event,i));
// publish current event to the EventBus for further processing by subscribers
_publisher.Publish(@event);
}
}
}
与使用EventSourcing作为持久性工件的CQRS示例无关,您可以看到在使用EventBus.Publish()发布集成事件时,在确保数据在原始系统中保留之后,它是作为最后一个基础结构操作完成的。
即使是通信Pub / Subs工件也应该被命名为不同的。在多个有界上下文或微服务中传播状态更改时,感觉更像是EventBus和EventBus.Publish(IntegrationEvent)。
但是,在相同的微服务或有界上下文(本文中的第一个场景)中使用进程内域事件时,如果将通信工件命名为DomainEvents.Add(),然后简单地将Dispatcher.Dispath(DomainEvent)命名为更好。
当然,你可以说Jimmy Bogard使用EF SaveChanges()提出的最后一种方法看起来非常类似于这个SaveEvents()。不同之处在于,当使用覆盖SaveChanges()时,最后一个操作是base.SaveChanges(),因此在确保正确完成持久性之前不应发布集成事件(异步消息传递)(除非该publish()动作是也是交易的)。
此外,关于域事件,如果在调度事件之前调用base.SaveChanges(),那么它们将不参与事务,这会破坏域事件的整个目的。在这种情况下,您必须担心事件的事务失败时在同一域中发生的事情。这就是集成事件或承诺事件是一个单独的故事的原因。
将域事件转换为集成事件
当然,您可能希望将一些域事件传播到其他微服务或有界上下文。关键是您将域事件转换为集成事件(或将多个域事件聚合为单个集成事件)并在确认原始事务已提交之后将其发布到外部世界,在“它确实发生”之后过去在原始系统中,这是事件的真实定义。
如何在发布集成事件之前发生故障时保持微服务之间的一致性
有多种方法可以确保聚合的保存操作和发布的事件在发生故障时得到保证,因为,不要搞错,在分布式系统中会发生故障,因此您需要接受它们。
根据您的应用,一些可能的方法如下:
使用事务数据库表作为消息队列,该消息队列将成为创建事件并发布事件的事件创建者组件的基础。
使用事务日志挖掘
使用事件采购模式
可能使用事件源模式是最好的方法之一,但是,您可能无法在许多应用程序场景中实现完整的事件源。
事务日志挖掘可能过于耦合到底层基础结构。在某些情况下,您不希望这样。它很复杂,并且非常多地与基础设施耦合,在这种情况下可能与SQL基础设施相关。
我认为解决此问题的一个好方法是基于诸如“准备发布事件”状态的最终一致性,例如您在原始事务(和提交)中设置的状态,然后尝试将事件发布到事件总线(基于在队列或其他什么)。然后,如果成功(Service Bus事务工作,如将事件保留在队列中并提交),则在源/原始服务中启动另一个事务,并将状态从“准备发布事件”移动到“已发布的事件” ”。
如果“发布事件”操作失败,数据将不会在原始微服务中不一致,因为它仍然标记为“准备发布事件”,并且就其他服务而言,它最终将是一致的,就像您一样可以始终让后台进程检查事务的状态,如果发现任何“准备发布事件”,它将尝试将该事件重新发布到事件总线(队列等)中。
您可以看到这种方法可以基于简化的事件采购系统,因为您需要一个具有当前状态的集成事件列表(准备发布与已发布),但您只需要为此实现简化的事件源。您可能不需要将“所有内容”存储为事务数据库中的事件,您只需要存储每个微服务的集成事件。
一个重要的方面是通信中任何一点的失败都应该导致重试消息(以指数退避重试等),或者搜索“准备发布事件”状态的后台进程可能会尝试发布一个事件,正在同时发布,就像在竞争条件下一样,所以你还需要确保它是幂等的或携带足够的信息,以确保你可以检测到副本并丢弃它并发回你已发送的相同响应。
摘要
因此,我喜欢的全球方法如下:
域事件:当域事件范围纯粹在域本身内时,您从域实体类添加或引发域事件(使用或不使用Jimmy的“两阶段”方法,这更好)仅用于域内通信,就像在一个微服务领域内一样。
集成事件:当您需要在多个微服务或外部传播状态更改时,使用来自域实体类外部的EventBus.Publish()(来自基础结构甚至应用程序层,如命令处理程序)发布集成事件处理系统。
结论
基本上,通过区分域事件和集成事件,您可以解决处理事务的问题,因为域事件总是在事务中作用域,但集成事件(使用EventBus.Publish())仅在事务中发布到外部世界成功了。通过这样做,您可以确定其他域模型,微服务和外部系统不会对事实已经回滚且不再存在的事物做出反应。确保整个过程外系统(如微服务)的一致性。
将域事件用于内部域事件以及在进程外系统之间进行集成
作为另一种可能的方法,当您引发域事件时,一旦事务成功完成,它也将排队到CommitedEvent队列。然后,您将有两种类型的处理程序。 DomainEventHandler和CommitedEventHandler。
您可以扩展任何这些类。如果您需要立即处理事件(内部域操作),您将使用“正常”DomainEventHandler。如果您需要Commited数据(例如:您的处理程序依赖于对存储数据的查询),您可以使用CommitedDomainEventHandler,它与处理集成事件非常相似。
在一天结束时,它非常类似于具有域事件和集成事件,但是根据状态(已提交与未提交事件)使用相同的事件定义通过不同的通道。
本文:http://pub.intelligentx.net/node/556
讨论:请加入知识星球或者小红圈【首席架构师圈】
- 登录 发表评论
- 58 次浏览
最新内容
- 1 week 1 day ago
- 1 week 2 days ago
- 1 week 5 days ago
- 1 week 6 days ago
- 2 weeks ago
- 2 weeks ago
- 2 weeks ago
- 2 weeks ago
- 2 weeks 1 day ago
- 3 weeks ago