about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Martin Polden <mpolden@mpolden.no> 2023-04-24 15:18:38 +0200
committer GitHub <noreply@github.com> 2023-04-24 15:18:38 +0200
commit d54798f7eb271a99e80fb7aa2e7487fa148d5166 (patch)
tree 04c537037cd9d39dc2f81d506ab08953d18086ab
parent d299e66860c0dfdc62354e07c899f36b46bdccde (diff)
parent 37ce1414c9d38a08e281c3faad059f0cff39560c (diff)
Merge pull request #26834 from vespa-engine/mpolden/feed-client-12
Respect circuit breaker
-rw-r--r-- client/go/internal/cli/cmd/feed.go 25
-rw-r--r-- client/go/internal/vespa/document/dispatcher.go 21
-rw-r--r-- client/go/internal/vespa/document/dispatcher_test.go 20
3 files changed, 52 insertions, 14 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 9d90233e62a..a6447ef8d2e 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -18,10 +18,11 @@ import (
func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`)
+ cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Invididual feed operation timeout in seconds. 0 to disable")
+ cmd.PersistentFlags().IntVar(&options.doomSecs, "max-failure-seconds", 0, "Exit if given number of seconds elapse without any successful operations. 0 to disable")
+ cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors")
cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations")
cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable")
- cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable")
- cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors")
memprofile := "memprofile"
cpuprofile := "cpuprofile"
cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file")
@@ -38,28 +39,29 @@ type feedOptions struct {
verbose bool
traceLevel int
timeoutSecs int
- memprofile string
- cpuprofile string
+ doomSecs int
+
+ memprofile string
+ cpuprofile string
}
func newFeedCmd(cli *CLI) *cobra.Command {
var options feedOptions
cmd := &cobra.Command{
- Use: "feed FILE",
+ Use: "feed FILE [FILE]...",
Short: "Feed documents to a Vespa cluster",
Long: `Feed documents to a Vespa cluster.
-A high performance feeding client. This can be used to feed large amounts of
-documents to a Vespa cluster efficiently.
+This command can be used to feed large amounts of documents to a Vespa cluster
+efficiently.
The contents of FILE must be either a JSON array or JSON objects separated by
newline (JSONL).
If FILE is a single dash ('-'), documents will be read from standard input.
`,
- Example: `$ vespa feed documents.jsonl
-$ cat documents.jsonl | vespa feed -
-`,
+ Example: `$ vespa feed docs.jsonl moredocs.json
+$ cat docs.jsonl | vespa feed -`,
Args: cobra.MinimumNArgs(1),
DisableAutoGenTag: true,
SilenceUsage: true,
@@ -131,8 +133,7 @@ func feed(files []string, options feedOptions, cli *CLI) error {
NowFunc: cli.now,
}, clients)
throttler := document.NewThrottler(options.connections)
- // TODO(mpolden): Make doom duration configurable
- circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
+ circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second)
dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
start := cli.now()
for _, name := range files {
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 0f3d39d5a78..51e60e4e131 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -163,11 +163,15 @@ func (d *Dispatcher) enqueue(op documentOp) error {
}
d.mu.Unlock()
group.add(op, op.attempts > 0)
- d.enqueueWithSlot(group)
+ d.dispatch(op.document.Id, group)
return nil
}
-func (d *Dispatcher) enqueueWithSlot(group *documentGroup) {
+func (d *Dispatcher) dispatch(id Id, group *documentGroup) {
+ if !d.canDispatch() {
+ d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id)
+ return
+ }
d.acquireSlot()
d.workerWg.Add(1)
go func() {
@@ -177,6 +181,19 @@ func (d *Dispatcher) enqueueWithSlot(group *documentGroup) {
d.throttler.Sent()
}
+func (d *Dispatcher) canDispatch() bool {
+ switch d.circuitBreaker.State() {
+ case CircuitClosed:
+ return true
+ case CircuitHalfOpen:
+ time.Sleep(time.Second)
+ return true
+ case CircuitOpen:
+ return false
+ }
+ panic("invalid circuit state")
+}
+
func (d *Dispatcher) acquireSlot() {
for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() {
time.Sleep(time.Millisecond)
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index c8f8e550ba4..2e2e9a5abbd 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -36,6 +36,12 @@ func (f *mockFeeder) Send(doc Document) Result {
return result
}
+type mockCircuitBreaker struct{ state CircuitState }
+
+func (c *mockCircuitBreaker) Success() {}
+func (c *mockCircuitBreaker) Error(err error) {}
+func (c *mockCircuitBreaker) State() CircuitState { return c.state }
+
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}
@@ -131,6 +137,20 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
assert.Equal(t, 6, len(feeder.documents))
}
+func TestDispatcherOpenCircuit(t *testing.T) {
+ feeder := &mockFeeder{}
+ doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut}
+ clock := &manualClock{tick: time.Second}
+ throttler := newThrottler(8, clock.now)
+ breaker := &mockCircuitBreaker{}
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false)
+ dispatcher.Enqueue(doc)
+ breaker.state = CircuitOpen
+ dispatcher.Enqueue(doc)
+ dispatcher.Close()
+ assert.Equal(t, 1, len(feeder.documents))
+}
+
func BenchmarkDocumentDispatching(b *testing.B) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}