aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-14 13:18:17 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-17 10:31:40 +0200
commit58fb99d57e3b81f9c1c4567355355ef4a97e989f (patch)
treee62e0b664655a887ad7e5e037ea59865cef8230f /client/go
parent96d8aae1ec9b4f6130b6b610ce23d2bbdb79298a (diff)
Always print errors
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/cli/cmd/feed.go8
-rw-r--r--client/go/internal/vespa/document/dispatcher.go79
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go6
3 files changed, 56 insertions, 37 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index c284328255a..f0f82dd80d1 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -19,7 +19,7 @@ func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations")
cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable")
cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable")
- cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print errors as they happen")
+ cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors")
}
type feedOptions struct {
@@ -97,11 +97,7 @@ func feed(r io.Reader, cli *CLI, options feedOptions) error {
throttler := document.NewThrottler(options.connections)
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
- errWriter := io.Discard
- if options.verbose {
- errWriter = cli.Stderr
- }
- dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter)
+ dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
dec := document.NewDecoder(r)
start := cli.now()
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 838a7bc45ee..798a888d677 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -4,6 +4,7 @@ import (
"container/list"
"fmt"
"io"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -18,12 +19,15 @@ type Dispatcher struct {
circuitBreaker CircuitBreaker
stats Stats
- started bool
- ready chan Id
- results chan Result
+ started bool
+ ready chan Id
+ results chan Result
+ msgs chan string
+
inflight map[string]*documentGroup
inflightCount int64
- errWriter io.Writer
+ output io.Writer
+ verbose bool
mu sync.RWMutex
wg sync.WaitGroup
@@ -55,13 +59,14 @@ func (g *documentGroup) add(op documentOp, first bool) {
}
}
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher {
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher {
d := &Dispatcher{
feeder: feeder,
throttler: throttler,
circuitBreaker: breaker,
inflight: make(map[string]*documentGroup),
- errWriter: errWriter,
+ output: output,
+ verbose: verbose,
}
d.start()
return d
@@ -86,29 +91,35 @@ func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
if result.HTTPStatus/100 == 2 || result.HTTPStatus == 404 || result.HTTPStatus == 412 {
+ if d.verbose {
+ d.msgs <- fmt.Sprintf("feed: successfully fed %s with status %d", op.document.Id, result.HTTPStatus)
+ }
d.throttler.Success()
d.circuitBreaker.Success()
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
+ d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
return true
}
if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
retry := op.attempts <= maxAttempts
- msg := "feed: " + op.document.String() + " failed with "
+ var msg strings.Builder
+ msg.WriteString("feed: ")
+ msg.WriteString(op.document.String())
if result.Err != nil {
- msg += "error " + result.Err.Error()
+ msg.WriteString("error ")
+ msg.WriteString(result.Err.Error())
} else {
- msg += fmt.Sprintf("status %d", result.HTTPStatus)
+ msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus))
}
if retry {
- msg += ": retrying"
+ msg.WriteString(": retrying")
} else {
- msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts)
+ msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts))
}
- fmt.Fprintln(d.errWriter, msg)
+ d.msgs <- msg.String()
d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
if retry {
return true
@@ -125,17 +136,22 @@ func (d *Dispatcher) start() {
}
d.ready = make(chan Id, 4096)
d.results = make(chan Result, 4096)
+ d.msgs = make(chan string, 4096)
d.started = true
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.readDocuments()
}()
- d.resultWg.Add(1)
+ d.resultWg.Add(2)
go func() {
defer d.resultWg.Done()
d.readResults()
}()
+ go func() {
+ defer d.resultWg.Done()
+ d.readMessages()
+ }()
}
func (d *Dispatcher) readDocuments() {
@@ -157,6 +173,12 @@ func (d *Dispatcher) readResults() {
}
}
+func (d *Dispatcher) readMessages() {
+ for msg := range d.msgs {
+ fmt.Fprintln(d.output, msg)
+ }
+}
+
func (d *Dispatcher) enqueue(op documentOp) error {
d.mu.Lock()
if !d.started {
@@ -188,25 +210,26 @@ func (d *Dispatcher) acquireSlot() {
func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
-func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
- d.mu.Lock()
- if d.started {
- close(ch)
- if markClosed {
- d.started = false
- }
- }
- d.mu.Unlock()
- wg.Wait()
-}
-
func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) }
func (d *Dispatcher) Stats() Stats { return d.stats }
// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
- closeAndWait(d.ready, &d.wg, d, false)
- closeAndWait(d.results, &d.resultWg, d, true)
+ d.mu.Lock()
+ if d.started {
+ close(d.ready)
+ }
+ d.mu.Unlock()
+ d.wg.Wait() // Wait for inflight operations to complete
+
+ d.mu.Lock()
+ if d.started {
+ close(d.results)
+ close(d.msgs)
+ d.started = false
+ }
+ d.mu.Unlock()
+ d.resultWg.Wait() // Wait for results
return nil
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 80bc5f603ae..d066f5bc9ae 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -41,7 +41,7 @@ func TestDispatcher(t *testing.T) {
clock := &manualClock{tick: time.Second}
throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
@@ -74,7 +74,7 @@ func TestDispatcherOrdering(t *testing.T) {
clock := &manualClock{tick: time.Second}
throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
for _, d := range docs {
dispatcher.Enqueue(d)
}
@@ -110,7 +110,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
clock := &manualClock{tick: time.Second}
throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
for _, d := range docs {
dispatcher.Enqueue(d)
}