summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--client/go/internal/cli/cmd/config.go9
-rw-r--r--client/go/internal/cli/cmd/curl.go22
-rw-r--r--client/go/internal/cli/cmd/curl_test.go1
-rw-r--r--client/go/internal/cli/cmd/deploy.go40
-rw-r--r--client/go/internal/cli/cmd/deploy_test.go33
-rw-r--r--client/go/internal/cli/cmd/document.go3
-rw-r--r--client/go/internal/cli/cmd/document_test.go41
-rw-r--r--client/go/internal/cli/cmd/feed.go10
-rw-r--r--client/go/internal/cli/cmd/feed_test.go4
-rw-r--r--client/go/internal/cli/cmd/prod.go2
-rw-r--r--client/go/internal/cli/cmd/query.go3
-rw-r--r--client/go/internal/cli/cmd/query_test.go8
-rw-r--r--client/go/internal/cli/cmd/root.go61
-rw-r--r--client/go/internal/cli/cmd/status.go149
-rw-r--r--client/go/internal/cli/cmd/status_test.go193
-rw-r--r--client/go/internal/cli/cmd/test.go19
-rw-r--r--client/go/internal/cli/cmd/test_test.go25
-rw-r--r--client/go/internal/cli/cmd/testutil_test.go32
-rw-r--r--client/go/internal/cli/cmd/visit_test.go8
-rw-r--r--client/go/internal/cli/cmd/waiter.go97
-rw-r--r--client/go/internal/mock/http.go5
-rw-r--r--client/go/internal/vespa/deploy.go10
-rw-r--r--client/go/internal/vespa/deploy_test.go14
-rw-r--r--client/go/internal/vespa/log.go2
-rw-r--r--client/go/internal/vespa/target.go159
-rw-r--r--client/go/internal/vespa/target_cloud.go203
-rw-r--r--client/go/internal/vespa/target_custom.go193
-rw-r--r--client/go/internal/vespa/target_test.go338
-rw-r--r--config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java2
-rw-r--r--config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java7
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java4
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java4
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java2
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java12
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java27
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java8
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java6
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java33
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java3
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java17
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java14
-rw-r--r--metrics/pom.xml17
-rw-r--r--metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java2
-rw-r--r--metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java23
-rw-r--r--metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java224
-rw-r--r--metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java13
-rw-r--r--metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java44
-rw-r--r--metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java19
-rw-r--r--metrics/src/test/java/ai/vespa/metrics/MetricSetTest.java (renamed from config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricSetTest.java)53
-rw-r--r--metrics/src/test/java/ai/vespa/metrics/MetricTest.java (renamed from config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricTest.java)2
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.cpp2
-rw-r--r--storage/src/tests/distributor/check_condition_test.cpp17
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp60
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp155
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp40
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp169
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp40
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.cpp2
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.hpp2
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/activecopy.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp22
-rw-r--r--storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h17
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/operations/cancel_scope.cpp52
-rw-r--r--storage/src/vespa/storage/distributor/operations/cancel_scope.h62
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp76
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h21
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp25
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h11
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h26
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp38
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h32
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.h12
-rw-r--r--storage/src/vespa/storage/distributor/statechecker.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp1
-rw-r--r--vespalib/src/tests/alloc/alloc_test.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp2
91 files changed, 2322 insertions, 858 deletions
diff --git a/client/go/internal/cli/cmd/config.go b/client/go/internal/cli/cmd/config.go
index 0a03686dd33..2ebd6b0793e 100644
--- a/client/go/internal/cli/cmd/config.go
+++ b/client/go/internal/cli/cmd/config.go
@@ -52,7 +52,7 @@ most to least preferred:
3. Global config value
4. Default value
-The following flags/options can be configured:
+The following global flags/options can be configured:
application
@@ -96,13 +96,6 @@ e.g. vespa deploy or vespa query. Possible values are:
- hosted: Connect to hosted Vespa (internal platform)
- *url*: Connect to a platform running at given URL.
-wait
-
-Specifies the number of seconds to wait for a service to become ready or
-deployment to complete. Use this to have a potentially long-running command
-block until the operation is complete, e.g. with vespa deploy. Defaults to 0
-(no waiting)
-
zone
Specifies a custom dev or perf zone to use when connecting to a Vespa platform.
diff --git a/client/go/internal/cli/cmd/curl.go b/client/go/internal/cli/cmd/curl.go
index 3009cab2b5e..44540db9ccf 100644
--- a/client/go/internal/cli/cmd/curl.go
+++ b/client/go/internal/cli/cmd/curl.go
@@ -39,7 +39,17 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain
if err != nil {
return err
}
- service, err := target.Service(curlService, time.Duration(waitSecs)*time.Second, 0, cli.config.cluster())
+ var service *vespa.Service
+ useDeploy := curlService == "deploy"
+ waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second)
+ if useDeploy {
+ if cli.config.cluster() != "" {
+ return fmt.Errorf("cannot specify cluster for service %s", curlService)
+ }
+ service, err = target.DeployService()
+ } else {
+ service, err = waiter.Service(target, cli.config.cluster())
+ }
if err != nil {
return err
}
@@ -49,19 +59,15 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain
if err != nil {
return err
}
- switch curlService {
- case vespa.DeployService:
+ if useDeploy {
if err := addAccessToken(c, target); err != nil {
return err
}
- case vespa.DocumentService, vespa.QueryService:
+ } else {
c.CaCertificate = service.TLSOptions.CACertificateFile
c.PrivateKey = service.TLSOptions.PrivateKeyFile
c.Certificate = service.TLSOptions.CertificateFile
- default:
- return fmt.Errorf("service not found: %s", curlService)
}
-
if dryRun {
log.Print(c.String())
} else {
@@ -73,7 +79,7 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain
},
}
cmd.Flags().BoolVarP(&dryRun, "dry-run", "n", false, "Print the curl command that would be executed")
- cmd.Flags().StringVarP(&curlService, "service", "s", "query", "Which service to query. Must be \"deploy\", \"document\" or \"query\"")
+ cmd.Flags().StringVarP(&curlService, "service", "s", "container", "Which service to query. Must be \"deploy\" or \"container\"")
cli.bindWaitFlag(cmd, 60, &waitSecs)
return cmd
}
diff --git a/client/go/internal/cli/cmd/curl_test.go b/client/go/internal/cli/cmd/curl_test.go
index 3eca0726bb4..520cf41e308 100644
--- a/client/go/internal/cli/cmd/curl_test.go
+++ b/client/go/internal/cli/cmd/curl_test.go
@@ -14,7 +14,6 @@ func TestCurl(t *testing.T) {
cli.Environment["VESPA_CLI_ENDPOINTS"] = "{\"endpoints\":[{\"cluster\":\"container\",\"url\":\"http://127.0.0.1:8080\"}]}"
assert.Nil(t, cli.Run("config", "set", "application", "t1.a1.i1"))
assert.Nil(t, cli.Run("config", "set", "target", "cloud"))
- assert.Nil(t, cli.Run("config", "set", "cluster", "container"))
assert.Nil(t, cli.Run("auth", "api-key"))
assert.Nil(t, cli.Run("auth", "cert", "--no-add"))
diff --git a/client/go/internal/cli/cmd/deploy.go b/client/go/internal/cli/cmd/deploy.go
index ef32d7f01b7..cf2e435fea5 100644
--- a/client/go/internal/cli/cmd/deploy.go
+++ b/client/go/internal/cli/cmd/deploy.go
@@ -25,7 +25,7 @@ func newDeployCmd(cli *CLI) *cobra.Command {
copyCert bool
)
cmd := &cobra.Command{
- Use: "deploy [application-directory]",
+ Use: "deploy [application-directory-or-file]",
Short: "Deploy (prepare and activate) an application package",
Long: `Deploy (prepare and activate) an application package.
@@ -73,8 +73,12 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`,
return err
}
}
+ waiter := cli.waiter(false, timeout)
+ if _, err := waiter.DeployService(target); err != nil {
+ return err
+ }
var result vespa.PrepareResult
- if err := cli.spinner(cli.Stderr, "Uploading application package ...", func() error {
+ if err := cli.spinner(cli.Stderr, "Uploading application package...", func() error {
result, err = vespa.Deploy(opts)
return err
}); err != nil {
@@ -84,18 +88,18 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`,
if opts.Target.IsCloud() {
cli.printSuccess("Triggered deployment of ", color.CyanString(pkg.Path), " with run ID ", color.CyanString(strconv.FormatInt(result.ID, 10)))
} else {
- cli.printSuccess("Deployed ", color.CyanString(pkg.Path))
+ cli.printSuccess("Deployed ", color.CyanString(pkg.Path), " with session ID ", color.CyanString(strconv.FormatInt(result.ID, 10)))
printPrepareLog(cli.Stderr, result)
}
if opts.Target.IsCloud() {
- log.Printf("\nUse %s for deployment status, or follow this deployment at", color.CyanString("vespa status"))
+ log.Printf("\nUse %s for deployment status, or follow this deployment at", color.CyanString("vespa status deployment"))
log.Print(color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/%s/instance/%s/job/%s-%s/run/%d",
opts.Target.Deployment().System.ConsoleURL,
opts.Target.Deployment().Application.Tenant, opts.Target.Deployment().Application.Application, opts.Target.Deployment().Zone.Environment,
opts.Target.Deployment().Application.Instance, opts.Target.Deployment().Zone.Environment, opts.Target.Deployment().Zone.Region,
result.ID)))
}
- return waitForQueryService(cli, target, result.ID, timeout)
+ return waitForDeploymentReady(cli, target, result.ID, timeout)
},
}
cmd.Flags().StringVarP(&logLevelArg, "log-level", "l", "error", `Log level for Vespa logs. Must be "error", "warning", "info" or "debug"`)
@@ -107,7 +111,7 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`,
func newPrepareCmd(cli *CLI) *cobra.Command {
return &cobra.Command{
- Use: "prepare application-directory",
+ Use: "prepare [application-directory-or-file]",
Short: "Prepare an application package for activation",
Args: cobra.MaximumNArgs(1),
DisableAutoGenTag: true,
@@ -123,7 +127,7 @@ func newPrepareCmd(cli *CLI) *cobra.Command {
}
opts := vespa.DeploymentOptions{ApplicationPackage: pkg, Target: target}
var result vespa.PrepareResult
- err = cli.spinner(cli.Stderr, "Uploading application package ...", func() error {
+ err = cli.spinner(cli.Stderr, "Uploading application package...", func() error {
result, err = vespa.Prepare(opts)
return err
})
@@ -149,10 +153,6 @@ func newActivateCmd(cli *CLI) *cobra.Command {
DisableAutoGenTag: true,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
- pkg, err := cli.applicationPackageFrom(args, true)
- if err != nil {
- return fmt.Errorf("could not find application package: %w", err)
- }
sessionID, err := cli.config.readSessionID(vespa.DefaultApplication)
if err != nil {
return fmt.Errorf("could not read session id: %w", err)
@@ -162,24 +162,32 @@ func newActivateCmd(cli *CLI) *cobra.Command {
return err
}
timeout := time.Duration(waitSecs) * time.Second
- opts := vespa.DeploymentOptions{ApplicationPackage: pkg, Target: target, Timeout: timeout}
+ waiter := cli.waiter(false, timeout)
+ if _, err := waiter.DeployService(target); err != nil {
+ return err
+ }
+ opts := vespa.DeploymentOptions{Target: target, Timeout: timeout}
err = vespa.Activate(sessionID, opts)
if err != nil {
return err
}
- cli.printSuccess("Activated ", color.CyanString(pkg.Path), " with session ", sessionID)
- return waitForQueryService(cli, target, sessionID, timeout)
+ cli.printSuccess("Activated application with session ", sessionID)
+ return waitForDeploymentReady(cli, target, sessionID, timeout)
},
}
cli.bindWaitFlag(cmd, 60, &waitSecs)
return cmd
}
-func waitForQueryService(cli *CLI, target vespa.Target, sessionOrRunID int64, timeout time.Duration) error {
+func waitForDeploymentReady(cli *CLI, target vespa.Target, sessionOrRunID int64, timeout time.Duration) error {
if timeout == 0 {
return nil
}
- _, err := cli.service(target, vespa.QueryService, sessionOrRunID, cli.config.cluster(), timeout)
+ waiter := cli.waiter(false, timeout)
+ if _, err := waiter.Deployment(target, sessionOrRunID); err != nil {
+ return err
+ }
+ _, err := waiter.Services(target)
return err
}
diff --git a/client/go/internal/cli/cmd/deploy_test.go b/client/go/internal/cli/cmd/deploy_test.go
index 16aa3fd0ed8..d578b2a4629 100644
--- a/client/go/internal/cli/cmd/deploy_test.go
+++ b/client/go/internal/cli/cmd/deploy_test.go
@@ -6,6 +6,7 @@ package cmd
import (
"bytes"
+ "io"
"path/filepath"
"strconv"
"strings"
@@ -61,6 +62,32 @@ Hint: Pass --add-cert to use the certificate of the current application
assert.Contains(t, stdout.String(), "Success: Triggered deployment")
}
+func TestDeployWait(t *testing.T) {
+ cli, stdout, _ := newTestCLI(t)
+ client := &mock.HTTPClient{}
+ cli.httpClient = client
+ cli.retryInterval = 0
+ pkg := "testdata/applications/withSource/src/main/application"
+ // Deploy service is initially unavailable
+ client.NextResponseError(io.EOF)
+ client.NextStatus(500)
+ client.NextStatus(500)
+ // ... then becomes healthy
+ client.NextStatus(200)
+ // Deployment succeeds
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v2/tenant/default/prepareandactivate",
+ Status: 200,
+ Body: []byte(`{"session-id": "1"}`),
+ })
+ mockServiceStatus(client, "foo") // Wait for deployment
+ mockServiceStatus(client, "foo") // Look up services
+ assert.Nil(t, cli.Run("deploy", "--wait=3", pkg))
+ assert.Equal(t,
+ "\nSuccess: Deployed "+pkg+" with session ID 1\n",
+ stdout.String())
+}
+
func TestPrepareZip(t *testing.T) {
assertPrepare("testdata/applications/withTarget/target/application.zip",
[]string{"prepare", "testdata/applications/withTarget/target/application.zip"}, t)
@@ -85,7 +112,7 @@ func TestDeployZipWithURLTargetArgument(t *testing.T) {
cli.httpClient = client
assert.Nil(t, cli.Run(arguments...))
assert.Equal(t,
- "\nSuccess: Deployed "+applicationPackage+"\n",
+ "\nSuccess: Deployed "+applicationPackage+" with session ID 0\n",
stdout.String())
assertDeployRequestMade("http://target:19071", client, t)
}
@@ -161,7 +188,7 @@ func assertDeploy(applicationPackage string, arguments []string, t *testing.T) {
cli.httpClient = client
assert.Nil(t, cli.Run(arguments...))
assert.Equal(t,
- "\nSuccess: Deployed "+applicationPackage+"\n",
+ "\nSuccess: Deployed "+applicationPackage+" with session ID 0\n",
stdout.String())
assertDeployRequestMade("http://127.0.0.1:19071", client, t)
}
@@ -194,7 +221,7 @@ func assertActivate(applicationPackage string, arguments []string, t *testing.T)
}
assert.Nil(t, cli.Run(arguments...))
assert.Equal(t,
- "Success: Activated "+applicationPackage+" with session 42\n",
+ "Success: Activated application with session 42\n",
stdout.String())
url := "http://127.0.0.1:19071/application/v2/tenant/default/session/42/active"
assert.Equal(t, url, client.LastRequest.URL.String())
diff --git a/client/go/internal/cli/cmd/document.go b/client/go/internal/cli/cmd/document.go
index 1e5d1c30f6e..6c46baa297a 100644
--- a/client/go/internal/cli/cmd/document.go
+++ b/client/go/internal/cli/cmd/document.go
@@ -298,7 +298,8 @@ func documentService(cli *CLI, waitSecs int) (*vespa.Service, error) {
if err != nil {
return nil, err
}
- return cli.service(target, vespa.DocumentService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second)
+ waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second)
+ return waiter.Service(target, cli.config.cluster())
}
func printResult(cli *CLI, result util.OperationResult, payloadOnlyOnSuccess bool) error {
diff --git a/client/go/internal/cli/cmd/document_test.go b/client/go/internal/cli/cmd/document_test.go
index bce81da91c5..64319296299 100644
--- a/client/go/internal/cli/cmd/document_test.go
+++ b/client/go/internal/cli/cmd/document_test.go
@@ -79,7 +79,7 @@ func TestDocumentRemoveWithoutIdArgVerbose(t *testing.T) {
func TestDocumentSendMissingId(t *testing.T) {
cli, _, stderr := newTestCLI(t)
- assert.NotNil(t, cli.Run("document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json"))
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json"))
assert.Equal(t,
"Error: no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file\n",
stderr.String())
@@ -87,7 +87,7 @@ func TestDocumentSendMissingId(t *testing.T) {
func TestDocumentSendWithDisagreeingOperations(t *testing.T) {
cli, _, stderr := newTestCLI(t)
- assert.NotNil(t, cli.Run("document", "update", "testdata/A-Head-Full-of-Dreams-Put.json"))
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "update", "testdata/A-Head-Full-of-Dreams-Put.json"))
assert.Equal(t,
"Error: wanted document operation is update, but JSON file specifies put\n",
stderr.String())
@@ -110,20 +110,20 @@ func TestDocumentGet(t *testing.T) {
"id:mynamespace:music::a-head-full-of-dreams", t)
}
-func assertDocumentSend(arguments []string, expectedOperation string, expectedMethod string, expectedDocumentId string, expectedPayloadFile string, t *testing.T) {
+func assertDocumentSend(args []string, expectedOperation string, expectedMethod string, expectedDocumentId string, expectedPayloadFile string, t *testing.T) {
+ t.Helper()
client := &mock.HTTPClient{}
cli, stdout, stderr := newTestCLI(t)
cli.httpClient = client
- documentURL, err := documentServiceURL(client)
- if err != nil {
- t.Fatal(err)
- }
+ documentURL := "http://127.0.0.1:8080"
expectedPath, _ := vespa.IdToURLPath(expectedDocumentId)
expectedURL := documentURL + "/document/v1/" + expectedPath + "?timeout=60000ms"
- assert.Nil(t, cli.Run(arguments...))
+ finalArgs := []string{"-t", documentURL}
+ finalArgs = append(finalArgs, args...)
+ assert.Nil(t, cli.Run(finalArgs...))
verbose := false
- for _, a := range arguments {
+ for _, a := range args {
if a == "-v" {
verbose = true
}
@@ -154,16 +154,15 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe
}
}
-func assertDocumentGet(arguments []string, documentId string, t *testing.T) {
+func assertDocumentGet(args []string, documentId string, t *testing.T) {
client := &mock.HTTPClient{}
- documentURL, err := documentServiceURL(client)
- if err != nil {
- t.Fatal(err)
- }
+ documentURL := "http://127.0.0.1:8080"
client.NextResponseString(200, "{\"fields\":{\"foo\":\"bar\"}}")
cli, stdout, _ := newTestCLI(t)
cli.httpClient = client
- assert.Nil(t, cli.Run(arguments...))
+ finalArgs := []string{"-t", documentURL}
+ finalArgs = append(finalArgs, args...)
+ assert.Nil(t, cli.Run(finalArgs...))
assert.Equal(t,
`{
"fields": {
@@ -182,7 +181,7 @@ func assertDocumentTransportError(t *testing.T, errorMessage string) {
client.NextResponseError(fmt.Errorf(errorMessage))
cli, _, stderr := newTestCLI(t)
cli.httpClient = client
- assert.NotNil(t, cli.Run("document", "put",
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put",
"id:mynamespace:music::a-head-full-of-dreams",
"testdata/A-Head-Full-of-Dreams-Put.json"))
assert.Equal(t,
@@ -195,7 +194,7 @@ func assertDocumentError(t *testing.T, status int, errorMessage string) {
client.NextResponseString(status, errorMessage)
cli, _, stderr := newTestCLI(t)
cli.httpClient = client
- assert.NotNil(t, cli.Run("document", "put",
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put",
"id:mynamespace:music::a-head-full-of-dreams",
"testdata/A-Head-Full-of-Dreams-Put.json"))
assert.Equal(t,
@@ -208,14 +207,10 @@ func assertDocumentServerError(t *testing.T, status int, errorMessage string) {
client.NextResponseString(status, errorMessage)
cli, _, stderr := newTestCLI(t)
cli.httpClient = client
- assert.NotNil(t, cli.Run("document", "put",
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put",
"id:mynamespace:music::a-head-full-of-dreams",
"testdata/A-Head-Full-of-Dreams-Put.json"))
assert.Equal(t,
- "Error: Container (document API) at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n",
+ "Error: container at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n",
stderr.String())
}
-
-func documentServiceURL(client *mock.HTTPClient) (string, error) {
- return "http://127.0.0.1:8080", nil
-}
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 7d4b9cc8042..8b8589baec3 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -12,7 +12,6 @@ import (
"github.com/spf13/cobra"
"github.com/vespa-engine/vespa/client/go/internal/util"
- "github.com/vespa-engine/vespa/client/go/internal/vespa"
"github.com/vespa-engine/vespa/client/go/internal/vespa/document"
)
@@ -57,17 +56,17 @@ type feedOptions struct {
func newFeedCmd(cli *CLI) *cobra.Command {
var options feedOptions
cmd := &cobra.Command{
- Use: "feed FILE [FILE]...",
+ Use: "feed json-file [json-file]...",
Short: "Feed multiple document operations to a Vespa cluster",
Long: `Feed multiple document operations to a Vespa cluster.
This command can be used to feed large amounts of documents to a Vespa cluster
efficiently.
-The contents of FILE must be either a JSON array or JSON objects separated by
+The contents of JSON-FILE must be either a JSON array or JSON objects separated by
newline (JSONL).
-If FILE is a single dash ('-'), documents will be read from standard input.
+If JSON-FILE is a single dash ('-'), documents will be read from standard input.
`,
Example: `$ vespa feed docs.jsonl moredocs.json
$ cat docs.jsonl | vespa feed -`,
@@ -108,8 +107,9 @@ func createServices(n int, timeout time.Duration, waitSecs int, cli *CLI) ([]uti
}
services := make([]util.HTTPClient, 0, n)
baseURL := ""
+ waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second)
for i := 0; i < n; i++ {
- service, err := cli.service(target, vespa.DocumentService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second)
+ service, err := waiter.Service(target, cli.config.cluster())
if err != nil {
return nil, "", err
}
diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go
index fc2c5ec7520..84328cad5fb 100644
--- a/client/go/internal/cli/cmd/feed_test.go
+++ b/client/go/internal/cli/cmd/feed_test.go
@@ -43,7 +43,7 @@ func TestFeed(t *testing.T) {
httpClient.NextResponseString(200, `{"message":"OK"}`)
httpClient.NextResponseString(200, `{"message":"OK"}`)
- require.Nil(t, cli.Run("feed", jsonFile1, jsonFile2))
+ require.Nil(t, cli.Run("feed", "-t", "http://127.0.0.1:8080", jsonFile1, jsonFile2))
assert.Equal(t, "", stderr.String())
want := `{
@@ -113,7 +113,7 @@ func TestFeedInvalid(t *testing.T) {
jsonFile := filepath.Join(td, "docs.jsonl")
require.Nil(t, os.WriteFile(jsonFile, doc, 0644))
httpClient.NextResponseString(200, `{"message":"OK"}`)
- require.NotNil(t, cli.Run("feed", jsonFile))
+ require.NotNil(t, cli.Run("feed", "-t", "http://127.0.0.1:8080", jsonFile))
want := `{
"feeder.seconds": 3.000,
diff --git a/client/go/internal/cli/cmd/prod.go b/client/go/internal/cli/cmd/prod.go
index 3b37197340f..79a6907eef2 100644
--- a/client/go/internal/cli/cmd/prod.go
+++ b/client/go/internal/cli/cmd/prod.go
@@ -114,7 +114,7 @@ type prodDeployOptions struct {
func newProdDeployCmd(cli *CLI) *cobra.Command {
var options prodDeployOptions
cmd := &cobra.Command{
- Use: "deploy",
+ Use: "deploy [application-directory-or-file]",
Aliases: []string{"submit"}, // TODO: Remove in Vespa 9
Short: "Deploy an application to production",
Long: `Deploy an application to production.
diff --git a/client/go/internal/cli/cmd/query.go b/client/go/internal/cli/cmd/query.go
index a5b35052b11..3f849fae99e 100644
--- a/client/go/internal/cli/cmd/query.go
+++ b/client/go/internal/cli/cmd/query.go
@@ -64,7 +64,8 @@ func query(cli *CLI, arguments []string, timeoutSecs, waitSecs int, curl bool) e
if err != nil {
return err
}
- service, err := cli.service(target, vespa.QueryService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second)
+ waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second)
+ service, err := waiter.Service(target, cli.config.cluster())
if err != nil {
return err
}
diff --git a/client/go/internal/cli/cmd/query_test.go b/client/go/internal/cli/cmd/query_test.go
index 1caf6d33e70..6d5adc0508e 100644
--- a/client/go/internal/cli/cmd/query_test.go
+++ b/client/go/internal/cli/cmd/query_test.go
@@ -26,7 +26,7 @@ func TestQueryVerbose(t *testing.T) {
cli, stdout, stderr := newTestCLI(t)
cli.httpClient = client
- assert.Nil(t, cli.Run("query", "-v", "select from sources * where title contains 'foo'"))
+ assert.Nil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "-v", "select from sources * where title contains 'foo'"))
assert.Equal(t, "curl 'http://127.0.0.1:8080/search/?timeout=10s&yql=select+from+sources+%2A+where+title+contains+%27foo%27'\n", stderr.String())
assert.Equal(t, "{\n \"query\": \"result\"\n}\n", stdout.String())
}
@@ -75,7 +75,7 @@ func assertQuery(t *testing.T, expectedQuery string, query ...string) {
cli, stdout, _ := newTestCLI(t)
cli.httpClient = client
- args := []string{"query"}
+ args := []string{"-t", "http://127.0.0.1:8080", "query"}
assert.Nil(t, cli.Run(append(args, query...)...))
assert.Equal(t,
"{\n \"query\": \"result\"\n}\n",
@@ -91,7 +91,7 @@ func assertQueryError(t *testing.T, status int, errorMessage string) {
client.NextResponseString(status, errorMessage)
cli, _, stderr := newTestCLI(t)
cli.httpClient = client
- assert.NotNil(t, cli.Run("query", "yql=select from sources * where title contains 'foo'"))
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "yql=select from sources * where title contains 'foo'"))
assert.Equal(t,
"Error: invalid query: Status "+strconv.Itoa(status)+"\n"+errorMessage+"\n",
stderr.String(),
@@ -103,7 +103,7 @@ func assertQueryServiceError(t *testing.T, status int, errorMessage string) {
client.NextResponseString(status, errorMessage)
cli, _, stderr := newTestCLI(t)
cli.httpClient = client
- assert.NotNil(t, cli.Run("query", "yql=select from sources * where title contains 'foo'"))
+ assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "yql=select from sources * where title contains 'foo'"))
assert.Equal(t,
"Error: Status "+strconv.Itoa(status)+" from container at 127.0.0.1:8080\n"+errorMessage+"\n",
stderr.String(),
diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go
index ad42ea588b0..69fd88c1b2b 100644
--- a/client/go/internal/cli/cmd/root.go
+++ b/client/go/internal/cli/cmd/root.go
@@ -43,6 +43,13 @@ type CLI struct {
Stdout io.Writer
Stderr io.Writer
+ exec executor
+ isTerminal func() bool
+ spinner func(w io.Writer, message string, fn func() error) error
+
+ now func() time.Time
+ retryInterval time.Duration
+
cmd *cobra.Command
config *Config
version version.Version
@@ -51,10 +58,6 @@ type CLI struct {
httpClientFactory func(timeout time.Duration) util.HTTPClient
auth0Factory auth0Factory
ztsFactory ztsFactory
- exec executor
- isTerminal func() bool
- spinner func(w io.Writer, message string, fn func() error) error
- now func() time.Time
}
// ErrCLI is an error returned to the user. It wraps an exit status, a regular error and optional hints for resolving
@@ -100,7 +103,7 @@ type ztsFactory func(httpClient util.HTTPClient, domain, url string) (vespa.Auth
// New creates the Vespa CLI, writing output to stdout and stderr, and reading environment variables from environment.
func New(stdout, stderr io.Writer, environment []string) (*CLI, error) {
cmd := &cobra.Command{
- Use: "vespa command-name",
+ Use: "vespa",
Short: "The command-line tool for Vespa.ai",
Long: `The command-line tool for Vespa.ai.
@@ -134,12 +137,15 @@ For detailed description of flags and configuration, see 'vespa help config'.
Stdout: stdout,
Stderr: stderr,
- version: version,
- cmd: cmd,
+ exec: &execSubprocess{},
+ now: time.Now,
+ retryInterval: 2 * time.Second,
+
+ version: version,
+ cmd: cmd,
+
httpClient: httpClientFactory(time.Second * 10),
httpClientFactory: httpClientFactory,
- exec: &execSubprocess{},
- now: time.Now,
auth0Factory: func(httpClient util.HTTPClient, options auth0.Options) (vespa.Authenticator, error) {
return auth0.NewClient(httpClient, options)
},
@@ -267,9 +273,8 @@ func (c *CLI) configureCommands() {
prodCmd.AddCommand(newProdDeployCmd(c)) // prod deploy
rootCmd.AddCommand(prodCmd) // prod
rootCmd.AddCommand(newQueryCmd(c)) // query
- statusCmd.AddCommand(newStatusQueryCmd(c)) // status query
- statusCmd.AddCommand(newStatusDocumentCmd(c)) // status document
statusCmd.AddCommand(newStatusDeployCmd(c)) // status deploy
+ statusCmd.AddCommand(newStatusDeploymentCmd(c)) // status deployment
rootCmd.AddCommand(statusCmd) // status
rootCmd.AddCommand(newTestCmd(c)) // test
rootCmd.AddCommand(newVersionCmd(c)) // version
@@ -278,7 +283,7 @@ func (c *CLI) configureCommands() {
}
func (c *CLI) bindWaitFlag(cmd *cobra.Command, defaultSecs int, value *int) {
- desc := "Number of seconds to wait for a service to become ready. 0 to disable"
+ desc := "Number of seconds to wait for service(s) to become ready. 0 to disable"
if defaultSecs == 0 {
desc += " (default 0)"
}
@@ -296,6 +301,10 @@ func (c *CLI) printSuccess(msg ...interface{}) {
fmt.Fprintln(c.Stdout, color.GreenString("Success:"), fmt.Sprint(msg...))
}
+func (c *CLI) printInfo(msg ...interface{}) {
+ fmt.Fprintln(c.Stderr, fmt.Sprint(msg...))
+}
+
func (c *CLI) printDebug(msg ...interface{}) {
fmt.Fprintln(c.Stderr, color.CyanString("Debug:"), fmt.Sprint(msg...))
}
@@ -334,6 +343,10 @@ func (c *CLI) confirm(question string, confirmByDefault bool) (bool, error) {
}
}
+func (c *CLI) waiter(once bool, timeout time.Duration) *Waiter {
+ return &Waiter{Once: once, Timeout: timeout, cli: c}
+}
+
// target creates a target according the configuration of this CLI and given opts.
func (c *CLI) target(opts targetOptions) (vespa.Target, error) {
targetType, err := c.targetType()
@@ -402,9 +415,9 @@ func (c *CLI) createCustomTarget(targetType, customURL string) (vespa.Target, er
}
switch targetType {
case vespa.TargetLocal:
- return vespa.LocalTarget(c.httpClient, tlsOptions), nil
+ return vespa.LocalTarget(c.httpClient, tlsOptions, c.retryInterval), nil
case vespa.TargetCustom:
- return vespa.CustomTarget(c.httpClient, customURL, tlsOptions), nil
+ return vespa.CustomTarget(c.httpClient, customURL, tlsOptions, c.retryInterval), nil
default:
return nil, fmt.Errorf("invalid custom target: %s", targetType)
}
@@ -486,7 +499,7 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions, customURL
Writer: c.Stdout,
Level: vespa.LogLevel(logLevel),
}
- return vespa.CloudTarget(c.httpClient, apiAuth, deploymentAuth, apiOptions, deploymentOptions, logOptions)
+ return vespa.CloudTarget(c.httpClient, apiAuth, deploymentAuth, apiOptions, deploymentOptions, logOptions, c.retryInterval)
}
// system returns the appropiate system for the target configured in this CLI.
@@ -504,24 +517,6 @@ func (c *CLI) system(targetType string) (vespa.System, error) {
return vespa.System{}, fmt.Errorf("no default system found for %s target", targetType)
}
-// service returns the service of given name located at target. If non-empty, cluster specifies a cluster to query. This
-// function blocks according to the wait period configured in this CLI. The parameter sessionOrRunID specifies either
-// the session ID (local target) or run ID (cloud target) to wait for.
-func (c *CLI) service(target vespa.Target, name string, sessionOrRunID int64, cluster string, timeout time.Duration) (*vespa.Service, error) {
- if timeout > 0 {
- log.Printf("Waiting up to %s for %s service to become available ...", color.CyanString(timeout.String()), color.CyanString(name))
- }
- s, err := target.Service(name, timeout, sessionOrRunID, cluster)
- if err != nil {
- err := fmt.Errorf("service '%s' is unavailable: %w", name, err)
- if target.IsCloud() {
- return nil, errHint(err, "Confirm that you're communicating with the correct zone and cluster", "The -z option controls the zone", "The -C option controls the cluster")
- }
- return nil, err
- }
- return s, nil
-}
-
// isCI returns true if running inside a continuous integration environment.
func (c *CLI) isCI() bool {
_, ok := c.Environment["CI"]
diff --git a/client/go/internal/cli/cmd/status.go b/client/go/internal/cli/cmd/status.go
index 6570aeff448..f185fde6ca1 100644
--- a/client/go/internal/cli/cmd/status.go
+++ b/client/go/internal/cli/cmd/status.go
@@ -7,6 +7,8 @@ package cmd
import (
"fmt"
"log"
+ "strconv"
+ "strings"
"time"
"github.com/fatih/color"
@@ -17,94 +19,127 @@ import (
func newStatusCmd(cli *CLI) *cobra.Command {
var waitSecs int
cmd := &cobra.Command{
- Use: "status",
- Short: "Verify that a service is ready to use (query by default)",
- Example: `$ vespa status query`,
- DisableAutoGenTag: true,
- SilenceUsage: true,
- Args: cobra.MaximumNArgs(1),
- RunE: func(cmd *cobra.Command, args []string) error {
- return printServiceStatus(cli, vespa.QueryService, waitSecs)
+ Use: "status",
+ Aliases: []string{
+ "status container",
+ "status document", // TODO: Remove on Vespa 9
+ "status query", // TODO: Remove on Vespa 9
},
- }
- cli.bindWaitFlag(cmd, 0, &waitSecs)
- return cmd
-}
-
-func newStatusQueryCmd(cli *CLI) *cobra.Command {
- var waitSecs int
- cmd := &cobra.Command{
- Use: "query",
- Short: "Verify that the query service is ready to use (default)",
- Example: `$ vespa status query`,
+ Short: "Verify that container service(s) are ready to use",
+ Example: `$ vespa status
+$ vespa status --cluster mycluster`,
DisableAutoGenTag: true,
SilenceUsage: true,
- Args: cobra.ExactArgs(0),
+ Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
- return printServiceStatus(cli, vespa.QueryService, waitSecs)
+ cluster := cli.config.cluster()
+ t, err := cli.target(targetOptions{})
+ if err != nil {
+ return err
+ }
+ waiter := cli.waiter(true, time.Duration(waitSecs)*time.Second)
+ if cluster == "" {
+ services, err := waiter.Services(t)
+ if err != nil {
+ return err
+ }
+ if len(services) == 0 {
+ return errHint(fmt.Errorf("no services exist"), "Deployment may not be ready yet", "Try 'vespa status deployment'")
+ }
+ for _, s := range services {
+ printReadyService(s, cli)
+ }
+ return nil
+ } else {
+ s, err := waiter.Service(t, cluster)
+ if err != nil {
+ return err
+ }
+ printReadyService(s, cli)
+ return nil
+ }
},
}
cli.bindWaitFlag(cmd, 0, &waitSecs)
return cmd
}
-func newStatusDocumentCmd(cli *CLI) *cobra.Command {
+func newStatusDeployCmd(cli *CLI) *cobra.Command {
var waitSecs int
cmd := &cobra.Command{
- Use: "document",
- Short: "Verify that the document service is ready to use",
- Example: `$ vespa status document`,
+ Use: "deploy",
+ Short: "Verify that the deploy service is ready to use",
+ Example: `$ vespa status deploy`,
DisableAutoGenTag: true,
SilenceUsage: true,
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
- return printServiceStatus(cli, vespa.DocumentService, waitSecs)
+ t, err := cli.target(targetOptions{})
+ if err != nil {
+ return err
+ }
+ waiter := cli.waiter(true, time.Duration(waitSecs)*time.Second)
+ s, err := waiter.DeployService(t)
+ if err != nil {
+ return err
+ }
+ printReadyService(s, cli)
+ return nil
},
}
cli.bindWaitFlag(cmd, 0, &waitSecs)
return cmd
}
-func newStatusDeployCmd(cli *CLI) *cobra.Command {
+func newStatusDeploymentCmd(cli *CLI) *cobra.Command {
var waitSecs int
cmd := &cobra.Command{
- Use: "deploy",
- Short: "Verify that the deploy service is ready to use",
- Example: `$ vespa status deploy`,
+ Use: "deployment",
+ Short: "Verify that deployment has converged on latest, or given, ID",
+ Example: `$ vespa status deployment
+$ vespa status deployment -t cloud [run-id]
+$ vespa status deployment -t local [session-id]
+`,
DisableAutoGenTag: true,
SilenceUsage: true,
- Args: cobra.ExactArgs(0),
+ Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
- return printServiceStatus(cli, vespa.DeployService, waitSecs)
+ wantedID := vespa.LatestDeployment
+ if len(args) > 0 {
+ n, err := strconv.ParseInt(args[0], 10, 64)
+ if err != nil {
+ return fmt.Errorf("invalid id: %s: %w", args[0], err)
+ }
+ wantedID = n
+ }
+ t, err := cli.target(targetOptions{logLevel: "none"})
+ if err != nil {
+ return err
+ }
+ waiter := cli.waiter(true, time.Duration(waitSecs)*time.Second)
+ id, err := waiter.Deployment(t, wantedID)
+ if err != nil {
+ return err
+ }
+ if t.IsCloud() {
+ log.Printf("Deployment run %s has completed", color.CyanString(strconv.FormatInt(id, 10)))
+ log.Printf("See %s for more details", color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/%s/instance/%s/job/%s-%s/run/%d",
+ t.Deployment().System.ConsoleURL,
+ t.Deployment().Application.Tenant, t.Deployment().Application.Application, t.Deployment().Zone.Environment,
+ t.Deployment().Application.Instance, t.Deployment().Zone.Environment, t.Deployment().Zone.Region,
+ id)))
+ } else {
+ log.Printf("Deployment is %s on config generation %s", color.GreenString("ready"), color.CyanString(strconv.FormatInt(id, 10)))
+ }
+ return nil
},
}
cli.bindWaitFlag(cmd, 0, &waitSecs)
return cmd
}
-func printServiceStatus(cli *CLI, name string, waitSecs int) error {
- t, err := cli.target(targetOptions{})
- if err != nil {
- return err
- }
- cluster := cli.config.cluster()
- s, err := cli.service(t, name, 0, cluster, 0)
- if err != nil {
- return err
- }
- // Wait explicitly
- status, err := s.Wait(time.Duration(waitSecs) * time.Second)
- clusterPart := ""
- if cluster != "" {
- clusterPart = fmt.Sprintf(" named %s", color.CyanString(cluster))
- }
- if status/100 == 2 {
- log.Print(s.Description(), clusterPart, " at ", color.CyanString(s.BaseURL), " is ", color.GreenString("ready"))
- } else {
- if err == nil {
- err = fmt.Errorf("status %d", status)
- }
- return fmt.Errorf("%s%s at %s is %s: %w", s.Description(), clusterPart, color.CyanString(s.BaseURL), color.RedString("not ready"), err)
- }
- return nil
+func printReadyService(s *vespa.Service, cli *CLI) {
+ desc := s.Description()
+ desc = strings.ToUpper(string(desc[0])) + string(desc[1:])
+ log.Print(desc, " at ", color.CyanString(s.BaseURL), " is ", color.GreenString("ready"))
}
diff --git a/client/go/internal/cli/cmd/status_test.go b/client/go/internal/cli/cmd/status_test.go
index 76efea55503..36f51ff5073 100644
--- a/client/go/internal/cli/cmd/status_test.go
+++ b/client/go/internal/cli/cmd/status_test.go
@@ -5,10 +5,12 @@
package cmd
import (
+ "io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/vespa-engine/vespa/client/go/internal/mock"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa"
)
func TestStatusDeployCommand(t *testing.T) {
@@ -23,63 +25,184 @@ func TestStatusDeployCommandWithLocalTarget(t *testing.T) {
assertDeployStatus("http://127.0.0.1:19071", []string{"-t", "local"}, t)
}
-func TestStatusQueryCommand(t *testing.T) {
- assertQueryStatus("http://127.0.0.1:8080", []string{}, t)
+func TestStatusCommand(t *testing.T) {
+ assertStatus("http://127.0.0.1:8080", []string{}, t)
}
-func TestStatusQueryCommandWithUrlTarget(t *testing.T) {
- assertQueryStatus("http://mycontainertarget:8080", []string{"-t", "http://mycontainertarget:8080"}, t)
+func TestStatusCommandMultiCluster(t *testing.T) {
+ client := &mock.HTTPClient{}
+ cli, stdout, stderr := newTestCLI(t)
+ cli.httpClient = client
+
+ mockServiceStatus(client)
+ assert.NotNil(t, cli.Run("status"))
+ assert.Equal(t, "Error: no services exist\nHint: Deployment may not be ready yet\nHint: Try 'vespa status deployment'\n", stderr.String())
+
+ mockServiceStatus(client, "foo", "bar")
+ assert.Nil(t, cli.Run("status"))
+ assert.Equal(t, `Container bar at http://127.0.0.1:8080 is ready
+Container foo at http://127.0.0.1:8080 is ready
+`, stdout.String())
+
+ stdout.Reset()
+ mockServiceStatus(client, "foo", "bar")
+ assert.Nil(t, cli.Run("status", "--cluster", "foo"))
+ assert.Equal(t, "Container foo at http://127.0.0.1:8080 is ready\n", stdout.String())
+}
+
+func TestStatusCommandWithUrlTarget(t *testing.T) {
+ assertStatus("http://mycontainertarget:8080", []string{"-t", "http://mycontainertarget:8080"}, t)
+}
+
+func TestStatusCommandWithLocalTarget(t *testing.T) {
+ assertStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t)
+}
+
+func TestStatusError(t *testing.T) {
+ client := &mock.HTTPClient{}
+ mockServiceStatus(client, "default")
+ client.NextStatus(500)
+ cli, _, stderr := newTestCLI(t)
+ cli.httpClient = client
+ assert.NotNil(t, cli.Run("status", "container"))
+ assert.Equal(t,
+ "Error: unhealthy container default: status 500 at http://127.0.0.1:8080/ApplicationStatus: wait timed out\n",
+ stderr.String())
+
+ stderr.Reset()
+ client.NextResponseError(io.EOF)
+ assert.NotNil(t, cli.Run("status", "container", "-t", "http://example.com"))
+ assert.Equal(t,
+ "Error: unhealthy container at http://example.com/ApplicationStatus: EOF\n",
+ stderr.String())
}
-func TestStatusQueryCommandWithLocalTarget(t *testing.T) {
- assertQueryStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t)
+func TestStatusLocalDeployment(t *testing.T) {
+ client := &mock.HTTPClient{}
+ cli, stdout, stderr := newTestCLI(t)
+ cli.httpClient = client
+ resp := mock.HTTPResponse{
+ URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge",
+ Status: 200,
+ }
+ // Latest generation
+ resp.Body = []byte(`{"currentGeneration": 42, "converged": true}`)
+ client.NextResponse(resp)
+ assert.Nil(t, cli.Run("status", "deployment"))
+ assert.Equal(t, "", stderr.String())
+ assert.Equal(t, "Deployment is ready on config generation 42\n", stdout.String())
+
+ // Latest generation without convergence
+ resp.Body = []byte(`{"currentGeneration": 42, "converged": false}`)
+ client.NextResponse(resp)
+ assert.NotNil(t, cli.Run("status", "deployment"))
+ assert.Equal(t, "Error: deployment not converged on latest generation after waiting 0s: wait timed out\n", stderr.String())
+
+ // Explicit generation
+ stderr.Reset()
+ client.NextResponse(resp)
+ assert.NotNil(t, cli.Run("status", "deployment", "41"))
+ assert.Equal(t, "Error: deployment not converged on generation 41 after waiting 0s: wait timed out\n", stderr.String())
}
-func TestStatusDocumentCommandWithLocalTarget(t *testing.T) {
- assertDocumentStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t)
+func TestStatusCloudDeployment(t *testing.T) {
+ cli, stdout, stderr := newTestCLI(t, "CI=true")
+ app := vespa.ApplicationID{Tenant: "t1", Application: "a1", Instance: "i1"}
+ assert.Nil(t, cli.Run("config", "set", "application", app.String()))
+ assert.Nil(t, cli.Run("config", "set", "target", "cloud"))
+ assert.Nil(t, cli.Run("config", "set", "zone", "dev.us-north-1"))
+ assert.Nil(t, cli.Run("auth", "api-key"))
+ stdout.Reset()
+ client := &mock.HTTPClient{}
+ cli.httpClient = client
+ // Latest run
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1?limit=1",
+ Status: 200,
+ Body: []byte(`{"runs": [{"id": 1337}]}`),
+ })
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/1337?after=-1",
+ Status: 200,
+ Body: []byte(`{"active": false, "status": "success"}`),
+ })
+ assert.Nil(t, cli.Run("status", "deployment"))
+ assert.Equal(t, "", stderr.String())
+ assert.Equal(t,
+ "Deployment run 1337 has completed\nSee https://console.vespa-cloud.com/tenant/t1/application/a1/dev/instance/i1/job/dev-us-north-1/run/1337 for more details\n",
+ stdout.String())
+ // Explicit run
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1",
+ Status: 200,
+ Body: []byte(`{"active": false, "status": "failure"}`),
+ })
+ assert.NotNil(t, cli.Run("status", "deployment", "42"))
+ assert.Equal(t, "Error: deployment run 42 incomplete after waiting 0s: run 42 ended with unsuccessful status: failure\n", stderr.String())
}
-func TestStatusErrorResponse(t *testing.T) {
- assertQueryStatusError("http://127.0.0.1:8080", []string{}, t)
+func isLocalTarget(args []string) bool {
+ for i := 0; i < len(args)-1; i++ {
+ if args[i] == "-t" {
+ return args[i+1] == "local"
+ }
+ }
+ return true // local is default
}
-func assertDeployStatus(target string, args []string, t *testing.T) {
+func assertDeployStatus(expectedTarget string, args []string, t *testing.T) {
+ t.Helper()
client := &mock.HTTPClient{}
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/status.html",
+ Status: 200,
+ })
cli, stdout, _ := newTestCLI(t)
cli.httpClient = client
statusArgs := []string{"status", "deploy"}
assert.Nil(t, cli.Run(append(statusArgs, args...)...))
assert.Equal(t,
- "Deploy API at "+target+" is ready\n",
- stdout.String(),
- "vespa status config-server")
- assert.Equal(t, target+"/status.html", client.LastRequest.URL.String())
+ "Deploy API at "+expectedTarget+" is ready\n",
+ stdout.String())
+ assert.Equal(t, expectedTarget+"/status.html", client.LastRequest.URL.String())
}
-func assertQueryStatus(target string, args []string, t *testing.T) {
+func assertStatus(expectedTarget string, args []string, t *testing.T) {
+ t.Helper()
client := &mock.HTTPClient{}
+ clusterName := ""
+ for i := 0; i < 2; i++ {
+ if isLocalTarget(args) {
+ clusterName = "foo"
+ mockServiceStatus(client, clusterName)
+ }
+ client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200})
+ }
cli, stdout, _ := newTestCLI(t)
cli.httpClient = client
- statusArgs := []string{"status", "query"}
+ statusArgs := []string{"status"}
assert.Nil(t, cli.Run(append(statusArgs, args...)...))
- assert.Equal(t,
- "Container (query API) at "+target+" is ready\n",
- stdout.String(),
- "vespa status container")
- assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String())
-
- statusArgs = []string{"status"}
+ prefix := "Container"
+ if clusterName != "" {
+ prefix += " " + clusterName
+ }
+ assert.Equal(t, prefix+" at "+expectedTarget+" is ready\n", stdout.String())
+ assert.Equal(t, expectedTarget+"/ApplicationStatus", client.LastRequest.URL.String())
+
+ // Test legacy command
+ statusArgs = []string{"status query"}
stdout.Reset()
assert.Nil(t, cli.Run(append(statusArgs, args...)...))
- assert.Equal(t,
- "Container (query API) at "+target+" is ready\n",
- stdout.String(),
- "vespa status (the default)")
- assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String())
+ assert.Equal(t, prefix+" at "+expectedTarget+" is ready\n", stdout.String())
+ assert.Equal(t, expectedTarget+"/ApplicationStatus", client.LastRequest.URL.String())
}
func assertDocumentStatus(target string, args []string, t *testing.T) {
+ t.Helper()
client := &mock.HTTPClient{}
+ if isLocalTarget(args) {
+ mockServiceStatus(client, "default")
+ }
cli, stdout, _ := newTestCLI(t)
cli.httpClient = client
assert.Nil(t, cli.Run("status", "document"))
@@ -89,15 +212,3 @@ func assertDocumentStatus(target string, args []string, t *testing.T) {
"vespa status container")
assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String())
}
-
-func assertQueryStatusError(target string, args []string, t *testing.T) {
- client := &mock.HTTPClient{}
- client.NextStatus(500)
- cli, _, stderr := newTestCLI(t)
- cli.httpClient = client
- assert.NotNil(t, cli.Run("status", "container"))
- assert.Equal(t,
- "Error: Container (query API) at "+target+" is not ready: status 500\n",
- stderr.String(),
- "vespa status container")
-}
diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go
index abee760efbb..5b99973d879 100644
--- a/client/go/internal/cli/cmd/test.go
+++ b/client/go/internal/cli/cmd/test.go
@@ -79,7 +79,7 @@ func runTests(cli *CLI, rootPath string, dryRun bool, waitSecs int) (int, []stri
if err != nil {
return 0, nil, errHint(err, "See https://docs.vespa.ai/en/reference/testing")
}
- context := testContext{testsPath: rootPath, dryRun: dryRun, cli: cli}
+ context := testContext{testsPath: rootPath, dryRun: dryRun, cli: cli, clusters: map[string]*vespa.Service{}}
previousFailed := false
for _, test := range tests {
if !test.IsDir() && filepath.Ext(test.Name()) == ".json" {
@@ -100,7 +100,7 @@ func runTests(cli *CLI, rootPath string, dryRun bool, waitSecs int) (int, []stri
}
}
} else if strings.HasSuffix(stat.Name(), ".json") {
- failure, err := runTest(rootPath, testContext{testsPath: filepath.Dir(rootPath), dryRun: dryRun, cli: cli}, waitSecs)
+ failure, err := runTest(rootPath, testContext{testsPath: filepath.Dir(rootPath), dryRun: dryRun, cli: cli, clusters: map[string]*vespa.Service{}}, waitSecs)
if err != nil {
return 0, nil, err
}
@@ -216,9 +216,16 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin
if err != nil {
return "", "", err
}
- service, err = target.Service(vespa.QueryService, time.Duration(waitSecs)*time.Second, 0, cluster)
- if err != nil {
- return "", "", err
+ ok := false
+ service, ok = context.clusters[cluster]
+ if !ok {
+ // Cache service so we don't have to discover it for every step
+ waiter := context.cli.waiter(false, time.Duration(waitSecs)*time.Second)
+ service, err = waiter.Service(target, cluster)
+ if err != nil {
+ return "", "", err
+ }
+ context.clusters[cluster] = service
}
requestUrl, err = url.ParseRequestURI(service.BaseURL + requestUri)
if err != nil {
@@ -474,6 +481,8 @@ type testContext struct {
lazyTarget vespa.Target
testsPath string
dryRun bool
+ // Cache of services by their cluster name
+ clusters map[string]*vespa.Service
}
func (t *testContext) target() (vespa.Target, error) {
diff --git a/client/go/internal/cli/cmd/test_test.go b/client/go/internal/cli/cmd/test_test.go
index 5d6bb441b2a..4f8e6d49a2a 100644
--- a/client/go/internal/cli/cmd/test_test.go
+++ b/client/go/internal/cli/cmd/test_test.go
@@ -23,20 +23,26 @@ import (
func TestSuite(t *testing.T) {
client := &mock.HTTPClient{}
searchResponse, _ := os.ReadFile("testdata/tests/response.json")
+ mockServiceStatus(client, "container")
client.NextStatus(200)
client.NextStatus(200)
- for i := 0; i < 11; i++ {
+ for i := 0; i < 2; i++ {
+ client.NextResponseString(200, string(searchResponse))
+ }
+ mockServiceStatus(client, "container") // Some tests do not specify cluster, which is fine since we only have one, but this causes a cache miss
+ for i := 0; i < 9; i++ {
client.NextResponseString(200, string(searchResponse))
}
-
expectedBytes, _ := os.ReadFile("testdata/tests/expected-suite.out")
cli, stdout, stderr := newTestCLI(t)
cli.httpClient = client
assert.NotNil(t, cli.Run("test", "testdata/tests/system-test"))
-
+ assert.Equal(t, "", stderr.String())
baseUrl := "http://127.0.0.1:8080"
urlWithQuery := baseUrl + "/search/?presentation.timing=true&query=artist%3A+foo&timeout=3.4s"
- requests := []*http.Request{createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(urlWithQuery), createSearchRequest(urlWithQuery)}
+ discoveryRequest := createSearchRequest("http://127.0.0.1:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge")
+ requests := []*http.Request{discoveryRequest, createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(urlWithQuery), createSearchRequest(urlWithQuery)}
+ requests = append(requests, discoveryRequest)
requests = append(requests, createSearchRequest(baseUrl+"/search/"))
requests = append(requests, createSearchRequest(baseUrl+"/search/?foo=%2F"))
for i := 0; i < 7; i++ {
@@ -95,6 +101,7 @@ func TestSuiteWithoutTests(t *testing.T) {
func TestSingleTest(t *testing.T) {
client := &mock.HTTPClient{}
searchResponse, _ := os.ReadFile("testdata/tests/response.json")
+ mockServiceStatus(client, "container")
client.NextStatus(200)
client.NextStatus(200)
client.NextResponseString(200, string(searchResponse))
@@ -109,7 +116,8 @@ func TestSingleTest(t *testing.T) {
baseUrl := "http://127.0.0.1:8080"
rawUrl := baseUrl + "/search/?presentation.timing=true&query=artist%3A+foo&timeout=3.4s"
- assertRequests([]*http.Request{createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(rawUrl), createSearchRequest(rawUrl)}, client, t)
+ discoveryRequest := createSearchRequest("http://127.0.0.1:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge")
+ assertRequests([]*http.Request{discoveryRequest, createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(rawUrl), createSearchRequest(rawUrl)}, client, t)
}
func TestSingleTestWithCloudAndEndpoints(t *testing.T) {
@@ -172,12 +180,17 @@ func createRequest(method string, uri string, body string) *http.Request {
}
func assertRequests(requests []*http.Request, client *mock.HTTPClient, t *testing.T) {
+ t.Helper()
if assert.Equal(t, len(requests), len(client.Requests)) {
for i, e := range requests {
a := client.Requests[i]
assert.Equal(t, e.URL.String(), a.URL.String())
assert.Equal(t, e.Method, a.Method)
- assert.Equal(t, util.ReaderToJSON(e.Body), util.ReaderToJSON(a.Body))
+ actualBody := a.Body
+ if actualBody == nil {
+ actualBody = io.NopCloser(strings.NewReader(""))
+ }
+ assert.Equal(t, util.ReaderToJSON(e.Body), util.ReaderToJSON(actualBody))
}
}
}
diff --git a/client/go/internal/cli/cmd/testutil_test.go b/client/go/internal/cli/cmd/testutil_test.go
index 61d6c15c5a0..c16c9f8dc50 100644
--- a/client/go/internal/cli/cmd/testutil_test.go
+++ b/client/go/internal/cli/cmd/testutil_test.go
@@ -3,8 +3,10 @@ package cmd
import (
"bytes"
+ "fmt"
"net/http"
"path/filepath"
+ "strings"
"testing"
"time"
@@ -41,6 +43,36 @@ func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Bu
return cli, &stdout, &stderr
}
+func mockServiceStatus(client *mock.HTTPClient, clusterNames ...string) {
+ var serviceObjects []string
+ for _, name := range clusterNames {
+ service := fmt.Sprintf(`{
+ "clusterName": "%s",
+ "host": "localhost",
+ "port": 8080,
+ "type": "container",
+ "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080",
+ "currentGeneration": 1
+ }
+`, name)
+ serviceObjects = append(serviceObjects, service)
+ }
+ services := "[]"
+ if len(serviceObjects) > 0 {
+ services = "[" + strings.Join(serviceObjects, ",") + "]"
+ }
+ response := fmt.Sprintf(`
+{
+ "services": %s,
+ "currentGeneration": 1
+}`, services)
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge",
+ Status: 200,
+ Body: []byte(response),
+ })
+}
+
type mockAuthenticator struct{}
func (a *mockAuthenticator) Authenticate(request *http.Request) error { return nil }
diff --git a/client/go/internal/cli/cmd/visit_test.go b/client/go/internal/cli/cmd/visit_test.go
index f85fb739370..bd806f1d9c9 100644
--- a/client/go/internal/cli/cmd/visit_test.go
+++ b/client/go/internal/cli/cmd/visit_test.go
@@ -93,10 +93,14 @@ func TestRunOneVisit(t *testing.T) {
func withMockClient(t *testing.T, prepCli func(*mock.HTTPClient), runOp func(*vespa.Service)) *http.Request {
client := &mock.HTTPClient{}
+ mockServiceStatus(client, "container")
prepCli(client)
cli, _, _ := newTestCLI(t)
cli.httpClient = client
- service, _ := documentService(cli, 0)
+ service, err := documentService(cli, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
runOp(service)
return client.LastRequest
}
@@ -126,6 +130,7 @@ func TestVisitCommand(t *testing.T) {
}
func assertVisitResults(arguments []string, t *testing.T, responses []string, queryPart, output string) {
+ t.Helper()
client := &mock.HTTPClient{}
client.NextResponseString(200, handlersResponse)
client.NextResponseString(400, clusterStarResponse)
@@ -134,6 +139,7 @@ func assertVisitResults(arguments []string, t *testing.T, responses []string, qu
}
cli, stdout, stderr := newTestCLI(t)
cli.httpClient = client
+ arguments = append(arguments, "-t", "http://127.0.0.1:8080")
assert.Nil(t, cli.Run(arguments...))
assert.Equal(t, output, stdout.String())
assert.Equal(t, "", stderr.String())
diff --git a/client/go/internal/cli/cmd/waiter.go b/client/go/internal/cli/cmd/waiter.go
new file mode 100644
index 00000000000..a1919de798d
--- /dev/null
+++ b/client/go/internal/cli/cmd/waiter.go
@@ -0,0 +1,97 @@
+package cmd
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/fatih/color"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa"
+)
+
+// Waiter waits for Vespa services to become ready, within a timeout.
+type Waiter struct {
+	// Once specifies whether we should wait at least one time, regardless of timeout.
+ Once bool
+
+ // Timeout specifies how long we should wait for an operation to complete.
+ Timeout time.Duration // TODO(mpolden): Consider making this a budget
+
+ cli *CLI
+}
+
+func (w *Waiter) wait() bool { return w.Once || w.Timeout > 0 }
+
+// DeployService returns the service providing the deploy API on the given target.
+func (w *Waiter) DeployService(target vespa.Target) (*vespa.Service, error) {
+ s, err := target.DeployService()
+ if err != nil {
+ return nil, err
+ }
+ if w.Timeout > 0 {
+ w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for ", s.Description(), " to become ready...")
+ }
+ if w.wait() {
+ if err := s.Wait(w.Timeout); err != nil {
+ return nil, err
+ }
+ }
+ return s, nil
+}
+
+// Service returns the service identified by cluster ID, available on target.
+func (w *Waiter) Service(target vespa.Target, cluster string) (*vespa.Service, error) {
+ targetType, err := w.cli.targetType()
+ if err != nil {
+ return nil, err
+ }
+ if targetType.url != "" && cluster != "" {
+		return nil, fmt.Errorf("cluster cannot be specified when target is a URL")
+ }
+ services, err := w.Services(target)
+ if err != nil {
+ return nil, err
+ }
+ service, err := vespa.FindService(cluster, services)
+ if err != nil {
+ return nil, errHint(err, "The --cluster option specifies the service to use")
+ }
+ if w.Timeout > 0 {
+ w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for ", color.CyanString(service.Description()), " to become available...")
+ }
+ if w.wait() {
+ if err := service.Wait(w.Timeout); err != nil {
+ return nil, err
+ }
+ }
+ return service, nil
+}
+
+// Services returns all container services available on target.
+func (w *Waiter) Services(target vespa.Target) ([]*vespa.Service, error) {
+ if w.Timeout > 0 {
+ w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for cluster discovery...")
+ }
+ services, err := target.ContainerServices(w.Timeout)
+ if err != nil {
+ return nil, err
+ }
+ for _, s := range services {
+ if w.Timeout > 0 {
+ w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for ", s.Description(), "...")
+ }
+ if w.wait() {
+ if err := s.Wait(w.Timeout); err != nil {
+ return nil, err
+ }
+ }
+ }
+ return services, nil
+}
+
+// Deployment waits for a deployment to become ready, returning the ID of the converged deployment.
+func (w *Waiter) Deployment(target vespa.Target, id int64) (int64, error) {
+ if w.Timeout > 0 {
+ w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for deployment to converge...")
+ }
+ return target.AwaitDeployment(id, w.Timeout)
+}
diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go
index 3d4ead596b0..8a17448957f 100644
--- a/client/go/internal/mock/http.go
+++ b/client/go/internal/mock/http.go
@@ -2,6 +2,7 @@ package mock
import (
"bytes"
+ "fmt"
"io"
"net/http"
"strconv"
@@ -32,6 +33,7 @@ type HTTPClient struct {
}
type HTTPResponse struct {
+ URI string
Status int
Body []byte
Header http.Header
@@ -65,6 +67,9 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res
response := HTTPResponse{Status: 200}
if len(c.nextResponses) > 0 {
response = c.nextResponses[0]
+ if response.URI != "" && response.URI != request.URL.RequestURI() {
+ return nil, fmt.Errorf("uri of response is %s, which does not match request uri %s", response.URI, request.URL.RequestURI())
+ }
c.nextResponses = c.nextResponses[1:]
}
if c.ReadBody && request.Body != nil {
diff --git a/client/go/internal/vespa/deploy.go b/client/go/internal/vespa/deploy.go
index ae4d4678d66..3a3af0d66a0 100644
--- a/client/go/internal/vespa/deploy.go
+++ b/client/go/internal/vespa/deploy.go
@@ -92,7 +92,7 @@ func (d DeploymentOptions) String() string {
}
func (d *DeploymentOptions) url(path string) (*url.URL, error) {
- service, err := d.Target.Service(DeployService, 0, 0, "")
+ service, err := d.Target.DeployService()
if err != nil {
return nil, err
}
@@ -149,7 +149,7 @@ func Prepare(deployment DeploymentOptions) (PrepareResult, error) {
return PrepareResult{}, err
}
var jsonResponse struct {
- SessionID string `json:"session-id"`
+ SessionID string `json:"session-id"` // API returns ID as string
Log []LogLinePrepareResponse `json:"log"`
}
jsonDec := json.NewDecoder(response.Body)
@@ -311,7 +311,7 @@ func Submit(opts DeploymentOptions, submission Submission) error {
}
func deployServiceDo(request *http.Request, timeout time.Duration, opts DeploymentOptions) (*http.Response, error) {
- s, err := opts.Target.Service(DeployService, 0, 0, "")
+ s, err := opts.Target.DeployService()
if err != nil {
return nil, err
}
@@ -373,7 +373,7 @@ func uploadApplicationPackage(url *url.URL, opts DeploymentOptions) (PrepareResu
if err != nil {
return PrepareResult{}, err
}
- service, err := opts.Target.Service(DeployService, opts.Timeout, 0, "")
+ service, err := opts.Target.DeployService()
if err != nil {
return PrepareResult{}, err
}
@@ -384,7 +384,7 @@ func uploadApplicationPackage(url *url.URL, opts DeploymentOptions) (PrepareResu
defer response.Body.Close()
var jsonResponse struct {
- SessionID string `json:"session-id"` // Config server
+ SessionID string `json:"session-id"` // Config server. API returns ID as string
RunID int64 `json:"run"` // Controller
Log []LogLinePrepareResponse `json:"log"`
diff --git a/client/go/internal/vespa/deploy_test.go b/client/go/internal/vespa/deploy_test.go
index c68ad750f1a..9addf81138a 100644
--- a/client/go/internal/vespa/deploy_test.go
+++ b/client/go/internal/vespa/deploy_test.go
@@ -19,7 +19,7 @@ import (
func TestDeploy(t *testing.T) {
httpClient := mock.HTTPClient{}
- target := LocalTarget(&httpClient, TLSOptions{})
+ target := LocalTarget(&httpClient, TLSOptions{}, 0)
appDir, _ := mock.ApplicationPackageDir(t, false, false)
opts := DeploymentOptions{
Target: target,
@@ -38,7 +38,7 @@ func TestDeploy(t *testing.T) {
func TestDeployCloud(t *testing.T) {
httpClient := mock.HTTPClient{}
- target := createCloudTarget(t, "http://vespacloud", io.Discard)
+ target, _ := createCloudTarget(t, io.Discard)
cloudTarget, ok := target.(*cloudTarget)
require.True(t, ok)
cloudTarget.httpClient = &httpClient
@@ -51,7 +51,7 @@ func TestDeployCloud(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, 1, len(httpClient.Requests))
req := httpClient.LastRequest
- assert.Equal(t, "http://vespacloud/application/v4/tenant/t1/application/a1/instance/i1/deploy/dev-us-north-1", req.URL.String())
+ assert.Equal(t, "https://api-ctl.vespa-cloud.com:4443/application/v4/tenant/t1/application/a1/instance/i1/deploy/dev-us-north-1", req.URL.String())
values := parseMultiPart(t, req)
zipData := values["applicationZip"]
@@ -71,7 +71,7 @@ func TestDeployCloud(t *testing.T) {
func TestSubmit(t *testing.T) {
httpClient := mock.HTTPClient{}
- target := createCloudTarget(t, "http://vespacloud", io.Discard)
+ target, _ := createCloudTarget(t, io.Discard)
cloudTarget, ok := target.(*cloudTarget)
require.True(t, ok)
cloudTarget.httpClient = &httpClient
@@ -170,7 +170,7 @@ func TestFindApplicationPackage(t *testing.T) {
func TestDeactivate(t *testing.T) {
httpClient := mock.HTTPClient{}
- target := LocalTarget(&httpClient, TLSOptions{})
+ target := LocalTarget(&httpClient, TLSOptions{}, 0)
opts := DeploymentOptions{Target: target}
require.Nil(t, Deactivate(opts))
assert.Equal(t, 1, len(httpClient.Requests))
@@ -181,7 +181,7 @@ func TestDeactivate(t *testing.T) {
func TestDeactivateCloud(t *testing.T) {
httpClient := mock.HTTPClient{}
- target := createCloudTarget(t, "http://vespacloud", io.Discard)
+ target, _ := createCloudTarget(t, io.Discard)
cloudTarget, ok := target.(*cloudTarget)
require.True(t, ok)
cloudTarget.httpClient = &httpClient
@@ -190,7 +190,7 @@ func TestDeactivateCloud(t *testing.T) {
assert.Equal(t, 1, len(httpClient.Requests))
req := httpClient.LastRequest
assert.Equal(t, "DELETE", req.Method)
- assert.Equal(t, "http://vespacloud/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", req.URL.String())
+ assert.Equal(t, "https://api-ctl.vespa-cloud.com:4443/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", req.URL.String())
}
type pkgFixture struct {
diff --git a/client/go/internal/vespa/log.go b/client/go/internal/vespa/log.go
index 0e2cb5d0bfd..81088b8c0a1 100644
--- a/client/go/internal/vespa/log.go
+++ b/client/go/internal/vespa/log.go
@@ -72,6 +72,8 @@ func ReadLogEntries(r io.Reader) ([]LogEntry, error) {
// LogLevel returns an int representing a named log level.
func LogLevel(name string) int {
switch name {
+ case "none":
+ return -1
case "error":
return 0
case "warning":
diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go
index 6dd64dd1275..3c65369f986 100644
--- a/client/go/internal/vespa/target.go
+++ b/client/go/internal/vespa/target.go
@@ -4,9 +4,11 @@ package vespa
import (
"crypto/tls"
+ "errors"
"fmt"
"io"
"net/http"
+ "strings"
"sync"
"time"
@@ -27,18 +29,15 @@ const (
// A hosted Vespa target
TargetHosted = "hosted"
- // A Vespa service that handles deployments, either a config server or a controller
- DeployService = "deploy"
+ // LatestDeployment waits for a deployment to converge to latest generation
+ LatestDeployment int64 = -1
- // A Vespa service that handles queries.
- QueryService = "query"
-
- // A Vespa service that handles feeding of document. This may point to the same service as QueryService.
- DocumentService = "document"
-
- retryInterval = 2 * time.Second
+ // AnyDeployment waits for a deployment to converge on any generation
+ AnyDeployment int64 = -2
)
+var errWaitTimeout = errors.New("wait timed out")
+
// Authenticator authenticates the given HTTP request.
type Authenticator interface {
Authenticate(request *http.Request) error
@@ -50,9 +49,11 @@ type Service struct {
Name string
TLSOptions TLSOptions
- once sync.Once
- auth Authenticator
- httpClient util.HTTPClient
+ deployAPI bool
+ once sync.Once
+ auth Authenticator
+ httpClient util.HTTPClient
+ retryInterval time.Duration
}
// Target represents a Vespa platform, running named Vespa services.
@@ -66,8 +67,16 @@ type Target interface {
// Deployment returns the deployment managed by this target.
Deployment() Deployment
- // Service returns the service for given name. If timeout is non-zero, wait for the service to converge.
- Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error)
+ // DeployService returns the service providing the deploy API on this target.
+ DeployService() (*Service, error)
+
+ // ContainerServices returns all container services of the current deployment. If timeout is positive, wait for
+ // services to be discovered.
+ ContainerServices(timeout time.Duration) ([]*Service, error)
+
+ // AwaitDeployment waits for a deployment identified by id to succeed. It returns the id that succeeded, or an
+ // error. The exact meaning of id depends on the implementation.
+ AwaitDeployment(id int64, timeout time.Duration) (int64, error)
// PrintLog writes the logs of this deployment using given options to control output.
PrintLog(options LogOptions) error
@@ -114,29 +123,63 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon
func (s *Service) SetClient(client util.HTTPClient) { s.httpClient = client }
// Wait polls the health check of this service until it succeeds or timeout passes.
-func (s *Service) Wait(timeout time.Duration) (int, error) {
+func (s *Service) Wait(timeout time.Duration) error {
url := s.BaseURL
- switch s.Name {
- case DeployService:
+ if s.deployAPI {
url += "/status.html" // because /ApplicationStatus is not publicly reachable in Vespa Cloud
- case QueryService, DocumentService:
+ } else {
url += "/ApplicationStatus"
- default:
- return 0, fmt.Errorf("invalid service: %s", s.Name)
}
- return waitForOK(s, url, timeout)
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return err
+ }
+ okFunc := func(status int, response []byte) (bool, error) { return isOK(status) }
+ status, err := wait(s, okFunc, func() *http.Request { return req }, timeout, s.retryInterval)
+ if err != nil {
+ waitDesc := ""
+ if timeout > 0 {
+ waitDesc = " (after waiting " + timeout.String() + ") "
+ }
+ statusDesc := ""
+ if status > 0 {
+ statusDesc = fmt.Sprintf(": status %d", status)
+ }
+ return fmt.Errorf("unhealthy %s%s%s at %s: %w", s.Description(), waitDesc, statusDesc, url, err)
+ }
+ return nil
}
func (s *Service) Description() string {
- switch s.Name {
- case QueryService:
- return "Container (query API)"
- case DocumentService:
- return "Container (document API)"
- case DeployService:
- return "Deploy API"
+ if s.deployAPI {
+ return "deploy API"
+ }
+ if s.Name == "" {
+ return "container"
+ }
+ return "container " + s.Name
+}
+
+// FindService returns the service of given name, found among services, if any.
+func FindService(name string, services []*Service) (*Service, error) {
+ if name == "" && len(services) == 1 {
+ return services[0], nil
+ }
+ names := make([]string, len(services))
+ for i, s := range services {
+ if name == s.Name {
+ return s, nil
+ }
+ names[i] = s.Name
}
- return fmt.Sprintf("No description of service %s", s.Name)
+ found := "no services found"
+ if len(names) > 0 {
+ found = "known services: " + strings.Join(names, ", ")
+ }
+ if name != "" {
+ return nil, fmt.Errorf("no such service: %q: %s", name, found)
+ }
+ return nil, fmt.Errorf("no service specified: %s", found)
}
func isOK(status int) (bool, error) {
@@ -145,57 +188,48 @@ func isOK(status int) (bool, error) {
case 2: // success
return true, nil
case 4: // client error
- return false, fmt.Errorf("request failed with status %d", status)
- default: // retry
+ return false, fmt.Errorf("got status %d", status)
+ default: // retry on everything else
return false, nil
}
}
-type responseFunc func(status int, response []byte) (bool, error)
+// responseFunc returns whether an HTTP request is considered successful, based on its status and response data.
+// Returning false indicates that the operation should be retried. An error is returned if the response is considered
+// terminal and the request should not be retried.
+type responseFunc func(status int, response []byte) (ok bool, err error)
type requestFunc func() *http.Request
-// waitForOK queries url and returns its status code. If response status is not 2xx or 4xx, it is repeatedly queried
-// until timeout elapses.
-func waitForOK(service *Service, url string, timeout time.Duration) (int, error) {
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- return 0, err
- }
- okFunc := func(status int, response []byte) (bool, error) {
- ok, err := isOK(status)
- if err != nil {
- return false, fmt.Errorf("failed to query %s at %s: %w", service.Description(), url, err)
- }
- return ok, err
- }
- return wait(service, okFunc, func() *http.Request { return req }, timeout)
-}
-
-func wait(service *Service, fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) {
+// wait queries service until one of the following conditions are satisfied:
+//
+// 1. okFn returns true or a non-nil error
+// 2. timeout is exceeded
+//
+// It returns the last received HTTP status code and error, if any.
+func wait(service *Service, okFn responseFunc, reqFn requestFunc, timeout, retryInterval time.Duration) (int, error) {
var (
- httpErr error
- response *http.Response
- statusCode int
+ status int
+ response *http.Response
+ err error
)
deadline := time.Now().Add(timeout)
loopOnce := timeout == 0
for time.Now().Before(deadline) || loopOnce {
- req := reqFn()
- response, httpErr = service.Do(req, 10*time.Second)
- if httpErr == nil {
- statusCode = response.StatusCode
+ response, err = service.Do(reqFn(), 10*time.Second)
+ if err == nil {
+ status = response.StatusCode
body, err := io.ReadAll(response.Body)
if err != nil {
return 0, err
}
response.Body.Close()
- ok, err := fn(statusCode, body)
+ ok, err := okFn(status, body)
if err != nil {
- return statusCode, err
+ return status, err
}
if ok {
- return statusCode, nil
+ return status, nil
}
}
timeLeft := time.Until(deadline)
@@ -204,5 +238,8 @@ func wait(service *Service, fn responseFunc, reqFn requestFunc, timeout time.Dur
}
time.Sleep(retryInterval)
}
- return statusCode, httpErr
+ if err == nil {
+ return status, errWaitTimeout
+ }
+ return status, err
}
diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go
index c0169f1a9bd..24133ba5fc3 100644
--- a/client/go/internal/vespa/target_cloud.go
+++ b/client/go/internal/vespa/target_cloud.go
@@ -3,12 +3,12 @@ package vespa
import (
"bytes"
"encoding/json"
+ "errors"
"fmt"
"math"
"net/http"
"sort"
"strconv"
- "strings"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -37,6 +37,7 @@ type cloudTarget struct {
httpClient util.HTTPClient
apiAuth Authenticator
deploymentAuth Authenticator
+ retryInterval time.Duration
}
type deploymentEndpoint struct {
@@ -49,13 +50,21 @@ type deploymentResponse struct {
Endpoints []deploymentEndpoint `json:"endpoints"`
}
-type jobResponse struct {
+type runResponse struct {
Active bool `json:"active"`
Status string `json:"status"`
Log map[string][]logMessage `json:"log"`
LastID int64 `json:"lastId"`
}
+type jobResponse struct {
+ ID int64 `json:"id"`
+}
+
+type jobsResponse struct {
+ Runs []jobResponse `json:"runs"`
+}
+
type logMessage struct {
At int64 `json:"at"`
Type string `json:"type"`
@@ -63,7 +72,9 @@ type logMessage struct {
}
// CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform.
-func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAuth Authenticator, apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) {
+func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAuth Authenticator,
+ apiOptions APIOptions, deploymentOptions CloudDeploymentOptions,
+ logOptions LogOptions, retryInterval time.Duration) (Target, error) {
return &cloudTarget{
httpClient: httpClient,
apiOptions: apiOptions,
@@ -71,40 +82,10 @@ func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAu
logOptions: logOptions,
apiAuth: apiAuth,
deploymentAuth: deploymentAuth,
+ retryInterval: retryInterval,
}, nil
}
-func (t *cloudTarget) findClusterURL(cluster string, timeout time.Duration, runID int64) (string, error) {
- if t.deploymentOptions.CustomURL != "" {
- return t.deploymentOptions.CustomURL, nil
- }
- if t.deploymentOptions.ClusterURLs == nil {
- if err := t.waitForEndpoints(timeout, runID); err != nil {
- return "", err
- }
- }
- clusters := make([]string, 0, len(t.deploymentOptions.ClusterURLs))
- for c := range t.deploymentOptions.ClusterURLs {
- clusters = append(clusters, c)
- }
- if cluster == "" {
- for _, url := range t.deploymentOptions.ClusterURLs {
- if len(t.deploymentOptions.ClusterURLs) == 1 {
- return url, nil
- } else {
- return "", fmt.Errorf("no cluster specified: found multiple clusters '%s'", strings.Join(clusters, "', '"))
- }
- }
- } else {
- url, ok := t.deploymentOptions.ClusterURLs[cluster]
- if !ok {
- return "", fmt.Errorf("invalid cluster '%s': must be one of '%s'", cluster, strings.Join(clusters, "', '"))
- }
- return url, nil
- }
- return "", fmt.Errorf("no endpoints found")
-}
-
func (t *cloudTarget) Type() string {
switch t.apiOptions.System.Name {
case MainSystem.Name, CDSystem.Name:
@@ -117,41 +98,52 @@ func (t *cloudTarget) IsCloud() bool { return true }
func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment }
-func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) {
- switch name {
- case DeployService:
+func (t *cloudTarget) DeployService() (*Service, error) {
+ return &Service{
+ BaseURL: t.apiOptions.System.URL,
+ TLSOptions: t.apiOptions.TLSOptions,
+ deployAPI: true,
+ httpClient: t.httpClient,
+ auth: t.apiAuth,
+ retryInterval: t.retryInterval,
+ }, nil
+}
+
+func (t *cloudTarget) ContainerServices(timeout time.Duration) ([]*Service, error) {
+ var clusterUrls map[string]string
+ if t.deploymentOptions.CustomURL != "" {
+ // Custom URL is always preferred
+ clusterUrls = map[string]string{"": t.deploymentOptions.CustomURL}
+ } else if t.deploymentOptions.ClusterURLs != nil {
+ // ... then endpoints specified through environment
+ clusterUrls = t.deploymentOptions.ClusterURLs
+ } else {
+ // ... then discovered endpoints
+ endpoints, err := t.discoverEndpoints(timeout)
+ if err != nil {
+ return nil, err
+ }
+ clusterUrls = endpoints
+ }
+ services := make([]*Service, 0, len(clusterUrls))
+ for name, url := range clusterUrls {
service := &Service{
- Name: name,
- BaseURL: t.apiOptions.System.URL,
- TLSOptions: t.apiOptions.TLSOptions,
- httpClient: t.httpClient,
- auth: t.apiAuth,
+ Name: name,
+ BaseURL: url,
+ TLSOptions: t.deploymentOptions.TLSOptions,
+ httpClient: t.httpClient,
+ auth: t.deploymentAuth,
+ retryInterval: t.retryInterval,
}
if timeout > 0 {
- status, err := service.Wait(timeout)
- if err != nil {
+ if err := service.Wait(timeout); err != nil {
return nil, err
}
- if ok, _ := isOK(status); !ok {
- return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL)
- }
- }
- return service, nil
- case QueryService, DocumentService:
- url, err := t.findClusterURL(cluster, timeout, runID)
- if err != nil {
- return nil, err
}
- return &Service{
- Name: name,
- BaseURL: url,
- TLSOptions: t.deploymentOptions.TLSOptions,
- httpClient: t.httpClient,
- auth: t.deploymentAuth,
- }, nil
- default:
- return nil, fmt.Errorf("unknown service: %s", name)
+ services = append(services, service)
}
+ sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
+ return services, nil
}
func (t *cloudTarget) CheckVersion(clientVersion version.Version) error {
@@ -162,7 +154,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error {
if err != nil {
return err
}
- deployService, err := t.Service(DeployService, 0, 0, "")
+ deployService, err := t.DeployService()
if err != nil {
return err
}
@@ -218,7 +210,7 @@ func (t *cloudTarget) PrintLog(options LogOptions) error {
}
logEntries, err := ReadLogEntries(bytes.NewReader(response))
if err != nil {
- return true, err
+ return false, err
}
for _, le := range logEntries {
if !le.Time.After(lastFrom) {
@@ -238,35 +230,65 @@ func (t *cloudTarget) PrintLog(options LogOptions) error {
if options.Follow {
timeout = math.MaxInt64 // No timeout
}
- _, err = t.deployServiceWait(logFunc, requestFunc, timeout)
- return err
+ // Ignore wait error because logFunc has no concept of completion; we just want to print log entries until timeout is reached
+ if _, err := t.deployServiceWait(logFunc, requestFunc, timeout); err != nil && !errors.Is(err, errWaitTimeout) {
+ return fmt.Errorf("failed to read logs: %s", err)
+ }
+ return nil
}
func (t *cloudTarget) deployServiceWait(fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) {
- deployService, err := t.Service(DeployService, 0, 0, "")
+ deployService, err := t.DeployService()
if err != nil {
return 0, err
}
- return wait(deployService, fn, reqFn, timeout)
+ return wait(deployService, fn, reqFn, timeout, t.retryInterval)
}
-func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error {
- if runID > 0 {
- if err := t.waitForRun(runID, timeout); err != nil {
- return err
+func (t *cloudTarget) discoverLatestRun(timeout time.Duration) (int64, error) {
+ runsURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s?limit=1",
+ t.apiOptions.System.URL,
+ t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
+ t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region)
+ req, err := http.NewRequest("GET", runsURL, nil)
+ if err != nil {
+ return 0, err
+ }
+ requestFunc := func() *http.Request { return req }
+ var lastRunID int64
+ jobsSuccessFunc := func(status int, response []byte) (bool, error) {
+ if ok, err := isOK(status); !ok {
+ return ok, err
+ }
+ var resp jobsResponse
+ if err := json.Unmarshal(response, &resp); err != nil {
+ return false, err
+ }
+ if len(resp.Runs) > 0 {
+ lastRunID = resp.Runs[0].ID
+ return true, nil
}
+ return false, nil
}
- return t.discoverEndpoints(timeout)
+ _, err = t.deployServiceWait(jobsSuccessFunc, requestFunc, timeout)
+ return lastRunID, err
}
-func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
+func (t *cloudTarget) AwaitDeployment(runID int64, timeout time.Duration) (int64, error) {
+ if runID == LatestDeployment {
+ lastRunID, err := t.discoverLatestRun(timeout)
+ if err != nil {
+ return 0, err
+ }
+ runID = lastRunID
+ }
runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d",
t.apiOptions.System.URL,
t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID)
req, err := http.NewRequest("GET", runURL, nil)
if err != nil {
- return err
+ return 0, err
}
lastID := int64(-1)
requestFunc := func() *http.Request {
@@ -275,13 +297,14 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
req.URL.RawQuery = q.Encode()
return req
}
+ success := false
jobSuccessFunc := func(status int, response []byte) (bool, error) {
if ok, err := isOK(status); !ok {
return ok, err
}
- var resp jobResponse
+ var resp runResponse
if err := json.Unmarshal(response, &resp); err != nil {
- return false, nil
+ return false, err
}
if t.logOptions.Writer != nil {
lastID = t.printLog(resp, lastID)
@@ -292,20 +315,27 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
if resp.Status != "success" {
return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status)
}
- return true, nil
+ success = true
+ return success, nil
}
_, err = t.deployServiceWait(jobSuccessFunc, requestFunc, timeout)
- return err
+ if err != nil {
+ return 0, fmt.Errorf("deployment run %d incomplete after waiting %s: %w", runID, timeout, err)
+ }
+ if !success {
+ return 0, fmt.Errorf("deployment run %d incomplete after waiting %s", runID, timeout)
+ }
+ return runID, nil
}
-func (t *cloudTarget) printLog(response jobResponse, last int64) int64 {
+func (t *cloudTarget) printLog(response runResponse, last int64) int64 {
if response.LastID == 0 {
return last
}
var msgs []logMessage
for step, stepMsgs := range response.Log {
for _, msg := range stepMsgs {
- if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 {
+ if (step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level) || LogLevel(msg.Type) == 3 {
continue
}
msgs = append(msgs, msg)
@@ -320,14 +350,14 @@ func (t *cloudTarget) printLog(response jobResponse, last int64) int64 {
return response.LastID
}
-func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error {
+func (t *cloudTarget) discoverEndpoints(timeout time.Duration) (map[string]string, error) {
deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s",
t.apiOptions.System.URL,
t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region)
req, err := http.NewRequest("GET", deploymentURL, nil)
if err != nil {
- return err
+ return nil, err
}
urlsByCluster := make(map[string]string)
endpointFunc := func(status int, response []byte) (bool, error) {
@@ -350,11 +380,10 @@ func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error {
return true, nil
}
if _, err := t.deployServiceWait(endpointFunc, func() *http.Request { return req }, timeout); err != nil {
- return err
+ return nil, fmt.Errorf("no endpoints found after waiting %s: %w", timeout, err)
}
if len(urlsByCluster) == 0 {
- return fmt.Errorf("no endpoints discovered for %s", t.deploymentOptions.Deployment)
+ return nil, fmt.Errorf("no endpoints found after waiting %s", timeout)
}
- t.deploymentOptions.ClusterURLs = urlsByCluster
- return nil
+ return urlsByCluster, nil
}
diff --git a/client/go/internal/vespa/target_custom.go b/client/go/internal/vespa/target_custom.go
index fd0af0e8d53..b0ca4f8492c 100644
--- a/client/go/internal/vespa/target_custom.go
+++ b/client/go/internal/vespa/target_custom.go
@@ -3,8 +3,11 @@ package vespa
import (
"encoding/json"
"fmt"
+ "net"
"net/http"
"net/url"
+ "sort"
+ "strconv"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -12,24 +15,45 @@ import (
)
type customTarget struct {
- targetType string
- baseURL string
- httpClient util.HTTPClient
- tlsOptions TLSOptions
+ targetType string
+ baseURL string
+ httpClient util.HTTPClient
+ tlsOptions TLSOptions
+ retryInterval time.Duration
}
-type serviceConvergeResponse struct {
- Converged bool `json:"converged"`
+type serviceStatus struct {
+ Converged bool `json:"converged"`
+ CurrentGeneration int64 `json:"currentGeneration"`
+ Services []serviceInfo `json:"services"`
+}
+
+type serviceInfo struct {
+ ClusterName string `json:"clusterName"`
+ Type string `json:"type"`
+ Port int `json:"port"`
}
// LocalTarget creates a target for a Vespa platform running locally.
-func LocalTarget(httpClient util.HTTPClient, tlsOptions TLSOptions) Target {
- return &customTarget{targetType: TargetLocal, baseURL: "http://127.0.0.1", httpClient: httpClient, tlsOptions: tlsOptions}
+func LocalTarget(httpClient util.HTTPClient, tlsOptions TLSOptions, retryInterval time.Duration) Target {
+ return &customTarget{
+ targetType: TargetLocal,
+ baseURL: "http://127.0.0.1",
+ httpClient: httpClient,
+ tlsOptions: tlsOptions,
+ retryInterval: retryInterval,
+ }
}
// CustomTarget creates a Target for a Vespa platform running at baseURL.
-func CustomTarget(httpClient util.HTTPClient, baseURL string, tlsOptions TLSOptions) Target {
- return &customTarget{targetType: TargetCustom, baseURL: baseURL, httpClient: httpClient, tlsOptions: tlsOptions}
+func CustomTarget(httpClient util.HTTPClient, baseURL string, tlsOptions TLSOptions, retryInterval time.Duration) Target {
+ return &customTarget{
+ targetType: TargetCustom,
+ baseURL: baseURL,
+ httpClient: httpClient,
+ tlsOptions: tlsOptions,
+ retryInterval: retryInterval,
+ }
}
func (t *customTarget) Type() string { return t.targetType }
@@ -38,95 +62,126 @@ func (t *customTarget) IsCloud() bool { return false }
func (t *customTarget) Deployment() Deployment { return DefaultDeployment }
-func (t *customTarget) createService(name string) (*Service, error) {
- switch name {
- case DeployService, QueryService, DocumentService:
- url, err := t.serviceURL(name, t.targetType)
- if err != nil {
- return nil, err
- }
- return &Service{BaseURL: url, Name: name, httpClient: t.httpClient, TLSOptions: t.tlsOptions}, nil
+func (t *customTarget) PrintLog(options LogOptions) error {
+ return fmt.Errorf("log access is only supported on cloud: run vespa-logfmt on the admin node instead, or export from a container image (here named 'vespa') using docker exec vespa vespa-logfmt")
+}
+
+func (t *customTarget) CheckVersion(version version.Version) error { return nil }
+
+func (t *customTarget) newService(url, name string, deployAPI bool) *Service {
+ return &Service{
+ BaseURL: url,
+ Name: name,
+ deployAPI: deployAPI,
+ httpClient: t.httpClient,
+ TLSOptions: t.tlsOptions,
+ retryInterval: t.retryInterval,
}
- return nil, fmt.Errorf("unknown service: %s", name)
}
-func (t *customTarget) Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) {
- service, err := t.createService(name)
+func (t *customTarget) DeployService() (*Service, error) {
+ if t.targetType == TargetCustom {
+ return t.newService(t.baseURL, "", true), nil
+ }
+ u, err := t.urlWithPort(19071)
if err != nil {
return nil, err
}
- if timeout > 0 {
- if name == DeployService {
- status, err := service.Wait(timeout)
- if err != nil {
- return nil, err
- }
- if ok, _ := isOK(status); !ok {
- return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL)
- }
- } else {
- if err := t.waitForConvergence(timeout); err != nil {
- return nil, err
- }
+ return t.newService(u.String(), "", true), nil
+}
+
+func (t *customTarget) ContainerServices(timeout time.Duration) ([]*Service, error) {
+ if t.targetType == TargetCustom {
+ return []*Service{t.newService(t.baseURL, "", false)}, nil
+ }
+ status, err := t.serviceStatus(AnyDeployment, timeout)
+ if err != nil {
+ return nil, err
+ }
+ portsByCluster := make(map[string]int)
+ for _, serviceInfo := range status.Services {
+ if serviceInfo.Type != "container" {
+ continue
+ }
+ clusterName := serviceInfo.ClusterName
+ if clusterName == "" { // Vespa version older than 8.206.1, which does not include cluster name in the API
+ clusterName = serviceInfo.Type + strconv.Itoa(serviceInfo.Port)
}
+ portsByCluster[clusterName] = serviceInfo.Port
}
- return service, nil
+ var services []*Service
+ for cluster, port := range portsByCluster {
+ url, err := t.urlWithPort(port)
+ if err != nil {
+ return nil, err
+ }
+ service := t.newService(url.String(), cluster, false)
+ services = append(services, service)
+ }
+ sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
+ return services, nil
}
-func (t *customTarget) PrintLog(options LogOptions) error {
- return fmt.Errorf("log access is only supported on cloud: run vespa-logfmt on the admin node instead, or export from a container image (here named 'vespa') using docker exec vespa vespa-logfmt")
+func (t *customTarget) AwaitDeployment(generation int64, timeout time.Duration) (int64, error) {
+ status, err := t.serviceStatus(generation, timeout)
+ if err != nil {
+ return 0, err
+ }
+ return status.CurrentGeneration, nil
}
-func (t *customTarget) CheckVersion(version version.Version) error { return nil }
-
-func (t *customTarget) serviceURL(name string, targetType string) (string, error) {
+func (t *customTarget) urlWithPort(port int) (*url.URL, error) {
u, err := url.Parse(t.baseURL)
if err != nil {
- return "", err
- }
- if targetType == TargetLocal {
- // Use same ports as the vespaengine/vespa container image
- port := ""
- switch name {
- case DeployService:
- port = "19071"
- case QueryService, DocumentService:
- port = "8080"
- default:
- return "", fmt.Errorf("unknown service: %s", name)
- }
- u.Host = u.Host + ":" + port
+ return nil, err
}
- return u.String(), nil
+ if _, _, err := net.SplitHostPort(u.Host); err == nil {
+ return nil, fmt.Errorf("url %s already contains port", u)
+ }
+ u.Host = net.JoinHostPort(u.Host, strconv.Itoa(port))
+ return u, nil
}
-func (t *customTarget) waitForConvergence(timeout time.Duration) error {
- deployService, err := t.createService(DeployService)
+func (t *customTarget) serviceStatus(wantedGeneration int64, timeout time.Duration) (serviceStatus, error) {
+ deployService, err := t.DeployService()
if err != nil {
- return err
+ return serviceStatus{}, err
}
url := fmt.Sprintf("%s/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", deployService.BaseURL)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
- return err
+ return serviceStatus{}, err
}
+ var status serviceStatus
converged := false
- convergedFunc := func(status int, response []byte) (bool, error) {
- if ok, err := isOK(status); !ok {
+ convergedFunc := func(httpStatus int, response []byte) (bool, error) {
+ if ok, err := isOK(httpStatus); !ok {
return ok, err
}
- var resp serviceConvergeResponse
- if err := json.Unmarshal(response, &resp); err != nil {
- return false, nil
+ if err := json.Unmarshal(response, &status); err != nil {
+ return false, err
}
- converged = resp.Converged
+ converged = wantedGeneration == AnyDeployment ||
+ (wantedGeneration == LatestDeployment && status.Converged) ||
+ status.CurrentGeneration == wantedGeneration
return converged, nil
}
- if _, err := wait(deployService, convergedFunc, func() *http.Request { return req }, timeout); err != nil {
- return err
+ if _, err := wait(deployService, convergedFunc, func() *http.Request { return req }, timeout, t.retryInterval); err != nil {
+ return serviceStatus{}, fmt.Errorf("deployment not converged%s after waiting %s: %w", generationDescription(wantedGeneration), timeout, err)
}
if !converged {
- return fmt.Errorf("services have not converged")
+ return serviceStatus{}, fmt.Errorf("deployment not converged%s after waiting %s", generationDescription(wantedGeneration), timeout)
+ }
+ return status, nil
+}
+
+func generationDescription(generation int64) string {
+ switch generation {
+ case AnyDeployment:
+ return ""
+ case LatestDeployment:
+ return " on latest generation"
+ default:
+ return fmt.Sprintf(" on generation %d", generation)
}
- return nil
}
diff --git a/client/go/internal/vespa/target_test.go b/client/go/internal/vespa/target_test.go
index 6dc97f496f5..b208489ddce 100644
--- a/client/go/internal/vespa/target_test.go
+++ b/client/go/internal/vespa/target_test.go
@@ -3,145 +3,224 @@ package vespa
import (
"bytes"
- "fmt"
"io"
"net/http"
- "net/http/httptest"
+ "strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/vespa-engine/vespa/client/go/internal/mock"
- "github.com/vespa-engine/vespa/client/go/internal/util"
"github.com/vespa-engine/vespa/client/go/internal/version"
)
-type mockVespaApi struct {
- deploymentConverged bool
- authFailure bool
- serverURL string
-}
-
-func (v *mockVespaApi) mockVespaHandler(w http.ResponseWriter, req *http.Request) {
- if v.authFailure {
- response := `{"message":"unauthorized"}`
- w.WriteHeader(401)
- w.Write([]byte(response))
- }
- switch req.URL.Path {
- case "/cli/v1/":
- response := `{"minVersion":"8.0.0"}`
- w.Write([]byte(response))
- case "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1":
- response := "{}"
- if v.deploymentConverged {
- response = fmt.Sprintf(`{"endpoints": [{"url": "%s","scope": "zone","cluster": "cluster1"}]}`, v.serverURL)
- }
- w.Write([]byte(response))
- case "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42":
- var response string
- if v.deploymentConverged {
- response = `{"active": false, "status": "success"}`
- } else {
- response = `{"active": true, "status": "running",
- "lastId": 42,
- "log": {"deployReal": [{"at": 1631707708431,
- "type": "info",
- "message": "Deploying platform version 7.465.17 and application version 1.0.2 ..."}]}}`
- }
- w.Write([]byte(response))
- case "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge":
- response := fmt.Sprintf(`{"converged": %t}`, v.deploymentConverged)
- w.Write([]byte(response))
- case "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1/logs":
- log := `1632738690.905535 host1a.dev.aws-us-east-1c 806/53 logserver-container Container.com.yahoo.container.jdisc.ConfiguredApplication info Switching to the latest deployed set of configurations and components. Application config generation: 52532
-1632738698.600189 host1a.dev.aws-us-east-1c 1723/33590 config-sentinel sentinel.sentinel.config-owner config Sentinel got 3 service elements [tenant(vespa-team), application(music), instance(mpolden)] for config generation 52532
-`
- w.Write([]byte(log))
- case "/status.html":
- w.Write([]byte("OK"))
- case "/ApplicationStatus":
- w.WriteHeader(500)
- w.Write([]byte("Unknown error"))
- default:
- w.WriteHeader(400)
- w.Write([]byte("Invalid path: " + req.URL.Path))
+func TestLocalTarget(t *testing.T) {
+ // Local target uses discovery
+ client := &mock.HTTPClient{}
+ lt := LocalTarget(client, TLSOptions{}, 0)
+ assertServiceURL(t, "http://127.0.0.1:19071", lt, "deploy")
+ for i := 0; i < 2; i++ {
+ response := `
+{
+ "services": [
+ {
+ "host": "foo",
+ "port": 8080,
+ "type": "container",
+ "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080",
+ "currentGeneration": 1
+ },
+ {
+ "host": "bar",
+ "port": 8080,
+ "type": "container",
+ "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080",
+ "currentGeneration": 1
+ },
+ {
+ "clusterName": "feed",
+ "host": "localhost",
+ "port": 8081,
+ "type": "container",
+ "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8081",
+ "currentGeneration": 1
+ },
+ {
+ "host": "localhost",
+ "port": 19112,
+ "type": "searchnode",
+ "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:19112",
+ "currentGeneration": 1
+ }
+ ],
+ "currentGeneration": 1
+}`
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge",
+ Status: 200,
+ Body: []byte(response),
+ })
}
+ assertServiceURL(t, "http://127.0.0.1:8080", lt, "container8080")
+ assertServiceURL(t, "http://127.0.0.1:8081", lt, "feed")
}
func TestCustomTarget(t *testing.T) {
- lt := LocalTarget(&mock.HTTPClient{}, TLSOptions{})
- assertServiceURL(t, "http://127.0.0.1:19071", lt, "deploy")
- assertServiceURL(t, "http://127.0.0.1:8080", lt, "query")
- assertServiceURL(t, "http://127.0.0.1:8080", lt, "document")
-
- ct := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42", TLSOptions{})
+ // Custom target always uses URL directly, without discovery
+ ct := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42", TLSOptions{}, 0)
assertServiceURL(t, "http://192.0.2.42", ct, "deploy")
- assertServiceURL(t, "http://192.0.2.42", ct, "query")
- assertServiceURL(t, "http://192.0.2.42", ct, "document")
-
- ct2 := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42:60000", TLSOptions{})
+ assertServiceURL(t, "http://192.0.2.42", ct, "")
+ ct2 := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42:60000", TLSOptions{}, 0)
assertServiceURL(t, "http://192.0.2.42:60000", ct2, "deploy")
- assertServiceURL(t, "http://192.0.2.42:60000", ct2, "query")
- assertServiceURL(t, "http://192.0.2.42:60000", ct2, "document")
+ assertServiceURL(t, "http://192.0.2.42:60000", ct2, "")
}
func TestCustomTargetWait(t *testing.T) {
- vc := mockVespaApi{}
- srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler))
- defer srv.Close()
- target := CustomTarget(util.CreateClient(time.Second*10), srv.URL, TLSOptions{})
+ client := &mock.HTTPClient{}
+ target := CustomTarget(client, "http://192.0.2.42", TLSOptions{}, 0)
+ // Fails once
+ client.NextStatus(500)
+ assertService(t, true, target, "", 0)
+ // Fails multiple times
+ for i := 0; i < 3; i++ {
+ client.NextStatus(500)
+ client.NextResponseError(io.EOF)
+ }
+ // Then succeeds
+ client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200})
+ assertService(t, false, target, "", time.Second)
+}
+
+func TestCustomTargetAwaitDeployment(t *testing.T) {
+ client := &mock.HTTPClient{}
+ target := CustomTarget(client, "http://192.0.2.42", TLSOptions{}, 0)
- _, err := target.Service("query", time.Millisecond, 42, "")
+ // Not converged initially
+ _, err := target.AwaitDeployment(42, 0)
assert.NotNil(t, err)
- vc.deploymentConverged = true
- _, err = target.Service("query", time.Millisecond, 42, "")
- assert.Nil(t, err)
+ // Not converged on this generation
+ response := mock.HTTPResponse{
+ URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge",
+ Status: 200,
+ Body: []byte(`{"currentGeneration": 42}`),
+ }
+ client.NextResponse(response)
+ _, err = target.AwaitDeployment(41, 0)
+ assert.NotNil(t, err)
- assertServiceWait(t, 200, target, "deploy")
- assertServiceWait(t, 500, target, "query")
- assertServiceWait(t, 500, target, "document")
+ // Converged
+ client.NextResponse(response)
+ convergedID, err := target.AwaitDeployment(42, 0)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(42), convergedID)
}
func TestCloudTargetWait(t *testing.T) {
- vc := mockVespaApi{}
- srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler))
- defer srv.Close()
- vc.serverURL = srv.URL
-
var logWriter bytes.Buffer
- target := createCloudTarget(t, srv.URL, &logWriter)
- vc.authFailure = true
- assertServiceWaitErr(t, 401, true, target, "deploy")
- vc.authFailure = false
- assertServiceWait(t, 200, target, "deploy")
+ target, client := createCloudTarget(t, &logWriter)
+ client.NextStatus(401)
+ assertService(t, true, target, "deploy", time.Second) // No retrying on 4xx
+ client.NextStatus(500)
+ client.NextStatus(500)
+ client.NextResponse(mock.HTTPResponse{URI: "/status.html", Status: 200})
+ assertService(t, false, target, "deploy", time.Second)
- _, err := target.Service("query", time.Millisecond, 42, "")
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1",
+ Status: 200,
+ Body: []byte(`{"endpoints":[]}`),
+ })
+ _, err := target.ContainerServices(time.Millisecond)
assert.NotNil(t, err)
- vc.deploymentConverged = true
- _, err = target.Service("query", time.Millisecond, 42, "")
+ response := mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1",
+ Status: 200,
+ Body: []byte(`{
+ "endpoints": [
+ {"url": "http://a.example.com","scope": "zone", "cluster": "default"},
+ {"url": "http://b.example.com","scope": "zone", "cluster": "feed"}
+ ]
+}`),
+ }
+ client.NextResponse(response)
+ services, err := target.ContainerServices(time.Millisecond)
assert.Nil(t, err)
+ assert.Equal(t, 2, len(services))
- assertServiceWait(t, 500, target, "query")
- assertServiceWait(t, 500, target, "document")
+ client.NextResponse(response)
+ client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 500})
+ assertService(t, true, target, "default", 0)
+ client.NextResponse(response)
+ client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200})
+ assertService(t, false, target, "feed", 0)
+}
+
+func TestCloudTargetAwaitDeployment(t *testing.T) {
+ var logWriter bytes.Buffer
+ target, client := createCloudTarget(t, &logWriter)
+
+ runningResponse := mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1",
+ Status: 200,
+ Body: []byte(`{"active": true, "status": "running",
+ "lastId": 42,
+ "log": {"deployReal": [{"at": 1631707708431,
+ "type": "info",
+ "message": "Deploying platform version 7.465.17 and application version 1.0.2 ..."}]}}`),
+ }
+ client.NextResponse(runningResponse)
+ runningResponse.URI = "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=42"
+ client.NextResponse(runningResponse)
+ // Deployment has not succeeded yet
+ _, err := target.AwaitDeployment(int64(42), time.Second)
+ assert.NotNil(t, err)
// Log timestamp is converted to local time, do the same here in case the local time where tests are run varies
tm := time.Unix(1631707708, 431000)
expectedTime := tm.Format("[15:04:05]")
- assert.Equal(t, expectedTime+" info Deploying platform version 7.465.17 and application version 1.0.2 ...\n", logWriter.String())
+ assert.Equal(t, strings.Repeat(expectedTime+" info Deploying platform version 7.465.17 and application version 1.0.2 ...\n", 2), logWriter.String())
+
+ // Wanted deployment run eventually succeeds
+ runningResponse.URI = "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1"
+ client.NextResponse(runningResponse)
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=42",
+ Status: 200,
+ Body: []byte(`{"active": false, "status": "success"}`),
+ })
+ convergedID, err := target.AwaitDeployment(int64(42), time.Second)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(42), convergedID)
+
+ // Await latest deployment
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1?limit=1",
+ Status: 200,
+ Body: []byte(`{"runs": [{"id": 1337}]}`),
+ })
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/1337?after=-1",
+ Status: 200,
+ Body: []byte(`{"active": false, "status": "success"}`),
+ })
+ convergedID, err = target.AwaitDeployment(LatestDeployment, time.Second)
+ assert.Nil(t, err)
+ assert.Equal(t, int64(1337), convergedID)
}
func TestLog(t *testing.T) {
- vc := mockVespaApi{}
- srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler))
- defer srv.Close()
- vc.serverURL = srv.URL
- vc.deploymentConverged = true
-
+ target, client := createCloudTarget(t, io.Discard)
+ client.NextResponse(mock.HTTPResponse{
+ URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1/logs?from=-62135596800000",
+ Status: 200,
+ Body: []byte(`1632738690.905535 host1a.dev.aws-us-east-1c 806/53 logserver-container Container.com.yahoo.container.jdisc.ConfiguredApplication info Switching to the latest deployed set of configurations and components. Application config generation: 52532
+1632738698.600189 host1a.dev.aws-us-east-1c 1723/33590 config-sentinel sentinel.sentinel.config-owner config Sentinel got 3 service elements [tenant(vespa-team), application(music), instance(mpolden)] for config generation 52532
+`),
+ })
var buf bytes.Buffer
- target := createCloudTarget(t, srv.URL, io.Discard)
if err := target.PrintLog(LogOptions{Writer: &buf, Level: 3}); err != nil {
t.Fatal(err)
}
@@ -151,23 +230,22 @@ func TestLog(t *testing.T) {
}
func TestCheckVersion(t *testing.T) {
- vc := mockVespaApi{}
- srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler))
- defer srv.Close()
-
- target := createCloudTarget(t, srv.URL, io.Discard)
+ target, client := createCloudTarget(t, io.Discard)
+ for i := 0; i < 3; i++ {
+ client.NextResponse(mock.HTTPResponse{URI: "/cli/v1/", Status: 200, Body: []byte(`{"minVersion":"8.0.0"}`)})
+ }
assert.Nil(t, target.CheckVersion(version.MustParse("8.0.0")))
assert.Nil(t, target.CheckVersion(version.MustParse("8.1.0")))
assert.NotNil(t, target.CheckVersion(version.MustParse("7.0.0")))
}
-func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target {
+func createCloudTarget(t *testing.T, logWriter io.Writer) (Target, *mock.HTTPClient) {
apiKey, err := CreateAPIKey()
- assert.Nil(t, err)
-
+ require.Nil(t, err)
auth := &mockAuthenticator{}
+ client := &mock.HTTPClient{}
target, err := CloudTarget(
- util.CreateClient(time.Second*10),
+ client,
auth,
auth,
APIOptions{APIKey: apiKey, System: PublicSystem},
@@ -178,39 +256,39 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target {
},
},
LogOptions{Writer: logWriter},
+ 0,
)
- if err != nil {
- t.Fatal(err)
- }
- if ct, ok := target.(*cloudTarget); ok {
- ct.apiOptions.System.URL = url
- } else {
- t.Fatalf("Wrong target type %T", ct)
- }
- return target
+ require.Nil(t, err)
+ return target, client
}
-func assertServiceURL(t *testing.T, url string, target Target, service string) {
- s, err := target.Service(service, 0, 42, "")
- assert.Nil(t, err)
- assert.Equal(t, url, s.BaseURL)
+func getService(t *testing.T, target Target, name string) (*Service, error) {
+ t.Helper()
+ if name == "deploy" {
+ return target.DeployService()
+ }
+ services, err := target.ContainerServices(0)
+ require.Nil(t, err)
+ return FindService(name, services)
}
-func assertServiceWait(t *testing.T, expectedStatus int, target Target, service string) {
- assertServiceWaitErr(t, expectedStatus, false, target, service)
+func assertServiceURL(t *testing.T, url string, target Target, serviceName string) {
+ t.Helper()
+ service, err := getService(t, target, serviceName)
+ require.Nil(t, err)
+ assert.Equal(t, url, service.BaseURL)
}
-func assertServiceWaitErr(t *testing.T, expectedStatus int, expectErr bool, target Target, service string) {
- s, err := target.Service(service, 0, 42, "")
- assert.Nil(t, err)
-
- status, err := s.Wait(0)
- if expectErr {
+func assertService(t *testing.T, fail bool, target Target, serviceName string, timeout time.Duration) {
+ t.Helper()
+ service, err := getService(t, target, serviceName)
+ require.Nil(t, err)
+ err = service.Wait(timeout)
+ if fail {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
}
- assert.Equal(t, expectedStatus, status)
}
type mockAuthenticator struct{}
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 1774b4f81d9..1ab3cc30db7 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -108,7 +108,7 @@ public interface ModelContext {
@ModelFeatureFlag(owners = {"hmusum"}) default Architecture adminClusterArchitecture() { return Architecture.getDefault(); }
@ModelFeatureFlag(owners = {"tokle"}) default boolean enableProxyProtocolMixedMode() { return true; }
@ModelFeatureFlag(owners = {"arnej"}) default String logFileCompressionAlgorithm(String defVal) { return defVal; }
- @ModelFeatureFlag(owners = {"tokle"}, removeAfter = "8.210") default boolean useRestrictedDataPlaneBindings() { return true; }
+ @ModelFeatureFlag(owners = {"tokle"}) default boolean useRestrictedDataPlaneBindings() { return false; }
@ModelFeatureFlag(owners = {"arnej, bjorncs"}) default boolean enableGlobalPhase() { return true; }
@ModelFeatureFlag(owners = {"baldersheim"}, comment = "Select summary decode type") default String summaryDecodePolicy() { return "eager"; }
@ModelFeatureFlag(owners = {"hmusum"}) default boolean allowMoreThanOneContentGroupDown(ClusterSpec.Id id) { return false; }
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index 9f23c9b7231..b06d3572fcb 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -82,6 +82,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
private int mbus_network_threads = 1;
private int heapSizePercentage = ApplicationContainerCluster.defaultHeapSizePercentageOfAvailableMemory;
private Architecture adminClusterNodeResourcesArchitecture = Architecture.getDefault();
+ private boolean useRestrictedDataPlaneBindings = false;
private Optional<CloudAccount> cloudAccount = Optional.empty();
private boolean allowUserFilters = true;
private boolean allowMoreThanOneContentGroupDown = false;
@@ -140,6 +141,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
@Override public int rpcEventsBeforeWakeup() { return rpc_events_before_wakeup; }
@Override public String queryDispatchPolicy() { return queryDispatchPolicy; }
@Override public String summaryDecodePolicy() { return summaryDecodePolicy; }
+ @Override public boolean useRestrictedDataPlaneBindings() { return useRestrictedDataPlaneBindings; }
@Override public Optional<CloudAccount> cloudAccount() { return cloudAccount; }
@Override public boolean allowUserFilters() { return allowUserFilters; }
@Override public boolean enableGlobalPhase() { return true; } // Enable global-phase by default for unit tests only
@@ -364,6 +366,11 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
return this;
}
+ public TestProperties setUseRestrictedDataPlaneBindings(boolean useRestrictedDataPlaneBindings) {
+ this.useRestrictedDataPlaneBindings = useRestrictedDataPlaneBindings;
+ return this;
+ }
+
public TestProperties setCloudAccount(CloudAccount cloudAccount) {
this.cloudAccount = Optional.ofNullable(cloudAccount);
return this;
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java
index f869d578dcb..f4aa4f649bd 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java
@@ -57,7 +57,7 @@ class UriBindingsValidator extends Validator {
if (binding instanceof SystemBindingPattern) return;
// Allow binding to port if we are restricting data plane bindings
- if (!binding.matchesAnyPort()) {
+ if (!binding.matchesAnyPort() && !deployState.featureFlags().useRestrictedDataPlaneBindings()) {
throw new IllegalArgumentException(createErrorMessage(binding, "binding with port is not allowed"));
}
if (!binding.host().equals(BindingPattern.WILDCARD_PATTERN)) {
@@ -73,7 +73,7 @@ class UriBindingsValidator extends Validator {
}
private static String createErrorMessage(BindingPattern binding, String message) {
- return String.format("For binding '%s': %s", binding.originalPatternString(), message);
+ return String.format("For binding '%s': %s", binding.patternString(), message);
}
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java
index d674a56007f..9b5a1429cb7 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java
@@ -48,7 +48,7 @@ public class DomHandlerBuilder extends VespaDomBuilder.DomConfigProducerBuilderB
@Override
protected Handler doBuild(DeployState deployState, TreeConfigProducer<AnyConfigProducer> parent, Element handlerElement) {
Handler handler = createHandler(handlerElement);
- var ports = deployState.isHosted()
+ var ports = deployState.isHosted() && deployState.featureFlags().useRestrictedDataPlaneBindings()
? portBindingOverride : Set.<Integer>of();
for (Element xmlBinding : XML.getChildren(handlerElement, "binding"))
@@ -64,7 +64,7 @@ public class DomHandlerBuilder extends VespaDomBuilder.DomConfigProducerBuilderB
UserBindingPattern bindingPattern = UserBindingPattern.fromPattern(path);
if (portBindingOverride.isEmpty()) return Set.of(bindingPattern);
return portBindingOverride.stream()
- .map(bindingPattern::withOverriddenPort)
+ .map(bindingPattern::withPort)
.toList();
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java
index 0795fdf41d6..a5a567b18f8 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java
@@ -92,7 +92,7 @@ public class ContainerDocumentApi {
UserBindingPattern bindingPattern = UserBindingPattern.fromPattern(path);
if (ports.isEmpty()) return List.of(bindingPattern);
return ports.stream()
- .map(p -> (BindingPattern)bindingPattern.withOverriddenPort(p))
+ .map(p -> (BindingPattern)bindingPattern.withPort(p))
.toList();
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java b/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java
index f580a0a2cc9..c3dae7e4c8a 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java
@@ -63,21 +63,11 @@ public abstract class BindingPattern implements Comparable<BindingPattern> {
return builder.append(path).toString();
}
- public String originalPatternString() {
- StringBuilder builder = new StringBuilder(scheme).append("://").append(host);
- originalPort().ifPresent(port -> builder.append(':').append(port));
- return builder.append(path).toString();
- }
-
/** Compares the underlying pattern string for equality */
public boolean hasSamePattern(BindingPattern other) { return this.patternString().equals(other.patternString()); }
/** Returns true if pattern will match any port (if present) in uri **/
- public boolean matchesAnyPort() { return originalPort().filter(p -> !p.equals(WILDCARD_PATTERN)).isEmpty(); }
-
- public Optional<String> originalPort() {
- return port();
- }
+ public boolean matchesAnyPort() { return port().filter(p -> !p.equals(WILDCARD_PATTERN)).isEmpty(); }
@Override
public boolean equals(Object o) {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java b/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java
index e27dfe69f00..182eca835c1 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java
@@ -1,9 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.model.container.component;
-import java.util.Objects;
-import java.util.Optional;
-
/**
* A {@link BindingPattern} which is constructed directly from a user provided 'binding' element from services.xml.
*
@@ -11,30 +8,12 @@ import java.util.Optional;
*/
public class UserBindingPattern extends BindingPattern {
- private final Optional<String> originalPort;
-
- private UserBindingPattern(String scheme, String host, String port, String path) {
- super(scheme, host, port, path);
- this.originalPort = null;
- }
- private UserBindingPattern(String scheme, String host, String port, Optional<String> originalPort, String path) {
- super(scheme, host, port, path);
- this.originalPort = originalPort;
- }
- private UserBindingPattern(String binding) {
- super(binding);
- this.originalPort = null;
- }
+ private UserBindingPattern(String scheme, String host, String port, String path) { super(scheme, host, port, path); }
+ private UserBindingPattern(String binding) { super(binding); }
public static UserBindingPattern fromHttpPath(String path) { return new UserBindingPattern("http", "*", null, path); }
public static UserBindingPattern fromPattern(String binding) { return new UserBindingPattern(binding); }
- public UserBindingPattern withOverriddenPort(int port) { return new UserBindingPattern(scheme(), host(), Integer.toString(port), port(), path()); }
-
- public Optional<String> originalPort() {
- return Objects.isNull(originalPort)
- ? port()
- : originalPort;
- }
+ public UserBindingPattern withPort(int port) { return new UserBindingPattern(scheme(), host(), Integer.toString(port), path()); }
@Override
public String toString() {
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
index 80b676159cb..31f8eba48bf 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java
@@ -1114,7 +1114,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
private void addSearchHandler(DeployState deployState, ApplicationContainerCluster cluster, Element searchElement, ConfigModelContext context) {
var bindingPatterns = List.<BindingPattern>of(SearchHandler.DEFAULT_BINDING);
- if (isHostedTenantApplication(context)) {
+ if (isHostedTenantApplication(context) && deployState.featureFlags().useRestrictedDataPlaneBindings()) {
bindingPatterns = SearchHandler.bindingPattern(getDataplanePorts(deployState));
}
SearchHandler searchHandler = new SearchHandler(cluster,
@@ -1136,7 +1136,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
private List<BindingPattern> toBindingList(DeployState deployState, ConfigModelContext context, List<Element> bindingElements) {
List<BindingPattern> result = new ArrayList<>();
- var portOverride = isHostedTenantApplication(context) ? getDataplanePorts(deployState) : Set.<Integer>of();
+ var portOverride = isHostedTenantApplication(context) && deployState.featureFlags().useRestrictedDataPlaneBindings() ? getDataplanePorts(deployState) : Set.<Integer>of();
for (Element element: bindingElements) {
String text = element.getTextContent().trim();
if (!text.isEmpty())
@@ -1149,7 +1149,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
UserBindingPattern bindingPattern = UserBindingPattern.fromPattern(path);
if (portBindingOverride.isEmpty()) return Set.of(bindingPattern);
return portBindingOverride.stream()
- .map(bindingPattern::withOverriddenPort)
+ .map(bindingPattern::withPort)
.toList();
}
@@ -1160,7 +1160,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> {
ContainerDocumentApi.HandlerOptions documentApiOptions = DocumentApiOptionsBuilder.build(documentApiElement);
Element ignoreUndefinedFields = XML.getChild(documentApiElement, "ignore-undefined-fields");
- var portBindingOverride = isHostedTenantApplication(context)
+ var portBindingOverride = deployState.featureFlags().useRestrictedDataPlaneBindings() && isHostedTenantApplication(context)
? getDataplanePorts(deployState)
: Set.<Integer>of();
return new ContainerDocumentApi(cluster, documentApiOptions,
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java b/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java
index a56b268eeab..ff9596f2062 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java
@@ -57,6 +57,12 @@ public class UriBindingsValidatorTest {
runUriBindingValidator(true, createServicesXmlWithHandler("http://*/my-handler"));
}
+ @Test
+ void allows_portbinding_when_restricting_data_plane() throws IOException, SAXException {
+ runUriBindingValidator(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(true), createServicesXmlWithHandler("http://*:4443/my-handler"));
+ }
+
+ @Test
void allows_user_binding_with_wildcard_port() throws IOException, SAXException {
runUriBindingValidator(true, createServicesXmlWithHandler("http://*:*/my-handler"));
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java
index fac07c6c6e6..6d61610a84f 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java
@@ -1,6 +1,5 @@
package com.yahoo.vespa.model.container.xml;
-import com.yahoo.config.model.ConfigModelContext;
import com.yahoo.config.model.builder.xml.test.DomBuilderTest;
import com.yahoo.config.model.deploy.DeployState;
import com.yahoo.config.model.deploy.TestProperties;
@@ -111,15 +110,36 @@ public class HandlerBuilderTest extends ContainerModelBuilderTestBase {
@Test
void restricts_default_bindings_in_hosted_vespa() {
DeployState deployState = new DeployState.Builder()
- .properties(new TestProperties().setHostedVespa(true))
+ .properties(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(true))
.build();
verifyDefaultBindings(deployState, "http://*:4443");
}
@Test
+ void does_not_restrict_default_bindings_in_hosted_vespa_when_disabled() {
+ DeployState deployState = new DeployState.Builder()
+ .properties(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(false))
+ .build();
+ verifyDefaultBindings(deployState, "http://*");
+ }
+
+ @Test
+ void does_not_restrict_infrastructure() {
+ DeployState deployState = new DeployState.Builder()
+
+ .properties(
+ new TestProperties()
+ .setApplicationId(ApplicationId.defaultId())
+ .setHostedVespa(true)
+ .setUseRestrictedDataPlaneBindings(false))
+ .build();
+ verifyDefaultBindings(deployState, "http://*");
+ }
+
+ @Test
void restricts_custom_bindings_in_hosted_vespa() {
DeployState deployState = new DeployState.Builder()
- .properties(new TestProperties().setHostedVespa(true))
+ .properties(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(true))
.build();
verifyCustomSearchBindings(deployState, "http://*:4443");
}
@@ -127,7 +147,7 @@ public class HandlerBuilderTest extends ContainerModelBuilderTestBase {
@Test
void does_not_restrict_default_bindings_in_self_hosted() {
DeployState deployState = new DeployState.Builder()
- .properties(new TestProperties().setHostedVespa(false))
+ .properties(new TestProperties().setHostedVespa(false).setUseRestrictedDataPlaneBindings(false))
.build();
verifyDefaultBindings(deployState, "http://*");
}
@@ -135,15 +155,12 @@ public class HandlerBuilderTest extends ContainerModelBuilderTestBase {
@Test
void does_not_restrict_custom_bindings_in_self_hosted() {
DeployState deployState = new DeployState.Builder()
- .properties(new TestProperties().setHostedVespa(false))
+ .properties(new TestProperties().setHostedVespa(false).setUseRestrictedDataPlaneBindings(false))
.build();
verifyCustomSearchBindings(deployState, "http://*");
}
private void verifyDefaultBindings(DeployState deployState, String bindingPrefix) {
- verifyDefaultBindings(deployState, bindingPrefix, ConfigModelContext.ApplicationType.DEFAULT);
- }
- private void verifyDefaultBindings(DeployState deployState, String bindingPrefix, ConfigModelContext.ApplicationType applicationType) {
Element clusterElem = DomBuilderTest.parse(
"<container id='default' version='1.0'>",
" <search/>",
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 1ebfef77a51..d815ea3328a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -195,6 +195,7 @@ public class ModelContextImpl implements ModelContext {
private final int mbus_cpp_events_before_wakeup;
private final int rpc_num_targets;
private final int rpc_events_before_wakeup;
+ private final boolean useRestrictedDataPlaneBindings;
private final int heapPercentage;
private final boolean enableGlobalPhase;
private final String summaryDecodePolicy;
@@ -238,6 +239,7 @@ public class ModelContextImpl implements ModelContext {
this.rpc_events_before_wakeup = flagValue(source, appId, version, Flags.RPC_EVENTS_BEFORE_WAKEUP);
this.queryDispatchPolicy = flagValue(source, appId, version, Flags.QUERY_DISPATCH_POLICY);
this.queryDispatchWarmup = flagValue(source, appId, version, PermanentFlags.QUERY_DISPATCH_WARMUP);
+ this.useRestrictedDataPlaneBindings = flagValue(source, appId, version, Flags.RESTRICT_DATA_PLANE_BINDINGS);
this.heapPercentage = flagValue(source, appId, version, PermanentFlags.HEAP_SIZE_PERCENTAGE);
this.enableGlobalPhase = flagValue(source, appId, version, Flags.ENABLE_GLOBAL_PHASE);
this.summaryDecodePolicy = flagValue(source, appId, version, Flags.SUMMARY_DECODE_POLICY);
@@ -291,6 +293,7 @@ public class ModelContextImpl implements ModelContext {
}
return defVal;
}
+ @Override public boolean useRestrictedDataPlaneBindings() { return useRestrictedDataPlaneBindings; }
@Override public boolean enableGlobalPhase() { return enableGlobalPhase; }
@Override public boolean allowMoreThanOneContentGroupDown(ClusterSpec.Id id) { return allowMoreThanOneContentGroupDown.test(id); }
@Override public boolean enableDataplaneProxy() { return enableDataplaneProxy; }
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java
index a26e7cce29a..0b0664cd3bf 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.controller.api.integration.configserver;
import com.yahoo.component.Version;
import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.config.provision.CloudAccount;
import com.yahoo.config.provision.DockerImage;
import com.yahoo.config.provision.HostName;
import com.yahoo.config.provision.NodeResources;
@@ -75,6 +76,7 @@ public class Node {
private final Optional<String> switchHostname;
private final Optional<String> modelName;
private final Environment environment;
+ private final CloudAccount cloudAccount;
private Node(String id, HostName hostname, Optional<HostName> parentHostname, State state, NodeType type,
NodeResources resources, Optional<ApplicationId> owner, Version currentVersion, Version wantedVersion,
@@ -87,7 +89,7 @@ public class Node {
DockerImage wantedDockerImage, DockerImage currentDockerImage, Optional<ClusterType> exclusiveToClusterType, Map<String, String> reports,
List<Event> history, Set<String> ipAddresses, Set<String> additionalIpAddresses,
Set<String> additionalHostnames, Optional<String> switchHostname,
- Optional<String> modelName, Environment environment) {
+ Optional<String> modelName, Environment environment, CloudAccount cloudAccount) {
this.id = Objects.requireNonNull(id, "id must be non-null");
this.hostname = Objects.requireNonNull(hostname, "hostname must be non-null");
this.parentHostname = Objects.requireNonNull(parentHostname, "parentHostname must be non-null");
@@ -133,6 +135,7 @@ public class Node {
this.switchHostname = Objects.requireNonNull(switchHostname, "switchHostname must be non-null");
this.modelName = Objects.requireNonNull(modelName, "modelName must be non-null");
this.environment = Objects.requireNonNull(environment, "environment must be non-null");
+ this.cloudAccount = Objects.requireNonNull(cloudAccount, "cloudAccount must be non-null");
}
/** The cloud provider's unique ID for this */
@@ -344,6 +347,10 @@ public class Node {
return environment;
}
+ public CloudAccount cloudAccount() {
+ return cloudAccount;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -501,6 +508,7 @@ public class Node {
private Optional<String> switchHostname = Optional.empty();
private Optional<String> modelName = Optional.empty();
private Environment environment = Environment.unknown;
+ private CloudAccount cloudAccount = CloudAccount.empty;
private Builder() {}
@@ -785,6 +793,11 @@ public class Node {
return this;
}
+ public Builder cloudAccount(CloudAccount cloudAccount) {
+ this.cloudAccount = cloudAccount;
+ return this;
+ }
+
public Node build() {
return new Node(id, hostname, parentHostname, state, type, resources, owner, currentVersion, wantedVersion,
currentOsVersion, wantedOsVersion, deferOsUpgrade, currentFirmwareCheck, wantedFirmwareCheck, serviceState,
@@ -792,7 +805,7 @@ public class Node {
wantedRebootGeneration, cost, failCount, flavor, clusterId, clusterType, group, index, retired,
wantToRetire, wantToDeprovision, wantToRebuild, down, reservedTo, exclusiveTo, wantedDockerImage,
currentDockerImage, exclusiveToClusterType, reports, history, ipAddresses, additionalIpAddresses,
- additionalHostnames, switchHostname, modelName, environment);
+ additionalHostnames, switchHostname, modelName, environment, cloudAccount);
}
}
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index d5eadf45b08..c48a4243fde 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -274,6 +274,13 @@ public class Flags {
APPLICATION_ID,HOSTNAME,NODE_TYPE,TENANT_ID,VESPA_VERSION
);
+ public static final UnboundBooleanFlag RESTRICT_DATA_PLANE_BINDINGS = defineFeatureFlag(
+ "restrict-data-plane-bindings", false,
+ List.of("mortent"), "2022-09-08", "2023-09-01",
+ "Use restricted data plane bindings",
+ "Takes effect at redeployment",
+ APPLICATION_ID);
+
public static final UnboundBooleanFlag ENABLE_OTELCOL = defineFeatureFlag(
"enable-otel-collector", false,
List.of("olaa"), "2022-09-23", "2023-09-01",
@@ -368,6 +375,13 @@ public class Flags {
"Use the vespa user for running Vespa everywhere",
"Takes effect immediately");
+ public static final UnboundBooleanFlag MORE_WIREGUARD = defineFeatureFlag(
+ "more-wireguard", false,
+ List.of("andreer"), "2023-08-21", "2023-09-21",
+ "Use wireguard in INternal enCLAVES",
+ "Takes effect on next host-admin run",
+ HOSTNAME);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
diff --git a/metrics/pom.xml b/metrics/pom.xml
index e8303e5a01f..a2fd17b7ea5 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -12,8 +12,14 @@
<packaging>jar</packaging>
<version>8-SNAPSHOT</version>
<name>metrics</name>
+
<dependencies>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>annotations</artifactId>
<version>${project.version}</version>
@@ -25,7 +31,18 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+
<build>
<plugins>
<plugin>
diff --git a/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java b/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java
index 3a17d8a3155..9a498abc911 100644
--- a/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java
+++ b/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java
@@ -17,6 +17,8 @@ public interface VespaMetrics {
return baseName() + "." + suffix.suffix();
}
+ // TODO: make the below methods return Metric objects instead of Strings.
+
default String ninety_five_percentile() {
return withSuffix(Suffix.ninety_five_percentile);
}
diff --git a/metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java b/metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java
new file mode 100644
index 00000000000..f167e654e6f
--- /dev/null
+++ b/metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java
@@ -0,0 +1,23 @@
+package ai.vespa.metrics.set;
+
+import ai.vespa.metrics.ContainerMetrics;
+
+/**
+ * Defines metric sets that are meant to be used as building blocks for other metric sets.
+ *
+ * @author gjoranv
+ */
+public class BasicMetricSets {
+
+ static MetricSet containerHttpStatusMetrics() {
+ return new MetricSet.Builder("basic-container-http-status")
+ .metric(ContainerMetrics.HTTP_STATUS_1XX.rate())
+
+ .metric(ContainerMetrics.HTTP_STATUS_2XX.rate())
+ .metric(ContainerMetrics.HTTP_STATUS_3XX.rate())
+ .metric(ContainerMetrics.HTTP_STATUS_4XX.rate())
+ .metric(ContainerMetrics.HTTP_STATUS_5XX.rate())
+ .build();
+ }
+
+}
diff --git a/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java b/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java
index 515b06de2d8..6ef23b790cd 100644
--- a/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java
+++ b/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java
@@ -2,20 +2,16 @@
package ai.vespa.metrics.set;
+import ai.vespa.metrics.ClusterControllerMetrics;
import ai.vespa.metrics.ContainerMetrics;
-import ai.vespa.metrics.SearchNodeMetrics;
-import ai.vespa.metrics.StorageMetrics;
import ai.vespa.metrics.DistributorMetrics;
-import ai.vespa.metrics.ClusterControllerMetrics;
-import ai.vespa.metrics.SentinelMetrics;
import ai.vespa.metrics.NodeAdminMetrics;
-import ai.vespa.metrics.Suffix;
-import ai.vespa.metrics.VespaMetrics;
+import ai.vespa.metrics.SearchNodeMetrics;
+import ai.vespa.metrics.SentinelMetrics;
+import ai.vespa.metrics.StorageMetrics;
-import java.util.Collections;
import java.util.EnumSet;
-import java.util.LinkedHashSet;
-import java.util.Set;
+import java.util.List;
import static ai.vespa.metrics.Suffix.average;
import static ai.vespa.metrics.Suffix.count;
@@ -41,135 +37,137 @@ public class DefaultMetrics {
private static MetricSet createMetricSet() {
return new MetricSet(defaultMetricSetId,
- getAllMetrics(),
- Set.of(defaultVespaMetricSet));
+ List.of(),
+ List.of(defaultVespaMetricSet,
+ BasicMetricSets.containerHttpStatusMetrics(),
+ getContainerMetrics(),
+ getSearchChainMetrics(),
+ getDocprocMetrics(),
+ getSearchNodeMetrics(),
+ getContentMetrics(),
+ getStorageMetrics(),
+ getDistributorMetrics(),
+ getClusterControllerMetrics(),
+ getSentinelMetrics(),
+ getOtherMetrics()));
}
- private static Set<Metric> getAllMetrics() {
- Set<Metric> metrics = new LinkedHashSet<>();
-
- addContainerMetrics(metrics);
- addSearchChainMetrics(metrics);
- addDocprocMetrics(metrics);
- addSearchNodeMetrics(metrics);
- addContentMetrics(metrics);
- addStorageMetrics(metrics);
- addDistributorMetrics(metrics);
- addClusterControllerMetrics(metrics);
- addSentinelMetrics(metrics);
- addOtherMetrics(metrics);
- return Collections.unmodifiableSet(metrics);
+ private static MetricSet getContainerMetrics() {
+ return new MetricSet.Builder("default-container")
+ .metric(ContainerMetrics.JDISC_GC_MS, EnumSet.of(max, average))
+ .metric(ContainerMetrics.MEM_HEAP_FREE.average())
+ .metric(ContainerMetrics.FEED_LATENCY, EnumSet.of(sum, count))
+ // .metric(ContainerMetrics.CPU.baseName()) // TODO: Add to container metrics
+ .metric(ContainerMetrics.JDISC_THREAD_POOL_SIZE.max())
+ .metric(ContainerMetrics.JDISC_THREAD_POOL_ACTIVE_THREADS, EnumSet.of(sum, count, min, max))
+ .metric(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_CAPACITY.max())
+ .metric(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_SIZE, EnumSet.of(sum, count, min, max))
+ .metric(ContainerMetrics.SERVER_ACTIVE_THREADS.average())
+
+ // Metrics needed for alerting
+ .metric(ContainerMetrics.JDISC_SINGLETON_IS_ACTIVE, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_MISSING_CLIENT_CERT.rate())
+ .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_PROTOCOLS.rate())
+ .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_CHIFERS.rate())
+ .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_UNKNOWN.rate())
+ .metric(ContainerMetrics.JDISC_APPLICATION_FAILED_COMPONENT_GRAPHS.rate())
+ .metric(ContainerMetrics.ATHENZ_TENANT_CERT_EXPIRY_SECONDS, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .build();
}
- private static void addContainerMetrics(Set<Metric> metrics) {
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_1XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_2XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_3XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_4XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_5XX.rate());
- addMetric(metrics, ContainerMetrics.JDISC_GC_MS, EnumSet.of(max, average));
- addMetric(metrics, ContainerMetrics.MEM_HEAP_FREE.average());
- addMetric(metrics, ContainerMetrics.FEED_LATENCY, EnumSet.of(sum, count));
- // addMetric(metrics, ContainerMetrics.CPU.baseName()); // TODO: Add to container metrics
- addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_SIZE.max());
- addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_ACTIVE_THREADS, EnumSet.of(sum, count, min, max));
- addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_CAPACITY.max());
- addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_SIZE, EnumSet.of(sum, count, min, max));
- addMetric(metrics, ContainerMetrics.SERVER_ACTIVE_THREADS.average());
-
- // Metrics needed for alerting
- addMetric(metrics, ContainerMetrics.JDISC_SINGLETON_IS_ACTIVE, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_MISSING_CLIENT_CERT.rate());
- addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_PROTOCOLS.rate());
- addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_CHIFERS.rate());
- addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_UNKNOWN.rate());
- addMetric(metrics, ContainerMetrics.JDISC_APPLICATION_FAILED_COMPONENT_GRAPHS.rate());
- addMetric(metrics, ContainerMetrics.ATHENZ_TENANT_CERT_EXPIRY_SECONDS, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
- }
-
- private static void addSearchChainMetrics(Set<Metric> metrics) {
- addMetric(metrics, ContainerMetrics.QUERIES.rate());
- addMetric(metrics, ContainerMetrics.QUERY_LATENCY, EnumSet.of(sum, count, max, ninety_five_percentile, ninety_nine_percentile, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, ContainerMetrics.HITS_PER_QUERY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, ContainerMetrics.TOTAL_HITS_PER_QUERY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, ContainerMetrics.DEGRADED_QUERIES.rate());
- addMetric(metrics, ContainerMetrics.FAILED_QUERIES.rate());
+ private static MetricSet getSearchChainMetrics() {
+ return new MetricSet.Builder("default-search-chain")
+ .metric(ContainerMetrics.QUERIES.rate())
+ .metric(ContainerMetrics.QUERY_LATENCY, EnumSet.of(sum, count, max, ninety_five_percentile, ninety_nine_percentile, average)) // TODO: Remove average with Vespa 9
+ .metric(ContainerMetrics.HITS_PER_QUERY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+ .metric(ContainerMetrics.TOTAL_HITS_PER_QUERY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+ .metric(ContainerMetrics.DEGRADED_QUERIES.rate())
+ .metric(ContainerMetrics.FAILED_QUERIES.rate())
+ .build();
}
- private static void addDocprocMetrics(Set<Metric> metrics) {
- addMetric(metrics, ContainerMetrics.DOCPROC_DOCUMENTS.sum());
+ private static MetricSet getDocprocMetrics() {
+ return new MetricSet.Builder("default-docproc")
+ .metric(ContainerMetrics.DOCPROC_DOCUMENTS.sum())
+ .build();
}
- private static void addSearchNodeMetrics(Set<Metric> metrics) {
+ private static MetricSet getSearchNodeMetrics() {
// Metrics needed for alerting
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
+ return new MetricSet.Builder("default-search-node")
+ .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .build();
}
- private static void addContentMetrics(Set<Metric> metrics) {
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_REQUESTED_DOCUMENTS.rate());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_LATENCY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_QUERY_LATENCY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
-
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_TOTAL, EnumSet.of(max,last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_READY, EnumSet.of(max,last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_ACTIVE, EnumSet.of(max,last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DISK_USAGE.last());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MEMORY_USAGE_ALLOCATED_BYTES.last());
-
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average());
-
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_MATCHED.rate());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_RERANKED.rate());
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_SETUP_TIME, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_LATENCY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_RERANK_TIME, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9
- addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_TRANSACTIONLOG_DISK_USAGE.last());
+ private static MetricSet getContentMetrics() {
+ return new MetricSet.Builder("default-content")
+ .metric(SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_REQUESTED_DOCUMENTS.rate())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_LATENCY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+ .metric(SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_QUERY_LATENCY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_TOTAL, EnumSet.of(max,last)) // TODO: Vespa 9: Remove last
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_READY, EnumSet.of(max,last)) // TODO: Vespa 9: Remove last
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_ACTIVE, EnumSet.of(max,last)) // TODO: Vespa 9: Remove last
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DISK_USAGE.last())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MEMORY_USAGE_ALLOCATED_BYTES.last())
+
+ .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average())
+
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_MATCHED.rate())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_RERANKED.rate())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_SETUP_TIME, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_LATENCY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+ .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_RERANK_TIME, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9
+ .metric(SearchNodeMetrics.CONTENT_PROTON_TRANSACTIONLOG_DISK_USAGE.last())
+ .build();
}
- private static void addStorageMetrics(Set<Metric> metrics) {
- addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate());
- addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate());
- addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate());
+ private static MetricSet getStorageMetrics() {
+ return new MetricSet.Builder("default-storage")
+ .metric(StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate())
+ .metric(StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate())
+ .metric(StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate())
+ .build();
}
- private static void addDistributorMetrics(Set<Metric> metrics) {
- addMetric(metrics, DistributorMetrics.VDS_DISTRIBUTOR_DOCSSTORED.average());
+ private static MetricSet getDistributorMetrics() {
+ return new MetricSet.Builder("default-distributor")
+ .metric(DistributorMetrics.VDS_DISTRIBUTOR_DOCSSTORED.average())
- // Metrics needed for alerting
- addMetric(metrics, DistributorMetrics.VDS_BOUNCER_CLOCK_SKEW_ABORTS.count());
+ // Metrics needed for alerting
+ .metric(DistributorMetrics.VDS_BOUNCER_CLOCK_SKEW_ABORTS.count())
+ .build();
}
- private static void addClusterControllerMetrics(Set<Metric> metrics) {
+ private static MetricSet getClusterControllerMetrics() {
// Metrics needed for alerting
- addMetric(metrics, ClusterControllerMetrics.DOWN_COUNT, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, ClusterControllerMetrics.MAINTENANCE_COUNT, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, ClusterControllerMetrics.UP_COUNT.last());
- addMetric(metrics, ClusterControllerMetrics.IS_MASTER, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, ClusterControllerMetrics.RESOURCE_USAGE_NODES_ABOVE_LIMIT, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
- addMetric(metrics, ClusterControllerMetrics.RESOURCE_USAGE_MAX_MEMORY_UTILIZATION, EnumSet.of(last, max)); // TODO: Vespa 9: Remove last
- addMetric(metrics, ClusterControllerMetrics.RESOURCE_USAGE_MAX_DISK_UTILIZATION, EnumSet.of(last, max)); // TODO: Vespa 9: Remove last
+ return new MetricSet.Builder("default-cluster-controller")
+ .metric(ClusterControllerMetrics.DOWN_COUNT, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .metric(ClusterControllerMetrics.MAINTENANCE_COUNT, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .metric(ClusterControllerMetrics.UP_COUNT.last())
+ .metric(ClusterControllerMetrics.IS_MASTER, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .metric(ClusterControllerMetrics.RESOURCE_USAGE_NODES_ABOVE_LIMIT, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last
+ .metric(ClusterControllerMetrics.RESOURCE_USAGE_MAX_MEMORY_UTILIZATION, EnumSet.of(last, max)) // TODO: Vespa 9: Remove last
+ .metric(ClusterControllerMetrics.RESOURCE_USAGE_MAX_DISK_UTILIZATION, EnumSet.of(last, max)) // TODO: Vespa 9: Remove last
+ .build();
}
- private static void addSentinelMetrics(Set<Metric> metrics) {
+ private static MetricSet getSentinelMetrics() {
// Metrics needed for alerting
- addMetric(metrics, SentinelMetrics.SENTINEL_TOTAL_RESTARTS, EnumSet.of(max, sum, last)); // TODO: Vespa 9: Remove last, sum?
+ return new MetricSet.Builder("default-sentinel")
+ .metric(SentinelMetrics.SENTINEL_TOTAL_RESTARTS, EnumSet.of(max, sum, last)) // TODO: Vespa 9: Remove last, sum?
+ .build();
}
- private static void addOtherMetrics(Set<Metric> metrics) {
+ private static MetricSet getOtherMetrics() {
// Metrics needed for alerting
- addMetric(metrics, NodeAdminMetrics.ENDPOINT_CERTIFICATE_EXPIRY_SECONDS.baseName());
- addMetric(metrics, NodeAdminMetrics.NODE_CERTIFICATE_EXPIRY_SECONDS.baseName());
- }
-
- private static void addMetric(Set<Metric> metrics, String nameWithSuffix) {
- metrics.add(new Metric(nameWithSuffix));
- }
-
- private static void addMetric(Set<Metric> metrics, VespaMetrics metric, EnumSet<Suffix> suffixes) {
- suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix())));
+ return new MetricSet.Builder("default-other")
+ .metric(NodeAdminMetrics.ENDPOINT_CERTIFICATE_EXPIRY_SECONDS.baseName())
+ .metric(NodeAdminMetrics.NODE_CERTIFICATE_EXPIRY_SECONDS.baseName())
+ .build();
}
private DefaultMetrics() { }
diff --git a/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java b/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java
index 93b6bfab002..e34c8ee68eb 100644
--- a/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java
+++ b/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java
@@ -4,9 +4,6 @@ package ai.vespa.metrics.set;
import ai.vespa.metrics.ContainerMetrics;
import ai.vespa.metrics.SearchNodeMetrics;
-import java.util.LinkedHashSet;
-import java.util.Set;
-
/**
* Encapsulates a minimal set of Vespa metrics to be used as default for all metrics consumers.
*
@@ -19,11 +16,11 @@ public class DefaultVespaMetrics {
public static final MetricSet defaultVespaMetricSet = createDefaultVespaMetricSet();
private static MetricSet createDefaultVespaMetricSet() {
- Set<Metric> metrics = new LinkedHashSet<>();
-
- metrics.add(new Metric(ContainerMetrics.FEED_OPERATIONS.rate()));
- metrics.add(new Metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED.last()));
- return new MetricSet("default-vespa", metrics);
+ return new MetricSet.Builder("default-vespa")
+ .metric(ContainerMetrics.FEED_OPERATIONS.rate())
+ .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED.last())
+ .build();
}
+
}
diff --git a/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java b/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java
index b8409fb7663..f334690a7ca 100644
--- a/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java
+++ b/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java
@@ -1,7 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.metrics.set;
+import ai.vespa.metrics.Suffix;
+import ai.vespa.metrics.VespaMetrics;
+
import java.util.Collection;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -88,4 +92,44 @@ public class MetricSet {
return metricMap;
}
+
+ public static class Builder {
+ private final String id;
+ private final Set<Metric> metrics = new LinkedHashSet<>();
+ private final Set<MetricSet> children = new LinkedHashSet<>();
+
+ public Builder(String id) {
+ this.id = id;
+ }
+
+ public Builder metric(String metric) {
+ return metric(new Metric(metric));
+ }
+
+ /** Adds all given suffixes of the given metric to this set. */
+ public Builder metric(VespaMetrics metric, EnumSet<Suffix> suffixes) {
+ suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix())));
+ return this;
+ }
+
+ public Builder metric(Metric metric) {
+ metrics.add(metric);
+ return this;
+ }
+
+ public Builder metrics(Collection<Metric> metrics) {
+ this.metrics.addAll(metrics);
+ return this;
+ }
+
+ public Builder metricSet(MetricSet child) {
+ children.add(child);
+ return this;
+ }
+
+ public MetricSet build() {
+ return new MetricSet(id, metrics, children);
+ }
+ }
+
}
diff --git a/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java b/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java
index bc8567b8bf5..18c5a637eb9 100644
--- a/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java
+++ b/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java
@@ -17,6 +17,7 @@ import ai.vespa.metrics.VespaMetrics;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
import static ai.vespa.metrics.Suffix.average;
@@ -29,7 +30,6 @@ import static ai.vespa.metrics.Suffix.ninety_nine_percentile;
import static ai.vespa.metrics.Suffix.rate;
import static ai.vespa.metrics.Suffix.sum;
import static ai.vespa.metrics.set.DefaultVespaMetrics.defaultVespaMetricSet;
-import static java.util.Collections.singleton;
/**
* Encapsulates vespa service metrics.
@@ -38,9 +38,14 @@ import static java.util.Collections.singleton;
*/
public class VespaMetricSet {
- public static final MetricSet vespaMetricSet = new MetricSet("vespa",
- getVespaMetrics(),
- singleton(defaultVespaMetricSet));
+ public static final MetricSet vespaMetricSet = createMetricSet();
+
+ private static MetricSet createMetricSet() {
+ return new MetricSet("vespa",
+ getVespaMetrics(),
+ List.of(defaultVespaMetricSet,
+ BasicMetricSets.containerHttpStatusMetrics()));
+ }
private static Set<Metric> getVespaMetrics() {
Set<Metric> metrics = new LinkedHashSet<>();
@@ -187,12 +192,6 @@ public class VespaMetricSet {
addMetric(metrics, ContainerMetrics.ATHENZ_TENANT_CERT_EXPIRY_SECONDS, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last
addMetric(metrics, ContainerMetrics.CONTAINER_IAM_ROLE_EXPIRY_SECONDS.baseName());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_1XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_2XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_3XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_4XX.rate());
- addMetric(metrics, ContainerMetrics.HTTP_STATUS_5XX.rate());
-
addMetric(metrics, ContainerMetrics.JDISC_HTTP_REQUEST_PREMATURELY_CLOSED.rate());
addMetric(metrics, ContainerMetrics.JDISC_HTTP_REQUEST_REQUESTS_PER_CONNECTION, EnumSet.of(sum, count, min, max, average));
addMetric(metrics, ContainerMetrics.JDISC_HTTP_REQUEST_URI_LENGTH, EnumSet.of(sum, count, max));
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricSetTest.java b/metrics/src/test/java/ai/vespa/metrics/MetricSetTest.java
index 8235f45aaec..788e9e9836c 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricSetTest.java
+++ b/metrics/src/test/java/ai/vespa/metrics/MetricSetTest.java
@@ -1,11 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.model.admin.monitoring;
+package ai.vespa.metrics;
import ai.vespa.metrics.set.Metric;
import ai.vespa.metrics.set.MetricSet;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -60,4 +61,54 @@ public class MetricSetTest {
assertEquals(3, combinedMetric.dimensions.size());
assertEquals("parentCommonVal", combinedMetric.dimensions.get(COMMON_DIMENSION_KEY));
}
+
+ @Test
+ void it_can_be_generated_from_builder() {
+ MetricSet metricSet = new MetricSet.Builder("test")
+ .metric("metric1")
+ .metric(TestMetrics.ENUM_METRIC1.last())
+ .metric(TestMetrics.ENUM_METRIC2, EnumSet.of(Suffix.sum, Suffix.count))
+ .metric(new Metric("metric2"))
+ .metrics(List.of(new Metric("metric3")))
+ .metricSet(new MetricSet.Builder("child")
+ .metric("child_metric1")
+ .metric("child_metric2")
+ .build())
+ .build();
+
+ Map<String, Metric> metrics = metricSet.getMetrics();
+ assertEquals(8, metrics.size());
+ assertNotNull(metrics.get("metric1"));
+ assertNotNull(metrics.get("emum-metric1.last"));
+ assertNotNull(metrics.get("emum-metric2.sum"));
+ assertNotNull(metrics.get("emum-metric2.count"));
+ assertNotNull(metrics.get("metric2"));
+ assertNotNull(metrics.get("metric3"));
+ assertNotNull(metrics.get("child_metric1"));
+ assertNotNull(metrics.get("child_metric2"));
+ }
+
+ enum TestMetrics implements VespaMetrics {
+ ENUM_METRIC1("emum-metric1"),
+ ENUM_METRIC2("emum-metric2");
+
+ private final String name;
+
+ TestMetrics(String name) {
+ this.name = name;
+ }
+
+ public String baseName() {
+ return name;
+ }
+
+ public Unit unit() {
+ return null;
+ }
+
+ public String description() {
+ return null;
+ }
+
+ }
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricTest.java b/metrics/src/test/java/ai/vespa/metrics/MetricTest.java
index f07b8c59322..7a0b85f82cc 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricTest.java
+++ b/metrics/src/test/java/ai/vespa/metrics/MetricTest.java
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.model.admin.monitoring;
+package ai.vespa.metrics;
import ai.vespa.metrics.set.Metric;
import com.google.common.collect.ImmutableMap;
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
index d3553ad003f..c4b5d9c2145 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
@@ -668,7 +668,7 @@ vespalib::string
lsSingleFile(const vespalib::string & fileName)
{
fs::path path(fileName);
- return make_string("%s %20" PRIu64 " %12" PRId64, fileName.c_str(), vespalib::count_ns(fs::last_write_time(path).time_since_epoch()), fs::file_size(path));
+ return make_string("%s %20" PRIu64 " %12" PRIuMAX, fileName.c_str(), vespalib::count_ns(fs::last_write_time(path).time_since_epoch()), fs::file_size(path));
}
}
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index 757a9329ea6..617401dd271 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
#include <vespa/storage/distributor/node_supported_features.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/check_condition.h>
#include <vespa/storage/distributor/persistence_operation_metric_set.h>
#include <vespa/storageapi/message/persistence.h>
@@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) {
});
}
+TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ cond.cancel(_sender, CancelScope::of_fully_cancelled());
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED);
+ });
+}
+
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -255,6 +271,7 @@ TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 9b5056f2066..b1cf1cbc636 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
ASSERT_EQ(entry->getNodeCount(), info.size());
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
for (size_t i = 0; i < info.size(); ++i) {
- EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
- << "Mismatching info for node " << i << ": " << info[i] << " vs "
- << entry->getNode(i)->getBucketInfo();
+ auto& node = entry->getNodeRef(i);
+ EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode();
}
}
@@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until
EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics
}
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged. Note that in a real scenario, the DB entry will have been removed
+ // as part of the ownership change, but there are already non-cancellation behaviors that
+ // avoid creating buckets from scratch in the DB if they do not exist, so just checking to
+ // see if the bucket exists or not risks hiding missing cancellation edge handling.
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+ // However, we still update our metrics if we _did_ remove documents on one or more nodes
+ EXPECT_EQ(70u, gc_removed_documents_metric());
+}
+
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for node 0, changed for node 1
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34));
+}
+
+TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for both nodes
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+}
+
TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
auto op = create_op();
op->start(_sender);
@@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in
receive_phase1_replies_and_assert_no_phase_2_started();
}
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_fully_cancelled());
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_node_subset({0}));
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
enable_two_phase_gc();
auto op = create_op();
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 76b6741442e..ee87fe84df6 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
@@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R
_sender.getCommands(true, true, 4));
}
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+
+ // DB is not touched (note: normally node 1 would be removed at the cancel-edge).
+ ASSERT_EQ("BucketId(0x4000000000008f09) : "
+ "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), "
+ "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)",
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
+ // No new requests sent
+ ASSERT_EQ("", _sender.getCommands(true, true, 4));
+}
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+ sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0
+
+ // Bucket info recheck only sent to node 1, as it's not cancelled
+ ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). ) => 1",
+ _sender.getCommands(true, true, 4));
+}
+
TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cfg = make_config();
@@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
_sender.getLastReply());
}
+TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ createAndSendSampleDocument(TIMEOUT);
+
+ ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 0,"
+ "Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 1",
+ _sender.getCommands(true, true));
+
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+
+ sendReply(0);
+ sendReply(1);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, storage_failed) {
setup_stripe(2, 1, "storage:1 distributor:1");
@@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
@@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
dumpBucket(document::BucketId(17, 13)));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
@@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
@@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
// TODO probably also do this for updates and removes
// TODO consider if we should use the pending state verbatim for computing targets if it exists
+// TODO make this redundant through operation cancelling
+// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this??
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
@@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state
_sender.getLastReply(true));
}
+TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2});
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+
+ // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless
+ // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path.
+ {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
+ sreply->remapBucketId(remap_bucket);
+ sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
+ op->receive(_sender, reply);
+ }
+
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
+
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket));
+ EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket));
+}
+
+TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(bucket));
+}
+
TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setup_stripe(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
@@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
_sender.getLastReply());
}
+TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill
+ // the write phase for all replicas.
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) {
setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
auto doc = createDummyDocument("test", "test");
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index d169c80a95d..3fad2c194a2 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
+ removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5));
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
}
@@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err
_sender.getLastReply());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 50, false, true));
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ reply_with(make_get_reply(1, 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT));
+ ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, 0, 50);
+ replyToMessage(*op, 1, 50);
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(bucketId));
+ // Reply is still OK since the operation went through on the content nodes
+ ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)",
+ _sender.getLastReply());
+
+}
+
TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) {
ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index da32225cde3..1907335545a 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -4,16 +4,16 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/document/base/testdocrepo.h>
#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/update/arithmeticvalueupdate.h>
-#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
+#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storageapi/message/persistence.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
#include <gmock/gmock.h>
namespace storage::distributor {
@@ -30,8 +30,9 @@ using namespace ::testing;
struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
- const DocumentType* _doc_type;
+ const DocumentType* _doc_type{nullptr};
DistributorMessageSenderStub _sender;
+ BucketId _bucket_id{0x400000000000cac4};
TwoPhaseUpdateOperationTest();
~TwoPhaseUpdateOperationTest() override;
@@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
- std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
+ static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
void SetUp() override {
_repo = _testRepo.getTypeRepoSp();
@@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
close();
}
- void replyToMessage(Operation& callback,
- DistributorMessageSenderStub& sender,
- uint32_t index,
- uint64_t oldTimestamp,
- api::ReturnCode::Result result = api::ReturnCode::OK);
+ static void replyToMessage(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t oldTimestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK);
- void replyToPut(
+ static void replyToPut(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void replyToCreateBucket(
+ static void replyToCreateBucket(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void reply_to_metadata_get(
+ static void reply_to_metadata_get(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& trace_msg = "");
- void reply_to_get_with_tombstone(
+ static void reply_to_get_with_tombstone(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
- std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
- setup_stripe(2, 2, "storage:2 distributor:1");
+ void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas);
+
+ void enable_3phase_updates(bool enable = true) {
auto cfg = make_config();
- cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable);
configure_stripe(cfg);
+ }
+
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates(enable_3phase);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
cb->start(_sender);
return cb;
@@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<PutCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
+ dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5));
if (!traceMsg.empty()) {
MBUS_TRACE(reply->getTrace(), 1, traceMsg);
}
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
update->setCreateIfNonExistent(options._createIfNonExistent);
document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId());
+ assert(id == _bucket_id);
document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId());
if (bucketState.length()) {
@@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste
_sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=1/2/3");
+ op->start(_sender);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
+
+ replyToMessage(*op, _sender, 0, 110);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, _sender, 1, 110);
+
+ // Client operation itself should return success since the update went through on all replica nodes
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 110) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
void
TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const
@@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1);
}
+void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ op->start(_sender);
+
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*op, _sender, 0, 70);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO custom cancellation failure metric?
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false);
+}
+
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setup_stripe(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
@@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
EXPECT_EQ(1, m.ok.getValue());
}
+TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 2, 2000U);
+ ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO cancellation metrics?
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*op, _sender, 2, 2000U);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0});
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+ replyToPut(*op, _sender, 3);
+ replyToPut(*op, _sender, 4);
+
+ // Update itself is ACKed
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 2000) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ // But cancelled replicas are not reintroduced into the bucket DB
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
+
TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
auto cb = set_up_2_inconsistent_replicas_and_start_update();
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
cb->start(_sender);
// Add new replica to deterministic test bucket after gets have been sent
- BucketId bucket(0x400000000000cac4); // Always the same in the test.
- addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+ addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index d0ae31b9524..fefc88a27c2 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -11,7 +11,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
using config::ConfigGetter;
using config::FileSpec;
@@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil {
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _html_type;
+ UpdateOperationTest()
+ : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>::
+ getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))),
+ _html_type(_repo->getDocumentType("text/html"))
+ {
+ }
+
void SetUp() override {
- _repo.reset(
- new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>::
- getConfig("config-doctypes", FileSpec("../config-doctypes.cfg"))));
- _html_type = _repo->getDocumentType("text/html");
createLinks();
}
@@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest
EXPECT_EQ(2, m.diverging_timestamp_updates.getValue());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3");
+ DistributorMessageSenderStub sender;
+ op->start(sender);
+
+ ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ replyToMessage(*op, sender, 0, 120);
+ replyToMessage(*op, sender, 1, 120);
+ replyToMessage(*op, sender, 2, 120);
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)",
+ dumpBucket(_bId));
+}
+
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
index a8c21efa793..d2ff7b53403 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
@@ -142,7 +142,7 @@ BucketInfo::addNode(const BucketCopy& newCopy, const std::vector<uint16_t>& reco
bool
BucketInfo::removeNode(unsigned short node, TrustedUpdate update)
{
- for (auto iter = _nodes.begin(); iter != _nodes.end(); iter++) {
+ for (auto iter = _nodes.begin(); iter != _nodes.end(); ++iter) {
if (iter->getNode() == node) {
_nodes.erase(iter);
if (update == TrustedUpdate::UPDATE) {
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
index f8dbff38a99..a8a1069d587 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
@@ -159,7 +159,7 @@ BucketInfoBase<NodeSeq>::getNodes() const noexcept {
}
template <typename NodeSeq>
-BucketInfoBase<NodeSeq>::Highest
+typename BucketInfoBase<NodeSeq>::Highest
BucketInfoBase<NodeSeq>::getHighest() const noexcept {
Highest highest;
for (const auto & n : _nodes) {
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 184fee5d2c9..c889afcc77c 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -10,6 +10,7 @@ vespa_add_library(storage_distributor OBJECT
bucket_spaces_stats_provider.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
+ cancelled_replicas_pruner.cpp
clusterinformation.cpp
crypto_uuid_generator.cpp
distributor_bucket_space.cpp
diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp
index 4e3ef4f88ee..4c35d42a0e7 100644
--- a/storage/src/vespa/storage/distributor/activecopy.cpp
+++ b/storage/src/vespa/storage/distributor/activecopy.cpp
@@ -5,6 +5,7 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <algorithm>
#include <cassert>
+#include <ostream>
namespace std {
diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp
new file mode 100644
index 00000000000..f453a722d2c
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cancelled_replicas_pruner.h"
+
+namespace storage::distributor {
+
+std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope) {
+ if (cancel_scope.fully_cancelled()) {
+ return {};
+ }
+ std::vector<BucketCopy> pruned_replicas;
+ // Expect that there will be an input entry for each cancelled node in the common case.
+ pruned_replicas.reserve((replicas.size() >= cancel_scope.cancelled_nodes().size())
+ ? replicas.size() - cancel_scope.cancelled_nodes().size() : 0);
+ for (auto& candidate : replicas) {
+ if (!cancel_scope.node_is_cancelled(candidate.getNode())) {
+ pruned_replicas.emplace_back(candidate);
+ }
+ }
+ return pruned_replicas;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h
new file mode 100644
index 00000000000..f12f78e569f
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/bucketdb/bucketcopy.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
+#include <span>
+#include <vector>
+
+namespace storage::distributor {
+
+/**
+ * Returns a new vector that contains all entries of `replicas` whose nodes are _not_ tagged as
+ * cancelled in `cancel_scope`. Returned entry ordering is identical to input ordering.
+ */
+[[nodiscard]] std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope);
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 243b3c5ecb2..ad1cce46bea 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -281,6 +281,7 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
enterRecoveryMode();
// Clear all active messages on nodes that are down.
+ // TODO this should also be done on nodes that are no longer part of the config!
const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index 7b7c9f431f7..c92544c8cb5 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -73,7 +73,7 @@ OperationOwner::onClose()
void
OperationOwner::erase(api::StorageMessage::Id msgId)
{
- _sentMessageMap.pop(msgId);
+ (void)_sentMessageMap.pop(msgId);
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
index 5c6a1f3d84c..8cf0470f674 100644
--- a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_distributoroperation OBJECT
SOURCES
+ cancel_scope.cpp
operation.cpp
DEPENDS
)
diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp
new file mode 100644
index 00000000000..af62b369517
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cancel_scope.h"
+
+namespace storage::distributor {
+
+CancelScope::CancelScope()
+ : _cancelled_nodes(),
+ _fully_cancelled(false)
+{
+}
+
+CancelScope::CancelScope(fully_cancelled_ctor_tag) noexcept
+ : _cancelled_nodes(),
+ _fully_cancelled(true)
+{
+}
+
+CancelScope::CancelScope(CancelledNodeSet nodes) noexcept
+ : _cancelled_nodes(std::move(nodes)),
+ _fully_cancelled(false)
+{
+}
+
+CancelScope::~CancelScope() = default;
+
+CancelScope::CancelScope(const CancelScope&) = default;
+CancelScope& CancelScope::operator=(const CancelScope&) = default;
+
+CancelScope::CancelScope(CancelScope&&) noexcept = default;
+CancelScope& CancelScope::operator=(CancelScope&&) noexcept = default;
+
+void CancelScope::add_cancelled_node(uint16_t node) {
+ _cancelled_nodes.insert(node);
+}
+
+void CancelScope::merge(const CancelScope& other) {
+ _fully_cancelled |= other._fully_cancelled;
+ // Not using iterator insert(first, last) since that explicitly resizes,
+ for (uint16_t node : other._cancelled_nodes) {
+ _cancelled_nodes.insert(node);
+ }
+}
+
+CancelScope CancelScope::of_fully_cancelled() noexcept {
+ return CancelScope(fully_cancelled_ctor_tag{});
+}
+
+CancelScope CancelScope::of_node_subset(CancelledNodeSet nodes) noexcept {
+ return CancelScope(std::move(nodes));
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.h b/storage/src/vespa/storage/distributor/operations/cancel_scope.h
new file mode 100644
index 00000000000..7619a64d39f
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.h
@@ -0,0 +1,62 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/hash_set.h>
+
+namespace storage::distributor {
+
+/**
+ * In the face of concurrent cluster state changes, cluster topology reconfigurations etc.,
+ * it's possible for there to be pending mutating operations to nodes that the distributor
+ * no longer should keep track of. Such operations must therefore be _cancelled_, either
+ * fully or partially. A CancelScope represents the granularity at which an operation should
+ * be cancelled.
+ *
+ * In the case of one or more nodes becoming unavailable, `fully_cancelled()` will be false
+ * and `node_is_cancelled(x)` will return whether node `x` is explicitly cancelled.
+ *
+ * In the case of ownership transfers, `fully_cancelled()` will be true since the distributor
+ * should no longer have any knowledge of the bucket. `node_is_cancelled(x)` is always
+ * implicitly true for all values of `x` for full cancellations.
+ */
+class CancelScope {
+public:
+ using CancelledNodeSet = vespalib::hash_set<uint16_t>;
+private:
+ CancelledNodeSet _cancelled_nodes;
+ bool _fully_cancelled;
+
+ struct fully_cancelled_ctor_tag {};
+
+ explicit CancelScope(fully_cancelled_ctor_tag) noexcept;
+ explicit CancelScope(CancelledNodeSet nodes) noexcept;
+public:
+ CancelScope();
+ ~CancelScope();
+
+ CancelScope(const CancelScope&);
+ CancelScope& operator=(const CancelScope&);
+
+ CancelScope(CancelScope&&) noexcept;
+ CancelScope& operator=(CancelScope&&) noexcept;
+
+ void add_cancelled_node(uint16_t node);
+ void merge(const CancelScope& other);
+
+ [[nodiscard]] bool fully_cancelled() const noexcept { return _fully_cancelled; }
+ [[nodiscard]] bool is_cancelled() const noexcept {
+ return (_fully_cancelled || !_cancelled_nodes.empty());
+ }
+ [[nodiscard]] bool node_is_cancelled(uint16_t node) const noexcept {
+ return (fully_cancelled() || _cancelled_nodes.contains(node));
+ }
+
+ [[nodiscard]] const CancelledNodeSet& cancelled_nodes() const noexcept {
+ return _cancelled_nodes;
+ }
+
+ static CancelScope of_fully_cancelled() noexcept;
+ static CancelScope of_node_subset(CancelledNodeSet nodes) noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index 0e12e3e3019..bd7f3709575 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -104,7 +104,12 @@ CheckCondition::handle_reply(DistributorStripeMessageSender& sender,
}
}
-void CheckCondition::cancel(DistributorStripeMessageSender& sender) {
+void CheckCondition::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
+ _cond_get_op->cancel(proxy_sender, cancel_scope);
+}
+
+void CheckCondition::close(DistributorStripeMessageSender& sender) {
IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
_cond_get_op->onClose(proxy_sender);
// We don't propagate any generated reply from the GetOperation, as its existence
@@ -163,6 +168,12 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
reply->steal_trace());
return;
}
+ if (_cond_get_op->is_cancelled()) {
+ _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED,
+ "Operation has been cancelled (likely due to a cluster state change)"),
+ reply->steal_trace());
+ return;
+ }
auto state_version_now = _bucket_space.getClusterState().getVersion();
if (_bucket_space.has_pending_cluster_state()) {
state_version_now = _bucket_space.get_pending_cluster_state().getVersion();
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
index 999b79adc3d..92a8bc62ae6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -17,6 +17,7 @@ namespace storage::api { class StorageReply; }
namespace storage::distributor {
+class CancelScope;
class DistributorBucketSpace;
class DistributorNodeContext;
class DistributorStripeMessageSender;
@@ -122,7 +123,8 @@ public:
void start_and_send(DistributorStripeMessageSender& sender);
void handle_reply(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply);
- void cancel(DistributorStripeMessageSender& sender);
+ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+ void close(DistributorStripeMessageSender& sender);
[[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept {
return _outcome;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 854e7d15f82..e7832fd19e5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -109,6 +109,8 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc
bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const {
// TODO handle this explicitly as part of operation abort/cancel edge
+ // -> we have yet to send anything at this point
+ // -> shouldn't ExternalOperationHandler deal with this before starting the op?
auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace());
if (!pending_state) {
return false;
@@ -245,6 +247,15 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
_msg = std::shared_ptr<api::PutCommand>();
}
+void
+PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope)
+{
+ if (_check_condition) {
+ _check_condition->cancel(sender, cancel_scope);
+ }
+ _tracker.cancel(cancel_scope);
+}
+
bool
PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const
{
@@ -302,7 +313,7 @@ void
PutOperation::onClose(DistributorStripeMessageSender& sender)
{
if (_check_condition) {
- _check_condition->cancel(sender);
+ _check_condition->close(sender);
}
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 635accc1865..8b8e3e15375 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -60,6 +60,8 @@ private:
void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch);
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
+
[[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const;
[[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 42d8e318f47..be43aac3d9e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -156,9 +156,21 @@ void RemoveOperation::on_completed_check_condition(CheckCondition::Outcome& outc
void
RemoveOperation::onClose(DistributorStripeMessageSender& sender)
{
+ if (_check_condition) {
+ _check_condition->close(sender);
+ }
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+void
+RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope)
+{
+ if (_check_condition) {
+ _check_condition->cancel(sender, cancel_scope);
+ }
+ _tracker.cancel(cancel_scope);
+}
+
bool RemoveOperation::has_condition() const noexcept {
return _msg->hasTestAndSetCondition();
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 9f3a98294ea..221def81fdc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -29,6 +29,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
private:
PersistenceMessageTrackerImpl _tracker_instance;
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
index a0b3f12f76b..1a8d1cb8f88 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
@@ -4,6 +4,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.operations.external.stat_bucket");
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 73c65f54b21..2d1c469d072 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -13,6 +13,7 @@
#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/vespalib/stllike/hash_set.hpp>
#include <cinttypes>
#include <vespa/log/log.h>
@@ -68,10 +69,8 @@ TwoPhaseUpdateOperation::stateToString(SendState state) noexcept
case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT";
case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT";
case SendState::PUTS_SENT: return "PUTS_SENT";
- default:
- assert(!"Unknown state");
- return "";
}
+ abort();
}
void
@@ -130,7 +129,7 @@ TwoPhaseUpdateOperation::get_bucket_database_entries() const
}
bool
-TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const
+TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries)
{
// Fast path iff bucket exists AND is consistent (split and copies).
if (entries.size() != 1) {
@@ -245,6 +244,16 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorStripeM
}
void
+TwoPhaseUpdateOperation::send_operation_cancelled_reply(DistributorStripeMessageSender& sender)
+{
+ sendReplyWithResult(sender,
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
+ "The update operation was cancelled due to a cluster state change "
+ "between executing the read and write phases of a write-repair "
+ "update"));
+}
+
+void
TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorStripeMessageSender& sender)
{
sendReplyWithResult(sender,
@@ -257,7 +266,8 @@ void
TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc,
api::Timestamp putTimestamp, DistributorStripeMessageSender& sender)
{
- if (lostBucketOwnershipBetweenPhases()) {
+ assert(!is_cancelled());
+ if (lostBucketOwnershipBetweenPhases()) { // TODO deprecate with cancellation
sendLostOwnershipTransientErrorReply(sender);
return;
}
@@ -281,6 +291,8 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
void
TwoPhaseUpdateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
+ // In the case of cancellations, we let existing operations complete, but must not
+ // start new ones that are unaware of the cancellations.
if (_mode == Mode::FAST_PATH) {
handleFastPathReceive(sender, msg);
} else {
@@ -304,7 +316,10 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
sendReplyWithResult(sender, getReply.getResult());
return;
}
-
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
if (!getReply.getDocument().get()) {
// Weird, document is no longer there ... Just fail.
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, ""));
@@ -316,7 +331,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
- Operation & callbackOp = *callback;
+ Operation& callbackOp = *callback;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender);
callbackOp.receive(intermediate, msg);
@@ -326,12 +341,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
addTraceFromReply(*intermediate._reply);
auto& cb = dynamic_cast<UpdateOperation&>(callbackOp);
- std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation();
+ auto [newest_bucket, newest_node] = cb.getNewestTimestampLocation();
auto intermediate_update_reply = std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply);
assert(intermediate_update_reply);
if (!intermediate_update_reply->getResult().success() ||
- bestNode.first == document::BucketId(0))
+ (newest_bucket == document::BucketId(0)))
{
if (intermediate_update_reply->getResult().success() &&
(intermediate_update_reply->getOldTimestamp() == 0))
@@ -343,9 +358,14 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
} else {
LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str());
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
+
_updateReply = std::move(intermediate_update_reply);
- _fast_path_repair_source_node = bestNode.second;
- document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first);
+ _fast_path_repair_source_node = newest_node;
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_bucket);
auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), document::AllFields::NAME);
copyMessageSettings(*_updateCmd, *cmd);
@@ -383,7 +403,7 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorStripeMessageSender& s
callbackOp.receive(intermediate, msg);
if (!intermediate._reply.get()) {
- return; // Not enough replies received yet or we're draining callbacks.
+ return; // Not enough replies received yet, or we're draining callbacks.
}
addTraceFromReply(*intermediate._reply);
if (_sendState == SendState::METADATA_GETS_SENT) {
@@ -445,6 +465,13 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
"One or more metadata Get operations failed; aborting Update"));
return;
}
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
+ // Replicas _removed_ is handled by cancellation, but a concurrent state change may happen
+ // that _adds_ one or more available content nodes, which we cannot then blindly write to.
+ // So we have to explicitly check this edge case.
if (!replica_set_unchanged_after_get_operation()) {
// Use BUCKET_NOT_FOUND to trigger a silent retry.
LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str());
@@ -490,6 +517,10 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende
sendReplyWithResult(sender, reply.getResult());
return;
}
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
// Single Get could technically be considered consistent with itself, so make
// sure we never treat that as sufficient for restarting in the fast path.
if ((_sendState != SendState::SINGLE_GET_SENT) && may_restart_with_fast_path(reply)) {
@@ -558,7 +589,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender) {
LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
update_doc_id().c_str());
- if (lostBucketOwnershipBetweenPhases()) {
+ assert(!is_cancelled());
+ if (lostBucketOwnershipBetweenPhases()) { // TODO remove once cancellation is wired
sendLostOwnershipTransientErrorReply(sender);
return;
}
@@ -579,7 +611,7 @@ TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorStripeMessageSen
std::unique_ptr<document::select::Node> selection;
try {
- selection = _parser.parse_selection(_updateCmd->getCondition().getSelection());
+ selection = _parser.parse_selection(_updateCmd->getCondition().getSelection());
} catch (const document::select::ParsingFailedException & e) {
sendReplyWithResult(sender, api::ReturnCode(
api::ReturnCode::ILLEGAL_PARAMETERS,
@@ -679,6 +711,22 @@ TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) {
}
}
+void
+TwoPhaseUpdateOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ // We have to explicitly cancel any and all pending Operation instances that have been
+ // launched by this operation. This is to ensure any DB updates they may transitively
+ // perform are aware of all cancellations that have occurred.
+ // There may be many messages pending for any given operation, so unique-ify them first
+ // to avoid duplicate cancellation invocations.
+ vespalib::hash_set<Operation*> ops;
+ for (auto& msg_op : _sentMessageMap) {
+ ops.insert(msg_op.second.get());
+ }
+ for (auto* op : ops) {
+ op->cancel(sender, cancel_scope);
+ }
+}
+
vespalib::string TwoPhaseUpdateOperation::update_doc_id() const {
assert(_updateCmd.get() != nullptr);
return _updateCmd->getDocumentId().toString();
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index d2ad5359fa6..7f64bb8d56c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -71,6 +71,8 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
+
private:
enum class SendState {
NONE_SENT,
@@ -94,19 +96,20 @@ private:
void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&);
void ensureUpdateReplyCreated();
- std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
- bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const;
+ [[nodiscard]] std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
+ [[nodiscard]] static bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries);
void startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries);
void startSafePathUpdate(DistributorStripeMessageSender&);
- bool lostBucketOwnershipBetweenPhases() const;
+ [[nodiscard]] bool lostBucketOwnershipBetweenPhases() const;
void sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender&);
+ void send_operation_cancelled_reply(DistributorStripeMessageSender& sender);
void send_feed_blocked_error_reply(DistributorStripeMessageSender& sender);
void schedulePutsWithUpdatedDocument(
std::shared_ptr<document::Document>,
api::Timestamp,
DistributorStripeMessageSender&);
void applyUpdateToDocument(document::Document&) const;
- std::shared_ptr<document::Document> createBlankDocument() const;
+ [[nodiscard]] std::shared_ptr<document::Document> createBlankDocument() const;
void setUpdatedForTimestamp(api::Timestamp);
void handleFastPathReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&);
@@ -120,20 +123,20 @@ private:
void handle_safe_path_received_single_full_get(DistributorStripeMessageSender&, api::GetReply&);
void handleSafePathReceivedGet(DistributorStripeMessageSender&, api::GetReply&);
void handleSafePathReceivedPut(DistributorStripeMessageSender&, const api::PutReply&);
- bool shouldCreateIfNonExistent() const;
+ [[nodiscard]] bool shouldCreateIfNonExistent() const;
bool processAndMatchTasCondition(
DistributorStripeMessageSender& sender,
const document::Document& candidateDoc);
- bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
+ [[nodiscard]] bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
void addTraceFromReply(api::StorageReply& reply);
- bool hasTasCondition() const noexcept;
+ [[nodiscard]] bool hasTasCondition() const noexcept;
void replyWithTasFailure(DistributorStripeMessageSender& sender,
vespalib::stringref message);
bool may_restart_with_fast_path(const api::GetReply& reply);
- bool replica_set_unchanged_after_get_operation() const;
+ [[nodiscard]] bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender);
// Precondition: reply has not yet been sent.
- vespalib::string update_doc_id() const;
+ [[nodiscard]] vespalib::string update_doc_id() const;
using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index f43a6092372..60bddebbb89 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -207,6 +207,13 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender)
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+void
+UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope)
+{
+ _tracker.cancel(cancel_scope);
+}
+
+
// The backend behavior of "create-if-missing" updates is to return the timestamp of the
// _new_ update operation if the document was created from scratch. The two-phase update
// operation logic auto-detects unexpected inconsistencies and tries to reconcile
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 96fd878a324..7d2131d426d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -31,6 +31,7 @@ public:
std::string getStatus() const override { return ""; };
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const {
return _newestTimestampLocation;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 2e6d0e95ec9..bf64fa2eb82 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "garbagecollectionoperation.h"
+#include <vespa/storage/distributor/cancelled_replicas_pruner.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -22,6 +23,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_cluster_state_version_at_phase1_start_time(0),
_remove_candidates(),
_replica_info(),
+ _cancel_scope(),
_max_documents_removed(0),
_is_done(false)
{}
@@ -148,6 +150,10 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
}
}
+void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) {
+ _cancel_scope.merge(cancel_scope);
+}
+
void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
_replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
from_node, reply.getBucketInfo());
@@ -186,6 +192,11 @@ bool GarbageCollectionOperation::may_start_write_phase() const {
if (!_ok) {
return false; // Already broken, no reason to proceed.
}
+ if (is_cancelled()) {
+ LOG(debug, "GC(%s): not sending write phase; operation has been explicitly cancelled",
+ getBucket().toString().c_str());
+ return false;
+ }
const auto state_version_now = _bucketSpace->getClusterState().getVersion();
if ((state_version_now != _cluster_state_version_at_phase1_start_time) ||
_bucketSpace->has_pending_cluster_state())
@@ -250,9 +261,17 @@ void GarbageCollectionOperation::update_last_gc_timestamp_in_db() {
}
void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
- // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
- update_last_gc_timestamp_in_db();
+ if (_cancel_scope.is_cancelled()) {
+ if (_cancel_scope.fully_cancelled()) {
+ return;
+ }
+ _replica_info = prune_cancelled_nodes(_replica_info, _cancel_scope);
+ }
+ if (!_replica_info.empty()) {
+ // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
+ _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+ update_last_gc_timestamp_in_db();
+ } // else: effectively fully cancelled, no touching the DB.
}
void GarbageCollectionOperation::update_gc_metrics() {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 27dc519dcc2..97efbe694de 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -3,10 +3,11 @@
#include "idealstateoperation.h"
#include <vespa/document/base/documentid.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
-#include <vespa/persistence/spi/id_and_timestamp.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
@@ -22,13 +23,14 @@ public:
void onStart(DistributorStripeMessageSender& sender) override;
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
const char* getName() const noexcept override { return "garbagecollection"; };
Type getType() const noexcept override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
- bool is_two_phase() const noexcept {
+ [[nodiscard]] bool is_two_phase() const noexcept {
return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
}
- bool is_done() const noexcept { return _is_done; }
+ [[nodiscard]] bool is_done() const noexcept { return _is_done; }
protected:
MessageTracker _tracker;
@@ -54,13 +56,14 @@ private:
RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
+ CancelScope _cancel_scope;
uint32_t _max_documents_removed;
bool _is_done;
static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);
void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
- std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
+ [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index 4d82de170ae..9f944a94178 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -12,7 +12,8 @@ LOG_SETUP(".distributor.callback");
namespace storage::distributor {
Operation::Operation()
- : _startTime()
+ : _startTime(),
+ _cancelled(false)
{
}
@@ -45,6 +46,11 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo
target.setPriority(source.getPriority());
}
+void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ _cancelled = true;
+ on_cancel(sender, cancel_scope);
+}
+
void
Operation::on_blocked()
{
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index bc7e510a5b6..64caacfc642 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -16,6 +16,7 @@ class StorageComponent;
namespace distributor {
+class CancelScope;
class DistributorStripeOperationContext;
class PendingMessageTracker;
class OperationSequencer;
@@ -40,7 +41,7 @@ public:
on the owner of the message that was replied to.
*/
virtual void receive(DistributorStripeMessageSender& sender,
- const std::shared_ptr<api::StorageReply> & msg)
+ const std::shared_ptr<api::StorageReply> & msg)
{
onReceive(sender, msg);
}
@@ -60,6 +61,22 @@ public:
void start(DistributorStripeMessageSender& sender);
/**
+ * Explicitly cancel the operation. Cancelled operations may or may not (depending on
+ * the operation implementation) be immediately aborted, but they should either way
+ * never insert any bucket information _for cancelled nodes_ into the bucket DB after
+ * cancel() has been called.
+ */
+ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+
+ /**
+ * Whether cancel() has been invoked at least once on this instance. This does not
+ * distinguish between cancellations caused by ownership transfers and those caused
+ * by nodes becoming unavailable; Operation implementations that care about this need
+ * to implement cancel() themselves and inspect the provided CancelScope.
+ */
+ [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; }
+
+ /**
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
@@ -93,8 +110,15 @@ private:
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
+ virtual void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ (void)sender;
+ (void)cancel_scope;
+ }
+
static constexpr vespalib::duration MAX_TIMEOUT = 3600s;
+
vespalib::system_time _startTime;
+ bool _cancelled;
};
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index a4295613fd2..498f3a5feab 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -1,10 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistencemessagetracker.h"
+#include "cancelled_replicas_pruner.h"
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/storageapi/message/persistence.h>
+#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".persistencemessagetracker");
@@ -18,12 +20,15 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
DistributorStripeOperationContext& op_ctx,
api::Timestamp revertTimestamp)
: MessageTracker(node_ctx),
+ _remapBucketInfo(),
+ _bucketInfo(),
_metric(metric),
_reply(std::move(reply)),
_op_ctx(op_ctx),
_revertTimestamp(revertTimestamp),
_trace(_reply->getTrace().getLevel()),
_requestTimer(node_ctx.clock()),
+ _cancel_scope(),
_n_persistence_replies_total(0),
_n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
@@ -34,8 +39,32 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default;
void
+PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope)
+{
+ _cancel_scope.merge(cancel_scope);
+}
+
+void
+PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present(
+ BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope)
+{
+ for (auto& info : bucket_and_replicas) {
+ info.second = prune_cancelled_nodes(info.second, cancel_scope);
+ }
+}
+
+void
PersistenceMessageTrackerImpl::updateDB()
{
+ if (_cancel_scope.is_cancelled()) {
+ if (_cancel_scope.fully_cancelled()) {
+ return; // Fully cancelled ops cannot mutate the DB at all
+ }
+ prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope);
+ prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope);
+ }
+
for (const auto & entry : _bucketInfo) {
_op_ctx.update_bucket_database(entry.first, entry.second);
}
@@ -229,12 +258,19 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r
_success = false;
}
+bool
+PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept
+{
+ return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case
+}
+
void
PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
{
LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node);
if (!reply.getResult().success()
- && reply.getResult().getResult() != api::ReturnCode::EXISTS)
+ && (reply.getResult().getResult() != api::ReturnCode::EXISTS)
+ && !node_is_effectively_cancelled(node))
{
LOG(spam, "Create bucket reply failed, so deleting it from bucket db");
// We don't know if the bucket exists at this point, so we remove it from the DB.
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 9b06547dd98..8c44d70062c 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -4,6 +4,7 @@
#include "distributor_stripe_component.h"
#include "distributormetricsset.h"
#include "messagetracker.h"
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageapi/messageapi/bucketinfocommand.h>
#include <vespa/storageapi/messageapi/bucketinforeply.h>
@@ -14,6 +15,7 @@ struct PersistenceMessageTracker {
virtual ~PersistenceMessageTracker() = default;
using ToSend = MessageTracker::ToSend;
+ virtual void cancel(const CancelScope& cancel_scope) = 0;
virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
virtual void queueMessageBatch(std::vector<ToSend> messages) = 0;
virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0;
@@ -26,14 +28,9 @@ struct PersistenceMessageTracker {
};
class PersistenceMessageTrackerImpl final
- : public PersistenceMessageTracker,
- public MessageTracker
+ : public PersistenceMessageTracker,
+ public MessageTracker
{
-private:
- using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
- BucketInfoMap _remapBucketInfo;
- BucketInfoMap _bucketInfo;
-
public:
PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
@@ -42,6 +39,8 @@ public:
api::Timestamp revertTimestamp = 0);
~PersistenceMessageTrackerImpl() override;
+ void cancel(const CancelScope& cancel_scope) override;
+
void updateDB();
void updateMetrics();
[[nodiscard]] bool success() const noexcept { return _success; }
@@ -67,8 +66,11 @@ public:
void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override;
private:
- using MessageBatch = std::vector<uint64_t>;
+ using MessageBatch = std::vector<uint64_t>;
+ using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
+ BucketInfoMap _remapBucketInfo;
+ BucketInfoMap _bucketInfo;
std::vector<MessageBatch> _messageBatches;
PersistenceOperationMetricSet& _metric;
std::shared_ptr<api::BucketInfoReply> _reply;
@@ -77,20 +79,24 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::Trace _trace;
framework::MilliSecTimer _requestTimer;
+ CancelScope _cancel_scope;
uint32_t _n_persistence_replies_total;
uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
- bool canSendReplyEarly() const;
+ static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope);
+ [[nodiscard]] bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
- bool hasSentReply() const noexcept { return !_reply; }
- bool shouldRevert() const;
- bool has_majority_successful_replies() const noexcept;
- bool has_minority_test_and_set_failure() const noexcept;
+ [[nodiscard]] bool hasSentReply() const noexcept { return !_reply; }
+ [[nodiscard]] bool shouldRevert() const;
+ [[nodiscard]] bool has_majority_successful_replies() const noexcept;
+ [[nodiscard]] bool has_minority_test_and_set_failure() const noexcept;
void sendReply(MessageSender& sender);
void updateFailureResult(const api::BucketInfoReply& reply);
+ [[nodiscard]] bool node_is_effectively_cancelled(uint16_t node) const noexcept;
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
void transfer_trace_state_to_reply();
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 70bee311f78..951ed6a6877 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -10,19 +10,23 @@ class Operation;
class SentMessageMap {
public:
+ using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
+
SentMessageMap();
~SentMessageMap();
- std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
- std::shared_ptr<Operation> pop();
+ [[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
+ [[nodiscard]] std::shared_ptr<Operation> pop();
void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg);
void clear();
- uint32_t size() const { return _map.size(); }
+ [[nodiscard]] uint32_t size() const { return _map.size(); }
[[nodiscard]] bool empty() const noexcept { return _map.empty(); }
std::string toString() const;
+
+ Map::const_iterator begin() const noexcept { return _map.cbegin(); }
+ Map::const_iterator end() const noexcept { return _map.cend(); }
private:
- using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
Map _map;
};
diff --git a/storage/src/vespa/storage/distributor/statechecker.cpp b/storage/src/vespa/storage/distributor/statechecker.cpp
index cd8b6e934d4..7b30be53c13 100644
--- a/storage/src/vespa/storage/distributor/statechecker.cpp
+++ b/storage/src/vespa/storage/distributor/statechecker.cpp
@@ -5,6 +5,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/stllike/hash_set_insert.hpp>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.statechecker");
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index dcc3602c5e7..36d2393e148 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -15,6 +15,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <algorithm>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index c22b08c5ca5..a6f6bd5d3fe 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -8,6 +8,7 @@
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/persistence/spi/docentry.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <sstream>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.processall");
diff --git a/vespalib/src/tests/alloc/alloc_test.cpp b/vespalib/src/tests/alloc/alloc_test.cpp
index 04e009fcf8a..f39543daa2d 100644
--- a/vespalib/src/tests/alloc/alloc_test.cpp
+++ b/vespalib/src/tests/alloc/alloc_test.cpp
@@ -1,11 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/config.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/alloc.h>
#include <vespa/vespalib/util/memory_allocator.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/round_up_to_page_size.h>
-#include <vespa/vespalib/util/sanitizers.h>
#include <vespa/vespalib/util/size_literals.h>
#include <cstddef>
#include <sys/mman.h>
@@ -216,12 +216,12 @@ TEST("auto alloced mmap alloc can not be extended if no room") {
}
/*
- * The two following tests are disabled when address sanitizer is
+ * The two following tests are disabled when any sanitizer is
* enabled since extra instrumentation code might trigger extra mmap
* or munmap calls, breaking some of the assumptions in the disabled
* tests.
*/
-#ifndef VESPA_USE_ADDRESS_SANITIZER
+#ifndef VESPA_USE_SANITIZER
TEST("mmap alloc can be extended if room") {
Alloc dummy = Alloc::allocMMap(100);
Alloc reserved = Alloc::allocMMap(100);
diff --git a/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp b/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp
index f711d3d8685..3d0b9debeda 100644
--- a/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp
+++ b/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp
@@ -57,7 +57,7 @@ MmapFileAllocator::alloc(size_t sz) const
uint64_t offset = alloc_area(sz);
void *buf = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _file.getFileDescriptor(), offset);
if (buf == MAP_FAILED) {
- throw IoException(fmt("Failed mmap(nullptr, %zu, PROT_READ | PROT_WRITE, MAP_SHARED, %s(fd=%d), %lu). Reason given by OS = '%s'",
+ throw IoException(fmt("Failed mmap(nullptr, %zu, PROT_READ | PROT_WRITE, MAP_SHARED, %s(fd=%d), %" PRIu64 "). Reason given by OS = '%s'",
sz, _file.getFilename().c_str(), _file.getFileDescriptor(), offset, getLastErrorString().c_str()),
IoException::getErrorType(errno), VESPA_STRLOC);
}