Skip to content

Commit

Permalink
Restore sandboxes on daemon restart
Browse files Browse the repository at this point in the history
Signed-off-by: Maksym Pavlenko <[email protected]>
  • Loading branch information
mxpv committed Apr 8, 2022
1 parent 0c5e5c3 commit 0d165e6
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 53 deletions.
7 changes: 7 additions & 0 deletions api/next.pb.txt
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,13 @@ file {
}
json_name: "extensions"
}
field {
name: "sandbox"
number: 11
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "sandbox"
}
nested_type {
name: "LabelsEntry"
field {
Expand Down
54 changes: 50 additions & 4 deletions api/services/containers/v1/containers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/services/containers/v1/containers.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ message Container {
// data, one should only update the specified extension using field paths
// to select a specific map key.
map<string, google.protobuf.Any> extensions = 10 [(gogoproto.nullable) = false];

// Sandbox ID this container belongs to.
string sandbox = 11;
}

message GetContainerRequest {
Expand Down
2 changes: 1 addition & 1 deletion cmd/containerd-shim-runc-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ package main
import (
"context"

_ "github.com/containerd/containerd/runtime/v2/pause"
"github.com/containerd/containerd/runtime/v2/runc/manager"
_ "github.com/containerd/containerd/runtime/v2/runc/pause"
_ "github.com/containerd/containerd/runtime/v2/runc/task/plugin"
"github.com/containerd/containerd/runtime/v2/shim"
)
Expand Down
12 changes: 11 additions & 1 deletion cmd/ctr/commands/sandboxes/sandboxes.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,24 @@ var removeCommand = cli.Command{
}
defer cancel()

force := context.Bool("force")

for _, id := range context.Args() {
sandbox, err := client.LoadSandbox(ctx, id)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to load sandbox %s", id)
continue
}

err = sandbox.Shutdown(ctx, context.Bool("force"))
err = sandbox.Stop(ctx)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to stop sandbox %s", id)
if !force {
continue
}
}

err = sandbox.Delete(ctx)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to shutdown sandbox %s", id)
continue
Expand Down
2 changes: 2 additions & 0 deletions containerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func containerToProto(container *containers.Container) containersapi.Container {
Snapshotter: container.Snapshotter,
SnapshotKey: container.SnapshotKey,
Extensions: extensions,
Sandbox: container.SandboxID,
}
}

Expand Down Expand Up @@ -193,6 +194,7 @@ func containerFromProto(containerpb *containersapi.Container) containers.Contain
CreatedAt: containerpb.CreatedAt,
UpdatedAt: containerpb.UpdatedAt,
Extensions: extensions,
SandboxID: containerpb.Sandbox,
}
}

Expand Down
64 changes: 46 additions & 18 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,40 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
}
}()

// This container belongs to a sandbox, which is supposed to have been started already via the sandbox API.
if opts.SandboxID != "" {
process, err := m.Get(ctx, opts.SandboxID)
if err != nil {
return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
}

// Write sandbox ID this task belongs to.
if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil {
return nil, err
}

address, err := shimbinary.ReadAddress(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "address"))
if err != nil {
return nil, fmt.Errorf("failed to get socket address for sandbox %q: %w", opts.SandboxID, err)
}

// Use sandbox's socket address to handle task requests for this container.
if err := shimbinary.WriteAddress(filepath.Join(bundle.Path, "address"), address); err != nil {
return nil, err
}

shim, err := loadShim(ctx, bundle, func() {})
if err != nil {
return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
}

if err := m.shims.Add(ctx, shim); err != nil {
return nil, err
}

return shim, nil
}

shim, err := m.startShim(ctx, bundle, id, opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -391,22 +425,9 @@ func (m *TaskManager) ID() string {

// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
var (
process ShimProcess
err error
)

if opts.SandboxID != "" {
// This container belongs to sandbox which supposed to be already started via sandbox API.
process, err = m.manager.Get(ctx, opts.SandboxID)
if err != nil {
return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
}
} else {
process, err = m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}
process, err := m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}

// Cast to shim task and call task service to create a new container task instance.
Expand All @@ -420,7 +441,8 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel()

_, errShim := shim.delete(dctx, func(context.Context, string) {})
sandboxed := opts.SandboxID != ""
_, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
Expand Down Expand Up @@ -454,8 +476,14 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit,
return nil, err
}

container, err := m.manager.containers.Get(ctx, taskID)
if err != nil {
return nil, err
}

sandboxed := container.SandboxID != ""
shimTask := item.(*shimTask)
exit, err := shimTask.delete(ctx, func(ctx context.Context, id string) {
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)
})

Expand Down
18 changes: 16 additions & 2 deletions runtime/v2/pause/sandbox.go → runtime/v2/runc/pause/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package pause
import (
"context"

"github.com/containerd/containerd/pkg/shutdown"
"github.com/containerd/ttrpc"
log "github.com/sirupsen/logrus"

Expand All @@ -33,14 +34,26 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "pause",
Requires: []plugin.Type{
plugin.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return &pauseService{}, nil
ss, err := ic.GetByID(plugin.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}

return &pauseService{
shutdown: ss.(shutdown.Service),
}, nil
},
})
}

// pauseService is an extension for task v2 runtime to support Pod "pause" containers via sandbox API.
type pauseService struct{}
type pauseService struct {
shutdown shutdown.Service
}

var _ api.SandboxService = (*pauseService)(nil)

Expand All @@ -56,6 +69,7 @@ func (p *pauseService) StartSandbox(ctx context.Context, req *api.StartSandboxRe

// StopSandbox handles a stop request for the pause sandbox. It logs the
// incoming request at debug level, invokes Shutdown on the shutdown service
// held by pauseService, and replies with an empty response and a nil error.
func (p *pauseService) StopSandbox(ctx context.Context, req *api.StopSandboxRequest) (*api.StopSandboxResponse, error) {
	log.Debugf("stop sandbox request: %+v", req)

	// Trigger shutdown through the injected shutdown service.
	p.shutdown.Shutdown()

	resp := &api.StopSandboxResponse{}
	return resp, nil
}

Expand Down
10 changes: 7 additions & 3 deletions runtime/v2/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *shimTask) PID(ctx context.Context) (uint32, error) {
return response.TaskPid, nil
}

func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(),
})
Expand Down Expand Up @@ -305,8 +305,12 @@ func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Conte
removeTask(ctx, s.ID())
}

if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
// Don't shut down the sandbox, as there may be other containers running.
// Let the controller decide when to shut down.
if !sandboxed {
if err := s.waitShutdown(ctx); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to shutdown shim task")
}
}

if err := s.shim.delete(ctx); err != nil {
Expand Down
41 changes: 19 additions & 22 deletions sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package containerd

import (
"context"
"fmt"
"time"

"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/oci"
api "github.com/containerd/containerd/sandbox"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
Expand All @@ -37,9 +38,10 @@ type Sandbox interface {
Labels(ctx context.Context) (map[string]string, error)
// Start starts new sandbox instance
Start(ctx context.Context) error
// Shutdown will turn down existing sandbox instance.
// If using force, the client will ignore shutdown errors.
Shutdown(ctx context.Context, force bool) error
// Stop sends stop request to the shim instance.
Stop(ctx context.Context) error
// Delete removes sandbox from the metadata store.
Delete(ctx context.Context) error
// Pause will freeze running sandbox instance
Pause(ctx context.Context) error
// Resume will unfreeze previously paused sandbox instance
Expand Down Expand Up @@ -76,23 +78,12 @@ func (s *sandboxClient) Start(ctx context.Context) error {
return s.client.SandboxController().Start(ctx, s.ID())
}

func (s *sandboxClient) Shutdown(ctx context.Context, force bool) error {
var (
controller = s.client.SandboxController()
store = s.client.SandboxStore()
)

err := controller.Shutdown(ctx, s.ID())
if err != nil && !force {
return fmt.Errorf("failed to shutdown sandbox: %w", err)
}

err = store.Delete(ctx, s.ID())
if err != nil {
return fmt.Errorf("failed to delete sandbox from metadata store: %w", err)
}
// Stop calls Shutdown on the client's sandbox controller for this sandbox's
// ID and returns the controller's error unchanged.
func (s *sandboxClient) Stop(ctx context.Context) error {
	controller := s.client.SandboxController()
	return controller.Shutdown(ctx, s.ID())
}

return nil
// Delete calls Delete on the client's sandbox store for this sandbox's ID
// and returns the store's error unchanged.
func (s *sandboxClient) Delete(ctx context.Context) error {
	store := s.client.SandboxStore()
	return store.Delete(ctx, s.ID())
}

func (s *sandboxClient) Pause(ctx context.Context) error {
Expand Down Expand Up @@ -187,9 +178,15 @@ func WithSandboxRuntime(name string, options interface{}) NewSandboxOpts {
}

// WithSandboxSpec will provide the sandbox runtime spec
func WithSandboxSpec(spec interface{}) NewSandboxOpts {
func WithSandboxSpec(s *oci.Spec, opts ...oci.SpecOpts) NewSandboxOpts {
return func(ctx context.Context, client *Client, sandbox *api.Sandbox) error {
spec, err := typeurl.MarshalAny(spec)
c := &containers.Container{ID: sandbox.ID}

if err := oci.ApplyOpts(ctx, client, c, s, opts...); err != nil {
return err
}

spec, err := typeurl.MarshalAny(s)
if err != nil {
return errors.Wrap(err, "failed to marshal spec")
}
Expand Down
Loading

0 comments on commit 0d165e6

Please sign in to comment.