diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-20 15:00:28 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-23 12:13:46 +0100 |
commit | 4d201bee4a0d7b0d9425079eb3fe6c51e4cc1f44 (patch) | |
tree | 90f1730d198d6def7aa0dfd424f4ed50f9cbc5a3 /client/go/internal/vespa/document/dispatcher.go | |
parent | 9476017463771b04f93b817974f37149d22cf38a (diff) |
Basic dispatcher
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go new file mode 100644 index 00000000000..fa15a8a1223 --- /dev/null +++ b/client/go/internal/vespa/document/dispatcher.go @@ -0,0 +1,63 @@ +package document + +import ( + "fmt" + "sync" +) + +// Dispatcher dispatches documents from a queue to a Feeder. +type Dispatcher struct { + concurrencyLevel int + feeder Feeder + pending chan Document + closed bool + mu sync.RWMutex + wg sync.WaitGroup +} + +func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher { + if concurrencyLevel < 1 { + concurrencyLevel = 1 + } + d := &Dispatcher{ + concurrencyLevel: concurrencyLevel, + feeder: feeder, + pending: make(chan Document, 4*concurrencyLevel), + } + d.readPending() + return d +} + +func (d *Dispatcher) readPending() { + for i := 0; i < d.concurrencyLevel; i++ { + d.wg.Add(1) + go func(n int) { + defer d.wg.Done() + for doc := range d.pending { + d.feeder.Send(doc) + } + }(i) + } +} + +func (d *Dispatcher) Enqueue(doc Document) error { + d.mu.RLock() + defer d.mu.RUnlock() + if d.closed { + return fmt.Errorf("cannot enqueue document in closed dispatcher") + } + d.pending <- doc + return nil +} + +// Close closes the dispatcher and waits for all goroutines to return. +func (d *Dispatcher) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + if !d.closed { + d.closed = true + close(d.pending) + d.wg.Wait() + } + return nil +} |