Skip to content

Commit

Permalink
DuneAPI: new batch wire format (breaking change!)
Browse files Browse the repository at this point in the history
This is a breaking change, it changes the block batch format on the wire.
It introduces a single JSON line (NDJSON) which is the batch header.
This can be DISABLED with the `DUNEAPI_DISABLE_BATCH_HEADERS`
environment variable

This line indicates how many blocks and the size of each block message
In essence, we're length-prefixing the batch.
This allows the reader to split out easily all the messages.
Until now this wasn't needed because Optimism Stack (EVM stack) uses a
fixed 3-line payload for the blocks.
But this isn't the case in other EVM stacks (such as arbitrum nitro)

It uses JSON to encode this information because the jsonRPC messages from
the blockchain RPC node are already in JSON and the wire format is already NDJSON.

For the server side to handle the payload in a backwards-compatible way,
it needs to read the first line and attempt to parse it as a BlockBatchHeader message.
If that fails, it can fall back to the older message format parser.
  • Loading branch information
msf committed Jul 23, 2024
1 parent dcefb56 commit 0152929
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 16 deletions.
44 changes: 44 additions & 0 deletions client/duneapi/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package duneapi

import (
"encoding/json"
"io"

"github.com/duneanalytics/blockchain-ingester/models"
)

// BlockBatchHeader is the single NDJSON line written before the concatenated
// block payloads; it length-prefixes the batch so a reader can split the
// stream back into individual block messages.
type BlockBatchHeader struct {
	// BlockSizes holds the byte length of each block payload, in order.
	BlockSizes []int `json:"block_sizes"`
}

// WriteBlockBatch writes a batch of block payloads to out in the DuneAPI
// wire format: a single NDJSON header line (BlockBatchHeader) listing the
// byte size of each payload, followed by the payloads concatenated verbatim.
//
// When disableHeader is true the header line is omitted and only the raw
// payloads are written — the older, backwards-compatible wire format.
// It returns the first error encountered while writing, if any.
func WriteBlockBatch(out io.Writer, payloads []models.RPCBlock, disableHeader bool) error {
	// Header construction is only needed (and only paid for) when enabled.
	if !disableHeader {
		// Length-prefix the batch so the reader can split out the
		// individual block messages without assuming a fixed line count.
		header := BlockBatchHeader{
			BlockSizes: make([]int, len(payloads)),
		}
		for i, block := range payloads {
			header.BlockSizes[i] = len(block.Payload)
		}
		buf, err := json.Marshal(header)
		if err != nil {
			return err
		}
		// Append the newline terminator so header + "\n" go out in a
		// single Write instead of two.
		if _, err := out.Write(append(buf, '\n')); err != nil {
			return err
		}
	}
	for _, block := range payloads {
		if _, err := out.Write(block.Payload); err != nil {
			return err
		}
	}
	return nil
}
95 changes: 95 additions & 0 deletions client/duneapi/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package duneapi_test

import (
"bufio"
"bytes"
"encoding/json"
"io"
"testing"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/stretchr/testify/require"
)

// TestWriteBlockBatch verifies the exact wire format produced by
// duneapi.WriteBlockBatch — with and without the batch header — and that
// ReadBlockBatch round-trips the header-prefixed format.
func TestWriteBlockBatch(t *testing.T) {
	tests := []struct {
		name          string
		payloads      []models.RPCBlock
		disableHeader bool
		expected      string
	}{
		{
			name: "single payload",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}`)},
			},
			expected: `{"block_sizes":[11]}
{"block":1}`,
		},
		{
			name: "multiple payloads, with new lines",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}` + "\n")},
				{Payload: []byte(`{"block":2}` + "\n")},
			},
			expected: `{"block_sizes":[12,12]}
{"block":1}
{"block":2}
`,
		},
		{
			name: "multiple payloads, no newlines",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}`)},
				{Payload: []byte(`{"block":2}`)},
			},
			expected: `{"block_sizes":[11,11]}
{"block":1}{"block":2}`,
		},
		{
			name:     "empty payloads",
			payloads: []models.RPCBlock{},
			expected: `{"block_sizes":[]}
