aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal/vespa/document/dispatcher.go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-05 12:27:54 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-11 10:27:09 +0200
commita51e87a9b5c9931fdb1895d996661687166bb863 (patch)
tree060402e59bc2bbe955fffe376ac2aebfbee98568 /client/go/internal/vespa/document/dispatcher.go
parentffc9c7e4f350f58a70f46844f04cb1007f83cafc (diff)
Queue retries
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go106
1 files changed, 55 insertions, 51 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 9d757aa51aa..838a7bc45ee 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -1,6 +1,7 @@
package document
import (
+ "container/list"
"fmt"
"io"
"sync"
@@ -17,7 +18,7 @@ type Dispatcher struct {
circuitBreaker CircuitBreaker
stats Stats
- closed bool
+ started bool
ready chan Id
results chan Result
inflight map[string]*documentGroup
@@ -29,22 +30,29 @@ type Dispatcher struct {
resultWg sync.WaitGroup
}
-// documentGroup holds document operations which share their ID, and must be dispatched in order.
-type documentGroup struct {
- id Id
- operations []documentOp
- mu sync.Mutex
-}
-
+// documentOp represents a document operation and the number of times it has been attempted.
type documentOp struct {
document Document
attempts int
}
-func (g *documentGroup) append(op documentOp) {
+// documentGroup holds document operations which share an ID, and must be dispatched in order.
+type documentGroup struct {
+ ops *list.List
+ mu sync.Mutex
+}
+
+func (g *documentGroup) add(op documentOp, first bool) {
g.mu.Lock()
defer g.mu.Unlock()
- g.operations = append(g.operations, op)
+ if g.ops == nil {
+ g.ops = list.New()
+ }
+ if first {
+ g.ops.PushFront(op)
+ } else {
+ g.ops.PushBack(op)
+ }
}
func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher {
@@ -59,24 +67,21 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, e
return d
}
-func (d *Dispatcher) dispatchAll(g *documentGroup) {
- g.mu.Lock()
- defer g.mu.Unlock()
- for i := 0; i < len(g.operations); i++ {
- op := g.operations[i]
- ok := false
- for !ok {
- op.attempts++
- result := d.feeder.Send(op.document)
- d.results <- result
- ok = result.Success()
- if !d.shouldRetry(op, result) {
- break
- }
- }
- d.releaseSlot()
+func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
+ group.mu.Lock()
+ defer group.mu.Unlock()
+ defer d.releaseSlot()
+ first := group.ops.Front()
+ if first == nil {
+ panic("sending from empty document group, this should not happen")
+ }
+ op := group.ops.Remove(first).(documentOp)
+ op.attempts++
+ result := d.feeder.Send(op.document)
+ d.results <- result
+ if d.shouldRetry(op, result) {
+ d.enqueue(op)
}
- g.operations = nil
}
func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
@@ -115,9 +120,12 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
func (d *Dispatcher) start() {
d.mu.Lock()
defer d.mu.Unlock()
+ if d.started {
+ return
+ }
d.ready = make(chan Id, 4096)
d.results = make(chan Result, 4096)
- d.closed = false
+ d.started = true
d.wg.Add(1)
go func() {
defer d.wg.Done()
@@ -135,13 +143,11 @@ func (d *Dispatcher) readDocuments() {
d.mu.RLock()
group := d.inflight[id.String()]
d.mu.RUnlock()
- if group != nil {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.dispatchAll(group)
- }()
- }
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ d.sendDocumentIn(group)
+ }()
}
}
@@ -151,28 +157,22 @@ func (d *Dispatcher) readResults() {
}
}
-func (d *Dispatcher) Enqueue(doc Document) error {
+func (d *Dispatcher) enqueue(op documentOp) error {
d.mu.Lock()
- if d.closed {
+ if !d.started {
return fmt.Errorf("dispatcher is closed")
}
- group, ok := d.inflight[doc.Id.String()]
- if ok {
- group.append(documentOp{document: doc})
- } else {
- group = &documentGroup{
- id: doc.Id,
- operations: []documentOp{{document: doc}},
- }
- d.inflight[doc.Id.String()] = group
+ group, ok := d.inflight[op.document.Id.String()]
+ if !ok {
+ group = &documentGroup{}
+ d.inflight[op.document.Id.String()] = group
}
d.mu.Unlock()
- d.enqueueWithSlot(doc.Id)
+ group.add(op, op.attempts > 0)
+ d.enqueueWithSlot(op.document.Id)
return nil
}
-func (d *Dispatcher) Stats() Stats { return d.stats }
-
func (d *Dispatcher) enqueueWithSlot(id Id) {
d.acquireSlot()
d.ready <- id
@@ -190,16 +190,20 @@ func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
d.mu.Lock()
- if !d.closed {
+ if d.started {
close(ch)
if markClosed {
- d.closed = true
+ d.started = false
}
}
d.mu.Unlock()
wg.Wait()
}
+func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) }
+
+func (d *Dispatcher) Stats() Stats { return d.stats }
+
// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
closeAndWait(d.ready, &d.wg, d, false)