diff options
Diffstat (limited to 'client/go/internal/cli/cmd/feed.go')
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 92 |
1 files changed, 73 insertions, 19 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 895a22d2be5..06568dd35c3 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -6,6 +6,7 @@ import ( "io" "math" "os" + "runtime/pprof" "time" "github.com/spf13/cobra" @@ -14,16 +15,35 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) -func addFeedFlags(cmd *cobra.Command, verbose *bool, connections *int) { - cmd.PersistentFlags().IntVarP(connections, "connections", "N", 8, "The number of connections to use") - cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen") +func addFeedFlags(cmd *cobra.Command, options *feedOptions) { + cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") + cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`) + cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") + cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") + cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") + cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") + memprofile := "memprofile" + cpuprofile := "cpuprofile" + cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file") + cmd.PersistentFlags().StringVar(&options.cpuprofile, cpuprofile, "", "Write a CPU profile to given file") + // Hide these flags as they are intended for internal use + cmd.PersistentFlags().MarkHidden(memprofile) + cmd.PersistentFlags().MarkHidden(cpuprofile) +} + +type feedOptions struct { + connections int + compression string + route string + verbose bool + traceLevel int + timeoutSecs int + memprofile string + cpuprofile string } func newFeedCmd(cli *CLI) *cobra.Command { - var ( - verbose bool - connections int - ) + var options feedOptions cmd := &cobra.Command{ Use: "feed FILE", Short: "Feed documents to a Vespa cluster", @@ -56,10 +76,27 @@ $ cat documents.jsonl | vespa feed - defer f.Close() r = f } - return feed(r, cli, verbose, connections) + if options.cpuprofile != "" { + f, err := os.Create(options.cpuprofile) + if err != nil { + return err + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + err := feed(r, cli, options) + if options.memprofile != "" { + f, err := os.Create(options.memprofile) + if err != nil { + return err + } + defer f.Close() + pprof.WriteHeapProfile(f) + } + return err }, } - addFeedFlags(cmd, &verbose, &connections) + addFeedFlags(cmd, &options) return cmd } @@ -67,29 +104,46 @@ func createServiceClients(service *vespa.Service, n int) []util.HTTPClient { clients := make([]util.HTTPClient, 0, n) for i := 0; i < n; i++ { client := service.Client().Clone() - util.ForceHTTP2(client, service.TLSOptions.KeyPair) // Feeding should always use HTTP/2 + // Feeding should always use HTTP/2 + util.ForceHTTP2(client, service.TLSOptions.KeyPair, service.TLSOptions.CACertificate, service.TLSOptions.TrustAll) clients = append(clients, client) } return clients } -func feed(r io.Reader, cli *CLI, verbose bool, connections int) error { +func (opts feedOptions) compressionMode() (document.Compression, error) { + switch opts.compression { + case "auto": + return document.CompressionAuto, nil + case "none": + return document.CompressionNone, nil + case "gzip": + return document.CompressionGzip, nil + } + return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) +} + +func feed(r io.Reader, cli *CLI, options feedOptions) error { service, err := documentService(cli) if err != nil { return err } - clients := createServiceClients(service, connections) + clients := createServiceClients(service, options.connections) + compression, err := options.compressionMode() + if err != nil { + return err + } client := document.NewClient(document.ClientOptions{ - BaseURL: service.BaseURL, + Compression: compression, + Timeout: time.Duration(options.timeoutSecs) * time.Second, + Route: options.route, + TraceLevel: options.traceLevel, + BaseURL: service.BaseURL, }, clients) - throttler := document.NewThrottler(connections) + throttler := document.NewThrottler(options.connections) // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) - errWriter := io.Discard - if verbose { - errWriter = cli.Stderr - } - dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter) + dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) dec := document.NewDecoder(r) start := cli.now() |