Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flexible File Filtering Feature for Blobfuse Mounting #1435

Open
wants to merge 32 commits into
base: feature/filter
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b88dc62
Implemented 1st step that is parsing of filter string given by user
t-yashmahale May 29, 2024
8d66bdb
added a dummy threadpool where 16 threads are accessing files parallely
t-yashmahale May 29, 2024
b88e380
added <= and >= features in size filter
t-yashmahale May 29, 2024
a3bd1d8
Added all functions for checking the file against filters
t-yashmahale May 30, 2024
05e6869
solved bug of deadlock if a file does not returns true for any compar…
t-yashmahale May 30, 2024
5107413
created modules for code and added meaningfull names
t-yashmahale May 31, 2024
f4b9ccf
added comments for complex functions
t-yashmahale May 31, 2024
99d3458
added regex filter
t-yashmahale Jun 3, 2024
9f54faa
added modification time filter (currently supporting ust only)
t-yashmahale Jun 3, 2024
c817cba
made some changes to make code more efficient
t-yashmahale Jun 4, 2024
448e473
increased modularity
t-yashmahale Jun 5, 2024
02c70d5
more optimised prev commit
t-yashmahale Jun 5, 2024
7af33a5
included reading from output channel
t-yashmahale Jun 5, 2024
1f290e4
passed all variables by refrence and other optimisations
t-yashmahale Jun 6, 2024
df32e9e
started integration ,added input flag blobFilter
t-yashmahale Jun 9, 2024
024a9e3
updated filter.go
t-yashmahale Jun 9, 2024
aec9db8
integrated filters with blobfuse
t-yashmahale Jun 10, 2024
e19c97a
filter integrated with prints
t-yashmahale Jun 10, 2024
27eca4f
minor changes
t-yashmahale Jun 11, 2024
057f84e
_
t-yashmahale Jun 11, 2024
c48ef28
invalid input filter handeling done
t-yashmahale Jun 12, 2024
861a4a0
GetAttr done and error handling in case of wrong filter
t-yashmahale Jun 12, 2024
0555121
added comments
t-yashmahale Jun 13, 2024
733bad0
made some more changes to clean code
t-yashmahale Jun 13, 2024
40dc026
made changes
t-yashmahale Jun 13, 2024
8d5cdb0
solved bug of infinite waiting when a list containing no file is pass…
t-yashmahale Jun 13, 2024
2532fa0
resolved issue of infinite waiting in recieveOutput function due to a…
t-yashmahale Jun 14, 2024
08c01f1
added access tier filter
t-yashmahale Jun 18, 2024
cd8e469
added directory functionality
t-yashmahale Jun 18, 2024
7f98d4a
added blobtag filter and some other changes
t-yashmahale Jun 19, 2024
61450e3
Enable read-only mode when user specifies filters
t-yashmahale Jun 24, 2024
a5228d4
changed name of flag to blob-filter
t-yashmahale Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ test/manual_scripts/cachetest.go
lint.log
azure-storage-fuse
bfusemon
test/scripts/dirIterate.go
test/scripts/dirIterate.go
13 changes: 13 additions & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"

