diff --git a/http.go b/http.go index 79241db19c..ce30f8c977 100644 --- a/http.go +++ b/http.go @@ -64,6 +64,10 @@ type Response struct { // Copying Header by value is forbidden. Use pointer to Header instead. Header ResponseHeader + // Flush headers as soon as possible without waiting for first body bytes. + // Relevant for bodyStream only. + ImmediateHeaderFlush bool + bodyStream io.Reader w responseBodyWriter body *bytebufferpool.ByteBuffer @@ -870,6 +874,7 @@ func (resp *Response) Reset() { resp.SkipBody = false resp.raddr = nil resp.laddr = nil + resp.ImmediateHeaderFlush = false } func (resp *Response) resetSkipHeader() { @@ -1455,12 +1460,22 @@ func (resp *Response) writeBodyStream(w *bufio.Writer, sendBody bool) error { } if contentLength >= 0 { if err = resp.Header.Write(w); err == nil && sendBody { - err = writeBodyFixedSize(w, resp.bodyStream, int64(contentLength)) + if resp.ImmediateHeaderFlush { + err = w.Flush() + } + if err == nil { + err = writeBodyFixedSize(w, resp.bodyStream, int64(contentLength)) + } } } else { resp.Header.SetContentLength(-1) if err = resp.Header.Write(w); err == nil && sendBody { - err = writeBodyChunked(w, resp.bodyStream) + if resp.ImmediateHeaderFlush { + err = w.Flush() + } + if err == nil { + err = writeBodyChunked(w, resp.bodyStream) + } } } err1 := resp.closeBodyStream() diff --git a/http_test.go b/http_test.go index cd31f87cb1..03aa52d36c 100644 --- a/http_test.go +++ b/http_test.go @@ -1921,3 +1921,133 @@ func TestResponseRawBodyCopyTo(t *testing.T) { testResponseCopyTo(t, &resp) } + +type testReader struct { + read chan (int) + cb chan (struct{}) +} + +func (r *testReader) Read(b []byte) (int, error) { + read := <-r.read + + if read == -1 { + return 0, io.EOF + } + + r.cb <- struct{}{} + + for i := 0; i < read; i++ { + b[i] = 'x' + } + + return read, nil +} + +func TestResponseImmediateHeaderFlushRegressionFixedLength(t *testing.T) { + var r Response + + expectedS := "aaabbbccc" + buf := bytes.NewBufferString(expectedS) + r.SetBodyStream(buf, len(expectedS)) + r.ImmediateHeaderFlush = true + + testBodyWriteTo(t, &r, expectedS, false) +} + +func TestResponseImmediateHeaderFlushRegressionChunked(t *testing.T) { + var r Response + + expectedS := "aaabbbccc" + buf := bytes.NewBufferString(expectedS) + r.SetBodyStream(buf, -1) + r.ImmediateHeaderFlush = true + + testBodyWriteTo(t, &r, expectedS, false) +} + +func TestResponseImmediateHeaderFlushFixedLength(t *testing.T) { + var r Response + + r.ImmediateHeaderFlush = true + + ch := make(chan int) + cb := make(chan struct{}) + + buf := &testReader{read: ch, cb: cb} + + r.SetBodyStream(buf, 3) + + b := []byte{} + w := bytes.NewBuffer(b) + bb := bufio.NewWriter(w) + + bw := &r + + waitForIt := make(chan struct{}) + + go func() { + if err := bw.Write(bb); err != nil { + t.Fatalf("unexpected error: %s", err) + } + waitForIt <- struct{}{} + }() + + ch <- 3 + + if !strings.Contains(w.String(), "Content-Length: 3") { + t.Fatalf("Expected headers to be flushed") + } + + if strings.Contains(w.String(), "xxx") { + t.Fatalf("Did not expext body to be written yet") + } + + <-cb + ch <- -1 + + <-waitForIt +} + +func TestResponseImmediateHeaderFlushChunked(t *testing.T) { + var r Response + + r.ImmediateHeaderFlush = true + + ch := make(chan int) + cb := make(chan struct{}) + + buf := &testReader{read: ch, cb: cb} + + r.SetBodyStream(buf, -1) + + b := []byte{} + w := bytes.NewBuffer(b) + bb := bufio.NewWriter(w) + + bw := &r + + waitForIt := make(chan struct{}) + + go func() { + if err := bw.Write(bb); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + waitForIt <- struct{}{} + }() + + ch <- 3 + + if !strings.Contains(w.String(), "Transfer-Encoding: chunked") { + t.Fatalf("Expected headers to be flushed") + } + + if strings.Contains(w.String(), "xxx") { + t.Fatalf("Did not expext body to be written yet") + } + + <-cb + ch <- -1 + + <-waitForIt +}