Skip to content

Commit

Permalink
Added https server
Browse files Browse the repository at this point in the history
  • Loading branch information
ItielOlenick committed Apr 29, 2024
1 parent 80005c6 commit d83cc4c
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ARCH ?= $(shell go env GOARCH)
DOCKER_USER ?= open-telemetry
IMG_PREFIX ?= ghcr.io/${DOCKER_USER}/opentelemetry-operator
IMG_REPO ?= opentelemetry-operator
IMG ?= ${IMG_PREFIX}/${IMG_REPO}:${VERSION}
IMG ?= manager:both # playground. real val --> ${IMG_PREFIX}/${IMG_REPO}:${VERSION}
BUNDLE_IMG ?= ${IMG_PREFIX}/${IMG_REPO}-bundle:${VERSION}

TARGETALLOCATOR_IMG_REPO ?= target-allocator
Expand Down Expand Up @@ -51,7 +51,7 @@ GOBIN=$(shell go env GOBIN)
endif

# by default, do not run the manager with webhooks enabled. This only affects local runs, not the build or in-cluster deployments.
ENABLE_WEBHOOKS ?= false
ENABLE_WEBHOOKS ?= true # Playground. real val --> false

# If we are running in CI, run go test in verbose mode
ifeq (,$(CI))
Expand Down
3 changes: 3 additions & 0 deletions cmd/otel-allocator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ COPY --from=certificates /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-ce
# Copy binary built on the host
COPY bin/targetallocator_${TARGETARCH} ./main

# playground
COPY bin/certs /certs

ENTRYPOINT ["./main"]
14 changes: 13 additions & 1 deletion cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {
setupLog.Error(err, "Unable to initialize allocation strategy")
os.Exit(1)
}
srv := server.NewServer(log, allocator, cfg.ListenAddr)
srv := server.NewServer(log, allocator, cfg.ListenAddr, server.WithTLS("/certs/ca.crt", "/certs/server.crt", "/certs/server.key", ":8443"))

discoveryCtx, discoveryCancel := context.WithCancel(ctx)
sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
Expand Down Expand Up @@ -178,6 +178,18 @@ func main() {
setupLog.Error(shutdownErr, "Error on server shutdown")
}
})
runGroup.Add(
func() error {
err := srv.StartTls()
setupLog.Info("TLS Server failed to start")
return err
},
func(_ error) {
setupLog.Info("Closing TLS server")
if shutdownErr := srv.ShutdownTls(ctx); shutdownErr != nil {
setupLog.Error(shutdownErr, "Error on TLS server shutdown")
}
})
runGroup.Add(
func() error {
for {
Expand Down
116 changes: 108 additions & 8 deletions cmd/otel-allocator/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package server

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net/http"
"net/http/pprof"
"net/url"
"os"
"strings"
"sync"
"time"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
promconfig "github.com/prometheus/prometheus/config"
"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -63,37 +67,87 @@ type Server struct {
logger logr.Logger
allocator allocation.Allocator
server *http.Server
tlsServer *http.Server
jsonMarshaller jsoniter.API

// Use RWMutex to protect scrapeConfigResponse, since it
// will be predominantly read and only written when config
// is applied.
mtx sync.RWMutex
scrapeConfigResponse []byte
tlsScrapeConfigResponse []byte
}

func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr string) *Server {
s := &Server{
logger: log,
allocator: allocator,
jsonMarshaller: jsonConfig,
type ServerOption func(*Server)

// ServerOption to create an additional server with mTLS configuration.
// Used for getting the scrape config with actual secret values.
func WithTLS(caFile, certFile, keyFile, tlsListenAddr string) ServerOption {
return func(s *Server) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
s.logger.Error(err, "failed to load certificates")
}

caCert, err := os.ReadFile(caFile)
if err != nil {
s.logger.Error(err, "failed to load CA certificate")
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caCertPool,
}

tlsRouter := gin.New()
s.setRouter(tlsRouter, true)

s.tlsServer = &http.Server{Addr: tlsListenAddr, Handler: tlsRouter, ReadHeaderTimeout: 90 * time.Second}
s.tlsServer.TLSConfig = tlsConfig
}
}

gin.SetMode(gin.ReleaseMode)
router := gin.New()
func (s *Server) setRouter(router *gin.Engine, tlsRouter bool) {
router.Use(gin.Recovery())
router.UseRawPath = true
router.UnescapePathValues = false
router.Use(s.PrometheusMiddleware)
router.GET("/scrape_configs", s.ScrapeConfigsHandler)

if tlsRouter {
router.GET("/scrape_configs", s.ScrapeConfigsTlsHandler)
} else {
router.GET("/scrape_configs", s.ScrapeConfigsHandler)
}

router.GET("/jobs", s.JobHandler)
router.GET("/jobs/:job_id/targets", s.TargetsHandler)
router.GET("/metrics", gin.WrapH(promhttp.Handler()))
router.GET("/livez", s.LivenessProbeHandler)
router.GET("/readyz", s.ReadinessProbeHandler)
registerPprof(router.Group("/debug/pprof/"))
}

func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr string, options ...ServerOption) *Server {
s := &Server{
logger: log,
allocator: allocator,
jsonMarshaller: jsonConfig,
}

gin.SetMode(gin.ReleaseMode)
router := gin.New()
s.setRouter(router, false)

s.server = &http.Server{Addr: listenAddr, Handler: router, ReadHeaderTimeout: 90 * time.Second}

for _, opt := range options {
opt(s)
}

return s
}

