Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/chproxy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ func startBufferProcessor(
startTime := time.Now()

telemetryConfig.Metrics.FlushCounter.Add(ctx, 1)
telemetryConfig.Metrics.FlushBatchCount.Record(ctx, int64(len(batchesByParams)))

span.SetAttributes(
attribute.Int("batch_count", len(batchesByParams)),
attribute.Int("buffered_rows", buffered),
)

for _, batch := range batchesByParams {
batchStart := time.Now()
err := persist(ctx, batch, config)
batchDuration := time.Since(batchStart).Seconds()
telemetryConfig.Metrics.BatchPersistDuration.Record(ctx, batchDuration)

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -49,6 +54,7 @@ func startBufferProcessor(
"error", err.Error(),
"table", batch.Table,
"rows_dropped", len(batch.Rows),
"batch_duration_seconds", batchDuration,
"query", batch.Params.Get("query"),
)
}
Expand Down
28 changes: 20 additions & 8 deletions apps/chproxy/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ type TelemetryConfig struct {
Meter metric.Meter
MetricHTTPOptions []otlpmetrichttp.Option
Metrics struct {
BatchCounter metric.Int64Counter
ErrorCounter metric.Int64Counter
FlushCounter metric.Int64Counter
FlushDuration metric.Float64Histogram
RequestCounter metric.Int64Counter
RowCounter metric.Int64Counter
BatchCounter metric.Int64Counter
BatchPersistDuration metric.Float64Histogram
ErrorCounter metric.Int64Counter
FlushCounter metric.Int64Counter
FlushDuration metric.Float64Histogram
FlushBatchCount metric.Int64Histogram
RequestCounter metric.Int64Counter
RowCounter metric.Int64Counter
}
TraceHTTPOptions []otlptracehttp.Option
Tracer trace.Tracer
Expand Down Expand Up @@ -157,7 +159,7 @@ func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func
//
// Initialize metrics
//
var err1, err2, err3, err4, err5, err6 error
var err1, err2, err3, err4, err5, err6, err7, err8 error
telemetryConfig.Metrics.BatchCounter, err1 = telemetryConfig.Meter.Int64Counter(
"clickhouse_batches_total",
metric.WithDescription("Total number of batches sent to Clickhouse"),
Expand Down Expand Up @@ -185,8 +187,18 @@ func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func
metric.WithDescription("Total number of HTTP requests received"),
metric.WithUnit("{request}"),
)
telemetryConfig.Metrics.FlushBatchCount, err7 = telemetryConfig.Meter.Int64Histogram(
"clickhouse_flush_batch_count",
metric.WithDescription("Number of batches processed per flush operation"),
metric.WithUnit("{batch}"),
)
telemetryConfig.Metrics.BatchPersistDuration, err8 = telemetryConfig.Meter.Float64Histogram(
"clickhouse_batch_persist_duration_seconds",
metric.WithDescription("Duration of individual batch persist operations"),
metric.WithUnit("s"),
)

for _, err := range []error{err1, err2, err3, err4, err5, err6} {
for _, err := range []error{err1, err2, err3, err4, err5, err6, err7, err8} {
if err != nil {
return nil, nil, fmt.Errorf("failed to create metric: %w", err)
}
Expand Down