Merged
1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -219,6 +219,7 @@ azure-monitor-exporter = ["otap-df-otap/azure-monitor-exporter"]
experimental-processors = ["otap-df-otap/experimental-processors"]
condense-attributes-processor = ["otap-df-otap/condense-attributes-processor"]
recordset-kql-processor = ["otap-df-otap/recordset-kql-processor"]
resource-validator-processor = ["otap-df-otap/resource-validator-processor"]

[workspace.lints.rust]
# General compatibility lints
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/otap/Cargo.toml
@@ -129,6 +129,7 @@ recordset-kql-processor = [
    "dep:data_engine_recordset_otlp_bridge",
    "dep:data_engine_recordset"
]
resource-validator-processor = ["experimental-processors"]

[dev-dependencies]
4 changes: 4 additions & 0 deletions rust/otap-dataflow/crates/otap/src/experimental/mod.rs
@@ -22,3 +22,7 @@ pub mod condense_attributes_processor;
/// Recordset KQL OTLP Query Engine processor
#[cfg(feature = "recordset-kql-processor")]
pub mod recordset_kql_processor;

/// Resource Validator processor for validating resource attributes
#[cfg(feature = "resource-validator-processor")]
pub mod resource_validator_processor;
@@ -0,0 +1,135 @@
# Resource Validator Processor

The Resource Validator Processor validates that telemetry data contains a
required resource attribute with a value from an allowed list. Requests that
fail validation are permanently NACKed, enabling clients to detect
misconfiguration immediately rather than having data silently dropped.

## Example Use Case

In multi-tenant cloud environments, telemetry includes a resource attribute
(e.g., `cloud.resource_id`) containing an identifier for the resource.
The collector must:

1. Validate that the attribute exists on the Resource
2. Check that its value is in the allowed list
3. Reject with a permanent NACK on failure


## Why Existing Processors Don't Work

| Processor | Behavior | Gap |
| -------------------- | -------------------------------- | ------------------ |
| Filter Processor | Silently drops non-matching data | No error to client |
| Transform Processor | Modifies/transforms data | Cannot reject |
| Attributes Processor | Adds/updates/deletes attributes | Cannot reject |

None of these processors validates attribute values against an allowlist while
propagating an error back to the client.

## Configuration

```yaml
processors:
  resource_validator:
    # The resource attribute key that must be present (required, no default)
    required_attribute_key: "cloud.resource_id"

    # List of allowed values (required; config validation rejects an empty list)
    allowed_values:
      - "/subscriptions/xxx/resourceGroups/yyy/..."
      - "/subscriptions/aaa/resourceGroups/bbb/..."

    # Case-sensitive comparison (default: true)
    case_sensitive: false
```
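
With `case_sensitive: false`, matching is effectively lowercase-on-both-sides. A
standalone sketch of that comparison (the function name and signature are
invented for illustration; the real processor pre-computes the lowered set
once, via `allowed_values_set` in its config):

```rust
use std::collections::HashSet;

// Sketch: lower-case the allowed set, then probe with a lowered value.
// Mirrors the semantics of `allowed_values_set` in the processor config.
fn value_allowed(value: &str, allowed: &[&str], case_sensitive: bool) -> bool {
    let set: HashSet<String> = allowed
        .iter()
        .map(|v| {
            if case_sensitive {
                (*v).to_string()
            } else {
                v.to_lowercase()
            }
        })
        .collect();
    let probe = if case_sensitive {
        value.to_string()
    } else {
        value.to_lowercase()
    };
    set.contains(&probe)
}
```

Note that the key lookup itself stays case-sensitive; only value matching is
affected by this flag.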

## Behavior

| Condition                                    | Result                                                        |
| -------------------------------------------- | ------------------------------------------------------------- |
| Attribute present with value in allowed list | Pass through                                                  |
| Attribute present, empty `allowed_values`    | Permanent NACK (normally caught earlier by config validation) |
| Attribute missing                            | Permanent NACK                                                |
| Attribute wrong type (not a string)          | Permanent NACK                                                |
| Attribute value not in allowed list          | Permanent NACK                                                |
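
The table rows collapse into one decision function. A hypothetical sketch
(`AttrValue` and `Decision` are stand-ins invented here; the real processor
works on pdata attribute values, not this toy enum):

```rust
// Stand-in for the real attribute value type (hypothetical).
enum AttrValue {
    Str(String),
    Int(i64),
}

#[derive(Debug, PartialEq)]
enum Decision {
    Pass,
    NackMissing,     // attribute absent from the Resource
    NackInvalidType, // attribute present but not a string
    NackNotAllowed,  // string value not in the allowed list
}

fn decide(attr: Option<&AttrValue>, allowed: &[&str]) -> Decision {
    match attr {
        None => Decision::NackMissing,
        Some(AttrValue::Str(s)) if allowed.contains(&s.as_str()) => Decision::Pass,
        // An empty `allowed` list falls through here, rejecting every value.
        Some(AttrValue::Str(_)) => Decision::NackNotAllowed,
        Some(_) => Decision::NackInvalidType,
    }
}
```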

> **Note:** The processor sends a permanent NACK (`NackMsg::new_permanent`), but
> the receiver currently maps all NACKs to HTTP 503 / gRPC UNAVAILABLE. Returning
> HTTP 400 / gRPC INVALID_ARGUMENT for permanent NACKs requires receiver-side changes.

## Metrics

- `resource_validator_batches_accepted` - Batches that passed validation
- `resource_validator_batches_rejected_missing` - Rejected: missing attribute
- `resource_validator_batches_rejected_not_allowed` - Rejected: invalid value
- `resource_validator_batches_rejected_invalid_type` - Rejected:
attribute not a string
- `resource_validator_batches_rejected_conversion_error` - Rejected:
internal conversion error
- `resource_validator_items_accepted` - Telemetry items accepted
- `resource_validator_items_rejected` - Telemetry items rejected
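
On rejection, one batch-level counter and the item-level counter advance
together. A hypothetical sketch of that bookkeeping (counter names from the
list above; plain `u64`s stand in for the real `Counter` instruments):

```rust
// Simplified stand-in for ResourceValidatorMetrics: plain u64s, no macro.
#[derive(Default)]
struct Metrics {
    batches_rejected_missing: u64,
    items_rejected: u64,
}

// On a batch NACKed for a missing attribute, record both the rejected
// batch and the number of telemetry items lost with it.
fn record_missing(m: &mut Metrics, items_in_batch: u64) {
    m.batches_rejected_missing += 1;
    m.items_rejected += items_in_batch;
}
```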

## Feature Flag

This processor is experimental and requires the `resource-validator-processor`
feature flag:

```toml
[dependencies]
otap-df-otap = { version = "...", features = ["resource-validator-processor"] }
```

## Extensibility for Dynamic Auth Context

The processor is designed to be extensible for future dynamic validation via
auth context:

### Current Implementation (Static)

Uses `allowed_values` from configuration:

```yaml
processors:
  resource_validator:
    allowed_values:
      - "/subscriptions/xxx/..." # Static list from config
```

### Future Implementation (Dynamic)

When the SAT auth extension is ready, allowed values can come from the
request's auth context:

```text
+-----------+     +------------------+     +--------------------+     +----------+
|  Client   |---->|  OTLP Receiver   |---->| Resource Validator |---->| Exporter |
| + token   |     | + Auth Extension |     |     Processor      |     |          |
+-----------+     +------------------+     +--------------------+     +----------+
                           |                         |
                           v                         v
                  Auth Extension:          Processor reads from
                  1. Validates token       context instead of config
                  2. Extracts claims       via get_allowed_values()
                  3. Sets ctx.auth
```

### Extension Points

The processor provides these extension points for dynamic auth:

1. **`AllowedValuesSource` enum**: Supports `Static` (current) and `Dynamic`
(future) modes
2. **`get_allowed_values()` method**: Returns allowed values for a request;
can be extended to check `pdata.context().auth()` first
3. **Fallback support**: Dynamic mode includes config fallback for requests
without auth context

When the SAT auth extension is implemented, the `get_allowed_values()` method
can be updated to:

1. Check if the request context contains auth information
2. Extract allowed resource IDs from auth claims
3. Fall back to static config if auth context is unavailable
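
Taken together, the steps above can be sketched as follows. `AllowedValuesSource`
and `get_allowed_values()` are named in the design notes; `AuthContext` and its
`allowed_resource_ids` field are invented here purely for illustration:

```rust
// Hypothetical auth context populated by a future SAT auth extension.
struct AuthContext {
    allowed_resource_ids: Vec<String>,
}

// Static (current) vs. dynamic (future) source of allowed values.
enum AllowedValuesSource {
    Static(Vec<String>),
    Dynamic { fallback: Vec<String> },
}

impl AllowedValuesSource {
    fn get_allowed_values(&self, auth: Option<&AuthContext>) -> Vec<String> {
        match self {
            AllowedValuesSource::Static(values) => values.clone(),
            AllowedValuesSource::Dynamic { fallback } => match auth {
                // Prefer allowed resource IDs extracted from auth claims...
                Some(ctx) => ctx.allowed_resource_ids.clone(),
                // ...and fall back to static config when no auth context exists.
                None => fallback.clone(),
            },
        }
    }
}
```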
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Configuration for the Resource Validator Processor

use otap_df_config::error::Error as ConfigError;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

/// Configuration for the Resource Validator Processor
///
/// This processor validates that a required resource attribute exists and its value
/// is in an allowed list. Requests that fail validation are NACKed with a permanent
/// error, enabling clients to detect misconfiguration immediately.
///
/// # Example Configuration
/// ```yaml
/// processors:
///   resource_validator:
///     required_attribute_key: "cloud.resource_id"
///     allowed_values:
///       - "/subscriptions/xxx/resourceGroups/yyy/..."
///     case_sensitive: false # optional, defaults to true
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    /// The resource attribute key that must be present on all resources.
    /// This is a required field with no default.
    pub required_attribute_key: String,

    /// List of allowed values for the required attribute.
    /// Must be non-empty: `validate()` rejects an empty list, since an
    /// empty list would reject all data.
    #[serde(default)]
    pub allowed_values: Vec<String>,

    /// Whether to perform case-sensitive comparison of attribute values.
    /// Note: this only affects `allowed_values` matching. The
    /// `required_attribute_key` lookup is always case-sensitive.
    /// Default: true
    #[serde(default = "default_case_sensitive")]
    pub case_sensitive: bool,
}

const fn default_case_sensitive() -> bool {
    true
}

impl Config {
    /// Validates the configuration.
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.required_attribute_key.trim().is_empty() {
            return Err(ConfigError::InvalidUserConfig {
                error: "required_attribute_key cannot be empty".to_string(),
            });
        }
        if self.allowed_values.is_empty() {
            return Err(ConfigError::InvalidUserConfig {
                error: "allowed_values cannot be empty (would reject all data)".to_string(),
            });
        }
        Ok(())
    }

    /// Returns a pre-processed set of allowed values for efficient lookup.
    /// If `case_sensitive` is false, all values are lowercased.
    #[must_use]
    pub fn allowed_values_set(&self) -> HashSet<String> {
        if self.case_sensitive {
            self.allowed_values.iter().cloned().collect()
        } else {
            self.allowed_values
                .iter()
                .map(|v| v.to_lowercase())
                .collect()
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Helper to create a config for testing
    fn test_config(required_attribute_key: &str, allowed_values: Vec<&str>) -> Config {
        Config {
            required_attribute_key: required_attribute_key.to_string(),
            allowed_values: allowed_values.into_iter().map(String::from).collect(),
            case_sensitive: true,
        }
    }

    #[test]
    fn test_validate_empty_attribute() {
        let config = test_config("", vec!["value"]);
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_validate_empty_allowed_values() {
        let config = test_config("cloud.resource_id", vec![]);
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_validate_valid_config() {
        let config = test_config("cloud.resource_id", vec!["/subscriptions/123"]);
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_allowed_values_set_case_sensitive() {
        let config = Config {
            required_attribute_key: "cloud.resource_id".to_string(),
            allowed_values: vec!["Value1".to_string(), "Value2".to_string()],
            case_sensitive: true,
        };
        let set = config.allowed_values_set();
        assert!(set.contains("Value1"));
        assert!(set.contains("Value2"));
        assert!(!set.contains("value1"));
    }

    #[test]
    fn test_allowed_values_set_case_insensitive() {
        let config = Config {
            required_attribute_key: "cloud.resource_id".to_string(),
            allowed_values: vec!["Value1".to_string(), "VALUE2".to_string()],
            case_sensitive: false,
        };
        let set = config.allowed_values_set();
        assert!(set.contains("value1"));
        assert!(set.contains("value2"));
        assert!(!set.contains("Value1"));
    }

    #[test]
    fn test_deserialize_config() {
        let json = r#"{
            "required_attribute_key": "my.attribute",
            "allowed_values": ["val1", "val2"],
            "case_sensitive": false
        }"#;
        let config: Config = serde_json::from_str(json).unwrap();
        assert_eq!(config.required_attribute_key, "my.attribute");
        assert_eq!(config.allowed_values, vec!["val1", "val2"]);
        assert!(!config.case_sensitive);
    }
}
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Metrics for the Resource Validator Processor

use otap_df_telemetry::instrument::Counter;
use otap_df_telemetry_macros::metric_set;

/// Metrics collected by the Resource Validator Processor.
///
/// Tracks both batch-level and item-level counts. Validation is pass/fail for
/// the entire batch — if any resource fails, the whole batch is NACKed. Item
/// counts capture the magnitude of data loss on rejection.
#[metric_set(name = "resource_validator.processor.metrics")]
#[derive(Debug, Default, Clone)]
pub struct ResourceValidatorMetrics {
    /// Number of batches that passed validation
    #[metric(unit = "{batch}")]
    pub batches_accepted: Counter<u64>,

    /// Number of batches rejected due to missing required attribute
    #[metric(unit = "{batch}")]
    pub batches_rejected_missing: Counter<u64>,

    /// Number of batches rejected due to value not in allowed list
    #[metric(unit = "{batch}")]
    pub batches_rejected_not_allowed: Counter<u64>,

    /// Number of batches rejected due to invalid attribute type (not a string)
    #[metric(unit = "{batch}")]
    pub batches_rejected_invalid_type: Counter<u64>,

    /// Number of batches rejected due to internal conversion error
    #[metric(unit = "{batch}")]
    pub batches_rejected_conversion_error: Counter<u64>,

    /// Number of telemetry items accepted
    #[metric(unit = "{item}")]
    pub items_accepted: Counter<u64>,

    /// Number of telemetry items rejected
    #[metric(unit = "{item}")]
    pub items_rejected: Counter<u64>,
}