feat(httpclientx): allow configuring max-response-body size #1588

Merged
merged 91 commits into from
May 3, 2024
Changes from 1 commit
Commits
75ef7fd
refactor: consolidate httpx and httpapi
bassosimone Apr 22, 2024
f9210ec
refactor to make testing the whole package easier
bassosimone Apr 23, 2024
587290c
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
af394c2
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
c6f2f5a
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
68c9779
Merge branch 'issue/2700' of github.com:ooni/probe-cli into issue/2700
bassosimone Apr 23, 2024
57e29da
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
5c953f0
x
bassosimone Apr 23, 2024
e03e810
x
bassosimone Apr 23, 2024
a6046fd
x
bassosimone Apr 23, 2024
341fcf2
x
bassosimone Apr 23, 2024
8c34524
x
bassosimone Apr 23, 2024
4b464ff
try to entirely remove httpx usages
bassosimone Apr 23, 2024
6d57184
fix: make sure there is nil safety
bassosimone Apr 23, 2024
9c2a226
oxford comma: yes/no?
bassosimone Apr 23, 2024
1123b4e
x
bassosimone Apr 23, 2024
d421d24
fix: unit test needs to be adapted
bassosimone Apr 24, 2024
67e0a10
chore: improve testing for cloudflare IP lookup
bassosimone Apr 24, 2024
a69d981
chore: improve the ubuntu IP lookup tests
bassosimone Apr 24, 2024
cd25c56
Merge branch 'master' into issue/2700
bassosimone Apr 24, 2024
642ae5c
x
bassosimone Apr 24, 2024
548e6bc
doc: document oonirun/v2_test.go tests
bassosimone Apr 24, 2024
40db0e5
Merge branch 'master' into issue/2700
bassosimone Apr 24, 2024
4cf3566
start improving probeservices tests
bassosimone Apr 24, 2024
e736e42
Merge branch 'master' into issue/2700
bassosimone Apr 26, 2024
e8471c4
x
bassosimone Apr 26, 2024
aa1c836
Merge branch 'master' into issue/2700
bassosimone Apr 26, 2024
08e81a9
x
bassosimone Apr 26, 2024
fa74b48
x
bassosimone Apr 26, 2024
a7e748f
x
bassosimone Apr 26, 2024
87146cc
x
bassosimone Apr 26, 2024
dac7b8f
x
bassosimone Apr 26, 2024
04b0071
Merge branch 'master' into issue/2700
bassosimone Apr 26, 2024
79d1fee
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
88b399d
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
de23e7d
x
bassosimone Apr 29, 2024
9d87673
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
a436f1e
x
bassosimone Apr 29, 2024
08f8ca9
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
25140f3
x
bassosimone Apr 29, 2024
1bbe0b7
chore: write tests for oonicollector.go
bassosimone Apr 30, 2024
6707d61
Merge branch 'master' into issue/2700
bassosimone Apr 30, 2024
4ddd507
Merge branch 'master' into issue/2700
bassosimone Apr 30, 2024
c453ee2
x
bassosimone Apr 30, 2024
ad3d84f
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
28d64f1
feat(probeservices): use httpclientx for check-in
bassosimone May 2, 2024
2107750
cleanup: remove check-in from ooapi
bassosimone May 2, 2024
c2c8ebf
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
36610a8
feat: start moving TH call into engine/session.go
bassosimone May 2, 2024
b7ccf2f
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
b94a8b8
x
bassosimone May 2, 2024
5f1994c
x
bassosimone May 2, 2024
6e16369
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
8400bde
x
bassosimone May 2, 2024
117fcc2
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
17f9b83
x
bassosimone May 2, 2024
8ca93f0
x
bassosimone May 2, 2024
854da9a
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
f78e32d
x
bassosimone May 2, 2024
5a32450
fix(httpclientx): fast fallback on immediate failure
bassosimone May 2, 2024
eb47e28
x
bassosimone May 2, 2024
dbb7d6e
x
bassosimone May 2, 2024
c63e68d
x
bassosimone May 2, 2024
1c55710
try to write algorithm without deadlocks
bassosimone May 2, 2024
b461aa4
x
bassosimone May 2, 2024
421f179
x
bassosimone May 2, 2024
2a44cd5
x
bassosimone May 2, 2024
13777ba
x
bassosimone May 2, 2024
a4fb49e
x
bassosimone May 2, 2024
cdd0e72
x
bassosimone May 3, 2024
0594703
x
bassosimone May 3, 2024
0b693ce
x
bassosimone May 3, 2024
bf35e09
x
bassosimone May 3, 2024
9c94348
x
bassosimone May 3, 2024
8de9e6b
x
bassosimone May 3, 2024
857ff1f
x
bassosimone May 3, 2024
08691b3
x
bassosimone May 3, 2024
b5ef086
x
bassosimone May 3, 2024
9c87724
x
bassosimone May 3, 2024
c857375
x
bassosimone May 3, 2024
9051155
x
bassosimone May 3, 2024
60749d0
Merge branch 'master' into issue/2700
bassosimone May 3, 2024
d99bb15
x
bassosimone May 3, 2024
fad03f7
x
bassosimone May 3, 2024
8fc01d8
x
bassosimone May 3, 2024
210393d
Merge branch 'master' into issue/2700
bassosimone May 3, 2024
2b56f26
x
bassosimone May 3, 2024
c8d0e66
x
bassosimone May 3, 2024
3623db4
x
bassosimone May 3, 2024
9158901
x
bassosimone May 3, 2024
88b40b0
x
bassosimone May 3, 2024
x
bassosimone committed May 3, 2024

