diff --git a/cmd/config.go b/cmd/config.go index 9ff45e9a8291..e9db5005660b 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -75,6 +75,10 @@ $ juicefs config redis://localhost --min-client-version 1.0.0 --max-client-versi Name: "session-token", Usage: "session token for object storage", }, + &cli.StringFlag{ + Name: "storage-class", + Usage: "the default storage class for data written in future", + }, &cli.BoolFlag{ Name: "encrypt-secret", Usage: "encrypt the secret key if it was previously stored in plain format", @@ -187,6 +191,12 @@ func config(ctx *cli.Context) error { } format.SessionToken = ctx.String(flag) storage = true + case "storage-class": // always update + if new := ctx.String(flag); new != format.StorageClass { + msg.WriteString(fmt.Sprintf("%10s: %s -> %s\n", flag, format.StorageClass, new)) + format.StorageClass = new + storage = true + } case "trash-days": if new := ctx.Int(flag); new != format.TrashDays { if new < 0 { diff --git a/cmd/flags.go b/cmd/flags.go index 2550f7c70cf3..b5fa041bed70 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -84,6 +84,10 @@ func clientFlags() []cli.Flag { Name: "bucket", Usage: "customized endpoint to access object store", }, + &cli.StringFlag{ + Name: "storage-class", + Usage: "the storage class for data written by current client", + }, &cli.IntFlag{ Name: "get-timeout", Value: 60, diff --git a/cmd/format.go b/cmd/format.go index 6a5b89f54bcc..ab79896636ec 100644 --- a/cmd/format.go +++ b/cmd/format.go @@ -141,6 +141,10 @@ Details: https://juicefs.com/docs/community/quick_start_guide`, Name: "session-token", Usage: "session token for object storage", }, + &cli.StringFlag{ + Name: "storage-class", + Usage: "the default storage class", + }, &cli.StringFlag{ Name: "encrypt-rsa-key", Usage: "a path to RSA private key (PEM)", @@ -219,7 +223,11 @@ func createStorage(format meta.Format) (object.ObjectStorage, error) { return nil, err } blob = object.WithPrefix(blob, format.Name+"/") - + if format.StorageClass != "" { + if os, ok := blob.(object.SupportStorageClass); ok { + os.SetStorageClass(format.StorageClass) + } + } if format.EncryptKey != "" { passphrase := os.Getenv("JFS_RSA_PASSPHRASE") if passphrase == "" { @@ -392,6 +400,7 @@ func format(c *cli.Context) error { Name: name, UUID: uuid.New().String(), Storage: c.String("storage"), + StorageClass: c.String("storage-class"), Bucket: c.String("bucket"), AccessKey: c.String("access-key"), SecretKey: c.String("secret-key"), diff --git a/cmd/mount.go b/cmd/mount.go index d4c7eba93018..f1981d9f77e7 100644 --- a/cmd/mount.go +++ b/cmd/mount.go @@ -184,6 +184,9 @@ func updateFormat(c *cli.Context) func(*meta.Format) { if c.IsSet("storage") { format.Storage = c.String("storage") } + if c.IsSet("storage-class") { + format.StorageClass = c.String("storage-class") + } } } @@ -434,8 +437,8 @@ func NewReloadableStorage(format *meta.Format, cli meta.Meta, patch func(*meta.F patch(new) } old := &holder.fmt - if new.Storage != old.Storage || new.Bucket != old.Bucket || new.AccessKey != old.AccessKey || new.SecretKey != old.SecretKey || new.SessionToken != old.SessionToken { - logger.Infof("found new configuration: storage=%s bucket=%s ak=%s", new.Storage, new.Bucket, new.AccessKey) + if new.Storage != old.Storage || new.Bucket != old.Bucket || new.AccessKey != old.AccessKey || new.SecretKey != old.SecretKey || new.SessionToken != old.SessionToken || new.StorageClass != old.StorageClass { + logger.Infof("found new configuration: storage=%s bucket=%s ak=%s storageClass=%s", new.Storage, new.Bucket, 
new.AccessKey, new.StorageClass) newBlob, err := createStorage(*new) if err != nil { diff --git a/cmd/object.go b/cmd/object.go index 54143a6e66a6..1f048755beaa 100644 --- a/cmd/object.go +++ b/cmd/object.go @@ -181,12 +181,13 @@ func (o *jObj) Size() int64 { } return o.fi.Size() } -func (o *jObj) Mtime() time.Time { return o.fi.ModTime() } -func (o *jObj) IsDir() bool { return o.fi.IsDir() } -func (o *jObj) IsSymlink() bool { return o.fi.IsSymlink() } -func (o *jObj) Owner() string { return utils.UserName(o.fi.Uid()) } -func (o *jObj) Group() string { return utils.GroupName(o.fi.Gid()) } -func (o *jObj) Mode() os.FileMode { return o.fi.Mode() } +func (o *jObj) Mtime() time.Time { return o.fi.ModTime() } +func (o *jObj) IsDir() bool { return o.fi.IsDir() } +func (o *jObj) IsSymlink() bool { return o.fi.IsSymlink() } +func (o *jObj) Owner() string { return utils.UserName(o.fi.Uid()) } +func (o *jObj) Group() string { return utils.GroupName(o.fi.Gid()) } +func (o *jObj) Mode() os.FileMode { return o.fi.Mode() } +func (o *jObj) StorageClass() string { return "" } func (j *juiceFS) Head(key string) (object.Object, error) { fi, eno := j.jfs.Stat(ctx, j.path(key)) diff --git a/cmd/sync.go b/cmd/sync.go index 6a5ad6389c2e..5a3981c14452 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -73,6 +73,10 @@ $ juicefs sync --include='a1/b1' --exclude='a*' --include='b2' --exclude='b?' s3 Details: https://juicefs.com/docs/community/administration/sync Supported storage systems: https://juicefs.com/docs/community/how_to_setup_object_storage#supported-object-storage`, Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "storage-class", + Usage: "the storage class for destination", + }, &cli.StringFlag{ Name: "start", Aliases: []string{"s"}, @@ -349,5 +353,8 @@ func doSync(c *cli.Context) error { if err != nil { return err } + if os, ok := dst.(object.SupportStorageClass); ok { + os.SetStorageClass(config.StorageClass) + } return sync.Sync(src, dst, config) } diff --git a/go.mod b/go.mod index 77f69c112d08..01ffb8504779 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( cloud.google.com/go/compute v1.5.0 cloud.google.com/go/storage v1.21.0 github.com/Arvintian/scs-go-sdk v1.1.0 - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 github.com/DataDog/zstd v1.5.0 github.com/IBM/ibm-cos-sdk-go v1.8.0 github.com/agiledragon/gomonkey/v2 v2.6.0 @@ -80,8 +80,8 @@ require ( cloud.google.com/go v0.100.2 // indirect cloud.google.com/go/iam v0.1.1 // indirect git.apache.org/thrift.git v0.13.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 // indirect github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/VividCortex/ewma v1.2.0 // indirect @@ -206,7 +206,6 @@ require ( github.com/smartystreets/assertions v1.2.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stathat/consistent v1.0.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tidwall/gjson v1.9.3 // indirect github.com/tidwall/match v1.1.1 // indirect diff --git a/go.sum b/go.sum index 9d2d18b1a6cd..8c5df8698f51 100644 --- a/go.sum +++ b/go.sum @@ -67,13 +67,15 @@ gitea.com/xorm/sqlfiddle 
v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0p github.com/Arvintian/scs-go-sdk v1.1.0 h1:vqVOfoMD6XSr7eG1a2M9oSiQwhDZYKKdH2rrZRPx6So= github.com/Arvintian/scs-go-sdk v1.1.0/go.mod h1:DMIkwn27iuTIo9o7INj3L/bcA7bW6QwljWC3ZpxjkXw= github.com/Azure/azure-sdk-for-go v32.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v32.6.0+incompatible h1:PgaVceWF5idtJajyt1rzq1cep6eRPJ8+8hs4GnNzTo0= github.com/Azure/azure-sdk-for-go v32.6.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 h1:qoVeMsc9/fh/yhxVaA0obYjVH/oI/ihrOoMwsLS9KSA= -github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= -github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 h1:E+m3SkZCN0Bf5q7YdTs5lSm2CYY3CK4spn5OmUIiQtk= -github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 h1:Px2UA+2RvSSvv+RvJNuUB6n7rs5Wsel4dXLe90Um2n4= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0/go.mod h1:tPaiy8S5bQ+S5sOiDlINkp7+Ef339+Nz5L5XO+cnOHo= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.0 h1:D7l5jspkc4kwBYRWoZE4DQnu6LVpLwDsMZjBKS4wZLQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.0/go.mod h1:w5pDIZuawUmY3Bj4tVx3Xb8KS96ToB0j315w9rqpAg0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0 h1:NVS/4LOQfkBpk+B1VopIzv1ptmYeEskA8w/3K/w7vjo= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 h1:Px2KVERcYEg2Lv25AqC2hVr0xUWaq94wuEObLIkYzmA= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2/go.mod h1:CdSJQNNzZhCkwDaV27XV1w48ZBPtxe7mlrZAsPNxD5g= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 h1:0nJeKDmB7a1a8RDMjTltahlPsaNlWjq/LpkZleSwINk= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0/go.mod h1:mbwxKc/fW+IkF0GG591MuXw0KuEQBDkeRoZ9vmVJPxg= github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.1.0/go.mod h1:AKyIcETwSUFxIcs/Wnq/C+kwCtlEYGUVd7FPNb2slmg= github.com/Azure/go-autorest/autorest v0.5.0/go.mod h1:9HLKlQjVBH6U3oDfsXOeVc56THsLPw1L03yban4xThw= @@ -100,6 +102,7 @@ github.com/Azure/go-autorest/tracing v0.1.0/go.mod h1:ROEEAFwXycQw7Sn3DXNtEedEvd github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c h1:/IBSNwUN8+eKzUzbJPqhK839ygXJ82sde8x3ogr6R28= github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -259,9 +262,8 @@ github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQ github.com/djherbis/atime v1.0.0 h1:ySLvBAM0EvOGaX7TI4dAM5lWj+RdJUCKtGSEHN8SGBg= github.com/djherbis/atime v1.0.0/go.mod h1:5W+KBIuTwVGcqjIfaTwt+KSYX1o6uep8dtevevQP/f8= github.com/dnaeon/go-vcr v0.0.0-20180814043457-aafff18a5cc2/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= +github.com/dnaeon/go-vcr v1.1.0 
h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= -github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= -github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dnsimple/dnsimple-go v0.30.0/go.mod h1:O5TJ0/U6r7AfT8niYNlmohpLbCSG+c71tQlGr9SeGrg= github.com/dnstap/golang-dnstap v0.0.0-20170829151710-2cf77a2b5e11/go.mod h1:s1PfVYYVmTMgCSPtho4LKBDecEHJWtiVDPNv78Z985U= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= @@ -645,6 +647,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/ks3sdklib/aws-sdk-go v1.2.0 h1:Hhe7Ku2gs/TykWy4hoSVTqlLu2p+AApeatKHysgRgVM= github.com/ks3sdklib/aws-sdk-go v1.2.0/go.mod h1:DVzr6V4XzDjdy+H+1ptuIDIy1MQgI+28SvUpOkJXJD8= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/labbsr0x/bindman-dns-webhook v1.0.2/go.mod h1:p6b+VCXIR8NYKpDr8/dg1HKfQoRHCdcsROXKvmoehKA= github.com/labbsr0x/goh v1.0.1/go.mod h1:8K2UhVoaWXcCU7Lxoa2omWnC8gyW8px7/lmO61c027w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= @@ -793,6 +796,7 @@ github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172 h1:FYgKV9znRQmzVrr github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY= github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -920,9 +924,8 @@ github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4Cc github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -1157,8 +1160,6 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net 
v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/pkg/meta/config.go b/pkg/meta/config.go index f1ddfbe13d28..dc70732992cd 100644 --- a/pkg/meta/config.go +++ b/pkg/meta/config.go @@ -68,6 +68,7 @@ type Format struct { Name string UUID string Storage string + StorageClass string `json:",omitempty"` Bucket string AccessKey string `json:",omitempty"` SecretKey string `json:",omitempty"` diff --git a/pkg/object/azure.go b/pkg/object/azure.go index 988fff4b2dca..9263f1b27c3c 100644 --- a/pkg/object/azure.go +++ b/pkg/object/azure.go @@ -33,6 +33,7 @@ import ( type wasb struct { DefaultObjectStorage container *azblob.ContainerClient + sc string cName string marker string } @@ -50,7 +51,11 @@ func (b *wasb) Create() error { } func (b *wasb) Head(key string) (Object, error) { - properties, err := b.container.NewBlobClient(key).GetProperties(ctx, &azblob.GetBlobPropertiesOptions{}) + client, err := b.container.NewBlobClient(key) + if err != nil { + return nil, err + } + properties, err := client.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{}) if err != nil { if strings.Contains(err.Error(), string(azblob.StorageErrorCodeBlobNotFound)) { err = os.ErrNotExist @@ -63,30 +68,59 @@ func (b *wasb) Head(key string) (Object, error) { *properties.ContentLength, *properties.LastModified, strings.HasSuffix(key, "/"), + *properties.AccessTier, }, nil } func (b *wasb) Get(key string, off, limit int64) (io.ReadCloser, error) { - download, err := b.container.NewBlockBlobClient(key).Download(ctx, &azblob.DownloadBlobOptions{Offset: &off, Count: &limit}) + client, err := b.container.NewBlockBlobClient(key) if err != nil { return nil, err } - return download.BlobDownloadResponse.RawResponse.Body, err + download, err := client.Download(ctx, &azblob.BlobDownloadOptions{Offset: &off, Count: &limit}) + if err != nil { + return nil, err + } + return download.RawResponse.Body, err } func (b *wasb) Put(key string, data io.Reader) error { - _, err := b.container.NewBlockBlobClient(key).UploadStreamToBlockBlob(ctx, data, azblob.UploadStreamToBlockBlobOptions{}) + client, err := b.container.NewBlockBlobClient(key) + if err != nil { + return err + } + options := azblob.UploadStreamOptions{} + if b.sc != "" { + options.AccessTier = azblob.AccessTier(b.sc).ToPtr() + } + _, err = client.UploadStream(ctx, data, options) return err } func (b *wasb) Copy(dst, src string) error { - _, err := b.container.NewBlockBlobClient(dst).CopyFromURL(ctx, b.container.NewBlockBlobClient(src).URL(), - &azblob.CopyBlockBlobFromURLOptions{}) + dstCli, err := b.container.NewBlockBlobClient(dst) + if err != nil { + return err + } + srcCli, err := b.container.NewBlockBlobClient(src) + if err != nil { + return err + } + options := &azblob.BlockBlobCopyFromURLOptions{} + if b.sc != "" { + options.Tier = azblob.AccessTier(b.sc).ToPtr() + } + _, err = dstCli.CopyFromURL(ctx, srcCli.URL(), + options) return err } func (b *wasb) Delete(key string) error { - _, err := 
b.container.NewBlockBlobClient(key).Delete(ctx, &azblob.DeleteBlobOptions{}) + client, err := b.container.NewBlockBlobClient(key) + if err != nil { + return err + } + _, err = client.Delete(ctx, &azblob.BlobDeleteOptions{}) if err != nil && strings.Contains(err.Error(), string(azblob.StorageErrorCodeBlobNotFound)) { err = nil } @@ -107,7 +141,7 @@ func (b *wasb) List(prefix, marker, delimiter string, limit int64) ([]Object, er } limit32 := int32(limit) - pager := b.container.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{Prefix: &prefix, Marker: &marker, Maxresults: &(limit32)}) + pager := b.container.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{Prefix: &prefix, Marker: &marker, MaxResults: &(limit32)}) if pager.Err() != nil { return nil, pager.Err() } @@ -129,11 +163,16 @@ func (b *wasb) List(prefix, marker, delimiter string, limit int64) ([]Object, er *blob.Properties.ContentLength, *mtime, strings.HasSuffix(*blob.Name, "/"), + string(*blob.Properties.AccessTier), } } return objs, nil } +func (b *wasb) SetStorageClass(sc string) { + b.sc = sc +} + func autoWasbEndpoint(containerName, accountName, scheme string, credential *azblob.SharedKeyCredential) (string, error) { baseURLs := []string{"blob.core.windows.net", "blob.core.chinacloudapi.cn"} endpoint := "" @@ -172,11 +211,11 @@ func newWasb(endpoint, accountName, accountKey, token string) (ObjectStorage, er containerName := hostParts[0] // Connection string support: DefaultEndpointsProtocol=[http|https];AccountName=***;AccountKey=***;EndpointSuffix=[core.windows.net|core.chinacloudapi.cn] if connString := os.Getenv("AZURE_STORAGE_CONNECTION_STRING"); connString != "" { - var client azblob.ContainerClient + var client *azblob.ContainerClient if client, err = azblob.NewContainerClientFromConnectionString(connString, containerName, nil); err != nil { return nil, err } - return &wasb{container: &client, cName: containerName}, nil + return &wasb{container: client, cName: containerName}, nil } credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) @@ -198,7 +237,7 @@ func newWasb(endpoint, accountName, accountKey, token string) (ObjectStorage, er return nil, err } - return &wasb{container: &client, cName: containerName}, nil + return &wasb{container: client, cName: containerName}, nil } func init() { diff --git a/pkg/object/b2.go b/pkg/object/b2.go index df607cec845e..c02abf0429ce 100644 --- a/pkg/object/b2.go +++ b/pkg/object/b2.go @@ -78,6 +78,7 @@ func (c *b2client) Head(key string) (Object, error) { f.ContentLength, time.Unix(f.UploadTimestamp/1000, 0), strings.HasSuffix(f.Name, "/"), + "", }, nil } @@ -142,6 +143,7 @@ func (c *b2client) List(prefix, marker, delimiter string, limit int64) ([]Object f.ContentLength, time.Unix(f.UploadTimestamp/1000, 0), strings.HasSuffix(f.Name, "/"), + "", } } c.nextMarker = resp.NextFileName diff --git a/pkg/object/bos.go b/pkg/object/bos.go index f4225d61531e..51a7a8927164 100644 --- a/pkg/object/bos.go +++ b/pkg/object/bos.go @@ -39,6 +39,7 @@ const bosDefaultRegion = "bj" type bosclient struct { DefaultObjectStorage bucket string + sc string c *bos.Client } @@ -58,6 +59,11 @@ func (q *bosclient) Limits() Limits { func (q *bosclient) Create() error { _, err := q.c.PutBucket(q.bucket) + if err == nil && q.sc != "" { + if err := q.c.PutBucketStorageclass(q.bucket, q.sc); err != nil { + logger.Warnf("failed to set storage class: %v", err) + } + } if err != nil && isExists(err) { err = nil } @@ -78,6 +84,7 @@ func (q *bosclient) Head(key string) (Object, error) { 
r.ContentLength, mtime, strings.HasSuffix(key, "/"), + r.StorageClass, }, nil } @@ -106,7 +113,11 @@ func (q *bosclient) Put(key string, in io.Reader) error { if err != nil { return err } - _, err = q.c.BasicPutObject(q.bucket, key, body) + args := new(api.PutObjectArgs) + if q.sc != "" { + args.StorageClass = q.sc + } + _, err = q.c.PutObject(q.bucket, key, body, args) return err } @@ -137,11 +148,11 @@ func (q *bosclient) List(prefix, marker, delimiter string, limit int64) ([]Objec for i := 0; i < n; i++ { k := out.Contents[i] mod, _ := time.Parse("2006-01-02T15:04:05Z", k.LastModified) - objs[i] = &obj{k.Key, int64(k.Size), mod, strings.HasSuffix(k.Key, "/")} + objs[i] = &obj{k.Key, int64(k.Size), mod, strings.HasSuffix(k.Key, "/"), k.StorageClass} } if delimiter != "" { for _, p := range out.CommonPrefixes { - objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -149,7 +160,11 @@ func (q *bosclient) List(prefix, marker, delimiter string, limit int64) ([]Objec } func (q *bosclient) CreateMultipartUpload(key string) (*MultipartUpload, error) { - r, err := q.c.BasicInitiateMultipartUpload(q.bucket, key) + args := new(api.InitiateMultipartUploadArgs) + if q.sc != "" { + args.StorageClass = q.sc + } + r, err := q.c.InitiateMultipartUpload(q.bucket, key, "", args) if err != nil { return nil, err } diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go index 24a48eab3ebb..f82c51f0e8d6 100644 --- a/pkg/object/ceph.go +++ b/pkg/object/ceph.go @@ -178,7 +178,7 @@ func (c *ceph) Head(key string) (Object, error) { if err != nil { return err } - o = &obj{key, int64(stat.Size), stat.ModTime, strings.HasSuffix(key, "/")} + o = &obj{key, int64(stat.Size), stat.ModTime, strings.HasSuffix(key, "/"), ""} return nil }) if err == rados.ErrNotFound { @@ -222,7 +222,7 @@ func (c *ceph) ListAll(prefix, marker string) (<-chan Object, error) { logger.Errorf("Stat key %s: %s", key, err) return } - objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/")} + objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} } }() return nil diff --git a/pkg/object/cos.go b/pkg/object/cos.go index 3c6ca78ced8f..f4a4bb4e1315 100644 --- a/pkg/object/cos.go +++ b/pkg/object/cos.go @@ -41,6 +41,7 @@ const cosChecksumKey = "x-cos-meta-" + checksumAlgr type COS struct { c *cos.Client endpoint string + sc string } func (c *COS) String() string { @@ -84,8 +85,15 @@ func (c *COS) Head(key string) (Object, error) { if val, ok := header["Last-Modified"]; ok { mtime, _ = time.Parse(time.RFC1123, val[0]) } - - return &obj{key, size, mtime, strings.HasSuffix(key, "/")}, nil + var sc string + if val, ok := header["X-Cos-Storage-Class"]; ok { + sc = val[0] + } else { + // https://cloud.tencent.com/document/product/436/7745 + // This header is returned only if the object is not STANDARD storage class. 
+ sc = "STANDARD" + } + return &obj{key, size, mtime, strings.HasSuffix(key, "/"), sc}, nil } func (c *COS) Get(key string, off, limit int64) (io.ReadCloser, error) { @@ -105,20 +113,30 @@ func (c *COS) Get(key string, off, limit int64) (io.ReadCloser, error) { } func (c *COS) Put(key string, in io.Reader) error { - var options *cos.ObjectPutOptions + var options cos.ObjectPutOptions if ins, ok := in.(io.ReadSeeker); ok { header := http.Header(map[string][]string{ cosChecksumKey: {generateChecksum(ins)}, }) - options = &cos.ObjectPutOptions{ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{XCosMetaXXX: &header}} + options.ObjectPutHeaderOptions = &cos.ObjectPutHeaderOptions{XCosMetaXXX: &header} + } + if c.sc != "" { + if options.ObjectPutHeaderOptions == nil { + options.ObjectPutHeaderOptions = &cos.ObjectPutHeaderOptions{} + } + options.ObjectPutHeaderOptions.XCosStorageClass = c.sc } - _, err := c.c.Object.Put(ctx, key, in, options) + _, err := c.c.Object.Put(ctx, key, in, &options) return err } func (c *COS) Copy(dst, src string) error { + var opt cos.ObjectCopyOptions + if c.sc != "" { + opt.ObjectCopyHeaderOptions = &cos.ObjectCopyHeaderOptions{XCosStorageClass: c.sc} + } source := fmt.Sprintf("%s/%s", c.endpoint, src) - _, _, err := c.c.Object.Copy(ctx, dst, source, nil) + _, _, err := c.c.Object.Copy(ctx, dst, source, &opt) return err } @@ -154,7 +172,7 @@ func (c *COS) List(prefix, marker, delimiter string, limit int64) ([]Object, err if err != nil { return nil, errors.WithMessagef(err, "failed to decode key %s", o.Key) } - objs[i] = &obj{key, int64(o.Size), t, strings.HasSuffix(key, "/")} + objs[i] = &obj{key, int64(o.Size), t, strings.HasSuffix(key, "/"), o.StorageClass} } if delimiter != "" { for _, p := range resp.CommonPrefixes { @@ -162,7 +180,7 @@ func (c *COS) List(prefix, marker, delimiter string, limit int64) ([]Object, err if err != nil { return nil, errors.WithMessagef(err, "failed to decode commonPrefixes %s", p) } - objs = append(objs, &obj{key, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{key, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -174,7 +192,11 @@ func (c *COS) ListAll(prefix, marker string) (<-chan Object, error) { } func (c *COS) CreateMultipartUpload(key string) (*MultipartUpload, error) { - resp, _, err := c.c.Object.InitiateMultipartUpload(ctx, key, nil) + var options cos.InitiateMultipartUploadOptions + if c.sc != "" { + options.ObjectPutHeaderOptions = &cos.ObjectPutHeaderOptions{XCosStorageClass: c.sc} + } + resp, _, err := c.c.Object.InitiateMultipartUpload(ctx, key, &options) if err != nil { return nil, err } @@ -228,6 +250,10 @@ func (c *COS) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, result.NextKeyMarker, nil } +func (c *COS) SetStorageClass(sc string) { + c.sc = sc +} + func autoCOSEndpoint(bucketName, accessKey, secretKey, token string) (string, error) { client := cos.NewClient(nil, &http.Client{ Transport: &cos.AuthorizationTransport{ @@ -287,7 +313,7 @@ func newCOS(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) }, }) client.UserAgent = UserAgent - return &COS{client, uri.Host}, nil + return &COS{c: client, endpoint: uri.Host}, nil } func init() { diff --git a/pkg/object/eos.go b/pkg/object/eos.go index 0dd507280c2c..f80163787361 100644 --- a/pkg/object/eos.go +++ b/pkg/object/eos.go @@ -87,7 +87,7 @@ func newEos(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) return nil, fmt.Errorf("aws 
session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - return &eos{s3client{bucket, s3.New(ses), ses}}, nil + return &eos{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/etcd.go b/pkg/object/etcd.go index 8573b3bdf8ac..f8559da1ab57 100644 --- a/pkg/object/etcd.go +++ b/pkg/object/etcd.go @@ -87,6 +87,7 @@ func (c *etcdClient) Head(key string) (Object, error) { int64(len(p.Value)), time.Now(), strings.HasSuffix(key, "/"), + "", }, nil } } @@ -138,6 +139,7 @@ func (c *etcdClient) List(prefix, marker, delimiter string, limit int64) ([]Obje int64(len(kv.Value)), time.Now(), strings.HasSuffix(k, "/"), + "", }) } return objs, nil diff --git a/pkg/object/file.go b/pkg/object/file.go index fdbeb8da208d..ed0ac6ce9e90 100644 --- a/pkg/object/file.go +++ b/pkg/object/file.go @@ -93,6 +93,7 @@ func (d *filestore) Head(key string) (Object, error) { size, fi.ModTime(), fi.IsDir(), + "", }, owner, group, @@ -369,6 +370,7 @@ func (d *filestore) ListAll(prefix, marker string) (<-chan Object, error) { info.Size(), info.ModTime(), info.IsDir(), + "", }, owner, group, diff --git a/pkg/object/gs.go b/pkg/object/gs.go index 203e7ae85906..ea35e2f4a1de 100644 --- a/pkg/object/gs.go +++ b/pkg/object/gs.go @@ -42,6 +42,7 @@ type gs struct { bucket string region string pageToken string + sc string } func (g *gs) String() string { @@ -76,8 +77,9 @@ func (g *gs) Create() error { } err := g.client.Bucket(g.bucket).Create(ctx, projectID, &storage.BucketAttrs{ - Name: g.bucket, - Location: g.region, + Name: g.bucket, + StorageClass: g.sc, + Location: g.region, }) if err != nil && strings.Contains(err.Error(), "You already own this bucket") { return nil @@ -99,6 +101,7 @@ func (g *gs) Head(key string) (Object, error) { attrs.Size, attrs.Updated, strings.HasSuffix(key, "/"), + attrs.StorageClass, }, nil } @@ -112,6 +115,7 @@ func (g *gs) Get(key string, off, limit int64) (io.ReadCloser, error) { func (g *gs) Put(key string, data io.Reader) error { writer := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx) + writer.StorageClass = g.sc _, err := io.Copy(writer, data) if err != nil { return err @@ -151,9 +155,9 @@ func (g *gs) List(prefix, marker, delimiter string, limit int64) ([]Object, erro for i := 0; i < n; i++ { item := entries[i] if delimiter != "" && item.Prefix != "" { - objs[i] = &obj{item.Prefix, 0, time.Unix(0, 0), true} + objs[i] = &obj{item.Prefix, 0, time.Unix(0, 0), true, item.StorageClass} } else { - objs[i] = &obj{item.Name, item.Size, item.Updated, strings.HasSuffix(item.Name, "/")} + objs[i] = &obj{item.Name, item.Size, item.Updated, strings.HasSuffix(item.Name, "/"), item.StorageClass} } } if delimiter != "" { @@ -162,6 +166,10 @@ func (g *gs) List(prefix, marker, delimiter string, limit int64) ([]Object, erro return objs, nil } +func (g *gs) SetStorageClass(sc string) { + g.sc = sc +} + func newGS(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) { if !strings.Contains(endpoint, "://") { endpoint = fmt.Sprintf("gs://%s", endpoint) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index cc2c3c52e8f6..dd827cd8441f 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -44,7 +44,7 @@ type hdfsclient struct { addr string c *hdfs.Client dfsReplication int - basePath string + basePath string } func (h *hdfsclient) String() string { @@ -52,7 +52,7 @@ func (h *hdfsclient) String() string { } func (h *hdfsclient) path(key string) string { - return h.basePath + key + return h.basePath + key } func (h *hdfsclient) 
Head(key string) (Object, error) { @@ -68,6 +68,7 @@ func (h *hdfsclient) Head(key string) (Object, error) { info.Size(), info.ModTime(), info.IsDir(), + "", }, hinfo.Owner(), hinfo.OwnerGroup(), @@ -257,6 +258,7 @@ func (h *hdfsclient) ListAll(prefix, marker string) (<-chan Object, error) { info.Size(), info.ModTime(), info.IsDir(), + "", }, hinfo.Owner(), hinfo.OwnerGroup(), @@ -312,16 +314,16 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { return nil, fmt.Errorf("Problem loading configuration: %s", err) } - basePath := "/" + basePath := "/" options := hdfs.ClientOptionsFromConf(conf) if addr != "" { - // nn1.example.com:8020,nn2.example.com:8020/user/juicefs - sp := strings.SplitN(addr, "/", 2) - if len(sp) > 1 { - basePath = basePath + strings.TrimRight(sp[1], "/") + "/" - } - options.Addresses = strings.Split(sp[0], ",") - logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath) + // nn1.example.com:8020,nn2.example.com:8020/user/juicefs + sp := strings.SplitN(addr, "/", 2) + if len(sp) > 1 { + basePath = basePath + strings.TrimRight(sp[1], "/") + "/" + } + options.Addresses = strings.Split(sp[0], ",") + logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath) } if options.KerberosClient != nil { diff --git a/pkg/object/ibmcos.go b/pkg/object/ibmcos.go index 669eeaec1960..2e87d82ecc76 100644 --- a/pkg/object/ibmcos.go +++ b/pkg/object/ibmcos.go @@ -43,6 +43,7 @@ import ( type ibmcos struct { bucket string s3 *s3.S3 + sc string } func (s *ibmcos) String() string { @@ -50,7 +51,14 @@ func (s *ibmcos) String() string { } func (s *ibmcos) Create() error { - _, err := s.s3.CreateBucket(&s3.CreateBucketInput{Bucket: &s.bucket}) + input := &s3.CreateBucketInput{Bucket: &s.bucket} + // https://cloud.ibm.com/docs/cloud-object-storage?topic=cloud-object-storage-classes&code=go + if s.sc != "" { + input.CreateBucketConfiguration = &s3.CreateBucketConfiguration{ + LocationConstraint: &s.sc, + } + } + _, err := s.s3.CreateBucket(input) if err != nil && isExists(err) { err = nil } @@ -100,6 +108,9 @@ func (s *ibmcos) Put(key string, in io.Reader) error { Body: body, ContentType: &mimeType, } + if s.sc != "" { + params.SetStorageClass(s.sc) + } _, err := s.s3.PutObject(params) return err } @@ -111,6 +122,9 @@ func (s *ibmcos) Copy(dst, src string) error { Key: &dst, CopySource: &src, } + if s.sc != "" { + params.SetStorageClass(s.sc) + } _, err := s.s3.CopyObject(params) return err } @@ -132,6 +146,7 @@ func (s *ibmcos) Head(key string) (Object, error) { *r.ContentLength, *r.LastModified, strings.HasSuffix(key, "/"), + *r.StorageClass, }, nil } @@ -167,7 +182,7 @@ func (s *ibmcos) List(prefix, marker, delimiter string, limit int64) ([]Object, if err != nil { return nil, errors.WithMessagef(err, "failed to decode key %s", *o.Key) } - objs[i] = &obj{oKey, *o.Size, *o.LastModified, strings.HasSuffix(oKey, "/")} + objs[i] = &obj{oKey, *o.Size, *o.LastModified, strings.HasSuffix(oKey, "/"), *o.StorageClass} } if delimiter != "" { for _, p := range resp.CommonPrefixes { @@ -175,7 +190,7 @@ func (s *ibmcos) List(prefix, marker, delimiter string, limit int64) ([]Object, if err != nil { return nil, errors.WithMessagef(err, "failed to decode commonPrefixes %s", *p.Prefix) } - objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -191,6 +206,9 @@ func (s *ibmcos) CreateMultipartUpload(key string) 
(*MultipartUpload, error) { Bucket: &s.bucket, Key: &key, } + if s.sc != "" { + params.SetStorageClass(s.sc) + } resp, err := s.s3.CreateMultipartUpload(params) if err != nil { return nil, err @@ -265,6 +283,10 @@ func (s *ibmcos) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } +func (s *ibmcos) SetStorageClass(sc string) { + s.sc = sc +} + func newIBMCOS(endpoint, apiKey, serviceInstanceID, token string) (ObjectStorage, error) { if !strings.Contains(endpoint, "://") { endpoint = fmt.Sprintf("https://%s", endpoint) @@ -283,7 +305,7 @@ func newIBMCOS(endpoint, apiKey, serviceInstanceID, token string) (ObjectStorage WithS3ForcePathStyle(true) sess := session.Must(session.NewSession()) client := s3.New(sess, conf) - return &ibmcos{bucket, client}, nil + return &ibmcos{bucket: bucket, s3: client}, nil } func init() { diff --git a/pkg/object/interface.go b/pkg/object/interface.go index 926ab2b312a3..16e766746a7f 100644 --- a/pkg/object/interface.go +++ b/pkg/object/interface.go @@ -27,6 +27,7 @@ type Object interface { Mtime() time.Time IsDir() bool IsSymlink() bool + StorageClass() string } type obj struct { @@ -34,13 +35,15 @@ type obj struct { size int64 mtime time.Time isDir bool + sc string } -func (o *obj) Key() string { return o.key } -func (o *obj) Size() int64 { return o.size } -func (o *obj) Mtime() time.Time { return o.mtime } -func (o *obj) IsDir() bool { return o.isDir } -func (o *obj) IsSymlink() bool { return false } +func (o *obj) Key() string { return o.key } +func (o *obj) Size() int64 { return o.size } +func (o *obj) Mtime() time.Time { return o.mtime } +func (o *obj) IsDir() bool { return o.isDir } +func (o *obj) IsSymlink() bool { return false } +func (o *obj) StorageClass() string { return o.sc } type MultipartUpload struct { MinPartSize int diff --git a/pkg/object/jss.go b/pkg/object/jss.go index 73c1e5b6ccf5..280327d5e846 100644 --- a/pkg/object/jss.go +++ b/pkg/object/jss.go @@ -45,6 +45,9 @@ func (j *jss) Copy(dst, src string) error { Key: &dst, CopySource: &src, } + if j.sc != "" { + params.SetStorageClass(j.sc) + } _, err := j.s3client.s3.CopyObject(params) return err } @@ -74,7 +77,7 @@ func newJSS(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) return nil, err } ses.Handlers.Build.PushFront(disableSha256Func) - return &jss{s3client{bucket, s3.New(ses), ses}}, nil + return &jss{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/ks3.go b/pkg/object/ks3.go index bc4192512c1c..57c834f56cd8 100644 --- a/pkg/object/ks3.go +++ b/pkg/object/ks3.go @@ -32,7 +32,6 @@ import ( "github.com/pkg/errors" - "github.com/aws/aws-sdk-go/aws/session" "github.com/juicedata/juicefs/pkg/utils" "github.com/ks3sdklib/aws-sdk-go/aws" "github.com/ks3sdklib/aws-sdk-go/aws/awserr" @@ -43,7 +42,7 @@ import ( type ks3 struct { bucket string s3 *s3.S3 - ses *session.Session + sc string } func (s *ks3) String() string { @@ -82,11 +81,18 @@ func (s *ks3) Head(key string) (Object, error) { return nil, err } + var sc string + if val, ok := r.Metadata["X-Amz-Storage-Class"]; ok { + sc = *val + } else { + sc = "STANDARD" + } return &obj{ key, *r.ContentLength, *r.LastModified, strings.HasSuffix(key, "/"), + sc, }, nil } @@ -126,6 +132,9 @@ func (s *ks3) Put(key string, in io.Reader) error { Body: body, ContentType: &mimeType, } + if s.sc != "" { + params.StorageClass = aws.String(s.sc) + } _, err := s.s3.PutObject(params) return err } @@ -136,6 +145,9 @@ func (s *ks3) Copy(dst, src string) 
error { Key: &dst, CopySource: &src, } + if s.sc != "" { + params.StorageClass = aws.String(s.sc) + } _, err := s.s3.CopyObject(params) return err } @@ -175,7 +187,7 @@ func (s *ks3) List(prefix, marker, delimiter string, limit int64) ([]Object, err if err != nil { return nil, errors.WithMessagef(err, "failed to decode key %s", *o.Key) } - objs[i] = &obj{oKey, *o.Size, *o.LastModified, strings.HasSuffix(oKey, "/")} + objs[i] = &obj{oKey, *o.Size, *o.LastModified, strings.HasSuffix(oKey, "/"), *o.StorageClass} } if delimiter != "" { for _, p := range resp.CommonPrefixes { @@ -183,7 +195,7 @@ func (s *ks3) List(prefix, marker, delimiter string, limit int64) ([]Object, err if err != nil { return nil, errors.WithMessagef(err, "failed to decode commonPrefixes %s", *p.Prefix) } - objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -199,6 +211,9 @@ func (s *ks3) CreateMultipartUpload(key string) (*MultipartUpload, error) { Bucket: &s.bucket, Key: &key, } + if s.sc != "" { + params.StorageClass = aws.String(s.sc) + } resp, err := s.s3.CreateMultipartUpload(params) if err != nil { return nil, err @@ -284,6 +299,10 @@ func (s *ks3) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } +func (s *ks3) SetStorageClass(sc string) { + s.sc = sc +} + var ks3Regions = map[string]string{ "cn-beijing": "BEIJING", "cn-shanghai": "SHANGHAI", @@ -337,7 +356,7 @@ func newKS3(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) Credentials: credentials.NewStaticCredentials(accessKey, secretKey, token), } - return &ks3{bucket, s3.New(awsConfig), nil}, nil + return &ks3{bucket: bucket, s3: s3.New(awsConfig)}, nil } func init() { diff --git a/pkg/object/mem.go b/pkg/object/mem.go index 23f0ac9ded0d..463910301eb0 100644 --- a/pkg/object/mem.go +++ b/pkg/object/mem.go @@ -64,6 +64,7 @@ func (m *memStore) Head(key string) (Object, error) { int64(len(o.data)), o.mtime, strings.HasSuffix(key, "/"), + "", }, o.owner, o.group, @@ -150,6 +151,7 @@ func (m *memStore) List(prefix, marker, delimiter string, limit int64) ([]Object 0, time.Unix(0, 0), strings.HasSuffix(commonPrefix, "/"), + "", }, o.owner, o.group, @@ -168,6 +170,7 @@ func (m *memStore) List(prefix, marker, delimiter string, limit int64) ([]Object int64(len(o.data)), o.mtime, strings.HasSuffix(k, "/"), + "", }, o.owner, o.group, diff --git a/pkg/object/minio.go b/pkg/object/minio.go index e295822c65f7..029ee606da8a 100644 --- a/pkg/object/minio.go +++ b/pkg/object/minio.go @@ -96,7 +96,7 @@ func newMinio(endpoint, accessKey, secretKey, token string) (ObjectStorage, erro bucket = bucket[len("minio/"):] } bucket = strings.Split(bucket, "/")[0] - return &minio{s3client{bucket, s3.New(ses), ses}}, nil + return &minio{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/object_storage.go b/pkg/object/object_storage.go index f21bf35da0b4..867019844049 100644 --- a/pkg/object/object_storage.go +++ b/pkg/object/object_storage.go @@ -44,6 +44,10 @@ type SupportSymlink interface { Readlink(name string) (string, error) } +type SupportStorageClass interface { + SetStorageClass(sc string) +} + type File interface { Object Owner() string diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index 721805034ecf..3bef6f468f83 100644 --- a/pkg/object/object_storage_test.go +++ 
b/pkg/object/object_storage_test.go @@ -35,6 +35,14 @@ import ( "testing" "time" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + + "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" + + "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/redis/go-redis/v9" ) @@ -68,8 +76,46 @@ func listAll(s ObjectStorage, prefix, marker string, limit int64) ([]Object, err return nil, err } +func setStorageClass(o ObjectStorage) string { + switch s := o.(type) { + case *wasb: + s.sc = string(azblob.AccessTierCool) + return s.sc + case *bosclient: + s.sc = "STANDARD_IA" + return s.sc + case *COS: + s.sc = "STANDARD_IA" + return s.sc + case *ks3: + s.sc = "STANDARD_IA" + return s.sc + case *gs: + s.sc = "NEARLINE" + return s.sc + case *obsClient: + s.sc = string(obs.StorageClassWarm) + return s.sc + case *ossClient: + s.sc = string(oss.StorageIA) + return s.sc + case *qingstor: + s.sc = "STANDARD_IA" + return s.sc + case *s3client: + s.sc = "STANDARD_IA" + return s.sc + case *tosClient: + s.sc = string(enum.StorageClassIa) + return s.sc + default: + return "" + } +} + // nolint:errcheck func testStorage(t *testing.T, s ObjectStorage) { + sc := setStorageClass(s) if err := s.Create(); err != nil { t.Fatalf("Can't create bucket %s: %s", s, err) } @@ -294,6 +340,9 @@ func testStorage(t *testing.T, s ObjectStorage) { if objs[i].Key() != sortedKeys[i] { t.Fatal("The result for list4 is incorrect") } + if sc != "" && objs[i].StorageClass() != sc { + t.Fatal("storage class is not correct") + } } } @@ -316,8 +365,10 @@ func testStorage(t *testing.T, s ObjectStorage) { t.Fatal("err should be os.ErrNotExist") } - if _, err := s.Head("test"); err != nil { + if o, err := s.Head("test"); err != nil { t.Fatalf("check exists failed: %s", err.Error()) + } else if sc != "" && o.StorageClass() != sc { + t.Fatalf("storage class should be %s but got %s", sc, o.StorageClass()) } if err := s.Delete("test"); err != nil { @@ -390,6 +441,11 @@ func testStorage(t *testing.T, s ObjectStorage) { if err = s.CompleteUpload(k, upload.UploadID, parts); err != nil { t.Fatalf("failed to complete multipart upload: %v", err) } + if meta, err := s.Head(k); err != nil { + t.Fatalf("failed to head object: %v", err) + } else if sc != "" && meta.StorageClass() != sc { + t.Fatalf("storage class should be %s but got %s", sc, meta.StorageClass()) + } checkContent := func(key string, content []byte) { r, err := s.Get(key, 0, -1) if err != nil { @@ -545,6 +601,7 @@ func TestGS(t *testing.T) { //skip mutate if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" { t.SkipNow() } + //export https_proxy=http://127.0.0.1:7890 http_proxy=http://127.0.0.1:7890 all_proxy=socks5://127.0.0.1:7890 gs, _ := newGS(os.Getenv("GOOGLE_ENDPOINT"), "", "", "") testStorage(t, gs) } @@ -890,16 +947,11 @@ func TestTOS(t *testing.T) { //skip mutate func TestMain(m *testing.M) { if envFile := os.Getenv("JUICEFS_ENV_FILE_FOR_TEST"); envFile != "" { - // schema: S3 AWS_ENDPOINT=xxxxx AWS_ACCESS_KEY_ID=xxxx AWS_SECRET_ACCESS_KEY=xxxx + // schema: S3 AWS_ENDPOINT=xxxxx if _, err := os.Stat(envFile); err == nil { file, _ := os.ReadFile(envFile) for _, line := range strings.Split(strings.TrimSpace(string(file)), "\n") { - env := strings.Fields(line) - if len(env) <= 1 { - continue - } - for _, e := range env[1:] { - envkv := strings.SplitN(e, "=", 2) + if envkv := strings.SplitN(line, "=", 2); len(envkv) == 2 { if err := os.Setenv(envkv[0], envkv[1]); err != nil { logger.Errorf("set env %s=%s error", envkv[0], 
envkv[1]) } diff --git a/pkg/object/obs.go b/pkg/object/obs.go index aa4866845193..48f516aeb992 100644 --- a/pkg/object/obs.go +++ b/pkg/object/obs.go @@ -45,6 +45,7 @@ type obsClient struct { bucket string region string checkEtag bool + sc string c *obs.ObsClient } @@ -67,6 +68,7 @@ func (s *obsClient) Create() error { params.Bucket = s.bucket params.Location = s.region params.AvailableZone = "3az" + params.StorageClass = obs.StorageClassType(s.sc) _, err := s.c.CreateBucket(params) if err != nil && isExists(err) { err = nil @@ -91,6 +93,7 @@ func (s *obsClient) Head(key string) (Object, error) { r.ContentLength, r.LastModified, strings.HasSuffix(key, "/"), + string(r.StorageClass), }, nil } @@ -153,6 +156,7 @@ func (s *obsClient) Put(key string, in io.Reader) error { params.ContentLength = vlen params.ContentMD5 = base64.StdEncoding.EncodeToString(sum[:]) params.ContentType = mimeType + params.StorageClass = obs.StorageClassType(s.sc) resp, err := s.c.PutObject(params) if err == nil && s.checkEtag && strings.Trim(resp.ETag, "\"") != obs.Hex(sum) { err = fmt.Errorf("unexpected ETag: %s != %s", strings.Trim(resp.ETag, "\""), obs.Hex(sum)) @@ -166,6 +170,7 @@ func (s *obsClient) Copy(dst, src string) error { params.Key = dst params.CopySourceBucket = s.bucket params.CopySourceKey = src + params.StorageClass = obs.StorageClassType(s.sc) _, err := s.c.CopyObject(params) return err } @@ -199,7 +204,7 @@ func (s *obsClient) List(prefix, marker, delimiter string, limit int64) ([]Objec if err != nil { return nil, errors.WithMessagef(err, "failed to decode key %s", o.Key) } - objs[i] = &obj{key, o.Size, o.LastModified, strings.HasSuffix(key, "/")} + objs[i] = &obj{key, o.Size, o.LastModified, strings.HasSuffix(key, "/"), string(o.StorageClass)} } if delimiter != "" { for _, p := range resp.CommonPrefixes { @@ -207,7 +212,7 @@ func (s *obsClient) List(prefix, marker, delimiter string, limit int64) ([]Objec if err != nil { return nil, errors.WithMessagef(err, "failed to decode commonPrefixes %s", p) } - objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -222,6 +227,7 @@ func (s *obsClient) CreateMultipartUpload(key string) (*MultipartUpload, error) params := &obs.InitiateMultipartUploadInput{} params.Bucket = s.bucket params.Key = key + params.StorageClass = obs.StorageClassType(s.sc) resp, err := s.c.InitiateMultipartUpload(params) if err != nil { return nil, err @@ -307,6 +313,10 @@ func (s *obsClient) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } +func (s *obsClient) SetStorageClass(sc string) { + s.sc = sc +} + func autoOBSEndpoint(bucketName, accessKey, secretKey, token string) (string, error) { region := obsDefaultRegion if r := os.Getenv("HWCLOUD_DEFAULT_REGION"); r != "" { diff --git a/pkg/object/oos.go b/pkg/object/oos.go index 04af0a59e928..2cc269c2fd00 100644 --- a/pkg/object/oos.go +++ b/pkg/object/oos.go @@ -93,7 +93,7 @@ func newOOS(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) return nil, fmt.Errorf("OOS session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - return &oos{s3client{bucket, s3.New(ses), ses}}, nil + return &oos{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/oss.go b/pkg/object/oss.go index ce959e266a03..e05524b905b2 100644 --- a/pkg/object/oss.go +++ b/pkg/object/oss.go @@ -41,6 
+41,7 @@ const ossDefaultRegionID = "cn-hangzhou" type ossClient struct { client *oss.Client bucket *oss.Bucket + sc string } func (o *ossClient) String() string { @@ -58,7 +59,11 @@ func (o *ossClient) Limits() Limits { } func (o *ossClient) Create() error { - err := o.bucket.Client.CreateBucket(o.bucket.BucketName) + var option []oss.Option + if o.sc != "" { + option = append(option, oss.StorageClass(oss.StorageClassType(o.sc))) + } + err := o.bucket.Client.CreateBucket(o.bucket.BucketName, option...) if err != nil && isExists(err) { err = nil } @@ -78,7 +83,13 @@ func (o *ossClient) checkError(err error) error { } func (o *ossClient) Head(key string) (Object, error) { - r, err := o.bucket.GetObjectMeta(key) + var r http.Header + var err error + if o.sc != "" { + r, err = o.bucket.GetObjectDetailedMeta(key) + } else { + r, err = o.bucket.GetObjectMeta(key) + } if o.checkError(err) != nil { if e, ok := err.(oss.ServiceError); ok && e.StatusCode == http.StatusNotFound { err = os.ErrNotExist @@ -98,6 +109,7 @@ func (o *ossClient) Head(key string) (Object, error) { size, mtime, strings.HasSuffix(key, "/"), + r.Get(oss.HTTPHeaderOssStorageClass), }, nil } @@ -122,15 +134,22 @@ func (o *ossClient) Get(key string, off, limit int64) (resp io.ReadCloser, err e } func (o *ossClient) Put(key string, in io.Reader) error { + var option []oss.Option if ins, ok := in.(io.ReadSeeker); ok { - option := oss.Meta(checksumAlgr, generateChecksum(ins)) - return o.checkError(o.bucket.PutObject(key, in, option)) + option = append(option, oss.Meta(checksumAlgr, generateChecksum(ins))) + } + if o.sc != "" { + option = append(option, oss.ObjectStorageClass(oss.StorageClassType(o.sc))) } - return o.checkError(o.bucket.PutObject(key, in)) + return o.checkError(o.bucket.PutObject(key, in, option...)) } func (o *ossClient) Copy(dst, src string) error { - _, err := o.bucket.CopyObject(src, dst) + var option []oss.Option + if o.sc != "" { + option = append(option, oss.ObjectStorageClass(oss.StorageClassType(o.sc))) + } + _, err := o.bucket.CopyObject(src, dst, option...) return o.checkError(err) } @@ -151,11 +170,11 @@ func (o *ossClient) List(prefix, marker, delimiter string, limit int64) ([]Objec objs := make([]Object, n) for i := 0; i < n; i++ { o := result.Objects[i] - objs[i] = &obj{o.Key, o.Size, o.LastModified, strings.HasSuffix(o.Key, "/")} + objs[i] = &obj{o.Key, o.Size, o.LastModified, strings.HasSuffix(o.Key, "/"), o.StorageClass} } if delimiter != "" { for _, o := range result.CommonPrefixes { - objs = append(objs, &obj{o, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{o, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -167,7 +186,11 @@ func (o *ossClient) ListAll(prefix, marker string) (<-chan Object, error) { } func (o *ossClient) CreateMultipartUpload(key string) (*MultipartUpload, error) { - r, err := o.bucket.InitiateMultipartUpload(key) + var option []oss.Option + if o.sc != "" { + option = append(option, oss.ObjectStorageClass(oss.StorageClassType(o.sc))) + } + r, err := o.bucket.InitiateMultipartUpload(key, option...) 
if o.checkError(err) != nil { return nil, err } @@ -229,6 +252,10 @@ func (o *ossClient) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, result.NextKeyMarker, nil } +func (o *ossClient) SetStorageClass(sc string) { + o.sc = sc +} + type stsCred struct { AccessKeyId string AccessKeySecret string diff --git a/pkg/object/prefix.go b/pkg/object/prefix.go index a7f230de85f7..95dc0a842bf5 100644 --- a/pkg/object/prefix.go +++ b/pkg/object/prefix.go @@ -33,6 +33,12 @@ func WithPrefix(os ObjectStorage, prefix string) ObjectStorage { return &withPrefix{os, prefix} } +func (s *withPrefix) SetStorageClass(sc string) { + if o, ok := s.os.(SupportStorageClass); ok { + o.SetStorageClass(sc) + } +} + func (s *withPrefix) Symlink(oldName, newName string) error { if w, ok := s.os.(SupportSymlink); ok { return w.Symlink(oldName, s.prefix+newName) diff --git a/pkg/object/qingstor.go b/pkg/object/qingstor.go index 8172affeb44c..4244f991870b 100644 --- a/pkg/object/qingstor.go +++ b/pkg/object/qingstor.go @@ -40,6 +40,7 @@ import ( type qingstor struct { bucket *qs.Bucket + sc string } func (q *qingstor) String() string { @@ -76,6 +77,7 @@ func (q *qingstor) Head(key string) (Object, error) { *r.ContentLength, *r.LastModified, strings.HasSuffix(key, "/"), + *r.XQSStorageClass, }, nil } @@ -142,6 +144,9 @@ func (q *qingstor) Put(key string, in io.Reader) error { ContentLength: &vlen, ContentType: &mimeType, } + if q.sc != "" { + input.XQSStorageClass = &q.sc + } out, err := q.bucket.PutObject(key, input) if err != nil { return err @@ -157,6 +162,9 @@ func (q *qingstor) Copy(dst, src string) error { input := &qs.PutObjectInput{ XQSCopySource: &source, } + if q.sc != "" { + input.XQSStorageClass = &q.sc + } out, err := q.bucket.PutObject(dst, input) if err != nil { return err @@ -198,11 +206,12 @@ func (q *qingstor) List(prefix, marker, delimiter string, limit int64) ([]Object *k.Size, time.Unix(int64(*k.Modified), 0), strings.HasSuffix(*k.Key, "/"), + *k.StorageClass, } } if delimiter != "" { for _, p := range out.CommonPrefixes { - objs = append(objs, &obj{*p, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{*p, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -214,7 +223,11 @@ func (q *qingstor) ListAll(prefix, marker string) (<-chan Object, error) { } func (q *qingstor) CreateMultipartUpload(key string) (*MultipartUpload, error) { - r, err := q.bucket.InitiateMultipartUpload(key, nil) + var input qs.InitiateMultipartUploadInput + if q.sc != "" { + input.XQSStorageClass = &q.sc + } + r, err := q.bucket.InitiateMultipartUpload(key, &input) if err != nil { return nil, err } @@ -290,6 +303,10 @@ func (q *qingstor) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } +func (q *qingstor) SetStorageClass(sc string) { + q.sc = sc +} + func newQingStor(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) { if !strings.Contains(endpoint, "://") { endpoint = fmt.Sprintf("https://%s", endpoint) diff --git a/pkg/object/qiniu.go b/pkg/object/qiniu.go index 1ced1116b1b5..b62aab522bd3 100644 --- a/pkg/object/qiniu.go +++ b/pkg/object/qiniu.go @@ -99,6 +99,7 @@ func (q *qiniu) Head(key string) (Object, error) { r.Fsize, mtime, strings.HasSuffix(key, "/"), + "", }, nil } @@ -169,11 +170,11 @@ func (q *qiniu) List(prefix, marker, delimiter string, limit int64) ([]Object, e for i := 0; i < n; i++ { entry := entries[i] mtime := entry.PutTime / 10000000 - objs[i] = 
&obj{entry.Key, entry.Fsize, time.Unix(mtime, 0), strings.HasSuffix(entry.Key, "/")} + objs[i] = &obj{entry.Key, entry.Fsize, time.Unix(mtime, 0), strings.HasSuffix(entry.Key, "/"), ""} } if delimiter != "" { for _, p := range prefixes { - objs = append(objs, &obj{p, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{p, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -215,7 +216,7 @@ func newQiniu(endpoint, accessKey, secretKey, token string) (ObjectStorage, erro return nil, fmt.Errorf("aws session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - s3client := s3client{bucket, s3.New(ses), ses} + s3client := s3client{bucket: bucket, s3: s3.New(ses), ses: ses} cfg := storage.Config{ UseHTTPS: uri.Scheme == "https", diff --git a/pkg/object/redis.go b/pkg/object/redis.go index f61df74983d7..c00961fdaf66 100644 --- a/pkg/object/redis.go +++ b/pkg/object/redis.go @@ -158,7 +158,7 @@ func (t *redisStore) ListAll(prefix, marker string) (<-chan Object, error) { } } // FIXME: mtime - objs <- &obj{keyList[start:end][idx], size, now, strings.HasSuffix(keyList[start:end][idx], "/")} + objs <- &obj{keyList[start:end][idx], size, now, strings.HasSuffix(keyList[start:end][idx], "/"), ""} } } } @@ -176,6 +176,7 @@ func (t *redisStore) Head(key string) (Object, error) { int64(len(data)), time.Now(), strings.HasSuffix(key, "/"), + "", }, err } diff --git a/pkg/object/restful.go b/pkg/object/restful.go index 32bfb531dc21..d0eeb777835c 100644 --- a/pkg/object/restful.go +++ b/pkg/object/restful.go @@ -179,6 +179,7 @@ func (s *RestfulStorage) Head(key string) (Object, error) { resp.ContentLength, mtime, strings.HasSuffix(key, "/"), + "", }, nil } diff --git a/pkg/object/s3.go b/pkg/object/s3.go index b61e8e557386..8f03199b0d71 100644 --- a/pkg/object/s3.go +++ b/pkg/object/s3.go @@ -56,6 +56,7 @@ var disableSha256Func = func(r *request.Request) { type s3client struct { bucket string + sc string s3 *s3.S3 ses *session.Session } @@ -102,11 +103,18 @@ func (s *s3client) Head(key string) (Object, error) { } return nil, err } + var sc string + if r.StorageClass != nil { + sc = *r.StorageClass + } else { + sc = "STANDARD" + } return &obj{ key, *r.ContentLength, *r.LastModified, strings.HasSuffix(key, "/"), + sc, }, nil } @@ -154,6 +162,9 @@ func (s *s3client) Put(key string, in io.Reader) error { ContentType: &mimeType, Metadata: map[string]*string{checksumAlgr: &checksum}, } + if s.sc != "" { + params.SetStorageClass(s.sc) + } _, err := s.s3.PutObject(params) return err } @@ -165,6 +176,9 @@ func (s *s3client) Copy(dst, src string) error { Key: &dst, CopySource: &src, } + if s.sc != "" { + params.SetStorageClass(s.sc) + } _, err := s.s3.CopyObject(params) return err } @@ -212,6 +226,7 @@ func (s *s3client) List(prefix, marker, delimiter string, limit int64) ([]Object *o.Size, *o.LastModified, strings.HasSuffix(oKey, "/"), + *o.StorageClass, } } if delimiter != "" { @@ -220,7 +235,7 @@ func (s *s3client) List(prefix, marker, delimiter string, limit int64) ([]Object if err != nil { return nil, errors.WithMessagef(err, "failed to decode commonPrefixes %s", *p.Prefix) } - objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } @@ -236,6 +251,9 @@ func (s *s3client) CreateMultipartUpload(key string) (*MultipartUpload, error) { Bucket: &s.bucket, Key: &key, } + if s.sc != "" { + 
params.SetStorageClass(s.sc) + } resp, err := s.s3.CreateMultipartUpload(params) if err != nil { return nil, err @@ -321,6 +339,10 @@ func (s *s3client) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } +func (s *s3client) SetStorageClass(sc string) { + s.sc = sc +} + func autoS3Region(bucketName, accessKey, secretKey string) (string, error) { awsConfig := &aws.Config{ HTTPClient: httpClient, @@ -507,7 +529,7 @@ func newS3(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) return nil, fmt.Errorf("Fail to create aws session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - return &s3client{bucketName, s3.New(ses), ses}, nil + return &s3client{bucket: bucketName, s3: s3.New(ses), ses: ses}, nil } func init() { diff --git a/pkg/object/scs.go b/pkg/object/scs.go index c6074c1f9db0..20d91fa1bdbe 100644 --- a/pkg/object/scs.go +++ b/pkg/object/scs.go @@ -129,7 +129,7 @@ func (s *scsClient) List(prefix, marker, delimiter string, limit int64) ([]Objec } if delimiter != "" { for _, p := range list.CommonPrefixes { - objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } diff --git a/pkg/object/scw.go b/pkg/object/scw.go index f130cbfd7d84..2623f7a18a7a 100644 --- a/pkg/object/scw.go +++ b/pkg/object/scw.go @@ -84,7 +84,7 @@ func newScw(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) return nil, fmt.Errorf("aws session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - return &scw{s3client{bucket, s3.New(ses), ses}}, nil + return &scw{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/sftp.go b/pkg/object/sftp.go index 6e494259161b..f65d8be93216 100644 --- a/pkg/object/sftp.go +++ b/pkg/object/sftp.go @@ -329,7 +329,7 @@ func (f *sftpStore) fileInfo(c *sftp.Client, key string, fi os.FileInfo) Object } } ff := &file{ - obj{key, fi.Size(), fi.ModTime(), fi.IsDir()}, + obj{key, fi.Size(), fi.ModTime(), fi.IsDir(), ""}, owner, group, fi.Mode(), diff --git a/pkg/object/sharding.go b/pkg/object/sharding.go index ee7c3cb751ca..7c545710ce5d 100644 --- a/pkg/object/sharding.go +++ b/pkg/object/sharding.go @@ -73,6 +73,14 @@ func (s *sharded) Delete(key string) error { return s.pick(key).Delete(key) } +func (s *sharded) SetStorageClass(sc string) { + for _, o := range s.stores { + if os, ok := o.(SupportStorageClass); ok { + os.SetStorageClass(sc) + } + } +} + const maxResults = 10000 // ListAll on all the keys that starts at marker from object storage. 
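
Editor's note: the hunks above all follow one pattern — each concrete client (oss, qingstor, s3, ...) stores the class in a new `sc` field and exposes `SetStorageClass(string)`, while composite wrappers (`withPrefix` in prefix.go, `sharded` in sharding.go) forward the call only to backends that implement `object.SupportStorageClass`. The interface definition itself is not part of these hunks; the sketch below assumes it is a single-method capability interface, and the other type names (`ObjectStore`, `fakeS3`) are hypothetical stand-ins used only to illustrate the type-assertion pattern.

```go
// Minimal sketch of the capability-interface pattern used in this patch.
// Only SetStorageClass mirrors the diff; everything else is illustrative.
package main

import "fmt"

// ObjectStore is a hypothetical stand-in for the object-storage interface.
type ObjectStore interface {
	Put(key string, data []byte) error
}

// SupportStorageClass is assumed to be a one-method capability interface,
// matching the type assertions in withPrefix.SetStorageClass and
// sharded.SetStorageClass above.
type SupportStorageClass interface {
	SetStorageClass(sc string)
}

// fakeS3 plays the role of a backend that opts in to storage classes.
type fakeS3 struct{ sc string }

func (f *fakeS3) Put(key string, data []byte) error {
	fmt.Printf("put %q (%d bytes) with storage class %q\n", key, len(data), f.sc)
	return nil
}

func (f *fakeS3) SetStorageClass(sc string) { f.sc = sc }

func main() {
	var store ObjectStore = &fakeS3{}
	// Callers (and composite wrappers) never assume the capability exists;
	// backends without the method are simply left untouched.
	if s, ok := store.(SupportStorageClass); ok {
		s.SetStorageClass("STANDARD_IA")
	}
	_ = store.Put("myprefix/obj1", []byte("hello"))
}
```
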
diff --git a/pkg/object/space.go b/pkg/object/space.go index 01c2c9544ef7..162983c71a0c 100644 --- a/pkg/object/space.go +++ b/pkg/object/space.go @@ -70,7 +70,7 @@ func newSpace(endpoint, accessKey, secretKey, token string) (ObjectStorage, erro return nil, fmt.Errorf("aws session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - return &space{s3client{bucket, s3.New(ses), ses}}, nil + return &space{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/speedy.go b/pkg/object/speedy.go index 6a24e17cbc93..20cca8a7980f 100644 --- a/pkg/object/speedy.go +++ b/pkg/object/speedy.go @@ -100,7 +100,7 @@ func (s *speedy) List(prefix, marker, delimiter string, limit int64) ([]Object, if strings.HasSuffix(item.Key, "/.speedycloud_dir_flag") { continue } - objs = append(objs, &obj{item.Key, item.Size, item.LastModified, strings.HasSuffix(item.Key, "/")}) + objs = append(objs, &obj{item.Key, item.Size, item.LastModified, strings.HasSuffix(item.Key, "/"), ""}) } return objs, nil } diff --git a/pkg/object/sql.go b/pkg/object/sql.go index 94a92662f314..808b9cdc1a79 100644 --- a/pkg/object/sql.go +++ b/pkg/object/sql.go @@ -119,6 +119,7 @@ func (s *sqlStore) Head(key string) (Object, error) { b.Size, b.Modified, strings.HasSuffix(key, "/"), + "", }, nil } diff --git a/pkg/object/swift.go b/pkg/object/swift.go index 1f445976f598..16f511fb68f5 100644 --- a/pkg/object/swift.go +++ b/pkg/object/swift.go @@ -97,9 +97,9 @@ func (s *swiftOSS) List(prefix, marker, delimiter string, limit int64) ([]Object for i, o := range objects { // https://docs.openstack.org/swift/latest/api/pseudo-hierarchical-folders-directories.html if delimiter != "" && o.PseudoDirectory { - objs[i] = &obj{o.SubDir, 0, time.Unix(0, 0), true} + objs[i] = &obj{o.SubDir, 0, time.Unix(0, 0), true, ""} } else { - objs[i] = &obj{o.Name, o.Bytes, o.LastModified, strings.HasSuffix(o.Name, "/")} + objs[i] = &obj{o.Name, o.Bytes, o.LastModified, strings.HasSuffix(o.Name, "/"), ""} } } return objs, nil @@ -115,6 +115,7 @@ func (s *swiftOSS) Head(key string) (Object, error) { object.Bytes, object.LastModified, strings.HasSuffix(key, "/"), + "", }, err } diff --git a/pkg/object/tikv.go b/pkg/object/tikv.go index 74f08845a2ad..a37b30c01ec1 100644 --- a/pkg/object/tikv.go +++ b/pkg/object/tikv.go @@ -81,6 +81,7 @@ func (t *tikv) Head(key string) (Object, error) { int64(len(data)), time.Now(), strings.HasSuffix(key, "/"), + "", }, err } @@ -107,7 +108,7 @@ func (t *tikv) List(prefix, marker, delimiter string, limit int64) ([]Object, er mtime := time.Now() for i, k := range keys { // FIXME: mtime - objs[i] = &obj{string(k), int64(len(vs[i])), mtime, strings.HasSuffix(string(k), "/")} + objs[i] = &obj{string(k), int64(len(vs[i])), mtime, strings.HasSuffix(string(k), "/"), ""} } return objs, nil } diff --git a/pkg/object/tos.go b/pkg/object/tos.go index 8c2b627811ef..9395c6aabaa7 100644 --- a/pkg/object/tos.go +++ b/pkg/object/tos.go @@ -32,18 +32,20 @@ import ( "github.com/volcengine/ve-tos-golang-sdk/v2/tos" "github.com/volcengine/ve-tos-golang-sdk/v2/tos/codes" + "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" ) type tosClient struct { bucket string + sc string client *tos.ClientV2 } -func (t tosClient) String() string { +func (t *tosClient) String() string { return fmt.Sprintf("tos://%s/", t.bucket) } -func (t tosClient) Limits() Limits { +func (t *tosClient) Limits() Limits { return Limits{ IsSupportMultipartUpload: true, IsSupportUploadPartCopy: true, @@ -53,8 +55,8 @@ func (t tosClient) 
Limits() Limits { } } -func (t tosClient) Create() error { - _, err := t.client.CreateBucketV2(context.Background(), &tos.CreateBucketV2Input{Bucket: t.bucket}) +func (t *tosClient) Create() error { + _, err := t.client.CreateBucketV2(context.Background(), &tos.CreateBucketV2Input{Bucket: t.bucket, StorageClass: enum.StorageClassType(t.sc)}) if e, ok := err.(*tos.TosServerError); ok { if e.Code == codes.BucketAlreadyOwnedByYou || e.Code == codes.BucketAlreadyExists { return nil @@ -63,7 +65,7 @@ func (t tosClient) Create() error { return err } -func (t tosClient) Get(key string, off, limit int64) (io.ReadCloser, error) { +func (t *tosClient) Get(key string, off, limit int64) (io.ReadCloser, error) { rangeStr := getRange(off, limit) resp, err := t.client.GetObjectV2(context.Background(), &tos.GetObjectV2Input{ Bucket: t.bucket, @@ -80,18 +82,19 @@ func (t tosClient) Get(key string, off, limit int64) (io.ReadCloser, error) { return resp.Content, nil } -func (t tosClient) Put(key string, in io.Reader) error { +func (t *tosClient) Put(key string, in io.Reader) error { _, err := t.client.PutObjectV2(context.Background(), &tos.PutObjectV2Input{ PutObjectBasicInput: tos.PutObjectBasicInput{ - Bucket: t.bucket, - Key: key, + Bucket: t.bucket, + Key: key, + StorageClass: enum.StorageClassType(t.sc), }, Content: in, }) return err } -func (t tosClient) Delete(key string) error { +func (t *tosClient) Delete(key string) error { _, err := t.client.DeleteObjectV2(context.Background(), &tos.DeleteObjectV2Input{ Bucket: t.bucket, Key: key, @@ -99,7 +102,7 @@ func (t tosClient) Delete(key string) error { return err } -func (t tosClient) Head(key string) (Object, error) { +func (t *tosClient) Head(key string) (Object, error) { head, err := t.client.HeadObjectV2(context.Background(), &tos.HeadObjectV2Input{Bucket: t.bucket, Key: key}) if err != nil { @@ -115,10 +118,11 @@ func (t tosClient) Head(key string) (Object, error) { head.ContentLength, head.LastModified, strings.HasSuffix(key, "/"), + string(head.StorageClass), }, err } -func (t tosClient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { +func (t *tosClient) List(prefix, marker, delimiter string, limit int64) ([]Object, error) { resp, err := t.client.ListObjectsV2(context.Background(), &tos.ListObjectsV2Input{ Bucket: t.bucket, ListObjectsInput: tos.ListObjectsInput{ @@ -143,25 +147,27 @@ func (t tosClient) List(prefix, marker, delimiter string, limit int64) ([]Object o.Size, o.LastModified, strings.HasSuffix(o.Key, "/"), + string(o.StorageClass), } } if delimiter != "" { for _, p := range resp.CommonPrefixes { - objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true}) + objs = append(objs, &obj{p.Prefix, 0, time.Unix(0, 0), true, ""}) } sort.Slice(objs, func(i, j int) bool { return objs[i].Key() < objs[j].Key() }) } return objs, nil } -func (t tosClient) ListAll(prefix, marker string) (<-chan Object, error) { +func (t *tosClient) ListAll(prefix, marker string) (<-chan Object, error) { return nil, notSupported } -func (t tosClient) CreateMultipartUpload(key string) (*MultipartUpload, error) { +func (t *tosClient) CreateMultipartUpload(key string) (*MultipartUpload, error) { resp, err := t.client.CreateMultipartUploadV2(context.Background(), &tos.CreateMultipartUploadV2Input{ - Bucket: t.bucket, - Key: key, + Bucket: t.bucket, + Key: key, + StorageClass: enum.StorageClassType(t.sc), }) if err != nil { return nil, err @@ -169,7 +175,7 @@ func (t tosClient) CreateMultipartUpload(key string) (*MultipartUpload, error) { return 
&MultipartUpload{UploadID: resp.UploadID, MinPartSize: 5 << 20, MaxCount: 10000}, nil } -func (t tosClient) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) { +func (t *tosClient) UploadPart(key string, uploadID string, num int, body []byte) (*Part, error) { resp, err := t.client.UploadPartV2(context.Background(), &tos.UploadPartV2Input{ UploadPartBasicInput: tos.UploadPartBasicInput{ Bucket: t.bucket, @@ -185,7 +191,7 @@ func (t tosClient) UploadPart(key string, uploadID string, num int, body []byte) return &Part{Num: num, ETag: resp.ETag}, nil } -func (t tosClient) UploadPartCopy(key string, uploadID string, num int, srcKey string, off, size int64) (*Part, error) { +func (t *tosClient) UploadPartCopy(key string, uploadID string, num int, srcKey string, off, size int64) (*Part, error) { resp, err := t.client.UploadPartCopyV2(context.Background(), &tos.UploadPartCopyV2Input{ Bucket: t.bucket, Key: key, @@ -202,7 +208,7 @@ func (t tosClient) UploadPartCopy(key string, uploadID string, num int, srcKey s return &Part{Num: num, ETag: resp.ETag}, nil } -func (t tosClient) AbortUpload(key string, uploadID string) { +func (t *tosClient) AbortUpload(key string, uploadID string) { _, _ = t.client.AbortMultipartUpload(context.Background(), &tos.AbortMultipartUploadInput{ Bucket: t.bucket, Key: key, @@ -210,7 +216,7 @@ func (t tosClient) AbortUpload(key string, uploadID string) { }) } -func (t tosClient) CompleteUpload(key string, uploadID string, parts []*Part) error { +func (t *tosClient) CompleteUpload(key string, uploadID string, parts []*Part) error { var tosParts []tos.UploadedPartV2 for i := range parts { tosParts = append(tosParts, tos.UploadedPartV2{ETag: parts[i].ETag, PartNumber: parts[i].Num}) @@ -224,7 +230,7 @@ func (t tosClient) CompleteUpload(key string, uploadID string, parts []*Part) er return err } -func (t tosClient) ListUploads(marker string) ([]*PendingPart, string, error) { +func (t *tosClient) ListUploads(marker string) ([]*PendingPart, string, error) { result, err := t.client.ListMultipartUploadsV2(context.Background(), &tos.ListMultipartUploadsV2Input{Bucket: t.bucket}) if err != nil { @@ -241,16 +247,21 @@ func (t tosClient) ListUploads(marker string) ([]*PendingPart, string, error) { return parts, nextMarker, nil } -func (t tosClient) Copy(dst, src string) error { +func (t *tosClient) Copy(dst, src string) error { _, err := t.client.CopyObject(context.Background(), &tos.CopyObjectInput{ - SrcBucket: t.bucket, - Bucket: t.bucket, - SrcKey: src, - Key: dst, + SrcBucket: t.bucket, + Bucket: t.bucket, + SrcKey: src, + Key: dst, + StorageClass: enum.StorageClassType(t.sc), }) return err } +func (t *tosClient) SetStorageClass(sc string) { + t.sc = sc +} + func newTOS(endpoint, accessKey, secretKey, token string) (ObjectStorage, error) { if !strings.Contains(endpoint, "://") { endpoint = fmt.Sprintf("https://%s", endpoint) diff --git a/pkg/object/ufile.go b/pkg/object/ufile.go index 0ae401ea5797..65e86846fc5b 100644 --- a/pkg/object/ufile.go +++ b/pkg/object/ufile.go @@ -222,7 +222,7 @@ func (u *ufile) List(prefix, marker, delimiter string, limit int64) ([]Object, e } objs := make([]Object, len(out.DataSet)) for i, item := range out.DataSet { - objs[i] = &obj{item.FileName, item.Size, time.Unix(int64(item.ModifyTime), 0), strings.HasSuffix(item.FileName, "/")} + objs[i] = &obj{item.FileName, item.Size, time.Unix(int64(item.ModifyTime), 0), strings.HasSuffix(item.FileName, "/"), ""} } return objs, nil } diff --git a/pkg/object/upyun.go 
b/pkg/object/upyun.go index 582be87f1c55..1858fc889276 100644 --- a/pkg/object/upyun.go +++ b/pkg/object/upyun.go @@ -58,6 +58,7 @@ func (u *up) Head(key string) (Object, error) { info.Size, info.Time, strings.HasSuffix(key, "/"), + "", }, nil } @@ -121,7 +122,7 @@ func (u *up) List(prefix, marker, delimiter string, limit int64) ([]Object, erro } key := prefix + "/" + fi.Name if !fi.IsDir && key > marker { - objs = append(objs, &obj{key, fi.Size, fi.Time, strings.HasSuffix(key, "/")}) + objs = append(objs, &obj{key, fi.Size, fi.Time, strings.HasSuffix(key, "/"), ""}) } } if len(objs) > 0 { diff --git a/pkg/object/wasabi.go b/pkg/object/wasabi.go index a9306b767d4a..9342dfb22d66 100644 --- a/pkg/object/wasabi.go +++ b/pkg/object/wasabi.go @@ -66,7 +66,7 @@ func newWasabi(endpoint, accessKey, secretKey, token string) (ObjectStorage, err return nil, fmt.Errorf("aws session: %s", err) } ses.Handlers.Build.PushFront(disableSha256Func) - return &wasabi{s3client{bucket, s3.New(ses), ses}}, nil + return &wasabi{s3client{bucket: bucket, s3: s3.New(ses), ses: ses}}, nil } func init() { diff --git a/pkg/object/webdav.go b/pkg/object/webdav.go index eb12ea3f39e1..d588cc0c5787 100644 --- a/pkg/object/webdav.go +++ b/pkg/object/webdav.go @@ -60,6 +60,7 @@ func (w *webdav) Head(key string) (Object, error) { info.Size(), info.ModTime(), info.IsDir(), + "", }, nil } @@ -262,6 +263,7 @@ func (w *webdav) ListAll(prefix, marker string) (<-chan Object, error) { info.Size(), info.ModTime(), false, + "", } return nil }) diff --git a/pkg/sync/cluster_test.go b/pkg/sync/cluster_test.go index 6464d75258d9..cde0fcea26a8 100644 --- a/pkg/sync/cluster_test.go +++ b/pkg/sync/cluster_test.go @@ -30,11 +30,12 @@ type obj struct { isSymlink bool } -func (o *obj) Key() string { return o.key } -func (o *obj) Size() int64 { return o.size } -func (o *obj) Mtime() time.Time { return o.mtime } -func (o *obj) IsDir() bool { return o.isDir } -func (o *obj) IsSymlink() bool { return o.isSymlink } +func (o *obj) Key() string { return o.key } +func (o *obj) Size() int64 { return o.size } +func (o *obj) Mtime() time.Time { return o.mtime } +func (o *obj) IsDir() bool { return o.isDir } +func (o *obj) IsSymlink() bool { return o.isSymlink } +func (o *obj) StorageClass() string { return "" } func TestCluster(t *testing.T) { // manager diff --git a/pkg/sync/config.go b/pkg/sync/config.go index 9cc9ba441c23..49bb379ad449 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -24,30 +24,31 @@ import ( ) type Config struct { - Start string - End string - Threads int - HTTPPort int - Update bool - ForceUpdate bool - Perms bool - Dry bool - DeleteSrc bool - DeleteDst bool - Dirs bool - Exclude []string - Include []string - Links bool - Limit int64 - Manager string - Workers []string - BWLimit int - NoHTTPS bool - Verbose bool - Quiet bool - CheckAll bool - CheckNew bool - Env map[string]string + StorageClass string + Start string + End string + Threads int + HTTPPort int + Update bool + ForceUpdate bool + Perms bool + Dry bool + DeleteSrc bool + DeleteDst bool + Dirs bool + Exclude []string + Include []string + Links bool + Limit int64 + Manager string + Workers []string + BWLimit int + NoHTTPS bool + Verbose bool + Quiet bool + CheckAll bool + CheckNew bool + Env map[string]string } func envList() []string {
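
Editor's note: the remaining hunks follow two threads. First, every `&obj{...}` literal grows a fifth positional value because `Object` implementations now expose a `StorageClass() string` accessor (see the cluster_test.go stub); backends that cannot report a class pass `""`. Second, `sync.Config` gains a `StorageClass` field so the destination's class can be configured per sync run. The sketch below reconstructs the shape those five-field literals imply; the field names are guesses, not the actual pkg/object source.

```go
// Sketch of the shared value type implied by literals such as
// &obj{key, size, mtime, isDir, ""} in this patch. Field names are assumed.
package object

import "time"

type obj struct {
	key          string
	size         int64
	mtime        time.Time
	isDir        bool
	storageClass string // "" for backends that do not report a class
}

func (o *obj) Key() string          { return o.key }
func (o *obj) Size() int64          { return o.size }
func (o *obj) Mtime() time.Time     { return o.mtime }
func (o *obj) IsDir() bool          { return o.isDir }
func (o *obj) StorageClass() string { return o.storageClass }
```
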