aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-19 14:46:39 +0200
committerGitHub <noreply@github.com>2023-05-19 14:46:39 +0200
commite967fdfefa60f38e5664039808109807c23bc52c (patch)
tree039a4d8f4f7da3e6cd55c9eccea052e181951fd4
parenta1ed382bcd9966230516e5fff41517cabbf8d082 (diff)
parenta05086ede99a21d46557328b2507f582e9d1cedf (diff)
Merge pull request #27157 from vespa-engine/mpolden/shed-buffer
Always drop operations from the buffer
-rw-r--r--client/go/internal/vespa/document/document.go21
-rw-r--r--client/go/internal/vespa/document/document_test.go29
2 files changed, 35 insertions, 15 deletions
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 1e7e3af7f73..8f884b223d7 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -1,6 +1,7 @@
package document
import (
+ "bufio"
"bytes"
"fmt"
"io"
@@ -211,7 +212,7 @@ func (d *Decoder) Decode() (Document, error) {
return doc, err
}
-func (d *Decoder) readField(name string, doc *Document) error {
+func (d *Decoder) readField(name string, offset int64, doc *Document) error {
readId := false
switch name {
case "id", "put":
@@ -239,9 +240,9 @@ func (d *Decoder) readField(name string, doc *Document) error {
if _, err := d.readNext(jsonObjectStart); err != nil {
return err
}
- start := d.dec.InputOffset() - 1
- // Skip data between the most recent ending position of fields and current offset
- d.buf.Next(int(start - d.fieldsEnd))
+ // Skip data between start of operation and start of fields
+ fieldsStart := d.dec.InputOffset() - 1
+ d.buf.Next(int(fieldsStart - offset))
depth := 1
for depth > 0 {
t, err := d.dec.ReadToken()
@@ -256,7 +257,7 @@ func (d *Decoder) readField(name string, doc *Document) error {
}
}
d.fieldsEnd = d.dec.InputOffset()
- fields := d.buf.Next(int(d.fieldsEnd - start))
+ fields := d.buf.Next(int(d.fieldsEnd - fieldsStart))
doc.Body = make([]byte, 0, len(fieldsPrefix)+len(fields)+len(fieldsSuffix))
doc.Body = append(doc.Body, fieldsPrefix...)
doc.Body = append(doc.Body, fields...)
@@ -277,6 +278,7 @@ func (d *Decoder) readField(name string, doc *Document) error {
}
func (d *Decoder) decode() (Document, error) {
+ start := d.dec.InputOffset()
if err := d.guessMode(); err != nil {
return Document{}, err
}
@@ -300,13 +302,17 @@ loop:
if err != nil {
return Document{}, err
}
- if err := d.readField(t.String(), &doc); err != nil {
+ if err := d.readField(t.String(), start, &doc); err != nil {
return Document{}, err
}
default:
if _, err := d.readNext(jsonObjectEnd); err != nil {
return Document{}, err
}
+ // Drop operation from the buffer
+ start = max(start, d.fieldsEnd)
+ end := d.dec.InputOffset()
+ d.buf.Next(int(end - start))
break loop
}
}
@@ -314,8 +320,9 @@ loop:
}
func NewDecoder(r io.Reader) *Decoder {
+ br := bufio.NewReaderSize(r, 1<<26)
d := &Decoder{}
- d.dec = json.NewDecoder(io.TeeReader(r, &d.buf))
+ d.dec = json.NewDecoder(io.TeeReader(br, &d.buf))
return d
}
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
index f6713c4c0a1..fbaa076ab9d 100644
--- a/client/go/internal/vespa/document/document_test.go
+++ b/client/go/internal/vespa/document/document_test.go
@@ -116,7 +116,8 @@ func feedInput(jsonl bool) string {
"fields": { "foo" : "123", "bar": {"a": [1, 2, 3]}}
}`,
`
-{
+
+ {
"put": "id:ns:type::doc2",
"create": false,
"condition": "foo",
@@ -129,9 +130,13 @@ func feedInput(jsonl bool) string {
`,
`
{
+ "fields": {"qux": "789"},
"put": "id:ns:type::doc4",
- "create": true,
- "fields": {"qux": "789"}
+ "create": true
+}`,
+ `
+{
+ "remove": "id:ns:type::doc5"
}`}
if jsonl {
return strings.Join(operations, "\n")
@@ -141,16 +146,17 @@ func feedInput(jsonl bool) string {
func testDocumentDecoder(t *testing.T, jsonLike string) {
t.Helper()
- r := NewDecoder(strings.NewReader(jsonLike))
+ dec := NewDecoder(strings.NewReader(jsonLike))
want := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{ "foo" : "123", "bar": {"a": [1, 2, 3]}}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Condition: "foo", Body: []byte(`{"fields":{"bar": "456"}}`)},
{Id: mustParseId("id:ns:type::doc3"), Operation: OperationRemove},
{Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut, Create: true, Body: []byte(`{"fields":{"qux": "789"}}`)},
+ {Id: mustParseId("id:ns:type::doc5"), Operation: OperationRemove},
}
got := []Document{}
for {
- doc, err := r.Decode()
+ doc, err := dec.Decode()
if err == io.EOF {
break
}
@@ -159,6 +165,13 @@ func testDocumentDecoder(t *testing.T, jsonLike string) {
}
got = append(got, doc)
}
+ wantBufLen := 0
+ if dec.array {
+ wantBufLen = 1
+ }
+ if l := dec.buf.Len(); l != wantBufLen {
+ t.Errorf("got dec.buf.Len() = %d, want %d", l, wantBufLen)
+ }
if !reflect.DeepEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
}
@@ -179,12 +192,12 @@ func TestDocumentDecoderInvalid(t *testing.T) {
"fields": {"foo": "invalid
}
`
- r := NewDecoder(strings.NewReader(jsonLike))
- _, err := r.Decode() // first object is valid
+ dec := NewDecoder(strings.NewReader(jsonLike))
+ _, err := dec.Decode() // first object is valid
if err != nil {
t.Errorf("unexpected error: %s", err)
}
- _, err = r.Decode()
+ _, err = dec.Decode()
wantErr := "invalid json at byte offset 110: json: invalid character '\\n' within string (expecting non-control character)"
if err.Error() != wantErr {
t.Errorf("want error %q, got %q", wantErr, err.Error())