Expand All @@ -107,6 +161,16 @@ func (s *Server) Shutdown(ctx context.Context) error {
return s.server.Shutdown(ctx)
}

func (s *Server) StartTls() error {
s.logger.Info("Starting TLS server...")
return s.tlsServer.ListenAndServeTLS("", "")
}

func (s *Server) ShutdownTls(ctx context.Context) error {
s.logger.Info("Shutting down TLS server...")
return s.tlsServer.Shutdown(ctx)
}

// RemoveRegexFromRelabelAction is needed specifically for keepequal/dropequal actions because even though the user doesn't specify the
// regex field for these actions the unmarshalling implementations of prometheus adds back the default regex fields
// which in turn causes the receiver to error out since the unmarshaling of the json response doesn't expect anything in the regex fields
Expand Down Expand Up @@ -156,6 +220,8 @@ func RemoveRegexFromRelabelAction(jsonConfig []byte) ([]byte, error) {
// in to a JSON format for consumers to use.
func (s *Server) UpdateScrapeConfigResponse(configs map[string]*promconfig.ScrapeConfig) error {
var configBytes []byte

config.MarshalSecretValue = false
configBytes, err := yaml.Marshal(configs)
if err != nil {
return err
Expand All @@ -174,6 +240,26 @@ func (s *Server) UpdateScrapeConfigResponse(configs map[string]*promconfig.Scrap
s.mtx.Lock()
s.scrapeConfigResponse = jsonConfigNew
s.mtx.Unlock()

// Marshaling with actual secrets values for serving with mTLS
config.MarshalSecretValue = true
configBytes, err = yaml.Marshal(configs)
if err != nil {
return err
}
jsonConfig, err = yaml2.YAMLToJSON(configBytes)
if err != nil {
return err
}

jsonConfigNew, err = RemoveRegexFromRelabelAction(jsonConfig)
if err != nil {
return err
}

s.mtx.Lock()
s.tlsScrapeConfigResponse = jsonConfigNew
s.mtx.Unlock()
return nil
}

Expand All @@ -191,6 +277,20 @@ func (s *Server) ScrapeConfigsHandler(c *gin.Context) {
}
}

// ScrapeConfigsHandler returns the available scrape configuration discovered by the target allocator.
func (s *Server) ScrapeConfigsTlsHandler(c *gin.Context) {
s.mtx.RLock()
result := s.tlsScrapeConfigResponse
s.mtx.RUnlock()

// We don't use the jsonHandler method because we don't want our bytes to be re-encoded
c.Writer.Header().Set("Content-Type", "application/json")
_, err := c.Writer.Write(result)
if err != nil {
s.errorHandler(c.Writer, err)
}
}

func (s *Server) ReadinessProbeHandler(c *gin.Context) {
s.mtx.RLock()
result := s.scrapeConfigResponse
Expand Down
6 changes: 6 additions & 0 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
resources:
- manager.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: manager
newTag: both

0 comments on commit d83cc4c

Please sign in to comment.