diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-14 13:18:17 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-17 10:31:40 +0200 |
commit | 58fb99d57e3b81f9c1c4567355355ef4a97e989f (patch) | |
tree | e62e0b664655a887ad7e5e037ea59865cef8230f | |
parent | 96d8aae1ec9b4f6130b6b610ce23d2bbdb79298a (diff) |
Always print errors
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 8 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 79 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 6 |
3 files changed, 56 insertions, 37 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index c284328255a..f0f82dd80d1 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -19,7 +19,7 @@ func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") - cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print errors as they happen") + cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") } type feedOptions struct { @@ -97,11 +97,7 @@ func feed(r io.Reader, cli *CLI, options feedOptions) error { throttler := document.NewThrottler(options.connections) // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) - errWriter := io.Discard - if options.verbose { - errWriter = cli.Stderr - } - dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter) + dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) dec := document.NewDecoder(r) start := cli.now() diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 838a7bc45ee..798a888d677 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -4,6 +4,7 @@ import ( "container/list" "fmt" "io" + "strings" "sync" "sync/atomic" "time" @@ -18,12 +19,15 @@ type Dispatcher struct { circuitBreaker CircuitBreaker stats Stats - started bool - ready chan Id - results chan Result + started bool + ready chan Id + results chan Result + msgs chan string + inflight map[string]*documentGroup inflightCount int64 - errWriter io.Writer + output io.Writer + verbose bool mu sync.RWMutex wg sync.WaitGroup @@ -55,13 +59,14 @@ func (g *documentGroup) add(op documentOp, first bool) { } } -func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher { +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher { d := &Dispatcher{ feeder: feeder, throttler: throttler, circuitBreaker: breaker, inflight: make(map[string]*documentGroup), - errWriter: errWriter, + output: output, + verbose: verbose, } d.start() return d @@ -86,29 +91,35 @@ func (d *Dispatcher) sendDocumentIn(group *documentGroup) { func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 { + if d.verbose { + d.msgs <- fmt.Sprintf("feed: successfully fed %s with status %d", op.document.Id, result.HTTPStatus) + } d.throttler.Success() d.circuitBreaker.Success() return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { - fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) + d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) return true } if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { retry := op.attempts <= maxAttempts - msg := "feed: " + op.document.String() + " failed with " + var msg strings.Builder + msg.WriteString("feed: ") + msg.WriteString(op.document.String()) if result.Err != nil { - msg += "error " + result.Err.Error() + msg.WriteString("error ") + msg.WriteString(result.Err.Error()) } else { - msg += fmt.Sprintf("status %d", result.HTTPStatus) + msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus)) } if retry { - msg += ": retrying" + msg.WriteString(": retrying") } else { - msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts) + msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts)) } - fmt.Fprintln(d.errWriter, msg) + d.msgs <- msg.String() d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) if retry { return true @@ -125,17 +136,22 @@ func (d *Dispatcher) start() { } d.ready = make(chan Id, 4096) d.results = make(chan Result, 4096) + d.msgs = make(chan string, 4096) d.started = true d.wg.Add(1) go func() { defer d.wg.Done() d.readDocuments() }() - d.resultWg.Add(1) + d.resultWg.Add(2) go func() { defer d.resultWg.Done() d.readResults() }() + go func() { + defer d.resultWg.Done() + d.readMessages() + }() } func (d *Dispatcher) readDocuments() { @@ -157,6 +173,12 @@ func (d *Dispatcher) readResults() { } } +func (d *Dispatcher) readMessages() { + for msg := range d.msgs { + fmt.Fprintln(d.output, msg) + } +} + func (d *Dispatcher) enqueue(op documentOp) error { d.mu.Lock() if !d.started { @@ -188,25 +210,26 @@ func (d *Dispatcher) acquireSlot() { func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } -func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { - d.mu.Lock() - if d.started { - close(ch) - if markClosed { - d.started = false - } - } - d.mu.Unlock() - wg.Wait() -} - func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) } func (d *Dispatcher) Stats() Stats { return d.stats } // Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { - closeAndWait(d.ready, &d.wg, d, false) - closeAndWait(d.results, &d.resultWg, d, true) + d.mu.Lock() + if d.started { + close(d.ready) + } + d.mu.Unlock() + d.wg.Wait() // Wait for inflight operations to complete + + d.mu.Lock() + if d.started { + close(d.results) + close(d.msgs) + d.started = false + } + d.mu.Unlock() + d.resultWg.Wait() // Wait for results return nil } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 80bc5f603ae..d066f5bc9ae 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -41,7 +41,7 @@ func TestDispatcher(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, @@ -74,7 +74,7 @@ func TestDispatcherOrdering(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) for _, d := range docs { dispatcher.Enqueue(d) } @@ -110,7 +110,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) for _, d := range docs { dispatcher.Enqueue(d) } |