Skip to content

Commit

Permalink
Address problems in concurrent sqlite access (#10706)
Browse files Browse the repository at this point in the history
* Use BEGIN IMMEDIATE to start transactions

This makes it so all transactions grab a write lock
rather than a read lock that can be upgraded in case of
a write; in case of multiple writers (which, in our
case, can only happen during a restart as the new
process reopens the same sqlite database) this will
prevent two transactions from attempting to upgrade
their lock, which would cause a SQLITE_BUSY error in
one of them. In regular operation this shouldn't cause
a performance hit, as we're using a single connection
to the sqlite database (guarded by locks on the Go side)
anyway.

* Escape path in sqlite connection URL

This makes it so that the sqlite backend supports paths with ? in them.

* Close process storage on TeleportProcess shutdown

This aligns the behavior of Shutdown with that of Close.

* Allow specifying the journal mode in sqlite

This will let sqlite backend users specify WAL mode in their config
file, and will allow us to specify alternate journal modes for our
on-disk caches in the future.

This also removes the sqlite memory mode, which is not used anywhere because
of its poor query performance compared to our in-memory backend, cleans up
a bit of old cruft, and runs process storage in FULL sync mode, as it's
very seldom written to and holds important data.
  • Loading branch information
espadolini committed Mar 16, 2022
1 parent 0d18409 commit cef6a9f
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 176 deletions.
1 change: 1 addition & 0 deletions lib/auth/state_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewProcessStorage(ctx context.Context, path string) (*ProcessStorage, error
litebk, err := lite.NewWithConfig(ctx, lite.Config{
Path: path,
EventsOff: true,
Sync: lite.SyncFull,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
163 changes: 113 additions & 50 deletions lib/backend/lite/lite.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"net/url"
"os"
"path/filepath"
"runtime/debug"
"strconv"
"sync/atomic"
"time"

Expand All @@ -39,15 +40,36 @@ import (
)

const (
// BackendName is the name of this backend
// BackendName is the name of this backend.
BackendName = "sqlite"
// AlternativeName is another name of this backend.
AlternativeName = "dir"
defaultDirMode os.FileMode = 0770
defaultDBFile = "sqlite.db"
slowTransactionThreshold = time.Second
syncOFF = "OFF"
busyTimeout = 10000
AlternativeName = "dir"

// SyncOff disables file system sync after writing.
SyncOff = "OFF"
// SyncFull fsyncs the database file on disk after every write.
SyncFull = "FULL"

// JournalMemory keeps the rollback journal in memory instead of storing it
// on disk.
JournalMemory = "MEMORY"
)

const (
// defaultDirMode is the mode of the newly created directories that are part
// of the Path
defaultDirMode os.FileMode = 0770

// defaultDBFile is the file name of the sqlite db in the directory
// specified by Path
defaultDBFile = "sqlite.db"
slowTransactionThreshold = time.Second

// defaultSync is the default value for Sync
defaultSync = SyncOff

// defaultBusyTimeout is the default value for BusyTimeout, in ms
defaultBusyTimeout = 10000
)

// GetName is a part of backend API and it returns SQLite backend type
Expand All @@ -56,14 +78,6 @@ func GetName() string {
return BackendName
}

func init() {
sql.Register(BackendName, &sqlite3.SQLiteDriver{
ConnectHook: func(conn *sqlite3.SQLiteConn) error {
return nil
},
})
}

// Config structure represents configuration section
type Config struct {
// Path is a path to the database directory
Expand All @@ -77,15 +91,12 @@ type Config struct {
EventsOff bool `json:"events_off,omitempty"`
// Clock allows to override clock used in the backend
Clock clockwork.Clock `json:"-"`
// Sync sets synchronous pragrma
// Sync sets the synchronous pragma
Sync string `json:"sync,omitempty"`
// BusyTimeout sets busy timeout in milliseconds
BusyTimeout int `json:"busy_timeout,omitempty"`
// Memory turns memory mode of the database
Memory bool `json:"memory"`
// MemoryName sets the name of the database,
// set to "sqlite.db" by default
MemoryName string `json:"memory_name"`
// Journal sets the journal_mode pragma
Journal string `json:"journal,omitempty"`
// Mirror turns on mirror mode for the backend,
// which will use record IDs for Put and PutRange passed from
// the resources, not generate a new one
Expand All @@ -95,7 +106,7 @@ type Config struct {
// CheckAndSetDefaults is a helper that returns an error if the supplied configuration
// is not enough to connect to sqlite
func (cfg *Config) CheckAndSetDefaults() error {
if cfg.Path == "" && !cfg.Memory {
if cfg.Path == "" {
return trace.BadParameter("specify directory path to the database using 'path' parameter")
}
if cfg.BufferSize == 0 {
Expand All @@ -108,17 +119,69 @@ func (cfg *Config) CheckAndSetDefaults() error {
cfg.Clock = clockwork.NewRealClock()
}
if cfg.Sync == "" {
cfg.Sync = syncOFF
cfg.Sync = defaultSync
}
if cfg.BusyTimeout == 0 {
cfg.BusyTimeout = busyTimeout
}
if cfg.MemoryName == "" {
cfg.MemoryName = defaultDBFile
cfg.BusyTimeout = defaultBusyTimeout
}
return nil
}

// ConnectionURI returns a connection string usable with sqlite according to the
// Config. The database file is always defaultDBFile inside cfg.Path; the busy
// timeout and the transaction locking mode are always passed as query
// parameters, while the synchronous and journal settings are only passed when
// they are set.
func (cfg *Config) ConnectionURI() string {
	params := url.Values{}
	params.Set("_busy_timeout", strconv.Itoa(cfg.BusyTimeout))
	// The _txlock parameter is parsed by go-sqlite to determine if (all)
	// transactions should be started with `BEGIN DEFERRED` (the default, same
	// as `BEGIN`), `BEGIN IMMEDIATE` or `BEGIN EXCLUSIVE`.
	//
	// The way we use sqlite relies entirely on the busy timeout handler (also
	// configured through the connection URL, with the _busy_timeout parameter)
	// to address concurrency problems, and treats any SQLITE_BUSY error as a
	// fatal issue with the database; however, in scenarios with multiple
	// readers/writers it is still possible to get a busy error even with a
	// generous busy timeout configured: two transactions that both start off
	// with a SELECT - thus acquiring a SHARED lock, see
	// https://www.sqlite.org/lockingv3.html#transaction_control - and then
	// attempt to upgrade to a RESERVED lock to upsert or delete something can
	// end up requiring one of the two to forcibly roll back to avoid a
	// deadlock, which the sqlite engine signals with a SQLITE_BUSY error
	// returned to one of the two. When that happens, a concurrency-aware
	// program can just try the transaction again a few times - making sure to
	// disregard what was read before the transaction actually committed.
	//
	// As we're not really interested in concurrent sqlite access (process
	// storage is very seldom written to, sharing a sqlite database as the
	// backend between two auths is not really supported, and caches shouldn't
	// ever run on the same underlying sqlite backend) we instead start every
	// transaction with `BEGIN IMMEDIATE`, which grabs a RESERVED lock
	// immediately (waiting for the busy timeout in case some other connection
	// to the database has the lock) at the beginning of the transaction, thus
	// avoiding any spurious SQLITE_BUSY error that can happen halfway through a
	// transaction.
	//
	// If we end up requiring better concurrent access to sqlite in the future
	// we should consider enabling Write-Ahead Logging mode, to actually allow
	// for reads to happen at the same time as writes, adding some amount of
	// retries to inTransaction, and double-checking that all uses of it
	// correctly handle the possibility of the transaction being restarted.
	params.Set("_txlock", "immediate")
	if cfg.Sync != "" {
		params.Set("_sync", cfg.Sync)
	}
	if cfg.Journal != "" {
		params.Set("_journal", cfg.Journal)
	}

	// The path goes in the opaque part of a "file:" URI, so it must be
	// percent-encoded: '?' and '#' would otherwise end the path early.
	// url.PathEscape is used rather than url.QueryEscape because sqlite only
	// decodes %HH sequences in the path of a URI filename; the "+" that
	// QueryEscape emits for a space would be taken as a literal plus sign and
	// point sqlite at a different file than the directory created from
	// cfg.Path.
	dbPath := filepath.Join(cfg.Path, defaultDBFile)
	u := url.URL{
		Scheme:   "file",
		Opaque:   url.PathEscape(dbPath),
		RawQuery: params.Encode(),
	}
	return u.String()
}

// New returns a new instance of sqlite backend
func New(ctx context.Context, params backend.Params) (*Backend, error) {
var cfg *Config
Expand All @@ -135,23 +198,18 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
var connectorURL string
if !cfg.Memory {
// Ensure that the path to the root directory exists.
err := os.MkdirAll(cfg.Path, defaultDirMode)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
fullPath := filepath.Join(cfg.Path, defaultDBFile)
connectorURL = fmt.Sprintf("file:%v?_busy_timeout=%v&_sync=%v", fullPath, cfg.BusyTimeout, cfg.Sync)
} else {
connectorURL = fmt.Sprintf("file:%v?mode=memory", cfg.MemoryName)
connectionURI := cfg.ConnectionURI()
// Ensure that the path to the root directory exists.
err := os.MkdirAll(cfg.Path, defaultDirMode)
if err != nil {
return nil, trace.ConvertSystemError(err)
}
db, err := sql.Open(BackendName, connectorURL)
db, err := sql.Open("sqlite3", cfg.ConnectionURI())
if err != nil {
return nil, trace.Wrap(err, "error opening URI: %v", connectorURL)
return nil, trace.Wrap(err, "error opening URI: %v", connectionURI)
}
// serialize access to sqlite to avoid database is locked errors
// serialize access to sqlite, as we're using immediate transactions anyway,
// and in-memory go locks are faster than sqlite locks
db.SetMaxOpenConns(1)
buf := backend.NewCircularBuffer(
backend.BufferCapacity(cfg.BufferSize),
Expand All @@ -169,9 +227,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
watchStarted: watchStarted,
signalWatchStart: signalWatchStart,
}
l.Debugf("Connected to: %v, poll stream period: %v", connectorURL, cfg.PollStreamPeriod)
l.Debugf("Connected to: %v, poll stream period: %v", connectionURI, cfg.PollStreamPeriod)
if err := l.createSchema(); err != nil {
return nil, trace.Wrap(err, "error creating schema: %v", connectorURL)
return nil, trace.Wrap(err, "error creating schema: %v", connectionURI)
}
if err := l.showPragmas(); err != nil {
l.Warningf("Failed to show pragma settings: %v.", err)
Expand Down Expand Up @@ -204,17 +262,22 @@ type Backend struct {
// parameters, when called, logs some key PRAGMA values
func (l *Backend) showPragmas() error {
return l.inTransaction(l.ctx, func(tx *sql.Tx) error {
row := tx.QueryRowContext(l.ctx, "PRAGMA synchronous;")
var syncValue string
if err := row.Scan(&syncValue); err != nil {
var journalMode string
row := tx.QueryRowContext(l.ctx, "PRAGMA journal_mode;")
if err := row.Scan(&journalMode); err != nil {
return trace.Wrap(err)
}
row = tx.QueryRowContext(l.ctx, "PRAGMA synchronous;")
var synchronous string
if err := row.Scan(&synchronous); err != nil {
return trace.Wrap(err)
}
var timeoutValue string
var busyTimeout string
row = tx.QueryRowContext(l.ctx, "PRAGMA busy_timeout;")
if err := row.Scan(&timeoutValue); err != nil {
if err := row.Scan(&busyTimeout); err != nil {
return trace.Wrap(err)
}
l.Debugf("Synchronous: %v, busy timeout: %v", syncValue, timeoutValue)
l.Debugf("journal_mode=%v, synchronous=%v, busy_timeout=%v", journalMode, synchronous, busyTimeout)
return nil
})
}
Expand Down
119 changes: 0 additions & 119 deletions lib/backend/lite/litemem_test.go

This file was deleted.

Loading

0 comments on commit cef6a9f

Please sign in to comment.