From cef6a9f0ded042849854294a1bfddf039c5ffdb2 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Tue, 15 Mar 2022 17:54:48 +0100 Subject: [PATCH] Address problems in concurrent sqlite access (#10706) * Use BEGIN IMMEDIATE to start transactions This makes it so all transactions grab a write lock at the start, rather than a read lock that is upgraded when a write happens; in case of multiple writers (which, in our case, can only happen during a restart as the new process reopens the same sqlite database) this will prevent two transactions from attempting to upgrade their lock at the same time, which would cause a SQLITE_BUSY error in one of them. In regular operation this shouldn't cause a performance hit, as we're using a single connection to the sqlite database (guarded by locks on the Go side) anyway. * Escape path in sqlite connection URL This makes it so that the sqlite backend supports paths that contain a `?` character. * Close process storage on TeleportProcess shutdown This aligns the behavior of Shutdown with that of Close. * Allow specifying the journal mode in sqlite This will let sqlite backend users specify WAL mode in their config file, and will allow us to specify alternate journal modes for our on-disk caches in the future. This also removes sqlite memory mode, which is not used anywhere because of its poor query performance compared to our in-memory backend, and cleans up a bit of old cruft. Finally, it runs process storage in FULL sync mode, as it's very seldom written to and holds important data. 
--- lib/auth/state_unix.go | 1 + lib/backend/lite/lite.go | 163 +++++++++++++++++++++---------- lib/backend/lite/litemem_test.go | 119 ---------------------- lib/service/service.go | 19 ++-- 4 files changed, 126 insertions(+), 176 deletions(-) delete mode 100644 lib/backend/lite/litemem_test.go diff --git a/lib/auth/state_unix.go b/lib/auth/state_unix.go index 2790b3ff06159..8ca4f4d864e98 100644 --- a/lib/auth/state_unix.go +++ b/lib/auth/state_unix.go @@ -35,6 +35,7 @@ func NewProcessStorage(ctx context.Context, path string) (*ProcessStorage, error litebk, err := lite.NewWithConfig(ctx, lite.Config{ Path: path, EventsOff: true, + Sync: lite.SyncFull, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/backend/lite/lite.go b/lib/backend/lite/lite.go index b5d0e7bb67581..0aced3788b1d6 100644 --- a/lib/backend/lite/lite.go +++ b/lib/backend/lite/lite.go @@ -21,10 +21,11 @@ import ( "context" "database/sql" "database/sql/driver" - "fmt" + "net/url" "os" "path/filepath" "runtime/debug" + "strconv" "sync/atomic" "time" @@ -39,15 +40,36 @@ import ( ) const ( - // BackendName is the name of this backend + // BackendName is the name of this backend. BackendName = "sqlite" // AlternativeName is another name of this backend. - AlternativeName = "dir" - defaultDirMode os.FileMode = 0770 - defaultDBFile = "sqlite.db" - slowTransactionThreshold = time.Second - syncOFF = "OFF" - busyTimeout = 10000 + AlternativeName = "dir" + + // SyncOff disables file system sync after writing. + SyncOff = "OFF" + // SyncFull fsyncs the database file on disk after every write. + SyncFull = "FULL" + + // JournalMemory keeps the rollback journal in memory instead of storing it + // on disk. 
+ JournalMemory = "MEMORY" +) + +const ( + // defaultDirMode is the mode of the newly created directories that are part + // of the Path + defaultDirMode os.FileMode = 0770 + + // defaultDBFile is the file name of the sqlite db in the directory + // specified by Path + defaultDBFile = "sqlite.db" + slowTransactionThreshold = time.Second + + // defaultSync is the default value for Sync + defaultSync = SyncOff + + // defaultBusyTimeout is the default value for BusyTimeout, in ms + defaultBusyTimeout = 10000 ) // GetName is a part of backend API and it returns SQLite backend type @@ -56,14 +78,6 @@ func GetName() string { return BackendName } -func init() { - sql.Register(BackendName, &sqlite3.SQLiteDriver{ - ConnectHook: func(conn *sqlite3.SQLiteConn) error { - return nil - }, - }) -} - // Config structure represents configuration section type Config struct { // Path is a path to the database directory @@ -77,15 +91,12 @@ type Config struct { EventsOff bool `json:"events_off,omitempty"` // Clock allows to override clock used in the backend Clock clockwork.Clock `json:"-"` - // Sync sets synchronous pragrma + // Sync sets the synchronous pragma Sync string `json:"sync,omitempty"` // BusyTimeout sets busy timeout in milliseconds BusyTimeout int `json:"busy_timeout,omitempty"` - // Memory turns memory mode of the database - Memory bool `json:"memory"` - // MemoryName sets the name of the database, - // set to "sqlite.db" by default - MemoryName string `json:"memory_name"` + // Journal sets the journal_mode pragma + Journal string `json:"journal,omitempty"` // Mirror turns on mirror mode for the backend, // which will use record IDs for Put and PutRange passed from // the resources, not generate a new one @@ -95,7 +106,7 @@ type Config struct { // CheckAndSetDefaults is a helper returns an error if the supplied configuration // is not enough to connect to sqlite func (cfg *Config) CheckAndSetDefaults() error { - if cfg.Path == "" && !cfg.Memory { + if cfg.Path == "" { 
return trace.BadParameter("specify directory path to the database using 'path' parameter") } if cfg.BufferSize == 0 { @@ -108,17 +119,69 @@ func (cfg *Config) CheckAndSetDefaults() error { cfg.Clock = clockwork.NewRealClock() } if cfg.Sync == "" { - cfg.Sync = syncOFF + cfg.Sync = defaultSync } if cfg.BusyTimeout == 0 { - cfg.BusyTimeout = busyTimeout - } - if cfg.MemoryName == "" { - cfg.MemoryName = defaultDBFile + cfg.BusyTimeout = defaultBusyTimeout } return nil } +// ConnectionURI returns a connection string usable with sqlite according to the +// Config. +func (cfg *Config) ConnectionURI() string { + params := url.Values{} + params.Set("_busy_timeout", strconv.Itoa(cfg.BusyTimeout)) + // The _txlock parameter is parsed by go-sqlite to determine if (all) + // transactions should be started with `BEGIN DEFERRED` (the default, same + // as `BEGIN`), `BEGIN IMMEDIATE` or `BEGIN EXCLUSIVE`. + // + // The way we use sqlite relies entirely on the busy timeout handler (also + // configured through the connection URL, with the _busy_timeout parameter) + // to address concurrency problems, and treats any SQLITE_BUSY errors as a + // fatal issue with the database; however, in scenarios with multiple + // readwriters it is possible to still get a busy error even with a generous + // busy timeout handler configured, as two transactions that both start off + // with a SELECT - thus acquiring a SHARED lock, see + // https://www.sqlite.org/lockingv3.html#transaction_control - then attempt + // to upgrade to a RESERVED lock to upsert or delete something can end up + // requiring one of the two transactions to forcibly rollback to avoid a + // deadlock, which is signaled by the sqlite engine with a SQLITE_BUSY error + // returned to one of the two. When that happens, a concurrent-aware program + // can just try the transaction again a few times - making sure to disregard + // what was read before the transaction actually committed. 
+ // + // As we're not really interested in concurrent sqlite access (process + // storage has very little written to, sharing a sqlite database as the + // backend between two auths is not really supported, and caches shouldn't + // ever run on the same underlying sqlite backend) we instead start every + // transaction with `BEGIN IMMEDIATE`, which grabs a RESERVED lock + // immediately (waiting for the busy timeout in case some other connection + // to the database has the lock) at the beginning of the transaction, thus + // avoiding any spurious SQLITE_BUSY error that can happen halfway through a + // transaction. + // + // If we end up requiring better concurrent access to sqlite in the future + // we should consider enabling Write-Ahead Logging mode, to actually allow + // for reads to happen at the same time as writes, adding some amount of + // retries to inTransaction, and double-checking that all uses of it + // correctly handle the possibility of the transaction being restarted. + params.Set("_txlock", "immediate") + if cfg.Sync != "" { + params.Set("_sync", cfg.Sync) + } + if cfg.Journal != "" { + params.Set("_journal", cfg.Journal) + } + + u := url.URL{ + Scheme: "file", + Opaque: url.QueryEscape(filepath.Join(cfg.Path, defaultDBFile)), + RawQuery: params.Encode(), + } + return u.String() +} + // New returns a new instance of sqlite backend func New(ctx context.Context, params backend.Params) (*Backend, error) { var cfg *Config @@ -135,23 +198,18 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - var connectorURL string - if !cfg.Memory { - // Ensure that the path to the root directory exists. 
- err := os.MkdirAll(cfg.Path, defaultDirMode) - if err != nil { - return nil, trace.ConvertSystemError(err) - } - fullPath := filepath.Join(cfg.Path, defaultDBFile) - connectorURL = fmt.Sprintf("file:%v?_busy_timeout=%v&_sync=%v", fullPath, cfg.BusyTimeout, cfg.Sync) - } else { - connectorURL = fmt.Sprintf("file:%v?mode=memory", cfg.MemoryName) + connectionURI := cfg.ConnectionURI() + // Ensure that the path to the root directory exists. + err := os.MkdirAll(cfg.Path, defaultDirMode) + if err != nil { + return nil, trace.ConvertSystemError(err) } - db, err := sql.Open(BackendName, connectorURL) + db, err := sql.Open("sqlite3", cfg.ConnectionURI()) if err != nil { - return nil, trace.Wrap(err, "error opening URI: %v", connectorURL) + return nil, trace.Wrap(err, "error opening URI: %v", connectionURI) } - // serialize access to sqlite to avoid database is locked errors + // serialize access to sqlite, as we're using immediate transactions anyway, + // and in-memory go locks are faster than sqlite locks db.SetMaxOpenConns(1) buf := backend.NewCircularBuffer( backend.BufferCapacity(cfg.BufferSize), @@ -169,9 +227,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { watchStarted: watchStarted, signalWatchStart: signalWatchStart, } - l.Debugf("Connected to: %v, poll stream period: %v", connectorURL, cfg.PollStreamPeriod) + l.Debugf("Connected to: %v, poll stream period: %v", connectionURI, cfg.PollStreamPeriod) if err := l.createSchema(); err != nil { - return nil, trace.Wrap(err, "error creating schema: %v", connectorURL) + return nil, trace.Wrap(err, "error creating schema: %v", connectionURI) } if err := l.showPragmas(); err != nil { l.Warningf("Failed to show pragma settings: %v.", err) @@ -204,17 +262,22 @@ type Backend struct { // parameters, when called, logs some key PRAGMA values func (l *Backend) showPragmas() error { return l.inTransaction(l.ctx, func(tx *sql.Tx) error { - row := tx.QueryRowContext(l.ctx, "PRAGMA synchronous;") - var 
syncValue string - if err := row.Scan(&syncValue); err != nil { + var journalMode string + row := tx.QueryRowContext(l.ctx, "PRAGMA journal_mode;") + if err := row.Scan(&journalMode); err != nil { + return trace.Wrap(err) + } + row = tx.QueryRowContext(l.ctx, "PRAGMA synchronous;") + var synchronous string + if err := row.Scan(&synchronous); err != nil { return trace.Wrap(err) } - var timeoutValue string + var busyTimeout string row = tx.QueryRowContext(l.ctx, "PRAGMA busy_timeout;") - if err := row.Scan(&timeoutValue); err != nil { + if err := row.Scan(&busyTimeout); err != nil { return trace.Wrap(err) } - l.Debugf("Synchronous: %v, busy timeout: %v", syncValue, timeoutValue) + l.Debugf("journal_mode=%v, synchronous=%v, busy_timeout=%v", journalMode, synchronous, busyTimeout) return nil }) } diff --git a/lib/backend/lite/litemem_test.go b/lib/backend/lite/litemem_test.go deleted file mode 100644 index 418900301847e..0000000000000 --- a/lib/backend/lite/litemem_test.go +++ /dev/null @@ -1,119 +0,0 @@ -/* -Copyright 2018-2019 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package lite - -import ( - "context" - "time" - - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/test" - - "github.com/jonboulle/clockwork" - "gopkg.in/check.v1" -) - -type LiteMemSuite struct { - bk *Backend - suite test.BackendSuite -} - -var _ = check.Suite(&LiteMemSuite{}) - -func (s *LiteMemSuite) SetUpSuite(c *check.C) { - clock := clockwork.NewFakeClock() - newBackend := func() (backend.Backend, error) { - return NewWithConfig(context.Background(), Config{ - Memory: true, - PollStreamPeriod: 300 * time.Millisecond, - Clock: clock, - }) - } - s.suite.NewBackend = newBackend - s.suite.Clock = clock -} - -func (s *LiteMemSuite) SetUpTest(c *check.C) { - bk, err := s.suite.NewBackend() - c.Assert(err, check.IsNil) - s.bk = bk.(*Backend) - s.suite.B = s.bk -} - -func (s *LiteMemSuite) TearDownTest(c *check.C) { - if s.bk != nil { - c.Assert(s.bk.Close(), check.IsNil) - } -} - -func (s *LiteMemSuite) TestCRUD(c *check.C) { - s.suite.CRUD(c) -} - -func (s *LiteMemSuite) TestRange(c *check.C) { - s.suite.Range(c) -} - -func (s *LiteMemSuite) TestCompareAndSwap(c *check.C) { - s.suite.CompareAndSwap(c) -} - -func (s *LiteMemSuite) TestExpiration(c *check.C) { - s.suite.Expiration(c) -} - -func (s *LiteMemSuite) TestKeepAlive(c *check.C) { - s.suite.KeepAlive(c) -} - -func (s *LiteMemSuite) TestEvents(c *check.C) { - s.suite.Events(c) -} - -func (s *LiteMemSuite) TestWatchersClose(c *check.C) { - s.suite.WatchersClose(c) -} - -func (s *LiteMemSuite) TestDeleteRange(c *check.C) { - s.suite.DeleteRange(c) -} - -func (s *LiteMemSuite) TestPutRange(c *check.C) { - s.suite.PutRange(c) -} - -func (s *LiteMemSuite) TestLocking(c *check.C) { - s.suite.Locking(c, s.bk) -} - -func (s *LiteMemSuite) TestConcurrentOperations(c *check.C) { - bk, err := s.suite.NewBackend() - c.Assert(err, check.IsNil) - defer bk.Close() - s.suite.B2 = bk - s.suite.ConcurrentOperations(c) -} - -func (s *LiteMemSuite) TestMirror(c *check.C) { 
- mem, err := NewWithConfig(context.Background(), Config{ - Memory: true, - Mirror: true, - PollStreamPeriod: 300 * time.Millisecond, - }) - c.Assert(err, check.IsNil) - s.suite.Mirror(c, mem) -} diff --git a/lib/service/service.go b/lib/service/service.go index 3209f5a5b8785..9487994c6618d 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1526,7 +1526,6 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca lite.Config{ Path: path, EventsOff: !cfg.events, - Memory: false, Mirror: true, PollStreamPeriod: 100 * time.Millisecond, }) @@ -3345,12 +3344,18 @@ func (process *TeleportProcess) StartShutdown(ctx context.Context) context.Conte process.log.Warnf("Error waiting for all services to complete: %v", err) } process.log.Debug("All supervisor functions are completed.") - localAuth := process.getLocalAuth() - if localAuth != nil { - if err := process.localAuth.Close(); err != nil { + + if localAuth := process.getLocalAuth(); localAuth != nil { + if err := localAuth.Close(); err != nil { process.log.Warningf("Failed closing auth server: %v.", err) } } + + if process.storage != nil { + if err := process.storage.Close(); err != nil { + process.log.Warningf("Failed closing process storage: %v.", err) + } + } }() go process.printShutdownStatus(localCtx) return localCtx @@ -3372,9 +3377,9 @@ func (process *TeleportProcess) Close() error { process.Config.Keygen.Close() var errors []error - localAuth := process.getLocalAuth() - if localAuth != nil { - errors = append(errors, process.localAuth.Close()) + + if localAuth := process.getLocalAuth(); localAuth != nil { + errors = append(errors, localAuth.Close()) } if process.storage != nil {