Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 77 additions & 69 deletions src/Symfony/Doctrine/EventListener/PublishMercureUpdatesListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
use ApiPlatform\Doctrine\Common\Messenger\DispatchTrait;
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
use ApiPlatform\Metadata\CollectionOperationInterface;
use ApiPlatform\Metadata\Exception\InvalidArgumentException;
use ApiPlatform\Metadata\Exception\OperationNotFoundException;
use ApiPlatform\Metadata\Exception\RuntimeException;
use ApiPlatform\Metadata\HttpOperation;
use ApiPlatform\Metadata\IriConverterInterface;
Expand Down Expand Up @@ -58,9 +58,12 @@ final class PublishMercureUpdatesListener
'enable_async_update' => true,
];
private readonly ?ExpressionLanguage $expressionLanguage;
private \SplObjectStorage $createdObjects;
private \SplObjectStorage $updatedObjects;
private \SplObjectStorage $deletedObjects;
/** @var list<array{object: object, options: array, operation: ?Operation}> */
private array $createdObjects;
/** @var list<array{object: object, options: array, operation: ?Operation}> */
private array $updatedObjects;
/** @var list<array{object: object, options: array, operation: ?Operation}> */
private array $deletedObjects;

/**
* @param array<string, string[]|string> $formats
Expand Down Expand Up @@ -127,40 +130,30 @@ public function onFlush(EventArgs $eventArgs): void
public function postFlush(): void
{
try {
$creatingObjects = clone $this->createdObjects;
foreach ($creatingObjects as $object) {
if ($this->createdObjects->offsetExists($object)) {
$this->createdObjects->offsetUnset($object);
}
$this->publishUpdate($object, $creatingObjects[$object], 'create');
foreach ($this->createdObjects as $entry) {
$this->publishUpdate($entry['object'], $entry['options'], 'create', $entry['operation']);
}
$this->createdObjects = [];

$updatingObjects = clone $this->updatedObjects;
foreach ($updatingObjects as $object) {
if ($this->updatedObjects->offsetExists($object)) {
$this->updatedObjects->offsetUnset($object);
}
$this->publishUpdate($object, $updatingObjects[$object], 'update');
foreach ($this->updatedObjects as $entry) {
$this->publishUpdate($entry['object'], $entry['options'], 'update', $entry['operation']);
}
$this->updatedObjects = [];

$deletingObjects = clone $this->deletedObjects;
foreach ($deletingObjects as $object) {
$options = $this->deletedObjects[$object];
if ($this->deletedObjects->offsetExists($object)) {
$this->deletedObjects->offsetUnset($object);
}
$this->publishUpdate($object, $deletingObjects[$object], 'delete');
foreach ($this->deletedObjects as $entry) {
$this->publishUpdate($entry['object'], $entry['options'], 'delete', $entry['operation']);
}
$this->deletedObjects = [];
} finally {
$this->reset();
}
}

private function reset(): void
{
$this->createdObjects = new \SplObjectStorage();
$this->updatedObjects = new \SplObjectStorage();
$this->deletedObjects = new \SplObjectStorage();
$this->createdObjects = [];
$this->updatedObjects = [];
$this->deletedObjects = [];
}

private function storeObjectToPublish(object $object, string $property): void
Expand All @@ -169,63 +162,79 @@ private function storeObjectToPublish(object $object, string $property): void
return;
}

$operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
try {
$options = $operation->getMercure() ?? false;
} catch (OperationNotFoundException) {
return;
}
$resourceMetadataCollection = $this->resourceMetadataFactory->create($resourceClass);

if (\is_string($options)) {
if (null === $this->expressionLanguage) {
throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
foreach ($resourceMetadataCollection as $resourceMetadata) {
/** @var ?HttpOperation $operation */
$operation = null;
foreach ($resourceMetadata->getOperations() ?? [] as $op) {
if (!$op instanceof CollectionOperationInterface) {
$operation = $op;
break;
}
}

$options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
}
if (null === $operation) {
continue;
}

if (false === $options) {
return;
}
$options = $operation->getMercure() ?? false;

if (true === $options) {
$options = [];
}
if (\is_string($options)) {
if (null === $this->expressionLanguage) {
throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
}

if (!\is_array($options)) {
throw new InvalidArgumentException(\sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
}
$options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
}

foreach ($options as $key => $value) {
if (!isset(self::ALLOWED_KEYS[$key])) {
throw new InvalidArgumentException(\sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
if (false === $options) {
continue;
}
}

$options['enable_async_update'] ??= true;
if (true === $options) {
$options = [];
}

if ('deletedObjects' === $property) {
$types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
if (null === $types) {
$types = [$operation->getShortName()];
if (!\is_array($options)) {
throw new InvalidArgumentException(\sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
}

// We need to evaluate it here, because in publishUpdate() the resource would be already deleted
$this->evaluateTopics($options, $object);
foreach ($options as $key => $value) {
if (!isset(self::ALLOWED_KEYS[$key])) {
throw new InvalidArgumentException(\sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', array_keys(self::ALLOWED_KEYS))));
}
}

$this->deletedObjects[(object) [
'id' => $this->iriConverter->getIriFromResource($object),
'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
'type' => 1 === \count($types) ? $types[0] : $types,
]] = $options;
$options['enable_async_update'] ??= true;

return;
}
if ('deletedObjects' === $property) {
$types = $operation->getTypes();
if (null === $types) {
$types = [$operation->getShortName()];
}

// We need to evaluate it here, because in publishUpdate() the resource would be already deleted
$this->evaluateTopics($options, $object);

$this->deletedObjects[] = [
'object' => (object) [
'id' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_PATH, $operation),
'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL, $operation),
'type' => 1 === \count($types) ? $types[0] : $types,
],
'options' => $options,
'operation' => $operation,
];

$this->{$property}[$object] = $options;
continue;
}

$this->{$property}[] = ['object' => $object, 'options' => $options, 'operation' => $operation];
}
}

private function publishUpdate(object $object, array $options, string $type): void
private function publishUpdate(object $object, array $options, string $type, ?Operation $operation = null): void
{
if ($object instanceof \stdClass) {
// By convention, if the object has been deleted, we send only its IRI and its type.
Expand All @@ -235,13 +244,12 @@ private function publishUpdate(object $object, array $options, string $type): vo
/** @var non-empty-string $data */
$data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
} else {
$resourceClass = $this->getObjectClass($object);
$context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];
$context = $options['normalization_context'] ?? $operation?->getNormalizationContext() ?? [];

// We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet
$this->evaluateTopics($options, $object);

$iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
$iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL, $operation);
$data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
}

Expand Down
Loading
Loading