Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upload timeout when using disk buffer to maintain fresh logs on S3. #8

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
30 changes: 30 additions & 0 deletions internal/irzstd/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type diskWriter struct {
timezone string
irTotalBytes int
zstdWriter *zstd.Encoder
closed bool
}

// Opens a new [diskWriter] using files for IR and Zstd buffers. For use when use_disk_store
Expand Down Expand Up @@ -203,6 +204,8 @@ func (w *diskWriter) CloseStreams() error {
return err
}

w.closed = true

return nil
}

Expand Down Expand Up @@ -237,6 +240,8 @@ func (w *diskWriter) Reset() error {

w.zstdWriter.Reset(w.zstdFile)

w.closed = false

return nil
}

Expand Down Expand Up @@ -277,6 +282,14 @@ func (w *diskWriter) GetUseDiskBuffer() bool {
return true
}

// Reports whether the IR and Zstd streams have been closed via CloseStreams. The flag is
// cleared again by Reset.
//
// Returns:
//   - closed: Boolean that is true if IR and Zstd streams are closed.
func (w *diskWriter) GetClosed() bool {
	return w.closed
}

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -302,6 +315,23 @@ func (w *diskWriter) GetZstdOutputSize() (int, error) {
return zstdFileSize, err
}

// Checks if writer is empty. True if no events are buffered.
//
// Returns:
//   - empty: Boolean value that is true if buffer is empty
//   - err: Error calling stat on the Zstd file
func (w *diskWriter) CheckEmpty() (bool, error) {
	zstdFileInfo, err := w.zstdFile.Stat()
	if err != nil {
		return false, err
	}
	// The internal IR buffer is not checked since, from the perspective of the interface, it
	// should always be empty. It is only non-empty inside WriteIrZstd, and it is empty again
	// by the time that function returns.
	empty := (zstdFileInfo.Size() == 0) && (w.irTotalBytes == 0)
	return empty, nil
}

// Compresses contents of the IR file and outputs it to the Zstd file. The IR file is then
// truncated.
//
Expand Down
32 changes: 31 additions & 1 deletion internal/irzstd/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type memoryWriter struct {
size int
timezone string
zstdWriter *zstd.Encoder
closed bool
}

// Opens a new [memoryWriter] with a memory buffer for Zstd output. For use when use_disk_store is
Expand Down Expand Up @@ -65,6 +66,7 @@ func (w *memoryWriter) WriteIrZstd(logEvents []ffi.LogEvent) (int, error) {
}

_, err = w.irWriter.WriteTo(w.zstdWriter)

return numEvents, err
}

Expand All @@ -82,6 +84,9 @@ func (w *memoryWriter) CloseStreams() error {
w.irWriter = nil

err = w.zstdWriter.Close()

w.closed = true

return err
}

Expand All @@ -99,6 +104,8 @@ func (w *memoryWriter) Reset() error {

w.zstdBuffer.Reset()
w.zstdWriter.Reset(w.zstdBuffer)

w.closed = false
return nil
}

Expand All @@ -110,6 +117,14 @@ func (w *memoryWriter) GetUseDiskBuffer() bool {
return false
}

// Reports whether the IR and Zstd streams have been closed via CloseStreams. The flag is
// cleared again by Reset.
//
// Returns:
//   - closed: Boolean that is true if IR and Zstd streams are closed.
func (w *memoryWriter) GetClosed() bool {
	return w.closed
}

// Getter for Zstd Output.
//
// Returns:
Expand All @@ -119,15 +134,30 @@ func (w *memoryWriter) GetZstdOutput() io.Reader {
}

// Get size of Zstd output. [zstd] does not provide the amount of bytes written with each write.
// Instead, calling Len() on buffer. Try to avoid calling this as it will flush the Zstd writer,
// potentially creating unnecessary frames.
//
// Returns:
//   - size: Bytes written
//   - err: Error flushing the Zstd writer
func (w *memoryWriter) GetZstdOutputSize() (int, error) {
	// Flush so data still buffered inside the encoder is reflected in the buffer length.
	// Previously this error was silently dropped; propagate it so callers can react.
	err := w.zstdWriter.Flush()
	return w.zstdBuffer.Len(), err
}

