summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahooinc.com>2022-02-17 18:42:02 +0000
committerArne H Juul <arnej@yahooinc.com>2022-02-17 18:42:02 +0000
commit419ca82c28e7ea97ae7f7f1265eca158fae0844c (patch)
tree55ee9be894ad4f862ffeb12f733048451ff8c063 /searchcore
parentc1e701cd70e2531302a6e72d0900772f4296ab2c (diff)
parent70e50ea1b7d974ffb2e1db805e8e273eeffd6d0e (diff)
Merge branch 'master' into arnej/rename-summaryfeatures-back-to-original
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp11
-rw-r--r--searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp2
-rw-r--r--searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp2
-rw-r--r--searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp28
-rw-r--r--searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp6
-rw-r--r--searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp1
-rw-r--r--searchcore/src/tests/proton/documentdb/documentdb_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp13
-rw-r--r--searchcore/src/tests/proton/index/diskindexcleaner_test.cpp32
-rw-r--r--searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp14
-rw-r--r--searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp2
-rw-r--r--searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp8
-rw-r--r--searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp20
-rw-r--r--searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp40
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp5
-rw-r--r--searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h3
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def14
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h26
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h24
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp43
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp37
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h26
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/extract_features.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h34
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h12
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp40
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h26
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h28
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp39
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp59
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp71
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h4
100 files changed, 801 insertions, 414 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 214c57557bc..483cc3f2792 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -39,6 +39,7 @@
#include <vespa/searchsummary/config/config-juniperrc.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/config/subscription/sourcespec.h>
#include <vespa/log/log.h>
LOG_SETUP("persistenceconformance_test");
@@ -178,6 +179,12 @@ private:
MockSharedThreadingService _shared_service;
storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
+ static std::shared_ptr<ProtonConfig> make_proton_config() {
+ ProtonConfigBuilder proton_config;
+ proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY;
+ return std::make_shared<ProtonConfig>(proton_config);
+ }
+
public:
DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort);
~DocumentDBFactory() override;
@@ -193,10 +200,10 @@ public:
fileCfg.saveConfig(*snapshot, 1);
}
config::DirSpec spec(inputCfg + "/config-1");
- TuneFileDocumentDB::SP tuneFileDocDB(new TuneFileDocumentDB());
+ auto tuneFileDocDB = std::make_shared<TuneFileDocumentDB>();
DocumentDBConfigHelper mgr(spec, docType.getName());
auto b = std::make_shared<BootstrapConfig>(1, factory.getTypeCfg(), factory.getTypeRepo(),
- std::make_shared<ProtonConfig>(),
+ make_proton_config(),
std::make_shared<FiledistributorrpcConfig>(),
std::make_shared<BucketspacesConfig>(),
tuneFileDocDB, HwInfo());
diff --git a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp
index a52c9ec2fb6..5ec1794c1b0 100644
--- a/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp
+++ b/searchcore/src/apps/verify_ranksetup/verify_ranksetup.cpp
@@ -4,7 +4,6 @@
#include <vespa/config-attributes.h>
#include <vespa/config-indexschema.h>
#include <vespa/config-rank-profiles.h>
-#include <vespa/config/config.h>
#include <vespa/config/helper/legacy.h>
#include <vespa/config/common/configcontext.h>
#include <vespa/config/common/exceptions.h>
@@ -21,6 +20,7 @@
#include <vespa/searchlib/features/setup.h>
#include <vespa/searchlib/fef/fef.h>
#include <vespa/searchlib/fef/test/plugin/setup.h>
+#include <vespa/config/subscription/configsubscriber.hpp>
#include <vespa/fastos/app.h>
#include <optional>
diff --git a/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp b/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp
index 9819a1d50af..9f6254f9baa 100644
--- a/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp
+++ b/searchcore/src/apps/vespa-dump-feed/vespa-dump-feed.cpp
@@ -1,13 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/config/config.h>
#include <vespa/config/print/fileconfigwriter.h>
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/document.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/documentapi/documentapi.h>
#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/protocolset.h>
#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/vespalib/io/fileutil.h>
diff --git a/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp b/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp
index 6e8223399e6..c5c50ff596d 100644
--- a/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp
+++ b/searchcore/src/apps/vespa-gen-testdocs/vespa-gen-testdocs.cpp
@@ -12,13 +12,13 @@
#include <openssl/evp.h>
#include <cassert>
#include <getopt.h>
+#include <vector>
#include <vespa/log/log.h>
LOG_SETUP("vespa-gen-testdocs");
typedef vespalib::hash_set<vespalib::string> StringSet;
typedef std::vector<vespalib::string> StringArray;
-typedef std::shared_ptr<StringArray> StringArraySP;
using namespace vespalib::alloc;
using vespalib::string;
@@ -38,10 +38,7 @@ void
usageHeader()
{
using std::cerr;
- cerr <<
- "vespa-gen-testdocs version 0.0\n"
- "\n"
- "USAGE:\n";
+ cerr << "vespa-gen-testdocs version 0.0\n\nUSAGE:\n";
}
string
@@ -71,8 +68,7 @@ splitArg(const string &arg)
}
void
-shafile(const string &baseDir,
- const string &file)
+shafile(const string &baseDir, const string &file)
{
unsigned char digest[EVP_MAX_MD_SIZE];
unsigned int digest_len = 0;
@@ -98,7 +94,6 @@ shafile(const string &baseDir,
EVP_DigestUpdate(md_ctx.get(), buf.get(), thistime);
remainder -= thistime;
}
- f.Close();
EVP_DigestFinal_ex(md_ctx.get(), &digest[0], &digest_len);
assert(digest_len > 0u && digest_len <= EVP_MAX_MD_SIZE);
for (unsigned int i = 0; i < digest_len; ++i) {
@@ -106,10 +101,7 @@ shafile(const string &baseDir,
os.fill('0');
os << std::hex << static_cast<unsigned int>(digest[i]);
}
- LOG(info,
- "SHA256(%s)= %s",
- file.c_str(),
- os.str().c_str());
+ LOG(info, "SHA256(%s)= %s", file.c_str(), os.str().c_str());
}
class StringGenerator
@@ -119,14 +111,9 @@ class StringGenerator
public:
StringGenerator(vespalib::Rand48 &rnd);
- void
- rand_string(string &res, uint32_t minLen, uint32_t maxLen);
+ void rand_string(string &res, uint32_t minLen, uint32_t maxLen);
- void
- rand_unique_array(StringArray &res,
- uint32_t minLen,
- uint32_t maxLen,
- uint32_t size);
+ void rand_unique_array(StringArray &res, uint32_t minLen, uint32_t maxLen, uint32_t size);
};
@@ -590,7 +577,8 @@ DocumentGenerator::generate(uint32_t docMin, uint32_t docIdLimit,
}
}
f.Flush();
- f.Close();
+ bool close_ok = f.Close();
+ assert(close_ok);
LOG(info, "Calculating sha256 for %s", feedFileName.c_str());
shafile(baseDir, feedFileName);
}
diff --git a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
index c1dbe9b2bd2..beb1e0bd6bc 100644
--- a/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
+++ b/searchcore/src/apps/vespa-redistribute-bm/vespa_redistribute_bm.cpp
@@ -465,7 +465,6 @@ App::usage()
"USAGE:\n";
std::cerr <<
"vespa-redistribute-bm\n"
- "[--async-apply-bucket-diff]\n"
"[--bucket-db-stripe-bits bits]\n"
"[--client-threads threads]\n"
"[--distributor-merge-busy-wait distributor-merge-busy-wait]\n"
@@ -502,7 +501,6 @@ App::get_options()
const char *opt_argument = nullptr;
int long_opt_index = 0;
static struct option long_opts[] = {
- { "async-apply-bucket-diff", 0, nullptr, 0 },
{ "bucket-db-stripe-bits", 1, nullptr, 0 },
{ "client-threads", 1, nullptr, 0 },
{ "distributor-merge-busy-wait", 1, nullptr, 0 },
@@ -533,7 +531,6 @@ App::get_options()
{ nullptr, 0, nullptr, 0 }
};
enum longopts_enum {
- LONGOPT_ASYNC_APPLY_BUCKET_DIFF,
LONGOPT_BUCKET_DB_STRIPE_BITS,
LONGOPT_CLIENT_THREADS,
LONGOPT_DISTRIBUTOR_MERGE_BUSY_WAIT,
@@ -568,9 +565,6 @@ App::get_options()
switch (c) {
case 0:
switch(long_opt_index) {
- case LONGOPT_ASYNC_APPLY_BUCKET_DIFF:
- _bm_params.set_async_apply_bucket_diff(true);
- break;
case LONGOPT_BUCKET_DB_STRIPE_BITS:
_bm_params.set_bucket_db_stripe_bits(atoi(opt_argument));
break;
diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
index 1851455e321..b9e3549053a 100644
--- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp
@@ -35,6 +35,7 @@
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/config/subscription/sourcespec.h>
using namespace cloud::config::filedistribution;
using namespace document;
diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
index 77f7cf4d8ed..7ba3e0b8240 100644
--- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
@@ -6,7 +6,6 @@
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/fastos/file.h>
#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcore/proton/attribute/flushableattribute.h>
#include <vespa/searchcore/proton/common/statusreport.h>
@@ -35,6 +34,7 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/config/subscription/sourcespec.h>
#include <iostream>
using namespace cloud::config::filedistribution;
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
index a59b3e9bc6f..fc8bd474813 100644
--- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -14,11 +14,11 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
struct Fixture {
ProtonConfig cfg;
- Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500)
+ Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500)
: cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit))
{
}
- ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) {
+ ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) {
ProtonConfigBuilder builder;
builder.indexing.threads = baseLineIndexingThreads;
builder.indexing.tasklimit = task_limit;
@@ -56,6 +56,15 @@ TEST_F("require that task limits are set", Fixture)
auto tcfg = f.make(24);
EXPECT_EQUAL(2000u, tcfg.master_task_limit());
EXPECT_EQUAL(500u, tcfg.defaultTaskLimit());
+ EXPECT_TRUE(tcfg.is_task_limit_hard());
+}
+
+TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700))
+{
+ auto tcfg = f.make(24);
+ EXPECT_EQUAL(3000u, tcfg.master_task_limit());
+ EXPECT_EQUAL(700u, tcfg.defaultTaskLimit());
+ EXPECT_FALSE(tcfg.is_task_limit_hard());
}
namespace {
diff --git a/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp b/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp
index 6eab2c5bf3d..c814bdf3f37 100644
--- a/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp
+++ b/searchcore/src/tests/proton/index/diskindexcleaner_test.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
// Unit tests for diskindexcleaner.
-#include <vespa/searchcorespi/index/activediskindexes.h>
+#include <vespa/searchcorespi/index/disk_indexes.h>
#include <vespa/searchcorespi/index/diskindexcleaner.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/fastos/file.h>
@@ -90,8 +90,8 @@ void createIndexes() {
void Test::requireThatAllIndexesOlderThanLastFusionIsRemoved() {
createIndexes();
- ActiveDiskIndexes active_indexes;
- DiskIndexCleaner::clean(index_dir, active_indexes);
+ DiskIndexes disk_indexes;
+ DiskIndexCleaner::clean(index_dir, disk_indexes);
vector<string> indexes = readIndexes();
EXPECT_EQUAL(3u, indexes.size());
EXPECT_TRUE(contains(indexes, "index.fusion.2"));
@@ -101,17 +101,17 @@ void Test::requireThatAllIndexesOlderThanLastFusionIsRemoved() {
void Test::requireThatIndexesInUseAreNotRemoved() {
createIndexes();
- ActiveDiskIndexes active_indexes;
- active_indexes.setActive(index_dir + "/index.fusion.1");
- active_indexes.setActive(index_dir + "/index.flush.2");
- DiskIndexCleaner::clean(index_dir, active_indexes);
+ DiskIndexes disk_indexes;
+ disk_indexes.setActive(index_dir + "/index.fusion.1", 0);
+ disk_indexes.setActive(index_dir + "/index.flush.2", 0);
+ DiskIndexCleaner::clean(index_dir, disk_indexes);
vector<string> indexes = readIndexes();
EXPECT_TRUE(contains(indexes, "index.fusion.1"));
EXPECT_TRUE(contains(indexes, "index.flush.2"));
- active_indexes.notActive(index_dir + "/index.fusion.1");
- active_indexes.notActive(index_dir + "/index.flush.2");
- DiskIndexCleaner::clean(index_dir, active_indexes);
+ disk_indexes.notActive(index_dir + "/index.fusion.1");
+ disk_indexes.notActive(index_dir + "/index.flush.2");
+ DiskIndexCleaner::clean(index_dir, disk_indexes);
indexes = readIndexes();
EXPECT_TRUE(!contains(indexes, "index.fusion.1"));
EXPECT_TRUE(!contains(indexes, "index.flush.2"));
@@ -120,8 +120,8 @@ void Test::requireThatIndexesInUseAreNotRemoved() {
void Test::requireThatInvalidFlushIndexesAreRemoved() {
createIndexes();
FastOS_File((index_dir + "/index.flush.4/serial.dat").c_str()).Delete();
- ActiveDiskIndexes active_indexes;
- DiskIndexCleaner::clean(index_dir, active_indexes);
+ DiskIndexes disk_indexes;
+ DiskIndexCleaner::clean(index_dir, disk_indexes);
vector<string> indexes = readIndexes();
EXPECT_EQUAL(2u, indexes.size());
EXPECT_TRUE(contains(indexes, "index.fusion.2"));
@@ -131,8 +131,8 @@ void Test::requireThatInvalidFlushIndexesAreRemoved() {
void Test::requireThatInvalidFusionIndexesAreRemoved() {
createIndexes();
FastOS_File((index_dir + "/index.fusion.2/serial.dat").c_str()).Delete();
- ActiveDiskIndexes active_indexes;
- DiskIndexCleaner::clean(index_dir, active_indexes);
+ DiskIndexes disk_indexes;
+ DiskIndexCleaner::clean(index_dir, disk_indexes);
vector<string> indexes = readIndexes();
EXPECT_EQUAL(4u, indexes.size());
EXPECT_TRUE(contains(indexes, "index.fusion.1"));
@@ -144,8 +144,8 @@ void Test::requireThatInvalidFusionIndexesAreRemoved() {
void Test::requireThatRemoveDontTouchNewIndexes() {
createIndexes();
FastOS_File((index_dir + "/index.flush.4/serial.dat").c_str()).Delete();
- ActiveDiskIndexes active_indexes;
- DiskIndexCleaner::removeOldIndexes(index_dir, active_indexes);
+ DiskIndexes disk_indexes;
+ DiskIndexCleaner::removeOldIndexes(index_dir, disk_indexes);
vector<string> indexes = readIndexes();
EXPECT_EQUAL(3u, indexes.size());
EXPECT_TRUE(contains(indexes, "index.fusion.2"));
diff --git a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp
index 313f3e9a270..c0d94ba6376 100644
--- a/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/resource_usage_tracker/resource_usage_tracker_test.cpp
@@ -57,9 +57,10 @@ public:
~ResourceUsageTrackerTest();
- void notify(double disk_usage, double memory_usage)
+ void notify(double disk_usage, double memory_usage, double transient_disk_usage = 0.0, double transient_memory_usage = 0.0)
{
- _notifier.notify(DiskMemUsageState({ 0.8, disk_usage }, { 0.8, memory_usage }));
+ _notifier.notify(DiskMemUsageState({ 0.8, disk_usage }, { 0.8, memory_usage },
+ transient_disk_usage, transient_memory_usage));
}
ResourceUsage get_usage() { return _listener->get_usage(); }
@@ -77,6 +78,15 @@ TEST_F(ResourceUsageTrackerTest, resource_usage_is_forwarded_to_listener)
EXPECT_EQ(ResourceUsage(0.75, 0.25), get_usage());
}
+TEST_F(ResourceUsageTrackerTest, transient_resource_usage_is_subtracted_from_absolute_usage)
+{
+ auto register_guard = _tracker->set_listener(*_listener);
+ notify(0.8, 0.5, 0.4, 0.2);
+ EXPECT_EQ(ResourceUsage(0.4, 0.3), get_usage());
+ notify(0.8, 0.5, 0.9, 0.6);
+ EXPECT_EQ(ResourceUsage(0.0, 0.0), get_usage());
+}
+
TEST_F(ResourceUsageTrackerTest, forwarding_depends_on_register_guard)
{
auto register_guard = _tracker->set_listener(*_listener);
diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp
index 6d3eaa30263..3c9e9fc3a64 100644
--- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp
+++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp
@@ -69,7 +69,7 @@ struct ConfigTestFixture {
BucketspacesConfigBuilder bucketspacesBuilder;
map<std::string, DoctypeFixture::UP> dbConfig;
ConfigSet set;
- IConfigContext::SP context;
+ std::shared_ptr<IConfigContext> context;
int idcounter;
ConfigTestFixture(const std::string & id)
diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
index aa819d08b58..85f8e8171a8 100644
--- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
+++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
@@ -7,12 +7,12 @@
#include <vespa/config-rank-profiles.h>
#include <vespa/config-summary.h>
#include <vespa/config-summarymap.h>
+#include <vespa/config-bucketspaces.h>
#include <vespa/document/config/documenttypes_config_fwd.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/fileacquirer/config-filedistributorrpc.h>
#include <vespa/searchcore/proton/common/alloc_config.h>
#include <vespa/searchcore/proton/server/bootstrapconfig.h>
-#include <vespa/searchcore/proton/server/bootstrapconfigmanager.h>
#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
#include <vespa/searchcore/proton/server/document_db_config_owner.h>
#include <vespa/searchcore/proton/server/proton_config_snapshot.h>
@@ -21,14 +21,12 @@
#include <vespa/searchcore/proton/server/i_proton_disk_layout.h>
#include <vespa/searchcore/proton/server/threading_service_config.h>
#include <vespa/searchsummary/config/config-juniperrc.h>
-#include <vespa/searchcore/config/config-ranking-constants.h>
-#include <vespa/searchcore/config/config-onnx-models.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/searchcommon/common/schemaconfigurer.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/test/insertion_operators.h>
-#include <vespa/config-bucketspaces.h>
+#include <vespa/config/subscription/configuri.h>
+#include <vespa/vespalib/gtest/gtest.h>
using namespace config;
using namespace proton;
diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp
index ce85517ee09..db32c1e77c4 100644
--- a/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp
+++ b/searchcore/src/tests/proton/server/disk_mem_usage_filter/disk_mem_usage_filter_test.cpp
@@ -121,18 +121,22 @@ TEST_F(DiskMemUsageFilterTest, both_disk_limit_and_memory_limit_can_be_reached)
"capacity: 100, used: 90, diskUsed: 0.9, diskLimit: 0.8}}");
}
-TEST_F(DiskMemUsageFilterTest, transient_disk_usage_is_tracked_in_usage_state_and_metrics)
+TEST_F(DiskMemUsageFilterTest, transient_and_non_transient_disk_usage_tracked_in_usage_state_and_metrics)
{
- _filter.set_transient_resource_usage({40, 0});
- EXPECT_EQ(0.4, _filter.usageState().transient_disk_usage());
- EXPECT_EQ(0.4, _filter.get_metrics().get_transient_disk_usage());
+ _filter.set_transient_resource_usage({15, 0});
+ EXPECT_DOUBLE_EQ(0.15, _filter.usageState().transient_disk_usage());
+ EXPECT_DOUBLE_EQ(0.15, _filter.get_metrics().transient_disk_usage());
+ EXPECT_DOUBLE_EQ(0.05, _filter.usageState().non_transient_disk_usage());
+ EXPECT_DOUBLE_EQ(0.05, _filter.get_metrics().non_transient_disk_usage());
}
-TEST_F(DiskMemUsageFilterTest, transient_memory_usage_is_tracked_in_usage_state_and_metrics)
+TEST_F(DiskMemUsageFilterTest, transient_and_non_transient_memory_usage_tracked_in_usage_state_and_metrics)
{
- _filter.set_transient_resource_usage({0, 200});
- EXPECT_EQ(0.2, _filter.usageState().transient_memory_usage());
- EXPECT_EQ(0.2, _filter.get_metrics().get_transient_memory_usage());
+ _filter.set_transient_resource_usage({0, 100});
+ EXPECT_DOUBLE_EQ(0.1, _filter.usageState().transient_memory_usage());
+ EXPECT_DOUBLE_EQ(0.1, _filter.get_metrics().transient_memory_usage());
+ EXPECT_DOUBLE_EQ(0.2, _filter.usageState().non_transient_memory_usage());
+ EXPECT_DOUBLE_EQ(0.2, _filter.get_metrics().non_transient_memory_usage());
}
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp
index 60240e06fc4..8ba02875dc0 100644
--- a/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp
+++ b/searchcore/src/tests/proton/server/disk_mem_usage_metrics/disk_mem_usage_metrics_test.cpp
@@ -10,30 +10,46 @@ using proton::DiskMemUsageState;
using proton::ResourceUsageState;
bool
-expect_metrics(double disk_usage, double disk_utilization, double memory_usage, double memory_utilization, const DiskMemUsageMetrics &dm_metrics)
+expect_metrics(double disk_usage, double disk_utilization, double transient_disk, double non_transient_disk,
+ double memory_usage, double memory_utilization, double transient_memory, double non_transient_memory,
+ const DiskMemUsageMetrics &dm_metrics)
{
bool result = true;
- EXPECT_DOUBLE_EQ(disk_usage, dm_metrics.get_disk_usage()) << (result = false, "");
- EXPECT_DOUBLE_EQ(disk_utilization, dm_metrics.get_disk_utilization()) << (result = false, "");
- EXPECT_DOUBLE_EQ(memory_usage, dm_metrics.get_memory_usage()) << (result = false, "");
- EXPECT_DOUBLE_EQ(memory_utilization, dm_metrics.get_memory_utilization()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(disk_usage, dm_metrics.total_disk_usage()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(disk_utilization, dm_metrics.total_disk_utilization()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(transient_disk, dm_metrics.transient_disk_usage()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(non_transient_disk, dm_metrics.non_transient_disk_usage()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(memory_usage, dm_metrics.total_memory_usage()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(memory_utilization, dm_metrics.total_memory_utilization()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(transient_memory, dm_metrics.transient_memory_usage()) << (result = false, "");
+ EXPECT_DOUBLE_EQ(non_transient_memory, dm_metrics.non_transient_memory_usage()) << (result = false, "");
return result;
}
TEST(DiskMemUsageMetricsTest, default_value_is_zero)
{
DiskMemUsageMetrics dm_metrics;
- EXPECT_TRUE(expect_metrics(0.0, 0.0, 0.0, 0.0, dm_metrics));
+ EXPECT_TRUE(expect_metrics(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, dm_metrics));
}
TEST(DiskMemUsageMetricsTest, merging_uses_max)
{
- DiskMemUsageMetrics dm_metrics({ResourceUsageState(0.5, 0.4), ResourceUsageState(0.5, 0.3)});
- EXPECT_TRUE(expect_metrics(0.4, 0.8, 0.3, 0.6, dm_metrics));
- dm_metrics.merge({ResourceUsageState(0.4, 0.4), ResourceUsageState(0.5, 0.4)});
- EXPECT_TRUE(expect_metrics(0.4, 1.0, 0.4, 0.8, dm_metrics));
- dm_metrics.merge({ResourceUsageState(0.5, 0.4), ResourceUsageState(0.5, 0.3)});
- EXPECT_TRUE(expect_metrics(0.4, 1.0, 0.4, 0.8, dm_metrics));
+ DiskMemUsageMetrics dm_metrics({ResourceUsageState(0.5, 0.4),
+ ResourceUsageState(0.5, 0.3), 0.1, 0.05});
+ EXPECT_TRUE(expect_metrics(0.4, 0.8, 0.1, 0.3,
+ 0.3, 0.6, 0.05, 0.25, dm_metrics));
+ dm_metrics.merge({ResourceUsageState(0.4, 0.4),
+ ResourceUsageState(0.3, 0.3), 0.1, 0.05});
+ EXPECT_TRUE(expect_metrics(0.4, 1.0, 0.1, 0.3,
+ 0.3, 1.0, 0.05, 0.25, dm_metrics));
+ dm_metrics.merge({ResourceUsageState(0.5, 0.45),
+ ResourceUsageState(0.5, 0.35), 0.1, 0.05});
+ EXPECT_TRUE(expect_metrics(0.45, 1.0, 0.1, 0.35,
+ 0.35, 1.0, 0.05, 0.3, dm_metrics));
+ dm_metrics.merge({ResourceUsageState(0.5, 0.4),
+ ResourceUsageState(0.5, 0.3), 0.15, 0.1});
+ EXPECT_TRUE(expect_metrics(0.45, 1.0, 0.15, 0.35,
+ 0.35, 1.0, 0.10, 0.3, dm_metrics));
}
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index bb0c4eeb282..e01d90f5eed 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -8,6 +8,7 @@
#include <vespa/searchcore/proton/server/feedstates.h>
#include <vespa/searchcore/proton/server/ireplayconfig.h>
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
+#include <vespa/searchcore/proton/server/replay_throttling_policy.h>
#include <vespa/searchcore/proton/feedoperation/removeoperation.h>
#include <vespa/searchcore/proton/test/dummy_feed_view.h>
#include <vespa/searchlib/common/serialnum.h>
@@ -71,6 +72,7 @@ struct Fixture
MemoryConfigStore config_store;
bucketdb::BucketDBOwner _bucketDB;
bucketdb::BucketDBHandler _bucketDBHandler;
+ ReplayThrottlingPolicy _replay_throttling_policy;
MyIncSerialNum _inc_serial_num;
ReplayTransactionLogState state;
@@ -86,8 +88,9 @@ Fixture::Fixture()
config_store(),
_bucketDB(),
_bucketDBHandler(_bucketDB),
+ _replay_throttling_policy({}),
_inc_serial_num(9u),
- state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _inc_serial_num)
+ state("doctypename", feed_view_ptr, _bucketDBHandler, replay_config, config_store, _replay_throttling_policy, _inc_serial_num)
{
}
Fixture::~Fixture() = default;
diff --git a/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp b/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp
index 7ff59a9f41a..d544f1cb1ab 100644
--- a/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp
+++ b/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp
@@ -11,6 +11,7 @@ ProtonConfig::Flush::Memory
getConfig(int64_t maxMemory, int64_t eachMaxMemory, int64_t maxTlsSize,
double conservativeMemoryLimitFactor = 0.5,
double conservativeDiskLimitFactor = 0.6,
+ double high_watermark_factor = 0.9,
double lowWatermarkFactor = 0.8)
{
ProtonConfig::Flush::Memory result;
@@ -19,6 +20,7 @@ getConfig(int64_t maxMemory, int64_t eachMaxMemory, int64_t maxTlsSize,
result.maxtlssize = maxTlsSize;
result.conservative.memorylimitfactor = conservativeMemoryLimitFactor;
result.conservative.disklimitfactor = conservativeDiskLimitFactor;
+ result.conservative.highwatermarkfactor = high_watermark_factor;
result.conservative.lowwatermarkfactor = lowWatermarkFactor;
return result;
}
@@ -32,14 +34,16 @@ getDefaultConfig()
ResourceUsageState
aboveLimit()
{
- return ResourceUsageState(0.7, 0.8);
+ // The high watermark limit is 0.63 (0.7 * 0.9 (factor)).
+ return ResourceUsageState(0.7, 0.64);
}
ResourceUsageState
belowLimit()
{
- // This is still over default low watermark of 0.56 (0.7 * 0.8)
- return ResourceUsageState(0.7, 0.6);
+ // The high watermark limit is 0.63 (0.7 * 0.9 (factor)).
+ // This is still over the low watermark limit of 0.56 (0.7 * 0.8 (factor)).
+ return ResourceUsageState(0.7, 0.62);
}
const HwInfo::Memory defaultMemory(8_Gi);
@@ -175,6 +179,24 @@ TEST_F("require that last config if remembered when setting new disk/memory usag
TEST_DO(f.assertStrategyConfig(6, 3, 18));
}
+TEST_F("Use conservative settings when above high watermark for disk usage", Fixture)
+{
+ // The high watermark limit is 0.63 (0.7 * 0.9 (factor)).
+ f.notifyDiskMemUsage(ResourceUsageState(0.7, 0.62), belowLimit());
+ TEST_DO(f.assertStrategyConfig(4, 1, 20));
+ f.notifyDiskMemUsage(ResourceUsageState(0.7, 0.64), belowLimit());
+ TEST_DO(f.assertStrategyConfig(4, 1, 12));
+}
+
+TEST_F("Use conservative settings when above high watermark for memory usage", Fixture)
+{
+ // The high watermark limit is 0.54 (0.6 * 0.9 (factor)).
+ f.notifyDiskMemUsage(belowLimit(), ResourceUsageState(0.6, 0.53));
+ TEST_DO(f.assertStrategyConfig(4, 1, 20));
+ f.notifyDiskMemUsage(belowLimit(), ResourceUsageState(0.6, 0.55));
+ TEST_DO(f.assertStrategyConfig(2, 0, 20));
+}
+
TEST_F("require that we must go below low watermark for disk usage before using normal tls size value again", Fixture)
{
f.notifyDiskMemUsage(ResourceUsageState(0.7, 0.8), belowLimit());
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
index 4a3466f1a51..3ff10b19164 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.cpp
@@ -6,8 +6,7 @@
namespace search::bmcluster {
BmClusterParams::BmClusterParams()
- : _async_apply_bucket_diff(),
- _bucket_db_stripe_bits(4),
+ : _bucket_db_stripe_bits(4),
_disable_queue_limits_for_chained_merges(false), // Same default as in stor-server.def
_distributor_merge_busy_wait(10), // Same default as stor_distributormanager.def
_distributor_stripes(0),
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
index 36b4c22f6a8..d365a28b0b6 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_params.h
@@ -13,7 +13,6 @@ namespace search::bmcluster {
*/
class BmClusterParams
{
- std::optional<bool> _async_apply_bucket_diff;
uint32_t _bucket_db_stripe_bits;
bool _disable_queue_limits_for_chained_merges;
uint32_t _distributor_merge_busy_wait;
@@ -45,7 +44,6 @@ class BmClusterParams
public:
BmClusterParams();
~BmClusterParams();
- const std::optional<bool>& get_async_apply_bucket_diff() const noexcept { return _async_apply_bucket_diff; }
uint32_t get_bucket_db_stripe_bits() const { return _bucket_db_stripe_bits; }
bool get_disable_queue_limits_for_chained_merges() const noexcept { return _disable_queue_limits_for_chained_merges; }
uint32_t get_distributor_merge_busy_wait() const { return _distributor_merge_busy_wait; }
@@ -75,7 +73,6 @@ public:
bool needs_distributor() const { return _enable_distributor || _use_document_api; }
bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
- void set_async_apply_bucket_diff(bool value) { _async_apply_bucket_diff = value; }
void set_bucket_db_stripe_bits(uint32_t value) { _bucket_db_stripe_bits = value; }
void set_disable_queue_limits_for_chained_merges(bool value) { _disable_queue_limits_for_chained_merges = value; }
void set_distributor_merge_busy_wait(uint32_t value) { _distributor_merge_busy_wait = value; }
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index db2060bacf7..3587e8008f2 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -382,9 +382,6 @@ struct ServiceLayerConfigSet : public StorageConfigSet
stor_bucket_init(),
stor_visitor()
{
- if (params.get_async_apply_bucket_diff().has_value()) {
- stor_filestor.asyncApplyBucketDiff = params.get_async_apply_bucket_diff().value();
- }
stor_filestor.numResponseThreads = params.get_response_threads();
stor_filestor.numNetworkThreads = params.get_rpc_network_threads();
stor_filestor.useAsyncMessageHandlingOnSchedule = params.get_use_async_message_handling_on_schedule();
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 34530afe9f9..808535924f1 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -81,6 +81,10 @@ flush.memory.conservative.memorylimitfactor double default=0.5
## In this case this factor is multiplied with 'maxtlssize' to calculate a conservative value to use instead.
flush.memory.conservative.disklimitfactor double default=0.5
+## The factor used to multiply with the resource limits for disk / memory to find the high
+## watermark indicating when to from normal into conservative mode for the flush strategy.
+flush.memory.conservative.highwatermarkfactor double default=0.95
+
## The factor used to multiply with the resource limits for disk / memory to find the low
## watermark indicating when to go back from conservative to normal mode for the flush strategy.
flush.memory.conservative.lowwatermarkfactor double default=0.9
@@ -442,7 +446,7 @@ initialize.threads int default = 0
## Portion of max address space used in components in attribute vectors
## before put and update operations in feed is blocked.
-writefilter.attribute.address_space_limit double default = 0.9
+writefilter.attribute.address_space_limit double default = 0.92
## Portion of physical memory that can be resident memory in anonymous mapping
## by the proton process before put and update portion of feed is blocked.
@@ -543,3 +547,11 @@ tensor_implementation enum {TENSOR_ENGINE, FAST_VALUE} default = FAST_VALUE
## Whether to report issues back to the container via protobuf field
forward_issues bool default = true
+
+## Chooses the throttling policy used to control the window size
+## of the SharedOperationThrottler component used by the transaction log replay feed state.
+replay_throttling_policy.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED
+## Only used if replay_throttling_policy.type == DYNAMIC:
+replay_throttling_policy.min_window_size int default=100
+replay_throttling_policy.max_window_size int default=10000
+replay_throttling_policy.window_size_increment int default=20
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp
index 66be0737fe9..713582bf1ce 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp
@@ -193,7 +193,7 @@ AttributeInitializer::loadAttribute(const AttributeVectorSP &attr,
assert(attr->hasLoadData());
vespalib::Timer timer;
EventLogger::loadAttributeStart(_documentSubDbName, attr->getName());
- if (!attr->load(&_executor)) {
+ if (!attr->load(&_shared_executor)) {
LOG(warning, "Could not load attribute vector '%s' from disk. Returning empty attribute vector",
attr->getBaseFileName().c_str());
return false;
@@ -235,13 +235,13 @@ AttributeInitializer::AttributeInitializer(const std::shared_ptr<AttributeDirect
const AttributeSpec &spec,
uint64_t currentSerialNum,
const IAttributeFactory &factory,
- vespalib::Executor & executor)
+ vespalib::Executor& shared_executor)
: _attrDir(attrDir),
_documentSubDbName(documentSubDbName),
_spec(spec),
_currentSerialNum(currentSerialNum),
_factory(factory),
- _executor(executor),
+ _shared_executor(shared_executor),
_header(),
_header_ok(false)
{
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h
index 7b5d968353a..78a798c929e 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h
@@ -30,7 +30,7 @@ private:
const AttributeSpec _spec;
const uint64_t _currentSerialNum;
const IAttributeFactory &_factory;
- vespalib::Executor &_executor;
+ vespalib::Executor &_shared_executor;
std::unique_ptr<const search::attribute::AttributeHeader> _header;
bool _header_ok;
@@ -48,7 +48,7 @@ private:
public:
AttributeInitializer(const std::shared_ptr<AttributeDirectory> &attrDir, const vespalib::string &documentSubDbName,
const AttributeSpec &spec, uint64_t currentSerialNum, const IAttributeFactory &factory,
- vespalib::Executor & executor);
+ vespalib::Executor& shared_executor);
~AttributeInitializer();
AttributeInitializerResult init() const;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 52b367fd14b..f0a23f1038e 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -14,10 +14,10 @@
#include <vespa/searchlib/attribute/imported_attribute_vector.h>
#include <vespa/searchlib/tensor/prepare_result.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/idestructorcallback.h>
-#include <vespa/vespalib/util/threadexecutor.h>
#include <future>
#include <vespa/log/log.h>
@@ -29,6 +29,7 @@ using namespace search;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
using search::attribute::ImportedAttributeVector;
using search::tensor::PrepareResult;
+using vespalib::CpuUsage;
using vespalib::GateCallback;
using vespalib::ISequencedTaskExecutor;
@@ -631,7 +632,7 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI
wc.consider_build_field_paths(doc);
auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc, doc);
auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone);
- _shared_executor.execute(std::move(prepare_task));
+ _shared_executor.execute(CpuUsage::wrap(std::move(prepare_task), CpuUsage::Category::WRITE));
_attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task));
} else {
if (allAttributes || wc.hasStructFieldAttribute()) {
@@ -781,7 +782,7 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document
auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone);
LOG(debug, "About to handle assign update as two phase put for docid %u in attribute vector '%s'",
lid, attrp->getName().c_str());
- _shared_executor.execute(std::move(prepare_task));
+ _shared_executor.execute(CpuUsage::wrap(std::move(prepare_task), CpuUsage::Category::WRITE));
_attributeFieldWriter.executeTask(itr->second.executor_id, std::move(complete_task));
} else {
args[itr->second.executor_id.getId()]->_updates.emplace_back(attrp, &fupd);
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
index f43aab0f385..a5907233c4b 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.h
@@ -25,7 +25,7 @@ private:
using FieldValue = document::FieldValue;
const IAttributeManager::SP _mgr;
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
- vespalib::ThreadExecutor& _shared_executor;
+ vespalib::Executor& _shared_executor;
using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId;
public:
/**
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
index eee6264b9f4..ef7e422c722 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp
@@ -240,7 +240,7 @@ AttributeManager::AttributeManager(const vespalib::string &baseDir,
const TuneFileAttributes &tuneFileAttributes,
const FileHeaderContext &fileHeaderContext,
vespalib::ISequencedTaskExecutor &attributeFieldWriter,
- vespalib::ThreadExecutor& shared_executor,
+ vespalib::Executor& shared_executor,
const HwInfo &hwInfo)
: proton::IAttributeManager(),
_attributes(),
@@ -264,7 +264,7 @@ AttributeManager::AttributeManager(const vespalib::string &baseDir,
const search::TuneFileAttributes &tuneFileAttributes,
const search::common::FileHeaderContext &fileHeaderContext,
vespalib::ISequencedTaskExecutor &attributeFieldWriter,
- vespalib::ThreadExecutor& shared_executor,
+ vespalib::Executor& shared_executor,
const IAttributeFactory::SP &factory,
const HwInfo &hwInfo)
: proton::IAttributeManager(),
@@ -558,12 +558,6 @@ AttributeManager::getAttributeFieldWriter() const
return _attributeFieldWriter;
}
-vespalib::ThreadExecutor&
-AttributeManager::get_shared_executor() const
-{
- return _shared_executor;
-}
-
AttributeVector *
AttributeManager::getWritableAttribute(const vespalib::string &name) const
{
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h
index 08e2d511d70..beea59b5350 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h
@@ -31,14 +31,14 @@ class ShrinkLidSpaceFlushTarget;
class AttributeManager : public proton::IAttributeManager
{
private:
- typedef search::attribute::Config Config;
- typedef search::SerialNum SerialNum;
- typedef AttributeCollectionSpec Spec;
+ using AttributeReadGuard = search::attribute::AttributeReadGuard;
+ using AttributeVectorSP = std::shared_ptr<search::AttributeVector>;
+ using Config = search::attribute::Config;
using FlushableAttributeSP = std::shared_ptr<FlushableAttribute>;
- using ShrinkerSP = std::shared_ptr<ShrinkLidSpaceFlushTarget>;
using IFlushTargetSP = std::shared_ptr<searchcorespi::IFlushTarget>;
- using AttributeVectorSP = std::shared_ptr<search::AttributeVector>;
- using AttributeReadGuard = search::attribute::AttributeReadGuard;
+ using SerialNum = search::SerialNum;
+ using ShrinkerSP = std::shared_ptr<ShrinkLidSpaceFlushTarget>;
+ using Spec = AttributeCollectionSpec;
class AttributeWrap
{
@@ -67,8 +67,8 @@ private:
const ShrinkerSP &getShrinker() const { return _shrinker; }
};
- typedef vespalib::hash_map<vespalib::string, AttributeWrap> AttributeMap;
- typedef vespalib::hash_map<vespalib::string, FlushableWrap> FlushableMap;
+ using AttributeMap = vespalib::hash_map<vespalib::string, AttributeWrap>;
+ using FlushableMap = vespalib::hash_map<vespalib::string, FlushableWrap>;
AttributeMap _attributes;
FlushableMap _flushables;
@@ -80,7 +80,7 @@ private:
IAttributeFactory::SP _factory;
std::shared_ptr<search::attribute::Interlock> _interlock;
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
- vespalib::ThreadExecutor& _shared_executor;
+ vespalib::Executor& _shared_executor;
HwInfo _hwInfo;
std::unique_ptr<ImportedAttributesRepo> _importedAttributes;
@@ -100,14 +100,14 @@ private:
void transferExtraAttributes(const AttributeManager &currMgr);
public:
- typedef std::shared_ptr<AttributeManager> SP;
+ using SP = std::shared_ptr<AttributeManager>;
AttributeManager(const vespalib::string &baseDir,
const vespalib::string &documentSubDbName,
const search::TuneFileAttributes &tuneFileAttributes,
const search::common::FileHeaderContext & fileHeaderContext,
vespalib::ISequencedTaskExecutor &attributeFieldWriter,
- vespalib::ThreadExecutor& shared_executor,
+ vespalib::Executor& shared_executor,
const HwInfo &hwInfo);
AttributeManager(const vespalib::string &baseDir,
@@ -115,7 +115,7 @@ public:
const search::TuneFileAttributes &tuneFileAttributes,
const search::common::FileHeaderContext & fileHeaderContext,
vespalib::ISequencedTaskExecutor &attributeFieldWriter,
- vespalib::ThreadExecutor& shared_executor,
+ vespalib::Executor& shared_executor,
const IAttributeFactory::SP &factory,
const HwInfo &hwInfo);
@@ -171,7 +171,7 @@ public:
vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const override;
- vespalib::ThreadExecutor& get_shared_executor() const override;
+ vespalib::Executor& get_shared_executor() const override { return _shared_executor; }
search::AttributeVector *getWritableAttribute(const vespalib::string &name) const override;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
index 5f162281d96..d0caf92be17 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp
@@ -167,12 +167,6 @@ FilterAttributeManager::getAttributeFieldWriter() const
return _mgr->getAttributeFieldWriter();
}
-vespalib::ThreadExecutor&
-FilterAttributeManager::get_shared_executor() const
-{
- return _mgr->get_shared_executor();
-}
-
search::AttributeVector *
FilterAttributeManager::getWritableAttribute(const vespalib::string &name) const
{
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h
index 1512ab32d62..e291aca6922 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.h
@@ -20,15 +20,14 @@ public:
typedef std::set<vespalib::string> AttributeSet;
private:
- AttributeSet _acceptedAttributes;
- IAttributeManager::SP _mgr;
+ AttributeSet _acceptedAttributes;
+ IAttributeManager::SP _mgr;
std::vector<search::AttributeVector *> _acceptedWritableAttributes;
bool acceptAttribute(const vespalib::string &name) const;
public:
- FilterAttributeManager(const AttributeSet &acceptedAttributes,
- IAttributeManager::SP mgr);
+ FilterAttributeManager(const AttributeSet &acceptedAttributes, IAttributeManager::SP mgr);
~FilterAttributeManager() override;
// Implements search::IAttributeManager
@@ -47,7 +46,7 @@ public:
void pruneRemovedFields(search::SerialNum serialNum) override;
const IAttributeFactory::SP &getFactory() const override;
vespalib::ISequencedTaskExecutor & getAttributeFieldWriter() const override;
- vespalib::ThreadExecutor& get_shared_executor() const override;
+ vespalib::Executor& get_shared_executor() const override { return _mgr->get_shared_executor(); }
search::AttributeVector * getWritableAttribute(const vespalib::string &name) const override;
const std::vector<search::AttributeVector *> & getWritableAttributes() const override;
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h
index b8968ba9d2e..d32052fe4fa 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_manager.h
@@ -14,7 +14,7 @@ namespace search::attribute { class IAttributeFunctor; }
namespace vespalib {
class ISequencedTaskExecutor;
- class ThreadExecutor;
+ class Executor;
class IDestructorCallback;
}
@@ -76,7 +76,7 @@ struct IAttributeManager : public search::IAttributeManager
virtual vespalib::ISequencedTaskExecutor &getAttributeFieldWriter() const = 0;
- virtual vespalib::ThreadExecutor& get_shared_executor() const = 0;
+ virtual vespalib::Executor& get_shared_executor() const = 0;
/*
* Get pointer to named writable attribute. If attribute isn't
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
index 421e602a9cf..03bad4c7eaf 100644
--- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
@@ -18,6 +18,7 @@ vespa_add_library(searchcore_pcommon STATIC
ipendinglidtracker.cpp
operation_rate_tracker.cpp
pendinglidtracker.cpp
+ replay_feedtoken_state.cpp
select_utils.cpp
selectcontext.cpp
selectpruner.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index 675dce10be4..c74819577e9 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -33,6 +33,12 @@ State::setResult(ResultUP result, bool documentWasFound) {
_result = std::move(result);
}
+bool
+State::is_replay() const noexcept
+{
+ return false;
+}
+
void
State::fail()
{
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index 8ccb4863878..53f551999d2 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -22,19 +22,33 @@ public:
virtual void send(ResultUP result, bool documentWasFound) = 0;
};
+
+/*
+ * Interface class for feed token state.
+ */
+class IState : public vespalib::IDestructorCallback {
+public:
+ virtual bool is_replay() const noexcept = 0;
+ virtual void fail() = 0;
+ virtual void setResult(ResultUP result, bool documentWasFound) = 0;
+ virtual const storage::spi::Result &getResult() = 0;
+};
+
+
/**
* This holds the result of the feed operation until it is either failed or acked.
* Guarantees that the result is propagated back to the invoker via ITransport interface.
*/
-class State : public vespalib::IDestructorCallback {
+class State : public IState {
public:
State(const State &) = delete;
State & operator = (const State &) = delete;
State(ITransport & transport);
~State() override;
- void fail();
- void setResult(ResultUP result, bool documentWasFound);
- const storage::spi::Result &getResult() { return *_result; }
+ bool is_replay() const noexcept override;
+ void fail() override;
+ void setResult(ResultUP result, bool documentWasFound) override;
+ const storage::spi::Result &getResult() override { return *_result; }
protected:
void ack();
private:
@@ -71,7 +85,7 @@ make(std::shared_ptr<ITransport> transport) {
}
-using FeedToken = std::shared_ptr<feedtoken::State>;
+using FeedToken = std::shared_ptr<feedtoken::IState>;
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp b/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp
index 2f0db6083c7..6d69c884c90 100644
--- a/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/hw_info_sampler.cpp
@@ -1,15 +1,17 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "hw_info_sampler.h"
-#include <vespa/config/config.h>
#include <vespa/config/print/fileconfigwriter.h>
+#include <vespa/config/subscription/configsubscriber.hpp>
#include <vespa/fastos/file.h>
#include <vespa/searchcore/config/config-hwinfo.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/alloc.h>
#include <filesystem>
#include <thread>
+
#include <vespa/log/log.h>
LOG_SETUP(".proton.common.hw_info_sampler");
@@ -55,7 +57,8 @@ sampleCpuCores(const HwInfoSampler::Config &cfg)
return std::thread::hardware_concurrency();
}
-std::unique_ptr<HwinfoConfig> readConfig(const vespalib::string &path) {
+std::unique_ptr<HwinfoConfig>
+readConfig(const vespalib::string &path) {
FileSpec spec(path + "/" + "hwinfo.cfg");
ConfigSubscriber s(spec);
std::unique_ptr<ConfigHandle<HwinfoConfig>> handle = s.subscribe<HwinfoConfig>("hwinfo");
@@ -79,29 +82,31 @@ void writeConfig(const vespalib::string &path,
double measureDiskWriteSpeed(const vespalib::string &path,
size_t diskWriteLen)
{
- FastOS_File testFile;
vespalib::string fileName = path + "/hwinfo-writespeed";
size_t bufferLen = 1_Mi;
Alloc buffer(Alloc::allocMMap(bufferLen));
memset(buffer.get(), 0, buffer.size());
- testFile.EnableDirectIO();
- testFile.OpenWriteOnlyTruncate(fileName.c_str());
- sync();
- sleep(1);
- sync();
- sleep(1);
- Clock::time_point before = Clock::now();
- size_t residue = diskWriteLen;
- while (residue > 0) {
- size_t writeNow = std::min(residue, bufferLen);
- testFile.WriteBuf(buffer.get(), writeNow);
- residue -= writeNow;
+ double diskWriteSpeed;
+ {
+ FastOS_File testFile;
+ testFile.EnableDirectIO();
+ testFile.OpenWriteOnlyTruncate(fileName.c_str());
+ sync();
+ sleep(1);
+ sync();
+ sleep(1);
+ Clock::time_point before = Clock::now();
+ size_t residue = diskWriteLen;
+ while (residue > 0) {
+ size_t writeNow = std::min(residue, bufferLen);
+ testFile.WriteBuf(buffer.get(), writeNow);
+ residue -= writeNow;
+ }
+ Clock::time_point after = Clock::now();
+ double elapsed = vespalib::to_s(after - before);
+ diskWriteSpeed = diskWriteLen / elapsed / 1_Mi;
}
- Clock::time_point after = Clock::now();
- testFile.Close();
vespalib::unlink(fileName);
- double elapsed = vespalib::to_s(after - before);
- double diskWriteSpeed = diskWriteLen / elapsed / 1_Mi;
return diskWriteSpeed;
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp
new file mode 100644
index 00000000000..a3a473c9548
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.cpp
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "replay_feedtoken_state.h"
+
+namespace proton::feedtoken {
+
+ReplayState::ReplayState(vespalib::SharedOperationThrottler::Token throttler_token)
+ : IState(),
+ _throttler_token(std::move(throttler_token))
+{
+}
+
+ReplayState::~ReplayState() = default;
+
+bool
+ReplayState::is_replay() const noexcept
+{
+ return true;
+}
+
+void
+ReplayState::fail()
+{
+}
+
+void
+ReplayState::setResult(ResultUP, bool)
+{
+}
+
+const storage::spi::Result&
+ReplayState::getResult()
+{
+ abort();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h
new file mode 100644
index 00000000000..512f12a50af
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/replay_feedtoken_state.h
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "feedtoken.h"
+#include <vespa/vespalib/util/shared_operation_throttler.h>
+
+namespace proton::feedtoken {
+
+/*
+ * Feed token state used during replay. It contains a throttler token
+ * which allows the related shared operation throttler to track the completion
+ * of the feed operation.
+ */
+class ReplayState : public IState {
+ vespalib::SharedOperationThrottler::Token _throttler_token;
+public:
+ ~ReplayState() override;
+ ReplayState(vespalib::SharedOperationThrottler::Token throttler_token);
+ bool is_replay() const noexcept override;
+ void fail() override;
+ void setResult(ResultUP result, bool documentWasFound) override;
+ const storage::spi::Result &getResult() override;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
index 28a91e1444d..a04f5bfa651 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp
@@ -145,7 +145,7 @@ SummaryManager::createSummarySetup(const SummaryConfig & summaryCfg, const Summa
juniperCfg, attributeMgr, _docStore, repo);
}
-SummaryManager::SummaryManager(vespalib::ThreadExecutor & executor, const LogDocumentStore::Config & storeConfig,
+SummaryManager::SummaryManager(vespalib::Executor &shared_executor, const LogDocumentStore::Config & storeConfig,
const search::GrowStrategy & growStrategy, const vespalib::string &baseDir,
const DocTypeName &docTypeName, const TuneFileSummary &tuneFileSummary,
const FileHeaderContext &fileHeaderContext, search::transactionlog::SyncProxy &tlSyncer,
@@ -154,7 +154,7 @@ SummaryManager::SummaryManager(vespalib::ThreadExecutor & executor, const LogDoc
_docTypeName(docTypeName),
_docStore()
{
- _docStore = std::make_shared<LogDocumentStore>(executor, baseDir, storeConfig, growStrategy, tuneFileSummary,
+ _docStore = std::make_shared<LogDocumentStore>(shared_executor, baseDir, storeConfig, growStrategy, tuneFileSummary,
fileHeaderContext, tlSyncer, std::move(bucketizer));
}
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
index b3cbd399262..ba55761d091 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h
@@ -10,7 +10,6 @@
#include <vespa/searchlib/docstore/logdocumentstore.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/document/fieldvalue/document.h>
-#include <vespa/vespalib/util/threadexecutor.h>
namespace search { class IBucketizer; }
namespace search::common { class FileHeaderContext; }
@@ -60,7 +59,7 @@ private:
public:
typedef std::shared_ptr<SummaryManager> SP;
- SummaryManager(vespalib::ThreadExecutor & executor,
+ SummaryManager(vespalib::Executor &shared_executor,
const search::LogDocumentStore::Config & summary,
const search::GrowStrategy & growStrategy,
const vespalib::string &baseDir,
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp
index 21506c3014f..568fa740d6f 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp
@@ -11,7 +11,7 @@ SummaryManagerInitializer(const search::GrowStrategy &grow,
const vespalib::string & baseDir,
const vespalib::string &subDbName,
const DocTypeName &docTypeName,
- vespalib::ThreadExecutor &summaryExecutor,
+ vespalib::Executor &shared_executor,
const search::LogDocumentStore::Config & storeCfg,
const search::TuneFileSummary &tuneFile,
const search::common::FileHeaderContext &fileHeaderContext,
@@ -23,7 +23,7 @@ SummaryManagerInitializer(const search::GrowStrategy &grow,
_baseDir(baseDir),
_subDbName(subDbName),
_docTypeName(docTypeName),
- _summaryExecutor(summaryExecutor),
+ _shared_executor(shared_executor),
_storeCfg(storeCfg),
_tuneFile(tuneFile),
_fileHeaderContext(fileHeaderContext),
@@ -41,7 +41,7 @@ SummaryManagerInitializer::run()
vespalib::Timer timer;
EventLogger::loadDocumentStoreStart(_subDbName);
*_result = std::make_shared<SummaryManager>
- (_summaryExecutor, _storeCfg, _grow, _baseDir, _docTypeName,
+ (_shared_executor, _storeCfg, _grow, _baseDir, _docTypeName,
_tuneFile, _fileHeaderContext, _tlSyncer, _bucketizer);
EventLogger::loadDocumentStoreComplete(_subDbName, timer.elapsed());
}
diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h
index 7075560ed56..318edc425a7 100644
--- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h
+++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h
@@ -5,7 +5,6 @@
#include "summarymanager.h"
#include <vespa/searchcore/proton/initializer/initializer_task.h>
#include <vespa/searchcommon/common/growstrategy.h>
-#include <vespa/vespalib/stllike/string.h>
namespace proton {
@@ -20,7 +19,7 @@ class SummaryManagerInitializer : public initializer::InitializerTask
const vespalib::string _baseDir;
const vespalib::string _subDbName;
const DocTypeName _docTypeName;
- vespalib::ThreadExecutor &_summaryExecutor;
+ vespalib::Executor &_shared_executor;
const search::LogDocumentStore::Config _storeCfg;
const search::TuneFileSummary _tuneFile;
const search::common::FileHeaderContext &_fileHeaderContext;
@@ -36,7 +35,7 @@ public:
const vespalib::string & baseDir,
const vespalib::string &subDbName,
const DocTypeName &docTypeName,
- vespalib::ThreadExecutor & summaryExecutor,
+ vespalib::Executor &shared_executor,
const search::LogDocumentStore::Config & storeCfg,
const search::TuneFileSummary &tuneFile,
const search::common::FileHeaderContext & fileHeaderContext,
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 2b76faa8d7f..011d97d4609 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -4,20 +4,21 @@
#include "flush_all_strategy.h"
#include "flushengine.h"
#include "flushtask.h"
-#include "tls_stats_map.h"
#include "tls_stats_factory.h"
+#include "tls_stats_map.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchlib/common/flush_token.h>
-#include <vespa/vespalib/util/jsonwriter.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/size_literals.h>
#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".proton.flushengine.flushengine");
-typedef vespalib::Executor::Task Task;
-using searchcorespi::IFlushTarget;
+using Task = vespalib::Executor::Task;
using searchcorespi::FlushStats;
+using searchcorespi::IFlushTarget;
+using vespalib::CpuUsage;
using namespace std::chrono_literals;
namespace proton {
@@ -86,7 +87,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_threadPool(128_Ki),
_strategy(std::move(strategy)),
_priorityStrategy(),
- _executor(numThreads, 128_Ki, flush_engine_executor),
+ _executor(numThreads, 128_Ki, CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)),
_lock(),
_cond(),
_handlers(),
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
index 58dc473b85e..8489a68af15 100644
--- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
@@ -6,6 +6,7 @@
#include <vespa/vespalib/data/smart_buffer.h>
#include <vespa/vespalib/data/slime/binary_format.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/log/log.h>
@@ -34,12 +35,14 @@ public:
};
VESPA_THREAD_STACK_TAG(match_engine_executor)
+VESPA_THREAD_STACK_TAG(match_engine_thread_bundle)
} // namespace anon
namespace proton {
using namespace vespalib::slime;
+using vespalib::CpuUsage;
MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t distributionKey, bool async)
: _lock(),
@@ -48,8 +51,10 @@ MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t di
_closed(false),
_forward_issues(true),
_handlers(),
- _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki, match_engine_executor),
- _threadBundlePool(std::max(size_t(1), threadsPerSearch)),
+ _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki,
+ CpuUsage::wrap(match_engine_executor, CpuUsage::Category::READ)),
+ _threadBundlePool(std::max(size_t(1), threadsPerSearch),
+ CpuUsage::wrap(match_engine_thread_bundle, CpuUsage::Category::READ)),
_nodeUp(false),
_nodeMaintenance(false)
{
@@ -147,6 +152,9 @@ MatchEngine::performSearch(search::engine::SearchRequest::Source req)
}
}
_threadBundlePool.release(std::move(threadBundle));
+ if (searchRequest->expired()) {
+ vespalib::Issue::report("search request timed out; results may be incomplete");
+ }
}
ret->request = req.release();
if (_forward_issues) {
diff --git a/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp b/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp
index ce04464a851..032991cbf6a 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/docsum_matcher.cpp
@@ -47,7 +47,8 @@ get_feature_set(const MatchToolsFactory &mtf,
} else {
matchTools->setup_dump();
}
- auto retval = ExtractFeatures::get_feature_set(matchTools->search(), matchTools->rank_program(), docs, mtf.get_feature_rename_map());
+ auto retval = ExtractFeatures::get_feature_set(matchTools->search(), matchTools->rank_program(), docs,
+ matchTools->getDoom(), mtf.get_feature_rename_map());
if (auto onSummaryTask = mtf.createOnSummaryTask()) {
onSummaryTask->run(docs);
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp
index d74885bd0be..9f09255795a 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matching/extract_features.cpp
@@ -2,6 +2,7 @@
#include "extract_features.h"
#include "match_tools.h"
+#include <vespa/vespalib/util/doom.h>
#include <vespa/eval/eval/value_codec.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/runnable.h>
@@ -10,6 +11,7 @@
#include <vespa/searchlib/fef/rank_program.h>
#include <vespa/searchlib/queryeval/searchiterator.h>
+using vespalib::Doom;
using vespalib::Runnable;
using vespalib::ThreadBundle;
using search::FeatureSet;
@@ -59,15 +61,19 @@ struct MyChunk : Runnable {
const std::pair<uint32_t,uint32_t> *begin;
const std::pair<uint32_t,uint32_t> *end;
FeatureValues &result;
+ const Doom &doom;
MyChunk(const std::pair<uint32_t,uint32_t> *begin_in,
- const std::pair<uint32_t,uint32_t> *end_in,
- FeatureValues &result_in)
- : begin(begin_in), end(end_in), result(result_in) {}
+ const std::pair<uint32_t,uint32_t> *end_in,
+ FeatureValues &result_in, const Doom &doom_in)
+ : begin(begin_in), end(end_in), result(result_in), doom(doom_in) {}
void calculate_features(SearchIterator &search, const FeatureResolver &resolver) {
assert(end > begin);
assert(resolver.num_features() == result.names.size());
search.initRange(begin[0].first, end[-1].first + 1);
for (auto pos = begin; pos != end; ++pos) {
+ if (doom.hard_doom()) {
+ return;
+ }
search.unpack(pos->first);
auto *dst = &result.values[pos->second * resolver.num_features()];
extract_values(resolver, pos->first, dst);
@@ -81,9 +87,10 @@ struct FirstChunk : MyChunk {
FirstChunk(const std::pair<uint32_t,uint32_t> *begin_in,
const std::pair<uint32_t,uint32_t> *end_in,
FeatureValues &result_in,
+ const Doom &doom_in,
SearchIterator &search_in,
const FeatureResolver &resolver_in)
- : MyChunk(begin_in, end_in, result_in),
+ : MyChunk(begin_in, end_in, result_in, doom_in),
search(search_in),
resolver(resolver_in) {}
void run() override { calculate_features(search, resolver); }
@@ -94,8 +101,9 @@ struct LaterChunk : MyChunk {
LaterChunk(const std::pair<uint32_t,uint32_t> *begin_in,
const std::pair<uint32_t,uint32_t> *end_in,
FeatureValues &result_in,
+ const Doom &doom_in,
const MatchToolsFactory &mtf_in)
- : MyChunk(begin_in, end_in, result_in),
+ : MyChunk(begin_in, end_in, result_in, doom_in),
mtf(mtf_in) {}
void run() override {
auto tools = mtf.createMatchTools();
@@ -125,13 +133,16 @@ struct MyWork {
FeatureSet::UP
ExtractFeatures::get_feature_set(SearchIterator &search, RankProgram &rank_program, const std::vector<uint32_t> &docs,
- const MatchToolsFactory::StringStringMap &renames)
+ const Doom &doom, const MatchToolsFactory::StringStringMap &renames)
{
FeatureResolver resolver(rank_program.get_seeds(false));
auto result = std::make_unique<FeatureSet>(extract_names(resolver, renames), docs.size());
if (!docs.empty()) {
search.initRange(docs.front(), docs.back()+1);
for (uint32_t docid: docs) {
+ if (doom.hard_doom()) {
+ return result;
+ }
search.unpack(docid);
auto *dst = result->getFeaturesByIndex(result->addDocId(docid));
extract_values(resolver, docid, dst);
@@ -159,9 +170,9 @@ ExtractFeatures::get_match_features(const MatchToolsFactory &mtf, const OrderedD
break;
}
if (i == 0) {
- work.chunks.push_back(std::make_unique<FirstChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->search(), resolver));
+ work.chunks.push_back(std::make_unique<FirstChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), tools->search(), resolver));
} else {
- work.chunks.push_back(std::make_unique<LaterChunk>(&docs[idx], &docs[idx + chunk_size], result, mtf));
+ work.chunks.push_back(std::make_unique<LaterChunk>(&docs[idx], &docs[idx + chunk_size], result, tools->getDoom(), mtf));
}
idx += chunk_size;
}
diff --git a/searchcore/src/vespa/searchcore/proton/matching/extract_features.h b/searchcore/src/vespa/searchcore/proton/matching/extract_features.h
index 44bff08df2e..48c3476f164 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/extract_features.h
+++ b/searchcore/src/vespa/searchcore/proton/matching/extract_features.h
@@ -6,6 +6,7 @@
#include <vespa/searchlib/common/stringmap.h>
#include <vector>
+namespace vespalib { class Doom; };
namespace vespalib { struct ThreadBundle; };
namespace search::queryeval { class SearchIterator; }
namespace search::fef { class RankProgram; }
@@ -27,8 +28,7 @@ struct ExtractFeatures {
* documents (must be in ascending order) using unpack information
* from a search.
**/
- static FeatureSet::UP get_feature_set(SearchIterator &search, RankProgram &rank_program, const std::vector<uint32_t> &docs, const StringStringMap &renames);
-
+ static FeatureSet::UP get_feature_set(SearchIterator &search, RankProgram &rank_program, const std::vector<uint32_t> &docs, const vespalib::Doom &doom, const StringStringMap &renames);
// first: docid, second: result index (must be sorted on docid)
using OrderedDocs = std::vector<std::pair<uint32_t,uint32_t>>;
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp
index af250b61d03..2e56b7010f9 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.cpp
@@ -28,15 +28,15 @@ DocumentDBJobTrackers::DocumentDBJobTrackers()
{
}
-DocumentDBJobTrackers::~DocumentDBJobTrackers() {}
+DocumentDBJobTrackers::~DocumentDBJobTrackers() = default;
namespace {
IFlushTarget::SP
-trackFlushTarget(const IJobTracker::SP &tracker,
- const IFlushTarget::SP &target)
+trackFlushTarget(std::shared_ptr<IJobTracker> tracker,
+ std::shared_ptr<IFlushTarget> target)
{
- return std::make_shared<JobTrackedFlushTarget>(tracker, target);
+ return std::make_shared<JobTrackedFlushTarget>(std::move(tracker), std::move(target));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h
index a05fbb49a41..cd7ec612d98 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/documentdb_job_trackers.h
@@ -4,8 +4,6 @@
#include "documentdb_tagged_metrics.h"
#include "job_tracker.h"
#include <vespa/searchcorespi/flush/iflushtarget.h>
-#include <chrono>
-#include <mutex>
namespace proton {
@@ -16,20 +14,23 @@ namespace proton {
class DocumentDBJobTrackers
{
private:
- std::mutex _lock;
using time_point = std::chrono::time_point<std::chrono::steady_clock>;
- time_point _now;
- JobTracker::SP _attributeFlush;
- JobTracker::SP _memoryIndexFlush;
- JobTracker::SP _diskIndexFusion;
- JobTracker::SP _documentStoreFlush;
- JobTracker::SP _documentStoreCompact;
- JobTracker::SP _bucketMove;
- JobTracker::SP _lidSpaceCompact;
- JobTracker::SP _removedDocumentsPrune;
+ using JobTrackerSP = std::shared_ptr<JobTracker>;
+ std::mutex _lock;
+ time_point _now;
+ JobTrackerSP _attributeFlush;
+ JobTrackerSP _memoryIndexFlush;
+ JobTrackerSP _diskIndexFusion;
+ JobTrackerSP _documentStoreFlush;
+ JobTrackerSP _documentStoreCompact;
+ JobTrackerSP _bucketMove;
+ JobTrackerSP _lidSpaceCompact;
+ JobTrackerSP _removedDocumentsPrune;
public:
DocumentDBJobTrackers();
+ DocumentDBJobTrackers(const DocumentDBJobTrackers &) = delete;
+ DocumentDBJobTrackers & operator = (const DocumentDBJobTrackers &) = delete;
~DocumentDBJobTrackers();
IJobTracker &getAttributeFlush() { return *_attributeFlush; }
@@ -37,9 +38,9 @@ public:
IJobTracker &getDiskIndexFusion() { return *_diskIndexFusion; }
IJobTracker &getDocumentStoreFlush() { return *_documentStoreFlush; }
IJobTracker &getDocumentStoreCompact() { return *_documentStoreCompact; }
- IJobTracker::SP getBucketMove() { return _bucketMove; }
- IJobTracker::SP getLidSpaceCompact() { return _lidSpaceCompact; }
- IJobTracker::SP getRemovedDocumentsPrune() { return _removedDocumentsPrune; }
+ std::shared_ptr<IJobTracker> getBucketMove() { return _bucketMove; }
+ std::shared_ptr<IJobTracker> getLidSpaceCompact() { return _lidSpaceCompact; }
+ std::shared_ptr<IJobTracker> getRemovedDocumentsPrune() { return _removedDocumentsPrune; }
searchcorespi::IFlushTarget::List
trackFlushTargets(const searchcorespi::IFlushTarget::List &flushTargets);
@@ -47,5 +48,4 @@ public:
void updateMetrics(DocumentDBTaggedMetrics::JobMetrics &metrics);
};
-} // namespace proton
-
+}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h b/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h
index 646a9f07672..eec8cfbc44a 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/i_job_tracker.h
@@ -2,8 +2,6 @@
#pragma once
-#include <memory>
-
namespace proton {
/**
@@ -11,9 +9,7 @@ namespace proton {
*/
struct IJobTracker
{
- typedef std::shared_ptr<IJobTracker> SP;
-
- virtual ~IJobTracker() {}
+ virtual ~IJobTracker() = default;
virtual void start() = 0;
virtual void end() = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
index 8de3c29d02b..5b49c724c6f 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.cpp
@@ -8,11 +8,11 @@ using searchcorespi::FlushTask;
namespace proton {
-JobTrackedFlushTarget::JobTrackedFlushTarget(const IJobTracker::SP &tracker,
- const IFlushTarget::SP &target)
+JobTrackedFlushTarget::JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker,
+ std::shared_ptr<IFlushTarget> target)
: IFlushTarget(target->getName(), target->getType(), target->getComponent()),
- _tracker(tracker),
- _target(target)
+ _tracker(std::move(tracker)),
+ _target(std::move(target))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
index 7a9bae7f662..35d1b0b0b12 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_target.h
@@ -13,12 +13,12 @@ namespace proton {
class JobTrackedFlushTarget : public searchcorespi::IFlushTarget
{
private:
- IJobTracker::SP _tracker;
- searchcorespi::IFlushTarget::SP _target;
+ std::shared_ptr<IJobTracker> _tracker;
+ std::shared_ptr<searchcorespi::IFlushTarget> _target;
public:
- JobTrackedFlushTarget(const IJobTracker::SP &tracker,
- const searchcorespi::IFlushTarget::SP &target);
+ JobTrackedFlushTarget(std::shared_ptr<IJobTracker> tracker,
+ std::shared_ptr<searchcorespi::IFlushTarget> target);
~JobTrackedFlushTarget();
const IJobTracker &getTracker() const { return *_tracker; }
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp
index f5cf0b1afff..3e06c8321fd 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.cpp
@@ -6,13 +6,14 @@ using searchcorespi::FlushTask;
namespace proton {
-JobTrackedFlushTask::JobTrackedFlushTask(const IJobTracker::SP &tracker,
- FlushTask::UP task)
- : _tracker(tracker),
+JobTrackedFlushTask::JobTrackedFlushTask(std::shared_ptr<IJobTracker> tracker, FlushTask::UP task)
+ : _tracker(std::move(tracker)),
_task(std::move(task))
{
}
+JobTrackedFlushTask::~JobTrackedFlushTask() = default;
+
void
JobTrackedFlushTask::run()
{
@@ -21,4 +22,4 @@ JobTrackedFlushTask::run()
_tracker->end();
}
-} // namespace proton
+}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h
index fe8d59b9dd6..a10ccbdffd6 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracked_flush_task.h
@@ -12,16 +12,18 @@ namespace proton {
class JobTrackedFlushTask : public searchcorespi::FlushTask
{
private:
- IJobTracker::SP _tracker;
+ std::shared_ptr<IJobTracker> _tracker;
searchcorespi::FlushTask::UP _task;
public:
- JobTrackedFlushTask(const IJobTracker::SP &tracker,
+ JobTrackedFlushTask(std::shared_ptr<IJobTracker> tracker,
searchcorespi::FlushTask::UP task);
+ JobTrackedFlushTask(const JobTrackedFlushTask &) = delete;
+ JobTrackedFlushTask & operator = (const JobTrackedFlushTask &) = delete;
+ ~JobTrackedFlushTask() override;
- // Implements searchcorespi::FlushTask
- virtual void run() override;
- virtual search::SerialNum getFlushSerial() const override {
+ void run() override;
+ search::SerialNum getFlushSerial() const override {
return _task->getFlushSerial();
}
};
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h
index 49b63a6d018..dee974f8b56 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.h
@@ -19,8 +19,6 @@ private:
std::mutex &_lock;
public:
- typedef std::shared_ptr<JobTracker> SP;
-
JobTracker(time_point now, std::mutex &lock);
/**
@@ -29,10 +27,8 @@ public:
*/
double sampleLoad(time_point now, const std::lock_guard<std::mutex> &guard);
- // Implements IJobTracker
- virtual void start() override;
- virtual void end() override;
+ void start() override;
+ void end() override;
};
-} // namespace proton
-
+}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp
index 9bc20f95d13..d026ef549d4 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.cpp
@@ -1,21 +1,53 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "resource_usage_metrics.h"
+#include <vespa/vespalib/util/stringfmt.h>
+
+using vespalib::make_string;
namespace proton {
+ResourceUsageMetrics::CpuUtilMetrics::CpuUtilMetrics(metrics::MetricSet *parent)
+ : MetricSet("cpu_util", {}, "Unnormalized cpu utilization for various categories", parent),
+ setup("setup", {}, "cpu used by system init and (re-)configuration", this),
+ read("read", {}, "cpu used by reading data from the system", this),
+ write("write", {}, "cpu used by writing data to the system", this),
+ compact("compact", {}, "cpu used by internal data re-structuring", this),
+ other("other", {}, "cpu used by work not classified as a specific category", this)
+{
+}
+
+ResourceUsageMetrics::CpuUtilMetrics::~CpuUtilMetrics() = default;
+
+ResourceUsageMetrics::DetailedResourceMetrics::DetailedResourceMetrics(const vespalib::string& resource_type, metrics::MetricSet* parent)
+ : MetricSet(make_string("%s_usage", resource_type.c_str()), {}, make_string("Detailed resource usage metrics for %s",
+ resource_type.c_str()), parent),
+ total("total", {}, make_string("The total relative amount of %s used by this content node (value in the range [0, 1])",
+ resource_type.c_str()), this),
+ total_util("total_utilization", {}, make_string("The relative amount of %s used compared to the content node %s resource limit",
+ resource_type.c_str(), resource_type.c_str()), this),
+ transient("transient", {}, make_string("The relative amount of transient %s used by this content node (value in the range [0, 1])",
+ resource_type.c_str()), this)
+{
+}
+
+ResourceUsageMetrics::DetailedResourceMetrics::~DetailedResourceMetrics() = default;
+
ResourceUsageMetrics::ResourceUsageMetrics(metrics::MetricSet *parent)
- : MetricSet("resource_usage", {}, "Usage metrics for various resources in this search engine", parent),
- disk("disk", {}, "The relative amount of disk space used on this machine (value in the range [0, 1])", this),
+ : MetricSet("resource_usage", {}, "Usage metrics for various resources in this content node", parent),
+ disk("disk", {}, "The relative amount of disk used by this content node (transient usage not included, value in the range [0, 1]). Same value as reported to the cluster controller", this),
diskUtilization("disk_utilization", {}, "The relative amount of disk used compared to the disk resource limit", this),
- memory("memory", {}, "The relative amount of memory used by this process (value in the range [0, 1])", this),
+ memory("memory", {}, "The relative amount of memory used by this content node (transient usage not included, value in the range [0, 1]). Same value as reported to the cluster controller", this),
memoryUtilization("memory_utilization", {}, "The relative amount of memory used compared to the memory resource limit", this),
transient_memory("transient_memory", {}, "The relative amount of transient memory needed for loading attributes. Max value among all attributes (value in the range [0, 1])", this),
transient_disk("transient_disk", {}, "The relative amount of transient disk needed for running disk index fusion. Max value among all disk indexes (value in the range [0, 1])", this),
+ disk_usage("disk", this),
+ memory_usage("memory", this),
memoryMappings("memory_mappings", {}, "The number of mapped memory areas", this),
openFileDescriptors("open_file_descriptors", {}, "The number of open files", this),
feedingBlocked("feeding_blocked", {}, "Whether feeding is blocked due to resource limits being reached (value is either 0 or 1)", this),
- mallocArena("malloc_arena", {}, "Size of malloc arena", this)
+ mallocArena("malloc_arena", {}, "Size of malloc arena", this),
+ cpu_util(this)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h
index 774bb645c84..97cad935dba 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/resource_usage_metrics.h
@@ -12,16 +12,42 @@ namespace proton {
*/
struct ResourceUsageMetrics : metrics::MetricSet
{
+ struct CpuUtilMetrics : metrics::MetricSet {
+ metrics::DoubleValueMetric setup;
+ metrics::DoubleValueMetric read;
+ metrics::DoubleValueMetric write;
+ metrics::DoubleValueMetric compact;
+ metrics::DoubleValueMetric other;
+
+ CpuUtilMetrics(metrics::MetricSet *parent);
+ ~CpuUtilMetrics();
+ };
+
+ struct DetailedResourceMetrics : metrics::MetricSet {
+ metrics::DoubleValueMetric total;
+ metrics::DoubleValueMetric total_util;
+ metrics::DoubleValueMetric transient;
+
+ DetailedResourceMetrics(const vespalib::string& resource_type, metrics::MetricSet* parent);
+ ~DetailedResourceMetrics();
+ };
+
+ // TODO Vespa 8: Remove diskUtilization, memoryUtilization, transient_memory, transient_disk.
+ // These are now included in disk_usage and memory_usage.
+
metrics::DoubleValueMetric disk;
metrics::DoubleValueMetric diskUtilization;
metrics::DoubleValueMetric memory;
metrics::DoubleValueMetric memoryUtilization;
metrics::DoubleValueMetric transient_memory;
metrics::DoubleValueMetric transient_disk;
+ DetailedResourceMetrics disk_usage;
+ DetailedResourceMetrics memory_usage;
metrics::LongValueMetric memoryMappings;
metrics::LongValueMetric openFileDescriptors;
metrics::LongValueMetric feedingBlocked;
metrics::LongValueMetric mallocArena;
+ CpuUtilMetrics cpu_util;
ResourceUsageMetrics(metrics::MetricSet *parent);
~ResourceUsageMetrics();
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp
index 6307604598d..9c8e6591730 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/resource_usage_tracker.cpp
@@ -94,8 +94,13 @@ void
ResourceUsageTracker::notifyDiskMemUsage(DiskMemUsageState state)
{
std::lock_guard guard(_lock);
- // TODO: Subtract transient resource (memory and disk) usage from the absolute numbers.
- _resource_usage = ResourceUsage(state.diskState().usage(), state.memoryState().usage(), _resource_usage.get_attribute_address_space_usage());
+ // The transient resource usage is subtracted from the total resource usage
+ // before it eventually is reported to the cluster controller (to decide whether to block client feed).
+ // This ensures that the transient resource usage is covered by the resource headroom on the content node,
+ // instead of leading to feed blocked due to natural fluctuations.
+ _resource_usage = ResourceUsage(state.non_transient_disk_usage(),
+ state.non_transient_memory_usage(),
+ _resource_usage.get_attribute_address_space_usage());
if (_listener != nullptr) {
_listener->update_resource_usage(_resource_usage);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp
index 8f99eb5e8a7..f0cb123b49f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bootstrapconfigmanager.cpp
@@ -8,6 +8,7 @@
#include <vespa/config-bucketspaces.h>
#include <vespa/searchlib/common/tunefileinfo.hpp>
#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/config/retriever/configsnapshot.hpp>
#include <cassert>
#include <vespa/log/log.h>
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp
index 230593c2c1d..d740d5b129d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.cpp
@@ -11,25 +11,29 @@ DiskMemUsageMetrics::DiskMemUsageMetrics() noexcept
{
}
-DiskMemUsageMetrics::DiskMemUsageMetrics(const DiskMemUsageState &usage_state) noexcept
- : _disk_usage(usage_state.diskState().usage()),
- _disk_utilization(usage_state.diskState().utilization()),
- _transient_disk_usage(usage_state.transient_disk_usage()),
- _memory_usage(usage_state.memoryState().usage()),
- _memory_utilization(usage_state.memoryState().utilization()),
- _transient_memory_usage(usage_state.transient_memory_usage())
+DiskMemUsageMetrics::DiskMemUsageMetrics(const DiskMemUsageState& usage) noexcept
+ : _total_disk_usage(usage.diskState().usage()),
+ _total_disk_utilization(usage.diskState().utilization()),
+ _transient_disk_usage(usage.transient_disk_usage()),
+ _non_transient_disk_usage(usage.non_transient_disk_usage()),
+ _total_memory_usage(usage.memoryState().usage()),
+ _total_memory_utilization(usage.memoryState().utilization()),
+ _transient_memory_usage(usage.transient_memory_usage()),
+ _non_transient_memory_usage(usage.non_transient_memory_usage())
{
}
void
-DiskMemUsageMetrics::merge(const DiskMemUsageState &usage_state) noexcept
+DiskMemUsageMetrics::merge(const DiskMemUsageState& usage) noexcept
{
- _disk_usage = std::max(_disk_usage, usage_state.diskState().usage());
- _disk_utilization = std::max(_disk_utilization, usage_state.diskState().utilization());
- _transient_disk_usage = std::max(_transient_disk_usage, usage_state.transient_disk_usage());
- _memory_usage = std::max(_memory_usage, usage_state.memoryState().usage());
- _memory_utilization = std::max(_memory_utilization, usage_state.memoryState().utilization());
- _transient_memory_usage = std::max(_transient_memory_usage, usage_state.transient_memory_usage());
+ _total_disk_usage = std::max(_total_disk_usage, usage.diskState().usage());
+ _total_disk_utilization = std::max(_total_disk_utilization, usage.diskState().utilization());
+ _transient_disk_usage = std::max(_transient_disk_usage, usage.transient_disk_usage());
+ _non_transient_disk_usage = std::max(_non_transient_disk_usage, usage.non_transient_disk_usage());
+ _total_memory_usage = std::max(_total_memory_usage, usage.memoryState().usage());
+ _total_memory_utilization = std::max(_total_memory_utilization, usage.memoryState().utilization());
+ _transient_memory_usage = std::max(_transient_memory_usage, usage.transient_memory_usage());
+ _non_transient_memory_usage = std::max(_non_transient_memory_usage, usage.non_transient_memory_usage());
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h
index cb97eb4c891..3e3d6fdc752 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_metrics.h
@@ -12,23 +12,27 @@ class DiskMemUsageState;
*/
class DiskMemUsageMetrics
{
- double _disk_usage;
- double _disk_utilization;
+ double _total_disk_usage;
+ double _total_disk_utilization;
double _transient_disk_usage;
- double _memory_usage;
- double _memory_utilization;
+ double _non_transient_disk_usage;
+ double _total_memory_usage;
+ double _total_memory_utilization;
double _transient_memory_usage;
+ double _non_transient_memory_usage;
public:
DiskMemUsageMetrics() noexcept;
- DiskMemUsageMetrics(const DiskMemUsageState &usage_state) noexcept;
- void merge(const DiskMemUsageState &usage_state) noexcept;
- double get_disk_usage() const noexcept { return _disk_usage; }
- double get_disk_utilization() const noexcept { return _disk_utilization; }
- double get_transient_disk_usage() const noexcept { return _transient_disk_usage; }
- double get_memory_usage() const noexcept { return _memory_usage; }
- double get_memory_utilization() const noexcept { return _memory_utilization; }
- double get_transient_memory_usage() const noexcept { return _transient_memory_usage; }
+ DiskMemUsageMetrics(const DiskMemUsageState& usage) noexcept;
+ void merge(const DiskMemUsageState& usage) noexcept;
+ double total_disk_usage() const noexcept { return _total_disk_usage; }
+ double total_disk_utilization() const noexcept { return _total_disk_utilization; }
+ double transient_disk_usage() const noexcept { return _transient_disk_usage; }
+ double non_transient_disk_usage() const noexcept { return _non_transient_disk_usage; }
+ double total_memory_usage() const noexcept { return _total_memory_usage; }
+ double total_memory_utilization() const noexcept { return _total_memory_utilization; }
+ double transient_memory_usage() const noexcept { return _transient_memory_usage; }
+ double non_transient_memory_usage() const noexcept { return _non_transient_memory_usage; }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h
index b205b441bcf..2730388de9a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h
@@ -3,6 +3,7 @@
#pragma once
#include "resource_usage_state.h"
+#include <algorithm>
namespace proton {
@@ -42,6 +43,8 @@ public:
const ResourceUsageState &memoryState() const { return _memoryState; }
double transient_disk_usage() const { return _transient_disk_usage; }
double transient_memory_usage() const { return _transient_memory_usage; }
+ double non_transient_disk_usage() const { return std::max(0.0, _diskState.usage() - _transient_disk_usage); }
+ double non_transient_memory_usage() const { return std::max(0.0, _memoryState.usage() - _transient_memory_usage); }
bool aboveDiskLimit(double resourceLimitFactor) const { return diskState().aboveLimit(resourceLimitFactor); }
bool aboveMemoryLimit(double resourceLimitFactor) const { return memoryState().aboveLimit(resourceLimitFactor); }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index f052d663ba6..a30aa916896 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -12,6 +12,7 @@
#include "idocumentsubdb.h"
#include "maintenance_jobs_injector.h"
#include "reconfig_params.h"
+#include "replay_throttling_policy.h"
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/metrics/updatehook.h>
#include <vespa/searchcore/proton/attribute/attribute_config_inspector.h>
@@ -77,6 +78,18 @@ makeIndexConfig(const ProtonConfig::Index & cfg) {
return index::IndexConfig(WarmupConfig(vespalib::from_s(cfg.warmup.time), cfg.warmup.unpack), cfg.maxflushed, cfg.cache.size);
}
+ReplayThrottlingPolicy
+make_replay_throttling_policy(const ProtonConfig::ReplayThrottlingPolicy& cfg) {
+ if (cfg.type == ProtonConfig::ReplayThrottlingPolicy::Type::UNLIMITED) {
+ return ReplayThrottlingPolicy({});
+ }
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.min_window_size = cfg.minWindowSize;
+ params.max_window_size = cfg.maxWindowSize;
+ params.window_size_increment = cfg.windowSizeIncrement;
+ return ReplayThrottlingPolicy(params);
+}
+
class MetricsUpdateHook : public metrics::UpdateHook {
DocumentDB &_db;
public:
@@ -190,6 +203,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_clusterStateHandler(_writeService.master()),
_bucketHandler(_writeService.master()),
_indexCfg(makeIndexConfig(protonCfg.index)),
+ _replay_throttling_policy(std::make_unique<ReplayThrottlingPolicy>(make_replay_throttling_policy(protonCfg.replayThrottlingPolicy))),
_config_store(std::move(config_store)),
_sessionManager(std::make_shared<matching::SessionManager>(protonCfg.grouping.sessionmanager.maxentries)),
_metricsWireService(metricsWireService),
@@ -486,7 +500,7 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_writeServiceConfig.defaultTaskLimit());
if (params.shouldSubDbsChange()) {
applySubDBConfig(*configSnapshot, serialNum, params);
- if (serialNum < _feedHandler->getSerialNum()) {
+ if (serialNum < _feedHandler->get_replay_end_serial_num()) {
// Not last entry in tls. Reprocessing should already be done.
_subDBs.getReprocessingRunner().reset();
}
@@ -720,7 +734,8 @@ DocumentDB::startTransactionLogReplay()
getBackingStore().lastSyncToken(),
oldestFlushedSerial,
newestFlushedSerial,
- *_config_store);
+ *_config_store,
+ *_replay_throttling_policy);
_initGate.countDown();
LOG(debug, "DocumentDB(%s): Database started.", _docTypeName.toString().c_str());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 8e8391b2f31..2030e6ffac9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -51,6 +51,7 @@ class ExecutorThreadingServiceStats;
class IDocumentDBOwner;
class ISharedThreadingService;
class ITransientResourceUsageProvider;
+class ReplayThrottlingPolicy;
class StatusReport;
struct MetricsWireService;
@@ -104,6 +105,7 @@ private:
ClusterStateHandler _clusterStateHandler;
BucketHandler _bucketHandler;
index::IndexConfig _indexCfg;
+ std::unique_ptr<ReplayThrottlingPolicy> _replay_throttling_policy;
ConfigStore::UP _config_store;
std::shared_ptr<matching::SessionManager> _sessionManager; // TODO: This should not have to be a shared pointer.
MetricsWireService &_metricsWireService;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
index 06132803414..b7c319d46f5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
@@ -14,6 +14,7 @@
#include <vespa/config-summarymap.h>
#include <vespa/config/file_acquirer/file_acquirer.h>
#include <vespa/config/common/configcontext.h>
+#include <vespa/config/retriever/configretriever.h>
#include <vespa/config/helper/legacy.h>
#include <vespa/config-attributes.h>
#include <vespa/config-indexschema.h>
@@ -23,6 +24,7 @@
#include <vespa/searchsummary/config/config-juniperrc.h>
#include <vespa/vespalib/time/time_box.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/config/retriever/configsnapshot.hpp>
#include <thread>
#include <cassert>
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h
index a3c29212c66..ad5959d551b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h
@@ -3,9 +3,12 @@
#pragma once
#include "documentdbconfig.h"
-#include <vespa/config/config.h>
#include <mutex>
+namespace config {
+ class ConfigRetriever;
+ class DirSpec;
+}
namespace proton {
class BootstrapConfig;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp b/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp
index 6285f4ce70f..d3778b4d745 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentretriever.cpp
@@ -127,6 +127,7 @@ DocumentRetriever::needFetchFromDocStore(const FieldSet & fieldSet) const {
case FieldSet::Type::NONE:
case FieldSet::Type::DOCID:
return false;
+ case FieldSet::Type::DOCUMENT_ONLY:
case FieldSet::Type::ALL:
return ! _areAllFieldsAttributes;
case FieldSet::Type::FIELD: {
@@ -257,6 +258,14 @@ DocumentRetriever::getPartialDocument(search::DocumentIdT lid, const document::D
populate(lid, *doc, set.getFields());
break;
}
+ case FieldSet::Type::DOCUMENT_ONLY: {
+ const auto * actual = getDocumentType().getFieldSet(document::DocumentOnly::NAME);
+ if (actual != nullptr) {
+ const auto &set = actual->asCollection();
+ populate(lid, *doc, set.getFields());
+ }
+ break;
+ }
case FieldSet::Type::NONE:
case FieldSet::Type::DOCID:
break;
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 36c8070f140..ca6b3d9ba0f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -3,14 +3,16 @@
#include "executorthreadingservice.h"
#include "threading_service_config.h"
#include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/singleexecutor.h>
-#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
-using vespalib::SyncableThreadExecutor;
using vespalib::BlockingThreadStackExecutor;
-using vespalib::SingleExecutor;
+using vespalib::CpuUsage;
using vespalib::SequencedTaskExecutor;
+using vespalib::SingleExecutor;
+using vespalib::SyncableThreadExecutor;
using OptimizeFor = vespalib::Executor::OptimizeFor;
using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor;
@@ -38,22 +40,24 @@ VESPA_THREAD_STACK_TAG(field_writer_executor)
}
-ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads)
+ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, uint32_t num_treads)
: ExecutorThreadingService(sharedExecutor, nullptr, nullptr, ThreadingServiceConfig::make(num_treads))
{}
-ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor,
vespalib::ISequencedTaskExecutor* field_writer,
vespalib::InvokeService * invokerService,
const ThreadingServiceConfig& cfg,
uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
- _masterExecutor(1, stackSize, master_executor),
+ _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)),
_shared_field_writer(cfg.shared_field_writer()),
_master_task_limit(cfg.master_task_limit()),
- _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)),
- _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)),
+ _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(),
+ CpuUsage::wrap(index_executor, CpuUsage::Category::WRITE))),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(),
+ CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_indexFieldInverter(),
@@ -70,9 +74,11 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();}));
}
if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
- _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
- _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
+ _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
@@ -81,8 +87,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_attribute_field_writer_ptr = _attributeFieldWriter.get();
} else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
- _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
}
@@ -95,10 +102,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_index_field_writer_ptr = field_writer;
_attribute_field_writer_ptr = field_writer;
} else {
- _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
- _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
- _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ _indexFieldInverter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_inverter_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit());
+ _indexFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit());
+ _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 8572f7126d6..43d546927c2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -20,7 +20,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService
{
private:
using Registration = std::unique_ptr<vespalib::IDestructorCallback>;
- vespalib::ThreadExecutor & _sharedExecutor;
+ vespalib::Executor & _sharedExecutor;
vespalib::ThreadStackExecutor _masterExecutor;
ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer;
std::atomic<uint32_t> _master_task_limit;
@@ -42,9 +42,9 @@ public:
/**
* Convenience constructor used in unit tests.
*/
- ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1);
+ ExecutorThreadingService(vespalib::Executor& sharedExecutor, uint32_t num_treads = 1);
- ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor,
+ ExecutorThreadingService(vespalib::Executor& sharedExecutor,
vespalib::ISequencedTaskExecutor* field_writer,
vespalib::InvokeService * invokeService,
const ThreadingServiceConfig& cfg,
@@ -72,7 +72,7 @@ public:
vespalib::ThreadExecutor &summary() override {
return *_summaryExecutor;
}
- vespalib::ThreadExecutor &shared() override {
+ vespalib::Executor &shared() override {
return _sharedExecutor;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 8b99c39dd65..16bd2537813 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -464,13 +464,14 @@ FeedHandler::close()
void
FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flushedSummaryMgrSerial,
SerialNum oldestFlushedSerial, SerialNum newestFlushedSerial,
- ConfigStore &config_store)
+ ConfigStore &config_store,
+ const ReplayThrottlingPolicy& replay_throttling_policy)
{
(void) newestFlushedSerial;
assert(_activeFeedView);
assert(_bucketDBHandler);
auto state = make_shared<ReplayTransactionLogState>
- (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, *this);
+ (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store, replay_throttling_policy, *this);
changeFeedState(state);
// Resurrected attribute vector might cause oldestFlushedSerial to
// be lower than _prunedSerialNum, so don't warn for now.
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 417d9c21548..32a70f7c2b0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -36,6 +36,7 @@ class IReplayConfig;
class JoinBucketsOperation;
class PutOperation;
class RemoveOperation;
+class ReplayThrottlingPolicy;
class SplitBucketOperation;
class UpdateOperation;
@@ -189,7 +190,8 @@ public:
SerialNum flushedSummaryMgrSerial,
SerialNum oldestFlushedSerial,
SerialNum newestFlushedSerial,
- ConfigStore &config_store);
+ ConfigStore &config_store,
+ const ReplayThrottlingPolicy& replay_throttling_policy);
/**
* Called when a flush is done and allows pruning of the transaction log.
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index d2626a0d9f4..d9fdee85e4a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -5,11 +5,14 @@
#include "ifeedview.h"
#include "ireplayconfig.h"
#include "replaypacketdispatcher.h"
+#include "replay_throttling_policy.h"
#include <vespa/searchcore/proton/bucketdb/ibucketdbhandler.h>
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
+#include <vespa/searchcore/proton/common/replay_feedtoken_state.h>
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/shared_operation_throttler.h>
#include <cassert>
#include <vespa/log/log.h>
@@ -21,6 +24,7 @@ using search::SerialNum;
using vespalib::Executor;
using vespalib::makeLambdaTask;
using vespalib::IDestructorCallback;
+using vespalib::SharedOperationThrottler;
using vespalib::make_string;
using proton::bucketdb::IBucketDBHandler;
@@ -52,31 +56,47 @@ class TransactionLogReplayPacketHandler : public IReplayPacketHandler {
FeedConfigStore &_config_store;
IIncSerialNum &_inc_serial_num;
CommitTimeTracker _commitTimeTracker;
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ static std::unique_ptr<SharedOperationThrottler> make_throttler(const ReplayThrottlingPolicy& replay_throttling_policy) {
+ auto& params = replay_throttling_policy.get_params();
+ if (!params.has_value()) {
+ return SharedOperationThrottler::make_unlimited_throttler();
+ }
+ return SharedOperationThrottler::make_dynamic_throttler(params.value());
+ }
public:
TransactionLogReplayPacketHandler(IFeedView *& feed_view_ptr,
IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
FeedConfigStore &config_store,
+ const ReplayThrottlingPolicy& replay_throttling_policy,
IIncSerialNum &inc_serial_num)
: _feed_view_ptr(feed_view_ptr),
_bucketDBHandler(bucketDBHandler),
_replay_config(replay_config),
_config_store(config_store),
_inc_serial_num(inc_serial_num),
- _commitTimeTracker(5ms)
+ _commitTimeTracker(5ms),
+ _throttler(make_throttler(replay_throttling_policy))
{ }
~TransactionLogReplayPacketHandler() override = default;
+ FeedToken make_replay_feed_token() {
+ SharedOperationThrottler::Token throttler_token = _throttler->blocking_acquire_one();
+ return std::make_shared<feedtoken::ReplayState>(std::move(throttler_token));
+ }
+
void replay(const PutOperation &op) override {
- _feed_view_ptr->handlePut(FeedToken(), op);
+ _feed_view_ptr->handlePut(make_replay_feed_token(), op);
}
void replay(const RemoveOperation &op) override {
- _feed_view_ptr->handleRemove(FeedToken(), op);
+ _feed_view_ptr->handleRemove(make_replay_feed_token(), op);
}
void replay(const UpdateOperation &op) override {
- _feed_view_ptr->handleUpdate(FeedToken(), op);
+ _feed_view_ptr->handleUpdate(make_replay_feed_token(), op);
}
void replay(const NoopOperation &) override {} // ignored
void replay(const NewConfigOperation &op) override {
@@ -84,7 +104,7 @@ public:
}
void replay(const DeleteBucketOperation &op) override {
- _feed_view_ptr->handleDeleteBucket(op, IDestructorCallback::SP());
+ _feed_view_ptr->handleDeleteBucket(op, make_replay_feed_token());
}
void replay(const SplitBucketOperation &op) override {
_bucketDBHandler.handleSplit(op.getSerialNum(), op.getSource(),
@@ -95,15 +115,15 @@ public:
op.getSource2(), op.getTarget());
}
void replay(const PruneRemovedDocumentsOperation &op) override {
- _feed_view_ptr->handlePruneRemovedDocuments(op, IDestructorCallback::SP());
+ _feed_view_ptr->handlePruneRemovedDocuments(op, make_replay_feed_token());
}
void replay(const MoveOperation &op) override {
- _feed_view_ptr->handleMove(op, IDestructorCallback::SP());
+ _feed_view_ptr->handleMove(op, make_replay_feed_token());
}
void replay(const CreateBucketOperation &) override {
}
void replay(const CompactLidSpaceOperation &op) override {
- _feed_view_ptr->handleCompactLidSpace(op, IDestructorCallback::SP());
+ _feed_view_ptr->handleCompactLidSpace(op, make_replay_feed_token());
}
NewConfigOperation::IStreamHandler &getNewConfigStreamHandler() override {
return _config_store;
@@ -173,10 +193,11 @@ ReplayTransactionLogState::ReplayTransactionLogState(
IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
FeedConfigStore &config_store,
+ const ReplayThrottlingPolicy &replay_throttling_policy,
IIncSerialNum& inc_serial_num)
: FeedState(REPLAY_TRANSACTION_LOG),
_doc_type_name(name),
- _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, inc_serial_num))
+ _packet_handler(std::make_unique<TransactionLogReplayPacketHandler>(feed_view_ptr, bucketDBHandler, replay_config, config_store, replay_throttling_policy, inc_serial_num))
{ }
ReplayTransactionLogState::~ReplayTransactionLogState() = default;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.h b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
index 7533d6443a7..fcff28fe481 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.h
@@ -55,6 +55,7 @@ public:
bucketdb::IBucketDBHandler &bucketDBHandler,
IReplayConfig &replay_config,
FeedConfigStore &config_store,
+ const ReplayThrottlingPolicy &replay_throttling_policy,
IIncSerialNum &inc_serial_num);
~ReplayTransactionLogState() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp
index 9a525731d0d..34369a0803e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp
@@ -68,7 +68,6 @@ fsyncFile(const vespalib::string &fileName)
if (!f.Sync()) {
LOG(error, "Could not fsync file '%s'", fileName.c_str());
}
- f.Close();
}
template <class Config>
@@ -131,7 +130,6 @@ ConfigFile::ConfigFile(const vespalib::string &name, const vespalib::string &ful
_content.resize(fileSize);
file.ReadBuf(&_content[0], fileSize);
_modTime = file.GetModificationTime();
- file.Close();
}
nbostream &
diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp
index e317c86df05..c0d8168ab61 100644
--- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.cpp
@@ -4,7 +4,7 @@
namespace proton {
-JobTrackedMaintenanceJob::JobTrackedMaintenanceJob(IJobTracker::SP tracker,
+JobTrackedMaintenanceJob::JobTrackedMaintenanceJob(std::shared_ptr<IJobTracker> tracker,
IMaintenanceJob::SP job)
: IMaintenanceJob(job->getName(), job->getDelay(), job->getInterval()),
_tracker(std::move(tracker)),
diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
index 1953393bad7..20ecfdf023d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
@@ -12,12 +12,14 @@ namespace proton {
class JobTrackedMaintenanceJob final : public IMaintenanceJob
{
private:
- IJobTracker::SP _tracker;
- IMaintenanceJob::SP _job;
- bool _running;
+ std::shared_ptr<IJobTracker> _tracker;
+ IMaintenanceJob::SP _job;
+ bool _running;
public:
- JobTrackedMaintenanceJob(IJobTracker::SP tracker, IMaintenanceJob::SP job);
+ JobTrackedMaintenanceJob(std::shared_ptr<IJobTracker> tracker, IMaintenanceJob::SP job);
+ JobTrackedMaintenanceJob(const JobTrackedMaintenanceJob &) = delete;
+ JobTrackedMaintenanceJob & operator = (const JobTrackedMaintenanceJob &) = delete;
~JobTrackedMaintenanceJob() override;
bool isBlocked() const override { return _job->isBlocked(); }
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index f4b92876891..49b301da26e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -17,7 +17,7 @@ namespace proton {
namespace {
IMaintenanceJob::UP
-trackJob(IJobTracker::SP tracker, std::shared_ptr<IMaintenanceJob> job)
+trackJob(std::shared_ptr<IJobTracker> tracker, std::shared_ptr<IMaintenanceJob> job)
{
return std::make_unique<JobTrackedMaintenanceJob>(std::move(tracker), std::move(job));
}
@@ -28,7 +28,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller,
storage::spi::BucketExecutor & bucketExecutor,
ILidSpaceCompactionHandler::Vector lscHandlers,
IOperationStorer &opStorer,
- IJobTracker::SP tracker,
+ std::shared_ptr<IJobTracker> tracker,
IDiskMemUsageNotifier &diskMemUsageNotifier,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
const std::shared_ptr<IBucketStateCalculator> &calc,
@@ -89,7 +89,8 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
AttributeUsageFilter &attributeUsageFilter)
{
controller.registerJobInMasterThread(std::make_unique<HeartBeatJob>(hbHandler, config.getHeartBeatConfig()));
- controller.registerJobInDefaultPool(std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));
+ controller.registerJobInSharedExecutor(
+ std::make_unique<PruneSessionCacheJob>(scPruner, config.getSessionCachePruneInterval()));
const auto & docTypeName = controller.getDocTypeName().getName();
const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB());
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 0d75464a161..fa4bae8f01b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -39,12 +39,12 @@ isRunnable(const MaintenanceJobRunner & job, const Executor * master) {
}
-MaintenanceController::MaintenanceController(ISyncableThreadService &masterThread,
- vespalib::Executor & defaultExecutor,
- MonitoredRefCount & refCount,
- const DocTypeName &docTypeName)
+MaintenanceController::MaintenanceController(ISyncableThreadService& masterThread,
+ vespalib::Executor& shared_executor,
+ MonitoredRefCount& refCount,
+ const DocTypeName& docTypeName)
: _masterThread(masterThread),
- _defaultExecutor(defaultExecutor),
+ _shared_executor(shared_executor),
_refCount(refCount),
_readySubDB(),
_remSubDB(),
@@ -70,10 +70,10 @@ MaintenanceController::registerJobInMasterThread(IMaintenanceJob::UP job)
}
void
-MaintenanceController::registerJobInDefaultPool(IMaintenanceJob::UP job)
+MaintenanceController::registerJobInSharedExecutor(IMaintenanceJob::UP job)
{
// Called by master write thread
- registerJob(_defaultExecutor, std::move(job));
+ registerJob(_shared_executor, std::move(job));
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index f2c425b2fd0..086f5a36404 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -42,11 +42,12 @@ public:
using UP = std::unique_ptr<MaintenanceController>;
enum class State {INITIALIZING, STARTED, PAUSED, STOPPING};
- MaintenanceController(ISyncableThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName);
+ MaintenanceController(ISyncableThreadService& masterThread, vespalib::Executor& shared_executor,
+ vespalib::MonitoredRefCount& refCount, const DocTypeName& docTypeName);
~MaintenanceController();
void registerJobInMasterThread(IMaintenanceJob::UP job);
- void registerJobInDefaultPool(IMaintenanceJob::UP job);
+ void registerJobInSharedExecutor(IMaintenanceJob::UP job);
void killJobs();
@@ -82,7 +83,7 @@ private:
using Guard = std::lock_guard<Mutex>;
ISyncableThreadService &_masterThread;
- vespalib::Executor &_defaultExecutor;
+ vespalib::Executor &_shared_executor;
vespalib::MonitoredRefCount &_refCount;
MaintenanceDocumentSubDB _readySubDB;
MaintenanceDocumentSubDB _remSubDB;
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp
index 9eb0596ff1f..5e6cee94292 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp
@@ -1,12 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "maintenancejobrunner.h"
-#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/cpu_usage.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.maintenancejobrunner");
+using vespalib::CpuUsage;
using vespalib::Executor;
using vespalib::makeLambdaTask;
@@ -34,7 +36,8 @@ MaintenanceJobRunner::addExecutorTask()
Guard guard(_lock);
if (!_stopped && !_job->isBlocked() && !_queued) {
_queued = true;
- _executor.execute(makeLambdaTask([this]() { runJobInExecutor(); }));
+ auto task = makeLambdaTask([this]() { runJobInExecutor(); });
+ _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp
index 51243b718ec..50a499c8a73 100644
--- a/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp
@@ -13,9 +13,10 @@ namespace {
bool
shouldUseConservativeMode(const ResourceUsageState &resourceState,
bool currentlyUseConservativeMode,
+ double high_watermark_factor,
double lowWatermarkFactor)
{
- return resourceState.aboveLimit() ||
+ return resourceState.aboveLimit(high_watermark_factor) ||
(currentlyUseConservativeMode && resourceState.aboveLimit(lowWatermarkFactor));
}
@@ -25,6 +26,7 @@ void
MemoryFlushConfigUpdater::considerUseConservativeDiskMode(const LockGuard &guard, MemoryFlush::Config &newConfig)
{
if (shouldUseConservativeMode(_currState.diskState(), _useConservativeDiskMode,
+ _currConfig.conservative.highwatermarkfactor,
_currConfig.conservative.lowwatermarkfactor))
{
newConfig.maxGlobalTlsSize = _currConfig.maxtlssize * _currConfig.conservative.disklimitfactor;
@@ -41,6 +43,7 @@ void
MemoryFlushConfigUpdater::considerUseConservativeMemoryMode(const LockGuard &, MemoryFlush::Config &newConfig)
{
if (shouldUseConservativeMode(_currState.memoryState(), _useConservativeMemoryMode,
+ _currConfig.conservative.highwatermarkfactor,
_currConfig.conservative.lowwatermarkfactor))
{
newConfig.maxGlobalMemory = _currConfig.maxmemory * _currConfig.conservative.memorylimitfactor;
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
index 53b12972c44..9b1c6de15df 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
@@ -1,14 +1,23 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "operationdonecontext.h"
+#include <vespa/searchcore/proton/common/feedtoken.h>
namespace proton {
-OperationDoneContext::OperationDoneContext(IDestructorCallback::SP token)
- : _token(std::move(token))
+OperationDoneContext::OperationDoneContext(std::shared_ptr<feedtoken::IState> token, std::shared_ptr<vespalib::IDestructorCallback> done_callback)
+ : _token(std::move(token)),
+ _done_callback(std::move(done_callback))
{
}
OperationDoneContext::~OperationDoneContext() = default;
+bool
+OperationDoneContext::is_replay() const
+{
+ return (!_token || _token->is_replay());
+}
+
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
index 113aae8c9bf..baf0e98d5a4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
@@ -4,6 +4,8 @@
#include <vespa/vespalib/util/idestructorcallback.h>
+namespace proton::feedtoken { class IState; }
+
namespace proton {
/**
@@ -16,13 +18,13 @@ namespace proton {
class OperationDoneContext : public vespalib::IDestructorCallback
{
public:
- using IDestructorCallback = vespalib::IDestructorCallback;
- OperationDoneContext(IDestructorCallback::SP token);
+ OperationDoneContext(std::shared_ptr<feedtoken::IState> token, std::shared_ptr<IDestructorCallback> done_callback);
~OperationDoneContext() override;
- bool hasToken() const { return static_cast<bool>(_token); }
+ bool is_replay() const;
private:
- IDestructorCallback::SP _token;
+ std::shared_ptr<feedtoken::IState> _token;
+ std::shared_ptr<vespalib::IDestructorCallback> _done_callback;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index b128fe16e5e..6001e6b88d4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -24,10 +24,10 @@
#include <vespa/searchcore/proton/flushengine/flushengine.h>
#include <vespa/searchcore/proton/flushengine/tls_stats_factory.h>
#include <vespa/searchcore/proton/matchengine/matchengine.h>
+#include <vespa/searchcore/proton/metrics/content_proton_metrics.h>
+#include <vespa/searchcore/proton/metrics/metrics_engine.h>
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
#include <vespa/searchcore/proton/reference/document_db_reference_registry.h>
-#include <vespa/searchcore/proton/metrics/metrics_engine.h>
-#include <vespa/searchcore/proton/metrics/content_proton_metrics.h>
#include <vespa/searchcore/proton/summaryengine/summaryengine.h>
#include <vespa/searchlib/common/packets.h>
#include <vespa/searchlib/transactionlog/trans_log_server_explorer.h>
@@ -36,13 +36,13 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/net/state_server.h>
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
#include <vespa/vespalib/util/random.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/vespalib/util/invokeserviceimpl.h>
#ifdef __linux__
#include <malloc.h>
#endif
@@ -54,20 +54,22 @@
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.proton");
+using CpuCategory = vespalib::CpuUsage::Category;
+
using document::DocumentTypeRepo;
+using search::engine::MonitorReply;
+using search::transactionlog::DomainStats;
+using vespa::config::search::core::ProtonConfig;
+using vespa::config::search::core::internal::InternalProtonType;
+using vespalib::CpuUsage;
using vespalib::FileHeader;
using vespalib::IllegalStateException;
using vespalib::Slime;
+using vespalib::compression::CompressionConfig;
using vespalib::makeLambdaTask;
using vespalib::slime::ArrayInserter;
using vespalib::slime::Cursor;
-using search::transactionlog::DomainStats;
-using vespa::config::search::core::ProtonConfig;
-using vespa::config::search::core::internal::InternalProtonType;
-using vespalib::compression::CompressionConfig;
-using search::engine::MonitorReply;
-
namespace proton {
namespace {
@@ -139,8 +141,8 @@ struct MetricsUpdateHook : metrics::UpdateHook
const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component";
-VESPA_THREAD_STACK_TAG(initialize_executor)
-VESPA_THREAD_STACK_TAG(close_executor)
+VESPA_THREAD_STACK_TAG(proton_initialize_executor)
+VESPA_THREAD_STACK_TAG(proton_close_executor)
}
@@ -207,6 +209,7 @@ Proton::Proton(const config::ConfigUri & configUri,
StatusProducer(),
IPersistenceEngineOwner(),
ComponentConfigProducer(),
+ _cpu_util(),
_configUri(configUri),
_mutex(),
_metricsHook(std::make_unique<MetricsUpdateHook>(*this)),
@@ -310,7 +313,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_protonDiskLayout = std::make_unique<ProtonDiskLayout>(protonConfig.basedir, protonConfig.tlsspec);
vespalib::chdir(protonConfig.basedir);
vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs");
- _tls->start();
+ _tls->start(hwInfo.cpu().cores());
_flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval));
_metricsEngine->addExternalMetrics(_summaryEngine->getMetrics());
@@ -329,7 +332,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw());
InitializeThreads initializeThreads;
if (protonConfig.initialize.threads > 0) {
- initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, initialize_executor);
+ initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki,
+ CpuUsage::wrap(proton_initialize_executor, CpuCategory::SETUP));
_initDocumentDbsInSequence = (protonConfig.initialize.threads == 1);
}
_protonConfigurer.applyInitialConfig(initializeThreads);
@@ -463,7 +467,8 @@ Proton::~Proton()
}
}
- vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000, close_executor);
+ vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000,
+ CpuUsage::wrap(proton_close_executor, CpuCategory::SETUP));
closeDocumentDBs(closePool);
}
_documentDBMap.clear();
@@ -753,12 +758,20 @@ Proton::updateMetrics(const metrics::MetricLockGuard &)
const DiskMemUsageFilter &usageFilter = _diskMemUsageSampler->writeFilter();
auto dm_metrics = usageFilter.get_metrics();
- metrics.resourceUsage.disk.set(dm_metrics.get_disk_usage());
- metrics.resourceUsage.diskUtilization.set(dm_metrics.get_disk_utilization());
- metrics.resourceUsage.memory.set(dm_metrics.get_memory_usage());
- metrics.resourceUsage.memoryUtilization.set(dm_metrics.get_memory_utilization());
- metrics.resourceUsage.transient_memory.set(dm_metrics.get_transient_memory_usage());
- metrics.resourceUsage.transient_disk.set(dm_metrics.get_transient_disk_usage());
+ metrics.resourceUsage.disk.set(dm_metrics.non_transient_disk_usage());
+ metrics.resourceUsage.diskUtilization.set(dm_metrics.total_disk_utilization());
+ metrics.resourceUsage.transient_disk.set(dm_metrics.transient_disk_usage());
+ metrics.resourceUsage.disk_usage.total.set(dm_metrics.total_disk_usage());
+ metrics.resourceUsage.disk_usage.total_util.set(dm_metrics.total_disk_utilization());
+ metrics.resourceUsage.disk_usage.transient.set(dm_metrics.transient_disk_usage());
+
+ metrics.resourceUsage.memory.set(dm_metrics.non_transient_memory_usage());
+ metrics.resourceUsage.memoryUtilization.set(dm_metrics.total_memory_utilization());
+ metrics.resourceUsage.transient_memory.set(dm_metrics.transient_memory_usage());
+ metrics.resourceUsage.memory_usage.total.set(dm_metrics.total_memory_usage());
+ metrics.resourceUsage.memory_usage.total_util.set(dm_metrics.total_memory_utilization());
+ metrics.resourceUsage.memory_usage.transient.set(dm_metrics.transient_memory_usage());
+
metrics.resourceUsage.memoryMappings.set(usageFilter.getMemoryStats().getMappingsCount());
metrics.resourceUsage.openFileDescriptors.set(FastOS_File::count_open_files());
metrics.resourceUsage.feedingBlocked.set((usageFilter.acceptWriteOperation() ? 0.0 : 1.0));
@@ -775,6 +788,12 @@ Proton::updateMetrics(const metrics::MetricLockGuard &)
#else
metrics.resourceUsage.mallocArena.set(UINT64_C(0));
#endif
+ auto cpu_util = _cpu_util.get_util();
+ metrics.resourceUsage.cpu_util.setup.set(cpu_util[CpuCategory::SETUP]);
+ metrics.resourceUsage.cpu_util.read.set(cpu_util[CpuCategory::READ]);
+ metrics.resourceUsage.cpu_util.write.set(cpu_util[CpuCategory::WRITE]);
+ metrics.resourceUsage.cpu_util.compact.set(cpu_util[CpuCategory::COMPACT]);
+ metrics.resourceUsage.cpu_util.other.set(cpu_util[CpuCategory::OTHER]);
}
{
ContentProtonMetrics::ProtonExecutorMetrics &metrics = _metricsEngine->root().executor;
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 573f215c722..90a257a0aaa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -25,6 +25,7 @@
#include <vespa/vespalib/net/json_handler_repo.h>
#include <vespa/vespalib/net/state_explorer.h>
#include <vespa/vespalib/util/varholder.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <mutex>
#include <shared_mutex>
@@ -81,6 +82,7 @@ private:
void setClusterName(const vespalib::string &clusterName, const vespalib::string &baseDir);
};
+ vespalib::CpuUtil _cpu_util;
const config::ConfigUri _configUri;
mutable std::shared_mutex _mutex;
std::unique_ptr<metrics::UpdateHook> _metricsHook;
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h
index 56a4d1b8c9f..e6d036df7d9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h
@@ -2,14 +2,13 @@
#pragma once
-#include <vespa/fastos/thread.h>
-#include <vespa/searchcore/proton/common/doctypename.h>
-#include <vespa/vespalib/stllike/string.h>
-#include <vespa/config/config.h>
#include "bootstrapconfigmanager.h"
#include "documentdbconfigmanager.h"
#include "i_document_db_config_owner.h"
-#include <chrono>
+#include <vespa/fastos/thread.h>
+#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vespa/config/retriever/configretriever.h>
+#include <vespa/config/subscription/configuri.h>
namespace document { class DocumentTypeRepo; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
index 7801f21c906..695f39c0097 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
@@ -9,9 +9,11 @@ using document::Document;
namespace proton {
-PutDoneContext::PutDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted,
+PutDoneContext::PutDoneContext(std::shared_ptr<feedtoken::IState> token,
+ std::shared_ptr<vespalib::IDestructorCallback> done_callback,
+ IPendingLidTracker::Token uncommitted,
std::shared_ptr<const Document> doc, uint32_t lid)
- : OperationDoneContext(std::move(token)),
+ : OperationDoneContext(std::move(token), std::move(done_callback)),
_uncommitted(std::move(uncommitted)),
_lid(lid),
_docIdLimit(nullptr),
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
index 00e8ac46c93..e0f55816314 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
@@ -28,7 +28,9 @@ class PutDoneContext : public OperationDoneContext
std::shared_ptr<const document::Document> _doc;
public:
- PutDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted,
+ PutDoneContext(std::shared_ptr<feedtoken::IState> token,
+ std::shared_ptr<vespalib::IDestructorCallback> done_callback,
+ IPendingLidTracker::Token uncommitted,
std::shared_ptr<const document::Document> doc, uint32_t lid);
~PutDoneContext() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
index 558a57bb8c6..0c93992d427 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
@@ -4,8 +4,8 @@
namespace proton {
-RemoveDoneContext::RemoveDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted)
- : OperationDoneContext(std::move(token)),
+RemoveDoneContext::RemoveDoneContext(std::shared_ptr<feedtoken::IState> token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted)
+ : OperationDoneContext(std::move(token), std::move(done_callback)),
_uncommitted(std::move(uncommitted))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index 28e15389bb2..62db0f20b84 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -19,7 +19,7 @@ class RemoveDoneContext : public OperationDoneContext
IPendingLidTracker::Token _uncommitted;
public:
- RemoveDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted);
+ RemoveDoneContext(std::shared_ptr<feedtoken::IState>, std::shared_ptr<vespalib::IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted);
~RemoveDoneContext() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h
new file mode 100644
index 00000000000..3e1485d94dd
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/replay_throttling_policy.h
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/shared_operation_throttler.h>
+#include <optional>
+
+namespace proton {
+
+/*
+ * Policy for transaction log replay throttling. If params are set then a dynamic throttler
+ * is used, otherwise an unlimited throttler is used.
+ */
+class ReplayThrottlingPolicy
+{
+ using DynamicThrottleParams = vespalib::SharedOperationThrottler::DynamicThrottleParams;
+ std::optional<DynamicThrottleParams> _params;
+
+public:
+ explicit ReplayThrottlingPolicy(std::optional<DynamicThrottleParams> params)
+ : _params(std::move(params))
+ {
+ }
+ const std::optional<DynamicThrottleParams>& get_params() const noexcept { return _params; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index 66b1ba1ae2e..207f1d813d8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -99,8 +99,8 @@ SearchableFeedView::updateIndexedFields(SerialNum serialNum, search::DocumentIdT
{
_writeService.index().execute(
makeLambdaTask([serialNum, lid, futureDoc = std::move(futureDoc),
- onWriteDone = std::move(onWriteDone), this]() mutable {
- performIndexPut(serialNum, lid, std::move(futureDoc), std::move(onWriteDone));
+ onWriteDone, this]() mutable {
+ performIndexPut(serialNum, lid, std::move(futureDoc), onWriteDone);
}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index e32cd6f5f4e..7e799e506e3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -2,10 +2,12 @@
#include "shared_threading_service.h"
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/vespalib/util/invokeserviceimpl.h>
+
+using vespalib::CpuUsage;
VESPA_THREAD_STACK_TAG(proton_field_writer_executor)
VESPA_THREAD_STACK_TAG(proton_shared_executor)
@@ -16,7 +18,7 @@ namespace proton {
using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor;
SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg)
- : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor),
+ : _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)),
_shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki,
cfg.shared_task_limit(), proton_shared_executor)),
_field_writer(),
@@ -25,9 +27,10 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
{
const auto& fw_cfg = cfg.field_writer_config();
if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) {
- _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor,
+ _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE),
fw_cfg.indexingThreads() * 3,
fw_cfg.defaultTaskLimit(),
+ fw_cfg.is_task_limit_hard(),
fw_cfg.optimize(),
fw_cfg.kindOfwatermark());
if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
index 8a81c3f4388..76b7982fedd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
@@ -30,6 +30,11 @@ derive_shared_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info)
return std::max(scaled_cores, cfg.documentdb.size() + cfg.flush.maxconcurrent + 1);
}
+size_t
+derive_warmup_threads(const HwInfo::Cpu& cpu_info) {
+ return std::max(1u, std::min(4u, cpu_info.cores()/8));
+}
+
}
SharedThreadingServiceConfig
@@ -37,7 +42,7 @@ SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::P
const proton::HwInfo::Cpu& cpu_info)
{
size_t shared_threads = derive_shared_threads(cfg, cpu_info);
- return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4,
+ return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, derive_warmup_threads(cpu_info),
ThreadingServiceConfig::make(cfg, cfg.feeding.concurrency, cpu_info));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 97bd940b403..170b6f99930 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -15,9 +15,10 @@
#include <vespa/searchcore/proton/feedoperation/operations.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
#include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
-#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.storeonlyfeedview");
@@ -25,28 +26,29 @@ LOG_SETUP(".proton.server.storeonlyfeedview");
using document::BucketId;
using document::Document;
using document::DocumentId;
-using document::GlobalId;
using document::DocumentTypeRepo;
using document::DocumentUpdate;
-using vespalib::IDestructorCallback;
+using document::GlobalId;
+using proton::documentmetastore::LidReuseDelayer;
using search::SerialNum;
using search::index::Schema;
using storage::spi::BucketInfoResult;
using storage::spi::Timestamp;
+using vespalib::CpuUsage;
+using vespalib::IDestructorCallback;
using vespalib::IllegalStateException;
using vespalib::makeLambdaTask;
using vespalib::make_string;
-using proton::documentmetastore::LidReuseDelayer;
namespace proton {
namespace {
std::shared_ptr<PutDoneContext>
-createPutDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted,
+createPutDoneContext(FeedToken token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted,
std::shared_ptr<const Document> doc, uint32_t lid)
{
- return std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), std::move(doc), lid);
+ return std::make_shared<PutDoneContext>(std::move(token), std::move(done_callback), std::move(uncommitted), std::move(doc), lid);
}
std::shared_ptr<UpdateDoneContext>
@@ -66,9 +68,9 @@ void setPrev(DocumentOperation &op, const documentmetastore::IStore::Result &res
}
std::shared_ptr<RemoveDoneContext>
-createRemoveDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted)
+createRemoveDoneContext(FeedToken token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted)
{
- return std::make_shared<RemoveDoneContext>(std::move(token), std::move(uncommitted));
+ return std::make_shared<RemoveDoneContext>(std::move(token), std::move(done_callback), std::move(uncommitted));
}
class SummaryPutDoneContext : public OperationDoneContext
@@ -80,7 +82,7 @@ public:
};
SummaryPutDoneContext::SummaryPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted)
- : OperationDoneContext(std::move(token)),
+ : OperationDoneContext(std::move(token), {}),
_uncommitted(std::move(uncommitted))
{}
@@ -247,9 +249,19 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
if (putOp.getValidDbdId(_params._subDbId)) {
if (putOp.changedDbdId() && useDocumentMetaStore(serialNum)) {
- _gidToLidChangeHandler.notifyPut(token, docId.getGlobalId(), putOp.getLid(), serialNum);
+ /*
+ * Don't pass replay feed token to GidToLidChangeHandler.
+ *
+ * The passed feed token is kept until the ForceCommitDoneTask scheduled by the next
+ * force commit has completed. If a replay feed token containing an active throttler
+ * token is passed to GidToLidChangeHandler then
+ * TransactionLogReplayFeedHandler::make_replay_feed_token() might deadlock, waiting for
+ * active throttler tokens to be destroyed.
+ */
+ FeedToken token_copy = (token && !token->is_replay()) ? token : FeedToken();
+ _gidToLidChangeHandler.notifyPut(std::move(token_copy), docId.getGlobalId(), putOp.getLid(), serialNum);
}
- auto onWriteDone = createPutDoneContext(std::move(token), get_pending_lid_token(putOp), doc, putOp.getLid());
+ auto onWriteDone = createPutDoneContext(std::move(token), {}, get_pending_lid_token(putOp), doc, putOp.getLid());
putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, onWriteDone);
putIndexedFields(serialNum, putOp.getLid(), doc, onWriteDone);
@@ -257,7 +269,7 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
if (docAlreadyExists && putOp.changedDbdId()) {
//TODO, better to have an else than an assert ?
assert(!putOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, putOp.getPrevLid());
+ internalRemove(std::move(token), {}, _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, putOp.getPrevLid());
}
}
@@ -357,7 +369,7 @@ StoreOnlyFeedView::removeSummaries(SerialNum serialNum, const LidVector & lids,
trackerTokens.emplace_back(_pendingLidsForDocStore.produce(lid));
});
summaryExecutor().execute(
- makeLambdaTask([serialNum, lids = std::move(lids), onDone, trackerTokens = std::move(trackerTokens), this] {
+ makeLambdaTask([serialNum, lids, onDone, trackerTokens = std::move(trackerTokens), this] {
(void) onDone;
(void) trackerTokens;
std::for_each(lids.begin(), lids.end(), [this, serialNum](Lid lid) {
@@ -439,13 +451,14 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp)
} else {
putSummaryNoop(std::move(futureStream), onWriteDone);
}
- _writeService.shared().execute(makeLambdaTask(
- [upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, promisedDoc = std::move(promisedDoc),
- promisedStream = std::move(promisedStream), this]() mutable
- {
- makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone,
- std::move(promisedDoc), std::move(promisedStream));
- }));
+ auto task = makeLambdaTask([upd = updOp.getUpdate(), useDocStore, lid, onWriteDone,
+ promisedDoc = std::move(promisedDoc),
+ promisedStream = std::move(promisedStream), this]() mutable
+ {
+ makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone,
+ std::move(promisedDoc), std::move(promisedStream));
+ });
+ _writeService.shared().execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::WRITE));
updateAttributes(serialNum, lid, std::move(futureDoc), onWriteDone);
}
}
@@ -458,13 +471,13 @@ StoreOnlyFeedView::makeUpdatedDocument(bool useDocStore, Lid lid, const Document
Document::UP prevDoc = _summaryAdapter->get(lid, *_repo);
Document::UP newDoc;
vespalib::nbostream newStream(12345);
- assert(!onWriteDone->hasToken() || useDocStore);
+ assert(onWriteDone->is_replay() || useDocStore);
if (useDocStore) {
assert(prevDoc);
}
if (!prevDoc) {
// Replaying, document removed later before summary was flushed.
- assert(!onWriteDone->hasToken());
+ assert(onWriteDone->is_replay());
// If we've passed serial number for flushed index then we could
// also check that this operation is marked for ignore by index
// proxy.
@@ -478,7 +491,7 @@ StoreOnlyFeedView::makeUpdatedDocument(bool useDocStore, Lid lid, const Document
} else {
// Replaying, document removed and lid reused before summary
// was flushed.
- assert(!onWriteDone->hasToken() && !useDocStore);
+ assert(onWriteDone->is_replay() && !useDocStore);
}
}
promisedDoc.set_value(std::move(newDoc));
@@ -552,7 +565,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI
if (rmOp.changedDbdId()) {
//TODO Prefer else over assert ?
assert(!rmOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid());
+ internalRemove(std::move(token), {}, _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid());
}
}
}
@@ -569,16 +582,16 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
assert(!rmOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid());
+ internalRemove(std::move(token), {}, _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid());
}
}
}
void
-StoreOnlyFeedView::internalRemove(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid)
+StoreOnlyFeedView::internalRemove(FeedToken token, std::shared_ptr<IDestructorCallback> done_callback, IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid)
{
_lidReuseDelayer.delayReuse(lid);
- auto onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted));
+ auto onWriteDone = createRemoveDoneContext(std::move(token), std::move(done_callback), std::move(uncommitted));
removeSummary(serialNum, lid, onWriteDone);
removeAttributes(serialNum, lid, onWriteDone);
removeIndexedFields(serialNum, lid, onWriteDone);
@@ -706,13 +719,13 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, DoneCallback doneCtx)
if (moveOp.changedDbdId() && useDocumentMetaStore(serialNum)) {
_gidToLidChangeHandler.notifyPut(FeedToken(), docId.getGlobalId(), moveOp.getLid(), serialNum);
}
- auto onWriteDone = createPutDoneContext(doneCtx, _pendingLidsForCommit->produce(moveOp.getLid()), doc, moveOp.getLid());
+ auto onWriteDone = createPutDoneContext({}, doneCtx, _pendingLidsForCommit->produce(moveOp.getLid()), doc, moveOp.getLid());
putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, onWriteDone);
}
if (docAlreadyExists && moveOp.changedDbdId()) {
- internalRemove(doneCtx, _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, moveOp.getPrevLid());
+ internalRemove({}, doneCtx, _pendingLidsForCommit->produce(moveOp.getPrevLid()), serialNum, moveOp.getPrevLid());
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index c25accaf4a4..bd8509fa796 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -180,7 +180,7 @@ private:
// returns the number of documents removed.
size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields, DoneCallback onDone);
- void internalRemove(IDestructorCallbackSP token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid);
+ void internalRemove(FeedToken token, std::shared_ptr<vespalib::IDestructorCallback> done_callback,IPendingLidTracker::Token uncommitted, SerialNum serialNum, Lid lid);
IPendingLidTracker::Token get_pending_lid_token(const DocumentOperation &op);
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 7982e8a8414..335d5bab8d0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -12,14 +12,15 @@ using OptimizeFor = vespalib::Executor::OptimizeFor;
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t master_task_limit_,
- uint32_t defaultTaskLimit_,
+ int32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
vespalib::duration reactionTime_,
SharedFieldWriterExecutor shared_field_writer_)
: _indexingThreads(indexingThreads_),
_master_task_limit(master_task_limit_),
- _defaultTaskLimit(defaultTaskLimit_),
+ _defaultTaskLimit(std::abs(defaultTaskLimit_)),
+ _is_task_limit_hard(defaultTaskLimit_ >= 0),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
_reactionTime(reactionTime_),
@@ -81,6 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
return _indexingThreads == rhs._indexingThreads &&
_master_task_limit == rhs._master_task_limit &&
_defaultTaskLimit == rhs._defaultTaskLimit &&
+ _is_task_limit_hard == rhs._is_task_limit_hard &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
_reactionTime == rhs._reactionTime &&
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index f1a4f0525d1..a54c0674263 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -23,15 +23,16 @@ private:
uint32_t _indexingThreads;
uint32_t _master_task_limit;
uint32_t _defaultTaskLimit;
+ bool _is_task_limit_hard;
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
vespalib::duration _reactionTime; // Maximum reaction time to new tasks
SharedFieldWriterExecutor _shared_field_writer;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_,
- OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_,
- SharedFieldWriterExecutor shared_field_writer_);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_,
+ OptimizeFor optimize_, uint32_t kindOfWatermark_,
+ vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
@@ -40,6 +41,7 @@ public:
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t master_task_limit() const { return _master_task_limit; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
+ bool is_task_limit_hard() const { return _is_task_limit_hard; }
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
vespalib::duration reactionTime() const { return _reactionTime; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
index 5842965a18a..16099700980 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
@@ -7,8 +7,8 @@ using document::Document;
namespace proton {
-UpdateDoneContext::UpdateDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd)
- : OperationDoneContext(std::move(token)),
+UpdateDoneContext::UpdateDoneContext(std::shared_ptr<feedtoken::IState> token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd)
+ : OperationDoneContext(std::move(token), {}),
_uncommitted(std::move(uncommitted)),
_upd(upd),
_doc()
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
index 62448102f30..7dd6c4a61f2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
@@ -24,7 +24,7 @@ class UpdateDoneContext : public OperationDoneContext
document::DocumentUpdate::SP _upd;
std::shared_future<std::unique_ptr<const document::Document>> _doc;
public:
- UpdateDoneContext(IDestructorCallback::SP token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd);
+ UpdateDoneContext(std::shared_ptr<feedtoken::IState> token, IPendingLidTracker::Token uncommitted, const document::DocumentUpdate::SP &upd);
~UpdateDoneContext() override;
const document::DocumentUpdate &getUpdate() { return *_upd; }
diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
index 95643c7d1a8..0bf5b351fd0 100644
--- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
@@ -2,6 +2,7 @@
#include "summaryengine.h"
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.summaryengine.summaryengine");
@@ -10,6 +11,7 @@ using namespace search::engine;
using namespace proton;
using vespalib::Memory;
using vespalib::slime::Inspector;
+using vespalib::CpuUsage;
namespace {
@@ -61,7 +63,7 @@ SummaryEngine::SummaryEngine(size_t numThreads, bool async)
_closed(false),
_forward_issues(true),
_handlers(),
- _executor(numThreads, 128_Ki, summary_engine_executor),
+ _executor(numThreads, 128_Ki, CpuUsage::wrap(summary_engine_executor, CpuUsage::Category::READ)),
_metrics(std::make_unique<DocsumMetrics>())
{ }
@@ -141,6 +143,9 @@ SummaryEngine::getDocsums(DocsumRequest::UP req)
}
}
updateDocsumMetrics(vespalib::to_s(req->getTimeUsed()), getNumDocs(*reply));
+ if (req->expired()) {
+ vespalib::Issue::report("docsum request timed out; results may be incomplete");
+ }
}
if (! reply) {
reply = std::make_unique<DocsumReply>();
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h
index 743cd9af8fc..0e2491e62fa 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_attribute_manager.h
@@ -14,7 +14,7 @@ private:
std::vector<search::AttributeVector*> _writables;
std::unique_ptr<ImportedAttributesRepo> _importedAttributes;
vespalib::ISequencedTaskExecutor* _writer;
- vespalib::ThreadExecutor* _shared;
+ vespalib::Executor* _shared;
public:
MockAttributeManager()
@@ -33,7 +33,7 @@ public:
void set_writer(vespalib::ISequencedTaskExecutor& writer) {
_writer = &writer;
}
- void set_shared_executor(vespalib::ThreadExecutor& shared) {
+ void set_shared_executor(vespalib::Executor& shared) {
_shared = &shared;
}
search::AttributeGuard::UP getAttribute(const vespalib::string &name) const override {
@@ -72,7 +72,7 @@ public:
assert(_writer != nullptr);
return *_writer;
}
- vespalib::ThreadExecutor& get_shared_executor() const override {
+ vespalib::Executor& get_shared_executor() const override {
assert(_shared != nullptr);
return *_shared;
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index e93b1632b3f..78a740ec2c3 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -15,7 +15,7 @@ private:
SyncableThreadServiceObserver _master;
ThreadServiceObserver _index;
ThreadExecutorObserver _summary;
- vespalib::ThreadExecutor & _shared;
+ vespalib::Executor & _shared;
vespalib::SequencedTaskExecutorObserver _indexFieldInverter;
vespalib::SequencedTaskExecutorObserver _indexFieldWriter;
vespalib::SequencedTaskExecutorObserver _attributeFieldWriter;
@@ -46,7 +46,7 @@ public:
vespalib::ThreadExecutor &summary() override {
return _summary;
}
- vespalib::ThreadExecutor &shared() override {
+ vespalib::Executor &shared() override {
return _shared;
}
vespalib::ISequencedTaskExecutor &indexFieldInverter() override {