summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-11 12:55:14 +0200
committerGitHub <noreply@github.com>2023-04-11 12:55:14 +0200
commit367fd87120ba0f1ef00be7a5800d1c612da42942 (patch)
treeb975c9eb0ab34795f25c90ad68c35a481d929d70
parent713394b027d243eaf4ee44882910d8e328a811c6 (diff)
parent2e716d564ef8b8aff1a443f11759acb23a52641b (diff)
Merge pull request #26700 from vespa-engine/mpolden/feed-client-6
Support feeding with multiple connections/clients
-rw-r--r--client/go/internal/cli/auth/zts/zts.go2
-rw-r--r--client/go/internal/cli/cmd/feed.go57
-rw-r--r--client/go/internal/cli/cmd/feed_test.go15
-rw-r--r--client/go/internal/cli/cmd/root.go4
-rw-r--r--client/go/internal/cli/cmd/test.go2
-rw-r--r--client/go/internal/mock/http.go4
-rw-r--r--client/go/internal/util/http.go71
-rw-r--r--client/go/internal/vespa/document/dispatcher.go129
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go15
-rw-r--r--client/go/internal/vespa/document/document.go21
-rw-r--r--client/go/internal/vespa/document/feeder.go7
-rw-r--r--client/go/internal/vespa/document/http.go110
-rw-r--r--client/go/internal/vespa/document/http_test.go64
-rw-r--r--client/go/internal/vespa/document/throttler.go41
-rw-r--r--client/go/internal/vespa/document/throttler_test.go6
-rw-r--r--client/go/internal/vespa/target.go16
-rw-r--r--client/go/internal/vespa/target_cloud.go4
17 files changed, 378 insertions, 190 deletions
diff --git a/client/go/internal/cli/auth/zts/zts.go b/client/go/internal/cli/auth/zts/zts.go
index 1e84912a271..caa2d03367d 100644
--- a/client/go/internal/cli/auth/zts/zts.go
+++ b/client/go/internal/cli/auth/zts/zts.go
@@ -37,7 +37,7 @@ func (c *Client) AccessToken(domain string, certificate tls.Certificate) (string
if err != nil {
return "", err
}
- util.SetCertificate(c.client, []tls.Certificate{certificate})
+ util.SetCertificates(c.client, []tls.Certificate{certificate})
response, err := c.client.Do(req, 10*time.Second)
if err != nil {
return "", err
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index c8e032929b8..895a22d2be5 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -9,17 +9,20 @@ import (
"time"
"github.com/spf13/cobra"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa"
"github.com/vespa-engine/vespa/client/go/internal/vespa/document"
)
-func addFeedFlags(cmd *cobra.Command, concurrency *int) {
- // TOOD(mpolden): Remove this flag
- cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
+func addFeedFlags(cmd *cobra.Command, verbose *bool, connections *int) {
+ cmd.PersistentFlags().IntVarP(connections, "connections", "N", 8, "The number of connections to use")
+ cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen")
}
func newFeedCmd(cli *CLI) *cobra.Command {
var (
- concurrency int
+ verbose bool
+ connections int
)
cmd := &cobra.Command{
Use: "feed FILE",
@@ -27,42 +30,66 @@ func newFeedCmd(cli *CLI) *cobra.Command {
Long: `Feed documents to a Vespa cluster.
A high performance feeding client. This can be used to feed large amounts of
-documents to Vespa cluster efficiently.
+documents to a Vespa cluster efficiently.
The contents of FILE must be either a JSON array or JSON objects separated by
newline (JSONL).
+
+If FILE is a single dash ('-'), documents will be read from standard input.
`,
Example: `$ vespa feed documents.jsonl
+$ cat documents.jsonl | vespa feed -
`,
Args: cobra.ExactArgs(1),
DisableAutoGenTag: true,
SilenceUsage: true,
Hidden: true, // TODO(mpolden): Remove when ready for public use
RunE: func(cmd *cobra.Command, args []string) error {
- f, err := os.Open(args[0])
- if err != nil {
- return err
+ var r io.Reader
+ if args[0] == "-" {
+ r = cli.Stdin
+ } else {
+ f, err := os.Open(args[0])
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ r = f
}
- defer f.Close()
- return feed(f, cli, concurrency)
+ return feed(r, cli, verbose, connections)
},
}
- addFeedFlags(cmd, &concurrency)
+ addFeedFlags(cmd, &verbose, &connections)
return cmd
}
-func feed(r io.Reader, cli *CLI, concurrency int) error {
+func createServiceClients(service *vespa.Service, n int) []util.HTTPClient {
+ clients := make([]util.HTTPClient, 0, n)
+ for i := 0; i < n; i++ {
+ client := service.Client().Clone()
+ util.ForceHTTP2(client, service.TLSOptions.KeyPair) // Feeding should always use HTTP/2
+ clients = append(clients, client)
+ }
+ return clients
+}
+
+func feed(r io.Reader, cli *CLI, verbose bool, connections int) error {
service, err := documentService(cli)
if err != nil {
return err
}
+ clients := createServiceClients(service, connections)
client := document.NewClient(document.ClientOptions{
BaseURL: service.BaseURL,
- }, service)
- throttler := document.NewThrottler()
+ }, clients)
+ throttler := document.NewThrottler(connections)
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
- dispatcher := document.NewDispatcher(client, throttler, circuitBreaker)
+ errWriter := io.Discard
+ if verbose {
+ errWriter = cli.Stderr
+ }
+ dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter)
dec := document.NewDecoder(r)
start := cli.now()
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
index 1bf1ef6ab9b..521d2b2abd0 100644
--- a/client/go/internal/cli/cmd/feed_test.go
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -1,6 +1,7 @@
package cmd
import (
+ "bytes"
"os"
"path/filepath"
"testing"
@@ -42,7 +43,7 @@ func TestFeed(t *testing.T) {
require.Nil(t, cli.Run("feed", jsonFile))
assert.Equal(t, "", stderr.String())
- assert.Equal(t, `{
+ want := `{
"feeder.seconds": 1.000,
"feeder.ok.count": 1,
"feeder.ok.rate": 1.000,
@@ -63,5 +64,15 @@ func TestFeed(t *testing.T) {
"200": 1
}
}
-`, stdout.String())
+`
+ assert.Equal(t, want, stdout.String())
+
+ stdout.Reset()
+ cli.Stdin = bytes.NewBuffer([]byte(`{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+}`))
+ httpClient.NextResponseString(200, `{"message":"OK"}`)
+ require.Nil(t, cli.Run("feed", "-"))
+ assert.Equal(t, want, stdout.String())
}
diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go
index 58e940d59ef..360af9d0dcf 100644
--- a/client/go/internal/cli/cmd/root.go
+++ b/client/go/internal/cli/cmd/root.go
@@ -366,7 +366,7 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta
return nil, errHint(err, "Deployment to cloud requires a certificate. Try 'vespa auth cert'")
}
deploymentTLSOptions = vespa.TLSOptions{
- KeyPair: &kp.KeyPair,
+ KeyPair: []tls.Certificate{kp.KeyPair},
CertificateFile: kp.CertificateFile,
PrivateKeyFile: kp.PrivateKeyFile,
}
@@ -377,7 +377,7 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta
return nil, errHint(err, "Deployment to hosted requires an Athenz certificate", "Try renewing certificate with 'athenz-user-cert'")
}
apiTLSOptions = vespa.TLSOptions{
- KeyPair: &kp.KeyPair,
+ KeyPair: []tls.Certificate{kp.KeyPair},
CertificateFile: kp.CertificateFile,
PrivateKeyFile: kp.PrivateKeyFile,
}
diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go
index 4a53fe6bed3..05633b1135e 100644
--- a/client/go/internal/cli/cmd/test.go
+++ b/client/go/internal/cli/cmd/test.go
@@ -263,7 +263,7 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin
var response *http.Response
if externalEndpoint {
- util.SetCertificate(context.cli.httpClient, []tls.Certificate{})
+ util.SetCertificates(context.cli.httpClient, []tls.Certificate{})
response, err = context.cli.httpClient.Do(request, 60*time.Second)
} else {
response, err = service.Do(request, 600*time.Second) // Vespa should provide a response within the given request timeout
diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go
index 9c55f2e79bf..58614d7e5bd 100644
--- a/client/go/internal/mock/http.go
+++ b/client/go/internal/mock/http.go
@@ -6,6 +6,8 @@ import (
"net/http"
"strconv"
"time"
+
+ "github.com/vespa-engine/vespa/client/go/internal/util"
)
type HTTPClient struct {
@@ -58,3 +60,5 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res
},
nil
}
+
+func (c *HTTPClient) Clone() util.HTTPClient { return c }
diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go
index cb35932c8e7..dcf05ed3a14 100644
--- a/client/go/internal/util/http.go
+++ b/client/go/internal/util/http.go
@@ -2,9 +2,10 @@
package util
import (
- "bytes"
+ "context"
"crypto/tls"
"fmt"
+ "net"
"net/http"
"time"
@@ -14,6 +15,7 @@ import (
type HTTPClient interface {
Do(request *http.Request, timeout time.Duration) (response *http.Response, error error)
+ Clone() HTTPClient
}
type defaultHTTPClient struct {
@@ -31,50 +33,57 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re
return c.client.Do(request)
}
-func SetCertificate(client HTTPClient, certificates []tls.Certificate) {
+func (c *defaultHTTPClient) Clone() HTTPClient { return CreateClient(c.client.Timeout) }
+
+func SetCertificates(client HTTPClient, certificates []tls.Certificate) {
c, ok := client.(*defaultHTTPClient)
if !ok {
return
}
- // Use HTTP/2 transport explicitly. Connection reuse does not work properly when using regular http.Transport, even
- // though it upgrades to HTTP/2 automatically
- // https://github.com/golang/go/issues/16582
- // https://github.com/golang/go/issues/22091
- var transport *http2.Transport
- if _, ok := c.client.Transport.(*http.Transport); ok {
- transport = &http2.Transport{}
- c.client.Transport = transport
- } else if t, ok := c.client.Transport.(*http2.Transport); ok {
- transport = t
- } else {
- panic(fmt.Sprintf("unknown transport type: %T", c.client.Transport))
- }
- if ok && !c.hasCertificates(transport.TLSClientConfig, certificates) {
- transport.TLSClientConfig = &tls.Config{
+ var tlsConfig *tls.Config = nil
+ if certificates != nil {
+ tlsConfig = &tls.Config{
Certificates: certificates,
MinVersion: tls.VersionTLS12,
}
}
+ if tr, ok := c.client.Transport.(*http.Transport); ok {
+ tr.TLSClientConfig = tlsConfig
+ } else if tr, ok := c.client.Transport.(*http2.Transport); ok {
+ tr.TLSClientConfig = tlsConfig
+ } else {
+ panic(fmt.Sprintf("unknown transport type: %T", c.client.Transport))
+ }
}
-func (c *defaultHTTPClient) hasCertificates(tlsConfig *tls.Config, certs []tls.Certificate) bool {
- if tlsConfig == nil {
- return false
- }
- if len(tlsConfig.Certificates) != len(certs) {
- return false
+func ForceHTTP2(client HTTPClient, certificates []tls.Certificate) {
+ c, ok := client.(*defaultHTTPClient)
+ if !ok {
+ return
}
- for i := 0; i < len(certs); i++ {
- if len(tlsConfig.Certificates[i].Certificate) != len(certs[i].Certificate) {
- return false
+ var tlsConfig *tls.Config = nil
+ var dialFunc func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
+ if certificates != nil {
+ tlsConfig = &tls.Config{
+ Certificates: certificates,
+ MinVersion: tls.VersionTLS12,
}
- for j := 0; j < len(certs[i].Certificate); j++ {
- if !bytes.Equal(tlsConfig.Certificates[i].Certificate[j], certs[i].Certificate[j]) {
- return false
- }
+ } else {
+ // No certificate, so force H2C (HTTP/2 over clear-text) by using a non-TLS Dialer
+ dialer := net.Dialer{}
+ dialFunc = func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
+ return dialer.DialContext(ctx, network, addr)
}
}
- return true
+ // Use HTTP/2 transport explicitly. Connection reuse does not work properly when using regular http.Transport, even
+ // though it upgrades to HTTP/2 automatically
+ // https://github.com/golang/go/issues/16582
+ // https://github.com/golang/go/issues/22091
+ c.client.Transport = &http2.Transport{
+ AllowHTTP: true,
+ TLSClientConfig: tlsConfig,
+ DialTLSContext: dialFunc,
+ }
}
func CreateClient(timeout time.Duration) HTTPClient {
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index 7011ae7a9b6..838a7bc45ee 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -1,7 +1,9 @@
package document
import (
+ "container/list"
"fmt"
+ "io"
"sync"
"sync/atomic"
"time"
@@ -16,64 +18,70 @@ type Dispatcher struct {
circuitBreaker CircuitBreaker
stats Stats
- closed bool
+ started bool
ready chan Id
results chan Result
inflight map[string]*documentGroup
inflightCount int64
+ errWriter io.Writer
mu sync.RWMutex
wg sync.WaitGroup
resultWg sync.WaitGroup
}
-// documentGroup holds document operations which share their ID, and must be dispatched in order.
-type documentGroup struct {
- id Id
- operations []documentOp
- mu sync.Mutex
-}
-
+// documentOp represents a document operation and the number of times it has been attempted.
type documentOp struct {
document Document
attempts int
}
-func (g *documentGroup) append(op documentOp) {
+// documentGroup holds document operations which share an ID, and must be dispatched in order.
+type documentGroup struct {
+ ops *list.List
+ mu sync.Mutex
+}
+
+func (g *documentGroup) add(op documentOp, first bool) {
g.mu.Lock()
defer g.mu.Unlock()
- g.operations = append(g.operations, op)
+ if g.ops == nil {
+ g.ops = list.New()
+ }
+ if first {
+ g.ops.PushFront(op)
+ } else {
+ g.ops.PushBack(op)
+ }
}
-func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher {
+func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher {
d := &Dispatcher{
feeder: feeder,
throttler: throttler,
circuitBreaker: breaker,
inflight: make(map[string]*documentGroup),
+ errWriter: errWriter,
}
d.start()
return d
}
-func (d *Dispatcher) dispatchAll(g *documentGroup) {
- g.mu.Lock()
- defer g.mu.Unlock()
- for i := 0; i < len(g.operations); i++ {
- op := g.operations[i]
- ok := false
- for !ok {
- op.attempts++
- result := d.feeder.Send(op.document)
- d.results <- result
- ok = result.Status.Success()
- if !d.shouldRetry(op, result) {
- break
- }
- }
- d.releaseSlot()
+func (d *Dispatcher) sendDocumentIn(group *documentGroup) {
+ group.mu.Lock()
+ defer group.mu.Unlock()
+ defer d.releaseSlot()
+ first := group.ops.Front()
+ if first == nil {
+ panic("sending from empty document group, this should not happen")
+ }
+ op := group.ops.Remove(first).(documentOp)
+ op.attempts++
+ result := d.feeder.Send(op.document)
+ d.results <- result
+ if d.shouldRetry(op, result) {
+ d.enqueue(op)
}
- g.operations = nil
}
func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
@@ -83,12 +91,26 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
return false
}
if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
+ fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus)
d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount))
return true
}
- if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ retry := op.attempts <= maxAttempts
+ msg := "feed: " + op.document.String() + " failed with "
+ if result.Err != nil {
+ msg += "error " + result.Err.Error()
+ } else {
+ msg += fmt.Sprintf("status %d", result.HTTPStatus)
+ }
+ if retry {
+ msg += ": retrying"
+ } else {
+ msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts)
+ }
+ fmt.Fprintln(d.errWriter, msg)
d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
- if op.attempts <= maxAttempts {
+ if retry {
return true
}
}
@@ -98,9 +120,12 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
func (d *Dispatcher) start() {
d.mu.Lock()
defer d.mu.Unlock()
+ if d.started {
+ return
+ }
d.ready = make(chan Id, 4096)
d.results = make(chan Result, 4096)
- d.closed = false
+ d.started = true
d.wg.Add(1)
go func() {
defer d.wg.Done()
@@ -118,13 +143,11 @@ func (d *Dispatcher) readDocuments() {
d.mu.RLock()
group := d.inflight[id.String()]
d.mu.RUnlock()
- if group != nil {
- d.wg.Add(1)
- go func() {
- defer d.wg.Done()
- d.dispatchAll(group)
- }()
- }
+ d.wg.Add(1)
+ go func() {
+ defer d.wg.Done()
+ d.sendDocumentIn(group)
+ }()
}
}
@@ -134,28 +157,22 @@ func (d *Dispatcher) readResults() {
}
}
-func (d *Dispatcher) Enqueue(doc Document) error {
+func (d *Dispatcher) enqueue(op documentOp) error {
d.mu.Lock()
- if d.closed {
+ if !d.started {
return fmt.Errorf("dispatcher is closed")
}
- group, ok := d.inflight[doc.Id.String()]
- if ok {
- group.append(documentOp{document: doc})
- } else {
- group = &documentGroup{
- id: doc.Id,
- operations: []documentOp{{document: doc}},
- }
- d.inflight[doc.Id.String()] = group
+ group, ok := d.inflight[op.document.Id.String()]
+ if !ok {
+ group = &documentGroup{}
+ d.inflight[op.document.Id.String()] = group
}
d.mu.Unlock()
- d.enqueueWithSlot(doc.Id)
+ group.add(op, op.attempts > 0)
+ d.enqueueWithSlot(op.document.Id)
return nil
}
-func (d *Dispatcher) Stats() Stats { return d.stats }
-
func (d *Dispatcher) enqueueWithSlot(id Id) {
d.acquireSlot()
d.ready <- id
@@ -173,16 +190,20 @@ func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) }
func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) {
d.mu.Lock()
- if !d.closed {
+ if d.started {
close(ch)
if markClosed {
- d.closed = true
+ d.started = false
}
}
d.mu.Unlock()
wg.Wait()
}
+func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) }
+
+func (d *Dispatcher) Stats() Stats { return d.stats }
+
// Close closes the dispatcher and waits for all inflight operations to complete.
func (d *Dispatcher) Close() error {
closeAndWait(d.ready, &d.wg, d, false)
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 8a6d8c6117c..80bc5f603ae 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -1,6 +1,7 @@
package document
import (
+ "io"
"sync"
"testing"
"time"
@@ -29,7 +30,7 @@ func (f *mockFeeder) Send(doc Document) Result {
} else {
f.documents = append(f.documents, doc)
}
- if !result.Status.Success() {
+ if !result.Success() {
result.Stats.Errors = 1
}
return result
@@ -38,9 +39,9 @@ func (f *mockFeeder) Send(doc Document) Result {
func TestDispatcher(t *testing.T) {
feeder := &mockFeeder{}
clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
+ throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
docs := []Document{
{Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)},
{Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)},
@@ -71,9 +72,9 @@ func TestDispatcherOrdering(t *testing.T) {
{Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut},
}
clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
+ throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
for _, d := range docs {
dispatcher.Enqueue(d)
}
@@ -107,9 +108,9 @@ func TestDispatcherOrderingWithFailures(t *testing.T) {
}
feeder.failAfterN(2)
clock := &manualClock{tick: time.Second}
- throttler := newThrottler(clock.now)
+ throttler := newThrottler(8, clock.now)
breaker := NewCircuitBreaker(time.Second, 0)
- dispatcher := NewDispatcher(feeder, throttler, breaker)
+ dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard)
for _, d := range docs {
dispatcher.Enqueue(d)
}
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 98cb2d1b6c6..efb60ad8c0a 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -130,6 +130,27 @@ type Decoder struct {
jsonl bool
}
+func (d Document) String() string {
+ var sb strings.Builder
+ switch d.Operation {
+ case OperationPut:
+ sb.WriteString("put ")
+ case OperationUpdate:
+ sb.WriteString("update ")
+ case OperationRemove:
+ sb.WriteString("remove ")
+ }
+ sb.WriteString(d.Id.String())
+ if d.Condition != "" {
+ sb.WriteString(", condition=")
+ sb.WriteString(d.Condition)
+ }
+ if d.Create {
+ sb.WriteString(", create=true")
+ }
+ return sb.String()
+}
+
func (d *Decoder) guessMode() error {
for !d.array && !d.jsonl {
b, err := d.buf.ReadByte()
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
index 8bdd5bca5ba..9e6768d0eb4 100644
--- a/client/go/internal/vespa/document/feeder.go
+++ b/client/go/internal/vespa/document/feeder.go
@@ -17,8 +17,6 @@ const (
// StatusTransportFailure indicates that there was failure in the transport layer error while sending the document
// operation to Vespa.
StatusTransportFailure
- // StatusError is a catch-all status for any other error that might occur.
- StatusError
)
// Result represents the result of a feeding operation.
@@ -32,8 +30,9 @@ type Result struct {
Stats Stats
}
-// Success returns whether status s is considered a success.
-func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet }
+func (r Result) Success() bool {
+ return r.Err == nil && (r.Status == StatusSuccess || r.Status == StatusConditionNotMet)
+}
// Stats represents feeding operation statistics.
type Stats struct {
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 4dadcd1d05c..588330a0574 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -5,10 +5,12 @@ import (
"encoding/json"
"fmt"
"io"
+ "math"
"net/http"
"net/url"
"strconv"
"strings"
+ "sync/atomic"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -16,9 +18,10 @@ import (
// Client represents a HTTP client for the /document/v1/ API.
type Client struct {
- options ClientOptions
- httpClient util.HTTPClient
- now func() time.Time
+ options ClientOptions
+ httpClients []countingHTTPClient
+ now func() time.Time
+ sendCount int32
}
// ClientOptions specifices the configuration options of a feed client.
@@ -29,6 +32,18 @@ type ClientOptions struct {
TraceLevel *int
}
+type countingHTTPClient struct {
+ client util.HTTPClient
+ inflight int64
+}
+
+func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) }
+
+func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) {
+ defer c.addInflight(-1)
+ return c.client.Do(req, timeout)
+}
+
type countingReader struct {
reader io.Reader
bytesRead int64
@@ -40,13 +55,19 @@ func (r *countingReader) Read(p []byte) (int, error) {
return n, err
}
-func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client {
- c := &Client{
- options: options,
- httpClient: httpClient,
- now: time.Now,
+func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client {
+ if len(httpClients) < 1 {
+ panic("need at least one HTTP client")
+ }
+ countingClients := make([]countingHTTPClient, 0, len(httpClients))
+ for _, client := range httpClients {
+ countingClients = append(countingClients, countingHTTPClient{client: client})
+ }
+ return &Client{
+ options: options,
+ httpClients: countingClients,
+ now: time.Now,
}
- return c
}
func (c *Client) queryParams() url.Values {
@@ -109,45 +130,56 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL,
return httpMethod, u, nil
}
-// Send given document the URL configured in this client.
+func (c *Client) leastBusyClient() *countingHTTPClient {
+ leastBusy := c.httpClients[0]
+ min := int64(math.MaxInt64)
+ next := atomic.AddInt32(&c.sendCount, 1)
+ start := int(next) % len(c.httpClients)
+ for i := range c.httpClients {
+ j := (i + start) % len(c.httpClients)
+ client := c.httpClients[j]
+ inflight := atomic.LoadInt64(&client.inflight)
+ if inflight < min {
+ leastBusy = client
+ min = inflight
+ }
+ }
+ leastBusy.addInflight(1)
+ return &leastBusy
+}
+
+// Send given document to the endpoint configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
- result := Result{Id: document.Id}
- result.Stats.Requests = 1
+ result := Result{Id: document.Id, Stats: Stats{Requests: 1}}
method, url, err := c.feedURL(document, c.queryParams())
if err != nil {
- result.Stats.Errors = 1
- result.Err = err
- return result
+ return resultWithErr(result, err)
}
req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body))
if err != nil {
- result.Stats.Errors = 1
- result.Status = StatusError
- result.Err = err
- return result
+ return resultWithErr(result, err)
}
- resp, err := c.httpClient.Do(req, 190*time.Second)
+ resp, err := c.leastBusyClient().Do(req, 190*time.Second)
if err != nil {
- result.Stats.Errors = 1
- result.Status = StatusTransportFailure
- result.Err = err
- return result
+ return resultWithErr(result, err)
}
defer resp.Body.Close()
- result.Stats.Responses = 1
- result.Stats.ResponsesByCode = map[int]int64{
- resp.StatusCode: 1,
- }
- result.Stats.BytesSent = int64(len(document.Body))
elapsed := c.now().Sub(start)
- result.Stats.TotalLatency = elapsed
- result.Stats.MinLatency = elapsed
- result.Stats.MaxLatency = elapsed
- return c.resultWithResponse(resp, result)
+ return c.resultWithResponse(resp, result, document, elapsed)
+}
+
+func resultWithErr(result Result, err error) Result {
+ result.Stats.Errors++
+ result.Status = StatusTransportFailure
+ result.Err = err
+ return result
}
-func (c *Client) resultWithResponse(resp *http.Response, result Result) Result {
+func (c *Client) resultWithResponse(resp *http.Response, result Result, document Document, elapsed time.Duration) Result {
+ result.HTTPStatus = resp.StatusCode
+ result.Stats.Responses++
+ result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1}
switch resp.StatusCode {
case 200:
result.Status = StatusSuccess
@@ -165,14 +197,18 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result {
cr := countingReader{reader: resp.Body}
jsonDec := json.NewDecoder(&cr)
if err := jsonDec.Decode(&body); err != nil {
- result.Status = StatusError
+ result.Status = StatusVespaFailure
result.Err = fmt.Errorf("failed to decode json response: %w", err)
}
result.Message = body.Message
result.Trace = string(body.Trace)
+ result.Stats.BytesSent = int64(len(document.Body))
result.Stats.BytesRecv = cr.bytesRead
- if !result.Status.Success() {
- result.Stats.Errors = 1
+ if !result.Success() {
+ result.Stats.Errors++
}
+ result.Stats.TotalLatency = elapsed
+ result.Stats.MinLatency = elapsed
+ result.Stats.MaxLatency = elapsed
return result
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 311668fa16e..43eaf1bfdf9 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -11,6 +11,7 @@ import (
"time"
"github.com/vespa-engine/vespa/client/go/internal/mock"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
)
type manualClock struct {
@@ -25,6 +26,37 @@ func (c *manualClock) now() time.Time {
func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) }
+type mockHTTPClient struct {
+ id int
+ *mock.HTTPClient
+}
+
+func TestLeastBusyClient(t *testing.T) {
+ httpClient := mock.HTTPClient{}
+ var httpClients []util.HTTPClient
+ for i := 0; i < 4; i++ {
+ httpClients = append(httpClients, &mockHTTPClient{i, &httpClient})
+ }
+ client := NewClient(ClientOptions{}, httpClients)
+ client.httpClients[0].addInflight(1)
+ client.httpClients[1].addInflight(1)
+ assertLeastBusy(t, 2, client)
+ assertLeastBusy(t, 2, client)
+ assertLeastBusy(t, 3, client)
+ client.httpClients[3].addInflight(1)
+ client.httpClients[1].addInflight(-1)
+ assertLeastBusy(t, 1, client)
+}
+
+func assertLeastBusy(t *testing.T, id int, client *Client) {
+ t.Helper()
+ leastBusy := client.leastBusyClient()
+ got := leastBusy.client.(*mockHTTPClient).id
+ if got != id {
+ t.Errorf("got client.id=%d, want %d", got, id)
+ }
+}
+
func TestClientSend(t *testing.T) {
docs := []Document{
{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
@@ -35,21 +67,43 @@ func TestClientSend(t *testing.T) {
client := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
- }, &httpClient)
+ }, []util.HTTPClient{&httpClient})
clock := manualClock{t: time.Now(), tick: time.Second}
client.now = clock.now
var stats Stats
for i, doc := range docs {
+ wantRes := Result{
+ Id: doc.Id,
+ Stats: Stats{
+ Requests: 1,
+ Responses: 1,
+ TotalLatency: time.Second,
+ MinLatency: time.Second,
+ MaxLatency: time.Second,
+ BytesSent: 25,
+ },
+ }
if i < 2 {
httpClient.NextResponseString(200, `{"message":"All good!"}`)
+ wantRes.Status = StatusSuccess
+ wantRes.HTTPStatus = 200
+ wantRes.Message = "All good!"
+ wantRes.Stats.ResponsesByCode = map[int]int64{200: 1}
+ wantRes.Stats.BytesRecv = 23
} else {
httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
+ wantRes.Status = StatusVespaFailure
+ wantRes.HTTPStatus = 502
+ wantRes.Message = "Good bye, cruel world!"
+ wantRes.Stats.ResponsesByCode = map[int]int64{502: 1}
+ wantRes.Stats.Errors = 1
+ wantRes.Stats.BytesRecv = 36
}
res := client.Send(doc)
- stats.Add(res.Stats)
- if res.Err != nil {
- t.Fatalf("got unexpected error %q", res.Err)
+ if !reflect.DeepEqual(res, wantRes) {
+ t.Fatalf("got result %+v, want %+v", res, wantRes)
}
+ stats.Add(res.Stats)
r := httpClient.LastRequest
if r.Method != http.MethodPut {
t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut)
@@ -176,7 +230,7 @@ func TestClientFeedURL(t *testing.T) {
httpClient := mock.HTTPClient{}
client := NewClient(ClientOptions{
BaseURL: "https://example.com",
- }, &httpClient)
+ }, []util.HTTPClient{&httpClient})
for i, tt := range tests {
moreParams := url.Values{}
moreParams.Set("foo", "ba/r")
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index 69bb7c8d7ac..5b0aab6174e 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -7,13 +7,7 @@ import (
"time"
)
-const (
- throttlerWeight = 0.7
- // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable
- // for local target
- throttlerMinInflight = 16
- throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side
-)
+const throttlerWeight = 0.7
type Throttler interface {
// Sent notifies the the throttler that a document has been sent.
@@ -27,29 +21,38 @@ type Throttler interface {
}
type dynamicThrottler struct {
- ok int64
+ minInflight int64
+ maxInflight int64
targetInflight int64
targetTimesTen int64
throughputs []float64
+ ok int64
sent int64
start time.Time
now func() time.Time
}
-func newThrottler(nowFunc func() time.Time) *dynamicThrottler {
+func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
+ var (
+ minInflight = 16 * int64(connections)
+ maxInflight = 256 * minInflight // 4096 max streams per connection on the server side
+ )
return &dynamicThrottler{
+ minInflight: minInflight,
+ maxInflight: maxInflight,
+ targetInflight: 8 * minInflight,
+ targetTimesTen: 10 * maxInflight,
+
throughputs: make([]float64, 128),
- start: nowFunc(),
- now: nowFunc,
- targetInflight: 8 * throttlerMinInflight,
- targetTimesTen: 10 * throttlerMaxInflight,
+ start: nowFunc(),
+ now: nowFunc,
}
}
-func NewThrottler() Throttler { return newThrottler(time.Now) }
+func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func (t *dynamicThrottler) Sent() {
currentInflight := atomic.LoadInt64(&t.targetInflight)
@@ -64,7 +67,7 @@ func (t *dynamicThrottler) Sent() {
currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed)
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
- index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256))
+ index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256))
t.throughputs[index] = currentThroughput
// Loop over throughput measurements and pick the one which optimises throughput and latency.
@@ -74,7 +77,7 @@ func (t *dynamicThrottler) Sent() {
if t.throughputs[i] == 0 {
continue // Skip unknown values
}
- inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs)))
+ inflight := float64(t.minInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs)))
objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight)
if objective > maxObjective {
maxObjective = objective
@@ -82,7 +85,7 @@ func (t *dynamicThrottler) Sent() {
}
}
target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase
- atomic.StoreInt64(&t.targetInflight, max(throttlerMinInflight, min(throttlerMaxInflight, target)))
+ atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target)))
}
func (t *dynamicThrottler) Success() {
@@ -91,11 +94,11 @@ func (t *dynamicThrottler) Success() {
}
func (t *dynamicThrottler) Throttled(inflight int64) {
- atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, throttlerMinInflight*10))
+ atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10))
}
func (t *dynamicThrottler) TargetInflight() int64 {
- staticTargetInflight := min(throttlerMaxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
+ staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10)
targetInflight := atomic.LoadInt64(&t.targetInflight)
return min(staticTargetInflight, targetInflight)
}
diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go
index 2fd1e73a45a..a22f059207f 100644
--- a/client/go/internal/vespa/document/throttler_test.go
+++ b/client/go/internal/vespa/document/throttler_test.go
@@ -7,15 +7,15 @@ import (
func TestThrottler(t *testing.T) {
clock := &manualClock{tick: time.Second}
- tr := newThrottler(clock.now)
+ tr := newThrottler(8, clock.now)
for i := 0; i < 100; i++ {
tr.Sent()
}
- if got, want := tr.TargetInflight(), int64(128); got != want {
+ if got, want := tr.TargetInflight(), int64(1024); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
tr.Throttled(5)
- if got, want := tr.TargetInflight(), int64(16); got != want {
+ if got, want := tr.TargetInflight(), int64(128); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}
diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go
index 51861eb12ab..bc936623bcb 100644
--- a/client/go/internal/vespa/target.go
+++ b/client/go/internal/vespa/target.go
@@ -74,7 +74,7 @@ type Target interface {
// TLSOptions configures the client certificate to use for cloud API or service requests.
type TLSOptions struct {
- KeyPair *tls.Certificate
+ KeyPair []tls.Certificate
CertificateFile string
PrivateKeyFile string
AthenzDomain string
@@ -93,7 +93,7 @@ type LogOptions struct {
// Do sends request to this service. Any required authentication happens automatically.
func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Response, error) {
if s.TLSOptions.AthenzDomain != "" && s.TLSOptions.KeyPair != nil {
- accessToken, err := s.zts.AccessToken(s.TLSOptions.AthenzDomain, *s.TLSOptions.KeyPair)
+ accessToken, err := s.zts.AccessToken(s.TLSOptions.AthenzDomain, s.TLSOptions.KeyPair[0])
if err != nil {
return nil, err
}
@@ -105,6 +105,8 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon
return s.httpClient.Do(request, timeout)
}
+func (s *Service) Client() util.HTTPClient { return s.httpClient }
+
// Wait polls the health check of this service until it succeeds or timeout passes.
func (s *Service) Wait(timeout time.Duration) (int, error) {
url := s.BaseURL
@@ -139,18 +141,18 @@ type requestFunc func() *http.Request
// waitForOK queries url and returns its status code. If the url returns a non-200 status code, it is repeatedly queried
// until timeout elapses.
-func waitForOK(client util.HTTPClient, url string, certificate *tls.Certificate, timeout time.Duration) (int, error) {
+func waitForOK(client util.HTTPClient, url string, certificates []tls.Certificate, timeout time.Duration) (int, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return 0, err
}
okFunc := func(status int, response []byte) (bool, error) { return isOK(status), nil }
- return wait(client, okFunc, func() *http.Request { return req }, certificate, timeout)
+ return wait(client, okFunc, func() *http.Request { return req }, certificates, timeout)
}
-func wait(client util.HTTPClient, fn responseFunc, reqFn requestFunc, certificate *tls.Certificate, timeout time.Duration) (int, error) {
- if certificate != nil {
- util.SetCertificate(client, []tls.Certificate{*certificate})
+func wait(client util.HTTPClient, fn responseFunc, reqFn requestFunc, certificates []tls.Certificate, timeout time.Duration) (int, error) {
+ if certificates != nil {
+ util.SetCertificates(client, certificates)
}
var (
httpErr error
diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go
index 2335d4f3432..1fb3edd78c5 100644
--- a/client/go/internal/vespa/target_cloud.go
+++ b/client/go/internal/vespa/target_cloud.go
@@ -161,7 +161,7 @@ func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, c
}
if service.TLSOptions.KeyPair != nil {
- util.SetCertificate(service.httpClient, []tls.Certificate{*service.TLSOptions.KeyPair})
+ util.SetCertificates(service.httpClient, service.TLSOptions.KeyPair)
}
return service, nil
}
@@ -175,7 +175,7 @@ func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error {
return t.addAuth0AccessToken(req)
}
} else {
- if t.apiOptions.TLSOptions.KeyPair.Certificate == nil {
+ if t.apiOptions.TLSOptions.KeyPair == nil {
return fmt.Errorf("system %s requires a certificate for authentication", t.apiOptions.System.Name)
}
return nil