diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-04-05 10:09:40 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-04-11 10:27:09 +0200 |
commit | 724ca1704c58d9b40da0f0730cefd63557c04215 (patch) | |
tree | 415be39c9a432e155a1a3a2c9cb0b3f41b4bf71f | |
parent | 65607d6117b72cefa64ec13189e904f34cff871b (diff) |
Support multiple connections
-rw-r--r-- | client/go/internal/cli/cmd/feed.go | 30 | ||||
-rw-r--r-- | client/go/internal/mock/http.go | 4 | ||||
-rw-r--r-- | client/go/internal/util/http.go | 3 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http.go | 61 | ||||
-rw-r--r-- | client/go/internal/vespa/document/http_test.go | 36 | ||||
-rw-r--r-- | client/go/internal/vespa/target.go | 5 |
6 files changed, 115 insertions, 24 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 0244004b512..4f43839b4fe 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -9,16 +9,20 @@ import ( "time" "github.com/spf13/cobra" + "github.com/vespa-engine/vespa/client/go/internal/util" + "github.com/vespa-engine/vespa/client/go/internal/vespa" "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) -func addFeedFlags(cmd *cobra.Command, verbose *bool) { +func addFeedFlags(cmd *cobra.Command, verbose *bool, connections *int) { + cmd.PersistentFlags().IntVarP(connections, "connections", "N", 8, "The number of connections to use") cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen") } func newFeedCmd(cli *CLI) *cobra.Command { var ( - verbose bool + verbose bool + connections int ) cmd := &cobra.Command{ Use: "feed FILE", @@ -26,7 +30,7 @@ func newFeedCmd(cli *CLI) *cobra.Command { Long: `Feed documents to a Vespa cluster. A high performance feeding client. This can be used to feed large amounts of -documents to Vespa cluster efficiently. +documents to a Vespa cluster efficiently. The contents of FILE must be either a JSON array or JSON objects separated by newline (JSONL). @@ -43,22 +47,32 @@ newline (JSONL). return err } defer f.Close() - return feed(f, cli, verbose) + return feed(f, cli, verbose, connections) }, } - addFeedFlags(cmd, &verbose) + addFeedFlags(cmd, &verbose, &connections) return cmd } -func feed(r io.Reader, cli *CLI, verbose bool) error { +func createServiceClients(service *vespa.Service, n int) []util.HTTPClient { + clients := make([]util.HTTPClient, 0, n) + for i := 0; i < n; i++ { + client := service.Client().Clone() + util.ForceHTTP2(client, service.TLSOptions.KeyPair) // Feeding should always use HTTP/2 + clients = append(clients, client) + } + return clients +} + +func feed(r io.Reader, cli *CLI, verbose bool, connections int) error { service, err := documentService(cli) if err != nil { return err } - service.ForceHTTP2() // Feeding should always use HTTP/2 + clients := createServiceClients(service, connections) client := document.NewClient(document.ClientOptions{ BaseURL: service.BaseURL, - }, service) + }, clients) throttler := document.NewThrottler() // TODO(mpolden): Make doom duration configurable circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0) diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go index 9c55f2e79bf..58614d7e5bd 100644 --- a/client/go/internal/mock/http.go +++ b/client/go/internal/mock/http.go @@ -6,6 +6,8 @@ import ( "net/http" "strconv" "time" + + "github.com/vespa-engine/vespa/client/go/internal/util" ) type HTTPClient struct { @@ -58,3 +60,5 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res }, nil } + +func (c *HTTPClient) Clone() util.HTTPClient { return c } diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go index a3de212134d..dcf05ed3a14 100644 --- a/client/go/internal/util/http.go +++ b/client/go/internal/util/http.go @@ -15,6 +15,7 @@ import ( type HTTPClient interface { Do(request *http.Request, timeout time.Duration) (response *http.Response, error error) + Clone() HTTPClient } type defaultHTTPClient struct { @@ -32,6 +33,8 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re return c.client.Do(request) } +func (c *defaultHTTPClient) Clone() HTTPClient { return CreateClient(c.client.Timeout) } + func SetCertificates(client HTTPClient, certificates []tls.Certificate) { c, ok := client.(*defaultHTTPClient) if !ok { diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index b1d5c80f29f..de3e48a8e33 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -5,10 +5,12 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "strconv" "strings" + "sync/atomic" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -16,9 +18,10 @@ import ( // Client represents a HTTP client for the /document/v1/ API. type Client struct { - options ClientOptions - httpClient util.HTTPClient - now func() time.Time + options ClientOptions + httpClients []countingHTTPClient + now func() time.Time + sendCount int32 } // ClientOptions specifices the configuration options of a feed client. @@ -29,6 +32,18 @@ type ClientOptions struct { TraceLevel *int } +type countingHTTPClient struct { + client util.HTTPClient + inflight int64 +} + +func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) } + +func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) { + defer c.addInflight(-1) + return c.client.Do(req, timeout) +} + type countingReader struct { reader io.Reader bytesRead int64 @@ -40,13 +55,19 @@ func (r *countingReader) Read(p []byte) (int, error) { return n, err } -func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client { - c := &Client{ - options: options, - httpClient: httpClient, - now: time.Now, +func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client { + if len(httpClients) < 1 { + panic("need at least one HTTP client") + } + countingClients := make([]countingHTTPClient, 0, len(httpClients)) + for _, client := range httpClients { + countingClients = append(countingClients, countingHTTPClient{client: client}) + } + return &Client{ + options: options, + httpClients: countingClients, + now: time.Now, } - return c } func (c *Client) queryParams() url.Values { @@ -109,7 +130,25 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, return httpMethod, u, nil } -// Send given document the URL configured in this client. +func (c *Client) leastBusyClient() *countingHTTPClient { + leastBusy := c.httpClients[0] + min := int64(math.MaxInt64) + next := atomic.AddInt32(&c.sendCount, 1) + start := int(next) % len(c.httpClients) + for i := range c.httpClients { + j := (i + start) % len(c.httpClients) + client := c.httpClients[j] + inflight := atomic.LoadInt64(&client.inflight) + if inflight < min { + leastBusy = client + min = inflight + } + } + leastBusy.addInflight(1) + return &leastBusy +} + +// Send given document to the endpoint configured in this client. func (c *Client) Send(document Document) Result { start := c.now() result := Result{Id: document.Id} @@ -127,7 +166,7 @@ func (c *Client) Send(document Document) Result { result.Err = err return result } - resp, err := c.httpClient.Do(req, 190*time.Second) + resp, err := c.leastBusyClient().Do(req, 190*time.Second) if err != nil { result.Stats.Errors = 1 result.Status = StatusTransportFailure diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index 311668fa16e..7c18111bb5d 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/vespa-engine/vespa/client/go/internal/mock" + "github.com/vespa-engine/vespa/client/go/internal/util" ) type manualClock struct { @@ -25,6 +26,37 @@ func (c *manualClock) now() time.Time { func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) } +type mockHTTPClient struct { + id int + *mock.HTTPClient +} + +func TestLeastBusyClient(t *testing.T) { + httpClient := mock.HTTPClient{} + var httpClients []util.HTTPClient + for i := 0; i < 4; i++ { + httpClients = append(httpClients, &mockHTTPClient{i, &httpClient}) + } + client := NewClient(ClientOptions{}, httpClients) + client.httpClients[0].addInflight(1) + client.httpClients[1].addInflight(1) + assertLeastBusy(t, 2, client) + assertLeastBusy(t, 2, client) + assertLeastBusy(t, 3, client) + client.httpClients[3].addInflight(1) + client.httpClients[1].addInflight(-1) + assertLeastBusy(t, 1, client) +} + +func assertLeastBusy(t *testing.T, id int, client *Client) { + t.Helper() + leastBusy := client.leastBusyClient() + got := leastBusy.client.(*mockHTTPClient).id + if got != id { + t.Errorf("got client.id=%d, want %d", got, id) + } +} + func TestClientSend(t *testing.T) { docs := []Document{ {Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)}, @@ -35,7 +67,7 @@ func TestClientSend(t *testing.T) { client := NewClient(ClientOptions{ BaseURL: "https://example.com:1337", Timeout: time.Duration(5 * time.Second), - }, &httpClient) + }, []util.HTTPClient{&httpClient}) clock := manualClock{t: time.Now(), tick: time.Second} client.now = clock.now var stats Stats @@ -176,7 +208,7 @@ func TestClientFeedURL(t *testing.T) { httpClient := mock.HTTPClient{} client := NewClient(ClientOptions{ BaseURL: "https://example.com", - }, &httpClient) + }, []util.HTTPClient{&httpClient}) for i, tt := range tests { moreParams := url.Values{} moreParams.Set("foo", "ba/r") diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 9eba5c6711f..bc936623bcb 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -105,6 +105,8 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon return s.httpClient.Do(request, timeout) } +func (s *Service) Client() util.HTTPClient { return s.httpClient } + // Wait polls the health check of this service until it succeeds or timeout passes. func (s *Service) Wait(timeout time.Duration) (int, error) { url := s.BaseURL @@ -119,9 +121,6 @@ func (s *Service) Wait(timeout time.Duration) (int, error) { return waitForOK(s.httpClient, url, s.TLSOptions.KeyPair, timeout) } -// ForceHTTP2 forces the underlying HTTP client to use HTTP/2. -func (s *Service) ForceHTTP2() { util.ForceHTTP2(s.httpClient, s.TLSOptions.KeyPair) } - func (s *Service) Description() string { switch s.Name { case QueryService: |