physical/spanner: use separate client for updating locks #9423

Merged · 3 commits · Jul 9, 2020
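In short, this change gives the HA lock code path its own Spanner client so that refreshing the lock TTL is never queued behind ordinary storage traffic, which is bounded by the backend's permit pool on the main client. Below is a minimal, self-contained sketch of that shape, not the Vault code itself: the buffered-channel permit pool, the `spannersketch` package name, the table names, and the helper functions are placeholder assumptions standing in for Vault's `physical.PermitPool` and backend internals.

```go
// Package spannersketch is a hedged illustration of the two-client layout:
// data operations share a bounded permit pool, lock operations do not.
package spannersketch

import (
	"context"

	"cloud.google.com/go/spanner"
)

// permitPool bounds concurrency on the main client; a buffered channel
// stands in here for Vault's physical.PermitPool.
type permitPool chan struct{}

func (p permitPool) Acquire() { p <- struct{}{} }
func (p permitPool) Release() { <-p }

type backend struct {
	client     *spanner.Client // data operations, bounded by permits
	haClient   *spanner.Client // lock operations only, never waits on permits
	permitPool permitPool
	table      string
	haTable    string
}

// newBackend creates both clients against the same database (an assumption
// for this sketch) and sizes the permit pool from maxParallel.
func newBackend(ctx context.Context, database string, maxParallel int) (*backend, error) {
	client, err := spanner.NewClient(ctx, database)
	if err != nil {
		return nil, err
	}
	haClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		return nil, err
	}
	return &backend{
		client:     client,
		haClient:   haClient,
		permitPool: make(permitPool, maxParallel),
		table:      "Vault",   // placeholder table name
		haTable:    "VaultHA", // placeholder HA table name
	}, nil
}

// get is a data-path read: it takes a permit before using the main client.
func (b *backend) get(ctx context.Context, key string) (*spanner.Row, error) {
	b.permitPool.Acquire()
	defer b.permitPool.Release()
	return b.client.Single().ReadRow(ctx, b.table, spanner.Key{key}, []string{"Key", "Value"})
}

// refreshLock is a lock-path read: it goes straight to the HA client, so a
// flood of data requests cannot delay it.
func (b *backend) refreshLock(ctx context.Context, key string) (*spanner.Row, error) {
	return b.haClient.Single().ReadRow(ctx, b.haTable, spanner.Key{key}, []string{"Key", "Identity", "Timestamp"})
}
```

The key point is that the lock path never calls `Acquire`, mirroring how the diff below drops the `permitPool.Acquire`/`Release` calls from `Unlock`, `writeLock`, and `get` in spanner_ha.go and points them at `haClient` instead.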
44 changes: 33 additions & 11 deletions physical/spanner/spanner.go
@@ -79,18 +79,26 @@ type Backend struct {
// table is the name of the table in the database.
table string

// client is the API client and permitPool is the allowed concurrent uses of
// the client.
client *spanner.Client
permitPool *physical.PermitPool

// haTable is the name of the table to use for HA in the database.
haTable string

// haEnabled indicates if high availability is enabled. Default: true.
haEnabled bool

// client is the underlying API client for talking to spanner.
client *spanner.Client
// haClient is the API client. This is managed separately from the main client
// because a flood of requests should not block refreshing the TTLs on the
// lock.
//
// This value will be nil if haEnabled is false.
haClient *spanner.Client

// logger and permitPool are internal constructs.
logger log.Logger
permitPool *physical.PermitPool
// logger is the internal logger.
logger log.Logger
}

// NewBackend creates a new Google Spanner storage backend with the given
@@ -127,6 +135,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
}

// HA configuration
haClient := (*spanner.Client)(nil)
haEnabled := false
haEnabledStr := os.Getenv(envHAEnabled)
if haEnabledStr == "" {
@@ -139,6 +148,17 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err)
}
}
if haEnabled {
logger.Debug("creating HA client")
var err error
ctx := context.Background()
haClient, err = spanner.NewClient(ctx, database,
option.WithUserAgent(useragent.String()),
)
if err != nil {
return nil, errwrap.Wrapf("failed to create HA client: {{err}}", err)
}
}

// Max parallel
maxParallel, err := extractInt(c["max_parallel"])
@@ -153,8 +173,8 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
"haTable", haTable,
"maxParallel", maxParallel,
)
logger.Debug("creating client")

logger.Debug("creating client")
ctx := context.Background()
client, err := spanner.NewClient(ctx, database,
option.WithUserAgent(useragent.String()),
@@ -164,14 +184,16 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
}

return &Backend{
database: database,
table: table,
database: database,
table: table,
client: client,
permitPool: physical.NewPermitPool(maxParallel),

haEnabled: haEnabled,
haTable: haTable,
haClient: haClient,

client: client,
permitPool: physical.NewPermitPool(maxParallel),
logger: logger,
logger: logger,
}, nil
}

18 changes: 3 additions & 15 deletions physical/spanner/spanner_ha.go
@@ -175,13 +175,9 @@ func (l *Lock) Unlock() error {
}
l.stopLock.Unlock()

// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Delete
ctx := context.Background()
if _, err := l.backend.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
if _, err := l.backend.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
row, err := txn.ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Identity"})
if err != nil {
if spanner.ErrCode(err) != codes.NotFound {
@@ -327,10 +323,6 @@ OUTER:
// - if key is empty or identity is the same or timestamp exceeds TTL
// - update the lock to self
func (l *Lock) writeLock() (bool, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Keep track of whether the lock was written
lockWritten := false

@@ -349,7 +341,7 @@ func (l *Lock) writeLock() (bool, error) {
}
}()

_, err := l.backend.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
_, err := l.backend.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
row, err := txn.ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Identity", "Timestamp"})
if err != nil && spanner.ErrCode(err) != codes.NotFound {
return err
@@ -396,12 +388,8 @@ func (l *Lock) writeLock() (bool, error) {

// get retrieves the value for the lock.
func (l *Lock) get(ctx context.Context) (*LockRecord, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Read
row, err := l.backend.client.Single().ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Value", "Timestamp", "Identity"})
row, err := l.backend.haClient.Single().ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Value", "Timestamp", "Identity"})
if spanner.ErrCode(err) == codes.NotFound {
return nil, nil
}
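To make the spanner_ha.go change concrete, here is a hedged sketch of a lock write going through the dedicated HA client, continuing the sketch above (same `backend` type and package). The column names match the ones the diff reads, but the takeover rule, TTL handling, and mutation layout are simplified assumptions rather than the PR's exact logic.

```go
package spannersketch

import (
	"context"
	"time"

	"cloud.google.com/go/spanner"
	"google.golang.org/grpc/codes"
)

// tryWriteLock attempts to take or refresh the HA lock row using haClient,
// so it never competes with data traffic for permits. The takeover rule
// below (same identity, or previous holder's timestamp past the TTL) is a
// simplified placeholder.
func (b *backend) tryWriteLock(ctx context.Context, key, identity string, ttl time.Duration) (bool, error) {
	written := false
	_, err := b.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		// The transaction body may be retried, so reset the flag each attempt.
		written = false

		row, err := txn.ReadRow(ctx, b.haTable, spanner.Key{key}, []string{"Key", "Identity", "Timestamp"})
		if err != nil && spanner.ErrCode(err) != codes.NotFound {
			return err
		}

		// If a row exists, only proceed when we already hold the lock or the
		// previous holder's timestamp has exceeded the TTL.
		if row != nil {
			var curIdentity string
			var ts time.Time
			if err := row.Column(1, &curIdentity); err != nil {
				return err
			}
			if err := row.Column(2, &ts); err != nil {
				return err
			}
			if curIdentity != identity && time.Since(ts) < ttl {
				return nil // lock held by someone else and still fresh
			}
		}

		// Write (or refresh) the lock row.
		if err := txn.BufferWrite([]*spanner.Mutation{
			spanner.InsertOrUpdate(b.haTable,
				[]string{"Key", "Identity", "Timestamp"},
				[]interface{}{key, identity, time.Now().UTC()}),
		}); err != nil {
			return err
		}
		written = true
		return nil
	})
	return written, err
}
```

Because this path no longer touches the permit pool, a saturated pool of data requests against the main client cannot stall the transaction that refreshes the lock, which is exactly the failure mode the PR title and the `haClient` field comment describe.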