summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2023-05-23 09:44:16 +0200
committerGitHub <noreply@github.com>2023-05-23 09:44:16 +0200
commitf3d91a0ee3f4d810d213d6f44a73ed21f30b6180 (patch)
treed2119811d3f151d402ccfb4d696fe3e9bbbf9602
parent23128f585da2b38d33978c14e80662a183fbe725 (diff)
parente0e23b71806574e50a729867c6fbf93782fce3a6 (diff)
Merge pull request #27178 from vespa-engine/mpolden/defer-q-alloc
Defer queue allocation until needed
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go26
-rw-r--r--client/go/internal/vespa/document/dispatcher.go46
-rw-r--r--client/go/internal/vespa/document/http.go14
-rw-r--r--client/go/internal/vespa/document/http_test.go8
4 files changed, 44 insertions, 50 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
index 17fc595d58f..f7f0f4360df 100644
--- a/client/go/internal/vespa/document/circuit_breaker.go
+++ b/client/go/internal/vespa/document/circuit_breaker.go
@@ -27,38 +27,38 @@ type timeCircuitBreaker struct {
graceDuration time.Duration
doomDuration time.Duration
- failingSinceMillis int64
+ failingSinceMillis atomic.Int64
lastError atomic.Value
- halfOpen atomic.Value
- open atomic.Value
+ halfOpen atomic.Bool
+ open atomic.Bool
now func() time.Time
}
func (b *timeCircuitBreaker) Success() {
- atomic.StoreInt64(&b.failingSinceMillis, math.MaxInt64)
- if !b.open.Load().(bool) {
+ b.failingSinceMillis.Store(math.MaxInt64)
+ if !b.open.Load() {
b.halfOpen.CompareAndSwap(true, false)
}
}
func (b *timeCircuitBreaker) Error(err error) {
- if atomic.CompareAndSwapInt64(&b.failingSinceMillis, math.MaxInt64, b.now().UnixMilli()) {
+ if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
b.lastError.Store(err)
}
}
func (b *timeCircuitBreaker) State() CircuitState {
- failingDuration := b.now().Sub(time.UnixMilli(atomic.LoadInt64(&b.failingSinceMillis)))
+ failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load()))
if failingDuration > b.graceDuration {
b.halfOpen.CompareAndSwap(false, true)
}
if b.doomDuration > 0 && failingDuration > b.doomDuration {
b.open.CompareAndSwap(false, true)
}
- if b.open.Load().(bool) {
+ if b.open.Load() {
return CircuitOpen
- } else if b.halfOpen.Load().(bool) {
+ } else if b.halfOpen.Load() {
return CircuitHalfOpen
}
return CircuitClosed
@@ -66,11 +66,11 @@ func (b *timeCircuitBreaker) State() CircuitState {
func NewCircuitBreaker(graceDuration, doomDuration time.Duration) *timeCircuitBreaker {
b := &timeCircuitBreaker{
- graceDuration: graceDuration,
- doomDuration: doomDuration,
- now: time.Now,
- failingSinceMillis: math.MaxInt64,
+ graceDuration: graceDuration,
+ doomDuration: doomDuration,
+ now: time.Now,
}
+ b.failingSinceMillis.Store(math.MaxInt64)
b.open.Store(false)
b.halfOpen.Store(false)
return b
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 2ad5b841616..7a19d21f278 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -20,7 +20,6 @@ type Dispatcher struct {
stats Stats
started bool
- ready chan documentOp
results chan documentOp
msgs chan string
@@ -29,7 +28,6 @@ type Dispatcher struct {
output io.Writer
verbose bool
- queuePool sync.Pool
mu sync.Mutex
statsMu sync.Mutex
wg sync.WaitGroup
@@ -57,7 +55,6 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
output: output,
verbose: verbose,
}
- d.queuePool.New = func() any { return NewQueue[documentOp]() }
d.start()
return d
}
@@ -110,23 +107,14 @@ func (d *Dispatcher) start() {
if d.started {
return
}
- d.ready = make(chan documentOp, 4096)
d.results = make(chan documentOp, 4096)
d.msgs = make(chan string, 4096)
d.started = true
- d.wg.Add(3)
- go d.dispatchReady()
+ d.wg.Add(2)
go d.processResults()
go d.printMessages()
}
-func (d *Dispatcher) dispatchReady() {
- defer d.wg.Done()
- for op := range d.ready {
- d.dispatch(op)
- }
-}
-
func (d *Dispatcher) dispatch(op documentOp) {
if !d.acceptDocument() {
d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", op.document.Id.String())
@@ -163,13 +151,19 @@ func (d *Dispatcher) dispatchNext(id Id) {
if !ok {
panic("no queue exists for " + id.String() + ": this should not happen")
}
- if next, ok := q.Poll(); ok {
- // we have more operations with this ID: notify dispatcher about the next one
- d.ready <- next
- } else {
+ hasNext := q != nil
+ if hasNext {
+ next, ok := q.Poll()
+ if ok {
+ // we have more operations with this ID: dispatch the next one
+ d.dispatch(next)
+ } else {
+ hasNext = false
+ }
+ }
+ if !hasNext {
// no more operations with this ID: release slot
delete(d.inflight, k)
- d.queuePool.Put(q)
d.releaseSlot()
}
}
@@ -191,12 +185,15 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
d.mu.Unlock()
return fmt.Errorf("refusing to enqueue document %s: too many errors", op.document.Id.String())
}
- key := op.document.Id.String()
- q, ok := d.inflight[key]
+ k := op.document.Id.String()
+ q, ok := d.inflight[k]
if !ok {
- q = d.queuePool.Get().(*Queue[documentOp])
- d.inflight[key] = q
+ d.inflight[k] = nil // track operation, but defer allocating queue until needed
} else {
+ if q == nil {
+ q = NewQueue[documentOp]()
+ d.inflight[k] = q
+ }
q.Add(op, isRetry)
}
if !isRetry {
@@ -204,9 +201,9 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
}
d.mu.Unlock()
if !ok && !isRetry {
- // first operation with this ID: acquire slot
+ // first operation with this ID: acquire slot and dispatch
d.acquireSlot()
- d.ready <- op
+ d.dispatch(op)
d.throttler.Sent()
}
return nil
@@ -248,7 +245,6 @@ func (d *Dispatcher) Close() error {
d.inflightWg.Wait() // Wait for all inflight operations to complete
d.mu.Lock()
if d.started {
- close(d.ready)
close(d.results)
close(d.msgs)
d.started = false
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index ce57ac55f03..e083f017c4a 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -46,7 +46,7 @@ type Client struct {
options ClientOptions
httpClients []countingHTTPClient
now func() time.Time
- sendCount int32
+ sendCount atomic.Int32
gzippers sync.Pool
buffers sync.Pool
pending chan *pendingDocument
@@ -65,13 +65,11 @@ type ClientOptions struct {
type countingHTTPClient struct {
client util.HTTPClient
- inflight int64
+ inflight atomic.Int64
}
-func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) }
-
func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) {
- defer c.addInflight(-1)
+ defer c.inflight.Add(-1)
return c.client.Do(req, timeout)
}
@@ -186,18 +184,18 @@ func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) {
func (c *Client) leastBusyClient() *countingHTTPClient {
leastBusy := c.httpClients[0]
min := int64(math.MaxInt64)
- next := atomic.AddInt32(&c.sendCount, 1)
+ next := c.sendCount.Add(1)
start := int(next) % len(c.httpClients)
for i := range c.httpClients {
j := (i + start) % len(c.httpClients)
client := c.httpClients[j]
- inflight := atomic.LoadInt64(&client.inflight)
+ inflight := client.inflight.Load()
if inflight < min {
leastBusy = client
min = inflight
}
}
- leastBusy.addInflight(1)
+ leastBusy.inflight.Add(1)
return &leastBusy
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 6eda5f04fd6..1bc3a6c9f39 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -36,13 +36,13 @@ func TestLeastBusyClient(t *testing.T) {
httpClients = append(httpClients, &mockHTTPClient{i, &httpClient})
}
client, _ := NewClient(ClientOptions{}, httpClients)
- client.httpClients[0].addInflight(1)
- client.httpClients[1].addInflight(1)
+ client.httpClients[0].inflight.Add(1)
+ client.httpClients[1].inflight.Add(1)
assertLeastBusy(t, 2, client)
assertLeastBusy(t, 2, client)
assertLeastBusy(t, 3, client)
- client.httpClients[3].addInflight(1)
- client.httpClients[1].addInflight(-1)
+ client.httpClients[3].inflight.Add(1)
+ client.httpClients[1].inflight.Add(-1)
assertLeastBusy(t, 1, client)
}