From 747a45efd8ff905d3adbc419ad27f9cf57542507 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 18:01:12 +0000 Subject: [PATCH 01/12] added timeout --- internal/irzstd/disk.go | 20 +++ internal/irzstd/memory.go | 16 ++ internal/irzstd/writer.go | 7 + internal/outctx/config.go | 14 +- internal/outctx/context.go | 25 ++-- internal/outctx/manager.go | 137 ++++++++++++++++-- plugins/out_clp_s3/README.md | 31 ++-- plugins/out_clp_s3/internal/flush/flush.go | 70 +++++---- .../out_clp_s3/internal/recovery/recovery.go | 10 +- 9 files changed, 260 insertions(+), 70 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 0d1cf59..1d4a253 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -302,6 +302,26 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) { return zstdFileSize, err } +// Checks if writer is empty. True if no events are buffered. +// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: Error calling stat +func (w *diskWriter) CheckEmpty() (bool, error) { + zstdFileInfo, err := w.zstdFile.Stat() + if err != nil { + return false , err + } + // Not checking internal IR buffer since should it since should always be empty from + // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will + // be empty again when function terminates. + if (zstdFileInfo.Size() == 0 && w.irTotalBytes == 0) { + return true, nil + } + + return false, nil +} + // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then // truncated. // diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 5d8816b..6cf727b 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -128,6 +128,22 @@ func (w *memoryWriter) GetZstdOutputSize() (int, error) { return w.zstdBuffer.Len(), nil } +// Checks if writer is empty. True if no events are buffered. 
+// +// Returns: +// - empty: Boolean value that is true if buffer is empty +// - err: nil error to comply with interface +func (w *memoryWriter) CheckEmpty() (bool, error) { + // Not checking internal IR buffer since should it since should always be empty from + // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will + // be empty again when function terminates. + if w.zstdBuffer.Len() == 0 { + return true, nil + } + + return false, nil +} + // Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere. // Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not // added. It is preferable to add postamble on recovery so that IR is in the same state diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index eb43758..fe06757 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -61,6 +61,13 @@ type Writer interface { // - size: Bytes written // - err GetZstdOutputSize() (int, error) + + // Checks if writer is empty. True if no events are buffered. + // + // Returns: + // - empty: Boolean value that is true if buffer is empty + // - err + CheckEmpty() (bool, error) } // Writes log events to a IR Writer. 
diff --git a/internal/outctx/config.go b/internal/outctx/config.go index d771405..b05a9b4 100644 --- a/internal/outctx/config.go +++ b/internal/outctx/config.go @@ -6,6 +6,7 @@ import ( "reflect" "strconv" "strings" + "time" "unsafe" "github.com/go-playground/validator/v10" @@ -32,6 +33,7 @@ type S3Config struct { SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` + Timeout time.Duration `conf:"timeout" validate:"-"` UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` TimeZone string `conf:"time_zone" validate:"timezone"` } @@ -46,6 +48,8 @@ type S3Config struct { // - S3Config: Configuration based on fluent-bit.conf // - err: All validation errors in config wrapped, parse bool error func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { + defaultTimeout, _ := time.ParseDuration("15m") + // Define default values for settings. Setting defaults before validation simplifies validation // configuration, and ensures that default settings are also validated. config := S3Config{ @@ -59,6 +63,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { SingleKey: "log", UseDiskBuffer: true, DiskBufferPath: "tmp/out_clp_s3/", + Timeout: defaultTimeout, UploadSizeMb: 16, TimeZone: "America/Toronto", } @@ -76,6 +81,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { "single_key": &config.SingleKey, "use_disk_buffer": &config.UseDiskBuffer, "disk_buffer_path": &config.DiskBufferPath, + "timeout": &config.Timeout, "upload_size_mb": &config.UploadSizeMb, "time_zone": &config.TimeZone, } @@ -90,7 +96,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { continue } - // Type switch to type parse boolean strings into boolean type. This is necessary since + // Type switch to type parse interface into field type. 
This is necessary since // all values are provided as strings. switch configField := untypedField.(type) { case *string: @@ -102,6 +108,12 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) { return nil, fmt.Errorf("error could not parse input %v into bool", userInput) } *configField = boolInput + case *time.Duration: + durationInput, err := time.ParseDuration(userInput) + if err != nil { + return nil, fmt.Errorf("error could not parse input %v into duration", userInput) + } + *configField = durationInput case *int: intInput, err := strconv.Atoi(userInput) if err != nil { diff --git a/internal/outctx/context.go b/internal/outctx/context.go index 6f3eb67..5f4a08b 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -42,7 +42,7 @@ const ( type S3Context struct { Config S3Config Uploader *manager.Uploader - EventManagers map[string]*EventManager + EventManagers map[string]*S3EventManager } // Creates a new context. Loads configuration from user. Loads and tests aws credentials. @@ -108,7 +108,7 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) { ctx := S3Context{ Config: *config, Uploader: uploader, - EventManagers: make(map[string]*EventManager), + EventManagers: make(map[string]*S3EventManager), } return &ctx, nil @@ -123,7 +123,7 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) { // // Returns: // - err: Could not create buffers or tag -func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, error) { +func (ctx *S3Context) GetEventManager(tag string, size int) (*S3EventManager, error) { var err error eventManager, ok := ctx.EventManagers[tag] @@ -137,7 +137,7 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro return eventManager, nil } -// Recovers [EventManager] from previous execution using existing disk buffers. +// Recovers [S3EventManager] from previous execution using existing disk buffers. 
// // Parameters: // - tag: Fluent Bit tag @@ -149,7 +149,7 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro func (ctx *S3Context) RecoverEventManager( tag string, size int, -) (*EventManager, error) { +) (*S3EventManager, error) { irPath, zstdPath := ctx.GetBufferFilePaths(tag) writer, err := irzstd.RecoverWriter( ctx.Config.TimeZone, @@ -161,17 +161,20 @@ func (ctx *S3Context) RecoverEventManager( return nil, err } - eventManager := EventManager{ + eventManager := S3EventManager{ Tag: tag, Writer: writer, + UploadRequests: make(chan bool), } + go eventManager.Listen(ctx.Config, ctx.Uploader) + ctx.EventManagers[tag] = &eventManager return &eventManager, nil } -// Creates a new [EventManager] with a new [irzstd.Writer]. If UseDiskBuffer is set, buffers are +// Creates a new [S3EventManager] with a new [irzstd.Writer]. If UseDiskBuffer is set, buffers are // created on disk and are used to buffer Fluent Bit chunks. If UseDiskBuffer is off, buffer is // in memory and chunks are not buffered. 
// @@ -185,7 +188,7 @@ func (ctx *S3Context) RecoverEventManager( func (ctx *S3Context) newEventManager( tag string, size int, -) (*EventManager, error) { +) (*S3EventManager, error) { var err error var writer irzstd.Writer @@ -197,6 +200,7 @@ func (ctx *S3Context) newEventManager( irPath, zstdPath, ) + } else { writer, err = irzstd.NewMemoryWriter(ctx.Config.TimeZone, size) } @@ -205,11 +209,14 @@ func (ctx *S3Context) newEventManager( return nil, err } - eventManager := EventManager{ + eventManager := S3EventManager{ Tag: tag, Writer: writer, + UploadRequests: make(chan bool), } + go eventManager.Listen(ctx.Config, ctx.Uploader) + ctx.EventManagers[tag] = &eventManager return &eventManager, nil diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 0ecf85a..d9ed499 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -2,10 +2,12 @@ package outctx import ( "context" + "net/url" + "fmt" "log" - "net/url" "path/filepath" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -19,15 +21,121 @@ import ( const s3TagKey = "fluentBitTag" // Resources and metadata to process Fluent Bit events with the same tag. -type EventManager struct { - Tag string - Index int - Writer irzstd.Writer +type S3EventManager struct { + Tag string + Index int + Writer irzstd.Writer + Listening bool + Mutex sync.Mutex + WaitGroup sync.WaitGroup + UploadRequests chan bool +} + +// Starts upload listener which can receive signals on UploadRequests channel. Upload timeout +// is only active if use_disk_buffer is on. If UploadRequests channel is closed, the WaitGroup +// will decrement. WaitGroup allows graceful exit of listener goroutine when Fluent Bit receives +// a kill signal. Without it, Fluent Bit may abruptly kill goroutines. 
+// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) Listen(config S3Config, uploader *manager.Uploader) { + log.Printf("Starting upload listener for event manager with tag %s", m.Tag) + defer m.WaitGroup.Done() + + m.Listening = true + if m.Writer.GetUseDiskBuffer() { + m.DiskUploadListener(config, uploader) + } else { + m.MemoryUploadListener(config, uploader) + } +} + +// Ends listener goroutine. +func (m *S3EventManager) StopListening() { + log.Printf("Stopping upload listener for event manager with tag %s", m.Tag) + + // Closing the channel sends terminate signal to goroutine. The WaitGroup + // will block until it actually terminates. + close(m.UploadRequests) + m.WaitGroup.Wait() + m.Listening = false +} + +// Immortal listener that uploads events to s3 when receives signal on UploadRequests channel or a timeout is hit. +// Listener will sleep when inactive. +// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) DiskUploadListener(config S3Config, uploader *manager.Uploader) { + for { + select { + case _, more := <-m.UploadRequests: + if !more { + return + } + // Timeout will reset if signal sent on UploadRequest channel + case <-time.After(config.Timeout): + } + + err := m.Upload(config, uploader) + if err != nil { + log.Printf("Error uploading to s3 for event manager with tag %s", m.Tag) + } + } +} + +// Immortal listener that uploads events to s3 when receives signal on UploadRequests channel. +// Listener will sleep when inactive. 
+// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) MemoryUploadListener(config S3Config, uploader *manager.Uploader) { + for { + _, more := <-m.UploadRequests + if !more { + return + } + + err := m.Upload(config, uploader) + if err != nil { + log.Printf("Error uploading to s3 for event manager with tag %s", m.Tag) + } + } } -// Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, -// IR buffer is flushed and IR/Zstd streams are terminated. The [EventManager.Index] is incremented -// on successful upload. +// Uploads to s3 after acquiring lock and validating that buffer is not empty. Mutex prevents +// write while uploading. Must check that buffer is not empty as timeout can trigger on empty +// buffer and send empty file to s3. Empty buffer check is not explicitly necessary for +// MemoryUploadListener. +// +// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) Upload(config S3Config, uploader *manager.Uploader) error { + m.Mutex.Lock() + defer m.Mutex.Unlock() + + empty, err := m.Writer.CheckEmpty() + if err != nil { + return fmt.Errorf("failed to check if buffer is empty, %w", err) + } + + if empty { + log.Printf("Did not uploads events with tag %s since buffer is empty", m.Tag) + return nil + } + + err = m.ToS3(config, uploader) + return err +} + +// Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, IR +// buffer is flushed and IR/Zstd streams are terminated. The [S3EventManager.Index] is incremented on +// successful upload. 
// // Parameters: // - config: Plugin configuration @@ -35,13 +143,14 @@ type EventManager struct { // // Returns: // - err: Error creating closing streams, error uploading to s3, error resetting writer -func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { +func (m *S3EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { + err := m.Writer.CloseStreams() if err != nil { - return fmt.Errorf("error closing irzstd stream: %w", err) + panic(fmt.Errorf("error closing irzstd stream: %w", err)) } - outputLocation, err := uploadToS3( + outputLocation, err := s3Request( config.S3Bucket, config.S3BucketPrefix, m, @@ -59,7 +168,7 @@ func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { err = m.Writer.Reset() if err != nil { - return err + panic(fmt.Errorf("error resetting irzstd stream: %w", err)) } return nil @@ -76,10 +185,10 @@ func (m *EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { // // Returns: // - err: Error uploading, error unescaping string -func uploadToS3( +func s3Request( bucket string, bucketPrefix string, - eventManager *EventManager, + eventManager *S3EventManager, id string, uploader *manager.Uploader, ) (string, error) { diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index e074ed0..b2ab979 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -82,20 +82,21 @@ More detailed information for specifying credentials from AWS can be found [here ### Plugin configuration -| Key | Description | Default | -|---------------------|----------------------------------------------------------------------------------------------------------|-------------------| -| `s3_region` | The AWS region of your S3 bucket | `us-east-1` | -| `s3_bucket` | S3 bucket name. Just the name, no aws prefix neccesary. 
| `None` | -| `s3_bucket_prefix` | Bucket prefix path | `logs/` | -| `role_arn` | ARN of an IAM role to assume | `None` | -| `id` | Name of output plugin | Random UUID | -| `use_single_key` | Output single key from Fluent Bit record. See [Use Single Key](#use-single-key) for more info. | `TRUE` | -| `allow_missing_key` | Fallback to whole record if key is missing from log. If set to false, an error will be recorded instead. | `TRUE` | -| `single_key` | Value for single key | `log` | -| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | -| `disk_buffer_path` | Directory for disk buffer | `tmp/out_clp_s3/` | -| `upload_size_mb` | Set upload size in MB when disk store is enabled. Size refers to the compressed size. | `16` | -| `time_zone` | Time zone of the log source, so that local times (non-unix timestamps) are handled correctly. | `America/Toronto` | +| Key | Description | Default | +|---------------------|-----------------------------------------------------------------------------------------------------------------|---------------------| +| `s3_region` | The AWS region of your S3 bucket | `us-east-1` | +| `s3_bucket` | S3 bucket name. Just the name, no aws prefix necessary. | `None` | +| `s3_bucket_prefix` | Bucket prefix path | `logs/` | +| `role_arn` | ARN of an IAM role to assume | `None` | +| `id` | Name of output plugin | Random UUID | +| `use_single_key` | Output single key from Fluent Bit record. See [Use Single Key](#use-single-key) for more info. | `TRUE` | +| `allow_missing_key` | Fallback to whole record if single key is missing from log. If set to false, an error will be recorded instead. | `TRUE` | +| `single_key` | Value for single key | `log` | +| `use_disk_buffer` | Buffer logs on disk prior to sending to S3. See [Disk Buffering](#disk-buffering) for more info. | `TRUE` | +| `disk_buffer_path` | Directory for disk buffer. Path should be unique for each output. 
| `tmp/out_clp_s3/1/` | +| `upload_size_mb` | Set upload size in MB when disk buffer is enabled. Size refers to the compressed size. | `16` | +| `timeout` | Upload timeout if upload size is not met. For use when disk buffer is enabled. Valid time units are s, m, h. | `15m` | +| `time_zone` | Time zone of the log source, so that local times (non-unix timestamps) are handled correctly. | `America/Toronto` | #### Use Single Key @@ -107,7 +108,7 @@ the record as JSON. #### Disk Buffering -The output plugin recieves raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the +The output plugin receives raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use disk space and have higher memory requirements. The amount of system resources will be proportional diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 714be4f..0a5c9f9 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -44,34 +44,11 @@ func Ingest(data unsafe.Pointer, size int, tag string, ctx *outctx.S3Context) (i return output.FLB_RETRY, fmt.Errorf("error getting event manager: %w", err) } - numEvents, err := eventManager.Writer.WriteIrZstd(logEvents) + err = write(eventManager, logEvents, ctx.Config) if err != nil { - log.Printf( - "Wrote %d out of %d total log events for tag %s", - numEvents, - len(logEvents), - eventManager.Tag, - ) return output.FLB_ERROR, err } - uploadCriteriaMet, err := checkUploadCriteriaMet( - eventManager, - ctx.Config.UploadSizeMb, - ) - if err != nil { - return output.FLB_ERROR, fmt.Errorf("error checking upload criteria: %w", err) - } - - if !uploadCriteriaMet { - return output.FLB_OK, nil - } - - err = eventManager.ToS3(ctx.Config, ctx.Uploader) - 
if err != nil { - return output.FLB_ERROR, fmt.Errorf("error flushing Zstd buffer to s3: %w", err) - } - return output.FLB_OK, nil } @@ -175,6 +152,49 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { return stringMsg, nil } + +// Writes logEvents to event manager buffers. If upload criteria is met, sends upload signal to +// manager's UploadRequest channel. Method acquires lock to prevent upload while writing +// +// Parameters: +// - eventManager: Manager for Fluent Bit events with the same tag +// - logEvents: Slice of log events +// - config: Plugin configuration +// +// Returns: +// - err: Error writing log events, error checking upload criteria +func write(eventManager *outctx.S3EventManager, logEvents []ffi.LogEvent, config outctx.S3Config) error { + eventManager.Mutex.Lock() + defer eventManager.Mutex.Unlock() + + numEvents, err := eventManager.Writer.WriteIrZstd(logEvents) + if err != nil { + log.Printf( + "Wrote %d out of %d total log events for tag %s", + numEvents, + len(logEvents), + eventManager.Tag, + ) + return fmt.Errorf("error writing log events: %w", err) + } + + uploadCriteriaMet, err := checkUploadCriteriaMet( + eventManager, + config.UploadSizeMb, + ) + if err != nil { + return fmt.Errorf("error checking upload criteria: %w", err) + } + + if uploadCriteriaMet { + log.Printf("Sending upload request to upload listener with tag %s", eventManager.Tag) + eventManager.UploadRequests <- true + } + + return nil +} + + // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. 
@@ -186,7 +206,7 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { // Returns: // - readyToUpload: Boolean if upload criteria met or not // - err: Error getting Zstd buffer size -func checkUploadCriteriaMet(eventManager *outctx.EventManager, uploadSizeMb int) (bool, error) { +func checkUploadCriteriaMet(eventManager *outctx.S3EventManager, uploadSizeMb int) (bool, error) { if !eventManager.Writer.GetUseDiskBuffer() { return true, nil } diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index 2683506..a6cde9f 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -30,6 +30,7 @@ func GracefulExit(ctx *outctx.S3Context) error { return err } eventManager.Writer = nil + eventManager.StopListening() } return nil @@ -174,7 +175,7 @@ func checkFilesValid(irFiles map[string]fs.FileInfo, zstdFiles map[string]fs.Fil } // Flushes existing disk buffer to s3 on startup. Prior to sending, opens disk buffer files and -// creates new [outctx.EventManager] using existing buffer files. +// creates new [outctx.S3EventManager] using existing buffer files. 
// // Parameters: // - tag: Fluent Bit tag @@ -213,11 +214,8 @@ func flushExistingBuffer( log.Printf("Recovered disk buffers with tag %s", tag) - err = eventManager.ToS3(ctx.Config, ctx.Uploader) - if err != nil { - return fmt.Errorf("error flushing Zstd to s3: %w", err) - } - + log.Printf("Sending upload request to upload listener with tag %s", eventManager.Tag) + eventManager.UploadRequests <- true return nil } From e92a66ed62593cadf5ac614544913f145914bd85 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 18:04:54 +0000 Subject: [PATCH 02/12] refactor --- internal/irzstd/disk.go | 4 ++-- internal/irzstd/memory.go | 2 +- internal/irzstd/writer.go | 2 +- internal/outctx/config.go | 24 +++++++++++----------- internal/outctx/context.go | 8 ++++---- internal/outctx/manager.go | 18 ++++++++-------- plugins/out_clp_s3/internal/flush/flush.go | 8 +++++--- 7 files changed, 33 insertions(+), 33 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 1d4a253..89b9e44 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -310,12 +310,12 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) { func (w *diskWriter) CheckEmpty() (bool, error) { zstdFileInfo, err := w.zstdFile.Stat() if err != nil { - return false , err + return false, err } // Not checking internal IR buffer since should it since should always be empty from // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will // be empty again when function terminates. 
- if (zstdFileInfo.Size() == 0 && w.irTotalBytes == 0) { + if zstdFileInfo.Size() == 0 && w.irTotalBytes == 0 { return true, nil } diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 6cf727b..3d12ddc 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -137,7 +137,7 @@ func (w *memoryWriter) CheckEmpty() (bool, error) { // Not checking internal IR buffer since should it since should always be empty from // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will // be empty again when function terminates. - if w.zstdBuffer.Len() == 0 { + if w.zstdBuffer.Len() == 0 { return true, nil } diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index fe06757..5042a80 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -65,7 +65,7 @@ type Writer interface { // Checks if writer is empty. True if no events are buffered. // // Returns: - // - empty: Boolean value that is true if buffer is empty + // - empty: Boolean value that is true if buffer is empty // - err CheckEmpty() (bool, error) } diff --git a/internal/outctx/config.go b/internal/outctx/config.go index b05a9b4..aa188ee 100644 --- a/internal/outctx/config.go +++ b/internal/outctx/config.go @@ -23,19 +23,19 @@ import ( // //nolint:revive type S3Config struct { - S3Region string `conf:"s3_region" validate:"required"` - S3Bucket string `conf:"s3_bucket" validate:"required"` - S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` - RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` - Id string `conf:"id" validate:"required"` - UseSingleKey bool `conf:"use_single_key" validate:"-"` - AllowMissingKey bool `conf:"allow_missing_key" validate:"-"` - SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` - UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` - DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` + S3Region 
string `conf:"s3_region" validate:"required"` + S3Bucket string `conf:"s3_bucket" validate:"required"` + S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"` + RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"` + Id string `conf:"id" validate:"required"` + UseSingleKey bool `conf:"use_single_key" validate:"-"` + AllowMissingKey bool `conf:"allow_missing_key" validate:"-"` + SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"` + UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"` + DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"` Timeout time.Duration `conf:"timeout" validate:"-"` - UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` - TimeZone string `conf:"time_zone" validate:"timezone"` + UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"` + TimeZone string `conf:"time_zone" validate:"timezone"` } // Generates configuration struct containing user-defined settings. 
In addition, sets default values diff --git a/internal/outctx/context.go b/internal/outctx/context.go index 5f4a08b..0e5e4f0 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -162,8 +162,8 @@ func (ctx *S3Context) RecoverEventManager( } eventManager := S3EventManager{ - Tag: tag, - Writer: writer, + Tag: tag, + Writer: writer, UploadRequests: make(chan bool), } @@ -210,8 +210,8 @@ func (ctx *S3Context) newEventManager( } eventManager := S3EventManager{ - Tag: tag, - Writer: writer, + Tag: tag, + Writer: writer, UploadRequests: make(chan bool), } diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index d9ed499..c1b7098 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -2,10 +2,9 @@ package outctx import ( "context" - "net/url" - "fmt" "log" + "net/url" "path/filepath" "sync" "time" @@ -34,7 +33,8 @@ type S3EventManager struct { // Starts upload listener which can receive signals on UploadRequests channel. Upload timeout // is only active if use_disk_buffer is on. If UploadRequests channel is closed, the WaitGroup // will decrement. WaitGroup allows graceful exit of listener goroutine when Fluent Bit receives -// a kill signal. Without it, Fluent Bit may abruptly kill goroutines. +// +// a kill signal. Without it, Fluent Bit may abruptly kill goroutines. // // Parameters: // - config: Plugin configuration @@ -62,8 +62,8 @@ func (m *S3EventManager) StopListening() { m.Listening = false } -// Immortal listener that uploads events to s3 when receives signal on UploadRequests channel or a timeout is hit. -// Listener will sleep when inactive. +// Immortal listener that uploads events to s3 when receives signal on UploadRequests channel or a +// timeout is hit. Listener will sleep when inactive. 
// // Parameters: // - config: Plugin configuration @@ -94,7 +94,7 @@ func (m *S3EventManager) DiskUploadListener(config S3Config, uploader *manager.U // - uploader: S3 uploader manager func (m *S3EventManager) MemoryUploadListener(config S3Config, uploader *manager.Uploader) { for { - _, more := <-m.UploadRequests + _, more := <-m.UploadRequests if !more { return } @@ -111,7 +111,6 @@ func (m *S3EventManager) MemoryUploadListener(config S3Config, uploader *manager // buffer and send empty file to s3. Empty buffer check is not explicitly necessary for // MemoryUploadListener. // -// // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager @@ -134,8 +133,8 @@ func (m *S3EventManager) Upload(config S3Config, uploader *manager.Uploader) err } // Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, IR -// buffer is flushed and IR/Zstd streams are terminated. The [S3EventManager.Index] is incremented on -// successful upload. +// buffer is flushed and IR/Zstd streams are terminated. The [S3EventManager.Index] is incremented +// on successful upload. // // Parameters: // - config: Plugin configuration @@ -144,7 +143,6 @@ func (m *S3EventManager) Upload(config S3Config, uploader *manager.Uploader) err // Returns: // - err: Error creating closing streams, error uploading to s3, error resetting writer func (m *S3EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { - err := m.Writer.CloseStreams() if err != nil { panic(fmt.Errorf("error closing irzstd stream: %w", err)) diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 0a5c9f9..8809082 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -152,7 +152,6 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { return stringMsg, nil } - // Writes logEvents to event manager buffers. 
If upload criteria is met, sends upload signal to // manager's UploadRequest channel. Method acquires lock to prevent upload while writing // @@ -163,7 +162,11 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { // // Returns: // - err: Error writing log events, error checking upload criteria -func write(eventManager *outctx.S3EventManager, logEvents []ffi.LogEvent, config outctx.S3Config) error { +func write( + eventManager *outctx.S3EventManager, + logEvents []ffi.LogEvent, + config outctx.S3Config, +) error { eventManager.Mutex.Lock() defer eventManager.Mutex.Unlock() @@ -194,7 +197,6 @@ func write(eventManager *outctx.S3EventManager, logEvents []ffi.LogEvent, config return nil } - // Checks if criteria are met to upload to s3. If useDiskBuffer is false, then the chunk is always // uploaded so always returns true. If useDiskBuffer is true, check if Zstd buffer size is greater // than upload size. From 95572b2b5315cc0431cf6f973a2f1db3030c41ac Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 18:43:01 +0000 Subject: [PATCH 03/12] refactor errors into panic --- internal/irzstd/disk.go | 6 +-- internal/irzstd/memory.go | 6 +-- internal/outctx/context.go | 4 +- internal/outctx/manager.go | 78 +++++++++++++++++--------------------- 4 files changed, 38 insertions(+), 56 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 89b9e44..a6b462f 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -315,11 +315,7 @@ func (w *diskWriter) CheckEmpty() (bool, error) { // Not checking internal IR buffer since should it since should always be empty from // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will // be empty again when function terminates. 
- if zstdFileInfo.Size() == 0 && w.irTotalBytes == 0 { - return true, nil - } - - return false, nil + return (zstdFileInfo.Size() == 0 && w.irTotalBytes == 0), nil } // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 3d12ddc..cb21b32 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -137,11 +137,7 @@ func (w *memoryWriter) CheckEmpty() (bool, error) { // Not checking internal IR buffer since should it since should always be empty from // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will // be empty again when function terminates. - if w.zstdBuffer.Len() == 0 { - return true, nil - } - - return false, nil + return w.zstdBuffer.Len() == 0, nil } // Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere. diff --git a/internal/outctx/context.go b/internal/outctx/context.go index 0e5e4f0..f9b7b63 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -167,7 +167,7 @@ func (ctx *S3Context) RecoverEventManager( UploadRequests: make(chan bool), } - go eventManager.Listen(ctx.Config, ctx.Uploader) + go eventManager.listen(ctx.Config, ctx.Uploader) ctx.EventManagers[tag] = &eventManager @@ -215,7 +215,7 @@ func (ctx *S3Context) newEventManager( UploadRequests: make(chan bool), } - go eventManager.Listen(ctx.Config, ctx.Uploader) + go eventManager.listen(ctx.Config, ctx.Uploader) ctx.EventManagers[tag] = &eventManager diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index c1b7098..4a31853 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -24,51 +24,53 @@ type S3EventManager struct { Tag string Index int Writer irzstd.Writer - Listening bool Mutex sync.Mutex WaitGroup sync.WaitGroup UploadRequests chan bool + listening bool } -// Starts upload listener which can receive signals on UploadRequests 
channel. Upload timeout -// is only active if use_disk_buffer is on. If UploadRequests channel is closed, the WaitGroup -// will decrement. WaitGroup allows graceful exit of listener goroutine when Fluent Bit receives -// -// a kill signal. Without it, Fluent Bit may abruptly kill goroutines. +// Ends listener goroutine. +func (m *S3EventManager) StopListening() { + log.Printf("Stopping upload listener for event manager with tag %s", m.Tag) + + // Closing the channel sends terminate signal to goroutine. The WaitGroup + // will block until it actually terminates. + close(m.UploadRequests) + m.WaitGroup.Wait() + m.listening = false +} + +// Starts upload listener which can receive signals on UploadRequests channel. This function should +// be called as a goroutine. Timeout is only triggered if use_disk_buffer is on. Function calls +// immortal functions and thus will not exit. It will only exit if the uploadRequest channel is +// closed which will allow the callee to break out of infinite loop. When function does exit, it +// decrements a WaitGroup signaling that the goroutine has exited. WaitGroup allows graceful exit +// of listener when Fluent Bit receives a kill signal. On [recovery.GracefulExit], plugin will +// wait to exit until all listeners are closed. Without WaitGroup, OS may abruptly kill goroutines. // // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager -func (m *S3EventManager) Listen(config S3Config, uploader *manager.Uploader) { +func (m *S3EventManager) listen(config S3Config, uploader *manager.Uploader) { log.Printf("Starting upload listener for event manager with tag %s", m.Tag) defer m.WaitGroup.Done() - m.Listening = true + m.listening = true if m.Writer.GetUseDiskBuffer() { - m.DiskUploadListener(config, uploader) + m.diskUploadListener(config, uploader) } else { - m.MemoryUploadListener(config, uploader) + m.memoryUploadListener(config, uploader) } } -// Ends listener goroutine. 
-func (m *S3EventManager) StopListening() { - log.Printf("Stopping upload listener for event manager with tag %s", m.Tag) - - // Closing the channel sends terminate signal to goroutine. The WaitGroup - // will block until it actually terminates. - close(m.UploadRequests) - m.WaitGroup.Wait() - m.Listening = false -} - // Immortal listener that uploads events to s3 when receives signal on UploadRequests channel or a // timeout is hit. Listener will sleep when inactive. // // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager -func (m *S3EventManager) DiskUploadListener(config S3Config, uploader *manager.Uploader) { +func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.Uploader) { for { select { case _, more := <-m.UploadRequests: @@ -79,10 +81,7 @@ func (m *S3EventManager) DiskUploadListener(config S3Config, uploader *manager.U case <-time.After(config.Timeout): } - err := m.Upload(config, uploader) - if err != nil { - log.Printf("Error uploading to s3 for event manager with tag %s", m.Tag) - } + m.upload(config, uploader) } } @@ -92,17 +91,14 @@ func (m *S3EventManager) DiskUploadListener(config S3Config, uploader *manager.U // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager -func (m *S3EventManager) MemoryUploadListener(config S3Config, uploader *manager.Uploader) { +func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager.Uploader) { for { _, more := <-m.UploadRequests if !more { return } - err := m.Upload(config, uploader) - if err != nil { - log.Printf("Error uploading to s3 for event manager with tag %s", m.Tag) - } + m.upload(config, uploader) } } @@ -114,22 +110,21 @@ func (m *S3EventManager) MemoryUploadListener(config S3Config, uploader *manager // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager -func (m *S3EventManager) Upload(config S3Config, uploader *manager.Uploader) error { +func (m *S3EventManager) 
upload(config S3Config, uploader *manager.Uploader) { m.Mutex.Lock() defer m.Mutex.Unlock() empty, err := m.Writer.CheckEmpty() if err != nil { - return fmt.Errorf("failed to check if buffer is empty, %w", err) + panic(fmt.Errorf("failed to check if buffer is empty, %w", err)) } if empty { log.Printf("Did not uploads events with tag %s since buffer is empty", m.Tag) - return nil + return } - err = m.ToS3(config, uploader) - return err + m.toS3(config, uploader) } // Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, IR @@ -139,10 +134,7 @@ func (m *S3EventManager) Upload(config S3Config, uploader *manager.Uploader) err // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager -// -// Returns: -// - err: Error creating closing streams, error uploading to s3, error resetting writer -func (m *S3EventManager) ToS3(config S3Config, uploader *manager.Uploader) error { +func (m *S3EventManager) toS3(config S3Config, uploader *manager.Uploader) { err := m.Writer.CloseStreams() if err != nil { panic(fmt.Errorf("error closing irzstd stream: %w", err)) @@ -156,8 +148,8 @@ func (m *S3EventManager) ToS3(config S3Config, uploader *manager.Uploader) error uploader, ) if err != nil { - err = fmt.Errorf("failed to upload chunk to s3, %w", err) - return err + log.Print(fmt.Errorf("S3 request failed for event manager with tag %s: %w", m.Tag, err)) + return } m.Index += 1 @@ -168,8 +160,6 @@ func (m *S3EventManager) ToS3(config S3Config, uploader *manager.Uploader) error if err != nil { panic(fmt.Errorf("error resetting irzstd stream: %w", err)) } - - return nil } // Uploads log events to s3. 
From 45ffa05d73f01aeaf23056215be7ad0eeaeafeae Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 18:53:05 +0000 Subject: [PATCH 04/12] refactor --- internal/outctx/manager.go | 4 +++- plugins/out_clp_s3/internal/flush/flush.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 4a31853..a75e158 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -47,7 +47,7 @@ func (m *S3EventManager) StopListening() { // closed which will allow the callee to break out of infinite loop. When function does exit, it // decrements a WaitGroup signaling that the goroutine has exited. WaitGroup allows graceful exit // of listener when Fluent Bit receives a kill signal. On [recovery.GracefulExit], plugin will -// wait to exit until all listeners are closed. Without WaitGroup, OS may abruptly kill goroutines. +// wait to exit until all listeners are closed. Without WaitGroup, OS may abruptly kill goroutine. // // Parameters: // - config: Plugin configuration @@ -74,6 +74,7 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U for { select { case _, more := <-m.UploadRequests: + // Exit if channel is closed if !more { return } @@ -94,6 +95,7 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager.Uploader) { for { _, more := <-m.UploadRequests + // Exit if channel is closed if !more { return } diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 8809082..d00dd84 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -152,8 +152,8 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { return stringMsg, nil } -// Writes logEvents to event manager buffers. 
If upload criteria is met, sends upload signal to -// manager's UploadRequest channel. Method acquires lock to prevent upload while writing +// Writes logEvents to event manager buffer. If upload criteria is met, sends upload signal to +// [EventManager.UploadRequests] channel. Method acquires lock to prevent upload while writing. // // Parameters: // - eventManager: Manager for Fluent Bit events with the same tag From 93590e3163f3dae798da0b8b76ad1df5a918e3b6 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 19:19:28 +0000 Subject: [PATCH 05/12] add for waitgroup --- internal/outctx/context.go | 5 +++++ internal/outctx/manager.go | 4 +++- plugins/out_clp_s3/internal/flush/flush.go | 4 +++- plugins/out_clp_s3/internal/recovery/recovery.go | 5 +++-- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/internal/outctx/context.go b/internal/outctx/context.go index f9b7b63..caa45aa 100644 --- a/internal/outctx/context.go +++ b/internal/outctx/context.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "log" "path/filepath" "unsafe" @@ -167,6 +168,8 @@ func (ctx *S3Context) RecoverEventManager( UploadRequests: make(chan bool), } + log.Printf("Starting upload listener for event manager with tag %s", tag) + eventManager.WaitGroup.Add(1) go eventManager.listen(ctx.Config, ctx.Uploader) ctx.EventManagers[tag] = &eventManager @@ -215,6 +218,8 @@ func (ctx *S3Context) newEventManager( UploadRequests: make(chan bool), } + log.Printf("Starting upload listener for event manager with tag %s", tag) + eventManager.WaitGroup.Add(1) go eventManager.listen(ctx.Config, ctx.Uploader) ctx.EventManagers[tag] = &eventManager diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index a75e158..8e6981e 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -53,7 +53,6 @@ func (m *S3EventManager) StopListening() { // - config: Plugin configuration // - uploader: S3 uploader manager func (m *S3EventManager) listen(config S3Config, 
uploader *manager.Uploader) { - log.Printf("Starting upload listener for event manager with tag %s", m.Tag) defer m.WaitGroup.Done() m.listening = true @@ -74,12 +73,14 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U for { select { case _, more := <-m.UploadRequests: + log.Printf("Listener received upload request for event manager with tag %s", m.Tag) // Exit if channel is closed if !more { return } // Timeout will reset if signal sent on UploadRequest channel case <-time.After(config.Timeout): + log.Printf("Timeout surpassed for event manager with tag %s", m.Tag) } m.upload(config, uploader) @@ -95,6 +96,7 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager.Uploader) { for { _, more := <-m.UploadRequests + log.Printf("Listener received upload request for event manager with tag %s", m.Tag) // Exit if channel is closed if !more { return diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index d00dd84..f7101d6 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -190,7 +190,9 @@ func write( } if uploadCriteriaMet { - log.Printf("Sending upload request to upload listener with tag %s", eventManager.Tag) + // Possible that this occurs before listener actually starts. This is not an issue as channel + // already exists, and listener will start eventually. 
+ log.Printf("Send upload request to channel with tag %s", eventManager.Tag) eventManager.UploadRequests <- true } diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index a6cde9f..e318710 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -211,10 +211,11 @@ func flushExistingBuffer( if err != nil { return fmt.Errorf("error recovering event manager with tag: %w", err) } - log.Printf("Recovered disk buffers with tag %s", tag) - log.Printf("Sending upload request to upload listener with tag %s", eventManager.Tag) + // Possible that this occurs before listener actually starts. This is not an issue as channel + // already exists, and listener will start eventually. + log.Printf("Send upload request to channel with tag %s", eventManager.Tag) eventManager.UploadRequests <- true return nil } From 5b9c7b6bd46dc900c8ac2640a8dfa6819886c52c Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 19:48:18 +0000 Subject: [PATCH 06/12] small changes fix logging --- internal/irzstd/disk.go | 13 +++++++++++++ internal/irzstd/memory.go | 14 ++++++++++++++ internal/irzstd/writer.go | 6 ++++++ internal/outctx/manager.go | 19 +++++++++++-------- plugins/out_clp_s3/internal/flush/flush.go | 4 +--- .../out_clp_s3/internal/recovery/recovery.go | 5 +---- plugins/out_clp_s3/out_clp_s3.go | 17 ++++++++++------- 7 files changed, 56 insertions(+), 22 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index a6b462f..1ab1deb 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -40,6 +40,7 @@ type diskWriter struct { timezone string irTotalBytes int zstdWriter *zstd.Encoder + closed bool } // Opens a new [diskWriter] using files for IR and Zstd buffers. 
For use when use_disk_store @@ -203,6 +204,8 @@ func (w *diskWriter) CloseStreams() error { return err } + w.closed = true + return nil } @@ -237,6 +240,8 @@ func (w *diskWriter) Reset() error { w.zstdWriter.Reset(w.zstdFile) + w.closed = false + return nil } @@ -277,6 +282,14 @@ func (w *diskWriter) GetUseDiskBuffer() bool { return true } +// Getter for closed. +// +// Returns: +// - closed: Boolean that is true if IR and Zstd streams are closed. +func (w *diskWriter) GetClosed() bool { + return w.closed +} + // Getter for Zstd Output. // // Returns: diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index cb21b32..c764d8e 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -20,6 +20,7 @@ type memoryWriter struct { size int timezone string zstdWriter *zstd.Encoder + closed bool } // Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is @@ -82,6 +83,9 @@ func (w *memoryWriter) CloseStreams() error { w.irWriter = nil err = w.zstdWriter.Close() + + w.closed = true + return err } @@ -99,6 +103,8 @@ func (w *memoryWriter) Reset() error { w.zstdBuffer.Reset() w.zstdWriter.Reset(w.zstdBuffer) + + w.closed = false return nil } @@ -110,6 +116,14 @@ func (w *memoryWriter) GetUseDiskBuffer() bool { return false } +// Getter for closed. +// +// Returns: +// - closed: Boolean that is true if IR and Zstd streams are closed. +func (w *memoryWriter) GetClosed() bool { + return w.closed +} + // Getter for Zstd Output. // // Returns: diff --git a/internal/irzstd/writer.go b/internal/irzstd/writer.go index 5042a80..820c13a 100644 --- a/internal/irzstd/writer.go +++ b/internal/irzstd/writer.go @@ -37,6 +37,12 @@ type Writer interface { // - err Close() error + // Getter for closed. + // + // Returns: + // - closed: Boolean that is true if IR and Zstd streams are closed. + GetClosed() bool + // Reinitialize Writer after calling CloseStreams(). 
// // Returns: diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 8e6981e..9e29b10 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -73,14 +73,14 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U for { select { case _, more := <-m.UploadRequests: - log.Printf("Listener received upload request for event manager with tag %s", m.Tag) + log.Printf("Listener with tag %s received upload request on channel", m.Tag) // Exit if channel is closed if !more { return } // Timeout will reset if signal sent on UploadRequest channel case <-time.After(config.Timeout): - log.Printf("Timeout surpassed for event manager with tag %s", m.Tag) + log.Printf("Timeout surpassed for listener with tag %s", m.Tag) } m.upload(config, uploader) @@ -96,7 +96,7 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager.Uploader) { for { _, more := <-m.UploadRequests - log.Printf("Listener received upload request for event manager with tag %s", m.Tag) + log.Printf("Listener with tag %s received upload request on channel", m.Tag) // Exit if channel is closed if !more { return @@ -109,7 +109,7 @@ func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager // Uploads to s3 after acquiring lock and validating that buffer is not empty. Mutex prevents // write while uploading. Must check that buffer is not empty as timeout can trigger on empty // buffer and send empty file to s3. Empty buffer check is not explicitly necessary for -// MemoryUploadListener. +// MemoryUploadListener. Panics instead of returning error. // // Parameters: // - config: Plugin configuration @@ -133,15 +133,18 @@ func (m *S3EventManager) upload(config S3Config, uploader *manager.Uploader) { // Sends Zstd buffer to s3 and reset writer and buffers for future uploads. 
Prior to upload, IR // buffer is flushed and IR/Zstd streams are terminated. The [S3EventManager.Index] is incremented -// on successful upload. +// on successful upload. Logs errors with s3 request, otherwise panics instead on error. Errors +// closing and resetting writer are difficult to recover from. // // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager func (m *S3EventManager) toS3(config S3Config, uploader *manager.Uploader) { - err := m.Writer.CloseStreams() - if err != nil { - panic(fmt.Errorf("error closing irzstd stream: %w", err)) + if !m.Writer.GetClosed() { + err := m.Writer.CloseStreams() + if err != nil { + panic(fmt.Errorf("error closing irzstd stream: %w", err)) + } } outputLocation, err := s3Request( diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index f7101d6..50de6ac 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -190,9 +190,7 @@ func write( } if uploadCriteriaMet { - // Possible that this occurs before listener actually starts. This is not an issue as channel - // already exists, and listener will start eventually. - log.Printf("Send upload request to channel with tag %s", eventManager.Tag) + log.Printf("Sending upload request to channel with tag %s", eventManager.Tag) eventManager.UploadRequests <- true } diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index e318710..84fa852 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -212,10 +212,7 @@ func flushExistingBuffer( return fmt.Errorf("error recovering event manager with tag: %w", err) } log.Printf("Recovered disk buffers with tag %s", tag) - - // Possible that this occurs before listener actually starts. This is not an issue as channel - // already exists, and listener will start eventually. 
- log.Printf("Send upload request to channel with tag %s", eventManager.Tag) + log.Printf("Sending upload request to channel with tag %s", eventManager.Tag) eventManager.UploadRequests <- true return nil } diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index c0591df..c0f2175 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -10,6 +10,7 @@ package main import ( "C" + "fmt" "log" "unsafe" @@ -32,7 +33,10 @@ const s3PluginName = "out_clp_s3" // //export FLBPluginRegister func FLBPluginRegister(def unsafe.Pointer) int { - log.Printf("[%s] Register called", s3PluginName) + logPrefix := fmt.Sprintf("[%s] ", s3PluginName) + log.SetPrefix(logPrefix) + log.SetFlags(log.LstdFlags|log.Lmsgprefix) + log.Printf("Register called") return output.FLBPluginRegister(def, s3PluginName, "CLP s3 plugin") } @@ -51,7 +55,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { log.Fatalf("Failed to initialize plugin: %s", err) } - log.Printf("[%s] Init called for id: %s", s3PluginName, outCtx.Config.Id) + log.Printf("Init called for id: %s", outCtx.Config.Id) if outCtx.Config.UseDiskBuffer { err = recovery.RecoverBufferFiles(outCtx) @@ -89,8 +93,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int stringTag := C.GoString(tag) log.Printf( - "[%s] Flush called for id %s with tag %s and size %d", - s3PluginName, + "Flush called for id %s with tag %s and size %d", outCtx.Config.Id, stringTag, size, @@ -108,7 +111,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int //export FLBPluginExit func FLBPluginExit() int { - log.Printf("[%s] Exit called for unknown instance", s3PluginName) + log.Printf("Exit called for unknown instance") return output.FLB_OK } @@ -130,7 +133,7 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { log.Fatal("Could not read context during flush") } - log.Printf("[%s] Exit called for id: %s", s3PluginName, outCtx.Config.Id) + log.Printf("Exit 
called for id: %s", outCtx.Config.Id) err := recovery.GracefulExit(outCtx) if err != nil { @@ -142,7 +145,7 @@ func FLBPluginExitCtx(ctx unsafe.Pointer) int { //export FLBPluginUnregister func FLBPluginUnregister(def unsafe.Pointer) { - log.Printf("[%s] Unregister called", s3PluginName) + log.Printf("Unregister called") output.FLBPluginUnregister(def) } From d840a6dcd323af0103d0828081a1fc6a7621b425 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 20:41:27 +0000 Subject: [PATCH 07/12] fix logs --- internal/irzstd/disk.go | 3 ++- internal/irzstd/memory.go | 16 ++++++++++--- internal/outctx/manager.go | 24 ++++++++++++++----- .../out_clp_s3/internal/recovery/recovery.go | 2 +- 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/internal/irzstd/disk.go b/internal/irzstd/disk.go index 1ab1deb..4a49e43 100644 --- a/internal/irzstd/disk.go +++ b/internal/irzstd/disk.go @@ -328,7 +328,8 @@ func (w *diskWriter) CheckEmpty() (bool, error) { // Not checking internal IR buffer since should it since should always be empty from // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will // be empty again when function terminates. - return (zstdFileInfo.Size() == 0 && w.irTotalBytes == 0), nil + empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0) + return empty, nil } // Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index c764d8e..238f124 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -66,6 +66,10 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { } _, err = w.irWriter.WriteTo(w.zstdWriter) + if err != nil { + return numEvents, err + } + return numEvents, err } @@ -133,25 +137,31 @@ func (w *memoryWriter) GetZstdOutput() io.Reader { } // Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write. 
-// Instead, calling Len() on buffer. +// Instead, calling Len() on buffer. Try to avoid calling this as will flush Zstd Writer +// potentially creating unnecessary frames. // // Returns: // - size: Bytes written // - err: nil error to comply with interface func (w *memoryWriter) GetZstdOutputSize() (int, error) { + w.zstdWriter.Flush() return w.zstdBuffer.Len(), nil } -// Checks if writer is empty. True if no events are buffered. +// Checks if writer is empty. True if no events are buffered. Try to avoid calling this as will +// flush Zstd Writer potentially creating unnecessary frames. // // Returns: // - empty: Boolean value that is true if buffer is empty // - err: nil error to comply with interface func (w *memoryWriter) CheckEmpty() (bool, error) { + w.zstdWriter.Flush() + // Not checking internal IR buffer since should it since should always be empty from // perspective of interface. The only time not empty is inside WriteIrZstd, however, it will // be empty again when function terminates. - return w.zstdBuffer.Len() == 0, nil + empty := w.zstdBuffer.Len() == 0 + return empty, nil } // Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere. 
diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 9e29b10..857c2cc 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -73,17 +73,17 @@ func (m *S3EventManager) diskUploadListener(config S3Config, uploader *manager.U for { select { case _, more := <-m.UploadRequests: - log.Printf("Listener with tag %s received upload request on channel", m.Tag) // Exit if channel is closed if !more { return } + log.Printf("Listener with tag %s received upload request on channel", m.Tag) // Timeout will reset if signal sent on UploadRequest channel case <-time.After(config.Timeout): log.Printf("Timeout surpassed for listener with tag %s", m.Tag) } - m.upload(config, uploader) + m.diskUpload(config, uploader) } } @@ -102,19 +102,18 @@ func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager return } - m.upload(config, uploader) + m.memoryUpload(config, uploader) } } // Uploads to s3 after acquiring lock and validating that buffer is not empty. Mutex prevents // write while uploading. Must check that buffer is not empty as timeout can trigger on empty -// buffer and send empty file to s3. Empty buffer check is not explicitly necessary for -// MemoryUploadListener. Panics instead of returning error. +// buffer and send empty file to s3. Panics instead of returning error. // // Parameters: // - config: Plugin configuration // - uploader: S3 uploader manager -func (m *S3EventManager) upload(config S3Config, uploader *manager.Uploader) { +func (m *S3EventManager) diskUpload(config S3Config, uploader *manager.Uploader) { m.Mutex.Lock() defer m.Mutex.Unlock() @@ -131,6 +130,19 @@ func (m *S3EventManager) upload(config S3Config, uploader *manager.Uploader) { m.toS3(config, uploader) } +// See [diskUpload]; however, not necessary to check size of buffer since there +// is no timeout. MemoryUpload cannot be called with empty buffer. 
+// +// Parameters: +// - config: Plugin configuration +// - uploader: S3 uploader manager +func (m *S3EventManager) memoryUpload(config S3Config, uploader *manager.Uploader) { + m.Mutex.Lock() + defer m.Mutex.Unlock() + + m.toS3(config, uploader) +} + // Sends Zstd buffer to s3 and reset writer and buffers for future uploads. Prior to upload, IR // buffer is flushed and IR/Zstd streams are terminated. The [S3EventManager.Index] is incremented // on successful upload. Logs errors with s3 request, otherwise panics instead on error. Errors diff --git a/plugins/out_clp_s3/internal/recovery/recovery.go b/plugins/out_clp_s3/internal/recovery/recovery.go index 84fa852..e507cb5 100644 --- a/plugins/out_clp_s3/internal/recovery/recovery.go +++ b/plugins/out_clp_s3/internal/recovery/recovery.go @@ -25,12 +25,12 @@ import ( // - err: Error closing file func GracefulExit(ctx *outctx.S3Context) error { for _, eventManager := range ctx.EventManagers { + eventManager.StopListening() err := eventManager.Writer.Close() if err != nil { return err } eventManager.Writer = nil - eventManager.StopListening() } return nil From 06d056603c5edbb0c4985b9e2489383cab424c13 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 22:43:57 +0000 Subject: [PATCH 08/12] add line seperator --- plugins/out_clp_s3/internal/flush/flush.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 50de6ac..1b54ff0 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -81,7 +81,7 @@ func decodeMsgpack(dec *codec.Decoder, config outctx.S3Config) ([]ffi.LogEvent, } event := ffi.LogEvent{ - LogMessage: msg, + LogMessage: addSpaceAndNewLine(msg), Timestamp: ffi.EpochTimeMs(timestamp.UnixMilli()), } logEvents = append(logEvents, event) @@ -232,3 +232,16 @@ func checkUploadCriteriaMet(eventManager *outctx.S3EventManager, 
uploadSizeMb in return false, nil } + +// Decompressed IR streams appear as one concatenated string. Adding a space separates the log +// from the timestamp. Adding a new line separates logs from each other. +// +// Parameters: +//   - msg: Log event message +// +// Returns: +//   - modifiedMsg: Message with space at beginning at newline at end +func addSpaceAndNewLine(msg string) string { +	modifiedMsg := " " + msg + "\n" +	return modifiedMsg +} From eb884b5947f45f46d2ff6b6f1fe0d5877572945e Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 22:50:02 +0000 Subject: [PATCH 09/12] remove extra accidental err check --- internal/irzstd/memory.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index 238f124..ef9744f 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -66,9 +66,6 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) { 	} 	_, err = w.irWriter.WriteTo(w.zstdWriter) -	if err != nil { -		return numEvents, err -	} 	return numEvents, err } @@ -157,9 +154,7 @@ func (w *memoryWriter) GetZstdOutputSize() (int, error) { func (w *memoryWriter) CheckEmpty() (bool, error) { 	w.zstdWriter.Flush() -	// Not checking internal IR buffer since should it since should always be empty from -	// perspective of interface. The only time not empty is inside WriteIrZstd, however, it will -	// be empty again when function terminates. 
+ empty := w.zstdBuffer.Len() == 0 return empty, nil } From 4d5eb456e53e11abd74c4246c0ebe35467cd4f8c Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 23:10:05 +0000 Subject: [PATCH 10/12] small cleanup --- internal/outctx/manager.go | 22 +++++++++++++--------- plugins/out_clp_s3/README.md | 10 +++++----- plugins/out_clp_s3/internal/flush/flush.go | 2 +- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index 857c2cc..e6c0015 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -43,11 +43,12 @@ func (m *S3EventManager) StopListening() { // Starts upload listener which can receive signals on UploadRequests channel. This function should // be called as a goroutine. Timeout is only triggered if use_disk_buffer is on. Function calls -// immortal functions and thus will not exit. It will only exit if the uploadRequest channel is -// closed which will allow the callee to break out of infinite loop. When function does exit, it -// decrements a WaitGroup signaling that the goroutine has exited. WaitGroup allows graceful exit -// of listener when Fluent Bit receives a kill signal. On [recovery.GracefulExit], plugin will -// wait to exit until all listeners are closed. Without WaitGroup, OS may abruptly kill goroutine. +// immortal functions and thus will not exit immediately. Instead, it will only exit if the +// uploadRequest channel is closed which will allow immortal functions to break out of infinite +// loop. When function does exit, it decrements a WaitGroup letting event manager know it has exited. +// WaitGroup allows graceful exit of listener when Fluent Bit receives a kill signal. On +// [recovery.GracefulExit], plugin will wait to exit until all listeners are closed. Without +// WaitGroup, OS may abruptly kill listen goroutine. 
// // Parameters: //   - config: Plugin configuration @@ -96,19 +97,18 @@ func (m *S3EventManager) memoryUploadListener(config S3Config, uploader *manager.U for { 		_, more := <-m.UploadRequests -		log.Printf("Listener with tag %s received upload request on channel", m.Tag) 		// Exit if channel is closed 		if !more { 			return 		} - +		log.Printf("Listener with tag %s received upload request on channel", m.Tag) 		m.memoryUpload(config, uploader) 	} } // Uploads to s3 after acquiring lock and validating that buffer is not empty. Mutex prevents // write while uploading. Must check that buffer is not empty as timeout can trigger on empty -// buffer and send empty file to s3. Panics instead of returning error. +// buffer and send empty file to s3. Panics or logs instead of returning error. // // Parameters: //   - config: Plugin configuration @@ -131,7 +131,8 @@ func (m *S3EventManager) diskUpload(config S3Config, uploader *manager.Uploader) } // See [diskUpload]; however, not necessary to check size of buffer since there -// is no timeout. MemoryUpload cannot be called with empty buffer. +// is no timeout. MemoryUpload cannot be called with empty buffer. Panics or logs +// instead of returning error. // // Parameters: //   - config: Plugin configuration @@ -152,6 +153,9 @@ func (m *S3EventManager) memoryUpload(config S3Config, uploader *manager.Uploade //   - config: Plugin configuration //   - uploader: S3 uploader manager func (m *S3EventManager) toS3(config S3Config, uploader *manager.Uploader) { +	// In normal operation, writer GetClosed() should always return false. i.e. writer is open and +	// the stream should be closed. However, if an s3 request fails, it is already closed. +	// Therefore, on retry we don't want to close again. 
if !m.Writer.GetClosed() { err := m.Writer.CloseStreams() if err != nil { diff --git a/plugins/out_clp_s3/README.md b/plugins/out_clp_s3/README.md index b2ab979..8e5f7c0 100644 --- a/plugins/out_clp_s3/README.md +++ b/plugins/out_clp_s3/README.md @@ -109,11 +109,11 @@ the record as JSON. #### Disk Buffering The output plugin receives raw logs from Fluent Bit in small chunks. With `use_disk_buffer` set, the -output plugin will accumulate logs on disk until the upload size is reached. Buffering logs will -reduce the amount of S3 API requests and improve the compression ratio. However, the plugin will use -disk space and have higher memory requirements. The amount of system resources will be proportional -to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately process -each chunk and send it to S3. +output plugin will accumulate logs on disk until the upload size or timeout is reached. Buffering +logs will reduce the amount of S3 API requests and improve the compression ratio. However, the plugin +will use disk space and have higher memory requirements. The amount of system resources will be +proportional to the amount of Fluent Bit tags. With `use_disk_buffer` off, the plugin will immediately +process each chunk and send it to S3. Logs are stored on the disk as IR and Zstd compressed IR. If the plugin were to crash, stored logs will be sent to S3 when Fluent Bit restarts. The upload index restarts on recovery. diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 1b54ff0..8288cb3 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -153,7 +153,7 @@ func getMessage(jsonRecord []byte, config outctx.S3Config) (string, error) { } // Writes logEvents to event manager buffer. If upload criteria is met, sends upload signal to -// [EventManager.UploadRequests] channel. Method acquires lock to prevent upload while writing. 
+// upload request channel. Method acquires lock to prevent upload while writing. // // Parameters: // - eventManager: Manager for Fluent Bit events with the same tag From da19622581af90f50565378450fabd96932e0eb5 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Mon, 19 Aug 2024 23:52:19 +0000 Subject: [PATCH 11/12] fix linting --- internal/irzstd/memory.go | 1 - internal/outctx/manager.go | 4 ++-- plugins/out_clp_s3/internal/flush/flush.go | 4 ++-- plugins/out_clp_s3/out_clp_s3.go | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/irzstd/memory.go b/internal/irzstd/memory.go index ef9744f..4f007bf 100644 --- a/internal/irzstd/memory.go +++ b/internal/irzstd/memory.go @@ -154,7 +154,6 @@ func (w *memoryWriter) GetZstdOutputSize() (int, error) { func (w *memoryWriter) CheckEmpty() (bool, error) { w.zstdWriter.Flush() - empty := w.zstdBuffer.Len() == 0 return empty, nil } diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index e6c0015..deffcf0 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -45,8 +45,8 @@ func (m *S3EventManager) StopListening() { // be called as a goroutine. Timeout is only triggered if use_disk_buffer is on. Function calls // immortal functions and thus will not exit immediately. Instead, it will only exit if the // uploadRequest channel is closed which will allow immortal functions to break out of infinite -// loop. When function does exit, it decrements a WaitGroup letting event manager know it has exited. -// WaitGroup allows graceful exit of listener when Fluent Bit receives a kill signal. On +// loop. When function does exit, it decrements a WaitGroup letting event manager know it has +// exited. WaitGroup allows graceful exit of listener when Fluent Bit receives a kill signal. On // [recovery.GracefulExit], plugin will wait to exit until all listeners are closed. Without // WaitGroup, OS may abruptly kill listen goroutine. 
// diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index 8288cb3..d6e2b70 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -242,6 +242,6 @@ func checkUploadCriteriaMet(eventManager *outctx.S3EventManager, uploadSizeMb in // Returns: // - modifiedMsg: Message with space at beginning at newline at end func addSpaceAndNewLine(msg string) string { - modifiedMsg := " " + msg + "\n" - return modifiedMsg + modifiedMsg := " " + msg + "\n" + return modifiedMsg } diff --git a/plugins/out_clp_s3/out_clp_s3.go b/plugins/out_clp_s3/out_clp_s3.go index c0f2175..758c5f4 100644 --- a/plugins/out_clp_s3/out_clp_s3.go +++ b/plugins/out_clp_s3/out_clp_s3.go @@ -35,7 +35,7 @@ const s3PluginName = "out_clp_s3" func FLBPluginRegister(def unsafe.Pointer) int { logPrefix := fmt.Sprintf("[%s] ", s3PluginName) log.SetPrefix(logPrefix) - log.SetFlags(log.LstdFlags|log.Lmsgprefix) + log.SetFlags(log.LstdFlags | log.Lmsgprefix) log.Printf("Register called") return output.FLBPluginRegister(def, s3PluginName, "CLP s3 plugin") } From d0574d5f9db23d56bbad641f56cd8bc9bd1efde0 Mon Sep 17 00:00:00 2001 From: Dave Marco Date: Wed, 21 Aug 2024 19:58:01 +0000 Subject: [PATCH 12/12] small nits: --- internal/outctx/manager.go | 11 ++++++++--- plugins/out_clp_s3/internal/flush/flush.go | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/outctx/manager.go b/internal/outctx/manager.go index deffcf0..a94a3bb 100644 --- a/internal/outctx/manager.go +++ b/internal/outctx/manager.go @@ -27,18 +27,23 @@ type S3EventManager struct { Mutex sync.Mutex WaitGroup sync.WaitGroup UploadRequests chan bool - listening bool + Listening bool } // Ends listener goroutine. func (m *S3EventManager) StopListening() { + + if !m.Listening { + return + } + log.Printf("Stopping upload listener for event manager with tag %s", m.Tag) // Closing the channel sends terminate signal to goroutine. 
The WaitGroup // will block until it actually terminates. close(m.UploadRequests) m.WaitGroup.Wait() - m.listening = false + m.Listening = false } // Starts upload listener which can receive signals on UploadRequests channel. This function should @@ -56,7 +61,7 @@ func (m *S3EventManager) StopListening() { func (m *S3EventManager) listen(config S3Config, uploader *manager.Uploader) { defer m.WaitGroup.Done() - m.listening = true + m.Listening = true if m.Writer.GetUseDiskBuffer() { m.diskUploadListener(config, uploader) } else { diff --git a/plugins/out_clp_s3/internal/flush/flush.go b/plugins/out_clp_s3/internal/flush/flush.go index d6e2b70..050d3cf 100644 --- a/plugins/out_clp_s3/internal/flush/flush.go +++ b/plugins/out_clp_s3/internal/flush/flush.go @@ -240,7 +240,7 @@ func checkUploadCriteriaMet(eventManager *outctx.S3EventManager, uploadSizeMb in // - msg: Log event message // // Returns: -// - modifiedMsg: Message with space at beginning at newline at end +// - modifiedMsg: Message with space at beginning and newline at end func addSpaceAndNewLine(msg string) string { modifiedMsg := " " + msg + "\n" return modifiedMsg