aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-20 12:14:19 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-20 12:14:19 +0200
commit02448ad8603d18c8df9bd85dd0eeb8af1c3e6946 (patch)
tree974c4d26f7e16320e48cd6b6075523bfedcceca4 /client/go
parent018add596d64ada2954c200a45b32cf750b5a500 (diff)
Use atomic types
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go12
-rw-r--r--client/go/internal/vespa/document/throttler.go33
2 files changed, 23 insertions, 22 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 7237a87b7e2..2ad5b841616 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -25,7 +25,7 @@ type Dispatcher struct {
msgs chan string
inflight map[string]*Queue[documentOp]
- inflightCount int64
+ inflightCount atomic.Int64
output io.Writer
verbose bool
@@ -76,7 +76,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus)
- d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
+ d.throttler.Throttled(d.inflightCount.Load())
return true
}
if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
@@ -226,20 +226,20 @@ func (d *Dispatcher) acceptDocument() bool {
}
func (d *Dispatcher) acquireSlot() {
- for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() {
+ for d.inflightCount.Load() >= d.throttler.TargetInflight() {
time.Sleep(time.Millisecond)
}
- atomic.AddInt64(&d.inflightCount, 1)
+ d.inflightCount.Add(1)
}
-func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
+func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) }
func (d *Dispatcher) Stats() Stats {
d.statsMu.Lock()
defer d.statsMu.Unlock()
- d.stats.Inflight = atomic.LoadInt64(&d.inflightCount)
+ d.stats.Inflight = d.inflightCount.Load()
return d.stats
}
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index 667a10d28e3..e32fb804b23 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -23,11 +23,11 @@ type Throttler interface {
type dynamicThrottler struct {
minInflight int64
maxInflight int64
- targetInflight int64
- targetTimesTen int64
+ targetInflight atomic.Int64
+ targetTimesTen atomic.Int64
throughputs []float64
- ok int64
+ ok atomic.Int64
sent int64
start time.Time
@@ -39,23 +39,24 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
minInflight = 16 * int64(connections)
maxInflight = 256 * minInflight // 4096 max streams per connection on the server side
)
- return &dynamicThrottler{
- minInflight: minInflight,
- maxInflight: maxInflight,
- targetInflight: 8 * minInflight,
- targetTimesTen: 10 * maxInflight,
+ t := &dynamicThrottler{
+ minInflight: minInflight,
+ maxInflight: maxInflight,
throughputs: make([]float64, 128),
start: nowFunc(),
now: nowFunc,
}
+ t.targetInflight.Store(8 * minInflight)
+ t.targetTimesTen.Store(10 * maxInflight)
+ return t
}
func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func (t *dynamicThrottler) Sent() {
- currentInflight := atomic.LoadInt64(&t.targetInflight)
+ currentInflight := t.targetInflight.Load()
t.sent++
if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
return
@@ -64,7 +65,7 @@ func (t *dynamicThrottler) Sent() {
now := t.now()
elapsed := now.Sub(t.start)
t.start = now
- currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed)
+ currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed)
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256))
@@ -85,20 +86,20 @@ func (t *dynamicThrottler) Sent() {
}
}
target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
- atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target)))
+ t.targetInflight.Store(max(t.minInflight, min(t.maxInflight, target)))
}
func (t *dynamicThrottler) Success() {
- atomic.AddInt64(&t.targetTimesTen, 1)
- atomic.AddInt64(&t.ok, 1)
+ t.targetTimesTen.Add(1)
+ t.ok.Add(1)
}
func (t *dynamicThrottler) Throttled(inflight int64) {
- atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10))
+ t.targetTimesTen.Store(max(inflight*5, t.minInflight*10))
}
func (t *dynamicThrottler) TargetInflight() int64 {
- staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
- targetInflight := atomic.LoadInt64(&t.targetInflight)
+ staticTargetInflight := min(t.maxInflight, t.targetTimesTen.Load()/10)
+ targetInflight := t.targetInflight.Load()
return min(staticTargetInflight, targetInflight)
}