diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-24 14:33:01 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-24 14:37:26 +0200 |
commit | e73277532d7aed8a9557449bad39cd5e4ee5054e (patch) | |
tree | 45522501b7aeff231940bf10fc215d8b281f511f /client | |
parent | 1966902403aa9a5cec19eb9c51f7a3f0f1d0e80c (diff) |
Respect circuit breaker
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 21 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 20 |
2 files changed, 39 insertions, 2 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 0f3d39d5a78..51e60e4e131 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -163,11 +163,15 @@ func (d *Dispatcher) enqueue(op documentOp) error { } d.mu.Unlock() group.add(op, op.attempts > 0) - d.enqueueWithSlot(group) + d.dispatch(op.document.Id, group) return nil } -func (d *Dispatcher) enqueueWithSlot(group *documentGroup) { +func (d *Dispatcher) dispatch(id Id, group *documentGroup) { + if !d.canDispatch() { + d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id) + return + } d.acquireSlot() d.workerWg.Add(1) go func() { @@ -177,6 +181,19 @@ func (d *Dispatcher) enqueueWithSlot(group *documentGroup) { d.throttler.Sent() } +func (d *Dispatcher) canDispatch() bool { + switch d.circuitBreaker.State() { + case CircuitClosed: + return true + case CircuitHalfOpen: + time.Sleep(time.Second) + return true + case CircuitOpen: + return false + } + panic("invalid circuit state") +} + func (d *Dispatcher) acquireSlot() { for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() { time.Sleep(time.Millisecond) diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index c8f8e550ba4..2e2e9a5abbd 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -36,6 +36,12 @@ func (f *mockFeeder) Send(doc Document) Result { return result } +type mockCircuitBreaker struct{ state CircuitState } + +func (c *mockCircuitBreaker) Success() {} +func (c *mockCircuitBreaker) Error(err error) {} +func (c *mockCircuitBreaker) State() CircuitState { return c.state } + func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} @@ -131,6 +137,20 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { assert.Equal(t, 6, len(feeder.documents)) } +func TestDispatcherOpenCircuit(t *testing.T) { + feeder := &mockFeeder{} + doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut} + clock := &manualClock{tick: time.Second} + throttler := newThrottler(8, clock.now) + breaker := &mockCircuitBreaker{} + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) + dispatcher.Enqueue(doc) + breaker.state = CircuitOpen + dispatcher.Enqueue(doc) + dispatcher.Close() + assert.Equal(t, 1, len(feeder.documents)) +} + func BenchmarkDocumentDispatching(b *testing.B) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} |