diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-10-25 16:22:04 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-25 16:22:04 +0200 |
commit | 9606e88b7ca082f36eb38b0e197a0513f76ef6eb (patch) | |
tree | dccf8b7a99a0dcf714eb05771e5e7119f68e8377 | |
parent | d5ff1d39ec4e1aeff417fb20d9b997390b91887b (diff) | |
parent | b01db859c00ad78a14e0e0da78c50ca94b6396c9 (diff) |
Merge pull request #3878 from vespa-engine/toregge/use-document-bucket-to-track-merge-state
Toregge/use document bucket to track merge state
16 files changed, 141 insertions, 156 deletions
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index aab0aa2c0fb..d4b749772a2 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -193,13 +193,37 @@ OperationAbortingTest::validateReplies( namespace { +class ExplicitBucketSetPredicate : public AbortBucketOperationsCommand::AbortPredicate { + using BucketSet = vespalib::hash_set<document::BucketId, document::BucketId::hash>; + BucketSet _bucketsToAbort; + + bool doShouldAbort(const document::Bucket &bucket) const override; +public: + ~ExplicitBucketSetPredicate(); + + template <typename Iterator> + ExplicitBucketSetPredicate(Iterator first, Iterator last) + : _bucketsToAbort(first, last) + { } + + const BucketSet& getBucketsToAbort() const { + return _bucketsToAbort; + } +}; + +bool +ExplicitBucketSetPredicate::doShouldAbort(const document::Bucket &bucket) const { + return _bucketsToAbort.find(bucket.getBucketId()) != _bucketsToAbort.end(); +} + +ExplicitBucketSetPredicate::~ExplicitBucketSetPredicate() { } + template <typename Container> AbortBucketOperationsCommand::SP makeAbortCmd(const Container& buckets) { std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> pred( - new AbortBucketOperationsCommand::ExplicitBucketSetPredicate( - buckets.begin(), buckets.end())); + new ExplicitBucketSetPredicate(buckets.begin(), buckets.end())); AbortBucketOperationsCommand::SP cmd( new AbortBucketOperationsCommand(std::move(pred))); return cmd; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 120dbd4ce45..47ed60b62ad 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -20,7 +20,7 @@ namespace storage { struct MergeHandlerTest : public SingleDiskPersistenceTestUtils { uint32_t _location; // Location used for all merge tests - document::BucketId _bucket; // Bucket used for all merge tests + document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; std::vector<api::MergeBucketCommand::Node> _nodes; std::unique_ptr<spi::Context> _context; @@ -235,13 +235,13 @@ MergeHandlerTest::setUp() { SingleDiskPersistenceTestUtils::setUp(); _location = 1234; - _bucket = document::BucketId(16, _location); + _bucket = makeDocumentBucket(document::BucketId(16, _location)); _maxTimestamp = 11501; LOG(info, "Creating %s in bucket database", _bucket.toString().c_str()); bucketdb::StorageBucketInfo bucketDBEntry; bucketDBEntry.disk = 0; - getEnv().getBucketDatabase().insert(_bucket, bucketDBEntry, "mergetestsetup"); + getEnv().getBucketDatabase().insert(_bucket.getBucketId(), bucketDBEntry, "mergetestsetup"); LOG(info, "Creating bucket to merge"); createTestBucket(_bucket); @@ -267,7 +267,7 @@ MergeHandlerTest::testMergeBucketCommand() MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); cmd.setSourceIndex(1234); MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context); @@ -294,7 +294,7 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Verifying that get bucket diff is sent on"); - api::GetBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context); api::StorageMessage::SP replySent = tracker1->getReply(); @@ -338,7 +338,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Verifying that apply bucket diff is sent on"); - api::ApplyBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context); api::StorageMessage::SP replySent = tracker1->getReply(); @@ -382,7 +382,7 @@ MergeHandlerTest::testMasterMessageFlow() MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); LOG(info, "Check state"); @@ -644,7 +644,7 @@ MergeHandlerTest::testChunkedApplyBucketDiff() MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); LOG(info, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( @@ -732,7 +732,7 @@ MergeHandlerTest::testChunkLimitPartiallyFilledDiff() setUpChain(MIDDLE); std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, maxChunkSize)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, maxChunkSize)); applyBucketDiffCmd->getDiff() = applyDiff; MergeHandler handler( @@ -753,7 +753,7 @@ MergeHandlerTest::testMaxTimestamp() MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getCmd( @@ -820,7 +820,7 @@ MergeHandlerTest::createDummyApplyDiff(int timestampOffset, } std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024)); applyBucketDiffCmd->getDiff() = applyDiff; return applyBucketDiffCmd; } @@ -856,7 +856,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, } std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( - new api::GetBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::GetBucketDiffCommand(_bucket, _nodes, 1024*1024)); getBucketDiffCmd->getDiff() = diff; return getBucketDiffCmd; } @@ -907,7 +907,7 @@ void MergeHandlerTest::testMergeProgressSafeGuard() { MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( @@ -943,7 +943,7 @@ MergeHandlerTest::testSafeGuardNotInvokedWhenHasMaskChanges() _nodes.push_back(api::MergeBucketCommand::Node(0, false)); _nodes.push_back(api::MergeBucketCommand::Node(1, false)); _nodes.push_back(api::MergeBucketCommand::Node(2, false)); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( @@ -991,7 +991,7 @@ MergeHandlerTest::testEntryRemovedAfterGetBucketDiff() } setUpChain(BACK); std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024)); applyBucketDiffCmd->getDiff() = applyDiff; MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); @@ -1086,7 +1086,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( MergeHandler& handler, spi::Context& context) { - api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(cmd, context); } @@ -1126,7 +1126,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( MergeHandler& handler, spi::Context& context) { - api::GetBucketDiffCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleGetBucketDiff(cmd, context); } @@ -1215,7 +1215,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke( MergeHandler& handler, spi::Context& context) { - api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(cmd, context); _diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); } @@ -1287,7 +1287,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( ++_counter; _stub.clear(); if (getChainPos() == FRONT) { - api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(cmd, context); std::shared_ptr<api::GetBucketDiffCommand> diffCmd( test.fetchSingleMessage<api::GetBucketDiffCommand>()); @@ -1470,7 +1470,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp() } std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024)); applyBucketDiffCmd->getDiff() = applyDiff; MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); @@ -1480,7 +1480,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp() tracker->getReply())); CPPUNIT_ASSERT(applyBucketDiffReply.get()); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index c17c4837bfa..fb6bc298b08 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -373,10 +373,10 @@ PersistenceTestUtils::createRandomDocumentAtLocation( } void -PersistenceTestUtils::createTestBucket(const document::BucketId& bucket, +PersistenceTestUtils::createTestBucket(const document::Bucket& bucket, uint16_t disk) { - + document::BucketId bucketId(bucket.getBucketId()); uint32_t opsPerType = 2; uint32_t numberOfLocations = 2; uint32_t minDocSize = 0; @@ -388,27 +388,27 @@ PersistenceTestUtils::createTestBucket(const document::BucketId& bucket, uint32_t seed = useHeaderOnly * 10000 + optype * 1000 + i + 1; uint64_t location = (seed % numberOfLocations); location <<= 32; - location += (bucket.getRawId() & 0xffffffff); + location += (bucketId.getRawId() & 0xffffffff); document::Document::SP doc( createRandomDocumentAtLocation( location, seed, minDocSize, maxDocSize)); if (headerOnly) { clearBody(*doc); } - doPut(doc, spi::Timestamp(seed), disk, bucket.getUsedBits()); + doPut(doc, spi::Timestamp(seed), disk, bucketId.getUsedBits()); if (optype == 0) { // Regular put } else if (optype == 1) { // Overwritten later in time document::Document::SP doc2(new document::Document(*doc)); doc2->setValue(doc2->getField("content"), document::StringFieldValue("overwritten")); doPut(doc2, spi::Timestamp(seed + 500), - disk, bucket.getUsedBits()); + disk, bucketId.getUsedBits()); } else if (optype == 2) { // Removed doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false, - bucket.getUsedBits()); + bucketId.getUsedBits()); } else if (optype == 3) { // Unrevertable removed doRemove(doc->getId(), spi::Timestamp(seed), disk, true, - bucket.getUsedBits()); + bucketId.getUsedBits()); } } } diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 1b53b401a62..573ff28a80c 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -188,7 +188,7 @@ public: * * @disk If set, use this disk, otherwise lookup in bucket db. */ - void createTestBucket(const document::BucketId&, uint16_t disk = 0xffff); + void createTestBucket(const document::Bucket&, uint16_t disk = 0xffff); /** * Create a new persistence thread. diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 732aff5faf6..18e93d00494 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -197,7 +197,7 @@ bool hasAbortedAllOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v) { for (auto& b : v) { - if (!cmd->shouldAbort(b)) { + if (!cmd->shouldAbort(makeDocumentBucket(b))) { return false; } } @@ -209,7 +209,7 @@ bool hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v) { for (auto& b : v) { - if (cmd->shouldAbort(b)) { + if (cmd->shouldAbort(makeDocumentBucket(b))) { return false; } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 7286b6e3586..e23bfda192c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -149,20 +149,20 @@ FileStorHandler::getQueueSize(uint16_t disk) const } void -FileStorHandler::addMergeStatus(const document::BucketId& bucket, +FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) { return _impl->addMergeStatus(bucket, ms); } MergeStatus& -FileStorHandler::editMergeStatus(const document::BucketId& bucket) +FileStorHandler::editMergeStatus(const document::Bucket& bucket) { return _impl->editMergeStatus(bucket); } bool -FileStorHandler::isMerging(const document::BucketId& bucket) const +FileStorHandler::isMerging(const document::Bucket& bucket) const { return _impl->isMerging(bucket); } @@ -174,14 +174,14 @@ FileStorHandler::getNumActiveMerges() const } void -FileStorHandler::clearMergeStatus(const document::BucketId& bucket, +FileStorHandler::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) { return _impl->clearMergeStatus(bucket, &code); } void -FileStorHandler::clearMergeStatus(const document::BucketId& bucket) +FileStorHandler::clearMergeStatus(const document::Bucket& bucket) { return _impl->clearMergeStatus(bucket, 0); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index f7e32554c18..6e2d6a0fc07 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -225,7 +225,7 @@ public: /** * Add a new merge state to the registry. */ - void addMergeStatus(const document::BucketId&, MergeStatus::SP); + void addMergeStatus(const document::Bucket&, MergeStatus::SP); /** * Returns the reference to the current merge status for the given bucket. @@ -234,7 +234,7 @@ public: * * @param bucket The bucket to start merging. */ - MergeStatus& editMergeStatus(const document::BucketId& bucket); + MergeStatus& editMergeStatus(const document::Bucket& bucket); /** * Returns true if the bucket is currently being merged on this node. @@ -242,7 +242,7 @@ public: * @param bucket The bucket to check merge status for * @return Returns true if the bucket is being merged. */ - bool isMerging(const document::BucketId& bucket) const; + bool isMerging(const document::Bucket& bucket) const; /** * @return Returns the number of active merges on the node. @@ -250,8 +250,8 @@ public: uint32_t getNumActiveMerges() const; /** Removes the merge status for the given bucket. */ - void clearMergeStatus(const document::BucketId&); - void clearMergeStatus(const document::BucketId&, const api::ReturnCode&); + void clearMergeStatus(const document::Bucket&); + void clearMergeStatus(const document::Bucket&, const api::ReturnCode&); void abortQueuedOperations(const AbortBucketOperationsCommand& cmd); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index a887d7d3791..fffd559866d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -58,7 +58,7 @@ FileStorHandlerImpl::FileStorHandlerImpl( FileStorHandlerImpl::~FileStorHandlerImpl() { } void -FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatus::SP status) +FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status) { vespalib::LockGuard mlock(_mergeStatesLock); if (_mergeStates.find(bucket) != _mergeStates.end()) {; @@ -69,7 +69,7 @@ FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatu } MergeStatus& -FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket) +FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { vespalib::LockGuard mlock(_mergeStatesLock); MergeStatus::SP status = _mergeStates[bucket]; @@ -81,7 +81,7 @@ FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket) } bool -FileStorHandlerImpl::isMerging(const document::BucketId& bucket) const +FileStorHandlerImpl::isMerging(const document::Bucket& bucket) const { vespalib::LockGuard mlock(_mergeStatesLock); return (_mergeStates.find(bucket) != _mergeStates.end()); @@ -95,7 +95,7 @@ FileStorHandlerImpl::getNumActiveMerges() const } void -FileStorHandlerImpl::clearMergeStatus(const document::BucketId& bucket, +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code) { vespalib::LockGuard mlock(_mergeStatesLock); @@ -154,7 +154,7 @@ FileStorHandlerImpl::flush(bool killPendingMerges) if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); - for (std::map<document::BucketId, MergeStatus::SP>::iterator it + for (std::map<document::Bucket, MergeStatus::SP>::iterator it = _mergeStates.begin(); it != _mergeStates.end(); ++it) { MergeStatus& s(*it->second); @@ -337,7 +337,7 @@ FileStorHandlerImpl::abortQueuedCommandsForBuckets( "bucket operation was bound to"); for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) { api::StorageMessage& msg(*it->_command); - if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket.getBucketId())) { + if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) { LOG(debug, "Aborting operation %s as it is bound for bucket %s", msg.toString().c_str(), @@ -360,7 +360,7 @@ FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket( const AbortBucketOperationsCommand& cmd) const { for (auto& lockedBucket : disk.lockedBuckets) { - if (cmd.shouldAbort(lockedBucket.first.getBucketId())) { + if (cmd.shouldAbort(lockedBucket.first)) { LOG(spam, "Disk had active operation for aborted bucket %s, " "waiting for it to complete...", @@ -868,7 +868,7 @@ FileStorHandlerImpl::remapMessage( << ". Cannot remap merge, so aborting it"; api::ReturnCode code(api::ReturnCode::BUCKET_DELETED, ost.str()); - clearMergeStatus(cmd.getBucketId(), &code); + clearMergeStatus(cmd.getBucket(), &code); } } // Follow onto next to move queue or fail @@ -1373,7 +1373,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, if (_mergeStates.size() == 0) { out << "None\n"; } - for (std::map<document::BucketId, MergeStatus::SP>::const_iterator it + for (std::map<document::Bucket, MergeStatus::SP>::const_iterator it = _mergeStates.begin(); it != _mergeStates.end(); ++it) { out << "<b>" << it->first.toString() << "</b><br>\n"; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 58e93d733fd..56ac9ea0577 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -182,11 +182,11 @@ public: std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket&, uint16_t disk); - void addMergeStatus(const document::BucketId&, MergeStatus::SP); - MergeStatus& editMergeStatus(const document::BucketId&); - bool isMerging(const document::BucketId&) const; + void addMergeStatus(const document::Bucket&, MergeStatus::SP); + MergeStatus& editMergeStatus(const document::Bucket&); + bool isMerging(const document::Bucket&) const; uint32_t getNumActiveMerges() const; - void clearMergeStatus(const document::BucketId&, const api::ReturnCode*); + void clearMergeStatus(const document::Bucket&, const api::ReturnCode*); std::string dumpQueue(uint16_t disk) const; ResumeGuard pause(); @@ -202,7 +202,7 @@ private: vespalib::Lock _mergeStatesLock; - std::map<document::BucketId, MergeStatus::SP> _mergeStates; + std::map<document::Bucket, MergeStatus::SP> _mergeStates; uint8_t _maxPriorityToBlock; uint8_t _minPriorityToBeBlocking; @@ -212,7 +212,6 @@ private: std::atomic<bool> _paused; void reply(api::StorageMessage&, DiskState state) const; - static document::BucketId getBucketId(const api::StorageMessage&); // Returns the index in the targets array we are sending to, or -1 if none of them match. int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 77f6d912306..7abf480e80b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -183,12 +183,12 @@ FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, StorBucketDatabase::WrappedEntry FileStorManager::mapOperationToDisk(api::StorageMessage& msg, - const document::BucketId& bucket) + const document::Bucket& bucket) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get( - bucket, "FileStorManager::mapOperationToDisk")); + bucket.getBucketId(), "FileStorManager::mapOperationToDisk")); if (!entry.exist()) { - replyWithBucketNotFound(msg, bucket); + replyWithBucketNotFound(msg, bucket.getBucketId()); } return entry; } @@ -259,7 +259,7 @@ FileStorManager::handlePersistenceMessage( msg->getType().getName().c_str(), disk); LOG_BUCKET_OPERATION_NO_LOCK( - getStorageMessageBucketId(*msg).getBucketId(), + getStorageMessageBucket(*msg).getBucketId(), vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk)); @@ -400,7 +400,7 @@ FileStorManager::onBatchPutRemove(const std::shared_ptr<api::BatchPutRemoveComma bool FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -410,7 +410,7 @@ FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationComma bool FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -645,7 +645,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const bool FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry, - const document::BucketId& bucket) + const document::Bucket& bucket) { if (!entry.exist()) { _filestorHandler->clearMergeStatus(bucket, @@ -663,8 +663,8 @@ FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& bool FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId())); - if (validateDiffReplyBucket(entry, reply->getBucketId())) { + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucket())); + if (validateDiffReplyBucket(entry, reply->getBucket())) { handlePersistenceMessage(reply, entry->disk); } return true; @@ -673,7 +673,7 @@ FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& bool FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (validateApplyDiffCommandBucket(*cmd, entry)) { handlePersistenceMessage(cmd, entry->disk); } @@ -684,8 +684,8 @@ bool FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply) { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( - *reply, reply->getBucketId())); - if (validateDiffReplyBucket(entry, reply->getBucketId())) { + *reply, reply->getBucket())); + if (validateDiffReplyBucket(entry, reply->getBucket())) { handlePersistenceMessage(reply, entry->disk); } return true; @@ -708,7 +708,7 @@ FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& c bool FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -719,7 +719,7 @@ bool FileStorManager::onSetBucketState( const std::shared_ptr<api::SetBucketStateCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -733,7 +733,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case GetIterCommand::ID: { shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -742,7 +742,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case CreateIteratorCommand::ID: { shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -765,7 +765,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case ReadBucketInfo::ID: { shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -774,7 +774,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case InternalBucketJoinCommand::ID: { shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -783,7 +783,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case RepairBucketCommand::ID: { shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -792,7 +792,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case BucketDiskMoveCommand::ID: { shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -801,7 +801,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg) case RecheckBucketInfoCommand::ID: { shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg)); - StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -939,7 +939,7 @@ FileStorManager::reportHtmlStatus(std::ostream& out, } bool -FileStorManager::isMerging(const document::BucketId& bucket) const +FileStorManager::isMerging(const document::Bucket& bucket) const { return _filestorHandler->isMerging(bucket); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 05b9b9bc430..1f4186bf625 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -109,7 +109,7 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; // Return true if we are currently merging the given bucket. - bool isMerging(const document::BucketId& bucket) const; + bool isMerging(const document::Bucket& bucket) const; FileStorHandler& getFileStorHandler() { return *_filestorHandler; @@ -140,9 +140,9 @@ private: const char* callerId); bool validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry&); - bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::BucketId&); + bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::Bucket&); - StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::BucketId&); + StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::Bucket&); StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*); bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 5fa8ca5c707..92de9720bfb 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -927,11 +927,11 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, class MergeStateDeleter { public: FileStorHandler& _handler; - document::BucketId _bucket; + document::Bucket _bucket; bool _active; MergeStateDeleter(FileStorHandler& handler, - const document::BucketId& bucket) + const document::Bucket& bucket) : _handler(handler), _bucket(bucket), _active(true) @@ -956,7 +956,6 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, _env._component.getClock())); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const document::BucketId id(bucket.getBucketId()); LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".", bucket.toString().c_str(), cmd.getMaxTimestamp()); @@ -989,7 +988,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, } } - if (_env._fileStorHandler.isMerging(id)) { + if (_env._fileStorHandler.isMerging(bucket.getBucket())) { const char* err = "A merge is already running on this bucket."; LOG(debug, "%s", err); tracker->fail(ReturnCode::BUSY, err); @@ -997,11 +996,11 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, } checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); - MergeStateDeleter stateGuard(_env._fileStorHandler, id); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); MergeStatus::SP s = MergeStatus::SP(new MergeStatus( _env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel())); - _env._fileStorHandler.addMergeStatus(id, s); + _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->nodeList = cmd.getNodes(); s->maxTimestamp = Timestamp(cmd.getMaxTimestamp()); s->timeout = cmd.getTimeout(); @@ -1182,11 +1181,10 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, _env._metrics.getBucketDiff, _env._component.getClock())); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const document::BucketId id(bucket.getBucketId()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); - if (_env._fileStorHandler.isMerging(id)) { + if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; @@ -1239,11 +1237,11 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, } else { // When not the last node in merge chain, we must save reply, and // send command on. - MergeStateDeleter stateGuard(_env._fileStorHandler, id); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); MergeStatus::SP s(new MergeStatus(_env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel())); - _env._fileStorHandler.addMergeStatus(id, s); + _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->pendingGetDiff = api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd)); @@ -1305,23 +1303,22 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, { ++_env._metrics.getBucketDiffReply; spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition)); - document::BucketId id(bucket.getBucketId()); LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str()); - if (!_env._fileStorHandler.isMerging(id)) { + if (!_env._fileStorHandler.isMerging(bucket.getBucket())) { LOG(warning, "Got GetBucketDiffReply for %s which we have no " "merge state for.", bucket.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(id); + MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); if (s.pendingId != reply.getMsgId()) { LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", bucket.toString().c_str(), reply.getMsgId(), s.pendingId); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } api::StorageReply::SP replyToSend; @@ -1363,7 +1360,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( - id, + bucket.getBucket(), api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); throw; @@ -1372,7 +1369,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, } if (clearState) { - _env._fileStorHandler.clearMergeStatus(id); + _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } if (replyToSend.get()) { replyToSend->setResult(reply.getResult()); @@ -1389,10 +1386,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, _env._component.getClock())); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const document::BucketId id(bucket.getBucketId()); LOG(debug, "%s", cmd.toString().c_str()); - if (_env._fileStorHandler.isMerging(id)) { + if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; @@ -1451,11 +1447,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, } else { // When not the last node in merge chain, we must save reply, and // send command on. - MergeStateDeleter stateGuard(_env._fileStorHandler, id); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); MergeStatus::SP s(new MergeStatus(_env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel())); - _env._fileStorHandler.addMergeStatus(id, s); + _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->pendingApplyDiff = api::ApplyBucketDiffReply::SP(new api::ApplyBucketDiffReply(cmd)); @@ -1485,24 +1481,23 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, { ++_env._metrics.applyBucketDiffReply; spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition)); - document::BucketId id(bucket.getBucketId()); std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); - if (!_env._fileStorHandler.isMerging(id)) { + if (!_env._fileStorHandler.isMerging(bucket.getBucket())) { LOG(warning, "Got ApplyBucketDiffReply for %s which we have no " "merge state for.", bucket.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(id); + MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); if (s.pendingId != reply.getMsgId()) { LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", bucket.toString().c_str(), reply.getMsgId(), s.pendingId); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } bool clearState = true; @@ -1587,7 +1582,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( - id, + bucket.getBucket(), api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); throw; @@ -1596,7 +1591,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, } if (clearState) { - _env._fileStorHandler.clearMergeStatus(id); + _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } if (replyToSend.get()) { // Send on diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 78f4b02a192..876067e4e15 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -151,17 +151,6 @@ RecheckBucketInfoCommand::makeReply() { return std::make_unique<RecheckBucketInfoReply>(*this); } -bool -AbortBucketOperationsCommand::ExplicitBucketSetPredicate::doShouldAbort(const document::BucketId& bid) const { - return _bucketsToAbort.find(bid) != _bucketsToAbort.end(); -} - -AbortBucketOperationsCommand::ExplicitBucketSetPredicate::ExplicitBucketSetPredicate(const BucketSet& bucketsToAbort) - : _bucketsToAbort(bucketsToAbort) -{ } - -AbortBucketOperationsCommand::ExplicitBucketSetPredicate::~ExplicitBucketSetPredicate() { } - AbortBucketOperationsCommand::AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate) : api::InternalCommand(ID), _predicate(std::move(predicate)) diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index d47a389a8c1..7bbeea8a12a 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -206,33 +206,11 @@ class AbortBucketOperationsCommand : public api::InternalCommand { public: class AbortPredicate { - virtual bool doShouldAbort(const document::BucketId&) const = 0; + virtual bool doShouldAbort(const document::Bucket&) const = 0; public: virtual ~AbortPredicate() {} - bool shouldAbort(const document::BucketId& bid) const { - return doShouldAbort(bid); - } - }; - - using BucketSet = vespalib::hash_set<document::BucketId, document::BucketId::hash>; - - // Primarily for unit test mocking; actual predicate impl should do lazy - // evaluations based on previous and current cluster states. - class ExplicitBucketSetPredicate : public AbortPredicate { - BucketSet _bucketsToAbort; - - bool doShouldAbort(const document::BucketId& bid) const override; - public: - explicit ExplicitBucketSetPredicate(const BucketSet& bucketsToAbort); - ~ExplicitBucketSetPredicate(); - - template <typename Iterator> - ExplicitBucketSetPredicate(Iterator first, Iterator last) - : _bucketsToAbort(first, last) - { } - - const BucketSet& getBucketsToAbort() const { - return _bucketsToAbort; + bool shouldAbort(const document::Bucket &bucket) const { + return doShouldAbort(bucket); } }; @@ -245,8 +223,8 @@ public: AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate); ~AbortBucketOperationsCommand(); - bool shouldAbort(const document::BucketId& bid) const { - return _predicate->shouldAbort(bid); + bool shouldAbort(const document::Bucket &bucket) const { + return _predicate->shouldAbort(bucket); } std::unique_ptr<api::StorageReply> makeReply() override; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 13a2bbd906b..074e5aafb3c 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -316,7 +316,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) _env._metrics.createBuckets, _env._component.getClock())); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucketId())) { + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); @@ -385,8 +385,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) _env._component.getClock())); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); - if (_env._fileStorHandler.isMerging(cmd.getBucketId())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucketId(), + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index e981e29e806..4b624199459 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -155,15 +155,15 @@ class StateDiffLazyAbortPredicate // where all distributors are down. bool _allDistributorsHaveGoneDown; - bool doShouldAbort(const document::BucketId& b) const override { + bool doShouldAbort(const document::Bucket &bucket) const override { if (_allDistributorsHaveGoneDown) { return true; } - uint16_t oldOwner(_oldState.ownerOf(b)); - uint16_t newOwner(_newState.ownerOf(b)); + uint16_t oldOwner(_oldState.ownerOf(bucket.getBucketId())); + uint16_t newOwner(_newState.ownerOf(bucket.getBucketId())); if (oldOwner != newOwner) { LOG(spam, "Owner of %s was %u, now %u. Operation should be aborted", - b.toString().c_str(), oldOwner, newOwner); + bucket.toString().c_str(), oldOwner, newOwner); return true; } return false; |