summaryrefslogtreecommitdiffstats
path: root/client/go/internal/cli/cmd/feed.go
diff options
context:
space:
mode:
Diffstat (limited to 'client/go/internal/cli/cmd/feed.go')
-rw-r--r--client/go/internal/cli/cmd/feed.go92
1 files changed, 73 insertions, 19 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 895a22d2be5..06568dd35c3 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -6,6 +6,7 @@ import (
"io"
"math"
"os"
+ "runtime/pprof"
"time"
"github.com/spf13/cobra"
@@ -14,16 +15,35 @@ import (
"github.com/vespa-engine/vespa/client/go/internal/vespa/document"
)
-func addFeedFlags(cmd *cobra.Command, verbose *bool, connections *int) {
- cmd.PersistentFlags().IntVarP(connections, "connections", "N", 8, "The number of connections to use")
- cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen")
+func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
+ cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
+ cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`)
+ cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations")
+ cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable")
+ cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable")
+ cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors")
+ memprofile := "memprofile"
+ cpuprofile := "cpuprofile"
+ cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file")
+ cmd.PersistentFlags().StringVar(&options.cpuprofile, cpuprofile, "", "Write a CPU profile to given file")
+ // Hide these flags as they are intended for internal use
+ cmd.PersistentFlags().MarkHidden(memprofile)
+ cmd.PersistentFlags().MarkHidden(cpuprofile)
+}
+
+type feedOptions struct {
+ connections int
+ compression string
+ route string
+ verbose bool
+ traceLevel int
+ timeoutSecs int
+ memprofile string
+ cpuprofile string
}
func newFeedCmd(cli *CLI) *cobra.Command {
- var (
- verbose bool
- connections int
- )
+ var options feedOptions
cmd := &cobra.Command{
Use: "feed FILE",
Short: "Feed documents to a Vespa cluster",
@@ -56,10 +76,27 @@ $ cat documents.jsonl | vespa feed -
defer f.Close()
r = f
}
- return feed(r, cli, verbose, connections)
+ if options.cpuprofile != "" {
+ f, err := os.Create(options.cpuprofile)
+ if err != nil {
+ return err
+ }
+ pprof.StartCPUProfile(f)
+ defer pprof.StopCPUProfile()
+ }
+ err := feed(r, cli, options)
+ if options.memprofile != "" {
+ f, err := os.Create(options.memprofile)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ pprof.WriteHeapProfile(f)
+ }
+ return err
},
}
- addFeedFlags(cmd, &verbose, &connections)
+ addFeedFlags(cmd, &options)
return cmd
}
@@ -67,29 +104,46 @@ func createServiceClients(service *vespa.Service, n int) []util.HTTPClient {
clients := make([]util.HTTPClient, 0, n)
for i := 0; i < n; i++ {
client := service.Client().Clone()
- util.ForceHTTP2(client, service.TLSOptions.KeyPair) // Feeding should always use HTTP/2
+ // Feeding should always use HTTP/2
+ util.ForceHTTP2(client, service.TLSOptions.KeyPair, service.TLSOptions.CACertificate, service.TLSOptions.TrustAll)
clients = append(clients, client)
}
return clients
}
-func feed(r io.Reader, cli *CLI, verbose bool, connections int) error {
+func (opts feedOptions) compressionMode() (document.Compression, error) {
+ switch opts.compression {
+ case "auto":
+ return document.CompressionAuto, nil
+ case "none":
+ return document.CompressionNone, nil
+ case "gzip":
+ return document.CompressionGzip, nil
+ }
+ return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`)
+}
+
+func feed(r io.Reader, cli *CLI, options feedOptions) error {
service, err := documentService(cli)
if err != nil {
return err
}
- clients := createServiceClients(service, connections)
+ clients := createServiceClients(service, options.connections)
+ compression, err := options.compressionMode()
+ if err != nil {
+ return err
+ }
client := document.NewClient(document.ClientOptions{
- BaseURL: service.BaseURL,
+ Compression: compression,
+ Timeout: time.Duration(options.timeoutSecs) * time.Second,
+ Route: options.route,
+ TraceLevel: options.traceLevel,
+ BaseURL: service.BaseURL,
}, clients)
- throttler := document.NewThrottler(connections)
+ throttler := document.NewThrottler(options.connections)
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
- errWriter := io.Discard
- if verbose {
- errWriter = cli.Stderr
- }
- dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter)
+ dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
dec := document.NewDecoder(r)
start := cli.now()