diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-27 13:05:26 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-27 13:05:26 +0200 |
commit | 2ef830b4fea1f723c5a5117770fe646e8c21315a (patch) | |
tree | 3a886dcbdc02893bbba29de048649e70dad7240a | |
parent | 139646116e78288ee7c53f92a17802e7e329e6c0 (diff) | |
parent | b8e5228f51f47d0b63819b826da34ca7e4946d1a (diff) |
Merge pull request #26883 from vespa-engine/mpolden/feed-client-17
Reduce allocations when creating result
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 7 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 8 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 66 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 17 | ||||
-rw-r--r-- | client/go/internal/vespa/document/queue.go | 28 | ||||
-rw-r--r-- | client/go/internal/vespa/document/queue_test.go | 3 |
6 files changed, 57 insertions, 72 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index d50a12d8884..5b168ef79a2 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -18,7 +18,7 @@ import ( func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`) - cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 200, "Individual feed operation timeout in seconds") + cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Individual feed operation timeout in seconds. 0 to disable (default 0)") cmd.PersistentFlags().IntVar(&options.doomSecs, "deadline", 0, "Exit if this number of seconds elapse without any successful operations. 0 to disable (default 0)") cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") cmd.PersistentFlags().StringVar(&options.route, "route", "", `Target Vespa route for feed operations (default "default")`) @@ -139,7 +139,7 @@ func feed(files []string, options feedOptions, cli *CLI) error { if err != nil { return err } - client := document.NewClient(document.ClientOptions{ + client, err := document.NewClient(document.ClientOptions{ Compression: compression, Timeout: time.Duration(options.timeoutSecs) * time.Second, Route: options.route, @@ -147,6 +147,9 @@ func feed(files []string, options feedOptions, cli *CLI) error { BaseURL: service.BaseURL, NowFunc: cli.now, }, clients) + if err != nil { + return err + } throttler := document.NewThrottler(options.connections) circuitBreaker := document.NewCircuitBreaker(10*time.Second, time.Duration(options.doomSecs)*time.Second) dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, cli.Stderr, options.verbose) diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 3876e55210a..9d0a0f7cdb1 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -1,7 +1,6 @@ package document import ( - "container/list" "fmt" "io" "strings" @@ -30,7 +29,7 @@ type Dispatcher struct { output io.Writer verbose bool - listPool sync.Pool + queuePool sync.Pool mu sync.Mutex statsMu sync.Mutex wg sync.WaitGroup @@ -60,6 +59,7 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o output: output, verbose: verbose, } + d.queuePool.New = func() any { return NewQueue[documentOp]() } d.start() return d } @@ -112,7 +112,6 @@ func (d *Dispatcher) start() { if d.started { return } - d.listPool.New = func() any { return list.New() } d.ready = make(chan documentOp, 4096) d.results = make(chan documentOp, 4096) d.msgs = make(chan string, 4096) @@ -172,6 +171,7 @@ func (d *Dispatcher) dispatchNext(id Id) { } else { // no more operations with this ID: release slot delete(d.inflight, k) + d.queuePool.Put(q) d.releaseSlot() } } @@ -196,7 +196,7 @@ func (d *Dispatcher) enqueue(op documentOp, isRetry bool) error { key := op.document.Id.String() q, ok := d.inflight[key] if !ok { - q = NewQueue[documentOp](&d.listPool) + q = d.queuePool.Get().(*Queue[documentOp]) d.inflight[key] = q } else { q.Add(op, isRetry) diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 8bc9499df87..f4e4ec694c8 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -28,6 +28,7 @@ const ( // Client represents a HTTP client for the /document/v1/ API. type Client struct { + baseURL *url.URL options ClientOptions httpClients []countingHTTPClient now func() time.Time @@ -57,20 +58,13 @@ func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http return c.client.Do(req, timeout) } -type countingReader struct { - reader io.Reader - bytesRead int64 -} - -func (r *countingReader) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - r.bytesRead += int64(n) - return n, err -} - -func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client { +func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, error) { if len(httpClients) < 1 { - panic("need at least one HTTP client") + return nil, fmt.Errorf("need at least one HTTP client") + } + u, err := url.Parse(options.BaseURL) + if err != nil { + return nil, fmt.Errorf("invalid base url: %w", err) } countingClients := make([]countingHTTPClient, 0, len(httpClients)) for _, client := range httpClients { @@ -81,18 +75,20 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client { nowFunc = time.Now } c := &Client{ + baseURL: u, options: options, httpClients: countingClients, now: nowFunc, } c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) } - return c + return c, nil } func (c *Client) queryParams() url.Values { params := url.Values{} - timeout := c.options.Timeout*11/10 + 1000 - params.Set("timeout", strconv.FormatInt(timeout.Milliseconds(), 10)+"ms") + if c.options.Timeout > 0 { + params.Set("timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms") + } if c.options.Route != "" { params.Set("route", c.options.Route) } @@ -123,11 +119,7 @@ func urlPath(id Id) string { return sb.String() } -func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, error) { - u, err := url.Parse(c.options.BaseURL) - if err != nil { - return "", nil, fmt.Errorf("invalid base url: %w", err) - } +func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL) { httpMethod := "" switch d.Operation { case OperationPut: @@ -143,9 +135,10 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, if d.Create { queryParams.Set("create", "true") } + u := *c.baseURL u.Path = urlPath(d.Id) u.RawQuery = queryParams.Encode() - return httpMethod, u, nil + return httpMethod, &u } func (c *Client) leastBusyClient() *countingHTTPClient { @@ -177,6 +170,7 @@ func (c *Client) createRequest(method, url string, body []byte) (*http.Request, useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) if useGzip { var buf bytes.Buffer + buf.Grow(1024) w := c.gzipWriter(&buf) if _, err := w.Write(body); err != nil { return nil, err @@ -200,19 +194,23 @@ func (c *Client) createRequest(method, url string, body []byte) (*http.Request, return req, nil } +func (c *Client) clientTimeout() time.Duration { + if c.options.Timeout < 1 { + return 190 * time.Second + } + return c.options.Timeout*11/10 + 1000 // slightly higher than the server-side timeout +} + // Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id, Stats: Stats{Requests: 1}} - method, url, err := c.feedURL(document, c.queryParams()) - if err != nil { - return resultWithErr(result, err) - } + method, url := c.feedURL(document, c.queryParams()) req, err := c.createRequest(method, url.String(), document.Body) if err != nil { return resultWithErr(result, err) } - resp, err := c.leastBusyClient().Do(req, 190*time.Second) + resp, err := c.leastBusyClient().Do(req, c.clientTimeout()) if err != nil { return resultWithErr(result, err) } @@ -246,16 +244,20 @@ func resultWithResponse(resp *http.Response, result Result, document Document, e Message string `json:"message"` Trace json.RawMessage `json:"trace"` } - cr := countingReader{reader: resp.Body} - jsonDec := json.NewDecoder(&cr) - if err := jsonDec.Decode(&body); err != nil { + b, err := io.ReadAll(resp.Body) + if err != nil { result.Status = StatusVespaFailure - result.Err = fmt.Errorf("failed to decode json response: %w", err) + result.Err = err + } else { + if err := json.Unmarshal(b, &body); err != nil { + result.Status = StatusVespaFailure + result.Err = fmt.Errorf("failed to decode json response: %w", err) + } } result.Message = body.Message result.Trace = string(body.Trace) result.Stats.BytesSent = int64(len(document.Body)) - result.Stats.BytesRecv = cr.bytesRead + result.Stats.BytesRecv = int64(len(b)) if !result.Success() { result.Stats.Errors++ } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index f67368b5128..61ead1ba9c3 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -38,7 +38,7 @@ func TestLeastBusyClient(t *testing.T) { for i := 0; i < 4; i++ { httpClients = append(httpClients, &mockHTTPClient{i, &httpClient}) } - client := NewClient(ClientOptions{}, httpClients) + client, _ := NewClient(ClientOptions{}, httpClients) client.httpClients[0].addInflight(1) client.httpClients[1].addInflight(1) assertLeastBusy(t, 2, client) @@ -65,7 +65,7 @@ func TestClientSend(t *testing.T) { {Create: true, Id: mustParseId("id:ns:type::doc3"), Operation: OperationUpdate, Body: []byte(`{"fields":{"baz": "789"}}`)}, } httpClient := mock.HTTPClient{} - client := NewClient(ClientOptions{ + client, _ := NewClient(ClientOptions{ BaseURL: "https://example.com:1337", Timeout: time.Duration(5 * time.Second), }, []util.HTTPClient{&httpClient}) @@ -109,7 +109,7 @@ func TestClientSend(t *testing.T) { if r.Method != http.MethodPut { t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut) } - wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5500ms", doc.Id.UserSpecific) + wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific) if r.URL.String() != wantURL { t.Errorf("got r.URL = %q, want %q", r.URL, wantURL) } @@ -144,7 +144,7 @@ func TestClientSend(t *testing.T) { func TestClientSendCompressed(t *testing.T) { httpClient := mock.HTTPClient{} - client := NewClient(ClientOptions{ + client, _ := NewClient(ClientOptions{ BaseURL: "https://example.com:1337", Timeout: time.Duration(5 * time.Second), }, []util.HTTPClient{&httpClient}) @@ -278,16 +278,13 @@ func TestClientFeedURL(t *testing.T) { }, } httpClient := mock.HTTPClient{} - client := NewClient(ClientOptions{ + client, _ := NewClient(ClientOptions{ BaseURL: "https://example.com", }, []util.HTTPClient{&httpClient}) for i, tt := range tests { moreParams := url.Values{} moreParams.Set("foo", "ba/r") - method, u, err := client.feedURL(tt.in, moreParams) - if err != nil { - t.Errorf("#%d: got unexpected error = %s, want none", i, err) - } + method, u := client.feedURL(tt.in, moreParams) if u.String() != tt.url || method != tt.method { t.Errorf("#%d: URL() = (%s, %s), want (%s, %s)", i, method, u.String(), tt.method, tt.url) } @@ -296,7 +293,7 @@ func TestClientFeedURL(t *testing.T) { func benchmarkClientSend(b *testing.B, compression Compression, document Document) { httpClient := mock.HTTPClient{} - client := NewClient(ClientOptions{ + client, _ := NewClient(ClientOptions{ Compression: compression, BaseURL: "https://example.com:1337", Timeout: time.Duration(5 * time.Second), diff --git a/client/go/internal/vespa/document/queue.go b/client/go/internal/vespa/document/queue.go index 2e5a1976d58..eed5209ca9e 100644 --- a/client/go/internal/vespa/document/queue.go +++ b/client/go/internal/vespa/document/queue.go @@ -2,26 +2,14 @@ package document import ( "container/list" - "sync" ) -// Queue wraps a doubly linked list. It attempts to re-use lists through a sync.Pool to reduce GC pressure. -type Queue[T any] struct { - items *list.List - listPool *sync.Pool -} +// Queue is a generic wrapper around a doubly linked list. +type Queue[T any] struct{ items *list.List } -func NewQueue[T any](listPool *sync.Pool) *Queue[T] { - if listPool.New == nil { - listPool.New = func() any { return list.New() } - } - return &Queue[T]{listPool: listPool} -} +func NewQueue[T any]() *Queue[T] { return &Queue[T]{items: list.New()} } func (q *Queue[T]) Add(item T, front bool) { - if q.items == nil { - q.items = q.listPool.Get().(*list.List) - } if front { q.items.PushFront(item) } else { @@ -30,14 +18,10 @@ func (q *Queue[T]) Add(item T, front bool) { } func (q *Queue[T]) Poll() (T, bool) { - if q.items == nil || q.items.Front() == nil { + front := q.items.Front() + if front == nil { var empty T return empty, false } - item := q.items.Remove(q.items.Front()).(T) - if q.items.Front() == nil { // Emptied queue, release list back to pool - q.listPool.Put(q.items) - q.items = nil - } - return item, true + return q.items.Remove(front).(T), true } diff --git a/client/go/internal/vespa/document/queue_test.go b/client/go/internal/vespa/document/queue_test.go index 992e7410053..f2bdf416570 100644 --- a/client/go/internal/vespa/document/queue_test.go +++ b/client/go/internal/vespa/document/queue_test.go @@ -1,12 +1,11 @@ package document import ( - "sync" "testing" ) func TestQueue(t *testing.T) { - q := NewQueue[int](&sync.Pool{}) + q := NewQueue[int]() assertPoll(t, q, 0, false) q.Add(1, false) q.Add(2, false) |