about summary refs log tree commit diff stats
path: root/client/go/internal/vespa/target_cloud.go
diff options
context:
space:
mode:
author Martin Polden <mpolden@mpolden.no> 2023-08-17 14:55:28 +0200
committer Martin Polden <mpolden@mpolden.no> 2023-08-21 14:26:36 +0200
commit b1863768b512a7200496f8646fe239d3786d4443 (patch)
tree dd21c88008f42506560b0e034769207d751f4cc3 /client/go/internal/vespa/target_cloud.go
parent 873350caf5e984b5a580e2e0585dfd521eb493c0 (diff)
Support cluster discovery for all target types
Diffstat (limited to 'client/go/internal/vespa/target_cloud.go')
-rw-r--r-- client/go/internal/vespa/target_cloud.go 205
1 files changed, 119 insertions, 86 deletions
diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go
index c0169f1a9bd..9b53180b7bb 100644
--- a/client/go/internal/vespa/target_cloud.go
+++ b/client/go/internal/vespa/target_cloud.go
@@ -3,12 +3,12 @@ package vespa
import (
"bytes"
"encoding/json"
+ "errors"
"fmt"
"math"
"net/http"
"sort"
"strconv"
- "strings"
"time"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -37,6 +37,7 @@ type cloudTarget struct {
httpClient util.HTTPClient
apiAuth Authenticator
deploymentAuth Authenticator
+ retryInterval time.Duration
}
type deploymentEndpoint struct {
@@ -49,13 +50,21 @@ type deploymentResponse struct {
Endpoints []deploymentEndpoint `json:"endpoints"`
}
-type jobResponse struct {
+type runResponse struct {
Active bool `json:"active"`
Status string `json:"status"`
Log map[string][]logMessage `json:"log"`
LastID int64 `json:"lastId"`
}
+type jobResponse struct {
+ ID int64 `json:"id"`
+}
+
+type jobsResponse struct {
+ Runs []jobResponse `json:"runs"`
+}
+
type logMessage struct {
At int64 `json:"at"`
Type string `json:"type"`
@@ -71,40 +80,10 @@ func CloudTarget(httpClient util.HTTPClient, apiAuth Authenticator, deploymentAu
logOptions: logOptions,
apiAuth: apiAuth,
deploymentAuth: deploymentAuth,
+ retryInterval: defaultRetryInterval,
}, nil
}
-func (t *cloudTarget) findClusterURL(cluster string, timeout time.Duration, runID int64) (string, error) {
- if t.deploymentOptions.CustomURL != "" {
- return t.deploymentOptions.CustomURL, nil
- }
- if t.deploymentOptions.ClusterURLs == nil {
- if err := t.waitForEndpoints(timeout, runID); err != nil {
- return "", err
- }
- }
- clusters := make([]string, 0, len(t.deploymentOptions.ClusterURLs))
- for c := range t.deploymentOptions.ClusterURLs {
- clusters = append(clusters, c)
- }
- if cluster == "" {
- for _, url := range t.deploymentOptions.ClusterURLs {
- if len(t.deploymentOptions.ClusterURLs) == 1 {
- return url, nil
- } else {
- return "", fmt.Errorf("no cluster specified: found multiple clusters '%s'", strings.Join(clusters, "', '"))
- }
- }
- } else {
- url, ok := t.deploymentOptions.ClusterURLs[cluster]
- if !ok {
- return "", fmt.Errorf("invalid cluster '%s': must be one of '%s'", cluster, strings.Join(clusters, "', '"))
- }
- return url, nil
- }
- return "", fmt.Errorf("no endpoints found")
-}
-
func (t *cloudTarget) Type() string {
switch t.apiOptions.System.Name {
case MainSystem.Name, CDSystem.Name:
@@ -117,41 +96,58 @@ func (t *cloudTarget) IsCloud() bool { return true }
func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment }
-func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) {
- switch name {
- case DeployService:
+func (t *cloudTarget) DeployService(timeout time.Duration) (*Service, error) {
+ service := &Service{
+ BaseURL: t.apiOptions.System.URL,
+ TLSOptions: t.apiOptions.TLSOptions,
+ deployAPI: true,
+ httpClient: t.httpClient,
+ auth: t.apiAuth,
+ retryInterval: t.retryInterval,
+ }
+ if timeout > 0 {
+ if err := service.Wait(timeout); err != nil {
+ return nil, err
+ }
+ }
+ return service, nil
+}
+
+func (t *cloudTarget) ContainerServices(timeout time.Duration) ([]*Service, error) {
+ var clusterUrls map[string]string
+ if t.deploymentOptions.CustomURL != "" {
+ // Custom URL is always preferred
+ clusterUrls = map[string]string{"": t.deploymentOptions.CustomURL}
+ } else if t.deploymentOptions.ClusterURLs != nil {
+ // ... then endpoints specified through environment
+ clusterUrls = t.deploymentOptions.ClusterURLs
+ } else {
+ // ... then discovered endpoints
+ endpoints, err := t.discoverEndpoints(timeout)
+ if err != nil {
+ return nil, err
+ }
+ clusterUrls = endpoints
+ }
+ services := make([]*Service, 0, len(clusterUrls))
+ for name, url := range clusterUrls {
service := &Service{
- Name: name,
- BaseURL: t.apiOptions.System.URL,
- TLSOptions: t.apiOptions.TLSOptions,
- httpClient: t.httpClient,
- auth: t.apiAuth,
+ Name: name,
+ BaseURL: url,
+ TLSOptions: t.deploymentOptions.TLSOptions,
+ httpClient: t.httpClient,
+ auth: t.deploymentAuth,
+ retryInterval: t.retryInterval,
}
if timeout > 0 {
- status, err := service.Wait(timeout)
- if err != nil {
+ if err := service.Wait(timeout); err != nil {
return nil, err
}
- if ok, _ := isOK(status); !ok {
- return nil, fmt.Errorf("got status %d from deploy service at %s", status, service.BaseURL)
- }
}
- return service, nil
- case QueryService, DocumentService:
- url, err := t.findClusterURL(cluster, timeout, runID)
- if err != nil {
- return nil, err
- }
- return &Service{
- Name: name,
- BaseURL: url,
- TLSOptions: t.deploymentOptions.TLSOptions,
- httpClient: t.httpClient,
- auth: t.deploymentAuth,
- }, nil
- default:
- return nil, fmt.Errorf("unknown service: %s", name)
+ services = append(services, service)
}
+ sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
+ return services, nil
}
func (t *cloudTarget) CheckVersion(clientVersion version.Version) error {
@@ -162,7 +158,7 @@ func (t *cloudTarget) CheckVersion(clientVersion version.Version) error {
if err != nil {
return err
}
- deployService, err := t.Service(DeployService, 0, 0, "")
+ deployService, err := t.DeployService(0)
if err != nil {
return err
}
@@ -218,7 +214,7 @@ func (t *cloudTarget) PrintLog(options LogOptions) error {
}
logEntries, err := ReadLogEntries(bytes.NewReader(response))
if err != nil {
- return true, err
+ return false, err
}
for _, le := range logEntries {
if !le.Time.After(lastFrom) {
@@ -238,35 +234,65 @@ func (t *cloudTarget) PrintLog(options LogOptions) error {
if options.Follow {
timeout = math.MaxInt64 // No timeout
}
- _, err = t.deployServiceWait(logFunc, requestFunc, timeout)
- return err
+ // Ignore wait error because logFunc has no concept of completion, we just want to print log entries until timeout is reached
+ if _, err := t.deployServiceWait(logFunc, requestFunc, timeout); err != nil && !errors.Is(err, errWaitTimeout) {
+ return fmt.Errorf("failed to read logs: %s", err)
+ }
+ return nil
}
func (t *cloudTarget) deployServiceWait(fn responseFunc, reqFn requestFunc, timeout time.Duration) (int, error) {
- deployService, err := t.Service(DeployService, 0, 0, "")
+ deployService, err := t.DeployService(0)
if err != nil {
return 0, err
}
- return wait(deployService, fn, reqFn, timeout)
+ return wait(deployService, fn, reqFn, timeout, t.retryInterval)
}
-func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error {
- if runID > 0 {
- if err := t.waitForRun(runID, timeout); err != nil {
- return err
+func (t *cloudTarget) discoverLatestRun(timeout time.Duration) (int64, error) {
+ runsURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s?limit=1",
+ t.apiOptions.System.URL,
+ t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
+ t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region)
+ req, err := http.NewRequest("GET", runsURL, nil)
+ if err != nil {
+ return 0, err
+ }
+ requestFunc := func() *http.Request { return req }
+ var lastRunID int64
+ jobsSuccessFunc := func(status int, response []byte) (bool, error) {
+ if ok, err := isOK(status); !ok {
+ return ok, err
+ }
+ var resp jobsResponse
+ if err := json.Unmarshal(response, &resp); err != nil {
+ return false, err
}
+ if len(resp.Runs) > 0 {
+ lastRunID = resp.Runs[0].ID
+ return true, nil
+ }
+ return false, nil
}
- return t.discoverEndpoints(timeout)
+ _, err = t.deployServiceWait(jobsSuccessFunc, requestFunc, timeout)
+ return lastRunID, err
}
-func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
+func (t *cloudTarget) AwaitDeployment(runID int64, timeout time.Duration) (int64, error) {
+ if runID == LatestDeployment {
+ lastRunID, err := t.discoverLatestRun(timeout)
+ if err != nil {
+ return 0, err
+ }
+ runID = lastRunID
+ }
runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d",
t.apiOptions.System.URL,
t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID)
req, err := http.NewRequest("GET", runURL, nil)
if err != nil {
- return err
+ return 0, err
}
lastID := int64(-1)
requestFunc := func() *http.Request {
@@ -275,13 +301,14 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
req.URL.RawQuery = q.Encode()
return req
}
+ success := false
jobSuccessFunc := func(status int, response []byte) (bool, error) {
if ok, err := isOK(status); !ok {
return ok, err
}
- var resp jobResponse
+ var resp runResponse
if err := json.Unmarshal(response, &resp); err != nil {
- return false, nil
+ return false, err
}
if t.logOptions.Writer != nil {
lastID = t.printLog(resp, lastID)
@@ -292,20 +319,27 @@ func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
if resp.Status != "success" {
return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status)
}
- return true, nil
+ success = true
+ return success, nil
}
_, err = t.deployServiceWait(jobSuccessFunc, requestFunc, timeout)
- return err
+ if err != nil {
+ return 0, fmt.Errorf("deployment run %d incomplete after waiting %s: %w", runID, timeout, err)
+ }
+ if !success {
+ return 0, fmt.Errorf("deployment run %d incomplete after waiting %s", runID, timeout)
+ }
+ return runID, nil
}
-func (t *cloudTarget) printLog(response jobResponse, last int64) int64 {
+func (t *cloudTarget) printLog(response runResponse, last int64) int64 {
if response.LastID == 0 {
return last
}
var msgs []logMessage
for step, stepMsgs := range response.Log {
for _, msg := range stepMsgs {
- if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 {
+ if (step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level) || LogLevel(msg.Type) == 3 {
continue
}
msgs = append(msgs, msg)
@@ -320,14 +354,14 @@ func (t *cloudTarget) printLog(response jobResponse, last int64) int64 {
return response.LastID
}
-func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error {
+func (t *cloudTarget) discoverEndpoints(timeout time.Duration) (map[string]string, error) {
deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s",
t.apiOptions.System.URL,
t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region)
req, err := http.NewRequest("GET", deploymentURL, nil)
if err != nil {
- return err
+ return nil, err
}
urlsByCluster := make(map[string]string)
endpointFunc := func(status int, response []byte) (bool, error) {
@@ -350,11 +384,10 @@ func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error {
return true, nil
}
if _, err := t.deployServiceWait(endpointFunc, func() *http.Request { return req }, timeout); err != nil {
- return err
+ return nil, fmt.Errorf("no endpoints found after waiting %s: %w", timeout, err)
}
if len(urlsByCluster) == 0 {
- return fmt.Errorf("no endpoints discovered for %s", t.deploymentOptions.Deployment)
+ return nil, fmt.Errorf("no endpoints found after waiting %s", timeout)
}
- t.deploymentOptions.ClusterURLs = urlsByCluster
- return nil
+ return urlsByCluster, nil
}