Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 40 additions & 83 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package plugin
import "C"

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -17,7 +16,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -29,19 +27,10 @@ import (
"github.com/calyptia/plugin/output"
)

const (
// maxBufferedMessages is the number of messages that will be buffered
// between each fluent-bit interval (approx 1 second).
defaultMaxBufferedMessages = 300000
// collectInterval is set to the interval present before in core-fluent-bit.
collectInterval = 1000 * time.Nanosecond
)

var (
unregister func()
cmt *cmetrics.Context
logger Logger
maxBufferedMessages = defaultMaxBufferedMessages
unregister func()
cmt *cmetrics.Context
logger Logger
)

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
Expand Down Expand Up @@ -73,7 +62,7 @@ func FLBPluginRegister(def unsafe.Pointer) int {
}

// FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase.
// here all the plugin context should be initialized and any data or flag required for
// here all the plugin context should be initialised and any data or flag required for
// plugins to execute the collect or flush callback.
//
//export FLBPluginInit
Expand Down Expand Up @@ -105,12 +94,6 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

err = theInput.Init(ctx, fbit)
if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" {
maxbuffered, err := strconv.Atoi(maxbuffered)
if err != nil {
maxBufferedMessages = maxbuffered
}
}
} else {
conf := &flbOutputConfigLoader{ptr: ptr}
cmt, err = output.FLBPluginGetCMetricsContext(ptr)
Expand All @@ -134,7 +117,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
}

// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been
// initialized, the plugin implementation is responsible for handling the incoming data and the context
// initialised, the plugin implementation is responsible for handling the incoming data and the context
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
Expand All @@ -147,75 +130,50 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
return input.FLB_RETRY
}

var err error
once.Do(func() {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)

// We use a timer instead of a Ticker so that it is not
// rescheduled during a cancel(). We start the timer at 0
// so the first interval gets executed immediately.
t := time.NewTimer(0)

go func(t *time.Timer, theChannel chan<- Message) {
for {
select {
case <-t.C:
err := theInput.Collect(runCtx, theChannel)
if err != nil {
fmt.Fprintf(os.Stderr,
"collect error: %s\n", err.Error())
}
t.Reset(collectInterval)
case <-runCtx.Done():
t.Stop()
once = sync.Once{}
close(theChannel)
return
}
}
}(t, theChannel)
theChannel = make(chan Message)
go func() {
err = theInput.Collect(runCtx, theChannel)
}()
})
if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}

buf := bytes.NewBuffer([]byte{})

for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- {
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_ERROR
}
select {
case msg, ok := <-theChannel:
if !ok {
return input.FLB_OK
}

t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
buf.Grow(len(b))
buf.Write(b)
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
loop = 0
default:
loop = 0
t := input.FLBTime{Time: msg.Time}
b, err := input.NewEncoder().Encode([]any{t, msg.Record})
if err != nil {
fmt.Fprintf(os.Stderr, "encode: %s\n", err)
return input.FLB_ERROR
}
}

if buf.Len() > 0 {
b := buf.Bytes()
cdata := C.CBytes(b)

*data = cdata
if csize != nil {
*csize = C.size_t(len(b))
*csize = C.size_t(len(b))

// C.free(unsafe.Pointer(cdata))
case <-runCtx.Done():
err := runCtx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
return input.FLB_ERROR
}
// enforce a runtime gc, to prevent the thread finalizer on
// fluent-bit to kick in before any remaining data has not been GC'ed
// causing a sigsegv.
defer runtime.GC()
default:
break
}

return input.FLB_OK
Expand All @@ -233,8 +191,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
// plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation.
//
//export FLBPluginFlush
// TODO: refactor into smaller functions.
//nolint:funlen //ignore length requirement for this function
//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions.
func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int {
initWG.Wait()

Expand Down
134 changes: 6 additions & 128 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,160 +2,38 @@ package plugin

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)

type testPluginInputCallbackCtrlC struct{}
type testPluginInputCallback struct{}

func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error {
func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error {
func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error {
return nil
}

func init() {
initWG.Done()
registerWG.Done()
}

func TestInputCallbackCtrlC(t *testing.T) {
theInput = testPluginInputCallbackCtrlC{}
theInput = testPluginInputCallback{}
cdone := make(chan bool)
timeout := time.NewTimer(1 * time.Second)
ptr := unsafe.Pointer(nil)

go func() {
FLBPluginInputCallback(&ptr, nil)
cdone <- true
}()

select {
case <-cdone:
runCancel()
case <-timeout.C:
t.Fail()
}
}

var testPluginInputCallbackInfiniteFuncs atomic.Int64

type testPluginInputCallbackInfinite struct{}

func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error {
testPluginInputCallbackInfiniteFuncs.Add(1)
for {
select {
default:
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"Foo": "BAR",
},
}
// for tests to correctly pass our infinite loop needs
// to return once the context has been finished.
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfinite is a test for the main method most plugins
// use where they do not return from the first invocation of collect.
func TestInputCallbackInfinite(t *testing.T) {
theInput = testPluginInputCallbackInfinite{}
cdone := make(chan bool)
timeout := time.NewTimer(10 * time.Second)
ptr := unsafe.Pointer(nil)

go func() {
for {
FLBPluginInputCallback(&ptr, nil)
time.Sleep(1 * time.Second)

if ptr != nil {
cdone <- true
}
}
}()

select {
case <-cdone:
runCancel()
// Test the assumption that only a single goroutine is
// ingesting records.
if testPluginInputCallbackInfiniteFuncs.Load() != 1 {
t.Fail()
}
return
case <-timeout.C:
runCancel()
t.Fail()
}
t.Fail()
}

type testInputCallbackInfiniteConcurrent struct{}

var concurrentWait sync.WaitGroup

func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error {
return nil
}

func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error {
for i := 0; i < 64; i++ {
go func(ch chan<- Message, id int) {
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"ID": fmt.Sprintf("%d", id),
},
}
concurrentWait.Done()
}(ch, i)
}
// for tests to correctly pass our infinite loop needs
// to return once the context has been finished.
for {
select {
case <-ctx.Done():
return nil
}
}
}

// TestInputCallbackInfiniteConcurrent is meant to make sure we do not
// break anythin with respect to concurrent ingest.
func TestInputCallbackInfiniteConcurrent(t *testing.T) {
theInput = testInputCallbackInfiniteConcurrent{}
cdone := make(chan bool)
timeout := time.NewTimer(10 * time.Second)
ptr := unsafe.Pointer(nil)
initWG.Done()
registerWG.Done()

concurrentWait.Add(64)
go func() {
FLBPluginInputCallback(&ptr, nil)
concurrentWait.Wait()
cdone <- true
}()

select {
case <-cdone:
runCancel()
case <-timeout.C:
runCancel()
t.Fail()
}
}