Add experimental resource validator processor #1956
jmacd merged 23 commits into open-telemetry:main
Conversation
Codecov Report

❌ Patch coverage is …

```diff
@@            Coverage Diff             @@
##             main    #1956      +/-   ##
==========================================
- Coverage   85.78%   85.72%   -0.07%
==========================================
  Files         513      515       +2
  Lines      163122   163764     +642
==========================================
+ Hits       139941   140384     +443
- Misses      22647    22846     +199
  Partials      534      534
```
@AaronRM - I'm seeing some intermittent CI failures related to the durable_buffer_processor test. This PR doesn't touch that code, and the failures went away on rerun. Flagging it in case you've seen this before.
cijothomas left a comment:
LGTM.

One minor point - this looks quite specific to Azure, so maybe rename it to be an Azure-specific processor. If there is interest in having a general-purpose one, we can add that in the future too.

Not a blocker; it's still an experimental feature only.
```rust
// Metrics/Traces Arrow views not yet available - convert to OTLP
// TODO: Implement OtapMetricsView/OtapTracesView to avoid clone + conversion
```
nit: if we have an issue number already, please link it, otherwise please create a new issue in the repo!
```rust
for resource_logs in logs_view.resources() {
    if let Some(resource) = resource_logs.resource() {
```
I can imagine that, if we wanted to, we could optimize the Arrow-based implementation of this logic in a way that would be faster, maybe with a single pass over the ResourceAttrs table. @albertlockett could probably guide us. :-)
Hey -- sorry it took me a few days to reply to this, I was on vacation 🌴 !

This is something we might be able to optimize using Arrow compute kernels, but performing this check isn't trivial. If we did implement this, it would also be good to benchmark it against the existing approach for different batch sizes.

For each row, there are basically three things we need to check:

- key == required_attribute_key
- type == AttributeValueType::Str
- value in (allowed_values)
So we could do something like:

```rust
use arrow::array::{BooleanArray, StringArray, UInt8Array};
use arrow::buffer::BooleanBuffer;
use arrow::compute::kernels::{boolean::{and, or}, cmp::eq, filter::filter};
use otap_df_pdata::{
    otlp::attributes::AttributeValueType,
    schema::consts,
};

// TODO handle missing columns & other errors correctly instead of unwrapping
let type_col = resource_attrs_record_batch.column_by_name(consts::ATTRIBUTE_TYPE).unwrap();
let key_col = resource_attrs_record_batch.column_by_name(consts::ATTRIBUTE_KEY).unwrap();

let has_type = eq(type_col, &UInt8Array::new_scalar(AttributeValueType::Str as u8)).unwrap();
let has_key = eq(key_col, &StringArray::new_scalar(required_attribute_key)).unwrap();

// Note - I'm unwrapping this, but the str column is actually optional - if it's not present,
// it means there are no non-null/empty string attributes, which would effectively mean
// we reject this batch, unless empty string is one of the allowed values,
// in which case I guess we accept it?
let str_col = resource_attrs_record_batch.column_by_name(consts::ATTRIBUTE_STR).unwrap();

// Here, we compare against every row in the values column. Depending on the cardinality,
// it might actually be faster to prefilter str_col using and(has_type, has_key) and compare
// each value of the prefiltered str_col against each allowed value. Clearly that would change how
// we take the parent_id column below (we'd just do valid_rows = filter(&parent_ids, &has_val))
let mut has_val = BooleanArray::new(BooleanBuffer::new_unset(str_col.len()), None);
for val in allowed_values {
    let has_this_val = eq(str_col, &StringArray::new_scalar(val)).unwrap();
    has_val = or(&has_val, &has_this_val).unwrap();
}

// Combine the filter results - this yields a boolean array marking the
// rows that have valid resource attributes
let valid_rows = and(&has_type, &and(&has_key, &has_val).unwrap()).unwrap();

let parent_ids = resource_attrs_record_batch.column_by_name(consts::PARENT_ID).unwrap();
let valid_resource_ids = filter(&parent_ids, &valid_rows).unwrap();
```

Once you have the valid resource IDs, the next step would be to get the Resource ID column from the logs/traces/metrics record batch (it's inside a struct column), and ensure that every row contains a non-null value that is in the valid_resource_ids array.
Typically, we've done this by collecting valid_resource_ids into a Roaring Bitmap.
```rust
use arrow::array::{StructArray, UInt16Array};
use roaring::RoaringBitmap;

let id_mask: RoaringBitmap = valid_resource_ids
    .as_any()
    .downcast_ref::<UInt16Array>()
    .unwrap()
    .iter()
    .flatten()
    .map(|i| i.into())
    .collect();

let resource_ids = root_rb
    .column_by_name(consts::RESOURCE)
    .and_then(|arr| arr.as_any().downcast_ref::<StructArray>())
    .and_then(|arr| arr.column_by_name(consts::ID));

// then do something like:
// for id in resource_ids {
//     if id.is_none() || !id_mask.contains(id) { return Err(...) }
// }
```
Note: if the resource ID column contains a null value, it means the log/trace/metric has no resource attributes, which I guess would mean it fails the validity check.
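To make the commented loop above concrete - a minimal sketch, assuming the resource IDs downcast to a UInt16Array and using a hypothetical error type (nulls fail the check, per the note above):

```rust
use arrow::array::{ArrayRef, UInt16Array};
use roaring::RoaringBitmap;

// Hypothetical error type for illustration - not part of the actual codebase.
#[derive(Debug)]
struct InvalidResourceError;

fn check_resource_ids(
    resource_ids: Option<&ArrayRef>,
    id_mask: &RoaringBitmap,
) -> Result<(), InvalidResourceError> {
    let ids = resource_ids
        .and_then(|arr| arr.as_any().downcast_ref::<UInt16Array>())
        .ok_or(InvalidResourceError)?;
    for id in ids.iter() {
        match id {
            // a known-valid resource ID: keep going
            Some(id) if id_mask.contains(u32::from(id)) => {}
            // null (no resource attributes) or an ID outside the valid set: reject
            _ => return Err(InvalidResourceError),
        }
    }
    Ok(())
}
```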
BTW, the filter module in the columnar query-engine already has a bunch of code for doing this kind of attribute filtering & joining the parent_ids back to the main record batch.

Unfortunately, what we don't yet have is a mechanism to apply this filter and then reject the entire batch, nor any way to expose that behaviour via OPL.

I might spend some time this week figuring out how we'd do that. I think it would be a good opportunity to improve OPL/the Columnar Query Engine with this real-world use case.
@jmacd @lalitb (cc @lquerel): We might be able to use OPL/the columnar query engine to check whether the batch is valid in the case where the signal is OTAP. This would save a round-trip from OTAP to OTLP for metrics/traces (since we don't yet have OTAP-backed views implemented for those signal types), and the filtering might be faster (although it would be good to benchmark to confirm).

Below is an example of a pipeline that we could invoke to check whether the signal is valid.

The pipeline will basically split the batch, and if there are any rows with an invalid resource ID, it will try to "route" them to a route called "invalid". We then provide a custom "Router" that checks whether it receives any rows, and sets a flag that some rows were invalid. We then make the Nack/Forward decision based on this flag.
```rust
use async_trait::async_trait;
use data_engine_kql_parser::Parser;
use otap_df_opl::parser::OplParser;
use otap_df_query_engine::pipeline::{
    Pipeline,
    routing::{RouteName, Router, RouterExtType},
    state::ExecutionState,
};

let pipeline_expr = OplParser::parse(r#"
    signals |
    if (not(
        resource.attributes["microsoft.resourceId"] == "value1" or
        resource.attributes["microsoft.resourceId"] == "value2"
    )) {
        route_to "invalid"
    }
"#).unwrap().pipeline;

let mut pipeline = Pipeline::new(pipeline_expr);

// "Router" impl that just sets the valid flag to false if anything is routed
struct ValidityRouter {
    valid: bool,
}

#[async_trait(?Send)]
impl Router for ValidityRouter {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    async fn send(
        &mut self,
        _route_name: RouteName,
        otap_batch: OtapArrowRecords,
    ) -> otap_df_query_engine::error::Result<()> {
        let is_empty = otap_batch.root_record_batch().is_none();
        if !is_empty {
            self.valid = false;
        }
        Ok(())
    }
}

let mut exec_state = ExecutionState::new();
exec_state.set_extension::<RouterExtType>(Box::new(ValidityRouter { valid: true }));

// batch 1 (valid)
let logs_bytes = create_logs_request_with_resource(vec![KeyValue::new(
    "microsoft.resourceId",
    AnyValue::new_string("value1"),
)]);
let pdata: OtapPayload = OtlpProtoBytes::ExportLogsRequest(logs_bytes).into();
let otap_batch1: OtapArrowRecords = pdata.try_into().unwrap();

// batch 2 (not valid)
let logs_bytes = create_logs_request_with_resource(vec![KeyValue::new(
    "microsoft.resourceId",
    AnyValue::new_string("value_not_valid"),
)]);
let pdata: OtapPayload = OtlpProtoBytes::ExportLogsRequest(logs_bytes).into();
let otap_batch2: OtapArrowRecords = pdata.try_into().unwrap();

// batch 3 (valid)
let logs_bytes = create_logs_request_with_resource(vec![KeyValue::new(
    "microsoft.resourceId",
    AnyValue::new_string("value2"),
)]);
let pdata: OtapPayload = OtlpProtoBytes::ExportLogsRequest(logs_bytes).into();
let otap_batch3: OtapArrowRecords = pdata.try_into().unwrap();

for otap_batch in [otap_batch1, otap_batch2, otap_batch3] {
    let _ = pipeline.execute_with_state(otap_batch, &mut exec_state).await.unwrap();
    let router_impl = exec_state
        .get_extension_mut::<RouterExtType>()
        .and_then(|router| router.as_any_mut().downcast_mut::<ValidityRouter>())
        .unwrap();
    if !router_impl.valid {
        // reset validity flag
        router_impl.valid = true;
        println!("not valid (NACK)");
    } else {
        println!("valid (send to default out port)");
    }
}
```

This prints:

```
valid (send to default out port)
not valid (NACK)
valid (send to default out port)
```
I think the major caveat here is that building the actual OPL query is a bit tricky, as we don't yet support any kind of parameterized pipelines (certainly it's something we can/should add soon). Doing something like the following might open us up to injection attacks if we don't trust the processor config:
```rust
let pipeline_expr = OplParser::parse(&format!(
    r#"
    signals |
    if (not(
        {}
    )) {{
        route_to "invalid"
    }}
    "#,
    allowed_values
        .iter()
        .map(|val| {
            format!(
                "resource.attributes[\"microsoft.resourceId\"] == \"{}\"",
                val
            )
        })
        .collect::<Vec<String>>()
        .join(" or ")
))
.unwrap()
.pipeline;
```

To get around this, we could technically build the pipeline expression manually using the AST Expression types in the query_engine/expressions crate:
https://github.com/open-telemetry/otel-arrow/tree/main/rust/experimental/query_engine/expressions
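As a purely illustrative sketch of that idea - the enum and constructors below are hypothetical stand-ins, not the actual types in the expressions crate - the point is that building the predicate as data means attacker-controlled values are never spliced into OPL source text:

```rust
// Hypothetical expression AST for illustration only; the real types in
// query_engine/expressions will differ.
enum Expr {
    Attr(String),             // resource.attributes["<key>"]
    Str(String),              // string literal
    Eq(Box<Expr>, Box<Expr>), // equality comparison
    Or(Box<Expr>, Box<Expr>), // logical or
    Not(Box<Expr>),           // logical negation
}

// Build not(attr == v1 or attr == v2 or ...) without any string interpolation,
// so values in allowed_values can't inject OPL syntax.
fn invalid_predicate(key: &str, allowed_values: &[String]) -> Expr {
    let mut comparisons = allowed_values.iter().map(|val| {
        Expr::Eq(
            Box::new(Expr::Attr(key.to_string())),
            Box::new(Expr::Str(val.clone())),
        )
    });
    let first = comparisons.next().expect("at least one allowed value");
    Expr::Not(Box::new(
        comparisons.fold(first, |acc, cmp| Expr::Or(Box::new(acc), Box::new(cmp))),
    ))
}
```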
Good point. If the interest is to have a general-purpose processor, then we should probably consider checking non-string values as well, and possibly look for multiple resource attributes.
I think this qualifies as a general-purpose component, but we can start with it in experimental.
Thanks for the feedback! The processor itself is fully generic, with no Azure-specific logic. The use case that motivated it happens to be an Azure scenario, but the same pattern applies to any multi-tenant environment. I think keeping the generic name makes sense since the implementation doesn't have anything Azure-specific in it.

The reason it's under experimental (and also behind a feature flag) is that there's no equivalent processor in the Go collector, so moving it to core would mean shipping it in the binary without any existing users outside our use case.

Great point about non-string values and multiple attribute keys - I think those would be wonderful additions that could be contributed by folks who have those use cases. Keeping the current scope minimal and letting the community extend it as needed feels like a nice collaborative path forward.
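As a rough sketch of what that extension might look like - this config shape is hypothetical, not the PR's actual schema:

```rust
use std::collections::HashMap;

// Hypothetical allowed-value type covering non-string attribute values.
#[derive(Debug, Clone)]
enum AllowedValue {
    Str(String),
    Int(i64),
    Bool(bool),
}

// Hypothetical generalized config: one allowlist per required resource
// attribute. A resource would pass only if every listed attribute is
// present with one of its allowed values.
#[derive(Debug, Clone)]
struct ValidatorConfig {
    required_attributes: HashMap<String, Vec<AllowedValue>>,
}
```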
fixes: #1941
Description:
Adds an experimental resource_validator_processor that validates telemetry resources against an allowlist.

Behavior:

- microsoft.resourceId (or a configurable attribute) exists on resources

Config:

- Feature flag: resource-validator-processor
- Status: Static config only. Extensible for future dynamic auth context support.
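As a minimal sketch of the core allowlist check - the processor operates on pdata views rather than raw protos, so this is illustrative only, written against the opentelemetry-proto crate's generated types:

```rust
use opentelemetry_proto::tonic::common::v1::any_value::Value;
use opentelemetry_proto::tonic::resource::v1::Resource;

/// Returns true if the resource carries the required attribute with a string
/// value found in the allowlist - the per-resource check the processor performs.
fn resource_is_valid(resource: &Resource, required_key: &str, allowed: &[&str]) -> bool {
    resource.attributes.iter().any(|kv| {
        kv.key == required_key
            && matches!(
                kv.value.as_ref().and_then(|v| v.value.as_ref()),
                Some(Value::StringValue(s)) if allowed.contains(&s.as_str())
            )
    })
}
```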