This repository has been archived by the owner on Mar 12, 2020. It is now read-only.

[WIP] IPFS Cluster Integration #831

Open
wants to merge 12 commits into base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -41,6 +41,7 @@ _testmain.go
.gx/
.ipfs
.textile*
.cluster*

# Development environment files
.ackrc
187 changes: 187 additions & 0 deletions cluster/connector.go
@@ -0,0 +1,187 @@
package cluster

import (
"bytes"
"context"
"fmt"
"io/ioutil"

icid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/pin"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
"github.com/ipfs/ipfs-cluster/api"
corepeer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)

type Connector struct {
node *core.IpfsNode
api iface.CoreAPI
peers func(ctx context.Context) []*api.ID
}

func NewConnector(node *core.IpfsNode, peers func(ctx context.Context) []*api.ID) (*Connector, error) {
capi, err := coreapi.NewCoreAPI(node)
if err != nil {
return nil, err
}
return &Connector{
node: node,
api: capi,
peers: peers,
}, nil
}

func (c *Connector) ID(context.Context) (*api.IPFSID, error) {
var addrs []api.Multiaddr
for _, addr := range c.node.PeerHost.Addrs() {
addrs = append(addrs, api.Multiaddr{Multiaddr: addr})
}
return &api.IPFSID{
ID: c.node.Identity,
Addresses: addrs,
}, nil
}

func (c *Connector) SetClient(client *rpc.Client) {
// noop
Collaborator:

Here and in Shutdown you will need to copy from https://github.com/ipfs/ipfs-cluster/blob/master/ipfsconn/ipfshttp/ipfshttp.go#L191

The main Cluster component will SetClient with an RPC client that allows this component to talk to any component in any peer in the Cluster. It is also the signal that the component can move forward with any internal tasks (the normal IPFSConnector implementation will wait for a bit and trigger an automatic ipfs swarm connect to every other daemon attached to every other known cluster peer).

Collaborator:

Actually, since you don't use RPC at all, it's OK like this!

Member Author:

👌

}
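
For illustration only, and not needed here given the follow-up comment above: a connector that did use the cluster RPC client would typically store it in SetClient and treat the call as a readiness signal. The rpcConnector struct and rpcReady channel below are hypothetical sketches, not part of this PR.

type rpcConnector struct {
	rpcClient *rpc.Client
	rpcReady  chan struct{} // closed once the RPC client is available
}

func (c *rpcConnector) SetClient(client *rpc.Client) {
	c.rpcClient = client
	close(c.rpcReady) // unblocks background tasks such as an auto swarm-connect loop
}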

func (c *Connector) Shutdown(ctx context.Context) error {
// noop
return nil
}

// @todo handle maxDepth
func (c *Connector) Pin(ctx context.Context, cid icid.Cid, maxDepth int) error {
return c.api.Pin().Add(ctx, path.New(cid.String()))
Collaborator:

For the record: the original ipfs-cluster connector, by default, calls refs -r <cid> and then pin. The reason is that many refs -r calls can happen in parallel, while only one pin does. For the general case, this way works better.

On the other hand, refs -r uses a non-parallel dag walking approach while pin uses the async, faster version.

The original cluster-pin also uses the RPC to request an update of the freespace metrics every 10th pin.

}
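
As a rough sketch of the "refs -r then pin" flow described in the comment above, adapted to the CoreAPI already used in this file (an assumption, not part of this PR), the DAG could be walked first so blocks are fetched before the pin is taken:

func (c *Connector) pinWithPrefetch(ctx context.Context, cid icid.Cid) error {
	// Fetch the DAG up front, roughly what `ipfs refs -r <cid>` does.
	if err := c.prefetch(ctx, path.New(cid.String())); err != nil {
		return err
	}
	// Only then take the (serialized) pin.
	return c.api.Pin().Add(ctx, path.New(cid.String()))
}

// prefetch lists links recursively, pulling each block in as a side effect.
// This walk is sequential for simplicity; the upstream connector instead
// relies on multiple refs requests running in parallel.
func (c *Connector) prefetch(ctx context.Context, p path.Path) error {
	links, err := c.api.Object().Links(ctx, p)
	if err != nil {
		return err
	}
	for _, l := range links {
		if err := c.prefetch(ctx, path.New(l.Cid.String())); err != nil {
			return err
		}
	}
	return nil
}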

func (c *Connector) Unpin(ctx context.Context, cid icid.Cid) error {
return c.api.Pin().Rm(ctx, path.New(cid.String()))
}

func (c *Connector) PinLsCid(ctx context.Context, cid icid.Cid) (api.IPFSPinStatus, error) {
pins, err := c.node.Pinning.CheckIfPinned(cid)
if err != nil {
return api.IPFSPinStatusError, err
}
if len(pins) == 0 {
return api.IPFSPinStatusError, fmt.Errorf("invalid pin check result")
}
return c.pinModeToStatus(pins[0].Mode), nil
}

func (c *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) {
pins, err := c.api.Pin().Ls(ctx, c.pinFilterToOption(typeFilter))
if err != nil {
return nil, err
}
statusMap := make(map[string]api.IPFSPinStatus)
for _, p := range pins {
mode, ok := pin.StringToMode(p.Type())
if !ok {
continue
}
statusMap[p.Path().String()] = c.pinModeToStatus(mode)
}
return statusMap, nil
}

func (c *Connector) ConnectSwarms(ctx context.Context) error {
Collaborator:

For information: this will get triggered when a new cafe "bootstraps" (uses Cluster.Join(<peer maddr>)). It will be called in the cluster peer it bootstraps to. If textile never calls Join() to introduce new peers to the Cluster*, then it can be a noop.

*Join() is not strictly necessary with CRDTs, but it is a way of getting two libp2p hosts in the same cluster connected. This should (1) allow pubsub to start working (otherwise the peer may never receive pubsub messages because it doesn't know any of the other peers subscribed to them), and (2) potentially allow DHT discovery of other cluster peers and better connectivity.

for _, p := range c.peers(ctx) {
log.Debugf("cluster dialing %s", p.ID.Pretty())
var addrs []ma.Multiaddr
for _, addr := range p.Addresses {
addrs = append(addrs, addr.Multiaddr)
}
err := c.api.Swarm().Connect(ctx, corepeer.AddrInfo{
ID: p.ID,
Addrs: addrs,
})
if err != nil {
return err
}
log.Debugf("cluster connected to %s")
}
return nil
}

func (c *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
Collaborator:

Only used by ConnectGraph (which generates a .dot file with cluster and ipfs daemon connections).

Member Author:

Ah, nice that's cool

conns, err := c.api.Swarm().Peers(ctx)
if err != nil {
return nil, err
}
var peers []peer.ID
for _, c := range conns {
peers = append(peers, c.ID())
}
return peers, nil
}

func (c *Connector) ConfigKey(keypath string) (interface{}, error) {
return c.node.Repo.GetConfigKey(keypath)
}

func (c *Connector) RepoStat(ctx context.Context) (*api.IPFSRepoStat, error) {
stat, err := corerepo.RepoStat(ctx, c.node)
Collaborator:

To check: we call ipfs with size-only=true. https://github.com/ipfs/ipfs-cluster/blob/master/ipfsconn/ipfshttp/ipfshttp.go#L600

This makes the RepoStat call WAY faster (no full node count).

Member Author:

👌

if err != nil {
return nil, err
}
return &api.IPFSRepoStat{
RepoSize: stat.RepoSize,
StorageMax: stat.StorageMax,
}, nil
}
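
To sketch the size-only=true suggestion above: assuming the vendored go-ipfs exposes corerepo.RepoSize (the helper behind `ipfs repo stat --size-only`, an assumption to verify against the vendored version), the expensive object count could be skipped like this:

func (c *Connector) repoStatSizeOnly(ctx context.Context) (*api.IPFSRepoStat, error) {
	// RepoSize only reads the datastore size, skipping the full node count.
	sizeStat, err := corerepo.RepoSize(ctx, c.node)
	if err != nil {
		return nil, err
	}
	return &api.IPFSRepoStat{
		RepoSize:   sizeStat.RepoSize,
		StorageMax: sizeStat.StorageMax,
	}, nil
}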

func (c *Connector) Resolve(ctx context.Context, pth string) (icid.Cid, error) {
res, err := c.api.ResolvePath(ctx, path.New(pth))
if err != nil {
return icid.Undef, err
}
return res.Cid(), nil
}

func (c *Connector) BlockPut(ctx context.Context, b *api.NodeWithMeta) error {
_, err := c.api.Block().Put(ctx, bytes.NewReader(b.Data), options.Block.Format(b.Format))
return err
}

func (c *Connector) BlockGet(ctx context.Context, cid icid.Cid) ([]byte, error) {
r, err := c.api.Block().Get(ctx, path.New(cid.String()))
if err != nil {
return nil, err
}
return ioutil.ReadAll(r)
}

func (c *Connector) pinModeToStatus(mode pin.Mode) api.IPFSPinStatus {
switch mode {
case pin.Recursive:
return api.IPFSPinStatusRecursive
case pin.Direct:
return api.IPFSPinStatusDirect
case pin.Indirect:
return api.IPFSPinStatusIndirect
case pin.Internal:
return api.IPFSPinStatusDirect
case pin.NotPinned:
return api.IPFSPinStatusUnpinned
default:
return api.IPFSPinStatusError
}
}

func (c *Connector) pinFilterToOption(typeFilter string) options.PinLsOption {
return func(settings *options.PinLsSettings) error {
settings.Type = typeFilter
return nil
}
}
192 changes: 192 additions & 0 deletions cluster/main.go
@@ -0,0 +1,192 @@
package cluster

import (
"context"
"fmt"
"path/filepath"

ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/consensus/crdt"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("tex-cluster")

func InitCluster(repoPath, listenAddr string) error {
cfgMgr, cfgs := makeClusterConfigs()
err := cfgMgr.Default()
if err != nil {
return err
}

if listenAddr != "" {
Collaborator:

Interesting, I think we don't use it. This is another of the options (like secret) which are only used to configure the libp2p host and not in Cluster per se.

However the config will fail to validate if unset. I think it makes sense to set it to the real listen endpoint of the cafe (like here).

Member Author:

Ok, cool. Should be easy enough to just copy the value used in the IPFS config.

addr, err := ma.NewMultiaddr(listenAddr)
if err != nil {
return err
}
cfgs.ClusterCfg.ListenAddr = addr
}

return cfgMgr.SaveJSON(ConfigPath(repoPath))
Collaborator:

I think you don't need to save (it's writing or overwriting to disk). Might be useful to debug what actual config you are generating.

Member Author:

I'll have to double check, but I think this is the first time it's written

Collaborator:

I think you don't want your users to modify cluster configs; rather, you want to give them something that "just works" and let them adjust some parameters via go-textile flags. How is this handled with the embedded ipfs daemon now? Can they manually edit its configuration?

}
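
One way to act on the "copy the value used in the IPFS config" idea from the thread above is a small helper that reuses the embedded node's swarm addresses. The helper and its caller are hypothetical, not part of this PR:

func listenAddrFromIPFS(swarmAddrs []string) (ma.Multiaddr, error) {
	// swarmAddrs would come from the embedded node's config (Addresses.Swarm).
	if len(swarmAddrs) == 0 {
		return nil, fmt.Errorf("no swarm addresses configured")
	}
	// In practice a dedicated cluster port would likely be substituted here.
	return ma.NewMultiaddr(swarmAddrs[0])
}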

func ConfigPath(repoPath string) string {
return filepath.Join(repoPath, "service.json")
}

func MakeAndLoadConfigs(repoPath string) (*config.Manager, *cfgs, error) {
cfgMgr, cfgs := makeClusterConfigs()
err := cfgMgr.LoadJSONFromFile(ConfigPath(repoPath))
if err != nil {
return nil, nil, err
}
return cfgMgr, cfgs, nil
}

func ParseBootstraps(addrs []string) ([]ma.Multiaddr, error) {
var parsed []ma.Multiaddr
for _, a := range addrs {
p, err := ma.NewMultiaddr(a)
if err != nil {
return nil, err
}
parsed = append(parsed, p)
}
return parsed, nil
}

func Bootstrap(ctx context.Context, cluster *ipfscluster.Cluster, cons ipfscluster.Consensus, bootstraps []ma.Multiaddr) {
for _, bstrap := range bootstraps {
log.Infof("Bootstrapping to %s", bstrap)
err := cluster.Join(ctx, bstrap)
if err != nil {
log.Errorf("bootstrap to %s failed: %s", bstrap, err)
} else {
for _, p := range cluster.Peers(ctx) {
err = cons.Trust(ctx, p.ID)
Collaborator:

I guess this is a +1 on auto-trusting peers you bootstrap to, so we might get this done in Cluster (it's not there yet).

Member Author:

Cool, maybe a config setting to auto-trust?

Collaborator:

Thinking about this. The bootstrap concept in cluster is part of the daemon binary (not of the cluster library itself), so while we can fix this there (ipfs-cluster/ipfs-cluster#834), it won't help you.

if err != nil {
log.Errorf("failed to trust %s: %s", p.ID.Pretty(), err)
}
}
}
}
}

func SetupAllocation(
name string,
diskInfCfg *disk.Config,
numpinInfCfg *numpin.Config,
) (ipfscluster.Informer, ipfscluster.PinAllocator, error) {
switch name {
case "disk", "disk-freespace":
Collaborator:

Probably you want to default to this one. The others are not so useful, but they may be good placeholders to keep for the future.

Member Author:

👌

informer, err := disk.NewInformer(diskInfCfg)
if err != nil {
return nil, nil, err
}
return informer, descendalloc.NewAllocator(), nil
case "disk-reposize":
informer, err := disk.NewInformer(diskInfCfg)
if err != nil {
return nil, nil, err
}
return informer, ascendalloc.NewAllocator(), nil
case "numpin", "pincount":
informer, err := numpin.NewInformer(numpinInfCfg)
if err != nil {
return nil, nil, err
}
return informer, ascendalloc.NewAllocator(), nil
default:
return nil, nil, fmt.Errorf("unknown allocation strategy")
}
}

func SetupPinTracker(
name string,
h host.Host,
mapCfg *maptracker.Config,
statelessCfg *stateless.Config,
peerName string,
) (ipfscluster.PinTracker, error) {
switch name {
case "map":
Collaborator:

For the moment, defaulting to this one makes the most sense. We want to level up the stateless one.

Member Author:

👌

ptrk := maptracker.NewMapPinTracker(mapCfg, h.ID(), peerName)
log.Debug("map pintracker loaded")
return ptrk, nil
case "stateless":
ptrk := stateless.New(statelessCfg, h.ID(), peerName)
log.Debug("stateless pintracker loaded")
return ptrk, nil
default:
return nil, fmt.Errorf("unknown pintracker type")
}
}

func SetupConsensus(
h host.Host,
dht *dht.IpfsDHT,
pubsub *pubsub.PubSub,
crdtCfg *crdt.Config,
store ds.Datastore,
) (ipfscluster.Consensus, error) {
convrdt, err := crdt.New(h, dht, pubsub, crdtCfg, store)
if err != nil {
return nil, fmt.Errorf("error creating CRDT component: %s", err)
}
return convrdt, nil
}
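
A wiring sketch of how these helpers could be combined, using the defaults recommended in the review (the disk-freespace informer/allocator and the map pintracker). The caller, the "cafe" peer name, and where h, d, psub and store come from are assumptions; the resulting components would ultimately be handed to ipfscluster.NewCluster.

func setupComponents(h host.Host, d *dht.IpfsDHT, psub *pubsub.PubSub, store ds.Datastore, c *cfgs) error {
	informer, allocator, err := SetupAllocation("disk-freespace", c.DiskInfCfg, c.NumpinInfCfg)
	if err != nil {
		return err
	}
	tracker, err := SetupPinTracker("map", h, c.MaptrackerCfg, c.StatelessTrackerCfg, "cafe")
	if err != nil {
		return err
	}
	consensus, err := SetupConsensus(h, d, psub, c.CrdtCfg, store)
	if err != nil {
		return err
	}
	// Hand informer, allocator, tracker and consensus to ipfscluster.NewCluster.
	_, _, _, _ = informer, allocator, tracker, consensus
	return nil
}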

type cfgs struct {
ClusterCfg *ipfscluster.Config
CrdtCfg *crdt.Config
MaptrackerCfg *maptracker.Config
StatelessTrackerCfg *stateless.Config
PubsubmonCfg *pubsubmon.Config
DiskInfCfg *disk.Config
NumpinInfCfg *numpin.Config
TracingCfg *observations.TracingConfig
}

func makeClusterConfigs() (*config.Manager, *cfgs) {
cfg := config.NewManager()
clusterCfg := &ipfscluster.Config{}
crdtCfg := &crdt.Config{}
maptrackerCfg := &maptracker.Config{}
statelessCfg := &stateless.Config{}
Collaborator:

I think it's not worth registering this if it's not being used; same for the numpin informer.

pubsubmonCfg := &pubsubmon.Config{}
diskInfCfg := &disk.Config{}
numpinInfCfg := &numpin.Config{}
tracingCfg := &observations.TracingConfig{}
cfg.RegisterComponent(config.Cluster, clusterCfg)
cfg.RegisterComponent(config.Consensus, crdtCfg)
cfg.RegisterComponent(config.PinTracker, maptrackerCfg)
cfg.RegisterComponent(config.PinTracker, statelessCfg)
cfg.RegisterComponent(config.Monitor, pubsubmonCfg)
cfg.RegisterComponent(config.Informer, diskInfCfg)
cfg.RegisterComponent(config.Informer, numpinInfCfg)
cfg.RegisterComponent(config.Observations, tracingCfg)
return cfg, &cfgs{
clusterCfg,
crdtCfg,
maptrackerCfg,
statelessCfg,
pubsubmonCfg,
diskInfCfg,
numpinInfCfg,
tracingCfg,
}
}
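
If the stateless tracker and numpin informer end up unused, the registration block could be trimmed as the comment above suggests; a minimal sketch of the reduced set (same calls as in makeClusterConfigs, with the two unused components left out):

cfg.RegisterComponent(config.Cluster, clusterCfg)
cfg.RegisterComponent(config.Consensus, crdtCfg)
cfg.RegisterComponent(config.PinTracker, maptrackerCfg)
cfg.RegisterComponent(config.Monitor, pubsubmonCfg)
cfg.RegisterComponent(config.Informer, diskInfCfg)
cfg.RegisterComponent(config.Observations, tracingCfg)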