summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-05 10:41:18 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-11 10:27:09 +0200
commit071da64bb540ded7f56b83cf68419d885184079b (patch)
treee103b38c08cd4c49e64640bc2cee26b54bc5c6c1
parent724ca1704c58d9b40da0f0730cefd63557c04215 (diff)
Adjust min inflight based on connection count
-rw-r--r--client/go/internal/cli/cmd/feed.go2
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go6
-rw-r--r--client/go/internal/vespa/document/throttler.go41
-rw-r--r--client/go/internal/vespa/document/throttler_test.go6
4 files changed, 29 insertions, 26 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 4f43839b4fe..ff8b0bc0c0f 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -73,7 +73,7 @@ func feed(r io.Reader, cli *CLI, verbose bool, connections int) error {
client := document.NewClient(document.ClientOptions{
BaseURL: service.BaseURL,
}, clients)
- throttler := document.NewThrottler()
+ throttler := document.NewThrottler(connections)
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
errWriter := io.Discard
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index fc96adabc96..80bc5f603ae 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -39,7 +39,7 @@ func (f *mockFeeder) Send(doc Document) Result {
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
+ throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
docs := []Document{
@@ -72,7 +72,7 @@ func TestDispatcherOrdering(t *testing.T) {
{Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut},
}
clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
+ throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
for _, d := range docs {
@@ -108,7 +108,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
}
feeder.failAfterN(2)
clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
+ throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
for _, d := range docs {
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index 69bb7c8d7ac..5b0aab6174e 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -7,13 +7,7 @@ import (
"time"
)
-const (
- throttlerWeight = 0.7
- // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable
- // for local target
- throttlerMinInflight = 16
- throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side
-)
+const throttlerWeight = 0.7
type Throttler interface {
// Sent notifies the the throttler that a document has been sent.
@@ -27,29 +21,38 @@ type Throttler interface {
}
type dynamicThrottler struct {
- ok int64
+ minInflight int64
+ maxInflight int64
targetInflight int64
targetTimesTen int64
throughputs []float64
+ ok int64
sent int64
start time.Time
now func() time.Time
}
-func newThrottler(nowFunc func() time.Time) *dynamicThrottler {
+func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
+ var (
+ minInflight = 16 * int64(connections)
+ maxInflight = 256 * minInflight // 4096 max streams per connection on the server side
+ )
return &dynamicThrottler{
+ minInflight: minInflight,
+ maxInflight: maxInflight,
+ targetInflight: 8 * minInflight,
+ targetTimesTen: 10 * maxInflight,
+
throughputs: make([]float64, 128),
- start: nowFunc(),
- now: nowFunc,
- targetInflight: 8 * throttlerMinInflight,
- targetTimesTen: 10 * throttlerMaxInflight,
+ start: nowFunc(),
+ now: nowFunc,
}
}
-func NewThrottler() Throttler { return newThrottler(time.Now) }
+func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func (t *dynamicThrottler) Sent() {
currentInflight := atomic.LoadInt64(&t.targetInflight)
@@ -64,7 +67,7 @@ func (t *dynamicThrottler) Sent() {
currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed)
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
- index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256))
+ index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256))
t.throughputs[index] = currentThroughput
// Loop over throughput measurements and pick the one which optimises throughput and latency.
@@ -74,7 +77,7 @@ func (t *dynamicThrottler) Sent() {
if t.throughputs[i] == 0 {
continue // Skip unknown values
}
- inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs)))
+ inflight := float64(t.minInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs)))
objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight)
if objective > maxObjective {
maxObjective = objective
@@ -82,7 +85,7 @@ func (t *dynamicThrottler) Sent() {
}
}
target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
- atomic.StoreInt64(&t.targetInflight, max(throttlerMinInflight, min(throttlerMaxInflight, target)))
+ atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target)))
}
func (t *dynamicThrottler) Success() {
@@ -91,11 +94,11 @@ func (t *dynamicThrottler) Success() {
}
func (t *dynamicThrottler) Throttled(inflight int64) {
- atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, throttlerMinInflight*10))
+ atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10))
}
func (t *dynamicThrottler) TargetInflight() int64 {
- staticTargetInflight := min(throttlerMaxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
+ staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
targetInflight := atomic.LoadInt64(&t.targetInflight)
return min(staticTargetInflight, targetInflight)
}
diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go
index 2fd1e73a45a..a22f059207f 100644
--- a/client/go/internal/vespa/document/throttler_test.go
+++ b/client/go/internal/vespa/document/throttler_test.go
@@ -7,15 +7,15 @@ import (
func TestThrottler(t *testing.T) {
clock := &manualClock{tick: time.Second}
- tr := newThrottler(clock.now)
+ tr := newThrottler(8, clock.now)
for i := 0; i < 100; i++ {
tr.Sent()
}
- if got, want := tr.TargetInflight(), int64(128); got != want {
+ if got, want := tr.TargetInflight(), int64(1024); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
tr.Throttled(5)
- if got, want := tr.TargetInflight(), int64(16); got != want {
+ if got, want := tr.TargetInflight(), int64(128); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}