diff --git a/apps/chproxy/main.go b/apps/chproxy/main.go index 1746138ea6..3d4bc5923f 100644 --- a/apps/chproxy/main.go +++ b/apps/chproxy/main.go @@ -5,7 +5,7 @@ import ( "encoding/base64" "fmt" "io" - "log" + "log/slog" "net/http" "net/url" "os" @@ -25,9 +25,26 @@ var ( CLICKHOUSE_URL string BASIC_AUTH string PORT string + logger *slog.Logger ) +func setupLogger() { + handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + AddSource: false, + }) + + logger = slog.New(handler) + + slog.SetDefault(logger) + + logger.Info("chproxy starting", + slog.Int("max_buffer_size", MAX_BUFFER_SIZE), + slog.Int("max_batch_size", MAX_BATCH_SIZE), + slog.String("flush_interval", FLUSH_INTERVAL.String())) +} + func init() { + setupLogger() CLICKHOUSE_URL = os.Getenv("CLICKHOUSE_URL") if CLICKHOUSE_URL == "" { @@ -80,13 +97,18 @@ func persist(batch *Batch) error { defer res.Body.Close() if res.StatusCode == http.StatusOK { - log.Printf("GOLANG persisted %d rows for %s\n", len(batch.Rows), batch.Params.Get("query")) + logger.Info("rows persisted", + slog.Int("count", len(batch.Rows)), + slog.String("query", batch.Params.Get("query"))) } else { body, err := io.ReadAll(res.Body) if err != nil { return err } - fmt.Println("unable to persist", string(body)) + logger.Error("unable to persist rows", + slog.String("response", string(body)), + slog.Int("status_code", res.StatusCode), + slog.String("query", batch.Params.Get("query"))) } return nil } @@ -99,7 +121,6 @@ func main() { done := make(chan bool) go func() { - buffered := 0 batchesByParams := make(map[string]*Batch) @@ -110,7 +131,9 @@ func main() { for _, batch := range batchesByParams { err := persist(batch) if err != nil { - log.Println("Error flushing:", err.Error()) + logger.Error("error flushing batch", + slog.String("error", err.Error()), + slog.String("query", batch.Params.Get("query"))) } } buffered = 0 @@ -139,11 +162,11 @@ func main() { buffered += len(b.Rows) if buffered >= MAX_BATCH_SIZE { - log.Println("Flushing due to max size") + logger.Info("flushing due to max size") flushAndReset() } case <-ticker.C: - log.Println("Flushing from ticker") + logger.Info("flushing from ticker") flushAndReset() } @@ -156,7 +179,9 @@ func main() { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Authorization") != requiredAuthorization { - log.Println("invaldu authorization header, expected", requiredAuthorization, r.Header.Get("Authorization")) + logger.Warn("invalid authorization header", + slog.String("expected", requiredAuthorization), + slog.String("authorization", r.Header.Get("Authorization"))) http.Error(w, "unauthorized", http.StatusUnauthorized) return } @@ -172,7 +197,9 @@ func main() { body, err := io.ReadAll(r.Body) if err != nil { + logger.Error("failed to read request body", slog.String("error", err.Error())) http.Error(w, "cannot read body", http.StatusInternalServerError) + return } rows := strings.Split(string(body), "\n") @@ -187,14 +214,18 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - fmt.Println("listening on", PORT) - if err := http.ListenAndServe(fmt.Sprintf(":%s", PORT), nil); err != nil { - log.Fatalln("error starting server:", err) - } + srv := &http.Server{Addr: fmt.Sprintf(":%s", PORT)} + go func() { + logger.Info("listening", slog.String("port", PORT)) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("failed to start server", slog.String("error", err.Error())) + os.Exit(1) + } + }() <-ctx.Done() - log.Println("shutting down") close(buffer) <-done + logger.Info("shutdown complete") } diff --git a/go/api/openapi.json b/go/api/openapi.json index e9be6233fb..f68615ad33 100644 --- a/go/api/openapi.json +++ b/go/api/openapi.json @@ -50,13 +50,7 @@ } }, "type": "object", - "required": [ - "requestId", - "detail", - "status", - "title", - "type" - ] + "required": ["requestId", "detail", "status", "title", "type"] }, "NotFoundError": { "$ref": "#/components/schemas/BaseError" @@ -86,9 +80,7 @@ "type": "array" } }, - "required": [ - "errors" - ] + "required": ["errors"] } ] }, @@ -112,10 +104,7 @@ } }, "type": "object", - "required": [ - "message", - "location" - ] + "required": ["message", "location"] }, "V2LivenessResponseBody": { "additionalProperties": false, @@ -126,9 +115,7 @@ "type": "string" } }, - "required": [ - "message" - ], + "required": ["message"], "type": "object" }, "V2RatelimitSetOverrideRequestBody": { @@ -164,11 +151,7 @@ "minimum": 0 } }, - "required": [ - "identifier", - "limit", - "duration" - ], + "required": ["identifier", "limit", "duration"], "type": "object" }, "V2RatelimitSetOverrideResponseBody": { @@ -179,9 +162,7 @@ "type": "string" } }, - "required": [ - "overrideId" - ], + "required": ["overrideId"], "type": "object" }, "V2RatelimitGetOverrideRequestBody": { @@ -207,9 +188,7 @@ "maxLength": 255 } }, - "required": [ - "identifier" - ], + "required": ["identifier"], "type": "object" }, "V2RatelimitGetOverrideResponseBody": { @@ -246,13 +225,7 @@ "minimum": 0 } }, - "required": [ - "namespaceId", - "overrideId", - "duration", - "identifier", - "limit" - ], + "required": ["namespaceId", "overrideId", "duration", "identifier", "limit"], "type": "object" }, "V2RatelimitLimitRequestBody": { @@ -290,12 +263,7 @@ "minimum": 1 } }, - "required": [ - "namespace", - "identifier", - "limit", - "duration" - ], + "required": ["namespace", "identifier", "limit", "duration"], "type": "object" }, "V2RatelimitLimitResponseBody": { @@ -325,12 +293,7 @@ "type": "string" } }, - "required": [ - "limit", - "remaining", - "reset", - "success" - ], + "required": ["limit", "remaining", "reset", "success"], "type": "object" }, "V2RatelimitDeleteOverrideRequestBody": { @@ -356,9 +319,7 @@ "maxLength": 255 } }, - "required": [ - "identifier" - ], + "required": ["identifier"], "type": "object" }, "V2RatelimitDeleteOverrideResponseBody": { @@ -443,9 +404,7 @@ "description": "Error" } }, - "tags": [ - "ratelimit" - ] + "tags": ["ratelimit"] } }, "/v2/ratelimit.setOverride": { @@ -523,16 +482,12 @@ "description": "Error" } }, - "tags": [ - "ratelimit" - ] + "tags": ["ratelimit"] } }, "/v2/ratelimit.getOverride": { "post": { - "tags": [ - "ratelimit" - ], + "tags": ["ratelimit"], "operationId": "v2.ratelimit.getOverride", "requestBody": { "content": { @@ -610,9 +565,7 @@ }, "/v2/ratelimit.deleteOverride": { "post": { - "tags": [ - "ratelimit" - ], + "tags": ["ratelimit"], "operationId": "v2.ratelimit.deleteOverride", "requestBody": { "content": { @@ -725,9 +678,7 @@ } }, "summary": "Liveness check", - "tags": [ - "liveness" - ] + "tags": ["liveness"] } } }