diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-24 14:06:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-24 14:06:41 +0100 |
commit | 72c83e118e09f73ef8cd7b97c7c0ac63b9ba5ca1 (patch) | |
tree | 0633c1ad34207bf98df4fd1e42e3ea07ecaf10e1 | |
parent | accf8cb44007d42959d403349a068c3c6d3fa8c2 (diff) | |
parent | 0814e1e59375f633704c315f1ec83eac9c50e7e7 (diff) |
Merge pull request #26565 from vespa-engine/mpolden/feed-client-3
Dispatch documents with common ID in order
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 120 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 103 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document.go | 56 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document_test.go | 24 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder.go | 5 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder_test.go | 34 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 11 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 27 |
8 files changed, 283 insertions, 97 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index fa15a8a1223..feb562a241a 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -5,59 +5,125 @@ import ( "sync" ) +const maxAttempts = 10 + // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { - concurrencyLevel int - feeder Feeder - pending chan Document - closed bool - mu sync.RWMutex - wg sync.WaitGroup + workers int + feeder Feeder + ready chan Id + inflight map[string]*documentGroup + mu sync.RWMutex + wg sync.WaitGroup + closed bool +} + +// documentGroup holds document operations which share their ID, and must be dispatched in order. +type documentGroup struct { + id Id + failed bool + operations []documentOp + mu sync.Mutex +} + +type documentOp struct { + document Document + attempts int +} + +func (g *documentGroup) append(op documentOp) { + g.mu.Lock() + defer g.mu.Unlock() + g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher { - if concurrencyLevel < 1 { - concurrencyLevel = 1 +func NewDispatcher(feeder Feeder, workers int) *Dispatcher { + if workers < 1 { + workers = 1 } d := &Dispatcher{ - concurrencyLevel: concurrencyLevel, - feeder: feeder, - pending: make(chan Document, 4*concurrencyLevel), + workers: workers, + feeder: feeder, + inflight: make(map[string]*documentGroup), } - d.readPending() + d.start() return d } -func (d *Dispatcher) readPending() { - for i := 0; i < d.concurrencyLevel; i++ { +func (d *Dispatcher) dispatchAll(g *documentGroup) int { + g.mu.Lock() + defer g.mu.Unlock() + failCount := len(g.operations) + for i := 0; !g.failed && i < len(g.operations); i++ { + op := g.operations[i] + ok := false + for op.attempts <= maxAttempts && !ok { + op.attempts += 1 + // TODO(mpolden): Extract function which does throttling/circuit-breaking + result := d.feeder.Send(op.document) + ok = result.Status.Success() + } + if ok { + failCount-- + } else { + g.failed = true + failCount = len(g.operations) - i + } + } + g.operations = nil + return failCount +} + +func (d *Dispatcher) start() { + d.mu.Lock() + defer d.mu.Unlock() + d.closed = false + d.ready = make(chan Id, 4*d.workers) + for i := 0; i < d.workers; i++ { d.wg.Add(1) - go func(n int) { + go func() { defer d.wg.Done() - for doc := range d.pending { - d.feeder.Send(doc) + for id := range d.ready { + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + if group != nil { + failedDocs := d.dispatchAll(group) + d.feeder.AddStats(Stats{Errors: int64(failedDocs)}) + } } - }(i) + }() } } func (d *Dispatcher) Enqueue(doc Document) error { - d.mu.RLock() - defer d.mu.RUnlock() + d.mu.Lock() + defer d.mu.Unlock() if d.closed { - return fmt.Errorf("cannot enqueue document in closed dispatcher") + return fmt.Errorf("dispatcher is closed") } - d.pending <- doc + group, ok := d.inflight[doc.Id.String()] + if ok { + group.append(documentOp{document: doc}) + } else { + group = &documentGroup{ + id: doc.Id, + operations: []documentOp{{document: doc}}, + } + d.inflight[doc.Id.String()] = group + } + d.ready <- doc.Id return nil } -// Close closes the dispatcher and waits for all goroutines to return. +// Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { d.mu.Lock() - defer d.mu.Unlock() if !d.closed { + close(d.ready) d.closed = true - close(d.pending) - d.wg.Wait() } + d.mu.Unlock() + d.wg.Wait() return nil } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 0e4876a7d4b..04e0928f2a3 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -1,31 +1,49 @@ package document import ( - "encoding/json" "sync" "testing" + + "github.com/stretchr/testify/assert" ) type mockFeeder struct { - documents []Document - mu sync.Mutex + failAfterNDocs int + documents []Document + stats Stats + mu sync.Mutex +} + +func (f *mockFeeder) failAfterN(docs int) { + f.mu.Lock() + defer f.mu.Unlock() + f.failAfterNDocs = docs } func (f *mockFeeder) Send(doc Document) Result { f.mu.Lock() defer f.mu.Unlock() + if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs { + return Result{Id: doc.Id, Status: StatusVespaFailure} + } f.documents = append(f.documents, doc) return Result{Id: doc.Id} } -func (f *mockFeeder) Stats() Stats { return Stats{} } +func (f *mockFeeder) Stats() Stats { return f.stats } + +func (f *mockFeeder) AddStats(stats Stats) { + f.mu.Lock() + defer f.mu.Unlock() + f.stats.Add(stats) +} func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} dispatcher := NewDispatcher(feeder, 2) docs := []Document{ - {PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}, - {PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}, + {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, + {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, } for _, d := range docs { dispatcher.Enqueue(d) @@ -35,3 +53,76 @@ func TestDispatcher(t *testing.T) { t.Errorf("got %d documents, want %d", got, want) } } + +func TestDispatcherOrdering(t *testing.T) { + feeder := &mockFeeder{} + commonId := mustParseId("id:ns:type::doc1") + docs := []Document{ + {Id: commonId, Operation: OperationPut}, + {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}, + {Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}, + {Id: mustParseId("id:ns:type::doc4"), Operation: OperationPut}, + {Id: commonId, Operation: OperationUpdate}, + {Id: mustParseId("id:ns:type::doc5"), Operation: OperationPut}, + {Id: mustParseId("id:ns:type::doc6"), Operation: OperationPut}, + {Id: commonId, Operation: OperationRemove}, + {Id: mustParseId("id:ns:type::doc7"), Operation: OperationPut}, + {Id: mustParseId("id:ns:type::doc8"), Operation: OperationPut}, + {Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut}, + } + dispatcher := NewDispatcher(feeder, len(docs)) + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + + var wantDocs []Document + for _, d := range docs { + if d.Id.Equal(commonId) { + wantDocs = append(wantDocs, d) + } + } + var gotDocs []Document + for _, d := range feeder.documents { + if d.Id.Equal(commonId) { + gotDocs = append(gotDocs, d) + } + } + assert.Equal(t, len(docs), len(feeder.documents)) + assert.Equal(t, wantDocs, gotDocs) + assert.Equal(t, int64(0), feeder.Stats().Errors) +} + +func TestDispatcherOrderingWithFailures(t *testing.T) { + feeder := &mockFeeder{} + commonId := mustParseId("id:ns:type::doc1") + docs := []Document{ + {Id: commonId, Operation: OperationPut}, + {Id: commonId, Operation: OperationPut}, + {Id: commonId, Operation: OperationUpdate}, // fails + {Id: commonId, Operation: OperationRemove}, // fails + } + feeder.failAfterN(2) + dispatcher := NewDispatcher(feeder, len(docs)) + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + wantDocs := docs[:2] + assert.Equal(t, wantDocs, feeder.documents) + assert.Equal(t, int64(2), feeder.Stats().Errors) + + // Dispatching more documents for same ID fails implicitly + feeder.failAfterN(0) + dispatcher.start() + dispatcher.Enqueue(Document{Id: commonId, Operation: OperationPut}) + dispatcher.Enqueue(Document{Id: commonId, Operation: OperationRemove}) + // Other IDs are fine + doc2 := Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut} + doc3 := Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut} + dispatcher.Enqueue(doc2) + dispatcher.Enqueue(doc3) + dispatcher.Close() + assert.Equal(t, int64(4), feeder.Stats().Errors) + assert.Equal(t, 4, len(feeder.documents)) +} diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 6aeafd80005..98cb2d1b6c6 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -103,11 +103,16 @@ func ParseId(serialized string) (Id, error) { }, nil } -// Document represents a Vespa document, and its operation. +// Document represents a Vespa document operation. type Document struct { Id Id Operation Operation + Condition string + Create bool + Body []byte +} +type jsonDocument struct { IdString string `json:"id"` PutId string `json:"put"` UpdateId string `json:"update"` @@ -117,16 +122,6 @@ type Document struct { Fields json.RawMessage `json:"fields"` } -// Body returns the body part of this document, suitable for sending to the /document/v1 API. -func (d Document) Body() []byte { - jsonObject := `{"fields":` - body := make([]byte, 0, len(jsonObject)+len(d.Fields)+1) - body = append(body, []byte(jsonObject)...) - body = append(body, d.Fields...) - body = append(body, byte('}')) - return body -} - // Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline. type Decoder struct { buf *bufio.Reader @@ -193,14 +188,11 @@ func (d *Decoder) decode() (Document, error) { } return Document{}, err } - doc := Document{} + doc := jsonDocument{} if err := d.dec.Decode(&doc); err != nil { return Document{}, err } - if err := parseDocument(&doc); err != nil { - return Document{}, err - } - return doc, nil + return parseDocument(&doc) } func NewDecoder(r io.Reader) *Decoder { @@ -211,29 +203,43 @@ func NewDecoder(r io.Reader) *Decoder { } } -func parseDocument(d *Document) error { +func parseDocument(d *jsonDocument) (Document, error) { id := "" + var op Operation if d.IdString != "" { - d.Operation = OperationPut + op = OperationPut id = d.IdString } else if d.PutId != "" { - d.Operation = OperationPut + op = OperationPut id = d.PutId } else if d.UpdateId != "" { - d.Operation = OperationUpdate + op = OperationUpdate id = d.UpdateId } else if d.RemoveId != "" { - d.Operation = OperationRemove + op = OperationRemove id = d.RemoveId } else { - return fmt.Errorf("invalid document: missing operation: %v", d) + return Document{}, fmt.Errorf("invalid document: missing operation: %v", d) } docId, err := ParseId(id) if err != nil { - return err + return Document{}, err } - d.Id = docId - return nil + var body []byte + if d.Fields != nil { + jsonObject := `{"fields":` + body = make([]byte, 0, len(jsonObject)+len(d.Fields)+1) + body = append(body, []byte(jsonObject)...) + body = append(body, d.Fields...) + body = append(body, byte('}')) + } + return Document{ + Id: docId, + Operation: op, + Condition: d.Condition, + Create: d.Create, + Body: body, + }, nil } func parseError(value string) error { diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go index 478dd795dd8..111b9e37acc 100644 --- a/client/go/internal/vespa/document/document_test.go +++ b/client/go/internal/vespa/document/document_test.go @@ -1,8 +1,6 @@ package document import ( - "bytes" - "encoding/json" "io" "reflect" "strings" @@ -11,11 +9,12 @@ import ( func ptr[T any](v T) *T { return &v } -func mustParseDocument(d Document) Document { - if err := parseDocument(&d); err != nil { +func mustParseId(s string) Id { + id, err := ParseId(s) + if err != nil { panic(err) } - return d + return id } func TestParseId(t *testing.T) { @@ -134,9 +133,9 @@ func testDocumentDecoder(t *testing.T, jsonLike string) { t.Helper() r := NewDecoder(strings.NewReader(jsonLike)) want := []Document{ - mustParseDocument(Document{PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}), - mustParseDocument(Document{PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}), - mustParseDocument(Document{RemoveId: "id:ns:type::doc1", Fields: json.RawMessage(nil)}), + {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, + {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, + {Id: mustParseId("id:ns:type::doc1"), Operation: OperationRemove}, } got := []Document{} for { @@ -179,12 +178,3 @@ func TestDocumentDecoder(t *testing.T) { t.Errorf("want error %q, got %q", wantErr, err.Error()) } } - -func TestDocumentBody(t *testing.T) { - doc := Document{Fields: json.RawMessage([]byte(`{"foo": "123"}`))} - got := doc.Body() - want := []byte(`{"fields":{"foo": "123"}}`) - if !bytes.Equal(got, want) { - t.Errorf("got %q, want %q", got, want) - } -} diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 732db051dab..6996e649d24 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -65,7 +65,7 @@ func (s Stats) Successes() int64 { return s.ResponsesByCode[200] } -// Add adds all statistics contained in other to this. +// Add all statistics contained in other to this. func (s *Stats) Add(other Stats) { s.Requests += other.Requests s.Responses += other.Responses @@ -80,7 +80,7 @@ func (s *Stats) Add(other Stats) { s.Errors += other.Errors s.Inflight += other.Inflight s.TotalLatency += other.TotalLatency - if s.MinLatency == 0 || other.MinLatency < s.MinLatency { + if s.MinLatency == 0 || (other.MinLatency > 0 && other.MinLatency < s.MinLatency) { s.MinLatency = other.MinLatency } if other.MaxLatency > s.MaxLatency { @@ -94,4 +94,5 @@ func (s *Stats) Add(other Stats) { type Feeder interface { Send(Document) Result Stats() Stats + AddStats(Stats) } diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go new file mode 100644 index 00000000000..1368d871436 --- /dev/null +++ b/client/go/internal/vespa/document/feeder_test.go @@ -0,0 +1,34 @@ +package document + +import ( + "reflect" + "testing" + "time" +) + +func TestStatsAdd(t *testing.T) { + got := NewStats() + got.Add(Stats{Requests: 1}) + got.Add(Stats{Requests: 1}) + got.Add(Stats{Responses: 1}) + got.Add(Stats{Responses: 1}) + got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}}) + got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}}) + got.Add(Stats{MinLatency: 200 * time.Millisecond}) + got.Add(Stats{MaxLatency: 400 * time.Millisecond}) + got.Add(Stats{MinLatency: 100 * time.Millisecond}) + got.Add(Stats{MaxLatency: 500 * time.Millisecond}) + got.Add(Stats{MaxLatency: 300 * time.Millisecond}) + got.Add(Stats{}) + + want := Stats{ + Requests: 2, + Responses: 2, + ResponsesByCode: map[int]int64{200: 4}, + MinLatency: 100 * time.Millisecond, + MaxLatency: 500 * time.Millisecond, + } + if !reflect.DeepEqual(got, want) { + t.Errorf("got %+v, want %+v", got, want) + } +} diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index e86ceb1ebc5..2e01d4564ab 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -123,15 +123,14 @@ func (c *Client) Send(document Document) Result { stats.TotalLatency = latency stats.MinLatency = latency stats.MaxLatency = latency - c.addStats(&stats) + c.AddStats(stats) }() method, url, err := c.feedURL(document, c.queryParams()) if err != nil { stats.Errors = 1 return Result{Status: StatusError, Err: err} } - body := document.Body() - req, err := http.NewRequest(method, url.String(), bytes.NewReader(body)) + req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body)) if err != nil { stats.Errors = 1 return Result{Status: StatusError, Err: err} @@ -146,16 +145,16 @@ func (c *Client) Send(document Document) Result { stats.ResponsesByCode = map[int]int64{ resp.StatusCode: 1, } - stats.BytesSent = int64(len(body)) + stats.BytesSent = int64(len(document.Body)) return c.createResult(document.Id, &stats, resp) } func (c *Client) Stats() Stats { return c.stats } -func (c *Client) addStats(stats *Stats) { +func (c *Client) AddStats(stats Stats) { c.mu.Lock() defer c.mu.Unlock() - c.stats.Add(*stats) + c.stats.Add(stats) } func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result { diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index ca59c4c2af0..f02c87730d5 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -2,7 +2,6 @@ package document import ( "bytes" - "encoding/json" "fmt" "io" "net/http" @@ -27,9 +26,9 @@ func (c *manualClock) now() time.Time { func TestClientSend(t *testing.T) { docs := []Document{ - mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}), - mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc2", Fields: json.RawMessage(`{"foo": "456"}`)}), - mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc3", Fields: json.RawMessage(`{"baz": "789"}`)}), + {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)}, + {Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "456"}}`)}, + {Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)}, } httpClient := mock.HTTPClient{} client := NewClient(ClientOptions{ @@ -60,7 +59,7 @@ func TestClientSend(t *testing.T) { if err != nil { t.Fatalf("got unexpected error %q", err) } - wantBody := doc.Body() + wantBody := doc.Body if !bytes.Equal(body, wantBody) { t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody)) } @@ -149,25 +148,25 @@ func TestClientFeedURL(t *testing.T) { url string }{ { - mustParseDocument(Document{ - IdString: "id:ns:type::user", - }), + Document{Id: mustParseId("id:ns:type::user")}, "POST", "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr", }, { - mustParseDocument(Document{ - UpdateId: "id:ns:type::user", + Document{ + Id: mustParseId("id:ns:type::user"), + Operation: OperationUpdate, Create: true, Condition: "false", - }), + }, "PUT", "https://example.com/document/v1/ns/type/docid/user?condition=false&create=true&foo=ba%2Fr", }, { - mustParseDocument(Document{ - RemoveId: "id:ns:type::user", - }), + Document{ + Id: mustParseId("id:ns:type::user"), + Operation: OperationRemove, + }, "DELETE", "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr", }, |