From 67f9c39dabd317c66b537bb9ae71b6211e5270db Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Mon, 15 Sep 2025 19:05:53 +0200 Subject: [PATCH 1/5] Implement transaction streaming --- .typos.toml | 2 + sdks/go/client/client.go | 49 ++++++++++++++ sdks/go/client/client_test.go | 9 +++ sdks/go/client/multiple_nodes_client.go | 90 +++++++++++++++++++++++++ sdks/go/client/query.go | 8 +++ sdks/go/client/stream.go | 75 +++++++++++++++++++++ sdks/go/go.mod | 1 + sdks/go/go.sum | 2 + 8 files changed, 236 insertions(+) create mode 100644 sdks/go/client/stream.go diff --git a/.typos.toml b/.typos.toml index 1d78ca16e6c..5bd270e3d04 100644 --- a/.typos.toml +++ b/.typos.toml @@ -20,3 +20,5 @@ extend-exclude = [ [default.extend-words] Forgetten = "Forgetten" +# Common "spelling" of "type" in Rust/go +typ = "typ" diff --git a/sdks/go/client/client.go b/sdks/go/client/client.go index 8483963f51d..e87270e5a18 100644 --- a/sdks/go/client/client.go +++ b/sdks/go/client/client.go @@ -12,6 +12,7 @@ import ( types "github.com/EspressoSystems/espresso-network/sdks/go/types" common "github.com/EspressoSystems/espresso-network/sdks/go/types/common" + "github.com/coder/websocket" ) var _ QueryService = (*Client)(nil) @@ -42,6 +43,9 @@ var ErrEphemeral = errors.New("retryable") // resolved by a retry. var ErrPermanent = errors.New("not retryable") +// Transaction submission or fetch error due to timeout. +var ErrTimeout = fmt.Errorf("%w: timeout", ErrEphemeral) + func (c *Client) FetchVidCommonByHeight(ctx context.Context, blockHeight uint64) (common.VidCommon, error) { var res types.VidCommonQueryData if err := c.get(ctx, &res, "availability/vid/common/%d", blockHeight); err != nil { @@ -177,6 +181,51 @@ func (c *Client) SubmitTransaction(ctx context.Context, tx types.Transaction) (* return &hash, nil } +type WsStream[S any] struct { + conn *websocket.Conn +} + +func (s *WsStream[S]) NextRaw(ctx context.Context) (json.RawMessage, error) { + typ, msg, err := s.conn.Read(ctx) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + if typ != websocket.MessageText { + return nil, fmt.Errorf("%w: %v", ErrPermanent, err) + } + return msg, nil +} + +func (s *WsStream[S]) Next(ctx context.Context) (*S, error) { + typ, msg, err := s.conn.Read(ctx) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + if typ != websocket.MessageText { + return nil, fmt.Errorf("%w: %v", ErrPermanent, err) + } + var data S + if err := json.Unmarshal(msg, &data); err != nil { + return nil, fmt.Errorf("%w: %v", ErrPermanent, err) + } + return &data, nil +} + +func (s *WsStream[S]) Close() error { + return s.conn.Close(websocket.StatusNormalClosure, "") +} + +func (c *Client) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) { + opts := &websocket.DialOptions{} + opts.HTTPClient = c.client + url := c.baseUrl + fmt.Sprintf("availability/stream/transactions/%d", height) + conn, _, err := websocket.Dial(ctx, url, opts) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + return &WsStream[types.TransactionQueryData]{conn: conn}, nil +} + type NamespaceResponse struct { Proof *json.RawMessage `json:"proof"` Transactions *[]types.Transaction `json:"transactions"` diff --git a/sdks/go/client/client_test.go b/sdks/go/client/client_test.go index 367656e769b..50698e2fb89 100644 --- a/sdks/go/client/client_test.go +++ b/sdks/go/client/client_test.go @@ -12,6 +12,7 @@ import ( tagged_base64 "github.com/EspressoSystems/espresso-network/sdks/go/tagged-base64" types "github.com/EspressoSystems/espresso-network/sdks/go/types" "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" ) var workingDir = "../../../" @@ -67,6 +68,14 @@ func TestApiWithEspressoDevNode(t *testing.T) { } fmt.Println("submitted transaction with hash", hash) + stream, err := client.StreamTransactions(ctx, 1) + require.NoError(t, err) + + txData, err := stream.Next(ctx) + require.NoError(t, err) + require.NotNil(t, txData) + require.Equal(t, txData.Transaction.Payload, tx.Payload) + require.Equal(t, txData.Transaction.Namespace, tx.Namespace) } func runDevNode(ctx context.Context, tmpDir string) func() { diff --git a/sdks/go/client/multiple_nodes_client.go b/sdks/go/client/multiple_nodes_client.go index 6b8199907ee..1ccf089ed42 100644 --- a/sdks/go/client/multiple_nodes_client.go +++ b/sdks/go/client/multiple_nodes_client.go @@ -168,6 +168,96 @@ func (c *MultipleNodesClient) SubmitTransaction(ctx context.Context, tx common.T return nil, fmt.Errorf("%w: encountered an error with all nodes while attempting to SubmitTransaction.\n Errors: %v \n", ErrEphemeral, errs) } +type MultiplexedStream[T any] struct { + nStreams int + workingStreams []Stream[T] +} + +func (ms *MultiplexedStream[T]) NextRaw(ctx context.Context) (json.RawMessage, error) { + newWorkingStreams := []Stream[T]{} + + majority := (ms.nStreams / 2) + 1 + + values := make(map[string]int) + var returnValue json.RawMessage = nil + + for _, stream := range ms.workingStreams { + rawValue, err := stream.NextRaw(ctx) + if err != nil { + continue + } + + hash, err := hashNormalizedJSON(rawValue) + if err != nil { + continue + } + + if _, ok := values[hash]; !ok { + values[hash] = 0 + } + + values[hash]++ + if values[hash] > majority { + returnValue = rawValue + } + + newWorkingStreams = append(newWorkingStreams, stream) + } + + ms.workingStreams = newWorkingStreams + + if returnValue == nil { + return nil, ErrPermanent + } else { + return returnValue, nil + } +} + +func (ms *MultiplexedStream[T]) Next(ctx context.Context) (*T, error) { + next, err := ms.NextRaw(ctx) + if err != nil { + return nil, err + } + + var value T + err = json.Unmarshal(next, &value) + if err != nil { + return nil, err + } + + return &value, nil +} + +func (ms *MultiplexedStream[T]) Close() error { + var returnErr error + for _, stream := range ms.workingStreams { + err := stream.Close() + if err != nil { + returnErr = err + } + } + + return returnErr +} + +func (c *MultipleNodesClient) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) { + + workingStreams := []Stream[types.TransactionQueryData]{} + for _, node := range c.nodes { + stream, err := node.StreamTransactions(ctx, height) + if err != nil { + return nil, err + } + + workingStreams = append(workingStreams, stream) + } + + return &MultiplexedStream[types.TransactionQueryData]{ + nStreams: len(c.nodes), + workingStreams: workingStreams, + }, nil +} + func FetchWithMajority[T any](ctx context.Context, nodes []*T, fetchFunc func(*T) (json.RawMessage, error)) (json.RawMessage, error) { type result struct { value json.RawMessage diff --git a/sdks/go/client/query.go b/sdks/go/client/query.go index e9055b83ddf..82ac81af7a6 100644 --- a/sdks/go/client/query.go +++ b/sdks/go/client/query.go @@ -27,6 +27,14 @@ type QueryService interface { FetchVidCommonByHeight(ctx context.Context, blockHeight uint64) (types.VidCommon, error) // Get the transaction by its hash from the explorer. FetchExplorerTransactionByHash(ctx context.Context, hash *types.TaggedBase64) (types.ExplorerTransactionQueryData, error) + // Stream transactions starting from the given height. + StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) +} + +type Stream[S any] interface { + Next(ctx context.Context) (*S, error) + NextRaw(ctx context.Context) (json.RawMessage, error) + Close() error } // Response to `FetchTransactionsInBlock` diff --git a/sdks/go/client/stream.go b/sdks/go/client/stream.go new file mode 100644 index 00000000000..0c48fa5180f --- /dev/null +++ b/sdks/go/client/stream.go @@ -0,0 +1,75 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "time" +) + +var _ Stream[any] = (*StreamWithTimeout[any])(nil) + +type StreamWithTimeout[T any] struct { + stream Stream[T] + timeout time.Duration +} + +func NewStreamWithTimeout[T any](stream Stream[T], timeout time.Duration) *StreamWithTimeout[T] { + return &StreamWithTimeout[T]{ + stream: stream, + timeout: timeout, + } +} + +func (s *StreamWithTimeout[T]) Next(ctx context.Context) (*T, error) { + raw, err := s.NextRaw(ctx) + + if err != nil { + return nil, err + } + + var data T + if err := json.Unmarshal(raw, &data); err != nil { + return nil, fmt.Errorf("%w: %s", ErrPermanent, err) + } + + return &data, nil +} + +func (s *StreamWithTimeout[T]) Close() error { + result := make(chan error, 1) + go func() { + err := s.stream.Close() + result <- err + }() + + select { + case <-time.After(s.timeout): + return fmt.Errorf("%w: timeout after %s", ErrPermanent, s.timeout) + case err := <-result: + return err + } +} + +func (s *StreamWithTimeout[T]) NextRaw(ctx context.Context) (json.RawMessage, error) { + type Result struct { + Data json.RawMessage + Err error + } + + result := make(chan Result, 1) + + go func() { + data, err := s.stream.NextRaw(ctx) + result <- Result{Data: data, Err: err} + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(s.timeout): + return nil, fmt.Errorf("%w: timeout after %s", ErrTimeout, s.timeout) + case res := <-result: + return res.Data, res.Err + } +} diff --git a/sdks/go/go.mod b/sdks/go/go.mod index a4ecec58bd7..052c118d3ff 100644 --- a/sdks/go/go.mod +++ b/sdks/go/go.mod @@ -17,6 +17,7 @@ require ( github.com/StackExchange/wmi v1.2.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/coder/websocket v1.8.14 // indirect github.com/consensys/bavard v0.1.13 // indirect github.com/consensys/gnark-crypto v0.12.1 // indirect github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect diff --git a/sdks/go/go.sum b/sdks/go/go.sum index 41802218b64..60f0c01c4e2 100644 --- a/sdks/go/go.sum +++ b/sdks/go/go.sum @@ -32,6 +32,8 @@ github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeS github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= github.com/consensys/gnark-crypto v0.12.1 h1:lHH39WuuFgVHONRl3J0LRBtuYdQTumFSDtJF7HpyG8M= From 86919e393c665e4a0e359c3423162aa68ccfeddd Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Wed, 1 Oct 2025 17:22:15 +0200 Subject: [PATCH 2/5] feat: implement PR feedback - remove StreamWithTimeout and add namespace filtering support - Remove StreamWithTimeout as Go's context already provides timeout capabilities - Add StreamTransactionsInNamespace method to stream transactions filtered by namespace - Update QueryService interface and MultipleNodesClient to support namespace filtering - Add tests for the new namespace filtering functionality --- sdks/go/client/client.go | 11 ++++ sdks/go/client/client_test.go | 10 ++++ sdks/go/client/multiple_nodes_client.go | 18 ++++++ sdks/go/client/query.go | 2 + sdks/go/client/stream.go | 75 ------------------------- 5 files changed, 41 insertions(+), 75 deletions(-) delete mode 100644 sdks/go/client/stream.go diff --git a/sdks/go/client/client.go b/sdks/go/client/client.go index e87270e5a18..83d4495217b 100644 --- a/sdks/go/client/client.go +++ b/sdks/go/client/client.go @@ -226,6 +226,17 @@ func (c *Client) StreamTransactions(ctx context.Context, height uint64) (Stream[ return &WsStream[types.TransactionQueryData]{conn: conn}, nil } +func (c *Client) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) { + opts := &websocket.DialOptions{} + opts.HTTPClient = c.client + url := c.baseUrl + fmt.Sprintf("availability/stream/transactions/%d/namespace/%d", height, namespace) + conn, _, err := websocket.Dial(ctx, url, opts) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrEphemeral, err) + } + return &WsStream[types.TransactionQueryData]{conn: conn}, nil +} + type NamespaceResponse struct { Proof *json.RawMessage `json:"proof"` Transactions *[]types.Transaction `json:"transactions"` diff --git a/sdks/go/client/client_test.go b/sdks/go/client/client_test.go index 50698e2fb89..e69387f1068 100644 --- a/sdks/go/client/client_test.go +++ b/sdks/go/client/client_test.go @@ -76,6 +76,16 @@ func TestApiWithEspressoDevNode(t *testing.T) { require.NotNil(t, txData) require.Equal(t, txData.Transaction.Payload, tx.Payload) require.Equal(t, txData.Transaction.Namespace, tx.Namespace) + + // Test streaming with namespace filter + nsStream, err := client.StreamTransactionsInNamespace(ctx, 1, tx.Namespace) + require.NoError(t, err) + + nsTxData, err := nsStream.Next(ctx) + require.NoError(t, err) + require.NotNil(t, nsTxData) + require.Equal(t, nsTxData.Transaction.Payload, tx.Payload) + require.Equal(t, nsTxData.Transaction.Namespace, tx.Namespace) } func runDevNode(ctx context.Context, tmpDir string) func() { diff --git a/sdks/go/client/multiple_nodes_client.go b/sdks/go/client/multiple_nodes_client.go index 1ccf089ed42..8c46b231593 100644 --- a/sdks/go/client/multiple_nodes_client.go +++ b/sdks/go/client/multiple_nodes_client.go @@ -258,6 +258,24 @@ func (c *MultipleNodesClient) StreamTransactions(ctx context.Context, height uin }, nil } +func (c *MultipleNodesClient) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) { + + workingStreams := []Stream[types.TransactionQueryData]{} + for _, node := range c.nodes { + stream, err := node.StreamTransactionsInNamespace(ctx, height, namespace) + if err != nil { + return nil, err + } + + workingStreams = append(workingStreams, stream) + } + + return &MultiplexedStream[types.TransactionQueryData]{ + nStreams: len(c.nodes), + workingStreams: workingStreams, + }, nil +} + func FetchWithMajority[T any](ctx context.Context, nodes []*T, fetchFunc func(*T) (json.RawMessage, error)) (json.RawMessage, error) { type result struct { value json.RawMessage diff --git a/sdks/go/client/query.go b/sdks/go/client/query.go index 82ac81af7a6..2711c21bcc2 100644 --- a/sdks/go/client/query.go +++ b/sdks/go/client/query.go @@ -29,6 +29,8 @@ type QueryService interface { FetchExplorerTransactionByHash(ctx context.Context, hash *types.TaggedBase64) (types.ExplorerTransactionQueryData, error) // Stream transactions starting from the given height. StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) + // Stream transactions starting from the given height, filtered by namespace. + StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) } type Stream[S any] interface { diff --git a/sdks/go/client/stream.go b/sdks/go/client/stream.go deleted file mode 100644 index 0c48fa5180f..00000000000 --- a/sdks/go/client/stream.go +++ /dev/null @@ -1,75 +0,0 @@ -package client - -import ( - "context" - "encoding/json" - "fmt" - "time" -) - -var _ Stream[any] = (*StreamWithTimeout[any])(nil) - -type StreamWithTimeout[T any] struct { - stream Stream[T] - timeout time.Duration -} - -func NewStreamWithTimeout[T any](stream Stream[T], timeout time.Duration) *StreamWithTimeout[T] { - return &StreamWithTimeout[T]{ - stream: stream, - timeout: timeout, - } -} - -func (s *StreamWithTimeout[T]) Next(ctx context.Context) (*T, error) { - raw, err := s.NextRaw(ctx) - - if err != nil { - return nil, err - } - - var data T - if err := json.Unmarshal(raw, &data); err != nil { - return nil, fmt.Errorf("%w: %s", ErrPermanent, err) - } - - return &data, nil -} - -func (s *StreamWithTimeout[T]) Close() error { - result := make(chan error, 1) - go func() { - err := s.stream.Close() - result <- err - }() - - select { - case <-time.After(s.timeout): - return fmt.Errorf("%w: timeout after %s", ErrPermanent, s.timeout) - case err := <-result: - return err - } -} - -func (s *StreamWithTimeout[T]) NextRaw(ctx context.Context) (json.RawMessage, error) { - type Result struct { - Data json.RawMessage - Err error - } - - result := make(chan Result, 1) - - go func() { - data, err := s.stream.NextRaw(ctx) - result <- Result{Data: data, Err: err} - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(s.timeout): - return nil, fmt.Errorf("%w: timeout after %s", ErrTimeout, s.timeout) - case res := <-result: - return res.Data, res.Err - } -} From 0b3e6bf0a863cad1fa91bd7d334880f1a1a3ad5e Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Wed, 1 Oct 2025 17:43:18 +0200 Subject: [PATCH 3/5] Remove extraneous ErrTimeout --- sdks/go/client/client.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/go/client/client.go b/sdks/go/client/client.go index 83d4495217b..1a5879bc7fb 100644 --- a/sdks/go/client/client.go +++ b/sdks/go/client/client.go @@ -43,9 +43,6 @@ var ErrEphemeral = errors.New("retryable") // resolved by a retry. var ErrPermanent = errors.New("not retryable") -// Transaction submission or fetch error due to timeout. -var ErrTimeout = fmt.Errorf("%w: timeout", ErrEphemeral) - func (c *Client) FetchVidCommonByHeight(ctx context.Context, blockHeight uint64) (common.VidCommon, error) { var res types.VidCommonQueryData if err := c.get(ctx, &res, "availability/vid/common/%d", blockHeight); err != nil { From b9561a6a0482ebdf7de0fc98175610690bed0011 Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Fri, 3 Oct 2025 19:12:26 +0200 Subject: [PATCH 4/5] Fix majority check --- sdks/go/client/multiple_nodes_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/client/multiple_nodes_client.go b/sdks/go/client/multiple_nodes_client.go index 8c46b231593..60c7afe0caa 100644 --- a/sdks/go/client/multiple_nodes_client.go +++ b/sdks/go/client/multiple_nodes_client.go @@ -197,7 +197,7 @@ func (ms *MultiplexedStream[T]) NextRaw(ctx context.Context) (json.RawMessage, e } values[hash]++ - if values[hash] > majority { + if values[hash] >= majority { returnValue = rawValue } @@ -207,7 +207,7 @@ func (ms *MultiplexedStream[T]) NextRaw(ctx context.Context) (json.RawMessage, e ms.workingStreams = newWorkingStreams if returnValue == nil { - return nil, ErrPermanent + return nil, fmt.Errorf("%w: no majority", ErrPermanent) } else { return returnValue, nil } From bd31a23fdf879917aa40ca79a43c2aa413d2d1a4 Mon Sep 17 00:00:00 2001 From: Artemii Gerasimovich Date: Fri, 3 Oct 2025 19:26:23 +0200 Subject: [PATCH 5/5] Add docs --- sdks/go/client/client.go | 3 +++ sdks/go/client/multiple_nodes_client.go | 6 ++++++ sdks/go/client/query.go | 5 +++++ 3 files changed, 14 insertions(+) diff --git a/sdks/go/client/client.go b/sdks/go/client/client.go index 1a5879bc7fb..b763bb11442 100644 --- a/sdks/go/client/client.go +++ b/sdks/go/client/client.go @@ -178,6 +178,7 @@ func (c *Client) SubmitTransaction(ctx context.Context, tx types.Transaction) (* return &hash, nil } +// Stream of JSON-encoded objects over a WebSocket connection type WsStream[S any] struct { conn *websocket.Conn } @@ -212,6 +213,7 @@ func (s *WsStream[S]) Close() error { return s.conn.Close(websocket.StatusNormalClosure, "") } +// Open a `Stream` of Espresso transactions starting from a specific block height. func (c *Client) StreamTransactions(ctx context.Context, height uint64) (Stream[types.TransactionQueryData], error) { opts := &websocket.DialOptions{} opts.HTTPClient = c.client @@ -223,6 +225,7 @@ func (c *Client) StreamTransactions(ctx context.Context, height uint64) (Stream[ return &WsStream[types.TransactionQueryData]{conn: conn}, nil } +// Open a `Stream` of Espresso transactions starting from a specific block height, filtered by namespace. func (c *Client) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) { opts := &websocket.DialOptions{} opts.HTTPClient = c.client diff --git a/sdks/go/client/multiple_nodes_client.go b/sdks/go/client/multiple_nodes_client.go index 60c7afe0caa..0ab51f598d9 100644 --- a/sdks/go/client/multiple_nodes_client.go +++ b/sdks/go/client/multiple_nodes_client.go @@ -168,6 +168,12 @@ func (c *MultipleNodesClient) SubmitTransaction(ctx context.Context, tx common.T return nil, fmt.Errorf("%w: encountered an error with all nodes while attempting to SubmitTransaction.\n Errors: %v \n", ErrEphemeral, errs) } +// A wrapper over multiple `Stream`s that are supposed to return the same +// sequence of objects that verifies the items using majority rule. +// An underlying stream that deviates from majority or responds with an error +// is disabled for the rest of the MultiplexedStream's existence. If majority +// of the underlying streams is disabled, calling Next on the MultiplexedStream +// will always return an error. type MultiplexedStream[T any] struct { nStreams int workingStreams []Stream[T] diff --git a/sdks/go/client/query.go b/sdks/go/client/query.go index 2711c21bcc2..c05bf193523 100644 --- a/sdks/go/client/query.go +++ b/sdks/go/client/query.go @@ -33,9 +33,14 @@ type QueryService interface { StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (Stream[types.TransactionQueryData], error) } +// Interface representing a pollable stream of JSON-encoded objects type Stream[S any] interface { + // Get the next item from the stream. Equivalent to + // calling NextRaw(ctx) and decoding the result. Next(ctx context.Context) (*S, error) + // Get next item from the stream as raw JSON objects. NextRaw(ctx context.Context) (json.RawMessage, error) + // Close the underlying connection, if applicable Close() error }