Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ extend-exclude = [

[default.extend-words]
Forgetten = "Forgetten"
# Common "spelling" of "type" in Rust/go
typ = "typ"
60 changes: 60 additions & 0 deletions sdks/go/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

types "github.com/EspressoSystems/espresso-network/sdks/go/types"
common "github.com/EspressoSystems/espresso-network/sdks/go/types/common"
"github.com/coder/websocket"
)

var _ QueryService = (*Client)(nil)
Expand Down Expand Up @@ -177,6 +178,65 @@ func (c *Client) SubmitTransaction(ctx context.Context, tx types.Transaction) (*
return &hash, nil
}

// Stream of JSON-encoded objects over a WebSocket connection
type WsStream[S any] struct {
conn *websocket.Conn
}

func (s *WsStream[S]) NextRaw(ctx context.Context) (json.RawMessage, error) {
typ, msg, err := s.conn.Read(ctx)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrEphemeral, err)
}
if typ != websocket.MessageText {
return nil, fmt.Errorf("%w: %v", ErrPermanent, err)
}
return msg, nil
}

func (s *WsStream[S]) Next(ctx context.Context) (*S, error) {
typ, msg, err := s.conn.Read(ctx)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrEphemeral, err)
}
if typ != websocket.MessageText {
return nil, fmt.Errorf("%w: %v", ErrPermanent, err)
}
var data S
if err := json.Unmarshal(msg, &data); err != nil {
return nil, fmt.Errorf("%w: %v", ErrPermanent, err)
}
return &data, nil
}

func (s *WsStream[S]) Close() error {
return s.conn.Close(websocket.StatusNormalClosure, "")
}

// Open a `Stream` of Espresso transactions starting from a specific block height.
func (c *Client) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) {
opts := &websocket.DialOptions{}
opts.HTTPClient = c.client
url := c.baseUrl + fmt.Sprintf("availability/stream/transactions/%d", height)
conn, _, err := websocket.Dial(ctx, url, opts)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrEphemeral, err)
}
return &WsStream[types.TransactionQueryData]{conn: conn}, nil
}

// Open a `Stream` of Espresso transactions starting from a specific block height, filtered by namespace.
func (c *Client) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) {
opts := &websocket.DialOptions{}
opts.HTTPClient = c.client
url := c.baseUrl + fmt.Sprintf("availability/stream/transactions/%d/namespace/%d", height, namespace)
conn, _, err := websocket.Dial(ctx, url, opts)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrEphemeral, err)
}
return &WsStream[types.TransactionQueryData]{conn: conn}, nil
}

type NamespaceResponse struct {
Proof *json.RawMessage `json:"proof"`
Transactions *[]types.Transaction `json:"transactions"`
Expand Down
19 changes: 19 additions & 0 deletions sdks/go/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
tagged_base64 "github.com/EspressoSystems/espresso-network/sdks/go/tagged-base64"
types "github.com/EspressoSystems/espresso-network/sdks/go/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)

var workingDir = "../../../"
Expand Down Expand Up @@ -67,6 +68,24 @@ func TestApiWithEspressoDevNode(t *testing.T) {
}
fmt.Println("submitted transaction with hash", hash)

stream, err := client.StreamTransactions(ctx, 1)
require.NoError(t, err)

txData, err := stream.Next(ctx)
require.NoError(t, err)
require.NotNil(t, txData)
require.Equal(t, txData.Transaction.Payload, tx.Payload)
require.Equal(t, txData.Transaction.Namespace, tx.Namespace)

// Test streaming with namespace filter
nsStream, err := client.StreamTransactionsInNamespace(ctx, 1, tx.Namespace)
require.NoError(t, err)

nsTxData, err := nsStream.Next(ctx)
require.NoError(t, err)
require.NotNil(t, nsTxData)
require.Equal(t, nsTxData.Transaction.Payload, tx.Payload)
require.Equal(t, nsTxData.Transaction.Namespace, tx.Namespace)
}

