diff options
Diffstat (limited to 'documentapi/src')
9 files changed, 321 insertions, 373 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 0c659f589d6..02bd6b297d0 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -4,13 +4,13 @@ #include <vespa/documentapi/documentapi.h> #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> #include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/routing/routingnode.h> @@ -51,7 +51,7 @@ private: private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected); void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); - StoragePolicy &setupStoragePolicy(TestFrame &frame, const string ¶m, + ContentPolicy &setupContentPolicy(TestFrame &frame, const string ¶m, const string &pattern = "", int32_t numEntries = -1); bool isErrorPolicy(const string &name, const string ¶m); void assertMirrorReady(const IMirrorAPI &mirror); @@ -83,10 +83,10 @@ public: void requireThatExternPolicyWithUnknownPatternSelectsNone(); void requireThatExternPolicySelectsFromExternSlobrok(); void requireThatExternPolicyMergesOneReplyAsProtocol(); - void requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); - void requireThatStoragePolicyIsRandomWithoutState(); - void requireThatStoragePolicyIsTargetedWithState(); - void requireThatStoragePolicyCombinesSystemAndSlobrokState(); + void requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); + void requireThatContentPolicyIsRandomWithoutState(); + void requireThatContentPolicyIsTargetedWithState(); + void requireThatContentPolicyCombinesSystemAndSlobrokState(); }; TEST_APPHOOK(Test); @@ -128,10 +128,10 @@ Test::Main() { requireThatExternPolicySelectsFromExternSlobrok(); TEST_FLUSH(); requireThatExternPolicyMergesOneReplyAsProtocol(); TEST_FLUSH(); - requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); - requireThatStoragePolicyIsRandomWithoutState(); TEST_FLUSH(); - requireThatStoragePolicyIsTargetedWithState(); TEST_FLUSH(); - requireThatStoragePolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); + requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); + requireThatContentPolicyIsRandomWithoutState(); TEST_FLUSH(); + requireThatContentPolicyIsTargetedWithState(); TEST_FLUSH(); + requireThatContentPolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); TEST_DONE(); } @@ -782,15 +782,15 @@ void Test::testLoadBalancer() { } void -Test::requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy() +Test::requireThatContentPolicyWithIllegalParamIsAnErrorPolicy() { - EXPECT_TRUE(isErrorPolicy("Storage", "")); - EXPECT_TRUE(isErrorPolicy("Storage", "config=foo;slobroks=foo")); - EXPECT_TRUE(isErrorPolicy("Storage", "slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Content", "")); + EXPECT_TRUE(isErrorPolicy("Content", "config=foo;slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Content", "slobroks=foo")); } void -Test::requireThatStoragePolicyIsRandomWithoutState() +Test::requireThatContentPolicyIsRandomWithoutState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -808,7 +808,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -826,15 +826,15 @@ Test::requireThatStoragePolicyIsRandomWithoutState() } } -StoragePolicy & -Test::setupStoragePolicy(TestFrame &frame, const string ¶m, +ContentPolicy & +Test::setupContentPolicy(TestFrame &frame, const string ¶m, const string &pattern, int32_t numEntries) { - frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Storage:%s]", param.c_str()))); + frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Content:%s]", param.c_str()))); mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0)); - StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, + ContentPolicy &policy = static_cast<ContentPolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, dir.getName(), dir.getParam())); policy.initSynchronous(); assertMirrorReady(*policy.getMirror()); @@ -845,7 +845,7 @@ Test::setupStoragePolicy(TestFrame &frame, const string ¶m, } void -Test::requireThatStoragePolicyIsTargetedWithState() +Test::requireThatContentPolicyIsTargetedWithState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -863,7 +863,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -888,7 +888,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() } void -Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() +Test::requireThatContentPolicyCombinesSystemAndSlobrokState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -902,7 +902,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); ASSERT_TRUE(policy.getSystemState() == nullptr); diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 560f2f28f0e..54afeefac39 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo, // When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now. putRoutingPolicyFactory("AND", std::make_shared<RoutingPolicyFactories::AndPolicyFactory>()); putRoutingPolicyFactory("Content", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); - putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); + putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); // TODO: remove putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared<RoutingPolicyFactories::DocumentRouteSelectorPolicyFactory>(*_repo, cfg)); putRoutingPolicyFactory("Extern", std::make_shared<RoutingPolicyFactories::ExternPolicyFactory>()); + putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); putRoutingPolicyFactory("LocalService", std::make_shared<RoutingPolicyFactories::LocalServicePolicyFactory>()); + putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>()); - putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::StoragePolicyFactory>()); putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>()); - putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); // Prepare version specifications to use when adding routable factories. vespalib::VersionSpecification version6(6, 221); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index 26d51e702e9..83e1df02a24 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT SOURCES andpolicy.cpp externslobrokpolicy.cpp - storagepolicy.cpp contentpolicy.cpp messagetypepolicy.cpp documentrouteselectorpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index aea393a60af..7150794653f 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -1,12 +1,79 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "contentpolicy.h" +#include <vespa/document/base/documentid.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/error.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config/subscription/configuri.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".contentpolicy"); + +using vespalib::make_string; namespace documentapi { ContentPolicy::ContentPolicy(const string& param) - : StoragePolicy(param) -{ } + : ExternSlobrokPolicy(parse(param)), + _bucketIdFactory() +{ + std::map<string, string> params(parse(param)); + + if (params.find("cluster") != params.end()) { + _clusterName = params.find("cluster")->second; + } else { + _error = "Required parameter clustername not set"; + } + + if (params.find("clusterconfigid") != params.end()) { + _clusterConfigId = params.find("clusterconfigid")->second; + } +} + +namespace { + class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> + { + public: + CallBack(ContentPolicy & policy) : _policy(policy) { } + void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { + _policy.configure(std::move(config)); + } + private: + ContentPolicy & _policy; + }; +} +string ContentPolicy::init() +{ + string error = ExternSlobrokPolicy::init(); + if (!error.empty()) { + return error; + } + + if (_clusterConfigId.empty()) { + _clusterConfigId = createConfigId(_clusterName); + } + + using storage::lib::Distribution; + config::ConfigUri uri(_clusterConfigId); + if (!_configSources.empty()) { + _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); + } else { + _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); + } + _callBack = std::make_unique<CallBack>(*this); + _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); + _configFetcher->start(); + return ""; +} + +ContentPolicy::~ContentPolicy() = default; string ContentPolicy::createConfigId(const string & clusterName) const @@ -14,4 +81,177 @@ ContentPolicy::createConfigId(const string & clusterName) const return clusterName; } +string +ContentPolicy::createPattern(const string & clusterName, int distributor) const +{ + vespalib::asciistream ost; + + ost << "storage/cluster." << clusterName << "/distributor/"; + + if (distributor == -1) { + ost << '*'; + } else { + ost << distributor; + } + ost << "/default"; + return ost.str(); +} + +void +ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) +{ + try { + _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); + } catch (const std::exception& e) { + LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); + throw e; + } +} + +void +ContentPolicy::doSelect(mbus::RoutingContext &context) +{ + const mbus::Message &msg = context.getMessage(); + + int distributor = -1; + + if (_state.get()) { + document::BucketId id; + switch(msg.getType()) { + case DocumentProtocol::MESSAGE_PUTDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); + break; + + case DocumentProtocol::MESSAGE_GETDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); + break; + + case DocumentProtocol::MESSAGE_STATBUCKET: + id = static_cast<const StatBucketMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_GETBUCKETLIST: + id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_CREATEVISITOR: + id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; + break; + + case DocumentProtocol::MESSAGE_REMOVELOCATION: + id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); + break; + + default: + LOG(error, "Message type '%d' not supported.", msg.getType()); + return; + } + + // _P_A_R_A_N_O_I_A_ + if (id.getRawId() == 0) { + mbus::Reply::UP reply(new mbus::EmptyReply()); + reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, + "No bucket id available in message.")); + context.setReply(std::move(reply)); + return; + } + + // Pick a distributor using ideal state algorithm + try { + // Update distribution here, to make it not take lock in average case + if (_nextDistribution) { + _distribution = std::move(_nextDistribution); + _nextDistribution.reset(); + } + assert(_distribution.get()); + distributor = _distribution->getIdealDistributorNode(*_state, id); + } catch (storage::lib::TooFewBucketBitsInUseException& e) { + auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); + reply->addError(mbus::Error( + DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(std::move(reply)); + return; + } catch (storage::lib::NoDistributorsAvailableException& e) { + // No distributors available in current cluster state. Remove + // cluster state we cannot use and send to random target + _state.reset(); + distributor = -1; + } + } + + mbus::Hop hop = getRecipient(context, distributor); + + if (distributor != -1 && !hop.hasDirectives()) { + hop = getRecipient(context, -1); + } + + if (hop.hasDirectives()) { + mbus::Route route = context.getRoute(); + route.setHop(0, hop); + context.addChild(route); + } else { + context.setError( + mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, + make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); + } +} + +mbus::Hop +ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) +{ + slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); + + if (!entries.empty()) { + return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); + } + + return mbus::Hop(); +} + +void +ContentPolicy::merge(mbus::RoutingContext &context) +{ + mbus::RoutingNodeIterator it = context.getChildIterator(); + mbus::Reply::UP reply = it.removeReply(); + + if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { + updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); + } else if (reply->hasErrors()) { + _state.reset(); + } + + context.setReply(std::move(reply)); +} + +void +ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) +{ + std::unique_ptr<storage::lib::ClusterState> newState( + new storage::lib::ClusterState(wdr.getSystemState())); + if (!_state || newState->getVersion() >= _state->getVersion()) { + if (_state) { + wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", + _state->getVersion(), newState->getVersion())); + } else { + wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); + } + + _state = std::move(newState); + } else { + wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " + "while old state had version %d. New states should not have a lower version than the old.", + newState->getVersion(), _state->getVersion())); + _state.reset(); + } +} + } // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index 4b2f356c740..e29fbb75524 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -1,17 +1,62 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "storagepolicy.h" +#include "externslobrokpolicy.h" +#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/messagebus/routing/hop.h> +#include <vespa/config/helper/ifetchercallback.h> +#include <vespa/config/helper/configfetcher.h> + +namespace config { + class ICallback; + class ConfigFetcher; +} + +namespace storage { +namespace lib { + class Distribution; + class ClusterState; +} +} namespace documentapi { -class ContentPolicy : public StoragePolicy +class ContentPolicy : public ExternSlobrokPolicy { +private: + document::BucketIdFactory _bucketIdFactory; + std::unique_ptr<storage::lib::ClusterState> _state; + string _clusterName; + string _clusterConfigId; + std::unique_ptr<config::ICallback> _callBack; + std::unique_ptr<config::ConfigFetcher> _configFetcher; + std::unique_ptr<storage::lib::Distribution> _distribution; + std::unique_ptr<storage::lib::Distribution> _nextDistribution; + + mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); + public: ContentPolicy(const string& param); + ~ContentPolicy(); + void doSelect(mbus::RoutingContext &context) override; + void merge(mbus::RoutingContext &context) override; + + void updateStateFromReply(WrongDistributionReply& reply); + + /** + * @return a pointer to the system state registered with this policy. If + * we haven't received a system state yet, returns NULL. + */ + const storage::lib::ClusterState* getSystemState() const { return _state.get(); } + + virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); + string init() override; + private: - string createConfigId(const string & clusterName) const override; + string createConfigId(const string & clusterName) const; + string createPattern(const string & clusterName, int distributor) const; }; } - diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp deleted file mode 100644 index 3fc1df0352a..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storagepolicy.h" -#include <vespa/document/base/documentid.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/error.h> -#include <vespa/documentapi/documentapi.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config/subscription/configuri.h> -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".storagepolicy"); - -using vespalib::make_string; - -namespace documentapi { - -StoragePolicy::StoragePolicy(const string& param) - : ExternSlobrokPolicy(parse(param)), - _bucketIdFactory() -{ - std::map<string, string> params(parse(param)); - - if (params.find("cluster") != params.end()) { - _clusterName = params.find("cluster")->second; - } else { - _error = "Required parameter clustername not set"; - } - - if (params.find("clusterconfigid") != params.end()) { - _clusterConfigId = params.find("clusterconfigid")->second; - } -} - -namespace { - class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> - { - public: - CallBack(StoragePolicy & policy) : _policy(policy) { } - void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { - _policy.configure(std::move(config)); - } - private: - StoragePolicy & _policy; - }; -} -string StoragePolicy::init() -{ - string error = ExternSlobrokPolicy::init(); - if (!error.empty()) { - return error; - } - - if (_clusterConfigId.empty()) { - _clusterConfigId = createConfigId(_clusterName); - } - - using storage::lib::Distribution; - config::ConfigUri uri(_clusterConfigId); - if (!_configSources.empty()) { - _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); - } else { - _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); - } - _callBack = std::make_unique<CallBack>(*this); - _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); - _configFetcher->start(); - return ""; -} - -StoragePolicy::~StoragePolicy() = default; - -string -StoragePolicy::createConfigId(const string & clusterName) const -{ - return "storage/cluster." + clusterName; -} - -string -StoragePolicy::createPattern(const string & clusterName, int distributor) const -{ - vespalib::asciistream ost; - - ost << "storage/cluster." << clusterName << "/distributor/"; - - if (distributor == -1) { - ost << '*'; - } else { - ost << distributor; - } - ost << "/default"; - return ost.str(); -} - -void -StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) -{ - try { - _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); - } catch (const std::exception& e) { - LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); - throw e; - } -} - -void -StoragePolicy::doSelect(mbus::RoutingContext &context) -{ - const mbus::Message &msg = context.getMessage(); - - int distributor = -1; - - if (_state.get()) { - document::BucketId id; - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); - break; - - case DocumentProtocol::MESSAGE_STATBUCKET: - id = static_cast<const StatBucketMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_GETBUCKETLIST: - id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_CREATEVISITOR: - id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; - break; - - case DocumentProtocol::MESSAGE_REMOVELOCATION: - id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - - // _P_A_R_A_N_O_I_A_ - if (id.getRawId() == 0) { - mbus::Reply::UP reply(new mbus::EmptyReply()); - reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, - "No bucket id available in message.")); - context.setReply(std::move(reply)); - return; - } - - // Pick a distributor using ideal state algorithm - try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); - } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); - reply->addError(mbus::Error( - DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(std::move(reply)); - return; - } catch (storage::lib::NoDistributorsAvailableException& e) { - // No distributors available in current cluster state. Remove - // cluster state we cannot use and send to random target - _state.reset(); - distributor = -1; - } - } - - mbus::Hop hop = getRecipient(context, distributor); - - if (distributor != -1 && !hop.hasDirectives()) { - hop = getRecipient(context, -1); - } - - if (hop.hasDirectives()) { - mbus::Route route = context.getRoute(); - route.setHop(0, hop); - context.addChild(route); - } else { - context.setError( - mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, - make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); - } -} - -mbus::Hop -StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor) -{ - slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); - - if (!entries.empty()) { - return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); - } - - return mbus::Hop(); -} - -void -StoragePolicy::merge(mbus::RoutingContext &context) -{ - mbus::RoutingNodeIterator it = context.getChildIterator(); - mbus::Reply::UP reply = it.removeReply(); - - if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); - } else if (reply->hasErrors()) { - _state.reset(); - } - - context.setReply(std::move(reply)); -} - -void -StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr) -{ - std::unique_ptr<storage::lib::ClusterState> newState( - new storage::lib::ClusterState(wdr.getSystemState())); - if (!_state || newState->getVersion() >= _state->getVersion()) { - if (_state) { - wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", - _state->getVersion(), newState->getVersion())); - } else { - wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); - } - - _state = std::move(newState); - } else { - wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " - "while old state had version %d. New states should not have a lower version than the old.", - newState->getVersion(), _state->getVersion())); - _state.reset(); - } -} - -} // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h deleted file mode 100644 index 5cd2efcbbd3..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "externslobrokpolicy.h" -#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/messagebus/routing/hop.h> -#include <vespa/config/helper/ifetchercallback.h> -#include <vespa/config/helper/configfetcher.h> - -namespace config { - class ICallback; - class ConfigFetcher; -} - -namespace storage { -namespace lib { - class Distribution; - class ClusterState; -} -} - -namespace documentapi { - -class StoragePolicy : public ExternSlobrokPolicy -{ -private: - document::BucketIdFactory _bucketIdFactory; - std::unique_ptr<storage::lib::ClusterState> _state; - string _clusterName; - string _clusterConfigId; - std::unique_ptr<config::ICallback> _callBack; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - std::unique_ptr<storage::lib::Distribution> _distribution; - std::unique_ptr<storage::lib::Distribution> _nextDistribution; - - mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); - -public: - StoragePolicy(const string& param); - ~StoragePolicy(); - void doSelect(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; - - void updateStateFromReply(WrongDistributionReply& reply); - - /** - * @return a pointer to the system state registered with this policy. If - * we haven't received a system state yet, returns NULL. - */ - const storage::lib::ClusterState* getSystemState() const { return _state.get(); } - - virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); - string init() override; - -private: - virtual string createConfigId(const string & clusterName) const; - string createPattern(const string & clusterName, int distributor) const; -}; - -} - diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index 2c244c63046..f945fe8cd02 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -1,16 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "routingpolicyfactories.h" #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> +#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> +#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> -#include <vespa/documentapi/messagebus/policies/contentpolicy.h> -#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> -#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> using namespace documentapi; @@ -21,17 +20,6 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string ¶m) cons } mbus::IRoutingPolicy::UP -RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string ¶m) const -{ - mbus::IRoutingPolicy::UP ret(new StoragePolicy(param)); - string error = static_cast<StoragePolicy&>(*ret).getError(); - if (!error.empty()) { - ret.reset(new ErrorPolicy(error)); - } - return ret; -} - -mbus::IRoutingPolicy::UP RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string ¶m) const { return mbus::IRoutingPolicy::UP(new MessageTypePolicy(param)); diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index e2bf5119c58..533ad93e644 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -16,10 +16,6 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; - class StoragePolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; class MessageTypePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; |