Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffered logger to the Python bootloader #28317

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdks/go/container/tools/buffered_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"
)

const INITIAL_LOG_SIZE int = 255
const initialLogSize int = 255

// BufferedLogger is a wrapper around the FnAPI logging client meant to be used
// in place of stdout and stderr in bootloader subprocesses. Not intended for
Expand All @@ -46,7 +46,7 @@ func (b *BufferedLogger) Write(p []byte) (int, error) {
}
n, err := b.builder.Write(p)
if b.logs == nil {
b.logs = make([]string, 0, INITIAL_LOG_SIZE)
b.logs = make([]string, 0, initialLogSize)
}
b.logs = append(b.logs, b.builder.String())
b.builder.Reset()
Expand Down
15 changes: 11 additions & 4 deletions sdks/go/pkg/beam/util/execx/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,30 @@
package execx

import (
"io"
"os"
"os/exec"
)

// Execute runs the program with the given arguments. It attaches stdio to the
// child process.
func Execute(prog string, args ...string) error {
return ExecuteEnv(nil, prog, args...)
return ExecuteEnvWithIO(nil, os.Stdin, os.Stdout, os.Stderr, prog, args...)
}

// ExecuteEnv runs the program with the given arguments with additional environment
// variables. It attaches stdio to the child process.
func ExecuteEnv(env map[string]string, prog string, args ...string) error {
return ExecuteEnvWithIO(env, os.Stdin, os.Stdout, os.Stderr, prog, args...)
}

