Skip to content
This repository was archived by the owner on Mar 12, 2020. It is now read-only.

Commit 4e5a3fc

Browse files
committed
cluster: move to own package
Signed-off-by: Sander Pick <[email protected]>
1 parent 12c2ae0 commit 4e5a3fc

File tree

7 files changed

+277
-231
lines changed

7 files changed

+277
-231
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package core
1+
package cluster_test
22

33
import (
44
"context"
@@ -7,52 +7,61 @@ import (
77
"testing"
88
"time"
99

10+
icid "github.com/ipfs/go-cid"
11+
icore "github.com/ipfs/go-ipfs/core"
1012
"github.com/ipfs/go-ipfs/repo/fsrepo"
11-
"github.com/textileio/go-textile/repo/config"
12-
13+
"github.com/textileio/go-textile/cluster"
14+
"github.com/textileio/go-textile/core"
15+
"github.com/textileio/go-textile/ipfs"
1316
"github.com/textileio/go-textile/keypair"
17+
"github.com/textileio/go-textile/repo/config"
1418
)
1519

16-
var clusterVars = struct {
20+
var vars = struct {
1721
repoPath1 string
1822
repoPath2 string
19-
node1 *Textile
20-
node2 *Textile
23+
24+
node1 *core.Textile
25+
node2 *core.Textile
26+
27+
cid icid.Cid
2128
}{
2229
repoPath1: "testdata/.cluster1",
2330
repoPath2: "testdata/.cluster2",
31+
32+
cid: icid.Undef,
2433
}
2534

2635
func TestInitCluster(t *testing.T) {
27-
_ = os.RemoveAll(clusterVars.repoPath1)
28-
_ = os.RemoveAll(clusterVars.repoPath2)
36+
_ = os.RemoveAll(vars.repoPath1)
37+
_ = os.RemoveAll(vars.repoPath2)
2938

3039
accnt1 := keypair.Random()
3140
accnt2 := keypair.Random()
3241

33-
swarmPort1 := GetRandomPort()
34-
swarmPort2 := GetRandomPort()
42+
swarmPort1 := core.GetRandomPort()
43+
swarmPort2 := core.GetRandomPort()
3544

36-
secret, err := NewClusterSecret()
45+
secret, err := cluster.NewClusterSecret()
3746
if err != nil {
3847
t.Fatal(err)
3948
}
4049

41-
err = InitRepo(InitConfig{
50+
err = core.InitRepo(core.InitConfig{
4251
Account: accnt1,
43-
RepoPath: clusterVars.repoPath1,
44-
ApiAddr: fmt.Sprintf("127.0.0.1:%s", GetRandomPort()),
52+
RepoPath: vars.repoPath1,
53+
ApiAddr: fmt.Sprintf("127.0.0.1:%s", core.GetRandomPort()),
4554
SwarmPorts: swarmPort1,
4655
ClusterSecret: secret,
4756
Debug: true,
4857
})
4958
if err != nil {
5059
t.Fatalf("init node1 failed: %s", err)
5160
}
52-
err = InitRepo(InitConfig{
61+
err = core.InitRepo(core.InitConfig{
5362
Account: accnt2,
54-
RepoPath: clusterVars.repoPath2,
55-
ApiAddr: fmt.Sprintf("127.0.0.1:%s", GetRandomPort()),
63+
RepoPath: vars.repoPath2,
64+
ApiAddr: fmt.Sprintf("127.0.0.1:%s", core.GetRandomPort()),
5665
SwarmPorts: swarmPort2,
5766
ClusterSecret: secret,
5867
Debug: true,
@@ -62,19 +71,19 @@ func TestInitCluster(t *testing.T) {
6271
}
6372

6473
// update bootstraps
65-
addr1, err := getPeerAddress(clusterVars.repoPath1, swarmPort1)
74+
addr1, err := getPeerAddress(vars.repoPath1, swarmPort1)
6675
if err != nil {
6776
t.Fatal(err)
6877
}
69-
addr2, err := getPeerAddress(clusterVars.repoPath2, swarmPort2)
78+
addr2, err := getPeerAddress(vars.repoPath2, swarmPort2)
7079
if err != nil {
7180
t.Fatal(err)
7281
}
73-
err = updateClusterBootstraps(clusterVars.repoPath1, []string{addr2})
82+
err = updateClusterBootstraps(vars.repoPath1, []string{addr2})
7483
if err != nil {
7584
t.Fatal(err)
7685
}
77-
err = updateClusterBootstraps(clusterVars.repoPath2, []string{addr1})
86+
err = updateClusterBootstraps(vars.repoPath2, []string{addr1})
7887
if err != nil {
7988
t.Fatal(err)
8089
}
@@ -104,45 +113,64 @@ func updateClusterBootstraps(repoPath string, bootstraps []string) error {
104113

105114
func TestNewTextileCluster(t *testing.T) {
106115
var err error
107-
clusterVars.node1, err = NewTextile(RunConfig{
108-
RepoPath: clusterVars.repoPath1,
116+
vars.node1, err = core.NewTextile(core.RunConfig{
117+
RepoPath: vars.repoPath1,
109118
Debug: true,
110119
})
111120
if err != nil {
112121
t.Fatalf("create node1 failed: %s", err)
113122
}
114-
clusterVars.node2, err = NewTextile(RunConfig{
115-
RepoPath: clusterVars.repoPath2,
123+
vars.node2, err = core.NewTextile(core.RunConfig{
124+
RepoPath: vars.repoPath2,
116125
Debug: true,
117126
})
118127
if err != nil {
119128
t.Fatalf("create node2 failed: %s", err)
120129
}
121130

122-
<-clusterVars.node1.OnlineCh()
123-
<-clusterVars.node2.OnlineCh()
131+
<-vars.node1.OnlineCh()
132+
<-vars.node2.OnlineCh()
124133

125-
err = addTestData(clusterVars.node1)
134+
cid, err := pinTestData(vars.node1.Ipfs())
126135
if err != nil {
127136
t.Fatal(err)
128137
}
138+
vars.cid = *cid
129139
}
130140

131141
func TestTextileClusterSync(t *testing.T) {
132-
ctx, cancel := context.WithTimeout(clusterVars.node1.node.Context(), time.Minute)
142+
ctx, cancel := context.WithTimeout(vars.node1.Ipfs().Context(), time.Minute)
133143
defer cancel()
134144

135-
info, err := clusterVars.node1.cluster.SyncAll(ctx)
145+
info, err := vars.node1.Cluster().SyncAll(ctx)
136146
if err != nil {
137-
t.Fatalf("sync failed: %s", err)
147+
t.Fatalf("sync all failed: %s", err)
138148
}
139149

150+
var foundCid bool
140151
for _, i := range info {
141152
fmt.Println(i.String())
153+
if i.Cid.Equals(vars.cid) {
154+
foundCid = true
155+
}
156+
}
157+
158+
if !foundCid {
159+
t.Fatalf("failed to find cid in cluster: %s", vars.cid.String())
142160
}
143161
}
144162

145163
func TestTextileCluster_Teardown(t *testing.T) {
146-
clusterVars.node1 = nil
147-
clusterVars.node2 = nil
164+
vars.node1 = nil
165+
vars.node2 = nil
166+
}
167+
168+
func pinTestData(node *icore.IpfsNode) (*icid.Cid, error) {
169+
f, err := os.Open("../mill/testdata/image.jpeg")
170+
if err != nil {
171+
return nil, err
172+
}
173+
defer f.Close()
174+
175+
return ipfs.AddData(node, f, true, false)
148176
}

cluster/main.go

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package cluster
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"encoding/hex"
7+
"fmt"
8+
"path/filepath"
9+
10+
ds "github.com/ipfs/go-datastore"
11+
ipfscluster "github.com/ipfs/ipfs-cluster"
12+
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
13+
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
14+
"github.com/ipfs/ipfs-cluster/config"
15+
"github.com/ipfs/ipfs-cluster/consensus/crdt"
16+
"github.com/ipfs/ipfs-cluster/informer/disk"
17+
"github.com/ipfs/ipfs-cluster/informer/numpin"
18+
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
19+
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
20+
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
21+
host "github.com/libp2p/go-libp2p-host"
22+
dht "github.com/libp2p/go-libp2p-kad-dht"
23+
pubsub "github.com/libp2p/go-libp2p-pubsub"
24+
ma "github.com/multiformats/go-multiaddr"
25+
"github.com/prometheus/common/log"
26+
)
27+
28+
func NewClusterSecret() (string, error) {
29+
secret := make([]byte, 32)
30+
_, err := rand.Read(secret)
31+
if err != nil {
32+
return "", err
33+
}
34+
return hex.EncodeToString(secret), nil
35+
}
36+
37+
func InitCluster(repoPath, secret string) error {
38+
decoded, err := ipfscluster.DecodeClusterSecret(secret)
39+
if err != nil {
40+
return err
41+
}
42+
43+
cfgMgr, cfgs := makeClusterConfigs()
44+
err = cfgMgr.Default()
45+
if err != nil {
46+
return err
47+
}
48+
cfgs.ClusterCfg.Secret = decoded
49+
50+
return cfgMgr.SaveJSON(ConfigPath(repoPath))
51+
}
52+
53+
func ConfigPath(repoPath string) string {
54+
return filepath.Join(repoPath, "service.json")
55+
}
56+
57+
func MakeAndLoadConfigs(repoPath string) (*config.Manager, *cfgs, error) {
58+
cfgMgr, cfgs := makeClusterConfigs()
59+
err := cfgMgr.LoadJSONFromFile(ConfigPath(repoPath))
60+
if err != nil {
61+
return nil, nil, err
62+
}
63+
return cfgMgr, cfgs, nil
64+
}
65+
66+
func ParseBootstraps(addrs []string) ([]ma.Multiaddr, error) {
67+
var parsed []ma.Multiaddr
68+
for _, a := range addrs {
69+
p, err := ma.NewMultiaddr(a)
70+
if err != nil {
71+
return nil, err
72+
}
73+
parsed = append(parsed, p)
74+
}
75+
return parsed, nil
76+
}
77+
78+
func Bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, bootstraps []ma.Multiaddr) {
79+
for _, bstrap := range bootstraps {
80+
log.Infof("Bootstrapping to %s", bstrap)
81+
err := cluster.Join(ctx, bstrap)
82+
if err != nil {
83+
log.Errorf("bootstrap to %s failed: %s", bstrap, err)
84+
}
85+
}
86+
}
87+
88+
func SetupAllocation(
89+
name string,
90+
diskInfCfg *disk.Config,
91+
numpinInfCfg *numpin.Config,
92+
) (ipfscluster.Informer, ipfscluster.PinAllocator, error) {
93+
switch name {
94+
case "disk", "disk-freespace":
95+
informer, err := disk.NewInformer(diskInfCfg)
96+
if err != nil {
97+
return nil, nil, err
98+
}
99+
return informer, descendalloc.NewAllocator(), nil
100+
case "disk-reposize":
101+
informer, err := disk.NewInformer(diskInfCfg)
102+
if err != nil {
103+
return nil, nil, err
104+
}
105+
return informer, ascendalloc.NewAllocator(), nil
106+
case "numpin", "pincount":
107+
informer, err := numpin.NewInformer(numpinInfCfg)
108+
if err != nil {
109+
return nil, nil, err
110+
}
111+
return informer, ascendalloc.NewAllocator(), nil
112+
default:
113+
return nil, nil, fmt.Errorf("unknown allocation strategy")
114+
}
115+
}
116+
117+
func SetupPinTracker(
118+
name string,
119+
h host.Host,
120+
mapCfg *maptracker.Config,
121+
statelessCfg *stateless.Config,
122+
peerName string,
123+
) (ipfscluster.PinTracker, error) {
124+
switch name {
125+
case "map":
126+
ptrk := maptracker.NewMapPinTracker(mapCfg, h.ID(), peerName)
127+
log.Debug("map pintracker loaded")
128+
return ptrk, nil
129+
case "stateless":
130+
ptrk := stateless.New(statelessCfg, h.ID(), peerName)
131+
log.Debug("stateless pintracker loaded")
132+
return ptrk, nil
133+
default:
134+
return nil, fmt.Errorf("unknown pintracker type")
135+
}
136+
}
137+
138+
func SetupConsensus(
139+
h host.Host,
140+
dht *dht.IpfsDHT,
141+
pubsub *pubsub.PubSub,
142+
crdtCfg *crdt.Config,
143+
store ds.Datastore,
144+
) (ipfscluster.Consensus, error) {
145+
convrdt, err := crdt.New(h, dht, pubsub, crdtCfg, store)
146+
if err != nil {
147+
return nil, fmt.Errorf("error creating CRDT component: %s", err)
148+
}
149+
return convrdt, nil
150+
}
151+
152+
type cfgs struct {
153+
ClusterCfg *ipfscluster.Config
154+
CrdtCfg *crdt.Config
155+
MaptrackerCfg *maptracker.Config
156+
StatelessTrackerCfg *stateless.Config
157+
PubsubmonCfg *pubsubmon.Config
158+
DiskInfCfg *disk.Config
159+
NumpinInfCfg *numpin.Config
160+
}
161+
162+
func makeClusterConfigs() (*config.Manager, *cfgs) {
163+
cfg := config.NewManager()
164+
clusterCfg := &ipfscluster.Config{}
165+
crdtCfg := &crdt.Config{}
166+
maptrackerCfg := &maptracker.Config{}
167+
statelessCfg := &stateless.Config{}
168+
pubsubmonCfg := &pubsubmon.Config{}
169+
diskInfCfg := &disk.Config{}
170+
numpinInfCfg := &numpin.Config{}
171+
cfg.RegisterComponent(config.Cluster, clusterCfg)
172+
cfg.RegisterComponent(config.Consensus, crdtCfg)
173+
cfg.RegisterComponent(config.PinTracker, maptrackerCfg)
174+
cfg.RegisterComponent(config.PinTracker, statelessCfg)
175+
cfg.RegisterComponent(config.Monitor, pubsubmonCfg)
176+
cfg.RegisterComponent(config.Informer, diskInfCfg)
177+
cfg.RegisterComponent(config.Informer, numpinInfCfg)
178+
return cfg, &cfgs{
179+
clusterCfg,
180+
crdtCfg,
181+
maptrackerCfg,
182+
statelessCfg,
183+
pubsubmonCfg,
184+
diskInfCfg,
185+
numpinInfCfg,
186+
}
187+
}

cmd/main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/golang/protobuf/proto"
2222
logging "github.com/ipfs/go-log"
2323
"github.com/mitchellh/go-homedir"
24+
"github.com/textileio/go-textile/cluster"
2425
"github.com/textileio/go-textile/core"
2526
"github.com/textileio/go-textile/keypair"
2627
"github.com/textileio/go-textile/pb"
@@ -458,7 +459,7 @@ Stacks may include:
458459
var secret string
459460
if *initIpfsCluster {
460461
if *initIpfsClusterSecret == "" {
461-
secret, err = core.NewClusterSecret()
462+
secret, err = cluster.NewClusterSecret()
462463
if err != nil {
463464
return err
464465
}

0 commit comments

Comments
 (0)