From e73277532d7aed8a9557449bad39cd5e4ee5054e Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Mon, 24 Apr 2023 14:33:01 +0200 Subject: Respect circuit breaker --- client/go/internal/vespa/document/dispatcher.go | 21 +++++++++++++++++++-- .../go/internal/vespa/document/dispatcher_test.go | 20 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 0f3d39d5a78..51e60e4e131 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -163,11 +163,15 @@ func (d *Dispatcher) enqueue(op documentOp) error { } d.mu.Unlock() group.add(op, op.attempts > 0) - d.enqueueWithSlot(group) + d.dispatch(op.document.Id, group) return nil } -func (d *Dispatcher) enqueueWithSlot(group *documentGroup) { +func (d *Dispatcher) dispatch(id Id, group *documentGroup) { + if !d.canDispatch() { + d.msgs <- fmt.Sprintf("refusing to dispatch document %s: too many errors", id) + return + } d.acquireSlot() d.workerWg.Add(1) go func() { @@ -177,6 +181,19 @@ func (d *Dispatcher) enqueueWithSlot(group *documentGroup) { d.throttler.Sent() } +func (d *Dispatcher) canDispatch() bool { + switch d.circuitBreaker.State() { + case CircuitClosed: + return true + case CircuitHalfOpen: + time.Sleep(time.Second) + return true + case CircuitOpen: + return false + } + panic("invalid circuit state") +} + func (d *Dispatcher) acquireSlot() { for atomic.LoadInt64(&d.inflightCount) >= d.throttler.TargetInflight() { time.Sleep(time.Millisecond) diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index c8f8e550ba4..2e2e9a5abbd 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -36,6 +36,12 @@ func (f *mockFeeder) Send(doc Document) Result { return result } +type mockCircuitBreaker 
struct{ state CircuitState } + +func (c *mockCircuitBreaker) Success() {} +func (c *mockCircuitBreaker) Error(err error) {} +func (c *mockCircuitBreaker) State() CircuitState { return c.state } + func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} @@ -131,6 +137,20 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { assert.Equal(t, 6, len(feeder.documents)) } +func TestDispatcherOpenCircuit(t *testing.T) { + feeder := &mockFeeder{} + doc := Document{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut} + clock := &manualClock{tick: time.Second} + throttler := newThrottler(8, clock.now) + breaker := &mockCircuitBreaker{} + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard, false) + dispatcher.Enqueue(doc) + breaker.state = CircuitOpen + dispatcher.Enqueue(doc) + dispatcher.Close() + assert.Equal(t, 1, len(feeder.documents)) +} + func BenchmarkDocumentDispatching(b *testing.B) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} -- cgit v1.2.3 From 0396b8d84cfe66ce48a1671aa8291a1f23a8bfcd Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Mon, 24 Apr 2023 14:44:17 +0200 Subject: Add flag for doom duration --- client/go/internal/cli/cmd/feed.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 9d90233e62a..df314d02ad1 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -18,10 +18,11 @@ import ( func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. 
Must be "auto", "gzip" or "none"`) + cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Individual feed operation timeout in seconds. 0 to disable") + cmd.PersistentFlags().IntVar(&options.doomSecs, "max-failure-seconds", 0, "Exit if given number of seconds elapse without any successful operations. 0 to disable") + cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") - cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") - cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") memprofile := "memprofile" cpuprofile := "cpuprofile" cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file") @@ -38,8 +39,10 @@ type feedOptions struct { verbose bool traceLevel int timeoutSecs int - memprofile string - cpuprofile string + doomSecs int + + memprofile string + cpuprofile string } func newFeedCmd(cli *CLI) *cobra.Command { @@ -131,8 +134,7 @@ func feed(files []string, options feedOptions, cli *CLI) error { NowFunc: cli.now, }, clients) throttler := document.NewThrottler(options.connections) - // TODO(mpolden): Make doom duration configurable - circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) + circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second) dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) start := cli.now() for _, name := range files { -- cgit v1.2.3 From 37ce1414c9d38a08e281c3faad059f0cff39560c Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Mon, 24 Apr 
2023 14:52:16 +0200 Subject: Tweak help --- client/go/internal/cli/cmd/feed.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index df314d02ad1..a6447ef8d2e 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -48,21 +48,20 @@ type feedOptions struct { func newFeedCmd(cli *CLI) *cobra.Command { var options feedOptions cmd := &cobra.Command{ - Use: "feed FILE", + Use: "feed FILE [FILE]...", Short: "Feed documents to a Vespa cluster", Long: `Feed documents to a Vespa cluster. -A high performance feeding client. This can be used to feed large amounts of -documents to a Vespa cluster efficiently. +This command can be used to feed large amounts of documents to a Vespa cluster +efficiently. The contents of FILE must be either a JSON array or JSON objects separated by newline (JSONL). If FILE is a single dash ('-'), documents will be read from standard input. `, - Example: `$ vespa feed documents.jsonl -$ cat documents.jsonl | vespa feed - -`, + Example: `$ vespa feed docs.jsonl moredocs.json +$ cat docs.jsonl | vespa feed -`, Args: cobra.MinimumNArgs(1), DisableAutoGenTag: true, SilenceUsage: true, -- cgit v1.2.3