diff --git a/docs/sources/reference/components/loki/loki.source.awsfirehose.md b/docs/sources/reference/components/loki/loki.source.awsfirehose.md index d3a28ba1e72..0c4af976a16 100644 --- a/docs/sources/reference/components/loki/loki.source.awsfirehose.md +++ b/docs/sources/reference/components/loki/loki.source.awsfirehose.md @@ -105,8 +105,6 @@ You can use the following blocks with `loki.source.awsfirehose`: | Name | Description | Required | | --------------------- | -------------------------------------------------- | -------- | -| [`grpc`][grpc] | Configures the gRPC server that receives requests. | no | -| `gprc` > [`tls`][tls] | Configures TLS for the gRPC server. | no | | [`http`][http] | Configures the HTTP server that receives requests. | no | | `http` > [`tls`][tls] | Configures TLS for the HTTP server. | no | @@ -114,13 +112,8 @@ The > symbol indicates deeper levels of nesting. For example, `http` > `tls` refers to a `tls` block defined inside an `http` block. [http]: #http -[grpc]: #grpc [tls]: #tls -### `grpc` - -{{< docs/shared lookup="reference/components/loki-server-grpc.md" source="alloy" version="" >}} - ### `http` {{< docs/shared lookup="reference/components/server-http.md" source="alloy" version="" >}} diff --git a/internal/component/loki/source/aws_firehose/component.go b/internal/component/loki/source/aws_firehose/component.go index 266dd111d10..af1c2741082 100644 --- a/internal/component/loki/source/aws_firehose/component.go +++ b/internal/component/loki/source/aws_firehose/component.go @@ -3,7 +3,6 @@ package aws_firehose import ( "context" "reflect" - "strings" "sync" "github.com/go-kit/log" @@ -17,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" fnet "github.com/grafana/alloy/internal/component/common/net" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" + "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/aws_firehose/internal" "github.com/grafana/alloy/internal/util" ) @@ -50,31 +50,27 @@ func (a *Arguments) SetToDefault() { // Component is the main type for the `loki.source.awsfirehose` component. type Component struct { - // mut controls concurrent access to fanout - mut sync.RWMutex - fanout []loki.LogsReceiver + opts component.Options + logger log.Logger - // destination is the main destination where the TargetServer writes received log entries to - destination loki.LogsReceiver - rbs []*relabel.Config - - server *fnet.TargetServer - - opts component.Options - args Arguments - - // utils serverMetrics *util.UncheckedCollector handlerMetrics *internal.Metrics - logger log.Logger + + fanout *loki.Fanout + handler loki.LogsReceiver + + mut sync.Mutex + args Arguments + rbs []*relabel.Config + server *fnet.TargetServer } // New creates a new Component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, - destination: loki.NewLogsReceiver(), - fanout: args.ForwardTo, + handler: loki.NewLogsReceiver(), + fanout: loki.NewFanout(args.ForwardTo), serverMetrics: util.NewUncheckedCollector(nil), handlerMetrics: internal.NewMetrics(o.Registerer), @@ -95,21 +91,13 @@ func (c *Component) Run(ctx context.Context) error { defer func() { c.mut.Lock() defer c.mut.Unlock() - c.shutdownServer() + if c.server != nil { + c.server.StopAndShutdown() + } }() - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.destination.Chan(): - c.mut.RLock() - for _, receiver := range c.fanout { - receiver.Chan() <- entry - } - c.mut.RUnlock() - } - } + source.Consume(ctx, c.handler, c.fanout) + return nil } // Update updates the component with a new configuration, restarting the server if needed. @@ -119,7 +107,8 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() newArgs := args.(Arguments) - c.fanout = newArgs.ForwardTo + + c.fanout.UpdateChildren(newArgs.ForwardTo) var newRelabels []*relabel.Config = nil // first condition to consider if the handler needs to be updated is if the UseIncomingTimestamp field @@ -148,20 +137,18 @@ func (c *Component) Update(args component.Arguments) error { return nil } - c.shutdownServer() + if c.server != nil { + c.server.StopAndShutdown() + } // update relabel rules in component if needed if handlerNeedsUpdate { c.rbs = newRelabels } - r := strings.NewReplacer(".", "_", "/", "_") - jobName := r.Replace(c.opts.ID) - registry := prometheus.NewRegistry() c.serverMetrics.SetCollector(registry) - - c.server, err = fnet.NewTargetServer(c.logger, jobName, registry, newArgs.Server) + c.server, err = fnet.NewTargetServer(c.logger, "loki_source_awsfirehose", registry, newArgs.Server) if err != nil { return err } @@ -180,14 +167,5 @@ func (c *Component) Update(args component.Arguments) error { // Send implements internal.Sender so that the component is able to receive logs decoded by the handler. func (c *Component) Send(ctx context.Context, entry loki.Entry) { - c.destination.Chan() <- entry -} - -// shutdownServer will shut down the currently used server. -// It is not goroutine-safe and mut write lock must be held when it's called. -func (c *Component) shutdownServer() { - if c.server != nil { - c.server.StopAndShutdown() - c.server = nil - } + c.handler.Chan() <- entry }