// ExecuteEnvWithIO runs the program with the given arguments with additional environment
// variables. It attaches custom IO to the child process.
func ExecuteEnvWithIO(env map[string]string, stdin io.Reader, stdout, stderr io.Writer, prog string, args ...string) error {
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
cmd := exec.Command(prog, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = stdin
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
cmd.Stdout = stdout
cmd.Stderr = stderr
if env != nil {
cmd.Env = os.Environ()
for k, v := range env {
Expand Down
16 changes: 8 additions & 8 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func launchSDKProcess() error {
}
}

if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
if setupErr := installSetupPackages(ctx, logger, fileNames, dir, requirementsFiles); setupErr != nil {
fmtErr := fmt.Errorf("failed to install required packages: %v", setupErr)
// Send error message to logging service before returning up the call stack
logger.Errorf(ctx, fmtErr.Error())
Expand Down Expand Up @@ -379,7 +379,7 @@ func setupAcceptableWheelSpecs() error {
}

// installSetupPackages installs Beam SDK and user dependencies.
func installSetupPackages(files []string, workDir string, requirementsFiles []string) error {
func installSetupPackages(ctx context.Context, logger *tools.Logger, files []string, workDir string, requirementsFiles []string) error {
log.Printf("Installing setup packages ...")

if err := setupAcceptableWheelSpecs(); err != nil {
Expand All @@ -389,25 +389,25 @@ func installSetupPackages(files []string, workDir string, requirementsFiles []st
pkgName := "apache-beam"
isSdkInstalled := isPackageInstalled(pkgName)
if !isSdkInstalled {
return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/.")
return fmt.Errorf("Apache Beam is not installed in the runtime environment. If you use a custom container image, you must install apache-beam package in the custom image using same version of Beam as in the pipeline submission environment. For more information, see: the https://beam.apache.org/documentation/runtime/environments/")
}
// Install the Dataflow Python SDK and worker packages.
// We install the extra requirements in case of using the beam sdk. These are ignored by pip
// if the user is using an SDK that does not provide these.
if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
if err := installSdk(ctx, logger, files, workDir, sdkSrcFile, acceptableWhlSpecs, false); err != nil {
return fmt.Errorf("failed to install SDK: %v", err)
}
// The staged files will not disappear due to restarts because workDir is a
// folder that is mapped to the host (and therefore survives restarts).
for _, f := range requirementsFiles {
if err := pipInstallRequirements(files, workDir, f); err != nil {
if err := pipInstallRequirements(ctx, logger, files, workDir, f); err != nil {
return fmt.Errorf("failed to install requirements: %v", err)
}
}
if err := installExtraPackages(files, extraPackagesFile, workDir); err != nil {
if err := installExtraPackages(ctx, logger, files, extraPackagesFile, workDir); err != nil {
return fmt.Errorf("failed to install extra packages: %v", err)
}
if err := pipInstallPackage(files, workDir, workflowFile, false, true, nil); err != nil {
if err := pipInstallPackage(ctx, logger, files, workDir, workflowFile, false, true, nil); err != nil {
return fmt.Errorf("failed to install workflow: %v", err)
}

Expand Down Expand Up @@ -450,7 +450,7 @@ func processArtifactsInSetupOnlyMode() {
}
files[i] = filePayload.GetPath()
}
if setupErr := installSetupPackages(files, workDir, []string{requirementsFile}); setupErr != nil {
if setupErr := installSetupPackages(context.Background(), nil, files, workDir, []string{requirementsFile}); setupErr != nil {
tvalentyn marked this conversation as resolved.
Show resolved Hide resolved
log.Fatalf("Failed to install required packages: %v", setupErr)
}
}
53 changes: 39 additions & 14 deletions sdks/python/container/piputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"log"
Expand All @@ -26,16 +27,18 @@ import (
"path/filepath"
"strings"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)

// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []string, dir, name string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
bufLogger := tools.NewBufferedLogger(logger)
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to avoid as much
Expand All @@ -50,7 +53,13 @@ func pipInstallRequirements(files []string, dir, name string) error {
// also installs dependencies. The key is that if all the packages have
// been installed in the first round then this command will be a no-op.
args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir}
return execx.Execute(pythonVersion, args...)
err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
if err != nil {
bufLogger.FlushAtError(ctx)
return err
}
bufLogger.FlushAtDebug(ctx)
return nil
}
}
return nil
Expand All @@ -69,11 +78,12 @@ func isPackageInstalled(pkgName string) bool {
}

// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string, dir, name string, force, optional bool, extras []string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
bufLogger := tools.NewBufferedLogger(logger)
for _, file := range files {
if file == name {
var packageSpec = name
Expand All @@ -97,19 +107,34 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
// installed version will match the package specified, the package itself
// will not be reinstalled, but its dependencies will now be resolved and
// installed if necessary. This achieves our goal outlined above.
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
args := []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
err := execx.Execute(pythonVersion, args...)
err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
bufLogger.FlushAtError(ctx)
return err
} else {
bufLogger.FlushAtDebug(ctx)
}
args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
return execx.Execute(pythonVersion, args...)
args = []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
err = execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
if err != nil {
bufLogger.FlushAtError(ctx)
return err
}
bufLogger.FlushAtDebug(ctx)
return nil
}

// Case when we do not perform a forced reinstall.
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
return execx.Execute(pythonVersion, args...)
args := []string{"-m", "pip", "install", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...)
if err != nil {
bufLogger.FlushAtError(ctx)
return err
}
bufLogger.FlushAtDebug(ctx)
return nil
}
}
if optional {
Expand All @@ -120,7 +145,7 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e

// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
func installExtraPackages(files []string, extraPackagesFile, dir string) error {
func installExtraPackages(ctx context.Context, logger *tools.Logger, files []string, extraPackagesFile, dir string) error {
// First check that extra packages manifest file is present.
for _, file := range files {
if file != extraPackagesFile {
Expand All @@ -139,7 +164,7 @@ func installExtraPackages(files []string, extraPackagesFile, dir string) error {
for s.Scan() {
extraPackage := s.Text()
log.Printf("Installing extra package: %s", extraPackage)
if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
if err = pipInstallPackage(ctx, logger, files, dir, extraPackage, true, false, nil); err != nil {
return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
}
}
Expand Down Expand Up @@ -167,13 +192,13 @@ func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
// assume that the pipleine was started with the Beam SDK found in the wheel
// file, and we try to install it. If not successful, we fall back to installing
// SDK from source tarball provided in sdkSrcFile.
func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
func installSdk(ctx context.Context, logger *tools.Logger, files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)

if sdkWhlFile != "" {
// by default, pip rejects to install wheel if same version already installed
isDev := strings.Contains(sdkWhlFile, ".dev")
err := pipInstallPackage(files, workDir, sdkWhlFile, isDev, false, []string{"gcp"})
err := pipInstallPackage(ctx, logger, files, workDir, sdkWhlFile, isDev, false, []string{"gcp"})
if err == nil {
return nil
}
Expand All @@ -185,6 +210,6 @@ func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhl
return nil
}
}
err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
err := pipInstallPackage(ctx, logger, files, workDir, sdkSrcFile, false, false, []string{"gcp"})
return err
}
Loading