diff --git a/apps/chproxy/.gitignore b/apps/chproxy/.gitignore deleted file mode 100644 index b90a78d60c..0000000000 --- a/apps/chproxy/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -chproxy -coverage.out diff --git a/apps/chproxy/Dockerfile b/apps/chproxy/Dockerfile deleted file mode 100644 index 4a71c3dc4e..0000000000 --- a/apps/chproxy/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM golang:1.24-alpine AS builder - -WORKDIR /go/src/github.com/unkeyed/unkey/apps/chproxy -COPY go.mod ./ - -COPY . . -RUN go build -o bin/chproxy - -FROM golang:1.24-alpine -RUN apk add --update curl - -WORKDIR /usr/local/bin -COPY --from=builder /go/src/github.com/unkeyed/unkey/apps/chproxy/bin/chproxy . - -CMD ["/usr/local/bin/chproxy"] diff --git a/apps/chproxy/README.md b/apps/chproxy/README.md deleted file mode 100644 index 08c3beff88..0000000000 --- a/apps/chproxy/README.md +++ /dev/null @@ -1 +0,0 @@ -Read more: [https://engineering.unkey.com/docs/architecture/clickhouse-proxy](https://engineering.unkey.com/docs/architecture/clickhouse-proxy) diff --git a/apps/chproxy/batch.go b/apps/chproxy/batch.go deleted file mode 100644 index 7b3e6aae3a..0000000000 --- a/apps/chproxy/batch.go +++ /dev/null @@ -1,110 +0,0 @@ -package main - -import ( - "context" - "fmt" - "io" - "net/http" - "net/url" - "strings" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" -) - -type Batch struct { - Rows []string - Params url.Values - Table string -} - -func persist(ctx context.Context, batch *Batch, config *Config) error { - ctx, span := telemetry.Tracer.Start(ctx, batch.Table) - defer span.End() - - if len(batch.Rows) == 0 { - return nil - } - - telemetry.Metrics.BatchCounter.Add(ctx, 1) - telemetry.Metrics.RowCounter.Add(ctx, int64(len(batch.Rows))) - - span.SetAttributes( - attribute.Int("rows", len(batch.Rows)), - ) - - u, err := url.Parse(config.ClickhouseURL) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - - u.RawQuery = batch.Params.Encode() - - req, err := http.NewRequestWithContext(ctx, "POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n"))) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - - req.Header.Add("Content-Type", "text/plain") - - username := u.User.Username() - - password, ok := u.User.Password() - if !ok { - err := fmt.Errorf("password not set") - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - - req.SetBasicAuth(username, password) - - res, err := httpClient.Do(req) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusOK { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - body, err := io.ReadAll(res.Body) - if err != nil { - config.Logger.Error("error reading body", - "error", err) - - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err - } - - errorMsg := string(body) - - config.Logger.Error("unable to persist batch", - "response", errorMsg, - "status_code", res.StatusCode, - "query", batch.Params.Get("query")) - - span.SetStatus(codes.Error, errorMsg) - span.RecordError(fmt.Errorf("HTTP %d: %s", res.StatusCode, errorMsg)) - - return fmt.Errorf("http error: %v", errorMsg) - } - - config.Logger.Info("rows persisted", - "count", len(batch.Rows), - "table", batch.Table) - - span.SetStatus(codes.Ok, "") - - return nil -} diff --git a/apps/chproxy/buffer.go b/apps/chproxy/buffer.go deleted file mode 100644 index 72b12e4c15..0000000000 --- a/apps/chproxy/buffer.go +++ /dev/null @@ -1,154 +0,0 @@ -package main - -import ( - "context" - "fmt" - "sync" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" -) - -// startBufferProcessor manages processing of data sent to Clickhouse. -// Returns a channel that signals when all pending batches have been processed during shutdown -func startBufferProcessor( - ctx context.Context, - buffer <-chan *Batch, - config *Config, - telemetryConfig *TelemetryConfig, -) <-chan bool { - done := make(chan bool) - - go func() { - buffered := 0 - batchesByParams := make(map[string]*Batch) - ticker := time.NewTicker(config.FlushInterval) - defer ticker.Stop() - - flushAndReset := func(ctx context.Context, reason string) { - reason = fmt.Sprintf("flush_batches.%s", reason) - ctx, span := telemetryConfig.Tracer.Start(ctx, reason) - defer span.End() - - startTime := time.Now() - - telemetryConfig.Metrics.FlushCounter.Add(ctx, 1) - telemetryConfig.Metrics.FlushBatchCount.Record(ctx, int64(len(batchesByParams))) - - span.SetAttributes( - attribute.Int("batch_count", len(batchesByParams)), - attribute.Int("buffered_rows", buffered), - ) - - // Process batches in parallel for better throughput - var wg sync.WaitGroup - for _, batch := range batchesByParams { - wg.Add(1) - go func(b *Batch) { - defer wg.Done() - - batchStart := time.Now() - err := persist(ctx, b, config) - batchDuration := time.Since(batchStart).Seconds() - telemetryConfig.Metrics.BatchPersistDuration.Record(ctx, batchDuration) - - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - - config.Logger.Error("error persisting batch, data dropped", - "error", err.Error(), - "table", b.Table, - "rows_dropped", len(b.Rows), - "batch_duration_seconds", batchDuration, - "query", b.Params.Get("query"), - ) - } - }(batch) - } - wg.Wait() // Wait for all parallel persists to complete - - duration := time.Since(startTime).Seconds() - telemetryConfig.Metrics.FlushDuration.Record(ctx, duration) - - span.SetAttributes(attribute.Float64("duration_seconds", duration)) - span.SetStatus(codes.Ok, "") - - buffered = 0 - SetBufferSize(0) - batchesByParams = make(map[string]*Batch) - } - - for { - select { - case <-ctx.Done(): - config.Logger.Info("context cancelled, flushing remaining batches") - flushAndReset(ctx, "context_cancelled") - done <- true - return - case b, ok := <-buffer: - if !ok { - config.Logger.Info("buffer channel closed, flushing remaining batches") - flushAndReset(ctx, "buffer_closed") - done <- true - return - } - - params := b.Params.Encode() - batch, ok := batchesByParams[params] - if !ok { - batchesByParams[params] = b - config.Logger.Info("new batch type received", - "query", b.Params.Get("query"), - "table", b.Table, - "params", params, - "total_batch_types", len(batchesByParams)) - } else { - // Check if adding these rows would exceed the per-batch limit - if len(batch.Rows)+len(b.Rows) > maxBatchRows { - // Flush the current batch to make room - config.Logger.Info("flushing batch due to individual batch size limit", - "current_rows", len(batch.Rows), - "incoming_rows", len(b.Rows), - "table", batch.Table) - - err := persist(ctx, batch, config) - - // Always free the memory by resetting batch, regardless of persist success - // Update buffered count: subtract old rows, will add new rows below - buffered -= len(batch.Rows) - - if err != nil { - config.Logger.Error("error persisting batch during size limit flush, data dropped", - "error", err.Error(), - "table", batch.Table, - "rows_dropped", len(batch.Rows), - "query", batch.Params.Get("query")) - } - - // Reset this batch and start fresh with the new rows - batch.Rows = b.Rows - } else { - batch.Rows = append(batch.Rows, b.Rows...) - } - } - - // Always add the new incoming rows to the buffer count - buffered += len(b.Rows) - SetBufferSize(int64(buffered)) - - if buffered >= maxBatchSize { - config.Logger.Info("flushing due to max batch size") - flushAndReset(ctx, "max_batch_size") - } - case <-ticker.C: - config.Logger.Info("flushing on ticker", - "buffered_rows", buffered) - flushAndReset(ctx, "ticker") - } - } - }() - - return done -} diff --git a/apps/chproxy/config.go b/apps/chproxy/config.go deleted file mode 100644 index e453e52354..0000000000 --- a/apps/chproxy/config.go +++ /dev/null @@ -1,70 +0,0 @@ -package main - -import ( - "fmt" - "log/slog" - "os" - "strconv" - "time" -) - -type Config struct { - BasicAuth string - ClickhouseURL string - FlushInterval time.Duration - ListenerPort string - LogDebug bool - Logger *slog.Logger - ServiceName string - ServiceVersion string - TraceMaxBatchSize int - TraceSampleRate float64 -} - -func LoadConfig() (*Config, error) { - // Defaults set are for production use. - // Configure to your liking for development/testing - config := &Config{ - FlushInterval: time.Second * 5, - ListenerPort: "7123", - LogDebug: false, - ServiceName: "chproxy", - ServiceVersion: "1.3.2", - TraceMaxBatchSize: 512, - TraceSampleRate: 0.25, // Sample 25% - } - - // Generic logger - config.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil)) - - config.ClickhouseURL = os.Getenv("CLICKHOUSE_URL") - if config.ClickhouseURL == "" { - return nil, fmt.Errorf("CLICKHOUSE_URL must be defined") - } - - config.BasicAuth = os.Getenv("BASIC_AUTH") - if config.BasicAuth == "" { - return nil, fmt.Errorf("BASIC_AUTH must be defined") - } - - if debug := os.Getenv("OTEL_EXPORTER_LOG_DEBUG"); debug == "true" { - config.LogDebug = true - } - - if port := os.Getenv("PORT"); port != "" { - config.ListenerPort = port - } - - if sampleRateStr := os.Getenv("OTEL_TRACE_SAMPLE_RATE"); sampleRateStr != "" { - sampleRate, err := strconv.ParseFloat(sampleRateStr, 64) - if err != nil { - return nil, fmt.Errorf("invalid TRACE_SAMPLE_RATE: %w", err) - } - if sampleRate < 0.0 || sampleRate > 1.0 { - return nil, fmt.Errorf("OTEL_TRACE_SAMPLE_RATE must be between 0.0 and 1.0") - } - config.TraceSampleRate = sampleRate - } - - return config, nil -} diff --git a/apps/chproxy/config_test.go b/apps/chproxy/config_test.go deleted file mode 100644 index cf1fcf3ab1..0000000000 --- a/apps/chproxy/config_test.go +++ /dev/null @@ -1,244 +0,0 @@ -package main - -import ( - "os" - "testing" - "time" -) - -func TestLoadConfig(t *testing.T) { - // Setup and cleanup for all tests - setupEnv := func(t *testing.T, envs map[string]string) { - t.Helper() - for k, v := range envs { - os.Setenv(k, v) - } - } - - cleanEnv := func(t *testing.T, keys []string) { - t.Helper() - for _, k := range keys { - os.Unsetenv(k) - } - } - - // Required env vars that need to be cleaned up - envKeys := []string{ - "CLICKHOUSE_URL", - "BASIC_AUTH", - "PORT", - "OTEL_EXPORTER_LOG_DEBUG", - "OTEL_TRACE_SAMPLE_RATE", - } - - // Cleanup after all tests in this function - t.Cleanup(func() { - cleanEnv(t, envKeys) - }) - - // Test for valid config with default values - t.Run("ValidConfig", func(t *testing.T) { - // Set required environment variables - setupEnv(t, map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - }) - - config, err := LoadConfig() - if err != nil { - t.Fatalf("LoadConfig() error = %v, wantErr = false", err) - } - - // Use a table-driven check for expected values - tests := []struct { - name string - got interface{} - expected interface{} - }{ - {"FlushInterval", config.FlushInterval, time.Second * 5}, - {"ListenerPort", config.ListenerPort, "7123"}, - {"LogDebug", config.LogDebug, false}, - {"ServiceName", config.ServiceName, "chproxy"}, - {"ServiceVersion", config.ServiceVersion, "1.3.1"}, - {"TraceMaxBatchSize", config.TraceMaxBatchSize, 512}, - {"TraceSampleRate", config.TraceSampleRate, 0.25}, - {"ClickhouseURL", config.ClickhouseURL, "http://localhost:8123"}, - {"BasicAuth", config.BasicAuth, "user:password"}, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - if tc.got != tc.expected { - t.Errorf("%s = %v, want %v", tc.name, tc.got, tc.expected) - } - }) - } - - if config.Logger == nil { - t.Error("Logger is nil, want non-nil") - } - }) - - // Test for error cases - t.Run("ErrorCases", func(t *testing.T) { - tests := []struct { - name string - envs map[string]string - wantErr string - }{ - { - name: "MissingClickhouseURL", - envs: map[string]string{ - "BASIC_AUTH": "user:password", - }, - wantErr: "CLICKHOUSE_URL must be defined", - }, - { - name: "MissingBasicAuth", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - }, - wantErr: "BASIC_AUTH must be defined", - }, - { - name: "InvalidSampleRate", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_TRACE_SAMPLE_RATE": "invalid", - }, - wantErr: "invalid TRACE_SAMPLE_RATE", - }, - { - name: "SampleRateTooLow", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_TRACE_SAMPLE_RATE": "-0.1", - }, - wantErr: "OTEL_TRACE_SAMPLE_RATE must be between 0.0 and 1.0", - }, - { - name: "SampleRateTooHigh", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_TRACE_SAMPLE_RATE": "1.1", - }, - wantErr: "OTEL_TRACE_SAMPLE_RATE must be between 0.0 and 1.0", - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - // Clean environment before each test case - cleanEnv(t, envKeys) - - // Set up environment for this test case - setupEnv(t, tc.envs) - - _, err := LoadConfig() - if err == nil { - t.Fatalf("LoadConfig() error = nil, wantErr = true") - } - - if got := err.Error(); got != tc.wantErr && got[:len(tc.wantErr)] != tc.wantErr { - t.Errorf("LoadConfig() error = %v, want to contain %v", got, tc.wantErr) - } - }) - } - }) - - // Test for custom configurations - t.Run("CustomConfigurations", func(t *testing.T) { - // Table-driven tests for custom configurations - tests := []struct { - name string - envs map[string]string - checkFunc func(*testing.T, *Config) - }{ - { - name: "CustomPort", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "PORT": "8000", - }, - checkFunc: func(t *testing.T, c *Config) { - if c.ListenerPort != "8000" { - t.Errorf("ListenerPort = %s, want 8000", c.ListenerPort) - } - }, - }, - { - name: "DebugMode", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_EXPORTER_LOG_DEBUG": "true", - }, - checkFunc: func(t *testing.T, c *Config) { - if !c.LogDebug { - t.Errorf("LogDebug = %v, want true", c.LogDebug) - } - }, - }, - { - name: "CustomSampleRate", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_TRACE_SAMPLE_RATE": "0.5", - }, - checkFunc: func(t *testing.T, c *Config) { - if c.TraceSampleRate != 0.5 { - t.Errorf("TraceSampleRate = %f, want 0.5", c.TraceSampleRate) - } - }, - }, - { - name: "ZeroSampleRate", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_TRACE_SAMPLE_RATE": "0.0", - }, - checkFunc: func(t *testing.T, c *Config) { - if c.TraceSampleRate != 0.0 { - t.Errorf("TraceSampleRate = %f, want 0.0", c.TraceSampleRate) - } - }, - }, - { - name: "OneSampleRate", - envs: map[string]string{ - "CLICKHOUSE_URL": "http://localhost:8123", - "BASIC_AUTH": "user:password", - "OTEL_TRACE_SAMPLE_RATE": "1.0", - }, - checkFunc: func(t *testing.T, c *Config) { - if c.TraceSampleRate != 1.0 { - t.Errorf("TraceSampleRate = %f, want 1.0", c.TraceSampleRate) - } - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - // Clean environment before each test case - cleanEnv(t, envKeys) - - // Set up environment for this test case - setupEnv(t, tc.envs) - - config, err := LoadConfig() - if err != nil { - t.Fatalf("LoadConfig() error = %v, wantErr = false", err) - } - - tc.checkFunc(t, config) - }) - } - }) -} diff --git a/apps/chproxy/go.mod b/apps/chproxy/go.mod deleted file mode 100644 index 30efdfcee3..0000000000 --- a/apps/chproxy/go.mod +++ /dev/null @@ -1,36 +0,0 @@ -module github.com/unkeyed/unkey/apps/chproxy - -go 1.24.0 - -require ( - go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 - go.opentelemetry.io/contrib/processors/minsev v0.8.0 - go.opentelemetry.io/otel v1.35.0 - go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 - go.opentelemetry.io/otel/metric v1.35.0 - go.opentelemetry.io/otel/sdk v1.35.0 - go.opentelemetry.io/otel/sdk/log v0.11.0 - go.opentelemetry.io/otel/sdk/metric v1.35.0 - go.opentelemetry.io/otel/trace v1.35.0 -) - -require ( - github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect - github.com/go-logr/stdr v1.2.2 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect - go.opentelemetry.io/otel/log v0.11.0 // indirect - go.opentelemetry.io/proto/otlp v1.5.0 // indirect - golang.org/x/net v0.39.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/text v0.24.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect - google.golang.org/grpc v1.71.1 // indirect - google.golang.org/protobuf v1.36.6 // indirect -) diff --git a/apps/chproxy/go.sum b/apps/chproxy/go.sum deleted file mode 100644 index 533febdbc5..0000000000 --- a/apps/chproxy/go.sum +++ /dev/null @@ -1,85 +0,0 @@ -github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= -github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= -github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 h1:lRKWBp9nWoBe1HKXzc3ovkro7YZSb72X2+3zYNxfXiU= -go.opentelemetry.io/contrib/bridges/otelslog v0.10.0/go.mod h1:D+iyUv/Wxbw5LUDO5oh7x744ypftIryiWjoj42I6EKs= -go.opentelemetry.io/contrib/processors/minsev v0.8.0 h1:/i0gaV0Z174Twy1/NfgQoE+oQvFVbQItNl8UMwe62Jc= -go.opentelemetry.io/contrib/processors/minsev v0.8.0/go.mod h1:5siKBWhXmdM2gNh8KHZ4b97bdS4MYhqPJEEu6JtHciw= -go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= -go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= -go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 h1:C/Wi2F8wEmbxJ9Kuzw/nhP+Z9XaHYMkyDmXy6yR2cjw= -go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0/go.mod h1:0Lr9vmGKzadCTgsiBydxr6GEZ8SsZ7Ks53LzjWG5Ar4= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0 h1:0NIXxOCFx+SKbhCVxwl3ETG8ClLPAa0KuKV6p3yhxP8= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0/go.mod h1:ChZSJbbfbl/DcRZNc9Gqh6DYGlfjw4PvO1pEOZH1ZsE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk= -go.opentelemetry.io/otel/log v0.11.0 h1:c24Hrlk5WJ8JWcwbQxdBqxZdOK7PcP/LFtOtwpDTe3Y= -go.opentelemetry.io/otel/log v0.11.0/go.mod h1:U/sxQ83FPmT29trrifhQg+Zj2lo1/IPN1PF6RTFqdwc= -go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= -go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= -go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= -go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= -go.opentelemetry.io/otel/sdk/log v0.11.0 h1:7bAOpjpGglWhdEzP8z0VXc4jObOiDEwr3IYbhBnjk2c= -go.opentelemetry.io/otel/sdk/log v0.11.0/go.mod h1:dndLTxZbwBstZoqsJB3kGsRPkpAgaJrWfQg3lhlHFFY= -go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= -go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= -go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= -go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= -go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= -go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= -golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= -golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= -golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= -golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= -google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0= -google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU= -google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e h1:UdXH7Kzbj+Vzastr5nVfccbmFsmYNygVLSPk1pEfDoY= -google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e/go.mod h1:085qFyf2+XaZlRdCgKNCIZ3afY2p4HHZdoIRpId8F4A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e h1:ztQaXfzEXTmCBvbtWYRhJxW+0iJcz2qXfd38/e9l7bA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= -google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= -google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= -google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= -google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/apps/chproxy/main.go b/apps/chproxy/main.go deleted file mode 100644 index b25375cba7..0000000000 --- a/apps/chproxy/main.go +++ /dev/null @@ -1,226 +0,0 @@ -package main - -import ( - "context" - "encoding/base64" - "fmt" - "io" - "log" - "log/slog" - "net/http" - "os" - "os/signal" - "strings" - "sync" - "syscall" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" -) - -const ( - maxBufferSize int = 50000 - maxBatchSize int = 10000 - maxBatchRows int = 5000 // Max rows per individual batch -) - -var ( - telemetry *TelemetryConfig - inFlight sync.WaitGroup // incoming requests - httpClient *http.Client // shared http client -) - -func main() { - config, err := LoadConfig() - if err != nil { - config.Logger.Error("failed to load configuration", slog.String("error", err.Error())) - os.Exit(1) - } - - httpClient = &http.Client{ - Timeout: 10 * time.Second, - Transport: &http.Transport{ - MaxIdleConns: 25, - MaxIdleConnsPerHost: 25, - IdleConnTimeout: 60 * time.Second, - }, - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var cleanup func(context.Context) error - telemetry, cleanup, err = setupTelemetry(ctx, config) - if err != nil { - config.Logger.Error("failed to setup telemetry", slog.String("error", err.Error())) - os.Exit(1) - } - defer func() { - cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := cleanup(cleanupCtx); err != nil { - log.Printf("failed to cleanup telemetry: %v", err) - } - }() - - if telemetry != nil && telemetry.LogHandler != nil { - config.Logger = slog.New(telemetry.LogHandler) - - slog.SetDefault(config.Logger) - } - - config.Logger.Info(fmt.Sprintf("%s starting", config.ServiceName), - "flush_interval", config.FlushInterval.String()) - - requiredAuthorization := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.BasicAuth)) - - buffer := make(chan *Batch, maxBufferSize) - - // blocks until we've persisted everything and the process may stop - done := startBufferProcessor(ctx, buffer, config, telemetry) - - http.HandleFunc("/v1/liveness", func(w http.ResponseWriter, r *http.Request) { - _, span := telemetry.Tracer.Start(r.Context(), "liveness_check") - defer span.End() - - span.SetAttributes( - attribute.String("method", r.Method), - attribute.String("path", r.URL.Path), - ) - - w.Write([]byte("ok")) - span.SetStatus(codes.Ok, "") - }) - - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - inFlight.Add(1) - defer inFlight.Done() - - ctx, span := telemetry.Tracer.Start(r.Context(), "handle_request") - defer span.End() - - telemetry.Metrics.RequestCounter.Add(ctx, 1) - - if r.Header.Get("Authorization") != requiredAuthorization { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - config.Logger.Error("invalid authorization header", - "remote_addr", r.RemoteAddr, - "user_agent", r.UserAgent()) - - span.RecordError(err) - span.SetStatus(codes.Error, "unauthorized") - http.Error(w, "unauthorized", http.StatusUnauthorized) - return - } - - span.SetAttributes( - attribute.String("method", r.Method), - attribute.String("path", r.URL.Path), - attribute.String("remote_addr", r.RemoteAddr), - ) - - query := r.URL.Query().Get("query") - span.SetAttributes(attribute.String("query", query)) - - if query == "" || !strings.HasPrefix(strings.ToLower(query), "insert into") { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - config.Logger.Warn("invalid query", - "query", query, - "remote_addr", r.RemoteAddr) - - span.SetStatus(codes.Error, "wrong query") - http.Error(w, "wrong query", http.StatusBadRequest) - return - } - - params := r.URL.Query() - params.Del("query_id") - - body, err := io.ReadAll(r.Body) - if err != nil { - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - config.Logger.Error("failed to read request body", - "error", err.Error(), - "remote_addr", r.RemoteAddr) - - span.RecordError(err) - span.SetStatus(codes.Error, "cannot read body") - http.Error(w, "cannot read body", http.StatusInternalServerError) - return - } - rows := strings.Split(string(body), "\n") - - config.Logger.Debug("received insert request", - "row_count", len(rows), - "table", strings.Split(query, " ")[2]) - - batch := &Batch{ - Params: params, - Rows: rows, - Table: strings.Split(query, " ")[2], - } - - select { - case buffer <- batch: - // Successfully sent to buffer - w.Write([]byte("ok")) - default: - // Buffer is full, drop the message - telemetry.Metrics.ErrorCounter.Add(ctx, 1) - config.Logger.Warn("buffer full, dropping message", - "row_count", len(rows), - "table", batch.Table, - "remote_addr", r.RemoteAddr) - - span.SetStatus(codes.Error, "buffer full") - http.Error(w, "service overloaded", 529) - return - } - span.SetStatus(codes.Ok, "") - span.SetAttributes( - attribute.Int("row_count", len(rows)), - attribute.String("table", strings.Split(query, " ")[2]), - ) - }) - - // Setup signal handling - signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) - defer stop() - - // Start HTTP server in a goroutine - server := &http.Server{Addr: fmt.Sprintf(":%s", config.ListenerPort)} - go func() { - config.Logger.Info("server listening", "port", config.ListenerPort) - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - config.Logger.Error("failed to start server", "error", err.Error()) - os.Exit(1) - } - }() - - // Wait for interrupt signal - <-signalCtx.Done() - config.Logger.Info("shutdown signal received") - - // Create a timeout context for shutdown - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownCancel() - - // Start a goroutine to track in-flight requests - shutdownComplete := make(chan struct{}) - go func() { - inFlight.Wait() - close(shutdownComplete) - }() - - // Attempt graceful shutdown - config.Logger.Info("shutting down server") - if err := server.Shutdown(shutdownCtx); err != nil { - config.Logger.Error("server shutdown error", "error", err.Error()) - } - - // Close the buffer channel and wait for processing to finish - close(buffer) - <-done - config.Logger.Info("graceful shutdown complete") -} diff --git a/apps/chproxy/otel.go b/apps/chproxy/otel.go deleted file mode 100644 index d38f992997..0000000000 --- a/apps/chproxy/otel.go +++ /dev/null @@ -1,237 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log/slog" - "sync/atomic" - "time" - - "go.opentelemetry.io/contrib/bridges/otelslog" - "go.opentelemetry.io/contrib/processors/minsev" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/metric" - sdklog "go.opentelemetry.io/otel/sdk/log" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.21.0" - "go.opentelemetry.io/otel/trace" -) - -type TelemetryConfig struct { - LogHandler slog.Handler - LogHTTPOptions []otlploghttp.Option - Meter metric.Meter - MetricHTTPOptions []otlpmetrichttp.Option - Metrics struct { - BatchCounter metric.Int64Counter - BatchPersistDuration metric.Float64Histogram - ErrorCounter metric.Int64Counter - FlushCounter metric.Int64Counter - FlushDuration metric.Float64Histogram - FlushBatchCount metric.Int64Histogram - RequestCounter metric.Int64Counter - RowCounter metric.Int64Counter - } - TraceHTTPOptions []otlptracehttp.Option - Tracer trace.Tracer -} - -var currentBufferSize int64 - -// SetBufferSize updates the current buffer size for metrics -func SetBufferSize(size int64) { - atomic.StoreInt64(¤tBufferSize, size) -} - -// GetBufferSize returns the current buffer size -func GetBufferSize() int64 { - return atomic.LoadInt64(¤tBufferSize) -} - -// SetupTelemetry initializes OTEL tracing, metrics, and logging -func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func(context.Context) error, error) { - telemetryConfig := &TelemetryConfig{} - - res, err := resource.New(ctx, - resource.WithAttributes( - semconv.ServiceName(config.ServiceName), - semconv.ServiceVersion(config.ServiceVersion), - ), - ) - if err != nil { - return nil, nil, fmt.Errorf("failed to create OTEL resource: %w", err) - } - - // Configure metric/meter - metricExporter, err := otlpmetrichttp.New(ctx) - if err != nil { - return nil, nil, fmt.Errorf("failed to create OTEL metrics exporter: %w", err) - } - - meterProvider := sdkmetric.NewMeterProvider( - sdkmetric.WithResource(res), - sdkmetric.WithReader( - sdkmetric.NewPeriodicReader( - metricExporter, - sdkmetric.WithInterval(60*time.Second), - ), - ), - ) - otel.SetMeterProvider(meterProvider) - telemetryConfig.Meter = meterProvider.Meter(config.ServiceName) - - // Configure OTLP log handler - logExporter, err := otlploghttp.New(ctx, - otlploghttp.WithCompression(otlploghttp.GzipCompression), - ) - if err != nil { - return nil, nil, fmt.Errorf("failed to create log exporter: %w", err) - } - - var processor sdklog.Processor = sdklog.NewBatchProcessor(logExporter, sdklog.WithExportBufferSize(512)) - - processor = minsev.NewLogProcessor(processor, minsev.SeverityInfo) - - if config.LogDebug { - processor = minsev.NewLogProcessor(processor, minsev.SeverityDebug) - } - - logProvider := sdklog.NewLoggerProvider( - sdklog.WithResource(res), - sdklog.WithProcessor(processor), - ) - - otlpLogHandler := otelslog.NewHandler( - config.ServiceName, - otelslog.WithLoggerProvider(logProvider), - ) - - telemetryConfig.LogHandler = otlpLogHandler - - // Configure tracer with compression - traceExporter, err := otlptracehttp.New(ctx, - otlptracehttp.WithCompression(otlptracehttp.GzipCompression), - ) - - if err != nil { - return nil, nil, fmt.Errorf("failed to create trace exporter: %w", err) - } - - var sampler sdktrace.Sampler - - // We'll always sample errors - alwaysOnError := sdktrace.ParentBased( - sdktrace.TraceIDRatioBased(config.TraceSampleRate), - sdktrace.WithRemoteParentSampled(sdktrace.AlwaysSample()), - sdktrace.WithRemoteParentNotSampled(sdktrace.TraceIDRatioBased(config.TraceSampleRate)), - sdktrace.WithLocalParentSampled(sdktrace.AlwaysSample()), - sdktrace.WithLocalParentNotSampled(sdktrace.TraceIDRatioBased(config.TraceSampleRate)), - ) - - // Configure the sampler - if config.TraceSampleRate >= 1.0 { - sampler = sdktrace.AlwaysSample() - } else if config.TraceSampleRate <= 0.0 { - sampler = sdktrace.NeverSample() - } else { - sampler = alwaysOnError - } - - config.Logger.Info("configured tracer with sampling", - slog.Float64("rate", config.TraceSampleRate)) - - traceProvider := sdktrace.NewTracerProvider( - sdktrace.WithResource(res), - sdktrace.WithBatcher(traceExporter, - sdktrace.WithMaxExportBatchSize(config.TraceMaxBatchSize), - ), - sdktrace.WithSampler(sampler), - ) - - otel.SetTracerProvider(traceProvider) - telemetryConfig.Tracer = traceProvider.Tracer(config.ServiceName) - - // - // Initialize metrics - // - var err1, err2, err3, err4, err5, err6, err7, err8 error - telemetryConfig.Metrics.BatchCounter, err1 = telemetryConfig.Meter.Int64Counter( - "clickhouse_batches_total", - metric.WithDescription("Total number of batches sent to Clickhouse"), - ) - telemetryConfig.Metrics.RowCounter, err2 = telemetryConfig.Meter.Int64Counter( - "clickhouse_rows_total", - metric.WithDescription("Total number of rows sent to Clickhouse"), - ) - telemetryConfig.Metrics.FlushCounter, err3 = telemetryConfig.Meter.Int64Counter( - "clickhouse_flushes_total", - metric.WithDescription("Total number of flush operations"), - metric.WithUnit("{flush}"), - ) - telemetryConfig.Metrics.ErrorCounter, err4 = telemetryConfig.Meter.Int64Counter( - "clickhouse_errors_total", - metric.WithDescription("Total number of errors"), - ) - telemetryConfig.Metrics.FlushDuration, err5 = telemetryConfig.Meter.Float64Histogram( - "clickhouse_flush_duration_seconds", - metric.WithDescription("Duration of flush operations"), - metric.WithUnit("s"), - ) - telemetryConfig.Metrics.RequestCounter, err6 = telemetryConfig.Meter.Int64Counter( - "clickhouse_http_requests_total", - metric.WithDescription("Total number of HTTP requests received"), - metric.WithUnit("{request}"), - ) - telemetryConfig.Metrics.FlushBatchCount, err7 = telemetryConfig.Meter.Int64Histogram( - "clickhouse_flush_batch_count", - metric.WithDescription("Number of batches processed per flush operation"), - metric.WithUnit("{batch}"), - ) - telemetryConfig.Metrics.BatchPersistDuration, err8 = telemetryConfig.Meter.Float64Histogram( - "clickhouse_batch_persist_duration_seconds", - metric.WithDescription("Duration of individual batch persist operations"), - metric.WithUnit("s"), - ) - - for _, err := range []error{err1, err2, err3, err4, err5, err6, err7, err8} { - if err != nil { - return nil, nil, fmt.Errorf("failed to create metric: %w", err) - } - } - - _, err = telemetryConfig.Meter.Int64ObservableGauge( - "clickhouse_buffer_size", - metric.WithDescription("Current number of rows in buffer"), - metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { - o.Observe(GetBufferSize()) - return nil - }), - ) - if err != nil { - return nil, nil, fmt.Errorf("failed to create buffer size gauge: %w", err) - } - - // Return a cleanup function (should be nil but might be an actual error) - cleanup := func(ctx context.Context) error { - if err := meterProvider.Shutdown(ctx); err != nil { - return err - } - - if err := traceProvider.Shutdown(ctx); err != nil { - return err - } - - if err := logProvider.Shutdown(ctx); err != nil { - return err - } - - return nil - } - - return telemetryConfig, cleanup, nil -}