Skip to content
369 changes: 369 additions & 0 deletions rfcs/2022-08-25-12217-handling-discarded-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,369 @@
# RFC 12217 - 2022-08-25 - Handling Discarded Events

We want to give Vector users the assurance that events are never lost unexpectedly. There are
currently many circumstances in which events are simply discarded without recourse for users of
Vector to recover those events. This RFC presents a framework for handling those discarded events in
a uniform fashion in all components.

## Context

- [Support dead letter queue sources](https://github.com/vectordotdev/vector/issues/1772)
- [Vector sources acknowledge discarded events](https://github.com/vectordotdev/vector/issues/12217)
- [Reroute discarded events from throttle transform](https://github.com/vectordotdev/vector/issues/13549)
- Sink Error Handling RFC (internal document)

## Scope

### In scope

- Handling of discarded and errored events in sources, transforms, and sinks.

### Out of scope

- Handling of discarded data in sources before it is translated into events.
- Handling of discarded sink batch buffers.
Comment thread
tobz marked this conversation as resolved.
Outdated
Comment thread
lukesteensen marked this conversation as resolved.
Outdated

## Pain

Vector has a number of ways in which components may drop events due that are outside of the
operator's control:

- Transform processing
- Sink encoding failure
- Sink partitioning failure
- Failure to write to disk buffer
Comment thread
tobz marked this conversation as resolved.
- Incompatible event type
- Unhandled metric value type
- Unwired component output
Comment thread
lukesteensen marked this conversation as resolved.
Outdated
- Failure to send to next component

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this type of failure possible in cases other than a non-graceful shutdown (i.e. panic)?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, at the end of a graceful shutdown where components are being forcefully terminated. I think we could still send the data to the discard output in hopes it is still connected, and/or mark them as failed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're already forcefully terminating components then it seems unlikely that we'd be able to successfully send the events anywhere. I ask mostly because this would be a failure case that'd apply to every source and transform, even those that may not have any other way of dropping event, and I'd rather focus this feature around the components where there's a more meaningful type of failure possible.


This leads to the potential for lost events and even silently dropped events. Users want to be able
to capture discarded events at various points in the topology in order to forward them to fail-safe
destinations for diagnosis.
Comment thread
spencergilbert marked this conversation as resolved.

## Proposal

There are three parts to the proposed solution:

1. Add a new standardized output to all components that may discard events.
2. Update configuration validation to require that all outputs are handled.
3. Add a new configuration for discarding or rejecting outputs.

### User Experience

#### Add Discarded Event Output

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the format of the events flowing to the errors output? Will we follow the precedent set by the remap transform of wrapping the event with some additional metadata about the failure? I think it'd be good to call that out explicitly in here.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should take advantage of the new metadata namespacing too? That feature will only exist for logs, but we could extend it to metrics and traces. Also will we offer any guarantees about the metadata, if we do include it? I could see users starting to route on it based on the type of failure. I'd suggest we push back on that clearly document that the metadata is opaque and just for human consumption for now.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To validate your intuition, I would be tempted to route based on the metadata from an error. Hopefully a specific example will help:

Kafka sinks can throw errors when the broker sees a message that's too large. I'm finding it difficult to know the exact message size before sending, so it's hard to build a filter upstream. I could see handling this situation by routing the error. I might route to a console log or split the message and try again.

At a minimum, I could also envision using the error data to know whether dropping the message is okay or if I need to trigger alerts.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit out of scope for this RFC, but very much related -- adding an event output for events that successfully went through a sink would allow customers to provide better observability into events that were successfully delivered. For example, we would like to extract metrics from delivered events so we can show our customers how many events we successfully delivered. As it is now, we can only do that before the sink.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to file an issue about this, but as worded, it sounds like what you're looking for are the existing component metrics such as component_sent_events_total, which sinks do emit.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might clarify what I meant: #14708 (comment)


We will introduce a new output to all components that would otherwise discard events, named
`discards`.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this relate to the existing errors output from the remap transform?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not exactly sure. I am of the mind that it should be split between errors and discards (ie abort), but that introduces a significant breaking change. If, on the other hand, we call the new output errors, this works for remap but not everything that is discarded is an error.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nevermind, the remap transform already has a dropped output that is what we need, so I think all the rest of this RFC is about errors in particular and not generic "discards".


#### Validate Handling All Outputs

Additionally, we will enhance the configuration validation to determine if all outputs are
handled. This will provide notification of the new discard outputs and ensure users are aware of the
new feature.

In order to avoid breaking existing configurations, this validation will initially produce a
deprecation warning, notifying the user that an output is present but unhandled. In a later version,
this deprecation will be turned into a hard error, preventing the acceptance of a configuration with
unhandled outputs.

We will also add a new command-line option and environment variable to opt into the stricter
validation for users that want the additional assurance this provides.

#### Simplify Discarded Output Handling

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the names used here awkward, but admittedly don't have any suggestions.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative name for disposition could be on_error (or on_discard).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be, but only as applies to the error output. The proposed disposition applies to all named outputs, including from the datadog_agent source and the remap transform.


This enforced handling will create considerable complication for most users, as well as the
potential of increased overhead and reduced performance if extra components have to be configured
just to handle the outputs that now need to be connected somewhere. To reduce this overhead, we will
add a new optional configuration setting to indicate the internal disposition of each output:
`outputs.NAME.disposition`. This will have two non-default values, `"drop"` to mark all events going

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see this config applying to other outputs in the future? It sounds like we're only talking about a single hardcoded discards output, so I'm not sure that we need to nest the configuration in such a way that implies it could be applied to other outputs as well.

What's the benefit of outputs.NAME.disposition vs something like on_discard = {send,drop,reject}?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some sources, like datadog_agent IIRC, that already have multiple outputs. If users do not want to wire up one of those outputs and we enforce that all outputs be handled, then now they need a blackhole for that output as well because we special-cased the discard handling. With the generic handling, all cases are covered.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I didn't consider the effect on those sources. I am still a little bit hesitant to have configuration that refers to a named output that won't actually exist (when configured to drop or reject), since that could result in some confusion. I'm not sure I have a great alternative though, aside from introducing separate config on those sources that controls which events types are emitted.

to that output as delivered and then drop them, and `"reject"` to mark them as having failed. If
this setting is not present and a component has an unhandled output, a default behavior of `"drop"`
will be assumed, which matches the current behavior.
Comment thread
spencergilbert marked this conversation as resolved.
Outdated

So, to configure the new discard output as discarding events with an error result, users would add the
following:

```toml
[transforms.example]
type = "remap"
outputs.discards.disposition = "reject"
```

### Implementation

#### Configuration

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could have global configuration for the disposition as well? That might simplify it for simple use-cases. Like setting disposition = "drop" at the global level and have it apply to all components.

Could also do the same at the component level for those that already have multiple outputs in addition to letting them configure the disposition per-output.


The new options will show up in all of the enclosing configuration option structures:
Comment thread
lukesteensen marked this conversation as resolved.
Outdated

```rust
struct SourceOuter {

/// The configuration of all named outputs.
outputs: HashMap<String, OutputConfig>,
}

struct TransformOuter {

/// The configuration of all named outputs.
outputs: HashMap<String, OutputConfig>,
}

struct SinkOuter {

/// The configuration of all named outputs.
outputs: HashMap<String, OutputConfig>,
}

struct OutputConfig {
#[serde(default)]
disposition: OutputDisposition,
}

enum OutputDisposition {
/// Default behavior, send the event on to further components that name this as an input.
Send,
/// Discard the event and mark it is as delivered.
Drop,
/// Discard the event and mark it as failed.
Reject,
Comment on lines +145 to +150

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These make sense individually, but I find it... let's call it "weird", that I could have a component which selects another component's discards output but then that upstream component could basically just disable the sending of those discarded events even though I've named that output specifically?

If that's not the intention, then maybe this is just a docs problem, but that was my initial impression seeing these three distinct options.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will be something caught by the configuration verification stage, that "handled" outputs are not named as an input. This is an issue I discussed later in the document in "Alternatives" as not having a great solution IMO.

}
```

#### Sources
Comment thread
bruceg marked this conversation as resolved.

Sources already have facilities to handle a map of named outputs in `SourceSender`. The standard
builder will be extended to add a new standard output to accept discarded events. The `Inner` will be
rewritten to handle either discarding events or passing them on to the sender, and the send
functions will be extended to take the appropriate action based on this disposition.
Comment thread
tobz marked this conversation as resolved.

```rust
struct Inner {
disposition: InnerDisposition,
output: String,
lag_time: Option<Histogram>,
}

enum InnerDisposition {
Drop,
Reject,
Send(LimitedSender<EventArray>),
}
```

#### Transforms

##### "Function" Transforms

Function transforms are the simplest type, taking an event as input and outputting into a single
`OutputBuffer`. This is a strict subset of the synchronous transform type, and adding the necessary
output buffer type would make this type effectively equivalent to synchronous transforms. As such,
all function transforms that may discard events will be rewritten to the synchronous transform type.

##### "Synchronous" Transforms

Synchronous transforms already have support for named outputs. These will be configured with a new
named `discards` output. The output buffer `TransformOputputsBuf` will be extended with additional
convenience methods to making discarding events more straightforward.

##### "Task" Transforms

Each of the current task transforms will be refactored into either a simpler synchronous transform, which
will be handled as above, or into a more specialized `RuntimeTransform` (`aggregate` and `lua` version 2). The
runtime transform type runs inside of a task which, in turn, takes a transform output buffer as an
input parameter. That framework will be rewritten to accept named outputs, with the associated
convenience methods for handling discards. Further, since it is the only instance of a task
transform, that framework will replace the task transform, allowing us to build the outputs in the
topology builder.

- `aggregate` => runtime (flushes on timer tick)
- `aws_ec2_metadata` => synchronous
- `dedupe` => synchronous
- `lua` version 1 => synchronous
- `lua` version 2 => runtime
- `reduce` => runtime (flushes on timer tick)
- `tag_cardinality_limit` => synchronous
- `throttle` => synchronous (uses timers, but only to update state)

##### Output Buffer

The output from transforms is fed into an `OutputBuffer` that is currently a newtype wrapper for a
simple vector of event arrays. The additional output dispositions will be added as enum states:

```rust
pub enum OutputBuffer {
Drop,
Reject,
Store(Vec<EventArray>),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Store here is for events that are going to passed to a downstream component?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I'll note that.

}
```

#### Sinks

Sinks do not have "outputs" as such. In order to add a discard output, we will need to wrap the
incoming events with an optional stream adapter. It will be built and added during topology
building, depending on the presence of a discard output handler. This handler will hold on to a copy
of the events and forward them on to the discard output when needed:

1. Clone the input event array.
1. Apply a new finalizer to the cloned events.
1. Send the events to the sink.
1. Wait on the reception of the signal from the finalizer.
1. If the events were delivered, mark the original events (with their original finalizers) as
delivered. Otherwise, send the events to the next output.

#### Validation

The configuration validation has a routine for checking the outputs of sources and transforms. This
will be enhanced to scan the topology for connectedness. This can be accomplished by setting up a
map of component outputs, marking each where they were seen (in either/or inputs or outputs), and
then scanning that table for entries that are seen as an output but not as an input.

## Rationale

The unified configuration presents a single visible interface to users, much like we do for
end-to-end acknowledgements, despite very different implementations under the hood. This allows for
less cognitive overhead involved in trying to figure out how to configure a component to redirect
discarded events.

The sink wrapper has the major advantage of requiring no changes to existing sinks to produce the
desired effect. Changing each sink to account for events as they flow through, track completion, and
forward to another output on failure would be a major undertaking. Since finalization already tracks
delivery status, we can instead reuse that handling to track this in a different place.

## Drawbacks

While the above proposal avoids rewriting sinks, it does require an audit and/or rewrite on all
sources and transforms. This will be costly to complete. Further, new sources and transforms will
need to abide by the same rules of operation, making them trickier to write except by copying
existing code.

Ideally, we should have some kind of compile-time enforcement on sources and transforms that would
prevent them from dropping events except through the appropriate interfaces. However, preventing the
invocation of `Drop` is only possible with some crude hacks, and can't be scoped to just one part of
the code while avoiding others.

## Alternatives

### Output Disposition

Instead of the output disposition configuration mechanism described above, we could add a simple
shorthand that is specific to the new discard output, such as `on_discard: "drop"`. However, there
are components that already have multiple outputs, or at least the option of having multiple
outputs. Once we enforce that all outputs are handled, these will also become an issue for some
users, which recommends the need for a more generic solution.

### Unhandled Output Configuration

Simplifying the configuration of previously unhandled outputs presents a bit of a Catch-22. Without
any help from Vector, this would require users to explicitly route the unhandled outputs to a new
`blackhole` sink. So, we want to add a shorthand to avoid the extra configuration that would
require, and potentially the extra running component internally.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section confuses me a little bit.

Why would a sink be needed if operators can control the output behavior at the component itself? Which is to say, if a source can configure its discarded events output to drop or reject... why would we ever set up a downstream component to collect those events unless it was actually going to do something with them?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "without any help from Vector" is trying to say that we are not helping sinks by allowing a source to configure its output to drop or reject. I will try to reword to clarify.


The form of that shorthand presents a bit of a conundrum, though. The configuration for consuming
outputs happens in the `inputs` section of downstream components. If we add the discard
configuration on the outputting components, that could create a configuration conflict of naming a
blackhole output as an input. ie

```toml
[sources.sample1]
outputs.error = "blackhole"

[sinks.sample2]
inputs = ["sample1.error"]
```

If, on the other hand, we add the discard configuration outside of the outputting component, that
isn't much of a simplification over just creating the blackhole component. ie

```toml
[sources.sample1]

[blackhole_outputs]
inputs = ["sample1.error"]
```

The proposed form was chosen as it allowed for the maximum flexability for configuring the discarded
outputs.

### Function Transforms

The "function" transform type could also be modified to take a new output type containing both the
output buffer and error buffer. This leaves the existing transform framework setup in place, but
increase the amount of work required to complete this project.

### Task Transform

We could potentially use the same kind of stream wrapper as for sinks in the task transform, which
receives and emits a stream of events. However, we would need to also catch the events on the other
side and reattach the original finalizers and drop the copied events, making this process much more
complex than for sinks, where the events don't have another output.

### Sink Wrapper

Instead of a wrapper running in front of sinks, we could pass an output buffer into sinks, much like
transforms, and simply modify them to write failed events to that output. This would have the
benefit of higher performance, as we could likely rework most existing sinks to avoid needing the
clone and would not need the extra finalizer and task to handle forwarding the events. On the other
hand, this performance loss appears to be relatively minor, likely in the low single digit percent

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like another area that could benefit from having a copy-on-write data structure for events to avoid a full clone (I know we do have this optimization now for when the event is completely unchanged, which should help here too).

range. This work would also need to done to each sink in turn which, combined with the potential for
substantial modifications to each sink to save copies of the events, would add up to a large
effort. This path could also increase the amount of ongoing maintenance required due to the
increased complexity of the code base.

We could potentially also integrate more closely with the buffer system that already sits in front
of sinks. Disk buffers already have a reasonable amount of overlap with what we’d like to do, since
they store copies of events until they have been positively acknowledged. The downside is that
memory buffers do not work this way and it would be a significant amount of work to make them work
this way. It’s also not clear how much of a performance hit this would involve, and changing how
memory buffers work would invoke that performance penalty between nearly every single component in
every Vector configuration. It’s still possible that some level of conditional integration with
buffers would be a good approach, but that path towards that is less clear than solving the problem
in isolation.

## Outstanding Questions

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any cross-cutting concerns between these new, conditional, error outputs and the configuration schema?


- Do we want to automate the process of rewriting configurations that have missing output handlers,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my suggestion above of allowing global configuration of the disposition would allow users to slowly take advantage of this new behavior component-by-component.

or can we just leave the behavior as producing an error?
Comment thread
tobz marked this conversation as resolved.
Outdated

- Should the option for turning unhandled output enforcement into a hard error be written as a more
generic switch to turn _all_ deprecations into errors?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! 👍

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This feels similar to firewall/WAF products having "advisory" and "blocking" mode. As a learner, it's nice to get things up and running and be warned of dangerous configurations. Once I'm comfortable and depending on the product, I want a "strict" mode that more forcefully calls things out.


- Do we need separate outputs to distinguish between discards and errors?
Comment thread
bruceg marked this conversation as resolved.
Outdated

- Do we want to have a third value for `output.X.disposition` to allow for marking events as either
permanently failed (ie rejected) vs temporarily failed (ie errored)?
Comment thread
bruceg marked this conversation as resolved.
Outdated

- What should be done with the existing discarded event metrics? Should they always be emitted, or
only when the output isn't consumed by another component? Do we need another disposition marker to
indicate discards are not to be counted as errors?
Comment on lines +360 to +362

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 if we're forcing it to be handled, I would think they could be counted like we do with other event streams - events/bytes sent/received. Practically, that seems like a hard sell given the work we've just put into the discarded metrics and compatibility with the ui.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree that if we're not dropping them on the floor, then they aren't really discarded... just routed elsewhere.

Although maybe with true discarded event handling, "discarded" metrics would simply be the indication of how many discarded events were sent by that component, and then if the event made it to a component where we just dropped/rejected it, without further daisy chained error outputs... that would be when we emit an actual error metric?

Dunno, I feel like we could argue the definition of this stuff until the cows come home.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah, the relationship between this and the discarded event metric will require some thought.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with tobz here. In the data flow from source to sink, since all the components are sending an event to the next component, the final discard event will be thrown by the last component.
I see the importance of the discarded metrics, it gives the true number of the events. Subsequent metrics from the dlq component helps us understand the lag behind discarded and writes to dlq, and the loss in these events too. We will need indicators, for example: the difference between the number of discarded events versus the discarded events written to the dlq component. These numbers need to be close to zero in a given window.

TL;DR: we need discarded events metrics, and some more.


## Plan Of Attack

Incremental steps to execute this change. These will be converted to issues after the RFC is approved:

- [ ] Add new output configuration settings.
- [ ] Add output disposition and new discard output to `SourceSender`.
- [ ] Update sources to discard events through the output buffers.
- [ ] Update transform `OutputBuffer` to add non-send dispositions.
- [ ] Add discard output to function transform.
- [ ] Add discard output to sync transform.
- [ ] Convert function transforms to synchronous transforms.
- [ ] Update sync transforms to discard events through the output buffers.
- [ ] Make runtime transforms a primitive transform type.
- [ ] Rewrite task transforms in terms of other transform types.
- [ ] Drop task transform type.
- [ ] Verify configured output connectedness.
- [ ] Eliminate `Dropped` event status.

## Future Improvements

We could potentially create a custom lint using [the dylint
framework](https://www.trailofbits.com/post/write-rust-lints-without-forking-clippy) to prevent code
that drops events in sources and transforms (and indirectly elsewhere) from compiling.