Skip to content
Merged
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/json-iterator/go v1.1.12
github.com/k3s-io/helm-controller v0.15.8
github.com/k3s-io/kine v0.11.0
github.com/k3s-io/kine v0.11.4
github.com/klauspost/compress v1.17.2
github.com/kubernetes-sigs/cri-tools v0.0.0-00010101000000-000000000000
github.com/lib/pq v1.10.2
github.com/libp2p/go-libp2p v0.30.0
github.com/mattn/go-sqlite3 v1.14.17
github.com/mattn/go-sqlite3 v1.14.19
github.com/minio/minio-go/v7 v7.0.33
github.com/mwitkow/go-http-dialer v0.0.0-20161116154839-378f744fb2b8
github.com/natefinch/lumberjack v2.0.0+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -811,8 +811,8 @@ github.com/k3s-io/etcd/server/v3 v3.5.9-k3s1 h1:B3039IkTPnwQEt4tIMjC6yd6b1Q3Z9ZZ
github.com/k3s-io/etcd/server/v3 v3.5.9-k3s1/go.mod h1:GgI1fQClQCFIzuVjlvdbMxNbnISt90gdfYyqiAIt65g=
github.com/k3s-io/helm-controller v0.15.8 h1:CAMEPmiqf4ugUCpZdICGINthCn+hkG/l1fadn8aVjfQ=
github.com/k3s-io/helm-controller v0.15.8/go.mod h1:AYitg40howLjKloL/zdjDDOPL1jg/K5R4af0tQcyPR8=
github.com/k3s-io/kine v0.11.0 h1:7tS0H9yBDxXiy1BgEEkBWLswwG/q4sARPTHdxOMz1qw=
github.com/k3s-io/kine v0.11.0/go.mod h1:tjSsWrCetgaGMTfnJW6vzqdT/qOPhF/+nUEaE+eixBA=
github.com/k3s-io/kine v0.11.4 h1:ZIXQT4vPPKNL9DwLF4dQ11tWtpJ1C/7OKNIpFmTkImo=
github.com/k3s-io/kine v0.11.4/go.mod h1:NmwOWsWgB3aScq5+LEYytAaceqkG7lmCLLjjrWug8v4=
github.com/k3s-io/klog v1.0.0-k3s2 h1:yyvD2bQbxG7m85/pvNctLX2bUDmva5kOBvuZ77tTGBA=
github.com/k3s-io/klog v1.0.0-k3s2/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
github.com/k3s-io/klog/v2 v2.80.1-k3s1 h1:mGMXURxxmabQurmtRhXuQTJ9jC0pvIhESSxRSymepS8=
Expand Down Expand Up @@ -989,8 +989,8 @@ github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI=
github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
Expand Down
45 changes: 44 additions & 1 deletion pkg/cli/etcdsnapshot/etcd_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package etcdsnapshot
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"sort"
Expand All @@ -17,7 +17,9 @@ import (
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/server"
"github.com/k3s-io/k3s/pkg/util"
util2 "github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/signals"
"github.com/urfave/cli"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -52,6 +54,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*e

config.DisableAgent = true
config.ControlConfig.DataDir = dataDir
config.ControlConfig.BindAddress = cfg.BindAddress
config.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
config.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
config.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
Expand All @@ -73,6 +76,46 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*e
config.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key")
config.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")

// We need to go through defaulting of cluster addresses to ensure that the etcd config for the standalone
// command uses the same endpoint selection logic as it does when starting up the full server. Specifically,
// we need to set an IPv6 service CIDR on IPv6-only or IPv6-first nodes, as the etcd default endpoints check
// the service CIDR primary address family to determine what loopback address to use.
_, nodeIPs, err := util.GetHostnameAndIPs(cmds.AgentConfig.NodeName, cmds.AgentConfig.NodeIP)
if err != nil {
return nil, err
}

// configure ClusterIPRanges. Use default 10.42.0.0/16 or fd00:42::/56 if user did not set it
_, defaultClusterCIDR, defaultServiceCIDR, _ := util.GetDefaultAddresses(nodeIPs[0])
if len(cfg.ClusterCIDR) == 0 {
cfg.ClusterCIDR.Set(defaultClusterCIDR)
}
for _, cidr := range util.SplitStringSlice(cfg.ClusterCIDR) {
_, parsed, err := net.ParseCIDR(cidr)
if err != nil {
return nil, errors.Wrapf(err, "invalid cluster-cidr %s", cidr)
}
config.ControlConfig.ClusterIPRanges = append(config.ControlConfig.ClusterIPRanges, parsed)
}

// set ClusterIPRange to the first address (first defined IPFamily is preferred)
config.ControlConfig.ClusterIPRange = config.ControlConfig.ClusterIPRanges[0]

// configure ServiceIPRanges. Use default 10.43.0.0/16 or fd00:43::/112 if user did not set it
if len(cfg.ServiceCIDR) == 0 {
cfg.ServiceCIDR.Set(defaultServiceCIDR)
}
for _, cidr := range util.SplitStringSlice(cfg.ServiceCIDR) {
_, parsed, err := net.ParseCIDR(cidr)
if err != nil {
return nil, errors.Wrapf(err, "invalid service-cidr %s", cidr)
}
config.ControlConfig.ServiceIPRanges = append(config.ControlConfig.ServiceIPRanges, parsed)
}

// set ServiceIPRange to the first address (first defined IPFamily is preferred)
config.ControlConfig.ServiceIPRange = config.ControlConfig.ServiceIPRanges[0]

e := etcd.NewETCD()
if err := e.SetControlConfig(&config.ControlConfig); err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.ExtraEtcdArgs = cfg.ExtraEtcdArgs
serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs
serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain
serverConfig.ControlConfig.Datastore.NotifyInterval = 5 * time.Second
serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint
serverConfig.ControlConfig.Datastore.BackendTLSConfig.CAFile = cfg.DatastoreCAFile
serverConfig.ControlConfig.Datastore.BackendTLSConfig.CertFile = cfg.DatastoreCertFile
Expand Down Expand Up @@ -441,6 +442,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.DisableControllerManager = true
serverConfig.ControlConfig.DisableScheduler = true
serverConfig.ControlConfig.DisableCCM = true
serverConfig.ControlConfig.DisableServiceLB = true

// If the supervisor and apiserver are on the same port, everything is running embedded
// and we don't need the kubelet or containerd up to perform a cluster reset.
Expand Down
13 changes: 9 additions & 4 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/url"
"runtime"
"strings"
"time"

"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/cluster/managed"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
utilsnet "k8s.io/utils/net"
)

