diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-24 11:05:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-24 11:05:02 +0200 |
commit | ca478cb618bfd8517dcb3b1a01f12b83077d7d30 (patch) | |
tree | a343538e57b595b33cbbce112cc8808aafadddd7 /client | |
parent | d4f477a4d7fc89cad395fae0d2a5180972d14ed4 (diff) | |
parent | 247e75cbe1ae11f8f4b7f2ff5f2bead3c66a19c9 (diff) |
Merge pull request #27200 from vespa-engine/mpolden/stats-cleanup
Clean up stats collection
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 8 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 13 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher_test.go | 9 | ||||
-rw-r--r-- | client/go/internal/vespa/document/feeder_test.go | 34 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 35 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 27 | ||||
-rw-r--r-- | client/go/internal/vespa/document/stats.go (renamed from client/go/internal/vespa/document/feeder.go) | 60 | ||||
-rw-r--r-- | client/go/internal/vespa/document/stats_test.go | 43 |
8 files changed, 115 insertions, 114 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index fa87c420f16..2a7d8491578 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -244,7 +244,7 @@ type feedSummary struct { RequestCount int64 `json:"http.request.count"` RequestBytes int64 `json:"http.request.bytes"` RequestRate number `json:"http.request.MBps"` - ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatability with vespa-feed-client + ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatibility with vespa-feed-client output ResponseCount int64 `json:"http.response.count"` ResponseBytes int64 `json:"http.response.bytes"` @@ -264,8 +264,8 @@ func mbps(bytes int64, duration time.Duration) float64 { func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error { summary := feedSummary{ Seconds: number(duration.Seconds()), - SuccessCount: stats.Successes(), - SuccessRate: number(float64(stats.Successes()) / math.Max(1, duration.Seconds())), + SuccessCount: stats.Successful(), + SuccessRate: number(float64(stats.Successful()) / math.Max(1, duration.Seconds())), ErrorCount: stats.Errors, InflightCount: stats.Inflight, @@ -277,7 +277,7 @@ func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) ResponseCount: stats.Responses, ResponseBytes: stats.BytesRecv, ResponseRate: number(mbps(stats.BytesRecv, duration)), - ResponseErrorCount: stats.Responses - stats.Successes(), + ResponseErrorCount: stats.Unsuccessful(), ResponseMinLatency: stats.MinLatency.Milliseconds(), ResponseAvgLatency: stats.AvgLatency().Milliseconds(), ResponseMaxLatency: stats.MaxLatency.Milliseconds(), diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 8ddb34c8c4d..a2b84aaeef2 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -12,6 +12,9 @@ import ( // maxAttempts controls the maximum number of times a document operation is attempted before giving up. const maxAttempts = 10 +// Feeder is the interface for a consumer of documents. +type Feeder interface{ Send(Document) Result } + // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { feeder Feeder @@ -132,7 +135,7 @@ func (d *Dispatcher) processResults() { defer d.wg.Done() for op := range d.results { d.statsMu.Lock() - d.stats.Add(op.result.Stats) + d.stats.Add(op.result) d.statsMu.Unlock() if d.shouldRetry(op, op.result) { d.enqueue(op.resetResult(), true) @@ -154,8 +157,7 @@ func (d *Dispatcher) dispatchNext(id Id) { } hasNext := q != nil if hasNext { - next, ok := q.Poll() - if ok { + if next, ok := q.Poll(); ok { // we have more operations with this ID: dispatch the next one d.dispatch(next) } else { @@ -237,8 +239,9 @@ func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{d func (d *Dispatcher) Stats() Stats { d.statsMu.Lock() defer d.statsMu.Unlock() - d.stats.Inflight = d.inflightCount.Load() - return d.stats + statsCopy := d.stats.Clone() + statsCopy.Inflight = d.inflightCount.Load() + return statsCopy } // Close waits for all inflight operations to complete and closes the dispatcher. diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 382d21501c3..9bc0c76106c 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -42,9 +42,6 @@ func (f *mockFeeder) Send(doc Document) Result { } else { f.documents = append(f.documents, doc) } - if !result.Success() { - result.Stats.Errors = 1 - } return result } @@ -135,7 +132,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { dispatcher.Close() wantDocs := docs[:2] assert.Equal(t, wantDocs, feeder.documents) - assert.Equal(t, int64(20), dispatcher.Stats().Errors) + assert.Equal(t, int64(20), dispatcher.Stats().Unsuccessful()) // Dispatching more documents for same ID succeed feeder.failAfterN(0) @@ -145,7 +142,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}) dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}) dispatcher.Close() - assert.Equal(t, int64(20), dispatcher.Stats().Errors) + assert.Equal(t, int64(20), dispatcher.Stats().Unsuccessful()) assert.Equal(t, 6, len(feeder.documents)) } @@ -166,7 +163,7 @@ func TestDispatcherOrderingWithRetry(t *testing.T) { } dispatcher.Close() assert.Equal(t, docs, feeder.documents) - assert.Equal(t, int64(5), dispatcher.Stats().Errors) + assert.Equal(t, int64(5), dispatcher.Stats().Unsuccessful()) } func TestDispatcherOpenCircuit(t *testing.T) { diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go deleted file mode 100644 index a7d92495889..00000000000 --- a/client/go/internal/vespa/document/feeder_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package document - -import ( - "reflect" - "testing" - "time" -) - -func TestStatsAdd(t *testing.T) { - var got Stats - got.Add(Stats{Requests: 1}) - got.Add(Stats{Requests: 1}) - got.Add(Stats{Responses: 1}) - got.Add(Stats{Responses: 1}) - got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}}) - got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}}) - got.Add(Stats{MinLatency: 200 * time.Millisecond}) - got.Add(Stats{MaxLatency: 400 * time.Millisecond}) - got.Add(Stats{MinLatency: 100 * time.Millisecond}) - got.Add(Stats{MaxLatency: 500 * time.Millisecond}) - got.Add(Stats{MaxLatency: 300 * time.Millisecond}) - got.Add(Stats{}) - - want := Stats{ - Requests: 2, - Responses: 2, - ResponsesByCode: map[int]int64{200: 4}, - MinLatency: 100 * time.Millisecond, - MaxLatency: 500 * time.Millisecond, - } - if !reflect.DeepEqual(got, want) { - t.Errorf("got %+v, want %+v", got, want) - } -} diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index e083f017c4a..8ea28203c28 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -274,7 +274,7 @@ func (c *Client) clientTimeout() time.Duration { // Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() - result := Result{Id: document.Id, Stats: Stats{Requests: 1}} + result := Result{Id: document.Id} req, buf, err := c.prepare(document) defer c.buffers.Put(buf) if err != nil { @@ -294,7 +294,6 @@ func (c *Client) Send(document Document) Result { } func resultWithErr(result Result, err error) Result { - result.Stats.Errors++ result.Status = StatusTransportFailure result.Err = err return result @@ -302,8 +301,6 @@ func resultWithErr(result Result, err error) Result { func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result { result.HTTPStatus = resp.StatusCode - result.Stats.Responses++ - result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1} switch resp.StatusCode { case 200: result.Status = StatusSuccess @@ -314,30 +311,24 @@ func resultWithResponse(resp *http.Response, sentBytes int, result Result, elaps default: result.Status = StatusTransportFailure } - var body struct { - Message string `json:"message"` - Trace json.RawValue `json:"trace"` - } buf.Reset() written, err := io.Copy(buf, resp.Body) if err != nil { - result.Status = StatusVespaFailure - result.Err = err + result = resultWithErr(result, err) } else { + var body struct { + Message string `json:"message"` + Trace json.RawValue `json:"trace"` + } if err := json.Unmarshal(buf.Bytes(), &body); err != nil { - result.Status = StatusVespaFailure - result.Err = fmt.Errorf("failed to decode json response: %w", err) + result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err)) + } else { + result.Message = body.Message + result.Trace = string(body.Trace) } } - result.Message = body.Message - result.Trace = string(body.Trace) - result.Stats.BytesSent = int64(sentBytes) - result.Stats.BytesRecv = int64(written) - if !result.Success() { - result.Stats.Errors++ - } - result.Stats.TotalLatency = elapsed - result.Stats.MinLatency = elapsed - result.Stats.MaxLatency = elapsed + result.Latency = elapsed + result.BytesSent = int64(sentBytes) + result.BytesRecv = int64(written) return result } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 1bc3a6c9f39..bfcd15c6070 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -85,37 +85,28 @@ func TestClientSend(t *testing.T) { for i, tt := range tests { doc := tt.in wantRes := Result{ - Id: doc.Id, - Stats: Stats{ - Requests: 1, - Responses: 1, - TotalLatency: time.Second, - MinLatency: time.Second, - MaxLatency: time.Second, - }, + Id: doc.Id, + Latency: time.Second, } if i < 3 { httpClient.NextResponseString(200, `{"message":"All good!"}`) wantRes.Status = StatusSuccess wantRes.HTTPStatus = 200 wantRes.Message = "All good!" - wantRes.Stats.ResponsesByCode = map[int]int64{200: 1} - wantRes.Stats.BytesRecv = 23 + wantRes.BytesRecv = 23 } else { httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) wantRes.Status = StatusVespaFailure wantRes.HTTPStatus = 502 wantRes.Message = "Good bye, cruel world!" - wantRes.Stats.ResponsesByCode = map[int]int64{502: 1} - wantRes.Stats.Errors = 1 - wantRes.Stats.BytesRecv = 36 + wantRes.BytesRecv = 36 } res := client.Send(doc) - wantRes.Stats.BytesSent = int64(len(httpClient.LastBody)) + wantRes.BytesSent = int64(len(httpClient.LastBody)) if !reflect.DeepEqual(res, wantRes) { t.Fatalf("got result %+v, want %+v", res, wantRes) } - stats.Add(res.Stats) + stats.Add(res) r := httpClient.LastRequest if r.Method != tt.method { t.Errorf("got r.Method = %q, want %q", r.Method, tt.method) @@ -137,7 +128,7 @@ func TestClientSend(t *testing.T) { 200: 3, 502: 1, }, - Errors: 1, + Errors: 0, Inflight: 0, TotalLatency: 4 * time.Second, MinLatency: time.Second, @@ -191,8 +182,8 @@ func assertCompressedRequest(t *testing.T, want bool, result Result, client *moc if gotEnc != wantEnc { t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc) } - if result.Stats.BytesSent != int64(len(client.LastBody)) { - t.Errorf("got BytesSent=%d, want %d", result.Stats.BytesSent, len(client.LastBody)) + if result.BytesSent != int64(len(client.LastBody)) { + t.Errorf("got BytesSent=%d, want %d", result.BytesSent, len(client.LastBody)) } compressed := bytes.HasPrefix(client.LastBody, []byte{0x1f, 0x8b}) if compressed != want { diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/stats.go index 6bcd4afe916..7696648f703 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/stats.go @@ -4,6 +4,7 @@ import ( "time" ) +// Status of a document operation. type Status int const ( @@ -25,9 +26,11 @@ type Result struct { Id Id Message string Trace string - Stats Stats Status Status HTTPStatus int + Latency time.Duration + BytesSent int64 + BytesRecv int64 } func (r Result) Success() bool { @@ -57,39 +60,46 @@ func (s Stats) AvgLatency() time.Duration { return s.TotalLatency / time.Duration(requests) } -func (s Stats) Successes() int64 { +func (s Stats) Successful() int64 { if s.ResponsesByCode == nil { return 0 } return s.ResponsesByCode[200] } -// Add all statistics contained in other to this. -func (s *Stats) Add(other Stats) { - s.Requests += other.Requests - s.Responses += other.Responses - if s.ResponsesByCode == nil && other.ResponsesByCode != nil { +func (s Stats) Unsuccessful() int64 { return s.Requests - s.Successful() } + +func (s Stats) Clone() Stats { + if s.ResponsesByCode != nil { + mapCopy := make(map[int]int64) + for k, v := range s.ResponsesByCode { + mapCopy[k] = v + } + s.ResponsesByCode = mapCopy + } + return s +} + +// Add statistics from result to this. +func (s *Stats) Add(result Result) { + s.Requests++ + if s.ResponsesByCode == nil { s.ResponsesByCode = make(map[int]int64) } - for code, count := range other.ResponsesByCode { - _, ok := s.ResponsesByCode[code] - if ok { - s.ResponsesByCode[code] += count - } else { - s.ResponsesByCode[code] = count - } + responsesByCode := s.ResponsesByCode[result.HTTPStatus] + s.ResponsesByCode[result.HTTPStatus] = responsesByCode + 1 + if result.Err == nil { + s.Responses++ + } else { + s.Errors++ } - s.Errors += other.Errors - s.TotalLatency += other.TotalLatency - if s.MinLatency == 0 || (other.MinLatency > 0 && other.MinLatency < s.MinLatency) { - s.MinLatency = other.MinLatency + s.TotalLatency += result.Latency + if result.Latency < s.MinLatency || s.MinLatency == 0 { + s.MinLatency = result.Latency } - if other.MaxLatency > s.MaxLatency { - s.MaxLatency = other.MaxLatency + if result.Latency > s.MaxLatency { + s.MaxLatency = result.Latency } - s.BytesSent += other.BytesSent - s.BytesRecv += other.BytesRecv + s.BytesSent += result.BytesSent + s.BytesRecv += result.BytesRecv } - -// Feeder is the interface for a consumer of documents. -type Feeder interface{ Send(Document) Result } diff --git a/client/go/internal/vespa/document/stats_test.go b/client/go/internal/vespa/document/stats_test.go new file mode 100644 index 00000000000..8788836f9ad --- /dev/null +++ b/client/go/internal/vespa/document/stats_test.go @@ -0,0 +1,43 @@ +package document + +import ( + "reflect" + "testing" + "time" +) + +func TestStatsAdd(t *testing.T) { + var stats Stats + stats.Add(Result{HTTPStatus: 200, Latency: 200 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 400 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 100 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 500 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 300 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 500, Latency: 100 * time.Millisecond}) + expected := Stats{ + Requests: 6, + Responses: 6, + ResponsesByCode: map[int]int64{200: 5, 500: 1}, + TotalLatency: 1600 * time.Millisecond, + MinLatency: 100 * time.Millisecond, + MaxLatency: 500 * time.Millisecond, + } + if !reflect.DeepEqual(stats, expected) { + t.Errorf("got %+v, want %+v", stats, expected) + } + if want, got := int64(1), stats.Unsuccessful(); want != got { + t.Errorf("got stats.Unsuccessful() = %d, want %d", got, want) + } +} + +func TestStatsClone(t *testing.T) { + var a Stats + a.Add(Result{HTTPStatus: 200}) + b := a.Clone() + a.Add(Result{HTTPStatus: 200}) + + want := Stats{Requests: 1, Responses: 1, ResponsesByCode: map[int]int64{200: 1}} + if !reflect.DeepEqual(b, want) { + t.Errorf("got %+v, want %+v", b, want) + } +} |