diff --git a/client/duneapi/batch.go b/client/duneapi/batch.go new file mode 100644 index 0000000..dc98c9f --- /dev/null +++ b/client/duneapi/batch.go @@ -0,0 +1,44 @@ +package duneapi + +import ( + "encoding/json" + "io" + + "github.com/duneanalytics/blockchain-ingester/models" +) + +type BlockBatchHeader struct { + BlockSizes []int `json:"block_sizes"` +} + +func WriteBlockBatch(out io.Writer, payloads []models.RPCBlock, disableHeader bool) error { + // we write a batch header (single line, NDJSON) with the size of each block payload and then concatenate the payloads + header := BlockBatchHeader{ + BlockSizes: make([]int, len(payloads)), + } + for i, block := range payloads { + header.BlockSizes[i] = len(block.Payload) + } + // allow disabling the header for testing/backwards compatibility + if !disableHeader { + buf, err := json.Marshal(header) + if err != nil { + return err + } + _, err = out.Write(buf) + if err != nil { + return err + } + _, err = out.Write([]byte("\n")) + if err != nil { + return err + } + } + for _, block := range payloads { + _, err := out.Write(block.Payload) + if err != nil { + return err + } + } + return nil +} diff --git a/client/duneapi/batch_test.go b/client/duneapi/batch_test.go new file mode 100644 index 0000000..b9c7ba8 --- /dev/null +++ b/client/duneapi/batch_test.go @@ -0,0 +1,95 @@ +package duneapi_test + +import ( + "bufio" + "bytes" + "encoding/json" + "io" + "testing" + + "github.com/duneanalytics/blockchain-ingester/client/duneapi" + "github.com/duneanalytics/blockchain-ingester/models" + "github.com/stretchr/testify/require" +) + +func TestWriteBlockBatch(t *testing.T) { + tests := []struct { + name string + payloads []models.RPCBlock + expected string + }{ + { + name: "single payload", + payloads: []models.RPCBlock{ + {Payload: []byte(`{"block":1}`)}, + }, + expected: `{"block_sizes":[11]} +{"block":1}`, + }, + { + name: "multiple payloads, with new lines", + payloads: []models.RPCBlock{ + {Payload: []byte(`{"block":1}` + "\n")}, + {Payload: []byte(`{"block":2}` + "\n")}, + }, + expected: `{"block_sizes":[12,12]} +{"block":1} +{"block":2} +`, + }, + { + name: "multiple payloads, no newlines", + payloads: []models.RPCBlock{ + {Payload: []byte(`{"block":1}`)}, + {Payload: []byte(`{"block":2}`)}, + }, + expected: `{"block_sizes":[11,11]} +{"block":1}{"block":2}`, + }, + { + name: "empty payloads", + payloads: []models.RPCBlock{}, + expected: `{"block_sizes":[]} +`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + err := duneapi.WriteBlockBatch(&buf, tt.payloads, false) + require.NoError(t, err) + + require.Equal(t, tt.expected, buf.String()) + rebuilt, err := ReadBlockBatch(&buf) + require.NoError(t, err) + require.EqualValues(t, tt.payloads, rebuilt) + }) + } +} + +func ReadBlockBatch(buf *bytes.Buffer) ([]models.RPCBlock, error) { + reader := bufio.NewReader(buf) + headerLine, err := reader.ReadString('\n') + if err != nil { + return nil, err + } + + var header duneapi.BlockBatchHeader + err = json.Unmarshal([]byte(headerLine), &header) + if err != nil { + return nil, err + } + + payloads := make([]models.RPCBlock, len(header.BlockSizes)) + for i, size := range header.BlockSizes { + payload := make([]byte, size) + _, err := io.ReadFull(reader, payload) + if err != nil { + return nil, err + } + payloads[i] = models.RPCBlock{Payload: payload} + } + + return payloads, nil +} diff --git a/client/duneapi/client.go b/client/duneapi/client.go index b91e114..37f2342 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -101,33 +101,28 @@ func (c *client) SendBlocks(ctx context.Context, payloads []models.RPCBlock) err func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) { request := &BlockchainIngestRequest{} + var err error + buffer.Reset() // not thread safe, multiple calls to the compressor here if c.cfg.DisableCompression { - buffer.Reset() - for _, block := range payloads { - _, err := buffer.Write(block.Payload) - if err != nil { - return nil, err - } + err = WriteBlockBatch(buffer, payloads, c.cfg.DisableBatchHeader) + if err != nil { + return nil, err } - request.Payload = buffer.Bytes() } else { - buffer.Reset() c.compressor.Reset(buffer) - for _, block := range payloads { - _, err := c.compressor.Write(block.Payload) - if err != nil { - return nil, err - } + err = WriteBlockBatch(c.compressor, payloads, c.cfg.DisableBatchHeader) + if err != nil { + return nil, err } err := c.compressor.Close() if err != nil { return nil, err } request.ContentEncoding = "application/zstd" - request.Payload = buffer.Bytes() } + request.Payload = buffer.Bytes() numbers := make([]string, len(payloads)) for i, payload := range payloads { @@ -137,6 +132,7 @@ func (c *client) buildRequest(payloads []models.RPCBlock, buffer *bytes.Buffer) request.BlockNumbers = blockNumbers request.IdempotencyKey = c.idempotencyKey(*request) request.EVMStack = c.cfg.Stack.String() + request.BatchSize = len(payloads) return request, nil } @@ -281,6 +277,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch if err != nil { return err } + req.Header.Set("User-Agent", userAgent) req.Header.Set("Content-Type", "application/json") req.Header.Set("x-dune-api-key", c.cfg.APIKey) req = req.WithContext(ctx) diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 0a1e12e..c8148f1 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -22,6 +22,8 @@ type Config struct { // - lowers latency // - reduces bandwidth DisableCompression bool + + DisableBatchHeader bool // for testing/backwards compatibility } // The response from the DuneAPI ingest endpoint. diff --git a/cmd/main.go b/cmd/main.go index 6b1448c..c648542 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -57,6 +57,7 @@ func main() { BlockchainName: cfg.BlockchainName, Stack: cfg.RPCStack, DisableCompression: cfg.DisableCompression, + DisableBatchHeader: cfg.Dune.DisableBatchHeader, }) if err != nil { stdlog.Fatal(err) diff --git a/config/config.go b/config/config.go index 94d2f9d..e50cf04 100644 --- a/config/config.go +++ b/config/config.go @@ -11,8 +11,9 @@ import ( ) type DuneClient struct { - APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"` - URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` + APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"` + URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` // nolint:lll + DisableBatchHeader bool `long:"duneapi-disable-batch-header" env:"DUNEAPI_DISABLE_BATCH_HEADERS" description:"Disable batch headers on DuneAPI request payload"` // nolint:lll } func (d DuneClient) HasError() error {