diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-22 15:48:01 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-05-22 15:48:01 +0200 |
commit | 69944390cf45f617fb2250a95e27d08e355da78d (patch) | |
tree | 966c7fca8ddab6f0c64a28c8f55ccda0cfb5c116 /client | |
parent | 38601194dc7ece53e180005f10f26c3858956ce6 (diff) |
Remove unnecessary ready channel
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 19 |
1 files changed, 4 insertions, 15 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 2ad5b841616..466ed9dd9fa 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -20,7 +20,6 @@ type Dispatcher struct { stats Stats started bool - ready chan documentOp results chan documentOp msgs chan string @@ -110,23 +109,14 @@ func (d *Dispatcher) start() { if d.started { return } - d.ready = make(chan documentOp, 4096) d.results = make(chan documentOp, 4096) d.msgs = make(chan string, 4096) d.started = true - d.wg.Add(3) - go d.dispatchReady() + d.wg.Add(2) go d.processResults() go d.printMessages() } -func (d *Dispatcher) dispatchReady() { - defer d.wg.Done() - for op := range d.ready { - d.dispatch(op) - } -} - func (d *Dispatcher) dispatch(op documentOp) { if !d.acceptDocument() { d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", op.document.Id.String()) @@ -164,8 +154,8 @@ func (d *Dispatcher) dispatchNext(id Id) { panic("no queue exists for " + id.String() + ": this should not happen") } if next, ok := q.Poll(); ok { - // we have more operations with this ID: notify dispatcher about the next one - d.ready <- next + // we have more operations with this ID: dispatch the next one + d.dispatch(next) } else { // no more operations with this ID: release slot delete(d.inflight, k) @@ -206,7 +196,7 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { if !ok && !isRetry { // first operation with this ID: acquire slot d.acquireSlot() - d.ready <- op + d.dispatch(op) d.throttler.Sent() } return nil @@ -248,7 +238,6 @@ func (d *Dispatcher) Close() error { d.inflightWg.Wait() // Wait for all inflight operations to complete d.mu.Lock() if d.started { - close(d.ready) close(d.results) close(d.msgs) d.started = false |