From 57eff9b9d6c5723841dd5ba656a91a7c51c6fed0 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Wed, 9 Oct 2024 18:56:08 +0530 Subject: [PATCH 01/12] Implement SnapshotMetadataService Signed-off-by: Prasad Ghangal --- pkg/hostpath/hostpath.go | 90 +++++++++++++++++++++- pkg/hostpath/identityserver.go | 7 ++ pkg/hostpath/server.go | 9 ++- pkg/hostpath/snapshotmetadataserver.go | 102 +++++++++++++++++++++++++ pkg/state/state.go | 3 + 5 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 pkg/hostpath/snapshotmetadataserver.go diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 7e8c8d11e..2008d14fb 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -17,6 +17,7 @@ limitations under the License. package hostpath import ( + "bytes" "errors" "fmt" "io" @@ -26,6 +27,7 @@ import ( "sync" "github.com/container-storage-interface/spec/lib/go/csi" + "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/resource" @@ -54,6 +56,7 @@ type hostPath struct { csi.UnimplementedControllerServer csi.UnimplementedNodeServer csi.UnimplementedGroupControllerServer + csi.UnimplementedSnapshotMetadataServer config Config // gRPC calls involving any of the fields below must be serialized @@ -80,6 +83,7 @@ type Config struct { EnableTopology bool EnableVolumeExpansion bool EnableControllerModifyVolume bool + EnableSnapshotMetadata bool AcceptedMutableParameterNames StringArray DisableControllerExpansion bool DisableNodeExpansion bool @@ -130,7 +134,7 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) { func (hp *hostPath) Run() error { s := NewNonBlockingGRPCServer() // hp itself implements ControllerServer, NodeServer, and IdentityServer. - s.Start(hp.config.Endpoint, hp, hp, hp, hp) + s.Start(hp.config.Endpoint, hp, hp, hp, hp, hp) s.Wait() return nil @@ -410,3 +414,87 @@ func (hp *hostPath) createSnapshotFromVolume(vol state.Volume, file string, opts return nil } + +func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error { + klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath) + defer close(changedBlocksChan) + // Open the two files + source, err := os.Open(sourcePath) + if err != nil { + klog.Errorf("failed to read file: %v", err) + return err + } + defer source.Close() + target, err := os.Open(targetPath) + if err != nil { + klog.Errorf("failed to read file: %v", err) + return err + } + defer target.Close() + + // Seek to the desired offset in both files + _, err = source.Seek(startingOffset, 0) + if err != nil { + klog.Errorf("failed to seek file: %v", err) + return err + } + _, err = target.Seek(startingOffset, 0) + if err != nil { + klog.Errorf("failed to seek file: %v", err) + return err + } + + // Read both files block by block and compare them + var blockIndex int64 + buffer1 := make([]byte, blockSize) + buffer2 := make([]byte, blockSize) + for { + changedBlocks := []*csi.BlockMetadata{} + // Read blocks and compare them. Create the list of changed blocks metadata. + // Once the number of blocks reaches to maxResult, return the result and + // compute next batch of blocks. + for i := 1; i <= int(maxResult); i++ { + select { + case <-ctx.Done(): // Detect cancellation from the client + klog.Infof("Stream canceled by client. Exiting goroutine.") + return nil + default: + // Read block from source and target files + n1, err1 := source.Read(buffer1) + n2, err2 := target.Read(buffer2) + + // If both files have reached EOF, exit the loop + if err1 == io.EOF && err2 == io.EOF { + klog.Infof("reached to end of the file\n") + return nil + } + if err1 != nil && err1 != io.EOF { + klog.Errorf("failed to read to buffer: %v", err) + return err1 + } + if err2 != nil && err2 != io.EOF { + klog.Errorf("failed to read to buffer: %v", err) + return err2 + } + + // If the number of bytes read differs, the files are different + if n1 != n2 { + klog.Infof("files differ in size at block %d\n", blockIndex) + return nil + } + + // Compare the two blocks and add result + if !bytes.Equal(buffer1[:n1], buffer2[:n2]) { + changedBlocks = append(changedBlocks, &csi.BlockMetadata{ + ByteOffset: blockIndex * blockSize, + SizeBytes: int64(blockSize), + }) + } + blockIndex++ + } + } + if len(changedBlocks) != 0 { + changedBlocksChan <- changedBlocks + } + } +} diff --git a/pkg/hostpath/identityserver.go b/pkg/hostpath/identityserver.go index 5e8426ae3..ace626f41 100644 --- a/pkg/hostpath/identityserver.go +++ b/pkg/hostpath/identityserver.go @@ -62,6 +62,13 @@ func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPlugi }, }, }, + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_SNAPSHOT_METADATA_SERVICE, + }, + }, + }, } if hp.config.EnableTopology { caps = append(caps, &csi.PluginCapability{ diff --git a/pkg/hostpath/server.go b/pkg/hostpath/server.go index 97a3e4efa..7c6971d44 100644 --- a/pkg/hostpath/server.go +++ b/pkg/hostpath/server.go @@ -40,11 +40,11 @@ type nonBlockingGRPCServer struct { cleanup func() } -func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) { +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) { s.wg.Add(1) - go s.serve(endpoint, ids, cs, ns, gcs) + go s.serve(endpoint, ids, cs, ns, gcs, sms) return } @@ -63,7 +63,7 @@ func (s *nonBlockingGRPCServer) ForceStop() { s.cleanup() } -func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer) { +func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, gcs csi.GroupControllerServer, sms csi.SnapshotMetadataServer) { listener, cleanup, err := endpoint.Listen(ep) if err != nil { klog.Fatalf("Failed to listen: %v", err) @@ -88,6 +88,9 @@ func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi. if gcs != nil { csi.RegisterGroupControllerServer(server, gcs) } + if sms != nil { + csi.RegisterSnapshotMetadataServer(server, sms) + } klog.Infof("Listening for connections on address: %#v", listener.Addr()) diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go new file mode 100644 index 000000000..7a0ce2497 --- /dev/null +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -0,0 +1,102 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostpath + +import ( + "fmt" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-driver-host-path/pkg/state" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" +) + +func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, stream csi.SnapshotMetadata_GetMetadataAllocatedServer) error { + return nil +} + +func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream csi.SnapshotMetadata_GetMetadataDeltaServer) error { + ctx := stream.Context() + // Check arguments + baseSnapID := req.GetBaseSnapshotId() + targetSnapID := req.GetTargetSnapshotId() + if len(baseSnapID) == 0 { + return status.Error(codes.InvalidArgument, "BaseSnapshotID missing in request") + } + if len(targetSnapID) == 0 { + return status.Error(codes.InvalidArgument, "TargetSnapshotID missing in request") + } + + // Load snapshots + source, err := hp.state.GetSnapshotByID(baseSnapID) + if err != nil { + return status.Error(codes.Internal, "Cannot find the source snapshot") + } + target, err := hp.state.GetSnapshotByID(targetSnapID) + if err != nil { + return status.Error(codes.Internal, "Cannot find the target snapshot") + } + + if !source.ReadyToUse { + return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", baseSnapID)) + } + if !target.ReadyToUse { + return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", targetSnapID)) + } + + if source.VolID != target.VolID { + return status.Error(codes.InvalidArgument, "Snapshots don't belong to the same Volume") + } + vol, err := hp.state.GetVolumeByID(source.VolID) + if err != nil { + return err + } + if vol.VolAccessType != state.BlockAccess { + return status.Error(codes.InvalidArgument, "Source volume does not have block mode access type") + } + + changedBlocks := make(chan []*csi.BlockMetadata) + go func() { + err := hp.getChangedBlockMetadata(ctx, hp.getSnapshotPath(baseSnapID), hp.getSnapshotPath(targetSnapID), req.StartingOffset, state.BlockSizeBytes, req.MaxResults, changedBlocks) + if err != nil { + klog.Errorf("failed to get changed block metadata: %v", err) + } + }() + + for { + select { + case cb, ok := <-changedBlocks: + if !ok { + klog.V(4).Info("channel closed, returning") + return nil + } + resp := csi.GetMetadataDeltaResponse{ + BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, + VolumeCapacityBytes: vol.VolSize, + BlockMetadata: cb, + } + if err := stream.Send(&resp); err != nil { + return err + } + case <-ctx.Done(): + klog.V(4).Info("received cancellation signal, returning") + return nil + } + } + return nil +} diff --git a/pkg/state/state.go b/pkg/state/state.go index 3dcf0256c..cd207a371 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -34,6 +34,9 @@ type AccessType int const ( MountAccess AccessType = iota BlockAccess + + // BlockSizeBytes represents the default block size. + BlockSizeBytes = 4096 ) type Volume struct { From 3b2b10257eb7da06f2deda4ddf84d7516a4a30ee Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Tue, 15 Oct 2024 12:17:59 +0530 Subject: [PATCH 02/12] Add unit tests Implement GetMetadataAllocated RPC handler Signed-off-by: Prasad Ghangal --- pkg/hostpath/hostpath.go | 87 ------ pkg/hostpath/snapshotmetadata.go | 183 +++++++++++ pkg/hostpath/snapshotmetadata_test.go | 414 +++++++++++++++++++++++++ pkg/hostpath/snapshotmetadataserver.go | 54 +++- 4 files changed, 650 insertions(+), 88 deletions(-) create mode 100644 pkg/hostpath/snapshotmetadata.go create mode 100644 pkg/hostpath/snapshotmetadata_test.go diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 2008d14fb..2ee9aa4dd 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -17,7 +17,6 @@ limitations under the License. package hostpath import ( - "bytes" "errors" "fmt" "io" @@ -27,7 +26,6 @@ import ( "sync" "github.com/container-storage-interface/spec/lib/go/csi" - "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/resource" @@ -83,7 +81,6 @@ type Config struct { EnableTopology bool EnableVolumeExpansion bool EnableControllerModifyVolume bool - EnableSnapshotMetadata bool AcceptedMutableParameterNames StringArray DisableControllerExpansion bool DisableNodeExpansion bool @@ -414,87 +411,3 @@ func (hp *hostPath) createSnapshotFromVolume(vol state.Volume, file string, opts return nil } - -func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error { - klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath) - defer close(changedBlocksChan) - // Open the two files - source, err := os.Open(sourcePath) - if err != nil { - klog.Errorf("failed to read file: %v", err) - return err - } - defer source.Close() - target, err := os.Open(targetPath) - if err != nil { - klog.Errorf("failed to read file: %v", err) - return err - } - defer target.Close() - - // Seek to the desired offset in both files - _, err = source.Seek(startingOffset, 0) - if err != nil { - klog.Errorf("failed to seek file: %v", err) - return err - } - _, err = target.Seek(startingOffset, 0) - if err != nil { - klog.Errorf("failed to seek file: %v", err) - return err - } - - // Read both files block by block and compare them - var blockIndex int64 - buffer1 := make([]byte, blockSize) - buffer2 := make([]byte, blockSize) - for { - changedBlocks := []*csi.BlockMetadata{} - // Read blocks and compare them. Create the list of changed blocks metadata. - // Once the number of blocks reaches to maxResult, return the result and - // compute next batch of blocks. - for i := 1; i <= int(maxResult); i++ { - select { - case <-ctx.Done(): // Detect cancellation from the client - klog.Infof("Stream canceled by client. Exiting goroutine.") - return nil - default: - // Read block from source and target files - n1, err1 := source.Read(buffer1) - n2, err2 := target.Read(buffer2) - - // If both files have reached EOF, exit the loop - if err1 == io.EOF && err2 == io.EOF { - klog.Infof("reached to end of the file\n") - return nil - } - if err1 != nil && err1 != io.EOF { - klog.Errorf("failed to read to buffer: %v", err) - return err1 - } - if err2 != nil && err2 != io.EOF { - klog.Errorf("failed to read to buffer: %v", err) - return err2 - } - - // If the number of bytes read differs, the files are different - if n1 != n2 { - klog.Infof("files differ in size at block %d\n", blockIndex) - return nil - } - - // Compare the two blocks and add result - if !bytes.Equal(buffer1[:n1], buffer2[:n2]) { - changedBlocks = append(changedBlocks, &csi.BlockMetadata{ - ByteOffset: blockIndex * blockSize, - SizeBytes: int64(blockSize), - }) - } - blockIndex++ - } - } - if len(changedBlocks) != 0 { - changedBlocksChan <- changedBlocks - } - } -} diff --git a/pkg/hostpath/snapshotmetadata.go b/pkg/hostpath/snapshotmetadata.go new file mode 100644 index 000000000..9d473cc66 --- /dev/null +++ b/pkg/hostpath/snapshotmetadata.go @@ -0,0 +1,183 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostpath + +import ( + "bytes" + "io" + "os" + + "github.com/container-storage-interface/spec/lib/go/csi" + "golang.org/x/net/context" + "k8s.io/klog/v2" +) + +func (hp *hostPath) getAllocatedBlockMetadata(ctx context.Context, filePath string, startingOffset, blockSize int64, maxResult int32, allocBlocksChan chan<- []*csi.BlockMetadata) error { + klog.V(4).Infof("finding allocated blocks in the file: %s", filePath) + defer close(allocBlocksChan) + + file, err := os.Open(filePath) + if err != nil { + return err + } + defer file.Close() + + if _, err := file.Seek(startingOffset, 0); err != nil { + return err + } + + return hp.compareBlocks(ctx, nil, file, startingOffset, blockSize, maxResult, allocBlocksChan) +} + +func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error { + klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath) + defer close(changedBlocksChan) + + source, target, err := openFiles(sourcePath, targetPath) + if err != nil { + return err + } + defer source.Close() + defer target.Close() + + if err := seekToOffset(source, target, startingOffset); err != nil { + return err + } + + return hp.compareBlocks(ctx, source, target, startingOffset, blockSize, maxResult, changedBlocksChan) +} + +func openFiles(sourcePath, targetPath string) (source, target *os.File, err error) { + source, err = os.Open(sourcePath) + if err != nil { + return nil, nil, err + } + + target, err = os.Open(targetPath) + if err != nil { + source.Close() + return nil, nil, err + } + + return source, target, nil +} + +func seekToOffset(source, target *os.File, startingOffset int64) error { + if _, err := source.Seek(startingOffset, 0); err != nil { + return err + } + if _, err := target.Seek(startingOffset, 0); err != nil { + return err + } + return nil +} + +// Compare blocks from source and target, and send changed blocks to channel. +// If source if nil, returns blocks allocated by target. +func (hp *hostPath) compareBlocks(ctx context.Context, source, target *os.File, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error { + blockIndex := startingOffset / blockSize + sBuffer := make([]byte, blockSize) + tBuffer := make([]byte, blockSize) + eofSourceFile, eofTargetFile := false, false + + for { + changedBlocks := []*csi.BlockMetadata{} + + // Read blocks and compare them. Create the list of changed blocks metadata. + // Once the number of blocks reaches to maxResult, return the result and + // compute next batch of blocks. + for int32(len(changedBlocks)) < maxResult { + select { + case <-ctx.Done(): + klog.V(4).Infof("handling cancellation signal, closing goroutine") + return nil + default: + targetReadBytes, eofTarget, err := readFileBlock(target, tBuffer, eofTargetFile) + if err != nil { + return err + } + eofTargetFile = eofTarget + + if source == nil { + // If source is nil, return blocks allocated by target file. + if eofTargetFile { + if len(changedBlocks) != 0 { + changedBlocksChan <- changedBlocks + } + return nil + } + changedBlocks = append(changedBlocks, createBlockMetadata(blockIndex, blockSize)) + blockIndex++ + continue + } + + sourceReadBytes, eofSource, err := readFileBlock(source, sBuffer, eofSourceFile) + if err != nil { + return err + } + eofSourceFile = eofSource + + // If both files have reached EOF, exit the loop. + if eofSourceFile && eofTargetFile { + klog.V(4).Infof("reached end of the files") + if len(changedBlocks) != 0 { + changedBlocksChan <- changedBlocks + } + return nil + } + + // Compare the two blocks and add result. + // Even if one of the file reaches to end, continue to add block metadata of other file. + if blockChanged(sBuffer[:sourceReadBytes], tBuffer[:targetReadBytes]) { + // TODO: Support for VARIABLE sized block metadata + changedBlocks = append(changedBlocks, createBlockMetadata(blockIndex, blockSize)) + } + + blockIndex++ + } + } + + if len(changedBlocks) > 0 { + changedBlocksChan <- changedBlocks + } + } +} + +// readFileBlock reads blocks from a file. +func readFileBlock(file *os.File, buffer []byte, eof bool) (int, bool, error) { + if eof { + return 0, true, nil + } + + bytesRead, err := file.Read(buffer) + if err != nil && err != io.EOF { + return 0, false, err + } + + return bytesRead, err == io.EOF, nil +} + +func blockChanged(sourceBlock, targetBlock []byte) bool { + return !bytes.Equal(sourceBlock, targetBlock) +} + +func createBlockMetadata(blockIndex, blockSize int64) *csi.BlockMetadata { + return &csi.BlockMetadata{ + ByteOffset: blockIndex * blockSize, + SizeBytes: blockSize, + } +} diff --git a/pkg/hostpath/snapshotmetadata_test.go b/pkg/hostpath/snapshotmetadata_test.go new file mode 100644 index 000000000..69063779d --- /dev/null +++ b/pkg/hostpath/snapshotmetadata_test.go @@ -0,0 +1,414 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hostpath + +import ( + "context" + "math" + "os" + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-driver-host-path/pkg/state" +) + +func TestGetChangedBlockMetadata(t *testing.T) { + testCases := []struct { + name string + sourceFileBlocks int + targetFileBlocks int + changedBlocks []int + startingOffset int64 + maxResult int32 + expectedResponse []*csi.BlockMetadata + expectErr bool + }{ + { + name: "success case", + sourceFileBlocks: 100, + targetFileBlocks: 100, + changedBlocks: []int{2, 4, 7, 30, 70}, + maxResult: 100, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 2 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 4 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 7 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 30 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 70 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, + { + name: "success case with max result", + sourceFileBlocks: 100, + targetFileBlocks: 100, + changedBlocks: []int{2, 4, 7, 10, 30, 65, 70}, + maxResult: 3, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 2 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 4 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 7 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 10 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 30 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 65 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 70 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, + { + name: "success case with starting offset", + sourceFileBlocks: 100, + targetFileBlocks: 100, + changedBlocks: []int{2, 4, 7, 10, 30, 70, 65}, + startingOffset: 9 * state.BlockSizeBytes, + maxResult: 3, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 10 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 30 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 65 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 70 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, + { + name: "sucess case empty response", + sourceFileBlocks: 100, + targetFileBlocks: 100, + startingOffset: 9 * state.BlockSizeBytes, + maxResult: 3, + expectedResponse: []*csi.BlockMetadata{}, + expectErr: false, + }, + { + name: "sucess case different sizes", + sourceFileBlocks: 95, + targetFileBlocks: 100, + changedBlocks: []int{70, 97}, + startingOffset: 9 * state.BlockSizeBytes, + maxResult: 3, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 70 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 95 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 96 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 97 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 98 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 99 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, + { + name: "sucess case different sizes", + sourceFileBlocks: 100, + targetFileBlocks: 95, + changedBlocks: []int{70, 97}, + startingOffset: 9 * state.BlockSizeBytes, + maxResult: 3, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 70 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 95 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 96 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 97 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 98 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 99 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stateDir, err := os.MkdirTemp(os.TempDir(), "csi-data-dir") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(stateDir) + + // Create test files with data + sourceFile := createTempFile(t, tc.sourceFileBlocks) + defer sourceFile.Close() + targetFile := createTempFile(t, tc.targetFileBlocks) + defer targetFile.Close() + for _, i := range tc.changedBlocks { + modifyBlock(t, targetFile, i, []byte("changed block")) + } + + cfg := Config{ + StateDir: stateDir, + Endpoint: "unix://tmp/csi.sock", + DriverName: "hostpath.csi.k8s.io", + NodeID: "fakeNodeID", + MaxVolumeSize: 1024 * 1024 * 1024 * 1024, + EnableTopology: true, + EnableControllerModifyVolume: true, + } + + hp, err := NewHostPathDriver(cfg) + if err != nil { + t.Fatal(err) + } + cb := make(chan []*csi.BlockMetadata, 100) + err1 := hp.getChangedBlockMetadata(context.Background(), sourceFile.Name(), targetFile.Name(), tc.startingOffset, state.BlockSizeBytes, tc.maxResult, cb) + if tc.expectErr { + if err1 == nil { + t.Fatalf("expected error, got none") + } + return + } + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + response := []*csi.BlockMetadata{} + responsePages := 0 + for c := range cb { + responsePages++ + response = append(response, c...) + } + // Validate max result limit + expPages := int(math.Ceil(float64(len(tc.expectedResponse)) / float64(tc.maxResult))) + if responsePages != expPages { + t.Fatalf("expected %d pages of response, got: %d", expPages, responsePages) + } + // Validate response content + if len(tc.expectedResponse) != len(response) { + t.Fatalf("expected %d changed blocks metadata, got: %d", len(tc.changedBlocks), len(response)) + } + for i := 0; i < len(response); i++ { + if response[i].String() != tc.expectedResponse[i].String() { + t.Fatalf("received unexpected block metadata, expected: %s\n, got %s", tc.expectedResponse[i].String(), response[i].String()) + } + } + + }) + } +} + +// createTempFile creates a file with given number of blocks +func createTempFile(t *testing.T, blocks int) *os.File { + f, err := os.CreateTemp("", "test-*.img") + if err != nil { + t.Fatal(err) + } + // Create n blocks of default block size; declared by BlockSizeBytes + for i := 0; i < blocks; i++ { + data := make([]byte, state.BlockSizeBytes) + // Set different content in each block + for j := 0; j < state.BlockSizeBytes; j++ { + data[j] = byte(i + 1) + } + _, err = f.Write(data) + if err != nil { + t.Fatal(err) + } + } + return f +} + +// modifyBlock modifies the content of a specific block in the file +func modifyBlock(t *testing.T, file *os.File, blockNumber int, newContent []byte) { + offset := int64(blockNumber) * state.BlockSizeBytes + // Seek to the start of the block + _, err := file.Seek(offset, 0) + if err != nil { + t.Fatal(err) + } + + // Create a buffer with the same size and copy new content into it + data := make([]byte, state.BlockSizeBytes) + copy(data, newContent) + + _, err = file.Write(data) + if err != nil { + t.Fatal(err) + } +} + +func TestGetAllocatedBlockMetadata(t *testing.T) { + testCases := []struct { + name string + fileBlocks int + startingOffset int64 + maxResult int32 + expectedBlocks []int + expectErr bool + }{ + { + name: "success case", + fileBlocks: 5, + maxResult: 100, + expectedBlocks: []int{0, 1, 2, 3, 4}, + expectErr: false, + }, + { + name: "success case with max result", + fileBlocks: 10, + expectedBlocks: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + maxResult: 3, + expectErr: false, + }, + { + name: "success case with starting offset", + fileBlocks: 10, + startingOffset: 4 * state.BlockSizeBytes, + expectedBlocks: []int{4, 5, 6, 7, 8, 9}, + maxResult: 3, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stateDir, err := os.MkdirTemp(os.TempDir(), "csi-data-dir") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(stateDir) + + file := createTempFile(t, tc.fileBlocks) + defer file.Close() + + cfg := Config{ + StateDir: stateDir, + Endpoint: "unix://tmp/csi.sock", + DriverName: "hostpath.csi.k8s.io", + NodeID: "fakeNodeID", + MaxVolumeSize: 1024 * 1024 * 1024 * 1024, + EnableTopology: true, + EnableControllerModifyVolume: true, + } + + hp, err := NewHostPathDriver(cfg) + if err != nil { + t.Fatal(err) + } + cb := make(chan []*csi.BlockMetadata, 100) + err1 := hp.getAllocatedBlockMetadata(context.Background(), file.Name(), tc.startingOffset, state.BlockSizeBytes, tc.maxResult, cb) + if tc.expectErr { + if err1 == nil { + t.Fatalf("expected error, got none") + } + return + } + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + response := []*csi.BlockMetadata{} + responsePages := 0 + for c := range cb { + responsePages++ + response = append(response, c...) + } + // Validate max result limit + expPages := int(math.Ceil(float64(len(tc.expectedBlocks)) / float64(tc.maxResult))) + if responsePages != expPages { + t.Fatalf("expected %d pages of response, got: %d", expPages, responsePages) + } + // Validate response content + if len(tc.expectedBlocks) != len(response) { + t.Fatalf("expected %d changed blocks metadata, got: %d", tc.fileBlocks, len(response)) + } + for i := 0; i < len(tc.expectedBlocks); i++ { + expCB := createBlockMetadata(int64(tc.expectedBlocks[i]), state.BlockSizeBytes) + if response[i].String() != expCB.String() { + t.Fatalf("received unexpected block metadata, expected: %s\n, got %s", expCB.String(), response[i].String()) + } + } + }) + } +} diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index 7a0ce2497..fc5434858 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -27,6 +27,58 @@ import ( ) func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, stream csi.SnapshotMetadata_GetMetadataAllocatedServer) error { + ctx := stream.Context() + // Check arguments + snapID := req.GetSnapshotId() + if len(snapID) == 0 { + return status.Error(codes.InvalidArgument, "SnapshotID missing in request") + } + + // Load snapshots + source, err := hp.state.GetSnapshotByID(snapID) + if err != nil { + return status.Error(codes.Internal, "Cannot find the snapshot") + } + if !source.ReadyToUse { + return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", snapID)) + } + + vol, err := hp.state.GetVolumeByID(source.VolID) + if err != nil { + return err + } + if vol.VolAccessType != state.BlockAccess { + return status.Error(codes.InvalidArgument, "Source volume does not have block mode access type") + } + + allocatedBlocks := make(chan []*csi.BlockMetadata, 100) + go func() { + err := hp.getAllocatedBlockMetadata(ctx, hp.getSnapshotPath(snapID), req.StartingOffset, state.BlockSizeBytes, req.MaxResults, allocatedBlocks) + if err != nil { + klog.Errorf("failed to get allocated block metadata: %v", err) + } + }() + + for { + select { + case cb, ok := <-allocatedBlocks: + if !ok { + klog.V(4).Info("channel closed, returning") + return nil + } + resp := csi.GetMetadataAllocatedResponse{ + BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, + VolumeCapacityBytes: vol.VolSize, + BlockMetadata: cb, + } + if err := stream.Send(&resp); err != nil { + return err + } + case <-ctx.Done(): + klog.V(4).Info("received cancellation signal, returning") + return nil + } + } return nil } @@ -70,7 +122,7 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs return status.Error(codes.InvalidArgument, "Source volume does not have block mode access type") } - changedBlocks := make(chan []*csi.BlockMetadata) + changedBlocks := make(chan []*csi.BlockMetadata, 100) go func() { err := hp.getChangedBlockMetadata(ctx, hp.getSnapshotPath(baseSnapID), hp.getSnapshotPath(targetSnapID), req.StartingOffset, state.BlockSizeBytes, req.MaxResults, changedBlocks) if err != nil { From dd68929c2d9c03c5b3ba035f0a5459775d014b0f Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Wed, 16 Oct 2024 13:06:03 +0530 Subject: [PATCH 03/12] Do not enable snapshotmetadata service by default Signed-off-by: Prasad Ghangal --- cmd/hostpathplugin/main.go | 2 ++ pkg/hostpath/hostpath.go | 10 ++++++++-- pkg/hostpath/identityserver.go | 12 +++++++----- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index f2177e66e..ca06af582 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -55,6 +55,8 @@ func main() { flag.BoolVar(&cfg.EnableVolumeExpansion, "enable-volume-expansion", true, "Enables volume expansion feature.") flag.BoolVar(&cfg.EnableControllerModifyVolume, "enable-controller-modify-volume", false, "Enables Controller modify volume feature.") + // TODO: Remove this feature flag and enable SnapshotMetadata service by default once external-snapshot-metadata alpha is released. + flag.BoolVar(&cfg.EnableSnapshotMetadata, "enable-snapshot-metadata", false, "Enables Snapshot Metadata service.") flag.Var(&cfg.AcceptedMutableParameterNames, "accepted-mutable-parameter-names", "Comma separated list of parameter names that can be modified on a persistent volume. This is only used when enable-controller-modify-volume is true. If unset, all parameters are mutable.") flag.BoolVar(&cfg.DisableControllerExpansion, "disable-controller-expansion", false, "Disables Controller volume expansion capability.") flag.BoolVar(&cfg.DisableNodeExpansion, "disable-node-expansion", false, "Disables Node volume expansion capability.") diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index 2ee9aa4dd..e0a10f1c0 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -81,6 +81,7 @@ type Config struct { EnableTopology bool EnableVolumeExpansion bool EnableControllerModifyVolume bool + EnableSnapshotMetadata bool AcceptedMutableParameterNames StringArray DisableControllerExpansion bool DisableNodeExpansion bool @@ -130,8 +131,13 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) { func (hp *hostPath) Run() error { s := NewNonBlockingGRPCServer() - // hp itself implements ControllerServer, NodeServer, and IdentityServer. - s.Start(hp.config.Endpoint, hp, hp, hp, hp, hp) + // hp itself implements ControllerServer, NodeServer, IdentityServer, and SnapshotMetadataServer. + // TODO: Enable SnapshotMetadata service by default once external-snapshot-metadata alpha is released. + var sms csi.SnapshotMetadataServer + if hp.config.EnableSnapshotMetadata { + sms = hp + } + s.Start(hp.config.Endpoint, hp, hp, hp, hp, sms) s.Wait() return nil diff --git a/pkg/hostpath/identityserver.go b/pkg/hostpath/identityserver.go index ace626f41..e3eb1ae58 100644 --- a/pkg/hostpath/identityserver.go +++ b/pkg/hostpath/identityserver.go @@ -62,19 +62,21 @@ func (hp *hostPath) GetPluginCapabilities(ctx context.Context, req *csi.GetPlugi }, }, }, - { + } + if hp.config.EnableTopology { + caps = append(caps, &csi.PluginCapability{ Type: &csi.PluginCapability_Service_{ Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_SNAPSHOT_METADATA_SERVICE, + Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, }, }, - }, + }) } - if hp.config.EnableTopology { + if hp.config.EnableSnapshotMetadata { caps = append(caps, &csi.PluginCapability{ Type: &csi.PluginCapability_Service_{ Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, + Type: csi.PluginCapability_Service_SNAPSHOT_METADATA_SERVICE, }, }, }) From 91c42e365411afd9f23d901c631b21ba911f7a59 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Wed, 23 Oct 2024 17:40:27 +0530 Subject: [PATCH 04/12] Add support for variable length metadata Signed-off-by: Prasad Ghangal --- cmd/hostpathplugin/main.go | 13 +++- pkg/hostpath/hostpath.go | 1 + pkg/hostpath/snapshotmetadata.go | 35 ++++++++- pkg/hostpath/snapshotmetadata_test.go | 107 ++++++++++++++++++++++---- 4 files changed, 138 insertions(+), 18 deletions(-) diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index ca06af582..f1c52899d 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -25,9 +25,11 @@ import ( "path" "syscall" + "k8s.io/klog/v2" + + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-driver-host-path/internal/proxy" "github.com/kubernetes-csi/csi-driver-host-path/pkg/hostpath" - "k8s.io/klog/v2" ) var ( @@ -57,6 +59,7 @@ func main() { flag.BoolVar(&cfg.EnableControllerModifyVolume, "enable-controller-modify-volume", false, "Enables Controller modify volume feature.") // TODO: Remove this feature flag and enable SnapshotMetadata service by default once external-snapshot-metadata alpha is released. flag.BoolVar(&cfg.EnableSnapshotMetadata, "enable-snapshot-metadata", false, "Enables Snapshot Metadata service.") + snapshotMetadataBlockType := flag.String("snapshot-metadata-block-type", "FIXED_LENGTH", "Expected Snapshot Metadata block type in response. Allowed valid types are FIXED_LENGTH or VARIABLE_LENGTH. If not specified, FIXED_LENGTH is used by default.") flag.Var(&cfg.AcceptedMutableParameterNames, "accepted-mutable-parameter-names", "Comma separated list of parameter names that can be modified on a persistent volume. This is only used when enable-controller-modify-volume is true. If unset, all parameters are mutable.") flag.BoolVar(&cfg.DisableControllerExpansion, "disable-controller-expansion", false, "Disables Controller volume expansion capability.") flag.BoolVar(&cfg.DisableNodeExpansion, "disable-node-expansion", false, "Disables Node volume expansion capability.") @@ -108,6 +111,14 @@ func main() { cfg.MaxVolumeExpansionSizeNode = cfg.MaxVolumeSize } + // validate snapshot-metadata-type arg block type + bt, ok := csi.BlockMetadataType_value[*snapshotMetadataBlockType] + if !ok { + fmt.Printf("invalid snapshot-metadata-block-type passed, please pass one of the - FIXED_LENGTH, VARIABLE_LENGTH") + os.Exit(1) + } + cfg.SnapshotMetadataBlockType = csi.BlockMetadataType(bt) + driver, err := hostpath.NewHostPathDriver(cfg) if err != nil { fmt.Printf("Failed to initialize driver: %s", err.Error()) diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index e0a10f1c0..af194f7ff 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -82,6 +82,7 @@ type Config struct { EnableVolumeExpansion bool EnableControllerModifyVolume bool EnableSnapshotMetadata bool + SnapshotMetadataBlockType csi.BlockMetadataType AcceptedMutableParameterNames StringArray DisableControllerExpansion bool DisableNodeExpansion bool diff --git a/pkg/hostpath/snapshotmetadata.go b/pkg/hostpath/snapshotmetadata.go index 9d473cc66..27ad19f6c 100644 --- a/pkg/hostpath/snapshotmetadata.go +++ b/pkg/hostpath/snapshotmetadata.go @@ -120,7 +120,14 @@ func (hp *hostPath) compareBlocks(ctx context.Context, source, target *os.File, } return nil } - changedBlocks = append(changedBlocks, createBlockMetadata(blockIndex, blockSize)) + // if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks. + blockMetadata := createBlockMetadata(blockIndex, blockSize) + if extendBlock(changedBlocks, csi.BlockMetadataType(hp.config.SnapshotMetadataBlockType), blockIndex, blockSize) { + changedBlocks[len(changedBlocks)-1].SizeBytes += blockSize + blockIndex++ + continue + } + changedBlocks = append(changedBlocks, blockMetadata) blockIndex++ continue } @@ -143,8 +150,15 @@ func (hp *hostPath) compareBlocks(ctx context.Context, source, target *os.File, // Compare the two blocks and add result. // Even if one of the file reaches to end, continue to add block metadata of other file. if blockChanged(sBuffer[:sourceReadBytes], tBuffer[:targetReadBytes]) { - // TODO: Support for VARIABLE sized block metadata - changedBlocks = append(changedBlocks, createBlockMetadata(blockIndex, blockSize)) + blockMetadata := createBlockMetadata(blockIndex, blockSize) + // if VARIABLE_LEGTH type is enabled, check if blocks are adjacent, + // extend the previous block if adjacent blocks found instead of adding new entry. + if extendBlock(changedBlocks, csi.BlockMetadataType(hp.config.SnapshotMetadataBlockType), blockIndex, blockSize) { + changedBlocks[len(changedBlocks)-1].SizeBytes += blockSize + blockIndex++ + continue + } + changedBlocks = append(changedBlocks, blockMetadata) } blockIndex++ @@ -181,3 +195,18 @@ func createBlockMetadata(blockIndex, blockSize int64) *csi.BlockMetadata { SizeBytes: blockSize, } } + +func extendBlock(changedBlocks []*csi.BlockMetadata, metadataType csi.BlockMetadataType, blockIndex, blockSize int64) bool { + blockMetadata := createBlockMetadata(blockIndex, blockSize) + // if VARIABLE_LEGTH type is enabled, check if blocks are adjacent, + // extend the previous block if adjacent blocks found instead of adding new entry. + if len(changedBlocks) < 1 { + return false + } + lastBlock := changedBlocks[len(changedBlocks)-1] + if blockMetadata.ByteOffset == lastBlock.ByteOffset+lastBlock.SizeBytes && + metadataType == csi.BlockMetadataType_VARIABLE_LENGTH { + return true + } + return false +} diff --git a/pkg/hostpath/snapshotmetadata_test.go b/pkg/hostpath/snapshotmetadata_test.go index 69063779d..40e75b562 100644 --- a/pkg/hostpath/snapshotmetadata_test.go +++ b/pkg/hostpath/snapshotmetadata_test.go @@ -28,14 +28,15 @@ import ( func TestGetChangedBlockMetadata(t *testing.T) { testCases := []struct { - name string - sourceFileBlocks int - targetFileBlocks int - changedBlocks []int - startingOffset int64 - maxResult int32 - expectedResponse []*csi.BlockMetadata - expectErr bool + name string + sourceFileBlocks int + targetFileBlocks int + changedBlocks []int + blockMetadataType csi.BlockMetadataType + startingOffset int64 + maxResult int32 + expectedResponse []*csi.BlockMetadata + expectErr bool }{ { name: "success case", @@ -211,6 +212,57 @@ func TestGetChangedBlockMetadata(t *testing.T) { }, expectErr: false, }, + { + name: "success case with variable block sizes", + sourceFileBlocks: 100, + targetFileBlocks: 100, + changedBlocks: []int{3, 4, 5, 6, 7, 47, 48, 49, 51}, + blockMetadataType: csi.BlockMetadataType_VARIABLE_LENGTH, + maxResult: 100, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 3 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes * 5, + }, + { + ByteOffset: 47 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes * 3, + }, + { + ByteOffset: 51 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, + { + name: "success case with starting offset and variable length", + sourceFileBlocks: 100, + targetFileBlocks: 100, + changedBlocks: []int{2, 4, 7, 10, 13, 14, 30, 65}, + blockMetadataType: csi.BlockMetadataType_VARIABLE_LENGTH, + startingOffset: 9 * state.BlockSizeBytes, + maxResult: 3, + expectedResponse: []*csi.BlockMetadata{ + { + ByteOffset: 10 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 13 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes * 2, + }, + { + ByteOffset: 30 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + { + ByteOffset: 65 * state.BlockSizeBytes, + SizeBytes: state.BlockSizeBytes, + }, + }, + expectErr: false, + }, } for _, tc := range testCases { @@ -238,6 +290,7 @@ func TestGetChangedBlockMetadata(t *testing.T) { MaxVolumeSize: 1024 * 1024 * 1024 * 1024, EnableTopology: true, EnableControllerModifyVolume: true, + SnapshotMetadataBlockType: tc.blockMetadataType, } hp, err := NewHostPathDriver(cfg) @@ -322,12 +375,13 @@ func modifyBlock(t *testing.T, file *os.File, blockNumber int, newContent []byte func TestGetAllocatedBlockMetadata(t *testing.T) { testCases := []struct { - name string - fileBlocks int - startingOffset int64 - maxResult int32 - expectedBlocks []int - expectErr bool + name string + fileBlocks int + startingOffset int64 + blockMetadataType csi.BlockMetadataType + maxResult int32 + expectedBlocks []int + expectErr bool }{ { name: "success case", @@ -350,6 +404,22 @@ func TestGetAllocatedBlockMetadata(t *testing.T) { expectedBlocks: []int{4, 5, 6, 7, 8, 9}, maxResult: 3, }, + { + name: "success case with variable block types", + fileBlocks: 5, + blockMetadataType: csi.BlockMetadataType_VARIABLE_LENGTH, + maxResult: 100, + expectedBlocks: []int{0}, + expectErr: false, + }, + { + name: "success case with starting offset and variable length", + fileBlocks: 10, + startingOffset: 4 * state.BlockSizeBytes, + blockMetadataType: csi.BlockMetadataType_VARIABLE_LENGTH, + expectedBlocks: []int{4}, + maxResult: 10, + }, } for _, tc := range testCases { @@ -371,6 +441,7 @@ func TestGetAllocatedBlockMetadata(t *testing.T) { MaxVolumeSize: 1024 * 1024 * 1024 * 1024, EnableTopology: true, EnableControllerModifyVolume: true, + SnapshotMetadataBlockType: tc.blockMetadataType, } hp, err := NewHostPathDriver(cfg) @@ -403,6 +474,14 @@ func TestGetAllocatedBlockMetadata(t *testing.T) { if len(tc.expectedBlocks) != len(response) { t.Fatalf("expected %d changed blocks metadata, got: %d", tc.fileBlocks, len(response)) } + if tc.blockMetadataType == csi.BlockMetadataType_VARIABLE_LENGTH { + expCB := createBlockMetadata(int64(tc.expectedBlocks[0]), state.BlockSizeBytes) + expCB.SizeBytes = int64(tc.fileBlocks-tc.expectedBlocks[0]) * state.BlockSizeBytes + if response[0].String() != expCB.String() { + t.Fatalf("received unexpected block metadata, expected: %s\n, got %s", expCB.String(), response[0].String()) + } + return + } for i := 0; i < len(tc.expectedBlocks); i++ { expCB := createBlockMetadata(int64(tc.expectedBlocks[i]), state.BlockSizeBytes) if response[i].String() != expCB.String() { From 9821fd8e7a535b2d37bc98d18f51b1f53f639dec Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Fri, 25 Oct 2024 17:22:38 +0530 Subject: [PATCH 05/12] Remove unreachable code Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadataserver.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index fc5434858..65cc52ea1 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -79,7 +79,6 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s return nil } } - return nil } func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream csi.SnapshotMetadata_GetMetadataDeltaServer) error { @@ -150,5 +149,4 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs return nil } } - return nil } From d49ecb5bee63e57ecd65aa293806affc53c1fef1 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Wed, 6 Nov 2024 18:43:31 +0530 Subject: [PATCH 06/12] Remove TODO comments Signed-off-by: Prasad Ghangal --- cmd/hostpathplugin/main.go | 1 - pkg/hostpath/hostpath.go | 2 -- pkg/hostpath/snapshotmetadata.go | 3 +++ 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/hostpathplugin/main.go b/cmd/hostpathplugin/main.go index f1c52899d..9dcbe7238 100644 --- a/cmd/hostpathplugin/main.go +++ b/cmd/hostpathplugin/main.go @@ -57,7 +57,6 @@ func main() { flag.BoolVar(&cfg.EnableVolumeExpansion, "enable-volume-expansion", true, "Enables volume expansion feature.") flag.BoolVar(&cfg.EnableControllerModifyVolume, "enable-controller-modify-volume", false, "Enables Controller modify volume feature.") - // TODO: Remove this feature flag and enable SnapshotMetadata service by default once external-snapshot-metadata alpha is released. flag.BoolVar(&cfg.EnableSnapshotMetadata, "enable-snapshot-metadata", false, "Enables Snapshot Metadata service.") snapshotMetadataBlockType := flag.String("snapshot-metadata-block-type", "FIXED_LENGTH", "Expected Snapshot Metadata block type in response. Allowed valid types are FIXED_LENGTH or VARIABLE_LENGTH. If not specified, FIXED_LENGTH is used by default.") flag.Var(&cfg.AcceptedMutableParameterNames, "accepted-mutable-parameter-names", "Comma separated list of parameter names that can be modified on a persistent volume. This is only used when enable-controller-modify-volume is true. If unset, all parameters are mutable.") diff --git a/pkg/hostpath/hostpath.go b/pkg/hostpath/hostpath.go index af194f7ff..4abeac774 100644 --- a/pkg/hostpath/hostpath.go +++ b/pkg/hostpath/hostpath.go @@ -132,8 +132,6 @@ func NewHostPathDriver(cfg Config) (*hostPath, error) { func (hp *hostPath) Run() error { s := NewNonBlockingGRPCServer() - // hp itself implements ControllerServer, NodeServer, IdentityServer, and SnapshotMetadataServer. - // TODO: Enable SnapshotMetadata service by default once external-snapshot-metadata alpha is released. var sms csi.SnapshotMetadataServer if hp.config.EnableSnapshotMetadata { sms = hp diff --git a/pkg/hostpath/snapshotmetadata.go b/pkg/hostpath/snapshotmetadata.go index 27ad19f6c..619e83677 100644 --- a/pkg/hostpath/snapshotmetadata.go +++ b/pkg/hostpath/snapshotmetadata.go @@ -26,6 +26,9 @@ import ( "k8s.io/klog/v2" ) +// NOTE: This implementation of SnapshotMetadata service is used for demo and CI testing purpose only. +// This should not be used in production or as an example about how to write a real driver. + func (hp *hostPath) getAllocatedBlockMetadata(ctx context.Context, filePath string, startingOffset, blockSize int64, maxResult int32, allocBlocksChan chan<- []*csi.BlockMetadata) error { klog.V(4).Infof("finding allocated blocks in the file: %s", filePath) defer close(allocBlocksChan) From a06f79baf4427a0bbfb4f4c6d26b89e38d69900c Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Thu, 7 Nov 2024 19:27:45 +0530 Subject: [PATCH 07/12] Find changed blocks in sync call Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadata.go | 244 +++++++++++++------------ pkg/hostpath/snapshotmetadata_test.go | 108 +++++------ pkg/hostpath/snapshotmetadataserver.go | 153 +++++++++++----- 3 files changed, 278 insertions(+), 227 deletions(-) diff --git a/pkg/hostpath/snapshotmetadata.go b/pkg/hostpath/snapshotmetadata.go index 619e83677..ae224a491 100644 --- a/pkg/hostpath/snapshotmetadata.go +++ b/pkg/hostpath/snapshotmetadata.go @@ -29,149 +29,165 @@ import ( // NOTE: This implementation of SnapshotMetadata service is used for demo and CI testing purpose only. // This should not be used in production or as an example about how to write a real driver. -func (hp *hostPath) getAllocatedBlockMetadata(ctx context.Context, filePath string, startingOffset, blockSize int64, maxResult int32, allocBlocksChan chan<- []*csi.BlockMetadata) error { - klog.V(4).Infof("finding allocated blocks in the file: %s", filePath) - defer close(allocBlocksChan) +type fileBlockReader struct { + base *os.File + target *os.File + offset int64 + blockSize int64 + blockMetadataType csi.BlockMetadataType + maxResult int32 +} - file, err := os.Open(filePath) +func newFileBlockReader( + basePath, + targetPath string, + startingOffset int64, + blockSize int64, + blockMetadataType csi.BlockMetadataType, + maxResult int32, +) (*fileBlockReader, error) { + base, target, err := openFiles(basePath, targetPath) if err != nil { - return err - } - defer file.Close() - - if _, err := file.Seek(startingOffset, 0); err != nil { - return err + return nil, err } - return hp.compareBlocks(ctx, nil, file, startingOffset, blockSize, maxResult, allocBlocksChan) + return &fileBlockReader{ + base: base, + target: target, + offset: startingOffset, + blockSize: blockSize, + blockMetadataType: blockMetadataType, + maxResult: maxResult, + }, nil } -func (hp *hostPath) getChangedBlockMetadata(ctx context.Context, sourcePath, targetPath string, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error { - klog.V(4).Infof("finding changed blocks between two files: %s, %s", sourcePath, targetPath) - defer close(changedBlocksChan) - - source, target, err := openFiles(sourcePath, targetPath) - if err != nil { +func (cb *fileBlockReader) seekToStartingOffset() error { + if _, err := cb.target.Seek(cb.offset, io.SeekStart); err != nil { return err } - defer source.Close() - defer target.Close() - - if err := seekToOffset(source, target, startingOffset); err != nil { + if cb.base == nil { + return nil + } + if _, err := cb.base.Seek(cb.offset, io.SeekStart); err != nil { return err } + return nil +} - return hp.compareBlocks(ctx, source, target, startingOffset, blockSize, maxResult, changedBlocksChan) +func (cb *fileBlockReader) Close() error { + if cb.base != nil { + if err := cb.base.Close(); err != nil { + return err + } + } + if cb.target != nil { + if err := cb.target.Close(); err != nil { + return err + } + } + return nil } -func openFiles(sourcePath, targetPath string) (source, target *os.File, err error) { - source, err = os.Open(sourcePath) +func openFiles(basePath, targetPath string) (base, target *os.File, err error) { + target, err = os.Open(targetPath) if err != nil { + base.Close() return nil, nil, err } - - target, err = os.Open(targetPath) + if basePath == "" { + return nil, target, nil + } + base, err = os.Open(basePath) if err != nil { - source.Close() return nil, nil, err } - return source, target, nil + return base, target, nil } -func seekToOffset(source, target *os.File, startingOffset int64) error { - if _, err := source.Seek(startingOffset, 0); err != nil { - return err - } - if _, err := target.Seek(startingOffset, 0); err != nil { - return err +// getChangedBlockMetadata reads base and target files, compare block differences between them +// and returns list of changed block metadata. It reads all the blocks till it reaches EOF or size of changed block +// metadata list <= maxSize. +func (cb *fileBlockReader) getChangedBlockMetadata(ctx context.Context) ([]*csi.BlockMetadata, error) { + if cb.base == nil { + klog.V(4).Infof("finding allocated blocks by file: %s", cb.target.Name()) + } else { + klog.V(4).Infof("finding changed blocks between two files: %s, %s", cb.base.Name(), cb.target.Name()) } - return nil -} -// Compare blocks from source and target, and send changed blocks to channel. -// If source if nil, returns blocks allocated by target. -func (hp *hostPath) compareBlocks(ctx context.Context, source, target *os.File, startingOffset, blockSize int64, maxResult int32, changedBlocksChan chan<- []*csi.BlockMetadata) error { - blockIndex := startingOffset / blockSize - sBuffer := make([]byte, blockSize) - tBuffer := make([]byte, blockSize) - eofSourceFile, eofTargetFile := false, false - - for { - changedBlocks := []*csi.BlockMetadata{} - - // Read blocks and compare them. Create the list of changed blocks metadata. - // Once the number of blocks reaches to maxResult, return the result and - // compute next batch of blocks. - for int32(len(changedBlocks)) < maxResult { - select { - case <-ctx.Done(): - klog.V(4).Infof("handling cancellation signal, closing goroutine") - return nil - default: - targetReadBytes, eofTarget, err := readFileBlock(target, tBuffer, eofTargetFile) - if err != nil { - return err - } - eofTargetFile = eofTarget - - if source == nil { - // If source is nil, return blocks allocated by target file. - if eofTargetFile { - if len(changedBlocks) != 0 { - changedBlocksChan <- changedBlocks - } - return nil - } - // if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks. - blockMetadata := createBlockMetadata(blockIndex, blockSize) - if extendBlock(changedBlocks, csi.BlockMetadataType(hp.config.SnapshotMetadataBlockType), blockIndex, blockSize) { - changedBlocks[len(changedBlocks)-1].SizeBytes += blockSize - blockIndex++ - continue - } - changedBlocks = append(changedBlocks, blockMetadata) - blockIndex++ - continue - } - - sourceReadBytes, eofSource, err := readFileBlock(source, sBuffer, eofSourceFile) - if err != nil { - return err - } - eofSourceFile = eofSource - - // If both files have reached EOF, exit the loop. - if eofSourceFile && eofTargetFile { - klog.V(4).Infof("reached end of the files") - if len(changedBlocks) != 0 { - changedBlocksChan <- changedBlocks - } - return nil - } - - // Compare the two blocks and add result. - // Even if one of the file reaches to end, continue to add block metadata of other file. - if blockChanged(sBuffer[:sourceReadBytes], tBuffer[:targetReadBytes]) { - blockMetadata := createBlockMetadata(blockIndex, blockSize) - // if VARIABLE_LEGTH type is enabled, check if blocks are adjacent, - // extend the previous block if adjacent blocks found instead of adding new entry. - if extendBlock(changedBlocks, csi.BlockMetadataType(hp.config.SnapshotMetadataBlockType), blockIndex, blockSize) { - changedBlocks[len(changedBlocks)-1].SizeBytes += blockSize - blockIndex++ - continue - } - changedBlocks = append(changedBlocks, blockMetadata) - } + blockIndex := cb.offset / cb.blockSize + sBuffer := make([]byte, cb.blockSize) + tBuffer := make([]byte, cb.blockSize) + eofBaseFile, eofTargetFile := false, false + + changedBlocks := []*csi.BlockMetadata{} + + // Read blocks and compare them. Create the list of changed blocks metadata. + // Once the number of blocks reaches to maxResult, return the result and + // compute next batch of blocks. + for int32(len(changedBlocks)) < cb.maxResult { + select { + case <-ctx.Done(): + klog.V(4).Infof("handling cancellation signal, closing goroutine") + return nil, ctx.Err() + default: + } + targetReadBytes, eofTarget, err := readFileBlock(cb.target, tBuffer, eofTargetFile) + if err != nil { + return nil, err + } + eofTargetFile = eofTarget + // If base is nil, return blocks allocated by target file. + if cb.base == nil { + if eofTargetFile { + return changedBlocks, io.EOF + } + // if VARIABLE_LENGTH type is enabled, return blocks extend instead of individual blocks. + blockMetadata := createBlockMetadata(blockIndex, cb.blockSize) + if extendBlock(changedBlocks, csi.BlockMetadataType(cb.blockMetadataType), blockIndex, cb.blockSize) { + changedBlocks[len(changedBlocks)-1].SizeBytes += cb.blockSize + cb.offset += cb.blockSize blockIndex++ + continue } + changedBlocks = append(changedBlocks, blockMetadata) + cb.offset += cb.blockSize + blockIndex++ + continue + } + + baseReadBytes, eofBase, err := readFileBlock(cb.base, sBuffer, eofBaseFile) + if err != nil { + return nil, err + } + eofBaseFile = eofBase + + // If both files have reached EOF, exit the loop. + if eofBaseFile && eofTargetFile { + klog.V(4).Infof("reached end of the files") + return changedBlocks, io.EOF } - if len(changedBlocks) > 0 { - changedBlocksChan <- changedBlocks + // Compare the two blocks and add result. + // Even if one of the file reaches to end, continue to add block metadata of other file. + if blockChanged(sBuffer[:baseReadBytes], tBuffer[:targetReadBytes]) { + blockMetadata := createBlockMetadata(blockIndex, cb.blockSize) + // if VARIABLE_LEGTH type is enabled, check if blocks are adjacent, + // extend the previous block if adjacent blocks found instead of adding new entry. + if extendBlock(changedBlocks, csi.BlockMetadataType(cb.blockMetadataType), blockIndex, cb.blockSize) { + changedBlocks[len(changedBlocks)-1].SizeBytes += cb.blockSize + cb.offset += cb.blockSize + blockIndex++ + continue + } + changedBlocks = append(changedBlocks, blockMetadata) } + + cb.offset += cb.blockSize + blockIndex++ } + return changedBlocks, nil } // readFileBlock reads blocks from a file. @@ -188,8 +204,8 @@ func readFileBlock(file *os.File, buffer []byte, eof bool) (int, bool, error) { return bytesRead, err == io.EOF, nil } -func blockChanged(sourceBlock, targetBlock []byte) bool { - return !bytes.Equal(sourceBlock, targetBlock) +func blockChanged(baseBlock, targetBlock []byte) bool { + return !bytes.Equal(baseBlock, targetBlock) } func createBlockMetadata(blockIndex, blockSize int64) *csi.BlockMetadata { diff --git a/pkg/hostpath/snapshotmetadata_test.go b/pkg/hostpath/snapshotmetadata_test.go index 40e75b562..6e71d5ded 100644 --- a/pkg/hostpath/snapshotmetadata_test.go +++ b/pkg/hostpath/snapshotmetadata_test.go @@ -18,6 +18,7 @@ package hostpath import ( "context" + "io" "math" "os" "testing" @@ -36,7 +37,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { startingOffset int64 maxResult int32 expectedResponse []*csi.BlockMetadata - expectErr bool }{ { name: "success case", @@ -66,7 +66,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, { name: "success case with max result", @@ -104,7 +103,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, { name: "success case with starting offset", @@ -131,7 +129,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, { name: "sucess case empty response", @@ -140,7 +137,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { startingOffset: 9 * state.BlockSizeBytes, maxResult: 3, expectedResponse: []*csi.BlockMetadata{}, - expectErr: false, }, { name: "sucess case different sizes", @@ -175,7 +171,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, { name: "sucess case different sizes", @@ -210,7 +205,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, { name: "success case with variable block sizes", @@ -233,7 +227,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, { name: "success case with starting offset and variable length", @@ -261,7 +254,6 @@ func TestGetChangedBlockMetadata(t *testing.T) { SizeBytes: state.BlockSizeBytes, }, }, - expectErr: false, }, } @@ -282,38 +274,31 @@ func TestGetChangedBlockMetadata(t *testing.T) { modifyBlock(t, targetFile, i, []byte("changed block")) } - cfg := Config{ - StateDir: stateDir, - Endpoint: "unix://tmp/csi.sock", - DriverName: "hostpath.csi.k8s.io", - NodeID: "fakeNodeID", - MaxVolumeSize: 1024 * 1024 * 1024 * 1024, - EnableTopology: true, - EnableControllerModifyVolume: true, - SnapshotMetadataBlockType: tc.blockMetadataType, + br, err1 := newFileBlockReader(sourceFile.Name(), targetFile.Name(), tc.startingOffset, state.BlockSizeBytes, tc.blockMetadataType, tc.maxResult) + if err1 != nil { + t.Fatalf("expected no error, got: %v", err1) } - - hp, err := NewHostPathDriver(cfg) - if err != nil { - t.Fatal(err) - } - cb := make(chan []*csi.BlockMetadata, 100) - err1 := hp.getChangedBlockMetadata(context.Background(), sourceFile.Name(), targetFile.Name(), tc.startingOffset, state.BlockSizeBytes, tc.maxResult, cb) - if tc.expectErr { - if err1 == nil { - t.Fatalf("expected error, got none") - } - return - } - if err != nil { - t.Fatalf("expected no error, got: %v", err) + if err := br.seekToStartingOffset(); err != nil { + t.Fatalf("expected no error, got: %v", err1) } + response := []*csi.BlockMetadata{} responsePages := 0 - for c := range cb { - responsePages++ - response = append(response, c...) + ctx := context.Background() + for { + cb, cbErr := br.getChangedBlockMetadata(ctx) + if cbErr != nil && cbErr != io.EOF { + t.Fatalf("expected no error, got: %v", cbErr) + } + if len(cb) != 0 { + responsePages++ + response = append(response, cb...) + } + if cbErr == io.EOF { + break + } } + // Validate max result limit expPages := int(math.Ceil(float64(len(tc.expectedResponse)) / float64(tc.maxResult))) if responsePages != expPages { @@ -381,21 +366,18 @@ func TestGetAllocatedBlockMetadata(t *testing.T) { blockMetadataType csi.BlockMetadataType maxResult int32 expectedBlocks []int - expectErr bool }{ { name: "success case", fileBlocks: 5, maxResult: 100, expectedBlocks: []int{0, 1, 2, 3, 4}, - expectErr: false, }, { name: "success case with max result", fileBlocks: 10, expectedBlocks: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, maxResult: 3, - expectErr: false, }, { name: "success case with starting offset", @@ -410,7 +392,6 @@ func TestGetAllocatedBlockMetadata(t *testing.T) { blockMetadataType: csi.BlockMetadataType_VARIABLE_LENGTH, maxResult: 100, expectedBlocks: []int{0}, - expectErr: false, }, { name: "success case with starting offset and variable length", @@ -433,38 +414,31 @@ func TestGetAllocatedBlockMetadata(t *testing.T) { file := createTempFile(t, tc.fileBlocks) defer file.Close() - cfg := Config{ - StateDir: stateDir, - Endpoint: "unix://tmp/csi.sock", - DriverName: "hostpath.csi.k8s.io", - NodeID: "fakeNodeID", - MaxVolumeSize: 1024 * 1024 * 1024 * 1024, - EnableTopology: true, - EnableControllerModifyVolume: true, - SnapshotMetadataBlockType: tc.blockMetadataType, + br, err1 := newFileBlockReader("", file.Name(), tc.startingOffset, state.BlockSizeBytes, tc.blockMetadataType, tc.maxResult) + if err1 != nil { + t.Fatalf("expected no error, got: %v", err1) } - - hp, err := NewHostPathDriver(cfg) - if err != nil { - t.Fatal(err) - } - cb := make(chan []*csi.BlockMetadata, 100) - err1 := hp.getAllocatedBlockMetadata(context.Background(), file.Name(), tc.startingOffset, state.BlockSizeBytes, tc.maxResult, cb) - if tc.expectErr { - if err1 == nil { - t.Fatalf("expected error, got none") - } - return - } - if err != nil { - t.Fatalf("expected no error, got: %v", err) + if err := br.seekToStartingOffset(); err != nil { + t.Fatalf("expected no error, got: %v", err1) } + response := []*csi.BlockMetadata{} responsePages := 0 - for c := range cb { - responsePages++ - response = append(response, c...) + ctx := context.Background() + for { + cb, cbErr := br.getChangedBlockMetadata(ctx) + if cbErr != nil && cbErr != io.EOF { + t.Fatalf("expected no error, got: %v", cbErr) + } + if len(cb) != 0 { + responsePages++ + response = append(response, cb...) + } + if cbErr == io.EOF { + break + } } + // Validate max result limit expPages := int(math.Ceil(float64(len(tc.expectedBlocks)) / float64(tc.maxResult))) if responsePages != expPages { diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index 65cc52ea1..301358f17 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -17,7 +17,10 @@ limitations under the License. package hostpath import ( + "context" + "errors" "fmt" + "io" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-driver-host-path/pkg/state" @@ -37,7 +40,7 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s // Load snapshots source, err := hp.state.GetSnapshotByID(snapID) if err != nil { - return status.Error(codes.Internal, "Cannot find the snapshot") + return status.Error(codes.Internal, "cannot find the snapshot") } if !source.ReadyToUse { return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", snapID)) @@ -48,35 +51,52 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s return err } if vol.VolAccessType != state.BlockAccess { - return status.Error(codes.InvalidArgument, "Source volume does not have block mode access type") + return status.Error(codes.InvalidArgument, "source volume does not have block mode access type") } - allocatedBlocks := make(chan []*csi.BlockMetadata, 100) - go func() { - err := hp.getAllocatedBlockMetadata(ctx, hp.getSnapshotPath(snapID), req.StartingOffset, state.BlockSizeBytes, req.MaxResults, allocatedBlocks) - if err != nil { - klog.Errorf("failed to get allocated block metadata: %v", err) - } - }() + br, err := newFileBlockReader( + "", + hp.getSnapshotPath(snapID), + req.StartingOffset, + state.BlockSizeBytes, + hp.config.SnapshotMetadataBlockType, + req.MaxResults, + ) + if err != nil { + klog.Errorf("failed initialize file block reader: %v", err) + return status.Error(codes.Internal, "failed initialize file block reader") + } + defer br.Close() + if err := br.seekToStartingOffset(); err != nil { + return status.Error(codes.OutOfRange, fmt.Sprintf("failed to seek to starting offset: %v", err.Error())) + } + // Read all blocks can find allocated blocks till EOF in chunks of size == maxSize for { - select { - case cb, ok := <-allocatedBlocks: - if !ok { - klog.V(4).Info("channel closed, returning") + cb, cbErr := br.getChangedBlockMetadata(ctx) + if cbErr != nil { + if errors.Is(cbErr, context.Canceled) { + klog.V(4).Info("context canceled while getting allocated block metadata, returning") return nil } - resp := csi.GetMetadataAllocatedResponse{ - BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, - VolumeCapacityBytes: vol.VolSize, - BlockMetadata: cb, + if errors.Is(cbErr, context.DeadlineExceeded) { + klog.V(4).Info("context deadline exceeded while getting allocated block metadata, returning") + return nil } - if err := stream.Send(&resp); err != nil { - return err + if err == io.EOF { + klog.V(4).Info("reached EOF while getting allocated block metadata, returning") + // send allocated blocks found till EOF + if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, cb); err != nil { + return err + } + return nil } - case <-ctx.Done(): - klog.V(4).Info("received cancellation signal, returning") - return nil + klog.Errorf("Failed to get allocated block metadata: %v", cbErr) + return status.Error(codes.Internal, "failed to get allocated block metadata") + } + // stream response to client + if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, cb); err != nil { + return err } } } @@ -96,11 +116,11 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs // Load snapshots source, err := hp.state.GetSnapshotByID(baseSnapID) if err != nil { - return status.Error(codes.Internal, "Cannot find the source snapshot") + return status.Error(codes.Internal, "cannot find the source snapshot") } target, err := hp.state.GetSnapshotByID(targetSnapID) if err != nil { - return status.Error(codes.Internal, "Cannot find the target snapshot") + return status.Error(codes.Internal, "cannot find the target snapshot") } if !source.ReadyToUse { @@ -111,42 +131,83 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs } if source.VolID != target.VolID { - return status.Error(codes.InvalidArgument, "Snapshots don't belong to the same Volume") + return status.Error(codes.InvalidArgument, "snapshots don't belong to the same Volume") } vol, err := hp.state.GetVolumeByID(source.VolID) if err != nil { return err } if vol.VolAccessType != state.BlockAccess { - return status.Error(codes.InvalidArgument, "Source volume does not have block mode access type") + return status.Error(codes.InvalidArgument, "source volume does not have block mode access type") } - changedBlocks := make(chan []*csi.BlockMetadata, 100) - go func() { - err := hp.getChangedBlockMetadata(ctx, hp.getSnapshotPath(baseSnapID), hp.getSnapshotPath(targetSnapID), req.StartingOffset, state.BlockSizeBytes, req.MaxResults, changedBlocks) - if err != nil { - klog.Errorf("failed to get changed block metadata: %v", err) - } - }() + br, err := newFileBlockReader( + hp.getSnapshotPath(baseSnapID), + hp.getSnapshotPath(targetSnapID), + req.StartingOffset, + state.BlockSizeBytes, + hp.config.SnapshotMetadataBlockType, + req.MaxResults, + ) + if err != nil { + klog.Errorf("failed initialize file block reader: %v", err) + return status.Error(codes.Internal, "failed initialize file block reader") + } + defer br.Close() + if err := br.seekToStartingOffset(); err != nil { + return status.Error(codes.OutOfRange, fmt.Sprintf("failed to seek to starting offset: %v", err.Error())) + } + // Read all blocks can find changed blocks till EOF in chunks of size == maxSize for { - select { - case cb, ok := <-changedBlocks: - if !ok { - klog.V(4).Info("channel closed, returning") + cb, cbErr := br.getChangedBlockMetadata(ctx) + if cbErr != nil { + if errors.Is(cbErr, context.Canceled) { + klog.V(4).Info("context canceled while getting changed block metadata, returning") return nil } - resp := csi.GetMetadataDeltaResponse{ - BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, - VolumeCapacityBytes: vol.VolSize, - BlockMetadata: cb, + if errors.Is(cbErr, context.DeadlineExceeded) { + klog.V(4).Info("context deadline exceeded while getting changed block metadata, returning") + return nil } - if err := stream.Send(&resp); err != nil { - return err + if err == io.EOF { + klog.V(4).Info("reached EOF while getting changed block metadata, returning") + // send changed blocks found till EOF + if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, cb); err != nil { + return err + } + return nil } - case <-ctx.Done(): - klog.V(4).Info("received cancellation signal, returning") - return nil + klog.Errorf("failed to get changed block metadata: %v", cbErr) + return status.Error(codes.Internal, "failed to get changed block metadata") + } + // stream response to client + if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, cb); err != nil { + return err } } } + +func sendGetMetadataDeltaResponse(stream csi.SnapshotMetadata_GetMetadataDeltaServer, volSize int64, cb []*csi.BlockMetadata) error { + if len(cb) == 0 { + return nil + } + resp := csi.GetMetadataDeltaResponse{ + BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, + VolumeCapacityBytes: volSize, + BlockMetadata: cb, + } + return stream.Send(&resp) +} + +func sendGetMetadataAllocatedResponse(stream csi.SnapshotMetadata_GetMetadataAllocatedServer, volSize int64, cb []*csi.BlockMetadata) error { + if len(cb) == 0 { + return nil + } + resp := csi.GetMetadataAllocatedResponse{ + BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, + VolumeCapacityBytes: volSize, + BlockMetadata: cb, + } + return stream.Send(&resp) +} From c0ccb3c31f49e736df28708ab64cb508d9a42c02 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Thu, 7 Nov 2024 19:33:00 +0530 Subject: [PATCH 08/12] Correct gRPC error codes Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadataserver.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index 301358f17..95e24e933 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -40,7 +40,7 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s // Load snapshots source, err := hp.state.GetSnapshotByID(snapID) if err != nil { - return status.Error(codes.Internal, "cannot find the snapshot") + return status.Error(codes.NotFound, "cannot find the snapshot") } if !source.ReadyToUse { return status.Error(codes.Unavailable, fmt.Sprintf("snapshot %v is not yet ready to use", snapID)) @@ -116,11 +116,11 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs // Load snapshots source, err := hp.state.GetSnapshotByID(baseSnapID) if err != nil { - return status.Error(codes.Internal, "cannot find the source snapshot") + return status.Error(codes.NotFound, "cannot find the source snapshot") } target, err := hp.state.GetSnapshotByID(targetSnapID) if err != nil { - return status.Error(codes.Internal, "cannot find the target snapshot") + return status.Error(codes.NotFound, "cannot find the target snapshot") } if !source.ReadyToUse { From 914468770d48eaef1ef9aa3a833d469f1f873c43 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Mon, 9 Dec 2024 17:30:09 +0530 Subject: [PATCH 09/12] Correct error checks Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadataserver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index 95e24e933..b8ca1e1a8 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -83,7 +83,7 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s klog.V(4).Info("context deadline exceeded while getting allocated block metadata, returning") return nil } - if err == io.EOF { + if errors.Is(cbErr, io.EOF) { klog.V(4).Info("reached EOF while getting allocated block metadata, returning") // send allocated blocks found till EOF if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, cb); err != nil { @@ -170,7 +170,7 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs klog.V(4).Info("context deadline exceeded while getting changed block metadata, returning") return nil } - if err == io.EOF { + if errors.Is(cbErr, io.EOF) { klog.V(4).Info("reached EOF while getting changed block metadata, returning") // send changed blocks found till EOF if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, cb); err != nil { From cdcb8cd9c317967a7c081a9aeff42aaaf0581a99 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Mon, 9 Dec 2024 17:49:43 +0530 Subject: [PATCH 10/12] Correct MetadataType in the response Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadataserver.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index b8ca1e1a8..3ed5df161 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -86,7 +86,7 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s if errors.Is(cbErr, io.EOF) { klog.V(4).Info("reached EOF while getting allocated block metadata, returning") // send allocated blocks found till EOF - if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, cb); err != nil { + if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, hp.config.SnapshotMetadataBlockType, cb); err != nil { return err } return nil @@ -95,7 +95,7 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s return status.Error(codes.Internal, "failed to get allocated block metadata") } // stream response to client - if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, cb); err != nil { + if err := sendGetMetadataAllocatedResponse(stream, vol.VolSize, hp.config.SnapshotMetadataBlockType, cb); err != nil { return err } } @@ -173,7 +173,7 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs if errors.Is(cbErr, io.EOF) { klog.V(4).Info("reached EOF while getting changed block metadata, returning") // send changed blocks found till EOF - if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, cb); err != nil { + if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, hp.config.SnapshotMetadataBlockType, cb); err != nil { return err } return nil @@ -182,30 +182,40 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs return status.Error(codes.Internal, "failed to get changed block metadata") } // stream response to client - if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, cb); err != nil { + if err := sendGetMetadataDeltaResponse(stream, vol.VolSize, hp.config.SnapshotMetadataBlockType, cb); err != nil { return err } } } -func sendGetMetadataDeltaResponse(stream csi.SnapshotMetadata_GetMetadataDeltaServer, volSize int64, cb []*csi.BlockMetadata) error { +func sendGetMetadataDeltaResponse( + stream csi.SnapshotMetadata_GetMetadataDeltaServer, + volSize int64, + blockMetadataType csi.BlockMetadataType, + cb []*csi.BlockMetadata, +) error { if len(cb) == 0 { return nil } resp := csi.GetMetadataDeltaResponse{ - BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, + BlockMetadataType: blockMetadataType, VolumeCapacityBytes: volSize, BlockMetadata: cb, } return stream.Send(&resp) } -func sendGetMetadataAllocatedResponse(stream csi.SnapshotMetadata_GetMetadataAllocatedServer, volSize int64, cb []*csi.BlockMetadata) error { +func sendGetMetadataAllocatedResponse( + stream csi.SnapshotMetadata_GetMetadataAllocatedServer, + volSize int64, + blockMetadataType csi.BlockMetadataType, + cb []*csi.BlockMetadata, +) error { if len(cb) == 0 { return nil } resp := csi.GetMetadataAllocatedResponse{ - BlockMetadataType: csi.BlockMetadataType_FIXED_LENGTH, + BlockMetadataType: blockMetadataType, VolumeCapacityBytes: volSize, BlockMetadata: cb, } From 23f800dcf7d6ffcfe051bb27a896501bad5ff47a Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Mon, 16 Dec 2024 15:17:18 +0530 Subject: [PATCH 11/12] Correct error handling Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadata.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/hostpath/snapshotmetadata.go b/pkg/hostpath/snapshotmetadata.go index ae224a491..59af548d8 100644 --- a/pkg/hostpath/snapshotmetadata.go +++ b/pkg/hostpath/snapshotmetadata.go @@ -91,7 +91,6 @@ func (cb *fileBlockReader) Close() error { func openFiles(basePath, targetPath string) (base, target *os.File, err error) { target, err = os.Open(targetPath) if err != nil { - base.Close() return nil, nil, err } if basePath == "" { @@ -99,6 +98,7 @@ func openFiles(basePath, targetPath string) (base, target *os.File, err error) { } base, err = os.Open(basePath) if err != nil { + target.Close() return nil, nil, err } From 60a97682d297651581f7f9539cbd9482753173e9 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Tue, 17 Dec 2024 17:45:57 +0530 Subject: [PATCH 12/12] Set default maxResults if not set by client Signed-off-by: Prasad Ghangal --- pkg/hostpath/snapshotmetadataserver.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/hostpath/snapshotmetadataserver.go b/pkg/hostpath/snapshotmetadataserver.go index 3ed5df161..d4cb7bdb9 100644 --- a/pkg/hostpath/snapshotmetadataserver.go +++ b/pkg/hostpath/snapshotmetadataserver.go @@ -29,6 +29,8 @@ import ( "k8s.io/klog/v2" ) +const defaultMaxResults = 256 + func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, stream csi.SnapshotMetadata_GetMetadataAllocatedServer) error { ctx := stream.Context() // Check arguments @@ -54,13 +56,18 @@ func (hp *hostPath) GetMetadataAllocated(req *csi.GetMetadataAllocatedRequest, s return status.Error(codes.InvalidArgument, "source volume does not have block mode access type") } + maxResults := req.MaxResults + if maxResults == 0 { + maxResults = defaultMaxResults + } + br, err := newFileBlockReader( "", hp.getSnapshotPath(snapID), req.StartingOffset, state.BlockSizeBytes, hp.config.SnapshotMetadataBlockType, - req.MaxResults, + maxResults, ) if err != nil { klog.Errorf("failed initialize file block reader: %v", err) @@ -141,13 +148,18 @@ func (hp *hostPath) GetMetadataDelta(req *csi.GetMetadataDeltaRequest, stream cs return status.Error(codes.InvalidArgument, "source volume does not have block mode access type") } + maxResults := req.MaxResults + if maxResults == 0 { + maxResults = defaultMaxResults + } + br, err := newFileBlockReader( hp.getSnapshotPath(baseSnapID), hp.getSnapshotPath(targetSnapID), req.StartingOffset, state.BlockSizeBytes, hp.config.SnapshotMetadataBlockType, - req.MaxResults, + maxResults, ) if err != nil { klog.Errorf("failed initialize file block reader: %v", err)