author     Tor Egge <Tor.Egge@oath.com>  2017-10-25 09:59:55 +0000
committer  Tor Egge <Tor.Egge@oath.com>  2017-10-25 12:06:17 +0000
commit     c7fff4e03569a23a7306c03dd4cd7473291efe05 (patch)
tree       e34ff0a5f011ee38f25fa694671748be19ed399f /storage
parent     f671f4e689e1dc2f9e878ac678c626096cece26e (diff)
Use document::Bucket as key for merge state tracking.
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp                       42
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.cpp                   14
-rw-r--r--  storage/src/tests/persistence/persistencetestutils.h                      2
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp    10
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h      10
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 14
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h  11
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp     8
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.h       4
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp                   51
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp               6
11 files changed, 83 insertions, 89 deletions
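
The change replaces document::BucketId with document::Bucket as the merge-state key throughout the persistence layer: command handlers now pass cmd.getBucket() instead of cmd.getBucketId(), and the tests wrap their test bucket with makeDocumentBucket(). A document::Bucket pairs a bucket space with a bucket id, so code that still needs the raw id unwraps it via getBucketId(). Below is a minimal sketch of that wrapping/unwrapping pattern, using simplified stand-in types rather than the real document classes:

#include <cstdint>

// Simplified stand-ins for document::BucketId, document::BucketSpace and
// document::Bucket; the real classes carry more state and a different encoding.
struct BucketId {
    uint64_t raw = 0;
    BucketId() = default;
    BucketId(uint32_t usedBits, uint64_t location)
        : raw((uint64_t(usedBits) << 58) | location) {}   // simplified encoding
    uint64_t getRawId() const { return raw; }
};

struct BucketSpace {
    uint64_t space = 1;                                    // "default" space
};

struct Bucket {
    BucketSpace space;
    BucketId    id;
    BucketId getBucketId() const { return id; }
};

// Mirrors the makeDocumentBucket() helper the tests use to wrap a plain id.
Bucket makeDocumentBucket(const BucketId& id) {
    return Bucket{BucketSpace{}, id};
}

int main() {
    Bucket bucket = makeDocumentBucket(BucketId(16, 1234));
    // Components that still operate on the raw id, such as the bucket
    // database insert in the test setup, unwrap it again:
    uint64_t raw = bucket.getBucketId().getRawId();
    return raw == bucket.getBucketId().getRawId() ? 0 : 1;
}
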
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 120dbd4ce45..47ed60b62ad 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -20,7 +20,7 @@ namespace storage {
struct MergeHandlerTest : public SingleDiskPersistenceTestUtils
{
uint32_t _location; // Location used for all merge tests
- document::BucketId _bucket; // Bucket used for all merge tests
+ document::Bucket _bucket; // Bucket used for all merge tests
uint64_t _maxTimestamp;
std::vector<api::MergeBucketCommand::Node> _nodes;
std::unique_ptr<spi::Context> _context;
@@ -235,13 +235,13 @@ MergeHandlerTest::setUp() {
SingleDiskPersistenceTestUtils::setUp();
_location = 1234;
- _bucket = document::BucketId(16, _location);
+ _bucket = makeDocumentBucket(document::BucketId(16, _location));
_maxTimestamp = 11501;
LOG(info, "Creating %s in bucket database", _bucket.toString().c_str());
bucketdb::StorageBucketInfo bucketDBEntry;
bucketDBEntry.disk = 0;
- getEnv().getBucketDatabase().insert(_bucket, bucketDBEntry, "mergetestsetup");
+ getEnv().getBucketDatabase().insert(_bucket.getBucketId(), bucketDBEntry, "mergetestsetup");
LOG(info, "Creating bucket to merge");
createTestBucket(_bucket);
@@ -267,7 +267,7 @@ MergeHandlerTest::testMergeBucketCommand()
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
cmd.setSourceIndex(1234);
MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
@@ -294,7 +294,7 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Verifying that get bucket diff is sent on");
- api::GetBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context);
api::StorageMessage::SP replySent = tracker1->getReply();
@@ -338,7 +338,7 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Verifying that apply bucket diff is sent on");
- api::ApplyBucketDiffCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context);
api::StorageMessage::SP replySent = tracker1->getReply();
@@ -382,7 +382,7 @@ MergeHandlerTest::testMasterMessageFlow()
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(info, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
LOG(info, "Check state");
@@ -644,7 +644,7 @@ MergeHandlerTest::testChunkedApplyBucketDiff()
MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
LOG(info, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
@@ -732,7 +732,7 @@ MergeHandlerTest::testChunkLimitPartiallyFilledDiff()
setUpChain(MIDDLE);
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, maxChunkSize));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, maxChunkSize));
applyBucketDiffCmd->getDiff() = applyDiff;
MergeHandler handler(
@@ -753,7 +753,7 @@ MergeHandlerTest::testMaxTimestamp()
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getCmd(
@@ -820,7 +820,7 @@ MergeHandlerTest::createDummyApplyDiff(int timestampOffset,
}
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024));
applyBucketDiffCmd->getDiff() = applyDiff;
return applyBucketDiffCmd;
}
@@ -856,7 +856,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset,
}
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
- new api::GetBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::GetBucketDiffCommand(_bucket, _nodes, 1024*1024));
getBucketDiffCmd->getDiff() = diff;
return getBucketDiffCmd;
}
@@ -907,7 +907,7 @@ void
MergeHandlerTest::testMergeProgressSafeGuard()
{
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
@@ -943,7 +943,7 @@ MergeHandlerTest::testSafeGuardNotInvokedWhenHasMaskChanges()
_nodes.push_back(api::MergeBucketCommand::Node(0, false));
_nodes.push_back(api::MergeBucketCommand::Node(1, false));
_nodes.push_back(api::MergeBucketCommand::Node(2, false));
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
@@ -991,7 +991,7 @@ MergeHandlerTest::testEntryRemovedAfterGetBucketDiff()
}
setUpChain(BACK);
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024));
applyBucketDiffCmd->getDiff() = applyDiff;
MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
@@ -1086,7 +1086,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke(
MergeHandler& handler,
spi::Context& context)
{
- api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(cmd, context);
}
@@ -1126,7 +1126,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
MergeHandler& handler,
spi::Context& context)
{
- api::GetBucketDiffCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleGetBucketDiff(cmd, context);
}
@@ -1215,7 +1215,7 @@ MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke(
MergeHandler& handler,
spi::Context& context)
{
- api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(cmd, context);
_diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
}
@@ -1287,7 +1287,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
++_counter;
_stub.clear();
if (getChainPos() == FRONT) {
- api::MergeBucketCommand cmd(makeDocumentBucket(test._bucket), test._nodes, test._maxTimestamp);
+ api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
handler.handleMergeBucket(cmd, context);
std::shared_ptr<api::GetBucketDiffCommand> diffCmd(
test.fetchSingleMessage<api::GetBucketDiffCommand>());
@@ -1470,7 +1470,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp()
}
std::shared_ptr<api::ApplyBucketDiffCommand> applyBucketDiffCmd(
- new api::ApplyBucketDiffCommand(makeDocumentBucket(_bucket), _nodes, 1024*1024));
+ new api::ApplyBucketDiffCommand(_bucket, _nodes, 1024*1024));
applyBucketDiffCmd->getDiff() = applyDiff;
MessageTracker::UP tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
@@ -1480,7 +1480,7 @@ MergeHandlerTest::testRemovePutOnExistingTimestamp()
tracker->getReply()));
CPPUNIT_ASSERT(applyBucketDiffReply.get());
- api::MergeBucketCommand cmd(makeDocumentBucket(_bucket), _nodes, _maxTimestamp);
+ api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
handler.handleMergeBucket(cmd, *_context);
std::shared_ptr<api::GetBucketDiffCommand> getBucketDiffCmd(
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index c17c4837bfa..fb6bc298b08 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -373,10 +373,10 @@ PersistenceTestUtils::createRandomDocumentAtLocation(
}
void
-PersistenceTestUtils::createTestBucket(const document::BucketId& bucket,
+PersistenceTestUtils::createTestBucket(const document::Bucket& bucket,
uint16_t disk)
{
-
+ document::BucketId bucketId(bucket.getBucketId());
uint32_t opsPerType = 2;
uint32_t numberOfLocations = 2;
uint32_t minDocSize = 0;
@@ -388,27 +388,27 @@ PersistenceTestUtils::createTestBucket(const document::BucketId& bucket,
uint32_t seed = useHeaderOnly * 10000 + optype * 1000 + i + 1;
uint64_t location = (seed % numberOfLocations);
location <<= 32;
- location += (bucket.getRawId() & 0xffffffff);
+ location += (bucketId.getRawId() & 0xffffffff);
document::Document::SP doc(
createRandomDocumentAtLocation(
location, seed, minDocSize, maxDocSize));
if (headerOnly) {
clearBody(*doc);
}
- doPut(doc, spi::Timestamp(seed), disk, bucket.getUsedBits());
+ doPut(doc, spi::Timestamp(seed), disk, bucketId.getUsedBits());
if (optype == 0) { // Regular put
} else if (optype == 1) { // Overwritten later in time
document::Document::SP doc2(new document::Document(*doc));
doc2->setValue(doc2->getField("content"),
document::StringFieldValue("overwritten"));
doPut(doc2, spi::Timestamp(seed + 500),
- disk, bucket.getUsedBits());
+ disk, bucketId.getUsedBits());
} else if (optype == 2) { // Removed
doRemove(doc->getId(), spi::Timestamp(seed + 500), disk, false,
- bucket.getUsedBits());
+ bucketId.getUsedBits());
} else if (optype == 3) { // Unrevertable removed
doRemove(doc->getId(), spi::Timestamp(seed), disk, true,
- bucket.getUsedBits());
+ bucketId.getUsedBits());
}
}
}
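
createTestBucket() now takes a document::Bucket, but the location arithmetic above still works on the underlying bucket id, which the function extracts once at the top. A small sketch of that derivation with simplified stand-in types (the makeLocation name is illustrative, not part of the patch):

#include <cstdint>

struct BucketId {
    uint64_t raw = 0;
    uint64_t getRawId() const { return raw; }
};

struct Bucket {
    BucketId id;
    BucketId getBucketId() const { return id; }
};

// High 32 bits come from the per-document seed, low 32 bits from the bucket,
// so every generated document still maps back to the bucket under test.
uint64_t makeLocation(const Bucket& bucket, uint32_t seed, uint32_t numberOfLocations) {
    uint64_t location = seed % numberOfLocations;
    location <<= 32;
    location += (bucket.getBucketId().getRawId() & 0xffffffff);
    return location;
}
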
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 1b53b401a62..573ff28a80c 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -188,7 +188,7 @@ public:
*
* @disk If set, use this disk, otherwise lookup in bucket db.
*/
- void createTestBucket(const document::BucketId&, uint16_t disk = 0xffff);
+ void createTestBucket(const document::Bucket&, uint16_t disk = 0xffff);
/**
* Create a new persistence thread.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 7286b6e3586..e23bfda192c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -149,20 +149,20 @@ FileStorHandler::getQueueSize(uint16_t disk) const
}
void
-FileStorHandler::addMergeStatus(const document::BucketId& bucket,
+FileStorHandler::addMergeStatus(const document::Bucket& bucket,
MergeStatus::SP ms)
{
return _impl->addMergeStatus(bucket, ms);
}
MergeStatus&
-FileStorHandler::editMergeStatus(const document::BucketId& bucket)
+FileStorHandler::editMergeStatus(const document::Bucket& bucket)
{
return _impl->editMergeStatus(bucket);
}
bool
-FileStorHandler::isMerging(const document::BucketId& bucket) const
+FileStorHandler::isMerging(const document::Bucket& bucket) const
{
return _impl->isMerging(bucket);
}
@@ -174,14 +174,14 @@ FileStorHandler::getNumActiveMerges() const
}
void
-FileStorHandler::clearMergeStatus(const document::BucketId& bucket,
+FileStorHandler::clearMergeStatus(const document::Bucket& bucket,
const api::ReturnCode& code)
{
return _impl->clearMergeStatus(bucket, &code);
}
void
-FileStorHandler::clearMergeStatus(const document::BucketId& bucket)
+FileStorHandler::clearMergeStatus(const document::Bucket& bucket)
{
return _impl->clearMergeStatus(bucket, 0);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index f7e32554c18..6e2d6a0fc07 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -225,7 +225,7 @@ public:
/**
* Add a new merge state to the registry.
*/
- void addMergeStatus(const document::BucketId&, MergeStatus::SP);
+ void addMergeStatus(const document::Bucket&, MergeStatus::SP);
/**
* Returns the reference to the current merge status for the given bucket.
@@ -234,7 +234,7 @@ public:
*
* @param bucket The bucket to start merging.
*/
- MergeStatus& editMergeStatus(const document::BucketId& bucket);
+ MergeStatus& editMergeStatus(const document::Bucket& bucket);
/**
* Returns true if the bucket is currently being merged on this node.
@@ -242,7 +242,7 @@ public:
* @param bucket The bucket to check merge status for
* @return Returns true if the bucket is being merged.
*/
- bool isMerging(const document::BucketId& bucket) const;
+ bool isMerging(const document::Bucket& bucket) const;
/**
* @return Returns the number of active merges on the node.
@@ -250,8 +250,8 @@ public:
uint32_t getNumActiveMerges() const;
/** Removes the merge status for the given bucket. */
- void clearMergeStatus(const document::BucketId&);
- void clearMergeStatus(const document::BucketId&, const api::ReturnCode&);
+ void clearMergeStatus(const document::Bucket&);
+ void clearMergeStatus(const document::Bucket&, const api::ReturnCode&);
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index a887d7d3791..718c36b78d4 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -58,7 +58,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(
FileStorHandlerImpl::~FileStorHandlerImpl() { }
void
-FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatus::SP status)
+FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status)
{
vespalib::LockGuard mlock(_mergeStatesLock);
if (_mergeStates.find(bucket) != _mergeStates.end()) {;
@@ -69,7 +69,7 @@ FileStorHandlerImpl::addMergeStatus(const document::BucketId& bucket, MergeStatu
}
MergeStatus&
-FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket)
+FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
{
vespalib::LockGuard mlock(_mergeStatesLock);
MergeStatus::SP status = _mergeStates[bucket];
@@ -81,7 +81,7 @@ FileStorHandlerImpl::editMergeStatus(const document::BucketId& bucket)
}
bool
-FileStorHandlerImpl::isMerging(const document::BucketId& bucket) const
+FileStorHandlerImpl::isMerging(const document::Bucket& bucket) const
{
vespalib::LockGuard mlock(_mergeStatesLock);
return (_mergeStates.find(bucket) != _mergeStates.end());
@@ -95,7 +95,7 @@ FileStorHandlerImpl::getNumActiveMerges() const
}
void
-FileStorHandlerImpl::clearMergeStatus(const document::BucketId& bucket,
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket,
const api::ReturnCode* code)
{
vespalib::LockGuard mlock(_mergeStatesLock);
@@ -154,7 +154,7 @@ FileStorHandlerImpl::flush(bool killPendingMerges)
if (killPendingMerges) {
api::ReturnCode code(api::ReturnCode::ABORTED,
"Storage node is shutting down");
- for (std::map<document::BucketId, MergeStatus::SP>::iterator it
+ for (std::map<document::Bucket, MergeStatus::SP>::iterator it
= _mergeStates.begin(); it != _mergeStates.end(); ++it)
{
MergeStatus& s(*it->second);
@@ -868,7 +868,7 @@ FileStorHandlerImpl::remapMessage(
<< ". Cannot remap merge, so aborting it";
api::ReturnCode code(api::ReturnCode::BUCKET_DELETED,
ost.str());
- clearMergeStatus(cmd.getBucketId(), &code);
+ clearMergeStatus(cmd.getBucket(), &code);
}
}
// Follow onto next to move queue or fail
@@ -1373,7 +1373,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out,
if (_mergeStates.size() == 0) {
out << "None\n";
}
- for (std::map<document::BucketId, MergeStatus::SP>::const_iterator it
+ for (std::map<document::Bucket, MergeStatus::SP>::const_iterator it
= _mergeStates.begin(); it != _mergeStates.end(); ++it)
{
out << "<b>" << it->first.toString() << "</b><br>\n";
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 58e93d733fd..56ac9ea0577 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -182,11 +182,11 @@ public:
std::shared_ptr<FileStorHandler::BucketLockInterface>
lock(const document::Bucket&, uint16_t disk);
- void addMergeStatus(const document::BucketId&, MergeStatus::SP);
- MergeStatus& editMergeStatus(const document::BucketId&);
- bool isMerging(const document::BucketId&) const;
+ void addMergeStatus(const document::Bucket&, MergeStatus::SP);
+ MergeStatus& editMergeStatus(const document::Bucket&);
+ bool isMerging(const document::Bucket&) const;
uint32_t getNumActiveMerges() const;
- void clearMergeStatus(const document::BucketId&, const api::ReturnCode*);
+ void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);
std::string dumpQueue(uint16_t disk) const;
ResumeGuard pause();
@@ -202,7 +202,7 @@ private:
vespalib::Lock _mergeStatesLock;
- std::map<document::BucketId, MergeStatus::SP> _mergeStates;
+ std::map<document::Bucket, MergeStatus::SP> _mergeStates;
uint8_t _maxPriorityToBlock;
uint8_t _minPriorityToBeBlocking;
@@ -212,7 +212,6 @@ private:
std::atomic<bool> _paused;
void reply(api::StorageMessage&, DiskState state) const;
- static document::BucketId getBucketId(const api::StorageMessage&);
// Returns the index in the targets array we are sending to, or -1 if none of them match.
int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets);
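
The declaration change above is what drives the rest of the signature changes: _mergeStates is now a std::map keyed on document::Bucket, which requires a strict weak ordering on the key. A sketch of that requirement with simplified stand-in types (the real document::Bucket supplies the ordering it needs to serve as a map key; the helper bodies below are illustrative only):

#include <cstdint>
#include <map>
#include <memory>

struct BucketSpace { uint64_t space = 1; };
struct BucketId    { uint64_t raw = 0; };

struct Bucket {
    BucketSpace space;
    BucketId    id;
    // Order by bucket space first, then by raw bucket id.
    bool operator<(const Bucket& other) const {
        if (space.space != other.space.space) {
            return space.space < other.space.space;
        }
        return id.raw < other.id.raw;
    }
};

struct MergeStatus {};

std::map<Bucket, std::shared_ptr<MergeStatus>> mergeStates;

bool isMerging(const Bucket& bucket) {
    return mergeStates.find(bucket) != mergeStates.end();
}

void clearMergeStatus(const Bucket& bucket) {
    mergeStates.erase(bucket);
}
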
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 77f6d912306..909c5c467cd 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -645,7 +645,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const
bool
FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
{
if (!entry.exist()) {
_filestorHandler->clearMergeStatus(bucket,
@@ -664,7 +664,7 @@ bool
FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId()));
- if (validateDiffReplyBucket(entry, reply->getBucketId())) {
+ if (validateDiffReplyBucket(entry, reply->getBucket())) {
handlePersistenceMessage(reply, entry->disk);
}
return true;
@@ -685,7 +685,7 @@ FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffRep
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
*reply, reply->getBucketId()));
- if (validateDiffReplyBucket(entry, reply->getBucketId())) {
+ if (validateDiffReplyBucket(entry, reply->getBucket())) {
handlePersistenceMessage(reply, entry->disk);
}
return true;
@@ -939,7 +939,7 @@ FileStorManager::reportHtmlStatus(std::ostream& out,
}
bool
-FileStorManager::isMerging(const document::BucketId& bucket) const
+FileStorManager::isMerging(const document::Bucket& bucket) const
{
return _filestorHandler->isMerging(bucket);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 05b9b9bc430..89d657a4f5c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -109,7 +109,7 @@ public:
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
// Return true if we are currently merging the given bucket.
- bool isMerging(const document::BucketId& bucket) const;
+ bool isMerging(const document::Bucket& bucket) const;
FileStorHandler& getFileStorHandler() {
return *_filestorHandler;
@@ -140,7 +140,7 @@ private:
const char* callerId);
bool validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry&);
- bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::BucketId&);
+ bool validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry&, const document::Bucket&);
StorBucketDatabase::WrappedEntry mapOperationToDisk(api::StorageMessage&, const document::BucketId&);
StorBucketDatabase::WrappedEntry mapOperationToBucketAndDisk(api::BucketCommand&, const document::DocumentId*);
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 5fa8ca5c707..92de9720bfb 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -927,11 +927,11 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
class MergeStateDeleter {
public:
FileStorHandler& _handler;
- document::BucketId _bucket;
+ document::Bucket _bucket;
bool _active;
MergeStateDeleter(FileStorHandler& handler,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
: _handler(handler),
_bucket(bucket),
_active(true)
@@ -956,7 +956,6 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
_env._component.getClock()));
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const document::BucketId id(bucket.getBucketId());
LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".",
bucket.toString().c_str(), cmd.getMaxTimestamp());
@@ -989,7 +988,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
}
}
- if (_env._fileStorHandler.isMerging(id)) {
+ if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
const char* err = "A merge is already running on this bucket.";
LOG(debug, "%s", err);
tracker->fail(ReturnCode::BUSY, err);
@@ -997,11 +996,11 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
}
checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
- MergeStateDeleter stateGuard(_env._fileStorHandler, id);
+ MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
MergeStatus::SP s = MergeStatus::SP(new MergeStatus(
_env._component.getClock(), cmd.getLoadType(),
cmd.getPriority(), cmd.getTrace().getLevel()));
- _env._fileStorHandler.addMergeStatus(id, s);
+ _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->nodeList = cmd.getNodes();
s->maxTimestamp = Timestamp(cmd.getMaxTimestamp());
s->timeout = cmd.getTimeout();
@@ -1182,11 +1181,10 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
_env._metrics.getBucketDiff,
_env._component.getClock()));
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const document::BucketId id(bucket.getBucketId());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
- if (_env._fileStorHandler.isMerging(id)) {
+ if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(ReturnCode::BUSY,
"A merge is already running on this bucket.");
return tracker;
@@ -1239,11 +1237,11 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
- MergeStateDeleter stateGuard(_env._fileStorHandler, id);
+ MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
MergeStatus::SP s(new MergeStatus(_env._component.getClock(),
cmd.getLoadType(), cmd.getPriority(),
cmd.getTrace().getLevel()));
- _env._fileStorHandler.addMergeStatus(id, s);
+ _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->pendingGetDiff =
api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd));
@@ -1305,23 +1303,22 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
{
++_env._metrics.getBucketDiffReply;
spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition));
- document::BucketId id(bucket.getBucketId());
LOG(debug, "GetBucketDiffReply(%s)", bucket.toString().c_str());
- if (!_env._fileStorHandler.isMerging(id)) {
+ if (!_env._fileStorHandler.isMerging(bucket.getBucket())) {
LOG(warning, "Got GetBucketDiffReply for %s which we have no "
"merge state for.",
bucket.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(id);
+ MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
if (s.pendingId != reply.getMsgId()) {
LOG(warning, "Got GetBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
api::StorageReply::SP replyToSend;
@@ -1363,7 +1360,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
- id,
+ bucket.getBucket(),
api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
e.what()));
throw;
@@ -1372,7 +1369,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
}
if (clearState) {
- _env._fileStorHandler.clearMergeStatus(id);
+ _env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
if (replyToSend.get()) {
replyToSend->setResult(reply.getResult());
@@ -1389,10 +1386,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
_env._component.getClock()));
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- const document::BucketId id(bucket.getBucketId());
LOG(debug, "%s", cmd.toString().c_str());
- if (_env._fileStorHandler.isMerging(id)) {
+ if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(ReturnCode::BUSY,
"A merge is already running on this bucket.");
return tracker;
@@ -1451,11 +1447,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
- MergeStateDeleter stateGuard(_env._fileStorHandler, id);
+ MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
MergeStatus::SP s(new MergeStatus(_env._component.getClock(),
cmd.getLoadType(), cmd.getPriority(),
cmd.getTrace().getLevel()));
- _env._fileStorHandler.addMergeStatus(id, s);
+ _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->pendingApplyDiff =
api::ApplyBucketDiffReply::SP(new api::ApplyBucketDiffReply(cmd));
@@ -1485,24 +1481,23 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
{
++_env._metrics.applyBucketDiffReply;
spi::Bucket bucket(reply.getBucket(), spi::PartitionId(_env._partition));
- document::BucketId id(bucket.getBucketId());
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
- if (!_env._fileStorHandler.isMerging(id)) {
+ if (!_env._fileStorHandler.isMerging(bucket.getBucket())) {
LOG(warning, "Got ApplyBucketDiffReply for %s which we have no "
"merge state for.",
bucket.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(id);
+ MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
if (s.pendingId != reply.getMsgId()) {
LOG(warning, "Got ApplyBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
- DUMP_LOGGED_BUCKET_OPERATIONS(id);
+ DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
bool clearState = true;
@@ -1587,7 +1582,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
- id,
+ bucket.getBucket(),
api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE,
e.what()));
throw;
@@ -1596,7 +1591,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
}
if (clearState) {
- _env._fileStorHandler.clearMergeStatus(id);
+ _env._fileStorHandler.clearMergeStatus(bucket.getBucket());
}
if (replyToSend.get()) {
// Send on
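
The MergeStateDeleter guard used by handleMergeBucket, handleGetBucketDiff and handleApplyBucketDiff now carries a document::Bucket, matching the clearMergeStatus() signature it calls into. Its constructor is visible in the diff; the destructor and deactivation step are outside the patch context, so the guard below is a plausible reconstruction with stand-in types, not the committed code:

#include <cstdint>
#include <map>
#include <memory>

// Stand-ins for document::Bucket and FileStorHandler, reduced to what the
// guard touches.
struct Bucket {
    uint64_t space = 1;
    uint64_t id = 0;
    bool operator<(const Bucket& o) const {
        return space != o.space ? space < o.space : id < o.id;
    }
};

struct MergeStatus {};

struct FileStorHandler {
    std::map<Bucket, std::shared_ptr<MergeStatus>> mergeStates;
    void clearMergeStatus(const Bucket& bucket) { mergeStates.erase(bucket); }
};

class MergeStateDeleter {
public:
    FileStorHandler& _handler;
    Bucket           _bucket;
    bool             _active;

    MergeStateDeleter(FileStorHandler& handler, const Bucket& bucket)
        : _handler(handler), _bucket(bucket), _active(true) {}

    // Inferred behaviour: if nothing took over responsibility for the merge
    // state (e.g. an exception unwound the stack), drop it again on scope exit.
    ~MergeStateDeleter() {
        if (_active) {
            _handler.clearMergeStatus(_bucket);
        }
    }

    void deactivate() { _active = false; }
};
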
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 13a2bbd906b..074e5aafb3c 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -316,7 +316,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
_env._metrics.createBuckets,
_env._component.getClock()));
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucketId())) {
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.",
cmd.getBucketId().toString().c_str());
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
@@ -385,8 +385,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
_env._component.getClock()));
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
- if (_env._fileStorHandler.isMerging(cmd.getBucketId())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucketId(),
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
api::ReturnCode(api::ReturnCode::ABORTED,
"Bucket was deleted during the merge"));
}