diff --git a/NOTICE.txt b/NOTICE.txt index 30c69cce2a30..db274bc1fc20 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -24822,6 +24822,218 @@ Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/comp limitations under the License. +-------------------------------------------------------------------------------- +Dependency : go.opentelemetry.io/collector/component/componentstatus +Version: v0.125.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/component/componentstatus@v0.125.0/LICENSE: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : go.opentelemetry.io/collector/config/configtls Version: v1.31.0 @@ -62986,218 +63198,6 @@ Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/clie limitations under the License. --------------------------------------------------------------------------------- -Dependency : go.opentelemetry.io/collector/component/componentstatus -Version: v0.125.0 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/component/componentstatus@v0.125.0/LICENSE: - - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : go.opentelemetry.io/collector/component/componenttest Version: v0.125.0 diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index 525ab544141f..c20fde6b79b1 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -26,6 +26,9 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/management/status" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -66,14 +69,18 @@ func (c *crawler) Start( pipeline beat.PipelineConnector, configInputs *conf.C, configModules *conf.C, + reporter status.StatusReporter, ) error { log := c.log log.Infof("Loading Inputs: %d", len(c.inputConfigs)) + groupReporter := management.NewGroupStatusReporter(reporter) + // Prospect the globs/paths given on the command line and launch harvesters for _, inputConfig := range c.inputConfigs { - err := c.startInput(pipeline, inputConfig) + err := c.startInput(pipeline, inputConfig, groupReporter) + if err != nil { return fmt.Errorf("starting input failed: %w", err) } @@ -112,6 +119,7 @@ func (c *crawler) Start( func (c *crawler) startInput( pipeline beat.PipelineConnector, config *conf.C, + reporter management.RunnerReporter, ) error { if !config.Enabled() { @@ -139,7 +147,9 @@ func (c *crawler) startInput( if inputRunner, ok := runner.(*input.Runner); ok { inputRunner.Once = c.once } - + if r, ok := runner.(status.WithStatusReporter); ok { + r.SetStatusReporter(reporter.GetReporterForRunner(id)) + } c.inputs[id] = runner c.log.Infof("Starting input (ID: %d)", id) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 9cb795381b2a..b6aca607f875 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -438,7 +438,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { fb.logger.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules) + err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules, b.Manager) if err != nil { crawler.Stop() cancelPipelineFactoryCtx() diff --git a/go.mod b/go.mod index 202674d2770c..d44e31fbc000 100644 --- a/go.mod +++ b/go.mod @@ -217,6 +217,7 @@ require ( go.elastic.co/apm/v2 v2.7.0 go.mongodb.org/mongo-driver v1.14.0 go.opentelemetry.io/collector/component v1.31.0 + go.opentelemetry.io/collector/component/componentstatus v0.125.0 go.opentelemetry.io/collector/config/configtls v1.31.0 go.opentelemetry.io/collector/confmap v1.31.0 go.opentelemetry.io/collector/consumer v1.31.0 @@ -408,7 +409,6 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.31.0 // indirect - go.opentelemetry.io/collector/component/componentstatus v0.125.0 // indirect go.opentelemetry.io/collector/component/componenttest v0.125.0 // indirect go.opentelemetry.io/collector/config/configauth v0.125.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.31.0 // indirect diff --git a/libbeat/management/group.go b/libbeat/management/group.go new file mode 100644 index 000000000000..34ddfaf9eda2 --- /dev/null +++ b/libbeat/management/group.go @@ -0,0 +1,126 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package management + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/management/status" +) + +type runnerState struct { + state status.Status + msg string +} + +// RunnerReporter defines an interface that returns a StatusReporter for a specific runner. +// This is used for grouping and managing statuses of multiple runners +type RunnerReporter interface { + GetReporterForRunner(id uint64) status.StatusReporter +} + +// NewGroupStatusReporter creates a reporter that aggregates the statuses of multiple runners +// and reports the combined status to the parent StatusReporter. +// This is needed because multiple modules can report different statuses, and we want to avoid +// repeatedly flipping the parent's status. +func NewGroupStatusReporter(parent status.StatusReporter) RunnerReporter { + // If the parent is a "fallbackManager", we're operating in standard standalone mode, + // so setting a group reporter isn't necessary. + if _, ok := parent.(*fallbackManager); ok || parent == nil { + return &nopStatus{} + } + return &reporter{ + parent: parent, + runnerStates: make(map[uint64]runnerState), + } +} + +type reporter struct { + runnerStates map[uint64]runnerState + parent status.StatusReporter + mtx sync.Mutex +} + +func (r *reporter) GetReporterForRunner(id uint64) status.StatusReporter { + r.mtx.Lock() + defer r.mtx.Unlock() + return &subReporter{ + id: id, + r: r, + } +} + +func (r *reporter) updateStatusForRunner(id uint64, state status.Status, msg string) { + r.mtx.Lock() + defer r.mtx.Unlock() + if r.runnerStates == nil { + r.runnerStates = make(map[uint64]runnerState) + } + + // add status for the runner to the map + r.runnerStates[id] = runnerState{ + state: state, + msg: msg, + } + + // calculate the aggregate state of beat based on the module states + calcState, calcMsg := r.calculateState() + + // report status to parent reporter + r.parent.UpdateStatus(calcState, calcMsg) +} + +func (r *reporter) calculateState() (status.Status, string) { + reportedState := status.Running + reportedMsg := "" + for _, s := range r.runnerStates { + switch s.state { + case status.Degraded: + if reportedState != status.Degraded { + reportedState = status.Degraded + reportedMsg = s.msg + } + case status.Failed: + // we've encountered a failed runner. + // short-circuit and return, as Failed state takes precedence over other states + return s.state, s.msg + } + } + return reportedState, reportedMsg +} + +type nopStatus struct{} + +type nopReporter struct{} + +func (*nopReporter) UpdateStatus(status.Status, string) {} + +func (s *nopStatus) GetReporterForRunner(id uint64) status.StatusReporter { + return &nopReporter{} +} + +// subReporter implements status.StatusReporter +type subReporter struct { + id uint64 + r *reporter +} + +func (m *subReporter) UpdateStatus(status status.Status, msg string) { + // report status to its parent + m.r.updateStatusForRunner(m.id, status, msg) +} diff --git a/libbeat/management/group_test.go b/libbeat/management/group_test.go new file mode 100644 index 000000000000..dc55ff6a9d05 --- /dev/null +++ b/libbeat/management/group_test.go @@ -0,0 +1,68 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package management + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/management/status" +) + +type mockStatusReporter struct { + s status.Status + msg string +} + +func (m *mockStatusReporter) UpdateStatus(s status.Status, msg string) { + m.s = s + m.msg = msg +} + +func TestGroupStatus(t *testing.T) { + m := &mockStatusReporter{} + reporter := NewGroupStatusReporter(m) + + subReporter1, subReporter2, subReporter3 := reporter.GetReporterForRunner(1), reporter.GetReporterForRunner(2), reporter.GetReporterForRunner(3) + + subReporter1.UpdateStatus(status.Running, "") + subReporter2.UpdateStatus(status.Running, "") + subReporter3.UpdateStatus(status.Running, "") + + require.Equal(t, m.s, status.Running) + require.Equal(t, m.msg, "") + + subReporter1.UpdateStatus(status.Degraded, "Degrade Runner1") + require.Equal(t, m.s, status.Degraded) + require.Equal(t, m.msg, "Degrade Runner1") + + subReporter3.UpdateStatus(status.Degraded, "Degrade Runner3") + subReporter2.UpdateStatus(status.Failed, "Failed Runner2") + + require.Equal(t, m.s, status.Failed) + require.Equal(t, m.msg, "Failed Runner2") +} + +func TestNopReporter(t *testing.T) { + r := NewGroupStatusReporter(nil) + require.IsType(t, &nopStatus{}, r) + + r = NewGroupStatusReporter(&fallbackManager{}) + require.IsType(t, &nopStatus{}, r) +} diff --git a/libbeat/otelbeat/oteltest/oteltest.go b/libbeat/otelbeat/oteltest/oteltest.go index 75f048b16d8b..30d8bba0adc6 100644 --- a/libbeat/otelbeat/oteltest/oteltest.go +++ b/libbeat/otelbeat/oteltest/oteltest.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" @@ -38,6 +39,18 @@ import ( "go.uber.org/zap/zaptest/observer" ) +type MockHost struct { + Evt *componentstatus.Event +} + +func (*MockHost) GetExtensions() map[component.ID]component.Component { + return nil +} + +func (h *MockHost) Report(evt *componentstatus.Event) { + h.Evt = evt +} + type ReceiverConfig struct { Name string Config component.Config @@ -61,6 +74,8 @@ func CheckReceivers(params CheckReceiversParams) { var logsMu sync.Mutex logs := make(map[string][]mapstr.M) + host := &MockHost{} + zapCore := zapcore.NewCore( zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), &zaptest.Discarder{}, @@ -112,7 +127,7 @@ func CheckReceivers(params CheckReceiversParams) { } for i, r := range receivers { - err := r.Start(ctx, nil) + err := r.Start(ctx, host) require.NoErrorf(t, err, "Error starting receiver %d", i) defer func() { require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %d", i) @@ -138,6 +153,9 @@ func CheckReceivers(params CheckReceiversParams) { require.Equal(t, zl.ContextMap()["otelcol.signal"], "logs") break } + require.NotNil(t, host.Evt) + require.Nil(t, host.Evt.Err()) + require.Equal(t, host.Evt.Status(), componentstatus.StatusOK) params.AssertFunc(ct, logs, zapLogs) }, 2*time.Minute, 100*time.Millisecond, diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 0393eac33011..f7558190ea01 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -25,6 +25,8 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/management/status" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/module" @@ -44,9 +46,9 @@ import ( // Metricbeat implements the Beater interface for metricbeat. type Metricbeat struct { - done chan struct{} // Channel used to initiate shutdown. - stopOnce sync.Once // wraps the Stop() method - runners []cfgfile.Runner // Active list of module runners. + done chan struct{} // Channel used to initiate shutdown. + stopOnce sync.Once // wraps the Stop() method + runners map[uint64]cfgfile.Runner // Active list of module runners. config Config registry *mb.Register autodiscover *autodiscover.Autodiscover @@ -154,6 +156,7 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op config: config, registry: registry, logger: b.Info.Logger, + runners: make(map[uint64]cfgfile.Runner), } for _, applyOption := range options { @@ -202,7 +205,12 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op return nil, err } - metricbeat.runners = append(metricbeat.runners, runner) + hash, err := cfgfile.HashConfig(moduleCfg) + if err != nil { + return nil, fmt.Errorf("error hashing module config: %w", err) + } + + metricbeat.runners[hash] = runner } if len(metricbeat.runners) == 0 && !dynamicCfgEnabled { @@ -235,8 +243,15 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op func (bt *Metricbeat) Run(b *beat.Beat) error { var wg sync.WaitGroup + groupReporter := management.NewGroupStatusReporter(b.Manager) + // Static modules (metricbeat.runners) - for _, r := range bt.runners { + for hash, r := range bt.runners { + // If the otelStatusReporter is set, we need to set the status reporter + if status, ok := r.(status.WithStatusReporter); ok { + status.SetStatusReporter(groupReporter.GetReporterForRunner(hash)) + } + r.Start() wg.Add(1) diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index 62c38dc3aa6f..005277ecef53 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -25,7 +25,7 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro go func() { defer fb.wg.Done() fb.Logger.Info("starting filebeat receiver") - if err := fb.BeatReceiver.Start(); err != nil { + if err := fb.BeatReceiver.Start(host); err != nil { fb.Logger.Error("error starting filebeat receiver", zap.Error(err)) } }() diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index 4f9b64332f7f..1ab1d57e7033 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -14,6 +14,7 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" @@ -62,6 +63,8 @@ func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) err var wg sync.WaitGroup metrics := newInputMetrics(ctx) + ctx.UpdateStatus(status.Running, "") + for i := uint8(0); i < bi.cfg.Threads; i++ { wg.Add(1) go func(thread uint8) { @@ -96,7 +99,7 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg } case cfg.Eps > 0: ticker := time.NewTicker(1 * time.Second) - pubChan := make(chan bool, int(cfg.Eps)) + pubChan := make(chan bool, int(cfg.Eps)) //nolint:gosec // disable G115 for { select { case <-ctx.Cancelation.Done(): diff --git a/x-pack/libbeat/cmd/instance/receiver.go b/x-pack/libbeat/cmd/instance/receiver.go index 8aa9919decff..99dcea612d80 100644 --- a/x-pack/libbeat/cmd/instance/receiver.go +++ b/x-pack/libbeat/cmd/instance/receiver.go @@ -7,11 +7,14 @@ package instance import ( "fmt" + "go.opentelemetry.io/collector/component" + "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/version" _ "github.com/elastic/beats/v7/x-pack/libbeat/include" + "github.com/elastic/beats/v7/x-pack/libbeat/management" metricreport "github.com/elastic/elastic-agent-system-metrics/report" "go.uber.org/zap" @@ -73,7 +76,8 @@ func NewBeatReceiver(b *instance.Beat, creator beat.Creator, logger *zap.Logger) } // BeatReceiver.Stop() starts the beat receiver. -func (br *BeatReceiver) Start() error { +func (br *BeatReceiver) Start(host component.Host) error { + br.beat.Manager = management.NewOtelManager(br.beat.Manager, host) if err := br.beater.Run(&br.beat.Beat); err != nil { return fmt.Errorf("beat receiver run error: %w", err) } diff --git a/x-pack/libbeat/management/otel.go b/x-pack/libbeat/management/otel.go new file mode 100644 index 000000000000..419114b83f10 --- /dev/null +++ b/x-pack/libbeat/management/otel.go @@ -0,0 +1,52 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "errors" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + + "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/management/status" +) + +type otelManager struct { + management.Manager + host component.Host +} + +var _ management.Manager = (*otelManager)(nil) +var _ status.StatusReporter = (*otelManager)(nil) + +func NewOtelManager(parent management.Manager, host component.Host) management.Manager { + return &otelManager{ + Manager: parent, + host: host, + } +} + +func (m *otelManager) UpdateStatus(s status.Status, msg string) { + var evt *componentstatus.Event + switch s { + case status.Starting: + evt = componentstatus.NewEvent(componentstatus.StatusStarting) + case status.Running: + evt = componentstatus.NewEvent(componentstatus.StatusOK) + case status.Degraded: + evt = componentstatus.NewRecoverableErrorEvent(errors.New(msg)) + case status.Failed: + evt = componentstatus.NewPermanentErrorEvent(errors.New(msg)) + case status.Stopping: + evt = componentstatus.NewEvent(componentstatus.StatusStopped) + case status.Stopped: + evt = componentstatus.NewEvent(componentstatus.StatusStopped) + default: + return + } + + componentstatus.ReportStatus(m.host, evt) +} diff --git a/x-pack/metricbeat/mbreceiver/receiver.go b/x-pack/metricbeat/mbreceiver/receiver.go index 54ccad56750f..66a56259440e 100644 --- a/x-pack/metricbeat/mbreceiver/receiver.go +++ b/x-pack/metricbeat/mbreceiver/receiver.go @@ -25,7 +25,7 @@ func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) er go func() { defer mb.wg.Done() mb.Logger.Info("starting metricbeat receiver") - if err := mb.BeatReceiver.Start(); err != nil { + if err := mb.BeatReceiver.Start(host); err != nil { mb.Logger.Error("error starting metricbeat receiver", zap.Error(err)) } }()