-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
database: Avoid race condition in connection creation #26147
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -161,8 +161,9 @@ func (b *databaseBackend) collectPluginInstanceGaugeValues(context.Context) ([]m | |
|
||
type databaseBackend struct { | ||
// connections holds configured database connections by config name | ||
connections *syncmap.SyncMap[string, *dbPluginInstance] | ||
logger log.Logger | ||
createConnectionLock sync.Mutex | ||
connections *syncmap.SyncMap[string, *dbPluginInstance] | ||
logger log.Logger | ||
|
||
*framework.Backend | ||
// credRotationQueue is an in-memory priority queue used to track Static Roles | ||
|
@@ -291,11 +292,23 @@ func (b *databaseBackend) GetConnection(ctx context.Context, s logical.Storage, | |
} | ||
|
||
func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name string, config *DatabaseConfig) (*dbPluginInstance, error) { | ||
// fast path, reuse the existing connection | ||
dbi := b.connections.Get(name) | ||
if dbi != nil { | ||
return dbi, nil | ||
} | ||
|
||
// slow path, create a new connection | ||
// if we don't lock the rest of the operation, there is a race condition for multiple callers of this function | ||
b.createConnectionLock.Lock() | ||
defer b.createConnectionLock.Unlock() | ||
|
||
// check again in case we lost the race | ||
dbi = b.connections.Get(name) | ||
if dbi != nil { | ||
return dbi, nil | ||
} | ||
|
||
id, err := uuid.GenerateUUID() | ||
if err != nil { | ||
return nil, err | ||
|
@@ -332,14 +345,17 @@ func (b *databaseBackend) GetConnectionWithConfig(ctx context.Context, name stri | |
name: name, | ||
runningPluginVersion: pluginVersion, | ||
} | ||
oldConn := b.connections.Put(name, dbi) | ||
if oldConn != nil { | ||
err := oldConn.Close() | ||
conn, ok := b.connections.PutIfEmpty(name, dbi) | ||
if !ok { | ||
// this is a bug | ||
b.Logger().Error("Error: there was a race condition adding to the database connection map") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we mention that we hit a bug here? A user could then report it. Should this be |
||
// There was already an existing connection, so we will use that and close our new one to avoid a race condition. | ||
err := dbi.Close() | ||
if err != nil { | ||
b.Logger().Warn("Error closing database connection", "error", err) | ||
b.Logger().Warn("Error closing new database connection", "error", err) | ||
} | ||
} | ||
return dbi, nil | ||
return conn, nil | ||
} | ||
|
||
// ClearConnection closes the database connection and | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package database | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/hashicorp/vault/sdk/helper/consts" | ||
"github.com/hashicorp/vault/sdk/helper/pluginutil" | ||
"github.com/hashicorp/vault/sdk/logical" | ||
"github.com/hashicorp/vault/sdk/queue" | ||
) | ||
|
||
// newSystemViewWrapper wraps view in a systemViewWrapper and returns it as a logical.SystemView. | ||
func newSystemViewWrapper(view logical.SystemView) logical.SystemView { | ||
return &systemViewWrapper{ | ||
view, | ||
} | ||
} | ||
|
||
// systemViewWrapper embeds a logical.SystemView; this file asserts that its pointer type | ||
// implements logical.ExtendedSystemView. | ||
type systemViewWrapper struct { | ||
logical.SystemView | ||
} | ||
|
||
var _ logical.ExtendedSystemView = (*systemViewWrapper)(nil) | ||
|
||
// RequestWellKnownRedirect is a stub that always panics with "nope". | ||
// NOTE(review): presumably present only to satisfy logical.ExtendedSystemView - confirm. | ||
func (s *systemViewWrapper) RequestWellKnownRedirect(ctx context.Context, src, dest string) error { | ||
panic("nope") | ||
} | ||
|
||
// DeregisterWellKnownRedirect is a stub that always panics with "nope". | ||
// NOTE(review): presumably present only to satisfy logical.ExtendedSystemView - confirm. | ||
func (s *systemViewWrapper) DeregisterWellKnownRedirect(ctx context.Context, src string) bool { | ||
panic("nope") | ||
} | ||
|
||
// Auditor is a stub that always panics with "nope". | ||
// NOTE(review): presumably present only to satisfy logical.ExtendedSystemView - confirm. | ||
func (s *systemViewWrapper) Auditor() logical.Auditor { | ||
panic("nope") | ||
} | ||
|
||
// ForwardGenericRequest is a stub that always panics with "nope". | ||
// NOTE(review): presumably present only to satisfy logical.ExtendedSystemView - confirm. | ||
func (s *systemViewWrapper) ForwardGenericRequest(ctx context.Context, request *logical.Request) (*logical.Response, error) { | ||
panic("nope") | ||
} | ||
|
||
// APILockShouldBlockRequest is a stub that always panics with "nope". | ||
// NOTE(review): presumably present only to satisfy logical.ExtendedSystemView - confirm. | ||
func (s *systemViewWrapper) APILockShouldBlockRequest() (bool, error) { | ||
panic("nope") | ||
} | ||
|
||
// GetPinnedPluginVersion ignores its arguments and always returns a nil pinned version | ||
// together with pluginutil.ErrPinnedVersionNotFound. | ||
func (s *systemViewWrapper) GetPinnedPluginVersion(ctx context.Context, pluginType consts.PluginType, pluginName string) (*pluginutil.PinnedVersion, error) { | ||
return nil, pluginutil.ErrPinnedVersionNotFound | ||
} | ||
|
||
// LookupPluginVersion ignores its arguments and always returns a builtin database | ||
// PluginRunner named "mockv5" whose BuiltinFactory is New, with a nil error. | ||
func (s *systemViewWrapper) LookupPluginVersion(ctx context.Context, pluginName string, pluginType consts.PluginType, version string) (*pluginutil.PluginRunner, error) { | ||
return &pluginutil.PluginRunner{ | ||
Name: "mockv5", | ||
Type: consts.PluginTypeDatabase, | ||
Builtin: true, | ||
BuiltinFactory: New, | ||
}, nil | ||
} | ||
|
||
// getDbBackend builds a databaseBackend for tests and returns it with its storage. | ||
// The backend config uses in-memory storage and a system view wrapped by | ||
// newSystemViewWrapper. The test is failed via t.Fatal if Setup returns an error. | ||
func getDbBackend(t *testing.T) (*databaseBackend, logical.Storage) { | ||
t.Helper() | ||
config := logical.TestBackendConfig() | ||
config.System = newSystemViewWrapper(config.System) | ||
config.StorageView = &logical.InmemStorage{} | ||
// Create and init the backend ourselves instead of using a Factory because | ||
// the factory function kicks off threads that cause racy tests. | ||
b := Backend(config) | ||
if err := b.Setup(context.Background(), config); err != nil { | ||
t.Fatal(err) | ||
} | ||
// Replace the schedule and rotation queue with test instances before populating the queue. | ||
b.schedule = &TestSchedule{} | ||
b.credRotationQueue = queue.New() | ||
b.populateQueue(context.Background(), config.StorageView) | ||
|
||
return b, config.StorageView | ||
} | ||
|
||
// TestGetConnectionRaceCondition checks that GetConnection always returns the same instance, even when asked | ||
// by multiple goroutines in parallel. | ||
func TestGetConnectionRaceCondition(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
ctx := context.Background() | ||
b, s := getDbBackend(t) | ||
defer b.Cleanup(ctx) | ||
configureDBMount(t, s) | ||
|
||
goroutines := 16 | ||
|
||
wg := sync.WaitGroup{} | ||
wg.Add(goroutines) | ||
// Each goroutine writes only to its own index of dbis and errs. | ||
dbis := make([]*dbPluginInstance, goroutines) | ||
errs := make([]error, goroutines) | ||
for i := 0; i < goroutines; i++ { | ||
go func(i int) { | ||
dbis[i], errs[i] = b.GetConnection(ctx, s, "mockv5") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: might want to factor |
||
wg.Done() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: probably not necessary, but deferring this call just before calling |
||
}(i) | ||
} | ||
wg.Wait() | ||
// Every call must have succeeded and returned the same instance as the first call. | ||
for i := 0; i < goroutines; i++ { | ||
if errs[i] != nil { | ||
t.Fatal(errs[i]) | ||
} | ||
if dbis[0] != dbis[i] { | ||
t.Fatal("Error: database instances did not match") | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we delete this block, the fix will still work; we still see a spike in open file descriptors on leadership transfers under load, but the file descriptors are closed correctly (because we switch to `PutIfEmpty` below).