From 32c7150b4a1bad2dcb556ad74f80a3081be9e18e Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Fri, 12 May 2023 11:36:28 +0200 Subject: Write periodic output to stderr --- client/go/internal/cli/cmd/feed.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index a8d8c9d284b..fa87c420f16 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -128,7 +128,7 @@ func summaryTicker(secs int, cli *CLI, start time.Time, statsFunc func() documen ticker := time.NewTicker(time.Duration(secs) * time.Second) go func() { for range ticker.C { - writeSummaryJSON(cli.Stdout, statsFunc(), cli.now().Sub(start)) + writeSummaryJSON(cli.Stderr, statsFunc(), cli.now().Sub(start)) } }() return ticker -- cgit v1.2.3 From 605cf17ca394f832bb7c5b9c63615f35c67d7c6a Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Fri, 12 May 2023 11:51:57 +0200 Subject: Prepare requests using a dedicated worker pool --- client/go/internal/vespa/document/http.go | 33 +++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 319512458c7..d31663d38d7 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -9,6 +9,7 @@ import ( "math" "net/http" "net/url" + "runtime" "strconv" "strings" "sync" @@ -41,6 +42,7 @@ type Client struct { sendCount int32 gzippers sync.Pool buffers sync.Pool + pending chan *pendingDocument } // ClientOptions specifices the configuration options of a feed client. @@ -79,6 +81,15 @@ func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http return c.client.Do(req, timeout) } +type pendingDocument struct { + document Document + prepared chan bool + + request *http.Request + body *countingReader + err error +} + func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, error) { if len(httpClients) < 1 { return nil, fmt.Errorf("need at least one HTTP client") @@ -99,9 +110,13 @@ func NewClient(options ClientOptions, httpClients []util.HTTPClient) (*Client, e options: options, httpClients: countingClients, now: nowFunc, + pending: make(chan *pendingDocument, 4096), } c.gzippers.New = func() any { return gzip.NewWriter(io.Discard) } c.buffers.New = func() any { return &bytes.Buffer{} } + for i := 0; i < runtime.NumCPU(); i++ { + go c.preparePending() + } return c, nil } @@ -214,6 +229,21 @@ func (c *Client) buffer() *bytes.Buffer { return buf } +func (c *Client) preparePending() { + for pd := range c.pending { + method, url := c.methodAndURL(pd.document) + pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields) + pd.prepared <- true + } +} + +func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) { + pd := pendingDocument{document: document, prepared: make(chan bool)} + c.pending <- &pd + <-pd.prepared + return pd.request, pd.body, pd.err +} + func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) { if len(body) == 0 { req, err := http.NewRequest(method, url, nil) @@ -260,8 +290,7 @@ func (c *Client) clientTimeout() time.Duration { func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id, Stats: Stats{Requests: 1}} - method, url := c.methodAndURL(document) - req, cr, err := c.createRequest(method, url, document.Fields) + req, cr, err := c.prepare(document) if err != nil { return resultWithErr(result, err) } -- cgit v1.2.3