diff options
author | Arne H Juul <arnej@yahooinc.com> | 2022-02-17 18:42:02 +0000 |
---|---|---|
committer | Arne H Juul <arnej@yahooinc.com> | 2022-02-17 18:42:02 +0000 |
commit | 419ca82c28e7ea97ae7f7f1265eca158fae0844c (patch) | |
tree | 55ee9be894ad4f862ffeb12f733048451ff8c063 /searchcore | |
parent | c1e701cd70e2531302a6e72d0900772f4296ab2c (diff) | |
parent | 70e50ea1b7d974ffb2e1db805e8e273eeffd6d0e (diff) |
Merge branch 'master' into arnej/rename-summaryfeatures-back-to-original
Diffstat (limited to 'searchcore')
100 files changed, 801 insertions, 414 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 214c57557bc..483cc3f2792 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -39,6 +39,7 @@ #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/config/subscription/sourcespec.h> #include <vespa/log/log.h> LOG_SETUP("persistenceconformance_test"); @@ -178,6 +179,12 @@ private: MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; + static std::shared_ptr<ProtonConfig> make_proton_config() { + ProtonConfigBuilder proton_config; + proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY; + return std::make_shared<ProtonConfig>(proton_config); + } + public: DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort); ~DocumentDBFactory() override; @@ -193,10 +200,10 @@ public: fileCfg.saveConfig(*snapshot, 1); } config::DirSpec spec(inputCfg + "/config-1"); - TuneFileDocumentDB::SP tuneFileDocDB(new TuneFileDocumentDB()); + auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>(); DocumentDBConfigHelper mgr(spec, docType.getName()); auto b = std::make_shared<BootstrapConfig>(1, factory.getTypeCfg(), factory.getTypeRepo(), - std::make_shared<ProtonConfig>(), + make_proton_config(), std::make_shared<FiledistributorrpcConfig>(), std::make_shared<BucketspacesConfig>(), tuneFileDocDB, HwInfo()); diff --git a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp index a52c9ec2fb6..5ec1794c1b0 100644 --- a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp +++ b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp @@ -4,7 +4,6 @@ #include <vespa/config-attributes.h> #include <vespa/config-indexschema.h> #include <vespa/config-rank-profiles.h> -#include <vespa/config/config.h> #include <vespa/config/helper/legacy.h> #include <vespa/config/common/configcontext.h> #include <vespa/config/common/exceptions.h> @@ -21,6 +20,7 @@ #include <vespa/searchlib/features/setup.h> #include <vespa/searchlib/fef/fef.h> #include <vespa/searchlib/fef/test/plugin/setup.h> +#include <vespa/config/subscription/configsubscriber.hpp> #include <vespa/fastos/app.h> #include <optional> diff --git a/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp b/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp index 9819a1d50af..9f6254f9baa 100644 --- a/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp +++ b/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp @@ -1,13 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/config/config.h> #include <vespa/config/print/fileconfigwriter.h> #include <vespa/document/config/config-documenttypes.h> #include <vespa/document/document.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/documentapi/documentapi.h> #include <vespa/messagebus/destinationsession.h> -#include <vespa/messagebus/protocolset.h> #include <vespa/messagebus/rpcmessagebus.h> #include <vespa/messagebus/network/rpcnetworkparams.h> #include <vespa/vespalib/io/fileutil.h> diff --git a/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp b/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp index 6e8223399e6..c5c50ff596d 100644 --- a/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp +++ b/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp @@ -12,13 +12,13 @@ #include <openssl/evp.h> #include <cassert> #include <getopt.h> +#include <vector> #include <vespa/log/log.h> LOG_SETUP("vespa-gen-testdocs"); typedef vespalib::hash_set<vespalib::string> StringSet; typedef std::vector<vespalib::string> StringArray; -typedef std::shared_ptr<StringArray> StringArraySP; using namespace vespalib::alloc; using vespalib::string; @@ -38,10 +38,7 @@ void usageHeader() { using std::cerr; - cerr << - "vespa-gen-testdocs version 0.0\n" - "\n" - "USAGE:\n"; + cerr << "vespa-gen-testdocs version 0.0\n\nUSAGE:\n"; } string @@ -71,8 +68,7 @@ splitArg(const string &arg) } void -shafile(const string &baseDir, - const string &file) +shafile(const string &baseDir, const string &file) { unsigned char digest[EVP_MAX_MD_SIZE]; unsigned int digest_len = 0; @@ -98,7 +94,6 @@ shafile(const string &baseDir, EVP_DigestUpdate(md_ctx.get(), buf.get(), thistime); remainder -= thistime; } - f.Close(); EVP_DigestFinal_ex(md_ctx.get(), &digest[0], &digest_len); assert(digest_len > 0u && digest_len <= EVP_MAX_MD_SIZE); for (unsigned int i = 0; i < digest_len; ++i) { @@ -106,10 +101,7 @@ shafile(const string &baseDir, os.fill('0'); os << std::hex << static_cast<unsigned int>(digest[i]); } - LOG(info, - "SHA256(%s)= %s", - file.c_str(), - os.str().c_str()); + LOG(info, "SHA256(%s)= %s", file.c_str(), os.str().c_str()); } class StringGenerator @@ -119,14 +111,9 @@ class StringGenerator public: StringGenerator(vespalib::Rand48 &rnd); - void - rand_string(string &res, uint32_t minLen, uint32_t maxLen); + void rand_string(string &res, uint32_t minLen, uint32_t maxLen); - void - rand_unique_array(StringArray &res, - uint32_t minLen, - uint32_t maxLen, - uint32_t size); + void rand_unique_array(StringArray &res, uint32_t minLen, uint32_t maxLen, uint32_t size); }; @@ -590,7 +577,8 @@ DocumentGenerator::generate(uint32_t docMin, uint32_t docIdLimit, } } f.Flush(); - f.Close(); + bool close_ok = f.Close(); + assert(close_ok); LOG(info, "Calculating sha256 for %s", feedFileName.c_str()); shafile(baseDir, feedFileName); } diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp index c1dbe9b2bd2..beb1e0bd6bc 100644 --- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp +++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp @@ -465,7 +465,6 @@ App::usage() "USAGE:\n"; std::cerr << "vespa-redistribute-bm\n" - "[--async-apply-bucket-diff]\n" "[--bucket-db-stripe-bits bits]\n" "[--client-threads threads]\n" "[--distributor-merge-busy-wait distributor-merge-busy-wait]\n" @@ -502,7 +501,6 @@ App::get_options() const char *opt_argument = nullptr; int long_opt_index = 0; static struct option long_opts[] = { - { "async-apply-bucket-diff", 0, nullptr, 0 }, { "bucket-db-stripe-bits", 1, nullptr, 0 }, { "client-threads", 1, nullptr, 0 }, { "distributor-merge-busy-wait", 1, nullptr, 0 }, @@ -533,7 +531,6 @@ App::get_options() { nullptr, 0, nullptr, 0 } }; enum longopts_enum { - LONGOPT_ASYNC_APPLY_BUCKET_DIFF, LONGOPT_BUCKET_DB_STRIPE_BITS, LONGOPT_CLIENT_THREADS, LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT, @@ -568,9 +565,6 @@ App::get_options() switch (c) { case 0: switch(long_opt_index) { - case LONGOPT_ASYNC_APPLY_BUCKET_DIFF: - _bm_params.set_async_apply_bucket_diff(true); - break; case LONGOPT_BUCKET_DB_STRIPE_BITS: _bm_params.set_bucket_db_stripe_bits(atoi(opt_argument)); break; diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 1851455e321..b9e3549053a 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -35,6 +35,7 @@ #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/config/subscription/sourcespec.h> using namespace cloud::config::filedistribution; using namespace document; diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index 77f7cf4d8ed..7ba3e0b8240 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -6,7 +6,6 @@ #include <vespa/document/datatype/documenttype.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_bucket_space.h> -#include <vespa/fastos/file.h> #include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/searchcore/proton/attribute/flushableattribute.h> #include <vespa/searchcore/proton/common/statusreport.h> @@ -35,6 +34,7 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/config/subscription/sourcespec.h> #include <iostream> using namespace cloud::config::filedistribution; diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index a59b3e9bc6f..fc8bd474813 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -14,11 +14,11 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; struct Fixture { ProtonConfig cfg; - Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500) + Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500) : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit)) { } - ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) { + ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) { ProtonConfigBuilder builder; builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = task_limit; @@ -56,6 +56,15 @@ TEST_F("require that task limits are set", Fixture) auto tcfg = f.make(24); EXPECT_EQUAL(2000u, tcfg.master_task_limit()); EXPECT_EQUAL(500u, tcfg.defaultTaskLimit()); + EXPECT_TRUE(tcfg.is_task_limit_hard()); +} + +TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700)) +{ + auto tcfg = f.make(24); + EXPECT_EQUAL(3000u, tcfg.master_task_limit()); + EXPECT_EQUAL(700u, tcfg.defaultTaskLimit()); + EXPECT_FALSE(tcfg.is_task_limit_hard()); } namespace { diff --git a/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp b/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp index 6eab2c5bf3d..c814bdf3f37 100644 --- a/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp +++ b/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. // Unit tests for diskindexcleaner. -#include <vespa/searchcorespi/index/activediskindexes.h> +#include <vespa/searchcorespi/index/disk_indexes.h> #include <vespa/searchcorespi/index/diskindexcleaner.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/fastos/file.h> @@ -90,8 +90,8 @@ void createIndexes() { void Test::requireThatAllIndexesOlderThanLastFusionIsRemoved() { createIndexes(); - ActiveDiskIndexes active_indexes; - DiskIndexCleaner::clean(index_dir, active_indexes); + DiskIndexes disk_indexes; + DiskIndexCleaner::clean(index_dir, disk_indexes); vector<string> indexes = readIndexes(); EXPECT_EQUAL(3u, indexes.size()); EXPECT_TRUE(contains(indexes, "index.fusion.2")); @@ -101,17 +101,17 @@ void Test::requireThatAllIndexesOlderThanLastFusionIsRemoved() { void Test::requireThatIndexesInUseAreNotRemoved() { createIndexes(); - ActiveDiskIndexes active_indexes; - active_indexes.setActive(index_dir + "/index.fusion.1"); - active_indexes.setActive(index_dir + "/index.flush.2"); - DiskIndexCleaner::clean(index_dir, active_indexes); + DiskIndexes disk_indexes; + disk_indexes.setActive(index_dir + "/index.fusion.1", 0); + disk_indexes.setActive(index_dir + "/index.flush.2", 0); + DiskIndexCleaner::clean(index_dir, disk_indexes); vector<string> indexes = readIndexes(); EXPECT_TRUE(contains(indexes, "index.fusion.1")); EXPECT_TRUE(contains(indexes, "index.flush.2")); - active_indexes.notActive(index_dir + "/index.fusion.1"); - active_indexes.notActive(index_dir + "/index.flush.2"); - DiskIndexCleaner::clean(index_dir, active_indexes); + disk_indexes.notActive(index_dir + "/index.fusion.1"); + disk_indexes.notActive(index_dir + "/index.flush.2"); + DiskIndexCleaner::clean(index_dir, disk_indexes); indexes = readIndexes(); EXPECT_TRUE(!contains(indexes, "index.fusion.1")); EXPECT_TRUE(!contains(indexes, "index.flush.2")); @@ -120,8 +120,8 @@ void Test::requireThatIndexesInUseAreNotRemoved() { void Test::requireThatInvalidFlushIndexesAreRemoved() { createIndexes(); FastOS_File((index_dir + "/index.flush.4/serial.dat").c_str()).Delete(); - ActiveDiskIndexes active_indexes; - DiskIndexCleaner::clean(index_dir, active_indexes); + DiskIndexes disk_indexes; + DiskIndexCleaner::clean(index_dir, disk_indexes); vector<string> indexes = readIndexes(); EXPECT_EQUAL(2u, indexes.size()); EXPECT_TRUE(contains(indexes, "index.fusion.2")); @@ -131,8 +131,8 @@ void Test::requireThatInvalidFlushIndexesAreRemoved() { void Test::requireThatInvalidFusionIndexesAreRemoved() { createIndexes(); FastOS_File((index_dir + "/index.fusion.2/serial.dat").c_str()).Delete(); - ActiveDiskIndexes active_indexes; - DiskIndexCleaner::clean(index_dir, active_indexes); + DiskIndexes disk_indexes; + DiskIndexCleaner::clean(index_dir, disk_indexes); vector<string> indexes = readIndexes(); EXPECT_EQUAL(4u, indexes.size()); EXPECT_TRUE(contains(indexes, "index.fusion.1")); @@ -144,8 +144,8 @@ void Test::requireThatInvalidFusionIndexesAreRemoved() { void Test::requireThatRemoveDontTouchNewIndexes() { createIndexes(); FastOS_File((index_dir + "/index.flush.4/serial.dat").c_str()).Delete(); - ActiveDiskIndexes active_indexes; - DiskIndexCleaner::removeOldIndexes(index_dir, active_indexes); + DiskIndexes disk_indexes; + DiskIndexCleaner::removeOldIndexes(index_dir, disk_indexes); vector<string> indexes = readIndexes(); EXPECT_EQUAL(3u, indexes.size()); EXPECT_TRUE(contains(indexes, "index.fusion.2")); diff --git a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp index 313f3e9a270..c0d94ba6376 100644 --- a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp @@ -57,9 +57,10 @@ public: ~ResourceUsageTrackerTest(); - void notify(double disk_usage, double memory_usage) + void notify(double disk_usage, double memory_usage, double transient_disk_usage = 0.0, double transient_memory_usage = 0.0) { - _notifier.notify(DiskMemUsageState({ 0.8, disk_usage }, { 0.8, memory_usage })); + _notifier.notify(DiskMemUsageState({ 0.8, disk_usage }, { 0.8, memory_usage }, + transient_disk_usage, transient_memory_usage)); } ResourceUsage get_usage() { return _listener->get_usage(); } @@ -77,6 +78,15 @@ TEST_F(ResourceUsageTrackerTest, resource_usage_is_forwarded_to_listener) EXPECT_EQ(ResourceUsage(0.75, 0.25), get_usage()); } +TEST_F(ResourceUsageTrackerTest, transient_resource_usage_is_subtracted_from_absolute_usage) +{ + auto register_guard = _tracker->set_listener(*_listener); + notify(0.8, 0.5, 0.4, 0.2); + EXPECT_EQ(ResourceUsage(0.4, 0.3), get_usage()); + notify(0.8, 0.5, 0.9, 0.6); + EXPECT_EQ(ResourceUsage(0.0, 0.0), get_usage()); +} + TEST_F(ResourceUsageTrackerTest, forwarding_depends_on_register_guard) { auto register_guard = _tracker->set_listener(*_listener); diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index 6d3eaa30263..3c9e9fc3a64 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -69,7 +69,7 @@ struct ConfigTestFixture { BucketspacesConfigBuilder bucketspacesBuilder; map<std::string, DoctypeFixture::UP> dbConfig; ConfigSet set; - IConfigContext::SP context; + std::shared_ptr<IConfigContext> context; int idcounter; ConfigTestFixture(const std::string & id) diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp index aa819d08b58..85f8e8171a8 100644 --- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp +++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp @@ -7,12 +7,12 @@ #include <vespa/config-rank-profiles.h> #include <vespa/config-summary.h> #include <vespa/config-summarymap.h> +#include <vespa/config-bucketspaces.h> #include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/fileacquirer/config-filedistributorrpc.h> #include <vespa/searchcore/proton/common/alloc_config.h> #include <vespa/searchcore/proton/server/bootstrapconfig.h> -#include <vespa/searchcore/proton/server/bootstrapconfigmanager.h> #include <vespa/searchcore/proton/server/documentdbconfigmanager.h> #include <vespa/searchcore/proton/server/document_db_config_owner.h> #include <vespa/searchcore/proton/server/proton_config_snapshot.h> @@ -21,14 +21,12 @@ #include <vespa/searchcore/proton/server/i_proton_disk_layout.h> #include <vespa/searchcore/proton/server/threading_service_config.h> #include <vespa/searchsummary/config/config-juniperrc.h> -#include <vespa/searchcore/config/config-ranking-constants.h> -#include <vespa/searchcore/config/config-onnx-models.h> -#include <vespa/vespalib/gtest/gtest.h> #include <vespa/searchcommon/common/schemaconfigurer.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/test/insertion_operators.h> -#include <vespa/config-bucketspaces.h> +#include <vespa/config/subscription/configuri.h> +#include <vespa/vespalib/gtest/gtest.h> using namespace config; using namespace proton; diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp index ce85517ee09..db32c1e77c4 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp @@ -121,18 +121,22 @@ TEST_F(DiskMemUsageFilterTest, both_disk_limit_and_memory_limit_can_be_reached) "capacity: 100, used: 90, diskUsed: 0.9, diskLimit: 0.8}}"); } -TEST_F(DiskMemUsageFilterTest, transient_disk_usage_is_tracked_in_usage_state_and_metrics) +TEST_F(DiskMemUsageFilterTest, transient_and_non_transient_disk_usage_tracked_in_usage_state_and_metrics) { - _filter.set_transient_resource_usage({40, 0}); - EXPECT_EQ(0.4, _filter.usageState().transient_disk_usage()); - EXPECT_EQ(0.4, _filter.get_metrics().get_transient_disk_usage()); + _filter.set_transient_resource_usage({15, 0}); + EXPECT_DOUBLE_EQ(0.15, _filter.usageState().transient_disk_usage()); + EXPECT_DOUBLE_EQ(0.15, _filter.get_metrics().transient_disk_usage()); + EXPECT_DOUBLE_EQ(0.05, _filter.usageState().non_transient_disk_usage()); + EXPECT_DOUBLE_EQ(0.05, _filter.get_metrics().non_transient_disk_usage()); } -TEST_F(DiskMemUsageFilterTest, transient_memory_usage_is_tracked_in_usage_state_and_metrics) +TEST_F(DiskMemUsageFilterTest, transient_and_non_transient_memory_usage_tracked_in_usage_state_and_metrics) { - _filter.set_transient_resource_usage({0, 200}); - EXPECT_EQ(0.2, _filter.usageState().transient_memory_usage()); - EXPECT_EQ(0.2, _filter.get_metrics().get_transient_memory_usage()); + _filter.set_transient_resource_usage({0, 100}); + EXPECT_DOUBLE_EQ(0.1, _filter.usageState().transient_memory_usage()); + EXPECT_DOUBLE_EQ(0.1, _filter.get_metrics().transient_memory_usage()); + EXPECT_DOUBLE_EQ(0.2, _filter.usageState().non_transient_memory_usage()); + EXPECT_DOUBLE_EQ(0.2, _filter.get_metrics().non_transient_memory_usage()); } GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp index 60240e06fc4..8ba02875dc0 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp @@ -10,30 +10,46 @@ using proton::DiskMemUsageState; using proton::ResourceUsageState; bool -expect_metrics(double disk_usage, double disk_utilization, double memory_usage, double memory_utilization, const DiskMemUsageMetrics &dm_metrics) +expect_metrics(double disk_usage, double disk_utilization, double transient_disk, double non_transient_disk, + double memory_usage, double memory_utilization, double transient_memory, double non_transient_memory, + const DiskMemUsageMetrics &dm_metrics) { bool result = true; - EXPECT_DOUBLE_EQ(disk_usage, dm_metrics.get_disk_usage()) << (result = false, ""); - EXPECT_DOUBLE_EQ(disk_utilization, dm_metrics.get_disk_utilization()) << (result = false, ""); - EXPECT_DOUBLE_EQ(memory_usage, dm_metrics.get_memory_usage()) << (result = false, ""); - EXPECT_DOUBLE_EQ(memory_utilization, dm_metrics.get_memory_utilization()) << (result = false, ""); + EXPECT_DOUBLE_EQ(disk_usage, dm_metrics.total_disk_usage()) << (result = false, ""); + EXPECT_DOUBLE_EQ(disk_utilization, dm_metrics.total_disk_utilization()) << (result = false, ""); + EXPECT_DOUBLE_EQ(transient_disk, dm_metrics.transient_disk_usage()) << (result = false, ""); + EXPECT_DOUBLE_EQ(non_transient_disk, dm_metrics.non_transient_disk_usage()) << (result = false, ""); + EXPECT_DOUBLE_EQ(memory_usage, dm_metrics.total_memory_usage()) << (result = false, ""); + EXPECT_DOUBLE_EQ(memory_utilization, dm_metrics.total_memory_utilization()) << (result = false, ""); + EXPECT_DOUBLE_EQ(transient_memory, dm_metrics.transient_memory_usage()) << (result = false, ""); + EXPECT_DOUBLE_EQ(non_transient_memory, dm_metrics.non_transient_memory_usage()) << (result = false, ""); return result; } TEST(DiskMemUsageMetricsTest, default_value_is_zero) { DiskMemUsageMetrics dm_metrics; - EXPECT_TRUE(expect_metrics(0.0, 0.0, 0.0, 0.0, dm_metrics)); + EXPECT_TRUE(expect_metrics(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, dm_metrics)); } TEST(DiskMemUsageMetricsTest, merging_uses_max) { - DiskMemUsageMetrics dm_metrics({ResourceUsageState(0.5, 0.4), ResourceUsageState(0.5, 0.3)}); - EXPECT_TRUE(expect_metrics(0.4, 0.8, 0.3, 0.6, dm_metrics)); - dm_metrics.merge({ResourceUsageState(0.4, 0.4), ResourceUsageState(0.5, 0.4)}); - EXPECT_TRUE(expect_metrics(0.4, 1.0, 0.4, 0.8, dm_metrics)); - dm_metrics.merge({ResourceUsageState(0.5, 0.4), ResourceUsageState(0.5, 0.3)}); - EXPECT_TRUE(expect_metrics(0.4, 1.0, 0.4, 0.8, dm_metrics)); + DiskMemUsageMetrics dm_metrics({ResourceUsageState(0.5, 0.4), + ResourceUsageState(0.5, 0.3), 0.1, 0.05}); + EXPECT_TRUE(expect_metrics(0.4, 0.8, 0.1, 0.3, + 0.3, 0.6, 0.05, 0.25, dm_metrics)); + dm_metrics.merge({ResourceUsageState(0.4, 0.4), + ResourceUsageState(0.3, 0.3), 0.1, 0.05}); + EXPECT_TRUE(expect_metrics(0.4, 1.0, 0.1, 0.3, + 0.3, 1.0, 0.05, 0.25, dm_metrics)); + dm_metrics.merge({ResourceUsageState(0.5, 0.45), + ResourceUsageState(0.5, 0.35), 0.1, 0.05}); + EXPECT_TRUE(expect_metrics(0.45, 1.0, 0.1, 0.35, + 0.35, 1.0, 0.05, 0.3, dm_metrics)); + dm_metrics.merge({ResourceUsageState(0.5, 0.4), + ResourceUsageState(0.5, 0.3), 0.15, 0.1}); + EXPECT_TRUE(expect_metrics(0.45, 1.0, 0.15, 0.35, + 0.35, 1.0, 0.10, 0.3, dm_metrics)); } GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp index bb0c4eeb282..e01d90f5eed 100644 --- a/searchcore/src/tests/proton/server/feedstates_test.cpp +++ b/searchcore/src/tests/proton/server/feedstates_test.cpp @@ -8,6 +8,7 @@ #include <vespa/searchcore/proton/server/feedstates.h> #include <vespa/searchcore/proton/server/ireplayconfig.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> +#include <vespa/searchcore/proton/server/replay_throttling_policy.h> #include <vespa/searchcore/proton/feedoperation/removeoperation.h> #include <vespa/searchcore/proton/test/dummy_feed_view.h> #include <vespa/searchlib/common/serialnum.h> @@ -71,6 +72,7 @@ struct Fixture MemoryConfigStore config_store; bucketdb::BucketDBOwner _bucketDB; bucketdb::BucketDBHandler _bucketDBHandler; + ReplayThrottlingPolicy _replay_throttling_policy; MyIncSerialNum _inc_serial_num; ReplayTransactionLogState state; @@ -86,8 +88,9 @@ Fixture::Fixture() config_store(), _bucketDB(), _bucketDBHandler(_bucketDB), + _replay_throttling_policy({}), _inc_serial_num(9u), - state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _inc_serial_num) + state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _replay_throttling_policy, _inc_serial_num) { } Fixture::~Fixture() = default; diff --git a/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp b/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp index 7ff59a9f41a..d544f1cb1ab 100644 --- a/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp +++ b/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp @@ -11,6 +11,7 @@ ProtonConfig::Flush::Memory getConfig(int64_t maxMemory, int64_t eachMaxMemory, int64_t maxTlsSize, double conservativeMemoryLimitFactor = 0.5, double conservativeDiskLimitFactor = 0.6, + double high_watermark_factor = 0.9, double lowWatermarkFactor = 0.8) { ProtonConfig::Flush::Memory result; @@ -19,6 +20,7 @@ getConfig(int64_t maxMemory, int64_t eachMaxMemory, int64_t maxTlsSize, result.maxtlssize = maxTlsSize; result.conservative.memorylimitfactor = conservativeMemoryLimitFactor; result.conservative.disklimitfactor = conservativeDiskLimitFactor; + result.conservative.highwatermarkfactor = high_watermark_factor; result.conservative.lowwatermarkfactor = lowWatermarkFactor; return result; } @@ -32,14 +34,16 @@ getDefaultConfig() ResourceUsageState aboveLimit() { - return ResourceUsageState(0.7, 0.8); + // The high watermark limit is 0.63 (0.7 * 0.9 (factor)). + return ResourceUsageState(0.7, 0.64); } ResourceUsageState belowLimit() { - // This is still over default low watermark of 0.56 (0.7 * 0.8) - return ResourceUsageState(0.7, 0.6); + // The high watermark limit is 0.63 (0.7 * 0.9 (factor)). + // This is still over the low watermark limit of 0.56 (0.7 * 0.8 (factor)). + return ResourceUsageState(0.7, 0.62); } const HwInfo::Memory defaultMemory(8_Gi); @@ -175,6 +179,24 @@ TEST_F("require that last config if remembered when setting new disk/memory usag TEST_DO(f.assertStrategyConfig(6, 3, 18)); } +TEST_F("Use conservative settings when above high watermark for disk usage", Fixture) +{ + // The high watermark limit is 0.63 (0.7 * 0.9 (factor)). + f.notifyDiskMemUsage(ResourceUsageState(0.7, 0.62), belowLimit()); + TEST_DO(f.assertStrategyConfig(4, 1, 20)); + f.notifyDiskMemUsage(ResourceUsageState(0.7, 0.64), belowLimit()); + TEST_DO(f.assertStrategyConfig(4, 1, 12)); +} + +TEST_F("Use conservative settings when above high watermark for memory usage", Fixture) +{ + // The high watermark limit is 0.54 (0.6 * 0.9 (factor)). + f.notifyDiskMemUsage(belowLimit(), ResourceUsageState(0.6, 0.53)); + TEST_DO(f.assertStrategyConfig(4, 1, 20)); + f.notifyDiskMemUsage(belowLimit(), ResourceUsageState(0.6, 0.55)); + TEST_DO(f.assertStrategyConfig(2, 0, 20)); +} + TEST_F("require that we must go below low watermark for disk usage before using normal tls size value again", Fixture) { f.notifyDiskMemUsage(ResourceUsageState(0.7, 0.8), belowLimit()); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp index 4a3466f1a51..3ff10b19164 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp @@ -6,8 +6,7 @@ namespace search::bmcluster { BmClusterParams::BmClusterParams() - : _async_apply_bucket_diff(), - _bucket_db_stripe_bits(4), + : _bucket_db_stripe_bits(4), _disable_queue_limits_for_chained_merges(false), // Same default as in stor-server.def _distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def _distributor_stripes(0), diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h index 36b4c22f6a8..d365a28b0b6 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h @@ -13,7 +13,6 @@ namespace search::bmcluster { */ class BmClusterParams { - std::optional<bool> _async_apply_bucket_diff; uint32_t _bucket_db_stripe_bits; bool _disable_queue_limits_for_chained_merges; uint32_t _distributor_merge_busy_wait; @@ -45,7 +44,6 @@ class BmClusterParams public: BmClusterParams(); ~BmClusterParams(); - const std::optional<bool>& get_async_apply_bucket_diff() const noexcept { return _async_apply_bucket_diff; } uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; } bool get_disable_queue_limits_for_chained_merges() const noexcept { return _disable_queue_limits_for_chained_merges; } uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; } @@ -75,7 +73,6 @@ public: bool needs_distributor() const { return _enable_distributor || _use_document_api; } bool needs_message_bus() const { return _use_message_bus || _use_document_api; } bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } - void set_async_apply_bucket_diff(bool value) { _async_apply_bucket_diff = value; } void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; } void set_disable_queue_limits_for_chained_merges(bool value) { _disable_queue_limits_for_chained_merges = value; } void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; } diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index db2060bacf7..3587e8008f2 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -382,9 +382,6 @@ struct ServiceLayerConfigSet : public StorageConfigSet stor_bucket_init(), stor_visitor() { - if (params.get_async_apply_bucket_diff().has_value()) { - stor_filestor.asyncApplyBucketDiff = params.get_async_apply_bucket_diff().value(); - } stor_filestor.numResponseThreads = params.get_response_threads(); stor_filestor.numNetworkThreads = params.get_rpc_network_threads(); stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule(); diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 34530afe9f9..808535924f1 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -81,6 +81,10 @@ flush.memory.conservative.memorylimitfactor double default=0.5 ## In this case this factor is multiplied with 'maxtlssize' to calculate a conservative value to use instead. flush.memory.conservative.disklimitfactor double default=0.5 +## The factor used to multiply with the resource limits for disk / memory to find the high +## watermark indicating when to from normal into conservative mode for the flush strategy. +flush.memory.conservative.highwatermarkfactor double default=0.95 + ## The factor used to multiply with the resource limits for disk / memory to find the low ## watermark indicating when to go back from conservative to normal mode for the flush strategy. flush.memory.conservative.lowwatermarkfactor double default=0.9 @@ -442,7 +446,7 @@ initialize.threads int default = 0 ## Portion of max address space used in components in attribute vectors ## before put and update operations in feed is blocked. -writefilter.attribute.address_space_limit double default = 0.9 +writefilter.attribute.address_space_limit double default = 0.92 ## Portion of physical memory that can be resident memory in anonymous mapping ## by the proton process before put and update portion of feed is blocked. @@ -543,3 +547,11 @@ tensor_implementation enum {TENSOR_ENGINE, FAST_VALUE} default = FAST_VALUE ## Whether to report issues back to the container via protobuf field forward_issues bool default = true + +## Chooses the throttling policy used to control the window size +## of the SharedOperationThrottler component used by the transaction log replay feed state. +replay_throttling_policy.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED +## Only used if replay_throttling_policy.type == DYNAMIC: +replay_throttling_policy.min_window_size int default=100 +replay_throttling_policy.max_window_size int default=10000 +replay_throttling_policy.window_size_increment int default=20 diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp index 66be0737fe9..713582bf1ce 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp @@ -193,7 +193,7 @@ AttributeInitializer::loadAttribute(const AttributeVectorSP &attr, assert(attr->hasLoadData()); vespalib::Timer timer; EventLogger::loadAttributeStart(_documentSubDbName, attr->getName()); - if (!attr->load(&_executor)) { + if (!attr->load(&_shared_executor)) { LOG(warning, "Could not load attribute vector '%s' from disk. Returning empty attribute vector", attr->getBaseFileName().c_str()); return false; @@ -235,13 +235,13 @@ AttributeInitializer::AttributeInitializer(const std::shared_ptr<AttributeDirect const AttributeSpec &spec, uint64_t currentSerialNum, const IAttributeFactory &factory, - vespalib::Executor & executor) + vespalib::Executor& shared_executor) : _attrDir(attrDir), _documentSubDbName(documentSubDbName), _spec(spec), _currentSerialNum(currentSerialNum), _factory(factory), - _executor(executor), + _shared_executor(shared_executor), _header(), _header_ok(false) { diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h index 7b5d968353a..78a798c929e 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h @@ -30,7 +30,7 @@ private: const AttributeSpec _spec; const uint64_t _currentSerialNum; const IAttributeFactory &_factory; - vespalib::Executor &_executor; + vespalib::Executor &_shared_executor; std::unique_ptr<const search::attribute::AttributeHeader> _header; bool _header_ok; @@ -48,7 +48,7 @@ private: public: AttributeInitializer(const std::shared_ptr<AttributeDirectory> &attrDir, const vespalib::string &documentSubDbName, const AttributeSpec &spec, uint64_t currentSerialNum, const IAttributeFactory &factory, - vespalib::Executor & executor); + vespalib::Executor& shared_executor); ~AttributeInitializer(); AttributeInitializerResult init() const; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 52b367fd14b..f0a23f1038e 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -14,10 +14,10 @@ #include <vespa/searchlib/attribute/imported_attribute_vector.h> #include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/idestructorcallback.h> -#include <vespa/vespalib/util/threadexecutor.h> #include <future> #include <vespa/log/log.h> @@ -29,6 +29,7 @@ using namespace search; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; using search::tensor::PrepareResult; +using vespalib::CpuUsage; using vespalib::GateCallback; using vespalib::ISequencedTaskExecutor; @@ -631,7 +632,7 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI wc.consider_build_field_paths(doc); auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc, doc); auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone); - _shared_executor.execute(std::move(prepare_task)); + _shared_executor.execute(CpuUsage::wrap(std::move(prepare_task), CpuUsage::Category::WRITE)); _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); } else { if (allAttributes || wc.hasStructFieldAttribute()) { @@ -781,7 +782,7 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone); LOG(debug, "About to handle assign update as two phase put for docid %u in attribute vector '%s'", lid, attrp->getName().c_str()); - _shared_executor.execute(std::move(prepare_task)); + _shared_executor.execute(CpuUsage::wrap(std::move(prepare_task), CpuUsage::Category::WRITE)); _attributeFieldWriter.executeTask(itr->second.executor_id, std::move(complete_task)); } else { args[itr->second.executor_id.getId()]->_updates.emplace_back(attrp, &fupd); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h index f43aab0f385..a5907233c4b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h @@ -25,7 +25,7 @@ private: using FieldValue = document::FieldValue; const IAttributeManager::SP _mgr; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; - vespalib::ThreadExecutor& _shared_executor; + vespalib::Executor& _shared_executor; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; public: /** diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp index eee6264b9f4..ef7e422c722 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp @@ -240,7 +240,7 @@ AttributeManager::AttributeManager(const vespalib::string &baseDir, const TuneFileAttributes &tuneFileAttributes, const FileHeaderContext &fileHeaderContext, vespalib::ISequencedTaskExecutor &attributeFieldWriter, - vespalib::ThreadExecutor& shared_executor, + vespalib::Executor& shared_executor, const HwInfo &hwInfo) : proton::IAttributeManager(), _attributes(), @@ -264,7 +264,7 @@ AttributeManager::AttributeManager(const vespalib::string &baseDir, const search::TuneFileAttributes &tuneFileAttributes, const search::common::FileHeaderContext &fileHeaderContext, vespalib::ISequencedTaskExecutor &attributeFieldWriter, - vespalib::ThreadExecutor& shared_executor, + vespalib::Executor& shared_executor, const IAttributeFactory::SP &factory, const HwInfo &hwInfo) : proton::IAttributeManager(), @@ -558,12 +558,6 @@ AttributeManager::getAttributeFieldWriter() const return _attributeFieldWriter; } -vespalib::ThreadExecutor& -AttributeManager::get_shared_executor() const -{ - return _shared_executor; -} - AttributeVector * AttributeManager::getWritableAttribute(const vespalib::string &name) const { diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h index 08e2d511d70..beea59b5350 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h @@ -31,14 +31,14 @@ class ShrinkLidSpaceFlushTarget; class AttributeManager : public proton::IAttributeManager { private: - typedef search::attribute::Config Config; - typedef search::SerialNum SerialNum; - typedef AttributeCollectionSpec Spec; + using AttributeReadGuard = search::attribute::AttributeReadGuard; + using AttributeVectorSP = std::shared_ptr<search::AttributeVector>; + using Config = search::attribute::Config; using FlushableAttributeSP = std::shared_ptr<FlushableAttribute>; - using ShrinkerSP = std::shared_ptr<ShrinkLidSpaceFlushTarget>; using IFlushTargetSP = std::shared_ptr<searchcorespi::IFlushTarget>; - using AttributeVectorSP = std::shared_ptr<search::AttributeVector>; - using AttributeReadGuard = search::attribute::AttributeReadGuard; + using SerialNum = search::SerialNum; + using ShrinkerSP = std::shared_ptr<ShrinkLidSpaceFlushTarget>; + using Spec = AttributeCollectionSpec; class AttributeWrap { @@ -67,8 +67,8 @@ private: const ShrinkerSP &getShrinker() const { return _shrinker; } }; - typedef vespalib::hash_map<vespalib::string, AttributeWrap> AttributeMap; - typedef vespalib::hash_map<vespalib::string, FlushableWrap> FlushableMap; + using AttributeMap = vespalib::hash_map<vespalib::string, AttributeWrap>; + using FlushableMap = vespalib::hash_map<vespalib::string, FlushableWrap>; AttributeMap _attributes; FlushableMap _flushables; @@ -80,7 +80,7 @@ private: IAttributeFactory::SP _factory; std::shared_ptr<search::attribute::Interlock> _interlock; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; - vespalib::ThreadExecutor& _shared_executor; + vespalib::Executor& _shared_executor; HwInfo _hwInfo; std::unique_ptr<ImportedAttributesRepo> _importedAttributes; @@ -100,14 +100,14 @@ private: void transferExtraAttributes(const AttributeManager &currMgr); public: - typedef std::shared_ptr<AttributeManager> SP; + using SP = std::shared_ptr<AttributeManager>; AttributeManager(const vespalib::string &baseDir, const vespalib::string &documentSubDbName, const search::TuneFileAttributes &tuneFileAttributes, const search::common::FileHeaderContext & fileHeaderContext, vespalib::ISequencedTaskExecutor &attributeFieldWriter, - vespalib::ThreadExecutor& shared_executor, + vespalib::Executor& shared_executor, const HwInfo &hwInfo); AttributeManager(const vespalib::string &baseDir, @@ -115,7 +115,7 @@ public: const search::TuneFileAttributes &tuneFileAttributes, const search::common::FileHeaderContext & fileHeaderContext, vespalib::ISequencedTaskExecutor &attributeFieldWriter, - vespalib::ThreadExecutor& shared_executor, + vespalib::Executor& shared_executor, const IAttributeFactory::SP &factory, const HwInfo &hwInfo); @@ -171,7 +171,7 @@ public: vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const override; - vespalib::ThreadExecutor& get_shared_executor() const override; + vespalib::Executor& get_shared_executor() const override { return _shared_executor; } search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp index 5f162281d96..d0caf92be17 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp @@ -167,12 +167,6 @@ FilterAttributeManager::getAttributeFieldWriter() const return _mgr->getAttributeFieldWriter(); } -vespalib::ThreadExecutor& -FilterAttributeManager::get_shared_executor() const -{ - return _mgr->get_shared_executor(); -} - search::AttributeVector * FilterAttributeManager::getWritableAttribute(const vespalib::string &name) const { diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h index 1512ab32d62..e291aca6922 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h @@ -20,15 +20,14 @@ public: typedef std::set<vespalib::string> AttributeSet; private: - AttributeSet _acceptedAttributes; - IAttributeManager::SP _mgr; + AttributeSet _acceptedAttributes; + IAttributeManager::SP _mgr; std::vector<search::AttributeVector *> _acceptedWritableAttributes; bool acceptAttribute(const vespalib::string &name) const; public: - FilterAttributeManager(const AttributeSet &acceptedAttributes, - IAttributeManager::SP mgr); + FilterAttributeManager(const AttributeSet &acceptedAttributes, IAttributeManager::SP mgr); ~FilterAttributeManager() override; // Implements search::IAttributeManager @@ -47,7 +46,7 @@ public: void pruneRemovedFields(search::SerialNum serialNum) override; const IAttributeFactory::SP &getFactory() const override; vespalib::ISequencedTaskExecutor & getAttributeFieldWriter() const override; - vespalib::ThreadExecutor& get_shared_executor() const override; + vespalib::Executor& get_shared_executor() const override { return _mgr->get_shared_executor(); } search::AttributeVector * getWritableAttribute(const vespalib::string &name) const override; const std::vector<search::AttributeVector *> & getWritableAttributes() const override; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h index b8968ba9d2e..d32052fe4fa 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h @@ -14,7 +14,7 @@ namespace search::attribute { class IAttributeFunctor; } namespace vespalib { class ISequencedTaskExecutor; - class ThreadExecutor; + class Executor; class IDestructorCallback; } @@ -76,7 +76,7 @@ struct IAttributeManager : public search::IAttributeManager virtual vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const = 0; - virtual vespalib::ThreadExecutor& get_shared_executor() const = 0; + virtual vespalib::Executor& get_shared_executor() const = 0; /* * Get pointer to named writable attribute. If attribute isn't diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index 421e602a9cf..03bad4c7eaf 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -18,6 +18,7 @@ vespa_add_library(searchcore_pcommon STATIC ipendinglidtracker.cpp operation_rate_tracker.cpp pendinglidtracker.cpp + replay_feedtoken_state.cpp select_utils.cpp selectcontext.cpp selectpruner.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index 675dce10be4..c74819577e9 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -33,6 +33,12 @@ State::setResult(ResultUP result, bool documentWasFound) { _result = std::move(result); } +bool +State::is_replay() const noexcept +{ + return false; +} + void State::fail() { diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index 8ccb4863878..53f551999d2 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -22,19 +22,33 @@ public: virtual void send(ResultUP result, bool documentWasFound) = 0; }; + +/* + * Interface class for feed token state. + */ +class IState : public vespalib::IDestructorCallback { +public: + virtual bool is_replay() const noexcept = 0; + virtual void fail() = 0; + virtual void setResult(ResultUP result, bool documentWasFound) = 0; + virtual const storage::spi::Result &getResult() = 0; +}; + + /** * This holds the result of the feed operation until it is either failed or acked. * Guarantees that the result is propagated back to the invoker via ITransport interface. */ -class State : public vespalib::IDestructorCallback { +class State : public IState { public: State(const State &) = delete; State & operator = (const State &) = delete; State(ITransport & transport); ~State() override; - void fail(); - void setResult(ResultUP result, bool documentWasFound); - const storage::spi::Result &getResult() { return *_result; } + bool is_replay() const noexcept override; + void fail() override; + void setResult(ResultUP result, bool documentWasFound) override; + const storage::spi::Result &getResult() override { return *_result; } protected: void ack(); private: @@ -71,7 +85,7 @@ make(std::shared_ptr<ITransport> transport) { } -using FeedToken = std::shared_ptr<feedtoken::State>; +using FeedToken = std::shared_ptr<feedtoken::IState>; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp b/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp index 2f0db6083c7..6d69c884c90 100644 --- a/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp @@ -1,15 +1,17 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "hw_info_sampler.h" -#include <vespa/config/config.h> #include <vespa/config/print/fileconfigwriter.h> +#include <vespa/config/subscription/configsubscriber.hpp> #include <vespa/fastos/file.h> #include <vespa/searchcore/config/config-hwinfo.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/time.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/alloc.h> #include <filesystem> #include <thread> + #include <vespa/log/log.h> LOG_SETUP(".proton.common.hw_info_sampler"); @@ -55,7 +57,8 @@ sampleCpuCores(const HwInfoSampler::Config &cfg) return std::thread::hardware_concurrency(); } -std::unique_ptr<HwinfoConfig> readConfig(const vespalib::string &path) { +std::unique_ptr<HwinfoConfig> +readConfig(const vespalib::string &path) { FileSpec spec(path + "/" + "hwinfo.cfg"); ConfigSubscriber s(spec); std::unique_ptr<ConfigHandle<HwinfoConfig>> handle = s.subscribe<HwinfoConfig>("hwinfo"); @@ -79,29 +82,31 @@ void writeConfig(const vespalib::string &path, double measureDiskWriteSpeed(const vespalib::string &path, size_t diskWriteLen) { - FastOS_File testFile; vespalib::string fileName = path + "/hwinfo-writespeed"; size_t bufferLen = 1_Mi; Alloc buffer(Alloc::allocMMap(bufferLen)); memset(buffer.get(), 0, buffer.size()); - testFile.EnableDirectIO(); - testFile.OpenWriteOnlyTruncate(fileName.c_str()); - sync(); - sleep(1); - sync(); - sleep(1); - Clock::time_point before = Clock::now(); - size_t residue = diskWriteLen; - while (residue > 0) { - size_t writeNow = std::min(residue, bufferLen); - testFile.WriteBuf(buffer.get(), writeNow); - residue -= writeNow; + double diskWriteSpeed; + { + FastOS_File testFile; + testFile.EnableDirectIO(); + testFile.OpenWriteOnlyTruncate(fileName.c_str()); + sync(); + sleep(1); + sync(); + sleep(1); + Clock::time_point before = Clock::now(); + size_t residue = diskWriteLen; + while (residue > 0) { + size_t writeNow = std::min(residue, bufferLen); + testFile.WriteBuf(buffer.get(), writeNow); + residue -= writeNow; + } + Clock::time_point after = Clock::now(); + double elapsed = vespalib::to_s(after - before); + diskWriteSpeed = diskWriteLen / elapsed / 1_Mi; } - Clock::time_point after = Clock::now(); - testFile.Close(); vespalib::unlink(fileName); - double elapsed = vespalib::to_s(after - before); - double diskWriteSpeed = diskWriteLen / elapsed / 1_Mi; return diskWriteSpeed; } diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp new file mode 100644 index 00000000000..a3a473c9548 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "replay_feedtoken_state.h" + +namespace proton::feedtoken { + +ReplayState::ReplayState(vespalib::SharedOperationThrottler::Token throttler_token) + : IState(), + _throttler_token(std::move(throttler_token)) +{ +} + +ReplayState::~ReplayState() = default; + +bool +ReplayState::is_replay() const noexcept +{ + return true; +} + +void +ReplayState::fail() +{ +} + +void +ReplayState::setResult(ResultUP, bool) +{ +} + +const storage::spi::Result& +ReplayState::getResult() +{ + abort(); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h new file mode 100644 index 00000000000..512f12a50af --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "feedtoken.h" +#include <vespa/vespalib/util/shared_operation_throttler.h> + +namespace proton::feedtoken { + +/* + * Feed token state used during replay. It contains a throttler token + * which allows the related shared operation throttler to track the completion + * of the feed operation. + */ +class ReplayState : public IState { + vespalib::SharedOperationThrottler::Token _throttler_token; +public: + ~ReplayState() override; + ReplayState(vespalib::SharedOperationThrottler::Token throttler_token); + bool is_replay() const noexcept override; + void fail() override; + void setResult(ResultUP result, bool documentWasFound) override; + const storage::spi::Result &getResult() override; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp index 28a91e1444d..a04f5bfa651 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp @@ -145,7 +145,7 @@ SummaryManager::createSummarySetup(const SummaryConfig & summaryCfg, const Summa juniperCfg, attributeMgr, _docStore, repo); } -SummaryManager::SummaryManager(vespalib::ThreadExecutor & executor, const LogDocumentStore::Config & storeConfig, +SummaryManager::SummaryManager(vespalib::Executor &shared_executor, const LogDocumentStore::Config & storeConfig, const search::GrowStrategy & growStrategy, const vespalib::string &baseDir, const DocTypeName &docTypeName, const TuneFileSummary &tuneFileSummary, const FileHeaderContext &fileHeaderContext, search::transactionlog::SyncProxy &tlSyncer, @@ -154,7 +154,7 @@ SummaryManager::SummaryManager(vespalib::ThreadExecutor & executor, const LogDoc _docTypeName(docTypeName), _docStore() { - _docStore = std::make_shared<LogDocumentStore>(executor, baseDir, storeConfig, growStrategy, tuneFileSummary, + _docStore = std::make_shared<LogDocumentStore>(shared_executor, baseDir, storeConfig, growStrategy, tuneFileSummary, fileHeaderContext, tlSyncer, std::move(bucketizer)); } diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h index b3cbd399262..ba55761d091 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h @@ -10,7 +10,6 @@ #include <vespa/searchlib/docstore/logdocumentstore.h> #include <vespa/searchlib/transactionlog/syncproxy.h> #include <vespa/document/fieldvalue/document.h> -#include <vespa/vespalib/util/threadexecutor.h> namespace search { class IBucketizer; } namespace search::common { class FileHeaderContext; } @@ -60,7 +59,7 @@ private: public: typedef std::shared_ptr<SummaryManager> SP; - SummaryManager(vespalib::ThreadExecutor & executor, + SummaryManager(vespalib::Executor &shared_executor, const search::LogDocumentStore::Config & summary, const search::GrowStrategy & growStrategy, const vespalib::string &baseDir, diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp index 21506c3014f..568fa740d6f 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp @@ -11,7 +11,7 @@ SummaryManagerInitializer(const search::GrowStrategy &grow, const vespalib::string & baseDir, const vespalib::string &subDbName, const DocTypeName &docTypeName, - vespalib::ThreadExecutor &summaryExecutor, + vespalib::Executor &shared_executor, const search::LogDocumentStore::Config & storeCfg, const search::TuneFileSummary &tuneFile, const search::common::FileHeaderContext &fileHeaderContext, @@ -23,7 +23,7 @@ SummaryManagerInitializer(const search::GrowStrategy &grow, _baseDir(baseDir), _subDbName(subDbName), _docTypeName(docTypeName), - _summaryExecutor(summaryExecutor), + _shared_executor(shared_executor), _storeCfg(storeCfg), _tuneFile(tuneFile), _fileHeaderContext(fileHeaderContext), @@ -41,7 +41,7 @@ SummaryManagerInitializer::run() vespalib::Timer timer; EventLogger::loadDocumentStoreStart(_subDbName); *_result = std::make_shared<SummaryManager> - (_summaryExecutor, _storeCfg, _grow, _baseDir, _docTypeName, + (_shared_executor, _storeCfg, _grow, _baseDir, _docTypeName, _tuneFile, _fileHeaderContext, _tlSyncer, _bucketizer); EventLogger::loadDocumentStoreComplete(_subDbName, timer.elapsed()); } diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h index 7075560ed56..318edc425a7 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h @@ -5,7 +5,6 @@ #include "summarymanager.h" #include <vespa/searchcore/proton/initializer/initializer_task.h> #include <vespa/searchcommon/common/growstrategy.h> -#include <vespa/vespalib/stllike/string.h> namespace proton { @@ -20,7 +19,7 @@ class SummaryManagerInitializer : public initializer::InitializerTask const vespalib::string _baseDir; const vespalib::string _subDbName; const DocTypeName _docTypeName; - vespalib::ThreadExecutor &_summaryExecutor; + vespalib::Executor &_shared_executor; const search::LogDocumentStore::Config _storeCfg; const search::TuneFileSummary _tuneFile; const search::common::FileHeaderContext &_fileHeaderContext; @@ -36,7 +35,7 @@ public: const vespalib::string & baseDir, const vespalib::string &subDbName, const DocTypeName &docTypeName, - vespalib::ThreadExecutor & summaryExecutor, + vespalib::Executor &shared_executor, const search::LogDocumentStore::Config & storeCfg, const search::TuneFileSummary &tuneFile, const search::common::FileHeaderContext & fileHeaderContext, diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 2b76faa8d7f..011d97d4609 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -4,20 +4,21 @@ #include "flush_all_strategy.h" #include "flushengine.h" #include "flushtask.h" -#include "tls_stats_map.h" #include "tls_stats_factory.h" +#include "tls_stats_map.h" #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchlib/common/flush_token.h> -#include <vespa/vespalib/util/jsonwriter.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/size_literals.h> #include <thread> #include <vespa/log/log.h> LOG_SETUP(".proton.flushengine.flushengine"); -typedef vespalib::Executor::Task Task; -using searchcorespi::IFlushTarget; +using Task = vespalib::Executor::Task; using searchcorespi::FlushStats; +using searchcorespi::IFlushTarget; +using vespalib::CpuUsage; using namespace std::chrono_literals; namespace proton { @@ -86,7 +87,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats _threadPool(128_Ki), _strategy(std::move(strategy)), _priorityStrategy(), - _executor(numThreads, 128_Ki, flush_engine_executor), + _executor(numThreads, 128_Ki, CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)), _lock(), _cond(), _handlers(), diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp index 58dc473b85e..8489a68af15 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/data/smart_buffer.h> #include <vespa/vespalib/data/slime/binary_format.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/log/log.h> @@ -34,12 +35,14 @@ public: }; VESPA_THREAD_STACK_TAG(match_engine_executor) +VESPA_THREAD_STACK_TAG(match_engine_thread_bundle) } // namespace anon namespace proton { using namespace vespalib::slime; +using vespalib::CpuUsage; MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t distributionKey, bool async) : _lock(), @@ -48,8 +51,10 @@ MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t di _closed(false), _forward_issues(true), _handlers(), - _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki, match_engine_executor), - _threadBundlePool(std::max(size_t(1), threadsPerSearch)), + _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki, + CpuUsage::wrap(match_engine_executor, CpuUsage::Category::READ)), + _threadBundlePool(std::max(size_t(1), threadsPerSearch), + CpuUsage::wrap(match_engine_thread_bundle, CpuUsage::Category::READ)), _nodeUp(false), _nodeMaintenance(false) { @@ -147,6 +152,9 @@ MatchEngine::performSearch(search::engine::SearchRequest::Source req) } } _threadBundlePool.release(std::move(threadBundle)); + if (searchRequest->expired()) { + vespalib::Issue::report("search request timed out; results may be incomplete"); + } } ret->request = req.release(); if (_forward_issues) { diff --git a/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp b/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp index ce04464a851..032991cbf6a 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp @@ -47,7 +47,8 @@ get_feature_set(const MatchToolsFactory &mtf, } else { matchTools->setup_dump(); } - auto retval = ExtractFeatures::get_feature_set(matchTools->search(), matchTools->rank_program(), docs, mtf.get_feature_rename_map()); + auto retval = ExtractFeatures::get_feature_set(matchTools->search(), matchTools->rank_program(), docs, + matchTools->getDoom(), mtf.get_feature_rename_map()); if (auto onSummaryTask = mtf.createOnSummaryTask()) { onSummaryTask->run(docs); } diff --git a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp index d74885bd0be..9f09255795a 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp +++ b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp @@ -2,6 +2,7 @@ #include "extract_features.h" #include "match_tools.h" +#include <vespa/vespalib/util/doom.h> #include <vespa/eval/eval/value_codec.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/runnable.h> @@ -10,6 +11,7 @@ #include <vespa/searchlib/fef/rank_program.h> #include <vespa/searchlib/queryeval/searchiterator.h> +using vespalib::Doom; using vespalib::Runnable; using vespalib::ThreadBundle; using search::FeatureSet; @@ -59,15 +61,19 @@ struct MyChunk : Runnable { const std::pair<uint32_t,uint32_t> *begin; const std::pair<uint32_t,uint32_t> *end; FeatureValues &result; + const Doom &doom; MyChunk(const std::pair<uint32_t,uint32_t> *begin_in, - const std::pair<uint32_t,uint32_t> *end_in, - FeatureValues &result_in) - : begin(begin_in), end(end_in), result(result_in) {} + const std::pair<uint32_t,uint32_t> *end_in, + FeatureValues &result_in, const Doom &doom_in) + : begin(begin_in), end(end_in), result(result_in), doom(doom_in) {} void calculate_features(SearchIterator &search, const FeatureResolver &resolver) { assert(end > begin); assert(resolver.num_features() == result.names.size()); search.initRange(begin[0].first, end[-1].first + 1); for (auto pos = begin; pos != end; ++pos) { + if (doom.hard_doom()) { + return; + } search.unpack(pos->first); auto *dst = &result.values[pos->second * resolver.num_features()]; extract_values(resolver, pos->first, dst); @@ -81,9 +87,10 @@ struct FirstChunk : MyChunk { FirstChunk(const std::pair<uint32_t,uint32_t> *begin_in, const std::pair<uint32_t,uint32_t> *end_in, FeatureValues &result_in, + const Doom &doom_in, SearchIterator &search_in, const FeatureResolver &resolver_in) - : MyChunk(begin_in, end_in, result_in), + : MyChunk(begin_in, end_in, result_in, doom_in), search(search_in), resolver(resolver_in) {} void run() override { calculate_features(search, resolver); } @@ -94,8 +101,9 @@ struct LaterChunk : MyChunk { LaterChunk(const std::pair<uint32_t,uint32_t> *begin_in, const std::pair<uint32_t,uint32_t> *end_in, FeatureValues &result_in, + const Doom &doom_in, const MatchToolsFactory &mtf_in) - : MyChunk(begin_in, end_in, result_in), + : MyChunk(begin_in, end_in, result_in, doom_in), mtf(mtf_in) {} void run() override { auto tools = mtf.createMatchTools(); @@ -125,13 +133,16 @@ struct MyWork { FeatureSet::UP ExtractFeatures::get_feature_set(SearchIterator &search, RankProgram &rank_program, const std::vector<uint32_t> &docs, - const MatchToolsFactory::StringStringMap &renames) + const Doom &doom, const MatchToolsFactory::StringStringMap &renames) { FeatureResolver resolver(rank_program.get_seeds(false)); auto result = std::make_unique<FeatureSet>(extract_names(resolver, renames), docs.size()); if (!docs.empty()) { search.initRange(docs.front(), docs.back()+1); for (uint32_t docid: docs) { + if (doom.hard_doom()) { + return result; + } search.unpack(docid); auto *dst = result->getFeaturesByIndex(result->addDocId(docid)); extract_values(resolver, docid, dst); @@ -159,9 +170,9 @@ ExtractFeatures::get_match_features(const MatchToolsFactory &mtf, const OrderedD break; } if (i == 0) { - work.chunks.push_back(std::make_unique<FirstChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->search(), resolver)); + work.chunks.push_back(std::make_unique<FirstChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), tools->search(), resolver)); } else { - work.chunks.push_back(std::make_unique<LaterChunk>(&docs[idx], &docs[idx + chunk_size], result, mtf)); + work.chunks.push_back(std::make_unique<LaterChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), mtf)); } idx += chunk_size; } diff --git a/searchcore/src/vespa/searchcore/proton/matching/extract_features.h b/searchcore/src/vespa/searchcore/proton/matching/extract_features.h index 44bff08df2e..48c3476f164 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/extract_features.h +++ b/searchcore/src/vespa/searchcore/proton/matching/extract_features.h @@ -6,6 +6,7 @@ #include <vespa/searchlib/common/stringmap.h> #include <vector> +namespace vespalib { class Doom; }; namespace vespalib { struct ThreadBundle; }; namespace search::queryeval { class SearchIterator; } namespace search::fef { class RankProgram; } @@ -27,8 +28,7 @@ struct ExtractFeatures { * documents (must be in ascending order) using unpack information * from a search. **/ - static FeatureSet::UP get_feature_set(SearchIterator &search, RankProgram &rank_program, const std::vector<uint32_t> &docs, const StringStringMap &renames); - + static FeatureSet::UP get_feature_set(SearchIterator &search, RankProgram &rank_program, const std::vector<uint32_t> &docs, const vespalib::Doom &doom, const StringStringMap &renames); // first: docid, second: result index (must be sorted on docid) using OrderedDocs = std::vector<std::pair<uint32_t,uint32_t>>; diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp index af250b61d03..2e56b7010f9 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp @@ -28,15 +28,15 @@ DocumentDBJobTrackers::DocumentDBJobTrackers() { } -DocumentDBJobTrackers::~DocumentDBJobTrackers() {} +DocumentDBJobTrackers::~DocumentDBJobTrackers() = default; namespace { IFlushTarget::SP -trackFlushTarget(const IJobTracker::SP &tracker, - const IFlushTarget::SP &target) +trackFlushTarget(std::shared_ptr<IJobTracker> tracker, + std::shared_ptr<IFlushTarget> target) { - return std::make_shared<JobTrackedFlushTarget>(tracker, target); + return std::make_shared<JobTrackedFlushTarget>(std::move(tracker), std::move(target)); } } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h index a05fbb49a41..cd7ec612d98 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h @@ -4,8 +4,6 @@ #include "documentdb_tagged_metrics.h" #include "job_tracker.h" #include <vespa/searchcorespi/flush/iflushtarget.h> -#include <chrono> -#include <mutex> namespace proton { @@ -16,20 +14,23 @@ namespace proton { class DocumentDBJobTrackers { private: - std::mutex _lock; using time_point = std::chrono::time_point<std::chrono::steady_clock>; - time_point _now; - JobTracker::SP _attributeFlush; - JobTracker::SP _memoryIndexFlush; - JobTracker::SP _diskIndexFusion; - JobTracker::SP _documentStoreFlush; - JobTracker::SP _documentStoreCompact; - JobTracker::SP _bucketMove; - JobTracker::SP _lidSpaceCompact; - JobTracker::SP _removedDocumentsPrune; + using JobTrackerSP = std::shared_ptr<JobTracker>; + std::mutex _lock; + time_point _now; + JobTrackerSP _attributeFlush; + JobTrackerSP _memoryIndexFlush; + JobTrackerSP _diskIndexFusion; + JobTrackerSP _documentStoreFlush; + JobTrackerSP _documentStoreCompact; + JobTrackerSP _bucketMove; + JobTrackerSP _lidSpaceCompact; + JobTrackerSP _removedDocumentsPrune; public: DocumentDBJobTrackers(); + DocumentDBJobTrackers(const DocumentDBJobTrackers &) = delete; + DocumentDBJobTrackers & operator = (const DocumentDBJobTrackers &) = delete; ~DocumentDBJobTrackers(); IJobTracker &getAttributeFlush() { return *_attributeFlush; } @@ -37,9 +38,9 @@ public: IJobTracker &getDiskIndexFusion() { return *_diskIndexFusion; } IJobTracker &getDocumentStoreFlush() { return *_documentStoreFlush; } IJobTracker &getDocumentStoreCompact() { return *_documentStoreCompact; } - IJobTracker::SP getBucketMove() { return _bucketMove; } - IJobTracker::SP getLidSpaceCompact() { return _lidSpaceCompact; } - IJobTracker::SP getRemovedDocumentsPrune() { return _removedDocumentsPrune; } + std::shared_ptr<IJobTracker> getBucketMove() { return _bucketMove; } + std::shared_ptr<IJobTracker> getLidSpaceCompact() { return _lidSpaceCompact; } + std::shared_ptr<IJobTracker> getRemovedDocumentsPrune() { return _removedDocumentsPrune; } searchcorespi::IFlushTarget::List trackFlushTargets(const searchcorespi::IFlushTarget::List &flushTargets); @@ -47,5 +48,4 @@ public: void updateMetrics(DocumentDBTaggedMetrics::JobMetrics &metrics); }; -} // namespace proton - +} diff --git a/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h b/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h index 646a9f07672..eec8cfbc44a 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h @@ -2,8 +2,6 @@ #pragma once -#include <memory> - namespace proton { /** @@ -11,9 +9,7 @@ namespace proton { */ struct IJobTracker { - typedef std::shared_ptr<IJobTracker> SP; - - virtual ~IJobTracker() {} + virtual ~IJobTracker() = default; virtual void start() = 0; virtual void end() = 0; diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp index 8de3c29d02b..5b49c724c6f 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp @@ -8,11 +8,11 @@ using searchcorespi::FlushTask; namespace proton { -JobTrackedFlushTarget::JobTrackedFlushTarget(const IJobTracker::SP &tracker, - const IFlushTarget::SP &target) +JobTrackedFlushTarget::JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker, + std::shared_ptr<IFlushTarget> target) : IFlushTarget(target->getName(), target->getType(), target->getComponent()), - _tracker(tracker), - _target(target) + _tracker(std::move(tracker)), + _target(std::move(target)) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h index 7a9bae7f662..35d1b0b0b12 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h @@ -13,12 +13,12 @@ namespace proton { class JobTrackedFlushTarget : public searchcorespi::IFlushTarget { private: - IJobTracker::SP _tracker; - searchcorespi::IFlushTarget::SP _target; + std::shared_ptr<IJobTracker> _tracker; + std::shared_ptr<searchcorespi::IFlushTarget> _target; public: - JobTrackedFlushTarget(const IJobTracker::SP &tracker, - const searchcorespi::IFlushTarget::SP &target); + JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker, + std::shared_ptr<searchcorespi::IFlushTarget> target); ~JobTrackedFlushTarget(); const IJobTracker &getTracker() const { return *_tracker; } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp index f5cf0b1afff..3e06c8321fd 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp @@ -6,13 +6,14 @@ using searchcorespi::FlushTask; namespace proton { -JobTrackedFlushTask::JobTrackedFlushTask(const IJobTracker::SP &tracker, - FlushTask::UP task) - : _tracker(tracker), +JobTrackedFlushTask::JobTrackedFlushTask(std::shared_ptr<IJobTracker> tracker, FlushTask::UP task) + : _tracker(std::move(tracker)), _task(std::move(task)) { } +JobTrackedFlushTask::~JobTrackedFlushTask() = default; + void JobTrackedFlushTask::run() { @@ -21,4 +22,4 @@ JobTrackedFlushTask::run() _tracker->end(); } -} // namespace proton +} diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h index fe8d59b9dd6..a10ccbdffd6 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h @@ -12,16 +12,18 @@ namespace proton { class JobTrackedFlushTask : public searchcorespi::FlushTask { private: - IJobTracker::SP _tracker; + std::shared_ptr<IJobTracker> _tracker; searchcorespi::FlushTask::UP _task; public: - JobTrackedFlushTask(const IJobTracker::SP &tracker, + JobTrackedFlushTask(std::shared_ptr<IJobTracker> tracker, searchcorespi::FlushTask::UP task); + JobTrackedFlushTask(const JobTrackedFlushTask &) = delete; + JobTrackedFlushTask & operator = (const JobTrackedFlushTask &) = delete; + ~JobTrackedFlushTask() override; - // Implements searchcorespi::FlushTask - virtual void run() override; - virtual search::SerialNum getFlushSerial() const override { + void run() override; + search::SerialNum getFlushSerial() const override { return _task->getFlushSerial(); } }; diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h index 49b63a6d018..dee974f8b56 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h @@ -19,8 +19,6 @@ private: std::mutex &_lock; public: - typedef std::shared_ptr<JobTracker> SP; - JobTracker(time_point now, std::mutex &lock); /** @@ -29,10 +27,8 @@ public: */ double sampleLoad(time_point now, const std::lock_guard<std::mutex> &guard); - // Implements IJobTracker - virtual void start() override; - virtual void end() override; + void start() override; + void end() override; }; -} // namespace proton - +} diff --git a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp index 9bc20f95d13..d026ef549d4 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp @@ -1,21 +1,53 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "resource_usage_metrics.h" +#include <vespa/vespalib/util/stringfmt.h> + +using vespalib::make_string; namespace proton { +ResourceUsageMetrics::CpuUtilMetrics::CpuUtilMetrics(metrics::MetricSet *parent) + : MetricSet("cpu_util", {}, "Unnormalized cpu utilization for various categories", parent), + setup("setup", {}, "cpu used by system init and (re-)configuration", this), + read("read", {}, "cpu used by reading data from the system", this), + write("write", {}, "cpu used by writing data to the system", this), + compact("compact", {}, "cpu used by internal data re-structuring", this), + other("other", {}, "cpu used by work not classified as a specific category", this) +{ +} + +ResourceUsageMetrics::CpuUtilMetrics::~CpuUtilMetrics() = default; + +ResourceUsageMetrics::DetailedResourceMetrics::DetailedResourceMetrics(const vespalib::string& resource_type, metrics::MetricSet* parent) + : MetricSet(make_string("%s_usage", resource_type.c_str()), {}, make_string("Detailed resource usage metrics for %s", + resource_type.c_str()), parent), + total("total", {}, make_string("The total relative amount of %s used by this content node (value in the range [0, 1])", + resource_type.c_str()), this), + total_util("total_utilization", {}, make_string("The relative amount of %s used compared to the content node %s resource limit", + resource_type.c_str(), resource_type.c_str()), this), + transient("transient", {}, make_string("The relative amount of transient %s used by this content node (value in the range [0, 1])", + resource_type.c_str()), this) +{ +} + +ResourceUsageMetrics::DetailedResourceMetrics::~DetailedResourceMetrics() = default; + ResourceUsageMetrics::ResourceUsageMetrics(metrics::MetricSet *parent) - : MetricSet("resource_usage", {}, "Usage metrics for various resources in this search engine", parent), - disk("disk", {}, "The relative amount of disk space used on this machine (value in the range [0, 1])", this), + : MetricSet("resource_usage", {}, "Usage metrics for various resources in this content node", parent), + disk("disk", {}, "The relative amount of disk used by this content node (transient usage not included, value in the range [0, 1]). Same value as reported to the cluster controller", this), diskUtilization("disk_utilization", {}, "The relative amount of disk used compared to the disk resource limit", this), - memory("memory", {}, "The relative amount of memory used by this process (value in the range [0, 1])", this), + memory("memory", {}, "The relative amount of memory used by this content node (transient usage not included, value in the range [0, 1]). Same value as reported to the cluster controller", this), memoryUtilization("memory_utilization", {}, "The relative amount of memory used compared to the memory resource limit", this), transient_memory("transient_memory", {}, "The relative amount of transient memory needed for loading attributes. Max value among all attributes (value in the range [0, 1])", this), transient_disk("transient_disk", {}, "The relative amount of transient disk needed for running disk index fusion. Max value among all disk indexes (value in the range [0, 1])", this), + disk_usage("disk", this), + memory_usage("memory", this), memoryMappings("memory_mappings", {}, "The number of mapped memory areas", this), openFileDescriptors("open_file_descriptors", {}, "The number of open files", this), feedingBlocked("feeding_blocked", {}, "Whether feeding is blocked due to resource limits being reached (value is either 0 or 1)", this), - mallocArena("malloc_arena", {}, "Size of malloc arena", this) + mallocArena("malloc_arena", {}, "Size of malloc arena", this), + cpu_util(this) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h index 774bb645c84..97cad935dba 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h @@ -12,16 +12,42 @@ namespace proton { */ struct ResourceUsageMetrics : metrics::MetricSet { + struct CpuUtilMetrics : metrics::MetricSet { + metrics::DoubleValueMetric setup; + metrics::DoubleValueMetric read; + metrics::DoubleValueMetric write; + metrics::DoubleValueMetric compact; + metrics::DoubleValueMetric other; + + CpuUtilMetrics(metrics::MetricSet *parent); + ~CpuUtilMetrics(); + }; + + struct DetailedResourceMetrics : metrics::MetricSet { + metrics::DoubleValueMetric total; + metrics::DoubleValueMetric total_util; + metrics::DoubleValueMetric transient; + + DetailedResourceMetrics(const vespalib::string& resource_type, metrics::MetricSet* parent); + ~DetailedResourceMetrics(); + }; + + // TODO Vespa 8: Remove diskUtilization, memoryUtilization, transient_memory, transient_disk. + // These are now included in disk_usage and memory_usage. + metrics::DoubleValueMetric disk; metrics::DoubleValueMetric diskUtilization; metrics::DoubleValueMetric memory; metrics::DoubleValueMetric memoryUtilization; metrics::DoubleValueMetric transient_memory; metrics::DoubleValueMetric transient_disk; + DetailedResourceMetrics disk_usage; + DetailedResourceMetrics memory_usage; metrics::LongValueMetric memoryMappings; metrics::LongValueMetric openFileDescriptors; metrics::LongValueMetric feedingBlocked; metrics::LongValueMetric mallocArena; + CpuUtilMetrics cpu_util; ResourceUsageMetrics(metrics::MetricSet *parent); ~ResourceUsageMetrics(); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp index 6307604598d..9c8e6591730 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp @@ -94,8 +94,13 @@ void ResourceUsageTracker::notifyDiskMemUsage(DiskMemUsageState state) { std::lock_guard guard(_lock); - // TODO: Subtract transient resource (memory and disk) usage from the absolute numbers. - _resource_usage = ResourceUsage(state.diskState().usage(), state.memoryState().usage(), _resource_usage.get_attribute_address_space_usage()); + // The transient resource usage is subtracted from the total resource usage + // before it eventually is reported to the cluster controller (to decide whether to block client feed). + // This ensures that the transient resource usage is covered by the resource headroom on the content node, + // instead of leading to feed blocked due to natural fluctuations. + _resource_usage = ResourceUsage(state.non_transient_disk_usage(), + state.non_transient_memory_usage(), + _resource_usage.get_attribute_address_space_usage()); if (_listener != nullptr) { _listener->update_resource_usage(_resource_usage); } diff --git a/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp index 8f99eb5e8a7..f0cb123b49f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp @@ -8,6 +8,7 @@ #include <vespa/config-bucketspaces.h> #include <vespa/searchlib/common/tunefileinfo.hpp> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/config/retriever/configsnapshot.hpp> #include <cassert> #include <vespa/log/log.h> diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp index 230593c2c1d..d740d5b129d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp @@ -11,25 +11,29 @@ DiskMemUsageMetrics::DiskMemUsageMetrics() noexcept { } -DiskMemUsageMetrics::DiskMemUsageMetrics(const DiskMemUsageState &usage_state) noexcept - : _disk_usage(usage_state.diskState().usage()), - _disk_utilization(usage_state.diskState().utilization()), - _transient_disk_usage(usage_state.transient_disk_usage()), - _memory_usage(usage_state.memoryState().usage()), - _memory_utilization(usage_state.memoryState().utilization()), - _transient_memory_usage(usage_state.transient_memory_usage()) +DiskMemUsageMetrics::DiskMemUsageMetrics(const DiskMemUsageState& usage) noexcept + : _total_disk_usage(usage.diskState().usage()), + _total_disk_utilization(usage.diskState().utilization()), + _transient_disk_usage(usage.transient_disk_usage()), + _non_transient_disk_usage(usage.non_transient_disk_usage()), + _total_memory_usage(usage.memoryState().usage()), + _total_memory_utilization(usage.memoryState().utilization()), + _transient_memory_usage(usage.transient_memory_usage()), + _non_transient_memory_usage(usage.non_transient_memory_usage()) { } void -DiskMemUsageMetrics::merge(const DiskMemUsageState &usage_state) noexcept +DiskMemUsageMetrics::merge(const DiskMemUsageState& usage) noexcept { - _disk_usage = std::max(_disk_usage, usage_state.diskState().usage()); - _disk_utilization = std::max(_disk_utilization, usage_state.diskState().utilization()); - _transient_disk_usage = std::max(_transient_disk_usage, usage_state.transient_disk_usage()); - _memory_usage = std::max(_memory_usage, usage_state.memoryState().usage()); - _memory_utilization = std::max(_memory_utilization, usage_state.memoryState().utilization()); - _transient_memory_usage = std::max(_transient_memory_usage, usage_state.transient_memory_usage()); + _total_disk_usage = std::max(_total_disk_usage, usage.diskState().usage()); + _total_disk_utilization = std::max(_total_disk_utilization, usage.diskState().utilization()); + _transient_disk_usage = std::max(_transient_disk_usage, usage.transient_disk_usage()); + _non_transient_disk_usage = std::max(_non_transient_disk_usage, usage.non_transient_disk_usage()); + _total_memory_usage = std::max(_total_memory_usage, usage.memoryState().usage()); + _total_memory_utilization = std::max(_total_memory_utilization, usage.memoryState().utilization()); + _transient_memory_usage = std::max(_transient_memory_usage, usage.transient_memory_usage()); + _non_transient_memory_usage = std::max(_non_transient_memory_usage, usage.non_transient_memory_usage()); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h index cb97eb4c891..3e3d6fdc752 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h @@ -12,23 +12,27 @@ class DiskMemUsageState; */ class DiskMemUsageMetrics { - double _disk_usage; - double _disk_utilization; + double _total_disk_usage; + double _total_disk_utilization; double _transient_disk_usage; - double _memory_usage; - double _memory_utilization; + double _non_transient_disk_usage; + double _total_memory_usage; + double _total_memory_utilization; double _transient_memory_usage; + double _non_transient_memory_usage; public: DiskMemUsageMetrics() noexcept; - DiskMemUsageMetrics(const DiskMemUsageState &usage_state) noexcept; - void merge(const DiskMemUsageState &usage_state) noexcept; - double get_disk_usage() const noexcept { return _disk_usage; } - double get_disk_utilization() const noexcept { return _disk_utilization; } - double get_transient_disk_usage() const noexcept { return _transient_disk_usage; } - double get_memory_usage() const noexcept { return _memory_usage; } - double get_memory_utilization() const noexcept { return _memory_utilization; } - double get_transient_memory_usage() const noexcept { return _transient_memory_usage; } + DiskMemUsageMetrics(const DiskMemUsageState& usage) noexcept; + void merge(const DiskMemUsageState& usage) noexcept; + double total_disk_usage() const noexcept { return _total_disk_usage; } + double total_disk_utilization() const noexcept { return _total_disk_utilization; } + double transient_disk_usage() const noexcept { return _transient_disk_usage; } + double non_transient_disk_usage() const noexcept { return _non_transient_disk_usage; } + double total_memory_usage() const noexcept { return _total_memory_usage; } + double total_memory_utilization() const noexcept { return _total_memory_utilization; } + double transient_memory_usage() const noexcept { return _transient_memory_usage; } + double non_transient_memory_usage() const noexcept { return _non_transient_memory_usage; } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h index b205b441bcf..2730388de9a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h @@ -3,6 +3,7 @@ #pragma once #include "resource_usage_state.h" +#include <algorithm> namespace proton { @@ -42,6 +43,8 @@ public: const ResourceUsageState &memoryState() const { return _memoryState; } double transient_disk_usage() const { return _transient_disk_usage; } double transient_memory_usage() const { return _transient_memory_usage; } + double non_transient_disk_usage() const { return std::max(0.0, _diskState.usage() - _transient_disk_usage); } + double non_transient_memory_usage() const { return std::max(0.0, _memoryState.usage() - _transient_memory_usage); } bool aboveDiskLimit(double resourceLimitFactor) const { return diskState().aboveLimit(resourceLimitFactor); } bool aboveMemoryLimit(double resourceLimitFactor) const { return memoryState().aboveLimit(resourceLimitFactor); } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index f052d663ba6..a30aa916896 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -12,6 +12,7 @@ #include "idocumentsubdb.h" #include "maintenance_jobs_injector.h" #include "reconfig_params.h" +#include "replay_throttling_policy.h" #include <vespa/document/repo/documenttyperepo.h> #include <vespa/metrics/updatehook.h> #include <vespa/searchcore/proton/attribute/attribute_config_inspector.h> @@ -77,6 +78,18 @@ makeIndexConfig(const ProtonConfig::Index & cfg) { return index::IndexConfig(WarmupConfig(vespalib::from_s(cfg.warmup.time), cfg.warmup.unpack), cfg.maxflushed, cfg.cache.size); } +ReplayThrottlingPolicy +make_replay_throttling_policy(const ProtonConfig::ReplayThrottlingPolicy& cfg) { + if (cfg.type == ProtonConfig::ReplayThrottlingPolicy::Type::UNLIMITED) { + return ReplayThrottlingPolicy({}); + } + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.min_window_size = cfg.minWindowSize; + params.max_window_size = cfg.maxWindowSize; + params.window_size_increment = cfg.windowSizeIncrement; + return ReplayThrottlingPolicy(params); +} + class MetricsUpdateHook : public metrics::UpdateHook { DocumentDB &_db; public: @@ -190,6 +203,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _clusterStateHandler(_writeService.master()), _bucketHandler(_writeService.master()), _indexCfg(makeIndexConfig(protonCfg.index)), + _replay_throttling_policy(std::make_unique<ReplayThrottlingPolicy>(make_replay_throttling_policy(protonCfg.replayThrottlingPolicy))), _config_store(std::move(config_store)), _sessionManager(std::make_shared<matching::SessionManager>(protonCfg.grouping.sessionmanager.maxentries)), _metricsWireService(metricsWireService), @@ -486,7 +500,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _writeServiceConfig.defaultTaskLimit()); if (params.shouldSubDbsChange()) { applySubDBConfig(*configSnapshot, serialNum, params); - if (serialNum < _feedHandler->getSerialNum()) { + if (serialNum < _feedHandler->get_replay_end_serial_num()) { // Not last entry in tls. Reprocessing should already be done. _subDBs.getReprocessingRunner().reset(); } @@ -720,7 +734,8 @@ DocumentDB::startTransactionLogReplay() getBackingStore().lastSyncToken(), oldestFlushedSerial, newestFlushedSerial, - *_config_store); + *_config_store, + *_replay_throttling_policy); _initGate.countDown(); LOG(debug, "DocumentDB(%s): Database started.", _docTypeName.toString().c_str()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 8e8391b2f31..2030e6ffac9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -51,6 +51,7 @@ class ExecutorThreadingServiceStats; class IDocumentDBOwner; class ISharedThreadingService; class ITransientResourceUsageProvider; +class ReplayThrottlingPolicy; class StatusReport; struct MetricsWireService; @@ -104,6 +105,7 @@ private: ClusterStateHandler _clusterStateHandler; BucketHandler _bucketHandler; index::IndexConfig _indexCfg; + std::unique_ptr<ReplayThrottlingPolicy> _replay_throttling_policy; ConfigStore::UP _config_store; std::shared_ptr<matching::SessionManager> _sessionManager; // TODO: This should not have to be a shared pointer. MetricsWireService &_metricsWireService; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index 06132803414..b7c319d46f5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -14,6 +14,7 @@ #include <vespa/config-summarymap.h> #include <vespa/config/file_acquirer/file_acquirer.h> #include <vespa/config/common/configcontext.h> +#include <vespa/config/retriever/configretriever.h> #include <vespa/config/helper/legacy.h> #include <vespa/config-attributes.h> #include <vespa/config-indexschema.h> @@ -23,6 +24,7 @@ #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/vespalib/time/time_box.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/config/retriever/configsnapshot.hpp> #include <thread> #include <cassert> diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h index a3c29212c66..ad5959d551b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h @@ -3,9 +3,12 @@ #pragma once #include "documentdbconfig.h" -#include <vespa/config/config.h> #include <mutex> +namespace config { + class ConfigRetriever; + class DirSpec; +} namespace proton { class BootstrapConfig; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp b/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp index 6285f4ce70f..d3778b4d745 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp @@ -127,6 +127,7 @@ DocumentRetriever::needFetchFromDocStore(const FieldSet & fieldSet) const { case FieldSet::Type::NONE: case FieldSet::Type::DOCID: return false; + case FieldSet::Type::DOCUMENT_ONLY: case FieldSet::Type::ALL: return ! _areAllFieldsAttributes; case FieldSet::Type::FIELD: { @@ -257,6 +258,14 @@ DocumentRetriever::getPartialDocument(search::DocumentIdT lid, const document::D populate(lid, *doc, set.getFields()); break; } + case FieldSet::Type::DOCUMENT_ONLY: { + const auto * actual = getDocumentType().getFieldSet(document::DocumentOnly::NAME); + if (actual != nullptr) { + const auto &set = actual->asCollection(); + populate(lid, *doc, set.getFields()); + } + break; + } case FieldSet::Type::NONE: case FieldSet::Type::DOCID: break; diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 36c8070f140..ca6b3d9ba0f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -3,14 +3,16 @@ #include "executorthreadingservice.h" #include "threading_service_config.h" #include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/singleexecutor.h> -#include <vespa/vespalib/util/blockingthreadstackexecutor.h> -using vespalib::SyncableThreadExecutor; using vespalib::BlockingThreadStackExecutor; -using vespalib::SingleExecutor; +using vespalib::CpuUsage; using vespalib::SequencedTaskExecutor; +using vespalib::SingleExecutor; +using vespalib::SyncableThreadExecutor; using OptimizeFor = vespalib::Executor::OptimizeFor; using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor; @@ -38,22 +40,24 @@ VESPA_THREAD_STACK_TAG(field_writer_executor) } -ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads) +ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, uint32_t num_treads) : ExecutorThreadingService(sharedExecutor, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) {} -ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, +ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor, vespalib::ISequencedTaskExecutor* field_writer, vespalib::InvokeService * invokerService, const ThreadingServiceConfig& cfg, uint32_t stackSize) : _sharedExecutor(sharedExecutor), - _masterExecutor(1, stackSize, master_executor), + _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)), _shared_field_writer(cfg.shared_field_writer()), _master_task_limit(cfg.master_task_limit()), - _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)), - _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)), + _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), + CpuUsage::wrap(index_executor, CpuUsage::Category::WRITE))), + _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), + CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))), _masterService(_masterExecutor), _indexService(*_indexExecutor), _indexFieldInverter(), @@ -70,9 +74,11 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();})); } if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { - _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); - _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); + _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit(), + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); } @@ -81,8 +87,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _attribute_field_writer_ptr = _attributeFieldWriter.get(); } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { - _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();})); } @@ -95,10 +102,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _index_field_writer_ptr = field_writer; _attribute_field_writer_ptr = field_writer; } else { - _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); - _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); - _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + _indexFieldInverter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_inverter_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit()); + _indexFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit()); + _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit(), + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 8572f7126d6..43d546927c2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -20,7 +20,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService { private: using Registration = std::unique_ptr<vespalib::IDestructorCallback>; - vespalib::ThreadExecutor & _sharedExecutor; + vespalib::Executor & _sharedExecutor; vespalib::ThreadStackExecutor _masterExecutor; ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer; std::atomic<uint32_t> _master_task_limit; @@ -42,9 +42,9 @@ public: /** * Convenience constructor used in unit tests. */ - ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1); + ExecutorThreadingService(vespalib::Executor& sharedExecutor, uint32_t num_treads = 1); - ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + ExecutorThreadingService(vespalib::Executor& sharedExecutor, vespalib::ISequencedTaskExecutor* field_writer, vespalib::InvokeService * invokeService, const ThreadingServiceConfig& cfg, @@ -72,7 +72,7 @@ public: vespalib::ThreadExecutor &summary() override { return *_summaryExecutor; } - vespalib::ThreadExecutor &shared() override { + vespalib::Executor &shared() override { return _sharedExecutor; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 8b99c39dd65..16bd2537813 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -464,13 +464,14 @@ FeedHandler::close() void FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial, SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial, - ConfigStore &config_store) + ConfigStore &config_store, + const ReplayThrottlingPolicy& replay_throttling_policy) { (void) newestFlushedSerial; assert(_activeFeedView); assert(_bucketDBHandler); auto state = make_shared<ReplayTransactionLogState> - (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, *this); + (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, replay_throttling_policy, *this); changeFeedState(state); // Resurrected attribute vector might cause oldestFlushedSerial to // be lower than _prunedSerialNum, so don't warn for now. diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index 417d9c21548..32a70f7c2b0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -36,6 +36,7 @@ class IReplayConfig; class JoinBucketsOperation; class PutOperation; class RemoveOperation; +class ReplayThrottlingPolicy; class SplitBucketOperation; class UpdateOperation; @@ -189,7 +190,8 @@ public: SerialNum flushedSummaryMgrSerial, SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial, - ConfigStore &config_store); + ConfigStore &config_store, + const ReplayThrottlingPolicy& replay_throttling_policy); /** * Called when a flush is done and allows pruning of the transaction log. diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index d2626a0d9f4..d9fdee85e4a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -5,11 +5,14 @@ #include "ifeedview.h" #include "ireplayconfig.h" #include "replaypacketdispatcher.h" +#include "replay_throttling_policy.h" #include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/common/eventlogger.h> +#include <vespa/searchcore/proton/common/replay_feedtoken_state.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/shared_operation_throttler.h> #include <cassert> #include <vespa/log/log.h> @@ -21,6 +24,7 @@ using search::SerialNum; using vespalib::Executor; using vespalib::makeLambdaTask; using vespalib::IDestructorCallback; +using vespalib::SharedOperationThrottler; using vespalib::make_string; using proton::bucketdb::IBucketDBHandler; @@ -52,31 +56,47 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler { FeedConfigStore &_config_store; IIncSerialNum &_inc_serial_num; CommitTimeTracker _commitTimeTracker; + std::unique_ptr<SharedOperationThrottler> _throttler; + + static std::unique_ptr<SharedOperationThrottler> make_throttler(const ReplayThrottlingPolicy& replay_throttling_policy) { + auto& params = replay_throttling_policy.get_params(); + if (!params.has_value()) { + return SharedOperationThrottler::make_unlimited_throttler(); + } + return SharedOperationThrottler::make_dynamic_throttler(params.value()); + } public: TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr, IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, FeedConfigStore &config_store, + const ReplayThrottlingPolicy& replay_throttling_policy, IIncSerialNum &inc_serial_num) : _feed_view_ptr(feed_view_ptr), _bucketDBHandler(bucketDBHandler), _replay_config(replay_config), _config_store(config_store), _inc_serial_num(inc_serial_num), - _commitTimeTracker(5ms) + _commitTimeTracker(5ms), + _throttler(make_throttler(replay_throttling_policy)) { } ~TransactionLogReplayPacketHandler() override = default; + FeedToken make_replay_feed_token() { + SharedOperationThrottler::Token throttler_token = _throttler->blocking_acquire_one(); + return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token)); + } + void replay(const PutOperation &op) override { - _feed_view_ptr->handlePut(FeedToken(), op); + _feed_view_ptr->handlePut(make_replay_feed_token(), op); } void replay(const RemoveOperation &op) override { - _feed_view_ptr->handleRemove(FeedToken(), op); + _feed_view_ptr->handleRemove(make_replay_feed_token(), op); } void replay(const UpdateOperation &op) override { - _feed_view_ptr->handleUpdate(FeedToken(), op); + _feed_view_ptr->handleUpdate(make_replay_feed_token(), op); } void replay(const NoopOperation &) override {} // ignored void replay(const NewConfigOperation &op) override { @@ -84,7 +104,7 @@ public: } void replay(const DeleteBucketOperation &op) override { - _feed_view_ptr->handleDeleteBucket(op, IDestructorCallback::SP()); + _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token()); } void replay(const SplitBucketOperation &op) override { _bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(), @@ -95,15 +115,15 @@ public: op.getSource2(), op.getTarget()); } void replay(const PruneRemovedDocumentsOperation &op) override { - _feed_view_ptr->handlePruneRemovedDocuments(op, IDestructorCallback::SP()); + _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token()); } void replay(const MoveOperation &op) override { - _feed_view_ptr->handleMove(op, IDestructorCallback::SP()); + _feed_view_ptr->handleMove(op, make_replay_feed_token()); } void replay(const CreateBucketOperation &) override { } void replay(const CompactLidSpaceOperation &op) override { - _feed_view_ptr->handleCompactLidSpace(op, IDestructorCallback::SP()); + _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token()); } NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() override { return _config_store; @@ -173,10 +193,11 @@ ReplayTransactionLogState::ReplayTransactionLogState( IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, FeedConfigStore &config_store, + const ReplayThrottlingPolicy &replay_throttling_policy, IIncSerialNum& inc_serial_num) : FeedState(REPLAY_TRANSACTION_LOG), _doc_type_name(name), - _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, inc_serial_num)) + _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, replay_throttling_policy, inc_serial_num)) { } ReplayTransactionLogState::~ReplayTransactionLogState() = default; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h index 7533d6443a7..fcff28fe481 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h @@ -55,6 +55,7 @@ public: bucketdb::IBucketDBHandler &bucketDBHandler, IReplayConfig &replay_config, FeedConfigStore &config_store, + const ReplayThrottlingPolicy &replay_throttling_policy, IIncSerialNum &inc_serial_num); ~ReplayTransactionLogState() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp index 9a525731d0d..34369a0803e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp @@ -68,7 +68,6 @@ fsyncFile(const vespalib::string &fileName) if (!f.Sync()) { LOG(error, "Could not fsync file '%s'", fileName.c_str()); } - f.Close(); } template <class Config> @@ -131,7 +130,6 @@ ConfigFile::ConfigFile(const vespalib::string &name, const vespalib::string &ful _content.resize(fileSize); file.ReadBuf(&_content[0], fileSize); _modTime = file.GetModificationTime(); - file.Close(); } nbostream & diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp index e317c86df05..c0d8168ab61 100644 --- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp @@ -4,7 +4,7 @@ namespace proton { -JobTrackedMaintenanceJob::JobTrackedMaintenanceJob(IJobTracker::SP tracker, +JobTrackedMaintenanceJob::JobTrackedMaintenanceJob(std::shared_ptr<IJobTracker> tracker, IMaintenanceJob::SP job) : IMaintenanceJob(job->getName(), job->getDelay(), job->getInterval()), _tracker(std::move(tracker)), diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h index 1953393bad7..20ecfdf023d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h @@ -12,12 +12,14 @@ namespace proton { class JobTrackedMaintenanceJob final : public IMaintenanceJob { private: - IJobTracker::SP _tracker; - IMaintenanceJob::SP _job; - bool _running; + std::shared_ptr<IJobTracker> _tracker; + IMaintenanceJob::SP _job; + bool _running; public: - JobTrackedMaintenanceJob(IJobTracker::SP tracker, IMaintenanceJob::SP job); + JobTrackedMaintenanceJob(std::shared_ptr<IJobTracker> tracker, IMaintenanceJob::SP job); + JobTrackedMaintenanceJob(const JobTrackedMaintenanceJob &) = delete; + JobTrackedMaintenanceJob & operator = (const JobTrackedMaintenanceJob &) = delete; ~JobTrackedMaintenanceJob() override; bool isBlocked() const override { return _job->isBlocked(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index f4b92876891..49b301da26e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -17,7 +17,7 @@ namespace proton { namespace { IMaintenanceJob::UP -trackJob(IJobTracker::SP tracker, std::shared_ptr<IMaintenanceJob> job) +trackJob(std::shared_ptr<IJobTracker> tracker, std::shared_ptr<IMaintenanceJob> job) { return std::make_unique<JobTrackedMaintenanceJob>(std::move(tracker), std::move(job)); } @@ -28,7 +28,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, storage::spi::BucketExecutor & bucketExecutor, ILidSpaceCompactionHandler::Vector lscHandlers, IOperationStorer &opStorer, - IJobTracker::SP tracker, + std::shared_ptr<IJobTracker> tracker, IDiskMemUsageNotifier &diskMemUsageNotifier, IClusterStateChangedNotifier &clusterStateChangedNotifier, const std::shared_ptr<IBucketStateCalculator> &calc, @@ -89,7 +89,8 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, AttributeUsageFilter &attributeUsageFilter) { controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig())); - controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval())); + controller.registerJobInSharedExecutor( + std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval())); const auto & docTypeName = controller.getDocTypeName().getName(); const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB()); diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 0d75464a161..fa4bae8f01b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -39,12 +39,12 @@ isRunnable(const MaintenanceJobRunner & job, const Executor * master) { } -MaintenanceController::MaintenanceController(ISyncableThreadService &masterThread, - vespalib::Executor & defaultExecutor, - MonitoredRefCount & refCount, - const DocTypeName &docTypeName) +MaintenanceController::MaintenanceController(ISyncableThreadService& masterThread, + vespalib::Executor& shared_executor, + MonitoredRefCount& refCount, + const DocTypeName& docTypeName) : _masterThread(masterThread), - _defaultExecutor(defaultExecutor), + _shared_executor(shared_executor), _refCount(refCount), _readySubDB(), _remSubDB(), @@ -70,10 +70,10 @@ MaintenanceController::registerJobInMasterThread(IMaintenanceJob::UP job) } void -MaintenanceController::registerJobInDefaultPool(IMaintenanceJob::UP job) +MaintenanceController::registerJobInSharedExecutor(IMaintenanceJob::UP job) { // Called by master write thread - registerJob(_defaultExecutor, std::move(job)); + registerJob(_shared_executor, std::move(job)); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index f2c425b2fd0..086f5a36404 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -42,11 +42,12 @@ public: using UP = std::unique_ptr<MaintenanceController>; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(ISyncableThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName); + MaintenanceController(ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, + vespalib::MonitoredRefCount& refCount, const DocTypeName& docTypeName); ~MaintenanceController(); void registerJobInMasterThread(IMaintenanceJob::UP job); - void registerJobInDefaultPool(IMaintenanceJob::UP job); + void registerJobInSharedExecutor(IMaintenanceJob::UP job); void killJobs(); @@ -82,7 +83,7 @@ private: using Guard = std::lock_guard<Mutex>; ISyncableThreadService &_masterThread; - vespalib::Executor &_defaultExecutor; + vespalib::Executor &_shared_executor; vespalib::MonitoredRefCount &_refCount; MaintenanceDocumentSubDB _readySubDB; MaintenanceDocumentSubDB _remSubDB; diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp index 9eb0596ff1f..5e6cee94292 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp @@ -1,12 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "maintenancejobrunner.h" -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> +#include <vespa/vespalib/util/cpu_usage.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.maintenancejobrunner"); +using vespalib::CpuUsage; using vespalib::Executor; using vespalib::makeLambdaTask; @@ -34,7 +36,8 @@ MaintenanceJobRunner::addExecutorTask() Guard guard(_lock); if (!_stopped && !_job->isBlocked() && !_queued) { _queued = true; - _executor.execute(makeLambdaTask([this]() { runJobInExecutor(); })); + auto task = makeLambdaTask([this]() { runJobInExecutor(); }); + _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp index 51243b718ec..50a499c8a73 100644 --- a/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp @@ -13,9 +13,10 @@ namespace { bool shouldUseConservativeMode(const ResourceUsageState &resourceState, bool currentlyUseConservativeMode, + double high_watermark_factor, double lowWatermarkFactor) { - return resourceState.aboveLimit() || + return resourceState.aboveLimit(high_watermark_factor) || (currentlyUseConservativeMode && resourceState.aboveLimit(lowWatermarkFactor)); } @@ -25,6 +26,7 @@ void MemoryFlushConfigUpdater::considerUseConservativeDiskMode(const LockGuard &guard, MemoryFlush::Config &newConfig) { if (shouldUseConservativeMode(_currState.diskState(), _useConservativeDiskMode, + _currConfig.conservative.highwatermarkfactor, _currConfig.conservative.lowwatermarkfactor)) { newConfig.maxGlobalTlsSize = _currConfig.maxtlssize * _currConfig.conservative.disklimitfactor; @@ -41,6 +43,7 @@ void MemoryFlushConfigUpdater::considerUseConservativeMemoryMode(const LockGuard &, MemoryFlush::Config &newConfig) { if (shouldUseConservativeMode(_currState.memoryState(), _useConservativeMemoryMode, + _currConfig.conservative.highwatermarkfactor, _currConfig.conservative.lowwatermarkfactor)) { newConfig.maxGlobalMemory = _currConfig.maxmemory * _currConfig.conservative.memorylimitfactor; diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp index 53b12972c44..9b1c6de15df 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp @@ -1,14 +1,23 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "operationdonecontext.h" +#include <vespa/searchcore/proton/common/feedtoken.h> namespace proton { -OperationDoneContext::OperationDoneContext(IDestructorCallback::SP token) - : _token(std::move(token)) +OperationDoneContext::OperationDoneContext(std::shared_ptr<feedtoken::IState> token, std::shared_ptr<vespalib::IDestructorCallback> done_callback) + : _token(std::move(token)), + _done_callback(std::move(done_callback)) { } OperationDoneContext::~OperationDoneContext() = default; +bool +OperationDoneContext::is_replay() const +{ + return (!_token || _token->is_replay()); +} + + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h index 113aae8c9bf..baf0e98d5a4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h @@ -4,6 +4,8 @@ #include <vespa/vespalib/util/idestructorcallback.h> +namespace proton::feedtoken { class IState; } + namespace proton { /** @@ -16,13 +18,13 @@ namespace proton { class OperationDoneContext : public vespalib::IDestructorCallback { public: - using IDestructorCallback = vespalib::IDestructorCallback; - OperationDoneContext(IDestructorCallback::SP token); + OperationDoneContext(std::shared_ptr<feedtoken::IState> token, std::shared_ptr<IDestructorCallback> done_callback); ~OperationDoneContext() override; - bool hasToken() const { return static_cast<bool>(_token); } + bool is_replay() const; private: - IDestructorCallback::SP _token; + std::shared_ptr<feedtoken::IState> _token; + std::shared_ptr<vespalib::IDestructorCallback> _done_callback; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index b128fe16e5e..6001e6b88d4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -24,10 +24,10 @@ #include <vespa/searchcore/proton/flushengine/flushengine.h> #include <vespa/searchcore/proton/flushengine/tls_stats_factory.h> #include <vespa/searchcore/proton/matchengine/matchengine.h> +#include <vespa/searchcore/proton/metrics/content_proton_metrics.h> +#include <vespa/searchcore/proton/metrics/metrics_engine.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> #include <vespa/searchcore/proton/reference/document_db_reference_registry.h> -#include <vespa/searchcore/proton/metrics/metrics_engine.h> -#include <vespa/searchcore/proton/metrics/content_proton_metrics.h> #include <vespa/searchcore/proton/summaryengine/summaryengine.h> #include <vespa/searchlib/common/packets.h> #include <vespa/searchlib/transactionlog/trans_log_server_explorer.h> @@ -36,13 +36,13 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/net/state_server.h> #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/invokeserviceimpl.h> #ifdef __linux__ #include <malloc.h> #endif @@ -54,20 +54,22 @@ #include <vespa/log/log.h> LOG_SETUP(".proton.server.proton"); +using CpuCategory = vespalib::CpuUsage::Category; + using document::DocumentTypeRepo; +using search::engine::MonitorReply; +using search::transactionlog::DomainStats; +using vespa::config::search::core::ProtonConfig; +using vespa::config::search::core::internal::InternalProtonType; +using vespalib::CpuUsage; using vespalib::FileHeader; using vespalib::IllegalStateException; using vespalib::Slime; +using vespalib::compression::CompressionConfig; using vespalib::makeLambdaTask; using vespalib::slime::ArrayInserter; using vespalib::slime::Cursor; -using search::transactionlog::DomainStats; -using vespa::config::search::core::ProtonConfig; -using vespa::config::search::core::internal::InternalProtonType; -using vespalib::compression::CompressionConfig; -using search::engine::MonitorReply; - namespace proton { namespace { @@ -139,8 +141,8 @@ struct MetricsUpdateHook : metrics::UpdateHook const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component"; -VESPA_THREAD_STACK_TAG(initialize_executor) -VESPA_THREAD_STACK_TAG(close_executor) +VESPA_THREAD_STACK_TAG(proton_initialize_executor) +VESPA_THREAD_STACK_TAG(proton_close_executor) } @@ -207,6 +209,7 @@ Proton::Proton(const config::ConfigUri & configUri, StatusProducer(), IPersistenceEngineOwner(), ComponentConfigProducer(), + _cpu_util(), _configUri(configUri), _mutex(), _metricsHook(std::make_unique<MetricsUpdateHook>(*this)), @@ -310,7 +313,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _protonDiskLayout = std::make_unique<ProtonDiskLayout>(protonConfig.basedir, protonConfig.tlsspec); vespalib::chdir(protonConfig.basedir); vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs"); - _tls->start(); + _tls->start(hwInfo.cpu().cores()); _flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval)); _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics()); @@ -329,7 +332,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { - initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, initialize_executor); + initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, + CpuUsage::wrap(proton_initialize_executor, CpuCategory::SETUP)); _initDocumentDbsInSequence = (protonConfig.initialize.threads == 1); } _protonConfigurer.applyInitialConfig(initializeThreads); @@ -463,7 +467,8 @@ Proton::~Proton() } } - vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000, close_executor); + vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000, + CpuUsage::wrap(proton_close_executor, CpuCategory::SETUP)); closeDocumentDBs(closePool); } _documentDBMap.clear(); @@ -753,12 +758,20 @@ Proton::updateMetrics(const metrics::MetricLockGuard &) const DiskMemUsageFilter &usageFilter = _diskMemUsageSampler->writeFilter(); auto dm_metrics = usageFilter.get_metrics(); - metrics.resourceUsage.disk.set(dm_metrics.get_disk_usage()); - metrics.resourceUsage.diskUtilization.set(dm_metrics.get_disk_utilization()); - metrics.resourceUsage.memory.set(dm_metrics.get_memory_usage()); - metrics.resourceUsage.memoryUtilization.set(dm_metrics.get_memory_utilization()); - metrics.resourceUsage.transient_memory.set(dm_metrics.get_transient_memory_usage()); - metrics.resourceUsage.transient_disk.set(dm_metrics.get_transient_disk_usage()); + metrics.resourceUsage.disk.set(dm_metrics.non_transient_disk_usage()); + metrics.resourceUsage.diskUtilization.set(dm_metrics.total_disk_utilization()); + metrics.resourceUsage.transient_disk.set(dm_metrics.transient_disk_usage()); + metrics.resourceUsage.disk_usage.total.set(dm_metrics.total_disk_usage()); + metrics.resourceUsage.disk_usage.total_util.set(dm_metrics.total_disk_utilization()); + metrics.resourceUsage.disk_usage.transient.set(dm_metrics.transient_disk_usage()); + + metrics.resourceUsage.memory.set(dm_metrics.non_transient_memory_usage()); + metrics.resourceUsage.memoryUtilization.set(dm_metrics.total_memory_utilization()); + metrics.resourceUsage.transient_memory.set(dm_metrics.transient_memory_usage()); + metrics.resourceUsage.memory_usage.total.set(dm_metrics.total_memory_usage()); + metrics.resourceUsage.memory_usage.total_util.set(dm_metrics.total_memory_utilization()); + metrics.resourceUsage.memory_usage.transient.set(dm_metrics.transient_memory_usage()); + metrics.resourceUsage.memoryMappings.set(usageFilter.getMemoryStats().getMappingsCount()); metrics.resourceUsage.openFileDescriptors.set(FastOS_File::count_open_files()); metrics.resourceUsage.feedingBlocked.set((usageFilter.acceptWriteOperation() ? 0.0 : 1.0)); @@ -775,6 +788,12 @@ Proton::updateMetrics(const metrics::MetricLockGuard &) #else metrics.resourceUsage.mallocArena.set(UINT64_C(0)); #endif + auto cpu_util = _cpu_util.get_util(); + metrics.resourceUsage.cpu_util.setup.set(cpu_util[CpuCategory::SETUP]); + metrics.resourceUsage.cpu_util.read.set(cpu_util[CpuCategory::READ]); + metrics.resourceUsage.cpu_util.write.set(cpu_util[CpuCategory::WRITE]); + metrics.resourceUsage.cpu_util.compact.set(cpu_util[CpuCategory::COMPACT]); + metrics.resourceUsage.cpu_util.other.set(cpu_util[CpuCategory::OTHER]); } { ContentProtonMetrics::ProtonExecutorMetrics &metrics = _metricsEngine->root().executor; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 573f215c722..90a257a0aaa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -25,6 +25,7 @@ #include <vespa/vespalib/net/json_handler_repo.h> #include <vespa/vespalib/net/state_explorer.h> #include <vespa/vespalib/util/varholder.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <mutex> #include <shared_mutex> @@ -81,6 +82,7 @@ private: void setClusterName(const vespalib::string &clusterName, const vespalib::string &baseDir); }; + vespalib::CpuUtil _cpu_util; const config::ConfigUri _configUri; mutable std::shared_mutex _mutex; std::unique_ptr<metrics::UpdateHook> _metricsHook; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h index 56a4d1b8c9f..e6d036df7d9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h @@ -2,14 +2,13 @@ #pragma once -#include <vespa/fastos/thread.h> -#include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/vespalib/stllike/string.h> -#include <vespa/config/config.h> #include "bootstrapconfigmanager.h" #include "documentdbconfigmanager.h" #include "i_document_db_config_owner.h" -#include <chrono> +#include <vespa/fastos/thread.h> +#include <vespa/searchcore/proton/common/doctypename.h> +#include <vespa/config/retriever/configretriever.h> +#include <vespa/config/subscription/configuri.h> namespace document { class DocumentTypeRepo; } diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp index 7801f21c906..695f39c0097 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp @@ -9,9 +9,11 @@ using document::Document; namespace proton { -PutDoneContext::PutDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, +PutDoneContext::PutDoneContext(std::shared_ptr<feedtoken::IState> token, + std::shared_ptr<vespalib::IDestructorCallback> done_callback, + IPendingLidTracker::Token uncommitted, std::shared_ptr<const Document> doc, uint32_t lid) - : OperationDoneContext(std::move(token)), + : OperationDoneContext(std::move(token), std::move(done_callback)), _uncommitted(std::move(uncommitted)), _lid(lid), _docIdLimit(nullptr), diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h index 00e8ac46c93..e0f55816314 100644 --- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h @@ -28,7 +28,9 @@ class PutDoneContext : public OperationDoneContext std::shared_ptr<const document::Document> _doc; public: - PutDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, + PutDoneContext(std::shared_ptr<feedtoken::IState> token, + std::shared_ptr<vespalib::IDestructorCallback> done_callback, + IPendingLidTracker::Token uncommitted, std::shared_ptr<const document::Document> doc, uint32_t lid); ~PutDoneContext() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index 558a57bb8c6..0c93992d427 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -4,8 +4,8 @@ namespace proton { -RemoveDoneContext::RemoveDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted) - : OperationDoneContext(std::move(token)), +RemoveDoneContext::RemoveDoneContext(std::shared_ptr<feedtoken::IState> token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted) + : OperationDoneContext(std::move(token), std::move(done_callback)), _uncommitted(std::move(uncommitted)) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 28e15389bb2..62db0f20b84 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -19,7 +19,7 @@ class RemoveDoneContext : public OperationDoneContext IPendingLidTracker::Token _uncommitted; public: - RemoveDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted); + RemoveDoneContext(std::shared_ptr<feedtoken::IState>, std::shared_ptr<vespalib::IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted); ~RemoveDoneContext() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h new file mode 100644 index 00000000000..3e1485d94dd --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/shared_operation_throttler.h> +#include <optional> + +namespace proton { + +/* + * Policy for transaction log replay throttling. If params are set then a dynamic throttler + * is used, otherwise an unlimited throttler is used. + */ +class ReplayThrottlingPolicy +{ + using DynamicThrottleParams = vespalib::SharedOperationThrottler::DynamicThrottleParams; + std::optional<DynamicThrottleParams> _params; + +public: + explicit ReplayThrottlingPolicy(std::optional<DynamicThrottleParams> params) + : _params(std::move(params)) + { + } + const std::optional<DynamicThrottleParams>& get_params() const noexcept { return _params; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index 66b1ba1ae2e..207f1d813d8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -99,8 +99,8 @@ SearchableFeedView::updateIndexedFields(SerialNum serialNum, search::DocumentIdT { _writeService.index().execute( makeLambdaTask([serialNum, lid, futureDoc = std::move(futureDoc), - onWriteDone = std::move(onWriteDone), this]() mutable { - performIndexPut(serialNum, lid, std::move(futureDoc), std::move(onWriteDone)); + onWriteDone, this]() mutable { + performIndexPut(serialNum, lid, std::move(futureDoc), onWriteDone); })); } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index e32cd6f5f4e..7e799e506e3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -2,10 +2,12 @@ #include "shared_threading_service.h" #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/invokeserviceimpl.h> + +using vespalib::CpuUsage; VESPA_THREAD_STACK_TAG(proton_field_writer_executor) VESPA_THREAD_STACK_TAG(proton_shared_executor) @@ -16,7 +18,7 @@ namespace proton { using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg) - : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor), + : _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), _shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki, cfg.shared_task_limit(), proton_shared_executor)), _field_writer(), @@ -25,9 +27,10 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi { const auto& fw_cfg = cfg.field_writer_config(); if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) { - _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor, + _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE), fw_cfg.indexingThreads() * 3, fw_cfg.defaultTaskLimit(), + fw_cfg.is_task_limit_hard(), fw_cfg.optimize(), fw_cfg.kindOfwatermark()); if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) { diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp index 8a81c3f4388..76b7982fedd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp @@ -30,6 +30,11 @@ derive_shared_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info) return std::max(scaled_cores, cfg.documentdb.size() + cfg.flush.maxconcurrent + 1); } +size_t +derive_warmup_threads(const HwInfo::Cpu& cpu_info) { + return std::max(1u, std::min(4u, cpu_info.cores()/8)); +} + } SharedThreadingServiceConfig @@ -37,7 +42,7 @@ SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::P const proton::HwInfo::Cpu& cpu_info) { size_t shared_threads = derive_shared_threads(cfg, cpu_info); - return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4, + return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, derive_warmup_threads(cpu_info), ThreadingServiceConfig::make(cfg, cfg.feeding.concurrency, cpu_info)); } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 97bd940b403..170b6f99930 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -15,9 +15,10 @@ #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> #include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.storeonlyfeedview"); @@ -25,28 +26,29 @@ LOG_SETUP(".proton.server.storeonlyfeedview"); using document::BucketId; using document::Document; using document::DocumentId; -using document::GlobalId; using document::DocumentTypeRepo; using document::DocumentUpdate; -using vespalib::IDestructorCallback; +using document::GlobalId; +using proton::documentmetastore::LidReuseDelayer; using search::SerialNum; using search::index::Schema; using storage::spi::BucketInfoResult; using storage::spi::Timestamp; +using vespalib::CpuUsage; +using vespalib::IDestructorCallback; using vespalib::IllegalStateException; using vespalib::makeLambdaTask; using vespalib::make_string; -using proton::documentmetastore::LidReuseDelayer; namespace proton { namespace { std::shared_ptr<PutDoneContext> -createPutDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, +createPutDoneContext(FeedToken token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted, std::shared_ptr<const Document> doc, uint32_t lid) { - return std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), std::move(doc), lid); + return std::make_shared<PutDoneContext>(std::move(token), std::move(done_callback), std::move(uncommitted), std::move(doc), lid); } std::shared_ptr<UpdateDoneContext> @@ -66,9 +68,9 @@ void setPrev(DocumentOperation &op, const documentmetastore::IStore::Result &res } std::shared_ptr<RemoveDoneContext> -createRemoveDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted) +createRemoveDoneContext(FeedToken token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted) { - return std::make_shared<RemoveDoneContext>(std::move(token), std::move(uncommitted)); + return std::make_shared<RemoveDoneContext>(std::move(token), std::move(done_callback), std::move(uncommitted)); } class SummaryPutDoneContext : public OperationDoneContext @@ -80,7 +82,7 @@ public: }; SummaryPutDoneContext::SummaryPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted) - : OperationDoneContext(std::move(token)), + : OperationDoneContext(std::move(token), {}), _uncommitted(std::move(uncommitted)) {} @@ -247,9 +249,19 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) if (putOp.getValidDbdId(_params._subDbId)) { if (putOp.changedDbdId() && useDocumentMetaStore(serialNum)) { - _gidToLidChangeHandler.notifyPut(token, docId.getGlobalId(), putOp.getLid(), serialNum); + /* + * Don't pass replay feed token to GidToLidChangeHandler. + * + * The passed feed token is kept until the ForceCommitDoneTask scheduled by the next + * force commit has completed. If a replay feed token containing an active throttler + * token is passed to GidToLidChangeHandler then + * TransactionLogReplayFeedHandler::make_replay_feed_token() might deadlock, waiting for + * active throttler tokens to be destroyed. + */ + FeedToken token_copy = (token && !token->is_replay()) ? token : FeedToken(); + _gidToLidChangeHandler.notifyPut(std::move(token_copy), docId.getGlobalId(), putOp.getLid(), serialNum); } - auto onWriteDone = createPutDoneContext(std::move(token), get_pending_lid_token(putOp), doc, putOp.getLid()); + auto onWriteDone = createPutDoneContext(std::move(token), {}, get_pending_lid_token(putOp), doc, putOp.getLid()); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, onWriteDone); @@ -257,7 +269,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) if (docAlreadyExists && putOp.changedDbdId()) { //TODO, better to have an else than an assert ? assert(!putOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, putOp.getPrevLid()); + internalRemove(std::move(token), {}, _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, putOp.getPrevLid()); } } @@ -357,7 +369,7 @@ StoreOnlyFeedView::removeSummaries(SerialNum serialNum, const LidVector & lids, trackerTokens.emplace_back(_pendingLidsForDocStore.produce(lid)); }); summaryExecutor().execute( - makeLambdaTask([serialNum, lids = std::move(lids), onDone, trackerTokens = std::move(trackerTokens), this] { + makeLambdaTask([serialNum, lids, onDone, trackerTokens = std::move(trackerTokens), this] { (void) onDone; (void) trackerTokens; std::for_each(lids.begin(), lids.end(), [this, serialNum](Lid lid) { @@ -439,13 +451,14 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) } else { putSummaryNoop(std::move(futureStream), onWriteDone); } - _writeService.shared().execute(makeLambdaTask( - [upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, promisedDoc = std::move(promisedDoc), - promisedStream = std::move(promisedStream), this]() mutable - { - makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone, - std::move(promisedDoc), std::move(promisedStream)); - })); + auto task = makeLambdaTask([upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, + promisedDoc = std::move(promisedDoc), + promisedStream = std::move(promisedStream), this]() mutable + { + makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone, + std::move(promisedDoc), std::move(promisedStream)); + }); + _writeService.shared().execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::WRITE)); updateAttributes(serialNum, lid, std::move(futureDoc), onWriteDone); } } @@ -458,13 +471,13 @@ StoreOnlyFeedView::makeUpdatedDocument(bool useDocStore, Lid lid, const Document Document::UP prevDoc = _summaryAdapter->get(lid, *_repo); Document::UP newDoc; vespalib::nbostream newStream(12345); - assert(!onWriteDone->hasToken() || useDocStore); + assert(onWriteDone->is_replay() || useDocStore); if (useDocStore) { assert(prevDoc); } if (!prevDoc) { // Replaying, document removed later before summary was flushed. - assert(!onWriteDone->hasToken()); + assert(onWriteDone->is_replay()); // If we've passed serial number for flushed index then we could // also check that this operation is marked for ignore by index // proxy. @@ -478,7 +491,7 @@ StoreOnlyFeedView::makeUpdatedDocument(bool useDocStore, Lid lid, const Document } else { // Replaying, document removed and lid reused before summary // was flushed. - assert(!onWriteDone->hasToken() && !useDocStore); + assert(onWriteDone->is_replay() && !useDocStore); } } promisedDoc.set_value(std::move(newDoc)); @@ -552,7 +565,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI if (rmOp.changedDbdId()) { //TODO Prefer else over assert ? assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid()); + internalRemove(std::move(token), {}, _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid()); } } } @@ -569,16 +582,16 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { assert(!rmOp.getValidDbdId(_params._subDbId)); - internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid()); + internalRemove(std::move(token), {}, _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid()); } } } void -StoreOnlyFeedView::internalRemove(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid) +StoreOnlyFeedView::internalRemove(FeedToken token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid) { _lidReuseDelayer.delayReuse(lid); - auto onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted)); + auto onWriteDone = createRemoveDoneContext(std::move(token), std::move(done_callback), std::move(uncommitted)); removeSummary(serialNum, lid, onWriteDone); removeAttributes(serialNum, lid, onWriteDone); removeIndexedFields(serialNum, lid, onWriteDone); @@ -706,13 +719,13 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, DoneCallback doneCtx) if (moveOp.changedDbdId() && useDocumentMetaStore(serialNum)) { _gidToLidChangeHandler.notifyPut(FeedToken(), docId.getGlobalId(), moveOp.getLid(), serialNum); } - auto onWriteDone = createPutDoneContext(doneCtx, _pendingLidsForCommit->produce(moveOp.getLid()), doc, moveOp.getLid()); + auto onWriteDone = createPutDoneContext({}, doneCtx, _pendingLidsForCommit->produce(moveOp.getLid()), doc, moveOp.getLid()); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, onWriteDone); } if (docAlreadyExists && moveOp.changedDbdId()) { - internalRemove(doneCtx, _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, moveOp.getPrevLid()); + internalRemove({}, doneCtx, _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, moveOp.getPrevLid()); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index c25accaf4a4..bd8509fa796 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -180,7 +180,7 @@ private: // returns the number of documents removed. size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, DoneCallback onDone); - void internalRemove(IDestructorCallbackSP token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid); + void internalRemove(FeedToken token, std::shared_ptr<vespalib::IDestructorCallback> done_callback,IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid); IPendingLidTracker::Token get_pending_lid_token(const DocumentOperation &op); diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 7982e8a8414..335d5bab8d0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -12,14 +12,15 @@ using OptimizeFor = vespalib::Executor::OptimizeFor; ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, - uint32_t defaultTaskLimit_, + int32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_) : _indexingThreads(indexingThreads_), _master_task_limit(master_task_limit_), - _defaultTaskLimit(defaultTaskLimit_), + _defaultTaskLimit(std::abs(defaultTaskLimit_)), + _is_task_limit_hard(defaultTaskLimit_ >= 0), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), _reactionTime(reactionTime_), @@ -81,6 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const return _indexingThreads == rhs._indexingThreads && _master_task_limit == rhs._master_task_limit && _defaultTaskLimit == rhs._defaultTaskLimit && + _is_task_limit_hard == rhs._is_task_limit_hard && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && _reactionTime == rhs._reactionTime && diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index f1a4f0525d1..a54c0674263 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -23,15 +23,16 @@ private: uint32_t _indexingThreads; uint32_t _master_task_limit; uint32_t _defaultTaskLimit; + bool _is_task_limit_hard; OptimizeFor _optimize; uint32_t _kindOfWatermark; vespalib::duration _reactionTime; // Maximum reaction time to new tasks SharedFieldWriterExecutor _shared_field_writer; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_, - OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, - SharedFieldWriterExecutor shared_field_writer_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_, + OptimizeFor optimize_, uint32_t kindOfWatermark_, + vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); @@ -40,6 +41,7 @@ public: uint32_t indexingThreads() const { return _indexingThreads; } uint32_t master_task_limit() const { return _master_task_limit; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } + bool is_task_limit_hard() const { return _is_task_limit_hard; } OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } vespalib::duration reactionTime() const { return _reactionTime; } diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp index 5842965a18a..16099700980 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp @@ -7,8 +7,8 @@ using document::Document; namespace proton { -UpdateDoneContext::UpdateDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd) - : OperationDoneContext(std::move(token)), +UpdateDoneContext::UpdateDoneContext(std::shared_ptr<feedtoken::IState> token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd) + : OperationDoneContext(std::move(token), {}), _uncommitted(std::move(uncommitted)), _upd(upd), _doc() diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h index 62448102f30..7dd6c4a61f2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h @@ -24,7 +24,7 @@ class UpdateDoneContext : public OperationDoneContext document::DocumentUpdate::SP _upd; std::shared_future<std::unique_ptr<const document::Document>> _doc; public: - UpdateDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd); + UpdateDoneContext(std::shared_ptr<feedtoken::IState> token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd); ~UpdateDoneContext() override; const document::DocumentUpdate &getUpdate() { return *_upd; } diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp index 95643c7d1a8..0bf5b351fd0 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp @@ -2,6 +2,7 @@ #include "summaryengine.h" #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/log/log.h> LOG_SETUP(".proton.summaryengine.summaryengine"); @@ -10,6 +11,7 @@ using namespace search::engine; using namespace proton; using vespalib::Memory; using vespalib::slime::Inspector; +using vespalib::CpuUsage; namespace { @@ -61,7 +63,7 @@ SummaryEngine::SummaryEngine(size_t numThreads, bool async) _closed(false), _forward_issues(true), _handlers(), - _executor(numThreads, 128_Ki, summary_engine_executor), + _executor(numThreads, 128_Ki, CpuUsage::wrap(summary_engine_executor, CpuUsage::Category::READ)), _metrics(std::make_unique<DocsumMetrics>()) { } @@ -141,6 +143,9 @@ SummaryEngine::getDocsums(DocsumRequest::UP req) } } updateDocsumMetrics(vespalib::to_s(req->getTimeUsed()), getNumDocs(*reply)); + if (req->expired()) { + vespalib::Issue::report("docsum request timed out; results may be incomplete"); + } } if (! reply) { reply = std::make_unique<DocsumReply>(); diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h index 743cd9af8fc..0e2491e62fa 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h @@ -14,7 +14,7 @@ private: std::vector<search::AttributeVector*> _writables; std::unique_ptr<ImportedAttributesRepo> _importedAttributes; vespalib::ISequencedTaskExecutor* _writer; - vespalib::ThreadExecutor* _shared; + vespalib::Executor* _shared; public: MockAttributeManager() @@ -33,7 +33,7 @@ public: void set_writer(vespalib::ISequencedTaskExecutor& writer) { _writer = &writer; } - void set_shared_executor(vespalib::ThreadExecutor& shared) { + void set_shared_executor(vespalib::Executor& shared) { _shared = &shared; } search::AttributeGuard::UP getAttribute(const vespalib::string &name) const override { @@ -72,7 +72,7 @@ public: assert(_writer != nullptr); return *_writer; } - vespalib::ThreadExecutor& get_shared_executor() const override { + vespalib::Executor& get_shared_executor() const override { assert(_shared != nullptr); return *_shared; } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index e93b1632b3f..78a740ec2c3 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -15,7 +15,7 @@ private: SyncableThreadServiceObserver _master; ThreadServiceObserver _index; ThreadExecutorObserver _summary; - vespalib::ThreadExecutor & _shared; + vespalib::Executor & _shared; vespalib::SequencedTaskExecutorObserver _indexFieldInverter; vespalib::SequencedTaskExecutorObserver _indexFieldWriter; vespalib::SequencedTaskExecutorObserver _attributeFieldWriter; @@ -46,7 +46,7 @@ public: vespalib::ThreadExecutor &summary() override { return _summary; } - vespalib::ThreadExecutor &shared() override { + vespalib::Executor &shared() override { return _shared; } vespalib::ISequencedTaskExecutor &indexFieldInverter() override { |