summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-23 16:18:43 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-23 16:32:53 +0200
commit340f1cde8109d3afd2b57330bf1c450ca664f56d (patch)
treef0237b0c2955cb784f969cdaa93ef5f7ad08551f
parentbe124652c6f20677ecf07aad887d0cf02944c83e (diff)
Reuse buffer holding body after document is processed
-rw-r--r--client/go/internal/vespa/document/dispatcher.go1
-rw-r--r--client/go/internal/vespa/document/document.go40
-rw-r--r--client/go/internal/vespa/document/document_test.go18
3 files changed, 49 insertions, 10 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 7a19d21f278..8ddb34c8c4d 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -137,6 +137,7 @@ func (d *Dispatcher) processResults() {
if d.shouldRetry(op, op.result) {
d.enqueue(op.resetResult(), true)
} else {
+ op.document.Reset()
d.inflightWg.Done()
}
d.dispatchNext(op.document.Id)
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 8f884b223d7..616013dc59a 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -8,6 +8,7 @@ import (
"math/rand"
"strconv"
"strings"
+ "sync"
"time"
@@ -116,6 +117,24 @@ type Document struct {
Body []byte
Operation Operation
Create bool
+
+ resetFunc func()
+}
+
+func (d Document) Equal(o Document) bool {
+ return d.Id.Equal(o.Id) &&
+ d.Condition == o.Condition &&
+ bytes.Equal(d.Body, o.Body) &&
+ d.Operation == o.Operation &&
+ d.Create == o.Create
+}
+
+// Reset discards the body of this document.
+func (d *Document) Reset() {
+ d.Body = nil
+ if d.resetFunc != nil {
+ d.resetFunc()
+ }
}
// Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline.
@@ -127,6 +146,8 @@ type Decoder struct {
jsonl bool
fieldsEnd int64
+
+ documentBuffers sync.Pool
}
func (d Document) String() string {
@@ -212,6 +233,12 @@ func (d *Decoder) Decode() (Document, error) {
return doc, err
}
+func (d *Decoder) buffer() *bytes.Buffer {
+ buf := d.documentBuffers.Get().(*bytes.Buffer)
+ buf.Reset()
+ return buf
+}
+
func (d *Decoder) readField(name string, offset int64, doc *Document) error {
readId := false
switch name {
@@ -258,10 +285,14 @@ func (d *Decoder) readField(name string, offset int64, doc *Document) error {
}
d.fieldsEnd = d.dec.InputOffset()
fields := d.buf.Next(int(d.fieldsEnd - fieldsStart))
- doc.Body = make([]byte, 0, len(fieldsPrefix)+len(fields)+len(fieldsSuffix))
- doc.Body = append(doc.Body, fieldsPrefix...)
- doc.Body = append(doc.Body, fields...)
- doc.Body = append(doc.Body, fieldsSuffix...)
+ // Try to re-use buffers holding the document body. The buffer is released by document.Reset()
+ bodyBuf := d.buffer()
+ bodyBuf.Grow(len(fieldsPrefix) + len(fields) + len(fieldsSuffix))
+ bodyBuf.Write(fieldsPrefix)
+ bodyBuf.Write(fields)
+ bodyBuf.Write(fieldsSuffix)
+ doc.Body = bodyBuf.Bytes()
+ doc.resetFunc = func() { d.documentBuffers.Put(bodyBuf) }
}
if readId {
s, err := d.readString()
@@ -322,6 +353,7 @@ loop:
func NewDecoder(r io.Reader) *Decoder {
br := bufio.NewReaderSize(r, 1<<26)
d := &Decoder{}
+ d.documentBuffers.New = func() any { return &bytes.Buffer{} }
d.dec = json.NewDecoder(io.TeeReader(br, &d.buf))
return d
}
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
index fbaa076ab9d..f9bf321f1fb 100644
--- a/client/go/internal/vespa/document/document_test.go
+++ b/client/go/internal/vespa/document/document_test.go
@@ -3,7 +3,6 @@ package document
import (
"fmt"
"io"
- "reflect"
"strings"
"testing"
"time"
@@ -147,14 +146,14 @@ func feedInput(jsonl bool) string {
func testDocumentDecoder(t *testing.T, jsonLike string) {
t.Helper()
dec := NewDecoder(strings.NewReader(jsonLike))
- want := []Document{
+ docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{ "foo" : "123", "bar": {"a": [1, 2, 3]}}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Body: []byte(`{"fields":{"bar": "456"}}`)},
{Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove},
{Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Body: []byte(`{"fields":{"qux": "789"}}`)},
{Id: mustParseId("id:ns:type::doc5"), Operation: OperationRemove},
}
- got := []Document{}
+ result := []Document{}
for {
doc, err := dec.Decode()
if err == io.EOF {
@@ -163,7 +162,7 @@ func testDocumentDecoder(t *testing.T, jsonLike string) {
if err != nil {
t.Fatal(err)
}
- got = append(got, doc)
+ result = append(result, doc)
}
wantBufLen := 0
if dec.array {
@@ -172,8 +171,15 @@ func testDocumentDecoder(t *testing.T, jsonLike string) {
if l := dec.buf.Len(); l != wantBufLen {
t.Errorf("got dec.buf.Len() = %d, want %d", l, wantBufLen)
}
- if !reflect.DeepEqual(got, want) {
- t.Errorf("got %+v, want %+v", got, want)
+ if len(docs) != len(result) {
+ t.Errorf("len(result) = %d, want %d", len(result), len(docs))
+ }
+ for i := 0; i < len(docs); i++ {
+ got := result[i]
+ want := docs[i]
+ if !got.Equal(want) {
+ t.Errorf("got %+v, want %+v", got, want)
+ }
}
}