author     Henning Baldersheim <balder@yahoo-inc.com>  2020-10-16 13:53:33 +0000
committer  Henning Baldersheim <balder@yahoo-inc.com>  2020-10-18 11:25:35 +0000
commit     f34acff8f2f40938d97b5b20ed960101d6622eff (patch)
tree       607962abf17ef1c30ad137581730fd51e69b97c7
parent     3d89f85313a0a8da5d8ad0e435b1877a46e6ecc3 (diff)
Factor out handling of operations that might change bucket ownership.
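
In outline, the ownership-sensitive operations (bucket split, set-bucket-state and
recheck-bucket-info) move out of PersistenceThread into a new, stateless SplitJoinHandler
that borrows the BucketOwnershipNotifier it needs. A minimal sketch of the new wiring,
pieced together from the diff below (surrounding setup is elided, so this is illustrative
rather than compilable on its own):

    // The owner of the threads creates one notifier and hands it to every thread
    // (FileStorManager in production, the test fixtures in the unit tests):
    ServiceLayerComponent component(componentRegister, "test");
    BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
    auto thread = std::make_unique<PersistenceThread>(executor, componentRegister, configUri,
                                                      provider, filestorHandler,
                                                      bucketOwnershipNotifier, metrics);

    // PersistenceThread keeps the handler as a plain member,
    //   _splitJoinHandler(_env, provider, bucketOwnershipNotifier),
    // and its message dispatch now delegates instead of implementing the operations itself:
    //   case api::MessageType::SPLITBUCKET_ID:
    //       return _splitJoinHandler.handleSplitBucket(
    //               static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));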
-rw-r--r--  storage/src/tests/persistence/filestorage/filestormanagertest.cpp  |  29
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.cpp             |  19
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.h               |   7
-rw-r--r--  storage/src/tests/persistence/persistencethread_splittest.cpp      |   2
-rw-r--r--  storage/src/vespa/storage/persistence/CMakeLists.txt               |   1
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.h               |   5
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp |   2
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.h   |   2
-rw-r--r--  storage/src/vespa/storage/persistence/messages.cpp                 |   2
-rw-r--r--  storage/src/vespa/storage/persistence/messages.h                   |  35
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp        | 209
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.h          |  14
-rw-r--r--  storage/src/vespa/storage/persistence/splitjoinhandler.cpp         | 218
-rw-r--r--  storage/src/vespa/storage/persistence/splitjoinhandler.h           |  32
-rw-r--r--  storage/src/vespa/storage/persistence/testandsethelper.cpp         |   2
15 files changed, 324 insertions, 255 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 257e943e5ab..4d9b23620f6 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -197,11 +197,12 @@ createThread(vdstestlib::DirConfig& config,
TestServiceLayerApp& node,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
+ BucketOwnershipNotifier & notifier,
FileStorThreadMetrics& metrics)
{
(void) config;
return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), config.getConfigId(),
- provider, filestorHandler, metrics);
+ provider, filestorHandler, notifier, metrics);
}
namespace {
@@ -716,12 +717,14 @@ TEST_F(FileStorManagerTest, priority) {
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 2);
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ ServiceLayerComponent component(_node->getComponentRegister(), "test");
+ BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
std::unique_ptr<DiskThread> thread2(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[1]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
@@ -796,9 +799,11 @@ TEST_F(FileStorManagerTest, split1) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ ServiceLayerComponent component(_node->getComponentRegister(), "test");
+ BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
@@ -936,6 +941,8 @@ TEST_F(FileStorManagerTest, split_single_group) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 1);
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ ServiceLayerComponent component(_node->getComponentRegister(), "test");
+ BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
for (uint32_t j=0; j<1; ++j) {
// Test this twice, once where all the data ends up in file with
@@ -945,7 +952,7 @@ TEST_F(FileStorManagerTest, split_single_group) {
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP> documents;
for (uint32_t i=0; i<20; ++i) {
@@ -1048,9 +1055,11 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ ServiceLayerComponent component(_node->getComponentRegister(), "test");
+ BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
document::BucketId source(16, 0x10001);
@@ -1113,9 +1122,11 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ ServiceLayerComponent component(_node->getComponentRegister(), "test");
+ BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
document::BucketId source(getFirstBucketNotOwnedByDistributor(0));
createBucket(source, 0);
@@ -1154,9 +1165,11 @@ TEST_F(FileStorManagerTest, join) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ ServiceLayerComponent component(_node->getComponentRegister(), "test");
+ BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender);
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
- filestorHandler, *metrics.disks[0]->threads[0]));
+ filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]));
// Creating documents to test with. Different gids, 2 locations.
std::vector<document::Document::SP > documents;
for (uint32_t i=0; i<20; ++i) {
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index f6ba71940a8..8d06f25319d 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -67,7 +67,13 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() {
}
}
-PersistenceTestUtils::PersistenceTestUtils() = default;
+PersistenceTestUtils::PersistenceTestUtils()
+ : _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")),
+ _replySender(),
+ _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler)
+{
+ setupExecutor(1);
+}
PersistenceTestUtils::~PersistenceTestUtils() = default;
std::string
@@ -76,12 +82,6 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid) {
}
void
-PersistenceTestUtils::setupDisks() {
- _env = std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils");
- setupExecutor(1);
-}
-
-void
PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
_sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
}
@@ -91,7 +91,7 @@ PersistenceTestUtils::createPersistenceThread()
{
return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(),
_env->_config.getConfigId(),getPersistenceProvider(),
- getEnv()._fileStorHandler, getEnv()._metrics);
+ getEnv()._fileStorHandler, _bucketOwnershipNotifier, getEnv()._metrics);
}
document::Document::SP
@@ -101,8 +101,7 @@ PersistenceTestUtils::schedulePut(
uint32_t minSize,
uint32_t maxSize)
{
- document::Document::SP doc(createRandomDocumentAtLocation(
- location, timestamp, minSize, maxSize));
+ document::Document::SP doc(createRandomDocumentAtLocation(location, timestamp, minSize, maxSize));
auto msg = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, location)), doc, timestamp);
fsHandler().schedule(msg);
return doc;
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 65a7441847e..29a18db413b 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -6,6 +6,7 @@
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/filestorage/filestorhandler.h>
#include <vespa/storage/persistence/persistenceutil.h>
+#include <vespa/storage/persistence/bucketownershipnotifier.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/persistence/spi/persistenceprovider.h>
@@ -71,10 +72,11 @@ public:
std::unique_ptr<PersistenceTestEnvironment> _env;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor;
ReplySender _replySender;
+ BucketOwnershipNotifier _bucketOwnershipNotifier;
PersistenceTestUtils();
- virtual ~PersistenceTestUtils();
+ ~PersistenceTestUtils() override;
document::Document::SP schedulePut(uint32_t location, spi::Timestamp timestamp, uint32_t minSize = 0, uint32_t maxSize = 128);
@@ -236,9 +238,6 @@ public:
class SingleDiskPersistenceTestUtils : public PersistenceTestUtils
{
public:
- void SetUp() override {
- setupDisks();
- }
};
} // storage
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index 10fb0b2e6b4..e266d367eab 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -214,7 +214,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
cmd->setMinByteSize(maxSize);
cmd->setMinDocCount(maxCount);
cmd->setSourceIndex(0);
- MessageTracker::UP result = thread->handleSplitBucket(*cmd, createTracker(cmd, docBucket));
+ MessageTracker::UP result = thread->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket));
api::ReturnCode code(result->getResult());
EXPECT_EQ(error, code);
if (!code.success()) {
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 15cd0540338..5707c9ea687 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_library(storage_spersistence OBJECT
processallhandler.cpp
provider_error_wrapper.cpp
splitbitdetector.cpp
+ splitjoinhandler.cpp
testandsethelper.cpp
types.cpp
DEPENDS
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4c525b10152..610e66c0615 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -13,8 +13,11 @@ namespace spi {
}
struct PersistenceUtil;
+/**
+ * Handle async operations that use a sequenced executor.
+ * It is stateless and thread safe.
+ */
class AsyncHandler : public Types {
-
public:
AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index fdb52635292..6c2cebfeb19 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -134,7 +134,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up the disk");
for (uint32_t j = 0; j < numThreads; j++) {
_threads.push_back(std::make_shared<PersistenceThread>(*_sequencedExecutor, _compReg, _configUri, *_provider,
- *_filestorHandler, *_metrics->disks[0]->threads[j]));
+ *_filestorHandler, *_bucketOwnershipNotifier, *_metrics->disks[0]->threads[j]));
}
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index a0ef9784c1d..85cbbe57d21 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -49,7 +49,7 @@ class FileStorManager : public StorageLinkQueued,
public MessageSender
{
ServiceLayerComponentRegister & _compReg;
- ServiceLayerComponent _component;
+ ServiceLayerComponent _component;
spi::PersistenceProvider & _providerCore;
ProviderErrorWrapper _providerErrorWrapper;
spi::PersistenceProvider * _provider;
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 61a10f71868..90597a49ad7 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -8,7 +8,7 @@ using document::BucketSpace;
namespace storage {
GetIterCommand::GetIterCommand(const document::Bucket &bucket,
- const spi::IteratorId iteratorId,
+ spi::IteratorId iteratorId,
uint32_t maxByteSize)
: api::InternalCommand(ID),
_bucket(bucket),
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index 82dc69156ab..b771af18b17 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -8,7 +8,6 @@
#include <vespa/persistence/spi/read_consistency.h>
#include <vespa/vespalib/stllike/hash_set.h>
-
namespace storage {
class GetIterCommand : public api::InternalCommand {
@@ -23,7 +22,7 @@ public:
typedef std::shared_ptr<GetIterCommand> SP;
GetIterCommand(const document::Bucket &bucket,
- const spi::IteratorId iteratorId,
+ spi::IteratorId iteratorId,
uint32_t maxByteSize);
~GetIterCommand() override;
@@ -58,8 +57,8 @@ public:
typedef std::shared_ptr<GetIterReply> SP;
static const uint32_t ID = 1002;
- GetIterReply(GetIterCommand& cmd);
- ~GetIterReply();
+ explicit GetIterReply(GetIterCommand& cmd);
+ ~GetIterReply() override;
bool hasSingleBucketId() const override { return true; }
document::Bucket getBucket() const override { return _bucket; }
@@ -95,7 +94,7 @@ public:
const spi::Selection& selection,
const std::string& fields,
spi::IncludedVersions includedVersions);
- ~CreateIteratorCommand();
+ ~CreateIteratorCommand() override;
bool hasSingleBucketId() const override { return true; }
document::Bucket getBucket() const override { return _bucket; }
const spi::Selection& getSelection() const { return _selection; }
@@ -127,7 +126,7 @@ public:
typedef std::shared_ptr<CreateIteratorReply> SP;
CreateIteratorReply(const CreateIteratorCommand& cmd, spi::IteratorId iteratorId);
- ~CreateIteratorReply();
+ ~CreateIteratorReply() override;
bool hasSingleBucketId() const override { return true; }
document::Bucket getBucket() const override { return _bucket; }
@@ -145,8 +144,8 @@ public:
typedef std::unique_ptr<DestroyIteratorCommand> UP;
typedef std::shared_ptr<DestroyIteratorCommand> SP;
- DestroyIteratorCommand(spi::IteratorId iteratorId);
- ~DestroyIteratorCommand();
+ explicit DestroyIteratorCommand(spi::IteratorId iteratorId);
+ ~DestroyIteratorCommand() override;
spi::IteratorId getIteratorId() const { return _iteratorId; }
@@ -163,8 +162,8 @@ public:
typedef std::unique_ptr<DestroyIteratorReply> UP;
typedef std::shared_ptr<DestroyIteratorReply> SP;
- DestroyIteratorReply(const DestroyIteratorCommand& cmd);
- ~DestroyIteratorReply();
+ explicit DestroyIteratorReply(const DestroyIteratorCommand& cmd);
+ ~DestroyIteratorReply() override;
void print(std::ostream& out, bool verbose, const std::string & indent) const override;
};
@@ -177,8 +176,8 @@ public:
typedef std::shared_ptr<RecheckBucketInfoCommand> SP;
typedef std::unique_ptr<RecheckBucketInfoCommand> UP;
- RecheckBucketInfoCommand(const document::Bucket &bucket);
- ~RecheckBucketInfoCommand();
+ explicit RecheckBucketInfoCommand(const document::Bucket &bucket);
+ ~RecheckBucketInfoCommand() override;
document::Bucket getBucket() const override { return _bucket; }
@@ -195,8 +194,8 @@ public:
typedef std::shared_ptr<RecheckBucketInfoReply> SP;
typedef std::unique_ptr<RecheckBucketInfoReply> UP;
- RecheckBucketInfoReply(const RecheckBucketInfoCommand& cmd);
- ~RecheckBucketInfoReply();
+ explicit RecheckBucketInfoReply(const RecheckBucketInfoCommand& cmd);
+ ~RecheckBucketInfoReply() override;
document::Bucket getBucket() const override { return _bucket; }
@@ -221,8 +220,8 @@ public:
private:
std::unique_ptr<AbortPredicate> _predicate;
public:
- AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate);
- ~AbortBucketOperationsCommand();
+ explicit AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate);
+ ~AbortBucketOperationsCommand() override;
bool shouldAbort(const document::Bucket &bucket) const {
return _predicate->shouldAbort(bucket);
@@ -240,8 +239,8 @@ public:
typedef std::shared_ptr<AbortBucketOperationsReply> SP;
typedef std::shared_ptr<const AbortBucketOperationsReply> CSP;
- AbortBucketOperationsReply(const AbortBucketOperationsCommand& cmd);
- ~AbortBucketOperationsReply();
+ explicit AbortBucketOperationsReply(const AbortBucketOperationsCommand& cmd);
+ ~AbortBucketOperationsReply() override;
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
};
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index cc238cd0146..bfa23af5858 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -10,7 +10,6 @@
#include <vespa/document/base/exceptions.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <thread>
#include <vespa/log/log.h>
@@ -25,7 +24,7 @@ namespace {
vespalib::string
createThreadName(size_t stripeId) {
- return vespalib::make_string("PersistenceThread-%zu", stripeId);
+ return fmt("PersistenceThread-%zu", stripeId);
}
}
@@ -35,6 +34,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence
const config::ConfigUri & configUri,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
+ BucketOwnershipNotifier & bucketOwnershipNotifier,
FileStorThreadMetrics& metrics)
: _stripeId(filestorHandler.getNextStripeId()),
_component(std::make_unique<ServiceLayerComponent>(compReg, createThreadName(_stripeId))),
@@ -43,9 +43,9 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor & sequence
_processAllHandler(_env, provider),
_mergeHandler(_env, _spi),
_asyncHandler(_env, _spi, sequencedExecutor),
- _bucketOwnershipNotifier()
+ _splitJoinHandler(_env, provider, bucketOwnershipNotifier),
+ _thread()
{
- _bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler);
_thread = _component->startThread(*this, 60s, 1s);
}
@@ -276,140 +276,6 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTrack
return tracker;
}
-MessageTracker::UP
-PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.splitBuckets);
- NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
-
- // Calculate the various bucket ids involved.
- if (cmd.getBucketId().getUsedBits() >= 58) {
- tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
- "Can't split anymore since maximum split bits is already reached");
- return tracker;
- }
- if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) {
- tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
- "Max lit bits must be set higher than the number of bits used in the bucket to split");
- return tracker;
- }
-
- spi::Bucket spiBucket(cmd.getBucket());
- SplitBitDetector::Result targetInfo;
- if (_env._config.enableMultibitSplitOptimalization) {
- targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(),
- tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize());
- }
- if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
- document::BucketId src(cmd.getBucketId());
- document::BucketId target1(src.getUsedBits() + 1, src.getId());
- document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits()));
- targetInfo = SplitBitDetector::Result(target1, target2, false);
- }
- if (targetInfo.failed()) {
- tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason());
- return tracker;
- }
- // If we get here, we're splitting data in two.
- // (Possibly in special case where a target will be unused)
- assert(targetInfo.success());
- document::Bucket target1(spiBucket.getBucketSpace(), targetInfo.getTarget1());
- document::Bucket target2(spiBucket.getBucketSpace(), targetInfo.getTarget2());
-
- LOG(debug, "split(%s -> %s, %s)", cmd.getBucketId().toString().c_str(),
- target1.getBucketId().toString().c_str(), target2.getBucketId().toString().c_str());
-
- PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1));
- PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2));
-
-#ifdef ENABLE_BUCKET_OPERATION_LOGGING
- {
- auto desc = fmt("split(%s -> %s, %s)",
- cmd.getBucketId().toString().c_str(),
- target1.getBucketId().toString().c_str(),
- target2.getBucketId().toString().c_str()));
- LOG_BUCKET_OPERATION(cmd.getBucketId(), desc);
- LOG_BUCKET_OPERATION(target1.getBucketId(), desc);
- if (target2.getRawId() != 0) {
- LOG_BUCKET_OPERATION(target2.getBucketId(), desc);
- }
- }
-#endif
- spi::Result result = _spi.split(spiBucket, spi::Bucket(target1),
- spi::Bucket(target2), tracker->context());
- if (result.hasError()) {
- tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage());
- return tracker;
- }
- // After split we need to take all bucket db locks to update them.
- // Ensure to take them in rising order.
- StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get(
- cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source"));
- auto reply = std::make_shared<api::SplitBucketReply>(cmd);
- api::SplitBucketReply & splitReply = *reply;
- tracker->setReply(std::move(reply));
-
- typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo;
- std::vector<TargetInfo> targets;
- for (uint32_t i = 0; i < 2; i++) {
- const document::Bucket &target(i == 0 ? target1 : target2);
- assert(target.getBucketId().getRawId() != 0);
- targets.emplace_back(_env.getBucketDatabase(target.getBucketSpace()).get(
- target.getBucketId(), "PersistenceThread::handleSplitBucket - Target",
- StorBucketDatabase::CREATE_IF_NONEXISTING),
- FileStorHandler::RemapInfo(target));
- targets.back().first->setBucketInfo(_env.getBucketInfo(target));
- }
- if (LOG_WOULD_LOG(spam)) {
- api::BucketInfo targ1(targets[0].first->getBucketInfo());
- api::BucketInfo targ2(targets[1].first->getBucketInfo());
- LOG(spam, "split(%s - %u -> %s - %u, %s - %u)",
- cmd.getBucketId().toString().c_str(),
- targ1.getMetaCount() + targ2.getMetaCount(),
- target1.getBucketId().toString().c_str(),
- targ1.getMetaCount(),
- target2.getBucketId().toString().c_str(),
- targ2.getMetaCount());
- }
- FileStorHandler::RemapInfo source(cmd.getBucket());
- _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second);
- bool ownershipChanged(!_bucketOwnershipNotifier->distributorOwns(cmd.getSourceIndex(), cmd.getBucket()));
- // Now release all the bucketdb locks.
- for (auto & target : targets) {
- if (ownershipChanged) {
- notifyGuard.notifyAlways(target.second.bucket, target.first->getBucketInfo());
- }
- // The entries vector has the source bucket in element zero, so indexing
- // that with i+1
- if (target.second.foundInQueue || target.first->getMetaCount() > 0) {
- if (target.first->getMetaCount() == 0) {
- // Fake that the bucket has content so it is not deleted.
- target.first->info.setMetaCount(1);
- // Must make sure target bucket exists when we have pending ops
- // to an empty target bucket, since the provider will have
- // implicitly erased it by this point.
- spi::Bucket createTarget(spi::Bucket(target.second.bucket));
- LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
- createTarget.toString().c_str());
- _spi.createBucket(createTarget, tracker->context());
- }
- splitReply.getSplitInfo().emplace_back(target.second.bucket.getBucketId(),
- target.first->getBucketInfo());
- target.first.write();
- } else {
- target.first.remove();
- }
- }
- if (sourceEntry.exist()) {
- if (ownershipChanged) {
- notifyGuard.notifyAlways(cmd.getBucket(), sourceEntry->getBucketInfo());
- }
- // Delete the old entry.
- sourceEntry.remove();
- }
- return tracker;
-}
-
bool
PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker)
{
@@ -512,37 +378,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracke
}
MessageTracker::UP
-PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.setBucketStates);
- NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
-
- LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
- spi::Bucket bucket(cmd.getBucket());
- bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
- spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
-
- spi::Result result(_spi.setActiveState(bucket, newState));
- if (tracker->checkForError(result)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
- if (entry.exist()) {
- entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
- notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
- entry.write();
- } else {
- LOG(warning, "Got OK setCurrentState result from provider for %s, "
- "but bucket has disappeared from service layer database",
- cmd.getBucketId().toString().c_str());
- }
-
- tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
- }
-
- return tracker;
-}
-
-MessageTracker::UP
PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker)
{
tracker->setMetric(_env._metrics.internalJoin);
@@ -569,36 +404,6 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, Mess
}
MessageTracker::UP
-PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker)
-{
- tracker->setMetric(_env._metrics.recheckBucketInfo);
- document::Bucket bucket(cmd.getBucket());
- api::BucketInfo info(_env.getBucketInfo(bucket));
- NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
- {
- // Update bucket database
- StorBucketDatabase::WrappedEntry entry(
- _component->getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "handleRecheckBucketInfo"));
-
- if (entry.exist()) {
- api::BucketInfo prevInfo(entry->getBucketInfo());
-
- if (!(prevInfo == info)) {
- notifyGuard.notifyAlways(bucket, info);
- entry->info = info;
- entry.write();
- }
- }
- // else: there is a race condition where concurrent execution of
- // DeleteBucket in the FileStorManager and this function can cause it
- // to look like the provider has a bucket we do not know about, simply
- // because this function was executed before the actual
- // DeleteBucketCommand in the persistence thread (see ticket 6143025).
- }
- return tracker;
-}
-
-MessageTracker::UP
PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker)
{
switch (msg.getType().getId()) {
@@ -619,7 +424,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra
case api::MessageType::JOINBUCKETS_ID:
return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
- return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));
+ return _splitJoinHandler.handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));
// Depends on iterators
case api::MessageType::STATBUCKET_ID:
return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker));
@@ -632,7 +437,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra
case api::MessageType::APPLYBUCKETDIFF_ID:
return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
+ return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
@@ -646,7 +451,7 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTra
case InternalBucketJoinCommand::ID:
return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker));
case RecheckBucketInfoCommand::ID:
- return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
+ return _splitJoinHandler.handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
default:
LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str());
break;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 2a3ff813c7e..6afb58dfc4e 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -8,6 +8,7 @@
#include "asynchandler.h"
#include "persistenceutil.h"
#include "provider_error_wrapper.h"
+#include "splitjoinhandler.h"
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/common/statusmessages.h>
@@ -20,9 +21,9 @@ class BucketOwnershipNotifier;
class PersistenceThread final : public DiskThread, public Types
{
public:
- PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister&,
- const config::ConfigUri & configUri, spi::PersistenceProvider& provider,
- FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics);
+ PersistenceThread(vespalib::ISequencedTaskExecutor &, ServiceLayerComponentRegister &,
+ const config::ConfigUri &, spi::PersistenceProvider &,
+ FileStorHandler &, BucketOwnershipNotifier &, FileStorThreadMetrics&);
~PersistenceThread() override;
/** Waits for current operation to be finished. */
@@ -38,12 +39,11 @@ public:
MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker);
MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker);
- MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker);
+ //TODO Rewrite tests to avoid this api leak
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
+ const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
private:
uint32_t _stripeId;
ServiceLayerComponent::UP _component;
@@ -52,8 +52,8 @@ private:
ProcessAllHandler _processAllHandler;
MergeHandler _mergeHandler;
AsyncHandler _asyncHandler;
+ SplitJoinHandler _splitJoinHandler;
framework::Thread::UP _thread;
- std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
new file mode 100644
index 00000000000..5b66ffa5929
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
@@ -0,0 +1,218 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "splitjoinhandler.h"
+#include "persistenceutil.h"
+#include "bucketownershipnotifier.h"
+#include "splitbitdetector.h"
+#include "messages.h"
+#include <vespa/persistence/spi/persistenceprovider.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".persistence.splitjoinhandler");
+
+namespace storage {
+
+SplitJoinHandler::SplitJoinHandler(PersistenceUtil & env, spi::PersistenceProvider & spi,
+ BucketOwnershipNotifier & notifier)
+ : _env(env),
+ _spi(spi),
+ _bucketOwnershipNotifier(notifier)
+{
+}
+
+MessageTracker::UP
+SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.splitBuckets);
+ NotificationGuard notifyGuard(_bucketOwnershipNotifier);
+
+ // Calculate the various bucket ids involved.
+ if (cmd.getBucketId().getUsedBits() >= 58) {
+ tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
+ "Can't split anymore since maximum split bits is already reached");
+ return tracker;
+ }
+ if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) {
+ tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS,
+ "Max lit bits must be set higher than the number of bits used in the bucket to split");
+ return tracker;
+ }
+
+ spi::Bucket spiBucket(cmd.getBucket());
+ SplitBitDetector::Result targetInfo;
+ if (_env._config.enableMultibitSplitOptimalization) {
+ targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(),
+ tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize());
+ }
+ if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
+ document::BucketId src(cmd.getBucketId());
+ document::BucketId target1(src.getUsedBits() + 1, src.getId());
+ document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits()));
+ targetInfo = SplitBitDetector::Result(target1, target2, false);
+ }
+ if (targetInfo.failed()) {
+ tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason());
+ return tracker;
+ }
+ // If we get here, we're splitting data in two.
+ // (Possibly in special case where a target will be unused)
+ assert(targetInfo.success());
+ document::Bucket target1(spiBucket.getBucketSpace(), targetInfo.getTarget1());
+ document::Bucket target2(spiBucket.getBucketSpace(), targetInfo.getTarget2());
+
+ LOG(debug, "split(%s -> %s, %s)", cmd.getBucketId().toString().c_str(),
+ target1.getBucketId().toString().c_str(), target2.getBucketId().toString().c_str());
+
+ PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1));
+ PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2));
+
+#ifdef ENABLE_BUCKET_OPERATION_LOGGING
+ {
+ auto desc = fmt("split(%s -> %s, %s)",
+ cmd.getBucketId().toString().c_str(),
+ target1.getBucketId().toString().c_str(),
+ target2.getBucketId().toString().c_str()));
+ LOG_BUCKET_OPERATION(cmd.getBucketId(), desc);
+ LOG_BUCKET_OPERATION(target1.getBucketId(), desc);
+ if (target2.getRawId() != 0) {
+ LOG_BUCKET_OPERATION(target2.getBucketId(), desc);
+ }
+}
+#endif
+ spi::Result result = _spi.split(spiBucket, spi::Bucket(target1),
+ spi::Bucket(target2), tracker->context());
+ if (result.hasError()) {
+ tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage());
+ return tracker;
+ }
+ // After split we need to take all bucket db locks to update them.
+ // Ensure to take them in rising order.
+ StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get(
+ cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source"));
+ auto reply = std::make_shared<api::SplitBucketReply>(cmd);
+ api::SplitBucketReply & splitReply = *reply;
+ tracker->setReply(std::move(reply));
+
+ typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo;
+ std::vector<TargetInfo> targets;
+ for (uint32_t i = 0; i < 2; i++) {
+ const document::Bucket &target(i == 0 ? target1 : target2);
+ assert(target.getBucketId().getRawId() != 0);
+ targets.emplace_back(_env.getBucketDatabase(target.getBucketSpace()).get(
+ target.getBucketId(), "PersistenceThread::handleSplitBucket - Target",
+ StorBucketDatabase::CREATE_IF_NONEXISTING),
+ FileStorHandler::RemapInfo(target));
+ targets.back().first->setBucketInfo(_env.getBucketInfo(target));
+ }
+ if (LOG_WOULD_LOG(spam)) {
+ api::BucketInfo targ1(targets[0].first->getBucketInfo());
+ api::BucketInfo targ2(targets[1].first->getBucketInfo());
+ LOG(spam, "split(%s - %u -> %s - %u, %s - %u)",
+ cmd.getBucketId().toString().c_str(),
+ targ1.getMetaCount() + targ2.getMetaCount(),
+ target1.getBucketId().toString().c_str(),
+ targ1.getMetaCount(),
+ target2.getBucketId().toString().c_str(),
+ targ2.getMetaCount());
+ }
+ FileStorHandler::RemapInfo source(cmd.getBucket());
+ _env._fileStorHandler.remapQueueAfterSplit(source, targets[0].second, targets[1].second);
+ bool ownershipChanged(!_bucketOwnershipNotifier.distributorOwns(cmd.getSourceIndex(), cmd.getBucket()));
+ // Now release all the bucketdb locks.
+ for (auto & target : targets) {
+ if (ownershipChanged) {
+ notifyGuard.notifyAlways(target.second.bucket, target.first->getBucketInfo());
+ }
+ // The entries vector has the source bucket in element zero, so indexing
+ // that with i+1
+ if (target.second.foundInQueue || target.first->getMetaCount() > 0) {
+ if (target.first->getMetaCount() == 0) {
+ // Fake that the bucket has content so it is not deleted.
+ target.first->info.setMetaCount(1);
+ // Must make sure target bucket exists when we have pending ops
+ // to an empty target bucket, since the provider will have
+ // implicitly erased it by this point.
+ spi::Bucket createTarget(spi::Bucket(target.second.bucket));
+ LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
+ createTarget.toString().c_str());
+ _spi.createBucket(createTarget, tracker->context());
+ }
+ splitReply.getSplitInfo().emplace_back(target.second.bucket.getBucketId(),
+ target.first->getBucketInfo());
+ target.first.write();
+ } else {
+ target.first.remove();
+ }
+ }
+ if (sourceEntry.exist()) {
+ if (ownershipChanged) {
+ notifyGuard.notifyAlways(cmd.getBucket(), sourceEntry->getBucketInfo());
+ }
+ // Delete the old entry.
+ sourceEntry.remove();
+ }
+ return tracker;
+}
+
+MessageTracker::UP
+SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.setBucketStates);
+ NotificationGuard notifyGuard(_bucketOwnershipNotifier);
+
+ LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
+ spi::Bucket bucket(cmd.getBucket());
+ bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
+ spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
+
+ spi::Result result(_spi.setActiveState(bucket, newState));
+ if (tracker->checkForError(result)) {
+ StorBucketDatabase::WrappedEntry
+ entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
+ if (entry.exist()) {
+ entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
+ notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
+ entry.write();
+ } else {
+ LOG(warning, "Got OK setCurrentState result from provider for %s, "
+ "but bucket has disappeared from service layer database",
+ cmd.getBucketId().toString().c_str());
+ }
+
+ tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
+SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.recheckBucketInfo);
+ document::Bucket bucket(cmd.getBucket());
+ api::BucketInfo info(_env.getBucketInfo(bucket));
+ NotificationGuard notifyGuard(_bucketOwnershipNotifier);
+ {
+ // Update bucket database
+ StorBucketDatabase::WrappedEntry entry(
+ _env.getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), "handleRecheckBucketInfo"));
+
+ if (entry.exist()) {
+ api::BucketInfo prevInfo(entry->getBucketInfo());
+
+ if (!(prevInfo == info)) {
+ notifyGuard.notifyAlways(bucket, info);
+ entry->info = info;
+ entry.write();
+ }
+ }
+ // else: there is a race condition where concurrent execution of
+ // DeleteBucket in the FileStorManager and this function can cause it
+ // to look like the provider has a bucket we do not know about, simply
+ // because this function was executed before the actual
+ // DeleteBucketCommand in the persistence thread (see ticket 6143025).
+ }
+ return tracker;
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h
new file mode 100644
index 00000000000..b1593931b8f
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h
@@ -0,0 +1,32 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "types.h"
+#include <vespa/storageapi/message/bucketsplitting.h>
+
+namespace storage {
+
+namespace spi { struct PersistenceProvider; }
+struct PersistenceUtil;
+class BucketOwnershipNotifier;
+class RecheckBucketInfoCommand;
+
+/**
+ * Handle operations that might change bucket ownership.
+ * It is stateless and thread safe.
+ */
+class SplitJoinHandler : public Types {
+public:
+ SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &, BucketOwnershipNotifier &);
+ MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const;
+private:
+ PersistenceUtil &_env;
+ spi::PersistenceProvider &_spi;
+ BucketOwnershipNotifier &_bucketOwnershipNotifier;
+};
+
+} // storage
+
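
For code that previously called these methods on the thread directly, the change is mechanical:
the handler is reached through the accessor PersistenceThread now exposes. A sketch mirroring the
updated persistencethread_splittest.cpp above:

    // handleSplitBucket() is const and takes/returns the MessageTracker, so the test
    // drives a split through the thread's accessor instead of a thread member function:
    MessageTracker::UP result =
        thread->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket));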
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp
index f222325053c..aad82887984 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.cpp
+++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp
@@ -35,7 +35,7 @@ void TestAndSetHelper::parseDocumentSelection(const document::DocumentTypeRepo &
}
spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context) {
- return _spi.get(_env.getBucket(_docId, _cmd.getBucket()), fieldSet,_cmd.getDocumentId(),context);
+ return _spi.get(_env.getBucket(_docId, _cmd.getBucket()), fieldSet, _cmd.getDocumentId(),context);
}
TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::PersistenceProvider & spi,