diff options
author | Arnstein Ressem <aressem@verizonmedia.com> | 2023-05-22 10:16:43 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-22 10:16:43 +0200 |
commit | 64e7dbfd3be5bea33646f4a1bf1e39175f0bdc51 (patch) | |
tree | bdcf52a119ebe9e038ff4e13d096f7dadb2564ce /client | |
parent | be02a3e03d997353cfd35b9c7337a56ef88d2a11 (diff) | |
parent | 02448ad8603d18c8df9bd85dd0eeb8af1c3e6946 (diff) |
Merge pull request #27161 from vespa-engine/mpolden/go119
Require Go 1.19
Diffstat (limited to 'client')
-rw-r--r-- | client/go/go.mod | 6 | ||||
-rw-r--r-- | client/go/go.sum | 4 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 12 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler.go | 33 |
4 files changed, 28 insertions, 27 deletions
diff --git a/client/go/go.mod b/client/go/go.mod index f5f923cc063..5d1f6175e55 100644 --- a/client/go/go.mod +++ b/client/go/go.mod @@ -1,13 +1,13 @@ module github.com/vespa-engine/vespa/client/go -go 1.18 +go 1.19 require ( github.com/alessio/shellescape v1.4.1 github.com/briandowns/spinner v1.23.0 github.com/fatih/color v1.15.0 - // This is the most recent version compatible with Go 1.18. Upgrade when we upgrade our Go version - github.com/go-json-experiment/json v0.0.0-20220727223814-4987ed27d447 + // This is the most recent version compatible with Go 1.19. Upgrade when we upgrade our Go version + github.com/go-json-experiment/json v0.0.0-20230216065249-540f01442424 github.com/klauspost/compress v1.16.5 github.com/mattn/go-colorable v0.1.13 github.com/mattn/go-isatty v0.0.18 diff --git a/client/go/go.sum b/client/go/go.sum index 861c8725ed0..03206b0c5e8 100644 --- a/client/go/go.sum +++ b/client/go/go.sum @@ -11,8 +11,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= -github.com/go-json-experiment/json v0.0.0-20220727223814-4987ed27d447 h1:hDdASyrtiSuQvaafDrVTX34wy4ibhxrJO9/vyFbBt0k= -github.com/go-json-experiment/json v0.0.0-20220727223814-4987ed27d447/go.mod h1:jbpkervfdK2HCcB2YEFmwYeaq057KFiaaKTNTHV4OOQ= +github.com/go-json-experiment/json v0.0.0-20230216065249-540f01442424 h1:I1EK0t+BDH+kvlozNqrvzKqsWeM2QUKxXH0iW2fjDDw= +github.com/go-json-experiment/json v0.0.0-20230216065249-540f01442424/go.mod h1:I+I5/LT2lLP0eZsBNaVDrOrYASx9h7o7mRHmy+535/A= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 7237a87b7e2..2ad5b841616 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -25,7 +25,7 @@ type Dispatcher struct { msgs chan string inflight map[string]*Queue[documentOp] - inflightCount int64 + inflightCount atomic.Int64 output io.Writer verbose bool @@ -76,7 +76,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus) - d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) + d.throttler.Throttled(d.inflightCount.Load()) return true } if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { @@ -226,20 +226,20 @@ func (d *Dispatcher) acceptDocument() bool { } func (d *Dispatcher) acquireSlot() { - for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() { + for d.inflightCount.Load() >= d.throttler.TargetInflight() { time.Sleep(time.Millisecond) } - atomic.AddInt64(&d.inflightCount, 1) + d.inflightCount.Add(1) } -func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } +func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) } func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) } func (d *Dispatcher) Stats() Stats { d.statsMu.Lock() defer d.statsMu.Unlock() - d.stats.Inflight = atomic.LoadInt64(&d.inflightCount) + d.stats.Inflight = d.inflightCount.Load() return d.stats } diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 667a10d28e3..e32fb804b23 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -23,11 +23,11 @@ type Throttler interface { type dynamicThrottler struct { minInflight int64 maxInflight int64 - targetInflight int64 - targetTimesTen int64 + targetInflight atomic.Int64 + targetTimesTen atomic.Int64 throughputs []float64 - ok int64 + ok atomic.Int64 sent int64 start time.Time @@ -39,23 +39,24 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { minInflight = 16 * int64(connections) maxInflight = 256 * minInflight // 4096 max streams per connection on the server side ) - return &dynamicThrottler{ - minInflight: minInflight, - maxInflight: maxInflight, - targetInflight: 8 * minInflight, - targetTimesTen: 10 * maxInflight, + t := &dynamicThrottler{ + minInflight: minInflight, + maxInflight: maxInflight, throughputs: make([]float64, 128), start: nowFunc(), now: nowFunc, } + t.targetInflight.Store(8 * minInflight) + t.targetTimesTen.Store(10 * maxInflight) + return t } func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := atomic.LoadInt64(&t.targetInflight) + currentInflight := t.targetInflight.Load() t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return @@ -64,7 +65,7 @@ func (t *dynamicThrottler) Sent() { now := t.now() elapsed := now.Sub(t.start) t.start = now - currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed) + currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed) // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256)) @@ -85,20 +86,20 @@ func (t *dynamicThrottler) Sent() { } } target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase - atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target))) + t.targetInflight.Store(max(t.minInflight, min(t.maxInflight, target))) } func (t *dynamicThrottler) Success() { - atomic.AddInt64(&t.targetTimesTen, 1) - atomic.AddInt64(&t.ok, 1) + t.targetTimesTen.Add(1) + t.ok.Add(1) } func (t *dynamicThrottler) Throttled(inflight int64) { - atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10)) + t.targetTimesTen.Store(max(inflight*5, t.minInflight*10)) } func (t *dynamicThrottler) TargetInflight() int64 { - staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) - targetInflight := atomic.LoadInt64(&t.targetInflight) + staticTargetInflight := min(t.maxInflight, t.targetTimesTen.Load()/10) + targetInflight := t.targetInflight.Load() return min(staticTargetInflight, targetInflight) } |