aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-01-05 14:50:46 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-01-05 14:50:46 +0100
commitbe5ea0ad39c15c13fb85a70d9990165499a92896 (patch)
treee92462f5f130fa68f40175ec7d987c661dd9ae0f /documentapi/src/vespa
parent6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff)
Revert "Revert "Jonmv/remove storage policy""
This reverts commit 75b2e4c11ea6463c335f1c77dab3fdb5493e5600.
Diffstat (limited to 'documentapi/src/vespa')
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp6
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt1
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp244
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h53
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp257
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h63
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp18
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h4
8 files changed, 297 insertions, 349 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index 560f2f28f0e..0f86eb38aca 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo,
// When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now.
putRoutingPolicyFactory("AND", std::make_shared<RoutingPolicyFactories::AndPolicyFactory>());
putRoutingPolicyFactory("Content", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>());
- putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>());
+ putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); // TODO Vespa 8: remove
putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared<RoutingPolicyFactories::DocumentRouteSelectorPolicyFactory>(*_repo, cfg));
putRoutingPolicyFactory("Extern", std::make_shared<RoutingPolicyFactories::ExternPolicyFactory>());
+ putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>());
putRoutingPolicyFactory("LocalService", std::make_shared<RoutingPolicyFactories::LocalServicePolicyFactory>());
+ putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>());
putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>());
- putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::StoragePolicyFactory>());
putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>());
- putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>());
// Prepare version specifications to use when adding routable factories.
vespalib::VersionSpecification version6(6, 221);
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
index 26d51e702e9..83e1df02a24 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
@@ -3,7 +3,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT
SOURCES
andpolicy.cpp
externslobrokpolicy.cpp
- storagepolicy.cpp
contentpolicy.cpp
messagetypepolicy.cpp
documentrouteselectorpolicy.cpp
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
index aea393a60af..7150794653f 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
@@ -1,12 +1,79 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "contentpolicy.h"
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/error.h>
+#include <vespa/documentapi/documentapi.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/config-stor-distribution.h>
+#include <vespa/config/subscription/configuri.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".contentpolicy");
+
+using vespalib::make_string;
namespace documentapi {
ContentPolicy::ContentPolicy(const string& param)
- : StoragePolicy(param)
-{ }
+ : ExternSlobrokPolicy(parse(param)),
+ _bucketIdFactory()
+{
+ std::map<string, string> params(parse(param));
+
+ if (params.find("cluster") != params.end()) {
+ _clusterName = params.find("cluster")->second;
+ } else {
+ _error = "Required parameter clustername not set";
+ }
+
+ if (params.find("clusterconfigid") != params.end()) {
+ _clusterConfigId = params.find("clusterconfigid")->second;
+ }
+}
+
+namespace {
+ class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig>
+ {
+ public:
+ CallBack(ContentPolicy & policy) : _policy(policy) { }
+ void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override {
+ _policy.configure(std::move(config));
+ }
+ private:
+ ContentPolicy & _policy;
+ };
+}
+string ContentPolicy::init()
+{
+ string error = ExternSlobrokPolicy::init();
+ if (!error.empty()) {
+ return error;
+ }
+
+ if (_clusterConfigId.empty()) {
+ _clusterConfigId = createConfigId(_clusterName);
+ }
+
+ using storage::lib::Distribution;
+ config::ConfigUri uri(_clusterConfigId);
+ if (!_configSources.empty()) {
+ _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources)));
+ } else {
+ _configFetcher.reset(new config::ConfigFetcher(uri.getContext()));
+ }
+ _callBack = std::make_unique<CallBack>(*this);
+ _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get()));
+ _configFetcher->start();
+ return "";
+}
+
+ContentPolicy::~ContentPolicy() = default;
string
ContentPolicy::createConfigId(const string & clusterName) const
@@ -14,4 +81,177 @@ ContentPolicy::createConfigId(const string & clusterName) const
return clusterName;
}
+string
+ContentPolicy::createPattern(const string & clusterName, int distributor) const
+{
+ vespalib::asciistream ost;
+
+ ost << "storage/cluster." << clusterName << "/distributor/";
+
+ if (distributor == -1) {
+ ost << '*';
+ } else {
+ ost << distributor;
+ }
+ ost << "/default";
+ return ost.str();
+}
+
+void
+ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
+{
+ try {
+ _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
+ } catch (const std::exception& e) {
+ LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
+ throw e;
+ }
+}
+
+void
+ContentPolicy::doSelect(mbus::RoutingContext &context)
+{
+ const mbus::Message &msg = context.getMessage();
+
+ int distributor = -1;
+
+ if (_state.get()) {
+ document::BucketId id;
+ switch(msg.getType()) {
+ case DocumentProtocol::MESSAGE_PUTDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId());
+ break;
+
+ case DocumentProtocol::MESSAGE_GETDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId());
+ break;
+
+ case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId());
+ break;
+
+ case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
+ id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId());
+ break;
+
+ case DocumentProtocol::MESSAGE_STATBUCKET:
+ id = static_cast<const StatBucketMessage&>(msg).getBucketId();
+ break;
+
+ case DocumentProtocol::MESSAGE_GETBUCKETLIST:
+ id = static_cast<const GetBucketListMessage&>(msg).getBucketId();
+ break;
+
+ case DocumentProtocol::MESSAGE_CREATEVISITOR:
+ id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0];
+ break;
+
+ case DocumentProtocol::MESSAGE_REMOVELOCATION:
+ id = static_cast<const RemoveLocationMessage&>(msg).getBucketId();
+ break;
+
+ default:
+ LOG(error, "Message type '%d' not supported.", msg.getType());
+ return;
+ }
+
+ // _P_A_R_A_N_O_I_A_
+ if (id.getRawId() == 0) {
+ mbus::Reply::UP reply(new mbus::EmptyReply());
+ reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR,
+ "No bucket id available in message."));
+ context.setReply(std::move(reply));
+ return;
+ }
+
+ // Pick a distributor using ideal state algorithm
+ try {
+ // Update distribution here, to make it not take lock in average case
+ if (_nextDistribution) {
+ _distribution = std::move(_nextDistribution);
+ _nextDistribution.reset();
+ }
+ assert(_distribution.get());
+ distributor = _distribution->getIdealDistributorNode(*_state, id);
+ } catch (storage::lib::TooFewBucketBitsInUseException& e) {
+ auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
+ reply->addError(mbus::Error(
+ DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
+ "Too few distribution bits used for given cluster state"));
+ context.setReply(std::move(reply));
+ return;
+ } catch (storage::lib::NoDistributorsAvailableException& e) {
+ // No distributors available in current cluster state. Remove
+ // cluster state we cannot use and send to random target
+ _state.reset();
+ distributor = -1;
+ }
+ }
+
+ mbus::Hop hop = getRecipient(context, distributor);
+
+ if (distributor != -1 && !hop.hasDirectives()) {
+ hop = getRecipient(context, -1);
+ }
+
+ if (hop.hasDirectives()) {
+ mbus::Route route = context.getRoute();
+ route.setHop(0, hop);
+ context.addChild(route);
+ } else {
+ context.setError(
+ mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE,
+ make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str()));
+ }
+}
+
+mbus::Hop
+ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor)
+{
+ slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor));
+
+ if (!entries.empty()) {
+ return mbus::Hop::parse(entries[random() % entries.size()].second + "/default");
+ }
+
+ return mbus::Hop();
+}
+
+void
+ContentPolicy::merge(mbus::RoutingContext &context)
+{
+ mbus::RoutingNodeIterator it = context.getChildIterator();
+ mbus::Reply::UP reply = it.removeReply();
+
+ if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) {
+ updateStateFromReply(static_cast<WrongDistributionReply&>(*reply));
+ } else if (reply->hasErrors()) {
+ _state.reset();
+ }
+
+ context.setReply(std::move(reply));
+}
+
+void
+ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr)
+{
+ std::unique_ptr<storage::lib::ClusterState> newState(
+ new storage::lib::ClusterState(wdr.getSystemState()));
+ if (!_state || newState->getVersion() >= _state->getVersion()) {
+ if (_state) {
+ wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
+ _state->getVersion(), newState->getVersion()));
+ } else {
+ wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion()));
+ }
+
+ _state = std::move(newState);
+ } else {
+ wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, "
+ "while old state had version %d. New states should not have a lower version than the old.",
+ newState->getVersion(), _state->getVersion()));
+ _state.reset();
+ }
+}
+
} // documentapi
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
index 4b2f356c740..e29fbb75524 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
@@ -1,17 +1,62 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "storagepolicy.h"
+#include "externslobrokpolicy.h"
+#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/messagebus/routing/hop.h>
+#include <vespa/config/helper/ifetchercallback.h>
+#include <vespa/config/helper/configfetcher.h>
+
+namespace config {
+ class ICallback;
+ class ConfigFetcher;
+}
+
+namespace storage {
+namespace lib {
+ class Distribution;
+ class ClusterState;
+}
+}
namespace documentapi {
-class ContentPolicy : public StoragePolicy
+class ContentPolicy : public ExternSlobrokPolicy
{
+private:
+ document::BucketIdFactory _bucketIdFactory;
+ std::unique_ptr<storage::lib::ClusterState> _state;
+ string _clusterName;
+ string _clusterConfigId;
+ std::unique_ptr<config::ICallback> _callBack;
+ std::unique_ptr<config::ConfigFetcher> _configFetcher;
+ std::unique_ptr<storage::lib::Distribution> _distribution;
+ std::unique_ptr<storage::lib::Distribution> _nextDistribution;
+
+ mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor);
+
public:
ContentPolicy(const string& param);
+ ~ContentPolicy();
+ void doSelect(mbus::RoutingContext &context) override;
+ void merge(mbus::RoutingContext &context) override;
+
+ void updateStateFromReply(WrongDistributionReply& reply);
+
+ /**
+ * @return a pointer to the system state registered with this policy. If
+ * we haven't received a system state yet, returns NULL.
+ */
+ const storage::lib::ClusterState* getSystemState() const { return _state.get(); }
+
+ virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config);
+ string init() override;
+
private:
- string createConfigId(const string & clusterName) const override;
+ string createConfigId(const string & clusterName) const;
+ string createPattern(const string & clusterName, int distributor) const;
};
}
-
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
deleted file mode 100644
index 3fc1df0352a..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "storagepolicy.h"
-#include <vespa/document/base/documentid.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/error.h>
-#include <vespa/documentapi/documentapi.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/config-stor-distribution.h>
-#include <vespa/config/subscription/configuri.h>
-#include <cassert>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".storagepolicy");
-
-using vespalib::make_string;
-
-namespace documentapi {
-
-StoragePolicy::StoragePolicy(const string& param)
- : ExternSlobrokPolicy(parse(param)),
- _bucketIdFactory()
-{
- std::map<string, string> params(parse(param));
-
- if (params.find("cluster") != params.end()) {
- _clusterName = params.find("cluster")->second;
- } else {
- _error = "Required parameter clustername not set";
- }
-
- if (params.find("clusterconfigid") != params.end()) {
- _clusterConfigId = params.find("clusterconfigid")->second;
- }
-}
-
-namespace {
- class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig>
- {
- public:
- CallBack(StoragePolicy & policy) : _policy(policy) { }
- void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override {
- _policy.configure(std::move(config));
- }
- private:
- StoragePolicy & _policy;
- };
-}
-string StoragePolicy::init()
-{
- string error = ExternSlobrokPolicy::init();
- if (!error.empty()) {
- return error;
- }
-
- if (_clusterConfigId.empty()) {
- _clusterConfigId = createConfigId(_clusterName);
- }
-
- using storage::lib::Distribution;
- config::ConfigUri uri(_clusterConfigId);
- if (!_configSources.empty()) {
- _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources)));
- } else {
- _configFetcher.reset(new config::ConfigFetcher(uri.getContext()));
- }
- _callBack = std::make_unique<CallBack>(*this);
- _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get()));
- _configFetcher->start();
- return "";
-}
-
-StoragePolicy::~StoragePolicy() = default;
-
-string
-StoragePolicy::createConfigId(const string & clusterName) const
-{
- return "storage/cluster." + clusterName;
-}
-
-string
-StoragePolicy::createPattern(const string & clusterName, int distributor) const
-{
- vespalib::asciistream ost;
-
- ost << "storage/cluster." << clusterName << "/distributor/";
-
- if (distributor == -1) {
- ost << '*';
- } else {
- ost << distributor;
- }
- ost << "/default";
- return ost.str();
-}
-
-void
-StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
-{
- try {
- _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
- } catch (const std::exception& e) {
- LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
- throw e;
- }
-}
-
-void
-StoragePolicy::doSelect(mbus::RoutingContext &context)
-{
- const mbus::Message &msg = context.getMessage();
-
- int distributor = -1;
-
- if (_state.get()) {
- document::BucketId id;
- switch(msg.getType()) {
- case DocumentProtocol::MESSAGE_PUTDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId());
- break;
-
- case DocumentProtocol::MESSAGE_GETDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId());
- break;
-
- case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId());
- break;
-
- case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
- id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId());
- break;
-
- case DocumentProtocol::MESSAGE_STATBUCKET:
- id = static_cast<const StatBucketMessage&>(msg).getBucketId();
- break;
-
- case DocumentProtocol::MESSAGE_GETBUCKETLIST:
- id = static_cast<const GetBucketListMessage&>(msg).getBucketId();
- break;
-
- case DocumentProtocol::MESSAGE_CREATEVISITOR:
- id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0];
- break;
-
- case DocumentProtocol::MESSAGE_REMOVELOCATION:
- id = static_cast<const RemoveLocationMessage&>(msg).getBucketId();
- break;
-
- default:
- LOG(error, "Message type '%d' not supported.", msg.getType());
- return;
- }
-
- // _P_A_R_A_N_O_I_A_
- if (id.getRawId() == 0) {
- mbus::Reply::UP reply(new mbus::EmptyReply());
- reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR,
- "No bucket id available in message."));
- context.setReply(std::move(reply));
- return;
- }
-
- // Pick a distributor using ideal state algorithm
- try {
- // Update distribution here, to make it not take lock in average case
- if (_nextDistribution) {
- _distribution = std::move(_nextDistribution);
- _nextDistribution.reset();
- }
- assert(_distribution.get());
- distributor = _distribution->getIdealDistributorNode(*_state, id);
- } catch (storage::lib::TooFewBucketBitsInUseException& e) {
- auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
- reply->addError(mbus::Error(
- DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
- "Too few distribution bits used for given cluster state"));
- context.setReply(std::move(reply));
- return;
- } catch (storage::lib::NoDistributorsAvailableException& e) {
- // No distributors available in current cluster state. Remove
- // cluster state we cannot use and send to random target
- _state.reset();
- distributor = -1;
- }
- }
-
- mbus::Hop hop = getRecipient(context, distributor);
-
- if (distributor != -1 && !hop.hasDirectives()) {
- hop = getRecipient(context, -1);
- }
-
- if (hop.hasDirectives()) {
- mbus::Route route = context.getRoute();
- route.setHop(0, hop);
- context.addChild(route);
- } else {
- context.setError(
- mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE,
- make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str()));
- }
-}
-
-mbus::Hop
-StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor)
-{
- slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor));
-
- if (!entries.empty()) {
- return mbus::Hop::parse(entries[random() % entries.size()].second + "/default");
- }
-
- return mbus::Hop();
-}
-
-void
-StoragePolicy::merge(mbus::RoutingContext &context)
-{
- mbus::RoutingNodeIterator it = context.getChildIterator();
- mbus::Reply::UP reply = it.removeReply();
-
- if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) {
- updateStateFromReply(static_cast<WrongDistributionReply&>(*reply));
- } else if (reply->hasErrors()) {
- _state.reset();
- }
-
- context.setReply(std::move(reply));
-}
-
-void
-StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr)
-{
- std::unique_ptr<storage::lib::ClusterState> newState(
- new storage::lib::ClusterState(wdr.getSystemState()));
- if (!_state || newState->getVersion() >= _state->getVersion()) {
- if (_state) {
- wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
- _state->getVersion(), newState->getVersion()));
- } else {
- wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion()));
- }
-
- _state = std::move(newState);
- } else {
- wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, "
- "while old state had version %d. New states should not have a lower version than the old.",
- newState->getVersion(), _state->getVersion()));
- _state.reset();
- }
-}
-
-} // documentapi
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h
deleted file mode 100644
index 5cd2efcbbd3..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "externslobrokpolicy.h"
-#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h>
-#include <vespa/vdslib/distribution/distribution.h>
-#include <vespa/document/bucket/bucketidfactory.h>
-#include <vespa/messagebus/routing/hop.h>
-#include <vespa/config/helper/ifetchercallback.h>
-#include <vespa/config/helper/configfetcher.h>
-
-namespace config {
- class ICallback;
- class ConfigFetcher;
-}
-
-namespace storage {
-namespace lib {
- class Distribution;
- class ClusterState;
-}
-}
-
-namespace documentapi {
-
-class StoragePolicy : public ExternSlobrokPolicy
-{
-private:
- document::BucketIdFactory _bucketIdFactory;
- std::unique_ptr<storage::lib::ClusterState> _state;
- string _clusterName;
- string _clusterConfigId;
- std::unique_ptr<config::ICallback> _callBack;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- std::unique_ptr<storage::lib::Distribution> _distribution;
- std::unique_ptr<storage::lib::Distribution> _nextDistribution;
-
- mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor);
-
-public:
- StoragePolicy(const string& param);
- ~StoragePolicy();
- void doSelect(mbus::RoutingContext &context) override;
- void merge(mbus::RoutingContext &context) override;
-
- void updateStateFromReply(WrongDistributionReply& reply);
-
- /**
- * @return a pointer to the system state registered with this policy. If
- * we haven't received a system state yet, returns NULL.
- */
- const storage::lib::ClusterState* getSystemState() const { return _state.get(); }
-
- virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config);
- string init() override;
-
-private:
- virtual string createConfigId(const string & clusterName) const;
- string createPattern(const string & clusterName, int distributor) const;
-};
-
-}
-
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
index 2c244c63046..f945fe8cd02 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
@@ -1,16 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "routingpolicyfactories.h"
#include <vespa/documentapi/messagebus/policies/andpolicy.h>
+#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
#include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h>
#include <vespa/documentapi/messagebus/policies/errorpolicy.h>
#include <vespa/documentapi/messagebus/policies/externpolicy.h>
+#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
+#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
-#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
-#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
-#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h>
-#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
using namespace documentapi;
@@ -21,17 +20,6 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string &param) cons
}
mbus::IRoutingPolicy::UP
-RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string &param) const
-{
- mbus::IRoutingPolicy::UP ret(new StoragePolicy(param));
- string error = static_cast<StoragePolicy&>(*ret).getError();
- if (!error.empty()) {
- ret.reset(new ErrorPolicy(error));
- }
- return ret;
-}
-
-mbus::IRoutingPolicy::UP
RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string &param) const
{
return mbus::IRoutingPolicy::UP(new MessageTypePolicy(param));
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
index e2bf5119c58..533ad93e644 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
@@ -16,10 +16,6 @@ public:
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
};
- class StoragePolicyFactory : public IRoutingPolicyFactory {
- public:
- mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
- };
class MessageTypePolicyFactory : public IRoutingPolicyFactory {
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;