diff options
author | Jon Bratseth <bratseth@gmail.com> | 2023-05-15 10:48:55 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-15 10:48:55 +0200 |
commit | 51df69aa6d2adeef471e443f1b2200eb5d674160 (patch) | |
tree | f85db5c018a9174d4fdf6100af6a0ea23109a5ef | |
parent | 66eee692f6fe2f78bdf5f4f9383fadd2774ca931 (diff) | |
parent | 8d6366ecf616e6e05596a1f8fbe8293179290a62 (diff) |
Merge pull request #27106 from vespa-engine/mpolden/remove-stream
Remove streaming
-rw-r--r-- | client/go/internal/vespa/document/http.go | 80 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 6 |
2 files changed, 32 insertions, 54 deletions
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index d31663d38d7..3655bd020f4 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -1,7 +1,6 @@ package document import ( - "bufio" "bytes" "encoding/json" "fmt" @@ -56,19 +55,6 @@ type ClientOptions struct { NowFunc func() time.Time } -type countingReader struct { - reader io.ReadCloser - bytesRead int64 -} - -func (r *countingReader) Read(p []byte) (int, error) { - n, err := r.reader.Read(p) - r.bytesRead += int64(n) - return n, err -} - -func (r *countingReader) Close() error { return r.reader.Close() } - type countingHTTPClient struct { client util.HTTPClient inflight int64 @@ -86,7 +72,7 @@ type pendingDocument struct { prepared chan bool request *http.Request - body *countingReader + buf *bytes.Buffer err error } @@ -232,51 +218,50 @@ func (c *Client) buffer() *bytes.Buffer { func (c *Client) preparePending() { for pd := range c.pending { method, url := c.methodAndURL(pd.document) - pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields) + pd.buf = c.buffer() + pd.request, pd.err = c.createRequest(method, url, pd.document.Fields, pd.buf) pd.prepared <- true } } -func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) { +func (c *Client) prepare(document Document) (*http.Request, *bytes.Buffer, error) { pd := pendingDocument{document: document, prepared: make(chan bool)} c.pending <- &pd <-pd.prepared - return pd.request, pd.body, pd.err + return pd.request, pd.buf, pd.err } -func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) { +func (c *Client) createRequest(method, url string, body []byte, buf *bytes.Buffer) (*http.Request, error) { if len(body) == 0 { req, err := http.NewRequest(method, url, nil) - return req, nil, err + return req, err } bodySize := len(fieldsPrefix) + len(body) + len(fieldsSuffix) useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && bodySize > 512) - pr, pw := io.Pipe() - go func() { - bw := bufio.NewWriterSize(pw, min(1024, bodySize)) - defer func() { - bw.Flush() - pw.Close() - }() - if useGzip { - zw := c.gzipWriter(bw) - writeRequestBody(zw, body) - zw.Close() - c.gzippers.Put(zw) - } else { - writeRequestBody(bw, body) + buf.Grow(min(1024, bodySize)) + if useGzip { + zw := c.gzipWriter(buf) + defer c.gzippers.Put(zw) + if err := writeRequestBody(zw, body); err != nil { + return nil, err + } + if err := zw.Close(); err != nil { + return nil, err } - }() - cr := &countingReader{reader: pr} - req, err := http.NewRequest(method, url, cr) + } else { + if err := writeRequestBody(buf, body); err != nil { + return nil, err + } + } + req, err := http.NewRequest(method, url, buf) if err != nil { - return nil, cr, err + return nil, err } if useGzip { req.Header.Set("Content-Encoding", "gzip") } req.Header.Set("Content-Type", "application/json; charset=utf-8") - return req, cr, nil + return req, nil } func (c *Client) clientTimeout() time.Duration { @@ -290,21 +275,19 @@ func (c *Client) clientTimeout() time.Duration { func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id, Stats: Stats{Requests: 1}} - req, cr, err := c.prepare(document) + req, buf, err := c.prepare(document) + defer c.buffers.Put(buf) if err != nil { return resultWithErr(result, err) } + bodySize := buf.Len() resp, err := c.leastBusyClient().Do(req, c.clientTimeout()) if err != nil { return resultWithErr(result, err) } defer resp.Body.Close() elapsed := c.now().Sub(start) - var bytesRead int64 - if cr != nil { - bytesRead = cr.bytesRead - } - return c.resultWithResponse(resp, bytesRead, result, elapsed) + return resultWithResponse(resp, bodySize, result, elapsed, buf) } func resultWithErr(result Result, err error) Result { @@ -314,7 +297,7 @@ func resultWithErr(result Result, err error) Result { return result } -func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result Result, elapsed time.Duration) Result { +func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result { result.HTTPStatus = resp.StatusCode result.Stats.Responses++ result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1} @@ -332,8 +315,7 @@ func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result Message string `json:"message"` Trace json.RawMessage `json:"trace"` } - buf := c.buffer() - defer c.buffers.Put(buf) + buf.Reset() written, err := io.Copy(buf, resp.Body) if err != nil { result.Status = StatusVespaFailure @@ -346,7 +328,7 @@ func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result } result.Message = body.Message result.Trace = string(body.Trace) - result.Stats.BytesSent = sentBytes + result.Stats.BytesSent = int64(sentBytes) result.Stats.BytesRecv = int64(written) if !result.Success() { result.Stats.Errors++ diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 8fb0b1b2b95..95af5f997f4 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -297,13 +297,9 @@ func BenchmarkClientSendSmallUncompressed(b *testing.B) { } func BenchmarkClientSendMediumUncompressed(b *testing.B) { - benchmarkClientSend(b, CompressionNone, makeDocument(10)) + benchmarkClientSend(b, CompressionNone, makeDocument(1000)) } func BenchmarkClientSendMediumGzip(b *testing.B) { benchmarkClientSend(b, CompressionGzip, makeDocument(1000)) } - -func BenchmarkClientSendLargeGzip(b *testing.B) { - benchmarkClientSend(b, CompressionGzip, makeDocument(10000)) -} |