// Checks if writer is empty. True if no events are buffered. Try to avoid calling this as it
// will flush the Zstd writer, potentially creating unnecessary frames.
//
// Returns:
//   - empty: Boolean value that is true if buffer is empty
//   - err: Error flushing the Zstd writer
func (w *memoryWriter) CheckEmpty() (bool, error) {
	// Flush so buffered-but-unwritten data is counted before checking emptiness. Previously
	// this error was silently dropped; propagate it so callers can react.
	if err := w.zstdWriter.Flush(); err != nil {
		return false, err
	}
	return w.zstdBuffer.Len() == 0, nil
}

// Closes [memoryWriter]. Currently used during recovery only, and advise caution using elsewhere.
// Using [ir.Writer.Serializer.Close] instead of [ir.Writer.Close] so EndofStream byte is not
// added. It is preferable to add postamble on recovery so that IR is in the same state
Expand Down
13 changes: 13 additions & 0 deletions internal/irzstd/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type Writer interface {
// - err
Close() error

// Getter for closed.
//
// Returns:
// - closed: Boolean that is true if IR and Zstd streams are closed.
GetClosed() bool

// Reinitialize Writer after calling CloseStreams().
//
// Returns:
Expand All @@ -61,6 +67,13 @@ type Writer interface {
// - size: Bytes written
// - err
GetZstdOutputSize() (int, error)

// Checks if writer is empty. True if no events are buffered.
//
// Returns:
// - empty: Boolean value that is true if buffer is empty
// - err
CheckEmpty() (bool, error)
}

// Writes log events to a IR Writer.
Expand Down
38 changes: 25 additions & 13 deletions internal/outctx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"strconv"
"strings"
"time"
"unsafe"

"github.com/go-playground/validator/v10"
Expand All @@ -22,18 +23,19 @@ import (
//
//nolint:revive
type S3Config struct {
S3Region string `conf:"s3_region" validate:"required"`
S3Bucket string `conf:"s3_bucket" validate:"required"`
S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"`
RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"`
Id string `conf:"id" validate:"required"`
UseSingleKey bool `conf:"use_single_key" validate:"-"`
AllowMissingKey bool `conf:"allow_missing_key" validate:"-"`
SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"`
UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"`
DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"`
UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"`
TimeZone string `conf:"time_zone" validate:"timezone"`
S3Region string `conf:"s3_region" validate:"required"`
S3Bucket string `conf:"s3_bucket" validate:"required"`
S3BucketPrefix string `conf:"s3_bucket_prefix" validate:"dirpath"`
RoleArn string `conf:"role_arn" validate:"omitempty,startswith=arn:aws:iam"`
Id string `conf:"id" validate:"required"`
UseSingleKey bool `conf:"use_single_key" validate:"-"`
AllowMissingKey bool `conf:"allow_missing_key" validate:"-"`
SingleKey string `conf:"single_key" validate:"required_if=use_single_key true"`
UseDiskBuffer bool `conf:"use_disk_buffer" validate:"-"`
DiskBufferPath string `conf:"disk_buffer_path" validate:"omitempty,dirpath"`
Timeout time.Duration `conf:"timeout" validate:"-"`
UploadSizeMb int `conf:"upload_size_mb" validate:"omitempty,gte=2,lt=1000"`
TimeZone string `conf:"time_zone" validate:"timezone"`
}

// Generates configuration struct containing user-defined settings. In addition, sets default values
Expand All @@ -46,6 +48,8 @@ type S3Config struct {
// - S3Config: Configuration based on fluent-bit.conf
// - err: All validation errors in config wrapped, parse bool error
func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) {
defaultTimeout, _ := time.ParseDuration("15m")

// Define default values for settings. Setting defaults before validation simplifies validation
// configuration, and ensures that default settings are also validated.
config := S3Config{
Expand All @@ -59,6 +63,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) {
SingleKey: "log",
UseDiskBuffer: true,
DiskBufferPath: "tmp/out_clp_s3/",
Timeout: defaultTimeout,
UploadSizeMb: 16,
TimeZone: "America/Toronto",
}
Expand All @@ -76,6 +81,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) {
"single_key": &config.SingleKey,
"use_disk_buffer": &config.UseDiskBuffer,
"disk_buffer_path": &config.DiskBufferPath,
"timeout": &config.Timeout,
"upload_size_mb": &config.UploadSizeMb,
"time_zone": &config.TimeZone,
}
Expand All @@ -90,7 +96,7 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) {
continue
}

// Type switch to type parse boolean strings into boolean type. This is necessary since
// Type switch to type parse interface into field type. This is necessary since
// all values are provided as strings.
switch configField := untypedField.(type) {
case *string:
Expand All @@ -102,6 +108,12 @@ func NewS3Config(plugin unsafe.Pointer) (*S3Config, error) {
return nil, fmt.Errorf("error could not parse input %v into bool", userInput)
}
*configField = boolInput
case *time.Duration:
durationInput, err := time.ParseDuration(userInput)
if err != nil {
return nil, fmt.Errorf("error could not parse input %v into duration", userInput)
}
*configField = durationInput
case *int:
intInput, err := strconv.Atoi(userInput)
if err != nil {
Expand Down
38 changes: 25 additions & 13 deletions internal/outctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"log"
"path/filepath"
"unsafe"

Expand Down Expand Up @@ -42,7 +43,7 @@ const (
type S3Context struct {
Config S3Config
Uploader *manager.Uploader
EventManagers map[string]*EventManager
EventManagers map[string]*S3EventManager
}

// Creates a new context. Loads configuration from user. Loads and tests aws credentials.
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) {
ctx := S3Context{
Config: *config,
Uploader: uploader,
EventManagers: make(map[string]*EventManager),
EventManagers: make(map[string]*S3EventManager),
}

return &ctx, nil
Expand All @@ -123,7 +124,7 @@ func NewS3Context(plugin unsafe.Pointer) (*S3Context, error) {
//
// Returns:
// - err: Could not create buffers or tag
func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, error) {
func (ctx *S3Context) GetEventManager(tag string, size int) (*S3EventManager, error) {
var err error
eventManager, ok := ctx.EventManagers[tag]

Expand All @@ -137,7 +138,7 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro
return eventManager, nil
}

// Recovers [EventManager] from previous execution using existing disk buffers.
// Recovers [S3EventManager] from previous execution using existing disk buffers.
//
// Parameters:
// - tag: Fluent Bit tag
Expand All @@ -149,7 +150,7 @@ func (ctx *S3Context) GetEventManager(tag string, size int) (*EventManager, erro
func (ctx *S3Context) RecoverEventManager(
tag string,
size int,
) (*EventManager, error) {
) (*S3EventManager, error) {
irPath, zstdPath := ctx.GetBufferFilePaths(tag)
writer, err := irzstd.RecoverWriter(
ctx.Config.TimeZone,
Expand All @@ -161,17 +162,22 @@ func (ctx *S3Context) RecoverEventManager(
return nil, err
}

eventManager := EventManager{
Tag: tag,
Writer: writer,
eventManager := S3EventManager{
Tag: tag,
Writer: writer,
UploadRequests: make(chan bool),
}

log.Printf("Starting upload listener for event manager with tag %s", tag)
eventManager.WaitGroup.Add(1)
go eventManager.listen(ctx.Config, ctx.Uploader)

ctx.EventManagers[tag] = &eventManager

return &eventManager, nil
}

// Creates a new [EventManager] with a new [irzstd.Writer]. If UseDiskBuffer is set, buffers are
// Creates a new [S3EventManager] with a new [irzstd.Writer]. If UseDiskBuffer is set, buffers are
// created on disk and are used to buffer Fluent Bit chunks. If UseDiskBuffer is off, buffer is
// in memory and chunks are not buffered.
//
Expand All @@ -185,7 +191,7 @@ func (ctx *S3Context) RecoverEventManager(
func (ctx *S3Context) newEventManager(
tag string,
size int,
) (*EventManager, error) {
) (*S3EventManager, error) {
var err error
var writer irzstd.Writer

Expand All @@ -197,6 +203,7 @@ func (ctx *S3Context) newEventManager(
irPath,
zstdPath,
)

} else {
writer, err = irzstd.NewMemoryWriter(ctx.Config.TimeZone, size)
}
Expand All @@ -205,11 +212,16 @@ func (ctx *S3Context) newEventManager(
return nil, err
}

eventManager := EventManager{
Tag: tag,
Writer: writer,
eventManager := S3EventManager{
Tag: tag,
Writer: writer,
UploadRequests: make(chan bool),
}

log.Printf("Starting upload listener for event manager with tag %s", tag)
eventManager.WaitGroup.Add(1)
go eventManager.listen(ctx.Config, ctx.Uploader)

ctx.EventManagers[tag] = &eventManager

return &eventManager, nil
Expand Down
Loading