diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-28 15:30:11 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-28 15:54:54 +0200 |
commit | 471e4dc6cad87c887756f6479bddfa968684abde (patch) | |
tree | 73320831d2996cd0cc0861fa679a332371e8cc88 | |
parent | f66405e8b76c85eb426f2a8fbc9345a6c0a2afb7 (diff) |
Use circuit breaker in dispatcher
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 4 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 28 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 9 |
3 files changed, 26 insertions, 15 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 04deccf056a..c8e032929b8 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -60,7 +60,9 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { BaseURL: service.BaseURL, }, service) throttler := document.NewThrottler() - dispatcher := document.NewDispatcher(client, throttler) + // TODO(mpolden): Make doom duration configurable + circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) + dispatcher := document.NewDispatcher(client, throttler, circuitBreaker) dec := document.NewDecoder(r) start := cli.now() diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 8a034ffef10..a65f16c9298 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -11,9 +11,10 @@ const maxAttempts = 10 // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { - feeder Feeder - throttler Throttler - stats Stats + feeder Feeder + throttler Throttler + circuitBreaker CircuitBreaker + stats Stats closed bool ready chan Id @@ -44,11 +45,12 @@ func (g *documentGroup) append(op documentOp) { g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, throttler Throttler) *Dispatcher { +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher { d := &Dispatcher{ - feeder: feeder, - throttler: throttler, - inflight: make(map[string]*documentGroup), + feeder: feeder, + throttler: throttler, + circuitBreaker: breaker, + inflight: make(map[string]*documentGroup), } d.start() return d @@ -60,16 +62,16 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) { for i := 0; i < len(g.operations); i++ { op := g.operations[i] ok := false - for op.attempts <= maxAttempts && !ok { - op.attempts += 1 + for !ok { + op.attempts++ result := d.feeder.Send(op.document) - d.releaseSlot() d.results <- result ok = result.Status.Success() if !d.shouldRetry(op, result) { break } } + d.releaseSlot() } g.operations = nil } @@ -77,6 +79,7 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) { func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { d.throttler.Success() + d.circuitBreaker.Success() return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { @@ -84,7 +87,10 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { return true } if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { - // TODO(mpolden): Trigger circuit-breaker + d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) + if op.attempts <= maxAttempts { + return true + } } return false } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 32d6e68e3c7..8a6d8c6117c 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -39,7 +39,8 @@ func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) - dispatcher := NewDispatcher(feeder, throttler) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker) docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, @@ -71,7 +72,8 @@ func TestDispatcherOrdering(t *testing.T) { } clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) - dispatcher := NewDispatcher(feeder, throttler) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker) for _, d := range docs { dispatcher.Enqueue(d) } @@ -106,7 +108,8 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { feeder.failAfterN(2) clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) - dispatcher := NewDispatcher(feeder, throttler) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker) for _, d := range docs { dispatcher.Enqueue(d) } |