diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-05 12:27:54 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-11 10:27:09 +0200 |
commit | a51e87a9b5c9931fdb1895d996661687166bb863 (patch) | |
tree | 060402e59bc2bbe955fffe376ac2aebfbee98568 /client | |
parent | ffc9c7e4f350f58a70f46844f04cb1007f83cafc (diff) |
Queue retries
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 106 |
1 files changed, 55 insertions, 51 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 9d757aa51aa..838a7bc45ee 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -1,6 +1,7 @@ package document import ( + "container/list" "fmt" "io" "sync" @@ -17,7 +18,7 @@ type Dispatcher struct { circuitBreaker CircuitBreaker stats Stats - closed bool + started bool ready chan Id results chan Result inflight map[string]*documentGroup @@ -29,22 +30,29 @@ type Dispatcher struct { resultWg sync.WaitGroup } -// documentGroup holds document operations which share their ID, and must be dispatched in order. -type documentGroup struct { - id Id - operations []documentOp - mu sync.Mutex -} - +// documentOp represents a document operation and the number of times it has been attempted. type documentOp struct { document Document attempts int } -func (g *documentGroup) append(op documentOp) { +// documentGroup holds document operations which share an ID, and must be dispatched in order. +type documentGroup struct { + ops *list.List + mu sync.Mutex +} + +func (g *documentGroup) add(op documentOp, first bool) { g.mu.Lock() defer g.mu.Unlock() - g.operations = append(g.operations, op) + if g.ops == nil { + g.ops = list.New() + } + if first { + g.ops.PushFront(op) + } else { + g.ops.PushBack(op) + } } func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher { @@ -59,24 +67,21 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, e return d } -func (d *Dispatcher) dispatchAll(g *documentGroup) { - g.mu.Lock() - defer g.mu.Unlock() - for i := 0; i < len(g.operations); i++ { - op := g.operations[i] - ok := false - for !ok { - op.attempts++ - result := d.feeder.Send(op.document) - d.results <- result - ok = result.Success() - if !d.shouldRetry(op, result) { - break - } - } - d.releaseSlot() +func (d *Dispatcher) sendDocumentIn(group *documentGroup) { + group.mu.Lock() + defer group.mu.Unlock() + defer d.releaseSlot() + first := group.ops.Front() + if first == nil { + panic("sending from empty document group, this should not happen") + } + op := group.ops.Remove(first).(documentOp) + op.attempts++ + result := d.feeder.Send(op.document) + d.results <- result + if d.shouldRetry(op, result) { + d.enqueue(op) } - g.operations = nil } func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { @@ -115,9 +120,12 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { func (d *Dispatcher) start() { d.mu.Lock() defer d.mu.Unlock() + if d.started { + return + } d.ready = make(chan Id, 4096) d.results = make(chan Result, 4096) - d.closed = false + d.started = true d.wg.Add(1) go func() { defer d.wg.Done() @@ -135,13 +143,11 @@ func (d *Dispatcher) readDocuments() { d.mu.RLock() group := d.inflight[id.String()] d.mu.RUnlock() - if group != nil { - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.dispatchAll(group) - }() - } + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.sendDocumentIn(group) + }() } } @@ -151,28 +157,22 @@ func (d *Dispatcher) readResults() { } } -func (d *Dispatcher) Enqueue(doc Document) error { +func (d *Dispatcher) enqueue(op documentOp) error { d.mu.Lock() - if d.closed { + if !d.started { return fmt.Errorf("dispatcher is closed") } - group, ok := d.inflight[doc.Id.String()] - if ok { - group.append(documentOp{document: doc}) - } else { - group = &documentGroup{ - id: doc.Id, - operations: []documentOp{{document: doc}}, - } - d.inflight[doc.Id.String()] = group + group, ok := d.inflight[op.document.Id.String()] + if !ok { + group = &documentGroup{} + d.inflight[op.document.Id.String()] = group } d.mu.Unlock() - d.enqueueWithSlot(doc.Id) + group.add(op, op.attempts > 0) + d.enqueueWithSlot(op.document.Id) return nil } -func (d *Dispatcher) Stats() Stats { return d.stats } - func (d *Dispatcher) enqueueWithSlot(id Id) { d.acquireSlot() d.ready <- id @@ -190,16 +190,20 @@ func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { d.mu.Lock() - if !d.closed { + if d.started { close(ch) if markClosed { - d.closed = true + d.started = false } } d.mu.Unlock() wg.Wait() } +func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) } + +func (d *Dispatcher) Stats() Stats { return d.stats } + // Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { closeAndWait(d.ready, &d.wg, d, false) |