Conversation
… into neuronull/component_validation_sink_component_spec
…o neuronull/component_validation_sink_sad_path
Datadog Report

Branch report: ✅ 0 Failed, 2122 Passed, 0 Skipped, 1m 25.27s Wall Time
…onull/OPA-1048_component_validation_fix_task_synchronization
```rust
// this config only runs with the test case "encoding_error" in the yaml file.
ComponentTestCaseConfig::from_sink(
    sad_config,
    Some("encoding_error".to_owned()),
```
Note that with this PR, there is currently nothing using the functionality I added to tie a specific config to a specific test case(s). But, I think this could be valuable in the future so figured I'd leave the plumbing for that in place.
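The "tie a config to a specific test case" plumbing mentioned above can be illustrated with a minimal, hypothetical sketch (the struct and method names here are illustrative, not Vector's actual API): a config carrying `Some(name)` applies only to the matching test case, while `None` applies to every case.

```rust
// Hypothetical sketch of scoping a component config to a named test case.
struct ComponentTestCaseConfig {
    // `Some(name)`: config is only used for that test case.
    // `None`: config is used for all test cases.
    test_case: Option<String>,
}

impl ComponentTestCaseConfig {
    fn applies_to(&self, case_name: &str) -> bool {
        match &self.test_case {
            Some(name) => name == case_name,
            None => true,
        }
    }
}

fn main() {
    let scoped = ComponentTestCaseConfig {
        test_case: Some("encoding_error".to_owned()),
    };
    let global = ComponentTestCaseConfig { test_case: None };

    println!("{}", scoped.applies_to("encoding_error")); // true
    println!("{}", scoped.applies_to("happy_path"));     // false
    println!("{}", global.applies_to("happy_path"));     // true
}
```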
tobz
left a comment
Overall, I think this is generally fine.
I left some comments around areas that perhaps felt a little too specialized to the test cases you were intending to fix, because it feels like we might be trading one bit of complexity for another in some cases... but none of it is blocking, and is really just meant to point out where to be mindful in the future during any further refactoring.
```rust
if self.configuration.component_type == ComponentType::Sink {
    input_task_coordinator.shutdown().await;
    output_task_coordinator.shutdown().await;

    let output_events = output_driver
        .await
        .expect("output driver task should not have panicked");

    telemetry_task_coordinator.shutdown().await;
    topology_task_coordinator.shutdown().await;

    output_events
// for sources and transforms, the output events are collected from the controlled edge (vector sink),
// and as such the coordination of shutdown of the tasks is more straightforward.
} else {
    input_task_coordinator.shutdown().await;
    telemetry_task_coordinator.shutdown().await;
    topology_task_coordinator.shutdown().await;
    output_task_coordinator.shutdown().await;

    output_driver
        .await
        .expect("output driver task should not have panicked")
}
```
I might be misreading this, but I wonder: could you not just use the first approach for all cases?
Like, if the first approach is working for the purpose of collecting/waiting long enough for all output events as well as telemetry data... then I'm trying to think of why it also wouldn't work for sources and transforms? 🤔
I had this thought as well when I was writing this. I had tried what you suggested and it didn't work for sources. I just tried it again, and I realized there is a three-birds-one-stone solution to the issues you have raised.
Instead of having the external resource be responsible for reporting whether the events received equal the expected count, we can do that in the output driver itself. This means sources will be able to use the same shutdown order, because the controlled edge doesn't need to coordinate things. And it means any other external resource implementation doesn't need to repeat that logic.
It also presented an opportunity to add a timeout mechanism so that we still shut down gracefully.
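The idea above — the output driver counting received events against the expected total, with a deadline as a graceful fallback — can be sketched with a minimal std-only example. This is not Vector's actual driver code; `drive_output`, the channel, and the event type are all illustrative assumptions.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical sketch: the output driver itself decides when all expected
// events have arrived, instead of the external resource reporting it.
// A deadline lets us shut down gracefully if events never show up.
fn drive_output(rx: mpsc::Receiver<String>, expected: usize, timeout: Duration) -> Vec<String> {
    let deadline = Instant::now() + timeout;
    let mut events = Vec::new();
    while events.len() < expected {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(event) => events.push(event),
            // Timed out or sender dropped: stop waiting, return what we have.
            Err(_) => break,
        }
    }
    events
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(format!("event-{i}")).unwrap();
        }
    });
    let events = drive_output(rx, 3, Duration::from_secs(2));
    producer.join().unwrap();
    println!("collected {} events", events.len());
}
```

With this in place, the runner can await the driver's result and only then trigger the remaining task shutdowns, using one order for all component types.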
Regression Detector Results

Run ID: b3e7db72-4e27-429d-89ed-4b62b3b9d64d

Performance changes are noted in the perf column of each table:
No significant changes in experiment optimization goals

Confidence level: 90.00%

There were no significant changes in experiment optimization goals at this confidence level and effect size tolerance.
|
| perf | experiment | goal | Δ mean % | Δ mean % CI |
|---|---|---|---|---|
| ➖ | syslog_log2metric_humio_metrics | ingress throughput | +2.90 | [+2.76, +3.05] |
| ➖ | syslog_log2metric_splunk_hec_metrics | ingress throughput | +2.67 | [+2.52, +2.83] |
| ➖ | syslog_humio_logs | ingress throughput | +1.57 | [+1.47, +1.68] |
| ➖ | socket_to_socket_blackhole | ingress throughput | +1.51 | [+1.44, +1.59] |
| ➖ | http_text_to_http_json | ingress throughput | +1.11 | [+0.98, +1.24] |
| ➖ | otlp_http_to_blackhole | ingress throughput | +0.82 | [+0.67, +0.98] |
| ➖ | file_to_blackhole | egress throughput | +0.82 | [-1.74, +3.38] |
| ➖ | syslog_regex_logs2metric_ddmetrics | ingress throughput | +0.75 | [+0.62, +0.88] |
| ➖ | syslog_loki | ingress throughput | +0.60 | [+0.55, +0.64] |
| ➖ | syslog_log2metric_tag_cardinality_limit_blackhole | ingress throughput | +0.60 | [+0.48, +0.71] |
| ➖ | splunk_hec_route_s3 | ingress throughput | +0.46 | [-0.04, +0.96] |
| ➖ | http_to_s3 | ingress throughput | +0.42 | [+0.14, +0.70] |
| ➖ | http_to_http_acks | ingress throughput | +0.19 | [-1.13, +1.51] |
| ➖ | datadog_agent_remap_datadog_logs | ingress throughput | +0.12 | [+0.02, +0.21] |
| ➖ | http_to_http_noack | ingress throughput | +0.11 | [+0.01, +0.21] |
| ➖ | http_to_http_json | ingress throughput | +0.03 | [-0.05, +0.10] |
| ➖ | splunk_hec_to_splunk_hec_logs_acks | ingress throughput | -0.00 | [-0.16, +0.16] |
| ➖ | splunk_hec_indexer_ack_blackhole | ingress throughput | -0.00 | [-0.15, +0.14] |
| ➖ | splunk_hec_to_splunk_hec_logs_noack | ingress throughput | -0.03 | [-0.14, +0.09] |
| ➖ | enterprise_http_to_http | ingress throughput | -0.11 | [-0.19, -0.03] |
| ➖ | syslog_splunk_hec_logs | ingress throughput | -0.11 | [-0.18, -0.05] |
| ➖ | datadog_agent_remap_datadog_logs_acks | ingress throughput | -0.22 | [-0.31, -0.13] |
| ➖ | fluent_elasticsearch | ingress throughput | -0.39 | [-0.85, +0.08] |
| ➖ | otlp_grpc_to_blackhole | ingress throughput | -0.51 | [-0.60, -0.42] |
| ➖ | datadog_agent_remap_blackhole | ingress throughput | -0.62 | [-0.71, -0.53] |
| ➖ | datadog_agent_remap_blackhole_acks | ingress throughput | -0.95 | [-1.04, -0.87] |
| ➖ | http_elasticsearch | ingress throughput | -0.96 | [-1.02, -0.89] |
Explanation
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
- Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
- Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
- Its configuration does not mark it "erratic".
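The three criteria above amount to a simple boolean check. Here is a minimal sketch of that decision rule (the function name and signature are illustrative, not the Regression Detector's actual code), using the assumed thresholds of a 5.00% effect size and a 90% confidence interval:

```rust
// Flag a change as a "regression" only when all three criteria hold:
// the mean shift is large enough, the CI excludes zero, and the
// experiment is not configured as erratic.
fn is_regression(delta_mean_pct: f64, ci_low: f64, ci_high: f64, erratic: bool) -> bool {
    delta_mean_pct.abs() >= 5.0
        && !(ci_low <= 0.0 && ci_high >= 0.0) // CI must not contain zero
        && !erratic
}

fn main() {
    // syslog_log2metric_humio_metrics from the table: +2.90 [+2.76, +3.05].
    // The CI excludes zero, but the effect is below 5%, so it is not flagged.
    println!("{}", is_regression(2.90, 2.76, 3.05, false)); // false

    // A hypothetical -7.5% shift with CI [-8.1, -6.9] would be flagged.
    println!("{}", is_regression(-7.5, -8.1, -6.9, false)); // true
}
```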
…work tasks (vectordotdev#19927)

* add fix and small refactor
* fix compilation errors
* 3 ticks
* dont compute expected metrics in validator
* cleanup
* cleanup
* clippy
* feedback tz: sent_eventssssss
* feedback tz: fix telemetry shutdown finishing logic
* 3 ticks
* small reorg to add sinks
* mini refactor of the component spec validators
* attempt to set expected values from the resource
* feedback tz- from not try_from
* back to 3 ticks
* fix incorrect expected values
* Even more reduction
* clippy
* add the discarded events total check
* workaround the new sync issues
* multi config support
* cleanup
* check events
* partial feedback
* thought i removed that
* use ref
* feedback: dont introduce PassThroughFail variant
* feedback: adjust enum variant names for clarity
* feedback: no idea what I was thinking with `input_codec`
* spell check
* fr
* fix sync issues
* remove unused enum variant
* feedback- update docs
* check_events
* touchup
* spell checker
* merge leftover
* feedback: log formating
* feedback- better approach to driving shutdown
* give a generous timeout
TLDR:
This change does a couple of things that were side effects of the main goal, which was to remove the sleep that was added in the runner for the sink case. That was a temporary hack to get us going, but as was seen in #19887, it's not a long-term solution, as we needed to keep increasing the sleep when the sink took longer to process things.
In this discussion, we talked about synchronizing the sources but not the sinks.
The first attempt I made was in the `TaskCoordinator`: splitting `shutdown()` into two methods, one to trigger the shutdown and the other to wait for it to complete. After that I played around with various orderings — triggering the input task to shut down, triggering the topology to shut down, etc. None of these fixed the issues we were having; in most cases they made other problems arise.

The "root cause" of the problem for sinks is that we need to allow time for the sink to process all the events, for the telemetry events to be received, AND for the external resource to process all the events. The only way I could see to do this was to drive the shutdown for sinks based on collecting the output task's result. Once we have that, we know the external resource has handled everything, and it is then safe to shut down the component under test and the other tasks. As such, the solution was to have two different shutdown orders: one for sources/transforms, and one for sinks.
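The trigger/wait split described above can be sketched with a minimal std-only example. This is not Vector's actual `TaskCoordinator`; the `Coordinator` type, its channels, and its method names are illustrative assumptions, using threads in place of async tasks.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical coordinator handle: shutdown is split into `trigger_shutdown`
// (send the signal) and `wait_for_shutdown` (block until the task confirms),
// so the runner can interleave the two phases across several tasks.
struct Coordinator {
    trigger_tx: mpsc::Sender<()>,
    done_rx: mpsc::Receiver<()>,
}

impl Coordinator {
    fn spawn<F: FnOnce() + Send + 'static>(on_shutdown: F) -> Self {
        let (trigger_tx, trigger_rx) = mpsc::channel();
        let (done_tx, done_rx) = mpsc::channel();
        thread::spawn(move || {
            // Wait for the shutdown signal, run final work, confirm completion.
            trigger_rx.recv().ok();
            on_shutdown();
            done_tx.send(()).ok();
        });
        Coordinator { trigger_tx, done_rx }
    }

    fn trigger_shutdown(&self) {
        self.trigger_tx.send(()).ok();
    }

    fn wait_for_shutdown(&self) {
        self.done_rx.recv().ok();
    }
}

fn main() {
    let input = Coordinator::spawn(|| println!("input stopped"));
    let telemetry = Coordinator::spawn(|| println!("telemetry stopped"));

    // Trigger both, then wait for both: the two phases no longer
    // have to happen back-to-back for each task.
    input.trigger_shutdown();
    telemetry.trigger_shutdown();
    input.wait_for_shutdown();
    telemetry.wait_for_shutdown();
    println!("all tasks stopped");
}
```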
A side effect of this change was that it became obvious to me that the current solution for triggering errors in sinks is not ideal and not representative of real-world scenarios. This was discussed a bit in #18062. The current approach also raised challenges for synchronizing task shutdown in the error-path case, because we needed some way to know that the error had been raised by the sink. The solution was to not rely on a codec error for sinks, but instead on an error from the external resource. This involved passing the test case data down to the external resource so it can know when it needs to simulate an error.
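The "error from the external resource" approach can be sketched as follows. These types are illustrative assumptions, not Vector's actual validation framework: the point is only that the test case is passed down so the resource, rather than a codec, produces the sad-path failure.

```rust
// Hypothetical test-case marker passed down to the external resource.
#[derive(PartialEq)]
enum TestCase {
    HappyPath,
    SadPath,
}

// Hypothetical stand-in for the downstream service the sink writes to.
struct ExternalResource {
    test_case: TestCase,
}

impl ExternalResource {
    // Accept the payload normally, but reject it when the test case
    // asks us to simulate a real-world downstream error.
    fn handle(&self, payload: &str) -> Result<(), String> {
        if self.test_case == TestCase::SadPath {
            Err(format!("simulated resource error for payload: {payload}"))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let happy = ExternalResource { test_case: TestCase::HappyPath };
    let sad = ExternalResource { test_case: TestCase::SadPath };
    println!("{}", happy.handle("event").is_ok()); // true
    println!("{}", sad.handle("event").is_err());  // true
}
```

Because the error originates in the resource rather than in encoding, the sink exercises its real error-handling path, and the runner can observe the failure without special-casing codec behavior.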