32 changes: 32 additions & 0 deletions .chloggen/filestorage-recreate-subprocess-precheck.yaml
@@ -0,0 +1,32 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: extension/file_storage

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Recover from additional bbolt panics when the `recreate` option is enabled, so the collector no longer crash-loops on certain corrupt storage files.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35899]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Previously, `recreate` only caught bbolt panics raised on the calling goroutine. Panics raised on
  goroutines spawned inside `bbolt.Open` (such as the `freepages: failed to get all reachable pages`
  panic) would still terminate the collector. Now `recreate` also catches these panics, renames
  the corrupt database to `{filename}.{timestamp}.backup`, and creates a fresh database in its place,
  so the collector can continue operating normally.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
7 changes: 5 additions & 2 deletions extension/storage/filestorage/README.md
@@ -33,8 +33,8 @@ The default timeout is `1s`.
By default, the directories will be created with `0750 (rwxr-x---)` permissions, minus the process umask.
Use `directory_permissions` to customize directory creation permissions, minus the process umask.

`recreate` when set, the filestorage extension will automatically rename the corrupted bbolt database and create a new one when certain bbolt panics occur.
See (#35899)[https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35899] for more details.
`recreate` when set, the filestorage extension will automatically rename the corrupted bbolt database and create a new one when certain bbolt panics occur.
See [#35899](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35899) and [#48565](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/48565) for more details.

If the database fails to open due to corruption (resulting in a panic), the corrupted file will be automatically renamed to `{filename}.{ISO 8601 timestamp}.backup` and a new data file will be created from scratch. This allows the collector to continue operating even when encountering certain bbolt panics. If no corruption is detected, the existing database continues to be used normally.
There may still be scenarios where manually removing or renaming the file may be required, and this feature flag is not a panacea for all bbolt panics you can encounter.
@@ -49,6 +49,9 @@ There may still be scenarios where manually removing or renaming the file may be

`compaction.directory` specifies the directory used for compaction (as a midstep).

> [!Warning]
> When using multiple `file_storage` extension instances, each must have a unique `compaction.directory`. Concurrent compactions sharing the same directory can stomp on each other's temporary files (`tempdb*`) and produce corrupt databases.

`compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction.
A value of zero will ignore transaction sizes.

117 changes: 77 additions & 40 deletions extension/storage/filestorage/extension.go
@@ -24,6 +24,17 @@ import (
type localFileStorage struct {
cfg *Config
logger *zap.Logger

// precheckFn runs the corruption pre-check for a bbolt database file when
// Recreate is enabled. It is a struct field so tests can substitute a fake
// implementation that does not spawn a subprocess.
precheckFn func(ctx context.Context, path string, timeout time.Duration) error

// newClientFn opens (or creates) the underlying bbolt client. It is a
// struct field so tests can substitute a fake that simulates a panic on
// the first call, allowing the defer/recover path in openClient to be
// exercised without constructing a real corrupt bbolt file.
newClientFn func(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, noSync bool) (*fileStorageClient, error)
}

// Ensure this storage extension implements the appropriate interface
@@ -44,8 +55,10 @@ func newLocalFileStorage(logger *zap.Logger, config *Config) (extension.Extensio
}
}
return &localFileStorage{
cfg: config,
logger: logger,
cfg: config,
logger: logger,
precheckFn: runPrecheck,
newClientFn: newClient,
}, nil
}

@@ -65,7 +78,7 @@ func (*localFileStorage) Shutdown(context.Context) error {
}

// GetClient returns a storage client for an individual component
func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
func (lfs *localFileStorage) GetClient(ctx context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
var rawName string
if name == "" {
rawName = fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
@@ -76,23 +89,21 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e
rawName = sanitize(rawName)
absoluteName := filepath.Join(lfs.cfg.Directory, rawName)

// Try to create client, handling panics if recreate is enabled
client, err := lfs.createClientWithPanicRecovery(absoluteName)
client, err := lfs.openClient(ctx, absoluteName)

// If the error is due to filename being too long, truncate and try again
if errors.Is(err, syscall.ENAMETOOLONG) {
hashedName := filepath.Join(lfs.cfg.Directory, hash(rawName))
lfs.logger.Warn("filename too long, using hashed filename instead",
zap.String("originalFile", absoluteName), zap.String("component", rawName), zap.String("hashedFileName", hashedName))
client, err = lfs.createClientWithPanicRecovery(hashedName)
client, err = lfs.openClient(ctx, hashedName)
}

// return error if still not successful
if err != nil {
return nil, err
}

// return if compaction is not required
if lfs.cfg.Compaction.OnStart {
compactionErr := client.Compact(lfs.cfg.Compaction.Directory, lfs.cfg.Timeout, lfs.cfg.Compaction.MaxTransactionSize)
if compactionErr != nil {
@@ -103,43 +114,69 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e
return client, nil
}

// createClientWithPanicRecovery attempts to create a client, and if recreate is enabled
// and a panic occurs (typically due to database corruption), it will rename the file
// and try again with a fresh database
func (lfs *localFileStorage) createClientWithPanicRecovery(absoluteName string) (client *fileStorageClient, err error) {
// First attempt: try to create client normally
if !lfs.cfg.Recreate {
// If recreate is disabled, just try once
return newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync)
}

// If recreate is enabled, handle potential panics during database opening
defer func() {
if r := recover(); r != nil {
lfs.logger.Warn("Database corruption detected, recreating database file",
zap.String("file", absoluteName),
zap.Any("panic", r))

// Rename the corrupted file with ISO 8601 timestamp
timestamp := time.Now().Format("2006-01-02T15:04:05.000")
backupName := absoluteName + "." + timestamp + ".backup"
if renameErr := os.Rename(absoluteName, backupName); renameErr != nil {
err = fmt.Errorf("error renaming corrupted database. Please remove %s manually: %w", absoluteName, renameErr)
return
// openClient opens (or creates) the bbolt database for a single client.
//
// When Recreate is enabled and the database file already exists, corruption
// recovery is performed in two layers:
//
// 1. Subprocess pre-check: catches panics raised inside goroutines spawned by
// bbolt.Open (e.g. the freepages "multiple references" panic), which the
// in-process defer/recover below cannot catch because Go's recover() only
// catches panics in its own goroutine.
// 2. In-process defer/recover around bbolt.Open: catches panics raised in the
// main goroutine (e.g. freepages' "failed to open read only tx" or "failed
// to rollback tx" panics, or panics from tx.recursivelyCheckBucket itself
// prior to the spawned goroutine reading from the error channel).
//
// On either signal of corruption, the file is renamed aside and a fresh
// database is opened in its place.
func (lfs *localFileStorage) openClient(ctx context.Context, absoluteName string) (client *fileStorageClient, err error) {
if lfs.cfg.Recreate {
if _, statErr := os.Stat(absoluteName); statErr == nil {
precheckErr := lfs.precheckFn(ctx, absoluteName, lfs.cfg.Timeout)
switch {
case precheckErr == nil:
// Database opened cleanly in the subprocess; safe to open here.
case errors.Is(precheckErr, errDBCorruption):
if renameErr := lfs.renameCorruptDB(absoluteName); renameErr != nil {
return nil, renameErr
}
default:
lfs.logger.Warn("Database pre-check returned non-corruption error; proceeding with open",
zap.String("file", absoluteName),
zap.Error(precheckErr))
}
}

lfs.logger.Info("Corrupted database file renamed",
zap.String("original", absoluteName),
zap.String("backup", backupName))
defer func() {
if r := recover(); r != nil {
lfs.logger.Warn("bbolt.Open panicked in main goroutine; recreating database",
zap.String("file", absoluteName),
zap.Any("panic", r))
if renameErr := lfs.renameCorruptDB(absoluteName); renameErr != nil {
err = renameErr
return
}
client, err = lfs.newClientFn(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync)
}
}()
}

// Try to create client again with fresh database
client, err = newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync)
}
}()
return lfs.newClientFn(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync)
}

// Try to create the client normally first
client, err = newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync)
return client, err
// renameCorruptDB moves a corrupt bbolt database file aside with a timestamped
// suffix so that a fresh database can be created in its place.
func (lfs *localFileStorage) renameCorruptDB(path string) error {
timestamp := time.Now().Format("2006-01-02T15:04:05.000")
backup := path + "." + timestamp + ".backup"
if err := os.Rename(path, backup); err != nil {
return fmt.Errorf("rename corrupt database %s -> %s (please remove manually): %w", path, backup, err)
}
lfs.logger.Warn("Renamed corrupt bbolt database; a fresh database will be created",
zap.String("original", path),
zap.String("backup", backup))
return nil
}

func kindString(k component.Kind) string {
15 changes: 11 additions & 4 deletions extension/storage/filestorage/extension_test.go
@@ -4,13 +4,15 @@
package filestorage

import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -667,26 +669,31 @@ func TestRecreate(t *testing.T) {
require.NoError(t, ext.Shutdown(ctx))
}

// step 3: re-create the extension, but with Recreate=true and make sure that the data still exists
// (since recreate now only happens on panic, not always when recreate=true)
// step 3: re-create the extension with Recreate=true and make sure that
// the data still exists (the precheck must not trigger a rename on a
// healthy database). A fake precheck is injected so this test does not
// spawn a real subprocess.
{
config.Recreate = true
ext, err := f.Create(ctx, extensiontest.NewNopSettings(f.Type()), config)
require.NoError(t, err)
require.NotNil(t, ext)

lfs, ok := ext.(*localFileStorage)
require.True(t, ok)
lfs.precheckFn = func(context.Context, string, time.Duration) error { return nil }

se, ok := ext.(storage.Extension)
require.True(t, ok)

client, err := se.GetClient(ctx, component.KindReceiver, component.MustNewID("file_log"), "")
require.NoError(t, err)
require.NotNil(t, client)

// The data should still exist since no panic occurred
val, err := client.Get(ctx, "key")
require.Equal(t, val, []byte("val"))
require.NoError(t, err)

// close the extension
require.NoError(t, client.Close(ctx))
require.NoError(t, ext.Shutdown(ctx))
}