Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 356 additions & 0 deletions apps/operator/internal/controller/database_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import (
"math/big"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -42,6 +45,21 @@ const (

// DatabaseTraitType is the trait type identifier for database traits
DatabaseTraitType = "database"

// DefaultPostgresVersion is the default Postgres image tag when not specified.
DefaultPostgresVersion = "16"

// DefaultPostgresPort is the default port for Postgres.
DefaultPostgresPort = 5432

// DefaultDatabaseStorage is the default PVC size for database volumes.
DefaultDatabaseStorage = "1Gi"

// PostgresDataPath is the mount path for Postgres data directory.
PostgresDataPath = "/var/lib/postgresql/data"

// PostgresDataSubPath is the subPath within the PVC to avoid lost+found issues.
PostgresDataSubPath = "pgdata"
)

// DatabaseCredentials holds generated database credentials
Expand Down Expand Up @@ -278,6 +296,344 @@ func (r *HeliosAppReconciler) reconcileDatabaseSecrets(ctx context.Context, app
return nil
}

// reconcileDatabaseInstance provisions database StatefulSets and headless
// Services for components with database traits. This runs AFTER
// reconcileDatabaseSecrets so that the credential Secret already exists
// when the StatefulSet is created.
func (r *HeliosAppReconciler) reconcileDatabaseInstance(ctx context.Context, app *appv1alpha1.HeliosApp) error {
log := logf.FromContext(ctx)

dbTraits := ExtractDatabaseTraits(app)
if len(dbTraits) == 0 {
log.V(1).Info("No database traits found, skipping instance provisioning")
return nil
}

for _, dbTrait := range dbTraits {
// Only provision postgres instances for now
if strings.ToLower(dbTrait.Properties.DBType) != "postgres" {
log.V(1).Info("Skipping non-postgres database type",
"component", dbTrait.ComponentName,
"dbType", dbTrait.Properties.DBType)
continue
}

dbHost := GetDatabaseHost(dbTrait.ComponentName)
secretName := GetDatabaseSecretName(dbTrait.ComponentName)

// Determine effective database name
effectiveDBName := dbTrait.Properties.DBName
if effectiveDBName == "" {
effectiveDBName = fmt.Sprintf("%s-db", dbTrait.ComponentName)
}

// Determine version — CUE schema requires version!, but we
// guard here defensively in case of direct API usage.
version := dbTrait.Properties.Version
if version == "" {
version = DefaultPostgresVersion
}

// Determine port
port := dbTrait.Properties.Port
if port <= 0 {
port = DefaultPostgresPort
}
Comment thread
PhamHoangKha1403 marked this conversation as resolved.

// Determine storage
storage := dbTrait.Properties.Storage
if storage == "" {
storage = DefaultDatabaseStorage
}

// --- StatefulSet ---
sts, err := GenerateDatabaseStatefulSet(
app.Namespace, dbHost, secretName, effectiveDBName, version, storage, int32(port),
)
if err != nil {
log.Error(err, "Failed to generate database StatefulSet",
"component", dbTrait.ComponentName, "storage", storage)
return fmt.Errorf("failed to generate StatefulSet for %s: %w", dbHost, err)
}

if err := ctrl.SetControllerReference(app, sts, r.Scheme); err != nil {
log.Error(err, "Failed to set owner reference for database StatefulSet",
"component", dbTrait.ComponentName)
return fmt.Errorf("failed to set owner reference for StatefulSet %s: %w", dbHost, err)
}

existingSts := &appsv1.StatefulSet{}
err = r.Get(ctx, types.NamespacedName{Name: dbHost, Namespace: app.Namespace}, existingSts)
if err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to check for StatefulSet %s: %w", dbHost, err)
}

log.Info("Creating database StatefulSet",
"component", dbTrait.ComponentName,
"statefulset", dbHost,
"image", fmt.Sprintf("postgres:%s", version))

if err := r.Create(ctx, sts); err != nil {
if errors.IsAlreadyExists(err) {
log.Info("Database StatefulSet was created concurrently, skipping",
"component", dbTrait.ComponentName)
} else {
return fmt.Errorf("failed to create StatefulSet %s: %w", dbHost, err)
}
}
} else {
// Handle StatefulSet drift: update spec to match the new template
log.Info("Database StatefulSet already exists, updating if necessary",
"component", dbTrait.ComponentName,
"statefulset", dbHost)

// We only update the mutable fields (Replicas, Template)
updatedSts := existingSts.DeepCopy()
updatedSts.Spec.Replicas = sts.Spec.Replicas
updatedSts.Spec.Template = sts.Spec.Template

// We need to preserve the existing VolumeClaimTemplates when updating
updatedSts.Spec.VolumeClaimTemplates = existingSts.Spec.VolumeClaimTemplates

if err := r.Update(ctx, updatedSts); err != nil {
return fmt.Errorf("failed to update StatefulSet %s: %w", dbHost, err)
}
}
Comment thread
PhamHoangKha1403 marked this conversation as resolved.

// --- Headless Service ---
svc := GenerateDatabaseService(app.Namespace, dbHost, int32(port))

if err := ctrl.SetControllerReference(app, svc, r.Scheme); err != nil {
log.Error(err, "Failed to set owner reference for database Service",
"component", dbTrait.ComponentName)
return fmt.Errorf("failed to set owner reference for Service %s: %w", dbHost, err)
}

existingSvc := &corev1.Service{}
err = r.Get(ctx, types.NamespacedName{Name: dbHost, Namespace: app.Namespace}, existingSvc)
if err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to check for Service %s: %w", dbHost, err)
}

