summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2023-04-05 10:09:40 +0200
committerMartin Polden <mpolden@mpolden.no>2023-04-11 10:27:09 +0200
commit724ca1704c58d9b40da0f0730cefd63557c04215 (patch)
tree415be39c9a432e155a1a3a2c9cb0b3f41b4bf71f
parent65607d6117b72cefa64ec13189e904f34cff871b (diff)
Support multiple connections
-rw-r--r--client/go/internal/cli/cmd/feed.go30
-rw-r--r--client/go/internal/mock/http.go4
-rw-r--r--client/go/internal/util/http.go3
-rw-r--r--client/go/internal/vespa/document/http.go61
-rw-r--r--client/go/internal/vespa/document/http_test.go36
-rw-r--r--client/go/internal/vespa/target.go5
6 files changed, 115 insertions, 24 deletions
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 0244004b512..4f43839b4fe 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -9,16 +9,20 @@ import (
"time"
"github.com/spf13/cobra"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa"
"github.com/vespa-engine/vespa/client/go/internal/vespa/document"
)
-func addFeedFlags(cmd *cobra.Command, verbose *bool) {
+func addFeedFlags(cmd *cobra.Command, verbose *bool, connections *int) {
+ cmd.PersistentFlags().IntVarP(connections, "connections", "N", 8, "The number of connections to use")
cmd.PersistentFlags().BoolVarP(verbose, "verbose", "v", false, "Verbose mode. Print errors as they happen")
}
func newFeedCmd(cli *CLI) *cobra.Command {
var (
- verbose bool
+ verbose bool
+ connections int
)
cmd := &cobra.Command{
Use: "feed FILE",
@@ -26,7 +30,7 @@ func newFeedCmd(cli *CLI) *cobra.Command {
Long: `Feed documents to a Vespa cluster.
A high performance feeding client. This can be used to feed large amounts of
-documents to Vespa cluster efficiently.
+documents to a Vespa cluster efficiently.
The contents of FILE must be either a JSON array or JSON objects separated by
newline (JSONL).
@@ -43,22 +47,32 @@ newline (JSONL).
return err
}
defer f.Close()
- return feed(f, cli, verbose)
+ return feed(f, cli, verbose, connections)
},
}
- addFeedFlags(cmd, &verbose)
+ addFeedFlags(cmd, &verbose, &connections)
return cmd
}
-func feed(r io.Reader, cli *CLI, verbose bool) error {
+func createServiceClients(service *vespa.Service, n int) []util.HTTPClient {
+ clients := make([]util.HTTPClient, 0, n)
+ for i := 0; i < n; i++ {
+ client := service.Client().Clone()
+ util.ForceHTTP2(client, service.TLSOptions.KeyPair) // Feeding should always use HTTP/2
+ clients = append(clients, client)
+ }
+ return clients
+}
+
+func feed(r io.Reader, cli *CLI, verbose bool, connections int) error {
service, err := documentService(cli)
if err != nil {
return err
}
- service.ForceHTTP2() // Feeding should always use HTTP/2
+ clients := createServiceClients(service, connections)
client := document.NewClient(document.ClientOptions{
BaseURL: service.BaseURL,
- }, service)
+ }, clients)
throttler := document.NewThrottler()
// TODO(mpolden): Make doom duration configurable
circuitBreaker := document.NewCircuitBreaker(10*time.Second, 0)
diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go
index 9c55f2e79bf..58614d7e5bd 100644
--- a/client/go/internal/mock/http.go
+++ b/client/go/internal/mock/http.go
@@ -6,6 +6,8 @@ import (
"net/http"
"strconv"
"time"
+
+ "github.com/vespa-engine/vespa/client/go/internal/util"
)
type HTTPClient struct {
@@ -58,3 +60,5 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res
},
nil
}
+
+func (c *HTTPClient) Clone() util.HTTPClient { return c }
diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go
index a3de212134d..dcf05ed3a14 100644
--- a/client/go/internal/util/http.go
+++ b/client/go/internal/util/http.go
@@ -15,6 +15,7 @@ import (
type HTTPClient interface {
Do(request *http.Request, timeout time.Duration) (response *http.Response, error error)
+ Clone() HTTPClient
}
type defaultHTTPClient struct {
@@ -32,6 +33,8 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re
return c.client.Do(request)
}
+func (c *defaultHTTPClient) Clone() HTTPClient { return CreateClient(c.client.Timeout) }
+
func SetCertificates(client HTTPClient, certificates []tls.Certificate) {
c, ok := client.(*defaultHTTPClient)
if !ok {
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index b1d5c80f29f..de3e48a8e33 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -5,10 +5,12 @@ import (
"encoding/json"
"fmt"
"io"
+ "math"
"net/http"
"net/url"
"strconv"
"strings"
+ "sync/atomic"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -16,9 +18,10 @@ import (
// Client represents a HTTP client for the /document/v1/ API.
type Client struct {
- options ClientOptions
- httpClient util.HTTPClient
- now func() time.Time
+ options ClientOptions
+ httpClients []countingHTTPClient
+ now func() time.Time
+ sendCount int32
}
// ClientOptions specifices the configuration options of a feed client.
@@ -29,6 +32,18 @@ type ClientOptions struct {
TraceLevel *int
}
+type countingHTTPClient struct {
+ client util.HTTPClient
+ inflight int64
+}
+
+func (c *countingHTTPClient) addInflight(n int64) { atomic.AddInt64(&c.inflight, n) }
+
+func (c *countingHTTPClient) Do(req *http.Request, timeout time.Duration) (*http.Response, error) {
+ defer c.addInflight(-1)
+ return c.client.Do(req, timeout)
+}
+
type countingReader struct {
reader io.Reader
bytesRead int64
@@ -40,13 +55,19 @@ func (r *countingReader) Read(p []byte) (int, error) {
return n, err
}
-func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client {
- c := &Client{
- options: options,
- httpClient: httpClient,
- now: time.Now,
+func NewClient(options ClientOptions, httpClients []util.HTTPClient) *Client {
+ if len(httpClients) < 1 {
+ panic("need at least one HTTP client")
+ }
+ countingClients := make([]countingHTTPClient, 0, len(httpClients))
+ for _, client := range httpClients {
+ countingClients = append(countingClients, countingHTTPClient{client: client})
+ }
+ return &Client{
+ options: options,
+ httpClients: countingClients,
+ now: time.Now,
}
- return c
}
func (c *Client) queryParams() url.Values {
@@ -109,7 +130,25 @@ func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL,
return httpMethod, u, nil
}
-// Send given document the URL configured in this client.
+func (c *Client) leastBusyClient() *countingHTTPClient {
+ leastBusy := c.httpClients[0]
+ min := int64(math.MaxInt64)
+ next := atomic.AddInt32(&c.sendCount, 1)
+ start := int(next) % len(c.httpClients)
+ for i := range c.httpClients {
+ j := (i + start) % len(c.httpClients)
+ client := c.httpClients[j]
+ inflight := atomic.LoadInt64(&client.inflight)
+ if inflight < min {
+ leastBusy = client
+ min = inflight
+ }
+ }
+ leastBusy.addInflight(1)
+ return &leastBusy
+}
+
+// Send given document to the endpoint configured in this client.
func (c *Client) Send(document Document) Result {
start := c.now()
result := Result{Id: document.Id}
@@ -127,7 +166,7 @@ func (c *Client) Send(document Document) Result {
result.Err = err
return result
}
- resp, err := c.httpClient.Do(req, 190*time.Second)
+ resp, err := c.leastBusyClient().Do(req, 190*time.Second)
if err != nil {
result.Stats.Errors = 1
result.Status = StatusTransportFailure
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index 311668fa16e..7c18111bb5d 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -11,6 +11,7 @@ import (
"time"
"github.com/vespa-engine/vespa/client/go/internal/mock"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
)
type manualClock struct {
@@ -25,6 +26,37 @@ func (c *manualClock) now() time.Time {
func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) }
+type mockHTTPClient struct {
+ id int
+ *mock.HTTPClient
+}
+
+func TestLeastBusyClient(t *testing.T) {
+ httpClient := mock.HTTPClient{}
+ var httpClients []util.HTTPClient
+ for i := 0; i < 4; i++ {
+ httpClients = append(httpClients, &mockHTTPClient{i, &httpClient})
+ }
+ client := NewClient(ClientOptions{}, httpClients)
+ client.httpClients[0].addInflight(1)
+ client.httpClients[1].addInflight(1)
+ assertLeastBusy(t, 2, client)
+ assertLeastBusy(t, 2, client)
+ assertLeastBusy(t, 3, client)
+ client.httpClients[3].addInflight(1)
+ client.httpClients[1].addInflight(-1)
+ assertLeastBusy(t, 1, client)
+}
+
+func assertLeastBusy(t *testing.T, id int, client *Client) {
+ t.Helper()
+ leastBusy := client.leastBusyClient()
+ got := leastBusy.client.(*mockHTTPClient).id
+ if got != id {
+ t.Errorf("got client.id=%d, want %d", got, id)
+ }
+}
+
func TestClientSend(t *testing.T) {
docs := []Document{
{Create: true, Id: mustParseId("id:ns:type::doc1"), Operation: OperationUpdate, Body: []byte(`{"fields":{"foo": "123"}}`)},
@@ -35,7 +67,7 @@ func TestClientSend(t *testing.T) {
client := NewClient(ClientOptions{
BaseURL: "https://example.com:1337",
Timeout: time.Duration(5 * time.Second),
- }, &httpClient)
+ }, []util.HTTPClient{&httpClient})
clock := manualClock{t: time.Now(), tick: time.Second}
client.now = clock.now
var stats Stats
@@ -176,7 +208,7 @@ func TestClientFeedURL(t *testing.T) {
httpClient := mock.HTTPClient{}
client := NewClient(ClientOptions{
BaseURL: "https://example.com",
- }, &httpClient)
+ }, []util.HTTPClient{&httpClient})
for i, tt := range tests {
moreParams := url.Values{}
moreParams.Set("foo", "ba/r")
diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go
index 9eba5c6711f..bc936623bcb 100644
--- a/client/go/internal/vespa/target.go
+++ b/client/go/internal/vespa/target.go
@@ -105,6 +105,8 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon
return s.httpClient.Do(request, timeout)
}
+func (s *Service) Client() util.HTTPClient { return s.httpClient }
+
// Wait polls the health check of this service until it succeeds or timeout passes.
func (s *Service) Wait(timeout time.Duration) (int, error) {
url := s.BaseURL
@@ -119,9 +121,6 @@ func (s *Service) Wait(timeout time.Duration) (int, error) {
return waitForOK(s.httpClient, url, s.TLSOptions.KeyPair, timeout)
}
-// ForceHTTP2 forces the underlying HTTP client to use HTTP/2.
-func (s *Service) ForceHTTP2() { util.ForceHTTP2(s.httpClient, s.TLSOptions.KeyPair) }
-
func (s *Service) Description() string {
switch s.Name {
case QueryService: