summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArnstein Ressem <aressem@verizonmedia.com>2023-03-29 08:46:45 +0200
committerGitHub <noreply@github.com>2023-03-29 08:46:45 +0200
commitb4de3620a662c0b4e6f8bd0899471fd0c58fe8ee (patch)
tree372c712f3b45e1b12778d036d9e65726591ffa1f
parent4fa6eda0a6b2b286bcd0cebe878d2809f57c28d4 (diff)
parentf9f0cdad3c6721d06a7833be07d6280c68347263 (diff)
Merge pull request #26625 from vespa-engine/revert-26618-mpolden/feed-client-4
Revert "Add throttling to vespa feed"
-rw-r--r--client/go/internal/cli/cmd/feed.go8
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go75
-rw-r--r--client/go/internal/vespa/document/circuit_breaker_test.go52
-rw-r--r--client/go/internal/vespa/document/dispatcher.go158
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go53
-rw-r--r--client/go/internal/vespa/document/feeder.go25
-rw-r--r--client/go/internal/vespa/document/feeder_test.go2
-rw-r--r--client/go/internal/vespa/document/http.go61
-rw-r--r--client/go/internal/vespa/document/http_test.go10
-rw-r--r--client/go/internal/vespa/document/throttler.go117
-rw-r--r--client/go/internal/vespa/document/throttler_test.go21
11 files changed, 129 insertions, 453 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index c8e032929b8..f273c5aa826 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -13,7 +13,6 @@ import (
)
func addFeedFlags(cmd *cobra.Command, concurrency *int) {
- // TOOD(mpolden): Remove this flag
cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
}
@@ -59,10 +58,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error {
client := document.NewClient(document.ClientOptions{
BaseURL: service.BaseURL,
}, service)
- throttler := document.NewThrottler()
- // TODO(mpolden): Make doom duration configurable
- circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
- dispatcher := document.NewDispatcher(client, throttler, circuitBreaker)
+ dispatcher := document.NewDispatcher(client, concurrency)
dec := document.NewDecoder(r)
start := cli.now()
@@ -82,7 +78,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error {
return err
}
elapsed := cli.now().Sub(start)
- return writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed)
+ return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed)
}
type number float32
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
deleted file mode 100644
index aff15e88069..00000000000
--- a/client/go/internal/vespa/document/circuit_breaker.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package document
-
-import (
- "math"
- "sync/atomic"
- "time"
-)
-
-type CircuitState int
-
-const (
- // CircuitClosed represents a closed circuit. Documents are processed successfully
- CircuitClosed CircuitState = iota
- // CircuitHalfOpen represents a half-open circuit. Some errors have happend, but processing may still recover
- CircuitHalfOpen
- // CircuitOpen represents a open circuit. Something is broken. We should no longer process documents
- CircuitOpen
-)
-
-type CircuitBreaker interface {
- Success()
- Error(error)
- State() CircuitState
-}
-
-type timeCircuitBreaker struct {
- graceDuration time.Duration
- doomDuration time.Duration
-
- failingSinceMillis atomic.Int64
- lastError atomic.Value
- halfOpen atomic.Bool
- open atomic.Bool
-
- now func() time.Time
-}
-
-func (b *timeCircuitBreaker) Success() {
- b.failingSinceMillis.Store(math.MaxInt64)
- if !b.open.Load() {
- b.halfOpen.CompareAndSwap(true, false)
- }
-}
-
-func (b *timeCircuitBreaker) Error(err error) {
- if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
- b.lastError.Store(err)
- }
-}
-
-func (b *timeCircuitBreaker) State() CircuitState {
- failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load()))
- if failingDuration > b.graceDuration {
- b.halfOpen.CompareAndSwap(false, true)
- }
- if b.doomDuration > 0 && failingDuration > b.doomDuration {
- b.open.CompareAndSwap(false, true)
- }
- if b.open.Load() {
- return CircuitOpen
- } else if b.halfOpen.Load() {
- return CircuitHalfOpen
- }
- return CircuitClosed
-}
-
-func NewCircuitBreaker(graceDuration, doomDuration time.Duration) CircuitBreaker {
- b := &timeCircuitBreaker{
- graceDuration: graceDuration,
- doomDuration: doomDuration,
- now: time.Now,
- }
- b.failingSinceMillis.Store(math.MaxInt64)
- return b
-}
diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go
deleted file mode 100644
index 99dd057438d..00000000000
--- a/client/go/internal/vespa/document/circuit_breaker_test.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package document
-
-import (
- "errors"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestCircuitBreaker(t *testing.T) {
- clock := &manualClock{}
- breaker := &timeCircuitBreaker{
- graceDuration: time.Second,
- doomDuration: time.Minute,
- now: clock.now,
- }
- err := errors.New("error")
-
- assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed")
-
- clock.advance(100 * time.Second)
- assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after some time without activity")
-
- breaker.Success()
- assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a success")
-
- clock.advance(100 * time.Second)
- assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success")
-
- breaker.Error(err)
- assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure")
-
- clock.advance(time.Second)
- assert.Equal(t, CircuitClosed, breaker.State(), "State is closed until grace duration has passed")
-
- clock.advance(time.Millisecond)
- assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open when grace duration has passed")
-
- breaker.Success()
- assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success")
-
- breaker.Error(err)
- clock.advance(time.Minute)
- assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed")
-
- clock.advance(time.Millisecond)
- assert.Equal(t, CircuitOpen, breaker.State(), "State is open when doom duration has passed")
-
- breaker.Success()
- assert.Equal(t, CircuitOpen, breaker.State(), "State remains open in spite of new successes")
-}
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index a65f16c9298..feb562a241a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,33 +3,25 @@ package document
import (
"fmt"
"sync"
- "sync/atomic"
- "time"
)
const maxAttempts = 10
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- feeder Feeder
- throttler Throttler
- circuitBreaker CircuitBreaker
- stats Stats
-
- closed bool
- ready chan Id
- results chan Result
- inflight map[string]*documentGroup
- inflightCount atomic.Int64
-
+ workers int
+ feeder Feeder
+ ready chan Id
+ inflight map[string]*documentGroup
mu sync.RWMutex
wg sync.WaitGroup
- resultWg sync.WaitGroup
+ closed bool
}
// documentGroup holds document operations which share their ID, and must be dispatched in order.
type documentGroup struct {
id Id
+ failed bool
operations []documentOp
mu sync.Mutex
}
@@ -45,97 +37,68 @@ func (g *documentGroup) append(op documentOp) {
g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
+func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
+ if workers < 1 {
+ workers = 1
+ }
d := &Dispatcher{
- feeder: feeder,
- throttler: throttler,
- circuitBreaker: breaker,
- inflight: make(map[string]*documentGroup),
+ workers: workers,
+ feeder: feeder,
+ inflight: make(map[string]*documentGroup),
}
d.start()
return d
}
-func (d *Dispatcher) dispatchAll(g *documentGroup) {
+func (d *Dispatcher) dispatchAll(g *documentGroup) int {
g.mu.Lock()
defer g.mu.Unlock()
- for i := 0; i < len(g.operations); i++ {
+ failCount := len(g.operations)
+ for i := 0; !g.failed && i < len(g.operations); i++ {
op := g.operations[i]
ok := false
- for !ok {
- op.attempts++
+ for op.attempts <= maxAttempts && !ok {
+ op.attempts += 1
+ // TODO(mpolden): Extract function which does throttling/circuit-breaking
result := d.feeder.Send(op.document)
- d.results <- result
ok = result.Status.Success()
- if !d.shouldRetry(op, result) {
- break
- }
}
- d.releaseSlot()
- }
- g.operations = nil
-}
-
-func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
- if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
- d.throttler.Success()
- d.circuitBreaker.Success()
- return false
- }
- if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- d.throttler.Throttled(d.inflightCount.Load())
- return true
- }
- if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
- d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
- if op.attempts <= maxAttempts {
- return true
+ if ok {
+ failCount--
+ } else {
+ g.failed = true
+ failCount = len(g.operations) - i
}
}
- return false
+ g.operations = nil
+ return failCount
}
func (d *Dispatcher) start() {
d.mu.Lock()
defer d.mu.Unlock()
- d.ready = make(chan Id, 4096)
- d.results = make(chan Result, 4096)
d.closed = false
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.readDocuments()
- }()
- d.resultWg.Add(1)
- go func() {
- defer d.resultWg.Done()
- d.readResults()
- }()
-}
-
-func (d *Dispatcher) readDocuments() {
- for id := range d.ready {
- d.mu.RLock()
- group := d.inflight[id.String()]
- d.mu.RUnlock()
- if group != nil {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.dispatchAll(group)
- }()
- }
- }
-}
-
-func (d *Dispatcher) readResults() {
- for result := range d.results {
- d.stats.Add(result.Stats)
+ d.ready = make(chan Id, 4*d.workers)
+ for i := 0; i < d.workers; i++ {
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ for id := range d.ready {
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ if group != nil {
+ failedDocs := d.dispatchAll(group)
+ d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
+ }
+ }
+ }()
}
}
func (d *Dispatcher) Enqueue(doc Document) error {
d.mu.Lock()
+ defer d.mu.Unlock()
if d.closed {
return fmt.Errorf("dispatcher is closed")
}
@@ -149,43 +112,18 @@ func (d *Dispatcher) Enqueue(doc Document) error {
}
d.inflight[doc.Id.String()] = group
}
- d.mu.Unlock()
- d.enqueueWithSlot(doc.Id)
+ d.ready <- doc.Id
return nil
}
-func (d *Dispatcher) Stats() Stats { return d.stats }
-
-func (d *Dispatcher) enqueueWithSlot(id Id) {
- d.acquireSlot()
- d.ready <- id
- d.throttler.Sent()
-}
-
-func (d *Dispatcher) acquireSlot() {
- for d.inflightCount.Load() >= d.throttler.TargetInflight() {
- time.Sleep(time.Millisecond)
- }
- d.inflightCount.Add(1)
-}
-
-func (d *Dispatcher) releaseSlot() { d.inflightCount.Add(-1) }
-
-func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
+// Close closes the dispatcher and waits for all inflight operations to complete.
+func (d *Dispatcher) Close() error {
d.mu.Lock()
if !d.closed {
- close(ch)
- if markClosed {
- d.closed = true
- }
+ close(d.ready)
+ d.closed = true
}
d.mu.Unlock()
- wg.Wait()
-}
-
-// Close closes the dispatcher and waits for all inflight operations to complete.
-func (d *Dispatcher) Close() error {
- closeAndWait(d.ready, &d.wg, d, false)
- closeAndWait(d.results, &d.resultWg, d, true)
+ d.wg.Wait()
return nil
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 8a6d8c6117c..04e0928f2a3 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -3,7 +3,6 @@ package document
import (
"sync"
"testing"
- "time"
"github.com/stretchr/testify/assert"
)
@@ -11,6 +10,7 @@ import (
type mockFeeder struct {
failAfterNDocs int
documents []Document
+ stats Stats
mu sync.Mutex
}
@@ -23,24 +23,24 @@ func (f *mockFeeder) failAfterN(docs int) {
func (f *mockFeeder) Send(doc Document) Result {
f.mu.Lock()
defer f.mu.Unlock()
- result := Result{Id: doc.Id}
if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs {
- result.Status = StatusVespaFailure
- } else {
- f.documents = append(f.documents, doc)
+ return Result{Id: doc.Id, Status: StatusVespaFailure}
}
- if !result.Status.Success() {
- result.Stats.Errors = 1
- }
- return result
+ f.documents = append(f.documents, doc)
+ return Result{Id: doc.Id}
+}
+
+func (f *mockFeeder) Stats() Stats { return f.stats }
+
+func (f *mockFeeder) AddStats(stats Stats) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.stats.Add(stats)
}
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
- clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
- breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, 2)
docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
@@ -70,10 +70,7 @@ func TestDispatcherOrdering(t *testing.T) {
{Id: mustParseId("id:ns:type::doc8"), Operation: OperationPut},
{Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut},
}
- clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
- breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, len(docs))
for _, d := range docs {
dispatcher.Enqueue(d)
}
@@ -93,7 +90,7 @@ func TestDispatcherOrdering(t *testing.T) {
}
assert.Equal(t, len(docs), len(feeder.documents))
assert.Equal(t, wantDocs, gotDocs)
- assert.Equal(t, int64(0), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(0), feeder.Stats().Errors)
}
func TestDispatcherOrderingWithFailures(t *testing.T) {
@@ -106,26 +103,26 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
{Id: commonId, Operation: OperationRemove}, // fails
}
feeder.failAfterN(2)
- clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
- breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, len(docs))
for _, d := range docs {
dispatcher.Enqueue(d)
}
dispatcher.Close()
wantDocs := docs[:2]
assert.Equal(t, wantDocs, feeder.documents)
- assert.Equal(t, int64(2), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(2), feeder.Stats().Errors)
- // Dispatching more documents for same ID succeed
+ // Dispatching more documents for same ID fails implicitly
feeder.failAfterN(0)
dispatcher.start()
dispatcher.Enqueue(Document{Id: commonId, Operation: OperationPut})
dispatcher.Enqueue(Document{Id: commonId, Operation: OperationRemove})
- dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut})
- dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut})
+ // Other IDs are fine
+ doc2 := Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}
+ doc3 := Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}
+ dispatcher.Enqueue(doc2)
+ dispatcher.Enqueue(doc3)
dispatcher.Close()
- assert.Equal(t, int64(2), dispatcher.Stats().Errors)
- assert.Equal(t, 6, len(feeder.documents))
+ assert.Equal(t, int64(4), feeder.Stats().Errors)
+ assert.Equal(t, 4, len(feeder.documents))
}
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 8bdd5bca5ba..6996e649d24 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -23,19 +23,17 @@ const (
// Result represents the result of a feeding operation.
type Result struct {
- Id Id
- Status Status
- HTTPStatus int
- Message string
- Trace string
- Err error
- Stats Stats
+ Id Id
+ Status Status
+ Message string
+ Trace string
+ Err error
}
// Success returns whether status s is considered a success.
func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet }
-// Stats represents feeding operation statistics.
+// Stats represents the summed statistics of a feeder.
type Stats struct {
Requests int64
Responses int64
@@ -49,6 +47,8 @@ type Stats struct {
BytesRecv int64
}
+func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} }
+
// AvgLatency returns the average latency for a request.
func (s Stats) AvgLatency() time.Duration {
requests := s.Requests
@@ -69,9 +69,6 @@ func (s Stats) Successes() int64 {
func (s *Stats) Add(other Stats) {
s.Requests += other.Requests
s.Responses += other.Responses
- if s.ResponsesByCode == nil && other.ResponsesByCode != nil {
- s.ResponsesByCode = make(map[int]int64)
- }
for code, count := range other.ResponsesByCode {
_, ok := s.ResponsesByCode[code]
if ok {
@@ -94,4 +91,8 @@ func (s *Stats) Add(other Stats) {
}
// Feeder is the interface for a consumer of documents.
-type Feeder interface{ Send(Document) Result }
+type Feeder interface {
+ Send(Document) Result
+ Stats() Stats
+ AddStats(Stats)
+}
diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go
index a7d92495889..1368d871436 100644
--- a/client/go/internal/vespa/document/feeder_test.go
+++ b/client/go/internal/vespa/document/feeder_test.go
@@ -7,7 +7,7 @@ import (
)
func TestStatsAdd(t *testing.T) {
- var got Stats
+ got := NewStats()
got.Add(Stats{Requests: 1})
got.Add(Stats{Requests: 1})
got.Add(Stats{Responses: 1})
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 4dadcd1d05c..2e01d4564ab 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
+ "sync"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -18,6 +19,8 @@ import (
type Client struct {
options ClientOptions
httpClient util.HTTPClient
+ stats Stats
+ mu sync.Mutex
now func() time.Time
}
@@ -44,6 +47,7 @@ func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client {
c := &Client{
options: options,
httpClient: httpClient,
+ stats: NewStats(),
now: time.Now,
}
return c
@@ -112,42 +116,49 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL,
// Send given document the URL configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
- result := Result{Id: document.Id}
- result.Stats.Requests = 1
+ stats := NewStats()
+ stats.Requests = 1
+ defer func() {
+ latency := c.now().Sub(start)
+ stats.TotalLatency = latency
+ stats.MinLatency = latency
+ stats.MaxLatency = latency
+ c.AddStats(stats)
+ }()
method, url, err := c.feedURL(document, c.queryParams())
if err != nil {
- result.Stats.Errors = 1
- result.Err = err
- return result
+ stats.Errors = 1
+ return Result{Status: StatusError, Err: err}
}
req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body))
if err != nil {
- result.Stats.Errors = 1
- result.Status = StatusError
- result.Err = err
- return result
+ stats.Errors = 1
+ return Result{Status: StatusError, Err: err}
}
- resp, err := c.httpClient.Do(req, 190*time.Second)
+ resp, err := c.httpClient.Do(req, c.options.Timeout)
if err != nil {
- result.Stats.Errors = 1
- result.Status = StatusTransportFailure
- result.Err = err
- return result
+ stats.Errors = 1
+ return Result{Status: StatusTransportFailure, Err: err}
}
defer resp.Body.Close()
- result.Stats.Responses = 1
- result.Stats.ResponsesByCode = map[int]int64{
+ stats.Responses = 1
+ stats.ResponsesByCode = map[int]int64{
resp.StatusCode: 1,
}
- result.Stats.BytesSent = int64(len(document.Body))
- elapsed := c.now().Sub(start)
- result.Stats.TotalLatency = elapsed
- result.Stats.MinLatency = elapsed
- result.Stats.MaxLatency = elapsed
- return c.resultWithResponse(resp, result)
+ stats.BytesSent = int64(len(document.Body))
+ return c.createResult(document.Id, &stats, resp)
+}
+
+func (c *Client) Stats() Stats { return c.stats }
+
+func (c *Client) AddStats(stats Stats) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.stats.Add(stats)
}
-func (c *Client) resultWithResponse(resp *http.Response, result Result) Result {
+func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result {
+ result := Result{Id: id}
switch resp.StatusCode {
case 200:
result.Status = StatusSuccess
@@ -170,9 +181,9 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result {
}
result.Message = body.Message
result.Trace = string(body.Trace)
- result.Stats.BytesRecv = cr.bytesRead
+ stats.BytesRecv = cr.bytesRead
if !result.Status.Success() {
- result.Stats.Errors = 1
+ stats.Errors = 1
}
return result
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 311668fa16e..f02c87730d5 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -19,12 +19,11 @@ type manualClock struct {
}
func (c *manualClock) now() time.Time {
- c.advance(c.tick)
- return c.t
+ t := c.t
+ c.t = c.t.Add(c.tick)
+ return t
}
-func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) }
-
func TestClientSend(t *testing.T) {
docs := []Document{
{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
@@ -38,7 +37,6 @@ func TestClientSend(t *testing.T) {
}, &httpClient)
clock := manualClock{t: time.Now(), tick: time.Second}
client.now = clock.now
- var stats Stats
for i, doc := range docs {
if i < 2 {
httpClient.NextResponseString(200, `{"message":"All good!"}`)
@@ -46,7 +44,6 @@ func TestClientSend(t *testing.T) {
httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
}
res := client.Send(doc)
- stats.Add(res.Stats)
if res.Err != nil {
t.Fatalf("got unexpected error %q", res.Err)
}
@@ -67,6 +64,7 @@ func TestClientSend(t *testing.T) {
t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
}
}
+ stats := client.Stats()
want := Stats{
Requests: 3,
Responses: 3,
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
deleted file mode 100644
index f7c57ff97ed..00000000000
--- a/client/go/internal/vespa/document/throttler.go
+++ /dev/null
@@ -1,117 +0,0 @@
-package document
-
-import (
- "math"
- "math/rand"
- "sync/atomic"
- "time"
-)
-
-const (
- throttlerWeight = 0.7
- // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable
- // for local target
- throttlerMinInflight = 16
- throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side
-)
-
-type Throttler interface {
- // Sent notifies the the throttler that a document has been sent.
- Sent()
- // Success notifies the throttler that document operation succeeded.
- Success()
- // Throttled notifies the throttler that a throttling event occured while count documents were in-flight.
- Throttled(count int64)
- // TargetInflight returns the ideal number of documents to have in-flight now.
- TargetInflight() int64
-}
-
-type dynamicThrottler struct {
- ok atomic.Int64
- targetInflight atomic.Int64
- targetTimesTen atomic.Int64
-
- throughputs []float64
- sent int64
-
- start time.Time
- now func() time.Time
-}
-
-func newThrottler(nowFunc func() time.Time) *dynamicThrottler {
- d := &dynamicThrottler{
- throughputs: make([]float64, 128),
- start: nowFunc(),
- now: nowFunc,
- }
- d.targetInflight.Store(8 * throttlerMinInflight)
- d.targetTimesTen.Store(10 * throttlerMaxInflight)
- return d
-}
-
-func NewThrottler() Throttler { return newThrottler(time.Now) }
-
-func (t *dynamicThrottler) Sent() {
- currentInflight := t.targetInflight.Load()
- t.sent++
- if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
- return
- }
- t.sent = 0
- now := t.now()
- elapsed := now.Sub(t.start)
- t.start = now
- currentThroughput := float64(t.ok.Swap(0)) / float64(elapsed)
-
- // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
- index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256))
- t.throughputs[index] = currentThroughput
-
- // Loop over throughput measurements and pick the one which optimises throughput and latency.
- choice := float64(currentInflight)
- maxObjective := float64(-1)
- for i := len(t.throughputs) - 1; i >= 0; i-- {
- if t.throughputs[i] == 0 {
- continue // Skip unknown values
- }
- inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs)))
- objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight)
- if objective > maxObjective {
- maxObjective = objective
- choice = inflight
- }
- }
- target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
- t.targetInflight.Store(max(throttlerMinInflight, min(throttlerMaxInflight, target)))
-}
-
-func (t *dynamicThrottler) Success() {
- t.targetTimesTen.Add(1)
- t.ok.Add(1)
-}
-
-func (t *dynamicThrottler) Throttled(inflight int64) {
- t.targetTimesTen.Store(max(inflight*5, throttlerMinInflight*10))
-}
-
-func (t *dynamicThrottler) TargetInflight() int64 {
- staticTargetInflight := min(throttlerMaxInflight, t.targetTimesTen.Load()/10)
- targetInflight := t.targetInflight.Load()
- return min(staticTargetInflight, targetInflight)
-}
-
-type number interface{ float64 | int64 }
-
-func min[T number](x, y T) T {
- if x < y {
- return x
- }
- return y
-}
-
-func max[T number](x, y T) T {
- if x > y {
- return x
- }
- return y
-}
diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go
deleted file mode 100644
index 2fd1e73a45a..00000000000
--- a/client/go/internal/vespa/document/throttler_test.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package document
-
-import (
- "testing"
- "time"
-)
-
-func TestThrottler(t *testing.T) {
- clock := &manualClock{tick: time.Second}
- tr := newThrottler(clock.now)
- for i := 0; i < 100; i++ {
- tr.Sent()
- }
- if got, want := tr.TargetInflight(), int64(128); got != want {
- t.Errorf("got TargetInflight() = %d, but want %d", got, want)
- }
- tr.Throttled(5)
- if got, want := tr.TargetInflight(), int64(16); got != want {
- t.Errorf("got TargetInflight() = %d, but want %d", got, want)
- }
-}