commit 059470348901c3a4927a9e9f34c84fb1ef23b53b
52 changes: 39 additions & 13 deletions internal/httpclientx/overlapped.go
@@ -8,14 +8,16 @@ import (
"context"
"errors"
"time"

"github.com/ooni/probe-cli/v3/internal/erroror"
)

// OverlappedDefaultScheduleInterval is the default schedule interval. After this interval
// has elapsed for a URL without seeing a success, we will schedule the next URL.
const OverlappedDefaultScheduleInterval = 15 * time.Second

// OverlappedDefaultWatchdogTimeout is the timeout after which we assume all the API calls
// have gone rogue and forcibly interrupt all of them.
const OverlappedDefaultWatchdogTimeout = 5 * time.Minute

// Overlapped represents the possibility of overlapping HTTP calls for a set of
// functionally equivalent URLs, such that we start a new call if the previous one
// has failed to produce a result within the configured ScheduleInterval.
@@ -24,7 +26,7 @@ const OverlappedDefaultScheduleInterval = 15 * time.Second
//
// Under very bad networking conditions, [*Overlapped] would cause a new network
// call to start while the previous one is still in progress and very slowly downloading
// a response. A future implementation SHOULD probably account for this possibility.
// a response. A future implementation MIGHT want to account for this possibility.
type Overlapped[Output any] struct {
// RunFunc is the MANDATORY function that fetches the given [*Endpoint].
//
@@ -42,12 +44,22 @@ type Overlapped[Output any] struct {
//
// If you set it manually, you MUST modify it before calling [*Overlapped.Run].
ScheduleInterval time.Duration

// WatchdogTimeout is the MANDATORY timeout after which the code assumes
// that all API calls must be aborted and gives up.
//
// This field is typically initialized by [NewOverlappedGetJSON], [NewOverlappedGetRaw],
// [NewOverlappedGetXML], or [NewOverlappedPostJSON] to be [OverlappedDefaultWatchdogTimeout].
//
// If you set it manually, you MUST modify it before calling [*Overlapped.Run].
WatchdogTimeout time.Duration
}

func newOverlappedWithFunc[Output any](fx func(context.Context, *Endpoint) (Output, error)) *Overlapped[Output] {
return &Overlapped[Output]{
RunFunc: fx,
ScheduleInterval: OverlappedDefaultScheduleInterval,
WatchdogTimeout: OverlappedDefaultWatchdogTimeout,
}
}

@@ -82,12 +94,22 @@ func NewOverlappedPostJSON[Input, Output any](input Input, config *Config) *Over
// ErrGenericOverlappedFailure indicates that a generic [*Overlapped] failure occurred.
var ErrGenericOverlappedFailure = errors.New("overlapped: generic failure")

// TODO(bassosimone): to use this API with test helpers, we also need to return
// the successful index to the caller.

// Run runs the overlapped operations, returning the result of the first operation
// that succeeds and otherwise returning an error describing what happened.
func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, error) {
return ovx.Reduce(ovx.Map(ctx, epnts...))
}

