Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 59 additions & 29 deletions pkg/compose/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/docker/cli/cli-plugins/manager"
"github.com/docker/cli/cli-plugins/socket"
"github.com/docker/compose/v2/pkg/progress"
"github.com/docker/docker/errdefs"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand All @@ -48,41 +47,19 @@ const (
SetEnvType = "setenv"
)

func (s *composeService) runPlugin(ctx context.Context, project *types.Project, service types.ServiceConfig, command string) error { //nolint:gocyclo
x := *service.Provider
func (s *composeService) runPlugin(ctx context.Context, project *types.Project, service types.ServiceConfig, command string) error {
provider := *service.Provider

// Only support Docker CLI plugins for first iteration. Could support any binary from PATH
plugin, err := manager.GetPlugin(x.Type, s.dockerCli, &cobra.Command{})
plugin, err := s.getPluginBinaryPath(provider.Type)
if err != nil {
if errdefs.IsNotFound(err) {
return fmt.Errorf("unsupported external service type %s", x.Type)
}
return err
}

args := []string{"compose", "--project-name", project.Name, command}
for k, v := range x.Options {
args = append(args, fmt.Sprintf("--%s=%s", k, v))
}

cmd := exec.CommandContext(ctx, plugin.Path, args...)
// Remove DOCKER_CLI_PLUGIN... variable so plugin can detect it run standalone
cmd.Env = filter(os.Environ(), manager.ReexecEnvvar)

// Use docker/cli mechanism to propagate termination signal to child process
server, err := socket.NewPluginServer(nil)
if err != nil {
defer server.Close() //nolint:errcheck
cmd.Cancel = server.Close
cmd.Env = replace(cmd.Env, socket.EnvKey, server.Addr().String())
if err := s.checkPluginEnabledInDD(ctx, plugin); err != nil {
return err
}

cmd.Env = append(cmd.Env, fmt.Sprintf("DOCKER_CONTEXT=%s", s.dockerCli.CurrentContext()))

// propagate opentelemetry context to child process, see https://github.com/open-telemetry/oteps/blob/main/text/0258-env-context-baggage-carriers.md
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, &carrier)
cmd.Env = append(cmd.Env, types.Mapping(carrier).Values()...)
cmd := s.setupPluginCommand(ctx, project, provider, plugin.Path, command)

eg := errgroup.Group{}
stdout, err := cmd.StdoutPipe()
Expand Down Expand Up @@ -147,3 +124,56 @@ func (s *composeService) runPlugin(ctx context.Context, project *types.Project,
}
return nil
}

func (s *composeService) getPluginBinaryPath(providerType string) (*manager.Plugin, error) {
// Only support Docker CLI plugins for first iteration. Could support any binary from PATH
return manager.GetPlugin(providerType, s.dockerCli, &cobra.Command{})
}

func (s *composeService) setupPluginCommand(ctx context.Context, project *types.Project, provider types.ServiceProviderConfig, path, command string) *exec.Cmd {
args := []string{"compose", "--project-name", project.Name, command}
for k, v := range provider.Options {
args = append(args, fmt.Sprintf("--%s=%s", k, v))
}

cmd := exec.CommandContext(ctx, path, args...)
// Remove DOCKER_CLI_PLUGIN... variable so plugin can detect it run standalone
cmd.Env = filter(os.Environ(), manager.ReexecEnvvar)

// Use docker/cli mechanism to propagate termination signal to child process
server, err := socket.NewPluginServer(nil)
if err == nil {
defer server.Close() //nolint:errcheck
cmd.Cancel = server.Close
cmd.Env = replace(cmd.Env, socket.EnvKey, server.Addr().String())
}

cmd.Env = append(cmd.Env, fmt.Sprintf("DOCKER_CONTEXT=%s", s.dockerCli.CurrentContext()))

// propagate opentelemetry context to child process, see https://github.com/open-telemetry/oteps/blob/main/text/0258-env-context-baggage-carriers.md
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, &carrier)
cmd.Env = append(cmd.Env, types.Mapping(carrier).Values()...)
return cmd
}

func (s *composeService) checkPluginEnabledInDD(ctx context.Context, plugin *manager.Plugin) error {
if integrationEnabled := s.isDesktopIntegrationActive(); !integrationEnabled {
return fmt.Errorf("you should enable Docker Desktop integration to use %q provider services", plugin.Name)
}

// Until we support more use cases, check explicitly status of model runner
if plugin.Name == "model" {
cmd := exec.CommandContext(ctx, "docker", "model", "status")
_, err := cmd.CombinedOutput()
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) && exitErr.ExitCode() == 1 {
return fmt.Errorf("you should enable model runner to use %q provider services: %s", plugin.Name, err.Error())
}
}
} else {
return fmt.Errorf("unsupported provider %q", plugin.Name)
}
return nil
}