summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-04 14:17:17 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-11 10:27:09 +0200
commit20aec66209b46859a99b0fb80ce6c208f77dc9ff (patch)
tree0d6664a0c2ae1b559b4d21657f96ca35defac6c1
parent0784bd1d2b2c887897b7750281a54ab57cb6badf (diff)
Add verbose flag
-rw-r--r--client/go/internal/cli/cmd/feed.go19
-rw-r--r--client/go/internal/vespa/document/dispatcher.go25
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go9
-rw-r--r--client/go/internal/vespa/document/document.go21
-rw-r--r--client/go/internal/vespa/document/feeder.go5
-rw-r--r--client/go/internal/vespa/document/http.go2
6 files changed, 62 insertions, 19 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 97bee293077..0244004b512 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -12,14 +12,13 @@ import (
"github.com/vespa-engine/vespa/client/go/internal/vespa/document"
)
-func addFeedFlags(cmd *cobra.Command, concurrency *int) {
- // TOOD(mpolden): Remove this flag
- cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
+func addFeedFlags(cmd *cobra.Command, verbose *bool) {
+ cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen")
}
func newFeedCmd(cli *CLI) *cobra.Command {
var (
- concurrency int
+ verbose bool
)
cmd := &cobra.Command{
Use: "feed FILE",
@@ -44,14 +43,14 @@ newline (JSONL).
return err
}
defer f.Close()
- return feed(f, cli, concurrency)
+ return feed(f, cli, verbose)
},
}
- addFeedFlags(cmd, &concurrency)
+ addFeedFlags(cmd, &verbose)
return cmd
}
-func feed(r io.Reader, cli *CLI, concurrency int) error {
+func feed(r io.Reader, cli *CLI, verbose bool) error {
service, err := documentService(cli)
if err != nil {
return err
@@ -63,7 +62,11 @@ func feed(r io.Reader, cli *CLI, concurrency int) error {
throttler := document.NewThrottler()
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
- dispatcher := document.NewDispatcher(client, throttler, circuitBreaker)
+ errWriter := io.Discard
+ if verbose {
+ errWriter = cli.Stderr
+ }
+ dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter)
dec := document.NewDecoder(r)
start := cli.now()
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 7011ae7a9b6..9d757aa51aa 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -2,6 +2,7 @@ package document
import (
"fmt"
+ "io"
"sync"
"sync/atomic"
"time"
@@ -21,6 +22,7 @@ type Dispatcher struct {
results chan Result
inflight map[string]*documentGroup
inflightCount int64
+ errWriter io.Writer
mu sync.RWMutex
wg sync.WaitGroup
@@ -45,12 +47,13 @@ func (g *documentGroup) append(op documentOp) {
g.operations = append(g.operations, op)
}
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher {
d := &Dispatcher{
feeder: feeder,
throttler: throttler,
circuitBreaker: breaker,
inflight: make(map[string]*documentGroup),
+ errWriter: errWriter,
}
d.start()
return d
@@ -66,7 +69,7 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) {
op.attempts++
result := d.feeder.Send(op.document)
d.results <- result
- ok = result.Status.Success()
+ ok = result.Success()
if !d.shouldRetry(op, result) {
break
}
@@ -83,12 +86,26 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
+ fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
return true
}
- if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ retry := op.attempts <= maxAttempts
+ msg := "feed: " + op.document.String() + " failed with "
+ if result.Err != nil {
+ msg += "error " + result.Err.Error()
+ } else {
+ msg += fmt.Sprintf("status %d", result.HTTPStatus)
+ }
+ if retry {
+ msg += ": retrying"
+ } else {
+ msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts)
+ }
+ fmt.Fprintln(d.errWriter, msg)
d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
- if op.attempts <= maxAttempts {
+ if retry {
return true
}
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 8a6d8c6117c..fc96adabc96 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -1,6 +1,7 @@
package document
import (
+ "io"
"sync"
"testing"
"time"
@@ -29,7 +30,7 @@ func (f *mockFeeder) Send(doc Document) Result {
} else {
f.documents = append(f.documents, doc)
}
- if !result.Status.Success() {
+ if !result.Success() {
result.Stats.Errors = 1
}
return result
@@ -40,7 +41,7 @@ func TestDispatcher(t *testing.T) {
clock := &manualClock{tick: time.Second}
throttler := newThrottler(clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
@@ -73,7 +74,7 @@ func TestDispatcherOrdering(t *testing.T) {
clock := &manualClock{tick: time.Second}
throttler := newThrottler(clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
for _, d := range docs {
dispatcher.Enqueue(d)
}
@@ -109,7 +110,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
clock := &manualClock{tick: time.Second}
throttler := newThrottler(clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
for _, d := range docs {
dispatcher.Enqueue(d)
}
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 98cb2d1b6c6..efb60ad8c0a 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -130,6 +130,27 @@ type Decoder struct {
jsonl bool
}
+func (d Document) String() string {
+ var sb strings.Builder
+ switch d.Operation {
+ case OperationPut:
+ sb.WriteString("put ")
+ case OperationUpdate:
+ sb.WriteString("update ")
+ case OperationRemove:
+ sb.WriteString("remove ")
+ }
+ sb.WriteString(d.Id.String())
+ if d.Condition != "" {
+ sb.WriteString(", condition=")
+ sb.WriteString(d.Condition)
+ }
+ if d.Create {
+ sb.WriteString(", create=true")
+ }
+ return sb.String()
+}
+
func (d *Decoder) guessMode() error {
for !d.array && !d.jsonl {
b, err := d.buf.ReadByte()
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 8bdd5bca5ba..4ff612067b7 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -32,8 +32,9 @@ type Result struct {
Stats Stats
}
-// Success returns whether status s is considered a success.
-func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet }
+func (r Result) Success() bool {
+ return r.Err == nil && (r.Status == StatusSuccess || r.Status == StatusConditionNotMet)
+}
// Stats represents feeding operation statistics.
type Stats struct {
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 4dadcd1d05c..b1d5c80f29f 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -171,7 +171,7 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result {
result.Message = body.Message
result.Trace = string(body.Trace)
result.Stats.BytesRecv = cr.bytesRead
- if !result.Status.Success() {
+ if !result.Success() {
result.Stats.Errors = 1
}
return result