diff --git a/CHANGELOG.md b/CHANGELOG.md index ebbd26e1c6d..6f24a497b08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ Main (unreleased) - update promtail converter to use `file_match` block for `loki.source.file` instead of going through `local.file_match`. (@kalleep) +### Bugfixes + +- `loki.source.api` no longer drops the whole request when relabel rules drop a specific stream. (@kalleep) + v1.12.0-rc.0 ----------------- diff --git a/docs/sources/reference/components/loki/loki.source.api.md b/docs/sources/reference/components/loki/loki.source.api.md index 0cee4b0bad5..29330111ba2 100644 --- a/docs/sources/reference/components/loki/loki.source.api.md +++ b/docs/sources/reference/components/loki/loki.source.api.md @@ -105,6 +105,7 @@ The metrics include labels such as `status_code` where relevant, which can be us * `loki_source_api_request_message_bytes` (histogram): Size (in bytes) of messages received in the request. * `loki_source_api_response_message_bytes` (histogram): Size (in bytes) of messages sent in response. * `loki_source_api_tcp_connections` (gauge): Current number of accepted TCP connections. +* `loki_source_api_entries_written` (counter): Total number of log entries forwarded. ## Example diff --git a/internal/component/common/loki/entry.go b/internal/component/common/loki/entry.go new file mode 100644 index 00000000000..ccc945257f8 --- /dev/null +++ b/internal/component/common/loki/entry.go @@ -0,0 +1,20 @@ +package loki + +import ( + "github.com/grafana/loki/pkg/push" + "github.com/prometheus/common/model" +) + +// Entry is a log entry with labels. +type Entry struct { + Labels model.LabelSet + push.Entry +} + +// Clone returns a copy of the entry so that it can be safely fanned out. 
+func (e *Entry) Clone() Entry { + return Entry{ + Labels: e.Labels.Clone(), + Entry: e.Entry, + } +} diff --git a/internal/component/common/loki/types.go b/internal/component/common/loki/entry_handler.go similarity index 76% rename from internal/component/common/loki/types.go rename to internal/component/common/loki/entry_handler.go index ef913a0266a..5a076cec0f6 100644 --- a/internal/component/common/loki/types.go +++ b/internal/component/common/loki/entry_handler.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/grafana/loki/pkg/push" "github.com/prometheus/common/model" ) @@ -21,68 +20,6 @@ import ( // to an outage or erroring (such as limits being hit). const finalEntryTimeout = 5 * time.Second -// LogReceiverOption is an option argument passed to NewLogsReceiver. -type LogReceiverOption func(*logsReceiver) - -func WithChannel(c chan Entry) LogReceiverOption { - return func(l *logsReceiver) { - l.entries = c - } -} - -func WithComponentID(id string) LogReceiverOption { - return func(l *logsReceiver) { - l.componentID = id - } -} - -// LogsReceiver is an interface providing `chan Entry` which is used for component -// communication. -type LogsReceiver interface { - Chan() chan Entry -} - -type logsReceiver struct { - entries chan Entry - componentID string -} - -func (l *logsReceiver) Chan() chan Entry { - return l.entries -} - -func (l *logsReceiver) String() string { - return l.componentID + ".receiver" -} - -func NewLogsReceiver(opts ...LogReceiverOption) LogsReceiver { - l := &logsReceiver{} - - for _, o := range opts { - o(l) - } - - if l.entries == nil { - l.entries = make(chan Entry) - } - - return l -} - -// Entry is a log entry with labels. -type Entry struct { - Labels model.LabelSet - push.Entry -} - -// Clone returns a copy of the entry so that it can be safely fanned out. -func (e *Entry) Clone() Entry { - return Entry{ - Labels: e.Labels.Clone(), - Entry: e.Entry, - } -} - // EntryHandler is something that can "handle" entries via a channel. 
// Stop must be called to gracefully shut down the EntryHandler type EntryHandler interface { diff --git a/internal/component/common/loki/types_test.go b/internal/component/common/loki/entry_handler_test.go similarity index 100% rename from internal/component/common/loki/types_test.go rename to internal/component/common/loki/entry_handler_test.go diff --git a/internal/component/common/loki/receiver.go b/internal/component/common/loki/receiver.go new file mode 100644 index 00000000000..77b94eac920 --- /dev/null +++ b/internal/component/common/loki/receiver.go @@ -0,0 +1,67 @@ +package loki + +// LogReceiverOption is an option argument passed to NewLogsReceiver. +type LogReceiverOption func(*logsReceiver) + +func WithChannel(c chan Entry) LogReceiverOption { + return func(l *logsReceiver) { + l.entries = c + } +} + +func WithComponentID(id string) LogReceiverOption { + return func(l *logsReceiver) { + l.componentID = id + } +} + +// LogsReceiver is an interface providing `chan Entry` which is used for component +// communication. +type LogsReceiver interface { + Chan() chan Entry +} + +type logsReceiver struct { + entries chan Entry + componentID string +} + +func (l *logsReceiver) Chan() chan Entry { + return l.entries +} + +func (l *logsReceiver) String() string { + return l.componentID + ".receiver" +} + +func NewLogsReceiver(opts ...LogReceiverOption) LogsReceiver { + l := &logsReceiver{} + + for _, o := range opts { + o(l) + } + + if l.entries == nil { + l.entries = make(chan Entry) + } + + return l +} + +// LogsBatchReceiver is an interface providing `chan []Entry`. This should be used when +// multiple entries need to be sent over a channel. 
+type LogsBatchReceiver interface { + Chan() chan []Entry +} + +func NewLogsBatchReceiver() LogsBatchReceiver { + return &logsBatchReceiver{c: make(chan []Entry)} +} + +type logsBatchReceiver struct { + c chan []Entry +} + +func (l *logsBatchReceiver) Chan() chan []Entry { + return l.c +} diff --git a/internal/component/loki/source/api/api.go b/internal/component/loki/source/api/api.go index a52c149507a..7b301036679 100644 --- a/internal/component/loki/source/api/api.go +++ b/internal/component/loki/source/api/api.go @@ -57,7 +57,7 @@ func (a *Arguments) labelSet() model.LabelSet { type Component struct { opts component.Options - handler loki.LogsReceiver + handler loki.LogsBatchReceiver uncheckedCollector *util.UncheckedCollector serverMut sync.Mutex @@ -72,7 +72,7 @@ type Component struct { func New(opts component.Options, args Arguments) (*Component, error) { c := &Component{ opts: opts, - handler: loki.NewLogsReceiver(), + handler: loki.NewLogsBatchReceiver(), receivers: args.ForwardTo, uncheckedCollector: util.NewUncheckedCollector(nil), } @@ -86,23 +86,30 @@ func New(opts component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) (err error) { defer func() { - c.stop() + c.serverMut.Lock() + defer c.serverMut.Unlock() + if c.server != nil { + // We want to cancel all in-flight requests when the component stops. 
+ c.server.ForceShutdown() + c.server = nil + } }() for { select { - case entry := <-c.handler.Chan(): + case entries := <-c.handler.Chan(): c.receiversMut.RLock() - receivers := c.receivers - c.receiversMut.RUnlock() - - for _, receiver := range receivers { - select { - case receiver.Chan() <- entry: - case <-ctx.Done(): - return + for _, entry := range entries { + for _, receiver := range c.receivers { + select { + case receiver.Chan() <- entry: + case <-ctx.Done(): + c.receiversMut.RUnlock() + return + } } } + c.receiversMut.RUnlock() case <-ctx.Done(): return } @@ -164,12 +171,3 @@ func (c *Component) Update(args component.Arguments) error { return nil } - -func (c *Component) stop() { - c.serverMut.Lock() - defer c.serverMut.Unlock() - if c.server != nil { - c.server.Shutdown() - c.server = nil - } -} diff --git a/internal/component/loki/source/api/api_test.go b/internal/component/loki/source/api/api_test.go index 4e10648c330..da383cd5d6d 100644 --- a/internal/component/loki/source/api/api_test.go +++ b/internal/component/loki/source/api/api_test.go @@ -105,8 +105,7 @@ func TestLokiSourceAPI_Simple(t *testing.T) { a.UseIncomingTimestamp = true }) opts := defaultOptions() - _, shutdown := startTestComponent(t, opts, args, ctx) - defer shutdown() + _ = startTestComponent(t, opts, args, ctx) lokiClient := newTestLokiClient(t, args, opts) defer lokiClient.Stop() @@ -152,8 +151,7 @@ func TestLokiSourceAPI_Update(t *testing.T) { a.Labels = map[string]string{"test_label": "before"} }) opts := defaultOptions() - c, shutdown := startTestComponent(t, opts, args, ctx) - defer shutdown() + c := startTestComponent(t, opts, args, ctx) lokiClient := newTestLokiClient(t, args, opts) defer lokiClient.Stop() @@ -219,7 +217,7 @@ func TestLokiSourceAPI_FanOut(t *testing.T) { const receiversCount = 10 var receivers = make([]*fake.Client, receiversCount) - for i := 0; i < receiversCount; i++ { + for i := range receiversCount { receivers[i] = fake.NewClient(func() {}) } @@ -236,8 
+234,6 @@ func TestLokiSourceAPI_FanOut(t *testing.T) { require.NoError(t, err) }() - defer comp.stop() - lokiClient := newTestLokiClient(t, args, opts) defer lokiClient.Stop() @@ -344,25 +340,19 @@ func TestComponent_detectsWhenUpdateRequiresARestart(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - comp, err := New( - defaultOptions(), - tc.args, - ) - require.NoError(t, err) + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() - // in order to cleanly update, we want to make sure the server is running first. - waitForServerToBeReady(t, comp) + comp := startTestComponent(t, defaultOptions(), tc.args, ctx) serverBefore := comp.server - err = comp.Update(tc.newArgs) - require.NoError(t, err) + require.NoError(t, comp.Update(tc.newArgs)) restarted := serverBefore != comp.server assert.Equal(t, restarted, tc.restartRequired) // in order to cleanly shutdown, we want to make sure the server is running first. waitForServerToBeReady(t, comp) - comp.stop() }) } } @@ -388,8 +378,7 @@ func TestLokiSourceAPI_TLS(t *testing.T) { a.UseIncomingTimestamp = true }) opts := defaultOptions() - _, shutdown := startTestComponent(t, opts, args, ctx) - defer shutdown() + _ = startTestComponent(t, opts, args, ctx) // Create TLS-enabled Loki client lokiClient := newTestLokiClientTLS(t, args, opts) @@ -457,6 +446,13 @@ func TestDefaultServerConfig(t *testing.T) { defaultOptions(), args, ) + + ctx := t.Context() + go func() { + err := comp.Run(ctx) + require.NoError(t, err) + }() + require.NoError(t, err) require.Eventuallyf(t, func() bool { @@ -467,8 +463,6 @@ func TestDefaultServerConfig(t *testing.T) { )) return err == nil && resp.StatusCode == 404 }, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout") - - comp.stop() } func startTestComponent( @@ -476,7 +470,7 @@ func startTestComponent( opts component.Options, args Arguments, ctx context.Context, -) (component.Component, func()) { +) *Component { comp, err := 
New(opts, args) require.NoError(t, err) @@ -485,11 +479,8 @@ func startTestComponent( require.NoError(t, err) }() - return comp, func() { - // in order to cleanly shutdown, we want to make sure the server is running first. - waitForServerToBeReady(t, comp) - comp.stop() - } + waitForServerToBeReady(t, comp) + return comp } func TestShutdown(t *testing.T) { diff --git a/internal/component/loki/source/api/internal/lokipush/metrics.go b/internal/component/loki/source/api/internal/lokipush/metrics.go new file mode 100644 index 00000000000..1405ed3cf47 --- /dev/null +++ b/internal/component/loki/source/api/internal/lokipush/metrics.go @@ -0,0 +1,21 @@ +package lokipush + +import ( + "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" +) + +func newMetircs(reg prometheus.Registerer) *metrics { + m := &metrics{ + entriesWritten: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "loki_source_api_entries_written", + Help: "Total number of entries written.", + }), + } + m.entriesWritten = util.MustRegisterOrGet(reg, m.entriesWritten).(prometheus.Counter) + return m +} + +type metrics struct { + entriesWritten prometheus.Counter +} diff --git a/internal/component/loki/source/api/internal/lokipush/push_api_server.go b/internal/component/loki/source/api/internal/lokipush/push_api_server.go index 5f4c9708bca..122d17000a6 100644 --- a/internal/component/loki/source/api/internal/lokipush/push_api_server.go +++ b/internal/component/loki/source/api/internal/lokipush/push_api_server.go @@ -30,10 +30,13 @@ import ( ) type PushAPIServer struct { - logger log.Logger - serverConfig *fnet.ServerConfig - server *fnet.TargetServer - handler loki.LogsReceiver + logger log.Logger + serverConfig *fnet.ServerConfig + server *fnet.TargetServer + handler loki.LogsBatchReceiver + metrics *metrics + + once sync.Once forceShutdown chan struct{} rwMutex sync.RWMutex @@ -45,7 +48,7 @@ type PushAPIServer struct { func NewPushAPIServer(logger log.Logger, 
serverConfig *fnet.ServerConfig, - handler loki.LogsReceiver, + handler loki.LogsBatchReceiver, registerer prometheus.Registerer, maxSendMessageSize int64, ) (*PushAPIServer, error) { @@ -59,6 +62,7 @@ func NewPushAPIServer(logger log.Logger, logger: logger, serverConfig: serverConfig, handler: handler, + metrics: newMetircs(registerer), forceShutdown: make(chan struct{}), maxSendMessageSize: maxSendMessageSize, } @@ -126,7 +130,14 @@ func (s *PushAPIServer) Shutdown() { // After we have tried a graceful shutdown we force all remaining in-flight // requests to exit. - close(s.forceShutdown) + s.once.Do(func() { close(s.forceShutdown) }) +} + +// ForceShutdown will cancel all in-flight requests before starting server shutdown. +func (s *PushAPIServer) ForceShutdown() { + level.Info(s.logger).Log("msg", "force shutdown of push API server") + s.once.Do(func() { close(s.forceShutdown) }) + s.server.StopAndShutdown() } func (s *PushAPIServer) SetLabels(labels model.LabelSet) { @@ -199,7 +210,10 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) { relabelRules := s.getRelabelRules() keepTimestamp := s.getKeepTimestamp() - var lastErr error + var ( + entries []loki.Entry + lastErr error + ) for _, stream := range req.Streams { ls, err := promql_parser.ParseMetric(stream.Labels) if err != nil { @@ -217,8 +231,7 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) { // Apply relabeling processed, keep := relabel.Process(lb.Labels(), relabelRules...) 
if !keep || processed.Len() == 0 { - w.WriteHeader(http.StatusNoContent) - return + continue } // Convert to model.LabelSet @@ -250,22 +263,29 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) { e.Timestamp = time.Now() } - select { - case s.handler.Chan() <- e: - case <-r.Context().Done(): - w.WriteHeader(http.StatusServiceUnavailable) - return - case <-s.forceShutdown: - w.WriteHeader(http.StatusServiceUnavailable) - return - } + entries = append(entries, e) } } - if lastErr != nil { - level.Warn(s.logger).Log("msg", "at least one entry in the push request failed to process", "err", lastErr.Error()) - http.Error(w, lastErr.Error(), http.StatusBadRequest) - return + numEntries := len(entries) + if numEntries > 0 { + select { + case s.handler.Chan() <- entries: + case <-r.Context().Done(): + w.WriteHeader(http.StatusServiceUnavailable) + return + case <-s.forceShutdown: + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + s.metrics.entriesWritten.Add(float64(numEntries)) + + if lastErr != nil { + level.Warn(s.logger).Log("msg", "at least one entry in the push request failed to process", "err", lastErr.Error()) + http.Error(w, lastErr.Error(), http.StatusBadRequest) + return + } } w.WriteHeader(http.StatusNoContent) @@ -277,6 +297,9 @@ func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) defer r.Body.Close() body := bufio.NewReader(r.Body) addLabels := s.getLabels() + + var entries []loki.Entry + for { line, err := body.ReadString('\n') if err != nil && err != io.EOF { @@ -292,10 +315,16 @@ func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) continue } - entry := loki.Entry{Labels: addLabels, Entry: lokipush.Entry{Timestamp: time.Now(), Line: line}} + entries = append(entries, loki.Entry{Labels: addLabels, Entry: lokipush.Entry{Timestamp: time.Now(), Line: line}}) + if err == io.EOF { + break + } + } + numEntries := len(entries) + if numEntries > 0 { select { - case 
s.handler.Chan() <- entry: + case s.handler.Chan() <- entries: case <-r.Context().Done(): w.WriteHeader(http.StatusServiceUnavailable) return @@ -303,10 +332,7 @@ func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusServiceUnavailable) return } - - if err == io.EOF { - break - } + s.metrics.entriesWritten.Add(float64(numEntries)) } w.WriteHeader(http.StatusNoContent) diff --git a/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go b/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go index cfdf1528662..37e357f8bce 100644 --- a/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go +++ b/internal/component/loki/source/api/internal/lokipush/push_api_server_test.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "strconv" + "sync" "testing" "time" @@ -24,12 +25,49 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/common/loki/client" - "github.com/grafana/alloy/internal/component/common/loki/client/fake" fnet "github.com/grafana/alloy/internal/component/common/net" frelabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/syntax" ) +type fakeBatchReceiver struct { + entries chan []loki.Entry + received []loki.Entry + mtx sync.Mutex + wg sync.WaitGroup +} + +func newFakeBatchReceiver() *fakeBatchReceiver { + c := &fakeBatchReceiver{ + entries: make(chan []loki.Entry), + } + c.wg.Go(func() { + for batch := range c.entries { + c.mtx.Lock() + c.received = append(c.received, batch...) 
+ c.mtx.Unlock() + } + }) + return c +} + +func (c *fakeBatchReceiver) Chan() chan []loki.Entry { + return c.entries +} + +func (c *fakeBatchReceiver) Received() []loki.Entry { + c.mtx.Lock() + defer c.mtx.Unlock() + cpy := make([]loki.Entry, len(c.received)) + copy(cpy, c.received) + return cpy +} + +func (c *fakeBatchReceiver) Stop() { + close(c.entries) + c.wg.Wait() +} + const localhost = "127.0.0.1" func TestLokiPushTarget(t *testing.T) { @@ -287,7 +325,7 @@ regex = "dropme" func TestPlaintextPushTarget(t *testing.T) { logger := log.NewNopLogger() //Create PushAPIServerOld - eh := fake.NewClient(func() {}) + eh := newFakeBatchReceiver() defer eh.Stop() // Get a randomly available port by open and closing a TCP socket @@ -307,7 +345,7 @@ func TestPlaintextPushTarget(t *testing.T) { GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)}, } - pt, err := NewPushAPIServer(logger, serverConfig, eh.LogsReceiver(), prometheus.NewRegistry(), 0) + pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry(), 0) require.NoError(t, err) err = pt.Run() @@ -356,7 +394,8 @@ func TestPlaintextPushTarget(t *testing.T) { func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) { logger := log.NewNopLogger() //Create PushAPIServerOld - eh := fake.NewClient(func() {}) + + eh := newFakeBatchReceiver() defer eh.Stop() // Get a randomly available port by open and closing a TCP socket @@ -376,7 +415,7 @@ func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) { GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)}, } - pt, err := NewPushAPIServer(logger, serverConfig, eh.LogsReceiver(), prometheus.NewRegistry(), 0) + pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry(), 0) require.NoError(t, err) err = pt.Run() @@ -433,10 +472,6 @@ func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) { func TestReady(t *testing.T) { logger := log.NewNopLogger() - //Create PushAPIServerOld - eh := fake.NewClient(func() {}) - defer 
eh.Stop() - // Get a randomly available port by open and closing a TCP socket addr, err := net.ResolveTCPAddr("tcp", localhost+":0") require.NoError(t, err) @@ -454,7 +489,7 @@ func TestReady(t *testing.T) { GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)}, } - pt, err := NewPushAPIServer(logger, serverConfig, eh.LogsReceiver(), prometheus.NewRegistry(), 100<<20) + pt, err := NewPushAPIServer(logger, serverConfig, nil, prometheus.NewRegistry(), 100<<20) require.NoError(t, err) err = pt.Run() @@ -497,9 +532,9 @@ func getFreePort(t *testing.T) int { return port } -func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fake.Client) { +func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fakeBatchReceiver) { //Create PushAPIServerOld - eh := fake.NewClient(func() {}) + eh := newFakeBatchReceiver() t.Cleanup(func() { eh.Stop() }) @@ -515,7 +550,7 @@ func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fa GRPC: &fnet.GRPCConfig{ListenPort: getFreePort(t)}, } - pt, err := NewPushAPIServer(logger, serverConfig, eh.LogsReceiver(), prometheus.NewRegistry(), 100<<20) + pt, err := NewPushAPIServer(logger, serverConfig, eh, prometheus.NewRegistry(), 100<<20) require.NoError(t, err) err = pt.Run()