aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2023-05-15 10:48:55 +0200
committerGitHub <noreply@github.com>2023-05-15 10:48:55 +0200
commit51df69aa6d2adeef471e443f1b2200eb5d674160 (patch)
treef85db5c018a9174d4fdf6100af6a0ea23109a5ef
parent66eee692f6fe2f78bdf5f4f9383fadd2774ca931 (diff)
parent8d6366ecf616e6e05596a1f8fbe8293179290a62 (diff)
Merge pull request #27106 from vespa-engine/mpolden/remove-stream
Remove streaming
-rw-r--r--client/go/internal/vespa/document/http.go80
-rw-r--r--client/go/internal/vespa/document/http_test.go6
2 files changed, 32 insertions, 54 deletions
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index d31663d38d7..3655bd020f4 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -1,7 +1,6 @@
package document
import (
- "bufio"
"bytes"
"encoding/json"
"fmt"
@@ -56,19 +55,6 @@ type ClientOptions struct {
NowFunc func() time.Time
}
-type countingReader struct {
- reader io.ReadCloser
- bytesRead int64
-}
-
-func (r *countingReader) Read(p []byte) (int, error) {
- n, err := r.reader.Read(p)
- r.bytesRead += int64(n)
- return n, err
-}
-
-func (r *countingReader) Close() error { return r.reader.Close() }
-
type countingHTTPClient struct {
client util.HTTPClient
inflight int64
@@ -86,7 +72,7 @@ type pendingDocument struct {
prepared chan bool
request *http.Request
- body *countingReader
+ buf *bytes.Buffer
err error
}
@@ -232,51 +218,50 @@ func (c *Client) buffer() *bytes.Buffer {
func (c *Client) preparePending() {
for pd := range c.pending {
method, url := c.methodAndURL(pd.document)
- pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields)
+ pd.buf = c.buffer()
+ pd.request, pd.err = c.createRequest(method, url, pd.document.Fields, pd.buf)
pd.prepared <- true
}
}
-func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) {
+func (c *Client) prepare(document Document) (*http.Request, *bytes.Buffer, error) {
pd := pendingDocument{document: document, prepared: make(chan bool)}
c.pending <- &pd
<-pd.prepared
- return pd.request, pd.body, pd.err
+ return pd.request, pd.buf, pd.err
}
-func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) {
+func (c *Client) createRequest(method, url string, body []byte, buf *bytes.Buffer) (*http.Request, error) {
if len(body) == 0 {
req, err := http.NewRequest(method, url, nil)
- return req, nil, err
+ return req, err
}
bodySize := len(fieldsPrefix) + len(body) + len(fieldsSuffix)
useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && bodySize > 512)
- pr, pw := io.Pipe()
- go func() {
- bw := bufio.NewWriterSize(pw, min(1024, bodySize))
- defer func() {
- bw.Flush()
- pw.Close()
- }()
- if useGzip {
- zw := c.gzipWriter(bw)
- writeRequestBody(zw, body)
- zw.Close()
- c.gzippers.Put(zw)
- } else {
- writeRequestBody(bw, body)
+ buf.Grow(min(1024, bodySize))
+ if useGzip {
+ zw := c.gzipWriter(buf)
+ defer c.gzippers.Put(zw)
+ if err := writeRequestBody(zw, body); err != nil {
+ return nil, err
+ }
+ if err := zw.Close(); err != nil {
+ return nil, err
}
- }()
- cr := &countingReader{reader: pr}
- req, err := http.NewRequest(method, url, cr)
+ } else {
+ if err := writeRequestBody(buf, body); err != nil {
+ return nil, err
+ }
+ }
+ req, err := http.NewRequest(method, url, buf)
if err != nil {
- return nil, cr, err
+ return nil, err
}
if useGzip {
req.Header.Set("Content-Encoding", "gzip")
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
- return req, cr, nil
+ return req, nil
}
func (c *Client) clientTimeout() time.Duration {
@@ -290,21 +275,19 @@ func (c *Client) clientTimeout() time.Duration {
func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
- req, cr, err := c.prepare(document)
+ req, buf, err := c.prepare(document)
+ defer c.buffers.Put(buf)
if err != nil {
return resultWithErr(result, err)
}
+ bodySize := buf.Len()
resp, err := c.leastBusyClient().Do(req, c.clientTimeout())
if err != nil {
return resultWithErr(result, err)
}
defer resp.Body.Close()
elapsed := c.now().Sub(start)
- var bytesRead int64
- if cr != nil {
- bytesRead = cr.bytesRead
- }
- return c.resultWithResponse(resp, bytesRead, result, elapsed)
+ return resultWithResponse(resp, bodySize, result, elapsed, buf)
}
func resultWithErr(result Result, err error) Result {
@@ -314,7 +297,7 @@ func resultWithErr(result Result, err error) Result {
return result
}
-func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result Result, elapsed time.Duration) Result {
+func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result {
result.HTTPStatus = resp.StatusCode
result.Stats.Responses++
result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1}
@@ -332,8 +315,7 @@ func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result
Message string `json:"message"`
Trace json.RawMessage `json:"trace"`
}
- buf := c.buffer()
- defer c.buffers.Put(buf)
+ buf.Reset()
written, err := io.Copy(buf, resp.Body)
if err != nil {
result.Status = StatusVespaFailure
@@ -346,7 +328,7 @@ func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result
}
result.Message = body.Message
result.Trace = string(body.Trace)
- result.Stats.BytesSent = sentBytes
+ result.Stats.BytesSent = int64(sentBytes)
result.Stats.BytesRecv = int64(written)
if !result.Success() {
result.Stats.Errors++
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 8fb0b1b2b95..95af5f997f4 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -297,13 +297,9 @@ func BenchmarkClientSendSmallUncompressed(b *testing.B) {
}
func BenchmarkClientSendMediumUncompressed(b *testing.B) {
- benchmarkClientSend(b, CompressionNone, makeDocument(10))
+ benchmarkClientSend(b, CompressionNone, makeDocument(1000))
}
func BenchmarkClientSendMediumGzip(b *testing.B) {
benchmarkClientSend(b, CompressionGzip, makeDocument(1000))
}
-
-func BenchmarkClientSendLargeGzip(b *testing.B) {
- benchmarkClientSend(b, CompressionGzip, makeDocument(10000))
-}