diff options
Diffstat (limited to 'client')
37 files changed, 1424 insertions, 130 deletions
diff --git a/client/go/Makefile b/client/go/Makefile index b8ce953eff0..c70154eb247 100644 --- a/client/go/Makefile +++ b/client/go/Makefile @@ -13,7 +13,7 @@ BIN ?= $(PREFIX)/bin SHARE ?= $(PREFIX)/share DIST ?= $(CURDIR)/dist -GO_FLAGS := -ldflags "-X github.com/vespa-engine/vespa/client/go/internal/cli/build.Version=$(VERSION)" +GO_FLAGS := -ldflags "-X github.com/vespa-engine/vespa/client/go/internal/build.Version=$(VERSION)" GIT_ROOT := $(shell git rev-parse --show-toplevel) DIST_TARGETS := dist-mac dist-mac-arm64 dist-linux dist-linux-arm64 dist-win32 dist-win64 diff --git a/client/go/go.sum b/client/go/go.sum index 084bde701ce..3252517a08a 100644 --- a/client/go/go.sum +++ b/client/go/go.sum @@ -4,10 +4,13 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKY github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/danieljoos/wincred v1.1.0 h1:3RNcEpBg4IhIChZdFRSdlQt1QjCp1sMAPIrOnm7Yf8g= github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7hqDjlFjiygg= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= @@ -19,8 +22,11 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA= github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2 h1:acNfDZXmm28D2Yg/c3ALnZStzNaZMSagpbr96vY6Zjc= @@ -33,17 +39,26 @@ github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/zalando/go-keyring v0.1.1 h1:w2V9lcx/Uj4l+dzAf1m9s+DJ1O8ROkEHnynonHjTcYE= github.com/zalando/go-keyring v0.1.1/go.mod h1:OIC+OZ28XbmwFxU/Rp9V7eKzZjamBJwRzC8UFJH9+L8= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/client/go/internal/admin/clusterstate/get_cluster_state.go b/client/go/internal/admin/clusterstate/get_cluster_state.go index 505235a284e..d5d2b894fe4 100644 --- a/client/go/internal/admin/clusterstate/get_cluster_state.go +++ b/client/go/internal/admin/clusterstate/get_cluster_state.go @@ -13,7 +13,7 @@ import ( "github.com/spf13/cobra" "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" ) func NewGetClusterStateCmd() *cobra.Command { diff --git a/client/go/internal/admin/clusterstate/get_node_state.go b/client/go/internal/admin/clusterstate/get_node_state.go index 6d45e377a72..2308446e0fc 100644 --- a/client/go/internal/admin/clusterstate/get_node_state.go +++ b/client/go/internal/admin/clusterstate/get_node_state.go @@ -14,7 +14,7 @@ import ( "github.com/spf13/cobra" "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" ) const ( diff --git a/client/go/internal/admin/clusterstate/set_node_state.go b/client/go/internal/admin/clusterstate/set_node_state.go index 2a6869c84f5..6c1542b230c 100644 --- a/client/go/internal/admin/clusterstate/set_node_state.go +++ b/client/go/internal/admin/clusterstate/set_node_state.go @@ -15,7 +15,7 @@ import ( "github.com/spf13/cobra" "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" "github.com/vespa-engine/vespa/client/go/internal/util" ) diff --git a/client/go/internal/admin/deploy/cmd.go b/client/go/internal/admin/deploy/cmd.go index c4489d11771..9ae7cb894a6 100644 --- a/client/go/internal/admin/deploy/cmd.go +++ b/client/go/internal/admin/deploy/cmd.go @@ -10,7 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/vespa-engine/vespa/client/go/internal/admin/trace" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/vespa" ) diff --git a/client/go/internal/admin/jvm/container.go b/client/go/internal/admin/jvm/container.go index d7bdeae43b6..32110c5e1bc 100644 --- a/client/go/internal/admin/jvm/container.go +++ b/client/go/internal/admin/jvm/container.go @@ -68,8 +68,8 @@ func (cb *containerBase) Exec() { if cb.propsFile != "" { writeEnvAsProperties(p.EffectiveEnv(), cb.propsFile) } - trace.Info("starting container; env:", readableEnv(p.Env)) - trace.Info("starting container; exec:", argv) + trace.Info("JVM env:", readableEnv(p.Env)) + trace.Info("JVM exec:", argv) err := p.Run() util.JustExitWith(err) } diff --git a/client/go/internal/admin/trace/log.go b/client/go/internal/admin/trace/log.go index 0254d0e4f4a..7b7fe6d29ec 100644 --- a/client/go/internal/admin/trace/log.go +++ b/client/go/internal/admin/trace/log.go @@ -13,6 +13,12 @@ import ( "github.com/vespa-engine/vespa/client/go/internal/admin/envvars" ) +func getComponent() string { + s := os.Args[0] + parts := strings.Split(s, "/") + return parts[len(parts)-1] +} + // make a vespa-format log line func logMessage(l outputLevel, msg string) { @@ -24,7 +30,7 @@ func logMessage(l outputLevel, msg string) { if service == "" { service = "-" } - component := "stderr" + component := getComponent() level := "error" switch l { case levelWarning: diff --git a/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go b/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go index a8675c37356..4406457ac23 100644 --- a/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go +++ b/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go @@ -6,7 +6,7 @@ package logfmt import ( "github.com/spf13/cobra" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" ) func NewLogfmtCmd() *cobra.Command { diff --git a/client/go/internal/admin/vespa-wrapper/services/configproxy.go b/client/go/internal/admin/vespa-wrapper/services/configproxy.go index 9807d8d4591..0ff3e865c0a 100644 --- a/client/go/internal/admin/vespa-wrapper/services/configproxy.go +++ b/client/go/internal/admin/vespa-wrapper/services/configproxy.go @@ -25,6 +25,7 @@ const ( ) func JustRunConfigproxy() int { + os.Setenv(envvars.VESPA_SERVICE_NAME, PROXY_SERVICE_NAME) commonPreChecks() vespa.CheckCorrectUser() configsources := defaults.VespaConfigserverRpcAddrs() diff --git a/client/go/internal/cli/build/build.go b/client/go/internal/build/build.go index a8342a9fb1e..a8342a9fb1e 100644 --- a/client/go/internal/cli/build/build.go +++ b/client/go/internal/build/build.go diff --git a/client/go/internal/cli/auth/auth0/auth0.go b/client/go/internal/cli/auth/auth0/auth0.go index 36dc0b8871c..5f7612d4d2e 100644 --- a/client/go/internal/cli/auth/auth0/auth0.go +++ b/client/go/internal/cli/auth/auth0/auth0.go @@ -33,12 +33,16 @@ type Credentials struct { type Client struct { httpClient util.HTTPClient Authenticator *auth.Authenticator // TODO: Make this private - configPath string - systemName string - systemURL string + options Options provider auth0Provider } +type Options struct { + ConfigPath string + SystemName string + SystemURL string +} + // config is the root type of the persisted config type config struct { Version int `json:"version"` @@ -74,12 +78,12 @@ func cancelOnInterrupt() context.Context { return ctx } -func newClient(httpClient util.HTTPClient, configPath, systemName, systemURL string) (*Client, error) { +// NewClient constructs a new Auth0 client, storing configuration in the given configPath. The client will be configured for +// use in the given Vespa system. +func NewClient(httpClient util.HTTPClient, options Options) (*Client, error) { a := Client{} a.httpClient = httpClient - a.configPath = configPath - a.systemName = systemName - a.systemURL = systemURL + a.options = options c, err := a.getDeviceFlowConfig() if err != nil { return nil, err @@ -90,7 +94,7 @@ func newClient(httpClient util.HTTPClient, configPath, systemName, systemURL str DeviceCodeEndpoint: c.DeviceCodeEndpoint, OauthTokenEndpoint: c.OauthTokenEndpoint, } - provider, err := readConfig(configPath) + provider, err := readConfig(options.ConfigPath) if err != nil { return nil, err } @@ -98,14 +102,8 @@ func newClient(httpClient util.HTTPClient, configPath, systemName, systemURL str return &a, nil } -// New constructs a new Auth0 client, storing configuration in the given configPath. The client will be configured for -// use in the given Vespa system. -func New(configPath string, systemName, systemURL string) (*Client, error) { - return newClient(util.CreateClient(time.Second*30), configPath, systemName, systemURL) -} - func (a *Client) getDeviceFlowConfig() (flowConfig, error) { - url := a.systemURL + "/auth0/v1/device-flow-config" + url := a.options.SystemURL + "/auth0/v1/device-flow-config" req, err := http.NewRequest("GET", url, nil) if err != nil { return flowConfig{}, err @@ -125,11 +123,11 @@ func (a *Client) getDeviceFlowConfig() (flowConfig, error) { return cfg, nil } -// GetAccessToken returns an access token for the configured system, refreshing it if necessary. -func (a *Client) GetAccessToken() (string, error) { - creds, ok := a.provider.Systems[a.systemName] +// AccessToken returns an access token for the configured system, refreshing it if necessary. +func (a *Client) AccessToken() (string, error) { + creds, ok := a.provider.Systems[a.options.SystemName] if !ok { - return "", fmt.Errorf("system %s is not configured", a.systemName) + return "", fmt.Errorf("system %s is not configured", a.options.SystemName) } else if creds.AccessToken == "" { return "", fmt.Errorf("access token missing: %s", reauthMessage) } else if scopesChanged(creds) { @@ -142,7 +140,7 @@ func (a *Client) GetAccessToken() (string, error) { Secrets: &auth.Keyring{}, Client: http.DefaultClient, } - resp, err := tr.Refresh(cancelOnInterrupt(), a.systemName) + resp, err := tr.Refresh(cancelOnInterrupt(), a.options.SystemName) if err != nil { return "", fmt.Errorf("failed to renew access token: %w: %s", err, reauthMessage) } else { @@ -177,7 +175,7 @@ func scopesChanged(s Credentials) bool { // HasCredentials returns true if this client has retrived credentials for the configured system. func (a *Client) HasCredentials() bool { - _, ok := a.provider.Systems[a.systemName] + _, ok := a.provider.Systems[a.options.SystemName] return ok } @@ -186,8 +184,8 @@ func (a *Client) WriteCredentials(credentials Credentials) error { if a.provider.Systems == nil { a.provider.Systems = make(map[string]Credentials) } - a.provider.Systems[a.systemName] = credentials - if err := writeConfig(a.provider, a.configPath); err != nil { + a.provider.Systems[a.options.SystemName] = credentials + if err := writeConfig(a.provider, a.options.ConfigPath); err != nil { return fmt.Errorf("failed to write config: %w", err) } return nil @@ -196,11 +194,11 @@ func (a *Client) WriteCredentials(credentials Credentials) error { // RemoveCredentials removes credentials for the system configured in this client. func (a *Client) RemoveCredentials() error { tr := &auth.TokenRetriever{Secrets: &auth.Keyring{}} - if err := tr.Delete(a.systemName); err != nil { - return fmt.Errorf("failed to remove system %s from secret storage: %w", a.systemName, err) + if err := tr.Delete(a.options.SystemName); err != nil { + return fmt.Errorf("failed to remove system %s from secret storage: %w", a.options.SystemName, err) } - delete(a.provider.Systems, a.systemName) - if err := writeConfig(a.provider, a.configPath); err != nil { + delete(a.provider.Systems, a.options.SystemName) + if err := writeConfig(a.provider, a.options.ConfigPath); err != nil { return fmt.Errorf("failed to write config: %w", err) } return nil diff --git a/client/go/internal/cli/auth/auth0/auth0_test.go b/client/go/internal/cli/auth/auth0/auth0_test.go index 39393bbdfc1..b4bd34eb6d6 100644 --- a/client/go/internal/cli/auth/auth0/auth0_test.go +++ b/client/go/internal/cli/auth/auth0/auth0_test.go @@ -21,7 +21,7 @@ func TestConfigWriting(t *testing.T) { "oauth-token-endpoint": "https://example.com/oauth/token" }` httpClient.NextResponseString(200, flowConfigResponse) - client, err := newClient(&httpClient, configPath, "public", "http://example.com") + client, err := NewClient(&httpClient, Options{ConfigPath: configPath, SystemName: "public", SystemURL: "http://example.com"}) require.Nil(t, err) assert.Equal(t, "https://example.com/api/v2/", client.Authenticator.Audience) assert.Equal(t, "some-id", client.Authenticator.ClientID) @@ -56,7 +56,7 @@ func TestConfigWriting(t *testing.T) { // Switch to another system httpClient.NextResponseString(200, flowConfigResponse) - client, err = newClient(&httpClient, configPath, "publiccd", "http://example.com") + client, err = NewClient(&httpClient, Options{ConfigPath: configPath, SystemName: "publiccd", SystemURL: "http://example.com"}) require.Nil(t, err) creds2 := Credentials{ AccessToken: "another-token", diff --git a/client/go/internal/cli/auth/zts/zts.go b/client/go/internal/cli/auth/zts/zts.go index 9fabe219209..1e84912a271 100644 --- a/client/go/internal/cli/auth/zts/zts.go +++ b/client/go/internal/cli/auth/zts/zts.go @@ -21,7 +21,7 @@ type Client struct { } // NewClient creates a new client for an Athenz ZTS service located at serviceURL. -func NewClient(serviceURL string, client util.HTTPClient) (*Client, error) { +func NewClient(client util.HTTPClient, serviceURL string) (*Client, error) { tokenURL, err := url.Parse(serviceURL) if err != nil { return nil, err @@ -37,7 +37,7 @@ func (c *Client) AccessToken(domain string, certificate tls.Certificate) (string if err != nil { return "", err } - c.client.UseCertificate([]tls.Certificate{certificate}) + util.SetCertificate(c.client, []tls.Certificate{certificate}) response, err := c.client.Do(req, 10*time.Second) if err != nil { return "", err diff --git a/client/go/internal/cli/auth/zts/zts_test.go b/client/go/internal/cli/auth/zts/zts_test.go index 6c6ced9bb33..d0cc7ea9f9d 100644 --- a/client/go/internal/cli/auth/zts/zts_test.go +++ b/client/go/internal/cli/auth/zts/zts_test.go @@ -9,7 +9,7 @@ import ( func TestAccessToken(t *testing.T) { httpClient := mock.HTTPClient{} - client, err := NewClient("http://example.com", &httpClient) + client, err := NewClient(&httpClient, "http://example.com") if err != nil { t.Fatal(err) } diff --git a/client/go/internal/cli/cmd/config.go b/client/go/internal/cli/cmd/config.go index e5124962831..fd049864096 100644 --- a/client/go/internal/cli/cmd/config.go +++ b/client/go/internal/cli/cmd/config.go @@ -490,7 +490,7 @@ func (c *Config) readAPIKey(cli *CLI, system vespa.System, tenantName string) ([ return nil, nil // Vespa Cloud CI only talks to data plane and does not have an API key } if !cli.isCI() { - client, err := auth0.New(c.authConfigPath(), system.Name, system.URL) + client, err := cli.auth0Factory(cli.httpClient, auth0.Options{ConfigPath: c.authConfigPath(), SystemName: system.Name, SystemURL: system.URL}) if err == nil && client.HasCredentials() { return nil, nil // use Auth0 } diff --git a/client/go/internal/cli/cmd/config_test.go b/client/go/internal/cli/cmd/config_test.go index c76277119a0..612904061de 100644 --- a/client/go/internal/cli/cmd/config_test.go +++ b/client/go/internal/cli/cmd/config_test.go @@ -8,7 +8,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0" "github.com/vespa-engine/vespa/client/go/internal/mock" + "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/vespa" ) @@ -184,24 +186,10 @@ func TestReadAPIKey(t *testing.T) { assert.Equal(t, []byte("baz"), key) // Auth0 is preferred when configured - authContent := ` -{ - "version": 1, - "providers": { - "auth0": { - "version": 1, - "systems": { - "public": { - "access_token": "...", - "scopes": ["openid", "offline_access"], - "expires_at": "2030-01-01T01:01:01.000001+01:00" - } - } - } - } -}` cli, _, _ = newTestCLI(t) - require.Nil(t, os.WriteFile(filepath.Join(cli.config.homeDir, "auth.json"), []byte(authContent), 0600)) + cli.auth0Factory = func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) { + return &mockAuth0{hasCredentials: true}, nil + } key, err = cli.config.readAPIKey(cli, vespa.PublicSystem, "t1") require.Nil(t, err) assert.Nil(t, key) diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go new file mode 100644 index 00000000000..621676d0353 --- /dev/null +++ b/client/go/internal/cli/cmd/feed.go @@ -0,0 +1,143 @@ +package cmd + +import ( + "encoding/json" + "fmt" + "io" + "math" + "os" + "time" + + "github.com/spf13/cobra" + "github.com/vespa-engine/vespa/client/go/internal/vespa/document" +) + +func addFeedFlags(cmd *cobra.Command, maxConnections *int, concurrency *int) { + cmd.PersistentFlags().IntVarP(maxConnections, "max-connections", "N", 8, "Maximum number of HTTP connections to use") + cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching") +} + +func newFeedCmd(cli *CLI) *cobra.Command { + var ( + maxConnections int + concurrency int + ) + cmd := &cobra.Command{ + Use: "feed FILE", + Short: "Feed documents to a Vespa cluster", + Long: `Feed documents to a Vespa cluster. + +A high performance feeding client. This can be used to feed large amounts of +documents to Vespa cluster efficiently. + +The contents of FILE must be either a JSON array or JSON objects separated by +newline (JSONL). +`, + Example: `$ vespa feed documents.jsonl +`, + Args: cobra.ExactArgs(1), + DisableAutoGenTag: true, + SilenceUsage: true, + Hidden: true, // TODO(mpolden): Remove when ready for public use + RunE: func(cmd *cobra.Command, args []string) error { + f, err := os.Open(args[0]) + if err != nil { + return err + } + defer f.Close() + return feed(f, cli, maxConnections, concurrency) + }, + } + addFeedFlags(cmd, &maxConnections, &concurrency) + return cmd +} + +func feed(r io.Reader, cli *CLI, maxConnections, concurrency int) error { + service, err := documentService(cli) + if err != nil { + return err + } + client := document.NewClient(document.ClientOptions{ + BaseURL: service.BaseURL, + MaxConnsPerHost: maxConnections, + }, service) + dispatcher := document.NewDispatcher(client, concurrency) + dec := document.NewDecoder(r) + + start := cli.now() + for { + doc, err := dec.Decode() + if err == io.EOF { + break + } + if err != nil { + cli.printErr(fmt.Errorf("failed to decode document: %w", err)) + } + if err := dispatcher.Enqueue(doc); err != nil { + cli.printErr(err) + } + } + if err := dispatcher.Close(); err != nil { + return err + } + elapsed := cli.now().Sub(start) + return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed) +} + +type number float32 + +func (n number) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%.3f", n)), nil } + +type feedSummary struct { + Seconds number `json:"feeder.seconds"` + SuccessCount int64 `json:"feeder.ok.count"` + SuccessRate number `json:"feeder.ok.rate"` + ErrorCount int64 `json:"feeder.error.count"` + InflightCount int64 `json:"feeder.inflight.count"` + + RequestCount int64 `json:"http.request.count"` + RequestBytes int64 `json:"http.request.bytes"` + RequestRate number `json:"http.request.MBps"` + ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatability with vespa-feed-client + + ResponseCount int64 `json:"http.response.count"` + ResponseBytes int64 `json:"http.response.bytes"` + ResponseRate number `json:"http.response.MBps"` + ResponseErrorCount int64 `json:"http.response.error.count"` + + ResponseMinLatency int64 `json:"http.response.latency.millis.min"` + ResponseAvgLatency int64 `json:"http.response.latency.millis.avg"` + ResponseMaxLatency int64 `json:"http.response.latency.millis.max"` + ResponseCodeCounts map[int]int64 `json:"http.response.code.counts"` +} + +func mbps(bytes int64, duration time.Duration) float64 { + return (float64(bytes) / 1000 / 1000) / math.Max(1, duration.Seconds()) +} + +func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error { + summary := feedSummary{ + Seconds: number(duration.Seconds()), + SuccessCount: stats.Successes(), + SuccessRate: number(float64(stats.Successes()) / math.Max(1, duration.Seconds())), + ErrorCount: stats.Errors, + InflightCount: stats.Inflight, + + RequestCount: stats.Requests, + RequestBytes: stats.BytesSent, + RequestRate: number(mbps(stats.BytesSent, duration)), + ExceptionCount: stats.Errors, + + ResponseCount: stats.Responses, + ResponseBytes: stats.BytesRecv, + ResponseRate: number(mbps(stats.BytesRecv, duration)), + ResponseErrorCount: stats.Responses - stats.Successes(), + ResponseMinLatency: stats.MinLatency.Milliseconds(), + ResponseAvgLatency: stats.AvgLatency().Milliseconds(), + ResponseMaxLatency: stats.MaxLatency.Milliseconds(), + ResponseCodeCounts: stats.ResponsesByCode, + } + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + return enc.Encode(summary) +} diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go new file mode 100644 index 00000000000..1bf1ef6ab9b --- /dev/null +++ b/client/go/internal/cli/cmd/feed_test.go @@ -0,0 +1,67 @@ +package cmd + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vespa-engine/vespa/client/go/internal/mock" +) + +type manualClock struct { + t time.Time + tick time.Duration +} + +func (c *manualClock) now() time.Time { + t := c.t + c.t = c.t.Add(c.tick) + return t +} + +func TestFeed(t *testing.T) { + httpClient := &mock.HTTPClient{} + clock := &manualClock{tick: time.Second} + cli, stdout, stderr := newTestCLI(t) + cli.httpClient = httpClient + cli.now = clock.now + + td := t.TempDir() + jsonFile := filepath.Join(td, "docs.jsonl") + err := os.WriteFile(jsonFile, []byte(`{ + "put": "id:ns:type::doc1", + "fields": {"foo": "123"} +}`), 0644) + + require.Nil(t, err) + + httpClient.NextResponseString(200, `{"message":"OK"}`) + require.Nil(t, cli.Run("feed", jsonFile)) + + assert.Equal(t, "", stderr.String()) + assert.Equal(t, `{ + "feeder.seconds": 1.000, + "feeder.ok.count": 1, + "feeder.ok.rate": 1.000, + "feeder.error.count": 0, + "feeder.inflight.count": 0, + "http.request.count": 1, + "http.request.bytes": 25, + "http.request.MBps": 0.000, + "http.exception.count": 0, + "http.response.count": 1, + "http.response.bytes": 16, + "http.response.MBps": 0.000, + "http.response.error.count": 0, + "http.response.latency.millis.min": 0, + "http.response.latency.millis.avg": 0, + "http.response.latency.millis.max": 0, + "http.response.code.counts": { + "200": 1 + } +} +`, stdout.String()) +} diff --git a/client/go/internal/cli/cmd/login.go b/client/go/internal/cli/cmd/login.go index aa6a18b3b38..9ac2262e78d 100644 --- a/client/go/internal/cli/cmd/login.go +++ b/client/go/internal/cli/cmd/login.go @@ -35,7 +35,7 @@ func newLoginCmd(cli *CLI) *cobra.Command { if err != nil { return err } - a, err := auth0.New(cli.config.authConfigPath(), system.Name, system.URL) + a, err := auth0.NewClient(cli.httpClient, auth0.Options{ConfigPath: cli.config.authConfigPath(), SystemName: system.Name, SystemURL: system.URL}) if err != nil { return err } @@ -68,16 +68,14 @@ func newLoginCmd(cli *CLI) *cobra.Command { return fmt.Errorf("login error: %w", err) } - log.Print("\n") - log.Println("Successfully logged in.") - log.Print("\n") + cli.printSuccess("Logged in") // store the refresh token secretsStore := &auth.Keyring{} err = secretsStore.Set(auth.SecretsNamespace, system.Name, res.RefreshToken) if err != nil { // log the error but move on - log.Println("Could not store the refresh token locally, please expect to login again once your access token expired.") + cli.printWarning("Could not store the refresh token locally. You may need to login again once your access token expires") } creds := auth0.Credentials{ diff --git a/client/go/internal/cli/cmd/logout.go b/client/go/internal/cli/cmd/logout.go index f2ee6e87ac7..32e7cd9783b 100644 --- a/client/go/internal/cli/cmd/logout.go +++ b/client/go/internal/cli/cmd/logout.go @@ -1,8 +1,6 @@ package cmd import ( - "log" - "github.com/spf13/cobra" "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0" ) @@ -24,17 +22,14 @@ func newLogoutCmd(cli *CLI) *cobra.Command { if err != nil { return err } - a, err := auth0.New(cli.config.authConfigPath(), system.Name, system.URL) + a, err := auth0.NewClient(cli.httpClient, auth0.Options{ConfigPath: cli.config.authConfigPath(), SystemName: system.Name, SystemURL: system.URL}) if err != nil { return err } if err := a.RemoveCredentials(); err != nil { return err } - - log.Print("\n") - log.Println("Successfully logged out.") - log.Print("\n") + cli.printSuccess("Logged out") return nil }, } diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go index 70e0afbcd32..5edfd1136e5 100644 --- a/client/go/internal/cli/cmd/root.go +++ b/client/go/internal/cli/cmd/root.go @@ -2,6 +2,7 @@ package cmd import ( + "crypto/tls" "encoding/json" "fmt" "io" @@ -16,7 +17,9 @@ import ( "github.com/mattn/go-isatty" "github.com/spf13/cobra" "github.com/spf13/pflag" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" + "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0" + "github.com/vespa-engine/vespa/client/go/internal/cli/auth/zts" "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/version" "github.com/vespa-engine/vespa/client/go/internal/vespa" @@ -45,10 +48,13 @@ type CLI struct { config *Config version version.Version - httpClient util.HTTPClient - exec executor - isTerminal func() bool - spinner func(w io.Writer, message string, fn func() error) error + httpClient util.HTTPClient + auth0Factory auth0Factory + ztsFactory ztsFactory + exec executor + isTerminal func() bool + spinner func(w io.Writer, message string, fn func() error) error + now func() time.Time } // ErrCLI is an error returned to the user. It wraps an exit status, a regular error and optional hints for resolving @@ -82,6 +88,19 @@ func (c *execSubprocess) Run(name string, args ...string) ([]byte, error) { return exec.Command(name, args...).Output() } +type ztsClient interface { + AccessToken(domain string, certficiate tls.Certificate) (string, error) +} + +type auth0Client interface { + AccessToken() (string, error) + HasCredentials() bool +} + +type auth0Factory func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) + +type ztsFactory func(httpClient util.HTTPClient, url string) (ztsClient, error) + // New creates the Vespa CLI, writing output to stdout and stderr, and reading environment variables from environment. func New(stdout, stderr io.Writer, environment []string) (*CLI, error) { cmd := &cobra.Command{ @@ -123,6 +142,13 @@ For detailed description of flags and configuration, see 'vespa help config'. cmd: cmd, httpClient: util.CreateClient(time.Second * 10), exec: &execSubprocess{}, + now: time.Now, + auth0Factory: func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) { + return auth0.NewClient(httpClient, options) + }, + ztsFactory: func(httpClient util.HTTPClient, url string) (ztsClient, error) { + return zts.NewClient(httpClient, url) + }, } cli.isTerminal = func() bool { return isTerminal(cli.Stdout) && isTerminal(cli.Stderr) } if err := cli.loadConfig(); err != nil { @@ -251,6 +277,7 @@ func (c *CLI) configureCommands() { rootCmd.AddCommand(newTestCmd(c)) // test rootCmd.AddCommand(newVersionCmd(c)) // version rootCmd.AddCommand(newVisitCmd(c)) // visit + rootCmd.AddCommand(newFeedCmd(c)) // feed } func (c *CLI) printErr(err error, hints ...string) { @@ -359,10 +386,9 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta return nil, fmt.Errorf("invalid cloud target: %s", targetType) } apiOptions := vespa.APIOptions{ - System: system, - TLSOptions: apiTLSOptions, - APIKey: apiKey, - AuthConfigPath: authConfigPath, + System: system, + TLSOptions: apiTLSOptions, + APIKey: apiKey, } deploymentOptions := vespa.CloudDeploymentOptions{ Deployment: deployment, @@ -377,7 +403,15 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta Writer: c.Stdout, Level: vespa.LogLevel(logLevel), } - return vespa.CloudTarget(c.httpClient, apiOptions, deploymentOptions, logOptions) + auth0, err := c.auth0Factory(c.httpClient, auth0.Options{ConfigPath: authConfigPath, SystemName: apiOptions.System.Name, SystemURL: apiOptions.System.URL}) + if err != nil { + return nil, err + } + zts, err := c.ztsFactory(c.httpClient, zts.DefaultURL) + if err != nil { + return nil, err + } + return vespa.CloudTarget(c.httpClient, zts, auth0, apiOptions, deploymentOptions, logOptions) } // system returns the appropiate system for the target configured in this CLI. diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go index d071f9556a2..4a53fe6bed3 100644 --- a/client/go/internal/cli/cmd/test.go +++ b/client/go/internal/cli/cmd/test.go @@ -263,7 +263,7 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin var response *http.Response if externalEndpoint { - context.cli.httpClient.UseCertificate([]tls.Certificate{}) + util.SetCertificate(context.cli.httpClient, []tls.Certificate{}) response, err = context.cli.httpClient.Do(request, 60*time.Second) } else { response, err = service.Do(request, 600*time.Second) // Vespa should provide a response within the given request timeout diff --git a/client/go/internal/cli/cmd/testutil_test.go b/client/go/internal/cli/cmd/testutil_test.go index 6eade6edd86..61f8dab2264 100644 --- a/client/go/internal/cli/cmd/testutil_test.go +++ b/client/go/internal/cli/cmd/testutil_test.go @@ -3,10 +3,13 @@ package cmd import ( "bytes" + "crypto/tls" "path/filepath" "testing" + "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0" "github.com/vespa-engine/vespa/client/go/internal/mock" + "github.com/vespa-engine/vespa/client/go/internal/util" ) func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Buffer) { @@ -23,7 +26,24 @@ func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Bu if err != nil { t.Fatal(err) } - cli.httpClient = &mock.HTTPClient{} + httpClient := &mock.HTTPClient{} + cli.httpClient = httpClient cli.exec = &mock.Exec{} + cli.auth0Factory = func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) { + return &mockAuth0{}, nil + } + cli.ztsFactory = func(httpClient util.HTTPClient, url string) (ztsClient, error) { + return &mockZTS{}, nil + } return cli, &stdout, &stderr } + +type mockZTS struct{} + +func (z *mockZTS) AccessToken(domain string, cert tls.Certificate) (string, error) { return "", nil } + +type mockAuth0 struct{ hasCredentials bool } + +func (a *mockAuth0) AccessToken() (string, error) { return "", nil } + +func (a *mockAuth0) HasCredentials() bool { return a.hasCredentials } diff --git a/client/go/internal/cli/cmd/version.go b/client/go/internal/cli/cmd/version.go index 864c0668eda..aaf2c42cded 100644 --- a/client/go/internal/cli/cmd/version.go +++ b/client/go/internal/cli/cmd/version.go @@ -14,7 +14,7 @@ import ( "github.com/fatih/color" "github.com/spf13/cobra" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" "github.com/vespa-engine/vespa/client/go/internal/version" ) diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go index 84718e846c1..d1fb4f28327 100644 --- a/client/go/internal/mock/http.go +++ b/client/go/internal/mock/http.go @@ -2,7 +2,6 @@ package mock import ( "bytes" - "crypto/tls" "io" "net/http" "strconv" @@ -60,4 +59,4 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res nil } -func (c *HTTPClient) UseCertificate(certificates []tls.Certificate) {} +func (c *HTTPClient) Transport() *http.Transport { return &http.Transport{} } diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go index 53e3c4b36c2..b18f9a00c6a 100644 --- a/client/go/internal/util/http.go +++ b/client/go/internal/util/http.go @@ -7,16 +7,17 @@ import ( "net/http" "time" - "github.com/vespa-engine/vespa/client/go/internal/cli/build" + "github.com/vespa-engine/vespa/client/go/internal/build" ) type HTTPClient interface { Do(request *http.Request, timeout time.Duration) (response *http.Response, error error) - UseCertificate(certificate []tls.Certificate) + Transport() *http.Transport } type defaultHTTPClient struct { - client *http.Client + client *http.Client + transport *http.Transport } func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (response *http.Response, error error) { @@ -30,13 +31,24 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re return c.client.Do(request) } -func (c *defaultHTTPClient) UseCertificate(certificates []tls.Certificate) { - c.client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ +func (c *defaultHTTPClient) Transport() *http.Transport { return c.transport } + +func SetCertificate(client HTTPClient, certificates []tls.Certificate) { + client.Transport().TLSClientConfig = &tls.Config{ Certificates: certificates, MinVersion: tls.VersionTLS12, - }} + } } func CreateClient(timeout time.Duration) HTTPClient { - return &defaultHTTPClient{client: &http.Client{Timeout: timeout}} + transport := http.Transport{ + ForceAttemptHTTP2: true, + } + return &defaultHTTPClient{ + client: &http.Client{ + Timeout: timeout, + Transport: &transport, + }, + transport: &transport, + } } diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go new file mode 100644 index 00000000000..fa15a8a1223 --- /dev/null +++ b/client/go/internal/vespa/document/dispatcher.go @@ -0,0 +1,63 @@ +package document + +import ( + "fmt" + "sync" +) + +// Dispatcher dispatches documents from a queue to a Feeder. +type Dispatcher struct { + concurrencyLevel int + feeder Feeder + pending chan Document + closed bool + mu sync.RWMutex + wg sync.WaitGroup +} + +func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher { + if concurrencyLevel < 1 { + concurrencyLevel = 1 + } + d := &Dispatcher{ + concurrencyLevel: concurrencyLevel, + feeder: feeder, + pending: make(chan Document, 4*concurrencyLevel), + } + d.readPending() + return d +} + +func (d *Dispatcher) readPending() { + for i := 0; i < d.concurrencyLevel; i++ { + d.wg.Add(1) + go func(n int) { + defer d.wg.Done() + for doc := range d.pending { + d.feeder.Send(doc) + } + }(i) + } +} + +func (d *Dispatcher) Enqueue(doc Document) error { + d.mu.RLock() + defer d.mu.RUnlock() + if d.closed { + return fmt.Errorf("cannot enqueue document in closed dispatcher") + } + d.pending <- doc + return nil +} + +// Close closes the dispatcher and waits for all goroutines to return. +func (d *Dispatcher) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + if !d.closed { + d.closed = true + close(d.pending) + d.wg.Wait() + } + return nil +} diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go new file mode 100644 index 00000000000..0e4876a7d4b --- /dev/null +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -0,0 +1,37 @@ +package document + +import ( + "encoding/json" + "sync" + "testing" +) + +type mockFeeder struct { + documents []Document + mu sync.Mutex +} + +func (f *mockFeeder) Send(doc Document) Result { + f.mu.Lock() + defer f.mu.Unlock() + f.documents = append(f.documents, doc) + return Result{Id: doc.Id} +} + +func (f *mockFeeder) Stats() Stats { return Stats{} } + +func TestDispatcher(t *testing.T) { + feeder := &mockFeeder{} + dispatcher := NewDispatcher(feeder, 2) + docs := []Document{ + {PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}, + {PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}, + } + for _, d := range docs { + dispatcher.Enqueue(d) + } + dispatcher.Close() + if got, want := len(feeder.documents), 2; got != want { + t.Errorf("got %d documents, want %d", got, want) + } +} diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go new file mode 100644 index 00000000000..6aeafd80005 --- /dev/null +++ b/client/go/internal/vespa/document/document.go @@ -0,0 +1,241 @@ +package document + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "strconv" + "strings" +) + +var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} + +type Operation int + +const ( + OperationPut = iota + OperationUpdate + OperationRemove +) + +// Id represents a Vespa document ID. +type Id struct { + Type string + Namespace string + Number *int64 + Group string + UserSpecific string +} + +func (d Id) Equal(o Id) bool { + return d.Type == o.Type && + d.Namespace == o.Namespace && + ((d.Number == nil && o.Number == nil) || *d.Number == *o.Number) && + d.Group == o.Group && + d.UserSpecific == o.UserSpecific +} + +func (d Id) String() string { + var sb strings.Builder + sb.WriteString("id:") + sb.WriteString(d.Namespace) + sb.WriteString(":") + sb.WriteString(d.Type) + sb.WriteString(":") + if d.Number != nil { + sb.WriteString("n=") + sb.WriteString(strconv.FormatInt(*d.Number, 10)) + } else if d.Group != "" { + sb.WriteString("g=") + sb.WriteString(d.Group) + } + sb.WriteString(":") + sb.WriteString(d.UserSpecific) + return sb.String() +} + +// ParseId parses a serialized document ID string. +func ParseId(serialized string) (Id, error) { + parts := strings.SplitN(serialized, ":", 4) + if len(parts) < 4 || parts[0] != "id" { + return Id{}, parseError(serialized) + } + namespace := parts[1] + if namespace == "" { + return Id{}, parseError(serialized) + } + docType := parts[2] + if docType == "" { + return Id{}, parseError(serialized) + } + rest := strings.SplitN(parts[3], ":", 2) + if len(rest) < 2 { + return Id{}, parseError(serialized) + } + options := rest[0] + userSpecific := rest[1] + var number *int64 + group := "" + if strings.HasPrefix(options, "n=") { + n, err := strconv.ParseInt(options[2:], 10, 64) + if err != nil { + return Id{}, parseError(serialized) + } + number = &n + } else if strings.HasPrefix(options, "g=") { + group = options[2:] + if len(group) == 0 { + return Id{}, parseError(serialized) + } + } else if options != "" { + return Id{}, parseError(serialized) + } + if userSpecific == "" { + return Id{}, parseError(serialized) + } + return Id{ + Namespace: namespace, + Type: docType, + Number: number, + Group: group, + UserSpecific: userSpecific, + }, nil +} + +// Document represents a Vespa document, and its operation. +type Document struct { + Id Id + Operation Operation + + IdString string `json:"id"` + PutId string `json:"put"` + UpdateId string `json:"update"` + RemoveId string `json:"remove"` + Condition string `json:"condition"` + Create bool `json:"create"` + Fields json.RawMessage `json:"fields"` +} + +// Body returns the body part of this document, suitable for sending to the /document/v1 API. +func (d Document) Body() []byte { + jsonObject := `{"fields":` + body := make([]byte, 0, len(jsonObject)+len(d.Fields)+1) + body = append(body, []byte(jsonObject)...) + body = append(body, d.Fields...) + body = append(body, byte('}')) + return body +} + +// Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline. +type Decoder struct { + buf *bufio.Reader + dec *json.Decoder + array bool + jsonl bool +} + +func (d *Decoder) guessMode() error { + for !d.array && !d.jsonl { + b, err := d.buf.ReadByte() + if err != nil { + return err + } + // Skip leading whitespace + if b < 0x80 && asciiSpace[b] != 0 { + continue + } + switch rune(b) { + case '{': + d.jsonl = true + case '[': + d.array = true + default: + return fmt.Errorf("unexpected token: %q", string(b)) + } + if err := d.buf.UnreadByte(); err != nil { + return err + } + if d.array { + // prepare for decoding objects inside array + if _, err := d.dec.Token(); err != nil { + return err + } + } + } + return nil +} + +func (d *Decoder) readCloseToken() error { + if !d.array { + return nil + } + _, err := d.dec.Token() + return err +} + +func (d *Decoder) Decode() (Document, error) { + doc, err := d.decode() + if err != nil && err != io.EOF { + return Document{}, fmt.Errorf("invalid json at byte offset %d: %w", d.dec.InputOffset(), err) + } + return doc, err +} + +func (d *Decoder) decode() (Document, error) { + if err := d.guessMode(); err != nil { + return Document{}, err + } + if !d.dec.More() { + err := io.EOF + if tokenErr := d.readCloseToken(); tokenErr != nil { + err = tokenErr + } + return Document{}, err + } + doc := Document{} + if err := d.dec.Decode(&doc); err != nil { + return Document{}, err + } + if err := parseDocument(&doc); err != nil { + return Document{}, err + } + return doc, nil +} + +func NewDecoder(r io.Reader) *Decoder { + buf := bufio.NewReader(r) + return &Decoder{ + buf: buf, + dec: json.NewDecoder(buf), + } +} + +func parseDocument(d *Document) error { + id := "" + if d.IdString != "" { + d.Operation = OperationPut + id = d.IdString + } else if d.PutId != "" { + d.Operation = OperationPut + id = d.PutId + } else if d.UpdateId != "" { + d.Operation = OperationUpdate + id = d.UpdateId + } else if d.RemoveId != "" { + d.Operation = OperationRemove + id = d.RemoveId + } else { + return fmt.Errorf("invalid document: missing operation: %v", d) + } + docId, err := ParseId(id) + if err != nil { + return err + } + d.Id = docId + return nil +} + +func parseError(value string) error { + return fmt.Errorf("invalid document: expected id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>, got %q", value) +} diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go new file mode 100644 index 00000000000..478dd795dd8 --- /dev/null +++ b/client/go/internal/vespa/document/document_test.go @@ -0,0 +1,190 @@ +package document + +import ( + "bytes" + "encoding/json" + "io" + "reflect" + "strings" + "testing" +) + +func ptr[T any](v T) *T { return &v } + +func mustParseDocument(d Document) Document { + if err := parseDocument(&d); err != nil { + panic(err) + } + return d +} + +func TestParseId(t *testing.T) { + tests := []struct { + in string + out Id + fail bool + }{ + {"id:ns:type::user", + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: "user", + }, + false, + }, + {"id:ns:type:n=123:user", + Id{ + Namespace: "ns", + Type: "type", + Number: ptr(int64(123)), + UserSpecific: "user", + }, + false, + }, + {"id:ns:type:g=foo:user", + Id{ + Namespace: "ns", + Type: "type", + Group: "foo", + UserSpecific: "user", + }, + false, + }, + {"id:ns:type::user::specific", + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: "user::specific", + }, + false, + }, + {"id:ns:type:::", + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: ":", + }, + false, + }, + {"id:ns:type::n=user-specific", + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: "n=user-specific", + }, + false, + }, + {"id:ns:type::g=user-specific", + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: "g=user-specific", + }, + false, + }, + {"", Id{}, true}, + {"foobar", Id{}, true}, + {"idd:ns:type:user", Id{}, true}, + {"id:ns::user", Id{}, true}, + {"id::type:user", Id{}, true}, + {"id:ns:type:g=:user", Id{}, true}, + {"id:ns:type:n=:user", Id{}, true}, + {"id:ns:type:n=foo:user", Id{}, true}, + {"id:ns:type::", Id{}, true}, + {"id:ns:type:user-specific:foo-bar", Id{}, true}, + } + for i, tt := range tests { + parsed, err := ParseId(tt.in) + if err == nil && tt.fail { + t.Errorf("#%d: expected error for ParseDocumentId(%q), but got none", i, tt.in) + } + if err != nil && !tt.fail { + t.Errorf("#%d: got unexpected error for ParseDocumentId(%q) = (_, %v)", i, tt.in, err) + } + if !parsed.Equal(tt.out) { + t.Errorf("#%d: ParseDocumentId(%q) = (%s, _), want %s", i, tt.in, parsed, tt.out) + } + } +} + +func feedInput(jsonl bool) string { + operations := []string{ + ` +{ + "put": "id:ns:type::doc1", + "fields": {"foo": "123"} +}`, + ` +{ + "put": "id:ns:type::doc2", + "fields": {"bar": "456"} +}`, + ` +{ + "remove": "id:ns:type::doc1" +} +`} + if jsonl { + return strings.Join(operations, "\n") + } + return " \n[" + strings.Join(operations, ",") + "]" +} + +func testDocumentDecoder(t *testing.T, jsonLike string) { + t.Helper() + r := NewDecoder(strings.NewReader(jsonLike)) + want := []Document{ + mustParseDocument(Document{PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}), + mustParseDocument(Document{PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}), + mustParseDocument(Document{RemoveId: "id:ns:type::doc1", Fields: json.RawMessage(nil)}), + } + got := []Document{} + for { + doc, err := r.Decode() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + got = append(got, doc) + } + if !reflect.DeepEqual(got, want) { + t.Errorf("got %+v, want %+v", got, want) + } +} + +func TestDocumentDecoder(t *testing.T) { + testDocumentDecoder(t, feedInput(false)) + testDocumentDecoder(t, feedInput(true)) + + jsonLike := ` +{ + "put": "id:ns:type::doc1", + "fields": {"foo": "123"} +} +{ + "put": "id:ns:type::doc1", + "fields": {"foo": "invalid +} +` + r := NewDecoder(strings.NewReader(jsonLike)) + _, err := r.Decode() // first object is valid + if err != nil { + t.Errorf("unexpected error: %s", err) + } + _, err = r.Decode() + wantErr := "invalid json at byte offset 60: invalid character '\\n' in string literal" + if err.Error() != wantErr { + t.Errorf("want error %q, got %q", wantErr, err.Error()) + } +} + +func TestDocumentBody(t *testing.T) { + doc := Document{Fields: json.RawMessage([]byte(`{"foo": "123"}`))} + got := doc.Body() + want := []byte(`{"fields":{"foo": "123"}}`) + if !bytes.Equal(got, want) { + t.Errorf("got %q, want %q", got, want) + } +} diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go new file mode 100644 index 00000000000..732db051dab --- /dev/null +++ b/client/go/internal/vespa/document/feeder.go @@ -0,0 +1,97 @@ +package document + +import ( + "time" +) + +type Status int + +const ( + // StatusSuccess indicates a successful document operation. + StatusSuccess Status = iota + // StatusConditionNotMet indicates that the document operation itself was successful, but did not satisfy its + // test-and-set condition. + StatusConditionNotMet + // StatusVespaFailure indicates that Vespa failed to process the document operation. + StatusVespaFailure + // StatusTransportFailure indicates that there was failure in the transport layer error while sending the document + // operation to Vespa. + StatusTransportFailure + // StatusError is a catch-all status for any other error that might occur. + StatusError +) + +// Result represents the result of a feeding operation. +type Result struct { + Id Id + Status Status + Message string + Trace string + Err error +} + +// Success returns whether status s is considered a success. +func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet } + +// Stats represents the summed statistics of a feeder. +type Stats struct { + Requests int64 + Responses int64 + ResponsesByCode map[int]int64 + Errors int64 + Inflight int64 + TotalLatency time.Duration + MinLatency time.Duration + MaxLatency time.Duration + BytesSent int64 + BytesRecv int64 +} + +func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} } + +// AvgLatency returns the average latency for a request. +func (s Stats) AvgLatency() time.Duration { + requests := s.Requests + if requests == 0 { + requests = 1 + } + return s.TotalLatency / time.Duration(requests) +} + +func (s Stats) Successes() int64 { + if s.ResponsesByCode == nil { + return 0 + } + return s.ResponsesByCode[200] +} + +// Add adds all statistics contained in other to this. +func (s *Stats) Add(other Stats) { + s.Requests += other.Requests + s.Responses += other.Responses + for code, count := range other.ResponsesByCode { + _, ok := s.ResponsesByCode[code] + if ok { + s.ResponsesByCode[code] += count + } else { + s.ResponsesByCode[code] = count + } + } + s.Errors += other.Errors + s.Inflight += other.Inflight + s.TotalLatency += other.TotalLatency + if s.MinLatency == 0 || other.MinLatency < s.MinLatency { + s.MinLatency = other.MinLatency + } + if other.MaxLatency > s.MaxLatency { + s.MaxLatency = other.MaxLatency + } + s.BytesSent += other.BytesSent + s.BytesRecv += other.BytesRecv +} + +// Feeder is the interface for a consumer of documents. +type Feeder interface { + Send(Document) Result + Stats() Stats +} diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go new file mode 100644 index 00000000000..ad6765aecc8 --- /dev/null +++ b/client/go/internal/vespa/document/http.go @@ -0,0 +1,192 @@ +package document + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/vespa-engine/vespa/client/go/internal/util" +) + +// Client represents a HTTP client for the /document/v1/ API. +type Client struct { + options ClientOptions + httpClient util.HTTPClient + stats Stats + mu sync.Mutex + now func() time.Time +} + +// ClientOptions specifices the configuration options of a feed client. +type ClientOptions struct { + MaxConnsPerHost int + BaseURL string + Timeout time.Duration + Route string + TraceLevel *int +} + +type countingReader struct { + reader io.Reader + bytesRead int64 +} + +func (r *countingReader) Read(p []byte) (int, error) { + n, err := r.reader.Read(p) + r.bytesRead += int64(n) + return n, err +} + +func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client { + c := &Client{ + options: options, + httpClient: httpClient, + stats: NewStats(), + now: time.Now, + } + httpClient.Transport().MaxConnsPerHost = options.MaxConnsPerHost + return c +} + +func (c *Client) queryParams() url.Values { + params := url.Values{} + if c.options.Timeout > 0 { + params.Set("timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms") + } + if c.options.Route != "" { + params.Set("route", c.options.Route) + } + if c.options.TraceLevel != nil { + params.Set("tracelevel", strconv.Itoa(*c.options.TraceLevel)) + } + return params +} + +func urlPath(id Id) string { + var sb strings.Builder + sb.WriteString("/document/v1/") + sb.WriteString(url.PathEscape(id.Namespace)) + sb.WriteString("/") + sb.WriteString(url.PathEscape(id.Type)) + if id.Number != nil { + sb.WriteString("/number/") + n := uint64(*id.Number) + sb.WriteString(strconv.FormatUint(n, 10)) + } else if id.Group != "" { + sb.WriteString("/group/") + sb.WriteString(url.PathEscape(id.Group)) + } else { + sb.WriteString("/docid") + } + sb.WriteString("/") + sb.WriteString(url.PathEscape(id.UserSpecific)) + return sb.String() +} + +func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, error) { + u, err := url.Parse(c.options.BaseURL) + if err != nil { + return "", nil, fmt.Errorf("invalid base url: %w", err) + } + httpMethod := "" + switch d.Operation { + case OperationPut: + httpMethod = "POST" + case OperationUpdate: + httpMethod = "PUT" + case OperationRemove: + httpMethod = "DELETE" + } + if d.Condition != "" { + queryParams.Set("condition", d.Condition) + } + if d.Create { + queryParams.Set("create", "true") + } + u.Path = urlPath(d.Id) + u.RawQuery = queryParams.Encode() + return httpMethod, u, nil +} + +// Send given document the URL configured in this client. +func (c *Client) Send(document Document) Result { + start := c.now() + stats := NewStats() + stats.Requests = 1 + defer func() { + latency := c.now().Sub(start) + stats.TotalLatency = latency + stats.MinLatency = latency + stats.MaxLatency = latency + c.addStats(&stats) + }() + method, url, err := c.feedURL(document, c.queryParams()) + if err != nil { + stats.Errors = 1 + return Result{Status: StatusError, Err: err} + } + body := document.Body() + req, err := http.NewRequest(method, url.String(), bytes.NewReader(body)) + if err != nil { + stats.Errors = 1 + return Result{Status: StatusError, Err: err} + } + resp, err := c.httpClient.Do(req, c.options.Timeout) + if err != nil { + stats.Errors = 1 + return Result{Status: StatusTransportFailure, Err: err} + } + defer resp.Body.Close() + stats.Responses = 1 + stats.ResponsesByCode = map[int]int64{ + resp.StatusCode: 1, + } + stats.BytesSent = int64(len(body)) + return c.createResult(document.Id, &stats, resp) +} + +func (c *Client) Stats() Stats { return c.stats } + +func (c *Client) addStats(stats *Stats) { + c.mu.Lock() + defer c.mu.Unlock() + c.stats.Add(*stats) +} + +func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result { + result := Result{Id: id} + switch resp.StatusCode { + case 200: + result.Status = StatusSuccess + case 412: + result.Status = StatusConditionNotMet + case 502, 504, 507: + result.Status = StatusVespaFailure + default: + result.Status = StatusTransportFailure + } + var body struct { + Message string `json:"message"` + Trace json.RawMessage `json:"trace"` + } + cr := countingReader{reader: resp.Body} + jsonDec := json.NewDecoder(&cr) + if err := jsonDec.Decode(&body); err != nil { + result.Status = StatusError + result.Err = fmt.Errorf("failed to decode json response: %w", err) + } + result.Message = body.Message + result.Trace = string(body.Trace) + stats.BytesRecv = cr.bytesRead + if !result.Status.Success() { + stats.Errors = 1 + } + return result +} diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go new file mode 100644 index 00000000000..ca59c4c2af0 --- /dev/null +++ b/client/go/internal/vespa/document/http_test.go @@ -0,0 +1,190 @@ +package document + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "reflect" + "testing" + "time" + + "github.com/vespa-engine/vespa/client/go/internal/mock" +) + +type manualClock struct { + t time.Time + tick time.Duration +} + +func (c *manualClock) now() time.Time { + t := c.t + c.t = c.t.Add(c.tick) + return t +} + +func TestClientSend(t *testing.T) { + docs := []Document{ + mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}), + mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc2", Fields: json.RawMessage(`{"foo": "456"}`)}), + mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc3", Fields: json.RawMessage(`{"baz": "789"}`)}), + } + httpClient := mock.HTTPClient{} + client := NewClient(ClientOptions{ + BaseURL: "https://example.com:1337", + Timeout: time.Duration(5 * time.Second), + }, &httpClient) + clock := manualClock{t: time.Now(), tick: time.Second} + client.now = clock.now + for i, doc := range docs { + if i < 2 { + httpClient.NextResponseString(200, `{"message":"All good!"}`) + } else { + httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) + } + res := client.Send(doc) + if res.Err != nil { + t.Fatalf("got unexpected error %q", res.Err) + } + r := httpClient.LastRequest + if r.Method != http.MethodPut { + t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut) + } + wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific) + if r.URL.String() != wantURL { + t.Errorf("got r.URL = %q, want %q", r.URL, wantURL) + } + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("got unexpected error %q", err) + } + wantBody := doc.Body() + if !bytes.Equal(body, wantBody) { + t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody)) + } + } + stats := client.Stats() + want := Stats{ + Requests: 3, + Responses: 3, + ResponsesByCode: map[int]int64{ + 200: 2, + 502: 1, + }, + Errors: 1, + Inflight: 0, + TotalLatency: 3 * time.Second, + MinLatency: time.Second, + MaxLatency: time.Second, + BytesSent: 75, + BytesRecv: 82, + } + if !reflect.DeepEqual(want, stats) { + t.Errorf("got %+v, want %+v", stats, want) + } +} + +func TestURLPath(t *testing.T) { + tests := []struct { + in Id + out string + }{ + { + Id{ + Namespace: "ns-with-/", + Type: "type-with-/", + UserSpecific: "user", + }, + "/document/v1/ns-with-%2F/type-with-%2F/docid/user", + }, + { + Id{ + Namespace: "ns", + Type: "type", + Number: ptr(int64(123)), + UserSpecific: "user", + }, + "/document/v1/ns/type/number/123/user", + }, + { + Id{ + Namespace: "ns", + Type: "type", + Group: "foo", + UserSpecific: "user", + }, + "/document/v1/ns/type/group/foo/user", + }, + { + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: "user::specific", + }, + "/document/v1/ns/type/docid/user::specific", + }, + { + Id{ + Namespace: "ns", + Type: "type", + UserSpecific: ":", + }, + "/document/v1/ns/type/docid/:", + }, + } + for i, tt := range tests { + path := urlPath(tt.in) + if path != tt.out { + t.Errorf("#%d: documentPath(%q) = %s, want %s", i, tt.in, path, tt.out) + } + } +} + +func TestClientFeedURL(t *testing.T) { + tests := []struct { + in Document + method string + url string + }{ + { + mustParseDocument(Document{ + IdString: "id:ns:type::user", + }), + "POST", + "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr", + }, + { + mustParseDocument(Document{ + UpdateId: "id:ns:type::user", + Create: true, + Condition: "false", + }), + "PUT", + "https://example.com/document/v1/ns/type/docid/user?condition=false&create=true&foo=ba%2Fr", + }, + { + mustParseDocument(Document{ + RemoveId: "id:ns:type::user", + }), + "DELETE", + "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr", + }, + } + httpClient := mock.HTTPClient{} + client := NewClient(ClientOptions{ + BaseURL: "https://example.com", + }, &httpClient) + for i, tt := range tests { + moreParams := url.Values{} + moreParams.Set("foo", "ba/r") + method, u, err := client.feedURL(tt.in, moreParams) + if err != nil { + t.Errorf("#%d: got unexpected error = %s, want none", i, err) + } + if u.String() != tt.url || method != tt.method { + t.Errorf("#%d: URL() = (%s, %s), want (%s, %s)", i, method, u.String(), tt.method, tt.url) + } + } +} diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 446b02c05cf..0e173175720 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -43,7 +43,8 @@ type Service struct { BaseURL string Name string TLSOptions TLSOptions - ztsClient ztsClient + + zts zts httpClient util.HTTPClient } @@ -91,11 +92,8 @@ type LogOptions struct { // Do sends request to this service. Any required authentication happens automatically. func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Response, error) { - if s.TLSOptions.KeyPair.Certificate != nil { - s.httpClient.UseCertificate([]tls.Certificate{s.TLSOptions.KeyPair}) - } if s.TLSOptions.AthenzDomain != "" { - accessToken, err := s.ztsClient.AccessToken(s.TLSOptions.AthenzDomain, s.TLSOptions.KeyPair) + accessToken, err := s.zts.AccessToken(s.TLSOptions.AthenzDomain, s.TLSOptions.KeyPair) if err != nil { return nil, err } @@ -107,6 +105,8 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon return s.httpClient.Do(request, timeout) } +func (s *Service) Transport() *http.Transport { return s.httpClient.Transport() } + // Wait polls the health check of this service until it succeeds or timeout passes. func (s *Service) Wait(timeout time.Duration) (int, error) { url := s.BaseURL @@ -152,7 +152,7 @@ func waitForOK(client util.HTTPClient, url string, certificate *tls.Certificate, func wait(client util.HTTPClient, fn responseFunc, reqFn requestFunc, certificate *tls.Certificate, timeout time.Duration) (int, error) { if certificate != nil { - client.UseCertificate([]tls.Certificate{*certificate}) + util.SetCertificate(client, []tls.Certificate{*certificate}) } var ( httpErr error diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go index eb9cc41014a..827d6c6a56a 100644 --- a/client/go/internal/vespa/target_cloud.go +++ b/client/go/internal/vespa/target_cloud.go @@ -12,18 +12,15 @@ import ( "strings" "time" - "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0" - "github.com/vespa-engine/vespa/client/go/internal/cli/auth/zts" "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/version" ) // CloudOptions configures URL and authentication for a cloud target. type APIOptions struct { - System System - TLSOptions TLSOptions - APIKey []byte - AuthConfigPath string + System System + TLSOptions TLSOptions + APIKey []byte } // CloudDeploymentOptions configures the deployment to manage through a cloud target. @@ -38,7 +35,8 @@ type cloudTarget struct { deploymentOptions CloudDeploymentOptions logOptions LogOptions httpClient util.HTTPClient - ztsClient ztsClient + zts zts + auth0 auth0 } type deploymentEndpoint struct { @@ -64,22 +62,23 @@ type logMessage struct { Message string `json:"message"` } -type ztsClient interface { +type zts interface { AccessToken(domain string, certficiate tls.Certificate) (string, error) } +type auth0 interface { + AccessToken() (string, error) +} + // CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform. -func CloudTarget(httpClient util.HTTPClient, apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) { - ztsClient, err := zts.NewClient(zts.DefaultURL, httpClient) - if err != nil { - return nil, err - } +func CloudTarget(httpClient util.HTTPClient, ztsClient zts, auth0Client auth0, apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) { return &cloudTarget{ httpClient: httpClient, apiOptions: apiOptions, deploymentOptions: deploymentOptions, logOptions: logOptions, - ztsClient: ztsClient, + zts: ztsClient, + auth0: auth0Client, }, nil } @@ -119,13 +118,14 @@ func (t *cloudTarget) IsCloud() bool { return true } func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment } func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) { + var service *Service switch name { case DeployService: - service := &Service{ + service = &Service{ Name: name, BaseURL: t.apiOptions.System.URL, TLSOptions: t.apiOptions.TLSOptions, - ztsClient: t.ztsClient, + zts: t.zts, httpClient: t.httpClient, } if timeout > 0 { @@ -137,7 +137,6 @@ func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, c return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL) } } - return service, nil case QueryService, DocumentService: if t.deploymentOptions.ClusterURLs == nil { if err := t.waitForEndpoints(timeout, runID); err != nil { @@ -149,15 +148,22 @@ func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, c return nil, err } t.deploymentOptions.TLSOptions.AthenzDomain = t.apiOptions.System.AthenzDomain - return &Service{ + service = &Service{ Name: name, BaseURL: url, TLSOptions: t.deploymentOptions.TLSOptions, - ztsClient: t.ztsClient, + zts: t.zts, httpClient: t.httpClient, - }, nil + } + + default: + return nil, fmt.Errorf("unknown service: %s", name) + + } + if service.TLSOptions.KeyPair.Certificate != nil { + util.SetCertificate(service, []tls.Certificate{service.TLSOptions.KeyPair}) } - return nil, fmt.Errorf("unknown service: %s", name) + return service, nil } func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error { @@ -207,11 +213,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { } func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error { - client, err := auth0.New(t.apiOptions.AuthConfigPath, t.apiOptions.System.Name, t.apiOptions.System.URL) - if err != nil { - return err - } - accessToken, err := client.GetAccessToken() + accessToken, err := t.auth0.AccessToken() if err != nil { return err } diff --git a/client/go/internal/vespa/target_test.go b/client/go/internal/vespa/target_test.go index f7731611deb..4f2e361fb39 100644 --- a/client/go/internal/vespa/target_test.go +++ b/client/go/internal/vespa/target_test.go @@ -164,6 +164,8 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target { target, err := CloudTarget( util.CreateClient(time.Second*10), + &mockZTS{}, + &mockAuth0{}, APIOptions{APIKey: apiKey, System: PublicSystem}, CloudDeploymentOptions{ Deployment: Deployment{ @@ -179,7 +181,7 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target { } if ct, ok := target.(*cloudTarget); ok { ct.apiOptions.System.URL = url - ct.ztsClient = &mockZTSClient{token: "foo bar"} + ct.zts = &mockZTS{token: "foo bar"} } else { t.Fatalf("Wrong target type %T", ct) } @@ -201,10 +203,14 @@ func assertServiceWait(t *testing.T, expectedStatus int, target Target, service assert.Equal(t, expectedStatus, status) } -type mockZTSClient struct { - token string -} +type mockZTS struct{ token string } -func (c *mockZTSClient) AccessToken(domain string, certificate tls.Certificate) (string, error) { +func (c *mockZTS) AccessToken(domain string, certificate tls.Certificate) (string, error) { return c.token, nil } + +type mockAuth0 struct{} + +func (a *mockAuth0) AccessToken() (string, error) { return "", nil } + +func (a *mockAuth0) HasCredentials() bool { return true } |