diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-10-24 12:40:19 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-10-24 12:40:19 +0000 |
commit | ff645a32f1a216f66bd8805e7ae2ced7b6544cd8 (patch) | |
tree | 466a3de8fc15e7a48daee2552c9f4bd0fce1b219 /storage | |
parent | 823100d9d380c2749c466bdfe3ef640fa5d57cae (diff) |
Use document::Bucket instead of document::BucketId as key for
locking buckets and remapping queued operations after split/join.
Diffstat (limited to 'storage')
17 files changed, 200 insertions, 191 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index bbfe2c19c7b..67b93108139 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -905,9 +905,9 @@ FileStorManagerTest::testRemapSplit() "BucketId(0x40000000000011d7): Put(BucketId(0x40000000000011d7), userdoc:footype:4567:bar, timestamp 13, size 108) (priority: 127)\n"), filestorHandler.dumpQueue(0)); - FileStorHandler::RemapInfo a(document::BucketId(17, 1234), 0); - FileStorHandler::RemapInfo b(document::BucketId(17, 1234 | 1 << 16), 0); - filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(bucket1, 0), a, b); + FileStorHandler::RemapInfo a(makeDocumentBucket(document::BucketId(17, 1234)), 0); + FileStorHandler::RemapInfo b(makeDocumentBucket(document::BucketId(17, 1234 | 1 << 16)), 0); + filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(makeDocumentBucket(bucket1), 0), a, b); CPPUNIT_ASSERT(a.foundInQueue); CPPUNIT_ASSERT(!b.foundInQueue); diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index 9014c9fdd46..aff675b2a65 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -5,7 +5,7 @@ vespa_add_library(storage_common OBJECT storagelink.cpp storagelinkqueued.cpp bucketoperationlogger.cpp - messagebucketid.cpp + messagebucket.cpp bucketmessages.cpp statusmessages.cpp messagesender.cpp diff --git a/storage/src/vespa/storage/common/bucketmessages.cpp b/storage/src/vespa/storage/common/bucketmessages.cpp index 1317f5e0814..1d9d64ad24f 100644 --- a/storage/src/vespa/storage/common/bucketmessages.cpp +++ b/storage/src/vespa/storage/common/bucketmessages.cpp @@ -15,6 +15,12 @@ ReadBucketList::ReadBucketList(BucketSpace bucketSpace, spi::PartitionId partiti ReadBucketList::~ReadBucketList() { } +document::Bucket +ReadBucketList::getBucket() const +{ + return document::Bucket(_bucketSpace, document::BucketId()); +} + void ReadBucketList::print(std::ostream& out, bool verbose, const std::string& indent) const { out << "ReadBucketList(" << _partition << ")"; diff --git a/storage/src/vespa/storage/common/bucketmessages.h b/storage/src/vespa/storage/common/bucketmessages.h index 36e3a18c6b7..0ff7a22aa4d 100644 --- a/storage/src/vespa/storage/common/bucketmessages.h +++ b/storage/src/vespa/storage/common/bucketmessages.h @@ -28,6 +28,7 @@ public: ~ReadBucketList(); document::BucketSpace getBucketSpace() const { return _bucketSpace; } spi::PartitionId getPartition() const { return _partition; } + document::Bucket getBucket() const override; std::unique_ptr<api::StorageReply> makeReply() override; diff --git a/storage/src/vespa/storage/common/messagebucketid.cpp b/storage/src/vespa/storage/common/messagebucket.cpp index 8cb72040ed2..f23bece015f 100644 --- a/storage/src/vespa/storage/common/messagebucketid.cpp +++ b/storage/src/vespa/storage/common/messagebucket.cpp @@ -1,6 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "messagebucketid.h" +#include "messagebucket.h" #include "statusmessages.h" #include "bucketmessages.h" #include <vespa/storageapi/message/bucket.h> @@ -16,76 +16,76 @@ namespace storage { -document::BucketId -getStorageMessageBucketId(const api::StorageMessage& msg) +document::Bucket +getStorageMessageBucket(const api::StorageMessage& msg) { switch (msg.getType().getId()) { case api::MessageType::GET_ID: - return static_cast<const api::GetCommand&>(msg).getBucketId(); + return static_cast<const api::GetCommand&>(msg).getBucket(); case api::MessageType::PUT_ID: - return static_cast<const api::PutCommand&>(msg).getBucketId(); + return static_cast<const api::PutCommand&>(msg).getBucket(); case api::MessageType::UPDATE_ID: - return static_cast<const api::UpdateCommand&>(msg).getBucketId(); + return static_cast<const api::UpdateCommand&>(msg).getBucket(); case api::MessageType::REMOVE_ID: - return static_cast<const api::RemoveCommand&>(msg).getBucketId(); + return static_cast<const api::RemoveCommand&>(msg).getBucket(); case api::MessageType::REVERT_ID: - return static_cast<const api::RevertCommand&>(msg).getBucketId(); + return static_cast<const api::RevertCommand&>(msg).getBucket(); case api::MessageType::STATBUCKET_ID: - return static_cast<const api::StatBucketCommand&>(msg).getBucketId(); + return static_cast<const api::StatBucketCommand&>(msg).getBucket(); case api::MessageType::MULTIOPERATION_ID: return static_cast<const api::MultiOperationCommand&>(msg) - .getBucketId(); + .getBucket(); case api::MessageType::BATCHPUTREMOVE_ID: return static_cast<const api::BatchPutRemoveCommand&>(msg) - .getBucketId(); + .getBucket(); case api::MessageType::REMOVELOCATION_ID: return static_cast<const api::RemoveLocationCommand&>(msg) - .getBucketId(); + .getBucket(); case api::MessageType::CREATEBUCKET_ID: - return static_cast<const api::CreateBucketCommand&>(msg).getBucketId(); + return static_cast<const api::CreateBucketCommand&>(msg).getBucket(); case api::MessageType::DELETEBUCKET_ID: - return static_cast<const api::DeleteBucketCommand&>(msg).getBucketId(); + return static_cast<const api::DeleteBucketCommand&>(msg).getBucket(); case api::MessageType::MERGEBUCKET_ID: - return static_cast<const api::MergeBucketCommand&>(msg).getBucketId(); + return static_cast<const api::MergeBucketCommand&>(msg).getBucket(); case api::MessageType::GETBUCKETDIFF_ID: - return static_cast<const api::GetBucketDiffCommand&>(msg).getBucketId(); + return static_cast<const api::GetBucketDiffCommand&>(msg).getBucket(); case api::MessageType::GETBUCKETDIFF_REPLY_ID: - return static_cast<const api::GetBucketDiffReply&>(msg).getBucketId(); + return static_cast<const api::GetBucketDiffReply&>(msg).getBucket(); case api::MessageType::APPLYBUCKETDIFF_ID: return static_cast<const api::ApplyBucketDiffCommand&>(msg) - .getBucketId(); + .getBucket(); case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: - return static_cast<const api::ApplyBucketDiffReply&>(msg).getBucketId(); + return static_cast<const api::ApplyBucketDiffReply&>(msg).getBucket(); case api::MessageType::JOINBUCKETS_ID: - return static_cast<const api::JoinBucketsCommand&>(msg).getBucketId(); + return static_cast<const api::JoinBucketsCommand&>(msg).getBucket(); case api::MessageType::SPLITBUCKET_ID: - return static_cast<const api::SplitBucketCommand&>(msg).getBucketId(); + return static_cast<const api::SplitBucketCommand&>(msg).getBucket(); case api::MessageType::SETBUCKETSTATE_ID: - return static_cast<const api::SetBucketStateCommand&>(msg).getBucketId(); + return static_cast<const api::SetBucketStateCommand&>(msg).getBucket(); case api::MessageType::INTERNAL_ID: switch(static_cast<const api::InternalCommand&>(msg).getType()) { case RequestStatusPage::ID: - return document::BucketId(); + return document::Bucket(); case GetIterCommand::ID: - return static_cast<const GetIterCommand&>(msg).getBucketId(); + return static_cast<const GetIterCommand&>(msg).getBucket(); case CreateIteratorCommand::ID: return static_cast<const CreateIteratorCommand&>(msg) - .getBucketId(); + .getBucket(); case ReadBucketList::ID: - return document::BucketId(); + return static_cast<const ReadBucketList&>(msg).getBucket(); case ReadBucketInfo::ID: - return static_cast<const ReadBucketInfo&>(msg).getBucketId(); + return static_cast<const ReadBucketInfo&>(msg).getBucket(); case RepairBucketCommand::ID: - return static_cast<const RepairBucketCommand&>(msg).getBucketId(); + return static_cast<const RepairBucketCommand&>(msg).getBucket(); case BucketDiskMoveCommand::ID: - return static_cast<const BucketDiskMoveCommand&>(msg).getBucketId(); + return static_cast<const BucketDiskMoveCommand&>(msg).getBucket(); case InternalBucketJoinCommand::ID: return static_cast<const InternalBucketJoinCommand&>(msg) - .getBucketId(); + .getBucket(); case RecheckBucketInfoCommand::ID: - return static_cast<const RecheckBucketInfoCommand&>(msg).getBucketId(); + return static_cast<const RecheckBucketInfoCommand&>(msg).getBucket(); default: break; } diff --git a/storage/src/vespa/storage/common/messagebucketid.h b/storage/src/vespa/storage/common/messagebucket.h index 863ee234f82..c383a14d541 100644 --- a/storage/src/vespa/storage/common/messagebucketid.h +++ b/storage/src/vespa/storage/common/messagebucket.h @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> namespace storage { @@ -14,7 +14,7 @@ class StorageMessage; * @throws vespalib::IllegalArgumentException if msg does not * have a bucket id. */ -document::BucketId getStorageMessageBucketId( +document::Bucket getStorageMessageBucket( const api::StorageMessage& msg); } diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp index 299917138db..4f4d47cacac 100644 --- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp +++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp @@ -72,7 +72,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, // Move queued operations in bucket to new thread. Hold bucket lock // while doing it, so filestor manager can't put in other operations // first, such that operations change order. - _env._fileStorHandler.remapQueueAfterDiskMove(bucket.getBucketId(), deviceIndex, targetDisk); + _env._fileStorHandler.remapQueueAfterDiskMove(bucket, deviceIndex, targetDisk); if (entry.exist()) { entry->setBucketInfo(bInfo); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 27831d5edff..7286b6e3586 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -77,14 +77,14 @@ FileStorHandler::getNextMessage(uint16_t thread, } FileStorHandler::BucketLockInterface::SP -FileStorHandler::lock(const document::BucketId& bucket, uint16_t disk) +FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk) { return _impl->lock(bucket, disk); } void FileStorHandler::remapQueueAfterDiskMove( - const document::BucketId& bucket, + const document::Bucket& bucket, uint16_t sourceDisk, uint16_t targetDisk) { RemapInfo target(bucket, targetDisk); @@ -111,10 +111,10 @@ FileStorHandler::remapQueueAfterSplit( } void -FileStorHandler::failOperations(const document::BucketId& bid, +FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) { - _impl->failOperations(bid, fromDisk, err); + _impl->failOperations(bucket, fromDisk, err); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 04eec67a8c4..f7e32554c18 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -14,7 +14,7 @@ #pragma once #include "mergestatus.h" -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> #include <ostream> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> @@ -43,12 +43,12 @@ class AbortBucketOperationsCommand; class FileStorHandler : public MessageSender { public: struct RemapInfo { - document::BucketId bid; + document::Bucket bucket; uint16_t diskIndex; bool foundInQueue; - RemapInfo(const document::BucketId& bucketId, uint16_t diskIdx) - : bid(bucketId), + RemapInfo(const document::Bucket &bucket_, uint16_t diskIdx) + : bucket(bucket_), diskIndex(diskIdx), foundInQueue(false) {} @@ -58,7 +58,7 @@ public: public: typedef std::shared_ptr<BucketLockInterface> SP; - virtual const document::BucketId& getBucketId() const = 0; + virtual const document::Bucket &getBucket() const = 0; virtual ~BucketLockInterface() {}; }; @@ -156,7 +156,7 @@ public: * * */ - BucketLockInterface::SP lock(const document::BucketId&, uint16_t disk); + BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk); /** * Called by FileStorThread::onBucketDiskMove() after moving file, in case @@ -168,7 +168,7 @@ public: * requeststatus - Ignore * readbucketinfo/bucketdiskmove/internalbucketjoin - Fail and log errors */ - void remapQueueAfterDiskMove(const document::BucketId& bucket, + void remapQueueAfterDiskMove(const document::Bucket &bucket, uint16_t sourceDisk, uint16_t targetDisk); /** @@ -219,7 +219,7 @@ public: * Fail all operations towards a single bucket currently queued to the * given thread with the given error code. */ - void failOperations(const document::BucketId&, uint16_t fromDisk, + void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&); /** diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index e7fa4eadda0..a887d7d3791 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -10,7 +10,7 @@ #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/statusmessages.h> #include <vespa/storage/common/bucketoperationlogger.h> -#include <vespa/storage/common/messagebucketid.h> +#include <vespa/storage/common/messagebucket.h> #include <vespa/storage/persistence/messages.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/message/batch.h> @@ -20,6 +20,8 @@ #include <vespa/log/log.h> LOG_SETUP(".persistence.filestor.handler.impl"); +using document::BucketSpace; + namespace storage { FileStorHandlerImpl::FileStorHandlerImpl( @@ -248,7 +250,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, { assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); - MessageEntry messageEntry(msg, getStorageMessageBucketId(*msg)); + MessageEntry messageEntry(msg, getStorageMessageBucket(*msg)); vespalib::MonitorGuard lockGuard(t.lock); if (t.getState() == FileStorHandler::AVAILABLE) { @@ -335,11 +337,11 @@ FileStorHandlerImpl::abortQueuedCommandsForBuckets( "bucket operation was bound to"); for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) { api::StorageMessage& msg(*it->_command); - if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucketId)) { + if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket.getBucketId())) { LOG(debug, "Aborting operation %s as it is bound for bucket %s", msg.toString().c_str(), - it->_bucketId.toString().c_str()); + it->_bucket.getBucketId().toString().c_str()); std::shared_ptr<api::StorageReply> msgReply( static_cast<api::StorageCommand&>(msg).makeReply().release()); msgReply->setResult(abortedCode); @@ -358,7 +360,7 @@ FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket( const AbortBucketOperationsCommand& cmd) const { for (auto& lockedBucket : disk.lockedBuckets) { - if (cmd.shouldAbort(lockedBucket.first)) { + if (cmd.shouldAbort(lockedBucket.first.getBucketId())) { LOG(spam, "Disk had active operation for aborted bucket %s, " "waiting for it to complete...", @@ -423,12 +425,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck, uint8_t maxPriority) { - document::BucketId id(lck.first->getBucketId()); + document::Bucket bucket(lck.first->getBucket()); LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, - id.toString().c_str()); + bucket.getBucketId().toString().c_str()); assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); @@ -440,7 +442,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, vespalib::MonitorGuard lockGuard(t.lock); BucketIdx& idx = boost::multi_index::get<2>(t.queue); - std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(id); + std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket); // No more for this bucket. if (range.first == range.second) { @@ -467,7 +469,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), " "timeout %d", - m.toString().c_str(), waitTime, id.toString().c_str(), + m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(), static_cast<api::StorageCommand&>(m).getTimeout()); if (m.getType().isReply() || @@ -540,11 +542,11 @@ std::unique_ptr<FileStorHandler::BucketLockInterface> FileStorHandlerImpl::takeDiskBucketLockOwnership( const vespalib::MonitorGuard & guard, Disk& disk, - const document::BucketId& id, + const document::Bucket &bucket, const api::StorageMessage& msg) { return std::unique_ptr<FileStorHandler::BucketLockInterface>( - new BucketLock(guard, disk, id, msg.getPriority(), msg.getSummary())); + new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary())); } std::unique_ptr<api::StorageReply> @@ -561,8 +563,8 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const namespace { bool - bucketIsLockedOnDisk(const document::BucketId &id, const FileStorHandlerImpl::Disk &t) { - return (id.getRawId() != 0 && t.isLocked(id)); + bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) { + return (id.getBucketId().getRawId() != 0 && t.isLocked(id)); } /** @@ -598,7 +600,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) PriorityIdx& idx(boost::multi_index::get<1>(t.queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - while (iter != end && bucketIsLockedOnDisk(iter->_bucketId, t)) { + while (iter != end && bucketIsLockedOnDisk(iter->_bucket, t)) { iter++; } if (iter != end) { @@ -632,11 +634,11 @@ FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, Priori m.toString().c_str(), waitTime, static_cast<api::StorageCommand &>(m).getTimeout()); std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); - document::BucketId bucketId(iter->_bucketId); + document::Bucket bucket(iter->_bucket); idx.erase(iter); // iter not used after this point. if (!messageTimedOutInQueue(*msg, waitTime)) { - auto locker = takeDiskBucketLockOwnership(guard, t, bucketId, *msg); + auto locker = takeDiskBucketLockOwnership(guard, t, bucket, *msg); guard.unlock(); MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket"); return std::move(FileStorHandler::LockedMessage(std::move(locker), std::move(msg))); @@ -650,22 +652,22 @@ FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, Priori } std::shared_ptr<FileStorHandler::BucketLockInterface> -FileStorHandlerImpl::lock(const document::BucketId& bucket, uint16_t disk) +FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk) { assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); LOG(spam, "Acquiring filestor lock for %s on disk %d", - bucket.toString().c_str(), + bucket.getBucketId().toString().c_str(), disk); vespalib::MonitorGuard lockGuard(t.lock); - while (bucket.getRawId() != 0 && t.isLocked(bucket)) { + while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) { LOG(spam, "Contending for filestor lock for %s", - bucket.toString().c_str()); + bucket.getBucketId().toString().c_str()); lockGuard.wait(100); } @@ -740,10 +742,12 @@ FileStorHandlerImpl::calculateTargetBasedOnDocId( std::vector<RemapInfo*>& targets) { document::DocumentId id(getDocId(msg)); - document::BucketId bucket(_bucketIdFactory.getBucketId(id)); + document::Bucket bucket(BucketSpace::placeHolder(), _bucketIdFactory.getBucketId(id)); for (uint32_t i = 0; i < targets.size(); i++) { - if (targets[i]->bid.getRawId() != 0 && targets[i]->bid.contains(bucket)) { + if (targets[i]->bucket.getBucketId().getRawId() != 0 && + targets[i]->bucket.getBucketSpace() == bucket.getBucketSpace() && + targets[i]->bucket.getBucketId().contains(bucket.getBucketId())) { return i; } } @@ -751,15 +755,15 @@ FileStorHandlerImpl::calculateTargetBasedOnDocId( return -1; } -document::BucketId +document::Bucket FileStorHandlerImpl::remapMessage( api::StorageMessage& msg, - const document::BucketId& source, + const document::Bucket& source, Operation op, std::vector<RemapInfo*>& targets, uint16_t& targetDisk, api::ReturnCode& returnCode) { - document::BucketId newBucketId = source; + document::Bucket newBucket = source; switch (msg.getType().getId()) { case api::MessageType::GET_ID: @@ -771,12 +775,12 @@ FileStorHandlerImpl::remapMessage( api::BucketCommand& cmd( static_cast<api::BucketCommand&>(msg)); - if (cmd.getBucketId() == source) { + if (cmd.getBucket() == source) { if (op == SPLIT) { int idx = calculateTargetBasedOnDocId(msg, targets); if (idx > -1) { - cmd.remapBucketId(targets[idx]->bid); + cmd.remapBucketId(targets[idx]->bucket.getBucketId()); targets[idx]->foundInQueue = true; targetDisk = targets[idx]->diskIndex; #if defined(ENABLE_BUCKET_OPERATION_LOGGING) @@ -789,19 +793,19 @@ FileStorHandlerImpl::remapMessage( LOG_BUCKET_OPERATION_NO_LOCK(targets[idx]->bid, desc); } #endif - newBucketId = targets[idx]->bid; + newBucket = targets[idx]->bucket; } else { document::DocumentId did(getDocId(msg)); document::BucketId bucket = _bucketIdFactory.getBucketId(did); uint32_t commonBits( - findCommonBits(targets[0]->bid, bucket)); - if (commonBits < source.getUsedBits()) { + findCommonBits(targets[0]->bucket.getBucketId(), bucket)); + if (commonBits < source.getBucketId().getUsedBits()) { std::ostringstream ost; ost << bucket << " belongs in neither " - << targets[0]->bid << " nor " << targets[1]->bid + << targets[0]->bucket.getBucketId() << " nor " << targets[1]->bucket.getBucketId() << ". Cannot remap it after split. It " << "did not belong in the original " - << "bucket " << source; + << "bucket " << source.getBucketId(); LOG(error, "Error remapping %s after split %s", cmd.getType().toString().c_str(), ost.str().c_str()); @@ -810,9 +814,9 @@ FileStorHandlerImpl::remapMessage( } else { std::ostringstream ost; assert(targets.size() == 2); - ost << "Bucket " << source << " was split and " - << "neither bucket " << targets[0]->bid << " nor " - << targets[1]->bid << " fit for this operation. " + ost << "Bucket " << source.getBucketId() << " was split and " + << "neither bucket " << targets[0]->bucket.getBucketId() << " nor " + << targets[1]->bucket.getBucketId() << " fit for this operation. " << "Failing operation so distributor can create " << "bucket on correct node."; LOG(debug, "%s", ost.str().c_str()); @@ -823,9 +827,9 @@ FileStorHandlerImpl::remapMessage( } } else { LOG(debug, "Remapping %s operation to bucket %s", - cmd.toString().c_str(), targets[0]->bid.toString().c_str()); - cmd.remapBucketId(targets[0]->bid); - newBucketId = targets[0]->bid; + cmd.toString().c_str(), targets[0]->bucket.getBucketId().toString().c_str()); + cmd.remapBucketId(targets[0]->bucket.getBucketId()); + newBucket = targets[0]->bucket; targetDisk = targets[0]->diskIndex; #ifdef ENABLE_BUCKET_OPERATION_LOGGING { @@ -857,7 +861,7 @@ FileStorHandlerImpl::remapMessage( { api::BucketCommand& cmd( static_cast<api::BucketCommand&>(msg)); - if (cmd.getBucketId() == source) { + if (cmd.getBucket() == source) { if (op != MOVE) { std::ostringstream ost; ost << "Bucket " << (op == SPLIT ? "split" : "joined") @@ -877,7 +881,7 @@ FileStorHandlerImpl::remapMessage( { api::BucketCommand& cmd( static_cast<api::BucketCommand&>(msg)); - if (cmd.getBucketId() == source) { + if (cmd.getBucket() == source) { if (op == MOVE) { targetDisk = targets[0]->diskIndex; } else if (op == SPLIT) { @@ -903,7 +907,7 @@ FileStorHandlerImpl::remapMessage( // Fail with bucket not found if op != MOVE api::BucketCommand& cmd( static_cast<api::BucketCommand&>(msg)); - if (cmd.getBucketId() == source) { + if (cmd.getBucket() == source) { if (op == MOVE) { targetDisk = targets[0]->diskIndex; } else { @@ -922,7 +926,7 @@ FileStorHandlerImpl::remapMessage( { api::BucketCommand& cmd( static_cast<api::BucketCommand&>(msg)); - if (cmd.getBucketId() == source) { + if (cmd.getBucket() == source) { if (op == MOVE) { targetDisk = targets[0]->diskIndex; } @@ -933,13 +937,13 @@ FileStorHandlerImpl::remapMessage( { const api::InternalCommand& icmd( static_cast<const api::InternalCommand&>(msg)); - document::BucketId bucket; + document::Bucket bucket; switch(icmd.getType()) { case RequestStatusPage::ID: // Ignore break; case CreateIteratorCommand::ID: - bucket = static_cast<CreateIteratorCommand&>(msg).getBucketId(); + bucket = static_cast<CreateIteratorCommand&>(msg).getBucket(); // Move to correct queue if op == MOVE // Fail with bucket not found if op != MOVE if (bucket == source) { @@ -955,12 +959,12 @@ FileStorHandlerImpl::remapMessage( } break; case GetIterCommand::ID: - bucket = static_cast<GetIterCommand&>(msg).getBucketId(); + bucket = static_cast<GetIterCommand&>(msg).getBucket(); //@fallthrough@ case RepairBucketCommand::ID: - if (bucket.getRawId() == 0) { + if (bucket.getBucketId().getRawId() == 0) { bucket = static_cast<RepairBucketCommand&>(msg) - .getBucketId(); + .getBucket(); } // Move to correct queue if op == MOVE // Fail with bucket not found if op != MOVE @@ -982,7 +986,7 @@ FileStorHandlerImpl::remapMessage( { api::BucketCommand& cmd( static_cast<api::BucketCommand&>(msg)); - if (cmd.getBucketId() == source) { + if (cmd.getBucket() == source) { if (op == MOVE) { returnCode = api::ReturnCode( api::ReturnCode::INTERNAL_FAILURE, @@ -1005,7 +1009,7 @@ FileStorHandlerImpl::remapMessage( { LOG(debug, "While remapping load for bucket %s for reason %u, " "we abort read bucket info request for this bucket.", - source.toString().c_str(), op); + source.getBucketId().toString().c_str(), op); break; } case InternalBucketJoinCommand::ID: @@ -1035,7 +1039,7 @@ FileStorHandlerImpl::remapMessage( } } // End of switch - return newBucketId; + return newBucket; } void @@ -1047,13 +1051,13 @@ FileStorHandlerImpl::remapQueueNoLock( { BucketIdx& idx(boost::multi_index::get<2>(from.queue)); std::pair<BucketIdx::iterator, BucketIdx::iterator> range( - idx.equal_range(source.bid)); + idx.equal_range(source.bucket)); std::vector<MessageEntry> entriesFound; // Find all the messages for the given bucket. for (BucketIdx::iterator i = range.first; i != range.second; ++i) { - assert(i->_bucketId == source.bid); + assert(i->_bucket == source.bucket); entriesFound.push_back(std::move(*i)); } @@ -1071,14 +1075,14 @@ FileStorHandlerImpl::remapQueueNoLock( // If not OK, reply to this message with the following message api::ReturnCode returnCode(api::ReturnCode::OK); api::StorageMessage& msg(*entry._command); - assert(entry._bucketId == source.bid); + assert(entry._bucket == source.bucket); - document::BucketId bid = remapMessage(msg, - source.bid, - op, - targets, - targetDisk, - returnCode); + document::Bucket bucket = remapMessage(msg, + source.bucket, + op, + targets, + targetDisk, + returnCode); if (returnCode.getResult() != api::ReturnCode::OK) { // Fail message if errorcode set @@ -1094,7 +1098,7 @@ FileStorHandlerImpl::remapQueueNoLock( _messageSender.sendReply(rep); } } else { - entry._bucketId = bid; + entry._bucket = bucket; // Move to correct disk queue if needed _diskInfo[targetDisk].queue.emplace_back(std::move(entry)); } @@ -1115,7 +1119,7 @@ FileStorHandlerImpl::remapQueue( guard.addLock(from.lock, source.diskIndex); Disk& to1(_diskInfo[target.diskIndex]); - if (target.bid.getRawId() != 0) { + if (target.bucket.getBucketId().getRawId() != 0) { guard.addLock(to1.lock, target.diskIndex); } @@ -1142,12 +1146,12 @@ FileStorHandlerImpl::remapQueue( guard.addLock(from.lock, source.diskIndex); Disk& to1(_diskInfo[target1.diskIndex]); - if (target1.bid.getRawId() != 0) { + if (target1.bucket.getBucketId().getRawId() != 0) { guard.addLock(to1.lock, target1.diskIndex); } Disk& to2(_diskInfo[target2.diskIndex]); - if (target2.bid.getRawId() != 0) { + if (target2.bucket.getBucketId().getRawId() != 0) { guard.addLock(to2.lock, target2.diskIndex); } @@ -1162,7 +1166,7 @@ FileStorHandlerImpl::remapQueue( void FileStorHandlerImpl::failOperations( - const document::BucketId& bucket, uint16_t fromDisk, + const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) { Disk& from(_diskInfo[fromDisk]); @@ -1204,10 +1208,10 @@ FileStorHandlerImpl::sendReply(const std::shared_ptr<api::StorageReply>& msg) } FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, - const document::BucketId& bId) + const document::Bucket &bucket) : _command(cmd), _timer(), - _bucketId(bId), + _bucket(bucket), _priority(cmd->getPriority()) { } @@ -1215,7 +1219,7 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::Stora FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry) : _command(entry._command), _timer(entry._timer), - _bucketId(entry._bucketId), + _bucket(entry._bucket), _priority(entry._priority) { } @@ -1223,7 +1227,7 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry) FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept : _command(std::move(entry._command)), _timer(entry._timer), - _bucketId(entry._bucketId), + _bucket(entry._bucket), _priority(entry._priority) { } @@ -1240,7 +1244,7 @@ FileStorHandlerImpl::Disk::Disk() FileStorHandlerImpl::Disk::~Disk() { } bool -FileStorHandlerImpl::Disk::isLocked(const document::BucketId& bucket) const noexcept +FileStorHandlerImpl::Disk::isLocked(const document::Bucket& bucket) const noexcept { return (lockedBuckets.find(bucket) != lockedBuckets.end()); } @@ -1262,25 +1266,25 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const FileStorHandlerImpl::BucketLock::BucketLock( const vespalib::MonitorGuard & guard, Disk& disk, - const document::BucketId& id, + const document::Bucket &bucket, uint8_t priority, const vespalib::stringref & statusString) : _disk(disk), - _id(id) + _bucket(bucket) { (void) guard; - if (_id.getRawId() != 0) { + if (_bucket.getBucketId().getRawId() != 0) { // Lock the bucket and wait until it is not the current operation for // the disk itself. _disk.lockedBuckets.insert( - std::make_pair(_id, Disk::LockEntry(priority, statusString))); + std::make_pair(_bucket, Disk::LockEntry(priority, statusString))); LOG(debug, "Locked bucket %s with priority %u", - id.toString().c_str(), + bucket.getBucketId().toString().c_str(), priority); LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _id, "acquired filestor lock", false, + _bucket.getBucketId(), "acquired filestor lock", false, debug::BucketOperationLogger::State::BUCKET_LOCKED); } } @@ -1288,12 +1292,12 @@ FileStorHandlerImpl::BucketLock::BucketLock( FileStorHandlerImpl::BucketLock::~BucketLock() { - if (_id.getRawId() != 0) { + if (_bucket.getBucketId().getRawId() != 0) { vespalib::MonitorGuard lockGuard(_disk.lock); - _disk.lockedBuckets.erase(_id); - LOG(debug, "Unlocked bucket %s", _id.toString().c_str()); + _disk.lockedBuckets.erase(_bucket); + LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _id, "released filestor lock", true, + _bucket.getBucketId(), "released filestor lock", true, debug::BucketOperationLogger::State::BUCKET_UNLOCKED); lockGuard.broadcast(); } @@ -1312,7 +1316,7 @@ FileStorHandlerImpl::dumpQueue(uint16_t disk) const it != idx.end(); it++) { - ost << it->_bucketId << ": " << it->_command->toString() << " (priority: " + ost << it->_bucket.getBucketId() << ": " << it->_command->toString() << " (priority: " << (int)it->_command->getPriority() << ")\n"; } @@ -1339,7 +1343,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out, out << "<h4>Active operations</h4>\n"; for (const auto& lockedBucket : t.lockedBuckets) { out << lockedBucket.second.statusString - << " (" << lockedBucket.first + << " (" << lockedBucket.first.getBucketId() << ") Running for " << (_component.getClock().getTimeInSeconds().getTime() - lockedBucket.second.timestamp) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 7d051a3a631..58e93d733fd 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -47,10 +47,10 @@ public: struct MessageEntry { std::shared_ptr<api::StorageMessage> _command; metrics::MetricTimer _timer; - document::BucketId _bucketId; + document::Bucket _bucket; uint8_t _priority; - MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::BucketId& bId); + MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::Bucket &bId); MessageEntry(MessageEntry &&) noexcept ; MessageEntry(const MessageEntry &); MessageEntry & operator = (const MessageEntry &) = delete; @@ -66,8 +66,8 @@ public: typedef boost::multi_index::ordered_non_unique< boost::multi_index::member<MessageEntry, - document::BucketId, - &MessageEntry::_bucketId> > BucketOrder; + document::Bucket, + &MessageEntry::_bucket> > BucketOrder; typedef boost::multi_index::multi_index_container< MessageEntry, @@ -101,7 +101,7 @@ public: { } }; - typedef vespalib::hash_map<document::BucketId, LockEntry, document::BucketId::hash> LockedBuckets; + typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets; LockedBuckets lockedBuckets; FileStorDiskMetrics* metrics; @@ -123,7 +123,7 @@ public: Disk(); ~Disk(); - bool isLocked(const document::BucketId&) const noexcept; + bool isLocked(const document::Bucket&) const noexcept; uint32_t getQueueSize() const noexcept; private: std::atomic<DiskState> state; @@ -131,15 +131,15 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: - BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::BucketId& id, uint8_t priority, + BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::Bucket &bucket, uint8_t priority, const vespalib::stringref & statusString); ~BucketLock(); - const document::BucketId& getBucketId() const override { return _id; } + const document::Bucket &getBucket() const override { return _bucket; } private: Disk& _disk; - document::BucketId _id; + document::Bucket _bucket; }; FileStorHandlerImpl(MessageSender&, @@ -170,7 +170,7 @@ public: void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op); - void failOperations(const document::BucketId&, uint16_t fromDisk, const api::ReturnCode&); + void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&); void sendCommand(const std::shared_ptr<api::StorageCommand>&) override; void sendReply(const std::shared_ptr<api::StorageReply>&) override; @@ -180,7 +180,7 @@ public: uint32_t getQueueSize(uint16_t disk) const; std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::BucketId&, uint16_t disk); + lock(const document::Bucket&, uint16_t disk); void addMergeStatus(const document::BucketId&, MergeStatus::SP); MergeStatus& editMergeStatus(const document::BucketId&); @@ -255,7 +255,7 @@ private: */ std::unique_ptr<FileStorHandler::BucketLockInterface> takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard, - Disk& disk, const document::BucketId& id, const api::StorageMessage& msg); + Disk& disk, const document::Bucket &bucket, const api::StorageMessage& msg); /** * Creates and returns a reply with api::TIMEOUT return code for msg. @@ -271,12 +271,12 @@ private: // Update hook void updateMetrics(const MetricLockGuard &) override; - document::BucketId remapMessage(api::StorageMessage& msg, - const document::BucketId& source, - Operation op, - std::vector<RemapInfo*>& targets, - uint16_t& targetDisk, - api::ReturnCode& returnCode); + document::Bucket remapMessage(api::StorageMessage& msg, + const document::Bucket &source, + Operation op, + std::vector<RemapInfo*>& targets, + uint16_t& targetDisk, + api::ReturnCode& returnCode); void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index ac3d901fd65..77f6d912306 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -11,7 +11,7 @@ #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/storageutil/log.h> -#include <vespa/storage/common/messagebucketid.h> +#include <vespa/storage/common/messagebucket.h> #include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storageapi/message/batch.h> #include <vespa/storage/common/bucketoperationlogger.h> @@ -259,7 +259,7 @@ FileStorManager::handlePersistenceMessage( msg->getType().getName().c_str(), disk); LOG_BUCKET_OPERATION_NO_LOCK( - getStorageMessageBucketId(*msg), + getStorageMessageBucketId(*msg).getBucketId(), vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk)); @@ -514,7 +514,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) disk = entry->disk; entry.remove(); } - _filestorHandler->failOperations(cmd->getBucketId(), disk, + _filestorHandler->failOperations(cmd->getBucket(), disk, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, vespalib::make_string("Bucket %s about to be deleted anyway", cmd->getBucketId().toString().c_str()))); diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index d0255405dda..2f1dafbb388 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -545,8 +545,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) LOG(debug, "split(%s -> %s, %s)", cmd.getBucketId().toString().c_str(), target1.getBucketId().toString().c_str(), target2.getBucketId().toString().c_str()); - PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1.getBucketId())); - PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2.getBucketId())); + PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1)); + PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2)); #ifdef ENABLE_BUCKET_OPERATION_LOGGING { @@ -590,7 +590,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) _env.getBucketDatabase().get( target.getBucketId(), "PersistenceThread::handleSplitBucket - Target", StorBucketDatabase::CREATE_IF_NONEXISTING), - FileStorHandler::RemapInfo(target.getBucketId(), disk))); + FileStorHandler::RemapInfo(target, disk))); targets.back().first->setBucketInfo( _env.getBucketInfo(target, disk)); targets.back().first->disk = disk; @@ -606,7 +606,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) target2.getBucketId().toString().c_str(), targ2.getMetaCount()); } - FileStorHandler::RemapInfo source(cmd.getBucketId(), _env._partition); + FileStorHandler::RemapInfo source(cmd.getBucket(), _env._partition); _env._fileStorHandler.remapQueueAfterSplit( source, targets[0].second, targets[1].second); bool ownershipChanged( @@ -615,7 +615,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Now release all the bucketdb locks. for (uint32_t i = 0; i < targets.size(); i++) { if (ownershipChanged) { - notifyGuard.notifyAlways(document::Bucket(spiBucket.getBucketSpace(), targets[i].second.bid), + notifyGuard.notifyAlways(targets[i].second.bucket, targets[i].first->getBucketInfo()); } // The entries vector has the source bucket in element zero, so indexing @@ -630,7 +630,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // to an empty target bucket, since the provider will have // implicitly erased it by this point. spi::Bucket createTarget( - spi::Bucket(document::Bucket(spiBucket.getBucketSpace(), targets[i].second.bid), + spi::Bucket(targets[i].second.bucket, spi::PartitionId(targets[i].second.diskIndex))); LOG(debug, "Split target %s was empty, but re-creating it since " @@ -640,7 +640,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) } splitReply->getSplitInfo().push_back( api::SplitBucketReply::Entry( - targets[i].second.bid, + targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo())); targets[i].first.write(); } else { @@ -715,8 +715,9 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) entry.write(); } - document::BucketId firstBucket(cmd.getSourceBuckets()[0]); - document::BucketId secondBucket(cmd.getSourceBuckets()[1]); + document::Bucket destBucket = cmd.getBucket(); + document::Bucket firstBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[0]); + document::Bucket secondBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[1]); PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(firstBucket)); PersistenceUtil::LockResult lock2; @@ -729,20 +730,19 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) vespalib::string desc( vespalib::make_string( "join(%s, %s -> %s)", - firstBucket.toString().c_str(), - secondBucket.toString().c_str(), + firstBucket.getBucketId().toString().c_str(), + secondBucket.getBucketId().toString().c_str(), cmd.getBucketId().toString().c_str())); LOG_BUCKET_OPERATION(cmd.getBucketId(), desc); - LOG_BUCKET_OPERATION(firstBucket, desc); + LOG_BUCKET_OPERATION(firstBucket.getBucketId(), desc); if (firstBucket != secondBucket) { - LOG_BUCKET_OPERATION(secondBucket, desc); + LOG_BUCKET_OPERATION(secondBucket.getBucketId(), desc); } } #endif - document::Bucket destBucket = cmd.getBucket(); spi::Result result = - _spi.join(spi::Bucket(document::Bucket(destBucket.getBucketSpace(), firstBucket), spi::PartitionId(lock1.disk)), - spi::Bucket(document::Bucket(destBucket.getBucketSpace(), secondBucket), spi::PartitionId(lock2.disk)), + _spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)), + spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)), spi::Bucket(destBucket, spi::PartitionId(_env._partition)), _context); if (!checkForError(result, *tracker)) { @@ -754,17 +754,17 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) } uint64_t lastModified = 0; for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { - document::BucketId bId = cmd.getSourceBuckets()[i]; + document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]); uint16_t disk = (i == 0) ? lock1.disk : lock2.disk; - FileStorHandler::RemapInfo target(cmd.getBucketId(), + FileStorHandler::RemapInfo target(cmd.getBucket(), _env._partition); _env._fileStorHandler.remapQueueAfterJoin( - FileStorHandler::RemapInfo(bId, disk), + FileStorHandler::RemapInfo(srcBucket, disk), target); // Remove source from bucket db. StorBucketDatabase::WrappedEntry entry( _env.getBucketDatabase().get( - bId, "join-remove-source")); + srcBucket.getBucketId(), "join-remove-source")); if (entry.exist()) { lastModified = std::max(lastModified, entry->info.getLastModified()); @@ -1101,7 +1101,7 @@ bool hasBucketInfo(const api::StorageMessage& msg) void PersistenceThread::flushAllReplies( - const document::BucketId& bucketId, + const document::Bucket& bucket, std::vector<std::unique_ptr<MessageTracker> >& replies) { if (replies.empty()) { @@ -1128,14 +1128,14 @@ PersistenceThread::flushAllReplies( } } LOG_BUCKET_OPERATION( - bucketId, + bucket.getBucketId(), vespalib::make_string( "flushing %zu operations (%zu puts, %zu removes, " "%zu other)", replies.size(), nputs, nremoves, nother)); } #endif - spi::Bucket b(document::Bucket(document::BucketSpace::placeHolder(), bucketId), spi::PartitionId(_env._partition)); + spi::Bucket b(bucket, spi::PartitionId(_env._partition)); spi::Result result = _spi.flush(b, _context); uint32_t errorCode = _env.convertErrorCode(result); if (errorCode != 0) { @@ -1167,7 +1167,7 @@ PersistenceThread::flushAllReplies( void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) { std::vector<MessageTracker::UP> trackers; - document::BucketId bucketId = lock.first->getBucketId(); + document::Bucket bucket = lock.first->getBucket(); while (lock.second.get() != 0) { LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", @@ -1178,7 +1178,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) // If the next operation wasn't batchable, we should flush // everything that came before. if (!batchable) { - flushAllReplies(bucketId, trackers); + flushAllReplies(bucket, trackers); } std::unique_ptr<MessageTracker> tracker = processMessage(*msg); @@ -1189,15 +1189,13 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) if (hasBucketInfo(*msg)) { if (tracker->getReply()->getResult().success()) { - document::Bucket bucket = msg->getBucket(); - assert(bucket.getBucketId() == bucketId); _env.setBucketInfo(*tracker, bucket); } } if (batchable) { LOG(spam, "Adding reply %s to batch for bucket %s", tracker->getReply()->toString().c_str(), - bucketId.toString().c_str()); + bucket.getBucketId().toString().c_str()); trackers.push_back(std::move(tracker)); @@ -1220,7 +1218,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } } - flushAllReplies(bucketId, trackers); + flushAllReplies(bucket, trackers); } void diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 4d714dc878a..1e844735fff 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -87,7 +87,7 @@ private: bool checkForError(const spi::Result& response, MessageTracker& tracker); spi::Bucket getBucket(const DocumentId& id, const BucketId& bucket) const; - void flushAllReplies(const document::BucketId& bucketId, std::vector<MessageTracker::UP>& trackers); + void flushAllReplies(const document::Bucket& bucket, std::vector<MessageTracker::UP>& trackers); friend class TestAndSetHelper; bool tasConditionExists(const api::TestAndSetCommand & cmd); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index e28281a5819..25578048f69 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -113,13 +113,13 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, } uint16_t -PersistenceUtil::getPreferredAvailableDisk(const document::BucketId& id) const +PersistenceUtil::getPreferredAvailableDisk(const document::Bucket &bucket) const { - return _component.getPreferredAvailablePartition(id); + return _component.getPreferredAvailablePartition(bucket.getBucketId()); } PersistenceUtil::LockResult -PersistenceUtil::lockAndGetDisk(const document::BucketId& bucket, +PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, StorBucketDatabase::Flag flags) { // To lock the bucket, we need to ensure that we don't conflict with @@ -135,7 +135,7 @@ PersistenceUtil::lockAndGetDisk(const document::BucketId& bucket, _fileStorHandler.lock(bucket, result.disk)); StorBucketDatabase::WrappedEntry entry(getBucketDatabase().get( - bucket, "join-lockAndGetDisk-1", flags)); + bucket.getBucketId(), "join-lockAndGetDisk-1", flags)); if (entry.exist() && entry->disk != result.disk) { result.disk = entry->disk; continue; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index c8ec6767c4b..49ea5c82ab4 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -91,7 +91,7 @@ struct PersistenceUtil { void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info); - uint16_t getPreferredAvailableDisk(const document::BucketId& id) const; + uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const; /** Lock the given bucket in the file stor handler. */ struct LockResult { @@ -104,7 +104,7 @@ struct PersistenceUtil { }; LockResult lockAndGetDisk( - const document::BucketId& bucket, + const document::Bucket &bucket, StorBucketDatabase::Flag flags = StorBucketDatabase::NONE); api::BucketInfo getBucketInfo(const document::Bucket &bucket, int disk = -1) const; diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index c69a8cc06f7..e981e29e806 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -3,7 +3,7 @@ #include "changedbucketownershiphandler.h" #include <vespa/storageapi/message/state.h> #include <vespa/storage/bucketdb/storbucketdb.h> -#include <vespa/storage/common/messagebucketid.h> +#include <vespa/storage/common/messagebucket.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/vespalib/util/exceptions.h> @@ -323,8 +323,8 @@ ChangedBucketOwnershipHandler::sendingDistributorOwnsBucketInCurrentState( } try { - document::BucketId opBucket(getStorageMessageBucketId(cmd)); - return (current->ownerOf(opBucket) == cmd.getSourceIndex()); + document::Bucket opBucket(getStorageMessageBucket(cmd)); + return (current->ownerOf(opBucket.getBucketId()) == cmd.getSourceIndex()); } catch (vespalib::IllegalArgumentException& e) { LOG(error, "Precondition violation: unable to get bucket from " |