author     Tor Brede Vekterli <vekterli@yahoo-inc.com>  2017-10-25 16:22:04 +0200
committer  GitHub <noreply@github.com>                  2017-10-25 16:22:04 +0200
commit     9606e88b7ca082f36eb38b0e197a0513f76ef6eb (patch)
tree       dccf8b7a99a0dcf714eb05771e5e7119f68e8377
parent     d5ff1d39ec4e1aeff417fb20d9b997390b91887b (diff)
parent     b01db859c00ad78a14e0e0da78c50ca94b6396c9 (diff)
Merge pull request #3878 from vespa-engine/toregge/use-document-bucket-to-track-merge-state
Toregge/use document bucket to track merge state
-rw-r--r--  storage/src/tests/persistence/filestorage/operationabortingtest.cpp           28
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp                            42
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.cpp                        14
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.h                           2
-rw-r--r--  storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp          4
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp         10
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h           10
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp     18
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h       11
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp         44
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.h            6
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp                        51
-rw-r--r--  storage/src/vespa/storage/persistence/messages.cpp                            11
-rw-r--r--  storage/src/vespa/storage/persistence/messages.h                              32
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp                    6
-rw-r--r--  storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp      8
16 files changed, 141 insertions, 156 deletions
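
Note for readers skimming the diffs below: the recurring change in this merge is that merge-state bookkeeping (addMergeStatus / editMergeStatus / isMerging / clearMergeStatus and the _mergeStates map) and the abort predicate are now keyed on document::Bucket, which carries a bucket space together with the bucket id, instead of a bare document::BucketId. The following is a minimal, self-contained sketch only, not Vespa code: the Bucket/BucketId stand-ins and the MergeRegistry name are illustrative, and the real classes have richer interfaces and locking.

// Illustrative sketch only -- simplified stand-ins for document::Bucket /
// document::BucketId, showing a merge registry keyed on the full bucket.
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <tuple>

struct BucketId {
    uint64_t raw = 0;
};

// A bucket pairs a bucket space with a bucket id; ordering over both fields
// lets it serve directly as a std::map key, mirroring the
// std::map<document::Bucket, MergeStatus::SP> change in filestorhandlerimpl.h.
struct Bucket {
    uint64_t space = 0;
    BucketId id;
    bool operator<(const Bucket& other) const {
        return std::tie(space, id.raw) < std::tie(other.space, other.id.raw);
    }
};

struct MergeStatus {
    std::string description;
};

// Hypothetical registry exercising the add/is/clear trio from FileStorHandler.
class MergeRegistry {
    std::map<Bucket, std::shared_ptr<MergeStatus>> _mergeStates;
public:
    void addMergeStatus(const Bucket& bucket, std::shared_ptr<MergeStatus> status) {
        _mergeStates[bucket] = std::move(status);
    }
    bool isMerging(const Bucket& bucket) const {
        return _mergeStates.find(bucket) != _mergeStates.end();
    }
    void clearMergeStatus(const Bucket& bucket) {
        _mergeStates.erase(bucket);
    }
};

int main() {
    MergeRegistry registry;
    Bucket bucket{/*space=*/1, BucketId{0x4000000000001234ULL}};
    registry.addMergeStatus(bucket, std::make_shared<MergeStatus>(MergeStatus{"demo"}));
    std::cout << std::boolalpha << registry.isMerging(bucket) << '\n'; // true
    registry.clearMergeStatus(bucket);
    std::cout << registry.isMerging(bucket) << '\n';                   // false
}

The same shift shows up in the abort path: AbortPredicate::shouldAbort now receives a document::Bucket, so callers that only have a BucketId wrap it (see the makeDocumentBucket calls in the test diffs).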
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index aab0aa2c0fb..d4b749772a2 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -193,13 +193,37 @@ OperationAbortingTest::validateReplies(
namespace {
+class ExplicitBucketSetPredicate : public AbortBucketOperationsCommand::AbortPredicate {
+ using BucketSet = vespalib::hash_set<document::BucketId, document::BucketId::hash>;
+ BucketSet _bucketsToAbort;
+
+ bool doShouldAbort(const document::Bucket &bucket) const override;
+public:
+ ~ExplicitBucketSetPredicate();
+
+ template <typename Iterator>
+ ExplicitBucketSetPredicate(Iterator first, Iterator last)
+ : _bucketsToAbort(first, last)
+ { }
+
+ const BucketSet& getBucketsToAbort() const {
+ return _bucketsToAbort;
+ }
+};
+
+bool
+ExplicitBucketSetPredicate::doShouldAbort(const document::Bucket &bucket) const {
+ return _bucketsToAbort.find(bucket.getBucketId()) != _bucketsToAbort.end();
+}
+
+ExplicitBucketSetPredicate::~ExplicitBucketSetPredicate() { }
+
template <typename Container>
AbortBucketOperationsCommand::SP
makeAbortCmd(const Container& buckets)
{
std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> pred(
- new AbortBucketOperationsCommand::ExplicitBucketSetPredicate(
- buckets.begin(), buckets.end()));
+ new ExplicitBucketSetPredicate(buckets.begin(), buckets.end()));
AbortBucketOperationsCommand::SP cmd(
new AbortBucketOperationsCommand(std::move(pred)));
return cmd;
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 120dbd4ce45..47ed60b62ad 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -20,7 +20,7 @@ namespace storage {
struct MergeHandlerTest : public SingleDiskPersistenceTestUtils
{
uint32_t _location; // Location used for all merge tests
- document::BucketId _bucket; // Bucket used for all merge tests
+ document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
std::vector<api::MergeBucketCommand::Node> _nodes;
std::unique_ptr<spi::Context> _context;
@@ -235,13 +235,13 @@ MergeHandlerTest::setUp() {
SingleDiskPersistenceTestUtils::setUp();
_location = 1234;
- _bucket = document::BucketId(16, _location);
+ _bucket = makeDocumentBucket(document::BucketId(16, _location));
_maxTimestamp = 11501;
LOG(info, "Creating %s in bucket database", _bucket.toString().c_str());
bucketdb::StorageBucketInfo bucketDBEntry;
bucketDBEntry.disk = 0;
- getEnv().getBucketDatabase().insert(_bucket, bucketDBEntry, "mergetestsetup");
+ getEnv().getBucketDatabase().insert(_bucket.getBucketId(), bucketDBEntry, "mergetestsetup");
LOG(info, "Creating bucket to merge");
createTestBucket(_bucket);
@@ -267,7 +267,7 @@ MergeHandlerTest::testMergeBucketCommand()
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
cmd.setSourceIndex(1234);
MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
@@ -294,7 +294,7 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Verifying that get bucket diff is sent on");
- api::GetBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context);
api::StorageMessage::SP replySent = tracker1->getReply();
@@ -338,7 +338,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Verifying that apply bucket diff is sent on");
- api::ApplyBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context);
api::StorageMessage::SP replySent = tracker1->getReply();
@@ -382,7 +382,7 @@ MergeHandlerTest::testMasterMessageFlow()
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
LOG(info, "Check state");
@@ -644,7 +644,7 @@ MergeHandlerTest::testChunkedApplyBucketDiff()
MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
LOG(info, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
@@ -732,7 +732,7 @@ MergeHandlerTest::testChunkLimitPartiallyFilledDiff()
setUpChain(MIDDLE);
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, maxChunkSize));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, maxChunkSize));
applyBucketDiffCmd->getDiff() = applyDiff;
MergeHandler handler(
@@ -753,7 +753,7 @@ MergeHandlerTest::testMaxTimestamp()
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getCmd(
@@ -820,7 +820,7 @@ MergeHandlerTest::createDummyApplyDiff(int timestampOffset,
}
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024));
applyBucketDiffCmd->getDiff() = applyDiff;
return applyBucketDiffCmd;
}
@@ -856,7 +856,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
}
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
- new api::GetBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::GetBucketDiffCommand(_bucket, _nodes, 1024*1024));
getBucketDiffCmd->getDiff() = diff;
return getBucketDiffCmd;
}
@@ -907,7 +907,7 @@ void
MergeHandlerTest::testMergeProgressSafeGuard()
{
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
@@ -943,7 +943,7 @@ MergeHandlerTest::testSafeGuardNotInvokedWhenHasMaskChanges()
_nodes.push_back(api::MergeBucketCommand::Node(0, false));
_nodes.push_back(api::MergeBucketCommand::Node(1, false));
_nodes.push_back(api::MergeBucketCommand::Node(2, false));
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
@@ -991,7 +991,7 @@ MergeHandlerTest::testEntryRemovedAfterGetBucketDiff()
}
setUpChain(BACK);
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024));
applyBucketDiffCmd->getDiff() = applyDiff;
MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
@@ -1086,7 +1086,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
MergeHandler& handler,
spi::Context& context)
{
- api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(cmd, context);
}
@@ -1126,7 +1126,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
MergeHandler& handler,
spi::Context& context)
{
- api::GetBucketDiffCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleGetBucketDiff(cmd, context);
}
@@ -1215,7 +1215,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke(
MergeHandler& handler,
spi::Context& context)
{
- api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(cmd, context);
_diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
}
@@ -1287,7 +1287,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
++_counter;
_stub.clear();
if (getChainPos() == FRONT) {
- api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(cmd, context);
std::shared_ptr<api::GetBucketDiffCommand> diffCmd(
test.fetchSingleMessage<api::GetBucketDiffCommand>());
@@ -1470,7 +1470,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp()
}
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024));
applyBucketDiffCmd->getDiff() = applyDiff;
MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
@@ -1480,7 +1480,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp()
tracker->getReply()));
CPPUNIT_ASSERT(applyBucketDiffReply.get());
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index c17c4837bfa..fb6bc298b08 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -373,10 +373,10 @@ PersistenceTestUtils::createRandomDocumentAtLocation(
}
void
-PersistenceTestUtils::createTestBucket(const document::BucketId& bucket,
+PersistenceTestUtils::createTestBucket(const document::Bucket& bucket,
uint16_t disk)
{
-
+ document::BucketId bucketId(bucket.getBucketId());
uint32_t opsPerType = 2;
uint32_t numberOfLocations = 2;
uint32_t minDocSize = 0;
@@ -388,27 +388,27 @@ PersistenceTestUtils::createTestBucket(const document::BucketId& bucket,
uint32_t seed = useHeaderOnly * 10000 + optype * 1000 + i + 1;
uint64_t location = (seed % numberOfLocations);
location <<= 32;
- location += (bucket.getRawId() & 0xffffffff);
+ location += (bucketId.getRawId() & 0xffffffff);
document::Document::SP doc(
createRandomDocumentAtLocation(
location, seed, minDocSize, maxDocSize));
if (headerOnly) {
clearBody(*doc);
}
- doPut(doc, spi::Timestamp(seed), disk, bucket.getUsedBits());
+ doPut(doc, spi::Timestamp(seed), disk, bucketId.getUsedBits());
if (optype == 0) { // Regular put
} else if (optype == 1) { // Overwritten later in time
document::Document::SP doc2(new document::Document(*doc));
doc2->setValue(doc2->getField("content"),
document::StringFieldValue("overwritten"));
doPut(doc2, spi::Timestamp(seed + 500),
- disk, bucket.getUsedBits());
+ disk, bucketId.getUsedBits());
} else if (optype == 2) { // Removed
doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false,
- bucket.getUsedBits());
+ bucketId.getUsedBits());
} else if (optype == 3) { // Unrevertable removed
doRemove(doc->getId(), spi::Timestamp(seed), disk, true,
- bucket.getUsedBits());
+ bucketId.getUsedBits());
}
}
}
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 1b53b401a62..573ff28a80c 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -188,7 +188,7 @@ public:
*
* @disk If set, use this disk, otherwise lookup in bucket db.
*/
- void createTestBucket(const document::BucketId&, uint16_t disk = 0xffff);
+ void createTestBucket(const document::Bucket&, uint16_t disk = 0xffff);
/**
* Create a new persistence thread.
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 732aff5faf6..18e93d00494 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -197,7 +197,7 @@ bool
hasAbortedAllOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
{
for (auto& b : v) {
- if (!cmd->shouldAbort(b)) {
+ if (!cmd->shouldAbort(makeDocumentBucket(b))) {
return false;
}
}
@@ -209,7 +209,7 @@ bool
hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
{
for (auto& b : v) {
- if (cmd->shouldAbort(b)) {
+ if (cmd->shouldAbort(makeDocumentBucket(b))) {
return false;
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 7286b6e3586..e23bfda192c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -149,20 +149,20 @@ FileStorHandler::getQueueSize(uint16_t disk) const
}
void
-FileStorHandler::addMergeStatus(const document::BucketId& bucket,
+FileStorHandler::addMergeStatus(const document::Bucket& bucket,
MergeStatus::SP ms)
{
return _impl->addMergeStatus(bucket, ms);
}
MergeStatus&
-FileStorHandler::editMergeStatus(const document::BucketId& bucket)
+FileStorHandler::editMergeStatus(const document::Bucket& bucket)
{
return _impl->editMergeStatus(bucket);
}
bool
-FileStorHandler::isMerging(const document::BucketId& bucket) const
+FileStorHandler::isMerging(const document::Bucket& bucket) const
{
return _impl->isMerging(bucket);
}
@@ -174,14 +174,14 @@ FileStorHandler::getNumActiveMerges() const
}
void
-FileStorHandler::clearMergeStatus(const document::BucketId& bucket,
+FileStorHandler::clearMergeStatus(const document::Bucket& bucket,
const api::ReturnCode& code)
{
return _impl->clearMergeStatus(bucket, &code);
}
void
-FileStorHandler::clearMergeStatus(const document::BucketId& bucket)
+FileStorHandler::clearMergeStatus(const document::Bucket& bucket)
{
return _impl->clearMergeStatus(bucket, 0);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index f7e32554c18..6e2d6a0fc07 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -225,7 +225,7 @@ public:
/**
* Add a new merge state to the registry.
*/
- void addMergeStatus(const document::BucketId&, MergeStatus::SP);
+ void addMergeStatus(const document::Bucket&, MergeStatus::SP);
/**
* Returns the reference to the current merge status for the given bucket.
@@ -234,7 +234,7 @@ public:
*
* @param bucket The bucket to start merging.
*/
- MergeStatus& editMergeStatus(const document::BucketId& bucket);
+ MergeStatus& editMergeStatus(const document::Bucket& bucket);
/**
* Returns true if the bucket is currently being merged on this node.
@@ -242,7 +242,7 @@ public:
* @param bucket The bucket to check merge status for
* @return Returns true if the bucket is being merged.
*/
- bool isMerging(const document::BucketId& bucket) const;
+ bool isMerging(const document::Bucket& bucket) const;
/**
* @return Returns the number of active merges on the node.
@@ -250,8 +250,8 @@ public:
uint32_t getNumActiveMerges() const;
/** Removes the merge status for the given bucket. */
- void clearMergeStatus(const document::BucketId&);
- void clearMergeStatus(const document::BucketId&, const api::ReturnCode&);
+ void clearMergeStatus(const document::Bucket&);
+ void clearMergeStatus(const document::Bucket&, const api::ReturnCode&);
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index a887d7d3791..fffd559866d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -58,7 +58,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(
FileStorHandlerImpl::~FileStorHandlerImpl() { }
void
-FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatus::SP status)
+FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status)
{
vespalib::LockGuard mlock(_mergeStatesLock);
if (_mergeStates.find(bucket) != _mergeStates.end()) {;
@@ -69,7 +69,7 @@ FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatu
}
MergeStatus&
-FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket)
+FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
{
vespalib::LockGuard mlock(_mergeStatesLock);
MergeStatus::SP status = _mergeStates[bucket];
@@ -81,7 +81,7 @@ FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket)
}
bool
-FileStorHandlerImpl::isMerging(const document::BucketId& bucket) const
+FileStorHandlerImpl::isMerging(const document::Bucket& bucket) const
{
vespalib::LockGuard mlock(_mergeStatesLock);
return (_mergeStates.find(bucket) != _mergeStates.end());
@@ -95,7 +95,7 @@ FileStorHandlerImpl::getNumActiveMerges() const
}
void
-FileStorHandlerImpl::clearMergeStatus(const document::BucketId& bucket,
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket,
const api::ReturnCode* code)
{
vespalib::LockGuard mlock(_mergeStatesLock);
@@ -154,7 +154,7 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED,
"Storage node is shutting down");
- for (std::map<document::BucketId, MergeStatus::SP>::iterator it
+ for (std::map<document::Bucket, MergeStatus::SP>::iterator it
= _mergeStates.begin(); it != _mergeStates.end(); ++it)
{
MergeStatus& s(*it->second);
@@ -337,7 +337,7 @@ FileStorHandlerImpl::abortQueuedCommandsForBuckets(
"bucket operation was bound to");
for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) {
api::StorageMessage& msg(*it->_command);
- if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket.getBucketId())) {
+ if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket)) {
LOG(debug,
"Aborting operation %s as it is bound for bucket %s",
msg.toString().c_str(),
@@ -360,7 +360,7 @@ FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(
const AbortBucketOperationsCommand& cmd) const
{
for (auto& lockedBucket : disk.lockedBuckets) {
- if (cmd.shouldAbort(lockedBucket.first.getBucketId())) {
+ if (cmd.shouldAbort(lockedBucket.first)) {
LOG(spam,
"Disk had active operation for aborted bucket %s, "
"waiting for it to complete...",
@@ -868,7 +868,7 @@ FileStorHandlerImpl::remapMessage(
<< ". Cannot remap merge, so aborting it";
api::ReturnCode code(api::ReturnCode::BUCKET_DELETED,
ost.str());
- clearMergeStatus(cmd.getBucketId(), &code);
+ clearMergeStatus(cmd.getBucket(), &code);
}
}
// Follow onto next to move queue or fail
@@ -1373,7 +1373,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out,
if (_mergeStates.size() == 0) {
out << "None\n";
}
- for (std::map<document::BucketId, MergeStatus::SP>::const_iterator it
+ for (std::map<document::Bucket, MergeStatus::SP>::const_iterator it
= _mergeStates.begin(); it != _mergeStates.end(); ++it)
{
out << "<b>" << it->first.toString() << "</b><br>\n";
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 58e93d733fd..56ac9ea0577 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -182,11 +182,11 @@ public:
std::shared_ptr<FileStorHandler::BucketLockInterface>
lock(const document::Bucket&, uint16_t disk);
- void addMergeStatus(const document::BucketId&, MergeStatus::SP);
- MergeStatus& editMergeStatus(const document::BucketId&);
- bool isMerging(const document::BucketId&) const;
+ void addMergeStatus(const document::Bucket&, MergeStatus::SP);
+ MergeStatus& editMergeStatus(const document::Bucket&);
+ bool isMerging(const document::Bucket&) const;
uint32_t getNumActiveMerges() const;
- void clearMergeStatus(const document::BucketId&, const api::ReturnCode*);
+ void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);
std::string dumpQueue(uint16_t disk) const;
ResumeGuard pause();
@@ -202,7 +202,7 @@ private:
vespalib::Lock _mergeStatesLock;
- std::map<document::BucketId, MergeStatus::SP> _mergeStates;
+ std::map<document::Bucket, MergeStatus::SP> _mergeStates;
uint8_t _maxPriorityToBlock;
uint8_t _minPriorityToBeBlocking;
@@ -212,7 +212,6 @@ private:
std::atomic<bool> _paused;
void reply(api::StorageMessage&, DiskState state) const;
- static document::BucketId getBucketId(const api::StorageMessage&);
// Returns the index in the targets array we are sending to, or -1 if none of them match.
int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 77f6d912306..7abf480e80b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -183,12 +183,12 @@ FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg,
StorBucketDatabase::WrappedEntry
FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(
- bucket, "FileStorManager::mapOperationToDisk"));
+ bucket.getBucketId(), "FileStorManager::mapOperationToDisk"));
if (!entry.exist()) {
- replyWithBucketNotFound(msg, bucket);
+ replyWithBucketNotFound(msg, bucket.getBucketId());
}
return entry;
}
@@ -259,7 +259,7 @@ FileStorManager::handlePersistenceMessage(
msg->getType().getName().c_str(), disk);
LOG_BUCKET_OPERATION_NO_LOCK(
- getStorageMessageBucketId(*msg).getBucketId(),
+ getStorageMessageBucket(*msg).getBucketId(),
vespalib::make_string("Attempting to queue %s to disk %u",
msg->toString().c_str(), disk));
@@ -400,7 +400,7 @@ FileStorManager::onBatchPutRemove(const std::shared_ptr<api::BatchPutRemoveComma
bool
FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -410,7 +410,7 @@ FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationComma
bool
FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -645,7 +645,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const
bool
FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
{
if (!entry.exist()) {
_filestorHandler->clearMergeStatus(bucket,
@@ -663,8 +663,8 @@ FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&
bool
FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId()));
- if (validateDiffReplyBucket(entry, reply->getBucketId())) {
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucket()));
+ if (validateDiffReplyBucket(entry, reply->getBucket())) {
handlePersistenceMessage(reply, entry->disk);
}
return true;
@@ -673,7 +673,7 @@ FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>&
bool
FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (validateApplyDiffCommandBucket(*cmd, entry)) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -684,8 +684,8 @@ bool
FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *reply, reply->getBucketId()));
- if (validateDiffReplyBucket(entry, reply->getBucketId())) {
+ *reply, reply->getBucket()));
+ if (validateDiffReplyBucket(entry, reply->getBucket())) {
handlePersistenceMessage(reply, entry->disk);
}
return true;
@@ -708,7 +708,7 @@ FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& c
bool
FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -719,7 +719,7 @@ bool
FileStorManager::onSetBucketState(
const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -733,7 +733,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case GetIterCommand::ID:
{
shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -742,7 +742,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case CreateIteratorCommand::ID:
{
shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -765,7 +765,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case ReadBucketInfo::ID:
{
shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -774,7 +774,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case InternalBucketJoinCommand::ID:
{
shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -783,7 +783,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case RepairBucketCommand::ID:
{
shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -792,7 +792,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case BucketDiskMoveCommand::ID:
{
shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -801,7 +801,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
case RecheckBucketInfoCommand::ID:
{
shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -939,7 +939,7 @@ FileStorManager::reportHtmlStatus(std::ostream& out,
}
bool
-FileStorManager::isMerging(const document::BucketId& bucket) const
+FileStorManager::isMerging(const document::Bucket& bucket) const
{
return _filestorHandler->isMerging(bucket);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 05b9b9bc430..1f4186bf625 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -109,7 +109,7 @@ public:
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
// Return true if we are currently merging the given bucket.
- bool isMerging(const document::BucketId& bucket) const;
+ bool isMerging(const document::Bucket& bucket) const;
FileStorHandler& getFileStorHandler() {
return *_filestorHandler;
@@ -140,9 +140,9 @@ private:
const char* callerId);
bool validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry&);
- bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::BucketId&);
+ bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::Bucket&);
- StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::BucketId&);
+ StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::Bucket&);
StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*);
bool handlePersistenceMessage(const std::shared_ptr<api::StorageMessage>&, uint16_t disk);
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 5fa8ca5c707..92de9720bfb 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -927,11 +927,11 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
class MergeStateDeleter {
public:
FileStorHandler& _handler;
- document::BucketId _bucket;
+ document::Bucket _bucket;
bool _active;
MergeStateDeleter(FileStorHandler& handler,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
: _handler(handler),
_bucket(bucket),
_active(true)
@@ -956,7 +956,6 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
_env._component.getClock()));
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const document::BucketId id(bucket.getBucketId());
LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".",
bucket.toString().c_str(), cmd.getMaxTimestamp());
@@ -989,7 +988,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
}
}
- if (_env._fileStorHandler.isMerging(id)) {
+ if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
const char* err = "A merge is already running on this bucket.";
LOG(debug, "%s", err);
tracker->fail(ReturnCode::BUSY, err);
@@ -997,11 +996,11 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
}
checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
- MergeStateDeleter stateGuard(_env._fileStorHandler, id);
+ MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
MergeStatus::SP s = MergeStatus::SP(new MergeStatus(
_env._component.getClock(), cmd.getLoadType(),
cmd.getPriority(), cmd.getTrace().getLevel()));
- _env._fileStorHandler.addMergeStatus(id, s);
+ _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->nodeList = cmd.getNodes();
s->maxTimestamp = Timestamp(cmd.getMaxTimestamp());
s->timeout = cmd.getTimeout();
@@ -1182,11 +1181,10 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
_env._metrics.getBucketDiff,
_env._component.getClock()));
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const document::BucketId id(bucket.getBucketId());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
- if (_env._fileStorHandler.isMerging(id)) {
+ if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(ReturnCode::BUSY,
"A merge is already running on this bucket.");
return tracker;
@@ -1239,11 +1237,11 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
- MergeStateDeleter stateGuard(_env._fileStorHandler, id);
+ MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
MergeStatus::SP s(new MergeStatus(_env._component.getClock(),
cmd.getLoadType(), cmd.getPriority(),
cmd.getTrace().getLevel()));
- _env._fileStorHandler.addMergeStatus(id, s);
+ _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->pendingGetDiff =
api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd));
@@ -1305,23 +1303,22 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
{
++_env._metrics.getBucketDiffReply;
spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition));
- document::BucketId id(bucket.getBucketId());
LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str());
- if (!_env._fileStorHandler.isMerging(id)) {
+ if (!_env._fileStorHandler.isMerging(bucket.getBucket())) {
LOG(warning, "Got GetBucketDiffReply for %s which we have no "
"merge state for.",
bucket.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(id);
+ MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
if (s.pendingId != reply.getMsgId()) {
LOG(warning, "Got GetBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
api::StorageReply::SP replyToSend;
@@ -1363,7 +1360,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
- id,
+ bucket.getBucket(),
api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
e.what()));
throw;
@@ -1372,7 +1369,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
}
if (clearState) {
- _env._fileStorHandler.clearMergeStatus(id);
+ _env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
if (replyToSend.get()) {
replyToSend->setResult(reply.getResult());
@@ -1389,10 +1386,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
_env._component.getClock()));
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const document::BucketId id(bucket.getBucketId());
LOG(debug, "%s", cmd.toString().c_str());
- if (_env._fileStorHandler.isMerging(id)) {
+ if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(ReturnCode::BUSY,
"A merge is already running on this bucket.");
return tracker;
@@ -1451,11 +1447,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
- MergeStateDeleter stateGuard(_env._fileStorHandler, id);
+ MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
MergeStatus::SP s(new MergeStatus(_env._component.getClock(),
cmd.getLoadType(), cmd.getPriority(),
cmd.getTrace().getLevel()));
- _env._fileStorHandler.addMergeStatus(id, s);
+ _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->pendingApplyDiff =
api::ApplyBucketDiffReply::SP(new api::ApplyBucketDiffReply(cmd));
@@ -1485,24 +1481,23 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
{
++_env._metrics.applyBucketDiffReply;
spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition));
- document::BucketId id(bucket.getBucketId());
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
- if (!_env._fileStorHandler.isMerging(id)) {
+ if (!_env._fileStorHandler.isMerging(bucket.getBucket())) {
LOG(warning, "Got ApplyBucketDiffReply for %s which we have no "
"merge state for.",
bucket.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(id);
+ MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
if (s.pendingId != reply.getMsgId()) {
LOG(warning, "Got ApplyBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
bool clearState = true;
@@ -1587,7 +1582,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
- id,
+ bucket.getBucket(),
api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
e.what()));
throw;
@@ -1596,7 +1591,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
}
if (clearState) {
- _env._fileStorHandler.clearMergeStatus(id);
+ _env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
if (replyToSend.get()) {
// Send on
diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp
index 78f4b02a192..876067e4e15 100644
--- a/storage/src/vespa/storage/persistence/messages.cpp
+++ b/storage/src/vespa/storage/persistence/messages.cpp
@@ -151,17 +151,6 @@ RecheckBucketInfoCommand::makeReply() {
return std::make_unique<RecheckBucketInfoReply>(*this);
}
-bool
-AbortBucketOperationsCommand::ExplicitBucketSetPredicate::doShouldAbort(const document::BucketId& bid) const {
- return _bucketsToAbort.find(bid) != _bucketsToAbort.end();
-}
-
-AbortBucketOperationsCommand::ExplicitBucketSetPredicate::ExplicitBucketSetPredicate(const BucketSet& bucketsToAbort)
- : _bucketsToAbort(bucketsToAbort)
-{ }
-
-AbortBucketOperationsCommand::ExplicitBucketSetPredicate::~ExplicitBucketSetPredicate() { }
-
AbortBucketOperationsCommand::AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate)
: api::InternalCommand(ID),
_predicate(std::move(predicate))
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index d47a389a8c1..7bbeea8a12a 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -206,33 +206,11 @@ class AbortBucketOperationsCommand : public api::InternalCommand
{
public:
class AbortPredicate {
- virtual bool doShouldAbort(const document::BucketId&) const = 0;
+ virtual bool doShouldAbort(const document::Bucket&) const = 0;
public:
virtual ~AbortPredicate() {}
- bool shouldAbort(const document::BucketId& bid) const {
- return doShouldAbort(bid);
- }
- };
-
- using BucketSet = vespalib::hash_set<document::BucketId, document::BucketId::hash>;
-
- // Primarily for unit test mocking; actual predicate impl should do lazy
- // evaluations based on previous and current cluster states.
- class ExplicitBucketSetPredicate : public AbortPredicate {
- BucketSet _bucketsToAbort;
-
- bool doShouldAbort(const document::BucketId& bid) const override;
- public:
- explicit ExplicitBucketSetPredicate(const BucketSet& bucketsToAbort);
- ~ExplicitBucketSetPredicate();
-
- template <typename Iterator>
- ExplicitBucketSetPredicate(Iterator first, Iterator last)
- : _bucketsToAbort(first, last)
- { }
-
- const BucketSet& getBucketsToAbort() const {
- return _bucketsToAbort;
+ bool shouldAbort(const document::Bucket &bucket) const {
+ return doShouldAbort(bucket);
}
};
@@ -245,8 +223,8 @@ public:
AbortBucketOperationsCommand(std::unique_ptr<AbortPredicate> predicate);
~AbortBucketOperationsCommand();
- bool shouldAbort(const document::BucketId& bid) const {
- return _predicate->shouldAbort(bid);
+ bool shouldAbort(const document::Bucket &bucket) const {
+ return _predicate->shouldAbort(bucket);
}
std::unique_ptr<api::StorageReply> makeReply() override;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 13a2bbd906b..074e5aafb3c 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -316,7 +316,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
_env._metrics.createBuckets,
_env._component.getClock()));
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucketId())) {
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.",
cmd.getBucketId().toString().c_str());
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
@@ -385,8 +385,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
_env._component.getClock()));
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
- if (_env._fileStorHandler.isMerging(cmd.getBucketId())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucketId(),
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
api::ReturnCode(api::ReturnCode::ABORTED,
"Bucket was deleted during the merge"));
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index e981e29e806..4b624199459 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -155,15 +155,15 @@ class StateDiffLazyAbortPredicate
// where all distributors are down.
bool _allDistributorsHaveGoneDown;
- bool doShouldAbort(const document::BucketId& b) const override {
+ bool doShouldAbort(const document::Bucket &bucket) const override {
if (_allDistributorsHaveGoneDown) {
return true;
}
- uint16_t oldOwner(_oldState.ownerOf(b));
- uint16_t newOwner(_newState.ownerOf(b));
+ uint16_t oldOwner(_oldState.ownerOf(bucket.getBucketId()));
+ uint16_t newOwner(_newState.ownerOf(bucket.getBucketId()));
if (oldOwner != newOwner) {
LOG(spam, "Owner of %s was %u, now %u. Operation should be aborted",
- b.toString().c_str(), oldOwner, newOwner);
+ bucket.toString().c_str(), oldOwner, newOwner);
return true;
}
return false;