Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/testutils/xds/e2e/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func DefaultBootstrapContents(t *testing.T, nodeID, serverURI string) []byte {
bs, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []byte(fmt.Sprintf(`[{
"server_uri": "passthrough:///%s",
"channel_creds": [{"type": "insecure"}]
"channel_creds": [{"type": "insecure"}],
"server_features": ["trusted_xds_server"]
}]`, serverURI)),
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
CertificateProviders: cpc,
Expand Down
3 changes: 3 additions & 0 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ type BackendOptions struct {
HealthStatus v3corepb.HealthStatus
// Weight sets the backend weight. Defaults to 1.
Weight uint32
// Hostname sets the endpoint hostname for authority rewriting.
Hostname string
// Metadata sets the LB endpoint metadata (envoy.lb FilterMetadata field).
// See https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/core/v3/base.proto#envoy-v3-api-msg-config-core-v3-metadata
Metadata map[string]any
Expand Down Expand Up @@ -721,6 +723,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Ports[0]},
},
}},
Hostname: b.Hostname,
AdditionalAddresses: additionalAddresses,
}},
HealthStatus: b.HealthStatus,
Expand Down
11 changes: 8 additions & 3 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,14 @@ type CallHdr struct {

DoneFunc func() // called when the stream is finished

// Authority is used to explicitly override the `:authority` header. If set,
// this value takes precedence over the Host field and will be used as the
// value for the `:authority` header.
// Authority is used to explicitly override the `:authority` header.
//
// This value comes from one of two sources:
// 1. The `CallAuthority` call option, if specified by the user.
// 2. An override provided by the LB picker (e.g. xDS authority rewriting).
//
// The `CallAuthority` call option always takes precedence over the LB
// picker override.
Authority string
}

Expand Down
2 changes: 1 addition & 1 deletion internal/xds/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
ServiceName: serviceName,
EnableLRS: true,
})
lrsServerCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: fmt.Sprintf("passthrough:///%s", mgmtServer.Address)})
lrsServerCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: fmt.Sprintf("passthrough:///%s", mgmtServer.Address), ServerFeatures: []string{"trusted_xds_server"}})
if err != nil {
t.Fatalf("Failed to create LRS server config for testing: %v", err)
}
Expand Down
5 changes: 5 additions & 0 deletions internal/xds/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -420,6 +421,7 @@ type scWrapper struct {
// locality needs to be atomic because it can be updated while being read by
// the picker.
locality atomic.Pointer[clients.Locality]
hostname string
}

func (scw *scWrapper) updateLocalityID(lID clients.Locality) {
Expand All @@ -442,6 +444,9 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
}
var sc balancer.SubConn
scw := &scWrapper{}
if len(addrs) > 0 {
scw.hostname = xdsresource.Hostname(addrs[0])
}
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state, oldListener)
Expand Down
25 changes: 17 additions & 8 deletions internal/xds/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
xdsinternal "google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -145,6 +146,14 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// If locality ID isn't found in the wrapper, an empty locality ID will
// be used.
lID = scw.localityID()

if scw.hostname != "" && autoHostRewriteEnabled(info.Ctx) {
if pr.Metadata == nil {
pr.Metadata = metadata.Pairs(":authority", scw.hostname)
} else {
pr.Metadata.Set(":authority", scw.hostname)
}
}
}

if err != nil {
Expand Down Expand Up @@ -199,20 +208,20 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// route's autoHostRewrite in the RPC context.
type autoHostRewriteKey struct{}

// autoHostRewrite retrieves the autoHostRewrite value from the provided context.
func autoHostRewrite(ctx context.Context) bool {
// autoHostRewriteEnabled retrieves the autoHostRewrite value from the provided context.
func autoHostRewriteEnabled(ctx context.Context) bool {
v, _ := ctx.Value(autoHostRewriteKey{}).(bool)
return v
}

// AutoHostRewriteForTesting returns the value of autoHostRewrite field;
// AutoHostRewriteEnabledForTesting returns the value of autoHostRewrite field;
// to be used for testing only.
func AutoHostRewriteForTesting(ctx context.Context) bool {
return autoHostRewrite(ctx)
func AutoHostRewriteEnabledForTesting(ctx context.Context) bool {
return autoHostRewriteEnabled(ctx)
}

// SetAutoHostRewrite adds the autoHostRewrite value to the context for
// EnableAutoHostRewrite adds the autoHostRewrite value to the context for
// the xds_cluster_impl LB policy to pick.
func SetAutoHostRewrite(ctx context.Context, autohostRewrite bool) context.Context {
return context.WithValue(ctx, autoHostRewriteKey{}, autohostRewrite)
func EnableAutoHostRewrite(ctx context.Context) context.Context {
return context.WithValue(ctx, autoHostRewriteKey{}, true)
}
Loading