summaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-22 15:48:01 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-22 15:48:01 +0200
commit69944390cf45f617fb2250a95e27d08e355da78d (patch)
tree966c7fca8ddab6f0c64a28c8f55ccda0cfb5c116 /client
parent38601194dc7ece53e180005f10f26c3858956ce6 (diff)
Remove unnecessary ready channel
Diffstat (limited to 'client')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go19
1 files changed, 4 insertions, 15 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 2ad5b841616..466ed9dd9fa 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -20,7 +20,6 @@ type Dispatcher struct {
stats Stats
started bool
- ready chan documentOp
results chan documentOp
msgs chan string
@@ -110,23 +109,14 @@ func (d *Dispatcher) start() {
if d.started {
return
}
- d.ready = make(chan documentOp, 4096)
d.results = make(chan documentOp, 4096)
d.msgs = make(chan string, 4096)
d.started = true
- d.wg.Add(3)
- go d.dispatchReady()
+ d.wg.Add(2)
go d.processResults()
go d.printMessages()
}
-func (d *Dispatcher) dispatchReady() {
- defer d.wg.Done()
- for op := range d.ready {
- d.dispatch(op)
- }
-}
-
func (d *Dispatcher) dispatch(op documentOp) {
if !d.acceptDocument() {
d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", op.document.Id.String())
@@ -164,8 +154,8 @@ func (d *Dispatcher) dispatchNext(id Id) {
panic("no queue exists for " + id.String() + ": this should not happen")
}
if next, ok := q.Poll(); ok {
- // we have more operations with this ID: notify dispatcher about the next one
- d.ready <- next
+ // we have more operations with this ID: dispatch the next one
+ d.dispatch(next)
} else {
// no more operations with this ID: release slot
delete(d.inflight, k)
@@ -206,7 +196,7 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
if !ok && !isRetry {
// first operation with this ID: acquire slot
d.acquireSlot()
- d.ready <- op
+ d.dispatch(op)
d.throttler.Sent()
}
return nil
@@ -248,7 +238,6 @@ func (d *Dispatcher) Close() error {
d.inflightWg.Wait() // Wait for all inflight operations to complete
d.mu.Lock()
if d.started {
- close(d.ready)
close(d.results)
close(d.msgs)
d.started = false