Skip to content
Merged
11 changes: 11 additions & 0 deletions src/Activity.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,15 @@ public static function heartbeat($details): void

$context->heartbeat($details);
}

/**
* Get the currently running activity instance.
*/
public static function getInstance(): object
{
/** @var ActivityContextInterface $context */
$context = self::getCurrentContext();

return $context->getInstance();
}
}
7 changes: 7 additions & 0 deletions src/Activity/ActivityContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,11 @@ public function doNotCompleteOnReturn(): void;
* @param mixed $details
*/
public function heartbeat($details): void;

/**
* Get the currently running activity instance.
*
* @see Activity::getInstance()
*/
public function getInstance(): object;
}
23 changes: 23 additions & 0 deletions src/Internal/Activity/ActivityContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class ActivityContext implements ActivityContextInterface, HeaderCarrier
private ?ValuesInterface $heartbeatDetails;
private ValuesInterface $input;
private HeaderInterface $header;
private ?\WeakReference $instance = null;

public function __construct(
RPCConnectionInterface $rpc,
Expand Down Expand Up @@ -147,4 +148,26 @@ public function heartbeat($details): void
throw ActivityCompletionException::fromActivityInfo($this->info, $e);
}
}

public function getInstance(): object
{
\assert($this->instance !== null, 'Activity instance is not available');
$activity = $this->instance->get();
\assert($activity !== null, 'Activity instance is not available');
return $activity;
}

/**
* Set activity instance.
*
* @param object $instance Activity instance.
* @return $this
* @internal
*/
public function withInstance(object $instance): self
{
$clone = clone $this;
$clone->instance = \WeakReference::create($instance);
return $clone;
}
}
15 changes: 15 additions & 0 deletions src/Internal/Declaration/Prototype/WorkflowPrototype.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Temporal\Common\CronSchedule;
use Temporal\Common\MethodRetry;
use Temporal\Workflow\ReturnType;
use Temporal\Workflow\WorkflowInit;

final class WorkflowPrototype extends Prototype
{
Expand All @@ -40,6 +41,20 @@ final class WorkflowPrototype extends Prototype
private ?CronSchedule $cronSchedule = null;
private ?MethodRetry $methodRetry = null;
private ?ReturnType $returnType = null;
private bool $hasInitializer = false;

/**
* Indicates if the workflow has a constructor with {@see WorkflowInit} attribute.
*/
public function hasInitializer(): bool
{
return $this->hasInitializer;
}

public function setHasInitializer(bool $hasInitializer): void
{
$this->hasInitializer = $hasInitializer;
}

public function getCronSchedule(): ?CronSchedule
{
Expand Down
8 changes: 8 additions & 0 deletions src/Internal/Declaration/Reader/WorkflowReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Temporal\Workflow\SignalMethod;
use Temporal\Workflow\UpdateMethod;
use Temporal\Workflow\UpdateValidatorMethod;
use Temporal\Workflow\WorkflowInit;
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;

Expand Down Expand Up @@ -125,6 +126,13 @@ private function withMethods(ClassNode $graph, WorkflowPrototype $prototype): Wo
foreach ($class->getMethods() as $method) {
$contextClass = $method->getDeclaringClass();

// Check WorkflowInit method
if ($method->isConstructor()) {
$this->getAttributedMethod($graph, $method, WorkflowInit::class);
$prototype->setHasInitializer(true);
continue;
}

/** @var UpdateMethod|null $update */
$update = $this->getAttributedMethod($graph, $method, UpdateMethod::class);

Expand Down
20 changes: 3 additions & 17 deletions src/Internal/Declaration/WorkflowInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
use Temporal\Internal\Declaration\WorkflowInstance\SignalQueue;
use Temporal\Internal\Interceptor;
use Temporal\Workflow\WorkflowInit;

/**
* @psalm-type QueryHandler = \Closure(QueryInput): mixed
Expand Down Expand Up @@ -149,17 +148,7 @@ public function init(array $arguments = []): void
return;
}

if ($arguments === []) {
$this->context->__construct();
return;
}

// Check InitMethod attribute
$reflection = new \ReflectionMethod($this->context, '__construct');
$attributes = $reflection->getAttributes(WorkflowInit::class);
$attributes === []
? $this->context->__construct()
: $this->context->__construct(...$arguments);
$this->context->__construct(...$arguments);
}

public function getSignalQueue(): SignalQueue
Expand Down Expand Up @@ -296,15 +285,12 @@ public function setDynamicUpdateHandler(callable $handler, ?callable $validator
static fn(ValuesInterface $arguments): mixed => $validator($input->updateName, $arguments),
);

$this->updateDynamicHandler = $this->pipeline->with(
$this->updateDynamicHandler =
fn(UpdateInput $input, Deferred $deferred): mixed => ($this->updateExecutor)(
$input,
static fn(ValuesInterface $arguments): mixed => $handler($input->updateName, $arguments),
$deferred,
),
/** @see WorkflowInboundCallsInterceptor::handleUpdate() */
'handleUpdate',
)(...);
);
}

public function clearSignalQueue(): void
Expand Down
7 changes: 6 additions & 1 deletion src/Internal/Transport/Router/InvokeActivity.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
$prototype = $this->findDeclarationOrFail($context->getInfo());

try {
$handler = $prototype->getInstance()->getHandler();
// Create ActivityInstance
$instance = $prototype->getInstance();

// Register Activity instance in the context
$context = $context->withInstance($instance->getContext());
$handler = $instance->getHandler();

// Define Context for interceptors Pipeline
Activity::setCurrentContext($context);
Expand Down
37 changes: 4 additions & 33 deletions src/Internal/Transport/Router/StartWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
use Temporal\Common\TypedSearchAttributes;
use Temporal\DataConverter\EncodedCollection;
use Temporal\DataConverter\EncodedValues;
use Temporal\Interceptor\WorkflowInbound\WorkflowInput;
use Temporal\Interceptor\WorkflowInboundCallsInterceptor;
use Temporal\Internal\Declaration\Instantiator\WorkflowInstantiator;
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
use Temporal\Internal\ServiceContainer;
Expand Down Expand Up @@ -92,38 +90,11 @@ public function handle(ServerRequestInterface $request, array $headers, Deferred
);
$runId = $request->getID();

$starter = function (WorkflowInput $input) use (
$resolver,
$instance,
$context,
$runId,
): void {
$context = $context->withInput(new Input($input->info, $input->arguments, $input->header));
$process = new Process($this->services, $context, $runId);
$this->services->running->add($process);
$resolver->resolve(EncodedValues::fromValues([null]));

$process->start($instance->getHandler(), $context->getInput(), $this->wfStartDeferred);
};

// Define Context for interceptors Pipeline
Workflow::setCurrentContext($context);

// Run workflow handler in an interceptor pipeline
$this->services->interceptorProvider
->getPipeline(WorkflowInboundCallsInterceptor::class)
->with(
$starter,
/** @see WorkflowInboundCallsInterceptor::execute() */
'execute',
)(
new WorkflowInput(
$context->getInfo(),
$context->getInput(),
$context->getHeader(),
$context->isReplaying(),
),
);
$process = new Process($this->services, $runId, $instance);
$this->services->running->add($process);
$resolver->resolve(EncodedValues::fromValues([null]));
$process->initAndStart($context, $instance, $this->wfStartDeferred);
}

private function findWorkflowOrFail(WorkflowInfo $info): WorkflowPrototype
Expand Down
Loading