aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-28 15:30:11 +0200
committerMartin Polden <mpolden@mpolden.no>2023-03-28 15:54:54 +0200
commit471e4dc6cad87c887756f6479bddfa968684abde (patch)
tree73320831d2996cd0cc0861fa679a332371e8cc88
parentf66405e8b76c85eb426f2a8fbc9345a6c0a2afb7 (diff)
Use circuit breaker in dispatcher
-rw-r--r--client/go/internal/cli/cmd/feed.go4
-rw-r--r--client/go/internal/vespa/document/dispatcher.go28
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go9
3 files changed, 26 insertions, 15 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 04deccf056a..c8e032929b8 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -60,7 +60,9 @@ func feed(r io.Reader, cli *CLI, concurrency int) error {
BaseURL: service.BaseURL,
}, service)
throttler := document.NewThrottler()
- dispatcher := document.NewDispatcher(client, throttler)
+ // TODO(mpolden): Make doom duration configurable
+ circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
+ dispatcher := document.NewDispatcher(client, throttler, circuitBreaker)
dec := document.NewDecoder(r)
start := cli.now()
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 8a034ffef10..a65f16c9298 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -11,9 +11,10 @@ const maxAttempts = 10
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- feeder Feeder
- throttler Throttler
- stats Stats
+ feeder Feeder
+ throttler Throttler
+ circuitBreaker CircuitBreaker
+ stats Stats
closed bool
ready chan Id
@@ -44,11 +45,12 @@ func (g *documentGroup) append(op documentOp) {
g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, throttler Throttler) *Dispatcher {
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
d := &Dispatcher{
- feeder: feeder,
- throttler: throttler,
- inflight: make(map[string]*documentGroup),
+ feeder: feeder,
+ throttler: throttler,
+ circuitBreaker: breaker,
+ inflight: make(map[string]*documentGroup),
}
d.start()
return d
@@ -60,16 +62,16 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) {
for i := 0; i < len(g.operations); i++ {
op := g.operations[i]
ok := false
- for op.attempts <= maxAttempts && !ok {
- op.attempts += 1
+ for !ok {
+ op.attempts++
result := d.feeder.Send(op.document)
- d.releaseSlot()
d.results <- result
ok = result.Status.Success()
if !d.shouldRetry(op, result) {
break
}
}
+ d.releaseSlot()
}
g.operations = nil
}
@@ -77,6 +79,7 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) {
func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
d.throttler.Success()
+ d.circuitBreaker.Success()
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
@@ -84,7 +87,10 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
return true
}
if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
- // TODO(mpolden): Trigger circuit-breaker
+ d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
+ if op.attempts <= maxAttempts {
+ return true
+ }
}
return false
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 32d6e68e3c7..8a6d8c6117c 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -39,7 +39,8 @@ func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}
throttler := newThrottler(clock.now)
- dispatcher := NewDispatcher(feeder, throttler)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker)
docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
@@ -71,7 +72,8 @@ func TestDispatcherOrdering(t *testing.T) {
}
clock := &manualClock{tick: time.Second}
throttler := newThrottler(clock.now)
- dispatcher := NewDispatcher(feeder, throttler)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker)
for _, d := range docs {
dispatcher.Enqueue(d)
}
@@ -106,7 +108,8 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
feeder.failAfterN(2)
clock := &manualClock{tick: time.Second}
throttler := newThrottler(clock.now)
- dispatcher := NewDispatcher(feeder, throttler)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker)
for _, d := range docs {
dispatcher.Enqueue(d)
}