-
Notifications
You must be signed in to change notification settings - Fork 610
feat: implement krane gateway RPCs for docker #4365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8d0122f
7bb2d55
67c453c
4693bcf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| package docker | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "connectrpc.com/connect" | ||
| "github.com/docker/docker/api/types/container" | ||
| "github.com/docker/docker/api/types/network" | ||
| "github.com/docker/go-connections/nat" | ||
| kranev1 "github.com/unkeyed/unkey/go/gen/proto/krane/v1" | ||
| ) | ||
|
|
||
| // CreateGateway creates containers for a gateway with the specified replica count. | ||
| // | ||
| // Creates multiple containers with shared labels, dynamic port mapping to port 8040, | ||
| // and resource limits. Returns GATEWAY_STATUS_PENDING as containers may not be | ||
| // immediately ready. | ||
| func (d *docker) CreateGateway(ctx context.Context, req *connect.Request[kranev1.CreateGatewayRequest]) (*connect.Response[kranev1.CreateGatewayResponse], error) { | ||
| gateway := req.Msg.GetGateway() | ||
| d.logger.Info("creating gateway", | ||
| "gateway_id", gateway.GetGatewayId(), | ||
| "image", gateway.GetImage(), | ||
| ) | ||
|
|
||
| // Ensure image exists locally (pull if not present) | ||
| if err := d.ensureImageExists(ctx, gateway.GetImage()); err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, | ||
| fmt.Errorf("failed to ensure image exists: %w", err)) | ||
| } | ||
|
|
||
| // Configure port mapping | ||
| exposedPorts := nat.PortSet{ | ||
| "8040/tcp": struct{}{}, | ||
| } | ||
|
|
||
| portBindings := nat.PortMap{ | ||
| "8040/tcp": []nat.PortBinding{ | ||
| { | ||
| HostIP: "0.0.0.0", | ||
| HostPort: "0", // Docker will assign a random available port | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| // Configure resource limits | ||
| cpuNanos := int64(gateway.GetCpuMillicores()) * 1_000_000 // Convert millicores to nanoseconds | ||
| memoryBytes := int64(gateway.GetMemorySizeMib()) * 1024 * 1024 //nolint:gosec // Intentional conversion | ||
|
|
||
| //nolint:exhaustruct // Docker SDK types have many optional fields | ||
| containerConfig := &container.Config{ | ||
| Image: gateway.GetImage(), | ||
| Labels: map[string]string{ | ||
| "unkey.gateway.id": gateway.GetGatewayId(), | ||
| "unkey.managed.by": "krane", | ||
| }, | ||
| ExposedPorts: exposedPorts, | ||
| Env: []string{ | ||
| fmt.Sprintf("UNKEY_WORKSPACE_ID=%s", gateway.GetWorkspaceId()), | ||
| fmt.Sprintf("UNKEY_GATEWAY_ID=%s", gateway.GetGatewayId()), | ||
| fmt.Sprintf("UNKEY_IMAGE=%s", gateway.GetImage()), | ||
| }, | ||
| } | ||
|
|
||
| //nolint:exhaustruct // Docker SDK types have many optional fields | ||
| hostConfig := &container.HostConfig{ | ||
| PortBindings: portBindings, | ||
| RestartPolicy: container.RestartPolicy{ | ||
| Name: "unless-stopped", | ||
| }, | ||
| Resources: container.Resources{ | ||
| NanoCPUs: cpuNanos, | ||
| Memory: memoryBytes, | ||
| }, | ||
| } | ||
|
|
||
| //nolint:exhaustruct // Docker SDK types have many optional fields | ||
| networkConfig := &network.NetworkingConfig{} | ||
|
|
||
| // Create container | ||
|
|
||
| for i := range req.Msg.GetGateway().GetReplicas() { | ||
| //nolint:exhaustruct // Docker SDK types have many optional fields | ||
| resp, err := d.client.ContainerCreate( | ||
| ctx, | ||
| containerConfig, | ||
| hostConfig, | ||
| networkConfig, | ||
| nil, | ||
| fmt.Sprintf("%s-%d", gateway.GetGatewayId(), i), | ||
| ) | ||
| if err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to create container: %w", err)) | ||
| } | ||
|
|
||
| //nolint:exhaustruct // Docker SDK types have many optional fields | ||
| err = d.client.ContainerStart(ctx, resp.ID, container.StartOptions{}) | ||
| if err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to start container: %w", err)) | ||
| } | ||
| } | ||
|
|
||
| return connect.NewResponse(&kranev1.CreateGatewayResponse{ | ||
| Status: kranev1.GatewayStatus_GATEWAY_STATUS_PENDING, | ||
| }), nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| package docker | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "connectrpc.com/connect" | ||
| "github.com/docker/docker/api/types/container" | ||
| "github.com/docker/docker/api/types/filters" | ||
| kranev1 "github.com/unkeyed/unkey/go/gen/proto/krane/v1" | ||
| ) | ||
|
|
||
| // DeleteGateway removes all containers for a gateway. | ||
| // | ||
| // Finds containers by gateway ID label and forcibly removes them with | ||
| // volumes and network links to ensure complete cleanup. | ||
| func (d *docker) DeleteGateway(ctx context.Context, req *connect.Request[kranev1.DeleteGatewayRequest]) (*connect.Response[kranev1.DeleteGatewayResponse], error) { | ||
| gatewayID := req.Msg.GetGatewayId() | ||
|
|
||
| d.logger.Info("getting gateway", "gateway_id", gatewayID) | ||
|
|
||
| containers, err := d.client.ContainerList(ctx, container.ListOptions{ | ||
| Size: false, | ||
| Latest: false, | ||
| Since: "", | ||
| Before: "", | ||
| Limit: 0, | ||
| All: true, | ||
| Filters: filters.NewArgs( | ||
| filters.Arg("label", fmt.Sprintf("unkey.gateway.id=%s", gatewayID)), | ||
| ), | ||
| }) | ||
| if err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to list containers: %w", err)) | ||
| } | ||
|
|
||
| for _, c := range containers { | ||
| err := d.client.ContainerRemove(ctx, c.ID, container.RemoveOptions{ | ||
| RemoveVolumes: true, | ||
| RemoveLinks: true, | ||
| Force: true, | ||
| }) | ||
| if err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to remove container: %w", err)) | ||
| } | ||
| } | ||
| return connect.NewResponse(&kranev1.DeleteGatewayResponse{}), nil | ||
| } | ||
|
Comment on lines
+13
to
+48
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❓ Verification inconclusiveTighten DeleteGateway logging and consider best‑effort, idempotent semantics The core deletion flow (list by Two small points:
You might want to:
Example diff: - d.logger.Info("getting gateway", "gateway_id", gatewayID)
+ d.logger.Info("deleting gateway", "gateway_id", gatewayID)
@@
- for _, c := range containers {
- err := d.client.ContainerRemove(ctx, c.ID, container.RemoveOptions{
+ for _, c := range containers {
+ err := d.client.ContainerRemove(ctx, c.ID, container.RemoveOptions{
RemoveVolumes: true,
RemoveLinks: true,
Force: true,
})
if err != nil {
- return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to remove container: %w", err))
+ // Optionally, treat “already gone” as success and continue:
+ // if client.IsErrNotFound(err) { continue }
+ return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to remove container: %w", err))
}
}If you choose to special‑case "not found", please double‑check the exact helper (e.g. 🏁 Script executed: #!/bin/bash
# Inspect available helpers for not-found errors in the Docker client version used here.
rg -n "IsErrNotFound" --type goLength of output: 0 I'll search for Docker error handling patterns in the codebase and also check the Docker client documentation. Now let me search the codebase for the Docker client imports and error handling patterns: Now let me verify the actual file structure and imports:Based on my verification through web search and documentation: Findings:
The review comment's analysis is accurate and the suggested error handling improvement is valid. The log message should indeed be changed from "getting gateway" to "deleting gateway," and optionally checking for "not found" errors to make the delete operation more resilient is a reasonable design choice. Fix the misleading log message and consider best-effort error handling for idempotent semantics The deletion flow (list by Two small improvements:
Suggested changes: - d.logger.Info("getting gateway", "gateway_id", gatewayID)
+ d.logger.Info("deleting gateway", "gateway_id", gatewayID)
@@
- for _, c := range containers {
- err := d.client.ContainerRemove(ctx, c.ID, container.RemoveOptions{
+ for _, c := range containers {
+ err := d.client.ContainerRemove(ctx, c.ID, container.RemoveOptions{
RemoveVolumes: true,
RemoveLinks: true,
Force: true,
})
if err != nil {
- return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to remove container: %w", err))
+ // Optionally treat "container not found" as success and continue:
+ // import "github.com/docker/docker/errdefs"
+ // if errdefs.IsNotFound(err) { continue }
+ return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to remove container: %w", err))
}
}🤖 Prompt for AI Agents |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| package docker | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "connectrpc.com/connect" | ||
| "github.com/docker/docker/api/types/container" | ||
| "github.com/docker/docker/api/types/filters" | ||
| kranev1 "github.com/unkeyed/unkey/go/gen/proto/krane/v1" | ||
| ) | ||
|
|
||
| // GetGateway retrieves container status and addresses for a deployment. | ||
| // | ||
| // Finds containers by gateway ID label and returns instance information | ||
| // with host.docker.internal addresses using dynamically assigned ports. | ||
| func (d *docker) GetGateway(ctx context.Context, req *connect.Request[kranev1.GetGatewayRequest]) (*connect.Response[kranev1.GetGatewayResponse], error) { | ||
| gatewayID := req.Msg.GetGatewayId() | ||
| d.logger.Info("getting gateway", "gateway_id", gatewayID) | ||
|
|
||
| //nolint:exhaustruct // Docker SDK types have many optional fields | ||
| containers, err := d.client.ContainerList(ctx, container.ListOptions{ | ||
| All: true, | ||
| Filters: filters.NewArgs( | ||
| filters.Arg("label", fmt.Sprintf("unkey.gateway.id=%s", gatewayID)), | ||
| ), | ||
| }) | ||
| if err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to list containers: %w", err)) | ||
| } | ||
|
|
||
| res := &kranev1.GetGatewayResponse{ | ||
| Instances: []*kranev1.GatewayInstance{}, | ||
| } | ||
|
|
||
| for _, c := range containers { | ||
| d.logger.Info("container found", "container", c) | ||
|
|
||
| // Determine container status | ||
| status := kranev1.GatewayStatus_GATEWAY_STATUS_UNSPECIFIED | ||
| switch c.State { | ||
| case container.StateRunning: | ||
| status = kranev1.GatewayStatus_GATEWAY_STATUS_RUNNING | ||
| case container.StateExited: | ||
| status = kranev1.GatewayStatus_GATEWAY_STATUS_TERMINATING | ||
| case container.StateCreated: | ||
| status = kranev1.GatewayStatus_GATEWAY_STATUS_PENDING | ||
| } | ||
|
|
||
| d.logger.Info("gateway found", | ||
| "gateway_id", gatewayID, | ||
| "container_id", c.ID, | ||
| "status", status.String(), | ||
| "port", c.Ports[0].PublicPort, | ||
| ) | ||
|
|
||
| res.Instances = append(res.Instances, &kranev1.GatewayInstance{ | ||
| Id: c.ID, | ||
| Address: fmt.Sprintf("host.docker.internal:%d", c.Ports[0].PublicPort), | ||
| Status: status, | ||
| }) | ||
| } | ||
|
|
||
| return connect.NewResponse(res), nil | ||
| } | ||
|
Comment on lines
+13
to
+65
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainAvoid panics on empty port lists and fix minor gateway doc/comment details The general approach in
You index "port", c.Ports[0].PublicPort,
...
Address: fmt.Sprintf("host.docker.internal:%d", c.Ports[0].PublicPort),If Docker returns a container summary with no published ports (empty slice), this will panic with
A small defensive check avoids crashing the whole RPC: - for _, c := range containers {
+ for _, c := range containers {
@@
- d.logger.Info("gateway found",
+ if len(c.Ports) == 0 {
+ d.logger.Info("gateway container has no published ports; skipping instance",
+ "gateway_id", gatewayID,
+ "container_id", c.ID,
+ "state", c.State,
+ )
+ continue
+ }
+
+ publicPort := c.Ports[0].PublicPort
+
+ d.logger.Info("gateway found",
"gateway_id", gatewayID,
"container_id", c.ID,
"status", status.String(),
- "port", c.Ports[0].PublicPort,
+ "port", publicPort,
)
res.Instances = append(res.Instances, &kranev1.GatewayInstance{
Id: c.ID,
- Address: fmt.Sprintf("host.docker.internal:%d", c.Ports[0].PublicPort),
+ Address: fmt.Sprintf("host.docker.internal:%d", publicPort),
Status: status,
})
The comment says: // GetGateway retrieves container status and addresses for a deployment.Since this is the gateway handler, consider updating “deployment” to “gateway” for clarity: -// GetGateway retrieves container status and addresses for a deployment.
+// GetGateway retrieves container status and addresses for a gateway.I'll verify this review by checking the actual code, examining Docker SDK behavior, and searching for related patterns in the codebase. 🏁 Script executed: #!/bin/bash
# Locate and verify the file exists
fd -t f "gateway_get.go" --path "*/krane/backend/docker/*"Length of output: 233 🏁 Script executed: #!/bin/bash
# Verify the code matches the snippet — check for c.Ports[0] access
rg -n "c\.Ports\[0\]" --type go go/apps/krane/backend/docker/Length of output: 411 🏁 Script executed: #!/bin/bash
# Search for other instances where Ports is checked for length
rg -n "len.*Ports" --type go go/apps/krane/backend/docker/ -C 2Length of output: 0 🏁 Script executed: #!/bin/bash
# Search for ContainerList usage in the codebase to see if other handlers check ports
rg -n "ContainerList" --type go go/apps/krane/backend/docker/ -C 3Length of output: 2416 🌐 Web query: 💡 Result: Short answer: In the Go Docker SDK the Container.Ports field is a slice ([]Port). If that slice is nil it will marshal to JSON null; if it is a non-nil empty slice (len==0) it will marshal to an empty array ([]). To guarantee clients see [] (not null) initialize the slice (e.g. Ports: []types.Port{}). [1][2][3] Sources
References Length of output: 295 Add defensive port check to prevent panic; also exists in deployment_get.go Verification confirms a critical panic risk in The same vulnerability also exists identically in Fixes needed:
if len(c.Ports) == 0 {
d.logger.Info("container has no published ports; skipping instance",
"gateway_id", gatewayID,
"container_id", c.ID,
"state", c.State,
)
continue
}
publicPort := c.Ports[0].PublicPortThen use
🤖 Prompt for AI Agents |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Guard against nil gateway and double‑check replica loop semantics
The overall shape of
CreateGatewaylooks good: one image existence check, shared configs for ports/resources, and per‑replica container creation & start.Two things are worth tightening up:
gatewaycan panicgateway := req.Msg.GetGateway()can benilif the client sends a request without the embedded message set. Callinggateway.GetGatewayId()/GetImage()will then panic the handler.Adding a fast validation guard and returning
INVALID_ARGUMENT(or similar) avoids a crash:rangeThe loop
assumes
GetReplicas()is something you canrangeover. Ifreplicasis actually a numeric count (common for deployment-style APIs), this won’t compile; if it’s a slice, you’re also re-fetchinggatewayfrom the request on each iteration instead of using the localgateway.Assuming
replicasis a scalar count, something like this is safer and clearer:Please verify the actual protobuf type of
replicas(int32,uint32, or a repeated message) and adjust the conversion accordingly.To confirm the field shape, you can run:
🏁 Script executed:
Length of output: 5236
Length of output: 997
Length of output: 437
Fix compilation error in replica loop and add nil guard for gateway
The overall shape of
CreateGatewaylooks good: one image existence check, shared configs for ports/resources, and per‑replica container creation & start.Two critical issues must be fixed:
gatewaycan panicgateway := req.Msg.GetGateway()can benilif the client sends a request without the embedded message set. Callinggateway.GetGatewayId()/GetImage()will then panic the handler.Adding a fast validation guard and returning
INVALID_ARGUMENT(or similar) avoids a crash:GetReplicas()isuint32, not iterableThe proto defines
uint32 replicas = 5;inGatewayRequest. In Go,rangeonly works on arrays, slices, maps, strings, and channels—not scalar integers.The current code:
will fail to compile. Use an explicit numeric loop instead:
🤖 Prompt for AI Agents