diff --git a/pkg/broker/handler/processors/deliver/processor.go b/pkg/broker/handler/processors/deliver/processor.go index 4366cce032..a9a2163a9a 100644 --- a/pkg/broker/handler/processors/deliver/processor.go +++ b/pkg/broker/handler/processors/deliver/processor.go @@ -18,6 +18,7 @@ package deliver import ( "context" + "errors" "fmt" "net/http" "time" @@ -154,7 +155,19 @@ func (p *Processor) deliver(ctx context.Context, target *config.Target, broker * respMsg := cehttp.NewMessageFromHttpResponse(resp) if respMsg.ReadEncoding() == binding.EncodingUnknown { - // No reply + // If the response code is 2xx and has a body but the encoding is unknown, + // consider it's a malformed reply. + // This is a similar workaround as in https://github.com/knative/eventing/pull/3450. + // This fix is not efficient as it attempts to read the body. In HTTP case we can + // probably just use the content-length header to tell. But it will be a upstream + // fix instead. + body := make([]byte, 1) + n, _ := respMsg.BodyReader.Read(body) + respMsg.BodyReader.Close() + if n != 0 { + return errors.New("Received a malformed event in reply") + } + // No reply. return nil } diff --git a/pkg/broker/handler/processors/deliver/processor_test.go b/pkg/broker/handler/processors/deliver/processor_test.go index a5a33739b7..04c0f0a4a6 100644 --- a/pkg/broker/handler/processors/deliver/processor_test.go +++ b/pkg/broker/handler/processors/deliver/processor_test.go @@ -200,49 +200,78 @@ func TestDeliverSuccess(t *testing.T) { } } +type targetWithFailureHandler struct { + t *testing.T + delay time.Duration + respCode int + malFormedEvent bool +} + +func (h *targetWithFailureHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + h.t.Helper() + _, err := ioutil.ReadAll(req.Body) + if err != nil { + h.t.Errorf("Failed to read request: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + time.Sleep(h.delay) + + if h.malFormedEvent { + w.Write([]byte("not an valid event body")) + } + w.WriteHeader(h.respCode) +} + func TestDeliverFailure(t *testing.T) { cases := []struct { name string withRetry bool - withRespDelay time.Duration + targetHandler *targetWithFailureHandler failRetry bool wantErr bool }{{ - name: "delivery error no retry", - wantErr: true, + name: "delivery error no retry", + targetHandler: &targetWithFailureHandler{respCode: http.StatusInternalServerError}, + wantErr: true, }, { - name: "delivery error retry success", - withRetry: true, + name: "delivery error retry success", + targetHandler: &targetWithFailureHandler{respCode: http.StatusInternalServerError}, + withRetry: true, }, { - name: "delivery error retry failure", - withRetry: true, - failRetry: true, - wantErr: true, + name: "delivery error retry failure", + targetHandler: &targetWithFailureHandler{respCode: http.StatusInternalServerError}, + withRetry: true, + failRetry: true, + wantErr: true, }, { name: "delivery timeout no retry", - withRespDelay: time.Second, + targetHandler: &targetWithFailureHandler{delay: time.Second, respCode: http.StatusOK}, wantErr: true, }, { name: "delivery timeout retry success", - withRespDelay: time.Second, + targetHandler: &targetWithFailureHandler{delay: time.Second, respCode: http.StatusOK}, withRetry: true, }, { name: "delivery timeout retry failure", withRetry: true, - withRespDelay: time.Second, + targetHandler: &targetWithFailureHandler{delay: time.Second, respCode: http.StatusOK}, failRetry: true, wantErr: true, + }, { + name: "malformed reply failure", + // Return 2xx but with a malformed event should be considered error. + targetHandler: &targetWithFailureHandler{respCode: http.StatusOK, malFormedEvent: true}, + wantErr: true, }} for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { reportertest.ResetDeliveryMetrics() ctx := logtest.TestContextWithLogger(t) - targetClient, err := cehttp.New() - if err != nil { - t.Fatalf("failed to create target cloudevents client: %v", err) - } - targetSvr := httptest.NewServer(targetClient) + tc.targetHandler.t = t + targetSvr := httptest.NewServer(tc.targetHandler) defer targetSvr.Close() _, c, close := testPubsubClient(ctx, t, "test-project") @@ -295,38 +324,10 @@ func TestDeliverFailure(t *testing.T) { } origin := newSampleEvent() - - rctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - go func() { - msg, resp, err := targetClient.Respond(rctx) - if err != nil && err != io.EOF { - t.Errorf("unexpected error from target receiving event: %v", err) - } - defer msg.Finish(nil) - - // If with delay, we reply OK so that we know the error is for sure caused by timeout. - if tc.withRespDelay > 0 { - time.Sleep(tc.withRespDelay) - if err := resp(rctx, nil, &cehttp.Result{StatusCode: http.StatusOK}); err != nil { - t.Errorf("unexpected error from target responding event: %v", err) - } - return - } - - // Due to https://github.com/cloudevents/sdk-go/issues/433 - // it's not possible to use Receive to easily return error. - if err := resp(rctx, nil, &cehttp.Result{StatusCode: http.StatusInternalServerError}); err != nil { - t.Errorf("unexpected error from target responding event: %v", err) - } - }() - err = p.Process(ctx, origin) if (err != nil) != tc.wantErr { t.Errorf("processing got error=%v, want=%v", err, tc.wantErr) } - <-rctx.Done() }) } }