summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2023-05-10 08:33:16 +0200
committerGitHub <noreply@github.com>2023-05-10 08:33:16 +0200
commit32cc36e3af0b5e24fdcb27ece4e5920042a8c483 (patch)
tree7c51bb375683c1ecabeb8777bfe77a7a252b9731
parente0dbf168c3080049e8d000bd175ace44b08f8e35 (diff)
parent311b8b8c7c220840f4277709a8f2c74943a6e7eb (diff)
Merge pull request #27054 from vespa-engine/mpolden/stream-req-body
Stream request body
-rw-r--r--client/go/internal/cli/cmd/feed_test.go1
-rw-r--r--client/go/internal/mock/http.go20
-rw-r--r--client/go/internal/vespa/document/http.go61
-rw-r--r--client/go/internal/vespa/document/http_test.go60
4 files changed, 85 insertions, 57 deletions
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
index 097d4ae5fa3..bd0b9544e37 100644
--- a/client/go/internal/cli/cmd/feed_test.go
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -27,6 +27,7 @@ func TestFeed(t *testing.T) {
clock := &manualClock{tick: time.Second}
cli, stdout, stderr := newTestCLI(t)
httpClient := cli.httpClient.(*mock.HTTPClient)
+ httpClient.ReadBody = true
cli.now = clock.now
td := t.TempDir()
diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go
index 58614d7e5bd..8a17d9996d6 100644
--- a/client/go/internal/mock/http.go
+++ b/client/go/internal/mock/http.go
@@ -11,13 +11,20 @@ import (
)
type HTTPClient struct {
- // The responses to return for future requests. Once a response is consumed, it's removed from this slice
+ // The responses to return for future requests. Once a response is consumed, it's removed from this slice.
nextResponses []HTTPResponse
- // LastRequest is the last HTTP request made through this
+ // LastRequest is the last HTTP request made through this.
LastRequest *http.Request
- // Requests contains all requests made through this
+ // ReadBody controls whether the client consumes the request body automatically. If true, LastBody will contain the
+ // body of the most recent request.
+ ReadBody bool
+
+ // LastBody is a copy of the last request payload sent through this.
+ LastBody []byte
+
+ // Requests contains all requests made through this.
Requests []*http.Request
}
@@ -48,6 +55,13 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res
c.nextResponses = c.nextResponses[1:]
}
c.LastRequest = request
+ if c.ReadBody && request.Body != nil {
+ body, err := io.ReadAll(request.Body)
+ if err != nil {
+ return nil, err
+ }
+ c.LastBody = body
+ }
c.Requests = append(c.Requests, request)
if response.Header == nil {
response.Header = make(http.Header)
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index bbfb5553dab..74cc46ec962 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -1,6 +1,7 @@
package document
import (
+ "bufio"
"bytes"
"encoding/json"
"fmt"
@@ -53,6 +54,19 @@ type ClientOptions struct {
NowFunc func() time.Time
}
+type countingReader struct {
+ reader io.ReadCloser
+ bytesRead int64
+}
+
+func (r *countingReader) Read(p []byte) (int, error) {
+ n, err := r.reader.Read(p)
+ r.bytesRead += int64(n)
+ return n, err
+}
+
+func (r *countingReader) Close() error { return r.reader.Close() }
+
type countingHTTPClient struct {
client util.HTTPClient
inflight int64
@@ -200,31 +214,36 @@ func (c *Client) buffer() *bytes.Buffer {
return buf
}
-func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) {
- var buf *bytes.Buffer
+func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) {
+ if len(body) == 0 {
+ req, err := http.NewRequest(method, url, nil)
+ return req, nil, err
+ }
useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512)
- if useGzip {
- buf = bytes.NewBuffer(make([]byte, 0, 1024))
- w := c.gzipWriter(buf)
- writeRequestBody(w, body)
- if err := w.Close(); err != nil {
- return nil, err
+ r, w := io.Pipe()
+ go func() {
+ defer w.Close()
+ if useGzip {
+ buf := bufio.NewWriterSize(w, 1024)
+ zw := c.gzipWriter(buf)
+ writeRequestBody(zw, body)
+ zw.Close()
+ c.gzippers.Put(zw)
+ buf.Flush()
+ } else {
+ writeRequestBody(w, body)
}
- c.gzippers.Put(w)
- } else {
- buf = bytes.NewBuffer(make([]byte, 0, len(fieldsPrefix)+len(body)+len(fieldsSuffix)))
- writeRequestBody(buf, body)
- }
- req, err := http.NewRequest(method, url, buf)
+ }()
+ cr := &countingReader{reader: r}
+ req, err := http.NewRequest(method, url, cr)
if err != nil {
- return nil, err
+ return nil, cr, err
}
if useGzip {
req.Header.Set("Content-Encoding", "gzip")
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
- req.ContentLength = int64(buf.Len())
- return req, nil
+ return req, cr, nil
}
func (c *Client) clientTimeout() time.Duration {
@@ -239,7 +258,7 @@ func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
method, url := c.methodAndURL(document)
- req, err := c.createRequest(method, url, document.Fields)
+ req, cr, err := c.createRequest(method, url, document.Fields)
if err != nil {
return resultWithErr(result, err)
}
@@ -249,7 +268,11 @@ func (c *Client) Send(document Document) Result {
}
defer resp.Body.Close()
elapsed := c.now().Sub(start)
- return c.resultWithResponse(resp, req.ContentLength, result, elapsed)
+ var bytesRead int64
+ if cr != nil {
+ bytesRead = cr.bytesRead
+ }
+ return c.resultWithResponse(resp, bytesRead, result, elapsed)
}
func resultWithErr(result Result, err error) Result {
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index a582cd7ec6e..9a47b4f45fe 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -3,7 +3,6 @@ package document
import (
"bytes"
"fmt"
- "io"
"net/http"
"reflect"
"strings"
@@ -63,7 +62,7 @@ func TestClientSend(t *testing.T) {
{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "456"}`)},
{Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Fields: []byte(`{"baz": "789"}`)},
}
- httpClient := mock.HTTPClient{}
+ httpClient := mock.HTTPClient{ReadBody: true}
client, _ := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
@@ -80,7 +79,6 @@ func TestClientSend(t *testing.T) {
TotalLatency: time.Second,
MinLatency: time.Second,
MaxLatency: time.Second,
- BytesSent: 25,
},
}
if i < 2 {
@@ -100,6 +98,7 @@ func TestClientSend(t *testing.T) {
wantRes.Stats.BytesRecv = 36
}
res := client.Send(doc)
+ wantRes.Stats.BytesSent = int64(len(httpClient.LastBody))
if !reflect.DeepEqual(res, wantRes) {
t.Fatalf("got result %+v, want %+v", res, wantRes)
}
@@ -112,19 +111,12 @@ func TestClientSend(t *testing.T) {
if r.URL.String() != wantURL {
t.Errorf("got r.URL = %q, want %q", r.URL, wantURL)
}
- body, err := io.ReadAll(r.Body)
- if err != nil {
- t.Fatalf("got unexpected error %q", err)
- }
var wantBody bytes.Buffer
wantBody.WriteString(`{"fields":`)
wantBody.Write(doc.Fields)
wantBody.WriteString("}")
- if !bytes.Equal(body, wantBody.Bytes()) {
- t.Errorf("got r.Body = %q, want %q", string(body), wantBody.String())
- }
- if r.ContentLength != int64(len(body)) {
- t.Errorf("got r.ContentLength=%d, want %d", r.ContentLength, len(body))
+ if !bytes.Equal(httpClient.LastBody, wantBody.Bytes()) {
+ t.Errorf("got r.Body = %q, want %q", string(httpClient.LastBody), wantBody.String())
}
}
want := Stats{
@@ -148,52 +140,50 @@ func TestClientSend(t *testing.T) {
}
func TestClientSendCompressed(t *testing.T) {
- httpClient := mock.HTTPClient{}
+ httpClient := &mock.HTTPClient{ReadBody: true}
client, _ := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
- }, []util.HTTPClient{&httpClient})
+ }, []util.HTTPClient{httpClient})
bigBody := fmt.Sprintf(`{"foo": "%s"}`, strings.Repeat("s", 512+1))
bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(bigBody)}
smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "s"}`)}
+ var result Result
client.options.Compression = CompressionNone
- _ = client.Send(bigDoc)
- assertCompressedRequest(t, false, httpClient.LastRequest)
- _ = client.Send(smallDoc)
- assertCompressedRequest(t, false, httpClient.LastRequest)
+ result = client.Send(bigDoc)
+ assertCompressedRequest(t, false, result, httpClient)
+ result = client.Send(smallDoc)
+ assertCompressedRequest(t, false, result, httpClient)
client.options.Compression = CompressionAuto
- _ = client.Send(bigDoc)
- assertCompressedRequest(t, true, httpClient.LastRequest)
- _ = client.Send(smallDoc)
- assertCompressedRequest(t, false, httpClient.LastRequest)
+ result = client.Send(bigDoc)
+ assertCompressedRequest(t, true, result, httpClient)
+ result = client.Send(smallDoc)
+ assertCompressedRequest(t, false, result, httpClient)
client.options.Compression = CompressionGzip
- _ = client.Send(bigDoc)
- assertCompressedRequest(t, true, httpClient.LastRequest)
- _ = client.Send(smallDoc)
- assertCompressedRequest(t, true, httpClient.LastRequest)
+ result = client.Send(bigDoc)
+ assertCompressedRequest(t, true, result, httpClient)
+ result = client.Send(smallDoc)
+ assertCompressedRequest(t, true, result, httpClient)
}
-func assertCompressedRequest(t *testing.T, want bool, request *http.Request) {
+func assertCompressedRequest(t *testing.T, want bool, result Result, client *mock.HTTPClient) {
+ t.Helper()
wantEnc := ""
if want {
wantEnc = "gzip"
}
- gotEnc := request.Header.Get("Content-Encoding")
+ gotEnc := client.LastRequest.Header.Get("Content-Encoding")
if gotEnc != wantEnc {
t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc)
}
- body, err := io.ReadAll(request.Body)
- if err != nil {
- t.Fatal(err)
- }
- if request.ContentLength != int64(len(body)) {
- t.Errorf("got ContentLength=%d, want %d", request.ContentLength, len(body))
+ if result.Stats.BytesSent != int64(len(client.LastBody)) {
+ t.Errorf("got BytesSent=%d, want %d", result.Stats.BytesSent, len(client.LastBody))
}
- compressed := bytes.HasPrefix(body, []byte{0x1f, 0x8b})
+ compressed := bytes.HasPrefix(client.LastBody, []byte{0x1f, 0x8b})
if compressed != want {
t.Errorf("got compressed=%t, want %t", compressed, want)
}