diff --git a/bin/composer.phar b/bin/composer.phar index bd5bab8c..b58b584b 100755 Binary files a/bin/composer.phar and b/bin/composer.phar differ diff --git a/bin/dphp b/bin/dphp index 9d15ab52..4e04531d 100755 Binary files a/bin/dphp and b/bin/dphp differ diff --git a/src/DurableFuture.php b/src/DurableFuture.php index a77bfda4..206924d7 100644 --- a/src/DurableFuture.php +++ b/src/DurableFuture.php @@ -25,6 +25,8 @@ namespace Bottledcode\DurablePhp; use Amp\DeferredFuture; +use Bottledcode\DurablePhp\State\Serializer; +use LogicException; /** * @template T @@ -32,9 +34,10 @@ class DurableFuture { /** - * @param DeferredFuture $future + * @param DeferredFuture $future + * @param class-string|null $resultType */ - public function __construct(public readonly DeferredFuture $future) {} + public function __construct(public readonly DeferredFuture $future, public readonly ?string $resultType = null) {} /** * @return T @@ -42,10 +45,14 @@ public function __construct(public readonly DeferredFuture $future) {} public function getResult(): mixed { if ($this->future->isComplete()) { - return $this->future->getFuture()->await(); + if ($this->resultType === null) { + return $this->future->getFuture()->await(); + } + + return Serializer::deserialize($this->future->getFuture()->await(), $this->resultType); } - throw new \LogicException('Future is not complete'); + throw new LogicException('Future is not complete'); } public function hasResult(): bool diff --git a/src/OrchestrationContext.php b/src/OrchestrationContext.php index 01019b0e..00b24c73 100644 --- a/src/OrchestrationContext.php +++ b/src/OrchestrationContext.php @@ -70,19 +70,18 @@ final class OrchestrationContext implements OrchestrationContextInterface private int $randomKey = 0; - public function callActivity(string $name, array $args = [], ?RetryOptions $retryOptions = null): DurableFuture + public function callActivity(string $name, ?string $returnType = null, ?RetryOptions $retryOptions = null, mixed ...$args): DurableFuture { $this->durableLogger->debug('Calling activity', ['name' => $name]); $identity = $this->newGuid(); return $this->createFuture( - fn() - => $this->taskController->fire( - AwaitResult::forEvent( - StateId::fromInstance($this->id), - WithActivity::forEvent($identity, ScheduleTask::forName($name, $args)), - ), + fn() => $this->taskController->fire( + AwaitResult::forEvent( + StateId::fromInstance($this->id), + WithActivity::forEvent($identity, ScheduleTask::forName($name, $args)), ), + ), function (Event $event, string $eventIdentity) use ($identity): array { if (($event instanceof TaskCompleted || $event instanceof TaskFailed) && $eventIdentity === $identity->toString()) { @@ -92,6 +91,7 @@ function (Event $event, string $eventIdentity) use ($identity): array { return [null, false]; }, $identity->toString(), + $returnType, ); } @@ -113,9 +113,10 @@ private function createFuture( Closure $onSent, Closure $onReceived, ?string $identity = null, + ?string $resultType = null, ): DurableFuture { $identity ??= $this->history->historicalTaskResults->getIdentity(); - if (!$this->history->historicalTaskResults->hasSentIdentity($identity)) { + if (! $this->history->historicalTaskResults->hasSentIdentity($identity)) { $this->durableLogger->debug('Future requested for an unsent identity', [$identity]); [$eventId] = $onSent(); $deferred = new DeferredFuture(); @@ -129,7 +130,7 @@ private function createFuture( $this->durableLogger->debug('Future requested for a sent identity, processing future', [$identity]); $deferred = new DeferredFuture(); - $future = new DurableFuture($deferred); + $future = new DurableFuture($deferred, $resultType); $this->history->historicalTaskResults->trackFuture($onReceived, $future); return $future; @@ -177,9 +178,9 @@ public function callActivityInline(Closure $activity): DurableFuture public function callSubOrchestrator( string $name, - array $args = [], ?string $instanceId = null, ?RetryOptions $retryOptions = null, + mixed ...$args, ): DurableFuture { throw new LogicException('Not implemented'); } @@ -214,13 +215,12 @@ public function createTimer(DateTimeImmutable|DateInterval $fireAt): DurableFutu $identity = sha1($fireAt->format('c')); return $this->createFuture( - fn() - => $this->taskController->fire( - WithOrchestration::forInstance( - StateId::fromInstance($this->id), - WithDelay::forEvent($fireAt, RaiseEvent::forTimer($identity)), - ), + fn() => $this->taskController->fire( + WithOrchestration::forInstance( + StateId::fromInstance($this->id), + WithDelay::forEvent($fireAt, RaiseEvent::forTimer($identity)), ), + ), function (Event $event) use ($identity): array { if ($event instanceof RaiseEvent && $event->eventName === $identity) { return [$event, true]; @@ -236,10 +236,10 @@ public function getCurrentTime(): DateTimeImmutable return $this->history->historicalTaskResults->getCurrentTime(); } - public function waitForExternalEvent(string $name): DurableFuture + public function waitForExternalEvent(string $name, ?string $resultType = null): DurableFuture { $this->durableLogger->debug('Waiting for external event', ['name' => $name]); - $future = new DurableFuture(new DeferredFuture()); + $future = new DurableFuture(new DeferredFuture(), $resultType); $this->history->historicalTaskResults->trackFuture(function (Event $event) use ($name): array { $found = false; $result = null; @@ -370,7 +370,7 @@ public function isLockedOwned(EntityId $entityId): bool public function lockEntity(EntityId ...$entityId): EntityLock { $this->durableLogger->debug('Locking entities', ['entityId' => $entityId]); - if (!empty($this->history->locks ?? []) && !$this->isReplaying()) { + if (! empty($this->history->locks ?? []) && ! $this->isReplaying()) { throw new LogicException('Cannot lock an entity while holding locks'); } @@ -447,7 +447,7 @@ public function waitAll(DurableFuture ...$tasks): array /** * @template T * - * @param class-string $className + * @param class-string $className * @return T */ public function createEntityProxy(string $className, ?EntityId $entityId = null): object @@ -457,7 +457,7 @@ public function createEntityProxy(string $className, ?EntityId $entityId = null) } $class = new ReflectionClass($className); - if (!$class->isInterface()) { + if (! $class->isInterface()) { throw new LogicException('Only interfaces can be proxied'); } @@ -577,7 +577,7 @@ public function entityOp(string|EntityId $id, Closure $operation): mixed } $name = $type->getName(); - if (!interface_exists($name)) { + if (! interface_exists($name)) { throw new LogicException('Unable to load interface: ' . $name); } diff --git a/src/OrchestrationContextInterface.php b/src/OrchestrationContextInterface.php index 52afe692..5f7d4f91 100644 --- a/src/OrchestrationContextInterface.php +++ b/src/OrchestrationContextInterface.php @@ -41,11 +41,12 @@ interface OrchestrationContextInterface * @template T * * @param string $name The name of the function to remotely invoke - * @param array $args The arguments to pass to the function + * @param class-string|null $returnType * @param RetryOptions|null $retryOptions How to retry on failure + * @param array $args The arguments to pass to the function * @return DurableFuture */ - public function callActivity(string $name, array $args = [], ?RetryOptions $retryOptions = null): DurableFuture; + public function callActivity(string $name, ?string $returnType = null, ?RetryOptions $retryOptions = null, mixed ...$args): DurableFuture; /** * Calls an activity inline. There are no retries and exceptions will cause an immediate failure. @@ -94,9 +95,9 @@ public function lockEntity(EntityId ...$entityId): EntityLock; public function callSubOrchestrator( string $name, - array $args = [], ?string $instanceId = null, ?RetryOptions $retryOptions = null, + mixed ...$args, ): DurableFuture; public function continueAsNew(array $args = []): never; @@ -133,9 +134,10 @@ public function setCustomStatus(string $customStatus): void; * * @template T * + * @param class-string|null $resultType * @return DurableFuture */ - public function waitForExternalEvent(string $name): DurableFuture; + public function waitForExternalEvent(string $name, ?string $resultType = null): DurableFuture; /** * Gets the current time in a deterministic way. (always the time the execution started) @@ -190,11 +192,20 @@ public function waitAny(DurableFuture ...$tasks): DurableFuture; /** * Returns once all futures have completed. + * + * @template-covariant T + * + * @return array */ public function waitAll(DurableFuture ...$tasks): array; /** * Returns the result (or throws on failure) once a single future has completed. + * + * @template T + * + * @param DurableFuture $task + * @return T */ public function waitOne(DurableFuture $task): mixed; diff --git a/src/RemoteOrchestrationClient.php b/src/RemoteOrchestrationClient.php index 1be81766..39b2d592 100644 --- a/src/RemoteOrchestrationClient.php +++ b/src/RemoteOrchestrationClient.php @@ -35,7 +35,9 @@ use Bottledcode\DurablePhp\State\Status; use Exception; use Generator; +use JsonException; use Override; +use RuntimeException; use Withinboredom\Time\Unit; use function Withinboredom\Time\Hours; @@ -94,7 +96,11 @@ public function getStatus(OrchestrationInstance $instance): Status while ($result->getBody()->isReadable()) { $body .= $result->getBody()->buffer(); } - $result = json_decode($body, true, 512, JSON_THROW_ON_ERROR); + try { + $result = json_decode($body, true, flags: JSON_THROW_ON_ERROR); + } catch (JsonException $exception) { + throw new RuntimeException('Failed to decode JSON: ' . $body, previous: $exception); + } return Serializer::deserialize($result, Status::class); } diff --git a/src/Testing/DummyOrchestrationContext.php b/src/Testing/DummyOrchestrationContext.php index 9b4453e2..e62a9b62 100644 --- a/src/Testing/DummyOrchestrationContext.php +++ b/src/Testing/DummyOrchestrationContext.php @@ -106,8 +106,9 @@ public function asUser(string $userId): void public function callActivity( string $name, - array $args = [], + ?string $returnType = null, ?RetryOptions $retryOptions = null, + mixed ...$args, ): DurableFuture { $future = new DeferredFuture(); if ($this->activities[$name] ?? false) { @@ -118,7 +119,7 @@ public function callActivity( $future->complete($result); } - return new DurableFuture($future); + return new DurableFuture($future, $returnType); } throw new LogicException('Failed to find registered activity: ' . $name); @@ -147,7 +148,7 @@ public function entityOp(EntityId|string $id, Closure $operation): mixed } $name = $type->getName(); - if (!interface_exists($name)) { + if (! interface_exists($name)) { throw new LogicException('Unable to load interface: ' . $name); } @@ -230,9 +231,9 @@ public function lockEntity(EntityId ...$entityId): EntityLock public function callSubOrchestrator( string $name, - array $args = [], ?string $instanceId = null, ?RetryOptions $retryOptions = null, + mixed ...$args, ): DurableFuture { throw new LogicException('Not implemented'); } @@ -260,13 +261,13 @@ public function setCustomStatus(string $customStatus): void $this->status = $this->status->with(customStatus: $customStatus); } - public function waitForExternalEvent(string $name): DurableFuture + public function waitForExternalEvent(string $name, ?string $resultType = null): DurableFuture { $future = new DeferredFuture(); $value = $this->events[$name] ?? throw new LogicException('Event not found: ' . $name); $future->complete($value); - return new DurableFuture($future); + return new DurableFuture($future, $resultType); } public function getCurrentTime(): DateTimeImmutable @@ -358,7 +359,7 @@ public function waitAll(DurableFuture ...$tasks): array { $results = []; foreach ($tasks as $task) { - if (!$task->future->isComplete()) { + if (! $task->future->isComplete()) { throw new LogicException('Not all futures are completed'); } $results[] = $task->getResult(); @@ -376,7 +377,7 @@ public function createEntityProxy( } $class = new ReflectionClass($className); - if (!$class->isInterface()) { + if (! $class->isInterface()) { throw new LogicException('Only interfaces can be proxied'); } diff --git a/tests/PerformanceTests/Dockerfile b/tests/PerformanceTests/Dockerfile index 1fe18aab..3e6396c0 100644 --- a/tests/PerformanceTests/Dockerfile +++ b/tests/PerformanceTests/Dockerfile @@ -1,4 +1,4 @@ -FROM php:8.3-zts +FROM php:8.4-zts COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/local/bin/ diff --git a/tests/PerformanceTests/FanOutFanIn.php b/tests/PerformanceTests/FanOutFanIn.php index 3565d185..74fed973 100644 --- a/tests/PerformanceTests/FanOutFanIn.php +++ b/tests/PerformanceTests/FanOutFanIn.php @@ -1,4 +1,5 @@ getInput()['count']; $tasks = []; for ($i = 0; $i < $count; $i++) { - $tasks[] = $context->callActivity(SayHello::class, [mb_str_pad((string) $i, 4, '0', STR_PAD_LEFT)]); + $tasks[] = $context->callActivity(SayHello::class, null, null, mb_str_pad((string) $i, 4, '0', STR_PAD_LEFT)); } $context->waitAll(...$tasks); foreach ($tasks as $i => $task) { diff --git a/tests/PerformanceTests/HelloCities/HelloSequence.php b/tests/PerformanceTests/HelloCities/HelloSequence.php index 18195d78..0f4be682 100644 --- a/tests/PerformanceTests/HelloCities/HelloSequence.php +++ b/tests/PerformanceTests/HelloCities/HelloSequence.php @@ -1,4 +1,5 @@ callActivity(SayHello::class, ['Tokyo']), - $context->callActivity(SayHello::class, ['Seattle']), - $context->callActivity(SayHello::class, ['London']), - $context->callActivity(SayHello::class, ['Amsterdam']), - $context->callActivity(SayHello::class, ['Seoul']), + $context->callActivity(SayHello::class, null, null, 'Tokyo'), + $context->callActivity(SayHello::class, null, null, 'Seattle'), + $context->callActivity(SayHello::class, null, null, 'London'), + $context->callActivity(SayHello::class, null, null, 'Amsterdam'), + $context->callActivity(SayHello::class, null, null, 'Seoul'), ]; return $context->waitAll(...$outputs); diff --git a/tests/PerformanceTests/Sequence.php b/tests/PerformanceTests/Sequence.php index b3555a76..738a2236 100644 --- a/tests/PerformanceTests/Sequence.php +++ b/tests/PerformanceTests/Sequence.php @@ -1,4 +1,5 @@ getInput(); $results = []; foreach ($sequence as $value) { - $results[] = $context->waitOne($context->callActivity(SayHello::class, ['name' => $value])); + $results[] = $context->waitOne($context->callActivity(SayHello::class, null, null, $value)); } return $results; diff --git a/tests/Unit/OrchestrationHistoryTest.php b/tests/Unit/OrchestrationHistoryTest.php index 93e9e1f9..68a490f8 100644 --- a/tests/Unit/OrchestrationHistoryTest.php +++ b/tests/Unit/OrchestrationHistoryTest.php @@ -196,7 +196,7 @@ public function entry(string $test, SerializedType $type): string it('can call an activity with a successful result', function (): void { $instance = getOrchestration( 'test', - fn(OrchestrationContext $context) => $context->waitOne($context->callActivity('test', ['hello world'])), + fn(OrchestrationContext $context) => $context->waitOne($context->callActivity('test', null, null, args: 'hello world')), [], $nextEvent, ); @@ -215,16 +215,14 @@ public function entry(string $test, SerializedType $type): string }); it('can call an activity with a successful result (example)', function (): void { - $instance = fn(OrchestrationContextInterface $context) - => $context->waitOne($context->callActivity('test', ['hello world'])); + $instance = fn(OrchestrationContextInterface $context) => $context->waitOne($context->callActivity('test', args: 'hello world')); $context = new DummyOrchestrationContext($instance, []); $context->handleActivities(new ActivityMock('test', 'pretty colors')); expect($instance($context))->toBe(['pretty colors']); }); it('can call an activity with a failed result (example)', function (): void { - $instance = fn(OrchestrationContextInterface $context) - => $context->waitOne($context->callActivity('test', ['hello world'])); + $instance = fn(OrchestrationContextInterface $context) => $context->waitOne($context->callActivity('test', args: 'hello world')); $context = new DummyOrchestrationContext($instance, []); $context->handleActivities(new ActivityMock('test', new Exception('hello world'))); expect(fn() => $instance($context))->toThrow(Exception::class, 'hello world'); @@ -233,7 +231,7 @@ public function entry(string $test, SerializedType $type): string it('can call an activity with a failed result', function (): void { $instance = getOrchestration( 'test', - fn(OrchestrationContext $context) => $context->waitOne($context->callActivity('test', ['hello world'])), + fn(OrchestrationContext $context) => $context->waitOne($context->callActivity('test', args: 'hello world')), [], $nextEvent, );