From 56c96c41342f8a9cd6a6db7ece82f42973834834 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 9 May 2024 10:32:56 -0700 Subject: [PATCH 1/2] Remove the FIFO prioritizer; use least-loaded over all streams by default --- CHANGELOG.md | 3 + .../internal/arrow/README.md | 6 -- .../internal/arrow/exporter_test.go | 26 +------ .../otelarrowexporter/internal/arrow/fifo.go | 72 ------------------- .../internal/arrow/prioritizer.go | 20 +++--- 5 files changed, 14 insertions(+), 113 deletions(-) delete mode 100644 collector/exporter/otelarrowexporter/internal/arrow/fifo.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 49cb06000c..717cd4a2ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased +- Remove the OTel-Arrow exporter FIFO prioritizer. Let "leastloaded" imply least-loaded + over all streams and use this behavior by default. + - Refactor otelarrowreceiver to do stream.Recv, request processing, and stream.Send in separate goroutines. [#181](https://github.com/open-telemetry/otel-arrow/pull/181) - Add a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/README.md b/collector/exporter/otelarrowexporter/internal/arrow/README.md index c837cd18f9..33aa03f630 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/README.md +++ b/collector/exporter/otelarrowexporter/internal/arrow/README.md @@ -108,12 +108,6 @@ restart following unavaiable, instead the manager waits for downgrade. ### Prioritizers -#### FIFO - -This prioritizer gives work to the first stream that is ready for it. -The implementation shares one `chan writeItem` between multiple -`streamWorkState` objects. - #### LeastLoadedN This prioritizer randomly selects N active streams and chooses the one diff --git a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 2740cdb16e..186bf7edb0 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -34,7 +34,7 @@ import ( "google.golang.org/grpc/metadata" ) -var AllPrioritizers = []PrioritizerName{FifoPrioritizer, LeastLoadedTwoPrioritizer} +var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer} const defaultMaxStreamLifetime = 11 * time.Second @@ -786,30 +786,6 @@ func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) { } } -func BenchmarkFifo4(b *testing.B) { - benchmarkPrioritizer(b, 4, FifoPrioritizer) -} - -func BenchmarkFifo8(b *testing.B) { - benchmarkPrioritizer(b, 8, FifoPrioritizer) -} - -func BenchmarkFifo16(b *testing.B) { - benchmarkPrioritizer(b, 16, FifoPrioritizer) -} - -func BenchmarkFifo32(b *testing.B) { - benchmarkPrioritizer(b, 32, FifoPrioritizer) -} - -func BenchmarkFifo64(b *testing.B) { - benchmarkPrioritizer(b, 64, FifoPrioritizer) -} - -func BenchmarkFifo128(b *testing.B) { - benchmarkPrioritizer(b, 128, FifoPrioritizer) -} - func BenchmarkLeastLoadedTwo4(b *testing.B) { benchmarkPrioritizer(b, 4, LeastLoadedTwoPrioritizer) } diff --git a/collector/exporter/otelarrowexporter/internal/arrow/fifo.go b/collector/exporter/otelarrowexporter/internal/arrow/fifo.go deleted file mode 100644 index a1dd5aa8b6..0000000000 --- a/collector/exporter/otelarrowexporter/internal/arrow/fifo.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package arrow // import "github.com/open-telemetry/otel-arrow/collector/exporter/otelarrowexporter/internal/arrow" - -import ( - "context" -) - -// fifoPrioritizer is a prioritizer that selects the next stream to write. -// It is the simplest prioritizer implementation. -type fifoPrioritizer struct { - doneCancel - - // shared is shared by all streams. will be closed to - // downgrade to standard OTLP, otherwise it returns the - // first-available. - shared chan writeItem -} - -var _ streamPrioritizer = &fifoPrioritizer{} - -// newFifoPrioritizer constructs a channel-based first-available prioritizer. -func newFifoPrioritizer(dc doneCancel, numStreams int) (*fifoPrioritizer, []*streamWorkState) { - var state []*streamWorkState - - shared := make(chan writeItem, numStreams) - - for i := 0; i < numStreams; i++ { - state = append(state, &streamWorkState{ - waiters: map[int64]chan<- error{}, - toWrite: shared, - }) - } - - return &fifoPrioritizer{ - doneCancel: dc, - shared: shared, - }, state -} - -// downgrade indicates that streams are never going to be ready. Note -// the caller is required to ensure that setReady() and unsetReady() -// cannot be called concurrently; this is done by waiting for -// Stream.writeStream() calls to return before downgrading. -func (fp *fifoPrioritizer) downgrade(ctx context.Context) { - go drain(fp.shared, ctx.Done()) -} - -// nextWriter returns the first-available stream. -func (fp *fifoPrioritizer) nextWriter(ctx context.Context) streamWriter { - select { - case <-fp.done: - // In case of downgrade, return nil to return into a - // non-Arrow code path. - return nil - default: - // Fall through to sendAndWait(). - return fp - } -} - -func (fp *fifoPrioritizer) sendAndWait(ctx context.Context, errCh <-chan error, wri writeItem) error { - select { - case <-ctx.Done(): - return ctx.Err() - case fp.shared <- wri: - return waitForWrite(ctx, errCh, fp.done) - case <-fp.done: - return ErrStreamRestarting - } -} diff --git a/collector/exporter/otelarrowexporter/internal/arrow/prioritizer.go b/collector/exporter/otelarrowexporter/internal/arrow/prioritizer.go index 4b872e640a..7c4b8c4f9b 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/prioritizer.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/prioritizer.go @@ -21,14 +21,13 @@ type PrioritizerName string var _ component.ConfigValidator = PrioritizerName("") const ( - DefaultPrioritizer PrioritizerName = FifoPrioritizer - FifoPrioritizer PrioritizerName = "fifo" + DefaultPrioritizer PrioritizerName = LeastLoadedPrioritizer + LeastLoadedPrioritizer PrioritizerName = llPrefix LeastLoadedTwoPrioritizer PrioritizerName = llPrefix + "2" LeastLoadedFourPrioritizer PrioritizerName = llPrefix + "4" unsetPrioritizer PrioritizerName = "" - llPrefix = "leastloaded" - defaultLeastLoadedN = 4 + llPrefix = "leastloaded" ) // streamPrioritizer is an interface for prioritizing multiple @@ -56,14 +55,13 @@ func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int) ( name = DefaultPrioritizer } if strings.HasPrefix(string(name), llPrefix) { - // error was checked and reported in Validate; in this function, + // error was checked and reported in Validate n, err := strconv.Atoi(string(name[len(llPrefix):])) - if err != nil { - n = defaultLeastLoadedN + if err == nil { + return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests) } - return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests) } - return newFifoPrioritizer(dc, numStreams) + return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests) } // pendingRequests is the load function used by leastloadedN. @@ -76,9 +74,11 @@ func pendingRequests(sws *streamWorkState) float64 { // Validate implements component.ConfigValidator func (p PrioritizerName) Validate() error { switch p { - case FifoPrioritizer, unsetPrioritizer: + // Exact match cases + case LeastLoadedPrioritizer, unsetPrioritizer: return nil } + // "leastloadedN" cases if !strings.HasPrefix(string(p), llPrefix) { return fmt.Errorf("unrecognized prioritizer: %q", string(p)) } From 001d1bd53aec804fe4b96d7eeed6848e005ecc94 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 9 May 2024 10:41:12 -0700 Subject: [PATCH 2/2] PR num --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 717cd4a2ae..c9adeafc6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased - Remove the OTel-Arrow exporter FIFO prioritizer. Let "leastloaded" imply least-loaded - over all streams and use this behavior by default. + over all streams and use this behavior by default. [#186](https://github.com/open-telemetry/otel-arrow/pull/186) - Refactor otelarrowreceiver to do stream.Recv, request processing, and stream.Send in separate goroutines. [#181](https://github.com/open-telemetry/otel-arrow/pull/181)