aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-05-12 23:00:42 +0200
committerMartin Polden <mpolden@mpolden.no>2023-05-12 23:01:15 +0200
commitb1918ed026cc58c55e69e03bfae9a083c709feff (patch)
tree7a51b04e92e1c3de6842c1901d06450508d87096
parent266cedc2d35ecf145145bb5bd94fe6a7b64da756 (diff)
Remove streaming
Not needed now that there are dedicated workers preparing requests.
-rw-r--r--client/go/internal/vespa/document/http.go74
1 files changed, 27 insertions, 47 deletions
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index d31663d38d7..b41b98df382 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -1,7 +1,6 @@
package document
import (
- "bufio"
"bytes"
"encoding/json"
"fmt"
@@ -56,19 +55,6 @@ type ClientOptions struct {
NowFunc func() time.Time
}
-type countingReader struct {
- reader io.ReadCloser
- bytesRead int64
-}
-
-func (r *countingReader) Read(p []byte) (int, error) {
- n, err := r.reader.Read(p)
- r.bytesRead += int64(n)
- return n, err
-}
-
-func (r *countingReader) Close() error { return r.reader.Close() }
-
type countingHTTPClient struct {
client util.HTTPClient
inflight int64
@@ -86,7 +72,7 @@ type pendingDocument struct {
prepared chan bool
request *http.Request
- body *countingReader
+ size int
err error
}
@@ -232,51 +218,49 @@ func (c *Client) buffer() *bytes.Buffer {
func (c *Client) preparePending() {
for pd := range c.pending {
method, url := c.methodAndURL(pd.document)
- pd.request, pd.body, pd.err = c.createRequest(method, url, pd.document.Fields)
+ pd.request, pd.size, pd.err = c.createRequest(method, url, pd.document.Fields)
pd.prepared <- true
}
}
-func (c *Client) prepare(document Document) (*http.Request, *countingReader, error) {
+func (c *Client) prepare(document Document) (*http.Request, int, error) {
pd := pendingDocument{document: document, prepared: make(chan bool)}
c.pending <- &pd
<-pd.prepared
- return pd.request, pd.body, pd.err
+ return pd.request, pd.size, pd.err
}
-func (c *Client) createRequest(method, url string, body []byte) (*http.Request, *countingReader, error) {
+func (c *Client) createRequest(method, url string, body []byte) (*http.Request, int, error) {
if len(body) == 0 {
req, err := http.NewRequest(method, url, nil)
- return req, nil, err
+ return req, 0, err
}
bodySize := len(fieldsPrefix) + len(body) + len(fieldsSuffix)
useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && bodySize > 512)
- pr, pw := io.Pipe()
- go func() {
- bw := bufio.NewWriterSize(pw, min(1024, bodySize))
- defer func() {
- bw.Flush()
- pw.Close()
- }()
- if useGzip {
- zw := c.gzipWriter(bw)
- writeRequestBody(zw, body)
- zw.Close()
- c.gzippers.Put(zw)
- } else {
- writeRequestBody(bw, body)
+ buf := bytes.NewBuffer(make([]byte, 0, min(1024, bodySize)))
+ if useGzip {
+ zw := c.gzipWriter(buf)
+ defer c.gzippers.Put(zw)
+ if err := writeRequestBody(zw, body); err != nil {
+ return nil, 0, err
}
- }()
- cr := &countingReader{reader: pr}
- req, err := http.NewRequest(method, url, cr)
+ if err := zw.Close(); err != nil {
+ return nil, 0, err
+ }
+ } else {
+ if err := writeRequestBody(buf, body); err != nil {
+ return nil, 0, err
+ }
+ }
+ req, err := http.NewRequest(method, url, buf)
if err != nil {
- return nil, cr, err
+ return nil, 0, err
}
if useGzip {
req.Header.Set("Content-Encoding", "gzip")
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
- return req, cr, nil
+ return req, buf.Len(), nil
}
func (c *Client) clientTimeout() time.Duration {
@@ -290,7 +274,7 @@ func (c *Client) clientTimeout() time.Duration {
func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
- req, cr, err := c.prepare(document)
+ req, size, err := c.prepare(document)
if err != nil {
return resultWithErr(result, err)
}
@@ -300,11 +284,7 @@ func (c *Client) Send(document Document) Result {
}
defer resp.Body.Close()
elapsed := c.now().Sub(start)
- var bytesRead int64
- if cr != nil {
- bytesRead = cr.bytesRead
- }
- return c.resultWithResponse(resp, bytesRead, result, elapsed)
+ return c.resultWithResponse(resp, size, result, elapsed)
}
func resultWithErr(result Result, err error) Result {
@@ -314,7 +294,7 @@ func resultWithErr(result Result, err error) Result {
return result
}
-func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result Result, elapsed time.Duration) Result {
+func (c *Client) resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration) Result {
result.HTTPStatus = resp.StatusCode
result.Stats.Responses++
result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1}
@@ -346,7 +326,7 @@ func (c *Client) resultWithResponse(resp *http.Response, sentBytes int64, result
}
result.Message = body.Message
result.Trace = string(body.Trace)
- result.Stats.BytesSent = sentBytes
+ result.Stats.BytesSent = int64(sentBytes)
result.Stats.BytesRecv = int64(written)
if !result.Success() {
result.Stats.Errors++