`,
		},
		{
			// Exercises the backwards-compatible path: no header line,
			// payloads concatenated as-is.
			name: "header disabled",
			payloads: []models.RPCBlock{
				{Payload: []byte(`{"block":1}` + "\n")},
				{Payload: []byte(`{"block":2}` + "\n")},
			},
			disableHeader: true,
			expected: `{"block":1}
{"block":2}
`,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var buf bytes.Buffer
			err := duneapi.WriteBlockBatch(&buf, tt.payloads, tt.disableHeader)
			require.NoError(t, err)
			require.Equal(t, tt.expected, buf.String())

			if tt.disableHeader {
				// Without a header there is no length prefix to
				// split the stream back apart.
				return
			}
			rebuilt, err := ReadBlockBatch(&buf)
			require.NoError(t, err)
			require.EqualValues(t, tt.payloads, rebuilt)
		})
	}
}

// ReadBlockBatch is the inverse of duneapi.WriteBlockBatch: it parses the
// NDJSON batch header from buf and uses the recorded sizes to slice the
// remainder of the stream back into individual block payloads.
func ReadBlockBatch(buf *bytes.Buffer) ([]models.RPCBlock, error) {
	r := bufio.NewReader(buf)

	// The first line is the batch header describing each payload's size.
	line, err := r.ReadBytes('\n')
	if err != nil {
		return nil, err
	}
	var hdr duneapi.BlockBatchHeader
	if err := json.Unmarshal(line, &hdr); err != nil {
		return nil, err
	}

	blocks := make([]models.RPCBlock, 0, len(hdr.BlockSizes))
	for _, size := range hdr.BlockSizes {
		data := make([]byte, size)
		if _, err := io.ReadFull(r, data); err != nil {
			return nil, err
		}
		blocks = append(blocks, models.RPCBlock{Payload: data})
	}
	return blocks, nil
}
25 changes: 11 additions & 14 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,33 +101,28 @@ func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) err

func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) {
request := &BlockchainIngestRequest{}
var err error

buffer.Reset()
// not thread safe, multiple calls to the compressor here
if c.cfg.DisableCompression {
buffer.Reset()
for _, block := range payloads {
_, err := buffer.Write(block.Payload)
if err != nil {
return nil, err
}
err = WriteBlockBatch(buffer, payloads, c.cfg.DisableBatchHeader)
if err != nil {
return nil, err
}
request.Payload = buffer.Bytes()
} else {
buffer.Reset()
c.compressor.Reset(buffer)
for _, block := range payloads {
_, err := c.compressor.Write(block.Payload)
if err != nil {
return nil, err
}
err = WriteBlockBatch(c.compressor, payloads, c.cfg.DisableBatchHeader)
if err != nil {
return nil, err
}
err := c.compressor.Close()
if err != nil {
return nil, err
}
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.Payload = buffer.Bytes()

numbers := make([]string, len(payloads))
for i, payload := range payloads {
Expand All @@ -137,6 +132,7 @@ func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer)
request.BlockNumbers = blockNumbers
request.IdempotencyKey = c.idempotencyKey(*request)
request.EVMStack = c.cfg.Stack.String()
request.BatchSize = len(payloads)
return request, nil
}

Expand Down Expand Up @@ -281,6 +277,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
if err != nil {
return err
}
req.Header.Set("User-Agent", userAgent)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
Expand Down
2 changes: 2 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Config struct {
// - lowers latency
// - reduces bandwidth
DisableCompression bool

DisableBatchHeader bool // for testing/backwards compatibility
}

// The response from the DuneAPI ingest endpoint.
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
BlockchainName: cfg.BlockchainName,
Stack: cfg.RPCStack,
DisableCompression: cfg.DisableCompression,
DisableBatchHeader: cfg.Dune.DisableBatchHeader,
})
if err != nil {
stdlog.Fatal(err)
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

type DuneClient struct {
APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"`
APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` // nolint:lll
DisableBatchHeader bool `long:"duneapi-disable-batch-header" env:"DUNEAPI_DISABLE_BATCH_HEADERS" description:"Disable batch headers on DuneAPI request payload"` // nolint:lll
}

func (d DuneClient) HasError() error {
Expand Down

0 comments on commit 0152929

Please sign in to comment.