Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,15 @@ You can use the following blocks with `loki.source.awsfirehose`:

| Name | Description | Required |
| --------------------- | -------------------------------------------------- | -------- |
| [`grpc`][grpc] | Configures the gRPC server that receives requests. | no |
| `gprc` > [`tls`][tls] | Configures TLS for the gRPC server. | no |
| [`http`][http] | Configures the HTTP server that receives requests. | no |
| `http` > [`tls`][tls] | Configures TLS for the HTTP server. | no |

The > symbol indicates deeper levels of nesting.
For example, `http` > `tls` refers to a `tls` block defined inside an `http` block.

[http]: #http
[grpc]: #grpc
[tls]: #tls

### `grpc`

{{< docs/shared lookup="reference/components/loki-server-grpc.md" source="alloy" version="<ALLOY_VERSION>" >}}

### `http`

{{< docs/shared lookup="reference/components/server-http.md" source="alloy" version="<ALLOY_VERSION>" >}}
Expand Down
72 changes: 25 additions & 47 deletions internal/component/loki/source/aws_firehose/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package aws_firehose
import (
"context"
"reflect"
"strings"
"sync"

"github.com/go-kit/log"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/grafana/alloy/internal/component/common/loki"
fnet "github.com/grafana/alloy/internal/component/common/net"
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/loki/source"
"github.com/grafana/alloy/internal/component/loki/source/aws_firehose/internal"
"github.com/grafana/alloy/internal/util"
)
Expand Down Expand Up @@ -50,31 +50,27 @@ func (a *Arguments) SetToDefault() {

// Component is the main type for the `loki.source.awsfirehose` component.
type Component struct {
// mut controls concurrent access to fanout
mut sync.RWMutex
fanout []loki.LogsReceiver
opts component.Options
logger log.Logger

// destination is the main destination where the TargetServer writes received log entries to
destination loki.LogsReceiver
rbs []*relabel.Config

server *fnet.TargetServer

opts component.Options
args Arguments

// utils
serverMetrics *util.UncheckedCollector
handlerMetrics *internal.Metrics
logger log.Logger

fanout *loki.Fanout
handler loki.LogsReceiver

mut sync.Mutex
args Arguments
rbs []*relabel.Config
server *fnet.TargetServer
}

// New creates a new Component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
destination: loki.NewLogsReceiver(),
fanout: args.ForwardTo,
handler: loki.NewLogsReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
serverMetrics: util.NewUncheckedCollector(nil),
handlerMetrics: internal.NewMetrics(o.Registerer),

Expand All @@ -95,21 +91,13 @@ func (c *Component) Run(ctx context.Context) error {
defer func() {
c.mut.Lock()
defer c.mut.Unlock()
c.shutdownServer()
if c.server != nil {
c.server.StopAndShutdown()
}
}()

for {
select {
case <-ctx.Done():
return nil
case entry := <-c.destination.Chan():
c.mut.RLock()
for _, receiver := range c.fanout {
receiver.Chan() <- entry
}
c.mut.RUnlock()
}
}
source.Consume(ctx, c.handler, c.fanout)
return nil
}

// Update updates the component with a new configuration, restarting the server if needed.
Expand All @@ -119,7 +107,8 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.fanout = newArgs.ForwardTo

c.fanout.UpdateChildren(newArgs.ForwardTo)

var newRelabels []*relabel.Config = nil
// first condition to consider if the handler needs to be updated is if the UseIncomingTimestamp field
Expand Down Expand Up @@ -148,20 +137,18 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

c.shutdownServer()
if c.server != nil {
c.server.StopAndShutdown()
}

// update relabel rules in component if needed
if handlerNeedsUpdate {
c.rbs = newRelabels
}

r := strings.NewReplacer(".", "_", "/", "_")
jobName := r.Replace(c.opts.ID)

registry := prometheus.NewRegistry()
c.serverMetrics.SetCollector(registry)
Comment thread
ptodev marked this conversation as resolved.

c.server, err = fnet.NewTargetServer(c.logger, jobName, registry, newArgs.Server)
c.server, err = fnet.NewTargetServer(c.logger, "loki_source_awsfirehose", registry, newArgs.Server)
if err != nil {
return err
}
Expand All @@ -180,14 +167,5 @@ func (c *Component) Update(args component.Arguments) error {

// Send implements internal.Sender so that the component is able to receive logs decoded by the handler.
func (c *Component) Send(ctx context.Context, entry loki.Entry) {
c.destination.Chan() <- entry
}

// shutdownServer will shut down the currently used server.
// It is not goroutine-safe and mut write lock must be held when it's called.
func (c *Component) shutdownServer() {
if c.server != nil {
c.server.StopAndShutdown()
c.server = nil
}
c.handler.Chan() <- entry
}
Loading