Skip to content

Commit

Permalink
[cherry-pick] fix local dns proxy. (#467) (#468)
Browse files Browse the repository at this point in the history
Co-authored-by: Cybwan <[email protected]>
  • Loading branch information
reaver-flomesh and cybwan authored Nov 22, 2024
1 parent efe559c commit 6627e54
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 48 deletions.
64 changes: 28 additions & 36 deletions cmd/fsm-controller/fsm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/go-logr/zerologr"

"github.com/flomesh-io/fsm/pkg/debugger"
mgrecon "github.com/flomesh-io/fsm/pkg/manager/reconciler"

"github.com/flomesh-io/fsm/pkg/dns"
Expand Down Expand Up @@ -73,6 +72,7 @@ import (
"github.com/flomesh-io/fsm/pkg/certificate/providers"
"github.com/flomesh-io/fsm/pkg/configurator"
"github.com/flomesh-io/fsm/pkg/constants"
"github.com/flomesh-io/fsm/pkg/debugger"
"github.com/flomesh-io/fsm/pkg/endpoint"
"github.com/flomesh-io/fsm/pkg/errcode"
"github.com/flomesh-io/fsm/pkg/health"
Expand Down Expand Up @@ -283,6 +283,11 @@ func main() {

// This component will be watching resources in the config.flomesh.io API group
cfg := configurator.NewConfigurator(informerCollection, fsmNamespace, fsmMeshConfigName, msgBroker)
err = sidecar.InstallDriver(cfg.GetSidecarClass())
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating sidecar driver")
}

k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, pluginClient, msgBroker)
meshSpec := smi.NewSMIClient(informerCollection, fsmNamespace, k8sClient, msgBroker)

Expand Down Expand Up @@ -342,45 +347,22 @@ func main() {
msgBroker,
)

proxyServiceCert, err := certManager.IssueCertificate(xdsServerCertificateCommonName, certificate.Internal)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.CertificateIssuanceFailure, "Error issuing XDS certificate to ADS server")
}

background.Configurator = cfg
background.MeshCatalog = meshCatalog
background.CertManager = certManager
background.MsgBroker = msgBroker
background.ProxyServiceCert = proxyServiceCert
background.ProxyServerPort = cfg.GetProxyServerPort()

// Health/Liveness probes
var funcProbes []health.Probes
if cfg.GetTrafficInterceptionMode() == constants.TrafficInterceptionModePodLevel {
err = sidecar.InstallDriver(cfg.GetSidecarClass())
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating sidecar driver")
}

proxyServiceCert, err := certManager.IssueCertificate(xdsServerCertificateCommonName, certificate.Internal)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.CertificateIssuanceFailure, "Error issuing XDS certificate to ADS server")
}

background.ProxyServiceCert = proxyServiceCert
background.ProxyServerPort = cfg.GetProxyServerPort()

// Create and start the sidecar proxy service
healthProbes, err := sidecar.Start(ctx)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error initializing proxy control server")
}

dns.Init(cfg)

// Create DebugServer and start its config event listener.
// Listener takes care to start and stop the debug server as appropriate
debugConfig := debugger.NewDebugConfig(certManager, meshCatalog, kubeConfig, kubeClient, cfg, k8sClient, msgBroker)
go debugConfig.StartDebugServerConfigListener(background.DebugHandlers, stop)

go dns.WatchAndUpdateLocalDNSProxy(msgBroker, stop)
// Start the k8s pod watcher that updates corresponding k8s secrets
go k8s.WatchAndUpdateProxyBootstrapSecret(kubeClient, msgBroker, stop)

funcProbes = append(funcProbes, healthProbes)
// Create and start the sidecar proxy service
healthProbes, err := sidecar.Start(ctx)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error initializing proxy control server")
}

clientset := extensionsClientset.NewForConfigOrDie(kubeConfig)
Expand All @@ -389,12 +371,14 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error starting the validating webhook server")
}

funcProbes = append(funcProbes, smi.HealthChecker{DiscoveryClient: clientset.Discovery()})
dns.Init(k8sClient, cfg)

version.SetMetric()

