diff --git a/internal/component/common/loki/receiver.go b/internal/component/common/loki/receiver.go index 77b94eac920..fcb8d899f79 100644 --- a/internal/component/common/loki/receiver.go +++ b/internal/component/common/loki/receiver.go @@ -1,5 +1,7 @@ package loki +import "sync" + // LogReceiverOption is an option argument passed to NewLogsReceiver. type LogReceiverOption func(*logsReceiver) @@ -65,3 +67,51 @@ type logsBatchReceiver struct { func (l *logsBatchReceiver) Chan() chan []Entry { return l.c } + +func NewCollectingBatchReceiver() *CollectingBatchReceiver { + c := &CollectingBatchReceiver{ + entries: make(chan []Entry), + } + c.wg.Go(func() { + for batch := range c.entries { + c.mtx.Lock() + c.received = append(c.received, batch...) + c.mtx.Unlock() + } + }) + return c +} + +// CollectingBatchReceiver is a LogsBatchReceiver that will +// collect all received entries so it can later be inspected. +// Used in tests. +type CollectingBatchReceiver struct { + entries chan []Entry + received []Entry + mtx sync.Mutex + wg sync.WaitGroup + once sync.Once +} + +func (c *CollectingBatchReceiver) Chan() chan []Entry { + return c.entries +} + +func (c *CollectingBatchReceiver) Received() []Entry { + c.mtx.Lock() + defer c.mtx.Unlock() + cpy := make([]Entry, len(c.received)) + copy(cpy, c.received) + return cpy +} + +func (c *CollectingBatchReceiver) Clear() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.received = []Entry{} +} + +func (c *CollectingBatchReceiver) Stop() { + c.once.Do(func() { close(c.entries) }) + c.wg.Wait() +} diff --git a/internal/component/loki/source/api/api.go b/internal/component/loki/source/api/api.go index b5f2b6ff3a6..b5897f1f907 100644 --- a/internal/component/loki/source/api/api.go +++ b/internal/component/loki/source/api/api.go @@ -139,8 +139,8 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return fmt.Errorf("failed to create embedded server: %v", err) } - err = c.server.Run() - if err != nil { + + if err = c.server.Run(); err != nil { return fmt.Errorf("failed to run embedded server: %v", err) } } diff --git a/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go b/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go index faaecab689c..cd46bc5c90d 100644 --- a/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go +++ b/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go @@ -11,7 +11,6 @@ import ( "net" "net/http" "strconv" - "sync" "testing" "time" @@ -30,44 +29,6 @@ import ( "github.com/grafana/alloy/syntax" ) -type fakeBatchReceiver struct { - entries chan []loki.Entry - received []loki.Entry - mtx sync.Mutex - wg sync.WaitGroup -} - -func newFakeBatchReceiver() *fakeBatchReceiver { - c := &fakeBatchReceiver{ - entries: make(chan []loki.Entry), - } - c.wg.Go(func() { - for batch := range c.entries { - c.mtx.Lock() - c.received = append(c.received, batch...) - c.mtx.Unlock() - } - }) - return c -} - -func (c *fakeBatchReceiver) Chan() chan []loki.Entry { - return c.entries -} - -func (c *fakeBatchReceiver) Received() []loki.Entry { - c.mtx.Lock() - defer c.mtx.Unlock() - cpy := make([]loki.Entry, len(c.received)) - copy(cpy, c.received) - return cpy -} - -func (c *fakeBatchReceiver) Stop() { - close(c.entries) - c.wg.Wait() -} - const localhost = "127.0.0.1" func TestLokiPushTarget(t *testing.T) { @@ -318,8 +279,7 @@ regex = "dropme" func TestPlaintextPushTarget(t *testing.T) { logger := log.NewNopLogger() - //Create PushAPIServerOld - eh := newFakeBatchReceiver() + eh := loki.NewCollectingBatchReceiver() defer eh.Stop() // Get a randomly available port by open and closing a TCP socket @@ -387,9 +347,7 @@ func TestPlaintextPushTarget(t *testing.T) { func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) { logger := log.NewNopLogger() - //Create PushAPIServerOld - - eh := newFakeBatchReceiver() + eh := loki.NewCollectingBatchReceiver() defer eh.Stop() // Get a randomly available port by open and closing a TCP socket @@ -526,9 +484,8 @@ func getFreePort(t *testing.T) int { return port } -func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fakeBatchReceiver) { - //Create PushAPIServerOld - eh := newFakeBatchReceiver() +func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *loki.CollectingBatchReceiver) { + eh := loki.NewCollectingBatchReceiver() t.Cleanup(func() { eh.Stop() }) diff --git a/internal/component/loki/source/heroku/heroku.go b/internal/component/loki/source/heroku/heroku.go index ced24d14283..97ae5e635d2 100644 --- a/internal/component/loki/source/heroku/heroku.go +++ b/internal/component/loki/source/heroku/heroku.go @@ -2,6 +2,7 @@ package heroku import ( "context" + "fmt" "reflect" "sync" @@ -9,6 +10,7 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" fnet "github.com/grafana/alloy/internal/component/common/net" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" + "github.com/grafana/alloy/internal/component/loki/source" ht "github.com/grafana/alloy/internal/component/loki/source/heroku/internal/herokutarget" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -53,12 +55,13 @@ type Component struct { metrics *ht.Metrics // Metrics about Heroku entries. serverMetrics *util.UncheckedCollector // Metircs about the HTTP server managed by the component. - mut sync.RWMutex - args Arguments - fanout []loki.LogsReceiver - target *ht.HerokuTarget + mut sync.RWMutex + args Arguments - handler loki.LogsReceiver + fanout *loki.Fanout + server *ht.HerokuServer + + handler loki.LogsBatchReceiver } // New creates a new loki.source.heroku component. @@ -68,9 +71,8 @@ func New(o component.Options, args Arguments) (*Component, error) { metrics: ht.NewMetrics(o.Registerer), mut: sync.RWMutex{}, args: Arguments{}, - fanout: args.ForwardTo, - target: nil, - handler: loki.NewLogsReceiver(), + fanout: loki.NewFanout(args.ForwardTo), + handler: loki.NewLogsBatchReceiver(), serverMetrics: util.NewUncheckedCollector(nil), } @@ -91,26 +93,13 @@ func (c *Component) Run(ctx context.Context) error { defer c.mut.Unlock() level.Info(c.opts.Logger).Log("msg", "loki.source.heroku component shutting down, stopping listener") - if c.target != nil { - err := c.target.Stop() - if err != nil { - level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err) - } + if c.server != nil { + c.server.ForceShutdown() } }() - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.handler.Chan(): - c.mut.RLock() - for _, receiver := range c.fanout { - receiver.Chan() <- entry - } - c.mut.RUnlock() - } - } + source.ConsumeBatch(ctx, c.handler, c.fanout) + return nil } // Update implements component.Component. @@ -119,7 +108,7 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() newArgs := args.(Arguments) - c.fanout = newArgs.ForwardTo + c.fanout.UpdateChildren(newArgs.ForwardTo) var rcs []*relabel.Config if len(newArgs.RelabelRules) > 0 { @@ -130,12 +119,10 @@ func (c *Component) Update(args component.Arguments) error { changed(c.args.RelabelRules, newArgs.RelabelRules) || changed(c.args.Labels, newArgs.Labels) || c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp + if restartRequired { - if c.target != nil { - err := c.target.Stop() - if err != nil { - level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err) - } + if c.server != nil { + c.server.Shutdown() } // [ht.NewHerokuTarget] registers new metrics every time it is called. To @@ -145,14 +132,16 @@ func (c *Component) Update(args component.Arguments) error { registry := prometheus.NewRegistry() c.serverMetrics.SetCollector(registry) - entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {}) - t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), registry) + server, err := ht.NewHerokuServer(c.metrics, c.opts.Logger, c.handler, rcs, newArgs.Convert(), registry) if err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to create heroku listener with provided config", "err", err) - return err + return fmt.Errorf("failed to create heroku server: %w", err) + } + + if err := server.Run(); err != nil { + return fmt.Errorf("failed to run heroku server: %w", err) } - c.target = t + c.server = server c.args = newArgs } @@ -160,13 +149,13 @@ func (c *Component) Update(args component.Arguments) error { } // Convert is used to bridge between the Alloy and Promtail types. -func (args *Arguments) Convert() *ht.HerokuDrainTargetConfig { +func (args *Arguments) Convert() *ht.HerokuConfig { lbls := make(model.LabelSet, len(args.Labels)) for k, v := range args.Labels { lbls[model.LabelName(k)] = model.LabelValue(v) } - return &ht.HerokuDrainTargetConfig{ + return &ht.HerokuConfig{ Server: args.Server, Labels: lbls, UseIncomingTimestamp: args.UseIncomingTimestamp, @@ -179,8 +168,8 @@ func (c *Component) DebugInfo() any { defer c.mut.RUnlock() var res = readerDebugInfo{ - Ready: c.target.Ready(), - Address: c.target.HTTPListenAddress(), + Ready: c.server.Ready(), + Address: c.server.HTTPListenAddress(), } return res diff --git a/internal/component/loki/source/heroku/heroku_test.go b/internal/component/loki/source/heroku/heroku_test.go index 51fdd0082c8..b2507adf874 100644 --- a/internal/component/loki/source/heroku/heroku_test.go +++ b/internal/component/loki/source/heroku/heroku_test.go @@ -37,7 +37,7 @@ func TestPush(t *testing.T) { waitForServerToBeReady(t, c) // Create a Heroku Drain Request and send it to the launched server. - req, err := http.NewRequest(http.MethodPost, getEndpoint(c.target), strings.NewReader(testPayload)) + req, err := http.NewRequest(http.MethodPost, getEndpoint(c.server), strings.NewReader(testPayload)) require.NoError(t, err) res, err := http.DefaultClient.Do(req) @@ -146,17 +146,17 @@ func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) { defer func() { // in order to cleanly shutdown, we want to make sure the server is running first. waitForServerToBeReady(t, comp) - require.NoError(t, comp.target.Stop()) + comp.server.Shutdown() }() // in order to cleanly update, we want to make sure the server is running first. waitForServerToBeReady(t, comp) - targetBefore := comp.target + targetBefore := comp.server err = comp.Update(tc.newArgs) require.NoError(t, err) - restarted := targetBefore != comp.target + restarted := targetBefore != comp.server require.Equal(t, restarted, tc.restartRequired) }) } @@ -239,7 +239,7 @@ func waitForServerToBeReady(t *testing.T, comp *Component) { require.Eventuallyf(t, func() bool { resp, err := http.Get(fmt.Sprintf( "http://%v/wrong/url", - comp.target.HTTPListenAddress(), + comp.server.HTTPListenAddress(), )) return err == nil && resp.StatusCode == 404 }, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout") @@ -259,6 +259,6 @@ func newRegexp() alloy_relabel.Regexp { return alloy_relabel.Regexp{Regexp: re} } -func getEndpoint(target *herokutarget.HerokuTarget) string { +func getEndpoint(target *herokutarget.HerokuServer) string { return fmt.Sprintf("http://%s%s", target.HTTPListenAddress(), target.DrainEndpoint()) } diff --git a/internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go b/internal/component/loki/source/heroku/internal/herokutarget/server.go similarity index 56% rename from internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go rename to internal/component/loki/source/heroku/internal/herokutarget/server.go index ba671dff236..b0fba5a5fd1 100644 --- a/internal/component/loki/source/heroku/internal/herokutarget/herokutarget.go +++ b/internal/component/loki/source/heroku/internal/herokutarget/server.go @@ -8,6 +8,7 @@ import ( "fmt" "net/http" "strings" + "sync" "time" "github.com/go-kit/log" @@ -26,8 +27,8 @@ import ( const ReservedLabelTenantID = "__tenant_id__" -// HerokuDrainTargetConfig describes a scrape config to listen and consume heroku logs, in the HTTPS drain manner. -type HerokuDrainTargetConfig struct { +// HerokuConfig describes a scrape config to listen and consume heroku log. +type HerokuConfig struct { Server *fnet.ServerConfig // Labels optionally holds labels to associate with each record received on the push api. @@ -38,48 +39,63 @@ type HerokuDrainTargetConfig struct { UseIncomingTimestamp bool } -type HerokuTarget struct { - logger log.Logger - handler loki.EntryHandler - config *HerokuDrainTargetConfig +type HerokuServer struct { + logger log.Logger + handler loki.LogsBatchReceiver + + once sync.Once + forceShutdown chan struct{} + + config *HerokuConfig metrics *Metrics relabelConfigs []*relabel.Config server *fnet.TargetServer } -// NewHerokuTarget creates a brand new Heroku Drain target, capable of receiving logs from a Heroku application through an HTTP drain. -func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *HerokuDrainTargetConfig, reg prometheus.Registerer) (*HerokuTarget, error) { - wrappedLogger := log.With(logger, "component", "heroku_drain") +// NewHerokuServer creates a new Heroku server, capable of receiving logs from a Heroku application through an HTTP drain endpoint. +func NewHerokuServer( + metrics *Metrics, + logger log.Logger, + handler loki.LogsBatchReceiver, + relabel []*relabel.Config, + config *HerokuConfig, + reg prometheus.Registerer, +) (*HerokuServer, error) { + + logger = log.With(logger, "component", "heroku_drain") - srv, err := fnet.NewTargetServer(wrappedLogger, "loki_source_heroku_drain_target", reg, config.Server) + srv, err := fnet.NewTargetServer(logger, "loki_source_heroku_drain_target", reg, config.Server) if err != nil { return nil, fmt.Errorf("failed to create loki server: %w", err) } - ht := &HerokuTarget{ + return &HerokuServer{ server: srv, metrics: metrics, - logger: wrappedLogger, + logger: logger, handler: handler, + forceShutdown: make(chan struct{}), config: config, relabelConfigs: relabel, - } + }, nil +} - err = ht.server.MountAndRun(func(router *mux.Router) { - router.Path(ht.DrainEndpoint()).Methods("POST").Handler(http.HandlerFunc(ht.drain)) - router.Path(ht.HealthyEndpoint()).Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) +func (h *HerokuServer) Run() error { + return h.server.MountAndRun(func(router *mux.Router) { + router.Path(h.DrainEndpoint()).Methods("POST").Handler(http.HandlerFunc(h.drain)) + router.Path(h.HealthyEndpoint()).Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) }) - if err != nil { - return nil, err - } - - return ht, nil } -func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) { - entries := h.handler.Chan() +func (h *HerokuServer) drain(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() herokuScanner := herokuEncoding.NewDrainScanner(r.Body) + + var ( + entries []loki.Entry + created = time.Now() + ) + for herokuScanner.Scan() { ts := time.Now() message := herokuScanner.Message() @@ -121,39 +137,54 @@ func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) { filtered[ReservedLabelTenantID] = model.LabelValue(tenantIDHeaderValue) } - entries <- loki.NewEntry(filtered, push.Entry{ + entries = append(entries, loki.NewEntryWithCreated(filtered, created, push.Entry{ Timestamp: ts, Line: message.Message, - }) - h.metrics.herokuEntries.Inc() + })) } - err := herokuScanner.Err() - if err != nil { + + if err := herokuScanner.Err(); err != nil { h.metrics.herokuErrors.Inc() level.Warn(h.logger).Log("msg", "failed to read incoming heroku request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } + + numEntries := len(entries) + if numEntries > 0 { + select { + case h.handler.Chan() <- entries: + case <-r.Context().Done(): + w.WriteHeader(http.StatusServiceUnavailable) + return + case <-h.forceShutdown: + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + h.metrics.herokuEntries.Add(float64(numEntries)) + } + w.WriteHeader(http.StatusNoContent) } -func (h *HerokuTarget) Labels() model.LabelSet { +func (h *HerokuServer) Labels() model.LabelSet { return h.config.Labels } -func (h *HerokuTarget) HTTPListenAddress() string { +func (h *HerokuServer) HTTPListenAddress() string { return h.server.HTTPListenAddr() } -func (h *HerokuTarget) DrainEndpoint() string { +func (h *HerokuServer) DrainEndpoint() string { return "/heroku/api/v1/drain" } -func (h *HerokuTarget) HealthyEndpoint() string { +func (h *HerokuServer) HealthyEndpoint() string { return "/healthy" } -func (h *HerokuTarget) Ready() bool { +func (h *HerokuServer) Ready() bool { req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", h.HTTPListenAddress(), h.HealthyEndpoint()), nil) if err != nil { return false @@ -167,13 +198,27 @@ func (h *HerokuTarget) Ready() bool { return true } -func (h *HerokuTarget) Details() any { +func (h *HerokuServer) Details() any { return map[string]string{} } -func (h *HerokuTarget) Stop() error { - level.Info(h.logger).Log("msg", "stopping heroku drain target") +func (h *HerokuServer) Shutdown() { + level.Info(h.logger).Log("msg", "stopping heroku server") + // StopAndShutdown tries to gracefully shutdown. + // It will stop idle and incoming connections + // and try to wait for all in-flight connections + // to finish. If configured timeout `ServerGracefulShutdownTimeout` + // expired this call will be unblocked. + h.server.StopAndShutdown() + + // After we have tried a graceful shutdown we force all remaining in-flight + // requests to exit. + h.once.Do(func() { close(h.forceShutdown) }) +} + +// ForceShutdown will cancel all in-flight before starting server shutdown. +func (h *HerokuServer) ForceShutdown() { + level.Info(h.logger).Log("msg", "force shutdown of heroku server") + h.once.Do(func() { close(h.forceShutdown) }) h.server.StopAndShutdown() - h.handler.Stop() - return nil } diff --git a/internal/component/loki/source/heroku/internal/herokutarget/target_test.go b/internal/component/loki/source/heroku/internal/herokutarget/server_test.go similarity index 94% rename from internal/component/loki/source/heroku/internal/herokutarget/target_test.go rename to internal/component/loki/source/heroku/internal/herokutarget/server_test.go index 44643547e5f..8bc897e3898 100644 --- a/internal/component/loki/source/heroku/internal/herokutarget/target_test.go +++ b/internal/component/loki/source/heroku/internal/herokutarget/server_test.go @@ -60,7 +60,7 @@ func makeDrainRequest(host string, params map[string][]string, bodies ...string) return req, nil } -func TestHerokuDrainTarget(t *testing.T) { +func TestHerokuServer(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) @@ -275,12 +275,12 @@ func TestHerokuDrainTarget(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - eh := loki.NewCollectingHandler() + eh := loki.NewCollectingBatchReceiver() defer eh.Stop() serverConfig, port, err := getServerConfigWithAvailablePort() require.NoError(t, err, "error generating server config or finding open port") - config := &HerokuDrainTargetConfig{ + config := &HerokuConfig{ Server: serverConfig, Labels: tc.args.Labels, UseIncomingTimestamp: false, @@ -288,11 +288,10 @@ func TestHerokuDrainTarget(t *testing.T) { prometheus.DefaultRegisterer = prometheus.NewRegistry() metrics := NewMetrics(prometheus.DefaultRegisterer) - pt, err := NewHerokuTarget(metrics, logger, eh, tc.args.RelabelConfigs, config, prometheus.DefaultRegisterer) + pt, err := NewHerokuServer(metrics, logger, eh, tc.args.RelabelConfigs, config, prometheus.DefaultRegisterer) require.NoError(t, err) - defer func() { - _ = pt.Stop() - }() + require.NoError(t, pt.Run()) + defer pt.Shutdown() // Clear received lines after test case is ran defer eh.Clear() @@ -331,16 +330,16 @@ func TestHerokuDrainTarget(t *testing.T) { } } -func TestHerokuDrainTarget_UseIncomingTimestamp(t *testing.T) { +func TestHerokuServer_UseIncomingTimestamp(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - eh := loki.NewCollectingHandler() + eh := loki.NewCollectingBatchReceiver() defer eh.Stop() serverConfig, port, err := getServerConfigWithAvailablePort() require.NoError(t, err, "error generating server config or finding open port") - config := &HerokuDrainTargetConfig{ + config := &HerokuConfig{ Server: serverConfig, Labels: nil, UseIncomingTimestamp: true, @@ -348,11 +347,10 @@ func TestHerokuDrainTarget_UseIncomingTimestamp(t *testing.T) { prometheus.DefaultRegisterer = prometheus.NewRegistry() metrics := NewMetrics(prometheus.DefaultRegisterer) - pt, err := NewHerokuTarget(metrics, logger, eh, nil, config, prometheus.DefaultRegisterer) + pt, err := NewHerokuServer(metrics, logger, eh, nil, config, prometheus.DefaultRegisterer) require.NoError(t, err) - defer func() { - _ = pt.Stop() - }() + require.NoError(t, pt.Run()) + defer pt.Shutdown() // Clear received lines after test case is ran defer eh.Clear() @@ -373,17 +371,17 @@ func TestHerokuDrainTarget_UseIncomingTimestamp(t *testing.T) { require.Equal(t, expectedTs, eh.Received()[0].Timestamp, "expected entry timestamp to be overridden by received one") } -func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) { +func TestHerokuServer_UseTenantIDHeaderIfPresent(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) // Create fake promtail client - eh := loki.NewCollectingHandler() + eh := loki.NewCollectingBatchReceiver() defer eh.Stop() serverConfig, port, err := getServerConfigWithAvailablePort() require.NoError(t, err, "error generating server config or finding open port") - config := &HerokuDrainTargetConfig{ + config := &HerokuConfig{ Server: serverConfig, Labels: nil, UseIncomingTimestamp: true, @@ -401,11 +399,11 @@ func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) { NameValidationScheme: model.LegacyValidation, }, } - pt, err := NewHerokuTarget(metrics, logger, eh, tenantIDRelabelConfig, config, prometheus.DefaultRegisterer) + pt, err := NewHerokuServer(metrics, logger, eh, tenantIDRelabelConfig, config, prometheus.DefaultRegisterer) require.NoError(t, err) - defer func() { - _ = pt.Stop() - }() + require.NoError(t, pt.Run()) + + defer pt.Shutdown() // Clear received lines after test case is ran defer eh.Clear() @@ -426,7 +424,7 @@ func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) { require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels["tenant_id"]) } -func waitForMessages(eh *loki.CollectingHandler) { +func waitForMessages(eh *loki.CollectingBatchReceiver) { countdown := 1000 for len(eh.Received()) != 1 && countdown > 0 { time.Sleep(1 * time.Millisecond)