aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-18 18:36:52 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-19 11:35:43 +0200
commit57a6011fb28c8ccd91a4d60f52630b50160d7901 (patch)
treefdc4223ea88c2f20d6d24e838a570091db1b7da0
parent0483d95ca48b6f5ba1c9ad3b5c73b50fc2390145 (diff)
Extract Queue type
-rw-r--r--client/go/internal/vespa/document/dispatcher.go28
-rw-r--r--client/go/internal/vespa/document/queue.go43
-rw-r--r--client/go/internal/vespa/document/queue_test.go29
3 files changed, 80 insertions, 20 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 1c950210a72..5c99f3bf056 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -43,21 +43,14 @@ type documentOp struct {
// documentGroup holds document operations which share an ID, and must be dispatched in order.
type documentGroup struct {
- ops *list.List
- mu sync.Mutex
+ q *Queue[documentOp]
+ mu sync.Mutex
}
-func (g *documentGroup) add(op documentOp, first bool, listPool *sync.Pool) {
+func (g *documentGroup) add(op documentOp, first bool) {
g.mu.Lock()
defer g.mu.Unlock()
- if g.ops == nil {
- g.ops = listPool.Get().(*list.List)
- }
- if first {
- g.ops.PushFront(op)
- } else {
- g.ops.PushBack(op)
- }
+ g.q.Add(op, first)
}
func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher {
@@ -75,19 +68,14 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
group.mu.Lock()
- first := group.ops.Front()
- if first == nil {
+ op, ok := group.q.Poll()
+ if !ok {
panic("sending from empty document group, this should not happen")
}
- op := group.ops.Remove(first).(documentOp)
op.attempts++
result := d.feeder.Send(op.document)
d.results <- result
d.releaseSlot()
- if group.ops.Front() == nil { // Empty list, release it back to the pool
- d.listPool.Put(group.ops)
- group.ops = nil
- }
group.mu.Unlock()
if d.shouldRetry(op, result) {
d.enqueue(op)
@@ -171,11 +159,11 @@ func (d *Dispatcher) enqueue(op documentOp) error {
key := op.document.Id.String()
group, ok := d.inflight[key]
if !ok {
- group = &documentGroup{}
+ group = &documentGroup{q: NewQueue[documentOp](&d.listPool)}
d.inflight[key] = group
}
d.mu.Unlock()
- group.add(op, op.attempts > 0, &d.listPool)
+ group.add(op, op.attempts > 0)
d.enqueueWithSlot(op.document.Id)
return nil
}
diff --git a/client/go/internal/vespa/document/queue.go b/client/go/internal/vespa/document/queue.go
new file mode 100644
index 00000000000..2e5a1976d58
--- /dev/null
+++ b/client/go/internal/vespa/document/queue.go
@@ -0,0 +1,43 @@
+package document
+
+import (
+ "container/list"
+ "sync"
+)
+
+// Queue wraps a doubly linked list. It attempts to re-use lists through a sync.Pool to reduce GC pressure.
+type Queue[T any] struct {
+ items *list.List
+ listPool *sync.Pool
+}
+
+func NewQueue[T any](listPool *sync.Pool) *Queue[T] {
+ if listPool.New == nil {
+ listPool.New = func() any { return list.New() }
+ }
+ return &Queue[T]{listPool: listPool}
+}
+
+func (q *Queue[T]) Add(item T, front bool) {
+ if q.items == nil {
+ q.items = q.listPool.Get().(*list.List)
+ }
+ if front {
+ q.items.PushFront(item)
+ } else {
+ q.items.PushBack(item)
+ }
+}
+
+func (q *Queue[T]) Poll() (T, bool) {
+ if q.items == nil || q.items.Front() == nil {
+ var empty T
+ return empty, false
+ }
+ item := q.items.Remove(q.items.Front()).(T)
+ if q.items.Front() == nil { // Emptied queue, release list back to pool
+ q.listPool.Put(q.items)
+ q.items = nil
+ }
+ return item, true
+}
diff --git a/client/go/internal/vespa/document/queue_test.go b/client/go/internal/vespa/document/queue_test.go
new file mode 100644
index 00000000000..992e7410053
--- /dev/null
+++ b/client/go/internal/vespa/document/queue_test.go
@@ -0,0 +1,29 @@
+package document
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestQueue(t *testing.T) {
+ q := NewQueue[int](&sync.Pool{})
+ assertPoll(t, q, 0, false)
+ q.Add(1, false)
+ q.Add(2, false)
+ assertPoll(t, q, 1, true)
+ assertPoll(t, q, 2, true)
+ q.Add(3, false)
+ q.Add(4, true)
+ assertPoll(t, q, 4, true)
+ assertPoll(t, q, 3, true)
+}
+
+func assertPoll(t *testing.T, q *Queue[int], want int, wantOk bool) {
+ got, ok := q.Poll()
+ if ok != wantOk {
+ t.Fatalf("got ok=%t, want %t", ok, wantOk)
+ }
+ if got != want {
+ t.Fatalf("got v=%d, want %d", got, want)
+ }
+}