Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions apps/chproxy/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
FROM golang:1.23-alpine AS builder

FROM golang:1.24-alpine AS builder

WORKDIR /go/src/github.com/unkeyed/unkey/apps/chproxy
COPY go.mod ./
# COPY go.sum ./
# RUN go mod download

COPY . .
RUN go build -o bin/chproxy ./main.go

RUN go build -o bin/chproxy

FROM golang:1.23-alpine
FROM golang:1.24-alpine
RUN apk add --update curl

WORKDIR /usr/local/bin
COPY --from=builder /go/src/github.com/unkeyed/unkey/apps/chproxy/bin/chproxy .

CMD [ "/usr/local/bin/chproxy"]
CMD ["/usr/local/bin/chproxy"]
113 changes: 113 additions & 0 deletions apps/chproxy/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

type Batch struct {
Rows []string
Params url.Values
}

func persist(ctx context.Context, batch *Batch, config *Config) error {
ctx, span := telemetry.Tracer.Start(ctx, "persist_batch")
defer span.End()

if len(batch.Rows) == 0 {
return nil
}

telemetry.Metrics.BatchCounter.Add(ctx, 1)
telemetry.Metrics.RowCounter.Add(ctx, int64(len(batch.Rows)))

span.SetAttributes(
attribute.Int("rows", len(batch.Rows)),
attribute.String("query", batch.Params.Get("query")),
)

u, err := url.Parse(config.ClickhouseURL)
if err != nil {
telemetry.Metrics.ErrorCounter.Add(ctx, 1)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

u.RawQuery = batch.Params.Encode()

req, err := http.NewRequestWithContext(ctx, "POST", u.String(), strings.NewReader(strings.Join(batch.Rows, "\n")))
if err != nil {
telemetry.Metrics.ErrorCounter.Add(ctx, 1)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

req.Header.Add("Content-Type", "text/plain")

username := u.User.Username()

password, ok := u.User.Password()
if !ok {
err := fmt.Errorf("password not set")
telemetry.Metrics.ErrorCounter.Add(ctx, 1)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

req.SetBasicAuth(username, password)

res, err := httpClient.Do(req)
if err != nil {
telemetry.Metrics.ErrorCounter.Add(ctx, 1)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
telemetry.Metrics.ErrorCounter.Add(ctx, 1)
body, err := io.ReadAll(res.Body)
if err != nil {
config.Logger.Error("error reading body",
"error", err)

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

errorMsg := string(body)

config.Logger.Error("unable to persist batch",
"response", errorMsg,
"status_code", res.StatusCode,
"query", batch.Params.Get("query"))

span.SetStatus(codes.Error, errorMsg)
span.RecordError(fmt.Errorf("HTTP %d: %s", res.StatusCode, errorMsg))

return fmt.Errorf("http error: %v", errorMsg)
}

config.Logger.Info("rows persisted",
"count", len(batch.Rows),
"query", batch.Params.Get("query"))
span.SetStatus(codes.Ok, "")
span.SetAttributes(
attribute.String("result", "successfully sent to Clickhouse"),
attribute.Int("rows_processed", len(batch.Rows)),
)

return nil
}
127 changes: 127 additions & 0 deletions apps/chproxy/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package main

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// startBufferProcessor manages processing of data sent to Clickhouse.
// Returns a channel that signals when all pending batches have been processed during shutdown
func startBufferProcessor(
ctx context.Context,
buffer <-chan *Batch,
config *Config,
telemetryConfig *TelemetryConfig,
) <-chan bool {
done := make(chan bool)

go func() {
buffered := 0
batchesByParams := make(map[string]*Batch)
ticker := time.NewTicker(config.FlushInterval)
defer ticker.Stop()

tickerCount := 0

flushAndReset := func(ctx context.Context, reason string) {
ctx, span := telemetryConfig.Tracer.Start(ctx, "flush_batches")
defer span.End()

startTime := time.Now()

// Record metrics
telemetryConfig.Metrics.FlushCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("reason", reason)))

span.SetAttributes(
attribute.Int("batch_count", len(batchesByParams)),
attribute.Int("buffered_rows", buffered),
attribute.String("reason", reason),
)

// We'll sample logs for ticker-based flushes
shouldLog := true
if reason == "ticker" {
tickerCount++
// Only log every LOG_TICKER_SAMPLE_RATE times
shouldLog = (tickerCount%LOG_TICKER_SAMPLE_RATE == 0)
}

// Only log if we should based on sampling
if shouldLog {
config.Logger.Info("flushing batches",
"reason", reason,
"batch_count", len(batchesByParams),
"buffered_rows", buffered)
}

for _, batch := range batchesByParams {
err := persist(ctx, batch, config)
if err != nil {
// Always log errors regardless of sampling
config.Logger.Error("error flushing batch",
"error", err.Error(),
"query", batch.Params.Get("query"))
}
}

duration := time.Since(startTime).Seconds()
telemetryConfig.Metrics.FlushDuration.Record(ctx, duration,
metric.WithAttributes(attribute.String("reason", reason)))

span.SetAttributes(attribute.Float64("duration_seconds", duration))

buffered = 0
SetBufferSize(0)
batchesByParams = make(map[string]*Batch)
}

for {
select {
case <-ctx.Done():
config.Logger.Info("context cancelled, flushing remaining batches",
"buffered_rows", buffered,
"elapsed_time", config.FlushInterval.String())
flushAndReset(ctx, "shutdown")
done <- true
return
case b, ok := <-buffer:
if !ok {
config.Logger.Info("buffer channel closed, flushing remaining batches")
flushAndReset(ctx, "shutdown")
done <- true
return
}

params := b.Params.Encode()
batch, ok := batchesByParams[params]
if !ok {
batchesByParams[params] = b
config.Logger.Debug("new batch type received",
"query", b.Params.Get("query"))
} else {
batch.Rows = append(batch.Rows, b.Rows...)
}

buffered += len(b.Rows)
SetBufferSize(int64(buffered))

if buffered >= config.MaxBatchSize {
config.Logger.Info("flushing due to max batch size",
"buffered_rows", buffered,
"max_size", config.MaxBatchSize)
flushAndReset(ctx, "max_size")
}
case <-ticker.C:
config.Logger.Info("flushing on ticker",
"buffered_rows", buffered,
"elapsed_time", config.FlushInterval.String())
flushAndReset(ctx, "ticker")
}
}
}()

return done
}
10 changes: 8 additions & 2 deletions apps/chproxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

type Config struct {
LogDebug bool
Logger *slog.Logger
BasicAuth string
ClickhouseURL string
Expand All @@ -22,12 +23,13 @@ type Config struct {
func LoadConfig() (*Config, error) {
// New config with defaults
config := &Config{
FlushInterval: time.Second * 3,
LogDebug: false,
FlushInterval: time.Second * 5,
ListenerPort: "7123",
MaxBatchSize: 10000,
MaxBufferSize: 50000,
ServiceName: "chproxy",
ServiceVersion: "1.1.0",
ServiceVersion: "1.2.0",
}

config.ClickhouseURL = os.Getenv("CLICKHOUSE_URL")
Expand All @@ -40,6 +42,10 @@ func LoadConfig() (*Config, error) {
return nil, fmt.Errorf("BASIC_AUTH must be defined")
}

if debug := os.Getenv("OTEL_EXPORTER_LOG_DEBUG"); debug == "true" {
config.LogDebug = true
}

if port := os.Getenv("PORT"); port != "" {
config.ListenerPort = port
}
Expand Down
1 change: 1 addition & 0 deletions apps/chproxy/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.24.0

require (
go.opentelemetry.io/contrib/bridges/otelslog v0.10.0
go.opentelemetry.io/contrib/processors/minsev v0.8.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.35.0
Expand Down
2 changes: 2 additions & 0 deletions apps/chproxy/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/contrib/bridges/otelslog v0.10.0 h1:lRKWBp9nWoBe1HKXzc3ovkro7YZSb72X2+3zYNxfXiU=
go.opentelemetry.io/contrib/bridges/otelslog v0.10.0/go.mod h1:D+iyUv/Wxbw5LUDO5oh7x744ypftIryiWjoj42I6EKs=
go.opentelemetry.io/contrib/processors/minsev v0.8.0 h1:/i0gaV0Z174Twy1/NfgQoE+oQvFVbQItNl8UMwe62Jc=
go.opentelemetry.io/contrib/processors/minsev v0.8.0/go.mod h1:5siKBWhXmdM2gNh8KHZ4b97bdS4MYhqPJEEu6JtHciw=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 h1:C/Wi2F8wEmbxJ9Kuzw/nhP+Z9XaHYMkyDmXy6yR2cjw=
Expand Down
Loading
Loading