summaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-22 11:10:49 +0100
committerMartin Polden <mpolden@mpolden.no>2023-03-23 17:57:33 +0100
commit5016289ac451c46299cb20542f96e2f97a7a2f1c (patch)
tree6d0b2c1f334c7e818a077b31933f2bf3df11d5e6 /client
parent8d8bdb5bd10182e2ccf0e9095c4bd6adc26b0635 (diff)
Dispatch documents with common ID in order
Diffstat (limited to 'client')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go120
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go98
-rw-r--r--client/go/internal/vespa/document/feeder.go3
-rw-r--r--client/go/internal/vespa/document/http.go6
4 files changed, 193 insertions, 34 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index fa15a8a1223..feb562a241a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -5,59 +5,125 @@ import (
"sync"
)
+const maxAttempts = 10
+
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
- concurrencyLevel int
- feeder Feeder
- pending chan Document
- closed bool
- mu sync.RWMutex
- wg sync.WaitGroup
+ workers int
+ feeder Feeder
+ ready chan Id
+ inflight map[string]*documentGroup
+ mu sync.RWMutex
+ wg sync.WaitGroup
+ closed bool
+}
+
+// documentGroup holds document operations which share their ID, and must be dispatched in order.
+type documentGroup struct {
+ id Id
+ failed bool
+ operations []documentOp
+ mu sync.Mutex
+}
+
+type documentOp struct {
+ document Document
+ attempts int
+}
+
+func (g *documentGroup) append(op documentOp) {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher {
- if concurrencyLevel < 1 {
- concurrencyLevel = 1
+func NewDispatcher(feeder Feeder, workers int) *Dispatcher {
+ if workers < 1 {
+ workers = 1
}
d := &Dispatcher{
- concurrencyLevel: concurrencyLevel,
- feeder: feeder,
- pending: make(chan Document, 4*concurrencyLevel),
+ workers: workers,
+ feeder: feeder,
+ inflight: make(map[string]*documentGroup),
}
- d.readPending()
+ d.start()
return d
}
-func (d *Dispatcher) readPending() {
- for i := 0; i < d.concurrencyLevel; i++ {
+func (d *Dispatcher) dispatchAll(g *documentGroup) int {
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ failCount := len(g.operations)
+ for i := 0; !g.failed && i < len(g.operations); i++ {
+ op := g.operations[i]
+ ok := false
+ for op.attempts <= maxAttempts && !ok {
+ op.attempts += 1
+ // TODO(mpolden): Extract function which does throttling/circuit-breaking
+ result := d.feeder.Send(op.document)
+ ok = result.Status.Success()
+ }
+ if ok {
+ failCount--
+ } else {
+ g.failed = true
+ failCount = len(g.operations) - i
+ }
+ }
+ g.operations = nil
+ return failCount
+}
+
+func (d *Dispatcher) start() {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ d.closed = false
+ d.ready = make(chan Id, 4*d.workers)
+ for i := 0; i < d.workers; i++ {
d.wg.Add(1)
- go func(n int) {
+ go func() {
defer d.wg.Done()
- for doc := range d.pending {
- d.feeder.Send(doc)
+ for id := range d.ready {
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ if group != nil {
+ failedDocs := d.dispatchAll(group)
+ d.feeder.AddStats(Stats{Errors: int64(failedDocs)})
+ }
}
- }(i)
+ }()
}
}
func (d *Dispatcher) Enqueue(doc Document) error {
- d.mu.RLock()
- defer d.mu.RUnlock()
+ d.mu.Lock()
+ defer d.mu.Unlock()
if d.closed {
- return fmt.Errorf("cannot enqueue document in closed dispatcher")
+ return fmt.Errorf("dispatcher is closed")
}
- d.pending <- doc
+ group, ok := d.inflight[doc.Id.String()]
+ if ok {
+ group.append(documentOp{document: doc})
+ } else {
+ group = &documentGroup{
+ id: doc.Id,
+ operations: []documentOp{{document: doc}},
+ }
+ d.inflight[doc.Id.String()] = group
+ }
+ d.ready <- doc.Id
return nil
}
-// Close closes the dispatcher and waits for all goroutines to return.
+// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
d.mu.Lock()
- defer d.mu.Unlock()
if !d.closed {
+ close(d.ready)
d.closed = true
- close(d.pending)
- d.wg.Wait()
}
+ d.mu.Unlock()
+ d.wg.Wait()
return nil
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 0e4876a7d4b..384097670e1 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -4,21 +4,40 @@ import (
"encoding/json"
"sync"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
type mockFeeder struct {
- documents []Document
- mu sync.Mutex
+ failAfterNDocs int
+ documents []Document
+ stats Stats
+ mu sync.Mutex
+}
+
+func (f *mockFeeder) failAfterN(docs int) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.failAfterNDocs = docs
}
func (f *mockFeeder) Send(doc Document) Result {
f.mu.Lock()
defer f.mu.Unlock()
+ if f.failAfterNDocs > 0 && len(f.documents) >= f.failAfterNDocs {
+ return Result{Id: doc.Id, Status: StatusVespaFailure}
+ }
f.documents = append(f.documents, doc)
return Result{Id: doc.Id}
}
-func (f *mockFeeder) Stats() Stats { return Stats{} }
+func (f *mockFeeder) Stats() Stats { return f.stats }
+
+func (f *mockFeeder) AddStats(stats Stats) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.stats.Add(stats)
+}
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
@@ -35,3 +54,76 @@ func TestDispatcher(t *testing.T) {
t.Errorf("got %d documents, want %d", got, want)
}
}
+
+func TestDispatcherOrdering(t *testing.T) {
+ feeder := &mockFeeder{}
+ commonId := "id:ns:type::doc1"
+ docs := []Document{
+ mustParseDocument(Document{PutId: commonId}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc2"}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc3"}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc4"}),
+ mustParseDocument(Document{UpdateId: commonId}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc5"}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc6"}),
+ mustParseDocument(Document{RemoveId: commonId}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc7"}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc8"}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc9"}),
+ }
+ dispatcher := NewDispatcher(feeder, len(docs))
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+
+ var wantDocs []Document
+ for _, d := range docs {
+ if d.Id.String() == commonId {
+ wantDocs = append(wantDocs, d)
+ }
+ }
+ var gotDocs []Document
+ for _, d := range feeder.documents {
+ if d.Id.String() == commonId {
+ gotDocs = append(gotDocs, d)
+ }
+ }
+ assert.Equal(t, len(docs), len(feeder.documents))
+ assert.Equal(t, wantDocs, gotDocs)
+ assert.Equal(t, int64(0), feeder.Stats().Errors)
+}
+
+func TestDispatcherOrderingWithFailures(t *testing.T) {
+ feeder := &mockFeeder{}
+ commonId := "id:ns:type::doc1"
+ docs := []Document{
+ mustParseDocument(Document{PutId: commonId}),
+ mustParseDocument(Document{PutId: commonId}),
+ mustParseDocument(Document{UpdateId: commonId}), // fails
+ mustParseDocument(Document{RemoveId: commonId}), // fails
+ }
+ feeder.failAfterN(2)
+ dispatcher := NewDispatcher(feeder, len(docs))
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+ wantDocs := docs[:2]
+ assert.Equal(t, wantDocs, feeder.documents)
+ assert.Equal(t, int64(2), feeder.Stats().Errors)
+
+ // Dispatching more documents for same ID fails implicitly
+ feeder.failAfterN(0)
+ dispatcher.start()
+ dispatcher.Enqueue(mustParseDocument(Document{PutId: commonId}))
+ dispatcher.Enqueue(mustParseDocument(Document{RemoveId: commonId}))
+ // Other IDs are fine
+ doc2 := mustParseDocument(Document{PutId: "id:ns:type::doc2"})
+ doc3 := mustParseDocument(Document{PutId: "id:ns:type::doc3"})
+ dispatcher.Enqueue(doc2)
+ dispatcher.Enqueue(doc3)
+ dispatcher.Close()
+ assert.Equal(t, int64(4), feeder.Stats().Errors)
+ assert.Equal(t, 4, len(feeder.documents))
+}
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 732db051dab..e7e891a3975 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -65,7 +65,7 @@ func (s Stats) Successes() int64 {
return s.ResponsesByCode[200]
}
-// Add adds all statistics contained in other to this.
+// Add all statistics contained in other to this.
func (s *Stats) Add(other Stats) {
s.Requests += other.Requests
s.Responses += other.Responses
@@ -94,4 +94,5 @@ func (s *Stats) Add(other Stats) {
type Feeder interface {
Send(Document) Result
Stats() Stats
+ AddStats(Stats)
}
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index e86ceb1ebc5..981bad5140d 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -123,7 +123,7 @@ func (c *Client) Send(document Document) Result {
stats.TotalLatency = latency
stats.MinLatency = latency
stats.MaxLatency = latency
- c.addStats(&stats)
+ c.AddStats(stats)
}()
method, url, err := c.feedURL(document, c.queryParams())
if err != nil {
@@ -152,10 +152,10 @@ func (c *Client) Send(document Document) Result {
func (c *Client) Stats() Stats { return c.stats }
-func (c *Client) addStats(stats *Stats) {
+func (c *Client) AddStats(stats Stats) {
c.mu.Lock()
defer c.mu.Unlock()
- c.stats.Add(*stats)
+ c.stats.Add(stats)
}
func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result {