summaryrefslogtreecommitdiffstats
path: root/client
diff options
context:
space:
mode:
authorArne H Juul <arnej27959@users.noreply.github.com>2023-02-28 14:18:39 +0100
committerGitHub <noreply@github.com>2023-02-28 14:18:39 +0100
commit05a7e7a75feae44c6ca9ed27d4d3570873603702 (patch)
treea73d9fb06232da3b442b5fb58bb27782816c2a77 /client
parentcf11ca4ef88b8d05841b7217695ee6612e9d8899 (diff)
parent542e566de9b06299034fa73bec0a2572bf604c5b (diff)
Merge pull request #26086 from vespa-engine/arnej/add-visit-in-vespa-cli
Arnej/add visit in vespa cli
Diffstat (limited to 'client')
-rw-r--r--client/go/internal/cli/cmd/root.go5
-rw-r--r--client/go/internal/cli/cmd/visit.go348
-rw-r--r--client/go/internal/cli/cmd/visit_test.go142
3 files changed, 495 insertions, 0 deletions
diff --git a/client/go/internal/cli/cmd/root.go b/client/go/internal/cli/cmd/root.go
index faba6bbbfd4..70e0afbcd32 100644
--- a/client/go/internal/cli/cmd/root.go
+++ b/client/go/internal/cli/cmd/root.go
@@ -250,6 +250,7 @@ func (c *CLI) configureCommands() {
rootCmd.AddCommand(statusCmd) // status
rootCmd.AddCommand(newTestCmd(c)) // test
rootCmd.AddCommand(newVersionCmd(c)) // version
+ rootCmd.AddCommand(newVisitCmd(c)) // visit
}
func (c *CLI) printErr(err error, hints ...string) {
@@ -263,6 +264,10 @@ func (c *CLI) printSuccess(msg ...interface{}) {
fmt.Fprintln(c.Stdout, color.GreenString("Success:"), fmt.Sprint(msg...))
}
+func (c *CLI) printDebug(msg ...interface{}) {
+ fmt.Fprintln(c.Stderr, color.CyanString("Debug:"), fmt.Sprint(msg...))
+}
+
func (c *CLI) printWarning(msg interface{}, hints ...string) {
fmt.Fprintln(c.Stderr, color.YellowString("Warning:"), msg)
for _, hint := range hints {
diff --git a/client/go/internal/cli/cmd/visit.go b/client/go/internal/cli/cmd/visit.go
new file mode 100644
index 00000000000..1022d74354d
--- /dev/null
+++ b/client/go/internal/cli/cmd/visit.go
@@ -0,0 +1,348 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// vespa visit command
+// Author: arnej
+
+package cmd
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/spf13/cobra"
+ "github.com/vespa-engine/vespa/client/go/internal/util"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa"
+)
+
+type visitArgs struct {
+ contentCluster string
+ fieldSet string
+ selection string
+ makeFeed bool
+ jsonLines bool
+ pretty bool
+ debugMode bool
+ chunkCount int
+ cli *CLI
+}
+
+func (v *visitArgs) writeBytes(b []byte) {
+ v.cli.Stdout.Write(b)
+}
+
+func (v *visitArgs) writeString(s string) {
+ v.writeBytes([]byte(s))
+}
+
+func (v *visitArgs) debugPrint(s string) {
+ if v.debugMode {
+ v.cli.printDebug(s)
+ }
+}
+
+func (v *visitArgs) dumpDocuments(documents []DocumentBlob) {
+ comma := false
+ pretty := false
+ if v.makeFeed {
+ comma = true
+ pretty = v.pretty
+ } else if !v.jsonLines {
+ return
+ }
+ for _, value := range documents {
+ if pretty {
+ var prettyJSON bytes.Buffer
+ parseError := json.Indent(&prettyJSON, value.blob, "", " ")
+ if parseError != nil {
+ v.writeBytes(value.blob)
+ } else {
+ v.writeBytes(prettyJSON.Bytes())
+ }
+ } else {
+ v.writeBytes(value.blob)
+ }
+ if comma {
+ v.writeString(",\n")
+ } else {
+ v.writeString("\n")
+ }
+ }
+}
+
+var totalDocCount int
+
+func newVisitCmd(cli *CLI) *cobra.Command {
+ var (
+ vArgs visitArgs
+ )
+ cmd := &cobra.Command{
+ Use: "visit",
+ Short: "Visit and print all documents in a vespa cluster",
+ Long: `Run visiting to retrieve all documents.
+
+By default prints each document received on its own line (JSON-L format).
+`,
+ Example: `$ vespa visit # get documents from any cluster
+$ vespa visit --content-cluster search # get documents from cluster named "search"
+`,
+ Args: cobra.MaximumNArgs(0),
+ DisableAutoGenTag: true,
+ SilenceUsage: true,
+ RunE: func(cmd *cobra.Command, args []string) error {
+ vArgs.cli = cli
+ service, err := documentService(cli)
+ if err != nil {
+ return err
+ }
+ result := probeHandler(service, cli)
+ if result.Success {
+ result = visitClusters(&vArgs, service)
+ }
+ if !result.Success {
+ return fmt.Errorf("visit failed: %s", result.Message)
+ }
+ vArgs.debugPrint(fmt.Sprintf("sum of 'documentCount': %d", totalDocCount))
+ return nil
+ },
+ }
+ cmd.Flags().StringVar(&vArgs.contentCluster, "content-cluster", "*", `Which content cluster to visit documents from`)
+ cmd.Flags().StringVar(&vArgs.fieldSet, "field-set", "", `Which fieldset to ask for`)
+ cmd.Flags().StringVar(&vArgs.selection, "selection", "", `select subset of cluster`)
+ cmd.Flags().BoolVar(&vArgs.debugMode, "debug-mode", false, `print debugging output`)
+ cmd.Flags().BoolVar(&vArgs.jsonLines, "json-lines", true, `output documents as JSON lines`)
+ cmd.Flags().BoolVar(&vArgs.makeFeed, "make-feed", false, `output JSON array suitable for vespa-feeder`)
+ cmd.Flags().BoolVar(&vArgs.pretty, "pretty-json", false, `format pretty JSON`)
+ cmd.Flags().IntVar(&vArgs.chunkCount, "chunk-count", 1000, `chunk by count`)
+ return cmd
+}
+
+type HandlersInfo struct {
+ Handlers []struct {
+ HandlerId string `json:"id"`
+ HandlerClass string `json:"class"`
+ HandlerBundle string `json:"bundle"`
+ ServerBindings []string `json:"serverBindings"`
+ } `json:"handlers"`
+}
+
+func parseHandlersOutput(r io.Reader) (*HandlersInfo, error) {
+ var handlersInfo HandlersInfo
+ codec := json.NewDecoder(r)
+ err := codec.Decode(&handlersInfo)
+ return &handlersInfo, err
+}
+
+func probeHandler(service *vespa.Service, cli *CLI) (res util.OperationResult) {
+ urlPath := service.BaseURL + "/"
+ url, urlParseError := url.Parse(urlPath)
+ if urlParseError != nil {
+ return util.Failure("Invalid request path: '" + urlPath + "': " + urlParseError.Error())
+ }
+ request := &http.Request{
+ URL: url,
+ Method: "GET",
+ }
+ timeout := time.Duration(90) * time.Second
+ response, err := service.Do(request, timeout)
+ if err != nil {
+ return util.Failure("Request failed: " + err.Error())
+ }
+ defer response.Body.Close()
+ if response.StatusCode == 200 {
+ handlersInfo, err := parseHandlersOutput(response.Body)
+ if err != nil || len(handlersInfo.Handlers) == 0 {
+ cli.printWarning("Could not parse JSON response from"+urlPath, err.Error())
+ return util.Failure("Bad endpoint")
+ }
+ for _, h := range handlersInfo.Handlers {
+ if strings.HasSuffix(h.HandlerClass, "DocumentV1ApiHandler") {
+ for _, binding := range h.ServerBindings {
+ if strings.Contains(binding, "/document/v1/") {
+ return util.Success("handler OK")
+ }
+ }
+ w := fmt.Sprintf("expected /document/v1/ binding, but got: %v", h.ServerBindings)
+ cli.printWarning(w)
+ }
+ }
+ cli.printWarning("Missing /document/v1/ API; add <document-api /> to the container cluster delcaration in services.xml")
+ return util.Failure("Missing /document/v1 API")
+ } else {
+ return util.FailureWithPayload(service.Description()+" at "+request.URL.Host+": "+response.Status, util.ReaderToJSON(response.Body))
+ }
+}
+
+func visitClusters(vArgs *visitArgs, service *vespa.Service) (res util.OperationResult) {
+ clusters := []string{
+ vArgs.contentCluster,
+ }
+ if vArgs.contentCluster == "*" {
+ clusters = probeVisit(vArgs, service)
+ }
+ if vArgs.makeFeed {
+ vArgs.writeString("[\n")
+ }
+ for _, c := range clusters {
+ vArgs.contentCluster = c
+ res = runVisit(vArgs, service)
+ if !res.Success {
+ return res
+ }
+ vArgs.debugPrint("Success: " + res.Message)
+ }
+ if vArgs.makeFeed {
+ vArgs.writeString("{}\n]\n")
+ }
+ return res
+}
+
+func probeVisit(vArgs *visitArgs, service *vespa.Service) []string {
+ clusters := make([]string, 0, 3)
+ vvo, _ := runOneVisit(vArgs, service, "")
+ if vvo != nil {
+ msg := vvo.ErrorMsg
+ if strings.Contains(msg, "no content cluster '*'") {
+ for idx, value := range strings.Split(msg, ",") {
+ if idx > 0 {
+ parts := strings.Split(value, "'")
+ if len(parts) == 3 {
+ clusters = append(clusters, parts[1])
+ }
+ }
+ }
+ }
+ }
+ return clusters
+}
+
+func runVisit(vArgs *visitArgs, service *vespa.Service) (res util.OperationResult) {
+ vArgs.debugPrint(fmt.Sprintf("trying to visit: '%s'", vArgs.contentCluster))
+ var totalDocuments int = 0
+ var continuationToken string
+ for {
+ var vvo *VespaVisitOutput
+ vvo, res = runOneVisit(vArgs, service, continuationToken)
+ if !res.Success {
+ if vvo != nil && vvo.ErrorMsg != "" {
+ vArgs.cli.printWarning(vvo.ErrorMsg)
+ }
+ return res
+ }
+ vArgs.dumpDocuments(vvo.Documents)
+ vArgs.debugPrint(fmt.Sprintf("got %d documents", len(vvo.Documents)))
+ totalDocuments += len(vvo.Documents)
+ continuationToken = vvo.Continuation
+ if continuationToken == "" {
+ break
+ }
+ }
+ res.Message = fmt.Sprintf("%s [%d documents visited]", res.Message, totalDocuments)
+ return
+}
+
+func quoteArgForUrl(arg string) string {
+ var buf strings.Builder
+ buf.Grow(len(arg))
+ for _, r := range arg {
+ switch {
+ case 'a' <= r && r <= 'z':
+ buf.WriteRune(r)
+ case 'A' <= r && r <= 'Z':
+ buf.WriteRune(r)
+ case '0' <= r && r <= '9':
+ buf.WriteRune(r)
+ case r <= ' ' || r > '~':
+ buf.WriteRune('+')
+ default:
+ s := fmt.Sprintf("%s%02X", "%", r)
+ buf.WriteString(s)
+ }
+ }
+ return buf.String()
+}
+
+func runOneVisit(vArgs *visitArgs, service *vespa.Service, contToken string) (*VespaVisitOutput, util.OperationResult) {
+ urlPath := service.BaseURL + "/document/v1/?cluster=" + quoteArgForUrl(vArgs.contentCluster)
+ if vArgs.fieldSet != "" {
+ urlPath = urlPath + "&fieldSet=" + quoteArgForUrl(vArgs.fieldSet)
+ }
+ if vArgs.selection != "" {
+ urlPath = urlPath + "&selection=" + quoteArgForUrl(vArgs.selection)
+ }
+ if contToken != "" {
+ urlPath = urlPath + "&continuation=" + contToken
+ }
+ if vArgs.chunkCount > 0 {
+ urlPath = urlPath + fmt.Sprintf("&wantedDocumentCount=%d", vArgs.chunkCount)
+ }
+ url, urlParseError := url.Parse(urlPath)
+ if urlParseError != nil {
+ return nil, util.Failure("Invalid request path: '" + urlPath + "': " + urlParseError.Error())
+ }
+ request := &http.Request{
+ URL: url,
+ Method: "GET",
+ }
+ timeout := time.Duration(900) * time.Second
+ response, err := service.Do(request, timeout)
+ if err != nil {
+ return nil, util.Failure("Request failed: " + err.Error())
+ }
+ defer response.Body.Close()
+ vvo, err := parseVisitOutput(response.Body)
+ if response.StatusCode == 200 {
+ if err == nil {
+ totalDocCount += vvo.DocumentCount
+ if vvo.DocumentCount != len(vvo.Documents) {
+ vArgs.cli.printWarning(fmt.Sprintf("Inconsistent contents from: %v", url))
+ vArgs.cli.printWarning(fmt.Sprintf("claimed count: %d", vvo.DocumentCount))
+ vArgs.cli.printWarning(fmt.Sprintf("document blobs: %d", len(vvo.Documents)))
+ return nil, util.Failure("Inconsistent contents from document API")
+ }
+ return vvo, util.Success("visited " + vArgs.contentCluster)
+ } else {
+ return nil, util.Failure("error reading response: " + err.Error())
+ }
+ } else if response.StatusCode/100 == 4 {
+ return vvo, util.FailureWithPayload("Invalid document operation: "+response.Status, util.ReaderToJSON(response.Body))
+ } else {
+ return vvo, util.FailureWithPayload(service.Description()+" at "+request.URL.Host+": "+response.Status, util.ReaderToJSON(response.Body))
+ }
+}
+
+type DocumentBlob struct {
+ blob []byte
+}
+
+func (d *DocumentBlob) UnmarshalJSON(data []byte) error {
+ d.blob = make([]byte, len(data))
+ copy(d.blob, data)
+ return nil
+}
+
+func (d *DocumentBlob) MarshalJSON() ([]byte, error) {
+ return d.blob, nil
+}
+
+type VespaVisitOutput struct {
+ PathId string `json:"pathId"`
+ Documents []DocumentBlob `json:"documents"`
+ DocumentCount int `json:"documentCount"`
+ Continuation string `json:"continuation"`
+ ErrorMsg string `json:"message"`
+}
+
+func parseVisitOutput(r io.Reader) (*VespaVisitOutput, error) {
+ codec := json.NewDecoder(r)
+ var parsedJson VespaVisitOutput
+ err := codec.Decode(&parsedJson)
+ if err != nil {
+ return nil, fmt.Errorf("could not decode JSON, error: %s", err.Error())
+ }
+ return &parsedJson, nil
+}
diff --git a/client/go/internal/cli/cmd/visit_test.go b/client/go/internal/cli/cmd/visit_test.go
new file mode 100644
index 00000000000..4302680b9d9
--- /dev/null
+++ b/client/go/internal/cli/cmd/visit_test.go
@@ -0,0 +1,142 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package cmd
+
+import (
+ "fmt"
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/vespa-engine/vespa/client/go/internal/mock"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa"
+)
+
+const (
+ normalpre = `{"pathId":"/document/v1/","documents":[`
+ document1 = `{"id":"id:t:m::1","fields":{"title":"t"}}`
+ document2 = `{"id":"id:t:m::2","fields":{"title":"t2"}}`
+ document3 = `{"id":"id:t:m::3","fields":{"ar":"xyz","w":63,"title":"xyzzy","year":2000}}`
+
+ savedresponse = `{"pathId":"/document/v1/","documents":[{"id":"id:test:music::1921492307","fields":{"title":"song","year":2010}},{"id":"id:test:music::p_try-this-clean-bonus-dvd-_music_1922003403","fields":{"artist":"xyz","weight":600000,"song":"hate","title":"xyz","year":2000}}],"documentCount":2,"continuation":"AAAACAAAAAAAAAAJAAAAAAAAAAgAAAAAAAABAAAAAAEgAAAAAAAAEAAAAAAAAAAA"}`
+
+ saveddoc0 = `{"id":"id:test:music::1921492307","fields":{"title":"song","year":2010}}`
+ saveddoc1 = `{"id":"id:test:music::p_try-this-clean-bonus-dvd-_music_1922003403","fields":{"artist":"xyz","weight":600000,"song":"hate","title":"xyz","year":2000}}`
+ handlersResponse = `{
+ "handlers" : [ {
+ "id" : "com.yahoo.container.usability.BindingsOverviewHandler",
+ "class" : "com.yahoo.container.usability.BindingsOverviewHandler",
+ "bundle" : "container-disc:8.0.0",
+ "serverBindings" : [ "http://*/" ]
+ }, {
+ "id" : "com.yahoo.document.restapi.resource.DocumentV1ApiHandler",
+ "class" : "com.yahoo.document.restapi.resource.DocumentV1ApiHandler",
+ "bundle" : "vespaclient-container-plugin:8.0.0",
+ "serverBindings" : [ "http://*/document/v1/*", "http://*/document/v1/*/" ]
+ } ]
+}`
+ clusterStarResponse = `{"pathId":"/document/v1/","message":"Your Vespa deployment has no content cluster '*', only 'fooCC'"}`
+)
+
+func TestQuoteFunc(t *testing.T) {
+ var buf []byte = make([]byte, 3)
+ buf[0] = 'a'
+ buf[2] = 'z'
+ for i := 0; i < 256; i++ {
+ buf[1] = byte(i)
+ s := string(buf)
+ res := quoteArgForUrl(s)
+ if i < 32 || i > 127 {
+ assert.Equal(t, "a+z", res)
+ } else {
+ fmt.Printf("res %3d => '%s'\n", i, res)
+ }
+ }
+}
+
+// low-level (unit) test
+func TestRunOneVisit(t *testing.T) {
+ withResponse := func(client *mock.HTTPClient) {
+ client.NextResponseString(200, savedresponse)
+ }
+ op := func(service *vespa.Service) {
+ vArgs := visitArgs{
+ contentCluster: "fooCC",
+ }
+ vvo, res := runOneVisit(&vArgs, service, "BBBB")
+ assert.Equal(t, true, res.Success)
+ assert.Equal(t, "visited fooCC", res.Message)
+ assert.Equal(t, "/document/v1/", vvo.PathId)
+ assert.Equal(t, "", vvo.ErrorMsg)
+ assert.Equal(t, "AAAACAAAAAAAAAAJAAAAAAAAAAgAAAAAAAABAAAAAAEgAAAAAAAAEAAAAAAAAAAA", vvo.Continuation)
+ assert.Equal(t, 2, vvo.DocumentCount)
+ assert.Equal(t, 2, len(vvo.Documents))
+ assert.Equal(t, saveddoc0, string(vvo.Documents[0].blob))
+ assert.Equal(t, saveddoc1, string(vvo.Documents[1].blob))
+ }
+ req := withMockClient(t, withResponse, op)
+ assert.Equal(t, "cluster=fooCC&continuation=BBBB", req.URL.RawQuery)
+
+ op = func(service *vespa.Service) {
+ vArgs := visitArgs{
+ contentCluster: "search",
+ fieldSet: "[id]",
+ selection: "music.year>2000",
+ chunkCount: 123,
+ }
+ vvo, res := runOneVisit(&vArgs, service, "asdf")
+ assert.Equal(t, true, res.Success)
+ assert.Equal(t, 2, vvo.DocumentCount)
+ }
+ req = withMockClient(t, withResponse, op)
+ assert.Equal(t, "cluster=search&fieldSet=%5Bid%5D&selection=music%2Eyear%3E2000&continuation=asdf&wantedDocumentCount=123", req.URL.RawQuery)
+}
+
+func withMockClient(t *testing.T, prepCli func(*mock.HTTPClient), runOp func(*vespa.Service)) *http.Request {
+ client := &mock.HTTPClient{}
+ prepCli(client)
+ cli, _, _ := newTestCLI(t)
+ cli.httpClient = client
+ service, _ := documentService(cli)
+ runOp(service)
+ return client.LastRequest
+}
+
+func TestVisitCommand(t *testing.T) {
+ assertVisitResults(
+ []string{
+ "visit",
+ "--json-lines",
+ },
+ t,
+ []string{
+ normalpre +
+ document1 +
+ `],"documentCount":1,"continuation":"CAFE"}`,
+ normalpre +
+ document2 +
+ "," +
+ document3 +
+ `],"documentCount":2}`,
+ },
+ "cluster=fooCC&continuation=CAFE&wantedDocumentCount=1000",
+ document1+"\n"+
+ document2+"\n"+
+ document3+"\n")
+}
+
+func assertVisitResults(arguments []string, t *testing.T, responses []string, queryPart, output string) {
+ client := &mock.HTTPClient{}
+ client.NextResponseString(200, handlersResponse)
+ client.NextResponseString(400, clusterStarResponse)
+ for _, resp := range responses {
+ client.NextResponseString(200, resp)
+ }
+ cli, stdout, stderr := newTestCLI(t)
+ cli.httpClient = client
+ assert.Nil(t, cli.Run(arguments...))
+ assert.Equal(t, output, stdout.String())
+ assert.Equal(t, "", stderr.String())
+ assert.Equal(t, queryPart, client.LastRequest.URL.RawQuery)
+ assert.Equal(t, "/document/v1/", client.LastRequest.URL.Path)
+ assert.Equal(t, "GET", client.LastRequest.Method)
+}