diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-05 10:41:18 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-11 10:27:09 +0200 |
commit | 071da64bb540ded7f56b83cf68419d885184079b (patch) | |
tree | e103b38c08cd4c49e64640bc2cee26b54bc5c6c1 | |
parent | 724ca1704c58d9b40da0f0730cefd63557c04215 (diff) |
Adjust min inflight based on connection count
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 2 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 6 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler.go | 41 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler_test.go | 6 |
4 files changed, 29 insertions, 26 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 4f43839b4fe..ff8b0bc0c0f 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -73,7 +73,7 @@ func feed(r io.Reader, cli *CLI, verbose bool, connections int) error { client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, }, clients) - throttler := document.NewThrottler() + throttler := document.NewThrottler(connections) // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) errWriter := io.Discard diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index fc96adabc96..80bc5f603ae 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -39,7 +39,7 @@ func (f *mockFeeder) Send(doc Document) Result { func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) + throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) docs := []Document{ @@ -72,7 +72,7 @@ func TestDispatcherOrdering(t *testing.T) { {Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut}, } clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) + throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { @@ -108,7 +108,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { } feeder.failAfterN(2) clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) + throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 69bb7c8d7ac..5b0aab6174e 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -7,13 +7,7 @@ import ( "time" ) -const ( - throttlerWeight = 0.7 - // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable - // for local target - throttlerMinInflight = 16 - throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side -) +const throttlerWeight = 0.7 type Throttler interface { // Sent notifies the the throttler that a document has been sent. @@ -27,29 +21,38 @@ type Throttler interface { } type dynamicThrottler struct { - ok int64 + minInflight int64 + maxInflight int64 targetInflight int64 targetTimesTen int64 throughputs []float64 + ok int64 sent int64 start time.Time now func() time.Time } -func newThrottler(nowFunc func() time.Time) *dynamicThrottler { +func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { + var ( + minInflight = 16 * int64(connections) + maxInflight = 256 * minInflight // 4096 max streams per connection on the server side + ) return &dynamicThrottler{ + minInflight: minInflight, + maxInflight: maxInflight, + targetInflight: 8 * minInflight, + targetTimesTen: 10 * maxInflight, + throughputs: make([]float64, 128), - start: nowFunc(), - now: nowFunc, - targetInflight: 8 * throttlerMinInflight, - targetTimesTen: 10 * throttlerMaxInflight, + start: nowFunc(), + now: nowFunc, } } -func NewThrottler() Throttler { return newThrottler(time.Now) } +func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { currentInflight := atomic.LoadInt64(&t.targetInflight) @@ -64,7 +67,7 @@ func (t *dynamicThrottler) Sent() { currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed) // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). - index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256)) + index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256)) t.throughputs[index] = currentThroughput // Loop over throughput measurements and pick the one which optimises throughput and latency. @@ -74,7 +77,7 @@ func (t *dynamicThrottler) Sent() { if t.throughputs[i] == 0 { continue // Skip unknown values } - inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs))) + inflight := float64(t.minInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs))) objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight) if objective > maxObjective { maxObjective = objective @@ -82,7 +85,7 @@ func (t *dynamicThrottler) Sent() { } } target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase - atomic.StoreInt64(&t.targetInflight, max(throttlerMinInflight, min(throttlerMaxInflight, target))) + atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target))) } func (t *dynamicThrottler) Success() { @@ -91,11 +94,11 @@ func (t *dynamicThrottler) Success() { } func (t *dynamicThrottler) Throttled(inflight int64) { - atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, throttlerMinInflight*10)) + atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10)) } func (t *dynamicThrottler) TargetInflight() int64 { - staticTargetInflight := min(throttlerMaxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) + staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) targetInflight := atomic.LoadInt64(&t.targetInflight) return min(staticTargetInflight, targetInflight) } diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go index 2fd1e73a45a..a22f059207f 100644 --- a/client/go/internal/vespa/document/throttler_test.go +++ b/client/go/internal/vespa/document/throttler_test.go @@ -7,15 +7,15 @@ import ( func TestThrottler(t *testing.T) { clock := &manualClock{tick: time.Second} - tr := newThrottler(clock.now) + tr := newThrottler(8, clock.now) for i := 0; i < 100; i++ { tr.Sent() } - if got, want := tr.TargetInflight(), int64(128); got != want { + if got, want := tr.TargetInflight(), int64(1024); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } tr.Throttled(5) - if got, want := tr.TargetInflight(), int64(16); got != want { + if got, want := tr.TargetInflight(), int64(128); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } } |