From 6d1f9ed1b482a0dd8b1e26e0a6802b44e7f4ec6b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:05:41 +0100 Subject: [PATCH 1/3] remove grpc block --- .../reference/components/loki/loki.source.awsfirehose.md | 7 ------- 1 file changed, 7 deletions(-) diff --git a/docs/sources/reference/components/loki/loki.source.awsfirehose.md b/docs/sources/reference/components/loki/loki.source.awsfirehose.md index d3a28ba1e72..0c4af976a16 100644 --- a/docs/sources/reference/components/loki/loki.source.awsfirehose.md +++ b/docs/sources/reference/components/loki/loki.source.awsfirehose.md @@ -105,8 +105,6 @@ You can use the following blocks with `loki.source.awsfirehose`: | Name | Description | Required | | --------------------- | -------------------------------------------------- | -------- | -| [`grpc`][grpc] | Configures the gRPC server that receives requests. | no | -| `gprc` > [`tls`][tls] | Configures TLS for the gRPC server. | no | | [`http`][http] | Configures the HTTP server that receives requests. | no | | `http` > [`tls`][tls] | Configures TLS for the HTTP server. | no | @@ -114,13 +112,8 @@ The > symbol indicates deeper levels of nesting. For example, `http` > `tls` refers to a `tls` block defined inside an `http` block. [http]: #http -[grpc]: #grpc [tls]: #tls -### `grpc` - -{{< docs/shared lookup="reference/components/loki-server-grpc.md" source="alloy" version="" >}} - ### `http` {{< docs/shared lookup="reference/components/server-http.md" source="alloy" version="" >}} From 23e981ec900fb8b7f3ded3a1a52241a4a2607077 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:19:47 +0100 Subject: [PATCH 2/3] set metrics namespace as loki_source_awsfirehose --- internal/component/loki/source/aws_firehose/component.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/internal/component/loki/source/aws_firehose/component.go b/internal/component/loki/source/aws_firehose/component.go index 266dd111d10..63699dd7e52 100644 --- a/internal/component/loki/source/aws_firehose/component.go +++ b/internal/component/loki/source/aws_firehose/component.go @@ -3,7 +3,6 @@ package aws_firehose import ( "context" "reflect" - "strings" "sync" "github.com/go-kit/log" @@ -155,13 +154,9 @@ func (c *Component) Update(args component.Arguments) error { c.rbs = newRelabels } - r := strings.NewReplacer(".", "_", "/", "_") - jobName := r.Replace(c.opts.ID) - registry := prometheus.NewRegistry() c.serverMetrics.SetCollector(registry) - - c.server, err = fnet.NewTargetServer(c.logger, jobName, registry, newArgs.Server) + c.server, err = fnet.NewTargetServer(c.logger, "loki_source_awsfirehose", registry, newArgs.Server) if err != nil { return err } From 8ab257ddce52a3ebe95641c39a35f3d777ded503 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:29:38 +0100 Subject: [PATCH 3/3] use fanout and source.Consume --- .../loki/source/aws_firehose/component.go | 65 +++++++------------ 1 file changed, 24 insertions(+), 41 deletions(-) diff --git a/internal/component/loki/source/aws_firehose/component.go b/internal/component/loki/source/aws_firehose/component.go index 63699dd7e52..af1c2741082 100644 --- a/internal/component/loki/source/aws_firehose/component.go +++ b/internal/component/loki/source/aws_firehose/component.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" fnet "github.com/grafana/alloy/internal/component/common/net" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" + "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/aws_firehose/internal" "github.com/grafana/alloy/internal/util" ) @@ -49,31 +50,27 @@ func (a *Arguments) SetToDefault() { // Component is the main type for the `loki.source.awsfirehose` component. type Component struct { - // mut controls concurrent access to fanout - mut sync.RWMutex - fanout []loki.LogsReceiver + opts component.Options + logger log.Logger - // destination is the main destination where the TargetServer writes received log entries to - destination loki.LogsReceiver - rbs []*relabel.Config - - server *fnet.TargetServer - - opts component.Options - args Arguments - - // utils serverMetrics *util.UncheckedCollector handlerMetrics *internal.Metrics - logger log.Logger + + fanout *loki.Fanout + handler loki.LogsReceiver + + mut sync.Mutex + args Arguments + rbs []*relabel.Config + server *fnet.TargetServer } // New creates a new Component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, - destination: loki.NewLogsReceiver(), - fanout: args.ForwardTo, + handler: loki.NewLogsReceiver(), + fanout: loki.NewFanout(args.ForwardTo), serverMetrics: util.NewUncheckedCollector(nil), handlerMetrics: internal.NewMetrics(o.Registerer), @@ -94,21 +91,13 @@ func (c *Component) Run(ctx context.Context) error { defer func() { c.mut.Lock() defer c.mut.Unlock() - c.shutdownServer() + if c.server != nil { + c.server.StopAndShutdown() + } }() - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.destination.Chan(): - c.mut.RLock() - for _, receiver := range c.fanout { - receiver.Chan() <- entry - } - c.mut.RUnlock() - } - } + source.Consume(ctx, c.handler, c.fanout) + return nil } // Update updates the component with a new configuration, restarting the server if needed. @@ -118,7 +107,8 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() newArgs := args.(Arguments) - c.fanout = newArgs.ForwardTo + + c.fanout.UpdateChildren(newArgs.ForwardTo) var newRelabels []*relabel.Config = nil // first condition to consider if the handler needs to be updated is if the UseIncomingTimestamp field @@ -147,7 +137,9 @@ func (c *Component) Update(args component.Arguments) error { return nil } - c.shutdownServer() + if c.server != nil { + c.server.StopAndShutdown() + } // update relabel rules in component if needed if handlerNeedsUpdate { @@ -175,14 +167,5 @@ func (c *Component) Update(args component.Arguments) error { // Send implements internal.Sender so that the component is able to receive logs decoded by the handler. func (c *Component) Send(ctx context.Context, entry loki.Entry) { - c.destination.Chan() <- entry -} - -// shutdownServer will shut down the currently used server. -// It is not goroutine-safe and mut write lock must be held when it's called. -func (c *Component) shutdownServer() { - if c.server != nil { - c.server.StopAndShutdown() - c.server = nil - } + c.handler.Chan() <- entry }