aboutsummaryrefslogtreecommitdiffstats
path: root/client/go
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-23 23:56:24 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-24 10:00:29 +0200
commit369e3ffe35620747b39cf92cfe77bd721b1f6949 (patch)
treeca0e17908767b1a495bcaf58a1d85acac42de638 /client/go
parent482e915731f2d3a78acfff7b2ce4a11ec612ff58 (diff)
Clean up stats collection
Diffstat (limited to 'client/go')
-rw-r--r--client/go/internal/cli/cmd/feed.go8
-rw-r--r--client/go/internal/vespa/document/dispatcher.go5
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go9
-rw-r--r--client/go/internal/vespa/document/feeder_test.go34
-rw-r--r--client/go/internal/vespa/document/http.go35
-rw-r--r--client/go/internal/vespa/document/http_test.go27
-rw-r--r--client/go/internal/vespa/document/stats.go (renamed from client/go/internal/vespa/document/feeder.go)49
-rw-r--r--client/go/internal/vespa/document/stats_test.go31
8 files changed, 88 insertions, 110 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index fa87c420f16..2a7d8491578 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -244,7 +244,7 @@ type feedSummary struct {
RequestCount int64 `json:"http.request.count"`
RequestBytes int64 `json:"http.request.bytes"`
RequestRate number `json:"http.request.MBps"`
- ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatability with vespa-feed-client
+ ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatibility with vespa-feed-client output
ResponseCount int64 `json:"http.response.count"`
ResponseBytes int64 `json:"http.response.bytes"`
@@ -264,8 +264,8 @@ func mbps(bytes int64, duration time.Duration) float64 {
func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error {
summary := feedSummary{
Seconds: number(duration.Seconds()),
- SuccessCount: stats.Successes(),
- SuccessRate: number(float64(stats.Successes()) / math.Max(1, duration.Seconds())),
+ SuccessCount: stats.Successful(),
+ SuccessRate: number(float64(stats.Successful()) / math.Max(1, duration.Seconds())),
ErrorCount: stats.Errors,
InflightCount: stats.Inflight,
@@ -277,7 +277,7 @@ func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration)
ResponseCount: stats.Responses,
ResponseBytes: stats.BytesRecv,
ResponseRate: number(mbps(stats.BytesRecv, duration)),
- ResponseErrorCount: stats.Responses - stats.Successes(),
+ ResponseErrorCount: stats.Unsuccessful(),
ResponseMinLatency: stats.MinLatency.Milliseconds(),
ResponseAvgLatency: stats.AvgLatency().Milliseconds(),
ResponseMaxLatency: stats.MaxLatency.Milliseconds(),
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 8ddb34c8c4d..ca8c585a295 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -12,6 +12,9 @@ import (
// maxAttempts controls the maximum number of times a document operation is attempted before giving up.
const maxAttempts = 10
+// Feeder is the interface for a consumer of documents.
+type Feeder interface{ Send(Document) Result }
+
// Dispatcher dispatches documents from a queue to a Feeder.
type Dispatcher struct {
feeder Feeder
@@ -132,7 +135,7 @@ func (d *Dispatcher) processResults() {
defer d.wg.Done()
for op := range d.results {
d.statsMu.Lock()
- d.stats.Add(op.result.Stats)
+ d.stats.Add(op.result)
d.statsMu.Unlock()
if d.shouldRetry(op, op.result) {
d.enqueue(op.resetResult(), true)
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 382d21501c3..9bc0c76106c 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -42,9 +42,6 @@ func (f *mockFeeder) Send(doc Document) Result {
} else {
f.documents = append(f.documents, doc)
}
- if !result.Success() {
- result.Stats.Errors = 1
- }
return result
}
@@ -135,7 +132,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
dispatcher.Close()
wantDocs := docs[:2]
assert.Equal(t, wantDocs, feeder.documents)
- assert.Equal(t, int64(20), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(20), dispatcher.Stats().Unsuccessful())
// Dispatching more documents for same ID succeed
feeder.failAfterN(0)
@@ -145,7 +142,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut})
dispatcher.Enqueue(Document{Id: mustParseId("id:ns:type::doc3"), Operation: OperationPut})
dispatcher.Close()
- assert.Equal(t, int64(20), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(20), dispatcher.Stats().Unsuccessful())
assert.Equal(t, 6, len(feeder.documents))
}
@@ -166,7 +163,7 @@ func TestDispatcherOrderingWithRetry(t *testing.T) {
}
dispatcher.Close()
assert.Equal(t, docs, feeder.documents)
- assert.Equal(t, int64(5), dispatcher.Stats().Errors)
+ assert.Equal(t, int64(5), dispatcher.Stats().Unsuccessful())
}
func TestDispatcherOpenCircuit(t *testing.T) {
diff --git a/client/go/internal/vespa/document/feeder_test.go b/client/go/internal/vespa/document/feeder_test.go
deleted file mode 100644
index a7d92495889..00000000000
--- a/client/go/internal/vespa/document/feeder_test.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package document
-
-import (
- "reflect"
- "testing"
- "time"
-)
-
-func TestStatsAdd(t *testing.T) {
- var got Stats
- got.Add(Stats{Requests: 1})
- got.Add(Stats{Requests: 1})
- got.Add(Stats{Responses: 1})
- got.Add(Stats{Responses: 1})
- got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}})
- got.Add(Stats{ResponsesByCode: map[int]int64{200: 2}})
- got.Add(Stats{MinLatency: 200 * time.Millisecond})
- got.Add(Stats{MaxLatency: 400 * time.Millisecond})
- got.Add(Stats{MinLatency: 100 * time.Millisecond})
- got.Add(Stats{MaxLatency: 500 * time.Millisecond})
- got.Add(Stats{MaxLatency: 300 * time.Millisecond})
- got.Add(Stats{})
-
- want := Stats{
- Requests: 2,
- Responses: 2,
- ResponsesByCode: map[int]int64{200: 4},
- MinLatency: 100 * time.Millisecond,
- MaxLatency: 500 * time.Millisecond,
- }
- if !reflect.DeepEqual(got, want) {
- t.Errorf("got %+v, want %+v", got, want)
- }
-}
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index e083f017c4a..8ea28203c28 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -274,7 +274,7 @@ func (c *Client) clientTimeout() time.Duration {
// Send given document to the endpoint configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
- result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
+ result := Result{Id: document.Id}
req, buf, err := c.prepare(document)
defer c.buffers.Put(buf)
if err != nil {
@@ -294,7 +294,6 @@ func (c *Client) Send(document Document) Result {
}
func resultWithErr(result Result, err error) Result {
- result.Stats.Errors++
result.Status = StatusTransportFailure
result.Err = err
return result
@@ -302,8 +301,6 @@ func resultWithErr(result Result, err error) Result {
func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result {
result.HTTPStatus = resp.StatusCode
- result.Stats.Responses++
- result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1}
switch resp.StatusCode {
case 200:
result.Status = StatusSuccess
@@ -314,30 +311,24 @@ func resultWithResponse(resp *http.Response, sentBytes int, result Result, elaps
default:
result.Status = StatusTransportFailure
}
- var body struct {
- Message string `json:"message"`
- Trace json.RawValue `json:"trace"`
- }
buf.Reset()
written, err := io.Copy(buf, resp.Body)
if err != nil {
- result.Status = StatusVespaFailure
- result.Err = err
+ result = resultWithErr(result, err)
} else {
+ var body struct {
+ Message string `json:"message"`
+ Trace json.RawValue `json:"trace"`
+ }
if err := json.Unmarshal(buf.Bytes(), &body); err != nil {
- result.Status = StatusVespaFailure
- result.Err = fmt.Errorf("failed to decode json response: %w", err)
+ result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err))
+ } else {
+ result.Message = body.Message
+ result.Trace = string(body.Trace)
}
}
- result.Message = body.Message
- result.Trace = string(body.Trace)
- result.Stats.BytesSent = int64(sentBytes)
- result.Stats.BytesRecv = int64(written)
- if !result.Success() {
- result.Stats.Errors++
- }
- result.Stats.TotalLatency = elapsed
- result.Stats.MinLatency = elapsed
- result.Stats.MaxLatency = elapsed
+ result.Latency = elapsed
+ result.BytesSent = int64(sentBytes)
+ result.BytesRecv = int64(written)
return result
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 1bc3a6c9f39..bfcd15c6070 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -85,37 +85,28 @@ func TestClientSend(t *testing.T) {
for i, tt := range tests {
doc := tt.in
wantRes := Result{
- Id: doc.Id,
- Stats: Stats{
- Requests: 1,
- Responses: 1,
- TotalLatency: time.Second,
- MinLatency: time.Second,
- MaxLatency: time.Second,
- },
+ Id: doc.Id,
+ Latency: time.Second,
}
if i < 3 {
httpClient.NextResponseString(200, `{"message":"All good!"}`)
wantRes.Status = StatusSuccess
wantRes.HTTPStatus = 200
wantRes.Message = "All good!"
- wantRes.Stats.ResponsesByCode = map[int]int64{200: 1}
- wantRes.Stats.BytesRecv = 23
+ wantRes.BytesRecv = 23
} else {
httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
wantRes.Status = StatusVespaFailure
wantRes.HTTPStatus = 502
wantRes.Message = "Good bye, cruel world!"
- wantRes.Stats.ResponsesByCode = map[int]int64{502: 1}
- wantRes.Stats.Errors = 1
- wantRes.Stats.BytesRecv = 36
+ wantRes.BytesRecv = 36
}
res := client.Send(doc)
- wantRes.Stats.BytesSent = int64(len(httpClient.LastBody))
+ wantRes.BytesSent = int64(len(httpClient.LastBody))
if !reflect.DeepEqual(res, wantRes) {
t.Fatalf("got result %+v, want %+v", res, wantRes)
}
- stats.Add(res.Stats)
+ stats.Add(res)
r := httpClient.LastRequest
if r.Method != tt.method {
t.Errorf("got r.Method = %q, want %q", r.Method, tt.method)
@@ -137,7 +128,7 @@ func TestClientSend(t *testing.T) {
200: 3,
502: 1,
},
- Errors: 1,
+ Errors: 0,
Inflight: 0,
TotalLatency: 4 * time.Second,
MinLatency: time.Second,
@@ -191,8 +182,8 @@ func assertCompressedRequest(t *testing.T, want bool, result Result, client *moc
if gotEnc != wantEnc {
t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc)
}
- if result.Stats.BytesSent != int64(len(client.LastBody)) {
- t.Errorf("got BytesSent=%d, want %d", result.Stats.BytesSent, len(client.LastBody))
+ if result.BytesSent != int64(len(client.LastBody)) {
+ t.Errorf("got BytesSent=%d, want %d", result.BytesSent, len(client.LastBody))
}
compressed := bytes.HasPrefix(client.LastBody, []byte{0x1f, 0x8b})
if compressed != want {
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/stats.go
index 6bcd4afe916..4865648a826 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/stats.go
@@ -4,6 +4,7 @@ import (
"time"
)
+// Status of a document operation.
type Status int
const (
@@ -25,9 +26,11 @@ type Result struct {
Id Id
Message string
Trace string
- Stats Stats
Status Status
HTTPStatus int
+ Latency time.Duration
+ BytesSent int64
+ BytesRecv int64
}
func (r Result) Success() bool {
@@ -57,39 +60,35 @@ func (s Stats) AvgLatency() time.Duration {
return s.TotalLatency / time.Duration(requests)
}
-func (s Stats) Successes() int64 {
+func (s Stats) Successful() int64 {
if s.ResponsesByCode == nil {
return 0
}
return s.ResponsesByCode[200]
}
-// Add all statistics contained in other to this.
-func (s *Stats) Add(other Stats) {
- s.Requests += other.Requests
- s.Responses += other.Responses
- if s.ResponsesByCode == nil && other.ResponsesByCode != nil {
+func (s Stats) Unsuccessful() int64 { return s.Requests - s.Successful() }
+
+// Add statistics from result to this.
+func (s *Stats) Add(result Result) {
+ s.Requests++
+ if s.ResponsesByCode == nil {
s.ResponsesByCode = make(map[int]int64)
}
- for code, count := range other.ResponsesByCode {
- _, ok := s.ResponsesByCode[code]
- if ok {
- s.ResponsesByCode[code] += count
- } else {
- s.ResponsesByCode[code] = count
- }
+ responsesByCode := s.ResponsesByCode[result.HTTPStatus]
+ s.ResponsesByCode[result.HTTPStatus] = responsesByCode + 1
+ if result.Err == nil {
+ s.Responses++
+ } else {
+ s.Errors++
}
- s.Errors += other.Errors
- s.TotalLatency += other.TotalLatency
- if s.MinLatency == 0 || (other.MinLatency > 0 && other.MinLatency < s.MinLatency) {
- s.MinLatency = other.MinLatency
+ s.TotalLatency += result.Latency
+ if result.Latency < s.MinLatency || s.MinLatency == 0 {
+ s.MinLatency = result.Latency
}
- if other.MaxLatency > s.MaxLatency {
- s.MaxLatency = other.MaxLatency
+ if result.Latency > s.MaxLatency {
+ s.MaxLatency = result.Latency
}
- s.BytesSent += other.BytesSent
- s.BytesRecv += other.BytesRecv
+ s.BytesSent += result.BytesSent
+ s.BytesRecv += result.BytesRecv
}
-
-// Feeder is the interface for a consumer of documents.
-type Feeder interface{ Send(Document) Result }
diff --git a/client/go/internal/vespa/document/stats_test.go b/client/go/internal/vespa/document/stats_test.go
new file mode 100644
index 00000000000..3999ef2e503
--- /dev/null
+++ b/client/go/internal/vespa/document/stats_test.go
@@ -0,0 +1,31 @@
+package document
+
+import (
+ "reflect"
+ "testing"
+ "time"
+)
+
+func TestStatsAdd(t *testing.T) {
+ var stats Stats
+ stats.Add(Result{HTTPStatus: 200, Latency: 200 * time.Millisecond})
+ stats.Add(Result{HTTPStatus: 200, Latency: 400 * time.Millisecond})
+ stats.Add(Result{HTTPStatus: 200, Latency: 100 * time.Millisecond})
+ stats.Add(Result{HTTPStatus: 200, Latency: 500 * time.Millisecond})
+ stats.Add(Result{HTTPStatus: 200, Latency: 300 * time.Millisecond})
+ stats.Add(Result{HTTPStatus: 500, Latency: 100 * time.Millisecond})
+ expected := Stats{
+ Requests: 6,
+ Responses: 6,
+ ResponsesByCode: map[int]int64{200: 5, 500: 1},
+ TotalLatency: 1600 * time.Millisecond,
+ MinLatency: 100 * time.Millisecond,
+ MaxLatency: 500 * time.Millisecond,
+ }
+ if !reflect.DeepEqual(stats, expected) {
+ t.Errorf("got %+v, want %+v", stats, expected)
+ }
+ if want, got := int64(1), stats.Unsuccessful(); want != got {
+ t.Errorf("got stats.Unsuccessful() = %d, want %d", got, want)
+ }
+}