aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal/vespa/document/dispatcher.go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-29 08:45:27 +0200
committerGitHub <noreply@github.com>2023-03-29 08:45:27 +0200
commitf9f0cdad3c6721d06a7833be07d6280c68347263 (patch)
tree372c712f3b45e1b12778d036d9e65726591ffa1f /client/go/internal/vespa/document/dispatcher.go
parent4fa6eda0a6b2b286bcd0cebe878d2809f57c28d4 (diff)
Revert "Add throttling to vespa feed"
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go158
1 files changed, 48 insertions, 110 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index a65f16c9298..feb562a241a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,33 +3,25 @@ package document
import (
"fmt"
"sync"
- "sync/atomic"
- "time"
)
const maxAttempts = 10
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- feeder Feeder
- throttler Throttler
- circuitBreaker CircuitBreaker
- stats Stats
-
- closed bool
- ready chan Id
- results chan Result
- inflight map[string]*documentGroup
- inflightCount atomic.Int64
-
+ workers int
+ feeder Feeder
+ ready chan Id
+ inflight map[string]*documentGroup
mu sync.RWMutex
wg sync.WaitGroup
- resultWg sync.WaitGroup
+ closed bool
}
// documentGroup holds document operations which share their ID, and must be dispatched in order.
type documentGroup struct {
id Id
+ failed bool
operations []documentOp
mu sync.Mutex
}
@@ -45,97 +37,68 @@ func (g *documentGroup) append(op documentOp) {
g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
+func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
+ if workers < 1 {
+ workers = 1
+ }
d := &Dispatcher{
- feeder: feeder,
- throttler: throttler,
- circuitBreaker: breaker,
- inflight: make(map[string]*documentGroup),
+ workers: workers,
+ feeder: feeder,
+ inflight: make(map[string]*documentGroup),
}
d.start()
return d
}
-func (d *Dispatcher) dispatchAll(g *documentGroup) {
+func (d *Dispatcher) dispatchAll(g *documentGroup) int {
g.mu.Lock()
defer g.mu.Unlock()
- for i := 0; i < len(g.operations); i++ {
+ failCount := len(g.operations)
+ for i := 0; !g.failed && i < len(g.operations); i++ {
op := g.operations[i]
ok := false
- for !ok {
- op.attempts++
+ for op.attempts <= maxAttempts && !ok {
+ op.attempts += 1
+ // TODO(mpolden): Extract function which does throttling/circuit-breaking
result := d.feeder.Send(op.document)
- d.results <- result
ok = result.Status.Success()
- if !d.shouldRetry(op, result) {
- break
- }
}
- d.releaseSlot()
- }
- g.operations = nil
-}
-
-func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
- if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
- d.throttler.Success()
- d.circuitBreaker.Success()
- return false
- }
- if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- d.throttler.Throttled(d.inflightCount.Load())
- return true
- }
- if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
- d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
- if op.attempts <= maxAttempts {
- return true
+ if ok {
+ failCount--
+ } else {
+ g.failed = true
+ failCount = len(g.operations) - i
}
}
- return false
+ g.operations = nil
+ return failCount
}
func (d *Dispatcher) start() {
d.mu.Lock()
defer d.mu.Unlock()
- d.ready = make(chan Id, 4096)
- d.results = make(chan Result, 4096)
d.closed = false
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.readDocuments()
- }()
- d.resultWg.Add(1)
- go func() {
- defer d.resultWg.Done()
- d.readResults()
- }()
-}
-
-func (d *Dispatcher) readDocuments() {
- for id := range d.ready {
- d.mu.RLock()
- group := d.inflight[id.String()]
- d.mu.RUnlock()
- if group != nil {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.dispatchAll(group)
- }()
- }
- }
-}
-
-func (d *Dispatcher) readResults() {
- for result := range d.results {
- d.stats.Add(result.Stats)
+ d.ready = make(chan Id, 4*d.workers)
+ for i := 0; i < d.workers; i++ {
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ for id := range d.ready {
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ if group != nil {
+ failedDocs := d.dispatchAll(group)
+ d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
+ }
+ }
+ }()
}
}
func (d *Dispatcher) Enqueue(doc Document) error {
d.mu.Lock()
+ defer d.mu.Unlock()
if d.closed {
return fmt.Errorf("dispatcher is closed")
}
@@ -149,43 +112,18 @@ func (d *Dispatcher) Enqueue(doc Document) error {
}
d.inflight[doc.Id.String()] = group
}
- d.mu.Unlock()
- d.enqueueWithSlot(doc.Id)
+ d.ready <- doc.Id
return nil
}
-func (d *Dispatcher) Stats() Stats { return d.stats }
-
-func (d *Dispatcher) enqueueWithSlot(id Id) {
- d.acquireSlot()
- d.ready <- id
- d.throttler.Sent()
-}
-
-func (d *Dispatcher) acquireSlot() {
- for d.inflightCount.Load() >= d.throttler.TargetInflight() {
- time.Sleep(time.Millisecond)
- }
- d.inflightCount.Add(1)
-}
-
-func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
-
-func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
+// Close closes the dispatcher and waits for all inflight operations to complete.
+func (d *Dispatcher) Close() error {
d.mu.Lock()
if !d.closed {
- close(ch)
- if markClosed {
- d.closed = true
- }
+ close(d.ready)
+ d.closed = true
}
d.mu.Unlock()
- wg.Wait()
-}
-
-// Close closes the dispatcher and waits for all inflight operations to complete.
-func (d *Dispatcher) Close() error {
- closeAndWait(d.ready, &d.wg, d, false)
- closeAndWait(d.results, &d.resultWg, d, true)
+ d.wg.Wait()
return nil
}