diff --git a/apps/chproxy/buffer.go b/apps/chproxy/buffer.go index 4a5a125b05..00dedaef8e 100644 --- a/apps/chproxy/buffer.go +++ b/apps/chproxy/buffer.go @@ -45,12 +45,14 @@ func startBufferProcessor( span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - config.Logger.Error("error persisting batch", + config.Logger.Error("error persisting batch, data dropped", "error", err.Error(), "table", batch.Table, + "rows_dropped", len(batch.Rows), "query", batch.Params.Get("query"), ) } + // Note: Memory will be freed when batchesByParams is reset below } duration := time.Since(startTime).Seconds() @@ -86,9 +88,36 @@ func startBufferProcessor( config.Logger.Debug("new batch type received", "query", b.Params.Get("query")) } else { - batch.Rows = append(batch.Rows, b.Rows...) + // Check if adding these rows would exceed the per-batch limit + if len(batch.Rows)+len(b.Rows) > maxBatchRows { + // Flush the current batch to make room + config.Logger.Info("flushing batch due to individual batch size limit", + "current_rows", len(batch.Rows), + "incoming_rows", len(b.Rows), + "table", batch.Table) + + err := persist(ctx, batch, config) + + // Always free the memory by resetting batch, regardless of persist success + // Update buffered count: subtract old rows, will add new rows below + buffered -= len(batch.Rows) + + if err != nil { + config.Logger.Error("error persisting batch during size limit flush, data dropped", + "error", err.Error(), + "table", batch.Table, + "rows_dropped", len(batch.Rows), + "query", batch.Params.Get("query")) + } + + // Reset this batch and start fresh with the new rows + batch.Rows = b.Rows + } else { + batch.Rows = append(batch.Rows, b.Rows...) + } } + // Always add the new incoming rows to the buffer count buffered += len(b.Rows) SetBufferSize(int64(buffered)) diff --git a/apps/chproxy/main.go b/apps/chproxy/main.go index a011201377..b25375cba7 100644 --- a/apps/chproxy/main.go +++ b/apps/chproxy/main.go @@ -22,6 +22,7 @@ import ( const ( maxBufferSize int = 50000 maxBatchSize int = 10000 + maxBatchRows int = 5000 // Max rows per individual batch ) var ( @@ -154,13 +155,28 @@ func main() { "row_count", len(rows), "table", strings.Split(query, " ")[2]) - buffer <- &Batch{ + batch := &Batch{ Params: params, Rows: rows, Table: strings.Split(query, " ")[2], } - w.Write([]byte("ok")) + select { + case buffer <- batch: + // Successfully sent to buffer + w.Write([]byte("ok")) + default: + // Buffer is full, drop the message + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + config.Logger.Warn("buffer full, dropping message", + "row_count", len(rows), + "table", batch.Table, + "remote_addr", r.RemoteAddr) + + span.SetStatus(codes.Error, "buffer full") + http.Error(w, "service overloaded", 529) + return + } span.SetStatus(codes.Ok, "") span.SetAttributes( attribute.Int("row_count", len(rows)),