From 6d4a8317c1da2cbf6a8201bc73eddda0c4ffa326 Mon Sep 17 00:00:00 2001 From: soyuka Date: Tue, 17 Feb 2026 13:52:28 +0100 Subject: [PATCH] fix(symfony): publish mercure updates for all resources of an entity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit | Q | A | ------------- | --- | Branch? | fix/multiple-mercure | Tickets | ∅ | License | MIT | Doc PR | ∅ * SplObjectStorage keyed by entity only allowed one update per entity; replaced with array to support multiple entries per resource * storeObjectToPublish iterates all resources in the metadata collection and stores a separate entry for each one that has mercure enabled * publishUpdate uses the specific operation for normalization context and IRI resolution Co-Authored-By: Claude Opus 4.6 --- .../PublishMercureUpdatesListener.php | 146 +++++++++--------- .../PublishMercureUpdatesListenerTest.php | 104 +++++++++++-- .../Entity/DummyMercureMultiResource.php | 50 ++++++ 3 files changed, 219 insertions(+), 81 deletions(-) create mode 100644 src/Symfony/Tests/Fixtures/TestBundle/Entity/DummyMercureMultiResource.php diff --git a/src/Symfony/Doctrine/EventListener/PublishMercureUpdatesListener.php b/src/Symfony/Doctrine/EventListener/PublishMercureUpdatesListener.php index 902f4c52a0..7ca43c05dc 100644 --- a/src/Symfony/Doctrine/EventListener/PublishMercureUpdatesListener.php +++ b/src/Symfony/Doctrine/EventListener/PublishMercureUpdatesListener.php @@ -16,8 +16,8 @@ use ApiPlatform\Doctrine\Common\Messenger\DispatchTrait; use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface; use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface; +use ApiPlatform\Metadata\CollectionOperationInterface; use ApiPlatform\Metadata\Exception\InvalidArgumentException; -use ApiPlatform\Metadata\Exception\OperationNotFoundException; use ApiPlatform\Metadata\Exception\RuntimeException; use ApiPlatform\Metadata\HttpOperation; use ApiPlatform\Metadata\IriConverterInterface; @@ -58,9 +58,12 @@ final class PublishMercureUpdatesListener 'enable_async_update' => true, ]; private readonly ?ExpressionLanguage $expressionLanguage; - private \SplObjectStorage $createdObjects; - private \SplObjectStorage $updatedObjects; - private \SplObjectStorage $deletedObjects; + /** @var list */ + private array $createdObjects; + /** @var list */ + private array $updatedObjects; + /** @var list */ + private array $deletedObjects; /** * @param array $formats @@ -127,30 +130,20 @@ public function onFlush(EventArgs $eventArgs): void public function postFlush(): void { try { - $creatingObjects = clone $this->createdObjects; - foreach ($creatingObjects as $object) { - if ($this->createdObjects->offsetExists($object)) { - $this->createdObjects->offsetUnset($object); - } - $this->publishUpdate($object, $creatingObjects[$object], 'create'); + foreach ($this->createdObjects as $entry) { + $this->publishUpdate($entry['object'], $entry['options'], 'create', $entry['operation']); } + $this->createdObjects = []; - $updatingObjects = clone $this->updatedObjects; - foreach ($updatingObjects as $object) { - if ($this->updatedObjects->offsetExists($object)) { - $this->updatedObjects->offsetUnset($object); - } - $this->publishUpdate($object, $updatingObjects[$object], 'update'); + foreach ($this->updatedObjects as $entry) { + $this->publishUpdate($entry['object'], $entry['options'], 'update', $entry['operation']); } + $this->updatedObjects = []; - $deletingObjects = clone $this->deletedObjects; - foreach ($deletingObjects as $object) { - $options = $this->deletedObjects[$object]; - if ($this->deletedObjects->offsetExists($object)) { - $this->deletedObjects->offsetUnset($object); - } - $this->publishUpdate($object, $deletingObjects[$object], 'delete'); + foreach ($this->deletedObjects as $entry) { + $this->publishUpdate($entry['object'], $entry['options'], 'delete', $entry['operation']); } + $this->deletedObjects = []; } finally { $this->reset(); } @@ -158,9 +151,9 @@ public function postFlush(): void private function reset(): void { - $this->createdObjects = new \SplObjectStorage(); - $this->updatedObjects = new \SplObjectStorage(); - $this->deletedObjects = new \SplObjectStorage(); + $this->createdObjects = []; + $this->updatedObjects = []; + $this->deletedObjects = []; } private function storeObjectToPublish(object $object, string $property): void @@ -169,63 +162,79 @@ private function storeObjectToPublish(object $object, string $property): void return; } - $operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation(); - try { - $options = $operation->getMercure() ?? false; - } catch (OperationNotFoundException) { - return; - } + $resourceMetadataCollection = $this->resourceMetadataFactory->create($resourceClass); - if (\is_string($options)) { - if (null === $this->expressionLanguage) { - throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".'); + foreach ($resourceMetadataCollection as $resourceMetadata) { + /** @var ?HttpOperation $operation */ + $operation = null; + foreach ($resourceMetadata->getOperations() ?? [] as $op) { + if (!$op instanceof CollectionOperationInterface) { + $operation = $op; + break; + } } - $options = $this->expressionLanguage->evaluate($options, ['object' => $object]); - } + if (null === $operation) { + continue; + } - if (false === $options) { - return; - } + $options = $operation->getMercure() ?? false; - if (true === $options) { - $options = []; - } + if (\is_string($options)) { + if (null === $this->expressionLanguage) { + throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".'); + } - if (!\is_array($options)) { - throw new InvalidArgumentException(\sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options))); - } + $options = $this->expressionLanguage->evaluate($options, ['object' => $object]); + } - foreach ($options as $key => $value) { - if (!isset(self::ALLOWED_KEYS[$key])) { - throw new InvalidArgumentException(\sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS)))); + if (false === $options) { + continue; } - } - $options['enable_async_update'] ??= true; + if (true === $options) { + $options = []; + } - if ('deletedObjects' === $property) { - $types = $operation instanceof HttpOperation ? $operation->getTypes() : null; - if (null === $types) { - $types = [$operation->getShortName()]; + if (!\is_array($options)) { + throw new InvalidArgumentException(\sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options))); } - // We need to evaluate it here, because in publishUpdate() the resource would be already deleted - $this->evaluateTopics($options, $object); + foreach ($options as $key => $value) { + if (!isset(self::ALLOWED_KEYS[$key])) { + throw new InvalidArgumentException(\sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS)))); + } + } - $this->deletedObjects[(object) [ - 'id' => $this->iriConverter->getIriFromResource($object), - 'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL), - 'type' => 1 === \count($types) ? $types[0] : $types, - ]] = $options; + $options['enable_async_update'] ??= true; - return; - } + if ('deletedObjects' === $property) { + $types = $operation->getTypes(); + if (null === $types) { + $types = [$operation->getShortName()]; + } + + // We need to evaluate it here, because in publishUpdate() the resource would be already deleted + $this->evaluateTopics($options, $object); + + $this->deletedObjects[] = [ + 'object' => (object) [ + 'id' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_PATH, $operation), + 'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL, $operation), + 'type' => 1 === \count($types) ? $types[0] : $types, + ], + 'options' => $options, + 'operation' => $operation, + ]; - $this->{$property}[$object] = $options; + continue; + } + + $this->{$property}[] = ['object' => $object, 'options' => $options, 'operation' => $operation]; + } } - private function publishUpdate(object $object, array $options, string $type): void + private function publishUpdate(object $object, array $options, string $type, ?Operation $operation = null): void { if ($object instanceof \stdClass) { // By convention, if the object has been deleted, we send only its IRI and its type. @@ -235,13 +244,12 @@ private function publishUpdate(object $object, array $options, string $type): vo /** @var non-empty-string $data */ $data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR); } else { - $resourceClass = $this->getObjectClass($object); - $context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? []; + $context = $options['normalization_context'] ?? $operation?->getNormalizationContext() ?? []; // We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet $this->evaluateTopics($options, $object); - $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL); + $iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL, $operation); $data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context); } diff --git a/src/Symfony/Tests/Doctrine/EventListener/PublishMercureUpdatesListenerTest.php b/src/Symfony/Tests/Doctrine/EventListener/PublishMercureUpdatesListenerTest.php index 94524d7aa3..8ce791e1a5 100644 --- a/src/Symfony/Tests/Doctrine/EventListener/PublishMercureUpdatesListenerTest.php +++ b/src/Symfony/Tests/Doctrine/EventListener/PublishMercureUpdatesListenerTest.php @@ -30,6 +30,7 @@ use ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity\DummyCar; use ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity\DummyFriend; use ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity\DummyMercure; +use ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity\DummyMercureMultiResource; use ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity\DummyOffer; use ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity\MercureWithTopicsAndGetOperation; use Doctrine\ORM\EntityManagerInterface; @@ -84,14 +85,14 @@ public function testPublishUpdate(): void $resourceClassResolverProphecy->isResourceClass(DummyMercure::class)->willReturn(true); $iriConverterProphecy = $this->prophesize(IriConverterInterface::class); - $iriConverterProphecy->getIriFromResource($toInsert, UrlGeneratorInterface::ABS_URL)->willReturn('http://example.com/dummies/1')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toUpdate, UrlGeneratorInterface::ABS_URL)->willReturn('http://example.com/dummies/2')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_URL)->willReturn('http://example.com/dummies/3')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDelete)->willReturn('/dummies/3')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDeleteExpressionLanguage)->willReturn('/dummy_friends/4')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDeleteExpressionLanguage, UrlGeneratorInterface::ABS_URL)->willReturn('http://example.com/dummy_friends/4')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDeleteMercureOptions)->willReturn('/dummy_offers/5')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDeleteMercureOptions, UrlGeneratorInterface::ABS_URL)->willReturn('http://example.com/dummy_offers/5')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toInsert, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/dummies/1')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toUpdate, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/dummies/2')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/dummies/3')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_PATH, Argument::any())->willReturn('/dummies/3')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDeleteExpressionLanguage, UrlGeneratorInterface::ABS_PATH, Argument::any())->willReturn('/dummy_friends/4')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDeleteExpressionLanguage, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/dummy_friends/4')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDeleteMercureOptions, UrlGeneratorInterface::ABS_PATH, Argument::any())->willReturn('/dummy_offers/5')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDeleteMercureOptions, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/dummy_offers/5')->shouldBeCalled(); $resourceMetadataFactoryProphecy = $this->prophesize(ResourceMetadataCollectionFactoryInterface::class); @@ -215,9 +216,8 @@ public function testPublishUpdateMultipleTopicsUsingExpressionLanguage(): void $iriConverterProphecy->getIriFromResource($toUpdate, UrlGeneratorInterface::ABS_PATH, null)->willReturn('/mercure_with_topics_and_get_operations/2')->shouldBeCalled(); $iriConverterProphecy->getIriFromResource($toUpdate, UrlGeneratorInterface::ABS_URL, Argument::exact($customGetOperation))->willReturn('http://example.com/custom_resource/mercure_with_topics_and_get_operations/2')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDelete)->willReturn('/mercure_with_topics_and_get_operations/3')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_URL, null)->willReturn('http://example.com/mercure_with_topics_and_get_operations/3')->shouldBeCalled(); - $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_PATH, null)->willReturn('/mercure_with_topics_and_get_operations/3')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_PATH, Argument::any())->willReturn('/mercure_with_topics_and_get_operations/3')->shouldBeCalled(); + $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/mercure_with_topics_and_get_operations/3')->shouldBeCalled(); $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_URL, Argument::exact($customGetOperation))->willReturn('http://example.com/custom_resource/mercure_with_topics_and_get_operations/3')->shouldBeCalled(); $resourceMetadataFactoryProphecy = $this->prophesize(ResourceMetadataCollectionFactoryInterface::class); @@ -298,7 +298,7 @@ public function testPublishGraphQlUpdates(): void $resourceClassResolverProphecy->isResourceClass(Dummy::class)->willReturn(true); $iriConverterProphecy = $this->prophesize(IriConverterInterface::class); - $iriConverterProphecy->getIriFromResource($toUpdate, UrlGeneratorInterface::ABS_URL)->willReturn('http://example.com/dummies/2'); + $iriConverterProphecy->getIriFromResource($toUpdate, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/dummies/2'); $resourceMetadataFactoryProphecy = $this->prophesize(ResourceMetadataCollectionFactoryInterface::class); $resourceMetadataFactoryProphecy->create(Dummy::class)->willReturn(new ResourceMetadataCollection(Dummy::class, [(new ApiResource())->withOperations(new Operations([ @@ -364,6 +364,86 @@ public function testPublishGraphQlUpdates(): void $this->assertEquals(['2', '["data"]'], $data); } + public function testPublishUpdateWithMultipleResources(): void + { + $toInsert = new DummyMercureMultiResource(); + $toInsert->id = 1; + $toInsert->name = 'test'; + + $toDelete = new DummyMercureMultiResource(); + $toDelete->id = 2; + $toDelete->name = 'deleted'; + + $resourceClassResolverProphecy = $this->prophesize(ResourceClassResolverInterface::class); + $resourceClassResolverProphecy->getResourceClass(Argument::type(DummyMercureMultiResource::class))->willReturn(DummyMercureMultiResource::class); + $resourceClassResolverProphecy->isResourceClass(DummyMercureMultiResource::class)->willReturn(true); + + $iriConverterProphecy = $this->prophesize(IriConverterInterface::class); + $iriConverterProphecy->getIriFromResource($toInsert, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/admin/dummy_mercures/1', 'http://example.com/dummy_mercures/1'); + $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_PATH, Argument::any())->willReturn('/admin/dummy_mercures/2', '/dummy_mercures/2'); + $iriConverterProphecy->getIriFromResource($toDelete, UrlGeneratorInterface::ABS_URL, Argument::any())->willReturn('http://example.com/admin/dummy_mercures/2', 'http://example.com/dummy_mercures/2'); + + $adminGetOp = (new Get(uriTemplate: '/admin/dummy_mercures/{id}{._format}'))->withShortName('AdminDummyMercure')->withMercure(['enable_async_update' => false, 'hub' => 'managed'])->withNormalizationContext(['groups' => ['admin:read']]); + $publicGetOp = (new Get(uriTemplate: '/dummy_mercures/{id}{._format}'))->withShortName('DummyMercure')->withMercure(['enable_async_update' => false, 'hub' => 'managed'])->withNormalizationContext(['groups' => ['read']]); + + $resourceMetadataFactoryProphecy = $this->prophesize(ResourceMetadataCollectionFactoryInterface::class); + $resourceMetadataFactoryProphecy->create(DummyMercureMultiResource::class)->willReturn(new ResourceMetadataCollection(DummyMercureMultiResource::class, [ + (new ApiResource())->withShortName('AdminDummyMercure')->withOperations(new Operations([ + 'get' => $adminGetOp, + ])), + (new ApiResource())->withShortName('DummyMercure')->withOperations(new Operations([ + 'get' => $publicGetOp, + ])), + ])); + + $serializerProphecy = $this->prophesize(SerializerInterface::class); + $serializerProphecy->serialize($toInsert, 'jsonld', ['groups' => ['admin:read']])->willReturn('{"admin":1}'); + $serializerProphecy->serialize($toInsert, 'jsonld', ['groups' => ['read']])->willReturn('{"public":1}'); + + $formats = ['jsonld' => ['application/ld+json']]; + + $topics = []; + $data = []; + + $managedHub = $this->createMockHub(static function (Update $update) use (&$topics, &$data): string { + $topics = array_merge($topics, $update->getTopics()); + $data[] = $update->getData(); + + return 'id'; + }); + + $listener = new PublishMercureUpdatesListener( + $resourceClassResolverProphecy->reveal(), + $iriConverterProphecy->reveal(), + $resourceMetadataFactoryProphecy->reveal(), + $serializerProphecy->reveal(), + $formats, + null, + new HubRegistry($this->createMock(HubInterface::class), ['managed' => $managedHub]), + null, + null, + null, + true, + ); + + $uowProphecy = $this->prophesize(UnitOfWork::class); + $uowProphecy->getScheduledEntityInsertions()->willReturn([$toInsert])->shouldBeCalled(); + $uowProphecy->getScheduledEntityUpdates()->willReturn([])->shouldBeCalled(); + $uowProphecy->getScheduledEntityDeletions()->willReturn([$toDelete])->shouldBeCalled(); + + $emProphecy = $this->prophesize(EntityManagerInterface::class); + $emProphecy->getUnitOfWork()->willReturn($uowProphecy->reveal())->shouldBeCalled(); + $eventArgs = new OnFlushEventArgs($emProphecy->reveal()); + + $listener->onFlush($eventArgs); + $listener->postFlush(); + + // Both resources should have published updates + $this->assertCount(4, $data, 'Expected 4 updates: 2 inserts (admin + public) + 2 deletes (admin + public)'); + $this->assertEquals('{"admin":1}', $data[0]); + $this->assertEquals('{"public":1}', $data[1]); + } + private function createMockHub(callable $callable): HubInterface { return new MockHub('https://mercure.demo/.well-known/mercure', new StaticTokenProvider('x'), $callable); diff --git a/src/Symfony/Tests/Fixtures/TestBundle/Entity/DummyMercureMultiResource.php b/src/Symfony/Tests/Fixtures/TestBundle/Entity/DummyMercureMultiResource.php new file mode 100644 index 0000000000..dea53ceb0a --- /dev/null +++ b/src/Symfony/Tests/Fixtures/TestBundle/Entity/DummyMercureMultiResource.php @@ -0,0 +1,50 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ApiPlatform\Symfony\Tests\Fixtures\TestBundle\Entity; + +use ApiPlatform\Metadata\ApiResource; +use ApiPlatform\Metadata\Get; +use ApiPlatform\Metadata\GetCollection; +use ApiPlatform\Metadata\Post; +use Doctrine\ORM\Mapping as ORM; + +/** + * Entity with two ApiResource declarations and mercure enabled on both. + * Tests that Mercure publishes updates for each resource, not just the first. + */ +#[ApiResource( + shortName: 'AdminDummyMercure', + uriTemplate: '/admin/dummy_mercures/{id}{._format}', + operations: [new Get(), new GetCollection(), new Post()], + mercure: ['enable_async_update' => false, 'hub' => 'managed'], + normalizationContext: ['groups' => ['admin:read']], +)] +#[ApiResource( + shortName: 'DummyMercure', + uriTemplate: '/dummy_mercures/{id}{._format}', + operations: [new Get(), new GetCollection()], + mercure: ['enable_async_update' => false, 'hub' => 'managed'], + normalizationContext: ['groups' => ['read']], +)] +#[ORM\Entity] +class DummyMercureMultiResource +{ + #[ORM\Id] + #[ORM\Column(type: 'integer')] + #[ORM\GeneratedValue(strategy: 'AUTO')] + public ?int $id = null; + + #[ORM\Column] + public string $name = ''; +}