diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-22 11:10:49 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-23 17:57:33 +0100 |
commit | 5016289ac451c46299cb20542f96e2f97a7a2f1c (patch) | |
tree | 6d0b2c1f334c7e818a077b31933f2bf3df11d5e6 /client | |
parent | 8d8bdb5bd10182e2ccf0e9095c4bd6adc26b0635 (diff) |
Dispatch documents with common ID in order
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 120 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 98 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder.go | 3 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 6 |
4 files changed, 193 insertions, 34 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index fa15a8a1223..feb562a241a 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -5,59 +5,125 @@ import ( "sync" ) +const maxAttempts = 10 + // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { - concurrencyLevel int - feeder Feeder - pending chan Document - closed bool - mu sync.RWMutex - wg sync.WaitGroup + workers int + feeder Feeder + ready chan Id + inflight map[string]*documentGroup + mu sync.RWMutex + wg sync.WaitGroup + closed bool +} + +// documentGroup holds document operations which share their ID, and must be dispatched in order. +type documentGroup struct { + id Id + failed bool + operations []documentOp + mu sync.Mutex +} + +type documentOp struct { + document Document + attempts int +} + +func (g *documentGroup) append(op documentOp) { + g.mu.Lock() + defer g.mu.Unlock() + g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher { - if concurrencyLevel < 1 { - concurrencyLevel = 1 +func NewDispatcher(feeder Feeder, workers int) *Dispatcher { + if workers < 1 { + workers = 1 } d := &Dispatcher{ - concurrencyLevel: concurrencyLevel, - feeder: feeder, - pending: make(chan Document, 4*concurrencyLevel), + workers: workers, + feeder: feeder, + inflight: make(map[string]*documentGroup), } - d.readPending() + d.start() return d } -func (d *Dispatcher) readPending() { - for i := 0; i < d.concurrencyLevel; i++ { +func (d *Dispatcher) dispatchAll(g *documentGroup) int { + g.mu.Lock() + defer g.mu.Unlock() + failCount := len(g.operations) + for i := 0; !g.failed && i < len(g.operations); i++ { + op := g.operations[i] + ok := false + for op.attempts <= maxAttempts && !ok { + op.attempts += 1 + // TODO(mpolden): Extract function which does throttling/circuit-breaking + result := d.feeder.Send(op.document) + ok = result.Status.Success() + } + if ok { + failCount-- + } else { + g.failed = true + failCount = len(g.operations) - i + } + } + g.operations = nil + return failCount +} + +func (d *Dispatcher) start() { + d.mu.Lock() + defer d.mu.Unlock() + d.closed = false + d.ready = make(chan Id, 4*d.workers) + for i := 0; i < d.workers; i++ { d.wg.Add(1) - go func(n int) { + go func() { defer d.wg.Done() - for doc := range d.pending { - d.feeder.Send(doc) + for id := range d.ready { + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + if group != nil { + failedDocs := d.dispatchAll(group) + d.feeder.AddStats(Stats{Errors: int64(failedDocs)}) + } } - }(i) + }() } } func (d *Dispatcher) Enqueue(doc Document) error { - d.mu.RLock() - defer d.mu.RUnlock() + d.mu.Lock() + defer d.mu.Unlock() if d.closed { - return fmt.Errorf("cannot enqueue document in closed dispatcher") + return fmt.Errorf("dispatcher is closed") } - d.pending <- doc + group, ok := d.inflight[doc.Id.String()] + if ok { + group.append(documentOp{document: doc}) + } else { + group = &documentGroup{ + id: doc.Id, + operations: []documentOp{{document: doc}}, + } + d.inflight[doc.Id.String()] = group + } + d.ready <- doc.Id return nil } -// Close closes the dispatcher and waits for all goroutines to return. +// Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { d.mu.Lock() - defer d.mu.Unlock() if !d.closed { + close(d.ready) d.closed = true - close(d.pending) - d.wg.Wait() } + d.mu.Unlock() + d.wg.Wait() return nil } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 0e4876a7d4b..384097670e1 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -4,21 +4,40 @@ import ( "encoding/json" "sync" "testing" + + "github.com/stretchr/testify/assert" ) type mockFeeder struct { - documents []Document - mu sync.Mutex + failAfterNDocs int + documents []Document + stats Stats + mu sync.Mutex +} + +func (f *mockFeeder) failAfterN(docs int) { + f.mu.Lock() + defer f.mu.Unlock() + f.failAfterNDocs = docs } func (f *mockFeeder) Send(doc Document) Result { f.mu.Lock() defer f.mu.Unlock() + if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs { + return Result{Id: doc.Id, Status: StatusVespaFailure} + } f.documents = append(f.documents, doc) return Result{Id: doc.Id} } -func (f *mockFeeder) Stats() Stats { return Stats{} } +func (f *mockFeeder) Stats() Stats { return f.stats } + +func (f *mockFeeder) AddStats(stats Stats) { + f.mu.Lock() + defer f.mu.Unlock() + f.stats.Add(stats) +} func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} @@ -35,3 +54,76 @@ func TestDispatcher(t *testing.T) { t.Errorf("got %d documents, want %d", got, want) } } + +func TestDispatcherOrdering(t *testing.T) { + feeder := &mockFeeder{} + commonId := "id:ns:type::doc1" + docs := []Document{ + mustParseDocument(Document{PutId: commonId}), + mustParseDocument(Document{PutId: "id:ns:type::doc2"}), + mustParseDocument(Document{PutId: "id:ns:type::doc3"}), + mustParseDocument(Document{PutId: "id:ns:type::doc4"}), + mustParseDocument(Document{UpdateId: commonId}), + mustParseDocument(Document{PutId: "id:ns:type::doc5"}), + mustParseDocument(Document{PutId: "id:ns:type::doc6"}), + mustParseDocument(Document{RemoveId: commonId}), + mustParseDocument(Document{PutId: "id:ns:type::doc7"}), + mustParseDocument(Document{PutId: "id:ns:type::doc8"}), + mustParseDocument(Document{PutId: "id:ns:type::doc9"}), + } + dispatcher := NewDispatcher(feeder, len(docs)) + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + + var wantDocs []Document + for _, d := range docs { + if d.Id.String() == commonId { + wantDocs = append(wantDocs, d) + } + } + var gotDocs []Document + for _, d := range feeder.documents { + if d.Id.String() == commonId { + gotDocs = append(gotDocs, d) + } + } + assert.Equal(t, len(docs), len(feeder.documents)) + assert.Equal(t, wantDocs, gotDocs) + assert.Equal(t, int64(0), feeder.Stats().Errors) +} + +func TestDispatcherOrderingWithFailures(t *testing.T) { + feeder := &mockFeeder{} + commonId := "id:ns:type::doc1" + docs := []Document{ + mustParseDocument(Document{PutId: commonId}), + mustParseDocument(Document{PutId: commonId}), + mustParseDocument(Document{UpdateId: commonId}), // fails + mustParseDocument(Document{RemoveId: commonId}), // fails + } + feeder.failAfterN(2) + dispatcher := NewDispatcher(feeder, len(docs)) + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + wantDocs := docs[:2] + assert.Equal(t, wantDocs, feeder.documents) + assert.Equal(t, int64(2), feeder.Stats().Errors) + + // Dispatching more documents for same ID fails implicitly + feeder.failAfterN(0) + dispatcher.start() + dispatcher.Enqueue(mustParseDocument(Document{PutId: commonId})) + dispatcher.Enqueue(mustParseDocument(Document{RemoveId: commonId})) + // Other IDs are fine + doc2 := mustParseDocument(Document{PutId: "id:ns:type::doc2"}) + doc3 := mustParseDocument(Document{PutId: "id:ns:type::doc3"}) + dispatcher.Enqueue(doc2) + dispatcher.Enqueue(doc3) + dispatcher.Close() + assert.Equal(t, int64(4), feeder.Stats().Errors) + assert.Equal(t, 4, len(feeder.documents)) +} diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 732db051dab..e7e891a3975 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -65,7 +65,7 @@ func (s Stats) Successes() int64 { return s.ResponsesByCode[200] } -// Add adds all statistics contained in other to this. +// Add all statistics contained in other to this. func (s *Stats) Add(other Stats) { s.Requests += other.Requests s.Responses += other.Responses @@ -94,4 +94,5 @@ func (s *Stats) Add(other Stats) { type Feeder interface { Send(Document) Result Stats() Stats + AddStats(Stats) } diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index e86ceb1ebc5..981bad5140d 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -123,7 +123,7 @@ func (c *Client) Send(document Document) Result { stats.TotalLatency = latency stats.MinLatency = latency stats.MaxLatency = latency - c.addStats(&stats) + c.AddStats(stats) }() method, url, err := c.feedURL(document, c.queryParams()) if err != nil { @@ -152,10 +152,10 @@ func (c *Client) Send(document Document) Result { func (c *Client) Stats() Stats { return c.stats } -func (c *Client) addStats(stats *Stats) { +func (c *Client) AddStats(stats Stats) { c.mu.Lock() defer c.mu.Unlock() - c.stats.Add(*stats) + c.stats.Add(stats) } func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result { |