summaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-29 17:22:35 +0100
committerMartin Polden <mpolden@mpolden.no>2023-03-29 17:22:35 +0100
commit95dbd20b3dc32e3512f9941245b91bbc3f6ad5ac (patch)
tree740876570aedf1879b8c4f26891bd3b510b56389 /client/go
parent70211538e30cd49226d09d4e65ba17ff40ec2432 (diff)
Reapply "Add throttling to vespa feed"
This reverts commit f9f0cdad3c6721d06a7833be07d6280c68347263.
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/cli/cmd/feed.go8
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go75
-rw-r--r--client/go/internal/vespa/document/circuit_breaker_test.go52
-rw-r--r--client/go/internal/vespa/document/dispatcher.go158
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go53
-rw-r--r--client/go/internal/vespa/document/feeder.go25
-rw-r--r--client/go/internal/vespa/document/feeder_test.go2
-rw-r--r--client/go/internal/vespa/document/http.go61
-rw-r--r--client/go/internal/vespa/document/http_test.go10
-rw-r--r--client/go/internal/vespa/document/throttler.go117
-rw-r--r--client/go/internal/vespa/document/throttler_test.go21
11 files changed, 453 insertions, 129 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index f273c5aa826..c8e032929b8 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -13,6 +13,7 @@ import (
)
func addFeedFlags(cmd *cobra.Command, concurrency *int) {
+ // TOOD(mpolden): Remove this flag
cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
}
@@ -58,7 +59,10 @@ func feed(r io.Reader, cli *CLI, concurrency int) error {
client := document.NewClient(document.ClientOptions{
BaseURL: service.BaseURL,
}, service)
- dispatcher := document.NewDispatcher(client, concurrency)
+ throttler := document.NewThrottler()
+ // TODO(mpolden): Make doom duration configurable
+ circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
+ dispatcher := document.NewDispatcher(client, throttler, circuitBreaker)
dec := document.NewDecoder(r)
start := cli.now()
@@ -78,7 +82,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error {
return err
}
elapsed := cli.now().Sub(start)
- return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed)
+ return writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed)
}
type number float32
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
new file mode 100644
index 00000000000..aff15e88069
--- /dev/null
+++ b/client/go/internal/vespa/document/circuit_breaker.go
@@ -0,0 +1,75 @@
+package document
+
+import (
+ "math"
+ "sync/atomic"
+ "time"
+)
+
+type CircuitState int
+
+const (
+ // CircuitClosed represents a closed circuit. Documents are processed successfully
+ CircuitClosed CircuitState = iota
+ // CircuitHalfOpen represents a half-open circuit. Some errors have happend, but processing may still recover
+ CircuitHalfOpen
+ // CircuitOpen represents a open circuit. Something is broken. We should no longer process documents
+ CircuitOpen
+)
+
+type CircuitBreaker interface {
+ Success()
+ Error(error)
+ State() CircuitState
+}
+
+type timeCircuitBreaker struct {
+ graceDuration time.Duration
+ doomDuration time.Duration
+
+ failingSinceMillis atomic.Int64
+ lastError atomic.Value
+ halfOpen atomic.Bool
+ open atomic.Bool
+
+ now func() time.Time
+}
+
+func (b *timeCircuitBreaker) Success() {
+ b.failingSinceMillis.Store(math.MaxInt64)
+ if !b.open.Load() {
+ b.halfOpen.CompareAndSwap(true, false)
+ }
+}
+
+func (b *timeCircuitBreaker) Error(err error) {
+ if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
+ b.lastError.Store(err)
+ }
+}
+
+func (b *timeCircuitBreaker) State() CircuitState {
+ failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load()))
+ if failingDuration > b.graceDuration {
+ b.halfOpen.CompareAndSwap(false, true)
+ }
+ if b.doomDuration > 0 && failingDuration > b.doomDuration {
+ b.open.CompareAndSwap(false, true)
+ }
+ if b.open.Load() {
+ return CircuitOpen
+ } else if b.halfOpen.Load() {
+ return CircuitHalfOpen
+ }
+ return CircuitClosed
+}
+
+func NewCircuitBreaker(graceDuration, doomDuration time.Duration) CircuitBreaker {
+ b := &timeCircuitBreaker{
+ graceDuration: graceDuration,
+ doomDuration: doomDuration,
+ now: time.Now,
+ }
+ b.failingSinceMillis.Store(math.MaxInt64)
+ return b
+}
diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go
new file mode 100644
index 00000000000..99dd057438d
--- /dev/null
+++ b/client/go/internal/vespa/document/circuit_breaker_test.go
@@ -0,0 +1,52 @@
+package document
+
+import (
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCircuitBreaker(t *testing.T) {
+ clock := &manualClock{}
+ breaker := &timeCircuitBreaker{
+ graceDuration: time.Second,
+ doomDuration: time.Minute,
+ now: clock.now,
+ }
+ err := errors.New("error")
+
+ assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed")
+
+ clock.advance(100 * time.Second)
+ assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after some time without activity")
+
+ breaker.Success()
+ assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a success")
+
+ clock.advance(100 * time.Second)
+ assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success")
+
+ breaker.Error(err)
+ assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure")
+
+ clock.advance(time.Second)
+ assert.Equal(t, CircuitClosed, breaker.State(), "State is closed until grace duration has passed")
+
+ clock.advance(time.Millisecond)
+ assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open when grace duration has passed")
+
+ breaker.Success()
+ assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success")
+
+ breaker.Error(err)
+ clock.advance(time.Minute)
+ assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed")
+
+ clock.advance(time.Millisecond)
+ assert.Equal(t, CircuitOpen, breaker.State(), "State is open when doom duration has passed")
+
+ breaker.Success()
+ assert.Equal(t, CircuitOpen, breaker.State(), "State remains open in spite of new successes")
+}
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index feb562a241a..a65f16c9298 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,25 +3,33 @@ package document
import (
"fmt"
"sync"
+ "sync/atomic"
+ "time"
)
const maxAttempts = 10
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- workers int
- feeder Feeder
- ready chan Id
- inflight map[string]*documentGroup
+ feeder Feeder
+ throttler Throttler
+ circuitBreaker CircuitBreaker
+ stats Stats
+
+ closed bool
+ ready chan Id
+ results chan Result
+ inflight map[string]*documentGroup
+ inflightCount atomic.Int64
+
mu sync.RWMutex
wg sync.WaitGroup
- closed bool
+ resultWg sync.WaitGroup
}
// documentGroup holds document operations which share their ID, and must be dispatched in order.
type documentGroup struct {
id Id
- failed bool
operations []documentOp
mu sync.Mutex
}
@@ -37,68 +45,97 @@ func (g *documentGroup) append(op documentOp) {
g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
- if workers < 1 {
- workers = 1
- }
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
d := &Dispatcher{
- workers: workers,
- feeder: feeder,
- inflight: make(map[string]*documentGroup),
+ feeder: feeder,
+ throttler: throttler,
+ circuitBreaker: breaker,
+ inflight: make(map[string]*documentGroup),
}
d.start()
return d
}
-func (d *Dispatcher) dispatchAll(g *documentGroup) int {
+func (d *Dispatcher) dispatchAll(g *documentGroup) {
g.mu.Lock()
defer g.mu.Unlock()
- failCount := len(g.operations)
- for i := 0; !g.failed && i < len(g.operations); i++ {
+ for i := 0; i < len(g.operations); i++ {
op := g.operations[i]
ok := false
- for op.attempts <= maxAttempts && !ok {
- op.attempts += 1
- // TODO(mpolden): Extract function which does throttling/circuit-breaking
+ for !ok {
+ op.attempts++
result := d.feeder.Send(op.document)
+ d.results <- result
ok = result.Status.Success()
+ if !d.shouldRetry(op, result) {
+ break
+ }
}
- if ok {
- failCount--
- } else {
- g.failed = true
- failCount = len(g.operations) - i
- }
+ d.releaseSlot()
}
g.operations = nil
- return failCount
+}
+
+func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
+ if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
+ d.throttler.Success()
+ d.circuitBreaker.Success()
+ return false
+ }
+ if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
+ d.throttler.Throttled(d.inflightCount.Load())
+ return true
+ }
+ if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
+ if op.attempts <= maxAttempts {
+ return true
+ }
+ }
+ return false
}
func (d *Dispatcher) start() {
d.mu.Lock()
defer d.mu.Unlock()
+ d.ready = make(chan Id, 4096)
+ d.results = make(chan Result, 4096)
d.closed = false
- d.ready = make(chan Id, 4*d.workers)
- for i := 0; i < d.workers; i++ {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- for id := range d.ready {
- d.mu.RLock()
- group := d.inflight[id.String()]
- d.mu.RUnlock()
- if group != nil {
- failedDocs := d.dispatchAll(group)
- d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
- }
- }
- }()
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ d.readDocuments()
+ }()
+ d.resultWg.Add(1)
+ go func() {
+ defer d.resultWg.Done()
+ d.readResults()
+ }()
+}
+
+func (d *Dispatcher) readDocuments() {
+ for id := range d.ready {
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ if group != nil {
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ d.dispatchAll(group)
+ }()
+ }
+ }
+}
+
+func (d *Dispatcher) readResults() {
+ for result := range d.results {
+ d.stats.Add(result.Stats)
}
}
func (d *Dispatcher) Enqueue(doc Document) error {
d.mu.Lock()
- defer d.mu.Unlock()
if d.closed {
return fmt.Errorf("dispatcher is closed")
}
@@ -112,18 +149,43 @@ func (d *Dispatcher) Enqueue(doc Document) error {
}
d.inflight[doc.Id.String()] = group
}
- d.ready <- doc.Id
+ d.mu.Unlock()
+ d.enqueueWithSlot(doc.Id)
return nil
}
-// Close closes the dispatcher and waits for all inflight operations to complete.
-func (d *Dispatcher) Close() error {
+func (d *Dispatcher) Stats() Stats { return d.stats }
+
+func (d *Dispatcher) enqueueWithSlot(id Id) {
+ d.acquireSlot()
+ d.ready <- id
+ d.throttler.Sent()
+}
+
+func (d *Dispatcher) acquireSlot() {
+ for d.inflightCount.Load() >= d.throttler.TargetInflight() {
+ time.Sleep(time.Millisecond)
+ }
+ d.inflightCount.Add(1)
+}
+
+func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
+
+func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
d.mu.Lock()
if !d.closed {
- close(d.ready)
- d.closed = true
+ close(ch)
+ if markClosed {
+ d.closed = true
+ }
}
d.mu.Unlock()
- d.wg.Wait()
+ wg.Wait()
+}
+
+// Close closes the dispatcher and waits for all inflight operations to complete.
+func (d *Dispatcher) Close() error {
+ closeAndWait(d.ready, &d.wg, d, false)
+ closeAndWait(d.results, &d.resultWg, d, true)
return nil
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 04e0928f2a3..8a6d8c6117c 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -3,6 +3,7 @@ package document
import (
"sync"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
)
@@ -10,7 +11,6 @@ import (
type mockFeeder struct {
failAfterNDocs int
documents []Document
- stats Stats
mu sync.Mutex
}
@@ -23,24 +23,24 @@ func (f *mockFeeder) failAfterN(docs int) {
func (f *mockFeeder) Send(doc Document) Result {
f.mu.Lock()
defer f.mu.Unlock()
+ result := Result{Id: doc.Id}
if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs {
- return Result{Id: doc.Id, Status: StatusVespaFailure}
+ result.Status = StatusVespaFailure
+ } else {
+ f.documents = append(f.documents, doc)
}
- f.documents = append(f.documents, doc)
- return Result{Id: doc.Id}
-}
-
-func (f *mockFeeder) Stats() Stats { return f.stats }
-
-func (f *mockFeeder) AddStats(stats Stats) {
- f.mu.Lock()
- defer f.mu.Unlock()
- f.stats.Add(stats)
+ if !result.Status.Success() {
+ result.Stats.Errors = 1
+ }
+ return result
}
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
- dispatcher := NewDispatcher(feeder, 2)
+ clock := &manualClock{tick: time.Second}
+ throttler := newThrottler(clock.now)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker)
docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
@@ -70,7 +70,10 @@ func TestDispatcherOrdering(t *testing.T) {
{Id: mustParseId("id:ns:type::doc8"), Operation: OperationPut},
{Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut},
}
- dispatcher := NewDispatcher(feeder, len(docs))
+ clock := &manualClock{tick: time.Second}
+ throttler := newThrottler(clock.now)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker)
for _, d := range docs {
dispatcher.Enqueue(d)
}
@@ -90,7 +93,7 @@ func TestDispatcherOrdering(t *testing.T) {
}
assert.Equal(t, len(docs), len(feeder.documents))
assert.Equal(t, wantDocs, gotDocs)
- assert.Equal(t, int64(0), feeder.Stats().Errors)
+ assert.Equal(t, int64(0), dispatcher.Stats().Errors)
}
func TestDispatcherOrderingWithFailures(t *testing.T) {
@@ -103,26 +106,26 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
{Id: commonId, Operation: OperationRemove}, // fails
}
feeder.failAfterN(2)
- dispatcher := NewDispatcher(feeder, len(docs))
+ clock := &manualClock{tick: time.Second}
+ throttler := newThrottler(clock.now)
+ breaker := NewCircuitBreaker(time.Second, 0)
+ dispatcher := NewDispatcher(feeder, throttler, breaker)
for _, d := range docs {
dispatcher.Enqueue(d)
}
dispatcher.Close()
wantDocs := docs[:2]
assert.Equal(t, wantDocs, feeder.documents)
- assert.Equal(t, int64(2), feeder.Stats().Errors)
+ assert.Equal(t, int64(2), dispatcher.Stats().Errors)
- // Dispatching more documents for same ID fails implicitly
+ // Dispatching more documents for same ID succeed
feeder.failAfterN(0)
dispatcher.start()
dispatcher.Enqueue(Document{Id: commonId, Operation: OperationPut})
dispatcher.Enqueue(Document{Id: commonId, Operation: OperationRemove})
- // Other IDs are fine
- doc2 := Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}
- doc3 := Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}
- dispatcher.Enqueue(doc2)
- dispatcher.Enqueue(doc3)
+ dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut})
+ dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut})
dispatcher.Close()
- assert.Equal(t, int64(4), feeder.Stats().Errors)
- assert.Equal(t, 4, len(feeder.documents))
+ assert.Equal(t, int64(2), dispatcher.Stats().Errors)
+ assert.Equal(t, 6, len(feeder.documents))
}
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 6996e649d24..8bdd5bca5ba 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -23,17 +23,19 @@ const (
// Result represents the result of a feeding operation.
type Result struct {
- Id Id
- Status Status
- Message string
- Trace string
- Err error
+ Id Id
+ Status Status
+ HTTPStatus int
+ Message string
+ Trace string
+ Err error
+ Stats Stats
}
// Success returns whether status s is considered a success.
func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet }
-// Stats represents the summed statistics of a feeder.
+// Stats represents feeding operation statistics.
type Stats struct {
Requests int64
Responses int64
@@ -47,8 +49,6 @@ type Stats struct {
BytesRecv int64
}
-func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} }
-
// AvgLatency returns the average latency for a request.
func (s Stats) AvgLatency() time.Duration {
requests := s.Requests
@@ -69,6 +69,9 @@ func (s Stats) Successes() int64 {
func (s *Stats) Add(other Stats) {
s.Requests += other.Requests
s.Responses += other.Responses
+ if s.ResponsesByCode == nil && other.ResponsesByCode != nil {
+ s.ResponsesByCode = make(map[int]int64)
+ }
for code, count := range other.ResponsesByCode {
_, ok := s.ResponsesByCode[code]
if ok {
@@ -91,8 +94,4 @@ func (s *Stats) Add(other Stats) {
}
// Feeder is the interface for a consumer of documents.
-type Feeder interface {
- Send(Document) Result
- Stats() Stats
- AddStats(Stats)
-}
+type Feeder interface{ Send(Document) Result }
diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go
index 1368d871436..a7d92495889 100644
--- a/client/go/internal/vespa/document/feeder_test.go
+++ b/client/go/internal/vespa/document/feeder_test.go
@@ -7,7 +7,7 @@ import (
)
func TestStatsAdd(t *testing.T) {
- got := NewStats()
+ var got Stats
got.Add(Stats{Requests: 1})
got.Add(Stats{Requests: 1})
got.Add(Stats{Responses: 1})
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 2e01d4564ab..4dadcd1d05c 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -9,7 +9,6 @@ import (
"net/url"
"strconv"
"strings"
- "sync"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -19,8 +18,6 @@ import (
type Client struct {
options ClientOptions
httpClient util.HTTPClient
- stats Stats
- mu sync.Mutex
now func() time.Time
}
@@ -47,7 +44,6 @@ func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client {
c := &Client{
options: options,
httpClient: httpClient,
- stats: NewStats(),
now: time.Now,
}
return c
@@ -116,49 +112,42 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL,
// Send given document the URL configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
- stats := NewStats()
- stats.Requests = 1
- defer func() {
- latency := c.now().Sub(start)
- stats.TotalLatency = latency
- stats.MinLatency = latency
- stats.MaxLatency = latency
- c.AddStats(stats)
- }()
+ result := Result{Id: document.Id}
+ result.Stats.Requests = 1
method, url, err := c.feedURL(document, c.queryParams())
if err != nil {
- stats.Errors = 1
- return Result{Status: StatusError, Err: err}
+ result.Stats.Errors = 1
+ result.Err = err
+ return result
}
req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body))
if err != nil {
- stats.Errors = 1
- return Result{Status: StatusError, Err: err}
+ result.Stats.Errors = 1
+ result.Status = StatusError
+ result.Err = err
+ return result
}
- resp, err := c.httpClient.Do(req, c.options.Timeout)
+ resp, err := c.httpClient.Do(req, 190*time.Second)
if err != nil {
- stats.Errors = 1
- return Result{Status: StatusTransportFailure, Err: err}
+ result.Stats.Errors = 1
+ result.Status = StatusTransportFailure
+ result.Err = err
+ return result
}
defer resp.Body.Close()
- stats.Responses = 1
- stats.ResponsesByCode = map[int]int64{
+ result.Stats.Responses = 1
+ result.Stats.ResponsesByCode = map[int]int64{
resp.StatusCode: 1,
}
- stats.BytesSent = int64(len(document.Body))
- return c.createResult(document.Id, &stats, resp)
-}
-
-func (c *Client) Stats() Stats { return c.stats }
-
-func (c *Client) AddStats(stats Stats) {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.stats.Add(stats)
+ result.Stats.BytesSent = int64(len(document.Body))
+ elapsed := c.now().Sub(start)
+ result.Stats.TotalLatency = elapsed
+ result.Stats.MinLatency = elapsed
+ result.Stats.MaxLatency = elapsed
+ return c.resultWithResponse(resp, result)
}
-func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result {
- result := Result{Id: id}
+func (c *Client) resultWithResponse(resp *http.Response, result Result) Result {
switch resp.StatusCode {
case 200:
result.Status = StatusSuccess
@@ -181,9 +170,9 @@ func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result {
}
result.Message = body.Message
result.Trace = string(body.Trace)
- stats.BytesRecv = cr.bytesRead
+ result.Stats.BytesRecv = cr.bytesRead
if !result.Status.Success() {
- stats.Errors = 1
+ result.Stats.Errors = 1
}
return result
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index f02c87730d5..311668fa16e 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -19,11 +19,12 @@ type manualClock struct {
}
func (c *manualClock) now() time.Time {
- t := c.t
- c.t = c.t.Add(c.tick)
- return t
+ c.advance(c.tick)
+ return c.t
}
+func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) }
+
func TestClientSend(t *testing.T) {
docs := []Document{
{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
@@ -37,6 +38,7 @@ func TestClientSend(t *testing.T) {
}, &httpClient)
clock := manualClock{t: time.Now(), tick: time.Second}
client.now = clock.now
+ var stats Stats
for i, doc := range docs {
if i < 2 {
httpClient.NextResponseString(200, `{"message":"All good!"}`)
@@ -44,6 +46,7 @@ func TestClientSend(t *testing.T) {
httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
}
res := client.Send(doc)
+ stats.Add(res.Stats)
if res.Err != nil {
t.Fatalf("got unexpected error %q", res.Err)
}
@@ -64,7 +67,6 @@ func TestClientSend(t *testing.T) {
t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
}
}
- stats := client.Stats()
want := Stats{
Requests: 3,
Responses: 3,
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
new file mode 100644
index 00000000000..f7c57ff97ed
--- /dev/null
+++ b/client/go/internal/vespa/document/throttler.go
@@ -0,0 +1,117 @@
+package document
+
+import (
+ "math"
+ "math/rand"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ throttlerWeight = 0.7
+ // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable
+ // for local target
+ throttlerMinInflight = 16
+ throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side
+)
+
+type Throttler interface {
+ // Sent notifies the the throttler that a document has been sent.
+ Sent()
+ // Success notifies the throttler that document operation succeeded.
+ Success()
+ // Throttled notifies the throttler that a throttling event occured while count documents were in-flight.
+ Throttled(count int64)
+ // TargetInflight returns the ideal number of documents to have in-flight now.
+ TargetInflight() int64
+}
+
+type dynamicThrottler struct {
+ ok atomic.Int64
+ targetInflight atomic.Int64
+ targetTimesTen atomic.Int64
+
+ throughputs []float64
+ sent int64
+
+ start time.Time
+ now func() time.Time
+}
+
+func newThrottler(nowFunc func() time.Time) *dynamicThrottler {
+ d := &dynamicThrottler{
+ throughputs: make([]float64, 128),
+ start: nowFunc(),
+ now: nowFunc,
+ }
+ d.targetInflight.Store(8 * throttlerMinInflight)
+ d.targetTimesTen.Store(10 * throttlerMaxInflight)
+ return d
+}
+
+func NewThrottler() Throttler { return newThrottler(time.Now) }
+
+func (t *dynamicThrottler) Sent() {
+ currentInflight := t.targetInflight.Load()
+ t.sent++
+ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
+ return
+ }
+ t.sent = 0
+ now := t.now()
+ elapsed := now.Sub(t.start)
+ t.start = now
+ currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed)
+
+ // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
+ index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256))
+ t.throughputs[index] = currentThroughput
+
+ // Loop over throughput measurements and pick the one which optimises throughput and latency.
+ choice := float64(currentInflight)
+ maxObjective := float64(-1)
+ for i := len(t.throughputs) - 1; i >= 0; i-- {
+ if t.throughputs[i] == 0 {
+ continue // Skip unknown values
+ }
+ inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs)))
+ objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight)
+ if objective > maxObjective {
+ maxObjective = objective
+ choice = inflight
+ }
+ }
+ target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
+ t.targetInflight.Store(max(throttlerMinInflight, min(throttlerMaxInflight, target)))
+}
+
+func (t *dynamicThrottler) Success() {
+ t.targetTimesTen.Add(1)
+ t.ok.Add(1)
+}
+
+func (t *dynamicThrottler) Throttled(inflight int64) {
+ t.targetTimesTen.Store(max(inflight*5, throttlerMinInflight*10))
+}
+
+func (t *dynamicThrottler) TargetInflight() int64 {
+ staticTargetInflight := min(throttlerMaxInflight, t.targetTimesTen.Load()/10)
+ targetInflight := t.targetInflight.Load()
+ return min(staticTargetInflight, targetInflight)
+}
+
+type number interface{ float64 | int64 }
+
+func min[T number](x, y T) T {
+ if x < y {
+ return x
+ }
+ return y
+}
+
+func max[T number](x, y T) T {
+ if x > y {
+ return x
+ }
+ return y
+}
diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go
new file mode 100644
index 00000000000..2fd1e73a45a
--- /dev/null
+++ b/client/go/internal/vespa/document/throttler_test.go
@@ -0,0 +1,21 @@
+package document
+
+import (
+ "testing"
+ "time"
+)
+
+func TestThrottler(t *testing.T) {
+ clock := &manualClock{tick: time.Second}
+ tr := newThrottler(clock.now)
+ for i := 0; i < 100; i++ {
+ tr.Sent()
+ }
+ if got, want := tr.TargetInflight(), int64(128); got != want {
+ t.Errorf("got TargetInflight() = %d, but want %d", got, want)
+ }
+ tr.Throttled(5)
+ if got, want := tr.TargetInflight(), int64(16); got != want {
+ t.Errorf("got TargetInflight() = %d, but want %d", got, want)
+ }
+}