diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index a170ab051..b2c427277 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -24,14 +24,38 @@ import ( "github.com/google/uuid" "github.com/spf13/pflag" - "google.golang.org/grpc" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/agent" "sigs.k8s.io/apiserver-network-proxy/pkg/util" "sigs.k8s.io/apiserver-network-proxy/proto/header" + "strings" + "net" + "k8s.io/utils/strings/slices" ) +type ProxyConnectionManager struct { + val string +} + +var validConnectionManagers = []string{agent.HAConnectionManager, agent.IPListConnectionManager} + +func (p ProxyConnectionManager) String() string { + return p.val +} + +func (p ProxyConnectionManager) Set(s string) error { + if !slices.Contains(validConnectionManagers, s) { + return fmt.Errorf("%s is not one of the valid values [%s]", p, strings.Join(validConnectionManagers, ",")) + } + p.val = s + return nil +} + +func (p ProxyConnectionManager) Type() string { + return "string" +} + type GrpcProxyAgentOptions struct { // Configuration for authenticating with the proxy-server AgentCert string @@ -39,9 +63,11 @@ type GrpcProxyAgentOptions struct { CaCert string // Configuration for connecting to the proxy-server - ProxyServerHost string - ProxyServerPort int - AlpnProtos []string + ProxyConnectionManager ProxyConnectionManager + ProxyServerHost string + ProxyServerPort int + ProxyServerList string + AlpnProtos []string // Bind address for the health connections. HealthServerHost string @@ -70,7 +96,7 @@ type GrpcProxyAgentOptions struct { ServiceAccountTokenPath string // This warns if we attempt to push onto a "full" transfer channel. - // However checking that the transfer channel is full is not safe. + // However, checking that the transfer channel is full is not safe. // It violates our race condition checking. Adding locks around a potentially // blocking call has its own problems, so it cannot easily be made race condition safe. // The check is an "unlocked" read but is still use at your own peril. @@ -79,15 +105,21 @@ type GrpcProxyAgentOptions struct { SyncForever bool } -func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig { +func (o *GrpcProxyAgentOptions) ClientSetConfig() *agent.ClientSetConfig { return &agent.ClientSetConfig{ + ConnectionManager: o.ProxyConnectionManager.val, Address: fmt.Sprintf("%s:%d", o.ProxyServerHost, o.ProxyServerPort), + Addresses: parseAddresses(o.ProxyServerList), AgentID: o.AgentID, AgentIdentifiers: o.AgentIdentifiers, + CaCert: o.CaCert, + AgentCert: o.AgentCert, + AgentKey: o.AgentKey, + AlpnProtos: o.AlpnProtos, SyncInterval: o.SyncInterval, ProbeInterval: o.ProbeInterval, SyncIntervalCap: o.SyncIntervalCap, - DialOptions: dialOptions, + KeepaliveTime: o.KeepaliveTime, ServiceAccountTokenPath: o.ServiceAccountTokenPath, WarnOnChannelLimit: o.WarnOnChannelLimit, SyncForever: o.SyncForever, @@ -99,8 +131,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.StringVar(&o.AgentCert, "agent-cert", o.AgentCert, "If non-empty secure communication with this cert.") flags.StringVar(&o.AgentKey, "agent-key", o.AgentKey, "If non-empty secure communication with this key.") flags.StringVar(&o.CaCert, "ca-cert", o.CaCert, "If non-empty the CAs we use to validate clients.") - flags.StringVar(&o.ProxyServerHost, "proxy-server-host", o.ProxyServerHost, "The hostname to use to connect to the proxy-server.") - flags.IntVar(&o.ProxyServerPort, "proxy-server-port", o.ProxyServerPort, "The port the proxy server is listening on.") + flags.Var(&o.ProxyConnectionManager, "proxy-connection-manager", fmt.Sprintf("Which connection manager should handle proxy connections [%s]", strings.Join(validConnectionManagers, ","))) + flags.StringVar(&o.ProxyServerHost, "proxy-server-host", o.ProxyServerHost, "The hostname to use to connect to the proxy-server. Useful for single server or load-balancer cases.") + flags.IntVar(&o.ProxyServerPort, "proxy-server-port", o.ProxyServerPort, "The port the proxy server is listening on. Useful for single server or load-balancer cases.") + flags.StringVar(&o.ProxyServerList, "proxy-server-list", o.ProxyServerList, "The comma-separated list of host:port addresses to connect to multiple proxy-servers.") flags.StringSliceVar(&o.AlpnProtos, "alpn-proto", o.AlpnProtos, "Additional ALPN protocols to be presented when connecting to the server. Useful to distinguish between network proxy and apiserver connections that share the same destination address.") flags.StringVar(&o.HealthServerHost, "health-server-host", o.HealthServerHost, "The host address to listen on, without port.") flags.IntVar(&o.HealthServerPort, "health-server-port", o.HealthServerPort, "The port the health server is listening on.") @@ -124,8 +158,10 @@ func (o *GrpcProxyAgentOptions) Print() { klog.V(1).Infof("AgentCert set to %q.\n", o.AgentCert) klog.V(1).Infof("AgentKey set to %q.\n", o.AgentKey) klog.V(1).Infof("CACert set to %q.\n", o.CaCert) + klog.V(1).Infof("ProxyConnectionManager set to %q.\n", o.ProxyConnectionManager) klog.V(1).Infof("ProxyServerHost set to %q.\n", o.ProxyServerHost) klog.V(1).Infof("ProxyServerPort set to %d.\n", o.ProxyServerPort) + klog.V(1).Infof("ProxyServerList set to %q.\n", o.ProxyServerList) klog.V(1).Infof("ALPNProtos set to %+s.\n", o.AlpnProtos) klog.V(1).Infof("HealthServerHost set to %s\n", o.HealthServerHost) klog.V(1).Infof("HealthServerPort set to %d.\n", o.HealthServerPort) @@ -166,8 +202,23 @@ func (o *GrpcProxyAgentOptions) Validate() error { return fmt.Errorf("error checking agent CA cert %s, got %v", o.CaCert, err) } } - if o.ProxyServerPort <= 0 { - return fmt.Errorf("proxy server port %d must be greater than 0", o.ProxyServerPort) + switch o.ProxyConnectionManager.val { + case agent.HAConnectionManager: + if o.ProxyServerPort <= 0 { + return fmt.Errorf("proxy server port %d must be greater than 0", o.ProxyServerPort) + } + if len(o.ProxyServerList) > 0 { + return fmt.Errorf("proxy server list cannot be defined with the HA connection manager") + } + case agent.IPListConnectionManager: + if err := validateAddressList(o.ProxyServerList); err != nil { + return fmt.Errorf("proxy server address list is invalid: #{err}") + } + if len(o.ProxyServerHost) > 0 || o.ProxyServerPort != 0 { + return fmt.Errorf("cannot define proxyServerList with either proxyServerHost or proxyServerPort") + } + default: + return fmt.Errorf("require a valid value for ProxyConnectionManager %s", o.ProxyConnectionManager.val) } if o.HealthServerPort <= 0 { return fmt.Errorf("health server port %d must be greater than 0", o.HealthServerPort) @@ -211,11 +262,34 @@ func validateAgentIdentifiers(agentIdentifiers string) error { return nil } +func validateAddressList(addressList string) error { + addresses := parseAddresses(addressList) + for _, address := range addresses { + if err := validateAddress(address); err != nil { + return err + } + } + return nil +} + +func validateAddress(address string) error { + if net.ParseIP(address) == nil { + return fmt.Errorf("invalid proxy host address #{address}") + } + return nil +} + +func parseAddresses(addressList string) []string { + res := strings.Split(addressList, ",") + return res +} + func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { o := GrpcProxyAgentOptions{ AgentCert: "", AgentKey: "", CaCert: "", + ProxyConnectionManager: ProxyConnectionManager{val: agent.HAConnectionManager}, ProxyServerHost: "127.0.0.1", ProxyServerPort: 8091, HealthServerHost: "", diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go index d0a13a7ef..2aae3611b 100644 --- a/cmd/agent/app/server.go +++ b/cmd/agent/app/server.go @@ -18,7 +18,6 @@ package app import ( "context" - "crypto/tls" "fmt" "net" "net/http" @@ -30,9 +29,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options" @@ -81,7 +77,7 @@ func (a *Agent) run(o *options.GrpcProxyAgentOptions) error { } func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-chan struct{}) error { - var tlsConfig *tls.Config + /*var tlsConfig *tls.Config var err error if tlsConfig, err = util.GetClientTLSConfig(o.CaCert, o.AgentCert, o.AgentKey, o.ProxyServerHost, o.AlpnProtos); err != nil { return err @@ -92,8 +88,8 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, stopCh <-ch Time: o.KeepaliveTime, PermitWithoutStream: true, }), - } - cc := o.ClientSetConfig(dialOptions...) + } */ + cc := o.ClientSetConfig() cs := cc.NewAgentClientSet(stopCh) cs.Serve() diff --git a/konnectivity-client/proto/client/client.pb.go b/konnectivity-client/proto/client/client.pb.go index 6af92b448..892346e4e 100644 --- a/konnectivity-client/proto/client/client.pb.go +++ b/konnectivity-client/proto/client/client.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.12.4 +// protoc v3.21.12 // source: konnectivity-client/proto/client/client.proto package client @@ -99,7 +99,6 @@ type Packet struct { Type PacketType `protobuf:"varint,1,opt,name=type,proto3,enum=PacketType" json:"type,omitempty"` // Types that are assignable to Payload: - // // *Packet_DialRequest // *Packet_DialResponse // *Packet_Data diff --git a/konnectivity-client/proto/client/client_grpc.pb.go b/konnectivity-client/proto/client/client_grpc.pb.go index b8d07fe55..5a0d6a2a8 100644 --- a/konnectivity-client/proto/client/client_grpc.pb.go +++ b/konnectivity-client/proto/client/client_grpc.pb.go @@ -16,7 +16,7 @@ limitations under the License. // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.12.4 +// - protoc v3.21.12 // source: konnectivity-client/proto/client/client.proto package client diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index c5adcf4f7..49ddd25db 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -28,17 +28,197 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + "crypto/tls" + "sigs.k8s.io/apiserver-network-proxy/pkg/util" + "net" ) +const HAConnectionManager = "HA-LB" +const IPListConnectionManager = "IPList" + +// type proxyConnectionManager interface { controls connections to the APIServer Network Proxy Server. +// It isolates logic such as the HA Load Balancer case vs know list of Proxy Servers. +type proxyConnectionManager interface { + ensureConnectivity() error + setClientSet(*ClientSet) +} + +// haProxyConnectionManager makes sure that #clients >= #proxy servers +type haProxyConnectionManager struct { + clientSet *ClientSet + + KeepaliveTime time.Duration + dialOptions []grpc.DialOption + + agentID string // ID of this agent + address string // proxy server address. Assuming HA proxy server + serverCount int // number of proxy server instances, should be + + CaCert string + AgentCert string + AgentKey string + AlpnProtos []string + + syncForever bool // Continue syncing (support dynamic server count). +} + +func (m *haProxyConnectionManager) setClientSet(cs *ClientSet) { + m.clientSet = cs +} + +// sync makes sure that #clients >= #proxy servers +func (m *haProxyConnectionManager) ensureConnectivity() error { + defer m.clientSet.shutdown() + backoff := m.clientSet.resetBackoff() + var duration time.Duration + + var tlsConfig *tls.Config + + host, _, err := net.SplitHostPort(m.address) + if err != nil { + return err + } + if tlsConfig, err = util.GetClientTLSConfig(m.CaCert, m.AgentCert, m.AgentKey, host, m.AlpnProtos); err != nil { + return err + } + m.dialOptions = []grpc.DialOption{ + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: m.KeepaliveTime, + PermitWithoutStream: true, + }), + } + + for { + if err := m.connectOnce(); err != nil { + if dse, ok := err.(*DuplicateServerError); ok { + klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", m.serverCount, "clientsCount", m.clientSet.ClientsCount()) + if m.serverCount != 0 && m.clientSet.ClientsCount() >= m.serverCount { + duration = backoff.Step() + } + } else { + klog.ErrorS(err, "cannot connect once") + duration = backoff.Step() + } + } else { + backoff = m.clientSet.resetBackoff() + duration = wait.Jitter(backoff.Duration, backoff.Jitter) + } + time.Sleep(duration) + select { + case <-m.clientSet.stopCh: + return nil + default: + } + } + return nil +} + +func (m *haProxyConnectionManager) connectOnce() error { + if !m.syncForever && m.serverCount != 0 && m.clientSet.ClientsCount() >= m.serverCount { + return nil + } + c, serverCount, err := m.clientSet.newAgentClient(m.address, m.agentID, m.dialOptions) + if err != nil { + return err + } + if m.serverCount != 0 && m.serverCount != serverCount { + klog.V(2).InfoS("Server count change suggestion by server", + "current", m.serverCount, "serverID", c.serverID, "actual", serverCount) + + } + m.serverCount = serverCount + if err := m.clientSet.AddClient(c.serverID, c); err != nil { + c.Close() + return err + } + klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID) + + labels := runpprof.Labels( + "agentIdentifiers", m.clientSet.agentIdentifiers, + "serverAddress", m.address, + "serverID", c.serverID, + ) + go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() }) + return nil +} + +// listProxyConnectionManager makes sure that #clients >= #proxy servers +type listProxyConnectionManager struct { + clientSet *ClientSet + + KeepaliveTime time.Duration + + agentID string // ID of this agent + addresses []string // proxy server addresses. + dialOptions [][]grpc.DialOption + + CaCert string + AgentCert string + AgentKey string + AlpnProtos []string +} + +func (m *listProxyConnectionManager) ensureConnectivity() error { + defer m.clientSet.shutdown() + //backoff := m.clientSet.resetBackoff() + + m.dialOptions = make([][]grpc.DialOption, len(m.addresses)) + for index, address := range m.addresses { + host, _, err := net.SplitHostPort(address) + if err != nil { + return err + } + tlsConfig, err := util.GetClientTLSConfig(m.CaCert, m.AgentCert, m.AgentKey, host, m.AlpnProtos) + if err != nil { + return err + } + m.dialOptions[index] = []grpc.DialOption{ + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: m.KeepaliveTime, + PermitWithoutStream: true, + }), + } + } + for index, address := range m.addresses { + c, _, err := m.clientSet.newAgentClient(address, m.agentID, m.dialOptions[index]) + if err != nil { + return err + } + if err := m.clientSet.AddClient(c.serverID, c); err != nil { + c.Close() + return err + } + klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID) + labels := runpprof.Labels( + "agentIdentifiers", m.clientSet.agentIdentifiers, + "serverAddress", address, + "serverID", c.serverID, + ) + go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() }) + } + // TODO: Ensure clients remain valid + return nil +} + +func (m *listProxyConnectionManager) setClientSet(cs *ClientSet) { + m.clientSet = cs +} + // ClientSet consists of clients connected to each instance of an HA proxy server. type ClientSet struct { mu sync.Mutex //protects the clients. clients map[string]*Client // map between serverID and the client + // connects to this server. + //agentID string // ID of this agent + //address string // proxy server address. Assuming HA proxy server + //serverCount int // number of proxy server instances, should be 1 + proxyConnectionManager proxyConnectionManager - agentID string // ID of this agent - address string // proxy server address. Assuming HA proxy server - serverCount int // number of proxy server instances, should be 1 // unless it is an HA server. Initialized when the ClientSet creates // the first client. When syncForever is set, it will be the most recently seen. syncInterval time.Duration // The interval by which the agent @@ -49,7 +229,7 @@ type ClientSet struct { syncIntervalCap time.Duration // The maximum interval // for the syncInterval to back off to when unable to connect to the proxy server - dialOptions []grpc.DialOption + //dialOptions []grpc.DialOption // file path contains service account token serviceAccountTokenPath string // channel to signal shutting down the client set. Primarily for test. @@ -59,8 +239,6 @@ type ClientSet struct { // by the server when choosing agent warnOnChannelLimit bool - - syncForever bool // Continue syncing (support dynamic server count). } func (cs *ClientSet) ClientsCount() int { @@ -129,12 +307,19 @@ func (cs *ClientSet) RemoveClient(serverID string) { } type ClientSetConfig struct { + ConnectionManager string Address string + Addresses []string AgentID string AgentIdentifiers string + CaCert string + AgentCert string + AgentKey string + AlpnProtos []string SyncInterval time.Duration ProbeInterval time.Duration SyncIntervalCap time.Duration + KeepaliveTime time.Duration DialOptions []grpc.DialOption ServiceAccountTokenPath string WarnOnChannelLimit bool @@ -142,24 +327,50 @@ type ClientSetConfig struct { } func (cc *ClientSetConfig) NewAgentClientSet(stopCh <-chan struct{}) *ClientSet { - return &ClientSet{ - clients: make(map[string]*Client), - agentID: cc.AgentID, - agentIdentifiers: cc.AgentIdentifiers, - address: cc.Address, - syncInterval: cc.SyncInterval, - probeInterval: cc.ProbeInterval, - syncIntervalCap: cc.SyncIntervalCap, - dialOptions: cc.DialOptions, + var proxyConnectionManager proxyConnectionManager + if cc.ConnectionManager == HAConnectionManager { + proxyConnectionManager = &haProxyConnectionManager{ + KeepaliveTime: cc.KeepaliveTime, + agentID: cc.AgentID, + address: cc.Address, + syncForever: cc.SyncForever, + CaCert: cc.CaCert, + AgentCert: cc.AgentCert, + AgentKey: cc.AgentKey, + AlpnProtos: cc.AlpnProtos, + } + } else if cc.ConnectionManager == IPListConnectionManager { + proxyConnectionManager = &listProxyConnectionManager{ + KeepaliveTime: cc.KeepaliveTime, + + agentID: cc.AgentID, + addresses: cc.Addresses, + + CaCert: cc.CaCert, + AgentCert: cc.AgentCert, + AgentKey: cc.AgentKey, + AlpnProtos: cc.AlpnProtos, + } + } + + cs := &ClientSet{ + clients: make(map[string]*Client), + proxyConnectionManager: proxyConnectionManager, + agentIdentifiers: cc.AgentIdentifiers, + syncInterval: cc.SyncInterval, + probeInterval: cc.ProbeInterval, + syncIntervalCap: cc.SyncIntervalCap, + //dialOptions: cc.DialOptions, serviceAccountTokenPath: cc.ServiceAccountTokenPath, warnOnChannelLimit: cc.WarnOnChannelLimit, - syncForever: cc.SyncForever, stopCh: stopCh, } + proxyConnectionManager.setClientSet(cs) + return cs } -func (cs *ClientSet) newAgentClient() (*Client, int, error) { - return newAgentClient(cs.address, cs.agentID, cs.agentIdentifiers, cs, cs.dialOptions...) +func (cs *ClientSet) newAgentClient(address string, agentID string, dialOptions []grpc.DialOption) (*Client, int, error) { + return newAgentClient(address, agentID, cs.agentIdentifiers, cs, dialOptions...) } func (cs *ClientSet) resetBackoff() *wait.Backoff { @@ -172,70 +383,11 @@ func (cs *ClientSet) resetBackoff() *wait.Backoff { } } -// sync makes sure that #clients >= #proxy servers -func (cs *ClientSet) sync() { - defer cs.shutdown() - backoff := cs.resetBackoff() - var duration time.Duration - for { - if err := cs.connectOnce(); err != nil { - if dse, ok := err.(*DuplicateServerError); ok { - klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", cs.serverCount, "clientsCount", cs.ClientsCount()) - if cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { - duration = backoff.Step() - } - } else { - klog.ErrorS(err, "cannot connect once") - duration = backoff.Step() - } - } else { - backoff = cs.resetBackoff() - duration = wait.Jitter(backoff.Duration, backoff.Jitter) - } - time.Sleep(duration) - select { - case <-cs.stopCh: - return - default: - } - } -} - -func (cs *ClientSet) connectOnce() error { - if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { - return nil - } - c, serverCount, err := cs.newAgentClient() - if err != nil { - return err - } - if cs.serverCount != 0 && cs.serverCount != serverCount { - klog.V(2).InfoS("Server count change suggestion by server", - "current", cs.serverCount, "serverID", c.serverID, "actual", serverCount) - - } - cs.serverCount = serverCount - if err := cs.AddClient(c.serverID, c); err != nil { - c.Close() - return err - } - klog.V(2).InfoS("sync added client connecting to proxy server", "serverID", c.serverID) - - labels := runpprof.Labels( - "agentIdentifiers", cs.agentIdentifiers, - "serverAddress", cs.address, - "serverID", c.serverID, - ) - go runpprof.Do(context.Background(), labels, func(context.Context) { c.Serve() }) - return nil -} - func (cs *ClientSet) Serve() { labels := runpprof.Labels( "agentIdentifiers", cs.agentIdentifiers, - "serverAddress", cs.address, ) - go runpprof.Do(context.Background(), labels, func(context.Context) { cs.sync() }) + go runpprof.Do(context.Background(), labels, func(context.Context) { cs.proxyConnectionManager.ensureConnectivity() }) } func (cs *ClientSet) shutdown() { diff --git a/proto/agent/agent.pb.go b/proto/agent/agent.pb.go index a124ebcb1..5744e5b67 100644 --- a/proto/agent/agent.pb.go +++ b/proto/agent/agent.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.12.4 +// protoc v3.21.12 // source: proto/agent/agent.proto package agent diff --git a/proto/agent/agent_grpc.pb.go b/proto/agent/agent_grpc.pb.go index 14e738a6e..a01637c6e 100644 --- a/proto/agent/agent_grpc.pb.go +++ b/proto/agent/agent_grpc.pb.go @@ -16,7 +16,7 @@ limitations under the License. // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.12.4 +// - protoc v3.21.12 // source: proto/agent/agent.proto package agent