diff --git a/.github/workflows/build-cli.yaml b/.github/workflows/build-cli.yaml index 7d0f6a88..05cb2571 100644 --- a/.github/workflows/build-cli.yaml +++ b/.github/workflows/build-cli.yaml @@ -189,8 +189,6 @@ jobs: dphp exec tests/PerformanceTests/PerformanceClient.php echo "Running fan out/in test" dphp exec tests/PerformanceTests/FanOutFanInClient.php - echo "Running seq test" - dphp exec tests/PerformanceTests/SequenceClient.php - uses: peter-evans/find-comment@v3 continue-on-error: true id: fc diff --git a/Dockerfile b/Dockerfile index 88c9ce49..02fefded 100644 --- a/Dockerfile +++ b/Dockerfile @@ -78,7 +78,7 @@ RUN install-php-extensions @composer apcu bcmath bz2 calendar ctype curl dom exi FROM common AS builder COPY --from=golang-base /usr/local/go /usr/local/go -ENV PATH /usr/local/go/bin:$PATH +ENV PATH=/usr/local/go/bin:$PATH RUN apt-get update && \ apt-get -y --no-install-recommends install \ diff --git a/cli/cli.go b/cli/cli.go index bd4ebf1a..67ed5d9e 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -565,6 +565,7 @@ func main() { WithOption(cli.NewOption("no-api", "Disable the api server").WithType(cli.TypeBool)). WithOption(cli.NewOption("verbose", "Enable info level logging").WithType(cli.TypeBool)). WithOption(cli.NewOption("debug", "Enable debug logging").WithType(cli.TypeBool)). + WithOption(cli.NewOption("typesense-url", "The url to the typesense server").WithType(cli.TypeString)). WithCommand(run). WithCommand(initCmd). WithCommand(version). diff --git a/cli/config/config.go b/cli/config/config.go index a8698435..52417ddf 100644 --- a/cli/config/config.go +++ b/cli/config/config.go @@ -114,6 +114,9 @@ func ApplyOptions(config *Config, options map[string]string) (*Config, error) { config.Nat.Url = options["nats-server"] config.Nat.Internal = false } + if options["typesense-url"] != "" { + config.Extensions.Search.Url = options["typesense-url"] + } if options["bootstrap"] != "" { config.Bootstrap = options["bootstrap"] } diff --git a/cli/glue/glue.go b/cli/glue/glue.go index 3b7781be..9ba9e784 100644 --- a/cli/glue/glue.go +++ b/cli/glue/glue.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "path/filepath" + "strings" "sync" ) @@ -98,9 +99,15 @@ func FromApiRequest(ctx context.Context, r *http.Request, function Method, logge env := make(map[string]string) env["FROM_REQUEST"] = "1" env["STATE_ID"] = id.String() + remoteAddr := strings.Split(r.RemoteAddr, ":")[0] + env["REMOTE_ADDR"] = remoteAddr msgs, responseHeaders, _, deleteAfter := glu.Execute(ctx, headers, logger, env, stream, id) + for _, msg := range msgs { + msg.Header.Add("Remote-Addr", remoteAddr) + } + return msgs, temp.Name(), nil, &responseHeaders, deleteAfter } diff --git a/cli/go.mod b/cli/go.mod index 09ce8768..0738cb64 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -1,6 +1,6 @@ module durable_php -go 1.22 +go 1.23 require github.com/dunglas/frankenphp v1.2.5 diff --git a/cli/lib/consumer.go b/cli/lib/consumer.go index e847be3b..142e16ce 100644 --- a/cli/lib/consumer.go +++ b/cli/lib/consumer.go @@ -289,6 +289,7 @@ func processMsg(ctx context.Context, logger *zap.Logger, msg jetstream.Msg, js j headers.Add("X-Correlation-ID", ctx.Value("cid").(string)) env["EVENT"] = string(msg.Data()) env["STATE_ID"] = msg.Headers().Get(string(glue.HeaderStateId)) + env["REMOTE_ADDR"] = msg.Headers().Get("Remote-Addr") msgs, headers, _, deleteAfter := glu.Execute(ctx, headers, logger, env, js, id) diff --git a/src/Events/EventQueue.php b/src/Events/EventQueue.php index 54a407aa..8ab8d867 100644 --- a/src/Events/EventQueue.php +++ b/src/Events/EventQueue.php @@ -25,9 +25,14 @@ namespace Bottledcode\DurablePhp\Events; use Amp\DeferredCancellation; +use DateTimeImmutable; use Revolt\EventLoop; use SplQueue; use Withinboredom\Time\Seconds; +use Withinboredom\Time\Time; +use Withinboredom\Time\TimeUnit; + +use function Withinboredom\Time\Seconds; class EventQueue { @@ -96,8 +101,8 @@ public function getNext(array $requeueKeys): Event|null public function enqueue(string $key, Event $event): void { $delay = $this->getDelay($event); - if ($delay->inSeconds() > 0) { - EventLoop::delay($delay->inSeconds(), function () use ($key, $event): void { + if ($delay->as(TimeUnit::Seconds) > 0) { + EventLoop::delay($delay->as(TimeUnit::Seconds), function () use ($key, $event): void { $this->enqueue($key, $event); if ($this->cancellation !== null) { $this->cancellation?->cancel(); @@ -116,14 +121,14 @@ public function enqueue(string $key, Event $event): void $this->size++; } - private function getDelay(Event $event): Seconds + private function getDelay(Event $event): Time { while ($event instanceof HasInnerEventInterface) { if ($event instanceof WithDelay) { $at = $event->fireAt->getTimestamp(); - $now = (new \DateTimeImmutable())->getTimestamp(); + $now = (new DateTimeImmutable())->getTimestamp(); $seconds = $at - $now; - return new Seconds(max(0, $seconds)); + return Seconds(max(0, $seconds)); } $event = $event->getInnerEvent(); diff --git a/src/Glue/glue.php b/src/Glue/glue.php index 9aafd5fe..ebc6f123 100644 --- a/src/Glue/glue.php +++ b/src/Glue/glue.php @@ -57,6 +57,7 @@ use Ramsey\Uuid\Uuid; use ReflectionClass; use ReflectionFunction; +use Withinboredom\Time\TimeUnit; require_once __DIR__ . '/autoload.php'; @@ -85,18 +86,21 @@ public function __construct(private DurableLogger $logger) $this->method = $_SERVER['HTTP_DPHP_FUNCTION']; try { $provenance = json_decode($_SERVER['HTTP_DPHP_PROVENANCE'] ?? 'null', true, 32, JSON_THROW_ON_ERROR); - if (! $provenance || $provenance === ['userId' => '', 'roles' => null]) { + if (!$provenance || $provenance === ['userId' => '', 'roles' => null]) { $this->provenance = null; } else { $provenance['roles'] ??= []; $this->provenance = Serializer::deserialize($provenance, Provenance::class); } } catch (JsonException $e) { - $this->logger->alert('Failed to capture provenance', ['provenance' => $_SERVER['HTTP_DPHP_PROVENANCE'] ?? null]); + $this->logger->alert( + 'Failed to capture provenance', + ['provenance' => $_SERVER['HTTP_DPHP_PROVENANCE'] ?? null], + ); $this->provenance = null; } - if (! file_exists($_SERVER['HTTP_DPHP_PAYLOAD'])) { + if (!file_exists($_SERVER['HTTP_DPHP_PAYLOAD'])) { throw new LogicException('Unable to load payload'); } @@ -186,14 +190,20 @@ public function outputEvent(EventDescription $event): void private function startOrchestration(): void { - if (! $this->target->toOrchestrationInstance()->executionId) { - $this->target = StateId::fromInstance(new OrchestrationInstance($this->target->toOrchestrationInstance()->instanceId, Uuid::uuid7()->toString())); + if (!$this->target->toOrchestrationInstance()->executionId) { + $this->target = StateId::fromInstance( + new OrchestrationInstance( + $this->target->toOrchestrationInstance()->instanceId, + Uuid::uuid7()->toString(), + ), + ); } header('X-Id: ' . $this->target->id); $input = SerializedArray::import($this->payload['input'])->toArray(); - $event = WithOrchestration::forInstance($this->target, StartExecution::asParent($input, []/* todo: scheduling */)); + $event = + WithOrchestration::forInstance($this->target, StartExecution::asParent($input, []/* todo: scheduling */)); $this->outputEvent(new EventDescription($event)); $actualId = $this->target->toOrchestrationInstance(); @@ -301,19 +311,16 @@ private function getPermissions(): void $permissions['mode'] = 'anon'; break; case $attribute->getName() === AllowCreateForRole::class: - /** @var AllowCreateForRole $attribute */ - $attribute = $attribute->newInstance(); + /** @var AllowCreateForRole $attribute */ $attribute = $attribute->newInstance(); $permissions['roles'][] = $attribute->role; break; case $attribute->getName() === AllowCreateForUser::class: - /** @var AllowCreateForUser $attribute */ - $attribute = $attribute->newInstance(); + /** @var AllowCreateForUser $attribute */ $attribute = $attribute->newInstance(); $permissions['users'][] = $attribute->user; break; case $attribute->getName() === TimeToLive::class: - /** @var TimeToLive $attribute */ - $attribute = $attribute->newInstance(); - $permissions['ttl'] = $attribute->timeToLive()->inNanoseconds(); + /** @var TimeToLive $attribute */ $attribute = $attribute->newInstance(); + $permissions['ttl'] = $attribute->timeToLive()->as(TimeUnit::Nanoseconds); break; } }