Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc Fixes and Updates #1763

Merged
merged 9 commits into from
Nov 21, 2024
43 changes: 40 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,46 @@
.vscode
.idea
#### Project Specific Ignores ####
# Built binaries
/kube-router
/gobgp
/cni-download

# Go directories
_output
_cache
vendor
.*.sw?
/cni-download

# Ignore worktree directory
worktrees

# Ignore common IDE directories
/.vscode
/.idea


#### Go Lang Ignores ####
# If you prefer the allow list template instead of the deny list, see community template:
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
#
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

# Go workspace file
go.work
go.work.sum

# env file
.env
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ docker-login: ## Logs into a docker registry using {DOCKER,QUAY}_{USERNAME,PASSW

push: container docker-login ## Pushes a Docker container image to a registry.
@echo Starting kube-router container image push.
$(DOCKER) push "$(REGISTRY_DEV):$(IMG_TAG)"
$(DOCKER) push "$(REGISTRY_DEV):$(subst /,,$(IMG_TAG))"
@echo Finished kube-router container image push.

push-manifest:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/lballoc/lballoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (lbc *LoadBalancerController) Run(healthChan chan<- *healthcheck.Controller
}
case <-timer.C:
timer.Reset(time.Minute)
healthcheck.SendHeartBeat(healthChan, "LBC")
healthcheck.SendHeartBeat(healthChan, healthcheck.LoadBalancerController)
if isLeader {
go lbc.walkServices()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (npc *NetworkPolicyController) fullPolicySync() {
}
}

healthcheck.SendHeartBeat(npc.healthChan, "NPC")
healthcheck.SendHeartBeat(npc.healthChan, healthcheck.NetworkPolicyController)
start := time.Now()
syncVersion := strconv.FormatInt(start.UnixNano(), syncVersionBase)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/proxy/hairpin_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
endpointIP, err)
}
case <-t.C:
healthcheck.SendHeartBeat(healthChan, "HPC")
healthcheck.SendHeartBeat(healthChan, healthcheck.HairpinController)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
}

case perform := <-nsc.syncChan:
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController)
switch perform {
case synctypeAll:
klog.V(1).Info("Performing requested full sync of services")
Expand All @@ -365,18 +365,18 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
nsc.mu.Unlock()
}
if err == nil {
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController)
}

case <-t.C:
klog.V(1).Info("Performing periodic sync of ipvs services")
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController)
err := nsc.doSync()
if err != nil {
klog.Errorf("Error during periodic ipvs sync in network service controller. Error: " + err.Error())
klog.Errorf("Skipping sending heartbeat from network service controller as periodic sync failed.")
} else {
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkServicesController)
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/routing/bgp_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
gobgpapi "github.com/osrg/gobgp/v3/api"
gobgp "github.com/osrg/gobgp/v3/pkg/server"
"github.com/stretchr/testify/assert"
)

type PolicyTestCase struct {
Expand Down Expand Up @@ -1457,10 +1458,11 @@ func Test_AddPolicies(t *testing.T) {
}

err := testcase.nrc.startBgpServer(false)
if !reflect.DeepEqual(err, testcase.startBGPServerErr) {
t.Logf("expected err when invoking startBGPServer(): %v", testcase.startBGPServerErr)
t.Logf("actual err from startBGPServer() received: %v", err)
t.Fatal("unexpected error")
if testcase.startBGPServerErr != nil && err == nil {
t.Errorf("expected error when starting BGP server, got nil on testcase: %s", testcase.name)
}
if err != nil {
assert.EqualError(t, testcase.startBGPServerErr, err.Error())
}
// If the server was not expected to start we should stop here as the rest of the tests are unimportant
if testcase.startBGPServerErr != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/routing/ecmp_vip.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func getServiceObject(obj interface{}) (svc *v1core.Service) {
}

func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core.Service) {
klog.V(2).Infof("Handling update for service: %s", svcNew)
if !nrc.bgpServerStarted {
klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
klog.V(1).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
svcNew.Namespace, svcNew.Name)
return
}
Expand All @@ -156,9 +157,9 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core.
}

