diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-20 15:00:28 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-23 12:13:46 +0100 |
commit | 4d201bee4a0d7b0d9425079eb3fe6c51e4cc1f44 (patch) | |
tree | 90f1730d198d6def7aa0dfd424f4ed50f9cbc5a3 /client/go | |
parent | 9476017463771b04f93b817974f37149d22cf38a (diff) |
Basic dispatcher
Diffstat (limited to 'client/go')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 63 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 37 |
2 files changed, 100 insertions, 0 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go new file mode 100644 index 00000000000..fa15a8a1223 --- /dev/null +++ b/client/go/internal/vespa/document/dispatcher.go @@ -0,0 +1,63 @@ +package document + +import ( + "fmt" + "sync" +) + +// Dispatcher dispatches documents from a queue to a Feeder. +type Dispatcher struct { + concurrencyLevel int + feeder Feeder + pending chan Document + closed bool + mu sync.RWMutex + wg sync.WaitGroup +} + +func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher { + if concurrencyLevel < 1 { + concurrencyLevel = 1 + } + d := &Dispatcher{ + concurrencyLevel: concurrencyLevel, + feeder: feeder, + pending: make(chan Document, 4*concurrencyLevel), + } + d.readPending() + return d +} + +func (d *Dispatcher) readPending() { + for i := 0; i < d.concurrencyLevel; i++ { + d.wg.Add(1) + go func(n int) { + defer d.wg.Done() + for doc := range d.pending { + d.feeder.Send(doc) + } + }(i) + } +} + +func (d *Dispatcher) Enqueue(doc Document) error { + d.mu.RLock() + defer d.mu.RUnlock() + if d.closed { + return fmt.Errorf("cannot enqueue document in closed dispatcher") + } + d.pending <- doc + return nil +} + +// Close closes the dispatcher and waits for all goroutines to return. +func (d *Dispatcher) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + if !d.closed { + d.closed = true + close(d.pending) + d.wg.Wait() + } + return nil +} diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go new file mode 100644 index 00000000000..0e4876a7d4b --- /dev/null +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -0,0 +1,37 @@ +package document + +import ( + "encoding/json" + "sync" + "testing" +) + +type mockFeeder struct { + documents []Document + mu sync.Mutex +} + +func (f *mockFeeder) Send(doc Document) Result { + f.mu.Lock() + defer f.mu.Unlock() + f.documents = append(f.documents, doc) + return Result{Id: doc.Id} +} + +func (f *mockFeeder) Stats() Stats { return Stats{} } + +func TestDispatcher(t *testing.T) { + feeder := &mockFeeder{} + dispatcher := NewDispatcher(feeder, 2) + docs := []Document{ + {PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}, + {PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}, + } + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + if got, want := len(feeder.documents), 2; got != want { + t.Errorf("got %d documents, want %d", got, want) + } +} |