diff --git a/.chloggen/filestorage-recreate-subprocess-precheck.yaml b/.chloggen/filestorage-recreate-subprocess-precheck.yaml new file mode 100644 index 0000000000000..bcc3cf90c2040 --- /dev/null +++ b/.chloggen/filestorage-recreate-subprocess-precheck.yaml @@ -0,0 +1,32 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: extension/file_storage + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Recover from additional bbolt panics when the `recreate` option is enabled, so the collector no longer crash-loops on certain corrupt storage files. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35899] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Previously, `recreate` only caught bbolt panics raised on the calling goroutine. Panics raised on + goroutines spawned inside `bbolt.Open` (such as the `freepages: failed to get all reachable pages` + panic) would still terminate the collector. Now `recreate` will also catch these panics and rename + the corrupt database to `{filename}.{timestamp}.backup` to create a fresh database in its place, + so the collector can continue operating normally. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. 
'[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/storage/filestorage/README.md b/extension/storage/filestorage/README.md index 49c51c426d0b0..78850d1142848 100644 --- a/extension/storage/filestorage/README.md +++ b/extension/storage/filestorage/README.md @@ -33,8 +33,8 @@ The default timeout is `1s`. By default, the directories will be created with `0750 (rwxr-x---)` permissions, minus the process umask. Use `directory_permissions` to customize directory creation permissions, minus the process umask. -`recreate` when set, the filestorage extension will automatically rename the corrupted bbolt database and create a new one when certain bbolt panics occur. -See (#35899)[https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35899] for more details. +`recreate` when set, the filestorage extension will automatically rename the corrupted bbolt database and create a new one when certain bbolt panics occur. +See [#35899](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35899) and [#48565](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/48565) for more details. If the database fails to open due to corruption (resulting in a panic), the corrupted file will be automatically renamed to `{filename}.{ISO 8601 timestamp}.backup` and a new data file will be created from scratch. This allows the collector to continue operating even when encountering certain bbolt panics. If no corruption is detected, the existing database continues to be used normally. There may still be scenarios where manually removing or renaming the file may be required, and this feature flag is not a panacea for all bbolt panics you can encounter. 
// localFileStorage is the file-backed storage extension. Besides its
// configuration and logger it carries two function-valued fields that act as
// test seams: one for the corruption pre-check and one for opening the
// underlying client.
type localFileStorage struct {
	cfg    *Config
	logger *zap.Logger

	// precheckFn runs the corruption pre-check for a bbolt database file when
	// Recreate is enabled. It is a struct field so tests can substitute a fake
	// implementation that does not spawn a subprocess.
	precheckFn func(ctx context.Context, path string, timeout time.Duration) error

	// newClientFn opens (or creates) the underlying bbolt client. It is a
	// struct field so tests can substitute a fake that simulates a panic on
	// the first call, allowing the defer/recover path in openClient to be
	// exercised without constructing a real corrupt bbolt file.
	newClientFn func(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, noSync bool) (*fileStorageClient, error)
}
return nil, err } - // return if compaction is not required if lfs.cfg.Compaction.OnStart { compactionErr := client.Compact(lfs.cfg.Compaction.Directory, lfs.cfg.Timeout, lfs.cfg.Compaction.MaxTransactionSize) if compactionErr != nil { @@ -103,43 +114,69 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e return client, nil } -// createClientWithPanicRecovery attempts to create a client, and if recreate is enabled -// and a panic occurs (typically due to database corruption), it will rename the file -// and try again with a fresh database -func (lfs *localFileStorage) createClientWithPanicRecovery(absoluteName string) (client *fileStorageClient, err error) { - // First attempt: try to create client normally - if !lfs.cfg.Recreate { - // If recreate is disabled, just try once - return newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync) - } - - // If recreate is enabled, handle potential panics during database opening - defer func() { - if r := recover(); r != nil { - lfs.logger.Warn("Database corruption detected, recreating database file", - zap.String("file", absoluteName), - zap.Any("panic", r)) - - // Rename the corrupted file with ISO 8601 timestamp - timestamp := time.Now().Format("2006-01-02T15:04:05.000") - backupName := absoluteName + "." + timestamp + ".backup" - if renameErr := os.Rename(absoluteName, backupName); renameErr != nil { - err = fmt.Errorf("error renaming corrupted database. Please remove %s manually: %w", absoluteName, renameErr) - return +// openClient opens (or creates) the bbolt database for a single client. +// +// When Recreate is enabled and the database file already exists, corruption +// recovery is performed in two layers: +// +// 1. Subprocess pre-check: catches panics raised inside goroutines spawned by +// bbolt.Open (e.g. 
the freepages "multiple references" panic), which the +// in-process defer/recover below cannot catch because Go's recover() only +// catches panics in its own goroutine. +// 2. In-process defer/recover around bbolt.Open: catches panics raised in the +// main goroutine (e.g. freepages' "failed to open read only tx" or "failed +// to rollback tx" panics, or panics from tx.recursivelyCheckBucket itself +// prior to the spawned goroutine reading from the error channel). +// +// On either signal of corruption, the file is renamed aside and a fresh +// database is opened in its place. +func (lfs *localFileStorage) openClient(ctx context.Context, absoluteName string) (client *fileStorageClient, err error) { + if lfs.cfg.Recreate { + if _, statErr := os.Stat(absoluteName); statErr == nil { + precheckErr := lfs.precheckFn(ctx, absoluteName, lfs.cfg.Timeout) + switch { + case precheckErr == nil: + // Database opened cleanly in the subprocess; safe to open here. + case errors.Is(precheckErr, errDBCorruption): + if renameErr := lfs.renameCorruptDB(absoluteName); renameErr != nil { + return nil, renameErr + } + default: + lfs.logger.Warn("Database pre-check returned non-corruption error; proceeding with open", + zap.String("file", absoluteName), + zap.Error(precheckErr)) } + } - lfs.logger.Info("Corrupted database file renamed", - zap.String("original", absoluteName), - zap.String("backup", backupName)) + defer func() { + if r := recover(); r != nil { + lfs.logger.Warn("bbolt.Open panicked in main goroutine; recreating database", + zap.String("file", absoluteName), + zap.Any("panic", r)) + if renameErr := lfs.renameCorruptDB(absoluteName); renameErr != nil { + err = renameErr + return + } + client, err = lfs.newClientFn(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync) + } + }() + } - // Try to create client again with fresh database - client, err = newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync) - } - }() + 
return lfs.newClientFn(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync) +} - // Try to create the client normally first - client, err = newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync) - return client, err +// renameCorruptDB moves a corrupt bbolt database file aside with a timestamped +// suffix so that a fresh database can be created in its place. +func (lfs *localFileStorage) renameCorruptDB(path string) error { + timestamp := time.Now().Format("2006-01-02T15:04:05.000") + backup := path + "." + timestamp + ".backup" + if err := os.Rename(path, backup); err != nil { + return fmt.Errorf("rename corrupt database %s -> %s (please remove manually): %w", path, backup, err) + } + lfs.logger.Warn("Renamed corrupt bbolt database; a fresh database will be created", + zap.String("original", path), + zap.String("backup", backup)) + return nil } func kindString(k component.Kind) string { diff --git a/extension/storage/filestorage/extension_test.go b/extension/storage/filestorage/extension_test.go index 4d64b0d718948..c564824d6a599 100644 --- a/extension/storage/filestorage/extension_test.go +++ b/extension/storage/filestorage/extension_test.go @@ -4,6 +4,7 @@ package filestorage import ( + "context" "fmt" "os" "path/filepath" @@ -11,6 +12,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -667,13 +669,20 @@ func TestRecreate(t *testing.T) { require.NoError(t, ext.Shutdown(ctx)) } - // step 3: re-create the extension, but with Recreate=true and make sure that the data still exists - // (since recreate now only happens on panic, not always when recreate=true) + // step 3: re-create the extension with Recreate=true and make sure that + // the data still exists (the precheck must not trigger a rename on a + // healthy database). A fake precheck is injected so this test does not + // spawn a real subprocess. 
{ config.Recreate = true ext, err := f.Create(ctx, extensiontest.NewNopSettings(f.Type()), config) require.NoError(t, err) require.NotNil(t, ext) + + lfs, ok := ext.(*localFileStorage) + require.True(t, ok) + lfs.precheckFn = func(context.Context, string, time.Duration) error { return nil } + se, ok := ext.(storage.Extension) require.True(t, ok) @@ -681,12 +690,10 @@ func TestRecreate(t *testing.T) { require.NoError(t, err) require.NotNil(t, client) - // The data should still exist since no panic occurred val, err := client.Get(ctx, "key") require.Equal(t, val, []byte("val")) require.NoError(t, err) - // close the extension require.NoError(t, client.Close(ctx)) require.NoError(t, ext.Shutdown(ctx)) } diff --git a/extension/storage/filestorage/precheck.go b/extension/storage/filestorage/precheck.go new file mode 100644 index 0000000000000..747728fbccf2e --- /dev/null +++ b/extension/storage/filestorage/precheck.go @@ -0,0 +1,134 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package filestorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage" + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "time" + + "go.etcd.io/bbolt" +) + +// precheckEnvVar, when set on a process, causes the binary to attempt to open +// the bbolt database at the given path and exit before main() runs. The +// filestorage extension uses this to isolate bbolt panics (some of which occur +// in goroutines spawned inside bbolt.Open and therefore cannot be recovered +// from in-process) to a short-lived subprocess. +const precheckEnvVar = "OTELCOL_FILESTORAGE_PRECHECK_DB" + +// precheckTimeoutEnvVar overrides the bbolt.Open timeout used inside the +// subprocess. The value must be a Go duration (e.g. "5s"). +const precheckTimeoutEnvVar = "OTELCOL_FILESTORAGE_PRECHECK_TIMEOUT" + +// Subprocess exit codes. 
The Go runtime exits with 2 on an unrecovered panic, +// so distinct codes are used for non-panic outcomes. +const ( + precheckExitOK = 0 // bbolt.Open + Close succeeded. + precheckExitPanic = 2 // Set implicitly by the Go runtime on panic. + precheckExitOpenError = 3 // bbolt.Open returned a non-panic error. +) + +// errDBCorruption is returned by runPrecheck when the subprocess is observed +// to have crashed with the runtime panic exit code, which strongly suggests +// the bbolt database is corrupt in a way bbolt cannot open without panicking. +var errDBCorruption = errors.New("filestorage: bbolt database appears corrupt") + +func init() { + runPrecheckIfRequested() +} + +// runPrecheckIfRequested is called from init(). When precheckEnvVar is set, +// the process attempts to open the bbolt database at the configured path and +// exits immediately. main() is never reached, so the rest of the collector +// never starts up in this short-lived child process. +// +// If bbolt.Open panics in a spawned goroutine, the Go runtime terminates the +// process with exit code 2; the parent process treats that exit code as a +// corruption signal. 
+func runPrecheckIfRequested() { + path := os.Getenv(precheckEnvVar) + if path == "" { + return + } + + timeout := 5 * time.Second + if raw := os.Getenv(precheckTimeoutEnvVar); raw != "" { + if parsed, err := time.ParseDuration(raw); err == nil && parsed > 0 { + timeout = parsed + } + } + + opts := &bbolt.Options{ + Timeout: timeout, + NoFreelistSync: true, + FreelistType: bbolt.FreelistMapType, + } + + db, err := bbolt.Open(path, 0o600, opts) + if err != nil { + fmt.Fprintf(os.Stderr, "filestorage precheck: bbolt.Open returned error: %v\n", err) + os.Exit(precheckExitOpenError) + } + if closeErr := db.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "filestorage precheck: bbolt.Close returned error: %v\n", closeErr) + os.Exit(precheckExitOpenError) + } + os.Exit(precheckExitOK) +} + +// runPrecheck attempts to open the bbolt database at path in a subprocess. +// It returns: +// +// - nil if the database opened cleanly, +// - errDBCorruption if the subprocess crashed with the runtime panic exit code +// (strong signal of bbolt corruption that cannot be opened in-process), or +// - a non-nil non-corruption error for any other failure (e.g. file lock, +// permission denied, subprocess could not be started). +// +// The caller should only treat errDBCorruption as a reason to recreate the +// database. Other errors should typically be surfaced to the user. +func runPrecheck(ctx context.Context, path string, bboltTimeout time.Duration) error { + exe, err := os.Executable() + if err != nil { + return fmt.Errorf("determine executable path: %w", err) + } + + // Allow the subprocess slightly more time than the bbolt timeout to start + // up, parse options, and exit cleanly. 
+ subprocessTimeout := max(bboltTimeout+10*time.Second, 15*time.Second) + + cmdCtx, cancel := context.WithTimeout(ctx, subprocessTimeout) + defer cancel() + + cmd := exec.CommandContext(cmdCtx, exe) + cmd.Env = append(os.Environ(), + precheckEnvVar+"="+path, + precheckTimeoutEnvVar+"="+bboltTimeout.String(), + ) + cmd.Stdout = io.Discard + cmd.Stderr = io.Discard + + runErr := cmd.Run() + if runErr == nil { + return nil + } + + var exitErr *exec.ExitError + if errors.As(runErr, &exitErr) { + switch exitErr.ExitCode() { + case precheckExitPanic: + return errDBCorruption + case precheckExitOpenError: + return fmt.Errorf("precheck: subprocess could not open database: %w", runErr) + default: + return fmt.Errorf("precheck: subprocess exited with code %d: %w", exitErr.ExitCode(), runErr) + } + } + return fmt.Errorf("precheck: subprocess failed to run: %w", runErr) +} diff --git a/extension/storage/filestorage/precheck_test.go b/extension/storage/filestorage/precheck_test.go new file mode 100644 index 0000000000000..1ee9e44f8dd05 --- /dev/null +++ b/extension/storage/filestorage/precheck_test.go @@ -0,0 +1,346 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package filestorage + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/extension/xextension/storage" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +func TestOpenClient_RecreateDisabled_SkipsPrecheck(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "db.bbolt") + + require.NoError(t, seedBboltFile(dbPath)) + + var precheckCalled bool + lfs := newTestStorage( + defaultRecreateConfig(t, tempDir, false), + zap.NewNop(), + func(context.Context, string, time.Duration) error { + precheckCalled = true + 
return nil + }, + ) + + client, err := lfs.openClient(ctx, dbPath) + require.NoError(t, err) + require.NotNil(t, client) + require.False(t, precheckCalled, "precheck must not run when Recreate is false") + require.NoError(t, client.Close(ctx)) +} + +func TestOpenClient_RecreateEnabled_MissingFile_SkipsPrecheck(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "db.bbolt") + + var precheckCalled bool + lfs := newTestStorage( + defaultRecreateConfig(t, tempDir, true), + zap.NewNop(), + func(context.Context, string, time.Duration) error { + precheckCalled = true + return nil + }, + ) + + client, err := lfs.openClient(ctx, dbPath) + require.NoError(t, err) + require.NotNil(t, client) + require.False(t, precheckCalled, "precheck must not run when DB file is absent") + require.NoError(t, client.Close(ctx)) +} + +func TestOpenClient_RecreateEnabled_PrecheckClean_PreservesData(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "db.bbolt") + + require.NoError(t, seedBboltFile(dbPath)) + originalSize := fileSize(t, dbPath) + + lfs := newTestStorage( + defaultRecreateConfig(t, tempDir, true), + zap.NewNop(), + func(context.Context, string, time.Duration) error { + return nil + }, + ) + + client, err := lfs.openClient(ctx, dbPath) + require.NoError(t, err) + require.NotNil(t, client) + require.NoError(t, client.Close(ctx)) + + require.NoFileExists(t, findBackup(t, tempDir), "no backup should be created on clean precheck") + require.Equal(t, originalSize, fileSize(t, dbPath)) +} + +func TestOpenClient_RecreateEnabled_PrecheckCorruption_RenamesFile(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "db.bbolt") + + require.NoError(t, seedBboltFile(dbPath)) + + logCore, logObserver := observer.New(zap.WarnLevel) + + lfs := newTestStorage( + defaultRecreateConfig(t, tempDir, true), + zap.New(logCore), + func(context.Context, string, 
time.Duration) error { + return errDBCorruption + }, + ) + + client, err := lfs.openClient(ctx, dbPath) + require.NoError(t, err) + require.NotNil(t, client) + t.Cleanup(func() { require.NoError(t, client.Close(ctx)) }) + + backupPath := findBackup(t, tempDir) + require.NotEmpty(t, backupPath, "corrupt database should be renamed") + require.FileExists(t, backupPath) + require.True(t, strings.HasSuffix(backupPath, ".backup"), + "backup file must end with .backup suffix, got %s", backupPath) + + require.NotEmpty(t, logObserver.FilterMessageSnippet("Renamed corrupt bbolt database").All(), + "expected log entry about renaming the corrupt database") +} + +func TestOpenClient_RecreateEnabled_MainGoroutinePanic_Recovers(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "db.bbolt") + + require.NoError(t, seedBboltFile(dbPath)) + + logCore, logObserver := observer.New(zap.WarnLevel) + + var calls int + lfs := newTestStorage( + defaultRecreateConfig(t, tempDir, true), + zap.New(logCore), + func(context.Context, string, time.Duration) error { + return nil + }, + ) + lfs.newClientFn = func(logger *zap.Logger, filePath string, timeout time.Duration, c *CompactionConfig, noSync bool) (*fileStorageClient, error) { + calls++ + if calls == 1 { + panic("simulated bbolt main-goroutine panic") + } + return newClient(logger, filePath, timeout, c, noSync) + } + + client, err := lfs.openClient(ctx, dbPath) + require.NoError(t, err, "openClient must swallow the panic and recover via rename + retry") + require.NotNil(t, client) + t.Cleanup(func() { require.NoError(t, client.Close(ctx)) }) + + require.Equal(t, 2, calls, "newClientFn must be called twice: once panicking, once on the fresh file") + require.NotEmpty(t, findBackup(t, tempDir), + "main-goroutine panic must trigger rename of the corrupt file") + require.NotEmpty(t, + logObserver.FilterMessageSnippet("bbolt.Open panicked in main goroutine").All(), + "expected log entry about the 
recovered panic") + require.NotEmpty(t, + logObserver.FilterMessageSnippet("Renamed corrupt bbolt database").All(), + "expected log entry about the rename") +} + +func TestOpenClient_RecreateEnabled_PrecheckTransientError_DoesNotRename(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "db.bbolt") + + require.NoError(t, seedBboltFile(dbPath)) + originalSize := fileSize(t, dbPath) + + logCore, logObserver := observer.New(zap.WarnLevel) + + lfs := newTestStorage( + defaultRecreateConfig(t, tempDir, true), + zap.New(logCore), + func(context.Context, string, time.Duration) error { + return os.ErrPermission + }, + ) + + client, err := lfs.openClient(ctx, dbPath) + require.NoError(t, err) + require.NotNil(t, client) + require.NoError(t, client.Close(ctx)) + + require.NoFileExists(t, findBackup(t, tempDir), + "transient errors must not trigger rename") + require.Equal(t, originalSize, fileSize(t, dbPath)) + require.NotEmpty(t, + logObserver.FilterMessage("Database pre-check returned non-corruption error; proceeding with open").All(), + "expected log entry about transient precheck error") +} + +func TestRunPrecheck_RealSubprocess_OnCleanDatabase(t *testing.T) { + if testing.Short() { + t.Skip("skipping subprocess test in short mode") + } + + tempDir := t.TempDir() + dbPath := filepath.Join(tempDir, "clean.bbolt") + require.NoError(t, seedBboltFile(dbPath)) + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + require.NoError(t, runPrecheck(ctx, dbPath, time.Second), + "precheck on a healthy bbolt file should return nil") +} + +func TestRunPrecheck_RealSubprocess_OnMissingFile(t *testing.T) { + if testing.Short() { + t.Skip("skipping subprocess test in short mode") + } + + tempDir := t.TempDir() + missingPath := filepath.Join(tempDir, "does-not-exist.bbolt") + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + // bbolt.Open creates the file when it does not 
exist, so this should succeed. + require.NoError(t, runPrecheck(ctx, missingPath, time.Second)) +} + +func TestExtensionRecreate_FactoryWiresPrecheck(t *testing.T) { + ctx := t.Context() + f := NewFactory() + + cfg := f.CreateDefaultConfig().(*Config) + cfg.Directory = t.TempDir() + cfg.Recreate = true + + ext, err := f.Create(ctx, extensiontest.NewNopSettings(f.Type()), cfg) + require.NoError(t, err) + + lfs, ok := ext.(*localFileStorage) + require.True(t, ok, "factory must return *localFileStorage") + require.NotNil(t, lfs.precheckFn, "factory must wire a default precheckFn") +} + +func TestExtensionRecreate_EndToEndWithInjectedPrecheck(t *testing.T) { + ctx := t.Context() + tempDir := t.TempDir() + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + cfg.Directory = tempDir + + { + ext, err := f.Create(ctx, extensiontest.NewNopSettings(f.Type()), cfg) + require.NoError(t, err) + + lfs := ext.(*localFileStorage) + lfs.precheckFn = func(context.Context, string, time.Duration) error { return nil } + + se := ext.(storage.Extension) + client, err := se.GetClient(ctx, component.KindReceiver, component.MustNewID("filelog"), "") + require.NoError(t, err) + require.NoError(t, client.Set(ctx, "key", []byte("val"))) + require.NoError(t, client.Close(ctx)) + require.NoError(t, ext.Shutdown(ctx)) + } + + cfg.Recreate = true + { + ext, err := f.Create(ctx, extensiontest.NewNopSettings(f.Type()), cfg) + require.NoError(t, err) + + lfs := ext.(*localFileStorage) + lfs.precheckFn = func(context.Context, string, time.Duration) error { + return errDBCorruption + } + + se := ext.(storage.Extension) + client, err := se.GetClient(ctx, component.KindReceiver, component.MustNewID("filelog"), "") + require.NoError(t, err) + + val, err := client.Get(ctx, "key") + require.NoError(t, err) + require.Nil(t, val, "data should be gone after corruption-triggered recreate") + + require.NoError(t, client.Close(ctx)) + require.NoError(t, ext.Shutdown(ctx)) + } + + require.NotEmpty(t, 
findBackup(t, tempDir), "a .backup file must exist after corruption recovery") +} + +func defaultRecreateConfig(t *testing.T, dir string, recreate bool) *Config { + t.Helper() + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.Directory = dir + cfg.Recreate = recreate + return cfg +} + +func newTestStorage(cfg *Config, logger *zap.Logger, precheck func(context.Context, string, time.Duration) error) *localFileStorage { + return &localFileStorage{ + cfg: cfg, + logger: logger, + precheckFn: precheck, + newClientFn: newClient, + } +} + +func seedBboltFile(path string) error { + db, err := bbolt.Open(path, 0o600, &bbolt.Options{ + Timeout: time.Second, + NoFreelistSync: true, + FreelistType: bbolt.FreelistMapType, + }) + if err != nil { + return err + } + if err := db.Update(func(tx *bbolt.Tx) error { + _, e := tx.CreateBucketIfNotExists([]byte("default")) + return e + }); err != nil { + _ = db.Close() + return err + } + return db.Close() +} + +func findBackup(t *testing.T, dir string) string { + t.Helper() + entries, err := os.ReadDir(dir) + require.NoError(t, err) + for _, e := range entries { + if strings.HasSuffix(e.Name(), ".backup") { + return filepath.Join(dir, e.Name()) + } + } + return "" +} + +func fileSize(t *testing.T, path string) int64 { + t.Helper() + info, err := os.Stat(path) + require.NoError(t, err) + return info.Size() +}