summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-11-13 11:14:29 +0000
committerTor Egge <Tor.Egge@oath.com>2017-11-13 13:14:46 +0000
commit61a4023b39195912d642ad5256f30730c5567feb (patch)
tree555f02ae217ed1411944d48966c504c861169419 /storage
parent372b70b09f7abba1aafe4931b44f01d7f9415d4a (diff)
Partially change BucketDBUpdater to use document::Bucket instead of
document::BucketId.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp12
-rw-r--r--storage/src/tests/distributor/distributortest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h23
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp4
5 files changed, 38 insertions, 39 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 03a51cf6180..9f931c3f5c0 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -880,7 +880,7 @@ BucketDBUpdaterTest::testInitializingWhileRecheck()
CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
- getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3));
+ getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
for (int i=0; i<2; i++) {
fakeBucketReply(systemState,
@@ -1010,7 +1010,7 @@ BucketDBUpdaterTest::testRecheckNodeWithFailure()
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3));
+ getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
@@ -1060,7 +1060,7 @@ BucketDBUpdaterTest::testRecheckNode()
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(1, document::BucketId(16, 3));
+ getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
@@ -1954,7 +1954,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
}
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(0, bucket);
+ getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
std::shared_ptr<api::RequestBucketInfoCommand> rbi(
@@ -1986,7 +1986,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
}
_sender.clear();
- getBucketDBUpdater().recheckBucketInfo(0, bucket);
+ getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket));
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
std::shared_ptr<api::RequestBucketInfoCommand> rbi(
@@ -1999,7 +1999,7 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
CPPUNIT_ASSERT(getBucketDBUpdater().getDistributorComponent()
.ownsBucketInCurrentState(makeDocumentBucket(bucket)));
CPPUNIT_ASSERT(!getBucketDBUpdater()
- .checkOwnershipInPendingState(bucket).isOwned());
+ .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned());
sendFakeReplyForSingleBucketRequest(*rbi);
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index cbc78157911..1640af0f871 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -556,7 +556,7 @@ Distributor_Test::testNoDbResurrectionForBucketNotOwnedInPendingState()
document::BucketId nonOwnedBucket(16, 3);
CPPUNIT_ASSERT(!getBucketDBUpdater()
- .checkOwnershipInPendingState(nonOwnedBucket).isOwned());
+ .checkOwnershipInPendingState(makeDocumentBucket(nonOwnedBucket)).isOwned());
CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent()
.checkOwnershipInPendingAndCurrentState(makeDocumentBucket(nonOwnedBucket))
.isOwned());
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index b68e53c7136..faf508cf4e6 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -2,6 +2,7 @@
#include "bucketdbupdater.h"
#include "distributor.h"
+#include "distributor_bucket_space_repo.h"
#include "simpleclusterinformation.h"
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
@@ -59,13 +60,11 @@ BucketDBUpdater::hasPendingClusterState() const
}
BucketOwnership
-BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const
+BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const
{
if (hasPendingClusterState()) {
const lib::ClusterState& state(_pendingClusterState->getNewClusterState());
- const lib::Distribution& distribution(_pendingClusterState->getDistribution());
- document::Bucket bucket(BucketSpace::placeHolder(), b);
- if (!_bucketSpaceComponent.ownsBucketInState(distribution, state, bucket)) {
+ if (!_bucketSpaceComponent.ownsBucketInState(state, b)) {
return BucketOwnership::createNotOwnedInState(state);
}
}
@@ -75,7 +74,7 @@ BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const
void
BucketDBUpdater::sendRequestBucketInfo(
uint16_t node,
- const document::BucketId& bucket,
+ const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
if (!_bucketSpaceComponent.storageNodeIsUp(node)) {
@@ -83,10 +82,10 @@ BucketDBUpdater::sendRequestBucketInfo(
}
std::vector<document::BucketId> buckets;
- buckets.push_back(bucket);
+ buckets.push_back(bucket.getBucketId());
std::shared_ptr<api::RequestBucketInfoCommand> msg(
- new api::RequestBucketInfoCommand(BucketSpace::placeHolder(), buckets));
+ new api::RequestBucketInfoCommand(bucket.getBucketSpace(), buckets));
LOG(debug,
"Sending request bucket info command %lu for "
@@ -106,9 +105,9 @@ BucketDBUpdater::sendRequestBucketInfo(
void
BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
- const document::BucketId& bid)
+ const document::Bucket& bucket)
{
- sendRequestBucketInfo(nodeIdx, bid, std::shared_ptr<MergeReplyGuard>());
+ sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
}
void
@@ -248,7 +247,7 @@ BucketDBUpdater::onMergeBucketReply(
// bucket again to make sure it's ok.
for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
sendRequestBucketInfo(reply->getNodes()[i].index,
- reply->getBucketId(),
+ reply->getBucket(),
replyGuard);
}
@@ -258,7 +257,7 @@ BucketDBUpdater::onMergeBucketReply(
void
BucketDBUpdater::enqueueRecheckUntilPendingStateEnabled(
uint16_t node,
- const document::BucketId& bucket)
+ const document::Bucket& bucket)
{
LOG(spam,
"DB updater has a pending cluster state, enqueuing recheck "
@@ -305,10 +304,10 @@ BucketDBUpdater::onNotifyBucketChange(
if (hasPendingClusterState()) {
enqueueRecheckUntilPendingStateEnabled(cmd->getSourceIndex(),
- cmd->getBucketId());
+ cmd->getBucket());
} else {
sendRequestBucketInfo(cmd->getSourceIndex(),
- cmd->getBucketId(),
+ cmd->getBucket(),
std::shared_ptr<MergeReplyGuard>());
}
@@ -357,7 +356,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
LOG(debug, "Request bucket info failed towards node %d: error was %s",
req.targetNode, repl->getResult().toString().c_str());
- if (req.bucket != document::BucketId(0)) {
+ if (req.bucket.getBucketId() != document::BucketId(0)) {
framework::MilliSecTime sendTime(_bucketSpaceComponent.getClock());
sendTime += framework::MilliSecTime(100);
_delayedRequests.emplace_back(sendTime, req);
@@ -451,11 +450,12 @@ BucketDBUpdater::addBucketInfoForNode(
}
void
-BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
+BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
BucketListMerger::BucketList& existing)
{
+ auto &distributorBucketSpace(_bucketSpaceComponent.getBucketSpaceRepo().get(bucket.getBucketSpace()));
std::vector<BucketDatabase::Entry> entries;
- _bucketSpaceComponent.getBucketDatabase().getAll(bucketId, entries);
+ distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
for (const BucketDatabase::Entry & entry : entries) {
addBucketInfoForNode(entry, node, existing);
@@ -585,10 +585,10 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
{
xos << XmlTag("storagenode")
<< XmlAttribute("index", entry.second.targetNode);
- if (entry.second.bucket.getRawId() == 0) {
+ if (entry.second.bucket.getBucketId().getRawId() == 0) {
xos << XmlAttribute("bucket", ALL);
} else {
- xos << XmlAttribute("bucket", entry.second.bucket.getId(), XmlAttribute::HEX);
+ xos << XmlAttribute("bucket", entry.second.bucket.getBucketId().getId(), XmlAttribute::HEX);
}
xos << XmlAttribute("sendtimestamp", entry.second.timestamp)
<< XmlEndTag();
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index ee07c46754f..0f85bfb2494 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -8,7 +8,7 @@
#include "pendingclusterstate.h"
#include "distributor_bucket_space_component.h"
#include "outdated_nodes_map.h"
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -38,8 +38,8 @@ public:
~BucketDBUpdater();
void flush();
- BucketOwnership checkOwnershipInPendingState(const document::BucketId&) const;
- void recheckBucketInfo(uint32_t nodeIdx, const document::BucketId& bid);
+ BucketOwnership checkOwnershipInPendingState(const document::Bucket&) const;
+ void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
@@ -84,9 +84,9 @@ private:
struct BucketRequest {
BucketRequest()
- : targetNode(0), bucket(0), timestamp(0) {};
+ : targetNode(0), bucket(), timestamp(0) {};
- BucketRequest(uint16_t t, uint64_t currentTime, const document::BucketId& b,
+ BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
const std::shared_ptr<MergeReplyGuard>& guard)
: targetNode(t),
bucket(b),
@@ -94,7 +94,7 @@ private:
_mergeReplyGuard(guard) {};
uint16_t targetNode;
- document::BucketId bucket;
+ document::Bucket bucket;
uint64_t timestamp;
std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
@@ -102,11 +102,11 @@ private:
struct EnqueuedBucketRecheck {
uint16_t node;
- document::BucketId bucket;
+ document::Bucket bucket;
EnqueuedBucketRecheck() : node(0), bucket() {}
- EnqueuedBucketRecheck(uint16_t _node, const document::BucketId& _bucket)
+ EnqueuedBucketRecheck(uint16_t _node, const document::Bucket& _bucket)
: node(_node),
bucket(_bucket)
{}
@@ -124,7 +124,6 @@ private:
bool hasPendingClusterState() const;
bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- bool bucketOwnedAccordingToPendingState(const document::BucketId& bucketId) const;
bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
const BucketRequest& req);
@@ -134,7 +133,7 @@ private:
const BucketRequest& req);
void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
uint16_t targetNode, BucketListMerger::BucketList& newList);
- void sendRequestBucketInfo(uint16_t node, const document::BucketId& bucket,
+ void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReply);
void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
BucketListMerger::BucketList& existing) const;
@@ -146,7 +145,7 @@ private:
* in bucketId, or that bucketId is contained in, that have copies
* on the given node.
*/
- void findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
+ void findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
BucketListMerger::BucketList& existing);
/**
@@ -163,7 +162,7 @@ private:
void enableCurrentClusterStateInDistributor();
void addCurrentStateToClusterStateHistory();
- void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::BucketId&);
+ void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::Bucket&);
void sendAllQueuedBucketRechecks();
friend class BucketDBUpdater_Test;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 11c029c5c94..fe3a44e83ab 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -154,7 +154,7 @@ const DistributorBucketSpace& Distributor::getDefaultBucketSpace() const noexcep
BucketOwnership
Distributor::checkOwnershipInPendingState(const document::Bucket &b) const
{
- return _bucketDBUpdater.checkOwnershipInPendingState(b.getBucketId());
+ return _bucketDBUpdater.checkOwnershipInPendingState(b);
}
void
@@ -455,7 +455,7 @@ Distributor::storageDistributionChanged()
void
Distributor::recheckBucketInfo(uint16_t nodeIdx, const document::Bucket &bucket) {
- _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket.getBucketId());
+ _bucketDBUpdater.recheckBucketInfo(nodeIdx, bucket);
}
namespace {