diff --git a/apps/chproxy/buffer.go b/apps/chproxy/buffer.go index 00dedaef8e..d756eb2077 100644 --- a/apps/chproxy/buffer.go +++ b/apps/chproxy/buffer.go @@ -33,6 +33,7 @@ func startBufferProcessor( startTime := time.Now() telemetryConfig.Metrics.FlushCounter.Add(ctx, 1) + telemetryConfig.Metrics.FlushBatchCount.Record(ctx, int64(len(batchesByParams))) span.SetAttributes( attribute.Int("batch_count", len(batchesByParams)), @@ -40,7 +41,11 @@ func startBufferProcessor( ) for _, batch := range batchesByParams { + batchStart := time.Now() err := persist(ctx, batch, config) + batchDuration := time.Since(batchStart).Seconds() + telemetryConfig.Metrics.BatchPersistDuration.Record(ctx, batchDuration) + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -49,6 +54,7 @@ func startBufferProcessor( "error", err.Error(), "table", batch.Table, "rows_dropped", len(batch.Rows), + "batch_duration_seconds", batchDuration, "query", batch.Params.Get("query"), ) } diff --git a/apps/chproxy/otel.go b/apps/chproxy/otel.go index 2dcea90d77..d38f992997 100644 --- a/apps/chproxy/otel.go +++ b/apps/chproxy/otel.go @@ -28,12 +28,14 @@ type TelemetryConfig struct { Meter metric.Meter MetricHTTPOptions []otlpmetrichttp.Option Metrics struct { - BatchCounter metric.Int64Counter - ErrorCounter metric.Int64Counter - FlushCounter metric.Int64Counter - FlushDuration metric.Float64Histogram - RequestCounter metric.Int64Counter - RowCounter metric.Int64Counter + BatchCounter metric.Int64Counter + BatchPersistDuration metric.Float64Histogram + ErrorCounter metric.Int64Counter + FlushCounter metric.Int64Counter + FlushDuration metric.Float64Histogram + FlushBatchCount metric.Int64Histogram + RequestCounter metric.Int64Counter + RowCounter metric.Int64Counter } TraceHTTPOptions []otlptracehttp.Option Tracer trace.Tracer @@ -157,7 +159,7 @@ func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func // // Initialize metrics // - var err1, err2, err3, err4, err5, err6 error + var err1, err2, err3, err4, err5, err6, err7, err8 error telemetryConfig.Metrics.BatchCounter, err1 = telemetryConfig.Meter.Int64Counter( "clickhouse_batches_total", metric.WithDescription("Total number of batches sent to Clickhouse"), @@ -185,8 +187,18 @@ func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func metric.WithDescription("Total number of HTTP requests received"), metric.WithUnit("{request}"), ) + telemetryConfig.Metrics.FlushBatchCount, err7 = telemetryConfig.Meter.Int64Histogram( + "clickhouse_flush_batch_count", + metric.WithDescription("Number of batches processed per flush operation"), + metric.WithUnit("{batch}"), + ) + telemetryConfig.Metrics.BatchPersistDuration, err8 = telemetryConfig.Meter.Float64Histogram( + "clickhouse_batch_persist_duration_seconds", + metric.WithDescription("Duration of individual batch persist operations"), + metric.WithUnit("s"), + ) - for _, err := range []error{err1, err2, err3, err4, err5, err6} { + for _, err := range []error{err1, err2, err3, err4, err5, err6, err7, err8} { if err != nil { return nil, nil, fmt.Errorf("failed to create metric: %w", err) }