diff --git a/cshared.go b/cshared.go index 596da68..afa215d 100644 --- a/cshared.go +++ b/cshared.go @@ -6,7 +6,6 @@ package plugin import "C" import ( - "bytes" "context" "errors" "fmt" @@ -17,7 +16,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "unsafe" @@ -29,19 +27,10 @@ import ( "github.com/calyptia/plugin/output" ) -const ( - // maxBufferedMessages is the number of messages that will be buffered - // between each fluent-bit interval (approx 1 second). - defaultMaxBufferedMessages = 300000 - // collectInterval is set to the interval present before in core-fluent-bit. - collectInterval = 1000 * time.Nanosecond -) - var ( - unregister func() - cmt *cmetrics.Context - logger Logger - maxBufferedMessages = defaultMaxBufferedMessages + unregister func() + cmt *cmetrics.Context + logger Logger ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -73,7 +62,7 @@ func FLBPluginRegister(def unsafe.Pointer) int { } // FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase. -// here all the plugin context should be initialized and any data or flag required for +// here all the plugin context should be initialised and any data or flag required for // plugins to execute the collect or flush callback. // //export FLBPluginInit @@ -105,12 +94,6 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } err = theInput.Init(ctx, fbit) - if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" { - maxbuffered, err := strconv.Atoi(maxbuffered) - if err != nil { - maxBufferedMessages = maxbuffered - } - } } else { conf := &flbOutputConfigLoader{ptr: ptr} cmt, err = output.FLBPluginGetCMetricsContext(ptr) @@ -134,7 +117,7 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } // FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been -// initialized, the plugin implementation is responsible for handling the incoming data and the context +// initialised, the plugin implementation is responsible for handling the incoming data and the context // that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit // will not execute further callbacks. // @@ -147,75 +130,50 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } + var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message, maxBufferedMessages) - - // We use a timer instead of a Ticker so that it is not - // rescheduled during a cancel(). We start the timer at 0 - // so the first interval gets executed immediately. - t := time.NewTimer(0) - - go func(t *time.Timer, theChannel chan<- Message) { - for { - select { - case <-t.C: - err := theInput.Collect(runCtx, theChannel) - if err != nil { - fmt.Fprintf(os.Stderr, - "collect error: %s\n", err.Error()) - } - t.Reset(collectInterval) - case <-runCtx.Done(): - t.Stop() - once = sync.Once{} - close(theChannel) - return - } - } - }(t, theChannel) + theChannel = make(chan Message) + go func() { + err = theInput.Collect(runCtx, theChannel) + }() }) + if err != nil { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR + } - buf := bytes.NewBuffer([]byte{}) - - for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { - select { - case msg, ok := <-theChannel: - if !ok { - return input.FLB_ERROR - } + select { + case msg, ok := <-theChannel: + if !ok { + return input.FLB_OK + } - t := input.FLBTime{Time: msg.Time} - b, err := input.NewEncoder().Encode([]any{t, msg.Record}) - if err != nil { - fmt.Fprintf(os.Stderr, "encode: %s\n", err) - return input.FLB_ERROR - } - buf.Grow(len(b)) - buf.Write(b) - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR - } - // enforce a runtime gc, to prevent the thread finalizer on - // fluent-bit to kick in before any remaining data has not been GC'ed - // causing a sigsegv. - defer runtime.GC() - loop = 0 - default: - loop = 0 + t := input.FLBTime{Time: msg.Time} + b, err := input.NewEncoder().Encode([]any{t, msg.Record}) + if err != nil { + fmt.Fprintf(os.Stderr, "encode: %s\n", err) + return input.FLB_ERROR } - } - if buf.Len() > 0 { - b := buf.Bytes() cdata := C.CBytes(b) + *data = cdata - if csize != nil { - *csize = C.size_t(len(b)) + *csize = C.size_t(len(b)) + + // C.free(unsafe.Pointer(cdata)) + case <-runCtx.Done(): + err := runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR } + // enforce a runtime gc, to prevent the thread finalizer on + // fluent-bit to kick in before any remaining data has not been GC'ed + // causing a sigsegv. + defer runtime.GC() + default: + break } return input.FLB_OK @@ -233,8 +191,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { // plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation. // //export FLBPluginFlush -// TODO: refactor into smaller functions. -//nolint:funlen //ignore length requirement for this function +//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions. func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { initWG.Wait() diff --git a/cshared_test.go b/cshared_test.go index 0569abe..1ffdcac 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -2,160 +2,38 @@ package plugin import ( "context" - "fmt" - "sync" - "sync/atomic" "testing" "time" "unsafe" ) -type testPluginInputCallbackCtrlC struct{} +type testPluginInputCallback struct{} -func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { +func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { +func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error { return nil } -func init() { - initWG.Done() - registerWG.Done() -} - func TestInputCallbackCtrlC(t *testing.T) { - theInput = testPluginInputCallbackCtrlC{} + theInput = testPluginInputCallback{} cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) ptr := unsafe.Pointer(nil) - go func() { - FLBPluginInputCallback(&ptr, nil) - cdone <- true - }() - - select { - case <-cdone: - runCancel() - case <-timeout.C: - t.Fail() - } -} - -var testPluginInputCallbackInfiniteFuncs atomic.Int64 - -type testPluginInputCallbackInfinite struct{} - -func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { - return nil -} - -func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { - testPluginInputCallbackInfiniteFuncs.Add(1) - for { - select { - default: - ch <- Message{ - Time: time.Now(), - Record: map[string]string{ - "Foo": "BAR", - }, - } - // for tests to correctly pass our infinite loop needs - // to return once the context has been finished. - case <-ctx.Done(): - return nil - } - } -} - -// TestInputCallbackInfinite is a test for the main method most plugins -// use where they do not return from the first invocation of collect. -func TestInputCallbackInfinite(t *testing.T) { - theInput = testPluginInputCallbackInfinite{} - cdone := make(chan bool) - timeout := time.NewTimer(10 * time.Second) - ptr := unsafe.Pointer(nil) - - go func() { - for { - FLBPluginInputCallback(&ptr, nil) - time.Sleep(1 * time.Second) - - if ptr != nil { - cdone <- true - } - } - }() - - select { - case <-cdone: - runCancel() - // Test the assumption that only a single goroutine is - // ingesting records. - if testPluginInputCallbackInfiniteFuncs.Load() != 1 { - t.Fail() - } - return - case <-timeout.C: - runCancel() - t.Fail() - } - t.Fail() -} - -type testInputCallbackInfiniteConcurrent struct{} - -var concurrentWait sync.WaitGroup - -func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error { - return nil -} - -func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { - for i := 0; i < 64; i++ { - go func(ch chan<- Message, id int) { - ch <- Message{ - Time: time.Now(), - Record: map[string]string{ - "ID": fmt.Sprintf("%d", id), - }, - } - concurrentWait.Done() - }(ch, i) - } - // for tests to correctly pass our infinite loop needs - // to return once the context has been finished. - for { - select { - case <-ctx.Done(): - return nil - } - } -} - -// TestInputCallbackInfiniteConcurrent is meant to make sure we do not -// break anythin with respect to concurrent ingest. -func TestInputCallbackInfiniteConcurrent(t *testing.T) { - theInput = testInputCallbackInfiniteConcurrent{} - cdone := make(chan bool) - timeout := time.NewTimer(10 * time.Second) - ptr := unsafe.Pointer(nil) + initWG.Done() + registerWG.Done() - concurrentWait.Add(64) go func() { FLBPluginInputCallback(&ptr, nil) - concurrentWait.Wait() cdone <- true }() select { case <-cdone: - runCancel() case <-timeout.C: - runCancel() t.Fail() } }