diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-15 15:07:25 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-23 12:13:16 +0100 |
commit | 1f568435f5111da3eb19d6be2e9be7d89c1d60dc (patch) | |
tree | 32e7e85c420f383c161ab1d88af3b4cb8549ca1f | |
parent | 98d07af6a96a3f58762ee3e64550c399fb917ee4 (diff) |
Implement document decoder
-rw-r--r-- | client/go/internal/vespa/feed/document.go | 112 | ||||
-rw-r--r-- | client/go/internal/vespa/feed/document_test.go | 56 |
2 files changed, 168 insertions, 0 deletions
diff --git a/client/go/internal/vespa/feed/document.go b/client/go/internal/vespa/feed/document.go index 363620fc863..eccbb80b7c2 100644 --- a/client/go/internal/vespa/feed/document.go +++ b/client/go/internal/vespa/feed/document.go @@ -1,11 +1,123 @@ package feed import ( + "bufio" + "encoding/json" "fmt" + "io" "strconv" "strings" ) +var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} + +type Document struct { + Id string `json:"id"` + PutId string `json:"put"` + UpdateId string `json:"update"` + RemoveId string `json:"remove"` + Condition string `json:"condition"` + Create bool `json:"create"` + Fields json.RawMessage `json:"fields"` +} + +// Operation returns the operator to perform and the target document ID of the operation. +func (d Document) Operation() (string, DocumentId, error) { + operation := "" + id := "" + if id != "" { + operation = "put" + id = d.Id + } else if d.PutId != "" { + operation = "put" + id = d.PutId + } else if d.UpdateId != "" { + operation = "update" + id = d.UpdateId + } else if d.RemoveId != "" { + operation = "remove" + id = d.RemoveId + } else { + return "", DocumentId{}, fmt.Errorf("invalid document: missing operation") + } + docId, err := ParseDocumentId(id) + if err != nil { + return "", DocumentId{}, err + } + return operation, docId, err +} + +// Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline. 
+type Decoder struct { + buf *bufio.Reader + dec *json.Decoder + array bool + jsonl bool +} + +func (d *Decoder) guessMode() error { + for !d.array && !d.jsonl { + b, err := d.buf.ReadByte() + if err != nil { + return err + } + // Skip leading whitespace + if b < 0x80 && asciiSpace[b] != 0 { + continue + } + switch rune(b) { + case '{': + d.jsonl = true + case '[': + d.array = true + default: + return fmt.Errorf("unexpected token: %q", string(b)) + } + if err := d.buf.UnreadByte(); err != nil { + return err + } + if d.array { + // prepare for decoding objects inside array + if _, err := d.dec.Token(); err != nil { + return err + } + } + } + return nil +} + +func (d *Decoder) readCloseToken() error { + if !d.array { + return nil + } + _, err := d.dec.Token() + return err +} + +func (d *Decoder) Decode() (Document, error) { + if err := d.guessMode(); err != nil { + return Document{}, err + } + if !d.dec.More() { + err := io.EOF + if tokenErr := d.readCloseToken(); tokenErr != nil { + err = tokenErr + } + return Document{}, err + } + doc := Document{} + err := d.dec.Decode(&doc) + return doc, err +} + +func NewDecoder(r io.Reader) *Decoder { + buf := bufio.NewReader(r) + return &Decoder{ + buf: buf, + dec: json.NewDecoder(buf), + } +} + // A Vespa document ID. 
// Diff context, truncated in this view: type DocumentId struct { Type string ...
//
// File: client/go/internal/vespa/feed/document_test.go (package feed).
// Imports used by this file: encoding/json, io, reflect, strings, testing.

// feedInput returns a feed of three operations (two puts and one remove).
// With jsonl set the operations are joined by newlines; otherwise they are
// joined by commas and wrapped in a JSON array that is preceded by leading
// whitespace.
func feedInput(jsonl bool) string {
	operations := []string{
		`
{
  "put": "id:ns:type::doc1",
  "fields": {"foo": "123"}
}`,
		`
{
  "put": "id:ns:type::doc2",
  "fields": {"bar": "456"}
}`,
		`
{
  "remove": "id:ns:type::doc1"
}
`}
	if jsonl {
		return strings.Join(operations, "\n")
	}
	return " \n[" + strings.Join(operations, ",") + "]"
}

// testDocumentDecoder decodes jsonLike with a Decoder until io.EOF and checks
// that the decoded documents equal the three operations produced by feedInput.
// Any other decoding error fails the test immediately.
func testDocumentDecoder(t *testing.T, jsonLike string) {
	t.Helper()
	r := NewDecoder(strings.NewReader(jsonLike))
	want := []Document{
		{PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)},
		{PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)},
		{RemoveId: "id:ns:type::doc1", Fields: json.RawMessage(nil)},
	}
	got := []Document{}
	for {
		doc, err := r.Decode()
		// io.EOF marks the end of the feed, not a failure.
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		got = append(got, doc)
	}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("got %+v, want %+v", got, want)
	}
}

// TestDocumentDecoder runs the decoder against both input shapes: a JSON array
// of operations and newline-separated operations.
func TestDocumentDecoder(t *testing.T) {
	testDocumentDecoder(t, feedInput(false))
	testDocumentDecoder(t, feedInput(true))
}