diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-19 10:50:59 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-19 11:36:02 +0200 |
commit | 9f3ba858930efafa2a466971574e4dc98a3d0d7a (patch) | |
tree | 6abdb47300b28378ff5894fe7934be7dd20e3967 /client | |
parent | 57a6011fb28c8ccd91a4d60f52630b50160d7901 (diff) |
Add compression
Diffstat (limited to 'client')
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 27 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document.go | 2 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 47 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 50 |
4 files changed, 116 insertions, 10 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 49be72b169c..06568dd35c3 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -17,6 +17,7 @@ import ( func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") + cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`) cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") @@ -32,6 +33,7 @@ func addFeedFlags(cmd *cobra.Command, options *feedOptions) { type feedOptions struct { connections int + compression string route string verbose bool traceLevel int @@ -109,17 +111,34 @@ func createServiceClients(service *vespa.Service, n int) []util.HTTPClient { return clients } +func (opts feedOptions) compressionMode() (document.Compression, error) { + switch opts.compression { + case "auto": + return document.CompressionAuto, nil + case "none": + return document.CompressionNone, nil + case "gzip": + return document.CompressionGzip, nil + } + return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) +} + func feed(r io.Reader, cli *CLI, options feedOptions) error { service, err := documentService(cli) if err != nil { return err } clients := createServiceClients(service, options.connections) + compression, err := options.compressionMode() + if err != nil { + return err + } client := document.NewClient(document.ClientOptions{ - Timeout: time.Duration(options.timeoutSecs) * 
time.Second, - Route: options.route, - TraceLevel: options.traceLevel, - BaseURL: service.BaseURL, + Compression: compression, + Timeout: time.Duration(options.timeoutSecs) * time.Second, + Route: options.route, + TraceLevel: options.traceLevel, + BaseURL: service.BaseURL, }, clients) throttler := document.NewThrottler(options.connections) // TODO(mpolden): Make doom duration configurable diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 4ec3eac1cbc..214d1dc4797 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -14,7 +14,7 @@ var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} type Operation int const ( - OperationPut = iota + OperationPut Operation = iota OperationUpdate OperationRemove ) diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 1bcd7eff39e..51b6fa4de39 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -2,6 +2,7 @@ package document import ( "bytes" + "compress/gzip" "encoding/json" "fmt" "io" @@ -16,6 +17,14 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/util" ) +type Compression int + +const ( + CompressionAuto Compression = iota + CompressionNone + CompressionGzip +) + // Client represents a HTTP client for the /document/v1/ API. type Client struct { options ClientOptions @@ -26,10 +35,11 @@ type Client struct { // ClientOptions specifies the configuration options of a feed client. 
type ClientOptions struct { - BaseURL string - Timeout time.Duration - Route string - TraceLevel int + BaseURL string + Timeout time.Duration + Route string + TraceLevel int + Compression Compression } type countingHTTPClient struct { @@ -152,6 +162,33 @@ func (c *Client) leastBusyClient() *countingHTTPClient { return &leastBusy } +func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) { + var r io.Reader + useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) + if useGzip { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + if _, err := w.Write(body); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + r = &buf + } else { + r = bytes.NewReader(body) + } + req, err := http.NewRequest(method, url, r) + if err != nil { + return nil, err + } + if useGzip { + req.Header.Set("Content-Encoding", "gzip") + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + return req, nil +} + // Send given document to the endpoint configured in this client. 
func (c *Client) Send(document Document) Result { start := c.now() @@ -160,7 +197,7 @@ func (c *Client) Send(document Document) Result { if err != nil { return resultWithErr(result, err) } - req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body)) + req, err := c.createRequest(method, url.String(), document.Body) if err != nil { return resultWithErr(result, err) } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 8f8394a5d4e..314113c53be 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "reflect" + "strings" "testing" "time" @@ -141,6 +142,55 @@ func TestClientSend(t *testing.T) { } } +func TestClientSendCompressed(t *testing.T) { + httpClient := mock.HTTPClient{} + client := NewClient(ClientOptions{ + BaseURL: "https://example.com:1337", + Timeout: time.Duration(5 * time.Second), + }, []util.HTTPClient{&httpClient}) + + bigBody := fmt.Sprintf(`{"fields":{"foo": "%s"}}`, strings.Repeat("s", 512+1)) + bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)} + smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "s"}}`)} + + client.options.Compression = CompressionNone + _ = client.Send(bigDoc) + assertCompressedRequest(t, false, httpClient.LastRequest) + _ = client.Send(smallDoc) + assertCompressedRequest(t, false, httpClient.LastRequest) + + client.options.Compression = CompressionAuto + _ = client.Send(bigDoc) + assertCompressedRequest(t, true, httpClient.LastRequest) + _ = client.Send(smallDoc) + assertCompressedRequest(t, false, httpClient.LastRequest) + + client.options.Compression = CompressionGzip + _ = client.Send(bigDoc) + assertCompressedRequest(t, true, httpClient.LastRequest) + _ = client.Send(smallDoc) + 
assertCompressedRequest(t, true, httpClient.LastRequest) +} + +func assertCompressedRequest(t *testing.T, want bool, request *http.Request) { + wantEnc := "" + if want { + wantEnc = "gzip" + } + gotEnc := request.Header.Get("Content-Encoding") + if gotEnc != wantEnc { + t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc) + } + body, err := io.ReadAll(request.Body) + if err != nil { + t.Fatal(err) + } + compressed := bytes.HasPrefix(body, []byte{0x1f, 0x8b}) + if compressed != want { + t.Errorf("got compressed=%t, want %t", compressed, want) + } +} + func TestURLPath(t *testing.T) { tests := []struct { in Id |