diff --git a/apps/chproxy/Dockerfile b/apps/chproxy/Dockerfile index 9dbb0c15d8..4a71c3dc4e 100644 --- a/apps/chproxy/Dockerfile +++ b/apps/chproxy/Dockerfile @@ -1,19 +1,15 @@ -FROM golang:1.23-alpine AS builder - +FROM golang:1.24-alpine AS builder WORKDIR /go/src/github.com/unkeyed/unkey/apps/chproxy COPY go.mod ./ -# COPY go.sum ./ -# RUN go mod download COPY . . -RUN go build -o bin/chproxy ./main.go - +RUN go build -o bin/chproxy -FROM golang:1.23-alpine +FROM golang:1.24-alpine RUN apk add --update curl WORKDIR /usr/local/bin COPY --from=builder /go/src/github.com/unkeyed/unkey/apps/chproxy/bin/chproxy . -CMD [ "/usr/local/bin/chproxy"] +CMD ["/usr/local/bin/chproxy"] diff --git a/apps/chproxy/batch.go b/apps/chproxy/batch.go new file mode 100644 index 0000000000..0e243c9edc --- /dev/null +++ b/apps/chproxy/batch.go @@ -0,0 +1,113 @@ +package main + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" +) + +type Batch struct { + Rows []string + Params url.Values +} + +func persist(ctx context.Context, batch *Batch, config *Config) error { + ctx, span := telemetry.Tracer.Start(ctx, "persist_batch") + defer span.End() + + if len(batch.Rows) == 0 { + return nil + } + + telemetry.Metrics.BatchCounter.Add(ctx, 1) + telemetry.Metrics.RowCounter.Add(ctx, int64(len(batch.Rows))) + + span.SetAttributes( + attribute.Int("rows", len(batch.Rows)), + attribute.String("query", batch.Params.Get("query")), + ) + + u, err := url.Parse(config.ClickhouseURL) + if err != nil { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + u.RawQuery = batch.Params.Encode() + + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n"))) + if err != nil { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, 
err.Error()) + return err + } + + req.Header.Add("Content-Type", "text/plain") + + username := u.User.Username() + + password, ok := u.User.Password() + if !ok { + err := fmt.Errorf("password not set") + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + req.SetBasicAuth(username, password) + + res, err := httpClient.Do(req) + if err != nil { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + body, err := io.ReadAll(res.Body) + if err != nil { + config.Logger.Error("error reading body", + "error", err) + + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + errorMsg := string(body) + + config.Logger.Error("unable to persist batch", + "response", errorMsg, + "status_code", res.StatusCode, + "query", batch.Params.Get("query")) + + span.SetStatus(codes.Error, errorMsg) + span.RecordError(fmt.Errorf("HTTP %d: %s", res.StatusCode, errorMsg)) + + return fmt.Errorf("http error: %v", errorMsg) + } + + config.Logger.Info("rows persisted", + "count", len(batch.Rows), + "query", batch.Params.Get("query")) + span.SetStatus(codes.Ok, "") + span.SetAttributes( + attribute.String("result", "successfully sent to Clickhouse"), + attribute.Int("rows_processed", len(batch.Rows)), + ) + + return nil +} diff --git a/apps/chproxy/buffer.go b/apps/chproxy/buffer.go new file mode 100644 index 0000000000..5a64e90ac2 --- /dev/null +++ b/apps/chproxy/buffer.go @@ -0,0 +1,127 @@ +package main + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// startBufferProcessor manages processing of data sent to Clickhouse. 
+// Returns a channel that signals when all pending batches have been processed during shutdown +func startBufferProcessor( + ctx context.Context, + buffer <-chan *Batch, + config *Config, + telemetryConfig *TelemetryConfig, +) <-chan bool { + done := make(chan bool) + + go func() { + buffered := 0 + batchesByParams := make(map[string]*Batch) + ticker := time.NewTicker(config.FlushInterval) + defer ticker.Stop() + + tickerCount := 0 + + flushAndReset := func(ctx context.Context, reason string) { + ctx, span := telemetryConfig.Tracer.Start(ctx, "flush_batches") + defer span.End() + + startTime := time.Now() + + // Record metrics + telemetryConfig.Metrics.FlushCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("reason", reason))) + + span.SetAttributes( + attribute.Int("batch_count", len(batchesByParams)), + attribute.Int("buffered_rows", buffered), + attribute.String("reason", reason), + ) + + // We'll sample logs for ticker-based flushes + shouldLog := true + if reason == "ticker" { + tickerCount++ + // Only log every LOG_TICKER_SAMPLE_RATE times + shouldLog = (tickerCount%LOG_TICKER_SAMPLE_RATE == 0) + } + + // Only log if we should based on sampling + if shouldLog { + config.Logger.Info("flushing batches", + "reason", reason, + "batch_count", len(batchesByParams), + "buffered_rows", buffered) + } + + for _, batch := range batchesByParams { + err := persist(ctx, batch, config) + if err != nil { + // Always log errors regardless of sampling + config.Logger.Error("error flushing batch", + "error", err.Error(), + "query", batch.Params.Get("query")) + } + } + + duration := time.Since(startTime).Seconds() + telemetryConfig.Metrics.FlushDuration.Record(ctx, duration, + metric.WithAttributes(attribute.String("reason", reason))) + + span.SetAttributes(attribute.Float64("duration_seconds", duration)) + + buffered = 0 + SetBufferSize(0) + batchesByParams = make(map[string]*Batch) + } + + for { + select { + case <-ctx.Done(): + config.Logger.Info("context cancelled, 
flushing remaining batches", + "buffered_rows", buffered, + "elapsed_time", config.FlushInterval.String()) + flushAndReset(ctx, "shutdown") + done <- true + return + case b, ok := <-buffer: + if !ok { + config.Logger.Info("buffer channel closed, flushing remaining batches") + flushAndReset(ctx, "shutdown") + done <- true + return + } + + params := b.Params.Encode() + batch, ok := batchesByParams[params] + if !ok { + batchesByParams[params] = b + config.Logger.Debug("new batch type received", + "query", b.Params.Get("query")) + } else { + batch.Rows = append(batch.Rows, b.Rows...) + } + + buffered += len(b.Rows) + SetBufferSize(int64(buffered)) + + if buffered >= config.MaxBatchSize { + config.Logger.Info("flushing due to max batch size", + "buffered_rows", buffered, + "max_size", config.MaxBatchSize) + flushAndReset(ctx, "max_size") + } + case <-ticker.C: + config.Logger.Info("flushing on ticker", + "buffered_rows", buffered, + "elapsed_time", config.FlushInterval.String()) + flushAndReset(ctx, "ticker") + } + } + }() + + return done +} diff --git a/apps/chproxy/config.go b/apps/chproxy/config.go index 27a0fc35ec..4f0c2cc791 100644 --- a/apps/chproxy/config.go +++ b/apps/chproxy/config.go @@ -8,6 +8,7 @@ import ( ) type Config struct { + LogDebug bool Logger *slog.Logger BasicAuth string ClickhouseURL string @@ -22,12 +23,13 @@ type Config struct { func LoadConfig() (*Config, error) { // New config with defaults config := &Config{ - FlushInterval: time.Second * 3, + LogDebug: false, + FlushInterval: time.Second * 5, ListenerPort: "7123", MaxBatchSize: 10000, MaxBufferSize: 50000, ServiceName: "chproxy", - ServiceVersion: "1.1.0", + ServiceVersion: "1.2.0", } config.ClickhouseURL = os.Getenv("CLICKHOUSE_URL") @@ -40,6 +42,10 @@ func LoadConfig() (*Config, error) { return nil, fmt.Errorf("BASIC_AUTH must be defined") } + if debug := os.Getenv("OTEL_EXPORTER_LOG_DEBUG"); debug == "true" { + config.LogDebug = true + } + if port := os.Getenv("PORT"); port != "" { 
config.ListenerPort = port } diff --git a/apps/chproxy/go.mod b/apps/chproxy/go.mod index a7b634462b..72e83e5b32 100644 --- a/apps/chproxy/go.mod +++ b/apps/chproxy/go.mod @@ -4,6 +4,7 @@ go 1.24.0 require ( go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 + go.opentelemetry.io/contrib/processors/minsev v0.8.0 go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0 diff --git a/apps/chproxy/go.sum b/apps/chproxy/go.sum index 0e798a504e..27bcfde4f5 100644 --- a/apps/chproxy/go.sum +++ b/apps/chproxy/go.sum @@ -23,6 +23,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 h1:lRKWBp9nWoBe1HKXzc3ovkro7YZSb72X2+3zYNxfXiU= go.opentelemetry.io/contrib/bridges/otelslog v0.10.0/go.mod h1:D+iyUv/Wxbw5LUDO5oh7x744ypftIryiWjoj42I6EKs= +go.opentelemetry.io/contrib/processors/minsev v0.8.0 h1:/i0gaV0Z174Twy1/NfgQoE+oQvFVbQItNl8UMwe62Jc= +go.opentelemetry.io/contrib/processors/minsev v0.8.0/go.mod h1:5siKBWhXmdM2gNh8KHZ4b97bdS4MYhqPJEEu6JtHciw= go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 h1:C/Wi2F8wEmbxJ9Kuzw/nhP+Z9XaHYMkyDmXy6yR2cjw= diff --git a/apps/chproxy/main.go b/apps/chproxy/main.go index 02f41a73df..92168ee92f 100644 --- a/apps/chproxy/main.go +++ b/apps/chproxy/main.go @@ -8,16 +8,15 @@ import ( "log" "log/slog" "net/http" - "net/url" "os" "os/signal" "strings" + "sync" "syscall" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/metric" ) const ( @@ -25,99 +24,26 @@ const ( ) var ( - telemetry *TelemetryConfig + telemetry 
*TelemetryConfig + inFlight sync.WaitGroup // incoming requests + httpClient *http.Client // shared http client ) -type Batch struct { - Rows []string - Params url.Values -} - -func persist(ctx context.Context, batch *Batch, config *Config) error { - ctx, span := telemetry.Tracer.Start(ctx, "persist_batch") - defer span.End() - - if len(batch.Rows) == 0 { - return nil - } - - telemetry.Metrics.BatchCounter.Add(ctx, 1) - telemetry.Metrics.RowCounter.Add(ctx, int64(len(batch.Rows))) - - span.SetAttributes( - attribute.Int("rows", len(batch.Rows)), - attribute.String("query", batch.Params.Get("query")), - ) - - u, err := url.Parse(config.ClickhouseURL) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - - u.RawQuery = batch.Params.Encode() - - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n"))) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - req.Header.Add("Content-Type", "text/plain") - username := u.User.Username() - password, ok := u.User.Password() - if !ok { - err := fmt.Errorf("password not set") - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - req.SetBasicAuth(username, password) - - client := http.Client{} - res, err := client.Do(req) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - defer res.Body.Close() - - if res.StatusCode == http.StatusOK { - config.Logger.Info("rows persisted", - "count", len(batch.Rows), - "query", batch.Params.Get("query")) - } else { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - body, err := io.ReadAll(res.Body) - if err != nil { - span.RecordError(err) - return err - } - - errorMsg := string(body) 
- span.SetStatus(codes.Error, errorMsg) - span.RecordError(fmt.Errorf("HTTP %d: %s", res.StatusCode, errorMsg)) - - config.Logger.Error("unable to persist batch", - "response", errorMsg, - "status_code", res.StatusCode, - "query", batch.Params.Get("query")) - } - return nil -} - func main() { config, err := LoadConfig() if err != nil { log.Fatalf("failed to load configuration: %v", err) } + httpClient = &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 25, + MaxIdleConnsPerHost: 25, + IdleConnTimeout: 60 * time.Second, + }, + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -136,6 +62,7 @@ func main() { if telemetry != nil && telemetry.LogHandler != nil { config.Logger = slog.New(telemetry.LogHandler) + slog.SetDefault(config.Logger) } @@ -147,117 +74,32 @@ func main() { requiredAuthorization := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.BasicAuth)) buffer := make(chan *Batch, config.MaxBufferSize) - // blocks until we've persisted everything and the process may stop - done := make(chan bool) - - go func() { - buffered := 0 - batchesByParams := make(map[string]*Batch) - ticker := time.NewTicker(config.FlushInterval) - tickerCount := 0 - - flushAndReset := func(ctx context.Context, reason string) { - ctx, span := telemetry.Tracer.Start(ctx, "flush_batches") - defer span.End() - - startTime := time.Now() - - // Record metrics - telemetry.Metrics.FlushCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("reason", reason))) - - span.SetAttributes( - attribute.Int("batch_count", len(batchesByParams)), - attribute.Int("buffered_rows", buffered), - attribute.String("reason", reason), - ) - - // We'll sample logs for ticker-based flushes - shouldLog := true - if reason == "ticker" { - tickerCount++ - // Only log every LOG_TICKER_SAMPLE_RATE times - shouldLog = (tickerCount%LOG_TICKER_SAMPLE_RATE == 0) - } - - // Only log if we should based on sampling - if shouldLog { - 
config.Logger.Info("flushing batches", - "reason", reason, - "batch_count", len(batchesByParams), - "buffered_rows", buffered) - } - - for _, batch := range batchesByParams { - err := persist(ctx, batch, config) - if err != nil { - // Always log errors regardless of sampling - config.Logger.Error("error flushing batch", - "error", err.Error(), - "query", batch.Params.Get("query")) - } - } - - duration := time.Since(startTime).Seconds() - telemetry.Metrics.FlushDuration.Record(ctx, duration, - metric.WithAttributes(attribute.String("reason", reason))) - - span.SetAttributes(attribute.Float64("duration_seconds", duration)) - - buffered = 0 - SetBufferSize(0) - batchesByParams = make(map[string]*Batch) - ticker.Reset(config.FlushInterval) - } - for { - select { - case b, ok := <-buffer: - if !ok { - config.Logger.Info("buffer channel closed, flushing remaining batches") - flushAndReset(ctx, "shutdown") - done <- true - return - } - - params := b.Params.Encode() - batch, ok := batchesByParams[params] - if !ok { - batchesByParams[params] = b - config.Logger.Debug("new batch type received", - "query", b.Params.Get("query")) - } else { - batch.Rows = append(batch.Rows, b.Rows...) 
- } - - buffered += len(b.Rows) - SetBufferSize(int64(buffered)) - - if buffered >= config.MaxBatchSize { - config.Logger.Info("flushing due to max batch size", - "buffered_rows", buffered, - "max_size", config.MaxBatchSize) - flushAndReset(ctx, "max_size") - } - case <-ticker.C: - config.Logger.Debug("flushing due to max batch size", - "buffered_rows", buffered, - "max_size", config.MaxBatchSize) - flushAndReset(ctx, "ticker") - } - } - }() + // blocks until we've persisted everything and the process may stop + done := startBufferProcessor(ctx, buffer, config, telemetry) http.HandleFunc("/v1/liveness", func(w http.ResponseWriter, r *http.Request) { _, span := telemetry.Tracer.Start(r.Context(), "liveness_check") defer span.End() + span.SetAttributes( + attribute.String("method", r.Method), + attribute.String("path", r.URL.Path), + ) + w.Write([]byte("ok")) + span.SetStatus(codes.Ok, "") }) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + inFlight.Add(1) + defer inFlight.Done() + ctx, span := telemetry.Tracer.Start(r.Context(), "handle_request") defer span.End() + telemetry.Metrics.RequestCounter.Add(ctx, 1) + span.SetAttributes( attribute.String("method", r.Method), attribute.String("path", r.URL.Path), @@ -266,11 +108,11 @@ func main() { if r.Header.Get("Authorization") != requiredAuthorization { telemetry.Metrics.ErrorCounter.Add(ctx, 1) - log.Println("invalid authorization header, expected", requiredAuthorization, r.Header.Get("Authorization")) config.Logger.Error("invalid authorization header", "remote_addr", r.RemoteAddr, "user_agent", r.UserAgent()) + span.RecordError(err) span.SetStatus(codes.Error, "unauthorized") http.Error(w, "unauthorized", http.StatusUnauthorized) return @@ -281,7 +123,7 @@ func main() { if query == "" || !strings.HasPrefix(strings.ToLower(query), "insert into") { telemetry.Metrics.ErrorCounter.Add(ctx, 1) - config.Logger.Warn("Invalid query", + config.Logger.Warn("invalid query", "query", query, "remote_addr", 
r.RemoteAddr) @@ -307,8 +149,6 @@ func main() { } rows := strings.Split(string(body), "\n") - span.SetAttributes(attribute.Int("row_count", len(rows))) - config.Logger.Debug("received insert request", "row_count", len(rows), "table", strings.Split(query, " ")[2]) @@ -319,6 +159,11 @@ func main() { } w.Write([]byte("ok")) + span.SetStatus(codes.Ok, "") + span.SetAttributes( + attribute.Int("row_count", len(rows)), + attribute.String("table", strings.Split(query, " ")[2]), + ) }) // Setup signal handling @@ -343,6 +188,13 @@ func main() { shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) defer shutdownCancel() + // Start a goroutine to track in-flight requests + shutdownComplete := make(chan struct{}) + go func() { + inFlight.Wait() + close(shutdownComplete) + }() + // Attempt graceful shutdown config.Logger.Info("shutting down server") if err := server.Shutdown(shutdownCtx); err != nil { diff --git a/apps/chproxy/otel.go b/apps/chproxy/otel.go index 563301fb29..ab0e454bbc 100644 --- a/apps/chproxy/otel.go +++ b/apps/chproxy/otel.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "log/slog" + "sync/atomic" "time" "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/contrib/processors/minsev" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" @@ -26,11 +28,12 @@ type TelemetryConfig struct { Meter metric.Meter MetricHTTPOptions []otlpmetrichttp.Option Metrics struct { - BatchCounter metric.Int64Counter - ErrorCounter metric.Int64Counter - FlushCounter metric.Int64Counter - FlushDuration metric.Float64Histogram - RowCounter metric.Int64Counter + BatchCounter metric.Int64Counter + ErrorCounter metric.Int64Counter + FlushCounter metric.Int64Counter + FlushDuration metric.Float64Histogram + RequestCounter metric.Int64Counter + RowCounter metric.Int64Counter } TraceHTTPOptions []otlptracehttp.Option Tracer trace.Tracer 
// currentBufferSize tracks the number of rows currently buffered. It is
// written by the buffer-processor goroutine and read elsewhere (e.g. for
// metrics), so all access goes through sync/atomic.
var currentBufferSize int64

// SetBufferSize updates the current buffer size for metrics.
func SetBufferSize(size int64) {
	// NOTE(review): the source text contained the mangled token
	// "¤tBufferSize" — the HTML entity &curren; swallowing the intended
	// "&currentBufferSize". Reconstructed to the address-of expression.
	atomic.StoreInt64(&currentBufferSize, size)
}

// GetBufferSize returns the current buffer size.
func GetBufferSize() int64 {
	return atomic.LoadInt64(&currentBufferSize)
}
telemetryConfig.Meter.Int64Counter( "clickhouse_batches_total", metric.WithDescription("Total number of batches sent to Clickhouse"), @@ -136,8 +148,13 @@ func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func metric.WithDescription("Duration of flush operations"), metric.WithUnit("s"), ) + telemetryConfig.Metrics.RequestCounter, err6 = telemetryConfig.Meter.Int64Counter( + "clickhouse_http_requests_total", + metric.WithDescription("Total number of HTTP requests received"), + metric.WithUnit("{request}"), + ) - for _, err := range []error{err1, err2, err3, err4, err5} { + for _, err := range []error{err1, err2, err3, err4, err5, err6} { if err != nil { return nil, nil, fmt.Errorf("failed to create metric: %w", err) }