Pinging @elastic/es-core-infra (Team:Core/Infra)
Just a passing comment, without having looked deeply at the code:
In general, I ran into the same issues in my PR when I tried to check the emitted metrics. I added another API to the server at the new path and added protobuf parsing. This looks more flexible and better for the long term, so I'm happy to make this work!
Hey @mamazzol,
The protobuf parsing is pretty straightforward assuming we're allowed to pull in additional test dependencies. (Production dependencies are frowned upon, but test dependencies ought to be ok.)
/**
 * Parses a single line of APM intake NDJSON into a protocol-neutral {@link ReceivedTelemetry} event.
 * Intake-specific; a future OTLP decoder will produce the same ADT from OTLP payloads.
Heh Cursor can be so earnest about its intentions sometimes.
This way, instead of mutating the mode, we can have a separate test for each mode.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/OtelMetricsIT.java`:
- Around line 21-24: The cluster builder currently hardcodes "127.0.0.1" for the
OTLP endpoint; change the endpoint construction in the static `cluster` (built
via `AbstractMetricsIT.baseClusterBuilder()`) to use the recorder's loopback
address from `recordingApmServer.getHttpAddress()` instead of a literal IPv4
address, e.g. use the `InetSocketAddress`/address's host string (via
`recordingApmServer.getHttpAddress().getHostString()` or equivalent) combined
with `recordingApmServer.getPort()` so the exporter hits the correct loopback
address (handles `::1` on IPv6-preferred systems).
In
`@test/external-modules/apm-integration/src/javaRestTest/java/org/elasticsearch/test/apmintegration/RecordingApmServer.java`:
- Around line 37-42: The getReceivedMessages() implementation is draining the
ArrayBlockingQueue received (using poll) so it only returns leftovers; instead
maintain an append-only history list (e.g., a thread-safe
List<ReceivedTelemetry> retainedHistory) that the consumer thread and
addMessageConsumer(...) append to when they remove items from received, and
change getReceivedMessages() to return a snapshot of retainedHistory; update the
consumerThread() logic (referenced by messageConsumerThread and consumer) to add
each polled ReceivedTelemetry to retainedHistory before invoking the volatile
consumer, and ensure any existing code that currently drains received (the poll
calls) no longer discards items without recording them.
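A minimal sketch of the retained-history idea from the comment above, assuming a simplified recorder. `RetainingRecorder`, `offer`, `setConsumer` and `drainOnce` are hypothetical names for illustration; only `received`, `retainedHistory` and `getReceivedMessages()` echo the review text, and the real RecordingApmServer may be structured differently.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the recorder; not the PR's actual class.
final class RetainingRecorder<T> {
    private final BlockingQueue<T> received;
    // Append-only history: entries are never removed, so reads see every event drained so far.
    private final List<T> retainedHistory = new CopyOnWriteArrayList<>();
    private volatile Consumer<T> consumer = event -> {};

    RetainingRecorder(int capacity) {
        this.received = new ArrayBlockingQueue<>(capacity);
    }

    // Called by the receiving side; returns false if the queue is full.
    boolean offer(T event) {
        return received.offer(event);
    }

    void setConsumer(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    // One pass of the consumer loop: record each polled item before handing it to the consumer.
    void drainOnce() {
        T event;
        while ((event = received.poll()) != null) {
            retainedHistory.add(event);
            consumer.accept(event);
        }
    }

    // Returns a snapshot of the history and leaves the queue untouched.
    List<T> getReceivedMessages() {
        return List.copyOf(retainedHistory);
    }

    public static void main(String[] args) {
        RetainingRecorder<String> recorder = new RetainingRecorder<>(8);
        recorder.offer("first");
        recorder.offer("second");
        recorder.drainOnce();
        // Reading twice yields the same contents because reads do not drain anything.
        System.out.println(recorder.getReceivedMessages());
        System.out.println(recorder.getReceivedMessages());
    }
}
```

The point of the design is that the consumer and the history both see every event, so a test that reads late still gets the full stream rather than leftovers.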
 * Parses a single line of APM intake NDJSON into a protocol-neutral {@link ReceivedTelemetry} event.
 * Intake-specific; a future OTLP decoder will produce the same ADT from OTLP payloads.
 */
public final class ApmIntakeMessageParser {
Can we use this to also assert metric attributes?
I'm not sure what you mean. Are there existing assertions on attributes?
My intent was to generalize existing tests so they could be used as regression tests when we remove the APM agent. We could add more assertions in the future, of course, but I'd like to add them to this PR only if they already exist in the current tests.
Something to keep in mind for a follow-up PR then! For complete coverage of metrics, attributes are important, so we need to find a way to assert on them as well.
import static org.elasticsearch.rest.RestRequest.Method.GET;

/**
 * REST handler for tests that triggers a flush of all telemetry (traces, metrics) so tests can await export.
What does this do for the tests that wouldn't work on its own? Also, is this actually restricted to tests, or would it be available to call?
The REST endpoint is just for tests. I added the flush methods because we're going to want them eventually anyway, but I don't think there's any need to expose them via a REST endpoint in production.
This endpoint provides a way for tests to cause metrics to be emitted to the RecordingApmServer at a predictable time, after which they can make assertions. Without this, we would just have to wait a while.
We could make the other test endpoints do flushes at the end automatically, but I opted for this because I thought in the future the tests would be more composable this way: they could call multiple REST endpoints in various combos and then do one flush.
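To illustrate the composability argument, here is a rough sketch of the flow: several operations record telemetry, nothing is exported on its own (as with a very long export interval), and one explicit flush makes everything visible to the recorder at a known point. All types and metric names below are hypothetical models, not the PR's classes or endpoints.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the test flow; none of these types come from the PR.
final class FlushThenAssertSketch {
    // Stands in for the mock server: collects whatever has been exported.
    static final class Recorder {
        final List<String> exported = new ArrayList<>();
    }

    // Stands in for a telemetry pipeline with a long export interval:
    // nothing leaves the buffer until flush() is called.
    static final class BufferedTelemetry {
        private final List<String> pending = new ArrayList<>();
        private final Recorder recorder;

        BufferedTelemetry(Recorder recorder) {
            this.recorder = recorder;
        }

        void record(String metricName) {
            pending.add(metricName);
        }

        void flush() {
            recorder.exported.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        BufferedTelemetry telemetry = new BufferedTelemetry(recorder);

        // Each record() models the side effect of calling one test REST endpoint.
        telemetry.record("example.counter");
        telemetry.record("example.gauge");

        // A single flush after any combination of calls, then assertions can run.
        telemetry.flush();
        System.out.println(recorder.exported);
    }
}
```

With flushing kept separate from the endpoints that generate telemetry, a test can combine those endpoints freely and still have exactly one well-defined moment at which to assert.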
public static ElasticsearchCluster cluster = AbstractMetricsIT.baseClusterBuilder()
    .systemProperty("telemetry.otel.metrics.enabled", "true")
    .setting("telemetry.otel.metrics.endpoint", () -> "http://127.0.0.1:" + recordingApmServer.getPort() + "/v1/metrics")
    .setting("telemetry.otel.metrics.interval", "10m") // one giant batch instead of multiple small ones with deltas we need to sum
Was 10m a value you intended to use forever? Can it make suites go for way too long?
It's essentially meant to be "forever" because a flush will happen first.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@test/external-modules/apm-integration/src/main/java/org/elasticsearch/test/apmintegration/FlushTelemetryRestHandler.java`:
- Around line 45-52: prepareRequest in FlushTelemetryRestHandler calls
telemetryProvider.get() without null-check, which can return null if the plugin
hasn't finished initialization; update prepareRequest to check the result of
telemetryProvider.get() before invoking
attemptFlushMetrics()/attemptFlushTraces() and handle the null case by sending
an appropriate error response (e.g., RestStatus.SERVICE_UNAVAILABLE with a short
message) instead of dereferencing null; reference the telemetryProvider.get()
call and the methods attemptFlushMetrics()/attemptFlushTraces() in the handler
to locate where to add the null-check and response path.
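A minimal sketch of the null-check described above, assuming a simplified handler. `FlushHandlerSketch`, `Telemetry`, `Response` and `handleFlush` are hypothetical; only `attemptFlushMetrics()`, `attemptFlushTraces()` and the supplier-style `telemetryProvider.get()` come from the review comment, and the status codes 200 and 503 stand in for OK and SERVICE_UNAVAILABLE.

```java
import java.util.function.Supplier;

// Hypothetical handler model; the real FlushTelemetryRestHandler uses Elasticsearch REST types.
final class FlushHandlerSketch {
    // Only the two method names are taken from the review comment.
    interface Telemetry {
        void attemptFlushMetrics();

        void attemptFlushTraces();
    }

    // Simplified response: an HTTP status code plus a short message.
    record Response(int status, String message) {}

    static Response handleFlush(Supplier<Telemetry> telemetryProvider) {
        Telemetry telemetry = telemetryProvider.get();
        if (telemetry == null) {
            // The provider is not available yet: report it instead of dereferencing null.
            return new Response(503, "telemetry provider not yet initialized");
        }
        telemetry.attemptFlushMetrics();
        telemetry.attemptFlushTraces();
        return new Response(200, "flushed");
    }

    public static void main(String[] args) {
        // Before initialization the supplier yields null.
        System.out.println(handleFlush(() -> null));
    }
}
```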
// from the code, but we're using the APM agent (instead of the OTel SDK) to export it.
// That's why this "else" branch, where otelMetricsEnabled is false, is still using OpenTelemetry.

/*
nit: perhaps this would be best placed near the method that flushes, so it isn't confused with the comment above.
My intent was to comment why we're doing a wait, so I put the comment on agentFlushWaitMs, but I see your point. Let me rearrange things a little.
* Fix tests so I can run them locally
* Flush interfaces
* Cursor refactor: two IT subclasses instead of one. This way, instead of mutating the mode, we can have a separate test for each mode.
* Long interval, not short!
* Refactor APM ITs to use protocol-independent assertions
* TELEMETRY_TIMEOUT
* Tidy: use addAll instead of forEach(::add)
* Logging in APMMeterService
* Cleanup APMMeterService.attemptFlushMetrics
* Minor changes based on coderabbit review
* Rearrange comments per PR feedback
This lays a foundation for removing the APM agent and exporting telemetry directly using the OTel API. The intent is to test the current behaviour in a way that we can continue testing once the APM agent is gone.
One hurdle is that the APM agent doesn't support the OpenTelemetry Protocol (OTLP) and instead uses Elastic's intake NDJSON format, so the tests need to be written in a way that is independent of the wire protocol. The existing mock server didn't work that way: it simply captured all the NDJSON text and allowed the test to make assertions about the captured text.
To make this suitable for testing both before and after removing the APM agent, I've moved the parsing logic from the test itself into the mock server to parse incoming intake NDJSON into semantically meaningful data structures for each event, and changed the tests to assert on those structures. The intent is that a future PR can add mock endpoints that accept OTLP and produce the same data structures, allowing us to test both protocols with the same assertions, thereby achieving high confidence that removing the APM agent hasn't made any meaningful difference to the telemetry stream.
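As a rough illustration of what asserting on protocol-neutral structures could look like, here is a sketch with a small event ADT and an assertion helper that never looks at the wire format. The record shapes (`MetricSample`, `SpanEvent`), the `sumOf` helper and the metric names are assumptions made for this example; the PR's actual `ReceivedTelemetry` may be modelled differently.

```java
import java.util.List;

// Hypothetical sketch; the shapes below are assumptions, not the PR's actual types.
final class ProtocolNeutralSketch {
    // Every decoder (intake NDJSON today, OTLP later) would produce these events.
    sealed interface ReceivedTelemetry permits MetricSample, SpanEvent {}

    record MetricSample(String name, double value) implements ReceivedTelemetry {}

    record SpanEvent(String name) implements ReceivedTelemetry {}

    // Assertion helper that depends only on the ADT, so it works for any protocol.
    static double sumOf(List<ReceivedTelemetry> events, String metricName) {
        double total = 0;
        for (ReceivedTelemetry event : events) {
            if (event instanceof MetricSample sample && sample.name().equals(metricName)) {
                total += sample.value();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Imagine one list came from an intake decoder and the other from an OTLP decoder.
        List<ReceivedTelemetry> fromIntake = List.of(new MetricSample("example.counter", 3.0), new SpanEvent("example span"));
        List<ReceivedTelemetry> fromOtlp = List.of(new MetricSample("example.counter", 1.0), new MetricSample("example.counter", 2.0));

        // The same assertion applies to both streams, even when values arrive split across samples.
        System.out.println(sumOf(fromIntake, "example.counter") == sumOf(fromOtlp, "example.counter"));
    }
}
```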
I've also added tests for the metrics the APM agent already emits (e.g. JVM and system metrics), so we have coverage for those before switching.
Relates to ES-14012