diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-08-17 14:55:28 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-08-21 14:26:36 +0200 |
commit | b1863768b512a7200496f8646fe239d3786d4443 (patch) | |
tree | dd21c88008f42506560b0e034769207d751f4cc3 /client | |
parent | 873350caf5e984b5a580e2e0585dfd521eb493c0 (diff) |
Support cluster discovery for all target types
Diffstat (limited to 'client')
26 files changed, 881 insertions, 533 deletions
diff --git a/client/go/internal/cli/cmd/curl.go b/client/go/internal/cli/cmd/curl.go index 3009cab2b5e..500a2a75663 100644 --- a/client/go/internal/cli/cmd/curl.go +++ b/client/go/internal/cli/cmd/curl.go @@ -39,7 +39,17 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain if err != nil { return err } - service, err := target.Service(curlService, time.Duration(waitSecs)*time.Second, 0, cli.config.cluster()) + var service *vespa.Service + useDeploy := curlService == "deploy" + timeout := time.Duration(waitSecs) * time.Second + if useDeploy { + if cli.config.cluster() != "" { + return fmt.Errorf("cannot specify cluster for service %s", curlService) + } + service, err = target.DeployService(timeout) + } else { + service, err = cli.service(target, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + } if err != nil { return err } @@ -49,19 +59,15 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain if err != nil { return err } - switch curlService { - case vespa.DeployService: + if useDeploy { if err := addAccessToken(c, target); err != nil { return err } - case vespa.DocumentService, vespa.QueryService: + } else { c.CaCertificate = service.TLSOptions.CACertificateFile c.PrivateKey = service.TLSOptions.PrivateKeyFile c.Certificate = service.TLSOptions.CertificateFile - default: - return fmt.Errorf("service not found: %s", curlService) } - if dryRun { log.Print(c.String()) } else { @@ -73,7 +79,7 @@ $ vespa curl -- -v --data-urlencode "yql=select * from music where album contain }, } cmd.Flags().BoolVarP(&dryRun, "dry-run", "n", false, "Print the curl command that would be executed") - cmd.Flags().StringVarP(&curlService, "service", "s", "query", "Which service to query. Must be \"deploy\", \"document\" or \"query\"") + cmd.Flags().StringVarP(&curlService, "service", "s", "container", "Which service to query. Must be \"deploy\" or \"container\"") cli.bindWaitFlag(cmd, 60, &waitSecs) return cmd } diff --git a/client/go/internal/cli/cmd/curl_test.go b/client/go/internal/cli/cmd/curl_test.go index 3eca0726bb4..520cf41e308 100644 --- a/client/go/internal/cli/cmd/curl_test.go +++ b/client/go/internal/cli/cmd/curl_test.go @@ -14,7 +14,6 @@ func TestCurl(t *testing.T) { cli.Environment["VESPA_CLI_ENDPOINTS"] = "{\"endpoints\":[{\"cluster\":\"container\",\"url\":\"http://127.0.0.1:8080\"}]}" assert.Nil(t, cli.Run("config", "set", "application", "t1.a1.i1")) assert.Nil(t, cli.Run("config", "set", "target", "cloud")) - assert.Nil(t, cli.Run("config", "set", "cluster", "container")) assert.Nil(t, cli.Run("auth", "api-key")) assert.Nil(t, cli.Run("auth", "cert", "--no-add")) diff --git a/client/go/internal/cli/cmd/deploy.go b/client/go/internal/cli/cmd/deploy.go index ef32d7f01b7..09ad06da627 100644 --- a/client/go/internal/cli/cmd/deploy.go +++ b/client/go/internal/cli/cmd/deploy.go @@ -25,7 +25,7 @@ func newDeployCmd(cli *CLI) *cobra.Command { copyCert bool ) cmd := &cobra.Command{ - Use: "deploy [application-directory]", + Use: "deploy [application-directory-or-file]", Short: "Deploy (prepare and activate) an application package", Long: `Deploy (prepare and activate) an application package. @@ -88,14 +88,14 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`, printPrepareLog(cli.Stderr, result) } if opts.Target.IsCloud() { - log.Printf("\nUse %s for deployment status, or follow this deployment at", color.CyanString("vespa status")) + log.Printf("\nUse %s for deployment status, or follow this deployment at", color.CyanString("vespa status deployment")) log.Print(color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/%s/instance/%s/job/%s-%s/run/%d", opts.Target.Deployment().System.ConsoleURL, opts.Target.Deployment().Application.Tenant, opts.Target.Deployment().Application.Application, opts.Target.Deployment().Zone.Environment, opts.Target.Deployment().Application.Instance, opts.Target.Deployment().Zone.Environment, opts.Target.Deployment().Zone.Region, result.ID))) } - return waitForQueryService(cli, target, result.ID, timeout) + return waitForContainerServices(cli, target, result.ID, timeout) }, } cmd.Flags().StringVarP(&logLevelArg, "log-level", "l", "error", `Log level for Vespa logs. Must be "error", "warning", "info" or "debug"`) @@ -107,7 +107,7 @@ $ vespa deploy -t cloud -z perf.aws-us-east-1c`, func newPrepareCmd(cli *CLI) *cobra.Command { return &cobra.Command{ - Use: "prepare application-directory", + Use: "prepare [application-directory-or-file]", Short: "Prepare an application package for activation", Args: cobra.MaximumNArgs(1), DisableAutoGenTag: true, @@ -149,10 +149,6 @@ func newActivateCmd(cli *CLI) *cobra.Command { DisableAutoGenTag: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - pkg, err := cli.applicationPackageFrom(args, true) - if err != nil { - return fmt.Errorf("could not find application package: %w", err) - } sessionID, err := cli.config.readSessionID(vespa.DefaultApplication) if err != nil { return fmt.Errorf("could not read session id: %w", err) @@ -162,25 +158,36 @@ func newActivateCmd(cli *CLI) *cobra.Command { return err } timeout := time.Duration(waitSecs) * time.Second - opts := vespa.DeploymentOptions{ApplicationPackage: pkg, Target: target, Timeout: timeout} + opts := vespa.DeploymentOptions{Target: target, Timeout: timeout} err = vespa.Activate(sessionID, opts) if err != nil { return err } - cli.printSuccess("Activated ", color.CyanString(pkg.Path), " with session ", sessionID) - return waitForQueryService(cli, target, sessionID, timeout) + cli.printSuccess("Activated application with session ", sessionID) + return waitForContainerServices(cli, target, sessionID, timeout) }, } cli.bindWaitFlag(cmd, 60, &waitSecs) return cmd } -func waitForQueryService(cli *CLI, target vespa.Target, sessionOrRunID int64, timeout time.Duration) error { +func waitForContainerServices(cli *CLI, target vespa.Target, sessionOrRunID int64, timeout time.Duration) error { if timeout == 0 { return nil } - _, err := cli.service(target, vespa.QueryService, sessionOrRunID, cli.config.cluster(), timeout) - return err + if _, err := target.AwaitDeployment(sessionOrRunID, timeout); err != nil { + return err + } + services, err := cli.services(target, timeout) + if err != nil { + return err + } + for _, s := range services { + if err := s.Wait(timeout); err != nil { + return err + } + } + return nil } func printPrepareLog(stderr io.Writer, result vespa.PrepareResult) { diff --git a/client/go/internal/cli/cmd/deploy_test.go b/client/go/internal/cli/cmd/deploy_test.go index 16aa3fd0ed8..b72cf7bc714 100644 --- a/client/go/internal/cli/cmd/deploy_test.go +++ b/client/go/internal/cli/cmd/deploy_test.go @@ -194,7 +194,7 @@ func assertActivate(applicationPackage string, arguments []string, t *testing.T) } assert.Nil(t, cli.Run(arguments...)) assert.Equal(t, - "Success: Activated "+applicationPackage+" with session 42\n", + "Success: Activated application with session 42\n", stdout.String()) url := "http://127.0.0.1:19071/application/v2/tenant/default/session/42/active" assert.Equal(t, url, client.LastRequest.URL.String()) diff --git a/client/go/internal/cli/cmd/document.go b/client/go/internal/cli/cmd/document.go index c31f8c34d14..aacef9be825 100644 --- a/client/go/internal/cli/cmd/document.go +++ b/client/go/internal/cli/cmd/document.go @@ -298,7 +298,7 @@ func documentService(cli *CLI, waitSecs int) (*vespa.Service, error) { if err != nil { return nil, err } - return cli.service(target, vespa.DocumentService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + return cli.service(target, cli.config.cluster(), time.Duration(waitSecs)*time.Second) } func printResult(cli *CLI, result util.OperationResult, payloadOnlyOnSuccess bool) error { diff --git a/client/go/internal/cli/cmd/document_test.go b/client/go/internal/cli/cmd/document_test.go index bce81da91c5..64319296299 100644 --- a/client/go/internal/cli/cmd/document_test.go +++ b/client/go/internal/cli/cmd/document_test.go @@ -79,7 +79,7 @@ func TestDocumentRemoveWithoutIdArgVerbose(t *testing.T) { func TestDocumentSendMissingId(t *testing.T) { cli, _, stderr := newTestCLI(t) - assert.NotNil(t, cli.Run("document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json")) assert.Equal(t, "Error: no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file\n", stderr.String()) @@ -87,7 +87,7 @@ func TestDocumentSendMissingId(t *testing.T) { func TestDocumentSendWithDisagreeingOperations(t *testing.T) { cli, _, stderr := newTestCLI(t) - assert.NotNil(t, cli.Run("document", "update", "testdata/A-Head-Full-of-Dreams-Put.json")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "update", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, "Error: wanted document operation is update, but JSON file specifies put\n", stderr.String()) @@ -110,20 +110,20 @@ func TestDocumentGet(t *testing.T) { "id:mynamespace:music::a-head-full-of-dreams", t) } -func assertDocumentSend(arguments []string, expectedOperation string, expectedMethod string, expectedDocumentId string, expectedPayloadFile string, t *testing.T) { +func assertDocumentSend(args []string, expectedOperation string, expectedMethod string, expectedDocumentId string, expectedPayloadFile string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} cli, stdout, stderr := newTestCLI(t) cli.httpClient = client - documentURL, err := documentServiceURL(client) - if err != nil { - t.Fatal(err) - } + documentURL := "http://127.0.0.1:8080" expectedPath, _ := vespa.IdToURLPath(expectedDocumentId) expectedURL := documentURL + "/document/v1/" + expectedPath + "?timeout=60000ms" - assert.Nil(t, cli.Run(arguments...)) + finalArgs := []string{"-t", documentURL} + finalArgs = append(finalArgs, args...) + assert.Nil(t, cli.Run(finalArgs...)) verbose := false - for _, a := range arguments { + for _, a := range args { if a == "-v" { verbose = true } @@ -154,16 +154,15 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe } } -func assertDocumentGet(arguments []string, documentId string, t *testing.T) { +func assertDocumentGet(args []string, documentId string, t *testing.T) { client := &mock.HTTPClient{} - documentURL, err := documentServiceURL(client) - if err != nil { - t.Fatal(err) - } + documentURL := "http://127.0.0.1:8080" client.NextResponseString(200, "{\"fields\":{\"foo\":\"bar\"}}") cli, stdout, _ := newTestCLI(t) cli.httpClient = client - assert.Nil(t, cli.Run(arguments...)) + finalArgs := []string{"-t", documentURL} + finalArgs = append(finalArgs, args...) + assert.Nil(t, cli.Run(finalArgs...)) assert.Equal(t, `{ "fields": { @@ -182,7 +181,7 @@ func assertDocumentTransportError(t *testing.T, errorMessage string) { client.NextResponseError(fmt.Errorf(errorMessage)) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("document", "put", + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, @@ -195,7 +194,7 @@ func assertDocumentError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("document", "put", + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, @@ -208,14 +207,10 @@ func assertDocumentServerError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("document", "put", + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "document", "put", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, - "Error: Container (document API) at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n", + "Error: container at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n", stderr.String()) } - -func documentServiceURL(client *mock.HTTPClient) (string, error) { - return "http://127.0.0.1:8080", nil -} diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 7d4b9cc8042..8a2bdf04ce3 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -12,7 +12,6 @@ import ( "github.com/spf13/cobra" "github.com/vespa-engine/vespa/client/go/internal/util" - "github.com/vespa-engine/vespa/client/go/internal/vespa" "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) @@ -57,17 +56,17 @@ type feedOptions struct { func newFeedCmd(cli *CLI) *cobra.Command { var options feedOptions cmd := &cobra.Command{ - Use: "feed FILE [FILE]...", + Use: "feed json-file [json-file]...", Short: "Feed multiple document operations to a Vespa cluster", Long: `Feed multiple document operations to a Vespa cluster. This command can be used to feed large amounts of documents to a Vespa cluster efficiently. -The contents of FILE must be either a JSON array or JSON objects separated by +The contents of JSON-FILE must be either a JSON array or JSON objects separated by newline (JSONL). -If FILE is a single dash ('-'), documents will be read from standard input. +If JSON-FILE is a single dash ('-'), documents will be read from standard input. `, Example: `$ vespa feed docs.jsonl moredocs.json $ cat docs.jsonl | vespa feed -`, @@ -109,7 +108,7 @@ func createServices(n int, timeout time.Duration, waitSecs int, cli *CLI) ([]uti services := make([]util.HTTPClient, 0, n) baseURL := "" for i := 0; i < n; i++ { - service, err := cli.service(target, vespa.DocumentService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + service, err := cli.service(target, cli.config.cluster(), time.Duration(waitSecs)*time.Second) if err != nil { return nil, "", err } diff --git a/client/go/internal/cli/cmd/feed_test.go b/client/go/internal/cli/cmd/feed_test.go index fc2c5ec7520..84328cad5fb 100644 --- a/client/go/internal/cli/cmd/feed_test.go +++ b/client/go/internal/cli/cmd/feed_test.go @@ -43,7 +43,7 @@ func TestFeed(t *testing.T) { httpClient.NextResponseString(200, `{"message":"OK"}`) httpClient.NextResponseString(200, `{"message":"OK"}`) - require.Nil(t, cli.Run("feed", jsonFile1, jsonFile2)) + require.Nil(t, cli.Run("feed", "-t", "http://127.0.0.1:8080", jsonFile1, jsonFile2)) assert.Equal(t, "", stderr.String()) want := `{ @@ -113,7 +113,7 @@ func TestFeedInvalid(t *testing.T) { jsonFile := filepath.Join(td, "docs.jsonl") require.Nil(t, os.WriteFile(jsonFile, doc, 0644)) httpClient.NextResponseString(200, `{"message":"OK"}`) - require.NotNil(t, cli.Run("feed", jsonFile)) + require.NotNil(t, cli.Run("feed", "-t", "http://127.0.0.1:8080", jsonFile)) want := `{ "feeder.seconds": 3.000, diff --git a/client/go/internal/cli/cmd/prod.go b/client/go/internal/cli/cmd/prod.go index 3b37197340f..79a6907eef2 100644 --- a/client/go/internal/cli/cmd/prod.go +++ b/client/go/internal/cli/cmd/prod.go @@ -114,7 +114,7 @@ type prodDeployOptions struct { func newProdDeployCmd(cli *CLI) *cobra.Command { var options prodDeployOptions cmd := &cobra.Command{ - Use: "deploy", + Use: "deploy [application-directory-or-file]", Aliases: []string{"submit"}, // TODO: Remove in Vespa 9 Short: "Deploy an application to production", Long: `Deploy an application to production. diff --git a/client/go/internal/cli/cmd/query.go b/client/go/internal/cli/cmd/query.go index a5b35052b11..90f51b398c2 100644 --- a/client/go/internal/cli/cmd/query.go +++ b/client/go/internal/cli/cmd/query.go @@ -64,7 +64,7 @@ func query(cli *CLI, arguments []string, timeoutSecs, waitSecs int, curl bool) e if err != nil { return err } - service, err := cli.service(target, vespa.QueryService, 0, cli.config.cluster(), time.Duration(waitSecs)*time.Second) + service, err := cli.service(target, cli.config.cluster(), time.Duration(waitSecs)*time.Second) if err != nil { return err } diff --git a/client/go/internal/cli/cmd/query_test.go b/client/go/internal/cli/cmd/query_test.go index 1caf6d33e70..6d5adc0508e 100644 --- a/client/go/internal/cli/cmd/query_test.go +++ b/client/go/internal/cli/cmd/query_test.go @@ -26,7 +26,7 @@ func TestQueryVerbose(t *testing.T) { cli, stdout, stderr := newTestCLI(t) cli.httpClient = client - assert.Nil(t, cli.Run("query", "-v", "select from sources * where title contains 'foo'")) + assert.Nil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "-v", "select from sources * where title contains 'foo'")) assert.Equal(t, "curl 'http://127.0.0.1:8080/search/?timeout=10s&yql=select+from+sources+%2A+where+title+contains+%27foo%27'\n", stderr.String()) assert.Equal(t, "{\n \"query\": \"result\"\n}\n", stdout.String()) } @@ -75,7 +75,7 @@ func assertQuery(t *testing.T, expectedQuery string, query ...string) { cli, stdout, _ := newTestCLI(t) cli.httpClient = client - args := []string{"query"} + args := []string{"-t", "http://127.0.0.1:8080", "query"} assert.Nil(t, cli.Run(append(args, query...)...)) assert.Equal(t, "{\n \"query\": \"result\"\n}\n", @@ -91,7 +91,7 @@ func assertQueryError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("query", "yql=select from sources * where title contains 'foo'")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "yql=select from sources * where title contains 'foo'")) assert.Equal(t, "Error: invalid query: Status "+strconv.Itoa(status)+"\n"+errorMessage+"\n", stderr.String(), @@ -103,7 +103,7 @@ func assertQueryServiceError(t *testing.T, status int, errorMessage string) { client.NextResponseString(status, errorMessage) cli, _, stderr := newTestCLI(t) cli.httpClient = client - assert.NotNil(t, cli.Run("query", "yql=select from sources * where title contains 'foo'")) + assert.NotNil(t, cli.Run("-t", "http://127.0.0.1:8080", "query", "yql=select from sources * where title contains 'foo'")) assert.Equal(t, "Error: Status "+strconv.Itoa(status)+" from container at 127.0.0.1:8080\n"+errorMessage+"\n", stderr.String(), diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go index ad42ea588b0..2598bff4246 100644 --- a/client/go/internal/cli/cmd/root.go +++ b/client/go/internal/cli/cmd/root.go @@ -100,7 +100,7 @@ type ztsFactory func(httpClient util.HTTPClient, domain, url string) (vespa.Auth // New creates the Vespa CLI, writing output to stdout and stderr, and reading environment variables from environment. func New(stdout, stderr io.Writer, environment []string) (*CLI, error) { cmd := &cobra.Command{ - Use: "vespa command-name", + Use: "vespa", Short: "The command-line tool for Vespa.ai", Long: `The command-line tool for Vespa.ai. @@ -267,8 +267,6 @@ func (c *CLI) configureCommands() { prodCmd.AddCommand(newProdDeployCmd(c)) // prod deploy rootCmd.AddCommand(prodCmd) // prod rootCmd.AddCommand(newQueryCmd(c)) // query - statusCmd.AddCommand(newStatusQueryCmd(c)) // status query - statusCmd.AddCommand(newStatusDocumentCmd(c)) // status document statusCmd.AddCommand(newStatusDeployCmd(c)) // status deploy rootCmd.AddCommand(statusCmd) // status rootCmd.AddCommand(newTestCmd(c)) // test @@ -296,6 +294,10 @@ func (c *CLI) printSuccess(msg ...interface{}) { fmt.Fprintln(c.Stdout, color.GreenString("Success:"), fmt.Sprint(msg...)) } +func (c *CLI) printInfo(msg ...interface{}) { + fmt.Fprintln(c.Stderr, "Info:", fmt.Sprint(msg...)) +} + func (c *CLI) printDebug(msg ...interface{}) { fmt.Fprintln(c.Stderr, color.CyanString("Debug:"), fmt.Sprint(msg...)) } @@ -504,22 +506,38 @@ func (c *CLI) system(targetType string) (vespa.System, error) { return vespa.System{}, fmt.Errorf("no default system found for %s target", targetType) } -// service returns the service of given name located at target. If non-empty, cluster specifies a cluster to query. This -// function blocks according to the wait period configured in this CLI. The parameter sessionOrRunID specifies either -// the session ID (local target) or run ID (cloud target) to wait for. -func (c *CLI) service(target vespa.Target, name string, sessionOrRunID int64, cluster string, timeout time.Duration) (*vespa.Service, error) { - if timeout > 0 { - log.Printf("Waiting up to %s for %s service to become available ...", color.CyanString(timeout.String()), color.CyanString(name)) +// service returns the service identified by cluster ID, located at target. If timeout is positive, this waits for the +// service to become ready. +func (c *CLI) service(target vespa.Target, cluster string, timeout time.Duration) (*vespa.Service, error) { + targetType, err := c.targetType() + if err != nil { + return nil, err + } + if targetType.url != "" && cluster != "" { + return nil, fmt.Errorf("cluster cannot be specified when target is an URL") } - s, err := target.Service(name, timeout, sessionOrRunID, cluster) + services, err := c.services(target, timeout) if err != nil { - err := fmt.Errorf("service '%s' is unavailable: %w", name, err) - if target.IsCloud() { - return nil, errHint(err, "Confirm that you're communicating with the correct zone and cluster", "The -z option controls the zone", "The -C option controls the cluster") - } return nil, err } - return s, nil + service, err := vespa.FindService(cluster, services) + if err != nil { + return nil, errHint(err, "The --cluster option specifies the service to use") + } + if timeout > 0 { + c.printInfo("Waiting up to ", color.CyanString(timeout.String()), " for ", color.CyanString(service.Description()), " to become available ...") + if err := service.Wait(timeout); err != nil { + return nil, err + } + } + return service, nil +} + +func (c *CLI) services(target vespa.Target, timeout time.Duration) ([]*vespa.Service, error) { + if timeout > 0 { + c.printInfo("Waiting up to ", color.CyanString(timeout.String()), " for cluster discovery ...") + } + return target.ContainerServices(timeout) } // isCI returns true if running inside a continuous integration environment. diff --git a/client/go/internal/cli/cmd/status.go b/client/go/internal/cli/cmd/status.go index 6570aeff448..7d17cce97fa 100644 --- a/client/go/internal/cli/cmd/status.go +++ b/client/go/internal/cli/cmd/status.go @@ -7,6 +7,7 @@ package cmd import ( "fmt" "log" + "strings" "time" "github.com/fatih/color" @@ -17,48 +18,43 @@ import ( func newStatusCmd(cli *CLI) *cobra.Command { var waitSecs int cmd := &cobra.Command{ - Use: "status", - Short: "Verify that a service is ready to use (query by default)", - Example: `$ vespa status query`, - DisableAutoGenTag: true, - SilenceUsage: true, - Args: cobra.MaximumNArgs(1), - RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.QueryService, waitSecs) - }, - } - cli.bindWaitFlag(cmd, 0, &waitSecs) - return cmd -} - -func newStatusQueryCmd(cli *CLI) *cobra.Command { - var waitSecs int - cmd := &cobra.Command{ - Use: "query", - Short: "Verify that the query service is ready to use (default)", - Example: `$ vespa status query`, - DisableAutoGenTag: true, - SilenceUsage: true, - Args: cobra.ExactArgs(0), - RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.QueryService, waitSecs) + Use: "status", + Aliases: []string{ + "status container", + "status document", // TODO: Remove on Vespa 9 + "status query", // TODO: Remove on Vespa 9 }, - } - cli.bindWaitFlag(cmd, 0, &waitSecs) - return cmd -} - -func newStatusDocumentCmd(cli *CLI) *cobra.Command { - var waitSecs int - cmd := &cobra.Command{ - Use: "document", - Short: "Verify that the document service is ready to use", - Example: `$ vespa status document`, + Short: "Verify that container service(s) are ready to use", + Example: `$ vespa status +$ vespa status --cluster mycluster`, DisableAutoGenTag: true, SilenceUsage: true, - Args: cobra.ExactArgs(0), + Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.DocumentService, waitSecs) + cluster := cli.config.cluster() + t, err := cli.target(targetOptions{}) + if err != nil { + return err + } + if cluster == "" { + timeout := time.Duration(waitSecs) * time.Second + services, err := t.ContainerServices(timeout) + if err != nil { + return err + } + if len(services) == 0 { + return errHint(fmt.Errorf("no services exist"), "Deployment may not be ready yet", "Try 'vespa status deployment'") + } + for _, service := range services { + if err := printServiceStatus(service, service.Wait(timeout), cli); err != nil { + return err + } + } + return nil + } else { + s, err := cli.service(t, cluster, 0) + return printServiceStatus(s, err, cli) + } }, } cli.bindWaitFlag(cmd, 0, &waitSecs) @@ -75,36 +71,27 @@ func newStatusDeployCmd(cli *CLI) *cobra.Command { SilenceUsage: true, Args: cobra.ExactArgs(0), RunE: func(cmd *cobra.Command, args []string) error { - return printServiceStatus(cli, vespa.DeployService, waitSecs) + t, err := cli.target(targetOptions{}) + if err != nil { + return err + } + s, err := t.DeployService(0) + if err != nil { + return err + } + return printServiceStatus(s, s.Wait(time.Duration(waitSecs)*time.Second), cli) }, } cli.bindWaitFlag(cmd, 0, &waitSecs) return cmd } -func printServiceStatus(cli *CLI, name string, waitSecs int) error { - t, err := cli.target(targetOptions{}) - if err != nil { - return err - } - cluster := cli.config.cluster() - s, err := cli.service(t, name, 0, cluster, 0) - if err != nil { - return err - } - // Wait explicitly - status, err := s.Wait(time.Duration(waitSecs) * time.Second) - clusterPart := "" - if cluster != "" { - clusterPart = fmt.Sprintf(" named %s", color.CyanString(cluster)) - } - if status/100 == 2 { - log.Print(s.Description(), clusterPart, " at ", color.CyanString(s.BaseURL), " is ", color.GreenString("ready")) - } else { - if err == nil { - err = fmt.Errorf("status %d", status) - } - return fmt.Errorf("%s%s at %s is %s: %w", s.Description(), clusterPart, color.CyanString(s.BaseURL), color.RedString("not ready"), err) +func printServiceStatus(s *vespa.Service, waitErr error, cli *CLI) error { + if waitErr != nil { + return waitErr } + desc := s.Description() + desc = strings.ToUpper(string(desc[0])) + string(desc[1:]) + log.Print(desc, " at ", color.CyanString(s.BaseURL), " is ", color.GreenString("ready")) return nil } diff --git a/client/go/internal/cli/cmd/status_test.go b/client/go/internal/cli/cmd/status_test.go index 76efea55503..15ec9280587 100644 --- a/client/go/internal/cli/cmd/status_test.go +++ b/client/go/internal/cli/cmd/status_test.go @@ -5,6 +5,7 @@ package cmd import ( + "io" "testing" "github.com/stretchr/testify/assert" @@ -23,63 +24,120 @@ func TestStatusDeployCommandWithLocalTarget(t *testing.T) { assertDeployStatus("http://127.0.0.1:19071", []string{"-t", "local"}, t) } -func TestStatusQueryCommand(t *testing.T) { - assertQueryStatus("http://127.0.0.1:8080", []string{}, t) +func TestStatusCommand(t *testing.T) { + assertStatus("http://127.0.0.1:8080", []string{}, t) } -func TestStatusQueryCommandWithUrlTarget(t *testing.T) { - assertQueryStatus("http://mycontainertarget:8080", []string{"-t", "http://mycontainertarget:8080"}, t) +func TestStatusCommandMultiCluster(t *testing.T) { + client := &mock.HTTPClient{} + cli, stdout, stderr := newTestCLI(t) + cli.httpClient = client + + mockServiceStatus(client) + assert.NotNil(t, cli.Run("status")) + assert.Equal(t, "Error: no services exist\nHint: Deployment may not be ready yet\nHint: Try 'vespa status deployment'\n", stderr.String()) + + mockServiceStatus(client, "foo", "bar") + assert.Nil(t, cli.Run("status")) + assert.Equal(t, `Container bar at http://127.0.0.1:8080 is ready +Container foo at http://127.0.0.1:8080 is ready +`, stdout.String()) + + stdout.Reset() + mockServiceStatus(client, "foo", "bar") + assert.Nil(t, cli.Run("status", "--cluster", "foo")) + assert.Equal(t, "Container foo at http://127.0.0.1:8080 is ready\n", stdout.String()) } -func TestStatusQueryCommandWithLocalTarget(t *testing.T) { - assertQueryStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t) +func TestStatusCommandWithUrlTarget(t *testing.T) { + assertStatus("http://mycontainertarget:8080", []string{"-t", "http://mycontainertarget:8080"}, t) } -func TestStatusDocumentCommandWithLocalTarget(t *testing.T) { - assertDocumentStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t) +func TestStatusCommandWithLocalTarget(t *testing.T) { + assertStatus("http://127.0.0.1:8080", []string{"-t", "local"}, t) } -func TestStatusErrorResponse(t *testing.T) { - assertQueryStatusError("http://127.0.0.1:8080", []string{}, t) +func TestStatusError(t *testing.T) { + client := &mock.HTTPClient{} + mockServiceStatus(client, "default") + client.NextStatus(500) + cli, _, stderr := newTestCLI(t) + cli.httpClient = client + assert.NotNil(t, cli.Run("status", "container")) + assert.Equal(t, + "Error: unhealthy container default: status 500 at http://127.0.0.1:8080/ApplicationStatus: wait timed out\n", + stderr.String()) + + stderr.Reset() + client.NextResponseError(io.EOF) + assert.NotNil(t, cli.Run("status", "container", "-t", "http://example.com")) + assert.Equal(t, + "Error: unhealthy container at http://example.com/ApplicationStatus: EOF\n", + stderr.String()) +} + +func isLocalTarget(args []string) bool { + for i := 0; i < len(args)-1; i++ { + if args[i] == "-t" { + return args[i+1] == "local" + } + } + return true // local is default } -func assertDeployStatus(target string, args []string, t *testing.T) { +func assertDeployStatus(expectedTarget string, args []string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} + client.NextResponse(mock.HTTPResponse{ + URI: "/status.html", + Status: 200, + }) cli, stdout, _ := newTestCLI(t) cli.httpClient = client statusArgs := []string{"status", "deploy"} assert.Nil(t, cli.Run(append(statusArgs, args...)...)) assert.Equal(t, - "Deploy API at "+target+" is ready\n", - stdout.String(), - "vespa status config-server") - assert.Equal(t, target+"/status.html", client.LastRequest.URL.String()) + "Deploy API at "+expectedTarget+" is ready\n", + stdout.String()) + assert.Equal(t, expectedTarget+"/status.html", client.LastRequest.URL.String()) } -func assertQueryStatus(target string, args []string, t *testing.T) { +func assertStatus(expectedTarget string, args []string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} + clusterName := "" + for i := 0; i < 2; i++ { + if isLocalTarget(args) { + clusterName = "foo" + mockServiceStatus(client, clusterName) + } + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200}) + } cli, stdout, _ := newTestCLI(t) cli.httpClient = client - statusArgs := []string{"status", "query"} + statusArgs := []string{"status"} assert.Nil(t, cli.Run(append(statusArgs, args...)...)) - assert.Equal(t, - "Container (query API) at "+target+" is ready\n", - stdout.String(), - "vespa status container") - assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String()) - - statusArgs = []string{"status"} + prefix := "Container" + if clusterName != "" { + prefix += " " + clusterName + } + assert.Equal(t, prefix+" at "+expectedTarget+" is ready\n", stdout.String()) + assert.Equal(t, expectedTarget+"/ApplicationStatus", client.LastRequest.URL.String()) + + // Test legacy command + statusArgs = []string{"status query"} stdout.Reset() assert.Nil(t, cli.Run(append(statusArgs, args...)...)) - assert.Equal(t, - "Container (query API) at "+target+" is ready\n", - stdout.String(), - "vespa status (the default)") - assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String()) + assert.Equal(t, prefix+" at "+expectedTarget+" is ready\n", stdout.String()) + assert.Equal(t, expectedTarget+"/ApplicationStatus", client.LastRequest.URL.String()) } func assertDocumentStatus(target string, args []string, t *testing.T) { + t.Helper() client := &mock.HTTPClient{} + if isLocalTarget(args) { + mockServiceStatus(client, "default") + } cli, stdout, _ := newTestCLI(t) cli.httpClient = client assert.Nil(t, cli.Run("status", "document")) @@ -89,15 +147,3 @@ func assertDocumentStatus(target string, args []string, t *testing.T) { "vespa status container") assert.Equal(t, target+"/ApplicationStatus", client.LastRequest.URL.String()) } - -func assertQueryStatusError(target string, args []string, t *testing.T) { - client := &mock.HTTPClient{} - client.NextStatus(500) - cli, _, stderr := newTestCLI(t) - cli.httpClient = client - assert.NotNil(t, cli.Run("status", "container")) - assert.Equal(t, - "Error: Container (query API) at "+target+" is not ready: status 500\n", - stderr.String(), - "vespa status container") -} diff --git a/client/go/internal/cli/cmd/test.go b/client/go/internal/cli/cmd/test.go index abee760efbb..58c254ad6e8 100644 --- a/client/go/internal/cli/cmd/test.go +++ b/client/go/internal/cli/cmd/test.go @@ -79,7 +79,7 @@ func runTests(cli *CLI, rootPath string, dryRun bool, waitSecs int) (int, []stri if err != nil { return 0, nil, errHint(err, "See https://docs.vespa.ai/en/reference/testing") } - context := testContext{testsPath: rootPath, dryRun: dryRun, cli: cli} + context := testContext{testsPath: rootPath, dryRun: dryRun, cli: cli, clusters: map[string]*vespa.Service{}} previousFailed := false for _, test := range tests { if !test.IsDir() && filepath.Ext(test.Name()) == ".json" { @@ -100,7 +100,7 @@ func runTests(cli *CLI, rootPath string, dryRun bool, waitSecs int) (int, []stri } } } else if strings.HasSuffix(stat.Name(), ".json") { - failure, err := runTest(rootPath, testContext{testsPath: filepath.Dir(rootPath), dryRun: dryRun, cli: cli}, waitSecs) + failure, err := runTest(rootPath, testContext{testsPath: filepath.Dir(rootPath), dryRun: dryRun, cli: cli, clusters: map[string]*vespa.Service{}}, waitSecs) if err != nil { return 0, nil, err } @@ -216,9 +216,15 @@ func verify(step step, defaultCluster string, defaultParameters map[string]strin if err != nil { return "", "", err } - service, err = target.Service(vespa.QueryService, time.Duration(waitSecs)*time.Second, 0, cluster) - if err != nil { - return "", "", err + ok := false + service, ok = context.clusters[cluster] + if !ok { + // Cache service so we don't have to discover it for every step + service, err = context.cli.service(target, cluster, time.Duration(waitSecs)*time.Second) + if err != nil { + return "", "", err + } + context.clusters[cluster] = service } requestUrl, err = url.ParseRequestURI(service.BaseURL + requestUri) if err != nil { @@ -474,6 +480,8 @@ type testContext struct { lazyTarget vespa.Target testsPath string dryRun bool + // Cache of services by their cluster name + clusters map[string]*vespa.Service } func (t *testContext) target() (vespa.Target, error) { diff --git a/client/go/internal/cli/cmd/test_test.go b/client/go/internal/cli/cmd/test_test.go index 5d6bb441b2a..4f8e6d49a2a 100644 --- a/client/go/internal/cli/cmd/test_test.go +++ b/client/go/internal/cli/cmd/test_test.go @@ -23,20 +23,26 @@ import ( func TestSuite(t *testing.T) { client := &mock.HTTPClient{} searchResponse, _ := os.ReadFile("testdata/tests/response.json") + mockServiceStatus(client, "container") client.NextStatus(200) client.NextStatus(200) - for i := 0; i < 11; i++ { + for i := 0; i < 2; i++ { + client.NextResponseString(200, string(searchResponse)) + } + mockServiceStatus(client, "container") // Some tests do not specify cluster, which is fine since we only have one, but this causes a cache miss + for i := 0; i < 9; i++ { client.NextResponseString(200, string(searchResponse)) } - expectedBytes, _ := os.ReadFile("testdata/tests/expected-suite.out") cli, stdout, stderr := newTestCLI(t) cli.httpClient = client assert.NotNil(t, cli.Run("test", "testdata/tests/system-test")) - + assert.Equal(t, "", stderr.String()) baseUrl := "http://127.0.0.1:8080" urlWithQuery := baseUrl + "/search/?presentation.timing=true&query=artist%3A+foo&timeout=3.4s" - requests := []*http.Request{createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(urlWithQuery), createSearchRequest(urlWithQuery)} + discoveryRequest := createSearchRequest("http://127.0.0.1:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge") + requests := []*http.Request{discoveryRequest, createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(urlWithQuery), createSearchRequest(urlWithQuery)} + requests = append(requests, discoveryRequest) requests = append(requests, createSearchRequest(baseUrl+"/search/")) requests = append(requests, createSearchRequest(baseUrl+"/search/?foo=%2F")) for i := 0; i < 7; i++ { @@ -95,6 +101,7 @@ func TestSuiteWithoutTests(t *testing.T) { func TestSingleTest(t *testing.T) { client := &mock.HTTPClient{} searchResponse, _ := os.ReadFile("testdata/tests/response.json") + mockServiceStatus(client, "container") client.NextStatus(200) client.NextStatus(200) client.NextResponseString(200, string(searchResponse)) @@ -109,7 +116,8 @@ func TestSingleTest(t *testing.T) { baseUrl := "http://127.0.0.1:8080" rawUrl := baseUrl + "/search/?presentation.timing=true&query=artist%3A+foo&timeout=3.4s" - assertRequests([]*http.Request{createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(rawUrl), createSearchRequest(rawUrl)}, client, t) + discoveryRequest := createSearchRequest("http://127.0.0.1:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge") + assertRequests([]*http.Request{discoveryRequest, createFeedRequest(baseUrl), createFeedRequest(baseUrl), createSearchRequest(rawUrl), createSearchRequest(rawUrl)}, client, t) } func TestSingleTestWithCloudAndEndpoints(t *testing.T) { @@ -172,12 +180,17 @@ func createRequest(method string, uri string, body string) *http.Request { } func assertRequests(requests []*http.Request, client *mock.HTTPClient, t *testing.T) { + t.Helper() if assert.Equal(t, len(requests), len(client.Requests)) { for i, e := range requests { a := client.Requests[i] assert.Equal(t, e.URL.String(), a.URL.String()) assert.Equal(t, e.Method, a.Method) - assert.Equal(t, util.ReaderToJSON(e.Body), util.ReaderToJSON(a.Body)) + actualBody := a.Body + if actualBody == nil { + actualBody = io.NopCloser(strings.NewReader("")) + } + assert.Equal(t, util.ReaderToJSON(e.Body), util.ReaderToJSON(actualBody)) } } } diff --git a/client/go/internal/cli/cmd/testutil_test.go b/client/go/internal/cli/cmd/testutil_test.go index 61d6c15c5a0..c16c9f8dc50 100644 --- a/client/go/internal/cli/cmd/testutil_test.go +++ b/client/go/internal/cli/cmd/testutil_test.go @@ -3,8 +3,10 @@ package cmd import ( "bytes" + "fmt" "net/http" "path/filepath" + "strings" "testing" "time" @@ -41,6 +43,36 @@ func newTestCLI(t *testing.T, envVars ...string) (*CLI, *bytes.Buffer, *bytes.Bu return cli, &stdout, &stderr } +func mockServiceStatus(client *mock.HTTPClient, clusterNames ...string) { + var serviceObjects []string + for _, name := range clusterNames { + service := fmt.Sprintf(`{ + "clusterName": "%s", + "host": "localhost", + "port": 8080, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080", + "currentGeneration": 1 + } +`, name) + serviceObjects = append(serviceObjects, service) + } + services := "[]" + if len(serviceObjects) > 0 { + services = "[" + strings.Join(serviceObjects, ",") + "]" + } + response := fmt.Sprintf(` +{ + "services": %s, + "currentGeneration": 1 +}`, services) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + Body: []byte(response), + }) +} + type mockAuthenticator struct{} func (a *mockAuthenticator) Authenticate(request *http.Request) error { return nil } diff --git a/client/go/internal/cli/cmd/visit_test.go b/client/go/internal/cli/cmd/visit_test.go index f85fb739370..bd806f1d9c9 100644 --- a/client/go/internal/cli/cmd/visit_test.go +++ b/client/go/internal/cli/cmd/visit_test.go @@ -93,10 +93,14 @@ func TestRunOneVisit(t *testing.T) { func withMockClient(t *testing.T, prepCli func(*mock.HTTPClient), runOp func(*vespa.Service)) *http.Request { client := &mock.HTTPClient{} + mockServiceStatus(client, "container") prepCli(client) cli, _, _ := newTestCLI(t) cli.httpClient = client - service, _ := documentService(cli, 0) + service, err := documentService(cli, 0) + if err != nil { + t.Fatal(err) + } runOp(service) return client.LastRequest } @@ -126,6 +130,7 @@ func TestVisitCommand(t *testing.T) { } func assertVisitResults(arguments []string, t *testing.T, responses []string, queryPart, output string) { + t.Helper() client := &mock.HTTPClient{} client.NextResponseString(200, handlersResponse) client.NextResponseString(400, clusterStarResponse) @@ -134,6 +139,7 @@ func assertVisitResults(arguments []string, t *testing.T, responses []string, qu } cli, stdout, stderr := newTestCLI(t) cli.httpClient = client + arguments = append(arguments, "-t", "http://127.0.0.1:8080") assert.Nil(t, cli.Run(arguments...)) assert.Equal(t, output, stdout.String()) assert.Equal(t, "", stderr.String()) diff --git a/client/go/internal/mock/http.go b/client/go/internal/mock/http.go index 3d4ead596b0..8a17448957f 100644 --- a/client/go/internal/mock/http.go +++ b/client/go/internal/mock/http.go @@ -2,6 +2,7 @@ package mock import ( "bytes" + "fmt" "io" "net/http" "strconv" @@ -32,6 +33,7 @@ type HTTPClient struct { } type HTTPResponse struct { + URI string Status int Body []byte Header http.Header @@ -65,6 +67,9 @@ func (c *HTTPClient) Do(request *http.Request, timeout time.Duration) (*http.Res response := HTTPResponse{Status: 200} if len(c.nextResponses) > 0 { response = c.nextResponses[0] + if response.URI != "" && response.URI != request.URL.RequestURI() { + return nil, fmt.Errorf("uri of response is %s, which does not match request uri %s", response.URI, request.URL.RequestURI()) + } c.nextResponses = c.nextResponses[1:] } if c.ReadBody && request.Body != nil { diff --git a/client/go/internal/vespa/deploy.go b/client/go/internal/vespa/deploy.go index ae4d4678d66..6577ac0d38b 100644 --- a/client/go/internal/vespa/deploy.go +++ b/client/go/internal/vespa/deploy.go @@ -92,7 +92,7 @@ func (d DeploymentOptions) String() string { } func (d *DeploymentOptions) url(path string) (*url.URL, error) { - service, err := d.Target.Service(DeployService, 0, 0, "") + service, err := d.Target.DeployService(0) if err != nil { return nil, err } @@ -311,7 +311,7 @@ func Submit(opts DeploymentOptions, submission Submission) error { } func deployServiceDo(request *http.Request, timeout time.Duration, opts DeploymentOptions) (*http.Response, error) { - s, err := opts.Target.Service(DeployService, 0, 0, "") + s, err := opts.Target.DeployService(0) if err != nil { return nil, err } @@ -373,7 +373,7 @@ func uploadApplicationPackage(url *url.URL, opts DeploymentOptions) (PrepareResu if err != nil { return PrepareResult{}, err } - service, err := opts.Target.Service(DeployService, opts.Timeout, 0, "") + service, err := opts.Target.DeployService(opts.Timeout) if err != nil { return PrepareResult{}, err } diff --git a/client/go/internal/vespa/deploy_test.go b/client/go/internal/vespa/deploy_test.go index c68ad750f1a..276e0f958d9 100644 --- a/client/go/internal/vespa/deploy_test.go +++ b/client/go/internal/vespa/deploy_test.go @@ -38,7 +38,7 @@ func TestDeploy(t *testing.T) { func TestDeployCloud(t *testing.T) { httpClient := mock.HTTPClient{} - target := createCloudTarget(t, "http://vespacloud", io.Discard) + target, _ := createCloudTarget(t, io.Discard) cloudTarget, ok := target.(*cloudTarget) require.True(t, ok) cloudTarget.httpClient = &httpClient @@ -51,7 +51,7 @@ func TestDeployCloud(t *testing.T) { require.Nil(t, err) assert.Equal(t, 1, len(httpClient.Requests)) req := httpClient.LastRequest - assert.Equal(t, "http://vespacloud/application/v4/tenant/t1/application/a1/instance/i1/deploy/dev-us-north-1", req.URL.String()) + assert.Equal(t, "https://api-ctl.vespa-cloud.com:4443/application/v4/tenant/t1/application/a1/instance/i1/deploy/dev-us-north-1", req.URL.String()) values := parseMultiPart(t, req) zipData := values["applicationZip"] @@ -71,7 +71,7 @@ func TestDeployCloud(t *testing.T) { func TestSubmit(t *testing.T) { httpClient := mock.HTTPClient{} - target := createCloudTarget(t, "http://vespacloud", io.Discard) + target, _ := createCloudTarget(t, io.Discard) cloudTarget, ok := target.(*cloudTarget) require.True(t, ok) cloudTarget.httpClient = &httpClient @@ -181,7 +181,7 @@ func TestDeactivate(t *testing.T) { func TestDeactivateCloud(t *testing.T) { httpClient := mock.HTTPClient{} - target := createCloudTarget(t, "http://vespacloud", io.Discard) + target, _ := createCloudTarget(t, io.Discard) cloudTarget, ok := target.(*cloudTarget) require.True(t, ok) cloudTarget.httpClient = &httpClient @@ -190,7 +190,7 @@ func TestDeactivateCloud(t *testing.T) { assert.Equal(t, 1, len(httpClient.Requests)) req := httpClient.LastRequest assert.Equal(t, "DELETE", req.Method) - assert.Equal(t, "http://vespacloud/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", req.URL.String()) + assert.Equal(t, "https://api-ctl.vespa-cloud.com:4443/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", req.URL.String()) } type pkgFixture struct { diff --git a/client/go/internal/vespa/log.go b/client/go/internal/vespa/log.go index 0e2cb5d0bfd..81088b8c0a1 100644 --- a/client/go/internal/vespa/log.go +++ b/client/go/internal/vespa/log.go @@ -72,6 +72,8 @@ func ReadLogEntries(r io.Reader) ([]LogEntry, error) { // LogLevel returns an int representing a named log level. func LogLevel(name string) int { switch name { + case "none": + return -1 case "error": return 0 case "warning": diff --git a/client/go/internal/vespa/target.go b/client/go/internal/vespa/target.go index 6dd64dd1275..df82cc435b2 100644 --- a/client/go/internal/vespa/target.go +++ b/client/go/internal/vespa/target.go @@ -4,9 +4,11 @@ package vespa import ( "crypto/tls" + "errors" "fmt" "io" "net/http" + "strings" "sync" "time" @@ -27,18 +29,17 @@ const ( // A hosted Vespa target TargetHosted = "hosted" - // A Vespa service that handles deployments, either a config server or a controller - DeployService = "deploy" + // LatestDeployment waits for a deployment to converge to latest generation + LatestDeployment int64 = -1 - // A Vespa service that handles queries. - QueryService = "query" + // AnyDeployment waits for a deployment to converge on any generation + AnyDeployment int64 = -2 - // A Vespa service that handles feeding of document. This may point to the same service as QueryService. - DocumentService = "document" - - retryInterval = 2 * time.Second + defaultRetryInterval = 2 * time.Second ) +var errWaitTimeout = errors.New("wait timed out") + // Authenticator authenticates the given HTTP request. type Authenticator interface { Authenticate(request *http.Request) error @@ -50,9 +51,11 @@ type Service struct { Name string TLSOptions TLSOptions - once sync.Once - auth Authenticator - httpClient util.HTTPClient + deployAPI bool + once sync.Once + auth Authenticator + httpClient util.HTTPClient + retryInterval time.Duration } // Target represents a Vespa platform, running named Vespa services. @@ -66,8 +69,17 @@ type Target interface { // Deployment returns the deployment managed by this target. Deployment() Deployment - // Service returns the service for given name. If timeout is non-zero, wait for the service to converge. - Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) + // DeployService returns the service to use for deployments. If timeout is positive, wait for the service to become + // ready. + DeployService(timeout time.Duration) (*Service, error) + + // ContainerServices returns all container services of the current deployment. If timeout is positive, wait for + // services to be discovered. + ContainerServices(timeout time.Duration) ([]*Service, error) + + // AwaitDeployment waits for a deployment identified by id to succeed. It returns the id that succeeded, or an + // error. The exact meaning of id depends on the implementation. + AwaitDeployment(id int64, timeout time.Duration) (int64, error) // PrintLog writes the logs of this deployment using given options to control output. PrintLog(options LogOptions) error @@ -114,29 +126,63 @@ func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Respon func (s *Service) SetClient(client util.HTTPClient) { s.httpClient = client } // Wait polls the health check of this service until it succeeds or timeout passes. -func (s *Service) Wait(timeout time.Duration) (int, error) { +func (s *Service) Wait(timeout time.Duration) error { url := s.BaseURL - switch s.Name { - case DeployService: + if s.deployAPI { url += "/status.html" // because /ApplicationStatus is not publicly reachable in Vespa Cloud - case QueryService, DocumentService: + } else { url += "/ApplicationStatus" - default: - return 0, fmt.Errorf("invalid service: %s", s.Name) } - return waitForOK(s, url, timeout) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return err + } + okFunc := func(status int, response []byte) (bool, error) { return isOK(status) } + status, err := wait(s, okFunc, func() *http.Request { return req }, timeout, s.retryInterval) + if err != nil { + waitDesc := "" + if timeout > 0 { + waitDesc = " (after waiting " + timeout.String() + ") " + } + statusDesc := "" + if status > 0 { + statusDesc = fmt.Sprintf(": status %d", status) + } + return fmt.Errorf("unhealthy %s%s%s at %s: %w", s.Description(), waitDesc, statusDesc, url, err) + } + return nil } func (s *Service) Description() string { - switch s.Name { - case QueryService: - return "Container (query API)" - case DocumentService: - return "Container (document API)" - case DeployService: - return "Deploy API" + if s.deployAPI { + return "deploy API" } - return fmt.Sprintf("No description of service %s", s.Name) + if s.Name == "" { + return "container" + } + return "container " + s.Name +} + +// FindService returns the service of given name, found among services, if any. +func FindService(name string, services []*Service) (*Service, error) { + if name == "" && len(services) == 1 { + return services[0], nil + } + names := make([]string, len(services)) + for i, s := range services { + if name == s.Name { + return s, nil + } + names[i] = s.Name + } + found := "no services found" + if len(names) > 0 { + found = "known services: " + strings.Join(names, ", ") + } + if name != "" { + return nil, fmt.Errorf("no such service: %q: %s", name, found) + } + return nil, fmt.Errorf("no service specified: %s", found) } func isOK(status int) (bool, error) { @@ -145,57 +191,48 @@ func isOK(status int) (bool, error) { case 2: // success return true, nil case 4: // client error - return false, fmt.Errorf("request failed with status %d", status) - default: // retry + return false, fmt.Errorf("got status %d", status) + default: // retry on everything else return false, nil } } -type responseFunc func(status int, response []byte) (bool, error) +// responseFunc returns whether a HTTP request is considered successful, based on its status and response data. +// Returning false indicates that the operation should be retried. An error is returned if the response is considered +// terminal and that the request should not be retried. +type responseFunc func(status int, response []byte) (ok bool, err error) type requestFunc func() *http.Request -// waitForOK queries url and returns its status code. If response status is not 2xx or 4xx, it is repeatedly queried -// until timeout elapses. -func waitForOK(service *Service, url string, timeout time.Duration) (int, error) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return 0, err - } - okFunc := func(status int, response []byte) (bool, error) { - ok, err := isOK(status) - if err != nil { - return false, fmt.Errorf("failed to query %s at %s: %w", service.Description(), url, err) - } - return ok, err - } - return wait(service, okFunc, func() *http.Request { return req }, timeout) -} - -func wait(service *Service, fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) { +// wait queries service until one of the following conditions are satisfied: +// +// 1. okFn returns true or a non-nil error +// 2. timeout is exceeded +// +// It returns the last received HTTP status code and error, if any. +func wait(service *Service, okFn responseFunc, reqFn requestFunc, timeout, retryInterval time.Duration) (int, error) { var ( - httpErr error - response *http.Response - statusCode int + status int + response *http.Response + err error ) deadline := time.Now().Add(timeout) loopOnce := timeout == 0 for time.Now().Before(deadline) || loopOnce { - req := reqFn() - response, httpErr = service.Do(req, 10*time.Second) - if httpErr == nil { - statusCode = response.StatusCode + response, err = service.Do(reqFn(), 10*time.Second) + if err == nil { + status = response.StatusCode body, err := io.ReadAll(response.Body) if err != nil { return 0, err } response.Body.Close() - ok, err := fn(statusCode, body) + ok, err := okFn(status, body) if err != nil { - return statusCode, err + return status, err } if ok { - return statusCode, nil + return status, nil } } timeLeft := time.Until(deadline) @@ -204,5 +241,8 @@ func wait(service *Service, fn responseFunc, reqFn requestFunc, timeout time.Dur } time.Sleep(retryInterval) } - return statusCode, httpErr + if err == nil { + return status, errWaitTimeout + } + return status, err } diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go index c0169f1a9bd..9b53180b7bb 100644 --- a/client/go/internal/vespa/target_cloud.go +++ b/client/go/internal/vespa/target_cloud.go @@ -3,12 +3,12 @@ package vespa import ( "bytes" "encoding/json" + "errors" "fmt" "math" "net/http" "sort" "strconv" - "strings" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -37,6 +37,7 @@ type cloudTarget struct { httpClient util.HTTPClient apiAuth Authenticator deploymentAuth Authenticator + retryInterval time.Duration } type deploymentEndpoint struct { @@ -49,13 +50,21 @@ type deploymentResponse struct { Endpoints []deploymentEndpoint `json:"endpoints"` } -type jobResponse struct { +type runResponse struct { Active bool `json:"active"` Status string `json:"status"` Log map[string][]logMessage `json:"log"` LastID int64 `json:"lastId"` } +type jobResponse struct { + ID int64 `json:"id"` +} + +type jobsResponse struct { + Runs []jobResponse `json:"runs"` +} + type logMessage struct { At int64 `json:"at"` Type string `json:"type"` @@ -71,40 +80,10 @@ func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAu logOptions: logOptions, apiAuth: apiAuth, deploymentAuth: deploymentAuth, + retryInterval: defaultRetryInterval, }, nil } -func (t *cloudTarget) findClusterURL(cluster string, timeout time.Duration, runID int64) (string, error) { - if t.deploymentOptions.CustomURL != "" { - return t.deploymentOptions.CustomURL, nil - } - if t.deploymentOptions.ClusterURLs == nil { - if err := t.waitForEndpoints(timeout, runID); err != nil { - return "", err - } - } - clusters := make([]string, 0, len(t.deploymentOptions.ClusterURLs)) - for c := range t.deploymentOptions.ClusterURLs { - clusters = append(clusters, c) - } - if cluster == "" { - for _, url := range t.deploymentOptions.ClusterURLs { - if len(t.deploymentOptions.ClusterURLs) == 1 { - return url, nil - } else { - return "", fmt.Errorf("no cluster specified: found multiple clusters '%s'", strings.Join(clusters, "', '")) - } - } - } else { - url, ok := t.deploymentOptions.ClusterURLs[cluster] - if !ok { - return "", fmt.Errorf("invalid cluster '%s': must be one of '%s'", cluster, strings.Join(clusters, "', '")) - } - return url, nil - } - return "", fmt.Errorf("no endpoints found") -} - func (t *cloudTarget) Type() string { switch t.apiOptions.System.Name { case MainSystem.Name, CDSystem.Name: @@ -117,41 +96,58 @@ func (t *cloudTarget) IsCloud() bool { return true } func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment } -func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) { - switch name { - case DeployService: +func (t *cloudTarget) DeployService(timeout time.Duration) (*Service, error) { + service := &Service{ + BaseURL: t.apiOptions.System.URL, + TLSOptions: t.apiOptions.TLSOptions, + deployAPI: true, + httpClient: t.httpClient, + auth: t.apiAuth, + retryInterval: t.retryInterval, + } + if timeout > 0 { + if err := service.Wait(timeout); err != nil { + return nil, err + } + } + return service, nil +} + +func (t *cloudTarget) ContainerServices(timeout time.Duration) ([]*Service, error) { + var clusterUrls map[string]string + if t.deploymentOptions.CustomURL != "" { + // Custom URL is always preferred + clusterUrls = map[string]string{"": t.deploymentOptions.CustomURL} + } else if t.deploymentOptions.ClusterURLs != nil { + // ... then endpoints specified through environment + clusterUrls = t.deploymentOptions.ClusterURLs + } else { + // ... then discovered endpoints + endpoints, err := t.discoverEndpoints(timeout) + if err != nil { + return nil, err + } + clusterUrls = endpoints + } + services := make([]*Service, 0, len(clusterUrls)) + for name, url := range clusterUrls { service := &Service{ - Name: name, - BaseURL: t.apiOptions.System.URL, - TLSOptions: t.apiOptions.TLSOptions, - httpClient: t.httpClient, - auth: t.apiAuth, + Name: name, + BaseURL: url, + TLSOptions: t.deploymentOptions.TLSOptions, + httpClient: t.httpClient, + auth: t.deploymentAuth, + retryInterval: t.retryInterval, } if timeout > 0 { - status, err := service.Wait(timeout) - if err != nil { + if err := service.Wait(timeout); err != nil { return nil, err } - if ok, _ := isOK(status); !ok { - return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL) - } } - return service, nil - case QueryService, DocumentService: - url, err := t.findClusterURL(cluster, timeout, runID) - if err != nil { - return nil, err - } - return &Service{ - Name: name, - BaseURL: url, - TLSOptions: t.deploymentOptions.TLSOptions, - httpClient: t.httpClient, - auth: t.deploymentAuth, - }, nil - default: - return nil, fmt.Errorf("unknown service: %s", name) + services = append(services, service) } + sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) + return services, nil } func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { @@ -162,7 +158,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { if err != nil { return err } - deployService, err := t.Service(DeployService, 0, 0, "") + deployService, err := t.DeployService(0) if err != nil { return err } @@ -218,7 +214,7 @@ func (t *cloudTarget) PrintLog(options LogOptions) error { } logEntries, err := ReadLogEntries(bytes.NewReader(response)) if err != nil { - return true, err + return false, err } for _, le := range logEntries { if !le.Time.After(lastFrom) { @@ -238,35 +234,65 @@ func (t *cloudTarget) PrintLog(options LogOptions) error { if options.Follow { timeout = math.MaxInt64 // No timeout } - _, err = t.deployServiceWait(logFunc, requestFunc, timeout) - return err + // Ignore wait error because logFunc has no concept of completion, we just want to print log entries until timeout is reached + if _, err := t.deployServiceWait(logFunc, requestFunc, timeout); err != nil && !errors.Is(err, errWaitTimeout) { + return fmt.Errorf("failed to read logs: %s", err) + } + return nil } func (t *cloudTarget) deployServiceWait(fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) { - deployService, err := t.Service(DeployService, 0, 0, "") + deployService, err := t.DeployService(0) if err != nil { return 0, err } - return wait(deployService, fn, reqFn, timeout) + return wait(deployService, fn, reqFn, timeout, t.retryInterval) } -func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error { - if runID > 0 { - if err := t.waitForRun(runID, timeout); err != nil { - return err +func (t *cloudTarget) discoverLatestRun(timeout time.Duration) (int64, error) { + runsURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s?limit=1", + t.apiOptions.System.URL, + t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, + t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) + req, err := http.NewRequest("GET", runsURL, nil) + if err != nil { + return 0, err + } + requestFunc := func() *http.Request { return req } + var lastRunID int64 + jobsSuccessFunc := func(status int, response []byte) (bool, error) { + if ok, err := isOK(status); !ok { + return ok, err + } + var resp jobsResponse + if err := json.Unmarshal(response, &resp); err != nil { + return false, err } + if len(resp.Runs) > 0 { + lastRunID = resp.Runs[0].ID + return true, nil + } + return false, nil } - return t.discoverEndpoints(timeout) + _, err = t.deployServiceWait(jobsSuccessFunc, requestFunc, timeout) + return lastRunID, err } -func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { +func (t *cloudTarget) AwaitDeployment(runID int64, timeout time.Duration) (int64, error) { + if runID == LatestDeployment { + lastRunID, err := t.discoverLatestRun(timeout) + if err != nil { + return 0, err + } + runID = lastRunID + } runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d", t.apiOptions.System.URL, t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID) req, err := http.NewRequest("GET", runURL, nil) if err != nil { - return err + return 0, err } lastID := int64(-1) requestFunc := func() *http.Request { @@ -275,13 +301,14 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { req.URL.RawQuery = q.Encode() return req } + success := false jobSuccessFunc := func(status int, response []byte) (bool, error) { if ok, err := isOK(status); !ok { return ok, err } - var resp jobResponse + var resp runResponse if err := json.Unmarshal(response, &resp); err != nil { - return false, nil + return false, err } if t.logOptions.Writer != nil { lastID = t.printLog(resp, lastID) @@ -292,20 +319,27 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { if resp.Status != "success" { return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status) } - return true, nil + success = true + return success, nil } _, err = t.deployServiceWait(jobSuccessFunc, requestFunc, timeout) - return err + if err != nil { + return 0, fmt.Errorf("deployment run %d incomplete after waiting %s: %w", runID, timeout, err) + } + if !success { + return 0, fmt.Errorf("deployment run %d incomplete after waiting %s", runID, timeout) + } + return runID, nil } -func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { +func (t *cloudTarget) printLog(response runResponse, last int64) int64 { if response.LastID == 0 { return last } var msgs []logMessage for step, stepMsgs := range response.Log { for _, msg := range stepMsgs { - if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 { + if (step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level) || LogLevel(msg.Type) == 3 { continue } msgs = append(msgs, msg) @@ -320,14 +354,14 @@ func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { return response.LastID } -func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { +func (t *cloudTarget) discoverEndpoints(timeout time.Duration) (map[string]string, error) { deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s", t.apiOptions.System.URL, t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) req, err := http.NewRequest("GET", deploymentURL, nil) if err != nil { - return err + return nil, err } urlsByCluster := make(map[string]string) endpointFunc := func(status int, response []byte) (bool, error) { @@ -350,11 +384,10 @@ func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { return true, nil } if _, err := t.deployServiceWait(endpointFunc, func() *http.Request { return req }, timeout); err != nil { - return err + return nil, fmt.Errorf("no endpoints found after waiting %s: %w", timeout, err) } if len(urlsByCluster) == 0 { - return fmt.Errorf("no endpoints discovered for %s", t.deploymentOptions.Deployment) + return nil, fmt.Errorf("no endpoints found after waiting %s", timeout) } - t.deploymentOptions.ClusterURLs = urlsByCluster - return nil + return urlsByCluster, nil } diff --git a/client/go/internal/vespa/target_custom.go b/client/go/internal/vespa/target_custom.go index fd0af0e8d53..e6c13502430 100644 --- a/client/go/internal/vespa/target_custom.go +++ b/client/go/internal/vespa/target_custom.go @@ -3,8 +3,11 @@ package vespa import ( "encoding/json" "fmt" + "net" "net/http" "net/url" + "sort" + "strconv" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -12,24 +15,45 @@ import ( ) type customTarget struct { - targetType string - baseURL string - httpClient util.HTTPClient - tlsOptions TLSOptions + targetType string + baseURL string + httpClient util.HTTPClient + tlsOptions TLSOptions + retryInterval time.Duration } -type serviceConvergeResponse struct { - Converged bool `json:"converged"` +type serviceStatus struct { + Converged bool `json:"converged"` + CurrentGeneration int64 `json:"currentGeneration"` + Services []serviceInfo `json:"services"` +} + +type serviceInfo struct { + ClusterName string `json:"clusterName"` + Type string `json:"type"` + Port int `json:"port"` } // LocalTarget creates a target for a Vespa platform running locally. func LocalTarget(httpClient util.HTTPClient, tlsOptions TLSOptions) Target { - return &customTarget{targetType: TargetLocal, baseURL: "http://127.0.0.1", httpClient: httpClient, tlsOptions: tlsOptions} + return &customTarget{ + targetType: TargetLocal, + baseURL: "http://127.0.0.1", + httpClient: httpClient, + tlsOptions: tlsOptions, + retryInterval: defaultRetryInterval, + } } // CustomTarget creates a Target for a Vespa platform running at baseURL. func CustomTarget(httpClient util.HTTPClient, baseURL string, tlsOptions TLSOptions) Target { - return &customTarget{targetType: TargetCustom, baseURL: baseURL, httpClient: httpClient, tlsOptions: tlsOptions} + return &customTarget{ + targetType: TargetCustom, + baseURL: baseURL, + httpClient: httpClient, + tlsOptions: tlsOptions, + retryInterval: defaultRetryInterval, + } } func (t *customTarget) Type() string { return t.targetType } @@ -38,95 +62,132 @@ func (t *customTarget) IsCloud() bool { return false } func (t *customTarget) Deployment() Deployment { return DefaultDeployment } -func (t *customTarget) createService(name string) (*Service, error) { - switch name { - case DeployService, QueryService, DocumentService: - url, err := t.serviceURL(name, t.targetType) - if err != nil { - return nil, err - } - return &Service{BaseURL: url, Name: name, httpClient: t.httpClient, TLSOptions: t.tlsOptions}, nil +func (t *customTarget) PrintLog(options LogOptions) error { + return fmt.Errorf("log access is only supported on cloud: run vespa-logfmt on the admin node instead, or export from a container image (here named 'vespa') using docker exec vespa vespa-logfmt") +} + +func (t *customTarget) CheckVersion(version version.Version) error { return nil } + +func (t *customTarget) newService(url, name string, deployAPI bool) *Service { + return &Service{ + BaseURL: url, + Name: name, + deployAPI: deployAPI, + httpClient: t.httpClient, + TLSOptions: t.tlsOptions, + retryInterval: t.retryInterval, } - return nil, fmt.Errorf("unknown service: %s", name) } -func (t *customTarget) Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) { - service, err := t.createService(name) +func (t *customTarget) DeployService(timeout time.Duration) (*Service, error) { + if t.targetType == TargetCustom { + return t.newService(t.baseURL, "", true), nil + } + u, err := t.urlWithPort(19071) if err != nil { return nil, err } + service := t.newService(u.String(), "", true) if timeout > 0 { - if name == DeployService { - status, err := service.Wait(timeout) - if err != nil { - return nil, err - } - if ok, _ := isOK(status); !ok { - return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL) - } - } else { - if err := t.waitForConvergence(timeout); err != nil { - return nil, err - } + if err := service.Wait(timeout); err != nil { + return nil, err } } return service, nil } -func (t *customTarget) PrintLog(options LogOptions) error { - return fmt.Errorf("log access is only supported on cloud: run vespa-logfmt on the admin node instead, or export from a container image (here named 'vespa') using docker exec vespa vespa-logfmt") +func (t *customTarget) ContainerServices(timeout time.Duration) ([]*Service, error) { + if t.targetType == TargetCustom { + return []*Service{t.newService(t.baseURL, "", false)}, nil + } + status, err := t.serviceStatus(AnyDeployment, timeout) + if err != nil { + return nil, err + } + portsByCluster := make(map[string]int) + for _, serviceInfo := range status.Services { + if serviceInfo.Type != "container" { + continue + } + clusterName := serviceInfo.ClusterName + if clusterName == "" { // Vespa version older than 8.206.1, which does not include cluster name in the API + clusterName = serviceInfo.Type + strconv.Itoa(serviceInfo.Port) + } + portsByCluster[clusterName] = serviceInfo.Port + } + var services []*Service + for cluster, port := range portsByCluster { + url, err := t.urlWithPort(port) + if err != nil { + return nil, err + } + service := t.newService(url.String(), cluster, false) + services = append(services, service) + } + sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) + return services, nil } -func (t *customTarget) CheckVersion(version version.Version) error { return nil } +func (t *customTarget) AwaitDeployment(generation int64, timeout time.Duration) (int64, error) { + status, err := t.serviceStatus(generation, timeout) + if err != nil { + return 0, err + } + return status.CurrentGeneration, nil +} -func (t *customTarget) serviceURL(name string, targetType string) (string, error) { +func (t *customTarget) urlWithPort(port int) (*url.URL, error) { u, err := url.Parse(t.baseURL) if err != nil { - return "", err - } - if targetType == TargetLocal { - // Use same ports as the vespaengine/vespa container image - port := "" - switch name { - case DeployService: - port = "19071" - case QueryService, DocumentService: - port = "8080" - default: - return "", fmt.Errorf("unknown service: %s", name) - } - u.Host = u.Host + ":" + port + return nil, err } - return u.String(), nil + if _, _, err := net.SplitHostPort(u.Host); err == nil { + return nil, fmt.Errorf("url %s already contains port", u) + } + u.Host = net.JoinHostPort(u.Host, strconv.Itoa(port)) + return u, nil } -func (t *customTarget) waitForConvergence(timeout time.Duration) error { - deployService, err := t.createService(DeployService) +func (t *customTarget) serviceStatus(wantedGeneration int64, timeout time.Duration) (serviceStatus, error) { + deployService, err := t.DeployService(0) if err != nil { - return err + return serviceStatus{}, err } url := fmt.Sprintf("%s/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", deployService.BaseURL) req, err := http.NewRequest("GET", url, nil) if err != nil { - return err + return serviceStatus{}, err } + var status serviceStatus converged := false - convergedFunc := func(status int, response []byte) (bool, error) { - if ok, err := isOK(status); !ok { + convergedFunc := func(httpStatus int, response []byte) (bool, error) { + if ok, err := isOK(httpStatus); !ok { return ok, err } - var resp serviceConvergeResponse - if err := json.Unmarshal(response, &resp); err != nil { - return false, nil + if err := json.Unmarshal(response, &status); err != nil { + return false, err } - converged = resp.Converged + converged = wantedGeneration == AnyDeployment || + (wantedGeneration == LatestDeployment && status.Converged) || + status.CurrentGeneration == wantedGeneration return converged, nil } - if _, err := wait(deployService, convergedFunc, func() *http.Request { return req }, timeout); err != nil { - return err + if _, err := wait(deployService, convergedFunc, func() *http.Request { return req }, timeout, t.retryInterval); err != nil { + return serviceStatus{}, fmt.Errorf("deployment not converged%s after waiting %s: %w", generationDescription(wantedGeneration), timeout, err) } if !converged { - return fmt.Errorf("services have not converged") + return serviceStatus{}, fmt.Errorf("deployment not converged%s after waiting %s", generationDescription(wantedGeneration), timeout) + } + return status, nil +} + +func generationDescription(generation int64) string { + switch generation { + case AnyDeployment: + return "" + case LatestDeployment: + return " on latest generation" + default: + return fmt.Sprintf(" on generation %d", generation) } - return nil } diff --git a/client/go/internal/vespa/target_test.go b/client/go/internal/vespa/target_test.go index 6dc97f496f5..8fe424e897c 100644 --- a/client/go/internal/vespa/target_test.go +++ b/client/go/internal/vespa/target_test.go @@ -6,142 +6,234 @@ import ( "fmt" "io" "net/http" - "net/http/httptest" + "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/vespa-engine/vespa/client/go/internal/mock" - "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/version" ) -type mockVespaApi struct { - deploymentConverged bool - authFailure bool - serverURL string +func TestLocalTarget(t *testing.T) { + // Local target uses discovery + client := &mock.HTTPClient{} + lt := LocalTarget(client, TLSOptions{}) + assertServiceURL(t, "http://127.0.0.1:19071", lt, "deploy") + for i := 0; i < 2; i++ { + response := ` +{ + "services": [ + { + "host": "foo", + "port": 8080, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080", + "currentGeneration": 1 + }, + { + "host": "bar", + "port": 8080, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8080", + "currentGeneration": 1 + }, + { + "clusterName": "feed", + "host": "localhost", + "port": 8081, + "type": "container", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:8081", + "currentGeneration": 1 + }, + { + "host": "localhost", + "port": 19112, + "type": "searchnode", + "url": "http://localhost:19071/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge/localhost:19112", + "currentGeneration": 1 + } + ], + "currentGeneration": 1 +}` + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + Body: []byte(response), + }) + } + assertServiceURL(t, "http://127.0.0.1:8080", lt, "container8080") + assertServiceURL(t, "http://127.0.0.1:8081", lt, "feed") } -func (v *mockVespaApi) mockVespaHandler(w http.ResponseWriter, req *http.Request) { - if v.authFailure { - response := `{"message":"unauthorized"}` - w.WriteHeader(401) - w.Write([]byte(response)) - } - switch req.URL.Path { - case "/cli/v1/": - response := `{"minVersion":"8.0.0"}` - w.Write([]byte(response)) - case "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1": - response := "{}" - if v.deploymentConverged { - response = fmt.Sprintf(`{"endpoints": [{"url": "%s","scope": "zone","cluster": "cluster1"}]}`, v.serverURL) - } - w.Write([]byte(response)) - case "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42": - var response string - if v.deploymentConverged { - response = `{"active": false, "status": "success"}` - } else { - response = `{"active": true, "status": "running", - "lastId": 42, - "log": {"deployReal": [{"at": 1631707708431, - "type": "info", - "message": "Deploying platform version 7.465.17 and application version 1.0.2 ..."}]}}` - } - w.Write([]byte(response)) - case "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge": - response := fmt.Sprintf(`{"converged": %t}`, v.deploymentConverged) - w.Write([]byte(response)) - case "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1/logs": - log := `1632738690.905535 host1a.dev.aws-us-east-1c 806/53 logserver-container Container.com.yahoo.container.jdisc.ConfiguredApplication info Switching to the latest deployed set of configurations and components. Application config generation: 52532 -1632738698.600189 host1a.dev.aws-us-east-1c 1723/33590 config-sentinel sentinel.sentinel.config-owner config Sentinel got 3 service elements [tenant(vespa-team), application(music), instance(mpolden)] for config generation 52532 -` - w.Write([]byte(log)) - case "/status.html": - w.Write([]byte("OK")) - case "/ApplicationStatus": - w.WriteHeader(500) - w.Write([]byte("Unknown error")) +func setRetryInterval(target Target, interval time.Duration) { + switch t := target.(type) { + case *cloudTarget: + t.retryInterval = interval + case *customTarget: + t.retryInterval = interval default: - w.WriteHeader(400) - w.Write([]byte("Invalid path: " + req.URL.Path)) + panic(fmt.Sprintf("unexpected type %T", t)) } } func TestCustomTarget(t *testing.T) { - lt := LocalTarget(&mock.HTTPClient{}, TLSOptions{}) - assertServiceURL(t, "http://127.0.0.1:19071", lt, "deploy") - assertServiceURL(t, "http://127.0.0.1:8080", lt, "query") - assertServiceURL(t, "http://127.0.0.1:8080", lt, "document") - + // Custom target always uses URL directly, without discovery ct := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42", TLSOptions{}) assertServiceURL(t, "http://192.0.2.42", ct, "deploy") - assertServiceURL(t, "http://192.0.2.42", ct, "query") - assertServiceURL(t, "http://192.0.2.42", ct, "document") - + assertServiceURL(t, "http://192.0.2.42", ct, "") ct2 := CustomTarget(&mock.HTTPClient{}, "http://192.0.2.42:60000", TLSOptions{}) assertServiceURL(t, "http://192.0.2.42:60000", ct2, "deploy") - assertServiceURL(t, "http://192.0.2.42:60000", ct2, "query") - assertServiceURL(t, "http://192.0.2.42:60000", ct2, "document") + assertServiceURL(t, "http://192.0.2.42:60000", ct2, "") } func TestCustomTargetWait(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - target := CustomTarget(util.CreateClient(time.Second*10), srv.URL, TLSOptions{}) + client := &mock.HTTPClient{} + target := CustomTarget(client, "http://192.0.2.42", TLSOptions{}) + setRetryInterval(target, 0) + // Fails once + client.NextStatus(500) + assertService(t, true, target, "", 0) + // Fails multiple times + for i := 0; i < 3; i++ { + client.NextStatus(500) + client.NextResponseError(io.EOF) + } + // Then succeeds + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200}) + assertService(t, false, target, "", time.Second) +} + +func TestCustomTargetAwaitDeployment(t *testing.T) { + client := &mock.HTTPClient{} + target := CustomTarget(client, "http://192.0.2.42", TLSOptions{}) - _, err := target.Service("query", time.Millisecond, 42, "") + // Not converged initially + _, err := target.AwaitDeployment(42, 0) assert.NotNil(t, err) - vc.deploymentConverged = true - _, err = target.Service("query", time.Millisecond, 42, "") - assert.Nil(t, err) + // Not converged on this generation + response := mock.HTTPResponse{ + URI: "/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", + Status: 200, + Body: []byte(`{"currentGeneration": 42}`), + } + client.NextResponse(response) + _, err = target.AwaitDeployment(41, 0) + assert.NotNil(t, err) - assertServiceWait(t, 200, target, "deploy") - assertServiceWait(t, 500, target, "query") - assertServiceWait(t, 500, target, "document") + // Converged + client.NextResponse(response) + convergedID, err := target.AwaitDeployment(42, 0) + assert.Nil(t, err) + assert.Equal(t, int64(42), convergedID) } func TestCloudTargetWait(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - vc.serverURL = srv.URL - var logWriter bytes.Buffer - target := createCloudTarget(t, srv.URL, &logWriter) - vc.authFailure = true - assertServiceWaitErr(t, 401, true, target, "deploy") - vc.authFailure = false - assertServiceWait(t, 200, target, "deploy") + target, client := createCloudTarget(t, &logWriter) + client.NextStatus(401) + assertService(t, true, target, "deploy", time.Second) // No retrying on 4xx + client.NextStatus(500) + client.NextStatus(500) + client.NextResponse(mock.HTTPResponse{URI: "/status.html", Status: 200}) + assertService(t, false, target, "deploy", time.Second) - _, err := target.Service("query", time.Millisecond, 42, "") + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", + Status: 200, + Body: []byte(`{"endpoints":[]}`), + }) + _, err := target.ContainerServices(time.Millisecond) assert.NotNil(t, err) - vc.deploymentConverged = true - _, err = target.Service("query", time.Millisecond, 42, "") + response := mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1", + Status: 200, + Body: []byte(`{ + "endpoints": [ + {"url": "http://a.example.com","scope": "zone", "cluster": "default"}, + {"url": "http://b.example.com","scope": "zone", "cluster": "feed"} + ] +}`), + } + client.NextResponse(response) + services, err := target.ContainerServices(time.Millisecond) assert.Nil(t, err) + assert.Equal(t, 2, len(services)) + + client.NextResponse(response) + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 500}) + assertService(t, true, target, "default", 0) + client.NextResponse(response) + client.NextResponse(mock.HTTPResponse{URI: "/ApplicationStatus", Status: 200}) + assertService(t, false, target, "feed", 0) +} + +func TestCloudTargetAwaitDeployment(t *testing.T) { + var logWriter bytes.Buffer + target, client := createCloudTarget(t, &logWriter) - assertServiceWait(t, 500, target, "query") - assertServiceWait(t, 500, target, "document") + runningResponse := mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1", + Status: 200, + Body: []byte(`{"active": true, "status": "running", + "lastId": 42, + "log": {"deployReal": [{"at": 1631707708431, + "type": "info", + "message": "Deploying platform version 7.465.17 and application version 1.0.2 ..."}]}}`), + } + client.NextResponse(runningResponse) + runningResponse.URI = "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=42" + client.NextResponse(runningResponse) + // Deployment has not succeeded yet + _, err := target.AwaitDeployment(int64(42), time.Second) + assert.NotNil(t, err) // Log timestamp is converted to local time, do the same here in case the local time where tests are run varies tm := time.Unix(1631707708, 431000) expectedTime := tm.Format("[15:04:05]") - assert.Equal(t, expectedTime+" info Deploying platform version 7.465.17 and application version 1.0.2 ...\n", logWriter.String()) + assert.Equal(t, strings.Repeat(expectedTime+" info Deploying platform version 7.465.17 and application version 1.0.2 ...\n", 2), logWriter.String()) + + // Wanted deployment run eventually succeeds + runningResponse.URI = "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=-1" + client.NextResponse(runningResponse) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/42?after=42", + Status: 200, + Body: []byte(`{"active": false, "status": "success"}`), + }) + convergedID, err := target.AwaitDeployment(int64(42), time.Second) + assert.Nil(t, err) + assert.Equal(t, int64(42), convergedID) + + // Await latest deployment + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1?limit=1", + Status: 200, + Body: []byte(`{"runs": [{"id": 1337}]}`), + }) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/job/dev-us-north-1/run/1337?after=-1", + Status: 200, + Body: []byte(`{"active": false, "status": "success"}`), + }) + convergedID, err = target.AwaitDeployment(LatestDeployment, time.Second) + assert.Nil(t, err) + assert.Equal(t, int64(1337), convergedID) } func TestLog(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - vc.serverURL = srv.URL - vc.deploymentConverged = true - + target, client := createCloudTarget(t, io.Discard) + client.NextResponse(mock.HTTPResponse{ + URI: "/application/v4/tenant/t1/application/a1/instance/i1/environment/dev/region/us-north-1/logs?from=-62135596800000", + Status: 200, + Body: []byte(`1632738690.905535 host1a.dev.aws-us-east-1c 806/53 logserver-container Container.com.yahoo.container.jdisc.ConfiguredApplication info Switching to the latest deployed set of configurations and components. Application config generation: 52532 +1632738698.600189 host1a.dev.aws-us-east-1c 1723/33590 config-sentinel sentinel.sentinel.config-owner config Sentinel got 3 service elements [tenant(vespa-team), application(music), instance(mpolden)] for config generation 52532 +`), + }) var buf bytes.Buffer - target := createCloudTarget(t, srv.URL, io.Discard) if err := target.PrintLog(LogOptions{Writer: &buf, Level: 3}); err != nil { t.Fatal(err) } @@ -151,23 +243,22 @@ func TestLog(t *testing.T) { } func TestCheckVersion(t *testing.T) { - vc := mockVespaApi{} - srv := httptest.NewServer(http.HandlerFunc(vc.mockVespaHandler)) - defer srv.Close() - - target := createCloudTarget(t, srv.URL, io.Discard) + target, client := createCloudTarget(t, io.Discard) + for i := 0; i < 3; i++ { + client.NextResponse(mock.HTTPResponse{URI: "/cli/v1/", Status: 200, Body: []byte(`{"minVersion":"8.0.0"}`)}) + } assert.Nil(t, target.CheckVersion(version.MustParse("8.0.0"))) assert.Nil(t, target.CheckVersion(version.MustParse("8.1.0"))) assert.NotNil(t, target.CheckVersion(version.MustParse("7.0.0"))) } -func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target { +func createCloudTarget(t *testing.T, logWriter io.Writer) (Target, *mock.HTTPClient) { apiKey, err := CreateAPIKey() - assert.Nil(t, err) - + require.Nil(t, err) auth := &mockAuthenticator{} + client := &mock.HTTPClient{} target, err := CloudTarget( - util.CreateClient(time.Second*10), + client, auth, auth, APIOptions{APIKey: apiKey, System: PublicSystem}, @@ -179,38 +270,38 @@ func createCloudTarget(t *testing.T, url string, logWriter io.Writer) Target { }, LogOptions{Writer: logWriter}, ) - if err != nil { - t.Fatal(err) - } - if ct, ok := target.(*cloudTarget); ok { - ct.apiOptions.System.URL = url - } else { - t.Fatalf("Wrong target type %T", ct) - } - return target + require.Nil(t, err) + setRetryInterval(target, 0) + return target, client } -func assertServiceURL(t *testing.T, url string, target Target, service string) { - s, err := target.Service(service, 0, 42, "") - assert.Nil(t, err) - assert.Equal(t, url, s.BaseURL) +func getService(t *testing.T, target Target, name string) (*Service, error) { + t.Helper() + if name == "deploy" { + return target.DeployService(0) + } + services, err := target.ContainerServices(0) + require.Nil(t, err) + return FindService(name, services) } -func assertServiceWait(t *testing.T, expectedStatus int, target Target, service string) { - assertServiceWaitErr(t, expectedStatus, false, target, service) +func assertServiceURL(t *testing.T, url string, target Target, serviceName string) { + t.Helper() + service, err := getService(t, target, serviceName) + require.Nil(t, err) + assert.Equal(t, url, service.BaseURL) } -func assertServiceWaitErr(t *testing.T, expectedStatus int, expectErr bool, target Target, service string) { - s, err := target.Service(service, 0, 42, "") - assert.Nil(t, err) - - status, err := s.Wait(0) - if expectErr { +func assertService(t *testing.T, fail bool, target Target, serviceName string, timeout time.Duration) { + t.Helper() + service, err := getService(t, target, serviceName) + require.Nil(t, err) + err = service.Wait(timeout) + if fail { assert.NotNil(t, err) } else { assert.Nil(t, err) } - assert.Equal(t, expectedStatus, status) } type mockAuthenticator struct{} |