diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-18 15:50:15 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-19 11:35:43 +0200 |
commit | 6701eba23ad0ce4e5427e8aaff403d8697e548e4 (patch) | |
tree | 8faff14c86bcb89e895322032c1b6cf74827df98 | |
parent | 032eb2992d957c98f85a741e55de2facbdc87398 (diff) |
Simplify
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 57 |
1 files changed, 21 insertions, 36 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 533ca7a0019..96090c18685 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -30,7 +30,7 @@ type Dispatcher struct { verbose bool mu sync.RWMutex - wg sync.WaitGroup + workerWg sync.WaitGroup resultWg sync.WaitGroup } @@ -138,42 +138,20 @@ func (d *Dispatcher) start() { d.results = make(chan Result, 4096) d.msgs = make(chan string, 4096) d.started = true - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.readDocuments() - }() d.resultWg.Add(2) - go func() { - defer d.resultWg.Done() - d.readResults() - }() - go func() { - defer d.resultWg.Done() - d.readMessages() - }() -} - -func (d *Dispatcher) readDocuments() { - for id := range d.ready { - d.mu.RLock() - group := d.inflight[id.String()] - d.mu.RUnlock() - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.sendDocumentIn(group) - }() - } + go d.sumStats() + go d.printMessages() } -func (d *Dispatcher) readResults() { +func (d *Dispatcher) sumStats() { + defer d.resultWg.Done() for result := range d.results { d.stats.Add(result.Stats) } } -func (d *Dispatcher) readMessages() { +func (d *Dispatcher) printMessages() { + defer d.resultWg.Done() for msg := range d.msgs { fmt.Fprintln(d.output, msg) } @@ -200,6 +178,19 @@ func (d *Dispatcher) enqueueWithSlot(id Id) { d.acquireSlot() d.ready <- id d.throttler.Sent() + d.dispatch() +} + +func (d *Dispatcher) dispatch() { + d.workerWg.Add(1) + go func() { + defer d.workerWg.Done() + id := <-d.ready + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + d.sendDocumentIn(group) + }() } func (d *Dispatcher) acquireSlot() { @@ -217,13 +208,7 @@ func (d *Dispatcher) Stats() Stats { return d.stats } // Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { - d.mu.Lock() - if d.started { - close(d.ready) - } - d.mu.Unlock() - d.wg.Wait() // Wait for inflight operations to complete - + d.workerWg.Wait() // Wait for all inflight operations to complete d.mu.Lock() if d.started { close(d.results) |