diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-29 08:45:27 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-29 08:45:27 +0200 |
commit | f9f0cdad3c6721d06a7833be07d6280c68347263 (patch) | |
tree | 372c712f3b45e1b12778d036d9e65726591ffa1f /client/go/internal/vespa/document/dispatcher.go | |
parent | 4fa6eda0a6b2b286bcd0cebe878d2809f57c28d4 (diff) |
Revert "Add throttling to vespa feed"
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 158 |
1 files changed, 48 insertions, 110 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index a65f16c9298..feb562a241a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,33 +3,25 @@ package document
 import (
 	"fmt"
 	"sync"
-	"sync/atomic"
-	"time"
 )
 
 const maxAttempts = 10
 
 // Dispatcher dispatches documents from a queue to a Feeder.
 type Dispatcher struct {
-	feeder         Feeder
-	throttler      Throttler
-	circuitBreaker CircuitBreaker
-	stats          Stats
-
-	closed        bool
-	ready         chan Id
-	results       chan Result
-	inflight      map[string]*documentGroup
-	inflightCount atomic.Int64
-
+	workers  int
+	feeder   Feeder
+	ready    chan Id
+	inflight map[string]*documentGroup
 	mu       sync.RWMutex
 	wg       sync.WaitGroup
-	resultWg sync.WaitGroup
+	closed   bool
 }
 
 // documentGroup holds document operations which share their ID, and must be dispatched in order.
 type documentGroup struct {
 	id         Id
+	failed     bool
 	operations []documentOp
 	mu         sync.Mutex
 }
@@ -45,97 +37,68 @@ func (g *documentGroup) append(op documentOp) {
 	g.operations = append(g.operations, op)
 }
 
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
+func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
+	if workers < 1 {
+		workers = 1
+	}
 	d := &Dispatcher{
-		feeder:         feeder,
-		throttler:      throttler,
-		circuitBreaker: breaker,
-		inflight:       make(map[string]*documentGroup),
+		workers:  workers,
+		feeder:   feeder,
+		inflight: make(map[string]*documentGroup),
 	}
 	d.start()
 	return d
 }
 
-func (d *Dispatcher) dispatchAll(g *documentGroup) {
+func (d *Dispatcher) dispatchAll(g *documentGroup) int {
 	g.mu.Lock()
 	defer g.mu.Unlock()
-	for i := 0; i < len(g.operations); i++ {
+	failCount := len(g.operations)
+	for i := 0; !g.failed && i < len(g.operations); i++ {
 		op := g.operations[i]
 		ok := false
-		for !ok {
-			op.attempts++
+		for op.attempts <= maxAttempts && !ok {
+			op.attempts += 1
+			// TODO(mpolden): Extract function which does throttling/circuit-breaking
 			result := d.feeder.Send(op.document)
-			d.results <- result
 			ok = result.Status.Success()
-			if !d.shouldRetry(op, result) {
-				break
-			}
 		}
-		d.releaseSlot()
-	}
-	g.operations = nil
-}
-
-func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
-	if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
-		d.throttler.Success()
-		d.circuitBreaker.Success()
-		return false
-	}
-	if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
-		d.throttler.Throttled(d.inflightCount.Load())
-		return true
-	}
-	if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
-		d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
-		if op.attempts <= maxAttempts {
-			return true
+		if ok {
+			failCount--
+		} else {
+			g.failed = true
+			failCount = len(g.operations) - i
 		}
 	}
-	return false
+	g.operations = nil
+	return failCount
 }
 
 func (d *Dispatcher) start() {
 	d.mu.Lock()
 	defer d.mu.Unlock()
-	d.ready = make(chan Id, 4096)
-	d.results = make(chan Result, 4096)
 	d.closed = false
-	d.wg.Add(1)
-	go func() {
-		defer d.wg.Done()
-		d.readDocuments()
-	}()
-	d.resultWg.Add(1)
-	go func() {
-		defer d.resultWg.Done()
-		d.readResults()
-	}()
-}
-
-func (d *Dispatcher) readDocuments() {
-	for id := range d.ready {
-		d.mu.RLock()
-		group := d.inflight[id.String()]
-		d.mu.RUnlock()
-		if group != nil {
-			d.wg.Add(1)
-			go func() {
-				defer d.wg.Done()
-				d.dispatchAll(group)
-			}()
-		}
-	}
-}
-
-func (d *Dispatcher) readResults() {
-	for result := range d.results {
-		d.stats.Add(result.Stats)
+	d.ready = make(chan Id, 4*d.workers)
+	for i := 0; i < d.workers; i++ {
+		d.wg.Add(1)
+		go func() {
+			defer d.wg.Done()
+			for id := range d.ready {
+				d.mu.RLock()
+				group := d.inflight[id.String()]
+				d.mu.RUnlock()
+				if group != nil {
+					failedDocs := d.dispatchAll(group)
+					d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
+				}
+			}
+		}()
 	}
 }
 
 func (d *Dispatcher) Enqueue(doc Document) error {
 	d.mu.Lock()
+	defer d.mu.Unlock()
 	if d.closed {
 		return fmt.Errorf("dispatcher is closed")
 	}
@@ -149,43 +112,18 @@ func (d *Dispatcher) Enqueue(doc Document) error {
 		}
 		d.inflight[doc.Id.String()] = group
 	}
-	d.mu.Unlock()
-	d.enqueueWithSlot(doc.Id)
+	d.ready <- doc.Id
 	return nil
 }
 
-func (d *Dispatcher) Stats() Stats { return d.stats }
-
-func (d *Dispatcher) enqueueWithSlot(id Id) {
-	d.acquireSlot()
-	d.ready <- id
-	d.throttler.Sent()
-}
-
-func (d *Dispatcher) acquireSlot() {
-	for d.inflightCount.Load() >= d.throttler.TargetInflight() {
-		time.Sleep(time.Millisecond)
-	}
-	d.inflightCount.Add(1)
-}
-
-func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
-
-func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
+// Close closes the dispatcher and waits for all inflight operations to complete.
+func (d *Dispatcher) Close() error {
 	d.mu.Lock()
 	if !d.closed {
-		close(ch)
-		if markClosed {
-			d.closed = true
-		}
+		close(d.ready)
+		d.closed = true
 	}
 	d.mu.Unlock()
-	wg.Wait()
-}
-
-// Close closes the dispatcher and waits for all inflight operations to complete.
-func (d *Dispatcher) Close() error {
-	closeAndWait(d.ready, &d.wg, d, false)
-	closeAndWait(d.results, &d.resultWg, d, true)
+	d.wg.Wait()
 	return nil
 }