summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-28 16:24:30 +0200
committerGitHub <noreply@github.com>2023-04-28 16:24:30 +0200
commita0eae946b01b3935496ef520886ffbff1d7dffc2 (patch)
treefbfd370c47bade332803466e58c66571e21aa8ef
parentd8f8731f6e91337f241912f71cab12e5c3febf00 (diff)
parent61e6128c410d21624ff5b6bd3b4c2abc77214045 (diff)
Merge pull request #26911 from vespa-engine/mpolden/reuse-buffers
Re-use response buffers
-rw-r--r--client/go/internal/vespa/document/dispatcher.go2
-rw-r--r--client/go/internal/vespa/document/http.go110
-rw-r--r--client/go/internal/vespa/document/http_test.go101
3 files changed, 105 insertions, 108 deletions
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 9d0a0f7cdb1..00b7dbb411a 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -86,8 +86,8 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
var msg strings.Builder
msg.WriteString("feed: ")
msg.WriteString(op.document.String())
+ msg.WriteString(" failed: ")
if result.Err != nil {
- msg.WriteString("error ")
msg.WriteString(result.Err.Error())
} else {
msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus))
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index f4e4ec694c8..d42615d1e71 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -28,12 +28,12 @@ const (
// Client represents a HTTP client for the /document/v1/ API.
type Client struct {
- baseURL *url.URL
options ClientOptions
httpClients []countingHTTPClient
now func() time.Time
sendCount int32
gzippers sync.Pool
+ buffers sync.Pool
}
// ClientOptions specifices the configuration options of a feed client.
@@ -62,7 +62,7 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e
if len(httpClients) < 1 {
return nil, fmt.Errorf("need at least one HTTP client")
}
- u, err := url.Parse(options.BaseURL)
+ _, err := url.Parse(options.BaseURL)
if err != nil {
return nil, fmt.Errorf("invalid base url: %w", err)
}
@@ -75,51 +75,27 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e
nowFunc = time.Now
}
c := &Client{
- baseURL: u,
options: options,
httpClients: countingClients,
now: nowFunc,
}
c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) }
+ c.buffers.New = func() any { return &bytes.Buffer{} }
return c, nil
}
-func (c *Client) queryParams() url.Values {
- params := url.Values{}
- if c.options.Timeout > 0 {
- params.Set("timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms")
- }
- if c.options.Route != "" {
- params.Set("route", c.options.Route)
- }
- if c.options.TraceLevel > 0 {
- params.Set("tracelevel", strconv.Itoa(c.options.TraceLevel))
- }
- return params
-}
-
-func urlPath(id Id) string {
- var sb strings.Builder
- sb.WriteString("/document/v1/")
- sb.WriteString(url.PathEscape(id.Namespace))
- sb.WriteString("/")
- sb.WriteString(url.PathEscape(id.Type))
- if id.Number != nil {
- sb.WriteString("/number/")
- n := uint64(*id.Number)
- sb.WriteString(strconv.FormatUint(n, 10))
- } else if id.Group != "" {
- sb.WriteString("/group/")
- sb.WriteString(url.PathEscape(id.Group))
+func writeQueryParam(sb *strings.Builder, start int, k, v string) {
+ if sb.Len() == start {
+ sb.WriteString("?")
} else {
- sb.WriteString("/docid")
+ sb.WriteString("&")
}
- sb.WriteString("/")
- sb.WriteString(url.PathEscape(id.UserSpecific))
- return sb.String()
+ sb.WriteString(k)
+ sb.WriteString("=")
+ sb.WriteString(url.QueryEscape(v))
}
-func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL) {
+func (c *Client) methodAndURL(d Document) (string, string) {
httpMethod := ""
switch d.Operation {
case OperationPut:
@@ -129,16 +105,46 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL)
case OperationRemove:
httpMethod = "DELETE"
}
+ var sb strings.Builder
+ // Base URL and path
+ sb.WriteString(c.options.BaseURL)
+ if !strings.HasSuffix(c.options.BaseURL, "/") {
+ sb.WriteString("/")
+ }
+ sb.WriteString("document/v1/")
+ sb.WriteString(url.PathEscape(d.Id.Namespace))
+ sb.WriteString("/")
+ sb.WriteString(url.PathEscape(d.Id.Type))
+ if d.Id.Number != nil {
+ sb.WriteString("/number/")
+ n := uint64(*d.Id.Number)
+ sb.WriteString(strconv.FormatUint(n, 10))
+ } else if d.Id.Group != "" {
+ sb.WriteString("/group/")
+ sb.WriteString(url.PathEscape(d.Id.Group))
+ } else {
+ sb.WriteString("/docid")
+ }
+ sb.WriteString("/")
+ sb.WriteString(url.PathEscape(d.Id.UserSpecific))
+ // Query part
+ queryStart := sb.Len()
+ if c.options.Timeout > 0 {
+ writeQueryParam(&sb, queryStart, "timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms")
+ }
+ if c.options.Route != "" {
+ writeQueryParam(&sb, queryStart, "route", c.options.Route)
+ }
+ if c.options.TraceLevel > 0 {
+ writeQueryParam(&sb, queryStart, "tracelevel", strconv.Itoa(c.options.TraceLevel))
+ }
if d.Condition != "" {
- queryParams.Set("condition", d.Condition)
+ writeQueryParam(&sb, queryStart, "condition", d.Condition)
}
if d.Create {
- queryParams.Set("create", "true")
+ writeQueryParam(&sb, queryStart, "create", "true")
}
- u := *c.baseURL
- u.Path = urlPath(d.Id)
- u.RawQuery = queryParams.Encode()
- return httpMethod, &u
+ return httpMethod, sb.String()
}
func (c *Client) leastBusyClient() *countingHTTPClient {
@@ -165,6 +171,12 @@ func (c *Client) gzipWriter(w io.Writer) *gzip.Writer {
return gzipWriter
}
+func (c *Client) buffer() *bytes.Buffer {
+ buf := c.buffers.Get().(*bytes.Buffer)
+ buf.Reset()
+ return buf
+}
+
func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) {
var r io.Reader
useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512)
@@ -205,8 +217,8 @@ func (c *Client) clientTimeout() time.Duration {
func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
- method, url := c.feedURL(document, c.queryParams())
- req, err := c.createRequest(method, url.String(), document.Body)
+ method, url := c.methodAndURL(document)
+ req, err := c.createRequest(method, url, document.Body)
if err != nil {
return resultWithErr(result, err)
}
@@ -216,7 +228,7 @@ func (c *Client) Send(document Document) Result {
}
defer resp.Body.Close()
elapsed := c.now().Sub(start)
- return resultWithResponse(resp, result, document, elapsed)
+ return c.resultWithResponse(resp, result, document, elapsed)
}
func resultWithErr(result Result, err error) Result {
@@ -226,7 +238,7 @@ func resultWithErr(result Result, err error) Result {
return result
}
-func resultWithResponse(resp *http.Response, result Result, document Document, elapsed time.Duration) Result {
+func (c *Client) resultWithResponse(resp *http.Response, result Result, document Document, elapsed time.Duration) Result {
result.HTTPStatus = resp.StatusCode
result.Stats.Responses++
result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1}
@@ -244,12 +256,14 @@ func resultWithResponse(resp *http.Response, result Result, document Document, e
Message string `json:"message"`
Trace json.RawMessage `json:"trace"`
}
- b, err := io.ReadAll(resp.Body)
+ buf := c.buffer()
+ defer c.buffers.Put(buf)
+ written, err := io.Copy(buf, resp.Body)
if err != nil {
result.Status = StatusVespaFailure
result.Err = err
} else {
- if err := json.Unmarshal(b, &body); err != nil {
+ if err := json.Unmarshal(buf.Bytes(), &body); err != nil {
result.Status = StatusVespaFailure
result.Err = fmt.Errorf("failed to decode json response: %w", err)
}
@@ -257,7 +271,7 @@ func resultWithResponse(resp *http.Response, result Result, document Document, e
result.Message = body.Message
result.Trace = string(body.Trace)
result.Stats.BytesSent = int64(len(document.Body))
- result.Stats.BytesRecv = int64(len(b))
+ result.Stats.BytesRecv = int64(written)
if !result.Success() {
result.Stats.Errors++
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 61ead1ba9c3..7e858f96020 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -5,7 +5,6 @@ import (
"fmt"
"io"
"net/http"
- "net/url"
"reflect"
"strings"
"testing"
@@ -109,7 +108,7 @@ func TestClientSend(t *testing.T) {
if r.Method != http.MethodPut {
t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut)
}
- wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific)
+ wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?timeout=5000ms&create=true", doc.Id.UserSpecific)
if r.URL.String() != wantURL {
t.Errorf("got r.URL = %q, want %q", r.URL, wantURL)
}
@@ -191,72 +190,53 @@ func assertCompressedRequest(t *testing.T, want bool, request *http.Request) {
}
}
-func TestURLPath(t *testing.T) {
+func TestClientMethodAndURL(t *testing.T) {
tests := []struct {
- in Id
- out string
+ in Document
+ options ClientOptions
+ method string
+ url string
}{
{
- Id{
- Namespace: "ns-with-/",
- Type: "type-with-/",
- UserSpecific: "user",
+ Document{
+ Id: mustParseId("id:ns:type:n=123:user"),
},
- "/document/v1/ns-with-%2F/type-with-%2F/docid/user",
+ ClientOptions{},
+ "POST",
+ "https://example.com/document/v1/ns/type/number/123/user",
},
{
- Id{
- Namespace: "ns",
- Type: "type",
- Number: ptr(int64(123)),
- UserSpecific: "user",
+ Document{
+ Id: mustParseId("id:ns:type:g=foo:user"),
},
- "/document/v1/ns/type/number/123/user",
+ ClientOptions{},
+ "POST",
+ "https://example.com/document/v1/ns/type/group/foo/user",
},
{
- Id{
- Namespace: "ns",
- Type: "type",
- Group: "foo",
- UserSpecific: "user",
+ Document{
+ Id: mustParseId("id:ns:type::user::specific"),
},
- "/document/v1/ns/type/group/foo/user",
+ ClientOptions{},
+ "POST",
+ "https://example.com/document/v1/ns/type/docid/user::specific",
},
{
- Id{
- Namespace: "ns",
- Type: "type",
- UserSpecific: "user::specific",
+ Document{
+ Id: mustParseId("id:ns:type:::"),
},
- "/document/v1/ns/type/docid/user::specific",
+ ClientOptions{Route: "elsewhere"},
+ "POST",
+ "https://example.com/document/v1/ns/type/docid/:?route=elsewhere",
},
{
- Id{
- Namespace: "ns",
- Type: "type",
- UserSpecific: ":",
+ Document{
+ Id: mustParseId("id:ns:type-with-/::user"),
+ Condition: "foo/bar",
},
- "/document/v1/ns/type/docid/:",
- },
- }
- for i, tt := range tests {
- path := urlPath(tt.in)
- if path != tt.out {
- t.Errorf("#%d: documentPath(%q) = %s, want %s", i, tt.in, path, tt.out)
- }
- }
-}
-
-func TestClientFeedURL(t *testing.T) {
- tests := []struct {
- in Document
- method string
- url string
- }{
- {
- Document{Id: mustParseId("id:ns:type::user")},
+ ClientOptions{},
"POST",
- "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr",
+ "https://example.com/document/v1/ns/type-with-%2F/docid/user?condition=foo%2Fbar",
},
{
Document{
@@ -265,28 +245,31 @@ func TestClientFeedURL(t *testing.T) {
Create: true,
Condition: "false",
},
+ ClientOptions{Timeout: 10 * time.Second, TraceLevel: 5},
"PUT",
- "https://example.com/document/v1/ns/type/docid/user?condition=false&create=true&foo=ba%2Fr",
+ "https://example.com/document/v1/ns/type/docid/user?timeout=10000ms&tracelevel=5&condition=false&create=true",
},
{
Document{
Id: mustParseId("id:ns:type::user"),
Operation: OperationRemove,
},
+ ClientOptions{},
"DELETE",
- "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr",
+ "https://example.com/document/v1/ns/type/docid/user",
},
}
httpClient := mock.HTTPClient{}
client, _ := NewClient(ClientOptions{
- BaseURL: "https://example.com",
+ BaseURL: "https://example.com/",
}, []util.HTTPClient{&httpClient})
for i, tt := range tests {
- moreParams := url.Values{}
- moreParams.Set("foo", "ba/r")
- method, u := client.feedURL(tt.in, moreParams)
- if u.String() != tt.url || method != tt.method {
- t.Errorf("#%d: URL() = (%s, %s), want (%s, %s)", i, method, u.String(), tt.method, tt.url)
+ client.options.Timeout = tt.options.Timeout
+ client.options.Route = tt.options.Route
+ client.options.TraceLevel = tt.options.TraceLevel
+ method, url := client.methodAndURL(tt.in)
+ if url != tt.url || method != tt.method {
+ t.Errorf("#%d: methodAndURL(doc) = (%s, %s), want (%s, %s)", i, method, url, tt.method, tt.url)
}
}
}