diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-18 22:57:24 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-05-18 22:57:24 +0200 |
commit | d5a086ea424663718636ecd4717cc9329059aec1 (patch) | |
tree | cf69c7bcc15759c0a8b08ef777064219cae0032a /client | |
parent | 0f2637115c2d37a8646befc778fcbb0d46bce049 (diff) |
Skip buffer copy for uncompressed requests
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 6 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document.go | 22 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document_test.go | 6 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 34 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 25 |
5 files changed, 41 insertions, 52 deletions
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 252bd94dff9..382d21501c3 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -61,8 +61,8 @@ func TestDispatcher(t *testing.T) { breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) docs := []Document{ - {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)}, - {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Fields: []byte(`{"bar": "456"}`)}, + {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields": {"foo": "123"}}`)}, + {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields": {"bar": "456"}}`)}, } for _, d := range docs { dispatcher.Enqueue(d) @@ -192,7 +192,7 @@ func BenchmarkDocumentDispatching(b *testing.B) { throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) - doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)} + doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields": {"foo": "123"}}`)} b.ResetTimer() // ignore setup time for n := 0; n < b.N; n++ { diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 0ee8dae3f60..1e7e3af7f73 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -33,6 +33,11 @@ const ( jsonString json.Kind = '"' ) +var ( + fieldsPrefix = []byte(`{"fields":`) + fieldsSuffix = []byte("}") +) + // Id represents a Vespa document ID. type Id struct { id string @@ -107,7 +112,7 @@ func ParseId(serialized string) (Id, error) { type Document struct { Id Id Condition string - Fields []byte + Body []byte Operation Operation Create bool } @@ -141,9 +146,9 @@ func (d Document) String() string { if d.Create { sb.WriteString(", create=true") } - if d.Fields != nil { - sb.WriteString(", fields=") - sb.WriteString(string(d.Fields)) + if d.Body != nil { + sb.WriteString(", body=") + sb.WriteString(string(d.Body)) } return sb.String() } @@ -251,10 +256,11 @@ func (d *Decoder) readField(name string, doc *Document) error { } } d.fieldsEnd = d.dec.InputOffset() - doc.Fields = make([]byte, int(d.fieldsEnd-start)) - if _, err := d.buf.Read(doc.Fields); err != nil { - return err - } + fields := d.buf.Next(int(d.fieldsEnd - start)) + doc.Body = make([]byte, 0, len(fieldsPrefix)+len(fields)+len(fieldsSuffix)) + doc.Body = append(doc.Body, fieldsPrefix...) + doc.Body = append(doc.Body, fields...) + doc.Body = append(doc.Body, fieldsSuffix...) } if readId { s, err := d.readString() diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go index 0b9017011dd..f6713c4c0a1 100644 --- a/client/go/internal/vespa/document/document_test.go +++ b/client/go/internal/vespa/document/document_test.go @@ -143,10 +143,10 @@ func testDocumentDecoder(t *testing.T, jsonLike string) { t.Helper() r := NewDecoder(strings.NewReader(jsonLike)) want := []Document{ - {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{ "foo" : "123", "bar": {"a": [1, 2, 3]}}`)}, - {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Fields: []byte(`{"bar": "456"}`)}, + {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{ "foo" : "123", "bar": {"a": [1, 2, 3]}}}`)}, + {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Body: []byte(`{"fields":{"bar": "456"}}`)}, {Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove}, - {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Fields: []byte(`{"qux": "789"}`)}, + {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Body: []byte(`{"fields":{"qux": "789"}}`)}, } got := []Document{} for { diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 8f7ac5bfe63..ce57ac55f03 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -30,9 +30,6 @@ const ( ) var ( - fieldsPrefix = []byte(`{"fields":`) - fieldsSuffix = []byte("}") - defaultHeaders http.Header = map[string][]string{ "User-Agent": {fmt.Sprintf("Vespa CLI/%s", build.Version)}, "Content-Type": {"application/json; charset=utf-8"}, @@ -132,15 +129,6 @@ func writeQueryParam(sb *bytes.Buffer, start int, escape bool, k, v string) { } } -func writeRequestBody(w io.Writer, body []byte) error { - for _, b := range [][]byte{fieldsPrefix, body, fieldsSuffix} { - if _, err := w.Write(b); err != nil { - return err - } - } - return nil -} - func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) { httpMethod := "" switch d.Operation { @@ -229,7 +217,7 @@ func (c *Client) preparePending() { for pd := range c.pending { pd.buf = c.buffer() method, url := c.methodAndURL(pd.document, pd.buf) - pd.request, pd.err = c.createRequest(method, url, pd.document.Fields, pd.buf) + pd.request, pd.err = c.createRequest(method, url, pd.document.Body, pd.buf) pd.prepared <- true } } @@ -259,24 +247,23 @@ func (c *Client) createRequest(method, url string, body []byte, buf *bytes.Buffe if len(body) == 0 { return newRequest(method, url, nil, false) } - bodySize := len(fieldsPrefix) + len(body) + len(fieldsSuffix) - useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && bodySize > 512) - buf.Grow(min(1024, bodySize)) + useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) + var r io.Reader if useGzip { + buf.Grow(min(1024, len(body))) zw := c.gzipWriter(buf) defer c.gzippers.Put(zw) - if err := writeRequestBody(zw, body); err != nil { + if _, err := zw.Write(body); err != nil { return nil, err } if err := zw.Close(); err != nil { return nil, err } + r = buf } else { - if err := writeRequestBody(buf, body); err != nil { - return nil, err - } + r = bytes.NewReader(body) } - return newRequest(method, url, buf, useGzip) + return newRequest(method, url, r, useGzip) } func (c *Client) clientTimeout() time.Duration { @@ -295,7 +282,10 @@ func (c *Client) Send(document Document) Result { if err != nil { return resultWithErr(result, err) } - bodySize := buf.Len() + bodySize := len(document.Body) + if buf.Len() > 0 { + bodySize = buf.Len() + } resp, err := c.leastBusyClient().Do(req, c.clientTimeout()) if err != nil { return resultWithErr(result, err) diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 7d636aa8d5c..6eda5f04fd6 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -3,7 +3,6 @@ package document import ( "bytes" "fmt" - "net/http" "reflect" "strings" "testing" @@ -62,16 +61,16 @@ func TestClientSend(t *testing.T) { method string url string }{ - {Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(`{"foo": "123"}`)}, + {Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)}, "PUT", "https://example.com:1337/document/v1/ns/type/docid/doc1?timeout=5000ms&create=true"}, - {Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "456"}`)}, + {Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "456"}}`)}, "PUT", "https://example.com:1337/document/v1/ns/type/docid/doc2?timeout=5000ms"}, {Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove}, "DELETE", "https://example.com:1337/document/v1/ns/type/docid/doc3?timeout=5000ms"}, - {Document{Condition: "foo", Id: mustParseId("id:ns:type::doc4"), Operation: OperationUpdate, Fields: []byte(`{"baz": "789"}`)}, + {Document{Condition: "foo", Id: mustParseId("id:ns:type::doc4"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)}, "PUT", "https://example.com:1337/document/v1/ns/type/docid/doc4?timeout=5000ms&condition=foo"}, } @@ -95,7 +94,6 @@ func TestClientSend(t *testing.T) { MaxLatency: time.Second, }, } - var wantBody bytes.Buffer if i < 3 { httpClient.NextResponseString(200, `{"message":"All good!"}`) wantRes.Status = StatusSuccess @@ -112,11 +110,6 @@ func TestClientSend(t *testing.T) { wantRes.Stats.Errors = 1 wantRes.Stats.BytesRecv = 36 } - if tt.method == http.MethodPut { - wantBody.WriteString(`{"fields":`) - wantBody.Write(doc.Fields) - wantBody.WriteString("}") - } res := client.Send(doc) wantRes.Stats.BytesSent = int64(len(httpClient.LastBody)) if !reflect.DeepEqual(res, wantRes) { @@ -133,8 +126,8 @@ func TestClientSend(t *testing.T) { if r.URL.String() != tt.url { t.Errorf("got r.URL = %q, want %q", r.URL, tt.url) } - if !bytes.Equal(httpClient.LastBody, wantBody.Bytes()) { - t.Errorf("got r.Body = %q, want %q", string(httpClient.LastBody), wantBody.String()) + if !bytes.Equal(httpClient.LastBody, doc.Body) { + t.Errorf("got r.Body = %q, want %q", string(httpClient.LastBody), doc.Body) } } want := Stats{ @@ -164,9 +157,9 @@ func TestClientSendCompressed(t *testing.T) { Timeout: time.Duration(5 * time.Second), }, []util.HTTPClient{httpClient}) - bigBody := fmt.Sprintf(`{"foo": "%s"}`, strings.Repeat("s", 512+1)) - bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(bigBody)} - smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "s"}`)} + bigBody := fmt.Sprintf(`{"fields": {"foo": "%s"}}`, strings.Repeat("s", 512+1)) + bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)} + smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields": {"foo": "s"}}`)} var result Result client.options.Compression = CompressionNone @@ -307,7 +300,7 @@ func benchmarkClientSend(b *testing.B, compression Compression, document Documen } func makeDocument(size int) Document { - return Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(fmt.Sprintf(`{"foo": "%s"}`, randString(size)))} + return Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(fmt.Sprintf(`{"fields": {"foo": "%s"}}`, randString(size)))} } func BenchmarkClientSendSmallUncompressed(b *testing.B) { |