// Initialize FSM's http service server
httpServer := httpserver.NewHTTPServer(constants.FSMHTTPServerPort)
// Health/Liveness probes
funcProbes := []health.Probes{healthProbes, smi.HealthChecker{DiscoveryClient: clientset.Discovery()}}
httpServer.AddHandlers(map[string]http.Handler{
constants.FSMControllerReadinessPath: health.ReadinessHandler(funcProbes, nil),
constants.FSMControllerLivenessPath: health.LivenessHandler(funcProbes, nil),
Expand All @@ -412,6 +396,14 @@ func main() {
log.Fatal().Err(err).Msgf("Failed to start FSM metrics/probes HTTP server")
}

// Create DebugServer and start its config event listener.
// Listener takes care to start and stop the debug server as appropriate
debugConfig := debugger.NewDebugConfig(certManager, meshCatalog, kubeConfig, kubeClient, cfg, k8sClient, msgBroker)
go debugConfig.StartDebugServerConfigListener(background.DebugHandlers, stop)

go dns.WatchAndUpdateLocalDNSProxy(msgBroker, stop)
// Start the k8s pod watcher that updates corresponding k8s secrets
go k8s.WatchAndUpdateProxyBootstrapSecret(kubeClient, msgBroker, stop)
// Start the global log level watcher that updates the log level dynamically
go k8s.WatchAndUpdateLogLevel(msgBroker, stop)

Expand Down
46 changes: 42 additions & 4 deletions pkg/dns/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,36 @@ func NewHandler(config *Config) *DNSHandler {
return handler
}

func (h *DNSHandler) getTrustDomainSearches(trustDomain, namespace string) []string {
searches := []string{fmt.Sprintf(`.svc.%s`, namespace)}
sections := strings.Split(trustDomain, `.`)
for index := range sections {
searches = append(searches, fmt.Sprintf(`.svc.%s.%s`, strings.Join(sections[0:index+1], `.`), namespace))
}
searches = append(searches, fmt.Sprintf(`.%s`, namespace))
return searches
}

func (h *DNSHandler) getRawQName(qname string, suffixDomain string, trustDomain string) (string, string) {
fromNamespace := `default`
if strings.HasSuffix(qname, suffixDomain) {
qname = strings.TrimSuffix(qname, suffixDomain)
sections := strings.Split(qname, `.`)
ndots := len(sections)
if ndots > 2 {
fromNamespace = sections[len(sections)-1]
searches := h.getTrustDomainSearches(trustDomain, fromNamespace)
for _, search := range searches {
if strings.HasSuffix(qname, search) {
qname = strings.TrimSuffix(qname, search)
break
}
}
}
}
return qname, fromNamespace
}

func (h *DNSHandler) do(cfg *Config) {
trustDomain := service.GetTrustDomain()
suffixDomain := fmt.Sprintf(".svc.%s.", trustDomain)
Expand All @@ -87,16 +117,24 @@ func (h *DNSHandler) do(cfg *Config) {
}(w)

for index, q := range req.Question {
if strings.HasSuffix(q.Name, suffixDomain) {
if segs := strings.Split(q.Name, "."); len(segs) == 7 {
req.Question[index].Name = fmt.Sprintf("%s.%s.svc.%s.", segs[0], segs[1], trustDomain)
qname, fromNamespace := h.getRawQName(q.Name, suffixDomain, trustDomain)
segs := strings.Split(qname, `.`)
sections := len(segs)
if sections == 1 { //internal domain name
req.Question[index].Name = fmt.Sprintf(`%s.%s.svc.%s.`, segs[0], fromNamespace, trustDomain)
} else if sections > 2 { //external domain name
req.Question[index].Name = fmt.Sprintf(`%s.`, qname)
} else {
if k8sClient.GetK8sNamespace(segs[1]) != nil {
req.Question[index].Name = fmt.Sprintf(`%s.%s.svc.%s.`, segs[0], segs[1], trustDomain)
} else { //external domain name
req.Question[index].Name = fmt.Sprintf(`%s.`, qname)
}
}
}

q := req.Question[0]
Q := Question{UnFqdn(q.Name), dns.TypeToString[q.Qtype], dns.ClassToString[q.Qclass]}

var remote net.IP
if Net == "tcp" {
remote = w.RemoteAddr().(*net.TCPAddr).IP
Expand Down
4 changes: 2 additions & 2 deletions pkg/dns/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (r *Resolver) Lookup(net string, req *dns.Msg, timeout int, interval int, n
defer wg.Done()
r, _, err := c.Exchange(req, nameserver)
if err != nil {
log.Error().Msgf("%s socket error on %s", qname, nameserver)
log.Error().Msgf("error:%s", err.Error())
log.Warn().Msgf("%s socket error on %s", qname, nameserver)
log.Warn().Msgf("error:%s", err.Error())
return
}
if r != nil && r.Rcode != dns.RcodeSuccess {
Expand Down
7 changes: 5 additions & 2 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3"
"github.com/flomesh-io/fsm/pkg/configurator"
"github.com/flomesh-io/fsm/pkg/constants"
"github.com/flomesh-io/fsm/pkg/k8s"
"github.com/flomesh-io/fsm/pkg/k8s/events"
"github.com/flomesh-io/fsm/pkg/messaging"
)
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *Server) stop() {
}

var (
cfg configurator.Configurator
k8sClient k8s.Controller
cfg configurator.Configurator

server = &Server{
host: fmt.Sprintf(":%d", constants.FSMDNSProxyPort),
Expand All @@ -100,7 +102,8 @@ var (
}
)

func Init(configurator configurator.Configurator) {
func Init(kubeController k8s.Controller, configurator configurator.Configurator) {
k8sClient = kubeController
cfg = configurator
if cfg.IsLocalDNSProxyEnabled() {
Start()
Expand Down
9 changes: 9 additions & 0 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ func (c *client) GetNamespace(ns string) *corev1.Namespace {
return nil
}

// GetK8sNamespace returns a Namespace resource if found, nil otherwise.
func (c *client) GetK8sNamespace(ns string) *corev1.Namespace {
nsIf, exists, err := c.informers.GetByKey(fsminformers.InformerKeyNamespaceAll, ns)
if exists && err == nil {
return nsIf.(*corev1.Namespace)
}
return nil
}

// ListPods returns a list of pods part of the mesh
// Kubecontroller does not currently segment pod notifications, hence it receives notifications
// for all k8s Pods.
Expand Down
14 changes: 14 additions & 0 deletions pkg/k8s/mock_controller_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Controller interface {
// GetNamespace returns k8s namespace present in cache
GetNamespace(string) *corev1.Namespace

// GetK8sNamespace returns k8s namespace present in cache
GetK8sNamespace(string) *corev1.Namespace

// ListPods returns a list of pods part of the mesh
ListPods() []*corev1.Pod

Expand Down
2 changes: 0 additions & 2 deletions pkg/sidecar/providers/pipy/driver/pipy_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,8 @@ func getPipySidecarContainerSpec(injCtx *driver.InjectorContext, pod *corev1.Pod
dots := "4"
searches := make([]string, 0)
if len(pod.Namespace) > 0 {
dots = "5"
searches = append(searches, fmt.Sprintf("%s.svc.%s", pod.Namespace, trustDomain))
} else if len(injCtx.PodNamespace) > 0 {
dots = "5"
searches = append(searches, fmt.Sprintf("%s.svc.%s", injCtx.PodNamespace, trustDomain))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sidecar/providers/pipy/repo/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (job *PipyConfGeneratorJob) Run() {
job.publishSidecarConf(s.repoClient, proxy, pipyConf)
end := time.Now()

log.Log().Str("proxy", proxy.GetCNPrefix()).
log.Debug().Str("proxy", proxy.GetCNPrefix()).
Int("maxprocs", runtime.GOMAXPROCS(-1)).
Str("elapsed", end.Sub(start).String()).
Msg("Codebase Recalculated")
Expand Down Expand Up @@ -598,7 +598,7 @@ func (job *PipyConfGeneratorJob) publishSidecarConf(repoClient *client.PipyRepoC
_, _ = repoClient.Delete(codebase)
} else {
proxy.ETag = codebaseCurV
log.Log().Str("proxy", proxy.GetCNPrefix()).
log.Debug().Str("proxy", proxy.GetCNPrefix()).
Str("id", fmt.Sprintf("%05d", proxy.ID)).
Str("prev", fmt.Sprintf("%020d", codebasePreV)).
Str("curv", fmt.Sprintf("%020d", codebaseCurV)).
Expand Down

0 comments on commit 6627e54

Please sign in to comment.