Expand Down Expand Up @@ -107,11 +109,14 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
}

if !c.config.EtcdDisableSnapshots {
if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
}
wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
err := c.managedDB.ReconcileSnapshotData(ctx)
if err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
}
return err == nil, nil
})
}

return
default:
runtime.Gosched()
Expand Down
36 changes: 19 additions & 17 deletions pkg/daemons/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,25 @@ type Executor interface {
}

type ETCDConfig struct {
InitialOptions `json:",inline"`
Name string `json:"name,omitempty"`
ListenClientURLs string `json:"listen-client-urls,omitempty"`
ListenClientHTTPURLs string `json:"listen-client-http-urls,omitempty"`
ListenMetricsURLs string `json:"listen-metrics-urls,omitempty"`
ListenPeerURLs string `json:"listen-peer-urls,omitempty"`
AdvertiseClientURLs string `json:"advertise-client-urls,omitempty"`
DataDir string `json:"data-dir,omitempty"`
SnapshotCount int `json:"snapshot-count,omitempty"`
ServerTrust ServerTrust `json:"client-transport-security"`
PeerTrust PeerTrust `json:"peer-transport-security"`
ForceNewCluster bool `json:"force-new-cluster,omitempty"`
HeartbeatInterval int `json:"heartbeat-interval"`
ElectionTimeout int `json:"election-timeout"`
Logger string `json:"logger"`
LogOutputs []string `json:"log-outputs"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
InitialOptions `json:",inline"`
Name string `json:"name,omitempty"`
ListenClientURLs string `json:"listen-client-urls,omitempty"`
ListenClientHTTPURLs string `json:"listen-client-http-urls,omitempty"`
ListenMetricsURLs string `json:"listen-metrics-urls,omitempty"`
ListenPeerURLs string `json:"listen-peer-urls,omitempty"`
AdvertiseClientURLs string `json:"advertise-client-urls,omitempty"`
DataDir string `json:"data-dir,omitempty"`
SnapshotCount int `json:"snapshot-count,omitempty"`
ServerTrust ServerTrust `json:"client-transport-security"`
PeerTrust PeerTrust `json:"peer-transport-security"`
ForceNewCluster bool `json:"force-new-cluster,omitempty"`
HeartbeatInterval int `json:"heartbeat-interval"`
ElectionTimeout int `json:"election-timeout"`
Logger string `json:"logger"`
LogOutputs []string `json:"log-outputs"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
}

