diff --git a/internal/buildapi/flash.go b/internal/buildapi/flash.go new file mode 100644 index 00000000..b202436f --- /dev/null +++ b/internal/buildapi/flash.go @@ -0,0 +1,507 @@ +package buildapi + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "os" + "sort" + "strings" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" + + automotivev1alpha1 "github.com/centos-automotive-suite/automotive-dev-operator/api/v1alpha1" + "github.com/centos-automotive-suite/automotive-dev-operator/internal/common/tasks" + tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" +) + +func (a *APIServer) handleCreateFlash(c *gin.Context) { + a.log.Info("create flash", "reqID", c.GetString("reqID")) + a.createFlash(c) +} + +func (a *APIServer) handleListFlash(c *gin.Context) { + a.log.Info("list flash jobs", "reqID", c.GetString("reqID")) + a.listFlash(c) +} + +func (a *APIServer) handleGetFlash(c *gin.Context) { + name := c.Param("name") + a.log.Info("get flash", "flash", name, "reqID", c.GetString("reqID")) + a.getFlash(c, name) +} + +func (a *APIServer) handleFlashLogs(c *gin.Context) { + name := c.Param("name") + a.log.Info("flash logs requested", "flash", name, "reqID", c.GetString("reqID")) + a.streamFlashLogs(c, name) +} + +func (a *APIServer) createFlash(c *gin.Context) { + var req FlashRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON request"}) + return + } + + // Validate required fields + if req.ImageRef == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "imageRef is required"}) + return + } + if req.ClientConfig == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "clientConfig is required"}) + return + } + + // Auto-generate name if not provided + if req.Name == "" { + req.Name = fmt.Sprintf("flash-%s", uuid.New().String()[:5]) + } + + // Validate and sanitize name for Kubernetes compatibility + if err := validateBuildName(req.Name); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + req.Name = sanitizeBuildNameForValidation(req.Name) + + // Validate mutual exclusivity of lease-name and lease-duration + if req.LeaseName != "" && req.LeaseDuration != "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "lease-name and lease-duration are mutually exclusive"}) + return + } + + k8sClient, err := getClientFromRequestFn(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) + return + } + + restCfg, err := getRESTConfigFromRequestFn(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + clientset, err := kubernetes.NewForConfig(restCfg) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + ctx := c.Request.Context() + namespace := resolveNamespace() + requestedBy := a.resolveRequester(c) + + // Load OperatorConfig for target mappings, image overrides, and lease duration defaults + operatorConfig := &automotivev1alpha1.OperatorConfig{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: "config", Namespace: namespace}, operatorConfig); err != nil { + if !k8serrors.IsNotFound(err) { + a.log.Error(err, "failed to load OperatorConfig for flash, using defaults") + } + operatorConfig = &automotivev1alpha1.OperatorConfig{} + } + + // Resolve exporter selector and flash command from OperatorConfig + exporterSelector, flashCmd := resolveFlashTargetConfig(req, operatorConfig) + if exporterSelector == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "exporterSelector or valid target is required"}) + return + } + + // Replace placeholders in flash command + if flashCmd != "" { + flashCmd = strings.ReplaceAll(flashCmd, "{image_uri}", req.ImageRef) + flashCmd = strings.ReplaceAll(flashCmd, "{artifact_url}", req.ImageRef) + } + + // Create Jumpstarter client config secret + secretName, createdSecret, secretErr := createFlashClientConfigSecret(ctx, clientset, namespace, req) + if secretErr != nil { + c.JSON(secretErr.code, gin.H{"error": secretErr.message}) + return + } + + // Create OCI auth secret for flash image pull credentials + flashOCIAuthSecretName, createdOCIAuthSecret, ociErr := createFlashOCIAuthSecret(ctx, clientset, namespace, req.Name, req.RegistryCredentials) + if ociErr != nil { + _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) + c.JSON(ociErr.code, gin.H{"error": ociErr.message}) + return + } + + // Build task config from OperatorConfig for flash task generation + var flashBuildConfig *tasks.BuildConfig + if operatorConfig.Spec.OSBuilds != nil { + flashBuildConfig = &tasks.BuildConfig{ + FlashTimeoutMinutes: operatorConfig.Spec.OSBuilds.GetFlashTimeoutMinutes(), + DefaultLeaseDuration: operatorConfig.Spec.Jumpstarter.GetDefaultLeaseDuration(), + } + } + + // Get the flash task spec + flashTask := tasks.GenerateFlashTask(namespace, flashBuildConfig) + + // Lease duration: only resolve when not using an existing lease + // Fallback: request > FlashTimeoutMinutes (as HH:MM:SS) > Jumpstarter default > constant + leaseDuration := req.LeaseDuration + if req.LeaseName == "" && leaseDuration == "" { + if operatorConfig.Spec.OSBuilds != nil && operatorConfig.Spec.OSBuilds.FlashTimeoutMinutes > 0 { + m := operatorConfig.Spec.OSBuilds.FlashTimeoutMinutes + leaseDuration = fmt.Sprintf("%02d:%02d:00", m/60, m%60) + } else { + leaseDuration = operatorConfig.Spec.Jumpstarter.GetDefaultLeaseDuration() + } + } + + // Build workspace bindings + workspaces := []tektonv1.WorkspaceBinding{ + { + Name: "jumpstarter-client", + Secret: &corev1.SecretVolumeSource{ + SecretName: secretName, + }, + }, + } + if flashOCIAuthSecretName != "" { + workspaces = append(workspaces, tektonv1.WorkspaceBinding{ + Name: "flash-oci-auth", + Secret: &corev1.SecretVolumeSource{ + SecretName: flashOCIAuthSecretName, + }, + }) + } + + // Create the flash TaskRun + taskRun := &tektonv1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: req.Name, + Namespace: namespace, + Labels: map[string]string{ + "app.kubernetes.io/managed-by": "build-api", + "app.kubernetes.io/part-of": "automotive-dev", + "app.kubernetes.io/name": "flash-taskrun", + flashTaskRunLabel: req.Name, + }, + Annotations: map[string]string{ + "automotive.sdv.cloud.redhat.com/requested-by": requestedBy, + "automotive.sdv.cloud.redhat.com/image-ref": req.ImageRef, + }, + }, + Spec: tektonv1.TaskRunSpec{ + ServiceAccountName: automotivev1alpha1.BuildServiceAccountName, + TaskSpec: &flashTask.Spec, + Params: []tektonv1.Param{ + {Name: "image-ref", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: req.ImageRef}}, + {Name: "exporter-selector", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: exporterSelector}}, + {Name: "flash-cmd", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: flashCmd}}, + {Name: "lease-duration", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: leaseDuration}}, + {Name: "lease-name", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: req.LeaseName}}, + }, + Workspaces: workspaces, + }, + } + + if err := k8sClient.Create(ctx, taskRun); err != nil { + // Clean up secrets if TaskRun creation fails + _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) + if flashOCIAuthSecretName != "" { + _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, flashOCIAuthSecretName, metav1.DeleteOptions{}) + } + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to create flash TaskRun: %v", err)}) + return + } + + // Set owner reference on secrets for automatic cleanup + ownerRef := []metav1.OwnerReference{ + { + APIVersion: "tekton.dev/v1", + Kind: "TaskRun", + Name: taskRun.Name, + UID: taskRun.UID, + }, + } + createdSecret.OwnerReferences = ownerRef + if _, err := clientset.CoreV1().Secrets(namespace).Update(ctx, createdSecret, metav1.UpdateOptions{}); err != nil { + a.log.Error(err, "failed to set owner reference on secret", "secret", secretName) + } + if createdOCIAuthSecret != nil { + createdOCIAuthSecret.OwnerReferences = ownerRef + if _, updErr := clientset.CoreV1().Secrets(namespace).Update(ctx, createdOCIAuthSecret, metav1.UpdateOptions{}); updErr != nil { + a.log.Error(updErr, "failed to set owner reference on flash OCI auth secret", "secret", flashOCIAuthSecretName) + } + } + + FlashCreatedTotal.Inc() + + writeJSON(c, http.StatusAccepted, FlashResponse{ + Name: req.Name, + Phase: phasePending, + Message: "Flash TaskRun created", + RequestedBy: requestedBy, + TaskRunName: taskRun.Name, + }) +} + +func (a *APIServer) listFlash(c *gin.Context) { + namespace := resolveNamespace() + limit, offset := parsePagination(c) + + k8sClient, err := getClientFromRequestFn(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) + return + } + + ctx := c.Request.Context() + + // List TaskRuns with flash label + taskRunList := &tektonv1.TaskRunList{} + if err := k8sClient.List(ctx, taskRunList, client.InNamespace(namespace), client.HasLabels{flashTaskRunLabel}); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to list flash TaskRuns: %v", err)}) + return + } + + // Sort by creation time, newest first + sort.Slice(taskRunList.Items, func(i, j int) bool { + return taskRunList.Items[j].CreationTimestamp.Before(&taskRunList.Items[i].CreationTimestamp) + }) + + page := applyPagination(taskRunList.Items, limit, offset) + + resp := make([]FlashListItem, 0, len(page)) + for _, tr := range page { + phase, message := getTaskRunStatus(&tr) + var compStr string + if tr.Status.CompletionTime != nil { + compStr = tr.Status.CompletionTime.Format(time.RFC3339) + } + resp = append(resp, FlashListItem{ + Name: tr.Name, + Phase: phase, + Message: message, + RequestedBy: tr.Annotations["automotive.sdv.cloud.redhat.com/requested-by"], + CreatedAt: tr.CreationTimestamp.Format(time.RFC3339), + CompletionTime: compStr, + }) + } + writeJSON(c, http.StatusOK, resp) +} + +func (a *APIServer) getFlash(c *gin.Context, name string) { + namespace := resolveNamespace() + + k8sClient, err := getClientFromRequestFn(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) + return + } + + ctx := c.Request.Context() + taskRun := &tektonv1.TaskRun{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, taskRun); err != nil { + if k8serrors.IsNotFound(err) { + c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to get flash TaskRun: %v", err)}) + return + } + + // Verify it's a flash TaskRun + if taskRun.Labels[flashTaskRunLabel] == "" { + c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) + return + } + + phase, message := getTaskRunStatus(taskRun) + var startStr, compStr string + if taskRun.Status.StartTime != nil { + startStr = taskRun.Status.StartTime.Format(time.RFC3339) + } + if taskRun.Status.CompletionTime != nil { + compStr = taskRun.Status.CompletionTime.Format(time.RFC3339) + } + + writeJSON(c, http.StatusOK, FlashResponse{ + Name: taskRun.Name, + Phase: phase, + Message: message, + RequestedBy: taskRun.Annotations["automotive.sdv.cloud.redhat.com/requested-by"], + StartTime: startStr, + CompletionTime: compStr, + TaskRunName: taskRun.Name, + }) +} + +func getTaskRunStatus(tr *tektonv1.TaskRun) (phase, message string) { + if tr.Status.CompletionTime != nil { + for _, cond := range tr.Status.Conditions { + if cond.Type == "Succeeded" { + if cond.Status == corev1.ConditionTrue { + return phaseCompleted, "Flash completed successfully" + } + if cond.Message == "" { + return phaseFailed, "Flash failed" + } + return phaseFailed, cond.Message + } + } + return phaseFailed, "Flash failed" + } + + if tr.Status.StartTime != nil { + return phaseRunning, "Flash in progress" + } + + return phasePending, "Waiting to start" +} + +func (a *APIServer) streamFlashLogs(c *gin.Context, name string) { + namespace := resolveNamespace() + + k8sClient, err := getClientFromRequestFn(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) + return + } + + restCfg, err := getRESTConfigFromRequestFn(c) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + clientset, err := kubernetes.NewForConfig(restCfg) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + ctx := c.Request.Context() + + // Verify the TaskRun exists and is a flash TaskRun + taskRun := &tektonv1.TaskRun{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, taskRun); err != nil { + if k8serrors.IsNotFound(err) { + c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to get flash TaskRun: %v", err)}) + return + } + if taskRun.Labels[flashTaskRunLabel] == "" { + c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) + return + } + + sinceTime := parseSinceTime(c.Query("since")) + streamDuration := time.Duration(a.limits.MaxLogStreamDurationMinutes) * time.Minute + streamCtx, cancel := context.WithTimeout(ctx, streamDuration) + defer cancel() + + // Get the pod name from TaskRun status + podName := taskRun.Status.PodName + if podName == "" { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "flash pod not ready"}) + return + } + + setupLogStreamHeaders(c) + + // TaskRun pods use step containers with naming convention "step-" + containerName := "step-flash" + + // Stream logs, retrying while the container is still initializing + logReq := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ + Container: containerName, + Follow: true, + SinceTime: sinceTime, + }) + var stream io.ReadCloser + for { + stream, err = logReq.Stream(streamCtx) + if err == nil { + break + } + pod, getErr := clientset.CoreV1().Pods(namespace).Get(streamCtx, podName, metav1.GetOptions{}) + if getErr != nil || !isPodInitializing(pod) { + _, _ = fmt.Fprintf(c.Writer, "\n[Error streaming logs: %v]\n", err) + c.Writer.Flush() + return + } + select { + case <-streamCtx.Done(): + _, _ = fmt.Fprintf(c.Writer, "\n[Timed out waiting for container]\n") + c.Writer.Flush() + return + case <-time.After(500 * time.Millisecond): + } + } + defer func() { + if err := stream.Close(); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to close stream: %v\n", err) + } + }() + + _, _ = c.Writer.Write([]byte("\n===== Flash TaskRun Logs =====\n\n")) + c.Writer.Flush() + + scanner := bufio.NewScanner(stream) + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + + for scanner.Scan() { + select { + case <-streamCtx.Done(): + return + default: + } + line := scanner.Bytes() + if _, writeErr := c.Writer.Write(line); writeErr != nil { + return + } + if _, writeErr := c.Writer.Write([]byte("\n")); writeErr != nil { + return + } + c.Writer.Flush() + } + + if err := scanner.Err(); err != nil && err != io.EOF { + var errMsg []byte + errMsg = fmt.Appendf(errMsg, "\n[Stream error: %v]\n", err) + _, _ = c.Writer.Write(errMsg) + c.Writer.Flush() + } + + _, _ = c.Writer.Write([]byte("\n[Log streaming completed]\n")) + c.Writer.Flush() +} + +func isPodInitializing(pod *corev1.Pod) bool { + if pod.Status.Phase == corev1.PodPending { + return true + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.State.Waiting != nil { + switch cs.State.Waiting.Reason { + case "ContainerCreating", "PodInitializing": + return true + } + } + } + for _, cs := range pod.Status.InitContainerStatuses { + if cs.State.Running != nil || cs.State.Waiting != nil { + return true + } + } + return false +} diff --git a/internal/buildapi/flash_metrics.go b/internal/buildapi/flash_metrics.go new file mode 100644 index 00000000..f21ed464 --- /dev/null +++ b/internal/buildapi/flash_metrics.go @@ -0,0 +1,64 @@ +package buildapi + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + flashMetricsNamespace = "ado" + flashMetricsSubsystem = "flash" +) + +var ( + // FlashCreatedTotal counts standalone flash operations created via the REST API. + FlashCreatedTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: flashMetricsNamespace, + Subsystem: flashMetricsSubsystem, + Name: "created_total", + Help: "Total number of flash operations created", + }, + ) + + // FlashRequestDuration tracks flash API request duration by endpoint and status code. + FlashRequestDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: flashMetricsNamespace, + Subsystem: flashMetricsSubsystem, + Name: "request_duration_seconds", + Help: "Flash API request duration in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"endpoint", "status_code"}, + ) +) + +func init() { + prometheus.MustRegister( + FlashCreatedTotal, + FlashRequestDuration, + ) +} + +func flashMetricsMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + c.Next() + duration := time.Since(start).Seconds() + endpoint := c.FullPath() + statusCode := fmt.Sprintf("%d", c.Writer.Status()) + FlashRequestDuration.WithLabelValues(endpoint, statusCode).Observe(duration) + } +} + +func metricsHandler() gin.HandlerFunc { + h := promhttp.Handler() + return func(c *gin.Context) { + h.ServeHTTP(c.Writer, c.Request) + } +} diff --git a/internal/buildapi/flash_metrics_test.go b/internal/buildapi/flash_metrics_test.go new file mode 100644 index 00000000..bd9226ab --- /dev/null +++ b/internal/buildapi/flash_metrics_test.go @@ -0,0 +1,68 @@ +package buildapi + +import ( + "net/http" + "net/http/httptest" + + "github.com/gin-gonic/gin" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" //nolint:revive // Dot import is standard for Ginkgo + . "github.com/onsi/gomega" //nolint:revive // Dot import is standard for Gomega + io_prometheus_client "github.com/prometheus/client_model/go" +) + +var _ = Describe("Flash Metrics", func() { + var server *APIServer + + BeforeEach(func() { + gin.SetMode(gin.TestMode) + server = NewAPIServer(":0", logr.Discard()) + }) + + Context("metrics endpoint", func() { + It("should expose prometheus metrics at /metrics", func() { + // Ensure at least one observation so the histogram appears + FlashRequestDuration.WithLabelValues("/v1/flash", "200").Observe(0.001) + + req, err := http.NewRequest("GET", "/metrics", nil) + Expect(err).NotTo(HaveOccurred()) + + w := httptest.NewRecorder() + server.router.ServeHTTP(w, req) + + Expect(w.Code).To(Equal(http.StatusOK)) + body := w.Body.String() + Expect(body).To(ContainSubstring("ado_flash_created_total")) + Expect(body).To(ContainSubstring("ado_flash_request_duration_seconds")) + }) + }) + + Context("FlashCreatedTotal", func() { + It("should increment on flash creation", func() { + m := &io_prometheus_client.Metric{} + Expect(FlashCreatedTotal.Write(m)).To(Succeed()) + before := m.GetCounter().GetValue() + + FlashCreatedTotal.Inc() + + m = &io_prometheus_client.Metric{} + Expect(FlashCreatedTotal.Write(m)).To(Succeed()) + after := m.GetCounter().GetValue() + Expect(after - before).To(Equal(float64(1))) + }) + }) + + Context("FlashRequestDuration", func() { + It("should record request duration with endpoint and status labels", func() { + FlashRequestDuration.WithLabelValues("/v1/flash", "200").Observe(0.5) + + m := &io_prometheus_client.Metric{} + obs, err := FlashRequestDuration.GetMetricWithLabelValues("/v1/flash", "200") + Expect(err).NotTo(HaveOccurred()) + Expect(obs.(interface { + Write(*io_prometheus_client.Metric) error + }).Write(m)).To(Succeed()) + Expect(m.GetHistogram().GetSampleCount()).To(BeNumerically(">=", 1)) + }) + }) +}) diff --git a/internal/buildapi/flash_test.go b/internal/buildapi/flash_test.go new file mode 100644 index 00000000..4b8fda85 --- /dev/null +++ b/internal/buildapi/flash_test.go @@ -0,0 +1,299 @@ +package buildapi + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "time" + + tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + apis "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/gin-gonic/gin" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" //nolint:revive // Dot import is standard for Ginkgo + . "github.com/onsi/gomega" //nolint:revive // Dot import is standard for Gomega +) + +var _ = Describe("Flash", func() { + var ( + server *APIServer + originalGetClientFromRequestFn func(*gin.Context) (ctrlclient.Client, error) + originalGetRESTConfigFromRequestFn func(*gin.Context) (*rest.Config, error) + originalNamespace string + hasOriginalNamespace bool + ) + + newFakeClient := func(objs ...ctrlclient.Object) ctrlclient.Client { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + Expect(tektonv1.AddToScheme(scheme)).To(Succeed()) + builder := fake.NewClientBuilder().WithScheme(scheme) + for _, obj := range objs { + builder = builder.WithObjects(obj) + } + return builder.Build() + } + + newFlashTaskRun := func(name, requestedBy, phase string) *tektonv1.TaskRun { + tr := &tektonv1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "test-ns", + Labels: map[string]string{ + flashTaskRunLabel: name, + }, + Annotations: map[string]string{ + "automotive.sdv.cloud.redhat.com/requested-by": requestedBy, + }, + CreationTimestamp: metav1.NewTime(time.Now()), + }, + } + switch phase { + case "running": + now := metav1.Now() + tr.Status.StartTime = &now + case "completed": + now := metav1.Now() + tr.Status.StartTime = &now + tr.Status.CompletionTime = &now + tr.Status.Status = duckv1.Status{ + Conditions: duckv1.Conditions{ + { + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }, + }, + } + case "failed": + now := metav1.Now() + tr.Status.StartTime = &now + tr.Status.CompletionTime = &now + tr.Status.Status = duckv1.Status{ + Conditions: duckv1.Conditions{ + { + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + Message: "build step failed", + }, + }, + } + } + return tr + } + + BeforeEach(func() { + gin.SetMode(gin.TestMode) + server = NewAPIServer(":0", logr.Discard()) + originalGetClientFromRequestFn = getClientFromRequestFn + originalGetRESTConfigFromRequestFn = getRESTConfigFromRequestFn + originalNamespace, hasOriginalNamespace = os.LookupEnv("BUILD_API_NAMESPACE") + Expect(os.Setenv("BUILD_API_NAMESPACE", "test-ns")).To(Succeed()) + }) + + AfterEach(func() { + getClientFromRequestFn = originalGetClientFromRequestFn + getRESTConfigFromRequestFn = originalGetRESTConfigFromRequestFn + if hasOriginalNamespace { + Expect(os.Setenv("BUILD_API_NAMESPACE", originalNamespace)).To(Succeed()) + } else { + Expect(os.Unsetenv("BUILD_API_NAMESPACE")).To(Succeed()) + } + }) + + Context("getTaskRunStatus", func() { + It("should return pending for TaskRun with no start time", func() { + tr := newFlashTaskRun("test-flash", "alice", "pending") + phase, msg := getTaskRunStatus(tr) + Expect(phase).To(Equal(phasePending)) + Expect(msg).To(Equal("Waiting to start")) + }) + + It("should return running for started TaskRun", func() { + tr := newFlashTaskRun("test-flash", "alice", "running") + phase, msg := getTaskRunStatus(tr) + Expect(phase).To(Equal(phaseRunning)) + Expect(msg).To(Equal("Flash in progress")) + }) + + It("should return completed for successful TaskRun", func() { + tr := newFlashTaskRun("test-flash", "alice", "completed") + phase, msg := getTaskRunStatus(tr) + Expect(phase).To(Equal(phaseCompleted)) + Expect(msg).To(Equal("Flash completed successfully")) + }) + + It("should return failed with message for failed TaskRun", func() { + tr := newFlashTaskRun("test-flash", "alice", "failed") + phase, msg := getTaskRunStatus(tr) + Expect(phase).To(Equal(phaseFailed)) + Expect(msg).To(Equal("build step failed")) + }) + + It("should return failed when completed with no Succeeded condition", func() { + tr := newFlashTaskRun("test-flash", "alice", "pending") + now := metav1.Now() + tr.Status.CompletionTime = &now + phase, msg := getTaskRunStatus(tr) + Expect(phase).To(Equal(phaseFailed)) + Expect(msg).To(Equal("Flash failed")) + }) + }) + + Context("getFlash", func() { + It("should return 404 for nonexistent flash TaskRun", func() { + fakeClient := newFakeClient() + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes/nonexistent", nil) + + server.getFlash(c, "nonexistent") + + Expect(w.Code).To(Equal(http.StatusNotFound)) + Expect(w.Body.String()).To(ContainSubstring("flash TaskRun not found")) + }) + + It("should return 404 for TaskRun without flash label", func() { + tr := &tektonv1.TaskRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "not-a-flash", + Namespace: "test-ns", + Labels: map[string]string{}, + }, + } + fakeClient := newFakeClient(tr) + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes/not-a-flash", nil) + + server.getFlash(c, "not-a-flash") + + Expect(w.Code).To(Equal(http.StatusNotFound)) + }) + + It("should return flash details for valid flash TaskRun", func() { + tr := newFlashTaskRun("my-flash", "alice", "running") + fakeClient := newFakeClient(tr) + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes/my-flash", nil) + + server.getFlash(c, "my-flash") + + Expect(w.Code).To(Equal(http.StatusOK)) + var resp FlashResponse + Expect(json.Unmarshal(w.Body.Bytes(), &resp)).To(Succeed()) + Expect(resp.Name).To(Equal("my-flash")) + Expect(resp.Phase).To(Equal(phaseRunning)) + Expect(resp.RequestedBy).To(Equal("alice")) + }) + }) + + Context("listFlash", func() { + It("should return empty list when no flash TaskRuns exist", func() { + fakeClient := newFakeClient() + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes", nil) + + server.listFlash(c) + + Expect(w.Code).To(Equal(http.StatusOK)) + var resp []FlashListItem + Expect(json.Unmarshal(w.Body.Bytes(), &resp)).To(Succeed()) + Expect(resp).To(BeEmpty()) + }) + + It("should list flash TaskRuns sorted by creation time", func() { + tr1 := newFlashTaskRun("flash-old", "alice", "completed") + tr1.CreationTimestamp = metav1.NewTime(time.Now().Add(-1 * time.Hour)) + tr2 := newFlashTaskRun("flash-new", "bob", "running") + tr2.CreationTimestamp = metav1.NewTime(time.Now()) + + fakeClient := newFakeClient(tr1, tr2) + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes", nil) + + server.listFlash(c) + + Expect(w.Code).To(Equal(http.StatusOK)) + var resp []FlashListItem + Expect(json.Unmarshal(w.Body.Bytes(), &resp)).To(Succeed()) + Expect(resp).To(HaveLen(2)) + Expect(resp[0].Name).To(Equal("flash-new")) + Expect(resp[1].Name).To(Equal("flash-old")) + }) + }) + + Context("streamFlashLogs", func() { + var fakeRESTConfig *rest.Config + + BeforeEach(func() { + fakeRESTConfig = &rest.Config{Host: "https://fake-k8s:6443"} + getRESTConfigFromRequestFn = func(_ *gin.Context) (*rest.Config, error) { + return fakeRESTConfig, nil + } + }) + + It("should return 404 for nonexistent flash TaskRun", func() { + fakeClient := newFakeClient() + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes/nonexistent/logs", nil) + + server.streamFlashLogs(c, "nonexistent") + + Expect(w.Code).To(Equal(http.StatusNotFound)) + }) + + It("should return 503 when flash pod is not ready", func() { + tr := newFlashTaskRun("my-flash", "alice", "pending") + fakeClient := newFakeClient(tr) + getClientFromRequestFn = func(_ *gin.Context) (ctrlclient.Client, error) { + return fakeClient, nil + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request, _ = http.NewRequest(http.MethodGet, "/v1/flashes/my-flash/logs", nil) + + server.streamFlashLogs(c, "my-flash") + + Expect(w.Code).To(Equal(http.StatusServiceUnavailable)) + Expect(w.Body.String()).To(ContainSubstring("flash pod not ready")) + }) + }) +}) diff --git a/internal/buildapi/helpers.go b/internal/buildapi/helpers.go new file mode 100644 index 00000000..9910fad5 --- /dev/null +++ b/internal/buildapi/helpers.go @@ -0,0 +1,235 @@ +package buildapi + +import ( + "fmt" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/gin-gonic/gin" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" + + automotivev1alpha1 "github.com/centos-automotive-suite/automotive-dev-operator/api/v1alpha1" + tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" +) + +func writeJSON(c *gin.Context, status int, v any) { + c.Header("Cache-Control", "no-store") + c.IndentedJSON(status, v) +} + +const maxPageLimit = 500 + +// parsePagination extracts limit and offset from query parameters. +// When limit is not provided, 0 is returned and applyPagination returns +// the full slice (preserving backward compatibility for existing clients). +// When provided, limit is clamped to maxPageLimit (500). +func parsePagination(c *gin.Context) (limit, offset int) { + if l := c.Query("limit"); l != "" { + if n, err := strconv.Atoi(l); err == nil && n > 0 { + limit = n + if limit > maxPageLimit { + limit = maxPageLimit + } + } + } + if o := c.Query("offset"); o != "" { + if n, err := strconv.Atoi(o); err == nil && n >= 0 { + offset = n + } + } + return +} + +// applyPagination returns the paginated window of items. A limit of 0 +// means "no limit" — the full slice (from offset) is returned. +func applyPagination[T any](items []T, limit, offset int) []T { + if offset >= len(items) { + return []T{} + } + if limit <= 0 { + return items[offset:] + } + end := offset + limit + if end > len(items) { + end = len(items) + } + return items[offset:end] +} + +func parseSinceTime(sinceParam string) *metav1.Time { + if sinceParam == "" { + return nil + } + t, err := time.Parse(time.RFC3339, sinceParam) + if err != nil { + return nil + } + return &metav1.Time{Time: t} +} + +func resolveNamespace() string { + if ns := strings.TrimSpace(os.Getenv("BUILD_API_NAMESPACE")); ns != "" { + return ns + } + if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + ns := strings.TrimSpace(string(data)) + if ns != "" { + return ns + } + } + return "default" +} + +func getRESTConfigFromRequest(_ *gin.Context) (*rest.Config, error) { + var cfg *rest.Config + var err error + cfg, err = rest.InClusterConfig() + if err != nil { + kubeconfig := os.Getenv("KUBECONFIG") + cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to build kube config: %w", err) + } + } + cfgCopy := rest.CopyConfig(cfg) + cfgCopy.Timeout = 30 * time.Minute + return cfgCopy, nil +} + +// getKubernetesClient creates a controller-runtime client for accessing Kubernetes resources +func getKubernetesClient() (client.Client, error) { + cfg, err := rest.InClusterConfig() + if err != nil { + kubeconfig := os.Getenv("KUBECONFIG") + cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to build kube config: %w", err) + } + } + + scheme := runtime.NewScheme() + if err := automotivev1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to add scheme: %w", err) + } + + k8sClient, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + return nil, fmt.Errorf("failed to create k8s client: %w", err) + } + return k8sClient, nil +} + +func getClientFromRequest(c *gin.Context) (client.Client, error) { + cfg, err := getRESTConfigFromRequest(c) + if err != nil { + return nil, err + } + + scheme := runtime.NewScheme() + if err := automotivev1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to add automotive scheme: %w", err) + } + if err := corev1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to add core scheme: %w", err) + } + if err := tektonv1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to add tekton scheme: %w", err) + } + + k8sClient, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + return nil, fmt.Errorf("failed to create k8s client: %w", err) + } + return k8sClient, nil +} + +// setupLogStreamHeaders configures HTTP headers for log streaming +func setupLogStreamHeaders(c *gin.Context) { + c.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8") + c.Writer.Header().Set("Transfer-Encoding", "chunked") + c.Writer.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("X-Accel-Buffering", "no") + c.Writer.Header().Set("X-Content-Type-Options", "nosniff") + c.Writer.Header().Set("Pragma", "no-cache") + c.Writer.WriteHeader(http.StatusOK) + _, _ = c.Writer.Write([]byte("Waiting for logs...\n")) + c.Writer.Flush() +} + +// Shell metacharacters that must be blocked to prevent injection attacks +var shellMetachars = []string{";", "|", "&", "$", "`", "(", ")", "{", "}", "<", ">", "!", "\\", "'", "\"", "\n", "\r"} + +// validateInput validates a string for dangerous characters and length +func validateInput(value, fieldName string, maxLen int, allowEmpty bool, extraChars ...string) error { + if value == "" { + if allowEmpty { + return nil + } + return fmt.Errorf("%s is required", fieldName) + } + + // Combine shell metacharacters with any additional blocked characters + blockedChars := append(shellMetachars, extraChars...) + for _, char := range blockedChars { + if strings.Contains(value, char) { + return fmt.Errorf("%s contains invalid character: %q", fieldName, char) + } + } + + if len(value) > maxLen { + return fmt.Errorf("%s too long (max %d characters)", fieldName, maxLen) + } + return nil +} + +func validateContainerRef(ref string) error { + return validateInput(ref, "container reference", 500, true) +} + +func validateBuildName(name string) error { + if err := validateInput(name, "build name", 253, false, "/"); err != nil { + return err + } + + sanitized := sanitizeBuildNameForValidation(name) + if sanitized == "" { + return fmt.Errorf("build name contains only invalid characters") + } + + return nil +} + +func sanitizeBuildNameForValidation(name string) string { + name = strings.ToLower(name) + var b strings.Builder + for _, r := range name { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' { + b.WriteRune(r) + } else { + b.WriteRune('-') + } + } + result := strings.ReplaceAll(b.String(), "--", "-") + for strings.Contains(result, "--") { + result = strings.ReplaceAll(result, "--", "-") + } + return strings.Trim(result, "-") +} + +func (a *APIServer) resolveRequester(c *gin.Context) string { + if v, ok := c.Get("requester"); ok { + if username, ok := v.(string); ok && username != "" { + return username + } + } + return "unknown" +} diff --git a/internal/buildapi/server.go b/internal/buildapi/server.go index 42b90a9f..b19c270c 100644 --- a/internal/buildapi/server.go +++ b/internal/buildapi/server.go @@ -17,7 +17,6 @@ import ( "path" "regexp" "sort" - "strconv" "strings" "sync" "time" @@ -550,6 +549,8 @@ func (a *APIServer) createRouter() *gin.Engine { c.Next() }) + router.GET("/metrics", metricsHandler()) + v1 := router.Group("/v1") { v1.GET("/healthz", func(c *gin.Context) { @@ -579,7 +580,7 @@ func (a *APIServer) createRouter() *gin.Engine { } flashGroup := v1.Group("/flash") - flashGroup.Use(a.authMiddleware()) + flashGroup.Use(flashMetricsMiddleware(), a.authMiddleware()) { flashGroup.POST("", a.handleCreateFlash) flashGroup.GET("", a.handleListFlash) @@ -992,20 +993,6 @@ func (a *APIServer) handleUploadFiles(c *gin.Context) { a.uploadFiles(c, name) } -// setupLogStreamHeaders configures HTTP headers for log streaming -func setupLogStreamHeaders(c *gin.Context) { - c.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8") - c.Writer.Header().Set("Transfer-Encoding", "chunked") - c.Writer.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") - c.Writer.Header().Set("Connection", "keep-alive") - c.Writer.Header().Set("X-Accel-Buffering", "no") - c.Writer.Header().Set("X-Content-Type-Options", "nosniff") - c.Writer.Header().Set("Pragma", "no-cache") - c.Writer.WriteHeader(http.StatusOK) - _, _ = c.Writer.Write([]byte("Waiting for logs...\n")) - c.Writer.Flush() -} - // getStepContainerNames returns container names for pipeline steps func getStepContainerNames(pod corev1.Pod) []string { stepNames := make([]string, 0, len(pod.Spec.Containers)) @@ -1501,67 +1488,6 @@ func createPushSecret( return secretName, nil } -// Shell metacharacters that must be blocked to prevent injection attacks -var shellMetachars = []string{";", "|", "&", "$", "`", "(", ")", "{", "}", "<", ">", "!", "\\", "'", "\"", "\n", "\r"} - -// validateInput validates a string for dangerous characters and length -func validateInput(value, fieldName string, maxLen int, allowEmpty bool, extraChars ...string) error { - if value == "" { - if allowEmpty { - return nil - } - return fmt.Errorf("%s is required", fieldName) - } - - // Combine shell metacharacters with any additional blocked characters - blockedChars := append(shellMetachars, extraChars...) - for _, char := range blockedChars { - if strings.Contains(value, char) { - return fmt.Errorf("%s contains invalid character: %q", fieldName, char) - } - } - - if len(value) > maxLen { - return fmt.Errorf("%s too long (max %d characters)", fieldName, maxLen) - } - return nil -} - -func validateContainerRef(ref string) error { - return validateInput(ref, "container reference", 500, true) -} - -func validateBuildName(name string) error { - if err := validateInput(name, "build name", 253, false, "/"); err != nil { - return err - } - - // Check if name would become empty after sanitization - sanitized := sanitizeBuildNameForValidation(name) - if sanitized == "" { - return fmt.Errorf("build name contains only invalid characters") - } - - return nil -} - -func sanitizeBuildNameForValidation(name string) string { - name = strings.ToLower(name) - var b strings.Builder - for _, r := range name { - if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' { - b.WriteRune(r) - } else { - b.WriteRune('-') - } - } - result := strings.ReplaceAll(b.String(), "--", "-") - for strings.Contains(result, "--") { - result = strings.ReplaceAll(result, "--", "-") - } - return strings.Trim(result, "-") -} - // validateBuildRequest validates the build request, sanitizes the name, and applies defaults func validateBuildRequest(req *BuildRequest) error { if err := validateBuildName(req.Name); err != nil { @@ -2882,137 +2808,6 @@ func createFlashClientSecret( return c.Create(ctx, secret) } -func writeJSON(c *gin.Context, status int, v any) { - c.Header("Cache-Control", "no-store") - c.IndentedJSON(status, v) -} - -const maxPageLimit = 500 - -// parsePagination extracts limit and offset from query parameters. -// When limit is not provided, 0 is returned and applyPagination returns -// the full slice (preserving backward compatibility for existing clients). -// When provided, limit is clamped to maxPageLimit (500). -func parsePagination(c *gin.Context) (limit, offset int) { - if l := c.Query("limit"); l != "" { - if n, err := strconv.Atoi(l); err == nil && n > 0 { - limit = n - if limit > maxPageLimit { - limit = maxPageLimit - } - } - } - if o := c.Query("offset"); o != "" { - if n, err := strconv.Atoi(o); err == nil && n >= 0 { - offset = n - } - } - return -} - -// applyPagination returns the paginated window of items. A limit of 0 -// means "no limit" — the full slice (from offset) is returned. -func applyPagination[T any](items []T, limit, offset int) []T { - if offset >= len(items) { - return []T{} - } - if limit <= 0 { - return items[offset:] - } - end := offset + limit - if end > len(items) { - end = len(items) - } - return items[offset:end] -} - -func parseSinceTime(sinceParam string) *metav1.Time { - if sinceParam == "" { - return nil - } - t, err := time.Parse(time.RFC3339, sinceParam) - if err != nil { - return nil - } - return &metav1.Time{Time: t} -} - -func resolveNamespace() string { - if ns := strings.TrimSpace(os.Getenv("BUILD_API_NAMESPACE")); ns != "" { - return ns - } - if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { - ns := strings.TrimSpace(string(data)) - if ns != "" { - return ns - } - } - return "default" -} - -func getRESTConfigFromRequest(_ *gin.Context) (*rest.Config, error) { - var cfg *rest.Config - var err error - cfg, err = rest.InClusterConfig() - if err != nil { - kubeconfig := os.Getenv("KUBECONFIG") - cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, fmt.Errorf("failed to build kube config: %w", err) - } - } - cfgCopy := rest.CopyConfig(cfg) - cfgCopy.Timeout = 30 * time.Minute - return cfgCopy, nil -} - -// getKubernetesClient creates a controller-runtime client for accessing Kubernetes resources -func getKubernetesClient() (client.Client, error) { - cfg, err := rest.InClusterConfig() - if err != nil { - kubeconfig := os.Getenv("KUBECONFIG") - cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, fmt.Errorf("failed to build kube config: %w", err) - } - } - - scheme := runtime.NewScheme() - if err := automotivev1alpha1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("failed to add scheme: %w", err) - } - - k8sClient, err := client.New(cfg, client.Options{Scheme: scheme}) - if err != nil { - return nil, fmt.Errorf("failed to create k8s client: %w", err) - } - return k8sClient, nil -} - -func getClientFromRequest(c *gin.Context) (client.Client, error) { - cfg, err := getRESTConfigFromRequest(c) - if err != nil { - return nil, err - } - - scheme := runtime.NewScheme() - if err := automotivev1alpha1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("failed to add automotive scheme: %w", err) - } - if err := corev1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("failed to add core scheme: %w", err) - } - if err := tektonv1.AddToScheme(scheme); err != nil { - return nil, fmt.Errorf("failed to add tekton scheme: %w", err) - } - - k8sClient, err := client.New(cfg, client.Options{Scheme: scheme}) - if err != nil { - return nil, fmt.Errorf("failed to create k8s client: %w", err) - } - return k8sClient, nil -} - // refreshAuthConfigIfNeeded periodically checks and refreshes authentication configuration from OperatorConfig // IMPORTANT: This function only recreates the OIDC authenticator if the config actually changed. func (a *APIServer) refreshAuthConfigIfNeeded() { @@ -3274,15 +3069,6 @@ func extractBearerToken(c *gin.Context) string { return "" } -func (a *APIServer) resolveRequester(c *gin.Context) string { - if v, ok := c.Get("requester"); ok { - if username, ok := v.(string); ok && username != "" { - return username - } - } - return "unknown" -} - // handleGetAuthConfig returns OIDC configuration for clients (no auth required) func (a *APIServer) handleGetAuthConfig(c *gin.Context) { // Refresh auth config if needed @@ -3442,30 +3228,6 @@ func (a *APIServer) handleGetOperatorConfig(c *gin.Context) { c.JSON(http.StatusOK, response) } -// Flash API handlers - -func (a *APIServer) handleCreateFlash(c *gin.Context) { - a.log.Info("create flash", "reqID", c.GetString("reqID")) - a.createFlash(c) -} - -func (a *APIServer) handleListFlash(c *gin.Context) { - a.log.Info("list flash jobs", "reqID", c.GetString("reqID")) - a.listFlash(c) -} - -func (a *APIServer) handleGetFlash(c *gin.Context) { - name := c.Param("name") - a.log.Info("get flash", "flash", name, "reqID", c.GetString("reqID")) - a.getFlash(c, name) -} - -func (a *APIServer) handleFlashLogs(c *gin.Context) { - name := c.Param("name") - a.log.Info("flash logs requested", "flash", name, "reqID", c.GetString("reqID")) - a.streamFlashLogs(c, name) -} - // Sealed API handlers // sealedPathToOperation maps the API path prefix to the AIB sealed operation. @@ -3510,428 +3272,6 @@ func (a *APIServer) handleSealedLogs(c *gin.Context) { a.streamSealedLogs(c, name) } -func (a *APIServer) createFlash(c *gin.Context) { - var req FlashRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid JSON request"}) - return - } - - // Validate required fields - if req.ImageRef == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "imageRef is required"}) - return - } - if req.ClientConfig == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "clientConfig is required"}) - return - } - - // Auto-generate name if not provided - if req.Name == "" { - req.Name = fmt.Sprintf("flash-%s", uuid.New().String()[:5]) - } - - // Validate name - if err := validateBuildName(req.Name); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - // Validate mutual exclusivity of lease-name and lease-duration - if req.LeaseName != "" && req.LeaseDuration != "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "lease-name and lease-duration are mutually exclusive"}) - return - } - - k8sClient, err := getClientFromRequest(c) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) - return - } - - restCfg, err := getRESTConfigFromRequest(c) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - clientset, err := kubernetes.NewForConfig(restCfg) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - - ctx := c.Request.Context() - namespace := resolveNamespace() - requestedBy := a.resolveRequester(c) - - // Load OperatorConfig for target mappings, image overrides, and lease duration defaults - operatorConfig := &automotivev1alpha1.OperatorConfig{} - if err := k8sClient.Get(ctx, types.NamespacedName{Name: "config", Namespace: namespace}, operatorConfig); err != nil { - if !k8serrors.IsNotFound(err) { - a.log.Error(err, "failed to load OperatorConfig for flash, using defaults") - } - operatorConfig = &automotivev1alpha1.OperatorConfig{} - } - - // Resolve exporter selector and flash command from OperatorConfig - exporterSelector, flashCmd := resolveFlashTargetConfig(req, operatorConfig) - if exporterSelector == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "exporterSelector or valid target is required"}) - return - } - - // Replace placeholders in flash command - if flashCmd != "" { - flashCmd = strings.ReplaceAll(flashCmd, "{image_uri}", req.ImageRef) - flashCmd = strings.ReplaceAll(flashCmd, "{artifact_url}", req.ImageRef) - } - - // Create Jumpstarter client config secret - secretName, createdSecret, secretErr := createFlashClientConfigSecret(ctx, clientset, namespace, req) - if secretErr != nil { - c.JSON(secretErr.code, gin.H{"error": secretErr.message}) - return - } - - // Create OCI auth secret for flash image pull credentials - flashOCIAuthSecretName, createdOCIAuthSecret, ociErr := createFlashOCIAuthSecret(ctx, clientset, namespace, req.Name, req.RegistryCredentials) - if ociErr != nil { - _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) - c.JSON(ociErr.code, gin.H{"error": ociErr.message}) - return - } - - // Build task config from OperatorConfig for flash task generation - var flashBuildConfig *tasks.BuildConfig - if operatorConfig.Spec.OSBuilds != nil { - flashBuildConfig = &tasks.BuildConfig{ - FlashTimeoutMinutes: operatorConfig.Spec.OSBuilds.GetFlashTimeoutMinutes(), - DefaultLeaseDuration: operatorConfig.Spec.Jumpstarter.GetDefaultLeaseDuration(), - } - } - - // Get the flash task spec - flashTask := tasks.GenerateFlashTask(namespace, flashBuildConfig) - - // Lease duration: only resolve when not using an existing lease - // Fallback: request > FlashTimeoutMinutes (as HH:MM:SS) > Jumpstarter default > constant - leaseDuration := req.LeaseDuration - if req.LeaseName == "" && leaseDuration == "" { - if operatorConfig.Spec.OSBuilds != nil && operatorConfig.Spec.OSBuilds.FlashTimeoutMinutes > 0 { - m := operatorConfig.Spec.OSBuilds.FlashTimeoutMinutes - leaseDuration = fmt.Sprintf("%02d:%02d:00", m/60, m%60) - } else { - leaseDuration = operatorConfig.Spec.Jumpstarter.GetDefaultLeaseDuration() - } - } - - // Build workspace bindings - workspaces := []tektonv1.WorkspaceBinding{ - { - Name: "jumpstarter-client", - Secret: &corev1.SecretVolumeSource{ - SecretName: secretName, - }, - }, - } - if flashOCIAuthSecretName != "" { - workspaces = append(workspaces, tektonv1.WorkspaceBinding{ - Name: "flash-oci-auth", - Secret: &corev1.SecretVolumeSource{ - SecretName: flashOCIAuthSecretName, - }, - }) - } - - // Create the flash TaskRun - taskRun := &tektonv1.TaskRun{ - ObjectMeta: metav1.ObjectMeta{ - Name: req.Name, - Namespace: namespace, - Labels: map[string]string{ - "app.kubernetes.io/managed-by": "build-api", - "app.kubernetes.io/part-of": "automotive-dev", - "app.kubernetes.io/name": "flash-taskrun", - flashTaskRunLabel: req.Name, - }, - Annotations: map[string]string{ - "automotive.sdv.cloud.redhat.com/requested-by": requestedBy, - "automotive.sdv.cloud.redhat.com/image-ref": req.ImageRef, - }, - }, - Spec: tektonv1.TaskRunSpec{ - ServiceAccountName: automotivev1alpha1.BuildServiceAccountName, - TaskSpec: &flashTask.Spec, - Params: []tektonv1.Param{ - {Name: "image-ref", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: req.ImageRef}}, - {Name: "exporter-selector", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: exporterSelector}}, - {Name: "flash-cmd", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: flashCmd}}, - {Name: "lease-duration", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: leaseDuration}}, - {Name: "lease-name", Value: tektonv1.ParamValue{Type: tektonv1.ParamTypeString, StringVal: req.LeaseName}}, - }, - Workspaces: workspaces, - }, - } - - if err := k8sClient.Create(ctx, taskRun); err != nil { - // Clean up secrets if TaskRun creation fails - _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) - if flashOCIAuthSecretName != "" { - _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, flashOCIAuthSecretName, metav1.DeleteOptions{}) - } - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to create flash TaskRun: %v", err)}) - return - } - - // Set owner reference on secrets for automatic cleanup - ownerRef := []metav1.OwnerReference{ - { - APIVersion: "tekton.dev/v1", - Kind: "TaskRun", - Name: taskRun.Name, - UID: taskRun.UID, - }, - } - createdSecret.OwnerReferences = ownerRef - if _, err := clientset.CoreV1().Secrets(namespace).Update(ctx, createdSecret, metav1.UpdateOptions{}); err != nil { - log.Printf("WARNING: failed to set owner reference on secret %s: %v", secretName, err) - } - if createdOCIAuthSecret != nil { - createdOCIAuthSecret.OwnerReferences = ownerRef - if _, updErr := clientset.CoreV1().Secrets(namespace).Update(ctx, createdOCIAuthSecret, metav1.UpdateOptions{}); updErr != nil { - log.Printf("WARNING: failed to set owner reference on flash OCI auth secret %s: %v", flashOCIAuthSecretName, updErr) - } - } - - writeJSON(c, http.StatusAccepted, FlashResponse{ - Name: req.Name, - Phase: phasePending, - Message: "Flash TaskRun created", - RequestedBy: requestedBy, - TaskRunName: taskRun.Name, - }) -} - -func (a *APIServer) listFlash(c *gin.Context) { - namespace := resolveNamespace() - limit, offset := parsePagination(c) - - k8sClient, err := getClientFromRequest(c) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) - return - } - - ctx := c.Request.Context() - - // List TaskRuns with flash label - taskRunList := &tektonv1.TaskRunList{} - if err := k8sClient.List(ctx, taskRunList, client.InNamespace(namespace), client.HasLabels{flashTaskRunLabel}); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to list flash TaskRuns: %v", err)}) - return - } - - // Sort by creation time, newest first - sort.Slice(taskRunList.Items, func(i, j int) bool { - return taskRunList.Items[j].CreationTimestamp.Before(&taskRunList.Items[i].CreationTimestamp) - }) - - page := applyPagination(taskRunList.Items, limit, offset) - - resp := make([]FlashListItem, 0, len(page)) - for _, tr := range page { - phase, message := getTaskRunStatus(&tr) - var compStr string - if tr.Status.CompletionTime != nil { - compStr = tr.Status.CompletionTime.Format(time.RFC3339) - } - resp = append(resp, FlashListItem{ - Name: tr.Name, - Phase: phase, - Message: message, - RequestedBy: tr.Annotations["automotive.sdv.cloud.redhat.com/requested-by"], - CreatedAt: tr.CreationTimestamp.Format(time.RFC3339), - CompletionTime: compStr, - }) - } - writeJSON(c, http.StatusOK, resp) -} - -func (a *APIServer) getFlash(c *gin.Context, name string) { - namespace := resolveNamespace() - - k8sClient, err := getClientFromRequest(c) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) - return - } - - ctx := c.Request.Context() - taskRun := &tektonv1.TaskRun{} - if err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, taskRun); err != nil { - if k8serrors.IsNotFound(err) { - c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) - return - } - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to get flash TaskRun: %v", err)}) - return - } - - // Verify it's a flash TaskRun - if taskRun.Labels[flashTaskRunLabel] == "" { - c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) - return - } - - phase, message := getTaskRunStatus(taskRun) - var startStr, compStr string - if taskRun.Status.StartTime != nil { - startStr = taskRun.Status.StartTime.Format(time.RFC3339) - } - if taskRun.Status.CompletionTime != nil { - compStr = taskRun.Status.CompletionTime.Format(time.RFC3339) - } - - writeJSON(c, http.StatusOK, FlashResponse{ - Name: taskRun.Name, - Phase: phase, - Message: message, - RequestedBy: taskRun.Annotations["automotive.sdv.cloud.redhat.com/requested-by"], - StartTime: startStr, - CompletionTime: compStr, - TaskRunName: taskRun.Name, - }) -} - -func getTaskRunStatus(tr *tektonv1.TaskRun) (phase, message string) { - // Check if completed - if tr.Status.CompletionTime != nil { - // Check conditions for success/failure - for _, cond := range tr.Status.Conditions { - if cond.Type == "Succeeded" { - if cond.Status == corev1.ConditionTrue { - return phaseCompleted, "Flash completed successfully" - } - return phaseFailed, cond.Message - } - } - return phaseFailed, "Flash failed" - } - - // Check if running - if tr.Status.StartTime != nil { - return phaseRunning, "Flash in progress" - } - - return phasePending, "Waiting to start" -} - -func (a *APIServer) streamFlashLogs(c *gin.Context, name string) { - namespace := resolveNamespace() - - k8sClient, err := getClientFromRequest(c) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("k8s client error: %v", err)}) - return - } - - restCfg, err := getRESTConfigFromRequest(c) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - clientset, err := kubernetes.NewForConfig(restCfg) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - - ctx := c.Request.Context() - - // Verify the TaskRun exists and is a flash TaskRun - taskRun := &tektonv1.TaskRun{} - if err := k8sClient.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, taskRun); err != nil { - if k8serrors.IsNotFound(err) { - c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) - return - } - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to get flash TaskRun: %v", err)}) - return - } - if taskRun.Labels[flashTaskRunLabel] == "" { - c.JSON(http.StatusNotFound, gin.H{"error": "flash TaskRun not found"}) - return - } - - sinceTime := parseSinceTime(c.Query("since")) - streamDuration := time.Duration(a.limits.MaxLogStreamDurationMinutes) * time.Minute - streamCtx, cancel := context.WithTimeout(ctx, streamDuration) - defer cancel() - - // Get the pod name from TaskRun status - podName := taskRun.Status.PodName - if podName == "" { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": "flash pod not ready"}) - return - } - - setupLogStreamHeaders(c) - - // TaskRun pods use step containers with naming convention "step-" - containerName := "step-flash" - - // Stream logs - req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ - Container: containerName, - Follow: true, - SinceTime: sinceTime, - }) - stream, err := req.Stream(streamCtx) - if err != nil { - _, _ = fmt.Fprintf(c.Writer, "\n[Error streaming logs: %v]\n", err) - c.Writer.Flush() - return - } - defer func() { - if err := stream.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Warning: failed to close stream: %v\n", err) - } - }() - - _, _ = c.Writer.Write([]byte("\n===== Flash TaskRun Logs =====\n\n")) - c.Writer.Flush() - - scanner := bufio.NewScanner(stream) - scanner.Buffer(make([]byte, 64*1024), 1024*1024) - - for scanner.Scan() { - select { - case <-streamCtx.Done(): - return - default: - } - line := scanner.Bytes() - if _, writeErr := c.Writer.Write(line); writeErr != nil { - return - } - if _, writeErr := c.Writer.Write([]byte("\n")); writeErr != nil { - return - } - c.Writer.Flush() - } - - if err := scanner.Err(); err != nil && err != io.EOF { - var errMsg []byte - errMsg = fmt.Appendf(errMsg, "\n[Stream error: %v]\n", err) - _, _ = c.Writer.Write(errMsg) - c.Writer.Flush() - } - - _, _ = c.Writer.Write([]byte("\n[Log streaming completed]\n")) - c.Writer.Flush() -} - // validateSealedRequest validates and normalizes a SealedRequest, returning the resolved stages or an error message. func validateSealedRequest(req *SealedRequest) ([]string, string) { validOps := map[string]bool{ diff --git a/internal/controller/imagebuild/controller.go b/internal/controller/imagebuild/controller.go index 413e38a7..bb9450fe 100644 --- a/internal/controller/imagebuild/controller.go +++ b/internal/controller/imagebuild/controller.go @@ -606,6 +606,9 @@ func (r *ImageBuildReconciler) checkBuildProgress( return ctrl.Result{}, err } recordBuildMetrics(fresh, pipelineRun, buildStatusSuccess) + if fresh.Spec.IsFlashEnabled() { + r.recordPipelineFlashMetrics(ctx, fresh, pipelineRun, buildStatusSuccess) + } r.emitEventf( fresh, @@ -647,6 +650,9 @@ func (r *ImageBuildReconciler) checkBuildProgress( return ctrl.Result{}, err } recordBuildMetrics(imageBuild, pipelineRun, buildStatusFailure) + if imageBuild.Spec.IsFlashEnabled() { + r.recordPipelineFlashMetrics(ctx, imageBuild, pipelineRun, buildStatusFailure) + } if cleanupErr != nil { return ctrl.Result{RequeueAfter: secretCleanupRequeue}, nil } @@ -658,6 +664,9 @@ func (r *ImageBuildReconciler) checkBuildProgress( return ctrl.Result{}, err } recordBuildMetrics(imageBuild, pipelineRun, buildStatusFailure) + if imageBuild.Spec.IsFlashEnabled() { + r.recordPipelineFlashMetrics(ctx, imageBuild, pipelineRun, buildStatusFailure) + } if cleanupErr != nil { return ctrl.Result{RequeueAfter: secretCleanupRequeue}, nil } @@ -1633,7 +1642,8 @@ func (r *ImageBuildReconciler) handleFlashingState( patch := client.MergeFrom(fresh.DeepCopy()) - if isTaskRunSuccessful(taskRun) { + flashSucceeded := isTaskRunSuccessful(taskRun) + if flashSucceeded { fresh.Status.Phase = phaseCompleted fresh.Status.Message = "Build, push, and flash completed successfully" } else { @@ -1651,6 +1661,12 @@ func (r *ImageBuildReconciler) handleFlashingState( return ctrl.Result{}, err } + if flashSucceeded { + recordFlashMetrics(imageBuild, taskRun, buildStatusSuccess) + } else { + recordFlashMetrics(imageBuild, taskRun, buildStatusFailure) + } + if cleanupErr != nil { return ctrl.Result{RequeueAfter: secretCleanupRequeue}, nil } @@ -1986,6 +2002,44 @@ func recordBuildMetrics(imageBuild *automotivev1alpha1.ImageBuild, pipelineRun * } } +func (r *ImageBuildReconciler) recordPipelineFlashMetrics( + ctx context.Context, + imageBuild *automotivev1alpha1.ImageBuild, + pipelineRun *tektonv1.PipelineRun, + status string, +) { + target := imageBuild.Spec.GetTarget() + + for _, child := range pipelineRun.Status.ChildReferences { + if child.PipelineTaskName != "flash-image" { + continue + } + taskRun := &tektonv1.TaskRun{} + if err := r.Get(ctx, types.NamespacedName{ + Name: child.Name, + Namespace: pipelineRun.Namespace, + }, taskRun); err != nil { + break + } + FlashTotal.WithLabelValues(target, status).Inc() + if taskRun.Status.CompletionTime != nil { + duration := taskRun.Status.CompletionTime.Sub(taskRun.CreationTimestamp.Time).Seconds() + FlashDuration.WithLabelValues(target, status).Observe(duration) + } + return + } +} + +func recordFlashMetrics(imageBuild *automotivev1alpha1.ImageBuild, taskRun *tektonv1.TaskRun, status string) { + target := imageBuild.Spec.GetTarget() + FlashTotal.WithLabelValues(target, status).Inc() + + if taskRun.Status.CompletionTime != nil { + duration := taskRun.Status.CompletionTime.Sub(taskRun.CreationTimestamp.Time).Seconds() + FlashDuration.WithLabelValues(target, status).Observe(duration) + } +} + func extractProvenance(pipelineRun *tektonv1.PipelineRun, aibImage string) (aibImageUsed, builderImageUsed string) { aibImageUsed = aibImage // Always record the AIB image that was requested diff --git a/internal/controller/imagebuild/metrics.go b/internal/controller/imagebuild/metrics.go index 05e64676..5bccca7e 100644 --- a/internal/controller/imagebuild/metrics.go +++ b/internal/controller/imagebuild/metrics.go @@ -48,6 +48,29 @@ var ( }, []string{"mode", "distro", "target", "format", "arch", "status"}, ) + + // FlashTotal counts pipeline-triggered flash operations by status. + FlashTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "flash", + Name: "total", + Help: "Total number of pipeline flash operations by status", + }, + []string{"target", "status"}, + ) + + // FlashDuration tracks pipeline flash duration in seconds. + FlashDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "flash", + Name: "duration_seconds", + Help: "Pipeline flash operation duration in seconds", + Buckets: []float64{10, 30, 60, 120, 180, 300, 600, 900}, + }, + []string{"target", "status"}, + ) ) func init() { @@ -55,5 +78,7 @@ func init() { BuildDuration, BuildPhaseDuration, BuildTotal, + FlashTotal, + FlashDuration, ) }