Skip to content
Merged
14 changes: 13 additions & 1 deletion cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"path"
"syscall"

"k8s.io/klog/v2"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-driver-host-path/internal/proxy"
"github.com/kubernetes-csi/csi-driver-host-path/pkg/hostpath"
"k8s.io/klog/v2"
)

var (
Expand Down Expand Up @@ -55,6 +57,8 @@ func main() {

flag.BoolVar(&cfg.EnableVolumeExpansion, "enable-volume-expansion", true, "Enables volume expansion feature.")
flag.BoolVar(&cfg.EnableControllerModifyVolume, "enable-controller-modify-volume", false, "Enables Controller modify volume feature.")
flag.BoolVar(&cfg.EnableSnapshotMetadata, "enable-snapshot-metadata", false, "Enables Snapshot Metadata service.")
snapshotMetadataBlockType := flag.String("snapshot-metadata-block-type", "FIXED_LENGTH", "Expected Snapshot Metadata block type in response. Allowed valid types are FIXED_LENGTH or VARIABLE_LENGTH. If not specified, FIXED_LENGTH is used by default.")
flag.Var(&cfg.AcceptedMutableParameterNames, "accepted-mutable-parameter-names", "Comma separated list of parameter names that can be modified on a persistent volume. This is only used when enable-controller-modify-volume is true. If unset, all parameters are mutable.")
flag.BoolVar(&cfg.DisableControllerExpansion, "disable-controller-expansion", false, "Disables Controller volume expansion capability.")
flag.BoolVar(&cfg.DisableNodeExpansion, "disable-node-expansion", false, "Disables Node volume expansion capability.")
Expand Down Expand Up @@ -106,6 +110,14 @@ func main() {
cfg.MaxVolumeExpansionSizeNode = cfg.MaxVolumeSize
}

// validate snapshot-metadata-type arg block type
bt, ok := csi.BlockMetadataType_value[*snapshotMetadataBlockType]
if !ok {
fmt.Printf("invalid snapshot-metadata-block-type passed, please pass one of the - FIXED_LENGTH, VARIABLE_LENGTH")
Copy link

@pnhoang75 pnhoang75 Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use upper case for beginning of the error/exit message to conform with other error messages in the file. And probably rephase "please pass one of the -..." to "valid types are FIXED_LENGTH, VARIABLE_LENGTH".

os.Exit(1)
}
cfg.SnapshotMetadataBlockType = csi.BlockMetadataType(bt)

driver, err := hostpath.NewHostPathDriver(cfg)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
Expand Down
10 changes: 8 additions & 2 deletions pkg/hostpath/hostpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type hostPath struct {
csi.UnimplementedControllerServer
csi.UnimplementedNodeServer
csi.UnimplementedGroupControllerServer
csi.UnimplementedSnapshotMetadataServer
config Config

// gRPC calls involving any of the fields below must be serialized
Expand All @@ -80,6 +81,8 @@ type Config struct {
EnableTopology bool
EnableVolumeExpansion bool
EnableControllerModifyVolume bool
EnableSnapshotMetadata bool
SnapshotMetadataBlockType csi.BlockMetadataType
AcceptedMutableParameterNames StringArray
DisableControllerExpansion bool
DisableNodeExpansion bool
Expand Down Expand Up @@ -129,8 +132,11 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) {

func (hp *hostPath) Run() error {
s := NewNonBlockingGRPCServer()
// hp itself implements ControllerServer, NodeServer, and IdentityServer.
s.Start(hp.config.Endpoint, hp, hp, hp, hp)
var sms csi.SnapshotMetadataServer
if hp.config.EnableSnapshotMetadata {
sms = hp
}
s.Start(hp.config.Endpoint, hp, hp, hp, hp, sms)
s.Wait()

return nil
Expand Down
9 changes: 9 additions & 0 deletions pkg/hostpath/identityserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPlugi
},
})
}
if hp.config.EnableSnapshotMetadata {
caps = append(caps, &csi.PluginCapability{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_SNAPSHOT_METADATA_SERVICE,
},
},
})
}

return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil
}
9 changes: 6 additions & 3 deletions pkg/hostpath/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type nonBlockingGRPCServer struct {
cleanup func()
}

func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) {
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) {

s.wg.Add(1)

go s.serve(endpoint, ids, cs, ns, gcs)
go s.serve(endpoint, ids, cs, ns, gcs, sms)

return
}
Expand All @@ -63,7 +63,7 @@ func (s *nonBlockingGRPCServer) ForceStop() {
s.cleanup()
}

