summaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-30 10:40:26 +0100
committerMartin Polden <mpolden@mpolden.no>2023-03-30 10:40:26 +0100
commit4b311c85dff067f9d7ebbb87882b1e0df9cf653b (patch)
tree6856845a52973e20dac486c96556091db4b2e8cd /client/go
parent95dbd20b3dc32e3512f9941245b91bbc3f6ad5ac (diff)
Ensure Go 1.18 compatibility
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go30
-rw-r--r--client/go/internal/vespa/document/circuit_breaker_test.go7
-rw-r--r--client/go/internal/vespa/document/dispatcher.go10
-rw-r--r--client/go/internal/vespa/document/throttler.go30
4 files changed, 38 insertions, 39 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
index aff15e88069..17fc595d58f 100644
--- a/client/go/internal/vespa/document/circuit_breaker.go
+++ b/client/go/internal/vespa/document/circuit_breaker.go
@@ -27,49 +27,51 @@ type timeCircuitBreaker struct {
graceDuration time.Duration
doomDuration time.Duration
- failingSinceMillis atomic.Int64
+ failingSinceMillis int64
lastError atomic.Value
- halfOpen atomic.Bool
- open atomic.Bool
+ halfOpen atomic.Value
+ open atomic.Value
now func() time.Time
}
func (b *timeCircuitBreaker) Success() {
- b.failingSinceMillis.Store(math.MaxInt64)
- if !b.open.Load() {
+ atomic.StoreInt64(&b.failingSinceMillis, math.MaxInt64)
+ if !b.open.Load().(bool) {
b.halfOpen.CompareAndSwap(true, false)
}
}
func (b *timeCircuitBreaker) Error(err error) {
- if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
+ if atomic.CompareAndSwapInt64(&b.failingSinceMillis, math.MaxInt64, b.now().UnixMilli()) {
b.lastError.Store(err)
}
}
func (b *timeCircuitBreaker) State() CircuitState {
- failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load()))
+ failingDuration := b.now().Sub(time.UnixMilli(atomic.LoadInt64(&b.failingSinceMillis)))
if failingDuration > b.graceDuration {
b.halfOpen.CompareAndSwap(false, true)
}
if b.doomDuration > 0 && failingDuration > b.doomDuration {
b.open.CompareAndSwap(false, true)
}
- if b.open.Load() {
+ if b.open.Load().(bool) {
return CircuitOpen
- } else if b.halfOpen.Load() {
+ } else if b.halfOpen.Load().(bool) {
return CircuitHalfOpen
}
return CircuitClosed
}
-func NewCircuitBreaker(graceDuration, doomDuration time.Duration) CircuitBreaker {
+func NewCircuitBreaker(graceDuration, doomDuration time.Duration) *timeCircuitBreaker {
b := &timeCircuitBreaker{
- graceDuration: graceDuration,
- doomDuration: doomDuration,
- now: time.Now,
+ graceDuration: graceDuration,
+ doomDuration: doomDuration,
+ now: time.Now,
+ failingSinceMillis: math.MaxInt64,
}
- b.failingSinceMillis.Store(math.MaxInt64)
+ b.open.Store(false)
+ b.halfOpen.Store(false)
return b
}
diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go
index 99dd057438d..7a4fffaae27 100644
--- a/client/go/internal/vespa/document/circuit_breaker_test.go
+++ b/client/go/internal/vespa/document/circuit_breaker_test.go
@@ -10,11 +10,8 @@ import (
func TestCircuitBreaker(t *testing.T) {
clock := &manualClock{}
- breaker := &timeCircuitBreaker{
- graceDuration: time.Second,
- doomDuration: time.Minute,
- now: clock.now,
- }
+ breaker := NewCircuitBreaker(time.Second, time.Minute)
+ breaker.now = clock.now
err := errors.New("error")
assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed")
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index a65f16c9298..7011ae7a9b6 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -20,7 +20,7 @@ type Dispatcher struct {
ready chan Id
results chan Result
inflight map[string]*documentGroup
- inflightCount atomic.Int64
+ inflightCount int64
mu sync.RWMutex
wg sync.WaitGroup
@@ -83,7 +83,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- d.throttler.Throttled(d.inflightCount.Load())
+ d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
return true
}
if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
@@ -163,13 +163,13 @@ func (d *Dispatcher) enqueueWithSlot(id Id) {
}
func (d *Dispatcher) acquireSlot() {
- for d.inflightCount.Load() >= d.throttler.TargetInflight() {
+ for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() {
time.Sleep(time.Millisecond)
}
- d.inflightCount.Add(1)
+ atomic.AddInt64(&d.inflightCount, 1)
}
-func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
+func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
d.mu.Lock()
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index f7c57ff97ed..69bb7c8d7ac 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -27,9 +27,9 @@ type Throttler interface {
}
type dynamicThrottler struct {
- ok atomic.Int64
- targetInflight atomic.Int64
- targetTimesTen atomic.Int64
+ ok int64
+ targetInflight int64
+ targetTimesTen int64
throughputs []float64
sent int64
@@ -39,20 +39,20 @@ type dynamicThrottler struct {
}
func newThrottler(nowFunc func() time.Time) *dynamicThrottler {
- d := &dynamicThrottler{
+ return &dynamicThrottler{
throughputs: make([]float64, 128),
start: nowFunc(),
now: nowFunc,
+
+ targetInflight: 8 * throttlerMinInflight,
+ targetTimesTen: 10 * throttlerMaxInflight,
}
- d.targetInflight.Store(8 * throttlerMinInflight)
- d.targetTimesTen.Store(10 * throttlerMaxInflight)
- return d
}
func NewThrottler() Throttler { return newThrottler(time.Now) }
func (t *dynamicThrottler) Sent() {
- currentInflight := t.targetInflight.Load()
+ currentInflight := atomic.LoadInt64(&t.targetInflight)
t.sent++
if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
return
@@ -61,7 +61,7 @@ func (t *dynamicThrottler) Sent() {
now := t.now()
elapsed := now.Sub(t.start)
t.start = now
- currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed)
+ currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed)
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256))
@@ -82,21 +82,21 @@ func (t *dynamicThrottler) Sent() {
}
}
target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
- t.targetInflight.Store(max(throttlerMinInflight, min(throttlerMaxInflight, target)))
+ atomic.StoreInt64(&t.targetInflight, max(throttlerMinInflight, min(throttlerMaxInflight, target)))
}
func (t *dynamicThrottler) Success() {
- t.targetTimesTen.Add(1)
- t.ok.Add(1)
+ atomic.AddInt64(&t.targetTimesTen, 1)
+ atomic.AddInt64(&t.ok, 1)
}
func (t *dynamicThrottler) Throttled(inflight int64) {
- t.targetTimesTen.Store(max(inflight*5, throttlerMinInflight*10))
+ atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, throttlerMinInflight*10))
}
func (t *dynamicThrottler) TargetInflight() int64 {
- staticTargetInflight := min(throttlerMaxInflight, t.targetTimesTen.Load()/10)
- targetInflight := t.targetInflight.Load()
+ staticTargetInflight := min(throttlerMaxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
+ targetInflight := atomic.LoadInt64(&t.targetInflight)
return min(staticTargetInflight, targetInflight)
}