Skip to content
394 changes: 394 additions & 0 deletions rfcs/2022-08-25-12217-handling-discarded-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,394 @@
# RFC 12217 - 2022-08-25 - Handling Discarded Events

We want to give Vector users the assurance that events are never lost unexpectedly. There are
currently many circumstances in which events are simply discarded without recourse for users of
Vector to recover those events. This RFC presents a framework for handling those discarded events in
a uniform fashion in all components.

## Context

- [Support dead letter queue sources](https://github.com/vectordotdev/vector/issues/1772)
- [Vector sources acknowledge discarded events](https://github.com/vectordotdev/vector/issues/12217)
- [Reroute discarded events from throttle transform](https://github.com/vectordotdev/vector/issues/13549)
- Sink Error Handling RFC (internal document)

## Cross cutting concerns

- The proposed changes will impact emission and receipt of end-to-end acknowledgements.
- The proposed configuration options should fit in the configuration schema, but may have
implications for its interpretation, particularly around the topology connections.
- Handling discarded events in any way will change the interpretation of several exiting internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Handling discarded events in any way will change the interpretation of several exiting internal
- Handling discarded events in any way will change the interpretation of several existing internal

metrics.

## Scope

### In scope

- Handling of discarded events in sources, transforms, and sinks.

### Out of scope

- Handling of discarded data in sources before it is translated into events.
- Handling of discarded sink request data after it is translated from events.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be entirely accounted for by end-to-end acknowledgements? Or is there a gap I'm not thinking of?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is making me realize that end-to-end acks is another cross-cutting concern here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The situation here plays around like this: For an event, i.e, after all the retries have exhausted, will be marked as discarded. For a discarded event, the following will happen:

  1. Push to DLQ with retries
  2. If the write to DLQ was successful, acknowledge to the source for this event.
  3. if write to DLQ failed, drop the event and acknowledge to the source for this event.
    Is this a fair understanding?


## Pain

Vector has a number of ways in which components may drop events due to errors that are outside of
the operator's control:

- Transform processing
- Sink encoding failure
- Sink partitioning failure
- Failure to write to disk buffer

This leads to the potential for lost events and even silently dropped events. Users want to be able
to capture discarded events at various points in the topology in order to forward them to fail-safe
destinations for diagnosis.

## Proposal

There are three parts to the proposed solution:

1. Add a new standardized output to all components that may discard events after errors.
2. Update configuration validation to require that all outputs are handled.
3. Add a new configuration for discarding or rejecting outputs.

### User Experience

#### Add Discarded Event Output
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the format of the events flowing to the errors output? Will we follow the precedent set by the remap transform of wrapping the event with some additional metadata about the failure? I think it'd be good to call that out explicitly in here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should take advantage of the new metadata namespacing too? That feature will only exist for logs, but we could extend it to metrics and traces. Also will we offer any guarantees about the metadata, if we do include it? I could see users starting to route on it based on the type of failure. I'd suggest we push back on that clearly document that the metadata is opaque and just for human consumption for now.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To validate your intuition, I would be tempted to route based on the metadata from an error. Hopefully a specific example will help:

Kafka sinks can throw errors when the broker sees a message that's too large. I'm finding it difficult to know the exact message size before sending, so it's hard to build a filter upstream. I could see handling this situation by routing the error. I might route to a console log or split the message and try again.

At a minimum, I could also envision using the error data to know whether dropping the message is okay or if I need to trigger alerts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit out of scope for this RFC, but very much related -- adding an event output for events that successfully went through a sink would allow customers to provide better observability into events that were successfully delivered. For example, we would like to extract metrics from delivered events so we can show our customers how many events we successfully delivered. As it is now, we can only do that before the sink.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to file an issue about this, but as worded, it sounds like what you're looking for are the existing component metrics such as component_sent_events_total, which sinks do emit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might clarify what I meant: #14708 (comment)


We will introduce a new output to all components that would otherwise discard events, named
`errors`. Note that some components already have such an named output. This proposal standardizes
that output naming and provides additional support for handling it.
Comment on lines +60 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should lean into existing nomenclature, namely "dead letter queues", to make this feel more familiar to users.


#### Validate Handling All Outputs

Additionally, we will enhance the configuration validation to determine if all outputs are
handled. This will provide notification of the new error output and ensure users are aware of the
new feature.

In order to avoid breaking existing configurations, this validation will initially produce a
deprecation warning, notifying the user that an output is present but unhandled. In a later version,
this deprecation will be turned into a hard error, preventing the acceptance of a configuration with
unhandled outputs.

We will also add a new command-line option and environment variable to opt into the stricter
validation for users that want the additional assurance this provides.

#### Simplify Discarded Output Handling
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the names used here awkward, but admittedly don't have any suggestions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative name for disposition could be on_error (or on_discard).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be, but only as applies to the error output. The proposed disposition applies to all named outputs, including from the datadog_agent source and the remap transform.


This enforced handling will create considerable complication for most users, as well as the
potential of increased overhead and reduced performance if extra components have to be configured
just to handle the outputs that now need to be connected somewhere. To reduce this overhead, we will
add a new optional configuration setting to indicate the internal disposition of each output:
`outputs.NAME.disposition`. This will have two non-default values, `"drop"` to mark all events going
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see this config applying to other outputs in the future? It sounds like we're only talking about a single hardcoded discards output, so I'm not sure that we need to nest the configuration in such a way that implies it could be applied to other outputs as well.

What's the benefit of outputs.NAME.disposition vs something like on_discard = {send,drop,reject}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some sources, like datadog_agent IIRC, that already have multiple outputs. If users do not want to wire up one of those outputs and we enforce that all outputs be handled, then now they need a blackhole for that output as well because we special-cased the discard handling. With the generic handling, all cases are covered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I didn't consider the effect on those sources. I am still a little bit hesitant to have configuration that refers to a named output that won't actually exist (when configured to drop or reject), since that could result in some confusion. I'm not sure I have a great alternative though, aside from introducing separate config on those sources that controls which events types are emitted.

to that output as delivered and then drop them, and `"reject"` to mark them as having failed. If
this setting is not present and a component has an unhandled output, a default behavior of `"drop"`
will be assumed, which matches the current behavior. Once validation of output handling is enforced
as above, this default will be removed.

So, to configure the new error output as discarding events with an error result, users would add the
following:

```toml
[transforms.example]
type = "remap"
outputs.errors.disposition = "reject"
```

### Implementation

#### Configuration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could have global configuration for the disposition as well? That might simplify it for simple use-cases. Like setting disposition = "drop" at the global level and have it apply to all components.

Could also do the same at the component level for those that already have multiple outputs in addition to letting them configure the disposition per-output.


The new `output.*.disposition` configuration mapping will be added to components as needed to
support configuring outputs. In order for the validation to access it, a new method will be added to
all component configuration traits:

```rust
trait SourceConfig: NamedComponent + Debug + Send + Sync {
/// Gets the list of outputs exposed by this source (existing).
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<config::Output>;

/// The actual user configuration of outputs.
/// Will be `None` if no such configuration is present on this component.
fn outputs_config(&self) -> Option<&OutputsConfig>;
}

trait TransformConfig: DescribeOutputs + NamedComponent + Debug + Send + Sync {
/// Gets the list of outputs exposed by this transform (existing).
fn outputs(&self, merged_definition: &schema::Definition) -> Vec<config::Output>;

/// The actual user configuration of outputs.
/// Will be `None` if no such configuration is present on this component.
fn outputs_config(&self) -> Option<&OutputsConfig>;
}

trait SinkConfig: DescribeOutputs + NamedComponent + Debug + Send + Sync {
/// Gets the list of outputs exposed by this sink.
fn outputs(&self) -> Vec<config::Output>;

/// The actual user configuration of outputs.
/// Will be `None` if no such configuration is present on this component.
fn outputs_config(&self) -> Option<&OutputsConfig>;
}

/// Configuration for a set of named outputs.
struct OutputsConfig(HashMap<String, OutputConfig>);

struct OutputConfig {
#[serde(default)]
disposition: OutputDisposition,
}

enum OutputDisposition {
/// Default behavior, send the event on to further components that name this as an input.
Send,
/// Discard the event and mark it is as delivered.
Drop,
/// Discard the event and mark it as failed.
Reject,
Comment on lines +145 to +150
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These make sense individually, but I find it... let's call it "weird", that I could have a component which selects another component's discards output but then that upstream component could basically just disable the sending of those discarded events even though I've named that output specifically?

If that's not the intention, then maybe this is just a docs problem, but that was my initial impression seeing these three distinct options.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will be something caught by the configuration verification stage, that "handled" outputs are not named as an input. This is an issue I discussed later in the document in "Alternatives" as not having a great solution IMO.

}
```

#### Sources

Sources already have facilities to handle a map of named outputs in `SourceSender`. The standard
builder will be extended to add a new standard output to accept discarded events. The `Inner` will be
rewritten to handle either discarding events or passing them on to the sender, and the send
functions will be extended to take the appropriate action based on this disposition.

```rust
struct Inner {
disposition: InnerDisposition,
output: String,
lag_time: Option<Histogram>,
}

enum InnerDisposition {
Drop,
Reject,
Send(LimitedSender<EventArray>),
}
```

#### Transforms

##### "Function" Transforms

Function transforms are the simplest type, taking an event as input and outputting into a single
`OutputBuffer`. This is a strict subset of the synchronous transform type, and adding the necessary
output buffer type would make this type effectively equivalent to synchronous transforms. As such,
all function transforms that may discard events will be rewritten to the synchronous transform type.

##### "Synchronous" Transforms

Synchronous transforms already have support for named outputs. These will be configured with a new
named `errors` output. The output buffer `TransformOutputsBuf` will be extended with additional
convenience methods to making discarding events more straightforward.

```rust
struct TransformOutputsBuf {
pub fn push_error(&mut self, event: Event);
}
```

##### "Task" Transforms

Each of the current task transforms will be refactored into either a simpler synchronous transform, which
will be handled as above, or into a more specialized `RuntimeTransform` (`aggregate` and `lua` version 2). The
runtime transform type runs inside of a task which, in turn, takes a transform output buffer as an
input parameter. That framework will be rewritten to accept named outputs, with the associated
convenience methods for handling errors. Further, since it is the only instance of a task
transform, that framework will replace the task transform, allowing us to build the outputs in the
topology builder.

- `aggregate` => runtime (flushes on timer tick)
- `aws_ec2_metadata` => synchronous
- `dedupe` => synchronous
- `lua` version 1 => synchronous
- `lua` version 2 => runtime
- `reduce` => runtime (flushes on timer tick)
- `tag_cardinality_limit` => synchronous
- `throttle` => synchronous (uses timers, but only to update state)

##### Output Buffer

The output from transforms is fed into an `OutputBuffer` that is currently a newtype wrapper for a
simple vector of event arrays (ie ```struct OutputBuffer(Vec<EventArray>);```). The additional
output dispositions will be added as enum states:

```rust
pub enum OutputBuffer {
Drop, // Mark events sent to this output as delivered and drop them.
Reject, // Mark events sent to this output as failed and drop them.
Store(Vec<EventArray>), // Forward the events to the next component, as before.
}
```

#### Sinks

Sinks do not have "outputs" as such. In order to add a error output, we will need to wrap the
incoming events with an optional stream adapter. It will be built and added during topology
building, depending on the presence of an output handler for `errors`. This handler will hold on to
a copy of the events and forward them on to the error output when needed:

1. Clone the input event array.
1. Apply a new finalizer to the cloned events.
1. Send the events to the sink.
1. Wait on the reception of the signal from the finalizer.
1. If the events were delivered, mark the original events (with their original finalizers) as
delivered. Otherwise, send the events to the next output.

#### Validation

The configuration validation has a routine for checking the outputs of sources and transforms. This
will be enhanced to scan the topology for connectedness. This can be accomplished by setting up a
map of component outputs, marking each where they were seen (in either/or inputs or outputs), and
then scanning that table for entries that are seen as an output but not as an input.

## Rationale

The unified configuration presents a single visible interface to users, much like we do for
end-to-end acknowledgements, despite very different implementations under the hood. This allows for
less cognitive overhead involved in trying to figure out how to configure a component to redirect
discarded events.

The sink wrapper has the major advantage of requiring no changes to existing sinks to produce the
desired effect. Changing each sink to account for events as they flow through, track completion, and
forward to another output on failure would be a major undertaking. Since finalization already tracks
delivery status, we can instead reuse that handling to track this in a different place.

## Drawbacks

While the above proposal avoids rewriting sinks, it does require an audit and/or rewrite on all
sources and transforms. This will be costly to complete. Further, new sources and transforms will
need to abide by the same rules of operation, making them trickier to write except by copying
existing code.

Ideally, we should have some kind of compile-time enforcement on sources and transforms that would
prevent them from dropping events except through the appropriate interfaces. However, preventing the
invocation of `Drop` is only possible with some crude hacks, and can't be scoped to just one part of
the code while avoiding others.

## Alternatives

### Output Disposition

Instead of the output disposition configuration mechanism described above, we could add a simple
shorthand that is specific to the new error output, such as `on_errors: "drop"`. However, there
are components that already have multiple outputs, or at least the option of having multiple
outputs. Once we enforce that all outputs are handled, these will also become an issue for some
users, which recommends the need for a more generic solution.

### Unhandled Output Configuration

Simplifying the configuration of previously unhandled outputs presents a bit of a Catch-22. As it
stands now, if we just add additional outputs and require all outputs to be handled, we would
require users to explicitly route the unhandled outputs to a new `blackhole` sink or equivalent. So,
we want to add a shorthand to avoid the extra configuration that would require, and potentially the
extra running component internally.

The form of that shorthand presents a bit of a conundrum, though. The configuration for consuming
outputs happens in the `inputs` section of downstream components. If we add the discard
configuration on the outputting components, that could create a configuration conflict of naming a
blackhole output as an input. ie

```toml
[sources.sample1]
outputs.errors = "discard"

[sinks.sample2]
inputs = ["sample1.errors"]
```

If, on the other hand, we add the discard configuration outside of the outputting component, that
isn't much of a simplification over just creating the blackhole component. ie

```toml
[sources.sample1]

[blackhole_outputs]
inputs = ["sample1.errors"]
```

The proposed form was chosen as it allowed for the maximum flexability for configuring the discarded
outputs.

### Function Transforms

The "function" transform type could also be modified to take a new output type containing both the
output buffer and error buffer. This leaves the existing transform framework setup in place, but
increase the amount of work required to complete this project.

### Task Transform

We could potentially use the same kind of stream wrapper as for sinks in the task transform, which
receives and emits a stream of events. However, we would need to also catch the events on the other
side and reattach the original finalizers and drop the copied events, making this process much more
complex than for sinks, where the events don't have another output.

### Sink Wrapper

Instead of a wrapper running in front of sinks, we could pass an output buffer into sinks, much like
transforms, and simply modify them to write failed events to that output. This would have the
benefit of higher performance, as we could likely rework most existing sinks to avoid needing the
clone and would not need the extra finalizer and task to handle forwarding the events. On the other
hand, this performance loss appears to be relatively minor, likely in the low single digit percent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like another area that could benefit from having a copy-on-write data structure for events to avoid a full clone (I know we do have this optimization now for when the event is completely unchanged, which should help here too).

range. This work would also need to done to each sink in turn which, combined with the potential for
substantial modifications to each sink to save copies of the events, would add up to a large
effort. This path could also increase the amount of ongoing maintenance required due to the
increased complexity of the code base.

We could potentially also integrate more closely with the buffer system that already sits in front
of sinks. Disk buffers already have a reasonable amount of overlap with what we’d like to do, since
they store copies of events until they have been positively acknowledged. The downside is that
memory buffers do not work this way and it would be a significant amount of work to make them work
this way. It’s also not clear how much of a performance hit this would involve, and changing how
memory buffers work would invoke that performance penalty between nearly every single component in
every Vector configuration. It’s still possible that some level of conditional integration with
buffers would be a good approach, but that path towards that is less clear than solving the problem
in isolation.

## Outstanding Questions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any cross-cutting concerns between these new, conditional, error outputs and the configuration schema?


- Do we want to automate the process of rewriting configurations that have missing output handlers,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my suggestion above of allowing global configuration of the disposition would allow users to slowly take advantage of this new behavior component-by-component.

or is it adequate to just have a configuration validator?

- What should be done with the existing discarded event metrics? Should they always be emitted, or
only when the output isn't consumed by another component? Do we need another disposition marker to
indicate discards are not to be counted as errors?
Comment on lines +360 to +362
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 if we're forcing it to be handled, I would think they could be counted like we do with other event streams - events/bytes sent/received. Practically, that seems like a hard sell given the work we've just put into the discarded metrics and compatibility with the ui.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree that if we're not dropping them on the floor, then they aren't really discarded... just routed elsewhere.

Although maybe with true discarded event handling, "discarded" metrics would simply be the indication of how many discarded events were sent by that component, and then if the event made it to a component where we just dropped/rejected it, without further daisy chained error outputs... that would be when we emit an actual error metric?

Dunno, I feel like we could argue the definition of this stuff until the cows come home.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah, the relationship between this and the discarded event metric will require some thought.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with tobz here. In the data flow from source to sink, since all the components are sending an event to the next component, the final discard event will be thrown by the last component.
I see the importance of the discarded metrics, it gives the true number of the events. Subsequent metrics from the dlq component helps us understand the lag behind discarded and writes to dlq, and the loss in these events too. We will need indicators, for example: the difference between the number of discarded events versus the discarded events written to the dlq component. These numbers need to be close to zero in a given window.

TL;DR: we need discarded events metrics, and some more.


- Does the list of outputs exposed by the sink need the schema definition passed in like transforms
do?

## Plan Of Attack

Incremental steps to execute this change. These will be converted to issues after the RFC is approved:

- [ ] Add new output configuration settings.
- [ ] Add output disposition and new error output to `SourceSender`.
- [ ] Update sources (`kubernetes_logs`) to discard events through the output buffers.
- [ ] Update transform `OutputBuffer` to add non-send dispositions.
- [ ] Add error output to function transform.
- [ ] Add error output to sync transform.
- [ ] Convert function transforms to synchronous transforms.
- [ ] Update sync transforms to discard events through the output buffers.
- [ ] Make runtime transforms a primitive transform type.
- [ ] Rewrite task transforms in terms of other transform types.
- [ ] Drop task transform type.
- [ ] Verify configured output connectedness.
- [ ] Eliminate `Dropped` event status.

## Future Improvements

We could potentially create a custom lint using [the dylint
framework](https://www.trailofbits.com/post/write-rust-lints-without-forking-clippy) to prevent code
that drops events in sources and transforms (and indirectly elsewhere) from compiling.

We may want to consider lifing the option for turning unhandled output enforcement into a hard error
into a more generic switch to turn all deprecations into errors. This is beyond the scope of this
proposal, as the mechanism for doing so will vary greatly across the different locations where
deprecations are present.
Loading