diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-09-14 21:59:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-14 21:59:49 +0200 |
commit | b0154a2c2259e67b2923823daf52724bebee1c07 (patch) | |
tree | 4071f1ab7e1fd6df149e6d6787ed703e2dd16a7f | |
parent | 8d91d6bfc7dec56b1bab3f81be595b2f6fba7780 (diff) | |
parent | 8435cc6d567d8681d45a784755fd981eb80e4193 (diff) |
Merge pull request #3405 from vespa-engine/toregge/let-split-and-join-handler-pass-new-buckets-to-bucket-move-job-for-reconsideration
Documents moved to new buckets due to split or join might trigger a need
22 files changed, 283 insertions, 14 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 34c7677257e..5dbf5d523bc 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> +#include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/common/bucketfactory.h> #include <vespa/searchcore/proton/feedoperation/moveoperation.h> #include <vespa/searchcore/proton/server/bucketmovejob.h> @@ -22,6 +23,7 @@ using document::Document; using document::DocumentId; using document::DocumentTypeRepo; using document::GlobalId; +using proton::bucketdb::BucketCreateNotifier; using search::DocumentIdT; using search::DocumentMetaData; using search::IDestructorCallback; @@ -546,6 +548,7 @@ struct ControllerFixtureBase MySubDb _ready; MySubDb _notReady; MyFrozenBucketHandler _fbh; + BucketCreateNotifier _bucketCreateNotifier; test::DiskMemUsageNotifier _diskMemUsageNotifier; BucketMoveJob _bmj; MyCountJobRunner _runner; @@ -598,6 +601,10 @@ struct ControllerFixtureBase const BucketIdVector &calcAsked() const { return _calc->asked(); } + void runLoop() { + while (!_bmj.isBlocked() && !_bmj.run()) { + } + } }; ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts) @@ -610,9 +617,10 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY), _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY), _fbh(), + _bucketCreateNotifier(), _diskMemUsageNotifier(), _bmj(_calc, _moveHandler, _modifiedHandler, _ready._subDb, - _notReady._subDb, _fbh, _clusterStateHandler, _bucketHandler, + _notReady._subDb, _fbh, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, _diskMemUsageNotifier, blockableConfig, "test"), _runner(_bmj) @@ -1215,6 +1223,25 @@ TEST_F("explicitly active not ready bucket can be moved to ready even if node is EXPECT_EQUAL(f._notReady.bucket(3), f.bucketsModified()[0]); } +TEST_F("require that notifyCreateBucket causes bucket to be reconsidered by job", ControllerFixture) +{ + EXPECT_FALSE(f._bmj.done()); + f.addReady(f._ready.bucket(1)); + f.addReady(f._ready.bucket(2)); + f.runLoop(); + EXPECT_TRUE(f._bmj.done()); + EXPECT_TRUE(f.docsMoved().empty()); + EXPECT_TRUE(f.bucketsModified().empty()); + f.addReady(f._notReady.bucket(3)); // bucket 3 now ready, no notify + EXPECT_TRUE(f._bmj.done()); // move job still believes work done + f._bmj.notifyCreateBucket(f._notReady.bucket(3)); // reconsider bucket 3 + EXPECT_FALSE(f._bmj.done()); + f.runLoop(); + EXPECT_TRUE(f._bmj.done()); + EXPECT_EQUAL(1u, f.bucketsModified().size()); + EXPECT_EQUAL(2u, f.docsMoved().size()); +} + struct ResourceLimitControllerFixture : public ControllerFixture { ResourceLimitControllerFixture(double resourceLimitFactor = RESOURCE_LIMIT_FACTOR) : diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 04ee9669d73..0b48686bbdd 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -4,6 +4,7 @@ #include <vespa/searchcore/proton/attribute/i_attribute_manager.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchcore/proton/common/feedtoken.h> +#include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/feedoperation/moveoperation.h> #include <vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h> #include <vespa/searchcore/proton/feedoperation/putoperation.h> @@ -43,6 +44,7 @@ using document::Document; using document::DocumentId; using fastos::ClockSystem; using fastos::TimeStamp; +using proton::bucketdb::BucketCreateNotifier; using proton::matching::ISessionCachePruner; using search::AttributeGuard; using search::DocumentIdT; @@ -430,6 +432,7 @@ public: std::shared_ptr<proton::IAttributeManager> _notReadyAttributeManager; AttributeUsageFilter _attributeUsageFilter; test::DiskMemUsageNotifier _diskMemUsageNotifier; + BucketCreateNotifier _bucketCreateNotifier; MaintenanceController _mc; MaintenanceControllerFixture(); @@ -891,6 +894,7 @@ MaintenanceControllerFixture::MaintenanceControllerFixture() _readyAttributeManager(std::make_shared<MyAttributeManager>()), _notReadyAttributeManager(std::make_shared<MyAttributeManager>()), _attributeUsageFilter(), + _bucketCreateNotifier(), _mc(_threadService, _genericExecutor, _docTypeName) { std::vector<MyDocumentSubDB *> subDBs; @@ -958,7 +962,7 @@ MaintenanceControllerFixture::injectMaintenanceJobs() { if (_injectDefaultJobs) { MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _fh, _gsp, - _lscHandlers, _fh, _mc, _docTypeName.getName(), + _lscHandlers, _fh, _mc, _bucketCreateNotifier, _docTypeName.getName(), _fh, _fh, _bmc, _clusterStateHandler, _bucketHandler, _calc, _diskMemUsageNotifier, diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp index a389e702c69..9bcc63ad8d6 100644 --- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp @@ -4,6 +4,7 @@ #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include <vespa/searchcore/proton/flushengine/shrink_lid_space_flush_target.h> #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> +#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> #include <vespa/searchlib/attribute/attributefilesavetarget.h> #include <vespa/searchlib/fef/matchdatalayout.h> #include <vespa/searchlib/queryeval/simpleresult.h> @@ -12,6 +13,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/test/insertion_operators.h> #include <vespa/searchcore/proton/server/itlssyncer.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchlib/query/queryterm.h> @@ -20,6 +22,7 @@ LOG_SETUP("documentmetastore_test"); using namespace document; using proton::bucketdb::BucketState; +using proton::bucketdb::IBucketCreateListener; using search::AttributeFileSaveTarget; using search::AttributeGuard; using search::AttributeVector; @@ -1132,6 +1135,30 @@ struct GlobalIdEntry { typedef std::vector<GlobalIdEntry> GlobalIdVector; +struct MyBucketCreateListener : public IBucketCreateListener +{ + std::vector<document::BucketId> _buckets; + + MyBucketCreateListener(); + ~MyBucketCreateListener(); + virtual void notifyCreateBucket(const document::BucketId &bucket) override; +}; + +MyBucketCreateListener::MyBucketCreateListener() +{ +} + +MyBucketCreateListener::~MyBucketCreateListener() +{ +} + + +void +MyBucketCreateListener::notifyCreateBucket(const document::BucketId &bucket) +{ + _buckets.emplace_back(bucket); +} + struct SplitAndJoinEmptyFixture { DocumentMetaStore dms; @@ -1146,6 +1173,7 @@ struct SplitAndJoinEmptyFixture BucketId bid34; // contained in bid10 and bid20 BucketId bid36; // contained in bid10 and bid22 bucketdb::BucketDBHandler _bucketDBHandler; + MyBucketCreateListener _bucketCreateListener; SplitAndJoinEmptyFixture(); ~SplitAndJoinEmptyFixture(); @@ -1153,6 +1181,10 @@ struct SplitAndJoinEmptyFixture BucketInfo getInfo(const BucketId &bid) const { return dms.getBucketDB().takeGuard()->get(bid); } + + void assertNotifyCreateBuckets(std::vector<document::BucketId> expBuckets) { + EXPECT_EQUAL(expBuckets, _bucketCreateListener._buckets); + } }; SplitAndJoinEmptyFixture::SplitAndJoinEmptyFixture() @@ -1160,11 +1192,16 @@ SplitAndJoinEmptyFixture::SplitAndJoinEmptyFixture() bid10(1, 0), bid11(1, 1), bid20(2, 0), bid21(2, 1), bid22(2, 2), bid23(2, 3), bid30(3, 0), bid32(3, 2), bid34(3, 4), bid36(3, 6), - _bucketDBHandler(dms.getBucketDB()) + _bucketDBHandler(dms.getBucketDB()), + _bucketCreateListener() { _bucketDBHandler.addDocumentMetaStore(&dms, 0); + _bucketDBHandler.getBucketCreateNotifier().addListener(&_bucketCreateListener); +} +SplitAndJoinEmptyFixture::~SplitAndJoinEmptyFixture() +{ + _bucketDBHandler.getBucketCreateNotifier().removeListener(&_bucketCreateListener); } -SplitAndJoinEmptyFixture::~SplitAndJoinEmptyFixture() {} struct SplitAndJoinFixture : public SplitAndJoinEmptyFixture @@ -1314,6 +1351,7 @@ TEST("requireThatBucketInfoIsCorrectAfterSplit") EXPECT_EQUAL(bi11.getDocumentCount(), bi21.getDocumentCount() + bi23.getDocumentCount()); + f.assertNotifyCreateBuckets({ f.bid21, f.bid23 }); } TEST("requireThatActiveStateIsPreservedAfterSplit") @@ -1460,6 +1498,7 @@ TEST("requireThatBucketInfoIsCorrectAfterJoin") EXPECT_EQUAL(bi21.getDocumentCount() + bi23.getDocumentCount(), bi11.getDocumentCount()); + f.assertNotifyCreateBuckets({ f.bid11 }); } TEST("requireThatActiveStateIsPreservedAfterJoin") diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt index 16758449743..6cb22e6d1e8 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchcore_bucketdb STATIC SOURCES + bucket_create_notifier.cpp bucket_db_explorer.cpp bucket_db_owner.cpp bucketdb.cpp diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_create_notifier.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_create_notifier.cpp new file mode 100644 index 00000000000..d8789e8bd6b --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_create_notifier.cpp @@ -0,0 +1,46 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucket_create_notifier.h" +#include "i_bucket_create_listener.h" +#include <algorithm> +#include <cassert> + +using document::BucketId; + +namespace proton::bucketdb { + +BucketCreateNotifier::BucketCreateNotifier() + : _listeners() +{ +} + +BucketCreateNotifier::~BucketCreateNotifier() +{ + assert(_listeners.empty()); +} + +void +BucketCreateNotifier::notifyCreateBucket(const BucketId &bucket) +{ + for (const auto &listener : _listeners) { + listener->notifyCreateBucket(bucket); + } +} + +void +BucketCreateNotifier::addListener(IBucketCreateListener *listener) +{ + _listeners.push_back(listener); +} + +void +BucketCreateNotifier::removeListener(IBucketCreateListener *listener) +{ + auto it = std::find(_listeners.begin(), _listeners.end(), listener); + if (it != _listeners.end()) { + _listeners.erase(it); + } +} + +} + diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_create_notifier.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_create_notifier.h new file mode 100644 index 00000000000..50e53743e26 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucket_create_notifier.h @@ -0,0 +1,29 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bucket_create_notifier.h" +#include <vector> + +namespace proton::bucketdb { + +class IBucketCreateListener; + +/** + * Class used to (un)register a listener to get notifications about + * non-empty buckets created due to split/join operations. + */ +class BucketCreateNotifier : public IBucketCreateNotifier +{ + std::vector<IBucketCreateListener *> _listeners; + +public: + BucketCreateNotifier(); + virtual ~BucketCreateNotifier() override; + + virtual void notifyCreateBucket(const document::BucketId &bucket) override; + virtual void addListener(IBucketCreateListener *listener) override; + virtual void removeListener(IBucketCreateListener *listener) override; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.cpp index 9f598186ac4..f8fe7227f03 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.cpp @@ -4,12 +4,18 @@ #include "splitbucketsession.h" #include "joinbucketssession.h" #include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> +#include <algorithm> namespace proton::bucketdb { BucketDBHandler::BucketDBHandler(BucketDBOwner &bucketDB) : _bucketDB(bucketDB), - _dmsv() + _dmsv(), + _bucketCreateNotifier() +{ +} + +BucketDBHandler::~BucketDBHandler() { } @@ -42,7 +48,7 @@ BucketDBHandler::handleSplit(search::SerialNum serialNum, assert(!target1.contains(target2)); assert(!target2.contains(target1)); } - SplitBucketSession session(_bucketDB, source, target1, target2); + SplitBucketSession session(_bucketDB, _bucketCreateNotifier, source, target1, target2); session.setup(); for (auto &desc : _dmsv) { IDocumentMetaStore *dms = desc._dms; @@ -63,7 +69,7 @@ BucketDBHandler::handleJoin(search::SerialNum serialNum, const BucketId &target) { // Called by writer thread - JoinBucketsSession session(_bucketDB, source1, source2, target); + JoinBucketsSession session(_bucketDB, _bucketCreateNotifier, source1, source2, target); session.setup(); for (auto &desc : _dmsv) { IDocumentMetaStore *dms = desc._dms; diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.h index 5b621a2b332..6ab6b8d8e36 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdbhandler.h @@ -5,6 +5,7 @@ #include "bucket_db_owner.h" #include "ibucketdbhandler.h" #include "ibucketdbhandlerinitializer.h" +#include "bucket_create_notifier.h" namespace proton::bucketdb { @@ -30,10 +31,13 @@ private: BucketDBOwner &_bucketDB; std::vector<MetaStoreDesc> _dmsv; + BucketCreateNotifier _bucketCreateNotifier; public: BucketDBHandler(BucketDBOwner &bucketDB); + ~BucketDBHandler(); + void setBucketDB(BucketDBOwner &bucketDB); @@ -58,6 +62,8 @@ public: virtual void handleDeleteBucket(const BucketId &bucketId) override; + + IBucketCreateNotifier &getBucketCreateNotifier() { return _bucketCreateNotifier; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.cpp index 32ecade5fb1..81ce49b5b69 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.cpp @@ -4,8 +4,9 @@ namespace proton::bucketdb { -BucketSessionBase::BucketSessionBase(BucketDBOwner &bucketDB) - : _bucketDB(bucketDB.takeGuard()) +BucketSessionBase::BucketSessionBase(BucketDBOwner &bucketDB, IBucketCreateNotifier &bucketCreateNotifier) + : _bucketDB(bucketDB.takeGuard()), + _bucketCreateNotifier(bucketCreateNotifier) { } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.h index f75dfc8e9d2..6e2c0043ef9 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketsessionbase.h @@ -6,6 +6,8 @@ namespace proton::bucketdb { +class IBucketCreateNotifier; + /** * Base class for split/join handling utility classes that bundles temporary * variables used during the operation. @@ -19,9 +21,10 @@ public: protected: BucketDBOwner::Guard _bucketDB; + IBucketCreateNotifier &_bucketCreateNotifier; public: - BucketSessionBase(BucketDBOwner &bucketDB); + BucketSessionBase(BucketDBOwner &bucketDB, IBucketCreateNotifier &bucketCreateNotifier); bool extractInfo(const BucketId &bucket, BucketState *&info); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h b/searchcore/src/vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h new file mode 100644 index 00000000000..e00e24332e0 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h @@ -0,0 +1,25 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace document { class BucketId; } + +namespace proton::bucketdb { + +/** + * Interface class used by a registered listener to get notifications about + * non-empty buckets created due to split/join operations. + */ +class IBucketCreateListener +{ +public: + virtual ~IBucketCreateListener() {} + + /** + * Signal that the given bucket has been created due to split/join + * operation. + */ + virtual void notifyCreateBucket(const document::BucketId &bucket) = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h b/searchcore/src/vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h new file mode 100644 index 00000000000..0fd4e6707a1 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h @@ -0,0 +1,37 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace document { class BucketId; } + +namespace proton::bucketdb { + +class IBucketCreateListener; + +/** + * Interface class used to (un)register a listener to get notifications about + * non-empty buckets created due to split/join operations. + */ +class IBucketCreateNotifier +{ +public: + virtual ~IBucketCreateNotifier() {} + + /** + * Signal that the given bucket has been created due to split/join + * operation. + */ + virtual void notifyCreateBucket(const document::BucketId &bucket) = 0; + + /* + * Register bucket create listener. + */ + virtual void addListener(IBucketCreateListener *listener) = 0; + + /* + * Unregister bucket create listener. + */ + virtual void removeListener(IBucketCreateListener *listener) = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp index e9083bfbc98..f4c88402409 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp @@ -2,15 +2,17 @@ #include "joinbucketssession.h" #include "bucketdeltapair.h" +#include "i_bucket_create_notifier.h" #include <cassert> namespace proton::bucketdb { JoinBucketsSession::JoinBucketsSession(BucketDBOwner &bucketDB, + IBucketCreateNotifier &bucketCreateNotifier, const BucketId &source1, const BucketId &source2, const BucketId &target) - : BucketSessionBase(bucketDB), + : BucketSessionBase(bucketDB, bucketCreateNotifier), _source1Delta(), _source2Delta(), _wantTargetActive(false), @@ -99,7 +101,9 @@ JoinBucketsSession::finish() if (source2Empty) { _bucketDB->deleteEmptyBucket(_source2); } + if (!_source1Delta.empty() || !_source2Delta.empty()) { + _bucketCreateNotifier.notifyCreateBucket(_target); + } } } - diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h index 97ba0c43673..1029379dfb1 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.h @@ -36,6 +36,7 @@ private: public: JoinBucketsSession(BucketDBOwner &bucketDB, + IBucketCreateNotifier &bucketCreateNotifier, const BucketId &source1, const BucketId &source2, const BucketId &target); diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp index 9c5c861d8e4..0e085507a95 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.cpp @@ -2,16 +2,18 @@ #include "splitbucketsession.h" #include "bucketdeltapair.h" +#include "i_bucket_create_notifier.h" #include <cassert> namespace proton::bucketdb { SplitBucketSession::SplitBucketSession(BucketDBOwner &bucketDB, + IBucketCreateNotifier &bucketCreateNotifier, const BucketId &source, const BucketId &target1, const BucketId &target2) - : BucketSessionBase(bucketDB), + : BucketSessionBase(bucketDB, bucketCreateNotifier), _target1Delta(), _target2Delta(), _sourceActive(false), @@ -85,6 +87,12 @@ SplitBucketSession::finish() if (sourceState && sourceState->empty()) { _bucketDB->deleteEmptyBucket(_source); } + if (!_target1Delta.empty()) { + _bucketCreateNotifier.notifyCreateBucket(_target1); + } + if (!_target2Delta.empty()) { + _bucketCreateNotifier.notifyCreateBucket(_target2); + } } } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.h b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.h index 956369bf59a..cc631b71ec4 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/splitbucketsession.h @@ -34,6 +34,7 @@ private: public: SplitBucketSession(BucketDBOwner &bucketDB, + IBucketCreateNotifier &bucketCreateNotifier, const BucketId &source, const BucketId &target1, const BucketId &target2); diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index 39859c0adb4..f36c2add23a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -6,6 +6,7 @@ #include "iclusterstatechangednotifier.h" #include "maintenancedocumentsubdb.h" #include "i_disk_mem_usage_notifier.h" +#include <vespa/searchcore/proton/bucketdb/i_bucket_create_notifier.h> #include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> #include <vespa/log/log.h> @@ -154,6 +155,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, const MaintenanceDocumentSubDB &ready, const MaintenanceDocumentSubDB ¬Ready, IFrozenBucketHandler &frozenBuckets, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, IClusterStateChangedNotifier &clusterStateChangedNotifier, IBucketStateChangedNotifier &bucketStateChangedNotifier, IDiskMemUsageNotifier &diskMemUsageNotifier, @@ -162,6 +164,9 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, : BlockableMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0, blockableConfig), IClusterStateChangedHandler(), IBucketFreezeListener(), + bucketdb::IBucketCreateListener(), + IBucketStateChangedHandler(), + IDiskMemUsageListener(), _calc(calc), _moveHandler(moveHandler), _modifiedHandler(modifiedHandler), @@ -175,6 +180,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _delayedBuckets(), _delayedBucketsFrozen(), _frozenBuckets(frozenBuckets), + _bucketCreateNotifier(bucketCreateNotifier), _delayedMover(*_moveOpsLimiter), _clusterStateChangedNotifier(clusterStateChangedNotifier), _bucketStateChangedNotifier(bucketStateChangedNotifier), @@ -185,6 +191,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, } _frozenBuckets.addListener(this); + _bucketCreateNotifier.addListener(this); _clusterStateChangedNotifier.addClusterStateChangedHandler(this); _bucketStateChangedNotifier.addBucketStateChangedHandler(this); _diskMemUsageNotifier.addDiskMemUsageListener(this); @@ -193,6 +200,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, BucketMoveJob::~BucketMoveJob() { _frozenBuckets.removeListener(this); + _bucketCreateNotifier.removeListener(this); _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); _bucketStateChangedNotifier.removeBucketStateChangedHandler(this); _diskMemUsageNotifier.removeDiskMemUsageListener(this); @@ -247,6 +255,12 @@ BucketMoveJob::activateBucket(BucketId bucket) _delayedBuckets.insert(bucket); } +void +BucketMoveJob::notifyCreateBucket(const BucketId &bucket) +{ + _delayedBuckets.insert(bucket); + considerRun(); +} void BucketMoveJob::changedCalculator() diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 2ff94fd1415..6b43af9d4c0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -12,6 +12,7 @@ #include "iclusterstatechangedhandler.h" #include "ifrozenbuckethandler.h" #include <vespa/searchcore/proton/bucketdb/bucket_db_owner.h> +#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> #include <set> namespace proton @@ -21,6 +22,7 @@ class BlockableMaintenanceJobConfig; class IBucketStateChangedNotifier; class IClusterStateChangedNotifier; class IDiskMemUsageNotifier; +namespace bucketdb { class IBucketCreateNotifier; } /** * Class used to control the moving of buckets between the ready and @@ -29,6 +31,7 @@ class IDiskMemUsageNotifier; class BucketMoveJob : public BlockableMaintenanceJob, public IClusterStateChangedHandler, public IBucketFreezeListener, + public bucketdb::IBucketCreateListener, public IBucketStateChangedHandler, public IDiskMemUsageListener { @@ -96,6 +99,7 @@ private: // Frozen buckets that cannot be moved at all. DelayedBucketSet _delayedBucketsFrozen; IFrozenBucketHandler &_frozenBuckets; + bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; DocumentBucketMover _delayedMover; IClusterStateChangedNotifier &_clusterStateChangedNotifier; IBucketStateChangedNotifier &_bucketStateChangedNotifier; @@ -138,6 +142,7 @@ public: const MaintenanceDocumentSubDB &ready, const MaintenanceDocumentSubDB ¬Ready, IFrozenBucketHandler &frozenBuckets, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, IClusterStateChangedNotifier &clusterStateChangedNotifier, IBucketStateChangedNotifier &bucketStateChangedNotifier, IDiskMemUsageNotifier &diskMemUsageNotifier, @@ -172,6 +177,9 @@ public: storage::spi::BucketInfo::ActiveState newState) override; virtual void notifyDiskMemUsage(DiskMemUsageState state) override; + + // bucketdb::IBucketCreateListener API + virtual void notifyCreateBucket(const document::BucketId &bucket) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index d83f3714e4c..16d4a47a0c5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -948,6 +948,7 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config) _lidSpaceCompactionHandlers, _feedHandler, // IOperationStorer _maintenanceController, // IFrozenBucketHandler + _subDBs.getBucketCreateNotifier(), _docTypeName.getName(), _feedHandler, // IPruneRemovedDocumentsHandler _feedHandler, // IDocumentMoveHandler diff --git a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.h b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.h index 01ce59c6ffb..a3acd68d377 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.h @@ -119,6 +119,9 @@ public: bucketdb::IBucketDBHandler &getBucketDBHandler() { return *_bucketDBHandler; } + bucketdb::IBucketCreateNotifier &getBucketCreateNotifier() { + return _bucketDBHandler->getBucketCreateNotifier(); + } std::shared_ptr<initializer::InitializerTask> createInitializer(const DocumentDBConfig &configSnapshot, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index d830bf59461..d19e702c574 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -51,6 +51,7 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, void injectBucketMoveJob(MaintenanceController &controller, IFrozenBucketHandler &fbHandler, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, const vespalib::string &docTypeName, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &bucketModifiedHandler, @@ -68,6 +69,7 @@ injectBucketMoveJob(MaintenanceController &controller, controller.getReadySubDB(), controller.getNotReadySubDB(), fbHandler, + bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, diskMemUsageNotifier, @@ -87,6 +89,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, const ILidSpaceCompactionHandler::Vector &lscHandlers, IOperationStorer &opStorer, IFrozenBucketHandler &fbHandler, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &prdHandler, IDocumentMoveHandler &moveHandler, @@ -116,7 +119,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, fbHandler, jobTrackers.getLidSpaceCompact(), diskMemUsageNotifier, clusterStateChangedNotifier, calc); } - injectBucketMoveJob(controller, fbHandler, docTypeName, moveHandler, bucketModifiedHandler, + injectBucketMoveJob(controller, fbHandler, bucketCreateNotifier, docTypeName, moveHandler, bucketModifiedHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, calc, jobTrackers, diskMemUsageNotifier, config.getBlockableJobConfig()); controller.registerJobInMasterThread(std::make_unique<SampleAttributeUsageJob> diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h index e0b1a0fe496..85fed392ab6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h @@ -21,6 +21,7 @@ class IBucketStateCalculator; class IAttributeManager; class AttributeUsageFilter; class IDiskMemUsageNotifier; +namespace bucketdb { class IBucketCreateNotifier; } /** * Class that injects all concrete maintenance jobs used in document db @@ -36,6 +37,7 @@ struct MaintenanceJobsInjector const ILidSpaceCompactionHandler::Vector &lscHandlers, IOperationStorer &opStorer, IFrozenBucketHandler &fbHandler, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &prdHandler, IDocumentMoveHandler &moveHandler, |