aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-03-17 09:25:04 +0000
committerGeir Storli <geirst@yahooinc.com>2022-03-17 09:25:04 +0000
commit0c00a3e980a54f903ed8cdfbca432493f6d694b2 (patch)
treece8e2ff563a1650215de35bb99deefa7e604cdc3
parentedc0babf926099386675b7dac4131c4a35aeaec5 (diff)
Add BucketExecutor to ISharedThreadingService interface.
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp7
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp5
-rw-r--r--searchcore/src/tests/proton/documentdb/documentdb_test.cpp5
-rw-r--r--searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h8
13 files changed, 48 insertions, 35 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 4369666591f..9a7b49065d4 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -16,7 +16,6 @@
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/fastos/file.h>
#include <vespa/persistence/conformancetest/conformancetest.h>
-#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcommon/common/schemaconfigurer.h>
#include <vespa/searchcore/proton/common/alloc_config.h>
#include <vespa/searchcore/proton/common/hw_info.h>
@@ -176,7 +175,6 @@ private:
vespalib::ThreadStackExecutor _summaryExecutor;
MockSharedThreadingService _shared_service;
TransLogServer _tls;
- storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
static std::shared_ptr<ProtonConfig> make_proton_config() {
ProtonConfigBuilder proton_config;
@@ -210,7 +208,7 @@ public:
mgr.nextGeneration(_shared_service.transport(), 0ms);
return DocumentDB::create(_baseDir, mgr.getConfig(), _tlsSpec, _queryLimiter, docType, bucketSpace,
*b->getProtonConfigSP(), const_cast<DocumentDBFactory &>(*this),
- _shared_service, _bucketExecutor, _tls, _metricsWireService,
+ _shared_service, _tls, _metricsWireService,
_fileHeaderContext, _config_stores.getConfigStore(docType.toString()),
std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo());
}
@@ -225,8 +223,7 @@ DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsLis
_metricsWireService(),
_summaryExecutor(8, 128_Ki),
_shared_service(_summaryExecutor, _summaryExecutor),
- _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext),
- _bucketExecutor(2)
+ _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext)
{}
DocumentDBFactory::~DocumentDBFactory() = default;
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index ef8dc17dc0e..6ae9e7e2d52 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -10,7 +10,6 @@
#include <vespa/eval/eval/tensor_spec.h>
#include <vespa/eval/eval/test/value_compare.h>
#include <vespa/eval/eval/value.h>
-#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcore/proton/attribute/attribute_writer.h>
#include <vespa/searchcore/proton/docsummary/docsumcontext.h>
#include <vespa/searchcore/proton/docsummary/documentstoreadapter.h>
@@ -180,7 +179,6 @@ public:
vespalib::ThreadStackExecutor _summaryExecutor;
MockSharedThreadingService _shared_service;
TransLogServer _tls;
- storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
bool _mkdirOk;
matching::QueryLimiter _queryLimiter;
DummyWireService _dummy;
@@ -200,7 +198,6 @@ public:
_summaryExecutor(8, 128_Ki),
_shared_service(_summaryExecutor, _summaryExecutor),
_tls(_shared_service.transport(), "tmp", 9013, ".", _fileHeaderContext),
- _bucketExecutor(2),
_mkdirOk(FastOS_File::MakeDirectory("tmpdb")),
_queryLimiter(),
_dummy(),
@@ -227,7 +224,7 @@ public:
}
_ddb = DocumentDB::create("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter,
DocTypeName(docTypeName), makeBucketSpace(), *b->getProtonConfigSP(), *this,
- _shared_service, _bucketExecutor, _tls, _dummy, _fileHeaderContext,
+ _shared_service, _tls, _dummy, _fileHeaderContext,
std::make_unique<MemoryConfigStore>(),
std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), _hwInfo),
_ddb->start();
diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
index 1f3be7511da..0edea2c0c5c 100644
--- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
@@ -6,7 +6,6 @@
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcore/proton/attribute/flushableattribute.h>
#include <vespa/searchcore/proton/common/statusreport.h>
#include <vespa/searchcore/proton/docsummary/summaryflushtarget.h>
@@ -122,7 +121,6 @@ struct Fixture : public FixtureBase {
vespalib::ThreadStackExecutor _summaryExecutor;
MockSharedThreadingService _shared_service;
HwInfo _hwInfo;
- storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
DocumentDB::SP _db;
DummyFileHeaderContext _fileHeaderContext;
TransLogServer _tls;
@@ -146,7 +144,6 @@ Fixture::Fixture(bool file_config)
_summaryExecutor(8, 128_Ki),
_shared_service(_summaryExecutor, _summaryExecutor),
_hwInfo(),
- _bucketExecutor(2),
_db(),
_fileHeaderContext(),
_tls(_shared_service.transport(), "tmp", 9014, ".", _fileHeaderContext),
@@ -167,7 +164,7 @@ Fixture::Fixture(bool file_config)
mgr.nextGeneration(_shared_service.transport(), 0ms);
_db = DocumentDB::create(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, DocTypeName("typea"),
makeBucketSpace(),
- *b->getProtonConfigSP(), _myDBOwner, _shared_service, _bucketExecutor, _tls, _dummy,
+ *b->getProtonConfigSP(), _myDBOwner, _shared_service, _tls, _dummy,
_fileHeaderContext, make_config_store(),
std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), _hwInfo);
_db->start();
diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
index 8f8200486d7..a15bd8c67c9 100644
--- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
@@ -1,18 +1,20 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcore/config/config-proton.h>
#include <vespa/searchcore/proton/server/shared_threading_service.h>
#include <vespa/searchcore/proton/server/shared_threading_service_config.h>
#include <vespa/searchcore/proton/test/transport_helper.h>
+#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <vespa/vespalib/gtest/gtest.h>
+using ProtonConfig = vespa::config::search::core::ProtonConfig;
+using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
using namespace proton;
+using storage::spi::dummy::DummyBucketExecutor;
using vespalib::ISequencedTaskExecutor;
using vespalib::SequencedTaskExecutor;
-using ProtonConfig = vespa::config::search::core::ProtonConfig;
-using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
ProtonConfig
make_proton_config(double concurrency)
@@ -49,15 +51,18 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores
class SharedThreadingServiceTest : public ::testing::Test {
public:
Transport transport;
+ storage::spi::dummy::DummyBucketExecutor bucket_executor;
std::unique_ptr<SharedThreadingService> service;
SharedThreadingServiceTest()
: transport(),
+ bucket_executor(2),
service()
{ }
~SharedThreadingServiceTest() = default;
void setup(double concurrency, uint32_t cpu_cores) {
service = std::make_unique<SharedThreadingService>(
- SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)), transport.transport());
+ SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)),
+ transport.transport(), bucket_executor);
}
SequencedTaskExecutor* field_writer() {
return dynamic_cast<SequencedTaskExecutor*>(service->field_writer());
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 86dc38159e8..b42af79d946 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -591,7 +591,7 @@ MyBmNode::create_document_db(const BmClusterParams& params)
mgr.nextGeneration(_shared_service.transport(), 0ms);
_document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _doc_type_name,
_bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner,
- _shared_service, *_persistence_engine, _tls,
+ _shared_service, _tls,
_metrics_wire_service, _file_header_context,
_config_stores.getConfigStore(_doc_type_name.toString()),
std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index e4d75085027..60e9dc68573 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -145,7 +145,6 @@ DocumentDB::create(const vespalib::string &baseDir,
const ProtonConfig &protonCfg,
IDocumentDBOwner &owner,
ISharedThreadingService& shared_service,
- storage::spi::BucketExecutor &bucketExecutor,
const search::transactionlog::WriterFactory &tlsWriterFactory,
MetricsWireService &metricsWireService,
const search::common::FileHeaderContext &fileHeaderContext,
@@ -155,7 +154,7 @@ DocumentDB::create(const vespalib::string &baseDir,
{
return DocumentDB::SP(
new DocumentDB(baseDir, std::move(currentSnapshot), tlsSpec, queryLimiter, docTypeName, bucketSpace,
- protonCfg, owner, shared_service, bucketExecutor, tlsWriterFactory,
+ protonCfg, owner, shared_service, tlsWriterFactory,
metricsWireService, fileHeaderContext, std::move(config_store), initializeThreads, hwInfo));
}
DocumentDB::DocumentDB(const vespalib::string &baseDir,
@@ -167,7 +166,6 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
const ProtonConfig &protonCfg,
IDocumentDBOwner &owner,
ISharedThreadingService& shared_service,
- storage::spi::BucketExecutor & bucketExecutor,
const search::transactionlog::WriterFactory &tlsWriterFactory,
MetricsWireService &metricsWireService,
const FileHeaderContext &fileHeaderContext,
@@ -210,7 +208,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_feedView(),
_refCount(),
_owner(owner),
- _bucketExecutor(bucketExecutor),
+ _bucketExecutor(shared_service.bucket_executor()),
_state(),
_dmUsageForwarder(_writeService.master()),
_writeFilter(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index e3d467fc3c1..9dd9cf6542b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -202,7 +202,6 @@ private:
const ProtonConfig &protonCfg,
IDocumentDBOwner &owner,
ISharedThreadingService& shared_service,
- storage::spi::BucketExecutor &bucketExecutor,
const search::transactionlog::WriterFactory &tlsWriterFactory,
MetricsWireService &metricsWireService,
const search::common::FileHeaderContext &fileHeaderContext,
@@ -232,7 +231,6 @@ public:
const ProtonConfig &protonCfg,
IDocumentDBOwner &owner,
ISharedThreadingService& shared_service,
- storage::spi::BucketExecutor & bucketExecutor,
const search::transactionlog::WriterFactory &tlsWriterFactory,
MetricsWireService &metricsWireService,
const search::common::FileHeaderContext &fileHeaderContext,
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
index 12377fd25e0..dccea41373f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h
@@ -3,6 +3,8 @@
class FNET_Transport;
+namespace storage::spi { struct BucketExecutor; }
+
namespace vespalib {
class ISequencedTaskExecutor;
class ThreadExecutor;
@@ -53,6 +55,12 @@ public:
*/
virtual FNET_Transport & transport() = 0;
+
+ /**
+ * Returns the executor for running a BucketTask in the persistence layer above the SPI.
+ */
+ virtual storage::spi::BucketExecutor& bucket_executor() = 0;
+
/**
* Return a very cheap clock.
*/
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 9bf6d19231e..0c281ed75c2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -276,8 +276,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
setBucketCheckSumType(protonConfig);
setFS4Compression(protonConfig);
- _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport);
- _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_shared_service->transport(), protonConfig.basedir,
+ _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_transport, protonConfig.basedir,
diskMemUsageSamplerConfig(protonConfig, hwInfo));
_tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext);
@@ -324,6 +323,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_diskMemUsageSampler->notifier(),
protonConfig.visit.defaultserializedsize,
protonConfig.visit.ignoremaxbytes);
+ _shared_service = std::make_unique<SharedThreadingService>(
+ SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine);
vespalib::string fileConfigId;
_compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw());
@@ -612,7 +613,6 @@ Proton::addDocumentDB(const document::DocumentType &docType,
config,
*this,
*_shared_service,
- *_persistenceEngine,
*_tls->getTransLogServer(),
*_metricsEngine,
_fileHeaderContext,
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index e55282e31e8..c6f8ca923e0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -17,16 +17,18 @@ namespace proton {
using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor;
-SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport)
- :
- _transport(transport),
+SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg,
+ FNET_Transport& transport,
+ storage::spi::BucketExecutor& bucket_executor)
+ : _transport(transport),
_warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)),
_shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki,
- cfg.shared_task_limit(), proton_shared_executor)),
+ cfg.shared_task_limit(), proton_shared_executor)),
_field_writer(),
_invokeService(std::max(vespalib::adjustTimeoutByDetectedHz(1ms),
cfg.field_writer_config().reactionTime())),
_invokeRegistrations(),
+ _bucket_executor(bucket_executor),
_clock(_invokeService.nowRef())
{
const auto& fw_cfg = cfg.field_writer_config();
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
index 463823f10cb..04e30b0f9b3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h
@@ -23,9 +23,12 @@ private:
std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer;
vespalib::InvokeServiceImpl _invokeService;
std::vector<Registration> _invokeRegistrations;
+ storage::spi::BucketExecutor& _bucket_executor;
vespalib::Clock _clock;
public:
- SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport);
+ SharedThreadingService(const SharedThreadingServiceConfig& cfg,
+ FNET_Transport& transport,
+ storage::spi::BucketExecutor& bucket_executor);
~SharedThreadingService() override;
std::shared_ptr<vespalib::Executor> shared_raw() { return _shared; }
@@ -36,6 +39,7 @@ public:
vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); }
vespalib::InvokeService & invokeService() override { return _invokeService; }
FNET_Transport & transport() override { return _transport; }
+ storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; }
const vespalib::Clock & clock() const override { return _clock; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp
index ca9b89b1e60..e5d42b34370 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp
@@ -4,11 +4,14 @@
namespace proton {
-MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in, ThreadExecutor& shared_in)
+MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in,
+ ThreadExecutor& shared_in,
+ size_t num_bucket_executors)
: _warmup(warmup_in),
_shared(shared_in),
_invokeService(10ms),
_transport(),
+ _bucket_executor(num_bucket_executors),
_clock(_invokeService.nowRef())
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
index 35c8ee46de5..167d15d70eb 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h
@@ -2,9 +2,10 @@
#pragma once
#include "transport_helper.h"
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcore/proton/server/i_shared_threading_service.h>
-#include <vespa/vespalib/util/invokeserviceimpl.h>
#include <vespa/vespalib/util/clock.h>
+#include <vespa/vespalib/util/invokeserviceimpl.h>
namespace proton {
@@ -15,16 +16,19 @@ private:
ThreadExecutor & _shared;
vespalib::InvokeServiceImpl _invokeService;
Transport _transport;
+ storage::spi::dummy::DummyBucketExecutor _bucket_executor;
vespalib::Clock _clock;
public:
MockSharedThreadingService(ThreadExecutor& warmup_in,
- ThreadExecutor& shared_in);
+ ThreadExecutor& shared_in,
+ size_t num_bucket_executors = 2);
~MockSharedThreadingService() override;
ThreadExecutor& warmup() override { return _warmup; }
ThreadExecutor& shared() override { return _shared; }
vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; }
vespalib::InvokeService & invokeService() override { return _invokeService; }
FNET_Transport & transport() override { return _transport.transport(); }
+ storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; }
const vespalib::Clock & clock() const override { return _clock; }
};