Skip to content

Commit

Permalink
fix local dns proxy. (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
cybwan authored Nov 22, 2024
1 parent 0b44bf5 commit a624743
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 14 deletions.
3 changes: 1 addition & 2 deletions cmd/fsm-controller/fsm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ func main() {
}

k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, pluginClient, msgBroker)

meshSpec := smi.NewSMIClient(informerCollection, fsmNamespace, k8sClient, msgBroker)

certOpts, err := getCertOptions()
Expand Down Expand Up @@ -372,7 +371,7 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error starting the validating webhook server")
}

dns.Init(cfg)
dns.Init(k8sClient, cfg)

version.SetMetric()

Expand Down
46 changes: 42 additions & 4 deletions pkg/dns/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,36 @@ func NewHandler(config *Config) *DNSHandler {
return handler
}

func (h *DNSHandler) getTrustDomainSearches(trustDomain, namespace string) []string {
searches := []string{fmt.Sprintf(`.svc.%s`, namespace)}
sections := strings.Split(trustDomain, `.`)
for index := range sections {
searches = append(searches, fmt.Sprintf(`.svc.%s.%s`, strings.Join(sections[0:index+1], `.`), namespace))
}
searches = append(searches, fmt.Sprintf(`.%s`, namespace))
return searches
}

func (h *DNSHandler) getRawQName(qname string, suffixDomain string, trustDomain string) (string, string) {
fromNamespace := `default`
if strings.HasSuffix(qname, suffixDomain) {
qname = strings.TrimSuffix(qname, suffixDomain)
sections := strings.Split(qname, `.`)
ndots := len(sections)
if ndots > 2 {
fromNamespace = sections[len(sections)-1]
searches := h.getTrustDomainSearches(trustDomain, fromNamespace)
for _, search := range searches {
if strings.HasSuffix(qname, search) {
qname = strings.TrimSuffix(qname, search)
break
}
}
}
}
return qname, fromNamespace
}

