Skip to content

Commit

Permalink
physical/spanner: use separate client for updating locks (#9423)
Browse files Browse the repository at this point in the history
* physical/spanner: use separate client for updating locks

We believe this mitigates an issue where a large influx of requests
causes the leader to be unable to update the lock table (since it cannot
grab a client from the pool, or the client has no more open connections),
which causes cascading failure.
  • Loading branch information
sethvargo authored and andaley committed Jul 17, 2020
1 parent c76e1e3 commit fd8d839
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 26 deletions.
44 changes: 33 additions & 11 deletions physical/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,26 @@ type Backend struct {
// table is the name of the table in the database.
table string

// client is the API client and permitPool is the allowed concurrent uses of
// the client.
client *spanner.Client
permitPool *physical.PermitPool

// haTable is the name of the table to use for HA in the database.
haTable string

// haEnabled indicates if high availability is enabled. Default: true.
haEnabled bool

// client is the underlying API client for talking to spanner.
client *spanner.Client
// haClient is the API client. This is managed separately from the main client
// because a flood of requests should not block refreshing the TTLs on the
// lock.
//
// This value will be nil if haEnabled is false.
haClient *spanner.Client

// logger and permitPool are internal constructs.
logger log.Logger
permitPool *physical.PermitPool
// logger is the internal logger.
logger log.Logger
}

// NewBackend creates a new Google Spanner storage backend with the given
Expand Down Expand Up @@ -127,6 +135,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
}

// HA configuration
haClient := (*spanner.Client)(nil)
haEnabled := false
haEnabledStr := os.Getenv(envHAEnabled)
if haEnabledStr == "" {
Expand All @@ -139,6 +148,17 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err)
}
}
if haEnabled {
logger.Debug("creating HA client")
var err error
ctx := context.Background()
haClient, err = spanner.NewClient(ctx, database,
option.WithUserAgent(useragent.String()),
)
if err != nil {
return nil, errwrap.Wrapf("failed to create HA client: {{err}}", err)
}
}

// Max parallel
maxParallel, err := extractInt(c["max_parallel"])
Expand All @@ -153,8 +173,8 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
"haTable", haTable,
"maxParallel", maxParallel,
)
logger.Debug("creating client")

logger.Debug("creating client")
ctx := context.Background()
client, err := spanner.NewClient(ctx, database,
option.WithUserAgent(useragent.String()),
Expand All @@ -164,14 +184,16 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
}

return &Backend{
database: database,
table: table,
database: database,
table: table,
client: client,
permitPool: physical.NewPermitPool(maxParallel),

haEnabled: haEnabled,
haTable: haTable,
haClient: haClient,

client: client,
permitPool: physical.NewPermitPool(maxParallel),
logger: logger,
logger: logger,
}, nil
}

Expand Down
18 changes: 3 additions & 15 deletions physical/spanner/spanner_ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,9 @@ func (l *Lock) Unlock() error {
}
l.stopLock.Unlock()

// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Delete
ctx := context.Background()
if _, err := l.backend.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
if _, err := l.backend.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
row, err := txn.ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Identity"})
if err != nil {
if spanner.ErrCode(err) != codes.NotFound {
Expand Down Expand Up @@ -327,10 +323,6 @@ OUTER:
// - if key is empty or identity is the same or timestamp exceeds TTL
// - update the lock to self
func (l *Lock) writeLock() (bool, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Keep track of whether the lock was written
lockWritten := false

Expand All @@ -349,7 +341,7 @@ func (l *Lock) writeLock() (bool, error) {
}
}()

_, err := l.backend.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
_, err := l.backend.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
row, err := txn.ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Identity", "Timestamp"})
if err != nil && spanner.ErrCode(err) != codes.NotFound {
return err
Expand Down Expand Up @@ -396,12 +388,8 @@ func (l *Lock) writeLock() (bool, error) {

// get retrieves the value for the lock.
func (l *Lock) get(ctx context.Context) (*LockRecord, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Read
row, err := l.backend.client.Single().ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Value", "Timestamp", "Identity"})
row, err := l.backend.haClient.Single().ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Value", "Timestamp", "Identity"})
if spanner.ErrCode(err) == codes.NotFound {
return nil, nil
}
Expand Down

0 comments on commit fd8d839

Please sign in to comment.