func (nrc *NetworkRoutingController) handleServiceDelete(oldSvc *v1core.Service) {

klog.V(2).Infof("Handling delete for service: %s", oldSvc)
if !nrc.bgpServerStarted {
klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
klog.V(1).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync",
oldSvc.Namespace, oldSvc.Name)
return
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/controllers/routing/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ const (
type RouteSyncer interface {
AddInjectedRoute(dst *net.IPNet, route *netlink.Route)
DelInjectedRoute(dst *net.IPNet)
Run(stopCh <-chan struct{}, wg *sync.WaitGroup)
SyncLocalRouteTable()
Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup)
SyncLocalRouteTable() error
}

// PolicyBasedRouting is an interface that defines the methods needed to enable/disable policy based routing
Expand Down Expand Up @@ -310,7 +310,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll
klog.Infof("Starting network route controller")

// Start route syncer
nrc.routeSyncer.Run(stopCh, wg)
nrc.routeSyncer.Run(healthChan, stopCh, wg)

// Wait till we are ready to launch BGP server
for {
Expand Down Expand Up @@ -391,7 +391,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll
}

if err == nil {
healthcheck.SendHeartBeat(healthChan, "NRC")
healthcheck.SendHeartBeat(healthChan, healthcheck.NetworkRoutesController)
} else {
klog.Errorf("Error during periodic sync in network routing controller. Error: " + err.Error())
klog.Errorf("Skipping sending heartbeat from network routing controller as periodic sync failed.")
Expand Down Expand Up @@ -659,6 +659,7 @@ func (nrc *NetworkRoutingController) injectRoute(path *gobgpapi.Path) error {
} else {
// knowing that a tunnel shouldn't exist for this route, check to see if there are any lingering tunnels /
// routes that need to be cleaned up.
nrc.routeSyncer.DelInjectedRoute(dst)
tunnels.CleanupTunnel(dst, tunnelName)
}

Expand Down Expand Up @@ -700,15 +701,15 @@ func (nrc *NetworkRoutingController) injectRoute(path *gobgpapi.Path) error {
}
default:
// otherwise, let BGP do its thing, nothing to do here
nrc.routeSyncer.DelInjectedRoute(dst)
return nil
}

// Alright, everything is in place, and we have our route configured, let's add it to the host's routing table
klog.V(2).Infof("Inject route: '%s via %s' from peer to routing table", dst, nextHop)
nrc.routeSyncer.AddInjectedRoute(dst, route)
// Immediately sync the local route table regardless of timer
nrc.routeSyncer.SyncLocalRouteTable()
return nil
return nrc.routeSyncer.SyncLocalRouteTable()
}

