Skip to content
This repository was archived by the owner on Apr 27, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 16 additions & 25 deletions internal/impl/io/input_streaming_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type StreamingFileInputConfig struct {
MaxBufferSize int `json:"max_buffer_size"`
MaxLineSize int `json:"max_line_size"`
ReadTimeout time.Duration `json:"read_timeout"`
ShutdownTimeout time.Duration `json:"shutdown_timeout"`
Debug bool `json:"debug"`
}

Expand Down Expand Up @@ -155,9 +154,6 @@ func NewStreamingFileInput(cfg StreamingFileInputConfig, logger *service.Logger)
if cfg.ReadTimeout <= 0 {
cfg.ReadTimeout = 30 * time.Second
}
if cfg.ShutdownTimeout <= 0 {
cfg.ShutdownTimeout = 30 * time.Second
}

if err := os.MkdirAll(cfg.StateDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create state directory: %w", err)
Expand Down Expand Up @@ -1126,11 +1122,8 @@ func (sfi *StreamingFileInput) Close(ctx context.Context) error {
close(sfi.buffer)
}

shutdownTimeout := 30 * time.Second
if sfi.config.ShutdownTimeout > 0 {
shutdownTimeout = sfi.config.ShutdownTimeout
}

// Wait for in-flight messages to be acknowledged, respecting the context deadline
// which is controlled by Bento's shutdown_timeout configuration
drainDone := make(chan struct{})
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
Expand All @@ -1152,24 +1145,22 @@ func (sfi *StreamingFileInput) Close(ctx context.Context) error {
select {
case <-drainDone:
sfi.logInfof("All in-flight messages acknowledged")
case <-time.After(shutdownTimeout):
case <-ctx.Done():
remaining := sfi.inFlightCount.Load()
sfi.logErrorf("CRITICAL: Shutdown timeout with %d in-flight messages. Persisting soft checkpoint.", remaining)
if sfi.metrics.ErrorsCounter != nil {
sfi.metrics.ErrorsCounter.Add(context.Background(), remaining,
metric.WithAttributes(
attribute.String("error_type", "shutdown_timeout"),
attribute.String("file", sfi.config.Path)))
}
softCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := sfi.savePositionDurable(softCtx); err != nil {
sfi.logWarnf("Failed to persist soft checkpoint on shutdown timeout: %v", err)
} else {
sfi.logInfof("Soft checkpoint persisted with %d in-flight messages", remaining)
if remaining > 0 {
sfi.logWarnf("Shutdown with %d in-flight messages, persisting checkpoint", remaining)
if sfi.metrics.ErrorsCounter != nil {
sfi.metrics.ErrorsCounter.Add(context.Background(), remaining,
metric.WithAttributes(
attribute.String("error_type", "shutdown_timeout"),
attribute.String("file", sfi.config.Path)))
}
softCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
if err := sfi.savePositionDurable(softCtx); err != nil {
sfi.logWarnf("Failed to persist checkpoint on shutdown: %v", err)
}
cancel()
}
case <-ctx.Done():
sfi.logWarnf("Close context cancelled with %d in-flight messages", sfi.inFlightCount.Load())
}

sfi.wg.Wait()
Expand Down