Skip to content

Commit

Permalink
Add pubsub router.
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Feb 6, 2021
1 parent ae1a6be commit a68efb2
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 178 deletions.
9 changes: 7 additions & 2 deletions data/author.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func GetAuthor(ctx context.Context, dag ipld.DAGService, id cid.Cid) (*Author, e

// AddAuthor adds a author to the given dag.
func AddAuthor(ctx context.Context, dag ipld.DAGService, author *Author) (cid.Cid, error) {
node, err := cbornode.WrapObject(author, multihash.SHA2_256, -1)
node, err := author.Node()
if err != nil {
return cid.Cid{}, err
}
Expand All @@ -44,7 +44,7 @@ func AddAuthor(ctx context.Context, dag ipld.DAGService, author *Author) (cid.Ci
return node.Cid(), nil
}

// AuthorFromJON decodes a author from json.
// AuthorFromJSON decodes a author from json.
func AuthorFromJSON(data []byte) (*Author, error) {
var author Author
if err := json.Unmarshal(data, &author); err != nil {
Expand All @@ -70,3 +70,8 @@ func NewAuthor() *Author {
Repositories: make(map[string]cid.Cid),
}
}

// Node returns a cbornode containing the author.
func (a *Author) Node() (ipld.Node, error) {
return cbornode.WrapObject(a, multihash.SHA2_256, -1)
}
9 changes: 7 additions & 2 deletions data/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func GetCommit(ctx context.Context, dag ipld.DAGService, id cid.Cid) (*Commit, e

// AddCommit adds a commit to the given dag.
func AddCommit(ctx context.Context, dag ipld.DAGService, commit *Commit) (cid.Cid, error) {
node, err := cbornode.WrapObject(commit, multihash.SHA2_256, -1)
node, err := commit.Node()
if err != nil {
return cid.Cid{}, err
}
Expand All @@ -49,7 +49,7 @@ func AddCommit(ctx context.Context, dag ipld.DAGService, commit *Commit) (cid.Ci
return node.Cid(), nil
}

// CommitFromJON decodes a commit from json.
// CommitFromJSON decodes a commit from json.
func CommitFromJSON(data []byte) (*Commit, error) {
var commit Commit
if err := json.Unmarshal(data, &commit); err != nil {
Expand Down Expand Up @@ -80,6 +80,11 @@ func NewCommit(tree cid.Cid, message string, parents ...cid.Cid) *Commit {
}
}

// Node returns a cbornode containing the commit.
func (c *Commit) Node() (ipld.Node, error) {
return cbornode.WrapObject(c, multihash.SHA2_256, -1)
}

// ParentLinks returns parent ipld links.
func (c *Commit) ParentLinks() []*ipld.Link {
var out []*ipld.Link
Expand Down
22 changes: 6 additions & 16 deletions data/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
Expand Down Expand Up @@ -39,7 +38,7 @@ func GetRepository(ctx context.Context, dag ipld.DAGService, id cid.Cid) (*Repos

// AddRepository adds a repo to the given dag.
func AddRepository(ctx context.Context, dag ipld.DAGService, repo *Repository) (cid.Cid, error) {
node, err := cbornode.WrapObject(repo, multihash.SHA2_256, -1)
node, err := repo.Node()
if err != nil {
return cid.Cid{}, err
}
Expand All @@ -51,20 +50,6 @@ func AddRepository(ctx context.Context, dag ipld.DAGService, repo *Repository) (
return node.Cid(), nil
}

// PinRepository pins a repo using the given pinner.
func PinRepository(ctx context.Context, pinner pin.Pinner, repo *Repository) (cid.Cid, error) {
node, err := cbornode.WrapObject(repo, multihash.SHA2_256, -1)
if err != nil {
return cid.Cid{}, err
}

if err := pinner.Pin(ctx, node, true); err != nil {
return cid.Cid{}, err
}

return node.Cid(), nil
}

// RepositoryFromJSON decodes a repo from json.
func RepositoryFromJSON(data []byte) (*Repository, error) {
var repo Repository
Expand Down Expand Up @@ -94,3 +79,8 @@ func NewRepository(name string) *Repository {
Metadata: make(map[string]string),
}
}

// Node returns a cbornode containing the repository.
func (r *Repository) Node() (ipld.Node, error) {
return cbornode.WrapObject(r, multihash.SHA2_256, -1)
}
56 changes: 0 additions & 56 deletions data/store.go

This file was deleted.

4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8 // indirect
github.com/ipfs/go-ipfs-pinner v0.1.0
github.com/ipfs/go-ipfs-provider v0.4.3
github.com/ipfs/go-ipld-cbor v0.0.3
github.com/ipfs/go-ipld-format v0.2.0
Expand All @@ -28,6 +27,9 @@ require (
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-kad-dht v0.11.0
github.com/libp2p/go-libp2p-noise v0.1.1
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/libp2p/go-libp2p-pubsub-router v0.4.0
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/multiformats/go-multihash v0.0.14
github.com/nasdf/diff3 v0.0.1
Expand Down
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/benbjohnson/clock v1.0.2 h1:Z0CN0Yb4ig9sGPXkvAQcGJfnrrMQ5QYLCMPRi9iD7YE=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
Expand Down Expand Up @@ -246,8 +247,6 @@ github.com/ipfs/go-ipfs-exchange-offline v0.0.1/go.mod h1:WhHSFCVYX36H/anEKQboAz
github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-files v0.0.8 h1:8o0oFJkJ8UkO/ABl8T6ac6tKF3+NIpj67aAB6ZpusRg=
github.com/ipfs/go-ipfs-files v0.0.8/go.mod h1:wiN/jSG8FKyk7N0WyctKSvq3ljIa2NNTiZB55kpTdOs=
github.com/ipfs/go-ipfs-pinner v0.1.0 h1:rjSrbUDYd1YYHZ5dOgu+QEOuLcU0m/2a/brcxC/ReeU=
github.com/ipfs/go-ipfs-pinner v0.1.0/go.mod h1:EzyyaWCWeZJ/he9cDBH6QrEkSuRqTRWMmCoyNkylTTg=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
Expand Down Expand Up @@ -282,7 +281,6 @@ github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0=
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.3.0/go.mod h1:4pymaZLhSLNVuiCITYrpViD6vmfZ/Ws4n/L9tfNv3S4=
github.com/ipfs/go-merkledag v0.3.2 h1:MRqj40QkrWkvPswXs4EfSslhZ4RVPRbxwX11js0t1xY=
github.com/ipfs/go-merkledag v0.3.2/go.mod h1:fvkZNNZixVW6cKSZ/JfLlON5OlgTXNdRLz0p6QG/I2M=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
Expand Down Expand Up @@ -471,6 +469,11 @@ github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6n
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
github.com/libp2p/go-libp2p-pubsub v0.4.0/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ=
github.com/libp2p/go-libp2p-pubsub v0.4.1 h1:j4umIg5nyus+sqNfU+FWvb9aeYFQH/A+nDFhWj+8yy8=
github.com/libp2p/go-libp2p-pubsub v0.4.1/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ=
github.com/libp2p/go-libp2p-pubsub-router v0.4.0 h1:KjzTLIOBCt0+/4wH6epTxD/Qu4Up/IyeKHlj9MhWRJI=
github.com/libp2p/go-libp2p-pubsub-router v0.4.0/go.mod h1:hs0j0ugcBjMOMgJ6diOlZM2rZEId/w5Gg86E+ac4SmQ=
github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=
github.com/libp2p/go-libp2p-record v0.1.0/go.mod h1:ujNc8iuE5dlKWVy6wuL6dd58t0n7xI4hAIl8pE6wu5Q=
github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGdsU/9W//C8dqjQDk=
Expand Down Expand Up @@ -768,6 +771,8 @@ github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 h1:Y1/FEOpaCpD2
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70=
github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4=
Expand Down
93 changes: 93 additions & 0 deletions p2p/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package p2p

import (
"context"
"errors"
"path"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
"github.com/multiverse-vcs/go-multiverse/data"
)

// Namespace is the pubsub topic namespace.
const Namespace = "multiverse"

// Pubsub allows subcribing to topics.
type Pubsub struct {
store *namesys.PubsubValueStore
}

// TopicForPeerID returns the topic name for the given peer id.
func TopicForPeerID(id peer.ID) string {
return path.Join("/", Namespace, string(id))
}

// NewPubsub returns a new pubsub router.
func NewPubsub(ctx context.Context, host host.Host) (*Pubsub, error) {
sub, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
return nil, err
}

store, err := namesys.NewPubsubValueStore(ctx, host, sub, validator{})
if err != nil {
return nil, err
}

return &Pubsub{
store: store,
}, nil
}

func (p *Pubsub) SearchAuthor(ctx context.Context, id peer.ID) (*data.Author, error) {
out, err := p.store.SearchValue(ctx, TopicForPeerID(id))
if err != nil {
return nil, err
}

return data.AuthorFromCBOR(<-out)
}

func (p *Pubsub) GetAuthor(ctx context.Context, id peer.ID) (*data.Author, error) {
value, err := p.store.GetValue(ctx, TopicForPeerID(id))
if err != nil {
return nil, err
}

return data.AuthorFromCBOR(value)
}

func (p *Pubsub) PutAuthor(ctx context.Context, id peer.ID, author *data.Author) error {
node, err := author.Node()
if err != nil {
return err
}

return p.store.PutValue(ctx, TopicForPeerID(id), node.RawData())
}

type validator struct{}

func (v validator) Validate(key string, value []byte) error {
ns, _, err := record.SplitKey(key)
if err != nil {
return err
}

if ns != Namespace {
return errors.New("invalid namespace")
}

// TODO unmarshal author from value
// TODO validate signature of author
return nil
}

func (v validator) Select(key string, vals [][]byte) (int, error) {
// TODO compare author sequence numbers
return 0, nil
}
2 changes: 1 addition & 1 deletion peer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Config struct {
path string
}

// NewConfig creates and saves a config with default settings.
// NewConfig creates a config with default settings.
func NewConfig(root string) (*Config, error) {
priv, err := p2p.GenerateKey()
if err != nil {
Expand Down
30 changes: 0 additions & 30 deletions peer/metric.go

This file was deleted.

Loading

0 comments on commit a68efb2

Please sign in to comment.