-
Notifications
You must be signed in to change notification settings - Fork 855
[SDK] Circular buffer tweaks + cpu pressure test #3349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
12d1612
673ea1d
7bda54c
dd0a640
248d559
223e255
fe3a40f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -14,6 +14,9 @@ | |||
| // limitations under the License. | ||||
| // </copyright> | ||||
|
|
||||
| #nullable enable | ||||
|
|
||||
| using System.Diagnostics; | ||||
| using System.Runtime.CompilerServices; | ||||
| using System.Threading; | ||||
|
|
||||
|
|
@@ -26,7 +29,7 @@ namespace OpenTelemetry.Internal | |||
| internal class CircularBuffer<T> | ||||
| where T : class | ||||
| { | ||||
| private readonly T[] trait; | ||||
| private readonly T?[] trait; | ||||
| private long head; | ||||
| private long tail; | ||||
|
|
||||
|
|
@@ -54,20 +57,20 @@ public int Count | |||
| { | ||||
| get | ||||
| { | ||||
| var tailSnapshot = this.tail; | ||||
| return (int)(this.head - tailSnapshot); | ||||
| var tailSnapshot = Volatile.Read(ref this.tail); | ||||
| return (int)(Volatile.Read(ref this.head) - tailSnapshot); | ||||
| } | ||||
| } | ||||
|
|
||||
| /// <summary> | ||||
| /// Gets the number of items added to the <see cref="CircularBuffer{T}"/>. | ||||
| /// </summary> | ||||
| public long AddedCount => this.head; | ||||
| public long AddedCount => Volatile.Read(ref this.head); | ||||
|
|
||||
| /// <summary> | ||||
| /// Gets the number of items removed from the <see cref="CircularBuffer{T}"/>. | ||||
| /// </summary> | ||||
| public long RemovedCount => this.tail; | ||||
| public long RemovedCount => Volatile.Read(ref this.tail); | ||||
|
|
||||
| /// <summary> | ||||
| /// Adds the specified item to the buffer. | ||||
|
|
@@ -83,22 +86,23 @@ public bool Add(T value) | |||
|
|
||||
| while (true) | ||||
| { | ||||
| var tailSnapshot = this.tail; | ||||
| var headSnapshot = this.head; | ||||
| var tailSnapshot = Volatile.Read(ref this.tail); | ||||
| var headSnapshot = Volatile.Read(ref this.head); | ||||
|
|
||||
| if (headSnapshot - tailSnapshot >= this.Capacity) | ||||
| { | ||||
| return false; // buffer is full | ||||
| } | ||||
|
|
||||
| var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot); | ||||
| if (head != headSnapshot) | ||||
| if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) != headSnapshot) | ||||
| { | ||||
| continue; | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be useful to add
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Defer to @reyang on this one, but I don't think so. This thread isn't so much waiting on another thread to finish as it is learning that some other thread took the head. It should retry immediately and just take the next head/index available.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SpinOnce might be more smart yielding if singlecore etc?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a yield here would be even worse than a spin 😄 Because it doesn't need to wait on anything. If there is only 1 core this thread should just loop around, take the next index, and continue on doing its thing. Same as if there were many cores, really. There should probably be a SpinOnce here though:
Because that logic is actually waiting on the writer thread to finish. On single core, it should yield immediately because a spin won't accomplish anything other than delay letting the writer get the CPU back to finish its job 🤣
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
+1 |
||||
| } | ||||
|
|
||||
| var index = (int)(head % this.Capacity); | ||||
| this.trait[index] = value; | ||||
| var previous = Interlocked.Exchange(ref this.trait[headSnapshot % this.Capacity], value); | ||||
|
|
||||
| Debug.Assert(previous == null, "Race: Another thread wrote to index."); | ||||
|
|
||||
| return true; | ||||
| } | ||||
| } | ||||
|
|
@@ -125,16 +129,15 @@ public bool TryAdd(T value, int maxSpinCount) | |||
|
|
||||
| while (true) | ||||
| { | ||||
| var tailSnapshot = this.tail; | ||||
| var headSnapshot = this.head; | ||||
| var tailSnapshot = Volatile.Read(ref this.tail); | ||||
| var headSnapshot = Volatile.Read(ref this.head); | ||||
|
|
||||
| if (headSnapshot - tailSnapshot >= this.Capacity) | ||||
| { | ||||
| return false; // buffer is full | ||||
| } | ||||
|
|
||||
| var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot); | ||||
| if (head != headSnapshot) | ||||
| if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) != headSnapshot) | ||||
| { | ||||
| if (spinCountDown-- == 0) | ||||
| { | ||||
|
|
@@ -144,8 +147,10 @@ public bool TryAdd(T value, int maxSpinCount) | |||
| continue; | ||||
| } | ||||
|
|
||||
| var index = (int)(head % this.Capacity); | ||||
| this.trait[index] = value; | ||||
| var previous = Interlocked.Exchange(ref this.trait[headSnapshot % this.Capacity], value); | ||||
CodeBlanch marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||
|
|
||||
| Debug.Assert(previous == null, "Race: Another thread wrote to index."); | ||||
|
|
||||
| return true; | ||||
| } | ||||
| } | ||||
|
|
@@ -161,19 +166,18 @@ public bool TryAdd(T value, int maxSpinCount) | |||
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||||
| public T Read() | ||||
| { | ||||
| var index = (int)(this.tail % this.Capacity); | ||||
| var index = (int)(Volatile.Read(ref this.tail) % this.Capacity); | ||||
| while (true) | ||||
| { | ||||
| var value = this.trait[index]; | ||||
| if (value == null) | ||||
| var previous = Interlocked.Exchange(ref this.trait[index], null); | ||||
| if (previous == null) | ||||
| { | ||||
| // If we got here it means a writer isn't done. | ||||
| continue; | ||||
| } | ||||
|
|
||||
| this.trait[index] = null; | ||||
| this.tail++; | ||||
| return value; | ||||
| Interlocked.Increment(ref this.tail); | ||||
| return previous; | ||||
| } | ||||
| } | ||||
| } | ||||
|
|
||||
Uh oh!
There was an error while loading. Please reload this page.