aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-06-29 15:47:57 +0200
committerMartin Polden <mpolden@mpolden.no>2023-06-29 15:47:57 +0200
commit9d7ac9e21db6df4b2b7a89f6513be5fcddded76c (patch)
treee9a386985f106fa8e3791f9f74af0efa03feb8e2 /client/go
parent826edc1dc433eb54b15ba414f16dff20f8d0586d (diff)
Let decoding and en-queuing errors bubble up
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/cli/cmd/feed.go57
-rw-r--r--client/go/internal/cli/cmd/feed_test.go48
2 files changed, 79 insertions, 26 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index cad3568a89f..fb01998b83f 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -147,7 +147,7 @@ func (opts feedOptions) compressionMode() (document.Compression, error) {
return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`)
}
-func feedFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) {
+func enqueueFromFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) error {
for _, name := range files {
var r io.ReadCloser
if len(files) == 1 && name == "-" {
@@ -160,11 +160,14 @@ func feedFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) {
}
r = f
}
- dispatchFrom(r, dispatcher, cli)
+ if err := enqueueFrom(r, dispatcher, cli); err != nil {
+ return err
+ }
}
+ return nil
}
-func dispatchFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) {
+func enqueueFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) error {
dec := document.NewDecoder(bufio.NewReaderSize(r, 1<<26)) // Buffer up to 64M of data at a time
defer r.Close()
for {
@@ -173,12 +176,27 @@ func dispatchFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) {
break
}
if err != nil {
- cli.printErr(fmt.Errorf("failed to decode document: %w", err))
+ return fmt.Errorf("failed to decode document: %w", err)
}
if err := dispatcher.Enqueue(doc); err != nil {
- cli.printErr(err)
+ return err
+ }
+ }
+ return nil
+}
+
+func enqueueAndWait(files []string, dispatcher *document.Dispatcher, options feedOptions, cli *CLI) error {
+ defer dispatcher.Close()
+ if options.speedtestBytes > 0 {
+ if len(files) > 0 {
+ return fmt.Errorf("option --speedtest cannot be combined with feed files")
}
+ gen := document.NewGenerator(options.speedtestBytes, cli.now().Add(time.Duration(options.speedtestSecs)*time.Second))
+ return enqueueFrom(io.NopCloser(gen), dispatcher, cli)
+ } else if len(files) > 0 {
+ return enqueueFromFiles(files, dispatcher, cli)
}
+ return fmt.Errorf("at least one file to feed from must specified")
}
func feed(files []string, options feedOptions, cli *CLI) error {
@@ -191,14 +209,13 @@ func feed(files []string, options feedOptions, cli *CLI) error {
if err != nil {
return err
}
- speedtest := options.speedtestBytes > 0
client, err := document.NewClient(document.ClientOptions{
Compression: compression,
Timeout: timeout,
Route: options.route,
TraceLevel: options.traceLevel,
BaseURL: baseURL,
- Speedtest: speedtest,
+ Speedtest: options.speedtestBytes > 0,
NowFunc: cli.now,
}, clients)
if err != nil {
@@ -209,26 +226,14 @@ func feed(files []string, options feedOptions, cli *CLI) error {
dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
start := cli.now()
summaryTicker := summaryTicker(options.summarySecs, cli, start, dispatcher.Stats)
- if speedtest {
- if len(files) > 0 {
- return fmt.Errorf("option --speedtest cannot be combined with feed files")
+ defer func() {
+ if summaryTicker != nil {
+ summaryTicker.Stop()
}
- gen := document.NewGenerator(options.speedtestBytes, cli.now().Add(time.Duration(options.speedtestSecs)*time.Second))
- dispatchFrom(io.NopCloser(gen), dispatcher, cli)
- } else if len(files) > 0 {
- feedFiles(files, dispatcher, cli)
- } else {
- dispatcher.Close()
- return fmt.Errorf("at least one file to feed from must specified")
- }
- if err := dispatcher.Close(); err != nil {
- return err
- }
- if summaryTicker != nil {
- summaryTicker.Stop()
- }
- elapsed := cli.now().Sub(start)
- return writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed)
+ elapsed := cli.now().Sub(start)
+ writeSummaryJSON(cli.Stdout, dispatcher.Stats(), elapsed)
+ }()
+ return enqueueAndWait(files, dispatcher, options, cli)
}
type number float32
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
index 96df24f2035..d1904ad815e 100644
--- a/client/go/internal/cli/cmd/feed_test.go
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -88,3 +88,51 @@ func TestFeed(t *testing.T) {
require.Nil(t, cli.Run("feed", jsonFile1))
assert.Equal(t, "feed: got error \"something else is broken\" (no body) for put id:ns:type::doc1: retrying\n", stderr.String())
}
+
+func TestFeedInvalid(t *testing.T) {
+ clock := &manualClock{tick: time.Second}
+ cli, stdout, stderr := newTestCLI(t)
+ httpClient := cli.httpClient.(*mock.HTTPClient)
+ httpClient.ReadBody = true
+ cli.now = clock.now
+
+ td := t.TempDir()
+ doc := []byte(`
+{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+},
+{
+ "put": "id:ns:type::doc2",
+ "fields": {"foo": "invalid json"
+}`)
+ jsonFile := filepath.Join(td, "docs.jsonl")
+ require.Nil(t, os.WriteFile(jsonFile, doc, 0644))
+ httpClient.NextResponseString(200, `{"message":"OK"}`)
+ require.NotNil(t, cli.Run("feed", jsonFile))
+
+ want := `{
+ "feeder.seconds": 3.000,
+ "feeder.ok.count": 1,
+ "feeder.ok.rate": 0.333,
+ "feeder.error.count": 0,
+ "feeder.inflight.count": 0,
+ "http.request.count": 1,
+ "http.request.bytes": 25,
+ "http.request.MBps": 0.000,
+ "http.exception.count": 0,
+ "http.response.count": 1,
+ "http.response.bytes": 16,
+ "http.response.MBps": 0.000,
+ "http.response.error.count": 0,
+ "http.response.latency.millis.min": 1000,
+ "http.response.latency.millis.avg": 1000,
+ "http.response.latency.millis.max": 1000,
+ "http.response.code.counts": {
+ "200": 1
+ }
+}
+`
+ assert.Equal(t, want, stdout.String())
+ assert.Contains(t, stderr.String(), "Error: failed to decode document")
+}