Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Remove the OTel-Arrow exporter FIFO prioritizer. Let "leastloaded" imply least-loaded
over all streams and use this behavior by default. [#186](https://github.com/open-telemetry/otel-arrow/pull/186)

- Refactor otelarrowreceiver to do stream.Recv, request processing, and stream.Send in separate goroutines. [#181](https://github.com/open-telemetry/otel-arrow/pull/181)

- Add a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,6 @@ restart following unavaiable, instead the manager waits for downgrade.

### Prioritizers

#### FIFO

This prioritizer gives work to the first stream that is ready for it.
The implementation shares one `chan writeItem` between multiple
`streamWorkState` objects.

#### LeastLoadedN

This prioritizer randomly selects N active streams and chooses the one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"google.golang.org/grpc/metadata"
)

var AllPrioritizers = []PrioritizerName{FifoPrioritizer, LeastLoadedTwoPrioritizer}
var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer}

const defaultMaxStreamLifetime = 11 * time.Second

Expand Down Expand Up @@ -786,30 +786,6 @@ func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) {
}
}

func BenchmarkFifo4(b *testing.B) {
benchmarkPrioritizer(b, 4, FifoPrioritizer)
}

func BenchmarkFifo8(b *testing.B) {
benchmarkPrioritizer(b, 8, FifoPrioritizer)
}

func BenchmarkFifo16(b *testing.B) {
benchmarkPrioritizer(b, 16, FifoPrioritizer)
}

func BenchmarkFifo32(b *testing.B) {
benchmarkPrioritizer(b, 32, FifoPrioritizer)
}

func BenchmarkFifo64(b *testing.B) {
benchmarkPrioritizer(b, 64, FifoPrioritizer)
}

func BenchmarkFifo128(b *testing.B) {
benchmarkPrioritizer(b, 128, FifoPrioritizer)
}

func BenchmarkLeastLoadedTwo4(b *testing.B) {
benchmarkPrioritizer(b, 4, LeastLoadedTwoPrioritizer)
}
Expand Down
72 changes: 0 additions & 72 deletions collector/exporter/otelarrowexporter/internal/arrow/fifo.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ type PrioritizerName string
var _ component.ConfigValidator = PrioritizerName("")

const (
DefaultPrioritizer PrioritizerName = FifoPrioritizer
FifoPrioritizer PrioritizerName = "fifo"
DefaultPrioritizer PrioritizerName = LeastLoadedPrioritizer
LeastLoadedPrioritizer PrioritizerName = llPrefix
LeastLoadedTwoPrioritizer PrioritizerName = llPrefix + "2"
LeastLoadedFourPrioritizer PrioritizerName = llPrefix + "4"
unsetPrioritizer PrioritizerName = ""

llPrefix = "leastloaded"
defaultLeastLoadedN = 4
llPrefix = "leastloaded"
)

// streamPrioritizer is an interface for prioritizing multiple
Expand Down Expand Up @@ -56,14 +55,13 @@ func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int) (
name = DefaultPrioritizer
}
if strings.HasPrefix(string(name), llPrefix) {
// error was checked and reported in Validate; in this function,
// error was checked and reported in Validate
n, err := strconv.Atoi(string(name[len(llPrefix):]))
if err != nil {
n = defaultLeastLoadedN
if err == nil {
return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests)
}
return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests)
}
return newFifoPrioritizer(dc, numStreams)
return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests)
}

// pendingRequests is the load function used by leastloadedN.
Expand All @@ -76,9 +74,11 @@ func pendingRequests(sws *streamWorkState) float64 {
// Validate implements component.ConfigValidator
func (p PrioritizerName) Validate() error {
switch p {
case FifoPrioritizer, unsetPrioritizer:
// Exact match cases
case LeastLoadedPrioritizer, unsetPrioritizer:
return nil
}
// "leastloadedN" cases
if !strings.HasPrefix(string(p), llPrefix) {
return fmt.Errorf("unrecognized prioritizer: %q", string(p))
}
Expand Down