aboutsummaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
Diffstat (limited to 'client')
-rw-r--r--client/go/Makefile2
-rw-r--r--client/go/go.sum15
-rw-r--r--client/go/internal/admin/clusterstate/get_cluster_state.go2
-rw-r--r--client/go/internal/admin/clusterstate/get_node_state.go2
-rw-r--r--client/go/internal/admin/clusterstate/set_node_state.go2
-rw-r--r--client/go/internal/admin/deploy/cmd.go2
-rw-r--r--client/go/internal/admin/jvm/container.go4
-rw-r--r--client/go/internal/admin/trace/log.go8
-rw-r--r--client/go/internal/admin/vespa-wrapper/logfmt/cmd.go2
-rw-r--r--client/go/internal/admin/vespa-wrapper/services/configproxy.go1
-rw-r--r--client/go/internal/build/build.go (renamed from client/go/internal/cli/build/build.go)0
-rw-r--r--client/go/internal/cli/auth/auth0/auth0.go52
-rw-r--r--client/go/internal/cli/auth/auth0/auth0_test.go4
-rw-r--r--client/go/internal/cli/auth/zts/zts.go4
-rw-r--r--client/go/internal/cli/auth/zts/zts_test.go2
-rw-r--r--client/go/internal/cli/cmd/config.go2
-rw-r--r--client/go/internal/cli/cmd/config_test.go22
-rw-r--r--client/go/internal/cli/cmd/feed.go143
-rw-r--r--client/go/internal/cli/cmd/feed_test.go67
-rw-r--r--client/go/internal/cli/cmd/login.go8
-rw-r--r--client/go/internal/cli/cmd/logout.go9
-rw-r--r--client/go/internal/cli/cmd/root.go54
-rw-r--r--client/go/internal/cli/cmd/test.go2
-rw-r--r--client/go/internal/cli/cmd/testutil_test.go22
-rw-r--r--client/go/internal/cli/cmd/version.go2
-rw-r--r--client/go/internal/mock/http.go3
-rw-r--r--client/go/internal/util/http.go26
-rw-r--r--client/go/internal/vespa/document/dispatcher.go63
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go37
-rw-r--r--client/go/internal/vespa/document/document.go241
-rw-r--r--client/go/internal/vespa/document/document_test.go190
-rw-r--r--client/go/internal/vespa/document/feeder.go97
-rw-r--r--client/go/internal/vespa/document/http.go192
-rw-r--r--client/go/internal/vespa/document/http_test.go190
-rw-r--r--client/go/internal/vespa/target.go12
-rw-r--r--client/go/internal/vespa/target_cloud.go54
-rw-r--r--client/go/internal/vespa/target_test.go16
37 files changed, 1424 insertions, 130 deletions
diff --git a/client/go/Makefile b/client/go/Makefile
index b8ce953eff0..c70154eb247 100644
--- a/client/go/Makefile
+++ b/client/go/Makefile
@@ -13,7 +13,7 @@ BIN ?= $(PREFIX)/bin
SHARE ?= $(PREFIX)/share
DIST ?= $(CURDIR)/dist
-GO_FLAGS := -ldflags "-X github.com/vespa-engine/vespa/client/go/internal/cli/build.Version=$(VERSION)"
+GO_FLAGS := -ldflags "-X github.com/vespa-engine/vespa/client/go/internal/build.Version=$(VERSION)"
GIT_ROOT := $(shell git rev-parse --show-toplevel)
DIST_TARGETS := dist-mac dist-mac-arm64 dist-linux dist-linux-arm64 dist-win32 dist-win64
diff --git a/client/go/go.sum b/client/go/go.sum
index 084bde701ce..3252517a08a 100644
--- a/client/go/go.sum
+++ b/client/go/go.sum
@@ -4,10 +4,13 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKY
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/danieljoos/wincred v1.1.0 h1:3RNcEpBg4IhIChZdFRSdlQt1QjCp1sMAPIrOnm7Yf8g=
github.com/danieljoos/wincred v1.1.0/go.mod h1:XYlo+eRTsVA9aHGp7NGjFkPla4m+DCL7hqDjlFjiygg=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
+github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
@@ -19,8 +22,11 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
+github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
+github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA=
github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/browser v0.0.0-20210706143420-7d21f8c997e2 h1:acNfDZXmm28D2Yg/c3ALnZStzNaZMSagpbr96vY6Zjc=
@@ -33,17 +39,26 @@ github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q=
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/zalando/go-keyring v0.1.1 h1:w2V9lcx/Uj4l+dzAf1m9s+DJ1O8ROkEHnynonHjTcYE=
github.com/zalando/go-keyring v0.1.1/go.mod h1:OIC+OZ28XbmwFxU/Rp9V7eKzZjamBJwRzC8UFJH9+L8=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/client/go/internal/admin/clusterstate/get_cluster_state.go b/client/go/internal/admin/clusterstate/get_cluster_state.go
index 505235a284e..d5d2b894fe4 100644
--- a/client/go/internal/admin/clusterstate/get_cluster_state.go
+++ b/client/go/internal/admin/clusterstate/get_cluster_state.go
@@ -13,7 +13,7 @@ import (
"github.com/spf13/cobra"
"github.com/vespa-engine/vespa/client/go/internal/admin/envvars"
"github.com/vespa-engine/vespa/client/go/internal/admin/trace"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
)
func NewGetClusterStateCmd() *cobra.Command {
diff --git a/client/go/internal/admin/clusterstate/get_node_state.go b/client/go/internal/admin/clusterstate/get_node_state.go
index 6d45e377a72..2308446e0fc 100644
--- a/client/go/internal/admin/clusterstate/get_node_state.go
+++ b/client/go/internal/admin/clusterstate/get_node_state.go
@@ -14,7 +14,7 @@ import (
"github.com/spf13/cobra"
"github.com/vespa-engine/vespa/client/go/internal/admin/envvars"
"github.com/vespa-engine/vespa/client/go/internal/admin/trace"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
)
const (
diff --git a/client/go/internal/admin/clusterstate/set_node_state.go b/client/go/internal/admin/clusterstate/set_node_state.go
index 2a6869c84f5..6c1542b230c 100644
--- a/client/go/internal/admin/clusterstate/set_node_state.go
+++ b/client/go/internal/admin/clusterstate/set_node_state.go
@@ -15,7 +15,7 @@ import (
"github.com/spf13/cobra"
"github.com/vespa-engine/vespa/client/go/internal/admin/envvars"
"github.com/vespa-engine/vespa/client/go/internal/admin/trace"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
"github.com/vespa-engine/vespa/client/go/internal/util"
)
diff --git a/client/go/internal/admin/deploy/cmd.go b/client/go/internal/admin/deploy/cmd.go
index c4489d11771..9ae7cb894a6 100644
--- a/client/go/internal/admin/deploy/cmd.go
+++ b/client/go/internal/admin/deploy/cmd.go
@@ -10,7 +10,7 @@ import (
"github.com/spf13/cobra"
"github.com/vespa-engine/vespa/client/go/internal/admin/trace"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
"github.com/vespa-engine/vespa/client/go/internal/util"
"github.com/vespa-engine/vespa/client/go/internal/vespa"
)
diff --git a/client/go/internal/admin/jvm/container.go b/client/go/internal/admin/jvm/container.go
index d7bdeae43b6..32110c5e1bc 100644
--- a/client/go/internal/admin/jvm/container.go
+++ b/client/go/internal/admin/jvm/container.go
@@ -68,8 +68,8 @@ func (cb *containerBase) Exec() {
if cb.propsFile != "" {
writeEnvAsProperties(p.EffectiveEnv(), cb.propsFile)
}
- trace.Info("starting container; env:", readableEnv(p.Env))
- trace.Info("starting container; exec:", argv)
+ trace.Info("JVM env:", readableEnv(p.Env))
+ trace.Info("JVM exec:", argv)
err := p.Run()
util.JustExitWith(err)
}
diff --git a/client/go/internal/admin/trace/log.go b/client/go/internal/admin/trace/log.go
index 0254d0e4f4a..7b7fe6d29ec 100644
--- a/client/go/internal/admin/trace/log.go
+++ b/client/go/internal/admin/trace/log.go
@@ -13,6 +13,12 @@ import (
"github.com/vespa-engine/vespa/client/go/internal/admin/envvars"
)
+func getComponent() string {
+ s := os.Args[0]
+ parts := strings.Split(s, "/")
+ return parts[len(parts)-1]
+}
+
// make a vespa-format log line
func logMessage(l outputLevel, msg string) {
@@ -24,7 +30,7 @@ func logMessage(l outputLevel, msg string) {
if service == "" {
service = "-"
}
- component := "stderr"
+ component := getComponent()
level := "error"
switch l {
case levelWarning:
diff --git a/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go b/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go
index a8675c37356..4406457ac23 100644
--- a/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go
+++ b/client/go/internal/admin/vespa-wrapper/logfmt/cmd.go
@@ -6,7 +6,7 @@ package logfmt
import (
"github.com/spf13/cobra"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
)
func NewLogfmtCmd() *cobra.Command {
diff --git a/client/go/internal/admin/vespa-wrapper/services/configproxy.go b/client/go/internal/admin/vespa-wrapper/services/configproxy.go
index 9807d8d4591..0ff3e865c0a 100644
--- a/client/go/internal/admin/vespa-wrapper/services/configproxy.go
+++ b/client/go/internal/admin/vespa-wrapper/services/configproxy.go
@@ -25,6 +25,7 @@ const (
)
func JustRunConfigproxy() int {
+ os.Setenv(envvars.VESPA_SERVICE_NAME, PROXY_SERVICE_NAME)
commonPreChecks()
vespa.CheckCorrectUser()
configsources := defaults.VespaConfigserverRpcAddrs()
diff --git a/client/go/internal/cli/build/build.go b/client/go/internal/build/build.go
index a8342a9fb1e..a8342a9fb1e 100644
--- a/client/go/internal/cli/build/build.go
+++ b/client/go/internal/build/build.go
diff --git a/client/go/internal/cli/auth/auth0/auth0.go b/client/go/internal/cli/auth/auth0/auth0.go
index 36dc0b8871c..5f7612d4d2e 100644
--- a/client/go/internal/cli/auth/auth0/auth0.go
+++ b/client/go/internal/cli/auth/auth0/auth0.go
@@ -33,12 +33,16 @@ type Credentials struct {
type Client struct {
httpClient util.HTTPClient
Authenticator *auth.Authenticator // TODO: Make this private
- configPath string
- systemName string
- systemURL string
+ options Options
provider auth0Provider
}
+type Options struct {
+ ConfigPath string
+ SystemName string
+ SystemURL string
+}
+
// config is the root type of the persisted config
type config struct {
Version int `json:"version"`
@@ -74,12 +78,12 @@ func cancelOnInterrupt() context.Context {
return ctx
}
-func newClient(httpClient util.HTTPClient, configPath, systemName, systemURL string) (*Client, error) {
+// NewClient constructs a new Auth0 client, storing configuration in the given configPath. The client will be configured for
+// use in the given Vespa system.
+func NewClient(httpClient util.HTTPClient, options Options) (*Client, error) {
a := Client{}
a.httpClient = httpClient
- a.configPath = configPath
- a.systemName = systemName
- a.systemURL = systemURL
+ a.options = options
c, err := a.getDeviceFlowConfig()
if err != nil {
return nil, err
@@ -90,7 +94,7 @@ func newClient(httpClient util.HTTPClient, configPath, systemName, systemURL str
DeviceCodeEndpoint: c.DeviceCodeEndpoint,
OauthTokenEndpoint: c.OauthTokenEndpoint,
}
- provider, err := readConfig(configPath)
+ provider, err := readConfig(options.ConfigPath)
if err != nil {
return nil, err
}
@@ -98,14 +102,8 @@ func newClient(httpClient util.HTTPClient, configPath, systemName, systemURL str
return &a, nil
}
-// New constructs a new Auth0 client, storing configuration in the given configPath. The client will be configured for
-// use in the given Vespa system.
-func New(configPath string, systemName, systemURL string) (*Client, error) {
- return newClient(util.CreateClient(time.Second*30), configPath, systemName, systemURL)
-}
-
func (a *Client) getDeviceFlowConfig() (flowConfig, error) {
- url := a.systemURL + "/auth0/v1/device-flow-config"
+ url := a.options.SystemURL + "/auth0/v1/device-flow-config"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return flowConfig{}, err
@@ -125,11 +123,11 @@ func (a *Client) getDeviceFlowConfig() (flowConfig, error) {
return cfg, nil
}
-// GetAccessToken returns an access token for the configured system, refreshing it if necessary.
-func (a *Client) GetAccessToken() (string, error) {
- creds, ok := a.provider.Systems[a.systemName]
+// AccessToken returns an access token for the configured system, refreshing it if necessary.
+func (a *Client) AccessToken() (string, error) {
+ creds, ok := a.provider.Systems[a.options.SystemName]
if !ok {
- return "", fmt.Errorf("system %s is not configured", a.systemName)
+ return "", fmt.Errorf("system %s is not configured", a.options.SystemName)
} else if creds.AccessToken == "" {
return "", fmt.Errorf("access token missing: %s", reauthMessage)
} else if scopesChanged(creds) {
@@ -142,7 +140,7 @@ func (a *Client) GetAccessToken() (string, error) {
Secrets: &auth.Keyring{},
Client: http.DefaultClient,
}
- resp, err := tr.Refresh(cancelOnInterrupt(), a.systemName)
+ resp, err := tr.Refresh(cancelOnInterrupt(), a.options.SystemName)
if err != nil {
return "", fmt.Errorf("failed to renew access token: %w: %s", err, reauthMessage)
} else {
@@ -177,7 +175,7 @@ func scopesChanged(s Credentials) bool {
// HasCredentials returns true if this client has retrived credentials for the configured system.
func (a *Client) HasCredentials() bool {
- _, ok := a.provider.Systems[a.systemName]
+ _, ok := a.provider.Systems[a.options.SystemName]
return ok
}
@@ -186,8 +184,8 @@ func (a *Client) WriteCredentials(credentials Credentials) error {
if a.provider.Systems == nil {
a.provider.Systems = make(map[string]Credentials)
}
- a.provider.Systems[a.systemName] = credentials
- if err := writeConfig(a.provider, a.configPath); err != nil {
+ a.provider.Systems[a.options.SystemName] = credentials
+ if err := writeConfig(a.provider, a.options.ConfigPath); err != nil {
return fmt.Errorf("failed to write config: %w", err)
}
return nil
@@ -196,11 +194,11 @@ func (a *Client) WriteCredentials(credentials Credentials) error {
// RemoveCredentials removes credentials for the system configured in this client.
func (a *Client) RemoveCredentials() error {
tr := &auth.TokenRetriever{Secrets: &auth.Keyring{}}
- if err := tr.Delete(a.systemName); err != nil {
- return fmt.Errorf("failed to remove system %s from secret storage: %w", a.systemName, err)
+ if err := tr.Delete(a.options.SystemName); err != nil {
+ return fmt.Errorf("failed to remove system %s from secret storage: %w", a.options.SystemName, err)
}
- delete(a.provider.Systems, a.systemName)
- if err := writeConfig(a.provider, a.configPath); err != nil {
+ delete(a.provider.Systems, a.options.SystemName)
+ if err := writeConfig(a.provider, a.options.ConfigPath); err != nil {
return fmt.Errorf("failed to write config: %w", err)
}
return nil
diff --git a/client/go/internal/cli/auth/auth0/auth0_test.go b/client/go/internal/cli/auth/auth0/auth0_test.go
index 39393bbdfc1..b4bd34eb6d6 100644
--- a/client/go/internal/cli/auth/auth0/auth0_test.go
+++ b/client/go/internal/cli/auth/auth0/auth0_test.go
@@ -21,7 +21,7 @@ func TestConfigWriting(t *testing.T) {
"oauth-token-endpoint": "https://example.com/oauth/token"
}`
httpClient.NextResponseString(200, flowConfigResponse)
- client, err := newClient(&httpClient, configPath, "public", "http://example.com")
+ client, err := NewClient(&httpClient, Options{ConfigPath: configPath, SystemName: "public", SystemURL: "http://example.com"})
require.Nil(t, err)
assert.Equal(t, "https://example.com/api/v2/", client.Authenticator.Audience)
assert.Equal(t, "some-id", client.Authenticator.ClientID)
@@ -56,7 +56,7 @@ func TestConfigWriting(t *testing.T) {
// Switch to another system
httpClient.NextResponseString(200, flowConfigResponse)
- client, err = newClient(&httpClient, configPath, "publiccd", "http://example.com")
+ client, err = NewClient(&httpClient, Options{ConfigPath: configPath, SystemName: "publiccd", SystemURL: "http://example.com"})
require.Nil(t, err)
creds2 := Credentials{
AccessToken: "another-token",
diff --git a/client/go/internal/cli/auth/zts/zts.go b/client/go/internal/cli/auth/zts/zts.go
index 9fabe219209..1e84912a271 100644
--- a/client/go/internal/cli/auth/zts/zts.go
+++ b/client/go/internal/cli/auth/zts/zts.go
@@ -21,7 +21,7 @@ type Client struct {
}
// NewClient creates a new client for an Athenz ZTS service located at serviceURL.
-func NewClient(serviceURL string, client util.HTTPClient) (*Client, error) {
+func NewClient(client util.HTTPClient, serviceURL string) (*Client, error) {
tokenURL, err := url.Parse(serviceURL)
if err != nil {
return nil, err
@@ -37,7 +37,7 @@ func (c *Client) AccessToken(domain string, certificate tls.Certificate) (string
if err != nil {
return "", err
}
- c.client.UseCertificate([]tls.Certificate{certificate})
+ util.SetCertificate(c.client, []tls.Certificate{certificate})
response, err := c.client.Do(req, 10*time.Second)
if err != nil {
return "", err
diff --git a/client/go/internal/cli/auth/zts/zts_test.go b/client/go/internal/cli/auth/zts/zts_test.go
index 6c6ced9bb33..d0cc7ea9f9d 100644
--- a/client/go/internal/cli/auth/zts/zts_test.go
+++ b/client/go/internal/cli/auth/zts/zts_test.go
@@ -9,7 +9,7 @@ import (
func TestAccessToken(t *testing.T) {
httpClient := mock.HTTPClient{}
- client, err := NewClient("http://example.com", &httpClient)
+ client, err := NewClient(&httpClient, "http://example.com")
if err != nil {
t.Fatal(err)
}
diff --git a/client/go/internal/cli/cmd/config.go b/client/go/internal/cli/cmd/config.go
index e5124962831..fd049864096 100644
--- a/client/go/internal/cli/cmd/config.go
+++ b/client/go/internal/cli/cmd/config.go
@@ -490,7 +490,7 @@ func (c *Config) readAPIKey(cli *CLI, system vespa.System, tenantName string) ([
return nil, nil // Vespa Cloud CI only talks to data plane and does not have an API key
}
if !cli.isCI() {
- client, err := auth0.New(c.authConfigPath(), system.Name, system.URL)
+ client, err := cli.auth0Factory(cli.httpClient, auth0.Options{ConfigPath: c.authConfigPath(), SystemName: system.Name, SystemURL: system.URL})
if err == nil && client.HasCredentials() {
return nil, nil // use Auth0
}
diff --git a/client/go/internal/cli/cmd/config_test.go b/client/go/internal/cli/cmd/config_test.go
index c76277119a0..612904061de 100644
--- a/client/go/internal/cli/cmd/config_test.go
+++ b/client/go/internal/cli/cmd/config_test.go
@@ -8,7 +8,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0"
"github.com/vespa-engine/vespa/client/go/internal/mock"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
"github.com/vespa-engine/vespa/client/go/internal/vespa"
)
@@ -184,24 +186,10 @@ func TestReadAPIKey(t *testing.T) {
assert.Equal(t, []byte("baz"), key)
// Auth0 is preferred when configured
- authContent := `
-{
- "version": 1,
- "providers": {
- "auth0": {
- "version": 1,
- "systems": {
- "public": {
- "access_token": "...",
- "scopes": ["openid", "offline_access"],
- "expires_at": "2030-01-01T01:01:01.000001+01:00"
- }
- }
- }
- }
-}`
cli, _, _ = newTestCLI(t)
- require.Nil(t, os.WriteFile(filepath.Join(cli.config.homeDir, "auth.json"), []byte(authContent), 0600))
+ cli.auth0Factory = func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) {
+ return &mockAuth0{hasCredentials: true}, nil
+ }
key, err = cli.config.readAPIKey(cli, vespa.PublicSystem, "t1")
require.Nil(t, err)
assert.Nil(t, key)
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
new file mode 100644
index 00000000000..621676d0353
--- /dev/null
+++ b/client/go/internal/cli/cmd/feed.go
@@ -0,0 +1,143 @@
+package cmd
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "time"
+
+ "github.com/spf13/cobra"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa/document"
+)
+
+func addFeedFlags(cmd *cobra.Command, maxConnections *int, concurrency *int) {
+ cmd.PersistentFlags().IntVarP(maxConnections, "max-connections", "N", 8, "Maximum number of HTTP connections to use")
+ cmd.PersistentFlags().IntVarP(concurrency, "concurrency", "T", 64, "Number of goroutines to use for dispatching")
+}
+
+func newFeedCmd(cli *CLI) *cobra.Command {
+ var (
+ maxConnections int
+ concurrency int
+ )
+ cmd := &cobra.Command{
+ Use: "feed FILE",
+ Short: "Feed documents to a Vespa cluster",
+ Long: `Feed documents to a Vespa cluster.
+
+A high performance feeding client. This can be used to feed large amounts of
+documents to Vespa cluster efficiently.
+
+The contents of FILE must be either a JSON array or JSON objects separated by
+newline (JSONL).
+`,
+ Example: `$ vespa feed documents.jsonl
+`,
+ Args: cobra.ExactArgs(1),
+ DisableAutoGenTag: true,
+ SilenceUsage: true,
+ Hidden: true, // TODO(mpolden): Remove when ready for public use
+ RunE: func(cmd *cobra.Command, args []string) error {
+ f, err := os.Open(args[0])
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ return feed(f, cli, maxConnections, concurrency)
+ },
+ }
+ addFeedFlags(cmd, &maxConnections, &concurrency)
+ return cmd
+}
+
+func feed(r io.Reader, cli *CLI, maxConnections, concurrency int) error {
+ service, err := documentService(cli)
+ if err != nil {
+ return err
+ }
+ client := document.NewClient(document.ClientOptions{
+ BaseURL: service.BaseURL,
+ MaxConnsPerHost: maxConnections,
+ }, service)
+ dispatcher := document.NewDispatcher(client, concurrency)
+ dec := document.NewDecoder(r)
+
+ start := cli.now()
+ for {
+ doc, err := dec.Decode()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ cli.printErr(fmt.Errorf("failed to decode document: %w", err))
+ }
+ if err := dispatcher.Enqueue(doc); err != nil {
+ cli.printErr(err)
+ }
+ }
+ if err := dispatcher.Close(); err != nil {
+ return err
+ }
+ elapsed := cli.now().Sub(start)
+ return writeSummaryJSON(cli.Stdout, client.Stats(), elapsed)
+}
+
+type number float32
+
+func (n number) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%.3f", n)), nil }
+
+type feedSummary struct {
+ Seconds number `json:"feeder.seconds"`
+ SuccessCount int64 `json:"feeder.ok.count"`
+ SuccessRate number `json:"feeder.ok.rate"`
+ ErrorCount int64 `json:"feeder.error.count"`
+ InflightCount int64 `json:"feeder.inflight.count"`
+
+ RequestCount int64 `json:"http.request.count"`
+ RequestBytes int64 `json:"http.request.bytes"`
+ RequestRate number `json:"http.request.MBps"`
+ ExceptionCount int64 `json:"http.exception.count"` // same as ErrorCount, for compatability with vespa-feed-client
+
+ ResponseCount int64 `json:"http.response.count"`
+ ResponseBytes int64 `json:"http.response.bytes"`
+ ResponseRate number `json:"http.response.MBps"`
+ ResponseErrorCount int64 `json:"http.response.error.count"`
+
+ ResponseMinLatency int64 `json:"http.response.latency.millis.min"`
+ ResponseAvgLatency int64 `json:"http.response.latency.millis.avg"`
+ ResponseMaxLatency int64 `json:"http.response.latency.millis.max"`
+ ResponseCodeCounts map[int]int64 `json:"http.response.code.counts"`
+}
+
+func mbps(bytes int64, duration time.Duration) float64 {
+ return (float64(bytes) / 1000 / 1000) / math.Max(1, duration.Seconds())
+}
+
+func writeSummaryJSON(w io.Writer, stats document.Stats, duration time.Duration) error {
+ summary := feedSummary{
+ Seconds: number(duration.Seconds()),
+ SuccessCount: stats.Successes(),
+ SuccessRate: number(float64(stats.Successes()) / math.Max(1, duration.Seconds())),
+ ErrorCount: stats.Errors,
+ InflightCount: stats.Inflight,
+
+ RequestCount: stats.Requests,
+ RequestBytes: stats.BytesSent,
+ RequestRate: number(mbps(stats.BytesSent, duration)),
+ ExceptionCount: stats.Errors,
+
+ ResponseCount: stats.Responses,
+ ResponseBytes: stats.BytesRecv,
+ ResponseRate: number(mbps(stats.BytesRecv, duration)),
+ ResponseErrorCount: stats.Responses - stats.Successes(),
+ ResponseMinLatency: stats.MinLatency.Milliseconds(),
+ ResponseAvgLatency: stats.AvgLatency().Milliseconds(),
+ ResponseMaxLatency: stats.MaxLatency.Milliseconds(),
+ ResponseCodeCounts: stats.ResponsesByCode,
+ }
+ enc := json.NewEncoder(w)
+ enc.SetIndent("", " ")
+ return enc.Encode(summary)
+}
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
new file mode 100644
index 00000000000..1bf1ef6ab9b
--- /dev/null
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -0,0 +1,67 @@
+package cmd
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/vespa-engine/vespa/client/go/internal/mock"
+)
+
+type manualClock struct {
+ t time.Time
+ tick time.Duration
+}
+
+func (c *manualClock) now() time.Time {
+ t := c.t
+ c.t = c.t.Add(c.tick)
+ return t
+}
+
+func TestFeed(t *testing.T) {
+ httpClient := &mock.HTTPClient{}
+ clock := &manualClock{tick: time.Second}
+ cli, stdout, stderr := newTestCLI(t)
+ cli.httpClient = httpClient
+ cli.now = clock.now
+
+ td := t.TempDir()
+ jsonFile := filepath.Join(td, "docs.jsonl")
+ err := os.WriteFile(jsonFile, []byte(`{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+}`), 0644)
+
+ require.Nil(t, err)
+
+ httpClient.NextResponseString(200, `{"message":"OK"}`)
+ require.Nil(t, cli.Run("feed", jsonFile))
+
+ assert.Equal(t, "", stderr.String())
+ assert.Equal(t, `{
+ "feeder.seconds": 1.000,
+ "feeder.ok.count": 1,
+ "feeder.ok.rate": 1.000,
+ "feeder.error.count": 0,
+ "feeder.inflight.count": 0,
+ "http.request.count": 1,
+ "http.request.bytes": 25,
+ "http.request.MBps": 0.000,
+ "http.exception.count": 0,
+ "http.response.count": 1,
+ "http.response.bytes": 16,
+ "http.response.MBps": 0.000,
+ "http.response.error.count": 0,
+ "http.response.latency.millis.min": 0,
+ "http.response.latency.millis.avg": 0,
+ "http.response.latency.millis.max": 0,
+ "http.response.code.counts": {
+ "200": 1
+ }
+}
+`, stdout.String())
+}
diff --git a/client/go/internal/cli/cmd/login.go b/client/go/internal/cli/cmd/login.go
index aa6a18b3b38..9ac2262e78d 100644
--- a/client/go/internal/cli/cmd/login.go
+++ b/client/go/internal/cli/cmd/login.go
@@ -35,7 +35,7 @@ func newLoginCmd(cli *CLI) *cobra.Command {
if err != nil {
return err
}
- a, err := auth0.New(cli.config.authConfigPath(), system.Name, system.URL)
+ a, err := auth0.NewClient(cli.httpClient, auth0.Options{ConfigPath: cli.config.authConfigPath(), SystemName: system.Name, SystemURL: system.URL})
if err != nil {
return err
}
@@ -68,16 +68,14 @@ func newLoginCmd(cli *CLI) *cobra.Command {
return fmt.Errorf("login error: %w", err)
}
- log.Print("\n")
- log.Println("Successfully logged in.")
- log.Print("\n")
+ cli.printSuccess("Logged in")
// store the refresh token
secretsStore := &auth.Keyring{}
err = secretsStore.Set(auth.SecretsNamespace, system.Name, res.RefreshToken)
if err != nil {
// log the error but move on
- log.Println("Could not store the refresh token locally, please expect to login again once your access token expired.")
+ cli.printWarning("Could not store the refresh token locally. You may need to login again once your access token expires")
}
creds := auth0.Credentials{
diff --git a/client/go/internal/cli/cmd/logout.go b/client/go/internal/cli/cmd/logout.go
index f2ee6e87ac7..32e7cd9783b 100644
--- a/client/go/internal/cli/cmd/logout.go
+++ b/client/go/internal/cli/cmd/logout.go
@@ -1,8 +1,6 @@
package cmd
import (
- "log"
-
"github.com/spf13/cobra"
"github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0"
)
@@ -24,17 +22,14 @@ func newLogoutCmd(cli *CLI) *cobra.Command {
if err != nil {
return err
}
- a, err := auth0.New(cli.config.authConfigPath(), system.Name, system.URL)
+ a, err := auth0.NewClient(cli.httpClient, auth0.Options{ConfigPath: cli.config.authConfigPath(), SystemName: system.Name, SystemURL: system.URL})
if err != nil {
return err
}
if err := a.RemoveCredentials(); err != nil {
return err
}
-
- log.Print("\n")
- log.Println("Successfully logged out.")
- log.Print("\n")
+ cli.printSuccess("Logged out")
return nil
},
}
diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go
index 70e0afbcd32..5edfd1136e5 100644
--- a/client/go/internal/cli/cmd/root.go
+++ b/client/go/internal/cli/cmd/root.go
@@ -2,6 +2,7 @@
package cmd
import (
+ "crypto/tls"
"encoding/json"
"fmt"
"io"
@@ -16,7 +17,9 @@ import (
"github.com/mattn/go-isatty"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
+ "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0"
+ "github.com/vespa-engine/vespa/client/go/internal/cli/auth/zts"
"github.com/vespa-engine/vespa/client/go/internal/util"
"github.com/vespa-engine/vespa/client/go/internal/version"
"github.com/vespa-engine/vespa/client/go/internal/vespa"
@@ -45,10 +48,13 @@ type CLI struct {
config *Config
version version.Version
- httpClient util.HTTPClient
- exec executor
- isTerminal func() bool
- spinner func(w io.Writer, message string, fn func() error) error
+ httpClient util.HTTPClient
+ auth0Factory auth0Factory
+ ztsFactory ztsFactory
+ exec executor
+ isTerminal func() bool
+ spinner func(w io.Writer, message string, fn func() error) error
+ now func() time.Time
}
// ErrCLI is an error returned to the user. It wraps an exit status, a regular error and optional hints for resolving
@@ -82,6 +88,19 @@ func (c *execSubprocess) Run(name string, args ...string) ([]byte, error) {
return exec.Command(name, args...).Output()
}
+type ztsClient interface {
+ AccessToken(domain string, certficiate tls.Certificate) (string, error)
+}
+
+type auth0Client interface {
+ AccessToken() (string, error)
+ HasCredentials() bool
+}
+
+type auth0Factory func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error)
+
+type ztsFactory func(httpClient util.HTTPClient, url string) (ztsClient, error)
+
// New creates the Vespa CLI, writing output to stdout and stderr, and reading environment variables from environment.
func New(stdout, stderr io.Writer, environment []string) (*CLI, error) {
cmd := &cobra.Command{
@@ -123,6 +142,13 @@ For detailed description of flags and configuration, see 'vespa help config'.
cmd: cmd,
httpClient: util.CreateClient(time.Second * 10),
exec: &execSubprocess{},
+ now: time.Now,
+ auth0Factory: func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) {
+ return auth0.NewClient(httpClient, options)
+ },
+ ztsFactory: func(httpClient util.HTTPClient, url string) (ztsClient, error) {
+ return zts.NewClient(httpClient, url)
+ },
}
cli.isTerminal = func() bool { return isTerminal(cli.Stdout) && isTerminal(cli.Stderr) }
if err := cli.loadConfig(); err != nil {
@@ -251,6 +277,7 @@ func (c *CLI) configureCommands() {
rootCmd.AddCommand(newTestCmd(c)) // test
rootCmd.AddCommand(newVersionCmd(c)) // version
rootCmd.AddCommand(newVisitCmd(c)) // visit
+ rootCmd.AddCommand(newFeedCmd(c)) // feed
}
func (c *CLI) printErr(err error, hints ...string) {
@@ -359,10 +386,9 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta
return nil, fmt.Errorf("invalid cloud target: %s", targetType)
}
apiOptions := vespa.APIOptions{
- System: system,
- TLSOptions: apiTLSOptions,
- APIKey: apiKey,
- AuthConfigPath: authConfigPath,
+ System: system,
+ TLSOptions: apiTLSOptions,
+ APIKey: apiKey,
}
deploymentOptions := vespa.CloudDeploymentOptions{
Deployment: deployment,
@@ -377,7 +403,15 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions) (vespa.Ta
Writer: c.Stdout,
Level: vespa.LogLevel(logLevel),
}
- return vespa.CloudTarget(c.httpClient, apiOptions, deploymentOptions, logOptions)
+ auth0, err := c.auth0Factory(c.httpClient, auth0.Options{ConfigPath: authConfigPath, SystemName: apiOptions.System.Name, SystemURL: apiOptions.System.URL})
+ if err != nil {
+ return nil, err
+ }
+ zts, err := c.ztsFactory(c.httpClient, zts.DefaultURL)
+ if err != nil {
+ return nil, err
+ }
+ return vespa.CloudTarget(c.httpClient, zts, auth0, apiOptions, deploymentOptions, logOptions)
}
// system returns the appropiate system for the target configured in this CLI.
diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go
index d071f9556a2..4a53fe6bed3 100644
--- a/client/go/internal/cli/cmd/test.go
+++ b/client/go/internal/cli/cmd/test.go
@@ -263,7 +263,7 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin
var response *http.Response
if externalEndpoint {
- context.cli.httpClient.UseCertificate([]tls.Certificate{})
+ util.SetCertificate(context.cli.httpClient, []tls.Certificate{})
response, err = context.cli.httpClient.Do(request, 60*time.Second)
} else {
response, err = service.Do(request, 600*time.Second) // Vespa should provide a response within the given request timeout
diff --git a/client/go/internal/cli/cmd/testutil_test.go b/client/go/internal/cli/cmd/testutil_test.go
index 6eade6edd86..61f8dab2264 100644
--- a/client/go/internal/cli/cmd/testutil_test.go
+++ b/client/go/internal/cli/cmd/testutil_test.go
@@ -3,10 +3,13 @@ package cmd
import (
"bytes"
+ "crypto/tls"
"path/filepath"
"testing"
+ "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0"
"github.com/vespa-engine/vespa/client/go/internal/mock"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
)
func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Buffer) {
@@ -23,7 +26,24 @@ func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Bu
if err != nil {
t.Fatal(err)
}
- cli.httpClient = &mock.HTTPClient{}
+ httpClient := &mock.HTTPClient{}
+ cli.httpClient = httpClient
cli.exec = &mock.Exec{}
+ cli.auth0Factory = func(httpClient util.HTTPClient, options auth0.Options) (auth0Client, error) {
+ return &mockAuth0{}, nil
+ }
+ cli.ztsFactory = func(httpClient util.HTTPClient, url string) (ztsClient, error) {
+ return &mockZTS{}, nil
+ }
return cli, &stdout, &stderr
}
+
+type mockZTS struct{}
+
+func (z *mockZTS) AccessToken(domain string, cert tls.Certificate) (string, error) { return "", nil }
+
+type mockAuth0 struct{ hasCredentials bool }
+
+func (a *mockAuth0) AccessToken() (string, error) { return "", nil }
+
+func (a *mockAuth0) HasCredentials() bool { return a.hasCredentials }
diff --git a/client/go/internal/cli/cmd/version.go b/client/go/internal/cli/cmd/version.go
index 864c0668eda..aaf2c42cded 100644
--- a/client/go/internal/cli/cmd/version.go
+++ b/client/go/internal/cli/cmd/version.go
@@ -14,7 +14,7 @@ import (
"github.com/fatih/color"
"github.com/spf13/cobra"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
"github.com/vespa-engine/vespa/client/go/internal/version"
)
diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go
index 84718e846c1..d1fb4f28327 100644
--- a/client/go/internal/mock/http.go
+++ b/client/go/internal/mock/http.go
@@ -2,7 +2,6 @@ package mock
import (
"bytes"
- "crypto/tls"
"io"
"net/http"
"strconv"
@@ -60,4 +59,4 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res
nil
}
-func (c *HTTPClient) UseCertificate(certificates []tls.Certificate) {}
+func (c *HTTPClient) Transport() *http.Transport { return &http.Transport{} }
diff --git a/client/go/internal/util/http.go b/client/go/internal/util/http.go
index 53e3c4b36c2..b18f9a00c6a 100644
--- a/client/go/internal/util/http.go
+++ b/client/go/internal/util/http.go
@@ -7,16 +7,17 @@ import (
"net/http"
"time"
- "github.com/vespa-engine/vespa/client/go/internal/cli/build"
+ "github.com/vespa-engine/vespa/client/go/internal/build"
)
type HTTPClient interface {
Do(request *http.Request, timeout time.Duration) (response *http.Response, error error)
- UseCertificate(certificate []tls.Certificate)
+ Transport() *http.Transport
}
type defaultHTTPClient struct {
- client *http.Client
+ client *http.Client
+ transport *http.Transport
}
func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (response *http.Response, error error) {
@@ -30,13 +31,24 @@ func (c *defaultHTTPClient) Do(request *http.Request, timeout time.Duration) (re
return c.client.Do(request)
}
-func (c *defaultHTTPClient) UseCertificate(certificates []tls.Certificate) {
- c.client.Transport = &http.Transport{TLSClientConfig: &tls.Config{
+func (c *defaultHTTPClient) Transport() *http.Transport { return c.transport }
+
+func SetCertificate(client HTTPClient, certificates []tls.Certificate) {
+ client.Transport().TLSClientConfig = &tls.Config{
Certificates: certificates,
MinVersion: tls.VersionTLS12,
- }}
+ }
}
func CreateClient(timeout time.Duration) HTTPClient {
- return &defaultHTTPClient{client: &http.Client{Timeout: timeout}}
+ transport := http.Transport{
+ ForceAttemptHTTP2: true,
+ }
+ return &defaultHTTPClient{
+ client: &http.Client{
+ Timeout: timeout,
+ Transport: &transport,
+ },
+ transport: &transport,
+ }
}
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
new file mode 100644
index 00000000000..fa15a8a1223
--- /dev/null
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -0,0 +1,63 @@
+package document
+
+import (
+ "fmt"
+ "sync"
+)
+
+// Dispatcher dispatches documents from a queue to a Feeder.
+type Dispatcher struct {
+ concurrencyLevel int
+ feeder Feeder
+ pending chan Document
+ closed bool
+ mu sync.RWMutex
+ wg sync.WaitGroup
+}
+
+func NewDispatcher(feeder Feeder, concurrencyLevel int) *Dispatcher {
+ if concurrencyLevel < 1 {
+ concurrencyLevel = 1
+ }
+ d := &Dispatcher{
+ concurrencyLevel: concurrencyLevel,
+ feeder: feeder,
+ pending: make(chan Document, 4*concurrencyLevel),
+ }
+ d.readPending()
+ return d
+}
+
+func (d *Dispatcher) readPending() {
+ for i := 0; i < d.concurrencyLevel; i++ {
+ d.wg.Add(1)
+ go func(n int) {
+ defer d.wg.Done()
+ for doc := range d.pending {
+ d.feeder.Send(doc)
+ }
+ }(i)
+ }
+}
+
+func (d *Dispatcher) Enqueue(doc Document) error {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ if d.closed {
+ return fmt.Errorf("cannot enqueue document in closed dispatcher")
+ }
+ d.pending <- doc
+ return nil
+}
+
+// Close closes the dispatcher and waits for all goroutines to return.
+func (d *Dispatcher) Close() error {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ if !d.closed {
+ d.closed = true
+ close(d.pending)
+ d.wg.Wait()
+ }
+ return nil
+}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
new file mode 100644
index 00000000000..0e4876a7d4b
--- /dev/null
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -0,0 +1,37 @@
+package document
+
+import (
+ "encoding/json"
+ "sync"
+ "testing"
+)
+
+type mockFeeder struct {
+ documents []Document
+ mu sync.Mutex
+}
+
+func (f *mockFeeder) Send(doc Document) Result {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.documents = append(f.documents, doc)
+ return Result{Id: doc.Id}
+}
+
+func (f *mockFeeder) Stats() Stats { return Stats{} }
+
+func TestDispatcher(t *testing.T) {
+ feeder := &mockFeeder{}
+ dispatcher := NewDispatcher(feeder, 2)
+ docs := []Document{
+ {PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)},
+ {PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)},
+ }
+ for _, d := range docs {
+ dispatcher.Enqueue(d)
+ }
+ dispatcher.Close()
+ if got, want := len(feeder.documents), 2; got != want {
+ t.Errorf("got %d documents, want %d", got, want)
+ }
+}
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
new file mode 100644
index 00000000000..6aeafd80005
--- /dev/null
+++ b/client/go/internal/vespa/document/document.go
@@ -0,0 +1,241 @@
+package document
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+)
+
+var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1}
+
+type Operation int
+
+const (
+ OperationPut = iota
+ OperationUpdate
+ OperationRemove
+)
+
+// Id represents a Vespa document ID.
+type Id struct {
+ Type string
+ Namespace string
+ Number *int64
+ Group string
+ UserSpecific string
+}
+
+func (d Id) Equal(o Id) bool {
+ return d.Type == o.Type &&
+ d.Namespace == o.Namespace &&
+ ((d.Number == nil && o.Number == nil) || *d.Number == *o.Number) &&
+ d.Group == o.Group &&
+ d.UserSpecific == o.UserSpecific
+}
+
+func (d Id) String() string {
+ var sb strings.Builder
+ sb.WriteString("id:")
+ sb.WriteString(d.Namespace)
+ sb.WriteString(":")
+ sb.WriteString(d.Type)
+ sb.WriteString(":")
+ if d.Number != nil {
+ sb.WriteString("n=")
+ sb.WriteString(strconv.FormatInt(*d.Number, 10))
+ } else if d.Group != "" {
+ sb.WriteString("g=")
+ sb.WriteString(d.Group)
+ }
+ sb.WriteString(":")
+ sb.WriteString(d.UserSpecific)
+ return sb.String()
+}
+
+// ParseId parses a serialized document ID string.
+func ParseId(serialized string) (Id, error) {
+ parts := strings.SplitN(serialized, ":", 4)
+ if len(parts) < 4 || parts[0] != "id" {
+ return Id{}, parseError(serialized)
+ }
+ namespace := parts[1]
+ if namespace == "" {
+ return Id{}, parseError(serialized)
+ }
+ docType := parts[2]
+ if docType == "" {
+ return Id{}, parseError(serialized)
+ }
+ rest := strings.SplitN(parts[3], ":", 2)
+ if len(rest) < 2 {
+ return Id{}, parseError(serialized)
+ }
+ options := rest[0]
+ userSpecific := rest[1]
+ var number *int64
+ group := ""
+ if strings.HasPrefix(options, "n=") {
+ n, err := strconv.ParseInt(options[2:], 10, 64)
+ if err != nil {
+ return Id{}, parseError(serialized)
+ }
+ number = &n
+ } else if strings.HasPrefix(options, "g=") {
+ group = options[2:]
+ if len(group) == 0 {
+ return Id{}, parseError(serialized)
+ }
+ } else if options != "" {
+ return Id{}, parseError(serialized)
+ }
+ if userSpecific == "" {
+ return Id{}, parseError(serialized)
+ }
+ return Id{
+ Namespace: namespace,
+ Type: docType,
+ Number: number,
+ Group: group,
+ UserSpecific: userSpecific,
+ }, nil
+}
+
+// Document represents a Vespa document, and its operation.
+type Document struct {
+ Id Id
+ Operation Operation
+
+ IdString string `json:"id"`
+ PutId string `json:"put"`
+ UpdateId string `json:"update"`
+ RemoveId string `json:"remove"`
+ Condition string `json:"condition"`
+ Create bool `json:"create"`
+ Fields json.RawMessage `json:"fields"`
+}
+
+// Body returns the body part of this document, suitable for sending to the /document/v1 API.
+func (d Document) Body() []byte {
+ jsonObject := `{"fields":`
+ body := make([]byte, 0, len(jsonObject)+len(d.Fields)+1)
+ body = append(body, []byte(jsonObject)...)
+ body = append(body, d.Fields...)
+ body = append(body, byte('}'))
+ return body
+}
+
+// Decoder decodes documents from a JSON structure which is either an array of objects, or objects separated by newline.
+type Decoder struct {
+ buf *bufio.Reader
+ dec *json.Decoder
+ array bool
+ jsonl bool
+}
+
+func (d *Decoder) guessMode() error {
+ for !d.array && !d.jsonl {
+ b, err := d.buf.ReadByte()
+ if err != nil {
+ return err
+ }
+ // Skip leading whitespace
+ if b < 0x80 && asciiSpace[b] != 0 {
+ continue
+ }
+ switch rune(b) {
+ case '{':
+ d.jsonl = true
+ case '[':
+ d.array = true
+ default:
+ return fmt.Errorf("unexpected token: %q", string(b))
+ }
+ if err := d.buf.UnreadByte(); err != nil {
+ return err
+ }
+ if d.array {
+ // prepare for decoding objects inside array
+ if _, err := d.dec.Token(); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (d *Decoder) readCloseToken() error {
+ if !d.array {
+ return nil
+ }
+ _, err := d.dec.Token()
+ return err
+}
+
+func (d *Decoder) Decode() (Document, error) {
+ doc, err := d.decode()
+ if err != nil && err != io.EOF {
+ return Document{}, fmt.Errorf("invalid json at byte offset %d: %w", d.dec.InputOffset(), err)
+ }
+ return doc, err
+}
+
+func (d *Decoder) decode() (Document, error) {
+ if err := d.guessMode(); err != nil {
+ return Document{}, err
+ }
+ if !d.dec.More() {
+ err := io.EOF
+ if tokenErr := d.readCloseToken(); tokenErr != nil {
+ err = tokenErr
+ }
+ return Document{}, err
+ }
+ doc := Document{}
+ if err := d.dec.Decode(&doc); err != nil {
+ return Document{}, err
+ }
+ if err := parseDocument(&doc); err != nil {
+ return Document{}, err
+ }
+ return doc, nil
+}
+
+func NewDecoder(r io.Reader) *Decoder {
+ buf := bufio.NewReader(r)
+ return &Decoder{
+ buf: buf,
+ dec: json.NewDecoder(buf),
+ }
+}
+
+func parseDocument(d *Document) error {
+ id := ""
+ if d.IdString != "" {
+ d.Operation = OperationPut
+ id = d.IdString
+ } else if d.PutId != "" {
+ d.Operation = OperationPut
+ id = d.PutId
+ } else if d.UpdateId != "" {
+ d.Operation = OperationUpdate
+ id = d.UpdateId
+ } else if d.RemoveId != "" {
+ d.Operation = OperationRemove
+ id = d.RemoveId
+ } else {
+ return fmt.Errorf("invalid document: missing operation: %v", d)
+ }
+ docId, err := ParseId(id)
+ if err != nil {
+ return err
+ }
+ d.Id = docId
+ return nil
+}
+
+func parseError(value string) error {
+ return fmt.Errorf("invalid document: expected id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>, got %q", value)
+}
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
new file mode 100644
index 00000000000..478dd795dd8
--- /dev/null
+++ b/client/go/internal/vespa/document/document_test.go
@@ -0,0 +1,190 @@
+package document
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "reflect"
+ "strings"
+ "testing"
+)
+
+func ptr[T any](v T) *T { return &v }
+
+func mustParseDocument(d Document) Document {
+ if err := parseDocument(&d); err != nil {
+ panic(err)
+ }
+ return d
+}
+
+func TestParseId(t *testing.T) {
+ tests := []struct {
+ in string
+ out Id
+ fail bool
+ }{
+ {"id:ns:type::user",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: "user",
+ },
+ false,
+ },
+ {"id:ns:type:n=123:user",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ Number: ptr(int64(123)),
+ UserSpecific: "user",
+ },
+ false,
+ },
+ {"id:ns:type:g=foo:user",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ Group: "foo",
+ UserSpecific: "user",
+ },
+ false,
+ },
+ {"id:ns:type::user::specific",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: "user::specific",
+ },
+ false,
+ },
+ {"id:ns:type:::",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: ":",
+ },
+ false,
+ },
+ {"id:ns:type::n=user-specific",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: "n=user-specific",
+ },
+ false,
+ },
+ {"id:ns:type::g=user-specific",
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: "g=user-specific",
+ },
+ false,
+ },
+ {"", Id{}, true},
+ {"foobar", Id{}, true},
+ {"idd:ns:type:user", Id{}, true},
+ {"id:ns::user", Id{}, true},
+ {"id::type:user", Id{}, true},
+ {"id:ns:type:g=:user", Id{}, true},
+ {"id:ns:type:n=:user", Id{}, true},
+ {"id:ns:type:n=foo:user", Id{}, true},
+ {"id:ns:type::", Id{}, true},
+ {"id:ns:type:user-specific:foo-bar", Id{}, true},
+ }
+ for i, tt := range tests {
+ parsed, err := ParseId(tt.in)
+ if err == nil && tt.fail {
+ t.Errorf("#%d: expected error for ParseDocumentId(%q), but got none", i, tt.in)
+ }
+ if err != nil && !tt.fail {
+ t.Errorf("#%d: got unexpected error for ParseDocumentId(%q) = (_, %v)", i, tt.in, err)
+ }
+ if !parsed.Equal(tt.out) {
+ t.Errorf("#%d: ParseDocumentId(%q) = (%s, _), want %s", i, tt.in, parsed, tt.out)
+ }
+ }
+}
+
+func feedInput(jsonl bool) string {
+ operations := []string{
+ `
+{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+}`,
+ `
+{
+ "put": "id:ns:type::doc2",
+ "fields": {"bar": "456"}
+}`,
+ `
+{
+ "remove": "id:ns:type::doc1"
+}
+`}
+ if jsonl {
+ return strings.Join(operations, "\n")
+ }
+ return " \n[" + strings.Join(operations, ",") + "]"
+}
+
+func testDocumentDecoder(t *testing.T, jsonLike string) {
+ t.Helper()
+ r := NewDecoder(strings.NewReader(jsonLike))
+ want := []Document{
+ mustParseDocument(Document{PutId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}),
+ mustParseDocument(Document{PutId: "id:ns:type::doc2", Fields: json.RawMessage(`{"bar": "456"}`)}),
+ mustParseDocument(Document{RemoveId: "id:ns:type::doc1", Fields: json.RawMessage(nil)}),
+ }
+ got := []Document{}
+ for {
+ doc, err := r.Decode()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ t.Fatal(err)
+ }
+ got = append(got, doc)
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got %+v, want %+v", got, want)
+ }
+}
+
+func TestDocumentDecoder(t *testing.T) {
+ testDocumentDecoder(t, feedInput(false))
+ testDocumentDecoder(t, feedInput(true))
+
+ jsonLike := `
+{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "123"}
+}
+{
+ "put": "id:ns:type::doc1",
+ "fields": {"foo": "invalid
+}
+`
+ r := NewDecoder(strings.NewReader(jsonLike))
+ _, err := r.Decode() // first object is valid
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
+ _, err = r.Decode()
+ wantErr := "invalid json at byte offset 60: invalid character '\\n' in string literal"
+ if err.Error() != wantErr {
+ t.Errorf("want error %q, got %q", wantErr, err.Error())
+ }
+}
+
+func TestDocumentBody(t *testing.T) {
+ doc := Document{Fields: json.RawMessage([]byte(`{"foo": "123"}`))}
+ got := doc.Body()
+ want := []byte(`{"fields":{"foo": "123"}}`)
+ if !bytes.Equal(got, want) {
+ t.Errorf("got %q, want %q", got, want)
+ }
+}
diff --git a/client/go/internal/vespa/document/feeder.go b/client/go/internal/vespa/document/feeder.go
new file mode 100644
index 00000000000..732db051dab
--- /dev/null
+++ b/client/go/internal/vespa/document/feeder.go
@@ -0,0 +1,97 @@
+package document
+
+import (
+ "time"
+)
+
+type Status int
+
+const (
+ // StatusSuccess indicates a successful document operation.
+ StatusSuccess Status = iota
+ // StatusConditionNotMet indicates that the document operation itself was successful, but did not satisfy its
+ // test-and-set condition.
+ StatusConditionNotMet
+ // StatusVespaFailure indicates that Vespa failed to process the document operation.
+ StatusVespaFailure
+ // StatusTransportFailure indicates that there was failure in the transport layer error while sending the document
+ // operation to Vespa.
+ StatusTransportFailure
+ // StatusError is a catch-all status for any other error that might occur.
+ StatusError
+)
+
+// Result represents the result of a feeding operation.
+type Result struct {
+ Id Id
+ Status Status
+ Message string
+ Trace string
+ Err error
+}
+
+// Success returns whether status s is considered a success.
+func (s Status) Success() bool { return s == StatusSuccess || s == StatusConditionNotMet }
+
+// Stats represents the summed statistics of a feeder.
+type Stats struct {
+ Requests int64
+ Responses int64
+ ResponsesByCode map[int]int64
+ Errors int64
+ Inflight int64
+ TotalLatency time.Duration
+ MinLatency time.Duration
+ MaxLatency time.Duration
+ BytesSent int64
+ BytesRecv int64
+}
+
+func NewStats() Stats { return Stats{ResponsesByCode: make(map[int]int64)} }
+
+// AvgLatency returns the average latency for a request.
+func (s Stats) AvgLatency() time.Duration {
+ requests := s.Requests
+ if requests == 0 {
+ requests = 1
+ }
+ return s.TotalLatency / time.Duration(requests)
+}
+
+func (s Stats) Successes() int64 {
+ if s.ResponsesByCode == nil {
+ return 0
+ }
+ return s.ResponsesByCode[200]
+}
+
+// Add adds all statistics contained in other to this.
+func (s *Stats) Add(other Stats) {
+ s.Requests += other.Requests
+ s.Responses += other.Responses
+ for code, count := range other.ResponsesByCode {
+ _, ok := s.ResponsesByCode[code]
+ if ok {
+ s.ResponsesByCode[code] += count
+ } else {
+ s.ResponsesByCode[code] = count
+ }
+ }
+ s.Errors += other.Errors
+ s.Inflight += other.Inflight
+ s.TotalLatency += other.TotalLatency
+ if s.MinLatency == 0 || other.MinLatency < s.MinLatency {
+ s.MinLatency = other.MinLatency
+ }
+ if other.MaxLatency > s.MaxLatency {
+ s.MaxLatency = other.MaxLatency
+ }
+ s.BytesSent += other.BytesSent
+ s.BytesRecv += other.BytesRecv
+}
+
+// Feeder is the interface for a consumer of documents.
+type Feeder interface {
+ Send(Document) Result
+ Stats() Stats
+}
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
new file mode 100644
index 00000000000..ad6765aecc8
--- /dev/null
+++ b/client/go/internal/vespa/document/http.go
@@ -0,0 +1,192 @@
+package document
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/vespa-engine/vespa/client/go/internal/util"
+)
+
+// Client represents a HTTP client for the /document/v1/ API.
+type Client struct {
+ options ClientOptions
+ httpClient util.HTTPClient
+ stats Stats
+ mu sync.Mutex
+ now func() time.Time
+}
+
+// ClientOptions specifices the configuration options of a feed client.
+type ClientOptions struct {
+ MaxConnsPerHost int
+ BaseURL string
+ Timeout time.Duration
+ Route string
+ TraceLevel *int
+}
+
+type countingReader struct {
+ reader io.Reader
+ bytesRead int64
+}
+
+func (r *countingReader) Read(p []byte) (int, error) {
+ n, err := r.reader.Read(p)
+ r.bytesRead += int64(n)
+ return n, err
+}
+
+func NewClient(options ClientOptions, httpClient util.HTTPClient) *Client {
+ c := &Client{
+ options: options,
+ httpClient: httpClient,
+ stats: NewStats(),
+ now: time.Now,
+ }
+ httpClient.Transport().MaxConnsPerHost = options.MaxConnsPerHost
+ return c
+}
+
+func (c *Client) queryParams() url.Values {
+ params := url.Values{}
+ if c.options.Timeout > 0 {
+ params.Set("timeout", strconv.FormatInt(c.options.Timeout.Milliseconds(), 10)+"ms")
+ }
+ if c.options.Route != "" {
+ params.Set("route", c.options.Route)
+ }
+ if c.options.TraceLevel != nil {
+ params.Set("tracelevel", strconv.Itoa(*c.options.TraceLevel))
+ }
+ return params
+}
+
+func urlPath(id Id) string {
+ var sb strings.Builder
+ sb.WriteString("/document/v1/")
+ sb.WriteString(url.PathEscape(id.Namespace))
+ sb.WriteString("/")
+ sb.WriteString(url.PathEscape(id.Type))
+ if id.Number != nil {
+ sb.WriteString("/number/")
+ n := uint64(*id.Number)
+ sb.WriteString(strconv.FormatUint(n, 10))
+ } else if id.Group != "" {
+ sb.WriteString("/group/")
+ sb.WriteString(url.PathEscape(id.Group))
+ } else {
+ sb.WriteString("/docid")
+ }
+ sb.WriteString("/")
+ sb.WriteString(url.PathEscape(id.UserSpecific))
+ return sb.String()
+}
+
+func (c *Client) feedURL(d Document, queryParams url.Values) (string, *url.URL, error) {
+ u, err := url.Parse(c.options.BaseURL)
+ if err != nil {
+ return "", nil, fmt.Errorf("invalid base url: %w", err)
+ }
+ httpMethod := ""
+ switch d.Operation {
+ case OperationPut:
+ httpMethod = "POST"
+ case OperationUpdate:
+ httpMethod = "PUT"
+ case OperationRemove:
+ httpMethod = "DELETE"
+ }
+ if d.Condition != "" {
+ queryParams.Set("condition", d.Condition)
+ }
+ if d.Create {
+ queryParams.Set("create", "true")
+ }
+ u.Path = urlPath(d.Id)
+ u.RawQuery = queryParams.Encode()
+ return httpMethod, u, nil
+}
+
+// Send given document the URL configured in this client.
+func (c *Client) Send(document Document) Result {
+ start := c.now()
+ stats := NewStats()
+ stats.Requests = 1
+ defer func() {
+ latency := c.now().Sub(start)
+ stats.TotalLatency = latency
+ stats.MinLatency = latency
+ stats.MaxLatency = latency
+ c.addStats(&stats)
+ }()
+ method, url, err := c.feedURL(document, c.queryParams())
+ if err != nil {
+ stats.Errors = 1
+ return Result{Status: StatusError, Err: err}
+ }
+ body := document.Body()
+ req, err := http.NewRequest(method, url.String(), bytes.NewReader(body))
+ if err != nil {
+ stats.Errors = 1
+ return Result{Status: StatusError, Err: err}
+ }
+ resp, err := c.httpClient.Do(req, c.options.Timeout)
+ if err != nil {
+ stats.Errors = 1
+ return Result{Status: StatusTransportFailure, Err: err}
+ }
+ defer resp.Body.Close()
+ stats.Responses = 1
+ stats.ResponsesByCode = map[int]int64{
+ resp.StatusCode: 1,
+ }
+ stats.BytesSent = int64(len(body))
+ return c.createResult(document.Id, &stats, resp)
+}
+
+func (c *Client) Stats() Stats { return c.stats }
+
+func (c *Client) addStats(stats *Stats) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.stats.Add(*stats)
+}
+
+func (c *Client) createResult(id Id, stats *Stats, resp *http.Response) Result {
+ result := Result{Id: id}
+ switch resp.StatusCode {
+ case 200:
+ result.Status = StatusSuccess
+ case 412:
+ result.Status = StatusConditionNotMet
+ case 502, 504, 507:
+ result.Status = StatusVespaFailure
+ default:
+ result.Status = StatusTransportFailure
+ }
+ var body struct {
+ Message string `json:"message"`
+ Trace json.RawMessage `json:"trace"`
+ }
+ cr := countingReader{reader: resp.Body}
+ jsonDec := json.NewDecoder(&cr)
+ if err := jsonDec.Decode(&body); err != nil {
+ result.Status = StatusError
+ result.Err = fmt.Errorf("failed to decode json response: %w", err)
+ }
+ result.Message = body.Message
+ result.Trace = string(body.Trace)
+ stats.BytesRecv = cr.bytesRead
+ if !result.Status.Success() {
+ stats.Errors = 1
+ }
+ return result
+}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
new file mode 100644
index 00000000000..ca59c4c2af0
--- /dev/null
+++ b/client/go/internal/vespa/document/http_test.go
@@ -0,0 +1,190 @@
+package document
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/vespa-engine/vespa/client/go/internal/mock"
+)
+
+type manualClock struct {
+ t time.Time
+ tick time.Duration
+}
+
+func (c *manualClock) now() time.Time {
+ t := c.t
+ c.t = c.t.Add(c.tick)
+ return t
+}
+
+func TestClientSend(t *testing.T) {
+ docs := []Document{
+ mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc1", Fields: json.RawMessage(`{"foo": "123"}`)}),
+ mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc2", Fields: json.RawMessage(`{"foo": "456"}`)}),
+ mustParseDocument(Document{Create: true, UpdateId: "id:ns:type::doc3", Fields: json.RawMessage(`{"baz": "789"}`)}),
+ }
+ httpClient := mock.HTTPClient{}
+ client := NewClient(ClientOptions{
+ BaseURL: "https://example.com:1337",
+ Timeout: time.Duration(5 * time.Second),
+ }, &httpClient)
+ clock := manualClock{t: time.Now(), tick: time.Second}
+ client.now = clock.now
+ for i, doc := range docs {
+ if i < 2 {
+ httpClient.NextResponseString(200, `{"message":"All good!"}`)
+ } else {
+ httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
+ }
+ res := client.Send(doc)
+ if res.Err != nil {
+ t.Fatalf("got unexpected error %q", res.Err)
+ }
+ r := httpClient.LastRequest
+ if r.Method != http.MethodPut {
+ t.Errorf("got r.Method = %q, want %q", r.Method, http.MethodPut)
+ }
+ wantURL := fmt.Sprintf("https://example.com:1337/document/v1/ns/type/docid/%s?create=true&timeout=5000ms", doc.Id.UserSpecific)
+ if r.URL.String() != wantURL {
+ t.Errorf("got r.URL = %q, want %q", r.URL, wantURL)
+ }
+ body, err := io.ReadAll(r.Body)
+ if err != nil {
+ t.Fatalf("got unexpected error %q", err)
+ }
+ wantBody := doc.Body()
+ if !bytes.Equal(body, wantBody) {
+ t.Errorf("got r.Body = %q, want %q", string(body), string(wantBody))
+ }
+ }
+ stats := client.Stats()
+ want := Stats{
+ Requests: 3,
+ Responses: 3,
+ ResponsesByCode: map[int]int64{
+ 200: 2,
+ 502: 1,
+ },
+ Errors: 1,
+ Inflight: 0,
+ TotalLatency: 3 * time.Second,
+ MinLatency: time.Second,
+ MaxLatency: time.Second,
+ BytesSent: 75,
+ BytesRecv: 82,
+ }
+ if !reflect.DeepEqual(want, stats) {
+ t.Errorf("got %+v, want %+v", stats, want)
+ }
+}
+
+func TestURLPath(t *testing.T) {
+ tests := []struct {
+ in Id
+ out string
+ }{
+ {
+ Id{
+ Namespace: "ns-with-/",
+ Type: "type-with-/",
+ UserSpecific: "user",
+ },
+ "/document/v1/ns-with-%2F/type-with-%2F/docid/user",
+ },
+ {
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ Number: ptr(int64(123)),
+ UserSpecific: "user",
+ },
+ "/document/v1/ns/type/number/123/user",
+ },
+ {
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ Group: "foo",
+ UserSpecific: "user",
+ },
+ "/document/v1/ns/type/group/foo/user",
+ },
+ {
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: "user::specific",
+ },
+ "/document/v1/ns/type/docid/user::specific",
+ },
+ {
+ Id{
+ Namespace: "ns",
+ Type: "type",
+ UserSpecific: ":",
+ },
+ "/document/v1/ns/type/docid/:",
+ },
+ }
+ for i, tt := range tests {
+ path := urlPath(tt.in)
+ if path != tt.out {
+ t.Errorf("#%d: documentPath(%q) = %s, want %s", i, tt.in, path, tt.out)
+ }
+ }
+}
+
+func TestClientFeedURL(t *testing.T) {
+ tests := []struct {
+ in Document
+ method string
+ url string
+ }{
+ {
+ mustParseDocument(Document{
+ IdString: "id:ns:type::user",
+ }),
+ "POST",
+ "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr",
+ },
+ {
+ mustParseDocument(Document{
+ UpdateId: "id:ns:type::user",
+ Create: true,
+ Condition: "false",
+ }),
+ "PUT",
+ "https://example.com/document/v1/ns/type/docid/user?condition=false&create=true&foo=ba%2Fr",
+ },
+ {
+ mustParseDocument(Document{
+ RemoveId: "id:ns:type::user",
+ }),
+ "DELETE",
+ "https://example.com/document/v1/ns/type/docid/user?foo=ba%2Fr",
+ },
+ }
+ httpClient := mock.HTTPClient{}
+ client := NewClient(ClientOptions{
+ BaseURL: "https://example.com",
+ }, &httpClient)
+ for i, tt := range tests {
+ moreParams := url.Values{}
+ moreParams.Set("foo", "ba/r")
+ method, u, err := client.feedURL(tt.in, moreParams)
+ if err != nil {
+ t.Errorf("#%d: got unexpected error = %s, want none", i, err)
+ }
+ if u.String() != tt.url || method != tt.method {
+ t.Errorf("#%d: URL() = (%s, %s), want (%s, %s)", i, method, u.String(), tt.method, tt.url)
+ }
+ }
+}
diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go
index 446b02c05cf..0e173175720 100644
--- a/client/go/internal/vespa/target.go
+++ b/client/go/internal/vespa/target.go
@@ -43,7 +43,8 @@ type Service struct {
BaseURL string
Name string
TLSOptions TLSOptions
- ztsClient ztsClient
+
+ zts zts
httpClient util.HTTPClient
}
@@ -91,11 +92,8 @@ type LogOptions struct {
// Do sends request to this service. Any required authentication happens automatically.
func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Response, error) {
- if s.TLSOptions.KeyPair.Certificate != nil {
- s.httpClient.UseCertificate([]tls.Certificate{s.TLSOptions.KeyPair})
- }
if s.TLSOptions.AthenzDomain != "" {
- accessToken, err := s.ztsClient.AccessToken(s.TLSOptions.AthenzDomain, s.TLSOptions.KeyPair)
+ accessToken, err := s.zts.AccessToken(s.TLSOptions.AthenzDomain, s.TLSOptions.KeyPair)
if err != nil {
return nil, err
}
@@ -107,6 +105,8 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon
return s.httpClient.Do(request, timeout)
}
+func (s *Service) Transport() *http.Transport { return s.httpClient.Transport() }
+
// Wait polls the health check of this service until it succeeds or timeout passes.
func (s *Service) Wait(timeout time.Duration) (int, error) {
url := s.BaseURL
@@ -152,7 +152,7 @@ func waitForOK(client util.HTTPClient, url string, certificate *tls.Certificate,
func wait(client util.HTTPClient, fn responseFunc, reqFn requestFunc, certificate *tls.Certificate, timeout time.Duration) (int, error) {
if certificate != nil {
- client.UseCertificate([]tls.Certificate{*certificate})
+ util.SetCertificate(client, []tls.Certificate{*certificate})
}
var (
httpErr error
diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go
index eb9cc41014a..827d6c6a56a 100644
--- a/client/go/internal/vespa/target_cloud.go
+++ b/client/go/internal/vespa/target_cloud.go
@@ -12,18 +12,15 @@ import (
"strings"
"time"
- "github.com/vespa-engine/vespa/client/go/internal/cli/auth/auth0"
- "github.com/vespa-engine/vespa/client/go/internal/cli/auth/zts"
"github.com/vespa-engine/vespa/client/go/internal/util"
"github.com/vespa-engine/vespa/client/go/internal/version"
)
// CloudOptions configures URL and authentication for a cloud target.
type APIOptions struct {
- System System
- TLSOptions TLSOptions
- APIKey []byte
- AuthConfigPath string
+ System System
+ TLSOptions TLSOptions
+ APIKey []byte
}
// CloudDeploymentOptions configures the deployment to manage through a cloud target.
@@ -38,7 +35,8 @@ type cloudTarget struct {
deploymentOptions CloudDeploymentOptions
logOptions LogOptions
httpClient util.HTTPClient
- ztsClient ztsClient
+ zts zts
+ auth0 auth0
}
type deploymentEndpoint struct {
@@ -64,22 +62,23 @@ type logMessage struct {
Message string `json:"message"`
}
-type ztsClient interface {
+type zts interface {
AccessToken(domain string, certficiate tls.Certificate) (string, error)
}
+type auth0 interface {
+ AccessToken() (string, error)
+}
+
// CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform.
-func CloudTarget(httpClient util.HTTPClient, apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) {
- ztsClient, err := zts.NewClient(zts.DefaultURL, httpClient)
- if err != nil {
- return nil, err
- }
+func CloudTarget(httpClient util.HTTPClient, ztsClient zts, auth0Client auth0, apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) {
return &cloudTarget{
httpClient: httpClient,
apiOptions: apiOptions,
deploymentOptions: deploymentOptions,
logOptions: logOptions,
- ztsClient: ztsClient,
+ zts: ztsClient,
+ auth0: auth0Client,
}, nil
}
@@ -119,13 +118,14 @@ func (t *cloudTarget) IsCloud() bool { return true }
func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment }
func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) {
+ var service *Service
switch name {
case DeployService:
- service := &Service{
+ service = &Service{
Name: name,
BaseURL: t.apiOptions.System.URL,
TLSOptions: t.apiOptions.TLSOptions,
- ztsClient: t.ztsClient,
+ zts: t.zts,
httpClient: t.httpClient,
}
if timeout > 0 {
@@ -137,7 +137,6 @@ func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, c
return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL)
}
}
- return service, nil
case QueryService, DocumentService:
if t.deploymentOptions.ClusterURLs == nil {
if err := t.waitForEndpoints(timeout, runID); err != nil {
@@ -149,15 +148,22 @@ func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, c
return nil, err
}
t.deploymentOptions.TLSOptions.AthenzDomain = t.apiOptions.System.AthenzDomain
- return &Service{
+ service = &Service{
Name: name,
BaseURL: url,
TLSOptions: t.deploymentOptions.TLSOptions,
- ztsClient: t.ztsClient,
+ zts: t.zts,
httpClient: t.httpClient,
- }, nil
+ }
+
+ default:
+ return nil, fmt.Errorf("unknown service: %s", name)
+
+ }
+ if service.TLSOptions.KeyPair.Certificate != nil {
+ util.SetCertificate(service, []tls.Certificate{service.TLSOptions.KeyPair})
}
- return nil, fmt.Errorf("unknown service: %s", name)
+ return service, nil
}
func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error {
@@ -207,11 +213,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error {
}
func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error {
- client, err := auth0.New(t.apiOptions.AuthConfigPath, t.apiOptions.System.Name, t.apiOptions.System.URL)
- if err != nil {
- return err
- }
- accessToken, err := client.GetAccessToken()
+ accessToken, err := t.auth0.AccessToken()
if err != nil {
return err
}
diff --git a/client/go/internal/vespa/target_test.go b/client/go/internal/vespa/target_test.go
index f7731611deb..4f2e361fb39 100644
--- a/client/go/internal/vespa/target_test.go
+++ b/client/go/internal/vespa/target_test.go
@@ -164,6 +164,8 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target {
target, err := CloudTarget(
util.CreateClient(time.Second*10),
+ &mockZTS{},
+ &mockAuth0{},
APIOptions{APIKey: apiKey, System: PublicSystem},
CloudDeploymentOptions{
Deployment: Deployment{
@@ -179,7 +181,7 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target {
}
if ct, ok := target.(*cloudTarget); ok {
ct.apiOptions.System.URL = url
- ct.ztsClient = &mockZTSClient{token: "foo bar"}
+ ct.zts = &mockZTS{token: "foo bar"}
} else {
t.Fatalf("Wrong target type %T", ct)
}
@@ -201,10 +203,14 @@ func assertServiceWait(t *testing.T, expectedStatus int, target Target, service
assert.Equal(t, expectedStatus, status)
}
-type mockZTSClient struct {
- token string
-}
+type mockZTS struct{ token string }
-func (c *mockZTSClient) AccessToken(domain string, certificate tls.Certificate) (string, error) {
+func (c *mockZTS) AccessToken(domain string, certificate tls.Certificate) (string, error) {
return c.token, nil
}
+
+type mockAuth0 struct{}
+
+func (a *mockAuth0) AccessToken() (string, error) { return "", nil }
+
+func (a *mockAuth0) HasCredentials() bool { return true }