From d99abda2acab9909ce194c6c4804479a8235ace7 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Tue, 30 May 2023 17:23:48 +0800 Subject: [PATCH] Support hdfs://namenode:port for hdfs object storage (#3713) --- docs/en/guide/how_to_set_up_object_storage.md | 9 ++- .../guide/how_to_set_up_object_storage.md | 9 ++- pkg/object/hdfs.go | 71 +++++++++---------- pkg/object/object_storage_test.go | 28 ++++++++ 4 files changed, 77 insertions(+), 40 deletions(-) diff --git a/docs/en/guide/how_to_set_up_object_storage.md b/docs/en/guide/how_to_set_up_object_storage.md index 2b6846bc9ce9..2971178e19cf 100644 --- a/docs/en/guide/how_to_set_up_object_storage.md +++ b/docs/en/guide/how_to_set_up_object_storage.md @@ -897,9 +897,14 @@ When `--access-key` is not specified on formatting, JuiceFS will use the current JuiceFS will try to load configurations for HDFS client based on `$HADOOP_CONF_DIR` or `$HADOOP_HOME`. If an empty value is provided to `--bucket`, the default HDFS found in Hadoop configurations will be used. -For HA cluster, the addresses of NameNodes can be specified together like this: `--bucket=namenode1:port,namenode2:port`. +The `--bucket` option accepts the following formats: -By default, data is stored on the subdirectory of `/`.
You can specify `--bucket` including the path like `--bucket=namenode1:port,namenode2:port/user/juicefs` +- `[hdfs://]namenode:port[/path]` + +For HA clusters: + +- `[hdfs://]namenode1:port,namenode2:port[/path]` +- `[hdfs://]nameservice[/path]` ## Apache Ozone diff --git a/docs/zh_cn/guide/how_to_set_up_object_storage.md b/docs/zh_cn/guide/how_to_set_up_object_storage.md index d20d4bcc12bc..721b7da5057b 100644 --- a/docs/zh_cn/guide/how_to_set_up_object_storage.md +++ b/docs/zh_cn/guide/how_to_set_up_object_storage.md @@ -895,7 +895,14 @@ juicefs format \ JuiceFS 会尝试基于 `$HADOOP_CONF_DIR` 或 `$HADOOP_HOME` 为 HDFS 客户端加载配置。如果 `--bucket` 选项留空,将使用在 Hadoop 配置中找到的默认 HDFS。 -对于 HA 群集,可以像下面这样一起指定 NameNodes 的地址:`--bucket=namenode1:port,namenode2:port`。 +bucket 参数支持格式如下: + +- `[hdfs://]namenode:port[/path]` + +对于 HA 集群,bucket 参数支持格式如下: + +- `[hdfs://]namenode1:port,namenode2:port[/path]` +- `[hdfs://]nameservice[/path]` ## Apache Ozone diff --git a/pkg/object/hdfs.go b/pkg/object/hdfs.go index fc1b7e143fd9..58f138485def 100644 --- a/pkg/object/hdfs.go +++ b/pkg/object/hdfs.go @@ -48,7 +48,7 @@ type hdfsclient struct { } func (h *hdfsclient) String() string { - return fmt.Sprintf("hdfs://%s/", h.addr) + return fmt.Sprintf("hdfs://%s%s", h.addr, h.basePath) } func (h *hdfsclient) path(key string) string { @@ -314,41 +314,11 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { return nil, fmt.Errorf("Problem loading configuration: %s", err) } - rpcAddr := addr - // addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs - // convert the nameservice as a comma separated list of host:port by referencing hadoop conf - if strings.HasPrefix(addr, "hdfs://") { - sp := strings.SplitN(addr[len("hdfs://"):], "/", 2) - nameservice := sp[0] - - var nns []string - confParam := "dfs.namenode.rpc-address." + nameservice - for key, value := range conf { - if key == confParam || - strings.HasPrefix(key, confParam + "."
) { - nns = append(nns, value) - } - } - if len(nns) <= 0 { - return nil, fmt.Errorf("invalid nameservice: %s", nameservice) - } - // e.g. nn1.example.com:8020,nn2.example.com:8020, nn1.example.com:8020,nn2.example.com:8020/user/juicefs - rpcAddr = strings.Join(nns, ",") - if len(sp) > 1 { - rpcAddr = rpcAddr + "/" + sp[1] - } - } - - basePath := "/" + rpcAddr, basePath := parseHDFSAddr(addr, conf) options := hdfs.ClientOptionsFromConf(conf) - if rpcAddr != "" { - // nn1.example.com:8020,nn2.example.com:8020/user/juicefs - sp := strings.SplitN(rpcAddr, "/", 2) - if len(sp) > 1 { - basePath = basePath + strings.TrimRight(sp[1], "/") + "/" - } - options.Addresses = strings.Split(sp[0], ",") - logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath) + if addr != "" { + options.Addresses = rpcAddr + logger.Infof("HDFS Addresses: %s, basePath: %s", rpcAddr, basePath) } if options.KerberosClient != nil { @@ -381,14 +351,41 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) { supergroup = os.Getenv("HADOOP_SUPER_GROUP") } - var replication int = 3 + var replication = 3 if replication_conf, found := conf["dfs.replication"]; found { if x, err := strconv.Atoi(replication_conf); err == nil { replication = x } } - return &hdfsclient{addr: rpcAddr, c: c, dfsReplication: replication, basePath: basePath}, nil + return &hdfsclient{addr: strings.Join(rpcAddr, ","), c: c, dfsReplication: replication, basePath: basePath}, nil +} + +// parseHDFSAddr splits addr, given as [hdfs://]authority[/path], into NameNode RPC addresses and a base path that always starts and ends with "/". +// If conf has dfs.namenode.rpc-address.<authority> entries, authority is treated as a nameservice and their values are returned in conf iteration order (not sorted); otherwise authority is split on commas. +func parseHDFSAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) { + addr = strings.TrimPrefix(addr, "hdfs://") + sp := strings.SplitN(addr, "/", 2) + authority := sp[0] + + // check if it is a nameservice + var nns []string + confParam := "dfs.namenode.rpc-address."
+ authority + for key, value := range conf { + if key == confParam || strings.HasPrefix(key, confParam+".") { + nns = append(nns, value) + } + } + if len(nns) > 0 { + rpcAddresses = nns + } else { + rpcAddresses = strings.Split(authority, ",") + } + basePath = "/" + if len(sp) > 1 && len(sp[1]) > 0 { + basePath += strings.TrimRight(sp[1], "/") + "/" + } + return } func init() { diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index 41d04576c30e..947e6e51ce35 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -35,6 +35,8 @@ import ( "testing" "time" + "github.com/colinmarc/hdfs/v2/hadoopconf" + blob2 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" @@ -725,6 +727,32 @@ func TestOBS(t *testing.T) { //skip mutate } func TestHDFS(t *testing.T) { //skip mutate + conf := make(hadoopconf.HadoopConf) + conf["dfs.namenode.rpc-address.ns.namenode1"] = "hadoop01:8020" + conf["dfs.namenode.rpc-address.ns.namenode2"] = "hadoop02:8020" + + checkAddr := func(addr string, expected []string, base string) { + addresses, basePath := parseHDFSAddr(addr, conf) + if !reflect.DeepEqual(addresses, expected) { + t.Fatalf("expected addrs is %+v but got %+v from %s", expected, addresses, addr) + } + if basePath != base { + t.Fatalf("expected path is %s but got %s from %s", base, basePath, addr) + } + } + + checkAddr("hadoop01:8020", []string{"hadoop01:8020"}, "/") + checkAddr("hdfs://hadoop01:8020/", []string{"hadoop01:8020"}, "/") + checkAddr("hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/") + checkAddr("hadoop01:8020/user/juicefs", []string{"hadoop01:8020"}, "/user/juicefs/") + checkAddr("hdfs://hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/") + + // for HA + checkAddr("hadoop01:8020,hadoop02:8020", []string{"hadoop01:8020", "hadoop02:8020"}, "/") + checkAddr("hadoop01:8020,hadoop02:8020/user/juicefs/", 
[]string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + checkAddr("hdfs://ns/user/juicefs", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + checkAddr("ns/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/") + if os.Getenv("HDFS_ADDR") == "" { t.SkipNow() }