func (nrc *NetworkRoutingController) isPeerEstablished(peerIP string) (bool, error) {
Expand Down Expand Up @@ -1288,7 +1289,7 @@ func NewNetworkRoutingController(clientset kubernetes.Interface,
nrc.bgpServerStarted = false
nrc.disableSrcDstCheck = kubeRouterConfig.DisableSrcDstCheck
nrc.initSrcDstCheckDone = false
nrc.routeSyncer = routes.NewRouteSyncer(kubeRouterConfig.InjectedRoutesSyncPeriod)
nrc.routeSyncer = routes.NewRouteSyncer(kubeRouterConfig.InjectedRoutesSyncPeriod, kubeRouterConfig.MetricsEnabled)

nrc.bgpHoldtime = kubeRouterConfig.BGPHoldTime.Seconds()
if nrc.bgpHoldtime > 65536 || nrc.bgpHoldtime < 3 {
Expand Down
57 changes: 46 additions & 11 deletions pkg/healthcheck/health_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,32 @@ const (
HPCSyncPeriod = time.Duration(HPCStaticSyncInterval) * time.Second
defaultGraceTimeDuration = time.Duration(1500) * time.Millisecond
healthControllerTickTime = 5000 * time.Millisecond

// Defined health checks
NetworkRoutesController = iota
LoadBalancerController
NetworkPolicyController
NetworkServicesController
HairpinController
MetricsController
RouteSyncController
)

var (
HeartBeatCompNames = map[int]string{
NetworkRoutesController: "NetworkRoutesController",
LoadBalancerController: "LoadBalancerController",
NetworkPolicyController: "NetworkPolicyController",
NetworkServicesController: "NetworkServicesController",
HairpinController: "HairpinController",
MetricsController: "MetricsController",
RouteSyncController: "RouteSyncController",
}
)

// ControllerHeartbeat is the structure to hold the heartbeats sent by controllers
type ControllerHeartbeat struct {
Component string
Component int
LastHeartBeat time.Time
}

Expand All @@ -47,12 +68,14 @@ type HealthStats struct {
NetworkServicesControllerAliveTTL time.Duration
HairpinControllerAlive time.Time
HairpinControllerAliveTTL time.Duration
RouteSyncControllerAlive time.Time
RouteSyncControllerAliveTTL time.Duration
}

// SendHeartBeat sends a heartbeat on the passed channel
func SendHeartBeat(channel chan<- *ControllerHeartbeat, controller string) {
func SendHeartBeat(channel chan<- *ControllerHeartbeat, component int) {
heartbeat := ControllerHeartbeat{
Component: controller,
Component: component,
LastHeartBeat: time.Now(),
}
channel <- &heartbeat
Expand Down Expand Up @@ -88,45 +111,51 @@ func (hc *HealthController) Handler(w http.ResponseWriter, _ *http.Request) {

// HandleHeartbeat handles received heartbeats on the health channel
func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) {
klog.V(3).Infof("Received heartbeat from %s", beat.Component)
klog.V(3).Infof("Received heartbeat from %s", HeartBeatCompNames[beat.Component])

hc.Status.Lock()
defer hc.Status.Unlock()

switch {
switch beat.Component {
// The first heartbeat will set the initial gracetime the controller has to report in, A static time is added as
// well when checking to allow for load variation in sync time
case beat.Component == "LBC":
case LoadBalancerController:
if hc.Status.LoadBalancerControllerAliveTTL == 0 {
hc.Status.LoadBalancerControllerAliveTTL = time.Since(hc.Status.LoadBalancerControllerAlive)
}
hc.Status.LoadBalancerControllerAlive = beat.LastHeartBeat

case beat.Component == "NSC":
case NetworkServicesController:
if hc.Status.NetworkServicesControllerAliveTTL == 0 {
hc.Status.NetworkServicesControllerAliveTTL = time.Since(hc.Status.NetworkServicesControllerAlive)
}
hc.Status.NetworkServicesControllerAlive = beat.LastHeartBeat

case beat.Component == "HPC":
case HairpinController:
if hc.Status.HairpinControllerAliveTTL == 0 {
hc.Status.HairpinControllerAliveTTL = time.Since(hc.Status.HairpinControllerAlive)
}
hc.Status.HairpinControllerAlive = beat.LastHeartBeat

case beat.Component == "NRC":
case NetworkRoutesController:
if hc.Status.NetworkRoutingControllerAliveTTL == 0 {
hc.Status.NetworkRoutingControllerAliveTTL = time.Since(hc.Status.NetworkRoutingControllerAlive)
}
hc.Status.NetworkRoutingControllerAlive = beat.LastHeartBeat

case beat.Component == "NPC":
case RouteSyncController:
if hc.Status.RouteSyncControllerAliveTTL == 0 {
hc.Status.RouteSyncControllerAliveTTL = time.Since(hc.Status.RouteSyncControllerAlive)
}
hc.Status.RouteSyncControllerAlive = beat.LastHeartBeat

case NetworkPolicyController:
if hc.Status.NetworkPolicyControllerAliveTTL == 0 {
hc.Status.NetworkPolicyControllerAliveTTL = time.Since(hc.Status.NetworkPolicyControllerAlive)
}
hc.Status.NetworkPolicyControllerAlive = beat.LastHeartBeat

case beat.Component == "MC":
case MetricsController:
hc.Status.MetricsControllerAlive = beat.LastHeartBeat
}
}
Expand Down Expand Up @@ -158,6 +187,11 @@ func (hc *HealthController) CheckHealth() bool {
klog.Error("Network Routing Controller heartbeat missed")
health = false
}
if time.Since(hc.Status.RouteSyncControllerAlive) >
hc.Config.InjectedRoutesSyncPeriod+hc.Status.RouteSyncControllerAliveTTL+graceTime {
klog.Error("Routes Sync Controller heartbeat missed")
health = false
}
}

if hc.Config.RunServiceProxy {
Expand Down Expand Up @@ -242,6 +276,7 @@ func (hc *HealthController) SetAlive() {
hc.Status.NetworkRoutingControllerAlive = now
hc.Status.NetworkServicesControllerAlive = now
hc.Status.HairpinControllerAlive = now
hc.Status.RouteSyncControllerAlive = now
}

// NewHealthController creates a new health controller and returns a reference to it
Expand Down
26 changes: 25 additions & 1 deletion pkg/metrics/metrics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,30 @@ var (
Name: "controller_policy_ipsets",
Help: "Active policy ipsets",
})
// ControllerHostRoutesSyncTime Time it took for the host routes controller to sync to the system
ControllerHostRoutesSyncTime = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Name: "host_routes_sync_time",
Help: "Time it took for the host routes controller to sync to the system",
})
// ControllerHostRoutesSynced Number of host routes currently synced to the system
ControllerHostRoutesSynced = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "host_routes_synced",
Help: "Count of host routes currently synced to the system",
})
// ControllerHostRoutesSynced Number of host routes added to the system
ControllerHostRoutesAdded = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "host_routes_added",
Help: "Total count of host routes added to the system",
})
// ControllerHostRoutesSynced Number of host routes removed to the system
ControllerHostRoutesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "host_routes_removed",
Help: "Total count of host routes removed to the system",
})
)

// Controller Holds settings for the metrics controller
Expand Down Expand Up @@ -251,7 +275,7 @@ func (mc *Controller) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, st
}
}()
for {
healthcheck.SendHeartBeat(healthChan, "MC")
healthcheck.SendHeartBeat(healthChan, healthcheck.MetricsController)
select {
case <-stopCh:
klog.Infof("Shutting down metrics controller")
Expand Down
Loading