summaryrefslogtreecommitdiffstats
path: root/client/go/internal/sse/sse.go
blob: a056e4a598a334301348732933f593acbb39d7ad (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package sse

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Event represents a server-sent event.
//
// Name holds the value of the event's "event" field and ID the value of its
// "id" field. Both are optional and are left empty when the field is absent.
// Data holds the values of the event's "data" fields; when an event has
// several of them, Decoder.Decode joins the values with a single space.
type Event struct {
	Name string
	ID   string
	Data string
}

// Decoder reads and decodes server-sent events from an input stream, one
// event per call to Decode.
//
// Use NewDecoder to create a Decoder: the zero value has a nil scanner and
// therefore nothing to read from.
type Decoder struct {
	// scanner supplies the input to Decode one line at a time.
	scanner *bufio.Scanner
}

// Decode reads and decodes the next event from the underlying reader.
//
// An event is a run of non-empty lines terminated by a blank line or by the
// end of the input. Every line is trimmed of surrounding whitespace and split
// at the first ": " into a field name and a value:
//
//   - "event" sets Event.Name and may appear at most once per event.
//   - "id" sets Event.ID and may appear at most once per event.
//   - "data" may appear several times; the values are joined with a single
//     space to form Event.Data.
//
// Lines starting with ':' are comments. They are skipped and never start an
// event, so a block consisting only of comments (commonly sent as a
// keep-alive) is passed over instead of being reported as an event without
// data. Any other line lacking the ": " separator contributes no field, but
// still counts as part of an event.
//
// Decode returns io.EOF when the input ends before another event has started.
// It returns a different non-nil error if reading fails, if a field name
// other than the three above is found, if "event" or "id" is repeated within
// one event, or if an event has no "data" line and is not an end event (see
// Event.IsEnd). The returned *Event is nil whenever the error is non-nil.
func (d *Decoder) Decode() (*Event, error) {
	// https://www.rfc-editor.org/rfc/rfc8895.html#name-server-push-server-sent-eve
	var (
		event    Event
		data     strings.Builder
		lastRead string
		gotName  bool
		gotID    bool
		gotData  bool
		decoding bool
	)
	for d.scanner.Scan() {
		line := strings.TrimSpace(d.scanner.Text())
		if line == "" {
			if decoding {
				break // Blank line ends the event being decoded
			}
			continue // Still waiting for the first line of an event
		}
		lastRead = line // Only used to give context in error messages
		if strings.HasPrefix(line, ":") {
			// Comment line. It must not mark an event as started: a block
			// holding nothing but comments would otherwise end up being
			// rejected below as an event without data.
			continue
		}
		decoding = true
		parts := strings.SplitN(line, ": ", 2)
		if len(parts) < 2 {
			continue // No "name: value" separator, nothing to record
		}
		switch parts[0] {
		case "event":
			if gotName {
				return nil, fmt.Errorf("got more than one event line: last read %q", lastRead)
			}
			event.Name = parts[1]
			gotName = true
		case "id":
			if gotID {
				return nil, fmt.Errorf("got more than one id line: last read %q", lastRead)
			}
			event.ID = parts[1]
			gotID = true
		case "data":
			// Consecutive data lines are joined by a single space.
			if data.Len() > 0 {
				data.WriteString(" ")
			}
			data.WriteString(parts[1])
			gotData = true
		default:
			return nil, fmt.Errorf("invalid field name %q: last read %q", parts[0], lastRead)
		}
	}
	// Scan also returns false on read errors, so tell those apart from a
	// clean end of input before interpreting what was collected.
	if err := d.scanner.Err(); err != nil {
		return nil, err
	}
	if !decoding {
		return nil, io.EOF
	}
	// Only the end-of-stream event is allowed to carry no data.
	if !event.IsEnd() && !gotData {
		return nil, fmt.Errorf("no data line found for event: last read %q", lastRead)
	}
	event.Data = data.String()
	return &event, nil
}

// NewDecoder creates a new Decoder that reads from r.
//
// Input is consumed line by line. A single line may be up to 1 MiB long
// rather than the 64 KiB default of bufio.Scanner, so that events carrying
// large data payloads can be decoded. The read buffer starts small and only
// grows when a long line is encountered. Reading a line that exceeds the
// limit fails with bufio.ErrTooLong.
func NewDecoder(r io.Reader) *Decoder {
	const (
		initialBufSize = 4096
		maxLineSize    = 1 << 20
	)
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, initialBufSize), maxLineSize)
	return &Decoder{scanner: scanner}
}

// IsEnd reports whether e marks the end of the event stream, which is the
// case exactly when its name is "end".
func (e Event) IsEnd() bool {
	const endName = "end"
	return endName == e.Name
}