diff --git a/.semaphore/semaphore-scheduled-builds.yml b/.semaphore/semaphore-scheduled-builds.yml index 5d74ea528ac..5a0c6c97362 100644 --- a/.semaphore/semaphore-scheduled-builds.yml +++ b/.semaphore/semaphore-scheduled-builds.yml @@ -561,6 +561,30 @@ blocks: - s390x commands: - ../.semaphore/run-and-monitor image-$ARCH.log make build ARCH=$ARCH +- name: guardian + run: + when: "true or change_in(['/*', '/guardian/'], {exclude: ['/**/.gitignore', '/**/README.md', '/**/LICENSE']})" + execution_time_limit: + minutes: 30 + dependencies: + - Prerequisites + task: + prologue: + commands: + - cd guardian + jobs: + - name: make ci + commands: + - ../.semaphore/run-and-monitor make-ci.log make ci + - name: Build binary + matrix: + - env_var: ARCH + values: + - arm64 + - ppc64le + - s390x + commands: + - ../.semaphore/run-and-monitor image-$ARCH.log make build ARCH=$ARCH - name: kube-controllers run: when: "true or change_in(['/*', '/api/', '/libcalico-go/', '/kube-controllers/', '/hack/test/certs/'], {exclude: ['/**/.gitignore', '/**/README.md', '/**/LICENSE']})" diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index c82e96dca00..4db42744e2b 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -561,6 +561,30 @@ blocks: - s390x commands: - ../.semaphore/run-and-monitor image-$ARCH.log make build ARCH=$ARCH +- name: guardian + run: + when: "false or change_in(['/*', '/guardian/'], {exclude: ['/**/.gitignore', '/**/README.md', '/**/LICENSE']})" + execution_time_limit: + minutes: 30 + dependencies: + - Prerequisites + task: + prologue: + commands: + - cd guardian + jobs: + - name: make ci + commands: + - ../.semaphore/run-and-monitor make-ci.log make ci + - name: Build binary + matrix: + - env_var: ARCH + values: + - arm64 + - ppc64le + - s390x + commands: + - ../.semaphore/run-and-monitor image-$ARCH.log make build ARCH=$ARCH - name: kube-controllers run: when: "false or change_in(['/*', '/api/', '/libcalico-go/', '/kube-controllers/', '/hack/test/certs/'], {exclude: ['/**/.gitignore', '/**/README.md', '/**/LICENSE']})" diff --git a/.semaphore/semaphore.yml.d/blocks/20-guardian.yml b/.semaphore/semaphore.yml.d/blocks/20-guardian.yml new file mode 100644 index 00000000000..ac17a7ca7b4 --- /dev/null +++ b/.semaphore/semaphore.yml.d/blocks/20-guardian.yml @@ -0,0 +1,24 @@ +- name: guardian + run: + when: "${FORCE_RUN} or change_in(['/*', '/guardian/'], {exclude: ['/**/.gitignore', '/**/README.md', '/**/LICENSE']})" + execution_time_limit: + minutes: 30 + dependencies: + - Prerequisites + task: + prologue: + commands: + - cd guardian + jobs: + - name: make ci + commands: + - ../.semaphore/run-and-monitor make-ci.log make ci + - name: Build binary + matrix: + - env_var: ARCH + values: + - arm64 + - ppc64le + - s390x + commands: + - ../.semaphore/run-and-monitor image-$ARCH.log make build ARCH=$ARCH diff --git a/go.mod b/go.mod index c0378bdf0fe..bf980535ef1 100644 --- a/go.mod +++ b/go.mod @@ -117,7 +117,10 @@ require ( require ( github.com/gogo/googleapis v1.4.1 + github.com/hashicorp/yamux v0.1.2 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/net v0.33.0 + golang.org/x/oauth2 v0.24.0 ) require ( @@ -318,8 +321,6 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/net v0.33.0 // indirect - golang.org/x/oauth2 v0.24.0 // indirect golang.org/x/term v0.27.0 // indirect golang.org/x/tools v0.28.0 // indirect golang.zx2c4.com/wireguard v0.0.20200121 // indirect diff --git a/go.sum b/go.sum index 9c3b979da3a..aedfa817548 100644 --- a/go.sum +++ b/go.sum @@ -377,6 +377,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= +github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns= github.com/homeport/dyff v1.6.0 h1:AN+ikld0Fy+qx34YE7655b/bpWuxS6cL9k852pE2GUc= github.com/homeport/dyff v1.6.0/go.mod h1:FlAOFYzeKvxmU5nTrnG+qrlJVWpsFew7pt8L99p5q8k= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/guardian/.mockery.yaml b/guardian/.mockery.yaml new file mode 100644 index 00000000000..9872f94f2aa --- /dev/null +++ b/guardian/.mockery.yaml @@ -0,0 +1,17 @@ +with-expecter: False +inpackage: False +dir: "{{.InterfaceDir}}/mocks" +mockname: "{{.InterfaceName}}" +outpkg: "mocks" +filename: "{{.InterfaceName}}.go" +packages: + github.com/projectcalico/calico/guardian/pkg/tunnel: + interfaces: + SessionDialer: + Session: + net: + config: + outpkg: "{{.PackageName}}" + dir: "pkg/thirdpartymocks/{{.PackagePath}}" + interfaces: + Conn: diff --git a/guardian/Makefile b/guardian/Makefile new file mode 100644 index 00000000000..2844f47a34a --- /dev/null +++ b/guardian/Makefile @@ -0,0 +1,111 @@ +include ../metadata.mk + +PACKAGE_NAME ?= github.com/projectcalico/calico/guardian + +############################################# +# Env vars related to packaging and releasing +############################################# +COMPONENTS ?=guardian +GUARDIAN_IMAGE ?=guardian +BUILD_IMAGES ?=$(GUARDIAN_IMAGE) + +############################################################################## +# Include ../lib.Makefile before anything else +# Additions to EXTRA_DOCKER_ARGS need to happen before the include since +# that variable is evaluated when we declare DOCKER_RUN and siblings. +############################################################################## +include ../lib.Makefile + +########################################################################################## +# Define some constants +########################################################################################## +BRANCH_NAME ?= $(PIN_BRANCH) + +# Some env vars that devs might find useful: +# TEST_DIRS= : only run the unit tests from the specified dirs +# UNIT_TESTS= : only run the unit tests matching the specified regexp + +BINDIR ?= bin +SRC_FILES = $(shell find . -name '*.go') \ + $(shell find ../api/pkg -name '*.go') \ + $(shell find ../libcalico-go/lib/logutils -name '*.go') +ifdef UNIT_TESTS +UNIT_TEST_FLAGS=-run $(UNIT_TESTS) -v +endif + +############################################# +# Env vars related to building +############################################# + +# Flags for building the binaries. +# +# We use -X to insert the version information into the placeholder variables +# in the version package. +LDFLAGS = -X $(PACKAGE_NAME)/pkg/version.BuildVersion=$(GIT_VERSION) \ + -X $(PACKAGE_NAME)/pkg/version.BuildDate=$(DATE) \ + -X $(PACKAGE_NAME)/pkg/version.GitDescription=$(GIT_DESCRIPTION) \ + -X $(PACKAGE_NAME)/pkg/version.GitRevision=$(GIT_COMMIT) \ + +########################################################################################## +# BUILD +########################################################################################## +build: $(BINDIR)/guardian-$(ARCH) + +.PHONY: $(BINDIR)/guardian-$(ARCH) +$(BINDIR)/guardian-$(ARCH): $(SRC_FILES) +ifeq ($(FIPS),true) + $(call build_cgo_boring_binary, ./cmd/guardian/main.go, $@) +else + $(call build_binary, ./cmd/guardian/main.go, $@) +endif + +gen-mocks: + $(DOCKER_RUN) $(CALICO_BUILD) sh -c 'mockery' + +gen-files: gen-mocks + +############################################# +# Docker Image +############################################# +GUARDIAN_CONTAINER_CREATED=.guardian.created-$(ARCH) + +# by default, build the image for the target architecture +.PHONY: image-all +image-all: $(addprefix sub-image-,$(VALIDARCHES)) +sub-image-%: + $(MAKE) image ARCH=$* + +.PHONY: image +image: $(BUILD_IMAGES) + +$(GUARDIAN_IMAGE): $(GUARDIAN_CONTAINER_CREATED) +$(GUARDIAN_CONTAINER_CREATED): docker-image/guardian/Dockerfile $(BINDIR)/guardian-$(ARCH) + $(DOCKER_BUILD) -t $(GUARDIAN_IMAGE):latest-$(ARCH) -f docker-image/guardian/Dockerfile . + $(MAKE) retag-build-images-with-registries VALIDARCHES=$(ARCH) IMAGETAG=latest BUILD_IMAGES=$(GUARDIAN_IMAGE) + touch $@ + +############################################# +# Run unit level tests +############################################# +.PHONY: ut +## Run only Unit Tests. +ut: + $(DOCKER_GO_BUILD) go test ./... -cover -count 1 + +########################################################################################## +# CI/CD +########################################################################################## +.PHONY: ci cd + +############################################# +# Run CI cycle - build, test, etc. +############################################# +## Run all CI steps for build and test, likely other targets. +ci: static-checks ut + +############################################# +# Deploy images to registry +############################################# +## Run all CD steps, normally pushing images out to registries. +cd: image-all cd-common + diff --git a/guardian/cmd/guardian/main.go b/guardian/cmd/guardian/main.go new file mode 100644 index 00000000000..b9c2bd8851d --- /dev/null +++ b/guardian/cmd/guardian/main.go @@ -0,0 +1,47 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "os" + + "github.com/sirupsen/logrus" + + "github.com/projectcalico/calico/guardian/pkg/config" + "github.com/projectcalico/calico/guardian/pkg/daemon" + "github.com/projectcalico/calico/guardian/pkg/version" +) + +var ( + versionFlag = flag.Bool("version", false, "Print version information") +) + +func main() { + flag.Parse() + + // For --version use case + if *versionFlag { + version.Version() + os.Exit(0) + } + + cfg, err := config.NewConfig() + if err != nil { + logrus.Fatal(err) + } + + daemon.Run(cfg, cfg.Targets()) +} diff --git a/guardian/docker-image/guardian/Dockerfile b/guardian/docker-image/guardian/Dockerfile new file mode 100644 index 00000000000..dbb8a8cb140 --- /dev/null +++ b/guardian/docker-image/guardian/Dockerfile @@ -0,0 +1,15 @@ +# Copyright (c) 2023 Tigera, Inc. All rights reserved. + +FROM scratch AS source + +ARG TARGETARCH + +COPY bin/guardian-${TARGETARCH} /usr/bin/guardian + +FROM calico/base + +COPY --from=source / / + +USER 10001:10001 + +ENTRYPOINT ["/usr/bin/guardian"] diff --git a/guardian/pkg/chanutil/chan.go b/guardian/pkg/chanutil/chan.go new file mode 100644 index 00000000000..6b1c26b44a4 --- /dev/null +++ b/guardian/pkg/chanutil/chan.go @@ -0,0 +1,132 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chanutil + +import "context" + +// TODO maybe this shouldn't be under a "chan" package, but some sort of "service" package? The channel isn't actually exposed... +type Service[Req any, Resp any] interface { + Send(ctx context.Context, req Req) (Resp, error) + Listen() <-chan Request[Req, Resp] + Close() +} + +type service[Req any, Resp any] struct { + ch chan Request[Req, Resp] +} + +func NewService[Req any, Resp any](bufferSize int) Service[Req, Resp] { + return &service[Req, Resp]{ch: make(chan Request[Req, Resp], bufferSize)} +} + +func (srv *service[Req, Resp]) Send(ctx context.Context, req Req) (Resp, error) { + rspChan := make(chan ResponseType[Resp]) + + select { + case <-ctx.Done(): + case srv.ch <- Request[Req, Resp]{req: req, rspChan: rspChan}: + } + + // TODO need to ensure some other kind of timeout... maybe?? + var rsp ResponseType[Resp] + select { + case rsp = <-rspChan: + case <-ctx.Done(): + return rsp.resp, ctx.Err() + } + return rsp.resp, rsp.err +} + +func (srv *service[Req, Resp]) Listen() <-chan Request[Req, Resp] { + return srv.ch +} + +func (srv *service[Req, Resp]) Close() { + close(srv.ch) +} + +type ResponseType[Resp any] struct { + resp Resp + err error +} + +type Request[Req any, Resp any] struct { + req Req + rspChan chan ResponseType[Resp] +} + +func (c Request[Req, Resp]) Get() Req { + return c.req +} + +func (c Request[Req, Resp]) Return(resp Resp) { + c.rspChan <- ResponseType[Resp]{resp: resp} +} + +func (c Request[Req, Resp]) Close() { + close(c.rspChan) +} + +func (c Request[Req, Resp]) ReturnError(err error) { + c.rspChan <- ResponseType[Resp]{err: err} +} + +type RequestsHandler[Req any, Resp any] interface { + Handle() error + ReturnError(error) + Close() + Add(Request[Req, Resp]) +} + +type reqsHandler[Req any, Resp any] struct { + requests []Request[Req, Resp] + handleFunc func(Req) (Resp, error) +} + +func (h *reqsHandler[Req, Resp]) ReturnError(err error) { + for _, req := range h.requests { + req.ReturnError(err) + req.Close() + } +} + +func (h *reqsHandler[Req, Resp]) Close() { + for _, req := range h.requests { + req.Close() + } +} +func (h *reqsHandler[Req, Resp]) Handle() error { + for i, req := range h.requests { + rsp, err := h.handleFunc(req.Get()) + if err != nil { + h.requests = h.requests[i:] + return err + } + + req.Return(rsp) + req.Close() + } + + h.requests = nil + return nil +} + +func (h *reqsHandler[Req, Resp]) Add(req Request[Req, Resp]) { + h.requests = append(h.requests, req) +} + +func NewRequestsHandler[Req any, Resp any](f func(Req) (Resp, error)) RequestsHandler[Req, Resp] { + return &reqsHandler[Req, Resp]{handleFunc: f} +} diff --git a/guardian/pkg/config/config.go b/guardian/pkg/config/config.go new file mode 100644 index 00000000000..7feb47fe8b3 --- /dev/null +++ b/guardian/pkg/config/config.go @@ -0,0 +1,178 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "net" + "net/url" + "os" + "strings" + "time" + + "github.com/kelseyhightower/envconfig" + log "github.com/sirupsen/logrus" + "golang.org/x/net/http/httpproxy" + + "github.com/projectcalico/calico/guardian/pkg/cryptoutils" + "github.com/projectcalico/calico/guardian/pkg/server" + "github.com/projectcalico/calico/libcalico-go/lib/logutils" +) + +const ( + defaultTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" + defaultCABundlePath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" +) + +const ( + // EnvConfigPrefix represents the prefix used to load ENV variables required for startup + EnvConfigPrefix = "GUARDIAN" +) + +// Config is a configuration used for Guardian +type Config struct { + LogLevel string `default:"INFO"` + CertPath string `default:"/certs" split_words:"true" json:"-"` + VoltronCAType string `default:"Tigera" split_words:"true"` + VoltronURL string `required:"true" split_words:"true"` + + KeepAliveEnable bool `default:"true" split_words:"true"` + KeepAliveInterval int `default:"100" split_words:"true"` + PProf bool `default:"false"` + + K8sEndpoint string `default:"https://kubernetes.default" split_words:"true"` + + TunnelDialRetryAttempts int `default:"20" split_words:"true"` + TunnelDialRetryInterval time.Duration `default:"5s" split_words:"true"` + TunnelDialTimeout time.Duration `default:"60s" split_words:"true"` + + TunnelDialRecreateOnTunnelClose bool `default:"true" split_words:"true"` + ConnectionRetryAttempts int `default:"25" split_words:"true"` + ConnectionRetryInterval time.Duration `default:"5s" split_words:"true"` + + Listen bool `default:"true"` + ListenHost string `default:"" split_words:"true"` + ListenPort string `default:"8080" split_words:"true"` +} + +func NewConfig() (*Config, error) { + cfg := &Config{} + if err := envconfig.Process(EnvConfigPrefix, cfg); err != nil { + return nil, err + } + + cfg.ConfigureLogging() + + return cfg, nil +} + +func (cfg *Config) Targets() []server.Target { + return []server.Target{ + server.MustCreateTarget("/api/", cfg.K8sEndpoint+":6443", + server.WithToken(defaultTokenPath), + server.WithCAPem(defaultCABundlePath)), + server.MustCreateTarget("/apis/", cfg.K8sEndpoint+":6443", + server.WithToken(defaultTokenPath), + server.WithCAPem(defaultCABundlePath)), + } +} + +func (cfg *Config) String() string { + data, err := json.Marshal(cfg) + if err != nil { + return "{}" + } + return string(data) +} + +func (cfg *Config) ConfigureLogging() { + logutils.ConfigureFormatter("guardian") + log.SetOutput(os.Stdout) + + // Override with desired log level + level, err := log.ParseLevel(cfg.LogLevel) + if err != nil { + log.Error("Invalid logging level passed in. Will use default level set to WARN") + // Setting default to WARN + level = log.WarnLevel + } + + log.SetLevel(level) +} + +func (cfg *Config) Cert() (string, *x509.CertPool, error) { + if strings.ToLower(cfg.VoltronCAType) == "public" { + // leave the ca cert pool as a nil pointer which will cause the tls dialer to load certs from the system. + log.Info("Using system certs.") + // in this case, the serverName will match the remote address + // we need to strip the ports + return strings.Split(cfg.VoltronURL, ":")[0], nil, nil + } else { + serverCrt := fmt.Sprintf("%s/management-cluster.crt", cfg.CertPath) + pemServerCrt, err := os.ReadFile(serverCrt) + if err != nil { + return "", nil, fmt.Errorf("failed to read server cert: %w", err) + } + + ca := x509.NewCertPool() + if ok := ca.AppendCertsFromPEM(pemServerCrt); !ok { + return "", nil, errors.New("Cannot append the certificate to ca pool") + } + + serverName, err := cryptoutils.ExtractServerName(pemServerCrt) + if err != nil { + return "", nil, err + } + return serverName, ca, nil + } +} + +// GetHTTPProxyURL resolves the proxy URL that should be used for the tunnel target. It respects HTTPS_PROXY and NO_PROXY +// environment variables (case-insensitive). +func (cfg *Config) GetHTTPProxyURL() (*url.URL, error) { + targetURL := &url.URL{ + // The scheme should be HTTPS, as we are establishing an mTLS session with the target. + Scheme: "https", + + // We expect `target` to be of the form host:port. + Host: cfg.VoltronURL, + } + + proxyURL, err := httpproxy.FromEnvironment().ProxyFunc()(targetURL) + if err != nil { + return nil, err + } + + if proxyURL == nil { + return nil, nil + } + + // Validate the URL scheme. + if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" { + return nil, fmt.Errorf("proxy URL had invalid scheme (%s) - must be http or https", proxyURL.Scheme) + } + + // Update the host if we can infer a port number. + if proxyURL.Port() == "" && proxyURL.Scheme == "http" { + proxyURL.Host = net.JoinHostPort(proxyURL.Host, "80") + } else if proxyURL.Port() == "" && proxyURL.Scheme == "https" { + proxyURL.Host = net.JoinHostPort(proxyURL.Host, "443") + } + + return proxyURL, nil +} diff --git a/guardian/pkg/conn/forward.go b/guardian/pkg/conn/forward.go new file mode 100644 index 00000000000..5c9f50b2d82 --- /dev/null +++ b/guardian/pkg/conn/forward.go @@ -0,0 +1,56 @@ +package conn + +import ( + "errors" + "io" + "net" + "strings" + "sync" + + log "github.com/sirupsen/logrus" +) + +// Forward sends all data coming from the srcConn to the dstConn, and all data coming from dstConn to srcConn. Both +// srcConn and dstConn are closed when this function returns +func Forward(srcConn net.Conn, dstCon net.Conn) { + var wg sync.WaitGroup + + wg.Add(2) + go forwardConnection(srcConn, dstCon, &wg) + go forwardConnection(dstCon, srcConn, &wg) + + wg.Wait() +} + +// forwardConnection forwards data from srcConn to dstConn. This function attempts to close both srcConn and dstConn, and +// ignores all "use of closed network connection" errors, as these errors are benign. +func forwardConnection(srcConn net.Conn, dstCon net.Conn, wg *sync.WaitGroup) { + defer func() { + if err := srcConn.Close(); err != nil && !isUseOfClosedNetworkErr(err) { + log.WithError(err).Error("failed to close src connection") + } + }() + defer func() { + if err := dstCon.Close(); err != nil && !isUseOfClosedNetworkErr(err) { + log.WithError(err).Error("failed to close dst connection") + } + }() + defer wg.Done() + + if _, err := io.Copy(dstCon, srcConn); err != nil && !isUseOfClosedNetworkErr(err) { + log.WithError(err).Error("failed to forward data") + } +} + +func isUseOfClosedNetworkErr(err error) bool { + if errors.Is(err, net.ErrClosed) { + return true + } + switch err := err.(type) { + case *net.OpError: + if strings.Contains(err.Err.Error(), "use of closed network connection") { + return true + } + } + return false +} diff --git a/guardian/pkg/cryptoutils/crypto.go b/guardian/pkg/cryptoutils/crypto.go new file mode 100644 index 00000000000..51fd72040fa --- /dev/null +++ b/guardian/pkg/cryptoutils/crypto.go @@ -0,0 +1,54 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package cryptoutils has a set of utility function to be used across components +package cryptoutils + +import ( + "crypto/sha256" + "crypto/x509" + "encoding/pem" + "fmt" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +// GenerateFingerprint returns the sha256 hash for a x509 certificate printed as a hex number +func GenerateFingerprint(certificate *x509.Certificate) string { + // NewManagedClusterStorage() call in managedCluster_storage.go generates the certificate fingerprint + // using sha256. This checksum is saved as one of the annotations for this ManagedCluster resource. + // When voltron accepts the tunnel connection from guardian in voltron server.go, this internal active + // fingerprint is checked. If we want to upgrade to a better hash algorithm in the future, we need to + // change both places and properly update the annotation for any existing ManagedCluster resources. + fingerprint := fmt.Sprintf("%x", sha256.Sum256(certificate.Raw)) + log.Debugf("Created fingerprint for cert with common name: %s and fingerprint: %s", certificate.Subject.CommonName, fingerprint) + return fingerprint +} + +func ExtractServerName(pemServerCrt []byte) (string, error) { + certDERBlock, _ := pem.Decode(pemServerCrt) + if certDERBlock == nil || certDERBlock.Type != "CERTIFICATE" { + return "", errors.New("Cannot decode pem block for server certificate") + } + + cert, err := x509.ParseCertificate(certDERBlock.Bytes) + if err != nil { + return "", fmt.Errorf("cannot decode pem block for server certificate: %w", err) + } + if len(cert.DNSNames) != 1 { + return "", fmt.Errorf("expected a single DNS name registered on the certificate: %w", err) + } + return cert.DNSNames[0], nil +} diff --git a/guardian/pkg/daemon/daemon.go b/guardian/pkg/daemon/daemon.go new file mode 100644 index 00000000000..7e61977cc03 --- /dev/null +++ b/guardian/pkg/daemon/daemon.go @@ -0,0 +1,126 @@ +package daemon + +import ( + "context" + "fmt" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/calico/guardian/pkg/config" + "github.com/projectcalico/calico/guardian/pkg/server" + "github.com/projectcalico/calico/guardian/pkg/tunnel" +) + +type configOpts struct { + proxyTargets []server.Target +} + +func Run(cfg *config.Config, proxyTargets []server.Target, opts ...Option) { + cfgOpts := &configOpts{} + for _, opt := range opts { + if err := opt(cfgOpts); err != nil { + log.Fatalf("Failed to apply option: %s", err) + } + } + + tunnelDialOpts := []tunnel.DialerOption{ + tunnel.WithDialerTimeout(cfg.TunnelDialTimeout), + tunnel.WithDialerRetryInterval(cfg.TunnelDialRetryInterval), + tunnel.WithDialerTimeout(cfg.TunnelDialTimeout), + tunnel.WithDialerKeepAliveSettings(cfg.KeepAliveEnable, time.Duration(cfg.KeepAliveInterval)*time.Millisecond), + } + + proxyURL, err := cfg.GetHTTPProxyURL() + if err != nil { + log.Fatalf("Failed to resolve proxy URL: %s", err) + } else if proxyURL != nil { + tunnelDialOpts = append(tunnelDialOpts, tunnel.WithDialerHTTPProxyURL(proxyURL)) + } + + srvOpts := []server.Option{ + server.WithProxyTargets(proxyTargets), + server.WithConnectionRetryAttempts(cfg.ConnectionRetryAttempts), + server.WithConnectionRetryInterval(cfg.ConnectionRetryInterval), + server.WithTunnelDialerOptions(tunnelDialOpts...), + } + + cert := fmt.Sprintf("%s/managed-cluster.crt", cfg.CertPath) + key := fmt.Sprintf("%s/managed-cluster.key", cfg.CertPath) + opt, err := server.WithTunnelCertificatesFromFile(cert, key) + if err != nil { + log.Fatalf("Failed to load tunnel cert: %s", err) + } else if opt != nil { + srvOpts = append(srvOpts, opt) + } + + if strings.ToLower(cfg.VoltronCAType) != "public" { + opt, err := server.WithTunnelRootCAFromFile(fmt.Sprintf("%s/management-cluster.crt", cfg.CertPath)) + if err != nil { + log.Fatalf("Failed to load tunnel root CA: %s", err) + } else if opt != nil { + srvOpts = append(srvOpts, opt) + } + } + + srv, err := server.New(cfg.VoltronURL, srvOpts...) + if err != nil { + log.Fatalf("Failed to create server: %s", err) + } + + health, err := server.NewHealth() + if err != nil { + log.Fatalf("Failed to create health server: %s.", err) + } + + ctx := GetShutdownContext() + go func() { + // Health checks start, meaning everything before has worked. + if err = health.ListenAndServeHTTP(); err != nil { + log.Fatalf("Health exited with error: %s", err) + } + }() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // Allow requests to come down from the management cluster. + if err := srv.ListenAndServeManagementCluster(ctx); err != nil { + log.WithError(err).Fatal("Serving the tunnel exited.") + } + }() + + // Allow requests from the cluster to be sent up to the management cluster. + if cfg.Listen { + wg.Add(1) + go func() { + defer wg.Done() + + if err := srv.ListenAndServeCluster(ctx); err != nil { + log.WithError(err).Fatal("proxy tunnel exited with an error") + } + }() + } + + wg.Wait() +} + +// GetShutdownContext creates a context that's done when either syscall.SIGINT or syscall.SIGTERM notified. +func GetShutdownContext() context.Context { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-signalChan + cancel() + }() + + return ctx +} diff --git a/guardian/pkg/daemon/option.go b/guardian/pkg/daemon/option.go new file mode 100644 index 00000000000..c2b03ce151c --- /dev/null +++ b/guardian/pkg/daemon/option.go @@ -0,0 +1,12 @@ +package daemon + +import "github.com/projectcalico/calico/guardian/pkg/server" + +type Option func(opts *configOpts) error + +func WithProxyTargets(targets ...server.Target) Option { + return func(opts *configOpts) error { + opts.proxyTargets = append(opts.proxyTargets, targets...) + return nil + } +} diff --git a/guardian/pkg/server/health.go b/guardian/pkg/server/health.go new file mode 100644 index 00000000000..b82cc0e193d --- /dev/null +++ b/guardian/pkg/server/health.go @@ -0,0 +1,55 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net/http" + + log "github.com/sirupsen/logrus" +) + +// Health is for liveness and readiness probes +type Health struct { + http *http.Server + httpServeMux *http.ServeMux +} + +// NewHealth returns a new health and instantiates the handlers. +func NewHealth() (*Health, error) { + health := &Health{ + http: new(http.Server), + httpServeMux: http.NewServeMux(), + } + health.http.Addr = ":9080" + health.http.Handler = health.httpServeMux + + // Both readiness and liveness should both be healthy. In other words, if the endpoints + // are accessible, the service is live and ready. + health.httpServeMux.HandleFunc("/health", func(resp http.ResponseWriter, req *http.Request) { + log.Trace("GET /health") + + if _, err := resp.Write([]byte("OK")); err != nil { + log.WithError(err).Error("failed to write /health response") + } + }) + + return health, nil +} + +// ListenAndServeHTTP starts to listen and serve HTTP requests +func (h *Health) ListenAndServeHTTP() error { + log.Infof("Starting Health server at %s", h.http.Addr) + return h.http.ListenAndServe() +} diff --git a/guardian/pkg/server/options.go b/guardian/pkg/server/options.go new file mode 100644 index 00000000000..785f5975a3d --- /dev/null +++ b/guardian/pkg/server/options.go @@ -0,0 +1,148 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "os" + "time" + + "github.com/projectcalico/calico/guardian/pkg/tunnel" +) + +// Option is a common format for New() options +type Option func(*server) error + +// WithProxyTargets sets the proxying targets, can be used multiple times to add +// to a union of target. +func WithProxyTargets(tgts []Target) Option { + return func(c *server) error { + c.targets = tgts + return nil + } +} + +func WithTunnelCertificatesFromFile(certPath, keyPath string) (Option, error) { + pemCert, err := os.ReadFile(certPath) + if err != nil { + return nil, fmt.Errorf("failed to load tunnel cert from path %s: %w", certPath, err) + } + pemKey, err := os.ReadFile(keyPath) + if err != nil { + return nil, fmt.Errorf("failed to load tunnel key from path %s: %w", certPath, err) + } + + return func(c *server) error { + cert, err := tls.X509KeyPair(pemCert, pemKey) + if err != nil { + return fmt.Errorf("tls.X509KeyPair: %s", err.Error()) + } + + c.tunnelCert = &cert + return nil + }, nil +} + +func WithTunnelRootCAFromFile(caPath string) (Option, error) { + pemServerCrt, err := os.ReadFile(caPath) + if err != nil { + return nil, fmt.Errorf("failed to read server cert from path %s: %w", caPath, err) + } + + ca := x509.NewCertPool() + if ok := ca.AppendCertsFromPEM(pemServerCrt); !ok { + return nil, fmt.Errorf("failed to append the server cert to cert pool: %w", err) + } + + serverName, err := extractServerName(pemServerCrt) + if err != nil { + return nil, err + } + return func(c *server) error { + c.tunnelServerName = serverName + c.tunnelRootCAs = ca + return nil + }, nil +} + +func extractServerName(pemServerCrt []byte) (string, error) { + certDERBlock, _ := pem.Decode(pemServerCrt) + if certDERBlock == nil || certDERBlock.Type != "CERTIFICATE" { + return "", errors.New("Cannot decode pem block for server certificate") + } + + cert, err := x509.ParseCertificate(certDERBlock.Bytes) + if err != nil { + return "", fmt.Errorf("cannot decode pem block for server certificate: %w", err) + } + if len(cert.DNSNames) != 1 { + return "", fmt.Errorf("expected a single DNS name registered on the certificate: %w", err) + } + return cert.DNSNames[0], nil +} + +// WithTunnelCreds sets the credential to be used when establishing the tunnel +func WithTunnelCreds(certPEM []byte, keyPEM []byte) Option { + return func(c *server) error { + if certPEM == nil || keyPEM == nil { + return errors.New("WithTunnelCreds: cert and key are required") + } + + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return fmt.Errorf("tls.X509KeyPair: %s", err.Error()) + } + + c.tunnelCert = &cert + return nil + } +} + +// WithTunnelRootCA sets the cert to be used when verifying the server's identity. +func WithTunnelRootCA(ca *x509.CertPool) Option { + return func(c *server) error { + c.tunnelRootCAs = ca + return nil + } +} + +// WithConnectionRetryAttempts sets the number of times the client should retry opening or accepting a connection over +// the tunnel before failing permanently. +func WithConnectionRetryAttempts(connRetryAttempts int) Option { + return func(c *server) error { + c.connRetryAttempts = connRetryAttempts + return nil + } +} + +// WithConnectionRetryInterval sets the interval that the client should wait before retrying to open or accept a connection +// over the tunnel after failing. +func WithConnectionRetryInterval(connRetryInterval time.Duration) Option { + return func(c *server) error { + c.connRetryInterval = connRetryInterval + return nil + } +} + +func WithTunnelDialerOptions(opts ...tunnel.DialerOption) Option { + return func(c *server) error { + c.tunnelDialerOptions = opts + return nil + } +} diff --git a/guardian/pkg/server/proxy.go b/guardian/pkg/server/proxy.go new file mode 100644 index 00000000000..d62e6ffab24 --- /dev/null +++ b/guardian/pkg/server/proxy.go @@ -0,0 +1,157 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + gotls "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + "net/http/httputil" + "os" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" + + "github.com/projectcalico/calico/crypto/pkg/tls" +) + +// Proxy proxies HTTP based on the provided list of targets +type Proxy struct { + mux *http.ServeMux +} + +// NewProxy returns an initialized Proxy +func NewProxy(tgts []Target) (*Proxy, error) { + p := &Proxy{ + mux: http.NewServeMux(), + } + + for i, t := range tgts { + if t.Dest == nil { + return nil, fmt.Errorf("bad target %d, no destination", i) + } + if len(t.CAPem) != 0 && t.Dest.Scheme != "https" { + log.Debugf("Configuring CA cert for secure communication %s for %s", t.CAPem, t.Dest.Scheme) + return nil, fmt.Errorf("CA configured for url scheme %q", t.Dest.Scheme) + } + hdlr, err := newTargetHandler(t) + if err != nil { + return nil, err + } + p.mux.HandleFunc(t.Path, hdlr) + log.Debugf("Proxy target %q -> %q", t.Path, t.Dest) + } + + return p, nil +} + +func newTargetHandler(tgt Target) (func(http.ResponseWriter, *http.Request), error) { + p := httputil.NewSingleHostReverseProxy(tgt.Dest) + p.FlushInterval = -1 + + if tgt.Transport != nil { + p.Transport = tgt.Transport + } else if tgt.Dest.Scheme == "https" { + tlsCfg := tls.NewTLSConfig() + + if tgt.AllowInsecureTLS { + tlsCfg.InsecureSkipVerify = true + } else { + if len(tgt.CAPem) == 0 { + return nil, fmt.Errorf("failed to create target handler for path %s: ca bundle was empty", tgt.Path) + } + + log.Debugf("Detected secure transport for %s. Will pick up system cert pool", tgt.Dest) + var ca *x509.CertPool + ca, err := x509.SystemCertPool() + if err != nil { + log.WithError(err).Warn("failed to get system cert pool, creating a new one") + ca = x509.NewCertPool() + } + + file, err := os.ReadFile(tgt.CAPem) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("could not read cert from file %s", tgt.CAPem)) + } + + ca.AppendCertsFromPEM(file) + tlsCfg.RootCAs = ca + } + + // If specified, load and include the provided client certificate for mTLS with the destination. + if tgt.ClientKeyPath != "" && tgt.ClientCertPath != "" { + clientCert, err := gotls.LoadX509KeyPair(tgt.ClientCertPath, tgt.ClientKeyPath) + if err != nil { + return nil, fmt.Errorf("error load cert key pair for linseed client: %s", err) + } + tlsCfg.Certificates = append(tlsCfg.Certificates, clientCert) + logrus.Info("Using provided client certificates for mTLS") + } + + p.Transport = &http.Transport{ + TLSClientConfig: tlsCfg, + } + } + + return func(w http.ResponseWriter, r *http.Request) { + logCtx := log.WithField("dst", tgt) + if tgt.PathRegexp != nil { + if !tgt.PathRegexp.MatchString(r.URL.Path) { + http.Error(w, "Not found", 404) + logCtx.Debugf("Received request %s rejected by PathRegexp %q", r.RequestURI, tgt.PathRegexp) + return + } + if tgt.PathReplace != nil { + logCtx.Debugf("Replacing URL path %s.", r.URL.Path) + r.URL.Path = tgt.PathRegexp.ReplaceAllString(r.URL.Path, string(tgt.PathReplace)) + logCtx.Debugf("Replaced URL path is now %s.", r.URL.Path) + } + if tgt.HostHeader != nil { + logCtx.Debugf("Rewriting host header to %s", *tgt.HostHeader) + r.Host = *tgt.HostHeader + } + } + + // Get the token value in the handler so if the Token changes we'll pick up the + // updated token. + if tgt.Token != nil { + tok, err := tgt.Token.Token() + if err != nil { + http.Error(w, "Internal Server Error, token read failure", 500) + logCtx.Errorf("Loading token failed %s", err) + } + tok.SetAuthHeader(r) + } + + logCtx.Debugf("Received request %s will proxy to %s", r.RequestURI, tgt.Dest) + + p.ServeHTTP(w, r) + }, nil +} + +// ServeHTTP knows how to proxy HTTP requests to different named targets +func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + r.Header.Set("X-Forwarded-Host", r.Header.Get("Host")) + w.Header().Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains") + p.mux.ServeHTTP(w, r) +} + +// GetTargetPath returns the target that would be used. +func (p *Proxy) GetTargetPath(r *http.Request) string { + _, pat := p.mux.Handler(r) + return pat +} diff --git a/guardian/pkg/server/server.go b/guardian/pkg/server/server.go new file mode 100644 index 00000000000..85023278bab --- /dev/null +++ b/guardian/pkg/server/server.go @@ -0,0 +1,196 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "net/http" + "strings" + "time" + + log "github.com/sirupsen/logrus" + + calicotls "github.com/projectcalico/calico/crypto/pkg/tls" + "github.com/projectcalico/calico/guardian/pkg/conn" + "github.com/projectcalico/calico/guardian/pkg/tunnel" +) + +type Server interface { + ListenAndServeCluster(ctx context.Context) error + ListenAndServeManagementCluster(ctx context.Context) error +} + +// Client is the voltron client. It is used by Guardian to establish a secure tunnel connection to the Voltron server and +// then enable managed cluster services and management cluster services to communicate with one another. +type server struct { + http *http.Server + proxyMux *http.ServeMux + targets []Target + + tunnelAddr string + tunnelCert *tls.Certificate + + // tunnelRootCAs defines the set of root certificate authorities that guardian will use when verifying voltron certificates. + // if nil, dialer will use the host's CA set. + tunnelRootCAs *x509.CertPool + // TunnelServerName defines the server name to be used when connecting to Voltron + tunnelServerName string + + tunnel tunnel.Tunnel + + connRetryAttempts int + connRetryInterval time.Duration + + listenPort string + listenHost string + + tunnelDialerOptions []tunnel.DialerOption +} + +func New(addr string, opts ...Option) (Server, error) { + var err error + srv := &server{ + http: new(http.Server), + + tunnelAddr: addr, + tunnelServerName: strings.Split(addr, ":")[0], + connRetryAttempts: 5, + connRetryInterval: 2 * time.Second, + listenPort: "8080", + } + + log.Infof("Tunnel Address: %s", srv.tunnelAddr) + for _, o := range opts { + if err := o(srv); err != nil { + return nil, fmt.Errorf("applying option failed: %w", err) + } + } + + log.Debug("expecting TLS server name: ", srv.tunnelServerName) + + // set the dialer for the tunnel manager if one hasn't been specified + tunnelAddress := srv.tunnelAddr + + var dialer tunnel.SessionDialer + if srv.tunnelCert == nil { + log.Warnf("No tunnel creds, using unsecured tunnel") + dialer, err = tunnel.NewSessionDialer(tunnelAddress, srv.tunnelDialerOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create tunnel dialer: %w", err) + } + } else { + tunnelCert := srv.tunnelCert + tunnelRootCAs := srv.tunnelRootCAs + + tlsConfig := calicotls.NewTLSConfig() + tlsConfig.Certificates = []tls.Certificate{*tunnelCert} + tlsConfig.RootCAs = tunnelRootCAs + tlsConfig.ServerName = srv.tunnelServerName + + dialer, err = tunnel.NewTLSSessionDialer(tunnelAddress, tlsConfig, srv.tunnelDialerOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create tunnel dialer: %w", err) + } + } + + for _, target := range srv.targets { + log.Infof("Will route traffic to %s for requests matching %s", target.Dest, target.Path) + } + + srv.proxyMux = http.NewServeMux() + srv.http.Handler = srv.proxyMux + + handler, err := NewProxy(srv.targets) + if err != nil { + return nil, fmt.Errorf("failed to create proxy: %w", err) + } + srv.proxyMux.Handle("/", handler) + + srv.tunnel, err = tunnel.NewTunnel(dialer) + if err != nil { + return nil, fmt.Errorf("failed to create tunnel: %w", err) + } + + return srv, nil +} + +func (srv *server) ListenAndServeManagementCluster(ctx context.Context) error { + if err := srv.tunnel.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to tunnel: %w", err) + } + + log.Debug("Getting listener for tunnel.") + listener, err := srv.tunnel.Listener(ctx) + if err != nil { + return err + } + + if srv.tunnelCert != nil { + // we need to upgrade the tunnel to a TLS listener to support HTTP2 on this side. + tlsConfig := calicotls.NewTLSConfig() + tlsConfig.Certificates = []tls.Certificate{*srv.tunnelCert} + tlsConfig.NextProtos = []string{"h2"} + listener = tls.NewListener(listener, tlsConfig) + log.Infof("serving HTTP/2 enabled") + } + + log.Infof("Starting to serve tunneled HTTP.") + + return srv.http.Serve(listener) +} + +func (srv *server) ListenAndServeCluster(ctx context.Context) error { + log.Infof("Listening on %s:%s for connections to proxy to voltron", srv.listenHost, srv.listenPort) + if err := srv.tunnel.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to tunnel: %w", err) + } + + listener, err := net.Listen("tcp", fmt.Sprintf("%s:%s", srv.listenHost, srv.listenPort)) + if err != nil { + return fmt.Errorf("failed to listen on %s:%s: %w", srv.listenHost, srv.listenPort, err) + } + + defer wrapErrFunc(listener.Close, "Failed to close listener.") + + for { + // TODO Consider throttling the number of connections this accepts. + srcConn, err := listener.Accept() + if err != nil { + return err + } + + dstConn, err := srv.tunnel.Open(ctx) + if err != nil { + if err := srcConn.Close(); err != nil { + log.WithError(err).Error("failed to close source connection") + } + + log.WithError(err).Error("failed to open connection to the tunnel") + return err + } + + go conn.Forward(srcConn, dstConn) + } +} + +func wrapErrFunc(f func() error, errMessage string) { + if err := f(); err != nil { + log.WithError(err).Error(errMessage) + } +} diff --git a/guardian/pkg/server/target.go b/guardian/pkg/server/target.go new file mode 100644 index 00000000000..62ee864a857 --- /dev/null +++ b/guardian/pkg/server/target.go @@ -0,0 +1,169 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "net/http" + "net/url" + "os" + "regexp" + + "github.com/sirupsen/logrus" + "golang.org/x/oauth2" + "k8s.io/client-go/transport" +) + +type strAsByteSlice []byte + +// Target is the format for env variable to set proxy targets +type TargetParam struct { + // Path is the path portion of the URL based on which we proxy + Path string `json:"path"` + // Dest is the destination URL + Dest string `json:"destination"` + // TokenPath is where we read the Bearer token from (if non-empty) + TokenPath string `json:"tokenPath,omitempty"` + // CABundlePath is where we read the CA bundle from to authenticate the + // destination (if non-empty) + CABundlePath string `json:"caBundlePath,omitempty"` + // PathRegexp, if not nil, checks if Regexp matches the path + PathRegexp strAsByteSlice `json:"pathRegexp,omitempty"` + // PathReplace if not nil will be used to replace PathRegexp matches + PathReplace strAsByteSlice `json:"pathReplace,omitempty"` + + // HostHeader rewrites the host value for the proxied request. + HostHeader *string `json:"hostHeader,omitempty"` + // AllowInsecureTLS allows https with insecure tls settings + AllowInsecureTLS bool `json:"allowInsecureTLS,omitempty"` + + // ClientCertPath and ClientKeyPath can be set for mTLS on the connection + // from Voltron to the destination. + ClientCertPath string `json:"clientCertPath"` + ClientKeyPath string `json:"clientKeyPath"` + + Unauthenticated bool `json:"unauthenticated,omitempty"` +} + +// Target describes which path is proxied to what destination URL +type Target struct { + Path string + Dest *url.URL + Token oauth2.TokenSource + CAPem string + + // PathRegexp, if not nil, check if Regexp matches the path + PathRegexp *regexp.Regexp + // PathReplace if not nil will be used to replace PathRegexp matches + PathReplace []byte + + // HostHeader if not nil will replace the Host header for the proxied request. + HostHeader *string + + // Transport to use for this target. If nil, Proxy will provide one + Transport http.RoundTripper + AllowInsecureTLS bool + + // Configures client key and certificate for mTLS from Voltron with the target. + ClientKeyPath string + ClientCertPath string +} + +type TargetOption func(*Target) error + +func WithAllowInsecureTLS() TargetOption { + return func(t *Target) error { + t.AllowInsecureTLS = true + return nil + } +} + +func WithToken(path string) TargetOption { + return func(t *Target) error { + _, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("failed reading token from %s: %w", path, err) + } + + t.Token = transport.NewCachedFileTokenSource(path) + return nil + } +} + +func WithCAPem(path string) TargetOption { + return func(t *Target) error { + t.CAPem = path + return nil + } +} + +func WithPathReplace(path string, reg string) TargetOption { + return func(t *Target) error { + t.PathReplace = []byte(path) + r, err := regexp.Compile(reg) + if err != nil { + return fmt.Errorf("PathRegexp failed: %s", err) + } + t.PathRegexp = r + return nil + } +} + +func WithHostHeader(hostHeader string) TargetOption { + return func(t *Target) error { + t.HostHeader = &hostHeader + return nil + } +} + +func WithCertKeyPair(certPath, keyPath string) TargetOption { + return func(t *Target) error { + t.ClientCertPath = certPath + t.ClientKeyPath = keyPath + return nil + } +} + +func MustCreateTarget(path, dest string, opts ...TargetOption) Target { + if path == "" { + logrus.Fatal("proxy target path cannot be empty") + } + + destURL, err := url.Parse(dest) + if err != nil { + logrus.WithError(err).Fatalf("incorrect URL %s for path %s", dest, path) + } + + target := &Target{ + Path: path, + Dest: destURL, + } + + for _, opt := range opts { + if err := opt(target); err != nil { + logrus.WithError(err).Fatalf("failed to apply option") + } + } + + if target.Dest.Scheme == "https" && !target.AllowInsecureTLS && target.CAPem == "" { + logrus.Fatalf("target for path '%s' must specify the ca bundle if AllowInsecureTLS is false when the scheme is https", path) + } + + if target.PathReplace != nil && target.PathRegexp == nil { + logrus.Fatalf("PathReplace specified but PathRegexp is not") + } + + return *target +} diff --git a/guardian/pkg/thirdpartymocks/net/Conn.go b/guardian/pkg/thirdpartymocks/net/Conn.go new file mode 100644 index 00000000000..33bee12dd1c --- /dev/null +++ b/guardian/pkg/thirdpartymocks/net/Conn.go @@ -0,0 +1,197 @@ +// Code generated by mockery v2.50.4. DO NOT EDIT. + +package net + +import ( + net "net" + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// Conn is an autogenerated mock type for the Conn type +type Conn struct { + mock.Mock +} + +// Close provides a mock function with no fields +func (_m *Conn) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// LocalAddr provides a mock function with no fields +func (_m *Conn) LocalAddr() net.Addr { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for LocalAddr") + } + + var r0 net.Addr + if rf, ok := ret.Get(0).(func() net.Addr); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Addr) + } + } + + return r0 +} + +// Read provides a mock function with given fields: b +func (_m *Conn) Read(b []byte) (int, error) { + ret := _m.Called(b) + + if len(ret) == 0 { + panic("no return value specified for Read") + } + + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(b) + } + if rf, ok := ret.Get(0).(func([]byte) int); ok { + r0 = rf(b) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(b) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RemoteAddr provides a mock function with no fields +func (_m *Conn) RemoteAddr() net.Addr { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for RemoteAddr") + } + + var r0 net.Addr + if rf, ok := ret.Get(0).(func() net.Addr); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Addr) + } + } + + return r0 +} + +// SetDeadline provides a mock function with given fields: t +func (_m *Conn) SetDeadline(t time.Time) error { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for SetDeadline") + } + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(t) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetReadDeadline provides a mock function with given fields: t +func (_m *Conn) SetReadDeadline(t time.Time) error { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for SetReadDeadline") + } + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(t) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetWriteDeadline provides a mock function with given fields: t +func (_m *Conn) SetWriteDeadline(t time.Time) error { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for SetWriteDeadline") + } + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(t) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Write provides a mock function with given fields: b +func (_m *Conn) Write(b []byte) (int, error) { + ret := _m.Called(b) + + if len(ret) == 0 { + panic("no return value specified for Write") + } + + var r0 int + var r1 error + if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok { + return rf(b) + } + if rf, ok := ret.Get(0).(func([]byte) int); ok { + r0 = rf(b) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(b) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewConn creates a new instance of Conn. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewConn(t interface { + mock.TestingT + Cleanup(func()) +}) *Conn { + mock := &Conn{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/guardian/pkg/tunnel/dial.go b/guardian/pkg/tunnel/dial.go new file mode 100644 index 00000000000..16eaa4f844a --- /dev/null +++ b/guardian/pkg/tunnel/dial.go @@ -0,0 +1,256 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunnel + +import ( + "bufio" + "context" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "errors" + "fmt" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/hashicorp/yamux" + "github.com/sirupsen/logrus" + + calicoTLS "github.com/projectcalico/calico/crypto/pkg/tls" + "github.com/projectcalico/calico/guardian/pkg/cryptoutils" +) + +const ( + defaultDialTimeout = 60 * time.Second + defaultDialRetries = 5 + defaultDialRetryInterval = 2 * time.Second + defaultKeepAlive = true + defaultKeepAliveInterval = 100 * time.Millisecond + defaultSessionBacklog = 1000 +) + +type SessionDialer interface { + Dial() (Session, error) +} + +type Session interface { + Open() (net.Conn, error) + Accept() (net.Conn, error) + Addr() net.Addr + Close() error +} + +type sessionDialer struct { + addr string + + tlsConfig *tls.Config + + retryAttempts int + retryInterval time.Duration + timeout time.Duration + keepAliveEnable bool + keepAliveInterval time.Duration + + // If set, the default tunnel dialer will issue an HTTP CONNECT to this URL to establish a TCP pass-through connection to Voltron. + httpProxyURL *url.URL +} + +// NewSessionDialer creates a new Dialer. +func NewSessionDialer(addr string, opts ...DialerOption) (SessionDialer, error) { + d := &sessionDialer{ + addr: addr, + retryAttempts: defaultDialRetries, + retryInterval: defaultDialRetryInterval, + timeout: defaultDialTimeout, + keepAliveEnable: defaultKeepAlive, + keepAliveInterval: defaultKeepAliveInterval, + } + + for _, opt := range opts { + if err := opt(d); err != nil { + return nil, fmt.Errorf("applying option failed: %w", err) + } + } + + return d, nil +} + +func NewTLSSessionDialer(addr string, tlsConfig *tls.Config, opts ...DialerOption) (SessionDialer, error) { + d := &sessionDialer{ + addr: addr, + tlsConfig: tlsConfig, + retryAttempts: defaultDialRetries, + retryInterval: defaultDialRetryInterval, + timeout: defaultDialTimeout, + } + + for _, opt := range opts { + if err := opt(d); err != nil { + return nil, fmt.Errorf("applying option failed: %w", err) + } + } + + return d, nil +} + +func (d *sessionDialer) Dial() (Session, error) { + var dialFunc func() (net.Conn, error) + if d.tlsConfig == nil { + dialFunc = func() (net.Conn, error) { return net.Dial("tcp", d.addr) } + } else { + dialFunc = d.dialTLS + } + conn, err := dialRetry(dialFunc, d.retryAttempts, d.retryInterval, d.timeout) + if err != nil { + return nil, err + } + + config := yamux.DefaultConfig() + config.AcceptBacklog = defaultSessionBacklog + config.EnableKeepAlive = d.keepAliveEnable + config.KeepAliveInterval = d.keepAliveInterval + config.LogOutput = &logrusWriter{logrus.WithField("component", "tunnel-yamux")} + session, err := yamux.Client(conn, config) + if err != nil { + return nil, fmt.Errorf("failed creating muxer: %s", err) + } + return session, nil +} + +// DialTLS creates a TLS connection based on the config, must not be nil. +func (d *sessionDialer) dialTLS() (net.Conn, error) { + logrus.Infof("Starting TLS dial to %s with a timeout of %v", d.addr, d.timeout) + + // First, establish the mTLS connection that serves as the basis of the tunnel. + var c net.Conn + var err error + dialer := newDialer(d.timeout) + if d.httpProxyURL != nil { + // mTLS will be negotiated over a TCP connection to the proxy, which performs TCP passthrough to the target. + logrus.Infof("Dialing to %s via HTTP proxy at %s", d.addr, d.httpProxyURL) + c, err = tlsDialViaHTTPProxy(dialer, d.addr, d.httpProxyURL, d.tlsConfig, calicoTLS.NewTLSConfig()) + if err != nil { + return nil, fmt.Errorf("TLS dial via HTTP proxy failed: %w", err) + } + } else { + // mTLS will be negotiated over a TCP connection directly to the target. + logrus.Infof("Dialing directly to %s", d.addr) + c, err = tls.DialWithDialer(dialer, "tcp", d.addr, d.tlsConfig) + if err != nil { + return nil, fmt.Errorf("TLS dial failed: %w", err) + } + } + logrus.Infof("TLS dial to %s succeeded: basis connection for the tunnel has been established", d.addr) + + // Then, create the tunnel on top of the mTLS connection. + return c, nil +} + +func newDialer(timeout time.Duration) *net.Dialer { + // We need to explicitly set the timeout as it seems it's possible for this to hang indefinitely if we don't. + return &net.Dialer{ + Timeout: timeout, + } +} + +func tlsDialViaHTTPProxy(d *net.Dialer, destination string, proxyTargetURL *url.URL, tunnelTLS *tls.Config, proxyTLS *tls.Config) (net.Conn, error) { + // Establish the TCP connection to the proxy. + var c net.Conn + var err error + if proxyTargetURL.Scheme == "https" { + c, err = tls.DialWithDialer(d, "tcp", proxyTargetURL.Host, proxyTLS) + } else { + c, err = d.DialContext(context.Background(), "tcp", proxyTargetURL.Host) + } + if err != nil { + return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyTargetURL.Host, err) + } + + // Build the HTTP CONNECT request. + var requestBuilder strings.Builder + requestBuilder.WriteString(fmt.Sprintf("CONNECT %s HTTP/1.1\r\nHost: %s\r\n", destination, destination)) + if proxyTargetURL.User != nil { + username := proxyTargetURL.User.Username() + password, _ := proxyTargetURL.User.Password() + encodedCredentials := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) + requestBuilder.WriteString(fmt.Sprintf("Proxy-Authorization: Basic %s\r\n", encodedCredentials)) + } + requestBuilder.WriteString("\r\n") + + // Send the HTTP CONNECT request to the proxy. + _, err = fmt.Fprint(c, requestBuilder.String()) + if err != nil { + return nil, fmt.Errorf("writing HTTP CONNECT to proxy %s failed: %v", proxyTargetURL.Host, err) + } + br := bufio.NewReader(c) + res, err := http.ReadResponse(br, nil) + if err != nil { + return nil, fmt.Errorf("reading HTTP response from CONNECT to %s via proxy %s failed: %v", destination, proxyTargetURL.Host, err) + } + if res.StatusCode != 200 { + return nil, fmt.Errorf("proxy error from %s while dialing %s: %v", proxyTargetURL.Host, destination, res.Status) + } + if br.Buffered() > 0 { + // After the CONNECT was handled by the server, the client should be the first to talk to initiate the TLS handshake. + // If we reach this point, the server spoke before the client, so something went wrong. + return nil, fmt.Errorf("unexpected %d bytes of buffered data from CONNECT proxy %q", br.Buffered(), proxyTargetURL.Host) + } + + // When we've reached this point, the proxy should now passthrough any TCP segments written to our connection to the destination. + // Any TCP segments sent by the destination should also be readable on our connection. + + // Negotiate mTLS on top of our passthrough connection. + mtlsC := tls.Client(c, tunnelTLS) + if err := mtlsC.HandshakeContext(context.Background()); err != nil { + mtlsC.Close() + return nil, err + } + return mtlsC, nil +} + +func dialRetry(f func() (net.Conn, error), retryAttempts int, retryInterval time.Duration, timeout time.Duration) (net.Conn, error) { + var err error + var conn net.Conn + + done := make(chan struct{}) + go func() { + for i := 0; i < retryAttempts; i++ { + conn, err = f() + if err != nil { + var xerr x509.UnknownAuthorityError + if errors.As(err, &xerr) { + logrus.WithError(err).Infof("TLS dial failed: %s. fingerprint='%s' issuerCommonName='%s' subjectCommonName='%s'", xerr.Error(), cryptoutils.GenerateFingerprint(xerr.Cert), xerr.Cert.Issuer.CommonName, xerr.Cert.Subject.CommonName) + } else { + logrus.WithError(err).Infof("TLS dial attempt %d failed, will retry in %s", i, retryInterval.String()) + } + time.Sleep(retryInterval) + continue + } + break + } + close(done) + }() + + select { + case <-done: + case <-time.After(timeout): + return nil, fmt.Errorf("dialer timed out after %s", timeout.String()) + } + + return conn, err +} diff --git a/guardian/pkg/tunnel/doc.go b/guardian/pkg/tunnel/doc.go new file mode 100644 index 00000000000..50048891be0 --- /dev/null +++ b/guardian/pkg/tunnel/doc.go @@ -0,0 +1,17 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package tunnel defines an authenticated tunnel API, that allows creating byte +// pipes in both directions, initiated from either side of the tunnel. +package tunnel diff --git a/guardian/pkg/tunnel/log.go b/guardian/pkg/tunnel/log.go new file mode 100644 index 00000000000..30806fcee17 --- /dev/null +++ b/guardian/pkg/tunnel/log.go @@ -0,0 +1,26 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunnel + +import "github.com/sirupsen/logrus" + +type logrusWriter struct { + log *logrus.Entry +} + +func (l *logrusWriter) Write(p []byte) (n int, err error) { + l.log.Info(string(p)) + return len(p), nil +} diff --git a/guardian/pkg/tunnel/mocks/Session.go b/guardian/pkg/tunnel/mocks/Session.go new file mode 100644 index 00000000000..1a04cdfeec5 --- /dev/null +++ b/guardian/pkg/tunnel/mocks/Session.go @@ -0,0 +1,126 @@ +// Code generated by mockery v2.50.4. DO NOT EDIT. + +package mocks + +import ( + net "net" + + mock "github.com/stretchr/testify/mock" +) + +// Session is an autogenerated mock type for the Session type +type Session struct { + mock.Mock +} + +// Accept provides a mock function with no fields +func (_m *Session) Accept() (net.Conn, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Accept") + } + + var r0 net.Conn + var r1 error + if rf, ok := ret.Get(0).(func() (net.Conn, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() net.Conn); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Conn) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Addr provides a mock function with no fields +func (_m *Session) Addr() net.Addr { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Addr") + } + + var r0 net.Addr + if rf, ok := ret.Get(0).(func() net.Addr); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Addr) + } + } + + return r0 +} + +// Close provides a mock function with no fields +func (_m *Session) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Open provides a mock function with no fields +func (_m *Session) Open() (net.Conn, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Open") + } + + var r0 net.Conn + var r1 error + if rf, ok := ret.Get(0).(func() (net.Conn, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() net.Conn); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Conn) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewSession creates a new instance of Session. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSession(t interface { + mock.TestingT + Cleanup(func()) +}) *Session { + mock := &Session{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/guardian/pkg/tunnel/mocks/SessionDialer.go b/guardian/pkg/tunnel/mocks/SessionDialer.go new file mode 100644 index 00000000000..294f446b870 --- /dev/null +++ b/guardian/pkg/tunnel/mocks/SessionDialer.go @@ -0,0 +1,58 @@ +// Code generated by mockery v2.50.4. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + tunnel "github.com/projectcalico/calico/guardian/pkg/tunnel" +) + +// SessionDialer is an autogenerated mock type for the SessionDialer type +type SessionDialer struct { + mock.Mock +} + +// Dial provides a mock function with no fields +func (_m *SessionDialer) Dial() (tunnel.Session, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Dial") + } + + var r0 tunnel.Session + var r1 error + if rf, ok := ret.Get(0).(func() (tunnel.Session, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() tunnel.Session); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(tunnel.Session) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewSessionDialer creates a new instance of SessionDialer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSessionDialer(t interface { + mock.TestingT + Cleanup(func()) +}) *SessionDialer { + mock := &SessionDialer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/guardian/pkg/tunnel/options.go b/guardian/pkg/tunnel/options.go new file mode 100644 index 00000000000..c7180a8e350 --- /dev/null +++ b/guardian/pkg/tunnel/options.go @@ -0,0 +1,66 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunnel + +import ( + "errors" + "net/url" + "time" +) + +// Option is a common format for New() options +type Option func(*tunnel) error + +type DialerOption func(*sessionDialer) error + +// WithKeepAliveSettings sets the Keep Alive settings for the tunnel. +func WithDialerKeepAliveSettings(enable bool, intervalDuration time.Duration) DialerOption { + return func(dialer *sessionDialer) error { + dialer.keepAliveEnable = enable + dialer.keepAliveInterval = intervalDuration + return nil + } +} + +func WithDialerTimeout(dialTimeout time.Duration) DialerOption { + return func(dialer *sessionDialer) error { + dialer.timeout = dialTimeout + return nil + } +} + +func WithDialerRetryAttempts(retryAttempts int) DialerOption { + return func(dialer *sessionDialer) error { + dialer.retryAttempts = retryAttempts + return nil + } +} + +func WithDialerRetryInterval(retryInterval time.Duration) DialerOption { + return func(dialer *sessionDialer) error { + dialer.retryInterval = retryInterval + return nil + } +} + +func WithDialerHTTPProxyURL(httpProxyURL *url.URL) DialerOption { + return func(dialer *sessionDialer) error { + if dialer.tlsConfig == nil { + return errors.New("WithHTTPProxyURL: TLS dialer is required to use HTTP proxy") + } + dialer.httpProxyURL = httpProxyURL + return nil + } +} diff --git a/guardian/pkg/tunnel/suite_test.go b/guardian/pkg/tunnel/suite_test.go new file mode 100644 index 00000000000..6ea9487c36b --- /dev/null +++ b/guardian/pkg/tunnel/suite_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunnel_test + +import ( + "testing" + + . "github.com/onsi/gomega" +) + +func setupTest(t *testing.T) { + RegisterTestingT(t) +} diff --git a/guardian/pkg/tunnel/tunnel.go b/guardian/pkg/tunnel/tunnel.go new file mode 100644 index 00000000000..b98ea1196b0 --- /dev/null +++ b/guardian/pkg/tunnel/tunnel.go @@ -0,0 +1,264 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunnel + +import ( + "context" + "io" + "net" + "sync" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/projectcalico/calico/guardian/pkg/chanutil" +) + +const ( + tunnelNetwork = "voltron-tunnel" +) + +type Tunnel interface { + Connect(context.Context) error + Open(context.Context) (net.Conn, error) + Listener(context.Context) (net.Listener, error) + WaitForClose() <-chan struct{} +} + +type ObjectWithErr[Obj any] struct { + Obj Obj + Err error +} + +func newObjectWithErr[Obj any](obj Obj, err error) ObjectWithErr[Obj] { + return ObjectWithErr[Obj]{Obj: obj, Err: err} +} + +// Tunnel represents either side of the tunnel that allows waiting for, +// accepting and initiating creation of new BytePipes. +type tunnel struct { + openConnService chanutil.Service[any, net.Conn] + getListenerService chanutil.Service[any, net.Listener] + acceptConnService chanutil.Service[any, net.Conn] + getAddrService chanutil.Service[any, net.Addr] + + connectOnce sync.Once + + dialing bool + dialer SessionDialer + session Session + sessionChan chan ObjectWithErr[Session] + + closed chan struct{} +} + +func (t *tunnel) WaitForClose() <-chan struct{} { + return t.closed +} + +func NewTunnel(dialer SessionDialer, opts ...Option) (Tunnel, error) { + return newTunnel(dialer, opts...) +} + +func newTunnel(dialer SessionDialer, opts ...Option) (*tunnel, error) { + t := &tunnel{ + dialer: dialer, + openConnService: chanutil.NewService[any, net.Conn](0), + getListenerService: chanutil.NewService[any, net.Listener](0), + acceptConnService: chanutil.NewService[any, net.Conn](0), + getAddrService: chanutil.NewService[any, net.Addr](0), + sessionChan: make(chan ObjectWithErr[Session]), + closed: make(chan struct{}), + } + + for _, o := range opts { + if err := o(t); err != nil { + return nil, errors.WithMessage(err, "applying option failed") + } + } + + return t, nil +} + +// Connect connects to the other side of the tunnel. The Tunnel cannot be used before this function is called, otherwise +// it will panic. +func (t *tunnel) Connect(ctx context.Context) error { + // TODO consider adding the context to the service loop so that if this is called multiple times it's not actually shut + // TODO down if one context is closed? + var err error + t.connectOnce.Do(func() { + t.session, err = t.dialer.Dial() + if err != nil { + logrus.WithError(err).Error("Failed to open initial connection.") + return + } + go t.startServiceLoop(ctx) + }) + return err +} + +func (t *tunnel) startServiceLoop(ctx context.Context) { + defer t.openConnService.Close() + defer t.getListenerService.Close() + defer t.acceptConnService.Close() + defer t.getAddrService.Close() + defer close(t.sessionChan) + + openConnReqs := chanutil.NewRequestsHandler(func(any) (net.Conn, error) { return t.session.Open() }) + getListenerReqs := chanutil.NewRequestsHandler(func(any) (net.Listener, error) { + return newListener(t), nil + }) + acceptConnReqs := chanutil.NewRequestsHandler(func(any) (net.Conn, error) { return t.session.Accept() }) + getAddrReqs := chanutil.NewRequestsHandler(func(any) (net.Addr, error) { + return newTunnelAddress(t.session.Addr().String()), nil + }) + requestHandlers := []interface { + Handle() error + ReturnError(error) + Close() + }{openConnReqs, acceptConnReqs, getListenerReqs, getAddrReqs} + + var fatalErr error + defer func() { + defer close(t.closed) + if t.session != nil { + if err := t.session.Close(); err != nil { + logrus.WithError(err).Error("Failed to close mux.") + } + } + // Return an error for all open requests. + if fatalErr != nil { + for _, hdlr := range requestHandlers { + hdlr.ReturnError(fatalErr) + } + return + } + }() + + for { + select { + case req := <-t.openConnService.Listen(): + openConnReqs.Add(req) + case req := <-t.getListenerService.Listen(): + getListenerReqs.Add(req) + case req := <-t.acceptConnService.Listen(): + acceptConnReqs.Add(req) + case req := <-t.getAddrService.Listen(): + getAddrReqs.Add(req) + case req := <-t.sessionChan: + if req.Err != nil { + fatalErr = req.Err + return + } + t.session = req.Obj + t.dialing = false + case <-ctx.Done(): + return + } + + // If we're dialing to acquire the session then continue since be can't handle any of the outstanding requests. + if t.dialing { + logrus.Info("Skipping handling of requests while waiting for session to be established.") + continue + } + + logrus.Info("Handling requests.") + for _, hdlr := range requestHandlers { + if err := hdlr.Handle(); err != nil { + if err != io.EOF { + logrus.WithError(err).Error("Failed to handle request, closing tunnel permanently.") + fatalErr = err + return + } + + logrus.Info("Session was closed, recreating it.") + t.reCreateSession() + } + } + } +} + +func (t *tunnel) reCreateSession() { + if !t.dialing { + t.dialing = true + go func() { + mux, err := t.dialer.Dial() + t.sessionChan <- newObjectWithErr(mux, err) + }() + } +} + +func (t *tunnel) Listener(ctx context.Context) (net.Listener, error) { + return t.getListenerService.Send(ctx, nil) +} + +func (t *tunnel) accept() (net.Conn, error) { + return t.acceptConnService.Send(context.Background(), nil) +} + +// Addr returns the address of this tunnel sides endpoint. +func (t *tunnel) addr(ctx context.Context) (net.Addr, error) { + return t.getAddrService.Send(ctx, nil) +} + +// Open opens a new net.Conn to the other side of the tunnel. Returns when +func (t *tunnel) Open(ctx context.Context) (net.Conn, error) { + return t.openConnService.Send(ctx, nil) +} + +func newTunnelAddress(addr string) net.Addr { + return tunnelAddress{addr: addr} +} + +type tunnelAddress struct { + addr string +} + +func (a tunnelAddress) Network() string { + return tunnelNetwork +} + +func (a tunnelAddress) String() string { + return a.addr +} + +// listener implements the net.Listener interface and is used by the Manager to allow components to listen for connections +// over the tunnel +type listener struct { + tunnel *tunnel +} + +func newListener(tunnel *tunnel) *listener { + return &listener{tunnel: tunnel} +} + +// Accept waits for a connection to be opened from the other side of the connection and returns it. +func (l *listener) Accept() (net.Conn, error) { + return l.tunnel.accept() +} + +// Close closes the listener. A closed listener cannot be used again +func (l *listener) Close() error { + return nil +} + +func (l *listener) Addr() net.Addr { + // TODO I'm wondering if we should instead set this when we create the listener... + a, err := l.tunnel.addr(context.Background()) + if err != nil { + return nil + } + return a +} diff --git a/guardian/pkg/tunnel/tunnel_test.go b/guardian/pkg/tunnel/tunnel_test.go new file mode 100644 index 00000000000..fb12425c807 --- /dev/null +++ b/guardian/pkg/tunnel/tunnel_test.go @@ -0,0 +1,164 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tunnel_test + +import ( + "context" + "errors" + "io" + "testing" + + . "github.com/onsi/gomega" + + netmocks "github.com/projectcalico/calico/guardian/pkg/thirdpartymocks/net" + "github.com/projectcalico/calico/guardian/pkg/tunnel" + tunmocks "github.com/projectcalico/calico/guardian/pkg/tunnel/mocks" +) + +func TestTunnelOpenConnection(t *testing.T) { + setupTest(t) + + tt := []struct { + description string + setSession func(*tunmocks.Session) + expectedErr error + }{ + { + description: "session opens immediately", + setSession: func(session *tunmocks.Session) { + session. + On("Close").Return(nil).Once(). + On("Open").Return(netmocks.NewConn(t), nil).Once() + }, + }, + { + description: "session fails to open first with EOF then succeeds", + setSession: func(session *tunmocks.Session) { + session. + On("Close").Return(nil).Once(). + On("Open").Return(nil, io.EOF).Once(). + On("Open").Return(netmocks.NewConn(t), nil).Once() + }, + }, + { + description: "session to open with non EOF error and returns an error", + setSession: func(session *tunmocks.Session) { + session. + On("Close").Return(nil).Once(). + On("Open").Return(nil, errors.New("some error")).Once() + }, + expectedErr: errors.New("some error"), + }, + } + + for _, tc := range tt { + t.Run(tc.description, func(t *testing.T) { + mockDialer := tunmocks.NewSessionDialer(t) + mockSession := tunmocks.NewSession(t) + + mockDialer.On("Dial").Return(mockSession, nil) + tc.setSession(mockSession) + + tun, err := tunnel.NewTunnel(mockDialer) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + <-tun.WaitForClose() + }() + + Expect(tun.Connect(ctx)).ShouldNot(HaveOccurred()) + con, err := tun.Open(ctx) + if tc.expectedErr != nil { + Expect(err).Should(HaveOccurred()) + Expect(con).Should(BeNil()) + } else { + Expect(err).ShouldNot(HaveOccurred()) + Expect(con).ShouldNot(BeNil()) + } + }) + } +} + +func TestTunnelAcceptConnection(t *testing.T) { + setupTest(t) + + tt := []struct { + description string + setSession func(*tunmocks.Session) + expectedErr error + }{ + { + description: "listener accepts connection initially", + setSession: func(session *tunmocks.Session) { + session. + On("Close").Return(nil).Once(). + On("Accept").Return(netmocks.NewConn(t), nil) + }, + }, + { + description: "listener fails to accept connection initially with EOF then succeeds", + setSession: func(session *tunmocks.Session) { + session. + On("Close").Return(nil).Once(). + On("Accept").Return(nil, io.EOF).Once(). + On("Accept").Return(netmocks.NewConn(t), nil) + }, + }, + { + description: "listener fails to accept connection with non EOF error and returns an error", + setSession: func(session *tunmocks.Session) { + session. + On("Close").Return(nil).Once(). + On("Accept").Return(nil, errors.New("some error")).Once() + }, + expectedErr: errors.New("some error"), + }, + } + + for _, tc := range tt { + t.Run(tc.description, func(t *testing.T) { + mockDialer := tunmocks.NewSessionDialer(t) + mockSession := tunmocks.NewSession(t) + + mockDialer.On("Dial").Return(mockSession, nil) + tc.setSession(mockSession) + + tun, err := tunnel.NewTunnel(mockDialer) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + <-tun.WaitForClose() + }() + + Expect(tun.Connect(ctx)).ShouldNot(HaveOccurred()) + listener, err := tun.Listener(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(listener).NotTo(BeNil()) + + conn, err := listener.Accept() + if tc.expectedErr != nil { + Expect(err).Should(HaveOccurred()) + Expect(conn).Should(BeNil()) + } else { + Expect(err).ShouldNot(HaveOccurred()) + Expect(conn).ShouldNot(BeNil()) + } + }) + } +} diff --git a/guardian/pkg/version/version.go b/guardian/pkg/version/version.go new file mode 100644 index 00000000000..578f3c6a6a7 --- /dev/null +++ b/guardian/pkg/version/version.go @@ -0,0 +1,37 @@ +// Copyright (c) 2025 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import "fmt" + +// BuildVersion stores the SemVer for the given build +var BuildVersion string + +// BuildDate stores the date of the build +var BuildDate string + +// GitDescription stores the tag description +var GitDescription string + +// GitRevision stores git commit hash for the given build +var GitRevision string + +// Version prints version and build information. +func Version() { + fmt.Println("Version: ", BuildVersion) + fmt.Println("Build date: ", BuildDate) + fmt.Println("Git tag ref: ", GitDescription) + fmt.Println("Git commit: ", GitRevision) +}