Skip to content
50 changes: 50 additions & 0 deletions internal/component/common/loki/receiver.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package loki

import "sync"

// LogReceiverOption is an option argument passed to NewLogsReceiver.
type LogReceiverOption func(*logsReceiver)

Expand Down Expand Up @@ -65,3 +67,51 @@ type logsBatchReceiver struct {
func (l *logsBatchReceiver) Chan() chan []Entry {
return l.c
}

func NewCollectingBatchReceiver() *CollectingBatchReceiver {
c := &CollectingBatchReceiver{
entries: make(chan []Entry),
}
c.wg.Go(func() {
for batch := range c.entries {
c.mtx.Lock()
c.received = append(c.received, batch...)
c.mtx.Unlock()
}
})
return c
}

// CollectingBatchReceiver is a LogsBatchReceiver that will
// collect all received entries so it can later be inspected.
// Used in tests.
type CollectingBatchReceiver struct {
Comment on lines +85 to +88
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to move this in to one of the test packages?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah maybe, currently I am just preparing to rebuild the pipeline so these should eventually be removed

entries chan []Entry
received []Entry
mtx sync.Mutex
wg sync.WaitGroup
once sync.Once
}

func (c *CollectingBatchReceiver) Chan() chan []Entry {
return c.entries
}

func (c *CollectingBatchReceiver) Received() []Entry {
c.mtx.Lock()
defer c.mtx.Unlock()
cpy := make([]Entry, len(c.received))
copy(cpy, c.received)
return cpy
}

func (c *CollectingBatchReceiver) Clear() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.received = []Entry{}
}

