diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2023-04-24 11:44:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-24 11:44:16 +0200 |
commit | d643ff9ae0fb06cf27f6ba5f51b3db4b25ffa108 (patch) | |
tree | c5cef0575a6772d9258508a40e2857397013ed69 | |
parent | 9aea5cefe1c766aae46075a7365c163c4403f56e (diff) | |
parent | bded0891a0fe433afd3a10162edbc7d3d7543bc1 (diff) |
Merge pull request #26828 from vespa-engine/mpolden/feed-client-10
Support feeding multiple files
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 51 | ||||
-rw-r--r-- | client/go/internal/cli/cmd/feed_test.go | 39 |
2 files changed, 47 insertions, 43 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 8635f4aa41b..9d90233e62a 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -60,22 +60,11 @@ If FILE is a single dash ('-'), documents will be read from standard input. Example: `$ vespa feed documents.jsonl $ cat documents.jsonl | vespa feed - `, - Args: cobra.ExactArgs(1), + Args: cobra.MinimumNArgs(1), DisableAutoGenTag: true, SilenceUsage: true, Hidden: true, // TODO(mpolden): Remove when ready for public use RunE: func(cmd *cobra.Command, args []string) error { - var r io.Reader - if args[0] == "-" { - r = cli.Stdin - } else { - f, err := os.Open(args[0]) - if err != nil { - return err - } - defer f.Close() - r = f - } if options.cpuprofile != "" { f, err := os.Create(options.cpuprofile) if err != nil { @@ -84,7 +73,7 @@ $ cat documents.jsonl | vespa feed - pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } - err := feed(r, cli, options) + err := feed(args, options, cli) if options.memprofile != "" { f, err := os.Create(options.memprofile) if err != nil { @@ -123,7 +112,7 @@ func (opts feedOptions) compressionMode() (document.Compression, error) { return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) } -func feed(r io.Reader, cli *CLI, options feedOptions) error { +func feed(files []string, options feedOptions, cli *CLI) error { service, err := documentService(cli) if err != nil { return err @@ -145,20 +134,32 @@ func feed(r io.Reader, cli *CLI, options feedOptions) error { // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) - dec := document.NewDecoder(r) - start := cli.now() - for { - doc, err := dec.Decode() - if err == io.EOF { - break - } - if err != nil { - cli.printErr(fmt.Errorf("failed to decode document: %w", err)) + for _, name := range files { + var r io.ReadCloser + if len(files) == 1 && name == "-" { + r = io.NopCloser(cli.Stdin) + } else { + f, err := os.Open(name) + if err != nil { + return err + } + r = f } - if err := dispatcher.Enqueue(doc); err != nil { - cli.printErr(err) + dec := document.NewDecoder(r) + for { + doc, err := dec.Decode() + if err == io.EOF { + break + } + if err != nil { + cli.printErr(fmt.Errorf("failed to decode document: %w", err)) + } + if err := dispatcher.Enqueue(doc); err != nil { + cli.printErr(err) + } } + r.Close() } if err := dispatcher.Close(); err != nil { return err diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go index eb641005ab7..467d55a0a6e 100644 --- a/client/go/internal/cli/cmd/feed_test.go +++ b/client/go/internal/cli/cmd/feed_test.go @@ -31,47 +31,50 @@ func TestFeed(t *testing.T) { cli.now = clock.now td := t.TempDir() - jsonFile := filepath.Join(td, "docs.jsonl") - err := os.WriteFile(jsonFile, []byte(`{ + doc := []byte(`{ "put": "id:ns:type::doc1", "fields": {"foo": "123"} -}`), 0644) - - require.Nil(t, err) +}`) + jsonFile1 := filepath.Join(td, "docs1.jsonl") + jsonFile2 := filepath.Join(td, "docs2.jsonl") + require.Nil(t, os.WriteFile(jsonFile1, doc, 0644)) + require.Nil(t, os.WriteFile(jsonFile2, doc, 0644)) httpClient.NextResponseString(200, `{"message":"OK"}`) - require.Nil(t, cli.Run("feed", jsonFile)) + httpClient.NextResponseString(200, `{"message":"OK"}`) + require.Nil(t, cli.Run("feed", jsonFile1, jsonFile2)) assert.Equal(t, "", stderr.String()) want := `{ - "feeder.seconds": 3.000, - "feeder.ok.count": 1, - "feeder.ok.rate": 0.333, + "feeder.seconds": 5.000, + "feeder.ok.count": 2, + "feeder.ok.rate": 0.400, "feeder.error.count": 0, "feeder.inflight.count": 0, - "http.request.count": 1, - "http.request.bytes": 25, + "http.request.count": 2, + "http.request.bytes": 50, "http.request.MBps": 0.000, "http.exception.count": 0, - "http.response.count": 1, - "http.response.bytes": 16, + "http.response.count": 2, + "http.response.bytes": 32, "http.response.MBps": 0.000, "http.response.error.count": 0, "http.response.latency.millis.min": 1000, "http.response.latency.millis.avg": 1000, "http.response.latency.millis.max": 1000, "http.response.code.counts": { - "200": 1 + "200": 2 } } ` assert.Equal(t, want, stdout.String()) stdout.Reset() - cli.Stdin = bytes.NewBuffer([]byte(`{ - "put": "id:ns:type::doc1", - "fields": {"foo": "123"} -}`)) + var stdinBuf bytes.Buffer + stdinBuf.Write(doc) + stdinBuf.Write(doc) + cli.Stdin = &stdinBuf + httpClient.NextResponseString(200, `{"message":"OK"}`) httpClient.NextResponseString(200, `{"message":"OK"}`) require.Nil(t, cli.Run("feed", "-")) assert.Equal(t, want, stdout.String()) |