aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2024-06-20 09:58:55 +0200
committerGitHub <noreply@github.com>2024-06-20 09:58:55 +0200
commit4c5c25cdd097424c2596b2202f902e0c3fa9b91c (patch)
tree3f08d4f19f04b6714fe04fee8d39afe6cccde191 /client/go
parent7954a76d91d47fae17ec5c03705aef5bc87745b9 (diff)
parent9539043519f7180370442149c6cf9ab677c56b52 (diff)
Merge pull request #31604 from vespa-engine/mpolden/handle-large-sse
Handle larger SSE events
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/cli/cmd/query.go11
-rw-r--r--client/go/internal/sse/sse.go9
-rw-r--r--client/go/internal/sse/sse_test.go7
3 files changed, 20 insertions, 7 deletions
diff --git a/client/go/internal/cli/cmd/query.go b/client/go/internal/cli/cmd/query.go
index 54bbc5fb59a..4d5941943ca 100644
--- a/client/go/internal/cli/cmd/query.go
+++ b/client/go/internal/cli/cmd/query.go
@@ -5,7 +5,6 @@
package cmd
import (
- "bufio"
"encoding/json"
"fmt"
"io"
@@ -146,13 +145,11 @@ type printOptions struct {
func printResponseBody(body io.Reader, options printOptions, cli *CLI) error {
if options.plainStream {
- scanner := bufio.NewScanner(body)
- for scanner.Scan() {
- fmt.Fprintln(cli.Stdout, scanner.Text())
- }
- return scanner.Err()
+ _, err := io.Copy(cli.Stdout, body)
+ return err
} else if options.tokenStream {
- dec := sse.NewDecoder(body)
+ bufSize := 1024 * 1024 // Handle events up to this size
+ dec := sse.NewDecoderSize(body, bufSize)
writingLine := false
for {
event, err := dec.Decode()
diff --git a/client/go/internal/sse/sse.go b/client/go/internal/sse/sse.go
index 9a120944eec..caccc90d354 100644
--- a/client/go/internal/sse/sse.go
+++ b/client/go/internal/sse/sse.go
@@ -87,6 +87,15 @@ func NewDecoder(r io.Reader) *Decoder {
return &Decoder{scanner: bufio.NewScanner(r)}
}
+// NewDecoderSize creates a new Decoder that reads from r. The size argument specifies of the size of the buffer that
+// decoder will use when decoding events. Size must be large enough to fit the largest expected event.
+func NewDecoderSize(r io.Reader, size int) *Decoder {
+ scanner := bufio.NewScanner(r)
+ buf := make([]byte, 0, size)
+ scanner.Buffer(buf, size)
+ return &Decoder{scanner: scanner}
+}
+
// IsEnd returns whether this event indicates that the stream has ended.
func (e Event) IsEnd() bool { return e.Name == "end" }
diff --git a/client/go/internal/sse/sse_test.go b/client/go/internal/sse/sse_test.go
index 0e0d6929c75..3e3decaacec 100644
--- a/client/go/internal/sse/sse_test.go
+++ b/client/go/internal/sse/sse_test.go
@@ -41,6 +41,13 @@ event: end
assertDecodeErr(io.EOF, dec, t)
}
+func TestDecoderLarge(t *testing.T) {
+ data := strings.Repeat("c", (256*1024)-50)
+ r := strings.NewReader("event: foo\nid: 42\ndata: " + data + "\n")
+ dec := NewDecoderSize(r, 256*1024)
+ assertDecode(&Event{Name: "foo", ID: "42", Data: data}, dec, t)
+}
+
func TestDecoderInvalid(t *testing.T) {
r := strings.NewReader(`
event: foo