From d05f589deff9ca291e0ae21a4d6db27a4563aa1d Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 16 Sep 2025 00:30:06 +0530 Subject: [PATCH 1/3] [beatreceivers] Integrate beatsauthextension (#9257) * [beatreceivers] Integrate beatsauthextension * add test cases * final tests and this works * update * remove agent.port * mage otel:readme * address review comments * check is ssl.TLS is non-nil * address review comments * remove port * make notice for beatsauth * update beatsauthextension * beatsauth final test * final fix * add beatsauthextension test * Address comments * fix test * fix test * change comment * update beatsauthextension * Update internal/pkg/otel/translate/otelconfig.go Co-authored-by: Craig MacKenzie * fix ci * add a test that a unique extension is created per output * fix ci * add comment * add better test * fix test --------- Co-authored-by: Craig MacKenzie (cherry picked from commit 779fafdcd34046e47af2095595d57d063e50707a) # Conflicts: # internal/pkg/otel/README.md # internal/pkg/otel/manager/manager.go # internal/pkg/otel/manager/manager_test.go # internal/pkg/otel/translate/otelconfig.go # internal/pkg/otel/translate/otelconfig_test.go --- NOTICE-fips.txt | 211 +++++++ NOTICE.txt | 211 +++++++ go.mod | 1 + go.sum | 2 + internal/pkg/otel/README.md | 12 + internal/pkg/otel/components.go | 2 + internal/pkg/otel/manager/manager.go | 136 +++++ internal/pkg/otel/manager/manager_test.go | 559 ++++++++++++++++++ internal/pkg/otel/translate/otelconfig.go | 141 ++++- .../pkg/otel/translate/otelconfig_test.go | 521 +++++++++++++--- testing/integration/ess/otel_test.go | 164 ++++- 11 files changed, 1865 insertions(+), 95 deletions(-) diff --git a/NOTICE-fips.txt b/NOTICE-fips.txt index 7661144f195..3de1ff1bc7e 100644 --- a/NOTICE-fips.txt +++ b/NOTICE-fips.txt @@ -3152,6 +3152,217 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-c limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension +Version: v0.2.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension@v0.2.0/LICENSE: + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2018 Elasticsearch BV + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor Version: v0.16.0 diff --git a/NOTICE.txt b/NOTICE.txt index 7dde28709e1..0a8faddaaf5 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -3152,6 +3152,217 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-c limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension +Version: v0.2.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension@v0.2.0/LICENSE: + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2018 Elasticsearch BV + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor Version: v0.16.0 diff --git a/go.mod b/go.mod index 6890a8a951d..46ca71fac27 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector v0.6.0 github.com/elastic/opentelemetry-collector-components/extension/apikeyauthextension v0.4.1 github.com/elastic/opentelemetry-collector-components/extension/apmconfigextension v0.6.0 + github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension v0.2.0 github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor v0.16.0 github.com/elastic/opentelemetry-collector-components/processor/elastictraceprocessor v0.9.0 github.com/fatih/color v1.18.0 diff --git a/go.sum b/go.sum index a2542d6151e..cf284514963 100644 --- a/go.sum +++ b/go.sum @@ -543,6 +543,8 @@ github.com/elastic/opentelemetry-collector-components/extension/apikeyauthextens github.com/elastic/opentelemetry-collector-components/extension/apikeyauthextension v0.4.1/go.mod h1:4CogfV72wu4glmT7/Gr6XzzJtSoGQs/cDAeSbb7UZYc= github.com/elastic/opentelemetry-collector-components/extension/apmconfigextension v0.6.0 h1:UBAq2kilCpYBKkRQovA8AG5N5AAnQY5IA3ZEhv4vMAo= github.com/elastic/opentelemetry-collector-components/extension/apmconfigextension v0.6.0/go.mod h1:E1uPMjGeBL8PbYCqX4WHMHGqZ3Oo+gK1OKe5Mp1146M= +github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension v0.2.0 h1:cQ4Bu5iyJn5jk68OdwpGIifqVwAZUCoYpN3ZVbVRGBA= +github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension v0.2.0/go.mod h1:aG7w7AA2CydjMxGG6zUZggXCPa+jVRhVYo/92wiDx4Q= github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent v0.0.0-20250220025958-386ba0c4bced h1:XcWi/S3OoeE5Qwmj381AoO3nr3AVXl4Z4QO0mqyjlzU= github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent v0.0.0-20250220025958-386ba0c4bced/go.mod h1:8e9NcGfE2xeor8r+WV9a+hKBEkzJEDnqZN7tqb3GUe8= github.com/elastic/opentelemetry-collector-components/internal/testutil v0.0.0-20250613082151-282de5af1c9b h1:NWuTKdMCJlU9ehRH8V0w1Kk1QI5Vn+9OcJWIO9wI+pE= diff --git a/internal/pkg/otel/README.md b/internal/pkg/otel/README.md index f8c1362ded1..7feaebad1df 100644 --- a/internal/pkg/otel/README.md +++ b/internal/pkg/otel/README.md @@ -89,6 +89,7 @@ This section provides a summary of components included in the Elastic Distributi |---|---| | [apikeyauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/apikeyauthextension/v0.4.1/extension/apikeyauthextension/README.md) | v0.4.1 | | [apmconfigextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/apmconfigextension/v0.6.0/extension/apmconfigextension/README.md) | v0.6.0 | +<<<<<<< HEAD | [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.130.0/extension/bearertokenauthextension/README.md) | v0.130.0 | | [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.130.0/extension/storage/filestorage/README.md) | v0.130.0 | | [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.130.0/extension/healthcheckextension/README.md) | v0.130.0 | @@ -96,6 +97,17 @@ This section provides a summary of components included in the Elastic Distributi | [k8sobserver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/observer/k8sobserver/v0.130.0/extension/observer/k8sobserver/README.md) | v0.130.0 | | [memorylimiterextension](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/memorylimiterextension/v0.130.0/extension/memorylimiterextension/README.md) | v0.130.0 | | [pprofextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/pprofextension/v0.130.0/extension/pprofextension/README.md) | v0.130.0 | +======= +| [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.132.0/extension/bearertokenauthextension/README.md) | v0.132.0 | +| [beatsauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/beatsauthextension/v0.2.0/extension/beatsauthextension/README.md) | v0.2.0 | +| [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.132.0/extension/storage/filestorage/README.md) | v0.132.0 | +| [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.132.0/extension/healthcheckextension/README.md) | v0.132.0 | +| [healthcheckv2extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckv2extension/v0.132.0/extension/healthcheckv2extension/README.md) | v0.132.0 | +| [k8sleaderelector](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/k8sleaderelector/v0.132.0/extension/k8sleaderelector/README.md) | v0.132.0 | +| [k8sobserver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/observer/k8sobserver/v0.132.0/extension/observer/k8sobserver/README.md) | v0.132.0 | +| [memorylimiterextension](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/memorylimiterextension/v0.132.0/extension/memorylimiterextension/README.md) | v0.132.0 | +| [pprofextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/pprofextension/v0.132.0/extension/pprofextension/README.md) | v0.132.0 | +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) ### Connectors diff --git a/internal/pkg/otel/components.go b/internal/pkg/otel/components.go index 4488cd23bfa..49a1bc8fcb8 100644 --- a/internal/pkg/otel/components.go +++ b/internal/pkg/otel/components.go @@ -75,6 +75,7 @@ import ( forwardconnector "go.opentelemetry.io/collector/connector/forwardconnector" elasticapmconnector "github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector" + beatsauthextension "github.com/elastic/opentelemetry-collector-components/extension/beatsauthextension" ) func components(extensionFactories ...extension.Factory) func() (otelcol.Factories, error) { @@ -167,6 +168,7 @@ func components(extensionFactories ...extension.Factory) func() (otelcol.Factori k8sobserver.NewFactory(), apikeyauthextension.NewFactory(), apmconfigextension.NewFactory(), + beatsauthextension.NewFactory(), } extensions = append(extensions, extensionFactories...) factories.Extensions, err = otelcol.MakeFactoryMap[extension.Factory](extensions...) diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 7175cf6b036..b8543d2c722 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -112,6 +112,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.reportErr(ctx, err) } } +<<<<<<< HEAD case cfg := <-m.cfgCh: m.cfg = cfg if cfg == nil { @@ -156,6 +157,35 @@ func (m *OTelManager) Run(ctx context.Context) error { // needs to be updated in the collector provider.Update(m.cfg) } +======= + + case cfgUpdate := <-m.updateCh: + // we received a new configuration, thus stop the recovery timer + // and reset the retry count + m.recoveryTimer.Stop() + m.recoveryRetries.Store(0) + mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.baseLogger) + if err != nil { + reportErr(ctx, m.errCh, err) + continue + } + + // this is the only place where we mutate the internal config attributes, take a write lock for the duration + m.mx.Lock() + m.mergedCollectorCfg = mergedCfg + m.collectorCfg = cfgUpdate.collectorCfg + m.components = cfgUpdate.components + m.mx.Unlock() + + err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr) + // report the error unconditionally to indicate that the config was applied + reportErr(ctx, m.errCh, err) + + case otelStatus := <-collectorStatusCh: + err = m.reportOtelStatusUpdate(ctx, otelStatus) + if err != nil { + reportErr(ctx, m.errCh, err) +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) } } } @@ -166,10 +196,116 @@ func (m *OTelManager) Errors() <-chan error { return m.errCh } +<<<<<<< HEAD // Update updates the configuration. // // When nil is passed for the cfg, then the collector is stopped. func (m *OTelManager) Update(cfg *confmap.Conf) { +======= +// buildMergedConfig combines collector configuration with component-derived configuration. +func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringConfigGetter translate.BeatMonitoringConfigGetter, logger *logp.Logger) (*confmap.Conf, error) { + mergedOtelCfg := confmap.New() + + // Generate component otel config if there are components + var componentOtelCfg *confmap.Conf + if len(cfgUpdate.components) > 0 { + model := &component.Model{Components: cfgUpdate.components} + var err error + componentOtelCfg, err = translate.GetOtelConfig(model, agentInfo, monitoringConfigGetter, logger) + if err != nil { + return nil, fmt.Errorf("failed to generate otel config: %w", err) + } + } + + // If both configs are nil, return nil so the manager knows to stop the collector + if componentOtelCfg == nil && cfgUpdate.collectorCfg == nil { + return nil, nil + } + + // Merge component config if it exists + if componentOtelCfg != nil { + err := mergedOtelCfg.Merge(componentOtelCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge component otel config: %w", err) + } + } + + // Merge with base collector config if it exists + if cfgUpdate.collectorCfg != nil { + err := mergedOtelCfg.Merge(cfgUpdate.collectorCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge collector otel config: %w", err) + } + } + + return mergedOtelCfg, nil +} + +func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error { + if m.proc != nil { + m.proc.Stop(ctx) + m.proc = nil + select { + case <-collectorRunErr: + case <-ctx.Done(): + // our caller ctx is Done + return ctx.Err() + } + // drain the internal status update channel + // this status handling is normally done in the main loop, but in this case we want to ensure that we emit a + // nil status after the collector has stopped + select { + case statusCh := <-collectorStatusCh: + updateErr := m.reportOtelStatusUpdate(ctx, statusCh) + if updateErr != nil { + m.logger.Error("failed to update otel status", zap.Error(updateErr)) + } + case <-ctx.Done(): + // our caller ctx is Done + return ctx.Err() + default: + } + err := m.reportOtelStatusUpdate(ctx, nil) + if err != nil { + return err + } + } + + if m.mergedCollectorCfg == nil { + // no configuration then the collector should not be + // running. + // ensure that the coordinator knows that there is no error + // as the collector is not running anymore + return nil + } else { + // either a new configuration or the first configuration + // that results in the collector being started + proc, err := m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) + if err != nil { + // failed to create the collector (this is different then + // it's failing to run). we do not retry creation on failure + // as it will always fail. A new configuration is required for + // it not to fail (a new configuration will result in the retry) + // since this is a new configuration we want to start the timer + // from the initial delay + recoveryDelay := m.recoveryTimer.ResetInitial() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) + return err + } else { + // all good at the moment (possible that it will fail) + m.proc = proc + } + } + return nil +} + +// Update sends collector configuration and component updates to the manager's run loop. +func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component) { + cfgUpdate := configUpdate{ + collectorCfg: cfg, + components: components, + } +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) select { case m.cfgCh <- cfg: case <-m.doneChan: diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 72c00d32645..814e62e5ddc 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -15,12 +15,17 @@ import ( "gopkg.in/yaml.v2" +<<<<<<< HEAD "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" +======= + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/logp/logptest" +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" ) @@ -272,3 +277,557 @@ func toSerializableStatus(s *status.AggregateStatus) *serializableStatus { } return outputStruct } +<<<<<<< HEAD +======= + +// Mock function for BeatMonitoringConfigGetter +func mockBeatMonitoringConfigGetter(unitID, binary string) map[string]any { + return map[string]any{"test": "config"} +} + +// Helper function to create test logger +func newTestLogger() *logger.Logger { + l, _ := loggertest.New("test") + return l +} + +func TestOTelManager_buildMergedConfig(t *testing.T) { + // Common parameters used across all test cases + commonAgentInfo := &info.AgentInfo{} + commonBeatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + testComp := testComponent("test-component") + + tests := []struct { + name string + collectorCfg *confmap.Conf + components []component.Component + expectedKeys []string + expectedErrorString string + }{ + { + name: "nil config returns nil", + collectorCfg: nil, + components: nil, + }, + { + name: "empty config returns empty config", + collectorCfg: nil, + components: nil, + expectedKeys: []string{}, + }, + { + name: "collector config only", + collectorCfg: confmap.NewFromStringMap(map[string]any{"receivers": map[string]any{"nop": map[string]any{}}}), + components: nil, + expectedKeys: []string{"receivers"}, + }, + { + name: "components only", + collectorCfg: nil, + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service"}, + }, + { + name: "both collector config and components", + collectorCfg: confmap.NewFromStringMap(map[string]any{"processors": map[string]any{"batch": map[string]any{}}}), + components: []component.Component{testComp}, + expectedKeys: []string{"receivers", "exporters", "service", "processors"}, + }, + { + name: "component config generation error", + collectorCfg: nil, + components: []component.Component{{ + ID: "test-component", + InputType: "filestream", // Supported input type + OutputType: "elasticsearch", // Supported output type + // Missing InputSpec which should cause an error during config generation + }}, + expectedErrorString: "failed to generate otel config: unknown otel receiver type for input type: filestream", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfgUpdate := configUpdate{ + collectorCfg: tt.collectorCfg, + components: tt.components, + } + result, err := buildMergedConfig(cfgUpdate, commonAgentInfo, commonBeatMonitoringConfigGetter, logptest.NewTestingLogger(t, "")) + + if tt.expectedErrorString != "" { + assert.Error(t, err) + assert.Equal(t, tt.expectedErrorString, err.Error()) + assert.Nil(t, result) + return + } + + assert.NoError(t, err) + + if len(tt.expectedKeys) == 0 { + assert.Nil(t, result) + return + } + + require.NotNil(t, result) + for _, key := range tt.expectedKeys { + assert.True(t, result.IsSet(key), "Expected key %s to be set", key) + } + }) + } +} + +func TestOTelManager_handleOtelStatusUpdate(t *testing.T) { + // Common test component used across test cases + testComp := testComponent("test-component") + + tests := []struct { + name string + components []component.Component + inputStatus *status.AggregateStatus + expectedErrorString string + expectedCollectorStatus *status.AggregateStatus + expectedComponentStates []runtime.ComponentComponentState + }{ + { + name: "successful status update with component states", + components: []component.Component{testComp}, + inputStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeat/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test-component": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + // This represents a regular collector pipeline (should remain after cleaning) + "pipeline:logs": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + expectedCollectorStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a regular collector pipeline (should remain after cleaning) + "pipeline:logs": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + expectedComponentStates: []runtime.ComponentComponentState{ + { + Component: testComp, + State: runtime.ComponentState{ + State: client.UnitStateHealthy, + Message: "HEALTHY", + Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{ + runtime.ComponentUnitKey{ + UnitID: "filestream-unit", + UnitType: client.UnitTypeInput, + }: { + State: client.UnitStateHealthy, + Message: "Healthy", + Payload: map[string]any{ + "streams": map[string]map[string]string{ + "test-1": { + "error": "", + "status": client.UnitStateHealthy.String(), + }, + "test-2": { + "error": "", + "status": client.UnitStateHealthy.String(), + }, + }, + }, + }, + runtime.ComponentUnitKey{ + UnitID: "filestream-default", + UnitType: client.UnitTypeOutput, + }: { + State: client.UnitStateHealthy, + Message: "Healthy", + }, + }, + VersionInfo: runtime.ComponentVersionInfo{ + Name: translate.OtelComponentName, + Meta: map[string]string{ + "build_time": version.BuildTime().String(), + "commit": version.Commit(), + }, + BuildHash: version.Commit(), + }, + }, + }, + }, + }, + { + name: "handles nil otel status", + components: []component.Component{}, + inputStatus: nil, + expectedCollectorStatus: nil, + expectedComponentStates: nil, + }, + { + name: "handles empty components list", + components: []component.Component{}, + inputStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + expectedErrorString: "", + expectedCollectorStatus: &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + expectedComponentStates: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + components: tt.components, + currentComponentStates: make(map[string]runtime.ComponentComponentState), + } + + componentStates, err := mgr.handleOtelStatusUpdate(tt.inputStatus) + + // Verify error expectation + if tt.expectedErrorString != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedErrorString) + return + } + + require.NoError(t, err) + + // Compare component states + assert.Equal(t, tt.expectedComponentStates, componentStates) + + // Compare collector status + assertOtelStatusesEqualIgnoringTimestamps(t, tt.expectedCollectorStatus, mgr.currentCollectorStatus) + }) + } +} + +func TestOTelManager_processComponentStates(t *testing.T) { + tests := []struct { + name string + currentComponentStates map[string]runtime.ComponentComponentState + inputComponentStates []runtime.ComponentComponentState + expectedOutputStates []runtime.ComponentComponentState + expectedCurrentStatesAfter map[string]runtime.ComponentComponentState + }{ + { + name: "empty input and current states", + currentComponentStates: map[string]runtime.ComponentComponentState{}, + inputComponentStates: []runtime.ComponentComponentState{}, + expectedOutputStates: []runtime.ComponentComponentState{}, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + { + name: "new component state added", + currentComponentStates: map[string]runtime.ComponentComponentState{}, + inputComponentStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + }, + { + name: "component removed from config generates STOPPED state", + currentComponentStates: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + inputComponentStates: []runtime.ComponentComponentState{}, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + { + name: "component stopped removes from current states", + currentComponentStates: map[string]runtime.ComponentComponentState{ + "comp1": { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateHealthy}, + }, + }, + inputComponentStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedOutputStates: []runtime.ComponentComponentState{ + { + Component: component.Component{ID: "comp1"}, + State: runtime.ComponentState{State: client.UnitStateStopped}, + }, + }, + expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := &OTelManager{ + logger: newTestLogger(), + currentComponentStates: tt.currentComponentStates, + } + + result := mgr.processComponentStates(tt.inputComponentStates) + + assert.ElementsMatch(t, tt.expectedOutputStates, result) + assert.Equal(t, tt.expectedCurrentStatesAfter, mgr.currentComponentStates) + }) + } +} + +// TestOTelManagerEndToEnd tests the full lifecycle of the OTelManager +// including configuration updates, status updates, and error handling. +func TestOTelManagerEndToEnd(t *testing.T) { + // Setup test logger and dependencies + testLogger, _ := loggertest.New("test") + agentInfo := &info.AgentInfo{} + beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter + collectorStarted := make(chan struct{}) + + execution := &mockExecution{ + collectorStarted: collectorStarted, + } + + // Create manager with test dependencies + mgr := OTelManager{ + logger: testLogger, + baseLogger: testLogger, + errCh: make(chan error, 1), // holds at most one error + updateCh: make(chan configUpdate), + collectorStatusCh: make(chan *status.AggregateStatus, 1), + componentStateCh: make(chan []runtime.ComponentComponentState, 1), + doneChan: make(chan struct{}), + recoveryTimer: newRestarterNoop(), + execution: execution, + agentInfo: agentInfo, + beatMonitoringConfigGetter: beatMonitoringConfigGetter, + } + + // Start manager in a goroutine + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1) + defer cancel() + + go func() { + err := mgr.Run(ctx) + assert.ErrorIs(t, err, context.Canceled) + }() + + collectorCfg := confmap.NewFromStringMap(map[string]interface{}{ + "receivers": map[string]interface{}{ + "nop": map[string]interface{}{}, + }, + "exporters": map[string]interface{}{"nop": map[string]interface{}{}}, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "metrics": map[string]interface{}{ + "receivers": []string{"nop"}, + "exporters": []string{"nop"}, + }, + }, + }, + }) + + testComp := testComponent("test") + components := []component.Component{testComp} + + t.Run("collector config is passed down to the collector execution", func(t *testing.T) { + mgr.Update(collectorCfg, nil) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + assert.Equal(t, collectorCfg, execution.cfg) + + }) + + t.Run("collector status is passed up to the component manager", func(t *testing.T) { + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + require.NoError(t, err) + assert.Equal(t, otelStatus, collectorStatus) + }) + + t.Run("component config is passed down to the otel manager", func(t *testing.T) { + mgr.Update(collectorCfg, components) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + cfg := execution.cfg + require.NotNil(t, cfg) + receivers, err := cfg.Sub("receivers") + require.NoError(t, err) + require.NotNil(t, receivers) + assert.True(t, receivers.IsSet("nop")) + assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + assert.Nil(t, err) + assert.Nil(t, collectorStatus) + }) + + t.Run("empty collector config leaves the component config running", func(t *testing.T) { + mgr.Update(nil, components) + select { + case <-collectorStarted: + case <-ctx.Done(): + t.Fatal("timeout waiting for collector config update") + } + cfg := execution.cfg + require.NotNil(t, cfg) + receivers, err := cfg.Sub("receivers") + require.NoError(t, err) + require.NotNil(t, receivers) + assert.False(t, receivers.IsSet("nop")) + assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + assert.Nil(t, err) + assert.Nil(t, collectorStatus) + }) + + t.Run("collector status with components is passed up to the component manager", func(t *testing.T) { + otelStatus := &status.AggregateStatus{ + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + // This represents a pipeline for our component (with OtelNamePrefix) + "pipeline:logs/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + ComponentStatusMap: map[string]*status.AggregateStatus{ + "receiver:filebeatreceiver/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + "exporter:elasticsearch/_agent-component/test": { + Event: componentstatus.NewEvent(componentstatus.StatusOK), + }, + }, + }, + }, + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.statusCh <- otelStatus: + } + + collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, collectorStatus) + assert.Len(t, collectorStatus.ComponentStatusMap, 0) + + componentState, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) + require.NoError(t, err) + require.NotNil(t, componentState) + require.Len(t, componentState, 1) + assert.Equal(t, componentState[0].Component, testComp) + }) + + t.Run("collector error is passed up to the component manager", func(t *testing.T) { + collectorErr := errors.New("collector error") + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case execution.errCh <- collectorErr: + } + + // we should get a nil status and an error + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case s := <-mgr.WatchCollector(): + assert.Nil(t, s) + } + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for collector status update") + case err := <-mgr.Errors(): + assert.Equal(t, collectorErr, err) + } + }) +} + +func getFromChannelOrErrorWithContext[T any](t *testing.T, ctx context.Context, ch <-chan T, errCh <-chan error) (T, error) { + t.Helper() + var result T + var err error + for err == nil { + select { + case result = <-ch: + return result, nil + case err = <-errCh: + case <-ctx.Done(): + err = ctx.Err() + } + } + return result, err +} + +func assertOtelStatusesEqualIgnoringTimestamps(t require.TestingT, a, b *status.AggregateStatus) bool { + if a == nil || b == nil { + return assert.Equal(t, a, b) + } + + if !assert.Equal(t, a.Status(), b.Status()) { + return false + } + + if !assert.Equal(t, len(a.ComponentStatusMap), len(b.ComponentStatusMap)) { + return false + } + + for k, v := range a.ComponentStatusMap { + if !assertOtelStatusesEqualIgnoringTimestamps(t, v, b.ComponentStatusMap[k]) { + return false + } + } + + return true +} +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index d4402f7bb95..015a3cc5070 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -18,6 +18,7 @@ import ( "golang.org/x/exp/maps" elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver" @@ -36,7 +37,7 @@ const OtelNamePrefix = "_agent-component/" // BeatMonitoringConfigGetter is a function that returns the monitoring configuration for a beat receiver. type BeatMonitoringConfigGetter func(unitID, binary string) map[string]any -type exporterConfigTranslationFunc func(*config.C) (map[string]any, error) +type exporterConfigTranslationFunc func(*config.C, *logp.Logger) (map[string]any, error) var ( OtelSupportedOutputTypes = []string{"elasticsearch"} @@ -53,18 +54,31 @@ func GetOtelConfig( model *component.Model, info info.Agent, beatMonitoringConfigGetter BeatMonitoringConfigGetter, + logger *logp.Logger, ) (*confmap.Conf, error) { components := getSupportedComponents(model) if len(components) == 0 { return nil, nil } - otelConfig := confmap.New() // base config, nothing here for now + otelConfig := confmap.New() // base config, nothing here for now + extensionList := []interface{}{} // we have to maintain a list because otel does not merge lists, it overrides them. This is a known issue: see https://github.com/open-telemetry/opentelemetry-collector/issues/8754 for _, comp := range components { - componentConfig, compErr := getCollectorConfigForComponent(comp, info, beatMonitoringConfigGetter) + componentConfig, compErr := getCollectorConfigForComponent(comp, info, beatMonitoringConfigGetter, logger) if compErr != nil { return nil, compErr } + + // logic to merge extension list + if componentConfig.IsSet("service::extensions") { + extensionList = append(extensionList, componentConfig.Get("service::extensions").([]interface{})...) + extensions := confmap.NewFromStringMap(map[string]any{"service::extensions": extensionList}) + err := componentConfig.Merge(extensions) + if err != nil { + return nil, fmt.Errorf("error merging otel extensions for component %s: %w", comp.ID, err) + } + } + // the assumption here is that each component will define its own receivers, and the shared exporters // will be merged mergeErr := otelConfig.Merge(componentConfig) @@ -116,15 +130,24 @@ func getExporterID(exporterType otelcomponent.Type, outputName string) otelcompo return otelcomponent.NewIDWithName(exporterType, exporterName) } +// getBeatsAuthExtensionID returns the id for beatsauth extension +// outputName here is name of the output defined in elastic-agent.yml. For ex: default, monitoring +func getBeatsAuthExtensionID(outputName string) otelcomponent.ID { + extensionName := fmt.Sprintf("%s%s", OtelNamePrefix, outputName) + return otelcomponent.NewIDWithName(otelcomponent.MustNewType("beatsauth"), extensionName) +} + // getCollectorConfigForComponent returns the Otel collector config required to run the given component. // This function returns a full, valid configuration that can then be merged with configurations for other components. +// Note: Lists are not merged and should be handled by the caller of the method func getCollectorConfigForComponent( comp *component.Component, info info.Agent, beatMonitoringConfigGetter BeatMonitoringConfigGetter, + logger *logp.Logger, ) (*confmap.Conf, error) { - exportersConfig, outputQueueConfig, err := getExportersConfigForComponent(comp) + exportersConfig, outputQueueConfig, extensionConfig, err := getExportersConfigForComponent(comp, logger) if err != nil { return nil, err } @@ -144,13 +167,22 @@ func getCollectorConfigForComponent( }, } + // we need to convert []string to []interface for this to work + extensionKey := make([]interface{}, len(maps.Keys(extensionConfig))) + for i, v := range maps.Keys(extensionConfig) { + extensionKey[i] = v + } + fullConfig := map[string]any{ - "receivers": receiversConfig, - "exporters": exportersConfig, + "receivers": receiversConfig, + "exporters": exportersConfig, + "extensions": extensionConfig, "service": map[string]any{ - "pipelines": pipelinesConfig, + "extensions": extensionKey, + "pipelines": pipelinesConfig, }, } + return confmap.NewFromStringMap(fullConfig), nil } @@ -243,26 +275,31 @@ func getReceiversConfigForComponent( // getReceiversConfigForComponent returns the exporters configuration and queue settings for a component. Usually this will be a single // exporter, but in principle it could be more. -func getExportersConfigForComponent(comp *component.Component) (exporterCfg map[string]any, queueCfg map[string]any, err error) { +func getExportersConfigForComponent(comp *component.Component, logger *logp.Logger) (exporterCfg map[string]any, queueCfg map[string]any, extensionCfg map[string]any, err error) { exportersConfig := map[string]any{} + extensionConfig := map[string]any{} exporterType, err := getExporterTypeForComponent(comp) if err != nil { - return nil, nil, err + return nil, nil, nil, err } var queueSettings map[string]any for _, unit := range comp.Units { if unit.Type == client.UnitTypeOutput { var unitExportersConfig map[string]any - unitExportersConfig, queueSettings, err = unitToExporterConfig(unit, exporterType, comp.InputType) + var unitExtensionConfig map[string]any + unitExportersConfig, queueSettings, unitExtensionConfig, err = unitToExporterConfig(unit, exporterType, comp.InputType, logger) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for k, v := range unitExportersConfig { exportersConfig[k] = v } + for k, v := range unitExtensionConfig { + extensionConfig[k] = v + } } } - return exportersConfig, queueSettings, nil + return exportersConfig, queueSettings, extensionConfig, nil } // getBeatNameForComponent returns the beat binary name that would be used to run this component. @@ -310,13 +347,13 @@ func getExporterTypeForComponent(comp *component.Component) (otelcomponent.Type, } // unitToExporterConfig translates a component.Unit to return an otel exporter configuration and output queue settings -func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string) (exportersCfg map[string]any, queueSettings map[string]any, err error) { +func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string, logger *logp.Logger) (exportersCfg map[string]any, queueSettings map[string]any, extensionCfg map[string]any, err error) { if unit.Type == client.UnitTypeInput { - return nil, nil, fmt.Errorf("unit type is an input, expected output: %v", unit) + return nil, nil, nil, fmt.Errorf("unit type is an input, expected output: %v", unit) } configTranslationFunc, ok := configTranslationFuncForExporter[exporterType] if !ok { - return nil, nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) + return nil, nil, nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) } // we'd like to use the same exporter for all outputs with the same name, so we parse out the name for the unit id // these will be deduplicated by the configuration merging process at the end @@ -327,30 +364,51 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, unitConfigMap := unit.Config.GetSource().AsMap() // this is what beats do in libbeat/management/generate.go outputCfgC, err := config.NewConfigFrom(unitConfigMap) if err != nil { - return nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + return nil, nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) } + // Config translation function can mutate queue settings defined under output config - exporterConfig, err := configTranslationFunc(outputCfgC) + exporterConfig, err := configTranslationFunc(outputCfgC, logger) if err != nil { - return nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) - } - - exportersCfg = map[string]any{ - exporterId.String(): exporterConfig, + return nil, nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) } // If output config contains queue settings defined by user/preset field, it should be promoted to the receiver section if ok := outputCfgC.HasField("queue"); ok { err := outputCfgC.Unpack(&queueSettings) if err != nil { - return nil, nil, fmt.Errorf("error unpacking queue settings for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + return nil, nil, nil, fmt.Errorf("error unpacking queue settings for output: %s, unit: %s, error: %w", outputName, unit.ID, err) } if queue, ok := queueSettings["queue"].(map[string]any); ok { queueSettings = queue } } - return exportersCfg, queueSettings, nil + // beatsauth extension is not tested with output other than elasticsearch + if exporterType.String() == "elasticsearch" { + // get extension ID + extensionID := getBeatsAuthExtensionID(outputName) + extensionConfig, err := getBeatsAuthExtensionConfig(outputCfgC) + if err != nil { + return nil, nil, nil, fmt.Errorf("error supporting http parameters for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + } + + // sets extensionCfg + extensionCfg = map[string]any{ + extensionID.String(): extensionConfig, + } + // add authenticator to ES config + exporterConfig["auth"] = map[string]any{ + "authenticator": extensionID.String(), + } + + } + + exportersCfg = map[string]any{ + exporterId.String(): exporterConfig, + } + + return exportersCfg, queueSettings, extensionCfg, nil } // getInputsForUnit returns the beat inputs for a unit. These can directly be plugged into a beats receiver config. @@ -397,8 +455,13 @@ func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, er } // translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter configuration. +<<<<<<< HEAD func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg, logp.NewLogger("")) +======= +func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string]any, error) { + esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg, logger) +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) if err != nil { return nil, err } @@ -411,3 +474,33 @@ func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { esConfig["mapping"] = map[string]any{"mode": "bodymap"} return esConfig, nil } +<<<<<<< HEAD +======= + +func BeatDataPath(componentId string) string { + return filepath.Join(paths.Run(), componentId) +} + +// getBeatsAuthExtensionConfig sets http transport settings on beatsauth +// currently this is only supported for elasticsearch output +func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) { + defaultTransportSettings := elasticsearch.ESDefaultTransportSettings() + err := cfg.Unpack(&defaultTransportSettings) + if err != nil { + return nil, err + } + + newConfig, err := config.NewConfigFrom(defaultTransportSettings) + if err != nil { + return nil, err + } + + var newMap map[string]any + err = newConfig.Unpack(&newMap) + if err != nil { + return nil, err + } + + return newMap, nil +} +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go index bb145c8bb2f..c3888de43c0 100644 --- a/internal/pkg/otel/translate/otelconfig_test.go +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/confmap" "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" @@ -207,15 +208,30 @@ func TestGetOtelConfig(t *testing.T) { }, }, } - esOutputConfig := map[string]any{ - "type": "elasticsearch", - "hosts": []any{"localhost:9200"}, - "username": "elastic", - "password": "password", - "preset": "balanced", - "queue.mem.events": 3200, + + type extraParams struct { + key string + value any } + // pass ssl params as extra args to this method + esOutputConfig := func(extra ...extraParams) map[string]any { + finalOutput := map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "username": "elastic", + "password": "password", + "preset": "balanced", + "queue.mem.events": 3200, + "ssl.enabled": true, + } + for _, v := range extra { + finalOutput[v.key] = v.value + } + return finalOutput + } + +<<<<<<< HEAD expectedESConfig := map[string]any{ "elasticsearch/_agent-component/default": map[string]any{ "batcher": map[string]any{ @@ -223,6 +239,45 @@ func TestGetOtelConfig(t *testing.T) { "max_size": 1600, "min_size": 0, }, +======= + expectedExtensionConfig := func(extra ...extraParams) map[string]any { + finalOutput := map[string]any{ + "idle_connection_timeout": "3s", + "proxy_disable": false, + "ssl": map[string]interface{}{ + "ca_sha256": []interface{}{}, + "ca_trusted_fingerprint": "", + "certificate": "", + "certificate_authorities": []interface{}{}, + "cipher_suites": []interface{}{}, + "curve_types": []interface{}{}, + "enabled": true, + "key": "", + "key_passphrase": "", + "key_passphrase_path": "", + "renegotiation": int64(0), + "supported_protocols": []interface{}{}, + "verification_mode": uint64(0), + }, + "timeout": "1m30s", + } + for _, v := range extra { + // accepts one level deep parameters to replace + if _, ok := v.value.(map[string]any); ok { + for newkey, newvalue := range v.value.(map[string]any) { + // this is brittle - it is expected that developers will pass expected params correctly here + finalOutput[v.key].(map[string]any)[newkey] = newvalue + } + continue + } + finalOutput[v.key] = v.value + } + return finalOutput + } + + expectedESConfig := func(outputName string) map[string]any { + return map[string]any{ +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) "compression": "gzip", "compression_params": map[string]any{ "level": 1, @@ -244,7 +299,14 @@ func TestGetOtelConfig(t *testing.T) { }, "timeout": 90 * time.Second, "idle_conn_timeout": 3 * time.Second, - }, + "auth": map[string]any{ + "authenticator": "beatsauth/_agent-component/" + outputName, + }, + "tls": map[string]any{ + "min_version": "1.2", + "max_version": "1.3", + }, + } } defaultProcessors := func(streamId, dataset string, namespace string) []any { @@ -304,6 +366,73 @@ func TestGetOtelConfig(t *testing.T) { } } + // expects input id + expectedFilestreamConfig := func(id string) map[string]any { + return map[string]any{ + "filebeat": map[string]any{ + "inputs": []map[string]any{ + { + "id": "test-1", + "type": "filestream", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + "index": "logs-generic-1-default", + "processors": defaultProcessors("test-1", "generic-1", "logs"), + }, + { + "id": "test-2", + "type": "filestream", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + "/var/log/*.log", + }, + "index": "logs-generic-2-default", + "processors": defaultProcessors("test-2", "generic-2", "logs"), + }, + }, + }, + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + "path": map[string]any{ + "data": filepath.Join(paths.Run(), id), + }, + "queue": map[string]any{ + "mem": map[string]any{ + "events": uint64(3200), + "flush": map[string]any{ + "min_events": uint64(1600), + "timeout": "10s", + }, + }, + }, + "logging": map[string]any{ + "with_fields": map[string]any{ + "component": map[string]any{ + "binary": "filebeat", + "dataset": "elastic_agent.filebeat", + "type": "filestream", + "id": id, + }, + "log": map[string]any{ + "source": id, + }, + }, + }, + "http": map[string]any{ + "enabled": true, + "host": "localhost", + }, + } + + } + getBeatMonitoringConfig := func(_, _ string) map[string]any { return map[string]any{ "http": map[string]any{ @@ -357,83 +486,112 @@ func TestGetOtelConfig(t *testing.T) { { ID: "filestream-default", Type: client.UnitTypeOutput, - Config: component.MustExpectedConfig(esOutputConfig), + Config: component.MustExpectedConfig(esOutputConfig()), }, }, }, }, }, expectedConfig: confmap.NewFromStringMap(map[string]any{ - "exporters": expectedESConfig, + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, "receivers": map[string]any{ - "filebeatreceiver/_agent-component/filestream-default": map[string]any{ - "filebeat": map[string]any{ - "inputs": []map[string]any{ - { - "id": "test-1", - "type": "filestream", - "data_stream": map[string]any{ - "dataset": "generic-1", - }, - "paths": []any{ - "/var/log/*.log", - }, - "index": "logs-generic-1-default", - "processors": defaultProcessors("test-1", "generic-1", "logs"), - }, - { - "id": "test-2", - "type": "filestream", - "data_stream": map[string]any{ - "dataset": "generic-2", - }, - "paths": []any{ - "/var/log/*.log", - }, - "index": "logs-generic-2-default", - "processors": defaultProcessors("test-2", "generic-2", "logs"), + "filebeatreceiver/_agent-component/filestream-default": expectedFilestreamConfig("filestream-default"), + }, + "service": map[string]any{ + "extensions": []interface{}{"beatsauth/_agent-component/default"}, + "pipelines": map[string]any{ + "logs/_agent-component/filestream-default": map[string][]string{ + "exporters": []string{"elasticsearch/_agent-component/default"}, + "receivers": []string{"filebeatreceiver/_agent-component/filestream-default"}, + }, + }, + }, + }), + }, + { + name: "multiple filestream inputs and output types", + model: &component.Model{ + Components: []component.Component{ + { + ID: "filestream-primaryOutput", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, }, }, }, - "output": map[string]any{ - "otelconsumer": map[string]any{}, - }, - "path": map[string]any{ - "data": filepath.Join(paths.Run(), "filestream-default"), - }, - "queue": map[string]any{ - "mem": map[string]any{ - "events": uint64(3200), - "flush": map[string]any{ - "min_events": uint64(1600), - "timeout": "10s", - }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-primaryOutput", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig(extraParams{"ssl.verification_mode", "certificate"})), }, }, - "logging": map[string]any{ - "with_fields": map[string]any{ - "component": map[string]any{ - "binary": "filebeat", - "dataset": "elastic_agent.filebeat", - "type": "filestream", - "id": "filestream-default", - }, - "log": map[string]any{ - "source": "filestream-default", + }, + { + ID: "filestream-secondaryOutput", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, }, }, }, - "http": map[string]any{ - "enabled": true, - "host": "localhost", + Units: []component.Unit{ + { + ID: "filestream-unit-2", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-secondaryOutput", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig(extraParams{"ssl.ca_trusted_fingerprint", "b9a10bbe64ee9826abeda6546fc988c8bf798b41957c33d05db736716513dc9c"})), + }, }, }, }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/primaryOutput": expectedESConfig("primaryOutput"), + "elasticsearch/_agent-component/secondaryOutput": expectedESConfig("secondaryOutput"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/primaryOutput": expectedExtensionConfig(extraParams{"ssl", map[string]any{"verification_mode": uint64(2)}}), + "beatsauth/_agent-component/secondaryOutput": expectedExtensionConfig(extraParams{"ssl", map[string]any{"ca_trusted_fingerprint": "b9a10bbe64ee9826abeda6546fc988c8bf798b41957c33d05db736716513dc9c"}}), + }, + "receivers": map[string]any{ + "filebeatreceiver/_agent-component/filestream-primaryOutput": expectedFilestreamConfig("filestream-primaryOutput"), + "filebeatreceiver/_agent-component/filestream-secondaryOutput": expectedFilestreamConfig("filestream-secondaryOutput"), + }, "service": map[string]any{ + "extensions": []interface{}{"beatsauth/_agent-component/primaryOutput", "beatsauth/_agent-component/secondaryOutput"}, "pipelines": map[string]any{ - "logs/_agent-component/filestream-default": map[string][]string{ - "exporters": []string{"elasticsearch/_agent-component/default"}, - "receivers": []string{"filebeatreceiver/_agent-component/filestream-default"}, + "logs/_agent-component/filestream-primaryOutput": map[string][]string{ + "exporters": []string{"elasticsearch/_agent-component/primaryOutput"}, + "receivers": []string{"filebeatreceiver/_agent-component/filestream-primaryOutput"}, + }, + "logs/_agent-component/filestream-secondaryOutput": map[string][]string{ + "exporters": []string{"elasticsearch/_agent-component/secondaryOutput"}, + "receivers": []string{"filebeatreceiver/_agent-component/filestream-secondaryOutput"}, }, }, }, @@ -464,14 +622,19 @@ func TestGetOtelConfig(t *testing.T) { { ID: "beat/metrics-default", Type: client.UnitTypeOutput, - Config: component.MustExpectedConfig(esOutputConfig), + Config: component.MustExpectedConfig(esOutputConfig()), }, }, }, }, }, expectedConfig: confmap.NewFromStringMap(map[string]any{ - "exporters": expectedESConfig, + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, "receivers": map[string]any{ "metricbeatreceiver/_agent-component/beat-metrics-monitoring": map[string]any{ "metricbeat": map[string]any{ @@ -523,6 +686,7 @@ func TestGetOtelConfig(t *testing.T) { }, }, "service": map[string]any{ + "extensions": []interface{}{"beatsauth/_agent-component/default"}, "pipelines": map[string]any{ "logs/_agent-component/beat-metrics-monitoring": map[string][]string{ "exporters": []string{"elasticsearch/_agent-component/default"}, @@ -557,14 +721,19 @@ func TestGetOtelConfig(t *testing.T) { { ID: "system/metrics-default", Type: client.UnitTypeOutput, - Config: component.MustExpectedConfig(esOutputConfig), + Config: component.MustExpectedConfig(esOutputConfig()), }, }, }, }, }, expectedConfig: confmap.NewFromStringMap(map[string]any{ - "exporters": expectedESConfig, + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": expectedESConfig("default"), + }, + "extensions": map[string]any{ + "beatsauth/_agent-component/default": expectedExtensionConfig(), + }, "receivers": map[string]any{ "metricbeatreceiver/_agent-component/system-metrics": map[string]any{ "metricbeat": map[string]any{ @@ -627,6 +796,7 @@ func TestGetOtelConfig(t *testing.T) { }, }, "service": map[string]any{ + "extensions": []interface{}{"beatsauth/_agent-component/default"}, "pipelines": map[string]any{ "logs/_agent-component/system-metrics": map[string][]string{ "exporters": []string{"elasticsearch/_agent-component/default"}, @@ -639,7 +809,7 @@ func TestGetOtelConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualConf, actualError := GetOtelConfig(tt.model, agentInfo, getBeatMonitoringConfig) + actualConf, actualError := GetOtelConfig(tt.model, agentInfo, getBeatMonitoringConfig, logp.NewNopLogger()) if actualConf == nil || tt.expectedConfig == nil { assert.Equal(t, tt.expectedConfig, actualConf) } else { // this gives a nicer diff @@ -656,4 +826,215 @@ func TestGetOtelConfig(t *testing.T) { } } +<<<<<<< HEAD // TODO: Add unit tests for other config generation functions +======= +func TestGetReceiversConfigForComponent(t *testing.T) { + testAgentInfo := &info.AgentInfo{} + mockBeatMonitoringConfigGetter := func(componentID, beatName string) map[string]any { + return nil // Behavior when self-monitoring is disabled + } + + customBeatMonitoringConfigGetter := func(componentID, beatName string) map[string]any { + return map[string]any{ + "http": map[string]any{ + "enabled": true, + "host": "custom-host:5067", + "port": 5067, + }, + } + } + + // Create proper component configurations that match existing test patterns + filebeatComponent := &component.Component{ + ID: "filebeat-test-id", + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filebeat-test-id-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(map[string]any{ + "id": "test", + "use_output": "default", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + }, + }, + }), + }, + }, + } + + metricbeatComponent := &component.Component{ + ID: "metricbeat-test-id", + InputType: "system/metrics", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "system/metrics", + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "metricbeat-test-id-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(map[string]any{ + "id": "test", + "use_output": "default", + "type": "system/metrics", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "metricsets": map[string]any{ + "cpu": map[string]any{ + "data_stream.dataset": "system.cpu", + }, + }, + }, + }, + }), + }, + }, + } + + tests := []struct { + name string + component *component.Component + outputQueueConfig map[string]any + beatMonitoringConfigGetter BeatMonitoringConfigGetter + expectedError string + expectedReceiverType string + expectedBeatName string + }{ + { + name: "filebeat component with default monitoring", + component: filebeatComponent, + outputQueueConfig: nil, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + expectedReceiverType: "filebeatreceiver", + expectedBeatName: "filebeat", + }, + { + name: "metricbeat component with custom monitoring and queue config", + component: metricbeatComponent, + outputQueueConfig: map[string]any{ + "type": "memory", + "size": 1000, + }, + beatMonitoringConfigGetter: customBeatMonitoringConfigGetter, + expectedReceiverType: "metricbeatreceiver", + expectedBeatName: "metricbeat", + }, + { + name: "component with no input units", + component: &component.Component{ + ID: "no-inputs-test-id", + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "output-unit", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(map[string]any{ + "type": "elasticsearch", + }), + }, + }, + }, + outputQueueConfig: nil, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + expectedReceiverType: "filebeatreceiver", + expectedBeatName: "filebeat", + }, + { + name: "unsupported component type", + component: &component.Component{ + ID: "unsupported-test-id", + InputType: "unsupported", + }, + outputQueueConfig: nil, + beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, + expectedError: "unknown otel receiver type for input type: unsupported", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := getReceiversConfigForComponent( + tt.component, + testAgentInfo, + tt.outputQueueConfig, + tt.beatMonitoringConfigGetter, + ) + + if tt.expectedError != "" { + assert.Error(t, err) + assert.ErrorContains(t, err, tt.expectedError) + assert.Nil(t, result) + return + } + + require.NoError(t, err) + assert.NotNil(t, result) + + // Verify the receiver ID is present + receiverID := fmt.Sprintf("%s/_agent-component/%s", tt.expectedReceiverType, tt.component.ID) + assert.Contains(t, result, receiverID) + + receiverConfig, ok := result[receiverID].(map[string]any) + assert.True(t, ok, "receiver config should be a map") + + // Verify configuration section presence + assert.Contains(t, receiverConfig, "output", "output config should be present") + assert.Contains(t, receiverConfig, "path", "path config should be present") + assert.Contains(t, receiverConfig, "logging", "logging config should be present") + assert.Contains(t, receiverConfig, tt.expectedBeatName, fmt.Sprintf("%s config should be present", tt.expectedBeatName)) + + // Verify queue configuration presence + if tt.outputQueueConfig != nil { + assert.Contains(t, receiverConfig, "queue", "queue config should be present") + } else { + assert.NotContains(t, receiverConfig, "queue", "queue config should not be present") + } + + // Verify monitoring configuration is present (http section should exist) + assert.Contains(t, receiverConfig, "http", "http monitoring config should be present") + expectedMonitoringConfig := tt.beatMonitoringConfigGetter(tt.component.ID, tt.component.InputSpec.BinaryName) + // If the monitoring getter is not nil, verify the http section is the same + if expectedMonitoringConfig != nil { + assert.Equal(t, expectedMonitoringConfig["http"], receiverConfig["http"]) + } + }) + } +} +>>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 76f1b3d6c57..3966047f925 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -10,6 +10,7 @@ import ( "bytes" "context" "encoding/base64" + "encoding/pem" "errors" "fmt" "os" @@ -29,6 +30,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/elastic-agent-libs/transport/tlscommontest" "github.com/elastic/elastic-agent/pkg/control/v2/client" aTesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" @@ -1029,7 +1031,8 @@ func TestOtelFilestreamInput(t *testing.T) { require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) decodedApiKey, err := getDecodedApiKey(esApiKey) require.NoError(t, err) - configTemplate := `inputs: + configTemplate := ` +inputs: - type: filestream id: filestream-e2e use_output: default @@ -1048,6 +1051,8 @@ outputs: hosts: [{{.ESEndpoint}}] api_key: "{{.ESApiKey}}" preset: "balanced" + ssl.enabled: true + ssl.verification_mode: full monitoring: type: elasticsearch hosts: [{{.ESEndpoint}}] @@ -1904,3 +1909,160 @@ service: fixtureWg.Wait() require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) } + +func TestOtelBeatsAuthExtension(t *testing.T) { + info := define.Require(t, define.Requirements{ + Group: integration.Default, + Local: true, + OS: []define.OS{ + // {Type: define.Windows}, we don't support otel on Windows yet + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + + // Create the otel configuration file + type otelConfigOptions struct { + ESEndpoint string + ESApiKey string + Index string + CAFile string + } + esEndpoint, err := integration.GetESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + index := "logs-integration-" + info.Namespace + + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, t.Context(), time.Now().Add(5*time.Minute)) + defer cancel() + err = fixture.Prepare(ctx) + require.NoError(t, err) + + // create ca-cert + caCert, err := tlscommontest.GenCA() + if err != nil { + t.Fatalf("could not generate root CA certificate: %s", err) + } + + caFilePath := filepath.Join(t.TempDir(), "ca.pem") + os.WriteFile(caFilePath, pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: caCert.Leaf.Raw}), 0o777) + + // we pass an incorrect CA to es-exporter + // but we expect beatsauthextension to replace the exporter's + // roundtripper with how beats implements it (with given http configuration block) + // hence we expect events to be indexed to elasticsearch + // if authextension is not used - this test fails + otelConfigTemplate := ` +extensions: + beatsauth: + ssl: + enabled: true + verification_mode: none +receivers: + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + output: + otelconsumer: + queue.mem.flush.timeout: 0s +exporters: + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + api_key: {{.ESApiKey}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + min_size: 1 + tls: + ca_file: {{ .CAFile }} + auth: + authenticator: beatsauth + mapping: + mode: bodymap +service: + extensions: [beatsauth] + pipelines: + logs: + receivers: + - metricbeatreceiver + exporters: + - elasticsearch/log +` + var otelConfigBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("otelConfig").Parse(otelConfigTemplate)).Execute(&otelConfigBuffer, + otelConfigOptions{ + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + Index: index, + CAFile: caFilePath, + })) + + // configure elastic-agent.yml + err = fixture.Configure(ctx, otelConfigBuffer.Bytes()) + + // prepare agent command + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err, "cannot prepare Elastic-Agent command: %w", err) + + output := strings.Builder{} + cmd.Stderr = &output + cmd.Stdout = &output + + // start elastic-agent + err = cmd.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 30*time.Second, 1*time.Second) + + // Make sure find the logs + actualHits := &struct{ Hits int }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + docs, err := estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+index+"*", map[string]interface{}{ + "metricset.name": "cpu", + }) + require.NoError(t, err) + + actualHits.Hits = docs.Hits.Total.Value + return actualHits.Hits >= 1 + }, + 2*time.Minute, 1*time.Second, + "Expected at least %d logs, got %v", 1, actualHits) + + cancel() +} From 8df6e1a960465b7e7bd0b1d923f5f8436ab0c9ef Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 18 Sep 2025 09:46:35 +0530 Subject: [PATCH 2/3] fix conflicts --- internal/pkg/otel/README.md | 13 +- internal/pkg/otel/manager/manager.go | 136 ----- internal/pkg/otel/manager/manager_test.go | 559 ------------------ internal/pkg/otel/translate/otelconfig.go | 12 - .../pkg/otel/translate/otelconfig_test.go | 221 ------- 5 files changed, 1 insertion(+), 940 deletions(-) diff --git a/internal/pkg/otel/README.md b/internal/pkg/otel/README.md index 7feaebad1df..74668a05e26 100644 --- a/internal/pkg/otel/README.md +++ b/internal/pkg/otel/README.md @@ -89,25 +89,14 @@ This section provides a summary of components included in the Elastic Distributi |---|---| | [apikeyauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/apikeyauthextension/v0.4.1/extension/apikeyauthextension/README.md) | v0.4.1 | | [apmconfigextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/apmconfigextension/v0.6.0/extension/apmconfigextension/README.md) | v0.6.0 | -<<<<<<< HEAD | [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.130.0/extension/bearertokenauthextension/README.md) | v0.130.0 | +| [beatsauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/beatsauthextension/v0.2.0/extension/beatsauthextension/README.md) | v0.2.0 | | [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.130.0/extension/storage/filestorage/README.md) | v0.130.0 | | [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.130.0/extension/healthcheckextension/README.md) | v0.130.0 | | [k8sleaderelector](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/k8sleaderelector/v0.130.0/extension/k8sleaderelector/README.md) | v0.130.0 | | [k8sobserver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/observer/k8sobserver/v0.130.0/extension/observer/k8sobserver/README.md) | v0.130.0 | | [memorylimiterextension](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/memorylimiterextension/v0.130.0/extension/memorylimiterextension/README.md) | v0.130.0 | | [pprofextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/pprofextension/v0.130.0/extension/pprofextension/README.md) | v0.130.0 | -======= -| [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.132.0/extension/bearertokenauthextension/README.md) | v0.132.0 | -| [beatsauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/beatsauthextension/v0.2.0/extension/beatsauthextension/README.md) | v0.2.0 | -| [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.132.0/extension/storage/filestorage/README.md) | v0.132.0 | -| [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.132.0/extension/healthcheckextension/README.md) | v0.132.0 | -| [healthcheckv2extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckv2extension/v0.132.0/extension/healthcheckv2extension/README.md) | v0.132.0 | -| [k8sleaderelector](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/k8sleaderelector/v0.132.0/extension/k8sleaderelector/README.md) | v0.132.0 | -| [k8sobserver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/observer/k8sobserver/v0.132.0/extension/observer/k8sobserver/README.md) | v0.132.0 | -| [memorylimiterextension](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/memorylimiterextension/v0.132.0/extension/memorylimiterextension/README.md) | v0.132.0 | -| [pprofextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/pprofextension/v0.132.0/extension/pprofextension/README.md) | v0.132.0 | ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) ### Connectors diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index b8543d2c722..7175cf6b036 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -112,7 +112,6 @@ func (m *OTelManager) Run(ctx context.Context) error { m.reportErr(ctx, err) } } -<<<<<<< HEAD case cfg := <-m.cfgCh: m.cfg = cfg if cfg == nil { @@ -157,35 +156,6 @@ func (m *OTelManager) Run(ctx context.Context) error { // needs to be updated in the collector provider.Update(m.cfg) } -======= - - case cfgUpdate := <-m.updateCh: - // we received a new configuration, thus stop the recovery timer - // and reset the retry count - m.recoveryTimer.Stop() - m.recoveryRetries.Store(0) - mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.baseLogger) - if err != nil { - reportErr(ctx, m.errCh, err) - continue - } - - // this is the only place where we mutate the internal config attributes, take a write lock for the duration - m.mx.Lock() - m.mergedCollectorCfg = mergedCfg - m.collectorCfg = cfgUpdate.collectorCfg - m.components = cfgUpdate.components - m.mx.Unlock() - - err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr) - // report the error unconditionally to indicate that the config was applied - reportErr(ctx, m.errCh, err) - - case otelStatus := <-collectorStatusCh: - err = m.reportOtelStatusUpdate(ctx, otelStatus) - if err != nil { - reportErr(ctx, m.errCh, err) ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) } } } @@ -196,116 +166,10 @@ func (m *OTelManager) Errors() <-chan error { return m.errCh } -<<<<<<< HEAD // Update updates the configuration. // // When nil is passed for the cfg, then the collector is stopped. func (m *OTelManager) Update(cfg *confmap.Conf) { -======= -// buildMergedConfig combines collector configuration with component-derived configuration. -func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringConfigGetter translate.BeatMonitoringConfigGetter, logger *logp.Logger) (*confmap.Conf, error) { - mergedOtelCfg := confmap.New() - - // Generate component otel config if there are components - var componentOtelCfg *confmap.Conf - if len(cfgUpdate.components) > 0 { - model := &component.Model{Components: cfgUpdate.components} - var err error - componentOtelCfg, err = translate.GetOtelConfig(model, agentInfo, monitoringConfigGetter, logger) - if err != nil { - return nil, fmt.Errorf("failed to generate otel config: %w", err) - } - } - - // If both configs are nil, return nil so the manager knows to stop the collector - if componentOtelCfg == nil && cfgUpdate.collectorCfg == nil { - return nil, nil - } - - // Merge component config if it exists - if componentOtelCfg != nil { - err := mergedOtelCfg.Merge(componentOtelCfg) - if err != nil { - return nil, fmt.Errorf("failed to merge component otel config: %w", err) - } - } - - // Merge with base collector config if it exists - if cfgUpdate.collectorCfg != nil { - err := mergedOtelCfg.Merge(cfgUpdate.collectorCfg) - if err != nil { - return nil, fmt.Errorf("failed to merge collector otel config: %w", err) - } - } - - return mergedOtelCfg, nil -} - -func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error { - if m.proc != nil { - m.proc.Stop(ctx) - m.proc = nil - select { - case <-collectorRunErr: - case <-ctx.Done(): - // our caller ctx is Done - return ctx.Err() - } - // drain the internal status update channel - // this status handling is normally done in the main loop, but in this case we want to ensure that we emit a - // nil status after the collector has stopped - select { - case statusCh := <-collectorStatusCh: - updateErr := m.reportOtelStatusUpdate(ctx, statusCh) - if updateErr != nil { - m.logger.Error("failed to update otel status", zap.Error(updateErr)) - } - case <-ctx.Done(): - // our caller ctx is Done - return ctx.Err() - default: - } - err := m.reportOtelStatusUpdate(ctx, nil) - if err != nil { - return err - } - } - - if m.mergedCollectorCfg == nil { - // no configuration then the collector should not be - // running. - // ensure that the coordinator knows that there is no error - // as the collector is not running anymore - return nil - } else { - // either a new configuration or the first configuration - // that results in the collector being started - proc, err := m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh) - if err != nil { - // failed to create the collector (this is different then - // it's failing to run). we do not retry creation on failure - // as it will always fail. A new configuration is required for - // it not to fail (a new configuration will result in the retry) - // since this is a new configuration we want to start the timer - // from the initial delay - recoveryDelay := m.recoveryTimer.ResetInitial() - m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) - return err - } else { - // all good at the moment (possible that it will fail) - m.proc = proc - } - } - return nil -} - -// Update sends collector configuration and component updates to the manager's run loop. -func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component) { - cfgUpdate := configUpdate{ - collectorCfg: cfg, - components: components, - } ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) select { case m.cfgCh <- cfg: case <-m.doneChan: diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 814e62e5ddc..72c00d32645 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -15,17 +15,12 @@ import ( "gopkg.in/yaml.v2" -<<<<<<< HEAD "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" -======= - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/logp/logptest" ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" ) @@ -277,557 +272,3 @@ func toSerializableStatus(s *status.AggregateStatus) *serializableStatus { } return outputStruct } -<<<<<<< HEAD -======= - -// Mock function for BeatMonitoringConfigGetter -func mockBeatMonitoringConfigGetter(unitID, binary string) map[string]any { - return map[string]any{"test": "config"} -} - -// Helper function to create test logger -func newTestLogger() *logger.Logger { - l, _ := loggertest.New("test") - return l -} - -func TestOTelManager_buildMergedConfig(t *testing.T) { - // Common parameters used across all test cases - commonAgentInfo := &info.AgentInfo{} - commonBeatMonitoringConfigGetter := mockBeatMonitoringConfigGetter - testComp := testComponent("test-component") - - tests := []struct { - name string - collectorCfg *confmap.Conf - components []component.Component - expectedKeys []string - expectedErrorString string - }{ - { - name: "nil config returns nil", - collectorCfg: nil, - components: nil, - }, - { - name: "empty config returns empty config", - collectorCfg: nil, - components: nil, - expectedKeys: []string{}, - }, - { - name: "collector config only", - collectorCfg: confmap.NewFromStringMap(map[string]any{"receivers": map[string]any{"nop": map[string]any{}}}), - components: nil, - expectedKeys: []string{"receivers"}, - }, - { - name: "components only", - collectorCfg: nil, - components: []component.Component{testComp}, - expectedKeys: []string{"receivers", "exporters", "service"}, - }, - { - name: "both collector config and components", - collectorCfg: confmap.NewFromStringMap(map[string]any{"processors": map[string]any{"batch": map[string]any{}}}), - components: []component.Component{testComp}, - expectedKeys: []string{"receivers", "exporters", "service", "processors"}, - }, - { - name: "component config generation error", - collectorCfg: nil, - components: []component.Component{{ - ID: "test-component", - InputType: "filestream", // Supported input type - OutputType: "elasticsearch", // Supported output type - // Missing InputSpec which should cause an error during config generation - }}, - expectedErrorString: "failed to generate otel config: unknown otel receiver type for input type: filestream", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfgUpdate := configUpdate{ - collectorCfg: tt.collectorCfg, - components: tt.components, - } - result, err := buildMergedConfig(cfgUpdate, commonAgentInfo, commonBeatMonitoringConfigGetter, logptest.NewTestingLogger(t, "")) - - if tt.expectedErrorString != "" { - assert.Error(t, err) - assert.Equal(t, tt.expectedErrorString, err.Error()) - assert.Nil(t, result) - return - } - - assert.NoError(t, err) - - if len(tt.expectedKeys) == 0 { - assert.Nil(t, result) - return - } - - require.NotNil(t, result) - for _, key := range tt.expectedKeys { - assert.True(t, result.IsSet(key), "Expected key %s to be set", key) - } - }) - } -} - -func TestOTelManager_handleOtelStatusUpdate(t *testing.T) { - // Common test component used across test cases - testComp := testComponent("test-component") - - tests := []struct { - name string - components []component.Component - inputStatus *status.AggregateStatus - expectedErrorString string - expectedCollectorStatus *status.AggregateStatus - expectedComponentStates []runtime.ComponentComponentState - }{ - { - name: "successful status update with component states", - components: []component.Component{testComp}, - inputStatus: &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - // This represents a pipeline for our component (with OtelNamePrefix) - "pipeline:logs/_agent-component/test-component": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - "receiver:filebeat/_agent-component/test-component": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - "exporter:elasticsearch/_agent-component/test-component": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - }, - }, - // This represents a regular collector pipeline (should remain after cleaning) - "pipeline:logs": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - }, - }, - expectedCollectorStatus: &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - // This represents a regular collector pipeline (should remain after cleaning) - "pipeline:logs": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - }, - }, - expectedComponentStates: []runtime.ComponentComponentState{ - { - Component: testComp, - State: runtime.ComponentState{ - State: client.UnitStateHealthy, - Message: "HEALTHY", - Units: map[runtime.ComponentUnitKey]runtime.ComponentUnitState{ - runtime.ComponentUnitKey{ - UnitID: "filestream-unit", - UnitType: client.UnitTypeInput, - }: { - State: client.UnitStateHealthy, - Message: "Healthy", - Payload: map[string]any{ - "streams": map[string]map[string]string{ - "test-1": { - "error": "", - "status": client.UnitStateHealthy.String(), - }, - "test-2": { - "error": "", - "status": client.UnitStateHealthy.String(), - }, - }, - }, - }, - runtime.ComponentUnitKey{ - UnitID: "filestream-default", - UnitType: client.UnitTypeOutput, - }: { - State: client.UnitStateHealthy, - Message: "Healthy", - }, - }, - VersionInfo: runtime.ComponentVersionInfo{ - Name: translate.OtelComponentName, - Meta: map[string]string{ - "build_time": version.BuildTime().String(), - "commit": version.Commit(), - }, - BuildHash: version.Commit(), - }, - }, - }, - }, - }, - { - name: "handles nil otel status", - components: []component.Component{}, - inputStatus: nil, - expectedCollectorStatus: nil, - expectedComponentStates: nil, - }, - { - name: "handles empty components list", - components: []component.Component{}, - inputStatus: &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - expectedErrorString: "", - expectedCollectorStatus: &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - expectedComponentStates: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - components: tt.components, - currentComponentStates: make(map[string]runtime.ComponentComponentState), - } - - componentStates, err := mgr.handleOtelStatusUpdate(tt.inputStatus) - - // Verify error expectation - if tt.expectedErrorString != "" { - require.Error(t, err) - require.Contains(t, err.Error(), tt.expectedErrorString) - return - } - - require.NoError(t, err) - - // Compare component states - assert.Equal(t, tt.expectedComponentStates, componentStates) - - // Compare collector status - assertOtelStatusesEqualIgnoringTimestamps(t, tt.expectedCollectorStatus, mgr.currentCollectorStatus) - }) - } -} - -func TestOTelManager_processComponentStates(t *testing.T) { - tests := []struct { - name string - currentComponentStates map[string]runtime.ComponentComponentState - inputComponentStates []runtime.ComponentComponentState - expectedOutputStates []runtime.ComponentComponentState - expectedCurrentStatesAfter map[string]runtime.ComponentComponentState - }{ - { - name: "empty input and current states", - currentComponentStates: map[string]runtime.ComponentComponentState{}, - inputComponentStates: []runtime.ComponentComponentState{}, - expectedOutputStates: []runtime.ComponentComponentState{}, - expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, - }, - { - name: "new component state added", - currentComponentStates: map[string]runtime.ComponentComponentState{}, - inputComponentStates: []runtime.ComponentComponentState{ - { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateHealthy}, - }, - }, - expectedOutputStates: []runtime.ComponentComponentState{ - { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateHealthy}, - }, - }, - expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{ - "comp1": { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateHealthy}, - }, - }, - }, - { - name: "component removed from config generates STOPPED state", - currentComponentStates: map[string]runtime.ComponentComponentState{ - "comp1": { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateHealthy}, - }, - }, - inputComponentStates: []runtime.ComponentComponentState{}, - expectedOutputStates: []runtime.ComponentComponentState{ - { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateStopped}, - }, - }, - expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, - }, - { - name: "component stopped removes from current states", - currentComponentStates: map[string]runtime.ComponentComponentState{ - "comp1": { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateHealthy}, - }, - }, - inputComponentStates: []runtime.ComponentComponentState{ - { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateStopped}, - }, - }, - expectedOutputStates: []runtime.ComponentComponentState{ - { - Component: component.Component{ID: "comp1"}, - State: runtime.ComponentState{State: client.UnitStateStopped}, - }, - }, - expectedCurrentStatesAfter: map[string]runtime.ComponentComponentState{}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - currentComponentStates: tt.currentComponentStates, - } - - result := mgr.processComponentStates(tt.inputComponentStates) - - assert.ElementsMatch(t, tt.expectedOutputStates, result) - assert.Equal(t, tt.expectedCurrentStatesAfter, mgr.currentComponentStates) - }) - } -} - -// TestOTelManagerEndToEnd tests the full lifecycle of the OTelManager -// including configuration updates, status updates, and error handling. -func TestOTelManagerEndToEnd(t *testing.T) { - // Setup test logger and dependencies - testLogger, _ := loggertest.New("test") - agentInfo := &info.AgentInfo{} - beatMonitoringConfigGetter := mockBeatMonitoringConfigGetter - collectorStarted := make(chan struct{}) - - execution := &mockExecution{ - collectorStarted: collectorStarted, - } - - // Create manager with test dependencies - mgr := OTelManager{ - logger: testLogger, - baseLogger: testLogger, - errCh: make(chan error, 1), // holds at most one error - updateCh: make(chan configUpdate), - collectorStatusCh: make(chan *status.AggregateStatus, 1), - componentStateCh: make(chan []runtime.ComponentComponentState, 1), - doneChan: make(chan struct{}), - recoveryTimer: newRestarterNoop(), - execution: execution, - agentInfo: agentInfo, - beatMonitoringConfigGetter: beatMonitoringConfigGetter, - } - - // Start manager in a goroutine - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1) - defer cancel() - - go func() { - err := mgr.Run(ctx) - assert.ErrorIs(t, err, context.Canceled) - }() - - collectorCfg := confmap.NewFromStringMap(map[string]interface{}{ - "receivers": map[string]interface{}{ - "nop": map[string]interface{}{}, - }, - "exporters": map[string]interface{}{"nop": map[string]interface{}{}}, - "service": map[string]interface{}{ - "pipelines": map[string]interface{}{ - "metrics": map[string]interface{}{ - "receivers": []string{"nop"}, - "exporters": []string{"nop"}, - }, - }, - }, - }) - - testComp := testComponent("test") - components := []component.Component{testComp} - - t.Run("collector config is passed down to the collector execution", func(t *testing.T) { - mgr.Update(collectorCfg, nil) - select { - case <-collectorStarted: - case <-ctx.Done(): - t.Fatal("timeout waiting for collector config update") - } - assert.Equal(t, collectorCfg, execution.cfg) - - }) - - t.Run("collector status is passed up to the component manager", func(t *testing.T) { - otelStatus := &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - } - - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for collector status update") - case execution.statusCh <- otelStatus: - } - - collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) - require.NoError(t, err) - assert.Equal(t, otelStatus, collectorStatus) - }) - - t.Run("component config is passed down to the otel manager", func(t *testing.T) { - mgr.Update(collectorCfg, components) - select { - case <-collectorStarted: - case <-ctx.Done(): - t.Fatal("timeout waiting for collector config update") - } - cfg := execution.cfg - require.NotNil(t, cfg) - receivers, err := cfg.Sub("receivers") - require.NoError(t, err) - require.NotNil(t, receivers) - assert.True(t, receivers.IsSet("nop")) - assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) - - collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) - assert.Nil(t, err) - assert.Nil(t, collectorStatus) - }) - - t.Run("empty collector config leaves the component config running", func(t *testing.T) { - mgr.Update(nil, components) - select { - case <-collectorStarted: - case <-ctx.Done(): - t.Fatal("timeout waiting for collector config update") - } - cfg := execution.cfg - require.NotNil(t, cfg) - receivers, err := cfg.Sub("receivers") - require.NoError(t, err) - require.NotNil(t, receivers) - assert.False(t, receivers.IsSet("nop")) - assert.True(t, receivers.IsSet("filebeatreceiver/_agent-component/test")) - - collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) - assert.Nil(t, err) - assert.Nil(t, collectorStatus) - }) - - t.Run("collector status with components is passed up to the component manager", func(t *testing.T) { - otelStatus := &status.AggregateStatus{ - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - // This represents a pipeline for our component (with OtelNamePrefix) - "pipeline:logs/_agent-component/test": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - ComponentStatusMap: map[string]*status.AggregateStatus{ - "receiver:filebeatreceiver/_agent-component/test": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - "exporter:elasticsearch/_agent-component/test": { - Event: componentstatus.NewEvent(componentstatus.StatusOK), - }, - }, - }, - }, - } - - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for collector status update") - case execution.statusCh <- otelStatus: - } - - collectorStatus, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchCollector(), mgr.Errors()) - require.NoError(t, err) - require.NotNil(t, collectorStatus) - assert.Len(t, collectorStatus.ComponentStatusMap, 0) - - componentState, err := getFromChannelOrErrorWithContext(t, ctx, mgr.WatchComponents(), mgr.Errors()) - require.NoError(t, err) - require.NotNil(t, componentState) - require.Len(t, componentState, 1) - assert.Equal(t, componentState[0].Component, testComp) - }) - - t.Run("collector error is passed up to the component manager", func(t *testing.T) { - collectorErr := errors.New("collector error") - - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for collector status update") - case execution.errCh <- collectorErr: - } - - // we should get a nil status and an error - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for collector status update") - case s := <-mgr.WatchCollector(): - assert.Nil(t, s) - } - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for collector status update") - case err := <-mgr.Errors(): - assert.Equal(t, collectorErr, err) - } - }) -} - -func getFromChannelOrErrorWithContext[T any](t *testing.T, ctx context.Context, ch <-chan T, errCh <-chan error) (T, error) { - t.Helper() - var result T - var err error - for err == nil { - select { - case result = <-ch: - return result, nil - case err = <-errCh: - case <-ctx.Done(): - err = ctx.Err() - } - } - return result, err -} - -func assertOtelStatusesEqualIgnoringTimestamps(t require.TestingT, a, b *status.AggregateStatus) bool { - if a == nil || b == nil { - return assert.Equal(t, a, b) - } - - if !assert.Equal(t, a.Status(), b.Status()) { - return false - } - - if !assert.Equal(t, len(a.ComponentStatusMap), len(b.ComponentStatusMap)) { - return false - } - - for k, v := range a.ComponentStatusMap { - if !assertOtelStatusesEqualIgnoringTimestamps(t, v, b.ComponentStatusMap[k]) { - return false - } - } - - return true -} ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index 015a3cc5070..7129f1c7e9e 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -455,13 +455,8 @@ func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, er } // translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter configuration. -<<<<<<< HEAD -func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { - esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg, logp.NewLogger("")) -======= func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string]any, error) { esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg, logger) ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) if err != nil { return nil, err } @@ -474,12 +469,6 @@ func translateEsOutputToExporter(cfg *config.C, logger *logp.Logger) (map[string esConfig["mapping"] = map[string]any{"mode": "bodymap"} return esConfig, nil } -<<<<<<< HEAD -======= - -func BeatDataPath(componentId string) string { - return filepath.Join(paths.Run(), componentId) -} // getBeatsAuthExtensionConfig sets http transport settings on beatsauth // currently this is only supported for elasticsearch output @@ -503,4 +492,3 @@ func getBeatsAuthExtensionConfig(cfg *config.C) (map[string]any, error) { return newMap, nil } ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) diff --git a/internal/pkg/otel/translate/otelconfig_test.go b/internal/pkg/otel/translate/otelconfig_test.go index c3888de43c0..ef37328b7c3 100644 --- a/internal/pkg/otel/translate/otelconfig_test.go +++ b/internal/pkg/otel/translate/otelconfig_test.go @@ -231,15 +231,6 @@ func TestGetOtelConfig(t *testing.T) { return finalOutput } -<<<<<<< HEAD - expectedESConfig := map[string]any{ - "elasticsearch/_agent-component/default": map[string]any{ - "batcher": map[string]any{ - "enabled": true, - "max_size": 1600, - "min_size": 0, - }, -======= expectedExtensionConfig := func(extra ...extraParams) map[string]any { finalOutput := map[string]any{ "idle_connection_timeout": "3s", @@ -277,7 +268,6 @@ func TestGetOtelConfig(t *testing.T) { expectedESConfig := func(outputName string) map[string]any { return map[string]any{ ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) "compression": "gzip", "compression_params": map[string]any{ "level": 1, @@ -826,215 +816,4 @@ func TestGetOtelConfig(t *testing.T) { } } -<<<<<<< HEAD // TODO: Add unit tests for other config generation functions -======= -func TestGetReceiversConfigForComponent(t *testing.T) { - testAgentInfo := &info.AgentInfo{} - mockBeatMonitoringConfigGetter := func(componentID, beatName string) map[string]any { - return nil // Behavior when self-monitoring is disabled - } - - customBeatMonitoringConfigGetter := func(componentID, beatName string) map[string]any { - return map[string]any{ - "http": map[string]any{ - "enabled": true, - "host": "custom-host:5067", - "port": 5067, - }, - } - } - - // Create proper component configurations that match existing test patterns - filebeatComponent := &component.Component{ - ID: "filebeat-test-id", - InputType: "filestream", - InputSpec: &component.InputRuntimeSpec{ - BinaryName: "agentbeat", - Spec: component.InputSpec{ - Name: "filestream", - Command: &component.CommandSpec{ - Args: []string{"filebeat"}, - }, - }, - }, - Units: []component.Unit{ - { - ID: "filebeat-test-id-unit", - Type: client.UnitTypeInput, - Config: component.MustExpectedConfig(map[string]any{ - "id": "test", - "use_output": "default", - "streams": []any{ - map[string]any{ - "id": "test-1", - "data_stream": map[string]any{ - "dataset": "generic-1", - }, - "paths": []any{ - "/var/log/*.log", - }, - }, - }, - }), - }, - }, - } - - metricbeatComponent := &component.Component{ - ID: "metricbeat-test-id", - InputType: "system/metrics", - InputSpec: &component.InputRuntimeSpec{ - BinaryName: "agentbeat", - Spec: component.InputSpec{ - Name: "system/metrics", - Command: &component.CommandSpec{ - Args: []string{"metricbeat"}, - }, - }, - }, - Units: []component.Unit{ - { - ID: "metricbeat-test-id-unit", - Type: client.UnitTypeInput, - Config: component.MustExpectedConfig(map[string]any{ - "id": "test", - "use_output": "default", - "type": "system/metrics", - "streams": []any{ - map[string]any{ - "id": "test-1", - "data_stream": map[string]any{ - "dataset": "generic-1", - }, - "metricsets": map[string]any{ - "cpu": map[string]any{ - "data_stream.dataset": "system.cpu", - }, - }, - }, - }, - }), - }, - }, - } - - tests := []struct { - name string - component *component.Component - outputQueueConfig map[string]any - beatMonitoringConfigGetter BeatMonitoringConfigGetter - expectedError string - expectedReceiverType string - expectedBeatName string - }{ - { - name: "filebeat component with default monitoring", - component: filebeatComponent, - outputQueueConfig: nil, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - expectedReceiverType: "filebeatreceiver", - expectedBeatName: "filebeat", - }, - { - name: "metricbeat component with custom monitoring and queue config", - component: metricbeatComponent, - outputQueueConfig: map[string]any{ - "type": "memory", - "size": 1000, - }, - beatMonitoringConfigGetter: customBeatMonitoringConfigGetter, - expectedReceiverType: "metricbeatreceiver", - expectedBeatName: "metricbeat", - }, - { - name: "component with no input units", - component: &component.Component{ - ID: "no-inputs-test-id", - InputType: "filestream", - InputSpec: &component.InputRuntimeSpec{ - BinaryName: "agentbeat", - Spec: component.InputSpec{ - Name: "filestream", - Command: &component.CommandSpec{ - Args: []string{"filebeat"}, - }, - }, - }, - Units: []component.Unit{ - { - ID: "output-unit", - Type: client.UnitTypeOutput, - Config: component.MustExpectedConfig(map[string]any{ - "type": "elasticsearch", - }), - }, - }, - }, - outputQueueConfig: nil, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - expectedReceiverType: "filebeatreceiver", - expectedBeatName: "filebeat", - }, - { - name: "unsupported component type", - component: &component.Component{ - ID: "unsupported-test-id", - InputType: "unsupported", - }, - outputQueueConfig: nil, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - expectedError: "unknown otel receiver type for input type: unsupported", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result, err := getReceiversConfigForComponent( - tt.component, - testAgentInfo, - tt.outputQueueConfig, - tt.beatMonitoringConfigGetter, - ) - - if tt.expectedError != "" { - assert.Error(t, err) - assert.ErrorContains(t, err, tt.expectedError) - assert.Nil(t, result) - return - } - - require.NoError(t, err) - assert.NotNil(t, result) - - // Verify the receiver ID is present - receiverID := fmt.Sprintf("%s/_agent-component/%s", tt.expectedReceiverType, tt.component.ID) - assert.Contains(t, result, receiverID) - - receiverConfig, ok := result[receiverID].(map[string]any) - assert.True(t, ok, "receiver config should be a map") - - // Verify configuration section presence - assert.Contains(t, receiverConfig, "output", "output config should be present") - assert.Contains(t, receiverConfig, "path", "path config should be present") - assert.Contains(t, receiverConfig, "logging", "logging config should be present") - assert.Contains(t, receiverConfig, tt.expectedBeatName, fmt.Sprintf("%s config should be present", tt.expectedBeatName)) - - // Verify queue configuration presence - if tt.outputQueueConfig != nil { - assert.Contains(t, receiverConfig, "queue", "queue config should be present") - } else { - assert.NotContains(t, receiverConfig, "queue", "queue config should not be present") - } - - // Verify monitoring configuration is present (http section should exist) - assert.Contains(t, receiverConfig, "http", "http monitoring config should be present") - expectedMonitoringConfig := tt.beatMonitoringConfigGetter(tt.component.ID, tt.component.InputSpec.BinaryName) - // If the monitoring getter is not nil, verify the http section is the same - if expectedMonitoringConfig != nil { - assert.Equal(t, expectedMonitoringConfig["http"], receiverConfig["http"]) - } - }) - } -} ->>>>>>> 779fafdcd ([beatreceivers] Integrate beatsauthextension (#9257)) From 731539b6771f0fcc949b9af16fef493071cd50c1 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 18 Sep 2025 10:20:32 +0530 Subject: [PATCH 3/3] add logger --- internal/pkg/agent/application/coordinator/coordinator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index e39dcddb4be..28ee19f4aab 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -1630,7 +1630,7 @@ func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { if len(model.Components) > 0 { var err error c.logger.With("components", model.Components).Debug("Updating otel manager model") - componentOtelCfg, err = translate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig) + componentOtelCfg, err = translate.GetOtelConfig(model, c.agentInfo, c.monitorMgr.ComponentMonitoringConfig, logp.NewLogger("")) if err != nil { c.logger.Errorf("failed to generate otel config: %v", err) }