diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-06-29 15:47:57 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-06-29 15:47:57 +0200 |
commit | 9d7ac9e21db6df4b2b7a89f6513be5fcddded76c (patch) | |
tree | e9a386985f106fa8e3791f9f74af0efa03feb8e2 /client/go | |
parent | 826edc1dc433eb54b15ba414f16dff20f8d0586d (diff) |
Let decoding and en-queuing errors bubble up
Diffstat (limited to 'client/go')
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 57 | ||||
-rw-r--r-- | client/go/internal/cli/cmd/feed_test.go | 48 |
2 files changed, 79 insertions, 26 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index cad3568a89f..fb01998b83f 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -147,7 +147,7 @@ func (opts feedOptions) compressionMode() (document.Compression, error) { return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) } -func feedFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) { +func enqueueFromFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) error { for _, name := range files { var r io.ReadCloser if len(files) == 1 && name == "-" { @@ -160,11 +160,14 @@ func feedFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) { } r = f } - dispatchFrom(r, dispatcher, cli) + if err := enqueueFrom(r, dispatcher, cli); err != nil { + return err + } } + return nil } -func dispatchFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) { +func enqueueFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) error { dec := document.NewDecoder(bufio.NewReaderSize(r, 1<<26)) // Buffer up to 64M of data at a time defer r.Close() for { @@ -173,12 +176,27 @@ func dispatchFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) { break } if err != nil { - cli.printErr(fmt.Errorf("failed to decode document: %w", err)) + return fmt.Errorf("failed to decode document: %w", err) } if err := dispatcher.Enqueue(doc); err != nil { - cli.printErr(err) + return err + } + } + return nil +} + +func enqueueAndWait(files []string, dispatcher *document.Dispatcher, options feedOptions, cli *CLI) error { + defer dispatcher.Close() + if options.speedtestBytes > 0 { + if len(files) > 0 { + return fmt.Errorf("option --speedtest cannot be combined with feed files") } + gen := document.NewGenerator(options.speedtestBytes, cli.now().Add(time.Duration(options.speedtestSecs)*time.Second)) + return enqueueFrom(io.NopCloser(gen), dispatcher, cli) + } else if len(files) > 0 { + return enqueueFromFiles(files, dispatcher, cli) } + return fmt.Errorf("at least one file to feed from must specified") } func feed(files []string, options feedOptions, cli *CLI) error { @@ -191,14 +209,13 @@ func feed(files []string, options feedOptions, cli *CLI) error { if err != nil { return err } - speedtest := options.speedtestBytes > 0 client, err := document.NewClient(document.ClientOptions{ Compression: compression, Timeout: timeout, Route: options.route, TraceLevel: options.traceLevel, BaseURL: baseURL, - Speedtest: speedtest, + Speedtest: options.speedtestBytes > 0, NowFunc: cli.now, }, clients) if err != nil { @@ -209,26 +226,14 @@ func feed(files []string, options feedOptions, cli *CLI) error { dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) start := cli.now() summaryTicker := summaryTicker(options.summarySecs, cli, start, dispatcher.Stats) - if speedtest { - if len(files) > 0 { - return fmt.Errorf("option --speedtest cannot be combined with feed files") + defer func() { + if summaryTicker != nil { + summaryTicker.Stop() } - gen := document.NewGenerator(options.speedtestBytes, cli.now().Add(time.Duration(options.speedtestSecs)*time.Second)) - dispatchFrom(io.NopCloser(gen), dispatcher, cli) - } else if len(files) > 0 { - feedFiles(files, dispatcher, cli) - } else { - dispatcher.Close() - return fmt.Errorf("at least one file to feed from must specified") - } - if err := dispatcher.Close(); err != nil { - return err - } - if summaryTicker != nil { - summaryTicker.Stop() - } - elapsed := cli.now().Sub(start) - return writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed) + elapsed := cli.now().Sub(start) + writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed) + }() + return enqueueAndWait(files, dispatcher, options, cli) } type number float32 diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go index 96df24f2035..d1904ad815e 100644 --- a/client/go/internal/cli/cmd/feed_test.go +++ b/client/go/internal/cli/cmd/feed_test.go @@ -88,3 +88,51 @@ func TestFeed(t *testing.T) { require.Nil(t, cli.Run("feed", jsonFile1)) assert.Equal(t, "feed: got error \"something else is broken\" (no body) for put id:ns:type::doc1: retrying\n", stderr.String()) } + +func TestFeedInvalid(t *testing.T) { + clock := &manualClock{tick: time.Second} + cli, stdout, stderr := newTestCLI(t) + httpClient := cli.httpClient.(*mock.HTTPClient) + httpClient.ReadBody = true + cli.now = clock.now + + td := t.TempDir() + doc := []byte(` +{ + "put": "id:ns:type::doc1", + "fields": {"foo": "123"} +}, +{ + "put": "id:ns:type::doc2", + "fields": {"foo": "invalid json" +}`) + jsonFile := filepath.Join(td, "docs.jsonl") + require.Nil(t, os.WriteFile(jsonFile, doc, 0644)) + httpClient.NextResponseString(200, `{"message":"OK"}`) + require.NotNil(t, cli.Run("feed", jsonFile)) + + want := `{ + "feeder.seconds": 3.000, + "feeder.ok.count": 1, + "feeder.ok.rate": 0.333, + "feeder.error.count": 0, + "feeder.inflight.count": 0, + "http.request.count": 1, + "http.request.bytes": 25, + "http.request.MBps": 0.000, + "http.exception.count": 0, + "http.response.count": 1, + "http.response.bytes": 16, + "http.response.MBps": 0.000, + "http.response.error.count": 0, + "http.response.latency.millis.min": 1000, + "http.response.latency.millis.avg": 1000, + "http.response.latency.millis.max": 1000, + "http.response.code.counts": { + "200": 1 + } +} +` + assert.Equal(t, want, stdout.String()) + assert.Contains(t, stderr.String(), "Error: failed to decode document") +} |