diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-28 16:24:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-28 16:24:30 +0200 |
commit | a0eae946b01b3935496ef520886ffbff1d7dffc2 (patch) | |
tree | fbfd370c47bade332803466e58c66571e21aa8ef | |
parent | d8f8731f6e91337f241912f71cab12e5c3febf00 (diff) | |
parent | 61e6128c410d21624ff5b6bd3b4c2abc77214045 (diff) |
Merge pull request #26911 from vespa-engine/mpolden/reuse-buffers
Re-use response buffers
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 2 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 110 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 101 |
3 files changed, 105 insertions, 108 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 9d0a0f7cdb1..00b7dbb411a 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -86,8 +86,8 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { var msg strings.Builder msg.WriteString("feed: ") msg.WriteString(op.document.String()) + msg.WriteString(" failed: ") if result.Err != nil { - msg.WriteString("error ") msg.WriteString(result.Err.Error()) } else { msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus)) diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index f4e4ec694c8..d42615d1e71 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -28,12 +28,12 @@ const ( // Client represents a HTTP client for the /document/v1/ API. type Client struct { - baseURL *url.URL options ClientOptions httpClients []countingHTTPClient now func() time.Time sendCount int32 gzippers sync.Pool + buffers sync.Pool } // ClientOptions specifices the configuration options of a feed client. @@ -62,7 +62,7 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e if len(httpClients) < 1 { return nil, fmt.Errorf("need at least one HTTP client") } - u, err := url.Parse(options.BaseURL) + _, err := url.Parse(options.BaseURL) if err != nil { return nil, fmt.Errorf("invalid base url: %w", err) } @@ -75,51 +75,27 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e nowFunc = time.Now } c := &Client{ - baseURL: u, options: options, httpClients: countingClients, now: nowFunc, } c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) } + c.buffers.New = func() any { return &bytes.Buffer{} } return c, nil } -func (c *Client) queryParams() url.Values { - params := url.Values{} - if c.options.Timeout > 0 { - params.Set("timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms") - } - if c.options.Route != "" { - params.Set("route", c.options.Route) - } - if c.options.TraceLevel > 0 { - params.Set("tracelevel", strconv.Itoa(c.options.TraceLevel)) - } - return params -} - -func urlPath(id Id) string { - var sb strings.Builder - sb.WriteString("/document/v1/") - sb.WriteString(url.PathEscape(id.Namespace)) - sb.WriteString("/") - sb.WriteString(url.PathEscape(id.Type)) - if id.Number != nil { - sb.WriteString("/number/") - n := uint64(*id.Number) - sb.WriteString(strconv.FormatUint(n, 10)) - } else if id.Group != "" { - sb.WriteString("/group/") - sb.WriteString(url.PathEscape(id.Group)) +func writeQueryParam(sb *strings.Builder, start int, k, v string) { + if sb.Len() == start { + sb.WriteString("?") } else { - sb.WriteString("/docid") + sb.WriteString("&") } - sb.WriteString("/") - sb.WriteString(url.PathEscape(id.UserSpecific)) - return sb.String() + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(url.QueryEscape(v)) } -func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL) { +func (c *Client) methodAndURL(d Document) (string, string) { httpMethod := "" switch d.Operation { case OperationPut: @@ -129,16 +105,46 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL) case OperationRemove: httpMethod = "DELETE" } + var sb strings.Builder + // Base URL and path + sb.WriteString(c.options.BaseURL) + if !strings.HasSuffix(c.options.BaseURL, "/") { + sb.WriteString("/") + } + sb.WriteString("document/v1/") + sb.WriteString(url.PathEscape(d.Id.Namespace)) + sb.WriteString("/") + sb.WriteString(url.PathEscape(d.Id.Type)) + if d.Id.Number != nil { + sb.WriteString("/number/") + n := uint64(*d.Id.Number) + sb.WriteString(strconv.FormatUint(n, 10)) + } else if d.Id.Group != "" { + sb.WriteString("/group/") + sb.WriteString(url.PathEscape(d.Id.Group)) + } else { + sb.WriteString("/docid") + } + sb.WriteString("/") + sb.WriteString(url.PathEscape(d.Id.UserSpecific)) + // Query part + queryStart := sb.Len() + if c.options.Timeout > 0 { + writeQueryParam(&sb, queryStart, "timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms") + } + if c.options.Route != "" { + writeQueryParam(&sb, queryStart, "route", c.options.Route) + } + if c.options.TraceLevel > 0 { + writeQueryParam(&sb, queryStart, "tracelevel", strconv.Itoa(c.options.TraceLevel)) + } if d.Condition != "" { - queryParams.Set("condition", d.Condition) + writeQueryParam(&sb, queryStart, "condition", d.Condition) } if d.Create { - queryParams.Set("create", "true") + writeQueryParam(&sb, queryStart, "create", "true") } - u := *c.baseURL - u.Path = urlPath(d.Id) - u.RawQuery = queryParams.Encode() - return httpMethod, &u + return httpMethod, sb.String() } func (c *Client) leastBusyClient() *countingHTTPClient { @@ -165,6 +171,12 @@ func (c *Client) gzipWriter(w io.Writer) *gzip.Writer { return gzipWriter } +func (c *Client) buffer() *bytes.Buffer { + buf := c.buffers.Get().(*bytes.Buffer) + buf.Reset() + return buf +} + func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) { var r io.Reader useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) @@ -205,8 +217,8 @@ func (c *Client) clientTimeout() time.Duration { func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id, Stats: Stats{Requests: 1}} - method, url := c.feedURL(document, c.queryParams()) - req, err := c.createRequest(method, url.String(), document.Body) + method, url := c.methodAndURL(document) + req, err := c.createRequest(method, url, document.Body) if err != nil { return resultWithErr(result, err) } @@ -216,7 +228,7 @@ func (c *Client) Send(document Document) Result { } defer resp.Body.Close() elapsed := c.now().Sub(start) - return resultWithResponse(resp, result, document, elapsed) + return c.resultWithResponse(resp, result, document, elapsed) } func resultWithErr(result Result, err error) Result { @@ -226,7 +238,7 @@ func resultWithErr(result Result, err error) Result { return result } -func resultWithResponse(resp *http.Response, result Result, document Document, elapsed time.Duration) Result { +func (c *Client) resultWithResponse(resp *http.Response, result Result, document Document, elapsed time.Duration) Result { result.HTTPStatus = resp.StatusCode result.Stats.Responses++ result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1} @@ -244,12 +256,14 @@ func resultWithResponse(resp *http.Response, result Result, document Document, e Message string `json:"message"` Trace json.RawMessage `json:"trace"` } - b, err := io.ReadAll(resp.Body) + buf := c.buffer() + defer c.buffers.Put(buf) + written, err := io.Copy(buf, resp.Body) if err != nil { result.Status = StatusVespaFailure result.Err = err } else { - if err := json.Unmarshal(b, &body); err != nil { + if err := json.Unmarshal(buf.Bytes(), &body); err != nil { result.Status = StatusVespaFailure result.Err = fmt.Errorf("failed to decode json response: %w", err) } @@ -257,7 +271,7 @@ func resultWithResponse(resp *http.Response, result Result, document Document, e result.Message = body.Message result.Trace = string(body.Trace) result.Stats.BytesSent = int64(len(document.Body)) - result.Stats.BytesRecv = int64(len(b)) + result.Stats.BytesRecv = int64(written) if !result.Success() { result.Stats.Errors++ } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 61ead1ba9c3..7e858f96020 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net/http" - "net/url" "reflect" "strings" "testing" @@ -109,7 +108,7 @@ func TestClientSend(t *testing.T) { if r.Method != http.MethodPut { t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut) } - wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific) + wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?timeout=5000ms&create=true", doc.Id.UserSpecific) if r.URL.String() != wantURL { t.Errorf("got r.URL = %q, want %q", r.URL, wantURL) } @@ -191,72 +190,53 @@ func assertCompressedRequest(t *testing.T, want bool, request *http.Request) { } } -func TestURLPath(t *testing.T) { +func TestClientMethodAndURL(t *testing.T) { tests := []struct { - in Id - out string + in Document + options ClientOptions + method string + url string }{ { - Id{ - Namespace: "ns-with-/", - Type: "type-with-/", - UserSpecific: "user", + Document{ + Id: mustParseId("id:ns:type:n=123:user"), }, - "/document/v1/ns-with-%2F/type-with-%2F/docid/user", + ClientOptions{}, + "POST", + "https://example.com/document/v1/ns/type/number/123/user", }, { - Id{ - Namespace: "ns", - Type: "type", - Number: ptr(int64(123)), - UserSpecific: "user", + Document{ + Id: mustParseId("id:ns:type:g=foo:user"), }, - "/document/v1/ns/type/number/123/user", + ClientOptions{}, + "POST", + "https://example.com/document/v1/ns/type/group/foo/user", }, { - Id{ - Namespace: "ns", - Type: "type", - Group: "foo", - UserSpecific: "user", + Document{ + Id: mustParseId("id:ns:type::user::specific"), }, - "/document/v1/ns/type/group/foo/user", + ClientOptions{}, + "POST", + "https://example.com/document/v1/ns/type/docid/user::specific", }, { - Id{ - Namespace: "ns", - Type: "type", - UserSpecific: "user::specific", + Document{ + Id: mustParseId("id:ns:type:::"), }, - "/document/v1/ns/type/docid/user::specific", + ClientOptions{Route: "elsewhere"}, + "POST", + "https://example.com/document/v1/ns/type/docid/:?route=elsewhere", }, { - Id{ - Namespace: "ns", - Type: "type", - UserSpecific: ":", + Document{ + Id: mustParseId("id:ns:type-with-/::user"), + Condition: "foo/bar", }, - "/document/v1/ns/type/docid/:", - }, - } - for i, tt := range tests { - path := urlPath(tt.in) - if path != tt.out { - t.Errorf("#%d: documentPath(%q) = %s, want %s", i, tt.in, path, tt.out) - } - } -} - -func TestClientFeedURL(t *testing.T) { - tests := []struct { - in Document - method string - url string - }{ - { - Document{Id: mustParseId("id:ns:type::user")}, + ClientOptions{}, "POST", - "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr", + "https://example.com/document/v1/ns/type-with-%2F/docid/user?condition=foo%2Fbar", }, { Document{ @@ -265,28 +245,31 @@ func TestClientFeedURL(t *testing.T) { Create: true, Condition: "false", }, + ClientOptions{Timeout: 10 * time.Second, TraceLevel: 5}, "PUT", - "https://example.com/document/v1/ns/type/docid/user?condition=false&create=true&foo=ba%2Fr", + "https://example.com/document/v1/ns/type/docid/user?timeout=10000ms&tracelevel=5&condition=false&create=true", }, { Document{ Id: mustParseId("id:ns:type::user"), Operation: OperationRemove, }, + ClientOptions{}, "DELETE", - "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr", + "https://example.com/document/v1/ns/type/docid/user", }, } httpClient := mock.HTTPClient{} client, _ := NewClient(ClientOptions{ - BaseURL: "https://example.com", + BaseURL: "https://example.com/", }, []util.HTTPClient{&httpClient}) for i, tt := range tests { - moreParams := url.Values{} - moreParams.Set("foo", "ba/r") - method, u := client.feedURL(tt.in, moreParams) - if u.String() != tt.url || method != tt.method { - t.Errorf("#%d: URL() = (%s, %s), want (%s, %s)", i, method, u.String(), tt.method, tt.url) + client.options.Timeout = tt.options.Timeout + client.options.Route = tt.options.Route + client.options.TraceLevel = tt.options.TraceLevel + method, url := client.methodAndURL(tt.in) + if url != tt.url || method != tt.method { + t.Errorf("#%d: methodAndURL(doc) = (%s, %s), want (%s, %s)", i, method, url, tt.method, tt.url) } } } |