diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-18 14:54:28 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-18 14:54:28 +0000 |
commit | e099df397486313831f09befa1d081b178384632 (patch) | |
tree | 2a3b98ba69c25255724653df521ce0a5a38d6972 /storage | |
parent | b17c5c96e18ae04a1b39b4d0ec3dc89832038c05 (diff) |
Move join handling together with split handling.
Diffstat (limited to 'storage')
7 files changed, 152 insertions, 162 deletions
diff --git a/storage/src/vespa/storage/common/bucketmessages.h b/storage/src/vespa/storage/common/bucketmessages.h index d6ba7915b3f..b0dfa35fbd1 100644 --- a/storage/src/vespa/storage/common/bucketmessages.h +++ b/storage/src/vespa/storage/common/bucketmessages.h @@ -1,12 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/persistence/spi/result.h> #include <vespa/storageapi/message/internal.h> -#include <vespa/document/bucket/bucket.h> #include <vespa/storageapi/buckets/bucketinfo.h> -#include <vector> -#include <set> namespace storage { diff --git a/storage/src/vespa/storage/distributor/bucketlistmerger.h b/storage/src/vespa/storage/distributor/bucketlistmerger.h index 25574721c59..1db2bc55f5c 100644 --- a/storage/src/vespa/storage/distributor/bucketlistmerger.h +++ b/storage/src/vespa/storage/distributor/bucketlistmerger.h @@ -3,10 +3,9 @@ #include <vespa/document/bucket/bucketid.h> #include <vespa/storageapi/buckets/bucketinfo.h> +#include <vector> -namespace storage { - -namespace distributor { +namespace storage::distributor { /** Merges two sorted lists of buckets. @@ -18,27 +17,18 @@ namespace distributor { class BucketListMerger { public: - typedef std::pair<document::BucketId, api::BucketInfo> BucketEntry; - typedef std::vector<BucketEntry> BucketList; - - BucketListMerger(const BucketList& newList, const BucketList& oldList, - uint64_t timestamp); + using BucketEntry = std::pair<document::BucketId, api::BucketInfo>; + using BucketList = std::vector<BucketEntry>; - const std::vector<BucketEntry>& getAddedEntries() - { return _addedEntries; } - - const std::vector<document::BucketId>& getRemovedEntries() - { return _removedEntries; } + BucketListMerger(const BucketList& newList, const BucketList& oldList, uint64_t timestamp); + const std::vector<BucketEntry>& getAddedEntries() const { return _addedEntries; } + const std::vector<document::BucketId>& getRemovedEntries() const { return _removedEntries; } uint64_t getTimestamp() const { return _timestamp; } - private: - std::vector<BucketEntry> _addedEntries; + std::vector<BucketEntry> _addedEntries; std::vector<document::BucketId> _removedEntries; - uint64_t _timestamp; + uint64_t _timestamp; }; } - -} - diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergemetadata.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergemetadata.h index 273603ca6ad..2e4c61b10d5 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergemetadata.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergemetadata.h @@ -4,6 +4,8 @@ #include <vespa/storage/bucketdb/bucketcopy.h> +namespace vespalib { class asciistream; } + namespace storage::distributor { struct MergeMetaData { diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 6602e9ece4a..74d5c304bfb 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -278,133 +278,6 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTrack return tracker; } -bool -PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) -{ - if (cmd.getSourceBuckets().size() != 2) { - tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, - "Join needs exactly two buckets to be joined together" + cmd.getBucketId().toString()); - return false; - } - // Verify that source and target buckets look sane. - for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { - if (cmd.getSourceBuckets()[i] == cmd.getBucketId()) { - tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, - "Join had both source and target bucket " + cmd.getBucketId().toString()); - return false; - } - if (!cmd.getBucketId().contains(cmd.getSourceBuckets()[i])) { - tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, - "Source bucket " + cmd.getSourceBuckets()[i].toString() - + " is not contained in target " + cmd.getBucketId().toString()); - return false; - } - } - return true; -} - -MessageTracker::UP -PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.joinBuckets); - if (!validateJoinCommand(cmd, *tracker)) { - return tracker; - } - document::Bucket destBucket = cmd.getBucket(); - // To avoid a potential deadlock all operations locking multiple - // buckets must lock their buckets in the same order (sort order of - // bucket id, lowest countbits, lowest location first). - // Sort buckets to join in order to ensure we lock in correct order - std::sort(cmd.getSourceBuckets().begin(), cmd.getSourceBuckets().end()); - { - // Create empty bucket for target. - StorBucketDatabase::WrappedEntry entry = - _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", - StorBucketDatabase::CREATE_IF_NONEXISTING); - entry.write(); - } - - document::Bucket firstBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[0]); - document::Bucket secondBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[1]); - - PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(firstBucket)); - PersistenceUtil::LockResult lock2; - if (firstBucket != secondBucket) { - lock2 = _env.lockAndGetDisk(secondBucket); - } - -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - { - auto desc = fmt("join(%s, %s -> %s)", - firstBucket.getBucketId().toString().c_str(), - secondBucket.getBucketId().toString().c_str(), - cmd.getBucketId().toString().c_str()); - LOG_BUCKET_OPERATION(cmd.getBucketId(), desc); - LOG_BUCKET_OPERATION(firstBucket.getBucketId(), desc); - if (firstBucket != secondBucket) { - LOG_BUCKET_OPERATION(secondBucket.getBucketId(), desc); - } - } -#endif - spi::Result result = - _spi.join(spi::Bucket(firstBucket), - spi::Bucket(secondBucket), - spi::Bucket(destBucket), - tracker->context()); - if (!tracker->checkForError(result)) { - return tracker; - } - uint64_t lastModified = 0; - for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { - document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]); - FileStorHandler::RemapInfo target(cmd.getBucket()); - _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket), target); - // Remove source from bucket db. - StorBucketDatabase::WrappedEntry entry = - _env.getBucketDatabase(srcBucket.getBucketSpace()).get(srcBucket.getBucketId(), "join-remove-source"); - if (entry.exist()) { - lastModified = std::max(lastModified, entry->info.getLastModified()); - entry.remove(); - } - } - { - StorBucketDatabase::WrappedEntry entry = - _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", - StorBucketDatabase::CREATE_IF_NONEXISTING); - if (entry->info.getLastModified() == 0) { - entry->info.setLastModified(std::max(lastModified, entry->info.getLastModified())); - } - entry.write(); - } - return tracker; -} - -MessageTracker::UP -PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.internalJoin); - document::Bucket destBucket = cmd.getBucket(); - { - // Create empty bucket for target. - StorBucketDatabase::WrappedEntry entry = - _env.getBucketDatabase(destBucket.getBucketSpace()).get( - destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING); - - entry.write(); - } - assert(cmd.getDiskOfInstanceToJoin() == 0u); - assert(cmd.getDiskOfInstanceToKeep() == 0u); - spi::Result result = - _spi.join(spi::Bucket(destBucket), - spi::Bucket(destBucket), - spi::Bucket(destBucket), - tracker->context()); - if (tracker->checkForError(result)) { - tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket()))); - } - return tracker; -} - MessageTracker::UP PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) { @@ -424,7 +297,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case api::MessageType::DELETEBUCKET_ID: return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: - return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); + return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); // Depends on iterators @@ -451,7 +324,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case ReadBucketInfo::ID: return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); case InternalBucketJoinCommand::ID: - return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); + return _splitJoinHandler.handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); default: diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 969a17768e2..48223afd405 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -39,8 +39,6 @@ public: MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker); MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker); //TODO Rewrite tests to avoid this api leak const AsyncHandler & asyncHandler() const { return _asyncHandler; } @@ -58,13 +56,6 @@ private: bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; - /** - * Sanity-checking of join command parameters. Invokes tracker.fail() with - * an appropriate error and returns false iff the command does not validate - * OK. Returns true and does not touch the tracker otherwise. - */ - static bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker); - // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker); void handleReply(api::StorageReply&); diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index 31f4f8055d5..6ba5f5ba276 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -5,6 +5,7 @@ #include "bucketownershipnotifier.h" #include "splitbitdetector.h" #include "messages.h" +#include <vespa/storage/common/bucketmessages.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/log/log.h> @@ -133,7 +134,7 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker // Must make sure target bucket exists when we have pending ops // to an empty target bucket, since the provider will have // implicitly erased it by this point. - spi::Bucket createTarget(spi::Bucket(target.second.bucket)); + spi::Bucket createTarget(target.second.bucket); LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); _spi.createBucket(createTarget, tracker->context()); @@ -216,4 +217,131 @@ SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, Message return tracker; } +MessageTracker::UP +SplitJoinHandler::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.joinBuckets); + if (!validateJoinCommand(cmd, *tracker)) { + return tracker; + } + document::Bucket destBucket = cmd.getBucket(); + // To avoid a potential deadlock all operations locking multiple + // buckets must lock their buckets in the same order (sort order of + // bucket id, lowest countbits, lowest location first). + // Sort buckets to join in order to ensure we lock in correct order + std::sort(cmd.getSourceBuckets().begin(), cmd.getSourceBuckets().end()); + { + // Create empty bucket for target. + StorBucketDatabase::WrappedEntry entry = + _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", + StorBucketDatabase::CREATE_IF_NONEXISTING); + entry.write(); + } + + document::Bucket firstBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[0]); + document::Bucket secondBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[1]); + + PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(firstBucket)); + PersistenceUtil::LockResult lock2; + if (firstBucket != secondBucket) { + lock2 = _env.lockAndGetDisk(secondBucket); + } + +#ifdef ENABLE_BUCKET_OPERATION_LOGGING + { + auto desc = fmt("join(%s, %s -> %s)", + firstBucket.getBucketId().toString().c_str(), + secondBucket.getBucketId().toString().c_str(), + cmd.getBucketId().toString().c_str()); + LOG_BUCKET_OPERATION(cmd.getBucketId(), desc); + LOG_BUCKET_OPERATION(firstBucket.getBucketId(), desc); + if (firstBucket != secondBucket) { + LOG_BUCKET_OPERATION(secondBucket.getBucketId(), desc); + } +} +#endif + spi::Result result = + _spi.join(spi::Bucket(firstBucket), + spi::Bucket(secondBucket), + spi::Bucket(destBucket), + tracker->context()); + if (!tracker->checkForError(result)) { + return tracker; + } + uint64_t lastModified = 0; + for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { + document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]); + FileStorHandler::RemapInfo target(cmd.getBucket()); + _env._fileStorHandler.remapQueueAfterJoin(FileStorHandler::RemapInfo(srcBucket), target); + // Remove source from bucket db. + StorBucketDatabase::WrappedEntry entry = + _env.getBucketDatabase(srcBucket.getBucketSpace()).get(srcBucket.getBucketId(), "join-remove-source"); + if (entry.exist()) { + lastModified = std::max(lastModified, entry->info.getLastModified()); + entry.remove(); + } + } + { + StorBucketDatabase::WrappedEntry entry = + _env.getBucketDatabase(destBucket.getBucketSpace()).get(destBucket.getBucketId(), "join", + StorBucketDatabase::CREATE_IF_NONEXISTING); + if (entry->info.getLastModified() == 0) { + entry->info.setLastModified(std::max(lastModified, entry->info.getLastModified())); + } + entry.write(); + } + return tracker; +} + +MessageTracker::UP +SplitJoinHandler::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.internalJoin); + document::Bucket destBucket = cmd.getBucket(); + { + // Create empty bucket for target. + StorBucketDatabase::WrappedEntry entry = + _env.getBucketDatabase(destBucket.getBucketSpace()).get( + destBucket.getBucketId(), "join", StorBucketDatabase::CREATE_IF_NONEXISTING); + + entry.write(); + } + assert(cmd.getDiskOfInstanceToJoin() == 0u); + assert(cmd.getDiskOfInstanceToKeep() == 0u); + spi::Result result = + _spi.join(spi::Bucket(destBucket), + spi::Bucket(destBucket), + spi::Bucket(destBucket), + tracker->context()); + if (tracker->checkForError(result)) { + tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket()))); + } + return tracker; +} + +bool +SplitJoinHandler::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) +{ + if (cmd.getSourceBuckets().size() != 2) { + tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, + "Join needs exactly two buckets to be joined together" + cmd.getBucketId().toString()); + return false; + } + // Verify that source and target buckets look sane. + for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { + if (cmd.getSourceBuckets()[i] == cmd.getBucketId()) { + tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, + "Join had both source and target bucket " + cmd.getBucketId().toString()); + return false; + } + if (!cmd.getBucketId().contains(cmd.getSourceBuckets()[i])) { + tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, + "Source bucket " + cmd.getSourceBuckets()[i].toString() + + " is not contained in target " + cmd.getBucketId().toString()); + return false; + } + } + return true; +} + } diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h index 0d7f593c762..e24ca7e0ae1 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.h +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h @@ -11,6 +11,7 @@ namespace spi { struct PersistenceProvider; } struct PersistenceUtil; class BucketOwnershipNotifier; class RecheckBucketInfoCommand; +class InternalBucketJoinCommand; /** * Handle operations that uses changes bucket ownership operations @@ -23,7 +24,15 @@ public: MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTrackerUP tracker) const; private: + /** + * Sanity-checking of join command parameters. Invokes tracker.fail() with + * an appropriate error and returns false iff the command does not validate + * OK. Returns true and does not touch the tracker otherwise. + */ + static bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker); PersistenceUtil &_env; spi::PersistenceProvider &_spi; BucketOwnershipNotifier &_bucketOwnershipNotifier; |