diff --git a/internal/enginenetx/mix.go b/internal/enginenetx/mix.go index 5dc7281fce..1cac673ca3 100644 --- a/internal/enginenetx/mix.go +++ b/internal/enginenetx/mix.go @@ -1,5 +1,7 @@ package enginenetx +import "sync" + // mixSequentially mixes entries from primary followed by entries from fallback. // // This function returns a channel where we emit the edited @@ -17,3 +19,69 @@ func mixSequentially(primary, fallback <-chan *httpsDialerTactic) <-chan *httpsD }() return output } + +// mixDeterministicThenRandomConfig contains config for [mixDeterministicThenRandom]. +type mixDeterministicThenRandomConfig struct { + // C is the channel to mix from. + C <-chan *httpsDialerTactic + + // N is the number of entries to read from at the + // beginning before starting random mixing. + N int +} + +// mixDeterministicThenRandom reads the first N entries from primary, if any, then the first N +// entries from fallback, if any, and then randomly mixes the entries. +func mixDeterministicThenRandom(primary, fallback *mixDeterministicThenRandomConfig) <-chan *httpsDialerTactic { + output := make(chan *httpsDialerTactic) + go func() { + defer close(output) + mixTryEmitN(primary.C, primary.N, output) + mixTryEmitN(fallback.C, fallback.N, output) + for tx := range mixRandomly(primary.C, fallback.C) { + output <- tx + } + }() + return output +} + +func mixTryEmitN(input <-chan *httpsDialerTactic, numToRead int, output chan<- *httpsDialerTactic) { + for idx := 0; idx < numToRead; idx++ { + tactic, good := <-input + if !good { + return + } + output <- tactic + } +} + +func mixRandomly(left, right <-chan *httpsDialerTactic) <-chan *httpsDialerTactic { + output := make(chan *httpsDialerTactic) + go func() { + // read from left + waitg := &sync.WaitGroup{} + waitg.Add(1) + go func() { + defer waitg.Done() + for tx := range left { + output <- tx + } + }() + + // read from right + waitg.Add(1) + go func() { + defer waitg.Done() + for tx := range right { + output <- tx + } + }() + + // close when done + go func() { + waitg.Wait() + close(output) + }() + }() + return output +} diff --git a/internal/enginenetx/mix_test.go b/internal/enginenetx/mix_test.go index 0b30c837b9..990373af8c 100644 --- a/internal/enginenetx/mix_test.go +++ b/internal/enginenetx/mix_test.go @@ -27,3 +27,201 @@ func TestMixSequentially(t *testing.T) { t.Fatal(diff) } } + +func TestMixDeterministicThenRandom(t *testing.T) { + // define primary data source + primary := []*httpsDialerTactic{{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a3.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a4.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a5.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a6.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a7.com", + VerifyHostname: "api.ooni.io", + }} + + // define fallback data source + fallback := []*httpsDialerTactic{{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b3.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b4.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b5.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b6.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b7.com", + VerifyHostname: "api.ooni.io", + }} + + // define the expectations for the beginning of the result + expectBeginning := []*httpsDialerTactic{{ + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "a2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b1.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b2.com", + VerifyHostname: "api.ooni.io", + }, { + Address: "130.192.91.211", + InitialDelay: 0, + Port: "443", + SNI: "b3.com", + VerifyHostname: "api.ooni.io", + }} + + // remix + outch := mixDeterministicThenRandom( + &mixDeterministicThenRandomConfig{ + C: streamTacticsFromSlice(primary), + N: 2, + }, + &mixDeterministicThenRandomConfig{ + C: streamTacticsFromSlice(fallback), + N: 3, + }, + ) + var output []*httpsDialerTactic + for tx := range outch { + output = append(output, tx) + } + + // make sure we have the expected number of entries + if len(output) != 14 { + t.Fatal("we need 14 entries") + } + if diff := cmp.Diff(expectBeginning, output[:5]); diff != "" { + t.Fatal(diff) + } + + // make sure each entry is represented + const ( + inprimary = 1 << 0 + infallback + inoutput + ) + mapping := make(map[string]int) + for _, entry := range primary { + mapping[entry.tacticSummaryKey()] |= inprimary + } + for _, entry := range fallback { + mapping[entry.tacticSummaryKey()] |= infallback + } + for _, entry := range output { + mapping[entry.tacticSummaryKey()] |= inoutput + } + for entry, flags := range mapping { + if flags != (inprimary|inoutput) && flags != (infallback|inoutput) { + t.Fatal("unexpected flags", flags, "for entry", entry) + } + } +} + +func TestMixTryEmitNWithClosedChannel(t *testing.T) { + // create an already closed channel + inputch := make(chan *httpsDialerTactic) + close(inputch) + + // create channel for collecting the results + outputch := make(chan *httpsDialerTactic) + + go func() { + // Implementation note: mixTryEmitN does not close the channel + // when done, therefore we need to close it ourselves. + mixTryEmitN(inputch, 10, outputch) + close(outputch) + }() + + // read the output channel + var output []*httpsDialerTactic + for tx := range outputch { + output = append(output, tx) + } + + // make sure we didn't read anything + if len(output) != 0 { + t.Fatal("expected zero entries") + } +}