diff --git a/apps/chproxy/config.go b/apps/chproxy/config.go new file mode 100644 index 0000000000..27a0fc35ec --- /dev/null +++ b/apps/chproxy/config.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "log/slog" + "os" + "time" +) + +type Config struct { + Logger *slog.Logger + BasicAuth string + ClickhouseURL string + FlushInterval time.Duration + ListenerPort string + MaxBatchSize int + MaxBufferSize int + ServiceName string + ServiceVersion string +} + +func LoadConfig() (*Config, error) { + // New config with defaults + config := &Config{ + FlushInterval: time.Second * 3, + ListenerPort: "7123", + MaxBatchSize: 10000, + MaxBufferSize: 50000, + ServiceName: "chproxy", + ServiceVersion: "1.1.0", + } + + config.ClickhouseURL = os.Getenv("CLICKHOUSE_URL") + if config.ClickhouseURL == "" { + return nil, fmt.Errorf("CLICKHOUSE_URL must be defined") + } + + config.BasicAuth = os.Getenv("BASIC_AUTH") + if config.BasicAuth == "" { + return nil, fmt.Errorf("BASIC_AUTH must be defined") + } + + if port := os.Getenv("PORT"); port != "" { + config.ListenerPort = port + } + + return config, nil +} diff --git a/apps/chproxy/go.mod b/apps/chproxy/go.mod index 23fcd8c61a..a7b634462b 100644 --- a/apps/chproxy/go.mod +++ b/apps/chproxy/go.mod @@ -1,3 +1,35 @@ module github.com/unkeyed/unkey/apps/chproxy -go 1.23.2 +go 1.24.0 + +require ( + go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 + go.opentelemetry.io/otel v1.35.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 + go.opentelemetry.io/otel/metric v1.35.0 + go.opentelemetry.io/otel/sdk v1.35.0 + go.opentelemetry.io/otel/sdk/log v0.11.0 + go.opentelemetry.io/otel/sdk/metric v1.35.0 + go.opentelemetry.io/otel/trace v1.35.0 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect + go.opentelemetry.io/otel/log v0.11.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect + golang.org/x/net v0.35.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/grpc v1.71.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect +) diff --git a/apps/chproxy/go.sum b/apps/chproxy/go.sum index e69de29bb2..0e798a504e 100644 --- a/apps/chproxy/go.sum +++ b/apps/chproxy/go.sum @@ -0,0 +1,67 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 h1:lRKWBp9nWoBe1HKXzc3ovkro7YZSb72X2+3zYNxfXiU= +go.opentelemetry.io/contrib/bridges/otelslog v0.10.0/go.mod h1:D+iyUv/Wxbw5LUDO5oh7x744ypftIryiWjoj42I6EKs= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 h1:C/Wi2F8wEmbxJ9Kuzw/nhP+Z9XaHYMkyDmXy6yR2cjw= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0/go.mod h1:0Lr9vmGKzadCTgsiBydxr6GEZ8SsZ7Ks53LzjWG5Ar4= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0 h1:0NIXxOCFx+SKbhCVxwl3ETG8ClLPAa0KuKV6p3yhxP8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0/go.mod h1:ChZSJbbfbl/DcRZNc9Gqh6DYGlfjw4PvO1pEOZH1ZsE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk= +go.opentelemetry.io/otel/log v0.11.0 h1:c24Hrlk5WJ8JWcwbQxdBqxZdOK7PcP/LFtOtwpDTe3Y= +go.opentelemetry.io/otel/log v0.11.0/go.mod h1:U/sxQ83FPmT29trrifhQg+Zj2lo1/IPN1PF6RTFqdwc= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/log v0.11.0 h1:7bAOpjpGglWhdEzP8z0VXc4jObOiDEwr3IYbhBnjk2c= +go.opentelemetry.io/otel/sdk/log v0.11.0/go.mod h1:dndLTxZbwBstZoqsJB3kGsRPkpAgaJrWfQg3lhlHFFY= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0= +google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/apps/chproxy/main.go b/apps/chproxy/main.go index 3d4bc5923f..02f41a73df 100644 --- a/apps/chproxy/main.go +++ b/apps/chproxy/main.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "io" + "log" "log/slog" "net/http" "net/url" @@ -13,139 +14,207 @@ import ( "strings" "syscall" "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" ) const ( - MAX_BUFFER_SIZE = 50000 - MAX_BATCH_SIZE = 10000 - FLUSH_INTERVAL = time.Second * 3 + LOG_TICKER_SAMPLE_RATE = 10 // Only log every 10th ticker flush ) var ( - CLICKHOUSE_URL string - BASIC_AUTH string - PORT string - logger *slog.Logger + telemetry *TelemetryConfig ) -func setupLogger() { - handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - AddSource: false, - }) - - logger = slog.New(handler) - - slog.SetDefault(logger) - - logger.Info("chproxy starting", - slog.Int("max_buffer_size", MAX_BUFFER_SIZE), - slog.Int("max_batch_size", MAX_BATCH_SIZE), - slog.String("flush_interval", FLUSH_INTERVAL.String())) -} - -func init() { - setupLogger() - - CLICKHOUSE_URL = os.Getenv("CLICKHOUSE_URL") - if CLICKHOUSE_URL == "" { - panic("CLICKHOUSE_URL must be defined") - } - BASIC_AUTH = os.Getenv("BASIC_AUTH") - if BASIC_AUTH == "" { - panic("BASIC_AUTH must be defined") - } - PORT = os.Getenv("PORT") - if PORT == "" { - PORT = "7123" - } -} - type Batch struct { Rows []string Params url.Values } -func persist(batch *Batch) error { +func persist(ctx context.Context, batch *Batch, config *Config) error { + ctx, span := telemetry.Tracer.Start(ctx, "persist_batch") + defer span.End() + if len(batch.Rows) == 0 { return nil } - u, err := url.Parse(CLICKHOUSE_URL) + telemetry.Metrics.BatchCounter.Add(ctx, 1) + telemetry.Metrics.RowCounter.Add(ctx, int64(len(batch.Rows))) + + span.SetAttributes( + attribute.Int("rows", len(batch.Rows)), + attribute.String("query", batch.Params.Get("query")), + ) + + u, err := url.Parse(config.ClickhouseURL) if err != nil { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } u.RawQuery = batch.Params.Encode() - req, err := http.NewRequest("POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n"))) + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n"))) if err != nil { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } req.Header.Add("Content-Type", "text/plain") username := u.User.Username() password, ok := u.User.Password() if !ok { - return fmt.Errorf("password not set") + err := fmt.Errorf("password not set") + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err } req.SetBasicAuth(username, password) client := http.Client{} res, err := client.Do(req) if err != nil { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } defer res.Body.Close() if res.StatusCode == http.StatusOK { - logger.Info("rows persisted", - slog.Int("count", len(batch.Rows)), - slog.String("query", batch.Params.Get("query"))) + config.Logger.Info("rows persisted", + "count", len(batch.Rows), + "query", batch.Params.Get("query")) } else { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) body, err := io.ReadAll(res.Body) if err != nil { + span.RecordError(err) return err } - logger.Error("unable to persist rows", - slog.String("response", string(body)), - slog.Int("status_code", res.StatusCode), - slog.String("query", batch.Params.Get("query"))) + + errorMsg := string(body) + span.SetStatus(codes.Error, errorMsg) + span.RecordError(fmt.Errorf("HTTP %d: %s", res.StatusCode, errorMsg)) + + config.Logger.Error("unable to persist batch", + "response", errorMsg, + "status_code", res.StatusCode, + "query", batch.Params.Get("query")) } return nil } func main() { - requiredAuthorization := "Basic " + base64.StdEncoding.EncodeToString([]byte(BASIC_AUTH)) + config, err := LoadConfig() + if err != nil { + log.Fatalf("failed to load configuration: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var cleanup func(context.Context) error + telemetry, cleanup, err = setupTelemetry(ctx, config) + if err != nil { + log.Fatalf("failed to setup telemetry: %v", err) + } + defer func() { + cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := cleanup(cleanupCtx); err != nil { + log.Printf("failed to cleanup telemetry: %v", err) + } + }() - buffer := make(chan *Batch, MAX_BUFFER_SIZE) + if telemetry != nil && telemetry.LogHandler != nil { + config.Logger = slog.New(telemetry.LogHandler) + slog.SetDefault(config.Logger) + } + + config.Logger.Info(fmt.Sprintf("%s starting", config.ServiceName), + "max_buffer_size", config.MaxBufferSize, + "max_batch_size", config.MaxBatchSize, + "flush_interval", config.FlushInterval.String()) + + requiredAuthorization := "Basic " + base64.StdEncoding.EncodeToString([]byte(config.BasicAuth)) + + buffer := make(chan *Batch, config.MaxBufferSize) // blocks until we've persisted everything and the process may stop done := make(chan bool) go func() { buffered := 0 - batchesByParams := make(map[string]*Batch) + ticker := time.NewTicker(config.FlushInterval) + tickerCount := 0 + + flushAndReset := func(ctx context.Context, reason string) { + ctx, span := telemetry.Tracer.Start(ctx, "flush_batches") + defer span.End() + + startTime := time.Now() + + // Record metrics + telemetry.Metrics.FlushCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("reason", reason))) + + span.SetAttributes( + attribute.Int("batch_count", len(batchesByParams)), + attribute.Int("buffered_rows", buffered), + attribute.String("reason", reason), + ) + + // We'll sample logs for ticker-based flushes + shouldLog := true + if reason == "ticker" { + tickerCount++ + // Only log every LOG_TICKER_SAMPLE_RATE times + shouldLog = (tickerCount%LOG_TICKER_SAMPLE_RATE == 0) + } - ticker := time.NewTicker(FLUSH_INTERVAL) + // Only log if we should based on sampling + if shouldLog { + config.Logger.Info("flushing batches", + "reason", reason, + "batch_count", len(batchesByParams), + "buffered_rows", buffered) + } - flushAndReset := func() { for _, batch := range batchesByParams { - err := persist(batch) + err := persist(ctx, batch, config) if err != nil { - logger.Error("error flushing batch", - slog.String("error", err.Error()), - slog.String("query", batch.Params.Get("query"))) + // Always log errors regardless of sampling + config.Logger.Error("error flushing batch", + "error", err.Error(), + "query", batch.Params.Get("query")) } } + + duration := time.Since(startTime).Seconds() + telemetry.Metrics.FlushDuration.Record(ctx, duration, + metric.WithAttributes(attribute.String("reason", reason))) + + span.SetAttributes(attribute.Float64("duration_seconds", duration)) + buffered = 0 + SetBufferSize(0) batchesByParams = make(map[string]*Batch) - ticker.Reset(FLUSH_INTERVAL) + ticker.Reset(config.FlushInterval) } + for { select { case b, ok := <-buffer: if !ok { - // channel closed - flushAndReset() + config.Logger.Info("buffer channel closed, flushing remaining batches") + flushAndReset(ctx, "shutdown") done <- true return } @@ -154,40 +223,69 @@ func main() { batch, ok := batchesByParams[params] if !ok { batchesByParams[params] = b + config.Logger.Debug("new batch type received", + "query", b.Params.Get("query")) } else { - batch.Rows = append(batch.Rows, b.Rows...) } buffered += len(b.Rows) + SetBufferSize(int64(buffered)) - if buffered >= MAX_BATCH_SIZE { - logger.Info("flushing due to max size") - flushAndReset() + if buffered >= config.MaxBatchSize { + config.Logger.Info("flushing due to max batch size", + "buffered_rows", buffered, + "max_size", config.MaxBatchSize) + flushAndReset(ctx, "max_size") } case <-ticker.C: - logger.Info("flushing from ticker") - - flushAndReset() + config.Logger.Debug("flushing due to max batch size", + "buffered_rows", buffered, + "max_size", config.MaxBatchSize) + flushAndReset(ctx, "ticker") } } }() http.HandleFunc("/v1/liveness", func(w http.ResponseWriter, r *http.Request) { + _, span := telemetry.Tracer.Start(r.Context(), "liveness_check") + defer span.End() + w.Write([]byte("ok")) }) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + ctx, span := telemetry.Tracer.Start(r.Context(), "handle_request") + defer span.End() + + span.SetAttributes( + attribute.String("method", r.Method), + attribute.String("path", r.URL.Path), + attribute.String("remote_addr", r.RemoteAddr), + ) + if r.Header.Get("Authorization") != requiredAuthorization { - logger.Warn("invalid authorization header", - slog.String("expected", requiredAuthorization), - slog.String("authorization", r.Header.Get("Authorization"))) + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + log.Println("invalid authorization header, expected", requiredAuthorization, r.Header.Get("Authorization")) + config.Logger.Error("invalid authorization header", + "remote_addr", r.RemoteAddr, + "user_agent", r.UserAgent()) + + span.SetStatus(codes.Error, "unauthorized") http.Error(w, "unauthorized", http.StatusUnauthorized) return } query := r.URL.Query().Get("query") + span.SetAttributes(attribute.String("query", query)) + if query == "" || !strings.HasPrefix(strings.ToLower(query), "insert into") { + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + config.Logger.Warn("Invalid query", + "query", query, + "remote_addr", r.RemoteAddr) + + span.SetStatus(codes.Error, "wrong query") http.Error(w, "wrong query", http.StatusBadRequest) return } @@ -197,12 +295,24 @@ func main() { body, err := io.ReadAll(r.Body) if err != nil { - logger.Error("failed to read request body", slog.String("error", err.Error())) + telemetry.Metrics.ErrorCounter.Add(ctx, 1) + config.Logger.Error("failed to read request body", + "error", err.Error(), + "remote_addr", r.RemoteAddr) + + span.RecordError(err) + span.SetStatus(codes.Error, "cannot read body") http.Error(w, "cannot read body", http.StatusInternalServerError) return } rows := strings.Split(string(body), "\n") + span.SetAttributes(attribute.Int("row_count", len(rows))) + + config.Logger.Debug("received insert request", + "row_count", len(rows), + "table", strings.Split(query, " ")[2]) + buffer <- &Batch{ Params: params, Rows: rows, @@ -211,21 +321,36 @@ func main() { w.Write([]byte("ok")) }) - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + // Setup signal handling + signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - srv := &http.Server{Addr: fmt.Sprintf(":%s", PORT)} + // Start HTTP server in a goroutine + server := &http.Server{Addr: fmt.Sprintf(":%s", config.ListenerPort)} go func() { - logger.Info("listening", slog.String("port", PORT)) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.Error("failed to start server", slog.String("error", err.Error())) + config.Logger.Info("server listening", "port", config.ListenerPort) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + config.Logger.Error("failed to start server", "error", err.Error()) os.Exit(1) } }() - <-ctx.Done() + // Wait for interrupt signal + <-signalCtx.Done() + config.Logger.Info("shutdown signal received") + + // Create a timeout context for shutdown + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + // Attempt graceful shutdown + config.Logger.Info("shutting down server") + if err := server.Shutdown(shutdownCtx); err != nil { + config.Logger.Error("server shutdown error", "error", err.Error()) + } + + // Close the buffer channel and wait for processing to finish close(buffer) <-done - - logger.Info("shutdown complete") + config.Logger.Info("graceful shutdown complete") } diff --git a/apps/chproxy/otel.go b/apps/chproxy/otel.go new file mode 100644 index 0000000000..563301fb29 --- /dev/null +++ b/apps/chproxy/otel.go @@ -0,0 +1,174 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "time" + + "go.opentelemetry.io/contrib/bridges/otelslog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/metric" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "go.opentelemetry.io/otel/trace" +) + +type TelemetryConfig struct { + LogHandler slog.Handler + LogHTTPOptions []otlploghttp.Option + Meter metric.Meter + MetricHTTPOptions []otlpmetrichttp.Option + Metrics struct { + BatchCounter metric.Int64Counter + ErrorCounter metric.Int64Counter + FlushCounter metric.Int64Counter + FlushDuration metric.Float64Histogram + RowCounter metric.Int64Counter + } + TraceHTTPOptions []otlptracehttp.Option + Tracer trace.Tracer +} + +var currentBufferSize int64 + +// SetBufferSize updates the current buffer size for metrics +func SetBufferSize(size int64) { + currentBufferSize = size +} + +// GetBufferSize returns the current buffer size +func GetBufferSize() int64 { + return currentBufferSize +} + +// SetupTelemetry initializes OTEL tracing, metrics, and logging +func setupTelemetry(ctx context.Context, config *Config) (*TelemetryConfig, func(context.Context) error, error) { + telemetryConfig := &TelemetryConfig{} + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(config.ServiceName), + semconv.ServiceVersion(config.ServiceVersion), + ), + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to create OTEL resource: %w", err) + } + + // Configure metric/meter + metricExporter, err := otlpmetrichttp.New(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to create OTEL metrics exporter: %w", err) + } + + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + metricExporter, + sdkmetric.WithInterval(10*time.Second), + ), + ), + ) + otel.SetMeterProvider(meterProvider) + telemetryConfig.Meter = meterProvider.Meter(config.ServiceName) + + logExporter, err := otlploghttp.New(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to create log exporter: %w", err) + } + + logProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(res), + sdklog.WithProcessor( + sdklog.NewBatchProcessor(logExporter), + ), + ) + + otlpHandler := otelslog.NewHandler( + config.ServiceName, + otelslog.WithLoggerProvider(logProvider), + ) + + telemetryConfig.LogHandler = otlpHandler + + // Configure tracer + traceExporter, err := otlptracehttp.New(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + traceProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithBatcher(traceExporter), + ) + otel.SetTracerProvider(traceProvider) + telemetryConfig.Tracer = traceProvider.Tracer(config.ServiceName) + + // Initialize metrics + var err1, err2, err3, err4, err5 error + telemetryConfig.Metrics.BatchCounter, err1 = telemetryConfig.Meter.Int64Counter( + "clickhouse_batches_total", + metric.WithDescription("Total number of batches sent to Clickhouse"), + ) + telemetryConfig.Metrics.RowCounter, err2 = telemetryConfig.Meter.Int64Counter( + "clickhouse_rows_total", + metric.WithDescription("Total number of rows sent to Clickhouse"), + ) + telemetryConfig.Metrics.FlushCounter, err3 = telemetryConfig.Meter.Int64Counter( + "clickhouse_flushes_total", + metric.WithDescription("Total number of flush operations"), + metric.WithUnit("{flush}"), + ) + telemetryConfig.Metrics.ErrorCounter, err4 = telemetryConfig.Meter.Int64Counter( + "clickhouse_errors_total", + metric.WithDescription("Total number of errors"), + ) + telemetryConfig.Metrics.FlushDuration, err5 = telemetryConfig.Meter.Float64Histogram( + "clickhouse_flush_duration_seconds", + metric.WithDescription("Duration of flush operations"), + metric.WithUnit("s"), + ) + + for _, err := range []error{err1, err2, err3, err4, err5} { + if err != nil { + return nil, nil, fmt.Errorf("failed to create metric: %w", err) + } + } + + _, err = telemetryConfig.Meter.Int64ObservableGauge( + "clickhouse_buffer_size", + metric.WithDescription("Current number of rows in buffer"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(GetBufferSize()) + return nil + }), + ) + if err != nil { + return nil, nil, fmt.Errorf("failed to create buffer size gauge: %w", err) + } + + // Return a cleanup function (should be nil but might be an actual error) + cleanup := func(ctx context.Context) error { + err := meterProvider.Shutdown(ctx) + if err != nil { + return err + } + + if err := traceProvider.Shutdown(ctx); err != nil { + return err + } + + // Actually shutdown the log exporter + return logExporter.Shutdown(ctx) + } + + return telemetryConfig, cleanup, nil +}