about summary refs log tree commit diff stats
path: root/client/go/internal/vespa/document/dispatcher.go
diff options
context:
space:
mode:
author Martin Polden <mpolden@mpolden.no> 2023-03-29 17:22:35 +0100
committer Martin Polden <mpolden@mpolden.no> 2023-03-29 17:22:35 +0100
commit 95dbd20b3dc32e3512f9941245b91bbc3f6ad5ac (patch)
tree 740876570aedf1879b8c4f26891bd3b510b56389 /client/go/internal/vespa/document/dispatcher.go
parent 70211538e30cd49226d09d4e65ba17ff40ec2432 (diff)
Reapply "Add throttling to vespa feed"
This reverts commit f9f0cdad3c6721d06a7833be07d6280c68347263.
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r-- client/go/internal/vespa/document/dispatcher.go 158
1 files changed, 110 insertions, 48 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index feb562a241a..a65f16c9298 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,25 +3,33 @@ package document
import (
"fmt"
"sync"
+ "sync/atomic"
+ "time"
)
const maxAttempts = 10
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- workers int
- feeder Feeder
- ready chan Id
- inflight map[string]*documentGroup
+ feeder Feeder
+ throttler Throttler
+ circuitBreaker CircuitBreaker
+ stats Stats
+
+ closed bool
+ ready chan Id
+ results chan Result
+ inflight map[string]*documentGroup
+ inflightCount atomic.Int64
+
mu sync.RWMutex
wg sync.WaitGroup
- closed bool
+ resultWg sync.WaitGroup
}
// documentGroup holds document operations which share their ID, and must be dispatched in order.
type documentGroup struct {
id Id
- failed bool
operations []documentOp
mu sync.Mutex
}
@@ -37,68 +45,97 @@ func (g *documentGroup) append(op documentOp) {
g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
- if workers < 1 {
- workers = 1
- }
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
d := &Dispatcher{
- workers: workers,
- feeder: feeder,
- inflight: make(map[string]*documentGroup),
+ feeder: feeder,
+ throttler: throttler,
+ circuitBreaker: breaker,
+ inflight: make(map[string]*documentGroup),
}
d.start()
return d
}
-func (d *Dispatcher) dispatchAll(g *documentGroup) int {
+func (d *Dispatcher) dispatchAll(g *documentGroup) {
g.mu.Lock()
defer g.mu.Unlock()
- failCount := len(g.operations)
- for i := 0; !g.failed && i < len(g.operations); i++ {
+ for i := 0; i < len(g.operations); i++ {
op := g.operations[i]
ok := false
- for op.attempts <= maxAttempts && !ok {
- op.attempts += 1
- // TODO(mpolden): Extract function which does throttling/circuit-breaking
+ for !ok {
+ op.attempts++
result := d.feeder.Send(op.document)
+ d.results <- result
ok = result.Status.Success()
+ if !d.shouldRetry(op, result) {
+ break
+ }
}
- if ok {
- failCount--
- } else {
- g.failed = true
- failCount = len(g.operations) - i
- }
+ d.releaseSlot()
}
g.operations = nil
- return failCount
+}
+
+func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
+ if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
+ d.throttler.Success()
+ d.circuitBreaker.Success()
+ return false
+ }
+ if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
+ d.throttler.Throttled(d.inflightCount.Load())
+ return true
+ }
+ if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
+ if op.attempts <= maxAttempts {
+ return true
+ }
+ }
+ return false
}
func (d *Dispatcher) start() {
d.mu.Lock()
defer d.mu.Unlock()
+ d.ready = make(chan Id, 4096)
+ d.results = make(chan Result, 4096)
d.closed = false
- d.ready = make(chan Id, 4*d.workers)
- for i := 0; i < d.workers; i++ {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- for id := range d.ready {
- d.mu.RLock()
- group := d.inflight[id.String()]
- d.mu.RUnlock()
- if group != nil {
- failedDocs := d.dispatchAll(group)
- d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
- }
- }
- }()
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ d.readDocuments()
+ }()
+ d.resultWg.Add(1)
+ go func() {
+ defer d.resultWg.Done()
+ d.readResults()
+ }()
+}
+
+func (d *Dispatcher) readDocuments() {
+ for id := range d.ready {
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ if group != nil {
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ d.dispatchAll(group)
+ }()
+ }
+ }
+}
+
+func (d *Dispatcher) readResults() {
+ for result := range d.results {
+ d.stats.Add(result.Stats)
}
}
func (d *Dispatcher) Enqueue(doc Document) error {
d.mu.Lock()
- defer d.mu.Unlock()
if d.closed {
return fmt.Errorf("dispatcher is closed")
}
@@ -112,18 +149,43 @@ func (d *Dispatcher) Enqueue(doc Document) error {
}
d.inflight[doc.Id.String()] = group
}
- d.ready <- doc.Id
+ d.mu.Unlock()
+ d.enqueueWithSlot(doc.Id)
return nil
}
-// Close closes the dispatcher and waits for all inflight operations to complete.
-func (d *Dispatcher) Close() error {
+func (d *Dispatcher) Stats() Stats { return d.stats }
+
+func (d *Dispatcher) enqueueWithSlot(id Id) {
+ d.acquireSlot()
+ d.ready <- id
+ d.throttler.Sent()
+}
+
+func (d *Dispatcher) acquireSlot() {
+ for d.inflightCount.Load() >= d.throttler.TargetInflight() {
+ time.Sleep(time.Millisecond)
+ }
+ d.inflightCount.Add(1)
+}
+
+func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
+
+func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
d.mu.Lock()
if !d.closed {
- close(d.ready)
- d.closed = true
+ close(ch)
+ if markClosed {
+ d.closed = true
+ }
}
d.mu.Unlock()
- d.wg.Wait()
+ wg.Wait()
+}
+
+// Close closes the dispatcher and waits for all inflight operations to complete.
+func (d *Dispatcher) Close() error {
+ closeAndWait(d.ready, &d.wg, d, false)
+ closeAndWait(d.results, &d.resultWg, d, true)
return nil
}