aboutsummaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-03-17 15:14:57 +0100
committerMartin Polden <mpolden@mpolden.no>2023-03-23 12:13:46 +0100
commit4c1bf7ab4ebc5e56bdc438766c66b5f6e2a3c81c (patch)
tree2d53f51047af9c1302fbfbfd8552dd80925326af /client
parentde4c623fd4ad427bddf5ae267a21a6105b702b27 (diff)
Collect statistics
Diffstat (limited to 'client')
-rw-r--r--client/go/internal/vespa/feed/feed.go64
-rw-r--r--client/go/internal/vespa/feed/http.go61
-rw-r--r--client/go/internal/vespa/feed/http_test.go84
3 files changed, 183 insertions, 26 deletions
diff --git a/client/go/internal/vespa/feed/feed.go b/client/go/internal/vespa/feed/feed.go
index d042544ae75..ac2dcc4e049 100644
--- a/client/go/internal/vespa/feed/feed.go
+++ b/client/go/internal/vespa/feed/feed.go
@@ -1,5 +1,9 @@
package feed
+import (
+ "time"
+)
+
type Status int
const (
@@ -29,7 +33,65 @@ type Result struct {
// Success returns whether status s is considered a success.
func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet }
-// Feeder is the interface for code that perform a document operation and return its result.
+// Stats represents the summed statistics of a feeder.
+type Stats struct {
+ Requests int64
+ Responses int64
+ ResponsesByCode map[int]int64
+ Errors int64
+ Inflight int64
+ TotalLatency time.Duration
+ MinLatency time.Duration
+ MaxLatency time.Duration
+ BytesSent int64
+ BytesRecv int64
+}
+
+func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} }
+
+// AvgLatency returns the average latency for a request.
+func (s Stats) AvgLatency() time.Duration {
+ requests := s.Requests
+ if requests == 0 {
+ requests = 1
+ }
+ return s.TotalLatency / time.Duration(requests)
+}
+
+func (s Stats) Successes() int64 {
+ if s.ResponsesByCode == nil {
+ return 0
+ }
+ return s.ResponsesByCode[200]
+}
+
+// Add adds all statistics contained in other to this.
+func (s *Stats) Add(other Stats) {
+ s.Requests += other.Requests
+ s.Responses += other.Responses
+ for code, count := range other.ResponsesByCode {
+ _, ok := s.ResponsesByCode[code]
+ if ok {
+ s.ResponsesByCode[code] += count
+ } else {
+ s.ResponsesByCode[code] = count
+ }
+ }
+ s.Errors += other.Errors
+ s.Inflight += other.Inflight
+ s.TotalLatency += other.TotalLatency
+ if s.MinLatency == 0 || other.MinLatency < s.MinLatency {
+ s.MinLatency = other.MinLatency
+ }
+ if other.MaxLatency > s.MaxLatency {
+ s.MaxLatency = other.MaxLatency
+ }
+ s.BytesSent += other.BytesSent
+ s.BytesRecv += other.BytesRecv
+}
+
+// Feeder is the interface for code that can feed documents.
type Feeder interface {
Send(Document) Result
+ Stats() Stats
}
diff --git a/client/go/internal/vespa/feed/http.go b/client/go/internal/vespa/feed/http.go
index babd6d3d3e7..87630924a3a 100644
--- a/client/go/internal/vespa/feed/http.go
+++ b/client/go/internal/vespa/feed/http.go
@@ -4,9 +4,11 @@ import (
"bytes"
"encoding/json"
"fmt"
+ "io"
"net/http"
"net/url"
"strconv"
+ "sync"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -16,6 +18,9 @@ import (
type Client struct {
options ClientOptions
httpClient util.HTTPClient
+ stats Stats
+ mu sync.Mutex
+ now func() time.Time
}
// ClientOptions specifices the configuration options of a feed client.
@@ -26,8 +31,24 @@ type ClientOptions struct {
TraceLevel *int
}
+type countingReader struct {
+ reader io.Reader
+ bytesRead int64
+}
+
+func (r *countingReader) Read(p []byte) (int, error) {
+ n, err := r.reader.Read(p)
+ r.bytesRead += int64(n)
+ return n, err
+}
+
func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client {
- return &Client{options: options, httpClient: httpClient}
+ return &Client{
+ options: options,
+ httpClient: httpClient,
+ stats: NewStats(),
+ now: time.Now,
+ }
}
func (c *Client) queryParams() url.Values {
@@ -46,24 +67,50 @@ func (c *Client) queryParams() url.Values {
// Send given document the URL configured in this client.
func (c *Client) Send(document Document) Result {
+ start := c.now()
+ stats := NewStats()
+ stats.Requests = 1
+ defer func() {
+ latency := c.now().Sub(start)
+ stats.TotalLatency = latency
+ stats.MinLatency = latency
+ stats.MaxLatency = latency
+ c.addStats(&stats)
+ }()
method, url, err := document.FeedURL(c.options.BaseURL, c.queryParams())
if err != nil {
+ stats.Errors = 1
return Result{Status: StatusError, Err: err}
}
body := document.Body()
req, err := http.NewRequest(method, url.String(), bytes.NewReader(body))
if err != nil {
+ stats.Errors = 1
return Result{Status: StatusError, Err: err}
}
resp, err := c.httpClient.Do(req, c.options.Timeout)
if err != nil {
+ stats.Errors = 1
return Result{Status: StatusTransportFailure, Err: err}
}
defer resp.Body.Close()
- return createResult(document.Id, resp)
+ stats.Responses = 1
+ stats.ResponsesByCode = map[int]int64{
+ resp.StatusCode: 1,
+ }
+ stats.BytesSent = int64(len(body))
+ return c.createResult(document.Id, &stats, resp)
}
-func createResult(id DocumentId, resp *http.Response) Result {
+func (c *Client) Stats() Stats { return c.stats }
+
+func (c *Client) addStats(stats *Stats) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.stats.Add(*stats)
+}
+
+func (c *Client) createResult(id DocumentId, stats *Stats, resp *http.Response) Result {
result := Result{Id: id}
switch resp.StatusCode {
case 200:
@@ -79,13 +126,17 @@ func createResult(id DocumentId, resp *http.Response) Result {
Message string `json:"message"`
Trace json.RawMessage `json:"trace"`
}
- jsonDec := json.NewDecoder(resp.Body)
+ cr := countingReader{reader: resp.Body}
+ jsonDec := json.NewDecoder(&cr)
if err := jsonDec.Decode(&body); err != nil {
result.Status = StatusError
result.Err = fmt.Errorf("failed to decode json response: %w", err)
- return result
}
result.Message = body.Message
result.Trace = string(body.Trace)
+ stats.BytesRecv = cr.bytesRead
+ if !result.Status.Success() {
+ stats.Errors = 1
+ }
return result
}
diff --git a/client/go/internal/vespa/feed/http_test.go b/client/go/internal/vespa/feed/http_test.go
index 9d9c1ed1c33..42a350b9279 100644
--- a/client/go/internal/vespa/feed/http_test.go
+++ b/client/go/internal/vespa/feed/http_test.go
@@ -3,40 +3,84 @@ package feed
import (
"bytes"
"encoding/json"
+ "fmt"
"io"
"net/http"
+ "reflect"
"testing"
"time"
"github.com/vespa-engine/vespa/client/go/internal/mock"
)
+type manualClock struct {
+ t time.Time
+ tick time.Duration
+}
+
+func (c *manualClock) now() time.Time {
+ t := c.t
+ c.t = c.t.Add(c.tick)
+ return t
+}
+
func TestClientSend(t *testing.T) {
- doc := mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)})
+ docs := []Document{
+ mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}),
+ mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc2", Fields: json.RawMessage(`{"foo": "456"}`)}),
+ mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc3", Fields: json.RawMessage(`{"baz": "789"}`)}),
+ }
httpClient := mock.HTTPClient{}
- var client Feeder = NewClient(ClientOptions{
+ client := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
}, &httpClient)
- httpClient.NextResponseString(200, `{"message":"All good!"}`)
- res := client.Send(doc)
- if res.Err != nil {
- t.Fatalf("got unexpected error %q", res.Err)
- }
- r := httpClient.LastRequest
- if r.Method != http.MethodPut {
- t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut)
- }
- wantURL := "https://example.com:1337/document/v1/ns/type/docid/doc1?create=true&timeout=5000ms"
- if r.URL.String() != wantURL {
- t.Errorf("got r.URL = %q, want %q", r.URL, wantURL)
+ clock := manualClock{t: time.Now(), tick: time.Second}
+ client.now = clock.now
+ for i, doc := range docs {
+ if i < 2 {
+ httpClient.NextResponseString(200, `{"message":"All good!"}`)
+ } else {
+ httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
+ }
+ res := client.Send(doc)
+ if res.Err != nil {
+ t.Fatalf("got unexpected error %q", res.Err)
+ }
+ r := httpClient.LastRequest
+ if r.Method != http.MethodPut {
+ t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut)
+ }
+ wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific)
+ if r.URL.String() != wantURL {
+ t.Errorf("got r.URL = %q, want %q", r.URL, wantURL)
+ }
+ body, err := io.ReadAll(r.Body)
+ if err != nil {
+ t.Fatalf("got unexpected error %q", err)
+ }
+ wantBody := doc.Body()
+ if !bytes.Equal(body, wantBody) {
+ t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
+ }
}
- body, err := io.ReadAll(r.Body)
- if err != nil {
- t.Fatalf("got unexpected error %q", err)
+ stats := client.Stats()
+ want := Stats{
+ Requests: 3,
+ Responses: 3,
+ ResponsesByCode: map[int]int64{
+ 200: 2,
+ 502: 1,
+ },
+ Errors: 1,
+ Inflight: 0,
+ TotalLatency: 3 * time.Second,
+ MinLatency: time.Second,
+ MaxLatency: time.Second,
+ BytesSent: 75,
+ BytesRecv: 82,
}
- wantBody := []byte(`{"fields":{"foo": "123"}}`)
- if !bytes.Equal(body, wantBody) {
- t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
+ if !reflect.DeepEqual(want, stats) {
+ t.Errorf("got %+v, want %+v", stats, want)
}
}