Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
btopt "cloud.google.com/go/bigtable/internal/option"
btransport "cloud.google.com/go/bigtable/internal/transport"
"cloud.google.com/go/internal/trace"
gax "github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
Expand All @@ -55,6 +56,8 @@ const (
queryExpiredViolationType = "PREPARED_QUERY_EXPIRED"
preparedQueryExpireEarlyDuration = time.Second
methodNameReadRows = "ReadRows"
// Cannot extract extract d.GRPCConnPoolSize as DialSettings is in internal grpc pacakage
defaultBigtableConnPoolSize = 10

// For routing cookie
cookiePrefix = "x-goog-cbt-cookie-"
Expand Down Expand Up @@ -156,11 +159,6 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
// TODO(b/372244283): Remove after b/358175516 has been fixed
o = append(o, internaloption.EnableAsyncRefreshDryRun(metricsTracerFactory.newAsyncRefreshErrHandler()))

connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
return nil, err
}

disableRetryInfo := false

// If DISABLE_RETRY_INFO=1, library does not base retry decision and back off time on server returned RetryInfo value.
Expand All @@ -172,6 +170,23 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
retryOption = clientOnlyRetryOption
executeQueryRetryOption = clientOnlyExecuteQueryRetryOption
}

var connPool gtransport.ConnPool
var connPoolErr error
enableBigtableConnPool := btopt.EnableBigtableConnectionPool()
if enableBigtableConnPool {
connPool, connPoolErr = btransport.NewBigtableChannelPool(defaultBigtableConnPoolSize, btopt.BigtableLoadBalancingStrategy(), func() (*grpc.ClientConn, error) {
return gtransport.Dial(ctx, o...)
})
} else {
// use to regular ConnPool
connPool, connPoolErr = gtransport.DialPool(ctx, o...)
}

if connPoolErr != nil {
return nil, connPoolErr
}

return &Client{
connPool: connPool,
client: btpb.NewBigtableClient(connPool),
Expand Down
77 changes: 77 additions & 0 deletions bigtable/internal/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"cloud.google.com/go/bigtable/internal"
Expand All @@ -34,6 +35,19 @@ import (
"google.golang.org/grpc/metadata"
)

const (
// LoadBalancingStrategyEnvVar is the environment variable to control the gRPC load balancing strategy.
LoadBalancingStrategyEnvVar = "CBT_LOAD_BALANCING_STRATEGY"
// RoundRobinLBPolicy is the policy name for round-robin.
RoundRobinLBPolicy = "round_robin"
// LeastInFlightLBPolicy is the policy name for least in flight (custom).
LeastInFlightLBPolicy = "least_in_flight"
// PowerOfTwoLeastInFlightLBPolicy is the policy name for power of two least in flight (custom).
PowerOfTwoLeastInFlightLBPolicy = "power_of_two_least_in_flight"
// BigtableConnectionPoolEnvVar is the env var for enabling Bigtable Connection Pool.
BigtableConnectionPoolEnvVar = "CBT_BIGTABLE_CONN_POOL"
)

// mergeOutgoingMetadata returns a context populated by the existing outgoing
// metadata merged with the provided mds.
func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context {
Expand Down Expand Up @@ -124,3 +138,66 @@ func ClientInterceptorOptions(stream []grpc.StreamClientInterceptor, unary []grp
option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(unary...)),
}
}

// LoadBalancingStrategy for connection pool.
type LoadBalancingStrategy int

const (
// RoundRobin is the round_robin gRPC load balancing policy.
RoundRobin LoadBalancingStrategy = iota
// LeastInFlight is the least_in_flight gRPC load balancing policy (custom).
LeastInFlight
// PowerOfTwoLeastInFlight is the power_of_two_least_in_flight gRPC load balancing policy (custom).
PowerOfTwoLeastInFlight
)

// String returns the string representation of the LoadBalancingStrategy.
func (s LoadBalancingStrategy) String() string {
switch s {
case LeastInFlight:
return "least_in_flight"
case PowerOfTwoLeastInFlight:
return "power_of_two_least_in_flight"
case RoundRobin:
return "round_robin"
default:
return "round_robin" // Default
}
}

// parseLoadBalancingStrategy parses the string from the environment variable
// into a LoadBalancingStrategy enum value.
func parseLoadBalancingStrategy(strategyStr string) LoadBalancingStrategy {
switch strings.ToUpper(strategyStr) {
case "LEAST_IN_FLIGHT":
return LeastInFlight
case "POWER_OF_TWO_LEAST_IN_FLIGHT":
return PowerOfTwoLeastInFlight
case "ROUND_ROBIN":
return RoundRobin
case "":
return RoundRobin // Default if env var is not set
default:
return RoundRobin // Default for unknown values
}
}

// BigtableLoadBalancingStrategy returns the gRPC service config JSON string for the chosen policy.
func BigtableLoadBalancingStrategy() LoadBalancingStrategy {
strategyStr := os.Getenv(LoadBalancingStrategyEnvVar)
return parseLoadBalancingStrategy(strategyStr)
}

// EnableBigtableConnectionPool uses new conn pool if envVar is set.
func EnableBigtableConnectionPool() bool {
bigtableConnPoolEnvVal := os.Getenv(BigtableConnectionPoolEnvVar)
if bigtableConnPoolEnvVal == "" {
return false
}
enableBigtableConnPool, err := strconv.ParseBool(bigtableConnPoolEnvVal)
if err != nil {
// just fail and use default conn pool
return false
}
return enableBigtableConnPool
}
Loading
Loading