Skip to content

[FEATURE] buffered channel drain next#43

Merged
niedbalski merged 22 commits intomainfrom
feat-buffered-channel-drain-next
Sep 15, 2023
Merged

[FEATURE] buffered channel drain next#43
niedbalski merged 22 commits intomainfrom
feat-buffered-channel-drain-next

Conversation

@pwhelan
Copy link
Contributor

@pwhelan pwhelan commented Sep 13, 2023

Summary

This is a version of #34 which removes one of the intermediate buffers and also makes the size of the buffer channel configurable via the go.MaxBufferedMessages configuration option.

I have also added tests to make sure that this new model works and does not run into deadlocks.

…e buffer per input call.

Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
…allback.

Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
…e channel.

Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
@pwhelan pwhelan changed the title [FEAT] buffered channel drain next [FEATURE] buffered channel drain next Sep 13, 2023
Copy link
Contributor

@cosmo0920 cosmo0920 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still experiencing for errno=0 Success error logs in fluent-bit side because with your patch we'll experience the edge case on the input callback:

[2023/09/14 15:33:30] [trace] [GO] entering go_collect()
[2023/09/14 15:33:30] [error] [/media/Data3/Gitrepo/fluent-bit/src/flb_input_chunk.c:1696 errno=0] Success
[2023/09/14 15:33:31] [trace] [GO] entering go_collect()
[2023/09/14 15:33:31] [error] [/media/Data3/Gitrepo/fluent-bit/src/flb_input_chunk.c:1696 errno=0] Success
[2023/09/14 15:33:31] [debug] [gdummy] operation succeeded

Instead, we should add length of ingestion check like as:

diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c
index 0097c50ad..d8547eaca 100644
--- a/src/flb_plugin_proxy.c
+++ b/src/flb_plugin_proxy.c
@@ -84,6 +84,10 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
         flb_trace("[GO] entering go_collect()");
         ret = proxy_go_input_collect(ctx->proxy, &data, &len);
 
+        if (len == 0) {
+            flb_trace("[GO] No logs are ingested");
+            return -1;
+        }
         if (ret == -1) {
             flb_errno();
             return -1;

Then, the error message is gone:

[2023/09/14 15:35:18] [trace] [GO] entering go_collect()
[2023/09/14 15:35:18] [trace] [GO] No logs are ingested
[2023/09/14 15:35:19] [trace] [GO] entering go_collect()
[2023/09/14 15:35:19] [trace] [GO] No logs are ingested
[2023/09/14 15:35:19] [debug] [gdummy] operation succeeded
[2023/09/14 15:35:20] [trace] [GO] entering go_collect()
[2023/09/14 15:35:20] [debug] [gdummy] operation succeeded
[2023/09/14 15:35:21] [debug] [input chunk] update output instances with new chunk size diff=26, records=1, input=gdummy.0
[2023/09/14 15:35:21] [trace] [GO] entering go_collect()
[2023/09/14 15:35:21] [debug] [gdummy] operation succeeded
[2023/09/14 15:35:22] [debug] [input chunk] update output instances with new chunk size diff=26, records=1, input=gdummy.0
[2023/09/14 15:35:22] [trace] [GO] entering go_collect()
[2023/09/14 15:35:22] [trace] [task 0x7f593c037da0] created (id=0)
[2023/09/14 15:35:22] [debug] [task] created task=0x7f593c037da0 id=0 OK
[2023/09/14 15:35:22] [trace] [GO] entering go_flush()
[0] dummy.local: [2023-09-14 15:35:19.905495947 +0900 JST, {"message": [100 117 109 109 121], }
[1] dummy.local: [2023-09-14 15:35:20.906353368 +0900 JST, {"message": [100 117 109 109 121], }
[2023/09/14 15:35:22] [trace] [engine] [task event] task_id=0 out_id=0 return=OK

WDYT?

@cosmo0920
Copy link
Contributor

I sent my comment's diff as a PR: fluent/fluent-bit#7922

Signed-off-by: Phillip Whelan <phil@calyptia.com>
@pwhelan
Copy link
Contributor Author

pwhelan commented Sep 14, 2023

Instead, we should add length of ingestion check like as:

WDYT?

I am all for it. I had added the same check to a copy of fluent-bit I was using at some point while doing testing of golang plugins.

Signed-off-by: Phillip Whelan <phil@calyptia.com>
Signed-off-by: Phillip Whelan <phil@calyptia.com>
…art.

Signed-off-by: Phillip Whelan <phil@calyptia.com>
@pwhelan pwhelan self-assigned this Sep 14, 2023
@pwhelan pwhelan requested a review from niedbalski September 14, 2023 15:26
@pwhelan pwhelan requested a review from cosmo0920 September 15, 2023 14:21
Copy link
Contributor

@niedbalski niedbalski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants