diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-22 20:29:06 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-05-23 09:03:09 +0200 |
commit | e0e23b71806574e50a729867c6fbf93782fce3a6 (patch) | |
tree | 0029d89393945259fdb9d68273a977e3b1f0d62d /client | |
parent | 257527e34e0cb37936350d6aa55b38d478b2c6bf (diff) |
Defer queue allocation until needed
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 466ed9dd9fa..7a19d21f278 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -28,7 +28,6 @@ type Dispatcher struct { output io.Writer verbose bool - queuePool sync.Pool mu sync.Mutex statsMu sync.Mutex wg sync.WaitGroup @@ -56,7 +55,6 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o output: output, verbose: verbose, } - d.queuePool.New = func() any { return NewQueue[documentOp]() } d.start() return d } @@ -153,13 +151,19 @@ func (d *Dispatcher) dispatchNext(id Id) { if !ok { panic("no queue exists for " + id.String() + ": this should not happen") } - if next, ok := q.Poll(); ok { - // we have more operations with this ID: dispatch the next one - d.dispatch(next) - } else { + hasNext := q != nil + if hasNext { + next, ok := q.Poll() + if ok { + // we have more operations with this ID: dispatch the next one + d.dispatch(next) + } else { + hasNext = false + } + } + if !hasNext { // no more operations with this ID: release slot delete(d.inflight, k) - d.queuePool.Put(q) d.releaseSlot() } } @@ -181,12 +185,15 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { d.mu.Unlock() return fmt.Errorf("refusing to enqueue document %s: too many errors", op.document.Id.String()) } - key := op.document.Id.String() - q, ok := d.inflight[key] + k := op.document.Id.String() + q, ok := d.inflight[k] if !ok { - q = d.queuePool.Get().(*Queue[documentOp]) - d.inflight[key] = q + d.inflight[k] = nil // track operation, but defer allocating queue until needed } else { + if q == nil { + q = NewQueue[documentOp]() + d.inflight[k] = q + } q.Add(op, isRetry) } if !isRetry { @@ -194,7 +201,7 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { } d.mu.Unlock() if !ok && !isRetry { - // first operation with this ID: acquire slot + // first operation with this ID: acquire slot and dispatch d.acquireSlot() d.dispatch(op) d.throttler.Sent() |