Skip to content

Commit

Permalink
Merge pull request #3022 from docker/feature-volumes
Browse files Browse the repository at this point in the history
Merge CSI/Cluster Volumes code into Master
  • Loading branch information
dperny authored Aug 3, 2021
2 parents 3629f50 + 24aaa54 commit 516e8f3
Show file tree
Hide file tree
Showing 318 changed files with 102,979 additions and 14,423 deletions.
36 changes: 35 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,40 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}

// ReportVolumeUnpublished sends a Volume status update to the manager
// indicating that the provided volume has been successfully unpublished.
func (a *Agent) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
l := log.G(ctx).WithField("volume.ID", volumeID)
l.Debug("(*Agent).ReportVolumeUnpublished")
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errs := make(chan error, 1)
if err := a.withSession(ctx, func(session *session) error {
go func() {
err := session.reportVolumeUnpublished(ctx, []string{volumeID})
if err != nil {
l.WithError(err).Error("error reporting volume unpublished")
} else {
l.Debug("reported volume unpublished")
}

errs <- err
}()

return nil
}); err != nil {
return err
}

select {
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}

// Publisher returns a LogPublisher for the given subscription
// as well as a cancel function that should be called when the log stream
// is completed.
Expand Down Expand Up @@ -597,8 +631,8 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)

// Override hostname and TLS info
if desc != nil {
// Override hostname and TLS info
if a.config.Hostname != "" {
desc.Hostname = a.config.Hostname
}
Expand Down
121 changes: 121 additions & 0 deletions agent/csi/plugin/client_fake_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package plugin

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi"
)

type fakeNodeClient struct {
// stagedVolumes is a set of all volume IDs for which NodeStageVolume has been
// called on this fake
stagedVolumes map[string]struct{}

// getInfoRequests is a log of all requests to NodeGetInfo.
getInfoRequests []*csi.NodeGetInfoRequest
// stageVolumeRequests is a log of all requests to NodeStageVolume.
stageVolumeRequests []*csi.NodeStageVolumeRequest
// unstageVolumeRequests is a log of all requests to NodeUnstageVolume.
unstageVolumeRequests []*csi.NodeUnstageVolumeRequest
// publishVolumeRequests is a log of all requests to NodePublishVolume.
publishVolumeRequests []*csi.NodePublishVolumeRequest
// unpublishVolumeRequests is a log of all requests to NodeUnpublishVolume.
unpublishVolumeRequests []*csi.NodeUnpublishVolumeRequest
// getCapabilitiesRequests is a log of all requests to NodeGetInfo.
getCapabilitiesRequests []*csi.NodeGetCapabilitiesRequest
// idCounter is a simple way to generate ids
idCounter int
// isStaging indicates if plugin supports stage/unstage capability
isStaging bool
// node ID is identifier for the node.
nodeID string
}

func newFakeNodeClient(isStaging bool, nodeID string) *fakeNodeClient {
return &fakeNodeClient{
stagedVolumes: map[string]struct{}{},
getInfoRequests: []*csi.NodeGetInfoRequest{},
stageVolumeRequests: []*csi.NodeStageVolumeRequest{},
unstageVolumeRequests: []*csi.NodeUnstageVolumeRequest{},
publishVolumeRequests: []*csi.NodePublishVolumeRequest{},
unpublishVolumeRequests: []*csi.NodeUnpublishVolumeRequest{},
getCapabilitiesRequests: []*csi.NodeGetCapabilitiesRequest{},
isStaging: isStaging,
nodeID: nodeID,
}
}

func (f *fakeNodeClient) NodeGetInfo(ctx context.Context, in *csi.NodeGetInfoRequest, _ ...grpc.CallOption) (*csi.NodeGetInfoResponse, error) {

f.idCounter++
f.getInfoRequests = append(f.getInfoRequests, in)
return &csi.NodeGetInfoResponse{
NodeId: f.nodeID,
}, nil
}

