aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-20 15:00:28 +0100
committerMartin Polden <mpolden@mpolden.no>2023-03-23 12:13:46 +0100
commit4d201bee4a0d7b0d9425079eb3fe6c51e4cc1f44 (patch)
tree90f1730d198d6def7aa0dfd424f4ed50f9cbc5a3 /client/go
parent9476017463771b04f93b817974f37149d22cf38a (diff)
Basic dispatcher
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go63
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go37
2 files changed, 100 insertions, 0 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
new file mode 100644
index 00000000000..fa15a8a1223
--- /dev/null
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -0,0 +1,63 @@
+package document
+
+import (
+ "fmt"
+ "sync"
+)
+
+// Dispatcher dispatches documents from a queue to a Feeder.
+type Dispatcher struct {
+ concurrencyLevel int
+ feeder Feeder
+ pending chan Document
+ closed bool
+ mu sync.RWMutex
+ wg sync.WaitGroup
+}
+
+func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher {
+ if concurrencyLevel < 1 {
+ concurrencyLevel = 1
+ }
+ d := &Dispatcher{
+ concurrencyLevel: concurrencyLevel,
+ feeder: feeder,
+ pending: make(chan Document, 4*concurrencyLevel),
+ }
+ d.readPending()
+ return d
+}
+
+func (d *Dispatcher) readPending() {
+ for i := 0; i < d.concurrencyLevel; i++ {
+ d.wg.Add(1)
+ go func(n int) {
+ defer d.wg.Done()
+ for doc := range d.pending {
+ d.feeder.Send(doc)
+ }
+ }(i)
+ }
+}
+
+func (d *Dispatcher) Enqueue(doc Document) error {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.closed {
+ return fmt.Errorf("cannot enqueue document in closed dispatcher")
+ }
+ d.pending <- doc
+ return nil
+}
+
+// Close closes the dispatcher and waits for all goroutines to return.
+func (d *Dispatcher) Close() error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ if !d.closed {
+ d.closed = true
+ close(d.pending)
+ d.wg.Wait()
+ }
+ return nil
+}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
new file mode 100644
index 00000000000..0e4876a7d4b
--- /dev/null
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -0,0 +1,37 @@
+package document
+
+import (
+ "encoding/json"
+ "sync"
+ "testing"
+)
+
+type mockFeeder struct {
+ documents []Document
+ mu sync.Mutex
+}
+
+func (f *mockFeeder) Send(doc Document) Result {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.documents = append(f.documents, doc)
+ return Result{Id: doc.Id}
+}
+
+func (f *mockFeeder) Stats() Stats { return Stats{} }
+
+func TestDispatcher(t *testing.T) {
+ feeder := &mockFeeder{}
+ dispatcher := NewDispatcher(feeder, 2)
+ docs := []Document{
+ {PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)},
+ {PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)},
+ }
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+ if got, want := len(feeder.documents), 2; got != want {
+ t.Errorf("got %d documents, want %d", got, want)
+ }
+}