From 01529290b5694d0ba110175e98ced353b57bb07c Mon Sep 17 00:00:00 2001 From: Miguel Filipe Date: Mon, 22 Jul 2024 18:44:19 -0700 Subject: [PATCH] DuneAPI: new batch wire format (breaking change!) This is a breaking change, it changes the block batch format on the wire. It introduces a single JSON line (NDJSON) which is the batch header. This can be DISABLED with `DUNEAPI_DISABLE_BATCH_HEADER` environment variable This line indicates how many blocks and the size of each block message In essense, we're length prefixing the batch. This allows the reader to split out easily all the messages. Until now this wasn't needed because Optimism Stack (EVM stack) uses a fixed 3-line payload for the blocks. But this isn't the case in other EVM stacks (such as arbitrum nitro) It uses JSON to encode this information because the jsonRPC message from the blockchain RPC node are already in JSON and the wire format is already NDJSON. For the server side to handle in a backwards compatible way the payload, it needs to read the first line and attempt to parse it as a BlockBatchHeader message If that fails, it can revert back to the older message format parser --- client/duneapi/batch.go | 44 +++++++++++++++++ client/duneapi/batch_test.go | 95 ++++++++++++++++++++++++++++++++++++ client/duneapi/client.go | 25 +++++----- client/duneapi/models.go | 2 + cmd/main.go | 1 + config/config.go | 5 +- 6 files changed, 156 insertions(+), 16 deletions(-) create mode 100644 client/duneapi/batch.go create mode 100644 client/duneapi/batch_test.go diff --git a/client/duneapi/batch.go b/client/duneapi/batch.go new file mode 100644 index 0000000..dc98c9f --- /dev/null +++ b/client/duneapi/batch.go @@ -0,0 +1,44 @@ +package duneapi + +import ( + "encoding/json" + "io" + + "github.com/duneanalytics/blockchain-ingester/models" +) + +type BlockBatchHeader struct { + BlockSizes []int `json:"block_sizes"` +} + +func WriteBlockBatch(out io.Writer, payloads []models.RPCBlock, disableHeader bool) error { + // we write a batch header (single line, NDJSON) with the size of each block payload and then concatenate the payloads + header := BlockBatchHeader{ + BlockSizes: make([]int, len(payloads)), + } + for i, block := range payloads { + header.BlockSizes[i] = len(block.Payload) + } + // allow disabling the header for testing/backwards compatibility + if !disableHeader { + buf, err := json.Marshal(header) + if err != nil { + return err + } + _, err = out.Write(buf) + if err != nil { + return err + } + _, err = out.Write([]byte("\n")) + if err != nil { + return err + } + } + for _, block := range payloads { + _, err := out.Write(block.Payload) + if err != nil { + return err + } + } + return nil +} diff --git a/client/duneapi/batch_test.go b/client/duneapi/batch_test.go new file mode 100644 index 0000000..b9c7ba8 --- /dev/null +++ b/client/duneapi/batch_test.go @@ -0,0 +1,95 @@ +package duneapi_test + +import ( + "bufio" + "bytes" + "encoding/json" + "io" + "testing" + + "github.com/duneanalytics/blockchain-ingester/client/duneapi" + "github.com/duneanalytics/blockchain-ingester/models" + "github.com/stretchr/testify/require" +) + +func TestWriteBlockBatch(t *testing.T) { + tests := []struct { + name string + payloads []models.RPCBlock + expected string + }{ + { + name: "single payload", + payloads: []models.RPCBlock{ + {Payload: []byte(`{"block":1}`)}, + }, + expected: `{"block_sizes":[11]} +{"block":1}`, + }, + { + name: "multiple payloads, with new lines", + payloads: []models.RPCBlock{ + {Payload: []byte(`{"block":1}` + "\n")}, + {Payload: []byte(`{"block":2}` + "\n")}, + }, + expected: `{"block_sizes":[12,12]} +{"block":1} +{"block":2} +`, + }, + { + name: "multiple payloads, no newlines", + payloads: []models.RPCBlock{ + {Payload: []byte(`{"block":1}`)}, + {Payload: []byte(`{"block":2}`)}, + }, + expected: `{"block_sizes":[11,11]} +{"block":1}{"block":2}`, + }, + { + name: "empty payloads", + payloads: []models.RPCBlock{}, + expected: `{"block_sizes":[]} +`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + err := duneapi.WriteBlockBatch(&buf, tt.payloads, false) + require.NoError(t, err) + + require.Equal(t, tt.expected, buf.String()) + rebuilt, err := ReadBlockBatch(&buf) + require.NoError(t, err) + require.EqualValues(t, tt.payloads, rebuilt) + }) + } +} + +func ReadBlockBatch(buf *bytes.Buffer) ([]models.RPCBlock, error) { + reader := bufio.NewReader(buf) + headerLine, err := reader.ReadString('\n') + if err != nil { + return nil, err + } + + var header duneapi.BlockBatchHeader + err = json.Unmarshal([]byte(headerLine), &header) + if err != nil { + return nil, err + } + + payloads := make([]models.RPCBlock, len(header.BlockSizes)) + for i, size := range header.BlockSizes { + payload := make([]byte, size) + _, err := io.ReadFull(reader, payload) + if err != nil { + return nil, err + } + payloads[i] = models.RPCBlock{Payload: payload} + } + + return payloads, nil +} diff --git a/client/duneapi/client.go b/client/duneapi/client.go index b91e114..37f2342 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -101,33 +101,28 @@ func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) err func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) { request := &BlockchainIngestRequest{} + var err error + buffer.Reset() // not thread safe, multiple calls to the compressor here if c.cfg.DisableCompression { - buffer.Reset() - for _, block := range payloads { - _, err := buffer.Write(block.Payload) - if err != nil { - return nil, err - } + err = WriteBlockBatch(buffer, payloads, c.cfg.DisableBatchHeader) + if err != nil { + return nil, err } - request.Payload = buffer.Bytes() } else { - buffer.Reset() c.compressor.Reset(buffer) - for _, block := range payloads { - _, err := c.compressor.Write(block.Payload) - if err != nil { - return nil, err - } + err = WriteBlockBatch(c.compressor, payloads, c.cfg.DisableBatchHeader) + if err != nil { + return nil, err } err := c.compressor.Close() if err != nil { return nil, err } request.ContentEncoding = "application/zstd" - request.Payload = buffer.Bytes() } + request.Payload = buffer.Bytes() numbers := make([]string, len(payloads)) for i, payload := range payloads { @@ -137,6 +132,7 @@ func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) request.BlockNumbers = blockNumbers request.IdempotencyKey = c.idempotencyKey(*request) request.EVMStack = c.cfg.Stack.String() + request.BatchSize = len(payloads) return request, nil } @@ -281,6 +277,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch if err != nil { return err } + req.Header.Set("User-Agent", userAgent) req.Header.Set("Content-Type", "application/json") req.Header.Set("x-dune-api-key", c.cfg.APIKey) req = req.WithContext(ctx) diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 0a1e12e..c8148f1 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -22,6 +22,8 @@ type Config struct { // - lowers latency // - reduces bandwidth DisableCompression bool + + DisableBatchHeader bool // for testing/backwards compatibility } // The response from the DuneAPI ingest endpoint. diff --git a/cmd/main.go b/cmd/main.go index 6b1448c..c648542 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -57,6 +57,7 @@ func main() { BlockchainName: cfg.BlockchainName, Stack: cfg.RPCStack, DisableCompression: cfg.DisableCompression, + DisableBatchHeader: cfg.Dune.DisableBatchHeader, }) if err != nil { stdlog.Fatal(err) diff --git a/config/config.go b/config/config.go index 94d2f9d..e50cf04 100644 --- a/config/config.go +++ b/config/config.go @@ -11,8 +11,9 @@ import ( ) type DuneClient struct { - APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"` - URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` + APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"` + URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` // nolint:lll + DisableBatchHeader bool `long:"duneapi-disable-batch-header" env:"DUNEAPI_DISABLE_BATCH_HEADERS" description:"Disable batch headers on DuneAPI request payload"` // nolint:lll } func (d DuneClient) HasError() error {