func (f *fakeNodeClient) NodeStageVolume(ctx context.Context, in *csi.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csi.NodeStageVolumeResponse, error) {
f.idCounter++
f.stageVolumeRequests = append(f.stageVolumeRequests, in)
f.stagedVolumes[in.VolumeId] = struct{}{}

return &csi.NodeStageVolumeResponse{}, nil
}

func (f *fakeNodeClient) NodeUnstageVolume(ctx context.Context, in *csi.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csi.NodeUnstageVolumeResponse, error) {
f.idCounter++
f.unstageVolumeRequests = append(f.unstageVolumeRequests, in)

if _, ok := f.stagedVolumes[in.VolumeId]; !ok {
return nil, status.Error(codes.FailedPrecondition, "can't unstage volume that is not already staged")
}

delete(f.stagedVolumes, in.VolumeId)

return &csi.NodeUnstageVolumeResponse{}, nil
}

func (f *fakeNodeClient) NodePublishVolume(ctx context.Context, in *csi.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csi.NodePublishVolumeResponse, error) {
f.idCounter++
f.publishVolumeRequests = append(f.publishVolumeRequests, in)

return &csi.NodePublishVolumeResponse{}, nil
}

func (f *fakeNodeClient) NodeUnpublishVolume(ctx context.Context, in *csi.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csi.NodeUnpublishVolumeResponse, error) {
f.idCounter++
f.unpublishVolumeRequests = append(f.unpublishVolumeRequests, in)
return &csi.NodeUnpublishVolumeResponse{}, nil

}

func (f *fakeNodeClient) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, nil
}

func (f *fakeNodeClient) NodeExpandVolume(ctx context.Context, in *csi.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csi.NodeExpandVolumeResponse, error) {
return nil, nil
}

func (f *fakeNodeClient) NodeGetCapabilities(ctx context.Context, in *csi.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csi.NodeGetCapabilitiesResponse, error) {
f.idCounter++
f.getCapabilitiesRequests = append(f.getCapabilitiesRequests, in)
if f.isStaging {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
},
}, nil
}
return &csi.NodeGetCapabilitiesResponse{}, nil
}
119 changes: 119 additions & 0 deletions agent/csi/plugin/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package plugin

import (
"context"
"fmt"
"sync"

"github.com/docker/docker/pkg/plugingetter"

"github.com/docker/swarmkit/api"
)

const (
// DockerCSIPluginCap is the capability name of the plugins we use with the
// PluginGetter to get only the plugins we need. The full name of the
// plugin interface is "swarm.csiplugin/1.0"
DockerCSIPluginCap = "csiplugin"
)

// PluginManager manages the multiple CSI plugins that may be in use on the
// node. PluginManager should be thread-safe.
type PluginManager interface {
// Get gets the plugin with the given name
Get(name string) (NodePlugin, error)

// NodeInfo returns the NodeCSIInfo for every active plugin.
NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error)
}

type pluginManager struct {
plugins map[string]NodePlugin
pluginsMu sync.Mutex

// newNodePluginFunc usually points to NewNodePlugin. However, for testing,
// NewNodePlugin can be swapped out with a function that creates fake node
// plugins
newNodePluginFunc func(string, plugingetter.CompatPlugin, plugingetter.PluginAddr, SecretGetter) NodePlugin

// secrets is a SecretGetter for use by node plugins.
secrets SecretGetter

pg plugingetter.PluginGetter
}

func NewPluginManager(pg plugingetter.PluginGetter, secrets SecretGetter) PluginManager {
return &pluginManager{
plugins: map[string]NodePlugin{},
newNodePluginFunc: NewNodePlugin,
secrets: secrets,
pg: pg,
}
}

func (pm *pluginManager) Get(name string) (NodePlugin, error) {
pm.pluginsMu.Lock()
defer pm.pluginsMu.Unlock()

plugin, err := pm.getPlugin(name)
if err != nil {
return nil, fmt.Errorf("cannot get plugin %v: %v", name, err)
}

return plugin, nil
}

