diff --git a/cmd/spire-server/cli/agent/agent_test.go b/cmd/spire-server/cli/agent/agent_test.go index 1cd3c14c5d..0f4fe12b79 100644 --- a/cmd/spire-server/cli/agent/agent_test.go +++ b/cmd/spire-server/cli/agent/agent_test.go @@ -20,7 +20,17 @@ import ( ) var ( - testAgents = []*types.Agent{{Id: &types.SPIFFEID{TrustDomain: "example.org", Path: "/spire/agent/agent1"}}} + testAgents = []*types.Agent{{Id: &types.SPIFFEID{TrustDomain: "example.org", Path: "/spire/agent/agent1"}}} + testAgentsWithSelectors = []*types.Agent{ + { + Id: &types.SPIFFEID{TrustDomain: "example.org", Path: "/spire/agent/agent2"}, + Selectors: []*types.Selector{ + {Type: "k8s_psat", Value: "agent_ns:spire"}, + {Type: "k8s_psat", Value: "agent_sa:spire-agent"}, + {Type: "k8s_psat", Value: "cluster:demo-cluster"}, + }, + }, + } ) type agentTest struct { @@ -205,6 +215,13 @@ func TestShow(t *testing.T) { expectedReturnCode: 1, expectedStderr: "Error: connection error: desc = \"transport: error while dialing: dial unix does-not-exist.sock: connect: no such file or directory\"\n", }, + { + name: "show selectors", + args: []string{"-spiffeID", "spiffe://example.org/spire/agent/agent2"}, + existentAgents: testAgentsWithSelectors, + expectedReturnCode: 0, + expectedStdout: "Selectors : k8s_psat:agent_ns:spire\nSelectors : k8s_psat:agent_sa:spire-agent\nSelectors : k8s_psat:cluster:demo-cluster", + }, } { tt := tt t.Run(tt.name, func(t *testing.T) { diff --git a/cmd/spire-server/cli/agent/show.go b/cmd/spire-server/cli/agent/show.go index e02a5e1c99..cb432037cd 100644 --- a/cmd/spire-server/cli/agent/show.go +++ b/cmd/spire-server/cli/agent/show.go @@ -62,6 +62,9 @@ func (c *showCommand) Run(ctx context.Context, env *common_cli.Env, serverClient return err } + for _, s := range agent.Selectors { + env.Printf("Selectors : %s:%s\n", s.Type, s.Value) + } return nil } diff --git a/cmd/spire-server/cli/healthcheck/healthcheck.go b/cmd/spire-server/cli/healthcheck/healthcheck.go index 44e3fd19a6..fec5d9a855 
100644 --- a/cmd/spire-server/cli/healthcheck/healthcheck.go +++ b/cmd/spire-server/cli/healthcheck/healthcheck.go @@ -83,6 +83,8 @@ func (c *healthCheckCommand) run() error { } return errors.New("cannot create registration client") } + defer client.Release() + bundleClient := client.NewBundleClient() // Currently using the ability to fetch a bundle as the health check. This diff --git a/cmd/spire-server/cli/run/run.go b/cmd/spire-server/cli/run/run.go index 8897b4a008..cd34b60b8c 100644 --- a/cmd/spire-server/cli/run/run.go +++ b/cmd/spire-server/cli/run/run.go @@ -48,7 +48,7 @@ var ( Organization: []string{"SPIFFE"}, } - defaultRateLimitAttestation = true + defaultRateLimit = true ) // Config contains all available configurables, arranged by section @@ -155,6 +155,7 @@ type federatesWithBundleEndpointConfig struct { type rateLimitConfig struct { Attestation *bool `hcl:"attestation"` + Signing *bool `hcl:"signing"` UnusedKeys []string `hcl:",unusedKeys"` } @@ -368,11 +369,17 @@ func NewServerConfig(c *Config, logOptions []log.Option, allowUnknownConfig bool sc.Log = logger if c.Server.RateLimit.Attestation == nil { - c.Server.RateLimit.Attestation = &defaultRateLimitAttestation + c.Server.RateLimit.Attestation = &defaultRateLimit } sc.RateLimit.Attestation = *c.Server.RateLimit.Attestation + if c.Server.RateLimit.Signing == nil { + c.Server.RateLimit.Signing = &defaultRateLimit + } + sc.RateLimit.Signing = *c.Server.RateLimit.Signing + sc.Experimental.AllowAgentlessNodeAttestors = c.Server.Experimental.AllowAgentlessNodeAttestors + if c.Server.Federation != nil { if c.Server.Federation.BundleEndpoint != nil { sc.Federation.BundleEndpoint = &bundle.EndpointConfig{ diff --git a/cmd/spire-server/cli/run/run_test.go b/cmd/spire-server/cli/run/run_test.go index 4f9aa9cf7a..14fe9aae9b 100644 --- a/cmd/spire-server/cli/run/run_test.go +++ b/cmd/spire-server/cli/run/run_test.go @@ -835,6 +835,34 @@ func TestNewServerConfig(t *testing.T) { require.True(t, 
c.RateLimit.Attestation) }, }, + { + msg: "signing rate limit is on by default", + input: func(c *Config) { + }, + test: func(t *testing.T, c *server.Config) { + require.True(t, c.RateLimit.Signing) + }, + }, + { + msg: "signing rate limit can be explicitly disabled", + input: func(c *Config) { + value := false + c.Server.RateLimit.Signing = &value + }, + test: func(t *testing.T, c *server.Config) { + require.False(t, c.RateLimit.Signing) + }, + }, + { + msg: "signing rate limit can be explicitly enabled", + input: func(c *Config) { + value := true + c.Server.RateLimit.Signing = &value + }, + test: func(t *testing.T, c *server.Config) { + require.True(t, c.RateLimit.Signing) + }, + }, } for _, testCase := range cases { diff --git a/conf/agent/agent_full.conf b/conf/agent/agent_full.conf index 65d6b2acb0..d776f7ac9d 100644 --- a/conf/agent/agent_full.conf +++ b/conf/agent/agent_full.conf @@ -1,5 +1,5 @@ # This is the SPIRE Agent configuration file including all possible configuration -# options. +# options. # agent: Contains core configuration parameters. agent { @@ -24,16 +24,16 @@ agent { # server_address: DNS name or IP address of the SPIRE server. server_address = "127.0.0.1" - + # server_port: Port number of the SPIRE server. server_port = "8081" - + # socket_path: Location to bind the workload API socket. Default: /tmp/agent.sock. socket_path = "/tmp/agent.sock" - + # trust_bundle_path: Path to the SPIRE server CA bundle. trust_bundle_path = "./conf/agent/dummy_root_ca.crt" - + # trust_bundle_url: URL to download the initial SPIRE server trust bundle. # trust_bundle_url = "" @@ -56,9 +56,9 @@ agent { # Each nested object has the following format: # # PluginType "plugin_name" { -# +# # # plugin_cmd: Path to the plugin implementation binary (optional, not -# # needed for built-ins) +# # needed for built-ins) # plugin_cmd = # # # plugin_checksum: An optional sha256 of the plugin binary (optional, @@ -149,7 +149,7 @@ plugins { # cluster: Name of the cluster. 
It must correspond to a cluster # configured in the server plugin. # cluster = "" - + # token_path: Path to the service account token on disk. # Default: /run/secrets/kubernetes.io/serviceaccount/token. # token_path = "/run/secrets/kubernetes.io/serviceaccount/token" @@ -180,7 +180,7 @@ plugins { # certificate_path: The path to the certificate bundle on disk. The # file must contain one or more PEM blocks, starting with the identity # certificate followed by any intermediate certificates necessary for - # chain-of-trust validation. + # chain-of-trust validation. # certificate_path = "" # intermediates_path: Optional. The path to a chain of intermediate @@ -204,7 +204,7 @@ plugins { # docker_version = "" } } - + # WorkloadAttestor "k8s": A workload attestor which allows selectors based # on Kubernetes constructs such ns (namespace) and sa (service account). WorkloadAttestor "k8s" { @@ -244,7 +244,7 @@ plugins { # node_name_env = "MY_NODE_NAME" # node_name: The name of the node. Overrides the value obtained by - # the environment variable specified by node_name_env. + # the environment variable specified by node_name_env. # node_name = "" } } @@ -276,7 +276,7 @@ plugins { # port = 9988 # } -# DogStatsd = [ +# DogStatsd = [ # # List of DogStatsd addresses. # { address = "localhost:8125" }, # { address = "collector.example.org:1337" }, @@ -301,7 +301,7 @@ plugins { # } # health_checks: If health checking is desired use this section to configure -# and expose an additional server endpoint for such purpose. +# and expose an additional agent endpoint for such purpose. # health_checks { # # listener_enabled: Enables health checks endpoint. # listener_enabled = true @@ -312,9 +312,9 @@ plugins { # # bind_port: HTTP Port number of the health checks endpoint. Default: 80. # # bind_port = "80" -# # live_path: HTTP resource path for checking server liveness. Default: /live. +# # live_path: HTTP resource path for checking agent liveness. Default: /live. 
# # live_path = "/live" -# # ready_path: HTTP resource path for checking server readiness. Default: /ready. +# # ready_path: HTTP resource path for checking agent readiness. Default: /ready. # # ready_path = "/ready" # } diff --git a/conf/server/server_full.conf b/conf/server/server_full.conf index e608a0fa2c..e711a626d8 100644 --- a/conf/server/server_full.conf +++ b/conf/server/server_full.conf @@ -105,6 +105,10 @@ server { # # Controls whether or not node attestation is rate limited to one # # attempt per-second per-IP. Default: true. # attestation = true + + # # Controls whether or not X509 and JWT signing are rate limited to 500 + # # requests per-second per-IP (separately). Default: true. + # signing = true # } # registration_uds_path: Location to bind the registration API socket. diff --git a/doc/plugin_agent_workloadattestor_k8s.md b/doc/plugin_agent_workloadattestor_k8s.md index fcf4a18a81..e759d88bae 100644 --- a/doc/plugin_agent_workloadattestor_k8s.md +++ b/doc/plugin_agent_workloadattestor_k8s.md @@ -1,6 +1,6 @@ # Agent plugin: WorkloadAttestor "k8s" -The `k8s` plugin generates kubernetes-based selectors for workloads calling the agent. +The `k8s` plugin generates Kubernetes-based selectors for workloads calling the agent. It does so by retrieving the workload's pod ID from its cgroup membership, then querying the kubelet for information about the pod. @@ -19,16 +19,17 @@ the kubelet is contacted over 127.0.0.1 (requires host networking to be enabled). In the latter case, the hostname is used to perform certificate server name validation against the kubelet certificate. -**Note** kubelet authentication via bearer token requires that the kubelet be -started with the `--authentication-token-webhook` flag. See [Kubelet authentication/authorization](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-authentication-authorization/) -for details. 
+> **Note** kubelet authentication via bearer token requires that the kubelet be +> started with the `--authentication-token-webhook` flag. +> See [Kubelet authentication/authorization](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-authentication-authorization/) +> for details. -**Note** The kubelet uses the TokenReview API to validate bearer tokens. This -requires reachability to the Kubernetes API server. Therefore API server downtime can -interrupt workload attestation. The `--authentication-token-webhook-cache-ttl` kubelet flag -controls how long the kubelet caches TokenReview responses and may help to -mitigate this issue. A large cache ttl value is not recommended however, as -that can impact permission revocation. +> **Note** The kubelet uses the TokenReview API to validate bearer tokens. +> This requires reachability to the Kubernetes API server. Therefore API server downtime can +> interrupt workload attestation. The `--authentication-token-webhook-cache-ttl` kubelet flag +> controls how long the kubelet caches TokenReview responses and may help to +> mitigate this issue. A large cache ttl value is not recommended however, as +> that can impact permission revocation. | Configuration | Description | | ------------- | ----------- | @@ -46,19 +47,23 @@ that can impact permission revocation. | -------- | ----- | | k8s:ns | The workload's namespace | | k8s:sa | The workload's service account | -| k8s:container-image | The image of the workload's container | +| k8s:container-image | The Image OR ImageID of the container in the workload's pod which is requesting an SVID, [as reported by K8S](https://pkg.go.dev/k8s.io/api/core/v1#ContainerStatus). 
Selector value may be an image tag, such as: `docker.io/envoyproxy/envoy-alpine:v1.16.0`, or a resolved SHA256 image digest, such as `docker.io/envoyproxy/envoy-alpine@sha256:bf862e5f5eca0a73e7e538224578c5cf867ce2be91b5eaed22afc153c00363eb` | | k8s:container-name | The name of the workload's container | | k8s:node-name | The name of the workload's node | -| k8s:pod-label | A label given to the the workload's pod | +| k8s:pod-label | A label given to the workload's pod | | k8s:pod-owner | The name of the workload's pod owner | | k8s:pod-owner-uid | The UID of the workload's pod owner | | k8s:pod-uid | The UID of the workload's pod | | k8s:pod-name | The name of the workload's pod | -| k8s:pod-image | An image of a container in workload's pod | +| k8s:pod-image | An Image OR ImageID of any container in the workload's pod, [as reported by K8S](https://pkg.go.dev/k8s.io/api/core/v1#ContainerStatus). Selector value may be an image tag, such as: `docker.io/envoyproxy/envoy-alpine:v1.16.0`, or a resolved SHA256 image digest, such as `docker.io/envoyproxy/envoy-alpine@sha256:bf862e5f5eca0a73e7e538224578c5cf867ce2be91b5eaed22afc153c00363eb`| | k8s:pod-image-count | The number of container images in workload's pod | -| k8s:pod-init-image | An image of an init container in workload's pod | +| k8s:pod-init-image | An Image OR ImageID of any init container in the workload's pod, [as reported by K8S](https://pkg.go.dev/k8s.io/api/core/v1#ContainerStatus). 
Selector value may be an image tag, such as: `docker.io/envoyproxy/envoy-alpine:v1.16.0`, or a resolved SHA256 image digest, such as `docker.io/envoyproxy/envoy-alpine@sha256:bf862e5f5eca0a73e7e538224578c5cf867ce2be91b5eaed22afc153c00363eb`| | k8s:pod-init-image-count | The number of init container images in workload's pod | +> **Note** `container-image` will ONLY match against the specific container in the pod that is contacting SPIRE on behalf of +> the pod, whereas `pod-image` and `pod-init-image` will match against ANY container or init container in the Pod, +> respectively. + ## Examples To use the kubelet read-only port: diff --git a/doc/plugin_server_nodeattestor_aws_iid.md b/doc/plugin_server_nodeattestor_aws_iid.md index 5090676734..c2f83924f0 100644 --- a/doc/plugin_server_nodeattestor_aws_iid.md +++ b/doc/plugin_server_nodeattestor_aws_iid.md @@ -14,6 +14,7 @@ this plugin resolves the agent's AWS IID-based SPIFFE ID into a set of selectors | `access_key_id` | AWS access key id | Value of `AWS_ACCESS_KEY_ID` environment variable | | `secret_access_key` | AWS secret access key | Value of `AWS_SECRET_ACCESS_KEY` environment variable | | `skip_block_device` | Skip anti-tampering mechanism which checks to make sure that the underlying root volume has not been detached prior to attestation. | false | +| `disable_instance_profile_selectors` | Disables retrieving the attesting instance profile information that is used in the selectors. Useful in cases where the server cannot reach iam.amazonaws.com | false | A sample configuration: @@ -25,6 +26,12 @@ A sample configuration: } } ``` + +## Disabling Instance Profile Selectors +In cases where spire-server is running in a location with no public internet access available, setting `disable_instance_profile_selectors = true` will prevent the server from making requests to `iam.amazonaws.com`. This is needed as spire-server will fail to attest nodes as it cannot retrieve the metadata information. 
+ +When this is enabled, `IAM Role` selector information will no longer be available for use. + ## AWS IAM Permissions The user or role identified by the configured credentials must have permissions for `ec2:DescribeInstances`. @@ -58,7 +65,7 @@ This plugin generates the following selectors related to the instance where the All of the selectors have the type `aws_iid`. -The `IAM role` selector is included in the generated set of selectors only if the instance has an IAM Instance Profile associated. +The `IAM role` selector is included in the generated set of selectors only if the instance has an IAM Instance Profile associated and `disable_instance_profile_selectors = false` ## Security Considerations The AWS Instance Identity Document, which this attestor leverages to prove node identity, is available to any process running on the node by default. As a result, it is possible for non-agent code running on a node to attest to the SPIRE Server, allowing it to obtain any workload identity that the node is authorized to run. diff --git a/doc/plugin_server_notifier_k8sbundle.md b/doc/plugin_server_notifier_k8sbundle.md index a695a84622..d8add7200b 100644 --- a/doc/plugin_server_notifier_k8sbundle.md +++ b/doc/plugin_server_notifier_k8sbundle.md @@ -14,6 +14,7 @@ The plugin accepts the following configuration options: | config_map | The name of the ConfigMap | `spire-bundle` | | config_map_key | The key within the ConfigMap for the bundle | `bundle.crt` | | kube_config_file_path | The path on disk to the kubeconfig containing configuration to enable interaction with the Kubernetes API server. If unset, it is assumed the notifier is in-cluster and in-cluster credentials will be used. | | +| webhook_label | If set, rotate the CA Bundle in validating and mutating webhooks with this label set to `true`. | | ## Configuring Kubernetes @@ -22,6 +23,7 @@ The following actions are required to set up the plugin. 
- Bind ClusterRole or Role that can `get` and `patch` the ConfigMap to Service Account - In the case of in-cluster SPIRE server, it is Service Account that runs the SPIRE server - In the case of out-of-cluster SPIRE server, it is Service Account that interacts with the Kubernetes API server + - In the case of setting `webhook_label`, the ClusterRole additionally needs permissions to `get`, `list`, `patch`, and `watch` `mutatingwebhookconfigurations` and `validatingwebhookconfigurations`. - Create the ConfigMap that the plugin pushes For example: @@ -62,6 +64,23 @@ metadata: namespace: spire ``` +### Configuration when Rotating Webhook CA Bundles +When rotating webhook CA bundles, use the below ClusterRole: + +```yaml +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: spire-server-cluster-role +rules: +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "patch"] +- apiGroups: ["admissionregistration.k8s.io"] + resources: ["mutatingwebhookconfigurations", "validatingwebhookconfigurations"] + verbs: ["get", "list", "patch", "watch"] +``` + ## Sample configurations ### Default In-Cluster @@ -92,3 +111,17 @@ the credentials found in the `/path/to/kubeconfig` file. } } ``` + +### Default In-Cluster with Webhook Rotation +The following configuration pushes bundle contents from an in-cluster SPIRE +server to +- The `bundle.crt` key in the `spire:spire-bundle` ConfigMap +- Validating and mutating webhooks with a label of `spiffe.io/webhook: true` + +``` + Notifier "k8sbundle" { + plugin_data { + webhook_label = "spiffe.io/webhook" + } + } +``` diff --git a/doc/spire_agent.md b/doc/spire_agent.md index 5864609d92..04021160e1 100644 --- a/doc/spire_agent.md +++ b/doc/spire_agent.md @@ -109,7 +109,7 @@ The agent can expose additional endpoint that can be used for health checking. 
I health_checks { listener_enabled = true bind_address = "localhost" - bind_port = "80" + bind_port = "8080" live_path = "/live" ready_path = "/ready" } diff --git a/doc/spire_server.md b/doc/spire_server.md index 50a80f9f88..620e2d262a 100644 --- a/doc/spire_server.md +++ b/doc/spire_server.md @@ -75,6 +75,7 @@ This may be useful for templating configuration files, for example across differ | ratelimit | Description | Default | |:----------------------------|--------------------------------|----------------| | `attestation` | Whether or not to rate limit node attestation. If true, node attestation is rate limited to one attempt per second per IP address. | true | +| `signing` | Whether or not to rate limit JWT and X509 signing. If true, JWT and X509 signing are rate limited to 500 requests per second per IP address (separately). | true | ## Plugin configuration @@ -185,7 +186,7 @@ The server can expose an additional endpoint that can be used for health checkin health_checks { listener_enabled = true bind_address = "localhost" - bind_port = "80" + bind_port = "8080" live_path = "/live" ready_path = "/ready" } diff --git a/doc/telemetry.md b/doc/telemetry.md index a7c2862567..afbafe5a0b 100644 --- a/doc/telemetry.md +++ b/doc/telemetry.md @@ -74,7 +74,8 @@ The following metrics are emitted: | Counter | `server_ca`, `sign`, `x509_ca_svid` | | The CA has successfully signed an X.509 CA SVID. | Counter | `server_ca`, `sign`, `x509_svid` | | The CA has successfully signed an X.509 SVID. | Call Counter | `svid`, `rotate` | | The Server's SVID is being rotated. -| Gauge | `started` | `version` | | The version of the Server. +| Gauge | `started` | `version` | The version of the Server. +| Gauge | `uptime_in_ms` | | The uptime of the Server in milliseconds. ## SPIRE Agent @@ -99,5 +100,6 @@ The following metrics are emitted: | Call Counter | `workload_api`, `workload_attestation` | | The Workload API is performing a workload attestation. 
| Call Counter | `workload_api`, `workload_attestor` | `attestor` | The Workload API is invoking a given attestor. | Gauge | `started` | `version` | The version of the Agent. +| Gauge | `uptime_in_ms` | | The uptime of the Agent in milliseconds. Note: These are the keys and labels that SPIRE emits, but the format of the metric once ingested could vary depending on the metric collector. E.g. once in StatsD, the metric emitted when rotating an Agent SVID (`agent_svid`, `rotate`) can be found as `spire_agent_agent_svid_rotate_internal_host-agent-0`, where `host-agent-0` is the hostname and `spire-agent` is the service name. diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 171953f8ed..657db0999d 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -2,6 +2,7 @@ package agent import ( "context" + "errors" "fmt" "net/http" _ "net/http/pprof" //nolint: gosec // import registers routes on DefaultServeMux @@ -9,9 +10,9 @@ import ( "path" "runtime" "sync" - "time" "github.com/spiffe/go-spiffe/v2/spiffeid" + api_workload "github.com/spiffe/spire/api/workload" admin_api "github.com/spiffe/spire/pkg/agent/api" node_attestor "github.com/spiffe/spire/pkg/agent/attestor/node" workload_attestor "github.com/spiffe/spire/pkg/agent/attestor/workload" @@ -30,6 +31,8 @@ import ( "github.com/spiffe/spire/proto/spire/api/server/bundle/v1" _ "golang.org/x/net/trace" // registers handlers on the DefaultServeMux "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type Agent struct { @@ -66,6 +69,7 @@ func (a *Agent) Run(ctx context.Context) error { }) telemetry.EmitVersion(metrics) + uptime.ReportMetrics(ctx, metrics) cat, err := catalog.Load(ctx, catalog.Config{ Log: a.c.Log.WithField(telemetry.SubsystemName, telemetry.Catalog), @@ -97,7 +101,7 @@ func (a *Agent) Run(ctx context.Context) error { endpoints := a.newEndpoints(cat, metrics, manager) - if err := healthChecks.AddCheck("agent", a, time.Minute); err != nil { + if err := 
healthChecks.AddCheck("agent", a); err != nil { return fmt.Errorf("failed adding healthcheck: %v", err) } @@ -261,5 +265,23 @@ func (a *Agent) agentSVIDPath() string { // Status is used as a top-level health check for the Agent. func (a *Agent) Status() (interface{}, error) { - return nil, nil + client := api_workload.NewX509Client(&api_workload.X509ClientConfig{ + Addr: a.c.BindAddress, + FailOnError: true, + }) + defer client.Stop() + + errCh := make(chan error, 1) + go func() { + errCh <- client.Start() + }() + + err := <-errCh + if status.Code(err) == codes.Unavailable { + return nil, errors.New("workload api is unavailable") //nolint: golint // error is (ab)used for CLI output + } + + return health.Details{ + Message: "successfully created a workload api client to fetch x509 svid", + }, nil } diff --git a/pkg/agent/endpoints/sdsv2/handler.go b/pkg/agent/endpoints/sdsv2/handler.go index 4f603032be..99a19386f5 100644 --- a/pkg/agent/endpoints/sdsv2/handler.go +++ b/pkg/agent/endpoints/sdsv2/handler.go @@ -243,52 +243,62 @@ func (h *Handler) buildResponse(versionInfo string, req *api_v2.DiscoveryRequest names[name] = true } } + returnAllEntries := len(names) == 0 // TODO: verify the type url if upd.Bundle != nil { switch { - case len(names) == 0 || names[upd.Bundle.TrustDomainID()]: + case returnAllEntries || names[upd.Bundle.TrustDomainID()]: validationContext, err := buildValidationContext(upd.Bundle, "") if err != nil { return nil, err } + delete(names, upd.Bundle.TrustDomainID()) resp.Resources = append(resp.Resources, validationContext) case names[h.c.DefaultBundleName]: validationContext, err := buildValidationContext(upd.Bundle, h.c.DefaultBundleName) if err != nil { return nil, err } + delete(names, h.c.DefaultBundleName) resp.Resources = append(resp.Resources, validationContext) } } for _, federatedBundle := range upd.FederatedBundles { - if len(names) == 0 || names[federatedBundle.TrustDomainID()] { + if returnAllEntries || 
names[federatedBundle.TrustDomainID()] { validationContext, err := buildValidationContext(federatedBundle, "") if err != nil { return nil, err } + delete(names, federatedBundle.TrustDomainID()) resp.Resources = append(resp.Resources, validationContext) } } for i, identity := range upd.Identities { switch { - case len(names) == 0 || names[identity.Entry.SpiffeId]: + case returnAllEntries || names[identity.Entry.SpiffeId]: tlsCertificate, err := buildTLSCertificate(identity, "") if err != nil { return nil, err } + delete(names, identity.Entry.SpiffeId) resp.Resources = append(resp.Resources, tlsCertificate) case i == 0 && names[h.c.DefaultSVIDName]: tlsCertificate, err := buildTLSCertificate(identity, h.c.DefaultSVIDName) if err != nil { return nil, err } + delete(names, h.c.DefaultSVIDName) resp.Resources = append(resp.Resources, tlsCertificate) } } + if len(names) > 0 { + return nil, errs.New("unable to retrieve all requested identities, missing %v", names) + } + return resp, nil } diff --git a/pkg/agent/endpoints/sdsv2/handler_test.go b/pkg/agent/endpoints/sdsv2/handler_test.go index e3e618e416..95b5804602 100644 --- a/pkg/agent/endpoints/sdsv2/handler_test.go +++ b/pkg/agent/endpoints/sdsv2/handler_test.go @@ -287,9 +287,8 @@ func (s *HandlerSuite) TestStreamSecretsUnknownResource() { s.sendAndWait(stream, &api_v2.DiscoveryRequest{ ResourceNames: []string{"spiffe://domain.test/WHATEVER"}, }) - resp, err := stream.Recv() - s.Require().NoError(err) - s.requireSecrets(resp) + _, err = stream.Recv() + s.Require().Error(err) } func (s *HandlerSuite) TestStreamSecretsStreaming() { @@ -479,14 +478,10 @@ func (s *HandlerSuite) TestFetchSecrets() { s.requireSecrets(resp, workloadTLSCertificate1) // Fetch non-existent resource - resp, err = s.handler.FetchSecrets(context.Background(), &api_v2.DiscoveryRequest{ + _, err = s.handler.FetchSecrets(context.Background(), &api_v2.DiscoveryRequest{ ResourceNames: []string{"spiffe://domain.test/other"}, }) - 
s.Require().NoError(err) - s.Require().NotNil(resp) - s.Require().Empty(resp.VersionInfo) - s.Require().Empty(resp.Nonce) - s.requireSecrets(resp) + s.Require().Error(err) } func (s *HandlerSuite) setWorkloadUpdate(workloadCert *x509.Certificate) { diff --git a/pkg/agent/endpoints/sdsv3/handler.go b/pkg/agent/endpoints/sdsv3/handler.go index ae0dedbc39..dc947ce9cf 100644 --- a/pkg/agent/endpoints/sdsv3/handler.go +++ b/pkg/agent/endpoints/sdsv3/handler.go @@ -241,52 +241,62 @@ func (h *Handler) buildResponse(versionInfo string, req *discovery_v3.DiscoveryR names[name] = true } } + returnAllEntries := len(names) == 0 // TODO: verify the type url if upd.Bundle != nil { switch { - case len(names) == 0 || names[upd.Bundle.TrustDomainID()]: + case returnAllEntries || names[upd.Bundle.TrustDomainID()]: validationContext, err := buildValidationContext(upd.Bundle, "") if err != nil { return nil, err } + delete(names, upd.Bundle.TrustDomainID()) resp.Resources = append(resp.Resources, validationContext) case names[h.c.DefaultBundleName]: validationContext, err := buildValidationContext(upd.Bundle, h.c.DefaultBundleName) if err != nil { return nil, err } + delete(names, h.c.DefaultBundleName) resp.Resources = append(resp.Resources, validationContext) } } for _, federatedBundle := range upd.FederatedBundles { - if len(names) == 0 || names[federatedBundle.TrustDomainID()] { + if returnAllEntries || names[federatedBundle.TrustDomainID()] { validationContext, err := buildValidationContext(federatedBundle, "") if err != nil { return nil, err } + delete(names, federatedBundle.TrustDomainID()) resp.Resources = append(resp.Resources, validationContext) } } for i, identity := range upd.Identities { switch { - case len(names) == 0 || names[identity.Entry.SpiffeId]: + case returnAllEntries || names[identity.Entry.SpiffeId]: tlsCertificate, err := buildTLSCertificate(identity, "") if err != nil { return nil, err } + delete(names, identity.Entry.SpiffeId) resp.Resources = 
append(resp.Resources, tlsCertificate) case i == 0 && names[h.c.DefaultSVIDName]: tlsCertificate, err := buildTLSCertificate(identity, h.c.DefaultSVIDName) if err != nil { return nil, err } + delete(names, h.c.DefaultSVIDName) resp.Resources = append(resp.Resources, tlsCertificate) } } + if len(names) > 0 { + return nil, errs.New("unable to retrieve all requested identities, missing %v", names) + } + return resp, nil } diff --git a/pkg/agent/endpoints/sdsv3/handler_test.go b/pkg/agent/endpoints/sdsv3/handler_test.go index 805a6cdf5b..60410005a3 100644 --- a/pkg/agent/endpoints/sdsv3/handler_test.go +++ b/pkg/agent/endpoints/sdsv3/handler_test.go @@ -287,9 +287,8 @@ func (s *HandlerSuite) TestStreamSecretsUnknownResource() { s.sendAndWait(stream, &discovery_v3.DiscoveryRequest{ ResourceNames: []string{"spiffe://domain.test/WHATEVER"}, }) - resp, err := stream.Recv() - s.Require().NoError(err) - s.requireSecrets(resp) + _, err = stream.Recv() + s.Require().Error(err) } func (s *HandlerSuite) TestStreamSecretsStreaming() { @@ -479,14 +478,10 @@ func (s *HandlerSuite) TestFetchSecrets() { s.requireSecrets(resp, workloadTLSCertificate1) // Fetch non-existent resource - resp, err = s.handler.FetchSecrets(context.Background(), &discovery_v3.DiscoveryRequest{ + _, err = s.handler.FetchSecrets(context.Background(), &discovery_v3.DiscoveryRequest{ ResourceNames: []string{"spiffe://domain.test/other"}, }) - s.Require().NoError(err) - s.Require().NotNil(resp) - s.Require().Empty(resp.VersionInfo) - s.Require().Empty(resp.Nonce) - s.requireSecrets(resp) + s.Require().Error(err) } func (s *HandlerSuite) setWorkloadUpdate(workloadCert *x509.Certificate) { diff --git a/pkg/agent/manager/sync.go b/pkg/agent/manager/sync.go index d395089090..a32dd17507 100644 --- a/pkg/agent/manager/sync.go +++ b/pkg/agent/manager/sync.go @@ -15,7 +15,7 @@ import ( "github.com/spiffe/spire/pkg/common/telemetry" telemetry_agent "github.com/spiffe/spire/pkg/common/telemetry/agent" 
"github.com/spiffe/spire/pkg/common/util" - "github.com/spiffe/spire/proto/spire/api/node" + "github.com/spiffe/spire/pkg/server/api/limits" "github.com/spiffe/spire/proto/spire/common" ) @@ -76,11 +76,11 @@ func (m *manager) synchronize(ctx context.Context) (err error) { if len(staleEntries) > 0 { m.c.Log.WithFields(logrus.Fields{ telemetry.Count: len(staleEntries), - telemetry.Limit: node.CSRLimit, + telemetry.Limit: limits.SignLimitPerIP, }).Debug("Renewing stale entries") for _, staleEntry := range staleEntries { // we've exceeded the CSR limit, don't make any more CSRs - if len(csrs) >= node.CSRLimit { + if len(csrs) >= limits.SignLimitPerIP { break } diff --git a/pkg/agent/plugin/workloadattestor/k8s/k8s.go b/pkg/agent/plugin/workloadattestor/k8s/k8s.go index 0005dc2cb8..700df99148 100644 --- a/pkg/agent/plugin/workloadattestor/k8s/k8s.go +++ b/pkg/agent/plugin/workloadattestor/k8s/k8s.go @@ -628,19 +628,28 @@ func lookUpContainerInPod(containerID string, status corev1.PodStatus) (*corev1. return nil, containerNotInPod } -func getPodImages(containerStatusArray []corev1.ContainerStatus) map[string]bool { +func getPodImageIdentifiers(containerStatusArray []corev1.ContainerStatus) map[string]bool { + // Map is used purely to exclude duplicate selectors, value is unused. podImages := make(map[string]bool) - - // collect container images + // Note that for each pod image we generate *2* matching selectors. + // This is to support matching against ImageID, which has a SHA + // docker.io/envoyproxy/envoy-alpine@sha256:bf862e5f5eca0a73e7e538224578c5cf867ce2be91b5eaed22afc153c00363eb + // as well as + // docker.io/envoyproxy/envoy-alpine:v1.16.0, which does not, + // while also maintaining backwards compatibility and allowing for dynamic workload registration (k8s operator) + // when the SHA is not yet known (e.g. 
before the image pull is initiated at workload creation time) + // More info here: https://github.com/spiffe/spire/issues/2026 for _, status := range containerStatusArray { podImages[status.ImageID] = true + podImages[status.Image] = true } return podImages } func getSelectorsFromPodInfo(pod *corev1.Pod, status *corev1.ContainerStatus) []*common.Selector { - podImages := getPodImages(pod.Status.ContainerStatuses) - podInitImages := getPodImages(pod.Status.InitContainerStatuses) + podImageIdentifiers := getPodImageIdentifiers(pod.Status.ContainerStatuses) + podInitImageIdentifiers := getPodImageIdentifiers(pod.Status.InitContainerStatuses) + containerImageIdentifiers := getPodImageIdentifiers([]corev1.ContainerStatus{*status}) selectors := []*common.Selector{ makeSelector("sa:%s", pod.Spec.ServiceAccountName), @@ -649,14 +658,17 @@ func getSelectorsFromPodInfo(pod *corev1.Pod, status *corev1.ContainerStatus) [] makeSelector("pod-uid:%s", pod.UID), makeSelector("pod-name:%s", pod.Name), makeSelector("container-name:%s", status.Name), - makeSelector("container-image:%s", status.Image), - makeSelector("pod-image-count:%s", strconv.Itoa(len(podImages))), - makeSelector("pod-init-image-count:%s", strconv.Itoa(len(podInitImages))), + makeSelector("pod-image-count:%s", strconv.Itoa(len(pod.Status.ContainerStatuses))), + makeSelector("pod-init-image-count:%s", strconv.Itoa(len(pod.Status.InitContainerStatuses))), + } + + for containerImage := range containerImageIdentifiers { + selectors = append(selectors, makeSelector("container-image:%s", containerImage)) } - for podImage := range podImages { + for podImage := range podImageIdentifiers { selectors = append(selectors, makeSelector("pod-image:%s", podImage)) } - for podInitImage := range podInitImages { + for podInitImage := range podInitImageIdentifiers { selectors = append(selectors, makeSelector("pod-init-image:%s", podInitImage)) } diff --git a/pkg/agent/plugin/workloadattestor/k8s/k8s_test.go 
b/pkg/agent/plugin/workloadattestor/k8s/k8s_test.go index 2bc5f2fe6f..f626e14088 100644 --- a/pkg/agent/plugin/workloadattestor/k8s/k8s_test.go +++ b/pkg/agent/plugin/workloadattestor/k8s/k8s_test.go @@ -67,6 +67,7 @@ FwOGLt+I3+9beT0vo+pn9Rq0squewFYe3aJbwpkyfP2xOovQCdm4PC8y `)) testPodSelectors = []*common.Selector{ + {Type: "k8s", Value: "container-image:docker-pullable://localhost/spiffe/blog@sha256:0cfdaced91cb46dd7af48309799a3c351e4ca2d5e1ee9737ca0cbd932cb79898"}, {Type: "k8s", Value: "container-image:localhost/spiffe/blog:latest"}, {Type: "k8s", Value: "container-name:blog"}, {Type: "k8s", Value: "node-name:k8s-node-1"}, @@ -74,6 +75,8 @@ FwOGLt+I3+9beT0vo+pn9Rq0squewFYe3aJbwpkyfP2xOovQCdm4PC8y {Type: "k8s", Value: "pod-image-count:2"}, {Type: "k8s", Value: "pod-image:docker-pullable://localhost/spiffe/blog@sha256:0cfdaced91cb46dd7af48309799a3c351e4ca2d5e1ee9737ca0cbd932cb79898"}, {Type: "k8s", Value: "pod-image:docker-pullable://localhost/spiffe/ghostunnel@sha256:b2fc20676c92a433b9a91f3f4535faddec0c2c3613849ac12f02c1d5cfcd4c3a"}, + {Type: "k8s", Value: "pod-image:localhost/spiffe/blog:latest"}, + {Type: "k8s", Value: "pod-image:localhost/spiffe/ghostunnel:latest"}, {Type: "k8s", Value: "pod-init-image-count:0"}, {Type: "k8s", Value: "pod-label:k8s-app:blog"}, {Type: "k8s", Value: "pod-label:version:v0"}, @@ -86,10 +89,12 @@ FwOGLt+I3+9beT0vo+pn9Rq0squewFYe3aJbwpkyfP2xOovQCdm4PC8y testKindPodSelectors = []*common.Selector{ {Type: "k8s", Value: "container-image:gcr.io/spiffe-io/spire-agent:0.8.1"}, + {Type: "k8s", Value: "container-image:gcr.io/spiffe-io/spire-agent@sha256:1e4c481d76e9ecbd3d8684891e0e46aa021a30920ca04936e1fdcc552747d941"}, {Type: "k8s", Value: "container-name:workload-api-client"}, {Type: "k8s", Value: "node-name:kind-control-plane"}, {Type: "k8s", Value: "ns:default"}, {Type: "k8s", Value: "pod-image-count:1"}, + {Type: "k8s", Value: "pod-image:gcr.io/spiffe-io/spire-agent:0.8.1"}, {Type: "k8s", Value: 
"pod-image:gcr.io/spiffe-io/spire-agent@sha256:1e4c481d76e9ecbd3d8684891e0e46aa021a30920ca04936e1fdcc552747d941"}, {Type: "k8s", Value: "pod-init-image-count:0"}, {Type: "k8s", Value: "pod-label:app:sample-workload"}, @@ -102,14 +107,17 @@ FwOGLt+I3+9beT0vo+pn9Rq0squewFYe3aJbwpkyfP2xOovQCdm4PC8y } testInitPodSelectors = []*common.Selector{ + {Type: "k8s", Value: "container-image:docker-pullable://quay.io/coreos/flannel@sha256:1b401bf0c30bada9a539389c3be652b58fe38463361edf488e6543c8761d4970"}, {Type: "k8s", Value: "container-image:quay.io/coreos/flannel:v0.9.0-amd64"}, {Type: "k8s", Value: "container-name:install-cni"}, {Type: "k8s", Value: "node-name:k8s-node-1"}, {Type: "k8s", Value: "ns:kube-system"}, {Type: "k8s", Value: "pod-image-count:1"}, {Type: "k8s", Value: "pod-image:docker-pullable://quay.io/coreos/flannel@sha256:1b401bf0c30bada9a539389c3be652b58fe38463361edf488e6543c8761d4970"}, + {Type: "k8s", Value: "pod-image:quay.io/coreos/flannel:v0.9.0-amd64"}, {Type: "k8s", Value: "pod-init-image-count:1"}, {Type: "k8s", Value: "pod-init-image:docker-pullable://quay.io/coreos/flannel@sha256:1b401bf0c30bada9a539389c3be652b58fe38463361edf488e6543c8761d4970"}, + {Type: "k8s", Value: "pod-init-image:quay.io/coreos/flannel:v0.9.0-amd64"}, {Type: "k8s", Value: "pod-label:app:flannel"}, {Type: "k8s", Value: "pod-label:controller-revision-hash:1846323910"}, {Type: "k8s", Value: "pod-label:pod-template-generation:1"}, diff --git a/pkg/agent/plugin/workloadattestor/unix/unix.go b/pkg/agent/plugin/workloadattestor/unix/unix.go index 287d1d53f4..0d692372fa 100644 --- a/pkg/agent/plugin/workloadattestor/unix/unix.go +++ b/pkg/agent/plugin/workloadattestor/unix/unix.go @@ -15,6 +15,7 @@ import ( "strings" "sync" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/hcl" "github.com/shirou/gopsutil/process" "github.com/spiffe/spire/pkg/agent/plugin/workloadattestor" @@ -104,6 +105,7 @@ type Plugin struct { mu sync.Mutex config *Configuration + log hclog.Logger // hooks for 
tests hooks struct { @@ -121,6 +123,10 @@ func New() *Plugin { return p } +func (p *Plugin) SetLogger(log hclog.Logger) { + p.log = log +} + func (p *Plugin) Attest(ctx context.Context, req *workloadattestor.AttestRequest) (*workloadattestor.AttestResponse, error) { config, err := p.getConfig() if err != nil { @@ -241,6 +247,7 @@ func (p *Plugin) getUID(proc processInfo) (string, error) { func (p *Plugin) getUserName(uid string) (string, bool) { u, err := p.hooks.lookupUserByID(uid) if err != nil { + p.log.Warn("Failed to lookup user name by uid", "uid", uid, "error", err) return "", false } return u.Username, true @@ -265,6 +272,7 @@ func (p *Plugin) getGID(proc processInfo) (string, error) { func (p *Plugin) getGroupName(gid string) (string, bool) { g, err := p.hooks.lookupGroupByID(gid) if err != nil { + p.log.Warn("Failed to lookup group name by gid", "gid", gid, "error", err) return "", false } return g.Name, true diff --git a/pkg/agent/plugin/workloadattestor/unix/unix_test.go b/pkg/agent/plugin/workloadattestor/unix/unix_test.go index aeba3e6356..c96e0ae1f5 100644 --- a/pkg/agent/plugin/workloadattestor/unix/unix_test.go +++ b/pkg/agent/plugin/workloadattestor/unix/unix_test.go @@ -9,7 +9,10 @@ import ( "strconv" "testing" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" "github.com/spiffe/spire/pkg/agent/plugin/workloadattestor" + "github.com/spiffe/spire/pkg/common/telemetry" spi "github.com/spiffe/spire/proto/spire/common/plugin" "github.com/spiffe/spire/test/spiretest" "github.com/stretchr/testify/require" @@ -27,8 +30,9 @@ func TestPlugin(t *testing.T) { type Suite struct { spiretest.Suite - dir string - p workloadattestor.Plugin + dir string + p workloadattestor.Plugin + logHook *test.Hook } func (s *Suite) SetupTest() { @@ -40,18 +44,21 @@ func (s *Suite) SetupTest() { } p.hooks.lookupUserByID = fakeLookupUserByID p.hooks.lookupGroupByID = fakeLookupGroupByID - s.LoadPlugin(builtin(p), &s.p) + log, hook := test.NewNullLogger() 
+ s.logHook = hook + s.LoadPlugin(builtin(p), &s.p, spiretest.Logger(log)) s.configure("") } func (s *Suite) TestAttest() { testCases := []struct { - name string - pid int32 - err string - selectors []string - config string + name string + pid int32 + err string + selectors []string + config string + expectedLogs []spiretest.LogEntry }{ { name: "pid with no uids", @@ -71,6 +78,17 @@ func (s *Suite) TestAttest() { "gid:2000", "group:g2000", }, + expectedLogs: []spiretest.LogEntry{ + { + Level: logrus.WarnLevel, + Message: "Failed to lookup user name by uid", + Data: logrus.Fields{ + "uid": "1999", + logrus.ErrorKey: "no user with UID 1999", + telemetry.SubsystemName: "built-in_plugin.unix", + }, + }, + }, }, { name: "pid with no gids", @@ -90,6 +108,17 @@ func (s *Suite) TestAttest() { "user:u1000", "gid:2999", }, + expectedLogs: []spiretest.LogEntry{ + { + Level: logrus.WarnLevel, + Message: "Failed to lookup group name by gid", + Data: logrus.Fields{ + "gid": "2999", + logrus.ErrorKey: "no group with GID 2999", + telemetry.SubsystemName: "built-in_plugin.unix", + }, + }, + }, }, { name: "primary user and gid", @@ -198,10 +227,12 @@ func (s *Suite) TestAttest() { for _, testCase := range testCases { testCase := testCase s.T().Run(testCase.name, func(t *testing.T) { + defer s.logHook.Reset() s.configure(testCase.config) resp, err := s.p.Attest(ctx, &workloadattestor.AttestRequest{ Pid: testCase.pid, }) + if testCase.err != "" { spiretest.RequireGRPCStatus(t, err, codes.Unknown, testCase.err) require.Nil(t, resp) @@ -215,7 +246,9 @@ func (s *Suite) TestAttest() { require.Equal(t, "unix", selector.Type) selectors = append(selectors, selector.Value) } + require.Equal(t, testCase.selectors, selectors) + spiretest.AssertLogs(t, s.logHook.AllEntries(), testCase.expectedLogs) }) } } diff --git a/pkg/agent/svid/rotator.go b/pkg/agent/svid/rotator.go index 1bef0b1310..d9f9cc3bae 100644 --- a/pkg/agent/svid/rotator.go +++ b/pkg/agent/svid/rotator.go @@ -67,6 +67,10 @@ func (r 
*rotator) runRotation(ctx context.Context) error { err := r.rotateSVID(ctx) switch { + case err != nil && rotationutil.X509Expired(r.clk.Now(), r.state.Value().(State).SVID[0]): + r.c.Log.WithError(err).Error("Could not rotate agent SVID") + // Since our X509 cert has expired, and we weren't able to carry out a rotation request, we're probably unrecoverable without re-attesting. + return fmt.Errorf("current SVID has already expired and rotation failed: %v", err) case err != nil && nodeutil.ShouldAgentReattest(err): r.c.Log.WithError(err).Error("Could not rotate agent SVID") return err diff --git a/pkg/common/cryptoutil/keys.go b/pkg/common/cryptoutil/keys.go index adac44b99c..57b2b41e7e 100644 --- a/pkg/common/cryptoutil/keys.go +++ b/pkg/common/cryptoutil/keys.go @@ -29,6 +29,9 @@ func RSAKeyMatches(privateKey *rsa.PrivateKey, publicKey *rsa.PublicKey) bool { } func GetPublicKey(ctx context.Context, km keymanager.KeyManager, keyID string) (crypto.PublicKey, error) { + ctx, cancel := context.WithTimeout(ctx, keymanager.RPCTimeout) + defer cancel() + resp, err := km.GetPublicKey(ctx, &keymanager.GetPublicKeyRequest{ KeyId: keyID, }) diff --git a/pkg/common/cryptoutil/signer.go b/pkg/common/cryptoutil/signer.go index 3b0a38916c..f1ae4ab79e 100644 --- a/pkg/common/cryptoutil/signer.go +++ b/pkg/common/cryptoutil/signer.go @@ -69,10 +69,16 @@ func (s *KeyManagerSigner) Sign(_ io.Reader, digest []byte, opts crypto.SignerOp // rand is purposefully ignored since it can't be communicated between // the plugin boundary. The crypto.Signer interface implies this is ok // when it says "possibly using entropy from rand". 
- return s.SignContext(context.Background(), digest, opts) + ctx, cancel := context.WithTimeout(context.Background(), keymanager.RPCTimeout) + defer cancel() + + return s.SignContext(ctx, digest, opts) } func GenerateKeyRaw(ctx context.Context, km keymanager.KeyManager, keyID string, keyType keymanager.KeyType) ([]byte, error) { + ctx, cancel := context.WithTimeout(ctx, keymanager.RPCTimeout) + defer cancel() + resp, err := km.GenerateKey(ctx, &keymanager.GenerateKeyRequest{ KeyId: keyID, KeyType: keyType, diff --git a/pkg/common/health/config.go b/pkg/common/health/config.go index 1c2ce2db5c..af3eea05b0 100644 --- a/pkg/common/health/config.go +++ b/pkg/common/health/config.go @@ -18,6 +18,21 @@ type Config struct { UnusedKeys []string `hcl:",unusedKeys"` } +// getAddress returns an address suitable for use as http.Server.Addr. +func (c *Config) getAddress() string { + host := "localhost" + if c.BindAddress != "" { + host = c.BindAddress + } + + port := "80" + if c.BindPort != "" { + port = c.BindPort + } + + return fmt.Sprintf("%s:%s", host, port) +} + // getReadyPath returns the configured value or a default func (c *Config) getReadyPath() string { if c.ReadyPath == "" { @@ -36,17 +51,7 @@ func (c *Config) getLivePath() string { return c.LivePath } -// getAddress returns an address suitable for use as http.Server.Addr. 
-func (c *Config) getAddress() string { - host := "localhost" - if c.BindAddress != "" { - host = c.BindAddress - } - - port := "80" - if c.BindPort != "" { - port = c.BindPort - } - - return fmt.Sprintf("%s:%s", host, port) +// Details are additional data to be used when the system is ready +type Details struct { + Message string `json:"message,omitempty"` } diff --git a/pkg/common/health/health.go b/pkg/common/health/health.go index 8b8ecd663c..7201bb0c01 100644 --- a/pkg/common/health/health.go +++ b/pkg/common/health/health.go @@ -12,6 +12,8 @@ import ( "github.com/spiffe/spire/pkg/common/telemetry" ) +const readyCheckInterval = time.Minute + // health.Checker is responsible for running health checks and serving the healthcheck HTTP paths type Checker struct { config Config @@ -46,20 +48,21 @@ func NewChecker(config Config, log logrus.FieldLogger) *Checker { } } - hc.StatusListener = &statusListener{} - hc.Logger = &logadapter{FieldLogger: log.WithField(telemetry.SubsystemName, "health")} + l := log.WithField(telemetry.SubsystemName, "health") + hc.StatusListener = &statusListener{log: l} + hc.Logger = &logadapter{FieldLogger: l} return &Checker{config: config, server: server, hc: hc, log: log} } -func (c *Checker) AddCheck(name string, checker health.ICheckable, interval time.Duration) error { +func (c *Checker) AddCheck(name string, checker health.ICheckable) error { c.mutex.Lock() defer c.mutex.Unlock() return c.hc.AddCheck(&health.Config{ Name: name, Checker: checker, - Interval: interval, + Interval: readyCheckInterval, Fatal: true, }) } diff --git a/pkg/common/telemetry/server/keymanager/wrapper.go b/pkg/common/telemetry/server/keymanager/wrapper.go index b2f53484a6..14bd40c320 100644 --- a/pkg/common/telemetry/server/keymanager/wrapper.go +++ b/pkg/common/telemetry/server/keymanager/wrapper.go @@ -22,6 +22,10 @@ func WithMetrics(km keymanager.KeyManager, metrics telemetry.Metrics) keymanager func (w serverKeyManagerWrapper) GenerateKey(ctx 
context.Context, req *keymanager.GenerateKeyRequest) (_ *keymanager.GenerateKeyResponse, err error) { callCounter := StartGenerateKeyCall(w.m) defer callCounter.Done(&err) + + ctx, cancel := context.WithTimeout(ctx, keymanager.RPCTimeout) + defer cancel() + return w.k.GenerateKey(ctx, req) } diff --git a/pkg/common/telemetry/uptime.go b/pkg/common/telemetry/uptime.go new file mode 100644 index 0000000000..72a0513e68 --- /dev/null +++ b/pkg/common/telemetry/uptime.go @@ -0,0 +1,5 @@ +package telemetry + +func EmitUptime(m Metrics, v float32) { + m.SetGauge([]string{"uptime_in_ms"}, v) +} diff --git a/pkg/common/uptime/uptime.go b/pkg/common/uptime/uptime.go index 41d19b875e..9c6231be58 100644 --- a/pkg/common/uptime/uptime.go +++ b/pkg/common/uptime/uptime.go @@ -1,9 +1,38 @@ package uptime -import "time" +import ( + "context" + "time" -var start = time.Now() + "github.com/andres-erbsen/clock" + "github.com/spiffe/spire/pkg/common/telemetry" +) + +// Report every 10 seconds. +const reportInterval = time.Second * 10 + +var ( + clk = clock.New() + start = clk.Now() +) func Uptime() time.Duration { - return time.Since(start) + return clk.Now().Sub(start) +} + +func reportMetrics(ctx context.Context, interval time.Duration, m telemetry.Metrics) { + t := clk.Ticker(interval) + defer t.Stop() + for { + telemetry.EmitUptime(m, float32(Uptime()/time.Millisecond)) + select { + case <-t.C: + case <-ctx.Done(): + return + } + } +} + +func ReportMetrics(ctx context.Context, metrics telemetry.Metrics) { + go reportMetrics(ctx, reportInterval, metrics) } diff --git a/pkg/common/uptime/uptime_test.go b/pkg/common/uptime/uptime_test.go new file mode 100644 index 0000000000..257d66ff87 --- /dev/null +++ b/pkg/common/uptime/uptime_test.go @@ -0,0 +1,44 @@ +package uptime + +import ( + "context" + "testing" + "time" + + "github.com/spiffe/spire/test/clock" + "github.com/spiffe/spire/test/fakes/fakemetrics" + "github.com/stretchr/testify/assert" +) + +func TestReportMetrics(t 
*testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + for _, tt := range []struct { + name string + reportInterval time.Duration + testUpTime time.Duration + expectedMetrics []fakemetrics.MetricItem + }{ + { + name: "report uptime metrics with 10 milliseconds interval", + reportInterval: 10 * time.Millisecond, + testUpTime: 200 * time.Millisecond, + expectedMetrics: []fakemetrics.MetricItem{ + {Type: fakemetrics.SetGaugeType, Key: []string{"uptime_in_ms"}, Val: 200}, + }, + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + metrics := fakemetrics.New() + // overwrite the variable to use mock clock. + clk = clock.NewMock(t) + start = clk.Now().Add(-tt.testUpTime) + reportMetrics(ctx, 0*time.Nanosecond, metrics) + assert.Equal(t, tt.expectedMetrics, metrics.AllMetrics()) + }) + } +} diff --git a/pkg/server/api/agent/v1/service.go b/pkg/server/api/agent/v1/service.go index 6b8951dda4..4a9860f4c0 100644 --- a/pkg/server/api/agent/v1/service.go +++ b/pkg/server/api/agent/v1/service.go @@ -532,7 +532,7 @@ func (s *Service) attestChallengeResponse(ctx context.Context, agentStream agent nodeAttestor, ok := s.cat.GetNodeAttestorNamed(attestorType) if !ok { - return nil, api.MakeErr(log, codes.FailedPrecondition, "could not find node attestor type", nil) + return nil, api.MakeErr(log, codes.FailedPrecondition, "error getting node attestor", fmt.Errorf("could not find node attestor type %q", attestorType)) } attestorStream, err := nodeAttestor.Attest(ctx) diff --git a/pkg/server/api/agent/v1/service_test.go b/pkg/server/api/agent/v1/service_test.go index 2772611a5d..19567ee168 100644 --- a/pkg/server/api/agent/v1/service_test.go +++ b/pkg/server/api/agent/v1/service_test.go @@ -1529,12 +1529,13 @@ func TestAttestAgent(t *testing.T) { name: "attest with bad attestor", request: getAttestAgentRequest("bad_type", []byte("payload_with_result"), testCsr), 
expectCode: codes.FailedPrecondition, - expectMsg: "could not find node attestor type", + expectMsg: "error getting node attestor: could not find node attestor type \"bad_type\"", expectLogs: []spiretest.LogEntry{ { Level: logrus.ErrorLevel, - Message: "Could not find node attestor type", + Message: "Error getting node attestor", Data: logrus.Fields{ + logrus.ErrorKey: "could not find node attestor type \"bad_type\"", telemetry.NodeAttestorType: "bad_type", }, }, diff --git a/pkg/server/api/limits/limits.go b/pkg/server/api/limits/limits.go new file mode 100644 index 0000000000..4cf0b1283f --- /dev/null +++ b/pkg/server/api/limits/limits.go @@ -0,0 +1,7 @@ +package limits + +const ( + AttestLimitPerIP = 1 + SignLimitPerIP = 500 + PushJWTKeyLimitPerIP = 500 +) diff --git a/pkg/server/ca/manager.go b/pkg/server/ca/manager.go index 5e347085ec..b117c7f131 100644 --- a/pkg/server/ca/manager.go +++ b/pkg/server/ca/manager.go @@ -689,6 +689,10 @@ func (m *Manager) loadJWTKeySlotFromEntry(ctx context.Context, entry *JWTKeyEntr func (m *Manager) makeSigner(ctx context.Context, keyID string) (crypto.Signer, error) { km := m.c.Catalog.GetKeyManager() + + ctx, cancel := context.WithTimeout(ctx, keymanager.RPCTimeout) + defer cancel() + resp, err := km.GetPublicKey(ctx, &keymanager.GetPublicKeyRequest{ KeyId: keyID, }) diff --git a/pkg/server/endpoints/bundle/acme_auth.go b/pkg/server/endpoints/bundle/acme_auth.go index b5286f86ed..6e1360c30f 100644 --- a/pkg/server/endpoints/bundle/acme_auth.go +++ b/pkg/server/endpoints/bundle/acme_auth.go @@ -101,6 +101,9 @@ type acmeKeyStore struct { func (ks *acmeKeyStore) GetPrivateKey(ctx context.Context, id string) (crypto.Signer, error) { keyID := acmeKeyPrefix + id + ctx, cancel := context.WithTimeout(ctx, keymanager.RPCTimeout) + defer cancel() + resp, err := ks.km.GetPublicKey(ctx, &keymanager.GetPublicKeyRequest{ KeyId: keyID, }) @@ -127,6 +130,9 @@ func (ks *acmeKeyStore) NewPrivateKey(ctx context.Context, id string, keyType au 
return nil, errs.New("unsupported key type: %d", keyType) } + ctx, cancel := context.WithTimeout(ctx, keymanager.RPCTimeout) + defer cancel() + resp, err := ks.km.GenerateKey(ctx, &keymanager.GenerateKeyRequest{ KeyId: keyID, KeyType: kmKeyType, diff --git a/pkg/server/endpoints/bundle/server.go b/pkg/server/endpoints/bundle/server.go index e1f9bb2487..3c68bcb8b3 100644 --- a/pkg/server/endpoints/bundle/server.go +++ b/pkg/server/endpoints/bundle/server.go @@ -57,9 +57,13 @@ func (s *Server) ListenAndServe(ctx context.Context) error { return errs.Wrap(err) } + // Set up the TLS config, setting TLS 1.2 as the minimum. + tlsConfig := s.c.ServerAuth.GetTLSConfig() + tlsConfig.MinVersion = tls.VersionTLS12 + server := &http.Server{ Handler: http.HandlerFunc(s.serveHTTP), - TLSConfig: s.c.ServerAuth.GetTLSConfig(), + TLSConfig: tlsConfig, } errCh := make(chan error, 1) diff --git a/pkg/server/endpoints/endpoints.go b/pkg/server/endpoints/endpoints.go index 9077d645f6..10d9aec745 100644 --- a/pkg/server/endpoints/endpoints.go +++ b/pkg/server/endpoints/endpoints.go @@ -83,6 +83,9 @@ type APIServers struct { type RateLimitConfig struct { // Attestation, if true, rate limits attestation Attestation bool + + // Signing, if true, rate limits JWT and X509 signing requests + Signing bool } // New creates new endpoints struct @@ -92,7 +95,7 @@ func New(ctx context.Context, c Config) (*Endpoints, error) { return nil, err } - buildCacheFn := func(ctx context.Context) (entrycache.Cache, error) { + buildCacheFn := func(ctx context.Context) (_ entrycache.Cache, err error) { call := telemetry.StartCall(c.Metrics, telemetry.Entry, telemetry.Cache, telemetry.Reload) defer call.Done(&err) return entrycache.BuildFromDataStore(ctx, c.Catalog.GetDataStore()) diff --git a/pkg/server/endpoints/endpoints_test.go b/pkg/server/endpoints/endpoints_test.go index 5d4e21ad57..87ad1fde6f 100644 --- a/pkg/server/endpoints/endpoints_test.go +++ b/pkg/server/endpoints/endpoints_test.go @@ -51,7 +51,10 
@@ var ( agentID = testTD.NewID("/spire/agent/foo") adminID = testTD.NewID("/admin") downstreamID = testTD.NewID("/downstream") - rateLimit = RateLimitConfig{Attestation: true} + rateLimit = RateLimitConfig{ + Attestation: true, + Signing: true, + } ) func TestNew(t *testing.T) { diff --git a/pkg/server/endpoints/middleware.go b/pkg/server/endpoints/middleware.go index 837df8e08d..aa641832ff 100644 --- a/pkg/server/endpoints/middleware.go +++ b/pkg/server/endpoints/middleware.go @@ -10,12 +10,12 @@ import ( "github.com/spiffe/spire/pkg/common/telemetry" "github.com/spiffe/spire/pkg/server/api" "github.com/spiffe/spire/pkg/server/api/bundle/v1" + "github.com/spiffe/spire/pkg/server/api/limits" "github.com/spiffe/spire/pkg/server/api/middleware" "github.com/spiffe/spire/pkg/server/ca" "github.com/spiffe/spire/pkg/server/cache/entrycache" "github.com/spiffe/spire/pkg/server/plugin/datastore" "github.com/spiffe/spire/pkg/server/util/regentryutil" - node_pb "github.com/spiffe/spire/proto/spire/api/node" "github.com/spiffe/spire/proto/spire/types" "github.com/spiffe/spire/test/clock" "golang.org/x/net/context" @@ -193,11 +193,20 @@ func RateLimits(config RateLimitConfig) map[string]api.RateLimiter { noLimit := middleware.NoLimit() attestLimit := middleware.DisabledLimit() if config.Attestation { - attestLimit = middleware.PerIPLimit(node_pb.AttestLimit) + attestLimit = middleware.PerIPLimit(limits.AttestLimitPerIP) } - csrLimit := middleware.PerIPLimit(node_pb.CSRLimit) - jsrLimit := middleware.PerIPLimit(node_pb.JSRLimit) - pushJWTKeyLimit := middleware.PerIPLimit(node_pb.PushJWTKeyLimit) + + csrLimit := middleware.DisabledLimit() + if config.Signing { + csrLimit = middleware.PerIPLimit(limits.SignLimitPerIP) + } + + jsrLimit := middleware.DisabledLimit() + if config.Signing { + jsrLimit = middleware.PerIPLimit(limits.SignLimitPerIP) + } + + pushJWTKeyLimit := middleware.PerIPLimit(limits.PushJWTKeyLimitPerIP) return map[string]api.RateLimiter{ 
"/spire.api.server.svid.v1.SVID/MintX509SVID": noLimit, diff --git a/pkg/server/plugin/datastore/sql/sql.go b/pkg/server/plugin/datastore/sql/sql.go index 35f36f9a0f..40f02ba990 100644 --- a/pkg/server/plugin/datastore/sql/sql.go +++ b/pkg/server/plugin/datastore/sql/sql.go @@ -139,7 +139,7 @@ func (ds *Plugin) CreateBundle(ctx context.Context, req *datastore.CreateBundleR // UpdateBundle updates an existing bundle with the given CAs. Overwrites any // existing certificates. func (ds *Plugin) UpdateBundle(ctx context.Context, req *datastore.UpdateBundleRequest) (resp *datastore.UpdateBundleResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = updateBundle(tx, req) return err }); err != nil { @@ -161,7 +161,7 @@ func (ds *Plugin) SetBundle(ctx context.Context, req *datastore.SetBundleRequest // AppendBundle append bundle contents to the existing bundle (by trust domain). If no existing one is present, create it. 
func (ds *Plugin) AppendBundle(ctx context.Context, req *datastore.AppendBundleRequest) (resp *datastore.AppendBundleResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = appendBundle(tx, req) return err }); err != nil { @@ -216,7 +216,7 @@ func (ds *Plugin) ListBundles(ctx context.Context, req *datastore.ListBundlesReq // PruneBundle removes expired certs and keys from a bundle func (ds *Plugin) PruneBundle(ctx context.Context, req *datastore.PruneBundleRequest) (resp *datastore.PruneBundleResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = pruneBundle(tx, req, ds.log) return err }); err != nil { @@ -282,7 +282,7 @@ func (ds *Plugin) ListAttestedNodes(ctx context.Context, // UpdateAttestedNode updates the given node's cert serial and expiration. 
func (ds *Plugin) UpdateAttestedNode(ctx context.Context, req *datastore.UpdateAttestedNodeRequest) (resp *datastore.UpdateAttestedNodeResponse, err error) { - if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = updateAttestedNode(tx, req) return err }); err != nil { @@ -384,7 +384,7 @@ func (ds *Plugin) ListRegistrationEntries(ctx context.Context, // UpdateRegistrationEntry updates an existing registration entry func (ds *Plugin) UpdateRegistrationEntry(ctx context.Context, req *datastore.UpdateRegistrationEntryRequest) (resp *datastore.UpdateRegistrationEntryResponse, err error) { - if err = ds.withWriteRepeatableReadTx(ctx, func(tx *gorm.DB) (err error) { + if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { resp, err = updateRegistrationEntry(tx, req) return err }); err != nil { @@ -561,14 +561,38 @@ func (*Plugin) GetPluginInfo(context.Context, *spi.GetPluginInfoRequest) (*spi.G return &pluginInfo, nil } -func (ds *Plugin) withWriteRepeatableReadTx(ctx context.Context, op func(tx *gorm.DB) error) error { - return ds.withTx(ctx, op, false, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) -} - +// withReadModifyWriteTx wraps the operation in a transaction appropriate for +// operations that will read one or more rows, change one or more columns in +// those rows, and then set them back. This requires a stronger level of +// consistency that prevents two transactions from doing read-modify-write +// concurrently. +func (ds *Plugin) withReadModifyWriteTx(ctx context.Context, op func(tx *gorm.DB) error) error { + isolationLevel := sql.LevelRepeatableRead + if ds.db.databaseType == MySQL { + // MySQL REPEATABLE READ is weaker than that of PostgreSQL. 
Namely, + // PostgreSQL, beyond providing the minimum consistency guarantees + mandated for REPEATABLE READ in the standard, automatically fails + concurrent transactions that try to update the same target row. + // + // MySQL SERIALIZABLE is the same as REPEATABLE READ except that it + automatically converts `SELECT` to `SELECT ... LOCK FOR SHARE MODE` + which "sets a shared lock that permits other transactions to read + the examined rows but not to update or delete them", which is what + we want. isolationLevel = sql.LevelSerializable + } + return ds.withTx(ctx, op, false, &sql.TxOptions{Isolation: isolationLevel}) +} + +// withWriteTx wraps the operation in a transaction appropriate for operations +// that unconditionally create/update rows, without reading them first. If two +// transactions try and update at the same time, last writer wins. func (ds *Plugin) withWriteTx(ctx context.Context, op func(tx *gorm.DB) error) error { return ds.withTx(ctx, op, false, nil) } +// withReadTx wraps the operation in a transaction appropriate for operations +// that only read rows. 
func (ds *Plugin) withReadTx(ctx context.Context, op func(tx *gorm.DB) error) error { return ds.withTx(ctx, op, true, nil) } @@ -1514,6 +1538,7 @@ FROM attested_node_entries N return builder.String(), args, nil } + func updateAttestedNode(tx *gorm.DB, req *datastore.UpdateAttestedNodeRequest) (*datastore.UpdateAttestedNodeResponse, error) { var model AttestedNode if err := tx.Find(&model, "spiffe_id = ?", req.SpiffeId).Error; err != nil { diff --git a/pkg/server/plugin/keymanager/constant.go b/pkg/server/plugin/keymanager/constant.go new file mode 100644 index 0000000000..3b5d5d88fb --- /dev/null +++ b/pkg/server/plugin/keymanager/constant.go @@ -0,0 +1,5 @@ +package keymanager + +import "time" + +const RPCTimeout = 30 * time.Second diff --git a/pkg/server/plugin/nodeattestor/aws/iid.go b/pkg/server/plugin/nodeattestor/aws/iid.go index 2ced9c4ecf..887a817015 100644 --- a/pkg/server/plugin/nodeattestor/aws/iid.go +++ b/pkg/server/plugin/nodeattestor/aws/iid.go @@ -104,13 +104,14 @@ type IIDAttestorPlugin struct { // IIDAttestorConfig holds hcl configuration for IID attestor plugin type IIDAttestorConfig struct { - SessionConfig `hcl:",squash"` - SkipBlockDevice bool `hcl:"skip_block_device"` - LocalValidAcctIDs []string `hcl:"account_ids_for_local_validation"` - AgentPathTemplate string `hcl:"agent_path_template"` - pathTemplate *template.Template - trustDomain string - awsCaCertPublicKey *rsa.PublicKey + SessionConfig `hcl:",squash"` + SkipBlockDevice bool `hcl:"skip_block_device"` + DisableInstanceProfileSelectors bool `hcl:"disable_instance_profile_selectors"` + LocalValidAcctIDs []string `hcl:"account_ids_for_local_validation"` + AgentPathTemplate string `hcl:"agent_path_template"` + pathTemplate *template.Template + trustDomain string + awsCaCertPublicKey *rsa.PublicKey } // New creates a new IIDAttestorPlugin. 
@@ -400,11 +401,16 @@ func (p *IIDAttestorPlugin) resolveSelectors(parent context.Context, instancesDe } } + c, err := p.getConfig() + if err != nil { + return nil, err + } + for _, reservation := range instancesDesc.Reservations { for _, instance := range reservation.Instances { addSelectors(resolveTags(instance.Tags)) addSelectors(resolveSecurityGroups(instance.SecurityGroups)) - if instance.IamInstanceProfile != nil && instance.IamInstanceProfile.Arn != nil { + if !c.DisableInstanceProfileSelectors && instance.IamInstanceProfile != nil && instance.IamInstanceProfile.Arn != nil { instanceProfileName, err := instanceProfileNameFromArn(*instance.IamInstanceProfile.Arn) if err != nil { return nil, err diff --git a/pkg/server/plugin/nodeattestor/aws/iid_test.go b/pkg/server/plugin/nodeattestor/aws/iid_test.go index 2eaf849e6d..104e9d83be 100644 --- a/pkg/server/plugin/nodeattestor/aws/iid_test.go +++ b/pkg/server/plugin/nodeattestor/aws/iid_test.go @@ -225,15 +225,16 @@ func (s *IIDAttestorSuite) TestErrorOnNoSignature() { func (s *IIDAttestorSuite) TestClientAndIDReturns() { tests := []struct { - desc string - mockExpect func(mock *mock_aws.MockClient) - expectID string - expectSelectors []*common.Selector - expectErr string - replacementTemplate string - allowList []string - skipBlockDev bool - skipEC2Block bool + desc string + mockExpect func(mock *mock_aws.MockClient) + expectID string + expectSelectors []*common.Selector + expectErr string + replacementTemplate string + allowList []string + skipBlockDev bool + skipEC2Block bool + disableInstanceProfileSelectors bool }{ { desc: "error on call", @@ -409,6 +410,47 @@ func (s *IIDAttestorSuite) TestClientAndIDReturns() { replacementTemplate: "{{ .PluginName}}/zone1/{{ .Tags.Hostname }}", expectID: "spiffe://example.org/spire/agent/aws_iid/zone1/%3Cno%20value%3E", }, + { + desc: "success, ignore instance profile selectors", + disableInstanceProfileSelectors: true, + mockExpect: func(mock *mock_aws.MockClient) { + 
output := getDefaultDescribeInstancesOutput() + output.Reservations[0].Instances[0].Tags = []*ec2.Tag{ + { + Key: aws.String("Hostname"), + Value: aws.String("host1"), + }, + } + output.Reservations[0].Instances[0].SecurityGroups = []*ec2.GroupIdentifier{ + { + GroupId: aws.String("TestGroup"), + GroupName: aws.String("Test Group Name"), + }, + } + output.Reservations[0].Instances[0].IamInstanceProfile = &ec2.IamInstanceProfile{ + Arn: aws.String("arn:aws::::instance-profile/" + testProfile), + } + output.Reservations[0].Instances[0].RootDeviceType = &instanceStoreType + output.Reservations[0].Instances[0].NetworkInterfaces[0].Attachment.DeviceIndex = &zeroDeviceIndex + setAttestExpectations(mock, output, nil) + gipo := &iam.GetInstanceProfileOutput{ + InstanceProfile: &iam.InstanceProfile{ + Roles: []*iam.Role{ + {Arn: aws.String("role1")}, + {Arn: aws.String("role2")}, + }, + }, + } + setResolveSelectorsExpectations(mock, gipo) + }, + replacementTemplate: "{{ .PluginName}}/zone1/{{ .Tags.Hostname }}", + expectSelectors: []*common.Selector{ + {Type: caws.PluginName, Value: "sg:id:TestGroup"}, + {Type: caws.PluginName, Value: "sg:name:Test Group Name"}, + {Type: caws.PluginName, Value: "tag:Hostname:host1"}, + }, + expectID: "spiffe://example.org/spire/agent/aws_iid/zone1/host1", + }, } for _, tt := range tests { @@ -446,6 +488,10 @@ func (s *IIDAttestorSuite) TestClientAndIDReturns() { configStr += "\nskip_ec2_attest_calling = true" } + if tt.disableInstanceProfileSelectors { + configStr += "\ndisable_instance_profile_selectors = true" + } + _, err := s.p.Configure(context.Background(), &plugin.ConfigureRequest{ Configuration: configStr, GlobalConfig: &plugin.ConfigureRequest_GlobalConfig{TrustDomain: "example.org"}, diff --git a/pkg/server/plugin/notifier/k8sbundle/k8sbundle.go b/pkg/server/plugin/notifier/k8sbundle/k8sbundle.go index 337b9c079e..91aec016d1 100644 --- a/pkg/server/plugin/notifier/k8sbundle/k8sbundle.go +++ 
b/pkg/server/plugin/notifier/k8sbundle/k8sbundle.go @@ -6,25 +6,31 @@ import ( "encoding/json" "encoding/pem" "errors" - "net/http" + "fmt" + "strings" "sync" "github.com/hashicorp/go-hclog" "github.com/hashicorp/hcl" "github.com/spiffe/spire/pkg/common/catalog" - "github.com/spiffe/spire/pkg/common/telemetry" "github.com/spiffe/spire/pkg/server/plugin/hostservices" "github.com/spiffe/spire/pkg/server/plugin/notifier" "github.com/spiffe/spire/proto/spire/common" spi "github.com/spiffe/spire/proto/spire/common/plugin" "github.com/zeebo/errs" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + admissionv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/retry" ) var ( @@ -51,6 +57,7 @@ type pluginConfig struct { Namespace string `hcl:"namespace"` ConfigMap string `hcl:"config_map"` ConfigMapKey string `hcl:"config_map_key"` + WebhookLabel string `hcl:"webhook_label"` KubeConfigFilePath string `hcl:"kube_config_file_path"` } @@ -61,9 +68,10 @@ type Plugin struct { log hclog.Logger config *pluginConfig identityProvider hostservices.IdentityProvider + cancelWatcher func() hooks struct { - newKubeClient func(configPath string) (kubeClient, error) + newKubeClient func(c *pluginConfig) ([]kubeClient, error) } } @@ -95,8 +103,8 @@ func (p *Plugin) Notify(ctx context.Context, req *notifier.NotifyRequest) (*noti } if _, ok := req.Event.(*notifier.NotifyRequest_BundleUpdated); ok { - // ignore the bundle presented in the request. see updateBundleConfigMap for details on why. - if err := p.updateBundleConfigMap(ctx, config); err != nil { + // ignore the bundle presented in the request. 
see updateBundle for details on why. + if err := p.updateBundles(ctx, config); err != nil { return nil, err } } @@ -110,8 +118,8 @@ func (p *Plugin) NotifyAndAdvise(ctx context.Context, req *notifier.NotifyAndAdv } if _, ok := req.Event.(*notifier.NotifyAndAdviseRequest_BundleLoaded); ok { - // ignore the bundle presented in the request. see updateBundleConfigMap for details on why. - if err := p.updateBundleConfigMap(ctx, config); err != nil { + // ignore the bundle presented in the request. see updateBundle for details on why. + if err := p.updateBundles(ctx, config); err != nil { return nil, err } } @@ -138,7 +146,10 @@ func (p *Plugin) Configure(ctx context.Context, req *spi.ConfigureRequest) (resp config.ConfigMapKey = defaultConfigMapKey } - p.setConfig(config) + if err = p.setConfig(config); err != nil { + return nil, k8sErr.New("unable to set configuration: %v", err) + } + return &spi.ConfigureResponse{}, nil } @@ -155,28 +166,96 @@ func (p *Plugin) getConfig() (*pluginConfig, error) { return p.config, nil } -func (p *Plugin) setConfig(config *pluginConfig) { +func (p *Plugin) setConfig(config *pluginConfig) error { p.mu.Lock() defer p.mu.Unlock() + + // Start watcher to set CA Bundle in objects created after server has started + var cancelWatcher func() + if config.WebhookLabel != "" { + ctx, cancel := context.WithCancel(context.Background()) + watcher, err := newBundleWatcher(ctx, p, config) + if err != nil { + cancel() + return err + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := watcher.Watch(ctx); err != nil && !errors.Is(err, context.Canceled) { + p.log.Error("Unable to watch", "error", err) + } + }() + cancelWatcher = func() { + cancel() + wg.Wait() + } + } + if p.cancelWatcher != nil { + p.cancelWatcher() + p.cancelWatcher = nil + } + if config.WebhookLabel != "" { + p.cancelWatcher = cancelWatcher + } + p.config = config + return nil } -func (p *Plugin) updateBundleConfigMap(ctx context.Context, c *pluginConfig) 
(err error) { - client, err := p.hooks.newKubeClient(c.KubeConfigFilePath) +// updateBundles iterates through all the objects that need an updated CA bundle +// If an error is an encountered updating the bundle for an object, we record the +// error and continue on to the next object +func (p *Plugin) updateBundles(ctx context.Context, c *pluginConfig) (err error) { + clients, err := p.hooks.newKubeClient(c) if err != nil { return err } - for { - // Get the config map so we can use the version to resolve conflicts racing + var updateErrs string + for _, client := range clients { + list, err := client.GetList(ctx, c) + if err != nil { + updateErrs += fmt.Sprintf("unable to get list: %v, ", err) + continue + } + listItems, err := meta.ExtractList(list) + if err != nil { + updateErrs += fmt.Sprintf("unable to extract list items: %v, ", err) + continue + } + for _, item := range listItems { + itemMeta, err := meta.Accessor(item) + if err != nil { + updateErrs += fmt.Sprintf("unable to extract metadata for item: %v, ", err) + continue + } + err = p.updateBundle(ctx, c, client, itemMeta.GetNamespace(), itemMeta.GetName()) + if err != nil && status.Code(err) != codes.AlreadyExists { + updateErrs += fmt.Sprintf("%s: %v, ", namespacedName(itemMeta), err) + } + } + } + + if len(updateErrs) > 0 { + return k8sErr.New("unable to update: %s", strings.TrimSuffix(updateErrs, ", ")) + } + return nil +} + +// updateBundle does the ready-modify-write semantics for Kubernetes, retrying on conflict +func (p *Plugin) updateBundle(ctx context.Context, c *pluginConfig, client kubeClient, namespace, name string) (err error) { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Get the object so we can use the version to resolve conflicts racing // on updates from other servers. 
- configMap, err := client.GetConfigMap(ctx, c.Namespace, c.ConfigMap) + obj, err := client.Get(ctx, namespace, name) if err != nil { - return k8sErr.New("unable to get config map %s/%s: %v", c.Namespace, c.ConfigMap, err) + return k8sErr.New("unable to get object %s/%s: %v", namespace, name, err) } // Load bundle data from the registration api. The bundle has to be - // loaded after fetching the config map so we can properly detect and + // loaded after fetching the object so we can properly detect and // correct a race updating the bundle (i.e. read-modify-write // semantics). resp, err := p.identityProvider.FetchX509Identity(ctx, &hostservices.FetchX509IdentityRequest{}) @@ -186,34 +265,38 @@ func (p *Plugin) updateBundleConfigMap(ctx context.Context, c *pluginConfig) (er // Build patch with the new bundle data. The resource version MUST be set // to support conflict resolution. - patchBytes, err := json.Marshal(corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: configMap.ResourceVersion, - }, - Data: map[string]string{ - c.ConfigMapKey: bundleData(resp.Bundle), - }, - }) + patch, err := client.CreatePatch(ctx, c, obj, resp) if err != nil { - return k8sErr.New("unable to marshal patch: %v", err) + return err } // Patch the bundle, handling version conflicts - if err := client.PatchConfigMap(ctx, c.Namespace, c.ConfigMap, patchBytes); err != nil { - // If there is a conflict then some other server won the race updating - // the ConfigMap. We need to retrieve the latest bundle and try again. 
- if s, ok := err.(k8serrors.APIStatus); ok && s.Status().Code == http.StatusConflict { - p.log.Debug("Conflict detected patching configmap; will retry", telemetry.VersionInfo, configMap.ResourceVersion) - continue - } - return k8sErr.New("unable to update config map %s/%s: %v", c.Namespace, c.ConfigMap, err) + patchBytes, err := json.Marshal(patch) + if err != nil { + return k8sErr.New("unable to marshal patch: %v", err) } + return client.Patch(ctx, namespace, name, patchBytes) + }) +} - return nil +func newKubeClient(c *pluginConfig) ([]kubeClient, error) { + clientset, err := newKubeClientset(c.KubeConfigFilePath) + if err != nil { + return nil, k8sErr.Wrap(err) } + + clients := []kubeClient{configMapClient{Clientset: clientset}} + if c.WebhookLabel != "" { + clients = append(clients, + mutatingWebhookClient{Clientset: clientset}, + validatingWebhookClient{Clientset: clientset}, + ) + } + + return clients, nil } -func newKubeClient(configPath string) (kubeClient, error) { +func newKubeClientset(configPath string) (*kubernetes.Clientset, error) { config, err := getKubeConfig(configPath) if err != nil { return nil, k8sErr.Wrap(err) @@ -223,7 +306,7 @@ func newKubeClient(configPath string) (kubeClient, error) { if err != nil { return nil, k8sErr.Wrap(err) } - return kubeClientset{Clientset: client}, nil + return client, nil } func getKubeConfig(configPath string) (*rest.Config, error) { @@ -233,24 +316,183 @@ func getKubeConfig(configPath string) (*rest.Config, error) { return rest.InClusterConfig() } +// kubeClient encapsulates the Kubenetes API for config maps, validating webhooks, and mutating webhooks type kubeClient interface { - GetConfigMap(ctx context.Context, namespace, configMap string) (*corev1.ConfigMap, error) - PatchConfigMap(ctx context.Context, namespace string, configMap string, patchBytes []byte) error + Get(ctx context.Context, namespace, name string) (runtime.Object, error) + GetList(ctx context.Context, config *pluginConfig) (runtime.Object, 
error) + CreatePatch(ctx context.Context, config *pluginConfig, obj runtime.Object, resp *hostservices.FetchX509IdentityResponse) (runtime.Object, error) + Patch(ctx context.Context, namespace, name string, patchBytes []byte) error + Watch(ctx context.Context, config *pluginConfig) (watch.Interface, error) } -type kubeClientset struct { +// configMapClient encapsulates the Kubenetes API for updating the CA Bundle in a config map +type configMapClient struct { *kubernetes.Clientset } -func (c kubeClientset) GetConfigMap(ctx context.Context, namespace, configMap string) (*corev1.ConfigMap, error) { +func (c configMapClient) Get(ctx context.Context, namespace, configMap string) (runtime.Object, error) { return c.CoreV1().ConfigMaps(namespace).Get(ctx, configMap, metav1.GetOptions{}) } -func (c kubeClientset) PatchConfigMap(ctx context.Context, namespace, configMap string, patchBytes []byte) error { - _, err := c.CoreV1().ConfigMaps(namespace).Patch(ctx, configMap, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) +func (c configMapClient) GetList(ctx context.Context, config *pluginConfig) (runtime.Object, error) { + obj, err := c.Get(ctx, config.Namespace, config.ConfigMap) + if err != nil { + return nil, err + } + configMap := obj.(*corev1.ConfigMap) + return &corev1.ConfigMapList{ + Items: []corev1.ConfigMap{*configMap}, + }, nil +} + +func (c configMapClient) CreatePatch(ctx context.Context, config *pluginConfig, obj runtime.Object, resp *hostservices.FetchX509IdentityResponse) (runtime.Object, error) { + configMap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil, k8sErr.New("wrong type, expecting ConfigMap") + } + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: configMap.ResourceVersion, + }, + Data: map[string]string{ + config.ConfigMapKey: bundleData(resp.Bundle), + }, + }, nil +} + +func (c configMapClient) Patch(ctx context.Context, namespace, name string, patchBytes []byte) error { + _, err := 
c.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) return err } +func (c configMapClient) Watch(ctx context.Context, config *pluginConfig) (watch.Interface, error) { + return nil, nil +} + +// mutatingWebhookClient encapsulates the Kubenetes API for updating the CA Bundle in a mutating webhook +type mutatingWebhookClient struct { + *kubernetes.Clientset +} + +func (c mutatingWebhookClient) Get(ctx context.Context, namespace, mutatingWebhook string) (runtime.Object, error) { + return c.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(ctx, mutatingWebhook, metav1.GetOptions{}) +} + +func (c mutatingWebhookClient) GetList(ctx context.Context, config *pluginConfig) (runtime.Object, error) { + return c.AdmissionregistrationV1().MutatingWebhookConfigurations().List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=true", config.WebhookLabel), + }) +} + +func (c mutatingWebhookClient) CreatePatch(ctx context.Context, config *pluginConfig, obj runtime.Object, resp *hostservices.FetchX509IdentityResponse) (runtime.Object, error) { + mutatingWebhook, ok := obj.(*admissionv1.MutatingWebhookConfiguration) + if !ok { + return nil, k8sErr.New("wrong type, expecting MutatingWebhookConfiguration") + } + + // Check if MutatingWebhookConfiguration needs an update + needsUpdate := false + for _, webhook := range mutatingWebhook.Webhooks { + if !bytes.Equal(webhook.ClientConfig.CABundle, []byte(bundleData(resp.Bundle))) { + needsUpdate = true + break + } + } + if !needsUpdate { + return nil, status.Errorf(codes.AlreadyExists, "MutatingWebhookConfiguration %s is already up to date", mutatingWebhook.Name) + } + + patch := &admissionv1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: mutatingWebhook.ResourceVersion, + }, + } + patch.Webhooks = make([]admissionv1.MutatingWebhook, len(mutatingWebhook.Webhooks)) + + // Step through all the the webhooks in the 
MutatingWebhookConfiguration + for i := range patch.Webhooks { + patch.Webhooks[i].AdmissionReviewVersions = mutatingWebhook.Webhooks[i].AdmissionReviewVersions + patch.Webhooks[i].ClientConfig.CABundle = []byte(bundleData(resp.Bundle)) + patch.Webhooks[i].Name = mutatingWebhook.Webhooks[i].Name + patch.Webhooks[i].SideEffects = mutatingWebhook.Webhooks[i].SideEffects + } + + return patch, nil +} + +func (c mutatingWebhookClient) Patch(ctx context.Context, namespace, name string, patchBytes []byte) error { + _, err := c.AdmissionregistrationV1().MutatingWebhookConfigurations().Patch(ctx, name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + return err +} + +func (c mutatingWebhookClient) Watch(ctx context.Context, config *pluginConfig) (watch.Interface, error) { + return c.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=true", config.WebhookLabel), + }) +} + +// validatingWebhookClient encapsulates the Kubenetes API for updating the CA Bundle in a validating webhook +type validatingWebhookClient struct { + *kubernetes.Clientset +} + +func (c validatingWebhookClient) Get(ctx context.Context, namespace, validatingWebhook string) (runtime.Object, error) { + return c.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(ctx, validatingWebhook, metav1.GetOptions{}) +} + +func (c validatingWebhookClient) GetList(ctx context.Context, config *pluginConfig) (runtime.Object, error) { + return c.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=true", config.WebhookLabel), + }) +} + +func (c validatingWebhookClient) CreatePatch(ctx context.Context, config *pluginConfig, obj runtime.Object, resp *hostservices.FetchX509IdentityResponse) (runtime.Object, error) { + validatingWebhook, ok := obj.(*admissionv1.ValidatingWebhookConfiguration) + if !ok { + return nil, k8sErr.New("wrong type, expecting 
ValidatingWebhookConfiguration") + } + + // Check if ValidatingWebhookConfiguration needs an update + needsUpdate := false + for _, webhook := range validatingWebhook.Webhooks { + if !bytes.Equal(webhook.ClientConfig.CABundle, []byte(bundleData(resp.Bundle))) { + needsUpdate = true + break + } + } + if !needsUpdate { + return nil, status.Errorf(codes.AlreadyExists, "ValidatingWebhookConfiguration %s is already up to date", validatingWebhook.Name) + } + + patch := &admissionv1.ValidatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: validatingWebhook.ResourceVersion, + }, + } + patch.Webhooks = make([]admissionv1.ValidatingWebhook, len(validatingWebhook.Webhooks)) + + // Step through all the the webhooks in the ValidatingWebhookConfiguration + for i := range patch.Webhooks { + patch.Webhooks[i].AdmissionReviewVersions = validatingWebhook.Webhooks[i].AdmissionReviewVersions + patch.Webhooks[i].ClientConfig.CABundle = []byte(bundleData(resp.Bundle)) + patch.Webhooks[i].Name = validatingWebhook.Webhooks[i].Name + patch.Webhooks[i].SideEffects = validatingWebhook.Webhooks[i].SideEffects + } + + return patch, nil +} + +func (c validatingWebhookClient) Patch(ctx context.Context, namespace, name string, patchBytes []byte) error { + _, err := c.AdmissionregistrationV1().ValidatingWebhookConfigurations().Patch(ctx, name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + return err +} + +func (c validatingWebhookClient) Watch(ctx context.Context, config *pluginConfig) (watch.Interface, error) { + return c.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=true", config.WebhookLabel), + }) +} + // bundleData formats the bundle data for inclusion in the config map func bundleData(bundle *common.Bundle) string { bundleData := new(bytes.Buffer) @@ -262,3 +504,11 @@ func bundleData(bundle *common.Bundle) string { } return bundleData.String() } + +// namespacedName 
returns "namespace/name" for namespaced resources and "name" for non-namespaced resources +func namespacedName(itemMeta metav1.Object) string { + if itemMeta.GetNamespace() != "" { + return fmt.Sprintf("%s/%s", itemMeta.GetNamespace(), itemMeta.GetName()) + } + return itemMeta.GetName() +} diff --git a/pkg/server/plugin/notifier/k8sbundle/k8sbundle_test.go b/pkg/server/plugin/notifier/k8sbundle/k8sbundle_test.go index 470e8f1f07..8871329051 100644 --- a/pkg/server/plugin/notifier/k8sbundle/k8sbundle_test.go +++ b/pkg/server/plugin/notifier/k8sbundle/k8sbundle_test.go @@ -23,6 +23,8 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" ) var ( @@ -126,7 +128,7 @@ func (s *Suite) TestBundleLoadedConfigMapGetFailure() { }, }, }) - s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to get config map spire/spire-bundle: not found") + s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to update: unable to get list: not found") s.Nil(resp) } @@ -149,7 +151,7 @@ func (s *Suite) TestBundleLoadedConfigMapPatchFailure() { }, }, }) - s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to update config map spire/spire-bundle: some error") + s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to update: spire/spire-bundle: some error") s.Nil(resp) } @@ -159,6 +161,7 @@ func (s *Suite) TestBundleLoadedConfigMapUpdateConflict() { ErrStatus: metav1.Status{ Code: http.StatusConflict, Message: "unexpected version", + Reason: "Conflict", }, }) @@ -279,7 +282,7 @@ func (s *Suite) TestBundleUpdatedConfigMapGetFailure() { }, }, }) - s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to get config map spire/spire-bundle: not found") + s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to update: unable to get list: not found") s.Nil(resp) } @@ -302,7 +305,7 @@ func (s *Suite) 
TestBundleUpdatedConfigMapPatchFailure() { }, }, }) - s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to update config map spire/spire-bundle: some error") + s.RequireGRPCStatus(err, codes.Unknown, "k8s-bundle: unable to update: spire/spire-bundle: some error") s.Nil(resp) } @@ -312,6 +315,7 @@ func (s *Suite) TestBundleUpdatedConfigMapUpdateConflict() { ErrStatus: metav1.Status{ Code: http.StatusConflict, Message: "unexpected version", + Reason: "Conflict", }, }) @@ -429,12 +433,12 @@ func (s *Suite) TestBundleFailsToLoadIfHostServicesUnavailabler() { } func (s *Suite) withKubeClient(client kubeClient, expectedConfigPath string) { - s.raw.hooks.newKubeClient = func(configPath string) (kubeClient, error) { - s.Equal(expectedConfigPath, configPath) + s.raw.hooks.newKubeClient = func(c *pluginConfig) ([]kubeClient, error) { + s.Equal(expectedConfigPath, c.KubeConfigFilePath) if client == nil { return nil, errors.New("kube client not configured") } - return client, nil + return []kubeClient{client}, nil } } @@ -461,15 +465,37 @@ func newFakeKubeClient(configMaps ...*corev1.ConfigMap) *fakeKubeClient { return c } -func (c *fakeKubeClient) GetConfigMap(ctx context.Context, namespace, configMap string) (*corev1.ConfigMap, error) { +func (c *fakeKubeClient) Get(ctx context.Context, namespace, configMap string) (runtime.Object, error) { entry := c.getConfigMap(namespace, configMap) if entry == nil { return nil, errors.New("not found") } return entry, nil } +func (c *fakeKubeClient) GetList(ctx context.Context, config *pluginConfig) (runtime.Object, error) { + list := c.getConfigMapList() + if list.Items == nil { + return nil, errors.New("not found") + } + return list, nil +} -func (c *fakeKubeClient) PatchConfigMap(ctx context.Context, namespace, configMap string, patchBytes []byte) error { +func (c *fakeKubeClient) CreatePatch(ctx context.Context, config *pluginConfig, obj runtime.Object, resp *hostservices.FetchX509IdentityResponse) (runtime.Object, error) { 
+ configMap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil, k8sErr.New("wrong type, expecting config map") + } + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: configMap.ResourceVersion, + }, + Data: map[string]string{ + config.ConfigMapKey: bundleData(resp.Bundle), + }, + }, nil +} + +func (c *fakeKubeClient) Patch(ctx context.Context, namespace, configMap string, patchBytes []byte) error { c.mu.Lock() defer c.mu.Unlock() @@ -503,12 +529,26 @@ func (c *fakeKubeClient) PatchConfigMap(ctx context.Context, namespace, configMa return nil } +func (c *fakeKubeClient) Watch(ctx context.Context, config *pluginConfig) (watch.Interface, error) { + return nil, nil +} + func (c *fakeKubeClient) getConfigMap(namespace, configMap string) *corev1.ConfigMap { c.mu.RLock() defer c.mu.RUnlock() return c.configMaps[configMapKey(namespace, configMap)] } +func (c *fakeKubeClient) getConfigMapList() *corev1.ConfigMapList { + c.mu.RLock() + defer c.mu.RUnlock() + configMapList := &corev1.ConfigMapList{} + for _, configMap := range c.configMaps { + configMapList.Items = append(configMapList.Items, *configMap) + } + return configMapList +} + func (c *fakeKubeClient) setConfigMap(configMap *corev1.ConfigMap) { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/server/plugin/notifier/k8sbundle/k8sbundle_watcher.go b/pkg/server/plugin/notifier/k8sbundle/k8sbundle_watcher.go new file mode 100644 index 0000000000..3d9ef24ab8 --- /dev/null +++ b/pkg/server/plugin/notifier/k8sbundle/k8sbundle_watcher.go @@ -0,0 +1,124 @@ +package k8sbundle + +import ( + "context" + "reflect" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/watch" +) + +type bundleWatcher struct { + p *Plugin + clients []kubeClient + watchers []watch.Interface + config *pluginConfig + + hooks struct { + watch func(ctx context.Context) error + } +} + +// newBundleWatcher creates a new watcher for newly created 
objects +func newBundleWatcher(ctx context.Context, p *Plugin, config *pluginConfig) (*bundleWatcher, error) { + clients, err := p.hooks.newKubeClient(config) + if err != nil { + return nil, err + } + watchers, clients, err := newWatchers(ctx, clients, config) + if err != nil { + return nil, err + } + + watcher := &bundleWatcher{ + p: p, + clients: clients, + config: config, + watchers: watchers, + } + watcher.hooks.watch = watcher.watch + return watcher, nil +} + +// Watch calls the hook to watch for new objects +func (b *bundleWatcher) Watch(ctx context.Context) error { + return b.hooks.watch(ctx) +} + +// watch watches for new objects that are created with the proper selector and updates the CA Bundle +func (b *bundleWatcher) watch(ctx context.Context) error { + selectCase := newSelectCase(ctx, b.watchers) + for { + chosen, recv, _ := reflect.Select(selectCase) + if chosen < len(b.clients) { + if err := b.watchEvent(ctx, b.clients[chosen], recv.Interface().(watch.Event)); err != nil { + b.p.log.Error("Handling watch event", "error", err) + } + } else { + // The context is the last element in the array + return ctx.Err() + } + } +} + +// watchEvent triggers the read-modify-write for a newly created object +func (b *bundleWatcher) watchEvent(ctx context.Context, client kubeClient, event watch.Event) error { + if event.Type == watch.Added || event.Type == watch.Modified { + objectMeta, err := meta.Accessor(event.Object) + if err != nil { + return err + } + + err = b.p.updateBundle(ctx, b.config, client, objectMeta.GetNamespace(), objectMeta.GetName()) + switch { + case err == nil: + b.p.log.Debug("Set bundle for object", "name", objectMeta.GetName(), "event", event.Type) + case status.Code(err) == codes.FailedPrecondition: + // Ignore FailPrecondition errors for when SPIRE is booting and we receive an event prior to + // IdentityProvider being initialized. In this case the BundleLoaded event will come + // to populate the caBundle, so its safe to ignore this error. 
+ case status.Code(err) == codes.AlreadyExists: + // Updating the bundle from an ADD event triggers a subsequent MODIFIED event. updateBundle will + // return AlreadyExists since nothing needs to be updated. + default: + return err + } + } + return nil +} + +// newWatchers creates a watcher array for all of the clients +func newWatchers(ctx context.Context, clients []kubeClient, config *pluginConfig) ([]watch.Interface, []kubeClient, error) { + watchers := []watch.Interface{} + validClients := []kubeClient{} + for _, client := range clients { + watcher, err := client.Watch(ctx, config) + if err != nil { + return nil, nil, err + } + if watcher != nil { + watchers = append(watchers, watcher) + validClients = append(validClients, client) + } + } + return watchers, validClients, nil +} + +// newSelectCase creates the SelectCase array used by reflect.Select +func newSelectCase(ctx context.Context, watchers []watch.Interface) []reflect.SelectCase { + selectCase := []reflect.SelectCase{} + for _, watcher := range watchers { + selectCase = append(selectCase, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(watcher.ResultChan()), + }) + } + // Add the context as the last element in the array + selectCase = append(selectCase, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ctx.Done()), + }) + return selectCase +} diff --git a/pkg/server/plugin/notifier/k8sbundle/k8sbundle_watcher_test.go b/pkg/server/plugin/notifier/k8sbundle/k8sbundle_watcher_test.go new file mode 100644 index 0000000000..3ed1347e61 --- /dev/null +++ b/pkg/server/plugin/notifier/k8sbundle/k8sbundle_watcher_test.go @@ -0,0 +1,250 @@ +package k8sbundle + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "github.com/spiffe/spire/pkg/server/plugin/hostservices" + admissionv1 "k8s.io/api/admissionregistration/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + 
"k8s.io/apimachinery/pkg/watch" +) + +const ( + testTimeout = time.Second * 2 +) + +func (s *Suite) TestBundleWatcherErrorsWhenCannotCreateClient() { + s.withKubeClient(nil, "") + + s.configure("") + + _, err := newBundleWatcher(context.TODO(), s.raw, s.raw.config) + s.Require().Equal(err.Error(), "kube client not configured") +} + +func (s *Suite) TestBundleWatchersStartsAndStops() { + s.configure("") + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + watcherStarted := make(chan struct{}) + watcher, err := newBundleWatcher(ctx, s.raw, s.raw.config) + s.Require().NoError(err) + + watcher.hooks.watch = func(ctx context.Context) error { + watcherStarted <- struct{}{} + <-ctx.Done() + return ctx.Err() + } + go func() { + errCh <- watcher.Watch(ctx) + }() + + timer := time.NewTimer(testTimeout) + defer timer.Stop() + + select { + case <-watcherStarted: + case err := <-errCh: + if err != nil { + s.Require().FailNow(fmt.Sprintf("watcher.Watch() unexpected exit: %v", err)) + } else { + s.Require().FailNow("watcher.Watch() unexpected exit") + } + case <-timer.C: + s.Require().FailNow("timed out waiting for watcher to start") + } + + cancel() + + select { + case err := <-errCh: + s.Require().Equal(err.Error(), "context canceled") + case <-timer.C: + s.Require().FailNow("timed out waiting for watcher.Watch() to return") + } +} + +func (s *Suite) TestBundleWatcherUpdateConfig() { + w := newFakeWebhook() + s.withKubeClient(w, "/some/file/path") + + s.configure(` +webhook_label = "LABEL" +kube_config_file_path = "/some/file/path" +`) + s.Require().Eventually(func() bool { + return w.getWatchLabel() == "LABEL" + }, testTimeout, time.Second) + + s.configure(` +webhook_label = "LABEL2" +kube_config_file_path = "/some/file/path" +`) + s.Require().Eventually(func() bool { + return w.getWatchLabel() == "LABEL2" + }, testTimeout, time.Second) +} + +func (s *Suite) TestBundleWatcherAddEvent() { + w := newFakeWebhook() + s.withKubeClient(w, 
"/some/file/path") + s.configure(` +webhook_label = "LABEL" +kube_config_file_path = "/some/file/path" +`) + + webhook := newWebhook() + s.r.AppendBundle(testBundle) + w.setWebhook(webhook) + w.addWatchEvent(webhook) + + s.Require().Eventually(func() bool { + return s.Equal(&admissionv1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "spire-webhook", + ResourceVersion: "2", + }, + Webhooks: []admissionv1.MutatingWebhook{ + { + ClientConfig: admissionv1.WebhookClientConfig{ + CABundle: []byte(testBundleData), + }, + }, + }, + }, w.getWebhook("spire-webhook")) + }, testTimeout, time.Second) +} + +type fakeWebhook struct { + mu sync.RWMutex + fakeWatch *watch.FakeWatcher + webhooks map[string]*admissionv1.MutatingWebhookConfiguration + watchLabel string +} + +func newFakeWebhook() *fakeWebhook { + w := &fakeWebhook{ + fakeWatch: watch.NewFake(), + webhooks: make(map[string]*admissionv1.MutatingWebhookConfiguration), + } + return w +} + +func (w *fakeWebhook) Get(ctx context.Context, namespace, name string) (runtime.Object, error) { + entry := w.getWebhook(name) + if entry == nil { + return nil, errors.New("not found") + } + return entry, nil +} +func (w *fakeWebhook) GetList(ctx context.Context, config *pluginConfig) (runtime.Object, error) { + list := w.getWebhookList() + if list.Items == nil { + return nil, errors.New("not found") + } + return list, nil +} + +func (w *fakeWebhook) CreatePatch(ctx context.Context, config *pluginConfig, obj runtime.Object, resp *hostservices.FetchX509IdentityResponse) (runtime.Object, error) { + webhook, ok := obj.(*admissionv1.MutatingWebhookConfiguration) + if !ok { + return nil, k8sErr.New("wrong type, expecting mutating webhook") + } + return &admissionv1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: webhook.ResourceVersion, + }, + Webhooks: []admissionv1.MutatingWebhook{ + { + ClientConfig: admissionv1.WebhookClientConfig{ + CABundle: []byte(bundleData(resp.Bundle)), + }, 
+ }, + }, + }, nil +} + +func (w *fakeWebhook) Patch(ctx context.Context, namespace, name string, patchBytes []byte) error { + w.mu.Lock() + defer w.mu.Unlock() + + entry, ok := w.webhooks[name] + if !ok { + return errors.New("not found") + } + + patchedWebhook := new(admissionv1.MutatingWebhookConfiguration) + if err := json.Unmarshal(patchBytes, patchedWebhook); err != nil { + return err + } + resourceVersion, err := strconv.Atoi(patchedWebhook.ResourceVersion) + if err != nil { + return errors.New("patch does not have resource version") + } + entry.ResourceVersion = fmt.Sprint(resourceVersion + 1) + for i := range entry.Webhooks { + entry.Webhooks[i].ClientConfig.CABundle = patchedWebhook.Webhooks[i].ClientConfig.CABundle + } + return nil +} + +func (w *fakeWebhook) Watch(ctx context.Context, config *pluginConfig) (watch.Interface, error) { + w.mu.Lock() + defer w.mu.Unlock() + w.watchLabel = config.WebhookLabel + return w.fakeWatch, nil +} + +func (w *fakeWebhook) getWebhook(name string) *admissionv1.MutatingWebhookConfiguration { + w.mu.RLock() + defer w.mu.RUnlock() + return w.webhooks[name] +} + +func (w *fakeWebhook) getWebhookList() *admissionv1.MutatingWebhookConfigurationList { + w.mu.RLock() + defer w.mu.RUnlock() + webhookList := &admissionv1.MutatingWebhookConfigurationList{} + for _, webhook := range w.webhooks { + webhookList.Items = append(webhookList.Items, *webhook) + } + return webhookList +} + +func (w *fakeWebhook) setWebhook(webhook *admissionv1.MutatingWebhookConfiguration) { + w.mu.Lock() + defer w.mu.Unlock() + w.webhooks[webhook.Name] = webhook +} + +func (w *fakeWebhook) addWatchEvent(obj runtime.Object) { + w.fakeWatch.Add(obj) +} + +func (w *fakeWebhook) getWatchLabel() string { + w.mu.RLock() + defer w.mu.RUnlock() + return w.watchLabel +} + +func newWebhook() *admissionv1.MutatingWebhookConfiguration { + return &admissionv1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "spire-webhook", + ResourceVersion: "1", 
+ }, + Webhooks: []admissionv1.MutatingWebhook{ + { + ClientConfig: admissionv1.WebhookClientConfig{}, + }, + }, + } +} diff --git a/pkg/server/plugin/upstreamauthority/vault/vault_client.go b/pkg/server/plugin/upstreamauthority/vault/vault_client.go index 2490ee5bc1..e726960a06 100644 --- a/pkg/server/plugin/upstreamauthority/vault/vault_client.go +++ b/pkg/server/plugin/upstreamauthority/vault/vault_client.go @@ -162,7 +162,6 @@ func (c *ClientConfig) NewAuthenticatedClient(method AuthMethod) (client *Client if sec == nil { return nil, false, errors.New("lookup self response is nil") } - client.SetToken(c.clientParams.Token) case CERT: path := fmt.Sprintf("auth/%v/login", c.clientParams.CertAuthMountPoint) sec, err = client.Auth(path, map[string]interface{}{ @@ -306,6 +305,11 @@ func (c *Client) Auth(path string, body map[string]interface{}) (*vapi.Secret, e } func (c *Client) LookupSelf(token string) (*vapi.Secret, error) { + if token == "" { + return nil, errors.New("token is empty") + } + c.SetToken(token) + secret, err := c.vaultClient.Logical().Read("auth/token/lookup-self") if err != nil { return nil, fmt.Errorf("token lookup failed: %v", err) diff --git a/pkg/server/plugin/upstreamauthority/vault/vault_client_test.go b/pkg/server/plugin/upstreamauthority/vault/vault_client_test.go index 5e4edec6bd..62b65cfe1f 100644 --- a/pkg/server/plugin/upstreamauthority/vault/vault_client_test.go +++ b/pkg/server/plugin/upstreamauthority/vault/vault_client_test.go @@ -163,30 +163,44 @@ func (vcs *VaultClientSuite) Test_NewAuthenticatedClient_TokenAuth() { vcs.fakeVaultServer.LookupSelfResponseCode = 200 for _, c := range []struct { name string + token string response []byte reusable bool namespace string + err string }{ { name: "Token Authentication success / Token never expire", + token: "test-token", response: []byte(testLookupSelfResponseNeverExpire), reusable: true, }, { name: "Token Authentication success / Token is renewable", + token: "test-token", response: 
[]byte(testLookupSelfResponse), reusable: true, }, { name: "Token Authentication success / Token is not renewable", + token: "test-token", response: []byte(testLookupSelfResponseNotRenewable), }, { name: "Token Authentication success / Token is renewable / Namespace is given", + token: "test-token", response: []byte(testCertAuthResponse), reusable: true, namespace: "test-ns", }, + { + name: "Token Authentication error / Token is empty", + token: "", + response: []byte(testCertAuthResponse), + reusable: true, + namespace: "test-ns", + err: "token is empty", + }, } { c := c vcs.Run(c.name, func() { @@ -202,18 +216,21 @@ func (vcs *VaultClientSuite) Test_NewAuthenticatedClient_TokenAuth() { VaultAddr: fmt.Sprintf("https://%v/", addr), Namespace: c.namespace, CACertPath: testRootCert, - Token: "test-token", + Token: c.token, } cc, err := NewClientConfig(cp, hclog.Default()) vcs.Require().NoError(err) client, reusable, err := cc.NewAuthenticatedClient(TOKEN) - vcs.Require().NoError(err) - vcs.Require().Equal(c.reusable, reusable) - - if cp.Namespace != "" { - headers := client.vaultClient.Headers() - vcs.Require().Equal(cp.Namespace, headers.Get(consts.NamespaceHeaderName)) + if c.err != "" { + vcs.Require().Equal(err.Error(), c.err) + } else { + vcs.Require().NoError(err) + vcs.Require().Equal(c.reusable, reusable) + if cp.Namespace != "" { + headers := client.vaultClient.Headers() + vcs.Require().Equal(cp.Namespace, headers.Get(consts.NamespaceHeaderName)) + } } }) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 00ea61e775..4f85f08409 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "fmt" "net/http" _ "net/http/pprof" //nolint: gosec // import registers routes on DefaultServeMux @@ -9,10 +10,10 @@ import ( "os" "runtime" "sync" - "time" "github.com/andres-erbsen/clock" "github.com/spiffe/go-spiffe/v2/spiffeid" + server_util "github.com/spiffe/spire/cmd/spire-server/util" 
"github.com/spiffe/spire/pkg/common/health" "github.com/spiffe/spire/pkg/common/hostservices/metricsservice" common_services "github.com/spiffe/spire/pkg/common/plugin/hostservices" @@ -30,6 +31,7 @@ import ( "github.com/spiffe/spire/pkg/server/plugin/hostservices" "github.com/spiffe/spire/pkg/server/registration" "github.com/spiffe/spire/pkg/server/svid" + "github.com/spiffe/spire/proto/spire/api/server/bundle/v1" "google.golang.org/grpc" ) @@ -85,6 +87,7 @@ func (s *Server) run(ctx context.Context) (err error) { }) telemetry.EmitVersion(metrics) + uptime.ReportMetrics(ctx, metrics) // Create the identity provider host service. It will not be functional // until the call to SetDeps() below. There is some tricky initialization @@ -159,7 +162,7 @@ func (s *Server) run(ctx context.Context) (err error) { registrationManager := s.newRegistrationManager(cat, metrics) - if err := healthChecks.AddCheck("server", s, time.Minute); err != nil { + if err := healthChecks.AddCheck("server", s); err != nil { return fmt.Errorf("failed adding healthcheck: %v", err) } @@ -386,5 +389,23 @@ func (s *Server) validateTrustDomain(ctx context.Context, ds datastore.DataStore // Status is used as a top-level health check for the Server. func (s *Server) Status() (interface{}, error) { - return nil, nil + client, err := server_util.NewServerClient(s.config.BindUDSAddress.Name) + if err != nil { + return nil, errors.New("cannot create registration client") + } + defer client.Release() + + bundleClient := client.NewBundleClient() + + // Currently using the ability to fetch a bundle as the health check. This + // **could** be problematic if the Upstream CA signing process is lengthy. + // As currently coded however, the API isn't served until after + // the server CA has been signed by upstream. 
+ if _, err := bundleClient.GetBundle(context.Background(), &bundle.GetBundleRequest{}); err != nil { + return nil, errors.New("unable to fetch bundle") + } + + return health.Details{ + Message: "successfully fetched bundle", + }, nil } diff --git a/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller.go b/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller.go index 22a91748f7..661d16b34f 100644 --- a/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller.go +++ b/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller.go @@ -306,8 +306,15 @@ func spiffeIDFromString(rawID string) (*types.SPIFFEID, error) { func entryEqual(existing, current *types.Entry) bool { return equalStringSlice(existing.DnsNames, current.DnsNames) && selectorSetsEqual(existing.Selectors, current.Selectors) && - existing.SpiffeId == current.SpiffeId && - existing.ParentId == current.ParentId + spiffeIDEqual(existing.SpiffeId, current.SpiffeId) && + spiffeIDEqual(existing.ParentId, current.ParentId) +} + +func spiffeIDEqual(existing, current *types.SPIFFEID) bool { + if existing == nil || current == nil { + return existing == current + } + return existing.String() == current.String() } func selectorSetsEqual(as, bs []*types.Selector) bool { diff --git a/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller_test.go b/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller_test.go index e3da19813e..27d873fade 100644 --- a/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller_test.go +++ b/support/k8s/k8s-workload-registrar/mode-crd/controllers/spiffeid_controller_test.go @@ -117,6 +117,28 @@ func (s *SpiffeIDControllerTestSuite) TestCreateSpiffeID() { s.Require().Equal(createdSpiffeID.Spec.Selector.PodName, "test") } +func (s *SpiffeIDControllerTestSuite) TestSpiffeIDEqual() { + var existing, current *spireTypes.SPIFFEID + // Both nil + 
s.Require().True(spiffeIDEqual(existing, current)) + + // One nil + var err error + current, err = spiffeIDFromString(makeID(s.trustDomain, "%s", SpiffeIDName)) + s.Require().Nil(err) + s.Require().False(spiffeIDEqual(existing, current)) + + // Equal + existing, err = spiffeIDFromString(makeID(s.trustDomain, "%s", SpiffeIDName)) + s.Require().Nil(err) + s.Require().True(spiffeIDEqual(existing, current)) + + // Not equal + current, err = spiffeIDFromString(makeID(s.trustDomain, "%s", "spiffeid-not-equal")) + s.Require().Nil(err) + s.Require().False(spiffeIDEqual(existing, current)) +} + func stringFromID(id *spireTypes.SPIFFEID) string { return fmt.Sprintf("spiffe://%s%s", id.TrustDomain, id.Path) } diff --git a/support/k8s/k8s-workload-registrar/server.go b/support/k8s/k8s-workload-registrar/server.go index 68d44daba6..f2fe030483 100644 --- a/support/k8s/k8s-workload-registrar/server.go +++ b/support/k8s/k8s-workload-registrar/server.go @@ -37,6 +37,7 @@ func NewServer(config ServerConfig) (*Server, error) { tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, } if !config.InsecureSkipClientVerification { tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert diff --git a/test/integration/suites/k8s-reconcile/conf/agent/spire-agent.yaml b/test/integration/suites/k8s-reconcile/conf/agent/spire-agent.yaml index eea78dcce1..a0992f27b9 100644 --- a/test/integration/suites/k8s-reconcile/conf/agent/spire-agent.yaml +++ b/test/integration/suites/k8s-reconcile/conf/agent/spire-agent.yaml @@ -78,6 +78,14 @@ data: } } + health_checks { + listener_enabled = true + bind_address = "0.0.0.0" + bind_port = "8080" + live_path = "/live" + ready_path = "/ready" + } + --- apiVersion: apps/v1 @@ -129,13 +137,15 @@ spec: - name: spire-token mountPath: /var/run/secrets/tokens livenessProbe: - exec: - command: ["/opt/spire/bin/spire-agent", "healthcheck", "-socketPath", "/run/spire/sockets/agent.sock"] + httpGet: + path: /live + port: 8080 
initialDelaySeconds: 10 periodSeconds: 10 readinessProbe: - exec: - command: ["/opt/spire/bin/spire-agent", "healthcheck", "-socketPath", "/run/spire/sockets/agent.sock", "--shallow"] + httpGet: + path: /ready + port: 8080 initialDelaySeconds: 10 periodSeconds: 10 volumes: @@ -151,8 +161,8 @@ spec: type: DirectoryOrCreate - name: spire-token projected: - sources: - - serviceAccountToken: - path: spire-agent - expirationSeconds: 7200 - audience: spire-server + sources: + - serviceAccountToken: + path: spire-agent + expirationSeconds: 7200 + audience: spire-server diff --git a/test/integration/suites/k8s-reconcile/conf/server/spire-server.yaml b/test/integration/suites/k8s-reconcile/conf/server/spire-server.yaml index 0dce35b550..9adad1704d 100644 --- a/test/integration/suites/k8s-reconcile/conf/server/spire-server.yaml +++ b/test/integration/suites/k8s-reconcile/conf/server/spire-server.yaml @@ -157,6 +157,14 @@ data: } } + health_checks { + listener_enabled = true + bind_address = "0.0.0.0" + bind_port = "8080" + live_path = "/live" + ready_path = "/ready" + } + --- apiVersion: v1 @@ -171,6 +179,7 @@ data: cluster = "example-cluster" server_socket_path = "/run/spire/sockets/registration.sock" leader_election = true + metrics_addr = "0.0.0.0:18080" --- @@ -212,13 +221,15 @@ spec: mountPath: /run/spire/sockets readOnly: false livenessProbe: - exec: - command: ["/opt/spire/bin/spire-server", "healthcheck", "-registrationUDSPath", "/run/spire/sockets/registration.sock"] + httpGet: + path: /live + port: 8080 initialDelaySeconds: 5 periodSeconds: 5 readinessProbe: - exec: - command: ["/opt/spire/bin/spire-server", "healthcheck", "-registrationUDSPath", "/run/spire/sockets/registration.sock", "--shallow"] + httpGet: + path: /ready + port: 8080 initialDelaySeconds: 5 periodSeconds: 5 - name: k8s-workload-registrar @@ -279,5 +290,3 @@ spec: ports: - port: 443 targetPort: registrar-port - - diff --git a/test/integration/suites/k8s/conf/agent/spire-agent.yaml 
b/test/integration/suites/k8s/conf/agent/spire-agent.yaml index eea78dcce1..a0992f27b9 100644 --- a/test/integration/suites/k8s/conf/agent/spire-agent.yaml +++ b/test/integration/suites/k8s/conf/agent/spire-agent.yaml @@ -78,6 +78,14 @@ data: } } + health_checks { + listener_enabled = true + bind_address = "0.0.0.0" + bind_port = "8080" + live_path = "/live" + ready_path = "/ready" + } + --- apiVersion: apps/v1 @@ -129,13 +137,15 @@ spec: - name: spire-token mountPath: /var/run/secrets/tokens livenessProbe: - exec: - command: ["/opt/spire/bin/spire-agent", "healthcheck", "-socketPath", "/run/spire/sockets/agent.sock"] + httpGet: + path: /live + port: 8080 initialDelaySeconds: 10 periodSeconds: 10 readinessProbe: - exec: - command: ["/opt/spire/bin/spire-agent", "healthcheck", "-socketPath", "/run/spire/sockets/agent.sock", "--shallow"] + httpGet: + path: /ready + port: 8080 initialDelaySeconds: 10 periodSeconds: 10 volumes: @@ -151,8 +161,8 @@ spec: type: DirectoryOrCreate - name: spire-token projected: - sources: - - serviceAccountToken: - path: spire-agent - expirationSeconds: 7200 - audience: spire-server + sources: + - serviceAccountToken: + path: spire-agent + expirationSeconds: 7200 + audience: spire-server diff --git a/test/integration/suites/k8s/conf/server/spire-server.yaml b/test/integration/suites/k8s/conf/server/spire-server.yaml index 2bba5e93f4..cbc804cd66 100644 --- a/test/integration/suites/k8s/conf/server/spire-server.yaml +++ b/test/integration/suites/k8s/conf/server/spire-server.yaml @@ -147,6 +147,14 @@ data: } } + health_checks { + listener_enabled = true + bind_address = "0.0.0.0" + bind_port = "8080" + live_path = "/live" + ready_path = "/ready" + } + --- apiVersion: v1 @@ -156,9 +164,9 @@ metadata: namespace: spire data: k8s-workload-registrar.conf: | - cert_path = "/run/spire/k8s-workload-registrar/certs/server-cert.pem" - key_path = "/run/spire/k8s-workload-registrar/secret/server-key.pem" - cacert_path = 
"/run/spire/k8s-workload-registrar/certs/cacert.pem" + cert_path = "/run/spire/k8s-workload-registrar/certs/server-cert.pem" + key_path = "/run/spire/k8s-workload-registrar/secret/server-key.pem" + cacert_path = "/run/spire/k8s-workload-registrar/certs/cacert.pem" trust_domain = "example.org" cluster = "example-cluster" server_socket_path = "/run/spire/sockets/registration.sock" @@ -242,13 +250,15 @@ spec: mountPath: /run/spire/sockets readOnly: false livenessProbe: - exec: - command: ["/opt/spire/bin/spire-server", "healthcheck", "-registrationUDSPath", "/run/spire/sockets/registration.sock"] + httpGet: + path: /live + port: 8080 initialDelaySeconds: 5 periodSeconds: 5 readinessProbe: - exec: - command: ["/opt/spire/bin/spire-server", "healthcheck", "-registrationUDSPath", "/run/spire/sockets/registration.sock", "--shallow"] + httpGet: + path: /ready + port: 8080 initialDelaySeconds: 5 periodSeconds: 5 - name: k8s-workload-registrar @@ -321,5 +331,3 @@ spec: ports: - port: 443 targetPort: registrar-port - -