diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-30 10:40:26 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-30 10:40:26 +0100 |
commit | 4b311c85dff067f9d7ebbb87882b1e0df9cf653b (patch) | |
tree | 6856845a52973e20dac486c96556091db4b2e8cd /client/go | |
parent | 95dbd20b3dc32e3512f9941245b91bbc3f6ad5ac (diff) |
Ensure Go 1.18 compatibility
Diffstat (limited to 'client/go')
4 files changed, 38 insertions, 39 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go index aff15e88069..17fc595d58f 100644 --- a/client/go/internal/vespa/document/circuit_breaker.go +++ b/client/go/internal/vespa/document/circuit_breaker.go @@ -27,49 +27,51 @@ type timeCircuitBreaker struct { graceDuration time.Duration doomDuration time.Duration - failingSinceMillis atomic.Int64 + failingSinceMillis int64 lastError atomic.Value - halfOpen atomic.Bool - open atomic.Bool + halfOpen atomic.Value + open atomic.Value now func() time.Time } func (b *timeCircuitBreaker) Success() { - b.failingSinceMillis.Store(math.MaxInt64) - if !b.open.Load() { + atomic.StoreInt64(&b.failingSinceMillis, math.MaxInt64) + if !b.open.Load().(bool) { b.halfOpen.CompareAndSwap(true, false) } } func (b *timeCircuitBreaker) Error(err error) { - if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) { + if atomic.CompareAndSwapInt64(&b.failingSinceMillis, math.MaxInt64, b.now().UnixMilli()) { b.lastError.Store(err) } } func (b *timeCircuitBreaker) State() CircuitState { - failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load())) + failingDuration := b.now().Sub(time.UnixMilli(atomic.LoadInt64(&b.failingSinceMillis))) if failingDuration > b.graceDuration { b.halfOpen.CompareAndSwap(false, true) } if b.doomDuration > 0 && failingDuration > b.doomDuration { b.open.CompareAndSwap(false, true) } - if b.open.Load() { + if b.open.Load().(bool) { return CircuitOpen - } else if b.halfOpen.Load() { + } else if b.halfOpen.Load().(bool) { return CircuitHalfOpen } return CircuitClosed } -func NewCircuitBreaker(graceDuration, doomDuration time.Duration) CircuitBreaker { +func NewCircuitBreaker(graceDuration, doomDuration time.Duration) *timeCircuitBreaker { b := &timeCircuitBreaker{ - graceDuration: graceDuration, - doomDuration: doomDuration, - now: time.Now, + graceDuration: graceDuration, + doomDuration: doomDuration, + now: time.Now, + failingSinceMillis: math.MaxInt64, } - b.failingSinceMillis.Store(math.MaxInt64) + b.open.Store(false) + b.halfOpen.Store(false) return b } diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go index 99dd057438d..7a4fffaae27 100644 --- a/client/go/internal/vespa/document/circuit_breaker_test.go +++ b/client/go/internal/vespa/document/circuit_breaker_test.go @@ -10,11 +10,8 @@ import ( func TestCircuitBreaker(t *testing.T) { clock := &manualClock{} - breaker := &timeCircuitBreaker{ - graceDuration: time.Second, - doomDuration: time.Minute, - now: clock.now, - } + breaker := NewCircuitBreaker(time.Second, time.Minute) + breaker.now = clock.now err := errors.New("error") assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed") diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index a65f16c9298..7011ae7a9b6 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -20,7 +20,7 @@ type Dispatcher struct { ready chan Id results chan Result inflight map[string]*documentGroup - inflightCount atomic.Int64 + inflightCount int64 mu sync.RWMutex wg sync.WaitGroup @@ -83,7 +83,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { - d.throttler.Throttled(d.inflightCount.Load()) + d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) return true } if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { @@ -163,13 +163,13 @@ func (d *Dispatcher) enqueueWithSlot(id Id) { } func (d *Dispatcher) acquireSlot() { - for d.inflightCount.Load() >= d.throttler.TargetInflight() { + for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() { time.Sleep(time.Millisecond) } - d.inflightCount.Add(1) + atomic.AddInt64(&d.inflightCount, 1) } -func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) } +func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { d.mu.Lock() diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index f7c57ff97ed..69bb7c8d7ac 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -27,9 +27,9 @@ type Throttler interface { } type dynamicThrottler struct { - ok atomic.Int64 - targetInflight atomic.Int64 - targetTimesTen atomic.Int64 + ok int64 + targetInflight int64 + targetTimesTen int64 throughputs []float64 sent int64 @@ -39,20 +39,20 @@ type dynamicThrottler struct { } func newThrottler(nowFunc func() time.Time) *dynamicThrottler { - d := &dynamicThrottler{ + return &dynamicThrottler{ throughputs: make([]float64, 128), start: nowFunc(), now: nowFunc, + + targetInflight: 8 * throttlerMinInflight, + targetTimesTen: 10 * throttlerMaxInflight, } - d.targetInflight.Store(8 * throttlerMinInflight) - d.targetTimesTen.Store(10 * throttlerMaxInflight) - return d } func NewThrottler() Throttler { return newThrottler(time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := t.targetInflight.Load() + currentInflight := atomic.LoadInt64(&t.targetInflight) t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return @@ -61,7 +61,7 @@ func (t *dynamicThrottler) Sent() { now := t.now() elapsed := now.Sub(t.start) t.start = now - currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed) + currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed) // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256)) @@ -82,21 +82,21 @@ func (t *dynamicThrottler) Sent() { } } target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase - t.targetInflight.Store(max(throttlerMinInflight, min(throttlerMaxInflight, target))) + atomic.StoreInt64(&t.targetInflight, max(throttlerMinInflight, min(throttlerMaxInflight, target))) } func (t *dynamicThrottler) Success() { - t.targetTimesTen.Add(1) - t.ok.Add(1) + atomic.AddInt64(&t.targetTimesTen, 1) + atomic.AddInt64(&t.ok, 1) } func (t *dynamicThrottler) Throttled(inflight int64) { - t.targetTimesTen.Store(max(inflight*5, throttlerMinInflight*10)) + atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, throttlerMinInflight*10)) } func (t *dynamicThrottler) TargetInflight() int64 { - staticTargetInflight := min(throttlerMaxInflight, t.targetTimesTen.Load()/10) - targetInflight := t.targetInflight.Load() + staticTargetInflight := min(throttlerMaxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) + targetInflight := atomic.LoadInt64(&t.targetInflight) return min(staticTargetInflight, targetInflight) } |