func (pm *pluginManager) NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error) {
// TODO(dperny): do not acquire this lock for the duration of the the
// function call. that's too long and too blocking.
pm.pluginsMu.Lock()
defer pm.pluginsMu.Unlock()

// first, we should make sure all of the plugins are initialized. do this
// by looking up all the current plugins with DockerCSIPluginCap.
plugins := pm.pg.GetAllManagedPluginsByCap(DockerCSIPluginCap)
for _, plugin := range plugins {
// TODO(dperny): use this opportunity to drop plugins that we're
// tracking but which no longer exist.

// we don't actually need the plugin returned, we just need it loaded
// as a side effect.
pm.getPlugin(plugin.Name())
}

nodeInfo := []*api.NodeCSIInfo{}
for _, plugin := range pm.plugins {
info, err := plugin.NodeGetInfo(ctx)
if err != nil {
// skip any plugin that returns an error
continue
}

nodeInfo = append(nodeInfo, info)
}
return nodeInfo, nil
}

// getPlugin looks up the plugin with the specified name. Loads the plugin if
// not yet loaded.
//
// pm.pluginsMu must be obtained before calling this method.
func (pm *pluginManager) getPlugin(name string) (NodePlugin, error) {
if p, ok := pm.plugins[name]; ok {
return p, nil
}

pc, err := pm.pg.Get(name, DockerCSIPluginCap, plugingetter.Lookup)
if err != nil {
return nil, err
}

pa, ok := pc.(plugingetter.PluginAddr)
if !ok {
return nil, fmt.Errorf("plugin does not implement PluginAddr interface")
}

p := pm.newNodePluginFunc(name, pc, pa, pm.secrets)
pm.plugins[name] = p
return p, nil
}
80 changes: 80 additions & 0 deletions agent/csi/plugin/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package plugin

import (
"context"
"net"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/docker/swarmkit/testutils"
)

var _ = Describe("PluginManager", func() {
var (
pm *pluginManager
pg *testutils.FakePluginGetter
)

BeforeEach(func() {
pg = &testutils.FakePluginGetter{
Plugins: map[string]*testutils.FakeCompatPlugin{},
}

pm = &pluginManager{
plugins: map[string]NodePlugin{},
newNodePluginFunc: newFakeNodePlugin,
pg: pg,
}

pg.Plugins["plug1"] = &testutils.FakeCompatPlugin{
PluginName: "plug1",
PluginAddr: &net.UnixAddr{
Net: "unix",
Name: "",
},
}
pg.Plugins["plug2"] = &testutils.FakeCompatPlugin{
PluginName: "plug2",
PluginAddr: &net.UnixAddr{
Net: "unix",
Name: "fail",
},
}
pg.Plugins["plug3"] = &testutils.FakeCompatPlugin{
PluginName: "plug3",
PluginAddr: &net.UnixAddr{
Net: "unix",
Name: "",
},
}
})

Describe("Get", func() {
It("should return the requested plugin", func() {
p, err := pm.Get("plug1")
Expect(err).ToNot(HaveOccurred())
Expect(p).ToNot(BeNil())
})

It("should return an error if no plugin can be found", func() {
p, err := pm.Get("plugNotHere")
Expect(err).To(HaveOccurred())
Expect(p).To(BeNil())
})
})

Describe("NodeInfo", func() {
It("should return NodeCSIInfo for every active plugin", func() {
info, err := pm.NodeInfo(context.Background())
Expect(err).ToNot(HaveOccurred())

pluginNames := []string{}
for _, i := range info {
pluginNames = append(pluginNames, i.PluginName)
}

Expect(pluginNames).To(ConsistOf("plug1", "plug3"))
})
})
})
Loading

0 comments on commit 516e8f3

Please sign in to comment.