diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-19 12:18:28 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-19 12:18:28 +0200 |
commit | f8f367921956b5e0a7e9927fdecaf1713a80fbf8 (patch) | |
tree | 52e3f33f75af97c17750a444df6be8263fbffb12 | |
parent | 90e9ba99448f970558fd9d7cefae370b522a8e91 (diff) | |
parent | 9f3ba858930efafa2a466971574e4dc98a3d0d7a (diff) |
Merge pull request #26778 from vespa-engine/mpolden/feed-client-8
Add compression
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 56 | ||||
-rw-r--r-- | client/go/internal/vespa/document/dispatcher.go | 79 | ||||
-rw-r--r-- | client/go/internal/vespa/document/document.go | 24 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 47 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 50 | ||||
-rw-r--r-- | client/go/internal/vespa/document/queue.go | 43 | ||||
-rw-r--r-- | client/go/internal/vespa/document/queue_test.go | 29 |
7 files changed, 249 insertions, 79 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index f0f82dd80d1..06568dd35c3 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -6,6 +6,7 @@ import ( "io" "math" "os" + "runtime/pprof" "time" "github.com/spf13/cobra" @@ -16,18 +17,29 @@ import ( func addFeedFlags(cmd *cobra.Command, options *feedOptions) { cmd.PersistentFlags().IntVar(&options.connections, "connections", 8, "The number of connections to use") + cmd.PersistentFlags().StringVar(&options.compression, "compression", "auto", `Compression mode to use. Default is "auto" which compresses large documents. Must be "auto", "gzip" or "none"`) cmd.PersistentFlags().StringVar(&options.route, "route", "", "Target Vespa route for feed operations") cmd.PersistentFlags().IntVar(&options.traceLevel, "trace", 0, "The trace level of network traffic. 0 to disable") cmd.PersistentFlags().IntVar(&options.timeoutSecs, "timeout", 0, "Feed operation timeout in seconds. 0 to disable") cmd.PersistentFlags().BoolVar(&options.verbose, "verbose", false, "Verbose mode. Print successful operations in addition to errors") + memprofile := "memprofile" + cpuprofile := "cpuprofile" + cmd.PersistentFlags().StringVar(&options.memprofile, memprofile, "", "Write a heap profile to given file") + cmd.PersistentFlags().StringVar(&options.cpuprofile, cpuprofile, "", "Write a CPU profile to given file") + // Hide these flags as they are intended for internal use + cmd.PersistentFlags().MarkHidden(memprofile) + cmd.PersistentFlags().MarkHidden(cpuprofile) } type feedOptions struct { connections int + compression string route string verbose bool traceLevel int timeoutSecs int + memprofile string + cpuprofile string } func newFeedCmd(cli *CLI) *cobra.Command { @@ -64,7 +76,24 @@ $ cat documents.jsonl | vespa feed - defer f.Close() r = f } - return feed(r, cli, options) + if options.cpuprofile != "" { + f, err := os.Create(options.cpuprofile) + if err != nil { + return err + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + err := feed(r, cli, options) + if options.memprofile != "" { + f, err := os.Create(options.memprofile) + if err != nil { + return err + } + defer f.Close() + pprof.WriteHeapProfile(f) + } + return err }, } addFeedFlags(cmd, &options) @@ -82,17 +111,34 @@ func createServiceClients(service *vespa.Service, n int) []util.HTTPClient { return clients } +func (opts feedOptions) compressionMode() (document.Compression, error) { + switch opts.compression { + case "auto": + return document.CompressionAuto, nil + case "none": + return document.CompressionNone, nil + case "gzip": + return document.CompressionGzip, nil + } + return 0, errHint(fmt.Errorf("invalid compression mode: %s", opts.compression), `Must be "auto", "gzip" or "none"`) +} + func feed(r io.Reader, cli *CLI, options feedOptions) error { service, err := documentService(cli) if err != nil { return err } clients := createServiceClients(service, options.connections) + compression, err := options.compressionMode() + if err != nil { + return err + } client := document.NewClient(document.ClientOptions{ - Timeout: time.Duration(options.timeoutSecs) * time.Second, - Route: options.route, - TraceLevel: options.traceLevel, - BaseURL: service.BaseURL, + Compression: compression, + Timeout: time.Duration(options.timeoutSecs) * time.Second, + Route: options.route, + TraceLevel: options.traceLevel, + BaseURL: service.BaseURL, }, clients) throttler := document.NewThrottler(options.connections) // TODO(mpolden): Make doom duration configurable diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 533ca7a0019..5c99f3bf056 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -29,8 +29,9 @@ type Dispatcher struct { output io.Writer verbose bool + listPool sync.Pool mu sync.RWMutex - wg sync.WaitGroup + workerWg sync.WaitGroup resultWg sync.WaitGroup } @@ -42,21 +43,14 @@ type documentOp struct { // documentGroup holds document operations which share an ID, and must be dispatched in order. type documentGroup struct { - ops *list.List - mu sync.Mutex + q *Queue[documentOp] + mu sync.Mutex } func (g *documentGroup) add(op documentOp, first bool) { g.mu.Lock() defer g.mu.Unlock() - if g.ops == nil { - g.ops = list.New() - } - if first { - g.ops.PushFront(op) - } else { - g.ops.PushBack(op) - } + g.q.Add(op, first) } func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, output io.Writer, verbose bool) *Dispatcher { @@ -74,11 +68,10 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o func (d *Dispatcher) sendDocumentIn(group *documentGroup) { group.mu.Lock() - first := group.ops.Front() - if first == nil { + op, ok := group.q.Poll() + if !ok { panic("sending from empty document group, this should not happen") } - op := group.ops.Remove(first).(documentOp) op.attempts++ result := d.feeder.Send(op.document) d.results <- result @@ -134,46 +127,25 @@ func (d *Dispatcher) start() { if d.started { return } + d.listPool.New = func() any { return list.New() } d.ready = make(chan Id, 4096) d.results = make(chan Result, 4096) d.msgs = make(chan string, 4096) d.started = true - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.readDocuments() - }() d.resultWg.Add(2) - go func() { - defer d.resultWg.Done() - d.readResults() - }() - go func() { - defer d.resultWg.Done() - d.readMessages() - }() + go d.sumStats() + go d.printMessages() } -func (d *Dispatcher) readDocuments() { - for id := range d.ready { - d.mu.RLock() - group := d.inflight[id.String()] - d.mu.RUnlock() - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.sendDocumentIn(group) - }() - } -} - -func (d *Dispatcher) readResults() { +func (d *Dispatcher) sumStats() { + defer d.resultWg.Done() for result := range d.results { d.stats.Add(result.Stats) } } -func (d *Dispatcher) readMessages() { +func (d *Dispatcher) printMessages() { + defer d.resultWg.Done() for msg := range d.msgs { fmt.Fprintln(d.output, msg) } @@ -187,7 +159,7 @@ func (d *Dispatcher) enqueue(op documentOp) error { key := op.document.Id.String() group, ok := d.inflight[key] if !ok { - group = &documentGroup{} + group = &documentGroup{q: NewQueue[documentOp](&d.listPool)} d.inflight[key] = group } d.mu.Unlock() @@ -200,6 +172,19 @@ func (d *Dispatcher) enqueueWithSlot(id Id) { d.acquireSlot() d.ready <- id d.throttler.Sent() + d.dispatch() +} + +func (d *Dispatcher) dispatch() { + d.workerWg.Add(1) + go func() { + defer d.workerWg.Done() + id := <-d.ready + d.mu.RLock() + group := d.inflight[id.String()] + d.mu.RUnlock() + d.sendDocumentIn(group) + }() } func (d *Dispatcher) acquireSlot() { @@ -217,13 +202,7 @@ func (d *Dispatcher) Stats() Stats { return d.stats } // Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { - d.mu.Lock() - if d.started { - close(d.ready) - } - d.mu.Unlock() - d.wg.Wait() // Wait for inflight operations to complete - + d.workerWg.Wait() // Wait for all inflight operations to complete d.mu.Lock() if d.started { close(d.results) diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index efb60ad8c0a..214d1dc4797 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -14,13 +14,15 @@ var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} type Operation int const ( - OperationPut = iota + OperationPut Operation = iota OperationUpdate OperationRemove ) // Id represents a Vespa document ID. type Id struct { + id string + Type string Namespace string Number *int64 @@ -36,24 +38,7 @@ func (d Id) Equal(o Id) bool { d.UserSpecific == o.UserSpecific } -func (d Id) String() string { - var sb strings.Builder - sb.WriteString("id:") - sb.WriteString(d.Namespace) - sb.WriteString(":") - sb.WriteString(d.Type) - sb.WriteString(":") - if d.Number != nil { - sb.WriteString("n=") - sb.WriteString(strconv.FormatInt(*d.Number, 10)) - } else if d.Group != "" { - sb.WriteString("g=") - sb.WriteString(d.Group) - } - sb.WriteString(":") - sb.WriteString(d.UserSpecific) - return sb.String() -} +func (d Id) String() string { return d.id } // ParseId parses a serialized document ID string. func ParseId(serialized string) (Id, error) { @@ -95,6 +80,7 @@ func ParseId(serialized string) (Id, error) { return Id{}, parseError(serialized) } return Id{ + id: serialized, Namespace: namespace, Type: docType, Number: number, diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 1bcd7eff39e..51b6fa4de39 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -2,6 +2,7 @@ package document import ( "bytes" + "compress/gzip" "encoding/json" "fmt" "io" @@ -16,6 +17,14 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/util" ) +type Compression int + +const ( + CompressionAuto Compression = iota + CompressionNone + CompressionGzip +) + // Client represents a HTTP client for the /document/v1/ API. type Client struct { options ClientOptions @@ -26,10 +35,11 @@ type Client struct { // ClientOptions specifices the configuration options of a feed client. type ClientOptions struct { - BaseURL string - Timeout time.Duration - Route string - TraceLevel int + BaseURL string + Timeout time.Duration + Route string + TraceLevel int + Compression Compression } type countingHTTPClient struct { @@ -152,6 +162,33 @@ func (c *Client) leastBusyClient() *countingHTTPClient { return &leastBusy } +func (c *Client) createRequest(method, url string, body []byte) (*http.Request, error) { + var r io.Reader + useGzip := c.options.Compression == CompressionGzip || (c.options.Compression == CompressionAuto && len(body) > 512) + if useGzip { + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + if _, err := w.Write(body); err != nil { + return nil, err + } + if err := w.Close(); err != nil { + return nil, err + } + r = &buf + } else { + r = bytes.NewReader(body) + } + req, err := http.NewRequest(method, url, r) + if err != nil { + return nil, err + } + if useGzip { + req.Header.Set("Content-Encoding", "gzip") + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + return req, nil +} + // Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() @@ -160,7 +197,7 @@ func (c *Client) Send(document Document) Result { if err != nil { return resultWithErr(result, err) } - req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body)) + req, err := c.createRequest(method, url.String(), document.Body) if err != nil { return resultWithErr(result, err) } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 8f8394a5d4e..314113c53be 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "reflect" + "strings" "testing" "time" @@ -141,6 +142,55 @@ func TestClientSend(t *testing.T) { } } +func TestClientSendCompressed(t *testing.T) { + httpClient := mock.HTTPClient{} + client := NewClient(ClientOptions{ + BaseURL: "https://example.com:1337", + Timeout: time.Duration(5 * time.Second), + }, []util.HTTPClient{&httpClient}) + + bigBody := fmt.Sprintf(`{"fields":{"foo": "%s"}}`, strings.Repeat("s", 512+1)) + bigDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(bigBody)} + smallDoc := Document{Create: true, Id: mustParseId("id:ns:type::doc2"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "s"}}`)} + + client.options.Compression = CompressionNone + _ = client.Send(bigDoc) + assertCompressedRequest(t, false, httpClient.LastRequest) + _ = client.Send(smallDoc) + assertCompressedRequest(t, false, httpClient.LastRequest) + + client.options.Compression = CompressionAuto + _ = client.Send(bigDoc) + assertCompressedRequest(t, true, httpClient.LastRequest) + _ = client.Send(smallDoc) + assertCompressedRequest(t, false, httpClient.LastRequest) + + client.options.Compression = CompressionGzip + _ = client.Send(bigDoc) + assertCompressedRequest(t, true, httpClient.LastRequest) + _ = client.Send(smallDoc) + assertCompressedRequest(t, true, httpClient.LastRequest) +} + +func assertCompressedRequest(t *testing.T, want bool, request *http.Request) { + wantEnc := "" + if want { + wantEnc = "gzip" + } + gotEnc := request.Header.Get("Content-Encoding") + if gotEnc != wantEnc { + t.Errorf("got Content-Encoding=%q, want %q", gotEnc, wantEnc) + } + body, err := io.ReadAll(request.Body) + if err != nil { + t.Fatal(err) + } + compressed := bytes.HasPrefix(body, []byte{0x1f, 0x8b}) + if compressed != want { + t.Errorf("got compressed=%t, want %t", compressed, want) + } +} + func TestURLPath(t *testing.T) { tests := []struct { in Id diff --git a/client/go/internal/vespa/document/queue.go b/client/go/internal/vespa/document/queue.go new file mode 100644 index 00000000000..2e5a1976d58 --- /dev/null +++ b/client/go/internal/vespa/document/queue.go @@ -0,0 +1,43 @@ +package document + +import ( + "container/list" + "sync" +) + +// Queue wraps a doubly linked list. It attempts to re-use lists through a sync.Pool to reduce GC pressure. +type Queue[T any] struct { + items *list.List + listPool *sync.Pool +} + +func NewQueue[T any](listPool *sync.Pool) *Queue[T] { + if listPool.New == nil { + listPool.New = func() any { return list.New() } + } + return &Queue[T]{listPool: listPool} +} + +func (q *Queue[T]) Add(item T, front bool) { + if q.items == nil { + q.items = q.listPool.Get().(*list.List) + } + if front { + q.items.PushFront(item) + } else { + q.items.PushBack(item) + } +} + +func (q *Queue[T]) Poll() (T, bool) { + if q.items == nil || q.items.Front() == nil { + var empty T + return empty, false + } + item := q.items.Remove(q.items.Front()).(T) + if q.items.Front() == nil { // Emptied queue, release list back to pool + q.listPool.Put(q.items) + q.items = nil + } + return item, true +} diff --git a/client/go/internal/vespa/document/queue_test.go b/client/go/internal/vespa/document/queue_test.go new file mode 100644 index 00000000000..992e7410053 --- /dev/null +++ b/client/go/internal/vespa/document/queue_test.go @@ -0,0 +1,29 @@ +package document + +import ( + "sync" + "testing" +) + +func TestQueue(t *testing.T) { + q := NewQueue[int](&sync.Pool{}) + assertPoll(t, q, 0, false) + q.Add(1, false) + q.Add(2, false) + assertPoll(t, q, 1, true) + assertPoll(t, q, 2, true) + q.Add(3, false) + q.Add(4, true) + assertPoll(t, q, 4, true) + assertPoll(t, q, 3, true) +} + +func assertPoll(t *testing.T, q *Queue[int], want int, wantOk bool) { + got, ok := q.Poll() + if ok != wantOk { + t.Fatalf("got ok=%t, want %t", ok, wantOk) + } + if got != want { + t.Fatalf("got v=%d, want %d", got, want) + } +} |