Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions balancer/ringhash/ringhash_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,6 @@ func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, stri
bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address)

// Create an xDS resolver with the above bootstrap configuration.
if internal.NewXDSResolverWithConfigForTesting == nil {
t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
}
r, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion internal/testutils/xds/e2e/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func DefaultBootstrapContents(t *testing.T, nodeID, serverURI string) []byte {
bs, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []byte(fmt.Sprintf(`[{
"server_uri": "passthrough:///%s",
"channel_creds": [{"type": "insecure"}]
"channel_creds": [{"type": "insecure"}],
"server_features": ["trusted_xds_server"]
}]`, serverURI)),
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
CertificateProviders: cpc,
Expand Down
3 changes: 3 additions & 0 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ type BackendOptions struct {
HealthStatus v3corepb.HealthStatus
// Weight sets the backend weight. Defaults to 1.
Weight uint32
// Hostname sets the endpoint hostname for authority rewriting.
Hostname string
// Metadata sets the LB endpoint metadata (envoy.lb FilterMetadata field).
// See https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/core/v3/base.proto#envoy-v3-api-msg-config-core-v3-metadata
Metadata map[string]any
Expand Down Expand Up @@ -721,6 +723,7 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: b.Ports[0]},
},
}},
Hostname: b.Hostname,
AdditionalAddresses: additionalAddresses,
}},
HealthStatus: b.HealthStatus,
Expand Down
11 changes: 8 additions & 3 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,14 @@ type CallHdr struct {

DoneFunc func() // called when the stream is finished

// Authority is used to explicitly override the `:authority` header. If set,
// this value takes precedence over the Host field and will be used as the
// value for the `:authority` header.
// Authority is used to explicitly override the `:authority` header.
//
// This value comes from one of two sources:
// 1. The `CallAuthority` call option, if specified by the user.
// 2. An override provided by the LB picker (e.g. xDS authority rewriting).
//
// The `CallAuthority` call option always takes precedence over the LB
// picker override.
Authority string
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ import (
func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) {
t.Helper()

if internal.NewXDSResolverWithConfigForTesting == nil {
t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
}
r, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
Expand Down
5 changes: 1 addition & 4 deletions internal/xds/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,6 @@ func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest f
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

if internal.NewXDSResolverWithConfigForTesting == nil {
t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
}
r, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
Expand Down Expand Up @@ -654,7 +651,7 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
ServiceName: serviceName,
EnableLRS: true,
})
lrsServerCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: fmt.Sprintf("passthrough:///%s", mgmtServer.Address)})
lrsServerCfg, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: fmt.Sprintf("passthrough:///%s", mgmtServer.Address), ServerFeatures: []string{"trusted_xds_server"}})
if err != nil {
t.Fatalf("Failed to create LRS server config for testing: %v", err)
}
Expand Down
3 changes: 0 additions & 3 deletions internal/xds/balancer/cdsbalancer/e2e_test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ import (
func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, func()) {
t.Helper()
// Create an xDS resolver with the above bootstrap configuration.
if internal.NewXDSResolverWithConfigForTesting == nil {
t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
}
r, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
t.Fatalf("xDS resolver creation failed: %v", err)
Expand Down
5 changes: 5 additions & 0 deletions internal/xds/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -420,6 +421,7 @@ type scWrapper struct {
// locality needs to be atomic because it can be updated while being read by
// the picker.
locality atomic.Pointer[clients.Locality]
hostname string
}

func (scw *scWrapper) updateLocalityID(lID clients.Locality) {
Expand All @@ -442,6 +444,9 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
}
var sc balancer.SubConn
scw := &scWrapper{}
if len(addrs) > 0 {
scw.hostname = xdsresource.Hostname(addrs[0])
}
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
b.updateSubConnState(sc, state, oldListener)
Expand Down
25 changes: 17 additions & 8 deletions internal/xds/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
xdsinternal "google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -145,6 +146,14 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// If locality ID isn't found in the wrapper, an empty locality ID will
// be used.
lID = scw.localityID()

if scw.hostname != "" && autoHostRewriteEnabled(info.Ctx) {
if pr.Metadata == nil {
pr.Metadata = metadata.Pairs(":authority", scw.hostname)
} else {
pr.Metadata.Set(":authority", scw.hostname)
}
}
}

if err != nil {
Expand Down Expand Up @@ -199,20 +208,20 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// route's autoHostRewrite in the RPC context.
type autoHostRewriteKey struct{}

// autoHostRewrite retrieves the autoHostRewrite value from the provided context.
func autoHostRewrite(ctx context.Context) bool {
// autoHostRewriteEnabled retrieves the autoHostRewrite value from the provided context.
func autoHostRewriteEnabled(ctx context.Context) bool {
v, _ := ctx.Value(autoHostRewriteKey{}).(bool)
return v
}

// AutoHostRewriteForTesting returns the value of autoHostRewrite field;
// AutoHostRewriteEnabledForTesting returns the value of autoHostRewrite field;
// to be used for testing only.
func AutoHostRewriteForTesting(ctx context.Context) bool {
return autoHostRewrite(ctx)
func AutoHostRewriteEnabledForTesting(ctx context.Context) bool {
return autoHostRewriteEnabled(ctx)
}

// SetAutoHostRewrite adds the autoHostRewrite value to the context for
// EnableAutoHostRewrite adds the autoHostRewrite value to the context for
// the xds_cluster_impl LB policy to pick.
func SetAutoHostRewrite(ctx context.Context, autohostRewrite bool) context.Context {
return context.WithValue(ctx, autoHostRewriteKey{}, autohostRewrite)
func EnableAutoHostRewrite(ctx context.Context) context.Context {
return context.WithValue(ctx, autoHostRewriteKey{}, true)
}
Loading