Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 44 additions & 13 deletions apps/chproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/base64"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"net/url"
"os"
Expand All @@ -25,9 +25,26 @@ var (
CLICKHOUSE_URL string
BASIC_AUTH string
PORT string
logger *slog.Logger
)

func setupLogger() {
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: false,
})

logger = slog.New(handler)

slog.SetDefault(logger)

logger.Info("chproxy starting",
slog.Int("max_buffer_size", MAX_BUFFER_SIZE),
slog.Int("max_batch_size", MAX_BATCH_SIZE),
slog.String("flush_interval", FLUSH_INTERVAL.String()))
}

func init() {
setupLogger()

CLICKHOUSE_URL = os.Getenv("CLICKHOUSE_URL")
if CLICKHOUSE_URL == "" {
Expand Down Expand Up @@ -80,13 +97,18 @@ func persist(batch *Batch) error {
defer res.Body.Close()

if res.StatusCode == http.StatusOK {
log.Printf("GOLANG persisted %d rows for %s\n", len(batch.Rows), batch.Params.Get("query"))
logger.Info("rows persisted",
slog.Int("count", len(batch.Rows)),
slog.String("query", batch.Params.Get("query")))
} else {
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
fmt.Println("unable to persist", string(body))
logger.Error("unable to persist rows",
slog.String("response", string(body)),
slog.Int("status_code", res.StatusCode),
slog.String("query", batch.Params.Get("query")))
}
return nil
}
Expand All @@ -99,7 +121,6 @@ func main() {
done := make(chan bool)

go func() {

buffered := 0

batchesByParams := make(map[string]*Batch)
Expand All @@ -110,7 +131,9 @@ func main() {
for _, batch := range batchesByParams {
err := persist(batch)
if err != nil {
log.Println("Error flushing:", err.Error())
logger.Error("error flushing batch",
slog.String("error", err.Error()),
slog.String("query", batch.Params.Get("query")))
}
}
buffered = 0
Expand Down Expand Up @@ -139,11 +162,11 @@ func main() {
buffered += len(b.Rows)

if buffered >= MAX_BATCH_SIZE {
log.Println("Flushing due to max size")
logger.Info("flushing due to max size")
flushAndReset()
}
case <-ticker.C:
log.Println("Flushing from ticker")
logger.Info("flushing from ticker")

flushAndReset()
}
Expand All @@ -156,7 +179,9 @@ func main() {

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Authorization") != requiredAuthorization {
log.Println("invaldu authorization header, expected", requiredAuthorization, r.Header.Get("Authorization"))
logger.Warn("invalid authorization header",
slog.String("expected", requiredAuthorization),
slog.String("authorization", r.Header.Get("Authorization")))
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
Expand All @@ -172,7 +197,9 @@ func main() {

body, err := io.ReadAll(r.Body)
if err != nil {
logger.Error("failed to read request body", slog.String("error", err.Error()))
http.Error(w, "cannot read body", http.StatusInternalServerError)
return
}
rows := strings.Split(string(body), "\n")

Expand All @@ -187,14 +214,18 @@ func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

fmt.Println("listening on", PORT)
if err := http.ListenAndServe(fmt.Sprintf(":%s", PORT), nil); err != nil {
log.Fatalln("error starting server:", err)
}
srv := &http.Server{Addr: fmt.Sprintf(":%s", PORT)}
go func() {
logger.Info("listening", slog.String("port", PORT))
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("failed to start server", slog.String("error", err.Error()))
os.Exit(1)
}
}()

<-ctx.Done()
log.Println("shutting down")
close(buffer)
<-done

logger.Info("shutdown complete")
}
81 changes: 16 additions & 65 deletions go/api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,7 @@
}
},
"type": "object",
"required": [
"requestId",
"detail",
"status",
"title",
"type"
]
"required": ["requestId", "detail", "status", "title", "type"]
},
"NotFoundError": {
"$ref": "#/components/schemas/BaseError"
Expand Down Expand Up @@ -86,9 +80,7 @@
"type": "array"
}
},
"required": [
"errors"
]
"required": ["errors"]
}
]
},
Expand All @@ -112,10 +104,7 @@
}
},
"type": "object",
"required": [
"message",
"location"
]
"required": ["message", "location"]
},
"V2LivenessResponseBody": {
"additionalProperties": false,
Expand All @@ -126,9 +115,7 @@
"type": "string"
}
},
"required": [
"message"
],
"required": ["message"],
"type": "object"
},
"V2RatelimitSetOverrideRequestBody": {
Expand Down Expand Up @@ -164,11 +151,7 @@
"minimum": 0
}
},
"required": [
"identifier",
"limit",
"duration"
],
"required": ["identifier", "limit", "duration"],
"type": "object"
},
"V2RatelimitSetOverrideResponseBody": {
Expand All @@ -179,9 +162,7 @@
"type": "string"
}
},
"required": [
"overrideId"
],
"required": ["overrideId"],
"type": "object"
},
"V2RatelimitGetOverrideRequestBody": {
Expand All @@ -207,9 +188,7 @@
"maxLength": 255
}
},
"required": [
"identifier"
],
"required": ["identifier"],
"type": "object"
},
"V2RatelimitGetOverrideResponseBody": {
Expand Down Expand Up @@ -246,13 +225,7 @@
"minimum": 0
}
},
"required": [
"namespaceId",
"overrideId",
"duration",
"identifier",
"limit"
],
"required": ["namespaceId", "overrideId", "duration", "identifier", "limit"],
"type": "object"
},
"V2RatelimitLimitRequestBody": {
Expand Down Expand Up @@ -290,12 +263,7 @@
"minimum": 1
}
},
"required": [
"namespace",
"identifier",
"limit",
"duration"
],
"required": ["namespace", "identifier", "limit", "duration"],
"type": "object"
},
"V2RatelimitLimitResponseBody": {
Expand Down Expand Up @@ -325,12 +293,7 @@
"type": "string"
}
},
"required": [
"limit",
"remaining",
"reset",
"success"
],
"required": ["limit", "remaining", "reset", "success"],
"type": "object"
},
"V2RatelimitDeleteOverrideRequestBody": {
Expand All @@ -356,9 +319,7 @@
"maxLength": 255
}
},
"required": [
"identifier"
],
"required": ["identifier"],
"type": "object"
},
"V2RatelimitDeleteOverrideResponseBody": {
Expand Down Expand Up @@ -443,9 +404,7 @@
"description": "Error"
}
},
"tags": [
"ratelimit"
]
"tags": ["ratelimit"]
}
},
"/v2/ratelimit.setOverride": {
Expand Down Expand Up @@ -523,16 +482,12 @@
"description": "Error"
}
},
"tags": [
"ratelimit"
]
"tags": ["ratelimit"]
}
},
"/v2/ratelimit.getOverride": {
"post": {
"tags": [
"ratelimit"
],
"tags": ["ratelimit"],
"operationId": "v2.ratelimit.getOverride",
"requestBody": {
"content": {
Expand Down Expand Up @@ -610,9 +565,7 @@
},
"/v2/ratelimit.deleteOverride": {
"post": {
"tags": [
"ratelimit"
],
"tags": ["ratelimit"],
"operationId": "v2.ratelimit.deleteOverride",
"requestBody": {
"content": {
Expand Down Expand Up @@ -725,9 +678,7 @@
}
},
"summary": "Liveness check",
"tags": [
"liveness"
]
"tags": ["liveness"]
}
}
}
Expand Down
Loading