From 0784bd1d2b2c887897b7750281a54ab57cb6badf Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Tue, 4 Apr 2023 14:10:24 +0200 Subject: Always use HTTP/2 when feeding --- client/go/internal/cli/cmd/feed.go | 1 + client/go/internal/util/http.go | 66 +++++++++++++++++++++----------------- client/go/internal/vespa/target.go | 9 ++++++ 3 files changed, 46 insertions(+), 30 deletions(-) (limited to 'client') diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index c8e032929b8..97bee293077 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -56,6 +56,7 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { if err != nil { return err } + service.ForceHTTP2() // Feeding should always use HTTP/2 client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, }, service) diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go index cb35932c8e7..b1646b06a80 100644 --- a/client/go/internal/util/http.go +++ b/client/go/internal/util/http.go @@ -2,9 +2,10 @@ package util import ( - "bytes" + "context" "crypto/tls" "fmt" + "net" "net/http" "time" @@ -36,45 +37,50 @@ func SetCertificate(client HTTPClient, certificates []tls.Certificate) { if !ok { return } - // Use HTTP/2 transport explicitly. Connection reuse does not work properly when using regular http.Transport, even - // though it upgrades to HTTP/2 automatically - // https://github.com/golang/go/issues/16582 - // https://github.com/golang/go/issues/22091 - var transport *http2.Transport - if _, ok := c.client.Transport.(*http.Transport); ok { - transport = &http2.Transport{} - c.client.Transport = transport - } else if t, ok := c.client.Transport.(*http2.Transport); ok { - transport = t - } else { - panic(fmt.Sprintf("unknown transport type: %T", c.client.Transport)) - } - if ok && !c.hasCertificates(transport.TLSClientConfig, certificates) { - transport.TLSClientConfig = &tls.Config{ + var tlsConfig *tls.Config = nil + if certificates != nil { + tlsConfig = &tls.Config{ Certificates: certificates, MinVersion: tls.VersionTLS12, } } + if tr, ok := c.client.Transport.(*http.Transport); ok { + tr.TLSClientConfig = tlsConfig + } else if tr, ok := c.client.Transport.(*http2.Transport); ok { + tr.TLSClientConfig = tlsConfig + } else { + panic(fmt.Sprintf("unknown transport type: %T", c.client.Transport)) + } } -func (c *defaultHTTPClient) hasCertificates(tlsConfig *tls.Config, certs []tls.Certificate) bool { - if tlsConfig == nil { - return false - } - if len(tlsConfig.Certificates) != len(certs) { - return false +func ForceHTTP2(client HTTPClient, certificates []tls.Certificate) { + c, ok := client.(*defaultHTTPClient) + if !ok { + return } - for i := 0; i < len(certs); i++ { - if len(tlsConfig.Certificates[i].Certificate) != len(certs[i].Certificate) { - return false + var tlsConfig *tls.Config = nil + var dialFunc func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) + if certificates != nil { + tlsConfig = &tls.Config{ + Certificates: certificates, + MinVersion: tls.VersionTLS12, } - for j := 0; j < len(certs[i].Certificate); j++ { - if !bytes.Equal(tlsConfig.Certificates[i].Certificate[j], certs[i].Certificate[j]) { - return false - } + } else { + // No certificate, so force H2C (HTTP/2 over clear-text) by using a non-TLS Dialer + dialer := net.Dialer{} + dialFunc = func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) { + return dialer.DialContext(ctx, network, addr) } } - return true + // Use HTTP/2 transport explicitly. Connection reuse does not work properly when using regular http.Transport, even + // though it upgrades to HTTP/2 automatically + // https://github.com/golang/go/issues/16582 + // https://github.com/golang/go/issues/22091 + c.client.Transport = &http2.Transport{ + AllowHTTP: true, + TLSClientConfig: tlsConfig, + DialTLSContext: dialFunc, + } } func CreateClient(timeout time.Duration) HTTPClient { diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 51861eb12ab..1ad36b1d799 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -119,6 +119,15 @@ func (s *Service) Wait(timeout time.Duration) (int, error) { return waitForOK(s.httpClient, url, s.TLSOptions.KeyPair, timeout) } +// ForceHTTP2 forces the underlying HTTP client to use HTTP/2. +func (s *Service) ForceHTTP2() { + var certs []tls.Certificate + if s.TLSOptions.KeyPair != nil { + certs = []tls.Certificate{*s.TLSOptions.KeyPair} + } + util.ForceHTTP2(s.httpClient, certs) +} + func (s *Service) Description() string { switch s.Name { case QueryService: -- cgit v1.2.3 From 20aec66209b46859a99b0fb80ce6c208f77dc9ff Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Tue, 4 Apr 2023 14:17:17 +0200 Subject: Add verbose flag --- client/go/internal/cli/cmd/feed.go | 19 +++++++++------- client/go/internal/vespa/document/dispatcher.go | 25 ++++++++++++++++++---- .../go/internal/vespa/document/dispatcher_test.go | 9 ++++---- client/go/internal/vespa/document/document.go | 21 ++++++++++++++++++ client/go/internal/vespa/document/feeder.go | 5 +++-- client/go/internal/vespa/document/http.go | 2 +- 6 files changed, 62 insertions(+), 19 deletions(-) (limited to 'client') diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 97bee293077..0244004b512 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -12,14 +12,13 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) -func addFeedFlags(cmd *cobra.Command, concurrency *int) { - // TOOD(mpolden): Remove this flag - cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching") +func addFeedFlags(cmd *cobra.Command, verbose *bool) { + cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen") } func newFeedCmd(cli *CLI) *cobra.Command { var ( - concurrency int + verbose bool ) cmd := &cobra.Command{ Use: "feed FILE", @@ -44,14 +43,14 @@ newline (JSONL). return err } defer f.Close() - return feed(f, cli, concurrency) + return feed(f, cli, verbose) }, } - addFeedFlags(cmd, &concurrency) + addFeedFlags(cmd, &verbose) return cmd } -func feed(r io.Reader, cli *CLI, concurrency int) error { +func feed(r io.Reader, cli *CLI, verbose bool) error { service, err := documentService(cli) if err != nil { return err @@ -63,7 +62,11 @@ func feed(r io.Reader, cli *CLI, concurrency int) error { throttler := document.NewThrottler() // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) - dispatcher := document.NewDispatcher(client, throttler, circuitBreaker) + errWriter := io.Discard + if verbose { + errWriter = cli.Stderr + } + dispatcher := document.NewDispatcher(client, throttler, circuitBreaker, errWriter) dec := document.NewDecoder(r) start := cli.now() diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 7011ae7a9b6..9d757aa51aa 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -2,6 +2,7 @@ package document import ( "fmt" + "io" "sync" "sync/atomic" "time" @@ -21,6 +22,7 @@ type Dispatcher struct { results chan Result inflight map[string]*documentGroup inflightCount int64 + errWriter io.Writer mu sync.RWMutex wg sync.WaitGroup @@ -45,12 +47,13 @@ func (g *documentGroup) append(op documentOp) { g.operations = append(g.operations, op) } -func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker) *Dispatcher { +func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher { d := &Dispatcher{ feeder: feeder, throttler: throttler, circuitBreaker: breaker, inflight: make(map[string]*documentGroup), + errWriter: errWriter, } d.start() return d @@ -66,7 +69,7 @@ func (d *Dispatcher) dispatchAll(g *documentGroup) { op.attempts++ result := d.feeder.Send(op.document) d.results <- result - ok = result.Status.Success() + ok = result.Success() if !d.shouldRetry(op, result) { break } @@ -83,12 +86,26 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { return false } if result.HTTPStatus == 429 || result.HTTPStatus == 503 { + fmt.Fprintf(d.errWriter, "feed: %s was throttled with status %d: retrying\n", op.document, result.HTTPStatus) d.throttler.Throttled(atomic.LoadInt64(&d.inflightCount)) return true } - if result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + retry := op.attempts <= maxAttempts + msg := "feed: " + op.document.String() + " failed with " + if result.Err != nil { + msg += "error " + result.Err.Error() + } else { + msg += fmt.Sprintf("status %d", result.HTTPStatus) + } + if retry { + msg += ": retrying" + } else { + msg += fmt.Sprintf(": giving up after %d attempts", maxAttempts) + } + fmt.Fprintln(d.errWriter, msg) d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) - if op.attempts <= maxAttempts { + if retry { return true } } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 8a6d8c6117c..fc96adabc96 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -1,6 +1,7 @@ package document import ( + "io" "sync" "testing" "time" @@ -29,7 +30,7 @@ func (f *mockFeeder) Send(doc Document) Result { } else { f.documents = append(f.documents, doc) } - if !result.Status.Success() { + if !result.Success() { result.Stats.Errors = 1 } return result @@ -40,7 +41,7 @@ func TestDispatcher(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) docs := []Document{ {Id: mustParseId("id:ns:type::doc1"), Operation: OperationPut, Body: []byte(`{"fields":{"foo": "123"}}`)}, {Id: mustParseId("id:ns:type::doc2"), Operation: OperationPut, Body: []byte(`{"fields":{"bar": "456"}}`)}, @@ -73,7 +74,7 @@ func TestDispatcherOrdering(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { dispatcher.Enqueue(d) } @@ -109,7 +110,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { clock := &manualClock{tick: time.Second} throttler := newThrottler(clock.now) breaker := NewCircuitBreaker(time.Second, 0) - dispatcher := NewDispatcher(feeder, throttler, breaker) + dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { dispatcher.Enqueue(d) } diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 98cb2d1b6c6..efb60ad8c0a 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -130,6 +130,27 @@ type Decoder struct { jsonl bool } +func (d Document) String() string { + var sb strings.Builder + switch d.Operation { + case OperationPut: + sb.WriteString("put ") + case OperationUpdate: + sb.WriteString("update ") + case OperationRemove: + sb.WriteString("remove ") + } + sb.WriteString(d.Id.String()) + if d.Condition != "" { + sb.WriteString(", condition=") + sb.WriteString(d.Condition) + } + if d.Create { + sb.WriteString(", create=true") + } + return sb.String() +} + func (d *Decoder) guessMode() error { for !d.array && !d.jsonl { b, err := d.buf.ReadByte() diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 8bdd5bca5ba..4ff612067b7 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -32,8 +32,9 @@ type Result struct { Stats Stats } -// Success returns whether status s is considered a success. -func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet } +func (r Result) Success() bool { + return r.Err == nil && (r.Status == StatusSuccess || r.Status == StatusConditionNotMet) +} // Stats represents feeding operation statistics. type Stats struct { diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 4dadcd1d05c..b1d5c80f29f 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -171,7 +171,7 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { result.Message = body.Message result.Trace = string(body.Trace) result.Stats.BytesRecv = cr.bytesRead - if !result.Status.Success() { + if !result.Success() { result.Stats.Errors = 1 } return result -- cgit v1.2.3 From 65607d6117b72cefa64ec13189e904f34cff871b Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Tue, 4 Apr 2023 16:33:18 +0200 Subject: Use slice of certificates instead of pointer --- client/go/internal/cli/auth/zts/zts.go | 2 +- client/go/internal/cli/cmd/root.go | 4 ++-- client/go/internal/cli/cmd/test.go | 2 +- client/go/internal/util/http.go | 2 +- client/go/internal/vespa/target.go | 22 ++++++++-------------- client/go/internal/vespa/target_cloud.go | 4 ++-- 6 files changed, 15 insertions(+), 21 deletions(-) (limited to 'client') diff --git a/client/go/internal/cli/auth/zts/zts.go b/client/go/internal/cli/auth/zts/zts.go index 1e84912a271..caa2d03367d 100644 --- a/client/go/internal/cli/auth/zts/zts.go +++ b/client/go/internal/cli/auth/zts/zts.go @@ -37,7 +37,7 @@ func (c *Client) AccessToken(domain string, certificate tls.Certificate) (string if err != nil { return "", err } - util.SetCertificate(c.client, []tls.Certificate{certificate}) + util.SetCertificates(c.client, []tls.Certificate{certificate}) response, err := c.client.Do(req, 10*time.Second) if err != nil { return "", err diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go index 58e940d59ef..360af9d0dcf 100644 --- a/client/go/internal/cli/cmd/root.go +++ b/client/go/internal/cli/cmd/root.go @@ -366,7 +366,7 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta return nil, errHint(err, "Deployment to cloud requires a certificate. Try 'vespa auth cert'") } deploymentTLSOptions = vespa.TLSOptions{ - KeyPair: &kp.KeyPair, + KeyPair: []tls.Certificate{kp.KeyPair}, CertificateFile: kp.CertificateFile, PrivateKeyFile: kp.PrivateKeyFile, } @@ -377,7 +377,7 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta return nil, errHint(err, "Deployment to hosted requires an Athenz certificate", "Try renewing certificate with 'athenz-user-cert'") } apiTLSOptions = vespa.TLSOptions{ - KeyPair: &kp.KeyPair, + KeyPair: []tls.Certificate{kp.KeyPair}, CertificateFile: kp.CertificateFile, PrivateKeyFile: kp.PrivateKeyFile, } diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go index 4a53fe6bed3..05633b1135e 100644 --- a/client/go/internal/cli/cmd/test.go +++ b/client/go/internal/cli/cmd/test.go @@ -263,7 +263,7 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin var response *http.Response if externalEndpoint { - util.SetCertificate(context.cli.httpClient, []tls.Certificate{}) + util.SetCertificates(context.cli.httpClient, []tls.Certificate{}) response, err = context.cli.httpClient.Do(request, 60*time.Second) } else { response, err = service.Do(request, 600*time.Second) // Vespa should provide a response within the given request timeout diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go index b1646b06a80..a3de212134d 100644 --- a/client/go/internal/util/http.go +++ b/client/go/internal/util/http.go @@ -32,7 +32,7 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re return c.client.Do(request) } -func SetCertificate(client HTTPClient, certificates []tls.Certificate) { +func SetCertificates(client HTTPClient, certificates []tls.Certificate) { c, ok := client.(*defaultHTTPClient) if !ok { return diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 1ad36b1d799..9eba5c6711f 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -74,7 +74,7 @@ type Target interface { // TLSOptions configures the client certificate to use for cloud API or service requests. type TLSOptions struct { - KeyPair *tls.Certificate + KeyPair []tls.Certificate CertificateFile string PrivateKeyFile string AthenzDomain string @@ -93,7 +93,7 @@ type LogOptions struct { // Do sends request to this service. Any required authentication happens automatically. func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Response, error) { if s.TLSOptions.AthenzDomain != "" && s.TLSOptions.KeyPair != nil { - accessToken, err := s.zts.AccessToken(s.TLSOptions.AthenzDomain, *s.TLSOptions.KeyPair) + accessToken, err := s.zts.AccessToken(s.TLSOptions.AthenzDomain, s.TLSOptions.KeyPair[0]) if err != nil { return nil, err } @@ -120,13 +120,7 @@ func (s *Service) Wait(timeout time.Duration) (int, error) { } // ForceHTTP2 forces the underlying HTTP client to use HTTP/2. -func (s *Service) ForceHTTP2() { - var certs []tls.Certificate - if s.TLSOptions.KeyPair != nil { - certs = []tls.Certificate{*s.TLSOptions.KeyPair} - } - util.ForceHTTP2(s.httpClient, certs) -} +func (s *Service) ForceHTTP2() { util.ForceHTTP2(s.httpClient, s.TLSOptions.KeyPair) } func (s *Service) Description() string { switch s.Name { @@ -148,18 +142,18 @@ type requestFunc func() *http.Request // waitForOK queries url and returns its status code. If the url returns a non-200 status code, it is repeatedly queried // until timeout elapses. -func waitForOK(client util.HTTPClient, url string, certificate *tls.Certificate, timeout time.Duration) (int, error) { +func waitForOK(client util.HTTPClient, url string, certificates []tls.Certificate, timeout time.Duration) (int, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { return 0, err } okFunc := func(status int, response []byte) (bool, error) { return isOK(status), nil } - return wait(client, okFunc, func() *http.Request { return req }, certificate, timeout) + return wait(client, okFunc, func() *http.Request { return req }, certificates, timeout) } -func wait(client util.HTTPClient, fn responseFunc, reqFn requestFunc, certificate *tls.Certificate, timeout time.Duration) (int, error) { - if certificate != nil { - util.SetCertificate(client, []tls.Certificate{*certificate}) +func wait(client util.HTTPClient, fn responseFunc, reqFn requestFunc, certificates []tls.Certificate, timeout time.Duration) (int, error) { + if certificates != nil { + util.SetCertificates(client, certificates) } var ( httpErr error diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go index 2335d4f3432..1fb3edd78c5 100644 --- a/client/go/internal/vespa/target_cloud.go +++ b/client/go/internal/vespa/target_cloud.go @@ -161,7 +161,7 @@ func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, c } if service.TLSOptions.KeyPair != nil { - util.SetCertificate(service.httpClient, []tls.Certificate{*service.TLSOptions.KeyPair}) + util.SetCertificates(service.httpClient, service.TLSOptions.KeyPair) } return service, nil } @@ -175,7 +175,7 @@ func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error { return t.addAuth0AccessToken(req) } } else { - if t.apiOptions.TLSOptions.KeyPair.Certificate == nil { + if t.apiOptions.TLSOptions.KeyPair == nil { return fmt.Errorf("system %s requires a certificate for authentication", t.apiOptions.System.Name) } return nil -- cgit v1.2.3 From 724ca1704c58d9b40da0f0730cefd63557c04215 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Wed, 5 Apr 2023 10:09:40 +0200 Subject: Support multiple connections --- client/go/internal/cli/cmd/feed.go | 30 +++++++++---- client/go/internal/mock/http.go | 4 ++ client/go/internal/util/http.go | 3 ++ client/go/internal/vespa/document/http.go | 61 +++++++++++++++++++++----- client/go/internal/vespa/document/http_test.go | 36 ++++++++++++++- client/go/internal/vespa/target.go | 5 +-- 6 files changed, 115 insertions(+), 24 deletions(-) (limited to 'client') diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 0244004b512..4f43839b4fe 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -9,16 +9,20 @@ import ( "time" "github.com/spf13/cobra" + "github.com/vespa-engine/vespa/client/go/internal/util" + "github.com/vespa-engine/vespa/client/go/internal/vespa" "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) -func addFeedFlags(cmd *cobra.Command, verbose *bool) { +func addFeedFlags(cmd *cobra.Command, verbose *bool, connections *int) { + cmd.PersistentFlags().IntVarP(connections, "connections", "N", 8, "The number of connections to use") cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen") } func newFeedCmd(cli *CLI) *cobra.Command { var ( - verbose bool + verbose bool + connections int ) cmd := &cobra.Command{ Use: "feed FILE", @@ -26,7 +30,7 @@ func newFeedCmd(cli *CLI) *cobra.Command { Long: `Feed documents to a Vespa cluster. A high performance feeding client. This can be used to feed large amounts of -documents to Vespa cluster efficiently. +documents to a Vespa cluster efficiently. The contents of FILE must be either a JSON array or JSON objects separated by newline (JSONL). @@ -43,22 +47,32 @@ newline (JSONL). return err } defer f.Close() - return feed(f, cli, verbose) + return feed(f, cli, verbose, connections) }, } - addFeedFlags(cmd, &verbose) + addFeedFlags(cmd, &verbose, &connections) return cmd } -func feed(r io.Reader, cli *CLI, verbose bool) error { +func createServiceClients(service *vespa.Service, n int) []util.HTTPClient { + clients := make([]util.HTTPClient, 0, n) + for i := 0; i < n; i++ { + client := service.Client().Clone() + util.ForceHTTP2(client, service.TLSOptions.KeyPair) // Feeding should always use HTTP/2 + clients = append(clients, client) + } + return clients +} + +func feed(r io.Reader, cli *CLI, verbose bool, connections int) error { service, err := documentService(cli) if err != nil { return err } - service.ForceHTTP2() // Feeding should always use HTTP/2 + clients := createServiceClients(service, connections) client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, - }, service) + }, clients) throttler := document.NewThrottler() // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go index 9c55f2e79bf..58614d7e5bd 100644 --- a/client/go/internal/mock/http.go +++ b/client/go/internal/mock/http.go @@ -6,6 +6,8 @@ import ( "net/http" "strconv" "time" + + "github.com/vespa-engine/vespa/client/go/internal/util" ) type HTTPClient struct { @@ -58,3 +60,5 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res }, nil } + +func (c *HTTPClient) Clone() util.HTTPClient { return c } diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go index a3de212134d..dcf05ed3a14 100644 --- a/client/go/internal/util/http.go +++ b/client/go/internal/util/http.go @@ -15,6 +15,7 @@ import ( type HTTPClient interface { Do(request *http.Request, timeout time.Duration) (response *http.Response, error error) + Clone() HTTPClient } type defaultHTTPClient struct { @@ -32,6 +33,8 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re return c.client.Do(request) } +func (c *defaultHTTPClient) Clone() HTTPClient { return CreateClient(c.client.Timeout) } + func SetCertificates(client HTTPClient, certificates []tls.Certificate) { c, ok := client.(*defaultHTTPClient) if !ok { diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index b1d5c80f29f..de3e48a8e33 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -5,10 +5,12 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "strconv" "strings" + "sync/atomic" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -16,9 +18,10 @@ import ( // Client represents a HTTP client for the /document/v1/ API. type Client struct { - options ClientOptions - httpClient util.HTTPClient - now func() time.Time + options ClientOptions + httpClients []countingHTTPClient + now func() time.Time + sendCount int32 } // ClientOptions specifices the configuration options of a feed client. @@ -29,6 +32,18 @@ type ClientOptions struct { TraceLevel *int } +type countingHTTPClient struct { + client util.HTTPClient + inflight int64 +} + +func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) } + +func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) { + defer c.addInflight(-1) + return c.client.Do(req, timeout) +} + type countingReader struct { reader io.Reader bytesRead int64 @@ -40,13 +55,19 @@ func (r *countingReader) Read(p []byte) (int, error) { return n, err } -func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client { - c := &Client{ - options: options, - httpClient: httpClient, - now: time.Now, +func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client { + if len(httpClients) < 1 { + panic("need at least one HTTP client") + } + countingClients := make([]countingHTTPClient, 0, len(httpClients)) + for _, client := range httpClients { + countingClients = append(countingClients, countingHTTPClient{client: client}) + } + return &Client{ + options: options, + httpClients: countingClients, + now: time.Now, } - return c } func (c *Client) queryParams() url.Values { @@ -109,7 +130,25 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, return httpMethod, u, nil } -// Send given document the URL configured in this client. +func (c *Client) leastBusyClient() *countingHTTPClient { + leastBusy := c.httpClients[0] + min := int64(math.MaxInt64) + next := atomic.AddInt32(&c.sendCount, 1) + start := int(next) % len(c.httpClients) + for i := range c.httpClients { + j := (i + start) % len(c.httpClients) + client := c.httpClients[j] + inflight := atomic.LoadInt64(&client.inflight) + if inflight < min { + leastBusy = client + min = inflight + } + } + leastBusy.addInflight(1) + return &leastBusy +} + +// Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id} @@ -127,7 +166,7 @@ func (c *Client) Send(document Document) Result { result.Err = err return result } - resp, err := c.httpClient.Do(req, 190*time.Second) + resp, err := c.leastBusyClient().Do(req, 190*time.Second) if err != nil { result.Stats.Errors = 1 result.Status = StatusTransportFailure diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 311668fa16e..7c18111bb5d 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/vespa-engine/vespa/client/go/internal/mock" + "github.com/vespa-engine/vespa/client/go/internal/util" ) type manualClock struct { @@ -25,6 +26,37 @@ func (c *manualClock) now() time.Time { func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) } +type mockHTTPClient struct { + id int + *mock.HTTPClient +} + +func TestLeastBusyClient(t *testing.T) { + httpClient := mock.HTTPClient{} + var httpClients []util.HTTPClient + for i := 0; i < 4; i++ { + httpClients = append(httpClients, &mockHTTPClient{i, &httpClient}) + } + client := NewClient(ClientOptions{}, httpClients) + client.httpClients[0].addInflight(1) + client.httpClients[1].addInflight(1) + assertLeastBusy(t, 2, client) + assertLeastBusy(t, 2, client) + assertLeastBusy(t, 3, client) + client.httpClients[3].addInflight(1) + client.httpClients[1].addInflight(-1) + assertLeastBusy(t, 1, client) +} + +func assertLeastBusy(t *testing.T, id int, client *Client) { + t.Helper() + leastBusy := client.leastBusyClient() + got := leastBusy.client.(*mockHTTPClient).id + if got != id { + t.Errorf("got client.id=%d, want %d", got, id) + } +} + func TestClientSend(t *testing.T) { docs := []Document{ {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)}, @@ -35,7 +67,7 @@ func TestClientSend(t *testing.T) { client := NewClient(ClientOptions{ BaseURL: "https://example.com:1337", Timeout: time.Duration(5 * time.Second), - }, &httpClient) + }, []util.HTTPClient{&httpClient}) clock := manualClock{t: time.Now(), tick: time.Second} client.now = clock.now var stats Stats @@ -176,7 +208,7 @@ func TestClientFeedURL(t *testing.T) { httpClient := mock.HTTPClient{} client := NewClient(ClientOptions{ BaseURL: "https://example.com", - }, &httpClient) + }, []util.HTTPClient{&httpClient}) for i, tt := range tests { moreParams := url.Values{} moreParams.Set("foo", "ba/r") diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 9eba5c6711f..bc936623bcb 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -105,6 +105,8 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon return s.httpClient.Do(request, timeout) } +func (s *Service) Client() util.HTTPClient { return s.httpClient } + // Wait polls the health check of this service until it succeeds or timeout passes. func (s *Service) Wait(timeout time.Duration) (int, error) { url := s.BaseURL @@ -119,9 +121,6 @@ func (s *Service) Wait(timeout time.Duration) (int, error) { return waitForOK(s.httpClient, url, s.TLSOptions.KeyPair, timeout) } -// ForceHTTP2 forces the underlying HTTP client to use HTTP/2. -func (s *Service) ForceHTTP2() { util.ForceHTTP2(s.httpClient, s.TLSOptions.KeyPair) } - func (s *Service) Description() string { switch s.Name { case QueryService: -- cgit v1.2.3 From 071da64bb540ded7f56b83cf68419d885184079b Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Wed, 5 Apr 2023 10:41:18 +0200 Subject: Adjust min inflight based on connection count --- client/go/internal/cli/cmd/feed.go | 2 +- .../go/internal/vespa/document/dispatcher_test.go | 6 ++-- client/go/internal/vespa/document/throttler.go | 41 ++++++++++++---------- .../go/internal/vespa/document/throttler_test.go | 6 ++-- 4 files changed, 29 insertions(+), 26 deletions(-) (limited to 'client') diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 4f43839b4fe..ff8b0bc0c0f 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -73,7 +73,7 @@ func feed(r io.Reader, cli *CLI, verbose bool, connections int) error { client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, }, clients) - throttler := document.NewThrottler() + throttler := document.NewThrottler(connections) // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) errWriter := io.Discard diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index fc96adabc96..80bc5f603ae 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -39,7 +39,7 @@ func (f *mockFeeder) Send(doc Document) Result { func TestDispatcher(t *testing.T) { feeder := &mockFeeder{} clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) + throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) docs := []Document{ @@ -72,7 +72,7 @@ func TestDispatcherOrdering(t *testing.T) { {Id: mustParseId("id:ns:type::doc9"), Operation: OperationPut}, } clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) + throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { @@ -108,7 +108,7 @@ func TestDispatcherOrderingWithFailures(t *testing.T) { } feeder.failAfterN(2) clock := &manualClock{tick: time.Second} - throttler := newThrottler(clock.now) + throttler := newThrottler(8, clock.now) breaker := NewCircuitBreaker(time.Second, 0) dispatcher := NewDispatcher(feeder, throttler, breaker, io.Discard) for _, d := range docs { diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 69bb7c8d7ac..5b0aab6174e 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -7,13 +7,7 @@ import ( "time" ) -const ( - throttlerWeight = 0.7 - // TODO(mpolden): Multiply this by connections per endpoint, and number of endpoints when this becomes configurable - // for local target - throttlerMinInflight = 16 - throttlerMaxInflight = 256 * throttlerMinInflight // 4096 max streams per connection on the server side -) +const throttlerWeight = 0.7 type Throttler interface { // Sent notifies the the throttler that a document has been sent. @@ -27,29 +21,38 @@ type Throttler interface { } type dynamicThrottler struct { - ok int64 + minInflight int64 + maxInflight int64 targetInflight int64 targetTimesTen int64 throughputs []float64 + ok int64 sent int64 start time.Time now func() time.Time } -func newThrottler(nowFunc func() time.Time) *dynamicThrottler { +func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { + var ( + minInflight = 16 * int64(connections) + maxInflight = 256 * minInflight // 4096 max streams per connection on the server side + ) return &dynamicThrottler{ + minInflight: minInflight, + maxInflight: maxInflight, + targetInflight: 8 * minInflight, + targetTimesTen: 10 * maxInflight, + throughputs: make([]float64, 128), - start: nowFunc(), - now: nowFunc, - targetInflight: 8 * throttlerMinInflight, - targetTimesTen: 10 * throttlerMaxInflight, + start: nowFunc(), + now: nowFunc, } } -func NewThrottler() Throttler { return newThrottler(time.Now) } +func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { currentInflight := atomic.LoadInt64(&t.targetInflight) @@ -64,7 +67,7 @@ func (t *dynamicThrottler) Sent() { currentThroughput := float64(atomic.SwapInt64(&t.ok, 0)) / float64(elapsed) // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). - index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/throttlerMinInflight))) / math.Log(256)) + index := int(float64(len(t.throughputs)) * math.Log(max(1, min(255, float64(currentInflight)/float64(t.minInflight)))) / math.Log(256)) t.throughputs[index] = currentThroughput // Loop over throughput measurements and pick the one which optimises throughput and latency. @@ -74,7 +77,7 @@ func (t *dynamicThrottler) Sent() { if t.throughputs[i] == 0 { continue // Skip unknown values } - inflight := float64(throttlerMinInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs))) + inflight := float64(t.minInflight) * math.Pow(256, (float64(i)+0.5)/float64(len(t.throughputs))) objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight) if objective > maxObjective { maxObjective = objective @@ -82,7 +85,7 @@ func (t *dynamicThrottler) Sent() { } } target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase - atomic.StoreInt64(&t.targetInflight, max(throttlerMinInflight, min(throttlerMaxInflight, target))) + atomic.StoreInt64(&t.targetInflight, max(t.minInflight, min(t.maxInflight, target))) } func (t *dynamicThrottler) Success() { @@ -91,11 +94,11 @@ func (t *dynamicThrottler) Success() { } func (t *dynamicThrottler) Throttled(inflight int64) { - atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, throttlerMinInflight*10)) + atomic.StoreInt64(&t.targetTimesTen, max(inflight*5, t.minInflight*10)) } func (t *dynamicThrottler) TargetInflight() int64 { - staticTargetInflight := min(throttlerMaxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) + staticTargetInflight := min(t.maxInflight, atomic.LoadInt64(&t.targetTimesTen)/10) targetInflight := atomic.LoadInt64(&t.targetInflight) return min(staticTargetInflight, targetInflight) } diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go index 2fd1e73a45a..a22f059207f 100644 --- a/client/go/internal/vespa/document/throttler_test.go +++ b/client/go/internal/vespa/document/throttler_test.go @@ -7,15 +7,15 @@ import ( func TestThrottler(t *testing.T) { clock := &manualClock{tick: time.Second} - tr := newThrottler(clock.now) + tr := newThrottler(8, clock.now) for i := 0; i < 100; i++ { tr.Sent() } - if got, want := tr.TargetInflight(), int64(128); got != want { + if got, want := tr.TargetInflight(), int64(1024); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } tr.Throttled(5) - if got, want := tr.TargetInflight(), int64(16); got != want { + if got, want := tr.TargetInflight(), int64(128); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } } -- cgit v1.2.3 From ffc9c7e4f350f58a70f46844f04cb1007f83cafc Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Wed, 5 Apr 2023 12:26:48 +0200 Subject: Simplify stats collecting --- client/go/internal/vespa/document/feeder.go | 2 -- client/go/internal/vespa/document/http.go | 47 ++++++++++++-------------- client/go/internal/vespa/document/http_test.go | 28 +++++++++++++-- 3 files changed, 47 insertions(+), 30 deletions(-) (limited to 'client') diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go index 4ff612067b7..9e6768d0eb4 100644 --- a/client/go/internal/vespa/document/feeder.go +++ b/client/go/internal/vespa/document/feeder.go @@ -17,8 +17,6 @@ const ( // StatusTransportFailure indicates that there was failure in the transport layer error while sending the document // operation to Vespa. StatusTransportFailure - // StatusError is a catch-all status for any other error that might occur. - StatusError ) // Result represents the result of a feeding operation. diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index de3e48a8e33..588330a0574 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -151,42 +151,35 @@ func (c *Client) leastBusyClient() *countingHTTPClient { // Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() - result := Result{Id: document.Id} - result.Stats.Requests = 1 + result := Result{Id: document.Id, Stats: Stats{Requests: 1}} method, url, err := c.feedURL(document, c.queryParams()) if err != nil { - result.Stats.Errors = 1 - result.Err = err - return result + return resultWithErr(result, err) } req, err := http.NewRequest(method, url.String(), bytes.NewReader(document.Body)) if err != nil { - result.Stats.Errors = 1 - result.Status = StatusError - result.Err = err - return result + return resultWithErr(result, err) } resp, err := c.leastBusyClient().Do(req, 190*time.Second) if err != nil { - result.Stats.Errors = 1 - result.Status = StatusTransportFailure - result.Err = err - return result + return resultWithErr(result, err) } defer resp.Body.Close() - result.Stats.Responses = 1 - result.Stats.ResponsesByCode = map[int]int64{ - resp.StatusCode: 1, - } - result.Stats.BytesSent = int64(len(document.Body)) elapsed := c.now().Sub(start) - result.Stats.TotalLatency = elapsed - result.Stats.MinLatency = elapsed - result.Stats.MaxLatency = elapsed - return c.resultWithResponse(resp, result) + return c.resultWithResponse(resp, result, document, elapsed) +} + +func resultWithErr(result Result, err error) Result { + result.Stats.Errors++ + result.Status = StatusTransportFailure + result.Err = err + return result } -func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { +func (c *Client) resultWithResponse(resp *http.Response, result Result, document Document, elapsed time.Duration) Result { + result.HTTPStatus = resp.StatusCode + result.Stats.Responses++ + result.Stats.ResponsesByCode = map[int]int64{resp.StatusCode: 1} switch resp.StatusCode { case 200: result.Status = StatusSuccess @@ -204,14 +197,18 @@ func (c *Client) resultWithResponse(resp *http.Response, result Result) Result { cr := countingReader{reader: resp.Body} jsonDec := json.NewDecoder(&cr) if err := jsonDec.Decode(&body); err != nil { - result.Status = StatusError + result.Status = StatusVespaFailure result.Err = fmt.Errorf("failed to decode json response: %w", err) } result.Message = body.Message result.Trace = string(body.Trace) + result.Stats.BytesSent = int64(len(document.Body)) result.Stats.BytesRecv = cr.bytesRead if !result.Success() { - result.Stats.Errors = 1 + result.Stats.Errors++ } + result.Stats.TotalLatency = elapsed + result.Stats.MinLatency = elapsed + result.Stats.MaxLatency = elapsed return result } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 7c18111bb5d..43eaf1bfdf9 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -72,16 +72,38 @@ func TestClientSend(t *testing.T) { client.now = clock.now var stats Stats for i, doc := range docs { + wantRes := Result{ + Id: doc.Id, + Stats: Stats{ + Requests: 1, + Responses: 1, + TotalLatency: time.Second, + MinLatency: time.Second, + MaxLatency: time.Second, + BytesSent: 25, + }, + } if i < 2 { httpClient.NextResponseString(200, `{"message":"All good!"}`) + wantRes.Status = StatusSuccess + wantRes.HTTPStatus = 200 + wantRes.Message = "All good!" + wantRes.Stats.ResponsesByCode = map[int]int64{200: 1} + wantRes.Stats.BytesRecv = 23 } else { httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) + wantRes.Status = StatusVespaFailure + wantRes.HTTPStatus = 502 + wantRes.Message = "Good bye, cruel world!" + wantRes.Stats.ResponsesByCode = map[int]int64{502: 1} + wantRes.Stats.Errors = 1 + wantRes.Stats.BytesRecv = 36 } res := client.Send(doc) - stats.Add(res.Stats) - if res.Err != nil { - t.Fatalf("got unexpected error %q", res.Err) + if !reflect.DeepEqual(res, wantRes) { + t.Fatalf("got result %+v, want %+v", res, wantRes) } + stats.Add(res.Stats) r := httpClient.LastRequest if r.Method != http.MethodPut { t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut) -- cgit v1.2.3 From a51e87a9b5c9931fdb1895d996661687166bb863 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Wed, 5 Apr 2023 12:27:54 +0200 Subject: Queue retries --- client/go/internal/vespa/document/dispatcher.go | 106 ++++++++++++------------ 1 file changed, 55 insertions(+), 51 deletions(-) (limited to 'client') diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index 9d757aa51aa..838a7bc45ee 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -1,6 +1,7 @@ package document import ( + "container/list" "fmt" "io" "sync" @@ -17,7 +18,7 @@ type Dispatcher struct { circuitBreaker CircuitBreaker stats Stats - closed bool + started bool ready chan Id results chan Result inflight map[string]*documentGroup @@ -29,22 +30,29 @@ type Dispatcher struct { resultWg sync.WaitGroup } -// documentGroup holds document operations which share their ID, and must be dispatched in order. -type documentGroup struct { - id Id - operations []documentOp - mu sync.Mutex -} - +// documentOp represents a document operation and the number of times it has been attempted. type documentOp struct { document Document attempts int } -func (g *documentGroup) append(op documentOp) { +// documentGroup holds document operations which share an ID, and must be dispatched in order. +type documentGroup struct { + ops *list.List + mu sync.Mutex +} + +func (g *documentGroup) add(op documentOp, first bool) { g.mu.Lock() defer g.mu.Unlock() - g.operations = append(g.operations, op) + if g.ops == nil { + g.ops = list.New() + } + if first { + g.ops.PushFront(op) + } else { + g.ops.PushBack(op) + } } func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, errWriter io.Writer) *Dispatcher { @@ -59,24 +67,21 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, e return d } -func (d *Dispatcher) dispatchAll(g *documentGroup) { - g.mu.Lock() - defer g.mu.Unlock() - for i := 0; i < len(g.operations); i++ { - op := g.operations[i] - ok := false - for !ok { - op.attempts++ - result := d.feeder.Send(op.document) - d.results <- result - ok = result.Success() - if !d.shouldRetry(op, result) { - break - } - } - d.releaseSlot() +func (d *Dispatcher) sendDocumentIn(group *documentGroup) { + group.mu.Lock() + defer group.mu.Unlock() + defer d.releaseSlot() + first := group.ops.Front() + if first == nil { + panic("sending from empty document group, this should not happen") + } + op := group.ops.Remove(first).(documentOp) + op.attempts++ + result := d.feeder.Send(op.document) + d.results <- result + if d.shouldRetry(op, result) { + d.enqueue(op) } - g.operations = nil } func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { @@ -115,9 +120,12 @@ func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { func (d *Dispatcher) start() { d.mu.Lock() defer d.mu.Unlock() + if d.started { + return + } d.ready = make(chan Id, 4096) d.results = make(chan Result, 4096) - d.closed = false + d.started = true d.wg.Add(1) go func() { defer d.wg.Done() @@ -135,13 +143,11 @@ func (d *Dispatcher) readDocuments() { d.mu.RLock() group := d.inflight[id.String()] d.mu.RUnlock() - if group != nil { - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.dispatchAll(group) - }() - } + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.sendDocumentIn(group) + }() } } @@ -151,28 +157,22 @@ func (d *Dispatcher) readResults() { } } -func (d *Dispatcher) Enqueue(doc Document) error { +func (d *Dispatcher) enqueue(op documentOp) error { d.mu.Lock() - if d.closed { + if !d.started { return fmt.Errorf("dispatcher is closed") } - group, ok := d.inflight[doc.Id.String()] - if ok { - group.append(documentOp{document: doc}) - } else { - group = &documentGroup{ - id: doc.Id, - operations: []documentOp{{document: doc}}, - } - d.inflight[doc.Id.String()] = group + group, ok := d.inflight[op.document.Id.String()] + if !ok { + group = &documentGroup{} + d.inflight[op.document.Id.String()] = group } d.mu.Unlock() - d.enqueueWithSlot(doc.Id) + group.add(op, op.attempts > 0) + d.enqueueWithSlot(op.document.Id) return nil } -func (d *Dispatcher) Stats() Stats { return d.stats } - func (d *Dispatcher) enqueueWithSlot(id Id) { d.acquireSlot() d.ready <- id @@ -190,16 +190,20 @@ func (d *Dispatcher) releaseSlot() { atomic.AddInt64(&d.inflightCount, -1) } func closeAndWait[T any](ch chan T, wg *sync.WaitGroup, d *Dispatcher, markClosed bool) { d.mu.Lock() - if !d.closed { + if d.started { close(ch) if markClosed { - d.closed = true + d.started = false } } d.mu.Unlock() wg.Wait() } +func (d *Dispatcher) Enqueue(doc Document) error { return d.enqueue(documentOp{document: doc}) } + +func (d *Dispatcher) Stats() Stats { return d.stats } + // Close closes the dispatcher and waits for all inflight operations to complete. func (d *Dispatcher) Close() error { closeAndWait(d.ready, &d.wg, d, false) -- cgit v1.2.3 From 2e716d564ef8b8aff1a443f11759acb23a52641b Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Wed, 5 Apr 2023 16:44:17 +0200 Subject: Support reading documents from stdin --- client/go/internal/cli/cmd/feed.go | 19 ++++++++++++++----- client/go/internal/cli/cmd/feed_test.go | 15 +++++++++++++-- 2 files changed, 27 insertions(+), 7 deletions(-) (limited to 'client') diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index ff8b0bc0c0f..895a22d2be5 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -34,20 +34,29 @@ documents to a Vespa cluster efficiently. The contents of FILE must be either a JSON array or JSON objects separated by newline (JSONL). + +If FILE is a single dash ('-'), documents will be read from standard input. `, Example: `$ vespa feed documents.jsonl +$ cat documents.jsonl | vespa feed - `, Args: cobra.ExactArgs(1), DisableAutoGenTag: true, SilenceUsage: true, Hidden: true, // TODO(mpolden): Remove when ready for public use RunE: func(cmd *cobra.Command, args []string) error { - f, err := os.Open(args[0]) - if err != nil { - return err + var r io.Reader + if args[0] == "-" { + r = cli.Stdin + } else { + f, err := os.Open(args[0]) + if err != nil { + return err + } + defer f.Close() + r = f } - defer f.Close() - return feed(f, cli, verbose, connections) + return feed(r, cli, verbose, connections) }, } addFeedFlags(cmd, &verbose, &connections) diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go index 1bf1ef6ab9b..521d2b2abd0 100644 --- a/client/go/internal/cli/cmd/feed_test.go +++ b/client/go/internal/cli/cmd/feed_test.go @@ -1,6 +1,7 @@ package cmd import ( + "bytes" "os" "path/filepath" "testing" @@ -42,7 +43,7 @@ func TestFeed(t *testing.T) { require.Nil(t, cli.Run("feed", jsonFile)) assert.Equal(t, "", stderr.String()) - assert.Equal(t, `{ + want := `{ "feeder.seconds": 1.000, "feeder.ok.count": 1, "feeder.ok.rate": 1.000, @@ -63,5 +64,15 @@ func TestFeed(t *testing.T) { "200": 1 } } -`, stdout.String()) +` + assert.Equal(t, want, stdout.String()) + + stdout.Reset() + cli.Stdin = bytes.NewBuffer([]byte(`{ + "put": "id:ns:type::doc1", + "fields": {"foo": "123"} +}`)) + httpClient.NextResponseString(200, `{"message":"OK"}`) + require.Nil(t, cli.Run("feed", "-")) + assert.Equal(t, want, stdout.String()) } -- cgit v1.2.3