(otelarrowreceiver): asynchronous stream operations #181
    if err != nil {
        response.bytesToRelease = int64(0)
    } else {
        response.bytesToRelease = uncompSize
This looks better than before, thanks for working on it.
Thanks for the helpful review!
    err = r.boundedQueue.Acquire(ctx, int64(prevAcquiredBytes))
    if err != nil {
        return fmt.Errorf("breaking stream: %v", err)
    }
Breaking the stream when there are too many waiters -- LGTM.
@lquerel please take a look, thanks!
lquerel left a comment
Overall, it seems good to me. However, I suggest improving the logs when the size specified in the header does not match the actual size, for the reasons mentioned in the comments.
    // bounded queue to memory limit based on incoming uncompressed request size and waiters.
    // Acquire will fail immediately if there are too many waiters,
    // or will otherwise block until timeout or enough memory becomes available.
    err = r.boundedQueue.Acquire(ctx, int64(prevAcquiredBytes))
What prevents a buggy (or misbehaving) client/exporter from setting the 'otlp-pdata-size' arbitrarily high? Without an upper limit or some other form of protection, could this create a type of DoS attack at minimal cost? Could we detect when the 'otlp-pdata-size' does not match the actual size of the message after decompression and then ban the sender for a period of time?
EDIT: Okay, there is detection in place with logging. Could we create some form of structured logs/events containing enough information to enable external (or internal) banning of the same clients that send us invalid data? The ban mechanism could be the topic of a future PR or be an external process triggered by these events.
Thanks for the review! Yes, we log the case where the otlp-pdata-size header does not match the actual uncompressed size, but I think you are correct that we could face a DoS attack if we accept requests whose headers acquire a large portion of the semaphore. I can file an issue for this, and I would love to hear more about how to get the client information that would allow us to block these clients.
    if sizeHeaderFound {
        // a mismatch between header set by exporter and the uncompSize just calculated.
        r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header", zap.Int("uncompsize", int(uncompSize)), zap.Int("otlp-pdata-size", int(response.bytesToRelease)))
    } else if diff < 0 {
        // proto.Size() on compressed request was greater than pdata uncompressed size.
        r.telemetry.Logger.Debug("uncompressed size is less than compressed size", zap.Int("uncompressed", int(uncompSize)), zap.Int("compressed", int(response.bytesToRelease)))
    }
In my opinion, we must ensure that these structured logs contain enough information to enable us to ban buggy or misbehaving clients/exporters.
If any significant work is needed for this, I'd like to file an issue and come back to it. OTLP doesn't have an in-band way to identify the sender -- I believe we have minimal information in the gRPC peer struct, plus any headers we add ourselves. If we could standardize on a useful header for the exporter to emit, then we could log it here.
I created issue #185 to work on this in a follow-up.
However, after thinking about this more, I'm wondering how reliable the address is in the client.Info that we get from the incoming stream context. Does this address show up consistently? https://github.com/open-telemetry/opentelemetry-collector/blob/main/client/client.go#L93
Could this be enough information to identify harmful clients and block them from sending requests to the receiver?
I don't know if client.Info is reliable. But we can probably find a way to get the client IP address at the gRPC connection level.
This PR adds information about the new config options added in #181 to the otelarrowreceiver readme.

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
…or to v0.23.0 (#33055)

This PR contains the following updates:

| Package | Change |
|---|---|
| [github.com/open-telemetry/otel-arrow/collector](https://github.com/open-telemetry/otel-arrow) | `v0.22.0` -> `v0.23.0` |

### Release Notes

open-telemetry/otel-arrow (github.com/open-telemetry/otel-arrow/collector)

### [`v0.23.0`](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0)

[Compare Source](https://github.com/open-telemetry/otel-arrow/compare/v0.22.0...v0.23.0)

##### What's Changed

- Update go.mod files post v0.22, restore CI by [@jmacd](https://github.com/jmacd) in https://github.com/open-telemetry/otel-arrow/pull/180
- (boundedqueue): new semaphore implementation by [@moh-osman3](https://github.com/moh-osman3) in https://github.com/open-telemetry/otel-arrow/pull/174
- (concurrentbatchprocessor): propagate metadataKeys correctly when using `multiShardBatcher` by [@moh-osman3](https://github.com/moh-osman3) in https://github.com/open-telemetry/otel-arrow/pull/184
- (otelarrowreceiver): asynchronous stream operations by [@moh-osman3](https://github.com/moh-osman3) in https://github.com/open-telemetry/otel-arrow/pull/181
- Remove the FIFO prioritizer; use least-loaded over all streams by default by [@jmacd](https://github.com/jmacd) in https://github.com/open-telemetry/otel-arrow/pull/186
- Lint fixes from open collector-contrib PRs. by [@jmacd](https://github.com/jmacd) in https://github.com/open-telemetry/otel-arrow/pull/188
- Release otel-arrow v0.23.0 by [@moh-osman3](https://github.com/moh-osman3) in https://github.com/open-telemetry/otel-arrow/pull/187

**Full Changelog**: open-telemetry/otel-arrow@v0.22.0...v0.23.0

This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/).

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: opentelemetrybot <107717825+opentelemetrybot@users.noreply.github.com>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
…3.0 (#33050)

This PR contains the following updates:

| Package | Change |
|---|---|
| [github.com/open-telemetry/otel-arrow](https://github.com/open-telemetry/otel-arrow) | `v0.22.0` -> `v0.23.0` |

### Release Notes

open-telemetry/otel-arrow (github.com/open-telemetry/otel-arrow)

### [`v0.23.0`](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.23.0)

[Compare Source](https://github.com/open-telemetry/otel-arrow/compare/v0.22.0...v0.23.0)

**Full Changelog**: open-telemetry/otel-arrow@v0.22.0...v0.23.0

This PR has been generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/).

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: opentelemetrybot <107717825+opentelemetrybot@users.noreply.github.com>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
…ments & restructuring (#205)

Restructure receiver code to improve readability. There are a number of metrics that are incremented when a batch starts being processed and are decremented when the batch is finished, but the control flow that maintained the balance of these updates was convoluted.

The root cause of #204 is that Arrow batches meant for a consumer to be processed in order were processed out of order. There was a large function body which served two purposes: consume Arrow data of the appropriate kind, and enter data for the pipeline to consume next. This had to be split into two parts and should have been done as part of #181. (I, as reviewer, missed this and find, in hindsight, that the code is not easy to follow.)

This improves the code structure by moving all stateful aspects of starting/finishing a request into a new `inFlightData` object which has a deferrable method to finish the request. Here, we keep:

1. The `inFlightWG` done count
2. The active requests metric
3. The active items metric
4. The active bytes metric
5. The bytes acquired from the semaphore
6. A per-request span covering Arrow decode
7. Netstat-related instrumentation

Authorization now happens before acquiring from the semaphore. A number of `fmt.Errorf()` calls are replaced with `status.Errorf(...)` and a specific error code.

The tests are updated to be more specific. Several Arrow tests were accidentally canceling the test before an expected error condition was actually tested; they have been audited and improved. One new concurrent-receiver test was added.

Fixes #204.
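The "deferrable method to finish the request" pattern described in this commit message can be sketched as follows. Only the names `inFlightData` and `inFlightWG` come from the text; the `receiverState` type, the other fields, `start`, `finish`, and `processOne` are assumptions made for illustration, and the span and netstat items from the list are omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// receiverState holds the shared counters; a hypothetical stand-in for the
// receiver's metrics and semaphore.
type receiverState struct {
	inFlightWG     sync.WaitGroup
	activeRequests atomic.Int64
	activeBytes    atomic.Int64
	release        func(n int64) // returns bytes to the semaphore
}

// inFlightData records everything that was incremented or acquired when a
// request started, so one method can undo all of it.
type inFlightData struct {
	state    *receiverState
	numBytes int64
	acquired int64
	once     sync.Once
}

// start performs all of the "begin request" bookkeeping in one place.
func (s *receiverState) start(numBytes, acquired int64) *inFlightData {
	s.inFlightWG.Add(1)
	s.activeRequests.Add(1)
	s.activeBytes.Add(numBytes)
	return &inFlightData{state: s, numBytes: numBytes, acquired: acquired}
}

// finish reverses start exactly once, however many times it is called.
func (d *inFlightData) finish() {
	d.once.Do(func() {
		d.state.activeBytes.Add(-d.numBytes)
		d.state.activeRequests.Add(-1)
		if d.acquired > 0 {
			d.state.release(d.acquired)
		}
		d.state.inFlightWG.Done()
	})
}

// processOne shows the call site: a single defer covers every return path.
func processOne(s *receiverState, payload []byte) error {
	d := s.start(int64(len(payload)), int64(len(payload)))
	defer d.finish()
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	var released int64
	s := &receiverState{release: func(n int64) { released += n }}
	_ = processOne(s, []byte("batch"))
	err := processOne(s, nil)
	s.inFlightWG.Wait()
	fmt.Println(err, s.activeRequests.Load(), s.activeBytes.Load(), released)
}
```

Because the bookkeeping lives in one object, an early return on an error path cannot leave a metric incremented or semaphore bytes held, which is the imbalance the restructuring is meant to rule out.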
This PR