diff --git a/internal/impl/io/input_streaming_file.go b/internal/impl/io/input_streaming_file.go index b43efff32b..3be85c952d 100644 --- a/internal/impl/io/input_streaming_file.go +++ b/internal/impl/io/input_streaming_file.go @@ -61,7 +61,6 @@ type StreamingFileInputConfig struct { MaxBufferSize int `json:"max_buffer_size"` MaxLineSize int `json:"max_line_size"` ReadTimeout time.Duration `json:"read_timeout"` - ShutdownTimeout time.Duration `json:"shutdown_timeout"` Debug bool `json:"debug"` } @@ -155,9 +154,6 @@ func NewStreamingFileInput(cfg StreamingFileInputConfig, logger *service.Logger) if cfg.ReadTimeout <= 0 { cfg.ReadTimeout = 30 * time.Second } - if cfg.ShutdownTimeout <= 0 { - cfg.ShutdownTimeout = 30 * time.Second - } if err := os.MkdirAll(cfg.StateDir, 0755); err != nil { return nil, fmt.Errorf("failed to create state directory: %w", err) @@ -1126,11 +1122,8 @@ func (sfi *StreamingFileInput) Close(ctx context.Context) error { close(sfi.buffer) } - shutdownTimeout := 30 * time.Second - if sfi.config.ShutdownTimeout > 0 { - shutdownTimeout = sfi.config.ShutdownTimeout - } - + // Wait for in-flight messages to be acknowledged, respecting the context deadline + // which is controlled by Bento's shutdown_timeout configuration drainDone := make(chan struct{}) go func() { ticker := time.NewTicker(100 * time.Millisecond) @@ -1152,24 +1145,22 @@ func (sfi *StreamingFileInput) Close(ctx context.Context) error { select { case <-drainDone: sfi.logInfof("All in-flight messages acknowledged") - case <-time.After(shutdownTimeout): + case <-ctx.Done(): remaining := sfi.inFlightCount.Load() - sfi.logErrorf("CRITICAL: Shutdown timeout with %d in-flight messages. Persisting soft checkpoint.", remaining) - if sfi.metrics.ErrorsCounter != nil { - sfi.metrics.ErrorsCounter.Add(context.Background(), remaining, - metric.WithAttributes( - attribute.String("error_type", "shutdown_timeout"), - attribute.String("file", sfi.config.Path))) - } - softCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - if err := sfi.savePositionDurable(softCtx); err != nil { - sfi.logWarnf("Failed to persist soft checkpoint on shutdown timeout: %v", err) - } else { - sfi.logInfof("Soft checkpoint persisted with %d in-flight messages", remaining) + if remaining > 0 { + sfi.logWarnf("Shutdown with %d in-flight messages, persisting checkpoint", remaining) + if sfi.metrics.ErrorsCounter != nil { + sfi.metrics.ErrorsCounter.Add(context.Background(), remaining, + metric.WithAttributes( + attribute.String("error_type", "shutdown_timeout"), + attribute.String("file", sfi.config.Path))) + } + softCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + if err := sfi.savePositionDurable(softCtx); err != nil { + sfi.logWarnf("Failed to persist checkpoint on shutdown: %v", err) + } + cancel() } - case <-ctx.Done(): - sfi.logWarnf("Close context cancelled with %d in-flight messages", sfi.inFlightCount.Load()) } sfi.wg.Wait()