Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom

func (s *Server) registerResources() {
catalog.RegisterTypes(s.typeRegistry)
catalog.RegisterControllers(s.controllerManager)

mesh.RegisterTypes(s.typeRegistry)
reaper.RegisterControllers(s.controllerManager)

Expand Down
8 changes: 8 additions & 0 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package catalog

import (
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
)

Expand Down Expand Up @@ -43,3 +45,9 @@ var (
func RegisterTypes(r resource.Registry) {
types.Register(r)
}

// RegisterControllers registers controllers for the catalog types with
// the given controller Manager.
func RegisterControllers(mgr *controller.Manager) {
controllers.Register(mgr)
}
120 changes: 120 additions & 0 deletions internal/catalog/internal/controllers/nodehealth/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package nodehealth

import (
"context"
"fmt"

"github.com/hashicorp/consul/internal/catalog/internal/types"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func NodeHealthController() controller.Controller {
return controller.ForType(types.NodeType).
WithWatch(types.HealthStatusType, controller.MapOwnerFiltered(types.NodeType)).
WithReconciler(&nodeHealthReconciler{})
}

type nodeHealthReconciler struct{}

func (r *nodeHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
// The runtime is passed by value so replacing it here for the remaineder of this
// reconciliation request processing will not affect future invocations.
rt.Logger = rt.Logger.With("resource-id", req.ID)

rt.Logger.Trace("reconciling node health")

// read the node
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
switch {
case status.Code(err) == codes.NotFound:
rt.Logger.Trace("node has been deleted")
return nil
case err != nil:
rt.Logger.Error("the resource service has returned an unexpected error", "error", err)
return err
}

res := rsp.Resource

health, err := getNodeHealth(ctx, rt, req.ID)
if err != nil {
rt.Logger.Error("failed to calculate the nodes health", "error", err)
return err
}

message := NodeHealthyMessage
statusState := pbresource.Condition_STATE_TRUE
if health != pbcatalog.Health_HEALTH_PASSING {
statusState = pbresource.Condition_STATE_FALSE
message = NodeUnhealthyMessage
}

newStatus := &pbresource.Status{
ObservedGeneration: res.Generation,
Conditions: []*pbresource.Condition{
{
Type: StatusConditionHealthy,
State: statusState,
Reason: health.String(),
Message: message,
},
},
}

if resource.EqualStatus(res.Status[StatusKey], newStatus, false) {
rt.Logger.Trace("resources node health status is unchanged", "health", health.String())
return nil
}

_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
Id: res.Id,
Key: StatusKey,
Status: newStatus,
})

if err != nil {
rt.Logger.Error("error encountered when attempting to update the resources node health status", "error", err)
return err
}

rt.Logger.Trace("resources node health status was updated", "health", health.String())
return nil
}

func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) {
rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{
Owner: nodeRef,
})

if err != nil {
return pbcatalog.Health_HEALTH_CRITICAL, err
}

health := pbcatalog.Health_HEALTH_PASSING

for _, res := range rsp.Resources {
if resource.EqualType(res.Id.Type, types.HealthStatusType) {
var hs pbcatalog.HealthStatus
if err := res.Data.UnmarshalTo(&hs); err != nil {
// This should be impossible as the resource service + type validations the
// catalog is performing will ensure that no data gets written where unmarshalling
// to this type will error.
return pbcatalog.Health_HEALTH_CRITICAL, fmt.Errorf("error unmarshalling health status data: %w", err)
}

if hs.Status > health {
health = hs.Status
}
}
}

return health, nil
}
Loading