aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--client/go/auth0/auth0.go (renamed from client/go/cli/cli.go)96
-rw-r--r--client/go/cmd/config.go2
-rw-r--r--client/go/cmd/login.go6
-rw-r--r--client/go/vespa/target.go6
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp12
-rw-r--r--searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp26
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def7
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp45
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h23
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h4
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h6
17 files changed, 181 insertions, 119 deletions
diff --git a/client/go/cli/cli.go b/client/go/auth0/auth0.go
index 22c40f195b4..92bd1178fec 100644
--- a/client/go/cli/cli.go
+++ b/client/go/auth0/auth0.go
@@ -1,6 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package cli
+package auth0
import (
"context"
@@ -37,7 +37,7 @@ type System struct {
ExpiresAt time.Time `json:"expires_at"`
}
-type Cli struct {
+type Auth0 struct {
Authenticator *auth.Authenticator
system string
initOnce sync.Once
@@ -68,36 +68,36 @@ func ContextWithCancel() context.Context {
return ctx
}
-// GetCli will try to initialize the config context, as well as figure out if
+// GetAuth0 will try to initialize the config context, as well as figure out if
// there's a readily available system.
-func GetCli(configPath string, systemName string) (*Cli, error) {
- c := Cli{}
- c.Path = configPath
- c.system = systemName
+func GetAuth0(configPath string, systemName string) (*Auth0, error) {
+ a := Auth0{}
+ a.Path = configPath
+ a.system = systemName
if err := envdecode.StrictDecode(&authCfg); err != nil {
return nil, fmt.Errorf("could not decode env: %w", err)
}
- c.Authenticator = &auth.Authenticator{
+ a.Authenticator = &auth.Authenticator{
Audience: authCfg.Audience,
ClientID: authCfg.ClientID,
DeviceCodeEndpoint: authCfg.DeviceCodeEndpoint,
OauthTokenEndpoint: authCfg.OauthTokenEndpoint,
}
- return &c, nil
+ return &a, nil
}
// IsLoggedIn encodes the domain logic for determining whether we're
// logged in. This might check our config storage, or just in memory.
-func (c *Cli) IsLoggedIn() bool {
+func (a *Auth0) IsLoggedIn() bool {
// No need to check errors for initializing context.
- _ = c.init()
+ _ = a.init()
- if c.system == "" {
+ if a.system == "" {
return false
}
// Parse the access token for the system.
- token, err := jwt.ParseString(c.config.Systems[c.system].AccessToken)
+ token, err := jwt.ParseString(a.config.Systems[a.system].AccessToken)
if err != nil {
return false
}
@@ -114,17 +114,17 @@ func (c *Cli) IsLoggedIn() bool {
// The System access token needs a refresh if:
// 1. the System scopes are different from the currently required scopes - (auth0 changes).
// 2. the access token is expired.
-func (c *Cli) PrepareSystem(ctx context.Context) (System, error) {
- if err := c.init(); err != nil {
+func (a *Auth0) PrepareSystem(ctx context.Context) (System, error) {
+ if err := a.init(); err != nil {
return System{}, err
}
- s, err := c.getSystem()
+ s, err := a.getSystem()
if err != nil {
return System{}, err
}
if s.AccessToken == "" || scopesChanged(s) {
- s, err = RunLogin(ctx, c, true)
+ s, err = RunLogin(ctx, a, true)
if err != nil {
return System{}, err
}
@@ -132,16 +132,16 @@ func (c *Cli) PrepareSystem(ctx context.Context) (System, error) {
// check if the stored access token is expired:
// use the refresh token to get a new access token:
tr := &auth.TokenRetriever{
- Authenticator: c.Authenticator,
+ Authenticator: a.Authenticator,
Secrets: &auth.Keyring{},
Client: http.DefaultClient,
}
- res, err := tr.Refresh(ctx, c.system)
+ res, err := tr.Refresh(ctx, a.system)
if err != nil {
// ask and guide the user through the login process:
fmt.Println(fmt.Errorf("failed to renew access token, %s", err))
- s, err = RunLogin(ctx, c, true)
+ s, err = RunLogin(ctx, a, true)
if err != nil {
return System{}, err
}
@@ -152,7 +152,7 @@ func (c *Cli) PrepareSystem(ctx context.Context) (System, error) {
time.Duration(res.ExpiresIn) * time.Second,
)
- err = c.AddSystem(s)
+ err = a.AddSystem(s)
if err != nil {
return System{}, err
}
@@ -193,14 +193,14 @@ func scopesChanged(s System) bool {
return false
}
-func (c *Cli) getSystem() (System, error) {
- if err := c.init(); err != nil {
+func (a *Auth0) getSystem() (System, error) {
+ if err := a.init(); err != nil {
return System{}, err
}
- s, ok := c.config.Systems[c.system]
+ s, ok := a.config.Systems[a.system]
if !ok {
- return System{}, fmt.Errorf("unable to find system: %s; run 'vespa login' to configure a new system", c.system)
+ return System{}, fmt.Errorf("unable to find system: %s; run 'vespa login' to configure a new system", a.system)
}
return s, nil
@@ -208,63 +208,63 @@ func (c *Cli) getSystem() (System, error) {
// AddSystem assigns an existing, or new System. This is expected to be called
// after a login has completed.
-func (c *Cli) AddSystem(s System) error {
- _ = c.init()
+func (a *Auth0) AddSystem(s System) error {
+ _ = a.init()
// If we're dealing with an empty file, we'll need to initialize this map.
- if c.config.Systems == nil {
- c.config.Systems = map[string]System{}
+ if a.config.Systems == nil {
+ a.config.Systems = map[string]System{}
}
- c.config.Systems[c.system] = s
+ a.config.Systems[a.system] = s
- if err := c.persistConfig(); err != nil {
+ if err := a.persistConfig(); err != nil {
return fmt.Errorf("unexpected error persisting config: %w", err)
}
return nil
}
-func (c *Cli) persistConfig() error {
- dir := filepath.Dir(c.Path)
+func (a *Auth0) persistConfig() error {
+ dir := filepath.Dir(a.Path)
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0700); err != nil {
return err
}
}
- buf, err := json.MarshalIndent(c.config, "", " ")
+ buf, err := json.MarshalIndent(a.config, "", " ")
if err != nil {
return err
}
- if err := ioutil.WriteFile(c.Path, buf, 0600); err != nil {
+ if err := ioutil.WriteFile(a.Path, buf, 0600); err != nil {
return err
}
return nil
}
-func (c *Cli) init() error {
- c.initOnce.Do(func() {
- if c.errOnce = c.initContext(); c.errOnce != nil {
+func (a *Auth0) init() error {
+ a.initOnce.Do(func() {
+ if a.errOnce = a.initContext(); a.errOnce != nil {
return
}
})
- return c.errOnce
+ return a.errOnce
}
-func (c *Cli) initContext() (err error) {
- if _, err := os.Stat(c.Path); os.IsNotExist(err) {
+func (a *Auth0) initContext() (err error) {
+ if _, err := os.Stat(a.Path); os.IsNotExist(err) {
return errUnauthenticated
}
var buf []byte
- if buf, err = ioutil.ReadFile(c.Path); err != nil {
+ if buf, err = ioutil.ReadFile(a.Path); err != nil {
return err
}
- if err := json.Unmarshal(buf, &c.config); err != nil {
+ if err := json.Unmarshal(buf, &a.config); err != nil {
return err
}
@@ -275,12 +275,12 @@ func (c *Cli) initContext() (err error) {
// by showing the login instructions, opening the browser.
// Use `expired` to run the login from other commands setup:
// this will only affect the messages.
-func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) {
+func RunLogin(ctx context.Context, a *Auth0, expired bool) (System, error) {
if expired {
fmt.Println("Please sign in to re-authorize the CLI.")
}
- state, err := c.Authenticator.Start(ctx)
+ state, err := a.Authenticator.Start(ctx)
if err != nil {
return System{}, fmt.Errorf("could not start the authentication process: %w", err)
}
@@ -297,7 +297,7 @@ func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) {
var res auth.Result
err = util.Spinner("Waiting for login to complete in browser", func() error {
- res, err = c.Authenticator.Wait(ctx, state)
+ res, err = a.Authenticator.Wait(ctx, state)
return err
})
@@ -311,7 +311,7 @@ func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) {
// store the refresh token
secretsStore := &auth.Keyring{}
- err = secretsStore.Set(auth.SecretsNamespace, c.system, res.RefreshToken)
+ err = secretsStore.Set(auth.SecretsNamespace, a.system, res.RefreshToken)
if err != nil {
// log the error but move on
fmt.Println("Could not store the refresh token locally, please expect to login again once your access token expired.")
@@ -322,7 +322,7 @@ func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) {
ExpiresAt: time.Now().Add(time.Duration(res.ExpiresIn) * time.Second),
Scopes: auth.RequiredScopes(),
}
- err = c.AddSystem(s)
+ err = a.AddSystem(s)
if err != nil {
return System{}, fmt.Errorf("could not add system to config: %w", err)
}
diff --git a/client/go/cmd/config.go b/client/go/cmd/config.go
index 0b08a2dc28d..62a97d9749d 100644
--- a/client/go/cmd/config.go
+++ b/client/go/cmd/config.go
@@ -149,7 +149,7 @@ func (c *Config) ReadAPIKey(tenantName string) ([]byte, error) {
}
func (c *Config) AuthConfigPath() string {
- return filepath.Join(c.Home, "auth", "config.json")
+ return filepath.Join(c.Home, "auth0.json")
}
func (c *Config) ReadSessionID(app vespa.ApplicationID) (int64, error) {
diff --git a/client/go/cmd/login.go b/client/go/cmd/login.go
index 415d44b75db..0e09a6d6244 100644
--- a/client/go/cmd/login.go
+++ b/client/go/cmd/login.go
@@ -2,7 +2,7 @@ package cmd
import (
"github.com/spf13/cobra"
- "github.com/vespa-engine/vespa/client/go/cli"
+ "github.com/vespa-engine/vespa/client/go/auth0"
"github.com/vespa-engine/vespa/client/go/vespa"
)
@@ -24,11 +24,11 @@ var loginCmd = &cobra.Command{
if err != nil {
return err
}
- c, err := cli.GetCli(cfg.AuthConfigPath(), getSystemName())
+ a, err := auth0.GetAuth0(cfg.AuthConfigPath(), getSystemName())
if err != nil {
return err
}
- _, err = cli.RunLogin(ctx, c, false)
+ _, err = auth0.RunLogin(ctx, a, false)
return err
},
}
diff --git a/client/go/vespa/target.go b/client/go/vespa/target.go
index 367685df34d..e4779e14c0d 100644
--- a/client/go/vespa/target.go
+++ b/client/go/vespa/target.go
@@ -18,7 +18,7 @@ import (
"strings"
"time"
- "github.com/vespa-engine/vespa/client/go/cli"
+ "github.com/vespa-engine/vespa/client/go/auth0"
"github.com/vespa-engine/vespa/client/go/util"
)
@@ -255,8 +255,8 @@ func (t *cloudTarget) PrepareApiRequest(req *http.Request, sigKeyId string) erro
}
func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error {
- c, err := cli.GetCli(t.authConfigPath, t.systemName)
- system, err := c.PrepareSystem(cli.ContextWithCancel())
+ a, err := auth0.GetAuth0(t.authConfigPath, t.systemName)
+ system, err := a.PrepareSystem(auth0.ContextWithCancel())
if err != nil {
return err
}
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
index 714ffaa16b7..32707f8a69f 100644
--- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -75,5 +75,17 @@ TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_fie
assert_executor(index_inverter(), 12, 100);
}
+TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated)
+{
+ setup(4, SharedFieldWriterExecutor::NONE);
+ service->set_task_limits(5, 7, 11);
+ EXPECT_EQ(5, service->master_task_limit());
+ EXPECT_EQ(7, service->index().getTaskLimit());
+ EXPECT_EQ(11, service->summary().getTaskLimit());
+ EXPECT_EQ(7, index_inverter()->first_executor()->getTaskLimit());
+ EXPECT_EQ(7, index_writer()->first_executor()->getTaskLimit());
+ EXPECT_EQ(7, attribute_writer()->first_executor()->getTaskLimit());
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
index 353ebeb3abc..50a8349b859 100644
--- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -14,15 +14,15 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
struct Fixture {
ProtonConfig cfg;
- Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t task_limit = 500, uint32_t semi_unbound_task_limit = 50000)
- : cfg(makeConfig(baseLineIndexingThreads, task_limit, semi_unbound_task_limit))
+ Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500)
+ : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit))
{
}
- ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t task_limit, uint32_t semi_unbound_task_limit) {
+ ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) {
ProtonConfigBuilder builder;
builder.indexing.threads = baseLineIndexingThreads;
builder.indexing.tasklimit = task_limit;
- builder.indexing.semiunboundtasklimit = semi_unbound_task_limit;
+ builder.feeding.masterTaskLimit = master_task_limit;
return builder;
}
ThreadingServiceConfig make(uint32_t cpuCores) {
@@ -51,28 +51,32 @@ TEST_F("require that indexing threads is always >= 1", Fixture(0))
TEST_DO(f.assertIndexingThreads(1, 0));
}
-TEST_F("require that default task limit is set", Fixture)
+TEST_F("require that task limits are set", Fixture)
{
- EXPECT_EQUAL(500u, f.make(24).defaultTaskLimit());
+ auto tcfg = f.make(24);
+ EXPECT_EQUAL(2000u, tcfg.master_task_limit());
+ EXPECT_EQUAL(500u, tcfg.defaultTaskLimit());
}
namespace {
-void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_default_task_limit, const ThreadingServiceConfig &config) {
+void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_master_task_limit,
+ uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) {
EXPECT_EQUAL(exp_indexing_threads, config.indexingThreads());
+ EXPECT_EQUAL(exp_master_task_limit, config.master_task_limit());
EXPECT_EQUAL(exp_default_task_limit, config.defaultTaskLimit());
}
}
-TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 1000, 100000))
+TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 3000, 1000))
{
auto cfg1 = f1.make(1);
- assertConfig(2u, 500u, cfg1);
+ assertConfig(2u, 2000, 500u, cfg1);
const auto cfg2 = f2.make(13);
- assertConfig(3u, 1000u, cfg2);
+ assertConfig(3u, 3000u, 1000u, cfg2);
cfg1.update(cfg2);
- assertConfig(2u, 1000u, cfg1); // Indexing threads not changed
+ assertConfig(2u, 3000u, 1000u, cfg1); // Indexing threads not changed
}
TEST_MAIN()
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index c252c89a2f8..ce0b09a48b8 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -18,7 +18,6 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/testkit/testapp.h>
-#include <algorithm>
#include <set>
using document::BucketId;
@@ -287,12 +286,6 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer {
ASSERT_TRUE(it != frozen.end());
frozen.erase(it);
}
- bool isFrozen(const Bucket &bucket) {
- return frozen.find(bucket.getBucketId().getId()) != frozen.end();
- }
- bool wasFrozen(const Bucket &bucket) {
- return was_frozen.find(bucket.getBucketId().getId()) != was_frozen.end();
- }
};
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 6b85f4a6829..5857fdd4f8d 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -503,6 +503,13 @@ feeding.concurrency double default = 0.2 restart
## TODO: Remove this when a shared executor is the default.
feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = NONE restart
+## Maximum number of pending tasks for the master thread in each document db.
+##
+## This limit is only considered when executing tasks for handling external feed operations.
+## In that case the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks.
+## When this limit is set to 0 it is ignored.
+feeding.master_task_limit int default = 0
+
## Adjustment to resource limit when determining if maintenance jobs can run.
##
## Currently used by 'lid_space_compaction' and 'move_buckets' jobs.
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 04d91b9c028..9d9e1e0a344 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -29,7 +29,7 @@ using storage::spi::Result;
using storage::spi::OperationComplete;
using vespalib::IllegalStateException;
using vespalib::Sequence;
-using vespalib::make_string;
+using vespalib::make_string_short::fmt;
using std::make_unique;
using namespace std::chrono_literals;
@@ -347,8 +347,7 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation()) {
return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::RESOURCE_EXHAUSTED,
- make_string("Put operation rejected for document '%s': '%s'",
- doc->getId().toString().c_str(), state.message().c_str())));
+ fmt("Put operation rejected for document '%s': '%s'", doc->getId().toString().c_str(), state.message().c_str())));
}
}
ReadGuard rguard(_rwMutex);
@@ -357,12 +356,12 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do
docType.toString().c_str(), doc->getId().toString().c_str());
if (!doc->getId().hasDocType()) {
return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())));
+ fmt("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())));
}
IPersistenceHandler * handler = getHandler(rguard, bucket.getBucketSpace(), docType);
if (!handler) {
return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
- make_string("No handler for document type '%s'", docType.toString().c_str())));
+ fmt("No handler for document type '%s'", docType.toString().c_str())));
}
auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete));
handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc));
@@ -376,13 +375,13 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d
static_cast<uint64_t>(t.getValue()), did.toString().c_str());
if (!did.hasDocType()) {
return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", did.toString().c_str())));
+ fmt("Old id scheme not supported in elastic mode (%s)", did.toString().c_str())));
}
DocTypeName docType(did.getDocType());
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
if (!handler) {
return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
- make_string("No handler for document type '%s'", docType.toString().c_str())));
+ fmt("No handler for document type '%s'", docType.toString().c_str())));
}
auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete));
handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did);
@@ -396,27 +395,24 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation() && document::FeedRejectHelper::mustReject(*upd)) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED,
- make_string("Update operation rejected for document '%s': '%s'",
- upd->getId().toString().c_str(), state.message().c_str())));
+ fmt("Update operation rejected for document '%s': '%s'", upd->getId().toString().c_str(), state.message().c_str())));
}
}
try {
upd->eagerDeserialize();
} catch (document::FieldNotFoundException & e) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
- make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'",
- upd->getId().toString().c_str(), upd->getType().getName().c_str())));
+ fmt("Update operation rejected for document '%s' of type '%s': 'Field not found'",
+ upd->getId().toString().c_str(), upd->getType().getName().c_str())));
} catch (document::DocumentTypeNotFoundException & e) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
- make_string("Update operation rejected for document '%s' of type '%s'.",
- upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())));
+ fmt("Update operation rejected for document '%s' of type '%s'.",
+ upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())));
} catch (document::WrongTensorTypeException &e) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
- make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'",
- upd->getId().toString().c_str(),
- upd->getType().getName().c_str(),
- e.getMessage().c_str())));
+ fmt("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'",
+ upd->getId().toString().c_str(), upd->getType().getName().c_str(), e.getMessage().c_str())));
}
ReadGuard rguard(_rwMutex);
DocTypeName docType(upd->getType());
@@ -425,16 +421,17 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP
upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false"));
if (!upd->getId().hasDocType()) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())));
+ fmt("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())));
}
if (upd->getId().getDocType() != docType.getName()) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
- make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())));
+ fmt("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())));
}
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
if (handler == nullptr) {
- return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ fmt("No handler for document type '%s'", docType.toString().c_str())));
}
auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete));
handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd));
@@ -505,11 +502,11 @@ PersistenceEngine::iterate(IteratorId id, uint64_t maxByteSize, Context&) const
std::lock_guard<std::mutex> guard(_iterators_lock);
auto it = _iterators.find(id);
if (it == _iterators.end()) {
- return IterateResult(Result::ErrorType::PERMANENT_ERROR, make_string("Unknown iterator with id %" PRIu64, id.getValue()));
+ return IterateResult(Result::ErrorType::PERMANENT_ERROR, fmt("Unknown iterator with id %" PRIu64, id.getValue()));
}
iteratorEntry = it->second;
if (iteratorEntry->in_use) {
- return IterateResult(Result::ErrorType::TRANSIENT_ERROR, make_string("Iterator with id %" PRIu64 " is already in use", id.getValue()));
+ return IterateResult(Result::ErrorType::TRANSIENT_ERROR, fmt("Iterator with id %" PRIu64 " is already in use", id.getValue()));
}
iteratorEntry->in_use = true;
}
@@ -521,7 +518,7 @@ PersistenceEngine::iterate(IteratorId id, uint64_t maxByteSize, Context&) const
iteratorEntry->in_use = false;
return result;
} catch (const std::exception & e) {
- IterateResult result(Result::ErrorType::PERMANENT_ERROR, make_string("Caught exception during visitor iterator.iterate() = '%s'", e.what()));
+ IterateResult result(Result::ErrorType::PERMANENT_ERROR, fmt("Caught exception during visitor iterator.iterate() = '%s'", e.what()));
LOG(warning, "Caught exception during visitor iterator.iterate() = '%s'", e.what());
std::lock_guard<std::mutex> guard(_iterators_lock);
iteratorEntry->in_use = false;
@@ -540,7 +537,7 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&)
return Result();
}
if (it->second->in_use) {
- return Result(Result::ErrorType::TRANSIENT_ERROR, make_string("Iterator with id %" PRIu64 " is currently in use", id.getValue()));
+ return Result(Result::ErrorType::TRANSIENT_ERROR, fmt("Iterator with id %" PRIu64 " is currently in use", id.getValue()));
}
delete it->second;
_iterators.erase(it);
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 3f2fb6c4634..427d435aae7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -482,7 +482,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) {
_writeServiceConfig.update(configSnapshot->get_threading_service_config());
}
- _writeService.setTaskLimit(_writeServiceConfig.defaultTaskLimit(), _writeServiceConfig.defaultTaskLimit());
+ _writeService.set_task_limits(_writeServiceConfig.master_task_limit(),
+ _writeServiceConfig.defaultTaskLimit(),
+ _writeServiceConfig.defaultTaskLimit());
if (params.shouldSubDbsChange()) {
applySubDBConfig(*configSnapshot, serialNum, params);
if (serialNum < _feedHandler->getSerialNum()) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 0e9ba7a24c8..7e0a1851bf5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -42,11 +42,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sha
: ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads))
{}
-ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor & sharedExecutor,
- const ThreadingServiceConfig & cfg, uint32_t stackSize)
+ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ const ThreadingServiceConfig& cfg,
+ uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize, master_executor),
+ _master_task_limit(cfg.master_task_limit()),
_indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)),
_summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)),
_masterService(_masterExecutor),
@@ -97,6 +99,16 @@ ExecutorThreadingService::sync_all_executors() {
}
void
+ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP task)
+{
+ uint32_t limit = master_task_limit();
+ if (limit > 0) {
+ _masterExecutor.wait_for_task_count(limit);
+ }
+ _masterExecutor.execute(std::move(task));
+}
+
+void
ExecutorThreadingService::syncOnce() {
bool isMasterThread = _masterService.isCurrentThread();
if (!isMasterThread) {
@@ -127,13 +139,16 @@ ExecutorThreadingService::shutdown()
}
void
-ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit)
+ExecutorThreadingService::set_task_limits(uint32_t master_task_limit,
+ uint32_t field_task_limit,
+ uint32_t summary_task_limit)
{
- _indexExecutor->setTaskLimit(taskLimit);
- _summaryExecutor->setTaskLimit(summaryTaskLimit);
- _index_field_inverter_ptr->setTaskLimit(taskLimit);
- _index_field_writer_ptr->setTaskLimit(taskLimit);
- _attribute_field_writer_ptr->setTaskLimit(taskLimit);
+ _master_task_limit.store(master_task_limit, std::memory_order_release);
+ _indexExecutor->setTaskLimit(field_task_limit);
+ _summaryExecutor->setTaskLimit(summary_task_limit);
+ _index_field_inverter_ptr->setTaskLimit(field_task_limit);
+ _index_field_writer_ptr->setTaskLimit(field_task_limit);
+ _attribute_field_writer_ptr->setTaskLimit(field_task_limit);
}
ExecutorThreadingServiceStats
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index e571e205f47..1890ca300e2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -19,6 +19,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService
private:
vespalib::ThreadExecutor & _sharedExecutor;
vespalib::ThreadStackExecutor _masterExecutor;
+ std::atomic<uint32_t> _master_task_limit;
std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor;
ExecutorThreadService _masterService;
@@ -36,21 +37,27 @@ private:
public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
/**
- * Constructor.
- *
- * @stackSize The size of the stack of the underlying executors.
- * @cfg config used to set up all executors.
+ * Convenience constructor used in unit tests.
*/
- ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor,
- const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024);
- ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads = 1);
+ ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1);
+
+ ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ const ThreadingServiceConfig& cfg,
+ uint32_t stackSize = 128 * 1024);
~ExecutorThreadingService() override;
void sync_all_executors() override;
+ void blocking_master_execute(vespalib::Executor::Task::UP task) override;
+
void shutdown();
- void setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit);
+ uint32_t master_task_limit() const {
+ return _master_task_limit.load(std::memory_order_relaxed);
+ }
+ void set_task_limits(uint32_t master_task_limit,
+ uint32_t field_task_limit,
+ uint32_t summary_task_limit);
// Expose the underlying executors for stats fetching and testing.
// TOD: Remove - This is only used for casting to check the underlying type
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index c9294150f16..ea63d59c830 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -736,7 +736,13 @@ FeedHandler::performOperation(FeedToken token, FeedOperation::UP op)
void
FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op)
{
- _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable {
+ // This function is only called when handling external feed operations (see PersistenceHandlerProxy),
+ // and ensures that the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks.
+ // This helps keeping feed operation latencies and memory usage in check.
+ // NOTE: Tasks that are created and executed from the master thread itself or some of its helpers
+ // cannot use blocking_master_execute() as that could lead to deadlocks.
+ // See FeedHandler::initiateCommit() for a concrete example.
+ _writeService.blocking_master_execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable {
doHandleOperation(std::move(token), std::move(op));
}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 012d91cb49f..ff75a59c41b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -11,12 +11,14 @@ using OptimizeFor = vespalib::Executor::OptimizeFor;
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
+ uint32_t master_task_limit_,
uint32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
vespalib::duration reactionTime_,
SharedFieldWriterExecutor shared_field_writer_)
: _indexingThreads(indexingThreads_),
+ _master_task_limit(master_task_limit_),
_defaultTaskLimit(defaultTaskLimit_),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
@@ -59,7 +61,9 @@ ThreadingServiceConfig
ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo)
{
uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing, concurrency, cpuInfo);
- return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit,
+ return ThreadingServiceConfig(indexingThreads,
+ cfg.feeding.masterTaskLimit,
+ cfg.indexing.tasklimit,
selectOptimization(cfg.indexing.optimize),
cfg.indexing.kindOfWatermark,
vespalib::from_s(cfg.indexing.reactiontime),
@@ -68,12 +72,13 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
ThreadingServiceConfig
ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) {
- return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_);
+ return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_);
}
void
ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg)
{
+ _master_task_limit = cfg._master_task_limit;
_defaultTaskLimit = cfg._defaultTaskLimit;
}
@@ -81,6 +86,7 @@ bool
ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
{
return _indexingThreads == rhs._indexingThreads &&
+ _master_task_limit == rhs._master_task_limit &&
_defaultTaskLimit == rhs._defaultTaskLimit &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index 5869eaf9c2e..f1a4f0525d1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -21,6 +21,7 @@ public:
private:
uint32_t _indexingThreads;
+ uint32_t _master_task_limit;
uint32_t _defaultTaskLimit;
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
@@ -28,14 +29,16 @@ private:
SharedFieldWriterExecutor _shared_field_writer;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_,
- uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_,
+ OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_,
+ SharedFieldWriterExecutor shared_field_writer_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE);
void update(const ThreadingServiceConfig& cfg);
uint32_t indexingThreads() const { return _indexingThreads; }
+ uint32_t master_task_limit() const { return _master_task_limit; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index 1d379f439fa..46527362091 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -37,6 +37,10 @@ public:
_service.sync_all_executors();
}
+ void blocking_master_execute(vespalib::Executor::Task::UP task) override {
+ _service.blocking_master_execute(std::move(task));
+ }
+
searchcorespi::index::IThreadService &master() override {
return _master;
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
index f30aec94d53..0660f3ab495 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -65,6 +65,12 @@ struct IThreadingService
virtual void sync_all_executors() = 0;
+ /**
+ * Block the calling thread until the master thread has capacity to handle more tasks,
+ * and then execute the given task in the master thread.
+ */
+ virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0;
+
virtual IThreadService &master() = 0;
virtual IThreadService &index() = 0;
virtual IThreadService &summary() = 0;