summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Martin Polden <mpolden@mpolden.no> 2023-04-27 13:05:26 +0200
committer GitHub <noreply@github.com> 2023-04-27 13:05:26 +0200
commit 2ef830b4fea1f723c5a5117770fe646e8c21315a (patch)
tree 3a886dcbdc02893bbba29de048649e70dad7240a
parent 139646116e78288ee7c53f92a17802e7e329e6c0 (diff)
parent b8e5228f51f47d0b63819b826da34ca7e4946d1a (diff)
Merge pull request #26883 from vespa-engine/mpolden/feed-client-17
Reduce allocations when creating result
-rw-r--r-- client/go/internal/cli/cmd/feed.go 7
-rw-r--r-- client/go/internal/vespa/document/dispatcher.go 8
-rw-r--r-- client/go/internal/vespa/document/http.go 66
-rw-r--r-- client/go/internal/vespa/document/http_test.go 17
-rw-r--r-- client/go/internal/vespa/document/queue.go 28
-rw-r--r-- client/go/internal/vespa/document/queue_test.go 3
6 files changed, 57 insertions, 72 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index d50a12d8884..5b168ef79a2 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -18,7 +18,7 @@ import (
func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`)
- cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 200, "Individual feed operation timeout in seconds")
+ cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Individual feed operation timeout in seconds. 0 to disable (default 0)")
cmd.PersistentFlags().IntVar(&options.doomSecs, "deadline", 0, "Exit if this number of seconds elapse without any successful operations. 0 to disable (default 0)")
cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors")
cmd.PersistentFlags().StringVar(&options.route, "route", "", `Target Vespa route for feed operations (default "default")`)
@@ -139,7 +139,7 @@ func feed(files []string, options feedOptions, cli *CLI) error {
if err != nil {
return err
}
- client := document.NewClient(document.ClientOptions{
+ client, err := document.NewClient(document.ClientOptions{
Compression: compression,
Timeout: time.Duration(options.timeoutSecs) * time.Second,
Route: options.route,
@@ -147,6 +147,9 @@ func feed(files []string, options feedOptions, cli *CLI) error {
BaseURL: service.BaseURL,
NowFunc: cli.now,
}, clients)
+ if err != nil {
+ return err
+ }
throttler := document.NewThrottler(options.connections)
circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second)
dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose)
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 3876e55210a..9d0a0f7cdb1 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -1,7 +1,6 @@
package document
import (
- "container/list"
"fmt"
"io"
"strings"
@@ -30,7 +29,7 @@ type Dispatcher struct {
output io.Writer
verbose bool
- listPool sync.Pool
+ queuePool sync.Pool
mu sync.Mutex
statsMu sync.Mutex
wg sync.WaitGroup
@@ -60,6 +59,7 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
output: output,
verbose: verbose,
}
+ d.queuePool.New = func() any { return NewQueue[documentOp]() }
d.start()
return d
}
@@ -112,7 +112,6 @@ func (d *Dispatcher) start() {
if d.started {
return
}
- d.listPool.New = func() any { return list.New() }
d.ready = make(chan documentOp, 4096)
d.results = make(chan documentOp, 4096)
d.msgs = make(chan string, 4096)
@@ -172,6 +171,7 @@ func (d *Dispatcher) dispatchNext(id Id) {
} else {
// no more operations with this ID: release slot
delete(d.inflight, k)
+ d.queuePool.Put(q)
d.releaseSlot()
}
}
@@ -196,7 +196,7 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error {
key := op.document.Id.String()
q, ok := d.inflight[key]
if !ok {
- q = NewQueue[documentOp](&d.listPool)
+ q = d.queuePool.Get().(*Queue[documentOp])
d.inflight[key] = q
} else {
q.Add(op, isRetry)
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 8bc9499df87..f4e4ec694c8 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -28,6 +28,7 @@ const (
// Client represents a HTTP client for the /document/v1/ API.
type Client struct {
+ baseURL *url.URL
options ClientOptions
httpClients []countingHTTPClient
now func() time.Time
@@ -57,20 +58,13 @@ func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http
return c.client.Do(req, timeout)
}
-type countingReader struct {
- reader io.Reader
- bytesRead int64
-}
-
-func (r *countingReader) Read(p []byte) (int, error) {
- n, err := r.reader.Read(p)
- r.bytesRead += int64(n)
- return n, err
-}
-
-func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client {
+func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, error) {
if len(httpClients) < 1 {
- panic("need at least one HTTP client")
+ return nil, fmt.Errorf("need at least one HTTP client")
+ }
+ u, err := url.Parse(options.BaseURL)
+ if err != nil {
+ return nil, fmt.Errorf("invalid base url: %w", err)
}
countingClients := make([]countingHTTPClient, 0, len(httpClients))
for _, client := range httpClients {
@@ -81,18 +75,20 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client {
nowFunc = time.Now
}
c := &Client{
+ baseURL: u,
options: options,
httpClients: countingClients,
now: nowFunc,
}
c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) }
- return c
+ return c, nil
}
func (c *Client) queryParams() url.Values {
params := url.Values{}
- timeout := c.options.Timeout*11/10 + 1000
- params.Set("timeout", strconv.FormatInt(timeout.Milliseconds(), 10)+"ms")
+ if c.options.Timeout > 0 {
+ params.Set("timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms")
+ }
if c.options.Route != "" {
params.Set("route", c.options.Route)
}
@@ -123,11 +119,7 @@ func urlPath(id Id) string {
return sb.String()
}
-func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, error) {
- u, err := url.Parse(c.options.BaseURL)
- if err != nil {
- return "", nil, fmt.Errorf("invalid base url: %w", err)
- }
+func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL) {
httpMethod := ""
switch d.Operation {
case OperationPut:
@@ -143,9 +135,10 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL,
if d.Create {
queryParams.Set("create", "true")
}
+ u := *c.baseURL
u.Path = urlPath(d.Id)
u.RawQuery = queryParams.Encode()
- return httpMethod, u, nil
+ return httpMethod, &u
}
func (c *Client) leastBusyClient() *countingHTTPClient {
@@ -177,6 +170,7 @@ func (c *Client) createRequest(method, url string, body []byte) (*http.Request,
useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512)
if useGzip {
var buf bytes.Buffer
+ buf.Grow(1024)
w := c.gzipWriter(&buf)
if _, err := w.Write(body); err != nil {
return nil, err
@@ -200,19 +194,23 @@ func (c *Client) createRequest(method, url string, body []byte) (*http.Request,
return req, nil
}
+func (c *Client) clientTimeout() time.Duration {
+ if c.options.Timeout < 1 {
+ return 190 * time.Second
+ }
+ return c.options.Timeout*11/10 + 1000 // slightly higher than the server-side timeout
+}
+
// Send given document to the endpoint configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
- method, url, err := c.feedURL(document, c.queryParams())
- if err != nil {
- return resultWithErr(result, err)
- }
+ method, url := c.feedURL(document, c.queryParams())
req, err := c.createRequest(method, url.String(), document.Body)
if err != nil {
return resultWithErr(result, err)
}
- resp, err := c.leastBusyClient().Do(req, 190*time.Second)
+ resp, err := c.leastBusyClient().Do(req, c.clientTimeout())
if err != nil {
return resultWithErr(result, err)
}
@@ -246,16 +244,20 @@ func resultWithResponse(resp *http.Response, result Result, document Document, e
Message string `json:"message"`
Trace json.RawMessage `json:"trace"`
}
- cr := countingReader{reader: resp.Body}
- jsonDec := json.NewDecoder(&cr)
- if err := jsonDec.Decode(&body); err != nil {
+ b, err := io.ReadAll(resp.Body)
+ if err != nil {
result.Status = StatusVespaFailure
- result.Err = fmt.Errorf("failed to decode json response: %w", err)
+ result.Err = err
+ } else {
+ if err := json.Unmarshal(b, &body); err != nil {
+ result.Status = StatusVespaFailure
+ result.Err = fmt.Errorf("failed to decode json response: %w", err)
+ }
}
result.Message = body.Message
result.Trace = string(body.Trace)
result.Stats.BytesSent = int64(len(document.Body))
- result.Stats.BytesRecv = cr.bytesRead
+ result.Stats.BytesRecv = int64(len(b))
if !result.Success() {
result.Stats.Errors++
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index f67368b5128..61ead1ba9c3 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -38,7 +38,7 @@ func TestLeastBusyClient(t *testing.T) {
for i := 0; i < 4; i++ {
httpClients = append(httpClients, &mockHTTPClient{i, &httpClient})
}
- client := NewClient(ClientOptions{}, httpClients)
+ client, _ := NewClient(ClientOptions{}, httpClients)
client.httpClients[0].addInflight(1)
client.httpClients[1].addInflight(1)
assertLeastBusy(t, 2, client)
@@ -65,7 +65,7 @@ func TestClientSend(t *testing.T) {
{Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)},
}
httpClient := mock.HTTPClient{}
- client := NewClient(ClientOptions{
+ client, _ := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
}, []util.HTTPClient{&httpClient})
@@ -109,7 +109,7 @@ func TestClientSend(t *testing.T) {
if r.Method != http.MethodPut {
t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut)
}
- wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5500ms", doc.Id.UserSpecific)
+ wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific)
if r.URL.String() != wantURL {
t.Errorf("got r.URL = %q, want %q", r.URL, wantURL)
}
@@ -144,7 +144,7 @@ func TestClientSend(t *testing.T) {
func TestClientSendCompressed(t *testing.T) {
httpClient := mock.HTTPClient{}
- client := NewClient(ClientOptions{
+ client, _ := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
}, []util.HTTPClient{&httpClient})
@@ -278,16 +278,13 @@ func TestClientFeedURL(t *testing.T) {
},
}
httpClient := mock.HTTPClient{}
- client := NewClient(ClientOptions{
+ client, _ := NewClient(ClientOptions{
BaseURL: "https://example.com",
}, []util.HTTPClient{&httpClient})
for i, tt := range tests {
moreParams := url.Values{}
moreParams.Set("foo", "ba/r")
- method, u, err := client.feedURL(tt.in, moreParams)
- if err != nil {
- t.Errorf("#%d: got unexpected error = %s, want none", i, err)
- }
+ method, u := client.feedURL(tt.in, moreParams)
if u.String() != tt.url || method != tt.method {
t.Errorf("#%d: URL() = (%s, %s), want (%s, %s)", i, method, u.String(), tt.method, tt.url)
}
@@ -296,7 +293,7 @@ func TestClientFeedURL(t *testing.T) {
func benchmarkClientSend(b *testing.B, compression Compression, document Document) {
httpClient := mock.HTTPClient{}
- client := NewClient(ClientOptions{
+ client, _ := NewClient(ClientOptions{
Compression: compression,
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
diff --git a/client/go/internal/vespa/document/queue.go b/client/go/internal/vespa/document/queue.go
index 2e5a1976d58..eed5209ca9e 100644
--- a/client/go/internal/vespa/document/queue.go
+++ b/client/go/internal/vespa/document/queue.go
@@ -2,26 +2,14 @@ package document
import (
"container/list"
- "sync"
)
-// Queue wraps a doubly linked list. It attempts to re-use lists through a sync.Pool to reduce GC pressure.
-type Queue[T any] struct {
- items *list.List
- listPool *sync.Pool
-}
+// Queue is a generic wrapper around a doubly linked list.
+type Queue[T any] struct{ items *list.List }
-func NewQueue[T any](listPool *sync.Pool) *Queue[T] {
- if listPool.New == nil {
- listPool.New = func() any { return list.New() }
- }
- return &Queue[T]{listPool: listPool}
-}
+func NewQueue[T any]() *Queue[T] { return &Queue[T]{items: list.New()} }
func (q *Queue[T]) Add(item T, front bool) {
- if q.items == nil {
- q.items = q.listPool.Get().(*list.List)
- }
if front {
q.items.PushFront(item)
} else {
@@ -30,14 +18,10 @@ func (q *Queue[T]) Add(item T, front bool) {
}
func (q *Queue[T]) Poll() (T, bool) {
- if q.items == nil || q.items.Front() == nil {
+ front := q.items.Front()
+ if front == nil {
var empty T
return empty, false
}
- item := q.items.Remove(q.items.Front()).(T)
- if q.items.Front() == nil { // Emptied queue, release list back to pool
- q.listPool.Put(q.items)
- q.items = nil
- }
- return item, true
+ return q.items.Remove(front).(T), true
}
diff --git a/client/go/internal/vespa/document/queue_test.go b/client/go/internal/vespa/document/queue_test.go
index 992e7410053..f2bdf416570 100644
--- a/client/go/internal/vespa/document/queue_test.go
+++ b/client/go/internal/vespa/document/queue_test.go
@@ -1,12 +1,11 @@
package document
import (
- "sync"
"testing"
)
func TestQueue(t *testing.T) {
- q := NewQueue[int](&sync.Pool{})
+ q := NewQueue[int]()
assertPoll(t, q, 0, false)
q.Add(1, false)
q.Add(2, false)