Split events emitted at once to multi chunks #1062
Conversation
@repeatedly Please review this change!
Force-pushed from e960f94 to 8985883 (compare)
@sonots Could you review this change first?
BTW, tests fail on the Mac environment. Is this a known issue?
@@ -70,6 +88,10 @@ def repeatable?
      true
    end

    def slice(index, num)
      self.dup
If slice(1, 1) is called, shouldn't slice return the same content, rather than an empty stream?
I can't understand what you mean. But I think that OneEventStream#slice should return an empty stream when the first argument is >= 1.
I'll fix the code accordingly.
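The slice semantics agreed on above can be sketched roughly as follows. This is an illustrative sketch, not the actual Fluentd implementation; the class names mirror Fluentd's event stream classes, but the bodies here are simplified stand-ins.

```ruby
# Minimal stand-in for a multi-entry event stream (illustrative only)
class ArrayEventStream
  def initialize(entries)
    @entries = entries
  end

  def size
    @entries.size
  end
end

# Sketch of the discussed behavior: a stream holding exactly one event
# returns its whole content for slice(0, 1), and an empty stream for
# any first argument >= 1.
class OneEventStream
  def initialize(time, record)
    @time = time
    @record = record
  end

  def size
    1
  end

  def slice(index, num)
    if index > 0 || num < 1
      ArrayEventStream.new([]) # empty stream for index >= 1
    else
      self.dup                 # whole (single-event) content for slice(0, 1)
    end
  end
end
```

Under this reading, `slice(1, 1)` yields an empty stream because the only event lives at index 0.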
@repeatedly That's a known one; I'm investigating it continuously.
if splits_count > data.size
  splits_count = data.size
end
slice_size = if splits_count > data.size
The first condition is nonsense.
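The reviewer's point can be shown with a small sketch (variable names assumed, not the actual code): once `splits_count` is clamped to `data.size`, a later `splits_count > data.size` check can never be true, so that branch is dead code.

```ruby
# Sketch of the clamping logic under review. After the clamp,
# splits_count <= data_size always holds, so a subsequent
# `splits_count > data_size` condition is unreachable.
def slice_size_for(data_size, splits_count)
  splits_count = data_size if splits_count > data_size # clamp first
  # a `if splits_count > data_size` branch here would be dead code
  (data_size.to_f / splits_count).ceil
end
```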
Force-pushed from 8985883 to 9ae7d1b (compare)
I pushed some commits with fixes for the comments on this thread, and rebased on the current master HEAD.
One AppVeyor task is still failing (maybe a file handle leak or something else), but I'll merge this later.
I need more time to review the multi-threaded part and the benchmark.
Memo: I found that this change misses updating …
Force-pushed from 9ae7d1b to 5e051e2 (compare)
I rebased the changes on master HEAD, and added a fix for the compat layer.
@repeatedly Please review the changes. I'll take a look at the test failures in OSX environments.
Force-pushed from 14c7719 to 164fa09 (compare)
    end
    # @size should be updated always right after unpack.
    # The real size of unpacked objects is correct, rather than the given size.
    @size = @unpacked_times.size
After unpacking, should @data be set to nil for GC?
Hmm... @data is used in empty?.
@data should be kept as is, because #to_msgpack_stream returns @data itself.
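The caching scheme being discussed can be sketched like this. It is a simplified, hypothetical sketch of MessagePackEventStream's behavior, not the real implementation; Marshal stands in for MessagePack serialization so the sketch stays dependency-free.

```ruby
# Sketch: keep the serialized payload (@data) intact, because
# #to_msgpack_stream returns it directly, while caching the unpacked
# entries so that size/iteration after unpacking is cheap.
class CachedEventStream
  def initialize(data)
    @data = data             # serialized stream; must NOT be nil'ed for GC
    @unpacked_times = nil
    @unpacked_records = nil
  end

  def unpack
    return if @unpacked_times
    entries = Marshal.load(@data)          # stand-in for MessagePack unpack
    @unpacked_times = entries.map(&:first)
    @unpacked_records = entries.map(&:last)
    # @size reflects the real number of unpacked objects,
    # not any size hint given at construction time
    @size = @unpacked_times.size
  end

  def size
    unpack
    @size
  end

  def to_msgpack_stream
    @data # returns the original payload itself, hence @data is kept as is
  end
end
```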
@repeatedly Please check this change with the latest commits.
@repeatedly ping?
      end
    end

    unless stored
      # try step-by-step appending if data can't be stored into an existing chunk in non-bulk mode
-     write_step_by_step(metadata, data, data.size / 3, &block)
+     write_step_by_step(metadata, data, format, 10, &block)
Is this 10 a heuristic value? If so, a comment explaining why we chose 10 would be good.
I added a comment for it.
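The role of the heuristic initial split count can be sketched as follows. The helper name and shape are illustrative assumptions, not Fluentd's actual `write_step_by_step` code: the idea is to start from a coarse split (e.g. 10 pieces) and split an oversized event stream into roughly equal slices.

```ruby
# Sketch: given a total event count and an initial splits_count
# (10 as a heuristic starting point), compute the per-slice sizes.
# A real implementation would retry with a finer split when a slice
# still overflows the chunk size limit.
def split_sizes(total, splits_count)
  slice = (total.to_f / splits_count).ceil # events per slice, rounded up
  sizes = []
  rest = total
  while rest > 0
    n = [slice, rest].min # the last slice may be smaller
    sizes << n
    rest -= n
  end
  sizes
end
```

For example, splitting 25 events with an initial count of 10 yields slices of 3 events (with a smaller final slice), which together cover all 25 events.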
…BufferChunkOverflowError
To implement it efficiently, caching of unpacked objects was introduced in MessagePackEventStream. This makes it possible to slice, iterate, and duplicate the stream efficiently after it has been iterated or #size has been called once.
… in write_step_by_step
by splitting and joining event streams. With this change, chunk locking/releasing was improved at the same time.
* unstaged chunks will be enqueued, so there is no reason to add their sizes to staged bytes
* only when an unstaged chunk is staged should its size be added
Force-pushed from 136678b to fe3a860 (compare)
@repeatedly I added a commit with a code comment. Does that address all of your review comments?
I tested split-case CPU usage on 2 c3.8xlarge instances, using a fluentd-benchmark one_forward based configuration with flush_interval 0s.

* Baseline: v0.12.27. No split chunks.
* Forwarder: uses tdlog instead of flowcounter_simple for the buffer test (buffer_chunk_limit 32m).
* Aggregator (tdlog + buffer_chunk_limit 65m): no split chunks, because the aggregator's buffer_chunk_limit is larger than the forwarder's chunks.
* Aggregator (tdlog + buffer_chunk_limit 8m): splits 32MB chunks into smaller chunks.
That is by design: we can't avoid consuming CPU to split a chunk into smaller chunks. We get safer chunking in exchange for additional CPU usage.
@tagomoris Yeah, it's hard to reduce CPU usage for that. I agree.
This change makes it possible to emit a large event stream into several chunks.
That couldn't be done in v0.10/v0.12, but it is needed to remove warnings about chunks larger than the chunk limit size.
While writing this change, I found some problems with chunk locking and the buffer global lock.
I also improved those, because with this change the buffer will operate on many more chunks than before.