From b0a2ba77b3fe7487c92a42a14d812ab174999d43 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Mon, 29 May 2023 17:57:14 +0800 Subject: [PATCH 1/7] Support hdfs://namenode:port for hdfs object storage --- pkg/object/hdfs.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index fc1b7e143fd9..1e0e981ddaca 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -325,12 +325,12 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { confParam := "dfs.namenode.rpc-address." + nameservice for key, value := range conf { if key == confParam || - strings.HasPrefix(key, confParam + "." ) { + strings.HasPrefix(key, confParam+".") { nns = append(nns, value) } } - if len(nns) <= 0 { - return nil, fmt.Errorf("invalid nameservice: %s", nameservice) + if len(nns) == 0 { + nns = append(nns, nameservice) } // e.g. nn1.example.com:8020,nn2.example.com:8020, nn1.example.com:8020,nn2.example.com:8020/user/juicefs rpcAddr = strings.Join(nns, ",") From 246f0044800b78c4858bf8eb4e480230cd31c340 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 30 May 2023 12:16:17 +0800 Subject: [PATCH 2/7] Support hdfs://namenode:port for hdfs object storage --- docs/en/guide/how_to_set_up_object_storage.md | 9 ++- .../guide/how_to_set_up_object_storage.md | 10 ++- pkg/object/hdfs.go | 62 ++++++++++--------- pkg/object/object_storage_test.go | 32 ++++++++++ 4 files changed, 82 insertions(+), 31 deletions(-) diff --git a/docs/en/guide/how_to_set_up_object_storage.md b/docs/en/guide/how_to_set_up_object_storage.md index 2b6846bc9ce9..b9bf7465bab1 100644 --- a/docs/en/guide/how_to_set_up_object_storage.md +++ b/docs/en/guide/how_to_set_up_object_storage.md @@ -897,9 +897,14 @@ When `--access-key` is not specified on formatting, JuiceFS will use the current JuiceFS will try to load configurations for HDFS client based on `$HADOOP_CONF_DIR` or `$HADOOP_HOME`. If an empty value is provided to `--bucket`, the default HDFS found in Hadoop configurations will be used. -For HA cluster, the addresses of NameNodes can be specified together like this: `--bucket=namenode1:port,namenode2:port`. +bucket format: -By default, data is stored on the subdirectory of `/`. You can specify `--bucket` including the path like `--bucket=namenode1:port,namenode2:port/user/juicefs` +- [hdfs://]namenode:port[/path] + +for HA cluster: + +- [hdfs://]namenode1:port,namenode2:port[/path] +- [hdfs://]nameservice[/path] ## Apache Ozone diff --git a/docs/zh_cn/guide/how_to_set_up_object_storage.md b/docs/zh_cn/guide/how_to_set_up_object_storage.md index d20d4bcc12bc..6d2ee0186ead 100644 --- a/docs/zh_cn/guide/how_to_set_up_object_storage.md +++ b/docs/zh_cn/guide/how_to_set_up_object_storage.md @@ -895,7 +895,15 @@ juicefs format \ JuiceFS 会尝试基于 `$HADOOP_CONF_DIR` 或 `$HADOOP_HOME` 为 HDFS 客户端加载配置。如果 `--bucket` 选项留空,将使用在 Hadoop 配置中找到的默认 HDFS。 -对于 HA 群集,可以像下面这样一起指定 NameNodes 的地址:`--bucket=namenode1:port,namenode2:port`。 +bucket 参数支持格式如下: + +- [hdfs://]namenode:port[/path] + +对于 HA 集群,bucket 参数可以: + +- [hdfs://]namenode1:port,namenode2:port[/path] +- [hdfs://]nameservice[/path] + ## Apache Ozone diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index 1e0e981ddaca..6f93d9b8072c 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -314,40 +314,19 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { return nil, fmt.Errorf("Problem loading configuration: %s", err) } - rpcAddr := addr + //rpcAddr := addr // addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs // convert the nameservice as a comma separated list of host:port by referencing hadoop conf - if strings.HasPrefix(addr, "hdfs://") { - sp := strings.SplitN(addr[len("hdfs://"):], "/", 2) - nameservice := sp[0] - - var nns []string - confParam := "dfs.namenode.rpc-address." + nameservice - for key, value := range conf { - if key == confParam || - strings.HasPrefix(key, confParam+".") { - nns = append(nns, value) - } - } - if len(nns) == 0 { - nns = append(nns, nameservice) - } - // e.g. nn1.example.com:8020,nn2.example.com:8020, nn1.example.com:8020,nn2.example.com:8020/user/juicefs - rpcAddr = strings.Join(nns, ",") - if len(sp) > 1 { - rpcAddr = rpcAddr + "/" + sp[1] - } - } + rpcAddr, basePath := parseAddr(addr, conf) - basePath := "/" options := hdfs.ClientOptionsFromConf(conf) - if rpcAddr != "" { + if addr != "" { // nn1.example.com:8020,nn2.example.com:8020/user/juicefs - sp := strings.SplitN(rpcAddr, "/", 2) + sp := strings.SplitN(addr, "/", 2) if len(sp) > 1 { basePath = basePath + strings.TrimRight(sp[1], "/") + "/" } - options.Addresses = strings.Split(sp[0], ",") + options.Addresses = rpcAddr logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath) } @@ -381,14 +360,41 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { supergroup = os.Getenv("HADOOP_SUPER_GROUP") } - var replication int = 3 + var replication = 3 if replication_conf, found := conf["dfs.replication"]; found { if x, err := strconv.Atoi(replication_conf); err == nil { replication = x } } - return &hdfsclient{addr: rpcAddr, c: c, dfsReplication: replication, basePath: basePath}, nil + return &hdfsclient{addr: addr, c: c, dfsReplication: replication, basePath: basePath}, nil +} + +func parseAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) { + addr = strings.TrimPrefix(addr, "hdfs://") + + sp := strings.SplitN(addr, "/", 2) + authority := sp[0] + + // check if it is a nameservice + var nns []string + confParam := "dfs.namenode.rpc-address." + authority + for key, value := range conf { + if key == confParam || + strings.HasPrefix(key, confParam+".") { + nns = append(nns, value) + } + } + if len(nns) > 0 { + rpcAddresses = nns + } else { + rpcAddresses = strings.Split(authority, ",") + } + basePath = "/" + if len(sp) > 1 { + basePath = basePath + strings.TrimRight(sp[1], "/") + "/" + } + return } func init() { diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index 41d04576c30e..c073dcf8e40a 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -24,6 +24,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/colinmarc/hdfs/v2/hadoopconf" "io" "math" "os" @@ -728,10 +729,41 @@ func TestHDFS(t *testing.T) { //skip mutate if os.Getenv("HDFS_ADDR") == "" { t.SkipNow() } + + conf := make(hadoopconf.HadoopConf) + conf["dfs.namenode.rpc-address.ns.namenode1"] = "hadoop01:8020" + conf["dfs.namenode.rpc-address.ns.namenode2"] = "hadoop02:8020" + + addresses, basePath := parseAddr("hadoop01:8020", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020"}, "/") + addresses, basePath = parseAddr("hadoop01:8020/user/juicefs/", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020"}, "/user/juicefs/") + addresses, basePath = parseAddr("hdfs://hadoop01:8020/user/juicefs/", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020"}, "/user/juicefs/") + + // for HA + addresses, basePath = parseAddr("hadoop01:8020,hadoop02:8020", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/") + addresses, basePath = parseAddr("hadoop01:8020,hadoop02:8020/user/juicefs/", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + addresses, basePath = parseAddr("hdfs://ns/user/juicefs", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + addresses, basePath = parseAddr("ns/user/juicefs/", conf) + checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + dfs, _ := newHDFS(os.Getenv("HDFS_ADDR"), "", "", "") testStorage(t, dfs) } +func checkResult(t *testing.T, addresses []string, basePath string, expected []string, expectedBasePath string) { + if !reflect.DeepEqual(addresses, expected) { + t.Fatalf("parseAddr for HDFS failed: %v", addresses) + } + if basePath != expectedBasePath { + t.Fatalf("parseAddr for HDFS failed: %v", basePath) + } +} + func TestOOS(t *testing.T) { //skip mutate if os.Getenv("OOS_ACCESS_KEY") == "" { t.SkipNow() From f3469f6d13e190de27c5d1fe068329e848bcb385 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 30 May 2023 13:08:41 +0800 Subject: [PATCH 3/7] fix --- pkg/object/hdfs.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index 6f93d9b8072c..961d26c4cf36 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -321,13 +321,9 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { options := hdfs.ClientOptionsFromConf(conf) if addr != "" { - // nn1.example.com:8020,nn2.example.com:8020/user/juicefs - sp := strings.SplitN(addr, "/", 2) - if len(sp) > 1 { - basePath = basePath + strings.TrimRight(sp[1], "/") + "/" - } options.Addresses = rpcAddr - logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath) + logger.Infof("HDFS Addresses: %s, basePath: %s", rpcAddr, basePath) + } if options.KerberosClient != nil { @@ -367,7 +363,7 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { } } - return &hdfsclient{addr: addr, c: c, dfsReplication: replication, basePath: basePath}, nil + return &hdfsclient{addr: strings.TrimSuffix(strings.Join(rpcAddr, ",")+basePath, "/"), c: c, dfsReplication: replication, basePath: basePath}, nil } func parseAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) { From b645360e9f03726a5cbe3c279134aa04ad75c9c7 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 30 May 2023 13:09:37 +0800 Subject: [PATCH 4/7] clean --- pkg/object/hdfs.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index 961d26c4cf36..38c154a83b18 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -314,7 +314,6 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { return nil, fmt.Errorf("Problem loading configuration: %s", err) } - //rpcAddr := addr // addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs // convert the nameservice as a comma separated list of host:port by referencing hadoop conf rpcAddr, basePath := parseAddr(addr, conf) From b8bbe4850473ebe610ae894df39b5a6da07722b4 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 30 May 2023 13:12:54 +0800 Subject: [PATCH 5/7] fix doc --- docs/en/guide/how_to_set_up_object_storage.md | 6 +++--- docs/zh_cn/guide/how_to_set_up_object_storage.md | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/en/guide/how_to_set_up_object_storage.md b/docs/en/guide/how_to_set_up_object_storage.md index b9bf7465bab1..2971178e19cf 100644 --- a/docs/en/guide/how_to_set_up_object_storage.md +++ b/docs/en/guide/how_to_set_up_object_storage.md @@ -899,12 +899,12 @@ JuiceFS will try to load configurations for HDFS client based on `$HADOOP_CONF_D bucket format: -- [hdfs://]namenode:port[/path] +- `[hdfs://]namenode:port[/path]` for HA cluster: -- [hdfs://]namenode1:port,namenode2:port[/path] -- [hdfs://]nameservice[/path] +- `[hdfs://]namenode1:port,namenode2:port[/path]` +- `[hdfs://]nameservice[/path]` ## Apache Ozone diff --git a/docs/zh_cn/guide/how_to_set_up_object_storage.md b/docs/zh_cn/guide/how_to_set_up_object_storage.md index 6d2ee0186ead..721b7da5057b 100644 --- a/docs/zh_cn/guide/how_to_set_up_object_storage.md +++ b/docs/zh_cn/guide/how_to_set_up_object_storage.md @@ -897,13 +897,12 @@ JuiceFS 会尝试基于 `$HADOOP_CONF_DIR` 或 `$HADOOP_HOME` 为 HDFS 客户端 bucket 参数支持格式如下: -- [hdfs://]namenode:port[/path] +- `[hdfs://]namenode:port[/path]` 对于 HA 集群,bucket 参数可以: -- [hdfs://]namenode1:port,namenode2:port[/path] -- [hdfs://]nameservice[/path] - +- `[hdfs://]namenode1:port,namenode2:port[/path]` +- `[hdfs://]nameservice[/path]` ## Apache Ozone From 617699cf86702a6bd05a6f176287d19b3290fed8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 30 May 2023 16:36:58 +0800 Subject: [PATCH 6/7] cleanup --- pkg/object/hdfs.go | 18 +++++------ pkg/object/object_storage_test.go | 52 ++++++++++++++----------------- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index 38c154a83b18..d0e7434016f0 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -314,15 +314,11 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { return nil, fmt.Errorf("Problem loading configuration: %s", err) } - // addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs - // convert the nameservice as a comma separated list of host:port by referencing hadoop conf - rpcAddr, basePath := parseAddr(addr, conf) - + rpcAddr, basePath := parseHDFSAddr(addr, conf) options := hdfs.ClientOptionsFromConf(conf) if addr != "" { options.Addresses = rpcAddr logger.Infof("HDFS Addresses: %s, basePath: %s", rpcAddr, basePath) - } if options.KerberosClient != nil { @@ -365,9 +361,10 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { return &hdfsclient{addr: strings.TrimSuffix(strings.Join(rpcAddr, ",")+basePath, "/"), c: c, dfsReplication: replication, basePath: basePath}, nil } -func parseAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) { +// addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs +// convert the nameservice as a comma separated list of host:port by referencing hadoop conf +func parseHDFSAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) { addr = strings.TrimPrefix(addr, "hdfs://") - sp := strings.SplitN(addr, "/", 2) authority := sp[0] @@ -375,8 +372,7 @@ func parseAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, var nns []string confParam := "dfs.namenode.rpc-address." + authority for key, value := range conf { - if key == confParam || - strings.HasPrefix(key, confParam+".") { + if key == confParam || strings.HasPrefix(key, confParam+".") { nns = append(nns, value) } } @@ -386,8 +382,8 @@ func parseAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, rpcAddresses = strings.Split(authority, ",") } basePath = "/" - if len(sp) > 1 { - basePath = basePath + strings.TrimRight(sp[1], "/") + "/" + if len(sp) > 1 && len(sp[1]) > 0 { + basePath += strings.TrimRight(sp[1], "/") + "/" } return } diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index c073dcf8e40a..947e6e51ce35 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -24,7 +24,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/colinmarc/hdfs/v2/hadoopconf" "io" "math" "os" @@ -36,6 +35,8 @@ import ( "testing" "time" + "github.com/colinmarc/hdfs/v2/hadoopconf" + blob2 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" @@ -726,44 +727,39 @@ func TestOBS(t *testing.T) { //skip mutate } func TestHDFS(t *testing.T) { //skip mutate - if os.Getenv("HDFS_ADDR") == "" { - t.SkipNow() - } - conf := make(hadoopconf.HadoopConf) conf["dfs.namenode.rpc-address.ns.namenode1"] = "hadoop01:8020" conf["dfs.namenode.rpc-address.ns.namenode2"] = "hadoop02:8020" - addresses, basePath := parseAddr("hadoop01:8020", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020"}, "/") - addresses, basePath = parseAddr("hadoop01:8020/user/juicefs/", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020"}, "/user/juicefs/") - addresses, basePath = parseAddr("hdfs://hadoop01:8020/user/juicefs/", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020"}, "/user/juicefs/") + checkAddr := func(addr string, expected []string, base string) { + addresses, basePath := parseHDFSAddr(addr, conf) + if !reflect.DeepEqual(addresses, expected) { + t.Fatalf("expected addrs is %+v but got %+v from %s", expected, addresses, addr) + } + if basePath != base { + t.Fatalf("expected path is %s but got %s from %s", base, basePath, addr) + } + } + + checkAddr("hadoop01:8020", []string{"hadoop01:8020"}, "/") + checkAddr("hdfs://hadoop01:8020/", []string{"hadoop01:8020"}, "/") + checkAddr("hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/") + checkAddr("hadoop01:8020/user/juicefs", []string{"hadoop01:8020"}, "/user/juicefs/") + checkAddr("hdfs://hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/") // for HA - addresses, basePath = parseAddr("hadoop01:8020,hadoop02:8020", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/") - addresses, basePath = parseAddr("hadoop01:8020,hadoop02:8020/user/juicefs/", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") - addresses, basePath = parseAddr("hdfs://ns/user/juicefs", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") - addresses, basePath = parseAddr("ns/user/juicefs/", conf) - checkResult(t, addresses, basePath, []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + checkAddr("hadoop01:8020,hadoop02:8020", []string{"hadoop01:8020", "hadoop02:8020"}, "/") + checkAddr("hadoop01:8020,hadoop02:8020/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + checkAddr("hdfs://ns/user/juicefs", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + checkAddr("ns/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + if os.Getenv("HDFS_ADDR") == "" { + t.SkipNow() + } dfs, _ := newHDFS(os.Getenv("HDFS_ADDR"), "", "", "") testStorage(t, dfs) } -func checkResult(t *testing.T, addresses []string, basePath string, expected []string, expectedBasePath string) { - if !reflect.DeepEqual(addresses, expected) { - t.Fatalf("parseAddr for HDFS failed: %v", addresses) - } - if basePath != expectedBasePath { - t.Fatalf("parseAddr for HDFS failed: %v", basePath) - } -} - func TestOOS(t *testing.T) { //skip mutate if os.Getenv("OOS_ACCESS_KEY") == "" { t.SkipNow() From d1ad2cfd14ead517ba0518eb6d506b4f54c31cf8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 30 May 2023 16:42:11 +0800 Subject: [PATCH 7/7] cleanup --- pkg/object/hdfs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index d0e7434016f0..58f138485def 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -48,7 +48,7 @@ type hdfsclient struct { } func (h *hdfsclient) String() string { - return fmt.Sprintf("hdfs://%s/", h.addr) + return fmt.Sprintf("hdfs://%s%s", h.addr, h.basePath) } func (h *hdfsclient) path(key string) string { @@ -358,7 +358,7 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { } } - return &hdfsclient{addr: strings.TrimSuffix(strings.Join(rpcAddr, ",")+basePath, "/"), c: c, dfsReplication: replication, basePath: basePath}, nil + return &hdfsclient{addr: strings.Join(rpcAddr, ","), c: c, dfsReplication: replication, basePath: basePath}, nil } // addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs