diff --git a/.typos.toml b/.typos.toml index 1d78ca16e6c..5bd270e3d04 100644 --- a/.typos.toml +++ b/.typos.toml @@ -20,3 +20,5 @@ extend-exclude = [ [default.extend-words] Forgetten = "Forgetten" +# Common "spelling" of "type" in Rust/go +typ = "typ" diff --git a/sdks/go/client/client.go b/sdks/go/client/client.go index 8483963f51d..b763bb11442 100644 --- a/sdks/go/client/client.go +++ b/sdks/go/client/client.go @@ -12,6 +12,7 @@ import ( types "github.com/EspressoSystems/espresso-network/sdks/go/types" common "github.com/EspressoSystems/espresso-network/sdks/go/types/common" + "github.com/coder/websocket" ) var _ QueryService = (*Client)(nil) @@ -177,6 +178,65 @@ func (c *Client) SubmitTransaction(ctx context.Context, tx types.Transaction) (* return &hash, nil } +// Stream of JSON-encoded objects over a WebSocket connection +type WsStream[S any] struct { + conn *websocket.Conn +} + +func (s *WsStream[S]) NextRaw(ctx context.Context) (json.RawMessage, error) { + typ, msg, err := s.conn.Read(ctx) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + if typ != websocket.MessageText { + return nil, fmt.Errorf("%w: %v", ErrPermanent, err) + } + return msg, nil +} + +func (s *WsStream[S]) Next(ctx context.Context) (*S, error) { + typ, msg, err := s.conn.Read(ctx) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + if typ != websocket.MessageText { + return nil, fmt.Errorf("%w: %v", ErrPermanent, err) + } + var data S + if err := json.Unmarshal(msg, &data); err != nil { + return nil, fmt.Errorf("%w: %v", ErrPermanent, err) + } + return &data, nil +} + +func (s *WsStream[S]) Close() error { + return s.conn.Close(websocket.StatusNormalClosure, "") +} + +// Open a `Stream` of Espresso transactions starting from a specific block height. +func (c *Client) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) { + opts := &websocket.DialOptions{} + opts.HTTPClient = c.client + url := c.baseUrl + fmt.Sprintf("availability/stream/transactions/%d", height) + conn, _, err := websocket.Dial(ctx, url, opts) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + return &WsStream[types.TransactionQueryData]{conn: conn}, nil +} + +// Open a `Stream` of Espresso transactions starting from a specific block height, filtered by namespace. +func (c *Client) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) { + opts := &websocket.DialOptions{} + opts.HTTPClient = c.client + url := c.baseUrl + fmt.Sprintf("availability/stream/transactions/%d/namespace/%d", height, namespace) + conn, _, err := websocket.Dial(ctx, url, opts) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + return &WsStream[types.TransactionQueryData]{conn: conn}, nil +} + type NamespaceResponse struct { Proof *json.RawMessage `json:"proof"` Transactions *[]types.Transaction `json:"transactions"` diff --git a/sdks/go/client/client_test.go b/sdks/go/client/client_test.go index 367656e769b..e69387f1068 100644 --- a/sdks/go/client/client_test.go +++ b/sdks/go/client/client_test.go @@ -12,6 +12,7 @@ import ( tagged_base64 "github.com/EspressoSystems/espresso-network/sdks/go/tagged-base64" types "github.com/EspressoSystems/espresso-network/sdks/go/types" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) var workingDir = "../../../" @@ -67,6 +68,24 @@ func TestApiWithEspressoDevNode(t *testing.T) { } fmt.Println("submitted transaction with hash", hash) + stream, err := client.StreamTransactions(ctx, 1) + require.NoError(t, err) + + txData, err := stream.Next(ctx) + require.NoError(t, err) + require.NotNil(t, txData) + require.Equal(t, txData.Transaction.Payload, tx.Payload) + require.Equal(t, txData.Transaction.Namespace, tx.Namespace) + + // Test streaming with namespace filter + nsStream, err := client.StreamTransactionsInNamespace(ctx, 1, tx.Namespace) + require.NoError(t, err) + + nsTxData, err := nsStream.Next(ctx) + require.NoError(t, err) + require.NotNil(t, nsTxData) + require.Equal(t, nsTxData.Transaction.Payload, tx.Payload) + require.Equal(t, nsTxData.Transaction.Namespace, tx.Namespace) } func runDevNode(ctx context.Context, tmpDir string) func() { diff --git a/sdks/go/client/multiple_nodes_client.go b/sdks/go/client/multiple_nodes_client.go index 6b8199907ee..0ab51f598d9 100644 --- a/sdks/go/client/multiple_nodes_client.go +++ b/sdks/go/client/multiple_nodes_client.go @@ -168,6 +168,120 @@ func (c *MultipleNodesClient) SubmitTransaction(ctx context.Context, tx common.T return nil, fmt.Errorf("%w: encountered an error with all nodes while attempting to SubmitTransaction.\n Errors: %v \n", ErrEphemeral, errs) } +// A wrapper over multiple `Stream`s that are supposed to return the same +// sequence of objects that verifies the items using majority rule. +// An underlying stream that deviates from majority or responds with an error +// is disabled for the rest of the MultiplexedStream's existence. If majority +// of the underlying streams is disabled, calling Next on the MultiplexedStream +// will always return an error. +type MultiplexedStream[T any] struct { + nStreams int + workingStreams []Stream[T] +} + +func (ms *MultiplexedStream[T]) NextRaw(ctx context.Context) (json.RawMessage, error) { + newWorkingStreams := []Stream[T]{} + + majority := (ms.nStreams / 2) + 1 + + values := make(map[string]int) + var returnValue json.RawMessage = nil + + for _, stream := range ms.workingStreams { + rawValue, err := stream.NextRaw(ctx) + if err != nil { + continue + } + + hash, err := hashNormalizedJSON(rawValue) + if err != nil { + continue + } + + if _, ok := values[hash]; !ok { + values[hash] = 0 + } + + values[hash]++ + if values[hash] >= majority { + returnValue = rawValue + } + + newWorkingStreams = append(newWorkingStreams, stream) + } + + ms.workingStreams = newWorkingStreams + + if returnValue == nil { + return nil, fmt.Errorf("%w: no majority", ErrPermanent) + } else { + return returnValue, nil + } +} + +func (ms *MultiplexedStream[T]) Next(ctx context.Context) (*T, error) { + next, err := ms.NextRaw(ctx) + if err != nil { + return nil, err + } + + var value T + err = json.Unmarshal(next, &value) + if err != nil { + return nil, err + } + + return &value, nil +} + +func (ms *MultiplexedStream[T]) Close() error { + var returnErr error + for _, stream := range ms.workingStreams { + err := stream.Close() + if err != nil { + returnErr = err + } + } + + return returnErr +} + +func (c *MultipleNodesClient) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) { + + workingStreams := []Stream[types.TransactionQueryData]{} + for _, node := range c.nodes { + stream, err := node.StreamTransactions(ctx, height) + if err != nil { + return nil, err + } + + workingStreams = append(workingStreams, stream) + } + + return &MultiplexedStream[types.TransactionQueryData]{ + nStreams: len(c.nodes), + workingStreams: workingStreams, + }, nil +} + +func (c *MultipleNodesClient) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) { + + workingStreams := []Stream[types.TransactionQueryData]{} + for _, node := range c.nodes { + stream, err := node.StreamTransactionsInNamespace(ctx, height, namespace) + if err != nil { + return nil, err + } + + workingStreams = append(workingStreams, stream) + } + + return &MultiplexedStream[types.TransactionQueryData]{ + nStreams: len(c.nodes), + workingStreams: workingStreams, + }, nil +} + func FetchWithMajority[T any](ctx context.Context, nodes []*T, fetchFunc func(*T) (json.RawMessage, error)) (json.RawMessage, error) { type result struct { value json.RawMessage diff --git a/sdks/go/client/query.go b/sdks/go/client/query.go index e9055b83ddf..c05bf193523 100644 --- a/sdks/go/client/query.go +++ b/sdks/go/client/query.go @@ -27,6 +27,21 @@ type QueryService interface { FetchVidCommonByHeight(ctx context.Context, blockHeight uint64) (types.VidCommon, error) // Get the transaction by its hash from the explorer. FetchExplorerTransactionByHash(ctx context.Context, hash *types.TaggedBase64) (types.ExplorerTransactionQueryData, error) + // Stream transactions starting from the given height. + StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) + // Stream transactions starting from the given height, filtered by namespace. + StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) +} + +// Interface representing a pollable stream of JSON-encoded objects +type Stream[S any] interface { + // Get the next item from the stream. Equivalent to + // calling NextRaw(ctx) and decoding the result. + Next(ctx context.Context) (*S, error) + // Get next item from the stream as raw JSON objects. + NextRaw(ctx context.Context) (json.RawMessage, error) + // Close the underlying connection, if applicable + Close() error } // Response to `FetchTransactionsInBlock` diff --git a/sdks/go/go.mod b/sdks/go/go.mod index a4ecec58bd7..052c118d3ff 100644 --- a/sdks/go/go.mod +++ b/sdks/go/go.mod @@ -17,6 +17,7 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/coder/websocket v1.8.14 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.1 // indirect github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect diff --git a/sdks/go/go.sum b/sdks/go/go.sum index 41802218b64..60f0c01c4e2 100644 --- a/sdks/go/go.sum +++ b/sdks/go/go.sum @@ -32,6 +32,8 @@ github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeS github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M=