aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-19 10:50:59 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-19 11:36:02 +0200
commit9f3ba858930efafa2a466971574e4dc98a3d0d7a (patch)
tree6abdb47300b28378ff5894fe7934be7dd20e3967
parent57a6011fb28c8ccd91a4d60f52630b50160d7901 (diff)
Add compression
-rw-r--r--client/go/internal/cli/cmd/feed.go27
-rw-r--r--client/go/internal/vespa/document/document.go2
-rw-r--r--client/go/internal/vespa/document/http.go47
-rw-r--r--client/go/internal/vespa/document/http_test.go50
4 files changed, 116 insertions, 10 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 49be72b169c..06568dd35c3 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -17,6 +17,7 @@ import (
func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
+ cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`)
cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations")
cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable")
cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable")
@@ -32,6 +33,7 @@ func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
type feedOptions struct {
connections int
+ compression string
route string
verbose bool
traceLevel int
@@ -109,17 +111,34 @@ func createServiceClients(service *vespa.Service, n int) []util.HTTPClient {
return clients
}
+func (opts feedOptions) compressionMode() (document.Compression, error) { // translates the --compression flag value into a document.Compression
+ switch opts.compression {
+ case "auto":
+ return document.CompressionAuto, nil
+ case "none":
+ return document.CompressionNone, nil
+ case "gzip":
+ return document.CompressionGzip, nil
+ }
+ return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) // any other value is rejected with a hint listing the accepted modes
+}
+
func feed(r io.Reader, cli *CLI, options feedOptions) error {
service, err := documentService(cli)
if err != nil {
return err
}
clients := createServiceClients(service, options.connections)
+ compression, err := options.compressionMode()
+ if err != nil {
+ return err
+ }
client := document.NewClient(document.ClientOptions{
- Timeout: time.Duration(options.timeoutSecs) * time.Second,
- Route: options.route,
- TraceLevel: options.traceLevel,
- BaseURL: service.BaseURL,
+ Compression: compression,
+ Timeout: time.Duration(options.timeoutSecs) * time.Second,
+ Route: options.route,
+ TraceLevel: options.traceLevel,
+ BaseURL: service.BaseURL,
}, clients)
throttler := document.NewThrottler(options.connections)
// TODO(mpolden): Make doom duration configurable
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 4ec3eac1cbc..214d1dc4797 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -14,7 +14,7 @@ var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1}
type Operation int
const (
- OperationPut = iota
+ OperationPut Operation = iota
OperationUpdate
OperationRemove
)
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 1bcd7eff39e..51b6fa4de39 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -2,6 +2,7 @@ package document
import (
"bytes"
+ "compress/gzip"
"encoding/json"
"fmt"
"io"
@@ -16,6 +17,14 @@ import (
"github.com/vespa-engine/vespa/client/go/internal/util"
)
+type Compression int // Compression selects how the client compresses request bodies
+
+const (
+ CompressionAuto Compression = iota // zero value: gzip only bodies larger than 512 bytes
+ CompressionNone // never compress the body
+ CompressionGzip // always gzip the body
+)
+
// Client represents a HTTP client for the /document/v1/ API.
type Client struct {
options ClientOptions
@@ -26,10 +35,11 @@ type Client struct {
// ClientOptions specifies the configuration options of a feed client.
type ClientOptions struct {
- BaseURL string
- Timeout time.Duration
- Route string
- TraceLevel int
+ BaseURL string
+ Timeout time.Duration
+ Route string
+ TraceLevel int
+ Compression Compression
}
type countingHTTPClient struct {
@@ -152,6 +162,33 @@ func (c *Client) leastBusyClient() *countingHTTPClient {
return &leastBusy
}
+func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) { // builds the HTTP request for body, gzip-compressing it when the configured mode calls for it
+ var r io.Reader
+ useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) // gzip when forced, or in auto mode when the body exceeds 512 bytes
+ if useGzip {
+ var buf bytes.Buffer
+ w := gzip.NewWriter(&buf)
+ if _, err := w.Write(body); err != nil {
+ return nil, err
+ }
+ if err := w.Close(); err != nil { // the writer must be closed before buf is used as the request body
+ return nil, err
+ }
+ r = &buf
+ } else {
+ r = bytes.NewReader(body) // send the body as-is
+ }
+ req, err := http.NewRequest(method, url, r)
+ if err != nil {
+ return nil, err
+ }
+ if useGzip {
+ req.Header.Set("Content-Encoding", "gzip") // only set when the body was actually compressed
+ }
+ req.Header.Set("Content-Type", "application/json; charset=utf-8")
+ return req, nil
+}
+
// Send given document to the endpoint configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
@@ -160,7 +197,7 @@ func (c *Client) Send(document Document) Result {
if err != nil {
return resultWithErr(result, err)
}
- req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body))
+ req, err := c.createRequest(method, url.String(), document.Body)
if err != nil {
return resultWithErr(result, err)
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 8f8394a5d4e..314113c53be 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"reflect"
+ "strings"
"testing"
"time"
@@ -141,6 +142,55 @@ func TestClientSend(t *testing.T) {
}
}
+func TestClientSendCompressed(t *testing.T) { // verifies which requests are gzip-compressed under each Compression mode
+ httpClient := mock.HTTPClient{}
+ client := NewClient(ClientOptions{
+ BaseURL: "https://example.com:1337",
+ Timeout: time.Duration(5 * time.Second),
+ }, []util.HTTPClient{&httpClient})
+
+ bigBody := fmt.Sprintf(`{"fields":{"foo": "%s"}}`, strings.Repeat("s", 512+1)) // the field value alone is 513 bytes, so the body exceeds the 512-byte auto threshold
+ bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)}
+ smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "s"}}`)}
+
+ client.options.Compression = CompressionNone // none: neither document is compressed
+ _ = client.Send(bigDoc)
+ assertCompressedRequest(t, false, httpClient.LastRequest)
+ _ = client.Send(smallDoc)
+ assertCompressedRequest(t, false, httpClient.LastRequest)
+
+ client.options.Compression = CompressionAuto // auto: only the big document is compressed
+ _ = client.Send(bigDoc)
+ assertCompressedRequest(t, true, httpClient.LastRequest)
+ _ = client.Send(smallDoc)
+ assertCompressedRequest(t, false, httpClient.LastRequest)
+
+ client.options.Compression = CompressionGzip // gzip: both documents are compressed regardless of size
+ _ = client.Send(bigDoc)
+ assertCompressedRequest(t, true, httpClient.LastRequest)
+ _ = client.Send(smallDoc)
+ assertCompressedRequest(t, true, httpClient.LastRequest)
+}
+
+func assertCompressedRequest(t *testing.T, want bool, request *http.Request) { // checks that both the Content-Encoding header and the body bytes agree with want
+ wantEnc := "" // an uncompressed request is expected to carry no Content-Encoding header value
+ if want {
+ wantEnc = "gzip"
+ }
+ gotEnc := request.Header.Get("Content-Encoding")
+ if gotEnc != wantEnc {
+ t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc)
+ }
+ body, err := io.ReadAll(request.Body)
+ if err != nil {
+ t.Fatal(err)
+ }
+ compressed := bytes.HasPrefix(body, []byte{0x1f, 0x8b}) // 0x1f 0x8b are the gzip magic bytes that start every gzip stream
+ if compressed != want {
+ t.Errorf("got compressed=%t, want %t", compressed, want)
+ }
+}
+
func TestURLPath(t *testing.T) {
tests := []struct {
in Id