From 369e3ffe35620747b39cf92cfe77bd721b1f6949 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Tue, 23 May 2023 23:56:24 +0200 Subject: Clean up stats collection --- client/go/internal/cli/cmd/feed.go | 8 +- client/go/internal/vespa/document/dispatcher.go | 5 +- .../go/internal/vespa/document/dispatcher_test.go | 9 +- client/go/internal/vespa/document/feeder.go | 95 ---------------------- client/go/internal/vespa/document/feeder_test.go | 34 -------- client/go/internal/vespa/document/http.go | 35 +++----- client/go/internal/vespa/document/http_test.go | 27 ++---- client/go/internal/vespa/document/stats.go | 94 +++++++++++++++++++++ client/go/internal/vespa/document/stats_test.go | 31 +++++++ 9 files changed, 158 insertions(+), 180 deletions(-) delete mode 100644 client/go/internal/vespa/document/feeder.go delete mode 100644 client/go/internal/vespa/document/feeder_test.go create mode 100644 client/go/internal/vespa/document/stats.go create mode 100644 client/go/internal/vespa/document/stats_test.go (limited to 'client') diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index fa87c420f16..2a7d8491578 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -244,7 +244,7 @@ type feedSummary struct { RequestCount int64 `json:"http.request.count"` RequestBytes int64 `json:"http.request.bytes"` RequestRate number `json:"http.request.MBps"` - ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatability with vespa-feed-client + ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatibility with vespa-feed-client output ResponseCount int64 `json:"http.response.count"` ResponseBytes int64 `json:"http.response.bytes"` @@ -264,8 +264,8 @@ func mbps(bytes int64, duration time.Duration) float64 { func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error { summary := feedSummary{ Seconds: number(duration.Seconds()), - SuccessCount: stats.Successes(), - SuccessRate: number(float64(stats.Successes()) / math.Max(1, duration.Seconds())), + SuccessCount: stats.Successful(), + SuccessRate: number(float64(stats.Successful()) / math.Max(1, duration.Seconds())), ErrorCount: stats.Errors, InflightCount: stats.Inflight, @@ -277,7 +277,7 @@ func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) ResponseCount: stats.Responses, ResponseBytes: stats.BytesRecv, ResponseRate: number(mbps(stats.BytesRecv, duration)), - ResponseErrorCount: stats.Responses - stats.Successes(), + ResponseErrorCount: stats.Unsuccessful(), ResponseMinLatency: stats.MinLatency.Milliseconds(), ResponseAvgLatency: stats.AvgLatency().Milliseconds(), ResponseMaxLatency: stats.MaxLatency.Milliseconds(), diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 8ddb34c8c4d..ca8c585a295 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -12,6 +12,9 @@ import ( // maxAttempts controls the maximum number of times a document operation is attempted before giving up. const maxAttempts = 10 +// Feeder is the interface for a consumer of documents. +type Feeder interface{ Send(Document) Result } + // Dispatcher dispatches documents from a queue to a Feeder. type Dispatcher struct { feeder Feeder @@ -132,7 +135,7 @@ func (d *Dispatcher) processResults() { defer d.wg.Done() for op := range d.results { d.statsMu.Lock() - d.stats.Add(op.result.Stats) + d.stats.Add(op.result) d.statsMu.Unlock() if d.shouldRetry(op, op.result) { d.enqueue(op.resetResult(), true) diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 382d21501c3..9bc0c76106c 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -42,9 +42,6 @@ func (f *mockFeeder) Send(doc Document) Result { } else { f.documents = append(f.documents, doc) } - if !result.Success() { - result.Stats.Errors = 1 - } return result } @@ -135,7 +132,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { dispatcher.Close() wantDocs := docs[:2] assert.Equal(t, wantDocs, feeder.documents) - assert.Equal(t, int64(20), dispatcher.Stats().Errors) + assert.Equal(t, int64(20), dispatcher.Stats().Unsuccessful()) // Dispatching more documents for same ID succeed feeder.failAfterN(0) @@ -145,7 +142,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut}) dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut}) dispatcher.Close() - assert.Equal(t, int64(20), dispatcher.Stats().Errors) + assert.Equal(t, int64(20), dispatcher.Stats().Unsuccessful()) assert.Equal(t, 6, len(feeder.documents)) } @@ -166,7 +163,7 @@ func TestDispatcherOrderingWithRetry(t *testing.T) { } dispatcher.Close() assert.Equal(t, docs, feeder.documents) - assert.Equal(t, int64(5), dispatcher.Stats().Errors) + assert.Equal(t, int64(5), dispatcher.Stats().Unsuccessful()) } func TestDispatcherOpenCircuit(t *testing.T) { diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go deleted file mode 100644 index 6bcd4afe916..00000000000 --- a/client/go/internal/vespa/document/feeder.go +++ /dev/null @@ -1,95 +0,0 @@ -package document - -import ( - "time" -) - -type Status int - -const ( - // StatusSuccess indicates a successful document operation. - StatusSuccess Status = iota - // StatusConditionNotMet indicates that the document operation itself was successful, but did not satisfy its - // test-and-set condition. - StatusConditionNotMet - // StatusVespaFailure indicates that Vespa failed to process the document operation. - StatusVespaFailure - // StatusTransportFailure indicates that there was failure in the transport layer error while sending the document - // operation to Vespa. - StatusTransportFailure -) - -// Result represents the result of a feeding operation. -type Result struct { - Err error - Id Id - Message string - Trace string - Stats Stats - Status Status - HTTPStatus int -} - -func (r Result) Success() bool { - return r.HTTPStatus/100 == 2 || r.HTTPStatus == 404 || r.HTTPStatus == 412 -} - -// Stats represents feeding operation statistics. -type Stats struct { - ResponsesByCode map[int]int64 - Requests int64 - Responses int64 - Errors int64 - Inflight int64 - TotalLatency time.Duration - MinLatency time.Duration - MaxLatency time.Duration - BytesSent int64 - BytesRecv int64 -} - -// AvgLatency returns the average latency for a request. -func (s Stats) AvgLatency() time.Duration { - requests := s.Requests - if requests == 0 { - requests = 1 - } - return s.TotalLatency / time.Duration(requests) -} - -func (s Stats) Successes() int64 { - if s.ResponsesByCode == nil { - return 0 - } - return s.ResponsesByCode[200] -} - -// Add all statistics contained in other to this. -func (s *Stats) Add(other Stats) { - s.Requests += other.Requests - s.Responses += other.Responses - if s.ResponsesByCode == nil && other.ResponsesByCode != nil { - s.ResponsesByCode = make(map[int]int64) - } - for code, count := range other.ResponsesByCode { - _, ok := s.ResponsesByCode[code] - if ok { - s.ResponsesByCode[code] += count - } else { - s.ResponsesByCode[code] = count - } - } - s.Errors += other.Errors - s.TotalLatency += other.TotalLatency - if s.MinLatency == 0 || (other.MinLatency > 0 && other.MinLatency < s.MinLatency) { - s.MinLatency = other.MinLatency - } - if other.MaxLatency > s.MaxLatency { - s.MaxLatency = other.MaxLatency - } - s.BytesSent += other.BytesSent - s.BytesRecv += other.BytesRecv -} - -// Feeder is the interface for a consumer of documents. -type Feeder interface{ Send(Document) Result } diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go deleted file mode 100644 index a7d92495889..00000000000 --- a/client/go/internal/vespa/document/feeder_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package document - -import ( - "reflect" - "testing" - "time" -) - -func TestStatsAdd(t *testing.T) { - var got Stats - got.Add(Stats{Requests: 1}) - got.Add(Stats{Requests: 1}) - got.Add(Stats{Responses: 1}) - got.Add(Stats{Responses: 1}) - got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}}) - got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}}) - got.Add(Stats{MinLatency: 200 * time.Millisecond}) - got.Add(Stats{MaxLatency: 400 * time.Millisecond}) - got.Add(Stats{MinLatency: 100 * time.Millisecond}) - got.Add(Stats{MaxLatency: 500 * time.Millisecond}) - got.Add(Stats{MaxLatency: 300 * time.Millisecond}) - got.Add(Stats{}) - - want := Stats{ - Requests: 2, - Responses: 2, - ResponsesByCode: map[int]int64{200: 4}, - MinLatency: 100 * time.Millisecond, - MaxLatency: 500 * time.Millisecond, - } - if !reflect.DeepEqual(got, want) { - t.Errorf("got %+v, want %+v", got, want) - } -} diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index e083f017c4a..8ea28203c28 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -274,7 +274,7 @@ func (c *Client) clientTimeout() time.Duration { // Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() - result := Result{Id: document.Id, Stats: Stats{Requests: 1}} + result := Result{Id: document.Id} req, buf, err := c.prepare(document) defer c.buffers.Put(buf) if err != nil { @@ -294,7 +294,6 @@ func (c *Client) Send(document Document) Result { } func resultWithErr(result Result, err error) Result { - result.Stats.Errors++ result.Status = StatusTransportFailure result.Err = err return result @@ -302,8 +301,6 @@ func resultWithErr(result Result, err error) Result { func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result { result.HTTPStatus = resp.StatusCode - result.Stats.Responses++ - result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1} switch resp.StatusCode { case 200: result.Status = StatusSuccess @@ -314,30 +311,24 @@ func resultWithResponse(resp *http.Response, sentBytes int, result Result, elaps default: result.Status = StatusTransportFailure } - var body struct { - Message string `json:"message"` - Trace json.RawValue `json:"trace"` - } buf.Reset() written, err := io.Copy(buf, resp.Body) if err != nil { - result.Status = StatusVespaFailure - result.Err = err + result = resultWithErr(result, err) } else { + var body struct { + Message string `json:"message"` + Trace json.RawValue `json:"trace"` + } if err := json.Unmarshal(buf.Bytes(), &body); err != nil { - result.Status = StatusVespaFailure - result.Err = fmt.Errorf("failed to decode json response: %w", err) + result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err)) + } else { + result.Message = body.Message + result.Trace = string(body.Trace) } } - result.Message = body.Message - result.Trace = string(body.Trace) - result.Stats.BytesSent = int64(sentBytes) - result.Stats.BytesRecv = int64(written) - if !result.Success() { - result.Stats.Errors++ - } - result.Stats.TotalLatency = elapsed - result.Stats.MinLatency = elapsed - result.Stats.MaxLatency = elapsed + result.Latency = elapsed + result.BytesSent = int64(sentBytes) + result.BytesRecv = int64(written) return result } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 1bc3a6c9f39..bfcd15c6070 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -85,37 +85,28 @@ func TestClientSend(t *testing.T) { for i, tt := range tests { doc := tt.in wantRes := Result{ - Id: doc.Id, - Stats: Stats{ - Requests: 1, - Responses: 1, - TotalLatency: time.Second, - MinLatency: time.Second, - MaxLatency: time.Second, - }, + Id: doc.Id, + Latency: time.Second, } if i < 3 { httpClient.NextResponseString(200, `{"message":"All good!"}`) wantRes.Status = StatusSuccess wantRes.HTTPStatus = 200 wantRes.Message = "All good!" - wantRes.Stats.ResponsesByCode = map[int]int64{200: 1} - wantRes.Stats.BytesRecv = 23 + wantRes.BytesRecv = 23 } else { httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) wantRes.Status = StatusVespaFailure wantRes.HTTPStatus = 502 wantRes.Message = "Good bye, cruel world!" - wantRes.Stats.ResponsesByCode = map[int]int64{502: 1} - wantRes.Stats.Errors = 1 - wantRes.Stats.BytesRecv = 36 + wantRes.BytesRecv = 36 } res := client.Send(doc) - wantRes.Stats.BytesSent = int64(len(httpClient.LastBody)) + wantRes.BytesSent = int64(len(httpClient.LastBody)) if !reflect.DeepEqual(res, wantRes) { t.Fatalf("got result %+v, want %+v", res, wantRes) } - stats.Add(res.Stats) + stats.Add(res) r := httpClient.LastRequest if r.Method != tt.method { t.Errorf("got r.Method = %q, want %q", r.Method, tt.method) @@ -137,7 +128,7 @@ func TestClientSend(t *testing.T) { 200: 3, 502: 1, }, - Errors: 1, + Errors: 0, Inflight: 0, TotalLatency: 4 * time.Second, MinLatency: time.Second, @@ -191,8 +182,8 @@ func assertCompressedRequest(t *testing.T, want bool, result Result, client *moc if gotEnc != wantEnc { t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc) } - if result.Stats.BytesSent != int64(len(client.LastBody)) { - t.Errorf("got BytesSent=%d, want %d", result.Stats.BytesSent, len(client.LastBody)) + if result.BytesSent != int64(len(client.LastBody)) { + t.Errorf("got BytesSent=%d, want %d", result.BytesSent, len(client.LastBody)) } compressed := bytes.HasPrefix(client.LastBody, []byte{0x1f, 0x8b}) if compressed != want { diff --git a/client/go/internal/vespa/document/stats.go b/client/go/internal/vespa/document/stats.go new file mode 100644 index 00000000000..4865648a826 --- /dev/null +++ b/client/go/internal/vespa/document/stats.go @@ -0,0 +1,94 @@ +package document + +import ( + "time" +) + +// Status of a document operation. +type Status int + +const ( + // StatusSuccess indicates a successful document operation. + StatusSuccess Status = iota + // StatusConditionNotMet indicates that the document operation itself was successful, but did not satisfy its + // test-and-set condition. + StatusConditionNotMet + // StatusVespaFailure indicates that Vespa failed to process the document operation. + StatusVespaFailure + // StatusTransportFailure indicates that there was failure in the transport layer error while sending the document + // operation to Vespa. + StatusTransportFailure +) + +// Result represents the result of a feeding operation. +type Result struct { + Err error + Id Id + Message string + Trace string + Status Status + HTTPStatus int + Latency time.Duration + BytesSent int64 + BytesRecv int64 +} + +func (r Result) Success() bool { + return r.HTTPStatus/100 == 2 || r.HTTPStatus == 404 || r.HTTPStatus == 412 +} + +// Stats represents feeding operation statistics. +type Stats struct { + ResponsesByCode map[int]int64 + Requests int64 + Responses int64 + Errors int64 + Inflight int64 + TotalLatency time.Duration + MinLatency time.Duration + MaxLatency time.Duration + BytesSent int64 + BytesRecv int64 +} + +// AvgLatency returns the average latency for a request. +func (s Stats) AvgLatency() time.Duration { + requests := s.Requests + if requests == 0 { + requests = 1 + } + return s.TotalLatency / time.Duration(requests) +} + +func (s Stats) Successful() int64 { + if s.ResponsesByCode == nil { + return 0 + } + return s.ResponsesByCode[200] +} + +func (s Stats) Unsuccessful() int64 { return s.Requests - s.Successful() } + +// Add statistics from result to this. +func (s *Stats) Add(result Result) { + s.Requests++ + if s.ResponsesByCode == nil { + s.ResponsesByCode = make(map[int]int64) + } + responsesByCode := s.ResponsesByCode[result.HTTPStatus] + s.ResponsesByCode[result.HTTPStatus] = responsesByCode + 1 + if result.Err == nil { + s.Responses++ + } else { + s.Errors++ + } + s.TotalLatency += result.Latency + if result.Latency < s.MinLatency || s.MinLatency == 0 { + s.MinLatency = result.Latency + } + if result.Latency > s.MaxLatency { + s.MaxLatency = result.Latency + } + s.BytesSent += result.BytesSent + s.BytesRecv += result.BytesRecv +} diff --git a/client/go/internal/vespa/document/stats_test.go b/client/go/internal/vespa/document/stats_test.go new file mode 100644 index 00000000000..3999ef2e503 --- /dev/null +++ b/client/go/internal/vespa/document/stats_test.go @@ -0,0 +1,31 @@ +package document + +import ( + "reflect" + "testing" + "time" +) + +func TestStatsAdd(t *testing.T) { + var stats Stats + stats.Add(Result{HTTPStatus: 200, Latency: 200 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 400 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 100 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 500 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 200, Latency: 300 * time.Millisecond}) + stats.Add(Result{HTTPStatus: 500, Latency: 100 * time.Millisecond}) + expected := Stats{ + Requests: 6, + Responses: 6, + ResponsesByCode: map[int]int64{200: 5, 500: 1}, + TotalLatency: 1600 * time.Millisecond, + MinLatency: 100 * time.Millisecond, + MaxLatency: 500 * time.Millisecond, + } + if !reflect.DeepEqual(stats, expected) { + t.Errorf("got %+v, want %+v", stats, expected) + } + if want, got := int64(1), stats.Unsuccessful(); want != got { + t.Errorf("got stats.Unsuccessful() = %d, want %d", got, want) + } +} -- cgit v1.2.3 From 247e75cbe1ae11f8f4b7f2ff5f2bead3c66a19c9 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Wed, 24 May 2023 09:49:49 +0200 Subject: Avoid returning incomplete stats from dispatcher --- client/go/internal/vespa/document/dispatcher.go | 8 ++++---- client/go/internal/vespa/document/stats.go | 11 +++++++++++ client/go/internal/vespa/document/stats_test.go | 12 ++++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) (limited to 'client') diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index ca8c585a295..a2b84aaeef2 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -157,8 +157,7 @@ func (d *Dispatcher) dispatchNext(id Id) { } hasNext := q != nil if hasNext { - next, ok := q.Poll() - if ok { + if next, ok := q.Poll(); ok { // we have more operations with this ID: dispatch the next one d.dispatch(next) } else { @@ -240,8 +239,9 @@ func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{d func (d *Dispatcher) Stats() Stats { d.statsMu.Lock() defer d.statsMu.Unlock() - d.stats.Inflight = d.inflightCount.Load() - return d.stats + statsCopy := d.stats.Clone() + statsCopy.Inflight = d.inflightCount.Load() + return statsCopy } // Close waits for all inflight operations to complete and closes the dispatcher. diff --git a/client/go/internal/vespa/document/stats.go b/client/go/internal/vespa/document/stats.go index 4865648a826..7696648f703 100644 --- a/client/go/internal/vespa/document/stats.go +++ b/client/go/internal/vespa/document/stats.go @@ -69,6 +69,17 @@ func (s Stats) Successful() int64 { func (s Stats) Unsuccessful() int64 { return s.Requests - s.Successful() } +func (s Stats) Clone() Stats { + if s.ResponsesByCode != nil { + mapCopy := make(map[int]int64) + for k, v := range s.ResponsesByCode { + mapCopy[k] = v + } + s.ResponsesByCode = mapCopy + } + return s +} + // Add statistics from result to this. func (s *Stats) Add(result Result) { s.Requests++ diff --git a/client/go/internal/vespa/document/stats_test.go b/client/go/internal/vespa/document/stats_test.go index 3999ef2e503..8788836f9ad 100644 --- a/client/go/internal/vespa/document/stats_test.go +++ b/client/go/internal/vespa/document/stats_test.go @@ -29,3 +29,15 @@ func TestStatsAdd(t *testing.T) { t.Errorf("got stats.Unsuccessful() = %d, want %d", got, want) } } + +func TestStatsClone(t *testing.T) { + var a Stats + a.Add(Result{HTTPStatus: 200}) + b := a.Clone() + a.Add(Result{HTTPStatus: 200}) + + want := Stats{Requests: 1, Responses: 1, ResponsesByCode: map[int]int64{200: 1}} + if !reflect.DeepEqual(b, want) { + t.Errorf("got %+v, want %+v", b, want) + } +} -- cgit v1.2.3