type ServerTrust struct {
Expand Down
57 changes: 32 additions & 25 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
if err := os.WriteFile(e.ResetFile(), []byte{}, 0600); err != nil {
return err
}

return e.newCluster(ctx, true)
}

Expand Down Expand Up @@ -757,7 +758,7 @@ func getAdvertiseAddress(advertiseIP string) (string, error) {

// newCluster starts etcd for a new cluster, then runs the sqlite-to-etcd migration unless this is a cluster reset
func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
logrus.Infof("Starting etcd for new cluster")
logrus.Infof("Starting etcd for new cluster, cluster-reset=%v", reset)
err := e.cluster(ctx, reset, executor.InitialOptions{
AdvertisePeerURL: e.peerURL(),
Cluster: fmt.Sprintf("%s=%s", e.name, e.peerURL()),
Expand All @@ -766,8 +767,10 @@ func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
if err != nil {
return err
}
if err := e.migrateFromSQLite(ctx); err != nil {
return fmt.Errorf("failed to migrate content from sqlite to etcd: %w", err)
if !reset {
if err := e.migrateFromSQLite(ctx); err != nil {
return fmt.Errorf("failed to migrate content from sqlite to etcd: %w", err)
}
}
return nil
}
Expand Down Expand Up @@ -848,7 +851,7 @@ func (e *ETCD) clientURL() string {
// on other nodes connect mid-process.
func (e *ETCD) advertiseClientURLs(reset bool) string {
if reset {
return fmt.Sprintf("https://%s", net.JoinHostPort(e.config.Loopback(true), "2379"))
return fmt.Sprintf("https://%s:2379", e.config.Loopback(true))
}
return e.clientURL()
}
Expand Down Expand Up @@ -905,13 +908,15 @@ func (e *ETCD) cluster(ctx context.Context, reset bool, options executor.Initial
ClientCertAuth: true,
TrustedCAFile: e.config.Runtime.ETCDPeerCA,
},
SnapshotCount: 10000,
ElectionTimeout: 5000,
HeartbeatInterval: 500,
Logger: "zap",
LogOutputs: []string{"stderr"},
ExperimentalInitialCorruptCheck: true,
ListenClientHTTPURLs: e.listenClientHTTPURLs(),
SnapshotCount: 10000,
ElectionTimeout: 5000,
HeartbeatInterval: 500,
Logger: "zap",
LogOutputs: []string{"stderr"},
ListenClientHTTPURLs: e.listenClientHTTPURLs(),

ExperimentalInitialCorruptCheck: true,
ExperimentalWatchProgressNotifyInterval: e.config.Datastore.NotifyInterval,
}, e.config.ExtraEtcdArgs)
}

Expand Down Expand Up @@ -964,20 +969,22 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
embedded := executor.Embedded{}
ctx, e.cancel = context.WithCancel(ctx)
return embedded.ETCD(ctx, executor.ETCDConfig{
InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL},
DataDir: tmpDataDir,
ForceNewCluster: true,
AdvertiseClientURLs: clientURL,
ListenClientURLs: clientURL,
ListenClientHTTPURLs: clientHTTPURL,
ListenPeerURLs: peerURL,
Logger: "zap",
HeartbeatInterval: 500,
ElectionTimeout: 5000,
SnapshotCount: 10000,
Name: e.name,
LogOutputs: []string{"stderr"},
ExperimentalInitialCorruptCheck: true,
InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL},
DataDir: tmpDataDir,
ForceNewCluster: true,
AdvertiseClientURLs: clientURL,
ListenClientURLs: clientURL,
ListenClientHTTPURLs: clientHTTPURL,
ListenPeerURLs: peerURL,
Logger: "zap",
HeartbeatInterval: 500,
ElectionTimeout: 5000,
SnapshotCount: 10000,
Name: e.name,
LogOutputs: []string{"stderr"},

ExperimentalInitialCorruptCheck: true,
ExperimentalWatchProgressNotifyInterval: e.config.Datastore.NotifyInterval,
}, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0"))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func generateTestConfig() *config.Control {
EtcdSnapshotRetention: 5,
EtcdS3Endpoint: "s3.amazonaws.com",
EtcdS3Region: "us-east-1",
SANs: []string{"127.0.0.1"},
SANs: []string{"127.0.0.1", mustGetAddress()},
CriticalControlArgs: criticalControlArgs,
}
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,13 @@ func (e *ETCD) Snapshot(ctx context.Context) error {

snapshotDir, err := snapshotDir(e.config, true)
if err != nil {
return errors.Wrap(err, "failed to get the snapshot dir")
return errors.Wrap(err, "failed to get etcd-snapshot-dir")
}

if info, err := os.Stat(snapshotDir); err != nil {
return errors.Wrapf(err, "failed to stat etcd-snapshot-dir %s", snapshotDir)
} else if !info.IsDir() {
return fmt.Errorf("etcd-snapshot-dir %s is not a directory", snapshotDir)
}

cfg, err := getClientConfig(ctx, e.config)
Expand Down Expand Up @@ -436,7 +442,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
}