// OverlappedErrorOr combines error information, result information and the endpoint index.
type OverlappedErrorOr[Output any] struct {
Err error
Index int
Value Output
}

// Map applies the [*Overlapped.RunFunc] function to each epnts entry, thus producing
// a result for each entry. This function will cancel subsequent operations once there
// is a success: subsequent results will be [context.Canceled] errors.
@@ -96,19 +118,20 @@ func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Out
// of each operation, which is mostly useful when running unit tests.
//
// Note that this function will return a zero-length slice if the length of epnts is also zero.
func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*erroror.Value[Output] {
// create cancellable context for early cancellation
func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*OverlappedErrorOr[Output] {
// create cancellable context for early cancellation and also apply the
// watchdog timeout so that eventually this code returns.
//
// we are going to cancel this context as soon as we have a successful response so
// that we do not waste network resources by performing other attempts.
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, ovx.WatchdogTimeout)
defer cancel()

// construct channel for collecting the results
//
// we're using this channel to communicate results back from goroutines running
// in the background and performing the real API call
output := make(chan *erroror.Value[Output])
output := make(chan *OverlappedErrorOr[Output])

// create ticker for scheduling subsequent attempts
//
@@ -125,7 +148,7 @@ func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*e
// for simplicity, we're going to collect results from every goroutine
// including the ones cancelled by context after the previous success and
// then we're going to filter the results and produce a final result
results := []*erroror.Value[Output]{}
results := []*OverlappedErrorOr[Output]{}

// keep looping until we have results for each endpoint
for len(results) < len(epnts) {
@@ -165,7 +188,7 @@ func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*e
}

// Reduce takes the results of [*Overlapped.Map] and returns either an Output or an error.
func (ovx *Overlapped[Output]) Reduce(results []*erroror.Value[Output]) (Output, error) {
func (ovx *Overlapped[Output]) Reduce(results []*OverlappedErrorOr[Output]) (Output, error) {
// postprocess the results to check for success and
// aggregate all the errors that occurred
errorv := []error{}
@@ -191,15 +214,18 @@ func (ovx *Overlapped[Output]) Reduce(results []*erroror.Value[Output]) (Output,
}

// transact performs an HTTP transaction with the given URL and writes results to the output channel.
func (ovx *Overlapped[Output]) transact(ctx context.Context, _ int, epnt *Endpoint, output chan<- *erroror.Value[Output]) {
// TODO(bassosimone): the index is currently unused but we need to use it
// soon to return which endpoint actually succeeded
func (ovx *Overlapped[Output]) transact(
ctx context.Context, idx int, epnt *Endpoint, output chan<- *OverlappedErrorOr[Output]) {
// obtain the results
value, err := ovx.RunFunc(ctx, epnt)

// emit the results
//
// note that this unconditional channel write REQUIRES that we keep reading from
// the results channel in Map until we have a result per input endpoint
output <- &erroror.Value[Output]{Err: err, Value: value}
output <- &OverlappedErrorOr[Output]{
Err: err,
Index: idx,
Value: value,
}
}
250 changes: 216 additions & 34 deletions internal/httpclientx/overlapped_test.go
@@ -11,6 +11,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/must"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/internal/testingx"
)

@@ -73,15 +75,46 @@ func TestNewOverlappedPostJSONFastRecoverFromEarlyErrors(t *testing.T) {
})

// Now we issue the requests and check we're getting the correct response.
//
// We're splitting the algorithm into its Map step and its Reduce step because
// this allows us to clearly observe what happened.

apiResp, err := overlapped.Run(
results := overlapped.Map(
context.Background(),
NewEndpoint(zeroTh.URL),
NewEndpoint(oneTh.URL),
NewEndpoint(twoTh.URL),
NewEndpoint(threeTh.URL),
)

runtimex.Assert(len(results) == 4, "unexpected number of results")

// the first three attempts should have failed with connection reset
// while the fourth result should be successful
for _, entry := range results {
t.Log(entry.Index, string(must.MarshalJSON(entry)))
switch entry.Index {
case 0, 1, 2:
if err := entry.Err; !errors.Is(err, netxlite.ECONNRESET) {
t.Fatal("unexpected error", err)
}
case 3:
if err := entry.Err; err != nil {
t.Fatal("unexpected error", err)
}
if diff := cmp.Diff(expectedResponse, entry.Value); diff != "" {
t.Fatal(diff)
}
default:
t.Fatal("unexpected index", entry.Index)
}
}

// Now run the reduce step of the algorithm and make sure we correctly
// return the first success and the nil error

apiResp, err := overlapped.Reduce(results)

// we do not expect to see a failure because threeTh is WAI
if err != nil {
t.Fatal(err)
@@ -104,39 +137,30 @@ func TestNewOverlappedPostJSONFirstCallSucceeds(t *testing.T) {
// - 3.th.ooni.org is WAI but slow
//
// We expect to get a response from the first TH because it's the first goroutine
// that we schedule and, even if the wakeup signals for THs are random, the schedule
// interval is 15 seconds while we emit a wakeup signal every 0.25 seconds.
//
// What we're testing here, therefore, is that subsequent calls w
// that we schedule. Subsequent calls should be canceled.
//

expectedResponse := &apiResponse{
Age: 41,
Name: "sbs",
}

slowwakeup := make(chan any)

zeroTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-slowwakeup
w.Write(must.MarshalJSON(expectedResponse))
}))
defer zeroTh.Close()

oneTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-slowwakeup
w.Write(must.MarshalJSON(expectedResponse))
}))
defer oneTh.Close()

twoTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-slowwakeup
w.Write(must.MarshalJSON(expectedResponse))
}))
defer twoTh.Close()

threeTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-slowwakeup
w.Write(must.MarshalJSON(expectedResponse))
}))
defer threeTh.Close()
@@ -160,28 +184,47 @@ func TestNewOverlappedPostJSONFirstCallSucceeds(t *testing.T) {
// to fetch from their respective URLs.
overlapped.ScheduleInterval = 15 * time.Second

// In the background we're going to emit slow wakeup signals at fixed intervals
// after an initial waiting interval, such that goroutines unblock in order

go func() {
time.Sleep(250 * time.Millisecond)
for idx := 0; idx < 4; idx++ {
slowwakeup <- true
time.Sleep(250 * time.Millisecond)
}
close(slowwakeup)
}()

// Now we issue the requests and check we're getting the correct response.
//
// We're splitting the algorithm into its Map step and its Reduce step because
// this allows us to clearly observe what happened.

apiResp, err := overlapped.Run(
results := overlapped.Map(
context.Background(),
NewEndpoint(zeroTh.URL),
NewEndpoint(oneTh.URL),
NewEndpoint(twoTh.URL),
NewEndpoint(threeTh.URL),
)

runtimex.Assert(len(results) == 4, "unexpected number of results")

// the first attempt should succeed and the subsequent ones should
// have failed with the context.Canceled error
for _, entry := range results {
t.Log(entry.Index, string(must.MarshalJSON(entry)))
switch entry.Index {
case 1, 2, 3:
if err := entry.Err; !errors.Is(err, context.Canceled) {
t.Fatal("unexpected error", err)
}
case 0:
if err := entry.Err; err != nil {
t.Fatal("unexpected error", err)
}
if diff := cmp.Diff(expectedResponse, entry.Value); diff != "" {
t.Fatal(diff)
}
default:
t.Fatal("unexpected index", entry.Index)
}
}

// Now run the reduce step of the algorithm and make sure we correctly
// return the first success and the nil error

apiResp, err := overlapped.Reduce(results)

// we do not expect to see a failure because all the THs are WAI
if err != nil {
t.Fatal(err)
@@ -252,30 +295,169 @@ func TestNewOverlappedPostJSONHandlesAllTimeouts(t *testing.T) {

// Now we issue the requests and check we're getting the correct response.
//
// IMPORTANT: here we need a context with timeout to ensure that we
// eventually stop trying with the blocked-forever servers. In a more
// real scenario, even without a context timeout, we have other
// safeguards to unblock stuck readers in netxlite code.
// We're splitting the algorithm into its Map step and its Reduce step because
// this allows us to clearly observe what happened.

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
// modify the watchdog timeout to be much smaller than usual
overlapped.WatchdogTimeout = 2 * time.Second

apiResp, err := overlapped.Run(
ctx,
results := overlapped.Map(
context.Background(),
NewEndpoint(zeroTh.URL),
NewEndpoint(oneTh.URL),
NewEndpoint(twoTh.URL),
NewEndpoint(threeTh.URL),
)

// we do not expect to see a failure because all the THs are WAI
runtimex.Assert(len(results) == 4, "unexpected number of results")

// all the attempts should have failed with context deadline exceeded
for _, entry := range results {
t.Log(entry.Index, string(must.MarshalJSON(entry)))
switch entry.Index {
case 0, 1, 2, 3:
if err := entry.Err; !errors.Is(err, context.DeadlineExceeded) {
t.Fatal("unexpected error", err)
}
default:
t.Fatal("unexpected index", entry.Index)
}
}

// Now run the reduce step of the algorithm and make sure we correctly
// return an error and a nil response

apiResp, err := overlapped.Reduce(results)

// we expect to see a failure because the watchdog timeout should have fired
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatal("unexpected error", err)
}

// we expect the api response to be nil
if apiResp != nil {
t.Fatal("expected non-nil resp")
t.Fatal("expected nil resp")
}

// now unblock the blocked goroutines
close(blockforever)
}

func TestNewOverlappedPostJSONResetTimeoutSuccessCanceled(t *testing.T) {

//
// Scenario:
//
// - 0.th.ooni.org resets the connection
// - 1.th.ooni.org causes timeout
// - 2.th.ooni.org is WAI
// - 3.th.ooni.org causes timeout
//
// We expect to see a success and to never attempt with 3.th.ooni.org.
//

blockforever := make(chan any)

zeroTh := testingx.MustNewHTTPServer(testingx.HTTPHandlerReset())
defer zeroTh.Close()

oneTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-blockforever
w.WriteHeader(http.StatusBadGateway)
}))
defer oneTh.Close()

expectedResponse := &apiResponse{
Age: 41,
Name: "sbs",
}

twoTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write(must.MarshalJSON(expectedResponse))
}))
defer twoTh.Close()

