diff --git a/lib/auth/state_unix.go b/lib/auth/state_unix.go index 2790b3ff06159..8ca4f4d864e98 100644 --- a/lib/auth/state_unix.go +++ b/lib/auth/state_unix.go @@ -35,6 +35,7 @@ func NewProcessStorage(ctx context.Context, path string) (*ProcessStorage, error litebk, err := lite.NewWithConfig(ctx, lite.Config{ Path: path, EventsOff: true, + Sync: lite.SyncFull, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/backend/lite/lite.go b/lib/backend/lite/lite.go index b5d0e7bb67581..0aced3788b1d6 100644 --- a/lib/backend/lite/lite.go +++ b/lib/backend/lite/lite.go @@ -21,10 +21,11 @@ import ( "context" "database/sql" "database/sql/driver" - "fmt" + "net/url" "os" "path/filepath" "runtime/debug" + "strconv" "sync/atomic" "time" @@ -39,15 +40,36 @@ import ( ) const ( - // BackendName is the name of this backend + // BackendName is the name of this backend. BackendName = "sqlite" // AlternativeName is another name of this backend. - AlternativeName = "dir" - defaultDirMode os.FileMode = 0770 - defaultDBFile = "sqlite.db" - slowTransactionThreshold = time.Second - syncOFF = "OFF" - busyTimeout = 10000 + AlternativeName = "dir" + + // SyncOff disables file system sync after writing. + SyncOff = "OFF" + // SyncFull fsyncs the database file on disk after every write. + SyncFull = "FULL" + + // JournalMemory keeps the rollback journal in memory instead of storing it + // on disk. + JournalMemory = "MEMORY" +) + +const ( + // defaultDirMode is the mode of the newly created directories that are part + // of the Path + defaultDirMode os.FileMode = 0770 + + // defaultDBFile is the file name of the sqlite db in the directory + // specified by Path + defaultDBFile = "sqlite.db" + slowTransactionThreshold = time.Second + + // defaultSync is the default value for Sync + defaultSync = SyncOff + + // defaultBusyTimeout is the default value for BusyTimeout, in ms + defaultBusyTimeout = 10000 ) // GetName is a part of backend API and it returns SQLite backend type @@ -56,14 +78,6 @@ func GetName() string { return BackendName } -func init() { - sql.Register(BackendName, &sqlite3.SQLiteDriver{ - ConnectHook: func(conn *sqlite3.SQLiteConn) error { - return nil - }, - }) -} - // Config structure represents configuration section type Config struct { // Path is a path to the database directory @@ -77,15 +91,12 @@ type Config struct { EventsOff bool `json:"events_off,omitempty"` // Clock allows to override clock used in the backend Clock clockwork.Clock `json:"-"` - // Sync sets synchronous pragrma + // Sync sets the synchronous pragma Sync string `json:"sync,omitempty"` // BusyTimeout sets busy timeout in milliseconds BusyTimeout int `json:"busy_timeout,omitempty"` - // Memory turns memory mode of the database - Memory bool `json:"memory"` - // MemoryName sets the name of the database, - // set to "sqlite.db" by default - MemoryName string `json:"memory_name"` + // Journal sets the journal_mode pragma + Journal string `json:"journal,omitempty"` // Mirror turns on mirror mode for the backend, // which will use record IDs for Put and PutRange passed from // the resources, not generate a new one @@ -95,7 +106,7 @@ type Config struct { // CheckAndSetDefaults is a helper returns an error if the supplied configuration // is not enough to connect to sqlite func (cfg *Config) CheckAndSetDefaults() error { - if cfg.Path == "" && !cfg.Memory { + if cfg.Path == "" { return trace.BadParameter("specify directory path to the database using 'path' parameter") } if cfg.BufferSize == 0 { @@ -108,17 +119,69 @@ func (cfg *Config) CheckAndSetDefaults() error { cfg.Clock = clockwork.NewRealClock() } if cfg.Sync == "" { - cfg.Sync = syncOFF + cfg.Sync = defaultSync } if cfg.BusyTimeout == 0 { - cfg.BusyTimeout = busyTimeout - } - if cfg.MemoryName == "" { - cfg.MemoryName = defaultDBFile + cfg.BusyTimeout = defaultBusyTimeout } return nil } +// ConnectionURI returns a connection string usable with sqlite according to the +// Config. +func (cfg *Config) ConnectionURI() string { + params := url.Values{} + params.Set("_busy_timeout", strconv.Itoa(cfg.BusyTimeout)) + // The _txlock parameter is parsed by go-sqlite to determine if (all) + // transactions should be started with `BEGIN DEFERRED` (the default, same + // as `BEGIN`), `BEGIN IMMEDIATE` or `BEGIN EXCLUSIVE`. + // + // The way we use sqlite relies entirely on the busy timeout handler (also + // configured through the connection URL, with the _busy_timeout parameter) + // to address concurrency problems, and treats any SQLITE_BUSY errors as a + // fatal issue with the database; however, in scenarios with multiple + // readwriters it is possible to still get a busy error even with a generous + // busy timeout handler configured, as two transactions that both start off + // with a SELECT - thus acquiring a SHARED lock, see + // https://www.sqlite.org/lockingv3.html#transaction_control - then attempt + // to upgrade to a RESERVED lock to upsert or delete something can end up + // requiring one of the two transactions to forcibly rollback to avoid a + // deadlock, which is signaled by the sqlite engine with a SQLITE_BUSY error + // returned to one of the two. When that happens, a concurrent-aware program + // can just try the transaction again a few times - making sure to disregard + // what was read before the transaction actually committed. + // + // As we're not really interested in concurrent sqlite access (process + // storage has very little written to, sharing a sqlite database as the + // backend between two auths is not really supported, and caches shouldn't + // ever run on the same underlying sqlite backend) we instead start every + // transaction with `BEGIN IMMEDIATE`, which grabs a RESERVED lock + // immediately (waiting for the busy timeout in case some other connection + // to the database has the lock) at the beginning of the transaction, thus + // avoiding any spurious SQLITE_BUSY error that can happen halfway through a + // transaction. + // + // If we end up requiring better concurrent access to sqlite in the future + // we should consider enabling Write-Ahead Logging mode, to actually allow + // for reads to happen at the same time as writes, adding some amount of + // retries to inTransaction, and double-checking that all uses of it + // correctly handle the possibility of the transaction being restarted. + params.Set("_txlock", "immediate") + if cfg.Sync != "" { + params.Set("_sync", cfg.Sync) + } + if cfg.Journal != "" { + params.Set("_journal", cfg.Journal) + } + + u := url.URL{ + Scheme: "file", + Opaque: url.QueryEscape(filepath.Join(cfg.Path, defaultDBFile)), + RawQuery: params.Encode(), + } + return u.String() +} + // New returns a new instance of sqlite backend func New(ctx context.Context, params backend.Params) (*Backend, error) { var cfg *Config @@ -135,23 +198,18 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - var connectorURL string - if !cfg.Memory { - // Ensure that the path to the root directory exists. - err := os.MkdirAll(cfg.Path, defaultDirMode) - if err != nil { - return nil, trace.ConvertSystemError(err) - } - fullPath := filepath.Join(cfg.Path, defaultDBFile) - connectorURL = fmt.Sprintf("file:%v?_busy_timeout=%v&_sync=%v", fullPath, cfg.BusyTimeout, cfg.Sync) - } else { - connectorURL = fmt.Sprintf("file:%v?mode=memory", cfg.MemoryName) + connectionURI := cfg.ConnectionURI() + // Ensure that the path to the root directory exists. + err := os.MkdirAll(cfg.Path, defaultDirMode) + if err != nil { + return nil, trace.ConvertSystemError(err) } - db, err := sql.Open(BackendName, connectorURL) + db, err := sql.Open("sqlite3", cfg.ConnectionURI()) if err != nil { - return nil, trace.Wrap(err, "error opening URI: %v", connectorURL) + return nil, trace.Wrap(err, "error opening URI: %v", connectionURI) } - // serialize access to sqlite to avoid database is locked errors + // serialize access to sqlite, as we're using immediate transactions anyway, + // and in-memory go locks are faster than sqlite locks db.SetMaxOpenConns(1) buf := backend.NewCircularBuffer( backend.BufferCapacity(cfg.BufferSize), @@ -169,9 +227,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { watchStarted: watchStarted, signalWatchStart: signalWatchStart, } - l.Debugf("Connected to: %v, poll stream period: %v", connectorURL, cfg.PollStreamPeriod) + l.Debugf("Connected to: %v, poll stream period: %v", connectionURI, cfg.PollStreamPeriod) if err := l.createSchema(); err != nil { - return nil, trace.Wrap(err, "error creating schema: %v", connectorURL) + return nil, trace.Wrap(err, "error creating schema: %v", connectionURI) } if err := l.showPragmas(); err != nil { l.Warningf("Failed to show pragma settings: %v.", err) @@ -204,17 +262,22 @@ type Backend struct { // parameters, when called, logs some key PRAGMA values func (l *Backend) showPragmas() error { return l.inTransaction(l.ctx, func(tx *sql.Tx) error { - row := tx.QueryRowContext(l.ctx, "PRAGMA synchronous;") - var syncValue string - if err := row.Scan(&syncValue); err != nil { + var journalMode string + row := tx.QueryRowContext(l.ctx, "PRAGMA journal_mode;") + if err := row.Scan(&journalMode); err != nil { + return trace.Wrap(err) + } + row = tx.QueryRowContext(l.ctx, "PRAGMA synchronous;") + var synchronous string + if err := row.Scan(&synchronous); err != nil { return trace.Wrap(err) } - var timeoutValue string + var busyTimeout string row = tx.QueryRowContext(l.ctx, "PRAGMA busy_timeout;") - if err := row.Scan(&timeoutValue); err != nil { + if err := row.Scan(&busyTimeout); err != nil { return trace.Wrap(err) } - l.Debugf("Synchronous: %v, busy timeout: %v", syncValue, timeoutValue) + l.Debugf("journal_mode=%v, synchronous=%v, busy_timeout=%v", journalMode, synchronous, busyTimeout) return nil }) } diff --git a/lib/backend/lite/litemem_test.go b/lib/backend/lite/litemem_test.go deleted file mode 100644 index 418900301847e..0000000000000 --- a/lib/backend/lite/litemem_test.go +++ /dev/null @@ -1,119 +0,0 @@ -/* -Copyright 2018-2019 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lite - -import ( - "context" - "time" - - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/test" - - "github.com/jonboulle/clockwork" - "gopkg.in/check.v1" -) - -type LiteMemSuite struct { - bk *Backend - suite test.BackendSuite -} - -var _ = check.Suite(&LiteMemSuite{}) - -func (s *LiteMemSuite) SetUpSuite(c *check.C) { - clock := clockwork.NewFakeClock() - newBackend := func() (backend.Backend, error) { - return NewWithConfig(context.Background(), Config{ - Memory: true, - PollStreamPeriod: 300 * time.Millisecond, - Clock: clock, - }) - } - s.suite.NewBackend = newBackend - s.suite.Clock = clock -} - -func (s *LiteMemSuite) SetUpTest(c *check.C) { - bk, err := s.suite.NewBackend() - c.Assert(err, check.IsNil) - s.bk = bk.(*Backend) - s.suite.B = s.bk -} - -func (s *LiteMemSuite) TearDownTest(c *check.C) { - if s.bk != nil { - c.Assert(s.bk.Close(), check.IsNil) - } -} - -func (s *LiteMemSuite) TestCRUD(c *check.C) { - s.suite.CRUD(c) -} - -func (s *LiteMemSuite) TestRange(c *check.C) { - s.suite.Range(c) -} - -func (s *LiteMemSuite) TestCompareAndSwap(c *check.C) { - s.suite.CompareAndSwap(c) -} - -func (s *LiteMemSuite) TestExpiration(c *check.C) { - s.suite.Expiration(c) -} - -func (s *LiteMemSuite) TestKeepAlive(c *check.C) { - s.suite.KeepAlive(c) -} - -func (s *LiteMemSuite) TestEvents(c *check.C) { - s.suite.Events(c) -} - -func (s *LiteMemSuite) TestWatchersClose(c *check.C) { - s.suite.WatchersClose(c) -} - -func (s *LiteMemSuite) TestDeleteRange(c *check.C) { - s.suite.DeleteRange(c) -} - -func (s *LiteMemSuite) TestPutRange(c *check.C) { - s.suite.PutRange(c) -} - -func (s *LiteMemSuite) TestLocking(c *check.C) { - s.suite.Locking(c, s.bk) -} - -func (s *LiteMemSuite) TestConcurrentOperations(c *check.C) { - bk, err := s.suite.NewBackend() - c.Assert(err, check.IsNil) - defer bk.Close() - s.suite.B2 = bk - s.suite.ConcurrentOperations(c) -} - -func (s *LiteMemSuite) TestMirror(c *check.C) { - mem, err := NewWithConfig(context.Background(), Config{ - Memory: true, - Mirror: true, - PollStreamPeriod: 300 * time.Millisecond, - }) - c.Assert(err, check.IsNil) - s.suite.Mirror(c, mem) -} diff --git a/lib/service/service.go b/lib/service/service.go index 3209f5a5b8785..9487994c6618d 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1526,7 +1526,6 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca lite.Config{ Path: path, EventsOff: !cfg.events, - Memory: false, Mirror: true, PollStreamPeriod: 100 * time.Millisecond, }) @@ -3345,12 +3344,18 @@ func (process *TeleportProcess) StartShutdown(ctx context.Context) context.Conte process.log.Warnf("Error waiting for all services to complete: %v", err) } process.log.Debug("All supervisor functions are completed.") - localAuth := process.getLocalAuth() - if localAuth != nil { - if err := process.localAuth.Close(); err != nil { + + if localAuth := process.getLocalAuth(); localAuth != nil { + if err := localAuth.Close(); err != nil { process.log.Warningf("Failed closing auth server: %v.", err) } } + + if process.storage != nil { + if err := process.storage.Close(); err != nil { + process.log.Warningf("Failed closing process storage: %v.", err) + } + } }() go process.printShutdownStatus(localCtx) return localCtx @@ -3372,9 +3377,9 @@ func (process *TeleportProcess) Close() error { process.Config.Keygen.Close() var errors []error - localAuth := process.getLocalAuth() - if localAuth != nil { - errors = append(errors, process.localAuth.Close()) + + if localAuth := process.getLocalAuth(); localAuth != nil { + errors = append(errors, localAuth.Close()) } if process.storage != nil {