aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-23 22:00:01 +0100
committerGitHub <noreply@github.com>2021-01-23 22:00:01 +0100
commitfa4ba96f1ebb4d2e544ff0a8e5d9cc6c3a693f78 (patch)
tree2b3a1e84d51a35b529982ffec9699d663fb8b298
parent0d43351fa8c26eb7d319c0da6a7867d431f0c50e (diff)
Revert "Revert "Wire in the BucketExecutor.""
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp6
-rw-r--r--searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp1
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp5
-rw-r--r--searchcore/src/tests/proton/documentdb/documentdb_test.cpp22
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp10
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h2
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp33
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp2
18 files changed, 100 insertions, 43 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index da194d1ff7b..6b4061081ea 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/fastos/file.h>
#include <vespa/persistence/conformancetest/conformancetest.h>
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/searchcommon/common/schemaconfigurer.h>
@@ -168,6 +169,7 @@ private:
mutable DummyWireService _metricsWireService;
mutable MemoryConfigStores _config_stores;
vespalib::ThreadStackExecutor _summaryExecutor;
+ storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
public:
DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort);
@@ -206,6 +208,7 @@ public:
const_cast<DocumentDBFactory &>(*this),
_summaryExecutor,
_summaryExecutor,
+ _bucketExecutor,
_tls,
_metricsWireService,
_fileHeaderContext,
@@ -225,7 +228,8 @@ DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsLis
_queryLimiter(),
_clock(),
_metricsWireService(),
- _summaryExecutor(8, 128 * 1024)
+ _summaryExecutor(8, 128 * 1024),
+ _bucketExecutor(2)
{}
DocumentDBFactory::~DocumentDBFactory() = default;
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 861b2ad5307..90942e9aef4 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -805,6 +805,7 @@ PersistenceProviderFixture::create_document_db(const BMParams & params)
_document_db_owner,
_summary_executor,
_summary_executor,
+ *_persistence_engine,
_tls,
_metrics_wire_service,
_file_header_context,
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index e84cf338bcd..38e5e08ec02 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -25,6 +25,7 @@
#include <vespa/searchcore/proton/server/searchview.h>
#include <vespa/searchcore/proton/server/summaryadapter.h>
#include <vespa/searchcore/proton/matching/querylimiter.h>
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/searchlib/engine/docsumapi.h>
#include <vespa/searchlib/index/docbuilder.h>
@@ -173,6 +174,7 @@ public:
DummyFileHeaderContext _fileHeaderContext;
TransLogServer _tls;
vespalib::ThreadStackExecutor _summaryExecutor;
+ storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
bool _mkdirOk;
matching::QueryLimiter _queryLimiter;
vespalib::Clock _clock;
@@ -192,6 +194,7 @@ public:
_fileHeaderContext(),
_tls("tmp", 9013, ".", _fileHeaderContext),
_summaryExecutor(8, 128*1024),
+ _bucketExecutor(2),
_mkdirOk(FastOS_File::MakeDirectory("tmpdb")),
_queryLimiter(),
_clock(),
@@ -219,7 +222,7 @@ public:
}
_ddb = std::make_unique<DocumentDB>("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter, _clock,
DocTypeName(docTypeName), makeBucketSpace(), *b->getProtonConfigSP(), *this,
- _summaryExecutor, _summaryExecutor, _tls, _dummy, _fileHeaderContext,
+ _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _dummy, _fileHeaderContext,
std::make_unique<MemoryConfigStore>(),
std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo),
_ddb->start();
diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
index 4064fc24105..91046fbb567 100644
--- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp
@@ -19,6 +19,7 @@
#include <vespa/searchcore/proton/server/document_db_explorer.h>
#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
+#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcorespi/index/indexflushtarget.h>
#include <vespa/searchlib/attribute/attribute_read_guard.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
@@ -69,6 +70,7 @@ struct Fixture {
MyDBOwner _myDBOwner;
vespalib::ThreadStackExecutor _summaryExecutor;
HwInfo _hwInfo;
+ storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
DocumentDB::SP _db;
DummyFileHeaderContext _fileHeaderContext;
TransLogServer _tls;
@@ -84,31 +86,31 @@ Fixture::Fixture()
_myDBOwner(),
_summaryExecutor(8, 128*1024),
_hwInfo(),
+ _bucketExecutor(2),
_db(),
_fileHeaderContext(),
_tls("tmp", 9014, ".", _fileHeaderContext),
_queryLimiter(),
_clock()
{
- DocumentDBConfig::DocumenttypesConfigSP documenttypesConfig(new DocumenttypesConfig());
+ auto documenttypesConfig = std::make_shared<DocumenttypesConfig>();
DocumentType docType("typea", 0);
- std::shared_ptr<const DocumentTypeRepo> repo(new DocumentTypeRepo(docType));
- TuneFileDocumentDB::SP tuneFileDocumentDB(new TuneFileDocumentDB);
+ auto repo = std::make_shared<DocumentTypeRepo>(docType);
+ auto tuneFileDocumentDB = std::make_shared<TuneFileDocumentDB>();
config::DirSpec spec(TEST_PATH("cfg"));
DocumentDBConfigHelper mgr(spec, "typea");
- BootstrapConfig::SP
- b(new BootstrapConfig(1, documenttypesConfig, repo,
+ auto b = std::make_shared<BootstrapConfig>(1, documenttypesConfig, repo,
std::make_shared<ProtonConfig>(),
std::make_shared<FiledistributorrpcConfig>(),
std::make_shared<BucketspacesConfig>(),
- tuneFileDocumentDB, HwInfo()));
+ tuneFileDocumentDB, HwInfo());
mgr.forwardConfig(b);
mgr.nextGeneration(0ms);
- _db.reset(new DocumentDB(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"),
+ _db = std::make_shared<DocumentDB>(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"),
makeBucketSpace(),
- *b->getProtonConfigSP(), _myDBOwner, _summaryExecutor, _summaryExecutor, _tls, _dummy,
- _fileHeaderContext, ConfigStore::UP(new MemoryConfigStore),
- std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo));
+ *b->getProtonConfigSP(), _myDBOwner, _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _dummy,
+ _fileHeaderContext, std::make_unique<MemoryConfigStore>(),
+ std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo);
_db->start();
_db->waitForOnlineState();
}
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
index 926530c228e..af8086941fd 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
@@ -46,23 +46,27 @@ JobTestBase::init(uint32_t allowedLidBloat,
{
_handler = std::make_unique<MyHandler>(maxOutstandingMoveOps != MAX_OUTSTANDING_MOVE_OPS, useBucketDB());
DocumentDBLidSpaceCompactionConfig compactCfg(interval, allowedLidBloat, allowedLidBloatFactor,
- REMOVE_BATCH_BLOCK_RATE, REMOVE_BLOCK_RATE, false);
+ REMOVE_BATCH_BLOCK_RATE, REMOVE_BLOCK_RATE, false, useBucketDB());
BlockableMaintenanceJobConfig blockableCfg(resourceLimitFactor, maxOutstandingMoveOps);
if (useBucketDB()) {
+ _singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
+ _master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor);
_bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4);
- _job = std::make_unique<lidspace::CompactionJob>(compactCfg, *_handler, _storer, *_bucketExecutor, _diskMemUsageNotifier,
- blockableCfg, _clusterStateHandler, nodeRetired,
+ _job = std::make_unique<lidspace::CompactionJob>(compactCfg, *_handler, _storer, *_master, *_bucketExecutor,
+ _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired,
document::BucketSpace::placeHolder());
} else {
_job = std::make_unique<LidSpaceCompactionJob>(compactCfg, *_handler, _storer, _frozenHandler, _diskMemUsageNotifier,
blockableCfg, _clusterStateHandler, nodeRetired);
}
}
+
void
JobTestBase::sync() const {
if (_bucketExecutor) {
_bucketExecutor->sync();
+ _master->sync();
}
}
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
index 637314e2879..c81f8c8d387 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
@@ -7,6 +7,8 @@
struct JobTestBase : public ::testing::TestWithParam<bool> {
std::unique_ptr<storage::spi::BucketExecutor> _bucketExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor;
+ std::unique_ptr<searchcorespi::index::IThreadService> _master;
std::unique_ptr<MyHandler> _handler;
MyStorer _storer;
MyFrozenBucketHandler _frozenHandler;
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index d6fddf16659..ab447d5d38f 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -862,14 +862,10 @@ void
MaintenanceControllerFixture::injectMaintenanceJobs()
{
if (_injectDefaultJobs) {
- MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _fh, _gsp,
- _lscHandlers, _fh, _mc, _bucketCreateNotifier, _docTypeName.getName(), makeBucketSpace(),
- _fh, _fh, _bmc, _clusterStateHandler, _bucketHandler,
- _calc,
- _diskMemUsageNotifier,
- _jobTrackers,
- _readyAttributeManager,
- _notReadyAttributeManager,
+ MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _bucketExecutor, _fh, _gsp, _lscHandlers, _fh, _mc,
+ _bucketCreateNotifier, _docTypeName.getName(), makeBucketSpace(), _fh, _fh,
+ _bmc, _clusterStateHandler, _bucketHandler, _calc, _diskMemUsageNotifier,
+ _jobTrackers, _readyAttributeManager, _notReadyAttributeManager,
std::make_unique<const AttributeConfigInspector>(AttributesConfigBuilder()),
std::make_shared<TransientMemoryUsageProvider>(),
_attributeUsageFilter);
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index c12500570bc..17d0ef8ad37 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -394,6 +394,9 @@ lidspacecompaction.removebatchblockrate double default=0.5
## It is considered again at the next regular interval (see above).
lidspacecompaction.removeblockrate double default=100.0
+## Set to true to enable bucket locking via content layer
+lidspacecompaction.usebucketexecutor bool default=false
+
## This is the maximum value visibilitydelay you can have.
## A to higher value here will cost more memory while not improving too much.
maxvisibilitydelay double default=1.0
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp
index f365b6ae02d..2f41ca83a74 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp
@@ -55,7 +55,8 @@ DocumentDBLidSpaceCompactionConfig::DocumentDBLidSpaceCompactionConfig()
_allowedLidBloatFactor(1.0),
_remove_batch_block_rate(0.5),
_remove_block_rate(100),
- _disabled(false)
+ _disabled(false),
+ _useBucketExecutor(false)
{
}
@@ -64,14 +65,16 @@ DocumentDBLidSpaceCompactionConfig::DocumentDBLidSpaceCompactionConfig(vespalib:
double allowedLidBloatFactor,
double remove_batch_block_rate,
double remove_block_rate,
- bool disabled)
+ bool disabled,
+ bool useBucketExecutor)
: _delay(std::min(MAX_DELAY_SEC, interval)),
_interval(interval),
_allowedLidBloat(allowedLidBloat),
_allowedLidBloatFactor(allowedLidBloatFactor),
_remove_batch_block_rate(remove_batch_block_rate),
_remove_block_rate(remove_block_rate),
- _disabled(disabled)
+ _disabled(disabled),
+ _useBucketExecutor(useBucketExecutor)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
index 731add1f62c..9bdb8fa0ae6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h
@@ -50,6 +50,7 @@ private:
double _remove_batch_block_rate;
double _remove_block_rate;
bool _disabled;
+ bool _useBucketExecutor;
public:
DocumentDBLidSpaceCompactionConfig();
@@ -58,7 +59,8 @@ public:
double allowwedLidBloatFactor,
double remove_batch_block_rate,
double remove_block_rate,
- bool disabled);
+ bool disabled,
+ bool useBucketExecutor);
static DocumentDBLidSpaceCompactionConfig createDisabled();
bool operator==(const DocumentDBLidSpaceCompactionConfig &rhs) const;
@@ -69,6 +71,7 @@ public:
double get_remove_batch_block_rate() const { return _remove_batch_block_rate; }
double get_remove_block_rate() const { return _remove_block_rate; }
bool isDisabled() const { return _disabled; }
+ bool useBucketExecutor() const { return _useBucketExecutor; }
};
class BlockableMaintenanceJobConfig {
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 7c4296db304..7fb16e851fa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -134,6 +134,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
IDocumentDBOwner &owner,
vespalib::SyncableThreadExecutor &warmupExecutor,
vespalib::ThreadStackExecutorBase &sharedExecutor,
+ storage::spi::BucketExecutor & bucketExecutor,
const search::transactionlog::WriterFactory &tlsWriterFactory,
MetricsWireService &metricsWireService,
const FileHeaderContext &fileHeaderContext,
@@ -174,6 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_refCount(),
_syncFeedViewEnabled(false),
_owner(owner),
+ _bucketExecutor(bucketExecutor),
_state(),
_dmUsageForwarder(_writeService.master()),
_writeFilter(),
@@ -934,6 +936,7 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config, std
_maintenanceController.killJobs();
MaintenanceJobsInjector::injectJobs(_maintenanceController,
config,
+ _bucketExecutor,
*_feedHandler, // IHeartBeatHandler
*_sessionManager, // ISessionCachePruner
_lidSpaceCompactionHandlers,
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 9ea88360b07..6ed876f197d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -45,6 +45,7 @@ namespace metrics {
class UpdateHook;
class MetricLockGuard;
}
+namespace storage::spi { struct BucketExecutor; }
namespace proton {
class AttributeConfigInspector;
@@ -115,6 +116,7 @@ private:
MonitoredRefCount _refCount;
bool _syncFeedViewEnabled;
IDocumentDBOwner &_owner;
+ storage::spi::BucketExecutor &_bucketExecutor;
DDBState _state;
DiskMemUsageForwarder _dmUsageForwarder;
AttributeUsageFilter _writeFilter;
@@ -233,6 +235,7 @@ public:
IDocumentDBOwner &owner,
vespalib::SyncableThreadExecutor &warmupExecutor,
vespalib::ThreadStackExecutorBase &sharedExecutor,
+ storage::spi::BucketExecutor & bucketExecutor,
const search::transactionlog::WriterFactory &tlsWriterFactory,
MetricsWireService &metricsWireService,
const search::common::FileHeaderContext &fileHeaderContext,
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
index c5587b33f6b..115a49fe997 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
@@ -142,7 +142,8 @@ buildMaintenanceConfig(const BootstrapConfig::SP &bootstrapConfig,
proton.lidspacecompaction.allowedlidbloatfactor,
proton.lidspacecompaction.removebatchblockrate,
proton.lidspacecompaction.removeblockrate,
- isDocumentTypeGlobal),
+ isDocumentTypeGlobal,
+ proton.lidspacecompaction.usebucketexecutor),
AttributeUsageFilterConfig(
proton.writefilter.attribute.enumstorelimit,
proton.writefilter.attribute.multivaluelimit),
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index 51b84d5ec6e..5ccc2793d6b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -5,14 +5,17 @@
#include "i_lid_space_compaction_handler.h"
#include "i_operation_storer.h"
#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
+#include <vespa/searchcorespi/index/i_thread_service.h>
#include <vespa/persistence/spi/bucket_tasks.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <cassert>
using search::DocumentMetaData;
using search::LidUsageStats;
using storage::spi::makeBucketTask;
using storage::spi::Bucket;
+using vespalib::makeLambdaTask;
namespace proton::lidspace {
@@ -44,16 +47,20 @@ CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shar
if (metaNow.lid != metaThen.lid) return;
if (metaNow.bucketId != metaThen.bucketId) return;
- MoveOperation::UP op = _handler.createMoveOperation(metaNow, _handler.getLidStatus().getLowestFreeLid());
+ auto op = _handler.createMoveOperation(metaNow, 0); // The real lid must be sampled in the master thread.
if (!op) return;
- _opStorer.appendOperation(*op, context);
- _handler.handleMove(*op, std::move(context));
+ _master.execute(makeLambdaTask([this, moveOp=std::move(op), onDone=std::move(context)]() {
+ moveOp->setTargetLid(_handler.getLidStatus().getLowestFreeLid());
+ _opStorer.appendOperation(*moveOp, onDone);
+ _handler.handleMove(*moveOp, std::move(onDone));
+ }));
}
CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
ILidSpaceCompactionHandler &handler,
IOperationStorer &opStorer,
+ IThreadService & master,
BucketExecutor & bucketExecutor,
IDiskMemUsageNotifier &diskMemUsageNotifier,
const BlockableMaintenanceJobConfig &blockableConfig,
@@ -62,6 +69,7 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
document::BucketSpace bucketSpace)
: LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier,
blockableConfig, clusterStateChangedNotifier, nodeRetired),
+ _master(master),
_bucketExecutor(bucketExecutor),
_bucketSpace(bucketSpace)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 96e6f0a0b7e..3a61ee85a87 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -6,6 +6,7 @@
#include <vespa/document/bucket/bucketspace.h>
namespace storage::spi { struct BucketExecutor;}
+namespace searchcorespi::index { class IThreadService; }
namespace proton {
class IDiskMemUsageNotifier;
class IClusterStateChangedNotifier;
@@ -22,8 +23,10 @@ class CompactionJob : public LidSpaceCompactionJobBase
private:
using BucketExecutor = storage::spi::BucketExecutor;
using IDestructorCallback = vespalib::IDestructorCallback;
- BucketExecutor &_bucketExecutor;
- document::BucketSpace _bucketSpace;
+ using IThreadService = searchcorespi::index::IThreadService;
+ IThreadService & _master;
+ BucketExecutor &_bucketExecutor;
+ document::BucketSpace _bucketSpace;
bool scanDocuments(const search::LidUsageStats &stats) override;
void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone);
@@ -32,6 +35,7 @@ public:
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
ILidSpaceCompactionHandler &handler,
IOperationStorer &opStorer,
+ IThreadService & master,
BucketExecutor & bucketExecutor,
IDiskMemUsageNotifier &diskMemUsageNotifier,
const BlockableMaintenanceJobConfig &blockableConfig,
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index c9ac5dd3c49..2f45b530092 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -4,6 +4,7 @@
#include "heart_beat_job.h"
#include "job_tracked_maintenance_job.h"
#include "lid_space_compaction_job.h"
+#include "lid_space_compaction_job_take2.h"
#include "maintenance_jobs_injector.h"
#include "prune_session_cache_job.h"
#include "pruneremoveddocumentsjob.h"
@@ -25,6 +26,7 @@ trackJob(const IJobTracker::SP &tracker, IMaintenanceJob::UP job)
void
injectLidSpaceCompactionJobs(MaintenanceController &controller,
const DocumentDBMaintenanceConfig &config,
+ storage::spi::BucketExecutor & bucketExecutor,
const ILidSpaceCompactionHandler::Vector &lscHandlers,
IOperationStorer &opStorer,
IFrozenBucketHandler &fbHandler,
@@ -34,15 +36,27 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller,
const std::shared_ptr<IBucketStateCalculator> &calc,
document::BucketSpace bucketSpace)
{
- (void) bucketSpace;
for (auto &lidHandler : lscHandlers) {
- IMaintenanceJob::UP job = std::make_unique<LidSpaceCompactionJob>(
- config.getLidSpaceCompactionConfig(),
- *lidHandler, opStorer, fbHandler,
- diskMemUsageNotifier,
- config.getBlockableJobConfig(),
- clusterStateChangedNotifier,
- (calc ? calc->nodeRetired() : false));
+
+ IMaintenanceJob::UP job;
+ if (config.getLidSpaceCompactionConfig().useBucketExecutor()) {
+ job = std::make_unique<LidSpaceCompactionJob>(
+ config.getLidSpaceCompactionConfig(),
+ *lidHandler, opStorer, fbHandler,
+ diskMemUsageNotifier,
+ config.getBlockableJobConfig(),
+ clusterStateChangedNotifier,
+ (calc ? calc->nodeRetired() : false));
+ } else {
+ job = std::make_unique<lidspace::CompactionJob>(
+ config.getLidSpaceCompactionConfig(),
+ *lidHandler, opStorer, controller.masterThread(), bucketExecutor,
+ diskMemUsageNotifier,
+ config.getBlockableJobConfig(),
+ clusterStateChangedNotifier,
+ (calc ? calc->nodeRetired() : false),
+ bucketSpace);
+ }
controller.registerJobInMasterThread(trackJob(tracker, std::move(job)));
}
}
@@ -82,6 +96,7 @@ injectBucketMoveJob(MaintenanceController &controller,
void
MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
const DocumentDBMaintenanceConfig &config,
+ storage::spi::BucketExecutor & bucketExecutor,
IHeartBeatHandler &hbHandler,
matching::ISessionCachePruner &scPruner,
const ILidSpaceCompactionHandler::Vector &lscHandlers,
@@ -111,7 +126,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
controller.registerJobInMasterThread(
trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob)));
if (!config.getLidSpaceCompactionConfig().isDisabled()) {
- injectLidSpaceCompactionJobs(controller, config, lscHandlers, opStorer, fbHandler,
+ injectLidSpaceCompactionJobs(controller, config, bucketExecutor, lscHandlers, opStorer, fbHandler,
jobTrackers.getLidSpaceCompact(), diskMemUsageNotifier,
clusterStateChangedNotifier, calc, bucketSpace);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
index 3468ec40923..e35f9e1aa48 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
@@ -9,6 +9,7 @@
#include <vespa/searchcore/proton/matching/isessioncachepruner.h>
#include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h>
+namespace storage::spi {struct BucketExecutor; }
namespace proton {
class AttributeConfigInspector;
@@ -33,6 +34,7 @@ struct MaintenanceJobsInjector
using IAttributeManagerSP = std::shared_ptr<IAttributeManager>;
static void injectJobs(MaintenanceController &controller,
const DocumentDBMaintenanceConfig &config,
+ storage::spi::BucketExecutor & bucketExecutor,
IHeartBeatHandler &hbHandler,
matching::ISessionCachePruner &scPruner,
const ILidSpaceCompactionHandler::Vector &lscHandlers,
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 7d97214244d..6706581f52c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -602,7 +602,7 @@ Proton::addDocumentDB(const document::DocumentType &docType,
}
auto ret = std::make_shared<DocumentDB>(config.basedir + "/documents", documentDBConfig, config.tlsspec,
_queryLimiter, _clock, docTypeName, bucketSpace, config, *this,
- *_warmupExecutor, *_sharedExecutor, *_tls->getTransLogServer(),
+ *_warmupExecutor, *_sharedExecutor, *_persistenceEngine, *_tls->getTransLogServer(),
*_metricsEngine, _fileHeaderContext, std::move(config_store),
initializeThreads, bootstrapConfig->getHwInfo());
try {