summaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-24 11:40:16 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-24 11:40:16 +0200
commitbded0891a0fe433afd3a10162edbc7d3d7543bc1 (patch)
treec5cef0575a6772d9258508a40e2857397013ed69 /client
parent9aea5cefe1c766aae46075a7365c163c4403f56e (diff)
Support feeding multiple files
Diffstat (limited to 'client')
-rw-r--r--client/go/internal/cli/cmd/feed.go51
-rw-r--r--client/go/internal/cli/cmd/feed_test.go39
2 files changed, 47 insertions, 43 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 8635f4aa41b..9d90233e62a 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -60,22 +60,11 @@ If FILE is a single dash ('-'), documents will be read from standard input.
Example: `$ vespa feed documents.jsonl
$ cat documents.jsonl | vespa feed -
`,
- Args: cobra.ExactArgs(1),
+ Args: cobra.MinimumNArgs(1),
DisableAutoGenTag: true,
SilenceUsage: true,
Hidden: true, // TODO(mpolden): Remove when ready for public use
RunE: func(cmd *cobra.Command, args []string) error {
- var r io.Reader
- if args[0] == "-" {
- r = cli.Stdin
- } else {
- f, err := os.Open(args[0])
- if err != nil {
- return err
- }
- defer f.Close()
- r = f
- }
if options.cpuprofile != "" {
f, err := os.Create(options.cpuprofile)
if err != nil {
@@ -84,7 +73,7 @@ $ cat documents.jsonl | vespa feed -
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
- err := feed(r, cli, options)
+ err := feed(args, options, cli)
if options.memprofile != "" {
f, err := os.Create(options.memprofile)
if err != nil {
@@ -123,7 +112,7 @@ func (opts feedOptions) compressionMode() (document.Compression, error) {
return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`)
}
-func feed(r io.Reader, cli *CLI, options feedOptions) error {
+func feed(files []string, options feedOptions, cli *CLI) error {
service, err := documentService(cli)
if err != nil {
return err
@@ -145,20 +134,32 @@ func feed(r io.Reader, cli *CLI, options feedOptions) error {
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
- dec := document.NewDecoder(r)
-
start := cli.now()
- for {
- doc, err := dec.Decode()
- if err == io.EOF {
- break
- }
- if err != nil {
- cli.printErr(fmt.Errorf("failed to decode document: %w", err))
+ for _, name := range files {
+ var r io.ReadCloser
+ if len(files) == 1 && name == "-" {
+ r = io.NopCloser(cli.Stdin)
+ } else {
+ f, err := os.Open(name)
+ if err != nil {
+ return err
+ }
+ r = f
}
- if err := dispatcher.Enqueue(doc); err != nil {
- cli.printErr(err)
+ dec := document.NewDecoder(r)
+ for {
+ doc, err := dec.Decode()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ cli.printErr(fmt.Errorf("failed to decode document: %w", err))
+ }
+ if err := dispatcher.Enqueue(doc); err != nil {
+ cli.printErr(err)
+ }
}
+ r.Close()
}
if err := dispatcher.Close(); err != nil {
return err
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
index eb641005ab7..467d55a0a6e 100644
--- a/client/go/internal/cli/cmd/feed_test.go
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -31,47 +31,50 @@ func TestFeed(t *testing.T) {
cli.now = clock.now
td := t.TempDir()
- jsonFile := filepath.Join(td, "docs.jsonl")
- err := os.WriteFile(jsonFile, []byte(`{
+ doc := []byte(`{
"put": "id:ns:type::doc1",
"fields": {"foo": "123"}
-}`), 0644)
-
- require.Nil(t, err)
+}`)
+ jsonFile1 := filepath.Join(td, "docs1.jsonl")
+ jsonFile2 := filepath.Join(td, "docs2.jsonl")
+ require.Nil(t, os.WriteFile(jsonFile1, doc, 0644))
+ require.Nil(t, os.WriteFile(jsonFile2, doc, 0644))
httpClient.NextResponseString(200, `{"message":"OK"}`)
- require.Nil(t, cli.Run("feed", jsonFile))
+ httpClient.NextResponseString(200, `{"message":"OK"}`)
+ require.Nil(t, cli.Run("feed", jsonFile1, jsonFile2))
assert.Equal(t, "", stderr.String())
want := `{
- "feeder.seconds": 3.000,
- "feeder.ok.count": 1,
- "feeder.ok.rate": 0.333,
+ "feeder.seconds": 5.000,
+ "feeder.ok.count": 2,
+ "feeder.ok.rate": 0.400,
"feeder.error.count": 0,
"feeder.inflight.count": 0,
- "http.request.count": 1,
- "http.request.bytes": 25,
+ "http.request.count": 2,
+ "http.request.bytes": 50,
"http.request.MBps": 0.000,
"http.exception.count": 0,
- "http.response.count": 1,
- "http.response.bytes": 16,
+ "http.response.count": 2,
+ "http.response.bytes": 32,
"http.response.MBps": 0.000,
"http.response.error.count": 0,
"http.response.latency.millis.min": 1000,
"http.response.latency.millis.avg": 1000,
"http.response.latency.millis.max": 1000,
"http.response.code.counts": {
- "200": 1
+ "200": 2
}
}
`
assert.Equal(t, want, stdout.String())
stdout.Reset()
- cli.Stdin = bytes.NewBuffer([]byte(`{
- "put": "id:ns:type::doc1",
- "fields": {"foo": "123"}
-}`))
+ var stdinBuf bytes.Buffer
+ stdinBuf.Write(doc)
+ stdinBuf.Write(doc)
+ cli.Stdin = &stdinBuf
+ httpClient.NextResponseString(200, `{"message":"OK"}`)
httpClient.NextResponseString(200, `{"message":"OK"}`)
require.Nil(t, cli.Run("feed", "-"))
assert.Equal(t, want, stdout.String())