diff options
author | Martin Polden <mpolden@mpolden.no> | 2022-03-01 14:05:13 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2022-03-01 14:08:55 +0100 |
commit | bbe2ae845c5e9ad82c3eb8212eb67e3221af21d3 (patch) | |
tree | 91fdee8e312ee63aa554429e77560b49f6a27aae | |
parent | c8efa8ed2003e9032adad3b4b3acf912800f7247 (diff) |
Split target implementations into separate files
-rw-r--r-- | client/go/vespa/target.go | 460 | ||||
-rw-r--r-- | client/go/vespa/target_cloud.go | 371 | ||||
-rw-r--r-- | client/go/vespa/target_custom.go | 110 |
3 files changed, 481 insertions, 460 deletions
diff --git a/client/go/vespa/target.go b/client/go/vespa/target.go index 2314cc06f33..9657904b489 100644 --- a/client/go/vespa/target.go +++ b/client/go/vespa/target.go @@ -3,23 +3,15 @@ package vespa import ( - "bytes" "crypto/tls" - "encoding/json" "fmt" "io" "io/ioutil" - "math" "net/http" - "net/url" - "sort" - "strconv" "time" - "github.com/vespa-engine/vespa/client/go/auth0" "github.com/vespa-engine/vespa/client/go/util" "github.com/vespa-engine/vespa/client/go/version" - "github.com/vespa-engine/vespa/client/go/zts" ) const ( @@ -94,26 +86,6 @@ type LogOptions struct { Level int } -// CloudOptions configures URL and authentication for a cloud target. -type APIOptions struct { - System System - TLSOptions TLSOptions - APIKey []byte - AuthConfigPath string -} - -// CloudDeploymentOptions configures the deployment to manage through a cloud target. -type CloudDeploymentOptions struct { - Deployment Deployment - TLSOptions TLSOptions - ClusterURLs map[string]string // Endpoints keyed on cluster name -} - -type customTarget struct { - targetType string - baseURL string -} - // Do sends request to this service. Any required authentication happens automatically. func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Response, error) { if s.TLSOptions.KeyPair.Certificate != nil { @@ -163,438 +135,6 @@ func (s *Service) Description() string { return fmt.Sprintf("No description of service %s", s.Name) } -func (t *customTarget) Type() string { return t.targetType } - -func (t *customTarget) Deployment() Deployment { return Deployment{} } - -func (t *customTarget) Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) { - if timeout > 0 && name != DeployService { - if err := t.waitForConvergence(timeout); err != nil { - return nil, err - } - } - switch name { - case DeployService, QueryService, DocumentService: - url, err := t.urlWithPort(name) - if err != nil { - return nil, err - } - return &Service{BaseURL: url, Name: name}, nil - } - return nil, fmt.Errorf("unknown service: %s", name) -} - -func (t *customTarget) PrintLog(options LogOptions) error { - return fmt.Errorf("reading logs from non-cloud deployment is unsupported") -} - -func (t *customTarget) SignRequest(req *http.Request, sigKeyId string) error { return nil } - -func (t *customTarget) CheckVersion(version version.Version) error { return nil } - -func (t *customTarget) urlWithPort(serviceName string) (string, error) { - u, err := url.Parse(t.baseURL) - if err != nil { - return "", err - } - port := u.Port() - if port == "" { - switch serviceName { - case DeployService: - port = "19071" - case QueryService, DocumentService: - port = "8080" - default: - return "", fmt.Errorf("unknown service: %s", serviceName) - } - u.Host = u.Host + ":" + port - } - return u.String(), nil -} - -func (t *customTarget) waitForConvergence(timeout time.Duration) error { - deployer, err := t.Service(DeployService, 0, 0, "") - if err != nil { - return err - } - url := fmt.Sprintf("%s/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", deployer.BaseURL) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return err - } - converged := false - convergedFunc := func(status int, response []byte) (bool, error) { - if status/100 != 2 { - return false, nil - } - var resp serviceConvergeResponse - if err := json.Unmarshal(response, &resp); err != nil { - return false, nil - } - converged = resp.Converged - return converged, nil - } - if _, err := wait(convergedFunc, func() *http.Request { return req }, nil, timeout); err != nil { - return err - } - if !converged { - return fmt.Errorf("services have not converged") - } - return nil -} - -type cloudTarget struct { - apiOptions APIOptions - deploymentOptions CloudDeploymentOptions - logOptions LogOptions - ztsClient ztsClient -} - -type ztsClient interface { - AccessToken(domain string, certficiate tls.Certificate) (string, error) -} - -func (t *cloudTarget) resolveEndpoint(cluster string) (string, error) { - if cluster == "" { - for _, u := range t.deploymentOptions.ClusterURLs { - if len(t.deploymentOptions.ClusterURLs) == 1 { - return u, nil - } else { - return "", fmt.Errorf("multiple clusters, none chosen: %v", t.deploymentOptions.ClusterURLs) - } - } - } else { - u := t.deploymentOptions.ClusterURLs[cluster] - if u == "" { - clusters := make([]string, len(t.deploymentOptions.ClusterURLs)) - for c := range t.deploymentOptions.ClusterURLs { - clusters = append(clusters, c) - } - return "", fmt.Errorf("unknown cluster '%s': must be one of %v", cluster, clusters) - } - return u, nil - } - - return "", fmt.Errorf("no endpoints") -} - -func (t *cloudTarget) Type() string { - switch t.apiOptions.System.Name { - case MainSystem.Name, CDSystem.Name: - return TargetHosted - } - return TargetCloud -} - -func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment } - -func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) { - if name != DeployService && t.deploymentOptions.ClusterURLs == nil { - if err := t.waitForEndpoints(timeout, runID); err != nil { - return nil, err - } - } - switch name { - case DeployService: - return &Service{Name: name, BaseURL: t.apiOptions.System.URL, TLSOptions: t.apiOptions.TLSOptions, ztsClient: t.ztsClient}, nil - case QueryService, DocumentService: - url, err := t.resolveEndpoint(cluster) - if err != nil { - return nil, err - } - t.deploymentOptions.TLSOptions.AthenzDomain = t.apiOptions.System.AthenzDomain - return &Service{Name: name, BaseURL: url, TLSOptions: t.deploymentOptions.TLSOptions, ztsClient: t.ztsClient}, nil - } - return nil, fmt.Errorf("unknown service: %s", name) -} - -func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error { - if t.apiOptions.System.IsPublic() { - if t.apiOptions.APIKey != nil { - signer := NewRequestSigner(keyID, t.apiOptions.APIKey) - return signer.SignRequest(req) - } else { - return t.addAuth0AccessToken(req) - } - } else { - if t.apiOptions.TLSOptions.KeyPair.Certificate == nil { - return fmt.Errorf("system %s requires a certificate for authentication", t.apiOptions.System.Name) - } - return nil - } -} - -func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { - if clientVersion.IsZero() { // development version is always fine - return nil - } - req, err := http.NewRequest("GET", fmt.Sprintf("%s/cli/v1/", t.apiOptions.System.URL), nil) - if err != nil { - return err - } - response, err := util.HttpDo(req, 10*time.Second, "") - if err != nil { - return err - } - defer response.Body.Close() - var cliResponse struct { - MinVersion string `json:"minVersion"` - } - dec := json.NewDecoder(response.Body) - if err := dec.Decode(&cliResponse); err != nil { - return err - } - minVersion, err := version.Parse(cliResponse.MinVersion) - if err != nil { - return err - } - if clientVersion.Less(minVersion) { - return fmt.Errorf("client version %s is less than the minimum supported version: %s", clientVersion, minVersion) - } - return nil -} - -func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error { - a, err := auth0.GetAuth0(t.apiOptions.AuthConfigPath, t.apiOptions.System.Name, t.apiOptions.System.URL) - if err != nil { - return err - } - system, err := a.PrepareSystem(auth0.ContextWithCancel()) - if err != nil { - return err - } - request.Header.Set("Authorization", "Bearer "+system.AccessToken) - return nil -} - -func (t *cloudTarget) logsURL() string { - return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s/logs", - t.apiOptions.System.URL, - t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, - t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) -} - -func (t *cloudTarget) PrintLog(options LogOptions) error { - req, err := http.NewRequest("GET", t.logsURL(), nil) - if err != nil { - return err - } - lastFrom := options.From - requestFunc := func() *http.Request { - fromMillis := lastFrom.Unix() * 1000 - q := req.URL.Query() - q.Set("from", strconv.FormatInt(fromMillis, 10)) - if !options.To.IsZero() { - toMillis := options.To.Unix() * 1000 - q.Set("to", strconv.FormatInt(toMillis, 10)) - } - req.URL.RawQuery = q.Encode() - t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()) - return req - } - logFunc := func(status int, response []byte) (bool, error) { - if ok, err := isOK(status); !ok { - return ok, err - } - logEntries, err := ReadLogEntries(bytes.NewReader(response)) - if err != nil { - return true, err - } - for _, le := range logEntries { - if !le.Time.After(lastFrom) { - continue - } - if LogLevel(le.Level) > options.Level { - continue - } - fmt.Fprintln(options.Writer, le.Format(options.Dequote)) - } - if len(logEntries) > 0 { - lastFrom = logEntries[len(logEntries)-1].Time - } - return false, nil - } - var timeout time.Duration - if options.Follow { - timeout = math.MaxInt64 // No timeout - } - _, err = wait(logFunc, requestFunc, &t.apiOptions.TLSOptions.KeyPair, timeout) - return err -} - -func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error { - if runID > 0 { - if err := t.waitForRun(runID, timeout); err != nil { - return err - } - } - return t.discoverEndpoints(timeout) -} - -func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { - runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d", - t.apiOptions.System.URL, - t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, - t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID) - req, err := http.NewRequest("GET", runURL, nil) - if err != nil { - return err - } - lastID := int64(-1) - requestFunc := func() *http.Request { - q := req.URL.Query() - q.Set("after", strconv.FormatInt(lastID, 10)) - req.URL.RawQuery = q.Encode() - if err := t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()); err != nil { - panic(err) - } - return req - } - jobSuccessFunc := func(status int, response []byte) (bool, error) { - if ok, err := isOK(status); !ok { - return ok, err - } - var resp jobResponse - if err := json.Unmarshal(response, &resp); err != nil { - return false, nil - } - if t.logOptions.Writer != nil { - lastID = t.printLog(resp, lastID) - } - if resp.Active { - return false, nil - } - if resp.Status != "success" { - return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status) - } - return true, nil - } - _, err = wait(jobSuccessFunc, requestFunc, &t.apiOptions.TLSOptions.KeyPair, timeout) - return err -} - -func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { - if response.LastID == 0 { - return last - } - var msgs []logMessage - for step, stepMsgs := range response.Log { - for _, msg := range stepMsgs { - if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 { - continue - } - msgs = append(msgs, msg) - } - } - sort.Slice(msgs, func(i, j int) bool { return msgs[i].At < msgs[j].At }) - for _, msg := range msgs { - tm := time.Unix(msg.At/1000, (msg.At%1000)*1000) - fmtTime := tm.Format("15:04:05") - fmt.Fprintf(t.logOptions.Writer, "[%s] %-7s %s\n", fmtTime, msg.Type, msg.Message) - } - return response.LastID -} - -func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { - deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s", - t.apiOptions.System.URL, - t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, - t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) - req, err := http.NewRequest("GET", deploymentURL, nil) - if err != nil { - return err - } - if err := t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()); err != nil { - return err - } - urlsByCluster := make(map[string]string) - endpointFunc := func(status int, response []byte) (bool, error) { - if ok, err := isOK(status); !ok { - return ok, err - } - var resp deploymentResponse - if err := json.Unmarshal(response, &resp); err != nil { - return false, nil - } - if len(resp.Endpoints) == 0 { - return false, nil - } - for _, endpoint := range resp.Endpoints { - if endpoint.Scope != "zone" { - continue - } - urlsByCluster[endpoint.Cluster] = endpoint.URL - } - return true, nil - } - if _, err = wait(endpointFunc, func() *http.Request { return req }, &t.apiOptions.TLSOptions.KeyPair, timeout); err != nil { - return err - } - if len(urlsByCluster) == 0 { - return fmt.Errorf("no endpoints discovered") - } - t.deploymentOptions.ClusterURLs = urlsByCluster - return nil -} - -func isOK(status int) (bool, error) { - if status == 401 { - return false, fmt.Errorf("status %d: invalid api key", status) - } - return status/100 == 2, nil -} - -// LocalTarget creates a target for a Vespa platform running locally. -func LocalTarget() Target { - return &customTarget{targetType: TargetLocal, baseURL: "http://127.0.0.1"} -} - -// CustomTarget creates a Target for a Vespa platform running at baseURL. -func CustomTarget(baseURL string) Target { - return &customTarget{targetType: TargetCustom, baseURL: baseURL} -} - -// CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform. -func CloudTarget(apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) { - ztsClient, err := zts.NewClient(zts.DefaultURL, util.ActiveHttpClient) - if err != nil { - return nil, err - } - return &cloudTarget{ - apiOptions: apiOptions, - deploymentOptions: deploymentOptions, - logOptions: logOptions, - ztsClient: ztsClient, - }, nil -} - -type deploymentEndpoint struct { - Cluster string `json:"cluster"` - URL string `json:"url"` - Scope string `json:"scope"` -} - -type deploymentResponse struct { - Endpoints []deploymentEndpoint `json:"endpoints"` -} - -type serviceConvergeResponse struct { - Converged bool `json:"converged"` -} - -type jobResponse struct { - Active bool `json:"active"` - Status string `json:"status"` - Log map[string][]logMessage `json:"log"` - LastID int64 `json:"lastId"` -} - -type logMessage struct { - At int64 `json:"at"` - Type string `json:"type"` - Message string `json:"message"` -} - type responseFunc func(status int, response []byte) (bool, error) type requestFunc func() *http.Request diff --git a/client/go/vespa/target_cloud.go b/client/go/vespa/target_cloud.go new file mode 100644 index 00000000000..e4603c0101e --- /dev/null +++ b/client/go/vespa/target_cloud.go @@ -0,0 +1,371 @@ +package vespa + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "math" + "net/http" + "sort" + "strconv" + "time" + + "github.com/vespa-engine/vespa/client/go/auth0" + "github.com/vespa-engine/vespa/client/go/util" + "github.com/vespa-engine/vespa/client/go/version" + "github.com/vespa-engine/vespa/client/go/zts" +) + +// CloudOptions configures URL and authentication for a cloud target. +type APIOptions struct { + System System + TLSOptions TLSOptions + APIKey []byte + AuthConfigPath string +} + +// CloudDeploymentOptions configures the deployment to manage through a cloud target. +type CloudDeploymentOptions struct { + Deployment Deployment + TLSOptions TLSOptions + ClusterURLs map[string]string // Endpoints keyed on cluster name +} + +type cloudTarget struct { + apiOptions APIOptions + deploymentOptions CloudDeploymentOptions + logOptions LogOptions + ztsClient ztsClient +} + +type deploymentEndpoint struct { + Cluster string `json:"cluster"` + URL string `json:"url"` + Scope string `json:"scope"` +} + +type deploymentResponse struct { + Endpoints []deploymentEndpoint `json:"endpoints"` +} + +type jobResponse struct { + Active bool `json:"active"` + Status string `json:"status"` + Log map[string][]logMessage `json:"log"` + LastID int64 `json:"lastId"` +} + +type logMessage struct { + At int64 `json:"at"` + Type string `json:"type"` + Message string `json:"message"` +} + +type ztsClient interface { + AccessToken(domain string, certficiate tls.Certificate) (string, error) +} + +// CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform. +func CloudTarget(apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) { + ztsClient, err := zts.NewClient(zts.DefaultURL, util.ActiveHttpClient) + if err != nil { + return nil, err + } + return &cloudTarget{ + apiOptions: apiOptions, + deploymentOptions: deploymentOptions, + logOptions: logOptions, + ztsClient: ztsClient, + }, nil +} + +func (t *cloudTarget) resolveEndpoint(cluster string) (string, error) { + if cluster == "" { + for _, u := range t.deploymentOptions.ClusterURLs { + if len(t.deploymentOptions.ClusterURLs) == 1 { + return u, nil + } else { + return "", fmt.Errorf("multiple clusters, none chosen: %v", t.deploymentOptions.ClusterURLs) + } + } + } else { + u := t.deploymentOptions.ClusterURLs[cluster] + if u == "" { + clusters := make([]string, len(t.deploymentOptions.ClusterURLs)) + for c := range t.deploymentOptions.ClusterURLs { + clusters = append(clusters, c) + } + return "", fmt.Errorf("unknown cluster '%s': must be one of %v", cluster, clusters) + } + return u, nil + } + + return "", fmt.Errorf("no endpoints") +} + +func (t *cloudTarget) Type() string { + switch t.apiOptions.System.Name { + case MainSystem.Name, CDSystem.Name: + return TargetHosted + } + return TargetCloud +} + +func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment } + +func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) { + if name != DeployService && t.deploymentOptions.ClusterURLs == nil { + if err := t.waitForEndpoints(timeout, runID); err != nil { + return nil, err + } + } + switch name { + case DeployService: + return &Service{Name: name, BaseURL: t.apiOptions.System.URL, TLSOptions: t.apiOptions.TLSOptions, ztsClient: t.ztsClient}, nil + case QueryService, DocumentService: + url, err := t.resolveEndpoint(cluster) + if err != nil { + return nil, err + } + t.deploymentOptions.TLSOptions.AthenzDomain = t.apiOptions.System.AthenzDomain + return &Service{Name: name, BaseURL: url, TLSOptions: t.deploymentOptions.TLSOptions, ztsClient: t.ztsClient}, nil + } + return nil, fmt.Errorf("unknown service: %s", name) +} + +func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error { + if t.apiOptions.System.IsPublic() { + if t.apiOptions.APIKey != nil { + signer := NewRequestSigner(keyID, t.apiOptions.APIKey) + return signer.SignRequest(req) + } else { + return t.addAuth0AccessToken(req) + } + } else { + if t.apiOptions.TLSOptions.KeyPair.Certificate == nil { + return fmt.Errorf("system %s requires a certificate for authentication", t.apiOptions.System.Name) + } + return nil + } +} + +func (t *cloudTarget) CheckVersion(clientVersion version.Version) error { + if clientVersion.IsZero() { // development version is always fine + return nil + } + req, err := http.NewRequest("GET", fmt.Sprintf("%s/cli/v1/", t.apiOptions.System.URL), nil) + if err != nil { + return err + } + response, err := util.HttpDo(req, 10*time.Second, "") + if err != nil { + return err + } + defer response.Body.Close() + var cliResponse struct { + MinVersion string `json:"minVersion"` + } + dec := json.NewDecoder(response.Body) + if err := dec.Decode(&cliResponse); err != nil { + return err + } + minVersion, err := version.Parse(cliResponse.MinVersion) + if err != nil { + return err + } + if clientVersion.Less(minVersion) { + return fmt.Errorf("client version %s is less than the minimum supported version: %s", clientVersion, minVersion) + } + return nil +} + +func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error { + a, err := auth0.GetAuth0(t.apiOptions.AuthConfigPath, t.apiOptions.System.Name, t.apiOptions.System.URL) + if err != nil { + return err + } + system, err := a.PrepareSystem(auth0.ContextWithCancel()) + if err != nil { + return err + } + request.Header.Set("Authorization", "Bearer "+system.AccessToken) + return nil +} + +func (t *cloudTarget) logsURL() string { + return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s/logs", + t.apiOptions.System.URL, + t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, + t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) +} + +func (t *cloudTarget) PrintLog(options LogOptions) error { + req, err := http.NewRequest("GET", t.logsURL(), nil) + if err != nil { + return err + } + lastFrom := options.From + requestFunc := func() *http.Request { + fromMillis := lastFrom.Unix() * 1000 + q := req.URL.Query() + q.Set("from", strconv.FormatInt(fromMillis, 10)) + if !options.To.IsZero() { + toMillis := options.To.Unix() * 1000 + q.Set("to", strconv.FormatInt(toMillis, 10)) + } + req.URL.RawQuery = q.Encode() + t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()) + return req + } + logFunc := func(status int, response []byte) (bool, error) { + if ok, err := isOK(status); !ok { + return ok, err + } + logEntries, err := ReadLogEntries(bytes.NewReader(response)) + if err != nil { + return true, err + } + for _, le := range logEntries { + if !le.Time.After(lastFrom) { + continue + } + if LogLevel(le.Level) > options.Level { + continue + } + fmt.Fprintln(options.Writer, le.Format(options.Dequote)) + } + if len(logEntries) > 0 { + lastFrom = logEntries[len(logEntries)-1].Time + } + return false, nil + } + var timeout time.Duration + if options.Follow { + timeout = math.MaxInt64 // No timeout + } + _, err = wait(logFunc, requestFunc, &t.apiOptions.TLSOptions.KeyPair, timeout) + return err +} + +func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error { + if runID > 0 { + if err := t.waitForRun(runID, timeout); err != nil { + return err + } + } + return t.discoverEndpoints(timeout) +} + +func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error { + runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d", + t.apiOptions.System.URL, + t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, + t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID) + req, err := http.NewRequest("GET", runURL, nil) + if err != nil { + return err + } + lastID := int64(-1) + requestFunc := func() *http.Request { + q := req.URL.Query() + q.Set("after", strconv.FormatInt(lastID, 10)) + req.URL.RawQuery = q.Encode() + if err := t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()); err != nil { + panic(err) + } + return req + } + jobSuccessFunc := func(status int, response []byte) (bool, error) { + if ok, err := isOK(status); !ok { + return ok, err + } + var resp jobResponse + if err := json.Unmarshal(response, &resp); err != nil { + return false, nil + } + if t.logOptions.Writer != nil { + lastID = t.printLog(resp, lastID) + } + if resp.Active { + return false, nil + } + if resp.Status != "success" { + return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status) + } + return true, nil + } + _, err = wait(jobSuccessFunc, requestFunc, &t.apiOptions.TLSOptions.KeyPair, timeout) + return err +} + +func (t *cloudTarget) printLog(response jobResponse, last int64) int64 { + if response.LastID == 0 { + return last + } + var msgs []logMessage + for step, stepMsgs := range response.Log { + for _, msg := range stepMsgs { + if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 { + continue + } + msgs = append(msgs, msg) + } + } + sort.Slice(msgs, func(i, j int) bool { return msgs[i].At < msgs[j].At }) + for _, msg := range msgs { + tm := time.Unix(msg.At/1000, (msg.At%1000)*1000) + fmtTime := tm.Format("15:04:05") + fmt.Fprintf(t.logOptions.Writer, "[%s] %-7s %s\n", fmtTime, msg.Type, msg.Message) + } + return response.LastID +} + +func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error { + deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s", + t.apiOptions.System.URL, + t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance, + t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region) + req, err := http.NewRequest("GET", deploymentURL, nil) + if err != nil { + return err + } + if err := t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()); err != nil { + return err + } + urlsByCluster := make(map[string]string) + endpointFunc := func(status int, response []byte) (bool, error) { + if ok, err := isOK(status); !ok { + return ok, err + } + var resp deploymentResponse + if err := json.Unmarshal(response, &resp); err != nil { + return false, nil + } + if len(resp.Endpoints) == 0 { + return false, nil + } + for _, endpoint := range resp.Endpoints { + if endpoint.Scope != "zone" { + continue + } + urlsByCluster[endpoint.Cluster] = endpoint.URL + } + return true, nil + } + if _, err = wait(endpointFunc, func() *http.Request { return req }, &t.apiOptions.TLSOptions.KeyPair, timeout); err != nil { + return err + } + if len(urlsByCluster) == 0 { + return fmt.Errorf("no endpoints discovered") + } + t.deploymentOptions.ClusterURLs = urlsByCluster + return nil +} + +func isOK(status int) (bool, error) { + if status == 401 { + return false, fmt.Errorf("status %d: invalid api key", status) + } + return status/100 == 2, nil +} diff --git a/client/go/vespa/target_custom.go b/client/go/vespa/target_custom.go new file mode 100644 index 00000000000..2e366005f29 --- /dev/null +++ b/client/go/vespa/target_custom.go @@ -0,0 +1,110 @@ +package vespa + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/vespa-engine/vespa/client/go/version" +) + +type customTarget struct { + targetType string + baseURL string +} + +type serviceConvergeResponse struct { + Converged bool `json:"converged"` +} + +// LocalTarget creates a target for a Vespa platform running locally. +func LocalTarget() Target { + return &customTarget{targetType: TargetLocal, baseURL: "http://127.0.0.1"} +} + +// CustomTarget creates a Target for a Vespa platform running at baseURL. +func CustomTarget(baseURL string) Target { + return &customTarget{targetType: TargetCustom, baseURL: baseURL} +} + +func (t *customTarget) Type() string { return t.targetType } + +func (t *customTarget) Deployment() Deployment { return Deployment{} } + +func (t *customTarget) Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) { + if timeout > 0 && name != DeployService { + if err := t.waitForConvergence(timeout); err != nil { + return nil, err + } + } + switch name { + case DeployService, QueryService, DocumentService: + url, err := t.urlWithPort(name) + if err != nil { + return nil, err + } + return &Service{BaseURL: url, Name: name}, nil + } + return nil, fmt.Errorf("unknown service: %s", name) +} + +func (t *customTarget) PrintLog(options LogOptions) error { + return fmt.Errorf("reading logs from non-cloud deployment is unsupported") +} + +func (t *customTarget) SignRequest(req *http.Request, sigKeyId string) error { return nil } + +func (t *customTarget) CheckVersion(version version.Version) error { return nil } + +func (t *customTarget) urlWithPort(serviceName string) (string, error) { + u, err := url.Parse(t.baseURL) + if err != nil { + return "", err + } + port := u.Port() + if port == "" { + switch serviceName { + case DeployService: + port = "19071" + case QueryService, DocumentService: + port = "8080" + default: + return "", fmt.Errorf("unknown service: %s", serviceName) + } + u.Host = u.Host + ":" + port + } + return u.String(), nil +} + +func (t *customTarget) waitForConvergence(timeout time.Duration) error { + deployer, err := t.Service(DeployService, 0, 0, "") + if err != nil { + return err + } + url := fmt.Sprintf("%s/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", deployer.BaseURL) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return err + } + converged := false + convergedFunc := func(status int, response []byte) (bool, error) { + if status/100 != 2 { + return false, nil + } + var resp serviceConvergeResponse + if err := json.Unmarshal(response, &resp); err != nil { + return false, nil + } + converged = resp.Converged + return converged, nil + } + if _, err := wait(convergedFunc, func() *http.Request { return req }, nil, timeout); err != nil { + return err + } + if !converged { + return fmt.Errorf("services have not converged") + } + return nil +} |