diff options
author | Arnstein Ressem <aressem@verizonmedia.com> | 2023-03-29 08:46:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-29 08:46:45 +0200 |
commit | b4de3620a662c0b4e6f8bd0899471fd0c58fe8ee (patch) | |
tree | 372c712f3b45e1b12778d036d9e65726591ffa1f | |
parent | 4fa6eda0a6b2b286bcd0cebe878d2809f57c28d4 (diff) | |
parent | f9f0cdad3c6721d06a7833be07d6280c68347263 (diff) |
Merge pull request #26625 from vespa-engine/revert-26618-mpolden/feed-client-4
Revert "Add throttling to vespa feed"
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 8 | ||||
-rw-r--r-- | client/go/internal/vespa/document/circuit_breaker.go | 75 | ||||
-rw-r--r-- | client/go/internal/vespa/document/circuit_breaker_test.go | 52 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 158 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 53 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder.go | 25 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder_test.go | 2 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 61 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 10 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler.go | 117 | ||||
-rw-r--r-- | client/go/internal/vespa/document/throttler_test.go | 21 |
11 files changed, 129 insertions, 453 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index c8e032929b8..f273c5aa826 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -13,7 +13,6 @@ import ( ) func addFeedFlags(cmd *cobra.Command, concurrency *int) { - // TOOD(mpolden): Remove this flag cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching") } @@ -59,10 +58,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, }, service) - throttler := document.NewThrottler() - // TODO(mpolden): Make doom duration configurable - circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) - dispatcher := document.NewDispatcher(client, throttler, circuitBreaker) + dispatcher := document.NewDispatcher(client, concurrency) dec := document.NewDecoder(r) start := cli.now() @@ -82,7 +78,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { return err } elapsed := cli.now().Sub(start) - return writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed) + return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed) } type number float32 diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go deleted file mode 100644 index aff15e88069..00000000000 --- a/client/go/internal/vespa/document/circuit_breaker.go +++ /dev/null @@ -1,75 +0,0 @@ -package document - -import ( - "math" - "sync/atomic" - "time" -) - -type CircuitState int - -const ( - // CircuitClosed represents a closed circuit. Documents are processed successfully - CircuitClosed CircuitState = iota - // CircuitHalfOpen represents a half-open circuit. Some errors have happend, but processing may still recover - CircuitHalfOpen - // CircuitOpen represents a open circuit. Something is broken. We should no longer process documents - CircuitOpen -) - -type CircuitBreaker interface { - Success() - Error(error) - State() CircuitState -} - -type timeCircuitBreaker struct { - graceDuration time.Duration - doomDuration time.Duration - - failingSinceMillis atomic.Int64 - lastError atomic.Value - halfOpen atomic.Bool - open atomic.Bool - - now func() time.Time -} - -func (b *timeCircuitBreaker) Success() { - b.failingSinceMillis.Store(math.MaxInt64) - if !b.open.Load() { - b.halfOpen.CompareAndSwap(true, false) - } -} - -func (b *timeCircuitBreaker) Error(err error) { - if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) { - b.lastError.Store(err) - } -} - -func (b *timeCircuitBreaker) State() CircuitState { - failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load())) - if failingDuration > b.graceDuration { - b.halfOpen.CompareAndSwap(false, true) - } - if b.doomDuration > 0 && failingDuration > b.doomDuration { - b.open.CompareAndSwap(false, true) - } - if b.open.Load() { - return CircuitOpen - } else if b.halfOpen.Load() { - return CircuitHalfOpen - } - return CircuitClosed -} - -func NewCircuitBreaker(graceDuration, doomDuration time.Duration) CircuitBreaker { - b := &timeCircuitBreaker{ - graceDuration: graceDuration, - doomDuration: doomDuration, - now: time.Now, - } - b.failingSinceMillis.Store(math.MaxInt64) - return b -} diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go deleted file mode 100644 index 99dd057438d..00000000000 --- a/client/go/internal/vespa/document/circuit_breaker_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package document - -import ( - "errors" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestCircuitBreaker(t *testing.T) { - clock := &manualClock{} - breaker := &timeCircuitBreaker{ - graceDuration: time.Second, - doomDuration: time.Minute, - now: clock.now, - } - err := errors.New("error") - - assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed") - - clock.advance(100 * time.Second) - assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after some time without activity") - - breaker.Success() - assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a success") - - clock.advance(100 * time.Second) - assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success") - - breaker.Error(err) - assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure") - - clock.advance(time.Second) - assert.Equal(t, CircuitClosed, breaker.State(), "State is closed until grace duration has passed") - - clock.advance(time.Millisecond) - assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open when grace duration has passed") - - breaker.Success() - assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success") - - breaker.Error(err) - clock.advance(time.Minute) - assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed") - - clock.advance(time.Millisecond) - assert.Equal(t, CircuitOpen, breaker.State(), "State is open when doom duration has passed") - - breaker.Success() - assert.Equal(t, CircuitOpen, breaker.State(), "State remains open in spite of new successes") -} diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index a65f16c9298..feb562a241a 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -3,33 +3,25 @@ package document import ( "fmt" "sync" - "sync/atomic" - "time" ) const maxAttempts = 10 // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { - feeder Feeder - throttler Throttler - circuitBreaker CircuitBreaker - stats Stats - - closed bool - ready chan Id - results chan Result - inflight map[string]*documentGroup - inflightCount atomic.Int64 - + workers int + feeder Feeder + ready chan Id + inflight map[string]*documentGroup mu sync.RWMutex wg sync.WaitGroup - resultWg sync.WaitGroup + closed bool } // documentGroup holds document operations which share their ID, and must be dispatched in order. type documentGroup struct { id Id + failed bool operations []documentOp mu sync.Mutex } @@ -45,97 +37,68 @@ func (g *documentGroup) append(op documentOp) { g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher { +func NewDispatcher(feeder Feeder, workers int) *Dispatcher { + if workers < 1 { + workers = 1 + } d := &Dispatcher{ - feeder: feeder, - throttler: throttler, - circuitBreaker: breaker, - inflight: make(map[string]*documentGroup), + workers: workers, + feeder: feeder, + inflight: make(map[string]*documentGroup), } d.start() return d } -func (d *Dispatcher) dispatchAll(g *documentGroup) { +func (d *Dispatcher) dispatchAll(g *documentGroup) int { g.mu.Lock() defer g.mu.Unlock() - for i := 0; i < len(g.operations); i++ { + failCount := len(g.operations) + for i := 0; !g.failed && i < len(g.operations); i++ { op := g.operations[i] ok := false - for !ok { - op.attempts++ + for op.attempts <= maxAttempts && !ok { + op.attempts += 1 + // TODO(mpolden): Extract function which does throttling/circuit-breaking result := d.feeder.Send(op.document) - d.results <- result ok = result.Status.Success() - if !d.shouldRetry(op, result) { - break - } } - d.releaseSlot() - } - g.operations = nil -} - -func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { - if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { - d.throttler.Success() - d.circuitBreaker.Success() - return false - } - if result.HTTPStatus == 429 || result.HTTPStatus == 503 { - d.throttler.Throttled(d.inflightCount.Load()) - return true - } - if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { - d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) - if op.attempts <= maxAttempts { - return true + if ok { + failCount-- + } else { + g.failed = true + failCount = len(g.operations) - i } } - return false + g.operations = nil + return failCount } func (d *Dispatcher) start() { d.mu.Lock() defer d.mu.Unlock() - d.ready = make(chan Id, 4096) - d.results = make(chan Result, 4096) d.closed = false - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.readDocuments() - }() - d.resultWg.Add(1) - go func() { - defer d.resultWg.Done() - d.readResults() - }() -} - -func (d *Dispatcher) readDocuments() { - for id := range d.ready { - d.mu.RLock() - group := d.inflight[id.String()] - d.mu.RUnlock() - if group != nil { - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.dispatchAll(group) - }() - } - } -} - -func (d *Dispatcher) readResults() { - for result := range d.results { - d.stats.Add(result.Stats) + d.ready = make(chan Id, 4*d.workers) + for i := 0; i < d.workers; i++ { + d.wg.Add(1) + go func() { + defer d.wg.Done() + for id := range d.ready { + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + if group != nil { + failedDocs := d.dispatchAll(group) + d.feeder.AddStats(Stats{Errors: int64(failedDocs)}) + } + } + }() } } func (d *Dispatcher) Enqueue(doc Document) error { d.mu.Lock() + defer d.mu.Unlock() if d.closed { return fmt.Errorf("dispatcher is closed") } @@ -149,43 +112,18 @@ func (d *Dispatcher) Enqueue(doc Document) error { } d.inflight[doc.Id.String()] = group } - d.mu.Unlock() - d.enqueueWithSlot(doc.Id) + d.ready <- doc.Id return nil } -func (d *Dispatcher) Stats() Stats { return d.stats } - -func (d *Dispatcher) enqueueWithSlot(id Id) { - d.acquireSlot() - d.ready <- id - d.throttler.Sent() -} - -func (d *Dispatcher) acquireSlot() { - for d.inflightCount.Load() >= d.throttler.TargetInflight() { - time.Sleep(time.Millisecond) - } - d.inflightCount.Add(1) -} - -func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) } - -func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { +// Close closes the dispatcher and waits for all inflight operations to complete. +func (d *Dispatcher) Close() error { d.mu.Lock() if !d.closed { - close(ch) - if markClosed { - d.closed = true - } + close(d.ready) + d.closed = true } d.mu.Unlock() - wg.Wait() -} - -// Close closes the dispatcher and waits for all inflight operations to complete. -func (d *Dispatcher) Close() error { - closeAndWait(d.ready, &d.wg, d, false) - closeAndWait(d.results, &d.resultWg, d, true) + d.wg.Wait() return nil } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 8a6d8c6117c..04e0928f2a3 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -3,7 +3,6 @@ package document import ( "sync" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -11,6 +10,7 @@ import ( type mockFeeder struct { failAfterNDocs int documents []Document + stats Stats mu sync.Mutex } @@ -23,24 +23,24 @@ func (f *mockFeeder) failAfterN(docs int) { func (f *mockFeeder) Send(doc Document) Result { f.mu.Lock() defer f.mu.Unlock() - result := Result{Id: doc.Id} if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs { - result.Status = StatusVespaFailure - } else { - f.documents = append(f.documents, doc) + return Result{Id: doc.Id, Status: StatusVespaFailure} } - if !result.Status.Success() { - result.Stats.Errors = 1 - } - return result + f.documents = append(f.documents, doc) + return Result{Id: doc.Id} +} + +func (f *mockFeeder) Stats() Stats { return f.stats } + +func (f *mockFeeder) AddStats(stats Stats) { + f.mu.Lock() + defer f.mu.Unlock() + f.stats.Add(stats) } func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} - clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) - breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, 2) docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, @@ -70,10 +70,7 @@ func TestDispatcherOrdering(t *testing.T) { {Id: mustParseId("id:ns:type::doc8"), Operation: OperationPut}, {Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut}, } - clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) - breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, len(docs)) for _, d := range docs { dispatcher.Enqueue(d) } @@ -93,7 +90,7 @@ func TestDispatcherOrdering(t *testing.T) { } assert.Equal(t, len(docs), len(feeder.documents)) assert.Equal(t, wantDocs, gotDocs) - assert.Equal(t, int64(0), dispatcher.Stats().Errors) + assert.Equal(t, int64(0), feeder.Stats().Errors) } func TestDispatcherOrderingWithFailures(t *testing.T) { @@ -106,26 +103,26 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { {Id: commonId, Operation: OperationRemove}, // fails } feeder.failAfterN(2) - clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) - breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, len(docs)) for _, d := range docs { dispatcher.Enqueue(d) } dispatcher.Close() wantDocs := docs[:2] assert.Equal(t, wantDocs, feeder.documents) - assert.Equal(t, int64(2), dispatcher.Stats().Errors) + assert.Equal(t, int64(2), feeder.Stats().Errors) - // Dispatching more documents for same ID succeed + // Dispatching more documents for same ID fails implicitly feeder.failAfterN(0) dispatcher.start() dispatcher.Enqueue(Document{Id: commonId, Operation: OperationPut}) dispatcher.Enqueue(Document{Id: commonId, Operation: OperationRemove}) - dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}) - dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}) + // Other IDs are fine + doc2 := Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut} + doc3 := Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut} + dispatcher.Enqueue(doc2) + dispatcher.Enqueue(doc3) dispatcher.Close() - assert.Equal(t, int64(2), dispatcher.Stats().Errors) - assert.Equal(t, 6, len(feeder.documents)) + assert.Equal(t, int64(4), feeder.Stats().Errors) + assert.Equal(t, 4, len(feeder.documents)) } diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 8bdd5bca5ba..6996e649d24 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -23,19 +23,17 @@ const ( // Result represents the result of a feeding operation. type Result struct { - Id Id - Status Status - HTTPStatus int - Message string - Trace string - Err error - Stats Stats + Id Id + Status Status + Message string + Trace string + Err error } // Success returns whether status s is considered a success. func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet } -// Stats represents feeding operation statistics. +// Stats represents the summed statistics of a feeder. type Stats struct { Requests int64 Responses int64 @@ -49,6 +47,8 @@ type Stats struct { BytesRecv int64 } +func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} } + // AvgLatency returns the average latency for a request. func (s Stats) AvgLatency() time.Duration { requests := s.Requests @@ -69,9 +69,6 @@ func (s Stats) Successes() int64 { func (s *Stats) Add(other Stats) { s.Requests += other.Requests s.Responses += other.Responses - if s.ResponsesByCode == nil && other.ResponsesByCode != nil { - s.ResponsesByCode = make(map[int]int64) - } for code, count := range other.ResponsesByCode { _, ok := s.ResponsesByCode[code] if ok { @@ -94,4 +91,8 @@ func (s *Stats) Add(other Stats) { } // Feeder is the interface for a consumer of documents. -type Feeder interface{ Send(Document) Result } +type Feeder interface { + Send(Document) Result + Stats() Stats + AddStats(Stats) +} diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go index a7d92495889..1368d871436 100644 --- a/client/go/internal/vespa/document/feeder_test.go +++ b/client/go/internal/vespa/document/feeder_test.go @@ -7,7 +7,7 @@ import ( ) func TestStatsAdd(t *testing.T) { - var got Stats + got := NewStats() got.Add(Stats{Requests: 1}) got.Add(Stats{Requests: 1}) got.Add(Stats{Responses: 1}) diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 4dadcd1d05c..2e01d4564ab 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -18,6 +19,8 @@ import ( type Client struct { options ClientOptions httpClient util.HTTPClient + stats Stats + mu sync.Mutex now func() time.Time } @@ -44,6 +47,7 @@ func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client { c := &Client{ options: options, httpClient: httpClient, + stats: NewStats(), now: time.Now, } return c @@ -112,42 +116,49 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, // Send given document the URL configured in this client. func (c *Client) Send(document Document) Result { start := c.now() - result := Result{Id: document.Id} - result.Stats.Requests = 1 + stats := NewStats() + stats.Requests = 1 + defer func() { + latency := c.now().Sub(start) + stats.TotalLatency = latency + stats.MinLatency = latency + stats.MaxLatency = latency + c.AddStats(stats) + }() method, url, err := c.feedURL(document, c.queryParams()) if err != nil { - result.Stats.Errors = 1 - result.Err = err - return result + stats.Errors = 1 + return Result{Status: StatusError, Err: err} } req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body)) if err != nil { - result.Stats.Errors = 1 - result.Status = StatusError - result.Err = err - return result + stats.Errors = 1 + return Result{Status: StatusError, Err: err} } - resp, err := c.httpClient.Do(req, 190*time.Second) + resp, err := c.httpClient.Do(req, c.options.Timeout) if err != nil { - result.Stats.Errors = 1 - result.Status = StatusTransportFailure - result.Err = err - return result + stats.Errors = 1 + return Result{Status: StatusTransportFailure, Err: err} } defer resp.Body.Close() - result.Stats.Responses = 1 - result.Stats.ResponsesByCode = map[int]int64{ + stats.Responses = 1 + stats.ResponsesByCode = map[int]int64{ resp.StatusCode: 1, } - result.Stats.BytesSent = int64(len(document.Body)) - elapsed := c.now().Sub(start) - result.Stats.TotalLatency = elapsed - result.Stats.MinLatency = elapsed - result.Stats.MaxLatency = elapsed - return c.resultWithResponse(resp, result) + stats.BytesSent = int64(len(document.Body)) + return c.createResult(document.Id, &stats, resp) +} + +func (c *Client) Stats() Stats { return c.stats } + +func (c *Client) AddStats(stats Stats) { + c.mu.Lock() + defer c.mu.Unlock() + c.stats.Add(stats) } -func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { +func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result { + result := Result{Id: id} switch resp.StatusCode { case 200: result.Status = StatusSuccess @@ -170,9 +181,9 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { } result.Message = body.Message result.Trace = string(body.Trace) - result.Stats.BytesRecv = cr.bytesRead + stats.BytesRecv = cr.bytesRead if !result.Status.Success() { - result.Stats.Errors = 1 + stats.Errors = 1 } return result } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 311668fa16e..f02c87730d5 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -19,12 +19,11 @@ type manualClock struct { } func (c *manualClock) now() time.Time { - c.advance(c.tick) - return c.t + t := c.t + c.t = c.t.Add(c.tick) + return t } -func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) } - func TestClientSend(t *testing.T) { docs := []Document{ {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)}, @@ -38,7 +37,6 @@ func TestClientSend(t *testing.T) { }, &httpClient) clock := manualClock{t: time.Now(), tick: time.Second} client.now = clock.now - var stats Stats for i, doc := range docs { if i < 2 { httpClient.NextResponseString(200, `{"message":"All good!"}`) @@ -46,7 +44,6 @@ func TestClientSend(t *testing.T) { httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) } res := client.Send(doc) - stats.Add(res.Stats) if res.Err != nil { t.Fatalf("got unexpected error %q", res.Err) } @@ -67,6 +64,7 @@ func TestClientSend(t *testing.T) { t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody)) } } + stats := client.Stats() want := Stats{ Requests: 3, Responses: 3, diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go deleted file mode 100644 index f7c57ff97ed..00000000000 --- a/client/go/internal/vespa/document/throttler.go +++ /dev/null @@ -1,117 +0,0 @@ -package document - -import ( - "math" - "math/rand" - "sync/atomic" - "time" -) - -const ( - throttlerWeight = 0.7 - // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable - // for local target - throttlerMinInflight = 16 - throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side -) - -type Throttler interface { - // Sent notifies the the throttler that a document has been sent. - Sent() - // Success notifies the throttler that document operation succeeded. - Success() - // Throttled notifies the throttler that a throttling event occured while count documents were in-flight. - Throttled(count int64) - // TargetInflight returns the ideal number of documents to have in-flight now. - TargetInflight() int64 -} - -type dynamicThrottler struct { - ok atomic.Int64 - targetInflight atomic.Int64 - targetTimesTen atomic.Int64 - - throughputs []float64 - sent int64 - - start time.Time - now func() time.Time -} - -func newThrottler(nowFunc func() time.Time) *dynamicThrottler { - d := &dynamicThrottler{ - throughputs: make([]float64, 128), - start: nowFunc(), - now: nowFunc, - } - d.targetInflight.Store(8 * throttlerMinInflight) - d.targetTimesTen.Store(10 * throttlerMaxInflight) - return d -} - -func NewThrottler() Throttler { return newThrottler(time.Now) } - -func (t *dynamicThrottler) Sent() { - currentInflight := t.targetInflight.Load() - t.sent++ - if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { - return - } - t.sent = 0 - now := t.now() - elapsed := now.Sub(t.start) - t.start = now - currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed) - - // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). - index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256)) - t.throughputs[index] = currentThroughput - - // Loop over throughput measurements and pick the one which optimises throughput and latency. - choice := float64(currentInflight) - maxObjective := float64(-1) - for i := len(t.throughputs) - 1; i >= 0; i-- { - if t.throughputs[i] == 0 { - continue // Skip unknown values - } - inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs))) - objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight) - if objective > maxObjective { - maxObjective = objective - choice = inflight - } - } - target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase - t.targetInflight.Store(max(throttlerMinInflight, min(throttlerMaxInflight, target))) -} - -func (t *dynamicThrottler) Success() { - t.targetTimesTen.Add(1) - t.ok.Add(1) -} - -func (t *dynamicThrottler) Throttled(inflight int64) { - t.targetTimesTen.Store(max(inflight*5, throttlerMinInflight*10)) -} - -func (t *dynamicThrottler) TargetInflight() int64 { - staticTargetInflight := min(throttlerMaxInflight, t.targetTimesTen.Load()/10) - targetInflight := t.targetInflight.Load() - return min(staticTargetInflight, targetInflight) -} - -type number interface{ float64 | int64 } - -func min[T number](x, y T) T { - if x < y { - return x - } - return y -} - -func max[T number](x, y T) T { - if x > y { - return x - } - return y -} diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go deleted file mode 100644 index 2fd1e73a45a..00000000000 --- a/client/go/internal/vespa/document/throttler_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package document - -import ( - "testing" - "time" -) - -func TestThrottler(t *testing.T) { - clock := &manualClock{tick: time.Second} - tr := newThrottler(clock.now) - for i := 0; i < 100; i++ { - tr.Sent() - } - if got, want := tr.TargetInflight(), int64(128); got != want { - t.Errorf("got TargetInflight() = %d, but want %d", got, want) - } - tr.Throttled(5) - if got, want := tr.TargetInflight(), int64(16); got != want { - t.Errorf("got TargetInflight() = %d, but want %d", got, want) - } -} |