From 02448ad8603d18c8df9bd85dd0eeb8af1c3e6946 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Sat, 20 May 2023 12:14:19 +0200 Subject: Use atomic types --- client/go/internal/vespa/document/dispatcher.go | 12 ++++----- client/go/internal/vespa/document/throttler.go | 33 +++++++++++++------------ 2 files changed, 23 insertions(+), 22 deletions(-) (limited to 'client/go') diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 7237a87b7e2..2ad5b841616 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -25,7 +25,7 @@ type Dispatcher struct { msgs chan string inflight map[string]*Queue[documentOp] - inflightCount int64 + inflightCount atomic.Int64 output io.Writer verbose bool @@ -76,7 +76,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus) - d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) + d.throttler.Throttled(d.inflightCount.Load()) return true } if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { @@ -226,20 +226,20 @@ func (d *Dispatcher) acceptDocument() bool { } func (d *Dispatcher) acquireSlot() { - for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() { + for d.inflightCount.Load() >= d.throttler.TargetInflight() { time.Sleep(time.Millisecond) } - atomic.AddInt64(&d.inflightCount, 1) + d.inflightCount.Add(1) } -func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } +func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) } func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) } func (d *Dispatcher) Stats() Stats { d.statsMu.Lock() defer d.statsMu.Unlock() - d.stats.Inflight = atomic.LoadInt64(&d.inflightCount) + d.stats.Inflight = d.inflightCount.Load() return d.stats } diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 667a10d28e3..e32fb804b23 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -23,11 +23,11 @@ type Throttler interface { type dynamicThrottler struct { minInflight int64 maxInflight int64 - targetInflight int64 - targetTimesTen int64 + targetInflight atomic.Int64 + targetTimesTen atomic.Int64 throughputs []float64 - ok int64 + ok atomic.Int64 sent int64 start time.Time @@ -39,23 +39,24 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { minInflight = 16 * int64(connections) maxInflight = 256 * minInflight // 4096 max streams per connection on the server side ) - return &dynamicThrottler{ - minInflight: minInflight, - maxInflight: maxInflight, - targetInflight: 8 * minInflight, - targetTimesTen: 10 * maxInflight, + t := &dynamicThrottler{ + minInflight: minInflight, + maxInflight: maxInflight, throughputs: make([]float64, 128), start: nowFunc(), now: nowFunc, } + t.targetInflight.Store(8 * minInflight) + t.targetTimesTen.Store(10 * maxInflight) + return t } func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := atomic.LoadInt64(&t.targetInflight) + currentInflight := t.targetInflight.Load() t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return @@ -64,7 +65,7 @@ func (t *dynamicThrottler) Sent() { now := t.now() elapsed := now.Sub(t.start) t.start = now - currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed) + currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed) // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256)) @@ -85,20 +86,20 @@ func (t *dynamicThrottler) Sent() { } } target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase - atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target))) + t.targetInflight.Store(max(t.minInflight, min(t.maxInflight, target))) } func (t *dynamicThrottler) Success() { - atomic.AddInt64(&t.targetTimesTen, 1) - atomic.AddInt64(&t.ok, 1) + t.targetTimesTen.Add(1) + t.ok.Add(1) } func (t *dynamicThrottler) Throttled(inflight int64) { - atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10)) + t.targetTimesTen.Store(max(inflight*5, t.minInflight*10)) } func (t *dynamicThrottler) TargetInflight() int64 { - staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) - targetInflight := atomic.LoadInt64(&t.targetInflight) + staticTargetInflight := min(t.maxInflight, t.targetTimesTen.Load()/10) + targetInflight := t.targetInflight.Load() return min(staticTargetInflight, targetInflight) } -- cgit v1.2.3