Skip to content
This repository was archived by the owner on Apr 27, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions internal/impl/io/input_streaming_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type StreamingFileInputConfig struct {
MaxBufferSize int `json:"max_buffer_size"`
MaxLineSize int `json:"max_line_size"`
ReadTimeout time.Duration `json:"read_timeout"`
Debug bool `json:"debug"`
}

// FilePosition tracks the current position in a file
Expand Down Expand Up @@ -262,10 +261,7 @@ The input maintains state in a JSON file to enable recovery from the exact posit
Field(service.NewIntField("max_line_size").
Description("Maximum line size in bytes to prevent OOM").
Default(1048576).
Example(1048576)).
Field(service.NewBoolField("debug").
Description("Enable debug logging").
Default(false))
Example(1048576))
}

// logDebugf logs a debug message using Bento logger
Expand Down Expand Up @@ -939,9 +935,7 @@ func (sfi *StreamingFileInput) loadPosition() error {
sfi.position.LineNumber.Store(loadedPos.RawLineNum)
sfi.position.Timestamp = loadedPos.Timestamp

if sfi.config.Debug {
sfi.logDebugf("Loaded position: line=%d, offset=%d", loadedPos.RawLineNum, loadedPos.RawOffset)
}
sfi.logDebugf("Loaded position: line=%d, offset=%d", loadedPos.RawLineNum, loadedPos.RawOffset)

return nil
}
Expand Down Expand Up @@ -1277,18 +1271,13 @@ func init() {
if err != nil {
return nil, err
}
debug, err := pConf.FieldBool("debug")
if err != nil {
return nil, err
}

cfg := StreamingFileInputConfig{
Path: path,
StateDir: stateDir,
CheckpointInterval: checkpointInterval,
MaxBufferSize: maxBufferSize,
MaxLineSize: maxLineSize,
Debug: debug,
}

return NewStreamingFileInput(cfg, res.Logger())
Expand Down