"github.com/sevlyar/go-daemon"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -88,6 +89,7 @@ type mountOptions struct {
MonitorOpt monitorOptions `config:"health_monitor"`
WaitForMount time.Duration `config:"wait-for-mount"`
LazyWrite bool `config:"lazy-write"`
blobFilter string `config:"blob-filter"`

// v1 support
Streaming bool `config:"streaming"`
Expand Down Expand Up @@ -242,6 +244,13 @@ var mountCmd = &cobra.Command{

configFileExists := true

if config.IsSet("blob-filter") {
if len(options.blobFilter) > 0 {
filter.ProvidedFilter = options.blobFilter
config.Set("read-only", "true") //set read-only mode if filter is provided
}
}

if options.ConfigFile == "" {
// Config file is not set in cli parameters
// Blobfuse2 defaults to config.yaml in current directory
Expand Down Expand Up @@ -707,6 +716,10 @@ func init() {
config.BindPFlag("pre-mount-validate", mountCmd.Flags().Lookup("pre-mount-validate"))
mountCmd.Flags().Lookup("pre-mount-validate").Hidden = true

//accessing blobFilter
mountCmd.PersistentFlags().StringVar(&options.blobFilter, "blob-filter", "", "Filter string for blob filtering.")
config.BindPFlag("blob-filter", mountCmd.PersistentFlags().Lookup("blob-filter"))

mountCmd.Flags().Bool("basic-remount-check", true, "Validate blobfuse2 is mounted by reading /etc/mtab.")
config.BindPFlag("basic-remount-check", mountCmd.Flags().Lookup("basic-remount-check"))
mountCmd.Flags().Lookup("basic-remount-check").Hidden = true
Expand Down
27 changes: 25 additions & 2 deletions component/azstorage/azstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ package azstorage

import (
"context"
"errors"
"fmt"
"sync/atomic"
"syscall"
Expand All @@ -45,9 +46,9 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"
"github.com/Azure/azure-storage-fuse/v2/internal/handlemap"
"github.com/Azure/azure-storage-fuse/v2/internal/stats_manager"

"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -298,6 +299,7 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O
path := formatListDirName(options.Name)

new_list, new_marker, err := az.storage.List(path, &options.Token, options.Count)

if err != nil {
log.Err("AzStorage::StreamDir : Failed to read dir [%s]", err)
return new_list, "", err
Expand Down Expand Up @@ -331,6 +333,11 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O
// increment streamdir call count
azStatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1))

//check for filters provided
if az.stConfig.filters != nil { //only apply if user has given filter
filtered_list := az.stConfig.filters.ApplyFilterOnBlobs(new_list)
return filtered_list, *new_marker, nil
}
return new_list, *new_marker, nil
}

Expand Down Expand Up @@ -521,7 +528,23 @@ func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error)
// Attribute operations
func (az *AzStorage) GetAttr(options internal.GetAttrOptions) (attr *internal.ObjAttr, err error) {
//log.Trace("AzStorage::GetAttr : Get attributes of file %s", name)
return az.storage.GetAttr(options.Name)
// return az.storage.GetAttr(options.Name)
resp, err := az.storage.GetAttr(options.Name)
if err != nil {
return resp, err
}

if az.stConfig.filters != nil {
fileValidatorObj := &filter.FileValidator{
FilterArr: az.stConfig.filters.FilterArr,
}
if fileValidatorObj.CheckFileWithFilters(resp) { //if this particular file passes all filters, return it
return resp, nil
} else {
return nil, errors.New("the file does not pass the provided filters")
}
}
return resp, err
}

func (az *AzStorage) Chmod(options internal.ChmodOptions) error {
Expand Down
13 changes: 13 additions & 0 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (bb *BlockBlob) Configure(cfg AzStorageConfig) error {
Snapshots: false,
}

//if filter is provided, and blobtag filter is also present then we need the details about blobtags
if bb.AzStorageConnection.Config.filters != nil && bb.AzStorageConnection.Config.filters.TagChk {
bb.listDetails.Tags = true
}
return nil
}

Expand Down Expand Up @@ -446,6 +450,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err
Crtime: *prop.CreationTime,
Flags: internal.NewFileBitMap(),
MD5: prop.ContentMD5,
Tier: *prop.AccessTier,
}

parseMetadata(attr, prop.Metadata)
Expand Down Expand Up @@ -593,11 +598,15 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
Crtime: dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified),
Flags: internal.NewFileBitMap(),
MD5: blobInfo.Properties.ContentMD5,
Tier: string(*blobInfo.Properties.AccessTier),
}
parseMetadata(attr, blobInfo.Metadata)
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
}
if bb.listDetails.Tags { //if we need blobtags
attr.Tags = parseBlobTags(blobInfo.BlobTags)
}
blobList = append(blobList, attr)

