aboutsummaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
author Martin Polden <mpolden@mpolden.no> 2023-03-20 15:55:10 +0100
committer Martin Polden <mpolden@mpolden.no> 2023-03-23 12:14:24 +0100
commit 88cdc07df2a58388120a7a91a58bdafac31d64b4 (patch)
tree 764defb986e883b778dd5c55b46a1dcc48b046ff /client
parent be8965384717caac4a13c741a2927ca13f827074 (diff)
Initial feed frontend
Diffstat (limited to 'client')
-rw-r--r-- client/go/internal/cli/cmd/feed.go 143
-rw-r--r-- client/go/internal/cli/cmd/feed_test.go 67
-rw-r--r-- client/go/internal/cli/cmd/root.go 1
3 files changed, 211 insertions, 0 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
new file mode 100644
index 00000000000..621676d0353
--- /dev/null
+++ b/client/go/internal/cli/cmd/feed.go
@@ -0,0 +1,143 @@
+package cmd
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "time"
+
+ "github.com/spf13/cobra"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa/document"
+)
+
+// addFeedFlags registers the persistent feed tuning flags on cmd and binds
+// them to the given destinations: --max-connections/-N (default 8) and
+// --concurrency/-T (default 64).
+func addFeedFlags(cmd *cobra.Command, maxConnections *int, concurrency *int) {
+	flags := cmd.PersistentFlags()
+	flags.IntVarP(maxConnections, "max-connections", "N", 8, "Maximum number of HTTP connections to use")
+	flags.IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
+}
+
+// newFeedCmd builds the "feed" command. The command takes exactly one
+// argument, the path of the file to feed, and is hidden from help output for
+// now. Flag values registered through addFeedFlags are forwarded to feed.
+func newFeedCmd(cli *CLI) *cobra.Command {
+	var maxConnections, concurrency int
+
+	// run opens the FILE argument and hands it to feed; the file is closed
+	// when feeding returns.
+	run := func(_ *cobra.Command, args []string) error {
+		file, err := os.Open(args[0])
+		if err != nil {
+			return err
+		}
+		defer file.Close()
+		return feed(file, cli, maxConnections, concurrency)
+	}
+
+	feedCmd := &cobra.Command{
+		Use:   "feed FILE",
+		Short: "Feed documents to a Vespa cluster",
+		Long: `Feed documents to a Vespa cluster.
+
+A high performance feeding client. This can be used to feed large amounts of
+documents to Vespa cluster efficiently.
+
+The contents of FILE must be either a JSON array or JSON objects separated by
+newline (JSONL).
+`,
+		Example: `$ vespa feed documents.jsonl
+`,
+		Args:              cobra.ExactArgs(1),
+		DisableAutoGenTag: true,
+		SilenceUsage:      true,
+		Hidden:            true, // TODO(mpolden): Remove when ready for public use
+		RunE:              run,
+	}
+	addFeedFlags(feedCmd, &maxConnections, &concurrency)
+	return feedCmd
+}
+
+// feed decodes documents from r, enqueues each one on a dispatcher backed by
+// a document client for the service resolved from cli, and finally writes a
+// JSON summary of the client statistics to cli.Stdout.
+//
+// maxConnections is passed to the client as MaxConnsPerHost and concurrency is
+// passed to the dispatcher. Reading stops at io.EOF or at the first decode
+// error. A decode error is reported through cli.printErr and the value
+// returned alongside it is not enqueued. Errors from enqueueing a single
+// document are reported through cli.printErr and feeding continues.
+//
+// The returned error is non-nil when the document service cannot be resolved,
+// when closing the dispatcher fails, or when the summary cannot be written.
+func feed(r io.Reader, cli *CLI, maxConnections, concurrency int) error {
+	service, err := documentService(cli)
+	if err != nil {
+		return err
+	}
+	client := document.NewClient(document.ClientOptions{
+		BaseURL:         service.BaseURL,
+		MaxConnsPerHost: maxConnections,
+	}, service)
+	dispatcher := document.NewDispatcher(client, concurrency)
+	dec := document.NewDecoder(r)
+
+	start := cli.now()
+	for {
+		doc, err := dec.Decode()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			cli.printErr(fmt.Errorf("failed to decode document: %w", err))
+			// Stop reading: the value returned with an error must not be
+			// fed, and retrying Decode could repeat the same error forever.
+			// NOTE(review): assumes the decoder cannot resynchronize after
+			// malformed input - confirm against document.Decoder.
+			break
+		}
+		if err := dispatcher.Enqueue(doc); err != nil {
+			cli.printErr(err)
+		}
+	}
+	// Close the dispatcher before reading statistics from the client.
+	if err := dispatcher.Close(); err != nil {
+		return err
+	}
+	elapsed := cli.now().Sub(start)
+	return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed)
+}
+
+// number is a floating-point value that is always encoded as JSON with
+// exactly three decimals. It is backed by float64 so that the third decimal
+// stays accurate for large values; a float32 only carries about seven
+// significant digits.
+type number float64
+
+// MarshalJSON implements json.Marshaler by formatting n with three decimals.
+// It never returns an error.
+func (n number) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf("%.3f", float64(n))), nil
+}
+
+// feedSummary is the set of statistics written as JSON after a feed. The JSON
+// key of each field is given by its struct tag, and encoding/json emits the
+// keys in the order the fields are declared here.
+type feedSummary struct {
+ // Feeder-level totals.
+ Seconds number `json:"feeder.seconds"`
+ SuccessCount int64 `json:"feeder.ok.count"`
+ SuccessRate number `json:"feeder.ok.rate"`
+ ErrorCount int64 `json:"feeder.error.count"`
+ InflightCount int64 `json:"feeder.inflight.count"`
+
+ // HTTP request totals.
+ RequestCount int64 `json:"http.request.count"`
+ RequestBytes int64 `json:"http.request.bytes"`
+ RequestRate number `json:"http.request.MBps"`
+ ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatibility with vespa-feed-client
+
+ // HTTP response totals.
+ ResponseCount int64 `json:"http.response.count"`
+ ResponseBytes int64 `json:"http.response.bytes"`
+ ResponseRate number `json:"http.response.MBps"`
+ ResponseErrorCount int64 `json:"http.response.error.count"`
+
+ // Response latencies in milliseconds and the number of responses per status code.
+ ResponseMinLatency int64 `json:"http.response.latency.millis.min"`
+ ResponseAvgLatency int64 `json:"http.response.latency.millis.avg"`
+ ResponseMaxLatency int64 `json:"http.response.latency.millis.max"`
+ ResponseCodeCounts map[int]int64 `json:"http.response.code.counts"`
+}
+
+// mbps returns the transfer rate in megabytes (10^6 bytes) per second for
+// bytes transferred over duration. Durations shorter than one second are
+// treated as one second.
+func mbps(bytes int64, duration time.Duration) float64 {
+	megabytes := float64(bytes) / 1000 / 1000
+	seconds := math.Max(1, duration.Seconds())
+	return megabytes / seconds
+}
+
+func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error {
+ summary := feedSummary{
+ Seconds: number(duration.Seconds()),
+ SuccessCount: stats.Successes(),
+ SuccessRate: number(float64(stats.Successes()) / math.Max(1, duration.Seconds())),
+ ErrorCount: stats.Errors,
+ InflightCount: stats.Inflight,
+
+ RequestCount: stats.Requests,
+ RequestBytes: stats.BytesSent,
+ RequestRate: number(mbps(stats.BytesSent, duration)),
+ ExceptionCount: stats.Errors,
+
+ ResponseCount: stats.Responses,
+ ResponseBytes: stats.BytesRecv,
+ ResponseRate: number(mbps(stats.BytesRecv, duration)),
+ ResponseErrorCount: stats.Responses - stats.Successes(),
+ ResponseMinLatency: stats.MinLatency.Milliseconds(),
+ ResponseAvgLatency: stats.AvgLatency().Milliseconds(),
+ ResponseMaxLatency: stats.MaxLatency.Milliseconds(),
+ ResponseCodeCounts: stats.ResponsesByCode,
+ }
+ enc := json.NewEncoder(w)
+ enc.SetIndent("", " ")
+ return enc.Encode(summary)
+}
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
new file mode 100644
index 00000000000..1bf1ef6ab9b
--- /dev/null
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -0,0 +1,67 @@
+package cmd
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/vespa-engine/vespa/client/go/internal/mock"
+)
+
+// manualClock is a deterministic clock for tests. Each call to now returns
+// the current time and then moves the clock forward by tick.
+type manualClock struct {
+	t    time.Time
+	tick time.Duration
+}
+
+// now returns the time held by the clock before advancing it by c.tick.
+func (c *manualClock) now() time.Time {
+	current := c.t
+	c.t = current.Add(c.tick)
+	return current
+}
+
+func TestFeed(t *testing.T) {
+ httpClient := &mock.HTTPClient{}
+ clock := &manualClock{tick: time.Second}
+ cli, stdout, stderr := newTestCLI(t)
+ cli.httpClient = httpClient
+ cli.now = clock.now
+
+ td := t.TempDir()
+ jsonFile := filepath.Join(td, "docs.jsonl")
+ err := os.WriteFile(jsonFile, []byte(`{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+}`), 0644)
+
+ require.Nil(t, err)
+
+ httpClient.NextResponseString(200, `{"message":"OK"}`)
+ require.Nil(t, cli.Run("feed", jsonFile))
+
+ assert.Equal(t, "", stderr.String())
+ assert.Equal(t, `{
+ "feeder.seconds": 1.000,
+ "feeder.ok.count": 1,
+ "feeder.ok.rate": 1.000,
+ "feeder.error.count": 0,
+ "feeder.inflight.count": 0,
+ "http.request.count": 1,
+ "http.request.bytes": 25,
+ "http.request.MBps": 0.000,
+ "http.exception.count": 0,
+ "http.response.count": 1,
+ "http.response.bytes": 16,
+ "http.response.MBps": 0.000,
+ "http.response.error.count": 0,
+ "http.response.latency.millis.min": 0,
+ "http.response.latency.millis.avg": 0,
+ "http.response.latency.millis.max": 0,
+ "http.response.code.counts": {
+ "200": 1
+ }
+}
+`, stdout.String())
+}
diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go
index b9143a0f7d9..5edfd1136e5 100644
--- a/client/go/internal/cli/cmd/root.go
+++ b/client/go/internal/cli/cmd/root.go
@@ -277,6 +277,7 @@ func (c *CLI) configureCommands() {
rootCmd.AddCommand(newTestCmd(c)) // test
rootCmd.AddCommand(newVersionCmd(c)) // version
rootCmd.AddCommand(newVisitCmd(c)) // visit
+ rootCmd.AddCommand(newFeedCmd(c)) // feed
}
func (c *CLI) printErr(err error, hints ...string) {