diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-03-17 15:14:57 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-03-23 12:13:46 +0100 |
commit | 4c1bf7ab4ebc5e56bdc438766c66b5f6e2a3c81c (patch) | |
tree | 2d53f51047af9c1302fbfbfd8552dd80925326af /client | |
parent | de4c623fd4ad427bddf5ae267a21a6105b702b27 (diff) |
Collect statistics
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/vespa/feed/feed.go | 64 | ||||
-rw-r--r-- | client/go/internal/vespa/feed/http.go | 61 | ||||
-rw-r--r-- | client/go/internal/vespa/feed/http_test.go | 84 |
3 files changed, 183 insertions, 26 deletions
diff --git a/client/go/internal/vespa/feed/feed.go b/client/go/internal/vespa/feed/feed.go index d042544ae75..ac2dcc4e049 100644 --- a/client/go/internal/vespa/feed/feed.go +++ b/client/go/internal/vespa/feed/feed.go @@ -1,5 +1,9 @@ package feed +import ( + "time" +) + type Status int const ( @@ -29,7 +33,65 @@ type Result struct { // Success returns whether status s is considered a success. func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet } -// Feeder is the interface for code that perform a document operation and return its result. +// Stats represents the summed statistics of a feeder. +type Stats struct { + Requests int64 + Responses int64 + ResponsesByCode map[int]int64 + Errors int64 + Inflight int64 + TotalLatency time.Duration + MinLatency time.Duration + MaxLatency time.Duration + BytesSent int64 + BytesRecv int64 +} + +func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} } + +// AvgLatency returns the average latency for a request. +func (s Stats) AvgLatency() time.Duration { + requests := s.Requests + if requests == 0 { + requests = 1 + } + return s.TotalLatency / time.Duration(requests) +} + +func (s Stats) Successes() int64 { + if s.ResponsesByCode == nil { + return 0 + } + return s.ResponsesByCode[200] +} + +// Add adds all statistics contained in other to this. +func (s *Stats) Add(other Stats) { + s.Requests += other.Requests + s.Responses += other.Responses + for code, count := range other.ResponsesByCode { + _, ok := s.ResponsesByCode[code] + if ok { + s.ResponsesByCode[code] += count + } else { + s.ResponsesByCode[code] = count + } + } + s.Errors += other.Errors + s.Inflight += other.Inflight + s.TotalLatency += other.TotalLatency + if s.MinLatency == 0 || other.MinLatency < s.MinLatency { + s.MinLatency = other.MinLatency + } + if other.MaxLatency > s.MaxLatency { + s.MaxLatency = other.MaxLatency + } + s.BytesSent += other.BytesSent + s.BytesRecv += other.BytesRecv +} + +// Feeder is the interface for code that can feed documents. type Feeder interface { Send(Document) Result + Stats() Stats } diff --git a/client/go/internal/vespa/feed/http.go b/client/go/internal/vespa/feed/http.go index babd6d3d3e7..87630924a3a 100644 --- a/client/go/internal/vespa/feed/http.go +++ b/client/go/internal/vespa/feed/http.go @@ -4,9 +4,11 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "net/url" "strconv" + "sync" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -16,6 +18,9 @@ import ( type Client struct { options ClientOptions httpClient util.HTTPClient + stats Stats + mu sync.Mutex + now func() time.Time } // ClientOptions specifices the configuration options of a feed client. @@ -26,8 +31,24 @@ type ClientOptions struct { TraceLevel *int } +type countingReader struct { + reader io.Reader + bytesRead int64 +} + +func (r *countingReader) Read(p []byte) (int, error) { + n, err := r.reader.Read(p) + r.bytesRead += int64(n) + return n, err +} + func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client { - return &Client{options: options, httpClient: httpClient} + return &Client{ + options: options, + httpClient: httpClient, + stats: NewStats(), + now: time.Now, + } } func (c *Client) queryParams() url.Values { @@ -46,24 +67,50 @@ func (c *Client) queryParams() url.Values { // Send given document the URL configured in this client. func (c *Client) Send(document Document) Result { + start := c.now() + stats := NewStats() + stats.Requests = 1 + defer func() { + latency := c.now().Sub(start) + stats.TotalLatency = latency + stats.MinLatency = latency + stats.MaxLatency = latency + c.addStats(&stats) + }() method, url, err := document.FeedURL(c.options.BaseURL, c.queryParams()) if err != nil { + stats.Errors = 1 return Result{Status: StatusError, Err: err} } body := document.Body() req, err := http.NewRequest(method, url.String(), bytes.NewReader(body)) if err != nil { + stats.Errors = 1 return Result{Status: StatusError, Err: err} } resp, err := c.httpClient.Do(req, c.options.Timeout) if err != nil { + stats.Errors = 1 return Result{Status: StatusTransportFailure, Err: err} } defer resp.Body.Close() - return createResult(document.Id, resp) + stats.Responses = 1 + stats.ResponsesByCode = map[int]int64{ + resp.StatusCode: 1, + } + stats.BytesSent = int64(len(body)) + return c.createResult(document.Id, &stats, resp) } -func createResult(id DocumentId, resp *http.Response) Result { +func (c *Client) Stats() Stats { return c.stats } + +func (c *Client) addStats(stats *Stats) { + c.mu.Lock() + defer c.mu.Unlock() + c.stats.Add(*stats) +} + +func (c *Client) createResult(id DocumentId, stats *Stats, resp *http.Response) Result { result := Result{Id: id} switch resp.StatusCode { case 200: @@ -79,13 +126,17 @@ func createResult(id DocumentId, resp *http.Response) Result { Message string `json:"message"` Trace json.RawMessage `json:"trace"` } - jsonDec := json.NewDecoder(resp.Body) + cr := countingReader{reader: resp.Body} + jsonDec := json.NewDecoder(&cr) if err := jsonDec.Decode(&body); err != nil { result.Status = StatusError result.Err = fmt.Errorf("failed to decode json response: %w", err) - return result } result.Message = body.Message result.Trace = string(body.Trace) + stats.BytesRecv = cr.bytesRead + if !result.Status.Success() { + stats.Errors = 1 + } return result } diff --git a/client/go/internal/vespa/feed/http_test.go b/client/go/internal/vespa/feed/http_test.go index 9d9c1ed1c33..42a350b9279 100644 --- a/client/go/internal/vespa/feed/http_test.go +++ b/client/go/internal/vespa/feed/http_test.go @@ -3,40 +3,84 @@ package feed import ( "bytes" "encoding/json" + "fmt" "io" "net/http" + "reflect" "testing" "time" "github.com/vespa-engine/vespa/client/go/internal/mock" ) +type manualClock struct { + t time.Time + tick time.Duration +} + +func (c *manualClock) now() time.Time { + t := c.t + c.t = c.t.Add(c.tick) + return t +} + func TestClientSend(t *testing.T) { - doc := mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}) + docs := []Document{ + mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}), + mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc2", Fields: json.RawMessage(`{"foo": "456"}`)}), + mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc3", Fields: json.RawMessage(`{"baz": "789"}`)}), + } httpClient := mock.HTTPClient{} - var client Feeder = NewClient(ClientOptions{ + client := NewClient(ClientOptions{ BaseURL: "https://example.com:1337", Timeout: time.Duration(5 * time.Second), }, &httpClient) - httpClient.NextResponseString(200, `{"message":"All good!"}`) - res := client.Send(doc) - if res.Err != nil { - t.Fatalf("got unexpected error %q", res.Err) - } - r := httpClient.LastRequest - if r.Method != http.MethodPut { - t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut) - } - wantURL := "https://example.com:1337/document/v1/ns/type/docid/doc1?create=true&timeout=5000ms" - if r.URL.String() != wantURL { - t.Errorf("got r.URL = %q, want %q", r.URL, wantURL) + clock := manualClock{t: time.Now(), tick: time.Second} + client.now = clock.now + for i, doc := range docs { + if i < 2 { + httpClient.NextResponseString(200, `{"message":"All good!"}`) + } else { + httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) + } + res := client.Send(doc) + if res.Err != nil { + t.Fatalf("got unexpected error %q", res.Err) + } + r := httpClient.LastRequest + if r.Method != http.MethodPut { + t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut) + } + wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific) + if r.URL.String() != wantURL { + t.Errorf("got r.URL = %q, want %q", r.URL, wantURL) + } + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got unexpected error %q", err) + } + wantBody := doc.Body() + if !bytes.Equal(body, wantBody) { + t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody)) + } } - body, err := io.ReadAll(r.Body) - if err != nil { - t.Fatalf("got unexpected error %q", err) + stats := client.Stats() + want := Stats{ + Requests: 3, + Responses: 3, + ResponsesByCode: map[int]int64{ + 200: 2, + 502: 1, + }, + Errors: 1, + Inflight: 0, + TotalLatency: 3 * time.Second, + MinLatency: time.Second, + MaxLatency: time.Second, + BytesSent: 75, + BytesRecv: 82, } - wantBody := []byte(`{"fields":{"foo": "123"}}`) - if !bytes.Equal(body, wantBody) { - t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody)) + if !reflect.DeepEqual(want, stats) { + t.Errorf("got %+v, want %+v", stats, want) } } |