diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-18 18:36:52 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-19 11:35:43 +0200 |
commit | 57a6011fb28c8ccd91a4d60f52630b50160d7901 (patch) | |
tree | fdc4223ea88c2f20d6d24e838a570091db1b7da0 /client | |
parent | 0483d95ca48b6f5ba1c9ad3b5c73b50fc2390145 (diff) |
Extract Queue type
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 28 | ||||
-rw-r--r-- | client/go/internal/vespa/document/queue.go | 43 | ||||
-rw-r--r-- | client/go/internal/vespa/document/queue_test.go | 29 |
3 files changed, 80 insertions, 20 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 1c950210a72..5c99f3bf056 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -43,21 +43,14 @@ type documentOp struct { // documentGroup holds document operations which share an ID, and must be dispatched in order. type documentGroup struct { - ops *list.List - mu sync.Mutex + q *Queue[documentOp] + mu sync.Mutex } -func (g *documentGroup) add(op documentOp, first bool, listPool *sync.Pool) { +func (g *documentGroup) add(op documentOp, first bool) { g.mu.Lock() defer g.mu.Unlock() - if g.ops == nil { - g.ops = listPool.Get().(*list.List) - } - if first { - g.ops.PushFront(op) - } else { - g.ops.PushBack(op) - } + g.q.Add(op, first) } func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher { @@ -75,19 +68,14 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o func (d *Dispatcher) sendDocumentIn(group *documentGroup) { group.mu.Lock() - first := group.ops.Front() - if first == nil { + op, ok := group.q.Poll() + if !ok { panic("sending from empty document group, this should not happen") } - op := group.ops.Remove(first).(documentOp) op.attempts++ result := d.feeder.Send(op.document) d.results <- result d.releaseSlot() - if group.ops.Front() == nil { // Empty list, release it back to the pool - d.listPool.Put(group.ops) - group.ops = nil - } group.mu.Unlock() if d.shouldRetry(op, result) { d.enqueue(op) @@ -171,11 +159,11 @@ func (d *Dispatcher) enqueue(op documentOp) error { key := op.document.Id.String() group, ok := d.inflight[key] if !ok { - group = &documentGroup{} + group = &documentGroup{q: NewQueue[documentOp](&d.listPool)} d.inflight[key] = group } d.mu.Unlock() - group.add(op, op.attempts > 0, &d.listPool) + group.add(op, op.attempts > 0) d.enqueueWithSlot(op.document.Id) return nil } diff --git a/client/go/internal/vespa/document/queue.go b/client/go/internal/vespa/document/queue.go new file mode 100644 index 00000000000..2e5a1976d58 --- /dev/null +++ b/client/go/internal/vespa/document/queue.go @@ -0,0 +1,43 @@ +package document + +import ( + "container/list" + "sync" +) + +// Queue wraps a doubly linked list. It attempts to re-use lists through a sync.Pool to reduce GC pressure. +type Queue[T any] struct { + items *list.List + listPool *sync.Pool +} + +func NewQueue[T any](listPool *sync.Pool) *Queue[T] { + if listPool.New == nil { + listPool.New = func() any { return list.New() } + } + return &Queue[T]{listPool: listPool} +} + +func (q *Queue[T]) Add(item T, front bool) { + if q.items == nil { + q.items = q.listPool.Get().(*list.List) + } + if front { + q.items.PushFront(item) + } else { + q.items.PushBack(item) + } +} + +func (q *Queue[T]) Poll() (T, bool) { + if q.items == nil || q.items.Front() == nil { + var empty T + return empty, false + } + item := q.items.Remove(q.items.Front()).(T) + if q.items.Front() == nil { // Emptied queue, release list back to pool + q.listPool.Put(q.items) + q.items = nil + } + return item, true +} diff --git a/client/go/internal/vespa/document/queue_test.go b/client/go/internal/vespa/document/queue_test.go new file mode 100644 index 00000000000..992e7410053 --- /dev/null +++ b/client/go/internal/vespa/document/queue_test.go @@ -0,0 +1,29 @@ +package document + +import ( + "sync" + "testing" +) + +func TestQueue(t *testing.T) { + q := NewQueue[int](&sync.Pool{}) + assertPoll(t, q, 0, false) + q.Add(1, false) + q.Add(2, false) + assertPoll(t, q, 1, true) + assertPoll(t, q, 2, true) + q.Add(3, false) + q.Add(4, true) + assertPoll(t, q, 4, true) + assertPoll(t, q, 3, true) +} + +func assertPoll(t *testing.T, q *Queue[int], want int, wantOk bool) { + got, ok := q.Poll() + if ok != wantOk { + t.Fatalf("got ok=%t, want %t", ok, wantOk) + } + if got != want { + t.Fatalf("got v=%d, want %d", got, want) + } +} |