diff options
author | Jon Bratseth <bratseth@gmail.com> | 2023-05-23 09:44:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-23 09:44:16 +0200 |
commit | f3d91a0ee3f4d810d213d6f44a73ed21f30b6180 (patch) | |
tree | d2119811d3f151d402ccfb4d696fe3e9bbbf9602 | |
parent | 23128f585da2b38d33978c14e80662a183fbe725 (diff) | |
parent | e0e23b71806574e50a729867c6fbf93782fce3a6 (diff) |
Merge pull request #27178 from vespa-engine/mpolden/defer-q-alloc
Defer queue allocation until needed
-rw-r--r-- | client/go/internal/vespa/document/circuit_breaker.go | 26 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 46 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 14 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 8 |
4 files changed, 44 insertions, 50 deletions
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go index 17fc595d58f..f7f0f4360df 100644 --- a/client/go/internal/vespa/document/circuit_breaker.go +++ b/client/go/internal/vespa/document/circuit_breaker.go @@ -27,38 +27,38 @@ type timeCircuitBreaker struct { graceDuration time.Duration doomDuration time.Duration - failingSinceMillis int64 + failingSinceMillis atomic.Int64 lastError atomic.Value - halfOpen atomic.Value - open atomic.Value + halfOpen atomic.Bool + open atomic.Bool now func() time.Time } func (b *timeCircuitBreaker) Success() { - atomic.StoreInt64(&b.failingSinceMillis, math.MaxInt64) - if !b.open.Load().(bool) { + b.failingSinceMillis.Store(math.MaxInt64) + if !b.open.Load() { b.halfOpen.CompareAndSwap(true, false) } } func (b *timeCircuitBreaker) Error(err error) { - if atomic.CompareAndSwapInt64(&b.failingSinceMillis, math.MaxInt64, b.now().UnixMilli()) { + if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) { b.lastError.Store(err) } } func (b *timeCircuitBreaker) State() CircuitState { - failingDuration := b.now().Sub(time.UnixMilli(atomic.LoadInt64(&b.failingSinceMillis))) + failingDuration := b.now().Sub(time.UnixMilli(b.failingSinceMillis.Load())) if failingDuration > b.graceDuration { b.halfOpen.CompareAndSwap(false, true) } if b.doomDuration > 0 && failingDuration > b.doomDuration { b.open.CompareAndSwap(false, true) } - if b.open.Load().(bool) { + if b.open.Load() { return CircuitOpen - } else if b.halfOpen.Load().(bool) { + } else if b.halfOpen.Load() { return CircuitHalfOpen } return CircuitClosed @@ -66,11 +66,11 @@ func (b *timeCircuitBreaker) State() CircuitState { func NewCircuitBreaker(graceDuration, doomDuration time.Duration) *timeCircuitBreaker { b := &timeCircuitBreaker{ - graceDuration: graceDuration, - doomDuration: doomDuration, - now: time.Now, - failingSinceMillis: math.MaxInt64, + graceDuration: graceDuration, + doomDuration: doomDuration, + now: time.Now, } + b.failingSinceMillis.Store(math.MaxInt64) b.open.Store(false) b.halfOpen.Store(false) return b diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 2ad5b841616..7a19d21f278 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -20,7 +20,6 @@ type Dispatcher struct { stats Stats started bool - ready chan documentOp results chan documentOp msgs chan string @@ -29,7 +28,6 @@ type Dispatcher struct { output io.Writer verbose bool - queuePool sync.Pool mu sync.Mutex statsMu sync.Mutex wg sync.WaitGroup @@ -57,7 +55,6 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o output: output, verbose: verbose, } - d.queuePool.New = func() any { return NewQueue[documentOp]() } d.start() return d } @@ -110,23 +107,14 @@ func (d *Dispatcher) start() { if d.started { return } - d.ready = make(chan documentOp, 4096) d.results = make(chan documentOp, 4096) d.msgs = make(chan string, 4096) d.started = true - d.wg.Add(3) - go d.dispatchReady() + d.wg.Add(2) go d.processResults() go d.printMessages() } -func (d *Dispatcher) dispatchReady() { - defer d.wg.Done() - for op := range d.ready { - d.dispatch(op) - } -} - func (d *Dispatcher) dispatch(op documentOp) { if !d.acceptDocument() { d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", op.document.Id.String()) @@ -163,13 +151,19 @@ func (d *Dispatcher) dispatchNext(id Id) { if !ok { panic("no queue exists for " + id.String() + ": this should not happen") } - if next, ok := q.Poll(); ok { - // we have more operations with this ID: notify dispatcher about the next one - d.ready <- next - } else { + hasNext := q != nil + if hasNext { + next, ok := q.Poll() + if ok { + // we have more operations with this ID: dispatch the next one + d.dispatch(next) + } else { + hasNext = false + } + } + if !hasNext { // no more operations with this ID: release slot delete(d.inflight, k) - d.queuePool.Put(q) d.releaseSlot() } } @@ -191,12 +185,15 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { d.mu.Unlock() return fmt.Errorf("refusing to enqueue document %s: too many errors", op.document.Id.String()) } - key := op.document.Id.String() - q, ok := d.inflight[key] + k := op.document.Id.String() + q, ok := d.inflight[k] if !ok { - q = d.queuePool.Get().(*Queue[documentOp]) - d.inflight[key] = q + d.inflight[k] = nil // track operation, but defer allocating queue until needed } else { + if q == nil { + q = NewQueue[documentOp]() + d.inflight[k] = q + } q.Add(op, isRetry) } if !isRetry { @@ -204,9 +201,9 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { } d.mu.Unlock() if !ok && !isRetry { - // first operation with this ID: acquire slot + // first operation with this ID: acquire slot and dispatch d.acquireSlot() - d.ready <- op + d.dispatch(op) d.throttler.Sent() } return nil @@ -248,7 +245,6 @@ func (d *Dispatcher) Close() error { d.inflightWg.Wait() // Wait for all inflight operations to complete d.mu.Lock() if d.started { - close(d.ready) close(d.results) close(d.msgs) d.started = false diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index ce57ac55f03..e083f017c4a 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -46,7 +46,7 @@ type Client struct { options ClientOptions httpClients []countingHTTPClient now func() time.Time - sendCount int32 + sendCount atomic.Int32 gzippers sync.Pool buffers sync.Pool pending chan *pendingDocument @@ -65,13 +65,11 @@ type ClientOptions struct { type countingHTTPClient struct { client util.HTTPClient - inflight int64 + inflight atomic.Int64 } -func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) } - func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) { - defer c.addInflight(-1) + defer c.inflight.Add(-1) return c.client.Do(req, timeout) } @@ -186,18 +184,18 @@ func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) { func (c *Client) leastBusyClient() *countingHTTPClient { leastBusy := c.httpClients[0] min := int64(math.MaxInt64) - next := atomic.AddInt32(&c.sendCount, 1) + next := c.sendCount.Add(1) start := int(next) % len(c.httpClients) for i := range c.httpClients { j := (i + start) % len(c.httpClients) client := c.httpClients[j] - inflight := atomic.LoadInt64(&client.inflight) + inflight := client.inflight.Load() if inflight < min { leastBusy = client min = inflight } } - leastBusy.addInflight(1) + leastBusy.inflight.Add(1) return &leastBusy } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 6eda5f04fd6..1bc3a6c9f39 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -36,13 +36,13 @@ func TestLeastBusyClient(t *testing.T) { httpClients = append(httpClients, &mockHTTPClient{i, &httpClient}) } client, _ := NewClient(ClientOptions{}, httpClients) - client.httpClients[0].addInflight(1) - client.httpClients[1].addInflight(1) + client.httpClients[0].inflight.Add(1) + client.httpClients[1].inflight.Add(1) assertLeastBusy(t, 2, client) assertLeastBusy(t, 2, client) assertLeastBusy(t, 3, client) - client.httpClients[3].addInflight(1) - client.httpClients[1].addInflight(-1) + client.httpClients[3].inflight.Add(1) + client.httpClients[1].inflight.Add(-1) assertLeastBusy(t, 1, client) } |