Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
feat: pubsub http rpc with multibase
Browse files Browse the repository at this point in the history
This updates HTTP RPC wire format to one from
ipfs/kubo#8183
  • Loading branch information
lidel committed Oct 12, 2021
1 parent 8569e83 commit 37a13e2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 20 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# go-ipfs-http-api

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](https://ipfs.io/)
[![](https://img.shields.io/badge/matrix-%23ipfs-blue.svg?style=flat-square)](https://app.element.io/#/room/#ipfs:matrix.org)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-http-api?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-http-api)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/libp2p/go-libp2p-core v0.8.6
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.15
github.com/pkg/errors v0.9.1
)
Expand Down
78 changes: 61 additions & 17 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
iface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/libp2p/go-libp2p-core/peer"
mbase "github.com/multiformats/go-multibase"
)

type PubsubAPI HttpApi
Expand All @@ -21,8 +22,15 @@ func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
if err := api.core().Request("pubsub/ls").Exec(ctx, &out); err != nil {
return nil, err
}

return out.Strings, nil
topics := make([]string, len(out.Strings))
for n, mb := range out.Strings {
_, topic, err := mbase.Decode(mb)
if err != nil {
return nil, err
}
topics[n] = string(topic)
}
return topics, nil
}

func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
Expand All @@ -35,7 +43,11 @@ func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
Strings []string
}

if err := api.core().Request("pubsub/peers", options.Topic).Exec(ctx, &out); err != nil {
var optionalTopic string
if len(options.Topic) > 0 {
optionalTopic = toMultibase([]byte(options.Topic))
}
if err := api.core().Request("pubsub/peers", optionalTopic).Exec(ctx, &out); err != nil {
return nil, err
}

Expand All @@ -51,7 +63,7 @@ func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
}

func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
return api.core().Request("pubsub/pub", topic).
return api.core().Request("pubsub/pub", toMultibase([]byte(topic))).
FileBody(bytes.NewReader(message)).
Exec(ctx, nil)
}
Expand All @@ -64,29 +76,36 @@ type pubsubSub struct {
}

type pubsubMessage struct {
JFrom []byte `json:"from,omitempty"`
JData []byte `json:"data,omitempty"`
JSeqno []byte `json:"seqno,omitempty"`
JFrom string `json:"from,omitempty"`
JData string `json:"data,omitempty"`
JSeqno string `json:"seqno,omitempty"`
JTopicIDs []string `json:"topicIDs,omitempty"`

from peer.ID
err error
// real values after unpacking from text/multibase envelopes
from peer.ID
data []byte
seqno []byte
topics []string

err error
}

func (msg *pubsubMessage) From() peer.ID {
return msg.from
}

func (msg *pubsubMessage) Data() []byte {
return msg.JData
return msg.data
}

func (msg *pubsubMessage) Seq() []byte {
return msg.JSeqno
return msg.seqno
}

// TODO: do we want to keep this interface as []string,
// or change to more correct [][]byte?
func (msg *pubsubMessage) Topics() []string {
return msg.JTopicIDs
return msg.topics
}

func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
Expand All @@ -98,22 +117,41 @@ func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
if msg.err != nil {
return nil, msg.err
}
// unpack values from text/multibase envelopes
var err error
msg.from, err = peer.IDFromBytes(msg.JFrom)
return &msg, err
msg.from, err = peer.Decode(msg.JFrom)
if err != nil {
return nil, err
}
_, msg.data, err = mbase.Decode(msg.JData)
if err != nil {
return nil, err
}
_, msg.seqno, err = mbase.Decode(msg.JSeqno)
if err != nil {
return nil, err
}
for _, mbt := range msg.JTopicIDs {
_, topic, err := mbase.Decode(mbt)
if err != nil {
return nil, err
}
msg.topics = append(msg.topics, string(topic))
}
return &msg, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
/* right now we have no options (discover got deprecated)
options, err := caopts.PubSubSubscribeOptions(opts...)
if err != nil {
return nil, err
}

resp, err := api.core().Request("pubsub/sub", topic).
Option("discover", options.Discover).Send(ctx)
*/
resp, err := api.core().Request("pubsub/sub", toMultibase([]byte(topic))).Send(ctx)

if err != nil {
return nil, err
Expand Down Expand Up @@ -168,3 +206,9 @@ func (s *pubsubSub) Close() error {
func (api *PubsubAPI) core() *HttpApi {
return (*HttpApi)(api)
}

// Encodes bytes into URL-safe multibase that can be sent over HTTP RPC (URL or body)
func toMultibase(data []byte) string {
mb, _ := mbase.Encode(mbase.Base64url, data)
return mb
}

0 comments on commit 37a13e2

Please sign in to comment.