diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-10-16 12:44:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-16 12:44:54 +0200 |
commit | 6ac5a9c0f069287422e617be88e36b3d769ba014 (patch) | |
tree | 4c351c055e86b115377cb64257367cc8fa8a17a0 /storage | |
parent | fa59545b68e03ebd3568dd33ca40624ecd05dd35 (diff) | |
parent | a179357a7f8ebc6d052c913d5ff6303ce3bc88fc (diff) |
Merge pull request #3747 from vespa-engine/toregge/add-bucket-space-to-storage-commands
Adjust API for storage messages: Replace virtual getBucketId() method…
Diffstat (limited to 'storage')
6 files changed, 52 insertions, 67 deletions
diff --git a/storage/src/vespa/storage/common/bucketmessages.h b/storage/src/vespa/storage/common/bucketmessages.h index dd8d411b47e..abfadb77c4a 100644 --- a/storage/src/vespa/storage/common/bucketmessages.h +++ b/storage/src/vespa/storage/common/bucketmessages.h @@ -3,7 +3,7 @@ #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storageapi/message/internal.h> -#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucket.h> #include <vespa/storageapi/buckets/bucketinfo.h> #include <vector> #include <set> @@ -78,7 +78,7 @@ public: ReadBucketInfo(const document::BucketId& bucketId); ~ReadBucketInfo(); - document::BucketId getBucketId() const override { return _bucketId; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } bool hasSingleBucketId() const override { return true; } std::unique_ptr<api::StorageReply> makeReply() override; @@ -102,7 +102,7 @@ public: ReadBucketInfoReply(const ReadBucketInfo& cmd); ~ReadBucketInfoReply(); - document::BucketId getBucketId() const override { return _bucketId; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } bool hasSingleBucketId() const override { return true; } void print(std::ostream& out, bool verbose, const std::string& indent) const override; @@ -133,7 +133,7 @@ public: ~RepairBucketCommand(); bool hasSingleBucketId() const override { return true; } - document::BucketId getBucketId() const override { return _bucket; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucket); } uint16_t getDisk() const { return _disk; } bool verifyBody() const { return _verifyBody; } @@ -166,7 +166,7 @@ public: RepairBucketReply(const RepairBucketCommand& cmd, const api::BucketInfo& bucketInfo = api::BucketInfo()); ~RepairBucketReply(); - document::BucketId getBucketId() const override { return _bucket; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucket); } bool hasSingleBucketId() const override { return true; } const api::BucketInfo& getBucketInfo() const { return _bucketInfo; } @@ -199,7 +199,7 @@ public: BucketDiskMoveCommand(const document::BucketId& bucket, uint16_t srcDisk, uint16_t dstDisk); ~BucketDiskMoveCommand(); - document::BucketId getBucketId() const override { return _bucket; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucket); } bool hasSingleBucketId() const override { return true; } uint16_t getSrcDisk() const { return _srcDisk; } @@ -234,7 +234,7 @@ public: uint32_t destinationFileSize = 0); ~BucketDiskMoveReply(); - document::BucketId getBucketId() const override { return _bucket; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucket); } bool hasSingleBucketId() const override { return true; } const api::BucketInfo& getBucketInfo() const { return _bucketInfo; } @@ -270,7 +270,7 @@ public: InternalBucketJoinCommand(const document::BucketId& bucket, uint16_t keepOnDisk, uint16_t joinFromDisk); ~InternalBucketJoinCommand(); - document::BucketId getBucketId() const override { return _bucket; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucket); } bool hasSingleBucketId() const override { return true; } uint16_t getDiskOfInstanceToKeep() const { return _keepOnDisk; } @@ -296,7 +296,7 @@ public: const api::BucketInfo& info = api::BucketInfo()); ~InternalBucketJoinReply(); - document::BucketId getBucketId() const override { return _bucket; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucket); } bool hasSingleBucketId() const override { return true; } const api::BucketInfo& getBucketInfo() const { return _bucketInfo; } diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp index 86b04066a5a..93c005728f4 100644 --- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp +++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp @@ -22,7 +22,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, _env._metrics.movedBuckets, _env._component.getClock())); - document::BucketId bucket(cmd.getBucketId()); + document::Bucket bucket(cmd.getBucket()); uint32_t targetDisk(cmd.getDstDisk()); uint32_t deviceIndex(_env._partition); @@ -45,8 +45,8 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, bucket.toString().c_str(), deviceIndex, targetDisk); - spi::Bucket from(document::Bucket(document::BucketSpace::placeHolder(), bucket), spi::PartitionId(deviceIndex)); - spi::Bucket to(document::Bucket(document::BucketSpace::placeHolder(), bucket), spi::PartitionId(targetDisk)); + spi::Bucket from(bucket, spi::PartitionId(deviceIndex)); + spi::Bucket to(bucket, spi::PartitionId(targetDisk)); spi::Result result( _provider.move(from, spi::PartitionId(targetDisk), context)); @@ -66,13 +66,13 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, // is executed. moving queue should move delete command to correct disk StorBucketDatabase::WrappedEntry entry( _env.getBucketDatabase().get( - bucket, "FileStorThread::onBucketDiskMove", + bucket.getBucketId(), "FileStorThread::onBucketDiskMove", StorBucketDatabase::LOCK_IF_NONEXISTING_AND_NOT_CREATING)); // Move queued operations in bucket to new thread. Hold bucket lock // while doing it, so filestor manager can't put in other operations // first, such that operations change order. - _env._fileStorHandler.remapQueueAfterDiskMove(bucket, deviceIndex, targetDisk); + _env._fileStorHandler.remapQueueAfterDiskMove(bucket.getBucketId(), deviceIndex, targetDisk); if (entry.exist()) { entry->setBucketInfo(bInfo); diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 36e417b65e4..88bca44558c 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -955,8 +955,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, _env._metrics.mergeBuckets, _env._component.getClock())); - const document::BucketId& id(cmd.getBucketId()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), id), spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); + const document::BucketId id(bucket.getBucketId()); LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".", bucket.toString().c_str(), cmd.getMaxTimestamp()); @@ -1181,8 +1181,8 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker(new MessageTracker( _env._metrics.getBucketDiff, _env._component.getClock())); - const document::BucketId& id(cmd.getBucketId()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), id), spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); + const document::BucketId id(bucket.getBucketId()); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); @@ -1304,8 +1304,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSender& sender) { ++_env._metrics.getBucketDiffReply; - document::BucketId id(reply.getBucketId()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), id), spi::PartitionId(_env._partition)); + spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition)); + document::BucketId id(bucket.getBucketId()); LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str()); if (!_env._fileStorHandler.isMerging(id)) { @@ -1388,8 +1388,8 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, _env._metrics.applyBucketDiff, _env._component.getClock())); - const document::BucketId& id(cmd.getBucketId()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), id), spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); + const document::BucketId id(bucket.getBucketId()); LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(id)) { @@ -1484,8 +1484,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender) { ++_env._metrics.applyBucketDiffReply; - document::BucketId id(reply.getBucketId()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), id), spi::PartitionId(_env._partition)); + spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition)); + document::BucketId id(bucket.getBucketId()); std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index 2112c336f61..d574eaa2108 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -32,7 +32,7 @@ public: std::unique_ptr<api::StorageReply> makeReply() override; - document::BucketId getBucketId() const override { return _bucketId; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } bool hasSingleBucketId() const override { return true; } spi::IteratorId getIteratorId() const { return _iteratorId; } @@ -64,9 +64,7 @@ public: ~GetIterReply(); bool hasSingleBucketId() const override { return true; } - document::BucketId getBucketId() const override { - return _bucketId; - } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } const std::vector<spi::DocEntry::UP>& getEntries() const { return _entries; @@ -101,7 +99,7 @@ public: spi::IncludedVersions includedVersions); ~CreateIteratorCommand(); bool hasSingleBucketId() const override { return true; } - document::BucketId getBucketId() const override { return _bucketId; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } const spi::Selection& getSelection() const { return _selection; } spi::IncludedVersions getIncludedVersions() const { return _includedVersions; } const std::string& getFields() const { return _fieldSet; } @@ -131,7 +129,7 @@ public: ~CreateIteratorReply(); bool hasSingleBucketId() const override { return true; } - document::BucketId getBucketId() const override { return _bucketId; } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } spi::IteratorId getIteratorId() const { return _iteratorId; } @@ -181,9 +179,7 @@ public: RecheckBucketInfoCommand(const document::BucketId& bucketId); ~RecheckBucketInfoCommand(); - document::BucketId getBucketId() const override { - return _bucketId; - } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } std::unique_ptr<api::StorageReply> makeReply() override; @@ -201,9 +197,7 @@ public: RecheckBucketInfoReply(const RecheckBucketInfoCommand& cmd); ~RecheckBucketInfoReply(); - document::BucketId getBucketId() const override { - return _bucketId; - } + document::Bucket getBucket() const override { return getPlaceHolderBucket(_bucketId); } void print(std::ostream& out, bool verbose, const std::string& indent) const override; }; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 11f8b5d1cf4..7a8aec6afa2 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -217,7 +217,7 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); api::BucketInfo before = _env.getBucketInfo(cmd.getBucketId()); spi::Result result = - _spi.maintain(spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), + _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), cmd.verifyBody() ? spi::HIGH : spi::LOW); @@ -243,8 +243,7 @@ PersistenceThread::handleMultiOperation(api::MultiOperationCommand& cmd) MessageTracker::UP tracker(new MessageTracker( _env._metrics.multiOp[cmd.getLoadType()], _env._component.getClock())); - spi::Bucket b = spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(_env._partition)); + spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); long puts = 0; long removes = 0; long updates = 0; @@ -300,8 +299,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd) MessageTracker::UP tracker(new MessageTracker( _env._metrics.revert[cmd.getLoadType()], _env._component.getClock())); - spi::Bucket b = spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(_env._partition)); + spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); const std::vector<api::Timestamp> tokens = cmd.getRevertTokens(); for (uint32_t i = 0; i < tokens.size(); ++i) { spi::Result result = _spi.removeEntry(b, @@ -323,7 +321,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } - spi::Bucket spiBucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), spi::PartitionId(_env._partition)); + spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); _spi.createBucket(spiBucket, _context); if (cmd.getActive()) { _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); @@ -392,7 +390,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { return tracker; } @@ -483,7 +481,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) // _context is reset per command, so it's safe to modify it like this. _context.setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), spi::PartitionId(_env._partition)), + spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), @@ -519,7 +517,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) return tracker; } - spi::Bucket spiBucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), spi::PartitionId(_env._partition)); + spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); SplitBitDetector::Result targetInfo; if (_env._config.enableMultibitSplitOptimalization) { targetInfo = SplitBitDetector::detectSplit( @@ -567,8 +565,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) #endif spi::Result result = _spi.split( spiBucket, - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), target1), spi::PartitionId(lock1.disk)), - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), target2), spi::PartitionId(lock2.disk)), _context); + spi::Bucket(document::Bucket(spiBucket.getBucketSpace(), target1), spi::PartitionId(lock1.disk)), + spi::Bucket(document::Bucket(spiBucket.getBucketSpace(), target2), spi::PartitionId(lock2.disk)), _context); if (result.hasError()) { tracker->fail(_env.convertErrorCode(result), result.getErrorMessage()); @@ -632,7 +630,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // to an empty target bucket, since the provider will have // implicitly erased it by this point. spi::Bucket createTarget( - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), targets[i].second.bid), + spi::Bucket(document::Bucket(spiBucket.getBucketSpace(), targets[i].second.bid), spi::PartitionId(targets[i].second.diskIndex))); LOG(debug, "Split target %s was empty, but re-creating it since " @@ -741,18 +739,16 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) } } #endif + document::Bucket destBucket = cmd.getBucket(); spi::Result result = - _spi.join(spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), firstBucket), spi::PartitionId(lock1.disk)), - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), secondBucket), spi::PartitionId(lock2.disk)), - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(_env._partition)), + _spi.join(spi::Bucket(document::Bucket(destBucket.getBucketSpace(), firstBucket), spi::PartitionId(lock1.disk)), + spi::Bucket(document::Bucket(destBucket.getBucketSpace(), secondBucket), spi::PartitionId(lock2.disk)), + spi::Bucket(destBucket, spi::PartitionId(_env._partition)), _context); if (!checkForError(result, *tracker)) { return tracker; } - result = _spi.flush(spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(_env._partition)), - _context); + result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), _context); if (!checkForError(result, *tracker)) { return tracker; } @@ -799,7 +795,7 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); spi::BucketInfo::ActiveState newState( shouldBeActive @@ -846,13 +842,11 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd) entry->disk = _env._partition; entry.write(); } + document::Bucket destBucket = cmd.getBucket(); spi::Result result = - _spi.join(spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(cmd.getDiskOfInstanceToJoin())), - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(cmd.getDiskOfInstanceToJoin())), - spi::Bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(cmd.getDiskOfInstanceToKeep())), + _spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), + spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), + spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())), _context); if (checkForError(result, *tracker)) { tracker->setReply( diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index 67b150ec70a..51ef67dc7ac 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -87,9 +87,7 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, cmd.getBucketId().toString().c_str(), cmd.getDocumentSelection().c_str()); - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), - cmd.getBucketId()), - spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); UnrevertableRemoveEntryProcessor processor(_spi, bucket, context); BucketProcessor::iterateAll(_spi, bucket, @@ -118,8 +116,7 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, ost << "Persistence bucket " << cmd.getBucketId() << ", partition " << _env._partition << "\n"; - spi::Bucket bucket(document::Bucket(document::BucketSpace::placeHolder(), cmd.getBucketId()), - spi::PartitionId(_env._partition)); + spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); StatEntryProcessor processor(ost); BucketProcessor::iterateAll(_spi, bucket, |