aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal/vespa/document/dispatcher.go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-26 16:12:03 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-26 16:12:23 +0200
commitab9b6acf5d3839eddd3cc0981b2c618610163c52 (patch)
treec96d356e140a804320498d3aeb85859ad512bae8 /client/go/internal/vespa/document/dispatcher.go
parentf47188b7b2de589ffee511e90759f36bfb0335c1 (diff)
Add inflight count to stats
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go10
1 files changed, 9 insertions, 1 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 51963ff86c7..3876e55210a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -32,6 +32,7 @@ type Dispatcher struct {
listPool sync.Pool
mu sync.Mutex
+ statsMu sync.Mutex
wg sync.WaitGroup
inflightWg sync.WaitGroup
}
@@ -145,7 +146,9 @@ func (d *Dispatcher) dispatch(op documentOp) {
func (d *Dispatcher) processResults() {
defer d.wg.Done()
for op := range d.results {
+ d.statsMu.Lock()
d.stats.Add(op.result.Stats)
+ d.statsMu.Unlock()
if d.shouldRetry(op, op.result) {
d.enqueue(op.resetResult(), true)
} else if op.complete() {
@@ -235,7 +238,12 @@ func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) }
-func (d *Dispatcher) Stats() Stats { return d.stats }
+func (d *Dispatcher) Stats() Stats {
+ d.statsMu.Lock()
+ defer d.statsMu.Unlock()
+ d.stats.Inflight = atomic.LoadInt64(&d.inflightCount)
+ return d.stats
+}
// Close waits for all inflight operations to complete and closes the dispatcher.
func (d *Dispatcher) Close() error {