diff options
17 files changed, 181 insertions, 119 deletions
diff --git a/client/go/cli/cli.go b/client/go/auth0/auth0.go index 22c40f195b4..92bd1178fec 100644 --- a/client/go/cli/cli.go +++ b/client/go/auth0/auth0.go @@ -1,6 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package cli +package auth0 import ( "context" @@ -37,7 +37,7 @@ type System struct { ExpiresAt time.Time `json:"expires_at"` } -type Cli struct { +type Auth0 struct { Authenticator *auth.Authenticator system string initOnce sync.Once @@ -68,36 +68,36 @@ func ContextWithCancel() context.Context { return ctx } -// GetCli will try to initialize the config context, as well as figure out if +// GetAuth0 will try to initialize the config context, as well as figure out if // there's a readily available system. -func GetCli(configPath string, systemName string) (*Cli, error) { - c := Cli{} - c.Path = configPath - c.system = systemName +func GetAuth0(configPath string, systemName string) (*Auth0, error) { + a := Auth0{} + a.Path = configPath + a.system = systemName if err := envdecode.StrictDecode(&authCfg); err != nil { return nil, fmt.Errorf("could not decode env: %w", err) } - c.Authenticator = &auth.Authenticator{ + a.Authenticator = &auth.Authenticator{ Audience: authCfg.Audience, ClientID: authCfg.ClientID, DeviceCodeEndpoint: authCfg.DeviceCodeEndpoint, OauthTokenEndpoint: authCfg.OauthTokenEndpoint, } - return &c, nil + return &a, nil } // IsLoggedIn encodes the domain logic for determining whether we're // logged in. This might check our config storage, or just in memory. -func (c *Cli) IsLoggedIn() bool { +func (a *Auth0) IsLoggedIn() bool { // No need to check errors for initializing context. - _ = c.init() + _ = a.init() - if c.system == "" { + if a.system == "" { return false } // Parse the access token for the system. - token, err := jwt.ParseString(c.config.Systems[c.system].AccessToken) + token, err := jwt.ParseString(a.config.Systems[a.system].AccessToken) if err != nil { return false } @@ -114,17 +114,17 @@ func (c *Cli) IsLoggedIn() bool { // The System access token needs a refresh if: // 1. the System scopes are different from the currently required scopes - (auth0 changes). // 2. the access token is expired. -func (c *Cli) PrepareSystem(ctx context.Context) (System, error) { - if err := c.init(); err != nil { +func (a *Auth0) PrepareSystem(ctx context.Context) (System, error) { + if err := a.init(); err != nil { return System{}, err } - s, err := c.getSystem() + s, err := a.getSystem() if err != nil { return System{}, err } if s.AccessToken == "" || scopesChanged(s) { - s, err = RunLogin(ctx, c, true) + s, err = RunLogin(ctx, a, true) if err != nil { return System{}, err } @@ -132,16 +132,16 @@ func (c *Cli) PrepareSystem(ctx context.Context) (System, error) { // check if the stored access token is expired: // use the refresh token to get a new access token: tr := &auth.TokenRetriever{ - Authenticator: c.Authenticator, + Authenticator: a.Authenticator, Secrets: &auth.Keyring{}, Client: http.DefaultClient, } - res, err := tr.Refresh(ctx, c.system) + res, err := tr.Refresh(ctx, a.system) if err != nil { // ask and guide the user through the login process: fmt.Println(fmt.Errorf("failed to renew access token, %s", err)) - s, err = RunLogin(ctx, c, true) + s, err = RunLogin(ctx, a, true) if err != nil { return System{}, err } @@ -152,7 +152,7 @@ func (c *Cli) PrepareSystem(ctx context.Context) (System, error) { time.Duration(res.ExpiresIn) * time.Second, ) - err = c.AddSystem(s) + err = a.AddSystem(s) if err != nil { return System{}, err } @@ -193,14 +193,14 @@ func scopesChanged(s System) bool { return false } -func (c *Cli) getSystem() (System, error) { - if err := c.init(); err != nil { +func (a *Auth0) getSystem() (System, error) { + if err := a.init(); err != nil { return System{}, err } - s, ok := c.config.Systems[c.system] + s, ok := a.config.Systems[a.system] if !ok { - return System{}, fmt.Errorf("unable to find system: %s; run 'vespa login' to configure a new system", c.system) + return System{}, fmt.Errorf("unable to find system: %s; run 'vespa login' to configure a new system", a.system) } return s, nil @@ -208,63 +208,63 @@ func (c *Cli) getSystem() (System, error) { // AddSystem assigns an existing, or new System. This is expected to be called // after a login has completed. -func (c *Cli) AddSystem(s System) error { - _ = c.init() +func (a *Auth0) AddSystem(s System) error { + _ = a.init() // If we're dealing with an empty file, we'll need to initialize this map. - if c.config.Systems == nil { - c.config.Systems = map[string]System{} + if a.config.Systems == nil { + a.config.Systems = map[string]System{} } - c.config.Systems[c.system] = s + a.config.Systems[a.system] = s - if err := c.persistConfig(); err != nil { + if err := a.persistConfig(); err != nil { return fmt.Errorf("unexpected error persisting config: %w", err) } return nil } -func (c *Cli) persistConfig() error { - dir := filepath.Dir(c.Path) +func (a *Auth0) persistConfig() error { + dir := filepath.Dir(a.Path) if _, err := os.Stat(dir); os.IsNotExist(err) { if err := os.MkdirAll(dir, 0700); err != nil { return err } } - buf, err := json.MarshalIndent(c.config, "", " ") + buf, err := json.MarshalIndent(a.config, "", " ") if err != nil { return err } - if err := ioutil.WriteFile(c.Path, buf, 0600); err != nil { + if err := ioutil.WriteFile(a.Path, buf, 0600); err != nil { return err } return nil } -func (c *Cli) init() error { - c.initOnce.Do(func() { - if c.errOnce = c.initContext(); c.errOnce != nil { +func (a *Auth0) init() error { + a.initOnce.Do(func() { + if a.errOnce = a.initContext(); a.errOnce != nil { return } }) - return c.errOnce + return a.errOnce } -func (c *Cli) initContext() (err error) { - if _, err := os.Stat(c.Path); os.IsNotExist(err) { +func (a *Auth0) initContext() (err error) { + if _, err := os.Stat(a.Path); os.IsNotExist(err) { return errUnauthenticated } var buf []byte - if buf, err = ioutil.ReadFile(c.Path); err != nil { + if buf, err = ioutil.ReadFile(a.Path); err != nil { return err } - if err := json.Unmarshal(buf, &c.config); err != nil { + if err := json.Unmarshal(buf, &a.config); err != nil { return err } @@ -275,12 +275,12 @@ func (c *Cli) initContext() (err error) { // by showing the login instructions, opening the browser. // Use `expired` to run the login from other commands setup: // this will only affect the messages. -func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) { +func RunLogin(ctx context.Context, a *Auth0, expired bool) (System, error) { if expired { fmt.Println("Please sign in to re-authorize the CLI.") } - state, err := c.Authenticator.Start(ctx) + state, err := a.Authenticator.Start(ctx) if err != nil { return System{}, fmt.Errorf("could not start the authentication process: %w", err) } @@ -297,7 +297,7 @@ func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) { var res auth.Result err = util.Spinner("Waiting for login to complete in browser", func() error { - res, err = c.Authenticator.Wait(ctx, state) + res, err = a.Authenticator.Wait(ctx, state) return err }) @@ -311,7 +311,7 @@ func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) { // store the refresh token secretsStore := &auth.Keyring{} - err = secretsStore.Set(auth.SecretsNamespace, c.system, res.RefreshToken) + err = secretsStore.Set(auth.SecretsNamespace, a.system, res.RefreshToken) if err != nil { // log the error but move on fmt.Println("Could not store the refresh token locally, please expect to login again once your access token expired.") @@ -322,7 +322,7 @@ func RunLogin(ctx context.Context, c *Cli, expired bool) (System, error) { ExpiresAt: time.Now().Add(time.Duration(res.ExpiresIn) * time.Second), Scopes: auth.RequiredScopes(), } - err = c.AddSystem(s) + err = a.AddSystem(s) if err != nil { return System{}, fmt.Errorf("could not add system to config: %w", err) } diff --git a/client/go/cmd/config.go b/client/go/cmd/config.go index 0b08a2dc28d..62a97d9749d 100644 --- a/client/go/cmd/config.go +++ b/client/go/cmd/config.go @@ -149,7 +149,7 @@ func (c *Config) ReadAPIKey(tenantName string) ([]byte, error) { } func (c *Config) AuthConfigPath() string { - return filepath.Join(c.Home, "auth", "config.json") + return filepath.Join(c.Home, "auth0.json") } func (c *Config) ReadSessionID(app vespa.ApplicationID) (int64, error) { diff --git a/client/go/cmd/login.go b/client/go/cmd/login.go index 415d44b75db..0e09a6d6244 100644 --- a/client/go/cmd/login.go +++ b/client/go/cmd/login.go @@ -2,7 +2,7 @@ package cmd import ( "github.com/spf13/cobra" - "github.com/vespa-engine/vespa/client/go/cli" + "github.com/vespa-engine/vespa/client/go/auth0" "github.com/vespa-engine/vespa/client/go/vespa" ) @@ -24,11 +24,11 @@ var loginCmd = &cobra.Command{ if err != nil { return err } - c, err := cli.GetCli(cfg.AuthConfigPath(), getSystemName()) + a, err := auth0.GetAuth0(cfg.AuthConfigPath(), getSystemName()) if err != nil { return err } - _, err = cli.RunLogin(ctx, c, false) + _, err = auth0.RunLogin(ctx, a, false) return err }, } diff --git a/client/go/vespa/target.go b/client/go/vespa/target.go index 367685df34d..e4779e14c0d 100644 --- a/client/go/vespa/target.go +++ b/client/go/vespa/target.go @@ -18,7 +18,7 @@ import ( "strings" "time" - "github.com/vespa-engine/vespa/client/go/cli" + "github.com/vespa-engine/vespa/client/go/auth0" "github.com/vespa-engine/vespa/client/go/util" ) @@ -255,8 +255,8 @@ func (t *cloudTarget) PrepareApiRequest(req *http.Request, sigKeyId string) erro } func (t *cloudTarget) addAuth0AccessToken(request *http.Request) error { - c, err := cli.GetCli(t.authConfigPath, t.systemName) - system, err := c.PrepareSystem(cli.ContextWithCancel()) + a, err := auth0.GetAuth0(t.authConfigPath, t.systemName) + system, err := a.PrepareSystem(auth0.ContextWithCancel()) if err != nil { return err } diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 714ffaa16b7..32707f8a69f 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -75,5 +75,17 @@ TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_fie assert_executor(index_inverter(), 12, 100); } +TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated) +{ + setup(4, SharedFieldWriterExecutor::NONE); + service->set_task_limits(5, 7, 11); + EXPECT_EQ(5, service->master_task_limit()); + EXPECT_EQ(7, service->index().getTaskLimit()); + EXPECT_EQ(11, service->summary().getTaskLimit()); + EXPECT_EQ(7, index_inverter()->first_executor()->getTaskLimit()); + EXPECT_EQ(7, index_writer()->first_executor()->getTaskLimit()); + EXPECT_EQ(7, attribute_writer()->first_executor()->getTaskLimit()); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index 353ebeb3abc..50a8349b859 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -14,15 +14,15 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; struct Fixture { ProtonConfig cfg; - Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t task_limit = 500, uint32_t semi_unbound_task_limit = 50000) - : cfg(makeConfig(baseLineIndexingThreads, task_limit, semi_unbound_task_limit)) + Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500) + : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit)) { } - ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t task_limit, uint32_t semi_unbound_task_limit) { + ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) { ProtonConfigBuilder builder; builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = task_limit; - builder.indexing.semiunboundtasklimit = semi_unbound_task_limit; + builder.feeding.masterTaskLimit = master_task_limit; return builder; } ThreadingServiceConfig make(uint32_t cpuCores) { @@ -51,28 +51,32 @@ TEST_F("require that indexing threads is always >= 1", Fixture(0)) TEST_DO(f.assertIndexingThreads(1, 0)); } -TEST_F("require that default task limit is set", Fixture) +TEST_F("require that task limits are set", Fixture) { - EXPECT_EQUAL(500u, f.make(24).defaultTaskLimit()); + auto tcfg = f.make(24); + EXPECT_EQUAL(2000u, tcfg.master_task_limit()); + EXPECT_EQUAL(500u, tcfg.defaultTaskLimit()); } namespace { -void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_default_task_limit, const ThreadingServiceConfig &config) { +void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_master_task_limit, + uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) { EXPECT_EQUAL(exp_indexing_threads, config.indexingThreads()); + EXPECT_EQUAL(exp_master_task_limit, config.master_task_limit()); EXPECT_EQUAL(exp_default_task_limit, config.defaultTaskLimit()); } } -TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 1000, 100000)) +TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 3000, 1000)) { auto cfg1 = f1.make(1); - assertConfig(2u, 500u, cfg1); + assertConfig(2u, 2000, 500u, cfg1); const auto cfg2 = f2.make(13); - assertConfig(3u, 1000u, cfg2); + assertConfig(3u, 3000u, 1000u, cfg2); cfg1.update(cfg2); - assertConfig(2u, 1000u, cfg1); // Indexing threads not changed + assertConfig(2u, 3000u, 1000u, cfg1); // Indexing threads not changed } TEST_MAIN() diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index c252c89a2f8..ce0b09a48b8 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -18,7 +18,6 @@ #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/testkit/testapp.h> -#include <algorithm> #include <set> using document::BucketId; @@ -287,12 +286,6 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { ASSERT_TRUE(it != frozen.end()); frozen.erase(it); } - bool isFrozen(const Bucket &bucket) { - return frozen.find(bucket.getBucketId().getId()) != frozen.end(); - } - bool wasFrozen(const Bucket &bucket) { - return was_frozen.find(bucket.getBucketId().getId()) != was_frozen.end(); - } }; diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 6b85f4a6829..5857fdd4f8d 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -503,6 +503,13 @@ feeding.concurrency double default = 0.2 restart ## TODO: Remove this when a shared executor is the default. feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = NONE restart +## Maximum number of pending tasks for the master thread in each document db. +## +## This limit is only considered when executing tasks for handling external feed operations. +## In that case the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks. +## When this limit is set to 0 it is ignored. +feeding.master_task_limit int default = 0 + ## Adjustment to resource limit when determining if maintenance jobs can run. ## ## Currently used by 'lid_space_compaction' and 'move_buckets' jobs. diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 04d91b9c028..9d9e1e0a344 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -29,7 +29,7 @@ using storage::spi::Result; using storage::spi::OperationComplete; using vespalib::IllegalStateException; using vespalib::Sequence; -using vespalib::make_string; +using vespalib::make_string_short::fmt; using std::make_unique; using namespace std::chrono_literals; @@ -347,8 +347,7 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation()) { return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::RESOURCE_EXHAUSTED, - make_string("Put operation rejected for document '%s': '%s'", - doc->getId().toString().c_str(), state.message().c_str()))); + fmt("Put operation rejected for document '%s': '%s'", doc->getId().toString().c_str(), state.message().c_str()))); } } ReadGuard rguard(_rwMutex); @@ -357,12 +356,12 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do docType.toString().c_str(), doc->getId().toString().c_str()); if (!doc->getId().hasDocType()) { return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()))); + fmt("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()))); } IPersistenceHandler * handler = getHandler(rguard, bucket.getBucketSpace(), docType); if (!handler) { return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, - make_string("No handler for document type '%s'", docType.toString().c_str()))); + fmt("No handler for document type '%s'", docType.toString().c_str()))); } auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete)); handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); @@ -376,13 +375,13 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d static_cast<uint64_t>(t.getValue()), did.toString().c_str()); if (!did.hasDocType()) { return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", did.toString().c_str()))); + fmt("Old id scheme not supported in elastic mode (%s)", did.toString().c_str()))); } DocTypeName docType(did.getDocType()); IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); if (!handler) { return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, - make_string("No handler for document type '%s'", docType.toString().c_str()))); + fmt("No handler for document type '%s'", docType.toString().c_str()))); } auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete)); handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did); @@ -396,27 +395,24 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation() && document::FeedRejectHelper::mustReject(*upd)) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED, - make_string("Update operation rejected for document '%s': '%s'", - upd->getId().toString().c_str(), state.message().c_str()))); + fmt("Update operation rejected for document '%s': '%s'", upd->getId().toString().c_str(), state.message().c_str()))); } } try { upd->eagerDeserialize(); } catch (document::FieldNotFoundException & e) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, - make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'", - upd->getId().toString().c_str(), upd->getType().getName().c_str()))); + fmt("Update operation rejected for document '%s' of type '%s': 'Field not found'", + upd->getId().toString().c_str(), upd->getType().getName().c_str()))); } catch (document::DocumentTypeNotFoundException & e) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, - make_string("Update operation rejected for document '%s' of type '%s'.", - upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()))); + fmt("Update operation rejected for document '%s' of type '%s'.", + upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()))); } catch (document::WrongTensorTypeException &e) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, - make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'", - upd->getId().toString().c_str(), - upd->getType().getName().c_str(), - e.getMessage().c_str()))); + fmt("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'", + upd->getId().toString().c_str(), upd->getType().getName().c_str(), e.getMessage().c_str()))); } ReadGuard rguard(_rwMutex); DocTypeName docType(upd->getType()); @@ -425,16 +421,17 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false")); if (!upd->getId().hasDocType()) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()))); + fmt("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()))); } if (upd->getId().getDocType() != docType.getName()) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, - make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()))); + fmt("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()))); } IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); if (handler == nullptr) { - return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + fmt("No handler for document type '%s'", docType.toString().c_str()))); } auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete)); handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); @@ -505,11 +502,11 @@ PersistenceEngine::iterate(IteratorId id, uint64_t maxByteSize, Context&) const std::lock_guard<std::mutex> guard(_iterators_lock); auto it = _iterators.find(id); if (it == _iterators.end()) { - return IterateResult(Result::ErrorType::PERMANENT_ERROR, make_string("Unknown iterator with id %" PRIu64, id.getValue())); + return IterateResult(Result::ErrorType::PERMANENT_ERROR, fmt("Unknown iterator with id %" PRIu64, id.getValue())); } iteratorEntry = it->second; if (iteratorEntry->in_use) { - return IterateResult(Result::ErrorType::TRANSIENT_ERROR, make_string("Iterator with id %" PRIu64 " is already in use", id.getValue())); + return IterateResult(Result::ErrorType::TRANSIENT_ERROR, fmt("Iterator with id %" PRIu64 " is already in use", id.getValue())); } iteratorEntry->in_use = true; } @@ -521,7 +518,7 @@ PersistenceEngine::iterate(IteratorId id, uint64_t maxByteSize, Context&) const iteratorEntry->in_use = false; return result; } catch (const std::exception & e) { - IterateResult result(Result::ErrorType::PERMANENT_ERROR, make_string("Caught exception during visitor iterator.iterate() = '%s'", e.what())); + IterateResult result(Result::ErrorType::PERMANENT_ERROR, fmt("Caught exception during visitor iterator.iterate() = '%s'", e.what())); LOG(warning, "Caught exception during visitor iterator.iterate() = '%s'", e.what()); std::lock_guard<std::mutex> guard(_iterators_lock); iteratorEntry->in_use = false; @@ -540,7 +537,7 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&) return Result(); } if (it->second->in_use) { - return Result(Result::ErrorType::TRANSIENT_ERROR, make_string("Iterator with id %" PRIu64 " is currently in use", id.getValue())); + return Result(Result::ErrorType::TRANSIENT_ERROR, fmt("Iterator with id %" PRIu64 " is currently in use", id.getValue())); } delete it->second; _iterators.erase(it); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 3f2fb6c4634..427d435aae7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -482,7 +482,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) { _writeServiceConfig.update(configSnapshot->get_threading_service_config()); } - _writeService.setTaskLimit(_writeServiceConfig.defaultTaskLimit(), _writeServiceConfig.defaultTaskLimit()); + _writeService.set_task_limits(_writeServiceConfig.master_task_limit(), + _writeServiceConfig.defaultTaskLimit(), + _writeServiceConfig.defaultTaskLimit()); if (params.shouldSubDbsChange()) { applySubDBConfig(*configSnapshot, serialNum, params); if (serialNum < _feedHandler->getSerialNum()) { diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 0e9ba7a24c8..7e0a1851bf5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -42,11 +42,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sha : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads)) {} -ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor & sharedExecutor, - const ThreadingServiceConfig & cfg, uint32_t stackSize) +ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + const ThreadingServiceConfig& cfg, + uint32_t stackSize) : _sharedExecutor(sharedExecutor), _masterExecutor(1, stackSize, master_executor), + _master_task_limit(cfg.master_task_limit()), _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)), _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)), _masterService(_masterExecutor), @@ -97,6 +99,16 @@ ExecutorThreadingService::sync_all_executors() { } void +ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP task) +{ + uint32_t limit = master_task_limit(); + if (limit > 0) { + _masterExecutor.wait_for_task_count(limit); + } + _masterExecutor.execute(std::move(task)); +} + +void ExecutorThreadingService::syncOnce() { bool isMasterThread = _masterService.isCurrentThread(); if (!isMasterThread) { @@ -127,13 +139,16 @@ ExecutorThreadingService::shutdown() } void -ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit) +ExecutorThreadingService::set_task_limits(uint32_t master_task_limit, + uint32_t field_task_limit, + uint32_t summary_task_limit) { - _indexExecutor->setTaskLimit(taskLimit); - _summaryExecutor->setTaskLimit(summaryTaskLimit); - _index_field_inverter_ptr->setTaskLimit(taskLimit); - _index_field_writer_ptr->setTaskLimit(taskLimit); - _attribute_field_writer_ptr->setTaskLimit(taskLimit); + _master_task_limit.store(master_task_limit, std::memory_order_release); + _indexExecutor->setTaskLimit(field_task_limit); + _summaryExecutor->setTaskLimit(summary_task_limit); + _index_field_inverter_ptr->setTaskLimit(field_task_limit); + _index_field_writer_ptr->setTaskLimit(field_task_limit); + _attribute_field_writer_ptr->setTaskLimit(field_task_limit); } ExecutorThreadingServiceStats diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index e571e205f47..1890ca300e2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -19,6 +19,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService private: vespalib::ThreadExecutor & _sharedExecutor; vespalib::ThreadStackExecutor _masterExecutor; + std::atomic<uint32_t> _master_task_limit; std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor; ExecutorThreadService _masterService; @@ -36,21 +37,27 @@ private: public: using OptimizeFor = vespalib::Executor::OptimizeFor; /** - * Constructor. - * - * @stackSize The size of the stack of the underlying executors. - * @cfg config used to set up all executors. + * Convenience constructor used in unit tests. */ - ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, - const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024); - ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads = 1); + ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1); + + ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + const ThreadingServiceConfig& cfg, + uint32_t stackSize = 128 * 1024); ~ExecutorThreadingService() override; void sync_all_executors() override; + void blocking_master_execute(vespalib::Executor::Task::UP task) override; + void shutdown(); - void setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit); + uint32_t master_task_limit() const { + return _master_task_limit.load(std::memory_order_relaxed); + } + void set_task_limits(uint32_t master_task_limit, + uint32_t field_task_limit, + uint32_t summary_task_limit); // Expose the underlying executors for stats fetching and testing. // TOD: Remove - This is only used for casting to check the underlying type diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index c9294150f16..ea63d59c830 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -736,7 +736,13 @@ FeedHandler::performOperation(FeedToken token, FeedOperation::UP op) void FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op) { - _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable { + // This function is only called when handling external feed operations (see PersistenceHandlerProxy), + // and ensures that the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks. + // This helps keeping feed operation latencies and memory usage in check. + // NOTE: Tasks that are created and executed from the master thread itself or some of its helpers + // cannot use blocking_master_execute() as that could lead to deadlocks. + // See FeedHandler::initiateCommit() for a concrete example. + _writeService.blocking_master_execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable { doHandleOperation(std::move(token), std::move(op)); })); } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 012d91cb49f..ff75a59c41b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -11,12 +11,14 @@ using OptimizeFor = vespalib::Executor::OptimizeFor; ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, + uint32_t master_task_limit_, uint32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_) : _indexingThreads(indexingThreads_), + _master_task_limit(master_task_limit_), _defaultTaskLimit(defaultTaskLimit_), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), @@ -59,7 +61,9 @@ ThreadingServiceConfig ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo) { uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing, concurrency, cpuInfo); - return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, + return ThreadingServiceConfig(indexingThreads, + cfg.feeding.masterTaskLimit, + cfg.indexing.tasklimit, selectOptimization(cfg.indexing.optimize), cfg.indexing.kindOfWatermark, vespalib::from_s(cfg.indexing.reactiontime), @@ -68,12 +72,13 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const ThreadingServiceConfig ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) { - return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_); + return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_); } void ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg) { + _master_task_limit = cfg._master_task_limit; _defaultTaskLimit = cfg._defaultTaskLimit; } @@ -81,6 +86,7 @@ bool ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const { return _indexingThreads == rhs._indexingThreads && + _master_task_limit == rhs._master_task_limit && _defaultTaskLimit == rhs._defaultTaskLimit && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index 5869eaf9c2e..f1a4f0525d1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -21,6 +21,7 @@ public: private: uint32_t _indexingThreads; + uint32_t _master_task_limit; uint32_t _defaultTaskLimit; OptimizeFor _optimize; uint32_t _kindOfWatermark; @@ -28,14 +29,16 @@ private: SharedFieldWriterExecutor _shared_field_writer; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_, - uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_, + OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, + SharedFieldWriterExecutor shared_field_writer_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE); void update(const ThreadingServiceConfig& cfg); uint32_t indexingThreads() const { return _indexingThreads; } + uint32_t master_task_limit() const { return _master_task_limit; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index 1d379f439fa..46527362091 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -37,6 +37,10 @@ public: _service.sync_all_executors(); } + void blocking_master_execute(vespalib::Executor::Task::UP task) override { + _service.blocking_master_execute(std::move(task)); + } + searchcorespi::index::IThreadService &master() override { return _master; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index f30aec94d53..0660f3ab495 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -65,6 +65,12 @@ struct IThreadingService virtual void sync_all_executors() = 0; + /** + * Block the calling thread until the master thread has capacity to handle more tasks, + * and then execute the given task in the master thread. + */ + virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0; + virtual IThreadService &master() = 0; virtual IThreadService &index() = 0; virtual IThreadService &summary() = 0; |