summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-24 14:33:01 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-24 14:37:26 +0200
commite73277532d7aed8a9557449bad39cd5e4ee5054e (patch)
tree45522501b7aeff231940bf10fc215d8b281f511f
parent1966902403aa9a5cec19eb9c51f7a3f0f1d0e80c (diff)
Respect circuit breaker
-rw-r--r--client/go/internal/vespa/document/dispatcher.go21
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go20
2 files changed, 39 insertions, 2 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 0f3d39d5a78..51e60e4e131 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -163,11 +163,15 @@ func (d *Dispatcher) enqueue(op documentOp) error {
}
d.mu.Unlock()
group.add(op, op.attempts > 0)
- d.enqueueWithSlot(group)
+ d.dispatch(op.document.Id, group)
return nil
}
-func (d *Dispatcher) enqueueWithSlot(group *documentGroup) {
+func (d *Dispatcher) dispatch(id Id, group *documentGroup) {
+ if !d.canDispatch() {
+ d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id)
+ return
+ }
d.acquireSlot()
d.workerWg.Add(1)
go func() {
@@ -177,6 +181,19 @@ func (d *Dispatcher) enqueueWithSlot(group *documentGroup) {
d.throttler.Sent()
}
+func (d *Dispatcher) canDispatch() bool {
+ switch d.circuitBreaker.State() {
+ case CircuitClosed:
+ return true
+ case CircuitHalfOpen:
+ time.Sleep(time.Second)
+ return true
+ case CircuitOpen:
+ return false
+ }
+ panic("invalid circuit state")
+}
+
func (d *Dispatcher) acquireSlot() {
for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() {
time.Sleep(time.Millisecond)
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index c8f8e550ba4..2e2e9a5abbd 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -36,6 +36,12 @@ func (f *mockFeeder) Send(doc Document) Result {
return result
}
+type mockCircuitBreaker struct{ state CircuitState }
+
+func (c *mockCircuitBreaker) Success() {}
+func (c *mockCircuitBreaker) Error(err error) {}
+func (c *mockCircuitBreaker) State() CircuitState { return c.state }
+
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}
@@ -131,6 +137,20 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
assert.Equal(t, 6, len(feeder.documents))
}
+func TestDispatcherOpenCircuit(t *testing.T) {
+ feeder := &mockFeeder{}
+ doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut}
+ clock := &manualClock{tick: time.Second}
+ throttler := newThrottler(8, clock.now)
+ breaker := &mockCircuitBreaker{}
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
+ dispatcher.Enqueue(doc)
+ breaker.state = CircuitOpen
+ dispatcher.Enqueue(doc)
+ dispatcher.Close()
+ assert.Equal(t, 1, len(feeder.documents))
+}
+
func BenchmarkDocumentDispatching(b *testing.B) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}