aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-19 12:18:28 +0200
committerGitHub <noreply@github.com>2023-04-19 12:18:28 +0200
commitf8f367921956b5e0a7e9927fdecaf1713a80fbf8 (patch)
tree52e3f33f75af97c17750a444df6be8263fbffb12
parent90e9ba99448f970558fd9d7cefae370b522a8e91 (diff)
parent9f3ba858930efafa2a466971574e4dc98a3d0d7a (diff)
Merge pull request #26778 from vespa-engine/mpolden/feed-client-8
Add compression
-rw-r--r--client/go/internal/cli/cmd/feed.go56
-rw-r--r--client/go/internal/vespa/document/dispatcher.go79
-rw-r--r--client/go/internal/vespa/document/document.go24
-rw-r--r--client/go/internal/vespa/document/http.go47
-rw-r--r--client/go/internal/vespa/document/http_test.go50
-rw-r--r--client/go/internal/vespa/document/queue.go43
-rw-r--r--client/go/internal/vespa/document/queue_test.go29
7 files changed, 249 insertions, 79 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index f0f82dd80d1..06568dd35c3 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -6,6 +6,7 @@ import (
"io"
"math"
"os"
+ "runtime/pprof"
"time"
"github.com/spf13/cobra"
@@ -16,18 +17,29 @@ import (
func addFeedFlags(cmd *cobra.Command, options *feedOptions) {
cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use")
+ cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`)
cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations")
cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable")
cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable")
cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors")
+ memprofile := "memprofile"
+ cpuprofile := "cpuprofile"
+ cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file")
+ cmd.PersistentFlags().StringVar(&options.cpuprofile, cpuprofile, "", "Write a CPU profile to given file")
+ // Hide these flags as they are intended for internal use
+ cmd.PersistentFlags().MarkHidden(memprofile)
+ cmd.PersistentFlags().MarkHidden(cpuprofile)
}
type feedOptions struct {
connections int
+ compression string
route string
verbose bool
traceLevel int
timeoutSecs int
+ memprofile string
+ cpuprofile string
}
func newFeedCmd(cli *CLI) *cobra.Command {
@@ -64,7 +76,24 @@ $ cat documents.jsonl | vespa feed -
defer f.Close()
r = f
}
- return feed(r, cli, options)
+ if options.cpuprofile != "" {
+ f, err := os.Create(options.cpuprofile)
+ if err != nil {
+ return err
+ }
+ pprof.StartCPUProfile(f)
+ defer pprof.StopCPUProfile()
+ }
+ err := feed(r, cli, options)
+ if options.memprofile != "" {
+ f, err := os.Create(options.memprofile)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ pprof.WriteHeapProfile(f)
+ }
+ return err
},
}
addFeedFlags(cmd, &options)
@@ -82,17 +111,34 @@ func createServiceClients(service *vespa.Service, n int) []util.HTTPClient {
return clients
}
+func (opts feedOptions) compressionMode() (document.Compression, error) {
+ switch opts.compression {
+ case "auto":
+ return document.CompressionAuto, nil
+ case "none":
+ return document.CompressionNone, nil
+ case "gzip":
+ return document.CompressionGzip, nil
+ }
+ return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`)
+}
+
func feed(r io.Reader, cli *CLI, options feedOptions) error {
service, err := documentService(cli)
if err != nil {
return err
}
clients := createServiceClients(service, options.connections)
+ compression, err := options.compressionMode()
+ if err != nil {
+ return err
+ }
client := document.NewClient(document.ClientOptions{
- Timeout: time.Duration(options.timeoutSecs) * time.Second,
- Route: options.route,
- TraceLevel: options.traceLevel,
- BaseURL: service.BaseURL,
+ Compression: compression,
+ Timeout: time.Duration(options.timeoutSecs) * time.Second,
+ Route: options.route,
+ TraceLevel: options.traceLevel,
+ BaseURL: service.BaseURL,
}, clients)
throttler := document.NewThrottler(options.connections)
// TODO(mpolden): Make doom duration configurable
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 533ca7a0019..5c99f3bf056 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -29,8 +29,9 @@ type Dispatcher struct {
output io.Writer
verbose bool
+ listPool sync.Pool
mu sync.RWMutex
- wg sync.WaitGroup
+ workerWg sync.WaitGroup
resultWg sync.WaitGroup
}
@@ -42,21 +43,14 @@ type documentOp struct {
// documentGroup holds document operations which share an ID, and must be dispatched in order.
type documentGroup struct {
- ops *list.List
- mu sync.Mutex
+ q *Queue[documentOp]
+ mu sync.Mutex
}
func (g *documentGroup) add(op documentOp, first bool) {
g.mu.Lock()
defer g.mu.Unlock()
- if g.ops == nil {
- g.ops = list.New()
- }
- if first {
- g.ops.PushFront(op)
- } else {
- g.ops.PushBack(op)
- }
+ g.q.Add(op, first)
}
func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher {
@@ -74,11 +68,10 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
group.mu.Lock()
- first := group.ops.Front()
- if first == nil {
+ op, ok := group.q.Poll()
+ if !ok {
panic("sending from empty document group, this should not happen")
}
- op := group.ops.Remove(first).(documentOp)
op.attempts++
result := d.feeder.Send(op.document)
d.results <- result
@@ -134,46 +127,25 @@ func (d *Dispatcher) start() {
if d.started {
return
}
+ d.listPool.New = func() any { return list.New() }
d.ready = make(chan Id, 4096)
d.results = make(chan Result, 4096)
d.msgs = make(chan string, 4096)
d.started = true
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.readDocuments()
- }()
d.resultWg.Add(2)
- go func() {
- defer d.resultWg.Done()
- d.readResults()
- }()
- go func() {
- defer d.resultWg.Done()
- d.readMessages()
- }()
+ go d.sumStats()
+ go d.printMessages()
}
-func (d *Dispatcher) readDocuments() {
- for id := range d.ready {
- d.mu.RLock()
- group := d.inflight[id.String()]
- d.mu.RUnlock()
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.sendDocumentIn(group)
- }()
- }
-}
-
-func (d *Dispatcher) readResults() {
+func (d *Dispatcher) sumStats() {
+ defer d.resultWg.Done()
for result := range d.results {
d.stats.Add(result.Stats)
}
}
-func (d *Dispatcher) readMessages() {
+func (d *Dispatcher) printMessages() {
+ defer d.resultWg.Done()
for msg := range d.msgs {
fmt.Fprintln(d.output, msg)
}
@@ -187,7 +159,7 @@ func (d *Dispatcher) enqueue(op documentOp) error {
key := op.document.Id.String()
group, ok := d.inflight[key]
if !ok {
- group = &documentGroup{}
+ group = &documentGroup{q: NewQueue[documentOp](&d.listPool)}
d.inflight[key] = group
}
d.mu.Unlock()
@@ -200,6 +172,19 @@ func (d *Dispatcher) enqueueWithSlot(id Id) {
d.acquireSlot()
d.ready <- id
d.throttler.Sent()
+ d.dispatch()
+}
+
+func (d *Dispatcher) dispatch() {
+ d.workerWg.Add(1)
+ go func() {
+ defer d.workerWg.Done()
+ id := <-d.ready
+ d.mu.RLock()
+ group := d.inflight[id.String()]
+ d.mu.RUnlock()
+ d.sendDocumentIn(group)
+ }()
}
func (d *Dispatcher) acquireSlot() {
@@ -217,13 +202,7 @@ func (d *Dispatcher) Stats() Stats { return d.stats }
// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
- d.mu.Lock()
- if d.started {
- close(d.ready)
- }
- d.mu.Unlock()
- d.wg.Wait() // Wait for inflight operations to complete
-
+ d.workerWg.Wait() // Wait for all inflight operations to complete
d.mu.Lock()
if d.started {
close(d.results)
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index efb60ad8c0a..214d1dc4797 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -14,13 +14,15 @@ var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1}
type Operation int
const (
- OperationPut = iota
+ OperationPut Operation = iota
OperationUpdate
OperationRemove
)
// Id represents a Vespa document ID.
type Id struct {
+ id string
+
Type string
Namespace string
Number *int64
@@ -36,24 +38,7 @@ func (d Id) Equal(o Id) bool {
d.UserSpecific == o.UserSpecific
}
-func (d Id) String() string {
- var sb strings.Builder
- sb.WriteString("id:")
- sb.WriteString(d.Namespace)
- sb.WriteString(":")
- sb.WriteString(d.Type)
- sb.WriteString(":")
- if d.Number != nil {
- sb.WriteString("n=")
- sb.WriteString(strconv.FormatInt(*d.Number, 10))
- } else if d.Group != "" {
- sb.WriteString("g=")
- sb.WriteString(d.Group)
- }
- sb.WriteString(":")
- sb.WriteString(d.UserSpecific)
- return sb.String()
-}
+func (d Id) String() string { return d.id }
// ParseId parses a serialized document ID string.
func ParseId(serialized string) (Id, error) {
@@ -95,6 +80,7 @@ func ParseId(serialized string) (Id, error) {
return Id{}, parseError(serialized)
}
return Id{
+ id: serialized,
Namespace: namespace,
Type: docType,
Number: number,
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 1bcd7eff39e..51b6fa4de39 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -2,6 +2,7 @@ package document
import (
"bytes"
+ "compress/gzip"
"encoding/json"
"fmt"
"io"
@@ -16,6 +17,14 @@ import (
"github.com/vespa-engine/vespa/client/go/internal/util"
)
+type Compression int
+
+const (
+ CompressionAuto Compression = iota
+ CompressionNone
+ CompressionGzip
+)
+
// Client represents a HTTP client for the /document/v1/ API.
type Client struct {
options ClientOptions
@@ -26,10 +35,11 @@ type Client struct {
// ClientOptions specifices the configuration options of a feed client.
type ClientOptions struct {
- BaseURL string
- Timeout time.Duration
- Route string
- TraceLevel int
+ BaseURL string
+ Timeout time.Duration
+ Route string
+ TraceLevel int
+ Compression Compression
}
type countingHTTPClient struct {
@@ -152,6 +162,33 @@ func (c *Client) leastBusyClient() *countingHTTPClient {
return &leastBusy
}
+func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) {
+ var r io.Reader
+ useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512)
+ if useGzip {
+ var buf bytes.Buffer
+ w := gzip.NewWriter(&buf)
+ if _, err := w.Write(body); err != nil {
+ return nil, err
+ }
+ if err := w.Close(); err != nil {
+ return nil, err
+ }
+ r = &buf
+ } else {
+ r = bytes.NewReader(body)
+ }
+ req, err := http.NewRequest(method, url, r)
+ if err != nil {
+ return nil, err
+ }
+ if useGzip {
+ req.Header.Set("Content-Encoding", "gzip")
+ }
+ req.Header.Set("Content-Type", "application/json; charset=utf-8")
+ return req, nil
+}
+
// Send given document to the endpoint configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
@@ -160,7 +197,7 @@ func (c *Client) Send(document Document) Result {
if err != nil {
return resultWithErr(result, err)
}
- req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body))
+ req, err := c.createRequest(method, url.String(), document.Body)
if err != nil {
return resultWithErr(result, err)
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 8f8394a5d4e..314113c53be 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"reflect"
+ "strings"
"testing"
"time"
@@ -141,6 +142,55 @@ func TestClientSend(t *testing.T) {
}
}
+func TestClientSendCompressed(t *testing.T) {
+ httpClient := mock.HTTPClient{}
+ client := NewClient(ClientOptions{
+ BaseURL: "https://example.com:1337",
+ Timeout: time.Duration(5 * time.Second),
+ }, []util.HTTPClient{&httpClient})
+
+ bigBody := fmt.Sprintf(`{"fields":{"foo": "%s"}}`, strings.Repeat("s", 512+1))
+ bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)}
+ smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "s"}}`)}
+
+ client.options.Compression = CompressionNone
+ _ = client.Send(bigDoc)
+ assertCompressedRequest(t, false, httpClient.LastRequest)
+ _ = client.Send(smallDoc)
+ assertCompressedRequest(t, false, httpClient.LastRequest)
+
+ client.options.Compression = CompressionAuto
+ _ = client.Send(bigDoc)
+ assertCompressedRequest(t, true, httpClient.LastRequest)
+ _ = client.Send(smallDoc)
+ assertCompressedRequest(t, false, httpClient.LastRequest)
+
+ client.options.Compression = CompressionGzip
+ _ = client.Send(bigDoc)
+ assertCompressedRequest(t, true, httpClient.LastRequest)
+ _ = client.Send(smallDoc)
+ assertCompressedRequest(t, true, httpClient.LastRequest)
+}
+
+func assertCompressedRequest(t *testing.T, want bool, request *http.Request) {
+ wantEnc := ""
+ if want {
+ wantEnc = "gzip"
+ }
+ gotEnc := request.Header.Get("Content-Encoding")
+ if gotEnc != wantEnc {
+ t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc)
+ }
+ body, err := io.ReadAll(request.Body)
+ if err != nil {
+ t.Fatal(err)
+ }
+ compressed := bytes.HasPrefix(body, []byte{0x1f, 0x8b})
+ if compressed != want {
+ t.Errorf("got compressed=%t, want %t", compressed, want)
+ }
+}
+
func TestURLPath(t *testing.T) {
tests := []struct {
in Id
diff --git a/client/go/internal/vespa/document/queue.go b/client/go/internal/vespa/document/queue.go
new file mode 100644
index 00000000000..2e5a1976d58
--- /dev/null
+++ b/client/go/internal/vespa/document/queue.go
@@ -0,0 +1,43 @@
+package document
+
+import (
+ "container/list"
+ "sync"
+)
+
+// Queue wraps a doubly linked list. It attempts to re-use lists through a sync.Pool to reduce GC pressure.
+type Queue[T any] struct {
+ items *list.List
+ listPool *sync.Pool
+}
+
+func NewQueue[T any](listPool *sync.Pool) *Queue[T] {
+ if listPool.New == nil {
+ listPool.New = func() any { return list.New() }
+ }
+ return &Queue[T]{listPool: listPool}
+}
+
+func (q *Queue[T]) Add(item T, front bool) {
+ if q.items == nil {
+ q.items = q.listPool.Get().(*list.List)
+ }
+ if front {
+ q.items.PushFront(item)
+ } else {
+ q.items.PushBack(item)
+ }
+}
+
+func (q *Queue[T]) Poll() (T, bool) {
+ if q.items == nil || q.items.Front() == nil {
+ var empty T
+ return empty, false
+ }
+ item := q.items.Remove(q.items.Front()).(T)
+ if q.items.Front() == nil { // Emptied queue, release list back to pool
+ q.listPool.Put(q.items)
+ q.items = nil
+ }
+ return item, true
+}
diff --git a/client/go/internal/vespa/document/queue_test.go b/client/go/internal/vespa/document/queue_test.go
new file mode 100644
index 00000000000..992e7410053
--- /dev/null
+++ b/client/go/internal/vespa/document/queue_test.go
@@ -0,0 +1,29 @@
+package document
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestQueue(t *testing.T) {
+ q := NewQueue[int](&sync.Pool{})
+ assertPoll(t, q, 0, false)
+ q.Add(1, false)
+ q.Add(2, false)
+ assertPoll(t, q, 1, true)
+ assertPoll(t, q, 2, true)
+ q.Add(3, false)
+ q.Add(4, true)
+ assertPoll(t, q, 4, true)
+ assertPoll(t, q, 3, true)
+}
+
+func assertPoll(t *testing.T, q *Queue[int], want int, wantOk bool) {
+ got, ok := q.Poll()
+ if ok != wantOk {
+ t.Fatalf("got ok=%t, want %t", ok, wantOk)
+ }
+ if got != want {
+ t.Fatalf("got v=%d, want %d", got, want)
+ }
+}