diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 121abe92b20a..34b6b6433315 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -407,6 +407,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Fix handling of ADC (Application Default Credentials) metadata server credentials in HTTPJSON input. {issue}44349[44349] {pull}44436[44436] - Fix handling of ADC (Application Default Credentials) metadata server credentials in CEL input. {issue}44349[44349] {pull}44571[44571] - Introduce lastSync start position to AWS CloudWatch input backed by state registry. {pull}43251[43251] +- Add Fleet status update functionality to udp input. {issue}44419[44419] {pull}44785[44785] - Filestream now logs at level warn the number of files that are too small to be ingested {pull}44751[44751] *Auditbeat* diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 190b77663ac4..1f22b3db3ad9 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" @@ -81,7 +82,7 @@ func newServer(config config) (*server, error) { func (s *server) Name() string { return "udp" } func (s *server) Test(_ input.TestContext) error { - l, err := net.Listen("udp", s.config.Config.Host) + l, err := net.Listen("udp", s.Host) if err != nil { return err } @@ -89,16 +90,19 @@ func (s *server) Test(_ input.TestContext) error { } func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { - log := ctx.Logger.With("host", s.config.Config.Host) + log := ctx.Logger.With("host", s.Host) log.Info("starting udp socket input") defer log.Info("udp input stopped") + ctx.UpdateStatus(status.Starting, "") + ctx.UpdateStatus(status.Configuring, "") + const pollInterval = time.Minute - metrics := netmetrics.NewUDP("udp", ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log) + metrics := netmetrics.NewUDP("udp", ctx.ID, s.Host, uint64(s.ReadBuffer), pollInterval, log) // #nosec G115 -- ignore "overflow conversion int64 -> uint64", config validation ensures value is always positive. defer metrics.Close() - server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) { + server := udp.New(&s.Config, func(data []byte, metadata inputsource.NetworkMetadata) { evt := beat.Event{ Timestamp: time.Now(), Meta: mapstr.M{ @@ -124,11 +128,19 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { }) log.Debug("udp input initialized") + ctx.UpdateStatus(status.Running, "") err := server.Run(ctxtool.FromCanceller(ctx.Cancelation)) // Ignore error from 'Run' in case shutdown was signaled. if ctxerr := ctx.Cancelation.Err(); ctxerr != nil { err = ctxerr } + + if err != nil { + ctx.UpdateStatus(status.Failed, "Input exited unexpectedly: "+err.Error()) + } else { + ctx.UpdateStatus(status.Stopped, "") + } + return err }