func runDevNode(ctx context.Context, tmpDir string) func() {
Expand Down
114 changes: 114 additions & 0 deletions sdks/go/client/multiple_nodes_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,120 @@ func (c *MultipleNodesClient) SubmitTransaction(ctx context.Context, tx common.T
return nil, fmt.Errorf("%w: encountered an error with all nodes while attempting to SubmitTransaction.\n Errors: %v \n", ErrEphemeral, errs)
}

// A wrapper over multiple `Stream`s that are supposed to return the same
// sequence of objects that verifies the items using majority rule.
// An underlying stream that deviates from majority or responds with an error
// is disabled for the rest of the MultiplexedStream's existence. If majority
// of the underlying streams is disabled, calling Next on the MultiplexedStream
// will always return an error.
type MultiplexedStream[T any] struct {
nStreams int
workingStreams []Stream[T]
}

func (ms *MultiplexedStream[T]) NextRaw(ctx context.Context) (json.RawMessage, error) {
newWorkingStreams := []Stream[T]{}

majority := (ms.nStreams / 2) + 1

values := make(map[string]int)
var returnValue json.RawMessage = nil

for _, stream := range ms.workingStreams {
rawValue, err := stream.NextRaw(ctx)
if err != nil {
continue
}

hash, err := hashNormalizedJSON(rawValue)
if err != nil {
continue
}

if _, ok := values[hash]; !ok {
values[hash] = 0
}

values[hash]++
if values[hash] >= majority {
returnValue = rawValue
}

newWorkingStreams = append(newWorkingStreams, stream)
}

ms.workingStreams = newWorkingStreams

if returnValue == nil {
return nil, fmt.Errorf("%w: no majority", ErrPermanent)
} else {
return returnValue, nil
}
}

func (ms *MultiplexedStream[T]) Next(ctx context.Context) (*T, error) {
next, err := ms.NextRaw(ctx)
if err != nil {
return nil, err
}

var value T
err = json.Unmarshal(next, &value)
if err != nil {
return nil, err
}

return &value, nil
}

func (ms *MultiplexedStream[T]) Close() error {
var returnErr error
for _, stream := range ms.workingStreams {
err := stream.Close()
if err != nil {
returnErr = err
}
}

return returnErr
}

func (c *MultipleNodesClient) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) {

workingStreams := []Stream[types.TransactionQueryData]{}
for _, node := range c.nodes {
stream, err := node.StreamTransactions(ctx, height)
if err != nil {
return nil, err
}

workingStreams = append(workingStreams, stream)
}

return &MultiplexedStream[types.TransactionQueryData]{
nStreams: len(c.nodes),
workingStreams: workingStreams,
}, nil
}

func (c *MultipleNodesClient) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) {

workingStreams := []Stream[types.TransactionQueryData]{}
for _, node := range c.nodes {
stream, err := node.StreamTransactionsInNamespace(ctx, height, namespace)
if err != nil {
return nil, err
}

workingStreams = append(workingStreams, stream)
}

return &MultiplexedStream[types.TransactionQueryData]{
nStreams: len(c.nodes),
workingStreams: workingStreams,
}, nil
}

func FetchWithMajority[T any](ctx context.Context, nodes []*T, fetchFunc func(*T) (json.RawMessage, error)) (json.RawMessage, error) {
type result struct {
value json.RawMessage
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/client/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ type QueryService interface {
FetchVidCommonByHeight(ctx context.Context, blockHeight uint64) (types.VidCommon, error)
// Get the transaction by its hash from the explorer.
FetchExplorerTransactionByHash(ctx context.Context, hash *types.TaggedBase64) (types.ExplorerTransactionQueryData, error)
// Stream transactions starting from the given height.
StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error)
// Stream transactions starting from the given height, filtered by namespace.
StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error)
}

// Interface representing a pollable stream of JSON-encoded objects
type Stream[S any] interface {
// Get the next item from the stream. Equivalent to
// calling NextRaw(ctx) and decoding the result.
Next(ctx context.Context) (*S, error)
// Get next item from the stream as raw JSON objects.
NextRaw(ctx context.Context) (json.RawMessage, error)
// Close the underlying connection, if applicable
Close() error
}

// Response to `FetchTransactionsInBlock`
Expand Down
1 change: 1 addition & 0 deletions sdks/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/coder/websocket v1.8.14 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions sdks/go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeS
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ=
github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI=
github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M=
Expand Down
Loading