aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-25 19:20:17 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-25 19:22:11 +0200
commit3928e182afac9c6072a684c570917c8eb861ce82 (patch)
tree64773e19418d6a28cae40f0d86b493e1d1293ea0 /client/go
parentee148dc3631c5cc2fc9843f067f790d7dda817e1 (diff)
Clean up response logging
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go9
-rw-r--r--client/go/internal/vespa/document/circuit_breaker_test.go6
-rw-r--r--client/go/internal/vespa/document/dispatcher.go65
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go2
4 files changed, 44 insertions, 38 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
index f7f0f4360df..9bcf2e3f619 100644
--- a/client/go/internal/vespa/document/circuit_breaker.go
+++ b/client/go/internal/vespa/document/circuit_breaker.go
@@ -19,7 +19,7 @@ const (
type CircuitBreaker interface {
Success()
- Error(error)
+ Failure()
State() CircuitState
}
@@ -28,7 +28,6 @@ type timeCircuitBreaker struct {
doomDuration time.Duration
failingSinceMillis atomic.Int64
- lastError atomic.Value
halfOpen atomic.Bool
open atomic.Bool
@@ -42,10 +41,8 @@ func (b *timeCircuitBreaker) Success() {
}
}
-func (b *timeCircuitBreaker) Error(err error) {
- if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
- b.lastError.Store(err)
- }
+func (b *timeCircuitBreaker) Failure() {
+ b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli())
}
func (b *timeCircuitBreaker) State() CircuitState {
diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go
index 7a4fffaae27..05dbd6da2f5 100644
--- a/client/go/internal/vespa/document/circuit_breaker_test.go
+++ b/client/go/internal/vespa/document/circuit_breaker_test.go
@@ -1,7 +1,6 @@
package document
import (
- "errors"
"testing"
"time"
@@ -12,7 +11,6 @@ func TestCircuitBreaker(t *testing.T) {
clock := &manualClock{}
breaker := NewCircuitBreaker(time.Second, time.Minute)
breaker.now = clock.now
- err := errors.New("error")
assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed")
@@ -25,7 +23,7 @@ func TestCircuitBreaker(t *testing.T) {
clock.advance(100 * time.Second)
assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success")
- breaker.Error(err)
+ breaker.Failure()
assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure")
clock.advance(time.Second)
@@ -37,7 +35,7 @@ func TestCircuitBreaker(t *testing.T) {
breaker.Success()
assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success")
- breaker.Error(err)
+ breaker.Failure()
clock.advance(time.Minute)
assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed")
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index a2b84aaeef2..d9273d2f677 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,6 +3,7 @@ package document
import (
"fmt"
"io"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -62,41 +63,51 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
return d
}
-func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
+func (d *Dispatcher) logResult(doc Document, result Result, retry bool) {
if result.Trace != "" {
- d.msgs <- fmt.Sprintf("feed: trace for %s:\n%s", op.document, result.Trace)
+ d.msgs <- fmt.Sprintf("feed: trace for %s %s:\n%s", doc.Operation, doc.Id, result.Trace)
}
- if result.Success() {
- if d.verbose {
- d.msgs <- fmt.Sprintf("feed: %s succeeded with status %d", op.document, result.HTTPStatus)
- }
- d.throttler.Success()
- d.circuitBreaker.Success()
- return false
+ if !d.verbose && result.Success() {
+ return
}
- if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus)
- d.throttler.Throttled(d.inflightCount.Load())
- return true
+ var msg strings.Builder
+ msg.WriteString("feed: got status ")
+ msg.WriteString(strconv.Itoa(result.HTTPStatus))
+ msg.WriteString(" (")
+ if result.Body != nil {
+ msg.Write(result.Body)
+ } else {
+ msg.WriteString("no body")
}
- if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
- retry := op.attempts < maxAttempts
- var msg strings.Builder
- msg.WriteString("feed: ")
- msg.WriteString(op.document.String())
- msg.WriteString(" failed: ")
- if result.Err != nil {
- msg.WriteString(result.Err.Error())
- } else {
- msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus))
- }
+ msg.WriteString(")")
+ msg.WriteString(" for ")
+ msg.WriteString(doc.Operation.String())
+ msg.WriteString(" ")
+ msg.WriteString(doc.Id.String())
+ if !result.Success() {
if retry {
msg.WriteString(": retrying")
} else {
- msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts))
+ msg.WriteString(": giving up after ")
+ msg.WriteString(strconv.Itoa(maxAttempts))
+ msg.WriteString(" attempts")
}
- d.msgs <- msg.String()
- d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
+ }
+ d.msgs <- msg.String()
+}
+
+func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
+ retry := op.attempts < maxAttempts
+ d.logResult(op.document, result, retry)
+ if result.Success() {
+ d.throttler.Success()
+ d.circuitBreaker.Success()
+ return false
+ } else if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
+ d.throttler.Throttled(d.inflightCount.Load())
+ return true
+ } else if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ d.circuitBreaker.Failure()
if retry {
return true
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 9bc0c76106c..834ec8490a6 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -48,7 +48,7 @@ func (f *mockFeeder) Send(doc Document) Result {
type mockCircuitBreaker struct{ state CircuitState }
func (c *mockCircuitBreaker) Success() {}
-func (c *mockCircuitBreaker) Error(err error) {}
+func (c *mockCircuitBreaker) Failure() {}
func (c *mockCircuitBreaker) State() CircuitState { return c.state }
func TestDispatcher(t *testing.T) {