Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/apps/ctrl/services/deployment/create_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (s *Service) CreateDeployment(
"memory": 2048
}`),
OpenapiSpec: sql.NullString{String: "", Valid: false},
GatewayConfig: env.GatewayConfig,
Status: db.DeploymentsStatusPending,
CreatedAt: now,
UpdatedAt: sql.NullInt64{Int64: now, Valid: true},
Expand Down
7 changes: 3 additions & 4 deletions go/apps/ctrl/workflows/deploy/deploy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,9 @@ func (w *Workflow) Deploy(ctx restate.ObjectContext, req *hydrav1.DeployRequest)

}

}

if allReady {
return resp.Msg.GetInstances(), nil
if allReady {
return resp.Msg.GetInstances(), nil
}
}
// next loop

Expand Down
15 changes: 8 additions & 7 deletions go/apps/ctrl/workflows/project/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,14 @@ func (s *Service) CreateProject(ctx restate.ObjectContext, req *hydrav1.CreatePr

_, err = restate.Run(ctx, func(runCtx restate.RunContext) (restate.Void, error) {
return restate.Void{}, db.Query.InsertEnvironment(runCtx, s.db.RW(), db.InsertEnvironmentParams{
ID: environmentID,
WorkspaceID: workspace.ID,
ProjectID: projectID,
Slug: env.Slug,
Description: env.Description,
CreatedAt: time.Now().UnixMilli(),
UpdatedAt: sql.NullInt64{Valid: false, Int64: 0},
ID: environmentID,
WorkspaceID: workspace.ID,
ProjectID: projectID,
Slug: env.Slug,
Description: env.Description,
CreatedAt: time.Now().UnixMilli(),
UpdatedAt: sql.NullInt64{Valid: false, Int64: 0},
GatewayConfig: []byte(""),
})
}, restate.WithName("insert environment"))

Expand Down
17 changes: 10 additions & 7 deletions go/apps/krane/backend/kubernetes/deployment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ import (
// and ready for traffic after creation.
func (k *k8s) CreateDeployment(ctx context.Context, req *connect.Request[kranev1.CreateDeploymentRequest]) (*connect.Response[kranev1.CreateDeploymentResponse], error) {
k8sDeploymentID := safeIDForK8s(req.Msg.GetDeployment().GetDeploymentId())
namespace := safeIDForK8s(req.Msg.GetDeployment().GetNamespace())

k.logger.Info("creating deployment",
"namespace", req.Msg.GetDeployment().GetNamespace(),
"namespace", namespace,
"deployment_id", k8sDeploymentID,
)

service, err := k.clientset.CoreV1().
Services(req.Msg.GetDeployment().GetNamespace()).
Services(namespace).
Create(ctx,
// This implementation of using stateful sets is very likely not what we want to
// use in v1.
Expand All @@ -87,7 +89,7 @@ func (k *k8s) CreateDeployment(ctx context.Context, req *connect.Request[kranev1
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: k8sDeploymentID,
Namespace: req.Msg.GetDeployment().GetNamespace(),
Namespace: namespace,
Labels: map[string]string{
"unkey.deployment.id": k8sDeploymentID,
"unkey.managed.by": "krane",
Expand Down Expand Up @@ -124,12 +126,12 @@ func (k *k8s) CreateDeployment(ctx context.Context, req *connect.Request[kranev1
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to create service: %w", err))
}

sfs, err := k.clientset.AppsV1().StatefulSets(req.Msg.GetDeployment().GetNamespace()).Create(ctx,
sfs, err := k.clientset.AppsV1().StatefulSets(namespace).Create(ctx,
//nolint: exhaustruct
&appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: k8sDeploymentID,
Namespace: req.Msg.GetDeployment().GetNamespace(),
Namespace: namespace,
Labels: map[string]string{
"unkey.deployment.id": k8sDeploymentID,
"unkey.managed.by": "krane",
Expand All @@ -154,6 +156,7 @@ func (k *k8s) CreateDeployment(ctx context.Context, req *connect.Request[kranev1
Annotations: map[string]string{},
},
Spec: corev1.PodSpec{

ImagePullSecrets: func() []corev1.LocalObjectReference {
// Only add imagePullSecrets if using Depot registry
if strings.HasPrefix(req.Msg.GetDeployment().GetImage(), "registry.depot.dev/") {
Expand Down Expand Up @@ -203,7 +206,7 @@ func (k *k8s) CreateDeployment(ctx context.Context, req *connect.Request[kranev1
k.logger.Info("Deleting service, because deployment creation failed")
// Delete service
// nolint: exhaustruct
if rollbackErr := k.clientset.CoreV1().Services(req.Msg.GetDeployment().GetNamespace()).Delete(ctx, service.Name, metav1.DeleteOptions{}); rollbackErr != nil {
if rollbackErr := k.clientset.CoreV1().Services(namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}); rollbackErr != nil {
k.logger.Error("Failed to delete service", "error", rollbackErr.Error())
}

Expand All @@ -221,7 +224,7 @@ func (k *k8s) CreateDeployment(ctx context.Context, req *connect.Request[kranev1
},
}
//nolint:exhaustruct
_, err = k.clientset.CoreV1().Services(req.Msg.GetDeployment().GetNamespace()).Update(ctx, service, metav1.UpdateOptions{})
_, err = k.clientset.CoreV1().Services(namespace).Update(ctx, service, metav1.UpdateOptions{})
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to update deployment: %w", err))
}
Expand Down
7 changes: 4 additions & 3 deletions go/apps/krane/backend/kubernetes/deployment_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ import (
// ensure associated pods and other resources are properly terminated.
func (k *k8s) DeleteDeployment(ctx context.Context, req *connect.Request[kranev1.DeleteDeploymentRequest]) (*connect.Response[kranev1.DeleteDeploymentResponse], error) {
k8sDeploymentID := strings.ReplaceAll(req.Msg.GetDeploymentId(), "_", "-")
namespace := safeIDForK8s(req.Msg.GetNamespace())

k.logger.Info("deleting deployment",
"namespace", req.Msg.GetNamespace(),
"namespace", namespace,
"deployment_id", k8sDeploymentID,
)

//nolint: exhaustruct
err := k.clientset.CoreV1().Services(req.Msg.GetNamespace()).Delete(ctx, k8sDeploymentID, metav1.DeleteOptions{
err := k.clientset.CoreV1().Services(namespace).Delete(ctx, k8sDeploymentID, metav1.DeleteOptions{
PropagationPolicy: ptr.P(metav1.DeletePropagationBackground),
})
if err != nil && !apierrors.IsNotFound(err) {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to delete service: %w", err))
}

//nolint: exhaustruct
err = k.clientset.AppsV1().StatefulSets(req.Msg.GetNamespace()).Delete(ctx, k8sDeploymentID, metav1.DeleteOptions{
err = k.clientset.AppsV1().StatefulSets(namespace).Delete(ctx, k8sDeploymentID, metav1.DeleteOptions{
PropagationPolicy: ptr.P(metav1.DeletePropagationBackground),
})
if err != nil && !apierrors.IsNotFound(err) {
Expand Down
7 changes: 4 additions & 3 deletions go/apps/krane/backend/kubernetes/deployment_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ func (k *k8s) GetDeployment(ctx context.Context, req *connect.Request[kranev1.Ge
}

k8sDeploymentID := safeIDForK8s(req.Msg.GetDeploymentId())
namespace := safeIDForK8s(req.Msg.GetNamespace())

k.logger.Info("getting deployment", "deployment_id", k8sDeploymentID)

// Get the Job by name (deployment_id)
// nolint: exhaustruct
sfs, err := k.clientset.AppsV1().StatefulSets(req.Msg.GetNamespace()).Get(ctx, k8sDeploymentID, metav1.GetOptions{})
sfs, err := k.clientset.AppsV1().StatefulSets(namespace).Get(ctx, k8sDeploymentID, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found: %s", k8sDeploymentID))
Expand All @@ -57,7 +58,7 @@ func (k *k8s) GetDeployment(ctx context.Context, req *connect.Request[kranev1.Ge

// Get the service to retrieve port info
// nolint: exhaustruct
service, err := k.clientset.CoreV1().Services(req.Msg.GetNamespace()).Get(ctx, k8sDeploymentID, metav1.GetOptions{})
service, err := k.clientset.CoreV1().Services(namespace).Get(ctx, k8sDeploymentID, metav1.GetOptions{})
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not load service: %s", k8sDeploymentID))
}
Expand All @@ -73,7 +74,7 @@ func (k *k8s) GetDeployment(ctx context.Context, req *connect.Request[kranev1.Ge
})

//nolint: exhaustruct
pods, err := k.clientset.CoreV1().Pods(req.Msg.GetNamespace()).List(ctx, metav1.ListOptions{
pods, err := k.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
Expand Down
57 changes: 0 additions & 57 deletions go/apps/krane/backend/kubernetes/eviction.go

This file was deleted.

41 changes: 34 additions & 7 deletions go/apps/krane/backend/kubernetes/gateway_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,52 @@ import (
"github.com/unkeyed/unkey/go/pkg/ptr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

func (k *k8s) CreateGateway(ctx context.Context, req *connect.Request[kranev1.CreateGatewayRequest]) (*connect.Response[kranev1.CreateGatewayResponse], error) {
k8sGatewayID := safeIDForK8s(req.Msg.GetGateway().GetGatewayId())
namespace := safeIDForK8s(req.Msg.GetGateway().GetNamespace())
k.logger.Info("creating deployment",
"namespace", req.Msg.GetGateway().GetNamespace(),
"namespace", namespace,
"deployment_id", k8sGatewayID,
)

// Ensure namespace exists
// It's not ideal to do it here, but it's the best I can do for now without rebuilding the entire workspace creation system.
_, err := k.clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err != nil {
// If namespace doesn't exist, create it
if errors.IsNotFound(err) {
k.logger.Info("namespace not found, creating it", "namespace", namespace)
_, createErr := k.clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
Labels: map[string]string{
"unkey.managed.by": "krane",
},
},
}, metav1.CreateOptions{})
if createErr != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to create namespace: %w", createErr))
}
k.logger.Info("namespace created successfully", "namespace", namespace)
} else {
// Some other error occurred while getting the namespace
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get namespace: %w", err))
}
}

// Create Deployment
deployment, err := k.clientset.AppsV1().Deployments(req.Msg.GetGateway().GetNamespace()).Create(ctx,
deployment, err := k.clientset.AppsV1().Deployments(namespace).Create(ctx,
//nolint: exhaustruct
&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: k8sGatewayID,
Namespace: req.Msg.GetGateway().GetNamespace(),
Namespace: namespace,
Labels: map[string]string{
"unkey.gateway.id": k8sGatewayID,
"unkey.managed.by": "krane",
Expand Down Expand Up @@ -82,12 +109,12 @@ func (k *k8s) CreateGateway(ctx context.Context, req *connect.Request[kranev1.Cr
},
}, metav1.CreateOptions{})
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to create deployment: %w", err))
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to create gateway: %w", err))
}

// Create Service with owner reference to the Deployment
service, err := k.clientset.CoreV1().
Services(req.Msg.GetGateway().GetNamespace()).
Services(namespace).
Create(ctx,
// This implementation uses Deployments with ClusterIP services
// for better scalability while maintaining internal accessibility via service name.
Expand All @@ -96,7 +123,7 @@ func (k *k8s) CreateGateway(ctx context.Context, req *connect.Request[kranev1.Cr
&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: k8sGatewayID,
Namespace: req.Msg.GetGateway().GetNamespace(),
Namespace: namespace,
Labels: map[string]string{
"unkey.gateway.id": k8sGatewayID,
"unkey.managed.by": "krane",
Expand Down Expand Up @@ -138,7 +165,7 @@ func (k *k8s) CreateGateway(ctx context.Context, req *connect.Request[kranev1.Cr
k.logger.Info("Deleting deployment, because service creation failed")
// Delete deployment
// nolint: exhaustruct
if rollbackErr := k.clientset.AppsV1().Deployments(req.Msg.GetGateway().GetNamespace()).Delete(ctx, deployment.Name, metav1.DeleteOptions{}); rollbackErr != nil {
if rollbackErr := k.clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}); rollbackErr != nil {
k.logger.Error("Failed to delete deployment", "error", rollbackErr.Error())
}

Expand Down
4 changes: 3 additions & 1 deletion go/apps/krane/backend/kubernetes/gateway_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
func (k *k8s) DeleteGateway(ctx context.Context, req *connect.Request[kranev1.DeleteGatewayRequest]) (*connect.Response[kranev1.DeleteGatewayResponse], error) {
k8sGatewayID := strings.ReplaceAll(req.Msg.GetGatewayId(), "_", "-")

namespace := safeIDForK8s(req.Msg.GetGatewayId())

k.logger.Info("deleting deployment",
"namespace", req.Msg.GetNamespace(),
"namespace", namespace,
"gateway_id", k8sGatewayID,
)

Expand Down
9 changes: 5 additions & 4 deletions go/apps/krane/backend/kubernetes/gateway_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ func (k *k8s) GetGateway(ctx context.Context, req *connect.Request[kranev1.GetGa
}

k8sgatewayID := safeIDForK8s(req.Msg.GetGatewayId())
namespace := safeIDForK8s(req.Msg.GetNamespace())

k.logger.Info("getting gateway", "gateway_id", k8sgatewayID)

// Get the deployment by name (gateway_id)
// nolint: exhaustruct
deployment, err := k.clientset.AppsV1().Deployments(req.Msg.GetNamespace()).Get(ctx, k8sgatewayID, metav1.GetOptions{})
deployment, err := k.clientset.AppsV1().Deployments(namespace).Get(ctx, k8sgatewayID, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found: %s", k8sgatewayID))
Expand All @@ -56,16 +57,16 @@ func (k *k8s) GetGateway(ctx context.Context, req *connect.Request[kranev1.GetGa
}

// Get the service to retrieve port info
service, err := k.clientset.CoreV1().Services(req.Msg.GetNamespace()).Get(ctx, k8sgatewayID, metav1.GetOptions{})
service, err := k.clientset.CoreV1().Services(namespace).Get(ctx, k8sgatewayID, metav1.GetOptions{})
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not load service %s: %w", k8sgatewayID, err))
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not load service: %s", k8sgatewayID))
}
var port int32 = 8080 // default
if len(service.Spec.Ports) > 0 {
port = service.Spec.Ports[0].Port
}

pods, err := k.clientset.CoreV1().Pods(req.Msg.GetNamespace()).List(ctx, metav1.ListOptions{
pods, err := k.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchExpressions: nil,
MatchLabels: deployment.Spec.Selector.MatchLabels,
Expand Down
4 changes: 0 additions & 4 deletions go/apps/krane/backend/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,5 @@ func New(cfg Config) (*k8s, error) {
clientset: clientset,
}

if cfg.DeploymentEvictionTTL > 0 {
k.autoEvictDeployments(cfg.DeploymentEvictionTTL)
}

return k, nil
}
Loading