Skip to content

Commit

Permalink
feat: add KubeSpan extra endpoint configuration
Browse files Browse the repository at this point in the history
Fixes #9174

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Sep 6, 2024
1 parent 3038ccf commit dd4185b
Show file tree
Hide file tree
Showing 26 changed files with 632 additions and 108 deletions.
1 change: 1 addition & 0 deletions api/resource/definitions/kubespan/kubespan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message ConfigSpec {
uint32 mtu = 6;
repeated string endpoint_filters = 7;
bool harvest_extra_endpoints = 8;
repeated common.NetIPPort extra_endpoints = 9;
}

// EndpointSpec describes Endpoint state.
Expand Down
6 changes: 6 additions & 0 deletions hack/release.toml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ Talos Linux installer now never wipes the system disk on upgrades, which means t
title = "Disk Management"
description = """\
Talos Linux now supports [configuration](https://www.talos.dev/v1.8/talos-guides/configuration/disk-management/#machine-configuration) for the `EPHEMERAL` volume.
"""

[notes.kubespan]
title = "KubeSpan"
description = """\
Extra announced endpoints can be added using the [`KubespanEndpointsConfig` document](https://www.talos.dev/v1.8/talos-guides/network/kubespan/#configuration).
"""

[make_deps]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runt
spec.KubeSpan.Endpoints = xslices.Map(endpointIPs, func(addr netip.Addr) netip.AddrPort {
return netip.AddrPortFrom(addr, constants.KubeSpanDefaultPort)
})

// add extra announced endpoints, deduplicating on the way
for _, addr := range kubespanConfig.TypedSpec().ExtraEndpoints {
if !slices.Contains(spec.KubeSpan.Endpoints, addr) {
spec.KubeSpan.Endpoints = append(spec.KubeSpan.Endpoints, addr)
}
}
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
ksConfig := kubespan.NewConfig(config.NamespaceName, kubespan.ConfigID)
ksConfig.TypedSpec().EndpointFilters = []string{"0.0.0.0/0", "!192.168.0.0/16", "2001::/16"}
ksConfig.TypedSpec().AdvertiseKubernetesNetworks = true
ksConfig.TypedSpec().ExtraEndpoints = []netip.AddrPort{netip.MustParseAddrPort("10.5.0.1:51820"), netip.MustParseAddrPort("1.2.3.4:5678")}
suite.Require().NoError(suite.state.Create(suite.ctx, ksConfig))

// add KS address to the list of node addresses, it should be ignored in the endpoints
Expand Down Expand Up @@ -109,7 +110,6 @@ func (suite *LocalAffiliateSuite) TestGeneration() {

asrt.NotZero(spec.KubeSpan.PublicKey)
asrt.NotZero(spec.KubeSpan.AdditionalAddresses)
asrt.Len(spec.KubeSpan.Endpoints, 4)

asrt.Equal(ksIdentity.TypedSpec().Address.Addr(), spec.KubeSpan.Address)
asrt.Equal(ksIdentity.TypedSpec().PublicKey, spec.KubeSpan.PublicKey)
Expand All @@ -120,6 +120,7 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
"10.5.0.1:51820",
"1.1.1.1:51820",
"[2001:123:4567::1]:51820",
"1.2.3.4:5678",
},
xslices.Map(spec.KubeSpan.Endpoints, netip.AddrPort.String),
)
Expand Down
1 change: 1 addition & 0 deletions internal/app/machined/pkg/controllers/kubespan/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewConfigController() *ConfigController {
res.TypedSpec().HarvestExtraEndpoints = c.Machine().Network().KubeSpan().HarvestExtraEndpoints()
res.TypedSpec().MTU = c.Machine().Network().KubeSpan().MTU()
res.TypedSpec().EndpointFilters = c.Machine().Network().KubeSpan().Filters().Endpoints()
res.TypedSpec().ExtraEndpoints = c.KubespanConfig().ExtraAnnouncedEndpoints()
}

return nil
Expand Down
40 changes: 26 additions & 14 deletions internal/app/machined/pkg/controllers/kubespan/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package kubespan_test

