Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 36 additions & 45 deletions internal/buildapi/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package buildapi

import (
"archive/tar"
"bufio"
"bytes"
"context"
_ "embed"
"encoding/base64"
Expand Down Expand Up @@ -84,6 +84,26 @@ const (
var getClientFromRequestFn = getClientFromRequest
var getRESTConfigFromRequestFn = getRESTConfigFromRequest
var createInternalRegistrySecretFn = createInternalRegistrySecret
var newPodExecExecutorFn = func(
config *rest.Config,
namespace, podName, containerName string,
cmd []string,
) (remotecommand.Executor, error) {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(namespace).SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, kscheme.ParameterCodec)
return remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL())
}
var errRegistryCredentialsRequiredForPush = errors.New("registry credentials are required when push repository is specified")
var loadOperatorConfigFn = func(
ctx context.Context,
Expand Down Expand Up @@ -2355,6 +2375,7 @@ func getBuildTemplate(c *gin.Context, name string) {

// uploadContext holds the context needed for file upload operations.
type uploadContext struct {
ctx context.Context
restCfg *rest.Config
namespace string
podName string
Expand Down Expand Up @@ -2422,7 +2443,7 @@ func processFilePart(part *multipart.Part, pendingPath string, uctx *uploadConte
}

destPath := "/workspace/shared/" + cleanDest
if err := copyFileToPod(uctx.restCfg, uctx.namespace, uctx.podName, uctx.container, tmpName, destPath); err != nil {
if err := copyFileToPod(uctx.ctx, uctx.restCfg, uctx.namespace, uctx.podName, uctx.container, tmpName, destPath); err != nil {
return processFilePartResult{}, fmt.Errorf("stream to pod failed: %w", err)
}

Expand Down Expand Up @@ -2498,6 +2519,7 @@ func (a *APIServer) uploadFiles(c *gin.Context, name string) {
}

uctx := &uploadContext{
ctx: c.Request.Context(),
restCfg: restCfg,
namespace: namespace,
podName: uploadPod.Name,
Expand Down Expand Up @@ -2560,7 +2582,7 @@ func (a *APIServer) uploadFiles(c *gin.Context, name string) {
writeJSON(c, http.StatusOK, map[string]string{"status": "ok"})
}

func copyFileToPod(config *rest.Config, namespace, podName, containerName, localPath, podPath string) error {
func copyFileToPod(ctx context.Context, config *rest.Config, namespace, podName, containerName, localPath, podPath string) error {
f, err := os.Open(localPath)
if err != nil {
return err
Expand All @@ -2570,55 +2592,24 @@ func copyFileToPod(config *rest.Config, namespace, podName, containerName, local
fmt.Fprintf(os.Stderr, "Warning: failed to close file: %v\n", err)
}
}()
info, err := f.Stat()
if err != nil {
return err
}

pr, pw := io.Pipe()
go func() {
tw := tar.NewWriter(pw)
defer func() {
if err := tw.Close(); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to close tar writer: %v\n", err)
}
if err := pw.Close(); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to close pipe writer: %v\n", err)
}
}()
hdr := &tar.Header{Name: path.Base(podPath), Mode: 0600, Size: info.Size(), ModTime: info.ModTime()}
if err := tw.WriteHeader(hdr); err != nil {
pw.CloseWithError(err)
return
}
if _, err := io.Copy(tw, f); err != nil {
pw.CloseWithError(err)
return
}
}()
// Stream raw file bytes via stdin; the pod-side command writes them directly.
// Uses only sh + cat (available in ubi-minimal), no tar dependency.
cmd := []string{"/bin/sh", "-c", "mkdir -p \"$(dirname \"$1\")\" && cat > \"$1\" && chmod 0600 \"$1\"", "--", podPath}

destDir := path.Dir(podPath)
cmd := []string{"/bin/sh", "-c", "mkdir -p \"$1\" && tar -x -C \"$1\"", "--", destDir}

clientset, err := kubernetes.NewForConfig(config)
executor, err := newPodExecExecutorFn(config, namespace, podName, containerName, cmd)
if err != nil {
return err
}
req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(namespace).SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, kscheme.ParameterCodec)
executor, err := remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL())
if err != nil {
var stderr bytes.Buffer
streamOpts := remotecommand.StreamOptions{Stdin: f, Stdout: io.Discard, Stderr: &stderr}
if err := executor.StreamWithContext(ctx, streamOpts); err != nil {
if stderr.Len() > 0 {
return fmt.Errorf("copy to pod: %w (stderr: %s)", err, stderr.String())
}
return err
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
streamOpts := remotecommand.StreamOptions{Stdin: pr, Stdout: io.Discard, Stderr: io.Discard}
return executor.StreamWithContext(context.Background(), streamOpts)
return nil
}

func setSecretOwnerRef(
Expand Down
139 changes: 139 additions & 0 deletions internal/buildapi/server_upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package buildapi

import (
"bytes"
"context"
"errors"
"io"
"os"
"reflect"
"testing"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

type fakeRemoteExecutor struct {
streamWithContextFn func(context.Context, remotecommand.StreamOptions) error
}

func (f *fakeRemoteExecutor) Stream(options remotecommand.StreamOptions) error {
return f.StreamWithContext(context.Background(), options)
}

func (f *fakeRemoteExecutor) StreamWithContext(ctx context.Context, options remotecommand.StreamOptions) error {
if f.streamWithContextFn != nil {
return f.streamWithContextFn(ctx, options)
}
return nil
}

func writeTempUploadFile(t *testing.T, data []byte) string {
t.Helper()

f, err := os.CreateTemp(t.TempDir(), "upload-*")
if err != nil {
t.Fatalf("create temp file: %v", err)
}
if _, err := f.Write(data); err != nil {
_ = f.Close()
t.Fatalf("write temp file: %v", err)
}
if err := f.Close(); err != nil {
t.Fatalf("close temp file: %v", err)
}
return f.Name()
}

func TestCopyFileToPodStreamsRawBytesWithNoTarCommand(t *testing.T) {
content := []byte("hello\x00world\n")
localPath := writeTempUploadFile(t, content)

originalNewPodExecExecutorFn := newPodExecExecutorFn
t.Cleanup(func() {
newPodExecExecutorFn = originalNewPodExecExecutorFn
})

var gotNamespace, gotPodName, gotContainerName string
var gotCmd []string
var gotBytes []byte

newPodExecExecutorFn = func(
_ *rest.Config,
namespace, podName, containerName string,
cmd []string,
) (remotecommand.Executor, error) {
gotNamespace = namespace
gotPodName = podName
gotContainerName = containerName
gotCmd = append([]string(nil), cmd...)

return &fakeRemoteExecutor{
streamWithContextFn: func(_ context.Context, options remotecommand.StreamOptions) error {
data, err := io.ReadAll(options.Stdin)
if err != nil {
return err
}
gotBytes = append([]byte(nil), data...)
return nil
},
}, nil
}

err := copyFileToPod(
context.Background(),
&rest.Config{},
"test-ns",
"test-pod",
"fileserver",
localPath,
"/workspace/shared/configs/app.conf",
)
if err != nil {
t.Fatalf("copyFileToPod returned error: %v", err)
}

wantCmd := []string{
"/bin/sh",
"-c",
"mkdir -p \"$(dirname \"$1\")\" && cat > \"$1\" && chmod 0600 \"$1\"",
"--",
"/workspace/shared/configs/app.conf",
}
if gotNamespace != "test-ns" || gotPodName != "test-pod" || gotContainerName != "fileserver" {
t.Fatalf("unexpected exec target: namespace=%q pod=%q container=%q", gotNamespace, gotPodName, gotContainerName)
}
if !reflect.DeepEqual(gotCmd, wantCmd) {
t.Fatalf("unexpected command:\n got: %#v\nwant: %#v", gotCmd, wantCmd)
}
if !bytes.Equal(gotBytes, content) {
t.Fatalf("unexpected streamed bytes:\n got: %q\nwant: %q", gotBytes, content)
}
}

func TestCopyFileToPodPropagatesStreamErrors(t *testing.T) {
localPath := writeTempUploadFile(t, []byte("content"))

originalNewPodExecExecutorFn := newPodExecExecutorFn
t.Cleanup(func() {
newPodExecExecutorFn = originalNewPodExecExecutorFn
})

wantErr := errors.New("stream failed")
newPodExecExecutorFn = func(
_ *rest.Config,
_, _, _ string,
_ []string,
) (remotecommand.Executor, error) {
return &fakeRemoteExecutor{
streamWithContextFn: func(_ context.Context, _ remotecommand.StreamOptions) error {
return wantErr
},
}, nil
}

err := copyFileToPod(context.Background(), &rest.Config{}, "test-ns", "test-pod", "fileserver", localPath, "/workspace/shared/file.txt")
if !errors.Is(err, wantErr) {
t.Fatalf("expected error %v, got %v", wantErr, err)
}
}
14 changes: 7 additions & 7 deletions internal/common/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// BuildConfig defines configuration options for build operations
Expand Down Expand Up @@ -592,15 +591,16 @@ func GenerateBuildAutomotiveImageTask(namespace string, buildConfig *BuildConfig
},
}

if buildConfig != nil && buildConfig.UseMemoryVolumes && buildConfig.UsePVCScratchVolumes {
log.Log.Info("WARNING: useMemoryVolumes and usePVCScratchVolumes are both enabled; usePVCScratchVolumes takes precedence")
}

if buildConfig != nil && buildConfig.UseMemoryVolumes && !buildConfig.UsePVCScratchVolumes {
if buildConfig != nil && buildConfig.UseMemoryVolumes {
for i := range task.Spec.Volumes {
vol := &task.Spec.Volumes[i]

if vol.Name == "build-dir" || vol.Name == "run-dir" || vol.Name == volumeNameContainerStorage || vol.Name == "output-dir" {
isContainerStorage := vol.Name == volumeNameContainerStorage
isScratch := vol.Name == "build-dir" || vol.Name == "run-dir" || vol.Name == "output-dir"

// When PVC scratch is on, only container-storage remains as emptyDir;
// the other scratch volumes get redirected to the workspace PVC below.
if isContainerStorage || (!buildConfig.UsePVCScratchVolumes && isScratch) {
vol.EmptyDir = &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumMemory,
}
Expand Down
27 changes: 22 additions & 5 deletions internal/controller/imagebuild/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,9 +1736,24 @@ func (r *ImageBuildReconciler) createUploadPod(ctx context.Context, imageBuild *
return fmt.Errorf("error checking for existing pod: %w", err)
}

workspacePVCName, err := r.getOrCreateWorkspacePVC(ctx, imageBuild)
if err != nil {
return err
// When a build-cache PVC exists (from --workspace), use it for uploads too
// so files land on the same PVC the TaskRun will mount as shared-workspace.
var workspacePVCName string
if imageBuild.Spec.BuildCachePVC != "" {
pvc := &corev1.PersistentVolumeClaim{}
if err := r.Get(ctx, types.NamespacedName{
Name: imageBuild.Spec.BuildCachePVC,
Namespace: imageBuild.Namespace,
}, pvc); err != nil {
return fmt.Errorf("buildCachePVC %q is not available: %w", imageBuild.Spec.BuildCachePVC, err)
}
workspacePVCName = pvc.Name
} else {
var err error
workspacePVCName, err = r.getOrCreateWorkspacePVC(ctx, imageBuild)
if err != nil {
return err
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

if imageBuild.Status.PVCName != workspacePVCName {
Expand Down Expand Up @@ -1788,7 +1803,7 @@ func (r *ImageBuildReconciler) createUploadPod(ctx context.Context, imageBuild *
Containers: []corev1.Container{
{
Name: "fileserver",
Image: "quay.io/nginx/nginx-unprivileged:latest",
Image: "registry.access.redhat.com/ubi10-minimal:latest",
Command: []string{"sleep", "infinity"},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
Expand Down Expand Up @@ -2051,7 +2066,7 @@ func (r *ImageBuildReconciler) getOrCreateWorkspacePVC(
"old-pvc", imageBuild.Status.PVCName)
}

// Fetch OperatorConfig to get PVC size configuration
// Fetch OperatorConfig to get PVC size and storage class configuration
operatorConfig := &automotivev1alpha1.OperatorConfig{}
err := r.Get(ctx, types.NamespacedName{Name: "config", Namespace: OperatorNamespace}, operatorConfig)

Expand Down Expand Up @@ -2097,6 +2112,8 @@ func (r *ImageBuildReconciler) getOrCreateWorkspacePVC(

if imageBuild.Spec.StorageClass != "" {
pvc.Spec.StorageClassName = &imageBuild.Spec.StorageClass
} else if err == nil && operatorConfig.Spec.OSBuilds != nil && operatorConfig.Spec.OSBuilds.StorageClass != "" {
pvc.Spec.StorageClassName = &operatorConfig.Spec.OSBuilds.StorageClass
}

if err := r.Create(ctx, pvc); err != nil {
Expand Down
Loading