Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bin/composer.phar
Binary file not shown.
Binary file modified bin/dphp
Binary file not shown.
15 changes: 11 additions & 4 deletions src/DurableFuture.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,34 @@
namespace Bottledcode\DurablePhp;

use Amp\DeferredFuture;
use Bottledcode\DurablePhp\State\Serializer;
use LogicException;

/**
* @template T
*/
class DurableFuture
{
/**
* @param DeferredFuture<T> $future
* @param DeferredFuture<T> $future
* @param class-string<T>|null $resultType
*/
public function __construct(public readonly DeferredFuture $future) {}
public function __construct(public readonly DeferredFuture $future, public readonly ?string $resultType = null) {}

/**
* @return T
*/
public function getResult(): mixed
{
if ($this->future->isComplete()) {
return $this->future->getFuture()->await();
if ($this->resultType === null) {
return $this->future->getFuture()->await();
}

return Serializer::deserialize($this->future->getFuture()->await(), $this->resultType);
}

throw new \LogicException('Future is not complete');
throw new LogicException('Future is not complete');
}

public function hasResult(): bool
Expand Down
44 changes: 22 additions & 22 deletions src/OrchestrationContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,18 @@ final class OrchestrationContext implements OrchestrationContextInterface

private int $randomKey = 0;

public function callActivity(string $name, array $args = [], ?RetryOptions $retryOptions = null): DurableFuture
public function callActivity(string $name, ?string $returnType = null, ?RetryOptions $retryOptions = null, mixed ...$args): DurableFuture
{
$this->durableLogger->debug('Calling activity', ['name' => $name]);
$identity = $this->newGuid();

return $this->createFuture(
fn()
=> $this->taskController->fire(
AwaitResult::forEvent(
StateId::fromInstance($this->id),
WithActivity::forEvent($identity, ScheduleTask::forName($name, $args)),
),
fn() => $this->taskController->fire(
AwaitResult::forEvent(
StateId::fromInstance($this->id),
WithActivity::forEvent($identity, ScheduleTask::forName($name, $args)),
),
),
function (Event $event, string $eventIdentity) use ($identity): array {
if (($event instanceof TaskCompleted || $event instanceof TaskFailed) &&
$eventIdentity === $identity->toString()) {
Expand All @@ -92,6 +91,7 @@ function (Event $event, string $eventIdentity) use ($identity): array {
return [null, false];
},
$identity->toString(),
$returnType,
);
}

Expand All @@ -113,9 +113,10 @@ private function createFuture(
Closure $onSent,
Closure $onReceived,
?string $identity = null,
?string $resultType = null,
): DurableFuture {
$identity ??= $this->history->historicalTaskResults->getIdentity();
if (!$this->history->historicalTaskResults->hasSentIdentity($identity)) {
if (! $this->history->historicalTaskResults->hasSentIdentity($identity)) {
$this->durableLogger->debug('Future requested for an unsent identity', [$identity]);
[$eventId] = $onSent();
$deferred = new DeferredFuture();
Expand All @@ -129,7 +130,7 @@ private function createFuture(
$this->durableLogger->debug('Future requested for a sent identity, processing future', [$identity]);

$deferred = new DeferredFuture();
$future = new DurableFuture($deferred);
$future = new DurableFuture($deferred, $resultType);
$this->history->historicalTaskResults->trackFuture($onReceived, $future);

return $future;
Expand Down Expand Up @@ -177,9 +178,9 @@ public function callActivityInline(Closure $activity): DurableFuture

public function callSubOrchestrator(
string $name,
array $args = [],
?string $instanceId = null,
?RetryOptions $retryOptions = null,
mixed ...$args,
): DurableFuture {
throw new LogicException('Not implemented');
}
Expand Down Expand Up @@ -214,13 +215,12 @@ public function createTimer(DateTimeImmutable|DateInterval $fireAt): DurableFutu
$identity = sha1($fireAt->format('c'));

return $this->createFuture(
fn()
=> $this->taskController->fire(
WithOrchestration::forInstance(
StateId::fromInstance($this->id),
WithDelay::forEvent($fireAt, RaiseEvent::forTimer($identity)),
),
fn() => $this->taskController->fire(
WithOrchestration::forInstance(
StateId::fromInstance($this->id),
WithDelay::forEvent($fireAt, RaiseEvent::forTimer($identity)),
),
),
function (Event $event) use ($identity): array {
if ($event instanceof RaiseEvent && $event->eventName === $identity) {
return [$event, true];
Expand All @@ -236,10 +236,10 @@ public function getCurrentTime(): DateTimeImmutable
return $this->history->historicalTaskResults->getCurrentTime();
}

public function waitForExternalEvent(string $name): DurableFuture
public function waitForExternalEvent(string $name, ?string $resultType = null): DurableFuture
{
$this->durableLogger->debug('Waiting for external event', ['name' => $name]);
$future = new DurableFuture(new DeferredFuture());
$future = new DurableFuture(new DeferredFuture(), $resultType);
$this->history->historicalTaskResults->trackFuture(function (Event $event) use ($name): array {
$found = false;
$result = null;
Expand Down Expand Up @@ -370,7 +370,7 @@ public function isLockedOwned(EntityId $entityId): bool
public function lockEntity(EntityId ...$entityId): EntityLock
{
$this->durableLogger->debug('Locking entities', ['entityId' => $entityId]);
if (!empty($this->history->locks ?? []) && !$this->isReplaying()) {
if (! empty($this->history->locks ?? []) && ! $this->isReplaying()) {
throw new LogicException('Cannot lock an entity while holding locks');
}

Expand Down Expand Up @@ -447,7 +447,7 @@ public function waitAll(DurableFuture ...$tasks): array
/**
* @template T
*
* @param class-string<T> $className
* @param class-string<T> $className
* @return T
*/
public function createEntityProxy(string $className, ?EntityId $entityId = null): object
Expand All @@ -457,7 +457,7 @@ public function createEntityProxy(string $className, ?EntityId $entityId = null)
}

$class = new ReflectionClass($className);
if (!$class->isInterface()) {
if (! $class->isInterface()) {
throw new LogicException('Only interfaces can be proxied');
}

Expand Down Expand Up @@ -577,7 +577,7 @@ public function entityOp(string|EntityId $id, Closure $operation): mixed
}

$name = $type->getName();
if (!interface_exists($name)) {
if (! interface_exists($name)) {
throw new LogicException('Unable to load interface: ' . $name);
}

Expand Down
19 changes: 15 additions & 4 deletions src/OrchestrationContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ interface OrchestrationContextInterface
* @template T
*
* @param string $name The name of the function to remotely invoke
* @param array $args The arguments to pass to the function
* @param class-string<T>|null $returnType
* @param RetryOptions|null $retryOptions How to retry on failure
* @param array $args The arguments to pass to the function
* @return DurableFuture<T>
*/
public function callActivity(string $name, array $args = [], ?RetryOptions $retryOptions = null): DurableFuture;
public function callActivity(string $name, ?string $returnType = null, ?RetryOptions $retryOptions = null, mixed ...$args): DurableFuture;

/**
* Calls an activity inline. There are no retries and exceptions will cause an immediate failure.
Expand Down Expand Up @@ -94,9 +95,9 @@ public function lockEntity(EntityId ...$entityId): EntityLock;

public function callSubOrchestrator(
string $name,
array $args = [],
?string $instanceId = null,
?RetryOptions $retryOptions = null,
mixed ...$args,
): DurableFuture;

public function continueAsNew(array $args = []): never;
Expand Down Expand Up @@ -133,9 +134,10 @@ public function setCustomStatus(string $customStatus): void;
*
* @template T
*
* @param class-string<T>|null $resultType
* @return DurableFuture<T>
*/
public function waitForExternalEvent(string $name): DurableFuture;
public function waitForExternalEvent(string $name, ?string $resultType = null): DurableFuture;

/**
* Gets the current time in a deterministic way. (always the time the execution started)
Expand Down Expand Up @@ -190,11 +192,20 @@ public function waitAny(DurableFuture ...$tasks): DurableFuture;

/**
* Returns once all futures have completed.
*
* @template-covariant T
*
* @return array<T>
*/
public function waitAll(DurableFuture ...$tasks): array;

/**
* Returns the result (or throws on failure) once a single future has completed.
*
* @template T
*
* @param DurableFuture<T> $task
* @return T
*/
public function waitOne(DurableFuture $task): mixed;

Expand Down
8 changes: 7 additions & 1 deletion src/RemoteOrchestrationClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
use Bottledcode\DurablePhp\State\Status;
use Exception;
use Generator;
use JsonException;
use Override;
use RuntimeException;
use Withinboredom\Time\Unit;

use function Withinboredom\Time\Hours;
Expand Down Expand Up @@ -94,7 +96,11 @@ public function getStatus(OrchestrationInstance $instance): Status
while ($result->getBody()->isReadable()) {
$body .= $result->getBody()->buffer();
}
$result = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
try {
$result = json_decode($body, true, flags: JSON_THROW_ON_ERROR);
} catch (JsonException $exception) {
throw new RuntimeException('Failed to decode JSON: ' . $body, previous: $exception);
}

return Serializer::deserialize($result, Status::class);
}
Expand Down
17 changes: 9 additions & 8 deletions src/Testing/DummyOrchestrationContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ public function asUser(string $userId): void

public function callActivity(
string $name,
array $args = [],
?string $returnType = null,
?RetryOptions $retryOptions = null,
mixed ...$args,
): DurableFuture {
$future = new DeferredFuture();
if ($this->activities[$name] ?? false) {
Expand All @@ -118,7 +119,7 @@ public function callActivity(
$future->complete($result);
}

return new DurableFuture($future);
return new DurableFuture($future, $returnType);
}

throw new LogicException('Failed to find registered activity: ' . $name);
Expand Down Expand Up @@ -147,7 +148,7 @@ public function entityOp(EntityId|string $id, Closure $operation): mixed
}

$name = $type->getName();
if (!interface_exists($name)) {
if (! interface_exists($name)) {
throw new LogicException('Unable to load interface: ' . $name);
}

Expand Down Expand Up @@ -230,9 +231,9 @@ public function lockEntity(EntityId ...$entityId): EntityLock

public function callSubOrchestrator(
string $name,
array $args = [],
?string $instanceId = null,
?RetryOptions $retryOptions = null,
mixed ...$args,
): DurableFuture {
throw new LogicException('Not implemented');
}
Expand Down Expand Up @@ -260,13 +261,13 @@ public function setCustomStatus(string $customStatus): void
$this->status = $this->status->with(customStatus: $customStatus);
}

public function waitForExternalEvent(string $name): DurableFuture
public function waitForExternalEvent(string $name, ?string $resultType = null): DurableFuture
{
$future = new DeferredFuture();
$value = $this->events[$name] ?? throw new LogicException('Event not found: ' . $name);
$future->complete($value);

return new DurableFuture($future);
return new DurableFuture($future, $resultType);
}

public function getCurrentTime(): DateTimeImmutable
Expand Down Expand Up @@ -358,7 +359,7 @@ public function waitAll(DurableFuture ...$tasks): array
{
$results = [];
foreach ($tasks as $task) {
if (!$task->future->isComplete()) {
if (! $task->future->isComplete()) {
throw new LogicException('Not all futures are completed');
}
$results[] = $task->getResult();
Expand All @@ -376,7 +377,7 @@ public function createEntityProxy(
}

$class = new ReflectionClass($className);
if (!$class->isInterface()) {
if (! $class->isInterface()) {
throw new LogicException('Only interfaces can be proxied');
}

Expand Down
2 changes: 1 addition & 1 deletion tests/PerformanceTests/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM php:8.3-zts
FROM php:8.4-zts

COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/local/bin/

Expand Down
3 changes: 2 additions & 1 deletion tests/PerformanceTests/FanOutFanIn.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/*
* Copyright ©2024 Robert Landers
*
Expand Down Expand Up @@ -35,7 +36,7 @@ public function __invoke(OrchestrationContextInterface $context): void
$count = $context->getInput()['count'];
$tasks = [];
for ($i = 0; $i < $count; $i++) {
$tasks[] = $context->callActivity(SayHello::class, [mb_str_pad((string) $i, 4, '0', STR_PAD_LEFT)]);
$tasks[] = $context->callActivity(SayHello::class, null, null, mb_str_pad((string) $i, 4, '0', STR_PAD_LEFT));
}
$context->waitAll(...$tasks);
foreach ($tasks as $i => $task) {
Expand Down
11 changes: 6 additions & 5 deletions tests/PerformanceTests/HelloCities/HelloSequence.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/*
* Copyright ©2024 Robert Landers
*
Expand Down Expand Up @@ -33,11 +34,11 @@ class HelloSequence
public function __invoke(OrchestrationContextInterface $context): array
{
$outputs = [
$context->callActivity(SayHello::class, ['Tokyo']),
$context->callActivity(SayHello::class, ['Seattle']),
$context->callActivity(SayHello::class, ['London']),
$context->callActivity(SayHello::class, ['Amsterdam']),
$context->callActivity(SayHello::class, ['Seoul']),
$context->callActivity(SayHello::class, null, null, 'Tokyo'),
$context->callActivity(SayHello::class, null, null, 'Seattle'),
$context->callActivity(SayHello::class, null, null, 'London'),
$context->callActivity(SayHello::class, null, null, 'Amsterdam'),
$context->callActivity(SayHello::class, null, null, 'Seoul'),
];

return $context->waitAll(...$outputs);
Expand Down
3 changes: 2 additions & 1 deletion tests/PerformanceTests/Sequence.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/*
* Copyright ©2024 Robert Landers
*
Expand Down Expand Up @@ -35,7 +36,7 @@ public function __invoke(OrchestrationContextInterface $context)
$sequence = $context->getInput();
$results = [];
foreach ($sequence as $value) {
$results[] = $context->waitOne($context->callActivity(SayHello::class, ['name' => $value]));
$results[] = $context->waitOne($context->callActivity(SayHello::class, null, null, $value));
}

return $results;
Expand Down
Loading
Loading