In Laravel-Event-Sourcing, how is a StoredEvent saved, and how do Projectors and Reactors get to hear about the corresponding event?
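When a user cancels a booking, a `BookingCancelled` event has to be fired. A Projector needs to listen for `BookingCancelled` and update the booking status, and a Reactor needs to send the user an email.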
- We have an AggregateRoot, `App\Bookings\AggregateRoot\BookingRoot`. It has a `cancel` method for cancelling the booking:
```php
class BookingRoot extends AggregateRoot
{
    public function cancel(): self
    {
        $this->recordThat(
            new BookingCancelled(
                $this->userId,
            )
        );

        return $this;
    }
}
```
- We have a Projector, `App\Bookings\Projector\BookingsProjector`. It is synchronous:
```php
class BookingsProjector extends Projector
{
    public function onBookingCancelled(
        StoredEvent $storedEvent,
        BookingCancelled $event,
        string $aggregateUuid
    ) {
        $projection = ProjectionBooking::query()
            ->where('booking_aggregate_id', $aggregateUuid)
            ->first();

        $projection->status = BookingStatus::STATUS_CANCELLED;
        $projection->cancelled = $storedEvent->created_at;
        $projection->save();
    }
}
```
- We have a Reactor, `App\Bookings\Reactor\NotificationReactor`. It is asynchronous:
```php
class NotificationReactor extends Reactor implements ShouldQueue
{
    public function onBookingCancelled(BookingCancelled $event, StoredEvent $storedEvent)
    {
        $user = User::query()->findOrFail($event->userId);

        Notification::send($user, new BookingCancelledNotification('Booking cancelled'));
    }
}
```
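When we run the following code, execution ends up in the `persist` method that `BookingRoot` inherits from `AggregateRoot`:

```php
BookingRoot::retrieve($bookingAggregateId)->cancel()->persist();
```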
```php
public function persist()
{
    $storedEvents = $this->persistWithoutApplyingToEventHandlers();

    $storedEvents->each(fn (StoredEvent $storedEvent) => $storedEvent->handleForAggregateRoot());

    $this->aggregateVersionAfterReconstitution = $this->aggregateVersion;

    return $this;
}

protected function persistWithoutApplyingToEventHandlers(): LazyCollection
{
    $this->ensureNoOtherEventsHaveBeenPersisted();

    $storedEvents = $this
        ->getStoredEventRepository()
        ->persistMany(
            $this->getAndClearRecordedEvents(),
            $this->uuid(),
            $this->aggregateVersion,
        );

    return $storedEvents;
}
```
`$this->persistWithoutApplyingToEventHandlers()` calls `persistMany` on `Spatie\EventSourcing\StoredEvents\Repositories\EloquentStoredEventRepository`. `persistMany` loops over the events to be saved and hands each one to `persist`, which stores it through the `Spatie\EventSourcing\StoredEvents\Models\EloquentStoredEvent` model in the `stored_events` database table. The relevant code of `EloquentStoredEventRepository` is:
```php
class EloquentStoredEventRepository implements StoredEventRepository
{
    public function persistMany(array $events, string $uuid = null, int $aggregateVersion = null): LazyCollection
    {
        $storedEvents = [];

        foreach ($events as $event) {
            $storedEvents[] = $this->persist($event, $uuid, $aggregateVersion);
        }

        return new LazyCollection($storedEvents);
    }

    public function persist(ShouldBeStored $event, string $uuid = null, int $aggregateVersion = null): StoredEvent
    {
        /** @var EloquentStoredEvent $eloquentStoredEvent */
        $eloquentStoredEvent = new $this->storedEventModel();

        $eloquentStoredEvent->setOriginalEvent($event);

        $eloquentStoredEvent->setRawAttributes([
            'event_properties' => app(EventSerializer::class)->serialize(clone $event),
            'aggregate_uuid' => $uuid,
            'aggregate_version' => $aggregateVersion,
            'event_class' => $this->getEventClass(get_class($event)),
            'meta_data' => json_encode($event->metaData()),
            'created_at' => Carbon::now(),
        ]);

        $eloquentStoredEvent->save();

        return $eloquentStoredEvent->toStoredEvent();
    }
}
```
Step 1:

```php
$storedEvents = $this->persistWithoutApplyingToEventHandlers();
```

This returns a `LazyCollection` whose items are `Spatie\EventSourcing\StoredEvents\StoredEvent` instances.
Step 2:

```php
$storedEvents->each(fn (StoredEvent $storedEvent) => $storedEvent->handleForAggregateRoot());
```

This loops over the collection and calls `handleForAggregateRoot` on each `Spatie\EventSourcing\StoredEvents\StoredEvent`.
```php
namespace Spatie\EventSourcing\StoredEvents;

class StoredEvent implements Arrayable
{
    public function handleForAggregateRoot(): void
    {
        $this->handle();

        if (! config('event-sourcing.dispatch_events_from_aggregate_roots', false)) {
            return;
        }

        $this->event->firedFromAggregateRoot = true;
        event($this->event);
    }

    public function handle()
    {
        // Synchronous Projectors and Reactors are handled in Projectionist::handleWithSyncEventHandlers
        Projectionist::handleWithSyncEventHandlers($this);

        if (method_exists($this->event, 'tags')) {
            $tags = $this->event->tags();
        }

        if (! $this->shouldDispatchJob()) {
            return;
        }

        $storedEventJob = call_user_func(
            [config('event-sourcing.stored_event_job'), 'createForEvent'],
            $this,
            $tags ?? []
        );

        // For asynchronous Projectors and Reactors, push a
        // Spatie\EventSourcing\StoredEvents\HandleStoredEventJob onto the queue
        dispatch($storedEventJob->onQueue($this->getQueueName()));
    }
}
```
In the code above, `$this->event` is `App\Bookings\Event\BookingCancelled`, and `$storedEventJob` is a `Spatie\EventSourcing\StoredEvents\HandleStoredEventJob` whose `$storedEvent` property is the `$this` from the code.
The synchronous call in the code above is:

```php
Projectionist::handleWithSyncEventHandlers($this);
```

Let's look at `handleWithSyncEventHandlers` in `Spatie\EventSourcing\Projectionist`:
```php
namespace Spatie\EventSourcing;

use Spatie\EventSourcing\EventHandlers\EventHandlerCollection;

class Projectionist
{
    private EventHandlerCollection $projectors;
    private EventHandlerCollection $reactors;
    private bool $catchExceptions;
    private bool $replayChunkSize;
    private bool $isProjecting = false;
    private bool $isReplaying = false;

    public function __construct(array $config)
    {
        $this->projectors = new EventHandlerCollection();
        $this->reactors = new EventHandlerCollection();

        $this->catchExceptions = $config['catch_exceptions'];
        $this->replayChunkSize = $config['replay_chunk_size'];
    }

    public function handleWithSyncEventHandlers(StoredEvent $storedEvent): void
    {
        $projectors = $this->projectors
            ->forEvent($storedEvent)
            ->syncEventHandlers();

        $this->applyStoredEventToProjectors($storedEvent, $projectors);

        $reactors = $this->reactors
            ->forEvent($storedEvent)
            ->syncEventHandlers();

        $this->applyStoredEventToReactors($storedEvent, $reactors);
    }
}
```
The `handleWithSyncEventHandlers` method above uses `EventHandlerCollection` to pick out the synchronous Projectors and Reactors. Projectors are passed to `$this->applyStoredEventToProjectors($storedEvent, $projectors)`, and Reactors to `$this->applyStoredEventToReactors($storedEvent, $reactors)`:
```php
private function applyStoredEventToProjectors(StoredEvent $storedEvent, Collection $projectors): void
{
    $this->isProjecting = true;

    foreach ($projectors as $projector) {
        $this->callEventHandler($projector, $storedEvent);
    }

    $this->isProjecting = false;
}

private function applyStoredEventToReactors(StoredEvent $storedEvent, Collection $reactors): void
{
    foreach ($reactors as $reactor) {
        $this->callEventHandler($reactor, $storedEvent);
    }
}

private function callEventHandler(EventHandler $eventHandler, StoredEvent $storedEvent): bool
{
    try {
        $eventHandler->handle($storedEvent);
    } catch (Exception $exception) {
        if (! $this->catchExceptions) {
            throw $exception;
        }

        $eventHandler->handleException($exception);

        event(new EventHandlerFailedHandlingEvent($eventHandler, $storedEvent, $exception));

        return false;
    }

    return true;
}
```
In our example, this step goes through `$this->applyStoredEventToProjectors($storedEvent, $projectors)`. When `$this->callEventHandler($projector, $storedEvent)` is called, `$projector` is `App\Bookings\Projector\BookingsProjector` and `$storedEvent` is a `Spatie\EventSourcing\StoredEvents\StoredEvent` whose `event` property is `App\Bookings\Event\BookingCancelled`.

So `$eventHandler->handle($storedEvent)` inside `callEventHandler` amounts to `$projector->handle($storedEvent)`: it calls the `handle` method of `App\Bookings\Projector\BookingsProjector`, and `handle` in turn runs `onBookingCancelled`.
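How `handle` gets from a stored event to `onBookingCancelled` is not shown in the excerpts above. The following is a minimal, self-contained sketch of one way such routing can work: it looks for a public method whose parameter is type-hinted with the event's class. `ToyStoredEvent`, `ToyEventHandler` and `ToyBookingsProjector` are hypothetical stand-ins written for this illustration, not classes from the package, and the package's real lookup may differ.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for App\Bookings\Event\BookingCancelled.
final class BookingCancelled
{
    public int $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }
}

// Hypothetical stand-in for the package's StoredEvent: it only wraps the original event.
final class ToyStoredEvent
{
    public object $event;

    public function __construct(object $event)
    {
        $this->event = $event;
    }
}

// Hypothetical base handler: routes an event to the method that type-hints its class.
abstract class ToyEventHandler
{
    public function handle(ToyStoredEvent $storedEvent): void
    {
        $eventClass = get_class($storedEvent->event);
        $methods = (new ReflectionClass($this))->getMethods(ReflectionMethod::IS_PUBLIC);

        foreach ($methods as $method) {
            foreach ($method->getParameters() as $parameter) {
                $type = $parameter->getType();

                // A method "listens" for the event if one of its parameters is typed with the event class.
                if ($type instanceof ReflectionNamedType && $type->getName() === $eventClass) {
                    $method->invoke($this, $storedEvent->event);

                    return;
                }
            }
        }
    }
}

final class ToyBookingsProjector extends ToyEventHandler
{
    /** @var string[] */
    public array $log = [];

    public function onBookingCancelled(BookingCancelled $event): void
    {
        $this->log[] = "cancelled for user {$event->userId}";
    }
}

$projector = new ToyBookingsProjector();
$projector->handle(new ToyStoredEvent(new BookingCancelled(42)));

echo implode(PHP_EOL, $projector->log), PHP_EOL; // prints "cancelled for user 42"
```

In this sketch the method name `onBookingCancelled` is only a naming habit; the routing relies on the parameter type alone.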
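The asynchronous call in the code above is:

```php
dispatch($storedEventJob->onQueue($this->getQueueName()));
```

On the Laravel side, the job eventually goes through `dispatchNow($command, $handler = null)` on `Illuminate\Bus\Dispatcher`, which ends up calling the `handle` method of `$storedEventJob`. Here is the relevant code of `Spatie\EventSourcing\StoredEvents\HandleStoredEventJob`: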
```php
namespace Spatie\EventSourcing\StoredEvents;

class HandleStoredEventJob implements HandleDomainEventJob, ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    public StoredEvent $storedEvent;
    public array $tags;

    public function __construct(StoredEvent $storedEvent, array $tags)
    {
        $this->storedEvent = $storedEvent;
        $this->tags = $tags;
    }

    public function handle(Projectionist $projectionist): void
    {
        $projectionist->handle($this->storedEvent);
    }
}
```
This `handle` in turn calls the `handle` method of `Spatie\EventSourcing\Projectionist`:
```php
namespace Spatie\EventSourcing;

class Projectionist
{
    public function handle(StoredEvent $storedEvent): void
    {
        $projectors = $this->projectors
            ->forEvent($storedEvent)
            ->asyncEventHandlers();

        $this->applyStoredEventToProjectors(
            $storedEvent,
            $projectors
        );

        $reactors = $this->reactors
            ->forEvent($storedEvent)
            ->asyncEventHandlers();

        $this->applyStoredEventToReactors(
            $storedEvent,
            $reactors
        );
    }
}
```
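From here on the logic is the same as in the synchronous case. The only difference between `handle` and `handleWithSyncEventHandlers` is that the former selects the asynchronous Projectors and Reactors and the latter the synchronous ones. Both then go on to `$this->applyStoredEventToProjectors($storedEvent, $projectors)` and `$this->applyStoredEventToReactors($storedEvent, $reactors)`.

In our example, this triggers the `handle` method of `App\Bookings\Reactor\NotificationReactor`, and `handle` then runs `onBookingCancelled`.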
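The split between the two paths can be modelled in a few lines of plain PHP. This is a simplified sketch, not the package's code: `ToyShouldQueue`, `ToyHandler`, `ToyQueuedHandler` and `ToyProjectionist` are hypothetical names, events are plain strings, and the "queue" is just an array. It assumes, as the walkthrough describes, that synchronous handlers run immediately and that a single job is queued for the event when at least one handler is queued.

```php
<?php
declare(strict_types=1);

// Hypothetical marker interface standing in for Laravel's ShouldQueue.
interface ToyShouldQueue
{
}

class ToyHandler
{
    /** @var string[] events this handler has processed */
    public array $handled = [];

    public function handle(string $event): void
    {
        $this->handled[] = $event;
    }
}

final class ToyQueuedHandler extends ToyHandler implements ToyShouldQueue
{
}

final class ToyProjectionist
{
    /** @var ToyHandler[] */
    private array $handlers;

    /** @var string[] events waiting on the pretend queue, one entry per job */
    public array $queue = [];

    public function __construct(array $handlers)
    {
        $this->handlers = $handlers;
    }

    public function dispatchEvent(string $event): void
    {
        // Synchronous path: run every handler that is not queued.
        foreach ($this->handlers as $handler) {
            if (! $handler instanceof ToyShouldQueue) {
                $handler->handle($event);
            }
        }

        // Asynchronous path: queue ONE job for the event if any handler is queued.
        foreach ($this->handlers as $handler) {
            if ($handler instanceof ToyShouldQueue) {
                $this->queue[] = $event;
                break;
            }
        }
    }

    public function work(): void
    {
        // Each job replays its event to all queued handlers in one loop.
        while (($event = array_shift($this->queue)) !== null) {
            foreach ($this->handlers as $handler) {
                if ($handler instanceof ToyShouldQueue) {
                    $handler->handle($event);
                }
            }
        }
    }
}

$projector = new ToyHandler();             // plays the role of BookingsProjector
$notifications = new ToyQueuedHandler();   // plays the role of NotificationReactor
$secondReactor = new ToyQueuedHandler();   // a second, made-up queued reactor

$projectionist = new ToyProjectionist([$projector, $notifications, $secondReactor]);
$projectionist->dispatchEvent('BookingCancelled');

$queuedJobs = count($projectionist->queue); // 1, even though two handlers are queued
$projectionist->work();
```

With two queued handlers there is still only one entry on the queue, and both handlers run inside the same job.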
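Laravel-Event-Sourcing decides whether to go asynchronous by checking whether any Projector or Reactor listening for the current event is an `instanceof ShouldQueue`. If at least one is, the `HandleStoredEventJob` created for the event is pushed onto the queue. In both the synchronous and the asynchronous path, the package takes every Projector and Reactor on that path that listens for the event and calls them in a loop; for `BookingCancelled` in our example, that means running the `onBookingCancelled` method of each one. The code, shown again here: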
```php
private function applyStoredEventToProjectors(StoredEvent $storedEvent, Collection $projectors): void
{
    $this->isProjecting = true;

    foreach ($projectors as $projector) {
        $this->callEventHandler($projector, $storedEvent);
    }

    $this->isProjecting = false;
}

private function applyStoredEventToReactors(StoredEvent $storedEvent, Collection $reactors): void
{
    foreach ($reactors as $reactor) {
        $this->callEventHandler($reactor, $storedEvent);
    }
}

private function callEventHandler(EventHandler $eventHandler, StoredEvent $storedEvent): bool
{
    try {
        $eventHandler->handle($storedEvent);
    } catch (Exception $exception) {
        if (! $this->catchExceptions) {
            throw $exception;
        }

        $eventHandler->handleException($exception);

        event(new EventHandlerFailedHandlingEvent($eventHandler, $storedEvent, $exception));

        return false;
    }

    return true;
}
```
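If we don't want the loop over all Projectors and Reactors to be interrupted by an exception, we can set `catch_exceptions` to `true` in the `event-sourcing.php` config file and define a custom `handleException` method on the Projector or Reactor to deal with the exception.

Reading through the code above shows that asynchrony in Laravel-Event-Sourcing applies to the event, not to the individual Projectors and Reactors listening for it. This deserves special attention.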
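To make the effect of `catch_exceptions` concrete, here is a small self-contained sketch that mirrors the try/catch in `callEventHandler`. It is an illustration under assumptions, not the package's code: `ToyHandler`, `FailingHandler` and `applyToHandlers` are hypothetical names, and events are plain strings.

```php
<?php
declare(strict_types=1);

class ToyHandler
{
    /** @var string[] events handled successfully */
    public array $handled = [];

    /** @var string[] messages of exceptions passed to handleException */
    public array $errors = [];

    public function handle(string $event): void
    {
        $this->handled[] = $event;
    }

    // Custom exception handling: this sketch records the message instead of rethrowing.
    public function handleException(Exception $exception): void
    {
        $this->errors[] = $exception->getMessage();
    }
}

final class FailingHandler extends ToyHandler
{
    public function handle(string $event): void
    {
        throw new RuntimeException("cannot handle {$event}");
    }
}

/**
 * Calls every handler in turn and returns how many succeeded.
 * With $catchExceptions = false the first failure aborts the loop.
 */
function applyToHandlers(array $handlers, string $event, bool $catchExceptions): int
{
    $succeeded = 0;

    foreach ($handlers as $handler) {
        try {
            $handler->handle($event);
            $succeeded++;
        } catch (Exception $exception) {
            if (! $catchExceptions) {
                throw $exception;
            }

            $handler->handleException($exception);
        }
    }

    return $succeeded;
}

// catch_exceptions = true: the failing handler does not stop the second one.
$first = new FailingHandler();
$second = new ToyHandler();
$succeeded = applyToHandlers([$first, $second], 'BookingCancelled', true);

// catch_exceptions = false: the exception escapes and the third handler never runs.
$third = new ToyHandler();
$interrupted = false;

try {
    applyToHandlers([new FailingHandler(), $third], 'BookingCancelled', false);
} catch (RuntimeException $exception) {
    $interrupted = true;
}
```

Since all handlers for an event run in the same loop, one uncaught exception with `catch_exceptions` off also skips every handler that comes after the failing one.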