diff --git a/config/examples/streaming_file_tail.yaml b/config/examples/streaming_file_tail.yaml new file mode 100644 index 0000000000..b66eda4f60 --- /dev/null +++ b/config/examples/streaming_file_tail.yaml @@ -0,0 +1,39 @@ +# Streaming File Input Example +# ============================ +# This example demonstrates using streaming_file to tail a log file, +# similar to 'tail -F' but with automatic rotation and truncation handling. +# +# Quick Test: +# 1. Create a test file: +# echo "hello world" > /tmp/test.log +# +# 2. Run bento: +# bento -c config/examples/streaming_file_tail.yaml +# +# 3. In another terminal, append lines: +# echo "new line 1" >> /tmp/test.log +# echo "new line 2" >> /tmp/test.log +# +# Performance Note: +# By default, polling-only mode is used (disable_fsnotify: true) which is +# more CPU-efficient for high-volume log files. For lower latency on +# low-volume files, set disable_fsnotify: false. + +input: + streaming_file: + path: /tmp/test.log + poll_interval: 200ms # How often to check for new data + disable_fsnotify: true # true = polling only (CPU efficient) + # false = use fsnotify (lower latency) + +pipeline: + processors: + - mapping: | + # Add timestamp to each line + root.timestamp = now() + root.content = content().string() + root.metadata = @ + +output: + stdout: + codec: lines diff --git a/internal/impl/io/inode_other.go b/internal/impl/io/inode_other.go new file mode 100644 index 0000000000..1ac3f64a52 --- /dev/null +++ b/internal/impl/io/inode_other.go @@ -0,0 +1,13 @@ +//go:build !unix + +package io + +import "os" + +// inodeOf extracts the inode from a FileInfo on non-Unix systems +// On Windows and other non-Unix systems, inodes are not available +// Returns (0, false) to indicate inode support is not available +// Rotation detection will fall back to size-based heuristics +func inodeOf(_ os.FileInfo) (uint64, bool) { + return 0, false +} diff --git a/internal/impl/io/inode_unix.go b/internal/impl/io/inode_unix.go new file mode 
100644 index 0000000000..9f3a777eff --- /dev/null +++ b/internal/impl/io/inode_unix.go @@ -0,0 +1,17 @@ +//go:build unix + +package io + +import ( + "os" + "syscall" +) + +// inodeOf extracts the inode from a FileInfo on Unix-like systems +// Returns (inode, true) if inode is available, (0, false) otherwise +func inodeOf(fi os.FileInfo) (uint64, bool) { + if st, ok := fi.Sys().(*syscall.Stat_t); ok { + return st.Ino, true + } + return 0, false +} diff --git a/internal/impl/io/input_streaming_file.go b/internal/impl/io/input_streaming_file.go new file mode 100644 index 0000000000..46b0d3297b --- /dev/null +++ b/internal/impl/io/input_streaming_file.go @@ -0,0 +1,671 @@ +package io + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/Jeffail/shutdown" + "github.com/cenkalti/backoff/v4" + "github.com/fsnotify/fsnotify" + + "github.com/warpstreamlabs/bento/internal/retries" + "github.com/warpstreamlabs/bento/public/service" +) + +// splitKeepNewline is a custom scanner split function that keeps delimiters +var splitKeepNewline = func(data []byte, atEOF bool) (int, []byte, error) { + if i := bytes.IndexByte(data, '\n'); i >= 0 { + return i + 1, data[:i+1], nil + } + if atEOF && len(data) > 0 { + return len(data), data, nil + } + return 0, nil, nil +} + +// StreamingFileInputConfig holds configuration for the streaming file input. +type StreamingFileInputConfig struct { + Path string + MaxBufferSize int + MaxLineSize int + PollInterval time.Duration + DisableFSNotify bool // When true, use polling only (more CPU-efficient at high write rates) + BackoffCtor func() backoff.BackOff +} + +// FilePosition represents the current read position in a file. +// This is exposed as metadata on messages so users can implement their own +// persistence logic using Bento's cache system in their pipelines. 
+type FilePosition struct { + FilePath string `json:"file_path"` + Inode uint64 `json:"inode"` + ByteOffset int64 `json:"byte_offset"` +} + +// StreamingFileInput reads from a file continuously like 'tail -F'. +type StreamingFileInput struct { + config StreamingFileInputConfig + logger *service.Logger + position FilePosition + positionMutex sync.RWMutex + file *os.File + fileMu sync.RWMutex + reader *bufio.Reader + buffer chan []byte + wg sync.WaitGroup + shutSig *shutdown.Signaller + watcher *fsnotify.Watcher +} + +// streamingFileMetadataDescription returns the metadata documentation block. +func streamingFileMetadataDescription() string { + return ` + +### Metadata + +This input adds the following metadata fields to each message: + +` + "```text" + ` +- streaming_file_path +- streaming_file_inode +- streaming_file_offset +` + "```" + ` + +You can access these metadata fields using +[function interpolation](/docs/configuration/interpolation#bloblang-queries). +` +} + +// NewStreamingFileInput creates a new streaming file input. 
+func NewStreamingFileInput(cfg StreamingFileInputConfig, logger *service.Logger) (*StreamingFileInput, error) { + if cfg.Path == "" { + return nil, errors.New("path is required") + } + if cfg.MaxBufferSize <= 0 { + cfg.MaxBufferSize = 1000 + } + if cfg.MaxLineSize <= 0 { + cfg.MaxLineSize = 1024 * 1024 // 1MB default + } + if cfg.PollInterval <= 0 { + cfg.PollInterval = 1 * time.Second // Default 1s polling, like tail -F + } + if cfg.BackoffCtor == nil { + cfg.BackoffCtor = func() backoff.BackOff { + boff := backoff.NewExponentialBackOff() + boff.InitialInterval = 50 * time.Millisecond + boff.MaxInterval = 1 * time.Second + boff.MaxElapsedTime = 5 * time.Second + return boff + } + } + + return &StreamingFileInput{ + config: cfg, + logger: logger, + buffer: make(chan []byte, cfg.MaxBufferSize), + shutSig: shutdown.NewSignaller(), + position: FilePosition{ + FilePath: cfg.Path, + }, + }, nil +} + +func streamingFileInputSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Categories("Local"). + Summary("Streaming file input with log rotation and truncation handling."). + Description(` +Reads from a file continuously with automatic handling of log rotation and truncation. + +## Core Features + +- **Log Rotation**: Detects when a file is rotated (renamed/recreated) via inode changes and seamlessly switches to the new file +- **Truncation**: Detects when a file is truncated and resets to read from the beginning +- **Position Metadata**: Each message includes file position metadata (path, inode, byte_offset) that can be used with Bento's cache system to implement custom persistence + +## Position Tracking + +This component exposes file position as metadata on each message. To implement crash recovery, you can: + +1. Store the position in a cache on each message +2. On startup, read the cached position and use a processor to filter already-processed lines + +This approach keeps the input stateless while enabling persistence through pipeline composition. 
+ +## Performance Considerations + +By default, this component uses polling-only mode for better CPU efficiency at high write rates. +This is based on findings from large-scale deployments where inotify/fsnotify can cause significant +CPU overhead when files are written to frequently (each write triggers an event, leading to excessive +fstat calls). See ` + "`disable_fsnotify`" + ` option below. + +For low-volume log files where you want sub-second latency, you can enable fsnotify by setting +` + "`disable_fsnotify: false`" + `. + +### Platform Limitations + +When fsnotify is enabled: + +- **NFS/Network Filesystems**: fsnotify does not work reliably on NFS or other network filesystems +- **Supported Platforms**: Linux (inotify), macOS (FSEvents), Windows (ReadDirectoryChangesW), BSD variants (kqueue) +- **Container Environments**: Ensure the file path is mounted from the host, not a container-internal path +` + streamingFileMetadataDescription()). + Field(service.NewStringField("path"). + Description("Path to the file to read from."). + Example("/var/log/app.log")). + Field(service.NewIntField("max_buffer_size"). + Description("Maximum number of lines to buffer."). + Default(1000)). + Field(service.NewIntField("max_line_size"). + Description("Maximum line size in bytes to prevent OOM."). + Default(1048576)). + Field(service.NewDurationField("poll_interval"). + Description("How often to poll the file for new data. This is the primary mechanism for detecting new data. Lower values mean lower latency but higher CPU usage."). + Default("1s"). + Example("1s"). + Example("200ms")). + Field(service.NewBoolField("disable_fsnotify"). + Description("When true (default), only use polling to detect file changes. This is more CPU-efficient for high-throughput log files where inotify would fire constantly. Set to false to enable fsnotify for lower latency on low-volume files."). + Default(true)). + Fields(retries.CommonRetryBackOffFields(0, "50ms", "1s", "5s")...). 
+ LintRule(`root = if this.path.or("") == "" { "path is required" }`) +} + +// Connect opens the file and starts monitoring for changes. +func (sfi *StreamingFileInput) Connect(ctx context.Context) error { + file, err := os.Open(sfi.config.Path) + if err != nil { + return fmt.Errorf("failed to open file: %s: %w", sfi.config.Path, err) + } + + stat, err := file.Stat() + if err != nil { + file.Close() + return fmt.Errorf("failed to stat file: %s: %w", sfi.config.Path, err) + } + + currentInode, hasInode := inodeOf(stat) + + sfi.positionMutex.Lock() + if hasInode { + sfi.position.Inode = currentInode + } + sfi.position.ByteOffset = 0 + sfi.positionMutex.Unlock() + + sfi.fileMu.Lock() + sfi.file = file + sfi.reader = bufio.NewReader(file) + sfi.fileMu.Unlock() + + // Only create fsnotify watcher if not disabled. + // Polling-only mode (DisableFSNotify=true) is more CPU-efficient for high-volume logs. + if !sfi.config.DisableFSNotify { + watcher, err := fsnotify.NewWatcher() + if err != nil { + sfi.logger.Warnf("Failed to create fsnotify watcher, falling back to polling: %v", err) + } else { + if err := watcher.Add(sfi.config.Path); err != nil { + watcher.Close() + sfi.logger.Warnf("Failed to watch file, falling back to polling: %v", err) + } else { + parentDir := filepath.Dir(sfi.config.Path) + if err := watcher.Add(parentDir); err != nil { + sfi.logger.Warnf("Failed to watch parent directory '%s', rotation detection may be degraded: %v", parentDir, err) + } + sfi.watcher = watcher + } + } + } + + sfi.wg.Add(1) + go sfi.monitorFile() + + return nil +} + +// monitorFile is the primary goroutine for watching and reading the file. 
+// Supports two modes: +// - Polling-only (default, DisableFSNotify=true): More CPU-efficient for high-volume logs +// - Event-driven with polling fallback (DisableFSNotify=false): Lower latency for low-volume logs +func (sfi *StreamingFileInput) monitorFile() { + defer sfi.wg.Done() + if sfi.watcher != nil { + defer sfi.watcher.Close() + } + + // Do an initial drain of any existing file content + sfi.drainAvailableData() + + // Polling ticker - the primary mechanism in polling-only mode, + // or a fallback for missed fsnotify events in event-driven mode. + pollInterval := time.NewTicker(sfi.config.PollInterval) + defer pollInterval.Stop() + + // If no watcher, run in pure polling mode (more CPU-efficient) + if sfi.watcher == nil { + sfi.logger.Debugf("Running in polling-only mode (interval: %v)", sfi.config.PollInterval) + for { + select { + case <-pollInterval.C: + sfi.checkStateAndReact() + sfi.drainAvailableData() + case <-sfi.shutSig.SoftStopChan(): + return + } + } + } + + // Event-driven mode with polling fallback + sfi.logger.Debugf("Running in fsnotify mode with polling fallback (interval: %v)", sfi.config.PollInterval) + for { + select { + case event, ok := <-sfi.watcher.Events: + if !ok { + return + } + + if event.Has(fsnotify.Write) && event.Name == sfi.config.Path { + sfi.drainAvailableData() + } + + if event.Has(fsnotify.Create) && event.Name == sfi.config.Path { + if err := sfi.handleRotation(); err != nil { + sfi.logger.Errorf("Error handling rotation: %v", err) + } + } + + if event.Has(fsnotify.Rename) || event.Has(fsnotify.Remove) { + time.Sleep(100 * time.Millisecond) + sfi.checkStateAndReact() + } + + case <-pollInterval.C: + // Fallback polling: check for rotation/truncation AND try to read + // any new data. This handles cases where fsnotify misses events. 
+ sfi.checkStateAndReact() + sfi.drainAvailableData() + + case err, ok := <-sfi.watcher.Errors: + if !ok { + return + } + sfi.logger.Errorf("fsnotify watcher error: %v", err) + + case <-sfi.shutSig.SoftStopChan(): + return + } + } +} + +// checkStateAndReact performs a stat check to detect rotation or truncation. +func (sfi *StreamingFileInput) checkStateAndReact() { + rotated, truncated, err := sfi.detectFileChanges() + if err != nil { + sfi.logger.Warnf("Error during state check: %v", err) + return + } + if rotated { + if err := sfi.handleRotation(); err != nil { + sfi.logger.Errorf("Error handling rotation: %v", err) + } + } else if truncated { + if err := sfi.handleTruncation(); err != nil { + sfi.logger.Errorf("Error handling truncation: %v", err) + } + } +} + +// detectFileChanges checks for rotation and truncation using inode comparison. +func (sfi *StreamingFileInput) detectFileChanges() (rotated, truncated bool, err error) { + currentStat, err := os.Stat(sfi.config.Path) + if err != nil { + if os.IsNotExist(err) { + return true, false, nil + } + return false, false, err + } + + currentInode, _ := inodeOf(currentStat) + currentSize := currentStat.Size() + + sfi.positionMutex.RLock() + lastInode := sfi.position.Inode + lastOffset := sfi.position.ByteOffset + sfi.positionMutex.RUnlock() + + if currentInode != 0 && lastInode != 0 && currentInode != lastInode { + return true, false, nil + } + + if currentInode == lastInode && currentSize < lastOffset { + sfi.logger.Warnf("File truncation detected: current size=%d is less than last offset=%d", currentSize, lastOffset) + return false, true, nil + } + + return false, false, nil +} + +// handleTruncation resets the position for the current file. 
+func (sfi *StreamingFileInput) handleTruncation() error { + sfi.logger.Infof("Handling file truncation, resetting position to zero") + + sfi.positionMutex.Lock() + sfi.position.ByteOffset = 0 + sfi.positionMutex.Unlock() + + sfi.drainBufferChannel() + + sfi.fileMu.Lock() + if sfi.file != nil { + if _, err := sfi.file.Seek(0, 0); err != nil { + sfi.logger.Errorf("Failed to seek to start after truncation, will reopen: %v", err) + err2 := sfi.reopenFileLocked() + sfi.fileMu.Unlock() + return err2 + } + sfi.reader.Reset(sfi.file) + } + sfi.fileMu.Unlock() + + sfi.drainAvailableData() + + return nil +} + +// handleRotation manages the full file rotation process. +func (sfi *StreamingFileInput) handleRotation() error { + sfi.logger.Infof("File rotation detected, handling transition") + + sfi.fileMu.Lock() + if sfi.file != nil { + sfi.logger.Debugf("Draining remaining data from old file handle before closing") + sfi.drainFromReaderLocked() + sfi.file.Close() + sfi.file = nil + sfi.reader = nil + } + sfi.fileMu.Unlock() + + // Use configurable backoff for retrying file open after rotation + boff := sfi.config.BackoffCtor() + var err error + for { + if err = sfi.reopenFile(); err == nil { + break + } + if !os.IsNotExist(err) { + sfi.logger.Errorf("Failed to open new file after rotation: %v", err) + if sfi.watcher != nil { + if err2 := sfi.watcher.Add(sfi.config.Path); err2 != nil { + sfi.logger.Warnf("Failed to re-add file to watcher after rotation: %v", err2) + } + } + return err + } + wait := boff.NextBackOff() + if wait == backoff.Stop { + sfi.logger.Errorf("Failed to open new file after rotation after retries: %v", err) + if sfi.watcher != nil { + if err2 := sfi.watcher.Add(sfi.config.Path); err2 != nil { + sfi.logger.Warnf("Failed to re-add file to watcher after rotation: %v", err2) + } + } + return err + } + select { + case <-time.After(wait): + case <-sfi.shutSig.SoftStopChan(): + return nil + } + } + + if sfi.watcher != nil { + if err := 
sfi.watcher.Add(sfi.config.Path); err != nil { + sfi.logger.Warnf("Failed to re-add file to watcher after rotation: %v", err) + } + } + + sfi.drainAvailableData() + + sfi.logger.Infof("Successfully switched to new file after rotation") + return nil +} + +// reopenFile opens the configured path and updates the position. +func (sfi *StreamingFileInput) reopenFile() error { + sfi.fileMu.Lock() + defer sfi.fileMu.Unlock() + return sfi.reopenFileLocked() +} + +// reopenFileLocked opens the configured path (assumes lock is held). +func (sfi *StreamingFileInput) reopenFileLocked() error { + file, err := os.Open(sfi.config.Path) + if err != nil { + return err + } + + info, err := file.Stat() + if err != nil { + file.Close() + return err + } + + newInode, _ := inodeOf(info) + + sfi.positionMutex.Lock() + sfi.position.Inode = newInode + sfi.position.ByteOffset = 0 + sfi.positionMutex.Unlock() + + sfi.file = file + sfi.reader = bufio.NewReader(file) + return nil +} + +// drainAvailableData reads data from the file and buffers it. +func (sfi *StreamingFileInput) drainAvailableData() { + sfi.fileMu.RLock() + file := sfi.file + reader := sfi.reader + sfi.fileMu.RUnlock() + + if reader == nil || file == nil { + return + } + + rotated, truncated, err := sfi.detectFileChanges() + if err != nil { + sfi.logger.Errorf("Error detecting file changes: %v", err) + } + if rotated { + if err := sfi.handleRotation(); err != nil { + sfi.logger.Errorf("Error handling rotation: %v", err) + } + return + } + if truncated { + if err := sfi.handleTruncation(); err != nil { + sfi.logger.Errorf("Error handling truncation: %v", err) + } + return + } + + sfi.drainFromReader(reader) +} + +// drainFromReader reads lines from the reader and buffers them. 
+func (sfi *StreamingFileInput) drainFromReader(reader *bufio.Reader) { + scanner := bufio.NewScanner(reader) + maxScanTokenSize := sfi.config.MaxLineSize + 1024 + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, maxScanTokenSize) + scanner.Split(splitKeepNewline) + + for scanner.Scan() { + lineBytes := scanner.Bytes() + if len(lineBytes) == 0 { + continue + } + + lineCopy := make([]byte, len(lineBytes)) + copy(lineCopy, lineBytes) + + select { + case sfi.buffer <- lineCopy: + case <-sfi.shutSig.SoftStopChan(): + return + } + } + + if err := scanner.Err(); err != nil && err != bufio.ErrTooLong { + sfi.logger.Warnf("Error while draining data: %v", err) + } +} + +// drainFromReaderLocked drains data from the reader (assumes fileMu is held). +func (sfi *StreamingFileInput) drainFromReaderLocked() { + if sfi.reader == nil || sfi.file == nil { + return + } + sfi.drainFromReader(sfi.reader) +} + +// drainBufferChannel drains all pending data from the buffer channel. +func (sfi *StreamingFileInput) drainBufferChannel() { + for { + select { + case _, ok := <-sfi.buffer: + if !ok { + return + } + default: + return + } + } +} + +// Read returns the next message from the file. +// Like tail -F, this blocks indefinitely until data is available, +// the input is closed, or the context is cancelled. +func (sfi *StreamingFileInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + select { + case lineBytes, ok := <-sfi.buffer: + if !ok { + return nil, nil, service.ErrEndOfInput + } + return sfi.createMessage(lineBytes) + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-sfi.shutSig.SoftStopChan(): + // On shutdown, drain any remaining buffered data + select { + case lineBytes, ok := <-sfi.buffer: + if !ok { + return nil, nil, service.ErrEndOfInput + } + return sfi.createMessage(lineBytes) + default: + return nil, nil, service.ErrEndOfInput + } + } +} + +// createMessage creates a message with position metadata. 
+func (sfi *StreamingFileInput) createMessage(lineBytes []byte) (*service.Message, service.AckFunc, error) { + sfi.positionMutex.RLock() + pos := FilePosition{ + FilePath: sfi.position.FilePath, + Inode: sfi.position.Inode, + ByteOffset: sfi.position.ByteOffset, + } + sfi.positionMutex.RUnlock() + + delta := int64(len(lineBytes)) + + sfi.positionMutex.Lock() + sfi.position.ByteOffset += delta + sfi.positionMutex.Unlock() + + msg := service.NewMessage(lineBytes) + msg.MetaSet("streaming_file_path", pos.FilePath) + msg.MetaSet("streaming_file_inode", strconv.FormatUint(pos.Inode, 10)) + msg.MetaSet("streaming_file_offset", strconv.FormatInt(pos.ByteOffset, 10)) + + return msg, func(_ context.Context, _ error) error { + return nil + }, nil +} + +// Close shuts down the input. +func (sfi *StreamingFileInput) Close(ctx context.Context) error { + sfi.shutSig.TriggerSoftStop() + + sfi.fileMu.Lock() + if sfi.file != nil { + sfi.file.Close() + sfi.file = nil + } + sfi.fileMu.Unlock() + + sfi.wg.Wait() + close(sfi.buffer) + + sfi.logger.Debugf("Streaming file input closed successfully") + return nil +} + +func init() { + err := service.RegisterInput("streaming_file", streamingFileInputSpec(), + func(pConf *service.ParsedConfig, res *service.Resources) (service.Input, error) { + path, err := pConf.FieldString("path") + if err != nil { + return nil, err + } + maxBufferSize, err := pConf.FieldInt("max_buffer_size") + if err != nil { + return nil, err + } + maxLineSize, err := pConf.FieldInt("max_line_size") + if err != nil { + return nil, err + } + pollInterval, err := pConf.FieldDuration("poll_interval") + if err != nil { + return nil, err + } + disableFSNotify, err := pConf.FieldBool("disable_fsnotify") + if err != nil { + return nil, err + } + + backoffCtor, err := retries.CommonRetryBackOffCtorFromParsed(pConf) + if err != nil { + return nil, fmt.Errorf("failed to parse backoff config: %w", err) + } + + cfg := StreamingFileInputConfig{ + Path: path, + MaxBufferSize: 
maxBufferSize, + MaxLineSize: maxLineSize, + PollInterval: pollInterval, + DisableFSNotify: disableFSNotify, + BackoffCtor: backoffCtor, + } + + return NewStreamingFileInput(cfg, res.Logger()) + }) + if err != nil { + panic(err) + } +} diff --git a/internal/impl/io/input_streaming_file_test.go b/internal/impl/io/input_streaming_file_test.go new file mode 100644 index 0000000000..f874f7857f --- /dev/null +++ b/internal/impl/io/input_streaming_file_test.go @@ -0,0 +1,406 @@ +package io_test + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/internal/impl/io" +) + +// stripNewline removes trailing newline from line bytes for comparison +// The custom scanner keeps delimiters, so we need to strip them for testing +func stripNewline(b []byte) string { + return string(bytes.TrimSuffix(b, []byte("\n"))) +} + +func TestStreamingFileInput_BasicReading(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "test.log") + + // Create test file + testData := "line1\nline2\nline3\n" + require.NoError(t, os.WriteFile(filePath, []byte(testData), 0644)) + + cfg := io.StreamingFileInputConfig{ + Path: filePath, + MaxBufferSize: 10, + MaxLineSize: 1024 * 1024, + } + + input, err := io.NewStreamingFileInput(cfg, nil) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + require.NoError(t, input.Connect(ctx)) + defer input.Close(ctx) + + // Give event loop time to process initial file + time.Sleep(100 * time.Millisecond) + + // Read lines + msg1, ack1, err := input.Read(ctx) + require.NoError(t, err) + b1, err := msg1.AsBytes() + require.NoError(t, err) + assert.Equal(t, "line1", stripNewline(b1)) + require.NoError(t, ack1(ctx, nil)) + + msg2, ack2, err := input.Read(ctx) + require.NoError(t, err) + b2, err := msg2.AsBytes() 
+ require.NoError(t, err) + assert.Equal(t, "line2", stripNewline(b2)) + require.NoError(t, ack2(ctx, nil)) + + msg3, ack3, err := input.Read(ctx) + require.NoError(t, err) + b3, err := msg3.AsBytes() + require.NoError(t, err) + assert.Equal(t, "line3", stripNewline(b3)) + require.NoError(t, ack3(ctx, nil)) +} + +func TestStreamingFileInput_PositionMetadata(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "test.log") + + // Create test file with multiple lines + testData := "line1\nline2\nline3\n" + require.NoError(t, os.WriteFile(filePath, []byte(testData), 0644)) + + cfg := io.StreamingFileInputConfig{ + Path: filePath, + MaxBufferSize: 10, + MaxLineSize: 1024 * 1024, + } + + input, err := io.NewStreamingFileInput(cfg, nil) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + require.NoError(t, input.Connect(ctx)) + defer input.Close(ctx) + + time.Sleep(100 * time.Millisecond) + + // Read first line and check metadata + msg1, ack1, err := input.Read(ctx) + require.NoError(t, err) + + // Check metadata fields exist + path, ok := msg1.MetaGet("streaming_file_path") + assert.True(t, ok, "streaming_file_path metadata should exist") + assert.Equal(t, filePath, path) + + inode, ok := msg1.MetaGet("streaming_file_inode") + assert.True(t, ok, "streaming_file_inode metadata should exist") + assert.NotEmpty(t, inode) + + offset, ok := msg1.MetaGet("streaming_file_offset") + assert.True(t, ok, "streaming_file_offset metadata should exist") + assert.Equal(t, "0", offset, "first line should start at offset 0") + + require.NoError(t, ack1(ctx, nil)) + + // Read second line - offset should have advanced + msg2, ack2, err := input.Read(ctx) + require.NoError(t, err) + + offset2, ok := msg2.MetaGet("streaming_file_offset") + assert.True(t, ok) + assert.Equal(t, "6", offset2, "second line should start at offset 6 (after 'line1\\n')") + + require.NoError(t, ack2(ctx, nil)) + + // Read 
third line + msg3, ack3, err := input.Read(ctx) + require.NoError(t, err) + + offset3, ok := msg3.MetaGet("streaming_file_offset") + assert.True(t, ok) + assert.Equal(t, "12", offset3, "third line should start at offset 12") + require.NoError(t, ack3(ctx, nil)) +} + +func TestStreamingFileInput_FileRotation(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("File rotation test requires Unix-like system") + } + + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "test.log") + + // Create initial file + testData := "line1\nline2\n" + require.NoError(t, os.WriteFile(filePath, []byte(testData), 0644)) + + cfg := io.StreamingFileInputConfig{ + Path: filePath, + MaxBufferSize: 10, + MaxLineSize: 1024 * 1024, + } + + input, err := io.NewStreamingFileInput(cfg, nil) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + require.NoError(t, input.Connect(ctx)) + defer input.Close(ctx) + + time.Sleep(100 * time.Millisecond) + + // Read first line + msg1, ack1, err := input.Read(ctx) + require.NoError(t, err) + b1, err := msg1.AsBytes() + require.NoError(t, err) + assert.Equal(t, "line1", stripNewline(b1)) + require.NoError(t, ack1(ctx, nil)) + + // Read second line (to consume it from buffer before rotation) + msg2, ack2, err := input.Read(ctx) + require.NoError(t, err) + b2, err := msg2.AsBytes() + require.NoError(t, err) + assert.Equal(t, "line2", stripNewline(b2)) + require.NoError(t, ack2(ctx, nil)) + + // Simulate file rotation by moving the old file and creating a new one + // This changes the inode, which is how real log rotation works + rotatedPath := filepath.Join(tmpDir, "test.log.1") + require.NoError(t, os.Rename(filePath, rotatedPath)) + + // Create new file with new content + newData := "line3\nline4\n" + require.NoError(t, os.WriteFile(filePath, []byte(newData), 0644)) + + // Give time for fsnotify event processing + time.Sleep(500 * time.Millisecond) + + // Should read from new 
file + msg3, ack3, err := input.Read(ctx) + require.NoError(t, err) + b3, err := msg3.AsBytes() + require.NoError(t, err) + assert.Equal(t, "line3", stripNewline(b3)) + + // Verify metadata shows offset reset to 0 for new file + offset, ok := msg3.MetaGet("streaming_file_offset") + assert.True(t, ok) + assert.Equal(t, "0", offset, "offset should reset to 0 after rotation") + require.NoError(t, ack3(ctx, nil)) +} + +func TestStreamingFileInput_ConcurrentReads(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "test.log") + + // Create test file with many lines + var testData string + for i := 1; i <= 100; i++ { + testData += fmt.Sprintf("line%d\n", i) + } + require.NoError(t, os.WriteFile(filePath, []byte(testData), 0644)) + + cfg := io.StreamingFileInputConfig{ + Path: filePath, + MaxBufferSize: 50, + MaxLineSize: 1024 * 1024, + } + + input, err := io.NewStreamingFileInput(cfg, nil) + require.NoError(t, err) + + // Use a longer timeout for setup, but we'll cancel once all lines are read + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + require.NoError(t, input.Connect(ctx)) + defer input.Close(ctx) + + time.Sleep(100 * time.Millisecond) + + // Read all lines concurrently + var wg sync.WaitGroup + var mu sync.Mutex + readLines := make(map[string]bool) + var readCount int + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + msg, ack, err := input.Read(ctx) + if err != nil { + return + } + b, _ := msg.AsBytes() + line := stripNewline(b) + mu.Lock() + readLines[line] = true + readCount++ + done := readCount >= 100 + mu.Unlock() + _ = ack(ctx, nil) + if done { + cancel() // Signal all readers to stop + } + } + }() + } + + wg.Wait() + + // Verify we read all lines + assert.Len(t, readLines, 100) + for i := 1; i <= 100; i++ { + assert.True(t, readLines[fmt.Sprintf("line%d", i)]) + } +} + +func TestStreamingFileInput_FileTruncation(t *testing.T) { + if runtime.GOOS == 
"windows" { + t.Skip("File truncation test requires Unix-like system") + } + + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "test.log") + + // Create initial file with multiple lines + testData := "line1\nline2\nline3\nline4\nline5\nline6\nline7\nline8\n" + require.NoError(t, os.WriteFile(filePath, []byte(testData), 0644)) + + cfg := io.StreamingFileInputConfig{ + Path: filePath, + MaxBufferSize: 10, + MaxLineSize: 1024 * 1024, + } + + input, err := io.NewStreamingFileInput(cfg, nil) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + require.NoError(t, input.Connect(ctx)) + defer input.Close(ctx) + + time.Sleep(100 * time.Millisecond) + + // Read first 7 lines to consume most of the initial content + for i := 0; i < 7; i++ { + msg, ack, err := input.Read(ctx) + require.NoError(t, err) + require.NoError(t, ack(ctx, nil)) + b, _ := msg.AsBytes() + assert.Equal(t, fmt.Sprintf("line%d", i+1), stripNewline(b)) + } + + // Truncate the file (write much fewer lines to ensure size is smaller than current offset) + truncatedData := "x\n" + require.NoError(t, os.WriteFile(filePath, []byte(truncatedData), 0644)) + + // Give time for truncation detection and event processing + time.Sleep(500 * time.Millisecond) + + // Should read from the beginning of the truncated file + msg, ack, err := input.Read(ctx) + require.NoError(t, err) + b, _ := msg.AsBytes() + assert.Equal(t, "x", stripNewline(b)) + + // Verify metadata shows offset reset to 0 after truncation + offset, ok := msg.MetaGet("streaming_file_offset") + assert.True(t, ok) + assert.Equal(t, "0", offset, "offset should reset to 0 after truncation") + require.NoError(t, ack(ctx, nil)) +} + +func TestStreamingFileInput_LiveAppend(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "test.log") + + // Create initial empty file + require.NoError(t, os.WriteFile(filePath, []byte{}, 0644)) + + cfg := 
io.StreamingFileInputConfig{ + Path: filePath, + MaxBufferSize: 10, + MaxLineSize: 1024 * 1024, + } + + input, err := io.NewStreamingFileInput(cfg, nil) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + require.NoError(t, input.Connect(ctx)) + defer input.Close(ctx) + + time.Sleep(100 * time.Millisecond) + + // Append lines to the file + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644) + require.NoError(t, err) + + _, err = f.WriteString("appended1\n") + require.NoError(t, err) + require.NoError(t, f.Sync()) + + // Give fsnotify time to detect the change + time.Sleep(200 * time.Millisecond) + + // Read the appended line + msg1, ack1, err := input.Read(ctx) + require.NoError(t, err) + b1, err := msg1.AsBytes() + require.NoError(t, err) + assert.Equal(t, "appended1", stripNewline(b1)) + require.NoError(t, ack1(ctx, nil)) + + // Append another line + _, err = f.WriteString("appended2\n") + require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + + time.Sleep(200 * time.Millisecond) + + // Read the second appended line + msg2, ack2, err := input.Read(ctx) + require.NoError(t, err) + b2, err := msg2.AsBytes() + require.NoError(t, err) + assert.Equal(t, "appended2", stripNewline(b2)) + require.NoError(t, ack2(ctx, nil)) +} + +func TestStreamingFileInput_FilePositionStruct(t *testing.T) { + // Test that FilePosition struct is correctly defined for metadata use + pos := io.FilePosition{ + FilePath: "/var/log/test.log", + Inode: 12345, + ByteOffset: 1024, + } + + assert.Equal(t, "/var/log/test.log", pos.FilePath) + assert.Equal(t, uint64(12345), pos.Inode) + assert.Equal(t, int64(1024), pos.ByteOffset) +} diff --git a/scripts/stress_test_streaming_file.py b/scripts/stress_test_streaming_file.py new file mode 100755 index 0000000000..228c14e254 --- /dev/null +++ b/scripts/stress_test_streaming_file.py @@ -0,0 +1,552 @@ +#!/usr/bin/env -S uv run -s +# /// 
script +# requires-python = ">=3.11" +# dependencies = [ +# "rich", +# ] +# /// +""" +Stress test for streaming_file input. + +Tests: +- High-throughput writes (thousands of lines/sec) +- File rotation simulation +- File truncation +- Concurrent writers +- Bento start/stop cycles +- Mixed workloads + +Usage: + uv run -s scripts/stress_test_streaming_file.py + +Or make executable: + chmod +x scripts/stress_test_streaming_file.py + ./scripts/stress_test_streaming_file.py +""" + +import subprocess +import tempfile +import time +import threading +import os +import signal +import sys +import random +import shutil +from pathlib import Path +from dataclasses import dataclass, field +from typing import Optional +from concurrent.futures import ThreadPoolExecutor + +from rich.console import Console +from rich.table import Table +from rich.live import Live +from rich.panel import Panel +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn + +console = Console() + +# Find bento binary +BENTO_BIN = os.environ.get("BENTO_BIN", "./target/bin/bento") +if not Path(BENTO_BIN).exists(): + # Try to find it + for candidate in ["./target/bin/bento", "./bento", "bento"]: + if Path(candidate).exists() or shutil.which(candidate): + BENTO_BIN = candidate + break + + +@dataclass +class TestStats: + """Track test statistics.""" + lines_written: int = 0 + lines_received: int = 0 + rotations: int = 0 + truncations: int = 0 + bento_restarts: int = 0 + errors: list = field(default_factory=list) + start_time: float = field(default_factory=time.time) + + @property + def elapsed(self) -> float: + return time.time() - self.start_time + + @property + def write_rate(self) -> float: + if self.elapsed > 0: + return self.lines_written / self.elapsed + return 0 + + @property + def receive_rate(self) -> float: + if self.elapsed > 0: + return self.lines_received / self.elapsed + return 0 + + +class BentoProcess: + """Manage a bento process.""" + + def __init__(self, config_path: str, 
output_file: str): + self.config_path = config_path + self.output_file = output_file + self.process: Optional[subprocess.Popen] = None + self._output_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + + def start(self) -> None: + """Start bento process.""" + self._stop_event.clear() + # Redirect stdout to a file we can read + self.output_fh = open(self.output_file, "a") + self.process = subprocess.Popen( + [BENTO_BIN, "-c", self.config_path], + stdout=self.output_fh, + stderr=subprocess.PIPE, + text=True, + ) + time.sleep(0.5) # Give it time to start + + def stop(self) -> None: + """Stop bento process gracefully.""" + if self.process: + self.process.send_signal(signal.SIGTERM) + try: + self.process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait() + self.process = None + if hasattr(self, 'output_fh'): + self.output_fh.close() + + def is_running(self) -> bool: + return self.process is not None and self.process.poll() is None + + +def create_config(log_path: str, poll_interval: str = "100ms", + disable_fsnotify: bool = True) -> str: + """Create a bento config file.""" + config = f""" +input: + streaming_file: + path: {log_path} + poll_interval: {poll_interval} + disable_fsnotify: {str(disable_fsnotify).lower()} + max_buffer_size: 10000 + +output: + stdout: + codec: lines +""" + return config + + +def test_high_throughput(stats: TestStats, log_path: Path, duration: float = 5.0): + """Test high-throughput writes.""" + console.print("\n[bold blue]Test 1: High Throughput Writes[/]") + console.print(f" Writing as fast as possible for {duration}s...") + + end_time = time.time() + duration + batch_size = 100 + line_num = 0 + + while time.time() < end_time: + lines = [f"high_throughput_line_{line_num + i}\n" for i in range(batch_size)] + with open(log_path, "a") as f: + f.writelines(lines) + line_num += batch_size + stats.lines_written += batch_size + + console.print(f" [green]✓[/] Wrote 
{line_num:,} lines ({stats.write_rate:,.0f} lines/sec)") + + +def test_rotation(stats: TestStats, log_path: Path, rotations: int = 5): + """Test file rotation handling.""" + console.print("\n[bold blue]Test 2: File Rotation[/]") + + for i in range(rotations): + # Write some lines + with open(log_path, "a") as f: + for j in range(100): + f.write(f"pre_rotation_{i}_line_{j}\n") + stats.lines_written += 1 + + time.sleep(0.2) + + # Rotate: rename current file, create new one + rotated_path = log_path.with_suffix(f".{i}") + os.rename(log_path, rotated_path) + stats.rotations += 1 + + # Create new file + with open(log_path, "w") as f: + for j in range(100): + f.write(f"post_rotation_{i}_line_{j}\n") + stats.lines_written += 1 + + time.sleep(0.3) + console.print(f" [green]✓[/] Rotation {i + 1}/{rotations} complete") + + # Clean up rotated file + rotated_path.unlink(missing_ok=True) + + +def test_truncation(stats: TestStats, log_path: Path, truncations: int = 5): + """Test file truncation handling.""" + console.print("\n[bold blue]Test 3: File Truncation[/]") + + for i in range(truncations): + # Write a bunch of lines + with open(log_path, "a") as f: + for j in range(200): + f.write(f"pre_truncation_{i}_line_{j}\n") + stats.lines_written += 1 + + time.sleep(0.2) + + # Truncate the file + with open(log_path, "w") as f: + f.write(f"truncated_file_{i}_first_line\n") + stats.lines_written += 1 + stats.truncations += 1 + + time.sleep(0.3) + console.print(f" [green]✓[/] Truncation {i + 1}/{truncations} complete") + + +def test_concurrent_writers(stats: TestStats, log_path: Path, + num_writers: int = 5, duration: float = 3.0): + """Test concurrent writers to the same file.""" + console.print(f"\n[bold blue]Test 4: Concurrent Writers ({num_writers} threads)[/]") + + stop_event = threading.Event() + lock = threading.Lock() + + def writer(writer_id: int): + line_num = 0 + while not stop_event.is_set(): + line = f"writer_{writer_id}_line_{line_num}\n" + with lock: + with 
open(log_path, "a") as f: + f.write(line) + with lock: + stats.lines_written += 1 + line_num += 1 + time.sleep(random.uniform(0.001, 0.01)) + + threads = [threading.Thread(target=writer, args=(i,)) for i in range(num_writers)] + for t in threads: + t.start() + + time.sleep(duration) + stop_event.set() + + for t in threads: + t.join() + + console.print(f" [green]✓[/] Concurrent writes complete") + + +def test_bento_restarts(stats: TestStats, log_path: Path, config_path: str, + output_file: str, restarts: int = 3): + """Test bento restart cycles.""" + console.print(f"\n[bold blue]Test 5: Bento Restart Cycles ({restarts}x)[/]") + + for i in range(restarts): + # Start bento + bento = BentoProcess(config_path, output_file) + bento.start() + stats.bento_restarts += 1 + + # Write some lines while running + with open(log_path, "a") as f: + for j in range(50): + f.write(f"restart_cycle_{i}_line_{j}\n") + stats.lines_written += 1 + + time.sleep(0.5) + + # Stop bento + bento.stop() + + # Write lines while bento is stopped + with open(log_path, "a") as f: + for j in range(50): + f.write(f"while_stopped_{i}_line_{j}\n") + stats.lines_written += 1 + + console.print(f" [green]✓[/] Restart cycle {i + 1}/{restarts} complete") + + +def test_rapid_small_writes(stats: TestStats, log_path: Path, duration: float = 3.0): + """Test rapid small writes (simulating line-by-line logging).""" + console.print(f"\n[bold blue]Test 6: Rapid Small Writes (line-by-line)[/]") + + end_time = time.time() + duration + line_num = 0 + + while time.time() < end_time: + with open(log_path, "a") as f: + f.write(f"rapid_line_{line_num}\n") + f.flush() + stats.lines_written += 1 + line_num += 1 + # No sleep - as fast as possible, one line at a time + + console.print(f" [green]✓[/] Wrote {line_num:,} individual lines") + + +def test_mixed_workload(stats: TestStats, log_path: Path, duration: float = 5.0): + """Test mixed workload: writes, rotations, truncations all happening.""" + console.print(f"\n[bold 
blue]Test 7: Mixed Workload (chaos mode)[/]") + + stop_event = threading.Event() + lock = threading.Lock() + + def continuous_writer(): + line_num = 0 + while not stop_event.is_set(): + with lock: + try: + with open(log_path, "a") as f: + f.write(f"mixed_line_{line_num}\n") + stats.lines_written += 1 + except: + pass + line_num += 1 + time.sleep(random.uniform(0.001, 0.05)) + + def chaos_monkey(): + while not stop_event.is_set(): + time.sleep(random.uniform(0.5, 1.5)) + action = random.choice(["rotate", "truncate", "nothing", "nothing"]) + + with lock: + try: + if action == "rotate": + rotated = log_path.with_suffix(".old") + if log_path.exists(): + os.rename(log_path, rotated) + Path(log_path).touch() + rotated.unlink(missing_ok=True) + stats.rotations += 1 + elif action == "truncate": + with open(log_path, "w") as f: + f.write("truncated\n") + stats.truncations += 1 + stats.lines_written += 1 + except: + pass + + writer_thread = threading.Thread(target=continuous_writer) + chaos_thread = threading.Thread(target=chaos_monkey) + + writer_thread.start() + chaos_thread.start() + + time.sleep(duration) + stop_event.set() + + writer_thread.join() + chaos_thread.join() + + console.print(f" [green]✓[/] Chaos complete") + + +def test_fsnotify_vs_polling(stats: TestStats, log_path: Path, config_dir: Path, + output_dir: Path): + """Compare fsnotify vs polling modes.""" + console.print(f"\n[bold blue]Test 8: FSNotify vs Polling Comparison[/]") + + results = {} + + for mode_name, disable_fsnotify in [("polling", True), ("fsnotify", False)]: + config_content = create_config(str(log_path), "50ms", disable_fsnotify) + config_path = config_dir / f"config_{mode_name}.yaml" + config_path.write_text(config_content) + + output_file = output_dir / f"output_{mode_name}.txt" + output_file.touch() + + # Clear log file + log_path.write_text("") + + bento = BentoProcess(str(config_path), str(output_file)) + bento.start() + time.sleep(0.5) + + # Write test lines + start = time.time() + 
num_lines = 1000 + with open(log_path, "a") as f: + for i in range(num_lines): + f.write(f"{mode_name}_test_line_{i}\n") + write_time = time.time() - start + + # Wait for processing + time.sleep(1.0) + bento.stop() + + # Count received lines + received = sum(1 for line in output_file.read_text().splitlines() + if mode_name in line) + + results[mode_name] = { + "write_time": write_time, + "lines_sent": num_lines, + "lines_received": received, + } + stats.lines_written += num_lines + + # Display comparison + table = Table(title="FSNotify vs Polling") + table.add_column("Mode") + table.add_column("Lines Sent") + table.add_column("Lines Received") + table.add_column("Write Time") + + for mode, data in results.items(): + table.add_row( + mode, + str(data["lines_sent"]), + str(data["lines_received"]), + f"{data['write_time']:.3f}s" + ) + + console.print(table) + + +def count_output_lines(output_file: Path) -> int: + """Count non-empty, non-log lines in output.""" + if not output_file.exists(): + return 0 + count = 0 + for line in output_file.read_text().splitlines(): + # Skip bento log lines + if line.startswith("level=") or not line.strip(): + continue + count += 1 + return count + + +def main(): + console.print(Panel.fit( + "[bold]Streaming File Input Stress Test[/]\n" + f"Using bento: {BENTO_BIN}", + title="🔥 Stress Test" + )) + + # Check bento exists + if not Path(BENTO_BIN).exists() and not shutil.which(BENTO_BIN): + console.print(f"[red]Error: bento binary not found at {BENTO_BIN}[/]") + console.print("Build it with: go build -o ./target/bin/bento ./cmd/bento") + sys.exit(1) + + stats = TestStats() + + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + log_path = tmpdir / "test.log" + config_path = tmpdir / "config.yaml" + output_file = tmpdir / "output.txt" + + # Create initial log file + log_path.write_text("initial_line\n") + stats.lines_written += 1 + + # Create config + config_content = create_config(str(log_path), "100ms", True) + 
config_path.write_text(config_content) + output_file.touch() + + # Start bento + console.print("\n[yellow]Starting bento...[/]") + bento = BentoProcess(str(config_path), str(output_file)) + bento.start() + + if not bento.is_running(): + console.print("[red]Failed to start bento![/]") + sys.exit(1) + + console.print("[green]Bento started successfully[/]") + time.sleep(0.5) + + try: + # Run tests + test_high_throughput(stats, log_path, duration=3.0) + time.sleep(0.5) + + test_rotation(stats, log_path, rotations=5) + time.sleep(0.5) + + test_truncation(stats, log_path, truncations=5) + time.sleep(0.5) + + test_concurrent_writers(stats, log_path, num_writers=5, duration=2.0) + time.sleep(0.5) + + test_rapid_small_writes(stats, log_path, duration=2.0) + time.sleep(0.5) + + test_mixed_workload(stats, log_path, duration=3.0) + time.sleep(0.5) + + # Stop main bento for restart test + bento.stop() + time.sleep(0.2) + + test_bento_restarts(stats, log_path, str(config_path), + str(output_file), restarts=3) + + # Restart for final test + bento.start() + time.sleep(0.5) + + test_fsnotify_vs_polling(stats, log_path, tmpdir, tmpdir) + + finally: + # Clean up + console.print("\n[yellow]Stopping bento...[/]") + bento.stop() + + # Wait a moment for output to flush + time.sleep(0.5) + + # Count received lines + stats.lines_received = count_output_lines(output_file) + + # Final summary + console.print("\n") + summary = Table(title="📊 Test Summary", show_header=True) + summary.add_column("Metric", style="cyan") + summary.add_column("Value", style="green") + + summary.add_row("Total Duration", f"{stats.elapsed:.1f}s") + summary.add_row("Lines Written", f"{stats.lines_written:,}") + summary.add_row("Lines Received", f"{stats.lines_received:,}") + summary.add_row("Write Rate", f"{stats.write_rate:,.0f} lines/sec") + summary.add_row("File Rotations", str(stats.rotations)) + summary.add_row("File Truncations", str(stats.truncations)) + summary.add_row("Bento Restarts", 
str(stats.bento_restarts)) + + if stats.lines_written > 0: + capture_rate = (stats.lines_received / stats.lines_written) * 100 + summary.add_row("Capture Rate", f"{capture_rate:.1f}%") + + console.print(summary) + + if stats.errors: + console.print("\n[red]Errors encountered:[/]") + for err in stats.errors: + console.print(f" - {err}") + + # Final verdict + if stats.lines_received > 0: + console.print("\n[bold green]✓ Stress test completed successfully![/]") + else: + console.print("\n[bold red]✗ No lines received - check for issues[/]") + + +if __name__ == "__main__": + main() diff --git a/website/docs/components/inputs/streaming_file.md b/website/docs/components/inputs/streaming_file.md new file mode 100644 index 0000000000..64d2b358c1 --- /dev/null +++ b/website/docs/components/inputs/streaming_file.md @@ -0,0 +1,154 @@ +--- +title: streaming_file +slug: streaming_file +type: input +status: experimental +categories: ["Local"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: +Streaming file input with log rotation and truncation handling + +```yml +# Config fields, showing default values +input: + label: "" + streaming_file: + path: /var/log/app.log # No default (required) + max_buffer_size: 1000 + max_line_size: 1048576 + poll_interval: 1s + disable_fsnotify: true +``` + +Reads from a file continuously with automatic handling of log rotation and truncation. 
+
+## Core Features
+
+- **Log Rotation**: Detects when a file is rotated (renamed/recreated) via inode changes and seamlessly switches to the new file
+- **Truncation**: Detects when a file is truncated and resets to read from the beginning
+- **Position Metadata**: Each message includes file position metadata (path, inode, byte_offset) that can be used with Bento's cache system to implement custom persistence
+
+## Position Tracking
+
+This component exposes file position as metadata on each message. To implement crash recovery, you can:
+
+1. Store the position in a cache on each message
+2. On startup, read the cached position and use a processor to filter already-processed lines
+
+This approach keeps the input stateless while enabling persistence through pipeline composition.
+
+## Metadata Fields
+
+Each message includes the following metadata:
+
+- `streaming_file_path` - The file path being read
+- `streaming_file_inode` - The file's inode (for rotation detection)
+- `streaming_file_offset` - Byte offset where this line started
+
+## Performance Considerations
+
+By default, this component uses polling-only mode for better CPU efficiency at high write rates.
+This is based on findings from large-scale deployments where inotify/fsnotify can cause significant
+CPU overhead when files are written to frequently (each write triggers an event, leading to excessive
+fstat calls). See `disable_fsnotify` option below.
+
+For low-volume log files where you want sub-second latency, you can enable fsnotify by setting
+`disable_fsnotify: false`.
+### Platform Limitations
+
+When fsnotify is enabled:
+
+- **NFS/Network Filesystems**: fsnotify does not work reliably on NFS or other network filesystems
+- **Supported Platforms**: Linux (inotify), macOS (FSEvents), Windows (ReadDirectoryChangesW), BSD variants (kqueue)
+- **Container Environments**: Ensure the file path is mounted from the host, not a container-internal path
+
+
+## Fields
+
+### `path`
+
+Path to the file to read from
+
+
+Type: `string`
+
+```yml
+# Examples
+
+path: /var/log/app.log
+```
+
+### `max_buffer_size`
+
+Maximum number of lines to buffer
+
+
+Type: `int`
+Default: `1000`
+
+```yml
+# Examples
+
+max_buffer_size: 1000
+```
+
+### `max_line_size`
+
+Maximum line size in bytes to prevent OOM
+
+
+Type: `int`
+Default: `1048576`
+
+```yml
+# Examples
+
+max_line_size: 1048576
+```
+
+### `poll_interval`
+
+How often to poll the file for new data. This is the primary mechanism for detecting new data. Lower values mean lower latency but higher CPU usage.
+
+
+Type: `string`
+Default: `"1s"`
+
+```yml
+# Examples
+
+poll_interval: 1s
+
+poll_interval: 200ms
+```
+
+### `disable_fsnotify`
+
+When true (default), only use polling to detect file changes. This is more CPU-efficient for high-throughput log files where inotify would fire constantly. Set to false to enable fsnotify for lower latency on low-volume files.
+
+
+Type: `bool`
+Default: `true`
+
+```yml
+# Examples
+
+disable_fsnotify: true
+
+disable_fsnotify: false
+```
+