Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support hdfs://namenode:port for hdfs object storage #3713

Merged
merged 7 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions docs/en/guide/how_to_set_up_object_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -897,9 +897,14 @@ When `--access-key` is not specified on formatting, JuiceFS will use the current

JuiceFS will try to load configurations for HDFS client based on `$HADOOP_CONF_DIR` or `$HADOOP_HOME`. If an empty value is provided to `--bucket`, the default HDFS found in Hadoop configurations will be used.

For HA cluster, the addresses of NameNodes can be specified together like this: `--bucket=namenode1:port,namenode2:port`.
The `--bucket` option supports the following formats:

By default, data is stored on the subdirectory of `/`. You can specify `--bucket` including the path like `--bucket=namenode1:port,namenode2:port/user/juicefs`
- `[hdfs://]namenode:port[/path]`

For HA clusters:

- `[hdfs://]namenode1:port,namenode2:port[/path]`
- `[hdfs://]nameservice[/path]`

## Apache Ozone

Expand Down
9 changes: 8 additions & 1 deletion docs/zh_cn/guide/how_to_set_up_object_storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,14 @@ juicefs format \

JuiceFS 会尝试基于 `$HADOOP_CONF_DIR` 或 `$HADOOP_HOME` 为 HDFS 客户端加载配置。如果 `--bucket` 选项留空,将使用在 Hadoop 配置中找到的默认 HDFS。

对于 HA 群集,可以像下面这样一起指定 NameNodes 的地址:`--bucket=namenode1:port,namenode2:port`。
bucket 参数支持格式如下:

- `[hdfs://]namenode:port[/path]`

对于 HA 集群,bucket 参数可以为:

- `[hdfs://]namenode1:port,namenode2:port[/path]`
- `[hdfs://]nameservice[/path]`

## Apache Ozone

Expand Down
71 changes: 34 additions & 37 deletions pkg/object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type hdfsclient struct {
}

// String returns the storage identifier, including the base path so that
// instances rooted at a subdirectory (e.g. hdfs://nn:8020/user/juicefs/)
// are distinguishable from ones rooted at "/".
func (h *hdfsclient) String() string {
	return fmt.Sprintf("hdfs://%s%s", h.addr, h.basePath)
}

func (h *hdfsclient) path(key string) string {
Expand Down Expand Up @@ -314,41 +314,11 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
return nil, fmt.Errorf("Problem loading configuration: %s", err)
}

rpcAddr := addr
// addr can be hdfs://nameservice e.g. hdfs://example, hdfs://example/user/juicefs
// convert the nameservice as a comma separated list of host:port by referencing hadoop conf
if strings.HasPrefix(addr, "hdfs://") {
sp := strings.SplitN(addr[len("hdfs://"):], "/", 2)
nameservice := sp[0]

var nns []string
confParam := "dfs.namenode.rpc-address." + nameservice
for key, value := range conf {
if key == confParam ||
strings.HasPrefix(key, confParam + "." ) {
nns = append(nns, value)
}
}
if len(nns) <= 0 {
return nil, fmt.Errorf("invalid nameservice: %s", nameservice)
}
// e.g. nn1.example.com:8020,nn2.example.com:8020, nn1.example.com:8020,nn2.example.com:8020/user/juicefs
rpcAddr = strings.Join(nns, ",")
if len(sp) > 1 {
rpcAddr = rpcAddr + "/" + sp[1]
}
}

basePath := "/"
rpcAddr, basePath := parseHDFSAddr(addr, conf)
options := hdfs.ClientOptionsFromConf(conf)
if rpcAddr != "" {
// nn1.example.com:8020,nn2.example.com:8020/user/juicefs
sp := strings.SplitN(rpcAddr, "/", 2)
if len(sp) > 1 {
basePath = basePath + strings.TrimRight(sp[1], "/") + "/"
}
options.Addresses = strings.Split(sp[0], ",")
logger.Infof("HDFS Addresses: %s, basePath: %s", sp[0], basePath)
if addr != "" {
options.Addresses = rpcAddr
logger.Infof("HDFS Addresses: %s, basePath: %s", rpcAddr, basePath)
}

if options.KerberosClient != nil {
Expand Down Expand Up @@ -381,14 +351,41 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
supergroup = os.Getenv("HADOOP_SUPER_GROUP")
}

var replication int = 3
var replication = 3
if replication_conf, found := conf["dfs.replication"]; found {
if x, err := strconv.Atoi(replication_conf); err == nil {
replication = x
}
}

return &hdfsclient{addr: rpcAddr, c: c, dfsReplication: replication, basePath: basePath}, nil
return &hdfsclient{addr: strings.Join(rpcAddr, ","), c: c, dfsReplication: replication, basePath: basePath}, nil
}

