-
Notifications
You must be signed in to change notification settings - Fork 173
/
zookeeper.go
98 lines (85 loc) · 2.62 KB
/
zookeeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//
// Copyright 2021 Layotto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"errors"
"fmt"
"github.com/go-zookeeper/zk"
"strconv"
"strings"
"time"
)
// Property keys read by ParseZookeeperMetadata, and the fallback session
// timeout it applies when no SessionTimeout property is supplied.
const (
	// zkHost is the key holding the ';'-separated ZooKeeper server list.
	zkHost = "zookeeperHosts"
	// zkPassword is the key holding the optional password.
	zkPassword = "zookeeperPassword"
	// sessionTimeout is the key holding the session timeout in whole seconds.
	sessionTimeout = "SessionTimeout"
	// logInfo is the key holding a boolean that is forwarded to zk.WithLogInfo.
	logInfo = "LogInfo"

	// defaultSessionTimeout is used when the sessionTimeout key is absent or empty.
	defaultSessionTimeout = time.Second * 5
)
// ConnectionFactory abstracts the creation of ZooKeeper connections so that
// callers can substitute their own implementation (for example in tests).
type ConnectionFactory interface {
	// NewConnection returns a connection built from meta, or an error if one
	// could not be established. expire is the requested session timeout;
	// ConnectionFactoryImpl substitutes meta.SessionTimeout when it is zero.
	NewConnection(expire time.Duration, meta ZookeeperMetadata) (ZKConnection, error)
}
// ConnectionFactoryImpl is the stateless ConnectionFactory implementation in
// this file; its NewConnection method dials ZooKeeper through zk.Connect.
type ConnectionFactoryImpl struct{}
// NewConnection connects to the servers listed in meta.Hosts via zk.Connect.
//
// expire is used as the session timeout; a zero value selects
// meta.SessionTimeout instead. meta.LogInfo is forwarded through
// zk.WithLogInfo. The event channel returned by zk.Connect is discarded, and
// meta.Password is not consulted here.
//
// On failure a nil ZKConnection and the error from zk.Connect are returned.
func (c *ConnectionFactoryImpl) NewConnection(expire time.Duration, meta ZookeeperMetadata) (ZKConnection, error) {
	timeout := expire
	if timeout == 0 {
		// No explicit timeout requested: fall back to the configured one.
		timeout = meta.SessionTimeout
	}

	conn, _, err := zk.Connect(meta.Hosts, timeout, zk.WithLogInfo(meta.LogInfo))
	if err != nil {
		// Return a literal nil so the interface value itself is nil.
		return nil, err
	}
	return conn, nil
}
// ZKConnection is the set of connection operations this package relies on.
// The connection returned by zk.Connect is handed out as a ZKConnection by
// ConnectionFactoryImpl.NewConnection, so each method here corresponds to the
// method of the same name on that connection.
type ZKConnection interface {
	// Create takes a path, data, flags and an ACL list and returns a string
	// and an error.
	Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
	// Get takes a path and returns bytes, a *zk.Stat and an error.
	Get(path string) ([]byte, *zk.Stat, error)
	// Set takes a path, data and a version and returns a *zk.Stat and an error.
	Set(path string, data []byte, version int32) (*zk.Stat, error)
	// Delete takes a path and a version and returns an error.
	Delete(path string, version int32) error
	// Close takes no arguments and returns nothing.
	Close()
}
// ZookeeperMetadata carries the connection settings produced by
// ParseZookeeperMetadata and consumed by ConnectionFactory.NewConnection.
type ZookeeperMetadata struct {
	// Hosts is the list of server addresses passed to zk.Connect.
	Hosts []string
	// Password is the optional password read from the "zookeeperPassword" property.
	Password string
	// SessionTimeout is the session timeout used when the caller of
	// NewConnection passes a zero expire value.
	SessionTimeout time.Duration
	// LogInfo is forwarded to zk.WithLogInfo when connecting.
	LogInfo bool
}
// ParseZookeeperMetadata builds a ZookeeperMetadata from a string property map.
//
// Recognized keys:
//   - "zookeeperHosts" (required): ';'-separated server addresses. Whitespace
//     around each entry is trimmed and empty entries (for example from a
//     trailing ';') are dropped; if nothing remains an error is returned.
//   - "zookeeperPassword" (optional): copied into Password.
//   - "SessionTimeout" (optional): whole number of seconds, parsed with
//     strconv.Atoi; defaults to defaultSessionTimeout when absent or empty.
//   - "LogInfo" (optional): parsed with strconv.ParseBool; defaults to false.
//
// On any error the zero ZookeeperMetadata is returned together with an error
// that wraps the underlying parse failure, so errors.Is/errors.As still work.
func ParseZookeeperMetadata(properties map[string]string) (ZookeeperMetadata, error) {
	// Reading a missing key (or a nil map) yields "", which splits into a
	// single empty entry and is filtered out below.
	var hosts []string
	for _, h := range strings.Split(properties[zkHost], ";") {
		if h = strings.TrimSpace(h); h != "" {
			hosts = append(hosts, h)
		}
	}
	if len(hosts) == 0 {
		return ZookeeperMetadata{}, errors.New("zookeeper store error: missing zkHost address")
	}

	m := ZookeeperMetadata{
		Hosts:          hosts,
		Password:       properties[zkPassword],
		SessionTimeout: defaultSessionTimeout,
	}

	if val := properties[sessionTimeout]; val != "" {
		secs, err := strconv.Atoi(val)
		if err != nil {
			return ZookeeperMetadata{}, fmt.Errorf("zookeeper store error: can't parse SessionTimeout field: %w", err)
		}
		m.SessionTimeout = time.Duration(secs) * time.Second
	}

	if val := properties[logInfo]; val != "" {
		enabled, err := strconv.ParseBool(val)
		if err != nil {
			return ZookeeperMetadata{}, fmt.Errorf("zookeeper store error: can't parse LogInfo field: %w", err)
		}
		m.LogInfo = enabled
	}

	return m, nil
}