Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
432 changes: 428 additions & 4 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func FilebeatSettings(moduleNameSpace string) instance.Settings {
Initialize: []func(){
include.InitializeModule,
func() { fileset.RegisterMonitoringModules(moduleNameSpace) },
input.RegisterMonitoringInputs,
func() { input.RegisterMonitoringInputs("") },
},
}
}
Expand Down
11 changes: 8 additions & 3 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ import (
var inputList = monitoring.NewUniqueList()
var inputListMetricsOnce sync.Once

// RegisterMonitoringInputs registers the inputs list with the monitoring system.
func RegisterMonitoringInputs() {
// RegisterMonitoringInputs registers a list of inputs with the
// monitoring system under the provided namespace. If namespace is
// empty, it default to "state". Registration only occurs once.
func RegisterMonitoringInputs(namespace string) {
if namespace == "" {
namespace = "state"
}
inputListMetricsOnce.Do(func() {
monitoring.NewFunc(monitoring.GetNamespace("state").GetRegistry(), "input", inputList.Report, monitoring.Report)
monitoring.NewFunc(monitoring.GetNamespace(namespace).GetRegistry(), "input", inputList.Report, monitoring.Report)
})
}

Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2
github.com/bsm/sarama-cluster v2.1.14-0.20180625083203-7e67d87a6b3f+incompatible
github.com/cavaliergopher/rpm v1.2.0
github.com/cespare/xxhash/v2 v2.2.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/cloudfoundry-community/go-cfclient v0.0.0-20190808214049-35bcce23fc5f
github.com/cloudfoundry/noaa v2.1.0+incompatible
github.com/cloudfoundry/sonde-go v0.0.0-20171206171820-b33733203bb4
Expand Down Expand Up @@ -149,7 +149,7 @@ require (
golang.org/x/tools v0.23.0
google.golang.org/api v0.191.0
google.golang.org/genproto v0.0.0-20240730163845-b1a4ccb954bf // indirect
google.golang.org/grpc v1.64.1
google.golang.org/grpc v1.66.0
google.golang.org/protobuf v1.34.2
gopkg.in/inf.v0 v0.9.1
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
Expand Down Expand Up @@ -219,6 +219,7 @@ require (
go.elastic.co/apm/module/apmhttp/v2 v2.6.0
go.elastic.co/apm/v2 v2.6.0
go.mongodb.org/mongo-driver v1.5.1
go.opentelemetry.io/collector/consumer v0.107.0
google.golang.org/genproto/googleapis/api v0.0.0-20240725223205-93522f1f2a9f
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
Expand Down Expand Up @@ -363,6 +364,7 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/pdata v1.13.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -1730,6 +1730,10 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/collector/consumer v0.107.0 h1:fF/+xyv9BfXQUvuJqkljrpzKyBQExDQt6zB5rzGyuHs=
go.opentelemetry.io/collector/consumer v0.107.0/go.mod h1:wgWpFes9sbnZ11XeJPSeutU8GJx6dT/gzSUqHpaZZQA=
go.opentelemetry.io/collector/pdata v1.13.0 h1:eV3NQt2f1UcaibkziMvGTQI34LlpiYBUGp1yP0G/Cxw=
go.opentelemetry.io/collector/pdata v1.13.0/go.mod h1:MYeB0MmMAxeM0hstCFrCqWLzdyeYySim2dG6pDT6nYI=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA=
Expand Down Expand Up @@ -2384,8 +2388,8 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
3 changes: 3 additions & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"
)

// Info stores a beats instance meta data.
Expand All @@ -42,6 +43,8 @@ type Info struct {
Monitoring struct {
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
}
LogConsumer consumer.Logs //otel log consumer

}

func (i Info) FQDNAwareHostname(useFQDN bool) string {
Expand Down
219 changes: 219 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"time"

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"

"github.com/elastic/beats/v7/libbeat/api"
Expand Down Expand Up @@ -285,6 +286,224 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func
return &Beat{Beat: b}, nil
}

// NewBeatReceiver creates a Beat that will be used in the context of an otel receiver
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs) (*Beat, error) {
b, err := NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
settings.ElasticLicensed,
settings.Initialize)
if err != nil {
return nil, err
}

b.Info.LogConsumer = consumer

// begin code similar to configure
if err = plugin.Initialize(); err != nil {
return nil, fmt.Errorf("error initializing plugins: %w", err)
}

b.InputQueueSize = settings.InputQueueSize

cfOpts := []ucfg.Option{
ucfg.PathSep("."),
ucfg.ResolveEnv,
ucfg.VarExp,
}

tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
if err != nil {
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
}

cfg := (*config.C)(tmp)
if err := initPaths(cfg); err != nil {
return nil, fmt.Errorf("error initializing paths: %w", err)
}

// We have to initialize the keystore before any unpack or merging the cloud
// options.
store, err := LoadKeystore(cfg, b.Info.Beat)
if err != nil {
return nil, fmt.Errorf("could not initialize the keystore: %w", err)
}

if settings.DisableConfigResolver {
config.OverwriteConfigOpts(obfuscateConfigOpts())
} else {
// TODO: Allow the options to be more flexible for dynamic changes
config.OverwriteConfigOpts(configOpts(store))
}

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
}
b.Beat.Instrumentation = instrumentation

b.keystore = store
b.Beat.Keystore = store
err = cloudid.OverwriteSettings(cfg)
if err != nil {
return nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
}

b.RawConfig = cfg
err = cfg.Unpack(&b.Config)
if err != nil {
return nil, fmt.Errorf("error unpacking config data: %w", err)
}

if err := promoteOutputQueueSettings(&b.Config); err != nil {
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("could not parse features: %w", err)
}
b.RegisterHostname(features.FQDN())

b.Beat.Config = &b.Config.BeatConfig

if name := b.Config.Name; name != "" {
b.Info.Name = name
}

if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil {
return nil, fmt.Errorf("error setting timestamp precision: %w", err)
}

if err := configure.LoggingWithTypedOutputs(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType); err != nil {
return nil, fmt.Errorf("error initializing logging: %w", err)
}

// log paths values to help with troubleshooting
logp.Info(paths.Paths.String())

metaPath := paths.Resolve(paths.Data, "meta.json")
err = b.loadMeta(metaPath)
if err != nil {
return nil, fmt.Errorf("error loading meta data: %w", err)
}

logp.Info("Beat ID: %v", b.Info.ID)

// Try to get the host's FQDN and set it.
h, err := sysinfo.Host()
if err != nil {
return nil, fmt.Errorf("failed to get host information: %w", err)
}

fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

fqdn, err := h.FQDNWithContext(fqdnLookupCtx)
if err != nil {
// FQDN lookup is "best effort". We log the error, fallback to
// the OS-reported hostname, and move on.
logp.Warn("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname)
b.Info.FQDN = b.Info.Hostname
} else {
b.Info.FQDN = fqdn
}

// initialize config manager
m, err := management.NewManager(b.Config.Management, b.Registry)
if err != nil {
return nil, fmt.Errorf("error creating new manager: %w", err)
}
b.Manager = m

if b.Manager.AgentInfo().Version != "" {
// During the manager initialization the client to connect to the agent is
// also initialized. That makes the beat to read information sent by the
// agent, which includes the AgentInfo with the agent's package version.
// Components running under agent should report the agent's package version
// as their own version.
// In order to do so b.Info.Version needs to be set to the version the agent
// sent. As this Beat instance is initialized much before the package
// version is received, it's overridden here. So far it's early enough for
// the whole beat to report the right version.
b.Info.Version = b.Manager.AgentInfo().Version
version.SetPackageVersion(b.Info.Version)
}

// build the user-agent string to be used by the outputs
b.GenerateUserAgent()

if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("error checking raw config: %w", err)
}

b.Beat.BeatConfig, err = b.BeatConfig()
if err != nil {
return nil, fmt.Errorf("error setting BeatConfig: %w", err)
}

imFactory := settings.IndexManagement
if imFactory == nil {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
}
b.IdxSupporter, err = imFactory(nil, b.Beat.Info, b.RawConfig)
if err != nil {
return nil, fmt.Errorf("error setting index supporter: %w", err)
}

processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}

b.processors, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)
if err != nil {
return nil, fmt.Errorf("error creating processors: %w", err)
}

reg := monitoring.Default.GetRegistry(b.Info.Name)
if reg == nil {
reg = monitoring.Default.NewRegistry(b.Info.Name)
}

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.Manager.Enabled() {
logp.Info("Output is configured through Central Management")
} else {
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
}
}

tel := reg.GetRegistry("state")
if tel == nil {
tel = reg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: tel,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}

outputFactory := b.makeOutputFactory(b.Config.Output)

pipelineSettings := pipeline.Settings{
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
}
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
b.Publisher = publisher

return b, nil

}

// InitWithSettings does initialization of things common to all actions (read confs, flags)
func (b *Beat) InitWithSettings(settings Settings) error {
err := b.handleFlags()
Expand Down