Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,5 @@ extensions/upstreams/tcp @ggreenway @mattklein123
/contrib/qat/ @giantcroc @soulxu
/contrib/generic_proxy/ @wbpcode @UNOWNED
/contrib/tap_sinks/ @coolg92003 @yiyibaoguo
/contrib/peak_ewma/filters/http/ @rroblak @UNOWNED
/contrib/peak_ewma/load_balancing_policies/ @rroblak @UNOWNED
Comment on lines +433 to +434
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may as well just own /contrib/peak_ewma.

12 changes: 12 additions & 0 deletions api/contrib/envoy/extensions/filters/http/peak_ewma/v3alpha/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"@com_github_cncf_xds//udpa/annotations:pkg",
"@com_github_cncf_xds//xds/annotations/v3:pkg",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package envoy.extensions.filters.http.peak_ewma.v3alpha;

import "xds/annotations/v3/status.proto";

import "udpa/annotations/status.proto";

option java_package = "io.envoyproxy.envoy.extensions.filters.http.peak_ewma.v3alpha";
option java_outer_classname = "PeakEwmaProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/filters/http/peak_ewma/v3alpha";
option (udpa.annotations.file_status).package_version_status = ACTIVE;
option (xds.annotations.v3.file_status).work_in_progress = true;

// [#protodoc-title: Peak EWMA HTTP Filter]
// Configuration for the Peak EWMA HTTP filter.
// This filter measures request RTT and provides timing data to the Peak EWMA load balancer.

