Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@

This repository hosts the CSI Hostpath driver and all of its build and dependent configuration files to deploy the driver.

---
*WARNING: This driver is just a demo implementation and is used for CI testing. This has many fake implementations and other non-standard best practices, and should not be used as an example of how to write a real driver.
---

## Pre-requisite
- Kubernetes cluster
- Running version 1.13 or later
- Access to terminal with `kubectl` installed
- For Kubernetes 1.17 or later, the VolumeSnapshot beta CRDs and Snapshot Controller must be installed as part of the cluster deployment (see Kubernetes 1.17+ deployment instructions)

## Features

The driver can provide empty directories that are backed by the same filesystem as EmptyDir volumes. In addition, it can provide raw block volumes that are backed by a single file in that same filesystem and bound to a loop device.

[Various command line parameters](cmd/hostpathplugin/maing.go) influence the behavior of the driver. This is relevant in particular for the end-to-end testing that this driver is used for in Kubernetes.

Usually, the driver implements all CSI operations itself. When deployed with the `-proxy-endpoint` parameter, it instead proxies all incoming connections for a CSI driver that is [embedded inside the Kubernetes E2E test suite](https://github.com/kubernetes/kubernetes/tree/master/test/e2e/storage/drivers/csi-test) and used for mocking a CSI driver [with callbacks provided by certain tests](https://github.com/kubernetes/kubernetes/blob/5ad79eae2dcbf33df3b35c48ec993d30fbda46dd/test/e2e/storage/csi_mock_volume.go#L110).

## Deployment
Deployment varies depending on the Kubernetes version your cluster is running:
- [Deployment for Kubernetes 1.17 and later](docs/deploy-1.17-and-later.md)
Expand Down
36 changes: 34 additions & 2 deletions cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path"
"syscall"

"github.com/golang/glog"
"github.com/kubernetes-csi/csi-driver-host-path/internal/proxy"
"github.com/kubernetes-csi/csi-driver-host-path/pkg/hostpath"
)

Expand All @@ -30,7 +35,7 @@ func init() {
}

var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
csiEndpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "hostpath.csi.k8s.io", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
ephemeral = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
Expand All @@ -41,6 +46,10 @@ var (
flag.Var(c, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
return c
}()
enableAttach = flag.Bool("enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
// The proxy-endpoint option is intended to used by the Kubernetes E2E test suite
// for proxying incoming calls to the embedded mock CSI driver.
proxyEndpoint = flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need some documentation on how to use this proxyEndpoint?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't imagine who besides us in k/k e2e test would want to use this, so I don't think that we need end-user documentation in the README for this. For those who want to know more, there's always the source code...

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will disagree that just because something is for internal testing we shouldn't document it (and note it as such). It doesn't have to be elaborate, and even just a pointer to the tests using it could be enough.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where would you put that?

The main README.md has no documentation whatsoever about driver command line parameters. I'm really not keen on having to document all existing parameters and adding just this one there would be inconsistent. Documenting all parameters is fine and appropriate for the sidecars, but for this driver I still think it is overkill.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment above this flag is fine with me.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@avalluri can you add this section after "Pre-requisite"?

Features

The driver can provide empty directories that are backed by the same filesystem as EmptyDir volumes. In addition, it can provide raw block volumes that are backed by a single file in that same filesystem and bound to a loop device.

Various command line parameters influence the behavior of the driver. This is relevant in particular for the end-to-end testing that this driver is used for in Kubernetes.

Usually, the driver implements all CSI operations itself. When deployed with the -proxy-endpoint parameter, it instead proxies all incoming connections for a CSI driver that is embedded inside the Kubernetes E2E test suite and used for mocking a CSI driver with callbacks provided by certain tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@avalluri can you add this section after "Pre-requisite"?

Done. Added the 'Features' section.

// Set by the build process
version = ""
)
Expand All @@ -58,7 +67,30 @@ func main() {
fmt.Fprintln(os.Stderr, "Deprecation warning: The ephemeral flag is deprecated and should only be used when deploying on Kubernetes 1.15. It will be removed in the future.")
}

driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version, capacity)
if *proxyEndpoint != "" {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question, it seems like this proxy is standalone and doesn't use any methods of the hostpath driver. Would it make sense to have it be a separate binary?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed various options when adding this functionality to the mock driver binary; the situation was the same. We considered the whole range (separate repo, separate binary in its own image, separate binary in the same image, new feature in an existing binary and image) and in the end agreed that the last option was the simplest approach.

Everything else comes with additional overhead, both for managing the releases and also regarding image sizes needed during testing (Go runtime only linked onced, same image used for multiple tests).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment above this flag is fine with me.

@msau42 Added the comment explaining the usage of this nuew flag.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
closer, err := proxy.Run(ctx, *csiEndpoint, *proxyEndpoint)
if err != nil {
glog.Fatalf("failed to run proxy: %v", err)
}
defer closer.Close()

// Wait for signal
sigc := make(chan os.Signal, 1)
sigs := []os.Signal{
syscall.SIGTERM,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGQUIT,
}
signal.Notify(sigc, sigs...)

<-sigc
return
}

driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *csiEndpoint, *ephemeral, *maxVolumesPerNode, version, capacity, *enableAttach)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
Expand Down
57 changes: 57 additions & 0 deletions internal/endpoint/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpoint

import (
"fmt"
"net"
"os"
"strings"
)

func Parse(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}
// Assume everything else is a file path for a Unix Domain Socket.
return "unix", ep, nil
}

func Listen(endpoint string) (net.Listener, func(), error) {
proto, addr, err := Parse(endpoint)
if err != nil {
return nil, nil, err
}

cleanup := func() {}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow
return nil, nil, fmt.Errorf("%s: %q", addr, err)
}
cleanup = func() {
os.Remove(addr)
}
}

l, err := net.Listen(proto, addr)
return l, cleanup, err
}
146 changes: 146 additions & 0 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package proxy makes it possible to forward a listening socket in
// situations where the proxy cannot connect to some other address.
// Instead, it creates two listening sockets, pairs two incoming
// connections and then moves data back and forth. This matches
// the behavior of the following socat command:
// socat -d -d -d UNIX-LISTEN:/tmp/socat,fork TCP-LISTEN:9000,reuseport
//
// The advantage over that command is that both listening
// sockets are always open, in contrast to the socat solution
// where the TCP port is only open when there actually is a connection
// available.
//
// To establish a connection, someone has to poll the proxy with a dialer.
package proxy

import (
"context"
"fmt"
"io"
"net"

"github.com/golang/glog"

"github.com/kubernetes-csi/csi-driver-host-path/internal/endpoint"
)

// New listens on both endpoints and starts accepting connections
// until closed or the context is done.
func Run(ctx context.Context, endpoint1, endpoint2 string) (io.Closer, error) {
proxy := &proxy{}
failedProxy := proxy
defer func() {
if failedProxy != nil {
failedProxy.Close()
}
}()

proxy.ctx, proxy.cancel = context.WithCancel(ctx)

var err error
proxy.s1, proxy.cleanup1, err = endpoint.Listen(endpoint1)
if err != nil {
return nil, fmt.Errorf("listen %s: %v", endpoint1, err)
}
proxy.s2, proxy.cleanup2, err = endpoint.Listen(endpoint2)
if err != nil {
return nil, fmt.Errorf("listen %s: %v", endpoint2, err)
}

glog.V(3).Infof("proxy listening on %s and %s", endpoint1, endpoint2)

go func() {
for {
// We block on the first listening socket.
// The Linux kernel proactively accepts connections
// on the second one which we will take over below.
conn1 := accept(proxy.ctx, proxy.s1, endpoint1)
if conn1 == nil {
// Done, shut down.
glog.V(5).Infof("proxy endpoint %s closed, shutting down", endpoint1)
return
}
conn2 := accept(proxy.ctx, proxy.s2, endpoint2)
if conn2 == nil {
// Done, shut down. The already accepted
// connection gets closed.
glog.V(5).Infof("proxy endpoint %s closed, shutting down and close established connection", endpoint2)
conn1.Close()
return
}

glog.V(3).Infof("proxy established a new connection between %s and %s", endpoint1, endpoint2)
go copy(conn1, conn2, endpoint1, endpoint2)
go copy(conn2, conn1, endpoint2, endpoint1)
}
}()

failedProxy = nil
return proxy, nil
}

type proxy struct {
ctx context.Context
cancel func()
s1, s2 net.Listener
cleanup1, cleanup2 func()
}

func (p *proxy) Close() error {
if p.cancel != nil {
p.cancel()
}
if p.s1 != nil {
p.s1.Close()
}
if p.s2 != nil {
p.s2.Close()
}
if p.cleanup1 != nil {
p.cleanup1()
}
if p.cleanup2 != nil {
p.cleanup2()
}
return nil
}

func copy(from, to net.Conn, fromEndpoint, toEndpoint string) {
glog.V(5).Infof("starting to copy %s -> %s", fromEndpoint, toEndpoint)
// Signal recipient that no more data is going to come.
// This also stops reading from it.
defer to.Close()
// Copy data until EOF.
cnt, err := io.Copy(to, from)
glog.V(5).Infof("done copying %s -> %s: %d bytes, %v", fromEndpoint, toEndpoint, cnt, err)
}

func accept(ctx context.Context, s net.Listener, endpoint string) net.Conn {
for {
c, err := s.Accept()
if err == nil {
return c
}
// Ignore error if we are shutting down.
if ctx.Err() != nil {
return nil
}
glog.V(3).Infof("accept on %s failed: %v", endpoint, err)
}
}
Loading