summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp7
-rw-r--r--client/go/internal/cli/cmd/document.go190
-rw-r--r--client/go/internal/cli/cmd/document_test.go46
-rw-r--r--client/go/internal/cli/cmd/feed.go3
-rw-r--r--client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json15
-rw-r--r--client/go/internal/curl/curl.go2
-rw-r--r--client/go/internal/vespa/document.go197
-rw-r--r--client/go/internal/vespa/document/circuit_breaker.go9
-rw-r--r--client/go/internal/vespa/document/circuit_breaker_test.go6
-rw-r--r--client/go/internal/vespa/document/dispatcher.go65
-rw-r--r--client/go/internal/vespa/document/dispatcher_test.go2
-rw-r--r--client/go/internal/vespa/document/document.go33
-rw-r--r--client/go/internal/vespa/document/document_test.go10
-rw-r--r--client/go/internal/vespa/document/http.go96
-rw-r--r--client/go/internal/vespa/document/http_test.go46
-rw-r--r--client/go/internal/vespa/document/stats.go2
-rw-r--r--clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java1
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java2
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java15
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java6
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java5
-rw-r--r--configdefinitions/src/vespa/fleetcontroller.def1
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java4
-rw-r--r--node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java9
-rw-r--r--searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp11
-rw-r--r--searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp35
-rw-r--r--searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h4
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp36
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index.h10
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h1
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h1
-rw-r--r--searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h3
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/util/fake_doom.cpp16
-rw-r--r--vespalib/src/vespa/vespalib/util/fake_doom.h24
37 files changed, 522 insertions, 407 deletions
diff --git a/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp b/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp
index a52e0850b7d..730ee141f83 100644
--- a/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp
+++ b/ann_benchmark/src/vespa/ann_benchmark/vespa_ann_benchmark.cpp
@@ -10,6 +10,7 @@
#include <vespa/searchcommon/attribute/config.h>
#include <vespa/eval/eval/value.h>
#include <vespa/vespalib/test/insertion_operators.h>
+#include <vespa/vespalib/util/fake_doom.h>
#include <iostream>
#include <sstream>
#include <limits>
@@ -67,6 +68,7 @@ class HnswIndex
const NearestNeighborIndex* _nearest_neighbor_index;
size_t _dim_size;
bool _normalize_vectors;
+ vespalib::FakeDoom _no_doom;
bool check_lid(uint32_t lid);
bool check_value(const char *op, const std::vector<float>& value);
@@ -87,7 +89,8 @@ HnswIndex::HnswIndex(uint32_t dim_size, const HnswIndexParams &hnsw_index_params
_tensor_attribute(nullptr),
_nearest_neighbor_index(nullptr),
_dim_size(0u),
- _normalize_vectors(normalize_vectors)
+ _normalize_vectors(normalize_vectors),
+ _no_doom()
{
Config cfg(BasicType::TENSOR, CollectionType::SINGLE);
_tensor_type = ValueType::from_spec(make_tensor_spec(dim_size));
@@ -208,7 +211,7 @@ HnswIndex::find_top_k(uint32_t k, const std::vector<float>& value, uint32_t expl
std::vector<float> normalized_value;
auto typed_cells = get_typed_cells(value, normalized_value);
auto df = _nearest_neighbor_index->distance_function_factory().for_query_vector(typed_cells);
- auto raw_result = _nearest_neighbor_index->find_top_k(k, *df, explore_k, std::numeric_limits<double>::max());
+ auto raw_result = _nearest_neighbor_index->find_top_k(k, *df, explore_k, _no_doom.get_doom(), std::numeric_limits<double>::max());
result.reserve(raw_result.size());
switch (_hnsw_index_params.distance_metric()) {
case DistanceMetric::Euclidean:
diff --git a/client/go/internal/cli/cmd/document.go b/client/go/internal/cli/cmd/document.go
index b5b63fd32df..07a98d2e626 100644
--- a/client/go/internal/cli/cmd/document.go
+++ b/client/go/internal/cli/cmd/document.go
@@ -5,15 +5,22 @@
package cmd
import (
+ "bytes"
+ "errors"
"fmt"
"io"
+ "net/http"
+ "os"
+ "strconv"
"strings"
"time"
"github.com/fatih/color"
"github.com/spf13/cobra"
+ "github.com/vespa-engine/vespa/client/go/internal/curl"
"github.com/vespa-engine/vespa/client/go/internal/util"
"github.com/vespa-engine/vespa/client/go/internal/vespa"
+ "github.com/vespa-engine/vespa/client/go/internal/vespa/document"
)
func addDocumentFlags(cmd *cobra.Command, printCurl *bool, timeoutSecs *int) {
@@ -21,6 +28,128 @@ func addDocumentFlags(cmd *cobra.Command, printCurl *bool, timeoutSecs *int) {
cmd.PersistentFlags().IntVarP(timeoutSecs, "timeout", "T", 60, "Timeout for the document request in seconds")
}
+type serviceWithCurl struct {
+ curlCmdWriter io.Writer
+ bodyFile string
+ service *vespa.Service
+}
+
+func (s *serviceWithCurl) Do(request *http.Request, timeout time.Duration) (*http.Response, error) {
+ cmd, err := curl.RawArgs(request.URL.String())
+ if err != nil {
+ return nil, err
+ }
+ cmd.Method = request.Method
+ for k, vs := range request.Header {
+ for _, v := range vs {
+ cmd.Header(k, v)
+ }
+ }
+ if s.bodyFile != "" {
+ cmd.WithBodyFile(s.bodyFile)
+ }
+ cmd.Certificate = s.service.TLSOptions.CertificateFile
+ cmd.PrivateKey = s.service.TLSOptions.PrivateKeyFile
+ out := cmd.String() + "\n"
+ if _, err := io.WriteString(s.curlCmdWriter, out); err != nil {
+ return nil, err
+ }
+ return s.service.Do(request, timeout)
+}
+
+func documentClient(cli *CLI, timeoutSecs int, printCurl bool) (*document.Client, *serviceWithCurl, error) {
+ docService, err := documentService(cli)
+ if err != nil {
+ return nil, nil, err
+ }
+ service := &serviceWithCurl{curlCmdWriter: io.Discard, service: docService}
+ if printCurl {
+ service.curlCmdWriter = cli.Stderr
+ }
+ client, err := document.NewClient(document.ClientOptions{
+ Compression: document.CompressionAuto,
+ Timeout: time.Duration(timeoutSecs) * time.Second,
+ BaseURL: docService.BaseURL,
+ NowFunc: time.Now,
+ }, []util.HTTPClient{service})
+ if err != nil {
+ return nil, nil, err
+ }
+ return client, service, nil
+}
+
+func sendOperation(op document.Operation, args []string, timeoutSecs int, printCurl bool, cli *CLI) error {
+ client, service, err := documentClient(cli, timeoutSecs, printCurl)
+ if err != nil {
+ return err
+ }
+ id := ""
+ filename := args[0]
+ if len(args) > 1 {
+ id = args[0]
+ filename = args[1]
+ }
+ f, err := os.Open(filename)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ doc, err := document.NewDecoder(f).Decode()
+ if errors.Is(err, document.ErrMissingId) {
+ if id == "" {
+ return fmt.Errorf("no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file")
+ }
+ } else if err != nil {
+ return err
+ }
+ if id != "" {
+ docId, err := document.ParseId(id)
+ if err != nil {
+ return err
+ }
+ doc.Id = docId
+ }
+ if op > -1 {
+ if id == "" && op != doc.Operation {
+ return fmt.Errorf("wanted document operation is %s, but JSON file specifies %s", op, doc.Operation)
+ }
+ doc.Operation = op
+ }
+ if doc.Body != nil {
+ service.bodyFile = f.Name()
+ }
+ result := client.Send(doc)
+ return printResult(cli, operationResult(false, doc, service.service, result), false)
+}
+
+func readDocument(id string, timeoutSecs int, printCurl bool, cli *CLI) error {
+ client, service, err := documentClient(cli, timeoutSecs, printCurl)
+ if err != nil {
+ return err
+ }
+ docId, err := document.ParseId(id)
+ if err != nil {
+ return err
+ }
+ result := client.Get(docId)
+ return printResult(cli, operationResult(true, document.Document{Id: docId}, service.service, result), true)
+}
+
+func operationResult(read bool, doc document.Document, service *vespa.Service, result document.Result) util.OperationResult {
+ bodyReader := bytes.NewReader(result.Body)
+ if result.HTTPStatus == 200 {
+ if read {
+ return util.SuccessWithPayload("Read "+doc.Id.String(), util.ReaderToJSON(bodyReader))
+ } else {
+ return util.Success(doc.Operation.String() + " " + doc.Id.String())
+ }
+ }
+ if result.HTTPStatus/100 == 4 {
+ return util.FailureWithPayload("Invalid document operation: Status "+strconv.Itoa(result.HTTPStatus), util.ReaderToJSON(bodyReader))
+ }
+ return util.FailureWithPayload(service.Description()+" at "+service.BaseURL+": Status "+strconv.Itoa(result.HTTPStatus), util.ReaderToJSON(bodyReader))
+}
+
func newDocumentCmd(cli *CLI) *cobra.Command {
var (
printCurl bool
@@ -44,11 +173,7 @@ should be used instead of this.`,
SilenceUsage: true,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
- service, err := documentService(cli)
- if err != nil {
- return err
- }
- return printResult(cli, vespa.Send(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
+ return sendOperation(-1, args, timeoutSecs, printCurl, cli)
},
}
addDocumentFlags(cmd, &printCurl, &timeoutSecs)
@@ -72,15 +197,7 @@ $ vespa document put id:mynamespace:music::a-head-full-of-dreams src/test/resour
DisableAutoGenTag: true,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
- service, err := documentService(cli)
- if err != nil {
- return err
- }
- if len(args) == 1 {
- return printResult(cli, vespa.Put("", args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
- } else {
- return printResult(cli, vespa.Put(args[0], args[1], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
- }
+ return sendOperation(document.OperationPut, args, timeoutSecs, printCurl, cli)
},
}
addDocumentFlags(cmd, &printCurl, &timeoutSecs)
@@ -103,15 +220,7 @@ $ vespa document update id:mynamespace:music::a-head-full-of-dreams src/test/res
DisableAutoGenTag: true,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
- service, err := documentService(cli)
- if err != nil {
- return err
- }
- if len(args) == 1 {
- return printResult(cli, vespa.Update("", args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
- } else {
- return printResult(cli, vespa.Update(args[0], args[1], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
- }
+ return sendOperation(document.OperationUpdate, args, timeoutSecs, printCurl, cli)
},
}
addDocumentFlags(cmd, &printCurl, &timeoutSecs)
@@ -134,14 +243,20 @@ $ vespa document remove id:mynamespace:music::a-head-full-of-dreams`,
DisableAutoGenTag: true,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
- service, err := documentService(cli)
- if err != nil {
- return err
- }
if strings.HasPrefix(args[0], "id:") {
- return printResult(cli, vespa.RemoveId(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
+ client, service, err := documentClient(cli, timeoutSecs, printCurl)
+ if err != nil {
+ return err
+ }
+ id, err := document.ParseId(args[0])
+ if err != nil {
+ return err
+ }
+ doc := document.Document{Id: id, Operation: document.OperationRemove}
+ result := client.Send(doc)
+ return printResult(cli, operationResult(false, doc, service.service, result), false)
} else {
- return printResult(cli, vespa.RemoveOperation(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), false)
+ return sendOperation(document.OperationRemove, args, timeoutSecs, printCurl, cli)
}
},
}
@@ -162,11 +277,7 @@ func newDocumentGetCmd(cli *CLI) *cobra.Command {
SilenceUsage: true,
Example: `$ vespa document get id:mynamespace:music::a-head-full-of-dreams`,
RunE: func(cmd *cobra.Command, args []string) error {
- service, err := documentService(cli)
- if err != nil {
- return err
- }
- return printResult(cli, vespa.Get(args[0], service, operationOptions(cli.Stderr, printCurl, timeoutSecs)), true)
+ return readDocument(args[0], timeoutSecs, printCurl, cli)
},
}
addDocumentFlags(cmd, &printCurl, &timeoutSecs)
@@ -181,17 +292,6 @@ func documentService(cli *CLI) (*vespa.Service, error) {
return cli.service(target, vespa.DocumentService, 0, cli.config.cluster())
}
-func operationOptions(stderr io.Writer, printCurl bool, timeoutSecs int) vespa.OperationOptions {
- curlOutput := io.Discard
- if printCurl {
- curlOutput = stderr
- }
- return vespa.OperationOptions{
- CurlOutput: curlOutput,
- Timeout: time.Second * time.Duration(timeoutSecs),
- }
-}
-
func printResult(cli *CLI, result util.OperationResult, payloadOnlyOnSuccess bool) error {
out := cli.Stdout
if !result.Success {
diff --git a/client/go/internal/cli/cmd/document_test.go b/client/go/internal/cli/cmd/document_test.go
index bf9cc0404dc..00f98ee1333 100644
--- a/client/go/internal/cli/cmd/document_test.go
+++ b/client/go/internal/cli/cmd/document_test.go
@@ -5,6 +5,7 @@
package cmd
import (
+ "encoding/json"
"os"
"strconv"
"testing"
@@ -20,6 +21,11 @@ func TestDocumentSendPut(t *testing.T) {
"put", "POST", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json", t)
}
+func TestDocumentSendPutWithIdInFile(t *testing.T) {
+ assertDocumentSend([]string{"document", "testdata/A-Head-Full-of-Dreams-Put-Id.json"},
+ "put", "POST", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put-Id.json", t)
+}
+
func TestDocumentSendPutVerbose(t *testing.T) {
assertDocumentSend([]string{"document", "-v", "testdata/A-Head-Full-of-Dreams-Put.json"},
"put", "POST", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Put.json", t)
@@ -32,7 +38,7 @@ func TestDocumentSendUpdate(t *testing.T) {
func TestDocumentSendRemove(t *testing.T) {
assertDocumentSend([]string{"document", "testdata/A-Head-Full-of-Dreams-Remove.json"},
- "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Remove.json", t)
+ "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t)
}
func TestDocumentPutWithIdArg(t *testing.T) {
@@ -57,19 +63,24 @@ func TestDocumentUpdateWithoutIdArg(t *testing.T) {
func TestDocumentRemoveWithIdArg(t *testing.T) {
assertDocumentSend([]string{"document", "remove", "id:mynamespace:music::a-head-full-of-dreams"},
- "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Remove.json", t)
+ "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t)
}
func TestDocumentRemoveWithoutIdArg(t *testing.T) {
assertDocumentSend([]string{"document", "remove", "testdata/A-Head-Full-of-Dreams-Remove.json"},
- "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "testdata/A-Head-Full-of-Dreams-Remove.json", t)
+ "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t)
+}
+
+func TestDocumentRemoveWithoutIdArgVerbose(t *testing.T) {
+ assertDocumentSend([]string{"document", "remove", "-v", "testdata/A-Head-Full-of-Dreams-Remove.json"},
+ "remove", "DELETE", "id:mynamespace:music::a-head-full-of-dreams", "", t)
}
func TestDocumentSendMissingId(t *testing.T) {
cli, _, stderr := newTestCLI(t)
assert.NotNil(t, cli.Run("document", "put", "testdata/A-Head-Full-of-Dreams-Without-Operation.json"))
assert.Equal(t,
- "Error: No document id given neither as argument or as a 'put' key in the json file\n",
+ "Error: no document id given neither as argument or as a 'put', 'update' or 'remove' key in the JSON file\n",
stderr.String())
}
@@ -77,7 +88,7 @@ func TestDocumentSendWithDisagreeingOperations(t *testing.T) {
cli, _, stderr := newTestCLI(t)
assert.NotNil(t, cli.Run("document", "update", "testdata/A-Head-Full-of-Dreams-Put.json"))
assert.Equal(t,
- "Error: Wanted document operation is update but the JSON file specifies put\n",
+ "Error: wanted document operation is update, but JSON file specifies put\n",
stderr.String())
}
@@ -103,7 +114,7 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe
t.Fatal(err)
}
expectedPath, _ := vespa.IdToURLPath(expectedDocumentId)
- expectedURL := documentURL + "/document/v1/" + expectedPath
+ expectedURL := documentURL + "/document/v1/" + expectedPath + "?timeout=60000ms"
assert.Nil(t, cli.Run(arguments...))
verbose := false
@@ -113,16 +124,29 @@ func assertDocumentSend(arguments []string, expectedOperation string, expectedMe
}
}
if verbose {
- expectedCurl := "curl -X " + expectedMethod + " -H 'Content-Type: application/json' --data-binary @" + expectedPayloadFile + " " + expectedURL + "\n"
+ expectedCurl := "curl -X " + expectedMethod + " -H 'Content-Type: application/json; charset=utf-8' -H 'User-Agent: Vespa CLI/0.0.0-devel'"
+ if expectedPayloadFile != "" {
+ expectedCurl += " --data-binary @" + expectedPayloadFile
+ }
+ expectedCurl += " '" + expectedURL + "'\n"
assert.Equal(t, expectedCurl, stderr.String())
}
assert.Equal(t, "Success: "+expectedOperation+" "+expectedDocumentId+"\n", stdout.String())
assert.Equal(t, expectedURL, client.LastRequest.URL.String())
- assert.Equal(t, "application/json", client.LastRequest.Header.Get("Content-Type"))
+ assert.Equal(t, "application/json; charset=utf-8", client.LastRequest.Header.Get("Content-Type"))
assert.Equal(t, expectedMethod, client.LastRequest.Method)
- expectedPayload, _ := os.ReadFile(expectedPayloadFile)
- assert.Equal(t, string(expectedPayload), util.ReaderToString(client.LastRequest.Body))
+ if expectedPayloadFile != "" {
+ data, err := os.ReadFile(expectedPayloadFile)
+ assert.Nil(t, err)
+ var expectedPayload struct {
+ Fields json.RawMessage `json:"fields"`
+ }
+ assert.Nil(t, json.Unmarshal(data, &expectedPayload))
+ assert.Equal(t, `{"fields":`+string(expectedPayload.Fields)+"}", util.ReaderToString(client.LastRequest.Body))
+ } else {
+ assert.Nil(t, client.LastRequest.Body)
+ }
}
func assertDocumentGet(arguments []string, documentId string, t *testing.T) {
@@ -170,7 +194,7 @@ func assertDocumentServerError(t *testing.T, status int, errorMessage string) {
"id:mynamespace:music::a-head-full-of-dreams",
"testdata/A-Head-Full-of-Dreams-Put.json"))
assert.Equal(t,
- "Error: Container (document API) at 127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n",
+ "Error: Container (document API) at http://127.0.0.1:8080: Status "+strconv.Itoa(status)+"\n\n"+errorMessage+"\n",
stderr.String())
}
diff --git a/client/go/internal/cli/cmd/feed.go b/client/go/internal/cli/cmd/feed.go
index 2a7d8491578..6d368cb210b 100644
--- a/client/go/internal/cli/cmd/feed.go
+++ b/client/go/internal/cli/cmd/feed.go
@@ -1,6 +1,7 @@
package cmd
import (
+ "bufio"
"encoding/json"
"fmt"
"io"
@@ -164,7 +165,7 @@ func feedFiles(files []string, dispatcher *document.Dispatcher, cli *CLI) {
}
func dispatchFrom(r io.ReadCloser, dispatcher *document.Dispatcher, cli *CLI) {
- dec := document.NewDecoder(r)
+ dec := document.NewDecoder(bufio.NewReaderSize(r, 1<<26)) // Buffer up to 64M of data at a time
defer r.Close()
for {
doc, err := dec.Decode()
diff --git a/client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json b/client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json
new file mode 100644
index 00000000000..d39b22782ab
--- /dev/null
+++ b/client/go/internal/cli/cmd/testdata/A-Head-Full-of-Dreams-Put-Id.json
@@ -0,0 +1,15 @@
+{
+ "id": "id:mynamespace:music::a-head-full-of-dreams",
+ "fields": {
+ "album": "A Head Full of Dreams",
+ "artist": "Coldplay",
+ "year": 2015,
+ "category_scores": {
+ "cells": [
+ { "address" : { "cat" : "pop" }, "value": 1 },
+ { "address" : { "cat" : "rock" }, "value": 0.2 },
+ { "address" : { "cat" : "jazz" }, "value": 0 }
+ ]
+ }
+ }
+}
diff --git a/client/go/internal/curl/curl.go b/client/go/internal/curl/curl.go
index daa60e6ff14..5f4b7928704 100644
--- a/client/go/internal/curl/curl.go
+++ b/client/go/internal/curl/curl.go
@@ -6,6 +6,7 @@ import (
"net/url"
"os/exec"
"runtime"
+ "sort"
"github.com/alessio/shellescape"
"github.com/vespa-engine/vespa/client/go/internal/util"
@@ -61,6 +62,7 @@ func (c *Command) Args() []string {
if c.Method != "" {
args = append(args, "-X", c.Method)
}
+ sort.Slice(c.headers, func(i, j int) bool { return c.headers[i].key < c.headers[j].key })
for _, header := range c.headers {
args = append(args, "-H", header.key+": "+header.value)
}
diff --git a/client/go/internal/vespa/document.go b/client/go/internal/vespa/document.go
deleted file mode 100644
index 9e4c8e7d136..00000000000
--- a/client/go/internal/vespa/document.go
+++ /dev/null
@@ -1,197 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-// vespa document API client
-// Author: bratseth
-
-package vespa
-
-import (
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "time"
-
- "github.com/vespa-engine/vespa/client/go/internal/curl"
- "github.com/vespa-engine/vespa/client/go/internal/util"
-)
-
-// Sends the operation given in the file
-func Send(jsonFile string, service *Service, options OperationOptions) util.OperationResult {
- return sendOperation("", jsonFile, service, anyOperation, options)
-}
-
-func Put(documentId string, jsonFile string, service *Service, options OperationOptions) util.OperationResult {
- return sendOperation(documentId, jsonFile, service, putOperation, options)
-}
-
-func Update(documentId string, jsonFile string, service *Service, options OperationOptions) util.OperationResult {
- return sendOperation(documentId, jsonFile, service, updateOperation, options)
-}
-
-func RemoveId(documentId string, service *Service, options OperationOptions) util.OperationResult {
- return sendOperation(documentId, "", service, removeOperation, options)
-}
-
-func RemoveOperation(jsonFile string, service *Service, options OperationOptions) util.OperationResult {
- return sendOperation("", jsonFile, service, removeOperation, options)
-}
-
-const (
- anyOperation string = "any"
- putOperation string = "put"
- updateOperation string = "update"
- removeOperation string = "remove"
-)
-
-type OperationOptions struct {
- CurlOutput io.Writer
- Timeout time.Duration
-}
-
-func sendOperation(documentId string, jsonFile string, service *Service, operation string, options OperationOptions) util.OperationResult {
- header := http.Header{}
- header.Add("Content-Type", "application/json")
-
- var documentData []byte
- if operation == "remove" && jsonFile == "" {
- documentData = []byte("{\n \"remove\": \"" + documentId + "\"\n}\n")
- } else {
- fileReader, err := os.Open(jsonFile)
- if err != nil {
- return util.FailureWithDetail("Could not open file '"+jsonFile+"'", err.Error())
- }
- defer fileReader.Close()
- documentData, err = io.ReadAll(fileReader)
- if err != nil {
- return util.FailureWithDetail("Failed to read '"+jsonFile+"'", err.Error())
- }
- }
-
- var doc map[string]interface{}
- if err := json.Unmarshal(documentData, &doc); err != nil {
- return util.Failure(fmt.Sprintf("Document is not valid JSON: %s", err))
- }
-
- operationInFile := operationIn(doc)
- if operation == anyOperation { // Operation is decided by file content
- operation = operationInFile
- } else if operationInFile != "" && operationInFile != operation { // Otherwise operation must match
- return util.Failure("Wanted document operation is " + operation + " but the JSON file specifies " + operationInFile)
- }
-
- if documentId == "" { // Document id is decided by file content
- if doc[operation] == nil {
- return util.Failure("No document id given neither as argument or as a '" + operation + "' key in the json file")
- }
- documentId = doc[operation].(string) // document feeder format
- }
-
- documentPath, documentPathError := IdToURLPath(documentId)
- if documentPathError != nil {
- return util.Failure("Invalid document id '" + documentId + "': " + documentPathError.Error())
- }
-
- url, urlParseError := url.Parse(service.BaseURL + "/document/v1/" + documentPath)
- if urlParseError != nil {
- return util.Failure("Invalid request path: '" + service.BaseURL + "/document/v1/" + documentPath + "': " + urlParseError.Error())
- }
-
- request := &http.Request{
- URL: url,
- Method: operationToHTTPMethod(operation),
- Header: header,
- Body: io.NopCloser(bytes.NewReader(documentData)),
- }
- response, err := serviceDo(service, request, jsonFile, options)
- if err != nil {
- return util.Failure("Request failed: " + err.Error())
- }
-
- defer response.Body.Close()
- if response.StatusCode == 200 {
- return util.Success(operation + " " + documentId)
- } else if response.StatusCode/100 == 4 {
- return util.FailureWithPayload("Invalid document operation: "+response.Status, util.ReaderToJSON(response.Body))
- } else {
- return util.FailureWithPayload(service.Description()+" at "+request.URL.Host+": "+response.Status, util.ReaderToJSON(response.Body))
- }
-}
-
-func operationIn(doc map[string]interface{}) string {
- if doc["put"] != nil {
- return "put"
- } else if doc["update"] != nil {
- return "update"
- } else if doc["remove"] != nil {
- return "remove"
- } else {
- return ""
- }
-}
-
-func operationToHTTPMethod(operation string) string {
- switch operation {
- case "put":
- return "POST"
- case "update":
- return "PUT"
- case "remove":
- return "DELETE"
- }
- util.JustExitMsg("Unexpected document operation ''" + operation + "'")
- panic("unreachable")
-}
-
-func serviceDo(service *Service, request *http.Request, filename string, options OperationOptions) (*http.Response, error) {
- cmd, err := curl.RawArgs(request.URL.String())
- if err != nil {
- return nil, err
- }
- cmd.Method = request.Method
- for k, vs := range request.Header {
- for _, v := range vs {
- cmd.Header(k, v)
- }
- }
- cmd.WithBodyFile(filename)
- cmd.Certificate = service.TLSOptions.CertificateFile
- cmd.PrivateKey = service.TLSOptions.PrivateKeyFile
- out := cmd.String() + "\n"
- if _, err := io.WriteString(options.CurlOutput, out); err != nil {
- return nil, err
- }
- return service.Do(request, options.Timeout)
-}
-
-func Get(documentId string, service *Service, options OperationOptions) util.OperationResult {
- documentPath, documentPathError := IdToURLPath(documentId)
- if documentPathError != nil {
- return util.Failure("Invalid document id '" + documentId + "': " + documentPathError.Error())
- }
-
- url, urlParseError := url.Parse(service.BaseURL + "/document/v1/" + documentPath)
- if urlParseError != nil {
- return util.Failure("Invalid request path: '" + service.BaseURL + "/document/v1/" + documentPath + "': " + urlParseError.Error())
- }
-
- request := &http.Request{
- URL: url,
- Method: "GET",
- }
- response, err := serviceDo(service, request, "", options)
- if err != nil {
- return util.Failure("Request failed: " + err.Error())
- }
-
- defer response.Body.Close()
- if response.StatusCode == 200 {
- return util.SuccessWithPayload("Read "+documentId, util.ReaderToJSON(response.Body))
- } else if response.StatusCode/100 == 4 {
- return util.FailureWithPayload("Invalid document operation: "+response.Status, util.ReaderToJSON(response.Body))
- } else {
- return util.FailureWithPayload(service.Description()+" at "+request.URL.Host+": "+response.Status, util.ReaderToJSON(response.Body))
- }
-}
diff --git a/client/go/internal/vespa/document/circuit_breaker.go b/client/go/internal/vespa/document/circuit_breaker.go
index f7f0f4360df..9bcf2e3f619 100644
--- a/client/go/internal/vespa/document/circuit_breaker.go
+++ b/client/go/internal/vespa/document/circuit_breaker.go
@@ -19,7 +19,7 @@ const (
type CircuitBreaker interface {
Success()
- Error(error)
+ Failure()
State() CircuitState
}
@@ -28,7 +28,6 @@ type timeCircuitBreaker struct {
doomDuration time.Duration
failingSinceMillis atomic.Int64
- lastError atomic.Value
halfOpen atomic.Bool
open atomic.Bool
@@ -42,10 +41,8 @@ func (b *timeCircuitBreaker) Success() {
}
}
-func (b *timeCircuitBreaker) Error(err error) {
- if b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli()) {
- b.lastError.Store(err)
- }
+func (b *timeCircuitBreaker) Failure() {
+ b.failingSinceMillis.CompareAndSwap(math.MaxInt64, b.now().UnixMilli())
}
func (b *timeCircuitBreaker) State() CircuitState {
diff --git a/client/go/internal/vespa/document/circuit_breaker_test.go b/client/go/internal/vespa/document/circuit_breaker_test.go
index 7a4fffaae27..05dbd6da2f5 100644
--- a/client/go/internal/vespa/document/circuit_breaker_test.go
+++ b/client/go/internal/vespa/document/circuit_breaker_test.go
@@ -1,7 +1,6 @@
package document
import (
- "errors"
"testing"
"time"
@@ -12,7 +11,6 @@ func TestCircuitBreaker(t *testing.T) {
clock := &manualClock{}
breaker := NewCircuitBreaker(time.Second, time.Minute)
breaker.now = clock.now
- err := errors.New("error")
assert.Equal(t, CircuitClosed, breaker.State(), "Initial state is closed")
@@ -25,7 +23,7 @@ func TestCircuitBreaker(t *testing.T) {
clock.advance(100 * time.Second)
assert.Equal(t, CircuitClosed, breaker.State(), "State is closed some time after a success")
- breaker.Error(err)
+ breaker.Failure()
assert.Equal(t, CircuitClosed, breaker.State(), "State is closed right after a failure")
clock.advance(time.Second)
@@ -37,7 +35,7 @@ func TestCircuitBreaker(t *testing.T) {
breaker.Success()
assert.Equal(t, CircuitClosed, breaker.State(), "State is closed after a new success")
- breaker.Error(err)
+ breaker.Failure()
clock.advance(time.Minute)
assert.Equal(t, CircuitHalfOpen, breaker.State(), "State is half-open until doom duration has passed")
diff --git a/client/go/internal/vespa/document/dispatcher.go b/client/go/internal/vespa/document/dispatcher.go
index a2b84aaeef2..d9273d2f677 100644
--- a/client/go/internal/vespa/document/dispatcher.go
+++ b/client/go/internal/vespa/document/dispatcher.go
@@ -3,6 +3,7 @@ package document
import (
"fmt"
"io"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -62,41 +63,51 @@ func NewDispatcher(feeder Feeder, throttler Throttler, breaker CircuitBreaker, o
return d
}
-func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
+func (d *Dispatcher) logResult(doc Document, result Result, retry bool) {
if result.Trace != "" {
- d.msgs <- fmt.Sprintf("feed: trace for %s:\n%s", op.document, result.Trace)
+ d.msgs <- fmt.Sprintf("feed: trace for %s %s:\n%s", doc.Operation, doc.Id, result.Trace)
}
- if result.Success() {
- if d.verbose {
- d.msgs <- fmt.Sprintf("feed: %s succeeded with status %d", op.document, result.HTTPStatus)
- }
- d.throttler.Success()
- d.circuitBreaker.Success()
- return false
+ if !d.verbose && result.Success() {
+ return
}
- if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
- d.msgs <- fmt.Sprintf("feed: %s was throttled with status %d: retrying", op.document, result.HTTPStatus)
- d.throttler.Throttled(d.inflightCount.Load())
- return true
+ var msg strings.Builder
+ msg.WriteString("feed: got status ")
+ msg.WriteString(strconv.Itoa(result.HTTPStatus))
+ msg.WriteString(" (")
+ if result.Body != nil {
+ msg.Write(result.Body)
+ } else {
+ msg.WriteString("no body")
}
- if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
- retry := op.attempts < maxAttempts
- var msg strings.Builder
- msg.WriteString("feed: ")
- msg.WriteString(op.document.String())
- msg.WriteString(" failed: ")
- if result.Err != nil {
- msg.WriteString(result.Err.Error())
- } else {
- msg.WriteString(fmt.Sprintf("status %d", result.HTTPStatus))
- }
+ msg.WriteString(")")
+ msg.WriteString(" for ")
+ msg.WriteString(doc.Operation.String())
+ msg.WriteString(" ")
+ msg.WriteString(doc.Id.String())
+ if !result.Success() {
if retry {
msg.WriteString(": retrying")
} else {
- msg.WriteString(fmt.Sprintf(": giving up after %d attempts", maxAttempts))
+ msg.WriteString(": giving up after ")
+ msg.WriteString(strconv.Itoa(maxAttempts))
+ msg.WriteString(" attempts")
}
- d.msgs <- msg.String()
- d.circuitBreaker.Error(fmt.Errorf("request failed with status %d", result.HTTPStatus))
+ }
+ d.msgs <- msg.String()
+}
+
+func (d *Dispatcher) shouldRetry(op documentOp, result Result) bool {
+ retry := op.attempts < maxAttempts
+ d.logResult(op.document, result, retry)
+ if result.Success() {
+ d.throttler.Success()
+ d.circuitBreaker.Success()
+ return false
+ } else if result.HTTPStatus == 429 || result.HTTPStatus == 503 {
+ d.throttler.Throttled(d.inflightCount.Load())
+ return true
+ } else if result.Err != nil || result.HTTPStatus == 500 || result.HTTPStatus == 502 || result.HTTPStatus == 504 {
+ d.circuitBreaker.Failure()
if retry {
return true
}
diff --git a/client/go/internal/vespa/document/dispatcher_test.go b/client/go/internal/vespa/document/dispatcher_test.go
index 9bc0c76106c..834ec8490a6 100644
--- a/client/go/internal/vespa/document/dispatcher_test.go
+++ b/client/go/internal/vespa/document/dispatcher_test.go
@@ -48,7 +48,7 @@ func (f *mockFeeder) Send(doc Document) Result {
type mockCircuitBreaker struct{ state CircuitState }
func (c *mockCircuitBreaker) Success() {}
-func (c *mockCircuitBreaker) Error(err error) {}
+func (c *mockCircuitBreaker) Failure() {}
func (c *mockCircuitBreaker) State() CircuitState { return c.state }
func TestDispatcher(t *testing.T) {
diff --git a/client/go/internal/vespa/document/document.go b/client/go/internal/vespa/document/document.go
index 616013dc59a..a9b184190fb 100644
--- a/client/go/internal/vespa/document/document.go
+++ b/client/go/internal/vespa/document/document.go
@@ -1,8 +1,8 @@
package document
import (
- "bufio"
"bytes"
+ "errors"
"fmt"
"io"
"math/rand"
@@ -36,10 +36,23 @@ const (
)
var (
+ ErrMissingId = errors.New("no id specified")
fieldsPrefix = []byte(`{"fields":`)
fieldsSuffix = []byte("}")
)
+func (o Operation) String() string {
+ switch o {
+ case OperationPut:
+ return "put"
+ case OperationUpdate:
+ return "update"
+ case OperationRemove:
+ return "remove"
+ }
+ return ""
+}
+
// Id represents a Vespa document ID.
type Id struct {
id string
@@ -152,14 +165,8 @@ type Decoder struct {
func (d Document) String() string {
var sb strings.Builder
- switch d.Operation {
- case OperationPut:
- sb.WriteString("put ")
- case OperationUpdate:
- sb.WriteString("update ")
- case OperationRemove:
- sb.WriteString("remove ")
- }
+ sb.WriteString(d.Operation.String())
+ sb.WriteString(" ")
sb.WriteString(d.Id.String())
if d.Condition != "" {
sb.WriteString(", condition=")
@@ -228,7 +235,7 @@ func (d *Decoder) readBool() (bool, error) {
func (d *Decoder) Decode() (Document, error) {
doc, err := d.decode()
if err != nil && err != io.EOF {
- return Document{}, fmt.Errorf("invalid json at byte offset %d: %w", d.dec.InputOffset(), err)
+ return doc, fmt.Errorf("invalid operation at byte offset %d: %w", d.dec.InputOffset(), err)
}
return doc, err
}
@@ -347,14 +354,16 @@ loop:
break loop
}
}
+ if doc.Id.id == "" {
+ return doc, ErrMissingId
+ }
return doc, nil
}
func NewDecoder(r io.Reader) *Decoder {
- br := bufio.NewReaderSize(r, 1<<26)
d := &Decoder{}
d.documentBuffers.New = func() any { return &bytes.Buffer{} }
- d.dec = json.NewDecoder(io.TeeReader(br, &d.buf))
+ d.dec = json.NewDecoder(io.TeeReader(r, &d.buf))
return d
}
diff --git a/client/go/internal/vespa/document/document_test.go b/client/go/internal/vespa/document/document_test.go
index f9bf321f1fb..d37febf3da8 100644
--- a/client/go/internal/vespa/document/document_test.go
+++ b/client/go/internal/vespa/document/document_test.go
@@ -1,6 +1,7 @@
package document
import (
+ "errors"
"fmt"
"io"
"strings"
@@ -204,10 +205,17 @@ func TestDocumentDecoderInvalid(t *testing.T) {
t.Errorf("unexpected error: %s", err)
}
_, err = dec.Decode()
- wantErr := "invalid json at byte offset 110: json: invalid character '\\n' within string (expecting non-control character)"
+ wantErr := "invalid operation at byte offset 110: json: invalid character '\\n' within string (expecting non-control character)"
if err.Error() != wantErr {
t.Errorf("want error %q, got %q", wantErr, err.Error())
}
+
+ dec = NewDecoder(strings.NewReader(`{}`))
+ _, err = dec.Decode()
+ wantErr = "invalid operation at byte offset 2: no id specified"
+ if !errors.Is(err, ErrMissingId) {
+ t.Errorf("want error %q, got %q", ErrMissingId, err.Error())
+ }
}
func benchmarkDocumentDecoder(b *testing.B, size int) {
diff --git a/client/go/internal/vespa/document/http.go b/client/go/internal/vespa/document/http.go
index 0c27b0d5440..28e3c225e34 100644
--- a/client/go/internal/vespa/document/http.go
+++ b/client/go/internal/vespa/document/http.go
@@ -127,37 +127,38 @@ func writeQueryParam(sb *bytes.Buffer, start int, escape bool, k, v string) {
}
}
-func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) {
- httpMethod := ""
- switch d.Operation {
- case OperationPut:
- httpMethod = "POST"
- case OperationUpdate:
- httpMethod = "PUT"
- case OperationRemove:
- httpMethod = "DELETE"
- }
- // Base URL and path
- sb.WriteString(c.options.BaseURL)
- if !strings.HasSuffix(c.options.BaseURL, "/") {
- sb.WriteString("/")
- }
- sb.WriteString("document/v1/")
- sb.WriteString(url.PathEscape(d.Id.Namespace))
+func (c *Client) writeDocumentPath(id Id, sb *bytes.Buffer) {
+ sb.WriteString(strings.TrimSuffix(c.options.BaseURL, "/"))
+ sb.WriteString("/document/v1/")
+ sb.WriteString(url.PathEscape(id.Namespace))
sb.WriteString("/")
- sb.WriteString(url.PathEscape(d.Id.Type))
- if d.Id.Number != nil {
+ sb.WriteString(url.PathEscape(id.Type))
+ if id.Number != nil {
sb.WriteString("/number/")
- n := uint64(*d.Id.Number)
+ n := uint64(*id.Number)
sb.WriteString(strconv.FormatUint(n, 10))
- } else if d.Id.Group != "" {
+ } else if id.Group != "" {
sb.WriteString("/group/")
- sb.WriteString(url.PathEscape(d.Id.Group))
+ sb.WriteString(url.PathEscape(id.Group))
} else {
sb.WriteString("/docid")
}
sb.WriteString("/")
- sb.WriteString(url.PathEscape(d.Id.UserSpecific))
+ sb.WriteString(url.PathEscape(id.UserSpecific))
+}
+
+func (c *Client) methodAndURL(d Document, sb *bytes.Buffer) (string, string) {
+ httpMethod := ""
+ switch d.Operation {
+ case OperationPut:
+ httpMethod = http.MethodPost
+ case OperationUpdate:
+ httpMethod = http.MethodPut
+ case OperationRemove:
+ httpMethod = http.MethodDelete
+ }
+ // Base URL and path
+ c.writeDocumentPath(d.Id, sb)
// Query part
queryStart := sb.Len()
if c.options.Timeout > 0 {
@@ -290,7 +291,28 @@ func (c *Client) Send(document Document) Result {
}
defer resp.Body.Close()
elapsed := c.now().Sub(start)
- return resultWithResponse(resp, bodySize, result, elapsed, buf)
+ return c.resultWithResponse(resp, bodySize, result, elapsed)
+}
+
+// Get retrieves document with given ID.nnnnnn
+func (c *Client) Get(id Id) Result {
+ start := c.now()
+ buf := c.buffer()
+ defer c.buffers.Put(buf)
+ c.writeDocumentPath(id, buf)
+ url := buf.String()
+ result := Result{Id: id}
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ return resultWithErr(result, err)
+ }
+ resp, err := c.leastBusyClient().Do(req, c.clientTimeout())
+ if err != nil {
+ return resultWithErr(result, err)
+ }
+ defer resp.Body.Close()
+ elapsed := c.now().Sub(start)
+ return c.resultWithResponse(resp, 0, result, elapsed)
}
func resultWithErr(result Result, err error) Result {
@@ -299,7 +321,7 @@ func resultWithErr(result Result, err error) Result {
return result
}
-func resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration, buf *bytes.Buffer) Result {
+func (c *Client) resultWithResponse(resp *http.Response, sentBytes int, result Result, elapsed time.Duration) Result {
result.HTTPStatus = resp.StatusCode
switch resp.StatusCode {
case 200:
@@ -311,24 +333,24 @@ func resultWithResponse(resp *http.Response, sentBytes int, result Result, elaps
default:
result.Status = StatusTransportFailure
}
- buf.Reset()
- written, err := io.Copy(buf, resp.Body)
+ b, err := io.ReadAll(resp.Body)
if err != nil {
result = resultWithErr(result, err)
} else {
- var body struct {
- Message string `json:"message"`
- Trace json.RawValue `json:"trace"`
- }
- if err := json.Unmarshal(buf.Bytes(), &body); err != nil {
- result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err))
- } else {
- result.Message = body.Message
- result.Trace = string(body.Trace)
+ result.Body = b
+ if result.HTTPStatus == 200 && c.options.TraceLevel > 0 {
+ var jsonResponse struct {
+ Trace json.RawValue `json:"trace"`
+ }
+ if err := json.Unmarshal(b, &jsonResponse); err != nil {
+ result = resultWithErr(result, fmt.Errorf("failed to decode json response: %w", err))
+ } else {
+ result.Trace = string(jsonResponse.Trace)
+ }
}
}
result.Latency = elapsed
result.BytesSent = int64(sentBytes)
- result.BytesRecv = int64(written)
+ result.BytesRecv = int64(len(b))
return result
}
diff --git a/client/go/internal/vespa/document/http_test.go b/client/go/internal/vespa/document/http_test.go
index b34b322f6eb..f8871cb0905 100644
--- a/client/go/internal/vespa/document/http_test.go
+++ b/client/go/internal/vespa/document/http_test.go
@@ -89,17 +89,19 @@ func TestClientSend(t *testing.T) {
Latency: time.Second,
}
if i < 3 {
- httpClient.NextResponseString(200, `{"message":"All good!"}`)
+ msg := `{"message":"All good!"}`
+ httpClient.NextResponseString(200, msg)
wantRes.Status = StatusSuccess
wantRes.HTTPStatus = 200
- wantRes.Message = "All good!"
+ wantRes.Body = []byte(msg)
wantRes.BytesRecv = 23
} else {
- httpClient.NextResponseString(502, `{"message":"Good bye, cruel world!"}`)
+ errMsg := `something went wront`
+ httpClient.NextResponseString(502, errMsg)
wantRes.Status = StatusVespaFailure
wantRes.HTTPStatus = 502
- wantRes.Message = "Good bye, cruel world!"
- wantRes.BytesRecv = 36
+ wantRes.Body = []byte(errMsg)
+ wantRes.BytesRecv = 20
}
res := client.Send(doc)
wantRes.BytesSent = int64(len(httpClient.LastBody))
@@ -134,13 +136,45 @@ func TestClientSend(t *testing.T) {
MinLatency: time.Second,
MaxLatency: time.Second,
BytesSent: 75,
- BytesRecv: 105,
+ BytesRecv: 89,
}
if !reflect.DeepEqual(want, stats) {
t.Errorf("got %+v, want %+v", stats, want)
}
}
+func TestClientGet(t *testing.T) {
+ httpClient := mock.HTTPClient{ReadBody: true}
+ client, _ := NewClient(ClientOptions{
+ BaseURL: "https://example.com:1337",
+ Timeout: time.Duration(5 * time.Second),
+ }, []util.HTTPClient{&httpClient})
+ clock := manualClock{t: time.Now(), tick: time.Second}
+ client.now = clock.now
+ doc := `{
+ "pathId": "/document/v1/mynamespace/music/docid/doc1",
+ "id": "id:mynamespace:music::doc1",
+ "fields": {
+ "artist": "Metallica",
+ "album": "Master of Puppets"
+ }
+}`
+ id := Id{Namespace: "mynamespace", Type: "music", UserSpecific: "doc1"}
+ httpClient.NextResponseString(200, doc)
+ result := client.Get(id)
+ want := Result{
+ Id: id,
+ Body: []byte(doc),
+ Status: StatusSuccess,
+ HTTPStatus: 200,
+ Latency: time.Second,
+ BytesRecv: 192,
+ }
+ if !reflect.DeepEqual(want, result) {
+ t.Errorf("got %+v, want %+v", result, want)
+ }
+}
+
func TestClientSendCompressed(t *testing.T) {
httpClient := &mock.HTTPClient{ReadBody: true}
client, _ := NewClient(ClientOptions{
diff --git a/client/go/internal/vespa/document/stats.go b/client/go/internal/vespa/document/stats.go
index 7696648f703..3e647d0f893 100644
--- a/client/go/internal/vespa/document/stats.go
+++ b/client/go/internal/vespa/document/stats.go
@@ -24,8 +24,8 @@ const (
type Result struct {
Err error
Id Id
- Message string
Trace string
+ Body []byte
Status Status
HTTPStatus int
Latency time.Duration
diff --git a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java
index 786928391a5..a893abb519e 100644
--- a/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java
+++ b/clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java
@@ -69,7 +69,6 @@ public class ClusterControllerClusterConfigurer extends AbstractComponent {
builder.setCount(config.fleet_controller_count());
builder.setZooKeeperSessionTimeout((int) (config.zookeeper_session_timeout() * 1000));
builder.setMasterZooKeeperCooldownPeriod((int) (config.master_zookeeper_cooldown_period() * 1000));
- builder.setStateGatherCount(config.state_gather_count());
builder.setRpcPort(config.rpc_port());
builder.setHttpPort(config.http_port());
builder.setMaxTransitionTime(NodeType.STORAGE, config.storage_transition_time());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 6876ac8cf56..42460b5943e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -561,7 +561,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta
if ( ! isRunning()) { return; }
- if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount())) {
+ if (masterElectionHandler.isFirstInLine()) {
didWork |= resyncLocallyCachedState(); // Calls to metricUpdate.forWork inside method
} else {
stepDownAsStateGatherer();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
index 1541e1a4218..bac6a838300 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
@@ -24,7 +24,6 @@ public class FleetControllerOptions {
private final String clusterName;
private final int fleetControllerIndex;
private final int fleetControllerCount;
- private final int stateGatherCount;
private final String[] slobrokConnectionSpecs;
private final int rpcPort;
@@ -132,7 +131,6 @@ public class FleetControllerOptions {
private FleetControllerOptions(String clusterName,
int fleetControllerIndex,
int fleetControllerCount,
- int stateGatherCount,
String[] slobrokConnectionSpecs,
int rpcPort,
int httpPort,
@@ -174,7 +172,6 @@ public class FleetControllerOptions {
this.clusterName = clusterName;
this.fleetControllerIndex = fleetControllerIndex;
this.fleetControllerCount = fleetControllerCount;
- this.stateGatherCount = stateGatherCount;
this.slobrokConnectionSpecs = slobrokConnectionSpecs;
this.rpcPort = rpcPort;
this.httpPort = httpPort;
@@ -235,10 +232,6 @@ public class FleetControllerOptions {
return fleetControllerCount;
}
- public int stateGatherCount() {
- return stateGatherCount;
- }
-
public String[] slobrokConnectionSpecs() {
return slobrokConnectionSpecs;
}
@@ -394,7 +387,6 @@ public class FleetControllerOptions {
private String clusterName;
private int index = 0;
private int count = 1;
- private int stateGatherCount = 2;
private String[] slobrokConnectionSpecs;
private int rpcPort = 0;
private int httpPort = 0;
@@ -464,11 +456,6 @@ public class FleetControllerOptions {
return this;
}
- public Builder setStateGatherCount(int stateGatherCount) {
- this.stateGatherCount = stateGatherCount;
- return this;
- }
-
public Builder setSlobrokConnectionSpecs(String[] slobrokConnectionSpecs) {
Objects.requireNonNull(slobrokConnectionSpecs, "slobrokConnectionSpecs cannot be null");
this.slobrokConnectionSpecs = slobrokConnectionSpecs;
@@ -694,7 +681,6 @@ public class FleetControllerOptions {
return new FleetControllerOptions(clusterName,
index,
count,
- stateGatherCount,
slobrokConnectionSpecs,
rpcPort,
httpPort,
@@ -740,7 +726,6 @@ public class FleetControllerOptions {
builder.clusterName = options.clusterName;
builder.index = options.fleetControllerIndex;
builder.count = options.fleetControllerCount;
- builder.stateGatherCount = options.stateGatherCount;
builder.slobrokConnectionSpecs = options.slobrokConnectionSpecs;
builder.rpcPort = options.rpcPort;
builder.httpPort = options.httpPort;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java
index b041e6b14f8..fa303533355 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java
@@ -118,7 +118,7 @@ public class MasterElectionHandler implements MasterInterface {
return 2 * followers <= totalCount;
}
- public boolean isAmongNthFirst(int first) { return (nextInLineCount < first); }
+ public boolean isFirstInLine() { return (nextInLineCount < 1); }
public boolean watchMasterElection(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext) {
if (totalCount == 1 && !usingZooKeeper) {
@@ -251,7 +251,7 @@ public class MasterElectionHandler implements MasterInterface {
nextMasterData = null;
}
- public void writeHtmlState(StringBuilder sb, int stateGatherCount) {
+ public void writeHtmlState(StringBuilder sb) {
sb.append("<h2>Master state</h2>\n");
Integer master = getMaster();
if (master != null) {
@@ -270,7 +270,7 @@ public class MasterElectionHandler implements MasterInterface {
.append(" before electing new master unless all possible master candidates are online.</p>");
}
}
- if ((master == null || master != index) && nextInLineCount < stateGatherCount) {
+ if ((master == null || master != index) && nextInLineCount < 1) {
sb.append("<p>As we are number ").append(nextInLineCount)
.append(" in line for taking over as master, we're gathering state from nodes.</p>");
sb.append("<p><font color=\"red\">As we are not the master, we don't know about nodes current system state"
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java
index 3297d511469..5aae401e157 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java
@@ -81,7 +81,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
.append(" ]</font></p>\n");
content.append("<table><tr><td>UTC time when creating this page:</td><td align=\"right\">").append(RealTimer.printDateNoMilliSeconds(currentTime, tz)).append("</td></tr>");
content.append("<tr><td>Cluster controller uptime:</td><td align=\"right\">" + RealTimer.printDuration(currentTime - startedTime) + "</td></tr></table>");
- if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount())) {
+ if (masterElectionHandler.isFirstInLine()) {
// Table overview of all the nodes
writeHtmlState(cluster, content, timer, stateVersionTracker, options, eventLog);
// Current cluster state and cluster state history
@@ -91,7 +91,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
writeHtmlState(content, options);
}
// State of master election
- masterElectionHandler.writeHtmlState(content, options.stateGatherCount());
+ masterElectionHandler.writeHtmlState(content);
// Overview of current config
writeHtmlState(content, options);
// Event log
@@ -223,7 +223,6 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
sb.append("<tr><td><nobr>Cluster name</nobr></td><td align=\"right\">").append(options.clusterName()).append("</td></tr>");
sb.append("<tr><td><nobr>Fleet controller index</nobr></td><td align=\"right\">").append(options.fleetControllerIndex()).append("/").append(options.fleetControllerCount()).append("</td></tr>");
- sb.append("<tr><td><nobr>Number of fleetcontrollers gathering states from nodes</nobr></td><td align=\"right\">").append(options.stateGatherCount()).append("</td></tr>");
sb.append("<tr><td><nobr>Slobrok connection spec</nobr></td><td align=\"right\">").append(slobrokspecs).append("</td></tr>");
sb.append("<tr><td><nobr>RPC port</nobr></td><td align=\"right\">").append(options.rpcPort() == 0 ? "Pick random available" : options.rpcPort()).append("</td></tr>");
diff --git a/configdefinitions/src/vespa/fleetcontroller.def b/configdefinitions/src/vespa/fleetcontroller.def
index 93a20e4ee0d..c3e161eb038 100644
--- a/configdefinitions/src/vespa/fleetcontroller.def
+++ b/configdefinitions/src/vespa/fleetcontroller.def
@@ -29,6 +29,7 @@ master_zookeeper_cooldown_period double default=60.0
## If set to 1, only master will gather state. If set higher, others will
## also do so, prioritizing those fleetcontrollers likely to be the ones to
## take over if the master fails.
+# TODO: Deprecated, not used anymore, remove in Vespa 9
state_gather_count int default=1
## Location of ZooKeeper servers
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
index 040f230a40e..cb8a4fec77a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ConfigConvergenceChecker.java
@@ -4,8 +4,8 @@ package com.yahoo.vespa.config.server.application;
import ai.vespa.util.http.hc5.VespaAsyncHttpClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.model.api.ApplicationClusterInfo;
import com.yahoo.config.model.api.HostInfo;
@@ -103,7 +103,7 @@ public class ConfigConvergenceChecker extends AbstractComponent {
.filter(serviceInfo -> shouldCheckService(hostsToCheck, application, serviceInfo))
.forEach(service -> getStatePort(service).ifPresent(port -> servicesToCheck.add(service))));
- log.log(Level.FINE, "Services to check for config convergence: " + servicesToCheck);
+ log.log(Level.FINE, () -> "Services to check for config convergence: " + servicesToCheck);
return getServiceGenerations(servicesToCheck, timeoutPerService);
}
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java
index 50eee9e33b3..a1eae42da38 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/ApplicationMaintainer.java
@@ -95,16 +95,21 @@ public abstract class ApplicationMaintainer extends NodeRepositoryMaintainer {
@Override
public void shutdown() {
super.shutdown();
- deploymentExecutor.shutdownNow();
+ deploymentExecutor.shutdown();
}
@Override
public void awaitShutdown() {
+ Instant deadline = clock().instant().plus(Duration.ofMinutes(1));
super.awaitShutdown();
try {
+ long remainder = Duration.between(clock().instant(), deadline).toMillis();
+ if (deploymentExecutor.isShutdown()) return;
+
// Give deployments in progress some time to complete
- if (!deploymentExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+ if (remainder < 0 || !deploymentExecutor.awaitTermination(remainder, TimeUnit.MILLISECONDS)) {
log.log(Level.WARNING, "Failed to shut down deployment executor within deadline");
+ deploymentExecutor.shutdownNow();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
index 841d7f92b62..6ca7d298ee2 100644
--- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
+++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
@@ -25,6 +25,7 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
#include <vespa/searchlib/util/bufferwriter.h>
+#include <vespa/vespalib/util/fake_doom.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/document/base/exceptions.h>
#include <vespa/eval/eval/fast_value.h>
@@ -301,23 +302,27 @@ public:
std::vector<Neighbor> find_top_k(uint32_t k,
const search::tensor::BoundDistanceFunction &df,
uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const override
{
(void) k;
(void) df;
(void) explore_k;
+ (void) doom;
(void) distance_threshold;
return std::vector<Neighbor>();
}
std::vector<Neighbor> find_top_k_with_filter(uint32_t k,
const search::tensor::BoundDistanceFunction &df,
const GlobalFilter& filter, uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const override
{
(void) k;
(void) df;
(void) explore_k;
(void) filter;
+ (void) doom;
(void) distance_threshold;
return std::vector<Neighbor>();
}
@@ -1291,10 +1296,12 @@ template <typename ParentT>
class NearestNeighborBlueprintFixtureBase : public ParentT {
private:
std::unique_ptr<Value> _query_tensor;
+ vespalib::FakeDoom _no_doom;
public:
NearestNeighborBlueprintFixtureBase()
- : _query_tensor()
+ : _query_tensor(),
+ _no_doom()
{
this->set_tensor(1, vec_2d(1, 1));
this->set_tensor(2, vec_2d(2, 2));
@@ -1321,7 +1328,7 @@ public:
create_query_tensor(vec_2d(17, 42))),
3, approximate, 5,
100100.25,
- global_filter_lower_limit, 1.0);
+ global_filter_lower_limit, 1.0, _no_doom.get_doom());
EXPECT_EQUAL(11u, bp->getState().estimate().estHits);
EXPECT_EQUAL(100100.25 * 100100.25, bp->get_distance_threshold());
return bp;
diff --git a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp
index 4d759132114..f59c16c76f9 100644
--- a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp
+++ b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp
@@ -17,6 +17,7 @@
#include <vespa/vespalib/datastore/compaction_spec.h>
#include <vespa/vespalib/datastore/compaction_strategy.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/fake_doom.h>
#include <vespa/vespalib/util/generationhandler.h>
#include <vespa/vespalib/data/slime/slime.h>
#include <type_traits>
@@ -90,13 +91,15 @@ public:
LevelGenerator* level_generator;
GenerationHandler gen_handler;
std::unique_ptr<IndexType> index;
+ std::unique_ptr<vespalib::FakeDoom> _doom;
HnswIndexTest()
: vectors(),
global_filter(GlobalFilter::create()),
level_generator(),
gen_handler(),
- index()
+ index(),
+ _doom(std::make_unique<vespalib::FakeDoom>())
{
vectors.set(1, {2, 2}).set(2, {3, 2}).set(3, {2, 3})
.set(4, {1, 2}).set(5, {8, 3}).set(6, {7, 2})
@@ -173,8 +176,8 @@ public:
vespalib::eval::TypedCells qv_cells(qv_ref);
auto df = index->distance_function_factory().for_query_vector(qv_cells);
auto got_by_docid = (global_filter->is_active()) ?
- index->find_top_k_with_filter(k, *df, *global_filter, explore_k, 10000.0) :
- index->find_top_k(k, *df, explore_k, 10000.0);
+ index->find_top_k_with_filter(k, *df, *global_filter, explore_k, _doom->get_doom(), 10000.0) :
+ index->find_top_k(k, *df, explore_k, _doom->get_doom(), 10000.0);
std::vector<uint32_t> act;
act.reserve(got_by_docid.size());
for (auto& hit : got_by_docid) {
@@ -186,7 +189,7 @@ public:
uint32_t k = 3;
auto qv = vectors.get_vector(docid, 0);
auto df = index->distance_function_factory().for_query_vector(qv);
- auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active()).peek();
+ auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active(), _doom->get_doom()).peek();
std::sort(rv.begin(), rv.end(), LesserDistance());
size_t idx = 0;
for (const auto & hit : rv) {
@@ -197,25 +200,27 @@ public:
if (exp_hits.size() == k) {
std::vector<uint32_t> expected_by_docid = exp_hits;
std::sort(expected_by_docid.begin(), expected_by_docid.end());
- auto got_by_docid = index->find_top_k(k, *df, k, 100100.25);
+ auto got_by_docid = index->find_top_k(k, *df, k, _doom->get_doom(), 100100.25);
for (idx = 0; idx < k; ++idx) {
EXPECT_EQ(expected_by_docid[idx], got_by_docid[idx].docid);
}
}
- check_with_distance_threshold(docid);
+ if (!exp_hits.empty()) {
+ check_with_distance_threshold(docid);
+ }
}
void check_with_distance_threshold(uint32_t docid) {
auto qv = vectors.get_vector(docid, 0);
auto df = index->distance_function_factory().for_query_vector(qv);
uint32_t k = 3;
- auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active()).peek();
+ auto rv = index->top_k_candidates(*df, k, global_filter->ptr_if_active(), _doom->get_doom()).peek();
std::sort(rv.begin(), rv.end(), LesserDistance());
EXPECT_EQ(rv.size(), 3);
EXPECT_LE(rv[0].distance, rv[1].distance);
double thr = (rv[0].distance + rv[1].distance) * 0.5;
auto got_by_docid = (global_filter->is_active())
- ? index->find_top_k_with_filter(k, *df, *global_filter, k, thr)
- : index->find_top_k(k, *df, k, thr);
+ ? index->find_top_k_with_filter(k, *df, *global_filter, k, _doom->get_doom(), thr)
+ : index->find_top_k(k, *df, k, _doom->get_doom(), thr);
EXPECT_EQ(got_by_docid.size(), 1);
EXPECT_EQ(got_by_docid[0].docid, index->get_docid(rv[0].nodeid));
for (const auto & hit : got_by_docid) {
@@ -262,6 +267,12 @@ public:
HnswIndexLoader<VectorBufferReader, IndexType::index_type> loader(graph, id_mapping, std::make_unique<VectorBufferReader>(data));
while (loader.load_next()) {}
}
+ void reset_doom() {
+ _doom = std::make_unique<vespalib::FakeDoom>();
+ }
+ void reset_doom(vespalib::steady_time::duration time_to_doom) {
+ _doom = std::make_unique<vespalib::FakeDoom>(time_to_doom);
+ }
static constexpr bool is_single = std::is_same_v<IndexType, HnswIndex<HnswIndexType::SINGLE>>;
};
@@ -334,6 +345,8 @@ TYPED_TEST(HnswIndexTest, 2d_vectors_inserted_in_level_0_graph_with_simple_selec
this->expect_top_3(7, {3, 2});
this->expect_top_3(8, {4, 3});
this->expect_top_3(9, {3, 2});
+ this->reset_doom(-1s);
+ this->expect_top_3(2, {});
}
TYPED_TEST(HnswIndexTest, 2d_vectors_inserted_and_removed)
@@ -824,6 +837,10 @@ TEST_F(HnswMultiIndexTest, duplicate_docid_is_removed)
this->expect_top_3_by_docid("{2, 0}", {2, 0}, {1, 2, 4});
this->expect_top_3_by_docid("{2, 1}", {2, 1}, {2, 3, 4});
this->expect_top_3_by_docid("{2, 2}", {2, 2}, {1, 3, 4});
+ this->reset_doom(-1s); // 1s beyond doom => no hits
+ this->expect_top_3_by_docid("{2, 2}", {2, 2}, {});
+ this->reset_doom();
+ this->expect_top_3_by_docid("{2, 2}", {2, 2}, {1, 3, 4});
auto filter = std::make_shared<MyGlobalFilter>(GlobalFilter::create({1, 2}, 3));
global_filter = filter;
this->expect_top_3_by_docid("{2,2}", {2, 2}, {1, 2});
diff --git a/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp b/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp
index 7a622030d98..62f76b5cee0 100644
--- a/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/attribute_blueprint_factory.cpp
@@ -835,7 +835,8 @@ public:
n.get_explore_additional_hits(),
n.get_distance_threshold(),
getRequestContext().get_attribute_blueprint_params().global_filter_lower_limit,
- getRequestContext().get_attribute_blueprint_params().global_filter_upper_limit));
+ getRequestContext().get_attribute_blueprint_params().global_filter_upper_limit,
+ getRequestContext().getDoom()));
} catch (const vespalib::IllegalArgumentException& ex) {
return fail_nearest_neighbor_term(n, ex.getMessage());
diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp
index 7c307a1e35f..87ddb8b6edc 100644
--- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp
+++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.cpp
@@ -14,6 +14,8 @@ LOG_SETUP(".searchlib.queryeval.nearest_neighbor_blueprint");
using vespalib::eval::Value;
+namespace vespalib { class Doom; }
+
namespace search::queryeval {
namespace {
@@ -40,7 +42,8 @@ NearestNeighborBlueprint::NearestNeighborBlueprint(const queryeval::FieldSpec& f
uint32_t explore_additional_hits,
double distance_threshold,
double global_filter_lower_limit,
- double global_filter_upper_limit)
+ double global_filter_upper_limit,
+ const vespalib::Doom& doom)
: ComplexLeafBlueprint(field),
_distance_calc(std::move(distance_calc)),
_attr_tensor(_distance_calc->attribute_tensor()),
@@ -58,7 +61,8 @@ NearestNeighborBlueprint::NearestNeighborBlueprint(const queryeval::FieldSpec& f
_global_filter(GlobalFilter::create()),
_global_filter_set(false),
_global_filter_hits(),
- _global_filter_hit_ratio()
+ _global_filter_hit_ratio(),
+ _doom(doom)
{
if (distance_threshold < std::numeric_limits<double>::max()) {
_distance_threshold = _distance_calc->function().convert_threshold(distance_threshold);
@@ -109,10 +113,10 @@ NearestNeighborBlueprint::perform_top_k(const search::tensor::NearestNeighborInd
uint32_t k = _adjusted_target_hits;
const auto &df = _distance_calc->function();
if (_global_filter->is_active()) {
- _found_hits = nns_index->find_top_k_with_filter(k, df, *_global_filter, k + _explore_additional_hits, _distance_threshold);
+ _found_hits = nns_index->find_top_k_with_filter(k, df, *_global_filter, k + _explore_additional_hits, _doom, _distance_threshold);
_algorithm = Algorithm::INDEX_TOP_K_WITH_FILTER;
} else {
- _found_hits = nns_index->find_top_k(k, df, k + _explore_additional_hits, _distance_threshold);
+ _found_hits = nns_index->find_top_k(k, df, k + _explore_additional_hits, _doom, _distance_threshold);
_algorithm = Algorithm::INDEX_TOP_K;
}
}
diff --git a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h
index 3defb34cffd..f88cdd5adb1 100644
--- a/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h
+++ b/searchlib/src/vespa/searchlib/queryeval/nearest_neighbor_blueprint.h
@@ -45,6 +45,7 @@ private:
bool _global_filter_set;
std::optional<uint32_t> _global_filter_hits;
std::optional<double> _global_filter_hit_ratio;
+ const vespalib::Doom& _doom;
void perform_top_k(const search::tensor::NearestNeighborIndex* nns_index);
public:
@@ -53,7 +54,8 @@ public:
uint32_t target_hits, bool approximate, uint32_t explore_additional_hits,
double distance_threshold,
double global_filter_lower_limit,
- double global_filter_upper_limit);
+ double global_filter_upper_limit,
+ const vespalib::Doom& doom);
NearestNeighborBlueprint(const NearestNeighborBlueprint&) = delete;
NearestNeighborBlueprint& operator=(const NearestNeighborBlueprint&) = delete;
~NearestNeighborBlueprint();
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
index f2f7ad212de..e3fa3f1f9f3 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
@@ -18,6 +18,7 @@
#include <vespa/vespalib/data/slime/inserter.h>
#include <vespa/vespalib/datastore/array_store.hpp>
#include <vespa/vespalib/datastore/compaction_strategy.h>
+#include <vespa/vespalib/util/doom.h>
#include <vespa/vespalib/util/memory_allocator.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/time.h>
@@ -373,12 +374,19 @@ HnswIndex<type>::search_layer_helper(
const BoundDistanceFunction &df,
uint32_t neighbors_to_find,
BestNeighbors& best_neighbors, uint32_t level, const GlobalFilter *filter,
- uint32_t nodeid_limit, uint32_t estimated_visited_nodes) const
+ uint32_t nodeid_limit, const vespalib::Doom* const doom,
+ uint32_t estimated_visited_nodes) const
{
NearestPriQ candidates;
GlobalFilterWrapper<type> filter_wrapper(filter);
filter_wrapper.clamp_nodeid_limit(nodeid_limit);
VisitedTracker visited(nodeid_limit, estimated_visited_nodes);
+ if (doom != nullptr && doom->soft_doom()) {
+ while (!best_neighbors.empty()) {
+ best_neighbors.pop();
+ }
+ return;
+ }
for (const auto &entry : best_neighbors.peek()) {
if (entry.nodeid >= nodeid_limit) {
continue;
@@ -423,6 +431,9 @@ HnswIndex<type>::search_layer_helper(
}
}
}
+ if (doom != nullptr && doom->soft_doom()) {
+ break;
+ }
}
}
@@ -432,14 +443,15 @@ void
HnswIndex<type>::search_layer(
const BoundDistanceFunction &df,
uint32_t neighbors_to_find,
- BestNeighbors& best_neighbors, uint32_t level, const GlobalFilter *filter) const
+ BestNeighbors& best_neighbors, uint32_t level,
+ const vespalib::Doom* const doom, const GlobalFilter *filter) const
{
uint32_t nodeid_limit = _graph.nodes_size.load(std::memory_order_acquire);
uint32_t estimated_visited_nodes = estimate_visited_nodes(level, nodeid_limit, neighbors_to_find, filter);
if (estimated_visited_nodes >= nodeid_limit / 128) {
- search_layer_helper<BitVectorVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, estimated_visited_nodes);
+ search_layer_helper<BitVectorVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, doom, estimated_visited_nodes);
} else {
- search_layer_helper<HashSetVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, estimated_visited_nodes);
+ search_layer_helper<HashSetVisitedTracker>(df, neighbors_to_find, best_neighbors, level, filter, nodeid_limit, doom, estimated_visited_nodes);
}
}
@@ -522,7 +534,7 @@ HnswIndex<type>::internal_prepare_add_node(PreparedAddDoc& op, TypedCells input_
search_level = std::min(node_max_level, search_level);
// Find neighbors of the added document in each level it should exist in.
while (search_level >= 0) {
- search_layer(*df, _cfg.neighbors_to_explore_at_construction(), best_neighbors, search_level);
+ search_layer(*df, _cfg.neighbors_to_explore_at_construction(), best_neighbors, search_level, nullptr);
auto neighbors = select_neighbors(best_neighbors.peek(), _cfg.max_links_on_inserts());
auto& links = connections[search_level];
links.reserve(neighbors.used.size());
@@ -887,9 +899,10 @@ HnswIndex<type>::top_k_by_docid(
uint32_t k,
const BoundDistanceFunction &df,
const GlobalFilter *filter, uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const
{
- SearchBestNeighbors candidates = top_k_candidates(df, std::max(k, explore_k), filter);
+ SearchBestNeighbors candidates = top_k_candidates(df, std::max(k, explore_k), filter, doom);
auto result = candidates.get_neighbors(k, distance_threshold);
std::sort(result.begin(), result.end(), NeighborsByDocId());
return result;
@@ -901,9 +914,10 @@ HnswIndex<type>::find_top_k(
uint32_t k,
const BoundDistanceFunction &df,
uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const
{
- return top_k_by_docid(k, df, nullptr, explore_k, distance_threshold);
+ return top_k_by_docid(k, df, nullptr, explore_k, doom, distance_threshold);
}
template <HnswIndexType type>
@@ -912,16 +926,18 @@ HnswIndex<type>::find_top_k_with_filter(
uint32_t k,
const BoundDistanceFunction &df,
const GlobalFilter &filter, uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const
{
- return top_k_by_docid(k, df, &filter, explore_k, distance_threshold);
+ return top_k_by_docid(k, df, &filter, explore_k, doom, distance_threshold);
}
template <HnswIndexType type>
typename HnswIndex<type>::SearchBestNeighbors
HnswIndex<type>::top_k_candidates(
const BoundDistanceFunction &df,
- uint32_t k, const GlobalFilter *filter) const
+ uint32_t k, const GlobalFilter *filter,
+ const vespalib::Doom& doom) const
{
SearchBestNeighbors best_neighbors;
auto entry = _graph.get_entry_node();
@@ -939,7 +955,7 @@ HnswIndex<type>::top_k_candidates(
--search_level;
}
best_neighbors.push(entry_point);
- search_layer(df, k, best_neighbors, 0, filter);
+ search_layer(df, k, best_neighbors, 0, &doom, filter);
return best_neighbors;
}
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
index 20ab0dbea92..1ea8d1be558 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
@@ -170,12 +170,15 @@ protected:
void search_layer_helper(const BoundDistanceFunction &df, uint32_t neighbors_to_find, BestNeighbors& best_neighbors,
uint32_t level, const GlobalFilter *filter,
uint32_t nodeid_limit,
+ const vespalib::Doom* const doom,
uint32_t estimated_visited_nodes) const;
template <class BestNeighbors>
void search_layer(const BoundDistanceFunction &df, uint32_t neighbors_to_find, BestNeighbors& best_neighbors,
- uint32_t level, const GlobalFilter *filter = nullptr) const;
+ uint32_t level, const vespalib::Doom* const doom,
+ const GlobalFilter *filter = nullptr) const;
std::vector<Neighbor> top_k_by_docid(uint32_t k, const BoundDistanceFunction &df,
const GlobalFilter *filter, uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const;
internal::PreparedAddDoc internal_prepare_add(uint32_t docid, VectorBundle input_vectors,
@@ -217,19 +220,22 @@ public:
uint32_t k,
const BoundDistanceFunction &df,
uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const override;
std::vector<Neighbor> find_top_k_with_filter(
uint32_t k,
const BoundDistanceFunction &df,
const GlobalFilter &filter, uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const override;
DistanceFunctionFactory &distance_function_factory() const override { return *_distance_ff; }
SearchBestNeighbors top_k_candidates(
const BoundDistanceFunction &df,
- uint32_t k, const GlobalFilter *filter) const;
+ uint32_t k, const GlobalFilter *filter,
+ const vespalib::Doom& doom) const;
uint32_t get_entry_nodeid() const { return _graph.get_entry_node().nodeid; }
int32_t get_entry_level() const { return _graph.get_entry_node().level; }
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h b/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h
index de707999f11..67eff4b33c7 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_multi_best_neighbors.h
@@ -57,6 +57,7 @@ public:
_candidates.pop();
}
const HnswCandidateVector& peek() const { return _candidates.peek(); }
+ bool empty() const { return _candidates.empty(); }
const HnswCandidate& top() const { return _candidates.top(); }
size_t size() const { return _docids.size(); }
void emplace(uint32_t nodeid, uint32_t docid, EntryRef ref, double distance) {
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h b/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h
index e7c0a7fded6..acb14f79b7a 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_single_best_neighbors.h
@@ -25,6 +25,7 @@ public:
void push(const HnswCandidate& candidate) { _candidates.push(candidate); }
void pop() { _candidates.pop(); }
const HnswCandidateVector& peek() const { return _candidates.peek(); }
+ bool empty() const { return _candidates.empty(); }
const HnswCandidate& top() const { return _candidates.top(); }
size_t size() const { return _candidates.size(); }
void emplace(uint32_t nodeid, uint32_t docid, EntryRef ref, double distance) { _candidates.emplace(nodeid, docid, ref, distance); }
diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h
index 9cd1065a356..a11be086697 100644
--- a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h
+++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h
@@ -14,6 +14,7 @@
class FastOS_FileInterface;
+namespace vespalib { class Doom; }
namespace vespalib { class GenericHeader; }
namespace vespalib::datastore {
class CompactionSpec;
@@ -101,6 +102,7 @@ public:
virtual std::vector<Neighbor> find_top_k(uint32_t k,
const BoundDistanceFunction &df,
uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const = 0;
// only return neighbors where the corresponding filter bit is set
@@ -108,6 +110,7 @@ public:
const BoundDistanceFunction &df,
const GlobalFilter &filter,
uint32_t explore_k,
+ const vespalib::Doom& doom,
double distance_threshold) const = 0;
virtual DistanceFunctionFactory &distance_function_factory() const = 0;
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 91365d446c1..21642cbd842 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -31,6 +31,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
exceptions.cpp
execution_profiler.cpp
executor_idle_tracking.cpp
+ fake_doom.cpp
featureset.cpp
file_area_freelist.cpp
foregroundtaskexecutor.cpp
diff --git a/vespalib/src/vespa/vespalib/util/fake_doom.cpp b/vespalib/src/vespa/vespalib/util/fake_doom.cpp
new file mode 100644
index 00000000000..4ca71afd2c5
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/fake_doom.cpp
@@ -0,0 +1,16 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "fake_doom.h"
+
+namespace vespalib {
+
+FakeDoom::FakeDoom(steady_time::duration time_to_doom)
+ : _time(steady_clock::now()),
+ _clock(_time),
+ _doom(_clock, _clock.getTimeNS() + time_to_doom)
+{
+}
+
+FakeDoom::~FakeDoom() = default;
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/fake_doom.h b/vespalib/src/vespa/vespalib/util/fake_doom.h
new file mode 100644
index 00000000000..496129d8f0f
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/fake_doom.h
@@ -0,0 +1,24 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "doom.h"
+
+namespace vespalib {
+
+/*
+ * Class containing a fake doom controlled by the time_to_doom
+ * constructor argument.
+ */
+class FakeDoom {
+ std::atomic<steady_time> _time;
+ Clock _clock;
+ Doom _doom;
+public:
+ FakeDoom() : FakeDoom(1s) { }
+ FakeDoom(steady_time::duration time_to_doom);
+ ~FakeDoom();
+ const Doom& get_doom() const noexcept { return _doom; }
+};
+
+}