summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2017-11-06 16:42:20 +0100
committerGitHub <noreply@github.com>2017-11-06 16:42:20 +0100
commit5e9a9227722eec66390b2670359a41b4703b343a (patch)
treea1c2edd663d8bf2089afb0459c9df402b8cee529
parent34e634df6d6c1c0494114f185330b44587116b42 (diff)
parente196f9afeb9465163b96a63d0966f3547ce8ea9e (diff)
Merge pull request #4018 from vespa-engine/toregge/add-distributor-bucket-space-to-ideal-state-operation
Add distributor bucket space to ideal state operation
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp6
9 files changed, 37 insertions, 23 deletions
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index d78262709e3..53d9cc018f9 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -3,6 +3,7 @@
#include "garbagecollectionoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/distributor.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/log/log.h>
@@ -21,7 +22,7 @@ GarbageCollectionOperation::~GarbageCollectionOperation() { }
void
GarbageCollectionOperation::onStart(DistributorMessageSender& sender)
{
- BucketDatabase::Entry entry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId());
+ BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
std::vector<uint16_t> nodes = entry->getNodes();
for (uint32_t i = 0; i < nodes.size(); i++) {
@@ -62,11 +63,11 @@ GarbageCollectionOperation::onReceive(DistributorMessageSender&,
if (_tracker.finished()) {
if (_ok) {
- BucketDatabase::Entry dbentry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId());
+ BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
if (dbentry.valid()) {
dbentry->setLastGarbageCollectionTime(
_manager->getDistributorComponent().getClock().getTimeInSeconds().getTime());
- _manager->getDistributorComponent().getBucketDatabase().update(dbentry);
+ _bucketSpace->getBucketDatabase().update(dbentry);
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index 3095dce7b87..1e4067eb128 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
#include <vespa/storageapi/messageapi/maintenancecommand.h>
#include <vespa/log/log.h>
@@ -25,7 +26,8 @@ const uint32_t IdealStateOperation::MAINTENANCE_MESSAGE_TYPES[] =
};
IdealStateOperation::IdealStateOperation(const BucketAndNodes& bucketAndNodes)
- : _manager(NULL),
+ : _manager(nullptr),
+ _bucketSpace(nullptr),
_bucketAndNodes(bucketAndNodes),
_ok(true),
_priority(255)
@@ -78,6 +80,12 @@ BucketAndNodes::toString() const
}
void
+IdealStateOperation::setIdealStateManager(IdealStateManager* manager) {
+ _manager = manager;
+ _bucketSpace = &_manager->getBucketSpaceRepo().get(getBucket().getBucketSpace());
+};
+
+void
IdealStateOperation::done()
{
if (_manager != NULL) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index 9824ae0630f..e8480902549 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -10,6 +10,7 @@
namespace storage::distributor {
+class DistributorBucketSpace;
class PendingMessageTracker;
class IdealStateManager;
@@ -165,9 +166,7 @@ public:
@param manager The ideal state manager.
*/
- void setIdealStateManager(IdealStateManager* manager) {
- _manager = manager;
- };
+ void setIdealStateManager(IdealStateManager* manager);
/**
Returns the type of operation this is.
@@ -224,6 +223,7 @@ protected:
friend class IdealStateManager;
IdealStateManager* _manager;
+ DistributorBucketSpace *_bucketSpace;
BucketAndNodes _bucketAndNodes;
std::string _detailedReason;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 77135f56399..52a4a5c195c 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -2,6 +2,7 @@
#include "joinoperation.h"
#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <climits>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.join");
@@ -42,7 +43,7 @@ JoinOperation::NodeToBuckets
JoinOperation::resolveSourceBucketsPerTargetNode() const
{
NodeToBuckets nodeToBuckets;
- const auto& db(_manager->getDistributorComponent().getBucketDatabase());
+ const auto& db(_bucketSpace->getBucketDatabase());
for (const auto& bucket : _bucketsToJoin) {
BucketDatabase::Entry entry(db.get(bucket));
@@ -117,7 +118,7 @@ JoinOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP&
LOG(spam, "Adding joined bucket %s", getBucketId().toString().c_str());
}
} else if (rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND
- && _manager->getDistributorComponent().getBucketDatabase().get(getBucketId())->getNode(node) != 0)
+ && _bucketSpace->getBucketDatabase().get(getBucketId())->getNode(node) != 0)
{
_manager->getDistributorComponent().recheckBucketInfo(node, getBucket());
LOGBP(warning, "Join failed to find %s: %s",
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index e889dbe279b..271ac35968e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "mergeoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <array>
#include <vespa/log/bufferedlogger.h>
@@ -104,7 +105,7 @@ struct NodeIndexComparator
void
MergeOperation::onStart(DistributorMessageSender& sender)
{
- BucketDatabase::Entry entry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId());
+ BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
if (!entry.valid()) {
LOGBP(debug, "Unable to merge nonexisting bucket %s", getBucketId().toString().c_str());
_ok = false;
@@ -126,7 +127,7 @@ MergeOperation::onStart(DistributorMessageSender& sender)
}
_infoBefore = entry.getBucketInfo();
- generateSortedNodeList(_manager->getDistributorComponent().getDistribution(),
+ generateSortedNodeList(_bucketSpace->getDistribution(),
clusterState,
getBucketId(),
_limiter,
@@ -273,7 +274,7 @@ MergeOperation::onReceive(DistributorMessageSender& sender,
_ok = result.success();
if (_ok) {
BucketDatabase::Entry entry(
- _manager->getDistributorComponent().getBucketDatabase().get(getBucketId()));
+ _bucketSpace->getBucketDatabase().get(getBucketId()));
if (!entry.valid()) {
LOG(debug, "Bucket %s no longer exists after merge",
getBucketId().toString().c_str());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 6c0245cb590..dd7155d0f38 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -15,7 +15,7 @@ RemoveBucketOperation::onStartInternal(DistributorMessageSender& sender)
{
std::vector<std::pair<uint16_t, std::shared_ptr<api::DeleteBucketCommand> > > msgs;
- BucketDatabase::Entry entry = _manager->getDistributorComponent().getBucketDatabase().get(getBucketId());
+ BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
for (uint32_t i = 0; i < getNodes().size(); ++i) {
uint16_t node = getNodes()[i];
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
index f3528d30aba..1acb2dcc64b 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
@@ -2,6 +2,7 @@
#include "setbucketstateoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.operation.idealstate.setactive");
@@ -83,7 +84,7 @@ SetBucketStateOperation::onReceive(DistributorMessageSender& sender,
bool deactivate = false;
if (reply->getResult().success()) {
BucketDatabase::Entry entry =
- _manager->getDistributorComponent().getBucketDatabase().get(rep.getBucketId());
+ _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
const BucketCopy* copy = entry->getNode(node);
@@ -103,7 +104,7 @@ SetBucketStateOperation::onReceive(DistributorMessageSender& sender,
node,
bInfo).setTrusted(copy->trusted()));
- _manager->getDistributorComponent().getBucketDatabase().update(entry);
+ _bucketSpace->getBucketDatabase().update(entry);
}
} else {
LOG(debug, "%s did not exist when receiving %s",
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index a8f547afe45..1b40f744a80 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <climits>
#include <vespa/log/bufferedlogger.h>
@@ -26,8 +27,7 @@ SplitOperation::onStart(DistributorMessageSender& sender)
{
_ok = false;
- BucketDatabase::Entry entry = _manager->getDistributorComponent()
- .getBucketDatabase().get(getBucketId());
+ BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
for (uint32_t i = 0; i < entry->getNodeCount(); i++) {
std::shared_ptr<api::SplitBucketCommand> msg(
@@ -66,7 +66,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
if (rep.getResult().success()) {
BucketDatabase::Entry entry =
- _manager->getDistributorComponent().getBucketDatabase().get(rep.getBucketId());
+ _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
entry->removeNode(node);
@@ -74,9 +74,9 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
if (entry->getNodeCount() == 0) {
LOG(spam, "Removing split bucket %s",
getBucketId().toString().c_str());
- _manager->getDistributorComponent().getBucketDatabase().remove(rep.getBucketId());
+ _bucketSpace->getBucketDatabase().remove(rep.getBucketId());
} else {
- _manager->getDistributorComponent().getBucketDatabase().update(entry);
+ _bucketSpace->getBucketDatabase().update(entry);
}
ost << getBucketId() << " => ";
@@ -115,7 +115,7 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP
}
} else if (
rep.getResult().getResult() == api::ReturnCode::BUCKET_NOT_FOUND
- && _manager->getDistributorComponent().getBucketDatabase().get(rep.getBucketId())->getNode(node) != 0)
+ && _bucketSpace->getBucketDatabase().get(rep.getBucketId())->getNode(node) != 0)
{
_manager->getDistributorComponent().recheckBucketInfo(node, getBucket());
LOGBP(debug, "Split failed for %s: bucket not found. Storage and "
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index a1b43149963..c76e4802fb3 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/common/vectorprinter.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
+#include "distributor_bucket_space_repo.h"
#include <vespa/log/log.h>
@@ -123,8 +124,9 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const
LOG(spam, "Can't return early because we have already replied or failed");
return false;
}
-
- const lib::Distribution& distribution = _manager.getDistribution();
+ auto &bucketSpaceRepo(_manager.getBucketSpaceRepo());
+ auto &bucketSpace(bucketSpaceRepo.get(_reply->getBucket().getBucketSpace()));
+ const lib::Distribution& distribution = bucketSpace.getDistribution();
if (distribution.getInitialRedundancy() == 0) {
LOG(spam, "Not returning early because initial redundancy wasn't set");