diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-12 11:51:57 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-05-12 12:05:27 +0200 |
commit | 605cf17ca394f832bb7c5b9c63615f35c67d7c6a (patch) | |
tree | 5e7c468e4aad9f6dc129511f9df1a5604bb11950 | |
parent | 32c7150b4a1bad2dcb556ad74f80a3081be9e18e (diff) |
Prepare requests using a dedicated worker pool
-rw-r--r-- | client/go/internal/vespa/document/http.go | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 319512458c7..d31663d38d7 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -9,6 +9,7 @@ import ( "math" "net/http" "net/url" + "runtime" "strconv" "strings" "sync" @@ -41,6 +42,7 @@ type Client struct { sendCount int32 gzippers sync.Pool buffers sync.Pool + pending chan *pendingDocument } // ClientOptions specifices the configuration options of a feed client. @@ -79,6 +81,15 @@ func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http return c.client.Do(req, timeout) } +type pendingDocument struct { + document Document + prepared chan bool + + request *http.Request + body *countingReader + err error +} + func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, error) { if len(httpClients) < 1 { return nil, fmt.Errorf("need at least one HTTP client") @@ -99,9 +110,13 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e options: options, httpClients: countingClients, now: nowFunc, + pending: make(chan *pendingDocument, 4096), } c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) } c.buffers.New = func() any { return &bytes.Buffer{} } + for i := 0; i < runtime.NumCPU(); i++ { + go c.preparePending() + } return c, nil } @@ -214,6 +229,21 @@ func (c *Client) buffer() *bytes.Buffer { return buf } +func (c *Client) preparePending() { + for pd := range c.pending { + method, url := c.methodAndURL(pd.document) + pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields) + pd.prepared <- true + } +} + +func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) { + pd := pendingDocument{document: document, prepared: make(chan bool)} + c.pending <- &pd + <-pd.prepared + return pd.request, pd.body, pd.err +} + func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) { if len(body) == 0 { req, err := http.NewRequest(method, url, nil) @@ -260,8 +290,7 @@ func (c *Client) clientTimeout() time.Duration { func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id, Stats: Stats{Requests: 1}} - method, url := c.methodAndURL(document) - req, cr, err := c.createRequest(method, url, document.Fields) + req, cr, err := c.prepare(document) if err != nil { return resultWithErr(result, err) } |