Skip to content

Commit a1f2b4d

Browse files
author
Andrew Lytvynov
committed
Implement kubernetes_service registration and sratup
The new service now starts, registers (locally or via a join token) and heartbeats its presence to the auth server. This service can handle k8s requests (like a proxy) but not to remote teleport clusters. Proxies will be responsible for routing those. The client (tsh) will not yet go to this service, until proxy routing is implemented. I manually tweaked server addres in kubeconfig to test it. You can also run `tctl get kube_service` to list all registered instances. The self-reported info is currently limited - only listening address is set.
1 parent a9e9a2c commit a1f2b4d

32 files changed

+1028
-373
lines changed

Diff for: constants.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ const (
104104
// ComponentLabel is a component label name used in reporting
105105
ComponentLabel = "component"
106106

107-
// ComponentKube is a kubernetes proxy
108-
ComponentKube = "proxy:kube"
107+
// ComponentProxyKube is a kubernetes proxy
108+
ComponentProxyKube = "proxy:kube"
109109

110110
// ComponentAuth is the cluster CA node (auth server API)
111111
ComponentAuth = "auth"
@@ -219,6 +219,9 @@ const (
219219
// ComponentCgroup is the cgroup package.
220220
ComponentCgroup = "cgroups"
221221

222+
// ComponentKube is an Kubernetes API gateway.
223+
ComponentKube = "kubernetes"
224+
222225
// DebugEnvVar tells tests to use verbose debug output
223226
DebugEnvVar = "DEBUG"
224227

Diff for: lib/auth/api.go

+12
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ type Announcer interface {
4141
// for the specified duration with second resolution if it's >= 1 second
4242
UpsertAuthServer(s services.Server) error
4343

44+
// UpsertKubeService registers kubernetes presence, permanently if ttl is 0
45+
// or for the specified duration with second resolution if it's >= 1 second
46+
UpsertKubeService(s services.Server) error
47+
4448
// NewKeepAliver returns a new instance of keep aliver
4549
NewKeepAliver(ctx context.Context) (services.KeepAliver, error)
4650
}
@@ -96,6 +100,9 @@ type ReadAccessPoint interface {
96100

97101
// GetTunnelConnections returns tunnel connections for a given cluster
98102
GetTunnelConnections(clusterName string, opts ...services.MarshalOption) ([]services.TunnelConnection, error)
103+
104+
// GetKubeServices returns a list of kubernetes services registered in the cluster
105+
GetKubeServices() ([]services.Server, error)
99106
}
100107

101108
// AccessPoint is an API interface implemented by a certificate authority (CA)
@@ -237,6 +244,11 @@ func (w *Wrapper) DeleteSemaphore(ctx context.Context, filter services.Semaphore
237244
return w.NoCache.DeleteSemaphore(ctx, filter)
238245
}
239246

247+
// UpsertKubeService is part of auth.AccessPoint implementation
248+
func (w *Wrapper) UpsertKubeService(s services.Server) error {
249+
return w.NoCache.UpsertKubeService(s)
250+
}
251+
240252
// NewCachingAcessPoint returns new caching access point using
241253
// access point policy
242254
type NewCachingAccessPoint func(clt ClientI, cacheName []string) (AccessPoint, error)

Diff for: lib/auth/apiserver.go

+20
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ func NewAPIServer(config *APIConfig) http.Handler {
140140
srv.DELETE("/:version/tunnelconnections/:cluster/:conn", srv.withAuth(srv.deleteTunnelConnection))
141141
srv.DELETE("/:version/tunnelconnections/:cluster", srv.withAuth(srv.deleteTunnelConnections))
142142
srv.DELETE("/:version/tunnelconnections", srv.withAuth(srv.deleteAllTunnelConnections))
143+
srv.POST("/:version/kube_services", srv.withAuth(srv.upsertKubeService))
144+
srv.GET("/:version/kube_services", srv.withAuth(srv.getKubeServices))
143145

144146
// Server Credentials
145147
srv.POST("/:version/server/credentials", srv.withAuth(srv.generateServerKeys))
@@ -366,6 +368,12 @@ func (s *APIServer) upsertServer(auth ClientI, role teleport.Role, w http.Respon
366368
if err := auth.UpsertProxy(server); err != nil {
367369
return nil, trace.Wrap(err)
368370
}
371+
case teleport.RoleKube:
372+
if err := auth.UpsertKubeService(server); err != nil {
373+
return nil, trace.Wrap(err)
374+
}
375+
default:
376+
return nil, trace.BadParameter("unknown server role %q", role)
369377
}
370378
return message("ok"), nil
371379
}
@@ -2481,6 +2489,18 @@ func (s *APIServer) getServerID(r *http.Request) (string, error) {
24812489
return strings.TrimSuffix(role.Username, "."+clusterName), nil
24822490
}
24832491

2492+
func (s *APIServer) upsertKubeService(auth ClientI, w http.ResponseWriter, r *http.Request, p httprouter.Params, version string) (interface{}, error) {
2493+
return s.upsertServer(auth, teleport.RoleKube, w, r, p, version)
2494+
}
2495+
2496+
func (s *APIServer) getKubeServices(auth ClientI, w http.ResponseWriter, r *http.Request, p httprouter.Params, version string) (interface{}, error) {
2497+
servers, err := auth.GetKubeServices()
2498+
if err != nil {
2499+
return nil, trace.Wrap(err)
2500+
}
2501+
return marshalServers(servers, version)
2502+
}
2503+
24842504
func message(msg string) map[string]interface{} {
24852505
return map[string]interface{}{"message": msg}
24862506
}

Diff for: lib/auth/auth_with_roles.go

+20
Original file line numberDiff line numberDiff line change
@@ -1969,6 +1969,26 @@ func (a *ServerWithRoles) WaitForDelivery(context.Context) error {
19691969
return nil
19701970
}
19711971

1972+
func (a *ServerWithRoles) UpsertKubeService(s services.Server) error {
1973+
if err := a.action(defaults.Namespace, services.KindKubeService, services.VerbCreate); err != nil {
1974+
return trace.Wrap(err)
1975+
}
1976+
if err := a.action(defaults.Namespace, services.KindKubeService, services.VerbUpdate); err != nil {
1977+
return trace.Wrap(err)
1978+
}
1979+
return a.authServer.UpsertKubeService(s)
1980+
}
1981+
1982+
func (a *ServerWithRoles) GetKubeServices() ([]services.Server, error) {
1983+
if err := a.action(defaults.Namespace, services.KindKubeService, services.VerbList); err != nil {
1984+
return nil, trace.Wrap(err)
1985+
}
1986+
if err := a.action(defaults.Namespace, services.KindKubeService, services.VerbRead); err != nil {
1987+
return nil, trace.Wrap(err)
1988+
}
1989+
return a.authServer.GetKubeServices()
1990+
}
1991+
19721992
// NewAdminAuthServer returns auth server authorized as admin,
19731993
// used for auth server cached access
19741994
func NewAdminAuthServer(authServer *Server, sessions session.Service, alog events.IAuditLog) (ClientI, error) {

Diff for: lib/auth/clt.go

+36
Original file line numberDiff line numberDiff line change
@@ -2920,6 +2920,42 @@ func (c *Client) DeleteSemaphore(ctx context.Context, filter services.SemaphoreF
29202920
return nil
29212921
}
29222922

2923+
// UpsertKubeService is used by kubernetes services to report their presence
2924+
// to other auth servers in form of hearbeat expiring after ttl period.
2925+
func (c *Client) UpsertKubeService(s services.Server) error {
2926+
data, err := services.GetServerMarshaler().MarshalServer(s)
2927+
if err != nil {
2928+
return trace.Wrap(err)
2929+
}
2930+
args := &upsertServerRawReq{
2931+
Server: data,
2932+
}
2933+
_, err = c.PostJSON(c.Endpoint("kube_services"), args)
2934+
return trace.Wrap(err)
2935+
}
2936+
2937+
// GetKubeServices returns the list of kubernetes services registered in the
2938+
// cluster.
2939+
func (c *Client) GetKubeServices() ([]services.Server, error) {
2940+
out, err := c.Get(c.Endpoint("kube_services"), url.Values{})
2941+
if err != nil {
2942+
return nil, trace.Wrap(err)
2943+
}
2944+
var items []json.RawMessage
2945+
if err := json.Unmarshal(out.Bytes(), &items); err != nil {
2946+
return nil, trace.Wrap(err)
2947+
}
2948+
re := make([]services.Server, len(items))
2949+
for i, raw := range items {
2950+
server, err := services.GetServerMarshaler().UnmarshalServer(raw, services.KindKubeService, services.SkipValidation())
2951+
if err != nil {
2952+
return nil, trace.Wrap(err)
2953+
}
2954+
re[i] = server
2955+
}
2956+
return re, nil
2957+
}
2958+
29232959
// WebService implements features used by Web UI clients
29242960
type WebService interface {
29252961
// GetWebSessionInfo checks if a web sesion is valid, returns session id in case if

Diff for: lib/auth/permissions.go

+17
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,23 @@ func GetCheckerForBuiltinRole(clusterName string, clusterConfig services.Cluster
416416
Rules: []services.Rule{},
417417
},
418418
})
419+
case teleport.RoleKube:
420+
return services.FromSpec(
421+
role.String(),
422+
services.RoleSpecV3{
423+
Allow: services.RoleConditions{
424+
Namespaces: []string{services.Wildcard},
425+
Rules: []services.Rule{
426+
services.NewRule(services.KindKubeService, services.RW()),
427+
services.NewRule(services.KindEvent, services.RW()),
428+
services.NewRule(services.KindCertAuthority, services.ReadNoSecrets()),
429+
services.NewRule(services.KindClusterConfig, services.RO()),
430+
services.NewRule(services.KindUser, services.RO()),
431+
services.NewRule(services.KindRole, services.RO()),
432+
services.NewRule(services.KindNamespace, services.RO()),
433+
},
434+
},
435+
})
419436
}
420437

421438
return nil, trace.NotFound("%v is not reconginzed", role.String())

Diff for: lib/cache/cache.go

+19
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,20 @@ func ForNode(cfg Config) Config {
9090
return cfg
9191
}
9292

93+
// ForKubernetes sets up watch configuration for a kubernetes service.
94+
func ForKubernetes(cfg Config) Config {
95+
cfg.Watches = []services.WatchKind{
96+
{Kind: services.KindCertAuthority, LoadSecrets: false},
97+
{Kind: services.KindClusterName},
98+
{Kind: services.KindClusterConfig},
99+
{Kind: services.KindUser},
100+
{Kind: services.KindRole},
101+
{Kind: services.KindNamespace, Name: defaults.Namespace},
102+
}
103+
cfg.QueueSize = defaults.KubernetesQueueSize
104+
return cfg
105+
}
106+
93107
// SetupConfigFn is a function that sets up configuration
94108
// for cache
95109
type SetupConfigFn func(c Config) Config
@@ -661,3 +675,8 @@ func (c *Cache) GetTunnelConnections(clusterName string, opts ...services.Marsha
661675
func (c *Cache) GetAllTunnelConnections(opts ...services.MarshalOption) (conns []services.TunnelConnection, err error) {
662676
return c.presenceCache.GetAllTunnelConnections(opts...)
663677
}
678+
679+
// GetKubeServices is a part of auth.AccessPoint implementation
680+
func (c *Cache) GetKubeServices() ([]services.Server, error) {
681+
return c.presenceCache.GetKubeServices()
682+
}

Diff for: lib/config/configuration.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -160,16 +160,16 @@ func ApplyFileConfig(fc *FileConfig, cfg *service.Config) error {
160160
return nil
161161
}
162162
// merge file-based config with defaults in 'cfg'
163-
if fc.Auth.Disabled() {
163+
if fc.Auth.Disabled(true) {
164164
cfg.Auth.Enabled = false
165165
}
166-
if fc.SSH.Disabled() {
166+
if fc.SSH.Disabled(true) {
167167
cfg.SSH.Enabled = false
168168
}
169-
if fc.Proxy.Disabled() {
169+
if fc.Proxy.Disabled(true) {
170170
cfg.Proxy.Enabled = false
171171
}
172-
if fc.Kube.Enabled() {
172+
if fc.Kube.Enabled(false) {
173173
cfg.Kube.Enabled = true
174174
}
175175
applyString(fc.NodeName, &cfg.Hostname)
@@ -323,25 +323,25 @@ func ApplyFileConfig(fc *FileConfig, cfg *service.Config) error {
323323

324324
// Apply configuration for "auth_service", "proxy_service", and
325325
// "ssh_service" if it's enabled.
326-
if fc.Auth.Enabled() {
326+
if fc.Auth.Enabled(true) {
327327
err = applyAuthConfig(fc, cfg)
328328
if err != nil {
329329
return trace.Wrap(err)
330330
}
331331
}
332-
if fc.Proxy.Enabled() {
332+
if fc.Proxy.Enabled(true) {
333333
err = applyProxyConfig(fc, cfg)
334334
if err != nil {
335335
return trace.Wrap(err)
336336
}
337337
}
338-
if fc.SSH.Enabled() {
338+
if fc.SSH.Enabled(true) {
339339
err = applySSHConfig(fc, cfg)
340340
if err != nil {
341341
return trace.Wrap(err)
342342
}
343343
}
344-
if fc.Kube.Enabled() {
344+
if fc.Kube.Enabled(false) {
345345
if err := applyKubeConfig(fc, cfg); err != nil {
346346
return trace.Wrap(err)
347347
}
@@ -540,7 +540,7 @@ func applyProxyConfig(fc *FileConfig, cfg *service.Config) error {
540540

541541
// apply kubernetes proxy config, by default kube proxy is disabled
542542
if fc.Proxy.Kube.Configured() {
543-
cfg.Proxy.Kube.Enabled = fc.Proxy.Kube.Enabled()
543+
cfg.Proxy.Kube.Enabled = fc.Proxy.Kube.Enabled(false)
544544
}
545545
if fc.Proxy.Kube.KubeconfigFile != "" {
546546
cfg.Proxy.Kube.KubeconfigPath = fc.Proxy.Kube.KubeconfigFile
@@ -653,8 +653,8 @@ func applySSHConfig(fc *FileConfig, cfg *service.Config) error {
653653

654654
// applyKubeConfig applies file configuration for the "kubernetes_service" section.
655655
func applyKubeConfig(fc *FileConfig, cfg *service.Config) error {
656-
if fc.Proxy.ListenAddress != "" {
657-
addr, err := utils.ParseHostPortAddr(fc.Proxy.ListenAddress, int(defaults.SSHProxyListenPort))
656+
if fc.Kube.ListenAddress != "" {
657+
addr, err := utils.ParseHostPortAddr(fc.Kube.ListenAddress, int(defaults.SSHProxyListenPort))
658658
if err != nil {
659659
return trace.Wrap(err)
660660
}

0 commit comments

Comments
 (0)