aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal/vespa/document
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-08 11:00:44 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-08 11:00:44 +0200
commit3b4d93a502d855558ccf1225de34f6b1a2832aff (patch)
tree89b71ded3da65d9c7e77608fa6aa7ab6520e4574 /client/go/internal/vespa/document
parentfba3ea069bad515c7fb8ffc9da26aa9e437ca3b0 (diff)
Avoid copying fields
Diffstat (limited to 'client/go/internal/vespa/document')
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go6
-rw-r--r--client/go/internal/vespa/document/document.go12
-rw-r--r--client/go/internal/vespa/document/document_test.go4
-rw-r--r--client/go/internal/vespa/document/http.go18
-rw-r--r--client/go/internal/vespa/document/http_test.go27
5 files changed, 34 insertions, 33 deletions
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 7ef2043f9c5..252bd94dff9 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -61,8 +61,8 @@ func TestDispatcher(t *testing.T) {
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
docs := []Document{
- {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
- {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Fields: []byte(`{"bar": "456"}`)},
}
for _, d := range docs {
dispatcher.Enqueue(d)
@@ -192,7 +192,7 @@ func BenchmarkDocumentDispatching(b *testing.B) {
throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
- doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}
+ doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)}
b.ResetTimer() // ignore setup time
for n := 0; n < b.N; n++ {
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 8bc897d49b3..984f3d3741a 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -96,7 +96,7 @@ type Document struct {
Operation Operation
Condition string
Create bool
- Body []byte
+ Fields []byte
}
type jsonDocument struct {
@@ -235,20 +235,12 @@ func parseDocument(d *jsonDocument) (Document, error) {
if err != nil {
return Document{}, err
}
- var body []byte
- if d.Fields != nil {
- jsonObject := `{"fields":`
- body = make([]byte, 0, len(jsonObject)+len(d.Fields)+1)
- body = append(body, []byte(jsonObject)...)
- body = append(body, d.Fields...)
- body = append(body, byte('}'))
- }
return Document{
Id: docId,
Operation: op,
Condition: d.Condition,
Create: d.Create,
- Body: body,
+ Fields: d.Fields,
}, nil
}
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
index bdf18586753..495f3963322 100644
--- a/client/go/internal/vespa/document/document_test.go
+++ b/client/go/internal/vespa/document/document_test.go
@@ -134,8 +134,8 @@ func testDocumentDecoder(t *testing.T, jsonLike string) {
t.Helper()
r := NewDecoder(strings.NewReader(jsonLike))
want := []Document{
- {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
- {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Fields: []byte(`{"foo": "123"}`)},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Fields: []byte(`{"bar": "456"}`)},
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationRemove},
}
got := []Document{}
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 3581a791dbe..6e7c2219e8f 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -25,6 +25,9 @@ const (
CompressionAuto Compression = iota
CompressionNone
CompressionGzip
+
+ fieldsPrefix = `{"fields":`
+ fieldsSuffix = "}"
)
// Client represents a HTTP client for the /document/v1/ API.
@@ -179,13 +182,18 @@ func (c *Client) buffer() *bytes.Buffer {
}
func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) {
- var r io.Reader
+ // include the outer object expected by /document/v1/ without copying the body
+ r := io.MultiReader(
+ strings.NewReader(fieldsPrefix),
+ bytes.NewReader(body),
+ strings.NewReader(fieldsSuffix),
+ )
useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512)
if useGzip {
var buf bytes.Buffer
buf.Grow(1024)
w := c.gzipWriter(&buf)
- if _, err := w.Write(body); err != nil {
+ if _, err := io.Copy(w, r); err != nil {
return nil, err
}
if err := w.Close(); err != nil {
@@ -193,8 +201,6 @@ func (c *Client) createRequest(method, url string, body []byte) (*http.Request,
}
c.gzippers.Put(w)
r = &buf
- } else {
- r = bytes.NewReader(body)
}
req, err := http.NewRequest(method, url, r)
if err != nil {
@@ -219,7 +225,7 @@ func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
method, url := c.methodAndURL(document)
- req, err := c.createRequest(method, url, document.Body)
+ req, err := c.createRequest(method, url, document.Fields)
if err != nil {
return resultWithErr(result, err)
}
@@ -271,7 +277,7 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result, document
}
result.Message = body.Message
result.Trace = string(body.Trace)
- result.Stats.BytesSent = int64(len(document.Body))
+ result.Stats.BytesSent = int64(len(document.Fields) + len(fieldsPrefix) + len(fieldsSuffix))
result.Stats.BytesRecv = int64(written)
if !result.Success() {
result.Stats.Errors++
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 489460b3ed7..da7f264e8d8 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -59,9 +59,9 @@ func assertLeastBusy(t *testing.T, id int, client *Client) {
func TestClientSend(t *testing.T) {
docs := []Document{
- {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
- {Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "456"}}`)},
- {Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)},
+ {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(`{"foo": "123"}`)},
+ {Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "456"}`)},
+ {Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Fields: []byte(`{"baz": "789"}`)},
}
httpClient := mock.HTTPClient{}
client, _ := NewClient(ClientOptions{
@@ -116,9 +116,12 @@ func TestClientSend(t *testing.T) {
if err != nil {
t.Fatalf("got unexpected error %q", err)
}
- wantBody := doc.Body
- if !bytes.Equal(body, wantBody) {
- t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
+ var wantBody bytes.Buffer
+ wantBody.WriteString(`{"fields":`)
+ wantBody.Write(doc.Fields)
+ wantBody.WriteString("}")
+ if !bytes.Equal(body, wantBody.Bytes()) {
+ t.Errorf("got r.Body = %q, want %q", string(body), wantBody.String())
}
}
want := Stats{
@@ -148,9 +151,9 @@ func TestClientSendCompressed(t *testing.T) {
Timeout: time.Duration(5 * time.Second),
}, []util.HTTPClient{&httpClient})
- bigBody := fmt.Sprintf(`{"fields":{"foo": "%s"}}`, strings.Repeat("s", 512+1))
- bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)}
- smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "s"}}`)}
+ bigBody := fmt.Sprintf(`{"foo": "%s"}`, strings.Repeat("s", 512+1))
+ bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(bigBody)}
+ smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Fields: []byte(`{"foo": "s"}`)}
client.options.Compression = CompressionNone
_ = client.Send(bigDoc)
@@ -289,12 +292,12 @@ func benchmarkClientSend(b *testing.B, compression Compression, document Documen
}
func BenchmarkClientSend(b *testing.B) {
- doc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "my document"}}`)}
+ doc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(`{"foo": "my document"}`)}
benchmarkClientSend(b, CompressionNone, doc)
}
func BenchmarkClientSendCompressed(b *testing.B) {
- body := fmt.Sprintf(`{"fields":{"foo": "%s"}}`, strings.Repeat("my document", 100))
- doc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(body)}
+ body := fmt.Sprintf(`{"foo": "%s"}`, strings.Repeat("my document", 100))
+ doc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Fields: []byte(body)}
benchmarkClientSend(b, CompressionGzip, doc)
}