// [#extension: envoy.filters.http.peak_ewma]
message PeakEwmaConfig {
option (xds.annotations.v3.message_status).work_in_progress = true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"@com_github_cncf_xds//udpa/annotations:pkg",
"@com_github_cncf_xds//xds/annotations/v3:pkg",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
syntax = "proto3";

package envoy.extensions.load_balancing_policies.peak_ewma.v3alpha;

import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

import "xds/annotations/v3/status.proto";

import "udpa/annotations/status.proto";

option java_package = "io.envoyproxy.envoy.extensions.load_balancing_policies.peak_ewma.v3alpha";
option java_outer_classname = "PeakEwmaProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/load_balancing_policies/peak_ewma/v3alpha";
option (udpa.annotations.file_status).package_version_status = ACTIVE;
option (xds.annotations.v3.file_status).work_in_progress = true;

// [#protodoc-title: Peak EWMA Load Balancer Configuration]
// Configuration for the Peak EWMA (Exponentially Weighted Moving Average) load balancing policy.
//
// This policy implements a latency-aware variant of the Power of Two Choices (P2C) algorithm.
// It selects the best host from two randomly chosen candidates based on a cost function:
// `Cost = RTT_peak_ewma * (active_requests + 1)`.
//
// The Peak EWMA algorithm is designed to:
// - Automatically route traffic away from slow or overloaded hosts
// - Adapt to changing host performance without manual configuration
// - Provide low-latency request routing with O(1) host selection complexity
// - Work effectively in heterogeneous environments with varying host capabilities
//
// RTT measurements are automatically collected from HTTP request timing and used to update
// the EWMA for each host. This provides real-time performance feedback for routing decisions.
//
// Important: This load balancer only considers latency and load when selecting hosts. It does
// not handle host health or error responses - these should be managed by Envoy's health checking
// and outlier detection systems. Peak EWMA operates on the pool of healthy hosts as determined
// by these other systems.
//
// [#extension: envoy.load_balancing_policies.peak_ewma]
// [#next-free-field: 6]
message PeakEwma {
option (xds.annotations.v3.message_status).work_in_progress = true;

// The decay time for the RTT EWMA calculation. This specifies the time window over which
// latency observations are considered relevant. After this duration, older measurements
// have exponentially decayed to half their original weight.
//
// The Peak EWMA algorithm uses this to calculate the EWMA time constant (tau):
// `tau = decay_time_nanos`, and the EWMA reaches its half-life after `tau * ln(2)`.
//
// This parameter is more intuitive than a raw smoothing factor as it directly relates
// to the time duration over which you want to observe latency trends.
//
// If not specified, defaults to 10 seconds (following Finagle's default).
google.protobuf.Duration decay_time = 1;

// The interval at which EWMA data is aggregated from worker threads to the main thread.
// This controls the frequency of cross-thread synchronization for the per-thread aggregation model.
//
// A shorter interval provides more up-to-date cross-worker information but increases
// synchronization overhead. A longer interval reduces overhead but may cause workers
// to operate with staler information about other workers' latency observations.
//
// If not specified, defaults to 100 milliseconds.
google.protobuf.Duration aggregation_interval = 2;

// Maximum RTT samples to buffer per host per worker thread before overwriting oldest samples.
// This bounds memory usage while allowing burst traffic handling.
//
// Buffer capacity formula: max_samples_per_host / aggregation_interval = RPS capacity per host per worker
// Memory formula: max_samples_per_host × num_hosts × num_workers × 16 bytes
// Memory usage per worker = max_samples_per_host × num_hosts × 16 bytes
//
// If not specified, defaults to 1,000 samples per host per worker.
google.protobuf.UInt32Value max_samples_per_host = 3;

// Default RTT value to use for hosts that don't have measured RTT yet.
// This provides a baseline for cost calculations until actual measurements are available.
//
// This value is critical for initial load balancing decisions when hosts first join
// the cluster or when RTT measurements are temporarily unavailable. It should reflect
// the expected baseline latency for your environment:
//
// If not specified, defaults to 10 milliseconds.
google.protobuf.Duration default_rtt = 4;

// Penalty cost assigned to hosts that cannot provide valid cost calculations.
// This is used when a host has no RTT measurements or is unhealthy, ensuring
// the Power of Two Choices algorithm will prefer hosts with known performance.
//
// You probably should not change this value.
//
// The penalty should be significantly higher than any realistic RTT-based cost
// to ensure hosts with unknown performance are strongly deprioritized while
// still allowing them to receive traffic if no better alternatives exist.
//
// If not specified, defaults to 1,000,000.0 (1 million).
google.protobuf.DoubleValue penalty_value = 5;
}
2 changes: 2 additions & 0 deletions api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ proto_library(
"//contrib/envoy/extensions/filters/http/dynamo/v3:pkg",
"//contrib/envoy/extensions/filters/http/golang/v3alpha:pkg",
"//contrib/envoy/extensions/filters/http/language/v3alpha:pkg",
"//contrib/envoy/extensions/filters/http/peak_ewma/v3alpha:pkg",
"//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/client_ssl_auth/v3:pkg",
"//contrib/envoy/extensions/filters/network/generic_proxy/codecs/kafka/v3:pkg",
Expand All @@ -28,6 +29,7 @@ proto_library(
"//contrib/envoy/extensions/filters/network/sip_proxy/router/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/sip_proxy/tra/v3alpha:pkg",
"//contrib/envoy/extensions/filters/network/sip_proxy/v3alpha:pkg",
"//contrib/envoy/extensions/load_balancing_policies/peak_ewma/v3alpha:pkg",
"//contrib/envoy/extensions/matching/input_matchers/hyperscan/v3alpha:pkg",
"//contrib/envoy/extensions/network/connection_balance/dlb/v3alpha:pkg",
"//contrib/envoy/extensions/private_key_providers/cryptomb/v3alpha:pkg",
Expand Down
6 changes: 6 additions & 0 deletions contrib/contrib_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ CONTRIB_EXTENSIONS = {
"envoy.filters.http.dynamo": "//contrib/dynamo/filters/http/source:config",
"envoy.filters.http.golang": "//contrib/golang/filters/http/source:config",
"envoy.filters.http.language": "//contrib/language/filters/http/source:config_lib",
"envoy.filters.http.peak_ewma": "//contrib/peak_ewma/filters/http/source:config",
"envoy.filters.http.sxg": "//contrib/sxg/filters/http/source:config",

#
Expand Down Expand Up @@ -77,6 +78,11 @@ CONTRIB_EXTENSIONS = {
#
"envoy.generic_proxy.codecs.kafka": "//contrib/generic_proxy/filters/network/source/codecs/kafka:config",

#
# Load balancing policies
#
"envoy.load_balancing_policies.peak_ewma": "//contrib/peak_ewma/load_balancing_policies/source:config",

#
# xDS delegates
#
Expand Down
14 changes: 14 additions & 0 deletions contrib/extensions_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,17 @@ envoy.upstreams.http.tcp.golang:
- envoy.upstreams
security_posture: requires_trusted_downstream_and_upstream
status: alpha
envoy.filters.http.peak_ewma:
categories:
- envoy.filters.http
security_posture: requires_trusted_downstream_and_upstream
status: alpha
type_urls:
- envoy.extensions.filters.http.peak_ewma.v3alpha.PeakEwmaConfig
envoy.load_balancing_policies.peak_ewma:
categories:
- envoy.load_balancing_policies
security_posture: requires_trusted_downstream_and_upstream
status: alpha
type_urls:
- envoy.extensions.load_balancing_policies.peak_ewma.v3alpha.PeakEwma
40 changes: 40 additions & 0 deletions contrib/peak_ewma/filters/http/source/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_contrib_extension",
"envoy_cc_library",
"envoy_contrib_package",
)

licenses(["notice"]) # Apache 2

envoy_contrib_package()

envoy_cc_library(
name = "peak_ewma_http_filter_lib",
srcs = [
"peak_ewma_filter.cc",
],
hdrs = [
"peak_ewma_filter.h",
],
repository = "@envoy",
deps = [
"//contrib/peak_ewma/load_balancing_policies/source:peak_ewma_lb_lib",
"//envoy/http:filter_interface",
"//source/common/common:utility_lib",
"//source/common/http:utility_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
],
)

envoy_cc_contrib_extension(
name = "config",
srcs = ["peak_ewma_filter_config.cc"],
hdrs = ["peak_ewma_filter_config.h"],
deps = [
":peak_ewma_http_filter_lib",
"//envoy/registry",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//contrib/envoy/extensions/filters/http/peak_ewma/v3alpha:pkg_cc_proto",
],
)
56 changes: 56 additions & 0 deletions contrib/peak_ewma/filters/http/source/peak_ewma_filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "contrib/peak_ewma/filters/http/source/peak_ewma_filter.h"

#include "envoy/stream_info/stream_info.h"

#include "source/common/common/utility.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace PeakEwma {

Http::FilterHeadersStatus PeakEwmaRttFilter::encodeHeaders(Http::ResponseHeaderMap&, bool) {
// Get upstream host from stream info.
const StreamInfo::StreamInfo& stream_info = encoder_callbacks_->streamInfo();
const auto& upstream_info = stream_info.upstreamInfo();

if (upstream_info && upstream_info->upstreamHost()) {
const auto& host_description = upstream_info->upstreamHost();

// Get host-attached Peak EWMA data for RTT sample storage.
auto peak_data_opt =
host_description
->typedLbPolicyData<LoadBalancingPolicies::PeakEwma::PeakEwmaHostLbPolicyData>();
if (peak_data_opt.has_value()) {
LoadBalancingPolicies::PeakEwma::PeakEwmaHostLbPolicyData& peak_data = peak_data_opt.ref();

// Calculate TTFB RTT using UpstreamTiming data (more accurate than response time).
const auto& upstream_timing = upstream_info->upstreamTiming();
if (upstream_timing.first_upstream_tx_byte_sent_ &&
upstream_timing.first_upstream_rx_byte_received_) {
auto ttfb_rtt = std::chrono::duration_cast<std::chrono::milliseconds>(
*upstream_timing.first_upstream_rx_byte_received_ -
*upstream_timing.first_upstream_tx_byte_sent_);

// Record RTT sample in host-attached atomic storage.
uint64_t timestamp_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(
decoder_callbacks_->dispatcher().timeSource().monotonicTime().time_since_epoch())
.count();

peak_data.recordRttSample(static_cast<double>(ttfb_rtt.count()), timestamp_ns);

// RTT sample recorded successfully.
}
} else {
// Host missing Peak EWMA data - should not happen after initialization.
}
}

return Http::FilterHeadersStatus::Continue;
}

} // namespace PeakEwma
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
25 changes: 25 additions & 0 deletions contrib/peak_ewma/filters/http/source/peak_ewma_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "envoy/http/filter.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/extensions/filters/http/common/pass_through_filter.h"

#include "contrib/peak_ewma/load_balancing_policies/source/peak_ewma_lb.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace PeakEwma {

class PeakEwmaRttFilter : public Http::PassThroughFilter {
public:
// Override encode headers to capture RTT
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;
};

} // namespace PeakEwma
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
25 changes: 25 additions & 0 deletions contrib/peak_ewma/filters/http/source/peak_ewma_filter_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "contrib/peak_ewma/filters/http/source/peak_ewma_filter_config.h"

#include "envoy/registry/registry.h"

#include "contrib/peak_ewma/filters/http/source/peak_ewma_filter.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace PeakEwma {

Http::FilterFactoryCb PeakEwmaFilterConfigFactory::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::peak_ewma::v3alpha::PeakEwmaConfig&, const std::string&,
Server::Configuration::FactoryContext&) {
return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<PeakEwmaRttFilter>());
};
}

REGISTER_FACTORY(PeakEwmaFilterConfigFactory, Server::Configuration::NamedHttpFilterConfigFactory);

} // namespace PeakEwma
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
28 changes: 28 additions & 0 deletions contrib/peak_ewma/filters/http/source/peak_ewma_filter_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include "source/extensions/filters/http/common/factory_base.h"

#include "contrib/envoy/extensions/filters/http/peak_ewma/v3alpha/peak_ewma.pb.h"
#include "contrib/envoy/extensions/filters/http/peak_ewma/v3alpha/peak_ewma.pb.validate.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace PeakEwma {

class PeakEwmaFilterConfigFactory
: public Extensions::HttpFilters::Common::FactoryBase<
envoy::extensions::filters::http::peak_ewma::v3alpha::PeakEwmaConfig> {
public:
PeakEwmaFilterConfigFactory() : FactoryBase("envoy.filters.http.peak_ewma") {}

private:
Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::peak_ewma::v3alpha::PeakEwmaConfig& proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
};

} // namespace PeakEwma
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading
Loading