// parseHDFSAddr resolves addr into the list of NameNode RPC addresses and the
// base path under which all object keys are stored.
//
// addr may take any of these forms (the "hdfs://" scheme prefix is optional):
//   - namenode:port[/path]
//   - namenode1:port,namenode2:port[/path]  (HA, explicit addresses)
//   - nameservice[/path]                    (HA, resolved via Hadoop conf)
//
// When the authority matches a nameservice, it is expanded to the host:port
// values configured under "dfs.namenode.rpc-address.<nameservice>[.<nn>]".
// The returned basePath always starts with "/" and, when a sub path is given,
// ends with "/".
func parseHDFSAddr(addr string, conf hadoopconf.HadoopConf) (rpcAddresses []string, basePath string) {
	addr = strings.TrimPrefix(addr, "hdfs://")
	sp := strings.SplitN(addr, "/", 2)
	authority := sp[0]

	// Check whether the authority is a nameservice by looking for matching
	// rpc-address entries in the Hadoop configuration.
	var nns []string
	confParam := "dfs.namenode.rpc-address." + authority
	for key, value := range conf {
		if key == confParam || strings.HasPrefix(key, confParam+".") {
			// Insert in sorted position: map iteration order is random in Go,
			// so without this the expanded HA address list would differ from
			// run to run (nondeterministic client identifiers and flaky
			// DeepEqual-based tests).
			i := len(nns)
			for i > 0 && nns[i-1] > value {
				i--
			}
			nns = append(nns, "")
			copy(nns[i+1:], nns[i:])
			nns[i] = value
		}
	}
	if len(nns) > 0 {
		rpcAddresses = nns
	} else {
		// Not a nameservice: treat the authority as a comma separated
		// host:port list (a single entry for the non-HA case).
		rpcAddresses = strings.Split(authority, ",")
	}

	basePath = "/"
	if len(sp) > 1 && len(sp[1]) > 0 {
		basePath += strings.TrimRight(sp[1], "/") + "/"
	}
	return
}

func init() {
Expand Down
28 changes: 28 additions & 0 deletions pkg/object/object_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"testing"
"time"

"github.com/colinmarc/hdfs/v2/hadoopconf"

blob2 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"

"github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
Expand Down Expand Up @@ -725,6 +727,32 @@ func TestOBS(t *testing.T) { //skip mutate
}

func TestHDFS(t *testing.T) { //skip mutate
conf := make(hadoopconf.HadoopConf)
conf["dfs.namenode.rpc-address.ns.namenode1"] = "hadoop01:8020"
conf["dfs.namenode.rpc-address.ns.namenode2"] = "hadoop02:8020"

checkAddr := func(addr string, expected []string, base string) {
addresses, basePath := parseHDFSAddr(addr, conf)
if !reflect.DeepEqual(addresses, expected) {
t.Fatalf("expected addrs is %+v but got %+v from %s", expected, addresses, addr)
}
if basePath != base {
t.Fatalf("expected path is %s but got %s from %s", base, basePath, addr)
}
}

checkAddr("hadoop01:8020", []string{"hadoop01:8020"}, "/")
checkAddr("hdfs://hadoop01:8020/", []string{"hadoop01:8020"}, "/")
checkAddr("hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/")
checkAddr("hadoop01:8020/user/juicefs", []string{"hadoop01:8020"}, "/user/juicefs/")
checkAddr("hdfs://hadoop01:8020/user/juicefs/", []string{"hadoop01:8020"}, "/user/juicefs/")

// for HA
checkAddr("hadoop01:8020,hadoop02:8020", []string{"hadoop01:8020", "hadoop02:8020"}, "/")
checkAddr("hadoop01:8020,hadoop02:8020/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/")
checkAddr("hdfs://ns/user/juicefs", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/")
checkAddr("ns/user/juicefs/", []string{"hadoop01:8020", "hadoop02:8020"}, "/user/juicefs/")

if os.Getenv("HDFS_ADDR") == "" {
t.SkipNow()
}
Expand Down