diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-05-12 23:00:42 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-05-12 23:01:15 +0200 |
commit | b1918ed026cc58c55e69e03bfae9a083c709feff (patch) | |
tree | 7a51b04e92e1c3de6842c1901d06450508d87096 | |
parent | 266cedc2d35ecf145145bb5bd94fe6a7b64da756 (diff) |
Remove streaming
Not needed now that there are dedicated workers preparing requests.
-rw-r--r-- | client/go/internal/vespa/document/http.go | 74 |
1 files changed, 27 insertions, 47 deletions
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index d31663d38d7..b41b98df382 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -1,7 +1,6 @@ package document import ( - "bufio" "bytes" "encoding/json" "fmt" @@ -56,19 +55,6 @@ type ClientOptions struct { NowFunc func() time.Time } -type countingReader struct { - reader io.ReadCloser - bytesRead int64 -} - -func (r *countingReader) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - r.bytesRead += int64(n) - return n, err -} - -func (r *countingReader) Close() error { return r.reader.Close() } - type countingHTTPClient struct { client util.HTTPClient inflight int64 @@ -86,7 +72,7 @@ type pendingDocument struct { prepared chan bool request *http.Request - body *countingReader + size int err error } @@ -232,51 +218,49 @@ func (c *Client) buffer() *bytes.Buffer { func (c *Client) preparePending() { for pd := range c.pending { method, url := c.methodAndURL(pd.document) - pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields) + pd.request, pd.size, pd.err = c.createRequest(method, url, pd.document.Fields) pd.prepared <- true } } -func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) { +func (c *Client) prepare(document Document) (*http.Request, int, error) { pd := pendingDocument{document: document, prepared: make(chan bool)} c.pending <- &pd <-pd.prepared - return pd.request, pd.body, pd.err + return pd.request, pd.size, pd.err } -func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) { +func (c *Client) createRequest(method, url string, body []byte) (*http.Request, int, error) { if len(body) == 0 { req, err := http.NewRequest(method, url, nil) - return req, nil, err + return req, 0, err } bodySize := len(fieldsPrefix) + len(body) + len(fieldsSuffix) useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && bodySize > 512) - pr, pw := io.Pipe() - go func() { - bw := bufio.NewWriterSize(pw, min(1024, bodySize)) - defer func() { - bw.Flush() - pw.Close() - }() - if useGzip { - zw := c.gzipWriter(bw) - writeRequestBody(zw, body) - zw.Close() - c.gzippers.Put(zw) - } else { - writeRequestBody(bw, body) + buf := bytes.NewBuffer(make([]byte, 0, min(1024, bodySize))) + if useGzip { + zw := c.gzipWriter(buf) + defer c.gzippers.Put(zw) + if err := writeRequestBody(zw, body); err != nil { + return nil, 0, err } - }() - cr := &countingReader{reader: pr} - req, err := http.NewRequest(method, url, cr) + if err := zw.Close(); err != nil { + return nil, 0, err + } + } else { + if err := writeRequestBody(buf, body); err != nil { + return nil, 0, err + } + } + req, err := http.NewRequest(method, url, buf) if err != nil { - return nil, cr, err + return nil, 0, err } if useGzip { req.Header.Set("Content-Encoding", "gzip") } req.Header.Set("Content-Type", "application/json; charset=utf-8") - return req, cr, nil + return req, buf.Len(), nil } func (c *Client) clientTimeout() time.Duration { @@ -290,7 +274,7 @@ func (c *Client) clientTimeout() time.Duration { func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id, Stats: Stats{Requests: 1}} - req, cr, err := c.prepare(document) + req, size, err := c.prepare(document) if err != nil { return resultWithErr(result, err) } @@ -300,11 +284,7 @@ func (c *Client) Send(document Document) Result { } defer resp.Body.Close() elapsed := c.now().Sub(start) - var bytesRead int64 - if cr != nil { - bytesRead = cr.bytesRead - } - return c.resultWithResponse(resp, bytesRead, result, elapsed) + return c.resultWithResponse(resp, size, result, elapsed) } func resultWithErr(result Result, err error) Result { @@ -314,7 +294,7 @@ func resultWithErr(result Result, err error) Result { return result } -func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result Result, elapsed time.Duration) Result { +func (c *Client) resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration) Result { result.HTTPStatus = resp.StatusCode result.Stats.Responses++ result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1} @@ -346,7 +326,7 @@ func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result } result.Message = body.Message result.Trace = string(body.Trace) - result.Stats.BytesSent = sentBytes + result.Stats.BytesSent = int64(sentBytes) result.Stats.BytesRecv = int64(written) if !result.Success() { result.Stats.Errors++ |