summaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
authorArnstein Ressem <aressem@verizonmedia.com>2023-05-22 10:16:43 +0200
committerGitHub <noreply@github.com>2023-05-22 10:16:43 +0200
commit64e7dbfd3be5bea33646f4a1bf1e39175f0bdc51 (patch)
treebdcf52a119ebe9e038ff4e13d096f7dadb2564ce /client
parentbe02a3e03d997353cfd35b9c7337a56ef88d2a11 (diff)
parent02448ad8603d18c8df9bd85dd0eeb8af1c3e6946 (diff)
Merge pull request #27161 from vespa-engine/mpolden/go119
Require Go 1.19
Diffstat (limited to 'client')
-rw-r--r--client/go/go.mod6
-rw-r--r--client/go/go.sum4
-rw-r--r--client/go/internal/vespa/document/dispatcher.go12
-rw-r--r--client/go/internal/vespa/document/throttler.go33
4 files changed, 28 insertions, 27 deletions
diff --git a/client/go/go.mod b/client/go/go.mod
index f5f923cc063..5d1f6175e55 100644
--- a/client/go/go.mod
+++ b/client/go/go.mod
@@ -1,13 +1,13 @@
module github.com/vespa-engine/vespa/client/go
-go 1.18
+go 1.19
require (
github.com/alessio/shellescape v1.4.1
github.com/briandowns/spinner v1.23.0
github.com/fatih/color v1.15.0
- // This is the most recent version compatible with Go 1.18. Upgrade when we upgrade our Go version
- github.com/go-json-experiment/json v0.0.0-20220727223814-4987ed27d447
+ // This is the most recent version compatible with Go 1.19. Upgrade when we upgrade our Go version
+ github.com/go-json-experiment/json v0.0.0-20230216065249-540f01442424
github.com/klauspost/compress v1.16.5
github.com/mattn/go-colorable v0.1.13
github.com/mattn/go-isatty v0.0.18
diff --git a/client/go/go.sum b/client/go/go.sum
index 861c8725ed0..03206b0c5e8 100644
--- a/client/go/go.sum
+++ b/client/go/go.sum
@@ -11,8 +11,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
-github.com/go-json-experiment/json v0.0.0-20220727223814-4987ed27d447 h1:hDdASyrtiSuQvaafDrVTX34wy4ibhxrJO9/vyFbBt0k=
-github.com/go-json-experiment/json v0.0.0-20220727223814-4987ed27d447/go.mod h1:jbpkervfdK2HCcB2YEFmwYeaq057KFiaaKTNTHV4OOQ=
+github.com/go-json-experiment/json v0.0.0-20230216065249-540f01442424 h1:I1EK0t+BDH+kvlozNqrvzKqsWeM2QUKxXH0iW2fjDDw=
+github.com/go-json-experiment/json v0.0.0-20230216065249-540f01442424/go.mod h1:I+I5/LT2lLP0eZsBNaVDrOrYASx9h7o7mRHmy+535/A=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 7237a87b7e2..2ad5b841616 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -25,7 +25,7 @@ type Dispatcher struct {
msgs chan string
inflight map[string]*Queue[documentOp]
- inflightCount int64
+ inflightCount atomic.Int64
output io.Writer
verbose bool
@@ -76,7 +76,7 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus)
- d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
+ d.throttler.Throttled(d.inflightCount.Load())
return true
}
if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
@@ -226,20 +226,20 @@ func (d *Dispatcher) acceptDocument() bool {
}
func (d *Dispatcher) acquireSlot() {
- for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() {
+ for d.inflightCount.Load() >= d.throttler.TargetInflight() {
time.Sleep(time.Millisecond)
}
- atomic.AddInt64(&d.inflightCount, 1)
+ d.inflightCount.Add(1)
}
-func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
+func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) }
func (d *Dispatcher) Stats() Stats {
d.statsMu.Lock()
defer d.statsMu.Unlock()
- d.stats.Inflight = atomic.LoadInt64(&d.inflightCount)
+ d.stats.Inflight = d.inflightCount.Load()
return d.stats
}
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index 667a10d28e3..e32fb804b23 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -23,11 +23,11 @@ type Throttler interface {
type dynamicThrottler struct {
minInflight int64
maxInflight int64
- targetInflight int64
- targetTimesTen int64
+ targetInflight atomic.Int64
+ targetTimesTen atomic.Int64
throughputs []float64
- ok int64
+ ok atomic.Int64
sent int64
start time.Time
@@ -39,23 +39,24 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
minInflight = 16 * int64(connections)
maxInflight = 256 * minInflight // 4096 max streams per connection on the server side
)
- return &dynamicThrottler{
- minInflight: minInflight,
- maxInflight: maxInflight,
- targetInflight: 8 * minInflight,
- targetTimesTen: 10 * maxInflight,
+ t := &dynamicThrottler{
+ minInflight: minInflight,
+ maxInflight: maxInflight,
throughputs: make([]float64, 128),
start: nowFunc(),
now: nowFunc,
}
+ t.targetInflight.Store(8 * minInflight)
+ t.targetTimesTen.Store(10 * maxInflight)
+ return t
}
func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func (t *dynamicThrottler) Sent() {
- currentInflight := atomic.LoadInt64(&t.targetInflight)
+ currentInflight := t.targetInflight.Load()
t.sent++
if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
return
@@ -64,7 +65,7 @@ func (t *dynamicThrottler) Sent() {
now := t.now()
elapsed := now.Sub(t.start)
t.start = now
- currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed)
+ currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed)
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256))
@@ -85,20 +86,20 @@ func (t *dynamicThrottler) Sent() {
}
}
target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
- atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target)))
+ t.targetInflight.Store(max(t.minInflight, min(t.maxInflight, target)))
}
func (t *dynamicThrottler) Success() {
- atomic.AddInt64(&t.targetTimesTen, 1)
- atomic.AddInt64(&t.ok, 1)
+ t.targetTimesTen.Add(1)
+ t.ok.Add(1)
}
func (t *dynamicThrottler) Throttled(inflight int64) {
- atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10))
+ t.targetTimesTen.Store(max(inflight*5, t.minInflight*10))
}
func (t *dynamicThrottler) TargetInflight() int64 {
- staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
- targetInflight := atomic.LoadInt64(&t.targetInflight)
+ staticTargetInflight := min(t.maxInflight, t.targetTimesTen.Load()/10)
+ targetInflight := t.targetInflight.Load()
return min(staticTargetInflight, targetInflight)
}