diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-04 14:17:17 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-11 10:27:09 +0200 |
commit | 20aec66209b46859a99b0fb80ce6c208f77dc9ff (patch) | |
tree | 0d6664a0c2ae1b559b4d21657f96ca35defac6c1 | |
parent | 0784bd1d2b2c887897b7750281a54ab57cb6badf (diff) |
Add verbose flag
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 19 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 25 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 9 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document.go | 21 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder.go | 5 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 2 |
6 files changed, 62 insertions, 19 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 97bee293077..0244004b512 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -12,14 +12,13 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) -func addFeedFlags(cmd *cobra.Command, concurrency *int) { - // TOOD(mpolden): Remove this flag - cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching") +func addFeedFlags(cmd *cobra.Command, verbose *bool) { + cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen") } func newFeedCmd(cli *CLI) *cobra.Command { var ( - concurrency int + verbose bool ) cmd := &cobra.Command{ Use: "feed FILE", @@ -44,14 +43,14 @@ newline (JSONL). return err } defer f.Close() - return feed(f, cli, concurrency) + return feed(f, cli, verbose) }, } - addFeedFlags(cmd, &concurrency) + addFeedFlags(cmd, &verbose) return cmd } -func feed(r io.Reader, cli *CLI, concurrency int) error { +func feed(r io.Reader, cli *CLI, verbose bool) error { service, err := documentService(cli) if err != nil { return err @@ -63,7 +62,11 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { throttler := document.NewThrottler() // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) - dispatcher := document.NewDispatcher(client, throttler, circuitBreaker) + errWriter := io.Discard + if verbose { + errWriter = cli.Stderr + } + dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter) dec := document.NewDecoder(r) start := cli.now() diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 7011ae7a9b6..9d757aa51aa 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -2,6 +2,7 @@ package document import ( "fmt" + "io" "sync" "sync/atomic" "time" @@ -21,6 +22,7 @@ type Dispatcher struct { results chan Result inflight map[string]*documentGroup inflightCount int64 + errWriter io.Writer mu sync.RWMutex wg sync.WaitGroup @@ -45,12 +47,13 @@ func (g *documentGroup) append(op documentOp) { g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher { +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher { d := &Dispatcher{ feeder: feeder, throttler: throttler, circuitBreaker: breaker, inflight: make(map[string]*documentGroup), + errWriter: errWriter, } d.start() return d @@ -66,7 +69,7 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) { op.attempts++ result := d.feeder.Send(op.document) d.results <- result - ok = result.Status.Success() + ok = result.Success() if !d.shouldRetry(op, result) { break } @@ -83,12 +86,26 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { + fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) return true } - if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + retry := op.attempts <= maxAttempts + msg := "feed: " + op.document.String() + " failed with " + if result.Err != nil { + msg += "error " + result.Err.Error() + } else { + msg += fmt.Sprintf("status %d", result.HTTPStatus) + } + if retry { + msg += ": retrying" + } else { + msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts) + } + fmt.Fprintln(d.errWriter, msg) d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) - if op.attempts <= maxAttempts { + if retry { return true } } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 8a6d8c6117c..fc96adabc96 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -1,6 +1,7 @@ package document import ( + "io" "sync" "testing" "time" @@ -29,7 +30,7 @@ func (f *mockFeeder) Send(doc Document) Result { } else { f.documents = append(f.documents, doc) } - if !result.Status.Success() { + if !result.Success() { result.Stats.Errors = 1 } return result @@ -40,7 +41,7 @@ func TestDispatcher(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, @@ -73,7 +74,7 @@ func TestDispatcherOrdering(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { dispatcher.Enqueue(d) } @@ -109,7 +110,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { dispatcher.Enqueue(d) } diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 98cb2d1b6c6..efb60ad8c0a 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -130,6 +130,27 @@ type Decoder struct { jsonl bool } +func (d Document) String() string { + var sb strings.Builder + switch d.Operation { + case OperationPut: + sb.WriteString("put ") + case OperationUpdate: + sb.WriteString("update ") + case OperationRemove: + sb.WriteString("remove ") + } + sb.WriteString(d.Id.String()) + if d.Condition != "" { + sb.WriteString(", condition=") + sb.WriteString(d.Condition) + } + if d.Create { + sb.WriteString(", create=true") + } + return sb.String() +} + func (d *Decoder) guessMode() error { for !d.array && !d.jsonl { b, err := d.buf.ReadByte() diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 8bdd5bca5ba..4ff612067b7 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -32,8 +32,9 @@ type Result struct { Stats Stats } -// Success returns whether status s is considered a success. -func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet } +func (r Result) Success() bool { + return r.Err == nil && (r.Status == StatusSuccess || r.Status == StatusConditionNotMet) +} // Stats represents feeding operation statistics. type Stats struct { diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 4dadcd1d05c..b1d5c80f29f 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -171,7 +171,7 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { result.Message = body.Message result.Trace = string(body.Trace) result.Stats.BytesRecv = cr.bytesRead - if !result.Status.Success() { + if !result.Success() { result.Stats.Errors = 1 } return result |