aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-18 22:57:24 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-18 22:57:24 +0200
commitd5a086ea424663718636ecd4717cc9329059aec1 (patch)
treecf69c7bcc15759c0a8b08ef777064219cae0032a /client/go
parent0f2637115c2d37a8646befc778fcbb0d46bce049 (diff)
Skip buffer copy for uncompressed requests
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go6
-rw-r--r--client/go/internal/vespa/document/document.go22
-rw-r--r--client/go/internal/vespa/document/document_test.go6
-rw-r--r--client/go/internal/vespa/document/http.go34
-rw-r--r--client/go/internal/vespa/document/http_test.go25
5 files changed, 41 insertions, 52 deletions
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 252bd94dff9..382d21501c3 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -61,8 +61,8 @@ func TestDispatcher(t *testing.T) {
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
docs := []Document{
- {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)},
- {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Fields: []byte(`{"bar": "456"}`)},
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields": {"foo": "123"}}`)},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields": {"bar": "456"}}`)},
}
for _, d := range docs {
dispatcher.Enqueue(d)
@@ -192,7 +192,7 @@ func BenchmarkDocumentDispatching(b *testing.B) {
throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
- doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)}
+ doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields": {"foo": "123"}}`)}
b.ResetTimer() // ignore setup time
for n := 0; n < b.N; n++ {
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 0ee8dae3f60..1e7e3af7f73 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -33,6 +33,11 @@ const (
jsonString json.Kind = '"'
)
+var (
+ fieldsPrefix = []byte(`{"fields":`)
+ fieldsSuffix = []byte("}")
+)
+
// Id represents a Vespa document ID.
type Id struct {
id string
@@ -107,7 +112,7 @@ func ParseId(serialized string) (Id, error) {
type Document struct {
Id Id
Condition string
- Fields []byte
+ Body []byte
Operation Operation
Create bool
}
@@ -141,9 +146,9 @@ func (d Document) String() string {
if d.Create {
sb.WriteString(", create=true")
}
- if d.Fields != nil {
- sb.WriteString(", fields=")
- sb.WriteString(string(d.Fields))
+ if d.Body != nil {
+ sb.WriteString(", body=")
+ sb.WriteString(string(d.Body))
}
return sb.String()
}
@@ -251,10 +256,11 @@ func (d *Decoder) readField(name string, doc *Document) error {
}
}
d.fieldsEnd = d.dec.InputOffset()
- doc.Fields = make([]byte, int(d.fieldsEnd-start))
- if _, err := d.buf.Read(doc.Fields); err != nil {
- return err
- }
+ fields := d.buf.Next(int(d.fieldsEnd - start))
+ doc.Body = make([]byte, 0, len(fieldsPrefix)+len(fields)+len(fieldsSuffix))
+ doc.Body = append(doc.Body, fieldsPrefix...)
+ doc.Body = append(doc.Body, fields...)
+ doc.Body = append(doc.Body, fieldsSuffix...)
}
if readId {
s, err := d.readString()
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
index 0b9017011dd..f6713c4c0a1 100644
--- a/client/go/internal/vespa/document/document_test.go
+++ b/client/go/internal/vespa/document/document_test.go
@@ -143,10 +143,10 @@ func testDocumentDecoder(t *testing.T, jsonLike string) {
t.Helper()
r := NewDecoder(strings.NewReader(jsonLike))
want := []Document{
- {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{ "foo" : "123", "bar": {"a": [1, 2, 3]}}`)},
- {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Fields: []byte(`{"bar": "456"}`)},
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{ "foo" : "123", "bar": {"a": [1, 2, 3]}}}`)},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Body: []byte(`{"fields":{"bar": "456"}}`)},
{Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove},
- {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Fields: []byte(`{"qux": "789"}`)},
+ {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Body: []byte(`{"fields":{"qux": "789"}}`)},
}
got := []Document{}
for {
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 8f7ac5bfe63..ce57ac55f03 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -30,9 +30,6 @@ const (
)
var (
- fieldsPrefix = []byte(`{"fields":`)
- fieldsSuffix = []byte("}")
-
defaultHeaders http.Header = map[string][]string{
"User-Agent": {fmt.Sprintf("Vespa CLI/%s", build.Version)},
"Content-Type": {"application/json; charset=utf-8"},
@@ -132,15 +129,6 @@ func writeQueryParam(sb *bytes.Buffer, start int, escape bool, k, v string) {
}
}
-func writeRequestBody(w io.Writer, body []byte) error {
- for _, b := range [][]byte{fieldsPrefix, body, fieldsSuffix} {
- if _, err := w.Write(b); err != nil {
- return err
- }
- }
- return nil
-}
-
func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) {
httpMethod := ""
switch d.Operation {
@@ -229,7 +217,7 @@ func (c *Client) preparePending() {
for pd := range c.pending {
pd.buf = c.buffer()
method, url := c.methodAndURL(pd.document, pd.buf)
- pd.request, pd.err = c.createRequest(method, url, pd.document.Fields, pd.buf)
+ pd.request, pd.err = c.createRequest(method, url, pd.document.Body, pd.buf)
pd.prepared <- true
}
}
@@ -259,24 +247,23 @@ func (c *Client) createRequest(method, url string, body []byte, buf *bytes.Buffe
if len(body) == 0 {
return newRequest(method, url, nil, false)
}
- bodySize := len(fieldsPrefix) + len(body) + len(fieldsSuffix)
- useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && bodySize > 512)
- buf.Grow(min(1024, bodySize))
+ useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512)
+ var r io.Reader
if useGzip {
+ buf.Grow(min(1024, len(body)))
zw := c.gzipWriter(buf)
defer c.gzippers.Put(zw)
- if err := writeRequestBody(zw, body); err != nil {
+ if _, err := zw.Write(body); err != nil {
return nil, err
}
if err := zw.Close(); err != nil {
return nil, err
}
+ r = buf
} else {
- if err := writeRequestBody(buf, body); err != nil {
- return nil, err
- }
+ r = bytes.NewReader(body)
}
- return newRequest(method, url, buf, useGzip)
+ return newRequest(method, url, r, useGzip)
}
func (c *Client) clientTimeout() time.Duration {
@@ -295,7 +282,10 @@ func (c *Client) Send(document Document) Result {
if err != nil {
return resultWithErr(result, err)
}
- bodySize := buf.Len()
+ bodySize := len(document.Body)
+ if buf.Len() > 0 {
+ bodySize = buf.Len()
+ }
resp, err := c.leastBusyClient().Do(req, c.clientTimeout())
if err != nil {
return resultWithErr(result, err)
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 7d636aa8d5c..6eda5f04fd6 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -3,7 +3,6 @@ package document
import (
"bytes"
"fmt"
- "net/http"
"reflect"
"strings"
"testing"
@@ -62,16 +61,16 @@ func TestClientSend(t *testing.T) {
method string
url string
}{
- {Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(`{"foo": "123"}`)},
+ {Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
"PUT",
"https://example.com:1337/document/v1/ns/type/docid/doc1?timeout=5000ms&create=true"},
- {Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "456"}`)},
+ {Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "456"}}`)},
"PUT",
"https://example.com:1337/document/v1/ns/type/docid/doc2?timeout=5000ms"},
{Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove},
"DELETE",
"https://example.com:1337/document/v1/ns/type/docid/doc3?timeout=5000ms"},
- {Document{Condition: "foo", Id: mustParseId("id:ns:type::doc4"), Operation: OperationUpdate, Fields: []byte(`{"baz": "789"}`)},
+ {Document{Condition: "foo", Id: mustParseId("id:ns:type::doc4"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)},
"PUT",
"https://example.com:1337/document/v1/ns/type/docid/doc4?timeout=5000ms&condition=foo"},
}
@@ -95,7 +94,6 @@ func TestClientSend(t *testing.T) {
MaxLatency: time.Second,
},
}
- var wantBody bytes.Buffer
if i < 3 {
httpClient.NextResponseString(200, `{"message":"All good!"}`)
wantRes.Status = StatusSuccess
@@ -112,11 +110,6 @@ func TestClientSend(t *testing.T) {
wantRes.Stats.Errors = 1
wantRes.Stats.BytesRecv = 36
}
- if tt.method == http.MethodPut {
- wantBody.WriteString(`{"fields":`)
- wantBody.Write(doc.Fields)
- wantBody.WriteString("}")
- }
res := client.Send(doc)
wantRes.Stats.BytesSent = int64(len(httpClient.LastBody))
if !reflect.DeepEqual(res, wantRes) {
@@ -133,8 +126,8 @@ func TestClientSend(t *testing.T) {
if r.URL.String() != tt.url {
t.Errorf("got r.URL = %q, want %q", r.URL, tt.url)
}
- if !bytes.Equal(httpClient.LastBody, wantBody.Bytes()) {
- t.Errorf("got r.Body = %q, want %q", string(httpClient.LastBody), wantBody.String())
+ if !bytes.Equal(httpClient.LastBody, doc.Body) {
+ t.Errorf("got r.Body = %q, want %q", string(httpClient.LastBody), doc.Body)
}
}
want := Stats{
@@ -164,9 +157,9 @@ func TestClientSendCompressed(t *testing.T) {
Timeout: time.Duration(5 * time.Second),
}, []util.HTTPClient{httpClient})
- bigBody := fmt.Sprintf(`{"foo": "%s"}`, strings.Repeat("s", 512+1))
- bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(bigBody)}
- smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "s"}`)}
+ bigBody := fmt.Sprintf(`{"fields": {"foo": "%s"}}`, strings.Repeat("s", 512+1))
+ bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)}
+ smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields": {"foo": "s"}}`)}
var result Result
client.options.Compression = CompressionNone
@@ -307,7 +300,7 @@ func benchmarkClientSend(b *testing.B, compression Compression, document Documen
}
func makeDocument(size int) Document {
- return Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(fmt.Sprintf(`{"foo": "%s"}`, randString(size)))}
+ return Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(fmt.Sprintf(`{"fields": {"foo": "%s"}}`, randString(size)))}
}
func BenchmarkClientSendSmallUncompressed(b *testing.B) {