summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-11-03 14:23:07 +0000
committerTor Egge <Tor.Egge@oath.com>2017-11-03 14:23:07 +0000
commit27ec059fa2656d1a5009bbbf3abff20ae0c699b3 (patch)
tree7bc3b5a24f0b3eea171949403a2f01b6add1527e
parentaf114174557efbecdac25a1164cc7fdb5f7e21ab (diff)
Pass distributor bucket space to get operation, put operation, two phase
update operation and update operation.
-rw-r--r--storage/src/tests/distributor/distributortestutil.cpp6
-rw-r--r--storage/src/tests/distributor/distributortestutil.h2
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp1
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp1
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h4
15 files changed, 62 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp
index 613ce7aad4e..5deb31f8579 100644
--- a/storage/src/tests/distributor/distributortestutil.cpp
+++ b/storage/src/tests/distributor/distributortestutil.cpp
@@ -343,6 +343,12 @@ DistributorTestUtil::getConfig() {
return const_cast<DistributorConfiguration&>(_distributor->getConfig());
}
+DistributorBucketSpace &
+DistributorTestUtil::getDistributorBucketSpace()
+{
+ return _distributor->getDefaultBucketSpace();
+}
+
BucketDatabase&
DistributorTestUtil::getBucketDatabase() {
return _distributor->getDefaultBucketSpace().getBucketDatabase();
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 4de84e47bfe..4f09c11ac03 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -19,6 +19,7 @@ namespace distributor {
class BucketDBUpdater;
class Distributor;
+class DistributorBucketSpace;
class IdealStateManager;
class ExternalOperationHandler;
class Operation;
@@ -121,6 +122,7 @@ public:
}
// TODO explicit notion of bucket spaces for tests
+ DistributorBucketSpace &getDistributorBucketSpace();
BucketDatabase& getBucketDatabase();
const BucketDatabase& getBucketDatabase() const;
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index ee1ea70163f..8bb8e24c17a 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -75,6 +75,7 @@ public:
new api::GetCommand(makeDocumentBucket(document::BucketId(0)), docId, "[all]"));
op.reset(new GetOperation(getExternalOperationHandler(),
+ getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
gets[msg->getLoadType()]));
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 755802127fe..7f54e163006 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -131,6 +131,7 @@ public:
void sendPut(std::shared_ptr<api::PutCommand> msg) {
op.reset(new PutOperation(getExternalOperationHandler(),
+ getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
puts[msg->getLoadType()]));
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 9f6a7010179..a7418629f81 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -337,7 +337,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
ExternalOperationHandler& handler = getExternalOperationHandler();
return std::make_shared<TwoPhaseUpdateOperation>(
- handler, msg, getDistributor().getMetrics());
+ handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics());
}
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 62590eabdad..c15a2b4057c 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -93,6 +93,7 @@ UpdateOperation_Test::sendUpdate(const std::string& bucketState)
ExternalOperationHandler& handler = getExternalOperationHandler();
return std::shared_ptr<UpdateOperation>(
new UpdateOperation(handler,
+ getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().updates[msg->getLoadType()]));
}
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 0d9135a375f..d0c14630451 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -138,7 +138,9 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
if (allowMutation(handle)) {
- _op = std::make_shared<PutOperation>(*this, cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle));
+ _op = std::make_shared<PutOperation>(*this,
+ _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
+ cmd, getMetrics().puts[cmd->getLoadType()], std::move(handle));
} else {
sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId()));
}
@@ -161,7 +163,9 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update)
}
auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
if (allowMutation(handle)) {
- _op = std::make_shared<TwoPhaseUpdateOperation>(*this, cmd, getMetrics(), std::move(handle));
+ _op = std::make_shared<TwoPhaseUpdateOperation>(*this,
+ _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
+ cmd, getMetrics(), std::move(handle));
} else {
sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId()));
}
@@ -235,6 +239,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
_op = Operation::SP(new GetOperation(
*this,
+ _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
cmd,
getMetrics().gets[cmd->getLoadType()]));
return true;
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index b71e2728f5b..246020c191c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -5,6 +5,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/state/nodestate.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.doc.get");
@@ -46,10 +47,12 @@ GetOperation::GroupId::operator==(const GroupId& other) const
}
GetOperation::GetOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::GetCommand> & msg,
PersistenceOperationMetricSet& metric)
: Operation(),
_manager(manager),
+ _bucketSpace(bucketSpace),
_msg(msg),
_returnCode(api::ReturnCode::OK),
_doc((document::Document*)NULL),
@@ -239,7 +242,7 @@ GetOperation::assignTargetNodeGroups()
document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId());
std::vector<BucketDatabase::Entry> entries;
- _manager.getBucketDatabase().getParents(bid, entries);
+ _bucketSpace.getBucketDatabase().getParents(bid, entries);
for (uint32_t j = 0; j < entries.size(); ++j) {
const BucketDatabase::Entry& e = entries[j];
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 12748c38b90..03279a87152 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -18,11 +18,13 @@ class PersistenceOperationMetricSet;
namespace distributor {
class DistributorComponent;
+class DistributorBucketSpace;
class GetOperation : public Operation
{
public:
GetOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::GetCommand> & msg,
PersistenceOperationMetricSet& metric);
@@ -71,6 +73,7 @@ private:
std::map<GroupId, GroupVector> _responses;
DistributorComponent& _manager;
+ DistributorBucketSpace &_bucketSpace;
std::shared_ptr<api::GetCommand> _msg;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 8467f9684bf..7ef03cb696a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -12,6 +12,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/distribution/idealnodecalculatorimpl.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
LOG_SETUP(".distributor.callback.doc.put");
@@ -21,6 +22,7 @@ using namespace storage;
using document::BucketSpace;
PutOperation::PutOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::PutCommand> & msg,
PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle)
@@ -31,7 +33,8 @@ PutOperation::PutOperation(DistributorComponent& manager,
msg->getTimestamp()),
_tracker(_trackerInstance),
_msg(msg),
- _manager(manager)
+ _manager(manager),
+ _bucketSpace(bucketSpace)
{
};
@@ -190,11 +193,11 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(
assert(!multipleBuckets);
(void) multipleBuckets;
BucketDatabase::Entry entry(
- _manager.getBucketDatabase().get(lastBucket));
+ _bucketSpace.getBucketDatabase().get(lastBucket));
std::vector<uint16_t> idealState(
- _manager.getDistribution().getIdealStorageNodes(
+ _bucketSpace.getDistribution().getIdealStorageNodes(
_manager.getClusterState(), lastBucket, "ui"));
- active = ActiveCopy::calculate(idealState, _manager.getDistribution(),
+ active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(),
entry);
LOG(debug, "Active copies for bucket %s: %s",
entry.getBucketId().toString().c_str(), active.toString().c_str());
@@ -203,7 +206,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(
copy.setActive(true);
entry->updateNode(copy);
}
- _manager.getBucketDatabase().update(entry);
+ _bucketSpace.getBucketDatabase().update(entry);
}
for (uint32_t i=0, n=copies.size(); i<n; ++i) {
if (!copies[i].isNewCopy()) continue;
@@ -274,13 +277,13 @@ PutOperation::onStart(DistributorMessageSender& sender)
std::vector<document::BucketId> bucketsToCheckForSplit;
lib::IdealNodeCalculatorImpl idealNodeCalculator;
- idealNodeCalculator.setDistribution(_manager.getDistribution());
+ idealNodeCalculator.setDistribution(_bucketSpace.getDistribution());
idealNodeCalculator.setClusterState(_manager.getClusterState());
OperationTargetResolverImpl targetResolver(
- _manager.getBucketDatabase(),
+ _bucketSpace.getBucketDatabase(),
idealNodeCalculator,
_manager.getDistributor().getConfig().getMinimalBucketSplit(),
- _manager.getDistribution().getRedundancy());
+ _bucketSpace.getDistribution().getRedundancy());
OperationTargetList targets(targetResolver.getTargets(
OperationTargetResolver::PUT, bid));
@@ -299,7 +302,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
// Mark any entries we're not feeding to as not trusted.
std::vector<BucketDatabase::Entry> entries;
- _manager.getBucketDatabase().getParents(bid, entries);
+ _bucketSpace.getBucketDatabase().getParents(bid, entries);
std::vector<PersistenceMessageTracker::ToSend> createBucketBatch;
if (targets.hasAnyNewCopies()) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 191a0116031..8beffe8b2c3 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -20,10 +20,13 @@ namespace api {
}
namespace distributor {
+class DistributorBucketSpace;
+
class PutOperation : public SequencedOperation
{
public:
PutOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::PutCommand> & msg,
PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle = SequencingHandle());
@@ -72,6 +75,7 @@ private:
std::shared_ptr<api::PutCommand> _msg;
DistributorComponent& _manager;
+ DistributorBucketSpace &_bucketSpace;
};
} // distributor
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 838cfd5a55f..79ffee7430c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -10,6 +10,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/batch.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.twophaseupdate");
@@ -22,6 +23,7 @@ namespace distributor {
TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand>& msg,
DistributorMetricSet& metrics,
SequencingHandle sequencingHandle)
@@ -32,6 +34,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_updateCmd(msg),
_updateReply(),
_manager(manager),
+ _bucketSpace(bucketSpace),
_sendState(SendState::NONE_SENT),
_mode(Mode::FAST_PATH),
_replySent(false)
@@ -148,7 +151,7 @@ TwoPhaseUpdateOperation::isFastPathPossible() const
{
// Fast path iff bucket exists AND is consistent (split and copies).
std::vector<BucketDatabase::Entry> entries;
- _manager.getBucketDatabase().getParents(_updateDocBucketId, entries);
+ _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries);
if (entries.size() != 1) {
return false;
@@ -161,7 +164,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
std::shared_ptr<UpdateOperation> updateOperation(
- new UpdateOperation(_manager, _updateCmd, _updateMetric));
+ new UpdateOperation(_manager, _bucketSpace, _updateCmd, _updateMetric));
IntermediateMessageSender intermediate(
_sentMessageMap, updateOperation, sender);
@@ -189,7 +192,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
"[all]"));
copyMessageSettings(*_updateCmd, *get);
std::shared_ptr<GetOperation> getOperation(
- std::make_shared<GetOperation>(_manager, get, _getMetric));
+ std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric));
IntermediateMessageSender intermediate(
_sentMessageMap, getOperation, sender);
@@ -257,7 +260,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(
new api::PutCommand(bucket, doc, putTimestamp));
copyMessageSettings(*_updateCmd, *put);
std::shared_ptr<PutOperation> putOperation(
- new PutOperation(_manager, put, _putMetric));
+ new PutOperation(_manager, _bucketSpace, put, _putMetric));
IntermediateMessageSender intermediate(
_sentMessageMap, putOperation, sender);
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 820dce051eb..e3fb6c93a3a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -21,6 +21,8 @@ class CreateBucketReply;
namespace distributor {
+class DistributorBucketSpace;
+
/*
* General functional outline:
*
@@ -49,6 +51,7 @@ class TwoPhaseUpdateOperation : public SequencedOperation
{
public:
TwoPhaseUpdateOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand> & msg,
DistributorMetricSet& metrics,
SequencingHandle sequencingHandle = SequencingHandle());
@@ -124,6 +127,7 @@ private:
std::shared_ptr<api::UpdateCommand> _updateCmd;
std::shared_ptr<api::StorageReply> _updateReply;
DistributorComponent& _manager;
+ DistributorBucketSpace &_bucketSpace;
SentMessageMap _sentMessageMap;
SendState _sendState;
Mode _mode;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 89bff0d1382..d622c42b321 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -5,6 +5,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.doc.update");
@@ -15,6 +16,7 @@ using namespace storage;
using document::BucketSpace;
UpdateOperation::UpdateOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand> & msg,
PersistenceOperationMetricSet& metric)
: Operation(),
@@ -24,7 +26,8 @@ UpdateOperation::UpdateOperation(DistributorComponent& manager,
msg->getTimestamp()),
_tracker(_trackerInstance),
_msg(msg),
- _manager(manager)
+ _manager(manager),
+ _bucketSpace(bucketSpace)
{
}
@@ -69,7 +72,7 @@ UpdateOperation::onStart(DistributorMessageSender& sender)
_msg->getDocumentId()));
std::vector<BucketDatabase::Entry> entries;
- _manager.getBucketDatabase().getParents(bucketId, entries);
+ _bucketSpace.getBucketDatabase().getParents(bucketId, entries);
if (entries.empty()) {
_tracker.fail(sender,
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 802927918ca..a23fd2ab876 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -17,10 +17,13 @@ class CreateBucketReply;
namespace distributor {
+class DistributorBucketSpace;
+
class UpdateOperation : public Operation
{
public:
UpdateOperation(DistributorComponent& manager,
+ DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand> & msg,
PersistenceOperationMetricSet& metric);
@@ -40,6 +43,7 @@ private:
std::shared_ptr<api::UpdateCommand> _msg;
DistributorComponent& _manager;
+ DistributorBucketSpace &_bucketSpace;
std::pair<document::BucketId, uint16_t> _newestTimestampLocation;
bool anyStorageNodesAvailable() const;