threeTh := testingx.MustNewHTTPServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-blockforever
w.WriteHeader(http.StatusBadGateway)
}))
defer threeTh.Close()

// Create client configuration. We don't care much about the
// JSON requests and responses being aligned to reality.

apiReq := &apiRequest{
UserID: 117,
}

overlapped := NewOverlappedPostJSON[*apiRequest, *apiResponse](apiReq, &Config{
Authorization: "", // not relevant for this test
Client: http.DefaultClient,
Logger: log.Log,
UserAgent: model.HTTPHeaderUserAgent,
})

// make sure the schedule interval is low to make this test run faster.
overlapped.ScheduleInterval = 250 * time.Millisecond

// Now we issue the requests and check we're getting the correct response.
//
// We're splitting the algorithm into its Map step and its Reduce step because
// this allows us to clearly observe what happened.

// modify the watchdog timeout to be much smaller than usual
overlapped.WatchdogTimeout = 2 * time.Second

results := overlapped.Map(
context.Background(),
NewEndpoint(zeroTh.URL),
NewEndpoint(oneTh.URL),
NewEndpoint(twoTh.URL),
NewEndpoint(threeTh.URL),
)

runtimex.Assert(len(results) == 4, "unexpected number of results")

// attempt 0: should have seen connection reset
// attempt 1: should have seen the context canceled
// attempt 2: should be successful
// attempt 3: should have seen the context canceled
for _, entry := range results {
t.Log(entry.Index, string(must.MarshalJSON(entry)))
switch entry.Index {
case 0:
if err := entry.Err; !errors.Is(err, netxlite.ECONNRESET) {
t.Fatal("unexpected error", err)
}
case 1, 3:
if err := entry.Err; !errors.Is(err, context.Canceled) {
t.Fatal("unexpected error", err)
}
case 2:
if err := entry.Err; err != nil {
t.Fatal("unexpected error", err)
}
if diff := cmp.Diff(expectedResponse, entry.Value); diff != "" {
t.Fatal(diff)
}
default:
t.Fatal("unexpected index", entry.Index)
}
}

// Now run the reduce step of the algorithm and make sure we correctly
// return the first success and the nil error

apiResp, err := overlapped.Reduce(results)

// we do not expect to see a failure because twoTh is WAI
if err != nil {
t.Fatal(err)
}

// compare response to expectation
if diff := cmp.Diff(expectedResponse, apiResp); diff != "" {
t.Fatal(diff)
}

// now unblock the blocked goroutines