aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-26 14:14:13 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-26 14:14:13 +0200
commitfcccffa0124d85b36e3611ec554efb37eb67324e (patch)
tree0cfdfdfa2fbdf85cda1d7c0f502727629efd73fb
parentbcc1cde89028e920885920260ac87098e4282911 (diff)
Acquire slot per document ID
-rw-r--r--client/go/internal/vespa/document/dispatcher.go163
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go49
-rw-r--r--client/go/internal/vespa/document/feeder.go2
3 files changed, 139 insertions, 75 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index b87dfaf55eb..78f8455de9a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -20,44 +20,41 @@ type Dispatcher struct {
stats Stats
started bool
- results chan Result
+ ready chan documentOp
+ results chan documentOp
msgs chan string
- inflight map[string]*documentGroup
+ inflight map[string]*Queue[documentOp]
inflightCount int64
output io.Writer
verbose bool
- listPool sync.Pool
- mu sync.RWMutex
- workerWg sync.WaitGroup
- resultWg sync.WaitGroup
+ listPool sync.Pool
+ mu sync.Mutex
+ wg sync.WaitGroup
+ inflightWg sync.WaitGroup
}
// documentOp represents a document operation and the number of times it has been attempted.
type documentOp struct {
document Document
+ result Result
attempts int
}
-// documentGroup holds document operations which share an ID, and must be dispatched in order.
-type documentGroup struct {
- q *Queue[documentOp]
- mu sync.Mutex
+func (op documentOp) resetResult() documentOp {
+ op.result = Result{}
+ return op
}
-func (g *documentGroup) add(op documentOp, first bool) {
- g.mu.Lock()
- defer g.mu.Unlock()
- g.q.Add(op, first)
-}
+func (op documentOp) complete() bool { return op.result.Success() || op.attempts > maxAttempts }
func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher {
d := &Dispatcher{
feeder: feeder,
throttler: throttler,
circuitBreaker: breaker,
- inflight: make(map[string]*documentGroup),
+ inflight: make(map[string]*Queue[documentOp]),
output: output,
verbose: verbose,
}
@@ -65,24 +62,8 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
return d
}
-func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
- group.mu.Lock()
- op, ok := group.q.Poll()
- if !ok {
- panic("sending from empty document group, this should not happen")
- }
- op.attempts++
- result := d.feeder.Send(op.document)
- d.results <- result
- d.releaseSlot()
- group.mu.Unlock()
- if d.shouldRetry(op, result) {
- d.enqueue(op)
- }
-}
-
func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
- if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
+ if result.Success() {
if d.verbose {
d.msgs <- fmt.Sprintf("feed: successfully fed %s with status %d", op.document.Id, result.HTTPStatus)
}
@@ -91,7 +72,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
+ d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus)
d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
return true
}
@@ -127,60 +108,107 @@ func (d *Dispatcher) start() {
return
}
d.listPool.New = func() any { return list.New() }
- d.results = make(chan Result, 4096)
+ d.ready = make(chan documentOp, 4096)
+ d.results = make(chan documentOp, 4096)
d.msgs = make(chan string, 4096)
d.started = true
- d.resultWg.Add(2)
- go d.sumStats()
+ d.wg.Add(3)
+ go d.dispatchReady()
+ go d.processResults()
go d.printMessages()
}
-func (d *Dispatcher) sumStats() {
- defer d.resultWg.Done()
- for result := range d.results {
- d.stats.Add(result.Stats)
+func (d *Dispatcher) dispatchReady() {
+ defer d.wg.Done()
+ for op := range d.ready {
+ d.dispatch(op)
+ }
+}
+
+func (d *Dispatcher) dispatch(op documentOp) {
+ if !d.acceptDocument() {
+ d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", op.document.Id.String())
+ d.results <- op.resetResult()
+ return
+ }
+ go func() {
+ op.attempts++
+ op.result = d.feeder.Send(op.document)
+ d.results <- op
+ }()
+}
+
+func (d *Dispatcher) processResults() {
+ defer d.wg.Done()
+ for op := range d.results {
+ d.stats.Add(op.result.Stats)
+ retry := d.shouldRetry(op, op.result)
+ if retry {
+ d.enqueue(op.resetResult(), true)
+ } else if op.complete() {
+ d.inflightWg.Done()
+ }
+ d.dispatchNext(op.document.Id)
+ }
+}
+
+func (d *Dispatcher) dispatchNext(id Id) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ k := id.String()
+ q, ok := d.inflight[k]
+ if !ok {
+ panic("no queue exists for " + id.String() + ": this should not happen")
+ }
+ if next, ok := q.Poll(); ok {
+ // we have more operations with this ID: notify dispatcher about the next one
+ d.ready <- next
+ } else {
+ // no more operations with this ID: release slot
+ delete(d.inflight, k)
+ d.releaseSlot()
}
}
func (d *Dispatcher) printMessages() {
- defer d.resultWg.Done()
+ defer d.wg.Done()
for msg := range d.msgs {
fmt.Fprintln(d.output, msg)
}
}
-func (d *Dispatcher) enqueue(op documentOp) error {
+func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
d.mu.Lock()
if !d.started {
+ d.mu.Unlock()
return fmt.Errorf("dispatcher is closed")
}
+ if !d.acceptDocument() {
+ d.mu.Unlock()
+ return fmt.Errorf("refusing to enqueue document %s: too many errors", op.document.Id.String())
+ }
key := op.document.Id.String()
- group, ok := d.inflight[key]
+ q, ok := d.inflight[key]
if !ok {
- group = &documentGroup{q: NewQueue[documentOp](&d.listPool)}
- d.inflight[key] = group
+ q = NewQueue[documentOp](&d.listPool)
+ d.inflight[key] = q
+ } else {
+ q.Add(op, isRetry)
+ }
+ if !isRetry {
+ d.inflightWg.Add(1)
}
d.mu.Unlock()
- group.add(op, op.attempts > 0)
- d.dispatch(op.document.Id, group)
- return nil
-}
-
-func (d *Dispatcher) dispatch(id Id, group *documentGroup) {
- if !d.canDispatch() {
- d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id)
- return
+ if !ok && !isRetry {
+ // first operation with this ID: acquire slot
+ d.acquireSlot()
+ d.ready <- op
+ d.throttler.Sent()
}
- d.acquireSlot()
- d.workerWg.Add(1)
- go func() {
- defer d.workerWg.Done()
- d.sendDocumentIn(group)
- }()
- d.throttler.Sent()
+ return nil
}
-func (d *Dispatcher) canDispatch() bool {
+func (d *Dispatcher) acceptDocument() bool {
switch d.circuitBreaker.State() {
case CircuitClosed:
return true
@@ -202,20 +230,21 @@ func (d *Dispatcher) acquireSlot() {
func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
-func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) }
+func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) }
func (d *Dispatcher) Stats() Stats { return d.stats }
-// Close closes the dispatcher and waits for all inflight operations to complete.
+// Close waits for all inflight operations to complete and closes the dispatcher.
func (d *Dispatcher) Close() error {
- d.workerWg.Wait() // Wait for all inflight operations to complete
+ d.inflightWg.Wait() // Wait for all inflight operations to complete
d.mu.Lock()
if d.started {
+ close(d.ready)
close(d.results)
close(d.msgs)
d.started = false
}
d.mu.Unlock()
- d.resultWg.Wait() // Wait for results
+ d.wg.Wait() // Wait for all channel readers to return
return nil
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 2e2e9a5abbd..24d5d716d04 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -10,6 +10,8 @@ import (
)
type mockFeeder struct {
+ sendCount int
+ failCount int
failAfterNDocs int
documents []Document
mu sync.Mutex
@@ -21,11 +23,21 @@ func (f *mockFeeder) failAfterN(docs int) {
f.failAfterNDocs = docs
}
+func (f *mockFeeder) failN(times int) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.failCount = times
+}
+
func (f *mockFeeder) Send(doc Document) Result {
f.mu.Lock()
defer f.mu.Unlock()
- result := Result{Id: doc.Id}
- if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs {
+ f.sendCount++
+ result := Result{Id: doc.Id, HTTPStatus: 200}
+ failRequest := (f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs) ||
+ (f.failCount > 0 && f.sendCount <= f.failCount)
+ if failRequest {
+ result.HTTPStatus = 500
result.Status = StatusVespaFailure
} else {
f.documents = append(f.documents, doc)
@@ -123,7 +135,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
dispatcher.Close()
wantDocs := docs[:2]
assert.Equal(t, wantDocs, feeder.documents)
- assert.Equal(t, int64(2), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(22), dispatcher.Stats().Errors)
// Dispatching more documents for same ID succeed
feeder.failAfterN(0)
@@ -133,10 +145,30 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut})
dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut})
dispatcher.Close()
- assert.Equal(t, int64(2), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(22), dispatcher.Stats().Errors)
assert.Equal(t, 6, len(feeder.documents))
}
+func TestDispatcherOrderingWithRetry(t *testing.T) {
+ feeder := &mockFeeder{}
+ commonId := mustParseId("id:ns:type::doc1")
+ docs := []Document{
+ {Id: commonId, Operation: OperationPut}, // fails
+ {Id: commonId, Operation: OperationRemove},
+ }
+ feeder.failN(5)
+ clock := &manualClock{tick: time.Second}
+ throttler := newThrottler(8, clock.now)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+ assert.Equal(t, docs, feeder.documents)
+ assert.Equal(t, int64(5), dispatcher.Stats().Errors)
+}
+
func TestDispatcherOpenCircuit(t *testing.T) {
feeder := &mockFeeder{}
doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut}
@@ -145,8 +177,11 @@ func TestDispatcherOpenCircuit(t *testing.T) {
breaker := &mockCircuitBreaker{}
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
dispatcher.Enqueue(doc)
+ dispatcher.inflightWg.Wait()
breaker.state = CircuitOpen
- dispatcher.Enqueue(doc)
+ if err := dispatcher.Enqueue(doc); err == nil {
+ t.Fatal("expected error due to open circuit")
+ }
dispatcher.Close()
assert.Equal(t, 1, len(feeder.documents))
}
@@ -161,7 +196,7 @@ func BenchmarkDocumentDispatching(b *testing.B) {
b.ResetTimer() // ignore setup time
for n := 0; n < b.N; n++ {
- dispatcher.enqueue(documentOp{document: doc})
- dispatcher.workerWg.Wait()
+ dispatcher.Enqueue(doc)
+ dispatcher.inflightWg.Wait()
}
}
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 9e6768d0eb4..a550ea32624 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -31,7 +31,7 @@ type Result struct {
}
func (r Result) Success() bool {
- return r.Err == nil && (r.Status == StatusSuccess || r.Status == StatusConditionNotMet)
+ return r.HTTPStatus/100 == 2 || r.HTTPStatus == 404 || r.HTTPStatus == 412
}
// Stats represents feeding operation statistics.