aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-12 11:51:57 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-12 12:05:27 +0200
commit605cf17ca394f832bb7c5b9c63615f35c67d7c6a (patch)
tree5e7c468e4aad9f6dc129511f9df1a5604bb11950
parent32c7150b4a1bad2dcb556ad74f80a3081be9e18e (diff)
Prepare requests using a dedicated worker pool
-rw-r--r--client/go/internal/vespa/document/http.go33
1 files changed, 31 insertions, 2 deletions
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 319512458c7..d31663d38d7 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -9,6 +9,7 @@ import (
"math"
"net/http"
"net/url"
+ "runtime"
"strconv"
"strings"
"sync"
@@ -41,6 +42,7 @@ type Client struct {
sendCount int32
gzippers sync.Pool
buffers sync.Pool
+ pending chan *pendingDocument
}
// ClientOptions specifices the configuration options of a feed client.
@@ -79,6 +81,15 @@ func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http
return c.client.Do(req, timeout)
}
+type pendingDocument struct {
+ document Document
+ prepared chan bool
+
+ request *http.Request
+ body *countingReader
+ err error
+}
+
func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, error) {
if len(httpClients) < 1 {
return nil, fmt.Errorf("need at least one HTTP client")
@@ -99,9 +110,13 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e
options: options,
httpClients: countingClients,
now: nowFunc,
+ pending: make(chan *pendingDocument, 4096),
}
c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) }
c.buffers.New = func() any { return &bytes.Buffer{} }
+ for i := 0; i < runtime.NumCPU(); i++ {
+ go c.preparePending()
+ }
return c, nil
}
@@ -214,6 +229,21 @@ func (c *Client) buffer() *bytes.Buffer {
return buf
}
+func (c *Client) preparePending() {
+ for pd := range c.pending {
+ method, url := c.methodAndURL(pd.document)
+ pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields)
+ pd.prepared <- true
+ }
+}
+
+func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) {
+ pd := pendingDocument{document: document, prepared: make(chan bool)}
+ c.pending <- &pd
+ <-pd.prepared
+ return pd.request, pd.body, pd.err
+}
+
func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) {
if len(body) == 0 {
req, err := http.NewRequest(method, url, nil)
@@ -260,8 +290,7 @@ func (c *Client) clientTimeout() time.Duration {
func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
- method, url := c.methodAndURL(document)
- req, cr, err := c.createRequest(method, url, document.Fields)
+ req, cr, err := c.prepare(document)
if err != nil {
return resultWithErr(result, err)
}