diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-29 17:22:35 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-29 17:22:35 +0100 |
commit | 95dbd20b3dc32e3512f9941245b91bbc3f6ad5ac (patch) | |
tree | 740876570aedf1879b8c4f26891bd3b510b56389 /client/go/internal/vespa/document/dispatcher.go | |
parent | 70211538e30cd49226d09d4e65ba17ff40ec2432 (diff) |
Reapply "Add throttling to vespa feed"
This reverts commit f9f0cdad3c6721d06a7833be07d6280c68347263.
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 158 |
1 files changed, 110 insertions, 48 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index feb562a241a..a65f16c9298 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -3,25 +3,33 @@ package document import ( "fmt" "sync" + "sync/atomic" + "time" ) const maxAttempts = 10 // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { - workers int - feeder Feeder - ready chan Id - inflight map[string]*documentGroup + feeder Feeder + throttler Throttler + circuitBreaker CircuitBreaker + stats Stats + + closed bool + ready chan Id + results chan Result + inflight map[string]*documentGroup + inflightCount atomic.Int64 + mu sync.RWMutex wg sync.WaitGroup - closed bool + resultWg sync.WaitGroup } // documentGroup holds document operations which share their ID, and must be dispatched in order. type documentGroup struct { id Id - failed bool operations []documentOp mu sync.Mutex } @@ -37,68 +45,97 @@ func (g *documentGroup) append(op documentOp) { g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, workers int) *Dispatcher { - if workers < 1 { - workers = 1 - } +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher { d := &Dispatcher{ - workers: workers, - feeder: feeder, - inflight: make(map[string]*documentGroup), + feeder: feeder, + throttler: throttler, + circuitBreaker: breaker, + inflight: make(map[string]*documentGroup), } d.start() return d } -func (d *Dispatcher) dispatchAll(g *documentGroup) int { +func (d *Dispatcher) dispatchAll(g *documentGroup) { g.mu.Lock() defer g.mu.Unlock() - failCount := len(g.operations) - for i := 0; !g.failed && i < len(g.operations); i++ { + for i := 0; i < len(g.operations); i++ { op := g.operations[i] ok := false - for op.attempts <= maxAttempts && !ok { - op.attempts += 1 - // TODO(mpolden): Extract function which does throttling/circuit-breaking + for !ok { + op.attempts++ result := d.feeder.Send(op.document) + d.results <- result ok = result.Status.Success() + if !d.shouldRetry(op, result) { + break + } } - if ok { - failCount-- - } else { - g.failed = true - failCount = len(g.operations) - i - } + d.releaseSlot() } g.operations = nil - return failCount +} + +func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { + if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { + d.throttler.Success() + d.circuitBreaker.Success() + return false + } + if result.HTTPStatus == 429 || result.HTTPStatus == 503 { + d.throttler.Throttled(d.inflightCount.Load()) + return true + } + if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) + if op.attempts <= maxAttempts { + return true + } + } + return false } func (d *Dispatcher) start() { d.mu.Lock() defer d.mu.Unlock() + d.ready = make(chan Id, 4096) + d.results = make(chan Result, 4096) d.closed = false - d.ready = make(chan Id, 4*d.workers) - for i := 0; i < d.workers; i++ { - d.wg.Add(1) - go func() { - defer d.wg.Done() - for id := range d.ready { - d.mu.RLock() - group := d.inflight[id.String()] - d.mu.RUnlock() - if group != nil { - failedDocs := d.dispatchAll(group) - d.feeder.AddStats(Stats{Errors: int64(failedDocs)}) - } - } - }() + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.readDocuments() + }() + d.resultWg.Add(1) + go func() { + defer d.resultWg.Done() + d.readResults() + }() +} + +func (d *Dispatcher) readDocuments() { + for id := range d.ready { + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + if group != nil { + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.dispatchAll(group) + }() + } + } +} + +func (d *Dispatcher) readResults() { + for result := range d.results { + d.stats.Add(result.Stats) } } func (d *Dispatcher) Enqueue(doc Document) error { d.mu.Lock() - defer d.mu.Unlock() if d.closed { return fmt.Errorf("dispatcher is closed") } @@ -112,18 +149,43 @@ func (d *Dispatcher) Enqueue(doc Document) error { } d.inflight[doc.Id.String()] = group } - d.ready <- doc.Id + d.mu.Unlock() + d.enqueueWithSlot(doc.Id) return nil } -// Close closes the dispatcher and waits for all inflight operations to complete. -func (d *Dispatcher) Close() error { +func (d *Dispatcher) Stats() Stats { return d.stats } + +func (d *Dispatcher) enqueueWithSlot(id Id) { + d.acquireSlot() + d.ready <- id + d.throttler.Sent() +} + +func (d *Dispatcher) acquireSlot() { + for d.inflightCount.Load() >= d.throttler.TargetInflight() { + time.Sleep(time.Millisecond) + } + d.inflightCount.Add(1) +} + +func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) } + +func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { d.mu.Lock() if !d.closed { - close(d.ready) - d.closed = true + close(ch) + if markClosed { + d.closed = true + } } d.mu.Unlock() - d.wg.Wait() + wg.Wait() +} + +// Close closes the dispatcher and waits for all inflight operations to complete. +func (d *Dispatcher) Close() error { + closeAndWait(d.ready, &d.wg, d, false) + closeAndWait(d.results, &d.resultWg, d, true) return nil } |