summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-22 20:29:06 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-23 09:03:09 +0200
commite0e23b71806574e50a729867c6fbf93782fce3a6 (patch)
tree0029d89393945259fdb9d68273a977e3b1f0d62d
parent257527e34e0cb37936350d6aa55b38d478b2c6bf (diff)
Defer queue allocation until needed
-rw-r--r--client/go/internal/vespa/document/dispatcher.go31
1 files changed, 19 insertions, 12 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 466ed9dd9fa..7a19d21f278 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -28,7 +28,6 @@ type Dispatcher struct {
output io.Writer
verbose bool
- queuePool sync.Pool
mu sync.Mutex
statsMu sync.Mutex
wg sync.WaitGroup
@@ -56,7 +55,6 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
output: output,
verbose: verbose,
}
- d.queuePool.New = func() any { return NewQueue[documentOp]() }
d.start()
return d
}
@@ -153,13 +151,19 @@ func (d *Dispatcher) dispatchNext(id Id) {
if !ok {
panic("no queue exists for " + id.String() + ": this should not happen")
}
- if next, ok := q.Poll(); ok {
- // we have more operations with this ID: dispatch the next one
- d.dispatch(next)
- } else {
+ hasNext := q != nil
+ if hasNext {
+ next, ok := q.Poll()
+ if ok {
+ // we have more operations with this ID: dispatch the next one
+ d.dispatch(next)
+ } else {
+ hasNext = false
+ }
+ }
+ if !hasNext {
// no more operations with this ID: release slot
delete(d.inflight, k)
- d.queuePool.Put(q)
d.releaseSlot()
}
}
@@ -181,12 +185,15 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
d.mu.Unlock()
return fmt.Errorf("refusing to enqueue document %s: too many errors", op.document.Id.String())
}
- key := op.document.Id.String()
- q, ok := d.inflight[key]
+ k := op.document.Id.String()
+ q, ok := d.inflight[k]
if !ok {
- q = d.queuePool.Get().(*Queue[documentOp])
- d.inflight[key] = q
+ d.inflight[k] = nil // track operation, but defer allocating queue until needed
} else {
+ if q == nil {
+ q = NewQueue[documentOp]()
+ d.inflight[k] = q
+ }
q.Add(op, isRetry)
}
if !isRetry {
@@ -194,7 +201,7 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
}
d.mu.Unlock()
if !ok && !isRetry {
- // first operation with this ID: acquire slot
+ // first operation with this ID: acquire slot and dispatch
d.acquireSlot()
d.dispatch(op)
d.throttler.Sent()