diff options
author | Martin Polden <mpolden@mpolden.no> | 2024-06-20 09:58:55 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-20 09:58:55 +0200 |
commit | 4c5c25cdd097424c2596b2202f902e0c3fa9b91c (patch) | |
tree | 3f08d4f19f04b6714fe04fee8d39afe6cccde191 /client/go | |
parent | 7954a76d91d47fae17ec5c03705aef5bc87745b9 (diff) | |
parent | 9539043519f7180370442149c6cf9ab677c56b52 (diff) |
Merge pull request #31604 from vespa-engine/mpolden/handle-large-sse
Handle larger SSE events
Diffstat (limited to 'client/go')
-rw-r--r-- | client/go/internal/cli/cmd/query.go | 11 | ||||
-rw-r--r-- | client/go/internal/sse/sse.go | 9 | ||||
-rw-r--r-- | client/go/internal/sse/sse_test.go | 7 |
3 files changed, 20 insertions, 7 deletions
diff --git a/client/go/internal/cli/cmd/query.go b/client/go/internal/cli/cmd/query.go index 54bbc5fb59a..4d5941943ca 100644 --- a/client/go/internal/cli/cmd/query.go +++ b/client/go/internal/cli/cmd/query.go @@ -5,7 +5,6 @@ package cmd import ( - "bufio" "encoding/json" "fmt" "io" @@ -146,13 +145,11 @@ type printOptions struct { func printResponseBody(body io.Reader, options printOptions, cli *CLI) error { if options.plainStream { - scanner := bufio.NewScanner(body) - for scanner.Scan() { - fmt.Fprintln(cli.Stdout, scanner.Text()) - } - return scanner.Err() + _, err := io.Copy(cli.Stdout, body) + return err } else if options.tokenStream { - dec := sse.NewDecoder(body) + bufSize := 1024 * 1024 // Handle events up to this size + dec := sse.NewDecoderSize(body, bufSize) writingLine := false for { event, err := dec.Decode() diff --git a/client/go/internal/sse/sse.go b/client/go/internal/sse/sse.go index 9a120944eec..caccc90d354 100644 --- a/client/go/internal/sse/sse.go +++ b/client/go/internal/sse/sse.go @@ -87,6 +87,15 @@ func NewDecoder(r io.Reader) *Decoder { return &Decoder{scanner: bufio.NewScanner(r)} } +// NewDecoderSize creates a new Decoder that reads from r. The size argument specifies of the size of the buffer that +// decoder will use when decoding events. Size must be large enough to fit the largest expected event. +func NewDecoderSize(r io.Reader, size int) *Decoder { + scanner := bufio.NewScanner(r) + buf := make([]byte, 0, size) + scanner.Buffer(buf, size) + return &Decoder{scanner: scanner} +} + // IsEnd returns whether this event indicates that the stream has ended. func (e Event) IsEnd() bool { return e.Name == "end" } diff --git a/client/go/internal/sse/sse_test.go b/client/go/internal/sse/sse_test.go index 0e0d6929c75..3e3decaacec 100644 --- a/client/go/internal/sse/sse_test.go +++ b/client/go/internal/sse/sse_test.go @@ -41,6 +41,13 @@ event: end assertDecodeErr(io.EOF, dec, t) } +func TestDecoderLarge(t *testing.T) { + data := strings.Repeat("c", (256*1024)-50) + r := strings.NewReader("event: foo\nid: 42\ndata: " + data + "\n") + dec := NewDecoderSize(r, 256*1024) + assertDecode(&Event{Name: "foo", ID: "42", Data: data}, dec, t) +} + func TestDecoderInvalid(t *testing.T) { r := strings.NewReader(` event: foo |