func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) {
func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) {
listener, cleanup, err := endpoint.Listen(ep)
if err != nil {
klog.Fatalf("Failed to listen: %v", err)
Expand All @@ -88,6 +88,9 @@ func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.
if gcs != nil {
csi.RegisterGroupControllerServer(server, gcs)
}
if sms != nil {
csi.RegisterSnapshotMetadataServer(server, sms)
}

klog.Infof("Listening for connections on address: %#v", listener.Addr())

Expand Down
231 changes: 231 additions & 0 deletions pkg/hostpath/snapshotmetadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hostpath

import (
"bytes"
"io"
"os"

"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"k8s.io/klog/v2"
)

// NOTE: This implementation of SnapshotMetadata service is used for demo and CI testing purpose only.
// This should not be used in production or as an example about how to write a real driver.

type fileBlockReader struct {
base *os.File
target *os.File
offset int64
blockSize int64
blockMetadataType csi.BlockMetadataType
maxResult int32
}

func newFileBlockReader(
basePath,
targetPath string,
startingOffset int64,
blockSize int64,
blockMetadataType csi.BlockMetadataType,
maxResult int32,
) (*fileBlockReader, error) {
base, target, err := openFiles(basePath, targetPath)
if err != nil {
return nil, err
}

return &fileBlockReader{
base: base,
target: target,
offset: startingOffset,
blockSize: blockSize,
blockMetadataType: blockMetadataType,
maxResult: maxResult,
}, nil
}

func (cb *fileBlockReader) seekToStartingOffset() error {
if _, err := cb.target.Seek(cb.offset, io.SeekStart); err != nil {
return err
}
if cb.base == nil {
return nil
}
if _, err := cb.base.Seek(cb.offset, io.SeekStart); err != nil {
return err
}
return nil
}

func (cb *fileBlockReader) Close() error {
if cb.base != nil {
if err := cb.base.Close(); err != nil {
return err
}
}
if cb.target != nil {
if err := cb.target.Close(); err != nil {
return err
}
}
return nil
}

func openFiles(basePath, targetPath string) (base, target *os.File, err error) {
target, err = os.Open(targetPath)
if err != nil {
return nil, nil, err
}
if basePath == "" {
return nil, target, nil
}
base, err = os.Open(basePath)
if err != nil {
target.Close()
return nil, nil, err
}

return base, target, nil
}

// getChangedBlockMetadata reads base and target files, compare block differences between them
// and returns list of changed block metadata. It reads all the blocks till it reaches EOF or size of changed block
// metadata list <= maxSize.
func (cb *fileBlockReader) getChangedBlockMetadata(ctx context.Context) ([]*csi.BlockMetadata, error) {
if cb.base == nil {
klog.V(4).Infof("finding allocated blocks by file: %s", cb.target.Name())
} else {
klog.V(4).Infof("finding changed blocks between two files: %s, %s", cb.base.Name(), cb.target.Name())
}

blockIndex := cb.offset / cb.blockSize
sBuffer := make([]byte, cb.blockSize)
tBuffer := make([]byte, cb.blockSize)
eofBaseFile, eofTargetFile := false, false

changedBlocks := []*csi.BlockMetadata{}

// Read blocks and compare them. Create the list of changed blocks metadata.
// Once the number of blocks reaches to maxResult, return the result and
// compute next batch of blocks.
for int32(len(changedBlocks)) < cb.maxResult {
select {
case <-ctx.Done():
klog.V(4).Infof("handling cancellation signal, closing goroutine")
return nil, ctx.Err()
default:
}
targetReadBytes, eofTarget, err := readFileBlock(cb.target, tBuffer, eofTargetFile)
if err != nil {
return nil, err
}
eofTargetFile = eofTarget

// If base is nil, return blocks allocated by target file.
if cb.base == nil {
if eofTargetFile {
return changedBlocks, io.EOF
}
// if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks.
blockMetadata := createBlockMetadata(blockIndex, cb.blockSize)
if extendBlock(changedBlocks, csi.BlockMetadataType(cb.blockMetadataType), blockIndex, cb.blockSize) {
changedBlocks[len(changedBlocks)-1].SizeBytes += cb.blockSize
cb.offset += cb.blockSize
blockIndex++
continue
}
changedBlocks = append(changedBlocks, blockMetadata)
cb.offset += cb.blockSize
blockIndex++
continue
}

baseReadBytes, eofBase, err := readFileBlock(cb.base, sBuffer, eofBaseFile)
if err != nil {
return nil, err
}
eofBaseFile = eofBase

// If both files have reached EOF, exit the loop.
if eofBaseFile && eofTargetFile {
klog.V(4).Infof("reached end of the files")
return changedBlocks, io.EOF
}

// Compare the two blocks and add result.
// Even if one of the file reaches to end, continue to add block metadata of other file.
if blockChanged(sBuffer[:baseReadBytes], tBuffer[:targetReadBytes]) {
blockMetadata := createBlockMetadata(blockIndex, cb.blockSize)
// if VARIABLE_LEGTH type is enabled, check if blocks are adjacent,
// extend the previous block if adjacent blocks found instead of adding new entry.
if extendBlock(changedBlocks, csi.BlockMetadataType(cb.blockMetadataType), blockIndex, cb.blockSize) {
changedBlocks[len(changedBlocks)-1].SizeBytes += cb.blockSize
cb.offset += cb.blockSize
blockIndex++
continue
}
changedBlocks = append(changedBlocks, blockMetadata)
}

cb.offset += cb.blockSize
blockIndex++
}
return changedBlocks, nil
}

// readFileBlock reads blocks from a file.
func readFileBlock(file *os.File, buffer []byte, eof bool) (int, bool, error) {
if eof {
return 0, true, nil
}

bytesRead, err := file.Read(buffer)
if err != nil && err != io.EOF {
return 0, false, err
}

return bytesRead, err == io.EOF, nil
}

func blockChanged(baseBlock, targetBlock []byte) bool {
return !bytes.Equal(baseBlock, targetBlock)
}

func createBlockMetadata(blockIndex, blockSize int64) *csi.BlockMetadata {
return &csi.BlockMetadata{
ByteOffset: blockIndex * blockSize,
SizeBytes: blockSize,
}
}

func extendBlock(changedBlocks []*csi.BlockMetadata, metadataType csi.BlockMetadataType, blockIndex, blockSize int64) bool {
blockMetadata := createBlockMetadata(blockIndex, blockSize)
// if VARIABLE_LEGTH type is enabled, check if blocks are adjacent,
// extend the previous block if adjacent blocks found instead of adding new entry.
if len(changedBlocks) < 1 {
return false
}
lastBlock := changedBlocks[len(changedBlocks)-1]
if blockMetadata.ByteOffset == lastBlock.ByteOffset+lastBlock.SizeBytes &&
metadataType == csi.BlockMetadataType_VARIABLE_LENGTH {
return true
}
return false
}
Loading