diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-26 16:12:03 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-26 16:12:23 +0200 |
commit | ab9b6acf5d3839eddd3cc0981b2c618610163c52 (patch) | |
tree | c96d356e140a804320498d3aeb85859ad512bae8 /client/go/internal/vespa/document/dispatcher.go | |
parent | f47188b7b2de589ffee511e90759f36bfb0335c1 (diff) |
Add inflight count to stats
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 51963ff86c7..3876e55210a 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -32,6 +32,7 @@ type Dispatcher struct { listPool sync.Pool mu sync.Mutex + statsMu sync.Mutex wg sync.WaitGroup inflightWg sync.WaitGroup } @@ -145,7 +146,9 @@ func (d *Dispatcher) dispatch(op documentOp) { func (d *Dispatcher) processResults() { defer d.wg.Done() for op := range d.results { + d.statsMu.Lock() d.stats.Add(op.result.Stats) + d.statsMu.Unlock() if d.shouldRetry(op, op.result) { d.enqueue(op.resetResult(), true) } else if op.complete() { @@ -235,7 +238,12 @@ func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}, false) } -func (d *Dispatcher) Stats() Stats { return d.stats } +func (d *Dispatcher) Stats() Stats { + d.statsMu.Lock() + defer d.statsMu.Unlock() + d.stats.Inflight = atomic.LoadInt64(&d.inflightCount) + return d.stats +} // Close waits for all inflight operations to complete and closes the dispatcher. func (d *Dispatcher) Close() error { |