diff options
author | Martin Polden <mpolden@mpolden.no> | 2023-08-17 14:55:28 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2023-08-21 14:26:36 +0200 |
commit | b1863768b512a7200496f8646fe239d3786d4443 (patch) | |
tree | dd21c88008f42506560b0e034769207d751f4cc3 /client/go/internal/vespa/target_cloud.go | |
parent | 873350caf5e984b5a580e2e0585dfd521eb493c0 (diff) |
Support cluster discovery for all target types
Diffstat (limited to 'client/go/internal/vespa/target_cloud.go')
-rw-r--r-- | client/go/internal/vespa/target_cloud.go | 205 |
1 files changed, 119 insertions, 86 deletions
diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go index c0169f1a9bd..9b53180b7bb 100644 --- a/client/go/internal/vespa/target_cloud.go +++ b/client/go/internal/vespa/target_cloud.go @@ -3,12 +3,12 @@ package vespa import ( "bytes" "encoding/json" + "errors" "fmt" "math" "net/http" "sort" "strconv" - "strings" "time" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -37,6 +37,7 @@ type cloudTarget struct { httpClient util.HTTPClient apiAuth Authenticator deploymentAuth Authenticator + retryInterval time.Duration } type deploymentEndpoint struct { @@ -49,13 +50,21 @@ type deploymentResponse struct { Endpoints []deploymentEndpoint `json:"endpoints"` } -type jobResponse struct { +type runResponse struct { Active bool `json:"active"` Status string `json:"status"` Log map[string][]logMessage `json:"log"` LastID int64 `json:"lastId"` } +type jobResponse struct { + ID int64 `json:"id"` +} + +type jobsResponse struct { + Runs []jobResponse `json:"runs"` +} + type logMessage struct { At int64 `json:"at"` Type string `json:"type"` @@ -71,40 +80,10 @@ func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAu logOptions: logOptions, apiAuth: apiAuth, deploymentAuth: deploymentAuth, + retryInterval: defaultRetryInterval, }, nil } -func (t *cloudTarget) findClusterURL(cluster string, timeout time.Duration, runID int64) (string, error) { - if t.deploymentOptions.CustomURL != "" { - return t.deploymentOptions.CustomURL, nil - } - if t.deploymentOptions.ClusterURLs == nil { - if err := t.waitForEndpoints(timeout, runID); err != nil { - return "", err - } - } - clusters := make([]string, 0, len(t.deploymentOptions.ClusterURLs)) - for c := range t.deploymentOptions.ClusterURLs { - clusters = append(clusters, c) - } - if cluster == "" { - for _, url := range t.deploymentOptions.ClusterURLs { - if len(t.deploymentOptions.ClusterURLs) == 1 { - return url, nil - } else { - return "", fmt.Errorf("no cluster specified: found multiple clusters '%s'", strings.Join(clusters, "', '")) - } - } - } else { - url, ok := t.deploymentOptions.ClusterURLs[cluster] - if !ok { - return "", fmt.Errorf("invalid cluster '%s': must be one of '%s'", cluster, strings.Join(clusters, "', '")) - } - return url, nil - } - return "", fmt.Errorf("no endpoints found") -} - func (t *cloudTarget) Type() string { switch t.apiOptions.System.Name { case MainSystem.Name, CDSystem.Name: @@ -117,41 +96,58 @@ func (t *cloudTarget) IsCloud() bool { return true } func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment } -func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) { - switch name { - case DeployService: +func (t *cloudTarget) DeployService(timeout time.Duration) (*Service, error) { + service := &Service{ + BaseURL: t.apiOptions.System.URL, + TLSOptions: t.apiOptions.TLSOptions, + deployAPI: true, + httpClient: t.httpClient, + auth: t.apiAuth, + retryInterval: t.retryInterval, + } + if timeout > 0 { + if err := service.Wait(timeout); err != nil { + return nil, err + } + } + return service, nil +} + +func (t *cloudTarget) ContainerServices(timeout time.Duration) ([]*Service, error) { + var clusterUrls map[string]string + if t.deploymentOptions.CustomURL != "" { + // Custom URL is always preferred + clusterUrls = map[string]string{"": t.deploymentOptions.CustomURL} + } else if t.deploymentOptions.ClusterURLs != nil { + // ... then endpoints specified through environment + clusterUrls = t.deploymentOptions.ClusterURLs + } else { + // ... then discovered endpoints + endpoints, err := t.discoverEndpoints(timeout) + if err != nil { + return nil, err + } + clusterUrls = endpoints + } + services := make([]*Service, 0, len(clusterUrls)) + for name, url := range clusterUrls { service := &Service{ - Name: name, - BaseURL: t.apiOptions.System.URL, - TLSOptions: t.apiOptions.TLSOptions, - httpClient: t.httpClient, - auth: t.apiAuth, + Name: name, + BaseURL: url, + TLSOptions: t.deploymentOptions.TLSOptions, + httpClient: t.httpClient, + auth: t.deploymentAuth, + retryInterval: t.retryInterval, } if timeout > 0 { - status, err := service.Wait(timeout) - if err != nil { + if err := service.Wait(timeout); err != nil { return nil, err } - if ok, _ := isOK(status); !ok { - return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL) - } } - return service, nil - case QueryService, DocumentService: - url, err := t.findClusterURL(cluster, timeout, runID) - if err != nil { - return nil, err - } - return &Service{ - Name: name, - BaseURL: url, - TLSOptions: t.deploymentOptions.TLSOptions, - httpClient: t.httpClient, - auth: t.deploymentAuth, - }, nil - default: - return nil, fmt.Errorf("unknown service: %s", name) + services = append(services, service) } + sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name }) + return services, nil } func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { @@ -162,7 +158,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { if err != nil { return err } - deployService, err := t.Service(DeployService, 0, 0, "") + deployService, err := t.DeployService(0) if err != nil { return err } @@ -218,7 +214,7 @@ func (t *cloudTarget) PrintLog(options LogOptions) error { } logEntries, err := ReadLogEntries(bytes.NewReader(response)) if err != nil { - return true, err + return false, err } for _, le := range logEntries { if !le.Time.After(lastFrom) { @@ -238,35 +234,65 @@ func (t *cloudTarget) PrintLog(options LogOptions) error { if options.Follow { timeout = math.MaxInt64 // No timeout } - _, err = t.deployServiceWait(logFunc, requestFunc, timeout) - return err + // Ignore wait error because logFunc has no concept of completion, we just want to print log entries until timeout is reached + if _, err := t.deployServiceWait(logFunc, requestFunc, timeout); err != nil && !errors.Is(err, errWaitTimeout) { + return fmt.Errorf("failed to read logs: %s", err) + } + return nil } func (t *cloudTarget) deployServiceWait(fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) { - deployService, err := t.Service(DeployService, 0, 0, "") + deployService, err := t.DeployService(0) if err != nil { return 0, err } - return wait(deployService, fn, reqFn, timeout) + return wait(deployService, fn, reqFn, timeout, t.retryInterval) } -func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error { - if runID > 0 { - if err := t.waitForRun(runID, timeout); err != nil { - return err +func (t *cloudTarget) discoverLatestRun(timeout time.Duration) (int64, error) { + runsURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s?limit=1", + t.apiOptions.System.URL, + t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, + t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) + req, err := http.NewRequest("GET", runsURL, nil) + if err != nil { + return 0, err + } + requestFunc := func() *http.Request { return req } + var lastRunID int64 + jobsSuccessFunc := func(status int, response []byte) (bool, error) { + if ok, err := isOK(status); !ok { + return ok, err + } + var resp jobsResponse + if err := json.Unmarshal(response, &resp); err != nil { + return false, err } + if len(resp.Runs) > 0 { + lastRunID = resp.Runs[0].ID + return true, nil + } + return false, nil } - return t.discoverEndpoints(timeout) + _, err = t.deployServiceWait(jobsSuccessFunc, requestFunc, timeout) + return lastRunID, err } -func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { +func (t *cloudTarget) AwaitDeployment(runID int64, timeout time.Duration) (int64, error) { + if runID == LatestDeployment { + lastRunID, err := t.discoverLatestRun(timeout) + if err != nil { + return 0, err + } + runID = lastRunID + } runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d", t.apiOptions.System.URL, t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID) req, err := http.NewRequest("GET", runURL, nil) if err != nil { - return err + return 0, err } lastID := int64(-1) requestFunc := func() *http.Request { @@ -275,13 +301,14 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { req.URL.RawQuery = q.Encode() return req } + success := false jobSuccessFunc := func(status int, response []byte) (bool, error) { if ok, err := isOK(status); !ok { return ok, err } - var resp jobResponse + var resp runResponse if err := json.Unmarshal(response, &resp); err != nil { - return false, nil + return false, err } if t.logOptions.Writer != nil { lastID = t.printLog(resp, lastID) @@ -292,20 +319,27 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { if resp.Status != "success" { return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status) } - return true, nil + success = true + return success, nil } _, err = t.deployServiceWait(jobSuccessFunc, requestFunc, timeout) - return err + if err != nil { + return 0, fmt.Errorf("deployment run %d incomplete after waiting %s: %w", runID, timeout, err) + } + if !success { + return 0, fmt.Errorf("deployment run %d incomplete after waiting %s", runID, timeout) + } + return runID, nil } -func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { +func (t *cloudTarget) printLog(response runResponse, last int64) int64 { if response.LastID == 0 { return last } var msgs []logMessage for step, stepMsgs := range response.Log { for _, msg := range stepMsgs { - if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 { + if (step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level) || LogLevel(msg.Type) == 3 { continue } msgs = append(msgs, msg) @@ -320,14 +354,14 @@ func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { return response.LastID } -func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { +func (t *cloudTarget) discoverEndpoints(timeout time.Duration) (map[string]string, error) { deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s", t.apiOptions.System.URL, t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) req, err := http.NewRequest("GET", deploymentURL, nil) if err != nil { - return err + return nil, err } urlsByCluster := make(map[string]string) endpointFunc := func(status int, response []byte) (bool, error) { @@ -350,11 +384,10 @@ func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { return true, nil } if _, err := t.deployServiceWait(endpointFunc, func() *http.Request { return req }, timeout); err != nil { - return err + return nil, fmt.Errorf("no endpoints found after waiting %s: %w", timeout, err) } if len(urlsByCluster) == 0 { - return fmt.Errorf("no endpoints discovered for %s", t.deploymentOptions.Deployment) + return nil, fmt.Errorf("no endpoints found after waiting %s", timeout) } - t.deploymentOptions.ClusterURLs = urlsByCluster - return nil + return urlsByCluster, nil } |