func (c *CollectingBatchReceiver) Stop() {
c.once.Do(func() { close(c.entries) })
c.wg.Wait()
}
4 changes: 2 additions & 2 deletions internal/component/loki/source/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return fmt.Errorf("failed to create embedded server: %v", err)
}
err = c.server.Run()
if err != nil {

if err = c.server.Run(); err != nil {
return fmt.Errorf("failed to run embedded server: %v", err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net"
"net/http"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -30,44 +29,6 @@ import (
"github.com/grafana/alloy/syntax"
)

type fakeBatchReceiver struct {
entries chan []loki.Entry
received []loki.Entry
mtx sync.Mutex
wg sync.WaitGroup
}

func newFakeBatchReceiver() *fakeBatchReceiver {
c := &fakeBatchReceiver{
entries: make(chan []loki.Entry),
}
c.wg.Go(func() {
for batch := range c.entries {
c.mtx.Lock()
c.received = append(c.received, batch...)
c.mtx.Unlock()
}
})
return c
}

func (c *fakeBatchReceiver) Chan() chan []loki.Entry {
return c.entries
}

func (c *fakeBatchReceiver) Received() []loki.Entry {
c.mtx.Lock()
defer c.mtx.Unlock()
cpy := make([]loki.Entry, len(c.received))
copy(cpy, c.received)
return cpy
}

func (c *fakeBatchReceiver) Stop() {
close(c.entries)
c.wg.Wait()
}

const localhost = "127.0.0.1"

func TestLokiPushTarget(t *testing.T) {
Expand Down Expand Up @@ -318,8 +279,7 @@ regex = "dropme"

func TestPlaintextPushTarget(t *testing.T) {
logger := log.NewNopLogger()
//Create PushAPIServerOld
eh := newFakeBatchReceiver()
eh := loki.NewCollectingBatchReceiver()
defer eh.Stop()

// Get a randomly available port by open and closing a TCP socket
Expand Down Expand Up @@ -387,9 +347,7 @@ func TestPlaintextPushTarget(t *testing.T) {

func TestPlaintextPushTargetWithXScopeOrgIDHeader(t *testing.T) {
logger := log.NewNopLogger()
//Create PushAPIServerOld

eh := newFakeBatchReceiver()
eh := loki.NewCollectingBatchReceiver()
defer eh.Stop()

// Get a randomly available port by open and closing a TCP socket
Expand Down Expand Up @@ -526,9 +484,8 @@ func getFreePort(t *testing.T) int {
return port
}

func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *fakeBatchReceiver) {
//Create PushAPIServerOld
eh := newFakeBatchReceiver()
func createPushServer(t *testing.T, logger log.Logger) (*PushAPIServer, int, *loki.CollectingBatchReceiver) {
eh := loki.NewCollectingBatchReceiver()
t.Cleanup(func() {
eh.Stop()
})
Expand Down
69 changes: 29 additions & 40 deletions internal/component/loki/source/heroku/heroku.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package heroku

import (
"context"
"fmt"
"reflect"
"sync"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
fnet "github.com/grafana/alloy/internal/component/common/net"
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/loki/source"
ht "github.com/grafana/alloy/internal/component/loki/source/heroku/internal/herokutarget"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging/level"
Expand Down Expand Up @@ -53,12 +55,13 @@ type Component struct {
metrics *ht.Metrics // Metrics about Heroku entries.
serverMetrics *util.UncheckedCollector // Metircs about the HTTP server managed by the component.

mut sync.RWMutex
args Arguments
fanout []loki.LogsReceiver
target *ht.HerokuTarget
mut sync.RWMutex
args Arguments

handler loki.LogsReceiver
fanout *loki.Fanout
server *ht.HerokuServer

handler loki.LogsBatchReceiver
}

// New creates a new loki.source.heroku component.
Expand All @@ -68,9 +71,8 @@ func New(o component.Options, args Arguments) (*Component, error) {
metrics: ht.NewMetrics(o.Registerer),
mut: sync.RWMutex{},
args: Arguments{},
fanout: args.ForwardTo,
target: nil,
handler: loki.NewLogsReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
handler: loki.NewLogsBatchReceiver(),
serverMetrics: util.NewUncheckedCollector(nil),
}

Expand All @@ -91,26 +93,13 @@ func (c *Component) Run(ctx context.Context) error {
defer c.mut.Unlock()

level.Info(c.opts.Logger).Log("msg", "loki.source.heroku component shutting down, stopping listener")
if c.target != nil {
err := c.target.Stop()
if err != nil {
level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err)
}
if c.server != nil {
c.server.ForceShutdown()
}
}()

for {
select {
case <-ctx.Done():
return nil
case entry := <-c.handler.Chan():
c.mut.RLock()
for _, receiver := range c.fanout {
receiver.Chan() <- entry
}
c.mut.RUnlock()
}
}
source.ConsumeBatch(ctx, c.handler, c.fanout)
return nil
}

// Update implements component.Component.
Expand All @@ -119,7 +108,7 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.fanout = newArgs.ForwardTo
c.fanout.UpdateChildren(newArgs.ForwardTo)

var rcs []*relabel.Config
if len(newArgs.RelabelRules) > 0 {
Expand All @@ -130,12 +119,10 @@ func (c *Component) Update(args component.Arguments) error {
changed(c.args.RelabelRules, newArgs.RelabelRules) ||
changed(c.args.Labels, newArgs.Labels) ||
c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp

if restartRequired {
if c.target != nil {
err := c.target.Stop()
if err != nil {
level.Error(c.opts.Logger).Log("msg", "error while stopping heroku listener", "err", err)
}
if c.server != nil {
c.server.Shutdown()
}

// [ht.NewHerokuTarget] registers new metrics every time it is called. To
Expand All @@ -145,28 +132,30 @@ func (c *Component) Update(args component.Arguments) error {
registry := prometheus.NewRegistry()
c.serverMetrics.SetCollector(registry)

entryHandler := loki.NewEntryHandler(c.handler.Chan(), func() {})
t, err := ht.NewHerokuTarget(c.metrics, c.opts.Logger, entryHandler, rcs, newArgs.Convert(), registry)
server, err := ht.NewHerokuServer(c.metrics, c.opts.Logger, c.handler, rcs, newArgs.Convert(), registry)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to create heroku listener with provided config", "err", err)
return err
return fmt.Errorf("failed to create heroku server: %w", err)
}

if err := server.Run(); err != nil {
return fmt.Errorf("failed to run heroku server: %w", err)
}

c.target = t
c.server = server
c.args = newArgs
}

return nil
}

// Convert is used to bridge between the Alloy and Promtail types.
func (args *Arguments) Convert() *ht.HerokuDrainTargetConfig {
func (args *Arguments) Convert() *ht.HerokuConfig {
lbls := make(model.LabelSet, len(args.Labels))
for k, v := range args.Labels {
lbls[model.LabelName(k)] = model.LabelValue(v)
}

return &ht.HerokuDrainTargetConfig{
return &ht.HerokuConfig{
Server: args.Server,
Labels: lbls,
UseIncomingTimestamp: args.UseIncomingTimestamp,
Expand All @@ -179,8 +168,8 @@ func (c *Component) DebugInfo() any {
defer c.mut.RUnlock()

var res = readerDebugInfo{
Ready: c.target.Ready(),
Address: c.target.HTTPListenAddress(),
Ready: c.server.Ready(),
Address: c.server.HTTPListenAddress(),
}

return res
Expand Down
12 changes: 6 additions & 6 deletions internal/component/loki/source/heroku/heroku_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestPush(t *testing.T) {
waitForServerToBeReady(t, c)

// Create a Heroku Drain Request and send it to the launched server.
req, err := http.NewRequest(http.MethodPost, getEndpoint(c.target), strings.NewReader(testPayload))
req, err := http.NewRequest(http.MethodPost, getEndpoint(c.server), strings.NewReader(testPayload))
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
Expand Down Expand Up @@ -146,17 +146,17 @@ func TestUpdate_detectsWhenTargetRequiresARestart(t *testing.T) {
defer func() {
// in order to cleanly shutdown, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)
require.NoError(t, comp.target.Stop())
comp.server.Shutdown()
}()

// in order to cleanly update, we want to make sure the server is running first.
waitForServerToBeReady(t, comp)

targetBefore := comp.target
targetBefore := comp.server
err = comp.Update(tc.newArgs)
require.NoError(t, err)

restarted := targetBefore != comp.target
restarted := targetBefore != comp.server
require.Equal(t, restarted, tc.restartRequired)
})
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func waitForServerToBeReady(t *testing.T, comp *Component) {
require.Eventuallyf(t, func() bool {
resp, err := http.Get(fmt.Sprintf(
"http://%v/wrong/url",
comp.target.HTTPListenAddress(),
comp.server.HTTPListenAddress(),
))
return err == nil && resp.StatusCode == 404
}, 5*time.Second, 20*time.Millisecond, "server failed to start before timeout")
Expand All @@ -259,6 +259,6 @@ func newRegexp() alloy_relabel.Regexp {
return alloy_relabel.Regexp{Regexp: re}
}

func getEndpoint(target *herokutarget.HerokuTarget) string {
func getEndpoint(target *herokutarget.HerokuServer) string {
return fmt.Sprintf("http://%s%s", target.HTTPListenAddress(), target.DrainEndpoint())
}
Loading
Loading