aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal/vespa/document/dispatcher.go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-20 15:00:28 +0100
committerMartin Polden <mpolden@mpolden.no>2023-03-23 12:13:46 +0100
commit4d201bee4a0d7b0d9425079eb3fe6c51e4cc1f44 (patch)
tree90f1730d198d6def7aa0dfd424f4ed50f9cbc5a3 /client/go/internal/vespa/document/dispatcher.go
parent9476017463771b04f93b817974f37149d22cf38a (diff)
Basic dispatcher
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go63
1 files changed, 63 insertions, 0 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
new file mode 100644
index 00000000000..fa15a8a1223
--- /dev/null
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -0,0 +1,63 @@
+package document
+
+import (
+ "fmt"
+ "sync"
+)
+
+// Dispatcher dispatches documents from a queue to a Feeder.
+type Dispatcher struct {
+ concurrencyLevel int
+ feeder Feeder
+ pending chan Document
+ closed bool
+ mu sync.RWMutex
+ wg sync.WaitGroup
+}
+
+func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher {
+ if concurrencyLevel < 1 {
+ concurrencyLevel = 1
+ }
+ d := &Dispatcher{
+ concurrencyLevel: concurrencyLevel,
+ feeder: feeder,
+ pending: make(chan Document, 4*concurrencyLevel),
+ }
+ d.readPending()
+ return d
+}
+
+func (d *Dispatcher) readPending() {
+ for i := 0; i < d.concurrencyLevel; i++ {
+ d.wg.Add(1)
+ go func(n int) {
+ defer d.wg.Done()
+ for doc := range d.pending {
+ d.feeder.Send(doc)
+ }
+ }(i)
+ }
+}
+
+func (d *Dispatcher) Enqueue(doc Document) error {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.closed {
+ return fmt.Errorf("cannot enqueue document in closed dispatcher")
+ }
+ d.pending <- doc
+ return nil
+}
+
+// Close closes the dispatcher and waits for all goroutines to return.
+func (d *Dispatcher) Close() error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ if !d.closed {
+ d.closed = true
+ close(d.pending)
+ d.wg.Wait()
+ }
+ return nil
+}