diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-14 13:18:17 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-17 10:31:40 +0200 |
commit | 58fb99d57e3b81f9c1c4567355355ef4a97e989f (patch) | |
tree | e62e0b664655a887ad7e5e037ea59865cef8230f /client/go/internal/vespa/document/dispatcher.go | |
parent | 96d8aae1ec9b4f6130b6b610ce23d2bbdb79298a (diff) |
Always print errors
Diffstat (limited to 'client/go/internal/vespa/document/dispatcher.go')
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 79 |
1 files changed, 51 insertions, 28 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 838a7bc45ee..798a888d677 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -4,6 +4,7 @@ import ( "container/list" "fmt" "io" + "strings" "sync" "sync/atomic" "time" @@ -18,12 +19,15 @@ type Dispatcher struct { circuitBreaker CircuitBreaker stats Stats - started bool - ready chan Id - results chan Result + started bool + ready chan Id + results chan Result + msgs chan string + inflight map[string]*documentGroup inflightCount int64 - errWriter io.Writer + output io.Writer + verbose bool mu sync.RWMutex wg sync.WaitGroup @@ -55,13 +59,14 @@ func (g *documentGroup) add(op documentOp, first bool) { } } -func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher { +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher { d := &Dispatcher{ feeder: feeder, throttler: throttler, circuitBreaker: breaker, inflight: make(map[string]*documentGroup), - errWriter: errWriter, + output: output, + verbose: verbose, } d.start() return d @@ -86,29 +91,35 @@ func (d *Dispatcher) sendDocumentIn(group *documentGroup) { func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { + if d.verbose { + d.msgs <- fmt.Sprintf("feed: successfully fed %s with status %d", op.document.Id, result.HTTPStatus) + } d.throttler.Success() d.circuitBreaker.Success() return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { - fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) + d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) return true } if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { retry := op.attempts <= maxAttempts - msg := "feed: " + op.document.String() + " failed with " + var msg strings.Builder + msg.WriteString("feed: ") + msg.WriteString(op.document.String()) if result.Err != nil { - msg += "error " + result.Err.Error() + msg.WriteString("error ") + msg.WriteString(result.Err.Error()) } else { - msg += fmt.Sprintf("status %d", result.HTTPStatus) + msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus)) } if retry { - msg += ": retrying" + msg.WriteString(": retrying") } else { - msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts) + msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts)) } - fmt.Fprintln(d.errWriter, msg) + d.msgs <- msg.String() d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) if retry { return true @@ -125,17 +136,22 @@ func (d *Dispatcher) start() { } d.ready = make(chan Id, 4096) d.results = make(chan Result, 4096) + d.msgs = make(chan string, 4096) d.started = true d.wg.Add(1) go func() { defer d.wg.Done() d.readDocuments() }() - d.resultWg.Add(1) + d.resultWg.Add(2) go func() { defer d.resultWg.Done() d.readResults() }() + go func() { + defer d.resultWg.Done() + d.readMessages() + }() } func (d *Dispatcher) readDocuments() { @@ -157,6 +173,12 @@ func (d *Dispatcher) readResults() { } } +func (d *Dispatcher) readMessages() { + for msg := range d.msgs { + fmt.Fprintln(d.output, msg) + } +} + func (d *Dispatcher) enqueue(op documentOp) error { d.mu.Lock() if !d.started { @@ -188,25 +210,26 @@ func (d *Dispatcher) acquireSlot() { func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } -func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { - d.mu.Lock() - if d.started { - close(ch) - if markClosed { - d.started = false - } - } - d.mu.Unlock() - wg.Wait() -} - func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) } func (d *Dispatcher) Stats() Stats { return d.stats } // Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { - closeAndWait(d.ready, &d.wg, d, false) - closeAndWait(d.results, &d.resultWg, d, true) + d.mu.Lock() + if d.started { + close(d.ready) + } + d.mu.Unlock() + d.wg.Wait() // Wait for inflight operations to complete + + d.mu.Lock() + if d.started { + close(d.results) + close(d.msgs) + d.started = false + } + d.mu.Unlock() + d.resultWg.Wait() // Wait for results return nil } |