if err := filepath.Walk(snapshotDir, func(path string, file os.FileInfo, err error) error {
if file.IsDir() || err != nil {
if err != nil || file.IsDir() {
return err
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/etcd/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ const (
var (
// snapshotConfigMapName is the program name with an "-etcd-snapshots" suffix.
snapshotConfigMapName = version.Program + "-etcd-snapshots"
// errNotReconciled is the sentinel that sync compares the reconcile() result
// against to decide whether the key should be requeued instead of failing.
errNotReconciled = errors.New("no nodes have reconciled ETCDSnapshotFile resources")
// reconcileBackoff supplies the delay passed to EnqueueAfter when a key is
// requeued after errNotReconciled: it starts at 10ms, multiplies by 3 per
// step with 10% jitter, and is capped at 30s.
// NOTE(review): this is a single package-level value, so Step() presumably
// advances shared state across all keys and is never reset — confirm that
// this is intended and that concurrent callers are not a concern.
reconcileBackoff = wait.Backoff{
Steps: 9,
Duration: 10 * time.Millisecond,
Factor: 3.0,
Jitter: 0.1,
Cap: 30 * time.Second,
}
)

type etcdSnapshotHandler struct {
Expand Down Expand Up @@ -62,7 +69,7 @@ func (e *etcdSnapshotHandler) sync(key string, esf *apisv1.ETCDSnapshotFile) (*a
err := e.reconcile()
if err == errNotReconciled {
logrus.Debugf("Failed to reconcile snapshot ConfigMap: %v, requeuing", err)
e.snapshots.Enqueue(key)
e.snapshots.EnqueueAfter(key, reconcileBackoff.Step())
return nil, nil
}
return nil, err
Expand Down
10 changes: 7 additions & 3 deletions pkg/node/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Register(ctx context.Context,
Expand Down Expand Up @@ -76,13 +77,12 @@ func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed b
return nil
}

configMapCache, err := h.configMaps.Cache().Get("kube-system", "coredns")
if err != nil || configMapCache == nil {
configMap, err := h.configMaps.Get("kube-system", "coredns", metav1.GetOptions{})
if err != nil || configMap == nil {
logrus.Warn(errors.Wrap(err, "Unable to fetch coredns config map"))
return nil
}

configMap := configMapCache.DeepCopy()
hosts := configMap.Data["NodeHosts"]
hostsMap := map[string]string{}

Expand Down Expand Up @@ -116,6 +116,10 @@ func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed b
for host, ip := range hostsMap {
newHosts += ip + " " + host + "\n"
}

if configMap.Data == nil {
configMap.Data = map[string]string{}
}
configMap.Data["NodeHosts"] = newHosts

if _, err := h.configMaps.Update(configMap); err != nil {
Expand Down
Loading