if attr.IsDir() {
Expand Down Expand Up @@ -636,6 +645,10 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern
attr.Ctime = attr.Mtime
attr.Flags.Set(internal.PropFlagMetadataRetrieved)
attr.Flags.Set(internal.PropFlagModeDefault)
attr.Tier = ""
if bb.Config.defaultTier != nil { //if any defualt value of access tier is provided ,set it
attr.Tier = string(*bb.Config.defaultTier)
}
blobList = append(blobList, attr)
}
}
Expand Down
14 changes: 14 additions & 0 deletions component/azstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-storage-fuse/v2/common/config"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"

"github.com/JeffreyRichter/enum/enum"
)
Expand Down Expand Up @@ -186,6 +187,7 @@ type AzStorageOptions struct {
CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"`
CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"`
CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"`
// BlobFilter string `config:"blobFilter" yaml:"blobFilter"`

// v1 support
UseAdls bool `config:"use-adls" yaml:"-"`
Expand Down Expand Up @@ -386,6 +388,18 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error {

az.stConfig.telemetry = opt.Telemetry

//if blobFilter is provided, parse string and setup filters
if len(filter.ProvidedFilter) > 0 {
log.Info("ParseAndValidateConfig : provided filter is %s", filter.ProvidedFilter)
az.stConfig.filters = &filter.UserInputFilters{}
erro := az.stConfig.filters.ParseInp(&filter.ProvidedFilter)
log.Info("ParseAndValidateConfig : number of OR seperated filters are %d", len(az.stConfig.filters.FilterArr))
if erro != nil {
log.Err("ParseAndValidateConfig : mount failed due to an error encountered while parsing")
return erro
}
}

httpProxyProvided := opt.HttpProxyAddress != ""
httpsProxyProvided := opt.HttpsProxyAddress != ""

Expand Down
4 changes: 4 additions & 0 deletions component/azstorage/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal"
"github.com/Azure/azure-storage-fuse/v2/internal/filter"
)

// Example for azblob usage : https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob#pkg-examples
Expand Down Expand Up @@ -81,6 +82,9 @@ type AzStorageConfig struct {
cpkEnabled bool
cpkEncryptionKey string
cpkEncryptionKeySha256 string

// Filter related config
filters *filter.UserInputFilters
}

type AzStorageConnection struct {
Expand Down
4 changes: 3 additions & 1 deletion component/azstorage/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func (dl *Datalake) GetAttr(name string) (attr *internal.ObjAttr, err error) {
Ctime: *prop.LastModified,
Crtime: *prop.LastModified,
Flags: internal.NewFileBitMap(),
Tier: *prop.AccessTier, //set up tier
}
parseMetadata(attr, prop.Metadata)

Expand Down Expand Up @@ -471,7 +472,8 @@ func (dl *Datalake) List(prefix string, marker *string, count int32) ([]*interna
for _, pathInfo := range listPath.Paths {
var attr *internal.ObjAttr
var lastModifiedTime time.Time
if dl.Config.disableSymlink {
//if tier filter is provided by user then we need to set it up in the else statement
if (dl.Config.disableSymlink && dl.Config.filters == nil) || (dl.Config.disableSymlink && !dl.Config.filters.TierChk) {
var mode fs.FileMode
if pathInfo.Permissions != nil {
mode, err = getFileMode(*pathInfo.Permissions)
Expand Down
19 changes: 19 additions & 0 deletions component/azstorage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
serviceBfs "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
"github.com/Azure/azure-storage-fuse/v2/common"
Expand Down Expand Up @@ -304,6 +305,24 @@ func parseMetadata(attr *internal.ObjAttr, metadata map[string]*string) {
}
}

// ----------- BlobTags handling ---------------
func parseBlobTags(tags *container.BlobTags) map[string]string {
blobtags := make(map[string]string) //pushing blobtags in map for fast execution during filtering
if tags != nil {
for _, tag := range tags.BlobTagSet {
if tag != nil {
if tag.Key != nil {
blobtags[*tag.Key] = ""
}
if tag.Value != nil {
blobtags[*tag.Key] = *tag.Value
}
}
}
}
return blobtags
}

// ----------- Content-type handling ---------------

// ContentTypeMap : Store file extension to content-type mapping
Expand Down
2 changes: 2 additions & 0 deletions internal/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type ObjAttr struct {
Name string // base name of the path
MD5 []byte
Metadata map[string]*string // extra information to preserve
Tier string //access tier of blob
Tags map[string]string //blobtags (key value pair)
}

// IsDir : Test blob is a directory or not
Expand Down
47 changes: 47 additions & 0 deletions internal/filter/accesstier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package filter

import (
"errors"
"strings"

"github.com/Azure/azure-storage-fuse/v2/internal"
)

const lenTier = len(tier)

type AccessTierFilter struct {
opr bool // true means equal to , false means not equal to
tier string
}

func (filter AccessTierFilter) Apply(fileInfo *internal.ObjAttr) bool {
// fmt.Println("AccessTier filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT
return (filter.opr == (filter.tier == strings.ToLower(fileInfo.Tier))) //if both are same then return true
}

// used for dynamic creation of AccessTierFilter
func newAccessTierFilter(args ...interface{}) Filter {
return AccessTierFilter{
opr: args[0].(bool),
tier: args[1].(string),
}
}

func giveAccessTierFilterObj(singleFilter *string) (Filter, error) {
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase
sinChk := (*singleFilter)[lenTier : lenTier+1] //single char after tier (ex- tier=hot , here sinChk will be "=")
doubChk := (*singleFilter)[lenTier : lenTier+2] //2 chars after tier (ex- tier != cold , here doubChk will be "!=")
erro := errors.New("invalid accesstier filter, no files passed")
if !((sinChk == "=") || (doubChk == "!=")) {
return nil, erro
}
if (doubChk == "!=") && (len(*singleFilter) > lenTier+2) {
value := (*singleFilter)[lenTier+2:] // len(tier) + 2 = 4 and + 2
return newAccessTierFilter(false, value), nil
} else if (sinChk == "=") && (len(*singleFilter) > lenTier+1) {
value := (*singleFilter)[lenTier+1:] // len(tier) + 1 = 4 and + 1
return newAccessTierFilter(true, value), nil
} else {
return nil, erro
}
}
53 changes: 53 additions & 0 deletions internal/filter/blobtag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package filter

import (
"errors"
"strings"

"github.com/Azure/azure-storage-fuse/v2/internal"
)

const lenTag = len(tag)

type BlobTagFilter struct {
key string
value string
}

func (filter BlobTagFilter) Apply(fileInfo *internal.ObjAttr) bool {
// fmt.Println("BlobTag filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT
if val, ok := fileInfo.Tags[filter.key]; ok {
return (filter.value == strings.ToLower(val))
}
return false
}

// used for dynamic creation of BlobTagFilter
func newBlobTagFilter(args ...interface{}) Filter {
return BlobTagFilter{
key: args[0].(string),
value: args[1].(string),
}
}

func giveBlobTagFilterObj(singleFilter *string) (Filter, error) {
(*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase
sinChk := (*singleFilter)[lenTag : lenTag+1] //single char after tag (ex- tag=hot:yes , here sinChk will be "=")
erro := errors.New("invalid blobtag filter, no files passed")
if !(sinChk == "=") {
return nil, erro
}
splitEq := strings.Split(*singleFilter, "=")
if len(splitEq) == 2 {
splitCol := strings.Split(splitEq[1], ":")
if len(splitCol) == 2 {
tagKey := splitCol[0]
tagVal := splitCol[1]
return newBlobTagFilter(tagKey, tagVal), nil
} else {
return nil, erro
}
} else {
return nil, erro
}
}
Loading