diff --git a/sdks/go/container/tools/buffered_logging.go b/sdks/go/container/tools/buffered_logging.go index 5a810dbfdf1a..ef5e8310c3b6 100644 --- a/sdks/go/container/tools/buffered_logging.go +++ b/sdks/go/container/tools/buffered_logging.go @@ -21,7 +21,7 @@ import ( "strings" ) -const INITIAL_LOG_SIZE int = 255 +const initialLogSize int = 255 // BufferedLogger is a wrapper around the FnAPI logging client meant to be used // in place of stdout and stderr in bootloader subprocesses. Not intended for @@ -46,7 +46,7 @@ func (b *BufferedLogger) Write(p []byte) (int, error) { } n, err := b.builder.Write(p) if b.logs == nil { - b.logs = make([]string, 0, INITIAL_LOG_SIZE) + b.logs = make([]string, 0, initialLogSize) } b.logs = append(b.logs, b.builder.String()) b.builder.Reset() diff --git a/sdks/go/pkg/beam/util/execx/exec.go b/sdks/go/pkg/beam/util/execx/exec.go index 455b5f5ff84d..aaaf9355e7c1 100644 --- a/sdks/go/pkg/beam/util/execx/exec.go +++ b/sdks/go/pkg/beam/util/execx/exec.go @@ -17,6 +17,7 @@ package execx import ( + "io" "os" "os/exec" ) @@ -24,16 +25,22 @@ import ( // Execute runs the program with the given arguments. It attaches stdio to the // child process. func Execute(prog string, args ...string) error { - return ExecuteEnv(nil, prog, args...) + return ExecuteEnvWithIO(nil, os.Stdin, os.Stdout, os.Stderr, prog, args...) } // ExecuteEnv runs the program with the given arguments with additional environment // variables. It attaches stdio to the child process. func ExecuteEnv(env map[string]string, prog string, args ...string) error { + return ExecuteEnvWithIO(env, os.Stdin, os.Stdout, os.Stderr, prog, args...) +} + +// ExecuteEnvWithIO runs the program with the given arguments with additional environment +// variables. It attaches custom IO to the child process. +func ExecuteEnvWithIO(env map[string]string, stdin io.Reader, stdout, stderr io.Writer, prog string, args ...string) error { cmd := exec.Command(prog, args...) - cmd.Stdin = os.Stdin - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + cmd.Stdin = stdin + cmd.Stdout = stdout + cmd.Stderr = stderr if env != nil { cmd.Env = os.Environ() for k, v := range env { diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index e7b11daa3973..286490c2445c 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -205,7 +205,7 @@ func launchSDKProcess() error { } } - if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil { + if setupErr := installSetupPackages(ctx, logger, fileNames, dir, requirementsFiles); setupErr != nil { fmtErr := fmt.Errorf("failed to install required packages: %v", setupErr) // Send error message to logging service before returning up the call stack logger.Errorf(ctx, fmtErr.Error()) @@ -379,7 +379,7 @@ func setupAcceptableWheelSpecs() error { } // installSetupPackages installs Beam SDK and user dependencies. -func installSetupPackages(files []string, workDir string, requirementsFiles []string) error { +func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error { log.Printf("Installing setup packages ...") if err := setupAcceptableWheelSpecs(); err != nil { @@ -389,25 +389,25 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st pkgName := "apache-beam" isSdkInstalled := isPackageInstalled(pkgName) if !isSdkInstalled { - return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/.") + return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/") } // Install the Dataflow Python SDK and worker packages. // We install the extra requirements in case of using the beam sdk. These are ignored by pip // if the user is using an SDK that does not provide these. - if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil { + if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil { return fmt.Errorf("failed to install SDK: %v", err) } // The staged files will not disappear due to restarts because workDir is a // folder that is mapped to the host (and therefore survives restarts). for _, f := range requirementsFiles { - if err := pipInstallRequirements(files, workDir, f); err != nil { + if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil { return fmt.Errorf("failed to install requirements: %v", err) } } - if err := installExtraPackages(files, extraPackagesFile, workDir); err != nil { + if err := installExtraPackages(ctx, logger, files, extraPackagesFile, workDir); err != nil { return fmt.Errorf("failed to install extra packages: %v", err) } - if err := pipInstallPackage(files, workDir, workflowFile, false, true, nil); err != nil { + if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil { return fmt.Errorf("failed to install workflow: %v", err) } @@ -450,7 +450,7 @@ func processArtifactsInSetupOnlyMode() { } files[i] = filePayload.GetPath() } - if setupErr := installSetupPackages(files, workDir, []string{requirementsFile}); setupErr != nil { + if setupErr := installSetupPackages(context.Background(), nil, files, workDir, []string{requirementsFile}); setupErr != nil { log.Fatalf("Failed to install required packages: %v", setupErr) } } diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go index 350bda049d9d..fec5cf0ab50d 100644 --- a/sdks/python/container/piputil.go +++ b/sdks/python/container/piputil.go @@ -18,6 +18,7 @@ package main import ( "bufio" "bytes" + "context" "errors" "fmt" "log" @@ -26,16 +27,18 @@ import ( "path/filepath" "strings" + "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx" ) // pipInstallRequirements installs the given requirement, if present. -func pipInstallRequirements(files []string, dir, name string) error { +func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []string, dir, name string) error { pythonVersion, err := expansionx.GetPythonVersion() if err != nil { return err } + bufLogger := tools.NewBufferedLogger(logger) for _, file := range files { if file == name { // We run the install process in two rounds in order to avoid as much @@ -50,7 +53,13 @@ func pipInstallRequirements(files []string, dir, name string) error { // also installs dependencies. The key is that if all the packages have // been installed in the first round then this command will be a no-op. args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir} - return execx.Execute(pythonVersion, args...) + err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...) + if err != nil { + bufLogger.FlushAtError(ctx) + return err + } + bufLogger.FlushAtDebug(ctx) + return nil } } return nil @@ -69,11 +78,12 @@ func isPackageInstalled(pkgName string) bool { } // pipInstallPackage installs the given package, if present. -func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error { +func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string, dir, name string, force, optional bool, extras []string) error { pythonVersion, err := expansionx.GetPythonVersion() if err != nil { return err } + bufLogger := tools.NewBufferedLogger(logger) for _, file := range files { if file == name { var packageSpec = name @@ -97,19 +107,34 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e // installed version will match the package specified, the package itself // will not be reinstalled, but its dependencies will now be resolved and // installed if necessary. This achieves our goal outlined above. - args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps", + args := []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps", filepath.Join(dir, packageSpec)} - err := execx.Execute(pythonVersion, args...) + err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...) if err != nil { + bufLogger.FlushAtError(ctx) return err + } else { + bufLogger.FlushAtDebug(ctx) } - args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} - return execx.Execute(pythonVersion, args...) + args = []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} + err = execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...) + if err != nil { + bufLogger.FlushAtError(ctx) + return err + } + bufLogger.FlushAtDebug(ctx) + return nil } // Case when we do not perform a forced reinstall. - args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} - return execx.Execute(pythonVersion, args...) + args := []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} + err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...) + if err != nil { + bufLogger.FlushAtError(ctx) + return err + } + bufLogger.FlushAtDebug(ctx) + return nil } } if optional { @@ -120,7 +145,7 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e // installExtraPackages installs all the packages declared in the extra // packages manifest file. -func installExtraPackages(files []string, extraPackagesFile, dir string) error { +func installExtraPackages(ctx context.Context, logger *tools.Logger, files []string, extraPackagesFile, dir string) error { // First check that extra packages manifest file is present. for _, file := range files { if file != extraPackagesFile { @@ -139,7 +164,7 @@ func installExtraPackages(files []string, extraPackagesFile, dir string) error { for s.Scan() { extraPackage := s.Text() log.Printf("Installing extra package: %s", extraPackage) - if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil { + if err = pipInstallPackage(ctx, logger, files, dir, extraPackage, true, false, nil); err != nil { return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err) } } @@ -167,13 +192,13 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string { // assume that the pipleine was started with the Beam SDK found in the wheel // file, and we try to install it. If not successful, we fall back to installing // SDK from source tarball provided in sdkSrcFile. -func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error { +func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error { sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs) if sdkWhlFile != "" { // by default, pip rejects to install wheel if same version already installed isDev := strings.Contains(sdkWhlFile, ".dev") - err := pipInstallPackage(files, workDir, sdkWhlFile, isDev, false, []string{"gcp"}) + err := pipInstallPackage(ctx, logger, files, workDir, sdkWhlFile, isDev, false, []string{"gcp"}) if err == nil { return nil } @@ -185,6 +210,6 @@ func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhl return nil } } - err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"}) + err := pipInstallPackage(ctx, logger, files, workDir, sdkSrcFile, false, false, []string{"gcp"}) return err }