diff --git a/README.md b/README.md
index e78b34b92..d3f7afc4a 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,24 @@
 
 This repository hosts the CSI Hostpath driver and all of its build and dependent configuration files to deploy the driver.
 
+---
+*WARNING: This driver is a demo implementation used for CI testing. It contains many fake implementations and other deviations from best practices, and should not be used as an example of how to write a real driver.*
+---
+
 ## Pre-requisite
 - Kubernetes cluster
 - Running version 1.13 or later
 - Access to terminal with `kubectl` installed
 - For Kubernetes 1.17 or later, the VolumeSnapshot beta CRDs and Snapshot Controller must be installed as part of the cluster deployment (see Kubernetes 1.17+ deployment instructions)
 
+## Features
+
+The driver can provide empty directories that are backed by the same filesystem as EmptyDir volumes. In addition, it can provide raw block volumes that are backed by a single file in that same filesystem and bound to a loop device.
+
+[Various command line parameters](cmd/hostpathplugin/main.go) influence the behavior of the driver. This is relevant in particular for the end-to-end testing that this driver is used for in Kubernetes.
+
+Usually, the driver implements all CSI operations itself. When deployed with the `-proxy-endpoint` parameter, it instead proxies all incoming connections for a CSI driver that is [embedded inside the Kubernetes E2E test suite](https://github.com/kubernetes/kubernetes/tree/master/test/e2e/storage/drivers/csi-test) and used for mocking a CSI driver [with callbacks provided by certain tests](https://github.com/kubernetes/kubernetes/blob/5ad79eae2dcbf33df3b35c48ec993d30fbda46dd/test/e2e/storage/csi_mock_volume.go#L110).
+
 ## Deployment
 Deployment varies depending on the Kubernetes version your cluster is running:
 
 - [Deployment for Kubernetes 1.17 and later](docs/deploy-1.17-and-later.md)
diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go
index 55debe371..83b20cfde 100644
--- a/cmd/hostpathplugin/main.go
+++ b/cmd/hostpathplugin/main.go
@@ -17,11 +17,16 @@ limitations under the License.
 package main
 
 import (
+    "context"
     "flag"
     "fmt"
     "os"
+    "os/signal"
     "path"
+    "syscall"
 
+    "github.com/golang/glog"
+    "github.com/kubernetes-csi/csi-driver-host-path/internal/proxy"
     "github.com/kubernetes-csi/csi-driver-host-path/pkg/hostpath"
 )
 
@@ -30,7 +35,7 @@ func init() {
 }
 
 var (
-    endpoint          = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
+    csiEndpoint       = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
     driverName        = flag.String("drivername", "hostpath.csi.k8s.io", "name of the driver")
     nodeID            = flag.String("nodeid", "", "node id")
     ephemeral         = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
@@ -41,6 +46,10 @@ var (
         flag.Var(c, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
         return c
     }()
+    enableAttach = flag.Bool("enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
+    // The proxy-endpoint option is intended to be used by the Kubernetes E2E test suite
+    // for proxying incoming calls to the embedded mock CSI driver.
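+    // In that mode, the driver acts only as a dumb pipe between its own
+    // CSI endpoint and the mock driver; see internal/proxy for details.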
+    proxyEndpoint = flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")
     // Set by the build process
     version = ""
 )
@@ -58,7 +67,30 @@ func main() {
         fmt.Fprintln(os.Stderr, "Deprecation warning: The ephemeral flag is deprecated and should only be used when deploying on Kubernetes 1.15. It will be removed in the future.")
     }
 
-    driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version, capacity)
+    if *proxyEndpoint != "" {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+        closer, err := proxy.Run(ctx, *csiEndpoint, *proxyEndpoint)
+        if err != nil {
+            glog.Fatalf("failed to run proxy: %v", err)
+        }
+        defer closer.Close()
+
+        // Wait for signal
+        sigc := make(chan os.Signal, 1)
+        sigs := []os.Signal{
+            syscall.SIGTERM,
+            syscall.SIGHUP,
+            syscall.SIGINT,
+            syscall.SIGQUIT,
+        }
+        signal.Notify(sigc, sigs...)
+
+        <-sigc
+        return
+    }
+
+    driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *csiEndpoint, *ephemeral, *maxVolumesPerNode, version, capacity, *enableAttach)
     if err != nil {
         fmt.Printf("Failed to initialize driver: %s", err.Error())
         os.Exit(1)
diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go
new file mode 100644
index 000000000..4f85b5ba3
--- /dev/null
+++ b/internal/endpoint/endpoint.go
@@ -0,0 +1,57 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package endpoint
+
+import (
+    "fmt"
+    "net"
+    "os"
+    "strings"
+)
+
+// Parse splits an endpoint of the form <proto>://<address> into its
+// protocol and address. A plain path is treated as a Unix domain socket.
+func Parse(ep string) (string, string, error) {
+    if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
+        s := strings.SplitN(ep, "://", 2)
+        if s[1] != "" {
+            return s[0], s[1], nil
+        }
+        return "", "", fmt.Errorf("invalid endpoint: %v", ep)
+    }
+    // Assume everything else is a file path for a Unix Domain Socket.
+    return "unix", ep, nil
+}
+
+// Listen creates a listener for the given endpoint. The returned cleanup
+// function removes the Unix domain socket file, if one was created.
+func Listen(endpoint string) (net.Listener, func(), error) {
+    proto, addr, err := Parse(endpoint)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    cleanup := func() {}
+    if proto == "unix" {
+        addr = "/" + addr
+        if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow
+            return nil, nil, fmt.Errorf("%s: %q", addr, err)
+        }
+        cleanup = func() {
+            os.Remove(addr)
+        }
+    }
+
+    l, err := net.Listen(proto, addr)
+    return l, cleanup, err
+}
diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go
new file mode 100644
index 000000000..b5df3a8e4
--- /dev/null
+++ b/internal/proxy/proxy.go
@@ -0,0 +1,146 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package proxy makes it possible to forward a listening socket in
+// situations where the proxy cannot connect to some other address.
+// Instead, it creates two listening sockets, pairs two incoming
+// connections and then moves data back and forth. This matches
+// the behavior of the following socat command:
+//    socat -d -d -d UNIX-LISTEN:/tmp/socat,fork TCP-LISTEN:9000,reuseport
+//
+// The advantage over that command is that both listening
+// sockets are always open, in contrast to the socat solution
+// where the TCP port is only open when there actually is a connection
+// available.
+//
+// To establish a connection, someone has to poll the proxy with a dialer.
+package proxy
+
+import (
+    "context"
+    "fmt"
+    "io"
+    "net"
+
+    "github.com/golang/glog"
+
+    "github.com/kubernetes-csi/csi-driver-host-path/internal/endpoint"
+)
+
+// Run listens on both endpoints and starts accepting connections
+// until closed or the context is done.
+func Run(ctx context.Context, endpoint1, endpoint2 string) (io.Closer, error) {
+    proxy := &proxy{}
+    failedProxy := proxy
+    defer func() {
+        if failedProxy != nil {
+            failedProxy.Close()
+        }
+    }()
+
+    proxy.ctx, proxy.cancel = context.WithCancel(ctx)
+
+    var err error
+    proxy.s1, proxy.cleanup1, err = endpoint.Listen(endpoint1)
+    if err != nil {
+        return nil, fmt.Errorf("listen %s: %v", endpoint1, err)
+    }
+    proxy.s2, proxy.cleanup2, err = endpoint.Listen(endpoint2)
+    if err != nil {
+        return nil, fmt.Errorf("listen %s: %v", endpoint2, err)
+    }
+
+    glog.V(3).Infof("proxy listening on %s and %s", endpoint1, endpoint2)
+
+    go func() {
+        for {
+            // We block on the first listening socket.
+            // The Linux kernel proactively accepts connections
+            // on the second one which we will take over below.
+            conn1 := accept(proxy.ctx, proxy.s1, endpoint1)
+            if conn1 == nil {
+                // Done, shut down.
+                glog.V(5).Infof("proxy endpoint %s closed, shutting down", endpoint1)
+                return
+            }
+            conn2 := accept(proxy.ctx, proxy.s2, endpoint2)
+            if conn2 == nil {
+                // Done, shut down and close the already
+                // accepted connection.
+                glog.V(5).Infof("proxy endpoint %s closed, shutting down and closing the established connection", endpoint2)
+                conn1.Close()
+                return
+            }
+
+            glog.V(3).Infof("proxy established a new connection between %s and %s", endpoint1, endpoint2)
+            go copy(conn1, conn2, endpoint1, endpoint2)
+            go copy(conn2, conn1, endpoint2, endpoint1)
+        }
+    }()
+
+    failedProxy = nil
+    return proxy, nil
+}
+
+type proxy struct {
+    ctx                context.Context
+    cancel             func()
+    s1, s2             net.Listener
+    cleanup1, cleanup2 func()
+}
+
+func (p *proxy) Close() error {
+    if p.cancel != nil {
+        p.cancel()
+    }
+    if p.s1 != nil {
+        p.s1.Close()
+    }
+    if p.s2 != nil {
+        p.s2.Close()
+    }
+    if p.cleanup1 != nil {
+        p.cleanup1()
+    }
+    if p.cleanup2 != nil {
+        p.cleanup2()
+    }
+    return nil
+}
+
+func copy(from, to net.Conn, fromEndpoint, toEndpoint string) {
+    glog.V(5).Infof("starting to copy %s -> %s", fromEndpoint, toEndpoint)
+    // Signal the recipient that no more data is going to come.
+    // This also stops reading from it.
+    defer to.Close()
+    // Copy data until EOF.
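+    // io.Copy returns the number of bytes copied and the first
+    // error encountered, if any; it reports EOF as a nil error.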
+    cnt, err := io.Copy(to, from)
+    glog.V(5).Infof("done copying %s -> %s: %d bytes, %v", fromEndpoint, toEndpoint, cnt, err)
+}
+
+// accept waits for a new connection, retrying on errors until the
+// context is cancelled.
+func accept(ctx context.Context, s net.Listener, endpoint string) net.Conn {
+    for {
+        c, err := s.Accept()
+        if err == nil {
+            return c
+        }
+        // Ignore the error if we are shutting down.
+        if ctx.Err() != nil {
+            return nil
+        }
+        glog.V(3).Infof("accept on %s failed: %v", endpoint, err)
+    }
+}
diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go
new file mode 100644
index 000000000..e83548023
--- /dev/null
+++ b/internal/proxy/proxy_test.go
@@ -0,0 +1,103 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package proxy
+
+import (
+    "bytes"
+    "context"
+    "io"
+    "net"
+    "testing"
+)
+
+func TestProxy(t *testing.T) {
+    tmpdir := t.TempDir()
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    endpoint1 := tmpdir + "/a.sock"
+    endpoint2 := tmpdir + "/b.sock"
+
+    closer, err := Run(ctx, endpoint1, endpoint2)
+    if err != nil {
+        t.Fatalf("proxy error: %v", err)
+    }
+    defer closer.Close()
+
+    t.Run("a-to-b", func(t *testing.T) {
+        sendReceive(t, endpoint1, endpoint2)
+    })
+    t.Run("b-to-a", func(t *testing.T) {
+        sendReceive(t, endpoint2, endpoint1)
+    })
+}
+
+func sendReceive(t *testing.T, endpoint1, endpoint2 string) {
+    conn1, err := net.Dial("unix", endpoint1)
+    if err != nil {
+        t.Fatalf("error connecting to first endpoint %s: %v", endpoint1, err)
+    }
+    defer conn1.Close()
+    conn2, err := net.Dial("unix", endpoint2)
+    if err != nil {
+        t.Fatalf("error connecting to second endpoint %s: %v", endpoint2, err)
+    }
+    defer conn2.Close()
+
+    req1 := "ping"
+    if _, err := conn1.Write([]byte(req1)); err != nil {
+        t.Fatalf("error writing %q: %v", req1, err)
+    }
+    buffer := make([]byte, 100)
+    n, err := conn2.Read(buffer)
+    if err != nil {
+        t.Fatalf("error reading %q: %v", req1, err)
+    }
+    if string(buffer[:n]) != req1 {
+        t.Fatalf("expected %q, got %q", req1, string(buffer[:n]))
+    }
+
+    resp1 := "pong-pong"
+    if _, err := conn2.Write([]byte(resp1)); err != nil {
+        t.Fatalf("error writing %q: %v", resp1, err)
+    }
+    buffer = make([]byte, 100)
+    n, err = conn1.Read(buffer)
+    if err != nil {
+        t.Fatalf("error reading %q: %v", resp1, err)
+    }
+    if string(buffer[:n]) != resp1 {
+        t.Fatalf("expected %q, got %q", resp1, string(buffer[:n]))
+    }
+
+    // Closing one side should be noticed at the other end.
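+    // The io.Copy below drains the other connection; a clean shutdown
+    // must yield zero bytes and a nil error, because io.Copy reports
+    // EOF as success.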
+    err = conn1.Close()
+    if err != nil {
+        t.Fatalf("error closing connection to %s: %v", endpoint1, err)
+    }
+    len2, err := io.Copy(&bytes.Buffer{}, conn2)
+    if err != nil {
+        t.Fatalf("error reading from %s: %v", endpoint2, err)
+    }
+    if len2 != 0 {
+        t.Fatalf("unexpected data via %s: %d", endpoint2, len2)
+    }
+    err = conn2.Close()
+    if err != nil {
+        t.Fatalf("error closing connection to %s: %v", endpoint2, err)
+    }
+}
diff --git a/pkg/hostpath/controllerserver.go b/pkg/hostpath/controllerserver.go
index 7bca915a9..9d251468f 100644
--- a/pkg/hostpath/controllerserver.go
+++ b/pkg/hostpath/controllerserver.go
@@ -99,9 +99,9 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque
     defer hp.mutex.Unlock()
 
     capacity := int64(req.GetCapacityRange().GetRequiredBytes())
-    topologies := []*csi.Topology{&csi.Topology{
-        Segments: map[string]string{TopologyKeyNode: hp.nodeID},
-    }}
+    topologies := []*csi.Topology{
+        {Segments: map[string]string{TopologyKeyNode: hp.nodeID}},
+    }
 
     // Need to check for already existing volume name, and if found
     // check for the requested capacity and already allocated capacity
@@ -202,6 +202,17 @@ func (hp *hostPath) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeReque
     defer hp.mutex.Unlock()
 
     volId := req.GetVolumeId()
+    vol, err := hp.getVolumeByID(volId)
+    if err != nil {
+        // Volume not found: it might have been deleted already.
+        return &csi.DeleteVolumeResponse{}, nil
+    }
+
+    if vol.IsAttached || vol.IsPublished || vol.IsStaged {
+        return nil, status.Errorf(codes.Internal, "Volume '%s' is still used (attached: %v, staged: %v, published: %v) by node '%s'",
+            vol.VolID, vol.IsAttached, vol.IsStaged, vol.IsPublished, vol.NodeID)
+    }
+
     if err := hp.deleteVolume(volId); err != nil {
         return nil, fmt.Errorf("failed to delete volume %v: %w", volId, err)
     }
@@ -254,11 +265,91 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
 }
 
 func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
-    return nil, status.Error(codes.Unimplemented, "")
+    if !hp.enableAttach {
+        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not supported")
+    }
+
+    if len(req.VolumeId) == 0 {
+        return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
+    }
+    if len(req.NodeId) == 0 {
+        return nil, status.Error(codes.InvalidArgument, "Node ID cannot be empty")
+    }
+    if req.VolumeCapability == nil {
+        return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
+    }
+
+    if req.NodeId != hp.nodeID {
+        return nil, status.Errorf(codes.NotFound, "Node ID %s does not match hostpath Node ID %s", req.NodeId, hp.nodeID)
+    }
+
+    hp.mutex.Lock()
+    defer hp.mutex.Unlock()
+
+    vol, err := hp.getVolumeByID(req.VolumeId)
+    if err != nil {
+        return nil, status.Error(codes.NotFound, err.Error())
+    }
+
+    // Check to see if the volume is already attached.
+    if vol.IsAttached {
+        // Check if the readonly flag is compatible with the publish request.
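+        // Per the CSI spec, an incompatible readonly flag for an
+        // already attached volume must fail with ALREADY_EXISTS.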
+        if req.GetReadonly() != vol.ReadOnlyAttach {
+            return nil, status.Error(codes.AlreadyExists, "Volume published but has incompatible readonly flag")
+        }
+
+        return &csi.ControllerPublishVolumeResponse{
+            PublishContext: map[string]string{},
+        }, nil
+    }
+
+    vol.IsAttached = true
+    vol.ReadOnlyAttach = req.GetReadonly()
+    if err := hp.updateVolume(vol.VolID, vol); err != nil {
+        return nil, status.Errorf(codes.Internal, "failed to update volume %s: %v", vol.VolID, err)
+    }
+
+    return &csi.ControllerPublishVolumeResponse{
+        PublishContext: map[string]string{},
+    }, nil
 }
 
 func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
-    return nil, status.Error(codes.Unimplemented, "")
+    if !hp.enableAttach {
+        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not supported")
+    }
+
+    if len(req.VolumeId) == 0 {
+        return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
+    }
+
+    // An empty node id is not a failure, as per the CSI spec.
+    if req.NodeId != "" && req.NodeId != hp.nodeID {
+        return nil, status.Errorf(codes.NotFound, "Node ID %s does not match the expected Node ID %s", req.NodeId, hp.nodeID)
+    }
+
+    hp.mutex.Lock()
+    defer hp.mutex.Unlock()
+
+    vol, err := hp.getVolumeByID(req.VolumeId)
+    if err != nil {
+        // Not an error: a non-existent volume is not published.
+        // See also https://github.com/kubernetes-csi/external-attacher/pull/165
+        return &csi.ControllerUnpublishVolumeResponse{}, nil
+    }
+
+    // Check to see if the volume is staged/published on a node.
+    if vol.IsPublished || vol.IsStaged {
+        return nil, status.Errorf(codes.Internal, "Volume '%s' is still used (staged: %v, published: %v) by node '%s'",
+            vol.VolID, vol.IsStaged, vol.IsPublished, vol.NodeID)
+    }
+
+    vol.IsAttached = false
+    if err := hp.updateVolume(vol.VolID, vol); err != nil {
+        return nil, status.Errorf(codes.Internal, "could not update volume %s: %v", vol.VolID, err)
+    }
+
+    return &csi.ControllerUnpublishVolumeResponse{}, nil
 }
 
 func (hp *hostPath) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
@@ -677,6 +768,9 @@ func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceC
             csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
             csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
         }
+        if hp.enableAttach {
+            cl = append(cl, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME)
+        }
     }
 
     var csc []*csi.ControllerServiceCapability
diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go
index 39939232d..23adbacee 100644
--- a/pkg/hostpath/hostpath.go
+++ b/pkg/hostpath/hostpath.go
@@ -62,23 +62,28 @@ type hostPath struct {
     // gRPC calls involving any of the fields below must be serialized
     // by locking this mutex before starting. Internal helper
     // functions assume that the mutex has been locked.
-    mutex     sync.Mutex
-    volumes   map[string]hostPathVolume
-    snapshots map[string]hostPathSnapshot
-    capacity  Capacity
+    mutex        sync.Mutex
+    volumes      map[string]hostPathVolume
+    snapshots    map[string]hostPathSnapshot
+    capacity     Capacity
+    enableAttach bool
 }
 
 type hostPathVolume struct {
-    VolName       string     `json:"volName"`
-    VolID         string     `json:"volID"`
-    VolSize       int64      `json:"volSize"`
-    VolPath       string     `json:"volPath"`
-    VolAccessType accessType `json:"volAccessType"`
-    ParentVolID   string     `json:"parentVolID,omitempty"`
-    ParentSnapID  string     `json:"parentSnapID,omitempty"`
-    Ephemeral     bool       `json:"ephemeral"`
-    NodeID        string     `json:"nodeID"`
-    Kind          string     `json:"kind"`
+    VolName        string     `json:"volName"`
+    VolID          string     `json:"volID"`
+    VolSize        int64      `json:"volSize"`
+    VolPath        string     `json:"volPath"`
+    VolAccessType  accessType `json:"volAccessType"`
+    ParentVolID    string     `json:"parentVolID,omitempty"`
+    ParentSnapID   string     `json:"parentSnapID,omitempty"`
+    Ephemeral      bool       `json:"ephemeral"`
+    NodeID         string     `json:"nodeID"`
+    Kind           string     `json:"kind"`
+    ReadOnlyAttach bool       `json:"readOnlyAttach"`
+    IsAttached     bool       `json:"isAttached"`
+    IsStaged       bool       `json:"isStaged"`
+    IsPublished    bool       `json:"isPublished"`
 }
 
 type hostPathSnapshot struct {
@@ -105,7 +110,7 @@ const (
     snapshotExt = ".snap"
 )
 
-func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxVolumesPerNode int64, version string, capacity Capacity) (*hostPath, error) {
+func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxVolumesPerNode int64, version string, capacity Capacity, enableAttach bool) (*hostPath, error) {
     if driverName == "" {
         return nil, errors.New("no driver name provided")
     }
@@ -136,6 +141,7 @@ func NewHostPathDriver(driverName, nodeID, endpoint string, ephemeral bool, maxV
         ephemeral:         ephemeral,
         maxVolumesPerNode: maxVolumesPerNode,
         capacity:          capacity,
+        enableAttach:      enableAttach,
 
         volumes:   map[string]hostPathVolume{},
         snapshots: map[string]hostPathSnapshot{},
diff --git a/pkg/hostpath/nodeserver.go b/pkg/hostpath/nodeserver.go
index bb7e8c9ef..cef777347 100644
--- a/pkg/hostpath/nodeserver.go
+++ b/pkg/hostpath/nodeserver.go
@@ -79,6 +79,10 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV
         return nil, status.Error(codes.NotFound, err.Error())
     }
 
+    if !ephemeralVolume && !vol.IsStaged {
+        return nil, status.Errorf(codes.FailedPrecondition, "Volume '%s' must be staged before publishing.", vol.VolID)
+    }
+
     if req.GetVolumeCapability().GetBlock() != nil {
         if vol.VolAccessType != blockAccess {
             return nil, status.Error(codes.InvalidArgument, "cannot publish a non-block volume as block volume")
@@ -179,6 +183,7 @@ func (hp *hostPath) NodePublishVolume(ctx context.Context, req *csi.NodePublishV
     }
 
     vol.NodeID = hp.nodeID
+    vol.IsPublished = true
     hp.updateVolume(req.GetVolumeId(), vol)
     return &csi.NodePublishVolumeResponse{}, nil
 }
@@ -229,6 +234,11 @@ func (hp *hostPath) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl
         if err := hp.deleteVolume(volumeID); err != nil && !os.IsNotExist(err) {
             return nil, fmt.Errorf("failed to delete volume: %w", err)
         }
+    } else {
+        vol.IsPublished = false
+        if err := hp.updateVolume(vol.VolID, vol); err != nil {
+            return nil, fmt.Errorf("could not update volume %s: %w", vol.VolID, err)
+        }
     }
 
     return &csi.NodeUnpublishVolumeResponse{}, nil
@@ -247,6 +257,21 @@ func (hp *hostPath) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolum
         return nil, status.Error(codes.InvalidArgument, "Volume Capability missing in request")
     }
 
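+    // Look up the volume and mark it as staged; when attach/detach is
+    // enabled, staging is only valid after ControllerPublishVolume.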
+    vol, err := hp.getVolumeByID(req.VolumeId)
+    if err != nil {
+        return nil, status.Error(codes.NotFound, err.Error())
+    }
+
+    if hp.enableAttach && !vol.IsAttached {
+        return nil, status.Errorf(codes.Internal, "ControllerPublishVolume must be called on volume '%s' before staging it on a node",
+            vol.VolID)
+    }
+
+    vol.IsStaged = true
+    if err := hp.updateVolume(vol.VolID, vol); err != nil {
+        return nil, fmt.Errorf("could not update volume %s: %w", vol.VolID, err)
+    }
+
     return &csi.NodeStageVolumeResponse{}, nil
 }
 
@@ -260,6 +285,19 @@ func (hp *hostPath) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageV
         return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
     }
 
+    vol, err := hp.getVolumeByID(req.VolumeId)
+    if err != nil {
+        return nil, status.Error(codes.NotFound, err.Error())
+    }
+
+    if vol.IsPublished {
+        return nil, status.Errorf(codes.Internal, "Volume '%s' is still published on node '%s'", vol.VolID, vol.NodeID)
+    }
+    vol.IsStaged = false
+    if err := hp.updateVolume(vol.VolID, vol); err != nil {
+        return nil, fmt.Errorf("could not update volume %s: %w", vol.VolID, err)
+    }
+
     return &csi.NodeUnstageVolumeResponse{}, nil
 }
diff --git a/pkg/hostpath/server.go b/pkg/hostpath/server.go
index 882f80b1a..8687fe191 100644
--- a/pkg/hostpath/server.go
+++ b/pkg/hostpath/server.go
@@ -17,10 +17,7 @@ limitations under the License.
 package hostpath
 
 import (
-    "fmt"
-    "net"
-    "os"
-    "strings"
+    "encoding/json"
     "sync"
 
     "github.com/golang/glog"
@@ -28,6 +25,7 @@ import (
     "google.golang.org/grpc"
 
     "github.com/container-storage-interface/spec/lib/go/csi"
+    "github.com/kubernetes-csi/csi-driver-host-path/internal/endpoint"
     "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
 )
 
@@ -37,8 +35,9 @@ func NewNonBlockingGRPCServer() *nonBlockingGRPCServer {
 
 // NonBlocking server
 type nonBlockingGRPCServer struct {
-    wg     sync.WaitGroup
-    server *grpc.Server
+    wg      sync.WaitGroup
+    server  *grpc.Server
+    cleanup func()
 }
 
 func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
@@ -56,27 +55,16 @@ func (s *nonBlockingGRPCServer) Wait() {
 
 func (s *nonBlockingGRPCServer) Stop() {
     s.server.GracefulStop()
+    s.cleanup()
 }
 
 func (s *nonBlockingGRPCServer) ForceStop() {
     s.server.Stop()
+    s.cleanup()
 }
 
-func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
-
-    proto, addr, err := parseEndpoint(endpoint)
-    if err != nil {
-        glog.Fatal(err.Error())
-    }
-
-    if proto == "unix" {
-        addr = "/" + addr
-        if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow
-            glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
-        }
-    }
-
-    listener, err := net.Listen(proto, addr)
+func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
+    listener, cleanup, err := endpoint.Listen(ep)
     if err != nil {
         glog.Fatalf("Failed to listen: %v", err)
     }
@@ -86,6 +74,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
     }
 
     server := grpc.NewServer(opts...)
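+    // Remember the server and the cleanup callback so that Stop and
+    // ForceStop can shut down gRPC and remove the socket file.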
     s.server = server
+    s.cleanup = cleanup
 
     if ids != nil {
         csi.RegisterIdentityServer(server, ids)
@@ -103,27 +92,69 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
 
 }
 
-func parseEndpoint(ep string) (string, string, error) {
-    if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
-        s := strings.SplitN(ep, "://", 2)
-        if s[1] != "" {
-            return s[0], s[1], nil
-        }
-    }
-    return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
-}
-
 func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+    pri := glog.Level(3)
     if info.FullMethod == "/csi.v1.Identity/Probe" {
-        return handler(ctx, req)
+        // This call occurs frequently, therefore it only gets logged at level 5.
+        pri = 5
+    }
+    glog.V(pri).Infof("GRPC call: %s", info.FullMethod)
+
+    v5 := glog.V(5)
+    if v5 {
+        v5.Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
     }
-    glog.V(3).Infof("GRPC call: %s", info.FullMethod)
-    glog.V(5).Infof("GRPC request: %+v", protosanitizer.StripSecrets(req))
     resp, err := handler(ctx, req)
     if err != nil {
+        // Always log errors, even though they are of limited use
+        // without the method name.
         glog.Errorf("GRPC error: %v", err)
-    } else {
-        glog.V(5).Infof("GRPC response: %+v", protosanitizer.StripSecrets(resp))
     }
+
+    if v5 {
+        v5.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
+
+        // The JSON format intentionally logs without stripping secret
+        // fields, for two reasons:
+        // - It would be technically complicated: protosanitizer.StripSecrets does
+        //   not construct new objects, it just wraps the existing ones with a custom
+        //   String implementation. Therefore a simple json.Marshal(protosanitizer.StripSecrets(resp))
+        //   would still include secrets because json.Marshal reads the fields directly,
+        //   and more complicated code would be needed.
+        // - This output is meant for verification in mock e2e tests. Currently
+        //   no test looks at secrets, but some might in the future, so
+        //   conceptually it seems better to include them.
+        logGRPCJson(info.FullMethod, req, resp, err)
+    }
+
     return resp, err
 }
+
+// logGRPCJson logs the called GRPC call details in JSON format
+func logGRPCJson(method string, request, reply interface{}, err error) {
+    // Log JSON with the request and response for easier parsing
+    logMessage := struct {
+        Method   string
+        Request  interface{}
+        Response interface{}
+        // Error as string, for backward compatibility.
+        // "" on no error.
+        Error string
+        // Full error dump, to be able to parse out full gRPC error code and message separately in a test.
+        FullError error
+    }{
+        Method:    method,
+        Request:   request,
+        Response:  reply,
+        FullError: err,
+    }
+
+    if err != nil {
+        logMessage.Error = err.Error()
+    }
+
+    msg, err := json.Marshal(logMessage)
+    if err != nil {
+        glog.Errorf("failed to marshal gRPC log message: %v", err)
+        return
+    }
+    glog.V(5).Infof("gRPCCall: %s\n", msg)
+}
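
For tests that consume these logs, a minimal sketch of parsing the `gRPCCall:` lines follows. It assumes the JSON shape marshalled by `logGRPCJson` above; the `mockCSICall` struct, the sample log line, and the `FullError` field types are illustrative assumptions (how the gRPC status error marshals depends on the gRPC version), not part of this change.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// mockCSICall mirrors the anonymous struct marshalled by logGRPCJson.
// The FullError shape assumes the gRPC status error marshals into
// "code" and "message" fields; this is an assumption, not guaranteed.
type mockCSICall struct {
	Method    string
	Error     string
	FullError struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	}
}

func main() {
	// Hypothetical log line as emitted by glog.V(5).Infof("gRPCCall: %s\n", msg).
	line := `gRPCCall: {"Method":"/csi.v1.Node/NodeStageVolume","Request":{},"Response":null,"Error":"rpc error: code = Internal desc = boom","FullError":{"code":13,"message":"boom"}}`

	// Strip the log prefix and unmarshal the remaining JSON payload.
	payload := strings.TrimPrefix(line, "gRPCCall: ")
	var call mockCSICall
	if err := json.Unmarshal([]byte(payload), &call); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// A test could now assert on the method name and the gRPC error code.
	fmt.Printf("method=%s code=%d message=%q\n", call.Method, call.FullError.Code, call.FullError.Message)
}
```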