log.Info("Creating database headless Service",
"component", dbTrait.ComponentName,
"service", dbHost)

if err := r.Create(ctx, svc); err != nil {
if errors.IsAlreadyExists(err) {
log.Info("Database Service was created concurrently, skipping",
"component", dbTrait.ComponentName)
} else {
return fmt.Errorf("failed to create Service %s: %w", dbHost, err)
}
}
} else {
log.Info("Database Service already exists, updating if necessary",
"component", dbTrait.ComponentName,
"service", dbHost)

updatedSvc := existingSvc.DeepCopy()
updatedSvc.Spec.Ports = svc.Spec.Ports

if err := r.Update(ctx, updatedSvc); err != nil {
return fmt.Errorf("failed to update Service %s: %w", dbHost, err)
}
}

log.Info("Successfully reconciled database instance",
"component", dbTrait.ComponentName,
"statefulset", dbHost,
"dbName", effectiveDBName)
}

return nil
}

// GenerateDatabaseStatefulSet creates a StatefulSet for a Postgres database instance.
// The StatefulSet injects POSTGRES_DB from the CRD's database.name value, and
// uses the Secret from Issue #33 for POSTGRES_USER and POSTGRES_PASSWORD.
func GenerateDatabaseStatefulSet(namespace, name, secretName, dbName, version, storage string, port int32) (*appsv1.StatefulSet, error) {
storageQty, err := resource.ParseQuantity(storage)
if err != nil {
return nil, fmt.Errorf("invalid storage size format %q: %w", storage, err)
}

replicas := int32(1)
labels := map[string]string{
"app": name,
"helios.io/managed-by": "operator",
"helios.io/trait": "database",
"helios.io/db-type": "postgres",
}

return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: name,
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": name},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "postgres",
Image: fmt.Sprintf("postgres:%s", version),
Ports: []corev1.ContainerPort{
{
ContainerPort: port,
Name: "postgres",
},
},
Env: []corev1.EnvVar{
{
Name: "POSTGRES_DB",
Value: dbName,
},
{
Name: "POSTGRES_USER",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName,
},
Key: "DB_USER",
},
},
},
{
Name: "POSTGRES_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName,
},
Key: "DB_PASS",
},
},
},
{
// PGDATA tells Postgres where to store cluster data.
// Must match volumeMount + subPath to avoid lost+found conflicts.
Name: "PGDATA",
Value: PostgresDataPath + "/" + PostgresDataSubPath,
},
{
// Ensure consistent UTF-8 encoding for all databases.
Name: "POSTGRES_INITDB_ARGS",
Value: "--encoding=UTF-8 --lc-collate=C --lc-ctype=C",
},
{
// Explicitly set the custom port so that postgres knows to listen on it.
Name: "PGPORT",
Value: fmt.Sprintf("%d", port),
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data",
MountPath: PostgresDataPath,
SubPath: PostgresDataSubPath,
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resourceMustParse("100m"),
corev1.ResourceMemory: resourceMustParse("256Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resourceMustParse("500m"),
corev1.ResourceMemory: resourceMustParse("512Mi"),
},
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"pg_isready", "-U", "$(POSTGRES_USER)", "-d", dbName, "-p", "$(PGPORT)"},
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 10,
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"pg_isready", "-U", "$(POSTGRES_USER)", "-d", dbName, "-p", "$(PGPORT)"},
},
},
InitialDelaySeconds: 30,
PeriodSeconds: 10,
},
},
},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: "data",
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: storageQty,
},
},
},
},
},
},
}, nil
}

// GenerateDatabaseService creates a headless Service for a database StatefulSet.
// The headless Service (clusterIP: None) provides stable DNS resolution
// so <name> resolves to the database pod.
func GenerateDatabaseService(namespace, name string, port int32) *corev1.Service {
labels := map[string]string{
"app": name,
"helios.io/managed-by": "operator",
"helios.io/trait": "database",
}

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Selector: map[string]string{"app": name},
Ports: []corev1.ServicePort{
{
Port: port,
TargetPort: intstr.FromInt32(port),
Name: "db",
},
},
},
}
}

// resourceMustParse is a helper to parse resource quantities. Panics on invalid input.
func resourceMustParse(s string) resource.Quantity {
return resource.MustParse(s)
}

// GenerateBase64Token generates a random base64-encoded token
// Useful for generating secure webhook secrets or API tokens
func GenerateBase64Token(byteLength int) (string, error) {
Expand Down
Loading