diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 1 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document.go | 40 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document_test.go | 18 |
3 files changed, 49 insertions, 10 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 7a19d21f278..8ddb34c8c4d 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -137,6 +137,7 @@ func (d *Dispatcher) processResults() { if d.shouldRetry(op, op.result) { d.enqueue(op.resetResult(), true) } else { + op.document.Reset() d.inflightWg.Done() } d.dispatchNext(op.document.Id) diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 8f884b223d7..616013dc59a 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -8,6 +8,7 @@ import ( "math/rand" "strconv" "strings" + "sync" "time" @@ -116,6 +117,24 @@ type Document struct { Body []byte Operation Operation Create bool + + resetFunc func() +} + +func (d Document) Equal(o Document) bool { + return d.Id.Equal(o.Id) && + d.Condition == o.Condition && + bytes.Equal(d.Body, o.Body) && + d.Operation == o.Operation && + d.Create == o.Create +} + +// Reset discards the body of this document. +func (d *Document) Reset() { + d.Body = nil + if d.resetFunc != nil { + d.resetFunc() + } } // Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline. @@ -127,6 +146,8 @@ type Decoder struct { jsonl bool fieldsEnd int64 + + documentBuffers sync.Pool } func (d Document) String() string { @@ -212,6 +233,12 @@ func (d *Decoder) Decode() (Document, error) { return doc, err } +func (d *Decoder) buffer() *bytes.Buffer { + buf := d.documentBuffers.Get().(*bytes.Buffer) + buf.Reset() + return buf +} + func (d *Decoder) readField(name string, offset int64, doc *Document) error { readId := false switch name { @@ -258,10 +285,14 @@ func (d *Decoder) readField(name string, offset int64, doc *Document) error { } d.fieldsEnd = d.dec.InputOffset() fields := d.buf.Next(int(d.fieldsEnd - fieldsStart)) - doc.Body = make([]byte, 0, len(fieldsPrefix)+len(fields)+len(fieldsSuffix)) - doc.Body = append(doc.Body, fieldsPrefix...) - doc.Body = append(doc.Body, fields...) - doc.Body = append(doc.Body, fieldsSuffix...) + // Try to re-use buffers holding the document body. The buffer is released by document.Reset() + bodyBuf := d.buffer() + bodyBuf.Grow(len(fieldsPrefix) + len(fields) + len(fieldsSuffix)) + bodyBuf.Write(fieldsPrefix) + bodyBuf.Write(fields) + bodyBuf.Write(fieldsSuffix) + doc.Body = bodyBuf.Bytes() + doc.resetFunc = func() { d.documentBuffers.Put(bodyBuf) } } if readId { s, err := d.readString() @@ -322,6 +353,7 @@ loop: func NewDecoder(r io.Reader) *Decoder { br := bufio.NewReaderSize(r, 1<<26) d := &Decoder{} + d.documentBuffers.New = func() any { return &bytes.Buffer{} } d.dec = json.NewDecoder(io.TeeReader(br, &d.buf)) return d } diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go index fbaa076ab9d..f9bf321f1fb 100644 --- a/client/go/internal/vespa/document/document_test.go +++ b/client/go/internal/vespa/document/document_test.go @@ -3,7 +3,6 @@ package document import ( "fmt" "io" - "reflect" "strings" "testing" "time" @@ -147,14 +146,14 @@ func feedInput(jsonl bool) string { func testDocumentDecoder(t *testing.T, jsonLike string) { t.Helper() dec := NewDecoder(strings.NewReader(jsonLike)) - want := []Document{ + docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{ "foo" : "123", "bar": {"a": [1, 2, 3]}}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Body: []byte(`{"fields":{"bar": "456"}}`)}, {Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove}, {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Body: []byte(`{"fields":{"qux": "789"}}`)}, {Id: mustParseId("id:ns:type::doc5"), Operation: OperationRemove}, } - got := []Document{} + result := []Document{} for { doc, err := dec.Decode() if err == io.EOF { @@ -163,7 +162,7 @@ func testDocumentDecoder(t *testing.T, jsonLike string) { if err != nil { t.Fatal(err) } - got = append(got, doc) + result = append(result, doc) } wantBufLen := 0 if dec.array { @@ -172,8 +171,15 @@ func testDocumentDecoder(t *testing.T, jsonLike string) { if l := dec.buf.Len(); l != wantBufLen { t.Errorf("got dec.buf.Len() = %d, want %d", l, wantBufLen) } - if !reflect.DeepEqual(got, want) { - t.Errorf("got %+v, want %+v", got, want) + if len(docs) != len(result) { + t.Errorf("len(result) = %d, want %d", len(result), len(docs)) + } + for i := 0; i < len(docs); i++ { + got := result[i] + want := docs[i] + if !got.Equal(want) { + t.Errorf("got %+v, want %+v", got, want) + } } } |