aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-24 14:06:41 +0100
committerGitHub <noreply@github.com>2023-03-24 14:06:41 +0100
commit72c83e118e09f73ef8cd7b97c7c0ac63b9ba5ca1 (patch)
tree0633c1ad34207bf98df4fd1e42e3ea07ecaf10e1
parentaccf8cb44007d42959d403349a068c3c6d3fa8c2 (diff)
parent0814e1e59375f633704c315f1ec83eac9c50e7e7 (diff)
Merge pull request #26565 from vespa-engine/mpolden/feed-client-3
Dispatch documents with common ID in order
-rw-r--r--client/go/internal/vespa/document/dispatcher.go120
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go103
-rw-r--r--client/go/internal/vespa/document/document.go56
-rw-r--r--client/go/internal/vespa/document/document_test.go24
-rw-r--r--client/go/internal/vespa/document/feeder.go5
-rw-r--r--client/go/internal/vespa/document/feeder_test.go34
-rw-r--r--client/go/internal/vespa/document/http.go11
-rw-r--r--client/go/internal/vespa/document/http_test.go27
8 files changed, 283 insertions, 97 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index fa15a8a1223..feb562a241a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -5,59 +5,125 @@ import (
"sync"
)
+const maxAttempts = 10
+
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- concurrencyLevel int
- feeder Feeder
- pending chan Document
- closed bool
- mu sync.RWMutex
- wg sync.WaitGroup
+ workers int
+ feeder Feeder
+ ready chan Id
+ inflight map[string]*documentGroup
+ mu sync.RWMutex
+ wg sync.WaitGroup
+ closed bool
+}
+
+// documentGroup holds document operations which share their ID, and must be dispatched in order.
+type documentGroup struct {
+ id Id
+ failed bool
+ operations []documentOp
+ mu sync.Mutex
+}
+
+type documentOp struct {
+ document Document
+ attempts int
+}
+
+func (g *documentGroup) append(op documentOp) {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher {
- if concurrencyLevel < 1 {
- concurrencyLevel = 1
+func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
+ if workers < 1 {
+ workers = 1
}
d := &Dispatcher{
- concurrencyLevel: concurrencyLevel,
- feeder: feeder,
- pending: make(chan Document, 4*concurrencyLevel),
+ workers: workers,
+ feeder: feeder,
+ inflight: make(map[string]*documentGroup),
}
- d.readPending()
+ d.start()
return d
}
-func (d *Dispatcher) readPending() {
- for i := 0; i < d.concurrencyLevel; i++ {
+func (d *Dispatcher) dispatchAll(g *documentGroup) int {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ failCount := len(g.operations)
+ for i := 0; !g.failed && i < len(g.operations); i++ {
+ op := g.operations[i]
+ ok := false
+ for op.attempts <= maxAttempts && !ok {
+ op.attempts += 1
+ // TODO(mpolden): Extract function which does throttling/circuit-breaking
+ result := d.feeder.Send(op.document)
+ ok = result.Status.Success()
+ }
+ if ok {
+ failCount--
+ } else {
+ g.failed = true
+ failCount = len(g.operations) - i
+ }
+ }
+ g.operations = nil
+ return failCount
+}
+
+func (d *Dispatcher) start() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.closed = false
+ d.ready = make(chan Id, 4*d.workers)
+ for i := 0; i < d.workers; i++ {
d.wg.Add(1)
- go func(n int) {
+ go func() {
defer d.wg.Done()
- for doc := range d.pending {
- d.feeder.Send(doc)
+ for id := range d.ready {
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ if group != nil {
+ failedDocs := d.dispatchAll(group)
+ d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
+ }
}
- }(i)
+ }()
}
}
func (d *Dispatcher) Enqueue(doc Document) error {
- d.mu.RLock()
- defer d.mu.RUnlock()
+ d.mu.Lock()
+ defer d.mu.Unlock()
if d.closed {
- return fmt.Errorf("cannot enqueue document in closed dispatcher")
+ return fmt.Errorf("dispatcher is closed")
}
- d.pending <- doc
+ group, ok := d.inflight[doc.Id.String()]
+ if ok {
+ group.append(documentOp{document: doc})
+ } else {
+ group = &documentGroup{
+ id: doc.Id,
+ operations: []documentOp{{document: doc}},
+ }
+ d.inflight[doc.Id.String()] = group
+ }
+ d.ready <- doc.Id
return nil
}
-// Close closes the dispatcher and waits for all goroutines to return.
+// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
d.mu.Lock()
- defer d.mu.Unlock()
if !d.closed {
+ close(d.ready)
d.closed = true
- close(d.pending)
- d.wg.Wait()
}
+ d.mu.Unlock()
+ d.wg.Wait()
return nil
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 0e4876a7d4b..04e0928f2a3 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -1,31 +1,49 @@
package document
import (
- "encoding/json"
"sync"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
type mockFeeder struct {
- documents []Document
- mu sync.Mutex
+ failAfterNDocs int
+ documents []Document
+ stats Stats
+ mu sync.Mutex
+}
+
+func (f *mockFeeder) failAfterN(docs int) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.failAfterNDocs = docs
}
func (f *mockFeeder) Send(doc Document) Result {
f.mu.Lock()
defer f.mu.Unlock()
+ if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs {
+ return Result{Id: doc.Id, Status: StatusVespaFailure}
+ }
f.documents = append(f.documents, doc)
return Result{Id: doc.Id}
}
-func (f *mockFeeder) Stats() Stats { return Stats{} }
+func (f *mockFeeder) Stats() Stats { return f.stats }
+
+func (f *mockFeeder) AddStats(stats Stats) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.stats.Add(stats)
+}
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
dispatcher := NewDispatcher(feeder, 2)
docs := []Document{
- {PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)},
- {PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)},
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
}
for _, d := range docs {
dispatcher.Enqueue(d)
@@ -35,3 +53,76 @@ func TestDispatcher(t *testing.T) {
t.Errorf("got %d documents, want %d", got, want)
}
}
+
+func TestDispatcherOrdering(t *testing.T) {
+ feeder := &mockFeeder{}
+ commonId := mustParseId("id:ns:type::doc1")
+ docs := []Document{
+ {Id: commonId, Operation: OperationPut},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut},
+ {Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut},
+ {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut},
+ {Id: commonId, Operation: OperationUpdate},
+ {Id: mustParseId("id:ns:type::doc5"), Operation: OperationPut},
+ {Id: mustParseId("id:ns:type::doc6"), Operation: OperationPut},
+ {Id: commonId, Operation: OperationRemove},
+ {Id: mustParseId("id:ns:type::doc7"), Operation: OperationPut},
+ {Id: mustParseId("id:ns:type::doc8"), Operation: OperationPut},
+ {Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut},
+ }
+ dispatcher := NewDispatcher(feeder, len(docs))
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+
+ var wantDocs []Document
+ for _, d := range docs {
+ if d.Id.Equal(commonId) {
+ wantDocs = append(wantDocs, d)
+ }
+ }
+ var gotDocs []Document
+ for _, d := range feeder.documents {
+ if d.Id.Equal(commonId) {
+ gotDocs = append(gotDocs, d)
+ }
+ }
+ assert.Equal(t, len(docs), len(feeder.documents))
+ assert.Equal(t, wantDocs, gotDocs)
+ assert.Equal(t, int64(0), feeder.Stats().Errors)
+}
+
+func TestDispatcherOrderingWithFailures(t *testing.T) {
+ feeder := &mockFeeder{}
+ commonId := mustParseId("id:ns:type::doc1")
+ docs := []Document{
+ {Id: commonId, Operation: OperationPut},
+ {Id: commonId, Operation: OperationPut},
+ {Id: commonId, Operation: OperationUpdate}, // fails
+ {Id: commonId, Operation: OperationRemove}, // fails
+ }
+ feeder.failAfterN(2)
+ dispatcher := NewDispatcher(feeder, len(docs))
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+ wantDocs := docs[:2]
+ assert.Equal(t, wantDocs, feeder.documents)
+ assert.Equal(t, int64(2), feeder.Stats().Errors)
+
+ // Dispatching more documents for same ID fails implicitly
+ feeder.failAfterN(0)
+ dispatcher.start()
+ dispatcher.Enqueue(Document{Id: commonId, Operation: OperationPut})
+ dispatcher.Enqueue(Document{Id: commonId, Operation: OperationRemove})
+ // Other IDs are fine
+ doc2 := Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}
+ doc3 := Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}
+ dispatcher.Enqueue(doc2)
+ dispatcher.Enqueue(doc3)
+ dispatcher.Close()
+ assert.Equal(t, int64(4), feeder.Stats().Errors)
+ assert.Equal(t, 4, len(feeder.documents))
+}
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 6aeafd80005..98cb2d1b6c6 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -103,11 +103,16 @@ func ParseId(serialized string) (Id, error) {
}, nil
}
-// Document represents a Vespa document, and its operation.
+// Document represents a Vespa document operation.
type Document struct {
Id Id
Operation Operation
+ Condition string
+ Create bool
+ Body []byte
+}
+type jsonDocument struct {
IdString string `json:"id"`
PutId string `json:"put"`
UpdateId string `json:"update"`
@@ -117,16 +122,6 @@ type Document struct {
Fields json.RawMessage `json:"fields"`
}
-// Body returns the body part of this document, suitable for sending to the /document/v1 API.
-func (d Document) Body() []byte {
- jsonObject := `{"fields":`
- body := make([]byte, 0, len(jsonObject)+len(d.Fields)+1)
- body = append(body, []byte(jsonObject)...)
- body = append(body, d.Fields...)
- body = append(body, byte('}'))
- return body
-}
-
// Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline.
type Decoder struct {
buf *bufio.Reader
@@ -193,14 +188,11 @@ func (d *Decoder) decode() (Document, error) {
}
return Document{}, err
}
- doc := Document{}
+ doc := jsonDocument{}
if err := d.dec.Decode(&doc); err != nil {
return Document{}, err
}
- if err := parseDocument(&doc); err != nil {
- return Document{}, err
- }
- return doc, nil
+ return parseDocument(&doc)
}
func NewDecoder(r io.Reader) *Decoder {
@@ -211,29 +203,43 @@ func NewDecoder(r io.Reader) *Decoder {
}
}
-func parseDocument(d *Document) error {
+func parseDocument(d *jsonDocument) (Document, error) {
id := ""
+ var op Operation
if d.IdString != "" {
- d.Operation = OperationPut
+ op = OperationPut
id = d.IdString
} else if d.PutId != "" {
- d.Operation = OperationPut
+ op = OperationPut
id = d.PutId
} else if d.UpdateId != "" {
- d.Operation = OperationUpdate
+ op = OperationUpdate
id = d.UpdateId
} else if d.RemoveId != "" {
- d.Operation = OperationRemove
+ op = OperationRemove
id = d.RemoveId
} else {
- return fmt.Errorf("invalid document: missing operation: %v", d)
+ return Document{}, fmt.Errorf("invalid document: missing operation: %v", d)
}
docId, err := ParseId(id)
if err != nil {
- return err
+ return Document{}, err
}
- d.Id = docId
- return nil
+ var body []byte
+ if d.Fields != nil {
+ jsonObject := `{"fields":`
+ body = make([]byte, 0, len(jsonObject)+len(d.Fields)+1)
+ body = append(body, []byte(jsonObject)...)
+ body = append(body, d.Fields...)
+ body = append(body, byte('}'))
+ }
+ return Document{
+ Id: docId,
+ Operation: op,
+ Condition: d.Condition,
+ Create: d.Create,
+ Body: body,
+ }, nil
}
func parseError(value string) error {
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
index 478dd795dd8..111b9e37acc 100644
--- a/client/go/internal/vespa/document/document_test.go
+++ b/client/go/internal/vespa/document/document_test.go
@@ -1,8 +1,6 @@
package document
import (
- "bytes"
- "encoding/json"
"io"
"reflect"
"strings"
@@ -11,11 +9,12 @@ import (
func ptr[T any](v T) *T { return &v }
-func mustParseDocument(d Document) Document {
- if err := parseDocument(&d); err != nil {
+func mustParseId(s string) Id {
+ id, err := ParseId(s)
+ if err != nil {
panic(err)
}
- return d
+ return id
}
func TestParseId(t *testing.T) {
@@ -134,9 +133,9 @@ func testDocumentDecoder(t *testing.T, jsonLike string) {
t.Helper()
r := NewDecoder(strings.NewReader(jsonLike))
want := []Document{
- mustParseDocument(Document{PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}),
- mustParseDocument(Document{PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}),
- mustParseDocument(Document{RemoveId: "id:ns:type::doc1", Fields: json.RawMessage(nil)}),
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
+ {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
+ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationRemove},
}
got := []Document{}
for {
@@ -179,12 +178,3 @@ func TestDocumentDecoder(t *testing.T) {
t.Errorf("want error %q, got %q", wantErr, err.Error())
}
}
-
-func TestDocumentBody(t *testing.T) {
- doc := Document{Fields: json.RawMessage([]byte(`{"foo": "123"}`))}
- got := doc.Body()
- want := []byte(`{"fields":{"foo": "123"}}`)
- if !bytes.Equal(got, want) {
- t.Errorf("got %q, want %q", got, want)
- }
-}
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 732db051dab..6996e649d24 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -65,7 +65,7 @@ func (s Stats) Successes() int64 {
return s.ResponsesByCode[200]
}
-// Add adds all statistics contained in other to this.
+// Add all statistics contained in other to this.
func (s *Stats) Add(other Stats) {
s.Requests += other.Requests
s.Responses += other.Responses
@@ -80,7 +80,7 @@ func (s *Stats) Add(other Stats) {
s.Errors += other.Errors
s.Inflight += other.Inflight
s.TotalLatency += other.TotalLatency
- if s.MinLatency == 0 || other.MinLatency < s.MinLatency {
+ if s.MinLatency == 0 || (other.MinLatency > 0 && other.MinLatency < s.MinLatency) {
s.MinLatency = other.MinLatency
}
if other.MaxLatency > s.MaxLatency {
@@ -94,4 +94,5 @@ func (s *Stats) Add(other Stats) {
type Feeder interface {
Send(Document) Result
Stats() Stats
+ AddStats(Stats)
}
diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go
new file mode 100644
index 00000000000..1368d871436
--- /dev/null
+++ b/client/go/internal/vespa/document/feeder_test.go
@@ -0,0 +1,34 @@
+package document
+
+import (
+ "reflect"
+ "testing"
+ "time"
+)
+
+func TestStatsAdd(t *testing.T) {
+ got := NewStats()
+ got.Add(Stats{Requests: 1})
+ got.Add(Stats{Requests: 1})
+ got.Add(Stats{Responses: 1})
+ got.Add(Stats{Responses: 1})
+ got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}})
+ got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}})
+ got.Add(Stats{MinLatency: 200 * time.Millisecond})
+ got.Add(Stats{MaxLatency: 400 * time.Millisecond})
+ got.Add(Stats{MinLatency: 100 * time.Millisecond})
+ got.Add(Stats{MaxLatency: 500 * time.Millisecond})
+ got.Add(Stats{MaxLatency: 300 * time.Millisecond})
+ got.Add(Stats{})
+
+ want := Stats{
+ Requests: 2,
+ Responses: 2,
+ ResponsesByCode: map[int]int64{200: 4},
+ MinLatency: 100 * time.Millisecond,
+ MaxLatency: 500 * time.Millisecond,
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %+v, want %+v", got, want)
+ }
+}
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index e86ceb1ebc5..2e01d4564ab 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -123,15 +123,14 @@ func (c *Client) Send(document Document) Result {
stats.TotalLatency = latency
stats.MinLatency = latency
stats.MaxLatency = latency
- c.addStats(&stats)
+ c.AddStats(stats)
}()
method, url, err := c.feedURL(document, c.queryParams())
if err != nil {
stats.Errors = 1
return Result{Status: StatusError, Err: err}
}
- body := document.Body()
- req, err := http.NewRequest(method, url.String(), bytes.NewReader(body))
+ req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body))
if err != nil {
stats.Errors = 1
return Result{Status: StatusError, Err: err}
@@ -146,16 +145,16 @@ func (c *Client) Send(document Document) Result {
stats.ResponsesByCode = map[int]int64{
resp.StatusCode: 1,
}
- stats.BytesSent = int64(len(body))
+ stats.BytesSent = int64(len(document.Body))
return c.createResult(document.Id, &stats, resp)
}
func (c *Client) Stats() Stats { return c.stats }
-func (c *Client) addStats(stats *Stats) {
+func (c *Client) AddStats(stats Stats) {
c.mu.Lock()
defer c.mu.Unlock()
- c.stats.Add(*stats)
+ c.stats.Add(stats)
}
func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result {
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index ca59c4c2af0..f02c87730d5 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -2,7 +2,6 @@ package document
import (
"bytes"
- "encoding/json"
"fmt"
"io"
"net/http"
@@ -27,9 +26,9 @@ func (c *manualClock) now() time.Time {
func TestClientSend(t *testing.T) {
docs := []Document{
- mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}),
- mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc2", Fields: json.RawMessage(`{"foo": "456"}`)}),
- mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc3", Fields: json.RawMessage(`{"baz": "789"}`)}),
+ {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
+ {Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "456"}}`)},
+ {Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)},
}
httpClient := mock.HTTPClient{}
client := NewClient(ClientOptions{
@@ -60,7 +59,7 @@ func TestClientSend(t *testing.T) {
if err != nil {
t.Fatalf("got unexpected error %q", err)
}
- wantBody := doc.Body()
+ wantBody := doc.Body
if !bytes.Equal(body, wantBody) {
t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
}
@@ -149,25 +148,25 @@ func TestClientFeedURL(t *testing.T) {
url string
}{
{
- mustParseDocument(Document{
- IdString: "id:ns:type::user",
- }),
+ Document{Id: mustParseId("id:ns:type::user")},
"POST",
"https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr",
},
{
- mustParseDocument(Document{
- UpdateId: "id:ns:type::user",
+ Document{
+ Id: mustParseId("id:ns:type::user"),
+ Operation: OperationUpdate,
Create: true,
Condition: "false",
- }),
+ },
"PUT",
"https://example.com/document/v1/ns/type/docid/user?condition=false&create=true&foo=ba%2Fr",
},
{
- mustParseDocument(Document{
- RemoveId: "id:ns:type::user",
- }),
+ Document{
+ Id: mustParseId("id:ns:type::user"),
+ Operation: OperationRemove,
+ },
"DELETE",
"https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr",
},