Skip to content

Commit

Permalink
bugfix: (missing test) missing zstd compressor.Close()
Browse files Browse the repository at this point in the history
  • Loading branch information
msf committed Jun 12, 2024
1 parent 3ff81f1 commit 4a41474
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -61,7 +62,6 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
}

// SendBlock sends a block to DuneAPI
// TODO: support batching multiple blocks in a single request
func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
buffer := c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(buffer)
Expand All @@ -79,12 +79,14 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl
if c.cfg.DisableCompression {
request.Payload = payload.Payload
} else {
// not thread safe, multiple calls to the compressor here
buffer.Reset()
c.compressor.Reset(buffer)
_, err := c.compressor.Write(payload.Payload)
if err != nil {
return request, err
}
c.compressor.Close()
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
Expand Down Expand Up @@ -138,7 +140,9 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
return err
}
defer resp.Body.Close()
responseStatus = resp.Status
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status)
}
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
Expand Down

0 comments on commit 4a41474

Please sign in to comment.