diff --git a/go/apps/ctrl/run.go b/go/apps/ctrl/run.go index 8ea9b829c9..2f23aef4b8 100644 --- a/go/apps/ctrl/run.go +++ b/go/apps/ctrl/run.go @@ -103,7 +103,8 @@ func Run(ctx context.Context, cfg Config) error { builderService := builder.NewMockService() // Register deployment workflow with Hydra worker - deployWorkflow := version.NewDeployWorkflow(database, logger, builderService) + // TODO: Replace nil with actual metald client when available + deployWorkflow := version.NewDeployWorkflow(database, logger, builderService, nil) err = hydra.RegisterWorkflow(hydraWorker, deployWorkflow) if err != nil { return fmt.Errorf("unable to register deployment workflow: %w", err) diff --git a/go/apps/ctrl/services/version/deploy_workflow.go b/go/apps/ctrl/services/version/deploy_workflow.go index 29c50374d0..d5408d21f8 100644 --- a/go/apps/ctrl/services/version/deploy_workflow.go +++ b/go/apps/ctrl/services/version/deploy_workflow.go @@ -6,6 +6,9 @@ import ( "fmt" "time" + "connectrpc.com/connect" + vmprovisionerv1 "github.com/unkeyed/unkey/go/gen/proto/metal/vmprovisioner/v1" + "github.com/unkeyed/unkey/go/gen/proto/metal/vmprovisioner/v1/vmprovisionerv1connect" "github.com/unkeyed/unkey/go/pkg/builder" "github.com/unkeyed/unkey/go/pkg/db" "github.com/unkeyed/unkey/go/pkg/hydra" @@ -18,14 +21,16 @@ type DeployWorkflow struct { db db.Database logger logging.Logger builderService builder.Service + metaldClient vmprovisionerv1connect.VmServiceClient } // NewDeployWorkflow creates a new deploy workflow instance -func NewDeployWorkflow(database db.Database, logger logging.Logger, builderService builder.Service) *DeployWorkflow { +func NewDeployWorkflow(database db.Database, logger logging.Logger, builderService builder.Service, metaldClient vmprovisionerv1connect.VmServiceClient) *DeployWorkflow { return &DeployWorkflow{ db: database, logger: logger, builderService: builderService, + metaldClient: metaldClient, } } @@ -140,129 +145,204 @@ func (w *DeployWorkflow) Run(ctx hydra.WorkflowContext, req *DeployRequest) erro return err } - // Step 7: Wait for build completion with polling - buildResult, err := hydra.Step(ctx, "wait-for-completion", func(stepCtx context.Context) (*BuildResult, error) { - w.logger.Info("waiting for build completion", "build_id", buildID) + // Wait for build completion with polling (max 150 attempts = 5 minutes) + var buildResult *BuildResult + lastStatus := "" - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - lastStatus := "" - - for { - select { - case <-stepCtx.Done(): - return nil, stepCtx.Err() - case <-ticker.C: - buildStatus, statusErr := w.builderService.GetBuildStatus(stepCtx, buildID) - if statusErr != nil { - w.logger.Error("failed to get build status", "build_id", buildID, "error", statusErr) - continue - } - - currentStatus := string(buildStatus.Status) + for attempt := 1; attempt <= 150; attempt++ { + currentBuildStatus, err := hydra.Step(ctx, fmt.Sprintf("check-build-status-%d", attempt), func(stepCtx context.Context) (*builder.BuildInfo, error) { + buildStatus, statusErr := w.builderService.GetBuildStatus(stepCtx, buildID) + if statusErr != nil { + return nil, fmt.Errorf("failed to get build status: %w", statusErr) + } - // Only update if status changed - if currentStatus == lastStatus { - continue - } + w.logger.Info("build status check", "build_id", buildID, "status", string(buildStatus.Status), "attempt", attempt) + return buildStatus, nil + }) + if err != nil { + w.logger.Error("failed to check build status", "error", err, "build_id", buildID, "attempt", attempt) + return err + } - w.logger.Info("build status update", "build_id", buildID, "status", currentStatus) - lastStatus = currentStatus - - _, err = hydra.Step(ctx, "update-build-status", func(updateCtx context.Context) (*struct{}, error) { - now := time.Now().UnixMilli() - - switch buildStatus.Status { - case builder.BuildStatusQueued: - // Build is queued, no update needed yet - // Continue polling without database update - - case builder.BuildStatusRunning: - runningErr := db.Query.UpdateBuildStatus(updateCtx, w.db.RW(), db.UpdateBuildStatusParams{ - ID: buildID, - Status: db.BuildsStatusRunning, - Now: sql.NullInt64{Valid: true, Int64: now}, - }) - if runningErr != nil { - return nil, fmt.Errorf("failed to update build status to running: %w", runningErr) - } - - case builder.BuildStatusSuccess: - successErr := db.Query.UpdateBuildSucceeded(updateCtx, w.db.RW(), db.UpdateBuildSucceededParams{ - ID: buildID, - Now: sql.NullInt64{Valid: true, Int64: now}, - }) - if successErr != nil { - return nil, fmt.Errorf("failed to update build status to succeeded: %w", successErr) - } - - case builder.BuildStatusFailed: - failedErr := db.Query.UpdateBuildFailed(updateCtx, w.db.RW(), db.UpdateBuildFailedParams{ - ID: buildID, - ErrorMessage: sql.NullString{String: buildStatus.ErrorMsg, Valid: buildStatus.ErrorMsg != ""}, - Now: sql.NullInt64{Valid: true, Int64: now}, - }) - if failedErr != nil { - return nil, fmt.Errorf("failed to update build status to failed: %w", failedErr) - } - - // Also update version status to failed - versionErr := db.Query.UpdateVersionStatus(updateCtx, w.db.RW(), db.UpdateVersionStatusParams{ - ID: req.VersionID, - Status: db.VersionsStatusFailed, - Now: sql.NullInt64{Valid: true, Int64: now}, - }) - if versionErr != nil { - return nil, fmt.Errorf("failed to update version status to failed: %w", versionErr) - } - } + currentStatus := string(currentBuildStatus.Status) - return &struct{}{}, nil - }) - if err != nil { - w.logger.Error("failed to update build status", "error", err, "status", currentStatus) - if buildStatus.Status != builder.BuildStatusFailed { - continue // For non-failed states, continue polling - } - } + // Skip database update if status hasn't changed + if currentStatus == lastStatus { + // Status unchanged, continue to build completion check + } else { + _, err = hydra.Step(ctx, fmt.Sprintf("update-build-status-%d", attempt), func(updateCtx context.Context) (*struct{}, error) { + now := time.Now().UnixMilli() - // Return appropriate result based on final status - switch buildStatus.Status { + switch currentBuildStatus.Status { case builder.BuildStatusQueued: - // Continue polling, build is still queued - continue + // Build is queued, no update needed yet case builder.BuildStatusRunning: - // Continue polling, build is still running - continue + runningErr := db.Query.UpdateBuildStatus(updateCtx, w.db.RW(), db.UpdateBuildStatusParams{ + ID: buildID, + Status: db.BuildsStatusRunning, + Now: sql.NullInt64{Valid: true, Int64: now}, + }) + if runningErr != nil { + return nil, fmt.Errorf("failed to update build status to running: %w", runningErr) + } case builder.BuildStatusSuccess: - return &BuildResult{ - BuildID: buildID, - Status: "succeeded", - ErrorMsg: "", - }, nil + successErr := db.Query.UpdateBuildSucceeded(updateCtx, w.db.RW(), db.UpdateBuildSucceededParams{ + ID: buildID, + Now: sql.NullInt64{Valid: true, Int64: now}, + }) + if successErr != nil { + return nil, fmt.Errorf("failed to update build status to succeeded: %w", successErr) + } case builder.BuildStatusFailed: - return &BuildResult{ - BuildID: buildID, - Status: "failed", - ErrorMsg: buildStatus.ErrorMsg, - }, fmt.Errorf("build failed: %s", buildStatus.ErrorMsg) + failedErr := db.Query.UpdateBuildFailed(updateCtx, w.db.RW(), db.UpdateBuildFailedParams{ + ID: buildID, + ErrorMessage: sql.NullString{String: currentBuildStatus.ErrorMsg, Valid: currentBuildStatus.ErrorMsg != ""}, + Now: sql.NullInt64{Valid: true, Int64: now}, + }) + if failedErr != nil { + return nil, fmt.Errorf("failed to update build status to failed: %w", failedErr) + } + + // Also update version status to failed + versionErr := db.Query.UpdateVersionStatus(updateCtx, w.db.RW(), db.UpdateVersionStatusParams{ + ID: req.VersionID, + Status: db.VersionsStatusFailed, + Now: sql.NullInt64{Valid: true, Int64: now}, + }) + if versionErr != nil { + return nil, fmt.Errorf("failed to update version status to failed: %w", versionErr) + } } + + return &struct{}{}, nil + }) + if err != nil { + w.logger.Error("failed to update build status", "error", err, "status", currentStatus, "attempt", attempt) + return err + } + lastStatus = currentStatus + } + + // Check if build is complete + switch currentBuildStatus.Status { + case builder.BuildStatusSuccess: + buildResult = &BuildResult{ + BuildID: buildID, + Status: "succeeded", + ErrorMsg: "", + } + goto buildComplete + + case builder.BuildStatusFailed: + buildResult = &BuildResult{ + BuildID: buildID, + Status: "failed", + ErrorMsg: currentBuildStatus.ErrorMsg, + } + goto buildComplete + + default: + // Still building, sleep before next attempt + err = hydra.Sleep(ctx, 2*time.Second) + if err != nil { + w.logger.Error("failed to sleep between build checks", "error", err, "attempt", attempt) + return err } } - }) - if err != nil { - w.logger.Error("build failed", "error", err, "build_id", buildID) - return err } - // Step 8: Deploy if build succeeded + // If we reach here, we exceeded max attempts + return fmt.Errorf("build polling timed out after 150 attempts (5 minutes)") + +buildComplete: + + // Handle build failure + if buildResult.Status == "failed" { + w.logger.Error("build failed", "build_id", buildID, "error", buildResult.ErrorMsg) + return fmt.Errorf("build failed: %s", buildResult.ErrorMsg) + } + + // Deploy if build succeeded if buildResult.Status == "succeeded" { - // Step 8b: Update version status to deploying + // Create VM first + createResult, err := hydra.Step(ctx, "create-vm", func(stepCtx context.Context) (*vmprovisionerv1.CreateVmResponse, error) { + w.logger.Info("creating VM for deployment", "version_id", req.VersionID, "docker_image", req.DockerImage) + + // Hardcoded VM configuration (TemplateStandard + ForDockerImage): + vmConfig := &vmprovisionerv1.VmConfig{ + Cpu: &vmprovisionerv1.CpuConfig{ + VcpuCount: 2, + MaxVcpuCount: 4, + }, + Memory: &vmprovisionerv1.MemoryConfig{ + SizeBytes: 2 * 1024 * 1024 * 1024, // 2GB + MaxSizeBytes: 8 * 1024 * 1024 * 1024, // 8GB + HotplugEnabled: true, + }, + Boot: &vmprovisionerv1.BootConfig{ + KernelPath: "/opt/vm-assets/vmlinux", + KernelArgs: "console=ttyS0 reboot=k panic=1 pci=off", + }, + Storage: []*vmprovisionerv1.StorageDevice{{ + Id: "rootfs", + Path: "/opt/vm-assets/rootfs.ext4", + ReadOnly: false, + IsRootDevice: true, + InterfaceType: "virtio-blk", + Options: map[string]string{ + "docker_image": req.DockerImage, + "auto_build": "true", + }, + }}, + Network: []*vmprovisionerv1.NetworkInterface{{ + Id: "eth0", + InterfaceType: "virtio-net", + Mode: vmprovisionerv1.NetworkMode_NETWORK_MODE_DUAL_STACK, + Ipv4Config: &vmprovisionerv1.IPv4Config{ + Dhcp: true, + }, + Ipv6Config: &vmprovisionerv1.IPv6Config{ + Slaac: true, + PrivacyExtensions: true, + }, + }}, + Console: &vmprovisionerv1.ConsoleConfig{ + Enabled: true, + Output: "/tmp/standard-vm-console.log", + ConsoleType: "serial", + }, + Metadata: map[string]string{ + "template": "standard", + "purpose": "general", + "docker_image": req.DockerImage, + "runtime": "docker", + "version_id": req.VersionID, + "workspace_id": req.WorkspaceID, + "project_id": req.ProjectID, + "created_by": "deploy-workflow", + }, + } + + resp, createErr := w.metaldClient.CreateVm(stepCtx, connect.NewRequest(&vmprovisionerv1.CreateVmRequest{ + Config: vmConfig, + })) + if createErr != nil { + return nil, fmt.Errorf("failed to create VM: %w", createErr) + } + + w.logger.Info("VM created successfully", "vm_id", resp.Msg.VmId, "state", resp.Msg.State.String()) + return resp.Msg, nil + }) + if err != nil { + w.logger.Error("VM creation failed", "error", err, "version_id", req.VersionID) + return err + } + + // Update version status to deploying (after successful VM creation) _, err = hydra.Step(ctx, "update-version-deploying", func(stepCtx context.Context) (*struct{}, error) { w.logger.Info("starting deployment", "version_id", req.VersionID) @@ -281,18 +361,70 @@ func (w *DeployWorkflow) Run(ctx hydra.WorkflowContext, req *DeployRequest) erro return err } - // Step 8c: Simulate deployment process - _, err = hydra.Step(ctx, "simulate-deployment", func(stepCtx context.Context) (*struct{}, error) { - // Simulate deployment process (in real implementation, this would orchestrate actual deployment) - time.Sleep(3 * time.Second) - return &struct{}{}, nil + // Check VM readiness (max 30 attempts = 30 seconds) + for attempt := 1; attempt <= 30; attempt++ { + vmInfo, err := hydra.Step(ctx, fmt.Sprintf("check-vm-status-%d", attempt), func(stepCtx context.Context) (*vmprovisionerv1.GetVmInfoResponse, error) { + resp, getErr := w.metaldClient.GetVmInfo(stepCtx, connect.NewRequest(&vmprovisionerv1.GetVmInfoRequest{ + VmId: createResult.VmId, + })) + if getErr != nil { + return nil, fmt.Errorf("failed to get VM info: %w", getErr) + } + + w.logger.Info("VM status check", "vm_id", createResult.VmId, "state", resp.Msg.State.String(), "attempt", attempt) + return resp.Msg, nil + }) + if err != nil { + w.logger.Error("failed to check VM status", "error", err, "vm_id", createResult.VmId, "attempt", attempt) + return err + } + + // Check if VM is ready for boot + if vmInfo.State == vmprovisionerv1.VmState_VM_STATE_CREATED || + vmInfo.State == vmprovisionerv1.VmState_VM_STATE_RUNNING { + w.logger.Info("VM is ready", "vm_id", createResult.VmId, "state", vmInfo.State.String()) + goto vmReady + } + + // Sleep before next attempt (except on last attempt) + if attempt < 30 { + err = hydra.Sleep(ctx, 1*time.Second) + if err != nil { + w.logger.Error("failed to sleep between VM checks", "error", err, "attempt", attempt) + return err + } + } + } + + // If we reach here, VM never became ready + return fmt.Errorf("VM polling timed out after 30 attempts (30 seconds)") + +vmReady: + + // Boot VM + _, err = hydra.Step(ctx, "boot-vm", func(stepCtx context.Context) (*vmprovisionerv1.BootVmResponse, error) { + w.logger.Info("booting VM", "vm_id", createResult.VmId) + + resp, bootErr := w.metaldClient.BootVm(stepCtx, connect.NewRequest(&vmprovisionerv1.BootVmRequest{ + VmId: createResult.VmId, + })) + if bootErr != nil { + return nil, fmt.Errorf("failed to boot VM: %w", bootErr) + } + + if !resp.Msg.Success { + return nil, fmt.Errorf("VM boot was not successful, state: %s", resp.Msg.State.String()) + } + + w.logger.Info("VM booted successfully", "vm_id", createResult.VmId, "state", resp.Msg.State.String()) + return resp.Msg, nil }) if err != nil { - w.logger.Error("deployment simulation failed", "error", err, "version_id", req.VersionID) + w.logger.Error("VM boot failed", "error", err, "vm_id", createResult.VmId) return err } - // Step 8d: Generate completion timestamp + // Generate completion timestamp completionTime, err := hydra.Step(ctx, "generate-completion-timestamp", func(stepCtx context.Context) (int64, error) { return time.Now().UnixMilli(), nil }) @@ -301,7 +433,7 @@ func (w *DeployWorkflow) Run(ctx hydra.WorkflowContext, req *DeployRequest) erro return err } - // Step 8e: Update version status to active + // Update version status to active _, err = hydra.Step(ctx, "update-version-active", func(stepCtx context.Context) (*DeploymentResult, error) { activeErr := db.Query.UpdateVersionStatus(stepCtx, w.db.RW(), db.UpdateVersionStatusParams{ ID: req.VersionID,