aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-18 15:50:15 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-19 11:35:43 +0200
commit6701eba23ad0ce4e5427e8aaff403d8697e548e4 (patch)
tree8faff14c86bcb89e895322032c1b6cf74827df98
parent032eb2992d957c98f85a741e55de2facbdc87398 (diff)
Simplify
-rw-r--r--client/go/internal/vespa/document/dispatcher.go57
1 files changed, 21 insertions, 36 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 533ca7a0019..96090c18685 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -30,7 +30,7 @@ type Dispatcher struct {
verbose bool
mu sync.RWMutex
- wg sync.WaitGroup
+ workerWg sync.WaitGroup
resultWg sync.WaitGroup
}
@@ -138,42 +138,20 @@ func (d *Dispatcher) start() {
d.results = make(chan Result, 4096)
d.msgs = make(chan string, 4096)
d.started = true
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.readDocuments()
- }()
d.resultWg.Add(2)
- go func() {
- defer d.resultWg.Done()
- d.readResults()
- }()
- go func() {
- defer d.resultWg.Done()
- d.readMessages()
- }()
-}
-
-func (d *Dispatcher) readDocuments() {
- for id := range d.ready {
- d.mu.RLock()
- group := d.inflight[id.String()]
- d.mu.RUnlock()
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.sendDocumentIn(group)
- }()
- }
+ go d.sumStats()
+ go d.printMessages()
}
-func (d *Dispatcher) readResults() {
+func (d *Dispatcher) sumStats() {
+ defer d.resultWg.Done()
for result := range d.results {
d.stats.Add(result.Stats)
}
}
-func (d *Dispatcher) readMessages() {
+func (d *Dispatcher) printMessages() {
+ defer d.resultWg.Done()
for msg := range d.msgs {
fmt.Fprintln(d.output, msg)
}
@@ -200,6 +178,19 @@ func (d *Dispatcher) enqueueWithSlot(id Id) {
d.acquireSlot()
d.ready <- id
d.throttler.Sent()
+ d.dispatch()
+}
+
+func (d *Dispatcher) dispatch() {
+ d.workerWg.Add(1)
+ go func() {
+ defer d.workerWg.Done()
+ id := <-d.ready
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ d.sendDocumentIn(group)
+ }()
}
func (d *Dispatcher) acquireSlot() {
@@ -217,13 +208,7 @@ func (d *Dispatcher) Stats() Stats { return d.stats }
// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
- d.mu.Lock()
- if d.started {
- close(d.ready)
- }
- d.mu.Unlock()
- d.wg.Wait() // Wait for inflight operations to complete
-
+ d.workerWg.Wait() // Wait for all inflight operations to complete
d.mu.Lock()
if d.started {
close(d.results)