diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-26 15:24:28 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-26 15:24:28 +0200 |
commit | f5bc53a2187a0c04e5ebfabff0f27aeb1db1e6d1 (patch) | |
tree | 6a8c0d9939cf121e292bfef697059c3dc331c38d | |
parent | 49aba5432a2965b9ab4792e4b38445f6f3289074 (diff) | |
parent | fcccffa0124d85b36e3611ec554efb37eb67324e (diff) |
Merge pull request #26865 from vespa-engine/mpolden/feed-client-15
Acquire slot per document ID
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 163 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 49 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder.go | 2 |
3 files changed, 139 insertions, 75 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index b87dfaf55eb..78f8455de9a 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -20,44 +20,41 @@ type Dispatcher struct { stats Stats started bool - results chan Result + ready chan documentOp + results chan documentOp msgs chan string - inflight map[string]*documentGroup + inflight map[string]*Queue[documentOp] inflightCount int64 output io.Writer verbose bool - listPool sync.Pool - mu sync.RWMutex - workerWg sync.WaitGroup - resultWg sync.WaitGroup + listPool sync.Pool + mu sync.Mutex + wg sync.WaitGroup + inflightWg sync.WaitGroup } // documentOp represents a document operation and the number of times it has been attempted. type documentOp struct { document Document + result Result attempts int } -// documentGroup holds document operations which share an ID, and must be dispatched in order. -type documentGroup struct { - q *Queue[documentOp] - mu sync.Mutex +func (op documentOp) resetResult() documentOp { + op.result = Result{} + return op } -func (g *documentGroup) add(op documentOp, first bool) { - g.mu.Lock() - defer g.mu.Unlock() - g.q.Add(op, first) -} +func (op documentOp) complete() bool { return op.result.Success() || op.attempts > maxAttempts } func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher { d := &Dispatcher{ feeder: feeder, throttler: throttler, circuitBreaker: breaker, - inflight: make(map[string]*documentGroup), + inflight: make(map[string]*Queue[documentOp]), output: output, verbose: verbose, } @@ -65,24 +62,8 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o return d } -func (d *Dispatcher) sendDocumentIn(group *documentGroup) { - group.mu.Lock() - op, ok := group.q.Poll() - if !ok { - panic("sending from empty document group, this should not happen") - } - op.attempts++ - result := d.feeder.Send(op.document) - d.results <- result - d.releaseSlot() - group.mu.Unlock() - if d.shouldRetry(op, result) { - d.enqueue(op) - } -} - func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { - if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { + if result.Success() { if d.verbose { d.msgs <- fmt.Sprintf("feed: successfully fed %s with status %d", op.document.Id, result.HTTPStatus) } @@ -91,7 +72,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { - d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) + d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus) d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) return true } @@ -127,60 +108,107 @@ func (d *Dispatcher) start() { return } d.listPool.New = func() any { return list.New() } - d.results = make(chan Result, 4096) + d.ready = make(chan documentOp, 4096) + d.results = make(chan documentOp, 4096) d.msgs = make(chan string, 4096) d.started = true - d.resultWg.Add(2) - go d.sumStats() + d.wg.Add(3) + go d.dispatchReady() + go d.processResults() go d.printMessages() } -func (d *Dispatcher) sumStats() { - defer d.resultWg.Done() - for result := range d.results { - d.stats.Add(result.Stats) +func (d *Dispatcher) dispatchReady() { + defer d.wg.Done() + for op := range d.ready { + d.dispatch(op) + } +} + +func (d *Dispatcher) dispatch(op documentOp) { + if !d.acceptDocument() { + d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", op.document.Id.String()) + d.results <- op.resetResult() + return + } + go func() { + op.attempts++ + op.result = d.feeder.Send(op.document) + d.results <- op + }() +} + +func (d *Dispatcher) processResults() { + defer d.wg.Done() + for op := range d.results { + d.stats.Add(op.result.Stats) + retry := d.shouldRetry(op, op.result) + if retry { + d.enqueue(op.resetResult(), true) + } else if op.complete() { + d.inflightWg.Done() + } + d.dispatchNext(op.document.Id) + } +} + +func (d *Dispatcher) dispatchNext(id Id) { + d.mu.Lock() + defer d.mu.Unlock() + k := id.String() + q, ok := d.inflight[k] + if !ok { + panic("no queue exists for " + id.String() + ": this should not happen") + } + if next, ok := q.Poll(); ok { + // we have more operations with this ID: notify dispatcher about the next one + d.ready <- next + } else { + // no more operations with this ID: release slot + delete(d.inflight, k) + d.releaseSlot() } } func (d *Dispatcher) printMessages() { - defer d.resultWg.Done() + defer d.wg.Done() for msg := range d.msgs { fmt.Fprintln(d.output, msg) } } -func (d *Dispatcher) enqueue(op documentOp) error { +func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { d.mu.Lock() if !d.started { + d.mu.Unlock() return fmt.Errorf("dispatcher is closed") } + if !d.acceptDocument() { + d.mu.Unlock() + return fmt.Errorf("refusing to enqueue document %s: too many errors", op.document.Id.String()) + } key := op.document.Id.String() - group, ok := d.inflight[key] + q, ok := d.inflight[key] if !ok { - group = &documentGroup{q: NewQueue[documentOp](&d.listPool)} - d.inflight[key] = group + q = NewQueue[documentOp](&d.listPool) + d.inflight[key] = q + } else { + q.Add(op, isRetry) + } + if !isRetry { + d.inflightWg.Add(1) } d.mu.Unlock() - group.add(op, op.attempts > 0) - d.dispatch(op.document.Id, group) - return nil -} - -func (d *Dispatcher) dispatch(id Id, group *documentGroup) { - if !d.canDispatch() { - d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id) - return + if !ok && !isRetry { + // first operation with this ID: acquire slot + d.acquireSlot() + d.ready <- op + d.throttler.Sent() } - d.acquireSlot() - d.workerWg.Add(1) - go func() { - defer d.workerWg.Done() - d.sendDocumentIn(group) - }() - d.throttler.Sent() + return nil } -func (d *Dispatcher) canDispatch() bool { +func (d *Dispatcher) acceptDocument() bool { switch d.circuitBreaker.State() { case CircuitClosed: return true @@ -202,20 +230,21 @@ func (d *Dispatcher) acquireSlot() { func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } -func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) } +func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) } func (d *Dispatcher) Stats() Stats { return d.stats } -// Close closes the dispatcher and waits for all inflight operations to complete. +// Close waits for all inflight operations to complete and closes the dispatcher. func (d *Dispatcher) Close() error { - d.workerWg.Wait() // Wait for all inflight operations to complete + d.inflightWg.Wait() // Wait for all inflight operations to complete d.mu.Lock() if d.started { + close(d.ready) close(d.results) close(d.msgs) d.started = false } d.mu.Unlock() - d.resultWg.Wait() // Wait for results + d.wg.Wait() // Wait for all channel readers to return return nil } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 2e2e9a5abbd..24d5d716d04 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -10,6 +10,8 @@ import ( ) type mockFeeder struct { + sendCount int + failCount int failAfterNDocs int documents []Document mu sync.Mutex @@ -21,11 +23,21 @@ func (f *mockFeeder) failAfterN(docs int) { f.failAfterNDocs = docs } +func (f *mockFeeder) failN(times int) { + f.mu.Lock() + defer f.mu.Unlock() + f.failCount = times +} + func (f *mockFeeder) Send(doc Document) Result { f.mu.Lock() defer f.mu.Unlock() - result := Result{Id: doc.Id} - if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs { + f.sendCount++ + result := Result{Id: doc.Id, HTTPStatus: 200} + failRequest := (f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs) || + (f.failCount > 0 && f.sendCount <= f.failCount) + if failRequest { + result.HTTPStatus = 500 result.Status = StatusVespaFailure } else { f.documents = append(f.documents, doc) @@ -123,7 +135,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { dispatcher.Close() wantDocs := docs[:2] assert.Equal(t, wantDocs, feeder.documents) - assert.Equal(t, int64(2), dispatcher.Stats().Errors) + assert.Equal(t, int64(22), dispatcher.Stats().Errors) // Dispatching more documents for same ID succeed feeder.failAfterN(0) @@ -133,10 +145,30 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}) dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}) dispatcher.Close() - assert.Equal(t, int64(2), dispatcher.Stats().Errors) + assert.Equal(t, int64(22), dispatcher.Stats().Errors) assert.Equal(t, 6, len(feeder.documents)) } +func TestDispatcherOrderingWithRetry(t *testing.T) { + feeder := &mockFeeder{} + commonId := mustParseId("id:ns:type::doc1") + docs := []Document{ + {Id: commonId, Operation: OperationPut}, // fails + {Id: commonId, Operation: OperationRemove}, + } + feeder.failN(5) + clock := &manualClock{tick: time.Second} + throttler := newThrottler(8, clock.now) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + assert.Equal(t, docs, feeder.documents) + assert.Equal(t, int64(5), dispatcher.Stats().Errors) +} + func TestDispatcherOpenCircuit(t *testing.T) { feeder := &mockFeeder{} doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut} @@ -145,8 +177,11 @@ func TestDispatcherOpenCircuit(t *testing.T) { breaker := &mockCircuitBreaker{} dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) dispatcher.Enqueue(doc) + dispatcher.inflightWg.Wait() breaker.state = CircuitOpen - dispatcher.Enqueue(doc) + if err := dispatcher.Enqueue(doc); err == nil { + t.Fatal("expected error due to open circuit") + } dispatcher.Close() assert.Equal(t, 1, len(feeder.documents)) } @@ -161,7 +196,7 @@ func BenchmarkDocumentDispatching(b *testing.B) { b.ResetTimer() // ignore setup time for n := 0; n < b.N; n++ { - dispatcher.enqueue(documentOp{document: doc}) - dispatcher.workerWg.Wait() + dispatcher.Enqueue(doc) + dispatcher.inflightWg.Wait() } } diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 9e6768d0eb4..a550ea32624 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -31,7 +31,7 @@ type Result struct { } func (r Result) Success() bool { - return r.Err == nil && (r.Status == StatusSuccess || r.Status == StatusConditionNotMet) + return r.HTTPStatus/100 == 2 || r.HTTPStatus == 404 || r.HTTPStatus == 412 } // Stats represents feeding operation statistics. |