diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-16 13:53:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-18 11:25:35 +0000 |
commit | f34acff8f2f40938d97b5b20ed960101d6622eff (patch) | |
tree | 607962abf17ef1c30ad137581730fd51e69b97c7 | |
parent | 3d89f85313a0a8da5d8ad0e435b1877a46e6ecc3 (diff) |
Factor out handling of operations that might change bucket ownership.
15 files changed, 324 insertions, 255 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 257e943e5ab..4d9b23620f6 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -197,11 +197,12 @@ createThread(vdstestlib::DirConfig& config, TestServiceLayerApp& node, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, + BucketOwnershipNotifier & notifier, FileStorThreadMetrics& metrics) { (void) config; return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), config.getConfigId(), - provider, filestorHandler, metrics); + provider, filestorHandler, notifier, metrics); } namespace { @@ -716,12 +717,14 @@ TEST_F(FileStorManagerTest, priority) { metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 2); FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); + ServiceLayerComponent component(_node->getComponentRegister(), "test"); + BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); std::unique_ptr<DiskThread> thread2(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[1])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1])); // Creating documents to test with. Different gids, 2 locations. 
std::vector<document::Document::SP > documents; @@ -796,9 +799,11 @@ TEST_F(FileStorManagerTest, split1) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); + ServiceLayerComponent component(_node->getComponentRegister(), "test"); + BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -936,6 +941,8 @@ TEST_F(FileStorManagerTest, split_single_group) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 1); FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); + ServiceLayerComponent component(_node->getComponentRegister(), "test"); + BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with @@ -945,7 +952,7 @@ TEST_F(FileStorManagerTest, split_single_group) { std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); // Creating documents to test with. Different gids, 2 locations. 
std::vector<document::Document::SP> documents; for (uint32_t i=0; i<20; ++i) { @@ -1048,9 +1055,11 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); + ServiceLayerComponent component(_node->getComponentRegister(), "test"); + BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); document::BucketId source(16, 0x10001); @@ -1113,9 +1122,11 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); + ServiceLayerComponent component(_node->getComponentRegister(), "test"); + BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1154,9 +1165,11 @@ TEST_F(FileStorManagerTest, join) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); + ServiceLayerComponent component(_node->getComponentRegister(), "test"); + BucketOwnershipNotifier bucketOwnershipNotifier(component, 
messageSender); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0])); + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index f6ba71940a8..8d06f25319d 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -67,7 +67,13 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() { } } -PersistenceTestUtils::PersistenceTestUtils() = default; +PersistenceTestUtils::PersistenceTestUtils() + : _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")), + _replySender(), + _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler) +{ + setupExecutor(1); +} PersistenceTestUtils::~PersistenceTestUtils() = default; std::string @@ -76,12 +82,6 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid) { } void -PersistenceTestUtils::setupDisks() { - _env = std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils"); - setupExecutor(1); -} - -void PersistenceTestUtils::setupExecutor(uint32_t numThreads) { _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); } @@ -91,7 +91,7 @@ PersistenceTestUtils::createPersistenceThread() { return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(), _env->_config.getConfigId(),getPersistenceProvider(), - getEnv()._fileStorHandler, getEnv()._metrics); + getEnv()._fileStorHandler, _bucketOwnershipNotifier, getEnv()._metrics); } document::Document::SP @@ -101,8 +101,7 @@ 
PersistenceTestUtils::schedulePut( uint32_t minSize, uint32_t maxSize) { - document::Document::SP doc(createRandomDocumentAtLocation( - location, timestamp, minSize, maxSize)); + document::Document::SP doc(createRandomDocumentAtLocation(location, timestamp, minSize, maxSize)); auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, location)), doc, timestamp); fsHandler().schedule(msg); return doc; diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 65a7441847e..29a18db413b 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -6,6 +6,7 @@ #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/persistence/filestorage/filestorhandler.h> #include <vespa/storage/persistence/persistenceutil.h> +#include <vespa/storage/persistence/bucketownershipnotifier.h> #include <vespa/storage/common/messagesender.h> #include <vespa/storage/common/storagecomponent.h> #include <vespa/persistence/spi/persistenceprovider.h> @@ -71,10 +72,11 @@ public: std::unique_ptr<PersistenceTestEnvironment> _env; std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor; ReplySender _replySender; + BucketOwnershipNotifier _bucketOwnershipNotifier; PersistenceTestUtils(); - virtual ~PersistenceTestUtils(); + ~PersistenceTestUtils() override; document::Document::SP schedulePut(uint32_t location, spi::Timestamp timestamp, uint32_t minSize = 0, uint32_t maxSize = 128); @@ -236,9 +238,6 @@ public: class SingleDiskPersistenceTestUtils : public PersistenceTestUtils { public: - void SetUp() override { - setupDisks(); - } }; } // storage diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 10fb0b2e6b4..e266d367eab 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ 
b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -214,7 +214,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd->setMinByteSize(maxSize); cmd->setMinDocCount(maxCount); cmd->setSourceIndex(0); - MessageTracker::UP result = thread->handleSplitBucket(*cmd, createTracker(cmd, docBucket)); + MessageTracker::UP result = thread->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 15cd0540338..5707c9ea687 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -12,6 +12,7 @@ vespa_add_library(storage_spersistence OBJECT processallhandler.cpp provider_error_wrapper.cpp splitbitdetector.cpp + splitjoinhandler.cpp testandsethelper.cpp types.cpp DEPENDS diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 4c525b10152..610e66c0615 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -13,8 +13,11 @@ namespace spi { } struct PersistenceUtil; +/** + * Handle async operations that uses a sequenced executor + * It is stateless and thread safe + */ class AsyncHandler : public Types { - public: AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index fdb52635292..6c2cebfeb19 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ 
b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -134,7 +134,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC LOG(spam, "Setting up the disk"); for (uint32_t j = 0; j < numThreads; j++) { _threads.push_back(std::make_shared<PersistenceThread>(*_sequencedExecutor, _compReg, _configUri, *_provider, - *_filestorHandler, *_metrics->disks[0]->threads[j])); + *_filestorHandler, *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[j])); } } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index a0ef9784c1d..85cbbe57d21 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -49,7 +49,7 @@ class FileStorManager : public StorageLinkQueued, public MessageSender { ServiceLayerComponentRegister & _compReg; - ServiceLayerComponent _component; + ServiceLayerComponent _component; spi::PersistenceProvider & _providerCore; ProviderErrorWrapper _providerErrorWrapper; spi::PersistenceProvider * _provider; diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 61a10f71868..90597a49ad7 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -8,7 +8,7 @@ using document::BucketSpace; namespace storage { GetIterCommand::GetIterCommand(const document::Bucket &bucket, - const spi::IteratorId iteratorId, + spi::IteratorId iteratorId, uint32_t maxByteSize) : api::InternalCommand(ID), _bucket(bucket), diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index 82dc69156ab..b771af18b17 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -8,7 +8,6 @@ #include 
<vespa/persistence/spi/read_consistency.h> #include <vespa/vespalib/stllike/hash_set.h> - namespace storage { class GetIterCommand : public api::InternalCommand { @@ -23,7 +22,7 @@ public: typedef std::shared_ptr<GetIterCommand> SP; GetIterCommand(const document::Bucket &bucket, - const spi::IteratorId iteratorId, + spi::IteratorId iteratorId, uint32_t maxByteSize); ~GetIterCommand() override; @@ -58,8 +57,8 @@ public: typedef std::shared_ptr<GetIterReply> SP; static const uint32_t ID = 1002; - GetIterReply(GetIterCommand& cmd); - ~GetIterReply(); + explicit GetIterReply(GetIterCommand& cmd); + ~GetIterReply() override; bool hasSingleBucketId() const override { return true; } document::Bucket getBucket() const override { return _bucket; } @@ -95,7 +94,7 @@ public: const spi::Selection& selection, const std::string& fields, spi::IncludedVersions includedVersions); - ~CreateIteratorCommand(); + ~CreateIteratorCommand() override; bool hasSingleBucketId() const override { return true; } document::Bucket getBucket() const override { return _bucket; } const spi::Selection& getSelection() const { return _selection; } @@ -127,7 +126,7 @@ public: typedef std::shared_ptr<CreateIteratorReply> SP; CreateIteratorReply(const CreateIteratorCommand& cmd, spi::IteratorId iteratorId); - ~CreateIteratorReply(); + ~CreateIteratorReply() override; bool hasSingleBucketId() const override { return true; } document::Bucket getBucket() const override { return _bucket; } @@ -145,8 +144,8 @@ public: typedef std::unique_ptr<DestroyIteratorCommand> UP; typedef std::shared_ptr<DestroyIteratorCommand> SP; - DestroyIteratorCommand(spi::IteratorId iteratorId); - ~DestroyIteratorCommand(); + explicit DestroyIteratorCommand(spi::IteratorId iteratorId); + ~DestroyIteratorCommand() override; spi::IteratorId getIteratorId() const { return _iteratorId; } @@ -163,8 +162,8 @@ public: typedef std::unique_ptr<DestroyIteratorReply> UP; typedef std::shared_ptr<DestroyIteratorReply> SP; - 
DestroyIteratorReply(const DestroyIteratorCommand& cmd); - ~DestroyIteratorReply(); + explicit DestroyIteratorReply(const DestroyIteratorCommand& cmd); + ~DestroyIteratorReply() override; void print(std::ostream& out, bool verbose, const std::string & indent) const override; }; @@ -177,8 +176,8 @@ public: typedef std::shared_ptr<RecheckBucketInfoCommand> SP; typedef std::unique_ptr<RecheckBucketInfoCommand> UP; - RecheckBucketInfoCommand(const document::Bucket &bucket); - ~RecheckBucketInfoCommand(); + explicit RecheckBucketInfoCommand(const document::Bucket &bucket); + ~RecheckBucketInfoCommand() override; document::Bucket getBucket() const override { return _bucket; } @@ -195,8 +194,8 @@ public: typedef std::shared_ptr<RecheckBucketInfoReply> SP; typedef std::unique_ptr<RecheckBucketInfoReply> UP; - RecheckBucketInfoReply(const RecheckBucketInfoCommand& cmd); - ~RecheckBucketInfoReply(); + explicit RecheckBucketInfoReply(const RecheckBucketInfoCommand& cmd); + ~RecheckBucketInfoReply() override; document::Bucket getBucket() const override { return _bucket; } @@ -221,8 +220,8 @@ public: private: std::unique_ptr<AbortPredicate> _predicate; public: - AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate); - ~AbortBucketOperationsCommand(); + explicit AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate); + ~AbortBucketOperationsCommand() override; bool shouldAbort(const document::Bucket &bucket) const { return _predicate->shouldAbort(bucket); @@ -240,8 +239,8 @@ public: typedef std::shared_ptr<AbortBucketOperationsReply> SP; typedef std::shared_ptr<const AbortBucketOperationsReply> CSP; - AbortBucketOperationsReply(const AbortBucketOperationsCommand& cmd); - ~AbortBucketOperationsReply(); + explicit AbortBucketOperationsReply(const AbortBucketOperationsCommand& cmd); + ~AbortBucketOperationsReply() override; void print(std::ostream& out, bool verbose, const std::string& indent) const override; }; diff --git 
a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index cc238cd0146..bfa23af5858 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -10,7 +10,6 @@ #include <vespa/document/base/exceptions.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <thread> #include <vespa/log/log.h> @@ -25,7 +24,7 @@ namespace { vespalib::string createThreadName(size_t stripeId) { - return vespalib::make_string("PersistenceThread-%zu", stripeId); + return fmt("PersistenceThread-%zu", stripeId); } } @@ -35,6 +34,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, + BucketOwnershipNotifier & bucketOwnershipNotifier, FileStorThreadMetrics& metrics) : _stripeId(filestorHandler.getNextStripeId()), _component(std::make_unique<ServiceLayerComponent>(compReg, createThreadName(_stripeId))), @@ -43,9 +43,9 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence _processAllHandler(_env, provider), _mergeHandler(_env, _spi), _asyncHandler(_env, _spi, sequencedExecutor), - _bucketOwnershipNotifier() + _splitJoinHandler(_env, provider, bucketOwnershipNotifier), + _thread() { - _bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler); _thread = _component->startThread(*this, 60s, 1s); } @@ -276,140 +276,6 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTrack return tracker; } -MessageTracker::UP -PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.splitBuckets); - NotificationGuard notifyGuard(*_bucketOwnershipNotifier); 
- - // Calculate the various bucket ids involved. - if (cmd.getBucketId().getUsedBits() >= 58) { - tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, - "Can't split anymore since maximum split bits is already reached"); - return tracker; - } - if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) { - tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, - "Max lit bits must be set higher than the number of bits used in the bucket to split"); - return tracker; - } - - spi::Bucket spiBucket(cmd.getBucket()); - SplitBitDetector::Result targetInfo; - if (_env._config.enableMultibitSplitOptimalization) { - targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(), - tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize()); - } - if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { - document::BucketId src(cmd.getBucketId()); - document::BucketId target1(src.getUsedBits() + 1, src.getId()); - document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits())); - targetInfo = SplitBitDetector::Result(target1, target2, false); - } - if (targetInfo.failed()) { - tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason()); - return tracker; - } - // If we get here, we're splitting data in two. 
- // (Possibly in special case where a target will be unused) - assert(targetInfo.success()); - document::Bucket target1(spiBucket.getBucketSpace(), targetInfo.getTarget1()); - document::Bucket target2(spiBucket.getBucketSpace(), targetInfo.getTarget2()); - - LOG(debug, "split(%s -> %s, %s)", cmd.getBucketId().toString().c_str(), - target1.getBucketId().toString().c_str(), target2.getBucketId().toString().c_str()); - - PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1)); - PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2)); - -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - { - auto desc = fmt("split(%s -> %s, %s)", - cmd.getBucketId().toString().c_str(), - target1.getBucketId().toString().c_str(), - target2.getBucketId().toString().c_str())); - LOG_BUCKET_OPERATION(cmd.getBucketId(), desc); - LOG_BUCKET_OPERATION(target1.getBucketId(), desc); - if (target2.getRawId() != 0) { - LOG_BUCKET_OPERATION(target2.getBucketId(), desc); - } - } -#endif - spi::Result result = _spi.split(spiBucket, spi::Bucket(target1), - spi::Bucket(target2), tracker->context()); - if (result.hasError()) { - tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage()); - return tracker; - } - // After split we need to take all bucket db locks to update them. - // Ensure to take them in rising order. - StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get( - cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source")); - auto reply = std::make_shared<api::SplitBucketReply>(cmd); - api::SplitBucketReply & splitReply = *reply; - tracker->setReply(std::move(reply)); - - typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo; - std::vector<TargetInfo> targets; - for (uint32_t i = 0; i < 2; i++) { - const document::Bucket &target(i == 0 ? 
target1 : target2); - assert(target.getBucketId().getRawId() != 0); - targets.emplace_back(_env.getBucketDatabase(target.getBucketSpace()).get( - target.getBucketId(), "PersistenceThread::handleSplitBucket - Target", - StorBucketDatabase::CREATE_IF_NONEXISTING), - FileStorHandler::RemapInfo(target)); - targets.back().first->setBucketInfo(_env.getBucketInfo(target)); - } - if (LOG_WOULD_LOG(spam)) { - api::BucketInfo targ1(targets[0].first->getBucketInfo()); - api::BucketInfo targ2(targets[1].first->getBucketInfo()); - LOG(spam, "split(%s - %u -> %s - %u, %s - %u)", - cmd.getBucketId().toString().c_str(), - targ1.getMetaCount() + targ2.getMetaCount(), - target1.getBucketId().toString().c_str(), - targ1.getMetaCount(), - target2.getBucketId().toString().c_str(), - targ2.getMetaCount()); - } - FileStorHandler::RemapInfo source(cmd.getBucket()); - _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second); - bool ownershipChanged(!_bucketOwnershipNotifier->distributorOwns(cmd.getSourceIndex(), cmd.getBucket())); - // Now release all the bucketdb locks. - for (auto & target : targets) { - if (ownershipChanged) { - notifyGuard.notifyAlways(target.second.bucket, target.first->getBucketInfo()); - } - // The entries vector has the source bucket in element zero, so indexing - // that with i+1 - if (target.second.foundInQueue || target.first->getMetaCount() > 0) { - if (target.first->getMetaCount() == 0) { - // Fake that the bucket has content so it is not deleted. - target.first->info.setMetaCount(1); - // Must make sure target bucket exists when we have pending ops - // to an empty target bucket, since the provider will have - // implicitly erased it by this point. 
- spi::Bucket createTarget(spi::Bucket(target.second.bucket)); - LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", - createTarget.toString().c_str()); - _spi.createBucket(createTarget, tracker->context()); - } - splitReply.getSplitInfo().emplace_back(target.second.bucket.getBucketId(), - target.first->getBucketInfo()); - target.first.write(); - } else { - target.first.remove(); - } - } - if (sourceEntry.exist()) { - if (ownershipChanged) { - notifyGuard.notifyAlways(cmd.getBucket(), sourceEntry->getBucketInfo()); - } - // Delete the old entry. - sourceEntry.remove(); - } - return tracker; -} - bool PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) { @@ -512,37 +378,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke } MessageTracker::UP -PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.setBucketStates); - NotificationGuard notifyGuard(*_bucketOwnershipNotifier); - - LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - spi::Bucket bucket(cmd.getBucket()); - bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); - spi::BucketInfo::ActiveState newState(shouldBeActive ? 
spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); - - spi::Result result(_spi.setActiveState(bucket, newState)); - if (tracker->checkForError(result)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); - if (entry.exist()) { - entry->info.setActive(newState == spi::BucketInfo::ACTIVE); - notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); - entry.write(); - } else { - LOG(warning, "Got OK setCurrentState result from provider for %s, " - "but bucket has disappeared from service layer database", - cmd.getBucketId().toString().c_str()); - } - - tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); - } - - return tracker; -} - -MessageTracker::UP PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker) { tracker->setMetric(_env._metrics.internalJoin); @@ -569,36 +404,6 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, Mess } MessageTracker::UP -PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) -{ - tracker->setMetric(_env._metrics.recheckBucketInfo); - document::Bucket bucket(cmd.getBucket()); - api::BucketInfo info(_env.getBucketInfo(bucket)); - NotificationGuard notifyGuard(*_bucketOwnershipNotifier); - { - // Update bucket database - StorBucketDatabase::WrappedEntry entry( - _component->getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "handleRecheckBucketInfo")); - - if (entry.exist()) { - api::BucketInfo prevInfo(entry->getBucketInfo()); - - if (!(prevInfo == info)) { - notifyGuard.notifyAlways(bucket, info); - entry->info = info; - entry.write(); - } - } - // else: there is a race condition where concurrent execution of - // DeleteBucket in the FileStorManager and this function can cause it - // to look like the provider has a bucket we do not know about, 
simply - // because this function was executed before the actual - // DeleteBucketCommand in the persistence thread (see ticket 6143025). - } - return tracker; -} - -MessageTracker::UP PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) { switch (msg.getType().getId()) { @@ -619,7 +424,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case api::MessageType::JOINBUCKETS_ID: return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: - return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); + return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); // Depends on iterators case api::MessageType::STATBUCKET_ID: return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker)); @@ -632,7 +437,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case api::MessageType::APPLYBUCKETDIFF_ID: return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); + return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: @@ -646,7 +451,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra case InternalBucketJoinCommand::ID: return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: - return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); + return 
_splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); default: LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str()); break; diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 2a3ff813c7e..6afb58dfc4e 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -8,6 +8,7 @@ #include "asynchandler.h" #include "persistenceutil.h" #include "provider_error_wrapper.h" +#include "splitjoinhandler.h" #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/storagecomponent.h> #include <vespa/storage/common/statusmessages.h> @@ -20,9 +21,9 @@ class BucketOwnershipNotifier; class PersistenceThread final : public DiskThread, public Types { public: - PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister&, - const config::ConfigUri & configUri, spi::PersistenceProvider& provider, - FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics); + PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister &, + const config::ConfigUri &, spi::PersistenceProvider &, + FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&); ~PersistenceThread() override; /** Waits for current operation to be finished. 
*/ @@ -38,12 +39,11 @@ public: MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker); MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker); MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker); MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker); - MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker); + //TODO Rewrite tests to avoid this api leak const AsyncHandler & asyncHandler() const { return _asyncHandler; } + const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } private: uint32_t _stripeId; ServiceLayerComponent::UP _component; @@ -52,8 +52,8 @@ private: ProcessAllHandler _processAllHandler; MergeHandler _mergeHandler; AsyncHandler _asyncHandler; + SplitJoinHandler _splitJoinHandler; framework::Thread::UP _thread; - std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp new file mode 100644 index 00000000000..5b66ffa5929 --- /dev/null +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -0,0 +1,218 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "splitjoinhandler.h" +#include "persistenceutil.h" +#include "bucketownershipnotifier.h" +#include "splitbitdetector.h" +#include "messages.h" +#include <vespa/persistence/spi/persistenceprovider.h> + +#include <vespa/log/log.h> +LOG_SETUP(".persistence.splitjoinhandler"); + +namespace storage { + +SplitJoinHandler::SplitJoinHandler(PersistenceUtil & env, spi::PersistenceProvider & spi, + BucketOwnershipNotifier & notifier) + : _env(env), + _spi(spi), + _bucketOwnershipNotifier(notifier) +{ +} + +MessageTracker::UP +SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.splitBuckets); + NotificationGuard notifyGuard(_bucketOwnershipNotifier); + + // Calculate the various bucket ids involved. + if (cmd.getBucketId().getUsedBits() >= 58) { + tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, + "Can't split anymore since maximum split bits is already reached"); + return tracker; + } + if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) { + tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, + "Max lit bits must be set higher than the number of bits used in the bucket to split"); + return tracker; + } + + spi::Bucket spiBucket(cmd.getBucket()); + SplitBitDetector::Result targetInfo; + if (_env._config.enableMultibitSplitOptimalization) { + targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(), + tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize()); + } + if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { + document::BucketId src(cmd.getBucketId()); + document::BucketId target1(src.getUsedBits() + 1, src.getId()); + document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits())); + targetInfo = SplitBitDetector::Result(target1, target2, false); + } + if (targetInfo.failed()) { + tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason()); + return tracker; 
+ } + // If we get here, we're splitting data in two. + // (Possibly in special case where a target will be unused) + assert(targetInfo.success()); + document::Bucket target1(spiBucket.getBucketSpace(), targetInfo.getTarget1()); + document::Bucket target2(spiBucket.getBucketSpace(), targetInfo.getTarget2()); + + LOG(debug, "split(%s -> %s, %s)", cmd.getBucketId().toString().c_str(), + target1.getBucketId().toString().c_str(), target2.getBucketId().toString().c_str()); + + PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1)); + PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2)); + +#ifdef ENABLE_BUCKET_OPERATION_LOGGING + { + auto desc = fmt("split(%s -> %s, %s)", + cmd.getBucketId().toString().c_str(), + target1.getBucketId().toString().c_str(), + target2.getBucketId().toString().c_str())); + LOG_BUCKET_OPERATION(cmd.getBucketId(), desc); + LOG_BUCKET_OPERATION(target1.getBucketId(), desc); + if (target2.getRawId() != 0) { + LOG_BUCKET_OPERATION(target2.getBucketId(), desc); + } +} +#endif + spi::Result result = _spi.split(spiBucket, spi::Bucket(target1), + spi::Bucket(target2), tracker->context()); + if (result.hasError()) { + tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage()); + return tracker; + } + // After split we need to take all bucket db locks to update them. + // Ensure to take them in rising order. + StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get( + cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source")); + auto reply = std::make_shared<api::SplitBucketReply>(cmd); + api::SplitBucketReply & splitReply = *reply; + tracker->setReply(std::move(reply)); + + typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo; + std::vector<TargetInfo> targets; + for (uint32_t i = 0; i < 2; i++) { + const document::Bucket &target(i == 0 ? 
target1 : target2); + assert(target.getBucketId().getRawId() != 0); + targets.emplace_back(_env.getBucketDatabase(target.getBucketSpace()).get( + target.getBucketId(), "PersistenceThread::handleSplitBucket - Target", + StorBucketDatabase::CREATE_IF_NONEXISTING), + FileStorHandler::RemapInfo(target)); + targets.back().first->setBucketInfo(_env.getBucketInfo(target)); + } + if (LOG_WOULD_LOG(spam)) { + api::BucketInfo targ1(targets[0].first->getBucketInfo()); + api::BucketInfo targ2(targets[1].first->getBucketInfo()); + LOG(spam, "split(%s - %u -> %s - %u, %s - %u)", + cmd.getBucketId().toString().c_str(), + targ1.getMetaCount() + targ2.getMetaCount(), + target1.getBucketId().toString().c_str(), + targ1.getMetaCount(), + target2.getBucketId().toString().c_str(), + targ2.getMetaCount()); + } + FileStorHandler::RemapInfo source(cmd.getBucket()); + _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second); + bool ownershipChanged(!_bucketOwnershipNotifier.distributorOwns(cmd.getSourceIndex(), cmd.getBucket())); + // Now release all the bucketdb locks. + for (auto & target : targets) { + if (ownershipChanged) { + notifyGuard.notifyAlways(target.second.bucket, target.first->getBucketInfo()); + } + // The entries vector has the source bucket in element zero, so indexing + // that with i+1 + if (target.second.foundInQueue || target.first->getMetaCount() > 0) { + if (target.first->getMetaCount() == 0) { + // Fake that the bucket has content so it is not deleted. + target.first->info.setMetaCount(1); + // Must make sure target bucket exists when we have pending ops + // to an empty target bucket, since the provider will have + // implicitly erased it by this point. 
+ spi::Bucket createTarget(spi::Bucket(target.second.bucket)); + LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", + createTarget.toString().c_str()); + _spi.createBucket(createTarget, tracker->context()); + } + splitReply.getSplitInfo().emplace_back(target.second.bucket.getBucketId(), + target.first->getBucketInfo()); + target.first.write(); + } else { + target.first.remove(); + } + } + if (sourceEntry.exist()) { + if (ownershipChanged) { + notifyGuard.notifyAlways(cmd.getBucket(), sourceEntry->getBucketInfo()); + } + // Delete the old entry. + sourceEntry.remove(); + } + return tracker; +} + +MessageTracker::UP +SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.setBucketStates); + NotificationGuard notifyGuard(_bucketOwnershipNotifier); + + LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); + spi::Bucket bucket(cmd.getBucket()); + bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); + spi::BucketInfo::ActiveState newState(shouldBeActive ? 
spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); + + spi::Result result(_spi.setActiveState(bucket, newState)); + if (tracker->checkForError(result)) { + StorBucketDatabase::WrappedEntry + entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); + if (entry.exist()) { + entry->info.setActive(newState == spi::BucketInfo::ACTIVE); + notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); + entry.write(); + } else { + LOG(warning, "Got OK setCurrentState result from provider for %s, " + "but bucket has disappeared from service layer database", + cmd.getBucketId().toString().c_str()); + } + + tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); + } + + return tracker; +} + +MessageTracker::UP +SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.recheckBucketInfo); + document::Bucket bucket(cmd.getBucket()); + api::BucketInfo info(_env.getBucketInfo(bucket)); + NotificationGuard notifyGuard(_bucketOwnershipNotifier); + { + // Update bucket database + StorBucketDatabase::WrappedEntry entry( + _env.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "handleRecheckBucketInfo")); + + if (entry.exist()) { + api::BucketInfo prevInfo(entry->getBucketInfo()); + + if (!(prevInfo == info)) { + notifyGuard.notifyAlways(bucket, info); + entry->info = info; + entry.write(); + } + } + // else: there is a race condition where concurrent execution of + // DeleteBucket in the FileStorManager and this function can cause it + // to look like the provider has a bucket we do not know about, simply + // because this function was executed before the actual + // DeleteBucketCommand in the persistence thread (see ticket 6143025). 
+ } + return tracker; +} + +} diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h new file mode 100644 index 00000000000..b1593931b8f --- /dev/null +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h @@ -0,0 +1,32 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "types.h" +#include <vespa/storageapi/message/bucketsplitting.h> + +namespace storage { + +namespace spi { struct PersistenceProvider; } +struct PersistenceUtil; +class BucketOwnershipNotifier; +class RecheckBucketInfoCommand; + +/** + * Handles operations that might change bucket ownership. + * It is stateless and thread safe + */ +class SplitJoinHandler : public Types { +public: + SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &, BucketOwnershipNotifier &); + MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const; +private: + PersistenceUtil &_env; + spi::PersistenceProvider &_spi; + BucketOwnershipNotifier &_bucketOwnershipNotifier; +}; + +} // storage + diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp index f222325053c..aad82887984 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.cpp +++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp @@ -35,7 +35,7 @@ void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo & } spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context) { - return _spi.get(_env.getBucket(_docId, _cmd.getBucket()), 
fieldSet,_cmd.getDocumentId(),context); + return _spi.get(_env.getBucket(_docId, _cmd.getBucket()), fieldSet, _cmd.getDocumentId(),context); } TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & spi, |