diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-24 15:18:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-24 15:18:38 +0200 |
commit | d54798f7eb271a99e80fb7aa2e7487fa148d5166 (patch) | |
tree | 04c537037cd9d39dc2f81d506ab08953d18086ab | |
parent | d299e66860c0dfdc62354e07c899f36b46bdccde (diff) | |
parent | 37ce1414c9d38a08e281c3faad059f0cff39560c (diff) |
Merge pull request #26834 from vespa-engine/mpolden/feed-client-12
Respect circuit breaker
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 25 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 21 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 20 |
3 files changed, 52 insertions, 14 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 9d90233e62a..a6447ef8d2e 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -18,10 +18,11 @@ import ( func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`) + cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Invididual feed operation timeout in seconds. 0 to disable") + cmd.PersistentFlags().IntVar(&options.doomSecs, "max-failure-seconds", 0, "Exit if given number of seconds elapse without any successful operations. 0 to disable") + cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") - cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") - cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") memprofile := "memprofile" cpuprofile := "cpuprofile" cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file") @@ -38,28 +39,29 @@ type feedOptions struct { verbose bool traceLevel int timeoutSecs int - memprofile string - cpuprofile string + doomSecs int + + memprofile string + cpuprofile string } func newFeedCmd(cli *CLI) *cobra.Command { var options feedOptions cmd := &cobra.Command{ - Use: "feed FILE", + Use: "feed FILE [FILE]...", Short: "Feed documents to a Vespa cluster", Long: `Feed documents to a Vespa cluster. -A high performance feeding client. This can be used to feed large amounts of -documents to a Vespa cluster efficiently. +This command can be used to feed large amounts of documents to a Vespa cluster +efficiently. The contents of FILE must be either a JSON array or JSON objects separated by newline (JSONL). If FILE is a single dash ('-'), documents will be read from standard input. `, - Example: `$ vespa feed documents.jsonl -$ cat documents.jsonl | vespa feed - -`, + Example: `$ vespa feed docs.jsonl moredocs.json +$ cat docs.jsonl | vespa feed -`, Args: cobra.MinimumNArgs(1), DisableAutoGenTag: true, SilenceUsage: true, @@ -131,8 +133,7 @@ func feed(files []string, options feedOptions, cli *CLI) error { NowFunc: cli.now, }, clients) throttler := document.NewThrottler(options.connections) - // TODO(mpolden): Make doom duration configurable - circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) + circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second) dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) start := cli.now() for _, name := range files { diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 0f3d39d5a78..51e60e4e131 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -163,11 +163,15 @@ func (d *Dispatcher) enqueue(op documentOp) error { } d.mu.Unlock() group.add(op, op.attempts > 0) - d.enqueueWithSlot(group) + d.dispatch(op.document.Id, group) return nil } -func (d *Dispatcher) enqueueWithSlot(group *documentGroup) { +func (d *Dispatcher) dispatch(id Id, group *documentGroup) { + if !d.canDispatch() { + d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id) + return + } d.acquireSlot() d.workerWg.Add(1) go func() { @@ -177,6 +181,19 @@ func (d *Dispatcher) enqueueWithSlot(group *documentGroup) { d.throttler.Sent() } +func (d *Dispatcher) canDispatch() bool { + switch d.circuitBreaker.State() { + case CircuitClosed: + return true + case CircuitHalfOpen: + time.Sleep(time.Second) + return true + case CircuitOpen: + return false + } + panic("invalid circuit state") +} + func (d *Dispatcher) acquireSlot() { for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() { time.Sleep(time.Millisecond) diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index c8f8e550ba4..2e2e9a5abbd 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -36,6 +36,12 @@ func (f *mockFeeder) Send(doc Document) Result { return result } +type mockCircuitBreaker struct{ state CircuitState } + +func (c *mockCircuitBreaker) Success() {} +func (c *mockCircuitBreaker) Error(err error) {} +func (c *mockCircuitBreaker) State() CircuitState { return c.state } + func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} @@ -131,6 +137,20 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { assert.Equal(t, 6, len(feeder.documents)) } +func TestDispatcherOpenCircuit(t *testing.T) { + feeder := &mockFeeder{} + doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut} + clock := &manualClock{tick: time.Second} + throttler := newThrottler(8, clock.now) + breaker := &mockCircuitBreaker{} + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) + dispatcher.Enqueue(doc) + breaker.state = CircuitOpen + dispatcher.Enqueue(doc) + dispatcher.Close() + assert.Equal(t, 1, len(feeder.documents)) +} + func BenchmarkDocumentDispatching(b *testing.B) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} |