diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-01-05 14:50:46 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-01-05 14:50:46 +0100 |
commit | be5ea0ad39c15c13fb85a70d9990165499a92896 (patch) | |
tree | e92462f5f130fa68f40175ec7d987c661dd9ae0f /documentapi/src/vespa | |
parent | 6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff) |
Revert "Revert "Jonmv/remove storage policy""
This reverts commit 75b2e4c11ea6463c335f1c77dab3fdb5493e5600.
Diffstat (limited to 'documentapi/src/vespa')
8 files changed, 297 insertions, 349 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 560f2f28f0e..0f86eb38aca 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo, // When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now. putRoutingPolicyFactory("AND", std::make_shared<RoutingPolicyFactories::AndPolicyFactory>()); putRoutingPolicyFactory("Content", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); - putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); + putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); // TODO Vespa 8: remove putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared<RoutingPolicyFactories::DocumentRouteSelectorPolicyFactory>(*_repo, cfg)); putRoutingPolicyFactory("Extern", std::make_shared<RoutingPolicyFactories::ExternPolicyFactory>()); + putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); putRoutingPolicyFactory("LocalService", std::make_shared<RoutingPolicyFactories::LocalServicePolicyFactory>()); + putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>()); - putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::StoragePolicyFactory>()); putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>()); - putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); // Prepare version specifications to use when adding routable factories. vespalib::VersionSpecification version6(6, 221); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index 26d51e702e9..83e1df02a24 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT SOURCES andpolicy.cpp externslobrokpolicy.cpp - storagepolicy.cpp contentpolicy.cpp messagetypepolicy.cpp documentrouteselectorpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index aea393a60af..7150794653f 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -1,12 +1,79 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "contentpolicy.h" +#include <vespa/document/base/documentid.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/error.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config/subscription/configuri.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".contentpolicy"); + +using vespalib::make_string; namespace documentapi { ContentPolicy::ContentPolicy(const string& param) - : StoragePolicy(param) -{ } + : ExternSlobrokPolicy(parse(param)), + _bucketIdFactory() +{ + std::map<string, string> params(parse(param)); + + if (params.find("cluster") != params.end()) { + _clusterName = params.find("cluster")->second; + } else { + _error = "Required parameter clustername not set"; + } + + if (params.find("clusterconfigid") != params.end()) { + _clusterConfigId = params.find("clusterconfigid")->second; + } +} + +namespace { + class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> + { + public: + CallBack(ContentPolicy & policy) : _policy(policy) { } + void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { + _policy.configure(std::move(config)); + } + private: + ContentPolicy & _policy; + }; +} +string ContentPolicy::init() +{ + string error = ExternSlobrokPolicy::init(); + if (!error.empty()) { + return error; + } + + if (_clusterConfigId.empty()) { + _clusterConfigId = createConfigId(_clusterName); + } + + using storage::lib::Distribution; + config::ConfigUri uri(_clusterConfigId); + if (!_configSources.empty()) { + _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); + } else { + _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); + } + _callBack = std::make_unique<CallBack>(*this); + _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); + _configFetcher->start(); + return ""; +} + +ContentPolicy::~ContentPolicy() = default; string ContentPolicy::createConfigId(const string & clusterName) const @@ -14,4 +81,177 @@ ContentPolicy::createConfigId(const string & clusterName) const return clusterName; } +string +ContentPolicy::createPattern(const string & clusterName, int distributor) const +{ + vespalib::asciistream ost; + + ost << "storage/cluster." << clusterName << "/distributor/"; + + if (distributor == -1) { + ost << '*'; + } else { + ost << distributor; + } + ost << "/default"; + return ost.str(); +} + +void +ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) +{ + try { + _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); + } catch (const std::exception& e) { + LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); + throw e; + } +} + +void +ContentPolicy::doSelect(mbus::RoutingContext &context) +{ + const mbus::Message &msg = context.getMessage(); + + int distributor = -1; + + if (_state.get()) { + document::BucketId id; + switch(msg.getType()) { + case DocumentProtocol::MESSAGE_PUTDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); + break; + + case DocumentProtocol::MESSAGE_GETDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); + break; + + case DocumentProtocol::MESSAGE_STATBUCKET: + id = static_cast<const StatBucketMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_GETBUCKETLIST: + id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_CREATEVISITOR: + id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; + break; + + case DocumentProtocol::MESSAGE_REMOVELOCATION: + id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); + break; + + default: + LOG(error, "Message type '%d' not supported.", msg.getType()); + return; + } + + // _P_A_R_A_N_O_I_A_ + if (id.getRawId() == 0) { + mbus::Reply::UP reply(new mbus::EmptyReply()); + reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, + "No bucket id available in message.")); + context.setReply(std::move(reply)); + return; + } + + // Pick a distributor using ideal state algorithm + try { + // Update distribution here, to make it not take lock in average case + if (_nextDistribution) { + _distribution = std::move(_nextDistribution); + _nextDistribution.reset(); + } + assert(_distribution.get()); + distributor = _distribution->getIdealDistributorNode(*_state, id); + } catch (storage::lib::TooFewBucketBitsInUseException& e) { + auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); + reply->addError(mbus::Error( + DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(std::move(reply)); + return; + } catch (storage::lib::NoDistributorsAvailableException& e) { + // No distributors available in current cluster state. Remove + // cluster state we cannot use and send to random target + _state.reset(); + distributor = -1; + } + } + + mbus::Hop hop = getRecipient(context, distributor); + + if (distributor != -1 && !hop.hasDirectives()) { + hop = getRecipient(context, -1); + } + + if (hop.hasDirectives()) { + mbus::Route route = context.getRoute(); + route.setHop(0, hop); + context.addChild(route); + } else { + context.setError( + mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, + make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); + } +} + +mbus::Hop +ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) +{ + slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); + + if (!entries.empty()) { + return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); + } + + return mbus::Hop(); +} + +void +ContentPolicy::merge(mbus::RoutingContext &context) +{ + mbus::RoutingNodeIterator it = context.getChildIterator(); + mbus::Reply::UP reply = it.removeReply(); + + if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { + updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); + } else if (reply->hasErrors()) { + _state.reset(); + } + + context.setReply(std::move(reply)); +} + +void +ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) +{ + std::unique_ptr<storage::lib::ClusterState> newState( + new storage::lib::ClusterState(wdr.getSystemState())); + if (!_state || newState->getVersion() >= _state->getVersion()) { + if (_state) { + wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", + _state->getVersion(), newState->getVersion())); + } else { + wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); + } + + _state = std::move(newState); + } else { + wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " + "while old state had version %d. New states should not have a lower version than the old.", + newState->getVersion(), _state->getVersion())); + _state.reset(); + } +} + } // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index 4b2f356c740..e29fbb75524 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -1,17 +1,62 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "storagepolicy.h" +#include "externslobrokpolicy.h" +#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/messagebus/routing/hop.h> +#include <vespa/config/helper/ifetchercallback.h> +#include <vespa/config/helper/configfetcher.h> + +namespace config { + class ICallback; + class ConfigFetcher; +} + +namespace storage { +namespace lib { + class Distribution; + class ClusterState; +} +} namespace documentapi { -class ContentPolicy : public StoragePolicy +class ContentPolicy : public ExternSlobrokPolicy { +private: + document::BucketIdFactory _bucketIdFactory; + std::unique_ptr<storage::lib::ClusterState> _state; + string _clusterName; + string _clusterConfigId; + std::unique_ptr<config::ICallback> _callBack; + std::unique_ptr<config::ConfigFetcher> _configFetcher; + std::unique_ptr<storage::lib::Distribution> _distribution; + std::unique_ptr<storage::lib::Distribution> _nextDistribution; + + mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); + public: ContentPolicy(const string& param); + ~ContentPolicy(); + void doSelect(mbus::RoutingContext &context) override; + void merge(mbus::RoutingContext &context) override; + + void updateStateFromReply(WrongDistributionReply& reply); + + /** + * @return a pointer to the system state registered with this policy. If + * we haven't received a system state yet, returns NULL. + */ + const storage::lib::ClusterState* getSystemState() const { return _state.get(); } + + virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); + string init() override; + private: - string createConfigId(const string & clusterName) const override; + string createConfigId(const string & clusterName) const; + string createPattern(const string & clusterName, int distributor) const; }; } - diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp deleted file mode 100644 index 3fc1df0352a..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storagepolicy.h" -#include <vespa/document/base/documentid.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/error.h> -#include <vespa/documentapi/documentapi.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config/subscription/configuri.h> -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".storagepolicy"); - -using vespalib::make_string; - -namespace documentapi { - -StoragePolicy::StoragePolicy(const string& param) - : ExternSlobrokPolicy(parse(param)), - _bucketIdFactory() -{ - std::map<string, string> params(parse(param)); - - if (params.find("cluster") != params.end()) { - _clusterName = params.find("cluster")->second; - } else { - _error = "Required parameter clustername not set"; - } - - if (params.find("clusterconfigid") != params.end()) { - _clusterConfigId = params.find("clusterconfigid")->second; - } -} - -namespace { - class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> - { - public: - CallBack(StoragePolicy & policy) : _policy(policy) { } - void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { - _policy.configure(std::move(config)); - } - private: - StoragePolicy & _policy; - }; -} -string StoragePolicy::init() -{ - string error = ExternSlobrokPolicy::init(); - if (!error.empty()) { - return error; - } - - if (_clusterConfigId.empty()) { - _clusterConfigId = createConfigId(_clusterName); - } - - using storage::lib::Distribution; - config::ConfigUri uri(_clusterConfigId); - if (!_configSources.empty()) { - _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); - } else { - _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); - } - _callBack = std::make_unique<CallBack>(*this); - _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); - _configFetcher->start(); - return ""; -} - -StoragePolicy::~StoragePolicy() = default; - -string -StoragePolicy::createConfigId(const string & clusterName) const -{ - return "storage/cluster." + clusterName; -} - -string -StoragePolicy::createPattern(const string & clusterName, int distributor) const -{ - vespalib::asciistream ost; - - ost << "storage/cluster." << clusterName << "/distributor/"; - - if (distributor == -1) { - ost << '*'; - } else { - ost << distributor; - } - ost << "/default"; - return ost.str(); -} - -void -StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) -{ - try { - _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); - } catch (const std::exception& e) { - LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); - throw e; - } -} - -void -StoragePolicy::doSelect(mbus::RoutingContext &context) -{ - const mbus::Message &msg = context.getMessage(); - - int distributor = -1; - - if (_state.get()) { - document::BucketId id; - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); - break; - - case DocumentProtocol::MESSAGE_STATBUCKET: - id = static_cast<const StatBucketMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_GETBUCKETLIST: - id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_CREATEVISITOR: - id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; - break; - - case DocumentProtocol::MESSAGE_REMOVELOCATION: - id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - - // _P_A_R_A_N_O_I_A_ - if (id.getRawId() == 0) { - mbus::Reply::UP reply(new mbus::EmptyReply()); - reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, - "No bucket id available in message.")); - context.setReply(std::move(reply)); - return; - } - - // Pick a distributor using ideal state algorithm - try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); - } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); - reply->addError(mbus::Error( - DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(std::move(reply)); - return; - } catch (storage::lib::NoDistributorsAvailableException& e) { - // No distributors available in current cluster state. Remove - // cluster state we cannot use and send to random target - _state.reset(); - distributor = -1; - } - } - - mbus::Hop hop = getRecipient(context, distributor); - - if (distributor != -1 && !hop.hasDirectives()) { - hop = getRecipient(context, -1); - } - - if (hop.hasDirectives()) { - mbus::Route route = context.getRoute(); - route.setHop(0, hop); - context.addChild(route); - } else { - context.setError( - mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, - make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); - } -} - -mbus::Hop -StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor) -{ - slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); - - if (!entries.empty()) { - return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); - } - - return mbus::Hop(); -} - -void -StoragePolicy::merge(mbus::RoutingContext &context) -{ - mbus::RoutingNodeIterator it = context.getChildIterator(); - mbus::Reply::UP reply = it.removeReply(); - - if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); - } else if (reply->hasErrors()) { - _state.reset(); - } - - context.setReply(std::move(reply)); -} - -void -StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr) -{ - std::unique_ptr<storage::lib::ClusterState> newState( - new storage::lib::ClusterState(wdr.getSystemState())); - if (!_state || newState->getVersion() >= _state->getVersion()) { - if (_state) { - wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", - _state->getVersion(), newState->getVersion())); - } else { - wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); - } - - _state = std::move(newState); - } else { - wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " - "while old state had version %d. New states should not have a lower version than the old.", - newState->getVersion(), _state->getVersion())); - _state.reset(); - } -} - -} // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h deleted file mode 100644 index 5cd2efcbbd3..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "externslobrokpolicy.h" -#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/messagebus/routing/hop.h> -#include <vespa/config/helper/ifetchercallback.h> -#include <vespa/config/helper/configfetcher.h> - -namespace config { - class ICallback; - class ConfigFetcher; -} - -namespace storage { -namespace lib { - class Distribution; - class ClusterState; -} -} - -namespace documentapi { - -class StoragePolicy : public ExternSlobrokPolicy -{ -private: - document::BucketIdFactory _bucketIdFactory; - std::unique_ptr<storage::lib::ClusterState> _state; - string _clusterName; - string _clusterConfigId; - std::unique_ptr<config::ICallback> _callBack; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - std::unique_ptr<storage::lib::Distribution> _distribution; - std::unique_ptr<storage::lib::Distribution> _nextDistribution; - - mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); - -public: - StoragePolicy(const string& param); - ~StoragePolicy(); - void doSelect(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; - - void updateStateFromReply(WrongDistributionReply& reply); - - /** - * @return a pointer to the system state registered with this policy. If - * we haven't received a system state yet, returns NULL. - */ - const storage::lib::ClusterState* getSystemState() const { return _state.get(); } - - virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); - string init() override; - -private: - virtual string createConfigId(const string & clusterName) const; - string createPattern(const string & clusterName, int distributor) const; -}; - -} - diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index 2c244c63046..f945fe8cd02 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -1,16 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "routingpolicyfactories.h" #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> +#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> +#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> -#include <vespa/documentapi/messagebus/policies/contentpolicy.h> -#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> -#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> using namespace documentapi; @@ -21,17 +20,6 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string ¶m) cons } mbus::IRoutingPolicy::UP -RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string ¶m) const -{ - mbus::IRoutingPolicy::UP ret(new StoragePolicy(param)); - string error = static_cast<StoragePolicy&>(*ret).getError(); - if (!error.empty()) { - ret.reset(new ErrorPolicy(error)); - } - return ret; -} - -mbus::IRoutingPolicy::UP RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string ¶m) const { return mbus::IRoutingPolicy::UP(new MessageTypePolicy(param)); diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index e2bf5119c58..533ad93e644 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -16,10 +16,6 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; - class StoragePolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; class MessageTypePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; |