Skip to content

Commit

Permalink
feat: cleanup, optimizations, panic handling
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Nov 26, 2024
1 parent b86d55b commit 7f78179
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 196 deletions.
8 changes: 4 additions & 4 deletions cmd/mirror/main.go → cmd/go-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"log"
"runtime"

"github.com/smrz2001/go-mirror/common/config"
"github.com/smrz2001/go-mirror/common/container"
"github.com/smrz2001/go-mirror/common/logging"
"github.com/smrz2001/go-mirror/server"
"github.com/3box/go-proxy/common/config"
"github.com/3box/go-proxy/common/container"
"github.com/3box/go-proxy/common/logging"
"github.com/3box/go-proxy/server"
)

func main() {
Expand Down
37 changes: 30 additions & 7 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,38 @@ package config

import (
"strings"
"time"

"github.com/spf13/viper"

"github.com/smrz2001/go-mirror/common/logging"
"github.com/3box/go-proxy/common/logging"
)

const (
defaultProxyListenPort = "8080"
defaultMetricsListenPort = "9464"
defaultDialTimeout = 30 * time.Second
defaultIdleTimeout = 90 * time.Second
defaultMirrorTimeout = 30 * time.Second
)

type Config struct {
Proxy ProxyConfig
Proxy ProxyConfig
Metrics MetricsConfig
}

type ProxyConfig struct {
TargetURL string
MirrorURL string
ListenAddr string
TLSEnabled bool
TargetURL string
MirrorURL string
ListenPort string
DialTimeout time.Duration
IdleTimeout time.Duration
MirrorTimeout time.Duration
}

type MetricsConfig struct {
Enabled bool
ListenPort string
}

func LoadConfig(logger logging.Logger) (*Config, error) {
Expand All @@ -26,9 +43,15 @@ func LoadConfig(logger logging.Logger) (*Config, error) {
// This was necessary to get viper to recognize the nested struct fields
viper.EnvKeyReplacer(strings.NewReplacer(".", "_")),
)
v.SetEnvPrefix("GO_MIRROR")
v.SetEnvPrefix("GO_PROXY")
v.AutomaticEnv()

v.SetDefault("Proxy.ListenPort", defaultProxyListenPort)
v.SetDefault("Proxy.DialTimeout", defaultDialTimeout)
v.SetDefault("Proxy.IdleTimeout", defaultIdleTimeout)
v.SetDefault("Proxy.MirrorTimeout", defaultMirrorTimeout)
v.SetDefault("Metrics.ListenPort", defaultMetricsListenPort)

// Unmarshal environment variables into the config struct
var cfg Config
if err := v.Unmarshal(&cfg); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions common/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (

"go.uber.org/dig"

"github.com/smrz2001/go-mirror/common/config"
"github.com/smrz2001/go-mirror/common/logging"
"github.com/smrz2001/go-mirror/common/metric"
"github.com/smrz2001/go-mirror/controllers"
"github.com/smrz2001/go-mirror/server"
"github.com/3box/go-proxy/common/config"
"github.com/3box/go-proxy/common/logging"
"github.com/3box/go-proxy/common/metric"
"github.com/3box/go-proxy/controllers"
"github.com/3box/go-proxy/server"
)

func BuildContainer(ctx context.Context) (*dig.Container, error) {
Expand Down
5 changes: 3 additions & 2 deletions common/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type MetricService interface {
}

const (
MetricProxyRequest = "proxy_request"
MetricMirrorRequest = "mirror_request"
MetricProxyRequest = "proxy_request"
MetricMirrorRequest = "mirror_request"
MetricPanicRecovered = "panic_recovered"
)
2 changes: 1 addition & 1 deletion common/metric/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/gin-gonic/gin"

"github.com/smrz2001/go-mirror/common/logging"
"github.com/3box/go-proxy/common/logging"
)

const serviceName = "go-mirror"
Expand Down
224 changes: 99 additions & 125 deletions controllers/proxy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

"github.com/gin-gonic/gin"

"github.com/smrz2001/go-mirror/common/config"
"github.com/smrz2001/go-mirror/common/logging"
"github.com/smrz2001/go-mirror/common/metric"
"github.com/3box/go-proxy/common/config"
"github.com/3box/go-proxy/common/logging"
"github.com/3box/go-proxy/common/metric"
)

type ProxyController interface {
Expand All @@ -36,6 +36,23 @@ type proxyController struct {
transport *http.Transport
}

type requestType string

const (
proxyRequest requestType = "proxy"
mirrorRequest = "mirror"
)

// Create a struct to hold request context
type requestContext struct {
reqType requestType
ginContext *gin.Context
request *http.Request
bodyBytes []byte
startTime time.Time
targetURL *url.URL
}

func NewProxyController(
ctx context.Context,
cfg *config.Config,
Expand All @@ -57,15 +74,10 @@ func NewProxyController(
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
Timeout: cfg.Proxy.DialTimeout,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ForceAttemptHTTP2: true,
IdleConnTimeout: cfg.Proxy.IdleTimeout,
}

return &proxyController{
Expand All @@ -87,149 +99,111 @@ func (_this proxyController) proxyRequest(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read request"})
return
}
// Ignore error since we are closing the body anyway
_ = c.Request.Body.Close()

// Close the original body
c.Request.Body.Close()
// Handle proxy request
_this.handleRequest(requestContext{
reqType: proxyRequest,
ginContext: c,
request: c.Request,
bodyBytes: bodyBytes,
startTime: time.Now(),
targetURL: _this.target,
})

start := time.Now()
// Handle mirror request if configured
if _this.mirror != nil {
go func() {
ctx, cancel := context.WithTimeout(_this.ctx, _this.cfg.Proxy.MirrorTimeout)
defer cancel()

_this.handleRequest(requestContext{
reqType: mirrorRequest,
request: c.Request.Clone(ctx),
bodyBytes: bodyBytes,
startTime: time.Now(),
targetURL: _this.mirror,
})
}()
}
}

// Create the proxy request
proxyReq := c.Request.Clone(c.Request.Context())
proxyReq.URL.Scheme = _this.target.Scheme
proxyReq.URL.Host = _this.target.Host
proxyReq.Host = _this.target.Host
proxyReq.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
func (_this proxyController) handleRequest(reqCtx requestContext) {
// Prepare the request
req := reqCtx.request.Clone(reqCtx.request.Context())
req.URL.Scheme = reqCtx.targetURL.Scheme
req.URL.Host = reqCtx.targetURL.Host
req.Host = reqCtx.targetURL.Host
req.Body = io.NopCloser(bytes.NewBuffer(reqCtx.bodyBytes))

// Log outbound request
_this.logger.Debugw("outbound request",
"method", proxyReq.Method,
"url", proxyReq.URL.String(),
"headers", proxyReq.Header,
_this.logger.Debugw(fmt.Sprintf("%s request", reqCtx.reqType),
"method", req.Method,
"url", req.URL.String(),
"headers", req.Header,
)

// Record request with normalized path
err = _this.metrics.RecordRequest(_this.ctx, metric.MetricProxyRequest, proxyReq.Method, proxyReq.URL.Path)
if err != nil {
_this.logger.Errorw("failed to record proxy request metric", "error", err)
// Record metrics
metricName := metric.MetricProxyRequest
if reqCtx.reqType == mirrorRequest {
metricName = metric.MetricMirrorRequest
}

// Make the proxy request
resp, err := _this.transport.RoundTrip(proxyReq)
if err := _this.metrics.RecordRequest(_this.ctx, metricName, req.Method, req.URL.Path); err != nil {
_this.logger.Errorw("failed to record request metric", "error", err)
}

// Make the request
resp, err := _this.transport.RoundTrip(req)
if err != nil {
_this.logger.Errorw("proxy error", "error", err)
c.JSON(http.StatusBadGateway, gin.H{"error": "proxy error"})
_this.logger.Errorw(fmt.Sprintf("%s error", reqCtx.reqType), "error", err)
if reqCtx.reqType == proxyRequest {
reqCtx.ginContext.JSON(http.StatusBadGateway, gin.H{"error": "proxy error"})
}
return
}
defer resp.Body.Close()
// Ignore error since we are closing the body anyway
defer func() { _ = resp.Body.Close() }()

// Read the response body
// Process response
respBody, err := io.ReadAll(resp.Body)
if err != nil {
_this.logger.Errorw("failed to read proxy response body", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read response"})
_this.logger.Errorw(fmt.Sprintf("failed to read %s response", reqCtx.reqType), "error", err)
if reqCtx.reqType == proxyRequest {
reqCtx.ginContext.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read response"})
}
return
}

// Log response details
_this.logger.Debugw("received proxy response",
// Log response
_this.logger.Debugw(fmt.Sprintf("%s response", reqCtx.reqType),
"status", resp.StatusCode,
"content_length", len(respBody),
"content_type", resp.Header.Get("Content-Type"),
"content length", len(respBody),
"headers", resp.Header,
"latency", time.Since(reqCtx.startTime),
)

// Record metrics
attrs := []attribute.KeyValue{
attribute.String("method", proxyReq.Method),
attribute.String("path", proxyReq.URL.Path),
attribute.String("method", req.Method),
attribute.String("path", req.URL.Path),
attribute.Int("status_code", resp.StatusCode),
// Group status codes into categories
attribute.String("status_class", fmt.Sprintf("%dxx", resp.StatusCode/100)),
}

err = _this.metrics.RecordDuration(_this.ctx, metric.MetricProxyRequest, time.Since(start), attrs...)
if err != nil {
_this.logger.Errorw("failed to record proxy request duration metric", "error", err)
}

// Send mirror request if configured
if _this.mirror != nil {
// Create a new context for the mirror request
mirrorReq := c.Request.Clone(_this.ctx)
go _this.mirrorRequest(mirrorReq, bodyBytes)
if err := _this.metrics.RecordDuration(_this.ctx, metricName, time.Since(reqCtx.startTime), attrs...); err != nil {
_this.logger.Errorw("failed to record duration metric", "error", err)
}

// Copy response headers
for k, vv := range resp.Header {
for _, v := range vv {
c.Header(k, v)
// Write response for proxy requests only
if reqCtx.reqType == proxyRequest {
for k, vv := range resp.Header {
for _, v := range vv {
reqCtx.ginContext.Header(k, v)
}
}
}

// Write the response
c.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody)
}

func (_this proxyController) mirrorRequest(mirrorReq *http.Request, bodyBytes []byte) {
// Create a context with timeout for the mirror request
ctx, cancel := context.WithTimeout(_this.ctx, 30*time.Second)
defer cancel()

// Use the new context
mirrorReq = mirrorReq.WithContext(ctx)

start := time.Now()

// Create the mirror request
mirrorReq.URL.Scheme = _this.mirror.Scheme
mirrorReq.URL.Host = _this.mirror.Host
mirrorReq.Host = _this.mirror.Host
mirrorReq.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

// Log mirror request
_this.logger.Debugw("mirror request",
"method", mirrorReq.Method,
"url", mirrorReq.URL.String(),
"headers", mirrorReq.Header,
)

// Record request with normalized path
err := _this.metrics.RecordRequest(_this.ctx, metric.MetricMirrorRequest, mirrorReq.Method, mirrorReq.URL.Path)
if err != nil {
_this.logger.Errorw("failed to record mirror request metric", "error", err)
}

// Make the mirror request
resp, err := _this.transport.RoundTrip(mirrorReq)
if err != nil {
_this.logger.Errorw("mirror error", "error", err)
return
}
defer resp.Body.Close()

// Read and log the mirror response
body, err := io.ReadAll(resp.Body)
if err != nil {
_this.logger.Errorw("failed to read mirror response", "error", err)
return
}

// Log mirror response
_this.logger.Debugw("mirror response",
"status", resp.StatusCode,
"content_length", len(body),
"content_type", resp.Header.Get("Content-Type"),
)

attrs := []attribute.KeyValue{
attribute.String("method", mirrorReq.Method),
attribute.String("path", mirrorReq.URL.Path),
attribute.Int("status_code", resp.StatusCode),
// Group status codes into categories
attribute.String("status_class", fmt.Sprintf("%dxx", resp.StatusCode/100)),
}

err = _this.metrics.RecordDuration(_this.ctx, metric.MetricMirrorRequest, time.Since(start), attrs...)
if err != nil {
_this.logger.Errorw("failed to record proxy request duration metric", "error", err)
reqCtx.ginContext.Data(resp.StatusCode, resp.Header.Get("Content-Type"), respBody)
}
}

Expand Down
Loading

0 comments on commit 7f78179

Please sign in to comment.