diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-10-25 09:59:55 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-10-25 12:06:17 +0000 |
commit | c7fff4e03569a23a7306c03dd4cd7473291efe05 (patch) | |
tree | e34ff0a5f011ee38f25fa694671748be19ed399f /storage | |
parent | f671f4e689e1dc2f9e878ac678c626096cece26e (diff) |
Use document::Bucket as key for merge state tracking.
Diffstat (limited to 'storage')
11 files changed, 83 insertions, 89 deletions
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 120dbd4ce45..47ed60b62ad 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -20,7 +20,7 @@ namespace storage { struct MergeHandlerTest : public SingleDiskPersistenceTestUtils { uint32_t _location; // Location used for all merge tests - document::BucketId _bucket; // Bucket used for all merge tests + document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; std::vector<api::MergeBucketCommand::Node> _nodes; std::unique_ptr<spi::Context> _context; @@ -235,13 +235,13 @@ MergeHandlerTest::setUp() { SingleDiskPersistenceTestUtils::setUp(); _location = 1234; - _bucket = document::BucketId(16, _location); + _bucket = makeDocumentBucket(document::BucketId(16, _location)); _maxTimestamp = 11501; LOG(info, "Creating %s in bucket database", _bucket.toString().c_str()); bucketdb::StorageBucketInfo bucketDBEntry; bucketDBEntry.disk = 0; - getEnv().getBucketDatabase().insert(_bucket, bucketDBEntry, "mergetestsetup"); + getEnv().getBucketDatabase().insert(_bucket.getBucketId(), bucketDBEntry, "mergetestsetup"); LOG(info, "Creating bucket to merge"); createTestBucket(_bucket); @@ -267,7 +267,7 @@ MergeHandlerTest::testMergeBucketCommand() MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); cmd.setSourceIndex(1234); MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context); @@ -294,7 +294,7 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Verifying that get bucket diff is sent on"); - api::GetBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context); api::StorageMessage::SP replySent = tracker1->getReply(); @@ -338,7 +338,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Verifying that apply bucket diff is sent on"); - api::ApplyBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context); api::StorageMessage::SP replySent = tracker1->getReply(); @@ -382,7 +382,7 @@ MergeHandlerTest::testMasterMessageFlow() MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(info, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); LOG(info, "Check state"); @@ -644,7 +644,7 @@ MergeHandlerTest::testChunkedApplyBucketDiff() MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); LOG(info, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( @@ -732,7 +732,7 @@ MergeHandlerTest::testChunkLimitPartiallyFilledDiff() setUpChain(MIDDLE); std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, maxChunkSize)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, maxChunkSize)); applyBucketDiffCmd->getDiff() = applyDiff; MergeHandler handler( @@ -753,7 +753,7 @@ MergeHandlerTest::testMaxTimestamp() MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getCmd( @@ -820,7 +820,7 @@ MergeHandlerTest::createDummyApplyDiff(int timestampOffset, } std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024)); applyBucketDiffCmd->getDiff() = applyDiff; return applyBucketDiffCmd; } @@ -856,7 +856,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, } std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( - new api::GetBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::GetBucketDiffCommand(_bucket, _nodes, 1024*1024)); getBucketDiffCmd->getDiff() = diff; return getBucketDiffCmd; } @@ -907,7 +907,7 @@ void MergeHandlerTest::testMergeProgressSafeGuard() { MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( @@ -943,7 +943,7 @@ MergeHandlerTest::testSafeGuardNotInvokedWhenHasMaskChanges() _nodes.push_back(api::MergeBucketCommand::Node(0, false)); _nodes.push_back(api::MergeBucketCommand::Node(1, false)); _nodes.push_back(api::MergeBucketCommand::Node(2, false)); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( @@ -991,7 +991,7 @@ MergeHandlerTest::testEntryRemovedAfterGetBucketDiff() } setUpChain(BACK); std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024)); applyBucketDiffCmd->getDiff() = applyDiff; MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); @@ -1086,7 +1086,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( MergeHandler& handler, spi::Context& context) { - api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(cmd, context); } @@ -1126,7 +1126,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( MergeHandler& handler, spi::Context& context) { - api::GetBucketDiffCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleGetBucketDiff(cmd, context); } @@ -1215,7 +1215,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke( MergeHandler& handler, spi::Context& context) { - api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(cmd, context); _diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); } @@ -1287,7 +1287,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( ++_counter; _stub.clear(); if (getChainPos() == FRONT) { - api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp); + api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); handler.handleMergeBucket(cmd, context); std::shared_ptr<api::GetBucketDiffCommand> diffCmd( test.fetchSingleMessage<api::GetBucketDiffCommand>()); @@ -1470,7 +1470,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp() } std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd( - new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024)); + new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024)); applyBucketDiffCmd->getDiff() = applyDiff; MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); @@ -1480,7 +1480,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp() tracker->getReply())); CPPUNIT_ASSERT(applyBucketDiffReply.get()); - api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp); + api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(cmd, *_context); std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd( diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index c17c4837bfa..fb6bc298b08 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -373,10 +373,10 @@ PersistenceTestUtils::createRandomDocumentAtLocation( } void -PersistenceTestUtils::createTestBucket(const document::BucketId& bucket, +PersistenceTestUtils::createTestBucket(const document::Bucket& bucket, uint16_t disk) { - + document::BucketId bucketId(bucket.getBucketId()); uint32_t opsPerType = 2; uint32_t numberOfLocations = 2; uint32_t minDocSize = 0; @@ -388,27 +388,27 @@ PersistenceTestUtils::createTestBucket(const document::BucketId& bucket, uint32_t seed = useHeaderOnly * 10000 + optype * 1000 + i + 1; uint64_t location = (seed % numberOfLocations); location <<= 32; - location += (bucket.getRawId() & 0xffffffff); + location += (bucketId.getRawId() & 0xffffffff); document::Document::SP doc( createRandomDocumentAtLocation( location, seed, minDocSize, maxDocSize)); if (headerOnly) { clearBody(*doc); } - doPut(doc, spi::Timestamp(seed), disk, bucket.getUsedBits()); + doPut(doc, spi::Timestamp(seed), disk, bucketId.getUsedBits()); if (optype == 0) { // Regular put } else if (optype == 1) { // Overwritten later in time document::Document::SP doc2(new document::Document(*doc)); doc2->setValue(doc2->getField("content"), document::StringFieldValue("overwritten")); doPut(doc2, spi::Timestamp(seed + 500), - disk, bucket.getUsedBits()); + disk, bucketId.getUsedBits()); } else if (optype == 2) { // Removed doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false, - bucket.getUsedBits()); + bucketId.getUsedBits()); } else if (optype == 3) { // Unrevertable removed doRemove(doc->getId(), spi::Timestamp(seed), disk, true, - bucket.getUsedBits()); + bucketId.getUsedBits()); } } } diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 1b53b401a62..573ff28a80c 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -188,7 +188,7 @@ public: * * @disk If set, use this disk, otherwise lookup in bucket db. */ - void createTestBucket(const document::BucketId&, uint16_t disk = 0xffff); + void createTestBucket(const document::Bucket&, uint16_t disk = 0xffff); /** * Create a new persistence thread. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 7286b6e3586..e23bfda192c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -149,20 +149,20 @@ FileStorHandler::getQueueSize(uint16_t disk) const } void -FileStorHandler::addMergeStatus(const document::BucketId& bucket, +FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) { return _impl->addMergeStatus(bucket, ms); } MergeStatus& -FileStorHandler::editMergeStatus(const document::BucketId& bucket) +FileStorHandler::editMergeStatus(const document::Bucket& bucket) { return _impl->editMergeStatus(bucket); } bool -FileStorHandler::isMerging(const document::BucketId& bucket) const +FileStorHandler::isMerging(const document::Bucket& bucket) const { return _impl->isMerging(bucket); } @@ -174,14 +174,14 @@ FileStorHandler::getNumActiveMerges() const } void -FileStorHandler::clearMergeStatus(const document::BucketId& bucket, +FileStorHandler::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) { return _impl->clearMergeStatus(bucket, &code); } void -FileStorHandler::clearMergeStatus(const document::BucketId& bucket) +FileStorHandler::clearMergeStatus(const document::Bucket& bucket) { return _impl->clearMergeStatus(bucket, 0); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index f7e32554c18..6e2d6a0fc07 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -225,7 +225,7 @@ public: /** * Add a new merge state to the registry. */ - void addMergeStatus(const document::BucketId&, MergeStatus::SP); + void addMergeStatus(const document::Bucket&, MergeStatus::SP); /** * Returns the reference to the current merge status for the given bucket. @@ -234,7 +234,7 @@ public: * * @param bucket The bucket to start merging. */ - MergeStatus& editMergeStatus(const document::BucketId& bucket); + MergeStatus& editMergeStatus(const document::Bucket& bucket); /** * Returns true if the bucket is currently being merged on this node. @@ -242,7 +242,7 @@ public: * @param bucket The bucket to check merge status for * @return Returns true if the bucket is being merged. */ - bool isMerging(const document::BucketId& bucket) const; + bool isMerging(const document::Bucket& bucket) const; /** * @return Returns the number of active merges on the node. @@ -250,8 +250,8 @@ public: uint32_t getNumActiveMerges() const; /** Removes the merge status for the given bucket. */ - void clearMergeStatus(const document::BucketId&); - void clearMergeStatus(const document::BucketId&, const api::ReturnCode&); + void clearMergeStatus(const document::Bucket&); + void clearMergeStatus(const document::Bucket&, const api::ReturnCode&); void abortQueuedOperations(const AbortBucketOperationsCommand& cmd); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index a887d7d3791..718c36b78d4 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -58,7 +58,7 @@ FileStorHandlerImpl::FileStorHandlerImpl( FileStorHandlerImpl::~FileStorHandlerImpl() { } void -FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatus::SP status) +FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status) { vespalib::LockGuard mlock(_mergeStatesLock); if (_mergeStates.find(bucket) != _mergeStates.end()) {; @@ -69,7 +69,7 @@ FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatu } MergeStatus& -FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket) +FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { vespalib::LockGuard mlock(_mergeStatesLock); MergeStatus::SP status = _mergeStates[bucket]; @@ -81,7 +81,7 @@ FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket) } bool -FileStorHandlerImpl::isMerging(const document::BucketId& bucket) const +FileStorHandlerImpl::isMerging(const document::Bucket& bucket) const { vespalib::LockGuard mlock(_mergeStatesLock); return (_mergeStates.find(bucket) != _mergeStates.end()); @@ -95,7 +95,7 @@ FileStorHandlerImpl::getNumActiveMerges() const } void -FileStorHandlerImpl::clearMergeStatus(const document::BucketId& bucket, +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code) { vespalib::LockGuard mlock(_mergeStatesLock); @@ -154,7 +154,7 @@ FileStorHandlerImpl::flush(bool killPendingMerges) if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); - for (std::map<document::BucketId, MergeStatus::SP>::iterator it + for (std::map<document::Bucket, MergeStatus::SP>::iterator it = _mergeStates.begin(); it != _mergeStates.end(); ++it) { MergeStatus& s(*it->second); @@ -868,7 +868,7 @@ FileStorHandlerImpl::remapMessage( << ". Cannot remap merge, so aborting it"; api::ReturnCode code(api::ReturnCode::BUCKET_DELETED, ost.str()); - clearMergeStatus(cmd.getBucketId(), &code); + clearMergeStatus(cmd.getBucket(), &code); } } // Follow onto next to move queue or fail @@ -1373,7 +1373,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, if (_mergeStates.size() == 0) { out << "None\n"; } - for (std::map<document::BucketId, MergeStatus::SP>::const_iterator it + for (std::map<document::Bucket, MergeStatus::SP>::const_iterator it = _mergeStates.begin(); it != _mergeStates.end(); ++it) { out << "<b>" << it->first.toString() << "</b><br>\n"; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 58e93d733fd..56ac9ea0577 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -182,11 +182,11 @@ public: std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket&, uint16_t disk); - void addMergeStatus(const document::BucketId&, MergeStatus::SP); - MergeStatus& editMergeStatus(const document::BucketId&); - bool isMerging(const document::BucketId&) const; + void addMergeStatus(const document::Bucket&, MergeStatus::SP); + MergeStatus& editMergeStatus(const document::Bucket&); + bool isMerging(const document::Bucket&) const; uint32_t getNumActiveMerges() const; - void clearMergeStatus(const document::BucketId&, const api::ReturnCode*); + void clearMergeStatus(const document::Bucket&, const api::ReturnCode*); std::string dumpQueue(uint16_t disk) const; ResumeGuard pause(); @@ -202,7 +202,7 @@ private: vespalib::Lock _mergeStatesLock; - std::map<document::BucketId, MergeStatus::SP> _mergeStates; + std::map<document::Bucket, MergeStatus::SP> _mergeStates; uint8_t _maxPriorityToBlock; uint8_t _minPriorityToBeBlocking; @@ -212,7 +212,6 @@ private: std::atomic<bool> _paused; void reply(api::StorageMessage&, DiskState state) const; - static document::BucketId getBucketId(const api::StorageMessage&); // Returns the index in the targets array we are sending to, or -1 if none of them match. int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 77f6d912306..909c5c467cd 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -645,7 +645,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const bool FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry, - const document::BucketId& bucket) + const document::Bucket& bucket) { if (!entry.exist()) { _filestorHandler->clearMergeStatus(bucket, @@ -664,7 +664,7 @@ bool FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply) { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId())); - if (validateDiffReplyBucket(entry, reply->getBucketId())) { + if (validateDiffReplyBucket(entry, reply->getBucket())) { handlePersistenceMessage(reply, entry->disk); } return true; @@ -685,7 +685,7 @@ FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffRep { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk( *reply, reply->getBucketId())); - if (validateDiffReplyBucket(entry, reply->getBucketId())) { + if (validateDiffReplyBucket(entry, reply->getBucket())) { handlePersistenceMessage(reply, entry->disk); } return true; @@ -939,7 +939,7 @@ FileStorManager::reportHtmlStatus(std::ostream& out, } bool -FileStorManager::isMerging(const document::BucketId& bucket) const +FileStorManager::isMerging(const document::Bucket& bucket) const { return _filestorHandler->isMerging(bucket); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 05b9b9bc430..89d657a4f5c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -109,7 +109,7 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; // Return true if we are currently merging the given bucket. - bool isMerging(const document::BucketId& bucket) const; + bool isMerging(const document::Bucket& bucket) const; FileStorHandler& getFileStorHandler() { return *_filestorHandler; @@ -140,7 +140,7 @@ private: const char* callerId); bool validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry&); - bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::BucketId&); + bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::Bucket&); StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::BucketId&); StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 5fa8ca5c707..92de9720bfb 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -927,11 +927,11 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, class MergeStateDeleter { public: FileStorHandler& _handler; - document::BucketId _bucket; + document::Bucket _bucket; bool _active; MergeStateDeleter(FileStorHandler& handler, - const document::BucketId& bucket) + const document::Bucket& bucket) : _handler(handler), _bucket(bucket), _active(true) @@ -956,7 +956,6 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, _env._component.getClock())); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const document::BucketId id(bucket.getBucketId()); LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".", bucket.toString().c_str(), cmd.getMaxTimestamp()); @@ -989,7 +988,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, } } - if (_env._fileStorHandler.isMerging(id)) { + if (_env._fileStorHandler.isMerging(bucket.getBucket())) { const char* err = "A merge is already running on this bucket."; LOG(debug, "%s", err); tracker->fail(ReturnCode::BUSY, err); @@ -997,11 +996,11 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, } checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); - MergeStateDeleter stateGuard(_env._fileStorHandler, id); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); MergeStatus::SP s = MergeStatus::SP(new MergeStatus( _env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel())); - _env._fileStorHandler.addMergeStatus(id, s); + _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->nodeList = cmd.getNodes(); s->maxTimestamp = Timestamp(cmd.getMaxTimestamp()); s->timeout = cmd.getTimeout(); @@ -1182,11 +1181,10 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, _env._metrics.getBucketDiff, _env._component.getClock())); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const document::BucketId id(bucket.getBucketId()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); - if (_env._fileStorHandler.isMerging(id)) { + if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; @@ -1239,11 +1237,11 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, } else { // When not the last node in merge chain, we must save reply, and // send command on. - MergeStateDeleter stateGuard(_env._fileStorHandler, id); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); MergeStatus::SP s(new MergeStatus(_env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel())); - _env._fileStorHandler.addMergeStatus(id, s); + _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->pendingGetDiff = api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd)); @@ -1305,23 +1303,22 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, { ++_env._metrics.getBucketDiffReply; spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition)); - document::BucketId id(bucket.getBucketId()); LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str()); - if (!_env._fileStorHandler.isMerging(id)) { + if (!_env._fileStorHandler.isMerging(bucket.getBucket())) { LOG(warning, "Got GetBucketDiffReply for %s which we have no " "merge state for.", bucket.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(id); + MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); if (s.pendingId != reply.getMsgId()) { LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", bucket.toString().c_str(), reply.getMsgId(), s.pendingId); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } api::StorageReply::SP replyToSend; @@ -1363,7 +1360,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( - id, + bucket.getBucket(), api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); throw; @@ -1372,7 +1369,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, } if (clearState) { - _env._fileStorHandler.clearMergeStatus(id); + _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } if (replyToSend.get()) { replyToSend->setResult(reply.getResult()); @@ -1389,10 +1386,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, _env._component.getClock())); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const document::BucketId id(bucket.getBucketId()); LOG(debug, "%s", cmd.toString().c_str()); - if (_env._fileStorHandler.isMerging(id)) { + if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(ReturnCode::BUSY, "A merge is already running on this bucket."); return tracker; @@ -1451,11 +1447,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, } else { // When not the last node in merge chain, we must save reply, and // send command on. - MergeStateDeleter stateGuard(_env._fileStorHandler, id); + MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); MergeStatus::SP s(new MergeStatus(_env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), cmd.getTrace().getLevel())); - _env._fileStorHandler.addMergeStatus(id, s); + _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->pendingApplyDiff = api::ApplyBucketDiffReply::SP(new api::ApplyBucketDiffReply(cmd)); @@ -1485,24 +1481,23 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, { ++_env._metrics.applyBucketDiffReply; spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition)); - document::BucketId id(bucket.getBucketId()); std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); - if (!_env._fileStorHandler.isMerging(id)) { + if (!_env._fileStorHandler.isMerging(bucket.getBucket())) { LOG(warning, "Got ApplyBucketDiffReply for %s which we have no " "merge state for.", bucket.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } - MergeStatus& s = _env._fileStorHandler.editMergeStatus(id); + MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket()); if (s.pendingId != reply.getMsgId()) { LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", bucket.toString().c_str(), reply.getMsgId(), s.pendingId); - DUMP_LOGGED_BUCKET_OPERATIONS(id); + DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } bool clearState = true; @@ -1587,7 +1582,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, } } catch (std::exception& e) { _env._fileStorHandler.clearMergeStatus( - id, + bucket.getBucket(), api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); throw; @@ -1596,7 +1591,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, } if (clearState) { - _env._fileStorHandler.clearMergeStatus(id); + _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } if (replyToSend.get()) { // Send on diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 13a2bbd906b..074e5aafb3c 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -316,7 +316,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) _env._metrics.createBuckets, _env._component.getClock())); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucketId())) { + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); @@ -385,8 +385,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) _env._component.getClock())); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); - if (_env._fileStorHandler.isMerging(cmd.getBucketId())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucketId(), + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } |