Skip to content

Commit

Permalink
serde
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay committed Nov 12, 2021
1 parent 71d753f commit 7d6ef08
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 85 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace github.com/ipfs/go-verifcid => github.com/celestiaorg/go-verifcid v0.0.1
require (
github.com/BurntSushi/toml v0.4.1
github.com/celestiaorg/celestia-core v0.0.2-0.20210924001615-488ac31b4b3c
github.com/celestiaorg/go-libp2p-messenger v0.1.0
github.com/celestiaorg/nmt v0.7.0
github.com/celestiaorg/rsmt2d v0.3.0
github.com/gogo/protobuf v1.3.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ github.com/celestiaorg/celestia-core v0.0.2-0.20210924001615-488ac31b4b3c h1:5He
github.com/celestiaorg/celestia-core v0.0.2-0.20210924001615-488ac31b4b3c/go.mod h1:i1ECol3kZLZFIkWUJ+/zW5adfvSA8CCl4w/j2FwPOuc=
github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1oScfE1g=
github.com/celestiaorg/go-leopard v0.1.0/go.mod h1:NtO/rjlB8dw2aq7jr06vZFKGvryQcTDXaNHelmPNOAM=
github.com/celestiaorg/go-libp2p-messenger v0.1.0 h1:rFldTa3ZWcRRn8E2bRWS94Qp1GFYXO2a0uvqpIey1B8=
github.com/celestiaorg/go-libp2p-messenger v0.1.0/go.mod h1:XzNksXrH0VxuNRGOnjPL9Ck4UyQlbmMpCYg9YwSBerI=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch h1:9TSe3w1cmJmbWlweCwCTIZkan7jV8M+KwglXpdD+UG8=
github.com/celestiaorg/go-verifcid v0.0.1-lazypatch/go.mod h1:kXPYu0XqTNUKWA1h3M95UHjUqBzDwXVVt/RXZDjKJmQ=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
Expand Down
83 changes: 33 additions & 50 deletions service/header/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"fmt"

header_pb "github.com/celestiaorg/celestia-node/service/header/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -35,21 +38,17 @@ func newExchange(host host.Host, peer *peer.AddrInfo, store Store) *exchange {
// requestHandler handles inbound ExtendedHeaderRequests.
func (e *exchange) requestHandler(stream network.Stream) {
// unmarshal request
buf := make([]byte, 100) // TODO @renaynay: use Messenger lib to determine size of packet before reading. https://github.com/celestiaorg/go-libp2p-messenger/tree/main/serde
reqSize, err := stream.Read(buf)
pbreq := new(header_pb.ExtendedHeaderRequest)
_, err := serde.Read(stream, pbreq)
if err != nil {
log.Errorw("reading header request from stream", "err", err.Error())
//nolint:errcheck
stream.Reset()
return
}
request := new(ExtendedHeaderRequest)
err = request.UnmarshalBinary(buf[:reqSize])
if err != nil {
log.Errorw("unmarshaling inbound header request", "err", err.Error())
//nolint:errcheck
stream.Reset()
return
request := &ExtendedHeaderRequest{
Origin: pbreq.Origin,
Amount: pbreq.Amount,
}
// route depending on amount of headers requested
if request.Amount > 1 {
Expand Down Expand Up @@ -77,14 +76,14 @@ func (e *exchange) handleSingleHeaderRequest(origin uint64, stream network.Strea
stream.Reset()
return
}
bin, err := header.MarshalBinary()
resp, err := ExtendedHeaderToProto(header)
if err != nil {
log.Errorw("marshaling header", "height", origin, "err", err.Error())
log.Errorw("marshaling header to proto", "height", origin, "err", err.Error())
//nolint:errcheck
stream.Reset()
return
}
_, err = stream.Write(bin) // TODO @renaynay: use serde.Write from the golibp2p-messenger lib
_, err = serde.Write(stream, resp)
if err != nil {
log.Errorw("writing header to stream", "height", origin, "err", err.Error())
}
Expand All @@ -108,31 +107,26 @@ func (e *exchange) RequestHead(ctx context.Context) (*ExtendedHeader, error) {
return nil, err
}
// create request
req := &ExtendedHeaderRequest{
req := &header_pb.ExtendedHeaderRequest{
Origin: uint64(0),
Amount: 1,
}
bin, err := req.MarshalBinary()
if err != nil {
return nil, err
}
// send request
_, err = stream.Write(bin)
_, err = serde.Write(stream, req)
if err != nil {
return nil, err
}
buf := make([]byte, 2000) // TODO @renaynay: how big do we expect ExtendedHeader to be?
msgSize, err := stream.Read(buf)
// read request
resp := new(header_pb.ExtendedHeader)
_, err = serde.Read(stream, resp)
if err != nil {
return nil, err
}
// unmarshal response
resp := new(ExtendedHeader)
err = resp.UnmarshalBinary(buf[:msgSize])
head, err := ProtoToExtendedHeader(resp)
if err != nil {
return nil, err
}
return resp, nil
return head, nil
}

func (e *exchange) RequestHeader(ctx context.Context, height uint64) (*ExtendedHeader, error) {
Expand All @@ -145,31 +139,26 @@ func (e *exchange) RequestHeader(ctx context.Context, height uint64) (*ExtendedH
return nil, err
}
// create request
req := &ExtendedHeaderRequest{
req := &header_pb.ExtendedHeaderRequest{
Origin: height,
Amount: 1,
}
bin, err := req.MarshalBinary()
if err != nil {
return nil, err
}
// send request
_, err = stream.Write(bin)
_, err = serde.Write(stream, req)
if err != nil {
return nil, err
}
buf := make([]byte, 2000) // TODO @renaynay: how big do we expect ExtendedHeader to be?
msgSize, err := stream.Read(buf)
// read response
resp := new(header_pb.ExtendedHeader)
_, err = serde.Read(stream, resp)
if err != nil {
return nil, err
}
// unmarshal response
resp := new(ExtendedHeader)
err = resp.UnmarshalBinary(buf[:msgSize])
head, err := ProtoToExtendedHeader(resp)
if err != nil {
return nil, err
}
return resp, nil
return head, nil
}

func (e *exchange) RequestHeaders(ctx context.Context, origin, amount uint64) ([]*ExtendedHeader, error) {
Expand All @@ -178,34 +167,28 @@ func (e *exchange) RequestHeaders(ctx context.Context, origin, amount uint64) ([
return nil, err
}
// create request
req := &ExtendedHeaderRequest{
req := &header_pb.ExtendedHeaderRequest{
Origin: origin,
Amount: amount,
}
bin, err := req.MarshalBinary()
if err != nil {
return nil, err
}
// send request
_, err = stream.Write(bin)
_, err = serde.Write(stream, req)
if err != nil {
return nil, err
}
// read responses
resp := make([]*ExtendedHeader, amount)
headers := make([]*ExtendedHeader, amount)
for i := 0; i < int(amount); i++ {
buf := make([]byte, 2000) // TODO @renaynay: use serde
msgSize, err := stream.Read(buf)
resp := new(header_pb.ExtendedHeader)
_, err := serde.Read(stream, resp)
if err != nil {
return nil, err
}
// unmarshal response
eh := new(ExtendedHeader)
err = eh.UnmarshalBinary(buf[:msgSize])
header, err := ProtoToExtendedHeader(resp)
if err != nil {
return nil, err
}
resp[i] = eh
headers[i] = header
}
return resp, nil
return headers, nil
}
64 changes: 29 additions & 35 deletions service/header/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/require"

tmbytes "github.com/celestiaorg/celestia-core/libs/bytes"
header_pb "github.com/celestiaorg/celestia-node/service/header/pb"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

func TestExchange_RequestHead(t *testing.T) {
Expand Down Expand Up @@ -78,23 +80,23 @@ var expected *ExtendedHeader
var multipleExpected = make([]*ExtendedHeader, 5)

func testHeaderHandler(stream network.Stream) {
bin, err := expected.MarshalBinary()
proto, err := ExtendedHeaderToProto(expected)
if err != nil {
panic(err)
}
_, err = stream.Write(bin)
_, err = serde.Write(stream, proto)
if err != nil {
panic(err)
}
}

func testMultipleHeadersHandler(stream network.Stream) {
for _, header := range multipleExpected {
bin, err := header.MarshalBinary()
proto, err := ExtendedHeaderToProto(header)
if err != nil {
panic(err)
}
_, err = stream.Write(bin)
_, err = serde.Write(stream, proto)
if err != nil {
panic(err)
}
Expand All @@ -121,25 +123,23 @@ func TestExchange_Response_Head(t *testing.T) {
stream, err := peer.NewStream(context.Background(), libhost.InfoFromHost(host).ID, headerExchangeProtocolID)
require.NoError(t, err)
// create request
req := &ExtendedHeaderRequest{
req := &header_pb.ExtendedHeaderRequest{
Origin: uint64(0),
Amount: 1,
}
bin, err := req.MarshalBinary()
require.NoError(t, err)
// send request
_, err = stream.Write(bin)
_, err = serde.Write(stream, req)
require.NoError(t, err)
// read resp
buf := make([]byte, 2000)
respSize, err := stream.Read(buf)
require.NoError(t, err)
resp := new(ExtendedHeader)
err = resp.UnmarshalBinary(buf[:respSize])
resp := new(header_pb.ExtendedHeader)
_, err = serde.Read(stream, resp)
require.NoError(t, err)
// compare
assert.Equal(t, store.headers[5].Height, resp.Height)
assert.Equal(t, store.headers[5].Hash(), resp.Hash())
eh, err := ProtoToExtendedHeader(resp)
require.NoError(t, err)

assert.Equal(t, store.headers[5].Height, eh.Height)
assert.Equal(t, store.headers[5].Hash(), eh.Hash())
}

// TestExchange_Response_Single tests that the exchange instance can respond
Expand Down Expand Up @@ -167,21 +167,18 @@ func TestExchange_Response_Single(t *testing.T) {
Origin: origin,
Amount: 1,
}
bin, err := req.MarshalBinary()
require.NoError(t, err)
// send request
_, err = stream.Write(bin)
_, err = serde.Write(stream, req.ToProto())
require.NoError(t, err)
// read resp
buf := make([]byte, 2000)
respSize, err := stream.Read(buf)
require.NoError(t, err)
resp := new(ExtendedHeader)
err = resp.UnmarshalBinary(buf[:respSize])
resp := new(header_pb.ExtendedHeader)
_, err = serde.Read(stream, resp)
require.NoError(t, err)
// compare
assert.Equal(t, store.headers[int(origin)].Height, resp.Height)
assert.Equal(t, store.headers[int(origin)].Hash(), resp.Hash())
got, err := ProtoToExtendedHeader(resp)
require.NoError(t, err)
assert.Equal(t, store.headers[int(origin)].Height, got.Height)
assert.Equal(t, store.headers[int(origin)].Hash(), got.Hash())
}

// TestExchange_Response_Multiple tests that the exchange instance can respond
Expand All @@ -205,26 +202,23 @@ func TestExchange_Response_Multiple(t *testing.T) {
require.NoError(t, err)
// create request
origin := uint64(3)
req := &ExtendedHeaderRequest{
req := &header_pb.ExtendedHeaderRequest{
Origin: origin,
Amount: 2,
}
bin, err := req.MarshalBinary()
require.NoError(t, err)
// send request
_, err = stream.Write(bin)
_, err = serde.Write(stream, req)
require.NoError(t, err)
// read responses
for i := origin; i < (origin + req.Amount); i++ {
buf := make([]byte, 2000)
respSize, err := stream.Read(buf)
resp := new(header_pb.ExtendedHeader)
_, err := serde.Read(stream, resp)
require.NoError(t, err)
resp := new(ExtendedHeader)
err = resp.UnmarshalBinary(buf[:respSize])
eh, err := ProtoToExtendedHeader(resp)
require.NoError(t, err)
// compare
assert.Equal(t, store.headers[int(i)].Height, resp.Height)
assert.Equal(t, store.headers[int(i)].Hash(), resp.Hash())
assert.Equal(t, store.headers[int(i)].Height, eh.Height)
assert.Equal(t, store.headers[int(i)].Hash(), eh.Hash())
}
}

Expand Down
8 changes: 8 additions & 0 deletions service/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/celestiaorg/celestia-core/pkg/da"
core "github.com/celestiaorg/celestia-core/types"
header_pb "github.com/celestiaorg/celestia-node/service/header/pb"
)

type DataAvailabilityHeader = da.DataAvailabilityHeader
Expand Down Expand Up @@ -69,3 +70,10 @@ func (ehr *ExtendedHeaderRequest) UnmarshalBinary(data []byte) error {
*ehr = *out
return nil
}

func (ehr *ExtendedHeaderRequest) ToProto() *header_pb.ExtendedHeaderRequest {
return &header_pb.ExtendedHeaderRequest{
Origin: ehr.Origin,
Amount: ehr.Amount,
}
}
31 changes: 31 additions & 0 deletions service/header/header_serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,37 @@ func UnmarshalExtendedHeader(data []byte) (*ExtendedHeader, error) {
return out, nil
}

func ExtendedHeaderToProto(eh *ExtendedHeader) (*header_pb.ExtendedHeader, error) {
pb := &header_pb.ExtendedHeader{
Header: eh.RawHeader.ToProto(),
Commit: eh.Commit.ToProto(),
}
valSet, err := eh.ValidatorSet.ToProto()
if err != nil {
return nil, err
}
pb.ValidatorSet = valSet
dah, err := eh.DAH.ToProto()
if err != nil {
return nil, err
}
pb.Dah = dah
return pb, nil
}

func ProtoToExtendedHeader(pb *header_pb.ExtendedHeader) (*ExtendedHeader, error) {
bin, err := pb.Marshal()
if err != nil {
return nil, err
}
header := new(ExtendedHeader)
err = header.UnmarshalBinary(bin)
if err != nil {
return nil, err
}
return header, nil
}

// MarshalExtendedHeaderRequest serializes the given ExtendedHeaderRequest to bytes using protobuf.
// Paired with UnmarshalExtendedHeaderRequest.
func MarshalExtendedHeaderRequest(in *ExtendedHeaderRequest) ([]byte, error) {
Expand Down

0 comments on commit 7d6ef08

Please sign in to comment.