diff --git a/internal/component/prometheus/scrape/scrape.go b/internal/component/prometheus/scrape/scrape.go index 889a5cf5c3c..b3791d31d12 100644 --- a/internal/component/prometheus/scrape/scrape.go +++ b/internal/component/prometheus/scrape/scrape.go @@ -464,16 +464,29 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() + // Always store the latest targets and schedule a reload, even if the rest + // of the update fails. This ensures the component scrapes the correct set + // of targets when running with a partially-updated config. + c.args.Targets = newArgs.Targets + defer func() { + select { + case c.reloadTargets <- struct{}{}: + default: + } + }() + // Some fields are not updateable at runtime - only allow them when Update() // is called for the first time from New(). if !c.firstUpdateDone { c.firstUpdateDone = true } else { if c.args.ScrapeNativeHistograms != newArgs.ScrapeNativeHistograms { - return fmt.Errorf("scrape_native_histograms cannot be updated at runtime") + level.Warn(c.opts.Logger).Log("msg", "scrape_native_histograms cannot be changed at runtime; the component will continue using the original setting until Alloy is restarted", "current", c.args.ScrapeNativeHistograms, "requested", newArgs.ScrapeNativeHistograms) + newArgs.ScrapeNativeHistograms = c.args.ScrapeNativeHistograms } if c.args.ExtraMetrics != newArgs.ExtraMetrics { - return fmt.Errorf("extra_metrics cannot be updated at runtime") + level.Warn(c.opts.Logger).Log("msg", "extra_metrics cannot be changed at runtime; the component will continue using the original setting until Alloy is restarted", "current", c.args.ExtraMetrics, "requested", newArgs.ExtraMetrics) + newArgs.ExtraMetrics = c.args.ExtraMetrics } } @@ -493,11 +506,6 @@ func (c *Component) Update(args component.Arguments) error { } level.Debug(c.opts.Logger).Log("msg", "scrape config was updated") - select { - case c.reloadTargets <- struct{}{}: - default: - } - return nil } diff --git a/internal/component/prometheus/scrape/scrape_test.go b/internal/component/prometheus/scrape/scrape_test.go index 68ae90cc09f..24213bb3ab2 100644 --- a/internal/component/prometheus/scrape/scrape_test.go +++ b/internal/component/prometheus/scrape/scrape_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/grafana/ckit/memconn" prometheus_client "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -163,33 +164,9 @@ func TestCustomDialer(t *testing.T) { err := syntax.Unmarshal([]byte(config), &args) require.NoError(t, err) - opts := component.Options{ - Logger: util.TestAlloyLogger(t), - Registerer: prometheus_client.NewRegistry(), - GetServiceData: func(name string) (any, error) { - switch name { - case http_service.ServiceName: - return http_service.Data{ - HTTPListenAddr: "inmemory:80", - MemoryListenAddr: "inmemory:80", - BaseHTTPPath: "/", - DialFunc: func(ctx context.Context, network, address string) (net.Conn, error) { - return memLis.DialContext(ctx) - }, - }, nil - - case cluster.ServiceName: - return cluster.Mock(), nil - case labelstore.ServiceName: - return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil - case livedebugging.ServiceName: - return livedebugging.NewLiveDebugging(), nil - - default: - return nil, fmt.Errorf("service %q does not exist", name) - } - }, - } + opts := newComponentOpts(t, func(ctx context.Context, network, address string) (net.Conn, error) { + return memLis.DialContext(ctx) + }) s, err := New(opts, args) require.NoError(t, err) @@ -712,39 +689,7 @@ func testScrapingAllMetricTypes(t *testing.T, enableTypeAndUnitLabels bool) { // Create a Prometheus registry and metrics for protobuf format reg := setupTestMetrics() - // Create HTTP server that serves metrics in protobuf format - server := &http.Server{ - Addr: "127.0.0.1:0", // Let the OS choose a free port - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/metrics" { - // Create a promhttp handler that prefers protobuf format - handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{ - EnableOpenMetrics: true, - }) - handler.ServeHTTP(w, r) - } else { - w.WriteHeader(http.StatusNotFound) - } - }), - } - - listener, err := net.Listen("tcp", server.Addr) - require.NoError(t, err) - serverAddr := listener.Addr().String() - - go func() { - server.Serve(listener) - }() - defer server.Shutdown(ctx) - - // Wait a moment for server to start - time.Sleep(100 * time.Millisecond) - - // Test that the server is working by making a direct request - resp, err := http.Get(fmt.Sprintf("http://%s/metrics", serverAddr)) - require.NoError(t, err) - defer resp.Body.Close() - require.Equal(t, http.StatusOK, resp.StatusCode) + serverAddr := startMetricsServer(t, reg) // Set up test appender using the testappender utility appender := testappender.NewCollectingAppender() @@ -753,29 +698,7 @@ func testScrapingAllMetricTypes(t *testing.T, enableTypeAndUnitLabels bool) { mockAppendable := testappender.ConstantAppendable{Inner: appender} // Set up component options - opts := component.Options{ - Logger: util.TestAlloyLogger(t), - Registerer: prometheus_client.NewRegistry(), - GetServiceData: func(name string) (any, error) { - switch name { - case http_service.ServiceName: - return http_service.Data{ - HTTPListenAddr: "localhost:12345", - MemoryListenAddr: "alloy.internal:1245", - BaseHTTPPath: "/", - DialFunc: (&net.Dialer{}).DialContext, - }, nil - case cluster.ServiceName: - return cluster.Mock(), nil - case labelstore.ServiceName: - return labelstore.New(nil, prometheus_client.DefaultRegisterer), nil - case livedebugging.ServiceName: - return livedebugging.NewLiveDebugging(), nil - default: - return nil, fmt.Errorf("service %q does not exist", name) - } - }, - } + opts := newComponentOpts(t) // Configure scrape arguments var args Arguments @@ -907,3 +830,274 @@ func testScrapingAllMetricTypes(t *testing.T, enableTypeAndUnitLabels bool) { t.Logf("Successfully scraped %d samples with %d metadata entries and %d histograms", len(actualSamples), len(actualMetadata), len(actualHistograms)) } + +// --- Helpers for TestRuntimeUpdate --- + +// defaultFastScrapeArgs returns Arguments with defaults pointing at addr, a 50 ms +// scrape interval and a shared appendable. Validate() is NOT called so callers can +// modify fields before validating. +func defaultFastScrapeArgs(addr string, app storage.Appendable) Arguments { + var args Arguments + args.SetToDefault() + args.Targets = []discovery.Target{ + discovery.NewTargetFromLabelSet(model.LabelSet{"__address__": model.LabelValue(addr)}), + } + args.ForwardTo = []storage.Appendable{app} + args.ScrapeInterval = 50 * time.Millisecond + args.ScrapeTimeout = 40 * time.Millisecond + args.JobName = "test_job" + return args +} + +// hasSampleForMetric returns true when any sample in the map carries __name__ == name. +func hasSampleForMetric(samples map[string]*testappender.MetricSample, name string) bool { + for _, s := range samples { + if s.Labels.Get("__name__") == name { + return true + } + } + return false +} + +// TestRuntimeUpdate verifies that config fields can (or gracefully cannot) be changed +// while the component is running, and that targets are always updated regardless. +func TestRuntimeUpdate(t *testing.T) { + type testCase struct { + name string + // Metric registry scraped prior to the config update. + setupRegistryA func(t *testing.T) *prometheus_client.Registry + // Metric registry scraped after the config update. + setupRegistryB func(t *testing.T) *prometheus_client.Registry + // Arguments used to configure the component prior to the config update. + initialArgs func(t *testing.T, addrA, addrB string, app storage.Appendable) Arguments + // Arguments used to configure the component after the config update. + updatedArgs func(t *testing.T, addrA, addrB string, app storage.Appendable) Arguments + // preUpdateCheck is polled until it passes, then Update is called. + preUpdateCheck func(ct *assert.CollectT, c testappender.CollectingAppender) + // postUpdateCheck is polled after Update returns. + // Because each test uses unique metric names for the pre- and post-update states, + // the post-update metric can only appear after the update takes effect. + postUpdateCheck func(ct *assert.CollectT, c testappender.CollectingAppender) + expectUpdateError bool + } + + singleGaugeRegistry := func(name string) func(t *testing.T) *prometheus_client.Registry { + return func(t *testing.T) *prometheus_client.Registry { + reg := prometheus_client.NewRegistry() + g := prometheus_client.NewGauge(prometheus_client.GaugeOpts{Name: name}) + g.Set(1) + reg.MustRegister(g) + return reg + } + } + + tests := []testCase{ + { + // Changing targets must cause the new target to be scraped. + // server_b_up can only appear once the target switches to server B. + name: "targets are updated", + setupRegistryA: singleGaugeRegistry("server_a_up"), + setupRegistryB: singleGaugeRegistry("server_b_up"), + initialArgs: func(t *testing.T, addrA, _ string, app storage.Appendable) Arguments { + args := defaultFastScrapeArgs(addrA, app) + require.NoError(t, args.Validate()) + return args + }, + updatedArgs: func(t *testing.T, _, addrB string, app storage.Appendable) Arguments { + args := defaultFastScrapeArgs(addrB, app) + require.NoError(t, args.Validate()) + return args + }, + preUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_a_up"), "server_a_up should appear before update") + }, + postUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + // TODO: Check that server A is no longer being scraped + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_b_up"), "server_b_up should appear after target update") + }, + }, + { + // Changing scrape_native_histograms is silently ignored (a warning is logged); the component + // must keep running and must still apply the updated target list. + // server_b_up can only appear once the target switches to server B. + name: "scrape_native_histograms change is ignored but component continues with updated targets", + setupRegistryA: singleGaugeRegistry("server_a_up"), + setupRegistryB: singleGaugeRegistry("server_b_up"), + initialArgs: func(t *testing.T, addrA, _ string, app storage.Appendable) Arguments { + args := defaultFastScrapeArgs(addrA, app) + args.ScrapeNativeHistograms = false + require.NoError(t, args.Validate()) + return args + }, + updatedArgs: func(t *testing.T, _, addrB string, app storage.Appendable) Arguments { + // Flip ScrapeNativeHistograms AND change the target. The ScrapeNativeHistograms change + // must be ignored (warning logged) while the target change must still take effect. + args := defaultFastScrapeArgs(addrB, app) + args.ScrapeNativeHistograms = true + require.NoError(t, args.Validate()) + return args + }, + preUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_a_up"), "server_a_up should appear before update") + }, + postUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_b_up"), "server_b_up should appear: targets must be updated even when scrape_native_histograms change is ignored") + }, + }, + { + // Changing extra_metrics is silently ignored (a warning is logged); the component + // must keep running and must still apply the updated target list. + // server_b_up can only appear once the target switches to server B. + name: "extra_metrics change is ignored but component continues with updated targets", + setupRegistryA: singleGaugeRegistry("server_a_up"), + setupRegistryB: singleGaugeRegistry("server_b_up"), + initialArgs: func(t *testing.T, addrA, _ string, app storage.Appendable) Arguments { + args := defaultFastScrapeArgs(addrA, app) + args.ExtraMetrics = false + require.NoError(t, args.Validate()) + return args + }, + updatedArgs: func(t *testing.T, _, addrB string, app storage.Appendable) Arguments { + // Flip ExtraMetrics AND change the target. The ExtraMetrics change must be + // ignored (warning logged) while the target change must still take effect. + args := defaultFastScrapeArgs(addrB, app) + args.ExtraMetrics = true + require.NoError(t, args.Validate()) + return args + }, + preUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_a_up"), "server_a_up should appear before update") + }, + postUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + // TODO: Also check the log for the warning + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_b_up"), "server_b_up should appear: targets must be updated even when extra_metrics change is ignored") + }, + }, + { + // When ApplyConfig fails, the deferred reloadTargets signal still fires and the + // new targets stored in c.args are picked up by the Run loop. + // A non-existent TLS CA file causes scrape.Manager.ApplyConfig to fail when it + // tries to build the HTTP client, but the scrape manager keeps its previous + // (TLS-free) config and can still reach plain-HTTP server B. + // server_b_up can only appear once the target switches to server B. + name: "targets are updated even when ApplyConfig fails", + setupRegistryA: singleGaugeRegistry("server_a_up"), + setupRegistryB: singleGaugeRegistry("server_b_up"), + initialArgs: func(t *testing.T, addrA, _ string, app storage.Appendable) Arguments { + args := defaultFastScrapeArgs(addrA, app) + require.NoError(t, args.Validate()) + return args + }, + updatedArgs: func(t *testing.T, _, addrB string, app storage.Appendable) Arguments { + args := defaultFastScrapeArgs(addrB, app) + // A non-existent CA file passes Arguments.Validate() (file existence is not + // checked there) but causes ApplyConfig to fail when building the TLS client. + args.HTTPClientConfig.TLSConfig.CAFile = "/nonexistent/ca.pem" + require.NoError(t, args.Validate()) + return args + }, + preUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_a_up"), "server_a_up should appear before update") + }, + postUpdateCheck: func(ct *assert.CollectT, c testappender.CollectingAppender) { + assert.True(ct, hasSampleForMetric(c.CollectedSamples(), "server_b_up"), "server_b_up should appear even when ApplyConfig fails") + }, + expectUpdateError: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + addrA := startMetricsServer(t, tc.setupRegistryA(t)) + addrB := startMetricsServer(t, tc.setupRegistryB(t)) + + appender := testappender.NewCollectingAppender() + appendable := testappender.ConstantAppendable{Inner: appender} + + c, err := New(newComponentOpts(t), tc.initialArgs(t, addrA, addrB, appendable)) + require.NoError(t, err) + go c.Run(ctx) + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + tc.preUpdateCheck(ct, appender) + }, 10*time.Second, 50*time.Millisecond, "pre-update check timed out") + + updateErr := c.Update(tc.updatedArgs(t, addrA, addrB, appendable)) + if tc.expectUpdateError { + require.Error(t, updateErr) + } else { + require.NoError(t, updateErr) + } + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + tc.postUpdateCheck(ct, appender) + }, 10*time.Second, 50*time.Millisecond, "post-update check timed out") + }) + } +} + +// --- Helpers for all tests --- + +// startMetricsServer starts a TCP HTTP server that serves reg at /metrics. +// The server is shut down automatically when the test ends. +func startMetricsServer(t *testing.T, reg *prometheus_client.Registry) string { + t.Helper() + handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{EnableOpenMetrics: true}) + srv := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/metrics" { + handler.ServeHTTP(w, r) + return + } + w.WriteHeader(http.StatusNotFound) + }), + } + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := lis.Addr().String() + go func() { _ = srv.Serve(lis) }() + t.Cleanup(func() { _ = srv.Shutdown(context.Background()) }) + return addr +} + +// newComponentOpts returns component.Options suitable for tests. +// componentID is used as the component identifier and is included in every log line +// as component_path/component_id fields, mirroring production behaviour. +// An optional dialFunc overrides the default (*net.Dialer).DialContext, which is +// useful for tests that need an in-memory or otherwise custom transport. +func newComponentOpts(t *testing.T, dialFunc ...func(context.Context, string, string) (net.Conn, error)) component.Options { + t.Helper() + componentID := strings.ReplaceAll(t.Name(), " ", "_") + df := (&net.Dialer{}).DialContext + if len(dialFunc) > 0 && dialFunc[0] != nil { + df = dialFunc[0] + } + baseLogger := util.TestAlloyLogger(t) + return component.Options{ + ID: componentID, + Logger: log.With(baseLogger, "component_path", "prometheus.scrape", "component_id", componentID), + Registerer: prometheus_client.NewRegistry(), + GetServiceData: func(name string) (any, error) { + switch name { + case http_service.ServiceName: + return http_service.Data{ + HTTPListenAddr: "localhost:0", + MemoryListenAddr: "alloy.internal:0", + BaseHTTPPath: "/", + DialFunc: df, + }, nil + case cluster.ServiceName: + return cluster.Mock(), nil + case labelstore.ServiceName: + return labelstore.New(nil, prometheus_client.NewRegistry()), nil + case livedebugging.ServiceName: + return livedebugging.NewLiveDebugging(), nil + default: + return nil, fmt.Errorf("service %q does not exist", name) + } + }, + } +}