diff options
91 files changed, 2322 insertions, 858 deletions
diff --git a/client/go/internal/cli/cmd/config.go b/client/go/internal/cli/cmd/config.go index 0a03686dd33..2ebd6b0793e 100644 --- a/client/go/internal/cli/cmd/config.go +++ b/client/go/internal/cli/cmd/config.go @@ -52,7 +52,7 @@ most to least preferred: 3. Global config value 4. Default value -The following flags/options can be configured: +The following global flags/options can be configured: application @@ -96,13 +96,6 @@ e.g. vespa deploy or vespa query. Possible values are: - hosted: Connect to hosted Vespa (internal platform) - *url*: Connect to a platform running at given URL. -wait - -Specifies the number of seconds to wait for a service to become ready or -deployment to complete. Use this to have a potentially long-running command -block until the operation is complete, e.g. with vespa deploy. Defaults to 0 -(no waiting) - zone Specifies a custom dev or perf zone to use when connecting to a Vespa platform. diff --git a/client/go/internal/cli/cmd/curl.go b/client/go/internal/cli/cmd/curl.go index 3009cab2b5e..44540db9ccf 100644 --- a/client/go/internal/cli/cmd/curl.go +++ b/client/go/internal/cli/cmd/curl.go @@ -39,7 +39,17 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain if err != nil { return err } - service, err := target.Service(curlService, time.Duration(waitSecs)*time.Second, 0, cli.config.cluster()) + var service *vespa.Service + useDeploy := curlService == "deploy" + waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second) + if useDeploy { + if cli.config.cluster() != "" { + return fmt.Errorf("cannot specify cluster for service %s", curlService) + } + service, err = target.DeployService() + } else { + service, err = waiter.Service(target, cli.config.cluster()) + } if err != nil { return err } @@ -49,19 +59,15 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain if err != nil { return err } - switch curlService { - case vespa.DeployService: + if useDeploy { if err := 
addAccessToken(c, target); err != nil { return err } - case vespa.DocumentService, vespa.QueryService: + } else { c.CaCertificate = service.TLSOptions.CACertificateFile c.PrivateKey = service.TLSOptions.PrivateKeyFile c.Certificate = service.TLSOptions.CertificateFile - default: - return fmt.Errorf("service not found: %s", curlService) } - if dryRun { log.Print(c.String()) } else { @@ -73,7 +79,7 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain }, } cmd.Flags().BoolVarP(&dryRun, "dry-run", "n", false, "Print the curl command that would be executed") - cmd.Flags().StringVarP(&curlService, "service", "s", "query", "Which service to query. Must be \"deploy\", \"document\" or \"query\"") + cmd.Flags().StringVarP(&curlService, "service", "s", "container", "Which service to query. Must be \"deploy\" or \"container\"") cli.bindWaitFlag(cmd, 60, &waitSecs) return cmd } diff --git a/client/go/internal/cli/cmd/curl_test.go b/client/go/internal/cli/cmd/curl_test.go index 3eca0726bb4..520cf41e308 100644 --- a/client/go/internal/cli/cmd/curl_test.go +++ b/client/go/internal/cli/cmd/curl_test.go @@ -14,7 +14,6 @@ func TestCurl(t *testing.T) { cli.Environment["VESPA_CLI_ENDPOINTS"] = "{\"endpoints\":[{\"cluster\":\"container\",\"url\":\"http://127.0.0.1:8080\"}]}" assert.Nil(t, cli.Run("config", "set", "application", "t1.a1.i1")) assert.Nil(t, cli.Run("config", "set", "target", "cloud")) - assert.Nil(t, cli.Run("config", "set", "cluster", "container")) assert.Nil(t, cli.Run("auth", "api-key")) assert.Nil(t, cli.Run("auth", "cert", "--no-add")) diff --git a/client/go/internal/cli/cmd/deploy.go b/client/go/internal/cli/cmd/deploy.go index ef32d7f01b7..cf2e435fea5 100644 --- a/client/go/internal/cli/cmd/deploy.go +++ b/client/go/internal/cli/cmd/deploy.go @@ -25,7 +25,7 @@ func newDeployCmd(cli *CLI) *cobra.Command { copyCert bool ) cmd := &cobra.Command{ - Use: "deploy [application-directory]", + Use: "deploy [application-directory-or-file]", 
Short: "Deploy (prepare and activate) an application package", Long: `Deploy (prepare and activate) an application package. @@ -73,8 +73,12 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`, return err } } + waiter := cli.waiter(false, timeout) + if _, err := waiter.DeployService(target); err != nil { + return err + } var result vespa.PrepareResult - if err := cli.spinner(cli.Stderr, "Uploading application package ...", func() error { + if err := cli.spinner(cli.Stderr, "Uploading application package...", func() error { result, err = vespa.Deploy(opts) return err }); err != nil { @@ -84,18 +88,18 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`, if opts.Target.IsCloud() { cli.printSuccess("Triggered deployment of ", color.CyanString(pkg.Path), " with run ID ", color.CyanString(strconv.FormatInt(result.ID, 10))) } else { - cli.printSuccess("Deployed ", color.CyanString(pkg.Path)) + cli.printSuccess("Deployed ", color.CyanString(pkg.Path), " with session ID ", color.CyanString(strconv.FormatInt(result.ID, 10))) printPrepareLog(cli.Stderr, result) } if opts.Target.IsCloud() { - log.Printf("\nUse %s for deployment status, or follow this deployment at", color.CyanString("vespa status")) + log.Printf("\nUse %s for deployment status, or follow this deployment at", color.CyanString("vespa status deployment")) log.Print(color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/%s/instance/%s/job/%s-%s/run/%d", opts.Target.Deployment().System.ConsoleURL, opts.Target.Deployment().Application.Tenant, opts.Target.Deployment().Application.Application, opts.Target.Deployment().Zone.Environment, opts.Target.Deployment().Application.Instance, opts.Target.Deployment().Zone.Environment, opts.Target.Deployment().Zone.Region, result.ID))) } - return waitForQueryService(cli, target, result.ID, timeout) + return waitForDeploymentReady(cli, target, result.ID, timeout) }, } cmd.Flags().StringVarP(&logLevelArg, "log-level", "l", "error", `Log level for Vespa logs. 
Must be "error", "warning", "info" or "debug"`) @@ -107,7 +111,7 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`, func newPrepareCmd(cli *CLI) *cobra.Command { return &cobra.Command{ - Use: "prepare application-directory", + Use: "prepare [application-directory-or-file]", Short: "Prepare an application package for activation", Args: cobra.MaximumNArgs(1), DisableAutoGenTag: true, @@ -123,7 +127,7 @@ func newPrepareCmd(cli *CLI) *cobra.Command { } opts := vespa.DeploymentOptions{ApplicationPackage: pkg, Target: target} var result vespa.PrepareResult - err = cli.spinner(cli.Stderr, "Uploading application package ...", func() error { + err = cli.spinner(cli.Stderr, "Uploading application package...", func() error { result, err = vespa.Prepare(opts) return err }) @@ -149,10 +153,6 @@ func newActivateCmd(cli *CLI) *cobra.Command { DisableAutoGenTag: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - pkg, err := cli.applicationPackageFrom(args, true) - if err != nil { - return fmt.Errorf("could not find application package: %w", err) - } sessionID, err := cli.config.readSessionID(vespa.DefaultApplication) if err != nil { return fmt.Errorf("could not read session id: %w", err) @@ -162,24 +162,32 @@ func newActivateCmd(cli *CLI) *cobra.Command { return err } timeout := time.Duration(waitSecs) * time.Second - opts := vespa.DeploymentOptions{ApplicationPackage: pkg, Target: target, Timeout: timeout} + waiter := cli.waiter(false, timeout) + if _, err := waiter.DeployService(target); err != nil { + return err + } + opts := vespa.DeploymentOptions{Target: target, Timeout: timeout} err = vespa.Activate(sessionID, opts) if err != nil { return err } - cli.printSuccess("Activated ", color.CyanString(pkg.Path), " with session ", sessionID) - return waitForQueryService(cli, target, sessionID, timeout) + cli.printSuccess("Activated application with session ", sessionID) + return waitForDeploymentReady(cli, target, sessionID, timeout) }, } 
cli.bindWaitFlag(cmd, 60, &waitSecs) return cmd } -func waitForQueryService(cli *CLI, target vespa.Target, sessionOrRunID int64, timeout time.Duration) error { +func waitForDeploymentReady(cli *CLI, target vespa.Target, sessionOrRunID int64, timeout time.Duration) error { if timeout == 0 { return nil } - _, err := cli.service(target, vespa.QueryService, sessionOrRunID, cli.config.cluster(), timeout) + waiter := cli.waiter(false, timeout) + if _, err := waiter.Deployment(target, sessionOrRunID); err != nil { + return err + } + _, err := waiter.Services(target) return err } diff --git a/client/go/internal/cli/cmd/deploy_test.go b/client/go/internal/cli/cmd/deploy_test.go index 16aa3fd0ed8..d578b2a4629 100644 --- a/client/go/internal/cli/cmd/deploy_test.go +++ b/client/go/internal/cli/cmd/deploy_test.go @@ -6,6 +6,7 @@ package cmd import ( "bytes" + "io" "path/filepath" "strconv" "strings" @@ -61,6 +62,32 @@ Hint: Pass --add-cert to use the certificate of the current application assert.Contains(t, stdout.String(), "Success: Triggered deployment") } +func TestDeployWait(t *testing.T) { + cli, stdout, _ := newTestCLI(t) + client := &mock.HTTPClient{} + cli.httpClient = client + cli.retryInterval = 0 + pkg := "testdata/applications/withSource/src/main/application" + // Deploy service is initially unavailable + client.NextResponseError(io.EOF) + client.NextStatus(500) + client.NextStatus(500) + // ... 
then becomes healthy + client.NextStatus(200) + // Deployment succeeds + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v2/tenant/default/prepareandactivate", + Status: 200, + Body: []byte(`{"session-id": "1"}`), + }) + mockServiceStatus(client, "foo") // Wait for deployment + mockServiceStatus(client, "foo") // Look up services + assert.Nil(t, cli.Run("deploy", "--wait=3", pkg)) + assert.Equal(t, + "\nSuccess: Deployed "+pkg+" with session ID 1\n", + stdout.String()) +} + func TestPrepareZip(t *testing.T) { assertPrepare("testdata/applications/withTarget/target/application.zip", []string{"prepare", "testdata/applications/withTarget/target/application.zip"}, t) @@ -85,7 +112,7 @@ func TestDeployZipWithURLTargetArgument(t *testing.T) { cli.httpClient = client assert.Nil(t, cli.Run(arguments...)) assert.Equal(t, - "\nSuccess: Deployed "+applicationPackage+"\n", + "\nSuccess: Deployed "+applicationPackage+" with session ID 0\n", stdout.String()) assertDeployRequestMade("http://target:19071", client, t) } @@ -161,7 +188,7 @@ func assertDeploy(applicationPackage string, arguments []string, t *testing.T) { cli.httpClient = client assert.Nil(t, cli.Run(arguments...)) assert.Equal(t, - "\nSuccess: Deployed "+applicationPackage+"\n", + "\nSuccess: Deployed "+applicationPackage+" with session ID 0\n", stdout.String()) assertDeployRequestMade("http://127.0.0.1:19071", client, t) } @@ -194,7 +221,7 @@ func assertActivate(applicationPackage string, arguments []string, t *testing.T) } assert.Nil(t, cli.Run(arguments...)) assert.Equal(t, - "Success: Activated "+applicationPackage+" with session 42\n", + "Success: Activated application with session 42\n", stdout.String()) url := "http://127.0.0.1:19071/application/v2/tenant/default/session/42/active" assert.Equal(t, url, client.LastRequest.URL.String()) diff --git a/client/go/internal/cli/cmd/document.go b/client/go/internal/cli/cmd/document.go index 1e5d1c30f6e..6c46baa297a 100644 --- 
a/client/go/internal/cli/cmd/document.go +++ b/client/go/internal/cli/cmd/document.go @@ -298,7 +298,8 @@ func documentService(cli *CLI, waitSecs int) (*vespa.Service, error) { if err != nil { return nil, err } - return cli.service(target, vespa.DocumentService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second) + return waiter.Service(target, cli.config.cluster()) } func printResult(cli *CLI, result util.OperationResult, payloadOnlyOnSuccess bool) error { diff --git a/client/go/internal/cli/cmd/document_test.go b/client/go/internal/cli/cmd/document_test.go index bce81da91c5..64319296299 100644 --- a/client/go/internal/cli/cmd/document_test.go +++ b/client/go/internal/cli/cmd/document_test.go @@ -79,7 +79,7 @@ func TestDocumentRemoveWithoutIdArgVerbose(t *testing.T) { func TestDocumentSendMissingId(t *testing.T) { cli, _, stderr := newTestCLI(t) - assert.NotNil(t, cli.Run("document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json")) assert.Equal(t, "Error: no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file\n", stderr.String()) @@ -87,7 +87,7 @@ func TestDocumentSendMissingId(t *testing.T) { func TestDocumentSendWithDisagreeingOperations(t *testing.T) { cli, _, stderr := newTestCLI(t) - assert.NotNil(t, cli.Run("document", "update", "testdata/A-Head-Full-of-Dreams-Put.json")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "update", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, "Error: wanted document operation is update, but JSON file specifies put\n", stderr.String()) @@ -110,20 +110,20 @@ func TestDocumentGet(t *testing.T) { "id:mynamespace:music::a-head-full-of-dreams", t) } -func assertDocumentSend(arguments []string, expectedOperation string, 
expectedMethod string, expectedDocumentId string, expectedPayloadFile string, t *testing.T) { +func assertDocumentSend(args []string, expectedOperation string, expectedMethod string, expectedDocumentId string, expectedPayloadFile string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} cli, stdout, stderr := newTestCLI(t) cli.httpClient = client - documentURL, err := documentServiceURL(client) - if err != nil { - t.Fatal(err) - } + documentURL := "http://127.0.0.1:8080" expectedPath, _ := vespa.IdToURLPath(expectedDocumentId) expectedURL := documentURL + "/document/v1/" + expectedPath + "?timeout=60000ms" - assert.Nil(t, cli.Run(arguments...)) + finalArgs := []string{"-t", documentURL} + finalArgs = append(finalArgs, args...) + assert.Nil(t, cli.Run(finalArgs...)) verbose := false - for _, a := range arguments { + for _, a := range args { if a == "-v" { verbose = true } @@ -154,16 +154,15 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe } } -func assertDocumentGet(arguments []string, documentId string, t *testing.T) { +func assertDocumentGet(args []string, documentId string, t *testing.T) { client := &mock.HTTPClient{} - documentURL, err := documentServiceURL(client) - if err != nil { - t.Fatal(err) - } + documentURL := "http://127.0.0.1:8080" client.NextResponseString(200, "{\"fields\":{\"foo\":\"bar\"}}") cli, stdout, _ := newTestCLI(t) cli.httpClient = client - assert.Nil(t, cli.Run(arguments...)) + finalArgs := []string{"-t", documentURL} + finalArgs = append(finalArgs, args...) 
+ assert.Nil(t, cli.Run(finalArgs...)) assert.Equal(t, `{ "fields": { @@ -182,7 +181,7 @@ func assertDocumentTransportError(t *testing.T, errorMessage string) { client.NextResponseError(fmt.Errorf(errorMessage)) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("document", "put", + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, @@ -195,7 +194,7 @@ func assertDocumentError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("document", "put", + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, @@ -208,14 +207,10 @@ func assertDocumentServerError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("document", "put", + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, - "Error: Container (document API) at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n", + "Error: container at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n", stderr.String()) } - -func documentServiceURL(client *mock.HTTPClient) (string, error) { - return "http://127.0.0.1:8080", nil -} diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 7d4b9cc8042..8b8589baec3 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -12,7 +12,6 @@ import ( "github.com/spf13/cobra" 
"github.com/vespa-engine/vespa/client/go/internal/util" - "github.com/vespa-engine/vespa/client/go/internal/vespa" "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) @@ -57,17 +56,17 @@ type feedOptions struct { func newFeedCmd(cli *CLI) *cobra.Command { var options feedOptions cmd := &cobra.Command{ - Use: "feed FILE [FILE]...", + Use: "feed json-file [json-file]...", Short: "Feed multiple document operations to a Vespa cluster", Long: `Feed multiple document operations to a Vespa cluster. This command can be used to feed large amounts of documents to a Vespa cluster efficiently. -The contents of FILE must be either a JSON array or JSON objects separated by +The contents of JSON-FILE must be either a JSON array or JSON objects separated by newline (JSONL). -If FILE is a single dash ('-'), documents will be read from standard input. +If JSON-FILE is a single dash ('-'), documents will be read from standard input. `, Example: `$ vespa feed docs.jsonl moredocs.json $ cat docs.jsonl | vespa feed -`, @@ -108,8 +107,9 @@ func createServices(n int, timeout time.Duration, waitSecs int, cli *CLI) ([]uti } services := make([]util.HTTPClient, 0, n) baseURL := "" + waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second) for i := 0; i < n; i++ { - service, err := cli.service(target, vespa.DocumentService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + service, err := waiter.Service(target, cli.config.cluster()) if err != nil { return nil, "", err } diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go index fc2c5ec7520..84328cad5fb 100644 --- a/client/go/internal/cli/cmd/feed_test.go +++ b/client/go/internal/cli/cmd/feed_test.go @@ -43,7 +43,7 @@ func TestFeed(t *testing.T) { httpClient.NextResponseString(200, `{"message":"OK"}`) httpClient.NextResponseString(200, `{"message":"OK"}`) - require.Nil(t, cli.Run("feed", jsonFile1, jsonFile2)) + require.Nil(t, cli.Run("feed", "-t", 
"http://127.0.0.1:8080", jsonFile1, jsonFile2)) assert.Equal(t, "", stderr.String()) want := `{ @@ -113,7 +113,7 @@ func TestFeedInvalid(t *testing.T) { jsonFile := filepath.Join(td, "docs.jsonl") require.Nil(t, os.WriteFile(jsonFile, doc, 0644)) httpClient.NextResponseString(200, `{"message":"OK"}`) - require.NotNil(t, cli.Run("feed", jsonFile)) + require.NotNil(t, cli.Run("feed", "-t", "http://127.0.0.1:8080", jsonFile)) want := `{ "feeder.seconds": 3.000, diff --git a/client/go/internal/cli/cmd/prod.go b/client/go/internal/cli/cmd/prod.go index 3b37197340f..79a6907eef2 100644 --- a/client/go/internal/cli/cmd/prod.go +++ b/client/go/internal/cli/cmd/prod.go @@ -114,7 +114,7 @@ type prodDeployOptions struct { func newProdDeployCmd(cli *CLI) *cobra.Command { var options prodDeployOptions cmd := &cobra.Command{ - Use: "deploy", + Use: "deploy [application-directory-or-file]", Aliases: []string{"submit"}, // TODO: Remove in Vespa 9 Short: "Deploy an application to production", Long: `Deploy an application to production. 
diff --git a/client/go/internal/cli/cmd/query.go b/client/go/internal/cli/cmd/query.go index a5b35052b11..3f849fae99e 100644 --- a/client/go/internal/cli/cmd/query.go +++ b/client/go/internal/cli/cmd/query.go @@ -64,7 +64,8 @@ func query(cli *CLI, arguments []string, timeoutSecs, waitSecs int, curl bool) e if err != nil { return err } - service, err := cli.service(target, vespa.QueryService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + waiter := cli.waiter(false, time.Duration(waitSecs)*time.Second) + service, err := waiter.Service(target, cli.config.cluster()) if err != nil { return err } diff --git a/client/go/internal/cli/cmd/query_test.go b/client/go/internal/cli/cmd/query_test.go index 1caf6d33e70..6d5adc0508e 100644 --- a/client/go/internal/cli/cmd/query_test.go +++ b/client/go/internal/cli/cmd/query_test.go @@ -26,7 +26,7 @@ func TestQueryVerbose(t *testing.T) { cli, stdout, stderr := newTestCLI(t) cli.httpClient = client - assert.Nil(t, cli.Run("query", "-v", "select from sources * where title contains 'foo'")) + assert.Nil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "-v", "select from sources * where title contains 'foo'")) assert.Equal(t, "curl 'http://127.0.0.1:8080/search/?timeout=10s&yql=select+from+sources+%2A+where+title+contains+%27foo%27'\n", stderr.String()) assert.Equal(t, "{\n \"query\": \"result\"\n}\n", stdout.String()) } @@ -75,7 +75,7 @@ func assertQuery(t *testing.T, expectedQuery string, query ...string) { cli, stdout, _ := newTestCLI(t) cli.httpClient = client - args := []string{"query"} + args := []string{"-t", "http://127.0.0.1:8080", "query"} assert.Nil(t, cli.Run(append(args, query...)...)) assert.Equal(t, "{\n \"query\": \"result\"\n}\n", @@ -91,7 +91,7 @@ func assertQueryError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("query", "yql=select from sources * where title 
contains 'foo'")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "yql=select from sources * where title contains 'foo'")) assert.Equal(t, "Error: invalid query: Status "+strconv.Itoa(status)+"\n"+errorMessage+"\n", stderr.String(), @@ -103,7 +103,7 @@ func assertQueryServiceError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("query", "yql=select from sources * where title contains 'foo'")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "yql=select from sources * where title contains 'foo'")) assert.Equal(t, "Error: Status "+strconv.Itoa(status)+" from container at 127.0.0.1:8080\n"+errorMessage+"\n", stderr.String(), diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go index ad42ea588b0..69fd88c1b2b 100644 --- a/client/go/internal/cli/cmd/root.go +++ b/client/go/internal/cli/cmd/root.go @@ -43,6 +43,13 @@ type CLI struct { Stdout io.Writer Stderr io.Writer + exec executor + isTerminal func() bool + spinner func(w io.Writer, message string, fn func() error) error + + now func() time.Time + retryInterval time.Duration + cmd *cobra.Command config *Config version version.Version @@ -51,10 +58,6 @@ type CLI struct { httpClientFactory func(timeout time.Duration) util.HTTPClient auth0Factory auth0Factory ztsFactory ztsFactory - exec executor - isTerminal func() bool - spinner func(w io.Writer, message string, fn func() error) error - now func() time.Time } // ErrCLI is an error returned to the user. It wraps an exit status, a regular error and optional hints for resolving @@ -100,7 +103,7 @@ type ztsFactory func(httpClient util.HTTPClient, domain, url string) (vespa.Auth // New creates the Vespa CLI, writing output to stdout and stderr, and reading environment variables from environment. 
func New(stdout, stderr io.Writer, environment []string) (*CLI, error) { cmd := &cobra.Command{ - Use: "vespa command-name", + Use: "vespa", Short: "The command-line tool for Vespa.ai", Long: `The command-line tool for Vespa.ai. @@ -134,12 +137,15 @@ For detailed description of flags and configuration, see 'vespa help config'. Stdout: stdout, Stderr: stderr, - version: version, - cmd: cmd, + exec: &execSubprocess{}, + now: time.Now, + retryInterval: 2 * time.Second, + + version: version, + cmd: cmd, + httpClient: httpClientFactory(time.Second * 10), httpClientFactory: httpClientFactory, - exec: &execSubprocess{}, - now: time.Now, auth0Factory: func(httpClient util.HTTPClient, options auth0.Options) (vespa.Authenticator, error) { return auth0.NewClient(httpClient, options) }, @@ -267,9 +273,8 @@ func (c *CLI) configureCommands() { prodCmd.AddCommand(newProdDeployCmd(c)) // prod deploy rootCmd.AddCommand(prodCmd) // prod rootCmd.AddCommand(newQueryCmd(c)) // query - statusCmd.AddCommand(newStatusQueryCmd(c)) // status query - statusCmd.AddCommand(newStatusDocumentCmd(c)) // status document statusCmd.AddCommand(newStatusDeployCmd(c)) // status deploy + statusCmd.AddCommand(newStatusDeploymentCmd(c)) // status deployment rootCmd.AddCommand(statusCmd) // status rootCmd.AddCommand(newTestCmd(c)) // test rootCmd.AddCommand(newVersionCmd(c)) // version @@ -278,7 +283,7 @@ func (c *CLI) configureCommands() { } func (c *CLI) bindWaitFlag(cmd *cobra.Command, defaultSecs int, value *int) { - desc := "Number of seconds to wait for a service to become ready. 0 to disable" + desc := "Number of seconds to wait for service(s) to become ready. 
0 to disable" if defaultSecs == 0 { desc += " (default 0)" } @@ -296,6 +301,10 @@ func (c *CLI) printSuccess(msg ...interface{}) { fmt.Fprintln(c.Stdout, color.GreenString("Success:"), fmt.Sprint(msg...)) } +func (c *CLI) printInfo(msg ...interface{}) { + fmt.Fprintln(c.Stderr, fmt.Sprint(msg...)) +} + func (c *CLI) printDebug(msg ...interface{}) { fmt.Fprintln(c.Stderr, color.CyanString("Debug:"), fmt.Sprint(msg...)) } @@ -334,6 +343,10 @@ func (c *CLI) confirm(question string, confirmByDefault bool) (bool, error) { } } +func (c *CLI) waiter(once bool, timeout time.Duration) *Waiter { + return &Waiter{Once: once, Timeout: timeout, cli: c} +} + // target creates a target according the configuration of this CLI and given opts. func (c *CLI) target(opts targetOptions) (vespa.Target, error) { targetType, err := c.targetType() @@ -402,9 +415,9 @@ func (c *CLI) createCustomTarget(targetType, customURL string) (vespa.Target, er } switch targetType { case vespa.TargetLocal: - return vespa.LocalTarget(c.httpClient, tlsOptions), nil + return vespa.LocalTarget(c.httpClient, tlsOptions, c.retryInterval), nil case vespa.TargetCustom: - return vespa.CustomTarget(c.httpClient, customURL, tlsOptions), nil + return vespa.CustomTarget(c.httpClient, customURL, tlsOptions, c.retryInterval), nil default: return nil, fmt.Errorf("invalid custom target: %s", targetType) } @@ -486,7 +499,7 @@ func (c *CLI) createCloudTarget(targetType string, opts targetOptions, customURL Writer: c.Stdout, Level: vespa.LogLevel(logLevel), } - return vespa.CloudTarget(c.httpClient, apiAuth, deploymentAuth, apiOptions, deploymentOptions, logOptions) + return vespa.CloudTarget(c.httpClient, apiAuth, deploymentAuth, apiOptions, deploymentOptions, logOptions, c.retryInterval) } // system returns the appropiate system for the target configured in this CLI. 
@@ -504,24 +517,6 @@ func (c *CLI) system(targetType string) (vespa.System, error) { return vespa.System{}, fmt.Errorf("no default system found for %s target", targetType) } -// service returns the service of given name located at target. If non-empty, cluster specifies a cluster to query. This -// function blocks according to the wait period configured in this CLI. The parameter sessionOrRunID specifies either -// the session ID (local target) or run ID (cloud target) to wait for. -func (c *CLI) service(target vespa.Target, name string, sessionOrRunID int64, cluster string, timeout time.Duration) (*vespa.Service, error) { - if timeout > 0 { - log.Printf("Waiting up to %s for %s service to become available ...", color.CyanString(timeout.String()), color.CyanString(name)) - } - s, err := target.Service(name, timeout, sessionOrRunID, cluster) - if err != nil { - err := fmt.Errorf("service '%s' is unavailable: %w", name, err) - if target.IsCloud() { - return nil, errHint(err, "Confirm that you're communicating with the correct zone and cluster", "The -z option controls the zone", "The -C option controls the cluster") - } - return nil, err - } - return s, nil -} - // isCI returns true if running inside a continuous integration environment. 
func (c *CLI) isCI() bool { _, ok := c.Environment["CI"] diff --git a/client/go/internal/cli/cmd/status.go b/client/go/internal/cli/cmd/status.go index 6570aeff448..f185fde6ca1 100644 --- a/client/go/internal/cli/cmd/status.go +++ b/client/go/internal/cli/cmd/status.go @@ -7,6 +7,8 @@ package cmd import ( "fmt" "log" + "strconv" + "strings" "time" "github.com/fatih/color" @@ -17,94 +19,127 @@ import ( func newStatusCmd(cli *CLI) *cobra.Command { var waitSecs int cmd := &cobra.Command{ - Use: "status", - Short: "Verify that a service is ready to use (query by default)", - Example: `$ vespa status query`, - DisableAutoGenTag: true, - SilenceUsage: true, - Args: cobra.MaximumNArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.QueryService, waitSecs) + Use: "status", + Aliases: []string{ + "status container", + "status document", // TODO: Remove on Vespa 9 + "status query", // TODO: Remove on Vespa 9 }, - } - cli.bindWaitFlag(cmd, 0, &waitSecs) - return cmd -} - -func newStatusQueryCmd(cli *CLI) *cobra.Command { - var waitSecs int - cmd := &cobra.Command{ - Use: "query", - Short: "Verify that the query service is ready to use (default)", - Example: `$ vespa status query`, + Short: "Verify that container service(s) are ready to use", + Example: `$ vespa status +$ vespa status --cluster mycluster`, DisableAutoGenTag: true, SilenceUsage: true, - Args: cobra.ExactArgs(0), + Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.QueryService, waitSecs) + cluster := cli.config.cluster() + t, err := cli.target(targetOptions{}) + if err != nil { + return err + } + waiter := cli.waiter(true, time.Duration(waitSecs)*time.Second) + if cluster == "" { + services, err := waiter.Services(t) + if err != nil { + return err + } + if len(services) == 0 { + return errHint(fmt.Errorf("no services exist"), "Deployment may not be ready yet", "Try 'vespa status 
deployment'") + } + for _, s := range services { + printReadyService(s, cli) + } + return nil + } else { + s, err := waiter.Service(t, cluster) + if err != nil { + return err + } + printReadyService(s, cli) + return nil + } }, } cli.bindWaitFlag(cmd, 0, &waitSecs) return cmd } -func newStatusDocumentCmd(cli *CLI) *cobra.Command { +func newStatusDeployCmd(cli *CLI) *cobra.Command { var waitSecs int cmd := &cobra.Command{ - Use: "document", - Short: "Verify that the document service is ready to use", - Example: `$ vespa status document`, + Use: "deploy", + Short: "Verify that the deploy service is ready to use", + Example: `$ vespa status deploy`, DisableAutoGenTag: true, SilenceUsage: true, Args: cobra.ExactArgs(0), RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.DocumentService, waitSecs) + t, err := cli.target(targetOptions{}) + if err != nil { + return err + } + waiter := cli.waiter(true, time.Duration(waitSecs)*time.Second) + s, err := waiter.DeployService(t) + if err != nil { + return err + } + printReadyService(s, cli) + return nil }, } cli.bindWaitFlag(cmd, 0, &waitSecs) return cmd } -func newStatusDeployCmd(cli *CLI) *cobra.Command { +func newStatusDeploymentCmd(cli *CLI) *cobra.Command { var waitSecs int cmd := &cobra.Command{ - Use: "deploy", - Short: "Verify that the deploy service is ready to use", - Example: `$ vespa status deploy`, + Use: "deployment", + Short: "Verify that deployment has converged on latest, or given, ID", + Example: `$ vespa status deployment +$ vespa status deployment -t cloud [run-id] +$ vespa status deployment -t local [session-id] +`, DisableAutoGenTag: true, SilenceUsage: true, - Args: cobra.ExactArgs(0), + Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.DeployService, waitSecs) + wantedID := vespa.LatestDeployment + if len(args) > 0 { + n, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + return 
fmt.Errorf("invalid id: %s: %w", args[0], err) + } + wantedID = n + } + t, err := cli.target(targetOptions{logLevel: "none"}) + if err != nil { + return err + } + waiter := cli.waiter(true, time.Duration(waitSecs)*time.Second) + id, err := waiter.Deployment(t, wantedID) + if err != nil { + return err + } + if t.IsCloud() { + log.Printf("Deployment run %s has completed", color.CyanString(strconv.FormatInt(id, 10))) + log.Printf("See %s for more details", color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/%s/instance/%s/job/%s-%s/run/%d", + t.Deployment().System.ConsoleURL, + t.Deployment().Application.Tenant, t.Deployment().Application.Application, t.Deployment().Zone.Environment, + t.Deployment().Application.Instance, t.Deployment().Zone.Environment, t.Deployment().Zone.Region, + id))) + } else { + log.Printf("Deployment is %s on config generation %s", color.GreenString("ready"), color.CyanString(strconv.FormatInt(id, 10))) + } + return nil }, } cli.bindWaitFlag(cmd, 0, &waitSecs) return cmd } -func printServiceStatus(cli *CLI, name string, waitSecs int) error { - t, err := cli.target(targetOptions{}) - if err != nil { - return err - } - cluster := cli.config.cluster() - s, err := cli.service(t, name, 0, cluster, 0) - if err != nil { - return err - } - // Wait explicitly - status, err := s.Wait(time.Duration(waitSecs) * time.Second) - clusterPart := "" - if cluster != "" { - clusterPart = fmt.Sprintf(" named %s", color.CyanString(cluster)) - } - if status/100 == 2 { - log.Print(s.Description(), clusterPart, " at ", color.CyanString(s.BaseURL), " is ", color.GreenString("ready")) - } else { - if err == nil { - err = fmt.Errorf("status %d", status) - } - return fmt.Errorf("%s%s at %s is %s: %w", s.Description(), clusterPart, color.CyanString(s.BaseURL), color.RedString("not ready"), err) - } - return nil +func printReadyService(s *vespa.Service, cli *CLI) { + desc := s.Description() + desc = strings.ToUpper(string(desc[0])) + string(desc[1:]) + log.Print(desc, 
" at ", color.CyanString(s.BaseURL), " is ", color.GreenString("ready")) } diff --git a/client/go/internal/cli/cmd/status_test.go b/client/go/internal/cli/cmd/status_test.go index 76efea55503..36f51ff5073 100644 --- a/client/go/internal/cli/cmd/status_test.go +++ b/client/go/internal/cli/cmd/status_test.go @@ -5,10 +5,12 @@ package cmd import ( + "io" "testing" "github.com/stretchr/testify/assert" "github.com/vespa-engine/vespa/client/go/internal/mock" + "github.com/vespa-engine/vespa/client/go/internal/vespa" ) func TestStatusDeployCommand(t *testing.T) { @@ -23,63 +25,184 @@ func TestStatusDeployCommandWithLocalTarget(t *testing.T) { assertDeployStatus("http://127.0.0.1:19071", []string{"-t", "local"}, t) } -func TestStatusQueryCommand(t *testing.T) { - assertQueryStatus("http://127.0.0.1:8080", []string{}, t) +func TestStatusCommand(t *testing.T) { + assertStatus("http://127.0.0.1:8080", []string{}, t) } -func TestStatusQueryCommandWithUrlTarget(t *testing.T) { - assertQueryStatus("http://mycontainertarget:8080", []string{"-t", "http://mycontainertarget:8080"}, t) +func TestStatusCommandMultiCluster(t *testing.T) { + client := &mock.HTTPClient{} + cli, stdout, stderr := newTestCLI(t) + cli.httpClient = client + + mockServiceStatus(client) + assert.NotNil(t, cli.Run("status")) + assert.Equal(t, "Error: no services exist\nHint: Deployment may not be ready yet\nHint: Try 'vespa status deployment'\n", stderr.String()) + + mockServiceStatus(client, "foo", "bar") + assert.Nil(t, cli.Run("status")) + assert.Equal(t, `Container bar at http://127.0.0.1:8080 is ready +Container foo at http://127.0.0.1:8080 is ready +`, stdout.String()) + + stdout.Reset() + mockServiceStatus(client, "foo", "bar") + assert.Nil(t, cli.Run("status", "--cluster", "foo")) + assert.Equal(t, "Container foo at http://127.0.0.1:8080 is ready\n", stdout.String()) +} + +func TestStatusCommandWithUrlTarget(t *testing.T) { + assertStatus("http://mycontainertarget:8080", []string{"-t", 
"http://mycontainertarget:8080"}, t) +} + +func TestStatusCommandWithLocalTarget(t *testing.T) { + assertStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t) +} + +func TestStatusError(t *testing.T) { + client := &mock.HTTPClient{} + mockServiceStatus(client, "default") + client.NextStatus(500) + cli, _, stderr := newTestCLI(t) + cli.httpClient = client + assert.NotNil(t, cli.Run("status", "container")) + assert.Equal(t, + "Error: unhealthy container default: status 500 at http://127.0.0.1:8080/ApplicationStatus: wait timed out\n", + stderr.String()) + + stderr.Reset() + client.NextResponseError(io.EOF) + assert.NotNil(t, cli.Run("status", "container", "-t", "http://example.com")) + assert.Equal(t, + "Error: unhealthy container at http://example.com/ApplicationStatus: EOF\n", + stderr.String()) } -func TestStatusQueryCommandWithLocalTarget(t *testing.T) { - assertQueryStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t) +func TestStatusLocalDeployment(t *testing.T) { + client := &mock.HTTPClient{} + cli, stdout, stderr := newTestCLI(t) + cli.httpClient = client + resp := mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + } + // Latest generation + resp.Body = []byte(`{"currentGeneration": 42, "converged": true}`) + client.NextResponse(resp) + assert.Nil(t, cli.Run("status", "deployment")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "Deployment is ready on config generation 42\n", stdout.String()) + + // Latest generation without convergence + resp.Body = []byte(`{"currentGeneration": 42, "converged": false}`) + client.NextResponse(resp) + assert.NotNil(t, cli.Run("status", "deployment")) + assert.Equal(t, "Error: deployment not converged on latest generation after waiting 0s: wait timed out\n", stderr.String()) + + // Explicit generation + stderr.Reset() + client.NextResponse(resp) + assert.NotNil(t, cli.Run("status", 
"deployment", "41")) + assert.Equal(t, "Error: deployment not converged on generation 41 after waiting 0s: wait timed out\n", stderr.String()) } -func TestStatusDocumentCommandWithLocalTarget(t *testing.T) { - assertDocumentStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t) +func TestStatusCloudDeployment(t *testing.T) { + cli, stdout, stderr := newTestCLI(t, "CI=true") + app := vespa.ApplicationID{Tenant: "t1", Application: "a1", Instance: "i1"} + assert.Nil(t, cli.Run("config", "set", "application", app.String())) + assert.Nil(t, cli.Run("config", "set", "target", "cloud")) + assert.Nil(t, cli.Run("config", "set", "zone", "dev.us-north-1")) + assert.Nil(t, cli.Run("auth", "api-key")) + stdout.Reset() + client := &mock.HTTPClient{} + cli.httpClient = client + // Latest run + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1?limit=1", + Status: 200, + Body: []byte(`{"runs": [{"id": 1337}]}`), + }) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/1337?after=-1", + Status: 200, + Body: []byte(`{"active": false, "status": "success"}`), + }) + assert.Nil(t, cli.Run("status", "deployment")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, + "Deployment run 1337 has completed\nSee https://console.vespa-cloud.com/tenant/t1/application/a1/dev/instance/i1/job/dev-us-north-1/run/1337 for more details\n", + stdout.String()) + // Explicit run + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1", + Status: 200, + Body: []byte(`{"active": false, "status": "failure"}`), + }) + assert.NotNil(t, cli.Run("status", "deployment", "42")) + assert.Equal(t, "Error: deployment run 42 incomplete after waiting 0s: run 42 ended with unsuccessful status: failure\n", stderr.String()) } -func TestStatusErrorResponse(t *testing.T) { - 
assertQueryStatusError("http://127.0.0.1:8080", []string{}, t) +func isLocalTarget(args []string) bool { + for i := 0; i < len(args)-1; i++ { + if args[i] == "-t" { + return args[i+1] == "local" + } + } + return true // local is default } -func assertDeployStatus(target string, args []string, t *testing.T) { +func assertDeployStatus(expectedTarget string, args []string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} + client.NextResponse(mock.HTTPResponse{ + URI: "/status.html", + Status: 200, + }) cli, stdout, _ := newTestCLI(t) cli.httpClient = client statusArgs := []string{"status", "deploy"} assert.Nil(t, cli.Run(append(statusArgs, args...)...)) assert.Equal(t, - "Deploy API at "+target+" is ready\n", - stdout.String(), - "vespa status config-server") - assert.Equal(t, target+"/status.html", client.LastRequest.URL.String()) + "Deploy API at "+expectedTarget+" is ready\n", + stdout.String()) + assert.Equal(t, expectedTarget+"/status.html", client.LastRequest.URL.String()) } -func assertQueryStatus(target string, args []string, t *testing.T) { +func assertStatus(expectedTarget string, args []string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} + clusterName := "" + for i := 0; i < 2; i++ { + if isLocalTarget(args) { + clusterName = "foo" + mockServiceStatus(client, clusterName) + } + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200}) + } cli, stdout, _ := newTestCLI(t) cli.httpClient = client - statusArgs := []string{"status", "query"} + statusArgs := []string{"status"} assert.Nil(t, cli.Run(append(statusArgs, args...)...)) - assert.Equal(t, - "Container (query API) at "+target+" is ready\n", - stdout.String(), - "vespa status container") - assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String()) - - statusArgs = []string{"status"} + prefix := "Container" + if clusterName != "" { + prefix += " " + clusterName + } + assert.Equal(t, prefix+" at "+expectedTarget+" is ready\n", stdout.String()) 
+ assert.Equal(t, expectedTarget+"/ApplicationStatus", client.LastRequest.URL.String()) + + // Test legacy command + statusArgs = []string{"status query"} stdout.Reset() assert.Nil(t, cli.Run(append(statusArgs, args...)...)) - assert.Equal(t, - "Container (query API) at "+target+" is ready\n", - stdout.String(), - "vespa status (the default)") - assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String()) + assert.Equal(t, prefix+" at "+expectedTarget+" is ready\n", stdout.String()) + assert.Equal(t, expectedTarget+"/ApplicationStatus", client.LastRequest.URL.String()) } func assertDocumentStatus(target string, args []string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} + if isLocalTarget(args) { + mockServiceStatus(client, "default") + } cli, stdout, _ := newTestCLI(t) cli.httpClient = client assert.Nil(t, cli.Run("status", "document")) @@ -89,15 +212,3 @@ func assertDocumentStatus(target string, args []string, t *testing.T) { "vespa status container") assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String()) } - -func assertQueryStatusError(target string, args []string, t *testing.T) { - client := &mock.HTTPClient{} - client.NextStatus(500) - cli, _, stderr := newTestCLI(t) - cli.httpClient = client - assert.NotNil(t, cli.Run("status", "container")) - assert.Equal(t, - "Error: Container (query API) at "+target+" is not ready: status 500\n", - stderr.String(), - "vespa status container") -} diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go index abee760efbb..5b99973d879 100644 --- a/client/go/internal/cli/cmd/test.go +++ b/client/go/internal/cli/cmd/test.go @@ -79,7 +79,7 @@ func runTests(cli *CLI, rootPath string, dryRun bool, waitSecs int) (int, []stri if err != nil { return 0, nil, errHint(err, "See https://docs.vespa.ai/en/reference/testing") } - context := testContext{testsPath: rootPath, dryRun: dryRun, cli: cli} + context := testContext{testsPath: rootPath, dryRun: dryRun, 
cli: cli, clusters: map[string]*vespa.Service{}} previousFailed := false for _, test := range tests { if !test.IsDir() && filepath.Ext(test.Name()) == ".json" { @@ -100,7 +100,7 @@ func runTests(cli *CLI, rootPath string, dryRun bool, waitSecs int) (int, []stri } } } else if strings.HasSuffix(stat.Name(), ".json") { - failure, err := runTest(rootPath, testContext{testsPath: filepath.Dir(rootPath), dryRun: dryRun, cli: cli}, waitSecs) + failure, err := runTest(rootPath, testContext{testsPath: filepath.Dir(rootPath), dryRun: dryRun, cli: cli, clusters: map[string]*vespa.Service{}}, waitSecs) if err != nil { return 0, nil, err } @@ -216,9 +216,16 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin if err != nil { return "", "", err } - service, err = target.Service(vespa.QueryService, time.Duration(waitSecs)*time.Second, 0, cluster) - if err != nil { - return "", "", err + ok := false + service, ok = context.clusters[cluster] + if !ok { + // Cache service so we don't have to discover it for every step + waiter := context.cli.waiter(false, time.Duration(waitSecs)*time.Second) + service, err = waiter.Service(target, cluster) + if err != nil { + return "", "", err + } + context.clusters[cluster] = service } requestUrl, err = url.ParseRequestURI(service.BaseURL + requestUri) if err != nil { @@ -474,6 +481,8 @@ type testContext struct { lazyTarget vespa.Target testsPath string dryRun bool + // Cache of services by their cluster name + clusters map[string]*vespa.Service } func (t *testContext) target() (vespa.Target, error) { diff --git a/client/go/internal/cli/cmd/test_test.go b/client/go/internal/cli/cmd/test_test.go index 5d6bb441b2a..4f8e6d49a2a 100644 --- a/client/go/internal/cli/cmd/test_test.go +++ b/client/go/internal/cli/cmd/test_test.go @@ -23,20 +23,26 @@ import ( func TestSuite(t *testing.T) { client := &mock.HTTPClient{} searchResponse, _ := os.ReadFile("testdata/tests/response.json") + mockServiceStatus(client, "container") 
client.NextStatus(200) client.NextStatus(200) - for i := 0; i < 11; i++ { + for i := 0; i < 2; i++ { + client.NextResponseString(200, string(searchResponse)) + } + mockServiceStatus(client, "container") // Some tests do not specify cluster, which is fine since we only have one, but this causes a cache miss + for i := 0; i < 9; i++ { client.NextResponseString(200, string(searchResponse)) } - expectedBytes, _ := os.ReadFile("testdata/tests/expected-suite.out") cli, stdout, stderr := newTestCLI(t) cli.httpClient = client assert.NotNil(t, cli.Run("test", "testdata/tests/system-test")) - + assert.Equal(t, "", stderr.String()) baseUrl := "http://127.0.0.1:8080" urlWithQuery := baseUrl + "/search/?presentation.timing=true&query=artist%3A+foo&timeout=3.4s" - requests := []*http.Request{createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(urlWithQuery), createSearchRequest(urlWithQuery)} + discoveryRequest := createSearchRequest("http://127.0.0.1:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge") + requests := []*http.Request{discoveryRequest, createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(urlWithQuery), createSearchRequest(urlWithQuery)} + requests = append(requests, discoveryRequest) requests = append(requests, createSearchRequest(baseUrl+"/search/")) requests = append(requests, createSearchRequest(baseUrl+"/search/?foo=%2F")) for i := 0; i < 7; i++ { @@ -95,6 +101,7 @@ func TestSuiteWithoutTests(t *testing.T) { func TestSingleTest(t *testing.T) { client := &mock.HTTPClient{} searchResponse, _ := os.ReadFile("testdata/tests/response.json") + mockServiceStatus(client, "container") client.NextStatus(200) client.NextStatus(200) client.NextResponseString(200, string(searchResponse)) @@ -109,7 +116,8 @@ func TestSingleTest(t *testing.T) { baseUrl := "http://127.0.0.1:8080" rawUrl := baseUrl + 
"/search/?presentation.timing=true&query=artist%3A+foo&timeout=3.4s" - assertRequests([]*http.Request{createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(rawUrl), createSearchRequest(rawUrl)}, client, t) + discoveryRequest := createSearchRequest("http://127.0.0.1:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge") + assertRequests([]*http.Request{discoveryRequest, createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(rawUrl), createSearchRequest(rawUrl)}, client, t) } func TestSingleTestWithCloudAndEndpoints(t *testing.T) { @@ -172,12 +180,17 @@ func createRequest(method string, uri string, body string) *http.Request { } func assertRequests(requests []*http.Request, client *mock.HTTPClient, t *testing.T) { + t.Helper() if assert.Equal(t, len(requests), len(client.Requests)) { for i, e := range requests { a := client.Requests[i] assert.Equal(t, e.URL.String(), a.URL.String()) assert.Equal(t, e.Method, a.Method) - assert.Equal(t, util.ReaderToJSON(e.Body), util.ReaderToJSON(a.Body)) + actualBody := a.Body + if actualBody == nil { + actualBody = io.NopCloser(strings.NewReader("")) + } + assert.Equal(t, util.ReaderToJSON(e.Body), util.ReaderToJSON(actualBody)) } } } diff --git a/client/go/internal/cli/cmd/testutil_test.go b/client/go/internal/cli/cmd/testutil_test.go index 61d6c15c5a0..c16c9f8dc50 100644 --- a/client/go/internal/cli/cmd/testutil_test.go +++ b/client/go/internal/cli/cmd/testutil_test.go @@ -3,8 +3,10 @@ package cmd import ( "bytes" + "fmt" "net/http" "path/filepath" + "strings" "testing" "time" @@ -41,6 +43,36 @@ func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Bu return cli, &stdout, &stderr } +func mockServiceStatus(client *mock.HTTPClient, clusterNames ...string) { + var serviceObjects []string + for _, name := range clusterNames { + service := fmt.Sprintf(`{ + "clusterName": "%s", + "host": "localhost", + 
"port": 8080, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080", + "currentGeneration": 1 + } +`, name) + serviceObjects = append(serviceObjects, service) + } + services := "[]" + if len(serviceObjects) > 0 { + services = "[" + strings.Join(serviceObjects, ",") + "]" + } + response := fmt.Sprintf(` +{ + "services": %s, + "currentGeneration": 1 +}`, services) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + Body: []byte(response), + }) +} + type mockAuthenticator struct{} func (a *mockAuthenticator) Authenticate(request *http.Request) error { return nil } diff --git a/client/go/internal/cli/cmd/visit_test.go b/client/go/internal/cli/cmd/visit_test.go index f85fb739370..bd806f1d9c9 100644 --- a/client/go/internal/cli/cmd/visit_test.go +++ b/client/go/internal/cli/cmd/visit_test.go @@ -93,10 +93,14 @@ func TestRunOneVisit(t *testing.T) { func withMockClient(t *testing.T, prepCli func(*mock.HTTPClient), runOp func(*vespa.Service)) *http.Request { client := &mock.HTTPClient{} + mockServiceStatus(client, "container") prepCli(client) cli, _, _ := newTestCLI(t) cli.httpClient = client - service, _ := documentService(cli, 0) + service, err := documentService(cli, 0) + if err != nil { + t.Fatal(err) + } runOp(service) return client.LastRequest } @@ -126,6 +130,7 @@ func TestVisitCommand(t *testing.T) { } func assertVisitResults(arguments []string, t *testing.T, responses []string, queryPart, output string) { + t.Helper() client := &mock.HTTPClient{} client.NextResponseString(200, handlersResponse) client.NextResponseString(400, clusterStarResponse) @@ -134,6 +139,7 @@ func assertVisitResults(arguments []string, t *testing.T, responses []string, qu } cli, stdout, stderr := newTestCLI(t) cli.httpClient = 
client + arguments = append(arguments, "-t", "http://127.0.0.1:8080") assert.Nil(t, cli.Run(arguments...)) assert.Equal(t, output, stdout.String()) assert.Equal(t, "", stderr.String()) diff --git a/client/go/internal/cli/cmd/waiter.go b/client/go/internal/cli/cmd/waiter.go new file mode 100644 index 00000000000..a1919de798d --- /dev/null +++ b/client/go/internal/cli/cmd/waiter.go @@ -0,0 +1,97 @@ +package cmd + +import ( + "fmt" + "time" + + "github.com/fatih/color" + "github.com/vespa-engine/vespa/client/go/internal/vespa" +) + +// Waiter waits for Vespa services to become ready, within a timeout. +type Waiter struct { + // Once specifies whether we should wait at least one time, regardless of timeout. + Once bool + + // Timeout specifies how long we should wait for an operation to complete. + Timeout time.Duration // TODO(mpolden): Consider making this a budget + + cli *CLI +} + +func (w *Waiter) wait() bool { return w.Once || w.Timeout > 0 } + +// DeployService returns the service providing the deploy API on given target. +func (w *Waiter) DeployService(target vespa.Target) (*vespa.Service, error) { + s, err := target.DeployService() + if err != nil { + return nil, err + } + if w.Timeout > 0 { + w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for ", s.Description(), " to become ready...") + } + if w.wait() { + if err := s.Wait(w.Timeout); err != nil { + return nil, err + } + } + return s, nil +} + +// Service returns the service identified by cluster ID, available on target. 
+func (w *Waiter) Service(target vespa.Target, cluster string) (*vespa.Service, error) { + targetType, err := w.cli.targetType() + if err != nil { + return nil, err + } + if targetType.url != "" && cluster != "" { + return nil, fmt.Errorf("cluster cannot be specified when target is an URL") + } + services, err := w.Services(target) + if err != nil { + return nil, err + } + service, err := vespa.FindService(cluster, services) + if err != nil { + return nil, errHint(err, "The --cluster option specifies the service to use") + } + if w.Timeout > 0 { + w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for ", color.CyanString(service.Description()), " to become available...") + } + if w.wait() { + if err := service.Wait(w.Timeout); err != nil { + return nil, err + } + } + return service, nil +} + +// Services returns all container services available on target. +func (w *Waiter) Services(target vespa.Target) ([]*vespa.Service, error) { + if w.Timeout > 0 { + w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for cluster discovery...") + } + services, err := target.ContainerServices(w.Timeout) + if err != nil { + return nil, err + } + for _, s := range services { + if w.Timeout > 0 { + w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for ", s.Description(), "...") + } + if w.wait() { + if err := s.Wait(w.Timeout); err != nil { + return nil, err + } + } + } + return services, nil +} + +// Deployment waits for a deployment to become ready, returning the ID of the converged deployment. 
+func (w *Waiter) Deployment(target vespa.Target, id int64) (int64, error) { + if w.Timeout > 0 { + w.cli.printInfo("Waiting up to ", color.CyanString(w.Timeout.String()), " for deployment to converge...") + } + return target.AwaitDeployment(id, w.Timeout) +} diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go index 3d4ead596b0..8a17448957f 100644 --- a/client/go/internal/mock/http.go +++ b/client/go/internal/mock/http.go @@ -2,6 +2,7 @@ package mock import ( "bytes" + "fmt" "io" "net/http" "strconv" @@ -32,6 +33,7 @@ type HTTPClient struct { } type HTTPResponse struct { + URI string Status int Body []byte Header http.Header @@ -65,6 +67,9 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res response := HTTPResponse{Status: 200} if len(c.nextResponses) > 0 { response = c.nextResponses[0] + if response.URI != "" && response.URI != request.URL.RequestURI() { + return nil, fmt.Errorf("uri of response is %s, which does not match request uri %s", response.URI, request.URL.RequestURI()) + } c.nextResponses = c.nextResponses[1:] } if c.ReadBody && request.Body != nil { diff --git a/client/go/internal/vespa/deploy.go b/client/go/internal/vespa/deploy.go index ae4d4678d66..3a3af0d66a0 100644 --- a/client/go/internal/vespa/deploy.go +++ b/client/go/internal/vespa/deploy.go @@ -92,7 +92,7 @@ func (d DeploymentOptions) String() string { } func (d *DeploymentOptions) url(path string) (*url.URL, error) { - service, err := d.Target.Service(DeployService, 0, 0, "") + service, err := d.Target.DeployService() if err != nil { return nil, err } @@ -149,7 +149,7 @@ func Prepare(deployment DeploymentOptions) (PrepareResult, error) { return PrepareResult{}, err } var jsonResponse struct { - SessionID string `json:"session-id"` + SessionID string `json:"session-id"` // API returns ID as string Log []LogLinePrepareResponse `json:"log"` } jsonDec := json.NewDecoder(response.Body) @@ -311,7 +311,7 @@ func Submit(opts 
DeploymentOptions, submission Submission) error { } func deployServiceDo(request *http.Request, timeout time.Duration, opts DeploymentOptions) (*http.Response, error) { - s, err := opts.Target.Service(DeployService, 0, 0, "") + s, err := opts.Target.DeployService() if err != nil { return nil, err } @@ -373,7 +373,7 @@ func uploadApplicationPackage(url *url.URL, opts DeploymentOptions) (PrepareResu if err != nil { return PrepareResult{}, err } - service, err := opts.Target.Service(DeployService, opts.Timeout, 0, "") + service, err := opts.Target.DeployService() if err != nil { return PrepareResult{}, err } @@ -384,7 +384,7 @@ func uploadApplicationPackage(url *url.URL, opts DeploymentOptions) (PrepareResu defer response.Body.Close() var jsonResponse struct { - SessionID string `json:"session-id"` // Config server + SessionID string `json:"session-id"` // Config server. API returns ID as string RunID int64 `json:"run"` // Controller Log []LogLinePrepareResponse `json:"log"` diff --git a/client/go/internal/vespa/deploy_test.go b/client/go/internal/vespa/deploy_test.go index c68ad750f1a..9addf81138a 100644 --- a/client/go/internal/vespa/deploy_test.go +++ b/client/go/internal/vespa/deploy_test.go @@ -19,7 +19,7 @@ import ( func TestDeploy(t *testing.T) { httpClient := mock.HTTPClient{} - target := LocalTarget(&httpClient, TLSOptions{}) + target := LocalTarget(&httpClient, TLSOptions{}, 0) appDir, _ := mock.ApplicationPackageDir(t, false, false) opts := DeploymentOptions{ Target: target, @@ -38,7 +38,7 @@ func TestDeploy(t *testing.T) { func TestDeployCloud(t *testing.T) { httpClient := mock.HTTPClient{} - target := createCloudTarget(t, "http://vespacloud", io.Discard) + target, _ := createCloudTarget(t, io.Discard) cloudTarget, ok := target.(*cloudTarget) require.True(t, ok) cloudTarget.httpClient = &httpClient @@ -51,7 +51,7 @@ func TestDeployCloud(t *testing.T) { require.Nil(t, err) assert.Equal(t, 1, len(httpClient.Requests)) req := httpClient.LastRequest - 
assert.Equal(t, "http://vespacloud/application/v4/tenant/t1/application/a1/instance/i1/deploy/dev-us-north-1", req.URL.String()) + assert.Equal(t, "https://api-ctl.vespa-cloud.com:4443/application/v4/tenant/t1/application/a1/instance/i1/deploy/dev-us-north-1", req.URL.String()) values := parseMultiPart(t, req) zipData := values["applicationZip"] @@ -71,7 +71,7 @@ func TestDeployCloud(t *testing.T) { func TestSubmit(t *testing.T) { httpClient := mock.HTTPClient{} - target := createCloudTarget(t, "http://vespacloud", io.Discard) + target, _ := createCloudTarget(t, io.Discard) cloudTarget, ok := target.(*cloudTarget) require.True(t, ok) cloudTarget.httpClient = &httpClient @@ -170,7 +170,7 @@ func TestFindApplicationPackage(t *testing.T) { func TestDeactivate(t *testing.T) { httpClient := mock.HTTPClient{} - target := LocalTarget(&httpClient, TLSOptions{}) + target := LocalTarget(&httpClient, TLSOptions{}, 0) opts := DeploymentOptions{Target: target} require.Nil(t, Deactivate(opts)) assert.Equal(t, 1, len(httpClient.Requests)) @@ -181,7 +181,7 @@ func TestDeactivate(t *testing.T) { func TestDeactivateCloud(t *testing.T) { httpClient := mock.HTTPClient{} - target := createCloudTarget(t, "http://vespacloud", io.Discard) + target, _ := createCloudTarget(t, io.Discard) cloudTarget, ok := target.(*cloudTarget) require.True(t, ok) cloudTarget.httpClient = &httpClient @@ -190,7 +190,7 @@ func TestDeactivateCloud(t *testing.T) { assert.Equal(t, 1, len(httpClient.Requests)) req := httpClient.LastRequest assert.Equal(t, "DELETE", req.Method) - assert.Equal(t, "http://vespacloud/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", req.URL.String()) + assert.Equal(t, "https://api-ctl.vespa-cloud.com:4443/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", req.URL.String()) } type pkgFixture struct { diff --git a/client/go/internal/vespa/log.go b/client/go/internal/vespa/log.go index 0e2cb5d0bfd..81088b8c0a1 
100644 --- a/client/go/internal/vespa/log.go +++ b/client/go/internal/vespa/log.go @@ -72,6 +72,8 @@ func ReadLogEntries(r io.Reader) ([]LogEntry, error) { // LogLevel returns an int representing a named log level. func LogLevel(name string) int { switch name { + case "none": + return -1 case "error": return 0 case "warning": diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 6dd64dd1275..3c65369f986 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -4,9 +4,11 @@ package vespa import ( "crypto/tls" + "errors" "fmt" "io" "net/http" + "strings" "sync" "time" @@ -27,18 +29,15 @@ const ( // A hosted Vespa target TargetHosted = "hosted" - // A Vespa service that handles deployments, either a config server or a controller - DeployService = "deploy" + // LatestDeployment waits for a deployment to converge to latest generation + LatestDeployment int64 = -1 - // A Vespa service that handles queries. - QueryService = "query" - - // A Vespa service that handles feeding of document. This may point to the same service as QueryService. - DocumentService = "document" - - retryInterval = 2 * time.Second + // AnyDeployment waits for a deployment to converge on any generation + AnyDeployment int64 = -2 ) +var errWaitTimeout = errors.New("wait timed out") + // Authenticator authenticates the given HTTP request. type Authenticator interface { Authenticate(request *http.Request) error @@ -50,9 +49,11 @@ type Service struct { Name string TLSOptions TLSOptions - once sync.Once - auth Authenticator - httpClient util.HTTPClient + deployAPI bool + once sync.Once + auth Authenticator + httpClient util.HTTPClient + retryInterval time.Duration } // Target represents a Vespa platform, running named Vespa services. @@ -66,8 +67,16 @@ type Target interface { // Deployment returns the deployment managed by this target. Deployment() Deployment - // Service returns the service for given name. 
If timeout is non-zero, wait for the service to converge. - Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) + // DeployService returns the service providing the deploy API on this target. + DeployService() (*Service, error) + + // ContainerServices returns all container services of the current deployment. If timeout is positive, wait for + // services to be discovered. + ContainerServices(timeout time.Duration) ([]*Service, error) + + // AwaitDeployment waits for a deployment identified by id to succeed. It returns the id that succeeded, or an + // error. The exact meaning of id depends on the implementation. + AwaitDeployment(id int64, timeout time.Duration) (int64, error) // PrintLog writes the logs of this deployment using given options to control output. PrintLog(options LogOptions) error @@ -114,29 +123,63 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon func (s *Service) SetClient(client util.HTTPClient) { s.httpClient = client } // Wait polls the health check of this service until it succeeds or timeout passes. 
-func (s *Service) Wait(timeout time.Duration) (int, error) { +func (s *Service) Wait(timeout time.Duration) error { url := s.BaseURL - switch s.Name { - case DeployService: + if s.deployAPI { url += "/status.html" // because /ApplicationStatus is not publicly reachable in Vespa Cloud - case QueryService, DocumentService: + } else { url += "/ApplicationStatus" - default: - return 0, fmt.Errorf("invalid service: %s", s.Name) } - return waitForOK(s, url, timeout) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return err + } + okFunc := func(status int, response []byte) (bool, error) { return isOK(status) } + status, err := wait(s, okFunc, func() *http.Request { return req }, timeout, s.retryInterval) + if err != nil { + waitDesc := "" + if timeout > 0 { + waitDesc = " (after waiting " + timeout.String() + ") " + } + statusDesc := "" + if status > 0 { + statusDesc = fmt.Sprintf(": status %d", status) + } + return fmt.Errorf("unhealthy %s%s%s at %s: %w", s.Description(), waitDesc, statusDesc, url, err) + } + return nil } func (s *Service) Description() string { - switch s.Name { - case QueryService: - return "Container (query API)" - case DocumentService: - return "Container (document API)" - case DeployService: - return "Deploy API" + if s.deployAPI { + return "deploy API" + } + if s.Name == "" { + return "container" + } + return "container " + s.Name +} + +// FindService returns the service of given name, found among services, if any. 
+func FindService(name string, services []*Service) (*Service, error) { + if name == "" && len(services) == 1 { + return services[0], nil + } + names := make([]string, len(services)) + for i, s := range services { + if name == s.Name { + return s, nil + } + names[i] = s.Name } - return fmt.Sprintf("No description of service %s", s.Name) + found := "no services found" + if len(names) > 0 { + found = "known services: " + strings.Join(names, ", ") + } + if name != "" { + return nil, fmt.Errorf("no such service: %q: %s", name, found) + } + return nil, fmt.Errorf("no service specified: %s", found) } func isOK(status int) (bool, error) { @@ -145,57 +188,48 @@ func isOK(status int) (bool, error) { case 2: // success return true, nil case 4: // client error - return false, fmt.Errorf("request failed with status %d", status) - default: // retry + return false, fmt.Errorf("got status %d", status) + default: // retry on everything else return false, nil } } -type responseFunc func(status int, response []byte) (bool, error) +// responseFunc returns whether a HTTP request is considered successful, based on its status and response data. +// Returning false indicates that the operation should be retried. An error is returned if the response is considered +// terminal and that the request should not be retried. +type responseFunc func(status int, response []byte) (ok bool, err error) type requestFunc func() *http.Request -// waitForOK queries url and returns its status code. If response status is not 2xx or 4xx, it is repeatedly queried -// until timeout elapses. 
-func waitForOK(service *Service, url string, timeout time.Duration) (int, error) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return 0, err - } - okFunc := func(status int, response []byte) (bool, error) { - ok, err := isOK(status) - if err != nil { - return false, fmt.Errorf("failed to query %s at %s: %w", service.Description(), url, err) - } - return ok, err - } - return wait(service, okFunc, func() *http.Request { return req }, timeout) -} - -func wait(service *Service, fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) { +// wait queries service until one of the following conditions are satisfied: +// +// 1. okFn returns true or a non-nil error +// 2. timeout is exceeded +// +// It returns the last received HTTP status code and error, if any. +func wait(service *Service, okFn responseFunc, reqFn requestFunc, timeout, retryInterval time.Duration) (int, error) { var ( - httpErr error - response *http.Response - statusCode int + status int + response *http.Response + err error ) deadline := time.Now().Add(timeout) loopOnce := timeout == 0 for time.Now().Before(deadline) || loopOnce { - req := reqFn() - response, httpErr = service.Do(req, 10*time.Second) - if httpErr == nil { - statusCode = response.StatusCode + response, err = service.Do(reqFn(), 10*time.Second) + if err == nil { + status = response.StatusCode body, err := io.ReadAll(response.Body) if err != nil { return 0, err } response.Body.Close() - ok, err := fn(statusCode, body) + ok, err := okFn(status, body) if err != nil { - return statusCode, err + return status, err } if ok { - return statusCode, nil + return status, nil } } timeLeft := time.Until(deadline) @@ -204,5 +238,8 @@ func wait(service *Service, fn responseFunc, reqFn requestFunc, timeout time.Dur } time.Sleep(retryInterval) } - return statusCode, httpErr + if err == nil { + return status, errWaitTimeout + } + return status, err } diff --git a/client/go/internal/vespa/target_cloud.go 
b/client/go/internal/vespa/target_cloud.go index c0169f1a9bd..24133ba5fc3 100644 --- a/client/go/internal/vespa/target_cloud.go +++ b/client/go/internal/vespa/target_cloud.go @@ -3,12 +3,12 @@ package vespa import ( "bytes" "encoding/json" + "errors" "fmt" "math" "net/http" "sort" "strconv" - "strings" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -37,6 +37,7 @@ type cloudTarget struct { httpClient util.HTTPClient apiAuth Authenticator deploymentAuth Authenticator + retryInterval time.Duration } type deploymentEndpoint struct { @@ -49,13 +50,21 @@ type deploymentResponse struct { Endpoints []deploymentEndpoint `json:"endpoints"` } -type jobResponse struct { +type runResponse struct { Active bool `json:"active"` Status string `json:"status"` Log map[string][]logMessage `json:"log"` LastID int64 `json:"lastId"` } +type jobResponse struct { + ID int64 `json:"id"` +} + +type jobsResponse struct { + Runs []jobResponse `json:"runs"` +} + type logMessage struct { At int64 `json:"at"` Type string `json:"type"` @@ -63,7 +72,9 @@ type logMessage struct { } // CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform. 
-func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAuth Authenticator, apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) { +func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAuth Authenticator, + apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, + logOptions LogOptions, retryInterval time.Duration) (Target, error) { return &cloudTarget{ httpClient: httpClient, apiOptions: apiOptions, @@ -71,40 +82,10 @@ func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAu logOptions: logOptions, apiAuth: apiAuth, deploymentAuth: deploymentAuth, + retryInterval: retryInterval, }, nil } -func (t *cloudTarget) findClusterURL(cluster string, timeout time.Duration, runID int64) (string, error) { - if t.deploymentOptions.CustomURL != "" { - return t.deploymentOptions.CustomURL, nil - } - if t.deploymentOptions.ClusterURLs == nil { - if err := t.waitForEndpoints(timeout, runID); err != nil { - return "", err - } - } - clusters := make([]string, 0, len(t.deploymentOptions.ClusterURLs)) - for c := range t.deploymentOptions.ClusterURLs { - clusters = append(clusters, c) - } - if cluster == "" { - for _, url := range t.deploymentOptions.ClusterURLs { - if len(t.deploymentOptions.ClusterURLs) == 1 { - return url, nil - } else { - return "", fmt.Errorf("no cluster specified: found multiple clusters '%s'", strings.Join(clusters, "', '")) - } - } - } else { - url, ok := t.deploymentOptions.ClusterURLs[cluster] - if !ok { - return "", fmt.Errorf("invalid cluster '%s': must be one of '%s'", cluster, strings.Join(clusters, "', '")) - } - return url, nil - } - return "", fmt.Errorf("no endpoints found") -} - func (t *cloudTarget) Type() string { switch t.apiOptions.System.Name { case MainSystem.Name, CDSystem.Name: @@ -117,41 +98,52 @@ func (t *cloudTarget) IsCloud() bool { return true } func (t *cloudTarget) Deployment() Deployment { return 
t.deploymentOptions.Deployment } -func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) { - switch name { - case DeployService: +func (t *cloudTarget) DeployService() (*Service, error) { + return &Service{ + BaseURL: t.apiOptions.System.URL, + TLSOptions: t.apiOptions.TLSOptions, + deployAPI: true, + httpClient: t.httpClient, + auth: t.apiAuth, + retryInterval: t.retryInterval, + }, nil +} + +func (t *cloudTarget) ContainerServices(timeout time.Duration) ([]*Service, error) { + var clusterUrls map[string]string + if t.deploymentOptions.CustomURL != "" { + // Custom URL is always preferred + clusterUrls = map[string]string{"": t.deploymentOptions.CustomURL} + } else if t.deploymentOptions.ClusterURLs != nil { + // ... then endpoints specified through environment + clusterUrls = t.deploymentOptions.ClusterURLs + } else { + // ... then discovered endpoints + endpoints, err := t.discoverEndpoints(timeout) + if err != nil { + return nil, err + } + clusterUrls = endpoints + } + services := make([]*Service, 0, len(clusterUrls)) + for name, url := range clusterUrls { service := &Service{ - Name: name, - BaseURL: t.apiOptions.System.URL, - TLSOptions: t.apiOptions.TLSOptions, - httpClient: t.httpClient, - auth: t.apiAuth, + Name: name, + BaseURL: url, + TLSOptions: t.deploymentOptions.TLSOptions, + httpClient: t.httpClient, + auth: t.deploymentAuth, + retryInterval: t.retryInterval, } if timeout > 0 { - status, err := service.Wait(timeout) - if err != nil { + if err := service.Wait(timeout); err != nil { return nil, err } - if ok, _ := isOK(status); !ok { - return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL) - } - } - return service, nil - case QueryService, DocumentService: - url, err := t.findClusterURL(cluster, timeout, runID) - if err != nil { - return nil, err } - return &Service{ - Name: name, - BaseURL: url, - TLSOptions: t.deploymentOptions.TLSOptions, - httpClient: 
t.httpClient, - auth: t.deploymentAuth, - }, nil - default: - return nil, fmt.Errorf("unknown service: %s", name) + services = append(services, service) } + sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) + return services, nil } func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { @@ -162,7 +154,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { if err != nil { return err } - deployService, err := t.Service(DeployService, 0, 0, "") + deployService, err := t.DeployService() if err != nil { return err } @@ -218,7 +210,7 @@ func (t *cloudTarget) PrintLog(options LogOptions) error { } logEntries, err := ReadLogEntries(bytes.NewReader(response)) if err != nil { - return true, err + return false, err } for _, le := range logEntries { if !le.Time.After(lastFrom) { @@ -238,35 +230,65 @@ func (t *cloudTarget) PrintLog(options LogOptions) error { if options.Follow { timeout = math.MaxInt64 // No timeout } - _, err = t.deployServiceWait(logFunc, requestFunc, timeout) - return err + // Ignore wait error because logFunc has no concept of completion, we just want to print log entries until timeout is reached + if _, err := t.deployServiceWait(logFunc, requestFunc, timeout); err != nil && !errors.Is(err, errWaitTimeout) { + return fmt.Errorf("failed to read logs: %s", err) + } + return nil } func (t *cloudTarget) deployServiceWait(fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) { - deployService, err := t.Service(DeployService, 0, 0, "") + deployService, err := t.DeployService() if err != nil { return 0, err } - return wait(deployService, fn, reqFn, timeout) + return wait(deployService, fn, reqFn, timeout, t.retryInterval) } -func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error { - if runID > 0 { - if err := t.waitForRun(runID, timeout); err != nil { - return err +func (t *cloudTarget) discoverLatestRun(timeout time.Duration) (int64, 
error) { + runsURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s?limit=1", + t.apiOptions.System.URL, + t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, + t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) + req, err := http.NewRequest("GET", runsURL, nil) + if err != nil { + return 0, err + } + requestFunc := func() *http.Request { return req } + var lastRunID int64 + jobsSuccessFunc := func(status int, response []byte) (bool, error) { + if ok, err := isOK(status); !ok { + return ok, err + } + var resp jobsResponse + if err := json.Unmarshal(response, &resp); err != nil { + return false, err + } + if len(resp.Runs) > 0 { + lastRunID = resp.Runs[0].ID + return true, nil } + return false, nil } - return t.discoverEndpoints(timeout) + _, err = t.deployServiceWait(jobsSuccessFunc, requestFunc, timeout) + return lastRunID, err } -func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { +func (t *cloudTarget) AwaitDeployment(runID int64, timeout time.Duration) (int64, error) { + if runID == LatestDeployment { + lastRunID, err := t.discoverLatestRun(timeout) + if err != nil { + return 0, err + } + runID = lastRunID + } runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d", t.apiOptions.System.URL, t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID) req, err := http.NewRequest("GET", runURL, nil) if err != nil { - return err + return 0, err } lastID := int64(-1) requestFunc := func() *http.Request { @@ -275,13 +297,14 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { req.URL.RawQuery = 
q.Encode() return req } + success := false jobSuccessFunc := func(status int, response []byte) (bool, error) { if ok, err := isOK(status); !ok { return ok, err } - var resp jobResponse + var resp runResponse if err := json.Unmarshal(response, &resp); err != nil { - return false, nil + return false, err } if t.logOptions.Writer != nil { lastID = t.printLog(resp, lastID) @@ -292,20 +315,27 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { if resp.Status != "success" { return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status) } - return true, nil + success = true + return success, nil } _, err = t.deployServiceWait(jobSuccessFunc, requestFunc, timeout) - return err + if err != nil { + return 0, fmt.Errorf("deployment run %d incomplete after waiting %s: %w", runID, timeout, err) + } + if !success { + return 0, fmt.Errorf("deployment run %d incomplete after waiting %s", runID, timeout) + } + return runID, nil } -func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { +func (t *cloudTarget) printLog(response runResponse, last int64) int64 { if response.LastID == 0 { return last } var msgs []logMessage for step, stepMsgs := range response.Log { for _, msg := range stepMsgs { - if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 { + if (step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level) || LogLevel(msg.Type) == 3 { continue } msgs = append(msgs, msg) @@ -320,14 +350,14 @@ func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { return response.LastID } -func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { +func (t *cloudTarget) discoverEndpoints(timeout time.Duration) (map[string]string, error) { deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s", t.apiOptions.System.URL, t.deploymentOptions.Deployment.Application.Tenant, 
t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) req, err := http.NewRequest("GET", deploymentURL, nil) if err != nil { - return err + return nil, err } urlsByCluster := make(map[string]string) endpointFunc := func(status int, response []byte) (bool, error) { @@ -350,11 +380,10 @@ func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { return true, nil } if _, err := t.deployServiceWait(endpointFunc, func() *http.Request { return req }, timeout); err != nil { - return err + return nil, fmt.Errorf("no endpoints found after waiting %s: %w", timeout, err) } if len(urlsByCluster) == 0 { - return fmt.Errorf("no endpoints discovered for %s", t.deploymentOptions.Deployment) + return nil, fmt.Errorf("no endpoints found after waiting %s", timeout) } - t.deploymentOptions.ClusterURLs = urlsByCluster - return nil + return urlsByCluster, nil } diff --git a/client/go/internal/vespa/target_custom.go b/client/go/internal/vespa/target_custom.go index fd0af0e8d53..b0ca4f8492c 100644 --- a/client/go/internal/vespa/target_custom.go +++ b/client/go/internal/vespa/target_custom.go @@ -3,8 +3,11 @@ package vespa import ( "encoding/json" "fmt" + "net" "net/http" "net/url" + "sort" + "strconv" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -12,24 +15,45 @@ import ( ) type customTarget struct { - targetType string - baseURL string - httpClient util.HTTPClient - tlsOptions TLSOptions + targetType string + baseURL string + httpClient util.HTTPClient + tlsOptions TLSOptions + retryInterval time.Duration } -type serviceConvergeResponse struct { - Converged bool `json:"converged"` +type serviceStatus struct { + Converged bool `json:"converged"` + CurrentGeneration int64 `json:"currentGeneration"` + Services []serviceInfo `json:"services"` +} + +type serviceInfo struct { + ClusterName string `json:"clusterName"` 
+ Type string `json:"type"` + Port int `json:"port"` } // LocalTarget creates a target for a Vespa platform running locally. -func LocalTarget(httpClient util.HTTPClient, tlsOptions TLSOptions) Target { - return &customTarget{targetType: TargetLocal, baseURL: "http://127.0.0.1", httpClient: httpClient, tlsOptions: tlsOptions} +func LocalTarget(httpClient util.HTTPClient, tlsOptions TLSOptions, retryInterval time.Duration) Target { + return &customTarget{ + targetType: TargetLocal, + baseURL: "http://127.0.0.1", + httpClient: httpClient, + tlsOptions: tlsOptions, + retryInterval: retryInterval, + } } // CustomTarget creates a Target for a Vespa platform running at baseURL. -func CustomTarget(httpClient util.HTTPClient, baseURL string, tlsOptions TLSOptions) Target { - return &customTarget{targetType: TargetCustom, baseURL: baseURL, httpClient: httpClient, tlsOptions: tlsOptions} +func CustomTarget(httpClient util.HTTPClient, baseURL string, tlsOptions TLSOptions, retryInterval time.Duration) Target { + return &customTarget{ + targetType: TargetCustom, + baseURL: baseURL, + httpClient: httpClient, + tlsOptions: tlsOptions, + retryInterval: retryInterval, + } } func (t *customTarget) Type() string { return t.targetType } @@ -38,95 +62,126 @@ func (t *customTarget) IsCloud() bool { return false } func (t *customTarget) Deployment() Deployment { return DefaultDeployment } -func (t *customTarget) createService(name string) (*Service, error) { - switch name { - case DeployService, QueryService, DocumentService: - url, err := t.serviceURL(name, t.targetType) - if err != nil { - return nil, err - } - return &Service{BaseURL: url, Name: name, httpClient: t.httpClient, TLSOptions: t.tlsOptions}, nil +func (t *customTarget) PrintLog(options LogOptions) error { + return fmt.Errorf("log access is only supported on cloud: run vespa-logfmt on the admin node instead, or export from a container image (here named 'vespa') using docker exec vespa vespa-logfmt") +} + +func (t 
*customTarget) CheckVersion(version version.Version) error { return nil } + +func (t *customTarget) newService(url, name string, deployAPI bool) *Service { + return &Service{ + BaseURL: url, + Name: name, + deployAPI: deployAPI, + httpClient: t.httpClient, + TLSOptions: t.tlsOptions, + retryInterval: t.retryInterval, } - return nil, fmt.Errorf("unknown service: %s", name) } -func (t *customTarget) Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) { - service, err := t.createService(name) +func (t *customTarget) DeployService() (*Service, error) { + if t.targetType == TargetCustom { + return t.newService(t.baseURL, "", true), nil + } + u, err := t.urlWithPort(19071) if err != nil { return nil, err } - if timeout > 0 { - if name == DeployService { - status, err := service.Wait(timeout) - if err != nil { - return nil, err - } - if ok, _ := isOK(status); !ok { - return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL) - } - } else { - if err := t.waitForConvergence(timeout); err != nil { - return nil, err - } + return t.newService(u.String(), "", true), nil +} + +func (t *customTarget) ContainerServices(timeout time.Duration) ([]*Service, error) { + if t.targetType == TargetCustom { + return []*Service{t.newService(t.baseURL, "", false)}, nil + } + status, err := t.serviceStatus(AnyDeployment, timeout) + if err != nil { + return nil, err + } + portsByCluster := make(map[string]int) + for _, serviceInfo := range status.Services { + if serviceInfo.Type != "container" { + continue + } + clusterName := serviceInfo.ClusterName + if clusterName == "" { // Vespa version older than 8.206.1, which does not include cluster name in the API + clusterName = serviceInfo.Type + strconv.Itoa(serviceInfo.Port) } + portsByCluster[clusterName] = serviceInfo.Port } - return service, nil + var services []*Service + for cluster, port := range portsByCluster { + url, err := t.urlWithPort(port) + if err != nil 
{ + return nil, err + } + service := t.newService(url.String(), cluster, false) + services = append(services, service) + } + sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) + return services, nil } -func (t *customTarget) PrintLog(options LogOptions) error { - return fmt.Errorf("log access is only supported on cloud: run vespa-logfmt on the admin node instead, or export from a container image (here named 'vespa') using docker exec vespa vespa-logfmt") +func (t *customTarget) AwaitDeployment(generation int64, timeout time.Duration) (int64, error) { + status, err := t.serviceStatus(generation, timeout) + if err != nil { + return 0, err + } + return status.CurrentGeneration, nil } -func (t *customTarget) CheckVersion(version version.Version) error { return nil } - -func (t *customTarget) serviceURL(name string, targetType string) (string, error) { +func (t *customTarget) urlWithPort(port int) (*url.URL, error) { u, err := url.Parse(t.baseURL) if err != nil { - return "", err - } - if targetType == TargetLocal { - // Use same ports as the vespaengine/vespa container image - port := "" - switch name { - case DeployService: - port = "19071" - case QueryService, DocumentService: - port = "8080" - default: - return "", fmt.Errorf("unknown service: %s", name) - } - u.Host = u.Host + ":" + port + return nil, err } - return u.String(), nil + if _, _, err := net.SplitHostPort(u.Host); err == nil { + return nil, fmt.Errorf("url %s already contains port", u) + } + u.Host = net.JoinHostPort(u.Host, strconv.Itoa(port)) + return u, nil } -func (t *customTarget) waitForConvergence(timeout time.Duration) error { - deployService, err := t.createService(DeployService) +func (t *customTarget) serviceStatus(wantedGeneration int64, timeout time.Duration) (serviceStatus, error) { + deployService, err := t.DeployService() if err != nil { - return err + return serviceStatus{}, err } url := 
fmt.Sprintf("%s/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", deployService.BaseURL) req, err := http.NewRequest("GET", url, nil) if err != nil { - return err + return serviceStatus{}, err } + var status serviceStatus converged := false - convergedFunc := func(status int, response []byte) (bool, error) { - if ok, err := isOK(status); !ok { + convergedFunc := func(httpStatus int, response []byte) (bool, error) { + if ok, err := isOK(httpStatus); !ok { return ok, err } - var resp serviceConvergeResponse - if err := json.Unmarshal(response, &resp); err != nil { - return false, nil + if err := json.Unmarshal(response, &status); err != nil { + return false, err } - converged = resp.Converged + converged = wantedGeneration == AnyDeployment || + (wantedGeneration == LatestDeployment && status.Converged) || + status.CurrentGeneration == wantedGeneration return converged, nil } - if _, err := wait(deployService, convergedFunc, func() *http.Request { return req }, timeout); err != nil { - return err + if _, err := wait(deployService, convergedFunc, func() *http.Request { return req }, timeout, t.retryInterval); err != nil { + return serviceStatus{}, fmt.Errorf("deployment not converged%s after waiting %s: %w", generationDescription(wantedGeneration), timeout, err) } if !converged { - return fmt.Errorf("services have not converged") + return serviceStatus{}, fmt.Errorf("deployment not converged%s after waiting %s", generationDescription(wantedGeneration), timeout) + } + return status, nil +} + +func generationDescription(generation int64) string { + switch generation { + case AnyDeployment: + return "" + case LatestDeployment: + return " on latest generation" + default: + return fmt.Sprintf(" on generation %d", generation) } - return nil } diff --git a/client/go/internal/vespa/target_test.go b/client/go/internal/vespa/target_test.go index 6dc97f496f5..b208489ddce 100644 --- 
a/client/go/internal/vespa/target_test.go +++ b/client/go/internal/vespa/target_test.go @@ -3,145 +3,224 @@ package vespa import ( "bytes" - "fmt" "io" "net/http" - "net/http/httptest" + "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/vespa-engine/vespa/client/go/internal/mock" - "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/version" ) -type mockVespaApi struct { - deploymentConverged bool - authFailure bool - serverURL string -} - -func (v *mockVespaApi) mockVespaHandler(w http.ResponseWriter, req *http.Request) { - if v.authFailure { - response := `{"message":"unauthorized"}` - w.WriteHeader(401) - w.Write([]byte(response)) - } - switch req.URL.Path { - case "/cli/v1/": - response := `{"minVersion":"8.0.0"}` - w.Write([]byte(response)) - case "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1": - response := "{}" - if v.deploymentConverged { - response = fmt.Sprintf(`{"endpoints": [{"url": "%s","scope": "zone","cluster": "cluster1"}]}`, v.serverURL) - } - w.Write([]byte(response)) - case "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42": - var response string - if v.deploymentConverged { - response = `{"active": false, "status": "success"}` - } else { - response = `{"active": true, "status": "running", - "lastId": 42, - "log": {"deployReal": [{"at": 1631707708431, - "type": "info", - "message": "Deploying platform version 7.465.17 and application version 1.0.2 ..."}]}}` - } - w.Write([]byte(response)) - case "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge": - response := fmt.Sprintf(`{"converged": %t}`, v.deploymentConverged) - w.Write([]byte(response)) - case "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1/logs": - log := `1632738690.905535 
host1a.dev.aws-us-east-1c 806/53 logserver-container Container.com.yahoo.container.jdisc.ConfiguredApplication info Switching to the latest deployed set of configurations and components. Application config generation: 52532 -1632738698.600189 host1a.dev.aws-us-east-1c 1723/33590 config-sentinel sentinel.sentinel.config-owner config Sentinel got 3 service elements [tenant(vespa-team), application(music), instance(mpolden)] for config generation 52532 -` - w.Write([]byte(log)) - case "/status.html": - w.Write([]byte("OK")) - case "/ApplicationStatus": - w.WriteHeader(500) - w.Write([]byte("Unknown error")) - default: - w.WriteHeader(400) - w.Write([]byte("Invalid path: " + req.URL.Path)) +func TestLocalTarget(t *testing.T) { + // Local target uses discovery + client := &mock.HTTPClient{} + lt := LocalTarget(client, TLSOptions{}, 0) + assertServiceURL(t, "http://127.0.0.1:19071", lt, "deploy") + for i := 0; i < 2; i++ { + response := ` +{ + "services": [ + { + "host": "foo", + "port": 8080, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080", + "currentGeneration": 1 + }, + { + "host": "bar", + "port": 8080, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080", + "currentGeneration": 1 + }, + { + "clusterName": "feed", + "host": "localhost", + "port": 8081, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8081", + "currentGeneration": 1 + }, + { + "host": "localhost", + "port": 19112, + "type": "searchnode", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:19112", + 
"currentGeneration": 1 + } + ], + "currentGeneration": 1 +}` + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + Body: []byte(response), + }) } + assertServiceURL(t, "http://127.0.0.1:8080", lt, "container8080") + assertServiceURL(t, "http://127.0.0.1:8081", lt, "feed") } func TestCustomTarget(t *testing.T) { - lt := LocalTarget(&mock.HTTPClient{}, TLSOptions{}) - assertServiceURL(t, "http://127.0.0.1:19071", lt, "deploy") - assertServiceURL(t, "http://127.0.0.1:8080", lt, "query") - assertServiceURL(t, "http://127.0.0.1:8080", lt, "document") - - ct := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42", TLSOptions{}) + // Custom target always uses URL directly, without discovery + ct := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42", TLSOptions{}, 0) assertServiceURL(t, "http://192.0.2.42", ct, "deploy") - assertServiceURL(t, "http://192.0.2.42", ct, "query") - assertServiceURL(t, "http://192.0.2.42", ct, "document") - - ct2 := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42:60000", TLSOptions{}) + assertServiceURL(t, "http://192.0.2.42", ct, "") + ct2 := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42:60000", TLSOptions{}, 0) assertServiceURL(t, "http://192.0.2.42:60000", ct2, "deploy") - assertServiceURL(t, "http://192.0.2.42:60000", ct2, "query") - assertServiceURL(t, "http://192.0.2.42:60000", ct2, "document") + assertServiceURL(t, "http://192.0.2.42:60000", ct2, "") } func TestCustomTargetWait(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - target := CustomTarget(util.CreateClient(time.Second*10), srv.URL, TLSOptions{}) + client := &mock.HTTPClient{} + target := CustomTarget(client, "http://192.0.2.42", TLSOptions{}, 0) + // Fails once + client.NextStatus(500) + assertService(t, true, target, "", 0) + // Fails multiple times + for i := 
0; i < 3; i++ { + client.NextStatus(500) + client.NextResponseError(io.EOF) + } + // Then succeeds + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200}) + assertService(t, false, target, "", time.Second) +} + +func TestCustomTargetAwaitDeployment(t *testing.T) { + client := &mock.HTTPClient{} + target := CustomTarget(client, "http://192.0.2.42", TLSOptions{}, 0) - _, err := target.Service("query", time.Millisecond, 42, "") + // Not converged initially + _, err := target.AwaitDeployment(42, 0) assert.NotNil(t, err) - vc.deploymentConverged = true - _, err = target.Service("query", time.Millisecond, 42, "") - assert.Nil(t, err) + // Not converged on this generation + response := mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + Body: []byte(`{"currentGeneration": 42}`), + } + client.NextResponse(response) + _, err = target.AwaitDeployment(41, 0) + assert.NotNil(t, err) - assertServiceWait(t, 200, target, "deploy") - assertServiceWait(t, 500, target, "query") - assertServiceWait(t, 500, target, "document") + // Converged + client.NextResponse(response) + convergedID, err := target.AwaitDeployment(42, 0) + assert.Nil(t, err) + assert.Equal(t, int64(42), convergedID) } func TestCloudTargetWait(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - vc.serverURL = srv.URL - var logWriter bytes.Buffer - target := createCloudTarget(t, srv.URL, &logWriter) - vc.authFailure = true - assertServiceWaitErr(t, 401, true, target, "deploy") - vc.authFailure = false - assertServiceWait(t, 200, target, "deploy") + target, client := createCloudTarget(t, &logWriter) + client.NextStatus(401) + assertService(t, true, target, "deploy", time.Second) // No retrying on 4xx + client.NextStatus(500) + client.NextStatus(500) + client.NextResponse(mock.HTTPResponse{URI: "/status.html", 
Status: 200}) + assertService(t, false, target, "deploy", time.Second) - _, err := target.Service("query", time.Millisecond, 42, "") + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", + Status: 200, + Body: []byte(`{"endpoints":[]}`), + }) + _, err := target.ContainerServices(time.Millisecond) assert.NotNil(t, err) - vc.deploymentConverged = true - _, err = target.Service("query", time.Millisecond, 42, "") + response := mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", + Status: 200, + Body: []byte(`{ + "endpoints": [ + {"url": "http://a.example.com","scope": "zone", "cluster": "default"}, + {"url": "http://b.example.com","scope": "zone", "cluster": "feed"} + ] +}`), + } + client.NextResponse(response) + services, err := target.ContainerServices(time.Millisecond) assert.Nil(t, err) + assert.Equal(t, 2, len(services)) - assertServiceWait(t, 500, target, "query") - assertServiceWait(t, 500, target, "document") + client.NextResponse(response) + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 500}) + assertService(t, true, target, "default", 0) + client.NextResponse(response) + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200}) + assertService(t, false, target, "feed", 0) +} + +func TestCloudTargetAwaitDeployment(t *testing.T) { + var logWriter bytes.Buffer + target, client := createCloudTarget(t, &logWriter) + + runningResponse := mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1", + Status: 200, + Body: []byte(`{"active": true, "status": "running", + "lastId": 42, + "log": {"deployReal": [{"at": 1631707708431, + "type": "info", + "message": "Deploying platform version 7.465.17 and application version 1.0.2 ..."}]}}`), + } + client.NextResponse(runningResponse) + runningResponse.URI = 
"/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=42" + client.NextResponse(runningResponse) + // Deployment has not succeeded yet + _, err := target.AwaitDeployment(int64(42), time.Second) + assert.NotNil(t, err) // Log timestamp is converted to local time, do the same here in case the local time where tests are run varies tm := time.Unix(1631707708, 431000) expectedTime := tm.Format("[15:04:05]") - assert.Equal(t, expectedTime+" info Deploying platform version 7.465.17 and application version 1.0.2 ...\n", logWriter.String()) + assert.Equal(t, strings.Repeat(expectedTime+" info Deploying platform version 7.465.17 and application version 1.0.2 ...\n", 2), logWriter.String()) + + // Wanted deployment run eventually succeeds + runningResponse.URI = "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1" + client.NextResponse(runningResponse) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=42", + Status: 200, + Body: []byte(`{"active": false, "status": "success"}`), + }) + convergedID, err := target.AwaitDeployment(int64(42), time.Second) + assert.Nil(t, err) + assert.Equal(t, int64(42), convergedID) + + // Await latest deployment + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1?limit=1", + Status: 200, + Body: []byte(`{"runs": [{"id": 1337}]}`), + }) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/1337?after=-1", + Status: 200, + Body: []byte(`{"active": false, "status": "success"}`), + }) + convergedID, err = target.AwaitDeployment(LatestDeployment, time.Second) + assert.Nil(t, err) + assert.Equal(t, int64(1337), convergedID) } func TestLog(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - 
vc.serverURL = srv.URL - vc.deploymentConverged = true - + target, client := createCloudTarget(t, io.Discard) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1/logs?from=-62135596800000", + Status: 200, + Body: []byte(`1632738690.905535 host1a.dev.aws-us-east-1c 806/53 logserver-container Container.com.yahoo.container.jdisc.ConfiguredApplication info Switching to the latest deployed set of configurations and components. Application config generation: 52532 +1632738698.600189 host1a.dev.aws-us-east-1c 1723/33590 config-sentinel sentinel.sentinel.config-owner config Sentinel got 3 service elements [tenant(vespa-team), application(music), instance(mpolden)] for config generation 52532 +`), + }) var buf bytes.Buffer - target := createCloudTarget(t, srv.URL, io.Discard) if err := target.PrintLog(LogOptions{Writer: &buf, Level: 3}); err != nil { t.Fatal(err) } @@ -151,23 +230,22 @@ func TestLog(t *testing.T) { } func TestCheckVersion(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - - target := createCloudTarget(t, srv.URL, io.Discard) + target, client := createCloudTarget(t, io.Discard) + for i := 0; i < 3; i++ { + client.NextResponse(mock.HTTPResponse{URI: "/cli/v1/", Status: 200, Body: []byte(`{"minVersion":"8.0.0"}`)}) + } assert.Nil(t, target.CheckVersion(version.MustParse("8.0.0"))) assert.Nil(t, target.CheckVersion(version.MustParse("8.1.0"))) assert.NotNil(t, target.CheckVersion(version.MustParse("7.0.0"))) } -func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target { +func createCloudTarget(t *testing.T, logWriter io.Writer) (Target, *mock.HTTPClient) { apiKey, err := CreateAPIKey() - assert.Nil(t, err) - + require.Nil(t, err) auth := &mockAuthenticator{} + client := &mock.HTTPClient{} target, err := CloudTarget( - util.CreateClient(time.Second*10), + client, auth, auth, 
APIOptions{APIKey: apiKey, System: PublicSystem}, @@ -178,39 +256,39 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target { }, }, LogOptions{Writer: logWriter}, + 0, ) - if err != nil { - t.Fatal(err) - } - if ct, ok := target.(*cloudTarget); ok { - ct.apiOptions.System.URL = url - } else { - t.Fatalf("Wrong target type %T", ct) - } - return target + require.Nil(t, err) + return target, client } -func assertServiceURL(t *testing.T, url string, target Target, service string) { - s, err := target.Service(service, 0, 42, "") - assert.Nil(t, err) - assert.Equal(t, url, s.BaseURL) +func getService(t *testing.T, target Target, name string) (*Service, error) { + t.Helper() + if name == "deploy" { + return target.DeployService() + } + services, err := target.ContainerServices(0) + require.Nil(t, err) + return FindService(name, services) } -func assertServiceWait(t *testing.T, expectedStatus int, target Target, service string) { - assertServiceWaitErr(t, expectedStatus, false, target, service) +func assertServiceURL(t *testing.T, url string, target Target, serviceName string) { + t.Helper() + service, err := getService(t, target, serviceName) + require.Nil(t, err) + assert.Equal(t, url, service.BaseURL) } -func assertServiceWaitErr(t *testing.T, expectedStatus int, expectErr bool, target Target, service string) { - s, err := target.Service(service, 0, 42, "") - assert.Nil(t, err) - - status, err := s.Wait(0) - if expectErr { +func assertService(t *testing.T, fail bool, target Target, serviceName string, timeout time.Duration) { + t.Helper() + service, err := getService(t, target, serviceName) + require.Nil(t, err) + err = service.Wait(timeout) + if fail { assert.NotNil(t, err) } else { assert.Nil(t, err) } - assert.Equal(t, expectedStatus, status) } type mockAuthenticator struct{} diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java 
index 1774b4f81d9..1ab3cc30db7 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -108,7 +108,7 @@ public interface ModelContext { @ModelFeatureFlag(owners = {"hmusum"}) default Architecture adminClusterArchitecture() { return Architecture.getDefault(); } @ModelFeatureFlag(owners = {"tokle"}) default boolean enableProxyProtocolMixedMode() { return true; } @ModelFeatureFlag(owners = {"arnej"}) default String logFileCompressionAlgorithm(String defVal) { return defVal; } - @ModelFeatureFlag(owners = {"tokle"}, removeAfter = "8.210") default boolean useRestrictedDataPlaneBindings() { return true; } + @ModelFeatureFlag(owners = {"tokle"}) default boolean useRestrictedDataPlaneBindings() { return false; } @ModelFeatureFlag(owners = {"arnej, bjorncs"}) default boolean enableGlobalPhase() { return true; } @ModelFeatureFlag(owners = {"baldersheim"}, comment = "Select summary decode type") default String summaryDecodePolicy() { return "eager"; } @ModelFeatureFlag(owners = {"hmusum"}) default boolean allowMoreThanOneContentGroupDown(ClusterSpec.Id id) { return false; } diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index 9f23c9b7231..b06d3572fcb 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -82,6 +82,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea private int mbus_network_threads = 1; private int heapSizePercentage = ApplicationContainerCluster.defaultHeapSizePercentageOfAvailableMemory; private Architecture adminClusterNodeResourcesArchitecture = Architecture.getDefault(); + private boolean useRestrictedDataPlaneBindings = false; private Optional<CloudAccount> 
cloudAccount = Optional.empty(); private boolean allowUserFilters = true; private boolean allowMoreThanOneContentGroupDown = false; @@ -140,6 +141,7 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea @Override public int rpcEventsBeforeWakeup() { return rpc_events_before_wakeup; } @Override public String queryDispatchPolicy() { return queryDispatchPolicy; } @Override public String summaryDecodePolicy() { return summaryDecodePolicy; } + @Override public boolean useRestrictedDataPlaneBindings() { return useRestrictedDataPlaneBindings; } @Override public Optional<CloudAccount> cloudAccount() { return cloudAccount; } @Override public boolean allowUserFilters() { return allowUserFilters; } @Override public boolean enableGlobalPhase() { return true; } // Enable global-phase by default for unit tests only @@ -364,6 +366,11 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea return this; } + public TestProperties setUseRestrictedDataPlaneBindings(boolean useRestrictedDataPlaneBindings) { + this.useRestrictedDataPlaneBindings = useRestrictedDataPlaneBindings; + return this; + } + public TestProperties setCloudAccount(CloudAccount cloudAccount) { this.cloudAccount = Optional.ofNullable(cloudAccount); return this; diff --git a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java index f869d578dcb..f4aa4f649bd 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/application/validation/UriBindingsValidator.java @@ -57,7 +57,7 @@ class UriBindingsValidator extends Validator { if (binding instanceof SystemBindingPattern) return; // Allow binding to port if we are restricting data plane bindings - if (!binding.matchesAnyPort()) { + if (!binding.matchesAnyPort() && 
!deployState.featureFlags().useRestrictedDataPlaneBindings()) { throw new IllegalArgumentException(createErrorMessage(binding, "binding with port is not allowed")); } if (!binding.host().equals(BindingPattern.WILDCARD_PATTERN)) { @@ -73,7 +73,7 @@ class UriBindingsValidator extends Validator { } private static String createErrorMessage(BindingPattern binding, String message) { - return String.format("For binding '%s': %s", binding.originalPatternString(), message); + return String.format("For binding '%s': %s", binding.patternString(), message); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java index d674a56007f..9b5a1429cb7 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomHandlerBuilder.java @@ -48,7 +48,7 @@ public class DomHandlerBuilder extends VespaDomBuilder.DomConfigProducerBuilderB @Override protected Handler doBuild(DeployState deployState, TreeConfigProducer<AnyConfigProducer> parent, Element handlerElement) { Handler handler = createHandler(handlerElement); - var ports = deployState.isHosted() + var ports = deployState.isHosted() && deployState.featureFlags().useRestrictedDataPlaneBindings() ? 
portBindingOverride : Set.<Integer>of(); for (Element xmlBinding : XML.getChildren(handlerElement, "binding")) @@ -64,7 +64,7 @@ public class DomHandlerBuilder extends VespaDomBuilder.DomConfigProducerBuilderB UserBindingPattern bindingPattern = UserBindingPattern.fromPattern(path); if (portBindingOverride.isEmpty()) return Set.of(bindingPattern); return portBindingOverride.stream() - .map(bindingPattern::withOverriddenPort) + .map(bindingPattern::withPort) .toList(); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java index 0795fdf41d6..a5a567b18f8 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/clients/ContainerDocumentApi.java @@ -92,7 +92,7 @@ public class ContainerDocumentApi { UserBindingPattern bindingPattern = UserBindingPattern.fromPattern(path); if (ports.isEmpty()) return List.of(bindingPattern); return ports.stream() - .map(p -> (BindingPattern)bindingPattern.withOverriddenPort(p)) + .map(p -> (BindingPattern)bindingPattern.withPort(p)) .toList(); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java b/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java index f580a0a2cc9..c3dae7e4c8a 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/component/BindingPattern.java @@ -63,21 +63,11 @@ public abstract class BindingPattern implements Comparable<BindingPattern> { return builder.append(path).toString(); } - public String originalPatternString() { - StringBuilder builder = new StringBuilder(scheme).append("://").append(host); - originalPort().ifPresent(port -> builder.append(':').append(port)); - return 
builder.append(path).toString(); - } - /** Compares the underlying pattern string for equality */ public boolean hasSamePattern(BindingPattern other) { return this.patternString().equals(other.patternString()); } /** Returns true if pattern will match any port (if present) in uri **/ - public boolean matchesAnyPort() { return originalPort().filter(p -> !p.equals(WILDCARD_PATTERN)).isEmpty(); } - - public Optional<String> originalPort() { - return port(); - } + public boolean matchesAnyPort() { return port().filter(p -> !p.equals(WILDCARD_PATTERN)).isEmpty(); } @Override public boolean equals(Object o) { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java b/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java index e27dfe69f00..182eca835c1 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/component/UserBindingPattern.java @@ -1,9 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.model.container.component; -import java.util.Objects; -import java.util.Optional; - /** * A {@link BindingPattern} which is constructed directly from a user provided 'binding' element from services.xml. 
* @@ -11,30 +8,12 @@ import java.util.Optional; */ public class UserBindingPattern extends BindingPattern { - private final Optional<String> originalPort; - - private UserBindingPattern(String scheme, String host, String port, String path) { - super(scheme, host, port, path); - this.originalPort = null; - } - private UserBindingPattern(String scheme, String host, String port, Optional<String> originalPort, String path) { - super(scheme, host, port, path); - this.originalPort = originalPort; - } - private UserBindingPattern(String binding) { - super(binding); - this.originalPort = null; - } + private UserBindingPattern(String scheme, String host, String port, String path) { super(scheme, host, port, path); } + private UserBindingPattern(String binding) { super(binding); } public static UserBindingPattern fromHttpPath(String path) { return new UserBindingPattern("http", "*", null, path); } public static UserBindingPattern fromPattern(String binding) { return new UserBindingPattern(binding); } - public UserBindingPattern withOverriddenPort(int port) { return new UserBindingPattern(scheme(), host(), Integer.toString(port), port(), path()); } - - public Optional<String> originalPort() { - return Objects.isNull(originalPort) - ? 
port() - : originalPort; - } + public UserBindingPattern withPort(int port) { return new UserBindingPattern(scheme(), host(), Integer.toString(port), path()); } @Override public String toString() { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java index 80b676159cb..31f8eba48bf 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java @@ -1114,7 +1114,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { private void addSearchHandler(DeployState deployState, ApplicationContainerCluster cluster, Element searchElement, ConfigModelContext context) { var bindingPatterns = List.<BindingPattern>of(SearchHandler.DEFAULT_BINDING); - if (isHostedTenantApplication(context)) { + if (isHostedTenantApplication(context) && deployState.featureFlags().useRestrictedDataPlaneBindings()) { bindingPatterns = SearchHandler.bindingPattern(getDataplanePorts(deployState)); } SearchHandler searchHandler = new SearchHandler(cluster, @@ -1136,7 +1136,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { private List<BindingPattern> toBindingList(DeployState deployState, ConfigModelContext context, List<Element> bindingElements) { List<BindingPattern> result = new ArrayList<>(); - var portOverride = isHostedTenantApplication(context) ? getDataplanePorts(deployState) : Set.<Integer>of(); + var portOverride = isHostedTenantApplication(context) && deployState.featureFlags().useRestrictedDataPlaneBindings() ? 
getDataplanePorts(deployState) : Set.<Integer>of(); for (Element element: bindingElements) { String text = element.getTextContent().trim(); if (!text.isEmpty()) @@ -1149,7 +1149,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { UserBindingPattern bindingPattern = UserBindingPattern.fromPattern(path); if (portBindingOverride.isEmpty()) return Set.of(bindingPattern); return portBindingOverride.stream() - .map(bindingPattern::withOverriddenPort) + .map(bindingPattern::withPort) .toList(); } @@ -1160,7 +1160,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { ContainerDocumentApi.HandlerOptions documentApiOptions = DocumentApiOptionsBuilder.build(documentApiElement); Element ignoreUndefinedFields = XML.getChild(documentApiElement, "ignore-undefined-fields"); - var portBindingOverride = isHostedTenantApplication(context) + var portBindingOverride = deployState.featureFlags().useRestrictedDataPlaneBindings() && isHostedTenantApplication(context) ? 
getDataplanePorts(deployState) : Set.<Integer>of(); return new ContainerDocumentApi(cluster, documentApiOptions, diff --git a/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java b/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java index a56b268eeab..ff9596f2062 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/application/validation/UriBindingsValidatorTest.java @@ -57,6 +57,12 @@ public class UriBindingsValidatorTest { runUriBindingValidator(true, createServicesXmlWithHandler("http://*/my-handler")); } + @Test + void allows_portbinding_when_restricting_data_plane() throws IOException, SAXException { + runUriBindingValidator(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(true), createServicesXmlWithHandler("http://*:4443/my-handler")); + } + + @Test void allows_user_binding_with_wildcard_port() throws IOException, SAXException { runUriBindingValidator(true, createServicesXmlWithHandler("http://*:*/my-handler")); } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java index fac07c6c6e6..6d61610a84f 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/container/xml/HandlerBuilderTest.java @@ -1,6 +1,5 @@ package com.yahoo.vespa.model.container.xml; -import com.yahoo.config.model.ConfigModelContext; import com.yahoo.config.model.builder.xml.test.DomBuilderTest; import com.yahoo.config.model.deploy.DeployState; import com.yahoo.config.model.deploy.TestProperties; @@ -111,15 +110,36 @@ public class HandlerBuilderTest extends ContainerModelBuilderTestBase { @Test void 
restricts_default_bindings_in_hosted_vespa() { DeployState deployState = new DeployState.Builder() - .properties(new TestProperties().setHostedVespa(true)) + .properties(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(true)) .build(); verifyDefaultBindings(deployState, "http://*:4443"); } @Test + void does_not_restrict_default_bindings_in_hosted_vespa_when_disabled() { + DeployState deployState = new DeployState.Builder() + .properties(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(false)) + .build(); + verifyDefaultBindings(deployState, "http://*"); + } + + @Test + void does_not_restrict_infrastructure() { + DeployState deployState = new DeployState.Builder() + + .properties( + new TestProperties() + .setApplicationId(ApplicationId.defaultId()) + .setHostedVespa(true) + .setUseRestrictedDataPlaneBindings(false)) + .build(); + verifyDefaultBindings(deployState, "http://*"); + } + + @Test void restricts_custom_bindings_in_hosted_vespa() { DeployState deployState = new DeployState.Builder() - .properties(new TestProperties().setHostedVespa(true)) + .properties(new TestProperties().setHostedVespa(true).setUseRestrictedDataPlaneBindings(true)) .build(); verifyCustomSearchBindings(deployState, "http://*:4443"); } @@ -127,7 +147,7 @@ public class HandlerBuilderTest extends ContainerModelBuilderTestBase { @Test void does_not_restrict_default_bindings_in_self_hosted() { DeployState deployState = new DeployState.Builder() - .properties(new TestProperties().setHostedVespa(false)) + .properties(new TestProperties().setHostedVespa(false).setUseRestrictedDataPlaneBindings(false)) .build(); verifyDefaultBindings(deployState, "http://*"); } @@ -135,15 +155,12 @@ public class HandlerBuilderTest extends ContainerModelBuilderTestBase { @Test void does_not_restrict_custom_bindings_in_self_hosted() { DeployState deployState = new DeployState.Builder() - .properties(new TestProperties().setHostedVespa(false)) + .properties(new 
TestProperties().setHostedVespa(false).setUseRestrictedDataPlaneBindings(false)) .build(); verifyCustomSearchBindings(deployState, "http://*"); } private void verifyDefaultBindings(DeployState deployState, String bindingPrefix) { - verifyDefaultBindings(deployState, bindingPrefix, ConfigModelContext.ApplicationType.DEFAULT); - } - private void verifyDefaultBindings(DeployState deployState, String bindingPrefix, ConfigModelContext.ApplicationType applicationType) { Element clusterElem = DomBuilderTest.parse( "<container id='default' version='1.0'>", " <search/>", diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 1ebfef77a51..d815ea3328a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -195,6 +195,7 @@ public class ModelContextImpl implements ModelContext { private final int mbus_cpp_events_before_wakeup; private final int rpc_num_targets; private final int rpc_events_before_wakeup; + private final boolean useRestrictedDataPlaneBindings; private final int heapPercentage; private final boolean enableGlobalPhase; private final String summaryDecodePolicy; @@ -238,6 +239,7 @@ public class ModelContextImpl implements ModelContext { this.rpc_events_before_wakeup = flagValue(source, appId, version, Flags.RPC_EVENTS_BEFORE_WAKEUP); this.queryDispatchPolicy = flagValue(source, appId, version, Flags.QUERY_DISPATCH_POLICY); this.queryDispatchWarmup = flagValue(source, appId, version, PermanentFlags.QUERY_DISPATCH_WARMUP); + this.useRestrictedDataPlaneBindings = flagValue(source, appId, version, Flags.RESTRICT_DATA_PLANE_BINDINGS); this.heapPercentage = flagValue(source, appId, version, PermanentFlags.HEAP_SIZE_PERCENTAGE); this.enableGlobalPhase = flagValue(source, appId, version, 
Flags.ENABLE_GLOBAL_PHASE); this.summaryDecodePolicy = flagValue(source, appId, version, Flags.SUMMARY_DECODE_POLICY); @@ -291,6 +293,7 @@ public class ModelContextImpl implements ModelContext { } return defVal; } + @Override public boolean useRestrictedDataPlaneBindings() { return useRestrictedDataPlaneBindings; } @Override public boolean enableGlobalPhase() { return enableGlobalPhase; } @Override public boolean allowMoreThanOneContentGroupDown(ClusterSpec.Id id) { return allowMoreThanOneContentGroupDown.test(id); } @Override public boolean enableDataplaneProxy() { return enableDataplaneProxy; } diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java index a26e7cce29a..0b0664cd3bf 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/configserver/Node.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.controller.api.integration.configserver; import com.yahoo.component.Version; import com.yahoo.config.provision.ApplicationId; +import com.yahoo.config.provision.CloudAccount; import com.yahoo.config.provision.DockerImage; import com.yahoo.config.provision.HostName; import com.yahoo.config.provision.NodeResources; @@ -75,6 +76,7 @@ public class Node { private final Optional<String> switchHostname; private final Optional<String> modelName; private final Environment environment; + private final CloudAccount cloudAccount; private Node(String id, HostName hostname, Optional<HostName> parentHostname, State state, NodeType type, NodeResources resources, Optional<ApplicationId> owner, Version currentVersion, Version wantedVersion, @@ -87,7 +89,7 @@ public class Node { DockerImage wantedDockerImage, DockerImage currentDockerImage, Optional<ClusterType> 
exclusiveToClusterType, Map<String, String> reports, List<Event> history, Set<String> ipAddresses, Set<String> additionalIpAddresses, Set<String> additionalHostnames, Optional<String> switchHostname, - Optional<String> modelName, Environment environment) { + Optional<String> modelName, Environment environment, CloudAccount cloudAccount) { this.id = Objects.requireNonNull(id, "id must be non-null"); this.hostname = Objects.requireNonNull(hostname, "hostname must be non-null"); this.parentHostname = Objects.requireNonNull(parentHostname, "parentHostname must be non-null"); @@ -133,6 +135,7 @@ public class Node { this.switchHostname = Objects.requireNonNull(switchHostname, "switchHostname must be non-null"); this.modelName = Objects.requireNonNull(modelName, "modelName must be non-null"); this.environment = Objects.requireNonNull(environment, "environment must be non-ull"); + this.cloudAccount = Objects.requireNonNull(cloudAccount, "cloudAccount must be non-null"); } /** The cloud provider's unique ID for this */ @@ -344,6 +347,10 @@ public class Node { return environment; } + public CloudAccount cloudAccount() { + return cloudAccount; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -501,6 +508,7 @@ public class Node { private Optional<String> switchHostname = Optional.empty(); private Optional<String> modelName = Optional.empty(); private Environment environment = Environment.unknown; + private CloudAccount cloudAccount = CloudAccount.empty; private Builder() {} @@ -785,6 +793,11 @@ public class Node { return this; } + public Builder cloudAccount(CloudAccount cloudAccount) { + this.cloudAccount = cloudAccount; + return this; + } + public Node build() { return new Node(id, hostname, parentHostname, state, type, resources, owner, currentVersion, wantedVersion, currentOsVersion, wantedOsVersion, deferOsUpgrade, currentFirmwareCheck, wantedFirmwareCheck, serviceState, @@ -792,7 +805,7 @@ public class Node { wantedRebootGeneration, cost, 
failCount, flavor, clusterId, clusterType, group, index, retired, wantToRetire, wantToDeprovision, wantToRebuild, down, reservedTo, exclusiveTo, wantedDockerImage, currentDockerImage, exclusiveToClusterType, reports, history, ipAddresses, additionalIpAddresses, - additionalHostnames, switchHostname, modelName, environment); + additionalHostnames, switchHostname, modelName, environment, cloudAccount); } } diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index d5eadf45b08..c48a4243fde 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -274,6 +274,13 @@ public class Flags { APPLICATION_ID,HOSTNAME,NODE_TYPE,TENANT_ID,VESPA_VERSION ); + public static final UnboundBooleanFlag RESTRICT_DATA_PLANE_BINDINGS = defineFeatureFlag( + "restrict-data-plane-bindings", false, + List.of("mortent"), "2022-09-08", "2023-09-01", + "Use restricted data plane bindings", + "Takes effect at redeployment", + APPLICATION_ID); + public static final UnboundBooleanFlag ENABLE_OTELCOL = defineFeatureFlag( "enable-otel-collector", false, List.of("olaa"), "2022-09-23", "2023-09-01", @@ -368,6 +375,13 @@ public class Flags { "Use the vespa user for running Vespa everywhere", "Takes effect immediately"); + public static final UnboundBooleanFlag MORE_WIREGUARD = defineFeatureFlag( + "more-wireguard", false, + List.of("andreer"), "2023-08-21", "2023-09-21", + "Use wireguard in INternal enCLAVES", + "Takes effect on next host-admin run", + HOSTNAME); + /** WARNING: public for testing: All flags should be defined in {@link Flags}. 
*/ public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners, String createdAt, String expiresAt, String description, diff --git a/metrics/pom.xml b/metrics/pom.xml index e8303e5a01f..a2fd17b7ea5 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -12,8 +12,14 @@ <packaging>jar</packaging> <version>8-SNAPSHOT</version> <name>metrics</name> + <dependencies> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.yahoo.vespa</groupId> <artifactId>annotations</artifactId> <version>${project.version}</version> @@ -25,7 +31,18 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> </dependencies> + <build> <plugins> <plugin> diff --git a/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java b/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java index 3a17d8a3155..9a498abc911 100644 --- a/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java +++ b/metrics/src/main/java/ai/vespa/metrics/VespaMetrics.java @@ -17,6 +17,8 @@ public interface VespaMetrics { return baseName() + "." + suffix.suffix(); } + // TODO: make the below methods return Metric objects instead of Strings. 
+ default String ninety_five_percentile() { return withSuffix(Suffix.ninety_five_percentile); } diff --git a/metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java b/metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java new file mode 100644 index 00000000000..f167e654e6f --- /dev/null +++ b/metrics/src/main/java/ai/vespa/metrics/set/BasicMetricSets.java @@ -0,0 +1,23 @@ +package ai.vespa.metrics.set; + +import ai.vespa.metrics.ContainerMetrics; + +/** + * Defines metric sets that are meant to be used as building blocks for other metric sets. + * + * @author gjoranv + */ +public class BasicMetricSets { + + static MetricSet containerHttpStatusMetrics() { + return new MetricSet.Builder("basic-container-http-status") + .metric(ContainerMetrics.HTTP_STATUS_1XX.rate()) + + .metric(ContainerMetrics.HTTP_STATUS_2XX.rate()) + .metric(ContainerMetrics.HTTP_STATUS_3XX.rate()) + .metric(ContainerMetrics.HTTP_STATUS_4XX.rate()) + .metric(ContainerMetrics.HTTP_STATUS_5XX.rate()) + .build(); + } + +} diff --git a/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java b/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java index 515b06de2d8..6ef23b790cd 100644 --- a/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java +++ b/metrics/src/main/java/ai/vespa/metrics/set/DefaultMetrics.java @@ -2,20 +2,16 @@ package ai.vespa.metrics.set; +import ai.vespa.metrics.ClusterControllerMetrics; import ai.vespa.metrics.ContainerMetrics; -import ai.vespa.metrics.SearchNodeMetrics; -import ai.vespa.metrics.StorageMetrics; import ai.vespa.metrics.DistributorMetrics; -import ai.vespa.metrics.ClusterControllerMetrics; -import ai.vespa.metrics.SentinelMetrics; import ai.vespa.metrics.NodeAdminMetrics; -import ai.vespa.metrics.Suffix; -import ai.vespa.metrics.VespaMetrics; +import ai.vespa.metrics.SearchNodeMetrics; +import ai.vespa.metrics.SentinelMetrics; +import ai.vespa.metrics.StorageMetrics; -import java.util.Collections; import java.util.EnumSet; 
-import java.util.LinkedHashSet; -import java.util.Set; +import java.util.List; import static ai.vespa.metrics.Suffix.average; import static ai.vespa.metrics.Suffix.count; @@ -41,135 +37,137 @@ public class DefaultMetrics { private static MetricSet createMetricSet() { return new MetricSet(defaultMetricSetId, - getAllMetrics(), - Set.of(defaultVespaMetricSet)); + List.of(), + List.of(defaultVespaMetricSet, + BasicMetricSets.containerHttpStatusMetrics(), + getContainerMetrics(), + getSearchChainMetrics(), + getDocprocMetrics(), + getSearchNodeMetrics(), + getContentMetrics(), + getStorageMetrics(), + getDistributorMetrics(), + getClusterControllerMetrics(), + getSentinelMetrics(), + getOtherMetrics())); } - private static Set<Metric> getAllMetrics() { - Set<Metric> metrics = new LinkedHashSet<>(); - - addContainerMetrics(metrics); - addSearchChainMetrics(metrics); - addDocprocMetrics(metrics); - addSearchNodeMetrics(metrics); - addContentMetrics(metrics); - addStorageMetrics(metrics); - addDistributorMetrics(metrics); - addClusterControllerMetrics(metrics); - addSentinelMetrics(metrics); - addOtherMetrics(metrics); - return Collections.unmodifiableSet(metrics); + private static MetricSet getContainerMetrics() { + return new MetricSet.Builder("default-container") + .metric(ContainerMetrics.JDISC_GC_MS, EnumSet.of(max, average)) + .metric(ContainerMetrics.MEM_HEAP_FREE.average()) + .metric(ContainerMetrics.FEED_LATENCY, EnumSet.of(sum, count)) + // .metric(ContainerMetrics.CPU.baseName()) // TODO: Add to container metrics + .metric(ContainerMetrics.JDISC_THREAD_POOL_SIZE.max()) + .metric(ContainerMetrics.JDISC_THREAD_POOL_ACTIVE_THREADS, EnumSet.of(sum, count, min, max)) + .metric(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_CAPACITY.max()) + .metric(ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_SIZE, EnumSet.of(sum, count, min, max)) + .metric(ContainerMetrics.SERVER_ACTIVE_THREADS.average()) + + // Metrics needed for alerting + 
.metric(ContainerMetrics.JDISC_SINGLETON_IS_ACTIVE, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_MISSING_CLIENT_CERT.rate()) + .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_PROTOCOLS.rate()) + .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_CHIFERS.rate()) + .metric(ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_UNKNOWN.rate()) + .metric(ContainerMetrics.JDISC_APPLICATION_FAILED_COMPONENT_GRAPHS.rate()) + .metric(ContainerMetrics.ATHENZ_TENANT_CERT_EXPIRY_SECONDS, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .build(); } - private static void addContainerMetrics(Set<Metric> metrics) { - addMetric(metrics, ContainerMetrics.HTTP_STATUS_1XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_2XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_3XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_4XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_5XX.rate()); - addMetric(metrics, ContainerMetrics.JDISC_GC_MS, EnumSet.of(max, average)); - addMetric(metrics, ContainerMetrics.MEM_HEAP_FREE.average()); - addMetric(metrics, ContainerMetrics.FEED_LATENCY, EnumSet.of(sum, count)); - // addMetric(metrics, ContainerMetrics.CPU.baseName()); // TODO: Add to container metrics - addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_SIZE.max()); - addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_ACTIVE_THREADS, EnumSet.of(sum, count, min, max)); - addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_CAPACITY.max()); - addMetric(metrics, ContainerMetrics.JDISC_THREAD_POOL_WORK_QUEUE_SIZE, EnumSet.of(sum, count, min, max)); - addMetric(metrics, ContainerMetrics.SERVER_ACTIVE_THREADS.average()); - - // Metrics needed for alerting - addMetric(metrics, ContainerMetrics.JDISC_SINGLETON_IS_ACTIVE, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last - addMetric(metrics, 
ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_MISSING_CLIENT_CERT.rate()); - addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_PROTOCOLS.rate()); - addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_INCOMPATIBLE_CHIFERS.rate()); - addMetric(metrics, ContainerMetrics.JDISC_HTTP_SSL_HANDSHAKE_FAILURE_UNKNOWN.rate()); - addMetric(metrics, ContainerMetrics.JDISC_APPLICATION_FAILED_COMPONENT_GRAPHS.rate()); - addMetric(metrics, ContainerMetrics.ATHENZ_TENANT_CERT_EXPIRY_SECONDS, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last - } - - private static void addSearchChainMetrics(Set<Metric> metrics) { - addMetric(metrics, ContainerMetrics.QUERIES.rate()); - addMetric(metrics, ContainerMetrics.QUERY_LATENCY, EnumSet.of(sum, count, max, ninety_five_percentile, ninety_nine_percentile, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, ContainerMetrics.HITS_PER_QUERY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, ContainerMetrics.TOTAL_HITS_PER_QUERY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, ContainerMetrics.DEGRADED_QUERIES.rate()); - addMetric(metrics, ContainerMetrics.FAILED_QUERIES.rate()); + private static MetricSet getSearchChainMetrics() { + return new MetricSet.Builder("default-search-chain") + .metric(ContainerMetrics.QUERIES.rate()) + .metric(ContainerMetrics.QUERY_LATENCY, EnumSet.of(sum, count, max, ninety_five_percentile, ninety_nine_percentile, average)) // TODO: Remove average with Vespa 9 + .metric(ContainerMetrics.HITS_PER_QUERY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + .metric(ContainerMetrics.TOTAL_HITS_PER_QUERY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + .metric(ContainerMetrics.DEGRADED_QUERIES.rate()) + .metric(ContainerMetrics.FAILED_QUERIES.rate()) + .build(); } - private static void 
addDocprocMetrics(Set<Metric> metrics) { - addMetric(metrics, ContainerMetrics.DOCPROC_DOCUMENTS.sum()); + private static MetricSet getDocprocMetrics() { + return new MetricSet.Builder("default-docproc") + .metric(ContainerMetrics.DOCPROC_DOCUMENTS.sum()) + .build(); } - private static void addSearchNodeMetrics(Set<Metric> metrics) { + private static MetricSet getSearchNodeMetrics() { // Metrics needed for alerting - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last + return new MetricSet.Builder("default-search-node") + .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average()) + .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average()) + .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .build(); } - private static void addContentMetrics(Set<Metric> metrics) { - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_REQUESTED_DOCUMENTS.rate()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_LATENCY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_QUERY_LATENCY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_TOTAL, EnumSet.of(max,last)); // TODO: Vespa 9: Remove last - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_READY, EnumSet.of(max,last)); // TODO: Vespa 9: Remove last - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_ACTIVE, EnumSet.of(max,last)); // TODO: Vespa 9: Remove last - addMetric(metrics, 
SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DISK_USAGE.last()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MEMORY_USAGE_ALLOCATED_BYTES.last()); - - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average()); - - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_MATCHED.rate()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_RERANKED.rate()); - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_SETUP_TIME, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_LATENCY, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_RERANK_TIME, EnumSet.of(sum, count, max, average)); // TODO: Remove average with Vespa 9 - addMetric(metrics, SearchNodeMetrics.CONTENT_PROTON_TRANSACTIONLOG_DISK_USAGE.last()); + private static MetricSet getContentMetrics() { + return new MetricSet.Builder("default-content") + .metric(SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_REQUESTED_DOCUMENTS.rate()) + .metric(SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_DOCSUM_LATENCY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + .metric(SearchNodeMetrics.CONTENT_PROTON_SEARCH_PROTOCOL_QUERY_LATENCY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_TOTAL, EnumSet.of(max,last)) // TODO: Vespa 9: Remove last + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_READY, EnumSet.of(max,last)) // TODO: Vespa 9: Remove last + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DOCUMENTS_ACTIVE, EnumSet.of(max,last)) // 
TODO: Vespa 9: Remove last + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_DISK_USAGE.last()) + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MEMORY_USAGE_ALLOCATED_BYTES.last()) + + .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_DISK.average()) + .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_MEMORY.average()) + + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_MATCHED.rate()) + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_DOCS_RERANKED.rate()) + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_SETUP_TIME, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_QUERY_LATENCY, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + .metric(SearchNodeMetrics.CONTENT_PROTON_DOCUMENTDB_MATCHING_RANK_PROFILE_RERANK_TIME, EnumSet.of(sum, count, max, average)) // TODO: Remove average with Vespa 9 + .metric(SearchNodeMetrics.CONTENT_PROTON_TRANSACTIONLOG_DISK_USAGE.last()) + .build(); } - private static void addStorageMetrics(Set<Metric> metrics) { - addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate()); - addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate()); - addMetric(metrics, StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate()); + private static MetricSet getStorageMetrics() { + return new MetricSet.Builder("default-storage") + .metric(StorageMetrics.VDS_FILESTOR_ALLTHREADS_PUT_COUNT.rate()) + .metric(StorageMetrics.VDS_FILESTOR_ALLTHREADS_UPDATE_COUNT.rate()) + .metric(StorageMetrics.VDS_FILESTOR_ALLTHREADS_REMOVE_COUNT.rate()) + .build(); } - private static void addDistributorMetrics(Set<Metric> metrics) { - addMetric(metrics, DistributorMetrics.VDS_DISTRIBUTOR_DOCSSTORED.average()); + private static MetricSet getDistributorMetrics() { + return new MetricSet.Builder("default-distributor") + 
.metric(DistributorMetrics.VDS_DISTRIBUTOR_DOCSSTORED.average()) - // Metrics needed for alerting - addMetric(metrics, DistributorMetrics.VDS_BOUNCER_CLOCK_SKEW_ABORTS.count()); + // Metrics needed for alerting + .metric(DistributorMetrics.VDS_BOUNCER_CLOCK_SKEW_ABORTS.count()) + .build(); } - private static void addClusterControllerMetrics(Set<Metric> metrics) { + private static MetricSet getClusterControllerMetrics() { // Metrics needed for alerting - addMetric(metrics, ClusterControllerMetrics.DOWN_COUNT, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last - addMetric(metrics, ClusterControllerMetrics.MAINTENANCE_COUNT, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last - addMetric(metrics, ClusterControllerMetrics.UP_COUNT.last()); - addMetric(metrics, ClusterControllerMetrics.IS_MASTER, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last - addMetric(metrics, ClusterControllerMetrics.RESOURCE_USAGE_NODES_ABOVE_LIMIT, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last - addMetric(metrics, ClusterControllerMetrics.RESOURCE_USAGE_MAX_MEMORY_UTILIZATION, EnumSet.of(last, max)); // TODO: Vespa 9: Remove last - addMetric(metrics, ClusterControllerMetrics.RESOURCE_USAGE_MAX_DISK_UTILIZATION, EnumSet.of(last, max)); // TODO: Vespa 9: Remove last + return new MetricSet.Builder("default-cluster-controller") + .metric(ClusterControllerMetrics.DOWN_COUNT, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .metric(ClusterControllerMetrics.MAINTENANCE_COUNT, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .metric(ClusterControllerMetrics.UP_COUNT.last()) + .metric(ClusterControllerMetrics.IS_MASTER, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .metric(ClusterControllerMetrics.RESOURCE_USAGE_NODES_ABOVE_LIMIT, EnumSet.of(max, last)) // TODO: Vespa 9: Remove last + .metric(ClusterControllerMetrics.RESOURCE_USAGE_MAX_MEMORY_UTILIZATION, EnumSet.of(last, max)) // TODO: Vespa 9: Remove last + 
.metric(ClusterControllerMetrics.RESOURCE_USAGE_MAX_DISK_UTILIZATION, EnumSet.of(last, max)) // TODO: Vespa 9: Remove last + .build(); } - private static void addSentinelMetrics(Set<Metric> metrics) { + private static MetricSet getSentinelMetrics() { // Metrics needed for alerting - addMetric(metrics, SentinelMetrics.SENTINEL_TOTAL_RESTARTS, EnumSet.of(max, sum, last)); // TODO: Vespa 9: Remove last, sum? + return new MetricSet.Builder("default-sentinel") + .metric(SentinelMetrics.SENTINEL_TOTAL_RESTARTS, EnumSet.of(max, sum, last)) // TODO: Vespa 9: Remove last, sum? + .build(); } - private static void addOtherMetrics(Set<Metric> metrics) { + private static MetricSet getOtherMetrics() { // Metrics needed for alerting - addMetric(metrics, NodeAdminMetrics.ENDPOINT_CERTIFICATE_EXPIRY_SECONDS.baseName()); - addMetric(metrics, NodeAdminMetrics.NODE_CERTIFICATE_EXPIRY_SECONDS.baseName()); - } - - private static void addMetric(Set<Metric> metrics, String nameWithSuffix) { - metrics.add(new Metric(nameWithSuffix)); - } - - private static void addMetric(Set<Metric> metrics, VespaMetrics metric, EnumSet<Suffix> suffixes) { - suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." 
+ suffix.suffix()))); + return new MetricSet.Builder("default-other") + .metric(NodeAdminMetrics.ENDPOINT_CERTIFICATE_EXPIRY_SECONDS.baseName()) + .metric(NodeAdminMetrics.NODE_CERTIFICATE_EXPIRY_SECONDS.baseName()) + .build(); } private DefaultMetrics() { } diff --git a/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java b/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java index 93b6bfab002..e34c8ee68eb 100644 --- a/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java +++ b/metrics/src/main/java/ai/vespa/metrics/set/DefaultVespaMetrics.java @@ -4,9 +4,6 @@ package ai.vespa.metrics.set; import ai.vespa.metrics.ContainerMetrics; import ai.vespa.metrics.SearchNodeMetrics; -import java.util.LinkedHashSet; -import java.util.Set; - /** * Encapsulates a minimal set of Vespa metrics to be used as default for all metrics consumers. * @@ -19,11 +16,11 @@ public class DefaultVespaMetrics { public static final MetricSet defaultVespaMetricSet = createDefaultVespaMetricSet(); private static MetricSet createDefaultVespaMetricSet() { - Set<Metric> metrics = new LinkedHashSet<>(); - - metrics.add(new Metric(ContainerMetrics.FEED_OPERATIONS.rate())); - metrics.add(new Metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED.last())); - return new MetricSet("default-vespa", metrics); + return new MetricSet.Builder("default-vespa") + .metric(ContainerMetrics.FEED_OPERATIONS.rate()) + .metric(SearchNodeMetrics.CONTENT_PROTON_RESOURCE_USAGE_FEEDING_BLOCKED.last()) + .build(); } + } diff --git a/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java b/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java index b8409fb7663..f334690a7ca 100644 --- a/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java +++ b/metrics/src/main/java/ai/vespa/metrics/set/MetricSet.java @@ -1,7 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package ai.vespa.metrics.set; +import ai.vespa.metrics.Suffix; +import ai.vespa.metrics.VespaMetrics; + import java.util.Collection; +import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; @@ -88,4 +92,44 @@ public class MetricSet { return metricMap; } + + public static class Builder { + private final String id; + private final Set<Metric> metrics = new LinkedHashSet<>(); + private final Set<MetricSet> children = new LinkedHashSet<>(); + + public Builder(String id) { + this.id = id; + } + + public Builder metric(String metric) { + return metric(new Metric(metric)); + } + + /** Adds all given suffixes of the given metric to this set. */ + public Builder metric(VespaMetrics metric, EnumSet<Suffix> suffixes) { + suffixes.forEach(suffix -> metrics.add(new Metric(metric.baseName() + "." + suffix.suffix()))); + return this; + } + + public Builder metric(Metric metric) { + metrics.add(metric); + return this; + } + + public Builder metrics(Collection<Metric> metrics) { + this.metrics.addAll(metrics); + return this; + } + + public Builder metricSet(MetricSet child) { + children.add(child); + return this; + } + + public MetricSet build() { + return new MetricSet(id, metrics, children); + } + } + } diff --git a/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java b/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java index bc8567b8bf5..18c5a637eb9 100644 --- a/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java +++ b/metrics/src/main/java/ai/vespa/metrics/set/VespaMetricSet.java @@ -17,6 +17,7 @@ import ai.vespa.metrics.VespaMetrics; import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import static ai.vespa.metrics.Suffix.average; @@ -29,7 +30,6 @@ import static ai.vespa.metrics.Suffix.ninety_nine_percentile; import static ai.vespa.metrics.Suffix.rate; import static ai.vespa.metrics.Suffix.sum; import static 
ai.vespa.metrics.set.DefaultVespaMetrics.defaultVespaMetricSet; -import static java.util.Collections.singleton; /** * Encapsulates vespa service metrics. @@ -38,9 +38,14 @@ import static java.util.Collections.singleton; */ public class VespaMetricSet { - public static final MetricSet vespaMetricSet = new MetricSet("vespa", - getVespaMetrics(), - singleton(defaultVespaMetricSet)); + public static final MetricSet vespaMetricSet = createMetricSet(); + + private static MetricSet createMetricSet() { + return new MetricSet("vespa", + getVespaMetrics(), + List.of(defaultVespaMetricSet, + BasicMetricSets.containerHttpStatusMetrics())); + } private static Set<Metric> getVespaMetrics() { Set<Metric> metrics = new LinkedHashSet<>(); @@ -187,12 +192,6 @@ public class VespaMetricSet { addMetric(metrics, ContainerMetrics.ATHENZ_TENANT_CERT_EXPIRY_SECONDS, EnumSet.of(max, last)); // TODO: Vespa 9: Remove last addMetric(metrics, ContainerMetrics.CONTAINER_IAM_ROLE_EXPIRY_SECONDS.baseName()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_1XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_2XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_3XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_4XX.rate()); - addMetric(metrics, ContainerMetrics.HTTP_STATUS_5XX.rate()); - addMetric(metrics, ContainerMetrics.JDISC_HTTP_REQUEST_PREMATURELY_CLOSED.rate()); addMetric(metrics, ContainerMetrics.JDISC_HTTP_REQUEST_REQUESTS_PER_CONNECTION, EnumSet.of(sum, count, min, max, average)); addMetric(metrics, ContainerMetrics.JDISC_HTTP_REQUEST_URI_LENGTH, EnumSet.of(sum, count, max)); diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricSetTest.java b/metrics/src/test/java/ai/vespa/metrics/MetricSetTest.java index 8235f45aaec..788e9e9836c 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricSetTest.java +++ b/metrics/src/test/java/ai/vespa/metrics/MetricSetTest.java @@ -1,11 +1,12 @@ // Copyright 
Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.model.admin.monitoring; +package ai.vespa.metrics; import ai.vespa.metrics.set.Metric; import ai.vespa.metrics.set.MetricSet; import com.google.common.collect.Sets; import org.junit.jupiter.api.Test; +import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -60,4 +61,54 @@ public class MetricSetTest { assertEquals(3, combinedMetric.dimensions.size()); assertEquals("parentCommonVal", combinedMetric.dimensions.get(COMMON_DIMENSION_KEY)); } + + @Test + void it_can_be_generated_from_builder() { + MetricSet metricSet = new MetricSet.Builder("test") + .metric("metric1") + .metric(TestMetrics.ENUM_METRIC1.last()) + .metric(TestMetrics.ENUM_METRIC2, EnumSet.of(Suffix.sum, Suffix.count)) + .metric(new Metric("metric2")) + .metrics(List.of(new Metric("metric3"))) + .metricSet(new MetricSet.Builder("child") + .metric("child_metric1") + .metric("child_metric2") + .build()) + .build(); + + Map<String, Metric> metrics = metricSet.getMetrics(); + assertEquals(8, metrics.size()); + assertNotNull(metrics.get("metric1")); + assertNotNull(metrics.get("emum-metric1.last")); + assertNotNull(metrics.get("emum-metric2.sum")); + assertNotNull(metrics.get("emum-metric2.count")); + assertNotNull(metrics.get("metric2")); + assertNotNull(metrics.get("metric3")); + assertNotNull(metrics.get("child_metric1")); + assertNotNull(metrics.get("child_metric1")); + } + + enum TestMetrics implements VespaMetrics { + ENUM_METRIC1("emum-metric1"), + ENUM_METRIC2("emum-metric2"); + + private final String name; + + TestMetrics(String name) { + this.name = name; + } + + public String baseName() { + return name; + } + + public Unit unit() { + return null; + } + + public String description() { + return null; + } + + } } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricTest.java b/metrics/src/test/java/ai/vespa/metrics/MetricTest.java index 
f07b8c59322..7a0b85f82cc 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/admin/monitoring/MetricTest.java +++ b/metrics/src/test/java/ai/vespa/metrics/MetricTest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.model.admin.monitoring; +package ai.vespa.metrics; import ai.vespa.metrics.set.Metric; import com.google.common.collect.ImmutableMap; diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp index d3553ad003f..c4b5d9c2145 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp @@ -668,7 +668,7 @@ vespalib::string lsSingleFile(const vespalib::string & fileName) { fs::path path(fileName); - return make_string("%s %20" PRIu64 " %12" PRId64, fileName.c_str(), vespalib::count_ns(fs::last_write_time(path).time_since_epoch()), fs::file_size(path)); + return make_string("%s %20" PRIu64 " %12" PRIdMAX, fileName.c_str(), vespalib::count_ns(fs::last_write_time(path).time_since_epoch()), fs::file_size(path)); } } diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp index 757a9329ea6..617401dd271 100644 --- a/storage/src/tests/distributor/check_condition_test.cpp +++ b/storage/src/tests/distributor/check_condition_test.cpp @@ -5,6 +5,7 @@ #include <vespa/document/fieldset/fieldsets.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> #include <vespa/storage/distributor/node_supported_features.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storage/distributor/operations/external/check_condition.h> #include <vespa/storage/distributor/persistence_operation_metric_set.h> #include <vespa/storageapi/message/persistence.h> @@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, 
failed_gets_completes_check_with_error_outcome) { }); } +TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + cond.cancel(_sender, CancelScope::of_fully_cancelled()); + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED); + }); +} + +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); @@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_ }); } +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); @@ -255,6 +271,7 @@ TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start }); } +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 9b5056f2066..b1cf1cbc636 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { ASSERT_EQ(entry->getNodeCount(), info.size()); 
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); for (size_t i = 0; i < info.size(); ++i) { - EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo()) - << "Mismatching info for node " << i << ": " << info[i] << " vs " - << entry->getNode(i)->getBucketInfo(); + auto& node = entry->getNodeRef(i); + EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode(); } } @@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics } +TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged. Note that in a real scenario, the DB entry will have been removed + // as part of the ownership change, but there are already non-cancellation behaviors that + // avoid creating buckets from scratch in the DB if they do not exist, so just checking to + // see if the bucket exists or not risks hiding missing cancellation edge handling. 
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); + // However, we still update our metrics if we _did_ remove documents on one or more nodes + EXPECT_EQ(70u, gc_removed_documents_metric()); +} + +TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_node_subset({0})); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged for node 0, changed for node 1 + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34)); +} + +TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({1})); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged for both nodes + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); +} + TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { auto op = create_op(); op->start(_sender); @@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in receive_phase1_replies_and_assert_no_phase_2_started(); } +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) { + _op->cancel(_sender, CancelScope::of_fully_cancelled()); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) { + 
_op->cancel(_sender, CancelScope::of_node_subset({0})); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) { enable_two_phase_gc(); auto op = create_op(); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 76b6741442e..ee87fe84df6 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storage/distributor/operations/external/putoperation.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> @@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R _sender.getCommands(true, true, 4)); } +TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) { + setup_stripe(2, 2, "distributor:1 storage:2"); + + auto doc = createDummyDocument("test", "test"); + sendPut(createPut(doc)); + + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); + + op->cancel(_sender, CancelScope::of_fully_cancelled()); + sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1 + + // DB is not touched (note: normally node 1 would be removed at the cancel-edge). 
+ ASSERT_EQ("BucketId(0x4000000000008f09) : " + "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), " + "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)", + dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); + // No new requests sent + ASSERT_EQ("", _sender.getCommands(true, true, 4)); +} + +TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) { + setup_stripe(2, 2, "distributor:1 storage:2"); + + auto doc = createDummyDocument("test", "test"); + sendPut(createPut(doc)); + + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); + + op->cancel(_sender, CancelScope::of_node_subset({0})); + sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1 + sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0 + + // Bucket info recheck only sent to node 1, as it's not cancelled + ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). 
) => 1", + _sender.getCommands(true, true, 4)); +} + TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cfg = make_config(); @@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck _sender.getLastReply()); } +TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) { + setup_stripe(2, 2, "storage:2 distributor:1"); + createAndSendSampleDocument(TIMEOUT); + + ASSERT_EQ("Put(BucketId(0x4000000000001dd4), " + "id:test:testdoctype1::, timestamp 100, size 45) => 0," + "Put(BucketId(0x4000000000001dd4), " + "id:test:testdoctype1::, timestamp 100, size 45) => 1", + _sender.getCommands(true, true)); + + op->cancel(_sender, CancelScope::of_fully_cancelled()); + + sendReply(0); + sendReply(1); + + ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(NONE)", + _sender.getLastReply()); +} + TEST_F(PutOperationTest, storage_failed) { setup_stripe(2, 1, "storage:1 distributor:1"); @@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { { std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); - std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); ASSERT_TRUE(sreply); sreply->remapBucketId(document::BucketId(17, 13)); @@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { dumpBucket(document::BucketId(17, 13))); } +// TODO make this redundant through operation cancelling TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) { setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); @@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ 
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); } +// TODO make this redundant through operation cancelling TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) { setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); @@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending // TODO probably also do this for updates and removes // TODO consider if we should use the pending state verbatim for computing targets if it exists +// TODO make this redundant through operation cancelling +// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this?? TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) { setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "test"); @@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state _sender.getLastReply(true)); } +TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + auto doc = createDummyDocument("test", "uri"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId()); + addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); + + sendPut(createPut(doc)); + + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2}); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + + // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless + // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path. 
+ { + std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); + auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); + ASSERT_TRUE(sreply); + sreply->remapBucketId(remap_bucket); + sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5)); + op->receive(_sender, reply); + } + + sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7)); + sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9)); + + EXPECT_EQ("NONEXISTING", dumpBucket(bucket)); + EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket)); +} + +TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + auto doc = createDummyDocument("test", "uri"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); + + sendPut(createPut(doc)); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true)); + + // Simulate nodes 0 and 2 going down + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2}); + // Cancelling shall be cumulative + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({2})); + + sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7)); + sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8)); + sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7)); + + EXPECT_EQ("BucketId(0x4000000000000593) : " + "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)", + dumpBucket(bucket)); +} + TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) { setup_stripe(Redundancy(2), NodeCount(2), "distributor:1 storage:2 .0.s:r .1.s:r"); @@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) { _sender.getLastReply()); } +TEST_F(PutOperationTest, 
ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill + // the write phase for all replicas. + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. 
Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) { setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1"); auto doc = createDummyDocument("test", "test"); diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index d169c80a95d..3fad2c194a2 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { std::unique_ptr<api::StorageReply> reply(removec->makeReply()); auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get()); removeR->setOldTimestamp(oldTimestamp); + removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5)); callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release())); } @@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err _sender.getLastReply()); } +// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops +// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests. + +TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 50, false, true)); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + reply_with(make_get_reply(1, 50, false, true)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. 
Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + +TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT)); + ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, 0, 50); + replyToMessage(*op, 1, 50); + + EXPECT_EQ("BucketId(0x4000000000000593) : " + "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(bucketId)); + // Reply is still OK since the operation went through on the content nodes + ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)", + _sender.getLastReply()); + +} + TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) { ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index da32225cde3..1907335545a 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -4,16 +4,16 @@ #include <vespa/config/helper/configgetter.h> #include <vespa/document/base/testdocrepo.h> #include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/update/arithmeticvalueupdate.h> -#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_stripe.h> #include 
<vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> +#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storageapi/message/persistence.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> #include <gmock/gmock.h> namespace storage::distributor { @@ -30,8 +30,9 @@ using namespace ::testing; struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { document::TestDocRepo _testRepo; std::shared_ptr<const DocumentTypeRepo> _repo; - const DocumentType* _doc_type; + const DocumentType* _doc_type{nullptr}; DistributorMessageSenderStub _sender; + BucketId _bucket_id{0x400000000000cac4}; TwoPhaseUpdateOperationTest(); ~TwoPhaseUpdateOperationTest() override; @@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { void checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const; - std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); + static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); void SetUp() override { _repo = _testRepo.getTypeRepoSp(); @@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { close(); } - void replyToMessage(Operation& callback, - DistributorMessageSenderStub& sender, - uint32_t index, - uint64_t oldTimestamp, - api::ReturnCode::Result result = api::ReturnCode::OK); + static void replyToMessage( + Operation& callback, + DistributorMessageSenderStub& sender, + uint32_t index, + uint64_t oldTimestamp, + api::ReturnCode::Result result = api::ReturnCode::OK); - void replyToPut( + static void replyToPut( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void replyToCreateBucket( + static void replyToCreateBucket( Operation& callback, DistributorMessageSenderStub& sender, uint32_t 
index, @@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void reply_to_metadata_get( + static void reply_to_metadata_get( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& trace_msg = ""); - void reply_to_get_with_tombstone( + static void reply_to_get_with_tombstone( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { Timestamp highest_get_timestamp, Timestamp expected_response_timestamp); - std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { - setup_stripe(2, 2, "storage:2 distributor:1"); + void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas); + + void enable_3phase_updates(bool enable = true) { auto cfg = make_config(); - cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase); + cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable); configure_stripe(cfg); + } + + std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(enable_3phase); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. 
cb->start(_sender); return cb; @@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<PutCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); + dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5)); if (!traceMsg.empty()) { MBUS_TRACE(reply->getTrace(), 1, traceMsg); } - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, update->setCreateIfNonExistent(options._createIfNonExistent); document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId()); + assert(id == _bucket_id); document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId()); if (bucketState.length()) { @@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste _sender.getLastReply(true)); } +TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=1/2/3"); + op->start(_sender); + + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); + + 
replyToMessage(*op, _sender, 0, 110); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, _sender, 1, 110); + + // Client operation itself should return success since the update went through on all replica nodes + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 110) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + void TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const @@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) { EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1); } +void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); + op->start(_sender); + + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*op, _sender, 0, 70); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO custom cancellation failure metric? 
+} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true); +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false); +} + TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { setup_stripe(2, 2, "storage:2 distributor:1"); // Create update for wrong doctype which will fail the update. @@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) { EXPECT_EQ(1, m.ok.getValue()); } +TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 2, 2000U); + ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO cancellation metrics? 
+} + +TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + replyToGet(*op, _sender, 2, 2000U); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0}); + op->cancel(_sender, CancelScope::of_node_subset({0})); + + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3)); + replyToPut(*op, _sender, 3); + replyToPut(*op, _sender, 4); + + // Update itself is ACKed + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 2000) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + // But cancelled replicas are not reintroduced into the bucket DB + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + + TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) { auto cb = set_up_2_inconsistent_replicas_and_start_update(); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more. cb->start(_sender); // Add new replica to deterministic test bucket after gets have been sent - BucketId bucket(0x400000000000cac4); // Always the same in the test. 
- addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3"); Timestamp old_timestamp = 500; ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index d0ae31b9524..fefc88a27c2 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -11,7 +11,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> using config::ConfigGetter; using config::FileSpec; @@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil { std::shared_ptr<const DocumentTypeRepo> _repo; const DocumentType* _html_type; + UpdateOperationTest() + : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>:: + getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))), + _html_type(_repo->getDocumentType("text/html")) + { + } + void SetUp() override { - _repo.reset( - new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>:: - getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))); - _html_type = _repo->getDocumentType("text/html"); createLinks(); } @@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest EXPECT_EQ(2, m.diverging_timestamp_updates.getValue()); } +// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops +// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests. 
+ +TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3"); + DistributorMessageSenderStub sender; + op->start(sender); + + ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true)); + + // Simulate nodes 0 and 2 going down + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2}); + // Cancelling shall be cumulative + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({2})); + + replyToMessage(*op, sender, 0, 120); + replyToMessage(*op, sender, 1, 120); + replyToMessage(*op, sender, 2, 120); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)", + dumpBucket(_bId)); +} + } diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp index a8c21efa793..d2ff7b53403 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp @@ -142,7 +142,7 @@ BucketInfo::addNode(const BucketCopy& newCopy, const std::vector<uint16_t>& reco bool BucketInfo::removeNode(unsigned short node, TrustedUpdate update) { - for (auto iter = _nodes.begin(); iter != _nodes.end(); iter++) { + for (auto iter = _nodes.begin(); iter != _nodes.end(); ++iter) { if (iter->getNode() == node) { _nodes.erase(iter); if (update == TrustedUpdate::UPDATE) { diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp index f8dbff38a99..a8a1069d587 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp @@ -159,7 +159,7 @@ BucketInfoBase<NodeSeq>::getNodes() const noexcept { } template <typename NodeSeq> -BucketInfoBase<NodeSeq>::Highest +typename 
BucketInfoBase<NodeSeq>::Highest BucketInfoBase<NodeSeq>::getHighest() const noexcept { Highest highest; for (const auto & n : _nodes) { diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 184fee5d2c9..c889afcc77c 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -10,6 +10,7 @@ vespa_add_library(storage_distributor OBJECT bucket_spaces_stats_provider.cpp bucketgctimecalculator.cpp bucketlistmerger.cpp + cancelled_replicas_pruner.cpp clusterinformation.cpp crypto_uuid_generator.cpp distributor_bucket_space.cpp diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp index 4e3ef4f88ee..4c35d42a0e7 100644 --- a/storage/src/vespa/storage/distributor/activecopy.cpp +++ b/storage/src/vespa/storage/distributor/activecopy.cpp @@ -5,6 +5,7 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <algorithm> #include <cassert> +#include <ostream> namespace std { diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp new file mode 100644 index 00000000000..f453a722d2c --- /dev/null +++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "cancelled_replicas_pruner.h" + +namespace storage::distributor { + +std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope) { + if (cancel_scope.fully_cancelled()) { + return {}; + } + std::vector<BucketCopy> pruned_replicas; + // Expect that there will be an input entry for each cancelled node in the common case. + pruned_replicas.reserve((replicas.size() >= cancel_scope.cancelled_nodes().size()) + ? 
replicas.size() - cancel_scope.cancelled_nodes().size() : 0); + for (auto& candidate : replicas) { + if (!cancel_scope.node_is_cancelled(candidate.getNode())) { + pruned_replicas.emplace_back(candidate); + } + } + return pruned_replicas; +} + +} diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h new file mode 100644 index 00000000000..f12f78e569f --- /dev/null +++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/bucketdb/bucketcopy.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> +#include <span> +#include <vector> + +namespace storage::distributor { + +/** + * Returns a new vector that contains all entries of `replicas` whose nodes are _not_ tagged as + * cancelled in `cancel_scope`. Returned entry ordering is identical to input ordering. + */ +[[nodiscard]] std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope); + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 243b3c5ecb2..ad1cce46bea 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -281,6 +281,7 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state enterRecoveryMode(); // Clear all active messages on nodes that are down. + // TODO this should also be done on nodes that are no longer part of the config! 
const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE); for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp index 7b7c9f431f7..c92544c8cb5 100644 --- a/storage/src/vespa/storage/distributor/operationowner.cpp +++ b/storage/src/vespa/storage/distributor/operationowner.cpp @@ -73,7 +73,7 @@ OperationOwner::onClose() void OperationOwner::erase(api::StorageMessage::Id msgId) { - _sentMessageMap.pop(msgId); + (void)_sentMessageMap.pop(msgId); } } diff --git a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt index 5c6a1f3d84c..8cf0470f674 100644 --- a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_distributoroperation OBJECT SOURCES + cancel_scope.cpp operation.cpp DEPENDS ) diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp new file mode 100644 index 00000000000..af62b369517 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp @@ -0,0 +1,52 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include "cancel_scope.h" + +namespace storage::distributor { + +CancelScope::CancelScope() + : _cancelled_nodes(), + _fully_cancelled(false) +{ +} + +CancelScope::CancelScope(fully_cancelled_ctor_tag) noexcept + : _cancelled_nodes(), + _fully_cancelled(true) +{ +} + +CancelScope::CancelScope(CancelledNodeSet nodes) noexcept + : _cancelled_nodes(std::move(nodes)), + _fully_cancelled(false) +{ +} + +CancelScope::~CancelScope() = default; + +CancelScope::CancelScope(const CancelScope&) = default; +CancelScope& CancelScope::operator=(const CancelScope&) = default; + +CancelScope::CancelScope(CancelScope&&) noexcept = default; +CancelScope& CancelScope::operator=(CancelScope&&) noexcept = default; + +void CancelScope::add_cancelled_node(uint16_t node) { + _cancelled_nodes.insert(node); +} + +void CancelScope::merge(const CancelScope& other) { + _fully_cancelled |= other._fully_cancelled; + // Not using iterator insert(first, last) since that explicitly resizes, + for (uint16_t node : other._cancelled_nodes) { + _cancelled_nodes.insert(node); + } +} + +CancelScope CancelScope::of_fully_cancelled() noexcept { + return CancelScope(fully_cancelled_ctor_tag{}); +} + +CancelScope CancelScope::of_node_subset(CancelledNodeSet nodes) noexcept { + return CancelScope(std::move(nodes)); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.h b/storage/src/vespa/storage/distributor/operations/cancel_scope.h new file mode 100644 index 00000000000..7619a64d39f --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.h @@ -0,0 +1,62 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include <vespa/vespalib/stllike/hash_set.h> + +namespace storage::distributor { + +/** + * In the face of concurrent cluster state changes, cluster topology reconfigurations etc., + * it's possible for there to be pending mutating operations to nodes that the distributor + * no longer should keep track of. Such operations must therefore be _cancelled_, either + * fully or partially. A CancelScope represents the granularity at which an operation should + * be cancelled. + * + * In the case of one or more nodes becoming unavailable, `fully_cancelled()` will be false + * and `node_is_cancelled(x)` will return whether node `x` is explicitly cancelled. + * + * In the case of ownership transfers, `fully_cancelled()` will be true since the distributor + * should no longer have any knowledge of the bucket. `node_is_cancelled(x)` is always + * implicitly true for all values of `x` for full cancellations. + */ +class CancelScope { +public: + using CancelledNodeSet = vespalib::hash_set<uint16_t>; +private: + CancelledNodeSet _cancelled_nodes; + bool _fully_cancelled; + + struct fully_cancelled_ctor_tag {}; + + explicit CancelScope(fully_cancelled_ctor_tag) noexcept; + explicit CancelScope(CancelledNodeSet nodes) noexcept; +public: + CancelScope(); + ~CancelScope(); + + CancelScope(const CancelScope&); + CancelScope& operator=(const CancelScope&); + + CancelScope(CancelScope&&) noexcept; + CancelScope& operator=(CancelScope&&) noexcept; + + void add_cancelled_node(uint16_t node); + void merge(const CancelScope& other); + + [[nodiscard]] bool fully_cancelled() const noexcept { return _fully_cancelled; } + [[nodiscard]] bool is_cancelled() const noexcept { + return (_fully_cancelled || !_cancelled_nodes.empty()); + } + [[nodiscard]] bool node_is_cancelled(uint16_t node) const noexcept { + return (fully_cancelled() || _cancelled_nodes.contains(node)); + } + + [[nodiscard]] const CancelledNodeSet& cancelled_nodes() const noexcept { + return _cancelled_nodes; + } 
+ + static CancelScope of_fully_cancelled() noexcept; + static CancelScope of_node_subset(CancelledNodeSet nodes) noexcept; +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp index 0e12e3e3019..bd7f3709575 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp @@ -104,7 +104,12 @@ CheckCondition::handle_reply(DistributorStripeMessageSender& sender, } } -void CheckCondition::cancel(DistributorStripeMessageSender& sender) { +void CheckCondition::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); + _cond_get_op->cancel(proxy_sender, cancel_scope); +} + +void CheckCondition::close(DistributorStripeMessageSender& sender) { IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); _cond_get_op->onClose(proxy_sender); // We don't propagate any generated reply from the GetOperation, as its existence @@ -163,6 +168,12 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St reply->steal_trace()); return; } + if (_cond_get_op->is_cancelled()) { + _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED, + "Operation has been cancelled (likely due to a cluster state change)"), + reply->steal_trace()); + return; + } auto state_version_now = _bucket_space.getClusterState().getVersion(); if (_bucket_space.has_pending_cluster_state()) { state_version_now = _bucket_space.get_pending_cluster_state().getVersion(); diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h index 999b79adc3d..92a8bc62ae6 100644 --- 
a/storage/src/vespa/storage/distributor/operations/external/check_condition.h +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h @@ -17,6 +17,7 @@ namespace storage::api { class StorageReply; } namespace storage::distributor { +class CancelScope; class DistributorBucketSpace; class DistributorNodeContext; class DistributorStripeMessageSender; @@ -122,7 +123,8 @@ public: void start_and_send(DistributorStripeMessageSender& sender); void handle_reply(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& reply); - void cancel(DistributorStripeMessageSender& sender); + void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope); + void close(DistributorStripeMessageSender& sender); [[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept { return _outcome; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 854e7d15f82..e7832fd19e5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -109,6 +109,8 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const { // TODO handle this explicitly as part of operation abort/cancel edge + // -> we have yet to send anything at this point + // -> shouldn't ExternalOperationHandler deal with this before starting the op? 
auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace()); if (!pending_state) { return false; @@ -245,6 +247,15 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen _msg = std::shared_ptr<api::PutCommand>(); } +void +PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) +{ + if (_check_condition) { + _check_condition->cancel(sender, cancel_scope); + } + _tracker.cancel(cancel_scope); +} + bool PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const { @@ -302,7 +313,7 @@ void PutOperation::onClose(DistributorStripeMessageSender& sender) { if (_check_condition) { - _check_condition->cancel(sender); + _check_condition->close(sender); } _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 635accc1865..8b8e3e15375 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -60,6 +60,8 @@ private: void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch); + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; + [[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const; [[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const; diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 42d8e318f47..be43aac3d9e 100644 --- 
a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -156,9 +156,21 @@ void RemoveOperation::on_completed_check_condition(CheckCondition::Outcome& outc void RemoveOperation::onClose(DistributorStripeMessageSender& sender) { + if (_check_condition) { + _check_condition->close(sender); + } _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } +void +RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) +{ + if (_check_condition) { + _check_condition->cancel(sender, cancel_scope); + } + _tracker.cancel(cancel_scope); +} + bool RemoveOperation::has_condition() const noexcept { return _msg->hasTestAndSetCondition(); } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index 9f3a98294ea..221def81fdc 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -29,6 +29,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; void onClose(DistributorStripeMessageSender& sender) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; private: PersistenceMessageTrackerImpl _tracker_instance; diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp index a0b3f12f76b..1a8d1cb8f88 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp @@ -4,6 +4,7 @@ #include 
<vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storage/distributor/distributor_bucket_space.h> +#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".distributor.operations.external.stat_bucket"); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 73c65f54b21..2d1c469d072 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -13,6 +13,7 @@ #include <vespa/storage/distributor/distributor_bucket_space_repo.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/vdslib/state/cluster_state_bundle.h> +#include <vespa/vespalib/stllike/hash_set.hpp> #include <cinttypes> #include <vespa/log/log.h> @@ -68,10 +69,8 @@ TwoPhaseUpdateOperation::stateToString(SendState state) noexcept case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT"; case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT"; case SendState::PUTS_SENT: return "PUTS_SENT"; - default: - assert(!"Unknown state"); - return ""; } + abort(); } void @@ -130,7 +129,7 @@ TwoPhaseUpdateOperation::get_bucket_database_entries() const } bool -TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const +TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) { // Fast path iff bucket exists AND is consistent (split and copies). 
if (entries.size() != 1) { @@ -245,6 +244,16 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorStripeM } void +TwoPhaseUpdateOperation::send_operation_cancelled_reply(DistributorStripeMessageSender& sender) +{ + sendReplyWithResult(sender, + api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, + "The update operation was cancelled due to a cluster state change " + "between executing the read and write phases of a write-repair " + "update")); +} + +void TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorStripeMessageSender& sender) { sendReplyWithResult(sender, @@ -257,7 +266,8 @@ void TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc, api::Timestamp putTimestamp, DistributorStripeMessageSender& sender) { - if (lostBucketOwnershipBetweenPhases()) { + assert(!is_cancelled()); + if (lostBucketOwnershipBetweenPhases()) { // TODO deprecate with cancellation sendLostOwnershipTransientErrorReply(sender); return; } @@ -281,6 +291,8 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen void TwoPhaseUpdateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { + // In the case of cancellations, we let existing operations complete, but must not + // start new ones that are unaware of the cancellations. if (_mode == Mode::FAST_PATH) { handleFastPathReceive(sender, msg); } else { @@ -304,7 +316,10 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s sendReplyWithResult(sender, getReply.getResult()); return; } - + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } if (!getReply.getDocument().get()) { // Weird, document is no longer there ... Just fail. 
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "")); @@ -316,7 +331,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId()); assert(callback.get()); - Operation & callbackOp = *callback; + Operation& callbackOp = *callback; IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender); callbackOp.receive(intermediate, msg); @@ -326,12 +341,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s addTraceFromReply(*intermediate._reply); auto& cb = dynamic_cast<UpdateOperation&>(callbackOp); - std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation(); + auto [newest_bucket, newest_node] = cb.getNewestTimestampLocation(); auto intermediate_update_reply = std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply); assert(intermediate_update_reply); if (!intermediate_update_reply->getResult().success() || - bestNode.first == document::BucketId(0)) + (newest_bucket == document::BucketId(0))) { if (intermediate_update_reply->getResult().success() && (intermediate_update_reply->getOldTimestamp() == 0)) @@ -343,9 +358,14 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s } else { LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str()); + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } + _updateReply = std::move(intermediate_update_reply); - _fast_path_repair_source_node = bestNode.second; - document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first); + _fast_path_repair_source_node = newest_node; + document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_bucket); auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), document::AllFields::NAME); copyMessageSettings(*_updateCmd, *cmd); @@ -383,7 +403,7 @@ 
TwoPhaseUpdateOperation::handleSafePathReceive(DistributorStripeMessageSender& s callbackOp.receive(intermediate, msg); if (!intermediate._reply.get()) { - return; // Not enough replies received yet or we're draining callbacks. + return; // Not enough replies received yet, or we're draining callbacks. } addTraceFromReply(*intermediate._reply); if (_sendState == SendState::METADATA_GETS_SENT) { @@ -445,6 +465,13 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get( "One or more metadata Get operations failed; aborting Update")); return; } + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } + // Replicas _removed_ is handled by cancellation, but a concurrent state change may happen + // that _adds_ one or more available content nodes, which we cannot then blindly write to. + // So we have to explicitly check this edge case. if (!replica_set_unchanged_after_get_operation()) { // Use BUCKET_NOT_FOUND to trigger a silent retry. LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str()); @@ -490,6 +517,10 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende sendReplyWithResult(sender, reply.getResult()); return; } + if (is_cancelled()) { + send_operation_cancelled_reply(sender); + return; + } // Single Get could technically be considered consistent with itself, so make // sure we never treat that as sufficient for restarting in the fast path. 
if ((_sendState != SendState::SINGLE_GET_SENT) && may_restart_with_fast_path(reply)) { @@ -558,7 +589,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender) { LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode", update_doc_id().c_str()); - if (lostBucketOwnershipBetweenPhases()) { + assert(!is_cancelled()); + if (lostBucketOwnershipBetweenPhases()) { // TODO remove once cancellation is wired sendLostOwnershipTransientErrorReply(sender); return; } @@ -579,7 +611,7 @@ TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorStripeMessageSen std::unique_ptr<document::select::Node> selection; try { - selection = _parser.parse_selection(_updateCmd->getCondition().getSelection()); + selection = _parser.parse_selection(_updateCmd->getCondition().getSelection()); } catch (const document::select::ParsingFailedException & e) { sendReplyWithResult(sender, api::ReturnCode( api::ReturnCode::ILLEGAL_PARAMETERS, @@ -679,6 +711,22 @@ TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) { } } +void +TwoPhaseUpdateOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + // We have to explicitly cancel any and all pending Operation instances that have been + // launched by this operation. This is to ensure any DB updates they may transitively + // perform are aware of all cancellations that have occurred. + // There may be many messages pending for any given operation, so unique-ify them prior + // to avoid duplicate cancellation invocations. 
+ vespalib::hash_set<Operation*> ops; + for (auto& msg_op : _sentMessageMap) { + ops.insert(msg_op.second.get()); + } + for (auto* op : ops) { + op->cancel(sender, cancel_scope); + } +} + vespalib::string TwoPhaseUpdateOperation::update_doc_id() const { assert(_updateCmd.get() != nullptr); return _updateCmd->getDocumentId().toString(); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index d2ad5359fa6..7f64bb8d56c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -71,6 +71,8 @@ public: void onClose(DistributorStripeMessageSender& sender) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; + private: enum class SendState { NONE_SENT, @@ -94,19 +96,20 @@ private: void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&); void ensureUpdateReplyCreated(); - std::vector<BucketDatabase::Entry> get_bucket_database_entries() const; - bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const; + [[nodiscard]] std::vector<BucketDatabase::Entry> get_bucket_database_entries() const; + [[nodiscard]] static bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries); void startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries); void startSafePathUpdate(DistributorStripeMessageSender&); - bool lostBucketOwnershipBetweenPhases() const; + [[nodiscard]] bool lostBucketOwnershipBetweenPhases() const; void sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender&); + void send_operation_cancelled_reply(DistributorStripeMessageSender& sender); void send_feed_blocked_error_reply(DistributorStripeMessageSender& sender); void 
schedulePutsWithUpdatedDocument( std::shared_ptr<document::Document>, api::Timestamp, DistributorStripeMessageSender&); void applyUpdateToDocument(document::Document&) const; - std::shared_ptr<document::Document> createBlankDocument() const; + [[nodiscard]] std::shared_ptr<document::Document> createBlankDocument() const; void setUpdatedForTimestamp(api::Timestamp); void handleFastPathReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&); @@ -120,20 +123,20 @@ private: void handle_safe_path_received_single_full_get(DistributorStripeMessageSender&, api::GetReply&); void handleSafePathReceivedGet(DistributorStripeMessageSender&, api::GetReply&); void handleSafePathReceivedPut(DistributorStripeMessageSender&, const api::PutReply&); - bool shouldCreateIfNonExistent() const; + [[nodiscard]] bool shouldCreateIfNonExistent() const; bool processAndMatchTasCondition( DistributorStripeMessageSender& sender, const document::Document& candidateDoc); - bool satisfiesUpdateTimestampConstraint(api::Timestamp) const; + [[nodiscard]] bool satisfiesUpdateTimestampConstraint(api::Timestamp) const; void addTraceFromReply(api::StorageReply& reply); - bool hasTasCondition() const noexcept; + [[nodiscard]] bool hasTasCondition() const noexcept; void replyWithTasFailure(DistributorStripeMessageSender& sender, vespalib::stringref message); bool may_restart_with_fast_path(const api::GetReply& reply); - bool replica_set_unchanged_after_get_operation() const; + [[nodiscard]] bool replica_set_unchanged_after_get_operation() const; void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender); // Precondition: reply has not yet been sent. 
- vespalib::string update_doc_id() const; + [[nodiscard]] vespalib::string update_doc_id() const; using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>; diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index f43a6092372..60bddebbb89 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -207,6 +207,13 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender) _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } +void +UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) +{ + _tracker.cancel(cancel_scope); +} + + // The backend behavior of "create-if-missing" updates is to return the timestamp of the // _new_ update operation if the document was created from scratch. 
The two-phase update // operation logic auto-detects unexpected inconsistencies and tries to reconcile diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 96fd878a324..7d2131d426d 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -31,6 +31,7 @@ public: std::string getStatus() const override { return ""; }; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; void onClose(DistributorStripeMessageSender& sender) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const { return _newestTimestampLocation; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 2e6d0e95ec9..bf64fa2eb82 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "garbagecollectionoperation.h" +#include <vespa/storage/distributor/cancelled_replicas_pruner.h> #include <vespa/storage/distributor/idealstatemanager.h> #include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/distributor/top_level_distributor.h> @@ -22,6 +23,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu _cluster_state_version_at_phase1_start_time(0), _remove_candidates(), _replica_info(), + _cancel_scope(), _max_documents_removed(0), _is_done(false) {} @@ -148,6 +150,10 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender, } } +void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) { + _cancel_scope.merge(cancel_scope); +} + void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(), from_node, reply.getBucketInfo()); @@ -186,6 +192,11 @@ bool GarbageCollectionOperation::may_start_write_phase() const { if (!_ok) { return false; // Already broken, no reason to proceed. } + if (is_cancelled()) { + LOG(debug, "GC(%s): not sending write phase; operation has been explicitly cancelled", + getBucket().toString().c_str()); + return false; + } const auto state_version_now = _bucketSpace->getClusterState().getVersion(); if ((state_version_now != _cluster_state_version_at_phase1_start_time) || _bucketSpace->has_pending_cluster_state()) @@ -250,9 +261,17 @@ void GarbageCollectionOperation::update_last_gc_timestamp_in_db() { } void GarbageCollectionOperation::merge_received_bucket_info_into_db() { - // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. 
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info); - update_last_gc_timestamp_in_db(); + if (_cancel_scope.is_cancelled()) { + if (_cancel_scope.fully_cancelled()) { + return; + } + _replica_info = prune_cancelled_nodes(_replica_info, _cancel_scope); + } + if (!_replica_info.empty()) { + // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. + _manager->operation_context().update_bucket_database(getBucket(), _replica_info); + update_last_gc_timestamp_in_db(); + } // else: effectively fully cancelled, no touching the DB. } void GarbageCollectionOperation::update_gc_metrics() { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 27dc519dcc2..97efbe694de 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -3,10 +3,11 @@ #include "idealstateoperation.h" #include <vespa/document/base/documentid.h> +#include <vespa/persistence/spi/id_and_timestamp.h> #include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> #include <vespa/storage/distributor/operation_sequencer.h> -#include <vespa/persistence/spi/id_and_timestamp.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/vespalib/stllike/hash_map.h> #include <vector> @@ -22,13 +23,14 @@ public: void onStart(DistributorStripeMessageSender& sender) override; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; + void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; const char* getName() const noexcept override { return "garbagecollection"; }; Type getType() const noexcept override { return 
GARBAGE_COLLECTION; } bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; - bool is_two_phase() const noexcept { + [[nodiscard]] bool is_two_phase() const noexcept { return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase)); } - bool is_done() const noexcept { return _is_done; } + [[nodiscard]] bool is_done() const noexcept { return _is_done; } protected: MessageTracker _tracker; @@ -54,13 +56,14 @@ private: RemoveCandidates _remove_candidates; std::vector<SequencingHandle> _gc_write_locks; std::vector<BucketCopy> _replica_info; + CancelScope _cancel_scope; uint32_t _max_documents_removed; bool _is_done; static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply); void send_current_phase_remove_locations(DistributorStripeMessageSender& sender); - std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const; + [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const; void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply); void handle_ok_phase1_reply(api::RemoveLocationReply& reply); diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp index 4d82de170ae..9f944a94178 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/operation.cpp @@ -12,7 +12,8 @@ LOG_SETUP(".distributor.callback"); namespace storage::distributor { Operation::Operation() - : _startTime() + : _startTime(), + _cancelled(false) { } @@ -45,6 +46,11 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo target.setPriority(source.getPriority()); } +void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + _cancelled = true; + on_cancel(sender, cancel_scope); +} + void Operation::on_blocked() { diff --git 
a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index bc7e510a5b6..64caacfc642 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -16,6 +16,7 @@ class StorageComponent; namespace distributor { +class CancelScope; class DistributorStripeOperationContext; class PendingMessageTracker; class OperationSequencer; @@ -40,7 +41,7 @@ public: on the owner of the message that was replied to. */ virtual void receive(DistributorStripeMessageSender& sender, - const std::shared_ptr<api::StorageReply> & msg) + const std::shared_ptr<api::StorageReply> & msg) { onReceive(sender, msg); } @@ -60,6 +61,22 @@ public: void start(DistributorStripeMessageSender& sender); /** + * Explicitly cancel the operation. Cancelled operations may or may not (depending on + * the operation implementation) be immediately aborted, but they should either way + * never insert any bucket information _for cancelled nodes_ into the bucket DB after + * cancel() has been called. + */ + void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope); + + /** + * Whether cancel() has been invoked at least once on this instance. This does not + * distinguish between cancellations caused by ownership transfers and those caused + * by nodes becoming unavailable; Operation implementations that care about this need + * to implement cancel() themselves and inspect the provided CancelScope. + */ + [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; } + + /** * Returns true if we are blocked to start this operation given * the pending messages. 
*/ @@ -93,8 +110,15 @@ private: const std::shared_ptr<api::StorageReply> & msg) = 0; protected: + virtual void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { + (void)sender; + (void)cancel_scope; + } + static constexpr vespalib::duration MAX_TIMEOUT = 3600s; + vespalib::system_time _startTime; + bool _cancelled; }; } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index a4295613fd2..498f3a5feab 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -1,10 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistencemessagetracker.h" +#include "cancelled_replicas_pruner.h" #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" #include <vespa/vdslib/distribution/distribution.h> #include <vespa/storageapi/message/persistence.h> +#include <algorithm> #include <vespa/log/log.h> LOG_SETUP(".persistencemessagetracker"); @@ -18,12 +20,15 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( DistributorStripeOperationContext& op_ctx, api::Timestamp revertTimestamp) : MessageTracker(node_ctx), + _remapBucketInfo(), + _bucketInfo(), _metric(metric), _reply(std::move(reply)), _op_ctx(op_ctx), _revertTimestamp(revertTimestamp), _trace(_reply->getTrace().getLevel()), _requestTimer(node_ctx.clock()), + _cancel_scope(), _n_persistence_replies_total(0), _n_successful_persistence_replies(0), _priority(_reply->getPriority()), @@ -34,8 +39,32 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default; void +PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope) +{ + _cancel_scope.merge(cancel_scope); +} + +void 
+PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present( + BucketInfoMap& bucket_and_replicas, + const CancelScope& cancel_scope) +{ + for (auto& info : bucket_and_replicas) { + info.second = prune_cancelled_nodes(info.second, cancel_scope); + } +} + +void PersistenceMessageTrackerImpl::updateDB() { + if (_cancel_scope.is_cancelled()) { + if (_cancel_scope.fully_cancelled()) { + return; // Fully cancelled ops cannot mutate the DB at all + } + prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope); + prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope); + } + for (const auto & entry : _bucketInfo) { _op_ctx.update_bucket_database(entry.first, entry.second); } @@ -229,12 +258,19 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r _success = false; } +bool +PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept +{ + return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case +} + void PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node) { LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node); if (!reply.getResult().success() - && reply.getResult().getResult() != api::ReturnCode::EXISTS) + && (reply.getResult().getResult() != api::ReturnCode::EXISTS) + && !node_is_effectively_cancelled(node)) { LOG(spam, "Create bucket reply failed, so deleting it from bucket db"); // We don't know if the bucket exists at this point, so we remove it from the DB. 
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 9b06547dd98..8c44d70062c 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -4,6 +4,7 @@ #include "distributor_stripe_component.h" #include "distributormetricsset.h" #include "messagetracker.h" +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/storageapi/messageapi/bucketinfocommand.h> #include <vespa/storageapi/messageapi/bucketinforeply.h> @@ -14,6 +15,7 @@ struct PersistenceMessageTracker { virtual ~PersistenceMessageTracker() = default; using ToSend = MessageTracker::ToSend; + virtual void cancel(const CancelScope& cancel_scope) = 0; virtual void fail(MessageSender&, const api::ReturnCode&) = 0; virtual void queueMessageBatch(std::vector<ToSend> messages) = 0; virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0; @@ -26,14 +28,9 @@ struct PersistenceMessageTracker { }; class PersistenceMessageTrackerImpl final - : public PersistenceMessageTracker, - public MessageTracker + : public PersistenceMessageTracker, + public MessageTracker { -private: - using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>; - BucketInfoMap _remapBucketInfo; - BucketInfoMap _bucketInfo; - public: PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric, std::shared_ptr<api::BucketInfoReply> reply, @@ -42,6 +39,8 @@ public: api::Timestamp revertTimestamp = 0); ~PersistenceMessageTrackerImpl() override; + void cancel(const CancelScope& cancel_scope) override; + void updateDB(); void updateMetrics(); [[nodiscard]] bool success() const noexcept { return _success; } @@ -67,8 +66,11 @@ public: void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override; private: - using MessageBatch = 
std::vector<uint64_t>; + using MessageBatch = std::vector<uint64_t>; + using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>; + BucketInfoMap _remapBucketInfo; + BucketInfoMap _bucketInfo; std::vector<MessageBatch> _messageBatches; PersistenceOperationMetricSet& _metric; std::shared_ptr<api::BucketInfoReply> _reply; @@ -77,20 +79,24 @@ private: std::vector<BucketNodePair> _revertNodes; mbus::Trace _trace; framework::MilliSecTimer _requestTimer; + CancelScope _cancel_scope; uint32_t _n_persistence_replies_total; uint32_t _n_successful_persistence_replies; uint8_t _priority; bool _success; - bool canSendReplyEarly() const; + static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas, + const CancelScope& cancel_scope); + [[nodiscard]] bool canSendReplyEarly() const; void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply); void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const; - bool hasSentReply() const noexcept { return !_reply; } - bool shouldRevert() const; - bool has_majority_successful_replies() const noexcept; - bool has_minority_test_and_set_failure() const noexcept; + [[nodiscard]] bool hasSentReply() const noexcept { return !_reply; } + [[nodiscard]] bool shouldRevert() const; + [[nodiscard]] bool has_majority_successful_replies() const noexcept; + [[nodiscard]] bool has_minority_test_and_set_failure() const noexcept; void sendReply(MessageSender& sender); void updateFailureResult(const api::BucketInfoReply& reply); + [[nodiscard]] bool node_is_effectively_cancelled(uint16_t node) const noexcept; void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node); void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node); void transfer_trace_state_to_reply(); diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h index 70bee311f78..951ed6a6877 100644 --- 
a/storage/src/vespa/storage/distributor/sentmessagemap.h +++ b/storage/src/vespa/storage/distributor/sentmessagemap.h @@ -10,19 +10,23 @@ class Operation; class SentMessageMap { public: + using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>; + SentMessageMap(); ~SentMessageMap(); - std::shared_ptr<Operation> pop(api::StorageMessage::Id id); - std::shared_ptr<Operation> pop(); + [[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id); + [[nodiscard]] std::shared_ptr<Operation> pop(); void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg); void clear(); - uint32_t size() const { return _map.size(); } + [[nodiscard]] uint32_t size() const { return _map.size(); } [[nodiscard]] bool empty() const noexcept { return _map.empty(); } std::string toString() const; + + Map::const_iterator begin() const noexcept { return _map.cbegin(); } + Map::const_iterator end() const noexcept { return _map.cend(); } private: - using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>; Map _map; }; diff --git a/storage/src/vespa/storage/distributor/statechecker.cpp b/storage/src/vespa/storage/distributor/statechecker.cpp index cd8b6e934d4..7b30be53c13 100644 --- a/storage/src/vespa/storage/distributor/statechecker.cpp +++ b/storage/src/vespa/storage/distributor/statechecker.cpp @@ -5,6 +5,7 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/stllike/hash_set_insert.hpp> +#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".distributor.statechecker"); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index dcc3602c5e7..36d2393e148 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -15,6 +15,7 @@ #include <vespa/vespalib/util/exceptions.h> #include 
<vespa/vespalib/util/isequencedtaskexecutor.h> #include <algorithm> +#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index c22b08c5ca5..a6f6bd5d3fe 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -8,6 +8,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/spi/docentry.h> #include <vespa/vespalib/util/stringfmt.h> +#include <sstream> #include <vespa/log/log.h> LOG_SETUP(".persistence.processall"); diff --git a/vespalib/src/tests/alloc/alloc_test.cpp b/vespalib/src/tests/alloc/alloc_test.cpp index 04e009fcf8a..f39543daa2d 100644 --- a/vespalib/src/tests/alloc/alloc_test.cpp +++ b/vespalib/src/tests/alloc/alloc_test.cpp @@ -1,11 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/config.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/alloc.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/round_up_to_page_size.h> -#include <vespa/vespalib/util/sanitizers.h> #include <vespa/vespalib/util/size_literals.h> #include <cstddef> #include <sys/mman.h> @@ -216,12 +216,12 @@ TEST("auto alloced mmap alloc can not be extended if no room") { } /* - * The two following tests are disabled when address sanitizer is + * The two following tests are disabled when any sanitizer is * enabled since extra instrumentation code might trigger extra mmap * or munmap calls, breaking some of the assumptions in the disabled * tests. 
*/ -#ifndef VESPA_USE_ADDRESS_SANITIZER +#ifndef VESPA_USE_SANITIZER TEST("mmap alloc can be extended if room") { Alloc dummy = Alloc::allocMMap(100); Alloc reserved = Alloc::allocMMap(100); diff --git a/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp b/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp index f711d3d8685..3d0b9debeda 100644 --- a/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp +++ b/vespalib/src/vespa/vespalib/util/mmap_file_allocator.cpp @@ -57,7 +57,7 @@ MmapFileAllocator::alloc(size_t sz) const uint64_t offset = alloc_area(sz); void *buf = mmap(nullptr, sz, PROT_READ | PROT_WRITE, MAP_SHARED, _file.getFileDescriptor(), offset); if (buf == MAP_FAILED) { - throw IoException(fmt("Failed mmap(nullptr, %zu, PROT_READ | PROT_WRITE, MAP_SHARED, %s(fd=%d), %lu). Reason given by OS = '%s'", + throw IoException(fmt("Failed mmap(nullptr, %zu, PROT_READ | PROT_WRITE, MAP_SHARED, %s(fd=%d), %" PRIu64 "). Reason given by OS = '%s'", sz, _file.getFilename().c_str(), _file.getFileDescriptor(), offset, getLastErrorString().c_str()), IoException::getErrorType(errno), VESPA_STRLOC); } |