summaryrefslogtreecommitdiffstats
path: root/client/go/vespa/target.go
diff options
context:
space:
mode:
Diffstat (limited to 'client/go/vespa/target.go')
-rw-r--r--client/go/vespa/target.go472
1 files changed, 10 insertions, 462 deletions
diff --git a/client/go/vespa/target.go b/client/go/vespa/target.go
index 2314cc06f33..8d48f1ffa3a 100644
--- a/client/go/vespa/target.go
+++ b/client/go/vespa/target.go
@@ -3,23 +3,15 @@
package vespa
import (
- "bytes"
"crypto/tls"
- "encoding/json"
"fmt"
"io"
"io/ioutil"
- "math"
"net/http"
- "net/url"
- "sort"
- "strconv"
"time"
- "github.com/vespa-engine/vespa/client/go/auth0"
"github.com/vespa-engine/vespa/client/go/util"
"github.com/vespa-engine/vespa/client/go/version"
- "github.com/vespa-engine/vespa/client/go/zts"
)
const (
@@ -94,26 +86,6 @@ type LogOptions struct {
Level int
}
-// CloudOptions configures URL and authentication for a cloud target.
-type APIOptions struct {
- System System
- TLSOptions TLSOptions
- APIKey []byte
- AuthConfigPath string
-}
-
-// CloudDeploymentOptions configures the deployment to manage through a cloud target.
-type CloudDeploymentOptions struct {
- Deployment Deployment
- TLSOptions TLSOptions
- ClusterURLs map[string]string // Endpoints keyed on cluster name
-}
-
-type customTarget struct {
- targetType string
- baseURL string
-}
-
// Do sends request to this service. Any required authentication happens automatically.
func (s *Service) Do(request *http.Request, timeout time.Duration) (*http.Response, error) {
if s.TLSOptions.KeyPair.Certificate != nil {
@@ -143,12 +115,7 @@ func (s *Service) Wait(timeout time.Duration) (int, error) {
default:
return 0, fmt.Errorf("invalid service: %s", s.Name)
}
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- return 0, err
- }
- okFunc := func(status int, response []byte) (bool, error) { return status/100 == 2, nil }
- return wait(okFunc, func() *http.Request { return req }, &s.TLSOptions.KeyPair, timeout)
+ return waitForOK(url, &s.TLSOptions.KeyPair, timeout)
}
func (s *Service) Description() string {
@@ -163,442 +130,23 @@ func (s *Service) Description() string {
return fmt.Sprintf("No description of service %s", s.Name)
}
-func (t *customTarget) Type() string { return t.targetType }
-
-func (t *customTarget) Deployment() Deployment { return Deployment{} }
-
-func (t *customTarget) Service(name string, timeout time.Duration, sessionOrRunID int64, cluster string) (*Service, error) {
- if timeout > 0 && name != DeployService {
- if err := t.waitForConvergence(timeout); err != nil {
- return nil, err
- }
- }
- switch name {
- case DeployService, QueryService, DocumentService:
- url, err := t.urlWithPort(name)
- if err != nil {
- return nil, err
- }
- return &Service{BaseURL: url, Name: name}, nil
- }
- return nil, fmt.Errorf("unknown service: %s", name)
-}
-
-func (t *customTarget) PrintLog(options LogOptions) error {
- return fmt.Errorf("reading logs from non-cloud deployment is unsupported")
-}
-
-func (t *customTarget) SignRequest(req *http.Request, sigKeyId string) error { return nil }
+func isOK(status int) bool { return status/100 == 2 }
-func (t *customTarget) CheckVersion(version version.Version) error { return nil }
+type responseFunc func(status int, response []byte) (bool, error)
-func (t *customTarget) urlWithPort(serviceName string) (string, error) {
- u, err := url.Parse(t.baseURL)
- if err != nil {
- return "", err
- }
- port := u.Port()
- if port == "" {
- switch serviceName {
- case DeployService:
- port = "19071"
- case QueryService, DocumentService:
- port = "8080"
- default:
- return "", fmt.Errorf("unknown service: %s", serviceName)
- }
- u.Host = u.Host + ":" + port
- }
- return u.String(), nil
-}
+type requestFunc func() *http.Request
-func (t *customTarget) waitForConvergence(timeout time.Duration) error {
- deployer, err := t.Service(DeployService, 0, 0, "")
- if err != nil {
- return err
- }
- url := fmt.Sprintf("%s/application/v2/tenant/default/application/default/environment/prod/region/default/instance/default/serviceconverge", deployer.BaseURL)
+// waitForOK queries url and returns its status code. If the url returns a non-200 status code, it is repeatedly queried
+// until timeout elapses.
+func waitForOK(url string, certificate *tls.Certificate, timeout time.Duration) (int, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
- return err
- }
- converged := false
- convergedFunc := func(status int, response []byte) (bool, error) {
- if status/100 != 2 {
- return false, nil
- }
- var resp serviceConvergeResponse
- if err := json.Unmarshal(response, &resp); err != nil {
- return false, nil
- }
- converged = resp.Converged
- return converged, nil
- }
- if _, err := wait(convergedFunc, func() *http.Request { return req }, nil, timeout); err != nil {
- return err
- }
- if !converged {
- return fmt.Errorf("services have not converged")
- }
- return nil
-}
-
-type cloudTarget struct {
- apiOptions APIOptions
- deploymentOptions CloudDeploymentOptions
- logOptions LogOptions
- ztsClient ztsClient
-}
-
-type ztsClient interface {
- AccessToken(domain string, certficiate tls.Certificate) (string, error)
-}
-
-func (t *cloudTarget) resolveEndpoint(cluster string) (string, error) {
- if cluster == "" {
- for _, u := range t.deploymentOptions.ClusterURLs {
- if len(t.deploymentOptions.ClusterURLs) == 1 {
- return u, nil
- } else {
- return "", fmt.Errorf("multiple clusters, none chosen: %v", t.deploymentOptions.ClusterURLs)
- }
- }
- } else {
- u := t.deploymentOptions.ClusterURLs[cluster]
- if u == "" {
- clusters := make([]string, len(t.deploymentOptions.ClusterURLs))
- for c := range t.deploymentOptions.ClusterURLs {
- clusters = append(clusters, c)
- }
- return "", fmt.Errorf("unknown cluster '%s': must be one of %v", cluster, clusters)
- }
- return u, nil
- }
-
- return "", fmt.Errorf("no endpoints")
-}
-
-func (t *cloudTarget) Type() string {
- switch t.apiOptions.System.Name {
- case MainSystem.Name, CDSystem.Name:
- return TargetHosted
- }
- return TargetCloud
-}
-
-func (t *cloudTarget) Deployment() Deployment { return t.deploymentOptions.Deployment }
-
-func (t *cloudTarget) Service(name string, timeout time.Duration, runID int64, cluster string) (*Service, error) {
- if name != DeployService && t.deploymentOptions.ClusterURLs == nil {
- if err := t.waitForEndpoints(timeout, runID); err != nil {
- return nil, err
- }
- }
- switch name {
- case DeployService:
- return &Service{Name: name, BaseURL: t.apiOptions.System.URL, TLSOptions: t.apiOptions.TLSOptions, ztsClient: t.ztsClient}, nil
- case QueryService, DocumentService:
- url, err := t.resolveEndpoint(cluster)
- if err != nil {
- return nil, err
- }
- t.deploymentOptions.TLSOptions.AthenzDomain = t.apiOptions.System.AthenzDomain
- return &Service{Name: name, BaseURL: url, TLSOptions: t.deploymentOptions.TLSOptions, ztsClient: t.ztsClient}, nil
- }
- return nil, fmt.Errorf("unknown service: %s", name)
-}
-
-func (t *cloudTarget) SignRequest(req *http.Request, keyID string) error {
- if t.apiOptions.System.IsPublic() {
- if t.apiOptions.APIKey != nil {
- signer := NewRequestSigner(keyID, t.apiOptions.APIKey)
- return signer.SignRequest(req)
- } else {
- return t.addAuth0AccessToken(req)
- }
- } else {
- if t.apiOptions.TLSOptions.KeyPair.Certificate == nil {
- return fmt.Errorf("system %s requires a certificate for authentication", t.apiOptions.System.Name)
- }
- return nil
- }
-}
-
-func (t *cloudTarget) CheckVersion(clientVersion version.Version) error {
- if clientVersion.IsZero() { // development version is always fine
- return nil
- }
- req, err := http.NewRequest("GET", fmt.Sprintf("%s/cli/v1/", t.apiOptions.System.URL), nil)
- if err != nil {
- return err
- }
- response, err := util.HttpDo(req, 10*time.Second, "")
- if err != nil {
- return err
- }
- defer response.Body.Close()
- var cliResponse struct {
- MinVersion string `json:"minVersion"`
- }
- dec := json.NewDecoder(response.Body)
- if err := dec.Decode(&cliResponse); err != nil {
- return err
- }
- minVersion, err := version.Parse(cliResponse.MinVersion)
- if err != nil {
- return err
- }
- if clientVersion.Less(minVersion) {
- return fmt.Errorf("client version %s is less than the minimum supported version: %s", clientVersion, minVersion)
- }
- return nil
-}
-
-func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error {
- a, err := auth0.GetAuth0(t.apiOptions.AuthConfigPath, t.apiOptions.System.Name, t.apiOptions.System.URL)
- if err != nil {
- return err
- }
- system, err := a.PrepareSystem(auth0.ContextWithCancel())
- if err != nil {
- return err
- }
- request.Header.Set("Authorization", "Bearer "+system.AccessToken)
- return nil
-}
-
-func (t *cloudTarget) logsURL() string {
- return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s/logs",
- t.apiOptions.System.URL,
- t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
- t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region)
-}
-
-func (t *cloudTarget) PrintLog(options LogOptions) error {
- req, err := http.NewRequest("GET", t.logsURL(), nil)
- if err != nil {
- return err
- }
- lastFrom := options.From
- requestFunc := func() *http.Request {
- fromMillis := lastFrom.Unix() * 1000
- q := req.URL.Query()
- q.Set("from", strconv.FormatInt(fromMillis, 10))
- if !options.To.IsZero() {
- toMillis := options.To.Unix() * 1000
- q.Set("to", strconv.FormatInt(toMillis, 10))
- }
- req.URL.RawQuery = q.Encode()
- t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm())
- return req
- }
- logFunc := func(status int, response []byte) (bool, error) {
- if ok, err := isOK(status); !ok {
- return ok, err
- }
- logEntries, err := ReadLogEntries(bytes.NewReader(response))
- if err != nil {
- return true, err
- }
- for _, le := range logEntries {
- if !le.Time.After(lastFrom) {
- continue
- }
- if LogLevel(le.Level) > options.Level {
- continue
- }
- fmt.Fprintln(options.Writer, le.Format(options.Dequote))
- }
- if len(logEntries) > 0 {
- lastFrom = logEntries[len(logEntries)-1].Time
- }
- return false, nil
- }
- var timeout time.Duration
- if options.Follow {
- timeout = math.MaxInt64 // No timeout
- }
- _, err = wait(logFunc, requestFunc, &t.apiOptions.TLSOptions.KeyPair, timeout)
- return err
-}
-
-func (t *cloudTarget) waitForEndpoints(timeout time.Duration, runID int64) error {
- if runID > 0 {
- if err := t.waitForRun(runID, timeout); err != nil {
- return err
- }
- }
- return t.discoverEndpoints(timeout)
-}
-
-func (t *cloudTarget) waitForRun(runID int64, timeout time.Duration) error {
- runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s-%s/run/%d",
- t.apiOptions.System.URL,
- t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
- t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region, runID)
- req, err := http.NewRequest("GET", runURL, nil)
- if err != nil {
- return err
- }
- lastID := int64(-1)
- requestFunc := func() *http.Request {
- q := req.URL.Query()
- q.Set("after", strconv.FormatInt(lastID, 10))
- req.URL.RawQuery = q.Encode()
- if err := t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()); err != nil {
- panic(err)
- }
- return req
- }
- jobSuccessFunc := func(status int, response []byte) (bool, error) {
- if ok, err := isOK(status); !ok {
- return ok, err
- }
- var resp jobResponse
- if err := json.Unmarshal(response, &resp); err != nil {
- return false, nil
- }
- if t.logOptions.Writer != nil {
- lastID = t.printLog(resp, lastID)
- }
- if resp.Active {
- return false, nil
- }
- if resp.Status != "success" {
- return false, fmt.Errorf("run %d ended with unsuccessful status: %s", runID, resp.Status)
- }
- return true, nil
- }
- _, err = wait(jobSuccessFunc, requestFunc, &t.apiOptions.TLSOptions.KeyPair, timeout)
- return err
-}
-
-func (t *cloudTarget) printLog(response jobResponse, last int64) int64 {
- if response.LastID == 0 {
- return last
- }
- var msgs []logMessage
- for step, stepMsgs := range response.Log {
- for _, msg := range stepMsgs {
- if step == "copyVespaLogs" && LogLevel(msg.Type) > t.logOptions.Level || LogLevel(msg.Type) == 3 {
- continue
- }
- msgs = append(msgs, msg)
- }
- }
- sort.Slice(msgs, func(i, j int) bool { return msgs[i].At < msgs[j].At })
- for _, msg := range msgs {
- tm := time.Unix(msg.At/1000, (msg.At%1000)*1000)
- fmtTime := tm.Format("15:04:05")
- fmt.Fprintf(t.logOptions.Writer, "[%s] %-7s %s\n", fmtTime, msg.Type, msg.Message)
- }
- return response.LastID
-}
-
-func (t *cloudTarget) discoverEndpoints(timeout time.Duration) error {
- deploymentURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s",
- t.apiOptions.System.URL,
- t.deploymentOptions.Deployment.Application.Tenant, t.deploymentOptions.Deployment.Application.Application, t.deploymentOptions.Deployment.Application.Instance,
- t.deploymentOptions.Deployment.Zone.Environment, t.deploymentOptions.Deployment.Zone.Region)
- req, err := http.NewRequest("GET", deploymentURL, nil)
- if err != nil {
- return err
- }
- if err := t.SignRequest(req, t.deploymentOptions.Deployment.Application.SerializedForm()); err != nil {
- return err
- }
- urlsByCluster := make(map[string]string)
- endpointFunc := func(status int, response []byte) (bool, error) {
- if ok, err := isOK(status); !ok {
- return ok, err
- }
- var resp deploymentResponse
- if err := json.Unmarshal(response, &resp); err != nil {
- return false, nil
- }
- if len(resp.Endpoints) == 0 {
- return false, nil
- }
- for _, endpoint := range resp.Endpoints {
- if endpoint.Scope != "zone" {
- continue
- }
- urlsByCluster[endpoint.Cluster] = endpoint.URL
- }
- return true, nil
- }
- if _, err = wait(endpointFunc, func() *http.Request { return req }, &t.apiOptions.TLSOptions.KeyPair, timeout); err != nil {
- return err
- }
- if len(urlsByCluster) == 0 {
- return fmt.Errorf("no endpoints discovered")
- }
- t.deploymentOptions.ClusterURLs = urlsByCluster
- return nil
-}
-
-func isOK(status int) (bool, error) {
- if status == 401 {
- return false, fmt.Errorf("status %d: invalid api key", status)
- }
- return status/100 == 2, nil
-}
-
-// LocalTarget creates a target for a Vespa platform running locally.
-func LocalTarget() Target {
- return &customTarget{targetType: TargetLocal, baseURL: "http://127.0.0.1"}
-}
-
-// CustomTarget creates a Target for a Vespa platform running at baseURL.
-func CustomTarget(baseURL string) Target {
- return &customTarget{targetType: TargetCustom, baseURL: baseURL}
-}
-
-// CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform.
-func CloudTarget(apiOptions APIOptions, deploymentOptions CloudDeploymentOptions, logOptions LogOptions) (Target, error) {
- ztsClient, err := zts.NewClient(zts.DefaultURL, util.ActiveHttpClient)
- if err != nil {
- return nil, err
+ return 0, err
}
- return &cloudTarget{
- apiOptions: apiOptions,
- deploymentOptions: deploymentOptions,
- logOptions: logOptions,
- ztsClient: ztsClient,
- }, nil
-}
-
-type deploymentEndpoint struct {
- Cluster string `json:"cluster"`
- URL string `json:"url"`
- Scope string `json:"scope"`
-}
-
-type deploymentResponse struct {
- Endpoints []deploymentEndpoint `json:"endpoints"`
+ okFunc := func(status int, response []byte) (bool, error) { return isOK(status), nil }
+ return wait(okFunc, func() *http.Request { return req }, certificate, timeout)
}
-type serviceConvergeResponse struct {
- Converged bool `json:"converged"`
-}
-
-type jobResponse struct {
- Active bool `json:"active"`
- Status string `json:"status"`
- Log map[string][]logMessage `json:"log"`
- LastID int64 `json:"lastId"`
-}
-
-type logMessage struct {
- At int64 `json:"at"`
- Type string `json:"type"`
- Message string `json:"message"`
-}
-
-type responseFunc func(status int, response []byte) (bool, error)
-
-type requestFunc func() *http.Request
-
func wait(fn responseFunc, reqFn requestFunc, certificate *tls.Certificate, timeout time.Duration) (int, error) {
if certificate != nil {
util.ActiveHttpClient.UseCertificate([]tls.Certificate{*certificate})