From a68efb222e8a746c20f34e67d5493d7127afb475 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Fri, 5 Feb 2021 17:38:19 -0800 Subject: [PATCH] Add pubsub router. --- data/author.go | 9 ++++- data/commit.go | 9 ++++- data/repository.go | 22 +++-------- data/store.go | 56 -------------------------- go.mod | 4 +- go.sum | 13 +++++-- p2p/pubsub.go | 93 ++++++++++++++++++++++++++++++++++++++++++++ peer/config.go | 2 +- peer/metric.go | 30 -------------- peer/peer.go | 17 ++++++++ web/html/author.html | 11 ++++++ web/html/home.html | 26 ------------- web/html/index.html | 2 +- web/html/tree.html | 14 +++---- web/view.go | 69 ++++++++++++++++++-------------- web/web.go | 15 +++++-- 16 files changed, 214 insertions(+), 178 deletions(-) delete mode 100644 data/store.go create mode 100644 p2p/pubsub.go delete mode 100644 peer/metric.go create mode 100644 web/html/author.html delete mode 100644 web/html/home.html diff --git a/data/author.go b/data/author.go index 2ca3e98..a908f7c 100644 --- a/data/author.go +++ b/data/author.go @@ -32,7 +32,7 @@ func GetAuthor(ctx context.Context, dag ipld.DAGService, id cid.Cid) (*Author, e // AddAuthor adds a author to the given dag. func AddAuthor(ctx context.Context, dag ipld.DAGService, author *Author) (cid.Cid, error) { - node, err := cbornode.WrapObject(author, multihash.SHA2_256, -1) + node, err := author.Node() if err != nil { return cid.Cid{}, err } @@ -44,7 +44,7 @@ func AddAuthor(ctx context.Context, dag ipld.DAGService, author *Author) (cid.Ci return node.Cid(), nil } -// AuthorFromJON decodes a author from json. +// AuthorFromJSON decodes a author from json. func AuthorFromJSON(data []byte) (*Author, error) { var author Author if err := json.Unmarshal(data, &author); err != nil { @@ -70,3 +70,8 @@ func NewAuthor() *Author { Repositories: make(map[string]cid.Cid), } } + +// Node returns a cbornode containing the author. +func (a *Author) Node() (ipld.Node, error) { + return cbornode.WrapObject(a, multihash.SHA2_256, -1) +} diff --git a/data/commit.go b/data/commit.go index 2516751..42007b8 100644 --- a/data/commit.go +++ b/data/commit.go @@ -37,7 +37,7 @@ func GetCommit(ctx context.Context, dag ipld.DAGService, id cid.Cid) (*Commit, e // AddCommit adds a commit to the given dag. func AddCommit(ctx context.Context, dag ipld.DAGService, commit *Commit) (cid.Cid, error) { - node, err := cbornode.WrapObject(commit, multihash.SHA2_256, -1) + node, err := commit.Node() if err != nil { return cid.Cid{}, err } @@ -49,7 +49,7 @@ func AddCommit(ctx context.Context, dag ipld.DAGService, commit *Commit) (cid.Ci return node.Cid(), nil } -// CommitFromJON decodes a commit from json. +// CommitFromJSON decodes a commit from json. func CommitFromJSON(data []byte) (*Commit, error) { var commit Commit if err := json.Unmarshal(data, &commit); err != nil { @@ -80,6 +80,11 @@ func NewCommit(tree cid.Cid, message string, parents ...cid.Cid) *Commit { } } +// Node returns a cbornode containing the commit. +func (c *Commit) Node() (ipld.Node, error) { + return cbornode.WrapObject(c, multihash.SHA2_256, -1) +} + // ParentLinks returns parent ipld links. func (c *Commit) ParentLinks() []*ipld.Link { var out []*ipld.Link diff --git a/data/repository.go b/data/repository.go index b29ad03..760bad9 100644 --- a/data/repository.go +++ b/data/repository.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-pinner" "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" "github.com/multiformats/go-multihash" @@ -39,7 +38,7 @@ func GetRepository(ctx context.Context, dag ipld.DAGService, id cid.Cid) (*Repos // AddRepository adds a repo to the given dag. func AddRepository(ctx context.Context, dag ipld.DAGService, repo *Repository) (cid.Cid, error) { - node, err := cbornode.WrapObject(repo, multihash.SHA2_256, -1) + node, err := repo.Node() if err != nil { return cid.Cid{}, err } @@ -51,20 +50,6 @@ func AddRepository(ctx context.Context, dag ipld.DAGService, repo *Repository) ( return node.Cid(), nil } -// PinRepository pins a repo using the given pinner. -func PinRepository(ctx context.Context, pinner pin.Pinner, repo *Repository) (cid.Cid, error) { - node, err := cbornode.WrapObject(repo, multihash.SHA2_256, -1) - if err != nil { - return cid.Cid{}, err - } - - if err := pinner.Pin(ctx, node, true); err != nil { - return cid.Cid{}, err - } - - return node.Cid(), nil -} - // RepositoryFromJSON decodes a repo from json. func RepositoryFromJSON(data []byte) (*Repository, error) { var repo Repository @@ -94,3 +79,8 @@ func NewRepository(name string) *Repository { Metadata: make(map[string]string), } } + +// Node returns a cbornode containing the repository. +func (r *Repository) Node() (ipld.Node, error) { + return cbornode.WrapObject(r, multihash.SHA2_256, -1) +} diff --git a/data/store.go b/data/store.go deleted file mode 100644 index c1f31dd..0000000 --- a/data/store.go +++ /dev/null @@ -1,56 +0,0 @@ -package data - -import ( - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" -) - -// prefix is the parent key for the datstore. -var prefix = datastore.NewKey("multiverse") - -// Store is a key value database. -type Store struct { - datastore.Datastore -} - -// NewStore returns a new store that wraps the given store. -func NewStore(dstore datastore.Datastore) *Store { - return &Store{namespace.Wrap(dstore, prefix)} -} - -// GetCid returns the cid value of the given key name. -func (s *Store) GetCid(name string) (cid.Cid, error) { - data, err := s.Get(datastore.NewKey(name)) - if err != nil { - return cid.Cid{}, err - } - - return cid.Cast(data) -} - -// PutCid persists the given key name and value id. -func (s *Store) PutCid(name string, id cid.Cid) error { - return s.Put(datastore.NewKey(name), id.Bytes()) -} - -// Keys returns a list of all keys in the store. -func (s *Store) Keys() ([]string, error) { - res, err := s.Query(query.Query{KeysOnly: true}) - if err != nil { - return nil, err - } - - all, err := res.Rest() - if err != nil { - return nil, err - } - - var keys []string - for _, e := range all { - keys = append(keys, e.Key) - } - - return keys, nil -} diff --git a/go.mod b/go.mod index f3628b7..33de9de 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.8 // indirect - github.com/ipfs/go-ipfs-pinner v0.1.0 github.com/ipfs/go-ipfs-provider v0.4.3 github.com/ipfs/go-ipld-cbor v0.0.3 github.com/ipfs/go-ipld-format v0.2.0 @@ -28,6 +27,9 @@ require ( github.com/libp2p/go-libp2p-core v0.7.0 github.com/libp2p/go-libp2p-kad-dht v0.11.0 github.com/libp2p/go-libp2p-noise v0.1.1 + github.com/libp2p/go-libp2p-pubsub v0.4.1 + github.com/libp2p/go-libp2p-pubsub-router v0.4.0 + github.com/libp2p/go-libp2p-record v0.1.3 github.com/libp2p/go-libp2p-tls v0.1.3 github.com/multiformats/go-multihash v0.0.14 github.com/nasdf/diff3 v0.0.1 diff --git a/go.sum b/go.sum index 44e0ba0..5fa4c3f 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,9 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/benbjohnson/clock v1.0.2 h1:Z0CN0Yb4ig9sGPXkvAQcGJfnrrMQ5QYLCMPRi9iD7YE= github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= +github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= @@ -246,8 +247,6 @@ github.com/ipfs/go-ipfs-exchange-offline v0.0.1/go.mod h1:WhHSFCVYX36H/anEKQboAz github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4= github.com/ipfs/go-ipfs-files v0.0.8 h1:8o0oFJkJ8UkO/ABl8T6ac6tKF3+NIpj67aAB6ZpusRg= github.com/ipfs/go-ipfs-files v0.0.8/go.mod h1:wiN/jSG8FKyk7N0WyctKSvq3ljIa2NNTiZB55kpTdOs= -github.com/ipfs/go-ipfs-pinner v0.1.0 h1:rjSrbUDYd1YYHZ5dOgu+QEOuLcU0m/2a/brcxC/ReeU= -github.com/ipfs/go-ipfs-pinner v0.1.0/go.mod h1:EzyyaWCWeZJ/he9cDBH6QrEkSuRqTRWMmCoyNkylTTg= github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs= github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A= github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= @@ -282,7 +281,6 @@ github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= -github.com/ipfs/go-merkledag v0.3.0/go.mod h1:4pymaZLhSLNVuiCITYrpViD6vmfZ/Ws4n/L9tfNv3S4= github.com/ipfs/go-merkledag v0.3.2 h1:MRqj40QkrWkvPswXs4EfSslhZ4RVPRbxwX11js0t1xY= github.com/ipfs/go-merkledag v0.3.2/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M= github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= @@ -471,6 +469,11 @@ github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6n github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s= github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk= +github.com/libp2p/go-libp2p-pubsub v0.4.0/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ= +github.com/libp2p/go-libp2p-pubsub v0.4.1 h1:j4umIg5nyus+sqNfU+FWvb9aeYFQH/A+nDFhWj+8yy8= +github.com/libp2p/go-libp2p-pubsub v0.4.1/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ= +github.com/libp2p/go-libp2p-pubsub-router v0.4.0 h1:KjzTLIOBCt0+/4wH6epTxD/Qu4Up/IyeKHlj9MhWRJI= +github.com/libp2p/go-libp2p-pubsub-router v0.4.0/go.mod h1:hs0j0ugcBjMOMgJ6diOlZM2rZEId/w5Gg86E+ac4SmQ= github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q= github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q= github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGdsU/9W//C8dqjQDk= @@ -768,6 +771,8 @@ github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 h1:Y1/FEOpaCpD2 github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds= github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= +github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow= +github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70= github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4= diff --git a/p2p/pubsub.go b/p2p/pubsub.go new file mode 100644 index 0000000..5fe31d9 --- /dev/null +++ b/p2p/pubsub.go @@ -0,0 +1,93 @@ +package p2p + +import ( + "context" + "errors" + "path" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" + namesys "github.com/libp2p/go-libp2p-pubsub-router" + record "github.com/libp2p/go-libp2p-record" + "github.com/multiverse-vcs/go-multiverse/data" +) + +// Namespace is the pubsub topic namespace. +const Namespace = "multiverse" + +// Pubsub allows subcribing to topics. +type Pubsub struct { + store *namesys.PubsubValueStore +} + +// TopicForPeerID returns the topic name for the given peer id. +func TopicForPeerID(id peer.ID) string { + return path.Join("/", Namespace, string(id)) +} + +// NewPubsub returns a new pubsub router. +func NewPubsub(ctx context.Context, host host.Host) (*Pubsub, error) { + sub, err := pubsub.NewGossipSub(ctx, host) + if err != nil { + return nil, err + } + + store, err := namesys.NewPubsubValueStore(ctx, host, sub, validator{}) + if err != nil { + return nil, err + } + + return &Pubsub{ + store: store, + }, nil +} + +func (p *Pubsub) SearchAuthor(ctx context.Context, id peer.ID) (*data.Author, error) { + out, err := p.store.SearchValue(ctx, TopicForPeerID(id)) + if err != nil { + return nil, err + } + + return data.AuthorFromCBOR(<-out) +} + +func (p *Pubsub) GetAuthor(ctx context.Context, id peer.ID) (*data.Author, error) { + value, err := p.store.GetValue(ctx, TopicForPeerID(id)) + if err != nil { + return nil, err + } + + return data.AuthorFromCBOR(value) +} + +func (p *Pubsub) PutAuthor(ctx context.Context, id peer.ID, author *data.Author) error { + node, err := author.Node() + if err != nil { + return err + } + + return p.store.PutValue(ctx, TopicForPeerID(id), node.RawData()) +} + +type validator struct{} + +func (v validator) Validate(key string, value []byte) error { + ns, _, err := record.SplitKey(key) + if err != nil { + return err + } + + if ns != Namespace { + return errors.New("invalid namespace") + } + + // TODO unmarshal author from value + // TODO validate signature of author + return nil +} + +func (v validator) Select(key string, vals [][]byte) (int, error) { + // TODO compare author sequence numbers + return 0, nil +} diff --git a/peer/config.go b/peer/config.go index 8db06ce..2ca6bca 100644 --- a/peer/config.go +++ b/peer/config.go @@ -22,7 +22,7 @@ type Config struct { path string } -// NewConfig creates and saves a config with default settings. +// NewConfig creates a config with default settings. func NewConfig(root string) (*Config, error) { priv, err := p2p.GenerateKey() if err != nil { diff --git a/peer/metric.go b/peer/metric.go deleted file mode 100644 index 7808100..0000000 --- a/peer/metric.go +++ /dev/null @@ -1,30 +0,0 @@ -package peer - -import ( - "github.com/ipfs/go-datastore" -) - -const megabyte = 1024 * 1024 - -// Metrics contains info about resource usage. -type Metrics struct { - // DiskUsage is the amount of storage used. - DiskUsage uint64 - // Peers is the number of discovered peers. - Peers int -} - -// GetMetrics returns a snapshot of the current metrics. -func (c *Client) GetMetrics() (*Metrics, error) { - peers := c.host.Peerstore().Peers() - - diskUsage, err := datastore.DiskUsage(c.dstore) - if err != nil { - return nil, err - } - - return &Metrics{ - DiskUsage: diskUsage / megabyte, - Peers: len(peers), - }, nil -} diff --git a/peer/peer.go b/peer/peer.go index 5c7b33e..c41d9dc 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -38,6 +38,7 @@ type Client struct { config *Config bstore blockstore.Blockstore dstore datastore.Batching + pubsub *p2p.Pubsub resolv *resolver.Resolver router routing.Routing system provider.System @@ -55,6 +56,11 @@ func New(ctx context.Context, dstore datastore.Batching, config *Config) (*Clien return nil, err } + pubsub, err := p2p.NewPubsub(ctx, host) + if err != nil { + return nil, err + } + bstore := blockstore.NewBlockstore(dstore) net := bsnet.NewFromIpfsHost(host, router) exc := bitswap.New(ctx, net, bstore) @@ -82,18 +88,29 @@ func New(ctx context.Context, dstore datastore.Batching, config *Config) (*Clien return nil, err } + // TODO use persistent store for pubsub so we don't have to add everytime + if err := pubsub.PutAuthor(ctx, host.ID(), config.Author); err != nil { + return nil, err + } + return &Client{ DAGService: dag, host: host, config: config, bstore: bstore, dstore: dstore, + pubsub: pubsub, resolv: resolv, router: router, system: system, }, nil } +// Pubsub returns the pubsub api. +func (c *Client) Pubsub() *p2p.Pubsub { + return c.pubsub +} + // Config returns the peer config. func (c *Client) Config() *Config { return c.config diff --git a/web/html/author.html b/web/html/author.html new file mode 100644 index 0000000..beafde2 --- /dev/null +++ b/web/html/author.html @@ -0,0 +1,11 @@ +
+ {{ range $i, $v := .List }} +
+

+ + {{ $v.Name }} + +

+
+ {{ end }} +
diff --git a/web/html/home.html b/web/html/home.html deleted file mode 100644 index 4ddd509..0000000 --- a/web/html/home.html +++ /dev/null @@ -1,26 +0,0 @@ - - -
- {{ range $i, $v := .List }} -
-

- - {{ $v.Name }} - -

-
- {{ end }} -
diff --git a/web/html/index.html b/web/html/index.html index c881f4f..d8cec5f 100644 --- a/web/html/index.html +++ b/web/html/index.html @@ -23,7 +23,7 @@

Multiverse

- {{ .Page }} + {{ . }}
diff --git a/web/html/tree.html b/web/html/tree.html index 55e8b58..ac44fe9 100644 --- a/web/html/tree.html +++ b/web/html/tree.html @@ -20,7 +20,7 @@