aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal/vespa/document/dispatcher.go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-14 13:18:17 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-17 10:31:40 +0200
commit58fb99d57e3b81f9c1c4567355355ef4a97e989f (patch)
treee62e0b664655a887ad7e5e037ea59865cef8230f /client/go/internal/vespa/document/dispatcher.go
parent96d8aae1ec9b4f6130b6b610ce23d2bbdb79298a (diff)
Always print errors
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r--client/go/internal/vespa/document/dispatcher.go79
1 files changed, 51 insertions, 28 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 838a7bc45ee..798a888d677 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -4,6 +4,7 @@ import (
"container/list"
"fmt"
"io"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -18,12 +19,15 @@ type Dispatcher struct {
circuitBreaker CircuitBreaker
stats Stats
- started bool
- ready chan Id
- results chan Result
+ started bool
+ ready chan Id
+ results chan Result
+ msgs chan string
+
inflight map[string]*documentGroup
inflightCount int64
- errWriter io.Writer
+ output io.Writer
+ verbose bool
mu sync.RWMutex
wg sync.WaitGroup
@@ -55,13 +59,14 @@ func (g *documentGroup) add(op documentOp, first bool) {
}
}
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher {
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher {
d := &Dispatcher{
feeder: feeder,
throttler: throttler,
circuitBreaker: breaker,
inflight: make(map[string]*documentGroup),
- errWriter: errWriter,
+ output: output,
+ verbose: verbose,
}
d.start()
return d
@@ -86,29 +91,35 @@ func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
+ if d.verbose {
+ d.msgs <- fmt.Sprintf("feed: successfully fed %s with status %d", op.document.Id, result.HTTPStatus)
+ }
d.throttler.Success()
d.circuitBreaker.Success()
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
+ d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
return true
}
if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
retry := op.attempts <= maxAttempts
- msg := "feed: " + op.document.String() + " failed with "
+ var msg strings.Builder
+ msg.WriteString("feed: ")
+ msg.WriteString(op.document.String())
if result.Err != nil {
- msg += "error " + result.Err.Error()
+ msg.WriteString("error ")
+ msg.WriteString(result.Err.Error())
} else {
- msg += fmt.Sprintf("status %d", result.HTTPStatus)
+ msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus))
}
if retry {
- msg += ": retrying"
+ msg.WriteString(": retrying")
} else {
- msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts)
+ msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts))
}
- fmt.Fprintln(d.errWriter, msg)
+ d.msgs <- msg.String()
d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
if retry {
return true
@@ -125,17 +136,22 @@ func (d *Dispatcher) start() {
}
d.ready = make(chan Id, 4096)
d.results = make(chan Result, 4096)
+ d.msgs = make(chan string, 4096)
d.started = true
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.readDocuments()
}()
- d.resultWg.Add(1)
+ d.resultWg.Add(2)
go func() {
defer d.resultWg.Done()
d.readResults()
}()
+ go func() {
+ defer d.resultWg.Done()
+ d.readMessages()
+ }()
}
func (d *Dispatcher) readDocuments() {
@@ -157,6 +173,12 @@ func (d *Dispatcher) readResults() {
}
}
+func (d *Dispatcher) readMessages() {
+ for msg := range d.msgs {
+ fmt.Fprintln(d.output, msg)
+ }
+}
+
func (d *Dispatcher) enqueue(op documentOp) error {
d.mu.Lock()
if !d.started {
@@ -188,25 +210,26 @@ func (d *Dispatcher) acquireSlot() {
func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
-func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
- d.mu.Lock()
- if d.started {
- close(ch)
- if markClosed {
- d.started = false
- }
- }
- d.mu.Unlock()
- wg.Wait()
-}
-
func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) }
func (d *Dispatcher) Stats() Stats { return d.stats }
// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
- closeAndWait(d.ready, &d.wg, d, false)
- closeAndWait(d.results, &d.resultWg, d, true)
+ d.mu.Lock()
+ if d.started {
+ close(d.ready)
+ }
+ d.mu.Unlock()
+ d.wg.Wait() // Wait for inflight operations to complete
+
+ d.mu.Lock()
+ if d.started {
+ close(d.results)
+ close(d.msgs)
+ d.started = false
+ }
+ d.mu.Unlock()
+ d.resultWg.Wait() // Wait for results
return nil
}