diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-29 17:22:35 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-29 17:22:35 +0100 |
commit | 95dbd20b3dc32e3512f9941245b91bbc3f6ad5ac (patch) | |
tree | 740876570aedf1879b8c4f26891bd3b510b56389 /client/go | |
parent | 70211538e30cd49226d09d4e65ba17ff40ec2432 (diff) |
Reapply "Add throttling to vespa feed"
This reverts commit f9f0cdad3c6721d06a7833be07d6280c68347263.
Diffstat (limited to 'client/go')
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 8 | ||||
-rw-r--r-- | client/go/internal/vespa/document/circuit_breaker.go | 75 | ||||
-rw-r--r-- | client/go/internal/vespa/document/circuit_breaker_test.go | 52 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 158 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 53 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder.go | 25 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder_test.go | 2 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 61 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 10 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler.go | 117 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler_test.go | 21 |
11 files changed, 453 insertions, 129 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index f273c5aa826..c8e032929b8 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -13,6 +13,7 @@ import ( ) func addFeedFlags(cmd *cobra.Command, concurrency *int) { + // TOOD(mpolden): Remove this flag cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching") } @@ -58,7 +59,10 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, }, service) - dispatcher := document.NewDispatcher(client, concurrency) + throttler := document.NewThrottler() + // TODO(mpolden): Make doom duration configurable + circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) + dispatcher := document.NewDispatcher(client, throttler, circuitBreaker) dec := document.NewDecoder(r) start := cli.now() @@ -78,7 +82,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { return err } elapsed := cli.now().Sub(start) - return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed) + return writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed) } type number float32 diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go new file mode 100644 index 00000000000..aff15e88069 --- /dev/null +++ b/client/go/internal/vespa/document/circuit_breaker.go @@ -0,0 +1,75 @@ +package document + +import ( + "math" + "sync/atomic" + "time" +) + +type CircuitState int + +const ( + // CircuitClosed represents a closed circuit. Documents are processed successfully + CircuitClosed CircuitState = iota + // CircuitHalfOpen represents a half-open circuit. Some errors have happend, but processing may still recover + CircuitHalfOpen + // CircuitOpen represents a open circuit. Something is broken. We should no longer process documents + CircuitOpen +) + +type CircuitBreaker interface { + Success() + Error(error) + State() CircuitState +} + +type timeCircuitBreaker struct { + graceDuration time.Duration + doomDuration time.Duration + + failingSinceMillis atomic.Int64 + lastError atomic.Value + halfOpen atomic.Bool + open atomic.Bool + + now func() time.Time +} + +func (b *timeCircuitBreaker) Success() { + b.failingSinceMillis.Store(math.MaxInt64) + if !b.open.Load() { + b.halfOpen.CompareAndSwap(true, false) + } +} + +func (b *timeCircuitBreaker) Error(err error) { + if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) { + b.lastError.Store(err) + } +} + +func (b *timeCircuitBreaker) State() CircuitState { + failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load())) + if failingDuration > b.graceDuration { + b.halfOpen.CompareAndSwap(false, true) + } + if b.doomDuration > 0 && failingDuration > b.doomDuration { + b.open.CompareAndSwap(false, true) + } + if b.open.Load() { + return CircuitOpen + } else if b.halfOpen.Load() { + return CircuitHalfOpen + } + return CircuitClosed +} + +func NewCircuitBreaker(graceDuration, doomDuration time.Duration) CircuitBreaker { + b := &timeCircuitBreaker{ + graceDuration: graceDuration, + doomDuration: doomDuration, + now: time.Now, + } + b.failingSinceMillis.Store(math.MaxInt64) + return b +} diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go new file mode 100644 index 00000000000..99dd057438d --- /dev/null +++ b/client/go/internal/vespa/document/circuit_breaker_test.go @@ -0,0 +1,52 @@ +package document + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCircuitBreaker(t *testing.T) { + clock := &manualClock{} + breaker := &timeCircuitBreaker{ + graceDuration: time.Second, + doomDuration: time.Minute, + now: clock.now, + } + err := errors.New("error") + + assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed") + + clock.advance(100 * time.Second) + assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after some time without activity") + + breaker.Success() + assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a success") + + clock.advance(100 * time.Second) + assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success") + + breaker.Error(err) + assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure") + + clock.advance(time.Second) + assert.Equal(t, CircuitClosed, breaker.State(), "State is closed until grace duration has passed") + + clock.advance(time.Millisecond) + assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open when grace duration has passed") + + breaker.Success() + assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success") + + breaker.Error(err) + clock.advance(time.Minute) + assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed") + + clock.advance(time.Millisecond) + assert.Equal(t, CircuitOpen, breaker.State(), "State is open when doom duration has passed") + + breaker.Success() + assert.Equal(t, CircuitOpen, breaker.State(), "State remains open in spite of new successes") +} diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index feb562a241a..a65f16c9298 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -3,25 +3,33 @@ package document import ( "fmt" "sync" + "sync/atomic" + "time" ) const maxAttempts = 10 // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { - workers int - feeder Feeder - ready chan Id - inflight map[string]*documentGroup + feeder Feeder + throttler Throttler + circuitBreaker CircuitBreaker + stats Stats + + closed bool + ready chan Id + results chan Result + inflight map[string]*documentGroup + inflightCount atomic.Int64 + mu sync.RWMutex wg sync.WaitGroup - closed bool + resultWg sync.WaitGroup } // documentGroup holds document operations which share their ID, and must be dispatched in order. type documentGroup struct { id Id - failed bool operations []documentOp mu sync.Mutex } @@ -37,68 +45,97 @@ func (g *documentGroup) append(op documentOp) { g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, workers int) *Dispatcher { - if workers < 1 { - workers = 1 - } +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher { d := &Dispatcher{ - workers: workers, - feeder: feeder, - inflight: make(map[string]*documentGroup), + feeder: feeder, + throttler: throttler, + circuitBreaker: breaker, + inflight: make(map[string]*documentGroup), } d.start() return d } -func (d *Dispatcher) dispatchAll(g *documentGroup) int { +func (d *Dispatcher) dispatchAll(g *documentGroup) { g.mu.Lock() defer g.mu.Unlock() - failCount := len(g.operations) - for i := 0; !g.failed && i < len(g.operations); i++ { + for i := 0; i < len(g.operations); i++ { op := g.operations[i] ok := false - for op.attempts <= maxAttempts && !ok { - op.attempts += 1 - // TODO(mpolden): Extract function which does throttling/circuit-breaking + for !ok { + op.attempts++ result := d.feeder.Send(op.document) + d.results <- result ok = result.Status.Success() + if !d.shouldRetry(op, result) { + break + } } - if ok { - failCount-- - } else { - g.failed = true - failCount = len(g.operations) - i - } + d.releaseSlot() } g.operations = nil - return failCount +} + +func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { + if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { + d.throttler.Success() + d.circuitBreaker.Success() + return false + } + if result.HTTPStatus == 429 || result.HTTPStatus == 503 { + d.throttler.Throttled(d.inflightCount.Load()) + return true + } + if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) + if op.attempts <= maxAttempts { + return true + } + } + return false } func (d *Dispatcher) start() { d.mu.Lock() defer d.mu.Unlock() + d.ready = make(chan Id, 4096) + d.results = make(chan Result, 4096) d.closed = false - d.ready = make(chan Id, 4*d.workers) - for i := 0; i < d.workers; i++ { - d.wg.Add(1) - go func() { - defer d.wg.Done() - for id := range d.ready { - d.mu.RLock() - group := d.inflight[id.String()] - d.mu.RUnlock() - if group != nil { - failedDocs := d.dispatchAll(group) - d.feeder.AddStats(Stats{Errors: int64(failedDocs)}) - } - } - }() + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.readDocuments() + }() + d.resultWg.Add(1) + go func() { + defer d.resultWg.Done() + d.readResults() + }() +} + +func (d *Dispatcher) readDocuments() { + for id := range d.ready { + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + if group != nil { + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.dispatchAll(group) + }() + } + } +} + +func (d *Dispatcher) readResults() { + for result := range d.results { + d.stats.Add(result.Stats) } } func (d *Dispatcher) Enqueue(doc Document) error { d.mu.Lock() - defer d.mu.Unlock() if d.closed { return fmt.Errorf("dispatcher is closed") } @@ -112,18 +149,43 @@ func (d *Dispatcher) Enqueue(doc Document) error { } d.inflight[doc.Id.String()] = group } - d.ready <- doc.Id + d.mu.Unlock() + d.enqueueWithSlot(doc.Id) return nil } -// Close closes the dispatcher and waits for all inflight operations to complete. -func (d *Dispatcher) Close() error { +func (d *Dispatcher) Stats() Stats { return d.stats } + +func (d *Dispatcher) enqueueWithSlot(id Id) { + d.acquireSlot() + d.ready <- id + d.throttler.Sent() +} + +func (d *Dispatcher) acquireSlot() { + for d.inflightCount.Load() >= d.throttler.TargetInflight() { + time.Sleep(time.Millisecond) + } + d.inflightCount.Add(1) +} + +func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) } + +func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { d.mu.Lock() if !d.closed { - close(d.ready) - d.closed = true + close(ch) + if markClosed { + d.closed = true + } } d.mu.Unlock() - d.wg.Wait() + wg.Wait() +} + +// Close closes the dispatcher and waits for all inflight operations to complete. +func (d *Dispatcher) Close() error { + closeAndWait(d.ready, &d.wg, d, false) + closeAndWait(d.results, &d.resultWg, d, true) return nil } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 04e0928f2a3..8a6d8c6117c 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -3,6 +3,7 @@ package document import ( "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -10,7 +11,6 @@ import ( type mockFeeder struct { failAfterNDocs int documents []Document - stats Stats mu sync.Mutex } @@ -23,24 +23,24 @@ func (f *mockFeeder) failAfterN(docs int) { func (f *mockFeeder) Send(doc Document) Result { f.mu.Lock() defer f.mu.Unlock() + result := Result{Id: doc.Id} if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs { - return Result{Id: doc.Id, Status: StatusVespaFailure} + result.Status = StatusVespaFailure + } else { + f.documents = append(f.documents, doc) } - f.documents = append(f.documents, doc) - return Result{Id: doc.Id} -} - -func (f *mockFeeder) Stats() Stats { return f.stats } - -func (f *mockFeeder) AddStats(stats Stats) { - f.mu.Lock() - defer f.mu.Unlock() - f.stats.Add(stats) + if !result.Status.Success() { + result.Stats.Errors = 1 + } + return result } func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} - dispatcher := NewDispatcher(feeder, 2) + clock := &manualClock{tick: time.Second} + throttler := newThrottler(clock.now) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker) docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, @@ -70,7 +70,10 @@ func TestDispatcherOrdering(t *testing.T) { {Id: mustParseId("id:ns:type::doc8"), Operation: OperationPut}, {Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut}, } - dispatcher := NewDispatcher(feeder, len(docs)) + clock := &manualClock{tick: time.Second} + throttler := newThrottler(clock.now) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker) for _, d := range docs { dispatcher.Enqueue(d) } @@ -90,7 +93,7 @@ func TestDispatcherOrdering(t *testing.T) { } assert.Equal(t, len(docs), len(feeder.documents)) assert.Equal(t, wantDocs, gotDocs) - assert.Equal(t, int64(0), feeder.Stats().Errors) + assert.Equal(t, int64(0), dispatcher.Stats().Errors) } func TestDispatcherOrderingWithFailures(t *testing.T) { @@ -103,26 +106,26 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { {Id: commonId, Operation: OperationRemove}, // fails } feeder.failAfterN(2) - dispatcher := NewDispatcher(feeder, len(docs)) + clock := &manualClock{tick: time.Second} + throttler := newThrottler(clock.now) + breaker := NewCircuitBreaker(time.Second, 0) + dispatcher := NewDispatcher(feeder, throttler, breaker) for _, d := range docs { dispatcher.Enqueue(d) } dispatcher.Close() wantDocs := docs[:2] assert.Equal(t, wantDocs, feeder.documents) - assert.Equal(t, int64(2), feeder.Stats().Errors) + assert.Equal(t, int64(2), dispatcher.Stats().Errors) - // Dispatching more documents for same ID fails implicitly + // Dispatching more documents for same ID succeed feeder.failAfterN(0) dispatcher.start() dispatcher.Enqueue(Document{Id: commonId, Operation: OperationPut}) dispatcher.Enqueue(Document{Id: commonId, Operation: OperationRemove}) - // Other IDs are fine - doc2 := Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut} - doc3 := Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut} - dispatcher.Enqueue(doc2) - dispatcher.Enqueue(doc3) + dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}) + dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}) dispatcher.Close() - assert.Equal(t, int64(4), feeder.Stats().Errors) - assert.Equal(t, 4, len(feeder.documents)) + assert.Equal(t, int64(2), dispatcher.Stats().Errors) + assert.Equal(t, 6, len(feeder.documents)) } diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 6996e649d24..8bdd5bca5ba 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -23,17 +23,19 @@ const ( // Result represents the result of a feeding operation. type Result struct { - Id Id - Status Status - Message string - Trace string - Err error + Id Id + Status Status + HTTPStatus int + Message string + Trace string + Err error + Stats Stats } // Success returns whether status s is considered a success. func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet } -// Stats represents the summed statistics of a feeder. +// Stats represents feeding operation statistics. type Stats struct { Requests int64 Responses int64 @@ -47,8 +49,6 @@ type Stats struct { BytesRecv int64 } -func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} } - // AvgLatency returns the average latency for a request. func (s Stats) AvgLatency() time.Duration { requests := s.Requests @@ -69,6 +69,9 @@ func (s Stats) Successes() int64 { func (s *Stats) Add(other Stats) { s.Requests += other.Requests s.Responses += other.Responses + if s.ResponsesByCode == nil && other.ResponsesByCode != nil { + s.ResponsesByCode = make(map[int]int64) + } for code, count := range other.ResponsesByCode { _, ok := s.ResponsesByCode[code] if ok { @@ -91,8 +94,4 @@ func (s *Stats) Add(other Stats) { } // Feeder is the interface for a consumer of documents. -type Feeder interface { - Send(Document) Result - Stats() Stats - AddStats(Stats) -} +type Feeder interface{ Send(Document) Result } diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go index 1368d871436..a7d92495889 100644 --- a/client/go/internal/vespa/document/feeder_test.go +++ b/client/go/internal/vespa/document/feeder_test.go @@ -7,7 +7,7 @@ import ( ) func TestStatsAdd(t *testing.T) { - got := NewStats() + var got Stats got.Add(Stats{Requests: 1}) got.Add(Stats{Requests: 1}) got.Add(Stats{Responses: 1}) diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 2e01d4564ab..4dadcd1d05c 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -9,7 +9,6 @@ import ( "net/url" "strconv" "strings" - "sync" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -19,8 +18,6 @@ import ( type Client struct { options ClientOptions httpClient util.HTTPClient - stats Stats - mu sync.Mutex now func() time.Time } @@ -47,7 +44,6 @@ func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client { c := &Client{ options: options, httpClient: httpClient, - stats: NewStats(), now: time.Now, } return c @@ -116,49 +112,42 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, // Send given document the URL configured in this client. func (c *Client) Send(document Document) Result { start := c.now() - stats := NewStats() - stats.Requests = 1 - defer func() { - latency := c.now().Sub(start) - stats.TotalLatency = latency - stats.MinLatency = latency - stats.MaxLatency = latency - c.AddStats(stats) - }() + result := Result{Id: document.Id} + result.Stats.Requests = 1 method, url, err := c.feedURL(document, c.queryParams()) if err != nil { - stats.Errors = 1 - return Result{Status: StatusError, Err: err} + result.Stats.Errors = 1 + result.Err = err + return result } req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body)) if err != nil { - stats.Errors = 1 - return Result{Status: StatusError, Err: err} + result.Stats.Errors = 1 + result.Status = StatusError + result.Err = err + return result } - resp, err := c.httpClient.Do(req, c.options.Timeout) + resp, err := c.httpClient.Do(req, 190*time.Second) if err != nil { - stats.Errors = 1 - return Result{Status: StatusTransportFailure, Err: err} + result.Stats.Errors = 1 + result.Status = StatusTransportFailure + result.Err = err + return result } defer resp.Body.Close() - stats.Responses = 1 - stats.ResponsesByCode = map[int]int64{ + result.Stats.Responses = 1 + result.Stats.ResponsesByCode = map[int]int64{ resp.StatusCode: 1, } - stats.BytesSent = int64(len(document.Body)) - return c.createResult(document.Id, &stats, resp) -} - -func (c *Client) Stats() Stats { return c.stats } - -func (c *Client) AddStats(stats Stats) { - c.mu.Lock() - defer c.mu.Unlock() - c.stats.Add(stats) + result.Stats.BytesSent = int64(len(document.Body)) + elapsed := c.now().Sub(start) + result.Stats.TotalLatency = elapsed + result.Stats.MinLatency = elapsed + result.Stats.MaxLatency = elapsed + return c.resultWithResponse(resp, result) } -func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result { - result := Result{Id: id} +func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { switch resp.StatusCode { case 200: result.Status = StatusSuccess @@ -181,9 +170,9 @@ func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result { } result.Message = body.Message result.Trace = string(body.Trace) - stats.BytesRecv = cr.bytesRead + result.Stats.BytesRecv = cr.bytesRead if !result.Status.Success() { - stats.Errors = 1 + result.Stats.Errors = 1 } return result } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index f02c87730d5..311668fa16e 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -19,11 +19,12 @@ type manualClock struct { } func (c *manualClock) now() time.Time { - t := c.t - c.t = c.t.Add(c.tick) - return t + c.advance(c.tick) + return c.t } +func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) } + func TestClientSend(t *testing.T) { docs := []Document{ {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)}, @@ -37,6 +38,7 @@ func TestClientSend(t *testing.T) { }, &httpClient) clock := manualClock{t: time.Now(), tick: time.Second} client.now = clock.now + var stats Stats for i, doc := range docs { if i < 2 { httpClient.NextResponseString(200, `{"message":"All good!"}`) @@ -44,6 +46,7 @@ func TestClientSend(t *testing.T) { httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) } res := client.Send(doc) + stats.Add(res.Stats) if res.Err != nil { t.Fatalf("got unexpected error %q", res.Err) } @@ -64,7 +67,6 @@ func TestClientSend(t *testing.T) { t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody)) } } - stats := client.Stats() want := Stats{ Requests: 3, Responses: 3, diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go new file mode 100644 index 00000000000..f7c57ff97ed --- /dev/null +++ b/client/go/internal/vespa/document/throttler.go @@ -0,0 +1,117 @@ +package document + +import ( + "math" + "math/rand" + "sync/atomic" + "time" +) + +const ( + throttlerWeight = 0.7 + // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable + // for local target + throttlerMinInflight = 16 + throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side +) + +type Throttler interface { + // Sent notifies the the throttler that a document has been sent. + Sent() + // Success notifies the throttler that document operation succeeded. + Success() + // Throttled notifies the throttler that a throttling event occured while count documents were in-flight. + Throttled(count int64) + // TargetInflight returns the ideal number of documents to have in-flight now. + TargetInflight() int64 +} + +type dynamicThrottler struct { + ok atomic.Int64 + targetInflight atomic.Int64 + targetTimesTen atomic.Int64 + + throughputs []float64 + sent int64 + + start time.Time + now func() time.Time +} + +func newThrottler(nowFunc func() time.Time) *dynamicThrottler { + d := &dynamicThrottler{ + throughputs: make([]float64, 128), + start: nowFunc(), + now: nowFunc, + } + d.targetInflight.Store(8 * throttlerMinInflight) + d.targetTimesTen.Store(10 * throttlerMaxInflight) + return d +} + +func NewThrottler() Throttler { return newThrottler(time.Now) } + +func (t *dynamicThrottler) Sent() { + currentInflight := t.targetInflight.Load() + t.sent++ + if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { + return + } + t.sent = 0 + now := t.now() + elapsed := now.Sub(t.start) + t.start = now + currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed) + + // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). + index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256)) + t.throughputs[index] = currentThroughput + + // Loop over throughput measurements and pick the one which optimises throughput and latency. + choice := float64(currentInflight) + maxObjective := float64(-1) + for i := len(t.throughputs) - 1; i >= 0; i-- { + if t.throughputs[i] == 0 { + continue // Skip unknown values + } + inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs))) + objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight) + if objective > maxObjective { + maxObjective = objective + choice = inflight + } + } + target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase + t.targetInflight.Store(max(throttlerMinInflight, min(throttlerMaxInflight, target))) +} + +func (t *dynamicThrottler) Success() { + t.targetTimesTen.Add(1) + t.ok.Add(1) +} + +func (t *dynamicThrottler) Throttled(inflight int64) { + t.targetTimesTen.Store(max(inflight*5, throttlerMinInflight*10)) +} + +func (t *dynamicThrottler) TargetInflight() int64 { + staticTargetInflight := min(throttlerMaxInflight, t.targetTimesTen.Load()/10) + targetInflight := t.targetInflight.Load() + return min(staticTargetInflight, targetInflight) +} + +type number interface{ float64 | int64 } + +func min[T number](x, y T) T { + if x < y { + return x + } + return y +} + +func max[T number](x, y T) T { + if x > y { + return x + } + return y +} diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go new file mode 100644 index 00000000000..2fd1e73a45a --- /dev/null +++ b/client/go/internal/vespa/document/throttler_test.go @@ -0,0 +1,21 @@ +package document + +import ( + "testing" + "time" +) + +func TestThrottler(t *testing.T) { + clock := &manualClock{tick: time.Second} + tr := newThrottler(clock.now) + for i := 0; i < 100; i++ { + tr.Sent() + } + if got, want := tr.TargetInflight(), int64(128); got != want { + t.Errorf("got TargetInflight() = %d, but want %d", got, want) + } + tr.Throttled(5) + if got, want := tr.TargetInflight(), int64(16); got != want { + t.Errorf("got TargetInflight() = %d, but want %d", got, want) + } +} |