func (h *DNSHandler) do(cfg *Config) {
trustDomain := service.GetTrustDomain()
suffixDomain := fmt.Sprintf(".svc.%s.", trustDomain)
Expand All @@ -87,16 +117,24 @@ func (h *DNSHandler) do(cfg *Config) {
}(w)

for index, q := range req.Question {
if strings.HasSuffix(q.Name, suffixDomain) {
if segs := strings.Split(q.Name, "."); len(segs) == 7 {
req.Question[index].Name = fmt.Sprintf("%s.%s.svc.%s.", segs[0], segs[1], trustDomain)
qname, fromNamespace := h.getRawQName(q.Name, suffixDomain, trustDomain)
segs := strings.Split(qname, `.`)
sections := len(segs)
if sections == 1 { //internal domain name
req.Question[index].Name = fmt.Sprintf(`%s.%s.svc.%s.`, segs[0], fromNamespace, trustDomain)
} else if sections > 2 { //external domain name
req.Question[index].Name = fmt.Sprintf(`%s.`, qname)
} else {
if k8sClient.GetK8sNamespace(segs[1]) != nil {
req.Question[index].Name = fmt.Sprintf(`%s.%s.svc.%s.`, segs[0], segs[1], trustDomain)
} else { //external domain name
req.Question[index].Name = fmt.Sprintf(`%s.`, qname)
}
}
}

q := req.Question[0]
Q := Question{UnFqdn(q.Name), dns.TypeToString[q.Qtype], dns.ClassToString[q.Qclass]}

var remote net.IP
if Net == "tcp" {
remote = w.RemoteAddr().(*net.TCPAddr).IP
Expand Down
4 changes: 2 additions & 2 deletions pkg/dns/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (r *Resolver) Lookup(net string, req *dns.Msg, timeout int, interval int, n
defer wg.Done()
r, _, err := c.Exchange(req, nameserver)
if err != nil {
log.Error().Msgf("%s socket error on %s", qname, nameserver)
log.Error().Msgf("error:%s", err.Error())
log.Warn().Msgf("%s socket error on %s", qname, nameserver)
log.Warn().Msgf("error:%s", err.Error())
return
}
if r != nil && r.Rcode != dns.RcodeSuccess {
Expand Down
7 changes: 5 additions & 2 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3"
"github.com/flomesh-io/fsm/pkg/configurator"
"github.com/flomesh-io/fsm/pkg/constants"
"github.com/flomesh-io/fsm/pkg/k8s"
"github.com/flomesh-io/fsm/pkg/k8s/events"
"github.com/flomesh-io/fsm/pkg/messaging"
)
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *Server) stop() {
}

var (
cfg configurator.Configurator
k8sClient k8s.Controller
cfg configurator.Configurator

server = &Server{
host: fmt.Sprintf(":%d", constants.FSMDNSProxyPort),
Expand All @@ -100,7 +102,8 @@ var (
}
)

func Init(configurator configurator.Configurator) {
func Init(kubeController k8s.Controller, configurator configurator.Configurator) {
k8sClient = kubeController
cfg = configurator
if cfg.IsLocalDNSProxyEnabled() {
Start()
Expand Down
9 changes: 9 additions & 0 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ func (c *client) GetNamespace(ns string) *corev1.Namespace {
return nil
}

// GetK8sNamespace returns a Namespace resource if found, nil otherwise.
func (c *client) GetK8sNamespace(ns string) *corev1.Namespace {
nsIf, exists, err := c.informers.GetByKey(fsminformers.InformerKeyNamespaceAll, ns)
if exists && err == nil {
return nsIf.(*corev1.Namespace)
}
return nil
}

// ListPods returns a list of pods part of the mesh
// Kubecontroller does not currently segment pod notifications, hence it receives notifications
// for all k8s Pods.
Expand Down
14 changes: 14 additions & 0 deletions pkg/k8s/mock_controller_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Controller interface {
// GetNamespace returns k8s namespace present in cache
GetNamespace(string) *corev1.Namespace

// GetK8sNamespace returns k8s namespace present in cache
GetK8sNamespace(string) *corev1.Namespace

// ListPods returns a list of pods part of the mesh
ListPods() []*corev1.Pod

Expand Down
2 changes: 0 additions & 2 deletions pkg/sidecar/providers/pipy/driver/pipy_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,8 @@ func getPipySidecarContainerSpec(injCtx *driver.InjectorContext, pod *corev1.Pod
dots := "4"
searches := make([]string, 0)
if len(pod.Namespace) > 0 {
dots = "5"
searches = append(searches, fmt.Sprintf("%s.svc.%s", pod.Namespace, trustDomain))
} else if len(injCtx.PodNamespace) > 0 {
dots = "5"
searches = append(searches, fmt.Sprintf("%s.svc.%s", injCtx.PodNamespace, trustDomain))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sidecar/providers/pipy/repo/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (job *PipyConfGeneratorJob) Run() {
job.publishSidecarConf(s.repoClient, proxy, pipyConf)
end := time.Now()

log.Log().Str("proxy", proxy.GetCNPrefix()).
log.Debug().Str("proxy", proxy.GetCNPrefix()).
Int("maxprocs", runtime.GOMAXPROCS(-1)).
Str("elapsed", end.Sub(start).String()).
Msg("Codebase Recalculated")
Expand Down Expand Up @@ -598,7 +598,7 @@ func (job *PipyConfGeneratorJob) publishSidecarConf(repoClient *client.PipyRepoC
_, _ = repoClient.Delete(codebase)
} else {
proxy.ETag = codebaseCurV
log.Log().Str("proxy", proxy.GetCNPrefix()).
log.Debug().Str("proxy", proxy.GetCNPrefix()).
Str("id", fmt.Sprintf("%05d", proxy.ID)).
Str("prev", fmt.Sprintf("%020d", codebasePreV)).
Str("curv", fmt.Sprintf("%020d", codebaseCurV)).
Expand Down

0 comments on commit a624743

Please sign in to comment.