From f43e661cf122c7faee2b2d88f1319dcfc79a93b3 Mon Sep 17 00:00:00 2001 From: Anton Miniailo Date: Tue, 13 Sep 2022 10:58:03 -0400 Subject: [PATCH 1/4] Refactor our `utils/fs` locking to use `flock` library and add support for windows Flock works differently on linux/mac vs windows. On windows it's impossible to read/write a file after locking it, so we need to create a separate lockfile. --- lib/client/keystore.go | 19 +------ lib/events/filesessions/fileasync.go | 18 ++++--- lib/events/filesessions/filestream.go | 7 +-- lib/utils/fs.go | 64 +++++++++++++++++++++++ lib/utils/fs_test.go | 75 +++++++++++++++++++++++++++ lib/utils/fs_unix.go | 48 +++-------------- lib/utils/fs_windows.go | 36 ++++++------- 7 files changed, 178 insertions(+), 89 deletions(-) create mode 100644 lib/utils/fs_test.go diff --git a/lib/client/keystore.go b/lib/client/keystore.go index ac74337a595fa..81c407ec74078 100644 --- a/lib/client/keystore.go +++ b/lib/client/keystore.go @@ -31,8 +31,6 @@ import ( "golang.org/x/crypto/ssh" - "github.com/gofrs/flock" - "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/constants" "github.com/gravitational/teleport/api/profile" @@ -582,24 +580,11 @@ func (fs *fsLocalNonSessionKeyStore) kubeCertPath(idx KeyIndex, kubename string) return keypaths.KubeCertPath(fs.KeyDir, idx.ProxyHost, idx.Username, idx.ClusterName, kubename) } -// acquireFileLock is trying to lock the file, until it's successful or timeout is exceeded. -// File will be created if it doesn't exist. -func acquireFileLock(filePath string, timeout time.Duration) (func() error, error) { - fileLock := flock.New(filePath) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - if _, err := fileLock.TryLockContext(ctx, 10*time.Millisecond); err != nil { - return nil, err - } - - return fileLock.Unlock, nil -} - // AddKnownHostKeys adds a new entry to `known_hosts` file. 
func (fs *fsLocalNonSessionKeyStore) AddKnownHostKeys(hostname, proxyHost string, hostKeys []ssh.PublicKey) (retErr error) { // We're trying to serialize our writes to the 'known_hosts' file to avoid corruption, since there // are cases when multiple tsh instances will try to write to it. - unlock, err := acquireFileLock(fs.knownHostsPath(), 5*time.Second) + unlock, err := utils.FSTryWriteLockTimeout(context.Background(), fs.knownHostsPath(), 5*time.Second) if err != nil { return trace.WrapWithMessage(err, "could not acquire lock for the `known_hosts` file") } @@ -695,7 +680,7 @@ func matchesWildcard(hostname, pattern string) bool { // GetKnownHostKeys returns all known public keys from `known_hosts`. func (fs *fsLocalNonSessionKeyStore) GetKnownHostKeys(hostname string) (keys []ssh.PublicKey, retErr error) { - unlock, err := acquireFileLock(fs.knownHostsPath(), 5*time.Second) + unlock, err := utils.FSTryReadLockTimeout(context.Background(), fs.knownHostsPath(), 5*time.Second) if err != nil { return nil, trace.WrapWithMessage(err, "could not acquire lock for the `known_hosts` file") } diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index 70216b9956bd2..0ae9a65044c2e 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -114,7 +114,6 @@ func NewUploader(cfg UploaderConfig) (*Uploader, error) { // the upload that have been aborted. // // It marks corrupted session files to skip their processing. 
-// type Uploader struct { semaphore chan struct{} @@ -241,7 +240,7 @@ func (u *Uploader) Scan(ctx context.Context) (*ScanStats, error) { } stats.Scanned++ if err := u.startUpload(ctx, fi.Name()); err != nil { - if trace.IsCompareFailed(err) { + if errors.Is(err, utils.ErrUnsuccessfulLockTry) { u.log.Debugf("Scan is skipping recording %v that is locked by another process.", fi.Name()) continue } @@ -277,6 +276,7 @@ type upload struct { sessionID session.ID reader *events.ProtoReader file *os.File + fileUnlockFn func() error checkpointFile *os.File } @@ -322,7 +322,7 @@ func (u *upload) writeStatus(status apievents.StreamStatus) error { func (u *upload) Close() error { return trace.NewAggregate( u.reader.Close(), - utils.FSUnlock(u.file), + u.fileUnlockFn(), u.file.Close(), utils.NilCloser(u.checkpointFile).Close(), ) @@ -366,17 +366,19 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) error { if err != nil { return trace.ConvertSystemError(err) } - if err := utils.FSTryWriteLock(sessionFile); err != nil { + unlock, err := utils.FSTryWriteLock(sessionFilePath) + if err != nil { if e := sessionFile.Close(); e != nil { u.log.WithError(e).Warningf("Failed to close %v.", fileName) } - return trace.Wrap(err) + return trace.WrapWithMessage(err, "could not acquire file lock for %q", sessionFilePath) } upload := &upload{ - sessionID: sessionID, - reader: events.NewProtoReader(sessionFile), - file: sessionFile, + sessionID: sessionID, + reader: events.NewProtoReader(sessionFile), + file: sessionFile, + fileUnlockFn: unlock, } upload.checkpointFile, err = os.OpenFile(u.checkpointFilePath(sessionID), os.O_RDWR|os.O_CREATE, 0600) if err != nil { diff --git a/lib/events/filesessions/filestream.go b/lib/events/filesessions/filestream.go index ff49e7e0ef65e..8011d8887f552 100644 --- a/lib/events/filesessions/filestream.go +++ b/lib/events/filesessions/filestream.go @@ -148,11 +148,12 @@ func (h *Handler) CompleteUpload(ctx context.Context, upload 
events.StreamUpload if err != nil { return trace.ConvertSystemError(err) } - if err := utils.FSTryWriteLock(f); err != nil { - return trace.Wrap(err) + unlock, err := utils.FSTryWriteLock(uploadPath) + if err != nil { + return trace.WrapWithMessage(err, "could not acquire file lock for %q", uploadPath) } defer func() { - if err := utils.FSUnlock(f); err != nil { + if err := unlock(); err != nil { h.WithError(err).Errorf("Failed to unlock filesystem lock.") } if err := f.Close(); err != nil { diff --git a/lib/utils/fs.go b/lib/utils/fs.go index 82e73239a065f..cb142f2d4bae4 100644 --- a/lib/utils/fs.go +++ b/lib/utils/fs.go @@ -17,13 +17,21 @@ limitations under the License. package utils import ( + "context" + "errors" + "github.com/gofrs/flock" "os" "path/filepath" + "time" "github.com/gravitational/teleport" "github.com/gravitational/trace" ) +// ErrUnsuccessfulLockTry designates an error when we temporarily couldn't acquire lock +// (most probably it was already locked by someone else), another try might succeed. +var ErrUnsuccessfulLockTry = errors.New("could not acquire lock on the file at this time") + // OpenFileWithFlagsFunc defines a function used to open files providing options. 
type OpenFileWithFlagsFunc func(name string, flag int, perm os.FileMode) (*os.File, error) @@ -144,3 +152,59 @@ func StatDir(path string) (os.FileInfo, error) { } return fi, nil } + +// FSTryWriteLock tries to grab write lock, returns ErrUnsuccessfulLockTry +// if lock is already acquired by someone else +func FSTryWriteLock(filePath string) (unlock func() error, err error) { + fileLock := flock.New(filePath + lockPostfix) + locked, err := fileLock.TryLock() + if err != nil { + return nil, trace.ConvertSystemError(err) + } + if !locked { + return nil, trace.Wrap(ErrUnsuccessfulLockTry) + } + + return unlockWrapper(fileLock.Unlock, fileLock.Path()), nil +} + +// FSTryWriteLockTimeout tries to grab write lock, it's doing it until locks is acquired, or timeout is expired, +// or context is expired. +func FSTryWriteLockTimeout(ctx context.Context, filePath string, timeout time.Duration) (unlock func() error, err error) { + fileLock := flock.New(filePath + lockPostfix) + timedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + if _, err := fileLock.TryLockContext(timedCtx, 10*time.Millisecond); err != nil { + return nil, trace.ConvertSystemError(err) + } + + return unlockWrapper(fileLock.Unlock, fileLock.Path()), nil +} + +// FSTryReadLock tries to grab write lock, returns ErrUnsuccessfulLockTry +// if lock is already acquired by someone else +func FSTryReadLock(filePath string) (unlock func() error, err error) { + fileLock := flock.New(filePath + lockPostfix) + locked, err := fileLock.TryRLock() + if err != nil { + return nil, trace.ConvertSystemError(err) + } + if !locked { + return nil, trace.Wrap(ErrUnsuccessfulLockTry) + } + + return unlockWrapper(fileLock.Unlock, fileLock.Path()), nil +} + +// FSTryReadLockTimeout tries to grab read lock, it's doing it until locks is acquired, or timeout is expired, +// or context is expired. 
+func FSTryReadLockTimeout(ctx context.Context, filePath string, timeout time.Duration) (unlock func() error, err error) { + fileLock := flock.New(filePath + lockPostfix) + timedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + if _, err := fileLock.TryRLockContext(timedCtx, 10*time.Millisecond); err != nil { + return nil, trace.ConvertSystemError(err) + } + + return unlockWrapper(fileLock.Unlock, fileLock.Path()), nil +} diff --git a/lib/utils/fs_test.go b/lib/utils/fs_test.go new file mode 100644 index 0000000000000..e5c0955f44c9b --- /dev/null +++ b/lib/utils/fs_test.go @@ -0,0 +1,75 @@ +package utils + +import ( + "context" + "github.com/stretchr/testify/require" + "os" + "testing" + "time" +) + +func TestLocks(t *testing.T) { + t.Parallel() + + tmpFile, err := os.CreateTemp("", "teleport-lock-test") + fp := tmpFile.Name() + t.Cleanup(func() { + _ = os.Remove(fp) + }) + require.NoError(t, err) + + // Can take read lock + unlock, err := FSTryReadLock(fp) + require.NoError(t, err) + + require.NoError(t, unlock()) + + // Can take write lock + unlock, err = FSTryWriteLock(fp) + require.NoError(t, err) + + // Can't take read lock while write lock is held. + unlock2, err := FSTryReadLock(fp) + require.ErrorIs(t, err, ErrUnsuccessfulLockTry) + require.Nil(t, unlock2) + + // Can't take write lock while another write lock is held. + unlock2, err = FSTryWriteLock(fp) + require.ErrorIs(t, err, ErrUnsuccessfulLockTry) + require.Nil(t, unlock2) + + require.NoError(t, unlock()) + + unlock, err = FSTryReadLock(fp) + require.NoError(t, err) + + // Can take second read lock on the same file. 
+ unlock2, err = FSTryReadLock(fp) + require.NoError(t, err) + + require.NoError(t, unlock()) + require.NoError(t, unlock2()) + + // Can take read lock with timeout + unlock, err = FSTryReadLockTimeout(context.Background(), fp, time.Second) + require.NoError(t, err) + require.NoError(t, unlock()) + + // Can take write lock with timeout + unlock, err = FSTryWriteLockTimeout(context.Background(), fp, time.Second) + require.NoError(t, err) + + // Fails because timeout is exceeded, since file is already locked. + unlock2, err = FSTryWriteLockTimeout(context.Background(), fp, time.Millisecond) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, unlock2) + + // Fails because context is expired while waiting for timeout. + ctx, cancel := context.WithDeadline(context.Background(), time.Now()) + defer cancel() + unlock2, err = FSTryWriteLockTimeout(ctx, fp, time.Hour*1000) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, unlock2) + + require.NoError(t, unlock()) +} diff --git a/lib/utils/fs_unix.go b/lib/utils/fs_unix.go index 55221c26a8deb..0472b76ec6f5e 100644 --- a/lib/utils/fs_unix.go +++ b/lib/utils/fs_unix.go @@ -19,48 +19,14 @@ limitations under the License. package utils -import ( - "os" - "syscall" +// On non-windows we lock the target file itself. +const lockPostfix = "" - "github.com/gravitational/trace" -) - -// FSWriteLock grabs Flock-style filesystem lock on an open file -// in exclusive mode. 
-func FSWriteLock(f *os.File) error { - if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil { - return trace.ConvertSystemError(err) - } - return nil -} - -// FSTryWriteLock tries to grab write lock, returns CompareFailed -// if lock is already acquired -func FSTryWriteLock(f *os.File) error { - err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - if err == syscall.EWOULDBLOCK { - return trace.CompareFailed("lock %v is acquired by another process", f.Name()) +var unlockWrapper = func(unlockFn func() error, path string) func() error { + return func() error { + if unlockFn == nil { + return nil } - return trace.ConvertSystemError(err) - } - return nil -} - -// FSReadLock grabs Flock-style filesystem lock on an open file -// in read (shared) mode -func FSReadLock(f *os.File) error { - if err := syscall.Flock(int(f.Fd()), syscall.LOCK_SH); err != nil { - return trace.ConvertSystemError(err) - } - return nil -} - -// FSUnlock unlcocks Flock-style filesystem lock -func FSUnlock(f *os.File) error { - if err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN); err != nil { - return trace.ConvertSystemError(err) + return unlockFn() } - return nil } diff --git a/lib/utils/fs_windows.go b/lib/utils/fs_windows.go index 6ae164e3633c0..883d2a6c7eef5 100644 --- a/lib/utils/fs_windows.go +++ b/lib/utils/fs_windows.go @@ -21,26 +21,22 @@ limitations under the License. import ( "os" - - "github.com/gravitational/trace" ) -// FSWriteLock not supported on Windows. -func FSWriteLock(f *os.File) error { - return trace.BadParameter("file locking not supported on Windows") -} - -// FSTryWriteLock not supported on Windows. -func FSTryWriteLock(f *os.File) error { - return trace.BadParameter("file locking not supported on Windows") -} - -// FSReadLock not supported on Windows. -func FSReadLock(f *os.File) error { - return trace.BadParameter("file locking not supported on Windows") -} - -// FSUnlock not supported on Windows. 
-func FSUnlock(f *os.File) error { - return trace.BadParameter("file locking not supported on Windows") +// On Windows we use auxiliary .lock files to acquire locks, so we can still read/write target files +// themselves. On unlock we delete the .lock file. +const lockPostfix = ".lock" + +var unlockWrapper = func(unlockFn func() error, path string) func() error { + return func() error { + if unlockFn == nil { + return nil + } + err := unlockFn() + + // At this point file can be locked again, and we can get an error, so we do our best effort + // to remove .lock file, but can't guarantee it. Last locker should be able to successfully clean it. + _ = os.Remove(path) + return err + } } From cc9f1725eef1fa5a6e7872621cff8d4683d61266 Mon Sep 17 00:00:00 2001 From: Anton Miniailo Date: Wed, 14 Sep 2022 09:33:43 -0400 Subject: [PATCH 2/4] Make unlockWrapper() regular function --- lib/utils/fs.go | 3 ++- lib/utils/fs_test.go | 19 ++++++++++++++++++- lib/utils/fs_unix.go | 2 +- lib/utils/fs_windows.go | 2 +- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/lib/utils/fs.go b/lib/utils/fs.go index cb142f2d4bae4..573bcf43f79d6 100644 --- a/lib/utils/fs.go +++ b/lib/utils/fs.go @@ -19,11 +19,12 @@ package utils import ( "context" "errors" - "github.com/gofrs/flock" "os" "path/filepath" "time" + "github.com/gofrs/flock" + "github.com/gravitational/teleport" "github.com/gravitational/trace" ) diff --git a/lib/utils/fs_test.go b/lib/utils/fs_test.go index e5c0955f44c9b..10e60cf0a82b6 100644 --- a/lib/utils/fs_test.go +++ b/lib/utils/fs_test.go @@ -1,11 +1,28 @@ +/* +Copyright 2022 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package utils import ( "context" - "github.com/stretchr/testify/require" "os" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestLocks(t *testing.T) { diff --git a/lib/utils/fs_unix.go b/lib/utils/fs_unix.go index 0472b76ec6f5e..cab531ba2b1d0 100644 --- a/lib/utils/fs_unix.go +++ b/lib/utils/fs_unix.go @@ -22,7 +22,7 @@ package utils // On non-windows we lock the target file itself. const lockPostfix = "" -var unlockWrapper = func(unlockFn func() error, path string) func() error { +func unlockWrapper(unlockFn func() error, path string) func() error { return func() error { if unlockFn == nil { return nil diff --git a/lib/utils/fs_windows.go b/lib/utils/fs_windows.go index 883d2a6c7eef5..5f68bc647644c 100644 --- a/lib/utils/fs_windows.go +++ b/lib/utils/fs_windows.go @@ -27,7 +27,7 @@ import ( // themselves. On unlock we delete the .lock file. const lockPostfix = ".lock" -var unlockWrapper = func(unlockFn func() error, path string) func() error { +func unlockWrapper(unlockFn func() error, path string) func() error { return func() error { if unlockFn == nil { return nil From 91204e39f9653c20ad27c0b511b40481d053b05e Mon Sep 17 00:00:00 2001 From: Anton Miniailo Date: Wed, 14 Sep 2022 16:26:01 -0400 Subject: [PATCH 3/4] Make code a bit more clear. 
--- lib/utils/fs.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/utils/fs.go b/lib/utils/fs.go index 573bcf43f79d6..95a399d1e32fc 100644 --- a/lib/utils/fs.go +++ b/lib/utils/fs.go @@ -154,16 +154,20 @@ func StatDir(path string) (os.FileInfo, error) { return fi, nil } +func getPlatformLockFilePath(path string) string { + return path + lockPostfix +} + // FSTryWriteLock tries to grab write lock, returns ErrUnsuccessfulLockTry // if lock is already acquired by someone else func FSTryWriteLock(filePath string) (unlock func() error, err error) { - fileLock := flock.New(filePath + lockPostfix) + fileLock := flock.New(getPlatformLockFilePath(filePath)) locked, err := fileLock.TryLock() if err != nil { return nil, trace.ConvertSystemError(err) } if !locked { - return nil, trace.Wrap(ErrUnsuccessfulLockTry) + return nil, trace.Retry(ErrUnsuccessfulLockTry, "") } return unlockWrapper(fileLock.Unlock, fileLock.Path()), nil @@ -172,7 +176,7 @@ func FSTryWriteLock(filePath string) (unlock func() error, err error) { // FSTryWriteLockTimeout tries to grab write lock, it's doing it until locks is acquired, or timeout is expired, // or context is expired. 
func FSTryWriteLockTimeout(ctx context.Context, filePath string, timeout time.Duration) (unlock func() error, err error) { - fileLock := flock.New(filePath + lockPostfix) + fileLock := flock.New(getPlatformLockFilePath(filePath)) timedCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() if _, err := fileLock.TryLockContext(timedCtx, 10*time.Millisecond); err != nil { @@ -185,13 +189,13 @@ func FSTryWriteLockTimeout(ctx context.Context, filePath string, timeout time.Du // FSTryReadLock tries to grab write lock, returns ErrUnsuccessfulLockTry // if lock is already acquired by someone else func FSTryReadLock(filePath string) (unlock func() error, err error) { - fileLock := flock.New(filePath + lockPostfix) + fileLock := flock.New(getPlatformLockFilePath(filePath)) locked, err := fileLock.TryRLock() if err != nil { return nil, trace.ConvertSystemError(err) } if !locked { - return nil, trace.Wrap(ErrUnsuccessfulLockTry) + return nil, trace.Retry(ErrUnsuccessfulLockTry, "") } return unlockWrapper(fileLock.Unlock, fileLock.Path()), nil @@ -200,7 +204,7 @@ func FSTryReadLock(filePath string) (unlock func() error, err error) { // FSTryReadLockTimeout tries to grab read lock, it's doing it until locks is acquired, or timeout is expired, // or context is expired. func FSTryReadLockTimeout(ctx context.Context, filePath string, timeout time.Duration) (unlock func() error, err error) { - fileLock := flock.New(filePath + lockPostfix) + fileLock := flock.New(getPlatformLockFilePath(filePath)) timedCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() if _, err := fileLock.TryRLockContext(timedCtx, 10*time.Millisecond); err != nil { From 1321dd9fabc333d054fe33043e859ebde084973f Mon Sep 17 00:00:00 2001 From: Anton Miniailo Date: Thu, 15 Sep 2022 11:40:02 -0400 Subject: [PATCH 4/4] Move platform-dependent path func to platform-specific source files. 
--- lib/utils/fs.go | 4 ---- lib/utils/fs_unix.go | 6 ++++-- lib/utils/fs_windows.go | 4 ++++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/utils/fs.go b/lib/utils/fs.go index 95a399d1e32fc..03210501184e7 100644 --- a/lib/utils/fs.go +++ b/lib/utils/fs.go @@ -154,10 +154,6 @@ func StatDir(path string) (os.FileInfo, error) { return fi, nil } -func getPlatformLockFilePath(path string) string { - return path + lockPostfix -} - // FSTryWriteLock tries to grab write lock, returns ErrUnsuccessfulLockTry // if lock is already acquired by someone else func FSTryWriteLock(filePath string) (unlock func() error, err error) { diff --git a/lib/utils/fs_unix.go b/lib/utils/fs_unix.go index cab531ba2b1d0..9e8cda49d8d25 100644 --- a/lib/utils/fs_unix.go +++ b/lib/utils/fs_unix.go @@ -19,8 +19,10 @@ limitations under the License. package utils -// On non-windows we lock the target file itself. -const lockPostfix = "" +// On non-windows we just lock the target file itself. +func getPlatformLockFilePath(path string) string { + return path +} func unlockWrapper(unlockFn func() error, path string) func() error { return func() error { diff --git a/lib/utils/fs_windows.go b/lib/utils/fs_windows.go index 5f68bc647644c..04fa2bb82418a 100644 --- a/lib/utils/fs_windows.go +++ b/lib/utils/fs_windows.go @@ -27,6 +27,10 @@ import ( // themselves. On unlock we delete the .lock file. const lockPostfix = ".lock" +func getPlatformLockFilePath(path string) string { + return path + lockPostfix +} + func unlockWrapper(unlockFn func() error, path string) func() error { return func() error { if unlockFn == nil {