Support hdfs://namenode:port for hdfs object storage (#3713)
tangyoupeng authored May 30, 2023
1 parent 1d0c9d0 commit d99abda
Showing 4 changed files with 77 additions and 40 deletions.
9 changes: 7 additions & 2 deletions docs/en/guide/how_to_set_up_object_storage.md
@@ -897,9 +897,14 @@ When `--access-key` is not specified on formatting, JuiceFS will use the current

JuiceFS will try to load configurations for the HDFS client based on `$HADOOP_CONF_DIR` or `$HADOOP_HOME`. If an empty value is provided to `--bucket`, the default HDFS found in the Hadoop configurations will be used.

-For HA cluster, the addresses of NameNodes can be specified together like this: `--bucket=namenode1:port,namenode2:port`.
-
-By default, data is stored on the subdirectory of `/`. You can specify `--bucket` including the path like `--bucket=namenode1:port,namenode2:port/user/juicefs`
+Bucket format:
+
+- `[hdfs://]namenode:port[/path]`
+
+For an HA cluster:
+
+- `[hdfs://]namenode1:port,namenode2:port[/path]`
+- `[hdfs://]nameservice[/path]`
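For example (hostnames and paths here are illustrative, not from the commit): `--bucket=hdfs://hadoop01:8020/user/juicefs` stores data under `/user/juicefs` on a single NameNode, while `--bucket=hdfs://ns/user/juicefs` names a nameservice `ns` whose NameNode addresses are resolved from the `dfs.namenode.rpc-address.ns.*` entries in the Hadoop configuration.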

## Apache Ozone

9 changes: 8 additions & 1 deletion docs/zh_cn/guide/how_to_set_up_object_storage.md
@@ -895,7 +895,14 @@ juicefs format \

JuiceFS will try to load configurations for the HDFS client based on `$HADOOP_CONF_DIR` or `$HADOOP_HOME`. If the `--bucket` option is left empty, the default HDFS found in the Hadoop configurations will be used.

-For an HA cluster, the addresses of the NameNodes can be specified together like this: `--bucket=namenode1:port,namenode2:port`
+The bucket parameter supports the following formats:
+
+- `[hdfs://]namenode:port[/path]`
+
+For an HA cluster, the bucket parameter can be:
+
+- `[hdfs://]namenode1:port,namenode2:port[/path]`
+- `[hdfs://]nameservice[/path]`

## Apache Ozone

71 changes: 34 additions & 37 deletions pkg/object/hdfs.go
@@ -48,7 +48,7 @@ type hdfsclient struct {
}

func (h *hdfsclient) String() string {
-	return fmt.Sprintf("hdfs://%s/", h.addr)
+	return fmt.Sprintf("hdfs://%s%s", h.addr, h.basePath)
}
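With a base path in play, the client's string form now includes it: a client formatted with the hypothetical bucket `hdfs://hadoop01:8020/user/juicefs` would render as `hdfs://hadoop01:8020/user/juicefs/` rather than the bare `hdfs://hadoop01:8020/`.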

func (h *hdfsclient) path(key string) string {
@@ -314,41 +314,11 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
		return nil, fmt.Errorf("Problem loading configuration: %s", err)
	}

-	rpcAddr := addr
-	// addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs
-	// convert the nameservice as a comma separated list of host:port by referencing hadoop conf
-	if strings.HasPrefix(addr, "hdfs://") {
-		sp := strings.SplitN(addr[len("hdfs://"):], "/", 2)
-		nameservice := sp[0]
-
-		var nns []string
-		confParam := "dfs.namenode.rpc-address." + nameservice
-		for key, value := range conf {
-			if key == confParam ||
-				strings.HasPrefix(key, confParam+".") {
-				nns = append(nns, value)
-			}
-		}
-		if len(nns) <= 0 {
-			return nil, fmt.Errorf("invalid nameservice: %s", nameservice)
-		}
-		// e.g. nn1.example.com:8020,nn2.example.com:8020, nn1.example.com:8020,nn2.example.com:8020/user/juicefs
-		rpcAddr = strings.Join(nns, ",")
-		if len(sp) > 1 {
-			rpcAddr = rpcAddr + "/" + sp[1]
-		}
-	}
-
-	basePath := "/"
+	rpcAddr, basePath := parseHDFSAddr(addr, conf)
	options := hdfs.ClientOptionsFromConf(conf)
-	if rpcAddr != "" {
-		// nn1.example.com:8020,nn2.example.com:8020/user/juicefs
-		sp := strings.SplitN(rpcAddr, "/", 2)
-		if len(sp) > 1 {
-			basePath = basePath + strings.TrimRight(sp[1], "/") + "/"
-		}
-		options.Addresses = strings.Split(sp[0], ",")
-		logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath)
+	if addr != "" {
+		options.Addresses = rpcAddr
+		logger.Infof("HDFS Addresses: %s, basePath: %s", rpcAddr, basePath)
	}

	if options.KerberosClient != nil {
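A minimal sketch of the new wiring, assuming the `parseHDFSAddr` helper introduced later in this commit is in scope; the function name `exampleNewHDFSClient` and the bucket value are hypothetical, and Kerberos/user setup is omitted:

```go
package object

import (
	"log"

	"github.com/colinmarc/hdfs/v2"
	"github.com/colinmarc/hdfs/v2/hadoopconf"
)

func exampleNewHDFSClient() (*hdfs.Client, string) {
	// Load core-site.xml/hdfs-site.xml from $HADOOP_CONF_DIR or $HADOOP_HOME.
	conf, err := hadoopconf.LoadFromEnvironment()
	if err != nil {
		log.Fatalf("load hadoop conf: %s", err)
	}
	// Hypothetical bucket value; "ns" resolves via dfs.namenode.rpc-address.ns.* keys.
	rpcAddr, basePath := parseHDFSAddr("hdfs://ns/user/juicefs", conf)
	options := hdfs.ClientOptionsFromConf(conf)
	options.Addresses = rpcAddr
	client, err := hdfs.NewClient(options)
	if err != nil {
		log.Fatalf("connect to HDFS: %s", err)
	}
	return client, basePath
}
```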
@@ -381,14 +351,41 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
		supergroup = os.Getenv("HADOOP_SUPER_GROUP")
	}

-	var replication int = 3
+	var replication = 3
	if replication_conf, found := conf["dfs.replication"]; found {
		if x, err := strconv.Atoi(replication_conf); err == nil {
			replication = x
		}
	}

-	return &hdfsclient{addr: rpcAddr, c: c, dfsReplication: replication, basePath: basePath}, nil
+	return &hdfsclient{addr: strings.Join(rpcAddr, ","), c: c, dfsReplication: replication, basePath: basePath}, nil
}

+// addr can be a plain address or an hdfs:// URI, e.g. hdfs://example or hdfs://example/user/juicefs
+// resolve a nameservice to the list of NameNode host:port addresses by referencing the Hadoop conf
+func parseHDFSAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) {
+	addr = strings.TrimPrefix(addr, "hdfs://")
+	sp := strings.SplitN(addr, "/", 2)
+	authority := sp[0]
+
+	// check if it is a nameservice
+	var nns []string
+	confParam := "dfs.namenode.rpc-address." + authority
+	for key, value := range conf {
+		if key == confParam || strings.HasPrefix(key, confParam+".") {
+			nns = append(nns, value)
+		}
+	}
+	if len(nns) > 0 {
+		rpcAddresses = nns
+	} else {
+		rpcAddresses = strings.Split(authority, ",")
+	}
+	basePath = "/"
+	if len(sp) > 1 && len(sp[1]) > 0 {
+		basePath += strings.TrimRight(sp[1], "/") + "/"
+	}
+	return
+}
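The HA branch above follows Hadoop's convention of listing a nameservice's NameNodes under `dfs.namenode.rpc-address.<nameservice>.<nn-id>` keys. A self-contained sketch of just that lookup step, with illustrative hostnames:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Illustrative hdfs-site.xml entries for a nameservice named "ns".
	conf := map[string]string{
		"dfs.namenode.rpc-address.ns.nn1": "hadoop01:8020",
		"dfs.namenode.rpc-address.ns.nn2": "hadoop02:8020",
	}
	authority := "ns" // the host part of hdfs://ns/user/juicefs
	prefix := "dfs.namenode.rpc-address." + authority
	var nns []string
	for key, value := range conf {
		if key == prefix || strings.HasPrefix(key, prefix+".") {
			nns = append(nns, value)
		}
	}
	sort.Strings(nns) // map iteration order is random; sort for stable output
	fmt.Println(nns)  // [hadoop01:8020 hadoop02:8020]
}
```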

func init() {
28 changes: 28 additions & 0 deletions pkg/object/object_storage_test.go
@@ -35,6 +35,8 @@ import (
"testing"
"time"

"github.com/colinmarc/hdfs/v2/hadoopconf"

blob2 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"

"github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
@@ -725,6 +727,32 @@ func TestOBS(t *testing.T) { //skip mutate
}

func TestHDFS(t *testing.T) { //skip mutate
+	conf := make(hadoopconf.HadoopConf)
+	conf["dfs.namenode.rpc-address.ns.namenode1"] = "hadoop01:8020"
+	conf["dfs.namenode.rpc-address.ns.namenode2"] = "hadoop02:8020"
+
+	checkAddr := func(addr string, expected []string, base string) {
+		addresses, basePath := parseHDFSAddr(addr, conf)
+		if !reflect.DeepEqual(addresses, expected) {
+			t.Fatalf("expected addrs is %+v but got %+v from %s", expected, addresses, addr)
+		}
+		if basePath != base {
+			t.Fatalf("expected path is %s but got %s from %s", base, basePath, addr)
+		}
+	}
+
+	checkAddr("hadoop01:8020", []string{"hadoop01:8020"}, "/")
+	checkAddr("hdfs://hadoop01:8020/", []string{"hadoop01:8020"}, "/")
+	checkAddr("hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/")
+	checkAddr("hadoop01:8020/user/juicefs", []string{"hadoop01:8020"}, "/user/juicefs/")
+	checkAddr("hdfs://hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/")
+
+	// for HA
+	checkAddr("hadoop01:8020,hadoop02:8020", []string{"hadoop01:8020", "hadoop02:8020"}, "/")
+	checkAddr("hadoop01:8020,hadoop02:8020/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/")
+	checkAddr("hdfs://ns/user/juicefs", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/")
+	checkAddr("ns/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/")
+
	if os.Getenv("HDFS_ADDR") == "" {
		t.SkipNow()
	}
