aboutsummaryrefslogtreecommitdiffstats
path: root/client/go/internal
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2024-06-18 12:09:43 +0200
committerMartin Polden <mpolden@mpolden.no>2024-06-18 12:10:47 +0200
commit9539043519f7180370442149c6cf9ab677c56b52 (patch)
tree6cee7efc8c4d33c34b7d7d449d8e2cdaac5f748e /client/go/internal
parentba2f41af2496845f962fb81616a9b0f5269ec604 (diff)
Increase size of SSE decoder buffer
Diffstat (limited to 'client/go/internal')
-rw-r--r--client/go/internal/cli/cmd/query.go3
-rw-r--r--client/go/internal/sse/sse.go9
-rw-r--r--client/go/internal/sse/sse_test.go7
3 files changed, 18 insertions, 1 deletions
diff --git a/client/go/internal/cli/cmd/query.go b/client/go/internal/cli/cmd/query.go
index d43ae8d3308..9c230a721fd 100644
--- a/client/go/internal/cli/cmd/query.go
+++ b/client/go/internal/cli/cmd/query.go
@@ -161,7 +161,8 @@ func printResponseBody(body io.Reader, options printOptions, cli *CLI) error {
_, err := io.Copy(cli.Stdout, body)
return err
} else if options.tokenStream {
- dec := sse.NewDecoder(body)
+ bufSize := 1024 * 1024 // Handle events up to this size
+ dec := sse.NewDecoderSize(body, bufSize)
writingLine := false
for {
event, err := dec.Decode()
diff --git a/client/go/internal/sse/sse.go b/client/go/internal/sse/sse.go
index 9a120944eec..caccc90d354 100644
--- a/client/go/internal/sse/sse.go
+++ b/client/go/internal/sse/sse.go
@@ -87,6 +87,15 @@ func NewDecoder(r io.Reader) *Decoder {
return &Decoder{scanner: bufio.NewScanner(r)}
}
+// NewDecoderSize creates a new Decoder that reads from r. The size argument specifies of the size of the buffer that
+// decoder will use when decoding events. Size must be large enough to fit the largest expected event.
+func NewDecoderSize(r io.Reader, size int) *Decoder {
+ scanner := bufio.NewScanner(r)
+ buf := make([]byte, 0, size)
+ scanner.Buffer(buf, size)
+ return &Decoder{scanner: scanner}
+}
+
// IsEnd returns whether this event indicates that the stream has ended.
func (e Event) IsEnd() bool { return e.Name == "end" }
diff --git a/client/go/internal/sse/sse_test.go b/client/go/internal/sse/sse_test.go
index 0e0d6929c75..3e3decaacec 100644
--- a/client/go/internal/sse/sse_test.go
+++ b/client/go/internal/sse/sse_test.go
@@ -41,6 +41,13 @@ event: end
assertDecodeErr(io.EOF, dec, t)
}
+func TestDecoderLarge(t *testing.T) {
+ data := strings.Repeat("c", (256*1024)-50)
+ r := strings.NewReader("event: foo\nid: 42\ndata: " + data + "\n")
+ dec := NewDecoderSize(r, 256*1024)
+ assertDecode(&Event{Name: "foo", ID: "42", Data: data}, dec, t)
+}
+
func TestDecoderInvalid(t *testing.T) {
r := strings.NewReader(`
event: foo