diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-20 15:55:10 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-23 12:14:24 +0100 |
commit | 88cdc07df2a58388120a7a91a58bdafac31d64b4 (patch) | |
tree | 764defb986e883b778dd5c55b46a1dcc48b046ff /client | |
parent | be8965384717caac4a13c741a2927ca13f827074 (diff) |
Initial feed frontend
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 143 | ||||
-rw-r--r-- | client/go/internal/cli/cmd/feed_test.go | 67 | ||||
-rw-r--r-- | client/go/internal/cli/cmd/root.go | 1 |
3 files changed, 211 insertions, 0 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
new file mode 100644
index 00000000000..621676d0353
--- /dev/null
+++ b/client/go/internal/cli/cmd/feed.go
@@ -0,0 +1,168 @@
+package cmd
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"math"
+	"os"
+	"time"
+
+	"github.com/spf13/cobra"
+	"github.com/vespa-engine/vespa/client/go/internal/vespa/document"
+)
+
+// addFeedFlags registers the feed tuning flags on cmd and binds them to the
+// given variables: --max-connections/-N (default 8) and --concurrency/-T
+// (default 64).
+func addFeedFlags(cmd *cobra.Command, maxConnections *int, concurrency *int) {
+	cmd.PersistentFlags().IntVarP(maxConnections, "max-connections", "N", 8, "Maximum number of HTTP connections to use")
+	cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
+}
+
+// newFeedCmd returns the "feed" command. The command takes exactly one
+// argument, opens the file it names and passes it to feed together with the
+// values of the flags registered by addFeedFlags.
+func newFeedCmd(cli *CLI) *cobra.Command {
+	var (
+		maxConnections int
+		concurrency    int
+	)
+	cmd := &cobra.Command{
+		Use:   "feed FILE",
+		Short: "Feed documents to a Vespa cluster",
+		Long: `Feed documents to a Vespa cluster.
+
+A high performance feeding client. This can be used to feed large amounts of
+documents to Vespa cluster efficiently.
+
+The contents of FILE must be either a JSON array or JSON objects separated by
+newline (JSONL).
+`,
+		Example: `$ vespa feed documents.jsonl
+`,
+		Args:              cobra.ExactArgs(1),
+		DisableAutoGenTag: true,
+		SilenceUsage:      true,
+		Hidden:            true, // TODO(mpolden): Remove when ready for public use
+		RunE: func(cmd *cobra.Command, args []string) error {
+			f, err := os.Open(args[0])
+			if err != nil {
+				return err
+			}
+			defer f.Close()
+			return feed(f, cli, maxConnections, concurrency)
+		},
+	}
+	addFeedFlags(cmd, &maxConnections, &concurrency)
+	return cmd
+}
+
+// feed decodes documents from r and enqueues each one on a dispatcher. The
+// dispatcher is created with the given concurrency on top of a document client
+// whose MaxConnsPerHost is maxConnections.
+//
+// Decode and enqueue errors are printed with cli.printErr instead of being
+// returned. Reading stops at io.EOF or at the first decode error. Once the
+// dispatcher has been closed, a JSON summary of the client's stats and the
+// elapsed time is written to cli.Stdout.
+func feed(r io.Reader, cli *CLI, maxConnections, concurrency int) error {
+	service, err := documentService(cli)
+	if err != nil {
+		return err
+	}
+	client := document.NewClient(document.ClientOptions{
+		BaseURL:         service.BaseURL,
+		MaxConnsPerHost: maxConnections,
+	}, service)
+	dispatcher := document.NewDispatcher(client, concurrency)
+	dec := document.NewDecoder(r)
+
+	start := cli.now()
+	for {
+		doc, err := dec.Decode()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			// Never enqueue the value returned alongside a decode error. Stop
+			// reading as well: if the decoder cannot advance past the bad
+			// input, retrying would report the same error forever.
+			cli.printErr(fmt.Errorf("failed to decode document: %w", err))
+			break
+		}
+		if err := dispatcher.Enqueue(doc); err != nil {
+			cli.printErr(err)
+		}
+	}
+	if err := dispatcher.Close(); err != nil {
+		return err
+	}
+	elapsed := cli.now().Sub(start)
+	return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed)
+}
+
+// number is a float32 that is encoded to JSON with exactly three decimals.
+type number float32
+
+// MarshalJSON returns n formatted with the "%.3f" verb.
+func (n number) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%.3f", n)), nil }
+
+// feedSummary is the JSON report produced by writeSummaryJSON.
+type feedSummary struct {
+	Seconds       number `json:"feeder.seconds"`
+	SuccessCount  int64  `json:"feeder.ok.count"`
+	SuccessRate   number `json:"feeder.ok.rate"`
+	ErrorCount    int64  `json:"feeder.error.count"`
+	InflightCount int64  `json:"feeder.inflight.count"`
+
+	RequestCount   int64  `json:"http.request.count"`
+	RequestBytes   int64  `json:"http.request.bytes"`
+	RequestRate    number `json:"http.request.MBps"`
+	ExceptionCount int64  `json:"http.exception.count"` // same as ErrorCount, for compatibility with vespa-feed-client
+
+	ResponseCount      int64  `json:"http.response.count"`
+	ResponseBytes      int64  `json:"http.response.bytes"`
+	ResponseRate       number `json:"http.response.MBps"`
+	ResponseErrorCount int64  `json:"http.response.error.count"`
+
+	ResponseMinLatency int64         `json:"http.response.latency.millis.min"`
+	ResponseAvgLatency int64         `json:"http.response.latency.millis.avg"`
+	ResponseMaxLatency int64         `json:"http.response.latency.millis.max"`
+	ResponseCodeCounts map[int]int64 `json:"http.response.code.counts"`
+}
+
+// mbps returns bytes expressed in megabytes (10^6 bytes) divided by duration
+// in seconds, with the divisor clamped to at least one second.
+func mbps(bytes int64, duration time.Duration) float64 {
+	return (float64(bytes) / 1000 / 1000) / math.Max(1, duration.Seconds())
+}
+
+// writeSummaryJSON builds a feedSummary from stats and duration and writes it
+// to w as indented JSON. It returns any error reported by the encoder.
+func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error {
+	summary := feedSummary{
+		Seconds:       number(duration.Seconds()),
+		SuccessCount:  stats.Successes(),
+		SuccessRate:   number(float64(stats.Successes()) / math.Max(1, duration.Seconds())),
+		ErrorCount:    stats.Errors,
+		InflightCount: stats.Inflight,
+
+		RequestCount:   stats.Requests,
+		RequestBytes:   stats.BytesSent,
+		RequestRate:    number(mbps(stats.BytesSent, duration)),
+		ExceptionCount: stats.Errors,
+
+		ResponseCount:      stats.Responses,
+		ResponseBytes:      stats.BytesRecv,
+		ResponseRate:       number(mbps(stats.BytesRecv, duration)),
+		ResponseErrorCount: stats.Responses - stats.Successes(),
+		ResponseMinLatency: stats.MinLatency.Milliseconds(),
+		ResponseAvgLatency: stats.AvgLatency().Milliseconds(),
+		ResponseMaxLatency: stats.MaxLatency.Milliseconds(),
+		ResponseCodeCounts: stats.ResponsesByCode,
+	}
+	enc := json.NewEncoder(w)
+	enc.SetIndent("", " ")
+	return enc.Encode(summary)
+}
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
new file mode 100644
index 00000000000..1bf1ef6ab9b
--- /dev/null
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -0,0 +1,71 @@
+package cmd
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/vespa-engine/vespa/client/go/internal/mock"
+)
+
+// manualClock is a fake clock that advances by tick every time now is called.
+type manualClock struct {
+	t    time.Time
+	tick time.Duration
+}
+
+// now returns the current fake time and then advances it by c.tick.
+func (c *manualClock) now() time.Time {
+	t := c.t
+	c.t = c.t.Add(c.tick)
+	return t
+}
+
+// TestFeed feeds a single document through a mocked HTTP client and checks
+// the JSON summary written to stdout.
+func TestFeed(t *testing.T) {
+	httpClient := &mock.HTTPClient{}
+	clock := &manualClock{tick: time.Second}
+	cli, stdout, stderr := newTestCLI(t)
+	cli.httpClient = httpClient
+	cli.now = clock.now
+
+	td := t.TempDir()
+	jsonFile := filepath.Join(td, "docs.jsonl")
+	err := os.WriteFile(jsonFile, []byte(`{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+}`), 0644)
+
+	require.Nil(t, err)
+
+	httpClient.NextResponseString(200, `{"message":"OK"}`)
+	require.Nil(t, cli.Run("feed", jsonFile))
+
+	assert.Equal(t, "", stderr.String())
+	assert.Equal(t, `{
+ "feeder.seconds": 1.000,
+ "feeder.ok.count": 1,
+ "feeder.ok.rate": 1.000,
+ "feeder.error.count": 0,
+ "feeder.inflight.count": 0,
+ "http.request.count": 1,
+ "http.request.bytes": 25,
+ "http.request.MBps": 0.000,
+ "http.exception.count": 0,
+ "http.response.count": 1,
+ "http.response.bytes": 16,
+ "http.response.MBps": 0.000,
+ "http.response.error.count": 0,
+ "http.response.latency.millis.min": 0,
+ "http.response.latency.millis.avg": 0,
+ "http.response.latency.millis.max": 0,
+ "http.response.code.counts": {
+  "200": 1
+ }
+}
+`, stdout.String())
+}
diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go
index b9143a0f7d9..5edfd1136e5 100644
--- a/client/go/internal/cli/cmd/root.go
+++ b/client/go/internal/cli/cmd/root.go
@@ -277,6 +277,7 @@ func (c *CLI) configureCommands() {
 	rootCmd.AddCommand(newTestCmd(c)) // test
 	rootCmd.AddCommand(newVersionCmd(c)) // version
 	rootCmd.AddCommand(newVisitCmd(c)) // visit
+	rootCmd.AddCommand(newFeedCmd(c)) // feed
 }
 
 func (c *CLI) printErr(err error, hints ...string) {