diff options
37 files changed, 522 insertions, 407 deletions
diff --git a/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp b/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp index a52e0850b7d..730ee141f83 100644 --- a/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp +++ b/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp @@ -10,6 +10,7 @@ #include <vespa/searchcommon/attribute/config.h> #include <vespa/eval/eval/value.h> #include <vespa/vespalib/test/insertion_operators.h> +#include <vespa/vespalib/util/fake_doom.h> #include <iostream> #include <sstream> #include <limits> @@ -67,6 +68,7 @@ class HnswIndex const NearestNeighborIndex* _nearest_neighbor_index; size_t _dim_size; bool _normalize_vectors; + vespalib::FakeDoom _no_doom; bool check_lid(uint32_t lid); bool check_value(const char *op, const std::vector<float>& value); @@ -87,7 +89,8 @@ HnswIndex::HnswIndex(uint32_t dim_size, const HnswIndexParams &hnsw_index_params _tensor_attribute(nullptr), _nearest_neighbor_index(nullptr), _dim_size(0u), - _normalize_vectors(normalize_vectors) + _normalize_vectors(normalize_vectors), + _no_doom() { Config cfg(BasicType::TENSOR, CollectionType::SINGLE); _tensor_type = ValueType::from_spec(make_tensor_spec(dim_size)); @@ -208,7 +211,7 @@ HnswIndex::find_top_k(uint32_t k, const std::vector<float>& value, uint32_t expl std::vector<float> normalized_value; auto typed_cells = get_typed_cells(value, normalized_value); auto df = _nearest_neighbor_index->distance_function_factory().for_query_vector(typed_cells); - auto raw_result = _nearest_neighbor_index->find_top_k(k, *df, explore_k, std::numeric_limits<double>::max()); + auto raw_result = _nearest_neighbor_index->find_top_k(k, *df, explore_k, _no_doom.get_doom(), std::numeric_limits<double>::max()); result.reserve(raw_result.size()); switch (_hnsw_index_params.distance_metric()) { case DistanceMetric::Euclidean: diff --git a/client/go/internal/cli/cmd/document.go b/client/go/internal/cli/cmd/document.go index b5b63fd32df..07a98d2e626 100644 --- a/client/go/internal/cli/cmd/document.go +++ b/client/go/internal/cli/cmd/document.go @@ -5,15 +5,22 @@ package cmd import ( + "bytes" + "errors" "fmt" "io" + "net/http" + "os" + "strconv" "strings" "time" "github.com/fatih/color" "github.com/spf13/cobra" + "github.com/vespa-engine/vespa/client/go/internal/curl" "github.com/vespa-engine/vespa/client/go/internal/util" "github.com/vespa-engine/vespa/client/go/internal/vespa" + "github.com/vespa-engine/vespa/client/go/internal/vespa/document" ) func addDocumentFlags(cmd *cobra.Command, printCurl *bool, timeoutSecs *int) { @@ -21,6 +28,128 @@ func addDocumentFlags(cmd *cobra.Command, printCurl *bool, timeoutSecs *int) { cmd.PersistentFlags().IntVarP(timeoutSecs, "timeout", "T", 60, "Timeout for the document request in seconds") } +type serviceWithCurl struct { + curlCmdWriter io.Writer + bodyFile string + service *vespa.Service +} + +func (s *serviceWithCurl) Do(request *http.Request, timeout time.Duration) (*http.Response, error) { + cmd, err := curl.RawArgs(request.URL.String()) + if err != nil { + return nil, err + } + cmd.Method = request.Method + for k, vs := range request.Header { + for _, v := range vs { + cmd.Header(k, v) + } + } + if s.bodyFile != "" { + cmd.WithBodyFile(s.bodyFile) + } + cmd.Certificate = s.service.TLSOptions.CertificateFile + cmd.PrivateKey = s.service.TLSOptions.PrivateKeyFile + out := cmd.String() + "\n" + if _, err := io.WriteString(s.curlCmdWriter, out); err != nil { + return nil, err + } + return s.service.Do(request, timeout) +} + +func documentClient(cli *CLI, timeoutSecs int, printCurl bool) (*document.Client, *serviceWithCurl, error) { + docService, err := documentService(cli) + if err != nil { + return nil, nil, err + } + service := &serviceWithCurl{curlCmdWriter: io.Discard, service: docService} + if printCurl { + service.curlCmdWriter = cli.Stderr + } + client, err := document.NewClient(document.ClientOptions{ + Compression: document.CompressionAuto, + Timeout: time.Duration(timeoutSecs) * time.Second, + BaseURL: docService.BaseURL, + NowFunc: time.Now, + }, []util.HTTPClient{service}) + if err != nil { + return nil, nil, err + } + return client, service, nil +} + +func sendOperation(op document.Operation, args []string, timeoutSecs int, printCurl bool, cli *CLI) error { + client, service, err := documentClient(cli, timeoutSecs, printCurl) + if err != nil { + return err + } + id := "" + filename := args[0] + if len(args) > 1 { + id = args[0] + filename = args[1] + } + f, err := os.Open(filename) + if err != nil { + return err + } + defer f.Close() + doc, err := document.NewDecoder(f).Decode() + if errors.Is(err, document.ErrMissingId) { + if id == "" { + return fmt.Errorf("no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file") + } + } else if err != nil { + return err + } + if id != "" { + docId, err := document.ParseId(id) + if err != nil { + return err + } + doc.Id = docId + } + if op > -1 { + if id == "" && op != doc.Operation { + return fmt.Errorf("wanted document operation is %s, but JSON file specifies %s", op, doc.Operation) + } + doc.Operation = op + } + if doc.Body != nil { + service.bodyFile = f.Name() + } + result := client.Send(doc) + return printResult(cli, operationResult(false, doc, service.service, result), false) +} + +func readDocument(id string, timeoutSecs int, printCurl bool, cli *CLI) error { + client, service, err := documentClient(cli, timeoutSecs, printCurl) + if err != nil { + return err + } + docId, err := document.ParseId(id) + if err != nil { + return err + } + result := client.Get(docId) + return printResult(cli, operationResult(true, document.Document{Id: docId}, service.service, result), true) +} + +func operationResult(read bool, doc document.Document, service *vespa.Service, result document.Result) util.OperationResult { + bodyReader := bytes.NewReader(result.Body) + if result.HTTPStatus == 200 { + if read { + return util.SuccessWithPayload("Read "+doc.Id.String(), util.ReaderToJSON(bodyReader)) + } else { + return util.Success(doc.Operation.String() + " " + doc.Id.String()) + } + } + if result.HTTPStatus/100 == 4 { + return util.FailureWithPayload("Invalid document operation: Status "+strconv.Itoa(result.HTTPStatus), util.ReaderToJSON(bodyReader)) + } + return util.FailureWithPayload(service.Description()+" at "+service.BaseURL+": Status "+strconv.Itoa(result.HTTPStatus), util.ReaderToJSON(bodyReader)) +} + func newDocumentCmd(cli *CLI) *cobra.Command { var ( printCurl bool @@ -44,11 +173,7 @@ should be used instead of this.`, SilenceUsage: true, Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - service, err := documentService(cli) - if err != nil { - return err - } - return printResult(cli, vespa.Send(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) + return sendOperation(-1, args, timeoutSecs, printCurl, cli) }, } addDocumentFlags(cmd, &printCurl, &timeoutSecs) @@ -72,15 +197,7 @@ $ vespa document put id:mynamespace:music::a-head-full-of-dreams src/test/resour DisableAutoGenTag: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - service, err := documentService(cli) - if err != nil { - return err - } - if len(args) == 1 { - return printResult(cli, vespa.Put("", args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) - } else { - return printResult(cli, vespa.Put(args[0], args[1], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) - } + return sendOperation(document.OperationPut, args, timeoutSecs, printCurl, cli) }, } addDocumentFlags(cmd, &printCurl, &timeoutSecs) @@ -103,15 +220,7 @@ $ vespa document update id:mynamespace:music::a-head-full-of-dreams src/test/res DisableAutoGenTag: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - service, err := documentService(cli) - if err != nil { - return err - } - if len(args) == 1 { - return printResult(cli, vespa.Update("", args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) - } else { - return printResult(cli, vespa.Update(args[0], args[1], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) - } + return sendOperation(document.OperationUpdate, args, timeoutSecs, printCurl, cli) }, } addDocumentFlags(cmd, &printCurl, &timeoutSecs) @@ -134,14 +243,20 @@ $ vespa document remove id:mynamespace:music::a-head-full-of-dreams`, DisableAutoGenTag: true, SilenceUsage: true, RunE: func(cmd *cobra.Command, args []string) error { - service, err := documentService(cli) - if err != nil { - return err - } if strings.HasPrefix(args[0], "id:") { - return printResult(cli, vespa.RemoveId(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) + client, service, err := documentClient(cli, timeoutSecs, printCurl) + if err != nil { + return err + } + id, err := document.ParseId(args[0]) + if err != nil { + return err + } + doc := document.Document{Id: id, Operation: document.OperationRemove} + result := client.Send(doc) + return printResult(cli, operationResult(false, doc, service.service, result), false) } else { - return printResult(cli, vespa.RemoveOperation(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false) + return sendOperation(document.OperationRemove, args, timeoutSecs, printCurl, cli) } }, } @@ -162,11 +277,7 @@ func newDocumentGetCmd(cli *CLI) *cobra.Command { SilenceUsage: true, Example: `$ vespa document get id:mynamespace:music::a-head-full-of-dreams`, RunE: func(cmd *cobra.Command, args []string) error { - service, err := documentService(cli) - if err != nil { - return err - } - return printResult(cli, vespa.Get(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), true) + return readDocument(args[0], timeoutSecs, printCurl, cli) }, } addDocumentFlags(cmd, &printCurl, &timeoutSecs) @@ -181,17 +292,6 @@ func documentService(cli *CLI) (*vespa.Service, error) { return cli.service(target, vespa.DocumentService, 0, cli.config.cluster()) } -func operationOptions(stderr io.Writer, printCurl bool, timeoutSecs int) vespa.OperationOptions { - curlOutput := io.Discard - if printCurl { - curlOutput = stderr - } - return vespa.OperationOptions{ - CurlOutput: curlOutput, - Timeout: time.Second * time.Duration(timeoutSecs), - } -} - func printResult(cli *CLI, result util.OperationResult, payloadOnlyOnSuccess bool) error { out := cli.Stdout if !result.Success { diff --git a/client/go/internal/cli/cmd/document_test.go b/client/go/internal/cli/cmd/document_test.go index bf9cc0404dc..00f98ee1333 100644 --- a/client/go/internal/cli/cmd/document_test.go +++ b/client/go/internal/cli/cmd/document_test.go @@ -5,6 +5,7 @@ package cmd import ( + "encoding/json" "os" "strconv" "testing" @@ -20,6 +21,11 @@ func TestDocumentSendPut(t *testing.T) { "put", "POST", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json", t) } +func TestDocumentSendPutWithIdInFile(t *testing.T) { + assertDocumentSend([]string{"document", "testdata/A-Head-Full-of-Dreams-Put-Id.json"}, + "put", "POST", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put-Id.json", t) +} + func TestDocumentSendPutVerbose(t *testing.T) { assertDocumentSend([]string{"document", "-v", "testdata/A-Head-Full-of-Dreams-Put.json"}, "put", "POST", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json", t) @@ -32,7 +38,7 @@ func TestDocumentSendUpdate(t *testing.T) { func TestDocumentSendRemove(t *testing.T) { assertDocumentSend([]string{"document", "testdata/A-Head-Full-of-Dreams-Remove.json"}, - "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Remove.json", t) + "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t) } func TestDocumentPutWithIdArg(t *testing.T) { @@ -57,19 +63,24 @@ func TestDocumentUpdateWithoutIdArg(t *testing.T) { func TestDocumentRemoveWithIdArg(t *testing.T) { assertDocumentSend([]string{"document", "remove", "id:mynamespace:music::a-head-full-of-dreams"}, - "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Remove.json", t) + "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t) } func TestDocumentRemoveWithoutIdArg(t *testing.T) { assertDocumentSend([]string{"document", "remove", "testdata/A-Head-Full-of-Dreams-Remove.json"}, - "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Remove.json", t) + "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t) +} + +func TestDocumentRemoveWithoutIdArgVerbose(t *testing.T) { + assertDocumentSend([]string{"document", "remove", "-v", "testdata/A-Head-Full-of-Dreams-Remove.json"}, + "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t) } func TestDocumentSendMissingId(t *testing.T) { cli, _, stderr := newTestCLI(t) assert.NotNil(t, cli.Run("document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json")) assert.Equal(t, - "Error: No document id given neither as argument or as a 'put' key in the json file\n", + "Error: no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file\n", stderr.String()) } @@ -77,7 +88,7 @@ func TestDocumentSendWithDisagreeingOperations(t *testing.T) { cli, _, stderr := newTestCLI(t) assert.NotNil(t, cli.Run("document", "update", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, - "Error: Wanted document operation is update but the JSON file specifies put\n", + "Error: wanted document operation is update, but JSON file specifies put\n", stderr.String()) } @@ -103,7 +114,7 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe t.Fatal(err) } expectedPath, _ := vespa.IdToURLPath(expectedDocumentId) - expectedURL := documentURL + "/document/v1/" + expectedPath + expectedURL := documentURL + "/document/v1/" + expectedPath + "?timeout=60000ms" assert.Nil(t, cli.Run(arguments...)) verbose := false @@ -113,16 +124,29 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe } } if verbose { - expectedCurl := "curl -X " + expectedMethod + " -H 'Content-Type: application/json' --data-binary @" + expectedPayloadFile + " " + expectedURL + "\n" + expectedCurl := "curl -X " + expectedMethod + " -H 'Content-Type: application/json; charset=utf-8' -H 'User-Agent: Vespa CLI/0.0.0-devel'" + if expectedPayloadFile != "" { + expectedCurl += " --data-binary @" + expectedPayloadFile + } + expectedCurl += " '" + expectedURL + "'\n" assert.Equal(t, expectedCurl, stderr.String()) } assert.Equal(t, "Success: "+expectedOperation+" "+expectedDocumentId+"\n", stdout.String()) assert.Equal(t, expectedURL, client.LastRequest.URL.String()) - assert.Equal(t, "application/json", client.LastRequest.Header.Get("Content-Type")) + assert.Equal(t, "application/json; charset=utf-8", client.LastRequest.Header.Get("Content-Type")) assert.Equal(t, expectedMethod, client.LastRequest.Method) - expectedPayload, _ := os.ReadFile(expectedPayloadFile) - assert.Equal(t, string(expectedPayload), util.ReaderToString(client.LastRequest.Body)) + if expectedPayloadFile != "" { + data, err := os.ReadFile(expectedPayloadFile) + assert.Nil(t, err) + var expectedPayload struct { + Fields json.RawMessage `json:"fields"` + } + assert.Nil(t, json.Unmarshal(data, &expectedPayload)) + assert.Equal(t, `{"fields":`+string(expectedPayload.Fields)+"}", util.ReaderToString(client.LastRequest.Body)) + } else { + assert.Nil(t, client.LastRequest.Body) + } } func assertDocumentGet(arguments []string, documentId string, t *testing.T) { @@ -170,7 +194,7 @@ func assertDocumentServerError(t *testing.T, status int, errorMessage string) { "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json")) assert.Equal(t, - "Error: Container (document API) at 127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n", + "Error: Container (document API) at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n", stderr.String()) } diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go index 2a7d8491578..6d368cb210b 100644 --- a/client/go/internal/cli/cmd/feed.go +++ b/client/go/internal/cli/cmd/feed.go @@ -1,6 +1,7 @@ package cmd import ( + "bufio" "encoding/json" "fmt" "io" @@ -164,7 +165,7 @@ func feedFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) { } func dispatchFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) { - dec := document.NewDecoder(r) + dec := document.NewDecoder(bufio.NewReaderSize(r, 1<<26)) // Buffer up to 64M of data at a time defer r.Close() for { doc, err := dec.Decode() diff --git a/client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json b/client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json new file mode 100644 index 00000000000..d39b22782ab --- /dev/null +++ b/client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json @@ -0,0 +1,15 @@ +{ + "id": "id:mynamespace:music::a-head-full-of-dreams", + "fields": { + "album": "A Head Full of Dreams", + "artist": "Coldplay", + "year": 2015, + "category_scores": { + "cells": [ + { "address" : { "cat" : "pop" }, "value": 1 }, + { "address" : { "cat" : "rock" }, "value": 0.2 }, + { "address" : { "cat" : "jazz" }, "value": 0 } + ] + } + } +} diff --git a/client/go/internal/curl/curl.go b/client/go/internal/curl/curl.go index daa60e6ff14..5f4b7928704 100644 --- a/client/go/internal/curl/curl.go +++ b/client/go/internal/curl/curl.go @@ -6,6 +6,7 @@ import ( "net/url" "os/exec" "runtime" + "sort" "github.com/alessio/shellescape" "github.com/vespa-engine/vespa/client/go/internal/util" @@ -61,6 +62,7 @@ func (c *Command) Args() []string { if c.Method != "" { args = append(args, "-X", c.Method) } + sort.Slice(c.headers, func(i, j int) bool { return c.headers[i].key < c.headers[j].key }) for _, header := range c.headers { args = append(args, "-H", header.key+": "+header.value) } diff --git a/client/go/internal/vespa/document.go b/client/go/internal/vespa/document.go deleted file mode 100644 index 9e4c8e7d136..00000000000 --- a/client/go/internal/vespa/document.go +++ /dev/null @@ -1,197 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// vespa document API client -// Author: bratseth - -package vespa - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "time" - - "github.com/vespa-engine/vespa/client/go/internal/curl" - "github.com/vespa-engine/vespa/client/go/internal/util" -) - -// Sends the operation given in the file -func Send(jsonFile string, service *Service, options OperationOptions) util.OperationResult { - return sendOperation("", jsonFile, service, anyOperation, options) -} - -func Put(documentId string, jsonFile string, service *Service, options OperationOptions) util.OperationResult { - return sendOperation(documentId, jsonFile, service, putOperation, options) -} - -func Update(documentId string, jsonFile string, service *Service, options OperationOptions) util.OperationResult { - return sendOperation(documentId, jsonFile, service, updateOperation, options) -} - -func RemoveId(documentId string, service *Service, options OperationOptions) util.OperationResult { - return sendOperation(documentId, "", service, removeOperation, options) -} - -func RemoveOperation(jsonFile string, service *Service, options OperationOptions) util.OperationResult { - return sendOperation("", jsonFile, service, removeOperation, options) -} - -const ( - anyOperation string = "any" - putOperation string = "put" - updateOperation string = "update" - removeOperation string = "remove" -) - -type OperationOptions struct { - CurlOutput io.Writer - Timeout time.Duration -} - -func sendOperation(documentId string, jsonFile string, service *Service, operation string, options OperationOptions) util.OperationResult { - header := http.Header{} - header.Add("Content-Type", "application/json") - - var documentData []byte - if operation == "remove" && jsonFile == "" { - documentData = []byte("{\n \"remove\": \"" + documentId + "\"\n}\n") - } else { - fileReader, err := os.Open(jsonFile) - if err != nil { - return util.FailureWithDetail("Could not open file '"+jsonFile+"'", err.Error()) - } - defer fileReader.Close() - documentData, err = io.ReadAll(fileReader) - if err != nil { - return util.FailureWithDetail("Failed to read '"+jsonFile+"'", err.Error()) - } - } - - var doc map[string]interface{} - if err := json.Unmarshal(documentData, &doc); err != nil { - return util.Failure(fmt.Sprintf("Document is not valid JSON: %s", err)) - } - - operationInFile := operationIn(doc) - if operation == anyOperation { // Operation is decided by file content - operation = operationInFile - } else if operationInFile != "" && operationInFile != operation { // Otherwise operation must match - return util.Failure("Wanted document operation is " + operation + " but the JSON file specifies " + operationInFile) - } - - if documentId == "" { // Document id is decided by file content - if doc[operation] == nil { - return util.Failure("No document id given neither as argument or as a '" + operation + "' key in the json file") - } - documentId = doc[operation].(string) // document feeder format - } - - documentPath, documentPathError := IdToURLPath(documentId) - if documentPathError != nil { - return util.Failure("Invalid document id '" + documentId + "': " + documentPathError.Error()) - } - - url, urlParseError := url.Parse(service.BaseURL + "/document/v1/" + documentPath) - if urlParseError != nil { - return util.Failure("Invalid request path: '" + service.BaseURL + "/document/v1/" + documentPath + "': " + urlParseError.Error()) - } - - request := &http.Request{ - URL: url, - Method: operationToHTTPMethod(operation), - Header: header, - Body: io.NopCloser(bytes.NewReader(documentData)), - } - response, err := serviceDo(service, request, jsonFile, options) - if err != nil { - return util.Failure("Request failed: " + err.Error()) - } - - defer response.Body.Close() - if response.StatusCode == 200 { - return util.Success(operation + " " + documentId) - } else if response.StatusCode/100 == 4 { - return util.FailureWithPayload("Invalid document operation: "+response.Status, util.ReaderToJSON(response.Body)) - } else { - return util.FailureWithPayload(service.Description()+" at "+request.URL.Host+": "+response.Status, util.ReaderToJSON(response.Body)) - } -} - -func operationIn(doc map[string]interface{}) string { - if doc["put"] != nil { - return "put" - } else if doc["update"] != nil { - return "update" - } else if doc["remove"] != nil { - return "remove" - } else { - return "" - } -} - -func operationToHTTPMethod(operation string) string { - switch operation { - case "put": - return "POST" - case "update": - return "PUT" - case "remove": - return "DELETE" - } - util.JustExitMsg("Unexpected document operation ''" + operation + "'") - panic("unreachable") -} - -func serviceDo(service *Service, request *http.Request, filename string, options OperationOptions) (*http.Response, error) { - cmd, err := curl.RawArgs(request.URL.String()) - if err != nil { - return nil, err - } - cmd.Method = request.Method - for k, vs := range request.Header { - for _, v := range vs { - cmd.Header(k, v) - } - } - cmd.WithBodyFile(filename) - cmd.Certificate = service.TLSOptions.CertificateFile - cmd.PrivateKey = service.TLSOptions.PrivateKeyFile - out := cmd.String() + "\n" - if _, err := io.WriteString(options.CurlOutput, out); err != nil { - return nil, err - } - return service.Do(request, options.Timeout) -} - -func Get(documentId string, service *Service, options OperationOptions) util.OperationResult { - documentPath, documentPathError := IdToURLPath(documentId) - if documentPathError != nil { - return util.Failure("Invalid document id '" + documentId + "': " + documentPathError.Error()) - } - - url, urlParseError := url.Parse(service.BaseURL + "/document/v1/" + documentPath) - if urlParseError != nil { - return util.Failure("Invalid request path: '" + service.BaseURL + "/document/v1/" + documentPath + "': " + urlParseError.Error()) - } - - request := &http.Request{ - URL: url, - Method: "GET", - } - response, err := serviceDo(service, request, "", options) - if err != nil { - return util.Failure("Request failed: " + err.Error()) - } - - defer response.Body.Close() - if response.StatusCode == 200 { - return util.SuccessWithPayload("Read "+documentId, util.ReaderToJSON(response.Body)) - } else if response.StatusCode/100 == 4 { - return util.FailureWithPayload("Invalid document operation: "+response.Status, util.ReaderToJSON(response.Body)) - } else { - return util.FailureWithPayload(service.Description()+" at "+request.URL.Host+": "+response.Status, util.ReaderToJSON(response.Body)) - } -} diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go index f7f0f4360df..9bcf2e3f619 100644 --- a/client/go/internal/vespa/document/circuit_breaker.go +++ b/client/go/internal/vespa/document/circuit_breaker.go @@ -19,7 +19,7 @@ const ( type CircuitBreaker interface { Success() - Error(error) + Failure() State() CircuitState } @@ -28,7 +28,6 @@ type timeCircuitBreaker struct { doomDuration time.Duration failingSinceMillis atomic.Int64 - lastError atomic.Value halfOpen atomic.Bool open atomic.Bool @@ -42,10 +41,8 @@ func (b *timeCircuitBreaker) Success() { } } -func (b *timeCircuitBreaker) Error(err error) { - if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) { - b.lastError.Store(err) - } +func (b *timeCircuitBreaker) Failure() { + b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) } func (b *timeCircuitBreaker) State() CircuitState { diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go index 7a4fffaae27..05dbd6da2f5 100644 --- a/client/go/internal/vespa/document/circuit_breaker_test.go +++ b/client/go/internal/vespa/document/circuit_breaker_test.go @@ -1,7 +1,6 @@ package document import ( - "errors" "testing" "time" @@ -12,7 +11,6 @@ func TestCircuitBreaker(t *testing.T) { clock := &manualClock{} breaker := NewCircuitBreaker(time.Second, time.Minute) breaker.now = clock.now - err := errors.New("error") assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed") @@ -25,7 +23,7 @@ func TestCircuitBreaker(t *testing.T) { clock.advance(100 * time.Second) assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success") - breaker.Error(err) + breaker.Failure() assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure") clock.advance(time.Second) @@ -37,7 +35,7 @@ func TestCircuitBreaker(t *testing.T) { breaker.Success() assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success") - breaker.Error(err) + breaker.Failure() clock.advance(time.Minute) assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed") diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go index a2b84aaeef2..d9273d2f677 100644 --- a/client/go/internal/vespa/document/dispatcher.go +++ b/client/go/internal/vespa/document/dispatcher.go @@ -3,6 +3,7 @@ package document import ( "fmt" "io" + "strconv" "strings" "sync" "sync/atomic" @@ -62,41 +63,51 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o return d } -func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { +func (d *Dispatcher) logResult(doc Document, result Result, retry bool) { if result.Trace != "" { - d.msgs <- fmt.Sprintf("feed: trace for %s:\n%s", op.document, result.Trace) + d.msgs <- fmt.Sprintf("feed: trace for %s %s:\n%s", doc.Operation, doc.Id, result.Trace) } - if result.Success() { - if d.verbose { - d.msgs <- fmt.Sprintf("feed: %s succeeded with status %d", op.document, result.HTTPStatus) - } - d.throttler.Success() - d.circuitBreaker.Success() - return false + if !d.verbose && result.Success() { + return } - if result.HTTPStatus == 429 || result.HTTPStatus == 503 { - d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus) - d.throttler.Throttled(d.inflightCount.Load()) - return true + var msg strings.Builder + msg.WriteString("feed: got status ") + msg.WriteString(strconv.Itoa(result.HTTPStatus)) + msg.WriteString(" (") + if result.Body != nil { + msg.Write(result.Body) + } else { + msg.WriteString("no body") } - if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { - retry := op.attempts < maxAttempts - var msg strings.Builder - msg.WriteString("feed: ") - msg.WriteString(op.document.String()) - msg.WriteString(" failed: ") - if result.Err != nil { - msg.WriteString(result.Err.Error()) - } else { - msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus)) - } + msg.WriteString(")") + msg.WriteString(" for ") + msg.WriteString(doc.Operation.String()) + msg.WriteString(" ") + msg.WriteString(doc.Id.String()) + if !result.Success() { if retry { msg.WriteString(": retrying") } else { - msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts)) + msg.WriteString(": giving up after ") + msg.WriteString(strconv.Itoa(maxAttempts)) + msg.WriteString(" attempts") } - d.msgs <- msg.String() - d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus)) + } + d.msgs <- msg.String() +} + +func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool { + retry := op.attempts < maxAttempts + d.logResult(op.document, result, retry) + if result.Success() { + d.throttler.Success() + d.circuitBreaker.Success() + return false + } else if result.HTTPStatus == 429 || result.HTTPStatus == 503 { + d.throttler.Throttled(d.inflightCount.Load()) + return true + } else if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 { + d.circuitBreaker.Failure() if retry { return true } diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go index 9bc0c76106c..834ec8490a6 100644 --- a/client/go/internal/vespa/document/dispatcher_test.go +++ b/client/go/internal/vespa/document/dispatcher_test.go @@ -48,7 +48,7 @@ func (f *mockFeeder) Send(doc Document) Result { type mockCircuitBreaker struct{ state CircuitState } func (c *mockCircuitBreaker) Success() {} -func (c *mockCircuitBreaker) Error(err error) {} +func (c *mockCircuitBreaker) Failure() {} func (c *mockCircuitBreaker) State() CircuitState { return c.state } func TestDispatcher(t *testing.T) { diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go index 616013dc59a..a9b184190fb 100644 --- a/client/go/internal/vespa/document/document.go +++ b/client/go/internal/vespa/document/document.go @@ -1,8 +1,8 @@ package document import ( - "bufio" "bytes" + "errors" "fmt" "io" "math/rand" @@ -36,10 +36,23 @@ const ( ) var ( + ErrMissingId = errors.New("no id specified") fieldsPrefix = []byte(`{"fields":`) fieldsSuffix = []byte("}") ) +func (o Operation) String() string { + switch o { + case OperationPut: + return "put" + case OperationUpdate: + return "update" + case OperationRemove: + return "remove" + } + return "" +} + // Id represents a Vespa document ID. type Id struct { id string @@ -152,14 +165,8 @@ type Decoder struct { func (d Document) String() string { var sb strings.Builder - switch d.Operation { - case OperationPut: - sb.WriteString("put ") - case OperationUpdate: - sb.WriteString("update ") - case OperationRemove: - sb.WriteString("remove ") - } + sb.WriteString(d.Operation.String()) + sb.WriteString(" ") sb.WriteString(d.Id.String()) if d.Condition != "" { sb.WriteString(", condition=") @@ -228,7 +235,7 @@ func (d *Decoder) readBool() (bool, error) { func (d *Decoder) Decode() (Document, error) { doc, err := d.decode() if err != nil && err != io.EOF { - return Document{}, fmt.Errorf("invalid json at byte offset %d: %w", d.dec.InputOffset(), err) + return doc, fmt.Errorf("invalid operation at byte offset %d: %w", d.dec.InputOffset(), err) } return doc, err } @@ -347,14 +354,16 @@ loop: break loop } } + if doc.Id.id == "" { + return doc, ErrMissingId + } return doc, nil } func NewDecoder(r io.Reader) *Decoder { - br := bufio.NewReaderSize(r, 1<<26) d := &Decoder{} d.documentBuffers.New = func() any { return &bytes.Buffer{} } - d.dec = json.NewDecoder(io.TeeReader(br, &d.buf)) + d.dec = json.NewDecoder(io.TeeReader(r, &d.buf)) return d } diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go index f9bf321f1fb..d37febf3da8 100644 --- a/client/go/internal/vespa/document/document_test.go +++ b/client/go/internal/vespa/document/document_test.go @@ -1,6 +1,7 @@ package document import ( + "errors" "fmt" "io" "strings" @@ -204,10 +205,17 @@ func TestDocumentDecoderInvalid(t *testing.T) { t.Errorf("unexpected error: %s", err) } _, err = dec.Decode() - wantErr := "invalid json at byte offset 110: json: invalid character '\\n' within string (expecting non-control character)" + wantErr := "invalid operation at byte offset 110: json: invalid character '\\n' within string (expecting non-control character)" if err.Error() != wantErr { t.Errorf("want error %q, got %q", wantErr, err.Error()) } + + dec = NewDecoder(strings.NewReader(`{}`)) + _, err = dec.Decode() + wantErr = "invalid operation at byte offset 2: no id specified" + if !errors.Is(err, ErrMissingId) { + t.Errorf("want error %q, got %q", ErrMissingId, err.Error()) + } } func benchmarkDocumentDecoder(b *testing.B, size int) { diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go index 0c27b0d5440..28e3c225e34 100644 --- a/client/go/internal/vespa/document/http.go +++ b/client/go/internal/vespa/document/http.go @@ -127,37 +127,38 @@ func writeQueryParam(sb *bytes.Buffer, start int, escape bool, k, v string) { } } -func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) { - httpMethod := "" - switch d.Operation { - case OperationPut: - httpMethod = "POST" - case OperationUpdate: - httpMethod = "PUT" - case OperationRemove: - httpMethod = "DELETE" - } - // Base URL and path - sb.WriteString(c.options.BaseURL) - if !strings.HasSuffix(c.options.BaseURL, "/") { - sb.WriteString("/") - } - sb.WriteString("document/v1/") - sb.WriteString(url.PathEscape(d.Id.Namespace)) +func (c *Client) writeDocumentPath(id Id, sb *bytes.Buffer) { + sb.WriteString(strings.TrimSuffix(c.options.BaseURL, "/")) + sb.WriteString("/document/v1/") + sb.WriteString(url.PathEscape(id.Namespace)) sb.WriteString("/") - sb.WriteString(url.PathEscape(d.Id.Type)) - if d.Id.Number != nil { + sb.WriteString(url.PathEscape(id.Type)) + if id.Number != nil { sb.WriteString("/number/") - n := uint64(*d.Id.Number) + n := uint64(*id.Number) sb.WriteString(strconv.FormatUint(n, 10)) - } else if d.Id.Group != "" { + } else if id.Group != "" { sb.WriteString("/group/") - sb.WriteString(url.PathEscape(d.Id.Group)) + sb.WriteString(url.PathEscape(id.Group)) } else { sb.WriteString("/docid") } sb.WriteString("/") - sb.WriteString(url.PathEscape(d.Id.UserSpecific)) + sb.WriteString(url.PathEscape(id.UserSpecific)) +} + +func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) { + httpMethod := "" + switch d.Operation { + case OperationPut: + httpMethod = http.MethodPost + case OperationUpdate: + httpMethod = http.MethodPut + case OperationRemove: + httpMethod = http.MethodDelete + } + // Base URL and path + c.writeDocumentPath(d.Id, sb) // Query part queryStart := sb.Len() if c.options.Timeout > 0 { @@ -290,7 +291,28 @@ func (c *Client) Send(document Document) Result { } defer resp.Body.Close() elapsed := c.now().Sub(start) - return resultWithResponse(resp, bodySize, result, elapsed, buf) + return c.resultWithResponse(resp, bodySize, result, elapsed) +} + +// Get retrieves document with given ID.nnnnnn +func (c *Client) Get(id Id) Result { + start := c.now() + buf := c.buffer() + defer c.buffers.Put(buf) + c.writeDocumentPath(id, buf) + url := buf.String() + result := Result{Id: id} + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return resultWithErr(result, err) + } + resp, err := c.leastBusyClient().Do(req, c.clientTimeout()) + if err != nil { + return resultWithErr(result, err) + } + defer resp.Body.Close() + elapsed := c.now().Sub(start) + return c.resultWithResponse(resp, 0, result, elapsed) } func resultWithErr(result Result, err error) Result { @@ -299,7 +321,7 @@ func resultWithErr(result Result, err error) Result { return result } -func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result { +func (c *Client) resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration) Result { result.HTTPStatus = resp.StatusCode switch resp.StatusCode { case 200: @@ -311,24 +333,24 @@ func resultWithResponse(resp *http.Response, sentBytes int, result Result, elaps default: result.Status = StatusTransportFailure } - buf.Reset() - written, err := io.Copy(buf, resp.Body) + b, err := io.ReadAll(resp.Body) if err != nil { result = resultWithErr(result, err) } else { - var body struct { - Message string `json:"message"` - Trace json.RawValue `json:"trace"` - } - if err := json.Unmarshal(buf.Bytes(), &body); err != nil { - result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err)) - } else { - result.Message = body.Message - result.Trace = string(body.Trace) + result.Body = b + if result.HTTPStatus == 200 && c.options.TraceLevel > 0 { + var jsonResponse struct { + Trace json.RawValue `json:"trace"` + } + if err := json.Unmarshal(b, &jsonResponse); err != nil { + result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err)) + } else { + result.Trace = string(jsonResponse.Trace) + } } } result.Latency = elapsed result.BytesSent = int64(sentBytes) - result.BytesRecv = int64(written) + result.BytesRecv = int64(len(b)) return result } diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go index b34b322f6eb..f8871cb0905 100644 --- a/client/go/internal/vespa/document/http_test.go +++ b/client/go/internal/vespa/document/http_test.go @@ -89,17 +89,19 @@ func TestClientSend(t *testing.T) { Latency: time.Second, } if i < 3 { - httpClient.NextResponseString(200, `{"message":"All good!"}`) + msg := `{"message":"All good!"}` + httpClient.NextResponseString(200, msg) wantRes.Status = StatusSuccess wantRes.HTTPStatus = 200 - wantRes.Message = "All good!" + wantRes.Body = []byte(msg) wantRes.BytesRecv = 23 } else { - httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`) + errMsg := `something went wront` + httpClient.NextResponseString(502, errMsg) wantRes.Status = StatusVespaFailure wantRes.HTTPStatus = 502 - wantRes.Message = "Good bye, cruel world!" - wantRes.BytesRecv = 36 + wantRes.Body = []byte(errMsg) + wantRes.BytesRecv = 20 } res := client.Send(doc) wantRes.BytesSent = int64(len(httpClient.LastBody)) @@ -134,13 +136,45 @@ func TestClientSend(t *testing.T) { MinLatency: time.Second, MaxLatency: time.Second, BytesSent: 75, - BytesRecv: 105, + BytesRecv: 89, } if !reflect.DeepEqual(want, stats) { t.Errorf("got %+v, want %+v", stats, want) } } +func TestClientGet(t *testing.T) { + httpClient := mock.HTTPClient{ReadBody: true} + client, _ := NewClient(ClientOptions{ + BaseURL: "https://example.com:1337", + Timeout: time.Duration(5 * time.Second), + }, []util.HTTPClient{&httpClient}) + clock := manualClock{t: time.Now(), tick: time.Second} + client.now = clock.now + doc := `{ + "pathId": "/document/v1/mynamespace/music/docid/doc1", + "id": "id:mynamespace:music::doc1", + "fields": { + "artist": "Metallica", + "album": "Master of Puppets" + } +}` + id := Id{Namespace: "mynamespace", Type: "music", UserSpecific: "doc1"} + httpClient.NextResponseString(200, doc) + result := client.Get(id) + want := Result{ + Id: id, + Body: []byte(doc), + Status: StatusSuccess, + HTTPStatus: 200, + Latency: time.Second, + BytesRecv: 192, + } + if !reflect.DeepEqual(want, result) { + t.Errorf("got %+v, want %+v", result, want) + } +} + func TestClientSendCompressed(t *testing.T) { httpClient := &mock.HTTPClient{ReadBody: true} client, _ := NewClient(ClientOptions{ diff --git a/client/go/internal/vespa/document/stats.go b/client/go/internal/vespa/document/stats.go index 7696648f703..3e647d0f893 100644 --- a/client/go/internal/vespa/document/stats.go +++ b/client/go/internal/vespa/document/stats.go @@ -24,8 +24,8 @@ const ( type Result struct { Err error Id Id - Message string Trace string + Body []byte Status Status HTTPStatus int Latency time.Duration diff --git a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java index 786928391a5..a893abb519e 100644 --- a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java +++ b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java @@ -69,7 +69,6 @@ public class ClusterControllerClusterConfigurer extends AbstractComponent { builder.setCount(config.fleet_controller_count()); builder.setZooKeeperSessionTimeout((int) (config.zookeeper_session_timeout() * 1000)); builder.setMasterZooKeeperCooldownPeriod((int) (config.master_zookeeper_cooldown_period() * 1000)); - builder.setStateGatherCount(config.state_gather_count()); builder.setRpcPort(config.rpc_port()); builder.setHttpPort(config.http_port()); builder.setMaxTransitionTime(NodeType.STORAGE, config.storage_transition_time()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 6876ac8cf56..42460b5943e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -561,7 +561,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta if ( ! isRunning()) { return; } - if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount())) { + if (masterElectionHandler.isFirstInLine()) { didWork |= resyncLocallyCachedState(); // Calls to metricUpdate.forWork inside method } else { stepDownAsStateGatherer(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java index 1541e1a4218..bac6a838300 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java @@ -24,7 +24,6 @@ public class FleetControllerOptions { private final String clusterName; private final int fleetControllerIndex; private final int fleetControllerCount; - private final int stateGatherCount; private final String[] slobrokConnectionSpecs; private final int rpcPort; @@ -132,7 +131,6 @@ public class FleetControllerOptions { private FleetControllerOptions(String clusterName, int fleetControllerIndex, int fleetControllerCount, - int stateGatherCount, String[] slobrokConnectionSpecs, int rpcPort, int httpPort, @@ -174,7 +172,6 @@ public class FleetControllerOptions { this.clusterName = clusterName; this.fleetControllerIndex = fleetControllerIndex; this.fleetControllerCount = fleetControllerCount; - this.stateGatherCount = stateGatherCount; this.slobrokConnectionSpecs = slobrokConnectionSpecs; this.rpcPort = rpcPort; this.httpPort = httpPort; @@ -235,10 +232,6 @@ public class FleetControllerOptions { return fleetControllerCount; } - public int stateGatherCount() { - return stateGatherCount; - } - public String[] slobrokConnectionSpecs() { return slobrokConnectionSpecs; } @@ -394,7 +387,6 @@ public class FleetControllerOptions { private String clusterName; private int index = 0; private int count = 1; - private int stateGatherCount = 2; private String[] slobrokConnectionSpecs; private int rpcPort = 0; private int httpPort = 0; @@ -464,11 +456,6 @@ public class FleetControllerOptions { return this; } - public Builder setStateGatherCount(int stateGatherCount) { - this.stateGatherCount = stateGatherCount; - return this; - } - public Builder setSlobrokConnectionSpecs(String[] slobrokConnectionSpecs) { Objects.requireNonNull(slobrokConnectionSpecs, "slobrokConnectionSpecs cannot be null"); this.slobrokConnectionSpecs = slobrokConnectionSpecs; @@ -694,7 +681,6 @@ public class FleetControllerOptions { return new FleetControllerOptions(clusterName, index, count, - stateGatherCount, slobrokConnectionSpecs, rpcPort, httpPort, @@ -740,7 +726,6 @@ public class FleetControllerOptions { builder.clusterName = options.clusterName; builder.index = options.fleetControllerIndex; builder.count = options.fleetControllerCount; - builder.stateGatherCount = options.stateGatherCount; builder.slobrokConnectionSpecs = options.slobrokConnectionSpecs; builder.rpcPort = options.rpcPort; builder.httpPort = options.httpPort; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java index b041e6b14f8..fa303533355 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java @@ -118,7 +118,7 @@ public class MasterElectionHandler implements MasterInterface { return 2 * followers <= totalCount; } - public boolean isAmongNthFirst(int first) { return (nextInLineCount < first); } + public boolean isFirstInLine() { return (nextInLineCount < 1); } public boolean watchMasterElection(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext) { if (totalCount == 1 && !usingZooKeeper) { @@ -251,7 +251,7 @@ public class MasterElectionHandler implements MasterInterface { nextMasterData = null; } - public void writeHtmlState(StringBuilder sb, int stateGatherCount) { + public void writeHtmlState(StringBuilder sb) { sb.append("<h2>Master state</h2>\n"); Integer master = getMaster(); if (master != null) { @@ -270,7 +270,7 @@ public class MasterElectionHandler implements MasterInterface { .append(" before electing new master unless all possible master candidates are online.</p>"); } } - if ((master == null || master != index) && nextInLineCount < stateGatherCount) { + if ((master == null || master != index) && nextInLineCount < 1) { sb.append("<p>As we are number ").append(nextInLineCount) .append(" in line for taking over as master, we're gathering state from nodes.</p>"); sb.append("<p><font color=\"red\">As we are not the master, we don't know about nodes current system state" diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java index 3297d511469..5aae401e157 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java @@ -81,7 +81,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa .append(" ]</font></p>\n"); content.append("<table><tr><td>UTC time when creating this page:</td><td align=\"right\">").append(RealTimer.printDateNoMilliSeconds(currentTime, tz)).append("</td></tr>"); content.append("<tr><td>Cluster controller uptime:</td><td align=\"right\">" + RealTimer.printDuration(currentTime - startedTime) + "</td></tr></table>"); - if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount())) { + if (masterElectionHandler.isFirstInLine()) { // Table overview of all the nodes writeHtmlState(cluster, content, timer, stateVersionTracker, options, eventLog); // Current cluster state and cluster state history @@ -91,7 +91,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa writeHtmlState(content, options); } // State of master election - masterElectionHandler.writeHtmlState(content, options.stateGatherCount()); + masterElectionHandler.writeHtmlState(content); // Overview of current config writeHtmlState(content, options); // Event log @@ -223,7 +223,6 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa sb.append("<tr><td><nobr>Cluster name</nobr></td><td align=\"right\">").append(options.clusterName()).append("</td></tr>"); sb.append("<tr><td><nobr>Fleet controller index</nobr></td><td align=\"right\">").append(options.fleetControllerIndex()).append("/").append(options.fleetControllerCount()).append("</td></tr>"); - sb.append("<tr><td><nobr>Number of fleetcontrollers gathering states from nodes</nobr></td><td align=\"right\">").append(options.stateGatherCount()).append("</td></tr>"); sb.append("<tr><td><nobr>Slobrok connection spec</nobr></td><td align=\"right\">").append(slobrokspecs).append("</td></tr>"); sb.append("<tr><td><nobr>RPC port</nobr></td><td align=\"right\">").append(options.rpcPort() == 0 ? "Pick random available" : options.rpcPort()).append("</td></tr>"); diff --git a/configdefinitions/src/vespa/fleetcontroller.def b/configdefinitions/src/vespa/fleetcontroller.def index 93a20e4ee0d..c3e161eb038 100644 --- a/configdefinitions/src/vespa/fleetcontroller.def +++ b/configdefinitions/src/vespa/fleetcontroller.def @@ -29,6 +29,7 @@ master_zookeeper_cooldown_period double default=60.0 ## If set to 1, only master will gather state. If set higher, others will ## also do so, prioritizing those fleetcontrollers likely to be the ones to ## take over if the master fails. +# TODO: Deprecated, not used anymore, remove in Vespa 9 state_gather_count int default=1 ## Location of ZooKeeper servers diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java index 040f230a40e..cb8a4fec77a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java @@ -4,8 +4,8 @@ package com.yahoo.vespa.config.server.application; import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.yahoo.component.annotation.Inject; import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.model.api.ApplicationClusterInfo; import com.yahoo.config.model.api.HostInfo; @@ -103,7 +103,7 @@ public class ConfigConvergenceChecker extends AbstractComponent { .filter(serviceInfo -> shouldCheckService(hostsToCheck, application, serviceInfo)) .forEach(service -> getStatePort(service).ifPresent(port -> servicesToCheck.add(service)))); - log.log(Level.FINE, "Services to check for config convergence: " + servicesToCheck); + log.log(Level.FINE, () -> "Services to check for config convergence: " + servicesToCheck); return getServiceGenerations(servicesToCheck, timeoutPerService); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java index 50eee9e33b3..a1eae42da38 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java @@ -95,16 +95,21 @@ public abstract class ApplicationMaintainer extends NodeRepositoryMaintainer { @Override public void shutdown() { super.shutdown(); - deploymentExecutor.shutdownNow(); + deploymentExecutor.shutdown(); } @Override public void awaitShutdown() { + Instant deadline = clock().instant().plus(Duration.ofMinutes(1)); super.awaitShutdown(); try { + long remainder = Duration.between(clock().instant(), deadline).toMillis(); + if (deploymentExecutor.isShutdown()) return; + // Give deployments in progress some time to complete - if (!deploymentExecutor.awaitTermination(1, TimeUnit.MINUTES)) { + if (remainder < 0 || !deploymentExecutor.awaitTermination(remainder, TimeUnit.MILLISECONDS)) { log.log(Level.WARNING, "Failed to shut down deployment executor within deadline"); + deploymentExecutor.shutdownNow(); } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index 841d7f92b62..6ca7d298ee2 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -25,6 +25,7 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> #include <vespa/searchlib/util/bufferwriter.h> +#include <vespa/vespalib/util/fake_doom.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/document/base/exceptions.h> #include <vespa/eval/eval/fast_value.h> @@ -301,23 +302,27 @@ public: std::vector<Neighbor> find_top_k(uint32_t k, const search::tensor::BoundDistanceFunction &df, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const override { (void) k; (void) df; (void) explore_k; + (void) doom; (void) distance_threshold; return std::vector<Neighbor>(); } std::vector<Neighbor> find_top_k_with_filter(uint32_t k, const search::tensor::BoundDistanceFunction &df, const GlobalFilter& filter, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const override { (void) k; (void) df; (void) explore_k; (void) filter; + (void) doom; (void) distance_threshold; return std::vector<Neighbor>(); } @@ -1291,10 +1296,12 @@ template <typename ParentT> class NearestNeighborBlueprintFixtureBase : public ParentT { private: std::unique_ptr<Value> _query_tensor; + vespalib::FakeDoom _no_doom; public: NearestNeighborBlueprintFixtureBase() - : _query_tensor() + : _query_tensor(), + _no_doom() { this->set_tensor(1, vec_2d(1, 1)); this->set_tensor(2, vec_2d(2, 2)); @@ -1321,7 +1328,7 @@ public: create_query_tensor(vec_2d(17, 42))), 3, approximate, 5, 100100.25, - global_filter_lower_limit, 1.0); + global_filter_lower_limit, 1.0, _no_doom.get_doom()); EXPECT_EQUAL(11u, bp->getState().estimate().estHits); EXPECT_EQUAL(100100.25 * 100100.25, bp->get_distance_threshold()); return bp; diff --git a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp index 4d759132114..f59c16c76f9 100644 --- a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp +++ b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp @@ -17,6 +17,7 @@ #include <vespa/vespalib/datastore/compaction_spec.h> #include <vespa/vespalib/datastore/compaction_strategy.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/fake_doom.h> #include <vespa/vespalib/util/generationhandler.h> #include <vespa/vespalib/data/slime/slime.h> #include <type_traits> @@ -90,13 +91,15 @@ public: LevelGenerator* level_generator; GenerationHandler gen_handler; std::unique_ptr<IndexType> index; + std::unique_ptr<vespalib::FakeDoom> _doom; HnswIndexTest() : vectors(), global_filter(GlobalFilter::create()), level_generator(), gen_handler(), - index() + index(), + _doom(std::make_unique<vespalib::FakeDoom>()) { vectors.set(1, {2, 2}).set(2, {3, 2}).set(3, {2, 3}) .set(4, {1, 2}).set(5, {8, 3}).set(6, {7, 2}) @@ -173,8 +176,8 @@ public: vespalib::eval::TypedCells qv_cells(qv_ref); auto df = index->distance_function_factory().for_query_vector(qv_cells); auto got_by_docid = (global_filter->is_active()) ? - index->find_top_k_with_filter(k, *df, *global_filter, explore_k, 10000.0) : - index->find_top_k(k, *df, explore_k, 10000.0); + index->find_top_k_with_filter(k, *df, *global_filter, explore_k, _doom->get_doom(), 10000.0) : + index->find_top_k(k, *df, explore_k, _doom->get_doom(), 10000.0); std::vector<uint32_t> act; act.reserve(got_by_docid.size()); for (auto& hit : got_by_docid) { @@ -186,7 +189,7 @@ public: uint32_t k = 3; auto qv = vectors.get_vector(docid, 0); auto df = index->distance_function_factory().for_query_vector(qv); - auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active()).peek(); + auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active(), _doom->get_doom()).peek(); std::sort(rv.begin(), rv.end(), LesserDistance()); size_t idx = 0; for (const auto & hit : rv) { @@ -197,25 +200,27 @@ public: if (exp_hits.size() == k) { std::vector<uint32_t> expected_by_docid = exp_hits; std::sort(expected_by_docid.begin(), expected_by_docid.end()); - auto got_by_docid = index->find_top_k(k, *df, k, 100100.25); + auto got_by_docid = index->find_top_k(k, *df, k, _doom->get_doom(), 100100.25); for (idx = 0; idx < k; ++idx) { EXPECT_EQ(expected_by_docid[idx], got_by_docid[idx].docid); } } - check_with_distance_threshold(docid); + if (!exp_hits.empty()) { + check_with_distance_threshold(docid); + } } void check_with_distance_threshold(uint32_t docid) { auto qv = vectors.get_vector(docid, 0); auto df = index->distance_function_factory().for_query_vector(qv); uint32_t k = 3; - auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active()).peek(); + auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active(), _doom->get_doom()).peek(); std::sort(rv.begin(), rv.end(), LesserDistance()); EXPECT_EQ(rv.size(), 3); EXPECT_LE(rv[0].distance, rv[1].distance); double thr = (rv[0].distance + rv[1].distance) * 0.5; auto got_by_docid = (global_filter->is_active()) - ? index->find_top_k_with_filter(k, *df, *global_filter, k, thr) - : index->find_top_k(k, *df, k, thr); + ? index->find_top_k_with_filter(k, *df, *global_filter, k, _doom->get_doom(), thr) + : index->find_top_k(k, *df, k, _doom->get_doom(), thr); EXPECT_EQ(got_by_docid.size(), 1); EXPECT_EQ(got_by_docid[0].docid, index->get_docid(rv[0].nodeid)); for (const auto & hit : got_by_docid) { @@ -262,6 +267,12 @@ public: HnswIndexLoader<VectorBufferReader, IndexType::index_type> loader(graph, id_mapping, std::make_unique<VectorBufferReader>(data)); while (loader.load_next()) {} } + void reset_doom() { + _doom = std::make_unique<vespalib::FakeDoom>(); + } + void reset_doom(vespalib::steady_time::duration time_to_doom) { + _doom = std::make_unique<vespalib::FakeDoom>(time_to_doom); + } static constexpr bool is_single = std::is_same_v<IndexType, HnswIndex<HnswIndexType::SINGLE>>; }; @@ -334,6 +345,8 @@ TYPED_TEST(HnswIndexTest, 2d_vectors_inserted_in_level_0_graph_with_simple_selec this->expect_top_3(7, {3, 2}); this->expect_top_3(8, {4, 3}); this->expect_top_3(9, {3, 2}); + this->reset_doom(-1s); + this->expect_top_3(2, {}); } TYPED_TEST(HnswIndexTest, 2d_vectors_inserted_and_removed) @@ -824,6 +837,10 @@ TEST_F(HnswMultiIndexTest, duplicate_docid_is_removed) this->expect_top_3_by_docid("{2, 0}", {2, 0}, {1, 2, 4}); this->expect_top_3_by_docid("{2, 1}", {2, 1}, {2, 3, 4}); this->expect_top_3_by_docid("{2, 2}", {2, 2}, {1, 3, 4}); + this->reset_doom(-1s); // 1s beyond doom => no hits + this->expect_top_3_by_docid("{2, 2}", {2, 2}, {}); + this->reset_doom(); + this->expect_top_3_by_docid("{2, 2}", {2, 2}, {1, 3, 4}); auto filter = std::make_shared<MyGlobalFilter>(GlobalFilter::create({1, 2}, 3)); global_filter = filter; this->expect_top_3_by_docid("{2,2}", {2, 2}, {1, 2}); diff --git a/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp b/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp index 7a622030d98..62f76b5cee0 100644 --- a/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp +++ b/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp @@ -835,7 +835,8 @@ public: n.get_explore_additional_hits(), n.get_distance_threshold(), getRequestContext().get_attribute_blueprint_params().global_filter_lower_limit, - getRequestContext().get_attribute_blueprint_params().global_filter_upper_limit)); + getRequestContext().get_attribute_blueprint_params().global_filter_upper_limit, + getRequestContext().getDoom())); } catch (const vespalib::IllegalArgumentException& ex) { return fail_nearest_neighbor_term(n, ex.getMessage()); diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp index 7c307a1e35f..87ddb8b6edc 100644 --- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp +++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp @@ -14,6 +14,8 @@ LOG_SETUP(".searchlib.queryeval.nearest_neighbor_blueprint"); using vespalib::eval::Value; +namespace vespalib { class Doom; } + namespace search::queryeval { namespace { @@ -40,7 +42,8 @@ NearestNeighborBlueprint::NearestNeighborBlueprint(const queryeval::FieldSpec& f uint32_t explore_additional_hits, double distance_threshold, double global_filter_lower_limit, - double global_filter_upper_limit) + double global_filter_upper_limit, + const vespalib::Doom& doom) : ComplexLeafBlueprint(field), _distance_calc(std::move(distance_calc)), _attr_tensor(_distance_calc->attribute_tensor()), @@ -58,7 +61,8 @@ NearestNeighborBlueprint::NearestNeighborBlueprint(const queryeval::FieldSpec& f _global_filter(GlobalFilter::create()), _global_filter_set(false), _global_filter_hits(), - _global_filter_hit_ratio() + _global_filter_hit_ratio(), + _doom(doom) { if (distance_threshold < std::numeric_limits<double>::max()) { _distance_threshold = _distance_calc->function().convert_threshold(distance_threshold); @@ -109,10 +113,10 @@ NearestNeighborBlueprint::perform_top_k(const search::tensor::NearestNeighborInd uint32_t k = _adjusted_target_hits; const auto &df = _distance_calc->function(); if (_global_filter->is_active()) { - _found_hits = nns_index->find_top_k_with_filter(k, df, *_global_filter, k + _explore_additional_hits, _distance_threshold); + _found_hits = nns_index->find_top_k_with_filter(k, df, *_global_filter, k + _explore_additional_hits, _doom, _distance_threshold); _algorithm = Algorithm::INDEX_TOP_K_WITH_FILTER; } else { - _found_hits = nns_index->find_top_k(k, df, k + _explore_additional_hits, _distance_threshold); + _found_hits = nns_index->find_top_k(k, df, k + _explore_additional_hits, _doom, _distance_threshold); _algorithm = Algorithm::INDEX_TOP_K; } } diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h index 3defb34cffd..f88cdd5adb1 100644 --- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h +++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h @@ -45,6 +45,7 @@ private: bool _global_filter_set; std::optional<uint32_t> _global_filter_hits; std::optional<double> _global_filter_hit_ratio; + const vespalib::Doom& _doom; void perform_top_k(const search::tensor::NearestNeighborIndex* nns_index); public: @@ -53,7 +54,8 @@ public: uint32_t target_hits, bool approximate, uint32_t explore_additional_hits, double distance_threshold, double global_filter_lower_limit, - double global_filter_upper_limit); + double global_filter_upper_limit, + const vespalib::Doom& doom); NearestNeighborBlueprint(const NearestNeighborBlueprint&) = delete; NearestNeighborBlueprint& operator=(const NearestNeighborBlueprint&) = delete; ~NearestNeighborBlueprint(); diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp index f2f7ad212de..e3fa3f1f9f3 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp @@ -18,6 +18,7 @@ #include <vespa/vespalib/data/slime/inserter.h> #include <vespa/vespalib/datastore/array_store.hpp> #include <vespa/vespalib/datastore/compaction_strategy.h> +#include <vespa/vespalib/util/doom.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/time.h> @@ -373,12 +374,19 @@ HnswIndex<type>::search_layer_helper( const BoundDistanceFunction &df, uint32_t neighbors_to_find, BestNeighbors& best_neighbors, uint32_t level, const GlobalFilter *filter, - uint32_t nodeid_limit, uint32_t estimated_visited_nodes) const + uint32_t nodeid_limit, const vespalib::Doom* const doom, + uint32_t estimated_visited_nodes) const { NearestPriQ candidates; GlobalFilterWrapper<type> filter_wrapper(filter); filter_wrapper.clamp_nodeid_limit(nodeid_limit); VisitedTracker visited(nodeid_limit, estimated_visited_nodes); + if (doom != nullptr && doom->soft_doom()) { + while (!best_neighbors.empty()) { + best_neighbors.pop(); + } + return; + } for (const auto &entry : best_neighbors.peek()) { if (entry.nodeid >= nodeid_limit) { continue; @@ -423,6 +431,9 @@ HnswIndex<type>::search_layer_helper( } } } + if (doom != nullptr && doom->soft_doom()) { + break; + } } } @@ -432,14 +443,15 @@ void HnswIndex<type>::search_layer( const BoundDistanceFunction &df, uint32_t neighbors_to_find, - BestNeighbors& best_neighbors, uint32_t level, const GlobalFilter *filter) const + BestNeighbors& best_neighbors, uint32_t level, + const vespalib::Doom* const doom, const GlobalFilter *filter) const { uint32_t nodeid_limit = _graph.nodes_size.load(std::memory_order_acquire); uint32_t estimated_visited_nodes = estimate_visited_nodes(level, nodeid_limit, neighbors_to_find, filter); if (estimated_visited_nodes >= nodeid_limit / 128) { - search_layer_helper<BitVectorVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, estimated_visited_nodes); + search_layer_helper<BitVectorVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, doom, estimated_visited_nodes); } else { - search_layer_helper<HashSetVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, estimated_visited_nodes); + search_layer_helper<HashSetVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, doom, estimated_visited_nodes); } } @@ -522,7 +534,7 @@ HnswIndex<type>::internal_prepare_add_node(PreparedAddDoc& op, TypedCells input_ search_level = std::min(node_max_level, search_level); // Find neighbors of the added document in each level it should exist in. while (search_level >= 0) { - search_layer(*df, _cfg.neighbors_to_explore_at_construction(), best_neighbors, search_level); + search_layer(*df, _cfg.neighbors_to_explore_at_construction(), best_neighbors, search_level, nullptr); auto neighbors = select_neighbors(best_neighbors.peek(), _cfg.max_links_on_inserts()); auto& links = connections[search_level]; links.reserve(neighbors.used.size()); @@ -887,9 +899,10 @@ HnswIndex<type>::top_k_by_docid( uint32_t k, const BoundDistanceFunction &df, const GlobalFilter *filter, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const { - SearchBestNeighbors candidates = top_k_candidates(df, std::max(k, explore_k), filter); + SearchBestNeighbors candidates = top_k_candidates(df, std::max(k, explore_k), filter, doom); auto result = candidates.get_neighbors(k, distance_threshold); std::sort(result.begin(), result.end(), NeighborsByDocId()); return result; @@ -901,9 +914,10 @@ HnswIndex<type>::find_top_k( uint32_t k, const BoundDistanceFunction &df, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const { - return top_k_by_docid(k, df, nullptr, explore_k, distance_threshold); + return top_k_by_docid(k, df, nullptr, explore_k, doom, distance_threshold); } template <HnswIndexType type> @@ -912,16 +926,18 @@ HnswIndex<type>::find_top_k_with_filter( uint32_t k, const BoundDistanceFunction &df, const GlobalFilter &filter, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const { - return top_k_by_docid(k, df, &filter, explore_k, distance_threshold); + return top_k_by_docid(k, df, &filter, explore_k, doom, distance_threshold); } template <HnswIndexType type> typename HnswIndex<type>::SearchBestNeighbors HnswIndex<type>::top_k_candidates( const BoundDistanceFunction &df, - uint32_t k, const GlobalFilter *filter) const + uint32_t k, const GlobalFilter *filter, + const vespalib::Doom& doom) const { SearchBestNeighbors best_neighbors; auto entry = _graph.get_entry_node(); @@ -939,7 +955,7 @@ HnswIndex<type>::top_k_candidates( --search_level; } best_neighbors.push(entry_point); - search_layer(df, k, best_neighbors, 0, filter); + search_layer(df, k, best_neighbors, 0, &doom, filter); return best_neighbors; } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h index 20ab0dbea92..1ea8d1be558 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h @@ -170,12 +170,15 @@ protected: void search_layer_helper(const BoundDistanceFunction &df, uint32_t neighbors_to_find, BestNeighbors& best_neighbors, uint32_t level, const GlobalFilter *filter, uint32_t nodeid_limit, + const vespalib::Doom* const doom, uint32_t estimated_visited_nodes) const; template <class BestNeighbors> void search_layer(const BoundDistanceFunction &df, uint32_t neighbors_to_find, BestNeighbors& best_neighbors, - uint32_t level, const GlobalFilter *filter = nullptr) const; + uint32_t level, const vespalib::Doom* const doom, + const GlobalFilter *filter = nullptr) const; std::vector<Neighbor> top_k_by_docid(uint32_t k, const BoundDistanceFunction &df, const GlobalFilter *filter, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const; internal::PreparedAddDoc internal_prepare_add(uint32_t docid, VectorBundle input_vectors, @@ -217,19 +220,22 @@ public: uint32_t k, const BoundDistanceFunction &df, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const override; std::vector<Neighbor> find_top_k_with_filter( uint32_t k, const BoundDistanceFunction &df, const GlobalFilter &filter, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const override; DistanceFunctionFactory &distance_function_factory() const override { return *_distance_ff; } SearchBestNeighbors top_k_candidates( const BoundDistanceFunction &df, - uint32_t k, const GlobalFilter *filter) const; + uint32_t k, const GlobalFilter *filter, + const vespalib::Doom& doom) const; uint32_t get_entry_nodeid() const { return _graph.get_entry_node().nodeid; } int32_t get_entry_level() const { return _graph.get_entry_node().level; } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h b/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h index de707999f11..67eff4b33c7 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h @@ -57,6 +57,7 @@ public: _candidates.pop(); } const HnswCandidateVector& peek() const { return _candidates.peek(); } + bool empty() const { return _candidates.empty(); } const HnswCandidate& top() const { return _candidates.top(); } size_t size() const { return _docids.size(); } void emplace(uint32_t nodeid, uint32_t docid, EntryRef ref, double distance) { diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h b/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h index e7c0a7fded6..acb14f79b7a 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h @@ -25,6 +25,7 @@ public: void push(const HnswCandidate& candidate) { _candidates.push(candidate); } void pop() { _candidates.pop(); } const HnswCandidateVector& peek() const { return _candidates.peek(); } + bool empty() const { return _candidates.empty(); } const HnswCandidate& top() const { return _candidates.top(); } size_t size() const { return _candidates.size(); } void emplace(uint32_t nodeid, uint32_t docid, EntryRef ref, double distance) { _candidates.emplace(nodeid, docid, ref, distance); } diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h index 9cd1065a356..a11be086697 100644 --- a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h +++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h @@ -14,6 +14,7 @@ class FastOS_FileInterface; +namespace vespalib { class Doom; } namespace vespalib { class GenericHeader; } namespace vespalib::datastore { class CompactionSpec; @@ -101,6 +102,7 @@ public: virtual std::vector<Neighbor> find_top_k(uint32_t k, const BoundDistanceFunction &df, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const = 0; // only return neighbors where the corresponding filter bit is set @@ -108,6 +110,7 @@ public: const BoundDistanceFunction &df, const GlobalFilter &filter, uint32_t explore_k, + const vespalib::Doom& doom, double distance_threshold) const = 0; virtual DistanceFunctionFactory &distance_function_factory() const = 0; diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 91365d446c1..21642cbd842 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -31,6 +31,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT exceptions.cpp execution_profiler.cpp executor_idle_tracking.cpp + fake_doom.cpp featureset.cpp file_area_freelist.cpp foregroundtaskexecutor.cpp diff --git a/vespalib/src/vespa/vespalib/util/fake_doom.cpp b/vespalib/src/vespa/vespalib/util/fake_doom.cpp new file mode 100644 index 00000000000..4ca71afd2c5 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/fake_doom.cpp @@ -0,0 +1,16 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "fake_doom.h" + +namespace vespalib { + +FakeDoom::FakeDoom(steady_time::duration time_to_doom) + : _time(steady_clock::now()), + _clock(_time), + _doom(_clock, _clock.getTimeNS() + time_to_doom) +{ +} + +FakeDoom::~FakeDoom() = default; + +} diff --git a/vespalib/src/vespa/vespalib/util/fake_doom.h b/vespalib/src/vespa/vespalib/util/fake_doom.h new file mode 100644 index 00000000000..496129d8f0f --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/fake_doom.h @@ -0,0 +1,24 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "doom.h" + +namespace vespalib { + +/* + * Class containing a fake doom controlled by the time_to_doom + * constructor argument. + */ +class FakeDoom { + std::atomic<steady_time> _time; + Clock _clock; + Doom _doom; +public: + FakeDoom() : FakeDoom(1s) { } + FakeDoom(steady_time::duration time_to_doom); + ~FakeDoom(); + const Doom& get_doom() const noexcept { return _doom; } +}; + +} |