import (
"fmt"
"net/netip"
"testing"
"time"

Expand All @@ -14,6 +16,7 @@ import (

kubespanctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/kubespan"
"github.com/siderolabs/talos/pkg/machinery/config/container"
"github.com/siderolabs/talos/pkg/machinery/config/types/network"
"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
Expand All @@ -28,22 +31,30 @@ func (suite *ConfigSuite) TestReconcileConfig() {

suite.startRuntime()

cfg := config.NewMachineConfig(
container.NewV1Alpha1(
&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{
MachineNetwork: &v1alpha1.NetworkConfig{
NetworkKubeSpan: &v1alpha1.NetworkKubeSpan{
KubeSpanEnabled: pointer.To(true),
},
ctr, err := container.New(
&v1alpha1.Config{
ConfigVersion: "v1alpha1",
MachineConfig: &v1alpha1.MachineConfig{
MachineNetwork: &v1alpha1.NetworkConfig{
NetworkKubeSpan: &v1alpha1.NetworkKubeSpan{
KubeSpanEnabled: pointer.To(true),
},
},
ClusterConfig: &v1alpha1.ClusterConfig{
ClusterID: "8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=",
ClusterSecret: "I+1In7fLnpcRIjUmEoeugZnSyFoTF6MztLxICL5Yu0s=",
},
}))
},
ClusterConfig: &v1alpha1.ClusterConfig{
ClusterID: "8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=",
ClusterSecret: "I+1In7fLnpcRIjUmEoeugZnSyFoTF6MztLxICL5Yu0s=",
},
},
&network.KubespanEndpointsConfigV1Alpha1{
ExtraAnnouncedEndpointsConfig: []netip.AddrPort{
netip.MustParseAddrPort("192.168.33.11:1001"),
},
},
)
suite.Require().NoError(err)

cfg := config.NewMachineConfig(ctr)

suite.Require().NoError(suite.state.Create(suite.ctx, cfg))

Expand All @@ -61,6 +72,7 @@ func (suite *ConfigSuite) TestReconcileConfig() {
suite.Assert().True(spec.ForceRouting)
suite.Assert().False(spec.AdvertiseKubernetesNetworks)
suite.Assert().False(spec.HarvestExtraEndpoints)
suite.Assert().Equal("[\"192.168.33.11:1001\"]", fmt.Sprintf("%q", spec.ExtraEndpoints))

return nil
},
Expand Down
55 changes: 55 additions & 0 deletions internal/integration/api/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package api
import (
"context"
"fmt"
"math/rand/v2"
"net/netip"
"strings"
"time"
Expand All @@ -23,6 +24,7 @@ import (

"github.com/siderolabs/talos/internal/integration/base"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/config/types/network"
"github.com/siderolabs/talos/pkg/machinery/resources/cluster"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)
Expand Down Expand Up @@ -277,6 +279,59 @@ func (suite *DiscoverySuite) TestKubeSpanPeers() {
}
}

// TestKubeSpanExtraEndpoints verifies that extra announced endpoints injected via a
// KubespanEndpointsConfig document on one node show up in the KubeSpan peer spec
// observed from another node.
func (suite *DiscoverySuite) TestKubeSpanExtraEndpoints() {
	if !suite.Capabilities().RunsTalosKernel {
		suite.T().Skip("not running Talos kernel")
	}

	// check that cluster has KubeSpan enabled
	node := suite.RandomDiscoveredNodeInternalIP()
	suite.ClearConnectionRefused(suite.ctx, node)

	nodeCtx := client.WithNode(suite.ctx, node)
	provider, err := suite.ReadConfigFromNode(nodeCtx)
	suite.Require().NoError(err)

	if !provider.Machine().Network().KubeSpan().Enabled() {
		suite.T().Skip("KubeSpan is disabled")
	}

	nodes := suite.DiscoverNodeInternalIPs(suite.ctx)

	// the test patches one node and observes the propagated peer spec from another,
	// so a single-node cluster can't be used
	if len(nodes) < 2 {
		suite.T().Skip("need at least two nodes for this test")
	}

	// pick two distinct random nodes: one to patch, one to observe from
	perm := rand.Perm(len(nodes))

	checkNode := nodes[perm[0]]
	targetNode := nodes[perm[1]]

	// link-local address, so it should never collide with a real cluster endpoint
	mockEndpoint := netip.MustParseAddrPort("169.254.121.121:5820")

	// inject extra endpoint to target node
	cfgDocument := network.NewKubespanEndpointsV1Alpha1()
	cfgDocument.ExtraAnnouncedEndpointsConfig = []netip.AddrPort{mockEndpoint}

	suite.T().Logf("injecting extra endpoint %s to node %s", mockEndpoint, targetNode)
	suite.PatchMachineConfig(client.WithNode(suite.ctx, targetNode), cfgDocument)

	// the peer spec is keyed by the target node's KubeSpan public key
	targetIdentity, err := safe.ReaderGetByID[*kubespan.Identity](client.WithNode(suite.ctx, targetNode), suite.Client.COSI, kubespan.LocalIdentity)
	suite.Require().NoError(err)

	suite.T().Logf("checking extra endpoint %s on node %s", mockEndpoint, checkNode)
	rtestutils.AssertResources(client.WithNode(suite.ctx, checkNode), suite.T(), suite.Client.COSI, []string{targetIdentity.TypedSpec().PublicKey},
		func(peer *kubespan.PeerSpec, asrt *assert.Assertions) {
			asrt.Contains(peer.TypedSpec().Endpoints, mockEndpoint)
		},
	)

	// the extra endpoint disappears from the discovery service only after a timeout,
	// so we can't assert on its removal here
	suite.T().Logf("removing extra endpoint %s from node %s", mockEndpoint, targetNode)
	suite.RemoveMachineConfigDocuments(client.WithNode(suite.ctx, targetNode), cfgDocument.MetaKind)
}

func (suite *DiscoverySuite) getMembers(nodeCtx context.Context) []*cluster.Member {
var result []*cluster.Member

Expand Down
Loading

0 comments on commit dd4185b

Please sign in to comment.